dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;

use crate::domain::SharedDomainData;
use crate::stream_status::StreamStatus;

// ---------------------------------------------------------------------------
// Type aliases
// ---------------------------------------------------------------------------

type StreamStatusSetter<D, T> = Arc<dyn Fn(&mut D, StreamStatus<T>) + Send + Sync>;
type StreamItemCallback<T, I> = Arc<dyn Fn(&mut T, I) + Send + Sync>;

// ---------------------------------------------------------------------------
// StreamHandle
// ---------------------------------------------------------------------------

/// Handle to a spawned stream consumer task.
///
/// Returned by [`StreamSource::go`] or [`StreamConfig::go`]. Wraps a
/// [`tokio::task::JoinHandle`] and implements [`Future`] so it works with
/// [`tokio::select!`]. Supports [`.abort()`](StreamHandle::abort) for
/// graceful cancellation.
pub struct StreamHandle {
    cancel: tokio::sync::watch::Sender<bool>,
    inner: tokio::task::JoinHandle<()>,
}

impl StreamHandle {
    /// Abort the stream consumer.
    ///
    /// Sends a cancellation signal so the task can transition the domain
    /// status to [`StreamState::Aborted`] before exiting, then cancels
    /// the underlying tokio task.
    pub fn abort(&self) {
        let _ = self.cancel.send(true);
        self.inner.abort();
    }
}

impl Future for StreamHandle {
    type Output = Result<(), tokio::task::JoinError>;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: StreamHandle only contains cancel (Send + Sync) and inner
        // (JoinHandle). We project to the inner JoinHandle which is safe to
        // pin-project because it is structurally pinned.
        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
        inner.poll(cx)
    }
}

