futures-join-stream 0.0.2

Efficient sorted joining of continuous sorted async Streams
Documentation
use crate::MaybeBorrowed;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;

/// A value that can be placed in order according to some ordering type.
///
/// A single object may be orderable in more than one way; for instance, `(x,y)` points can be
/// sorted either by their x coordinate or by their distance from the origin.
///
/// It also helps when several distinct types share a common field (a timestamp, say) that
/// determines their order: this trait lets the relative ordering of those types be determined
/// without implementing [`PartialOrd`] for every possible pair of types.
pub trait OrderedBy<Ordering>
where
    Ordering: Ord,
{
    /// Returns the `Ordering` value by which this object is ordered.
    fn ordering(&self) -> MaybeBorrowed<'_, Ordering>;
}

/// Any type that is itself [`Ord`] is ordered by its own value.
impl<T> OrderedBy<T> for T
where
    T: Ord,
{
    /// Wraps a reference to `self` in [`MaybeBorrowed::Borrowed`].
    fn ordering(&self) -> MaybeBorrowed<'_, Self> {
        MaybeBorrowed::Borrowed(self)
    }
}

/// A [`Stream`] that yields its items in order with respect to `Ordering`.
pub trait StreamOrderedBy<Ordering>: Stream
where
    Ordering: Ord,
    Self::Item: OrderedBy<Ordering>,
{
    /// Returns a hint of this stream's current position.
    ///
    /// The default implementation returns `None`.  Because this is only a hint, callers that
    /// rely on tracking the position should call [`Self::poll_position`] instead.
    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Ordering>> {
        None
    }

    /// Returns this stream's current position and wakes up the calling task when it changes.
    ///
    /// A return value of `None` means the position is unknown; a wakeup will be provided when
    /// that changes.
    ///
    /// A returned position is an assertion that this stream will never in the future produce an
    /// item whose position is less than the returned value.
    ///
    /// Calling this **may** disable the currently pending wakeup provided by
    /// [`Stream::poll_next`]; however, if it does, then the context provided to this function
    /// must be woken instead.
    fn poll_position(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Option<MaybeBorrowed<'_, Ordering>>;

    /// Performs [`Stream::poll_next`] and [`Self::poll_position`] in a single call.
    ///
    /// This is a convenience method.  The default implementation simply calls the two poll
    /// functions one after the other; implementors may override it with something more
    /// efficient if desired.
    ///
    /// When an item is returned, the accompanying position describes the state of the stream
    /// *after* this call, so it may be greater than the position of the returned item.
    fn poll_next_and_position(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> (Poll<Option<Self::Item>>, Option<MaybeBorrowed<'_, Ordering>>) {
        let mut pinned = self;
        // Reborrow the pin for `poll_next` so that `pinned` itself is still available to be
        // consumed by `poll_position` afterwards.
        let item = pinned.as_mut().poll_next(cx);
        (item, pinned.poll_position(cx))
    }
}