dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use std::sync::Arc;
use std::time::Duration;

use arc_swap::ArcSwap;
use kanal::unbounded;

use crate::command::OnComplete;

type Update<D> = Box<dyn FnOnce(&mut D) + Send>;
type UpdateSender<D> = kanal::Sender<Update<D>>;
type UpdateReceiver<D> = kanal::Receiver<Update<D>>;
type Setter<D, T> = Arc<dyn Fn(&mut D, T) + Send + Sync>;

/// Handle to the write side of a [`SharedDomainData`] double buffer.
///
/// Returned by [`SharedDomainData::new`] or [`SharedDomainData::with_coalesce`].
/// Must be spawned as a tokio task via [`run`](DomainWriteHandle::run) at startup
/// to process pending updates.
///
/// The write task exits automatically when all [`SharedDomainData`] clones are
/// dropped (the underlying channel closes).
///
/// # Batching
///
/// Updates are coalesced into batches to minimize cloning and `ArcSwap` store
/// operations. After the first update arrives, the task drains any
/// immediately-available updates, waits for the configured coalesce interval,
/// drains again, then applies the entire batch in a single clone → apply → store
/// cycle.
pub struct DomainWriteHandle<D> {
    swap: Arc<ArcSwap<D>>,
    rx: UpdateReceiver<D>,
    coalesce_interval: Duration,
}

impl<D> DomainWriteHandle<D>
where
    D: Clone + Send + 'static,
{
    /// Run the write loop until the channel closes.
    ///
    /// Spawn this as a tokio task: `tokio::spawn(write_handle.run())`.
    ///
    /// Each iteration:
    /// 1. Awaits the first pending update (blocks).
    /// 2. Drains all immediately-available updates (non-blocking).
    /// 3. Sleeps for the [`coalesce_interval`](SharedDomainData::with_coalesce).
    /// 4. Drains again for any stragglers.
    /// 5. Clones the current data **once**, applies all closures, stores **once**.
    ///
    /// The loop exits when all [`SharedDomainData`] senders are dropped.
    pub async fn run(self) {
        loop {
            let Ok(first) = self.rx.as_async().recv().await else {
                return;
            };

            let mut batch: Vec<Update<D>> = vec![first];

            while let Ok(Some(update)) = self.rx.try_recv() {
                batch.push(update);
            }

            tokio::time::sleep(self.coalesce_interval).await;

            while let Ok(Some(update)) = self.rx.try_recv() {
                batch.push(update);
            }

            let current = self.swap.load();
            let mut new_data = (**current).clone();
            drop(current);
            for update in batch {
                update(&mut new_data);
            }
            self.swap.store(Arc::new(new_data));
        }
    }
}

/// Lock-free, double-buffered shared state with batched writes.
///
/// Reads via [`read`](SharedDomainData::read) are instantaneous — they return
/// a guard that dereferences to `&D` without acquiring any lock.
///
/// Writes go through an unbounded channel to a dedicated async task
/// ([`DomainWriteHandle`]) that batches and applies them. This means
/// [`modify`](SharedDomainData::modify), [`handler`](SharedDomainData::handler),
/// and [`handler_noop`](SharedDomainData::handler_noop) are non-blocking and
/// safe to call from sync contexts.
///
/// # Type parameters
///
/// - `D`: The domain data type. Must be `Clone` (the write task clones before
///   applying updates) and `Send + 'static` (sent across the channel).
///
/// # Example
///
/// ```
/// use dbuff::*;
/// use std::time::Duration;
///
/// #[derive(Clone, Default)]
/// struct MyData { count: i32 }
///
/// # async fn example() {
/// let (domain, write_handle) = SharedDomainData::new(MyData::default());
/// tokio::spawn(write_handle.run());
///
/// domain.modify(|data| data.count += 1);
///
/// tokio::time::sleep(Duration::from_millis(1)).await;
/// assert_eq!(domain.read().count, 1);
/// # }
/// ```
#[derive(Debug)]
pub struct SharedDomainData<D> {
    swap: Arc<ArcSwap<D>>,
    write_tx: UpdateSender<D>,
}

impl<D> Clone for SharedDomainData<D>
where
    D: Clone,
{
    fn clone(&self) -> Self {
        Self {
            swap: self.swap.clone(),
            write_tx: self.write_tx.clone(),
        }
    }
}

