use std::{fmt, time::Duration};
use thiserror::Error;
use tokio::sync::{mpsc::UnboundedReceiver, watch};
use tokio_stream::StreamExt;
use flarch::{
add_translator_direct, add_translator_link,
broker::{Broker, BrokerError},
nodeids::NodeID,
tasks::Interval,
web_rtc::{
messages::{ConnType, SetupError, SignalingState},
node_connection::{Direction, NCError},
websocket::BrokerWSClient,
BrokerWebRTCConn,
},
};
use crate::{
network::{
intern::{Intern, InternIn, InternOut},
signal::NodeStat,
},
nodeconfig::{NodeConfig, NodeInfo},
router::messages::NetworkWrapper,
timer::Timer,
};
use super::signal::FledgerConfig;
pub type BrokerNetwork = Broker<NetworkIn, NetworkOut>;
pub const MODULE_NAME: &str = "Network";
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, PartialEq)]
pub enum NetworkIn {
MessageToNode(NodeID, NetworkWrapper),
StatsToWS(Vec<NodeStat>),
WSUpdateListRequest,
Connect(NodeID),
Disconnect(NodeID),
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, PartialEq)]
pub enum NetworkOut {
MessageFromNode(NodeID, NetworkWrapper),
NodeListFromWS(Vec<NodeInfo>),
ConnectionState(NetworkConnectionState),
Connected(NodeID),
Disconnected(NodeID),
SystemConfig(FledgerConfig),
Error(String),
}
#[derive(Error, Debug)]
pub enum NetworkError {
#[error("Connection not found")]
ConnectionMissing,
#[error("Cannot connect to myself")]
ConnectMyself,
#[error("Signalling server doesn't respond")]
SignallingServer,
#[error(transparent)]
SerdeJSON(#[from] serde_json::Error),
#[error(transparent)]
NodeConnection(#[from] NCError),
#[error(transparent)]
Broker(#[from] BrokerError),
#[error(transparent)]
Setup(#[from] SetupError),
}
pub struct Network {
pub broker: BrokerNetwork,
pub connections: watch::Receiver<Vec<NodeID>>,
}
impl Network {
pub async fn start(
node_config: NodeConfig,
ws: BrokerWSClient,
web_rtc: BrokerWebRTCConn,
timer: &mut Timer,
) -> anyhow::Result<Self> {
let mut intern = Broker::new();
let (messages, connections) = Intern::start(node_config).await?;
intern.add_handler(Box::new(messages)).await?;
add_translator_link!(intern, ws, InternIn::WebSocket, InternOut::WebSocket);
add_translator_link!(intern, web_rtc, InternIn::WebRTC, InternOut::WebRTC);
let broker = Broker::new();
add_translator_direct!(
intern,
broker.clone(),
InternIn::Network,
InternOut::Network
);
timer.tick_second(intern, InternIn::Tick).await?;
Ok(Self {
broker,
connections,
})
}
}
pub struct NetworkWebRTC {
broker_net: BrokerNetwork,
tap: UnboundedReceiver<NetworkOut>,
}
impl NetworkWebRTC {
pub async fn start(mut broker_net: BrokerNetwork) -> anyhow::Result<Self> {
let (mut tap, _) = broker_net.get_tap_out().await?;
let mut timeout = Interval::new_interval(Duration::from_secs(10));
timeout.next().await;
loop {
tokio::select! {
_ = timeout.next() => {
return Err(NetworkError::SignallingServer.into());
}
msg = tap.recv() => {
if matches!(msg, Some(NetworkOut::NodeListFromWS(_))){
break;
}
}
}
}
Ok(Self { broker_net, tap })
}
pub async fn recv(&mut self) -> NetworkOut {
loop {
let msg = self.tap.recv().await;
if let Some(msg_reply) = msg {
return msg_reply;
}
}
}
pub fn send(&mut self, msg: NetworkIn) -> anyhow::Result<()> {
self.broker_net.emit_msg_in(msg)
}
pub fn send_list_request(&mut self) -> anyhow::Result<()> {
self.send(NetworkIn::WSUpdateListRequest)
}
}
impl fmt::Display for NetworkIn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
NetworkIn::MessageToNode(_, _) => write!(f, "MessageToNode()"),
NetworkIn::StatsToWS(_) => write!(f, "StatsToWS()"),
NetworkIn::WSUpdateListRequest => write!(f, "WSUpdateListRequest"),
NetworkIn::Connect(_) => write!(f, "Connect()"),
NetworkIn::Disconnect(_) => write!(f, "Disconnect()"),
}
}
}
impl fmt::Display for NetworkOut {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
NetworkOut::MessageFromNode(_, _) => write!(f, "MessageFromNode()"),
NetworkOut::NodeListFromWS(_) => write!(f, "NodeListFromWS()"),
NetworkOut::ConnectionState(_) => write!(f, "ConnectionState()"),
NetworkOut::Connected(_) => write!(f, "Connected()"),
NetworkOut::Disconnected(_) => write!(f, "Disconnected()"),
NetworkOut::SystemConfig(_) => write!(f, "SystemConfig"),
NetworkOut::Error(_) => write!(f, "Error"),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkConnectionState {
pub id: NodeID,
pub dir: Direction,
pub s: ConnStats,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ConnStats {
pub type_local: ConnType,
pub type_remote: ConnType,
pub signaling: SignalingState,
pub rx_bytes: u64,
pub tx_bytes: u64,
pub delay_ms: u32,
}
#[cfg(test)]
mod tests {
use flarch::{nodeids::U256, start_logging};
use crate::network::signal::WSSignalMessageToNode;
#[test]
fn test_serialize() -> anyhow::Result<()> {
start_logging();
let cha = U256::rnd();
let msg = WSSignalMessageToNode::Challenge(2, cha);
let msg_str = serde_json::to_string(&msg)?;
log::debug!("Message string is: {msg_str}");
let msg_clone = serde_json::from_str(&msg_str)?;
assert_eq!(msg, msg_clone);
Ok(())
}
}