broker_tokio/io/registration.rs
1use crate::io::driver::{platform, Direction, Handle};
2use crate::util::slab::Address;
3
4use mio::{self, Evented};
5use std::io;
6use std::task::{Context, Poll};
7
8cfg_io_driver! {
9 /// Associates an I/O resource with the reactor instance that drives it.
10 ///
11 /// A registration represents an I/O resource registered with a Reactor such
12 /// that it will receive task notifications on readiness. This is the lowest
13 /// level API for integrating with a reactor.
14 ///
15 /// The association between an I/O resource is made by calling [`new`]. Once
16 /// the association is established, it remains established until the
17 /// registration instance is dropped.
18 ///
19 /// A registration instance represents two separate readiness streams. One
20 /// for the read readiness and one for write readiness. These streams are
21 /// independent and can be consumed from separate tasks.
22 ///
23 /// **Note**: while `Registration` is `Sync`, the caller must ensure that
24 /// there are at most two tasks that use a registration instance
25 /// concurrently. One task for [`poll_read_ready`] and one task for
26 /// [`poll_write_ready`]. While violating this requirement is "safe" from a
27 /// Rust memory safety point of view, it will result in unexpected behavior
28 /// in the form of lost notifications and tasks hanging.
29 ///
30 /// ## Platform-specific events
31 ///
32 /// `Registration` also allows receiving platform-specific `mio::Ready`
33 /// events. These events are included as part of the read readiness event
34 /// stream. The write readiness event stream is only for `Ready::writable()`
35 /// events.
36 ///
37 /// [`new`]: #method.new
38 /// [`poll_read_ready`]: #method.poll_read_ready`]
39 /// [`poll_write_ready`]: #method.poll_write_ready`]
40 #[derive(Debug)]
41 pub struct Registration {
42 handle: Handle,
43 address: Address,
44 }
45}
46
47// ===== impl Registration =====
48
49impl Registration {
50 /// Register the I/O resource with the default reactor.
51 ///
52 /// # Return
53 ///
54 /// - `Ok` if the registration happened successfully
55 /// - `Err` if an error was encountered during registration
56 ///
57 ///
58 /// # Panics
59 ///
60 /// This function panics if thread-local runtime is not set.
61 ///
62 /// The runtime is usually set implicitly when this function is called
63 /// from a future driven by a tokio runtime, otherwise runtime can be set
64 /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
65 pub fn new<T>(io: &T) -> io::Result<Registration>
66 where
67 T: Evented,
68 {
69 let handle = Handle::current();
70 let address = if let Some(inner) = handle.inner() {
71 inner.add_source(io)?
72 } else {
73 return Err(io::Error::new(
74 io::ErrorKind::Other,
75 "failed to find event loop",
76 ));
77 };
78
79 Ok(Registration { handle, address })
80 }
81
82 /// Deregister the I/O resource from the reactor it is associated with.
83 ///
84 /// This function must be called before the I/O resource associated with the
85 /// registration is dropped.
86 ///
87 /// Note that deregistering does not guarantee that the I/O resource can be
88 /// registered with a different reactor. Some I/O resource types can only be
89 /// associated with a single reactor instance for their lifetime.
90 ///
91 /// # Return
92 ///
93 /// If the deregistration was successful, `Ok` is returned. Any calls to
94 /// `Reactor::turn` that happen after a successful call to `deregister` will
95 /// no longer result in notifications getting sent for this registration.
96 ///
97 /// `Err` is returned if an error is encountered.
98 pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
99 where
100 T: Evented,
101 {
102 let inner = match self.handle.inner() {
103 Some(inner) => inner,
104 None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
105 };
106 inner.deregister_source(io)
107 }
108
109 /// Poll for events on the I/O resource's read readiness stream.
110 ///
111 /// If the I/O resource receives a new read readiness event since the last
112 /// call to `poll_read_ready`, it is returned. If it has not, the current
113 /// task is notified once a new event is received.
114 ///
115 /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
116 /// the function will always return `Ready(HUP)`. This should be treated as
117 /// the end of the readiness stream.
118 ///
119 /// Ensure that [`register`] has been called first.
120 ///
121 /// # Return value
122 ///
123 /// There are several possible return values:
124 ///
125 /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
126 /// a new readiness event. The readiness value is included.
127 ///
128 /// * `Poll::Pending` means that no new readiness events have been received
129 /// since the last call to `poll_read_ready`.
130 ///
131 /// * `Poll::Ready(Err(err))` means that the registration has encountered an
132 /// error. This error either represents a permanent internal error **or**
133 /// the fact that [`register`] was not called first.
134 ///
135 /// [`register`]: #method.register
136 /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
137 ///
138 /// # Panics
139 ///
140 /// This function will panic if called from outside of a task context.
141 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
142 let v = self.poll_ready(Direction::Read, Some(cx))?;
143 match v {
144 Some(v) => Poll::Ready(Ok(v)),
145 None => Poll::Pending,
146 }
147 }
148
149 /// Consume any pending read readiness event.
150 ///
151 /// This function is identical to [`poll_read_ready`] **except** that it
152 /// will not notify the current task when a new event is received. As such,
153 /// it is safe to call this function from outside of a task context.
154 ///
155 /// [`poll_read_ready`]: #method.poll_read_ready
156 pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
157 self.poll_ready(Direction::Read, None)
158 }
159
160 /// Poll for events on the I/O resource's write readiness stream.
161 ///
162 /// If the I/O resource receives a new write readiness event since the last
163 /// call to `poll_write_ready`, it is returned. If it has not, the current
164 /// task is notified once a new event is received.
165 ///
166 /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
167 /// the function will always return `Ready(HUP)`. This should be treated as
168 /// the end of the readiness stream.
169 ///
170 /// Ensure that [`register`] has been called first.
171 ///
172 /// # Return value
173 ///
174 /// There are several possible return values:
175 ///
176 /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
177 /// a new readiness event. The readiness value is included.
178 ///
179 /// * `Poll::Pending` means that no new readiness events have been received
180 /// since the last call to `poll_write_ready`.
181 ///
182 /// * `Poll::Ready(Err(err))` means that the registration has encountered an
183 /// error. This error either represents a permanent internal error **or**
184 /// the fact that [`register`] was not called first.
185 ///
186 /// [`register`]: #method.register
187 /// [edge-triggered]: https://docs.rs/mio/0.6/mio/struct.Poll.html#edge-triggered-and-level-triggered
188 ///
189 /// # Panics
190 ///
191 /// This function will panic if called from outside of a task context.
192 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
193 let v = self.poll_ready(Direction::Write, Some(cx))?;
194 match v {
195 Some(v) => Poll::Ready(Ok(v)),
196 None => Poll::Pending,
197 }
198 }
199
200 /// Consume any pending write readiness event.
201 ///
202 /// This function is identical to [`poll_write_ready`] **except** that it
203 /// will not notify the current task when a new event is received. As such,
204 /// it is safe to call this function from outside of a task context.
205 ///
206 /// [`poll_write_ready`]: #method.poll_write_ready
207 pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
208 self.poll_ready(Direction::Write, None)
209 }
210
211 /// Poll for events on the I/O resource's `direction` readiness stream.
212 ///
213 /// If called with a task context, notify the task when a new event is
214 /// received.
215 fn poll_ready(
216 &self,
217 direction: Direction,
218 cx: Option<&mut Context<'_>>,
219 ) -> io::Result<Option<mio::Ready>> {
220 let inner = match self.handle.inner() {
221 Some(inner) => inner,
222 None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
223 };
224
225 // If the task should be notified about new events, ensure that it has
226 // been registered
227 if let Some(ref cx) = cx {
228 inner.register(self.address, direction, cx.waker().clone())
229 }
230
231 let mask = direction.mask();
232 let mask_no_hup = (mask - platform::hup()).as_usize();
233
234 let sched = inner.io_dispatch.get(self.address).unwrap();
235
236 // This consumes the current readiness state **except** for HUP. HUP is
237 // excluded because a) it is a final state and never transitions out of
238 // HUP and b) both the read AND the write directions need to be able to
239 // observe this state.
240 //
241 // If HUP were to be cleared when `direction` is `Read`, then when
242 // `poll_ready` is called again with a _`direction` of `Write`, the HUP
243 // state would not be visible.
244 let curr_ready = sched
245 .set_readiness(self.address, |curr| curr & (!mask_no_hup))
246 .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
247
248 let mut ready = mask & mio::Ready::from_usize(curr_ready);
249
250 if ready.is_empty() {
251 if let Some(cx) = cx {
252 // Update the task info
253 match direction {
254 Direction::Read => sched.reader.register_by_ref(cx.waker()),
255 Direction::Write => sched.writer.register_by_ref(cx.waker()),
256 }
257
258 // Try again
259 let curr_ready = sched
260 .set_readiness(self.address, |curr| curr & (!mask_no_hup))
261 .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
262 ready = mask & mio::Ready::from_usize(curr_ready);
263 }
264 }
265
266 if ready.is_empty() {
267 Ok(None)
268 } else {
269 Ok(Some(ready))
270 }
271 }
272}
273
274unsafe impl Send for Registration {}
275unsafe impl Sync for Registration {}
276
277impl Drop for Registration {
278 fn drop(&mut self) {
279 let inner = match self.handle.inner() {
280 Some(inner) => inner,
281 None => return,
282 };
283 inner.drop_source(self.address);
284 }
285}