remoc-obs 0.5.0

Collections (Vec, HashMap, HashSet, append-only list) that can be observed and mirrored remotely.
Documentation
//! Observable append-only list.
//!
//! This provides a locally and remotely observable append-only list.
//! The observable list sends a change event each time a change is performed on it.
//! The [resulting event stream](ListSubscription) can either be processed event-wise
//! or used to build a [mirrored list](MirroredList).
//!
//! # Alternatives
//!
//! The [observable vector](crate::vec) provides most operations of a [vector](Vec),
//! but allocates a separate event buffer per subscriber and thus uses more memory.
//!
//! # Basic use
//!
//! Create a [ObservableList] and obtain a [subscription](ListSubscription) to it using
//! [ObservableList::subscribe].
//! Send this subscription to a remote endpoint via a [remote channel](remoc::rch) and call
//! [ListSubscription::mirror] on the remote endpoint to obtain a live mirror of the observed
//! vector or process each change event individually using [ListSubscription::recv].
//!

use futures::{future, Future, FutureExt};
use remoc::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
    fmt,
    marker::PhantomData,
    ops::Deref,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};
use tokio::sync::{mpsc, oneshot, watch, Mutex, OwnedMutexGuard, RwLock, RwLockReadGuard};

use crate::{default_on_err, ChangeNotifier, ChangeSender, RecvError, SendError};

/// A list change event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ListEvent<T> {
    /// The subscription has reached the value of the observed
    /// list at the time it was subscribed.
    #[serde(skip)]
    InitialComplete,
    /// An item was pushed at the end of the list.
    Push(T),
    /// The list has reached its final state and
    /// no further events will occur.
    Done,
}

/// Observable list task request.
enum Req<T> {
    /// Push item.
    Push(T),
    /// List is complete, i.e. no more items will be sent.
    Done,
    /// Borrow the current list.
    Borrow(oneshot::Sender<OwnedMutexGuard<Vec<T>>>),
    /// Set the error handler.
    SetErrorHandler(Box<dyn Fn(SendError) + Send + Sync + 'static>),
}

/// Oberservable list subscribing related task request.
enum DistReq<T, Codec> {
    /// Subscribe receiver.
    Subscribe(rch::mpsc::Sender<ListEvent<T>, Codec>),
    /// Notify when no subscribers are left.
    NotifyNoSubscribers(oneshot::Sender<()>),
}

/// An observable list distributor allows subscribing to an observable list.
///
/// This is clonable and can be sent to other tasks.
#[derive(Clone)]
pub struct ObservableListDistributor<T, Codec = remoc::codec::Default> {
    tx: mpsc::UnboundedSender<DistReq<T, Codec>>,
    len: Arc<AtomicUsize>,
    subscriber_count: Arc<AtomicUsize>,
}

impl<T, Codec> fmt::Debug for ObservableListDistributor<T, Codec> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ObservableListDistributor")
            .field("len", &self.len.load(Ordering::Relaxed))
            .field("subscriber_count", &self.subscriber_count.load(Ordering::Relaxed))
            .finish()
    }
}

impl<T, Codec> ObservableListDistributor<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: remoc::codec::Codec,
{
    /// Sends a request to the task.
    fn req(&self, req: DistReq<T, Codec>) {
        if self.tx.send(req).is_err() {
            panic!("observable list task was terminated");
        }
    }

    /// Subscribes to the observable list with incremental sending of the current contents.
    pub fn subscribe(&self) -> ListSubscription<T, Codec> {
        let (tx, rx) = rch::mpsc::channel(1);
        let _ = self.tx.send(DistReq::Subscribe(tx));
        ListSubscription::new(self.len.load(Ordering::Relaxed), rx)
    }

    /// Current number of subscribers.
    pub fn subscriber_count(&self) -> usize {
        self.subscriber_count.load(Ordering::Relaxed)
    }

