use std::collections::BTreeSet;
use dfir_rs::assert_graphvis_snapshots;
use dfir_rs::scheduled::ticks::TickInstant;
use dfir_rs::util::collect_ready;
use multiplatform_test::multiplatform_test;
#[multiplatform_test]
pub fn test_fold_keyed_infer_basic() {
pub struct SubordResponse {
pub xid: &'static str,
pub mtype: u32,
}
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<(&'static str, u32)>();
let mut df = dfir_rs::dfir_syntax! {
source_iter([
SubordResponse { xid: "123", mtype: 33 },
SubordResponse { xid: "123", mtype: 52 },
SubordResponse { xid: "123", mtype: 72 },
SubordResponse { xid: "123", mtype: 83 },
SubordResponse { xid: "123", mtype: 78 },
])
-> map(|m: SubordResponse| (m.xid, m.mtype))
-> fold_keyed::<'static>(|| 0, |old: &mut u32, val: u32| *old += val)
-> for_each(|kv| result_send.send(kv).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
df.run_available_sync();
assert_eq!(
&[("123", 318), ("123", 318)],
&*collect_ready::<Vec<_>, _>(&mut result_recv)
);
}
#[multiplatform_test]
pub fn test_fold_keyed_typed_basic() {
pub struct SubordResponse {
pub xid: &'static str,
pub mtype: u32,
}
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<(&'static str, u32)>();
let mut df = dfir_rs::dfir_syntax! {
source_iter([
SubordResponse { xid: "123", mtype: 33 },
SubordResponse { xid: "123", mtype: 52 },
SubordResponse { xid: "123", mtype: 72 },
SubordResponse { xid: "123", mtype: 83 },
SubordResponse { xid: "123", mtype: 78 },
])
-> map(|m: SubordResponse| (m.xid, m.mtype))
-> fold_keyed::<'static, &'static str, u32>(|| 0, |old: &mut u32, val: u32| *old += val)
-> for_each(|kv| result_send.send(kv).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
df.run_available_sync();
assert_eq!(
&[("123", 318), ("123", 318)],
&*collect_ready::<Vec<_>, _>(&mut result_recv)
);
}
#[multiplatform_test]
pub fn test_fold_keyed_tick() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<(u32, Vec<u32>)>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<(u32, Vec<u32>)>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> fold_keyed::<'tick>(Vec::new, |old: &mut Vec<u32>, mut x: Vec<u32>| old.append(&mut x))
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
items_send.send((0, vec![1, 2])).unwrap();
items_send.send((0, vec![3, 4])).unwrap();
items_send.send((1, vec![1])).unwrap();
items_send.send((1, vec![1, 2])).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(
[(0, vec![1, 2, 3, 4]), (1, vec![1, 1, 2])]
.into_iter()
.collect::<BTreeSet<_>>(),
collect_ready::<BTreeSet<_>, _>(&mut result_recv)
);
items_send.send((0, vec![5, 6])).unwrap();
items_send.send((0, vec![7, 8])).unwrap();
items_send.send((1, vec![10])).unwrap();
items_send.send((1, vec![11, 12])).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(3), df.current_tick());
assert_eq!(
[(0, vec![5, 6, 7, 8]), (1, vec![10, 11, 12])]
.into_iter()
.collect::<BTreeSet<_>>(),
collect_ready::<BTreeSet<_>, _>(&mut result_recv)
);
df.run_available_sync(); }
#[multiplatform_test]
pub fn test_fold_keyed_static() {
let (items_send, items_recv) = dfir_rs::util::unbounded_channel::<(u32, Vec<u32>)>();
let (result_send, mut result_recv) = dfir_rs::util::unbounded_channel::<(u32, Vec<u32>)>();
let mut df = dfir_rs::dfir_syntax! {
source_stream(items_recv)
-> fold_keyed::<'static>(Vec::new, |old: &mut Vec<u32>, mut x: Vec<u32>| old.append(&mut x))
-> for_each(|v| result_send.send(v).unwrap());
};
assert_graphvis_snapshots!(df);
assert_eq!(TickInstant::new(0), df.current_tick());
df.run_tick_sync();
assert_eq!(TickInstant::new(1), df.current_tick());
items_send.send((0, vec![1, 2])).unwrap();
items_send.send((0, vec![3, 4])).unwrap();
items_send.send((1, vec![1])).unwrap();
items_send.send((1, vec![1, 2])).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(2), df.current_tick());
assert_eq!(
[(0, vec![1, 2, 3, 4]), (1, vec![1, 1, 2])]
.into_iter()
.collect::<BTreeSet<_>>(),
collect_ready::<BTreeSet<_>, _>(&mut result_recv)
);
items_send.send((0, vec![5, 6])).unwrap();
items_send.send((0, vec![7, 8])).unwrap();
items_send.send((1, vec![10])).unwrap();
items_send.send((1, vec![11, 12])).unwrap();
df.run_tick_sync();
assert_eq!(TickInstant::new(3), df.current_tick());
assert_eq!(
[
(0, vec![1, 2, 3, 4, 5, 6, 7, 8]),
(1, vec![1, 1, 2, 10, 11, 12])
]
.into_iter()
.collect::<BTreeSet<_>>(),
collect_ready::<BTreeSet<_>, _>(&mut result_recv)
);
df.run_available_sync(); }