tokio/runtime/driver/
op.rs1use crate::io::blocking::Buf;
2use crate::io::uring::open::Open;
3use crate::io::uring::read::Read;
4use crate::io::uring::utils::ArcFd;
5use crate::io::uring::write::Write;
6
7use crate::runtime::Handle;
8
9use io_uring::cqueue;
10use io_uring::squeue::Entry;
11use std::future::Future;
12use std::io::{self, Error};
13use std::mem;
14use std::os::fd::OwnedFd;
15use std::pin::Pin;
16use std::task::{Context, Poll, Waker};
17
18#[allow(dead_code)]
21#[derive(Debug)]
22pub(crate) enum CancelData {
23 Open(Open),
24 Write(Write),
25 ReadVec(Read<Vec<u8>, OwnedFd>),
26 ReadBuf(Read<Buf, ArcFd>),
27}
28
29#[derive(Debug)]
30pub(crate) enum Lifecycle {
31 Submitted,
33
34 Waiting(Waker),
36
37 Cancelled(
40 #[allow(dead_code)] CancelData,
43 ),
44
45 Completed(io_uring::cqueue::Entry),
47}
48
49pub(crate) enum State {
50 Initialize(Option<Entry>),
51 Polled(usize),
52 Complete,
53}
54
55pub(crate) struct Op<T: Cancellable> {
56 handle: Handle,
58 state: State,
60 data: Option<T>,
62}
63
64impl<T: Cancellable> Op<T> {
65 pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
70 let handle = Handle::current();
71 Self {
72 handle,
73 data: Some(data),
74 state: State::Initialize(Some(entry)),
75 }
76 }
77 pub(crate) fn take_data(&mut self) -> Option<T> {
78 self.data.take()
79 }
80}
81
82impl<T: Cancellable> Drop for Op<T> {
83 fn drop(&mut self) {
84 match self.state {
85 State::Complete => (),
87 State::Polled(index) => {
89 let data = self.take_data();
90 let handle = &mut self.handle;
91 handle.inner.driver().io().cancel_op(index, data);
92 }
93 State::Initialize(_) => (),
96 }
97 }
98}
99
/// Outcome of a single completion queue entry.
pub(crate) struct CqeResult {
    /// `Ok` with the entry's result value when it is non-negative, otherwise
    /// `Err` with the OS error built from the negated value.
    pub(crate) result: io::Result<u32>,
}
104
105impl From<cqueue::Entry> for CqeResult {
106 fn from(cqe: cqueue::Entry) -> Self {
107 let res = cqe.result();
108 let result = if res >= 0 {
109 Ok(res as u32)
110 } else {
111 Err(io::Error::from_raw_os_error(-res))
112 };
113 CqeResult { result }
114 }
115}
116
117pub(crate) trait Completable {
119 type Output;
120 fn complete(self, cqe: CqeResult) -> Self::Output;
121
122 fn complete_with_error(self, error: Error) -> Self::Output;
127}
128
129pub(crate) trait Cancellable {
131 fn cancel(self) -> CancelData;
132}
133
134impl<T: Cancellable> Unpin for Op<T> {}
135
136impl<T: Cancellable + Completable + Send> Future for Op<T> {
137 type Output = T::Output;
138
139 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140 let this = self.get_mut();
141 let handle = &mut this.handle;
142 let driver = handle.inner.driver().io();
143
144 match &mut this.state {
145 State::Initialize(entry_opt) => {
146 let entry = entry_opt.take().expect("Entry must be present");
147 let waker = cx.waker().clone();
148
149 match unsafe { driver.register_op(entry, waker) } {
151 Ok(idx) => this.state = State::Polled(idx),
152 Err(err) => {
153 let data = this
154 .take_data()
155 .expect("Data must be present on Initialization");
156
157 this.state = State::Complete;
158
159 return Poll::Ready(data.complete_with_error(err));
160 }
161 };
162
163 Poll::Pending
164 }
165
166 State::Polled(idx) => {
167 let mut ctx = driver.get_uring().lock();
168 let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
169
170 match mem::replace(lifecycle, Lifecycle::Submitted) {
171 Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => {
173 let waker = cx.waker().clone();
174 *lifecycle = Lifecycle::Waiting(waker);
175 Poll::Pending
176 }
177
178 Lifecycle::Waiting(prev) => {
179 *lifecycle = Lifecycle::Waiting(prev);
180 Poll::Pending
181 }
182
183 Lifecycle::Completed(cqe) => {
184 ctx.remove_op(*idx);
186
187 this.state = State::Complete;
188
189 drop(ctx);
190
191 let data = this
192 .take_data()
193 .expect("Data must be present on completion");
194 Poll::Ready(data.complete(cqe.into()))
195 }
196
197 Lifecycle::Submitted => {
198 unreachable!("Submitted lifecycle should never be seen here");
199 }
200
201 Lifecycle::Cancelled(_) => {
202 unreachable!("Cancelled lifecycle should never be seen here");
203 }
204 }
205 }
206
207 State::Complete => {
208 panic!("Future polled after completion");
209 }
210 }
211 }
212}