eyeball-im 0.3.1

Observable collections based on the `im` crate.
Documentation
use std::{
    hint::unreachable_unchecked,
    mem,
    pin::Pin,
    task::{ready, Context, Poll},
    vec,
};

use futures_core::Stream;
use imbl::Vector;
use tokio::sync::broadcast::{
    error::{RecvError, TryRecvError},
    Receiver,
};
use tokio_util::sync::ReusableBoxFuture;
#[cfg(feature = "tracing")]
use tracing::info;

use super::{BroadcastMessage, OneOrManyDiffs, VectorDiff};

/// A subscriber for updates of a [`Vector`].
#[derive(Debug)]
pub struct VectorSubscriber<T> {
    rx: Receiver<BroadcastMessage<T>>,
}

impl<T: Clone + Send + Sync + 'static> VectorSubscriber<T> {
    pub(super) fn new(rx: Receiver<BroadcastMessage<T>>) -> Self {
        Self { rx }
    }

    /// Turn this `VectorSubcriber` into a stream of `VectorDiff`s.
    pub fn into_stream(self) -> VectorSubscriberStream<T> {
        VectorSubscriberStream {
            inner: ReusableBoxFuture::new(make_future(self.rx)),
            state: VectorSubscriberStreamState::Recv,
        }
    }

    /// Turn this `VectorSubcriber` into a stream of `Vec<VectorDiff>`s.
    pub fn into_batched_stream(self) -> VectorSubscriberBatchedStream<T> {
        VectorSubscriberBatchedStream { inner: ReusableBoxFuture::new(make_future(self.rx)) }
    }
}

/// A stream of `VectorDiff`s created from a [`VectorSubscriber`].
///
/// Use its [`Stream`] implementation to interact with it (futures-util and
/// other futures-related crates have extension traits with convenience
/// methods).
#[derive(Debug)]
pub struct VectorSubscriberStream<T> {
    inner: ReusableBoxFuture<'static, SubscriberFutureReturn<BroadcastMessage<T>>>,
    state: VectorSubscriberStreamState<T>,
}

#[derive(Debug)]
enum VectorSubscriberStreamState<T> {
    // Stream is waiting on a new message from the inner broadcast receiver.
    Recv,
    // Stream is yielding remaining items from a previous message with multiple
    // diffs.
    YieldBatch { iter: vec::IntoIter<VectorDiff<T>>, rx: Receiver<BroadcastMessage<T>> },
}

// Not clear why this explicit impl is needed, but it's not unsafe so it is fine
impl<T> Unpin for VectorSubscriberStreamState<T> {}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberStream<T> {
    type Item = VectorDiff<T>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut self.state {
            VectorSubscriberStreamState::Recv => {
                let (result, mut rx) = ready!(self.inner.poll(cx));

                let poll = match result {
                    Ok(msg) => match msg.diffs {
                        OneOrManyDiffs::One(diff) => Poll::Ready(Some(diff)),
                        OneOrManyDiffs::Many(diffs) if diffs.is_empty() => {
                            unreachable!("ObservableVectorTransaction never sends empty diffs")
                        }
                        OneOrManyDiffs::Many(mut diffs) if diffs.len() == 1 => {
                            Poll::Ready(Some(diffs.pop().unwrap()))
                        }
                        OneOrManyDiffs::Many(diffs) => {
                            let mut iter = diffs.into_iter();
                            let fst = iter.next().unwrap();
                            self.state = VectorSubscriberStreamState::YieldBatch { iter, rx };
                            return Poll::Ready(Some(fst));
                        }
                    },
                    Err(RecvError::Closed) => Poll::Ready(None),
                    Err(RecvError::Lagged(_)) => {
                        Poll::Ready(handle_lag(&mut rx).map(|values| VectorDiff::Reset { values }))
                    }
                };

                self.inner.set(make_future(rx));
                poll
            }
            VectorSubscriberStreamState::YieldBatch { iter, .. } => {
                let diff =
                    iter.next().expect("YieldBatch is never left empty when exiting poll_next");

                if iter.len() == 0 {
                    let old_state =
                        mem::replace(&mut self.state, VectorSubscriberStreamState::Recv);
                    let rx = match old_state {
                        VectorSubscriberStreamState::YieldBatch { rx, .. } => rx,
                        // Safety: We would not be in the outer branch otherwise
                        _ => unsafe { unreachable_unchecked() },
                    };

                    self.inner.set(make_future(rx));
                }

                Poll::Ready(Some(diff))
            }
        }
    }
}

