Skip to main content

compio_runtime/future/
future.rs

1//! Future for submitting operations to the runtime.
2
3use std::{
4    cell::RefCell,
5    future::Future,
6    marker::PhantomData,
7    pin::Pin,
8    rc::Rc,
9    task::{Context, Poll, Waker},
10};
11
12use compio_buf::BufResult;
13use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry};
14use futures_util::future::FusedFuture;
15
16use crate::{
17    CancelToken,
18    future::{poll_task, poll_task_with_extra, submit_raw},
19    waker::{get_ext, get_waker},
20};
21
22pub(crate) trait ContextExt {
23    /// Remove all wrapped [`ExtWaker`] and return the underlying waker.
24    ///
25    /// This is the same as calling [`Context::waker`] if the waker was never
26    /// wrapped.
27    fn get_waker(&self) -> &Waker;
28
29    /// Get the cancel token
30    fn get_cancel(&mut self) -> Option<&CancelToken>;
31
32    /// Set the ext data associated with the waker to an [`Extra`].
33    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra>;
34}
35
36impl ContextExt for Context<'_> {
37    fn get_waker(&self) -> &Waker {
38        get_waker(self.waker())
39    }
40
41    fn get_cancel(&mut self) -> Option<&CancelToken> {
42        get_ext(self.waker())?.get_cancel()
43    }
44
45    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
46        let ext = get_ext(self.waker())?;
47        let mut extra = default();
48        ext.set_extra(&mut extra);
49        Some(extra)
50    }
51}
52
53pin_project_lite::pin_project! {
54    /// Returned [`Future`] for [`Runtime::submit`].
55    ///
56    /// When this is dropped and the operation hasn't finished yet, it will try to
57    /// cancel the operation.
58    ///
59    /// By default, this implements `Future<Output = BufResult<usize, T>>`. If
60    /// [`Extra`] is needed, call [`.with_extra()`] to get a `Submit<T, Extra>`
61    /// which implements `Future<Output = (BufResult<usize, T>, Extra)>`.
62    ///
63    /// [`.with_extra()`]: Submit::with_extra
64    pub struct Submit<T: OpCode, E = ()> {
65        driver: Rc<RefCell<Proactor>>,
66        state: Option<State<T, E>>,
67    }
68
69    impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
70        fn drop(this: Pin<&mut Self>) {
71            let this = this.project();
72            if let Some(State::Submitted { key, .. }) = this.state.take() {
73                this.driver.borrow_mut().cancel(key);
74            }
75        }
76    }
77}
78
79enum State<T: OpCode, E> {
80    Idle { op: T },
81    Submitted { key: Key<T>, _p: PhantomData<E> },
82}
83
84impl<T: OpCode, E> State<T, E> {
85    fn submitted(key: Key<T>) -> Self {
86        State::Submitted {
87            key,
88            _p: PhantomData,
89        }
90    }
91}
92
93impl<T: OpCode> Submit<T, ()> {
94    pub(crate) fn new(driver: Rc<RefCell<Proactor>>, op: T) -> Self {
95        Submit {
96            driver,
97            state: Some(State::Idle { op }),
98        }
99    }
100
101    /// Convert this future to one that returns [`Extra`] along with the result.
102    ///
103    /// This is useful if you need to access extra information provided by the
104    /// runtime upon completion of the operation.
105    pub fn with_extra(mut self) -> Submit<T, Extra> {
106        let driver = self.driver.clone();
107        let Some(state) = self.state.take() else {
108            return Submit {
109                driver,
110                state: None,
111            };
112        };
113        let state = match state {
114            State::Submitted { key, .. } => State::Submitted {
115                key,
116                _p: PhantomData,
117            },
118            State::Idle { op } => State::Idle { op },
119        };
120        Submit {
121            driver,
122            state: Some(state),
123        }
124    }
125}
126
127impl<T: OpCode + 'static> Future for Submit<T, ()> {
128    type Output = BufResult<usize, T>;
129
130    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131        let this = self.project();
132
133        loop {
134            match this.state.take().expect("Cannot poll after ready") {
135                State::Submitted { key, .. } => {
136                    let entry = poll_task(&mut this.driver.borrow_mut(), cx.get_waker(), key);
137                    match entry {
138                        PushEntry::Pending(key) => {
139                            *this.state = Some(State::submitted(key));
140                            return Poll::Pending;
141                        }
142                        PushEntry::Ready(res) => return Poll::Ready(res),
143                    }
144                }
145                State::Idle { op } => {
146                    let extra = cx.as_extra(|| this.driver.borrow().default_extra());
147                    let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
148                    match entry {
149                        PushEntry::Pending(key) => {
150                            // TODO: Should we register it only the first time or every time it's
151                            // being polled?
152                            if let Some(cancel) = cx.get_cancel() {
153                                cancel.register(&key);
154                            };
155
156                            *this.state = Some(State::submitted(key))
157                        }
158                        PushEntry::Ready(res) => {
159                            return Poll::Ready(res);
160                        }
161                    }
162                }
163            }
164        }
165    }
166}
167
168impl<T: OpCode + 'static> Future for Submit<T, Extra> {
169    type Output = (BufResult<usize, T>, Extra);
170
171    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172        let this = self.project();
173
174        loop {
175            match this.state.take().expect("Cannot poll after ready") {
176                State::Submitted { key, .. } => {
177                    let entry =
178                        poll_task_with_extra(&mut this.driver.borrow_mut(), cx.get_waker(), key);
179                    match entry {
180                        PushEntry::Pending(key) => {
181                            *this.state = Some(State::submitted(key));
182                            return Poll::Pending;
183                        }
184                        PushEntry::Ready(res) => return Poll::Ready(res),
185                    }
186                }
187                State::Idle { op } => {
188                    let extra = cx.as_extra(|| this.driver.borrow().default_extra());
189                    let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
190                    match entry {
191                        PushEntry::Pending(key) => {
192                            if let Some(cancel) = cx.get_cancel() {
193                                cancel.register(&key);
194                            }
195
196                            *this.state = Some(State::submitted(key))
197                        }
198                        PushEntry::Ready(res) => {
199                            return Poll::Ready((res, this.driver.borrow().default_extra()));
200                        }
201                    }
202                }
203            }
204        }
205    }
206}
207
208impl<T: OpCode, E> FusedFuture for Submit<T, E>
209where
210    Submit<T, E>: Future,
211{
212    fn is_terminated(&self) -> bool {
213        self.state.is_none()
214    }
215}