use flarch::web_rtc::connection::ConnectionConfig;
use flmodules::network::network_start;
use flmodules::timer::BrokerTimer;
use log::{error, info};
use std::collections::HashMap;
use thiserror::Error;
use tokio::sync::watch;
use flarch::{broker::BrokerError, nodeids::NodeID};
use flarch::{
data_storage::{DataStorage, StorageError},
tasks::now,
};
use flmodules::{
dht_router::{broker::DHTRouter, kademlia},
dht_storage::{self, broker::DHTStorage, core::DHTConfig},
flo::storage::CryptoStorage,
gossip_events::{
broker::Gossip,
core::{self, Category},
},
network::broker::{BrokerNetwork, NetworkError, NetworkIn},
nodeconfig::{ConfigError, NodeConfig, NodeInfo},
random_connections::broker::RandomBroker,
router::broker::{BrokerRouter, RouterNetwork, RouterRandom},
timer::Timer,
web_proxy::{
broker::{WebProxy, WebProxyError},
core::WebProxyConfig,
},
Modules,
};
use crate::stat::{NetStats, NetworkStats};
/// Error type of this module.
///
/// `Lock` and `Missing` carry their own message; every other variant wraps an
/// error from another module transparently and can be created with `?` thanks
/// to the `#[from]` conversions.
#[derive(Error, Debug)]
pub enum NodeError {
/// A lock could not be acquired.
#[error("Couldn't get lock")]
Lock,
/// A subsystem needed by the call is not available; the payload is its name
/// (for example `"Random"` or `"Gossip"`).
#[error("Missing subsystem {0}")]
Missing(String),
/// Wraps a [`ConfigError`].
#[error(transparent)]
Config(#[from] ConfigError),
/// Wraps a [`StorageError`] of the data storage.
#[error(transparent)]
Storage(#[from] StorageError),
/// Wraps a [`NetworkError`].
#[error(transparent)]
Network(#[from] NetworkError),
/// Wraps a [`BrokerError`].
#[error(transparent)]
Broker(#[from] BrokerError),
/// Wraps a `serde_yaml` (de)serialization error.
#[error(transparent)]
Yaml(#[from] serde_yaml::Error),
/// Wraps a [`WebProxyError`].
#[error(transparent)]
WebProxy(#[from] WebProxyError),
/// Wraps the storage error of the DHT storage broker.
#[error(transparent)]
DHTStorage(#[from] dht_storage::broker::StorageError),
}
/// A node with its configuration, storage, brokers and optional modules.
///
/// The `Option` fields are only `Some` when the matching flag is set in
/// `node_config.info.modules` at the time [`Node::start`] runs.
pub struct Node {
/// Configuration the node was started with.
pub node_config: NodeConfig,
/// Storage handed to [`Node::start`]; clones of it are passed to the modules.
pub storage: Box<dyn DataStorage + Send>,
/// Network broker handed to [`Node::start`].
pub broker_net: BrokerNetwork,
/// Router broker started on top of `broker_net`.
pub network_io: BrokerRouter,
/// Timer broker handed to [`Node::start`].
pub timer: BrokerTimer,
/// Crypto storage created from a clone of `storage`.
pub crypto_storage: CryptoStorage,
/// Network statistics, present when `Modules::STAT` is enabled.
pub stat: Option<watch::Receiver<NetStats>>,
/// Random-connections module, present when `Modules::RAND` is enabled.
pub random: Option<RandomBroker>,
/// Gossip module, present when both `Modules::RAND` and `Modules::GOSSIP` are enabled.
pub gossip: Option<Gossip>,
/// Web proxy module, present when both `Modules::RAND` and `Modules::WEBPROXY` are enabled.
pub webproxy: Option<WebProxy>,
/// DHT router, present when `Modules::DHT_ROUTER` is enabled.
pub dht_router: Option<DHTRouter>,
/// DHT storage, present when both `Modules::DHT_ROUTER` and `Modules::DHT_STORAGE` are enabled.
pub dht_storage: Option<DHTStorage>,
}
/// Manual `Debug` implementation printing a subset of the fields of [`Node`].
///
/// Only `node_config`, `storage`, `broker_net`, `network_io`, `timer`, `stat`
/// and `dht_storage` are shown; the remaining fields are left out of the output.
// NOTE(review): presumably the omitted field types do not implement `Debug` - confirm.
impl std::fmt::Debug for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Node");
        out.field("node_config", &self.node_config);
        out.field("storage", &self.storage);
        out.field("broker_net", &self.broker_net);
        out.field("network_io", &self.network_io);
        out.field("timer", &self.timer);
        out.field("stat", &self.stat);
        out.field("dht_storage", &self.dht_storage);
        out.finish()
    }
}
/// Storage key under which the encoded node configuration is read and written.
const STORAGE_CONFIG: &str = "nodeConfig";
impl Node {
pub async fn start_network(
storage: Box<dyn DataStorage + Send>,
node_config: NodeConfig,
net_config: ConnectionConfig,
) -> anyhow::Result<Self> {
let mut timer = Timer::start().await?;
let net = network_start(node_config.clone(), net_config, &mut timer).await?;
Node::start(storage, node_config, net.broker, timer.broker).await
}
/// Builds a node on top of an existing network broker and timer broker.
///
/// The router is always started. The other modules are started according to
/// the flags in `node_config.info.modules`:
///
/// * `RAND` starts the random-connections module; `GOSSIP` and `WEBPROXY` are
///   only looked at when `RAND` is set, because both are started on its broker.
/// * `STAT` starts the network statistics.
/// * `DHT_ROUTER` starts the DHT router; `DHT_STORAGE` is only looked at when
///   `DHT_ROUTER` is set, because it is started on the router's broker.
///
/// # Errors
///
/// Returns the first error reported while starting any of the modules.
pub async fn start(
    storage: Box<dyn DataStorage + Send>,
    node_config: NodeConfig,
    broker_net: BrokerNetwork,
    timer: BrokerTimer,
) -> anyhow::Result<Self> {
    info!(
        "Starting node: {} = {}",
        node_config.info.name,
        node_config.info.get_id()
    );

    let network_io = RouterNetwork::start(broker_net.clone()).await?;
    let enabled = node_config.info.modules;
    let node_id = node_config.info.get_id();

    // Gossip and the web proxy both sit on top of the random-connections broker.
    let (random, gossip, webproxy) = if enabled.contains(Modules::RAND) {
        let rnd = RandomBroker::start(node_id, timer.clone(), broker_net.clone()).await?;

        let gossip = if enabled.contains(Modules::GOSSIP) {
            Some(
                Gossip::start(
                    storage.clone(),
                    node_config.info.clone(),
                    timer.clone(),
                    rnd.broker.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let webproxy = if enabled.contains(Modules::WEBPROXY) {
            Some(
                WebProxy::start(
                    storage.clone_box(),
                    node_id,
                    RouterRandom::start(rnd.broker.clone()).await?,
                    WebProxyConfig::default(),
                )
                .await?,
            )
        } else {
            None
        };

        (Some(rnd), gossip, webproxy)
    } else {
        (None, None, None)
    };

    let stat = if enabled.contains(Modules::STAT) {
        Some(NetworkStats::start(broker_net.clone()).await?)
    } else {
        None
    };

    // The DHT storage sits on top of the DHT router broker.
    let (dht_router, dht_storage) = if enabled.contains(Modules::DHT_ROUTER) {
        let routing = DHTRouter::start(
            kademlia::Config::default(node_id),
            timer.clone(),
            network_io.clone(),
        )
        .await?;

        let dht_storage = if enabled.contains(Modules::DHT_STORAGE) {
            Some(
                DHTStorage::start(
                    storage.clone_box(),
                    DHTConfig::default(node_id),
                    timer.clone(),
                    routing.broker.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        (Some(routing), dht_storage)
    } else {
        (None, None)
    };

    // Must be created before `storage` is moved into the struct below.
    let crypto_storage = CryptoStorage::new(storage.clone());

    Ok(Self {
        node_config,
        storage,
        broker_net,
        network_io,
        timer,
        crypto_storage,
        stat,
        random,
        gossip,
        webproxy,
        dht_router,
        dht_storage,
    })
}
/// Emits a `NetworkIn::WSUpdateListRequest` message on the network broker.
///
/// # Errors
///
/// Returns the broker error if the message cannot be emitted.
pub async fn request_list(&mut self) -> anyhow::Result<()> {
    match self.broker_net.emit_msg_in(NetworkIn::WSUpdateListRequest) {
        Ok(_) => Ok(()),
        Err(e) => Err(e.into()),
    }
}
/// Returns the [`NodeInfo`] for each of the given ids, in the order of `ids`.
///
/// Ids without a known `NodeInfo` are skipped. Each entry is removed from the
/// lookup map once returned, so a repeated id yields its info only once.
///
/// # Errors
///
/// Returns the error of [`Node::nodes_info_all`].
pub fn nodes_info(&self, ids: Vec<NodeID>) -> anyhow::Result<Vec<NodeInfo>> {
    let mut known = self.nodes_info_all()?;
    let mut found = Vec::new();
    for id in &ids {
        if let Some(info) = known.remove(id) {
            found.push(info);
        }
    }
    Ok(found)
}
/// Returns the [`NodeInfo`] of the nodes listed as `connected` in the stats of
/// the random module.
///
/// # Errors
///
/// Returns `NodeError::Missing("Random")` if the random module is not running,
/// or the error of [`Node::nodes_info`].
pub fn nodes_connected(&self) -> anyhow::Result<Vec<NodeInfo>> {
    match self.random.as_ref() {
        Some(rnd) => {
            let connected = rnd.stats.borrow().connected.get_nodes().0;
            self.nodes_info(connected)
        }
        None => Err(NodeError::Missing("Random".into()).into()),
    }
}
/// Returns the [`NodeInfo`] of the nodes listed as `known` in the stats of the
/// random module.
///
/// # Errors
///
/// Returns `NodeError::Missing("Random")` if the random module is not running,
/// or the error of [`Node::nodes_info`].
pub fn nodes_online(&self) -> anyhow::Result<Vec<NodeInfo>> {
    match self.random.as_ref() {
        Some(rnd) => {
            let known = rnd.stats.borrow().known.0.clone();
            self.nodes_info(known)
        }
        None => Err(NodeError::Missing("Random".into()).into()),
    }
}
pub fn nodes_info_all(&self) -> anyhow::Result<HashMap<NodeID, NodeInfo>> {
if let Some(g) = self.gossip.as_ref() {
let events = g.events(Category::NodeInfo);
let mut nodeinfos = HashMap::new();
for ni in events {
match NodeInfo::decode(&ni.msg) {
Ok(info) => {
nodeinfos.insert(info.get_id(), info);
}
Err(e) => log::error!("Parse-error {e:?} for {}", ni.msg),
}
}
Ok(nodeinfos)
} else {
Err(NodeError::Missing("Gossip".into()).into())
}
}
/// Adds `msg` as a `TextMessage` gossip event with this node as the source and
/// the current time as creation time.
///
/// # Errors
///
/// Returns `NodeError::Missing("Gossip")` if the gossip module is not running,
/// or the error reported when adding the event.
pub async fn add_chat_message(&mut self, msg: String) -> anyhow::Result<()> {
    let gossip = match self.gossip.as_mut() {
        Some(g) => g,
        None => return Err(NodeError::Missing("Gossip".into()).into()),
    };

    gossip
        .add_event(core::Event {
            category: core::Category::TextMessage,
            src: self.node_config.info.get_id(),
            created: now(),
            msg,
        })
        .await?;
    Ok(())
}
/// Loads the node configuration from `storage`, adjusts it for the current
/// target and writes it back.
///
/// If nothing can be read under the configuration key, an info message is
/// logged and the configuration is decoded from an empty string instead.
/// `Modules::WEBPROXY_REQUESTS` is then switched off on `wasm` targets and
/// switched on everywhere else, and the result is stored via
/// [`Node::set_config`] before being returned.
///
/// # Errors
///
/// Returns the storage error if the configuration cannot be written back.
pub fn get_config(storage: Box<dyn DataStorage>) -> anyhow::Result<NodeConfig> {
    // A missing or unreadable configuration is not fatal: start from scratch.
    let config_str = storage.get(STORAGE_CONFIG).unwrap_or_else(|_| {
        log::info!("Couldn't load configuration - start with empty");
        String::new()
    });
    let mut config = NodeConfig::decode(&config_str);

    // `cfg!` yields a plain compile-time `bool` on every target. The previous
    // pair of `#[cfg(target_family = "wasm")]` / `#[cfg(target_family = "unix")]`
    // bindings left the variable undefined (a compile error) on any target that
    // is neither, e.g. Windows. The values for wasm and unix are unchanged.
    let enable_webproxy_request = cfg!(not(target_family = "wasm"));
    config
        .info
        .modules
        .set(Modules::WEBPROXY_REQUESTS, enable_webproxy_request);

    Self::set_config(storage, &config.encode())?;
    Ok(config)
}
/// Writes the already encoded configuration `config` to `storage` under the
/// configuration key.
///
/// # Errors
///
/// Returns the storage error if the value cannot be written.
pub fn set_config(mut storage: Box<dyn DataStorage>, config: &str) -> anyhow::Result<()> {
    match storage.set(STORAGE_CONFIG, config) {
        Ok(_) => Ok(()),
        Err(e) => Err(e.into()),
    }
}
}
#[cfg(test)]
mod tests {
use flarch::{
broker::Broker, data_storage::DataStorageTemp, start_logging, start_logging_filter_level,
};
use flmodules::gossip_events::{
broker::GossipIn,
core::{Category, Event},
};
use super::*;
/// A text event added through the gossip broker of one node must show up in
/// the gossip storage of a second node started from `storage.clone_box()`.
#[tokio::test]
async fn test_storage() -> anyhow::Result<()> {
    start_logging();

    let storage = DataStorageTemp::new();
    let config = NodeConfig::new();

    // First node: add a single text event and let the broker settle.
    let mut first = Node::start(
        storage.clone_box(),
        config.clone(),
        Broker::new(),
        Timer::start().await?.broker,
    )
    .await?;
    let sent = Event {
        category: Category::TextMessage,
        src: config.info.get_id(),
        created: 0,
        msg: "something".into(),
    };
    let gossip = first.gossip.as_mut().unwrap();
    gossip
        .broker
        .settle_msg_in(GossipIn::AddEvent(sent.clone()).into())
        .await?;

    // Second node: same configuration, storage cloned from the same source.
    let second = Node::start(
        storage.clone_box(),
        config.clone(),
        Broker::new(),
        Timer::start().await?.broker,
    )
    .await?;
    let stored = second
        .gossip
        .unwrap()
        .storage
        .borrow()
        .events(Category::TextMessage);

    assert_eq!(1, stored.len());
    assert_eq!(&sent, stored.get(0).unwrap());
    Ok(())
}
/// After the gossip broker has settled, a freshly started node must hold
/// exactly one event of category `NodeInfo`.
#[tokio::test]
async fn test_store_node() -> anyhow::Result<()> {
    start_logging_filter_level(vec![], log::LevelFilter::Info);

    let node = Node::start(
        Box::new(DataStorageTemp::new()),
        NodeConfig::new(),
        Broker::new(),
        Timer::start().await?.broker,
    )
    .await?;

    // Only the gossip module is inspected from here on.
    let mut gossip = node.gossip.unwrap();
    gossip.broker.settle(vec![]).await?;
    log::debug!("storage is: {:?}", gossip.storage.borrow());

    assert_eq!(1, gossip.events(Category::NodeInfo).len());
    Ok(())
}
}