1use std::io;
2use std::mem;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use futures_core::ready;
7
8use crate::completion::Completion;
9use crate::drive::Completion as ExternalCompletion;
10use crate::drive::Drive;
11use crate::Cancellation;
12
13use State::*;
14
15pub struct Ring<D: Drive> {
28 state: State,
29 completion: Option<Completion>,
30 driver: D,
31}
32
33
34#[derive(Debug, Eq, PartialEq)]
35enum State {
36 Inert = 0,
37 Prepared,
38 Submitted,
39 Lost,
40}
41
42impl<D: Default + Drive> Default for Ring<D> {
43 fn default() -> Ring<D> {
44 Ring::new(D::default())
45 }
46}
47
48impl<D: Drive + Clone> Clone for Ring<D> {
49 fn clone(&self) -> Ring<D> {
50 Ring::new(self.driver.clone())
51 }
52}
53
54impl<D: Drive> Ring<D> {
55 #[inline(always)]
57 pub fn new(driver: D) -> Ring<D> {
58 Ring {
59 state: Inert,
60 completion: None,
61 driver
62 }
63 }
64
65 pub fn driver(&self) -> &D {
66 &self.driver
67 }
68
69 #[inline]
76 pub fn poll(
77 mut self: Pin<&mut Self>,
78 ctx: &mut Context<'_>,
79 is_eager: bool,
80 prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>),
81 ) -> Poll<io::Result<usize>> {
82 match self.state {
83 Inert => {
84 ready!(self.as_mut().poll_prepare(ctx, prepare));
85 ready!(self.as_mut().poll_submit(ctx, is_eager));
86 Poll::Pending
87 }
88 Prepared => {
89 match self.as_mut().poll_complete(ctx) {
90 ready @ Poll::Ready(..) => ready,
91 Poll::Pending => {
92 ready!(self.poll_submit(ctx, is_eager));
93 Poll::Pending
94 }
95 }
96 }
97 Submitted => self.poll_complete(ctx),
98 Lost => panic!("Ring in a bad state; driver is faulty"),
99 }
100 }
101
102 #[inline(always)]
103 fn poll_prepare(
104 self: Pin<&mut Self>,
105 ctx: &mut Context<'_>,
106 prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>),
107 ) -> Poll<()> {
108 let (driver, state, completion_slot) = self.split();
109 let completion = ready!(driver.poll_prepare(ctx, |sqe, ctx| {
110 struct SubmissionCleaner<'a>(iou::SubmissionQueueEvent<'a>);
111
112 impl Drop for SubmissionCleaner<'_> {
113 fn drop(&mut self) {
114 unsafe {
115 self.0.prep_nop();
116 self.0.set_user_data(0);
117 }
118 }
119 }
120
121 let mut sqe = SubmissionCleaner(sqe);
122 *state = Lost;
123 prepare(&mut sqe.0);
124 let completion = Completion::new(ctx.waker().clone());
125 sqe.0.set_user_data(completion.addr());
126 mem::forget(sqe);
127 ExternalCompletion::new(completion, ctx)
128 }));
129 *state = Prepared;
130 *completion_slot = Some(completion.real);
131 Poll::Ready(())
132 }
133
134 #[inline(always)]
135 fn poll_submit(self: Pin<&mut Self>, ctx: &mut Context<'_>, is_eager: bool) -> Poll<()> {
136 let (driver, state, _) = self.split();
137 let _ = ready!(driver.poll_submit(ctx, is_eager));
139 *state = Submitted;
140 Poll::Ready(())
141 }
142
143 #[inline(always)]
144 fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<usize>> {
145 let (_, state, completion_slot) = self.split();
146 match completion_slot.take().unwrap().check(ctx.waker()) {
147 Ok(result) => {
148 *state = Inert;
149 Poll::Ready(result)
150 }
151 Err(completion) => {
152 *completion_slot = Some(completion);
153 Poll::Pending
154 }
155 }
156 }
157
158 #[inline]
163 pub fn cancel(&mut self, cancellation: Cancellation) {
164 if let Some(completion) = self.completion.take() {
165 completion.cancel(cancellation);
166 }
167 }
168
169 pub fn cancel_pinned(self: Pin<&mut Self>, cancellation: Cancellation) {
173 unsafe { Pin::get_unchecked_mut(self).cancel(cancellation) }
174 }
175
176 fn split(self: Pin<&mut Self>) -> (Pin<&mut D>, &mut State, &mut Option<Completion>) {
177 unsafe {
178 let this = Pin::get_unchecked_mut(self);
179 (Pin::new_unchecked(&mut this.driver), &mut this.state, &mut this.completion)
180 }
181 }
182}