use std::hash::Hash;
use hydro_lang::live_collections::stream::NoOrder;
use hydro_lang::location::{Location, NoTick};
use hydro_lang::prelude::*;
type JoinResponses<K, M, V, L> = Stream<(K, (M, V)), L, Unbounded, NoOrder>;
pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>(
responses: Stream<(K, V), L, Unbounded, NoOrder>,
metadata: Stream<(K, M), Tick<L>, Bounded, NoOrder>,
) -> JoinResponses<K, M, V, L> {
sliced! {
let mut remaining_to_join = use::state_null::<Stream<(K, M), _, _, NoOrder>>();
let response_batch = use(responses, nondet!(
));
let metadata_batch = use::atomic(metadata.all_ticks_atomic(), nondet!(
));
let remaining_and_new = remaining_to_join.chain(metadata_batch);
let joined_this_tick = remaining_and_new
.clone()
.join(response_batch.clone())
.map(q!(|(key, (meta, resp))| (key, (meta, resp))));
remaining_to_join = remaining_and_new.anti_join(response_batch.map(q!(|(key, _)| key)));
joined_this_tick
}
}
#[cfg(test)]
mod tests {
use hydro_lang::prelude::*;
use super::*;
#[test]
fn test_join_responses_basic() {
let mut flow = FlowBuilder::new();
let process = flow.process::<()>();
let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
let metadata_processing = metadata_input.atomic();
let metadata_ack = metadata_processing.clone().end_atomic();
let metadata = metadata_processing
.batch_atomic(&process.tick(), nondet!())
.weaken_ordering();
let joined = join_responses(responses.weaken_ordering(), metadata);
let metadata_ack_recv = metadata_ack.sim_output();
let joined_recv = joined.sim_output();
flow.sim().exhaustive(async || {
metadata_send.send((1, 42));
metadata_ack_recv.assert_yields([(1, 42)]).await;
response_send.send((1, "hello".to_owned()));
joined_recv
.assert_yields_unordered([(1, (42, "hello".to_owned()))])
.await;
});
}
#[test]
fn test_join_responses_metadata_persists() {
let mut flow = FlowBuilder::new();
let process = flow.process::<()>();
let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
let metadata_processing = metadata_input.atomic();
let metadata_ack = metadata_processing.clone().end_atomic();
let metadata = metadata_processing
.batch_atomic(&process.tick(), nondet!())
.weaken_ordering();
let joined = join_responses(responses.weaken_ordering(), metadata);
let metadata_ack_recv = metadata_ack.sim_output();
let joined_recv = joined.sim_output();
flow.sim().exhaustive(async || {
metadata_send.send_many([(1, 10), (2, 20)]);
metadata_ack_recv.assert_yields([(1, 10), (2, 20)]).await;
response_send.send_many([(2, "two".to_owned()), (1, "one".to_owned())]);
joined_recv
.assert_yields_only_unordered([
(1, (10, "one".to_owned())),
(2, (20, "two".to_owned())),
])
.await;
});
}
#[test]
fn test_join_responses_no_metadata() {
let mut flow = FlowBuilder::new();
let process = flow.process::<()>();
let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
let metadata_processing = metadata_input.atomic();
let metadata_ack = metadata_processing.clone().end_atomic();
let metadata = metadata_processing
.batch_atomic(&process.tick(), nondet!())
.weaken_ordering();
let joined = join_responses(responses.weaken_ordering(), metadata);
let metadata_ack_recv = metadata_ack.sim_output();
let joined_recv = joined.sim_output();
flow.sim().exhaustive(async || {
metadata_send.send((1, 42));
metadata_ack_recv.assert_yields([(1, 42)]).await;
response_send.send_many([(1, "matched".to_owned()), (2, "unmatched".to_owned())]);
joined_recv
.assert_yields_only_unordered([(1, (42, "matched".to_owned()))])
.await;
});
}
#[test]
fn test_join_responses_metadata_removed_after_match() {
let mut flow = FlowBuilder::new();
let process = flow.process::<()>();
let (response_send, responses) = process.sim_input::<(u32, String), _, _>();
let (metadata_send, metadata_input) = process.sim_input::<(u32, i32), _, _>();
let metadata_processing = metadata_input.atomic();
let metadata_ack = metadata_processing.clone().end_atomic();
let metadata = metadata_processing
.batch_atomic(&process.tick(), nondet!())
.weaken_ordering();
let joined = join_responses(responses.weaken_ordering(), metadata);
let metadata_ack_recv = metadata_ack.sim_output();
let joined_recv = joined.sim_output();
flow.sim().exhaustive(async || {
metadata_send.send((1, 42));
metadata_ack_recv.assert_yields([(1, 42)]).await;
response_send.send((1, "first".to_owned()));
joined_recv
.assert_yields_unordered([(1, (42, "first".to_owned()))])
.await;
response_send.send((1, "second".to_owned()));
joined_recv.assert_no_more().await;
});
}
}