tokio/runtime/driver/
op.rs1use crate::io::uring::open::Open;
2use crate::io::uring::write::Write;
3use crate::runtime::Handle;
4use io_uring::cqueue;
5use io_uring::squeue::Entry;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::Context;
9use std::task::Poll;
10use std::task::Waker;
11use std::{io, mem};
12
13#[allow(dead_code)]
16#[derive(Debug)]
17pub(crate) enum CancelData {
18 Open(Open),
19 Write(Write),
20}
21
22#[derive(Debug)]
23pub(crate) enum Lifecycle {
24 Submitted,
26
27 Waiting(Waker),
29
30 Cancelled(
33 #[allow(dead_code)] CancelData,
36 ),
37
38 Completed(io_uring::cqueue::Entry),
40}
41
42pub(crate) enum State {
43 Initialize(Option<Entry>),
44 Polled(usize),
45 Complete,
46}
47
48pub(crate) struct Op<T: Cancellable> {
49 handle: Handle,
51 state: State,
53 data: Option<T>,
55}
56
57impl<T: Cancellable> Op<T> {
58 pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
63 let handle = Handle::current();
64 Self {
65 handle,
66 data: Some(data),
67 state: State::Initialize(Some(entry)),
68 }
69 }
70 pub(crate) fn take_data(&mut self) -> Option<T> {
71 self.data.take()
72 }
73}
74
75impl<T: Cancellable> Drop for Op<T> {
76 fn drop(&mut self) {
77 match self.state {
78 State::Complete => (),
80 State::Polled(index) => {
82 let data = self.take_data();
83 let handle = &mut self.handle;
84 handle.inner.driver().io().cancel_op(index, data);
85 }
86 State::Initialize(_) => (),
89 }
90 }
91}
92
/// Outcome of a completed operation, decoded from a completion queue entry.
///
/// Derives `Debug` so it can be logged and used in assertions, consistent
/// with the other driver types in this file.
#[derive(Debug)]
pub(crate) struct CqeResult {
    /// `Ok` with the non-negative result value, or `Err` built from the
    /// OS error code reported by the completion.
    pub(crate) result: io::Result<u32>,
}
97
98impl From<cqueue::Entry> for CqeResult {
99 fn from(cqe: cqueue::Entry) -> Self {
100 let res = cqe.result();
101 let result = if res >= 0 {
102 Ok(res as u32)
103 } else {
104 Err(io::Error::from_raw_os_error(-res))
105 };
106 CqeResult { result }
107 }
108}
109
110pub(crate) trait Completable {
112 type Output;
113 fn complete(self, cqe: CqeResult) -> io::Result<Self::Output>;
114}
115
116pub(crate) trait Cancellable {
118 fn cancel(self) -> CancelData;
119}
120
121impl<T: Cancellable> Unpin for Op<T> {}
122
123impl<T: Cancellable + Completable + Send> Future for Op<T> {
124 type Output = io::Result<T::Output>;
125
126 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127 let this = self.get_mut();
128 let handle = &mut this.handle;
129 let driver = handle.inner.driver().io();
130
131 match &mut this.state {
132 State::Initialize(entry_opt) => {
133 let entry = entry_opt.take().expect("Entry must be present");
134 let waker = cx.waker().clone();
135 let idx = unsafe { driver.register_op(entry, waker)? };
137 this.state = State::Polled(idx);
138 Poll::Pending
139 }
140
141 State::Polled(idx) => {
142 let mut ctx = driver.get_uring().lock();
143 let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
144
145 match mem::replace(lifecycle, Lifecycle::Submitted) {
146 Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => {
148 let waker = cx.waker().clone();
149 *lifecycle = Lifecycle::Waiting(waker);
150 Poll::Pending
151 }
152
153 Lifecycle::Waiting(prev) => {
154 *lifecycle = Lifecycle::Waiting(prev);
155 Poll::Pending
156 }
157
158 Lifecycle::Completed(cqe) => {
159 ctx.remove_op(*idx);
161
162 this.state = State::Complete;
163
164 drop(ctx);
165
166 let data = this
167 .take_data()
168 .expect("Data must be present on completion");
169 Poll::Ready(data.complete(cqe.into()))
170 }
171
172 Lifecycle::Submitted => {
173 unreachable!("Submitted lifecycle should never be seen here");
174 }
175
176 Lifecycle::Cancelled(_) => {
177 unreachable!("Cancelled lifecycle should never be seen here");
178 }
179 }
180 }
181
182 State::Complete => {
183 panic!("Future polled after completion");
184 }
185 }
186 }
187}