aiur/
oneshot.rs

1//  \ O /
2//  / * \    aiur: the homeplanet for the famous executors
3// |' | '|   (c) 2020 - present, Vladimir Zvezda
4//   / \
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::event_node::EventNode;
11use crate::oneshot_rt::OneshotId;
12use crate::reactor::{EventId, Reactor};
13use crate::runtime::Runtime;
14use crate::tracer::Tracer;
15
16// enable/disable output of modtrace! macro
17const MODTRACE: bool = true;
18
19// -----------------------------------------------------------------------------------------------
20// Public oneshot() API
21
22/// Creates a new oneshot channel and returns the pair of (sender, receiver).
23///
24/// The created channel is bounded, whenever a sender sends a data it is suspended in await
25/// point until either receiver had the data received or oneshot channel got disconnected.
26///
27/// Neither sender nor receiver can be cloned, it is single use, single producer, single consumer
28/// communication channel.
29pub fn oneshot<'runtime, T, ReactorT: Reactor>(
30    rt: &'runtime Runtime<ReactorT>,
31) -> (
32    SenderOnce<'runtime, T, ReactorT>,
33    RecverOnce<'runtime, T, ReactorT>,
34) {
35    let oneshot_id = rt.oneshots().create();
36    (
37        SenderOnce::new(rt, oneshot_id),
38        RecverOnce::new(rt, oneshot_id),
39    )
40}
41
42/// Error type returned by Receiver: the only possible error is oneshot channel closed
43/// on sender's side.
44#[derive(Debug)] // Debug required for Result.unwrap()
45pub struct RecvError;
46
47// -----------------------------------------------------------------------------------------------
48// RuntimeOneshot: it is often used here: runtime and oneshot_id coupled together.
49struct RuntimeOneshot<'runtime, ReactorT: Reactor> {
50    rt: &'runtime Runtime<ReactorT>,
51    oneshot_id: OneshotId,
52}
53
54impl<'runtime, ReactorT: Reactor> RuntimeOneshot<'runtime, ReactorT> {
55    fn new(rt: &'runtime Runtime<ReactorT>, oneshot_id: OneshotId) -> Self {
56        RuntimeOneshot { rt, oneshot_id }
57    }
58
59    fn reg_sender(&self, sender_event_id: EventId, pointer: *mut ()) {
60        self.rt
61            .oneshots()
62            .reg_sender(self.oneshot_id, sender_event_id, pointer);
63    }
64
65    fn reg_receiver(&self, receiver_event_id: EventId, pointer: *mut ()) {
66        self.rt
67            .oneshots()
68            .reg_receiver(self.oneshot_id, receiver_event_id, pointer);
69    }
70
71    unsafe fn exchange<T>(&self) -> bool {
72        self.rt.oneshots().exchange::<T>(self.oneshot_id)
73    }
74
75    fn cancel_sender(&self) {
76        self.rt.oneshots().cancel_sender(self.oneshot_id);
77    }
78
79    fn cancel_receiver(&self) {
80        self.rt.oneshots().cancel_receiver(self.oneshot_id);
81    }
82
83    fn oneshot_id(&self) -> OneshotId {
84        self.oneshot_id
85    }
86}
87
88// -----------------------------------------------------------------------------------------------
89/// The sending half of the oneshot channel created by [oneshot()] function.
90pub struct SenderOnce<'runtime, T, ReactorT: Reactor> {
91    inner: SenderInner<'runtime, T, ReactorT>, // use inner to hide enum internals
92}
93
94// Possible state of the SenderOnce: before and after .send() is invoked
95enum SenderInner<'runtime, T, ReactorT: Reactor> {
96    Created(RuntimeOneshot<'runtime, ReactorT>),
97    Sent(PhantomData<T>), // Type required for Future
98}
99
100impl<'runtime, T, ReactorT: Reactor> SenderOnce<'runtime, T, ReactorT> {
101    fn new(rt: &'runtime Runtime<ReactorT>, oneshot_id: OneshotId) -> Self {
102        SenderOnce {
103            inner: SenderInner::Created(RuntimeOneshot::new(rt, oneshot_id)),
104        }
105    }
106
107    /// Sends value to the receiver side of the channel. If receiver end is already closed,
108    /// the original value returned as error in result.
109    pub async fn send(&mut self, value: T) -> Result<(), T> {
110        let prev = std::mem::replace(&mut self.inner, SenderInner::Sent(PhantomData));
111
112        // TODO: perhaps send should receive (self,..) instead of (&mut self)?
113        match prev {
114            SenderInner::Sent(_) => panic!(concat!(
115                "aiur: oneshot::SenderOnce::send() invoked twice.",
116                "Oneshot channel can be only used for one transfer."
117            )),
118
119            SenderInner::Created(ref rc) => SenderFuture::new(rc, value).await,
120        }
121    }
122}
123
124impl<'runtime, T, ReactorT: Reactor> Drop for SenderOnce<'runtime, T, ReactorT> {
125    fn drop(&mut self) {
126        if let SenderInner::Created(ref runtime_channel) = self.inner {
127            runtime_channel.cancel_sender();
128        }
129    }
130}
131
132// -----------------------------------------------------------------------------------------------
133#[derive(Debug)]
134enum PeerFutureState {
135    Created,
136    Exchanging,
137    Closed,
138}
139
140// -----------------------------------------------------------------------------------------------
141struct SenderFuture<'runtime, T, ReactorT: Reactor> {
142    runtime_channel: RuntimeOneshot<'runtime, ReactorT>,
143    event_node: EventNode,
144    data: Option<T>,
145    state: PeerFutureState,
146}
147
148impl<'runtime, T, ReactorT: Reactor> SenderFuture<'runtime, T, ReactorT> {
149    fn new(rc: &RuntimeOneshot<'runtime, ReactorT>, value: T) -> Self {
150        SenderFuture {
151            runtime_channel: RuntimeOneshot::new(rc.rt, rc.oneshot_id),
152            event_node: EventNode::new(),
153            data: Some(value),
154            state: PeerFutureState::Created,
155        }
156    }
157
158    fn set_state(&mut self, new_state: PeerFutureState) {
159        modtrace!(
160            self.tracer(),
161            "oneshot_sender_future: {:?} state {:?} -> {:?}",
162            self.runtime_channel.oneshot_id(),
163            self.state,
164            new_state
165        );
166        self.state = new_state;
167    }
168
169    fn transmit(&mut self, event_id: EventId) -> Poll<Result<(), T>> {
170        self.set_state(PeerFutureState::Exchanging);
171
172        self.runtime_channel
173            .reg_sender(event_id, (&mut self.data) as *mut Option<T> as *mut ());
174
175        Poll::Pending
176    }
177
178    fn close(&mut self) -> Poll<Result<(), T>> {
179        if !self.event_node.is_awoken_for(self.runtime_channel.rt) {
180            return Poll::Pending; // not our event, ignore the poll
181        }
182
183        self.set_state(PeerFutureState::Closed);
184
185        return if unsafe { self.runtime_channel.exchange::<T>() } {
186            Poll::Ready(Ok(()))
187        } else {
188            Poll::Ready(Err(self.data.take().unwrap()))
189        };
190    }
191
192    fn tracer(&self) -> &Tracer {
193        self.runtime_channel.rt.tracer()
194    }
195}
196
197impl<'runtime, T, ReactorT: Reactor> Future for SenderFuture<'runtime, T, ReactorT> {
198    type Output = Result<(), T>;
199
200    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
201        modtrace!(
202            self.tracer(),
203            "oneshot_sender_future: in the poll() {:?}",
204            self.runtime_channel.oneshot_id()
205        );
206
207        // Unsafe usage: this function does not moves out data from self, as required by
208        // Pin::map_unchecked_mut().
209        let this = unsafe { self.get_unchecked_mut() };
210
211        return match this.state {
212            PeerFutureState::Created => {
213                let event_id = unsafe { this.event_node.on_pin(ctx) };
214                this.transmit(event_id) // always returns Pending
215            }
216            PeerFutureState::Exchanging => this.close(),
217            PeerFutureState::Closed => {
218                panic!("aiur/oneshot_sender_future: was polled after completion.")
219            }
220        };
221    }
222}
223
224impl<'runtime, T, ReactorT: Reactor> Drop for SenderFuture<'runtime, T, ReactorT> {
225    fn drop(&mut self) {
226        modtrace!(self.tracer(), "oneshot_sender_future: drop()");
227        self.runtime_channel.cancel_sender();
228        let _ = self.event_node.on_cancel(); // remove the events from frozen list
229    }
230}
231
232// -----------------------------------------------------------------------------------------------
233// RecverOnce (Future)
234//
235// Receiver has a lot of copy paste with SenderFuture, but unification produced more code and
236// less clarity.
237//
238/// The receiving half of the oneshot channel created by [oneshot()] function.
239///
240/// It implements the [std::future::Future], so app code just awaits on this object to receive
241/// the value from sender.
242pub struct RecverOnce<'runtime, T, ReactorT: Reactor> {
243    runtime_channel: RuntimeOneshot<'runtime, ReactorT>,
244    event_node: EventNode,
245    state: PeerFutureState,
246    data: Option<T>,
247}
248
249impl<'runtime, T, ReactorT: Reactor> RecverOnce<'runtime, T, ReactorT> {
250    fn new(rt: &'runtime Runtime<ReactorT>, oneshot_id: OneshotId) -> Self {
251        RecverOnce {
252            runtime_channel: RuntimeOneshot::new(rt, oneshot_id),
253            event_node: EventNode::new(),
254            state: PeerFutureState::Created,
255            data: None,
256        }
257    }
258
259    fn set_state(&mut self, new_state: PeerFutureState) {
260        modtrace!(
261            self.tracer(),
262            "oneshot_recver_future: state {:?} -> {:?}",
263            self.state,
264            new_state
265        );
266        self.state = new_state;
267    }
268
269    fn transmit(&mut self, event_id: EventId) -> Poll<Result<T, RecvError>> {
270        self.set_state(PeerFutureState::Exchanging);
271        self.runtime_channel
272            .reg_receiver(event_id, (&mut self.data) as *mut Option<T> as *mut ());
273
274        Poll::Pending
275    }
276
277    fn close(&mut self) -> Poll<Result<T, RecvError>> {
278        if !self.event_node.is_awoken_for(self.runtime_channel.rt) {
279            return Poll::Pending;
280        }
281
282        self.set_state(PeerFutureState::Closed);
283        return if unsafe { self.runtime_channel.exchange::<T>() } {
284            Poll::Ready(Ok(self.data.take().unwrap()))
285        } else {
286            Poll::Ready(Err(RecvError))
287        };
288    }
289
290    fn tracer(&self) -> &Tracer {
291        self.runtime_channel.rt.tracer()
292    }
293}
294
295impl<'runtime, T, ReactorT: Reactor> Drop for RecverOnce<'runtime, T, ReactorT> {
296    fn drop(&mut self) {
297        modtrace!(self.tracer(), "oneshot_recver_future: in the drop()");
298        self.runtime_channel.cancel_receiver();
299        let _ = self.event_node.on_cancel(); // remove the events from frozen list
300    }
301}
302
303impl<'runtime, T, ReactorT: Reactor> Future for RecverOnce<'runtime, T, ReactorT> {
304    type Output = Result<T, RecvError>;
305
306    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
307        modtrace!(self.tracer(), "oneshot_recver_future: in the poll()");
308
309        // Unsafe usage: this function does not moves out data from self, as required by
310        // Pin::map_unchecked_mut().
311        let this = unsafe { self.get_unchecked_mut() };
312
313        return match this.state {
314            PeerFutureState::Created => {
315                let event_id = unsafe { this.event_node.on_pin(ctx) };
316                this.transmit(event_id) // always returns Pending
317            }
318            PeerFutureState::Exchanging => this.close(),
319            PeerFutureState::Closed => {
320                panic!("aiur/oneshot_recver_future: was polled after completion.")
321            }
322        };
323    }
324}