/// A batched stream of `VectorDiff`s created from a [`VectorSubscriber`].
///
/// Use its [`Stream`] implementation to interact with it (futures-util and
/// other futures-related crates have extension traits with convenience
/// methods).
#[derive(Debug)]
pub struct VectorSubscriberBatchedStream<T> {
    inner: ReusableBoxFuture<'static, SubscriberFutureReturn<BroadcastMessage<T>>>,
}

impl<T: Clone + Send + Sync + 'static> Stream for VectorSubscriberBatchedStream<T> {
    type Item = Vec<VectorDiff<T>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        fn append<T>(target: &mut Vec<VectorDiff<T>>, source: OneOrManyDiffs<T>) {
            match source {
                OneOrManyDiffs::One(diff) => target.push(diff),
                OneOrManyDiffs::Many(mut diffs) => target.append(&mut diffs),
            }
        }

        let (result, mut rx) = ready!(self.inner.poll(cx));

        let poll = match result {
            Ok(msg) => {
                let mut batch = msg.diffs.into_vec();
                loop {
                    match rx.try_recv() {
                        Ok(msg) => append(&mut batch, msg.diffs),
                        Err(TryRecvError::Empty | TryRecvError::Closed) => {
                            break Poll::Ready(Some(batch));
                        }
                        Err(TryRecvError::Lagged(_)) => {
                            break Poll::Ready(
                                handle_lag(&mut rx)
                                    .map(|values| vec![VectorDiff::Reset { values }]),
                            );
                        }
                    }
                }
            }
            Err(RecvError::Closed) => Poll::Ready(None),
            Err(RecvError::Lagged(_)) => {
                Poll::Ready(handle_lag(&mut rx).map(|values| vec![VectorDiff::Reset { values }]))
            }
        };

        self.inner.set(make_future(rx));
        poll
    }
}

fn handle_lag<T: Clone + Send + Sync + 'static>(
    rx: &mut Receiver<BroadcastMessage<T>>,
) -> Option<Vector<T>> {
    let mut msg = None;
    loop {
        match rx.try_recv() {
            // There's a newer message in the receiver's buffer, use that for reset.
            Ok(m) => {
                msg = Some(m);
            }
            // Ideally we'd return a `VecDiff::Reset` with the last state before the
            // channel was closed here, but we have no way of obtaining the last state.
            Err(TryRecvError::Closed) => {
                #[cfg(feature = "tracing")]
                info!("Channel closed after lag, can't return last state");
                return None;
            }
            // Lagged twice in a row, is this possible? If it is, it's fine to just
            // loop again and look at the next try_recv result.
            Err(TryRecvError::Lagged(_)) => {}
            Err(TryRecvError::Empty) => match msg {
                // We exhausted the internal buffer using try_recv, msg contains the
                // last message from it, which we use for the reset.
                Some(msg) => return Some(msg.state),
                // We exhausted the internal buffer using try_recv but there was no
                // message in it, even though we got TryRecvError::Lagged(_) before.
                None => unreachable!("got no new message via try_recv after lag"),
            },
        }
    }
}

type SubscriberFutureReturn<T> = (Result<T, RecvError>, Receiver<T>);

async fn make_future<T: Clone>(mut rx: Receiver<T>) -> SubscriberFutureReturn<T> {
    let result = rx.recv().await;
    (result, rx)
}