use dbuff::{SharedDomainData, StreamState, StreamStatus};
use futures_util::stream;
use rpds::VectorSync;
use std::time::Duration;
/// Domain state for this example.
///
/// Holds a single field: the status of the audio frame stream, whose buffer type
/// is a persistent `VectorSync<u32>` of frame values.
#[derive(Default, Clone)]
struct AudioData {
    /// Status of the frame stream, written by the stream's update callback in `main`.
    audio: StreamStatus<VectorSync<u32>>,
}
#[tokio::main]
async fn main() {
let (domain, wh) =
SharedDomainData::with_coalesce(AudioData::default(), Duration::from_millis(1));
tokio::spawn(wh.run());
let rt = tokio::runtime::Handle::current();
let total_frames: u32 = 1_000;
let frame_stream = stream::unfold(0u32, move |i| async move {
if i >= total_frames {
None
} else {
tokio::time::sleep(Duration::from_micros(100)).await;
Some((i, i + 1))
}
});
let _handle = domain
.stream(rt)
.from_stream(frame_stream)
.into(
|d: &mut AudioData, s: StreamStatus<VectorSync<u32>>| d.audio = s,
VectorSync::new_sync(),
|buf: &mut VectorSync<u32>, frame: u32| {
*buf = buf.push_back(frame);
},
)
.batch(Duration::from_millis(5))
.go();
println!("=== High-Frequency Audio Accumulation ===");
println!(
"Streaming {total_frames} frames with 5ms batch interval...\n",
);
loop {
let status = &domain.read().audio;
match status.state() {
StreamState::Idle => println!(" idle"),
StreamState::Streaming => {
if let Some(buf) = status.buffer() {
let first = buf.first().copied().unwrap_or(0);
let last = buf.get(buf.len().saturating_sub(1)).copied().unwrap_or(0);
println!(" streaming: {} frames ({}..{})", buf.len(), first, last);
}
}
StreamState::Completed => {
if let Some(buf) = status.buffer() {
println!(" completed: {} frames accumulated", buf.len());
assert_eq!(buf.len(), total_frames as usize);
}
break;
}
StreamState::Error(e) => {
panic!("unexpected error: {e}");
}
StreamState::Aborted => {
panic!("unexpected abort");
}
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
println!("\nHigh-performance streaming example passed!");
println!("Key takeaway: rpds::Vector makes buffer.clone() O(log n) per flush,");
println!("so even with 1,000 accumulated frames, each batch flush is cheap.");
}