dbuff 0.1.0

Double-buffered state with async command chains, streaming, and keyed task pools for ratatui applications
Documentation
use dbuff::{SharedDomainData, StreamState, StreamStatus};
use futures_util::stream;
use rpds::VectorSync;
use std::time::Duration;

/// Domain state handed to `SharedDomainData` and updated by the streaming examples.
#[derive(Default, Clone)]
struct AppData {
    /// Status of the audio stream. Its buffer type is `()`, so no frame data is retained.
    audio_status: StreamStatus<()>,
    /// Status of the LLM-style stream; its buffer collects the tokens received so far.
    completion: StreamStatus<VectorSync<String>>,
}

#[tokio::main]
async fn main() {
    let (domain, wh) = SharedDomainData::with_coalesce(AppData::default(), Duration::from_millis(1));
    tokio::spawn(wh.run());
    let rt = tokio::runtime::Handle::current();

    // -------------------------------------------------------------------------
    // Example 1: LLM-style token streaming (accumulate into domain)
    // -------------------------------------------------------------------------

    // Simulated LLM stream: yields tokens one by one
    let tokens: Vec<String> = vec![
        "Hello".into(),
        ",".into(),
        " world".into(),
        "!".into(),
    ];
    let token_stream = stream::unfold((tokens, 0), |(tokens, i)| async move {
        if i < tokens.len() {
            tokio::time::sleep(Duration::from_millis(200)).await;
            Some((Ok::<_, std::io::Error>(tokens[i].clone()), (tokens, i + 1)))
        } else {
            None
        }
    });

    let _token_handle = domain.stream(rt.clone())
        .try_from_stream(token_stream)
        .into(
            |d: &mut AppData, s: StreamStatus<VectorSync<String>>| d.completion = s,
            VectorSync::new_sync(),
            |buf: &mut VectorSync<String>, token: String| *buf = buf.push_back(token),
        )
        .go();

    // Poll for completion
    println!("=== LLM Streaming ===");
    loop {
        let status = &domain.read().completion;
        match status.state() {
            StreamState::Idle => println!("  idle"),
            StreamState::Streaming => {
                if let Some(buf) = status.buffer() {
                    let text: String = buf.iter().cloned().collect();
                    println!("  streaming: \"{text}\"");
                }
            }
            StreamState::Completed => {
                if let Some(buf) = status.buffer() {
                    let text: String = buf.iter().cloned().collect();
                    println!("  completed: \"{text}\"");
                    assert_eq!(buf.len(), 4);
                }
                break;
            }
            StreamState::Error(e) => {
                panic!("unexpected error: {e}");
            }
            StreamState::Aborted => {
                panic!("unexpected abort");
            }
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    // -------------------------------------------------------------------------
    // Example 2: Discard mode (process items but don't accumulate in buffer)
    // -------------------------------------------------------------------------

    // Reset completion status for reuse
    domain.modify(|d: &mut AppData| d.completion = StreamStatus::idle());

    // Simulated audio frames: we process them (count) but don't store them
    let frames: Vec<u32> = (0..50).collect();
    let frame_stream = stream::iter(frames);

    let _audio_handle = domain.stream(rt.clone())
        .from_stream(frame_stream)
        .into(
            |d: &mut AppData, s: StreamStatus<()>| d.audio_status = s,
            (),
            |_buf: &mut (), _frame: u32| {
                // In real code, this would call an audio processor.
                // Here we just discard the frame.
            },
        )
        .batch(Duration::from_millis(5))
        .go();

    println!("\n=== Audio Discard Mode ===");
    loop {
        let status = &domain.read().audio_status;
        match status.state() {
            StreamState::Idle => println!("  idle"),
            StreamState::Streaming => {
                println!("  streaming...");
            }
            StreamState::Completed => {
                println!("  completed");
                break;
            }
            StreamState::Error(e) => {
                panic!("unexpected error: {e}");
            }
            StreamState::Aborted => {
                panic!("unexpected abort");
            }
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // -------------------------------------------------------------------------
    // Example 3: Fire-and-forget (no domain writes at all)
    // -------------------------------------------------------------------------

    let items: Vec<i32> = vec![1, 2, 3, 4, 5];
    let item_stream = stream::iter(items);

    let handle = domain.stream(rt)
        .from_stream(item_stream)
        .go();

    println!("\n=== Fire-and-Forget ===");
    handle.await.unwrap();
    println!("  stream consumed (no domain writes)");

    println!("\nAll examples passed!");
}