use serde::{Deserialize, Serialize};
use stageleft::q;
use crate::live_collections::sliced::sliced;
use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
use crate::location::{Location, Process};
use crate::nondet::nondet;
use crate::prelude::FlowBuilder;
use crate::sim::{SimReceiver, SimSender};
mod trophies;
#[cfg(not(nightly))]
#[test]
#[should_panic]
#[cfg_attr(not(target_os = "linux"), ignore)] fn sim_crash_in_output() {
use bytes::Bytes;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input();
let out_recv: SimReceiver<Bytes, TotalOrder, ExactlyOnce> = input.sim_output();
flow.sim().fuzz(async || {
in_send.send(bolero::any::<Vec<u8>>().into());
let x = out_recv.next().await.unwrap();
if !x.is_empty() && x[0] == 42 && x.len() > 1 && x[1] == 43 && x.len() > 2 && x[2] == 44 {
panic!("boom");
}
});
}
#[cfg(not(nightly))]
#[test]
#[should_panic]
#[cfg_attr(not(target_os = "linux"), ignore)] fn sim_crash_in_output_with_filter() {
use bytes::Bytes;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<Bytes, _, _>();
let out_recv = input
.filter(q!(|x| x.len() > 1 && x[0] == 42 && x[1] == 43))
.sim_output();
flow.sim().fuzz(async || {
in_send.send(bolero::any::<Vec<u8>>().into());
if let Some(x) = out_recv.next().await
&& x.len() > 2
&& x[2] == 44
{
panic!("boom");
}
});
}
#[test]
fn sim_batch_preserves_order_fuzzed() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input();
let tick = node.tick();
let out_recv = input
.batch(&tick, nondet!())
.all_ticks()
.sim_output();
flow.sim().fuzz(async || {
in_send.send(1);
in_send.send(2);
in_send.send(3);
assert_eq!(out_recv.next().await.unwrap(), 1);
assert_eq!(out_recv.next().await.unwrap(), 2);
assert_eq!(out_recv.next().await.unwrap(), 3);
assert!(out_recv.next().await.is_none());
});
}
fn fuzzed_batching_program<'a>(
node: Process<'a>,
) -> (
SimSender<i32, TotalOrder, ExactlyOnce>,
SimReceiver<i32, TotalOrder, ExactlyOnce>,
) {
let tick = node.tick();
let (in_send, input) = node.sim_input();
let out_recv = input
.batch(&tick, nondet!())
.fold(q!(|| 0), q!(|acc, v| *acc += v))
.all_ticks()
.sim_output();
(in_send, out_recv)
}
fn fuzzed_batching_program_sliced<'a>(
node: Process<'a>,
) -> (
SimSender<i32, TotalOrder, ExactlyOnce>,
SimReceiver<i32, TotalOrder, ExactlyOnce>,
) {
let (in_send, input) = node.sim_input();
let out_recv = sliced! {
let batch = use(input, nondet!());
batch.fold(q!(|| 0), q!(|acc, v| *acc += v)).into_stream()
}
.sim_output();
(in_send, out_recv)
}
#[test]
#[should_panic]
fn sim_crash_with_fuzzed_batching() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, out_recv) = fuzzed_batching_program(node);
flow.sim().fuzz(async || {
for _ in 0..1000 {
in_send.send(456); }
in_send.send(100);
in_send.send(23);
in_send.send(99);
while let Some(out) = out_recv.next().await {
if out == 456 {
return;
} else if out == 123 {
panic!("boom");
}
}
});
}
#[test]
#[cfg_attr(target_os = "windows", ignore)] fn trace_for_fuzzed_batching() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, out_recv) = fuzzed_batching_program(node);
let repro_bytes = std::fs::read(
"./src/sim/tests/sim-failures/hydro_lang__sim__tests__sim_crash_with_fuzzed_batching.bin",
)
.unwrap();
let mut log_out = Vec::new();
colored::control::set_override(false);
flow.sim()
.compiled()
.fuzz_repro(repro_bytes, async |compiled| {
let schedule = compiled.schedule_with_logger(&mut log_out);
let rest = async move {
for _ in 0..1000 {
in_send.send(456); }
in_send.send(100);
in_send.send(23);
in_send.send(99);
while let Some(out) = out_recv.next().await {
if out == 456 {
return;
} else if out == 123 {
return;
}
}
};
tokio::select! {
biased;
_ = rest => {},
_ = schedule => {},
};
});
let log_str = String::from_utf8(log_out).unwrap();
hydro_build_utils::assert_snapshot!(log_str);
}
#[test]
#[cfg_attr(target_os = "windows", ignore)] fn trace_for_fuzzed_batching_sliced() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, out_recv) = fuzzed_batching_program_sliced(node);
let repro_bytes = std::fs::read(
"./src/sim/tests/sim-failures/hydro_lang__sim__tests__sim_crash_with_fuzzed_batching.bin",
)
.unwrap();
let mut log_out = Vec::new();
colored::control::set_override(false);
flow.sim()
.compiled()
.fuzz_repro(repro_bytes, async |compiled| {
let schedule = compiled.schedule_with_logger(&mut log_out);
let rest = async move {
for _ in 0..1000 {
in_send.send(456); }
in_send.send(100);
in_send.send(23);
in_send.send(99);
while let Some(out) = out_recv.next().await {
if out == 456 {
return;
} else if out == 123 {
return;
}
}
};
tokio::select! {
biased;
_ = rest => {},
_ = schedule => {},
};
});
let log_str = String::from_utf8(log_out).unwrap();
hydro_build_utils::assert_snapshot!(log_str);
}
#[derive(Serialize, Deserialize)]
struct Test {}
#[test]
fn sim_batch_nondebuggable_type() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
let tick = node.tick();
let _out_recv = input
.batch(&tick, nondet!())
.count()
.all_ticks()
.sim_output();
flow.sim().exhaustive(async || {
in_send.send(Test {});
let _: Vec<_> = _out_recv.collect().await;
});
}
#[test]
fn sim_cluster_e2m_m2e() {
let mut flow = FlowBuilder::new();
let cluster = flow.cluster::<()>();
let (in_send, input) = cluster.sim_input::<i32>();
let out_recv = input.map(q!(|x| x * 10)).sim_cluster_output();
flow.sim()
.with_cluster_size(&cluster, 3)
.exhaustive(async || {
in_send.send(0, 1); in_send.send(1, 2); in_send.send(2, 3);
assert_eq!(out_recv.next(0).await, Some(10));
assert_eq!(out_recv.next(1).await, Some(20));
assert_eq!(out_recv.next(2).await, Some(30));
});
}
#[test]
fn sim_send_after_assert_yields_only() {
let mut flow = FlowBuilder::new();
let process = flow.process::<()>();
let (send_port, input) = process.sim_input();
let output = input.atomic().end_atomic();
let out_port = output.sim_output();
flow.sim().exhaustive(async || {
send_port.send(1u32);
out_port.assert_yields_only([1u32]).await;
send_port.send(2u32);
out_port.assert_yields_only([2u32]).await;
});
}
#[test]
#[should_panic(expected = "unexpected message")]
fn assert_yields_only_catches_extra_value() {
let mut flow = FlowBuilder::new();
let process = flow.process::<()>();
let (send_port, input) = process.sim_input();
let out_port = input.atomic().end_atomic().sim_output();
flow.sim().exhaustive(async || {
send_port.send(1u32);
send_port.send(2u32);
out_port.assert_yields_only([1u32]).await;
});
}
#[test]
fn sim_collect_waits_for_all_ticks() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let tick = node.tick();
let (in_send, input) = node.sim_input();
let out_recv = input
.batch(&tick, nondet!())
.all_ticks()
.sim_output();
flow.sim().exhaustive(async || {
in_send.send(1);
in_send.send(2);
in_send.send(3);
let all: Vec<i32> = out_recv.collect().await;
assert_eq!(all, vec![1, 2, 3]);
});
}
#[test]
fn resolve_futures_blocking_preserves_bounded() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let tick = node.tick();
let resolved = node
.source_iter(q!(vec![1, 2, 3]))
.batch(&tick, nondet!())
.map(q!(|x| async move { x }))
.resolve_futures_blocking();
let crossed = resolved.cross_singleton(node.singleton(q!(10)).clone_into_tick(&tick));
let out_recv = crossed.all_ticks().sim_output();
flow.sim().exhaustive(async || {
let results: Vec<(i32, i32)> = out_recv.collect_sorted().await;
assert_eq!(results, vec![(1, 10), (2, 10), (3, 10)]);
});
}
#[test]
fn sim_fold_sample_eager_state_count() {
use crate::live_collections::stream::NoOrder;
use crate::properties::manual_proof;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<i32, NoOrder, ExactlyOnce>();
let folded = input.fold(
q!(|| 0),
q!(
|acc, v| *acc += v,
commutative = manual_proof!()
),
);
let out_recv = sliced! {
let snapshot = use(folded, nondet!());
snapshot.into_stream()
}
.sim_output();
let count = flow.sim().exhaustive(async || {
in_send.send_many_unordered([1, 2, 3]);
let all: Vec<i32> = out_recv.collect().await;
assert_eq!(*all.last().unwrap(), 6);
});
assert_eq!(count, 108, "Exhaustive states explored");
}
#[test]
fn sim_fold_commutative_explores_all_subset_sums() {
use std::collections::HashSet;
use crate::live_collections::stream::NoOrder;
use crate::properties::manual_proof;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<i32, NoOrder, ExactlyOnce>();
let folded = input.fold(
q!(|| 0),
q!(
|acc, v| *acc += v,
commutative = manual_proof!()
),
);
let out_recv = sliced! {
let snapshot = use(folded, nondet!());
snapshot.into_stream()
}
.sim_output();
let mut observed_values = HashSet::new();
flow.sim().exhaustive(async || {
in_send.send_many_unordered([1, 2, 4]);
let all: Vec<i32> = out_recv.collect().await;
assert_eq!(*all.last().unwrap(), 7);
for &v in &all {
observed_values.insert(v);
}
});
let expected: HashSet<i32> = (0..=7).collect();
assert_eq!(
observed_values, expected,
"Should observe all subset sums across all executions"
);
}
#[test]
fn sim_fold_total_order_no_permutation() {
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let source = node.source_stream(q!(tokio_stream::iter(vec!["a", "b", "c"])));
let folded = source.fold(q!(|| String::new()), q!(|acc, v| acc.push_str(v)));
let out_recv = sliced! {
let snapshot = use(folded, nondet!());
snapshot.into_stream()
}
.sim_output();
let mut all_observed = std::collections::HashSet::new();
flow.sim().exhaustive(async || {
let all: Vec<String> = out_recv.collect().await;
assert_eq!(all.last().unwrap(), "abc");
for v in all {
all_observed.insert(v);
}
});
for v in &all_observed {
assert!(
["", "a", "ab", "abc"].contains(&v.as_str()),
"Unexpected intermediate: {:?}",
v
);
}
}
#[test]
fn sim_fold_keyed_no_order() {
use crate::live_collections::stream::NoOrder;
use crate::properties::manual_proof;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<(u32, i32), NoOrder, ExactlyOnce>();
let folded = input.into_keyed().fold(
q!(|| 0),
q!(
|acc, v| *acc += v,
commutative = manual_proof!()
),
);
let out_recv = sliced! {
let snapshot = use(folded, nondet!());
snapshot.entries()
}
.sim_output();
flow.sim().exhaustive(async || {
in_send.send_many_unordered([(1, 10), (2, 20), (1, 30)]);
let all: Vec<(u32, i32)> = out_recv.collect_sorted().await;
let mut last_by_key = std::collections::HashMap::new();
for (k, v) in all {
last_by_key.insert(k, v);
}
assert_eq!(last_by_key.get(&1), Some(&40));
assert_eq!(last_by_key.get(&2), Some(&20));
});
}
#[test]
fn sim_fold_tee_downstream_sees_different_subsets() {
use std::collections::HashSet;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3])));
let folded = source.fold(q!(|| 0), q!(|acc, v| *acc += v));
let out_a = sliced! {
let snapshot = use(folded.clone(), nondet!());
snapshot.into_stream()
}
.sim_output();
let out_b = sliced! {
let snapshot = use(folded, nondet!());
snapshot.into_stream()
}
.sim_output();
let mut observed_pairs: HashSet<(Vec<i32>, Vec<i32>)> = HashSet::new();
flow.sim().exhaustive(async || {
let a_values: Vec<i32> = out_a.collect().await;
let b_values: Vec<i32> = out_b.collect().await;
assert_eq!(*a_values.last().unwrap(), 6);
assert_eq!(*b_values.last().unwrap(), 6);
observed_pairs.insert((a_values, b_values));
});
#[expect(clippy::disallowed_methods, reason = "order is not used in test")]
let has_divergent = observed_pairs.iter().any(|(a, b)| a != b);
assert!(
has_divergent,
"Expected at least one execution where downstream consumers see different intermediate states, \
but all observed pairs were identical: {:?}",
observed_pairs
);
}
#[test]
fn sim_fold_catches_false_commutativity() {
use std::collections::HashSet;
use crate::live_collections::stream::NoOrder;
use crate::properties::manual_proof;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<String, NoOrder, ExactlyOnce>();
let folded = input.fold(
q!(|| String::new()),
q!(
|acc, v| acc.push_str(&v),
commutative = manual_proof!()
),
);
let out_recv = sliced! {
let snapshot = use(folded, nondet!());
snapshot.into_stream()
}
.sim_output();
let mut final_values = HashSet::new();
flow.sim().exhaustive(async || {
in_send.send_many_unordered(["a".to_owned(), "b".to_owned()]);
let all: Vec<String> = out_recv.collect().await;
final_values.insert(all.first().unwrap().clone());
});
assert!(
final_values.contains("ab") && final_values.contains("ba"),
"Expected both 'ab' and 'ba' to be observed, got: {:?}",
final_values
);
}
#[test]
fn sim_fold_in_tick_catches_false_commutativity() {
use std::collections::HashSet;
use crate::live_collections::stream::NoOrder;
use crate::properties::manual_proof;
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let (in_send, input) = node.sim_input::<String, NoOrder, ExactlyOnce>();
let tick = node.tick();
let out_recv = input
.batch(&tick, nondet!())
.fold(
q!(|| String::new()),
q!(
|acc, v| acc.push_str(&v),
commutative = manual_proof!()
),
)
.into_stream()
.all_ticks()
.sim_output();
let mut final_values = HashSet::new();
flow.sim().exhaustive(async || {
in_send.send_many_unordered(["a".to_owned(), "b".to_owned()]);
let all: Vec<String> = out_recv.collect().await;
for v in all {
final_values.insert(v);
}
});
assert!(
final_values.contains("ab") && final_values.contains("ba"),
"Expected both \"ab\" and \"ba\" to be observed, got: {:?}",
final_values
);
}
#[test]
fn sim_singleton_not_ready_until_producer_runs() {
use crate::live_collections::stream::NoOrder;
let mut flow = FlowBuilder::new();
let p = flow.process::<()>();
let (in_port, in_stream) = p.sim_input::<u32, TotalOrder, _>();
let in_no_order = in_stream.weaken_ordering::<NoOrder>();
let produced_singleton = sliced! {
let batch = use(in_no_order.clone(), nondet!());
batch.assume_ordering::<TotalOrder>(nondet!())
.fold(q!(|| 0u32), q!(|acc, v| *acc += v))
};
let out = sliced! {
let trigger = use(in_no_order, nondet!());
let snapshot = use(produced_singleton, nondet!());
trigger.cross_singleton(snapshot)
}
.assume_ordering::<TotalOrder>(nondet!());
let out_port = out.sim_output();
flow.sim().exhaustive(async || {
in_port.send(42);
let _ = out_port.next().await;
});
}