ordered_streams/
lib.rs

1#![no_std]
2#![doc = include_str!("../README.md")]
3
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7/// A stream that produces items that are ordered according to some token.
8///
9/// The main advantage of this trait over the standard `Stream` trait is the ability to implement a
10/// [`join`](join()) function that does not either block until both source streams produce an item
11/// or contain a race condition when rejoining streams that originated from a common well-ordered
12/// source.
13pub trait OrderedStream {
14    /// The type ordered by this stream.
15    ///
16    /// Each stream must produce values that are in ascending order according to this function,
17    /// although there is no requirement that the values be strictly ascending.
18    type Ordering: Ord;
19
20    /// The unordered data carried by this stream
21    ///
22    /// This is split from the `Ordering` type to allow specifying a smaller or cheaper-to-generate
23    /// type as the ordering key.  This is especially useful if you generate values to pass in to
24    /// `before`.
25    type Data;
26
27    /// Attempt to pull out the next value of this stream, registering the current task for wakeup
28    /// if needed, and returning `NoneBefore` if it is known that the stream will not produce any
29    /// more values ordered before the given point.
30    ///
31    /// # Return value
32    ///
33    /// There are several possible return values, each indicating a distinct stream state depending
34    /// on the value passed in `before`:
35    ///
36    /// - If `before` was `None`, `Poll::Pending` means that this stream's next value is not ready
37    /// yet. Implementations will ensure that the current task is notified when the next value may
38    /// be ready.
39    ///
40    /// - If `before` was `Some`, `Poll::Pending` means that this stream's next value is not ready
41    /// and that it is not yet known if the stream will produce a value ordered prior to the given
42    /// ordering value.  Implementations will ensure that the current task is notified when either
43    /// the next value is ready or once it is known that no such value will be produced.
44    ///
45    /// - `Poll::Ready(PollResult::Item)` means that the stream has successfully produced
46    /// an item.  The stream may produce further values on subsequent `poll_next_before` calls.
47    /// The returned ordering value must not be less than any prior ordering value returned by this
48    /// stream.  The returned ordering value **may** be greater than the value passed to `before`.
49    ///
50    /// - `Poll::Ready(PollResult::Terminated)` means that the stream has terminated, and
51    /// `poll_next_before` should not be invoked again.
52    ///
53    /// - `Poll::Ready(PollResult::NoneBefore)` means that the stream will not produce
54    /// any further ordering tokens less than the given token.  Subsequent `poll_next_before` calls
55    /// may still produce additional items, but their tokens will be greater than or equal to the
56    /// given token.  It does not make sense to return this value if `before` was `None`.
57    fn poll_next_before(
58        self: Pin<&mut Self>,
59        cx: &mut Context<'_>,
60        before: Option<&Self::Ordering>,
61    ) -> Poll<PollResult<Self::Ordering, Self::Data>>;
62
63    /// The minimum value of the ordering for any future items.
64    ///
65    /// If this does not return `None`, the returned ordering must be less than or equal to the
66    /// ordering of any future item returned from [`Self::poll_next_before`].  This value should
67    /// (but is not required to) be greater than or equal to the ordering of the most recent item
68    /// returned.
69    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
70        None
71    }
72
73    /// Returns the bounds on the remaining length of the stream.
74    fn size_hint(&self) -> (usize, Option<usize>) {
75        (0, None)
76    }
77}
78
79/// A value that is either borrowed or owned.
80///
81/// This is similar to `std::borrow::Cow`, but does not require the ability to convert from
82/// borrowed to owned.
83#[derive(Debug)]
84pub enum MaybeBorrowed<'a, T> {
85    Borrowed(&'a T),
86    Owned(T),
87}
88
89impl<'a, T> AsRef<T> for MaybeBorrowed<'a, T> {
90    fn as_ref(&self) -> &T {
91        match self {
92            Self::Borrowed(t) => t,
93            Self::Owned(t) => t,
94        }
95    }
96}
97
98impl<'a, T> core::ops::Deref for MaybeBorrowed<'a, T> {
99    type Target = T;
100
101    fn deref(&self) -> &T {
102        match self {
103            Self::Borrowed(t) => t,
104            Self::Owned(t) => t,
105        }
106    }
107}
108
109impl<P> OrderedStream for Pin<P>
110where
111    P: core::ops::DerefMut + Unpin,
112    P::Target: OrderedStream,
113{
114    type Data = <P::Target as OrderedStream>::Data;
115    type Ordering = <P::Target as OrderedStream>::Ordering;
116
117    fn poll_next_before(
118        self: Pin<&mut Self>,
119        cx: &mut Context<'_>,
120        before: Option<&Self::Ordering>,
121    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
122        self.get_mut().as_mut().poll_next_before(cx, before)
123    }
124
125    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
126        (**self).position_hint()
127    }
128
129    fn size_hint(&self) -> (usize, Option<usize>) {
130        (**self).size_hint()
131    }
132}
133
134/// An [`OrderedStream`] that tracks if the underlying stream should be polled.
135pub trait FusedOrderedStream: OrderedStream {
136    /// Returns `true` if the stream should no longer be polled.
137    fn is_terminated(&self) -> bool;
138}
139
140/// The result of a [`OrderedStream::poll_next_before`] operation.
141#[derive(Debug)]
142pub enum PollResult<Ordering, Data> {
143    /// An item with a corresponding ordering token.
144    Item { data: Data, ordering: Ordering },
145    /// This stream will not return any items prior to the given point.
146    NoneBefore,
147    /// This stream is terminated and should not be polled again.
148    Terminated,
149}
150
151impl<D, T> PollResult<T, D> {
152    /// Extract the data from the result.
153    pub fn into_data(self) -> Option<D> {
154        match self {
155            Self::Item { data, .. } => Some(data),
156            _ => None,
157        }
158    }
159
160    /// Extract the item from the result.
161    pub fn into_tuple(self) -> Option<(T, D)> {
162        match self {
163            Self::Item { data, ordering } => Some((ordering, data)),
164            _ => None,
165        }
166    }
167
168    /// Apply a closure to the data.
169    pub fn map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R> {
170        match self {
171            Self::Item { data, ordering } => PollResult::Item {
172                data: f(data),
173                ordering,
174            },
175            Self::NoneBefore => PollResult::NoneBefore,
176            Self::Terminated => PollResult::Terminated,
177        }
178    }
179}
180
181impl<T, D, E> PollResult<T, Result<D, E>> {
182    /// Extract the error of a [`Result`] item.
183    pub fn transpose_result(self) -> Result<PollResult<T, D>, E> {
184        self.transpose_result_item().map_err(|(_, e)| e)
185    }
186
187    /// Extract the error and ordering from a [`Result`] item.
188    pub fn transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)> {
189        match self {
190            Self::Item {
191                data: Ok(data),
192                ordering,
193            } => Ok(PollResult::Item { data, ordering }),
194            Self::Item {
195                data: Err(data),
196                ordering,
197            } => Err((ordering, data)),
198            Self::NoneBefore => Ok(PollResult::NoneBefore),
199            Self::Terminated => Ok(PollResult::Terminated),
200        }
201    }
202}
203
204/// A [`Future`](core::future::Future) that produces an item with an associated ordering.
205///
206/// This is equivalent to an [`OrderedStream`] that always produces exactly one item.  This trait
207/// is not very useful on its own; see [`FromFuture`] to convert it to a stream.
208///
209/// It is valid to implement both [`Future`](core::future::Future) and [`OrderedFuture`] on the
210/// same type.  In this case, unless otherwise documented by the implementing type, neither poll
211/// function should be invoked after either returns an output value.
212pub trait OrderedFuture {
213    /// See [`OrderedStream::Ordering`].
214    type Ordering: Ord;
215
216    /// See [`OrderedStream::Data`].
217    type Output;
218
219    /// Attempt to pull out the value of this future, registering the current task for wakeup if
220    /// needed, and returning `None` if it is known that the future will not produce a value
221    /// ordered before the given point.
222    ///
223    /// # Return value
224    ///
225    /// There are several possible return values, each indicating a distinct state depending on the
226    /// value passed in `before`:
227    ///
228    /// - If `before` was `None`, `Poll::Pending` means that this future's value is not ready yet.
229    /// Implementations will ensure that the current task is notified when the next value may be
230    /// ready.
231    ///
232    /// - If `before` was `Some`, `Poll::Pending` means that this future's value is not ready and
233    /// that it is not yet known if the value will be ordered prior to the given ordering value.
234    /// Implementations will ensure that the current task is notified when either the next value is
235    /// ready or once it is known that no such value will be produced.
236    ///
237    /// - `Poll::Ready(Some(Data))` means that the future has successfully terminated.  The
238    /// returned ordering value **may** be greater than the value passed to `before`.  The
239    /// `poll_before` function should not be invoked again.
240    ///
241    /// - `Poll::Ready(None)` means that this future will not produce an ordering token less than
242    /// the given token.  It is an error to return `None` if `before` was `None`.
243    fn poll_before(
244        self: Pin<&mut Self>,
245        cx: &mut Context<'_>,
246        before: Option<&Self::Ordering>,
247    ) -> Poll<Option<(Self::Ordering, Self::Output)>>;
248
249    /// The minimum value of the ordering of the item.
250    ///
251    /// See [`OrderedStream::position_hint`] for details.
252    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
253        None
254    }
255}
256
257mod adapters;
258pub use adapters::*;
259mod join;
260pub use join::*;
261mod multi;
262pub use multi::*;