impl Deref for StreamHandle {
    type Target = tokio::task::JoinHandle<()>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

// ---------------------------------------------------------------------------
// StreamExecutor
// ---------------------------------------------------------------------------

/// A domain-bound executor for consuming streams with ergonomic configuration.
///
/// Created via [`SharedDomainData::stream`]. Provide a stream source with
/// [`from_stream`](StreamExecutor::from_stream) or
/// [`try_from_stream`](StreamExecutor::try_from_stream), optionally configure
/// domain writes with [`.into()`](StreamSource::into), and finish with
/// [`.go()`](StreamSource::go) to spawn the consumer task.
///
/// # Type parameters
///
/// - `D`: The domain data type.
pub struct StreamExecutor<D> {
    pub(crate) domain: SharedDomainData<D>,
    pub(crate) rt: tokio::runtime::Handle,
}

impl<D> StreamExecutor<D>
where
    D: Clone + Send + Sync + 'static,
{
    /// Consume an infallible stream.
    ///
    /// All items yielded by the stream are processed. The stream ends
    /// naturally when the source returns `None`, transitioning to
    /// [`StreamState::Completed`].
    pub fn from_stream<St, I>(self, stream: St) -> StreamSource<D, I>
    where
        St: Stream<Item = I> + Send + 'static,
        I: Send + 'static,
    {
        StreamSource {
            domain: self.domain,
            rt: self.rt,
            stream: StreamKind::Infallible(Box::pin(stream)),
        }
    }

    /// Consume a fallible stream that yields `Result` items.
    ///
    /// `Ok(item)` items are processed normally. On `Err(e)`, the stream
    /// consumer transitions to [`StreamState::Error`] with the error message
    /// and stops. The buffer retains all items accumulated before the error.
    pub fn try_from_stream<St, I, E>(self, stream: St) -> StreamSource<D, I>
    where
        St: Stream<Item = Result<I, E>> + Send + 'static,
        I: Send + 'static,
        E: std::fmt::Display + Send + 'static,
    {
        let mapped = stream.map_err(|e: E| e.to_string());
        StreamSource {
            domain: self.domain,
            rt: self.rt,
            stream: StreamKind::Fallible(Box::pin(mapped)),
        }
    }
}

// ---------------------------------------------------------------------------
// StreamKind (internal)
// ---------------------------------------------------------------------------

/// Internal representation that unifies infallible and fallible streams.
///
/// Both variants conceptually yield `Option<Result<I, String>>`:
/// - `Infallible`: items are always `Ok`
/// - `Fallible`: items can be `Ok` or `Err`
enum StreamKind<I> {
    Infallible(Pin<Box<dyn Stream<Item = I> + Send>>),
    Fallible(Pin<Box<dyn Stream<Item = Result<I, String>> + Send>>),
}

impl<I> StreamKind<I> {
    /// Poll the next item, returning `Option<Result<I, String>>`.
    ///
    /// Uses a manually pinned future to avoid needing `StreamExt` at the
    /// call site. The `StreamExt` import is only needed for `map_err` in
    /// `try_from_stream`.
    async fn next(&mut self) -> Option<Result<I, String>> {
        match self {
            StreamKind::Infallible(s) => s.next().await.map(Ok),
            StreamKind::Fallible(s) => s.next().await,
        }
    }
}

// ---------------------------------------------------------------------------
// StreamSource
// ---------------------------------------------------------------------------

/// A configured stream source ready for domain binding or fire-and-forget.
///
/// Created by [`StreamExecutor::from_stream`] or
/// [`StreamExecutor::try_from_stream`]. Call [`.into()`](StreamSource::into)
/// to configure domain writes, or [`.go()`](StreamSource::go) for
/// fire-and-forget consumption.
///
/// # Type parameters
///
/// - `D`: The domain data type.
/// - `I`: The stream item type (the `Ok` variant for fallible streams).
pub struct StreamSource<D, I> {
    domain: SharedDomainData<D>,
    rt: tokio::runtime::Handle,
    stream: StreamKind<I>,
}

impl<D, I> StreamSource<D, I>
where
    D: Clone + Send + Sync + 'static,
    I: Send + 'static,
{
    /// Configure domain writes and buffer strategy.
    ///
    /// - `setter`: called via [`SharedDomainData::modify`] with the current
    ///   [`StreamStatus<T>`] at each state transition or flush.
    /// - `initial`: the starting buffer value. Becomes `Some(initial)` when
    ///   the stream transitions to [`StreamState::Streaming`].
    /// - `on_item`: called for each stream item to update the local buffer.
    ///   The buffer is flushed to domain state periodically (see [`.batch()`](StreamConfig::batch)).
    ///
    /// Returns a [`StreamConfig`] that can be further configured with
    /// [`.batch()`](StreamConfig::batch) and finished with [`.go()`](StreamConfig::go).
    pub fn into<T>(
        self,
        setter: impl Fn(&mut D, StreamStatus<T>) + Send + Sync + 'static,
        initial: T,
        on_item: impl Fn(&mut T, I) + Send + Sync + 'static,
    ) -> StreamConfig<D, I, T>
    where
        T: Clone + Send + 'static,
    {
        StreamConfig {
            domain: self.domain,
            rt: self.rt,
            stream: self.stream,
            setter: Arc::new(setter),
            on_item: Arc::new(on_item),
            initial,
            batch_interval: None,
        }
    }

    /// Consume the stream with no domain writes (fire-and-forget).
    ///
    /// Items are consumed from the source but not written to domain state.
    /// Returns a [`StreamHandle`] for abort or completion monitoring.
    ///
    /// For fallible streams (created via [`try_from_stream`](StreamExecutor::try_from_stream)),
    /// consumption stops on the first error.
    pub fn go(self) -> StreamHandle {
        let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);

        let inner = self.rt.spawn(async move {
            let mut stream = self.stream;

            loop {
                tokio::select! {
                    biased;
                    _ = cancel_rx.changed() => {
                        break;
                    }
                    item = stream.next() => {
                        match item {
                            Some(Ok(_)) => {}
                            Some(Err(_)) | None => {
                                break;
                            }
                        }
                    }
                }
            }
        });

        StreamHandle {
            cancel: cancel_tx,
            inner,
        }
    }
}

// ---------------------------------------------------------------------------
// StreamConfig
// ---------------------------------------------------------------------------

