compio_runtime/runtime/
future.rs1use std::{
4 future::Future,
5 marker::PhantomData,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use compio_buf::BufResult;
11use compio_driver::{Extra, Key, OpCode, PushEntry};
12use futures_util::future::FusedFuture;
13
14use crate::runtime::Runtime;
15
16trait ContextExt {
17 fn as_extra(&mut self, extra: impl FnOnce() -> Extra) -> Option<Extra>;
18}
19
#[cfg(feature = "future-combinator")]
impl ContextExt for Context<'_> {
    /// Looks for a `crate::future::Ext` in the context's extension data.
    ///
    /// When one is found, a fresh [`Extra`] is built with `extra`, handed to
    /// `Ext::set_extra` by mutable reference, and then returned. When the
    /// extension data is of any other type, `extra` is never called and
    /// `None` is returned.
    fn as_extra(&mut self, extra: impl FnOnce() -> Extra) -> Option<Extra> {
        self.ext()
            .downcast_mut::<crate::future::Ext>()
            .map(|ext| {
                let mut built = extra();
                ext.set_extra(&mut built);
                built
            })
    }
}
29
30#[cfg(not(feature = "future-combinator"))]
31impl ContextExt for Context<'_> {
32 fn as_extra(&mut self, extra: impl FnOnce() -> Extra) -> Option<Extra> {
33 let _ = extra;
34 None
35 }
36}
37
38pub struct Submit<T: OpCode, E = ()> {
46 runtime: Runtime,
47 state: Option<State<T, E>>,
48}
49
50enum State<T: OpCode, E> {
51 Idle { op: T },
52 Submitted { key: Key<T>, _p: PhantomData<E> },
53}
54
55impl<T: OpCode, E> State<T, E> {
56 fn submitted(key: Key<T>) -> Self {
57 State::Submitted {
58 key,
59 _p: PhantomData,
60 }
61 }
62}
63
64impl<T: OpCode> Submit<T, ()> {
65 pub(crate) fn new(runtime: Runtime, op: T) -> Self {
66 Submit {
67 runtime,
68 state: Some(State::Idle { op }),
69 }
70 }
71
72 pub fn with_extra(mut self) -> Submit<T, Extra> {
77 let runtime = self.runtime.clone();
78 let Some(state) = self.state.take() else {
79 return Submit {
80 runtime,
81 state: None,
82 };
83 };
84 let state = match state {
85 State::Submitted { key, .. } => State::Submitted {
86 key,
87 _p: PhantomData,
88 },
89 State::Idle { op } => State::Idle { op },
90 };
91 Submit {
92 runtime,
93 state: Some(state),
94 }
95 }
96}
97
98impl<T: OpCode + 'static> Future for Submit<T, ()> {
99 type Output = BufResult<usize, T>;
100
101 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102 let this = unsafe { self.get_unchecked_mut() };
103 loop {
104 match this.state.take().expect("Cannot poll after ready") {
105 State::Submitted { key, .. } => match this.runtime.poll_task(cx.waker(), key) {
106 PushEntry::Pending(key) => {
107 this.state = Some(State::submitted(key));
108 return Poll::Pending;
109 }
110 PushEntry::Ready(res) => return Poll::Ready(res),
111 },
112 State::Idle { op } => {
113 let extra = cx.as_extra(|| this.runtime.default_extra());
114 match this.runtime.submit_raw(op, extra) {
115 PushEntry::Pending(key) => this.state = Some(State::submitted(key)),
116 PushEntry::Ready(res) => {
117 return Poll::Ready(res);
118 }
119 }
120 }
121 }
122 }
123 }
124}
125
126impl<T: OpCode + 'static> Future for Submit<T, Extra> {
127 type Output = (BufResult<usize, T>, Extra);
128
129 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 let this = unsafe { self.get_unchecked_mut() };
131 loop {
132 match this.state.take().expect("Cannot poll after ready") {
133 State::Submitted { key, .. } => match this.runtime.poll_task_with_extra(cx, key) {
134 PushEntry::Pending(key) => {
135 this.state = Some(State::submitted(key));
136 return Poll::Pending;
137 }
138 PushEntry::Ready(res) => return Poll::Ready(res),
139 },
140 State::Idle { op } => {
141 let extra = cx.as_extra(|| this.runtime.default_extra());
142 match this.runtime.submit_raw(op, extra) {
143 PushEntry::Pending(key) => this.state = Some(State::submitted(key)),
144 PushEntry::Ready(res) => {
145 return Poll::Ready((res, this.runtime.default_extra()));
146 }
147 }
148 }
149 }
150 }
151 }
152}
153
154impl<T: OpCode, E> FusedFuture for Submit<T, E>
155where
156 Submit<T, E>: Future,
157{
158 fn is_terminated(&self) -> bool {
159 self.state.is_none()
160 }
161}
162
163impl<T: OpCode, E> Drop for Submit<T, E> {
164 fn drop(&mut self) {
165 if let Some(State::Submitted { key, .. }) = self.state.take() {
166 self.runtime.cancel(key);
167 }
168 }
169}