compio_runtime/future/
future.rs1use std::{
4 cell::RefCell,
5 future::Future,
6 marker::PhantomData,
7 pin::Pin,
8 rc::Rc,
9 task::{Context, Poll, Waker},
10};
11
12use compio_buf::BufResult;
13use compio_driver::{Extra, Key, OpCode, Proactor, PushEntry};
14use futures_util::future::FusedFuture;
15
16use crate::{
17 CancelToken,
18 future::{poll_task, poll_task_with_extra, submit_raw},
19 waker::{get_ext, get_waker},
20};
21
22pub(crate) trait ContextExt {
23 fn get_waker(&self) -> &Waker;
28
29 fn get_cancel(&mut self) -> Option<&CancelToken>;
31
32 fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra>;
34}
35
36impl ContextExt for Context<'_> {
37 fn get_waker(&self) -> &Waker {
38 get_waker(self.waker())
39 }
40
41 fn get_cancel(&mut self) -> Option<&CancelToken> {
42 get_ext(self.waker())?.get_cancel()
43 }
44
45 fn as_extra(&mut self, default: impl FnOnce() -> Extra) -> Option<Extra> {
46 let ext = get_ext(self.waker())?;
47 let mut extra = default();
48 ext.set_extra(&mut extra);
49 Some(extra)
50 }
51}
52
53pin_project_lite::pin_project! {
54 pub struct Submit<T: OpCode, E = ()> {
65 driver: Rc<RefCell<Proactor>>,
66 state: Option<State<T, E>>,
67 }
68
69 impl<T: OpCode, E> PinnedDrop for Submit<T, E> {
70 fn drop(this: Pin<&mut Self>) {
71 let this = this.project();
72 if let Some(State::Submitted { key, .. }) = this.state.take() {
73 this.driver.borrow_mut().cancel(key);
74 }
75 }
76 }
77}
78
79enum State<T: OpCode, E> {
80 Idle { op: T },
81 Submitted { key: Key<T>, _p: PhantomData<E> },
82}
83
84impl<T: OpCode, E> State<T, E> {
85 fn submitted(key: Key<T>) -> Self {
86 State::Submitted {
87 key,
88 _p: PhantomData,
89 }
90 }
91}
92
93impl<T: OpCode> Submit<T, ()> {
94 pub(crate) fn new(driver: Rc<RefCell<Proactor>>, op: T) -> Self {
95 Submit {
96 driver,
97 state: Some(State::Idle { op }),
98 }
99 }
100
101 pub fn with_extra(mut self) -> Submit<T, Extra> {
106 let driver = self.driver.clone();
107 let Some(state) = self.state.take() else {
108 return Submit {
109 driver,
110 state: None,
111 };
112 };
113 let state = match state {
114 State::Submitted { key, .. } => State::Submitted {
115 key,
116 _p: PhantomData,
117 },
118 State::Idle { op } => State::Idle { op },
119 };
120 Submit {
121 driver,
122 state: Some(state),
123 }
124 }
125}
126
127impl<T: OpCode + 'static> Future for Submit<T, ()> {
128 type Output = BufResult<usize, T>;
129
130 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131 let this = self.project();
132
133 loop {
134 match this.state.take().expect("Cannot poll after ready") {
135 State::Submitted { key, .. } => {
136 let entry = poll_task(&mut this.driver.borrow_mut(), cx.get_waker(), key);
137 match entry {
138 PushEntry::Pending(key) => {
139 *this.state = Some(State::submitted(key));
140 return Poll::Pending;
141 }
142 PushEntry::Ready(res) => return Poll::Ready(res),
143 }
144 }
145 State::Idle { op } => {
146 let extra = cx.as_extra(|| this.driver.borrow().default_extra());
147 let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
148 match entry {
149 PushEntry::Pending(key) => {
150 if let Some(cancel) = cx.get_cancel() {
153 cancel.register(&key);
154 };
155
156 *this.state = Some(State::submitted(key))
157 }
158 PushEntry::Ready(res) => {
159 return Poll::Ready(res);
160 }
161 }
162 }
163 }
164 }
165 }
166}
167
168impl<T: OpCode + 'static> Future for Submit<T, Extra> {
169 type Output = (BufResult<usize, T>, Extra);
170
171 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172 let this = self.project();
173
174 loop {
175 match this.state.take().expect("Cannot poll after ready") {
176 State::Submitted { key, .. } => {
177 let entry =
178 poll_task_with_extra(&mut this.driver.borrow_mut(), cx.get_waker(), key);
179 match entry {
180 PushEntry::Pending(key) => {
181 *this.state = Some(State::submitted(key));
182 return Poll::Pending;
183 }
184 PushEntry::Ready(res) => return Poll::Ready(res),
185 }
186 }
187 State::Idle { op } => {
188 let extra = cx.as_extra(|| this.driver.borrow().default_extra());
189 let entry = submit_raw(&mut this.driver.borrow_mut(), op, extra);
190 match entry {
191 PushEntry::Pending(key) => {
192 if let Some(cancel) = cx.get_cancel() {
193 cancel.register(&key);
194 }
195
196 *this.state = Some(State::submitted(key))
197 }
198 PushEntry::Ready(res) => {
199 return Poll::Ready((res, this.driver.borrow().default_extra()));
200 }
201 }
202 }
203 }
204 }
205 }
206}
207
208impl<T: OpCode, E> FusedFuture for Submit<T, E>
209where
210 Submit<T, E>: Future,
211{
212 fn is_terminated(&self) -> bool {
213 self.state.is_none()
214 }
215}