use std::collections::HashSet;
use dfir_rs::scheduled::ticks::TickInstant;
use dfir_rs::util::collect_ready;
use dfir_rs::util::multiset::HashMultiSet;
use dfir_rs::{assert_graphvis_snapshots, dfir_syntax};
use multiplatform_test::multiplatform_test;
/// Smallest end-to-end pipeline: `source_iter` feeding directly into `for_each`.
///
/// The single input item `1` must arrive unchanged on the output channel once the
/// graph has been run.
#[multiplatform_test]
pub fn test_basic_2() {
let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut df = dfir_syntax! {
source_iter([1]) -> for_each(|v| out_send.send(v).unwrap());
};
// NOTE(review): presumably compares renderings of the graph against stored snapshots,
// in which case the operator text above must stay token-for-token stable — confirm
// against the macro definition before editing the pipeline.
assert_graphvis_snapshots!(df);
df.run_available_sync();
// Drain everything currently ready on the receiver and compare it as a slice.
assert_eq!(&[1], &*collect_ready::<Vec<_>, _>(&mut out_recv));
}
/// Three-operator pipeline: `source_iter` -> `map` -> `for_each`.
///
/// The `map` stage adds one to each item, so the input `1` must be observed as `2`
/// on the output channel.
#[multiplatform_test]
pub fn test_basic_3() {
let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut df = dfir_syntax! {
source_iter([1]) -> map(|v| v + 1) -> for_each(|v| out_send.send(v).unwrap());
};
// NOTE(review): presumably compares renderings of the graph against stored snapshots,
// in which case the operator text above must stay token-for-token stable — confirm
// against the macro definition before editing the pipeline.
assert_graphvis_snapshots!(df);
df.run_available_sync();
// Drain everything currently ready on the receiver and compare it as a slice.
assert_eq!(&[2], &*collect_ready::<Vec<_>, _>(&mut out_recv));
}
/// Merges two single-item sources through `union()` into one `for_each` sink.
///
/// The assertion expects the item wired to port `[0]` to be received before the item
/// wired to port `[1]`, i.e. the output is compared as the ordered slice `[1, 2]`.
#[multiplatform_test]
pub fn test_basic_union() {
let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut df = dfir_syntax! {
m = union() -> for_each(|v| out_send.send(v).unwrap());
source_iter([1]) -> [0]m;
source_iter([2]) -> [1]m;
};
// NOTE(review): presumably compares renderings of the graph against stored snapshots,
// in which case the node name `m` and the operator text above must stay stable —
// confirm against the macro definition before editing the pipeline.
assert_graphvis_snapshots!(df);
df.run_available_sync();
// Drain everything currently ready on the receiver and compare it as a slice.
assert_eq!(&[1, 2], &*collect_ready::<Vec<_>, _>(&mut out_recv));
}
/// Fans one item out through `tee()` and checks that each branch emits its own
/// tagged copy (`"A 1"` and `"B 1"`) on the shared output channel.
#[multiplatform_test]
pub fn test_basic_tee() {
    // Both branches write to the same channel through separate sender handles.
    let (tx_a, mut rx) = dfir_rs::util::unbounded_channel::<String>();
    let tx_b = tx_a.clone();

    let mut graph = dfir_syntax! {
        fanout = source_iter([1]) -> tee();
        fanout[0] -> for_each(|item| tx_a.send(format!("A {}", item)).unwrap());
        fanout[1] -> for_each(|item| tx_b.send(format!("B {}", item)).unwrap());
    };
    graph.run_available_sync();

    // The relative order of the two branches is not asserted, so collect into a set.
    let received: HashSet<String> = collect_ready(&mut rx);
    assert_eq!(2, received.len());
    for expected in ["A 1", "B 1"] {
        assert!(received.contains(expected));
    }
}
/// Checks that `inspect` sees every item, in order, when the pipeline is terminated
/// by an explicit `null()` sink.
#[multiplatform_test]
pub fn test_basic_inspect_null() {
    use std::cell::RefCell;
    use std::rc::Rc;

    // Shared log: one handle is captured by the graph, the other is read afterwards.
    let observed = Rc::new(RefCell::new(Vec::new()));
    let recorder = Rc::clone(&observed);

    let mut graph = dfir_syntax! {
        source_iter([1, 2, 3, 4])
            -> inspect(|&item| recorder.borrow_mut().push(item))
            -> null();
    };
    graph.run_available_sync();

    assert_eq!(&[1, 2, 3, 4], &**observed.borrow());
}
/// Checks that `inspect` sees every item, in order, when it is itself the last
/// operator of the pipeline (no trailing `null()`).
#[multiplatform_test]
pub fn test_basic_inspect_no_null() {
    use std::cell::RefCell;
    use std::rc::Rc;

    // Shared log: one handle is captured by the graph, the other is read afterwards.
    let observed = Rc::new(RefCell::new(Vec::new()));
    let recorder = Rc::clone(&observed);

    let mut graph = dfir_syntax! {
        source_iter([1, 2, 3, 4])
            -> inspect(|&item| recorder.borrow_mut().push(item));
    };
    graph.run_available_sync();

    assert_eq!(&[1, 2, 3, 4], &**observed.borrow());
}
/// Builds a diamond: one source is split by `tee()`, each branch passes through two
/// identity `map` stages, and the branches are re-joined by `union()`.
///
/// There are no assertions; the test only requires that the graph builds and runs.
#[multiplatform_test]
pub fn test_large_diamond() {
    let mut graph = dfir_syntax! {
        split = source_iter([1]) -> tee();
        merged = union() -> for_each(|item| println!("{}", item));

        split[0]
            -> map(std::convert::identity)
            -> map(std::convert::identity)
            -> [0]merged;
        split[1]
            -> map(std::convert::identity)
            -> map(std::convert::identity)
            -> [1]merged;
    };
    graph.run_available_sync();
}
/// Passes a field-access expression (`send_recv.1`) rather than a plain identifier as
/// the argument of `source_stream`.
///
/// The graph is run once while the channel is still empty, then three items are sent
/// and the graph is run again. Received items are only printed; nothing is asserted
/// about them.
#[multiplatform_test]
pub fn test_recv_expr() {
// Keep the channel as a tuple so the receiver half is referred to as `send_recv.1`.
let send_recv = dfir_rs::util::unbounded_channel::<usize>();
let mut df = dfir_syntax! {
source_stream(send_recv.1)
-> for_each(|v| print!("{:?}", v));
};
// NOTE(review): presumably compares renderings of the graph against stored snapshots,
// in which case the operator text above must stay token-for-token stable — confirm
// against the macro definition before editing the pipeline.
assert_graphvis_snapshots!(df);
// First run happens before anything has been sent.
df.run_available_sync();
let items_send = send_recv.0;
items_send.send(9).unwrap();
items_send.send(2).unwrap();
items_send.send(5).unwrap();
// Second run happens with three items queued on the channel.
df.run_available_sync();
}
/// Builds the same keyed `join()` twice, wiring the two input ports in opposite
/// textual orders while annotating the output tuple type identically.
///
/// Neither graph is run; the test only requires that both declarations build.
#[multiplatform_test]
pub fn test_join_order() {
    // Port `[0]` is wired before port `[1]`.
    let _ports_in_order = dfir_syntax! {
        joined = join() -> for_each(|row: ((), (u32, String))| println!("{:?}", row));
        source_iter([0, 1, 2]) -> map(|n| ((), n)) -> [0]joined;
        source_iter(["a".to_owned(), "b".to_owned(), "c".to_owned()])
            -> map(|text| ((), text))
            -> [1]joined;
    };

    // Port `[1]` is wired before port `[0]`.
    let _ports_reversed = dfir_syntax! {
        joined = join() -> for_each(|row: ((), (u32, String))| println!("{:?}", row));
        source_iter(["a".to_owned(), "b".to_owned(), "c".to_owned()])
            -> map(|text| ((), text))
            -> [1]joined;
        source_iter([0, 1, 2]) -> map(|n| ((), n)) -> [0]joined;
    };
}
/// Exercises `join` with explicitly selected half-join state types and checks how
/// duplicate input rows are reflected in the output.
#[multiplatform_test]
pub fn test_multiset_join() {
    // `HalfSetJoinState`: the duplicated left row yields a single joined row.
    {
        use dfir_pipes::pull::HalfSetJoinState;

        let (joined_tx, mut joined_rx) =
            dfir_rs::util::unbounded_channel::<(usize, (usize, usize))>();
        let mut graph = dfir_syntax! {
            keyed = join::<HalfSetJoinState>() -> for_each(|row| joined_tx.send(row).unwrap());
            source_iter([(0, 1), (0, 1)]) -> [0]keyed;
            source_iter([(0, 2)]) -> [1]keyed;
        };
        graph.run_available_sync();

        let rows: Vec<_> = collect_ready(&mut joined_rx);
        assert_eq!(rows, vec![(0, (1, 2))]);
    }

    // `HalfMultisetJoinState`, 3 left rows x 4 right rows: 12 identical joined rows.
    {
        use dfir_pipes::pull::HalfMultisetJoinState;

        let (joined_tx, mut joined_rx) =
            dfir_rs::util::unbounded_channel::<(usize, (usize, usize))>();
        let mut graph = dfir_syntax! {
            keyed = join::<HalfMultisetJoinState>() -> for_each(|row| joined_tx.send(row).unwrap());
            source_iter([(1, 1), (1, 1), (1, 1)]) -> [0]keyed;
            source_iter([(1, 2), (1, 2), (1, 2), (1, 2)]) -> [1]keyed;
        };
        graph.run_available_sync();

        let rows: Vec<_> = collect_ready(&mut joined_rx);
        assert_eq!(rows, vec![(1, (1, 2)); 12]);
    }

    // `HalfMultisetJoinState`, 4 left rows x 3 right rows: again 12 joined rows.
    {
        use dfir_pipes::pull::HalfMultisetJoinState;

        let (joined_tx, mut joined_rx) =
            dfir_rs::util::unbounded_channel::<(usize, (usize, usize))>();
        let mut graph = dfir_syntax! {
            keyed = join::<HalfMultisetJoinState>() -> for_each(|row| joined_tx.send(row).unwrap());
            source_iter([(1, 1), (1, 1), (1, 1), (1, 1)]) -> [0]keyed;
            source_iter([(1, 2), (1, 2), (1, 2)]) -> [1]keyed;
        };
        graph.run_available_sync();

        let rows: Vec<_> = collect_ready(&mut joined_rx);
        assert_eq!(rows, vec![(1, (1, 2)); 12]);
    }
}
/// Runs `cross_join()` over two inputs that each contain a duplicate and checks
/// that every distinct pairing appears exactly once in the sorted output.
#[multiplatform_test]
pub fn test_cross_join() {
    let (pair_tx, mut pair_rx) = dfir_rs::util::unbounded_channel::<(usize, &str)>();

    let mut graph = dfir_syntax! {
        product = cross_join() -> for_each(|pair| pair_tx.send(pair).unwrap());
        source_iter([1, 2, 2, 3]) -> [0]product;
        source_iter(["a", "b", "c", "c"]) -> [1]product;
    };
    graph.run_available_sync();

    // Emission order is not asserted; sort before comparing.
    let mut pairs: Vec<_> = collect_ready(&mut pair_rx);
    pairs.sort();

    // One row per distinct left value, one column per distinct right value.
    let expected: [(usize, &str); 9] = [
        (1, "a"), (1, "b"), (1, "c"),
        (2, "a"), (2, "b"), (2, "c"),
        (3, "a"), (3, "b"), (3, "c"),
    ];
    assert_eq!(pairs, expected);
}
/// Runs `cross_join_multiset()` over two inputs that each contain a duplicate and
/// checks that duplicates are preserved: 4 x 4 inputs give 16 sorted output pairs.
#[multiplatform_test]
pub fn test_cross_join_multiset() {
    let (pair_tx, mut pair_rx) = dfir_rs::util::unbounded_channel::<(usize, &str)>();

    let mut graph = dfir_syntax! {
        product = cross_join_multiset() -> for_each(|pair| pair_tx.send(pair).unwrap());
        source_iter([1, 2, 2, 3]) -> [0]product;
        source_iter(["a", "b", "c", "c"]) -> [1]product;
    };
    graph.run_available_sync();

    // Emission order is not asserted; sort before comparing.
    let mut pairs: Vec<_> = collect_ready(&mut pair_rx);
    pairs.sort();

    // Left value 2 and right value "c" each occur twice in the inputs, so their
    // pairings are repeated accordingly.
    let expected: [(usize, &str); 16] = [
        (1, "a"), (1, "b"), (1, "c"), (1, "c"),
        (2, "a"), (2, "a"), (2, "b"), (2, "b"),
        (2, "c"), (2, "c"), (2, "c"), (2, "c"),
        (3, "a"), (3, "b"), (3, "c"), (3, "c"),
    ];
    assert_eq!(pairs, expected);
}
#[multiplatform_test]
pub fn test_defer_tick() {
let (inp_send, inp_recv) = dfir_rs::util::unbounded_channel::<usize>();
let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_syntax! {
inp = source_stream(inp_recv) -> tee();
diff = difference() -> for_each(|x| out_send.send(x).unwrap());
inp -> [pos]diff;
inp -> defer_tick() -> [neg]diff;
};
for x in [1, 2, 3, 4] {
inp_send.send(x).unwrap();
}
flow.run_tick_sync();
for x in [3, 4, 5, 6] {
inp_send.send(x).unwrap();
}
flow.run_tick_sync();
flow.run_available_sync();
assert_eq!(
HashMultiSet::from_iter([1, 2, 3, 4, 5, 6]),
collect_ready(&mut out_recv)
);
}
/// Connects two independent graphs through a channel and checks that items only
/// reach the final output once the consuming graph is run after the producing one.
#[multiplatform_test]
pub fn test_channel_minimal() {
    let (link_tx, link_rx) = dfir_rs::util::unbounded_channel::<usize>();
    let (sink_tx, mut sink_rx) = dfir_rs::util::unbounded_channel::<usize>();

    let mut producer = dfir_syntax! {
        source_iter([1, 2, 3]) -> for_each(|item| { link_tx.send(item).unwrap(); });
    };
    let mut consumer = dfir_syntax! {
        source_stream(link_rx) -> for_each(|item| sink_tx.send(item).unwrap());
    };

    // Consumer first: nothing has been produced yet.
    consumer.run_available_sync();
    let drained: Vec<usize> = collect_ready(&mut sink_rx);
    assert_eq!([] as [usize; 0], *drained);

    // Producer alone: items sit on the link channel, the sink is still empty.
    producer.run_available_sync();
    let drained: Vec<usize> = collect_ready(&mut sink_rx);
    assert_eq!([] as [usize; 0], *drained);

    // Consumer again: the three items are forwarded in order.
    consumer.run_available_sync();
    let drained: Vec<usize> = collect_ready(&mut sink_rx);
    assert_eq!([1, 2, 3], *drained);
}
/// Chains two `assert_eq` operators after a source in four combinations of array
/// and `Vec` arguments; every pipeline carries `1, 2, 3` and must pass.
#[multiplatform_test]
pub fn test_assert_eq() {
    let mut graph = dfir_syntax! {
        // Array source, array / array expectations.
        source_iter([1, 2, 3])
            -> assert_eq([1, 2, 3])
            -> assert_eq([1, 2, 3]);
        // Array source, array / Vec expectations.
        source_iter([1, 2, 3])
            -> assert_eq([1, 2, 3])
            -> assert_eq(vec![1, 2, 3]);
        // Array source, Vec / array expectations.
        source_iter([1, 2, 3])
            -> assert_eq(vec![1, 2, 3])
            -> assert_eq([1, 2, 3]);
        // Vec source, array / array expectations.
        source_iter(vec![1, 2, 3])
            -> assert_eq([1, 2, 3])
            -> assert_eq([1, 2, 3]);
    };
    graph.run_available_sync();
}
/// Checks that a mismatching `assert_eq` operator panics when the graph runs, both
/// when it is the last operator and when it is followed by `null()`.
#[multiplatform_test(test)]
pub fn test_assert_failures() {
    // `assert_eq` terminates the pipeline.
    let as_last_operator = std::panic::catch_unwind(|| {
        let mut graph = dfir_syntax! {
            source_iter([0]) -> assert_eq([1]);
        };
        graph.run_available_sync();
    });
    assert!(as_last_operator.is_err());

    // `assert_eq` sits in front of a `null()` sink.
    let before_null_sink = std::panic::catch_unwind(|| {
        let mut graph = dfir_syntax! {
            source_iter([0]) -> assert_eq([1]) -> null();
        };
        graph.run_available_sync();
    });
    assert!(before_null_sink.is_err());
}
/// Streams `0..TOTAL` through `iter_batches_stream` with a batch size of `PER_TICK`
/// and checks the tick at which each item is observed.
///
/// Item `n` is expected at tick `n / PER_TICK`, i.e. `PER_TICK` items per tick.
#[multiplatform_test]
pub fn test_iter_stream_batches() {
    const TOTAL: usize = 100;
    const PER_TICK: usize = 5;

    let batched = dfir_rs::util::iter_batches_stream(0..TOTAL, PER_TICK);

    // Pair every item with the tick it should arrive in.
    let mut tick_tagged = Vec::new();
    for item in 0..TOTAL {
        let tick_index = (item / PER_TICK).try_into().unwrap();
        tick_tagged.push((TickInstant::new(tick_index), item));
    }

    let mut graph = dfir_syntax! {
        source_stream(batched)
            -> map(|item| (context.current_tick(), item))
            -> assert_eq(tick_tagged);
    };
    graph.run_available_sync();
}