Skip to main content

compio_runtime/future/
future.rs

1//! Future for submitting operations to the runtime.
2
3use std::{
4    future::Future,
5    marker::PhantomData,
6    pin::Pin,
7    task::{Context, Poll, Waker},
8};
9
10use compio_buf::BufResult;
11use compio_driver::{Extra, Key, OpCode, PushEntry};
12use futures_util::future::FusedFuture;
13
14use crate::{
15    CancelToken, Runtime,
16    waker::{get_ext, get_waker},
17};
18
19pub(crate) trait ContextExt {
20    /// Remove all wrapped [`ExtWaker`] and return the underlying waker.
21    ///
22    /// This is the same as calling [`Context::waker`] if the waker was never
23    /// wrapped.
24    fn get_waker(&self) -> &Waker;
25
26    /// Get the cancel token
27    fn get_cancel(&mut self) -> Option<&CancelToken>;
28
29    /// Set the ext data associated with the waker to an [`Extra`].
30    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra>;
31}
32
33impl ContextExt for Context<'_> {
34    fn get_waker(&self) -> &Waker {
35        get_waker(self.waker())
36    }
37
38    fn get_cancel(&mut self) -> Option<&CancelToken> {
39        get_ext(self.waker())?.get_cancel()
40    }
41
42    fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
43        let ext = get_ext(self.waker())?;
44        let mut extra = default();
45        ext.set_extra(&mut extra);
46        Some(extra)
47    }
48}
49
50pin_project_lite::pin_project! {
51    /// Returned [`Future`] for [`Runtime::submit`].
52    ///
53    /// When this is dropped and the operation hasn't finished yet, it will try to
54    /// cancel the operation.
55    ///
56    /// By default, this implements `Future<Output = BufResult<usize, T>>`. If
57    /// [`Extra`] is needed, call [`.with_extra()`] to get a `Submit<T, Extra>`
58    /// which implements `Future<Output = (BufResult<usize, T>, Extra)>`.
59    ///
60    /// [`.with_extra()`]: Submit::with_extra
61    pub struct Submit<T: OpCode, E = ()> {
62        runtime: Runtime,
63        state: Option<State<T, E>>,
64    }
65
66    impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
67        fn drop(this: Pin<&mut Self>) {
68            let this = this.project();
69            if let Some(State::Submitted { key, .. }) = this.state.take() {
70                this.runtime.cancel(key);
71            }
72        }
73    }
74
75}
76
77enum State<T: OpCode, E> {
78    Idle { op: T },
79    Submitted { key: Key<T>, _p: PhantomData<E> },
80}
81
82impl<T: OpCode, E> State<T, E> {
83    fn submitted(key: Key<T>) -> Self {
84        State::Submitted {
85            key,
86            _p: PhantomData,
87        }
88    }
89}
90
91impl<T: OpCode> Submit<T, ()> {
92    pub(crate) fn new(runtime: Runtime, op: T) -> Self {
93        Submit {
94            runtime,
95            state: Some(State::Idle { op }),
96        }
97    }
98
99    /// Convert this future to one that returns [`Extra`] along with the result.
100    ///
101    /// This is useful if you need to access extra information provided by the
102    /// runtime upon completion of the operation.
103    pub fn with_extra(mut self) -> Submit<T, Extra> {
104        let runtime = self.runtime.clone();
105        let Some(state) = self.state.take() else {
106            return Submit {
107                runtime,
108                state: None,
109            };
110        };
111        let state = match state {
112            State::Submitted { key, .. } => State::Submitted {
113                key,
114                _p: PhantomData,
115            },
116            State::Idle { op } => State::Idle { op },
117        };
118        Submit {
119            runtime,
120            state: Some(state),
121        }
122    }
123}
124
125impl<T: OpCode + 'static> Future for Submit<T, ()> {
126    type Output = BufResult<usize, T>;
127
128    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129        let this = self.project();
130
131        loop {
132            match this.state.take().expect("Cannot poll after ready") {
133                State::Submitted { key, .. } => match this.runtime.poll_task(cx.get_waker(), key) {
134                    PushEntry::Pending(key) => {
135                        *this.state = Some(State::submitted(key));
136                        return Poll::Pending;
137                    }
138                    PushEntry::Ready(res) => return Poll::Ready(res),
139                },
140                State::Idle { op } => {
141                    let extra = cx.as_extra(|| this.runtime.default_extra());
142                    match this.runtime.submit_raw(op, extra) {
143                        PushEntry::Pending(key) => {
144                            // TODO: Should we register it only the first time or every time it's
145                            // being polled?
146                            if let Some(cancel) = cx.get_cancel() {
147                                cancel.register(&key);
148                            };
149
150                            *this.state = Some(State::submitted(key))
151                        }
152                        PushEntry::Ready(res) => {
153                            return Poll::Ready(res);
154                        }
155                    }
156                }
157            }
158        }
159    }
160}
161
162impl<T: OpCode + 'static> Future for Submit<T, Extra> {
163    type Output = (BufResult<usize, T>, Extra);
164
165    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166        let this = self.project();
167
168        loop {
169            match this.state.take().expect("Cannot poll after ready") {
170                State::Submitted { key, .. } => {
171                    match this.runtime.poll_task_with_extra(cx.get_waker(), key) {
172                        PushEntry::Pending(key) => {
173                            *this.state = Some(State::submitted(key));
174                            return Poll::Pending;
175                        }
176                        PushEntry::Ready(res) => return Poll::Ready(res),
177                    }
178                }
179                State::Idle { op } => {
180                    let extra = cx.as_extra(|| this.runtime.default_extra());
181                    match this.runtime.submit_raw(op, extra) {
182                        PushEntry::Pending(key) => {
183                            if let Some(cancel) = cx.get_cancel() {
184                                cancel.register(&key);
185                            }
186
187                            *this.state = Some(State::submitted(key))
188                        }
189                        PushEntry::Ready(res) => {
190                            return Poll::Ready((res, this.runtime.default_extra()));
191                        }
192                    }
193                }
194            }
195        }
196    }
197}
198
199impl<T: OpCode, E> FusedFuture for Submit<T, E>
200where
201    Submit<T, E>: Future,
202{
203    fn is_terminated(&self) -> bool {
204        self.state.is_none()
205    }
206}