monoio/driver/
scheduled_io.rs

1use std::task::{Context, Poll, Waker};
2
3use super::ready::{Direction, Ready};
4
5pub(crate) struct ScheduledIo {
6    readiness: Ready,
7
8    /// Waker used for AsyncRead.
9    reader: Option<Waker>,
10    /// Waker used for AsyncWrite.
11    writer: Option<Waker>,
12}
13
14impl Default for ScheduledIo {
15    #[inline]
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl ScheduledIo {
22    pub(crate) const fn new() -> Self {
23        Self {
24            readiness: Ready::EMPTY,
25            reader: None,
26            writer: None,
27        }
28    }
29
30    #[allow(unused)]
31    #[inline]
32    pub(crate) fn set_writable(&mut self) {
33        self.readiness |= Ready::WRITABLE;
34    }
35
36    #[inline]
37    pub(crate) fn set_readiness(&mut self, f: impl Fn(Ready) -> Ready) {
38        self.readiness = f(self.readiness);
39    }
40
41    #[inline]
42    pub(crate) fn wake(&mut self, ready: Ready) {
43        if ready.is_readable() {
44            if let Some(waker) = self.reader.take() {
45                waker.wake();
46            }
47        }
48        if ready.is_writable() {
49            if let Some(waker) = self.writer.take() {
50                waker.wake();
51            }
52        }
53    }
54
55    #[inline]
56    pub(crate) fn clear_readiness(&mut self, ready: Ready) {
57        self.readiness = self.readiness - ready;
58    }
59
60    #[allow(clippy::needless_pass_by_ref_mut)]
61    #[inline]
62    pub(crate) fn poll_readiness(
63        &mut self,
64        cx: &mut Context<'_>,
65        direction: Direction,
66    ) -> Poll<Ready> {
67        let ready = direction.mask() & self.readiness;
68        if !ready.is_empty() {
69            return Poll::Ready(ready);
70        }
71        self.set_waker(cx, direction);
72        Poll::Pending
73    }
74
75    #[inline]
76    pub(crate) fn set_waker(&mut self, cx: &mut Context<'_>, direction: Direction) {
77        let slot = match direction {
78            Direction::Read => &mut self.reader,
79            Direction::Write => &mut self.writer,
80        };
81        match slot {
82            Some(existing) => {
83                if !existing.will_wake(cx.waker()) {
84                    existing.clone_from(cx.waker());
85                }
86            }
87            None => {
88                *slot = Some(cx.waker().clone());
89            }
90        }
91    }
92}