use std::net::SocketAddr;
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream};
use crate::helpers::parse_out;
use crate::protocol::{CoordMsg, MsgType, SubordResponse};
use crate::{Addresses, Opts};
/// Builds and runs the coordinator side of the two-phase-commit dataflow.
///
/// * `outbound` - sink used to send `CoordMsg`s to subordinates.
/// * `inbound` - stream from which `SubordResponse`s are received.
/// * `opts` - command-line options; `opts.path()` names the JSON file that lists the
///   subordinate addresses.
///
/// Transaction ids are read from stdin. Each id is tracked in `phase_map` with a phase number:
///
/// 1. set when the id is read from stdin and `Prepare` is broadcast;
/// 2. set when an `Abort` vote arrives (and `Abort` is broadcast), or when the number of
///    `Commit` votes equals the number of subordinates (and `Commit` is broadcast);
/// 3. set when an `AckP2` arrives and `End` is broadcast.
///
/// The entry is deleted when an `Ended` message arrives.
///
/// `Err` items from the inbound stream and from stdin are reported on stderr and skipped
/// rather than aborting the coordinator.
///
/// # Panics
///
/// Panics if a subordinate address in the config file does not parse as a `SocketAddr`, and
/// wherever the remaining `unwrap()`/`expect()` calls at the end of the function fail.
pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
    println!("Coordinator live!");

    let path = opts.path();
    let mut df: Hydroflow = hydroflow_syntax! {
        // Load the subordinate list from the JSON config and turn every entry into a SocketAddr.
        // A malformed address is a configuration error, so we still fail fast, but we say
        // which entry was bad instead of panicking with a bare parse error.
        subords = source_json(path)
            -> flat_map(|json: Addresses| json.subordinates)
            -> map(|s| {
                s.parse::<SocketAddr>()
                    .unwrap_or_else(|err| panic!("Invalid subordinate address {:?}: {}", s, err))
            })
            -> tee();

        // phase_map holds the current phase (1, 2 or 3) of every in-flight transaction id.
        // Writers send a Delete followed by a Persist to overwrite the previous phase.
        phase_map = union() -> persist_mut_keyed::<'static>();

        // Network channels.
        outbound_chan = tee();
        outbound_chan[0] -> dest_sink_serde(outbound);
        // An Err item on the inbound stream is logged and dropped: one failed receive must not
        // take the coordinator down in the middle of the protocol. The sender address is not
        // needed past this point, so only the message is forwarded.
        inbound_chan = source_stream_serde(inbound)
            -> filter_map(|res: Result<_, _>| match res {
                Ok((m, _a)) => Some(m),
                Err(err) => {
                    eprintln!("Dropping inbound message that could not be received: {:?}", err);
                    None
                }
            })
            -> tee();

        // Route each subordinate response by message type.
        msgs = inbound_chan[0] -> demux(|m: SubordResponse, var_args!(commits, aborts, acks, endeds, errs)| match m.mtype {
            MsgType::Commit => commits.give(m),
            MsgType::Abort => aborts.give(m),
            MsgType::AckP2 {..} => acks.give(m),
            MsgType::Ended {..} => endeds.give(m),
            _ => errs.give(m),
        });
        msgs[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m));

        // An Ended response finishes the transaction: remove it from phase_map.
        msgs[endeds]
            -> map(|m: SubordResponse| hydroflow::util::PersistenceKeyed::Delete(m.xid))
            -> defer_tick()
            -> phase_map;

        // Log all traffic (this prototype just prints it).
        inbound_chan[1] -> for_each(|m| println!("Received {:?}", m));
        outbound_chan[1] -> for_each(|(m, a)| println!("Sending {:?} to {:?}", m, a));

        // Broadcast: pair every outgoing message with every subordinate address.
        // The subordinate side of the cross join is 'static so the address list stays
        // available for messages produced in later ticks.
        broadcast_join = cross_join::<'tick, 'static>() -> outbound_chan;
        broadcast = union() -> [0]broadcast_join;
        subords[1] -> [1]broadcast_join;
        subords[2] -> for_each(|s| println!("Subordinate: {:?}", s));

        // Phase 1 initiate: every transaction id parsed from stdin is recorded as phase 1 and
        // a Prepare message is broadcast. A stdin read error is logged and the line skipped
        // instead of panicking; lines that parse_out rejects are dropped as before.
        initiate = source_stdin()
            -> filter_map(|l: Result<std::string::String, std::io::Error>| match l {
                Ok(line) => parse_out(line),
                Err(err) => {
                    eprintln!("Skipping unreadable stdin line: {}", err);
                    None
                }
            })
            -> tee();
        initiate
            -> flat_map(|xid: u16| [hydroflow::util::PersistenceKeyed::Delete(xid), hydroflow::util::PersistenceKeyed::Persist(xid, 1)])
            -> phase_map;
        initiate
            -> map(|xid: u16| CoordMsg { xid, mtype: MsgType::Prepare })
            -> [0]broadcast;

        // Phase 1 responses: any Abort vote moves the transaction to phase 2 and an Abort
        // decision is broadcast. This happens once per Abort message received.
        abort_p1s = msgs[aborts] -> tee();
        abort_p1s
            -> flat_map(|m: SubordResponse| [hydroflow::util::PersistenceKeyed::Delete(m.xid), hydroflow::util::PersistenceKeyed::Persist(m.xid, 2)])
            -> defer_tick()
            -> phase_map;
        abort_p1s
            -> map(|m: SubordResponse| CoordMsg { xid: m.xid, mtype: MsgType::Abort })
            -> [1]broadcast;

        // Count Commit votes per transaction id.
        // NOTE(review): this 'static fold_keyed is never pruned in this function, so counts
        // for finished (or reused) transaction ids stay around.
        commit_votes = msgs[commits]
            -> map(|m: SubordResponse| (m.xid, 1))
            -> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val);

        // Count the subordinates.
        subord_total = subords[0] -> fold::<'static>(|| 0, |a: &mut _, _b| *a += 1);

        // Join on the count itself: an xid comes out only when its number of Commit votes
        // equals the number of subordinates.
        commit_votes -> map(|(xid, c)| (c, xid)) -> [0]committed;
        subord_total -> map(|c| (c, ())) -> [1]committed;
        committed = join::<'tick,'tick>() -> map(|(_c, (xid, ()))| xid);

        // Only act on a fully-voted xid while phase_map still has it in phase 1, so the
        // Commit decision is not re-sent after the phase has advanced.
        committed -> map(|xid| (xid, ())) -> [0]check_committed;
        phase_map -> [1]check_committed;
        check_committed = join::<'tick, 'tick>()
            -> map(|(xid, (_, phase))| (xid, phase))
            -> filter(|(_xid, phase)| *phase == 1)
            -> map(|(xid, _phase)| xid)
            -> tee();
        // Record the move to phase 2 ...
        check_committed
            -> flat_map(|xid| [hydroflow::util::PersistenceKeyed::Delete(xid), hydroflow::util::PersistenceKeyed::Persist(xid, 2)])
            -> defer_tick()
            -> phase_map;
        // ... and broadcast the Commit decision.
        check_committed
            -> map(|xid| CoordMsg { xid, mtype: MsgType::Commit })
            -> [2]broadcast;

        // Phase 2 acknowledgements: move the transaction to phase 3 and broadcast End.
        ack_p2s = msgs[acks] -> tee();
        ack_p2s
            -> flat_map(|m: SubordResponse| [hydroflow::util::PersistenceKeyed::Delete(m.xid), hydroflow::util::PersistenceKeyed::Persist(m.xid, 3)])
            -> defer_tick()
            -> phase_map;
        ack_p2s
            -> map(|m: SubordResponse| CoordMsg { xid: m.xid, mtype: MsgType::End })
            -> [3]broadcast;
    };

    // Optionally render the dataflow graph when built with the `debugging` feature.
    #[cfg(feature = "debugging")]
    if let Some(graph) = opts.graph {
        let serde_graph = df
            .meta_graph()
            .expect("No graph found, maybe failed to parse.");
        serde_graph.open_graph(graph, opts.write_config).unwrap();
    }

    df.run_async().await.unwrap();
}