    /// Returns when all subscribers have quit.
    ///
    /// If no subscribers are currently present, this return immediately.
    /// This also returns when [done](ObservableList::done) has been called and
    /// all subscribers have received all elements of the list.
    pub fn closed(&self) -> impl Future<Output = ()> {
        let (tx, rx) = oneshot::channel();
        self.req(DistReq::NotifyNoSubscribers(tx));
        async move {
            let _ = rx.await;
        }
    }

    /// Returns `true` if there are currently no subscribers.
    pub fn is_closed(&self) -> bool {
        self.subscriber_count() == 0
    }
}

/// An append-only list that emits an event for each change.
///
/// Use [subscribe](Self::subscribe) to obtain an event stream
/// that can be used for building a mirror of this list.
///
/// The [distributor method](Self::distributor) can be used to obtain a clonable object
/// that can be used to make subscriptions from other tasks.
pub struct ObservableList<T, Codec = remoc::codec::Default> {
    tx: mpsc::UnboundedSender<Req<T>>,
    change: ChangeSender,
    len: Arc<AtomicUsize>,
    done: bool,
    dist: ObservableListDistributor<T, Codec>,
}

impl<T, Codec> fmt::Debug for ObservableList<T, Codec> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ObservableList")
            .field("len", &self.len.load(Ordering::Relaxed))
            .field("done", &self.done)
            .field("subscriber_count", &self.dist.subscriber_count.load(Ordering::Relaxed))
            .finish()
    }
}

impl<T, Codec> Default for ObservableList<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: remoc::codec::Codec,
{
    fn default() -> Self {
        Self::from(Vec::new())
    }
}

impl<T, Codec> From<Vec<T>> for ObservableList<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: remoc::codec::Codec,
{
    fn from(initial: Vec<T>) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        let (sub_tx, sub_rx) = mpsc::unbounded_channel();
        let len = Arc::new(AtomicUsize::new(initial.len()));
        let subscriber_count = Arc::new(AtomicUsize::new(0));
        tokio::spawn(Self::task(initial, rx, sub_rx, subscriber_count.clone()));
        Self {
            tx,
            change: ChangeSender::new(),
            len: len.clone(),
            done: false,
            dist: ObservableListDistributor { tx: sub_tx, len, subscriber_count },
        }
    }
}

impl<T, Codec> ObservableList<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: remoc::codec::Codec,
{
    /// Creates a new, empty observable list.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sends a request to the task.
    fn req(&self, req: Req<T>) {
        if self.tx.send(req).is_err() {
            panic!("observable list task was terminated");
        }
    }

    /// Panics when `done` has been called.
    fn assert_not_done(&self) {
        if self.done {
            panic!("observable list cannot be changed after done has been called");
        }
    }

    /// Sets the error handler function that is called when sending an
    /// event fails.
    pub fn set_error_handler<E>(&mut self, on_err: E)
    where
        E: Fn(SendError) + Send + Sync + 'static,
    {
        self.req(Req::SetErrorHandler(Box::new(on_err)));
    }

    /// Returns an observable list distributor that can be used to make subscriptions
    /// to this observable list.
    ///
    /// It is clonable and can be sent to other tasks.
    pub fn distributor(&self) -> ObservableListDistributor<T, Codec> {
        self.dist.clone()
    }

    /// Subscribes to the observable list with incremental sending of the current contents.
    pub fn subscribe(&self) -> ListSubscription<T, Codec> {
        self.dist.subscribe()
    }

    /// Current number of subscribers.
    pub fn subscriber_count(&self) -> usize {
        self.dist.subscriber_count()
    }

    /// Returns a [change notifier](ChangeNotifier) that can be used *locally* to be
    /// notified of changes to this collection.
    pub fn notifier(&self) -> ChangeNotifier {
        self.change.subscribe()
    }