/// A fully configured stream consumer ready to spawn.
///
/// Created by [`StreamSource::into`]. Optionally call
/// [`.batch()`](StreamConfig::batch) for stream-level batching, then
/// [`.go()`](StreamConfig::go) to spawn the consumer task.
///
/// # Type parameters
///
/// - `D`: The domain data type.
/// - `I`: The stream item type.
/// - `T`: The buffer type stored in [`StreamStatus<T>`].
pub struct StreamConfig<D, I, T> {
    domain: SharedDomainData<D>,
    rt: tokio::runtime::Handle,
    stream: StreamKind<I>,
    setter: StreamStatusSetter<D, T>,
    on_item: StreamItemCallback<T, I>,
    initial: T,
    batch_interval: Option<Duration>,
}

impl<D, I, T> StreamConfig<D, I, T>
where
    D: Clone + Send + Sync + 'static,
    I: Send + 'static,
    T: Clone + Send + 'static,
{
    /// Set the batch flush interval.
    ///
    /// When set, items are accumulated in a local buffer and flushed to
    /// domain state at most once per `interval`. This reduces pressure on
    /// the domain write channel for high-frequency streams (e.g., audio).
    ///
    /// When not set (default), every item triggers an immediate flush.
    #[must_use = "the builder must be consumed with `.go()` to spawn the stream"]
    pub fn batch(mut self, interval: Duration) -> Self {
        self.batch_interval = Some(interval);
        self
    }

    /// Spawn the stream consumer as a tokio task.
    ///
    /// The consumer loop:
    /// 1. Sets [`StreamState::Streaming`] with the initial buffer.
    /// 2. Polls items from the source, calling `on_item` for each.
    /// 3. Flushes the buffer to domain state (respecting batch interval).
    /// 4. On stream end: sets [`StreamState::Completed`].
    /// 5. On error: sets [`StreamState::Error`] with partial buffer.
    /// 6. On abort: sets [`StreamState::Aborted`] with partial buffer.
    ///
    /// Returns a [`StreamHandle`] for abort and completion monitoring.
    #[must_use = "the config must be consumed with `.go()` to spawn the stream"]
    pub fn go(self) -> StreamHandle {
        let (cancel_tx, mut cancel_rx) = tokio::sync::watch::channel(false);

        let StreamConfig {
            domain,
            rt,
            stream,
            setter,
            on_item,
            initial,
            batch_interval,
        } = self;

        let inner = rt.spawn(async move {
            let mut buffer = initial;
            let mut last_flush = std::time::Instant::now();

            // Transition: Idle → Streaming
            {
                let s = setter.clone();
                let status = StreamStatus::streaming(buffer.clone());
                domain.modify(move |d| s(d, status));
            }

            let mut stream = stream;

            loop {
                tokio::select! {
                    biased;
                    _ = cancel_rx.changed() => {
                        // Transition: → Aborted
                        let s = setter.clone();
                        let status = StreamStatus::aborted(buffer.clone());
                        domain.modify(move |d| s(d, status));
                        break;
                    }
                    item = stream.next() => {
                        match item {
                            Some(Ok(item)) => {
                                on_item(&mut buffer, item);

                                let should_flush = match batch_interval {
                                    Some(interval) => last_flush.elapsed() >= interval,
                                    None => true,
                                };

                                if should_flush {
                                    last_flush = std::time::Instant::now();
                                    let s = setter.clone();
                                    let status = StreamStatus::streaming(buffer.clone());
                                    domain.modify(move |d| s(d, status));
                                }
                            }
                            Some(Err(e)) => {
                                // Transition: → Error (buffer retained)
                                let s = setter.clone();
                                let status = StreamStatus::error(&e, buffer.clone());
                                domain.modify(move |d| s(d, status));
                                break;
                            }
                            None => {
                                // Transition: → Completed
                                let s = setter.clone();
                                let status = StreamStatus::completed(buffer.clone());
                                domain.modify(move |d| s(d, status));
                                break;
                            }
                        }
                    }
                }
            }
        });

        StreamHandle {
            cancel: cancel_tx,
            inner,
        }
    }
}