use flarch::{add_translator_direct, add_translator_link, broker::Broker, nodeids::U256};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::{
flo::realm::RealmID,
router::{broker::BrokerRouter, messages::NetworkWrapper},
timer::{BrokerTimer, Timer},
};
use flarch::nodeids::NodeID;
use super::{
intern::{Intern, InternIn, InternOut, Stats},
kademlia::Config,
};
pub(super) const MODULE_NAME: &str = "DHTRouter";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DHTRouterIn {
MessageBroadcast(NetworkWrapper),
MessageNeighbour(NodeID, NetworkWrapper),
MessageClosest(U256, NetworkWrapper),
MessageDirect(NodeID, NetworkWrapper),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DHTRouterOut {
MessageNeighbour(NodeID, NetworkWrapper),
MessageRouting(NodeID, NodeID, NodeID, U256, NetworkWrapper),
MessageClosest(NodeID, NodeID, U256, NetworkWrapper),
MessageDest(NodeID, NodeID, NetworkWrapper),
NodeList(Vec<NodeID>),
SystemRealm(RealmID),
}
pub type BrokerDHTRouter = Broker<DHTRouterIn, DHTRouterOut>;
#[derive(Clone, Debug)]
pub struct DHTRouter {
pub broker: BrokerDHTRouter,
pub stats: watch::Receiver<Stats>,
}
impl DHTRouter {
pub async fn start(
config: Config,
timer: BrokerTimer,
router: BrokerRouter,
) -> anyhow::Result<Self> {
let (messages, stats) = Intern::new(config);
let mut intern = Broker::new_with_handler(Box::new(messages)).await?.0;
add_translator_link!(intern, router, InternIn::Network, InternOut::Network);
let broker = Broker::new();
add_translator_direct!(
intern,
broker.clone(),
InternIn::DHTRouter,
InternOut::DHTRouter
);
#[cfg(feature = "testing")]
Timer::second(timer, intern, InternIn::Tick).await?;
#[cfg(not(feature = "testing"))]
Timer::minute(timer, intern, InternIn::Tick).await?;
Ok(DHTRouter { broker, stats })
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use flarch::{start_logging_filter_level, tasks::wait_ms};
use crate::{
router::messages::RouterOut, testing::router_simul::RouterSimul, timer::TimerMessage,
};
use super::*;
const LOG_LVL: log::LevelFilter = log::LevelFilter::Info;
#[tokio::test]
async fn test_routing() -> anyhow::Result<()> {
start_logging_filter_level(vec![], LOG_LVL);
let mut timer = Broker::new();
let mut config = Config {
root: NodeID::rnd(),
k: 1,
ping_interval: 2,
ping_timeout: 4,
};
let mut simul = RouterSimul::new().await?;
let mut node_ids = vec![];
let mut node_infos = vec![];
let mut router_nets = vec![];
let mut dht_nets = vec![];
let mut dhts = vec![];
let mut taps = vec![];
for start in ["00", "40", "41", "42", "43"] {
let (node_conf, brok) = simul.new_node_id(Some(U256::from_str(start)?)).await?;
config.root = node_conf.info.get_id();
let mut dht = DHTRouter::start(config.clone(), timer.clone(), brok.clone()).await?;
router_nets.push(brok);
taps.push(dht.broker.get_tap_out().await?.0);
dht_nets.push(dht.broker.clone());
dhts.push(dht);
node_ids.push(config.root);
node_infos.push(node_conf.info);
}
log::info!("After for");
for mut net in router_nets {
net.settle_msg_out(RouterOut::NodeInfoAvailable(node_infos.clone()))
.await?;
}
timer.settle_msg_out(TimerMessage::Second).await?;
let dhtm_closest = |id: NodeID, module: &str, msg: &str| {
DHTRouterIn::MessageClosest(
id,
NetworkWrapper {
module: module.into(),
msg: msg.into(),
},
)
};
let mut nodes = node_ids.clone();
nodes.push(U256::from_str("44")?);
let mut routing = 0;
for (i, node) in nodes.iter().enumerate() {
if i == 0 {
continue;
}
log::info!("{i} Sending to node {node}");
dht_nets[0]
.settle_msg_in(dhtm_closest(*node, "Test", &format!("Msg{i}")))
.await?;
for b in &mut dht_nets {
b.settle(vec![]).await?;
}
let mut dest = 0;
let mut closest = 0;
for (j, tap) in taps.iter_mut().enumerate() {
while let Ok(msg) = tap.try_recv() {
log::debug!("{j} got {msg:?}");
match msg {
DHTRouterOut::MessageRouting(_, _, _, _, _) => routing += 1,
DHTRouterOut::MessageClosest(_, _, _, _) => closest += 1,
DHTRouterOut::MessageDest(_, _, _) => dest += 1,
_ => {}
}
}
}
match i {
5 => assert_eq!([0, 1], [dest, closest]),
_ => assert_eq!([1, 0], [dest, closest]),
}
}
assert!(routing > 0);
Ok(())
}
#[tokio::test]
async fn test_update_nodes() -> anyhow::Result<()> {
start_logging_filter_level(vec!["flmodules"], log::LevelFilter::Info);
let config = Config {
root: NodeID::rnd(),
k: 1,
ping_interval: 2,
ping_timeout: 4,
};
let mut simul = RouterSimul::new().await?;
let mut node_ids = vec![];
let mut node_infos = vec![];
let mut router_nets = vec![];
let mut dht_nets = vec![];
let mut taps = vec![];
for start in ["00", "40", "41"] {
let (node_conf, brok) = simul.new_node_id(Some(U256::from_str(start)?)).await?;
let id = node_conf.info.get_id();
let mut dht = DHTRouter::start(config, Broker::new(), brok.clone()).await?;
router_nets.push(brok);
taps.push(dht.broker.get_tap_out().await?.0);
dht_nets.push(dht.broker.clone());
node_ids.push(id);
node_infos.push(node_conf.info);
}
wait_ms(1000).await;
Ok(())
}
}