lilos_handoff/lib.rs
1//! Mechanism for handing data from one task to another, minimizing copies.
2//!
3//! This crate provides the `Handoff` abstraction for `lilos`.
4//!
5//! There are two sides to a `Handoff<T>`, the sender and the receiver. When both
6//! the sender and receiver are ready, a single `T` gets transferred from the
7//! sender's ownership to the receiver's. In this case, "ready" means that
8//! either the sender or receiver was already blocked waiting for its peer when
9//! that peer arrived -- with both tasks waiting at the handoff, we can copy the
10//! data and then unblock both.
11//!
12//! Because we don't need any sort of holding area for a copy of the `T`, a
13//! `Handoff<T>` is very small -- about the size of two pointers.
14//!
15//! In computer science this is referred to as a _rendezvous_, but that's harder
16//! to spell than handoff.
17//!
18//! # Creating and using a `Handoff`
19//!
20//! Because the `Handoff` itself contains no storage, they're cheap to create on
21//! the stack. You then need to `split` then into their `Pusher` and `Popper`
22//! ends -- these both _borrow_ the `Handoff`, so you need to keep it around.
23//! You can then hand the ends off to other futures. A typical use case looks
24//! like this:
25//!
26//! ```ignore
27//! let mut handoff = Handoff::new();
28//! let (push, pop) = handoff.split();
29//! join!(data_producer(push), data_consumer(pop));
30//! ```
31//!
32//! If you just want to synchronize two tasks at a rendezvous point, and don't
33//! need to move data, use `Handoff<()>`. It does the right thing.
34//!
35//! # Caveats and alternatives
36//!
37//! Only one `Pusher` and `Popper` can exist at a time -- the compiler ensures
38//! this. This simplifies the implementation quite a bit, but it means that if
39//! you want a multi-party rendezvous this isn't the right tool.
40//!
41//! If you would like to be able to push data and go on about your business
42//! without waiting for it to be popped, you want a queue, not a handoff. See
43//! the `lilos::spsc` module.
44//!
45//! Note that none of these types are `Send` or `Sync` -- they are very much not
46//! thread safe, so they can be freely used across `async` tasks but cannot be
47//! shared with an interrupt handler. For the same reason, you probably don't
48//! want to attempt to store one in a `static` -- you will succeed with enough
49//! `unsafe`, but the result will not be useful! The queues provided in `spsc`
50//! do not have this limitation, at the cost of being more work to set up.
51//!
52//! # Cancel safety
53//!
54//! `Handoff` is not strictly cancel-safe, unlike most of `lilos`. Concretely,
55//! dropping a `push` or `pop` future before it resolves can cause the loss of
56//! at most one data item.
57//!
58//! While technically cancel-unsafe, this is usually okay given the way handoffs
59//! are used in practice. Please read the docs for [`Pusher::push`] and
60//! [`Popper::pop`] carefully or you risk losing data.
61//!
62//! If the push and pop ends of the handoff are "long-lived," held by tasks that
63//! won't be cancelled (such as top-level tasks in `lilos`) and never used in
64//! contexts where the future might be cancelled (such as `with_timeout`), then
65//! you don't need to worry about that. This is not a property you can check
66//! with the compiler, though, so again -- be careful.
67
68#![no_std]
69
70#![warn(
71 elided_lifetimes_in_paths,
72 explicit_outlives_requirements,
73 missing_debug_implementations,
74 missing_docs,
75 semicolon_in_expressions_from_macros,
76 single_use_lifetimes,
77 trivial_casts,
78 trivial_numeric_casts,
79 unreachable_pub,
80 unsafe_op_in_unsafe_fn,
81 unused_qualifications,
82)]
83
84use core::cell::Cell;
85use core::ptr::NonNull;
86
87use scopeguard::ScopeGuard;
88
89use lilos::exec::Notify;
90
91/// Shared control block for a `Handoff`. See the module docs for more
92/// information.
93#[derive(Default)]
94pub struct Handoff<T> {
95 state: Cell<State<T>>,
96 ping: Notify,
97}
98
99impl<T> Handoff<T> {
100 /// Creates a new Handoff in idle state.
101 pub const fn new() -> Self {
102 Self {
103 state: Cell::new(State::Idle),
104 ping: Notify::new(),
105 }
106 }
107
108 /// Borrows `self` exclusively and produces `Pusher` and `Popper` endpoints.
109 /// The endpoints are guaranteed to be unique, since they can't be cloned
110 /// and you can't call `split` to make new ones until both are
111 /// dropped/forgotten.
112 pub fn split(&mut self) -> (Pusher<'_, T>, Popper<'_, T>) {
113 (Pusher(self), Popper(self))
114 }
115}
116
117impl<T> Drop for Handoff<T> {
118 fn drop(&mut self) {
119 // It should be impossible to drop a Handoff while anyone is waiting on
120 // it, but let's check.
121 debug_assert!(matches!(self.state.get(), State::Idle));
122 }
123}
124
125/// Implement Debug by hand so it doesn't require T: Debug.
126impl<T> core::fmt::Debug for Handoff<T> {
127 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
128 f.debug_struct("Handoff")
129 .field("state", &self.state)
130 .field("ping", &self.ping)
131 .finish()
132 }
133}
134
135/// Internal representation of handoff state.
136///
137/// Note that we store pointers to the inside of the futures. This is OK because
138/// they're only stored during `poll`, which in turn can only be called on a
139/// pinned future. So we know the futures cannot move without being dropped, and
140/// thus the pointers will remain valid (the futures take care to reset these
141/// pointers on drop).
142#[derive(Default)]
143enum State<T> {
144 /// Nobody's waiting.
145 #[default]
146 Idle,
147 /// Push side is blocked, and here is a pointer to the value they're
148 /// offering. (The `Option` will be `Some(value)`, and to pop you must reset
149 /// it to `None` and then write the state to `Idle`.)
150 PushWait(NonNull<Option<T>>),
151 /// Pop side is blocked, and here is a pointer to the buffer where a value
152 /// shall be deposited. (The `Option` will be `None`, and to push you must
153 /// set it to `Some(value)` and then write the state to `Idle`.)
154 PopWait(NonNull<Option<T>>),
155}
156
157/// Implement Debug by hand so it doesn't require T: Debug.
158impl<T> core::fmt::Debug for State<T> {
159 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
160 match self {
161 Self::Idle => f.write_str("Idle"),
162 Self::PushWait(p) => f.debug_tuple("PushWait").field(p).finish(),
163 Self::PopWait(p) => f.debug_tuple("PopWait").field(p).finish(),
164 }
165 }
166}
167
168// Manually deriving Copy and Clone so they don't require T: Copy/Clone.
169impl<T> Copy for State<T> {}
170impl<T> Clone for State<T> {
171 fn clone(&self) -> Self {
172 *self // thanks, Copy impl!
173 }
174}
175
176/// Push endpoint for a `Handoff<T>`. Holding this allows you to offer a single
177/// item at a time to whoever's holding the `Popper` side.
178pub struct Pusher<'a, T>(&'a Handoff<T>);
179
180impl<T> Pusher<'_, T> {
181 /// Offers `value` to our peer, if they are waiting to receive it.
182 ///
183 /// If someone is blocked on the `Popper` side, `value` is transferred to
184 /// them, they are unblocked, and this returns `Ok(())`.
185 ///
186 /// Otherwise, it returns `Err(value)`, giving `value` back to you.
187 pub fn try_push(&mut self, value: T) -> Result<(), T> {
188 match self.0.state.get() {
189 State::PopWait(dest_ptr) => {
190 // Our peer is waiting.
191 unsafe {
192 dest_ptr.as_ptr().write(Some(value));
193 }
194 self.0.state.set(State::Idle);
195 self.0.ping.notify();
196 Ok(())
197 },
198 #[cfg(debug_assertions)]
199 State::PushWait(_) => panic!(),
200 _ => Err(value),
201 }
202 }
203
204 /// Produces a future that resolves when `value` can be handed off to our
205 /// peer.
206 ///
207 /// # Cancellation
208 ///
209 /// **Cancel Safety:** Weak.
210 ///
211 /// If this future is dropped before it resolves, `value` is dropped, i.e.
212 /// both you and the peer lose access to it. This means the operation can't
213 /// reasonably be retried, and means that if collecting `value` in the first
214 /// place had side effects, there's no good way to roll those back.
215 ///
216 /// If the code using `push` can hang on to a copy of `value`, or if losing
217 /// `value` on cancellation is okay, then this operation _can_ be used
218 /// safely.
219 ///
220 /// I'm trying to figure out a version of this with strict safety.
221 /// Suggestions welcome.
222 pub async fn push(&mut self, value: T) {
223 let mut guard = scopeguard::guard(Some(value), |v| {
224 if v.is_some() {
225 // Cancelled while waiting to push! We know that...
226 // - We have been polled at least once (or we wouldn't be here)
227 // - All paths to await in this function set the state to
228 // PushWait.
229 // - If the peer sets the state to something other than
230 // PushWait, they take the value.
231 // - Thus the current state is...
232 debug_assert!(matches!(self.0.state.get(), State::PushWait(_)));
233 // ...and we want to eliminate the suggestion that a pusher is
234 // waiting.
235 self.0.state.set(State::Idle);
236 }
237 });
238 loop {
239 if guard.is_some() {
240 // Value has not yet been taken. What can we do about that?
241 match self.0.state.get() {
242 State::Idle => {
243 // Our peer is not waiting, we must block.
244 self.0.state.set(State::PushWait(
245 NonNull::from(&mut *guard)
246 ));
247 self.0.ping.until_next().await;
248 continue;
249 }
250 State::PushWait(_) => {
251 // We are already blocked; spurious wakeup.
252 self.0.ping.until_next().await;
253 continue;
254 }
255 State::PopWait(dest_ptr) => {
256 // Our peer is waiting. We can do the handoff
257 // immediately. Defuse the guard.
258 unsafe {
259 dest_ptr.as_ptr().write(ScopeGuard::into_inner(guard));
260 }
261 self.0.state.set(State::Idle);
262 self.0.ping.notify();
263 return;
264 },
265 }
266 } else {
267 // Value must have been taken while we were sleeping.
268 // Pop side should have left state in either of....
269 debug_assert!(matches!(self.0.state.get(), State::Idle | State::PopWait(_)));
270 break;
271 }
272 }
273 }
274}
275
276/// Implement Debug by hand so it doesn't require T: Debug.
277impl<T> core::fmt::Debug for Pusher<'_, T> {
278 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
279 f.debug_tuple("Pusher").field(&self.0).finish()
280 }
281}
282
283/// Pop endpoint for a `Handoff<T>`. Holding this allows you to take a single
284/// item at a time from whoever's holding the `Pusher` side.
285pub struct Popper<'a, T>(&'a Handoff<T>);
286
287impl<T> Popper<'_, T> {
288 /// Takes data from the `Pusher` peer if it's waiting.
289 ///
290 /// If the peer is blocked offering us data, this unblocks them and returns
291 /// `Some(value)`.
292 ///
293 /// Otherwise, returns `None`.
294 pub fn try_pop(&mut self) -> Option<T> {
295 match self.0.state.get() {
296 State::PushWait(src_ptr) => {
297 // Our peer is waiting. Take the thingy.
298 //
299 // Safety: if we're in this state the source pointer is valid
300 // and the backing memory is not being used -- since if the peer
301 // had resumed, it would have knocked us out of this state.
302 let value = unsafe { &mut *src_ptr.as_ptr() }.take();
303
304 self.0.state.set(State::Idle);
305 self.0.ping.notify();
306 value
307 },
308 #[cfg(debug_assertions)]
309 State::PopWait(_) => panic!(),
310 _ => None,
311 }
312 }
313
314 /// Produces a future that resolves when the peer offers a value.
315 ///
316 /// # Cancellation
317 ///
318 /// **Cancel Safety:** Weak.
319 ///
320 /// If this is dropped before it resolves, no data will be lost: we have
321 /// either taken data from the `Pusher` side and resolved, or we have not
322 /// taken data.
323 ///
324 /// *However,* if this future is pending when another task successfully
325 /// `push`-es, and _then_ this future is dropped before resolving, the
326 /// pushed data is lost with no feedback to the pusher.
327 pub async fn pop(&mut self) -> T {
328 let mut guard = scopeguard::guard(None, |v| {
329 if v.is_none() {
330 // Cancelled while waiting to pop! We know that...
331 // - We have been polled at least once (or we wouldn't be here)
332 // - All paths to await in this function set the state to
333 // PopWait.
334 // - If the peer sets the state to something other than
335 // PopWait, they deliver a value.
336 // - Thus the current state is...
337 debug_assert!(matches!(self.0.state.get(), State::PopWait(_)));
338 // ...and we want to eliminate the suggestion that a popper is
339 // waiting.
340 self.0.state.set(State::Idle);
341 }
342 });
343 loop {
344 if guard.is_some() {
345 // Value must have been deposited while we slept. The push side
346 // should either have left the state idle, or began blocking for
347 // our next item:
348 debug_assert!(matches!(self.0.state.get(), State::Idle | State::PushWait(_)));
349
350 return ScopeGuard::into_inner(guard).unwrap();
351 } else {
352 // Value has not yet been delivered. What can we do about that?
353 match self.0.state.get() {
354 State::Idle => {
355 // Our peer is not waiting, we must block.
356 self.0.state.set(State::PopWait(
357 NonNull::from(&mut *guard)
358 ));
359 self.0.ping.until_next().await;
360 continue;
361 }
362 State::PopWait(_) => {
363 // We are still blocked; spurious wakeup.
364 self.0.ping.until_next().await;
365 continue;
366 },
367 State::PushWait(src_ptr) => {
368 // Our peer is waiting. We can do the handoff
369 // immediately.
370 core::mem::swap(
371 unsafe { &mut *src_ptr.as_ptr() },
372 &mut *guard,
373 );
374 self.0.state.set(State::Idle);
375 self.0.ping.notify();
376 return ScopeGuard::into_inner(guard).unwrap();
377 },
378 }
379 }
380 }
381 }
382}
383
384/// Implement Debug by hand so it doesn't require T: Debug.
385impl<T> core::fmt::Debug for Popper<'_, T> {
386 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
387 f.debug_tuple("Popper").field(&self.0).finish()
388 }
389}