1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use actix_rt;
use futures::FutureExt;
use tokio::time::{Duration, sleep};
use vuo::Stream; // For .boxed()
async fn run_merge_example() {
let local_set = tokio::task::LocalSet::new();
local_set
.run_until(async {
println!("Setting up streams for merge example...");
// Stream 1: Emits 1, 2, 3. Each item is "processed" with a 50ms delay.
// We use par_map_ordered with parallelism 1 to simulate sequential asynchronous item production.
let stream1_source_data = vec![1, 2, 3];
let stream1 = Stream::emits(stream1_source_data).par_map_ordered(1, |item| {
async move {
// println!("Stream 1: Processing {} (will take 50ms)", item); // Uncomment for verbose logging
sleep(Duration::from_millis(50)).await;
// println!("Stream 1: Emitting {}", item); // Uncomment for verbose logging
item
}
.boxed() // Ensure the future is BoxFuture<'static, _> + Send
});
// Stream 2: Emits 101, 102, 103. Each item is "processed" with a 30ms delay.
// This stream is slightly faster per item, which should lead to interleaving.
let stream2_source_data = vec![101, 102, 103];
let stream2 = Stream::emits(stream2_source_data).par_map_ordered(1, |item| {
// Renamed item to avoid capture issues if any, though not strictly needed here
async move {
// println!("Stream 2: Processing {} (will take 30ms)", item); // Uncomment for verbose logging
sleep(Duration::from_millis(30)).await;
// println!("Stream 2: Emitting {}", item); // Uncomment for verbose logging
item
}
.boxed() // Ensure the future is BoxFuture<'static, _> + Send
});
println!("Merging streams...");
let merged_stream = stream1.merge(stream2);
println!("Collecting results from merged stream...");
let results = merged_stream.compile_to_list().await;
match results {
Ok(mut data) => {
println!(
"Merged results (order may vary due to concurrency): {:?}",
data
);
// Verify that all items from both streams are present.
// The exact order of interleaving is non-deterministic.
// For stable verification, we sort the collected data.
let mut expected_data = vec![1, 2, 3, 101, 102, 103];
// Check length first
assert_eq!(
data.len(),
expected_data.len(),
"Merged data length does not match expected length."
);
// Sort both for comparison
data.sort();
expected_data.sort(); // Should already be sorted, but good practice.
println!("Merged results (sorted for verification): {:?}", data);
assert_eq!(
data, expected_data,
"Sorted merged data does not match expected elements."
);
println!("Merge example successful! All items received and verified.");
}
Err(_) => {
eprintln!(
"Merge example failed: Stream compilation to list resulted in an error."
);
// Fail the test explicitly if there's an error
assert!(false, "Stream compilation failed");
}
}
})
.await;
}
fn main() {
// Optional: Initialize a logger (like env_logger) to see any internal vuo logs
// or the println! statements within the async blocks if you uncomment them.
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace")).init();
// Setup Actix system and run the example
actix_rt::System::new().block_on(run_merge_example());
}