use byteorder::{ReadBytesExt, WriteBytesExt, LE};
use std::{
io::{self, ErrorKind},
time::Duration,
};
use super::{Cfg, MultiplexError};
/// Builds an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] whose message
/// is `invalid value for <what> received`, where `<what>` is the macro argument
/// rendered with `Display`.
macro_rules! invalid_data {
    ($what:expr) => {
        ::std::io::Error::new(
            ::std::io::ErrorKind::InvalidData,
            format!("invalid value for {} received", $what),
        )
    };
}
/// Magic bytes of the hello message.
///
/// `MultiplexMsg::write` emits them directly after the `MSG_HELLO` id, and
/// `MultiplexMsg::read` fails with an `InvalidData` error when a hello message
/// carries any other six bytes at that position.
pub const MAGIC: &[u8; 6] = b"CHMUX\0";
/// Message handled by the multiplexer's wire codec.
///
/// `MultiplexMsg::write` encodes each variant as its one-byte `MSG_*` id
/// followed by the variant's fields; `u32` fields are written little-endian and
/// `bool` fields are packed into a single flags byte. `MultiplexMsg::read`
/// performs the reverse conversion.
#[derive(Debug)]
pub enum MultiplexMsg {
/// Encoded as the id `MSG_RESET` with no further fields.
Reset,
/// Encoded as the id `MSG_HELLO`, the `MAGIC` bytes, the version byte and the
/// serialized `ExchangedCfg`.
Hello {
/// Version byte (presumably the protocol version of the sender; confirm
/// against the code that builds this message).
version: u8,
/// Configuration values carried by the hello message.
cfg: ExchangedCfg,
},
/// Encoded as the id `MSG_PING` with no further fields.
Ping,
/// Encoded as the id `MSG_OPEN_PORT`, the client port and a flags byte.
OpenPort {
/// Client port number.
client_port: u32,
/// Stored as `MSG_OPEN_PORT_FLAG_WAIT` in the flags byte.
wait: bool,
},
/// Encoded as the id `MSG_PORT_OPENED`, the client port and the server port.
PortOpened {
/// Client port number.
client_port: u32,
/// Server port number.
server_port: u32,
},
/// Encoded as the id `MSG_REJECTED`, the client port and a flags byte.
Rejected {
/// Client port number.
client_port: u32,
/// Stored as `MSG_REJECTED_FLAG_NO_PORTS` in the flags byte.
no_ports: bool,
},
/// Encoded as the id `MSG_DATA`, the port and a flags byte.
Data {
/// Port number.
port: u32,
/// Stored as `MSG_DATA_FLAG_FIRST` in the flags byte.
first: bool,
/// Stored as `MSG_DATA_FLAG_LAST` in the flags byte.
last: bool,
},
/// Encoded under the id `MSG_PORT_DATA`; carries a port, a flags byte and a
/// list of port numbers that has no length prefix.
PortData {
/// Port number.
port: u32,
/// Stored as `MSG_PORT_DATA_FLAG_FIRST` in the flags byte.
first: bool,
/// Stored as `MSG_PORT_DATA_FLAG_LAST` in the flags byte.
last: bool,
/// Stored as `MSG_PORT_DATA_FLAG_WAIT` in the flags byte.
wait: bool,
/// Port numbers forming the remainder of the message.
ports: Vec<u32>,
},
/// Encoded as the id `MSG_PORT_CREDITS`, the port and the credit count.
PortCredits {
/// Port number.
port: u32,
/// Credit count (presumably flow-control credits granted to `port`; confirm
/// against the code that consumes this message).
credits: u32,
},
/// Encoded as the id `MSG_SEND_FINISH` and the port.
SendFinish {
/// Port number.
port: u32,
},
/// Encoded as the id `MSG_RECEIVE_CLOSE` and the port.
ReceiveClose {
/// Port number.
port: u32,
},
/// Encoded as the id `MSG_RECEIVE_FINISH` and the port.
ReceiveFinish {
/// Port number.
port: u32,
},
/// Encoded as the id `MSG_CLIENT_FINISH` with no further fields.
ClientFinish,
/// Encoded as the id `MSG_LISTENER_FINISH` with no further fields.
ListenerFinish,
/// Encoded as the id `MSG_GOODBYE` with no further fields.
Goodbye,
}
// Message ids: the first byte of every encoded message.

