cig 0.1.2

Simplify TCP/IP applications with a transparent, persistent-mode, data-driven protocol
Documentation
use std::{
    io,
    marker::PhantomData,
    net::{TcpListener, TcpStream, ToSocketAddrs},
};

use serde::{de::DeserializeOwned, Serialize};

use crate::{
    pkg::{Pkg, PkgData},
    stog, PeerID,
};

/// Host side of a group of TCP peers exchanging `Pkg<T>` packages.
///
/// `clients` and `peers` are parallel vectors indexed by peer ID. `bind`
/// creates slot 0 for the host itself: its `clients` entry is left empty and
/// its `peers` entry holds the listener's own address.
pub struct Host<T: DeserializeOwned + Serialize> {
    /// Listening socket; `bind` switches it to non-blocking mode.
    pub server: TcpListener,
    /// One connection slot per peer ID. `None` marks a slot with no stream.
    pub clients: Vec<Option<TcpStream>>,
    /// ID the host puts on the packages it builds; `bind` sets it to 0.
    pub id: PeerID,
    /// One address string per peer ID, sent to clients in `SetPeers` packages.
    pub peers: Vec<Option<String>>,
    /// Ties the payload type `T` to the host without storing a value of it.
    pub _marker: PhantomData<T>,
}

impl<T: DeserializeOwned + Serialize> Host<T> {
    /// Creates a host listening on `addr`.
    ///
    /// The listener is switched to non-blocking mode so that `poll_conns` can
    /// stop as soon as no connection is pending. The host takes peer ID 0;
    /// slot 0 of `clients` stays empty and slot 0 of `peers` holds the
    /// listener's local address.
    ///
    /// # Errors
    ///
    /// Returns the underlying `io::Error` if binding, switching to
    /// non-blocking mode, or querying the local address fails.
    pub fn bind<A>(addr: A) -> io::Result<Self>
    where
        A: ToSocketAddrs,
    {
        let server = TcpListener::bind(addr)?;
        // Propagate instead of panicking: the caller already has to handle
        // `io::Error` from this function.
        server.set_nonblocking(true)?;
        let host_addr = server.local_addr()?.to_string();
        Ok(Self {
            server,
            clients: vec![None], // slot 0 belongs to the host
            id: 0,               // the host is always peer 0
            peers: vec![Some(host_addr)],
            _marker: PhantomData,
        })
    }

    /// Returns the lowest peer ID a new client can take.
    ///
    /// Slot 0 is the host's, so the search starts at slot 1. When every
    /// existing client slot is occupied the result is `clients.len()`, i.e.
    /// the index of the slot that would be appended next.
    pub fn get_next_free_id(&self) -> PeerID {
        self.clients
            .iter()
            .skip(1)
            .position(Option::is_none)
            // `position` counts from the first element after the skip, so add
            // one to turn it back into a real index into `clients`. Without
            // this the host's slot 0 (or an occupied slot) could be returned.
            .map_or(self.clients.len(), |offset| offset + 1) as PeerID
    }

    /// Iterates over every occupied client slot, yielding the slot's peer ID
    /// together with a mutable reference to its stream. Empty slots are
    /// skipped.
    pub fn get_active_clients_mut(&mut self) -> impl Iterator<Item = (PeerID, &mut TcpStream)> {
        self.clients
            .iter_mut()
            .enumerate()
            .filter_map(|(slot, conn)| {
                let stream = conn.as_mut()?;
                Some((slot as PeerID, stream))
            })
    }

