compio_runtime/future/
future.rs1use std::{
4 future::Future,
5 marker::PhantomData,
6 pin::Pin,
7 task::{Context, Poll, Waker},
8};
9
10use compio_buf::BufResult;
11use compio_driver::{Extra, Key, OpCode, PushEntry};
12use futures_util::future::FusedFuture;
13
14use crate::{
15 CancelToken, Runtime,
16 waker::{get_ext, get_waker},
17};
18
19pub(crate) trait ContextExt {
20 fn get_waker(&self) -> &Waker;
25
26 fn get_cancel(&mut self) -> Option<&CancelToken>;
28
29 fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra>;
31}
32
33impl ContextExt for Context<'_> {
34 fn get_waker(&self) -> &Waker {
35 get_waker(self.waker())
36 }
37
38 fn get_cancel(&mut self) -> Option<&CancelToken> {
39 get_ext(self.waker())?.get_cancel()
40 }
41
42 fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
43 let ext = get_ext(self.waker())?;
44 let mut extra = default();
45 ext.set_extra(&mut extra);
46 Some(extra)
47 }
48}
49
50pin_project_lite::pin_project! {
51 pub struct Submit<T: OpCode, E = ()> {
62 runtime: Runtime,
63 state: Option<State<T, E>>,
64 }
65
66 impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
67 fn drop(this: Pin<&mut Self>) {
68 let this = this.project();
69 if let Some(State::Submitted { key, .. }) = this.state.take() {
70 this.runtime.cancel(key);
71 }
72 }
73 }
74
75}
76
77enum State<T: OpCode, E> {
78 Idle { op: T },
79 Submitted { key: Key<T>, _p: PhantomData<E> },
80}
81
82impl<T: OpCode, E> State<T, E> {
83 fn submitted(key: Key<T>) -> Self {
84 State::Submitted {
85 key,
86 _p: PhantomData,
87 }
88 }
89}
90
91impl<T: OpCode> Submit<T, ()> {
92 pub(crate) fn new(runtime: Runtime, op: T) -> Self {
93 Submit {
94 runtime,
95 state: Some(State::Idle { op }),
96 }
97 }
98
99 pub fn with_extra(mut self) -> Submit<T, Extra> {
104 let runtime = self.runtime.clone();
105 let Some(state) = self.state.take() else {
106 return Submit {
107 runtime,
108 state: None,
109 };
110 };
111 let state = match state {
112 State::Submitted { key, .. } => State::Submitted {
113 key,
114 _p: PhantomData,
115 },
116 State::Idle { op } => State::Idle { op },
117 };
118 Submit {
119 runtime,
120 state: Some(state),
121 }
122 }
123}
124
125impl<T: OpCode + 'static> Future for Submit<T, ()> {
126 type Output = BufResult<usize, T>;
127
128 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129 let this = self.project();
130
131 loop {
132 match this.state.take().expect("Cannot poll after ready") {
133 State::Submitted { key, .. } => match this.runtime.poll_task(cx.get_waker(), key) {
134 PushEntry::Pending(key) => {
135 *this.state = Some(State::submitted(key));
136 return Poll::Pending;
137 }
138 PushEntry::Ready(res) => return Poll::Ready(res),
139 },
140 State::Idle { op } => {
141 let extra = cx.as_extra(|| this.runtime.default_extra());
142 match this.runtime.submit_raw(op, extra) {
143 PushEntry::Pending(key) => {
144 if let Some(cancel) = cx.get_cancel() {
147 cancel.register(&key);
148 };
149
150 *this.state = Some(State::submitted(key))
151 }
152 PushEntry::Ready(res) => {
153 return Poll::Ready(res);
154 }
155 }
156 }
157 }
158 }
159 }
160}
161
162impl<T: OpCode + 'static> Future for Submit<T, Extra> {
163 type Output = (BufResult<usize, T>, Extra);
164
165 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166 let this = self.project();
167
168 loop {
169 match this.state.take().expect("Cannot poll after ready") {
170 State::Submitted { key, .. } => {
171 match this.runtime.poll_task_with_extra(cx.get_waker(), key) {
172 PushEntry::Pending(key) => {
173 *this.state = Some(State::submitted(key));
174 return Poll::Pending;
175 }
176 PushEntry::Ready(res) => return Poll::Ready(res),
177 }
178 }
179 State::Idle { op } => {
180 let extra = cx.as_extra(|| this.runtime.default_extra());
181 match this.runtime.submit_raw(op, extra) {
182 PushEntry::Pending(key) => {
183 if let Some(cancel) = cx.get_cancel() {
184 cancel.register(&key);
185 }
186
187 *this.state = Some(State::submitted(key))
188 }
189 PushEntry::Ready(res) => {
190 return Poll::Ready((res, this.runtime.default_extra()));
191 }
192 }
193 }
194 }
195 }
196 }
197}
198
199impl<T: OpCode, E> FusedFuture for Submit<T, E>
200where
201 Submit<T, E>: Future,
202{
203 fn is_terminated(&self) -> bool {
204 self.state.is_none()
205 }
206}