/// Id byte of the `Reset` message.
pub const MSG_RESET: u8 = 1;
/// Id byte of the `Hello` message.
pub const MSG_HELLO: u8 = 2;
/// Id byte of the `Ping` message.
pub const MSG_PING: u8 = 3;
/// Id byte of the `OpenPort` message.
pub const MSG_OPEN_PORT: u8 = 4;
/// Id byte of the `PortOpened` message.
pub const MSG_PORT_OPENED: u8 = 5;
/// Id byte of the `Rejected` message.
pub const MSG_REJECTED: u8 = 6;
/// Id byte of the `Data` message.
pub const MSG_DATA: u8 = 7;
/// Id byte of the `PortData` message.
pub const MSG_PORT_DATA: u8 = 8;
/// Id byte of the `PortCredits` message.
pub const MSG_PORT_CREDITS: u8 = 9;
/// Id byte of the `SendFinish` message.
pub const MSG_SEND_FINISH: u8 = 10;
/// Id byte of the `ReceiveClose` message.
pub const MSG_RECEIVE_CLOSE: u8 = 11;
/// Id byte of the `ReceiveFinish` message.
pub const MSG_RECEIVE_FINISH: u8 = 12;
/// Id byte of the `ClientFinish` message.
pub const MSG_CLIENT_FINISH: u8 = 13;
/// Id byte of the `ListenerFinish` message.
pub const MSG_LISTENER_FINISH: u8 = 14;
/// Id byte of the `Goodbye` message.
pub const MSG_GOODBYE: u8 = 15;

// Flag bits: each message kind has its own flags byte, so bit positions are
// reused between kinds.

/// Bit of the `OpenPort` flags byte that carries `wait`.
pub const MSG_OPEN_PORT_FLAG_WAIT: u8 = 1 << 0;

/// Bit of the `Rejected` flags byte that carries `no_ports`.
pub const MSG_REJECTED_FLAG_NO_PORTS: u8 = 1 << 0;

/// Bit of the `Data` flags byte that carries `first`.
pub const MSG_DATA_FLAG_FIRST: u8 = 1 << 0;
/// Bit of the `Data` flags byte that carries `last`.
pub const MSG_DATA_FLAG_LAST: u8 = 1 << 1;

