broker_tokio/io/
registration.rs

1use crate::io::driver::{platform, Direction, Handle};
2use crate::util::slab::Address;
3
4use mio::{self, Evented};
5use std::io;
6use std::task::{Context, Poll};
7
8cfg_io_driver! {
9    /// Associates an I/O resource with the reactor instance that drives it.
10    ///
11    /// A registration represents an I/O resource registered with a Reactor such
12    /// that it will receive task notifications on readiness. This is the lowest
13    /// level API for integrating with a reactor.
14    ///
15    /// The association between an I/O resource is made by calling [`new`]. Once
16    /// the association is established, it remains established until the
17    /// registration instance is dropped.
18    ///
19    /// A registration instance represents two separate readiness streams. One
20    /// for the read readiness and one for write readiness. These streams are
21    /// independent and can be consumed from separate tasks.
22    ///
23    /// **Note**: while `Registration` is `Sync`, the caller must ensure that
24    /// there are at most two tasks that use a registration instance
25    /// concurrently. One task for [`poll_read_ready`] and one task for
26    /// [`poll_write_ready`]. While violating this requirement is "safe" from a
27    /// Rust memory safety point of view, it will result in unexpected behavior
28    /// in the form of lost notifications and tasks hanging.
29    ///
30    /// ## Platform-specific events
31    ///
32    /// `Registration` also allows receiving platform-specific `mio::Ready`
33    /// events. These events are included as part of the read readiness event
34    /// stream. The write readiness event stream is only for `Ready::writable()`
35    /// events.
36    ///
37    /// [`new`]: #method.new
38    /// [`poll_read_ready`]: #method.poll_read_ready`]
39    /// [`poll_write_ready`]: #method.poll_write_ready`]
40    #[derive(Debug)]
41    pub struct Registration {
42        handle: Handle,
43        address: Address,
44    }
45}
46
47// ===== impl Registration =====
48
49impl Registration {
50    /// Register the I/O resource with the default reactor.
51    ///
52    /// # Return
53    ///
54    /// - `Ok` if the registration happened successfully
55    /// - `Err` if an error was encountered during registration
56    ///
57    ///
58    /// # Panics
59    ///
60    /// This function panics if thread-local runtime is not set.
61    ///
62    /// The runtime is usually set implicitly when this function is called
63    /// from a future driven by a tokio runtime, otherwise runtime can be set
64    /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
65    pub fn new<T>(io: &T) -> io::Result<Registration>
66    where
67        T: Evented,
68    {
69        let handle = Handle::current();
70        let address = if let Some(inner) = handle.inner() {
71            inner.add_source(io)?
72        } else {
73            return Err(io::Error::new(
74                io::ErrorKind::Other,
75                "failed to find event loop",
76            ));
77        };
78
79        Ok(Registration { handle, address })
80    }
81
82    /// Deregister the I/O resource from the reactor it is associated with.
83    ///
84    /// This function must be called before the I/O resource associated with the
85    /// registration is dropped.
86    ///
87    /// Note that deregistering does not guarantee that the I/O resource can be
88    /// registered with a different reactor. Some I/O resource types can only be
89    /// associated with a single reactor instance for their lifetime.
90    ///
91    /// # Return
92    ///
93    /// If the deregistration was successful, `Ok` is returned. Any calls to
94    /// `Reactor::turn` that happen after a successful call to `deregister` will
95    /// no longer result in notifications getting sent for this registration.
96    ///
97    /// `Err` is returned if an error is encountered.
98    pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
99    where
100        T: Evented,
101    {
102        let inner = match self.handle.inner() {
103            Some(inner) => inner,
104            None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
105        };
106        inner.deregister_source(io)
107    }
108
109    /// Poll for events on the I/O resource's read readiness stream.
110    ///
111    /// If the I/O resource receives a new read readiness event since the last
112    /// call to `poll_read_ready`, it is returned. If it has not, the current
113    /// task is notified once a new event is received.
114    ///
115    /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
116    /// the function will always return `Ready(HUP)`. This should be treated as
117    /// the end of the readiness stream.
118    ///
119    /// Ensure that [`register`] has been called first.
120    ///
121    /// # Return value
122    ///
123    /// There are several possible return values:
124    ///
125    /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
126    ///   a new readiness event. The readiness value is included.
127    ///
128    /// * `Poll::Pending` means that no new readiness events have been received
129    ///   since the last call to `poll_read_ready`.
130    ///
131    /// * `Poll::Ready(Err(err))` means that the registration has encountered an
132    ///   error. This error either represents a permanent internal error **or**
133    ///   the fact that [`register`] was not called first.
134    ///
135    /// [`register`]: #method.register
136    /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
137    ///
138    /// # Panics
139    ///
140    /// This function will panic if called from outside of a task context.
141    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
142        let v = self.poll_ready(Direction::Read, Some(cx))?;
143        match v {
144            Some(v) => Poll::Ready(Ok(v)),
145            None => Poll::Pending,
146        }
147    }
148
149    /// Consume any pending read readiness event.
150    ///
151    /// This function is identical to [`poll_read_ready`] **except** that it
152    /// will not notify the current task when a new event is received. As such,
153    /// it is safe to call this function from outside of a task context.
154    ///
155    /// [`poll_read_ready`]: #method.poll_read_ready
156    pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
157        self.poll_ready(Direction::Read, None)
158    }
159
160    /// Poll for events on the I/O resource's write readiness stream.
161    ///
162    /// If the I/O resource receives a new write readiness event since the last
163    /// call to `poll_write_ready`, it is returned. If it has not, the current
164    /// task is notified once a new event is received.
165    ///
166    /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
167    /// the function will always return `Ready(HUP)`. This should be treated as
168    /// the end of the readiness stream.
169    ///
170    /// Ensure that [`register`] has been called first.
171    ///
172    /// # Return value
173    ///
174    /// There are several possible return values:
175    ///
176    /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
177    ///   a new readiness event. The readiness value is included.
178    ///
179    /// * `Poll::Pending` means that no new readiness events have been received
180    ///   since the last call to `poll_write_ready`.
181    ///
182    /// * `Poll::Ready(Err(err))` means that the registration has encountered an
183    ///   error. This error either represents a permanent internal error **or**
184    ///   the fact that [`register`] was not called first.
185    ///
186    /// [`register`]: #method.register
187    /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
188    ///
189    /// # Panics
190    ///
191    /// This function will panic if called from outside of a task context.
192    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
193        let v = self.poll_ready(Direction::Write, Some(cx))?;
194        match v {
195            Some(v) => Poll::Ready(Ok(v)),
196            None => Poll::Pending,
197        }
198    }
199
200    /// Consume any pending write readiness event.
201    ///
202    /// This function is identical to [`poll_write_ready`] **except** that it
203    /// will not notify the current task when a new event is received. As such,
204    /// it is safe to call this function from outside of a task context.
205    ///
206    /// [`poll_write_ready`]: #method.poll_write_ready
207    pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
208        self.poll_ready(Direction::Write, None)
209    }
210
211    /// Poll for events on the I/O resource's `direction` readiness stream.
212    ///
213    /// If called with a task context, notify the task when a new event is
214    /// received.
215    fn poll_ready(
216        &self,
217        direction: Direction,
218        cx: Option<&mut Context<'_>>,
219    ) -> io::Result<Option<mio::Ready>> {
220        let inner = match self.handle.inner() {
221            Some(inner) => inner,
222            None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
223        };
224
225        // If the task should be notified about new events, ensure that it has
226        // been registered
227        if let Some(ref cx) = cx {
228            inner.register(self.address, direction, cx.waker().clone())
229        }
230
231        let mask = direction.mask();
232        let mask_no_hup = (mask - platform::hup()).as_usize();
233
234        let sched = inner.io_dispatch.get(self.address).unwrap();
235
236        // This consumes the current readiness state **except** for HUP. HUP is
237        // excluded because a) it is a final state and never transitions out of
238        // HUP and b) both the read AND the write directions need to be able to
239        // observe this state.
240        //
241        // If HUP were to be cleared when `direction` is `Read`, then when
242        // `poll_ready` is called again with a _`direction` of `Write`, the HUP
243        // state would not be visible.
244        let curr_ready = sched
245            .set_readiness(self.address, |curr| curr & (!mask_no_hup))
246            .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
247
248        let mut ready = mask & mio::Ready::from_usize(curr_ready);
249
250        if ready.is_empty() {
251            if let Some(cx) = cx {
252                // Update the task info
253                match direction {
254                    Direction::Read => sched.reader.register_by_ref(cx.waker()),
255                    Direction::Write => sched.writer.register_by_ref(cx.waker()),
256                }
257
258                // Try again
259                let curr_ready = sched
260                    .set_readiness(self.address, |curr| curr & (!mask_no_hup))
261                    .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
262                ready = mask & mio::Ready::from_usize(curr_ready);
263            }
264        }
265
266        if ready.is_empty() {
267            Ok(None)
268        } else {
269            Ok(Some(ready))
270        }
271    }
272}
273
274unsafe impl Send for Registration {}
275unsafe impl Sync for Registration {}
276
277impl Drop for Registration {
278    fn drop(&mut self) {
279        let inner = match self.handle.inner() {
280            Some(inner) => inner,
281            None => return,
282        };
283        inner.drop_source(self.address);
284    }
285}