tokio/runtime/driver/
op.rs

1use crate::io::uring::open::Open;
2use crate::io::uring::write::Write;
3use crate::runtime::Handle;
4use io_uring::cqueue;
5use io_uring::squeue::Entry;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::Context;
9use std::task::Poll;
10use std::task::Waker;
11use std::{io, mem};
12
13// This field isn't accessed directly, but it holds cancellation data,
14// so `#[allow(dead_code)]` is needed.
15#[allow(dead_code)]
16#[derive(Debug)]
17pub(crate) enum CancelData {
18    Open(Open),
19    Write(Write),
20}
21
22#[derive(Debug)]
23pub(crate) enum Lifecycle {
24    /// The operation has been submitted to uring and is currently in-flight
25    Submitted,
26
27    /// The submitter is waiting for the completion of the operation
28    Waiting(Waker),
29
30    /// The submitter no longer has interest in the operation result. The state
31    /// must be passed to the driver and held until the operation completes.
32    Cancelled(
33        // This field isn't accessed directly, but it holds cancellation data,
34        // so `#[allow(dead_code)]` is needed.
35        #[allow(dead_code)] CancelData,
36    ),
37
38    /// The operation has completed with a single cqe result
39    Completed(io_uring::cqueue::Entry),
40}
41
42pub(crate) enum State {
43    Initialize(Option<Entry>),
44    Polled(usize),
45    Complete,
46}
47
48pub(crate) struct Op<T: Cancellable> {
49    // Handle to the runtime
50    handle: Handle,
51    // State of this Op
52    state: State,
53    // Per operation data.
54    data: Option<T>,
55}
56
57impl<T: Cancellable> Op<T> {
58    /// # Safety
59    ///
60    /// Callers must ensure that parameters of the entry (such as buffer) are valid and will
61    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
62    pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
63        let handle = Handle::current();
64        Self {
65            handle,
66            data: Some(data),
67            state: State::Initialize(Some(entry)),
68        }
69    }
70    pub(crate) fn take_data(&mut self) -> Option<T> {
71        self.data.take()
72    }
73}
74
75impl<T: Cancellable> Drop for Op<T> {
76    fn drop(&mut self) {
77        match self.state {
78            // We've already dropped this Op.
79            State::Complete => (),
80            // We will cancel this Op.
81            State::Polled(index) => {
82                let data = self.take_data();
83                let handle = &mut self.handle;
84                handle.inner.driver().io().cancel_op(index, data);
85            }
86            // This Op has not been polled yet.
87            // We don't need to do anything here.
88            State::Initialize(_) => (),
89        }
90    }
91}
92
/// The outcome carried by one completion queue entry (CQE).
pub(crate) struct CqeResult {
    /// `Ok` holds the non-negative result value of the entry; `Err` holds the
    /// OS error built from the negated result value.
    pub(crate) result: io::Result<u32>,
}
97
98impl From<cqueue::Entry> for CqeResult {
99    fn from(cqe: cqueue::Entry) -> Self {
100        let res = cqe.result();
101        let result = if res >= 0 {
102            Ok(res as u32)
103        } else {
104            Err(io::Error::from_raw_os_error(-res))
105        };
106        CqeResult { result }
107    }
108}
109
110/// A trait that converts a CQE result into a usable value for each operation.
111pub(crate) trait Completable {
112    type Output;
113    fn complete(self, cqe: CqeResult) -> io::Result<Self::Output>;
114}
115
116/// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation.
117pub(crate) trait Cancellable {
118    fn cancel(self) -> CancelData;
119}
120
121impl<T: Cancellable> Unpin for Op<T> {}
122
123impl<T: Cancellable + Completable + Send> Future for Op<T> {
124    type Output = io::Result<T::Output>;
125
126    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127        let this = self.get_mut();
128        let handle = &mut this.handle;
129        let driver = handle.inner.driver().io();
130
131        match &mut this.state {
132            State::Initialize(entry_opt) => {
133                let entry = entry_opt.take().expect("Entry must be present");
134                let waker = cx.waker().clone();
135                // SAFETY: entry is valid for the entire duration of the operation
136                let idx = unsafe { driver.register_op(entry, waker)? };
137                this.state = State::Polled(idx);
138                Poll::Pending
139            }
140
141            State::Polled(idx) => {
142                let mut ctx = driver.get_uring().lock();
143                let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
144
145                match mem::replace(lifecycle, Lifecycle::Submitted) {
146                    // Only replace the stored waker if it wouldn't wake the new one
147                    Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => {
148                        let waker = cx.waker().clone();
149                        *lifecycle = Lifecycle::Waiting(waker);
150                        Poll::Pending
151                    }
152
153                    Lifecycle::Waiting(prev) => {
154                        *lifecycle = Lifecycle::Waiting(prev);
155                        Poll::Pending
156                    }
157
158                    Lifecycle::Completed(cqe) => {
159                        // Clean up and complete the future
160                        ctx.remove_op(*idx);
161
162                        this.state = State::Complete;
163
164                        drop(ctx);
165
166                        let data = this
167                            .take_data()
168                            .expect("Data must be present on completion");
169                        Poll::Ready(data.complete(cqe.into()))
170                    }
171
172                    Lifecycle::Submitted => {
173                        unreachable!("Submitted lifecycle should never be seen here");
174                    }
175
176                    Lifecycle::Cancelled(_) => {
177                        unreachable!("Cancelled lifecycle should never be seen here");
178                    }
179                }
180            }
181
182            State::Complete => {
183                panic!("Future polled after completion");
184            }
185        }
186    }
187}