    /// Returns when all subscribers have quit.
    ///
    /// If no subscribers are currently present, this return immediately.
    /// This also returns when [done](Self::done) has been called and
    /// all subscribers have received all elements of the list.
    pub fn closed(&self) -> impl Future<Output = ()> {
        self.dist.closed()
    }

    /// Returns `true` if there are currently no subscribers.
    pub fn is_closed(&self) -> bool {
        self.dist.is_closed()
    }

    /// Appends an element at the end.
    ///
    /// A [ListEvent::Push] change event is sent.
    ///
    /// # Panics
    /// Panics when [done](Self::done) has been called before.
    pub fn push(&mut self, value: T) {
        self.assert_not_done();
        self.req(Req::Push(value));
        self.len.fetch_add(1, Ordering::Relaxed);
        self.change.notify();
    }

    /// The current number of elements in the observable list.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    /// Returns whether this observable list is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Prevents further changes of this list and notifies
    /// are subscribers that no further events will occur.
    ///
    /// Methods that modify the list will panic after this has been called.
    /// It is still possible to subscribe to this observable list.
    pub fn done(&mut self) {
        if !self.done {
            self.req(Req::Done);
            self.done = true;
        }
    }

    /// Returns `true` if [done](Self::done) has been called and further
    /// changes are prohibited.
    ///
    /// Methods that modify the list will panic in this case.
    pub fn is_done(&self) -> bool {
        self.done
    }

    /// Borrows the current value of the observable list.
    ///
    /// While the borrow is held sending of events to subscribers is paused.
    #[allow(clippy::needless_lifetimes)]
    pub async fn borrow<'a>(&'a self) -> ObservableListRef<'a, T> {
        let (tx, rx) = oneshot::channel();
        self.req(Req::Borrow(tx));
        ObservableListRef { buffer: rx.await.unwrap(), _phantom: PhantomData }
    }

    /// Event dispatch task.
    async fn task(
        buffer: Vec<T>, rx: mpsc::UnboundedReceiver<Req<T>>, dist_rx: mpsc::UnboundedReceiver<DistReq<T, Codec>>,
        subscriber_count: Arc<AtomicUsize>,
    ) {
        /// State of a subscription.
        struct SubState<T, Codec> {
            pos: usize,
            done: bool,
            tx: rch::mpsc::Sender<ListEvent<T>, Codec>,
        }

        let buffer_shared = Arc::new(Mutex::new(buffer));
        let mut buffer_guard_opt = None;
        let mut rx_opt = Some(rx);
        let mut dist_rx_opt = Some(dist_rx);
        let mut subs: Vec<SubState<T, Codec>> = Vec::new();
        let mut done = false;
        let mut on_err: Box<dyn Fn(SendError) + Send + Sync + 'static> = Box::new(default_on_err);
        let mut no_sub_notify: Vec<oneshot::Sender<()>> = Vec::new();

        // Event and transmit loop.
        loop {
            // Obtain buffer mutex.
            let buffer = match &mut buffer_guard_opt {
                Some(br) => br,
                None => {
                    buffer_guard_opt = Some(buffer_shared.clone().lock_owned().await);
                    buffer_guard_opt.as_mut().unwrap()
                }
            };

            // If no more items can arrive, keep only subscription that have
            // not yet received all items.
            if rx_opt.is_none() {
                if done {
                    subs.retain(|sub| !sub.done);
                } else {
                    subs.retain(|sub| sub.pos < buffer.len());
                }
            }

            // Update subscriber counts.
            subscriber_count.store(subs.len(), Ordering::Relaxed);
            if subs.is_empty() {
                for tx in no_sub_notify.drain(..) {
                    let _ = tx.send(());
                }
            }

            // Create tasks that obtain send permits.
            let mut permit_tasks = Vec::new();
            for (i, sub) in subs.iter().enumerate() {
                if sub.pos < buffer.len() || (done && !sub.done) {
                    permit_tasks.push(async move { (i, sub.tx.reserve().await) }.boxed());
                }
            }

            // Check for termination.
            if rx_opt.is_none() && dist_rx_opt.is_none() && permit_tasks.is_empty() {
                break;
            }

            tokio::select! {
                biased;

                // Process request.
                req = async {
                    match &mut rx_opt {
                        Some(rx) => rx.recv().await,
                        None => future::pending().await,
                    }
                } => match req {
                    Some(Req::Push(v)) => buffer.push(v),
                    Some(Req::Done) => done = true,
                    Some(Req::Borrow(tx)) => {
                        let _ = tx.send(buffer_guard_opt.take().unwrap());
                    }
                    Some(Req::SetErrorHandler(handler)) => on_err = handler,
                    None => rx_opt = None,
                },

                // Process distribution request.
                req = async {
                    match &mut dist_rx_opt {
                        Some(rx) => rx.recv().await,
                        None => future::pending().await,
                    }
                } => match req {
                    Some(DistReq::Subscribe(tx)) => subs.push(SubState { pos: 0, done: false, tx }),
                    Some(DistReq::NotifyNoSubscribers(tx)) => no_sub_notify.push(tx),
                    None => dist_rx_opt = None,
                },

                // Process send permit ready.
                (i, res) = async move {
                    if permit_tasks.is_empty() {
                        future::pending().await
                    } else {
                        future::select_all(permit_tasks).await.0
                    }
                } => match res {
                    Ok(permit) => {
                        let sub = &mut subs[i];
                        if sub.pos < buffer.len() {
                            permit.send(ListEvent::Push(buffer[sub.pos].clone()));
                            sub.pos += 1;
                        } else if done && !sub.done {
                            permit.send(ListEvent::Done);
                            sub.done = true;
                        } else {
                            unreachable!()
                        }
                    }
                    Err(err) => {
                        subs.swap_remove(i);
                        if let Ok(err) = SendError::try_from(err) {
                            on_err(err);
                        }
                    }
                },
            }
        }
    }
}

