reactor/
reactor.rs

1// Library for concurrent I/O resource management using reactor pattern.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Written in 2021-2023 by
6//     Dr. Maxim Orlovsky <orlovsky@ubideco.org>
7//     Alexis Sellier <alexis@cloudhead.io>
8//
9// Copyright 2022-2023 UBIDECO Institute, Switzerland
10// Copyright 2021 Alexis Sellier <alexis@cloudhead.io>
11//
12// Licensed under the Apache License, Version 2.0 (the "License");
13// you may not use this file except in compliance with the License.
14// You may obtain a copy of the License at
15//
16//     http://www.apache.org/licenses/LICENSE-2.0
17//
18// Unless required by applicable law or agreed to in writing, software
19// distributed under the License is distributed on an "AS IS" BASIS,
20// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
21// See the License for the specific language governing permissions and
22// limitations under the License.
23
24#![allow(unused_variables)] // because we need them for feature-gated logger
25
26use std::collections::HashMap;
27use std::fmt::{Debug, Display, Formatter};
28use std::os::unix::io::{AsRawFd, RawFd};
29use std::thread::JoinHandle;
30use std::time::Duration;
31use std::{io, thread};
32
33use crossbeam_channel as chan;
34
35use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
36use crate::resource::WriteError;
37use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic};
38
39/// Maximum amount of time to wait for I/O.
40const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
41
42/// Reactor errors
43#[derive(Error, Display, From)]
44#[display(doc_comments)]
45pub enum Error<L: Resource, T: Resource> {
46    /// transport {0} got disconnected during poll operation.
47    ListenerDisconnect(ResourceId, L),
48
49    /// transport {0} got disconnected during poll operation.
50    TransportDisconnect(ResourceId, T),
51
52    /// polling multiple reactor has failed. Details: {0:?}
53    Poll(io::Error),
54}
55
56impl<L: Resource, T: Resource> Debug for Error<L, T> {
57    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { Display::fmt(self, f) }
58}
59
60/// Actions which can be provided to the reactor by the [`Handler`].
61///
62/// Reactor reads actions on each event loop using [`Handler`] iterator interface.
63#[derive(Display)]
64pub enum Action<L: Resource, T: Resource> {
65    /// Register a new listener resource for the reactor poll.
66    ///
67    /// Reactor can't instantiate the resource, like bind a network listener.
68    /// Reactor only can register already active resource for polling in the event loop.
69    #[display("register_listener")]
70    RegisterListener(L),
71
72    /// Register a new transport resource for the reactor poll.
73    ///
74    /// Reactor can't instantiate the resource, like open a file or establish network connection.
75    /// Reactor only can register already active resource for polling in the event loop.
76    #[display("register_transport")]
77    RegisterTransport(T),
78
79    /// Unregister listener resource from the reactor poll and handover it to the [`Handler`] via
80    /// [`Handler::handover_listener`].
81    ///
82    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
83    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
84    /// handled by the handler upon the handover event.
85    #[display("unregister_listener")]
86    UnregisterListener(ResourceId),
87
88    /// Unregister transport resource from the reactor poll and handover it to the [`Handler`] via
89    /// [`Handler::handover_transport`].
90    ///
91    /// When the resource is unregistered no action is performed, i.e. the file descriptor is not
92    /// closed, listener is not unbound, connections are not closed etc. All these actions must be
93    /// handled by the handler upon the handover event.
94    #[display("unregister_transport")]
95    UnregisterTransport(ResourceId),
96
97    /// Write the data to one of the transport resources using [`io::Write`].
98    #[display("send_to({0})")]
99    Send(ResourceId, Vec<u8>),
100
101    /// Set a new timer for a given duration from this moment.
102    ///
103    /// When the timer fires reactor will timeout poll syscall and call [`Handler::handle_timer`].
104    #[display("set_timer({0:?})")]
105    SetTimer(Duration),
106}
107
108/// A service which handles I/O events generated in the [`Reactor`].
109pub trait Handler: Send + Iterator<Item = Action<Self::Listener, Self::Transport>> {
110    /// Type for a listener resource.
111    ///
112    /// Listener resources are resources which may spawn more resources and can't be written to. A
113    /// typical example of a listener resource is a [`std::net::TcpListener`], however this may also
114    /// be a special form of a peripheral device or something else.
115    type Listener: Resource;
116
117    /// Type for a transport resource.
118    ///
119    /// Transport is a "full" resource which can be read from - and written to. Usual files, network
120    /// connections, database connections etc are all fall into this category.
121    type Transport: Resource;
122
123    /// A command which may be sent to the [`Handler`] from outside of the [`Reactor`], including
124    /// other threads.
125    ///
126    /// The handler object is owned by the reactor runtime and executes always in the context of the
127    /// reactor runtime thread. Thus, if other (micro)services within the app needs to communicate
128    /// to the handler they have to use this data type, which usually is an enumeration for a set of
129    /// commands supported by the handler.
130    ///
131    /// The commands are sent by using reactor [`Controller`] API.
132    type Command: Debug + Send;
133
134    /// Method called by the reactor on the start of each event loop once the poll has returned.
135    fn tick(&mut self, time: Timestamp);
136
137    /// Method called by the reactor when a previously set timeout is fired.
138    ///
139    /// Related: [`Action::SetTimer`].
140    fn handle_timer(&mut self);
141
142    /// Method called by the reactor upon an I/O event on a listener resource.
143    ///
144    /// Since listener doesn't support writing, it can be only a read event (indicating that a new
145    /// resource can be spawned from the listener).
146    fn handle_listener_event(
147        &mut self,
148        id: ResourceId,
149        event: <Self::Listener as Resource>::Event,
150        time: Timestamp,
151    );
152
153    /// Method called by the reactor upon I/O event on a transport resource.
154    fn handle_transport_event(
155        &mut self,
156        id: ResourceId,
157        event: <Self::Transport as Resource>::Event,
158        time: Timestamp,
159    );
160
161    /// Method called by the reactor when a given resource was successfully registered and provided
162    /// with a resource id.
163    ///
164    /// The resource id will be used later in [`Self::handle_listener_event`],
165    /// [`Self::handle_transport_event`], [`Self::handover_listener`] and [`handover_transport`]
166    /// calls to the handler.
167    fn handle_registered(&mut self, fd: RawFd, id: ResourceId, ty: ResourceType);
168
169    /// Method called by the reactor when a [`Self::Command`] is received for the [`Handler`].
170    ///
171    /// The commands are sent via [`Controller`] from outside of the reactor, including other
172    /// threads.
173    fn handle_command(&mut self, cmd: Self::Command);
174
175    /// Method called by the reactor on any kind of error during the event loop, including errors of
176    /// the poll syscall or I/O errors returned as a part of the poll result events.
177    ///
178    /// See [`enum@Error`] for the details on errors which may happen.
179    fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>);
180
181    /// Method called by the reactor upon receiving [`Action::UnregisterListener`].
182    ///
183    /// Passes the listener resource to the [`Handler`] when it is already not a part of the reactor
184    /// poll. From this point of time it is safe to send the resource to other threads (like
185    /// workers) or close the resource.
186    fn handover_listener(&mut self, id: ResourceId, listener: Self::Listener);
187
188    /// Method called by the reactor upon receiving [`Action::UnregisterTransport`].
189    ///
190    /// Passes the transport resource to the [`Handler`] when it is already not a part of the
191    /// reactor poll. From this point of time it is safe to send the resource to other threads
192    /// (like workers) or close the resource.
193    fn handover_transport(&mut self, id: ResourceId, transport: Self::Transport);
194}
195
196/// High-level reactor API wrapping reactor [`Runtime`] into a thread and providing basic thread
197/// management for it.
198///
199/// Apps running the [`Reactor`] can interface it and a [`Handler`] via use of the [`Controller`]
200/// API.
201pub struct Reactor<C, P: Poll> {
202    thread: JoinHandle<()>,
203    controller: Controller<C, <P::Waker as Waker>::Send>,
204}
205
206impl<C, P: Poll> Reactor<C, P> {
207    /// Creates new reactor using provided [`Poll`] engine and a service exposing [`Handler`] API to
208    /// the reactor.
209    ///
210    /// Both poll engine and the service are sent to the newly created reactor thread which runs the
211    /// reactor [`Runtime`].
212    ///
213    /// # Error
214    ///
215    /// Errors with a system/OS error if it was impossible to spawn a thread.
216    pub fn new<H: Handler<Command = C>>(service: H, poller: P) -> Result<Self, io::Error>
217    where
218        H: 'static,
219        P: 'static,
220        C: 'static + Send,
221    {
222        Reactor::with(service, poller, thread::Builder::new())
223    }
224
225    /// Creates new reactor using provided [`Poll`] engine and a service exposing [`Handler`] API to
226    /// the reactor.
227    ///
228    /// Similar to the [`Reactor::new`], but allows to specify the name for the reactor thread.
229    /// Both poll engine and the service are sent to the newly created reactor thread which runs the
230    /// reactor [`Runtime`].
231    ///
232    /// # Error
233    ///
234    /// Errors with a system/OS error if it was impossible to spawn a thread.
235    pub fn named<H: Handler<Command = C>>(
236        service: H,
237        poller: P,
238        thread_name: String,
239    ) -> Result<Self, io::Error>
240    where
241        H: 'static,
242        P: 'static,
243        C: 'static + Send,
244    {
245        Reactor::with(service, poller, thread::Builder::new().name(thread_name))
246    }
247
248    /// Creates new reactor using provided [`Poll`] engine and a service exposing [`Handler`] API to
249    /// the reactor.
250    ///
251    /// Similar to the [`Reactor::new`], but allows to fully customize how the reactor thread is
252    /// constructed. Both poll engine and the service are sent to the newly created reactor
253    /// thread which runs the reactor [`Runtime`].
254    ///
255    /// # Error
256    ///
257    /// Errors with a system/OS error if it was impossible to spawn a thread.
258    pub fn with<H: Handler<Command = C>>(
259        service: H,
260        mut poller: P,
261        builder: thread::Builder,
262    ) -> Result<Self, io::Error>
263    where
264        H: 'static,
265        P: 'static,
266        C: 'static + Send,
267    {
268        let (ctl_send, ctl_recv) = chan::unbounded();
269
270        let (waker_writer, waker_reader) = P::Waker::pair()?;
271
272        let controller = Controller {
273            ctl_send,
274            waker: waker_writer,
275        };
276
277        #[cfg(feature = "log")]
278        log::debug!(target: "reactor-controller", "Initializing reactor thread...");
279
280        let runtime_controller = controller.clone();
281        let thread = builder.spawn(move || {
282            #[cfg(feature = "log")]
283            log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
284            poller.register_waker(&waker_reader);
285
286            let runtime = Runtime {
287                service,
288                poller,
289                controller: runtime_controller,
290                ctl_recv,
291                listeners: empty!(),
292                transports: empty!(),
293                waker: waker_reader,
294                timeouts: Timer::new(),
295            };
296
297            #[cfg(feature = "log")]
298            log::info!(target: "reactor", "Entering reactor event loop");
299
300            runtime.run();
301        })?;
302
303        // Waking up to consume actions which were provided by the service on launch
304        controller.wake()?;
305        Ok(Self { thread, controller })
306    }
307
308    /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service
309    /// running inside of its thread.
310    ///
311    /// See [`Handler::Command`] for the details.
312    pub fn controller(&self) -> Controller<C, <P::Waker as Waker>::Send> { self.controller.clone() }
313
314    /// Joins the reactor thread.
315    pub fn join(self) -> thread::Result<()> { self.thread.join() }
316}
317
318enum Ctl<C> {
319    Cmd(C),
320    Shutdown,
321}
322
323/// Control API to the service which is run inside a reactor.
324///
325/// The service is passed to the [`Reactor`] constructor as a parameter and also exposes [`Handler`]
326/// API to the reactor itself for receiving reactor-generated events. This API is used by the
327/// reactor to inform the service about incoming commands, sent via this [`Controller`] API (see
328/// [`Handler::Command`] for the details).
329pub struct Controller<C, W: WakerSend> {
330    ctl_send: chan::Sender<Ctl<C>>,
331    waker: W,
332}
333
334impl<C, W: WakerSend> Clone for Controller<C, W> {
335    fn clone(&self) -> Self {
336        Controller {
337            ctl_send: self.ctl_send.clone(),
338            waker: self.waker.clone(),
339        }
340    }
341}
342
343impl<C, W: WakerSend> Controller<C, W> {
344    /// Send a command to the service inside a [`Reactor`] or a reactor [`Runtime`].
345    #[allow(unused_mut)] // because of the `log` feature gate
346    pub fn cmd(&self, mut command: C) -> Result<(), io::Error>
347    where C: 'static {
348        #[cfg(feature = "log")]
349        {
350            use std::any::Any;
351
352            let cmd = Box::new(command);
353            let any = cmd as Box<dyn Any>;
354            let any = match any.downcast::<Box<dyn Debug>>() {
355                Err(any) => {
356                    log::debug!(target: "reactor-controller", "Sending command to the reactor");
357                    any
358                }
359                Ok(debug) => {
360                    log::debug!(target: "reactor-controller", "Sending command {debug:?} to the reactor");
361                    debug
362                }
363            };
364            command = *any.downcast().expect("from upcast");
365        }
366
367        self.ctl_send.send(Ctl::Cmd(command)).map_err(|_| io::ErrorKind::BrokenPipe)?;
368        self.wake()?;
369        Ok(())
370    }
371
372    /// Shutdown the reactor.
373    pub fn shutdown(self) -> Result<(), Self> {
374        #[cfg(feature = "log")]
375        log::info!(target: "reactor-controller", "Initiating reactor shutdown...");
376
377        let res1 = self.ctl_send.send(Ctl::Shutdown);
378        let res2 = self.wake();
379        res1.or(res2).map_err(|_| self)
380    }
381
382    fn wake(&self) -> io::Result<()> {
383        #[cfg(feature = "log")]
384        log::trace!(target: "reactor-controller", "Wakening the reactor");
385        self.waker.wake()
386    }
387}
388
389/// Internal [`Reactor`] runtime which is run in a dedicated thread.
390///
391/// Use this structure direactly only if you'd like to have the full control over the reactor
392/// thread.
393///
394/// This runtime structure **does not** spawns a thread and is **blocking**. It implements the
395/// actual reactor event loop.
396pub struct Runtime<H: Handler, P: Poll> {
397    service: H,
398    poller: P,
399    controller: Controller<H::Command, <P::Waker as Waker>::Send>,
400    ctl_recv: chan::Receiver<Ctl<H::Command>>,
401    listeners: HashMap<ResourceId, H::Listener>,
402    transports: HashMap<ResourceId, H::Transport>,
403    waker: <P::Waker as Waker>::Recv,
404    timeouts: Timer,
405}
406
407impl<H: Handler, P: Poll> Runtime<H, P> {
408    /// Creates new reactor runtime using provided [`Poll`] engine and a service exposing
409    /// [`Handler`] API to the reactor.
410    pub fn with(service: H, mut poller: P) -> io::Result<Self> {
411        let (ctl_send, ctl_recv) = chan::unbounded();
412
413        let (waker_writer, waker_reader) = P::Waker::pair()?;
414
415        #[cfg(feature = "log")]
416        log::debug!(target: "reactor", "Registering waker (fd {})", waker_reader.as_raw_fd());
417        poller.register_waker(&waker_reader);
418
419        let controller = Controller {
420            ctl_send,
421            waker: waker_writer,
422        };
423
424        Ok(Runtime {
425            service,
426            poller,
427            controller,
428            ctl_recv,
429            listeners: empty!(),
430            transports: empty!(),
431            waker: waker_reader,
432            timeouts: Timer::new(),
433        })
434    }
435
436    /// Provides a copy of a [`Controller`] object which exposes an API to the reactor and a service
437    /// running inside of its thread.
438    ///
439    /// See [`Handler::Command`] for the details.
440    pub fn controller(&self) -> Controller<H::Command, <P::Waker as Waker>::Send> {
441        self.controller.clone()
442    }
443
444    fn run(mut self) {
445        loop {
446            let before_poll = Timestamp::now();
447            let timeout = self.timeouts.next_expiring_from(before_poll).unwrap_or(WAIT_TIMEOUT);
448
449            for (id, res) in &self.listeners {
450                self.poller.set_interest(*id, res.interests());
451            }
452            for (id, res) in &self.transports {
453                self.poller.set_interest(*id, res.interests());
454            }
455
456            // Blocking
457            #[cfg(feature = "log")]
458            log::trace!(target: "reactor", "Polling with timeout {timeout:?}");
459
460            let res = self.poller.poll(Some(timeout));
461            let now = Timestamp::now();
462            self.service.tick(now);
463
464            // Nb. The way this is currently used basically ignores which keys have
465            // timed out. So as long as *something* timed out, we wake the service.
466            let timers_fired = self.timeouts.remove_expired_by(now);
467            if timers_fired > 0 {
468                #[cfg(feature = "log")]
469                log::trace!(target: "reactor", "Timer has fired");
470                self.service.handle_timer();
471            }
472
473            match res {
474                Ok(0) if timers_fired == 0 => {
475                    #[cfg(feature = "log")]
476                    log::trace!(target: "reactor", "Poll timeout; no I/O events had happened");
477                }
478                Err(err) => {
479                    #[cfg(feature = "log")]
480                    log::error!(target: "reactor", "Error during polling: {err}");
481                    self.service.handle_error(Error::Poll(err));
482                }
483                _ => {}
484            }
485
486            let awoken = self.handle_events(now);
487
488            // Process the commands only if we awaken by the waker
489            if awoken {
490                loop {
491                    match self.ctl_recv.try_recv() {
492                        Err(chan::TryRecvError::Empty) => break,
493                        Err(chan::TryRecvError::Disconnected) => {
494                            panic!("control channel is broken")
495                        }
496                        Ok(Ctl::Shutdown) => return self.handle_shutdown(),
497                        Ok(Ctl::Cmd(cmd)) => self.service.handle_command(cmd),
498                    }
499                }
500            }
501
502            self.handle_actions(now);
503        }
504    }
505
506    /// # Returns
507    ///
508    /// Whether it was awakened by a waker
509    fn handle_events(&mut self, time: Timestamp) -> bool {
510        let mut awoken = false;
511
512        while let Some((id, res)) = self.poller.next() {
513            if id == ResourceId::WAKER {
514                if let Err(err) = res {
515                    #[cfg(feature = "log")]
516                    log::error!(target: "reactor", "Polling waker has failed: {err}");
517                    panic!("waker failure: {err}");
518                };
519
520                #[cfg(feature = "log")]
521                log::trace!(target: "reactor", "Awoken by the controller");
522
523                self.waker.reset();
524                awoken = true;
525            } else if self.listeners.contains_key(&id) {
526                match res {
527                    Ok(io) => {
528                        #[cfg(feature = "log")]
529                        log::trace!(target: "reactor", "Got `{io}` event from listener {id}");
530
531                        let listener = self.listeners.get_mut(&id).expect("resource disappeared");
532                        for io in io {
533                            if let Some(event) = listener.handle_io(io) {
534                                self.service.handle_listener_event(id, event, time);
535                            }
536                        }
537                    }
538                    Err(err) => {
539                        #[cfg(feature = "log")]
540                        log::trace!(target: "reactor", "Listener {id} {err}");
541                        let listener =
542                            self.unregister_listener(id).expect("listener has disappeared");
543                        self.service.handle_error(Error::ListenerDisconnect(id, listener));
544                    }
545                }
546            } else if self.transports.contains_key(&id) {
547                match res {
548                    Ok(io) => {
549                        #[cfg(feature = "log")]
550                        log::trace!(target: "reactor", "Got `{io}` event from transport {id}");
551
552                        let transport = self.transports.get_mut(&id).expect("resource disappeared");
553                        for io in io {
554                            if let Some(event) = transport.handle_io(io) {
555                                self.service.handle_transport_event(id, event, time);
556                            }
557                        }
558                    }
559                    Err(err) => {
560                        #[cfg(feature = "log")]
561                        log::trace!(target: "reactor", "Transport {id} {err}");
562                        let transport =
563                            self.unregister_transport(id).expect("transport has disappeared");
564                        self.service.handle_error(Error::TransportDisconnect(id, transport));
565                    }
566                }
567            } else {
568                panic!(
569                    "file descriptor in reactor which is not a known waker, listener or transport"
570                )
571            }
572        }
573
574        awoken
575    }
576
577    fn handle_actions(&mut self, time: Timestamp) {
578        while let Some(action) = self.service.next() {
579            #[cfg(feature = "log")]
580            log::trace!(target: "reactor", "Handling action {action} from the service");
581
582            // NB: Deadlock may happen here if the service will generate events over and over
583            // in the handle_* calls we may never get out of this loop
584            if let Err(err) = self.handle_action(action, time) {
585                #[cfg(feature = "log")]
586                log::error!(target: "reactor", "Error: {err}");
587                self.service.handle_error(err);
588            }
589        }
590    }
591
592    /// # Safety
593    ///
594    /// Panics on `Action::Send` for read-only resources or resources which are not ready for a
595    /// write operation (i.e. returning `false` from [`WriteAtomic::is_ready_to_write`]
596    /// implementation.
597    fn handle_action(
598        &mut self,
599        action: Action<H::Listener, H::Transport>,
600        time: Timestamp,
601    ) -> Result<(), Error<H::Listener, H::Transport>> {
602        match action {
603            Action::RegisterListener(listener) => {
604                let fd = listener.as_raw_fd();
605
606                #[cfg(feature = "log")]
607                log::debug!(target: "reactor", "Registering listener with fd={fd}");
608
609                let id = self.poller.register(&listener, IoType::read_only());
610                self.listeners.insert(id, listener);
611                self.service.handle_registered(fd, id, ResourceType::Listener);
612            }
613            Action::RegisterTransport(transport) => {
614                let fd = transport.as_raw_fd();
615
616                #[cfg(feature = "log")]
617                log::debug!(target: "reactor", "Registering transport with fd={fd}");
618
619                let id = self.poller.register(&transport, IoType::read_only());
620                self.transports.insert(id, transport);
621                self.service.handle_registered(fd, id, ResourceType::Transport);
622            }
623            Action::UnregisterListener(id) => {
624                let Some(listener) = self.unregister_listener(id) else {
625                    return Ok(());
626                };
627                #[cfg(feature = "log")]
628                log::debug!(target: "reactor", "Handling over listener {id}");
629                self.service.handover_listener(id, listener);
630            }
631            Action::UnregisterTransport(id) => {
632                let Some(transport) = self.unregister_transport(id) else {
633                    return Ok(());
634                };
635                #[cfg(feature = "log")]
636                log::debug!(target: "reactor", "Handling over transport {id}");
637                self.service.handover_transport(id, transport);
638            }
639            Action::Send(id, data) => {
640                #[cfg(feature = "log")]
641                log::trace!(target: "reactor", "Sending {} bytes to {id}", data.len());
642
643                let Some(transport) = self.transports.get_mut(&id) else {
644                    #[cfg(feature = "log")]
645                    log::error!(target: "reactor", "Transport {id} is not in the reactor");
646
647                    return Ok(());
648                };
649                match transport.write_atomic(&data) {
650                    Err(WriteError::NotReady) => {
651                        #[cfg(feature = "log")]
652                        log::error!(target: "reactor", internal = true; 
653                                "An attempt to write to transport {id} before it got ready");
654                        panic!(
655                            "application business logic error: write to transport {id} which is \
656                             read-only or not ready for a write operation"
657                        );
658                    }
659                    Err(WriteError::Io(e)) => {
660                        #[cfg(feature = "log")]
661                        log::error!(target: "reactor", "Fatal error writing to transport {id}, disconnecting. Error details: {e:?}");
662                        if let Some(transport) = self.unregister_transport(id) {
663                            return Err(Error::TransportDisconnect(id, transport));
664                        }
665                    }
666                    Ok(_) => {}
667                }
668            }
669            Action::SetTimer(duration) => {
670                #[cfg(feature = "log")]
671                log::debug!(target: "reactor", "Adding timer {duration:?} from now");
672
673                self.timeouts.set_timeout(duration, time);
674            }
675        }
676        Ok(())
677    }
678
679    fn handle_shutdown(self) {
680        #[cfg(feature = "log")]
681        log::info!(target: "reactor", "Shutdown");
682
683        // We just drop here?
684    }
685
686    fn unregister_listener(&mut self, id: ResourceId) -> Option<H::Listener> {
687        let Some(listener) = self.listeners.remove(&id) else {
688            #[cfg(feature = "log")]
689            log::warn!(target: "reactor", "Unregistering non-registered listener {id}");
690            return None;
691        };
692
693        #[cfg(feature = "log")]
694        log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd());
695
696        self.poller.unregister(id);
697
698        Some(listener)
699    }
700
701    fn unregister_transport(&mut self, id: ResourceId) -> Option<H::Transport> {
702        let Some(transport) = self.transports.remove(&id) else {
703            #[cfg(feature = "log")]
704            log::warn!(target: "reactor", "Unregistering non-registered transport {id}");
705            return None;
706        };
707
708        #[cfg(feature = "log")]
709        log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd());
710
711        self.poller.unregister(id);
712
713        Some(transport)
714    }
715}
716
717#[cfg(test)]
718mod test {
719    use std::io::stdout;
720    use std::thread::sleep;
721
722    use super::*;
723    use crate::{poller, Io};
724
725    pub struct DumbRes(Box<dyn AsRawFd + Send>);
726    impl DumbRes {
727        pub fn new() -> DumbRes { DumbRes(Box::new(stdout())) }
728    }
729    impl AsRawFd for DumbRes {
730        fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() }
731    }
732    impl io::Write for DumbRes {
733        fn write(&mut self, buf: &[u8]) -> io::Result<usize> { Ok(buf.len()) }
734        fn flush(&mut self) -> io::Result<()> { Ok(()) }
735    }
736    impl WriteAtomic for DumbRes {
737        fn is_ready_to_write(&self) -> bool { true }
738        fn empty_write_buf(&mut self) -> io::Result<bool> { Ok(true) }
739        fn write_or_buf(&mut self, _buf: &[u8]) -> io::Result<()> { Ok(()) }
740    }
741    impl Resource for DumbRes {
742        type Event = ();
743        fn interests(&self) -> IoType { IoType::read_write() }
744        fn handle_io(&mut self, _io: Io) -> Option<Self::Event> { None }
745    }
746
747    #[test]
748    fn timer() {
749        #[derive(Clone, Eq, PartialEq, Debug)]
750        enum Cmd {
751            Init,
752            Expect(Vec<Event>),
753        }
754        #[derive(Clone, Eq, PartialEq, Debug)]
755        enum Event {
756            Timer,
757        }
758        #[derive(Clone, Debug, Default)]
759        struct DumbService {
760            pub add_resource: bool,
761            pub set_timer: bool,
762            pub log: Vec<Event>,
763        }
764        impl Iterator for DumbService {
765            type Item = Action<DumbRes, DumbRes>;
766            fn next(&mut self) -> Option<Self::Item> {
767                if self.add_resource {
768                    self.add_resource = false;
769                    Some(Action::RegisterTransport(DumbRes::new()))
770                } else if self.set_timer {
771                    self.set_timer = false;
772                    Some(Action::SetTimer(Duration::from_millis(3)))
773                } else {
774                    None
775                }
776            }
777        }
778        impl Handler for DumbService {
779            type Listener = DumbRes;
780            type Transport = DumbRes;
781            type Command = Cmd;
782
783            fn tick(&mut self, _time: Timestamp) {}
784            fn handle_timer(&mut self) {
785                self.log.push(Event::Timer);
786                self.set_timer = true;
787            }
788            fn handle_listener_event(
789                &mut self,
790                _d: ResourceId,
791                _event: <Self::Listener as Resource>::Event,
792                _time: Timestamp,
793            ) {
794                unreachable!()
795            }
796            fn handle_transport_event(
797                &mut self,
798                _id: ResourceId,
799                _event: <Self::Transport as Resource>::Event,
800                _time: Timestamp,
801            ) {
802                unreachable!()
803            }
804            fn handle_registered(&mut self, _fd: RawFd, _id: ResourceId, _ty: ResourceType) {}
805            fn handle_command(&mut self, cmd: Self::Command) {
806                match cmd {
807                    Cmd::Init => {
808                        self.add_resource = true;
809                        self.set_timer = true;
810                    }
811                    Cmd::Expect(expected) => {
812                        assert_eq!(expected, self.log);
813                    }
814                }
815            }
816            fn handle_error(&mut self, err: Error<Self::Listener, Self::Transport>) {
817                panic!("{err}")
818            }
819            fn handover_listener(&mut self, _id: ResourceId, _listener: Self::Listener) {
820                unreachable!()
821            }
822            fn handover_transport(&mut self, _id: ResourceId, _transport: Self::Transport) {
823                unreachable!()
824            }
825        }
826
827        let reactor = Reactor::new(DumbService::default(), poller::popol::Poller::new()).unwrap();
828        reactor.controller().cmd(Cmd::Init).unwrap();
829        sleep(Duration::from_secs(2));
830        reactor.controller().cmd(Cmd::Expect(vec![Event::Timer; 6])).unwrap();
831    }
832}