use crate::prelude::*;
use crate::core::ReconnectTo;
use crate::core::transport::is_aborted_error;
use crate::data::MessageDataInternal;
use pipenet::NonBlockStream;
use std::sync::Mutex;
use std::sync::mpsc::{Receiver, Sender};
/// Result alias used by the helpers in this file: the error side is a boxed
/// `std::error::Error` trait object, so `?` can propagate any error that
/// converts into one.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
/// Runs the client side of a connection until the connection is aborted.
///
/// Every iteration performs two phases:
///
/// 1. `client_read` pulls messages from `socket` and forwards them through
///    `tx_reader`; it may also return a [`ReconnectTo`] target.
/// 2. `client_write` sends every message currently queued on `rx_writer`.
///
/// A reconnect target produced by the read phase is stored in `reconnect_to`
/// after the write phase has been attempted, whether or not that write
/// succeeded. The messages that produced the target have already been consumed
/// from the socket, so dropping it on a write failure would lose it for good.
/// If the mutex is poisoned the target is not stored.
///
/// The function returns when either phase fails with an error for which
/// `is_aborted_error` returns `true`. Any other error is discarded and the
/// loop moves on to the next iteration.
pub(crate) fn client_loop<const MS: usize>(
    mut socket: NonBlockStream<MS>,
    reconnect_to: &Mutex<Option<ReconnectTo>>,
    rx_writer: &Receiver<MessageDataInternal>,
    tx_reader: &Sender<MessageData>,
) {
    loop {
        let pending = match client_read(&mut socket, tx_reader) {
            Ok(pending) => pending,
            Err(e) => {
                if is_aborted_error(e) {
                    return;
                }
                // Non-fatal read error: skip the write phase and retry.
                continue;
            }
        };

        // Attempt the write first (same ordering as before), but defer acting
        // on its outcome until the reconnect target has been published.
        let write_result = client_write(&mut socket, rx_writer);

        if let Some(to) = pending
            && let Ok(mut slot) = reconnect_to.lock()
        {
            *slot = Some(to);
        }

        if let Err(e) = write_result {
            if is_aborted_error(e) {
                return;
            }
            // Non-fatal write error: fall through to the next iteration.
        }
    }
}
/// Reads messages from `socket` until `socket.read()` yields `None`.
///
/// Each message is converted into a [`MessageDataInternal`] and handled by
/// variant:
///
/// * `Broadcast`, `Send`, `ClientJoined` and `ClientLeft` are forwarded through
///   `tx_reader` as the matching [`MessageData`] value. A failed send is
///   ignored.
/// * `ServerUuid` is ignored.
/// * `PromoteToHost` writes a `NewHost` message with the same address and port
///   back to `socket`, then records a reconnect target with
///   `become_server: true`.
/// * `NewHost` records a reconnect target with `become_server: false`.
///
/// Returns the most recently recorded reconnect target, or `None` if no
/// message recorded one.
///
/// # Errors
///
/// Propagates any error from reading, converting or writing a message. A
/// reconnect target recorded before the error is not returned.
fn client_read<const MS: usize>(
    socket: &mut NonBlockStream<MS>,
    tx_reader: &Sender<MessageData>,
) -> Result<Option<ReconnectTo>> {
    let mut target = None;
    while let Some(raw) = socket.read()? {
        let decoded: MessageDataInternal = (*raw).try_into()?;
        // Work out what (if anything) to hand to the reader channel; the two
        // host-change variants only update `target`.
        let forwarded = match decoded {
            MessageDataInternal::Broadcast(from, data) => {
                Some(MessageData::Broadcast { from, data })
            }
            MessageDataInternal::Send(from, to, data) => {
                Some(MessageData::Send { from, to, data })
            }
            MessageDataInternal::ClientJoined(uuid) => Some(MessageData::ClientJoined(uuid)),
            MessageDataInternal::ClientLeft(uuid) => Some(MessageData::ClientLeft(uuid)),
            MessageDataInternal::ServerUuid(_) => None,
            MessageDataInternal::PromoteToHost(_, address, port) => {
                let announce = MessageDataInternal::NewHost(address, port);
                socket.write(announce.try_into()?)?;
                target = Some(ReconnectTo {
                    become_server: true,
                    address,
                    port,
                });
                None
            }
            MessageDataInternal::NewHost(address, port) => {
                target = Some(ReconnectTo {
                    become_server: false,
                    address,
                    port,
                });
                None
            }
        };
        if let Some(event) = forwarded {
            // A send failure is deliberately ignored.
            let _ = tx_reader.send(event);
        }
    }
    Ok(target)
}
/// Writes every message currently pending on `rx_writer` to `socket`.
///
/// Messages are taken without blocking; the function stops as soon as the
/// channel has nothing more to yield.
///
/// # Errors
///
/// Propagates the first error from converting a message or writing it to the
/// socket. The message that failed has already been removed from the channel.
fn client_write<const MS: usize>(
    socket: &mut NonBlockStream<MS>,
    rx_writer: &Receiver<MessageDataInternal>,
) -> Result<()> {
    for outgoing in rx_writer.try_iter() {
        socket.write(outgoing.try_into()?)?;
    }
    Ok(())
}