1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::event_node::EventNode;
11use crate::oneshot_rt::OneshotId;
12use crate::reactor::{EventId, Reactor};
13use crate::runtime::Runtime;
14use crate::tracer::Tracer;
15
16const MODTRACE: bool = true;
18
19pub fn oneshot<'runtime, T, ReactorT: Reactor>(
30 rt: &'runtime Runtime<ReactorT>,
31) -> (
32 SenderOnce<'runtime, T, ReactorT>,
33 RecverOnce<'runtime, T, ReactorT>,
34) {
35 let oneshot_id = rt.oneshots().create();
36 (
37 SenderOnce::new(rt, oneshot_id),
38 RecverOnce::new(rt, oneshot_id),
39 )
40}
41
42#[derive(Debug)] pub struct RecvError;
46
47struct RuntimeOneshot<'runtime, ReactorT: Reactor> {
50 rt: &'runtime Runtime<ReactorT>,
51 oneshot_id: OneshotId,
52}
53
54impl<'runtime, ReactorT: Reactor> RuntimeOneshot<'runtime, ReactorT> {
55 fn new(rt: &'runtime Runtime<ReactorT>, oneshot_id: OneshotId) -> Self {
56 RuntimeOneshot { rt, oneshot_id }
57 }
58
59 fn reg_sender(&self, sender_event_id: EventId, pointer: *mut ()) {
60 self.rt
61 .oneshots()
62 .reg_sender(self.oneshot_id, sender_event_id, pointer);
63 }
64
65 fn reg_receiver(&self, receiver_event_id: EventId, pointer: *mut ()) {
66 self.rt
67 .oneshots()
68 .reg_receiver(self.oneshot_id, receiver_event_id, pointer);
69 }
70
71 unsafe fn exchange<T>(&self) -> bool {
72 self.rt.oneshots().exchange::<T>(self.oneshot_id)
73 }
74
75 fn cancel_sender(&self) {
76 self.rt.oneshots().cancel_sender(self.oneshot_id);
77 }
78
79 fn cancel_receiver(&self) {
80 self.rt.oneshots().cancel_receiver(self.oneshot_id);
81 }
82
83 fn oneshot_id(&self) -> OneshotId {
84 self.oneshot_id
85 }
86}
87
88pub struct SenderOnce<'runtime, T, ReactorT: Reactor> {
91 inner: SenderInner<'runtime, T, ReactorT>, }
93
94enum SenderInner<'runtime, T, ReactorT: Reactor> {
96 Created(RuntimeOneshot<'runtime, ReactorT>),
97 Sent(PhantomData<T>), }
99
100impl<'runtime, T, ReactorT: Reactor> SenderOnce<'runtime, T, ReactorT> {
101 fn new(rt: &'runtime Runtime<ReactorT>, oneshot_id: OneshotId) -> Self {
102 SenderOnce {
103 inner: SenderInner::Created(RuntimeOneshot::new(rt, oneshot_id)),
104 }
105 }
106
107 pub async fn send(&mut self, value: T) -> Result<(), T> {
110 let prev = std::mem::replace(&mut self.inner, SenderInner::Sent(PhantomData));
111
112 match prev {
114 SenderInner::Sent(_) => panic!(concat!(
115 "aiur: oneshot::SenderOnce::send() invoked twice.",
116 "Oneshot channel can be only used for one transfer."
117 )),
118
119 SenderInner::Created(ref rc) => SenderFuture::new(rc, value).await,
120 }
121 }
122}
123
124impl<'runtime, T, ReactorT: Reactor> Drop for SenderOnce<'runtime, T, ReactorT> {
125 fn drop(&mut self) {
126 if let SenderInner::Created(ref runtime_channel) = self.inner {
127 runtime_channel.cancel_sender();
128 }
129 }
130}
131
132#[derive(Debug)]
134enum PeerFutureState {
135 Created,
136 Exchanging,
137 Closed,
138}
139
140struct SenderFuture<'runtime, T, ReactorT: Reactor> {
142 runtime_channel: RuntimeOneshot<'runtime, ReactorT>,
143 event_node: EventNode,
144 data: Option<T>,
145 state: PeerFutureState,
146}
147
148impl<'runtime, T, ReactorT: Reactor> SenderFuture<'runtime, T, ReactorT> {
149 fn new(rc: &RuntimeOneshot<'runtime, ReactorT>, value: T) -> Self {
150 SenderFuture {
151 runtime_channel: RuntimeOneshot::new(rc.rt, rc.oneshot_id),
152 event_node: EventNode::new(),
153 data: Some(value),
154 state: PeerFutureState::Created,
155 }
156 }
157
158 fn set_state(&mut self, new_state: PeerFutureState) {
159 modtrace!(
160 self.tracer(),
161 "oneshot_sender_future: {:?} state {:?} -> {:?}",
162 self.runtime_channel.oneshot_id(),
163 self.state,
164 new_state
165 );
166 self.state = new_state;
167 }
168
169 fn transmit(&mut self, event_id: EventId) -> Poll<Result<(), T>> {
170 self.set_state(PeerFutureState::Exchanging);
171
172 self.runtime_channel
173 .reg_sender(event_id, (&mut self.data) as *mut Option<T> as *mut ());
174
175 Poll::Pending
176 }
177
178 fn close(&mut self) -> Poll<Result<(), T>> {
179 if !self.event_node.is_awoken_for(self.runtime_channel.rt) {
180 return Poll::Pending; }
182
183 self.set_state(PeerFutureState::Closed);
184
185 return if unsafe { self.runtime_channel.exchange::<T>() } {
186 Poll::Ready(Ok(()))
187 } else {
188 Poll::Ready(Err(self.data.take().unwrap()))
189 };
190 }
191
192 fn tracer(&self) -> &Tracer {
193 self.runtime_channel.rt.tracer()
194 }
195}
196
197impl<'runtime, T, ReactorT: Reactor> Future for SenderFuture<'runtime, T, ReactorT> {
198 type Output = Result<(), T>;
199
200 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
201 modtrace!(
202 self.tracer(),
203 "oneshot_sender_future: in the poll() {:?}",
204 self.runtime_channel.oneshot_id()
205 );
206
207 let this = unsafe { self.get_unchecked_mut() };
210
211 return match this.state {
212 PeerFutureState::Created => {
213 let event_id = unsafe { this.event_node.on_pin(ctx) };
214 this.transmit(event_id) }
216 PeerFutureState::Exchanging => this.close(),
217 PeerFutureState::Closed => {
218 panic!("aiur/oneshot_sender_future: was polled after completion.")
219 }
220 };
221 }
222}
223
224impl<'runtime, T, ReactorT: Reactor> Drop for SenderFuture<'runtime, T, ReactorT> {
225 fn drop(&mut self) {
226 modtrace!(self.tracer(), "oneshot_sender_future: drop()");
227 self.runtime_channel.cancel_sender();
228 let _ = self.event_node.on_cancel(); }
230}
231
232pub struct RecverOnce<'runtime, T, ReactorT: Reactor> {
243 runtime_channel: RuntimeOneshot<'runtime, ReactorT>,
244 event_node: EventNode,
245 state: PeerFutureState,
246 data: Option<T>,
247}
248
249impl<'runtime, T, ReactorT: Reactor> RecverOnce<'runtime, T, ReactorT> {
250 fn new(rt: &'runtime Runtime<ReactorT>, oneshot_id: OneshotId) -> Self {
251 RecverOnce {
252 runtime_channel: RuntimeOneshot::new(rt, oneshot_id),
253 event_node: EventNode::new(),
254 state: PeerFutureState::Created,
255 data: None,
256 }
257 }
258
259 fn set_state(&mut self, new_state: PeerFutureState) {
260 modtrace!(
261 self.tracer(),
262 "oneshot_recver_future: state {:?} -> {:?}",
263 self.state,
264 new_state
265 );
266 self.state = new_state;
267 }
268
269 fn transmit(&mut self, event_id: EventId) -> Poll<Result<T, RecvError>> {
270 self.set_state(PeerFutureState::Exchanging);
271 self.runtime_channel
272 .reg_receiver(event_id, (&mut self.data) as *mut Option<T> as *mut ());
273
274 Poll::Pending
275 }
276
277 fn close(&mut self) -> Poll<Result<T, RecvError>> {
278 if !self.event_node.is_awoken_for(self.runtime_channel.rt) {
279 return Poll::Pending;
280 }
281
282 self.set_state(PeerFutureState::Closed);
283 return if unsafe { self.runtime_channel.exchange::<T>() } {
284 Poll::Ready(Ok(self.data.take().unwrap()))
285 } else {
286 Poll::Ready(Err(RecvError))
287 };
288 }
289
290 fn tracer(&self) -> &Tracer {
291 self.runtime_channel.rt.tracer()
292 }
293}
294
295impl<'runtime, T, ReactorT: Reactor> Drop for RecverOnce<'runtime, T, ReactorT> {
296 fn drop(&mut self) {
297 modtrace!(self.tracer(), "oneshot_recver_future: in the drop()");
298 self.runtime_channel.cancel_receiver();
299 let _ = self.event_node.on_cancel(); }
301}
302
303impl<'runtime, T, ReactorT: Reactor> Future for RecverOnce<'runtime, T, ReactorT> {
304 type Output = Result<T, RecvError>;
305
306 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
307 modtrace!(self.tracer(), "oneshot_recver_future: in the poll()");
308
309 let this = unsafe { self.get_unchecked_mut() };
312
313 return match this.state {
314 PeerFutureState::Created => {
315 let event_id = unsafe { this.event_node.on_pin(ctx) };
316 this.transmit(event_id) }
318 PeerFutureState::Exchanging => this.close(),
319 PeerFutureState::Closed => {
320 panic!("aiur/oneshot_recver_future: was polled after completion.")
321 }
322 };
323 }
324}