/// Bit of the `PortData` flags byte that carries `first`.
pub const MSG_PORT_DATA_FLAG_FIRST: u8 = 1 << 0;
/// Bit of the `PortData` flags byte that carries `last`.
pub const MSG_PORT_DATA_FLAG_LAST: u8 = 1 << 1;
/// Bit of the `PortData` flags byte that carries `wait`.
pub const MSG_PORT_DATA_FLAG_WAIT: u8 = 1 << 2;
impl MultiplexMsg {
    /// Serializes this message into `writer`.
    ///
    /// Every message starts with its one-byte `MSG_*` id, followed by the
    /// variant's fields. `u32` values are written little-endian and `bool`
    /// fields are packed into one flags byte using the `MSG_*_FLAG_*` bits.
    /// The layout produced here is exactly the one [`MultiplexMsg::read`]
    /// decodes.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error reported by `writer`.
    pub(crate) fn write(&self, mut writer: impl io::Write) -> Result<(), io::Error> {
        match self {
            MultiplexMsg::Reset => {
                writer.write_u8(MSG_RESET)?;
            }
            MultiplexMsg::Hello { version, cfg } => {
                writer.write_u8(MSG_HELLO)?;
                writer.write_all(MAGIC)?;
                writer.write_u8(*version)?;
                cfg.write(&mut writer)?;
            }
            MultiplexMsg::Ping => {
                writer.write_u8(MSG_PING)?;
            }
            MultiplexMsg::OpenPort { client_port, wait } => {
                writer.write_u8(MSG_OPEN_PORT)?;
                writer.write_u32::<LE>(*client_port)?;
                writer.write_u8(if *wait { MSG_OPEN_PORT_FLAG_WAIT } else { 0 })?;
            }
            MultiplexMsg::PortOpened { client_port, server_port } => {
                writer.write_u8(MSG_PORT_OPENED)?;
                writer.write_u32::<LE>(*client_port)?;
                writer.write_u32::<LE>(*server_port)?;
            }
            MultiplexMsg::Rejected { client_port, no_ports } => {
                writer.write_u8(MSG_REJECTED)?;
                writer.write_u32::<LE>(*client_port)?;
                writer.write_u8(if *no_ports { MSG_REJECTED_FLAG_NO_PORTS } else { 0 })?;
            }
            MultiplexMsg::Data { port, first, last } => {
                writer.write_u8(MSG_DATA)?;
                writer.write_u32::<LE>(*port)?;
                let mut flags = 0;
                if *first {
                    flags |= MSG_DATA_FLAG_FIRST;
                }
                if *last {
                    flags |= MSG_DATA_FLAG_LAST;
                }
                writer.write_u8(flags)?;
            }
            MultiplexMsg::PortData { port, first, last, wait, ports } => {
                writer.write_u8(MSG_PORT_DATA)?;
                // The port id has to precede the flags byte: that is the order
                // in which `read` decodes this message, and it matches the
                // layout of `Data`. Writing the flags first would make `read`
                // absorb them into the port id and misalign the port list.
                writer.write_u32::<LE>(*port)?;
                let mut flags = 0;
                if *first {
                    flags |= MSG_PORT_DATA_FLAG_FIRST;
                }
                if *last {
                    flags |= MSG_PORT_DATA_FLAG_LAST;
                }
                if *wait {
                    flags |= MSG_PORT_DATA_FLAG_WAIT;
                }
                writer.write_u8(flags)?;
                // The port list has no length prefix; `read` consumes ids
                // until its input is exhausted.
                for p in ports {
                    writer.write_u32::<LE>(*p)?;
                }
            }
            MultiplexMsg::PortCredits { port, credits } => {
                writer.write_u8(MSG_PORT_CREDITS)?;
                writer.write_u32::<LE>(*port)?;
                writer.write_u32::<LE>(*credits)?;
            }
            MultiplexMsg::SendFinish { port } => {
                writer.write_u8(MSG_SEND_FINISH)?;
                writer.write_u32::<LE>(*port)?;
            }
            MultiplexMsg::ReceiveClose { port } => {
                writer.write_u8(MSG_RECEIVE_CLOSE)?;
                writer.write_u32::<LE>(*port)?;
            }
            MultiplexMsg::ReceiveFinish { port } => {
                writer.write_u8(MSG_RECEIVE_FINISH)?;
                writer.write_u32::<LE>(*port)?;
            }
            MultiplexMsg::ClientFinish => {
                writer.write_u8(MSG_CLIENT_FINISH)?;
            }
            MultiplexMsg::ListenerFinish => {
                writer.write_u8(MSG_LISTENER_FINISH)?;
            }
            MultiplexMsg::Goodbye => {
                writer.write_u8(MSG_GOODBYE)?;
            }
        }
        Ok(())
    }

