use std::net::SocketAddr;
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream};
use crate::helpers::decide;
use crate::protocol::{CoordMsg, MsgType, SubordResponse};
use crate::{Addresses, Opts};
/// Builds and runs the Hydroflow dataflow for a subordinate node.
///
/// The subordinate reads the coordinator's address from the JSON file at
/// `opts.path()`, listens for [`CoordMsg`]s on `inbound`, and answers each one
/// with a [`SubordResponse`] sent through `outbound`:
///
/// * `Prepare` is answered with a `Commit` or `Abort` vote chosen by `decide(80)`.
/// * `Commit` / `Abort` are answered with `AckP2`.
/// * `End { .. }` is answered with `Ended`.
/// * Any other message type is only printed as unexpected.
///
/// Every received message and every outgoing response is also printed to stdout.
///
/// # Parameters
///
/// * `outbound` - UDP sink handed to `dest_sink_serde` for outgoing responses.
/// * `inbound` - UDP stream handed to `source_stream_serde` for incoming messages.
/// * `opts` - supplies the address-file path via `opts.path()`; with the
///   `debugging` feature enabled, `opts.graph` and `opts.write_config` control
///   whether and how the dataflow graph is opened.
///
/// # Panics
///
/// Panics if the coordinator address does not parse as a [`SocketAddr`], if an
/// inbound item is an `Err` (see `map(Result::unwrap)`), if `run_async` returns
/// an error, or (with the `debugging` feature) if the meta graph is missing or
/// `open_graph` fails.
pub(crate) async fn run_subordinate(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
println!("Subordinate live!");
let path = opts.path();
let mut df: Hydroflow = hydroflow_syntax! {
// Outbound address: read the address file, pick the `coordinator` field and
// parse it into a `SocketAddr` (panics on a malformed address).
server_addr = source_json(path)
-> map(|json: Addresses| json.coordinator)
-> map(|s| s.parse::<SocketAddr>().unwrap())
-> inspect(|coordinator| println!("Coordinator: {}", coordinator));
// Pair every outgoing response (input 0) with the coordinator address (input 1).
// NOTE(review): the `'static` lifetime on input 1 presumably keeps the address
// available across ticks while responses are per-tick -- confirm against the
// Hydroflow `cross_join` documentation.
server_addr_join = cross_join::<'tick, 'static>();
server_addr -> [1]server_addr_join;
// Outbound channel: merge all response streams, attach the coordinator address,
// then fan out to the network sink ([0]) and to the "Sending" log ([1]).
// NOTE(review): assumes `dest_sink_serde` consumes the (message, address) pairs
// produced by the join -- confirm.
outbound_chan = union() -> [0]server_addr_join -> tee();
outbound_chan[0] -> dest_sink_serde(outbound);
// Inbound channel: unwrap each received item (panics on `Err`), discard the
// second tuple element `_a` (presumably the sender address -- confirm) and fan
// out to the demux ([0]) and to the "Received" log ([1]).
inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap) -> map(|(m, _a)| m) -> tee();
// Route each message by its `mtype`: Prepare -> `prepares`, Abort/Commit -> `p2`,
// End -> `ends`, anything else -> `errs`.
msgs = inbound_chan[0] -> demux(|m:CoordMsg, var_args!(prepares, p2, ends, errs)| match m.mtype {
MsgType::Prepare => prepares.give(m),
MsgType::Abort => p2.give(m),
MsgType::Commit => p2.give(m),
MsgType::End {..} => ends.give(m),
_ => errs.give(m),
});
// Unexpected message types are only reported, never answered.
msgs[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m));
// Log every inbound message and every outbound response.
inbound_chan[1] -> for_each(|m| println!("Received {:?}", m));
outbound_chan[1] -> for_each(|m| println!("Sending {:?}", m));
// Handle Prepare: vote Commit when `decide(80)` is true, otherwise Abort.
// NOTE(review): `decide(80)` presumably returns true about 80% of the time --
// confirm in `crate::helpers::decide`.
report_chan = msgs[prepares] -> map(|m: CoordMsg| SubordResponse {
xid: m.xid,
mtype: if decide(80) { MsgType::Commit } else { MsgType::Abort }
});
report_chan -> [0]outbound_chan;
// Handle Abort/Commit: build a response carrying the same `xid` and the
// message type paired with it upstream (`AckP2`, see below).
p2_response = map(|(m, t)| SubordResponse {
xid: m.xid,
mtype: t,
}) -> [1]outbound_chan;
msgs[p2] -> map(|m:CoordMsg| (m, MsgType::AckP2)) -> p2_response;
// Handle End: reply with `Ended` for the same `xid`.
msgs[ends] -> map(|m:CoordMsg| SubordResponse {
xid: m.xid,
mtype: MsgType::Ended,
}) -> [2]outbound_chan;
};
// With the `debugging` feature, open the dataflow's meta graph when a graph
// option was supplied; this happens before the dataflow starts running.
#[cfg(feature = "debugging")]
if let Some(graph) = opts.graph {
let serde_graph = df
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}
// Drive the dataflow; panics if `run_async` yields an error.
df.run_async().await.unwrap();
}