use std::{
io,
marker::PhantomData,
net::{TcpListener, TcpStream, ToSocketAddrs},
};
use serde::{de::DeserializeOwned, Serialize};
use crate::{
pkg::{Pkg, PkgData},
stog, PeerID,
};
pub struct Host<T: DeserializeOwned + Serialize> {
pub server: TcpListener,
pub clients: Vec<Option<TcpStream>>,
pub id: PeerID,
pub peers: Vec<Option<String>>,
pub _marker: PhantomData<T>,
}
impl<T: DeserializeOwned + Serialize> Host<T> {
pub fn bind<A>(addr: A) -> io::Result<Self>
where
A: ToSocketAddrs,
{
let listener = TcpListener::bind(addr)?;
listener
.set_nonblocking(true)
.expect("can't set nonblocking for host");
let fmt_addr = listener.local_addr()?.to_string();
let s = Self {
server: listener,
clients: vec![None], id: 0, peers: vec![Some(fmt_addr)],
_marker: PhantomData,
};
Ok(s)
}
pub fn get_next_free_id(&self) -> PeerID {
self.clients[1..]
.iter()
.position(Option::is_none)
.map_or_else(|| self.clients.len(), |p| p) as PeerID
}
pub fn get_active_clients_mut(&mut self) -> impl Iterator<Item = (PeerID, &mut TcpStream)> {
self.clients
.iter_mut()
.enumerate()
.filter_map(|(i, p)| p.as_mut().map(|p| (i as PeerID, p)))
}
pub fn get_active_clients_id(&self) -> impl Iterator<Item = PeerID> + '_ {
self.clients.iter().enumerate().filter_map(|(i, p)| {
if p.is_some() {
Some(i as PeerID)
} else {
None
}
})
}
pub fn send_to_all(&mut self, pkg: &Pkg<T>) -> stog::Result<()> {
for (_, c) in self.get_active_clients_mut() {
stog::send(pkg, c)?
}
Ok(())
}
pub fn send_to(&mut self, id: PeerID, pkg: &Pkg<T>) -> stog::Result<()> {
stog::send(pkg, self.clients[id as usize].as_mut().unwrap())
}
pub fn include_new_client(&mut self, new_c: TcpStream) -> stog::Result<PeerID> {
let client_id = self.get_next_free_id();
let new_p = Some(new_c.local_addr().map_err(stog::Error::Io)?.to_string());
let new_c = Some(new_c);
if client_id as usize >= self.clients.len() {
self.clients.resize_with(client_id as usize + 1, || None);
self.peers.resize_with(client_id as usize + 1, || None);
}
assert!(self.clients[client_id as usize].is_none());
self.clients[client_id as usize] = new_c;
self.peers[client_id as usize] = new_p;
self.send_to(client_id, &self.build_pkg(PkgData::SetPeer(client_id)))?;
let mut buf = Vec::default();
std::mem::swap(&mut buf, &mut self.peers);
let pkg_set_peers = self.build_pkg(PkgData::SetPeers(buf));
self.send_to_all(&pkg_set_peers)?;
self.peers = if let PkgData::SetPeers(ps) = pkg_set_peers.data {
ps
} else {
unreachable!()
};
Ok(client_id)
}
pub fn build_pkg(&self, data: PkgData<T>) -> Pkg<T> {
Pkg::new(self.id, data)
}
pub fn poll_conns(&mut self) -> stog::Result<()> {
loop {
let stream = self.server.incoming().next().unwrap();
match stream {
Ok(c) => {
c.set_read_timeout(crate::IO_TIMEOUT)
.expect("cant set IO_TIMEOUT for read in new TcpStream");
c.set_write_timeout(crate::IO_TIMEOUT)
.expect("cant set IO_TIMEOUT for write in new TcpStream");
self.include_new_client(c)?;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
break;
}
Err(e) => panic!("connection io error: {e:?}"),
}
}
Ok(())
}
pub fn poll_pkgs(&mut self) -> stog::Result<Option<Pkg<T>>> {
let clients_id: Vec<_> = self.get_active_clients_id().collect();
for id in clients_id {
loop {
match stog::read::<Pkg<T>>(self.clients[id as usize].as_mut().unwrap()) {
Ok(p) => match self.manage_traffic(p) {
Ok(Some(p)) => return Ok(Some(p)),
Err(e) => panic!("polling error: {e:?}"),
_ => (),
},
Err(stog::Error::NoPackage) => break,
Err(stog::Error::ConnClosed) => {
self.close_client(id)?;
return Ok(None);
}
Err(e) => panic!("polling error: {e:?}"),
}
}
}
Ok(None)
}
pub fn manage_traffic(&mut self, p: Pkg<T>) -> stog::Result<Option<Pkg<T>>> {
match p.data {
PkgData::GlobalData(_) => {
for (_, c) in self
.get_active_clients_mut()
.filter(|(id, _)| *id != p.sender)
{
stog::send(&p, c)?
}
Ok(Some(p))
}
PkgData::Data(_, id) => {
if id == self.id {
Ok(Some(p))
} else {
self.send_to(id, &p)?;
Ok(None)
}
}
_ => panic!("shouldn't receive any IO order from slaves"),
}
}
pub fn close_client(&mut self, id: PeerID) -> stog::Result<()> {
self.clients[id as usize] = None;
self.peers[id as usize] = None;
self.send_to_all(&self.build_pkg(PkgData::SetPeers(self.peers.clone())))
}
pub fn poll(&mut self) -> stog::Result<Option<Pkg<T>>> {
self.poll_conns()?;
self.poll_pkgs()
}
}