    /// Iterates over the peer IDs of all occupied client slots, in ascending
    /// order.
    pub fn get_active_clients_id(&self) -> impl Iterator<Item = PeerID> + '_ {
        self.clients
            .iter()
            .enumerate()
            .filter(|(_, conn)| conn.is_some())
            .map(|(slot, _)| slot as PeerID)
    }

    /// Sends `pkg` to every connected client, in ascending peer-ID order.
    ///
    /// # Errors
    ///
    /// Stops at the first failing send and returns its error; clients after
    /// that one are not sent the package.
    pub fn send_to_all(&mut self, pkg: &Pkg<T>) -> stog::Result<()> {
        self.get_active_clients_mut()
            .try_for_each(|(_, stream)| stog::send(pkg, stream))
    }

    pub fn send_to(&mut self, id: PeerID, pkg: &Pkg<T>) -> stog::Result<()> {
        stog::send(pkg, self.clients[id as usize].as_mut().unwrap())
    }

    pub fn include_new_client(&mut self, new_c: TcpStream) -> stog::Result<PeerID> {
        let client_id = self.get_next_free_id();
        // println!("new client identified. Peer: {client_id}");
        let new_p = Some(new_c.local_addr().map_err(stog::Error::Io)?.to_string());
        let new_c = Some(new_c);
        if client_id as usize >= self.clients.len() {
            self.clients.resize_with(client_id as usize + 1, || None);
            self.peers.resize_with(client_id as usize + 1, || None);
        }
        assert!(self.clients[client_id as usize].is_none());
        self.clients[client_id as usize] = new_c;
        self.peers[client_id as usize] = new_p;

        self.send_to(client_id, &self.build_pkg(PkgData::SetPeer(client_id)))?;

        let mut buf = Vec::default();
        std::mem::swap(&mut buf, &mut self.peers);
        let pkg_set_peers = self.build_pkg(PkgData::SetPeers(buf));

        self.send_to_all(&pkg_set_peers)?;
        self.peers = if let PkgData::SetPeers(ps) = pkg_set_peers.data {
            ps
        } else {
            unreachable!()
        };
        Ok(client_id)
    }

    /// Builds a package around `data`, passing this host's ID to `Pkg::new`.
    pub fn build_pkg(&self, data: PkgData<T>) -> Pkg<T> {
        let host_id = self.id;
        Pkg::new(host_id, data)
    }

    pub fn poll_conns(&mut self) -> stog::Result<()> {
        loop {
            let stream = self.server.incoming().next().unwrap();
            match stream {
                Ok(c) => {
                    c.set_read_timeout(crate::IO_TIMEOUT)
                        .expect("cant set IO_TIMEOUT for read in new TcpStream");
                    c.set_write_timeout(crate::IO_TIMEOUT)
                        .expect("cant set IO_TIMEOUT for write in new TcpStream");
                    self.include_new_client(c)?;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    break;
                }
                Err(e) => panic!("connection io error: {e:?}"),
            }
        }
        Ok(())
    }

    pub fn poll_pkgs(&mut self) -> stog::Result<Option<Pkg<T>>> {
        // optimise
        let clients_id: Vec<_> = self.get_active_clients_id().collect();
        for id in clients_id {
            loop {
                match stog::read::<Pkg<T>>(self.clients[id as usize].as_mut().unwrap()) {
                    Ok(p) => match self.manage_traffic(p) {
                        Ok(Some(p)) => return Ok(Some(p)),
                        Err(e) => panic!("polling error: {e:?}"),
                        _ => (),
                    },
                    Err(stog::Error::NoPackage) => break,
                    Err(stog::Error::ConnClosed) => {
                        self.close_client(id)?;
                        return Ok(None);
                    }
                    Err(e) => panic!("polling error: {e:?}"),
                }
            }
        }
        Ok(None)
    }

    pub fn manage_traffic(&mut self, p: Pkg<T>) -> stog::Result<Option<Pkg<T>>> {
        match p.data {
            PkgData::GlobalData(_) => {
                // println!("handling a globaldata from {}", p.sender);
                for (_, c) in self
                    .get_active_clients_mut()
                    .filter(|(id, _)| *id != p.sender)
                {
                    // println!("\tsending to {}", id);
                    stog::send(&p, c)?
                }
                Ok(Some(p))
            }
            PkgData::Data(_, id) => {
                if id == self.id {
                    Ok(Some(p))
                } else {
                    self.send_to(id, &p)?;
                    Ok(None)
                }
            }
            _ => panic!("shouldn't receive any IO order from slaves"),
        }
    }

    /// Unregisters the client with the given `id` and tells the others.
    ///
    /// Both the stream slot and the peer-address slot are cleared (dropping
    /// the stream), then a `SetPeers` package with the updated peer list is
    /// sent to every remaining client.
    ///
    /// # Errors
    ///
    /// Returns the error of the first send that fails.
    ///
    /// # Panics
    ///
    /// Panics if `id` is outside the bounds of `clients` or `peers`.
    pub fn close_client(&mut self, id: PeerID) -> stog::Result<()> {
        let slot = id as usize;
        self.clients[slot] = None;
        self.peers[slot] = None;

        let remaining_peers = self.peers.clone();
        let update = self.build_pkg(PkgData::SetPeers(remaining_peers));
        self.send_to_all(&update)
    }

    /// Runs one polling round: first accepts pending connections, then reads
    /// and routes pending packages.
    ///
    /// Returns the result of `poll_pkgs`, or the error from `poll_conns` if
    /// accepting connections failed (in which case no packages are read).
    pub fn poll(&mut self) -> stog::Result<Option<Pkg<T>>> {
        match self.poll_conns() {
            Ok(()) => self.poll_pkgs(),
            Err(e) => Err(e),
        }
    }
}