use flarch::{
add_translator_direct, add_translator_link, broker::Broker, data_storage::DataStorage,
nodeids::U256, tasks::now,
};
use tokio::sync::watch;
use super::{
core::{Category, Event, EventsStorage},
intern::{Intern, InternIn, InternOut},
};
use crate::{
nodeconfig::NodeInfo,
random_connections::broker::BrokerRandom,
timer::{BrokerTimer, Timer},
};
pub type BrokerGossip = Broker<GossipIn, GossipOut>;
pub(super) const MODULE_NAME: &str = "Gossip";
#[derive(Debug, Clone, PartialEq)]
pub enum GossipIn {
AddEvent(Event),
}
#[derive(Debug, Clone, PartialEq)]
pub enum GossipOut {
NewEvent(Event),
}
#[derive(Clone, Debug)]
pub struct Gossip {
pub broker: BrokerGossip,
pub storage: watch::Receiver<EventsStorage>,
pub info: NodeInfo,
}
impl Gossip {
pub async fn start(
storage: Box<dyn DataStorage + Send>,
info: NodeInfo,
timer: BrokerTimer,
rc: BrokerRandom,
) -> anyhow::Result<Self> {
let src = info.get_id();
let (messages, storage) = Intern::new(src.clone(), storage);
let mut intern = Broker::new_with_handler(Box::new(messages)).await?.0;
#[cfg(feature = "testing")]
Timer::second(timer, intern.clone(), InternIn::Tick).await?;
#[cfg(not(feature = "testing"))]
Timer::minute(timer, intern.clone(), InternIn::Tick).await?;
add_translator_link!(intern, rc, InternIn::Network, InternOut::Network);
let broker = Broker::new();
add_translator_direct!(intern, broker.clone(), InternIn::Gossip, InternOut::Gossip);
let mut gb = Gossip {
storage,
broker,
info: info.clone(),
};
gb.add_event(Event {
category: Category::NodeInfo,
src,
created: now(),
msg: info.encode(),
})
.await?;
Ok(gb)
}
pub async fn add_chat_message(&mut self, msg: String) -> anyhow::Result<()> {
self.add_event(Event {
category: Category::TextMessage,
src: self.info.get_id(),
created: now(),
msg,
})
.await
}
pub async fn add_event(&mut self, event: Event) -> anyhow::Result<()> {
self.broker.emit_msg_in(GossipIn::AddEvent(event))?;
Ok(())
}
pub fn chat_events(&self) -> Vec<Event> {
self.storage.borrow().events(Category::TextMessage)
}
pub fn event_ids(&self) -> Vec<U256> {
self.storage.borrow().event_ids()
}
pub fn event(&self, id: &U256) -> Option<Event> {
self.storage.borrow().event(id)
}
pub fn events(&self, cat: Category) -> Vec<Event> {
self.storage.borrow().events(cat)
}
}
#[cfg(test)]
mod tests {
use std::sync::mpsc;
use crate::gossip_events::core::{Category, Event};
use crate::gossip_events::intern::ModuleMessage;
use crate::nodeconfig::NodeConfig;
use crate::random_connections::broker::{RandomIn, RandomOut};
use crate::router::messages::NetworkWrapper;
use crate::timer::TimerMessage;
use flarch::data_storage::DataStorageTemp;
use flarch::nodeids::NodeID;
use flarch::start_logging_filter_level;
use flarch::tasks::now;
use super::*;
#[tokio::test]
async fn test_translation() -> anyhow::Result<()> {
start_logging_filter_level(vec![], log::LevelFilter::Info);
let node_info = NodeConfig::new().info;
let mut broker_rnd = Broker::new();
let mut timer = Timer::simul();
let gossip = Gossip::start(
Box::new(DataStorageTemp::new()),
node_info,
timer.broker.clone(),
broker_rnd.clone(),
)
.await?;
let id2 = NodeID::rnd();
let (tap_rnd, _) = broker_rnd.get_tap_in_sync().await?;
broker_rnd
.settle_msg_out(RandomOut::NodeIDsConnected(vec![id2].into()))
.await?;
assert_msg_reid(&tap_rnd, &id2)?;
tap_rnd.recv()?;
let event = Event {
category: Category::TextMessage,
src: id2,
created: now(),
msg: "test_msg".into(),
};
let msg = ModuleMessage::Events(vec![event.clone()]);
broker_rnd
.settle_msg_out(
RandomOut::NetworkWrapperFromNetwork(
id2,
NetworkWrapper::wrap_yaml(MODULE_NAME, &msg).unwrap(),
)
.into(),
)
.await?;
assert_eq!(1, gossip.storage.borrow().events(event.category).len());
timer.broker.settle_msg_out(TimerMessage::Second).await?;
timer.broker.settle_msg_out(TimerMessage::Second).await?;
timer.broker.settle_msg_out(TimerMessage::Second).await?;
assert_msg_reid(&tap_rnd, &id2)?;
Ok(())
}
fn assert_msg_reid(tap: &mpsc::Receiver<RandomIn>, id2: &NodeID) -> anyhow::Result<()> {
let msg = tap.recv()?;
log::info!("Message is: {msg:?}");
if let RandomIn::NetworkWrapperToNetwork(id, msg_mod) = msg {
assert_eq!(id2, &id);
assert_eq!(MODULE_NAME.to_string(), msg_mod.module);
let msg_yaml = serde_yaml::from_str(&msg_mod.msg)?;
assert_eq!(ModuleMessage::RequestEventIDs, msg_yaml);
Ok(())
} else {
anyhow::bail!("Message was {msg:?} instead of RequestEventIDs")
}
}
}