1use std::{
2 future::Future,
3 io,
4 marker::PhantomData,
5 pin::Pin,
6 sync::{Arc, Condvar, Mutex},
7 task::{Context, Poll, Waker},
8};
9
10use super::{
11 io_uring::io_uring_cqe, FromCqe, Measure, Uring, M,
12};
13
14#[derive(Debug)]
15struct CompletionState {
16 done: bool,
17 item: Option<io::Result<io_uring_cqe>>,
18 waker: Option<Waker>,
19}
20
21impl Default for CompletionState {
22 fn default() -> CompletionState {
23 CompletionState {
24 done: false,
25 item: None,
26 waker: None,
27 }
28 }
29}
30
31#[derive(Debug)]
41pub struct Completion<'a, C: FromCqe> {
42 lifetime: PhantomData<&'a C>,
43 mu: Arc<Mutex<CompletionState>>,
44 cv: Arc<Condvar>,
45 uring: &'a Uring,
46 pub(crate) sqe_id: u64,
47}
48
49#[derive(Debug)]
51pub struct Filler {
52 mu: Arc<Mutex<CompletionState>>,
53 cv: Arc<Condvar>,
54}
55
56pub fn pair<'a, C: FromCqe>(
59 uring: &'a Uring,
60) -> (Completion<'a, C>, Filler) {
61 let mu =
62 Arc::new(Mutex::new(CompletionState::default()));
63 let cv = Arc::new(Condvar::new());
64 let future = Completion {
65 lifetime: PhantomData,
66 mu: mu.clone(),
67 cv: cv.clone(),
68 sqe_id: 0,
69 uring,
70 };
71 let filler = Filler { mu, cv };
72
73 (future, filler)
74}
75
76impl<'a, C: FromCqe> Completion<'a, C> {
77 pub fn wait(self) -> io::Result<C>
80 where
81 C: FromCqe,
82 {
83 self.wait_inner().unwrap()
84 }
85
86 fn wait_inner(&self) -> Option<io::Result<C>>
87 where
88 C: FromCqe,
89 {
90 debug_assert_ne!(
91 self.sqe_id,
92 0,
93 "sqe_id was never filled-in for this Completion",
94 );
95
96 self.uring
97 .ensure_submitted(self.sqe_id)
98 .expect("failed to submit SQE from wait_inner");
99
100 let _ = Measure::new(&M.wait);
101
102 let mut inner = self.mu.lock().unwrap();
103
104 while !inner.done {
105 inner = self.cv.wait(inner).unwrap();
106 }
107
108 inner.item.take().map(|io_result| {
109 io_result.map(FromCqe::from_cqe)
110 })
111 }
112}
113
114impl<'a, C: FromCqe> Drop for Completion<'a, C> {
115 fn drop(&mut self) {
116 self.wait_inner();
117 }
118}
119
120impl<'a, C: FromCqe> Future for Completion<'a, C> {
121 type Output = io::Result<C>;
122
123 fn poll(
124 self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 ) -> Poll<Self::Output> {
127 self.uring
128 .ensure_submitted(self.sqe_id)
129 .expect("failed to submit SQE from wait_inner");
130
131 let mut state = self.mu.lock().unwrap();
132 if state.item.is_some() {
133 Poll::Ready(
134 state
135 .item
136 .take()
137 .unwrap()
138 .map(FromCqe::from_cqe),
139 )
140 } else {
141 if !state.done {
142 state.waker = Some(cx.waker().clone());
143 }
144 Poll::Pending
145 }
146 }
147}
148
149impl Filler {
150 pub fn fill(self, inner: io::Result<io_uring_cqe>) {
152 let mut state = self.mu.lock().unwrap();
153
154 if let Some(waker) = state.waker.take() {
155 waker.wake();
156 }
157
158 state.item = Some(inner);
159 state.done = true;
160
161 self.cv.notify_all();
162 }
163}