use serde::{Deserialize, Serialize};
use stageleft::q;
use crate::live_collections::sliced::sliced;
use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
use crate::location::{Location, Process};
use crate::nondet::nondet;
use crate::prelude::FlowBuilder;
use crate::sim::{SimReceiver, SimSender};
mod trophies;
/// Fuzzes a pass-through pipeline with arbitrary byte payloads.
///
/// The driver panics when the received payload starts with the bytes
/// `42, 43, 44`, so the test only passes (`should_panic`) if the fuzzer
/// manages to produce such an input.
// NOTE(review): compiled only on non-nightly toolchains and ignored outside
// Linux; presumably the fuzzer is only reliable/reproducible there — confirm.
#[cfg(not(nightly))]
#[test]
#[should_panic]
#[cfg_attr(not(target_os = "linux"), ignore)]
fn sim_crash_in_output() {
    use bytes::Bytes;

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (sender, stream) = process.sim_input();
    // The input is wired straight to the output; the annotation pins the
    // payload type and the stream guarantees.
    let receiver: SimReceiver<Bytes, TotalOrder, ExactlyOnce> = stream.sim_output();

    flow.sim().fuzz(async || {
        sender.send(bolero::any::<Vec<u8>>().into());

        let payload = receiver.next().await.unwrap();
        // Every index is guarded by a length check evaluated just before it,
        // so short payloads never index out of bounds.
        if !payload.is_empty()
            && payload[0] == 42
            && payload.len() > 1
            && payload[1] == 43
            && payload.len() > 2
            && payload[2] == 44
        {
            panic!("boom");
        }
    });
}
/// Variant of the crash-in-output fuzz test where part of the trigger
/// condition lives inside the dataflow.
///
/// A `filter` stage only forwards payloads whose first two bytes are
/// `42, 43`; the driver then panics if a forwarded payload has `44` as its
/// third byte. The test passes (`should_panic`) only if the fuzzer finds
/// such an input.
// NOTE(review): compiled only on non-nightly toolchains and ignored outside
// Linux; presumably the fuzzer is only reliable/reproducible there — confirm.
#[cfg(not(nightly))]
#[test]
#[should_panic]
#[cfg_attr(not(target_os = "linux"), ignore)]
fn sim_crash_in_output_with_filter() {
    use bytes::Bytes;

    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (sender, stream) = process.sim_input::<Bytes, _, _>();
    // The length check comes first so the two index operations are in bounds.
    let filtered = stream.filter(q!(|payload| payload.len() > 1
        && payload[0] == 42
        && payload[1] == 43));
    let receiver = filtered.sim_output();

    flow.sim().fuzz(async || {
        sender.send(bolero::any::<Vec<u8>>().into());

        // `None` means the payload was filtered out; nothing to check then.
        if let Some(payload) = receiver.next().await
            && payload.len() > 2
            && payload[2] == 44
        {
            panic!("boom");
        }
    });
}
/// Checks that batching a totally ordered input and flattening the batches
/// again yields the elements in their original order, whatever batch
/// boundaries the fuzzer picks.
#[test]
fn sim_batch_preserves_order_fuzzed() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input();
    let tick = node.tick();
    let out_recv = input
        .batch(
            &tick,
            nondet!(
                /// Batch boundaries are chosen by the simulator; this test asserts
                /// that the flattened output order does not depend on them.
            ),
        )
        .all_ticks()
        .sim_output();

    flow.sim().fuzz(async || {
        for value in 1..=3 {
            in_send.send(value);
        }

        // Comparing against `Some(..)` (rather than unwrapping first) makes a
        // premature end of the stream show up in the assertion message.
        for expected in 1..=3 {
            assert_eq!(out_recv.next().await, Some(expected));
        }

        // Nothing else may be emitted after the three inputs.
        assert!(out_recv.next().await.is_none());
    });
}
/// Builds the dataflow shared by the fuzzed-batching tests.
///
/// The simulated input on `node` is cut into per-tick batches, each batch is
/// summed with `fold`, and the per-batch sums are emitted as the output
/// stream.
///
/// Returns the sender feeding the input and the receiver for the sums.
fn fuzzed_batching_program<'a>(
    node: Process<'a>,
) -> (
    SimSender<i32, TotalOrder, ExactlyOnce>,
    SimReceiver<i32, TotalOrder, ExactlyOnce>,
) {
    let tick = node.tick();
    let (in_send, input) = node.sim_input();

    let out_recv = input
        .batch(
            &tick,
            nondet!(
                /// Batch boundaries are intentionally left to the simulator: the
                /// tests using this program observe the resulting per-batch sums.
            ),
        )
        // Sum of the elements that landed in the same batch.
        .fold(q!(|| 0), q!(|acc, v| *acc += v))
        .all_ticks()
        .sim_output();

    (in_send, out_recv)
}
/// Builds the same per-batch summing dataflow as `fuzzed_batching_program`,
/// expressed with the `sliced!` macro instead of an explicit tick and
/// `batch` call.
///
/// Returns the sender feeding the input and the receiver for the sums.
fn fuzzed_batching_program_sliced<'a>(
    node: Process<'a>,
) -> (
    SimSender<i32, TotalOrder, ExactlyOnce>,
    SimReceiver<i32, TotalOrder, ExactlyOnce>,
) {
    let (in_send, input) = node.sim_input();

    let out_recv = sliced! {
        let batch = use(
            input,
            nondet!(
                /// Slice boundaries are intentionally left to the simulator: the
                /// tests using this program observe the resulting per-slice sums.
            )
        );
        batch.fold(q!(|| 0), q!(|acc, v| *acc += v)).into_stream()
    }
    .sim_output();

    (in_send, out_recv)
}
/// Expects the fuzzer to find a batching that makes the driver panic.
///
/// After a thousand `456`s the driver sends `100`, `23`, `99`. A per-batch
/// sum of `123` (= 100 + 23) triggers the panic; a sum of exactly `456`
/// (a batch holding a single `456`) ends the run without panicking.
#[test]
#[should_panic]
fn sim_crash_with_fuzzed_batching() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let (sender, sums) = fuzzed_batching_program(process);

    flow.sim().fuzz(async || {
        for _ in 0..1000 {
            sender.send(456);
        }
        for value in [100, 23, 99] {
            sender.send(value);
        }

        while let Some(sum) = sums.next().await {
            match sum {
                // A lone `456` in its own batch: give up on this run.
                456 => return,
                // `100` and `23` were batched together without `99`.
                123 => panic!("boom"),
                _ => {}
            }
        }
    });
}
/// Replays the stored fuzz input for `sim_crash_with_fuzzed_batching` against
/// `fuzzed_batching_program`, captures the scheduler's log, and compares it
/// with the stored snapshot.
///
/// Unlike the crashing test, the driver simply stops (instead of panicking)
/// when it sees a sum of `123`.
// NOTE(review): ignored on Windows; presumably the logged trace differs
// there — confirm.
#[test]
#[cfg_attr(target_os = "windows", ignore)]
fn trace_for_fuzzed_batching() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let (sender, sums) = fuzzed_batching_program(process);

    // The relative path requires the test to run from the crate root.
    let repro = std::fs::read(
        "./src/sim/tests/sim-failures/hydro_lang__sim__tests__sim_crash_with_fuzzed_batching.bin",
    )
    .unwrap();

    let mut trace = Vec::new();
    // Disable colored output before the log is captured.
    colored::control::set_override(false);

    flow.sim()
        .compiled()
        .fuzz_repro(repro, async |compiled| {
            let scheduler = compiled.schedule_with_logger(&mut trace);

            let driver = async move {
                for _ in 0..1000 {
                    sender.send(456);
                }
                for value in [100, 23, 99] {
                    sender.send(value);
                }

                while let Some(sum) = sums.next().await {
                    // Either sentinel sum ends the replay.
                    if matches!(sum, 456 | 123) {
                        return;
                    }
                }
            };

            // `biased` polls the driver first; whichever future completes
            // first ends the replay.
            tokio::select! {
                biased;
                _ = driver => {},
                _ = scheduler => {},
            };
        });

    let log_str = String::from_utf8(trace).unwrap();
    hydro_build_utils::assert_snapshot!(log_str);
}
/// Replays the stored fuzz input for `sim_crash_with_fuzzed_batching` against
/// the `sliced!`-based program (`fuzzed_batching_program_sliced`), captures
/// the scheduler's log, and compares it with the stored snapshot.
///
/// The driver stops (without panicking) when it sees a sum of `456` or `123`.
// NOTE(review): ignored on Windows; presumably the logged trace differs
// there — confirm.
#[test]
#[cfg_attr(target_os = "windows", ignore)]
fn trace_for_fuzzed_batching_sliced() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();
    let (sender, sums) = fuzzed_batching_program_sliced(process);

    // The relative path requires the test to run from the crate root.
    let repro = std::fs::read(
        "./src/sim/tests/sim-failures/hydro_lang__sim__tests__sim_crash_with_fuzzed_batching.bin",
    )
    .unwrap();

    let mut trace = Vec::new();
    // Disable colored output before the log is captured.
    colored::control::set_override(false);

    flow.sim()
        .compiled()
        .fuzz_repro(repro, async |compiled| {
            let scheduler = compiled.schedule_with_logger(&mut trace);

            let driver = async move {
                for _ in 0..1000 {
                    sender.send(456);
                }
                for value in [100, 23, 99] {
                    sender.send(value);
                }

                while let Some(sum) = sums.next().await {
                    // Either sentinel sum ends the replay.
                    if matches!(sum, 456 | 123) {
                        return;
                    }
                }
            };

            // `biased` polls the driver first; whichever future completes
            // first ends the replay.
            tokio::select! {
                biased;
                _ = driver => {},
                _ = scheduler => {},
            };
        });

    let log_str = String::from_utf8(trace).unwrap();
    hydro_build_utils::assert_snapshot!(log_str);
}
/// Empty payload type sent through the simulator by
/// `sim_batch_nondebuggable_type`. It derives only the serde traits; note
/// that it does not derive `Debug`.
#[derive(Serialize, Deserialize)]
struct Test {}
/// Runs a batch-and-count pipeline over a payload type (`Test`) that does
/// not implement `Debug`, exhaustively exploring the simulator's choices.
///
/// The test makes no assertion on the counts; it only has to build and run
/// to completion.
#[test]
fn sim_batch_nondebuggable_type() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
    let tick = node.tick();

    // Named without a leading underscore because the receiver is used below.
    let out_recv = input
        .batch(
            &tick,
            nondet!(
                /// Batch boundaries are chosen by the simulator; the output is only
                /// drained here, so its contents are not asserted on.
            ),
        )
        .count()
        .all_ticks()
        .sim_output();

    flow.sim().exhaustive(async || {
        in_send.send(Test {});
        // Drain every emitted count; the values themselves are not checked.
        let _: Vec<_> = out_recv.collect().await;
    });
}
/// Sends one value to each member of a three-member cluster, multiplies it
/// by ten inside the dataflow, and checks that each member's output carries
/// the result for the value that was sent to that member.
#[test]
fn sim_cluster_e2m_m2e() {
    let mut flow = FlowBuilder::new();
    let members = flow.cluster::<()>();

    let (sender, stream) = members.sim_input::<i32>();
    let scaled = stream.map(q!(|x| x * 10));
    let receiver = scaled.sim_cluster_output();

    flow.sim()
        .with_cluster_size(&members, 3)
        .exhaustive(async || {
            // (member, value) pairs: every input is sent before any output
            // is read.
            for (member, value) in [(0, 1), (1, 2), (2, 3)] {
                sender.send(member, value);
            }

            // (member, expected) pairs, read back in member order.
            for (member, expected) in [(0, 10), (1, 20), (2, 30)] {
                assert_eq!(receiver.next(member).await, Some(expected));
            }
        });
}
/// Checks that `assert_yields_only` can be followed by further sends: after
/// the first value has been sent and asserted, a second value is sent and
/// asserted on the same ports.
#[test]
fn sim_send_after_assert_yields_only() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, stream) = node.sim_input();
    let receiver = stream.atomic().end_atomic().sim_output();

    flow.sim().exhaustive(async || {
        // Two send/assert rounds, one value each.
        for value in [1u32, 2u32] {
            sender.send(value);
            receiver.assert_yields_only([value]).await;
        }
    });
}
/// Checks that `assert_yields_only` fails when the output carries more than
/// the expected values: two values are sent but only the first is expected,
/// and the test requires a panic containing "unexpected message".
#[test]
#[should_panic(expected = "unexpected message")]
fn assert_yields_only_catches_extra_value() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (sender, stream) = node.sim_input();
    let passthrough = stream.atomic().end_atomic();
    let receiver = passthrough.sim_output();

    flow.sim().exhaustive(async || {
        for value in [1u32, 2u32] {
            sender.send(value);
        }
        // Only the first value is expected; the second one must be reported.
        receiver.assert_yields_only([1u32]).await;
    });
}
/// Checks that `collect` on the output gathers every element, in order,
/// for each batching explored by the exhaustive simulator: three values are
/// sent and the collected output must be exactly `[1, 2, 3]`.
#[test]
fn sim_collect_waits_for_all_ticks() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let tick = node.tick();

    let (in_send, input) = node.sim_input();
    let out_recv = input
        .batch(
            &tick,
            nondet!(
                /// Batch boundaries are chosen by the simulator; this test asserts
                /// that the collected output does not depend on them.
            ),
        )
        .all_ticks()
        .sim_output();

    flow.sim().exhaustive(async || {
        for value in 1..=3 {
            in_send.send(value);
        }

        let all: Vec<i32> = out_recv.collect().await;
        assert_eq!(all, vec![1, 2, 3]);
    });
}