use actix::Recipient;
use actix_rt;
use futures::FutureExt; use futures::SinkExt; use futures::StreamExt as FuturesStreamExt; use futures::channel::mpsc;
use std::marker::PhantomData;
use std::time::Duration;
use tokio::task::{self, LocalSet};
use vuo::Stream;
use vuo::stream::StreamMessage;
#[derive(Debug, Clone, PartialEq)]
struct DataEvent {
id: u32,
value: String,
}
fn main() {
let system = actix_rt::System::new();
system.block_on(async {
let local_set = LocalSet::new();
local_set.run_until(async {
println!("[Main] group_within Example: Starting");
let group_max_size = 3;
let group_max_duration = Duration::from_millis(500);
let (mut tx_emitter_mpsc, rx_emitter_mpsc) =
mpsc::unbounded::<StreamMessage<DataEvent>>();
let emitter_handle = task::spawn_local(async move {
println!("[Emitter] Starting emissions...");
for i in 0..group_max_size {
let event = DataEvent { id: i as u32, value: format!("Item-{}", i) };
println!("[Emitter] Sending id: {}", event.id);
if tx_emitter_mpsc.send(StreamMessage::Element(event)).await.is_err() {
println!("[Emitter] Error sending Batch 1 item to MPSC. Receiver likely dropped.");
return;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
println!("[Emitter] Batch 1 (IDs 0,1,2 for size-trigger) sent.");
let batch2_id_start = group_max_size as u32;
let event_id_3 = batch2_id_start;
let event_id_4 = batch2_id_start + 1;
println!("[Emitter] Sending id: {}", event_id_3);
let event3 = DataEvent { id: event_id_3, value: format!("Item-{}", event_id_3) };
if tx_emitter_mpsc.send(StreamMessage::Element(event3)).await.is_err() { return; }
tokio::time::sleep(Duration::from_millis(50)).await;
println!("[Emitter] Sending id: {}", event_id_4);
let event4 = DataEvent { id: event_id_4, value: format!("Item-{}", event_id_4) };
if tx_emitter_mpsc.send(StreamMessage::Element(event4)).await.is_err() { return; }
println!("[Emitter] Batch 2 (IDs {},{}) sent. Waiting for their timeout ({}ms + margin)...", event_id_3, event_id_4, group_max_duration.as_millis());
tokio::time::sleep(group_max_duration + Duration::from_millis(2000)).await;
let final_event_id = batch2_id_start + 2;
let event_final = DataEvent { id: final_event_id, value: "FinalItem".to_string() };
println!("[Emitter] Sending final id: {}", event_final.id);
if tx_emitter_mpsc.send(StreamMessage::Element(event_final)).await.is_err() { return; }
println!("[Emitter] All test items sent. Sending End via MPSC.");
if tx_emitter_mpsc.send(StreamMessage::End).await.is_err() {
println!("[Emitter] Error sending End to MPSC.");
}
});
let source_stream_setup_fn = Box::new(
move |downstream_recipient: Recipient<StreamMessage<DataEvent>>| {
let mut rx_mpsc_clone = rx_emitter_mpsc;
async move {
while let Some(msg) = FuturesStreamExt::next(&mut rx_mpsc_clone).await {
if downstream_recipient.try_send(msg).is_err() { return Err(String::from("Downstream consumer closed for group_within example source")); }
}
let _ = downstream_recipient.try_send(StreamMessage::End);
Ok(())
}.boxed()
},
);
let source_stream = Stream { setup_fn: source_stream_setup_fn, _phantom: PhantomData };
println!("[Main] Applying group_within(size={}, duration={:?})", group_max_size, group_max_duration);
let grouped_stream: Stream<Vec<DataEvent>> = source_stream.group_within(group_max_size, group_max_duration);
println!("[Main] Collecting results from grouped stream...");
let grouped_results_handle = task::spawn_local(async move {
grouped_stream.compile_to_list().await
});
if emitter_handle.await.is_err() {
eprintln!("[Main] Emitter task panicked or failed.");
} else {
println!("[Main] Emitter task completed.");
}
match grouped_results_handle.await.expect("Grouped stream task panicked") {
Ok(chunks) => {
println!("\n[Main] Grouped results ({} chunks):", chunks.len());
for (idx, chunk) in chunks.iter().enumerate() {
println!(" Chunk {}: ({} items)", idx, chunk.len());
for item_in_chunk in chunk { println!(" -> ID: {}, Value: '{}'", item_in_chunk.id, item_in_chunk.value);
}
}
assert_eq!(chunks.len(), 3, "Expected 3 chunks, but got {}. Chunks: {:?}", chunks.len(), chunks);
if chunks.len() >= 1 {
assert_eq!(chunks[0].len(), group_max_size, "Chunk 0 (IDs 0,1,2 by size) wrong size. Actual: {:?}", chunks[0]);
if chunks[0].len() == group_max_size {
assert_eq!(chunks[0][0].id, 0);
assert_eq!(chunks[0][1].id, 1);
assert_eq!(chunks[0][2].id, 2);
}
}
if chunks.len() >= 2 {
assert_eq!(chunks[1].len(), 2, "Chunk 1 (IDs 3,4 by time) wrong size. Actual: {:?}", chunks[1]);
if chunks[1].len() == 2 {
assert_eq!(chunks[1][0].id, 3);
assert_eq!(chunks[1][1].id, 4);
}
}
if chunks.len() >= 3 {
assert_eq!(chunks[2].len(), 1, "Chunk 2 (ID 5 by end) wrong size. Actual: {:?}", chunks[2]);
if chunks[2].len() == 1 {
assert_eq!(chunks[2][0].id, 5);
assert_eq!(chunks[2][0].value, "FinalItem");
}
}
let total_received_items: usize = chunks.iter().map(|c| c.len()).sum();
assert_eq!(total_received_items, group_max_size + 2 + 1, "Total number of items mismatch");
println!("\n[Main] Verification successful.");
}
Err(_) => eprintln!("[Main] Grouped stream processing failed."),
}
println!("\n[Main] group_within Example: Complete.");
}).await;
});
}