impl<T, Codec> Drop for ObservableList<T, Codec> {
    fn drop(&mut self) {
        // empty
    }
}

impl<T, Codec> Extend<T> for ObservableList<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: remoc::codec::Codec,
{
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        for value in iter {
            self.push(value);
        }
    }
}

/// A reference to an observable list.
///
/// While this is held sending of events to subscribers is paused.
pub struct ObservableListRef<'a, T> {
    buffer: OwnedMutexGuard<Vec<T>>,
    _phantom: PhantomData<&'a ()>,
}

impl<'a, T> fmt::Debug for ObservableListRef<'a, T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.buffer.fmt(f)
    }
}

impl<'a, T> Deref for ObservableListRef<'a, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Self::Target {
        &*self.buffer
    }
}

struct MirroredListInner<T> {
    v: Vec<T>,
    complete: bool,
    done: bool,
    error: Option<RecvError>,
    max_size: usize,
}

impl<T> MirroredListInner<T> {
    fn handle_event(&mut self, event: ListEvent<T>) -> Result<(), RecvError> {
        match event {
            ListEvent::InitialComplete => {
                self.complete = true;
            }
            ListEvent::Push(v) => {
                self.v.push(v);
                if self.v.len() > self.max_size {
                    return Err(RecvError::MaxSizeExceeded(self.max_size));
                }
            }
            ListEvent::Done => {
                self.done = true;
            }
        }
        Ok(())
    }
}

/// Observable list subscription.
///
/// This can be sent to a remote endpoint via a [remote channel](remoc::rch).
/// Then, on the remote endpoint, [mirror](Self::mirror) can be used to build
/// and keep up-to-date a mirror of the observed list.
///
/// The event stream can also be processed event-wise using [recv](Self::recv).
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
pub struct ListSubscription<T, Codec = remoc::codec::Default> {
    /// Length of list at time of subscription.
    initial_len: usize,
    /// Initial value received completely.
    #[serde(skip, default)]
    complete: bool,
    /// Change events receiver.
    events: Option<rch::mpsc::Receiver<ListEvent<T>, Codec>>,
    /// Number of elements received so far.
    len: usize,
}

