use crate::error::LiquidError;
use crate::network::{message, Connection, ControlMsg, Message, MessageCodec};
use log::info;
use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::io::split;
use tokio::net::TcpListener;
use tokio_util::codec::{FramedRead, FramedWrite};
/// Registration server that tracks the clients which have introduced
/// themselves, grouped by the name of the network they asked to join.
#[derive(Debug)]
pub struct Server {
    /// Socket address the server binds its TCP listener to.
    pub(crate) address: SocketAddr,
    /// Id given to the next outgoing message; advanced by one after every
    /// message that was sent successfully.
    pub(crate) msg_id: usize,
    /// Registered connections: network name -> (assigned client id -> connection).
    pub(crate) directory: HashMap<String, HashMap<usize, Connection<ControlMsg>>>,
}
impl Server {
pub async fn new(address: &str) -> Result<Self, LiquidError> {
Ok(Server {
msg_id: 0,
directory: HashMap::new(),
address: address.parse().unwrap(),
})
}
pub async fn accept_new_connections(&mut self) -> Result<(), LiquidError> {
let mut listener = TcpListener::bind(&self.address).await?;
loop {
let (socket, _) = listener.accept().await?;
let (reader, writer) = split(socket);
let mut stream = FramedRead::new(reader, MessageCodec::new());
let sink = FramedWrite::new(writer, MessageCodec::new());
let address = message::read_msg(&mut stream).await?;
let (address, network_name) = if let ControlMsg::Introduction {
address,
network_name,
} = address.msg
{
(address, network_name)
} else {
return Err(LiquidError::UnexpectedMessage);
};
let conn = Connection { address, sink };
let target_id;
let dir;
match self.directory.get_mut(&network_name) {
Some(d) => {
target_id = d.len() + 1; dir = d.iter().map(|(k, v)| (*k, v.address)).collect();
d.insert(target_id, conn);
}
None => {
target_id = 1;
dir = Vec::new();
let mut d = HashMap::new();
d.insert(target_id, conn);
self.directory.insert(network_name.clone(), d);
}
};
info!(
"Connected to address: {:#?} joining network {:#?}, assigning id: {:#?}",
&address,
&network_name,
target_id
);
let dir_msg = ControlMsg::Directory { dir };
self.send_msg(target_id, &network_name, dir_msg).await?;
}
}
pub async fn send_msg(
&mut self,
target_id: usize,
network_name: &str,
message: ControlMsg,
) -> Result<(), LiquidError> {
let m = Message::new(self.msg_id, 0, target_id, message);
message::send_msg(
target_id,
m,
self.directory.get_mut(network_name).unwrap(),
)
.await?;
self.msg_id += 1;
Ok(())
}
pub async fn broadcast(
&mut self,
message: ControlMsg,
network_name: &str,
) -> Result<(), LiquidError> {
let d: Vec<usize> = self
.directory
.iter()
.find(|(k, _)| **k == network_name)
.unwrap()
.1
.iter()
.map(|(k, _)| *k)
.collect();
for k in d {
self.send_msg(k, network_name, message.clone()).await?;
}
Ok(())
}
}