    /// Deserializes one message from `reader`.
    ///
    /// The first byte selects the variant; the remaining fields are decoded in
    /// the layout produced by [`MultiplexMsg::write`]. Unknown bits in a flags
    /// byte are ignored.
    ///
    /// # Errors
    ///
    /// * `InvalidData` if the message id is unknown, the hello magic does not
    ///   equal `MAGIC`, or the embedded `ExchangedCfg` is rejected.
    /// * Any I/O error from `reader`, including `UnexpectedEof` when a
    ///   fixed-size field is truncated.
    pub(crate) fn read(mut reader: impl io::Read) -> Result<Self, io::Error> {
        let msg = match reader.read_u8()? {
            MSG_RESET => Self::Reset,
            MSG_HELLO => {
                let mut magic = vec![0; MAGIC.len()];
                reader.read_exact(&mut magic)?;
                if magic != MAGIC {
                    return Err(invalid_data!("invalid magic"));
                }
                Self::Hello { version: reader.read_u8()?, cfg: ExchangedCfg::read(&mut reader)? }
            }
            MSG_PING => Self::Ping,
            MSG_OPEN_PORT => Self::OpenPort {
                client_port: reader.read_u32::<LE>()?,
                wait: reader.read_u8()? & MSG_OPEN_PORT_FLAG_WAIT != 0,
            },
            MSG_PORT_OPENED => {
                Self::PortOpened { client_port: reader.read_u32::<LE>()?, server_port: reader.read_u32::<LE>()? }
            }
            MSG_REJECTED => Self::Rejected {
                client_port: reader.read_u32::<LE>()?,
                no_ports: reader.read_u8()? & MSG_REJECTED_FLAG_NO_PORTS != 0,
            },
            MSG_DATA => {
                let port = reader.read_u32::<LE>()?;
                let flags = reader.read_u8()?;
                Self::Data {
                    port,
                    first: flags & MSG_DATA_FLAG_FIRST != 0,
                    last: flags & MSG_DATA_FLAG_LAST != 0,
                }
            }
            MSG_PORT_DATA => {
                let port = reader.read_u32::<LE>()?;
                let flags = reader.read_u8()?;
                let first = flags & MSG_PORT_DATA_FLAG_FIRST != 0;
                let last = flags & MSG_PORT_DATA_FLAG_LAST != 0;
                let wait = flags & MSG_PORT_DATA_FLAG_WAIT != 0;
                // The port list has no length prefix, so the end of the input
                // terminates it.
                // NOTE(review): a truncated trailing id (1-3 bytes) presumably
                // also surfaces as `UnexpectedEof` and is dropped silently
                // here; confirm whether that should be a protocol error.
                let mut ports = Vec::new();
                loop {
                    match reader.read_u32::<LE>() {
                        Ok(p) => ports.push(p),
                        Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
                        Err(err) => return Err(err),
                    }
                }
                Self::PortData { port, first, last, wait, ports }
            }
            MSG_PORT_CREDITS => {
                Self::PortCredits { port: reader.read_u32::<LE>()?, credits: reader.read_u32::<LE>()? }
            }
            MSG_SEND_FINISH => Self::SendFinish { port: reader.read_u32::<LE>()? },
            MSG_RECEIVE_CLOSE => Self::ReceiveClose { port: reader.read_u32::<LE>()? },
            MSG_RECEIVE_FINISH => Self::ReceiveFinish { port: reader.read_u32::<LE>()? },
            MSG_CLIENT_FINISH => Self::ClientFinish,
            MSG_LISTENER_FINISH => Self::ListenerFinish,
            MSG_GOODBYE => Self::Goodbye,
            _ => return Err(invalid_data!("invalid message id")),
        };
        Ok(msg)
    }

    /// Serializes this message into a freshly allocated byte vector.
    ///
    /// # Panics
    ///
    /// Panics with `message serialization failed` if [`MultiplexMsg::write`]
    /// returns an error. The only fallible operations in `write` are writes to
    /// the target, and writing into a `Vec<u8>` does not report I/O errors, so
    /// this is not expected to trigger.
    pub(crate) fn to_vec(&self) -> Vec<u8> {
        let mut data = Vec::new();
        self.write(&mut data).expect("message serialization failed");
        data
    }

