tubes 0.6.4

Host/Client protocol based on pipenet
Documentation
use crate::client_id::ClientId;
use crate::{core::to_pipenet, data::MessageDataInternal, prelude::*};

use pipenet::NonBlockStream;
use std::{
    collections::HashMap,
    net::{TcpListener, TcpStream},
    sync::Mutex,
    time::{Duration, Instant},
};

/// Module-local result alias whose error side is any boxed `std::error::Error`.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Accepts connections on `listener` and runs the join handshake for each one.
///
/// `listener.accept()` is called repeatedly; the loop ends at the first accept
/// error. For every accepted stream the `clients` mutex is locked and the
/// stream is handed to `accept_one` together with `server_uuid`, `config` and
/// `accept_timeout`. The lock stays held for the duration of that handshake.
///
/// The function returns immediately if the `clients` mutex is poisoned.
pub(crate) fn accept_loop<const MS: usize>(
    server_uuid: ClientId,
    config: &Config,
    clients: &Mutex<HashMap<ClientId, NonBlockStream<MS>>>,
    listener: &TcpListener,
    accept_timeout: Duration,
) {
    loop {
        // Any accept error terminates the loop.
        let Ok((incoming, _peer_addr)) = listener.accept() else {
            break;
        };
        match clients.lock() {
            Ok(mut registry) => {
                accept_one::<MS>(server_uuid, config, &mut registry, accept_timeout, incoming);
            }
            // Poisoned mutex: give up on accepting altogether.
            Err(_) => return,
        }
    }
}

/// Runs the join handshake with a freshly accepted connection and, on success,
/// registers it in `map`.
///
/// Steps, in order:
/// 1. switch `stream` to non-blocking mode and wrap it with `to_pipenet`;
/// 2. wait for the client's `ClientJoined` message carrying its id, passing
///    `accept_timeout` to `wait_for_client_joined_message` as the limit;
/// 3. send the client a `ServerUuid` message containing `server_uuid`;
/// 4. for every client already in `map`, send it a `ClientJoined` for the
///    newcomer and send the newcomer a `ClientJoined` for that client;
/// 5. insert the newcomer into `map` under its id.
///
/// Every failure is logged to stderr and aborts the handshake: the function
/// returns early, the new connection is dropped, and it is not inserted into
/// `map`.
fn accept_one<const MS: usize>(
    server_uuid: ClientId,
    config: &Config,
    map: &mut HashMap<ClientId, NonBlockStream<MS>>,
    accept_timeout: Duration,
    stream: TcpStream,
) {
    if stream.set_nonblocking(true).is_err() {
        // client will be dropped here: cannot set nonblocking
        eprintln!("[{server_uuid}] accept: Cannot set client to nonblocking");
        return;
    }
    let mut client = to_pipenet::<MS>(stream, config);
    let accepted_uuid = match wait_for_client_joined_message(accept_timeout, &mut client) {
        Ok(Some(uuid)) => uuid,
        Ok(None) => {
            // client will be dropped here: timed out waiting for join
            eprintln!("[{server_uuid}] accept: Timed out while accepting unknown client");
            return;
        }
        Err(err) => {
            // client will be dropped here: reading or decoding failed, which
            // is not a timeout and is therefore reported separately.
            eprintln!("[{server_uuid}] accept: Failed while accepting unknown client: {err}");
            return;
        }
    };
    // Notify the client of the uuid of the server.
    let Ok(server_uuid_msg) = (MessageDataInternal::ServerUuid(server_uuid)).try_into() else {
        eprintln!("[{server_uuid}] accept: Cannot build message");
        return;
    };
    // Built once up front; a copy is sent to each existing client below.
    let Ok(accepted_client_joined_msg): Result<Vec<u8>> =
        (MessageDataInternal::ClientJoined(accepted_uuid)).try_into()
    else {
        eprintln!("[{server_uuid}] accept: Cannot build message");
        return;
    };
    let Ok(()) = client.write(server_uuid_msg) else {
        // If can't send uuid to client, drop the new client and return.
        eprintln!("[{server_uuid}] accept: Cannot send message to client {accepted_uuid}");
        return;
    };
    // NOTE(review): a write failure inside this loop aborts the join after
    // earlier clients may already have been told about the newcomer, and the
    // failing existing client stays in `map` - confirm this is intended.
    for (map_uuid, map_client) in map.iter_mut() {
        // Tell to each existing client that this new client joined
        let Ok(()) = map_client.write(accepted_client_joined_msg.clone()) else {
            // If can't notify an existing client, drop the new client and return.
            eprintln!("[{server_uuid}] accept: Cannot send message to client {map_uuid}");
            return;
        };
        // Also tell the new joiner about all the previously existing clients.
        let Ok(map_client_joined_msg): Result<Vec<u8>> =
            (MessageDataInternal::ClientJoined(*map_uuid)).try_into()
        else {
            eprintln!("[{server_uuid}] accept: Cannot build message");
            return;
        };
        // The message is not needed afterwards, so it is moved, not cloned.
        let Ok(()) = client.write(map_client_joined_msg) else {
            // If can't send uuids to the new client, drop it and return.
            eprintln!("[{server_uuid}] accept: Cannot send message to client {accepted_uuid}");
            return;
        };
    }

    // Only now insert it into the map
    map.insert(accepted_uuid, client);
}

/// Polls `client` until it delivers a `ClientJoined` message or `timeout`
/// has elapsed.
///
/// Returns `Ok(Some(id))` with the id carried by the first `ClientJoined`
/// message read, or `Ok(None)` once more than `timeout` has passed since the
/// call started without such a message. Messages of any other kind are
/// decoded and ignored.
///
/// # Errors
///
/// Propagates any error from `client.read()` and any failure to decode a
/// received message into `MessageDataInternal`.
fn wait_for_client_joined_message<const MS: usize>(
    timeout: Duration,
    client: &mut NonBlockStream<MS>,
) -> Result<Option<ClientId>> {
    let started = Instant::now();
    loop {
        match client.read()? {
            Some(raw) => {
                let msg = MessageDataInternal::try_from(raw.as_slice())?;
                if let MessageDataInternal::ClientJoined(uuid) = msg {
                    return Ok(Some(uuid));
                }
            }
            // Nothing available yet: let other threads run instead of
            // spinning flat out, then fall through to the deadline check.
            None => std::thread::yield_now(),
        }
        // The deadline is checked on every iteration, including the ones
        // where nothing was read; otherwise a client that never sends
        // anything would keep this loop running forever.
        if started.elapsed() > timeout {
            return Ok(None);
        }
    }
}