ringbahn/
ring.rs

1use std::io;
2use std::mem;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures_core::ready;
7
8use crate::completion::Completion;
9use crate::drive::Completion as ExternalCompletion;
10use crate::drive::Drive;
11use crate::Cancellation;
12
13use State::*;
14
15/// A low-level primitive for building an IO object on io-uring
16/// 
17/// Ring is a state machine similar to `Submission`, but it is designed to cycle through multiple
18/// IO events submitted to io-uring, rather than representing a single submission. Because of this,
19/// it is more low level, but it is suitable fro building an IO object like a `File` on top of
20/// io-uring.
21///
22/// Users writing code on top of `Ring` are responsible for making sure that it is correct. For
23/// example, when calling `poll`, users must ensure that they are in the proper state to submit
24/// whatever type of IO they would be attempting to submit. Additionally, users should note that
25/// `Ring` does not implement `Drop`. In order to cancel any ongoing IO, users are responsible for
26/// implementing drop to call cancel properly.
27pub struct Ring<D: Drive> {
28    state: State,
29    completion: Option<Completion>,
30    driver: D,
31}
32
33
/// Where a `Ring` currently stands in its submission cycle.
#[derive(PartialEq, Eq, Debug)]
enum State {
    /// No event is in flight. This is the state of a freshly constructed ring, and the state
    /// the ring returns to once a completion has been observed.
    Inert = 0,
    /// The driver has accepted a prepared event, but a submit has not yet been observed.
    Prepared,
    /// The driver has reported the submit as done; only the completion is outstanding.
    Submitted,
    /// Entered while the prepare callback runs and replaced by `Prepared` afterwards. Polling a
    /// ring that is still in this state panics.
    Lost,
}
41
42impl<D: Default + Drive> Default for Ring<D> {
43    fn default() -> Ring<D> {
44        Ring::new(D::default())
45    }
46}
47
48impl<D: Drive + Clone> Clone for Ring<D> {
49    fn clone(&self) -> Ring<D> {
50        Ring::new(self.driver.clone())
51    }
52}
53
54impl<D: Drive> Ring<D> {
55    /// Construct a new Ring on top of a driver.
56    #[inline(always)]
57    pub fn new(driver: D) -> Ring<D> {
58        Ring {
59            state: Inert,
60            completion: None,
61            driver
62        }
63    }
64
65    pub fn driver(&self) -> &D {
66        &self.driver
67    }
68
69    /// Poll the ring state machine.
70    ///
71    /// This accepts a callback, `prepare`, which prepares an event to be submitted to io-uring.
72    /// This callback will only be called once during an iteration of ring's state machine: once an
73    /// event has been prepared, until it is completed or cancelled, a single ring instance will
74    /// not prepare any additional events.
75    #[inline]
76    pub fn poll(
77        mut self: Pin<&mut Self>,
78        ctx: &mut Context<'_>,
79        is_eager: bool,
80        prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>),
81    ) -> Poll<io::Result<usize>> {
82        match self.state {
83            Inert       => {
84                ready!(self.as_mut().poll_prepare(ctx, prepare));
85                ready!(self.as_mut().poll_submit(ctx, is_eager));
86                Poll::Pending
87            }
88            Prepared    => {
89                match self.as_mut().poll_complete(ctx) {
90                    ready @ Poll::Ready(..) => ready,
91                    Poll::Pending           => {
92                        ready!(self.poll_submit(ctx, is_eager));
93                        Poll::Pending
94                    }
95                }
96            }
97            Submitted   => self.poll_complete(ctx),
98            Lost        => panic!("Ring in a bad state; driver is faulty"),
99        }
100    }
101
102    #[inline(always)]
103    fn poll_prepare(
104        self: Pin<&mut Self>,
105        ctx: &mut Context<'_>,
106        prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>),
107    ) -> Poll<()> {
108        let (driver, state, completion_slot) = self.split();
109        let completion = ready!(driver.poll_prepare(ctx, |sqe, ctx| {
110            struct SubmissionCleaner<'a>(iou::SubmissionQueueEvent<'a>);
111
112            impl Drop for SubmissionCleaner<'_> {
113                fn drop(&mut self) {
114                    unsafe {
115                        self.0.prep_nop();
116                        self.0.set_user_data(0);
117                    }
118                }
119            }
120
121            let mut sqe = SubmissionCleaner(sqe);
122            *state = Lost;
123            prepare(&mut sqe.0);
124            let completion = Completion::new(ctx.waker().clone());
125            sqe.0.set_user_data(completion.addr());
126            mem::forget(sqe);
127            ExternalCompletion::new(completion, ctx)
128        }));
129        *state = Prepared;
130        *completion_slot = Some(completion.real);
131        Poll::Ready(())
132    }
133
134    #[inline(always)]
135    fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>, is_eager: bool) -> Poll<()> {
136        let (driver, state, _) = self.split();
137        // TODO figure out how to handle this result
138        let _ = ready!(driver.poll_submit(ctx, is_eager));
139        *state = Submitted;
140        Poll::Ready(())
141    }
142
143    #[inline(always)]
144    fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<usize>> {
145        let (_, state, completion_slot) = self.split();
146        match completion_slot.take().unwrap().check(ctx.waker()) {
147            Ok(result)      => {
148                *state = Inert;
149                Poll::Ready(result)
150            }
151            Err(completion) => {
152                *completion_slot = Some(completion);
153                Poll::Pending
154            }
155        }
156    }
157
158    /// Cancel any ongoing IO with this cancellation.
159    ///
160    /// Users are responsible for ensuring that the cancellation passed would be appropriate to
161    /// clean up the resources of the running event.
162    #[inline]
163    pub fn cancel(&mut self, cancellation: Cancellation) {
164        if let Some(completion) = self.completion.take() {
165            completion.cancel(cancellation);
166        }
167    }
168
169    /// Cancel any ongoing IO, but from a pinned reference.
170    ///
171    /// This has the same behavior of as Ring::cancel.
172    pub fn cancel_pinned(self: Pin<&mut Self>, cancellation: Cancellation) {
173        unsafe { Pin::get_unchecked_mut(self).cancel(cancellation) }
174    }
175
176    fn split(self: Pin<&mut Self>) -> (Pin<&mut D>, &mut State, &mut Option<Completion>) {
177        unsafe {
178            let this = Pin::get_unchecked_mut(self);
179            (Pin::new_unchecked(&mut this.driver), &mut this.state, &mut this.completion)
180        }
181    }
182}