impl<T, Codec> ListSubscription<T, Codec>
where
    T: RemoteSend + Clone,
    Codec: remoc::codec::Codec,
{
    fn new(initial_len: usize, events: rch::mpsc::Receiver<ListEvent<T>, Codec>) -> Self {
        Self { initial_len, complete: false, events: Some(events), len: 0 }
    }

    /// Returns whether the initial value event or
    /// stream of events that build up the initial value
    /// has completed.
    pub fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns whether the observed list has indicated that no further
    /// change events will occur.
    pub fn is_done(&self) -> bool {
        self.events.is_none()
    }

    /// Receives the next change event.
    pub async fn recv(&mut self) -> Result<Option<ListEvent<T>>, RecvError> {
        // Provide initial value complete event.
        if self.len == self.initial_len && !self.complete {
            self.complete = true;
            return Ok(Some(ListEvent::InitialComplete));
        }

        // Provide change event.
        match &mut self.events {
            Some(rx) => match rx.recv().await? {
                Some(ListEvent::InitialComplete) => unreachable!(),
                Some(evt @ ListEvent::Push(_)) => {
                    self.len += 1;
                    Ok(Some(evt))
                }
                Some(ListEvent::Done) => {
                    self.events = None;
                    Ok(Some(ListEvent::Done))
                }
                None => Err(RecvError::Closed),
            },
            None => Ok(None),
        }
    }

    /// Receives the next item.
    ///
    /// `Ok(None)` is returned when all items have been received and the observed
    /// list has been marked as done.
    pub async fn recv_item(&mut self) -> Result<Option<T>, RecvError> {
        loop {
            match self.recv().await? {
                Some(ListEvent::InitialComplete) => (),
                Some(ListEvent::Push(item)) => return Ok(Some(item)),
                Some(ListEvent::Done) => (),
                None => return Ok(None),
            }
        }
    }
}

impl<T, Codec> ListSubscription<T, Codec>
where
    T: RemoteSend + Clone + Sync,
    Codec: remoc::codec::Codec,
{
    /// Mirror the list that this subscription is observing.
    ///
    /// `max_size` specifies the maximum allowed size of the mirrored collection.
    /// If this size is reached, processing of events is stopped and
    /// [RecvError::MaxSizeExceeded] is returned.
    pub fn mirror(mut self, max_size: usize) -> MirroredList<T> {
        let (changed_tx, changed_rx) = watch::channel(());
        let (dropped_tx, mut dropped_rx) = mpsc::channel(1);

        // Build initial state.
        let inner = Arc::new(RwLock::new(MirroredListInner {
            v: Vec::new(),
            complete: false,
            done: false,
            error: None,
            max_size,
        }));
        let inner_task = Arc::downgrade(&inner);

        // Process change events.
        tokio::spawn(async move {
            loop {
                let event = tokio::select! {
                    event = self.recv() => event,
                    _ = dropped_rx.recv() => return,
                };

                let inner = match inner_task.upgrade() {
                    Some(inner) => inner,
                    None => return,
                };
                let mut inner = inner.write().await;

                changed_tx.send_replace(());

                match event {
                    Ok(Some(event)) => {
                        if let Err(err) = inner.handle_event(event) {
                            inner.error = Some(err);
                            return;
                        }

                        if inner.done {
                            break;
                        }
                    }
                    Ok(None) => break,
                    Err(err) => {
                        inner.error = Some(err);
                        return;
                    }
                }
            }
        });

        MirroredList { inner: Some(inner), changed_rx, _dropped_tx: dropped_tx }
    }
}

