// tubes 0.6.4
//
// Host/Client protocol based on pipenet
// Documentation
use crate::prelude::*;

use crate::core::ReconnectTo;
use crate::core::transport::is_aborted_error;
use crate::data::MessageDataInternal;
use pipenet::NonBlockStream;
use std::sync::Mutex;
use std::sync::mpsc::{Receiver, Sender};

/// Module-local result alias whose error side is any boxed [`std::error::Error`].
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Drives a client connection: repeatedly reads incoming messages from
/// `socket` and writes out the messages queued on `rx_writer`.
///
/// * `socket` - the stream owned by this loop for its whole lifetime.
/// * `reconnect_to` - shared slot that receives the reconnect target whenever
///   a read pass yields one.
/// * `rx_writer` - queue of outgoing messages to push to the socket.
/// * `tx_reader` - channel on which user-facing incoming messages are
///   forwarded.
///
/// The function returns only when a read or write error is classified as an
/// abort by `is_aborted_error`; any other error just ends the current
/// iteration and the loop starts over.
pub(crate) fn client_loop<const MS: usize>(
    mut socket: NonBlockStream<MS>,
    reconnect_to: &Mutex<Option<ReconnectTo>>,
    rx_writer: &Receiver<MessageDataInternal>,
    tx_reader: &Sender<MessageData>,
) {
    loop {
        let pending = match client_read(&mut socket, tx_reader) {
            Ok(pending) => pending,
            Err(e) => {
                if is_aborted_error(e) {
                    return;
                }
                continue;
            }
        };

        // Keep the outcome instead of bailing out right away: a reconnect
        // target obtained by the read above must not be lost just because
        // the write that follows it failed.
        let write_result = client_write(&mut socket, rx_writer);

        // Publish the target only after the write attempt, so queued outgoing
        // messages get their chance to be sent first. A lock that cannot be
        // acquired (poisoned) is skipped silently.
        if let Some(to) = pending
            && let Ok(mut dst) = reconnect_to.lock()
        {
            *dst = Some(to);
        }

        if let Err(e) = write_result
            && is_aborted_error(e)
        {
            return;
        }
    }
}

/// Reads messages from `socket` until `socket.read()` yields `None`,
/// forwarding the user-facing ones on `tx_reader`.
///
/// Returns the reconnect target carried by the last `PromoteToHost` or
/// `NewHost` message seen during this pass, or `None` when there was none.
///
/// # Errors
///
/// Propagates errors from `socket.read()`, `socket.write()` and from the
/// message conversions. Failures to send on `tx_reader` are ignored.
fn client_read<const MS: usize>(
    socket: &mut NonBlockStream<MS>,
    tx_reader: &Sender<MessageData>,
) -> Result<Option<ReconnectTo>> {
    let mut target = None;
    // A failing `socket.read()` means the connection was aborted.
    while let Some(raw) = socket.read()? {
        let decoded: MessageDataInternal = (*raw).try_into()?;
        // Translate the wire message into the event handed to the user, if
        // any; the host-migration messages only update internal state.
        let event = match decoded {
            MessageDataInternal::Broadcast(from, data) => {
                Some(MessageData::Broadcast { from, data })
            }
            MessageDataInternal::Send(from, to, data) => {
                Some(MessageData::Send { from, to, data })
            }
            MessageDataInternal::ClientJoined(uuid) => Some(MessageData::ClientJoined(uuid)),
            MessageDataInternal::ClientLeft(uuid) => Some(MessageData::ClientLeft(uuid)),
            // Already handled directly on stream connect.
            MessageDataInternal::ServerUuid(_) => None,
            MessageDataInternal::PromoteToHost(_, address, port) => {
                // Send a `NewHost` with the same address back over the socket
                // before recording that this client becomes the server.
                socket.write(MessageDataInternal::NewHost(address, port).try_into()?)?;
                target = Some(ReconnectTo {
                    become_server: true,
                    address,
                    port,
                });
                None
            }
            MessageDataInternal::NewHost(address, port) => {
                target = Some(ReconnectTo {
                    become_server: false,
                    address,
                    port,
                });
                None
            }
        };
        if let Some(event) = event {
            let _ = tx_reader.send(event);
        }
    }
    Ok(target)
}

/// Writes every message currently queued on `rx_writer` to `socket`.
///
/// Never blocks waiting for new messages: the pass ends as soon as the queue
/// is empty or the sending side has hung up.
///
/// # Errors
///
/// Propagates a failed message conversion or a failed `socket.write()`; the
/// latter means the connection was aborted.
fn client_write<const MS: usize>(
    socket: &mut NonBlockStream<MS>,
    rx_writer: &Receiver<MessageDataInternal>,
) -> Result<()> {
    for outgoing in rx_writer.try_iter() {
        socket.write(outgoing.try_into()?)?;
    }
    Ok(())
}