Skip to main content

radicle_node/
reactor.rs

1mod controller;
2mod listener;
3mod session;
4mod timer;
5mod token;
6mod transport;
7
8use std::collections::HashMap;
9use std::fmt::{Debug, Display, Formatter};
10use std::io::ErrorKind;
11use std::sync::Arc;
12use std::thread::JoinHandle;
13use std::time::{Duration, Instant};
14use std::{io, thread};
15
16use crossbeam_channel::{unbounded, Receiver, TryRecvError};
17use mio::event::{Event, Source};
18use mio::{Events, Interest, Poll, Waker};
19use thiserror::Error;
20
21use timer::Timer;
22use token::WAKER;
23
24use crate::wire;
25
26pub(crate) use self::controller::{ControlMessage, Controller};
27pub(crate) use listener::Listener;
28pub use session::{NoiseSession, ProtocolArtifact, Socks5Session};
29pub(crate) use token::{Token, Tokens};
30pub(crate) use transport::{SessionEvent, Transport};
31
/// Number of seconds in one hour.
const SECONDS_IN_AN_HOUR: u64 = 60 * 60;

/// Upper bound on how long a single poll may block waiting for I/O when no
/// timer is due earlier.
const WAIT_TIMEOUT: Duration = Duration::from_secs(SECONDS_IN_AN_HOUR);

/// Longest time the service may spend handling the outcome of a single poll
/// (events, errors, ticking, etc.) before a warning is logged. The value is
/// chosen so that a warning appears whenever the service is too slow to handle
/// at least 10 "requests" per second, i.e. `1s / 10 = 100ms`.
const LAG_TIMEOUT: Duration = Duration::from_millis(100);
42
43/// A resource which can be managed by the reactor.
44pub trait EventHandler {
45    /// The type of reactions which this resource may generate upon receiving
46    /// I/O from the reactor via [`EventHandler::handle`]. These events are
47    /// passed to the reactor [`crate::reactor::ReactionHandler`].
48    type Reaction;
49
50    /// Method informing the reactor which types of events this resource is subscribed for.
51    fn interests(&self) -> Option<Interest>;
52
53    /// Method called by the reactor when an I/O readiness event
54    /// is received for this resource.
55    fn handle(&mut self, event: &Event) -> Vec<Self::Reaction>;
56}
57
/// A writer whose writes are all-or-nothing.
///
/// An implementation guarantees that a buffer is either written in full or,
/// when an error is returned, not written at all. It must also guarantee that
/// repeated write attempts never cause data to be written in an order other
/// than the order in which the writes were issued.
pub trait WriteAtomic: std::io::Write {
    /// Atomic non-blocking write: either the whole of `buf` is accepted by the
    /// resource without blocking, or the call fails.
    ///
    /// This is a checking wrapper around [`WriteAtomic::write_or_buf`], whose
    /// result is returned unchanged.
    ///
    /// # Panics
    ///
    /// - If [`WriteAtomic::is_ready_to_write`] returns `false`.
    /// - In builds with debug assertions enabled, if
    ///   [`WriteAtomic::write_or_buf`] returns an [`std::io::Error`] of kind
    ///   [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`] or
    ///   [`ErrorKind::WriteZero`]. Implementations are expected to buffer the
    ///   data in these cases instead of reporting the error.
    fn write_atomic(&mut self, buf: &[u8]) -> io::Result<()> {
        use ErrorKind::{Interrupted, WouldBlock, WriteZero};

        assert!(
            self.is_ready_to_write(),
            "WriteAtomic::write_atomic was called when the resource is not ready to write"
        );

        let outcome = self.write_or_buf(buf);

        // Transient error kinds must never escape `write_or_buf`.
        if let Err(err) = &outcome {
            debug_assert!(
                !matches!(err.kind(), Interrupted | WouldBlock | WriteZero),
                "WriteAtomic::write_or_buf must handle errors of kind {Interrupted:?}, {WouldBlock:?}, {WriteZero:?} by buffering",
            );
        }

        outcome
    }

