use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::forward_handle::ForwardHandle;
use crate::live_collections::KeyedStream;
use crate::live_collections::stream::TotalOrder;
use crate::location::{Cluster, NoTick};
use crate::nondet::nondet;
#[cfg(stageleft_runtime)]
#[cfg(feature = "maelstrom")]
#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
pub mod deploy_maelstrom;
pub mod deploy_runtime_maelstrom;
#[expect(clippy::type_complexity, reason = "stream markers")]
pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
cluster: &Cluster<'a, C>,
) -> (
KeyedStream<String, In, Cluster<'a, C>>,
ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
)
where
Cluster<'a, C>: NoTick,
{
use stageleft::q;
use crate::location::Location;
let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
.source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
.into_keyed()
.map(q!(|b| serde_json::from_value(b).unwrap()));
let (fwd_handle, output_stream) =
cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
output_stream
.entries()
.assume_ordering::<TotalOrder>(nondet!())
.for_each(q!(|(client_id, body)| {
deploy_runtime_maelstrom::maelstrom_send_response(
&meta.node_id,
&client_id,
serde_json::to_value(body).unwrap(),
);
}));
(input, fwd_handle)
}