pasts/
notify.rs

1//! Asynchronous event notifys
2//!
3//! A [`Notify`] is kind of like a cross between a [`Future`] and an
4//! [`AsyncIterator`](core::async_iter::AsyncIterator).  Like streams, they may
5//! return more than one value, and are expected to not panic after polling.
6//! Like futures, they produce non-optional values.  In a sense they are an
7//! infinite stream.  In another sense, they are a repeating future.
8//!
9//! # Why Another Abstraction?
10//! Notifys allow for some nice ergonomics and guarantees when working with
11//! event-loop based asynchronous code, which could lead to some
12//! simplifications.  Unlike futures and streams, they do not need to be fused,
13//! and if your stream is infinite, you won't need to sprinkle `unwrap()`s in
14//! your code at each call to `.next()`.  They also lend themselves nicely for
15//! creating clean and simple multimedia APIs.
16
17use core::fmt;
18
19use crate::prelude::*;
20
21/// An owned dynamically typed [`Notify`] for use in cases where you can’t
22/// statically type your result or need to add some indirection.
23pub type BoxNotify<'a, T = ()> = Pin<Box<dyn Notify<Event = T> + Send + 'a>>;
24
25impl<T> fmt::Debug for BoxNotify<'_, T> {
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.write_str("BoxNotify")
28    }
29}
30
31/// [`BoxNotify`] without the [`Send`] requirement.
32pub type LocalBoxNotify<'a, T = ()> = Pin<Box<dyn Notify<Event = T> + 'a>>;
33
34impl<T> fmt::Debug for LocalBoxNotify<'_, T> {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        f.write_str("LocalBoxNotify")
37    }
38}
39
40/// Trait for asynchronous event notification
41///
42/// Similar to [`AsyncIterator`](core::async_iter::AsyncIterator), but infinite.
43///
44/// It's expected that [`Notify`]s can be polled indefinitely without causing
45/// panics or undefined behavior.  They are not required to continue sending
46/// events indefinitely, though.
47pub trait Notify {
48    /// The event produced by this notify
49    type Event;
50
51    /// Get the next event from this notify, registering a wakeup when not
52    /// ready.
53    ///
54    /// # Return Value
55    ///  - `Poll::Pending` - Not ready yet
56    ///  - `Poll::Ready(val)` - Ready with next value
57    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<Self::Event>;
58}
59
60impl<N> Notify for Box<N>
61where
62    N: ?Sized + Notify + Unpin,
63{
64    type Event = N::Event;
65
66    #[inline]
67    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<N::Event> {
68        Pin::new(self.get_mut().as_mut()).poll_next(t)
69    }
70}
71
72impl<N, P> Notify for Pin<P>
73where
74    P: core::ops::DerefMut<Target = N> + Unpin,
75    N: Notify + ?Sized,
76{
77    type Event = N::Event;
78
79    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<Self::Event> {
80        Pin::get_mut(self).as_mut().poll_next(t)
81    }
82}
83
84impl<N> Notify for &mut N
85where
86    N: Notify + Unpin + ?Sized,
87{
88    type Event = N::Event;
89
90    #[inline]
91    fn poll_next(mut self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<N::Event> {
92        Pin::new(&mut **self).poll_next(t)
93    }
94}
95
96impl<N> Notify for [N]
97where
98    N: Notify + Unpin,
99{
100    type Event = (usize, N::Event);
101
102    #[inline]
103    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<Self::Event> {
104        for (i, this) in self.get_mut().iter_mut().enumerate() {
105            if let Poll::Ready(value) = Pin::new(this).poll_next(t) {
106                return Poll::Ready((i, value));
107            }
108        }
109
110        Poll::Pending
111    }
112}
113
114/// An extension trait for [`Notify`]s that provides a variety of convenient
115/// adapters.
116pub trait NotifyExt: Notify + Sized + Unpin {
117    /// Get the next [`Notify::Event`]
118    ///
119    /// # Usage
120    /// ```rust
121    /// use pasts::prelude::*;
122    /// use async_main::Spawn;
123    ///
124    /// struct MyAsyncIter;
125    ///
126    /// impl Notify for MyAsyncIter {
127    ///     type Event = Option<u32>;
128    ///
129    ///     fn poll_next(self: Pin<&mut Self>, _: &mut Task<'_>) -> Poll<Self::Event> {
130    ///         Ready(Some(1))
131    ///     }
132    /// }
133    ///
134    /// #[async_main::async_main]
135    /// async fn main(_spawner: impl Spawn) {
136    ///     let mut count = 0;
137    ///     let mut async_iter = MyAsyncIter;
138    ///     let mut iterations = 0;
139    ///     while let Some(i) = async_iter.next().await {
140    ///         count += i;
141    ///         iterations += 1;
142    ///         if iterations == 3 {
143    ///             break;
144    ///         }
145    ///     }
146    ///     assert_eq!(count, 3);
147    /// }
148    /// ```
149    #[inline(always)]
150    fn next(&mut self) -> Next<'_, Self> {
151        Next(self)
152    }
153
154    /// Transform produced [`Notify::Event`]s with a function.
155    #[inline(always)]
156    fn map<F>(self, f: F) -> Map<Self, F> {
157        let noti = self;
158
159        Map { noti, f }
160    }
161}
162
163impl<N: Notify + Sized + Unpin> NotifyExt for N {}
164
165/// The [`Future`] returned from [`NotifyExt::next()`]
166#[derive(Debug)]
167pub struct Next<'a, N>(&'a mut N)
168where
169    N: Notify + Unpin;
170
171impl<N> Future for Next<'_, N>
172where
173    N: Notify + Unpin,
174{
175    type Output = N::Event;
176
177    #[inline]
178    fn poll(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<Self::Output> {
179        Pin::new(&mut self.get_mut().0).poll_next(t)
180    }
181}
182
183/// Trait for "fusing" a [`Future`] (conversion to a [`Notify`])
184pub trait Fuse: Sized {
185    /// Fuse the [`Future`]
186    fn fuse(self) -> Option<Self>;
187}
188
189impl<F> Fuse for F
190where
191    F: Future,
192{
193    fn fuse(self) -> Option<Self> {
194        self.into()
195    }
196}
197
198impl<F: Future> Notify for Option<F> {
199    type Event = F::Output;
200
201    #[inline]
202    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<F::Output> {
203        let mut s = self;
204        let out = s.as_mut().as_pin_mut().map(|f| f.poll(t));
205        if matches!(out, Some(Poll::Ready(_))) {
206            s.set(None);
207        }
208        out.unwrap_or(Poll::Pending)
209    }
210}
211
212/// The [`Notify`] returned from [`NotifyExt::map()`]
213#[derive(Debug)]
214pub struct Map<N, F> {
215    noti: N,
216    f: F,
217}
218
219impl<N, F, E> Notify for Map<N, F>
220where
221    N: Notify + Unpin,
222    F: FnMut(N::Event) -> E + Unpin,
223{
224    type Event = E;
225
226    #[inline]
227    fn poll_next(mut self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<E> {
228        Pin::new(&mut self.noti).poll_next(t).map(&mut self.f)
229    }
230}
231
232/// A [`Notify`] that wraps a function returning a [`Future`]
233///
234/// This struct is created by [`future_fn()`].  See its documentation for more.
235#[derive(Debug)]
236pub struct FutureFn<T, F>(T, F);
237
238impl<T, F> Notify for FutureFn<T, F>
239where
240    T: Future + Unpin,
241    F: FnMut() -> T + Unpin,
242{
243    type Event = T::Output;
244
245    #[inline]
246    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<T::Output> {
247        let this = self.get_mut();
248        let poll = Pin::new(&mut this.0).poll(t);
249
250        if poll.is_ready() {
251            Pin::new(&mut this.0).set(this.1());
252        }
253
254        poll
255    }
256}
257
258/// A [`Notify`] created from a function returning [`Poll`]
259#[derive(Debug)]
260pub struct PollFn<F>(F);
261
262impl<T, F> Notify for PollFn<F>
263where
264    F: FnMut(&mut Task<'_>) -> Poll<T> + Unpin,
265{
266    type Event = T;
267
268    #[inline]
269    fn poll_next(self: Pin<&mut Self>, t: &mut Task<'_>) -> Poll<T> {
270        self.get_mut().0(t)
271    }
272}
273
274/// A [`Notify`] that never produces an event.
275///
276/// This struct is created by [`pending()`].  See its documentation for more.
277#[derive(Debug)]
278pub struct Pending<T>(core::marker::PhantomData<fn() -> T>);
279
280impl<T> Notify for Pending<T> {
281    type Event = T;
282
283    fn poll_next(self: Pin<&mut Self>, _task: &mut Task<'_>) -> Poll<T> {
284        Poll::Pending
285    }
286}
287
288/// A [`Notify`] that immediately produces a single event.
289///
290/// This struct is created by [`ready()`].  See its documentation for more.
291#[derive(Debug)]
292pub struct Ready<T: Unpin>(Option<T>);
293
294impl<T: Unpin> Notify for Ready<T> {
295    type Event = T;
296
297    fn poll_next(self: Pin<&mut Self>, _task: &mut Task<'_>) -> Poll<T> {
298        let Some(event) = self.get_mut().0.take() else {
299            return Poll::Pending;
300        };
301
302        Poll::Ready(event)
303    }
304}
305
306/// A [`Notify`] that selects over a list of [`Notify`]s
307///
308/// This struct is created by [`select()`].  See its documentation for more.
309pub struct Select<'a, E, const N: usize>(
310    [&'a mut (dyn Notify<Event = E> + Unpin); N],
311    usize,
312);
313
314impl<E, const N: usize> fmt::Debug for Select<'_, E, N> {
315    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316        f.write_str("Select")
317    }
318}
319
320impl<E: Unpin, const N: usize> Notify for Select<'_, E, N> {
321    type Event = E;
322
323    fn poll_next(self: Pin<&mut Self>, task: &mut Task<'_>) -> Poll<E> {
324        let s = self.get_mut();
325        let start = s.1;
326
327        // Early exit for if there is nothing that can be ready.
328        if N == 0 {
329            return Poll::Pending;
330        }
331
332        for i in (start..N).chain(0..start) {
333            if let Poll::Ready(event) = Pin::new(&mut s.0[i]).poll_next(task) {
334                return Poll::Ready(event);
335            }
336        }
337
338        Poll::Pending
339    }
340}
341
342/// Create a [`Notify`] that wraps a function returning a [`Future`].
343///
344/// Polling the notify delegates to future returned by the wrapped function.
345/// The wrapped function is called immediately, and is only called again once
346/// the future is polled and returns `Ready`.
347pub fn future_fn<T, F>(mut f: F) -> FutureFn<T, F>
348where
349    T: Future + Unpin,
350    F: FnMut() -> T + Unpin,
351{
352    FutureFn(f(), f)
353}
354
355/// Create a [`Notify`] that wraps a function returning [`Poll`].
356///
357/// Polling the future delegates to the wrapped function.
358pub fn poll_fn<T, F>(f: F) -> PollFn<F>
359where
360    F: FnMut(&mut Task<'_>) -> Poll<T> + Unpin,
361{
362    PollFn(f)
363}
364
365/// Create a [`Notify`] which never becomes ready with an event.
366pub fn pending<T>() -> Pending<T> {
367    Pending(core::marker::PhantomData)
368}
369
370/// Create a [`Notify`] which is immediately ready with an event.
371pub fn ready<T: Unpin>(t: T) -> Ready<T> {
372    Ready(t.into())
373}
374
375/// Create a [`Notify`] that selects over a list of [`Notify`]s.
376pub fn select<E, const N: usize>(
377    notifys: [&mut (dyn Notify<Event = E> + Unpin); N],
378) -> Select<'_, E, N> {
379    Select(notifys, 0)
380}