seaslug/io/
completion.rs

1use std::{
2    future::Future,
3    io,
4    marker::PhantomData,
5    pin::Pin,
6    sync::{Arc, Condvar, Mutex},
7    task::{Context, Poll, Waker},
8};
9
10use super::{
11    io_uring::io_uring_cqe, FromCqe, Measure, Uring, M,
12};
13
14#[derive(Debug)]
15struct CompletionState {
16    done: bool,
17    item: Option<io::Result<io_uring_cqe>>,
18    waker: Option<Waker>,
19}
20
21impl Default for CompletionState {
22    fn default() -> CompletionState {
23        CompletionState {
24            done: false,
25            item: None,
26            waker: None,
27        }
28    }
29}
30
/// A Future value which may or may not be filled
///
/// The result can be obtained either by blocking with
/// `wait` or by polling this value as a `Future`.
/// Dropping a `Completion` blocks until the paired
/// `Filler` has been filled.
///
/// # Safety
///
/// To prevent undefined behavior in the form of
/// use-after-free, never allow a Completion's
/// lifetime to end without dropping it. This can
/// happen with `std::mem::forget`, cycles in
/// `Arc` or `Rc`, and in other ways.
#[derive(Debug)]
pub struct Completion<'a, C: FromCqe> {
    /// Ties the output type `C` and the lifetime `'a` to
    /// this struct without storing a `C`.
    lifetime: PhantomData<&'a C>,
    /// State shared with the paired `Filler`.
    mu: Arc<Mutex<CompletionState>>,
    /// Notified by the `Filler` after the state is marked
    /// done; used by the blocking wait path.
    cv: Arc<Condvar>,
    /// Ring asked to `ensure_submitted(sqe_id)` before
    /// waiting or polling.
    uring: &'a Uring,
    /// Submission id. Created as 0 by `pair`; the blocking
    /// wait path debug-asserts that it is non-zero.
    pub(crate) sqe_id: u64,
}
48
/// The completer side of the Future
///
/// Holds the same shared state and condition variable as
/// the paired `Completion`. It is consumed by `fill`,
/// which delivers the result.
#[derive(Debug)]
pub struct Filler {
    /// State shared with the paired `Completion`.
    mu: Arc<Mutex<CompletionState>>,
    /// Condition variable notified once the result has
    /// been stored.
    cv: Arc<Condvar>,
}
55
56/// Create a new `Filler` and the `Completion`
57/// that will be filled by its completion.
58pub fn pair<'a, C: FromCqe>(
59    uring: &'a Uring,
60) -> (Completion<'a, C>, Filler) {
61    let mu =
62        Arc::new(Mutex::new(CompletionState::default()));
63    let cv = Arc::new(Condvar::new());
64    let future = Completion {
65        lifetime: PhantomData,
66        mu: mu.clone(),
67        cv: cv.clone(),
68        sqe_id: 0,
69        uring,
70    };
71    let filler = Filler { mu, cv };
72
73    (future, filler)
74}
75
76impl<'a, C: FromCqe> Completion<'a, C> {
77    /// Block on the `Completion`'s completion
78    /// or dropping of the `Filler`
79    pub fn wait(self) -> io::Result<C>
80    where
81        C: FromCqe,
82    {
83        self.wait_inner().unwrap()
84    }
85
86    fn wait_inner(&self) -> Option<io::Result<C>>
87    where
88        C: FromCqe,
89    {
90        debug_assert_ne!(
91            self.sqe_id,
92            0,
93            "sqe_id was never filled-in for this Completion",
94        );
95
96        self.uring
97            .ensure_submitted(self.sqe_id)
98            .expect("failed to submit SQE from wait_inner");
99
100        let _ = Measure::new(&M.wait);
101
102        let mut inner = self.mu.lock().unwrap();
103
104        while !inner.done {
105            inner = self.cv.wait(inner).unwrap();
106        }
107
108        inner.item.take().map(|io_result| {
109            io_result.map(FromCqe::from_cqe)
110        })
111    }
112}
113
114impl<'a, C: FromCqe> Drop for Completion<'a, C> {
115    fn drop(&mut self) {
116        self.wait_inner();
117    }
118}
119
120impl<'a, C: FromCqe> Future for Completion<'a, C> {
121    type Output = io::Result<C>;
122
123    fn poll(
124        self: Pin<&mut Self>,
125        cx: &mut Context<'_>,
126    ) -> Poll<Self::Output> {
127        self.uring
128            .ensure_submitted(self.sqe_id)
129            .expect("failed to submit SQE from wait_inner");
130
131        let mut state = self.mu.lock().unwrap();
132        if state.item.is_some() {
133            Poll::Ready(
134                state
135                    .item
136                    .take()
137                    .unwrap()
138                    .map(FromCqe::from_cqe),
139            )
140        } else {
141            if !state.done {
142                state.waker = Some(cx.waker().clone());
143            }
144            Poll::Pending
145        }
146    }
147}
148
149impl Filler {
150    /// Complete the `Completion`
151    pub fn fill(self, inner: io::Result<io_uring_cqe>) {
152        let mut state = self.mu.lock().unwrap();
153
154        if let Some(waker) = state.waker.take() {
155            waker.wake();
156        }
157
158        state.item = Some(inner);
159        state.done = true;
160
161        self.cv.notify_all();
162    }
163}