/// An append-only list that is mirroring an observable append-only list.
///
/// Clones of this are cheap and share the same underlying mirrored list.
#[derive(Clone)]
pub struct MirroredList<T> {
    inner: Option<Arc<RwLock<MirroredListInner<T>>>>,
    changed_rx: watch::Receiver<()>,
    _dropped_tx: mpsc::Sender<()>,
}

impl<T> fmt::Debug for MirroredList<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("MirroredList").finish()
    }
}

impl<T> MirroredList<T>
where
    T: RemoteSend + Clone,
{
    /// Returns a reference to the current value of the list.
    ///
    /// Updates are paused while the read lock is held.
    ///
    /// This method returns an error if the observed list has been dropped
    /// or the connection to it failed before it was marked as done by calling
    /// [ObservableList::done].
    /// In this case the mirrored contents at the point of loss of connection
    /// can be obtained using [detach](Self::detach).
    pub async fn borrow(&self) -> Result<MirroredListRef<'_, T>, RecvError> {
        let inner = self.inner.as_ref().unwrap().read().await;
        match &inner.error {
            None => Ok(MirroredListRef(inner)),
            Some(err) => Err(err.clone()),
        }
    }

    /// Returns a reference to the current value of the list and marks it as seen.
    ///
    /// Thus [changed](Self::changed) will not return immediately until the value changes
    /// after this method returns.
    ///
    /// Updates are paused while the read lock is held.
    ///
    /// This method returns an error if the observed list has been dropped
    /// or the connection to it failed before it was marked as done by calling
    /// [ObservableList::done].
    /// In this case the mirrored contents at the point of loss of connection
    /// can be obtained using [detach](Self::detach).
    pub async fn borrow_and_update(&mut self) -> Result<MirroredListRef<'_, T>, RecvError> {
        let inner = self.inner.as_ref().unwrap().read().await;
        self.changed_rx.borrow_and_update();
        match &inner.error {
            None => Ok(MirroredListRef(inner)),
            Some(err) => Err(err.clone()),
        }
    }

    /// Stops updating the list and returns its current contents.
    ///
    /// If clones of this mirrored list exist, the cloned contents are returned.
    pub async fn detach(mut self) -> Vec<T> {
        match Arc::try_unwrap(self.inner.take().unwrap()) {
            Ok(inner) => inner.into_inner().v,
            Err(inner) => inner.read().await.v.clone(),
        }
    }

    /// Waits for a change (append of one or more elements) and marks the newest value as seen.
    ///
    /// This also returns when connection to the observed list has been lost
    /// or the list has been marked as done.
    pub async fn changed(&mut self) {
        let _ = self.changed_rx.changed().await;
    }

    /// Waits for the observed list to be marked as done and for all elements to
    /// be received by this mirror of it.
    ///
    /// This marks changes as seen, even when aborted.
    pub async fn done(&mut self) -> Result<(), RecvError> {
        while !self.borrow_and_update().await?.is_done() {
            self.changed().await;
        }
        Ok(())
    }
}

impl<T> Drop for MirroredList<T> {
    fn drop(&mut self) {
        // empty
    }
}

/// A snapshot view of an observable append-only list.
pub struct MirroredListRef<'a, T>(RwLockReadGuard<'a, MirroredListInner<T>>);

impl<'a, T> MirroredListRef<'a, T> {
    /// Returns `true` if the mirror list has reached the length of the observed
    /// list at the time of subscription.
    pub fn is_complete(&self) -> bool {
        self.0.complete
    }

    /// Returns `true` if the observed list has been marked as done by calling
    /// [ObservableList::done] and thus no further changes can occur.
    pub fn is_done(&self) -> bool {
        self.0.done
    }
}

impl<'a, T> fmt::Debug for MirroredListRef<'a, T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.0.v.fmt(f)
    }
}

impl<'a, T> Deref for MirroredListRef<'a, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Self::Target {
        &self.0.v
    }
}

impl<'a, T> Drop for MirroredListRef<'a, T> {
    fn drop(&mut self) {
        // required for drop order
    }
}