lilos_handoff/
lib.rs

1//! Mechanism for handing data from one task to another, minimizing copies.
2//!
3//! This crate provides the `Handoff` abstraction for `lilos`.
4//!
5//! There are two sides to a `Handoff<T>`, the sender and the receiver. When both
6//! the sender and receiver are ready, a single `T` gets transferred from the
7//! sender's ownership to the receiver's. In this case, "ready" means that
8//! either the sender or receiver was already blocked waiting for its peer when
9//! that peer arrived -- with both tasks waiting at the handoff, we can copy the
10//! data and then unblock both.
11//!
12//! Because we don't need any sort of holding area for a copy of the `T`, a
13//! `Handoff<T>` is very small -- about the size of two pointers.
14//!
15//! In computer science this is referred to as a _rendezvous_, but that's harder
16//! to spell than handoff.
17//!
18//! # Creating and using a `Handoff`
19//!
20//! Because the `Handoff` itself contains no storage, they're cheap to create on
21//! the stack. You then need to `split` then into their `Pusher` and `Popper`
22//! ends -- these both _borrow_ the `Handoff`, so you need to keep it around.
23//! You can then hand the ends off to other futures. A typical use case looks
24//! like this:
25//!
26//! ```ignore
27//! let mut handoff = Handoff::new();
28//! let (push, pop) = handoff.split();
29//! join!(data_producer(push), data_consumer(pop));
30//! ```
31//!
32//! If you just want to synchronize two tasks at a rendezvous point, and don't
33//! need to move data, use `Handoff<()>`. It does the right thing.
34//!
35//! # Caveats and alternatives
36//!
37//! Only one `Pusher` and `Popper` can exist at a time -- the compiler ensures
38//! this.  This simplifies the implementation quite a bit, but it means that if
39//! you want a multi-party rendezvous this isn't the right tool.
40//!
41//! If you would like to be able to push data and go on about your business
42//! without waiting for it to be popped, you want a queue, not a handoff. See
43//! the `lilos::spsc` module.
44//!
45//! Note that none of these types are `Send` or `Sync` -- they are very much not
46//! thread safe, so they can be freely used across `async` tasks but cannot be
47//! shared with an interrupt handler. For the same reason, you probably don't
48//! want to attempt to store one in a `static` -- you will succeed with enough
49//! `unsafe`, but the result will not be useful! The queues provided in `spsc`
50//! do not have this limitation, at the cost of being more work to set up.
51//!
52//! # Cancel safety
53//!
54//! `Handoff` is not strictly cancel-safe, unlike most of `lilos`. Concretely,
55//! dropping a `push` or `pop` future before it resolves can cause the loss of
56//! at most one data item.
57//!
58//! While technically cancel-unsafe, this is usually okay given the way handoffs
59//! are used in practice. Please read the docs for [`Pusher::push`] and
60//! [`Popper::pop`] carefully or you risk losing data.
61//!
62//! If the push and pop ends of the handoff are "long-lived," held by tasks that
63//! won't be cancelled (such as top-level tasks in `lilos`) and never used in
64//! contexts where the future might be cancelled (such as `with_timeout`), then
65//! you don't need to worry about that. This is not a property you can check
66//! with the compiler, though, so again -- be careful.
67
68#![no_std]
69
70#![warn(
71    elided_lifetimes_in_paths,
72    explicit_outlives_requirements,
73    missing_debug_implementations,
74    missing_docs,
75    semicolon_in_expressions_from_macros,
76    single_use_lifetimes,
77    trivial_casts,
78    trivial_numeric_casts,
79    unreachable_pub,
80    unsafe_op_in_unsafe_fn,
81    unused_qualifications,
82)]
83
84use core::cell::Cell;
85use core::ptr::NonNull;
86
87use scopeguard::ScopeGuard;
88
89use lilos::exec::Notify;
90
91/// Shared control block for a `Handoff`. See the module docs for more
92/// information.
93#[derive(Default)]
94pub struct Handoff<T> {
95    state: Cell<State<T>>,
96    ping: Notify,
97}
98
99impl<T> Handoff<T> {
100    /// Creates a new Handoff in idle state.
101    pub const fn new() -> Self {
102        Self {
103            state: Cell::new(State::Idle),
104            ping: Notify::new(),
105        }
106    }
107
108    /// Borrows `self` exclusively and produces `Pusher` and `Popper` endpoints.
109    /// The endpoints are guaranteed to be unique, since they can't be cloned
110    /// and you can't call `split` to make new ones until both are
111    /// dropped/forgotten.
112    pub fn split(&mut self) -> (Pusher<'_, T>, Popper<'_, T>) {
113        (Pusher(self), Popper(self))
114    }
115}
116
117impl<T> Drop for Handoff<T> {
118    fn drop(&mut self) {
119        // It should be impossible to drop a Handoff while anyone is waiting on
120        // it, but let's check.
121        debug_assert!(matches!(self.state.get(), State::Idle));
122    }
123}
124
125/// Implement Debug by hand so it doesn't require T: Debug.
126impl<T> core::fmt::Debug for Handoff<T> {
127    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
128        f.debug_struct("Handoff")
129            .field("state", &self.state)
130            .field("ping", &self.ping)
131            .finish()
132    }
133}
134
135/// Internal representation of handoff state.
136///
137/// Note that we store pointers to the inside of the futures. This is OK because
138/// they're only stored during `poll`, which in turn can only be called on a
139/// pinned future. So we know the futures cannot move without being dropped, and
140/// thus the pointers will remain valid (the futures take care to reset these
141/// pointers on drop).
142#[derive(Default)]
143enum State<T> {
144    /// Nobody's waiting.
145    #[default]
146    Idle,
147    /// Push side is blocked, and here is a pointer to the value they're
148    /// offering. (The `Option` will be `Some(value)`, and to pop you must reset
149    /// it to `None` and then write the state to `Idle`.)
150    PushWait(NonNull<Option<T>>),
151    /// Pop side is blocked, and here is a pointer to the buffer where a value
152    /// shall be deposited. (The `Option` will be `None`, and to push you must
153    /// set it to `Some(value)` and then write the state to `Idle`.)
154    PopWait(NonNull<Option<T>>),
155}
156
157/// Implement Debug by hand so it doesn't require T: Debug.
158impl<T> core::fmt::Debug for State<T> {
159    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
160        match self {
161            Self::Idle => f.write_str("Idle"),
162            Self::PushWait(p) => f.debug_tuple("PushWait").field(p).finish(),
163            Self::PopWait(p) => f.debug_tuple("PopWait").field(p).finish(),
164        }
165    }
166}
167
168// Manually deriving Copy and Clone so they don't require T: Copy/Clone.
169impl<T> Copy for State<T> {}
170impl<T> Clone for State<T> {
171    fn clone(&self) -> Self {
172        *self  // thanks, Copy impl!
173    }
174}
175
176/// Push endpoint for a `Handoff<T>`. Holding this allows you to offer a single
177/// item at a time to whoever's holding the `Popper` side.
178pub struct Pusher<'a, T>(&'a Handoff<T>);
179
180impl<T> Pusher<'_, T> {
181    /// Offers `value` to our peer, if they are waiting to receive it.
182    ///
183    /// If someone is blocked on the `Popper` side, `value` is transferred to
184    /// them, they are unblocked, and this returns `Ok(())`.
185    ///
186    /// Otherwise, it returns `Err(value)`, giving `value` back to you.
187    pub fn try_push(&mut self, value: T) -> Result<(), T> {
188        match self.0.state.get() {
189            State::PopWait(dest_ptr) => {
190                // Our peer is waiting.
191                unsafe {
192                    dest_ptr.as_ptr().write(Some(value));
193                }
194                self.0.state.set(State::Idle);
195                self.0.ping.notify();
196                Ok(())
197            },
198            #[cfg(debug_assertions)]
199            State::PushWait(_) => panic!(),
200            _ => Err(value),
201        }
202    }
203
204    /// Produces a future that resolves when `value` can be handed off to our
205    /// peer.
206    ///
207    /// # Cancellation
208    ///
209    /// **Cancel Safety:** Weak.
210    ///
211    /// If this future is dropped before it resolves, `value` is dropped, i.e.
212    /// both you and the peer lose access to it. This means the operation can't
213    /// reasonably be retried, and means that if collecting `value` in the first
214    /// place had side effects, there's no good way to roll those back.
215    ///
216    /// If the code using `push` can hang on to a copy of `value`, or if losing
217    /// `value` on cancellation is okay, then this operation _can_ be used
218    /// safely.
219    ///
220    /// I'm trying to figure out a version of this with strict safety.
221    /// Suggestions welcome.
222    pub async fn push(&mut self, value: T) {
223        let mut guard = scopeguard::guard(Some(value), |v| {
224            if v.is_some() {
225                // Cancelled while waiting to push! We know that...
226                // - We have been polled at least once (or we wouldn't be here)
227                // - All paths to await in this function set the state to
228                //   PushWait.
229                // - If the peer sets the state to something other than
230                //   PushWait, they take the value.
231                // - Thus the current state is...
232                debug_assert!(matches!(self.0.state.get(), State::PushWait(_)));
233                // ...and we want to eliminate the suggestion that a pusher is
234                // waiting.
235                self.0.state.set(State::Idle);
236            }
237        });
238        loop {
239            if guard.is_some() {
240                // Value has not yet been taken. What can we do about that?
241                match self.0.state.get() {
242                    State::Idle => {
243                        // Our peer is not waiting, we must block.
244                        self.0.state.set(State::PushWait(
245                            NonNull::from(&mut *guard)
246                        ));
247                        self.0.ping.until_next().await;
248                        continue;
249                    }
250                    State::PushWait(_) => {
251                        // We are already blocked; spurious wakeup.
252                        self.0.ping.until_next().await;
253                        continue;
254                    }
255                    State::PopWait(dest_ptr) => {
256                        // Our peer is waiting. We can do the handoff
257                        // immediately. Defuse the guard.
258                        unsafe {
259                            dest_ptr.as_ptr().write(ScopeGuard::into_inner(guard));
260                        }
261                        self.0.state.set(State::Idle);
262                        self.0.ping.notify();
263                        return;
264                    },
265                }
266            } else {
267                // Value must have been taken while we were sleeping.
268                // Pop side should have left state in either of....
269                debug_assert!(matches!(self.0.state.get(), State::Idle | State::PopWait(_)));
270                break;
271            }
272        }
273    }
274}
275
276/// Implement Debug by hand so it doesn't require T: Debug.
277impl<T> core::fmt::Debug for Pusher<'_, T> {
278    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
279        f.debug_tuple("Pusher").field(&self.0).finish()
280    }
281}
282
283/// Pop endpoint for a `Handoff<T>`. Holding this allows you to take a single
284/// item at a time from whoever's holding the `Pusher` side.
285pub struct Popper<'a, T>(&'a Handoff<T>);
286
287impl<T> Popper<'_, T> {
288    /// Takes data from the `Pusher` peer if it's waiting.
289    ///
290    /// If the peer is blocked offering us data, this unblocks them and returns
291    /// `Some(value)`.
292    ///
293    /// Otherwise, returns `None`.
294    pub fn try_pop(&mut self) -> Option<T> {
295        match self.0.state.get() {
296            State::PushWait(src_ptr) => {
297                // Our peer is waiting. Take the thingy.
298                //
299                // Safety: if we're in this state the source pointer is valid
300                // and the backing memory is not being used -- since if the peer
301                // had resumed, it would have knocked us out of this state.
302                let value = unsafe { &mut *src_ptr.as_ptr() }.take();
303
304                self.0.state.set(State::Idle);
305                self.0.ping.notify();
306                value
307            },
308            #[cfg(debug_assertions)]
309            State::PopWait(_) => panic!(),
310            _ => None,
311        }
312    }
313
314    /// Produces a future that resolves when the peer offers a value.
315    ///
316    /// # Cancellation
317    ///
318    /// **Cancel Safety:** Weak.
319    ///
320    /// If this is dropped before it resolves, no data will be lost: we have
321    /// either taken data from the `Pusher` side and resolved, or we have not
322    /// taken data.
323    ///
324    /// *However,* if this future is pending when another task successfully
325    /// `push`-es, and _then_ this future is dropped before resolving, the
326    /// pushed data is lost with no feedback to the pusher.
327    pub async fn pop(&mut self) -> T {
328        let mut guard = scopeguard::guard(None, |v| {
329            if v.is_none() {
330                // Cancelled while waiting to pop! We know that...
331                // - We have been polled at least once (or we wouldn't be here)
332                // - All paths to await in this function set the state to
333                //   PopWait.
334                // - If the peer sets the state to something other than
335                //   PopWait, they deliver a value.
336                // - Thus the current state is...
337                debug_assert!(matches!(self.0.state.get(), State::PopWait(_)));
338                // ...and we want to eliminate the suggestion that a popper is
339                // waiting.
340                self.0.state.set(State::Idle);
341            }
342        });
343        loop {
344            if guard.is_some() {
345                // Value must have been deposited while we slept. The push side
346                // should either have left the state idle, or began blocking for
347                // our next item:
348                debug_assert!(matches!(self.0.state.get(), State::Idle | State::PushWait(_)));
349
350                return ScopeGuard::into_inner(guard).unwrap();
351            } else {
352                // Value has not yet been delivered. What can we do about that?
353                match self.0.state.get() {
354                    State::Idle => {
355                        // Our peer is not waiting, we must block.
356                        self.0.state.set(State::PopWait(
357                            NonNull::from(&mut *guard)
358                        ));
359                        self.0.ping.until_next().await;
360                        continue;
361                    }
362                    State::PopWait(_) => {
363                        // We are still blocked; spurious wakeup.
364                        self.0.ping.until_next().await;
365                        continue;
366                    },
367                    State::PushWait(src_ptr) => {
368                        // Our peer is waiting. We can do the handoff
369                        // immediately.
370                        core::mem::swap(
371                            unsafe { &mut *src_ptr.as_ptr() },
372                            &mut *guard,
373                        );
374                        self.0.state.set(State::Idle);
375                        self.0.ping.notify();
376                        return ScopeGuard::into_inner(guard).unwrap();
377                    },
378                }
379            }
380        }
381    }
382}
383
384/// Implement Debug by hand so it doesn't require T: Debug.
385impl<T> core::fmt::Debug for Popper<'_, T> {
386    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
387        f.debug_tuple("Popper").field(&self.0).finish()
388    }
389}