use dfir_rs::scheduled::ticks::TickInstant;
use dfir_rs::util::collect_ready;
use dfir_rs::{assert_graphvis_snapshots, dfir_syntax};
use multiplatform_test::multiplatform_test;
#[multiplatform_test]
pub fn test_reduce_tick() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> reduce::<'tick>(|acc: &mut u32, next: u32| *acc += next)
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
assert_eq!(&[3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(3).unwrap();
items_send.send(4).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(&[7], &*collect_ready::<Vec<_>, _>(&mut result_recv));
}
#[multiplatform_test]
pub fn test_reduce_static() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<u32>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> reduce::<'static>(|acc: &mut u32, next: u32| *acc += next)
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
items_send.send(1).unwrap();
items_send.send(2).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
assert_eq!(&[3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
items_send.send(3).unwrap();
items_send.send(4).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(&[10], &*collect_ready::<Vec<_>, _>(&mut result_recv));
}
#[multiplatform_test]
pub fn test_reduce_sum() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut df = dfir_syntax! {
source_stream(items_recv)
-> reduce(|a: &mut _, b| *a += b)
-> for_each(|v| print!("{:?}", v));
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
print!("\nA: ");
items_send.send(9).unwrap();
items_send.send(2).unwrap();
items_send.send(5).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
print!("\nB: ");
items_send.send(9).unwrap();
items_send.send(5).unwrap();
items_send.send(2).unwrap();
items_send.send(0).unwrap();
items_send.send(3).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(3), df.current_tick());
println!();
}