radicle-node 0.19.0

The Radicle Node
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
mod controller;
mod listener;
mod session;
mod timer;
mod token;
mod transport;

use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::io::ErrorKind;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::{io, thread};

use crossbeam_channel::{unbounded, Receiver, TryRecvError};
use mio::event::{Event, Source};
use mio::{Events, Interest, Poll, Waker};
use thiserror::Error;

use timer::Timer;
use token::WAKER;

use crate::wire;

pub(crate) use self::controller::{ControlMessage, Controller};
pub(crate) use listener::Listener;
pub use session::{NoiseSession, ProtocolArtifact, Socks5Session};
pub(crate) use token::{Token, Tokens};
pub(crate) use transport::{SessionEvent, Transport};

const SECONDS_IN_AN_HOUR: u64 = 60 * 60;

/// Maximum amount of time to wait for I/O.
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);

/// Maximum duration to accept the service to spend handling events (and errors,
/// ticking, etc.) without warning. Set to log whenever the service becomes so
/// is so slow to respond that it would not be able to handle at least 10
/// "requests" per second, i.e. `1s / 10 = 100ms`.
const LAG_TIMEOUT: Duration = Duration::from_millis(100);

/// A resource which can be managed by the reactor.
pub trait EventHandler {
    /// The type of reactions which this resource may generate upon receiving
    /// I/O from the reactor via [`EventHandler::handle`]. These events are
    /// passed to the reactor [`crate::reactor::ReactionHandler`].
    type Reaction;

    /// Method informing the reactor which types of events this resource is subscribed for.
    fn interests(&self) -> Option<Interest>;

    /// Method called by the reactor when an I/O readiness event
    /// is received for this resource.
    fn handle(&mut self, event: &Event) -> Vec<Self::Reaction>;
}

/// The trait guarantees that the data are either written in full or, in case
/// of an error, none of the data is written. Types implementing the trait must
/// also guarantee that multiple attempts to write do not result in
/// data to be written out of the initial ordering.
pub trait WriteAtomic: std::io::Write {
    /// Atomic non-blocking I/O write operation, which must either write the whole buffer to a
    /// resource without blocking or fail.
    ///
    /// # Panics
    ///
    /// If [`WriteAtomic::write_or_buf`] returns an [`std::io::Error`] of kind
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`], [`ErrorKind::WriteZero`].
    /// In this case, [`WriteAtomic::write_or_buf`] is expected to buffer.
    fn write_atomic(&mut self, buf: &[u8]) -> io::Result<()> {
        use ErrorKind::*;

        if !self.is_ready_to_write() {
            panic!("WriteAtomic::write_atomic was called when the resource is not ready to write");
        }

        let result = self.write_or_buf(buf);

        debug_assert!(
            !matches!(
                result.as_ref().err().map(|err| err.kind()),
                Some(Interrupted | WouldBlock | WriteZero)
            ),
            "WriteAtomic::write_or_buf must handle errors of kind {Interrupted:?}, {WouldBlock:?}, {WriteZero:?} by buffering",
        );

        result
    }

    /// Checks whether resource can be written to without blocking.
    fn is_ready_to_write(&self) -> bool;

    /// Writes to the resource in a non-blocking way, buffering the data if necessary,
    /// or failing with a system-level error.
    ///
    /// This method shouldn't be called directly; call [`WriteAtomic::write_atomic`] instead.
    ///
    /// The method must handle [`std::io::Error`] of kind
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`], [`ErrorKind::WriteZero`].
    /// and buffer the data in such cases.
    fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
}

/// Reactor errors
#[derive(Error)]
pub enum Error<L: EventHandler, T: EventHandler> {
    #[error("listener {0:?} got disconnected during poll operation")]
    ListenerDisconnect(Token, L),

    #[error("transport {0:?} got disconnected during poll operation")]
    TransportDisconnect(Token, T),

    #[error("registration of a resource has failed: {0}")]
    Poll(io::Error),

    #[error("registration of a resource has failed: {0}")]
    Registration(io::Error),
}

impl<L: EventHandler, T: EventHandler> Debug for Error<L, T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self, f)
    }
}

/// Actions which can be provided to the [`Reactor`] by the [`ReactionHandler`].
///
/// Reactor reads actions on each event loop using [`ReactionHandler`] iterator interface.
pub enum Action<L, T> {
    /// Register a new listener resource for the reactor poll.
    ///
    /// Reactor can't instantiate the resource, like bind a network listener.
    /// Reactor only can register already active resource for polling in the event loop.
    RegisterListener(Token, L),

    /// Register a new transport resource for the reactor poll.
    ///
    /// Reactor can't instantiate the resource, like open a file or establish network connection.
    /// Reactor only can register already active resource for polling in the event loop.
    RegisterTransport(Token, T),