impl<D> SharedDomainData<D>
where
    D: Clone + Send + Sync + 'static,
{
    /// Create a new shared data instance with the default 500µs coalesce interval.
    ///
    /// Returns the data handle and a [`DomainWriteHandle`] that must be spawned
    /// as a tokio task. See [`with_coalesce`](Self::with_coalesce) for a
    /// configurable interval.
    pub fn new(initial: D) -> (Self, DomainWriteHandle<D>) {
        Self::with_coalesce(initial, Duration::from_micros(500))
    }

    /// Create a new shared data instance with a custom coalesce interval.
    ///
    /// `coalesce_interval` controls how long the write task waits after the
    /// first update arrives before applying the batch. Shorter intervals
    /// reduce latency; longer intervals improve throughput by coalescing
    /// more updates into a single clone+store cycle.
    ///
    /// Returns the data handle and a [`DomainWriteHandle`] that must be spawned
    /// as a tokio task.
    pub fn with_coalesce(initial: D, coalesce_interval: Duration) -> (Self, DomainWriteHandle<D>) {
        let (tx, rx) = unbounded();
        let swap = Arc::new(ArcSwap::from_pointee(initial));
        (
            Self {
                swap: swap.clone(),
                write_tx: tx,
            },
            DomainWriteHandle {
                swap,
                rx,
                coalesce_interval,
            },
        )
    }

    /// Read the current snapshot of domain data.
    ///
    /// Returns an [`arc_swap::Guard`] that dereferences to `&D`. The guard
    /// is valid until the next write is applied. Reads are lock-free.
    pub fn read(&self) -> arc_swap::Guard<Arc<D>> {
        self.swap.load()
    }

    /// Enqueue an arbitrary mutation to be applied by the write task.
    ///
    /// The closure `f` receives `&mut D` and is executed asynchronously on the
    /// write task. This method is non-blocking — it sends the closure over
    /// the channel and returns immediately.
    ///
    /// The mutation will be batched with other pending updates within the
    /// coalesce window.
    pub fn modify(&self, f: impl FnOnce(&mut D) + Send + 'static) {
        let _ = self.write_tx.send(Box::new(f));
    }

    pub fn handler<T>(
        &self,
        setter: impl Fn(&mut D, T) + Send + Sync + 'static,
    ) -> DomainHandler<D, T> {
        DomainHandler {
            write_tx: self.write_tx.clone(),
            setter: Arc::new(setter),
        }
    }

    pub fn handler_noop<T>(&self) -> DomainHandler<D, T>
    where
        T: Send + 'static,
    {
        self.handler(|_, _| {})
    }

    /// Create a domain-bound executor for ergonomic command chaining.
    ///
    /// Returns a [`DomainExecutor`] that has the domain pre-bound, so commands
    /// can be added with inline setters without pre-creating or cloning handlers.
    ///
    /// # Example
    ///
    /// ```
    /// use dbuff::*;
    /// use std::time::Duration;
    ///
    /// # #[derive(Clone, Default)]
    /// # struct Data { total: i32 }
    /// # #[derive(Debug, Clone, wherror::Error)]
    /// # #[error(debug)]
    /// # struct E;
    /// # struct Add(i32);
    /// # #[async_trait::async_trait]
    /// # impl Command<()> for Add {
    /// #     type Output = i32;
    /// #     type Error = E;
    /// #     async fn execute(self, _: ()) -> Result<Self::Output, Self::Error> { Ok(self.0) }
    /// # }
    /// # async fn example() {
    /// # let (domain, wh) = SharedDomainData::with_coalesce(Data::default(), Duration::from_millis(1));
    /// # tokio::spawn(wh.run());
    /// # let rt = tokio::runtime::Handle::current();
    ///
    /// domain.bind((), rt)
    ///     .exec(Add(10), |d, v: &i32| d.total += *v)
    ///     .exec(Add(20), |d, v: &i32| d.total += *v)
    ///     .go();
    ///
    /// tokio::time::sleep(Duration::from_millis(10)).await;
    /// assert_eq!(domain.read().total, 30);
    /// # }
    /// ```
    pub fn bind<S>(
        &self,
        services: S,
        rt: tokio::runtime::Handle,
    ) -> crate::executor::DomainExecutor<D, S>
    where
        S: Clone + Send + 'static,
    {
        crate::executor::DomainExecutor {
            domain: self.clone(),
            services,
            rt,
            error_handler: None,
        }
    }

    /// Create a domain-bound stream executor for ergonomic stream consumption.
    ///
    /// Returns a [`StreamExecutor`](crate::StreamExecutor) that can be
    /// configured with a stream source, optional buffer strategy, and
    /// optional batch interval.
    ///
    /// # Example
    ///
    /// ```
    /// use dbuff::*;
    /// use std::time::Duration;
    ///
    /// # #[derive(Clone, Default)]
    /// # struct Data { tokens: StreamStatus<Vec<String>> }
    /// # async fn example() {
    /// # let (domain, wh) = SharedDomainData::with_coalesce(Data::default(), Duration::from_millis(1));
    /// # tokio::spawn(wh.run());
    /// # let rt = tokio::runtime::Handle::current();
    ///
    /// domain.stream(rt)
    ///     .from_stream(futures_util::stream::empty())
    ///     .into(
    ///         |d, s: StreamStatus<Vec<String>>| d.tokens = s,
    ///         Vec::new(),
    ///         |buf, token: String| buf.push(token),
    ///     )
    ///     .go();
    /// # }
    /// ```
    pub fn stream(&self, rt: tokio::runtime::Handle) -> crate::stream::StreamExecutor<D> {
        crate::stream::StreamExecutor {
            domain: self.clone(),
            rt,
        }
    }
}

/// A handle that writes results back into [`SharedDomainData`].
///
/// Holds a reference to the shared data and a setter closure that determines
/// which field receives the completed value.
pub struct DomainHandler<D, T> {
    write_tx: UpdateSender<D>,
    setter: Setter<D, T>,
}

impl<D, T> Clone for DomainHandler<D, T> {
    fn clone(&self) -> Self {
        Self {
            write_tx: self.write_tx.clone(),
            setter: self.setter.clone(),
        }
    }
}

impl<D, T> OnComplete<T> for DomainHandler<D, T>
where
    D: Send + 'static,
    T: Send + 'static,
{
    fn run(self, value: T) {
        let setter = self.setter;
        let update: Update<D> = Box::new(move |data: &mut D| {
            setter(data, value);
        });
        let _ = self.write_tx.send(update);
    }
}