Skip to main content

tokio/runtime/driver/
op.rs

1use crate::io::blocking::Buf;
2use crate::io::uring::open::Open;
3use crate::io::uring::read::Read;
4use crate::io::uring::utils::ArcFd;
5use crate::io::uring::write::Write;
6
7use crate::runtime::Handle;
8
9use io_uring::cqueue;
10use io_uring::squeue::Entry;
11use std::future::Future;
12use std::io::{self, Error};
13use std::mem;
14use std::os::fd::OwnedFd;
15use std::pin::Pin;
16use std::task::{Context, Poll, Waker};
17
18// This field isn't accessed directly, but it holds cancellation data,
19// so `#[allow(dead_code)]` is needed.
20#[allow(dead_code)]
21#[derive(Debug)]
22pub(crate) enum CancelData {
23    Open(Open),
24    Write(Write),
25    ReadVec(Read<Vec<u8>, OwnedFd>),
26    ReadBuf(Read<Buf, ArcFd>),
27}
28
29#[derive(Debug)]
30pub(crate) enum Lifecycle {
31    /// The operation has been submitted to uring and is currently in-flight
32    Submitted,
33
34    /// The submitter is waiting for the completion of the operation
35    Waiting(Waker),
36
37    /// The submitter no longer has interest in the operation result. The state
38    /// must be passed to the driver and held until the operation completes.
39    Cancelled(
40        // This field isn't accessed directly, but it holds cancellation data,
41        // so `#[allow(dead_code)]` is needed.
42        #[allow(dead_code)] CancelData,
43    ),
44
45    /// The operation has completed with a single cqe result
46    Completed(io_uring::cqueue::Entry),
47}
48
49pub(crate) enum State {
50    Initialize(Option<Entry>),
51    Polled(usize),
52    Complete,
53}
54
55pub(crate) struct Op<T: Cancellable> {
56    // Handle to the runtime
57    handle: Handle,
58    // State of this Op
59    state: State,
60    // Per operation data.
61    data: Option<T>,
62}
63
64impl<T: Cancellable> Op<T> {
65    /// # Safety
66    ///
67    /// Callers must ensure that parameters of the entry (such as buffer) are valid and will
68    /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
69    pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
70        let handle = Handle::current();
71        Self {
72            handle,
73            data: Some(data),
74            state: State::Initialize(Some(entry)),
75        }
76    }
77    pub(crate) fn take_data(&mut self) -> Option<T> {
78        self.data.take()
79    }
80}
81
82impl<T: Cancellable> Drop for Op<T> {
83    fn drop(&mut self) {
84        match self.state {
85            // We've already dropped this Op.
86            State::Complete => (),
87            // We will cancel this Op.
88            State::Polled(index) => {
89                let data = self.take_data();
90                let handle = &mut self.handle;
91                handle.inner.driver().io().cancel_op(index, data);
92            }
93            // This Op has not been polled yet.
94            // We don't need to do anything here.
95            State::Initialize(_) => (),
96        }
97    }
98}
99
/// A single CQE result.
///
/// `result` is `Ok` with the non-negative value reported by the completion
/// entry, or `Err` with the corresponding OS error.
#[derive(Debug)]
pub(crate) struct CqeResult {
    pub(crate) result: io::Result<u32>,
}
104
105impl From<cqueue::Entry> for CqeResult {
106    fn from(cqe: cqueue::Entry) -> Self {
107        let res = cqe.result();
108        let result = if res >= 0 {
109            Ok(res as u32)
110        } else {
111            Err(io::Error::from_raw_os_error(-res))
112        };
113        CqeResult { result }
114    }
115}
116
117/// A trait that converts a CQE result into a usable value for each operation.
118pub(crate) trait Completable {
119    type Output;
120    fn complete(self, cqe: CqeResult) -> Self::Output;
121
122    // This is used when you want to terminate an operation with an error.
123    //
124    // The `Op` type that implements this trait can return the passed error
125    // upstream by embedding it in the `Output`.
126    fn complete_with_error(self, error: Error) -> Self::Output;
127}
128
129/// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation.
130pub(crate) trait Cancellable {
131    fn cancel(self) -> CancelData;
132}
133
134impl<T: Cancellable> Unpin for Op<T> {}
135
136impl<T: Cancellable + Completable + Send> Future for Op<T> {
137    type Output = T::Output;
138
139    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
140        let this = self.get_mut();
141        let handle = &mut this.handle;
142        let driver = handle.inner.driver().io();
143
144        match &mut this.state {
145            State::Initialize(entry_opt) => {
146                let entry = entry_opt.take().expect("Entry must be present");
147                let waker = cx.waker().clone();
148
149                // SAFETY: entry is valid for the entire duration of the operation
150                match unsafe { driver.register_op(entry, waker) } {
151                    Ok(idx) => this.state = State::Polled(idx),
152                    Err(err) => {
153                        let data = this
154                            .take_data()
155                            .expect("Data must be present on Initialization");
156
157                        this.state = State::Complete;
158
159                        return Poll::Ready(data.complete_with_error(err));
160                    }
161                };
162
163                Poll::Pending
164            }
165
166            State::Polled(idx) => {
167                let mut ctx = driver.get_uring().lock();
168                let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present");
169
170                match mem::replace(lifecycle, Lifecycle::Submitted) {
171                    // Only replace the stored waker if it wouldn't wake the new one
172                    Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => {
173                        let waker = cx.waker().clone();
174                        *lifecycle = Lifecycle::Waiting(waker);
175                        Poll::Pending
176                    }
177
178                    Lifecycle::Waiting(prev) => {
179                        *lifecycle = Lifecycle::Waiting(prev);
180                        Poll::Pending
181                    }
182
183                    Lifecycle::Completed(cqe) => {
184                        // Clean up and complete the future
185                        ctx.remove_op(*idx);
186
187                        this.state = State::Complete;
188
189                        drop(ctx);
190
191                        let data = this
192                            .take_data()
193                            .expect("Data must be present on completion");
194                        Poll::Ready(data.complete(cqe.into()))
195                    }
196
197                    Lifecycle::Submitted => {
198                        unreachable!("Submitted lifecycle should never be seen here");
199                    }
200
201                    Lifecycle::Cancelled(_) => {
202                        unreachable!("Cancelled lifecycle should never be seen here");
203                    }
204                }
205            }
206
207            State::Complete => {
208                panic!("Future polled after completion");
209            }
210        }
211    }
212}