1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use std::future::Future;
use std::io;
use std::mem::{self, ManuallyDrop};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::ready;

use crate::{Event, Drive};
use crate::completion::Completion;
use crate::drive::Completion as ExternalCompletion;

/// A [`Future`] representing an event submitted to io-uring
///
/// Polling the future prepares the event through the driver, asks the
/// driver to submit it, and finally resolves to the event together with the
/// `io::Result<usize>` reported by its completion.
pub struct Submission<E: Event, D> {
    /// Where this submission currently is in its lifecycle; see [`State`].
    state: State,
    /// The event being run. It is wrapped in `ManuallyDrop` because it is
    /// moved out by hand: `try_complete` takes it when the result is ready,
    /// and `Drop` passes it to `Event::cancellation` when the submission is
    /// dropped in the `Prepared` or `Submitted` state.
    event: ManuallyDrop<E>,
    /// The driver that is polled to prepare and to submit the event.
    driver: D,
    /// Handle used to check for the result and to register the waker.
    /// Starts out as `Completion::dangling()` and is replaced by the
    /// completion obtained during preparation.
    completion: Completion,
}

/// Lifecycle of a `Submission`, as tracked by its `state` field.
#[derive(Copy, Clone, Eq, PartialEq)]
enum State {
    /// Initial state assigned by `Submission::new`; the event has not been
    /// prepared yet.
    Waiting,
    /// Set by `try_prepare` once the driver's `poll_prepare` returned ready
    /// and the completion has been stored in the submission.
    Prepared,
    /// Set by `try_submit` once the driver's `poll_submit` returned ready.
    Submitted,
    /// Set by `try_complete` when the result was available and the event was
    /// moved out; polling the future again in this state panics.
    Complete,
    /// Set inside `prepare` after the event has been prepared, and
    /// overwritten with `Prepared` when the driver hands the completion
    /// back. Polling the future in this state panics.
    Lost,
}

impl<E: Event, D: Drive> Submission<E, D> {
    /// Construct a new submission from an event and a driver.
    pub fn new(event: E, driver: D) -> Submission<E, D> {
        Submission {
            state: State::Waiting,
            event: ManuallyDrop::new(event),
            completion: Completion::dangling(),
            driver,
        }
    }

    #[inline(always)]
    unsafe fn try_prepare(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<()> {
        let this = Pin::get_unchecked_mut(self);
        let driver = Pin::new_unchecked(&mut this.driver);
        let event = &mut *this.event;
        let state = &mut this.state;
        let completion = ready!(driver.poll_prepare(ctx, |sqe, ctx| {
            prepare(sqe, ctx, event, state)
        }));
        *state = State::Prepared;
        this.completion = completion.real;
        Poll::Ready(())
    }

    #[inline(always)]
    unsafe fn try_submit(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<()> {
        let (event, driver) = self.as_mut().event_and_driver();
        // TODO figure out how to handle this result
        let _ = ready!(driver.poll_submit(ctx, event.is_eager()));
        Pin::get_unchecked_mut(self).state = State::Submitted;
        Poll::Pending
    }

    #[inline(always)]
    unsafe fn try_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<(E, io::Result<usize>)> {
        let this = Pin::get_unchecked_mut(self);
        if let Some(result) = this.completion.check() {
            this.state = State::Complete;
            this.completion.deallocate();
            let event = ManuallyDrop::take(&mut this.event);
            Poll::Ready((event, result))
        } else {
            this.completion.set_waker(ctx.waker().clone());
            Poll::Pending
        }
    }

    #[inline(always)]
    fn event_and_driver(self: Pin<&mut Self>) -> (&mut E, Pin<&mut D>) {
        unsafe {
            let this: &mut Submission<E, D> = Pin::get_unchecked_mut(self);
            (&mut this.event, Pin::new_unchecked(&mut this.driver))
        }
    }
}

impl<E, D> Future for Submission<E, D>
where
    E: Event,
    D: Drive,
{
    type Output = (E, io::Result<usize>);

    /// Advances the submission according to its current state.
    ///
    /// # Panics
    ///
    /// Panics when polled in the `Complete` state (the future has already
    /// returned its output) or in the `Lost` state (the driver did not hand
    /// the prepared completion back).
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let state = self.state;
        match state {
            // Polled again after the output was already returned.
            State::Complete => panic!("Submission future polled after completion finished"),

            // The event was prepared but its completion was never saved.
            // This only happens when the driver is incorrectly implemented:
            // it prepared a completion and then did not return it to us.
            State::Lost => panic!("Submission future in a bad state; driver is faulty"),

            // Already submitted: the only thing left is to look for the
            // result.
            State::Submitted => unsafe { self.try_complete(ctx) },

            // Prepared but not yet submitted by us. Check for a result
            // first, since the event may have been opportunistically
            // submitted elsewhere; if there is none, try to submit it.
            State::Prepared => {
                let outcome = unsafe { self.as_mut().try_complete(ctx) };
                if outcome.is_ready() {
                    return outcome;
                }
                // `try_submit` only ever reports pending, so the future is
                // pending whichever way it went.
                let _ = unsafe { self.as_mut().try_submit(ctx) };
                Poll::Pending
            }

            // Nothing has happened yet: prepare the event and, if that
            // succeeds, immediately try to submit it as well.
            State::Waiting => {
                ready!(unsafe { self.as_mut().try_prepare(ctx) });
                let _ = unsafe { self.as_mut().try_submit(ctx) };
                Poll::Pending
            }
        }
    }
}


impl<E: Event, D> Drop for Submission<E, D> {
    fn drop(&mut self) {
        if matches!(self.state, State::Prepared | State::Submitted) {
            unsafe {
                self.completion.cancel(Event::cancellation(&mut self.event));
            }
        }
    }
}

/// Prepares `sqe` for `event` and creates the completion that will receive
/// its result.
///
/// The event fills in the submission queue entry, `state` is set to
/// `State::Lost`, a new completion carrying the current task's waker is
/// created, and its address is stored as the entry's user data. The
/// completion is returned wrapped in an `ExternalCompletion`.
///
/// # Safety
///
/// Calls the unsafe `Event::prepare` on `event` and the unsafe
/// `Completion::addr` on the new completion.
#[inline(always)]
unsafe fn prepare<'cx, E: Event>(
    sqe: iou::SubmissionQueueEvent<'_>,
    ctx: &mut Context<'cx>,
    event: &mut E,
    state: &mut State,
) -> ExternalCompletion<'cx> {
    // Guard that owns the entry while it is being filled in. If anything
    // below panics, dropping the guard turns the entry back into a no-op
    // with no user data, so no half-prepared entry is left behind.
    struct ResetOnUnwind<'a> {
        sqe: iou::SubmissionQueueEvent<'a>,
    }

    impl Drop for ResetOnUnwind<'_> {
        fn drop(&mut self) {
            unsafe {
                self.sqe.prep_nop();
                self.sqe.set_user_data(0);
            }
        }
    }

    let mut guard = ResetOnUnwind { sqe };
    event.prepare(&mut guard.sqe);

    // NB: mark the submission as `Lost` until the driver has returned the
    // completion to the caller. If the driver breaks that contract, the
    // state stays `Lost` and later polls of the submission panic.
    *state = State::Lost;

    let waker = ctx.waker().clone();
    let completion = Completion::new(waker);
    let addr = completion.addr();
    guard.sqe.set_user_data(addr);

    // The entry is fully prepared; disarm the guard so it is not reset.
    mem::forget(guard);
    ExternalCompletion::new(completion, ctx)
}