use dbuff::{SharedDomainData, StreamState, StreamStatus};
use futures_util::stream;
use rpds::VectorSync;
use std::time::Duration;
#[derive(Clone, Default)]
struct AppData {
completion: StreamStatus<VectorSync<String>>,
audio_status: StreamStatus<()>,
}
#[tokio::main]
async fn main() {
let (domain, wh) = SharedDomainData::with_coalesce(AppData::default(), Duration::from_millis(1));
tokio::spawn(wh.run());
let rt = tokio::runtime::Handle::current();
let tokens: Vec<String> = vec![
"Hello".into(),
",".into(),
" world".into(),
"!".into(),
];
let token_stream = stream::unfold((tokens, 0), |(tokens, i)| async move {
if i < tokens.len() {
tokio::time::sleep(Duration::from_millis(200)).await;
Some((Ok::<_, std::io::Error>(tokens[i].clone()), (tokens, i + 1)))
} else {
None
}
});
let _token_handle = domain.stream(rt.clone())
.try_from_stream(token_stream)
.into(
|d: &mut AppData, s: StreamStatus<VectorSync<String>>| d.completion = s,
VectorSync::new_sync(),
|buf: &mut VectorSync<String>, token: String| *buf = buf.push_back(token),
)
.go();
println!("=== LLM Streaming ===");
loop {
let status = &domain.read().completion;
match status.state() {
StreamState::Idle => println!(" idle"),
StreamState::Streaming => {
if let Some(buf) = status.buffer() {
let text: String = buf.iter().cloned().collect();
println!(" streaming: \"{text}\"");
}
}
StreamState::Completed => {
if let Some(buf) = status.buffer() {
let text: String = buf.iter().cloned().collect();
println!(" completed: \"{text}\"");
assert_eq!(buf.len(), 4);
}
break;
}
StreamState::Error(e) => {
panic!("unexpected error: {e}");
}
StreamState::Aborted => {
panic!("unexpected abort");
}
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
domain.modify(|d: &mut AppData| d.completion = StreamStatus::idle());
let frames: Vec<u32> = (0..50).collect();
let frame_stream = stream::iter(frames);
let _audio_handle = domain.stream(rt.clone())
.from_stream(frame_stream)
.into(
|d: &mut AppData, s: StreamStatus<()>| d.audio_status = s,
(),
|_buf: &mut (), _frame: u32| {
},
)
.batch(Duration::from_millis(5))
.go();
println!("\n=== Audio Discard Mode ===");
loop {
let status = &domain.read().audio_status;
match status.state() {
StreamState::Idle => println!(" idle"),
StreamState::Streaming => {
println!(" streaming...");
}
StreamState::Completed => {
println!(" completed");
break;
}
StreamState::Error(e) => {
panic!("unexpected error: {e}");
}
StreamState::Aborted => {
panic!("unexpected abort");
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let items: Vec<i32> = vec![1, 2, 3, 4, 5];
let item_stream = stream::iter(items);
let handle = domain.stream(rt)
.from_stream(item_stream)
.go();
println!("\n=== Fire-and-Forget ===");
handle.await.unwrap();
println!(" stream consumed (no domain writes)");
println!("\nAll examples passed!");
}