    /// Deserializes a message from `data`.
    ///
    /// # Errors
    ///
    /// Any error from [`MultiplexMsg::read`] is converted into
    /// `MultiplexError::Protocol` carrying the error's display text.
    pub(crate) fn from_slice<SinkError, StreamError>(
        data: &[u8],
    ) -> Result<Self, MultiplexError<SinkError, StreamError>> {
        Self::read(data).map_err(|err| MultiplexError::Protocol(err.to_string()))
    }
}
/// Configuration values carried inside the hello message.
///
/// Built from a `Cfg` through the `From<&Cfg>` conversion and encoded or
/// decoded by `ExchangedCfg::write` / `ExchangedCfg::read`.
#[derive(Clone, Debug)]
pub struct ExchangedCfg {
/// Connection timeout. Encoded as whole milliseconds in a `u64`; `None` is
/// encoded as 0 and an encoded 0 decodes to `None`.
pub connection_timeout: Option<Duration>,
/// Chunk size. `ExchangedCfg::read` rejects values below 4.
pub chunk_size: u32,
/// Port receive buffer size, taken from `Cfg::receive_buffer`.
/// `ExchangedCfg::read` rejects values below 4.
pub port_receive_buffer: u32,
/// Connect queue length, encoded as a `u16`.
pub connect_queue: u16,
}
impl ExchangedCfg {
    /// Serializes the configuration into `writer`.
    ///
    /// Layout, all little-endian: connection timeout in milliseconds (`u64`),
    /// chunk size (`u32`), port receive buffer (`u32`), connect queue (`u16`).
    ///
    /// # Errors
    ///
    /// Returns the first I/O error reported by `writer`.
    pub(crate) fn write(&self, mut writer: impl io::Write) -> Result<(), io::Error> {
        // A missing timeout is sent as 0 ms; a timeout too large for a `u64`
        // millisecond count saturates at `u64::MAX`.
        let timeout_ms = self
            .connection_timeout
            .map_or(0, |timeout| timeout.as_millis())
            .min(u128::from(u64::MAX)) as u64;

        writer.write_u64::<LE>(timeout_ms)?;
        writer.write_u32::<LE>(self.chunk_size)?;
        writer.write_u32::<LE>(self.port_receive_buffer)?;
        writer.write_u16::<LE>(self.connect_queue)
    }

    /// Deserializes a configuration from `reader`, using the layout produced
    /// by [`ExchangedCfg::write`].
    ///
    /// # Errors
    ///
    /// * `InvalidData` if the chunk size or the port receive buffer is below 4.
    /// * Any I/O error from `reader`.
    pub(crate) fn read(mut reader: impl io::Read) -> Result<Self, io::Error> {
        // 0 ms on the wire stands for "no timeout".
        let timeout_ms = reader.read_u64::<LE>()?;
        let connection_timeout =
            if timeout_ms == 0 { None } else { Some(Duration::from_millis(timeout_ms)) };

        let chunk_size = reader.read_u32::<LE>()?;
        if chunk_size < 4 {
            return Err(invalid_data!("chunk_size"));
        }

        let port_receive_buffer = reader.read_u32::<LE>()?;
        if port_receive_buffer < 4 {
            return Err(invalid_data!("port_receive_buffer"));
        }

        let connect_queue = reader.read_u16::<LE>()?;

        Ok(Self { connection_timeout, chunk_size, port_receive_buffer, connect_queue })
    }
}
impl From<&Cfg> for ExchangedCfg {
fn from(cfg: &Cfg) -> Self {
Self {
connection_timeout: cfg.connection_timeout,
chunk_size: cfg.chunk_size,
port_receive_buffer: cfg.receive_buffer,
connect_queue: cfg.connect_queue,
}
}
}