use std::hash::Hash;
use hydro_lang::live_collections::stream::{NoOrder, Ordering};
use hydro_lang::location::{Location, NoTick};
use hydro_lang::prelude::*;
/// Collects keyed responses and releases the successful ones once a key has
/// gathered a quorum of `Ok` results.
///
/// Every element of `responses` is a `(key, result)` pair. Responses are
/// counted per key, with separate counters for `Ok` and `Err`.
///
/// # Arguments
/// * `responses` - stream of `(key, Result<V, E>)` pairs.
/// * `min` - number of `Ok` responses a key needs before its values are
///   emitted.
/// * `max` - total number of responses (`Ok` plus `Err`) after which the
///   buffered responses of a key are dropped from the internal state. Only
///   consulted when `max != min`; when `max == min` a key is dropped as soon
///   as it reaches `min` successes.
///
/// # Returns
/// A pair of streams:
/// 1. `(key, value)` for the `Ok` responses of keys that reached `min`
///    successes, with the same ordering type as the input,
/// 2. `(key, error)` for every `Err` in `responses`, forwarded directly from
///    the input and independent of the quorum logic.
#[expect(clippy::type_complexity, reason = "stream types with ordering")]
pub fn collect_quorum_with_response<
'a,
L: Location<'a> + NoTick,
Order: Ordering,
K: Clone + Eq + Hash,
V: Clone,
E: Clone,
>(
responses: Stream<(K, Result<V, E>), L, Unbounded, Order>,
min: usize,
max: usize,
) -> (
Stream<(K, V), L, Unbounded, Order>,
Stream<(K, E), L, Unbounded, Order>,
) {
let quorums = sliced! {
let new_inputs = use(responses.clone(), nondet!(
/// Responses for keys that are still tracked are carried over in `not_all`
/// and re-counted on every iteration, so whether a key reaches quorum does
/// not depend on how the input is split into batches.
));
// NOTE(review): `use::state_null` presumably yields an initially-empty
// collection that holds whatever was assigned to it at the end of the
// previous iteration — confirm against the `sliced!` documentation.
// `not_all`: buffered responses of keys that are still being tracked.
let mut not_all = use::state_null::<Stream<_, _, Bounded, Order>>();
// `min_but_not_max`: keys that already reached `min` successes but are
// still tracked (only written in the `max != min` branch below).
let mut min_but_not_max = use::state_null::<Stream<K, _, Bounded, NoOrder>>();
// Everything known for tracked keys: carried-over responses followed by new ones.
let current_responses = not_all.chain(new_inputs);
// Per key: (number of `Ok` responses, number of `Err` responses).
let count_per_key = current_responses.clone().into_keyed().fold(
q!(move || (0, 0)),
q!(move |accum, value| {
if value.is_ok() {
accum.0 += 1;
} else {
accum.1 += 1;
}
}, commutative = manual_proof!(/** each response only increments one of two counters, so arrival order is irrelevant */)),
);
// Keys with fewer than `min` successes.
let not_reached_min_count = count_per_key
.clone()
.filter(q!(move |(success, _error)| success < &min))
.keys();
// Keys with at least `min` successes.
let reached_min_count = count_per_key
.clone()
.filter(q!(move |(success, _error)| success >= &min))
.keys();
let just_reached_quorum = if max == min {
// Keep buffering only keys that have not reached `min` successes;
// a key is forgotten as soon as its quorum is emitted.
// NOTE(review): a key that never collects `min` successes is never
// removed from `not_all` in this branch — confirm that is intended.
not_all = current_responses.clone().anti_join(reached_min_count);
// Emit the responses of every key that has reached `min` successes.
current_responses.anti_join(not_reached_min_count)
} else {
// Keys for which `max` responses (successes plus errors) have arrived.
let received_from_all = count_per_key
.filter(q!(move |(success, error)| (success + error) >= max))
.keys();
// Stop tracking a key only once all `max` responses are in.
not_all = current_responses.clone().anti_join(received_from_all.clone());
// `min_but_not_max` is read here before it is reassigned below, so it
// refers to the value stored by the previous iteration: keys whose
// quorum was already emitted are suppressed.
// NOTE(review): when `min < max`, successes beyond `min` appear to be
// emitted only if they arrive in the same iteration in which the key
// first reaches `min`, so the number of values emitted per key may
// depend on batching — confirm this is acceptable for callers.
let out = current_responses
.anti_join(not_reached_min_count)
.anti_join(min_but_not_max);
// Remember keys that reached `min` but are still waiting for `max` responses.
min_but_not_max = reached_min_count.filter_not_in(received_from_all);
out
};
// Only successful responses are part of the quorum output.
just_reached_quorum.filter_map(q!(move |(key, res)| match res {
Ok(v) => Some((key, v)),
Err(_) => None,
}))
};
(
quorums,
// Errors bypass the quorum logic and are forwarded as they arrive.
responses.filter_map(q!(move |(key, res)| match res {
Ok(_) => None,
Err(e) => Some((key, e)),
})),
)
}
/// Collects keyed acknowledgements and emits a key once it has gathered a
/// quorum of `Ok` responses.
///
/// Unlike `collect_quorum_with_response`, the success payload is `()` and
/// only the key is emitted.
///
/// # Arguments
/// * `responses` - stream of `(key, Result<(), E>)` pairs.
/// * `min` - number of `Ok` responses a key needs before it is emitted.
/// * `max` - total number of responses (`Ok` plus `Err`) after which the
///   buffered responses of a key are dropped from the internal state. Only
///   consulted when `max != min`; when `max == min` a key is dropped as soon
///   as it reaches `min` successes.
///
/// # Returns
/// A pair of streams:
/// 1. the keys that reached `min` successes (typed as `NoOrder`),
/// 2. `(key, error)` for every `Err` in `responses`, forwarded directly from
///    the input and independent of the quorum logic.
#[expect(clippy::type_complexity, reason = "stream types with ordering")]
pub fn collect_quorum<
'a,
L: Location<'a> + NoTick,
Order: Ordering,
K: Clone + Eq + Hash,
E: Clone,
>(
responses: Stream<(K, Result<(), E>), L, Unbounded, Order>,
min: usize,
max: usize,
) -> (
Stream<K, L, Unbounded, NoOrder>,
Stream<(K, E), L, Unbounded, Order>,
) {
let just_reached_quorum = sliced! {
let new_inputs = use(responses.clone(), nondet!(
/// Responses for keys that are still tracked are carried over in `not_all`
/// and re-counted on every iteration, so whether a key reaches quorum does
/// not depend on how the input is split into batches.
));
// NOTE(review): `use::state_null` presumably yields an initially-empty
// collection that holds whatever was assigned to it at the end of the
// previous iteration — confirm against the `sliced!` documentation.
// `not_all`: buffered responses of keys that are still being tracked.
let mut not_all = use::state_null::<Stream<_, _, Bounded, Order>>();
// `min_but_not_max`: keys already emitted that are still tracked
// (only written in the `max != min` branch below).
let mut min_but_not_max = use::state_null::<Stream<K, _, Bounded, NoOrder>>();
// Everything known for tracked keys: carried-over responses followed by new ones.
let current_responses = not_all.chain(new_inputs);
// Per key: (number of `Ok` responses, number of `Err` responses).
let count_per_key = current_responses.clone().into_keyed().fold(
q!(move || (0, 0)),
q!(move |accum, value| {
if value.is_ok() {
accum.0 += 1;
} else {
accum.1 += 1;
}
}, commutative = manual_proof!(/** each response only increments one of two counters, so arrival order is irrelevant */)),
);
// Keys with at least `min` successes.
let reached_min_count = count_per_key
.clone()
.entries()
.filter_map(q!(move |(key, (success, _error))| if success >= min {
Some(key)
} else {
None
}));
let just_reached_quorum = if max == min {
// A key is forgotten as soon as it reaches `min` successes, so every
// key in `reached_min_count` is newly reached in this iteration.
// NOTE(review): a key that never collects `min` successes is never
// removed from `not_all` in this branch — confirm that is intended.
not_all = current_responses.anti_join(reached_min_count.clone());
reached_min_count
} else {
// Keys for which `max` responses (successes plus errors) have arrived.
let received_from_all = count_per_key
.filter(q!(move |(success, error)| (success + error) >= max))
.keys();
// Stop tracking a key only once all `max` responses are in.
not_all = current_responses.anti_join(received_from_all.clone());
// `min_but_not_max` is read here before it is reassigned below, so it
// refers to the value stored by the previous iteration: keys that were
// already emitted are not emitted again while they remain tracked.
let out = reached_min_count.clone().filter_not_in(min_but_not_max);
// Remember keys that reached `min` but are still waiting for `max` responses.
min_but_not_max = reached_min_count.filter_not_in(received_from_all);
out
};
just_reached_quorum
};
(
just_reached_quorum,
// Errors bypass the quorum logic and are forwarded as they arrive.
responses.filter_map(q!(move |(key, res)| match res {
Ok(_) => None,
Err(e) => Some((key, e)),
})),
)
}
#[cfg(test)]
mod tests {
    use hydro_lang::live_collections::stream::{NoOrder, TotalOrder};
    use hydro_lang::prelude::*;

