use anyhow::Result;
use flarch::{add_translator_direct, add_translator_link, broker::Broker, tasks::spawn_local};
use tokio::sync::watch;
use crate::{
network::broker::BrokerNetwork,
template::multi::intern::{Intern, InternIn, InternOut},
timer::{BrokerTimer, Timer},
};
#[derive(Debug, Clone, PartialEq)]
pub enum MultiIn {
Count,
}
#[derive(Debug, Clone, PartialEq)]
pub enum MultiOut {
State(Intern),
}
pub type BrokerMulti = Broker<MultiIn, MultiOut>;
pub struct Multi {
pub state: watch::Receiver<Intern>,
pub broker: BrokerMulti,
}
impl Multi {
pub async fn start(timer: BrokerTimer, network: BrokerNetwork) -> Result<Multi> {
let (tx, state) = watch::channel(Intern::default());
let mut state = Multi {
state,
broker: Self::broker(timer, network).await?,
};
let mut tap = state.broker.get_tap_out().await?.0;
spawn_local(async move {
while let Some(msg) = tap.recv().await {
match msg {
MultiOut::State(update) => tx.send(update).expect("Updating sum"),
}
}
});
Ok(state)
}
pub async fn broker(timer: BrokerTimer, network: BrokerNetwork) -> Result<BrokerMulti> {
let broker = Broker::new();
let mut intern = Intern::broker().await?;
add_translator_direct!(intern, broker.clone(), InternIn::Multi, InternOut::Multi);
add_translator_link!(intern, network, InternIn::Network, InternOut::Network);
Timer::minute(timer, intern, InternIn::Timer).await?;
Ok(broker)
}
}
#[cfg(test)]
mod test {
use flarch::nodeids::NodeID;
use crate::{
network::broker::{NetworkIn, NetworkOut},
nodeconfig::NodeInfo,
template::multi::intern::{ModuleMessage, MODULE_NAME},
timer::TimerMessage,
};
use super::*;
fn state_counter(counter: usize) -> Intern {
let mut m = Intern::default();
m.counter = counter;
m
}
fn state_counter_mo(counter: usize) -> MultiOut {
MultiOut::State(state_counter(counter))
}
#[tokio::test]
async fn test_update() -> Result<()> {
let mut timer = Broker::new();
let mut broker = Multi::broker(timer.clone(), Broker::new()).await?;
let mut tap = broker.get_tap_out().await?;
broker.emit_msg_in(MultiIn::Count)?;
broker.emit_msg_in(MultiIn::Count)?;
assert_eq!(Some(state_counter_mo(2)), tap.0.recv().await);
timer.emit_msg_out(TimerMessage::Minute)?;
assert_eq!(Some(state_counter_mo(0)), tap.0.recv().await);
Ok(())
}
#[tokio::test]
async fn test_state() -> Result<()> {
let mut timer = Broker::new();
let mut state = Multi::start(timer.clone(), Broker::new()).await?;
let mut tap = state.broker.get_tap_out().await?;
state.broker.emit_msg_in(MultiIn::Count)?;
assert_eq!(state_counter(0), *state.state.borrow());
state.broker.emit_msg_in(MultiIn::Count)?;
tap.0.recv().await;
assert_eq!(state_counter(2), *state.state.borrow());
timer.emit_msg_out(TimerMessage::Minute)?;
tap.0.recv().await;
assert_eq!(state_counter(0), *state.state.borrow());
Ok(())
}
#[tokio::test]
async fn test_network() -> Result<()> {
let mut timer = Broker::new();
let mut net = Broker::new();
let mut tap_net_out = net.get_tap_out().await?.0;
let mut tap_net_in = net.get_tap_in().await?.0;
let mut multi = Multi::broker(timer.clone(), net.clone()).await?;
let mut tap_multi = multi.get_tap_out().await?.0;
assert_eq!(true, tap_net_out.try_recv().is_err());
multi.emit_msg_in(MultiIn::Count)?;
multi.emit_msg_in(MultiIn::Count)?;
assert_eq!(state_counter_mo(2), tap_multi.recv().await.unwrap());
assert_eq!(true, tap_multi.try_recv().is_err());
timer.emit_msg_out(TimerMessage::Minute)?;
assert_eq!(
NetworkIn::WSUpdateListRequest,
tap_net_in.recv().await.unwrap()
);
assert_eq!(state_counter_mo(0), tap_multi.recv().await.unwrap());
let nodes = vec![
NodeInfo::new_from_id(NodeID::rnd()),
NodeInfo::new_from_id(NodeID::rnd()),
];
net.emit_msg_out(NetworkOut::NodeListFromWS(nodes.clone()))?;
multi.emit_msg_in(MultiIn::Count)?;
multi.emit_msg_in(MultiIn::Count)?;
let MultiOut::State(state) = tap_multi.recv().await.unwrap();
assert_eq!(2, state.nodes.len());
assert_eq!(2, state.counter);
let NetworkIn::MessageToNode(id1, msg1) = tap_net_in.recv().await.unwrap() else {
panic!("Got wrong message")
};
let NetworkIn::MessageToNode(id2, msg2) = tap_net_in.recv().await.unwrap() else {
panic!("Got wrong message")
};
assert_eq!(nodes[0].get_id(), id1);
assert_eq!(nodes[1].get_id(), id2);
assert_eq!(
Some(ModuleMessage::Counter(2)),
msg2.unwrap_yaml(MODULE_NAME)
);
net.emit_msg_out(NetworkOut::MessageFromNode(id1.clone(), msg1))?;
let MultiOut::State(state) = tap_multi.recv().await.unwrap();
assert_eq!(Some(&2usize), state.other.get(&id1));
Ok(())
}
}