    /// Tells whether the resource can currently be written to without blocking.
    fn is_ready_to_write(&self) -> bool;

    /// Performs a non-blocking write, buffering the data when necessary, or
    /// fails with a system-level error.
    ///
    /// Do not call this method directly; use [`WriteAtomic::write_atomic`].
    ///
    /// Implementations must absorb any [`std::io::Error`] of kind
    /// [`ErrorKind::Interrupted`], [`ErrorKind::WouldBlock`] or
    /// [`ErrorKind::WriteZero`] by buffering the data rather than returning
    /// the error.
    fn write_or_buf(&mut self, buf: &[u8]) -> io::Result<()>;
}
104
105/// Reactor errors
106#[derive(Error)]
107pub enum Error<L: EventHandler, T: EventHandler> {
108    #[error("listener {0:?} got disconnected during poll operation")]
109    ListenerDisconnect(Token, L),
110
111    #[error("transport {0:?} got disconnected during poll operation")]
112    TransportDisconnect(Token, T),
113
114    #[error("registration of a resource has failed: {0}")]
115    Poll(io::Error),
116
117    #[error("registration of a resource has failed: {0}")]
118    Registration(io::Error),
119}
120
121impl<L: EventHandler, T: EventHandler> Debug for Error<L, T> {
122    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123        Display::fmt(self, f)
124    }
125}
126
127/// Actions which can be provided to the [`Reactor`] by the [`ReactionHandler`].
128///
129/// Reactor reads actions on each event loop using [`ReactionHandler`] iterator interface.
130pub enum Action<L, T> {
131    /// Register a new listener resource for the reactor poll.
132    ///
133    /// Reactor can't instantiate the resource, like bind a network listener.
134    /// Reactor only can register already active resource for polling in the event loop.
135    RegisterListener(Token, L),
136
137    /// Register a new transport resource for the reactor poll.
138    ///
139    /// Reactor can't instantiate the resource, like open a file or establish network connection.
140    /// Reactor only can register already active resource for polling in the event loop.
141    RegisterTransport(Token, T),
142
143    /// Unregister listener resource from the reactor poll and handover it to the [`ReactionHandler`] via
144    /// [`ReactionHandler::handover_listener`].
145    ///
146    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
147    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
148    /// handled by the handler upon the handover event.
149    #[allow(dead_code)] // For future use
150    UnregisterListener(Token),
151
152    /// Unregister transport resource from the reactor poll and handover it to the [`ReactionHandler`] via
153    /// [`ReactionHandler::handover_transport`].
154    ///
155    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
156    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
157    /// handled by the handler upon the handover event.
158    UnregisterTransport(Token),
159
160    /// Write the data to one of the transport resources using [`io::Write`].
161    Send(Token, Vec<u8>),
162
163    /// Set a new timer for a given duration from this moment.
164    ///
165    /// When the timer elapses, the reactor will timeout from poll and call
166    /// [`ReactionHandler::timer_reacted`].
167    SetTimer(Duration),
168}
169
170impl<L: EventHandler, T: EventHandler> Display for Action<L, T> {
171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
172        match self {
173            Action::RegisterListener(token, _listener) => f
174                .debug_struct("RegisterListener")
175                .field("token", token)
176                .field("listener", &"<omitted>")
177                .finish(),
178            Action::RegisterTransport(token, _transport) => f
179                .debug_struct("RegisterTransport")
180                .field("token", token)
181                .field("transport", &"<omitted>")
182                .finish(),
183            Action::UnregisterListener(token) => f
184                .debug_struct("UnregisterListener")
185                .field("token", token)
186                .finish(),
187            Action::UnregisterTransport(token) => f
188                .debug_struct("UnregisterTransport")
189                .field("token", token)
190                .finish(),
191            Action::Send(token, _data) => f
192                .debug_struct("Send")
193                .field("token", token)
194                .field("data", &"<omitted>")
195                .finish(),
196            Action::SetTimer(duration) => f
197                .debug_struct("SetTimer")
198                .field("duration", duration)
199                .finish(),
200        }
201    }
202}
203
204/// A service which handles reactions to the events generated in the [`Reactor`].
205pub trait ReactionHandler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
206    /// Type for a listener resource.
207    ///
208    /// Listener resources are resources which may spawn more resources and can't be written to. A
209    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
210    /// be a special form of a peripheral device or something else.
211    type Listener: EventHandler + Source + Send + Debug;
212
213    /// Type for a transport resource.
214    ///
215    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
216    /// connections, database connections etc are all fall into this category.
217    type Transport: EventHandler + Source + Send + Debug + WriteAtomic;
218
219    /// Method called by the reactor on the start of each event loop once the poll has returned.
220    fn tick(&mut self);
221
222    /// Method called by the reactor when a previously set timeout is fired.
223    ///
224    /// Related: [`Action::SetTimer`].
225    fn timer_reacted(&mut self);
226
227    /// Method called by the reactor upon a reaction to an I/O event on a listener resource.
228    ///
229    /// Since listener doesn't support writing, it can be only a read event (indicating that a new
230    /// resource can be spawned from the listener).
231    fn listener_reacted(
232        &mut self,
233        token: Token,
234        reaction: <Self::Listener as EventHandler>::Reaction,
235        instant: Instant,
236    );
237
238    /// Method called by the reactor upon a reaction to an I/O event on a transport resource.
239    fn transport_reacted(
240        &mut self,
241        token: Token,
242        reaction: <Self::Transport as EventHandler>::Reaction,
243        instant: Instant,
244    );
245
246    /// Method called by the reactor when a given resource was successfully registered
247    /// for given token.
248    ///
249    /// The token will be used later in [`ReactionHandler::listener_reacted`]
250    /// and [`ReactionHandler::handover_listener`] calls to the handler.
251    fn listener_registered(&mut self, token: Token, listener: &Self::Listener);
252
253    /// Method called by the reactor when a given resource was successfully registered
254    /// for given token.
255    ///
256    /// The token will be used later in [`ReactionHandler::transport_reacted`],
257    /// [`ReactionHandler::handover_transport`] calls to the handler.
258    fn transport_registered(&mut self, token: Token, transport: &Self::Transport);
259
260    /// Method called by the reactor when a command is received for the
261    /// [`ReactionHandler`].
262    ///
263    /// The commands are sent via `Controller` from outside of the reactor, including other
264    /// threads.
265    fn handle_command(&mut self, cmd: wire::Control);
266
267    /// Method called by the reactor on any kind of error during the event loop, including errors of
268    /// the poll syscall or I/O errors returned as a part of the poll result events.
269    ///
270    /// See [`enum@Error`] for the details on errors which may happen.
271    fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);
272
273    /// Method called by the reactor upon receiving [`Action::UnregisterListener`].
274    ///
275    /// Passes the listener resource to the [`ReactionHandler`] when it is already not a part of the reactor
276    /// poll. From this point of time it is safe to send the resource to other threads (like
277    /// workers) or close the resource.
278    fn handover_listener(&mut self, token: Token, listener: Self::Listener);
279
280    /// Method called by the reactor upon receiving [`Action::UnregisterTransport`].
281    ///
282    /// Passes the transport resource to the [`ReactionHandler`] when it is already not a part of the
283    /// reactor poll. From this point of time it is safe to send the resource to other threads
284    /// (like workers) or close the resource.
285    fn handover_transport(&mut self, token: Token, transport: Self::Transport);
286}
287
288/// High-level reactor API wrapping reactor [`Runtime`] into a thread and providing basic thread
289/// management for it.
290///
291/// Apps running the [`Reactor`] can interface it and a [`ReactionHandler`] via use of the `Controller`
292/// API.
293pub struct Reactor {
294    thread: JoinHandle<()>,
295    controller: Controller,
296}
297
298impl Reactor {
299    /// Creates new reactor and a service exposing the [`ReactionHandler`] to
300    /// the reactor.
301    ///
302    /// The service is sent to the newly created reactor thread which runs the
303    /// reactor [`Runtime`].
304    pub fn new<H>(service: H, thread_name: String) -> Result<Self, io::Error>
305    where
306        H: 'static + ReactionHandler,
307    {
308        let builder = thread::Builder::new().name(thread_name);
309        let (sender, receiver) = unbounded();
310        let poll = Poll::new()?;
311        let controller = Controller::new(sender, Arc::new(Waker::new(poll.registry(), WAKER)?));
312
313        log::debug!(target: "reactor-controller", "Initializing reactor thread...");
314        let thread = builder.spawn(move || {
315            let runtime = Runtime {
316                service,
317                poll,
318                receiver,
319                listeners: HashMap::new(),
320                transports: HashMap::new(),
321                timeouts: Timer::new(),
322            };
323
324            log::info!(target: "reactor", "Entering reactor event loop");
325
326            runtime.run();
327        })?;
328
329        // Waking up to consume actions which were provided by the service on launch
330        controller.wake()?;
331
332        Ok(Self { thread, controller })
333    }
334
335    /// Provides a `Controller` that can be used to send events to
336    /// [`ReactionHandler`] via self.
337    pub fn controller(&self) -> Controller {
338        self.controller.clone()
339    }
340
341    /// Joins the reactor thread.
342    pub fn join(self) -> thread::Result<()> {
343        self.thread.join()
344    }
345}
346
347/// Internal [`Reactor`] runtime which is run in a dedicated thread.
348///
349/// This runtime structure *does not* spawn a thread and is *blocking*.
350/// It implements the actual reactor event loop.
351pub struct Runtime<H: ReactionHandler> {
352    service: H,
353    poll: Poll,
354    receiver: Receiver<ControlMessage>,
355    listeners: HashMap<Token, H::Listener>,
356    transports: HashMap<Token, H::Transport>,
357    timeouts: Timer,
358}
359
360impl<H: ReactionHandler> Runtime<H> {
361    fn register_interests(&mut self) -> io::Result<()> {
362        let registry = self.poll.registry();
363        for (id, res) in self.listeners.iter_mut() {
364            match res.interests() {
365                None => registry.deregister(res)?,
366                Some(interests) => registry.reregister(res, *id, interests)?,
367            };
368        }
369        for (id, res) in self.transports.iter_mut() {
370            match res.interests() {
371                None => registry.deregister(res)?,
372                Some(interests) => registry.reregister(res, *id, interests)?,
373            };
374        }
375        Ok(())
376    }
377
378    fn run(mut self) {
379        loop {
380            let timeout = self
381                .timeouts
382                .next_expiring_from(Instant::now())
383                .unwrap_or(WAIT_TIMEOUT);
384
385            self.register_interests()
386                .expect("registering interests must work to ensure correct operation");
387
388            log::trace!(target: "reactor", "Polling with timeout {timeout:?}");
389
390            let mut events = Events::with_capacity(1024);
391
392            // Block and wait for I/O events, wake by other threads, or timeout.
393            let res = self.poll.poll(&mut events, Some(timeout));
394
395            // This instant allows to measure the time spent by the service
396            // to handle the result of polling.
397            let tick = Instant::now();
398
399            // Inform the service that time has advanced.
400            self.service.tick();
401
402            // Inform the service about errors during polling.
403            if let Err(err) = res {
404                log::warn!(target: "reactor", "Failure during polling: {err}");
405                self.service.handle_error(Error::Poll(err));
406            }
407
408            // Inform the service that some timers have reacted.
409            // The way this is currently used basically ignores which
410            // timers have expired. As long as *something* timed out,
411            // the service is informed.
412            let timers_fired = self.timeouts.remove_expired_by(tick);
413            if timers_fired > 0 {
414                log::trace!(target: "reactor", "Timer has fired");
415                self.service.timer_reacted();
416            }
417
418            if self.handle_events(tick, events) {
419                // If a wake event was emitted, eagerly consume all control messages.
420                loop {
421                    use ControlMessage::*;
422                    use TryRecvError::*;
423
424                    match self.receiver.try_recv() {
425                        Ok(Command(cmd)) => self.service.handle_command(*cmd),
426                        Ok(Shutdown) => return self.handle_shutdown(),
427                        Err(Empty) => break,
428                        Err(Disconnected) => panic!("control channel disconnected unexpectedly"),
429                    }
430                }
431            }
432
433            let duration = Instant::now().duration_since(tick);
434            if duration > LAG_TIMEOUT {
435                log::warn!(target: "reactor", "Service was busy {:?} which exceeds the timeout of {:?}", duration, LAG_TIMEOUT);
436            }
437
438            self.handle_actions(tick);
439        }
440    }
441
442    /// # Returns
443    ///
444    /// Whether one of the events was originated from the waker.
445    fn handle_events(&mut self, instant: Instant, events: Events) -> bool {
446        log::trace!(target: "reactor", "Handling events");
447        let mut awoken = false;
448        let mut deregistered = Vec::new();
449
450        for event in events.into_iter() {
451            let token = event.token();
452
453            if token == WAKER {
454                log::trace!(target: "reactor", "Awoken by the controller");
455                awoken = true;
456            } else if self.listeners.contains_key(&token) {
457                log::trace!(target: "reactor", token=token.0; "Event from listener with token {}: {:?}", token.0, event);
458                if !event.is_error() {
459                    let listener = self
460                        .listeners
461                        .get_mut(&token)
462                        .expect("resource disappeared");
463                    listener
464                        .handle(event)
465                        .into_iter()
466                        .for_each(|service_event| {
467                            self.service.listener_reacted(token, service_event, instant);
468                        });
469                } else {
470                    let listener = self.deregister_listener(token).unwrap_or_else(|| {
471                        panic!("listener with token {} has disappeared", token.0)
472                    });
473                    self.service
474                        .handle_error(Error::ListenerDisconnect(token, listener));
475                    deregistered.push(token);
476                }
477            } else if self.transports.contains_key(&token) {
478                log::trace!(target: "reactor", token=token.0; "Event from transport with token {}: {:?}", token.0, event);
479                if !event.is_error() {
480                    let transport = self
481                        .transports
482                        .get_mut(&token)
483                        .expect("resource disappeared");
484                    transport
485                        .handle(event)
486                        .into_iter()
487                        .for_each(|service_event| {
488                            self.service
489                                .transport_reacted(token, service_event, instant);
490                        });
491                } else {
492                    let transport = self.deregister_transport(token).unwrap_or_else(|| {
493                        panic!("transport with token {} has disappeared", token.0)
494                    });
495                    self.service
496                        .handle_error(Error::TransportDisconnect(token, transport));
497                    deregistered.push(token);
498                }
499            } else if !deregistered.contains(&token) {
500                log::debug!(target: "reactor", token=token.0; "Event from unknown token {}: {:?}", token.0, event);
501            }
502        }
503
504        awoken
505    }
506
507    fn handle_actions(&mut self, instant: Instant) {
508        while let Some(action) = self.service.next() {
509            log::trace!(target: "reactor", "Handling action {action} from the service");
510
511            // Deadlock may happen here if the service will generate events over and over
512            // in the handle_* calls we may never get out of this loop
513            if let Err(err) = self.handle_action(action, instant) {
514                log::warn!(target: "reactor", "Failure: {err}");
515                self.service.handle_error(err);
516            }
517        }
518    }
519
520    fn handle_action(
521        &mut self,
522        action: Action<H::Listener, H::Transport>,
523        instant: Instant,
524    ) -> Result<(), Error<H::Listener, H::Transport>> {
525        match action {
526            Action::RegisterListener(token, mut listener) => {
527                log::trace!(target: "reactor", token=token.0; "Registering listener {:?} with token {}", listener, token.0);
528
529                self.poll
530                    .registry()
531                    .register(&mut listener, token, Interest::READABLE)
532                    .map_err(Error::Registration)?;
533                self.listeners.insert(token, listener);
534                self.service
535                    .listener_registered(token, &self.listeners[&token]);
536            }
537            Action::RegisterTransport(token, mut transport) => {
538                log::debug!(target: "reactor", token=token.0; "Registering transport");
539
540                self.poll
541                    .registry()
542                    .register(&mut transport, token, Interest::READABLE)
543                    .map_err(Error::Registration)?;
544                self.transports.insert(token, transport);
545                self.service
546                    .transport_registered(token, &self.transports[&token]);
547            }
548            Action::UnregisterListener(token) => {
549                let Some(listener) = self.deregister_listener(token) else {
550                    return Ok(());
551                };
552
553                log::debug!(target: "reactor", token=token.0; "Handing over listener {listener:?} with token {}", token.0);
554                self.service.handover_listener(token, listener);
555            }
556            Action::UnregisterTransport(token) => {
557                let Some(transport) = self.deregister_transport(token) else {
558                    return Ok(());
559                };
560
561                log::debug!(target: "reactor", token=token.0; "Handing over transport {transport:?} with token {}", token.0);
562                self.service.handover_transport(token, transport);
563            }
564            Action::Send(token, data) => {
565                log::trace!(target: "reactor", token=token.0; "Sending {} bytes to {token:?}", data.len());
566
567                if let Some(transport) = self.transports.get_mut(&token) {
568                    if let Err(e) = transport.write_atomic(&data) {
569                        log::error!(target: "reactor", "Fatal error writing to transport {token:?}, disconnecting. Error details: {e:?}");
570                        if let Some(transport) = self.deregister_transport(token) {
571                            return Err(Error::TransportDisconnect(token, transport));
572                        }
573                    }
574                } else {
575                    log::debug!(target: "reactor", token=token.0; "No transport with token {token:?} is known!");
576                }
577            }
578            Action::SetTimer(duration) => {
579                log::trace!(target: "reactor", "Adding timer {duration:?} from now");
580
581                self.timeouts.set_timeout(duration, instant);
582            }
583        }
584        Ok(())
585    }
586
587    fn handle_shutdown(self) {
588        log::info!(target: "reactor", "Shutdown");
589    }
590
591    fn deregister_listener(&mut self, token: Token) -> Option<H::Listener> {
592        let Some(mut source) = self.listeners.remove(&token) else {
593            log::debug!(target: "reactor", token=token.0; "Deregistering non-registered listener with token {}", token.0);
594            return None;
595        };
596
597        if let Err(err) = self.poll.registry().deregister(&mut source) {
598            log::debug!(target: "reactor", token=token.0; "Failed to deregister listener with token {} from mio: {err}", token.0);
599        }
600
601        Some(source)
602    }
603
604    fn deregister_transport(&mut self, token: Token) -> Option<H::Transport> {
605        let Some(mut source) = self.transports.remove(&token) else {
606            log::debug!(target: "reactor", token=token.0; "Deregistering non-registered transport with token {}", token.0);
607            return None;
608        };
609
610        if let Err(err) = self.poll.registry().deregister(&mut source) {
611            log::debug!(target: "reactor", token=token.0; "Failed to deregister transport with token {} from mio: {err}", token.0);
612        }
613
614        Some(source)
615    }
616}