// threader/reactor/background.rs

1use super::{Handle, IoWaker, Reactor};
2use mio::{Evented, PollOpt, Ready, Registration, SetReadiness};
3use std::{
4    io,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc,
8    },
9    thread::{self, JoinHandle},
10};
11
12/// A handle to a reactor running on a background thread. This
13/// can be used to ensure that the reactor is being constantly
14/// polled without blocking another thread.
15pub struct Background {
16    join_handle: Option<JoinHandle<io::Result<()>>>,
17    reactor_handle: Handle,
18    wakeup: SetReadiness,
19    shutdown: Arc<AtomicBool>,
20    _reg: Registration,
21}
22
23impl Background {
24    /// Creates a new background thread which polls the given
25    /// reactor. All errors returned during the creation of
26    /// the `ReactorThread` will be propagated.
27    pub fn new(mut reactor: Reactor) -> io::Result<Self> {
28        let (reg, wakeup) = Registration::new2();
29        let reactor_handle = reactor.handle();
30
31        reactor.register(&reg, Ready::readable(), PollOpt::edge())?;
32        let shutdown = Arc::new(AtomicBool::new(false));
33        let thread_shutdown = Arc::clone(&shutdown);
34
35        let join_handle = Some(thread::Builder::new().spawn(move || loop {
36            if thread_shutdown.load(Ordering::Relaxed) {
37                return Ok(());
38            } else {
39                reactor.poll(None)?;
40            }
41        })?);
42
43        Ok(Self {
44            join_handle,
45            reactor_handle,
46            wakeup,
47            shutdown,
48            _reg: reg,
49        })
50    }
51
52    /// Returns a handle to inner reactor.
53    pub fn handle(&self) -> Handle {
54        self.reactor_handle.clone()
55    }
56
57    /// Registers a new IO resource with this reactor.
58    pub fn register<E: Evented>(
59        &self,
60        resource: &E,
61        interest: Ready,
62        opts: PollOpt,
63    ) -> io::Result<Arc<IoWaker>> {
64        self.reactor_handle.register(resource, interest, opts)
65    }
66
67    pub fn reregister<E: Evented>(
68        &self,
69        resource: &E,
70        io_waker: &IoWaker,
71        interest: Ready,
72        opts: PollOpt,
73    ) -> io::Result<()> {
74        self.reactor_handle
75            .reregister(resource, io_waker, interest, opts)
76    }
77
78    /// Stops tracking notifications from the provided IO resource.
79    pub fn deregister<E: Evented>(&self, resource: &E, io_waker: &IoWaker) -> io::Result<()> {
80        self.reactor_handle.deregister(resource, io_waker)
81    }
82
83    /// Shuts down the thread where the reactor is being polled,
84    /// panicking if the reactor can not be woken up, and returning
85    /// any errors which happened in the thread where the reactor was
86    /// being polled.
87    pub fn shutdown_now(&mut self) -> io::Result<()> {
88        self.shutdown.store(true, Ordering::Relaxed);
89        self.wakeup
90            .set_readiness(Ready::readable())
91            .expect("could not set wakeup readiness");
92        self.join_handle
93            .take()
94            .unwrap()
95            .join()
96            .map_err(|_| io::Error::new(io::ErrorKind::Other, "reactor thread panicked"))?
97    }
98}
99
100impl Drop for Background {
101    fn drop(&mut self) {
102        let _ = self.shutdown_now();
103    }
104}