// dbuff 0.1.0
//
// Double-buffered state with async command chains, streaming, and keyed task pools for ratatui
// applications.
// Documentation
use dbuff::{SharedDomainData, StreamState, StreamStatus};
use futures_util::stream;
use rpds::VectorSync;
use std::time::Duration;

/// Domain state for this example: the audio frames accumulated so far,
/// wrapped in the status of the stream that produces them.
#[derive(Clone, Default)]
struct AudioData {
    /// Stream status carrying the persistent frame buffer; `main` installs a
    /// setter that overwrites this field with each status the stream reports.
    audio: StreamStatus<VectorSync<u32>>,
}

/// Streams simulated audio frames into the shared domain state and polls the
/// accumulated buffer until the stream reports completion.
///
/// # Panics
///
/// Panics if the stream ends in the `Error` or `Aborted` state, or if the
/// completed buffer does not hold exactly `total_frames` frames.
#[tokio::main]
async fn main() {
    let (domain, wh) =
        SharedDomainData::with_coalesce(AudioData::default(), Duration::from_millis(1));
    tokio::spawn(wh.run());
    let rt = tokio::runtime::Handle::current();

    // Simulate 1,000 audio frames arriving at high frequency.
    // In a real application, these would come from an audio source (e.g., a microphone
    // or audio file decoder) at 48kHz. Here we use a short sleep to simulate pacing.
    //
    // We use rpds::VectorSync<u32> as the buffer type. Unlike Vec, it is a persistent
    // vector with structural sharing, so cloning it does not copy the elements. This
    // keeps the buffer.clone() per flush cheap even as the buffer grows large.
    let total_frames: u32 = 1_000;
    let frame_stream = stream::unfold(0u32, move |i| async move {
        if i >= total_frames {
            None
        } else {
            // Simulate ~10kHz frame rate (100µs between frames).
            tokio::time::sleep(Duration::from_micros(100)).await;
            Some((i, i + 1))
        }
    });

    let _handle = domain
        .stream(rt)
        .from_stream(frame_stream)
        .into(
            |d: &mut AudioData, s: StreamStatus<VectorSync<u32>>| d.audio = s,
            VectorSync::new_sync(),
            |buf: &mut VectorSync<u32>, frame: u32| {
                // The vector is persistent — push_back returns a new vector that
                // shares structure with the old one, so we store the result back.
                *buf = buf.push_back(frame);
            },
        )
        .batch(Duration::from_millis(5))
        .go();

    // Poll every 20ms (~50fps, roughly a UI tick) and watch the buffer grow.
    let poll_interval = Duration::from_millis(20);

    println!("=== High-Frequency Audio Accumulation ===");
    println!(
        "Streaming {total_frames} frames with 5ms batch interval...\n",
    );

    loop {
        // Inspect the current state inside its own scope so the value returned by
        // `read()` is dropped before we sleep instead of being held across the await.
        let finished = {
            let snapshot = domain.read();
            let status = &snapshot.audio;
            // Bind the result before leaving the block so the match's temporaries
            // are released ahead of `snapshot`.
            let done = match status.state() {
                StreamState::Idle => {
                    println!("  idle");
                    false
                }
                StreamState::Streaming => {
                    if let Some(buf) = status.buffer() {
                        // An empty buffer is reported as 0..0.
                        let first = buf.first().copied().unwrap_or(0);
                        let last = buf.last().copied().unwrap_or(0);
                        println!("  streaming: {} frames ({}..{})", buf.len(), first, last);
                    }
                    false
                }
                StreamState::Completed => {
                    if let Some(buf) = status.buffer() {
                        println!("  completed: {} frames accumulated", buf.len());
                        assert_eq!(buf.len(), total_frames as usize);
                    }
                    true
                }
                StreamState::Error(e) => {
                    panic!("unexpected error: {e}");
                }
                StreamState::Aborted => {
                    panic!("unexpected abort");
                }
            };
            done
        };

        if finished {
            break;
        }
        tokio::time::sleep(poll_interval).await;
    }

    println!("\nHigh-performance streaming example passed!");
    println!("Key takeaway: rpds::Vector makes buffer.clone() O(log n) per flush,");
    println!("so even with 1,000 accumulated frames, each batch flush is cheap.");
}