    /// Unregister listener resource from the reactor poll and handover it to the [`ReactionHandler`] via
    /// [`ReactionHandler::handover_listener`].
    ///
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
    /// handled by the handler upon the handover event.
    #[allow(dead_code)] // For future use
    UnregisterListener(Token),

    /// Unregister transport resource from the reactor poll and handover it to the [`ReactionHandler`] via
    /// [`ReactionHandler::handover_transport`].
    ///
    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
    /// handled by the handler upon the handover event.
    UnregisterTransport(Token),

    /// Write the data to one of the transport resources using [`io::Write`].
    Send(Token, Vec<u8>),

    /// Set a new timer for a given duration from this moment.
    ///
    /// When the timer elapses, the reactor will timeout from poll and call
    /// [`ReactionHandler::timer_reacted`].
    SetTimer(Duration),
}

impl<L: EventHandler, T: EventHandler> Display for Action<L, T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Action::RegisterListener(token, _listener) => f
                .debug_struct("RegisterListener")
                .field("token", token)
                .field("listener", &"<omitted>")
                .finish(),
            Action::RegisterTransport(token, _transport) => f
                .debug_struct("RegisterTransport")
                .field("token", token)
                .field("transport", &"<omitted>")
                .finish(),
            Action::UnregisterListener(token) => f
                .debug_struct("UnregisterListener")
                .field("token", token)
                .finish(),
            Action::UnregisterTransport(token) => f
                .debug_struct("UnregisterTransport")
                .field("token", token)
                .finish(),
            Action::Send(token, _data) => f
                .debug_struct("Send")
                .field("token", token)
                .field("data", &"<omitted>")
                .finish(),
            Action::SetTimer(duration) => f
                .debug_struct("SetTimer")
                .field("duration", duration)
                .finish(),
        }
    }
}

/// A service which handles reactions to the events generated in the [`Reactor`].
pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
    /// Type for a listener resource.
    ///
    /// Listener resources are resources which may spawn more resources and can't be written to. A
    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
    /// be a special form of a peripheral device or something else.
    type Listener: EventHandler + Source + Send + Debug;

    /// Type for a transport resource.
    ///
    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
    /// connections, database connections etc are all fall into this category.
    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;

    /// Method called by the reactor on the start of each event loop once the poll has returned.
    fn tick(&mut self);

    /// Method called by the reactor when a previously set timeout is fired.
    ///
    /// Related: [`Action::SetTimer`].
    fn timer_reacted(&mut self);

    /// Method called by the reactor upon a reaction to an I/O event on a listener resource.
    ///
    /// Since listener doesn't support writing, it can be only a read event (indicating that a new
    /// resource can be spawned from the listener).
    fn listener_reacted(
        &mut self,
        token: Token,
        reaction: <Self::Listener as EventHandler>::Reaction,
        instant: Instant,
    );

    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
    fn transport_reacted(
        &mut self,
        token: Token,
        reaction: <Self::Transport as EventHandler>::Reaction,
        instant: Instant,
    );

    /// Method called by the reactor when a given resource was successfully registered
    /// for given token.
    ///
    /// The token will be used later in [`ReactionHandler::listener_reacted`]
    /// and [`ReactionHandler::handover_listener`] calls to the handler.
    fn listener_registered(&mut self, token: Token, listener: &Self::Listener);

    /// Method called by the reactor when a given resource was successfully registered
    /// for given token.
    ///
    /// The token will be used later in [`ReactionHandler::transport_reacted`],
    /// [`ReactionHandler::handover_transport`] calls to the handler.
    fn transport_registered(&mut self, token: Token, transport: &Self::Transport);

    /// Method called by the reactor when a command is received for the
    /// [`ReactionHandler`].
    ///
    /// The commands are sent via `Controller` from outside of the reactor, including other
    /// threads.
    fn handle_command(&mut self, cmd: wire::Control);

    /// Method called by the reactor on any kind of error during the event loop, including errors of
    /// the poll syscall or I/O errors returned as a part of the poll result events.
    ///
    /// See [`enum@Error`] for the details on errors which may happen.
    fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);

    /// Method called by the reactor upon receiving [`Action::UnregisterListener`].
    ///
    /// Passes the listener resource to the [`ReactionHandler`] when it is already not a part of the reactor
    /// poll. From this point of time it is safe to send the resource to other threads (like
    /// workers) or close the resource.
    fn handover_listener(&mut self, token: Token, listener: Self::Listener);

    /// Method called by the reactor upon receiving [`Action::UnregisterTransport`].
    ///
    /// Passes the transport resource to the [`ReactionHandler`] when it is already not a part of the
    /// reactor poll. From this point of time it is safe to send the resource to other threads
    /// (like workers) or close the resource.
    fn handover_transport(&mut self, token: Token, transport: Self::Transport);
}

