use std::collections::HashMap;
use dfir_rs::assert_graphvis_snapshots;
use dfir_rs::scheduled::ticks::TickInstant;
use dfir_rs::util::collect_ready;
use multiplatform_test::multiplatform_test;
#[multiplatform_test]
pub fn test_scan_tick() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'tick>(|| 0, |acc: &mut u32, x: u32| {
*acc += x;
Some(*acc)
})
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(3).unwrap();
items_send.send(4).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(&[3, 7], &*collect_ready::<Vec<_>, _>(&mut result_recv));
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_static() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'static>(|| 0, |acc: &mut u32, x: u32| {
*acc += x;
Some(*acc)
})
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(3).unwrap();
items_send.send(4).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(&[6, 10], &*collect_ready::<Vec<_>, _>(&mut result_recv));
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_empty_input() {
let (_items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'tick>(|| 0, |acc: &mut u32, x: u32| {
*acc += x;
Some(*acc)
})
-> for_each(|v| result_send.send(v).unwrap());
};
df.run_tick_sync();
assert_eq!(
&[] as &[u32],
&*collect_ready::<Vec<_>, _>(&mut result_recv)
);
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_different_types() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<String>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<(usize, String)>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'tick>(|| (0, String::new()), |acc: &mut (usize, String), x: String| {
// Accumulator is a tuple of (count, concatenated_string)
acc.0 += 1;
if !acc.1.is_empty() {
acc.1.push_str(", ");
}
acc.1.push_str(&x);
// Clone the accumulator for the output since it doesn't implement Copy
Some((acc.0, acc.1.clone()))
})
-> for_each(|v| result_send.send(v).unwrap());
};
items_send.send("hello".to_owned()).unwrap();
items_send.send("world".to_owned()).unwrap();
df.run_tick_sync();
let results = collect_ready::<Vec<_>, _>(&mut result_recv);
assert_eq!(2, results.len());
assert_eq!((1, "hello".to_owned()), results[0]);
assert_eq!((2, "hello, world".to_owned()), results[1]);
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_early_termination() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'tick>(|| 0, |acc: &mut u32, x: u32| {
*acc += x;
// Terminate when accumulator exceeds 5
if *acc > 5 {
None
} else {
Some(*acc)
}
})
-> for_each(|v| result_send.send(v).unwrap());
};
items_send.send(1).unwrap(); items_send.send(2).unwrap(); items_send.send(3).unwrap(); items_send.send(4).unwrap(); df.run_tick_sync();
assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_static_early_termination() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'static>(|| 0, |acc: &mut u32, x: u32| {
*acc += x;
// Terminate when accumulator exceeds 5
if *acc > 5 {
None
} else {
Some(*acc)
}
})
-> for_each(|v| result_send.send(v).unwrap());
};
items_send.send(1).unwrap(); items_send.send(2).unwrap(); items_send.send(3).unwrap(); items_send.send(4).unwrap(); df.run_tick_sync();
assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(
&[] as &[u32],
&*collect_ready::<Vec<_>, _>(&mut result_recv)
);
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_complex_accumulator() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<(String, u32)>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<HashMap<String, u32>>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> scan::<'tick>(HashMap::<String, u32>::new, |acc: &mut HashMap<String, u32>, item: (String, u32)| {
// Update frequency count for each key
let entry = acc.entry(item.0).or_insert(0);
*entry += item.1;
// Return a clone of the current state
Some(acc.clone())
})
-> for_each(|v| result_send.send(v).unwrap());
};
items_send.send(("apple".to_owned(), 1)).unwrap();
items_send.send(("banana".to_owned(), 2)).unwrap();
items_send.send(("apple".to_owned(), 3)).unwrap();
df.run_tick_sync();
let results = collect_ready::<Vec<_>, _>(&mut result_recv);
assert_eq!(3, results.len());
let mut expected1 = HashMap::new();
expected1.insert("apple".to_owned(), 1);
assert_eq!(expected1, results[0]);
let mut expected2 = HashMap::new();
expected2.insert("apple".to_owned(), 1);
expected2.insert("banana".to_owned(), 2);
assert_eq!(expected2, results[1]);
let mut expected3 = HashMap::new();
expected3.insert("apple".to_owned(), 4);
expected3.insert("banana".to_owned(), 2);
assert_eq!(expected3, results[2]);
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_scan_push() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
teed = source_stream(items_recv) -> tee();
teed -> for_each(|_| {}); teed
-> scan::<'tick>(|| 0, |acc: &mut u32, x: u32| {
*acc += x;
Some(*acc)
})
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(3).unwrap();
items_send.send(4).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(&[3, 7], &*collect_ready::<Vec<_>, _>(&mut result_recv));
df.run_available_sync(); }