    use super::{collect_quorum, collect_quorum_with_response};

    /// With `min == max == 3`, keys 1 and 3 each receive three successes and
    /// have all of them emitted in input order; key 2 only receives two and
    /// is never emitted.
    #[test]
    fn collect_quorum_with_response_preserves_order() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (sender, input) = process.sim_input();
        let receiver = collect_quorum_with_response(input, 3, 3).0.sim_output();

        builder.sim().exhaustive(async || {
            for key in [1, 1, 1, 2, 2, 3, 3, 3] {
                sender.send((key, Ok::<(), ()>(())));
            }

            let delivered = receiver.collect::<Vec<_>>().await;
            assert_eq!(
                delivered,
                vec![(1, ()), (1, ()), (1, ()), (3, ()), (3, ()), (3, ())]
            )
        });
    }

    /// Same check on an unordered input with `min == max == 2`: keys 1 and 3
    /// reach quorum, key 2 (a single success) does not.
    #[test]
    fn collect_quorum_with_response_no_order() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (sender, input) = process.sim_input::<_, NoOrder, _>();
        let receiver = collect_quorum_with_response(input, 2, 2).0.sim_output();

        builder.sim().exhaustive(async || {
            let batch = [
                (1, Ok::<(), ()>(())),
                (1, Ok(())),
                (2, Ok(())),
                (3, Ok(())),
                (3, Ok(())),
            ];
            sender.send_many_unordered(batch);

            receiver
                .assert_yields_only_unordered([(1, ()), (1, ()), (3, ()), (3, ())])
                .await;
        });
    }

    /// Exercises `collect_quorum` with `min = 2`, `max = 3` over several
    /// mixes of successes and errors, checking both output streams.
    #[test]
    fn collect_quorum_functionality() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (sender, input) = process.sim_input();
        let (quorum_out, error_out) = {
            let (reached, failed) = collect_quorum(input, 2, 3);
            (reached.sim_output(), failed.sim_output())
        };
        let simulation = builder.sim().compiled();

        // Two successes, nothing else: quorum without errors.
        simulation.exhaustive(async || {
            for _ in 0..2 {
                sender.send((1, Ok::<(), ()>(())));
            }
            quorum_out.assert_yields_only_unordered([1]).await;
            error_out.assert_no_more().await;
        });

        // Two successes followed by one error: quorum plus one forwarded error.
        simulation.exhaustive(async || {
            for outcome in [Ok::<(), ()>(()), Ok(()), Err(())] {
                sender.send((2, outcome));
            }
            quorum_out.assert_yields_only_unordered([2]).await;
            error_out.assert_yields_only([(2, ())]).await;
        });

        // One success and two errors: no quorum, both errors forwarded.
        simulation.exhaustive(async || {
            for outcome in [Ok::<(), ()>(()), Err(()), Err(())] {
                sender.send((3, outcome));
            }
            quorum_out.assert_no_more().await;
            error_out.assert_yields_only([(3, ()), (3, ())]).await;
        });

        // Three successes: the key is still reported only once.
        simulation.exhaustive(async || {
            for _ in 0..3 {
                sender.send((4, Ok::<(), ()>(())));
            }
            quorum_out.assert_yields_only_unordered([4]).await;
            error_out.assert_no_more().await;
        });

        // Three errors: no quorum, all errors forwarded.
        simulation.exhaustive(async || {
            for _ in 0..3 {
                sender.send((5, Err::<(), ()>(())));
            }
            quorum_out.assert_no_more().await;
            error_out
                .assert_yields_only([(5, ()), (5, ()), (5, ())])
                .await;
        });

        // An error first, then two successes: quorum plus one forwarded error.
        simulation.exhaustive(async || {
            for outcome in [Err::<(), ()>(()), Ok(()), Ok(())] {
                sender.send((6, outcome));
            }
            quorum_out.assert_yields_only_unordered([6]).await;
            error_out.assert_yields_only([(6, ())]).await;
        });
    }

    /// With `min == max == 2`, keys 1 and 3 reach quorum while key 2 (one
    /// success, one error) does not.
    #[test]
    fn collect_quorum_min_equals_max() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (sender, input) = process.sim_input();
        let quorum_out = collect_quorum(input, 2, 2).0.sim_output();

        builder.sim().exhaustive(async || {
            let messages = [
                (1, Ok::<(), ()>(())),
                (1, Ok(())),
                (2, Ok(())),
                (2, Err(())),
                (3, Ok(())),
                (3, Ok(())),
            ];
            for message in messages {
                sender.send(message);
            }

            quorum_out.assert_yields_only_unordered([1, 3]).await;
        });
    }

    /// With `min == max == 1`, a single success is a quorum and a single
    /// error is not.
    #[test]
    fn collect_quorum_single_response() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (sender, input) = process.sim_input();
        let quorum_out = collect_quorum(input, 1, 1).0.sim_output();

        builder.sim().exhaustive(async || {
            let messages = [(1, Ok::<(), ()>(())), (2, Err(())), (3, Ok(()))];
            for message in messages {
                sender.send(message);
            }

            quorum_out.assert_yields_only_unordered([1, 3]).await;
        });
    }

    /// Without any input, no quorum is ever reported.
    #[test]
    fn collect_quorum_no_responses() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (_sender, input) = process.sim_input::<_, TotalOrder, _>();
        let quorum_out = {
            let (reached, _failed) = collect_quorum::<_, _, i32, ()>(input, 2, 3);
            reached.sim_output()
        };

        builder.sim().exhaustive(async || {
            quorum_out.assert_no_more().await;
        });
    }

    /// With `min = 2`, `max = 4`, a key that keeps receiving responses after
    /// reaching quorum is still reported exactly once.
    #[test]
    fn collect_quorum_no_double_quorum_before_max() {
        let mut builder = FlowBuilder::new();
        let process = builder.process::<()>();
        let (sender, input) = process.sim_input::<_, TotalOrder, _>();
        let quorum_out = collect_quorum(input, 2, 4).0.sim_output();

        builder.sim().exhaustive(async || {
            // Key 1: four successes.
            for _ in 0..4 {
                sender.send((1, Ok::<(), ()>(())));
            }
            // Key 2: two successes bracketed by errors.
            for outcome in [Err::<(), ()>(()), Ok(()), Ok(()), Err(())] {
                sender.send((2, outcome));
            }

            quorum_out.assert_yields_only_unordered([1, 2]).await;
        });
    }
}