/// High-level reactor API wrapping reactor [`Runtime`] into a thread and providing basic thread
/// management for it.
///
/// Apps running the [`Reactor`] can interface it and a [`ReactionHandler`] via use of the `Controller`
/// API.
pub struct Reactor {
    thread: JoinHandle<()>,
    controller: Controller,
}

impl Reactor {
    /// Creates new reactor and a service exposing the [`ReactionHandler`] to
    /// the reactor.
    ///
    /// The service is sent to the newly created reactor thread which runs the
    /// reactor [`Runtime`].
    pub fn new<H>(service: H, thread_name: String) -> Result<Self, io::Error>
    where
        H: 'static + ReactionHandler,
    {
        let builder = thread::Builder::new().name(thread_name);
        let (sender, receiver) = unbounded();
        let poll = Poll::new()?;
        let controller = Controller::new(sender, Arc::new(Waker::new(poll.registry(), WAKER)?));

        log::debug!(target: "reactor-controller", "Initializing reactor thread...");
        let thread = builder.spawn(move || {
            let runtime = Runtime {
                service,
                poll,
                receiver,
                listeners: HashMap::new(),
                transports: HashMap::new(),
                timeouts: Timer::new(),
            };

            log::info!(target: "reactor", "Entering reactor event loop");

            runtime.run();
        })?;

        // Waking up to consume actions which were provided by the service on launch
        controller.wake()?;

        Ok(Self { thread, controller })
    }

    /// Provides a `Controller` that can be used to send events to
    /// [`ReactionHandler`] via self.
    pub fn controller(&self) -> Controller {
        self.controller.clone()
    }

    /// Joins the reactor thread.
    pub fn join(self) -> thread::Result<()> {
        self.thread.join()
    }
}

/// Internal [`Reactor`] runtime which is run in a dedicated thread.
///
/// This runtime structure *does not* spawn a thread and is *blocking*.
/// It implements the actual reactor event loop.
pub struct Runtime<H: ReactionHandler> {
    service: H,
    poll: Poll,
    receiver: Receiver<ControlMessage>,
    listeners: HashMap<Token, H::Listener>,
    transports: HashMap<Token, H::Transport>,
    timeouts: Timer,
}

impl<H: ReactionHandler> Runtime<H> {
    fn register_interests(&mut self) -> io::Result<()> {
        let registry = self.poll.registry();
        for (id, res) in self.listeners.iter_mut() {
            match res.interests() {
                None => registry.deregister(res)?,
                Some(interests) => registry.reregister(res, *id, interests)?,
            };
        }
        for (id, res) in self.transports.iter_mut() {
            match res.interests() {
                None => registry.deregister(res)?,
                Some(interests) => registry.reregister(res, *id, interests)?,
            };
        }
        Ok(())
    }

