Skip to main content

compio_runtime/runtime/
future.rs

1//! Future for submitting operations to the runtime.
2
3use std::{
4    future::Future,
5    marker::PhantomData,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use compio_buf::BufResult;
11use compio_driver::{Extra, Key, OpCode, PushEntry};
12use futures_util::future::FusedFuture;
13
14use crate::runtime::Runtime;
15
16trait ContextExt {
17    fn as_extra(&mut self, extra: impl FnOnce() -> Extra) -> Option<Extra>;
18}
19
20#[cfg(feature = "future-combinator")]
21impl ContextExt for Context<'_> {
22    fn as_extra(&mut self, extra: impl FnOnce() -> Extra) -> Option<Extra> {
23        let ext = self.ext().downcast_mut::<crate::future::Ext>()?;
24        let mut extra = extra();
25        ext.set_extra(&mut extra);
26        Some(extra)
27    }
28}
29
30#[cfg(not(feature = "future-combinator"))]
31impl ContextExt for Context<'_> {
32    fn as_extra(&mut self, extra: impl FnOnce() -> Extra) -> Option<Extra> {
33        let _ = extra;
34        None
35    }
36}
37
38/// Return type for `Runtime::submit`
39///
40/// By default, this implements `Future<Output = BufResult<usize, T>>`. If
41/// [`Extra`] is needed, call [`.with_extra()`] to get a `Submit<T, Extra>`
42/// which implements `Future<Output = (BufResult<usize, T>, Extra)>`.
43///
44/// [`.with_extra()`]: Submit::with_extra
45pub struct Submit<T: OpCode, E = ()> {
46    runtime: Runtime,
47    state: Option<State<T, E>>,
48}
49
50enum State<T: OpCode, E> {
51    Idle { op: T },
52    Submitted { key: Key<T>, _p: PhantomData<E> },
53}
54
55impl<T: OpCode, E> State<T, E> {
56    fn submitted(key: Key<T>) -> Self {
57        State::Submitted {
58            key,
59            _p: PhantomData,
60        }
61    }
62}
63
64impl<T: OpCode> Submit<T, ()> {
65    pub(crate) fn new(runtime: Runtime, op: T) -> Self {
66        Submit {
67            runtime,
68            state: Some(State::Idle { op }),
69        }
70    }
71
72    /// Convert this future to one that returns [`Extra`] along with the result.
73    ///
74    /// This is useful if you need to access extra information provided by the
75    /// runtime upon completion of the operation.
76    pub fn with_extra(mut self) -> Submit<T, Extra> {
77        let runtime = self.runtime.clone();
78        let Some(state) = self.state.take() else {
79            return Submit {
80                runtime,
81                state: None,
82            };
83        };
84        let state = match state {
85            State::Submitted { key, .. } => State::Submitted {
86                key,
87                _p: PhantomData,
88            },
89            State::Idle { op } => State::Idle { op },
90        };
91        Submit {
92            runtime,
93            state: Some(state),
94        }
95    }
96}
97
98impl<T: OpCode + 'static> Future for Submit<T, ()> {
99    type Output = BufResult<usize, T>;
100
101    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102        let this = unsafe { self.get_unchecked_mut() };
103        loop {
104            match this.state.take().expect("Cannot poll after ready") {
105                State::Submitted { key, .. } => match this.runtime.poll_task(cx.waker(), key) {
106                    PushEntry::Pending(key) => {
107                        this.state = Some(State::submitted(key));
108                        return Poll::Pending;
109                    }
110                    PushEntry::Ready(res) => return Poll::Ready(res),
111                },
112                State::Idle { op } => {
113                    let extra = cx.as_extra(|| this.runtime.default_extra());
114                    match this.runtime.submit_raw(op, extra) {
115                        PushEntry::Pending(key) => this.state = Some(State::submitted(key)),
116                        PushEntry::Ready(res) => {
117                            return Poll::Ready(res);
118                        }
119                    }
120                }
121            }
122        }
123    }
124}
125
126impl<T: OpCode + 'static> Future for Submit<T, Extra> {
127    type Output = (BufResult<usize, T>, Extra);
128
129    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        let this = unsafe { self.get_unchecked_mut() };
131        loop {
132            match this.state.take().expect("Cannot poll after ready") {
133                State::Submitted { key, .. } => match this.runtime.poll_task_with_extra(cx, key) {
134                    PushEntry::Pending(key) => {
135                        this.state = Some(State::submitted(key));
136                        return Poll::Pending;
137                    }
138                    PushEntry::Ready(res) => return Poll::Ready(res),
139                },
140                State::Idle { op } => {
141                    let extra = cx.as_extra(|| this.runtime.default_extra());
142                    match this.runtime.submit_raw(op, extra) {
143                        PushEntry::Pending(key) => this.state = Some(State::submitted(key)),
144                        PushEntry::Ready(res) => {
145                            return Poll::Ready((res, this.runtime.default_extra()));
146                        }
147                    }
148                }
149            }
150        }
151    }
152}
153
154impl<T: OpCode, E> FusedFuture for Submit<T, E>
155where
156    Submit<T, E>: Future,
157{
158    fn is_terminated(&self) -> bool {
159        self.state.is_none()
160    }
161}
162
163impl<T: OpCode, E> Drop for Submit<T, E> {
164    fn drop(&mut self) {
165        if let Some(State::Submitted { key, .. }) = self.state.take() {
166            self.runtime.cancel(key);
167        }
168    }
169}