    fn run(mut self) {
        loop {
            let timeout = self
                .timeouts
                .next_expiring_from(Instant::now())
                .unwrap_or(WAIT_TIMEOUT);

            self.register_interests()
                .expect("registering interests must work to ensure correct operation");

            log::trace!(target: "reactor", "Polling with timeout {timeout:?}");

            let mut events = Events::with_capacity(1024);

            // Block and wait for I/O events, wake by other threads, or timeout.
            let res = self.poll.poll(&mut events, Some(timeout));

            // This instant allows to measure the time spent by the service
            // to handle the result of polling.
            let tick = Instant::now();

            // Inform the service that time has advanced.
            self.service.tick();

            // Inform the service about errors during polling.
            if let Err(err) = res {
                log::warn!(target: "reactor", "Failure during polling: {err}");
                self.service.handle_error(Error::Poll(err));
            }

            // Inform the service that some timers have reacted.
            // The way this is currently used basically ignores which
            // timers have expired. As long as *something* timed out,
            // the service is informed.
            let timers_fired = self.timeouts.remove_expired_by(tick);
            if timers_fired > 0 {
                log::trace!(target: "reactor", "Timer has fired");
                self.service.timer_reacted();
            }

            if self.handle_events(tick, events) {
                // If a wake event was emitted, eagerly consume all control messages.
                loop {
                    use ControlMessage::*;
                    use TryRecvError::*;

                    match self.receiver.try_recv() {
                        Ok(Command(cmd)) => self.service.handle_command(*cmd),
                        Ok(Shutdown) => return self.handle_shutdown(),
                        Err(Empty) => break,
                        Err(Disconnected) => panic!("control channel disconnected unexpectedly"),
                    }
                }
            }

            let duration = Instant::now().duration_since(tick);
            if duration > LAG_TIMEOUT {
                log::debug!(target: "reactor", "Service was busy {:?} which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
            }

            self.handle_actions(tick);
        }
    }

    /// # Returns
    ///
    /// Whether one of the events was originated from the waker.
    fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
        log::trace!(target: "reactor", "Handling events");
        let mut awoken = false;
        let mut deregistered = Vec::new();

        for event in events.into_iter() {
            let token = event.token();

            if token == WAKER {
                log::trace!(target: "reactor", "Awoken by the controller");
                awoken = true;
            } else if self.listeners.contains_key(&token) {
                log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
                if !event.is_error() {
                    let listener = self
                        .listeners
                        .get_mut(&token)
                        .expect("resource disappeared");
                    listener
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
                            self.service.listener_reacted(token, service_event, instant);
                        });
                } else {
                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
                        panic!("listener with token {} has disappeared", token.0)
                    });
                    self.service
                        .handle_error(Error::ListenerDisconnect(token, listener));
                    deregistered.push(token);
                }
            } else if self.transports.contains_key(&token) {
                log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
                if !event.is_error() {
                    let transport = self
                        .transports
                        .get_mut(&token)
                        .expect("resource disappeared");
                    transport
                        .handle(event)
                        .into_iter()
                        .for_each(|service_event| {
                            self.service
                                .transport_reacted(token, service_event, instant);
                        });
                } else {
                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
                        panic!("transport with token {} has disappeared", token.0)
                    });
                    self.service
                        .handle_error(Error::TransportDisconnect(token, transport));
                    deregistered.push(token);
                }
            } else if !deregistered.contains(&token) {
                log::debug!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
            }
        }

        awoken
    }

    fn handle_actions(&mut self, instant: Instant) {
        while let Some(action) = self.service.next() {
            log::trace!(target: "reactor", "Handling action {action} from the service");

            // Deadlock may happen here if the service will generate events over and over
            // in the handle_* calls we may never get out of this loop
            if let Err(err) = self.handle_action(action, instant) {
                log::warn!(target: "reactor", "Failure: {err}");
                self.service.handle_error(err);
            }
        }
    }

    fn handle_action(
        &mut self,
        action: Action<H::Listener, H::Transport>,
        instant: Instant,
    ) -> Result<(), Error<H::Listener, H::Transport>> {
        match action {
            Action::RegisterListener(token, mut listener) => {
                log::trace!(target: "reactor", token=token.0; "Registering listener {:?} with token {}", listener, token.0);

                self.poll
                    .registry()
                    .register(&mut listener, token, Interest::READABLE)
                    .map_err(Error::Registration)?;
                self.listeners.insert(token, listener);
                self.service
                    .listener_registered(token, &self.listeners[&token]);
            }
            Action::RegisterTransport(token, mut transport) => {
                log::debug!(target: "reactor", token=token.0; "Registering transport");

                self.poll
                    .registry()
                    .register(&mut transport, token, Interest::READABLE)
                    .map_err(Error::Registration)?;
                self.transports.insert(token, transport);
                self.service
                    .transport_registered(token, &self.transports[&token]);
            }
            Action::UnregisterListener(token) => {
                let Some(listener) = self.deregister_listener(token) else {
                    return Ok(());
                };

                log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
                self.service.handover_listener(token, listener);
            }
            Action::UnregisterTransport(token) => {
                let Some(transport) = self.deregister_transport(token) else {
                    return Ok(());
                };

                log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
                self.service.handover_transport(token, transport);
            }
            Action::Send(token, data) => {
                log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());

                if let Some(transport) = self.transports.get_mut(&token) {
                    if let Err(e) = transport.write_atomic(&data) {
                        log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
                        if let Some(transport) = self.deregister_transport(token) {
                            return Err(Error::TransportDisconnect(token, transport));
                        }
                    }
                } else {
                    log::debug!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
                }
            }
            Action::SetTimer(duration) => {
                log::trace!(target: "reactor", "Adding timer {duration:?} from now");

                self.timeouts.set_timeout(duration, instant);
            }
        }
        Ok(())
    }

    fn handle_shutdown(self) {
        log::info!(target: "reactor", "Shutdown");
    }

    fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
        let Some(mut source) = self.listeners.remove(&token) else {
            log::debug!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
            log::debug!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
        }

        Some(source)
    }

    fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
        let Some(mut source) = self.transports.remove(&token) else {
            log::debug!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
            return None;
        };

        if let Err(err) = self.poll.registry().deregister(&mut source) {
            log::debug!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
        }

        Some(source)
    }
}