use std::io;
use std::io::prelude::*;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
/// Tag value (top byte of the 4-byte envelope header) identifying a packet
/// whose body is stream data to be handed to the reader's caller.
const TAG_DATA: u8 = 7;
/// Tag value of a message packet after which the reader logs the message and
/// then fails with a "Remote signalled fatal error" I/O error.
const TAG_FATAL: u8 = 1;
/// Reader adapter that removes the envelope framing from an underlying stream.
///
/// Bodies of data packets are passed through to the caller of `read`; packets
/// with any other tag are consumed internally and logged as remote messages.
pub struct DemuxRead {
/// Underlying stream carrying envelope headers and packet bodies.
r: Box<dyn Read>,
/// Bytes of the current data packet not yet handed to the caller.
/// Zero means the next bytes on `r` are expected to be an envelope header.
current_packet_len: usize,
}
impl Read for DemuxRead {
    /// Reads de-enveloped data bytes into `buf`.
    ///
    /// At most the remainder of the current data packet is returned per call,
    /// so a single call never crosses a packet boundary. When no packet is in
    /// progress, the next header is read first (non-data packets are consumed
    /// along the way by `read_header_consume_messages`).
    ///
    /// Returns `Ok(0)` when `buf` is empty or when the stream ended cleanly
    /// before a new packet.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying reader and from header
    /// processing. Returns `UnexpectedEof` if the stream ends while a data
    /// packet still has bytes outstanding.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // An empty destination can never receive data; answer without doing
        // any (potentially blocking) I/O on the underlying stream.
        if buf.is_empty() {
            return Ok(0);
        }
        if self.current_packet_len == 0 {
            self.current_packet_len = self.read_header_consume_messages()?;
            if self.current_packet_len == 0 {
                // Header reader reported end of stream between packets.
                return Ok(0);
            }
        }
        let max_len = buf.len().min(self.current_packet_len);
        let read_len = self.r.read(&mut buf[..max_len])?;
        if read_len == 0 {
            // `max_len` is non-zero here, so a zero-byte read means the stream
            // ended inside a packet body. Reporting that as `Ok(0)` would make
            // truncated input indistinguishable from a clean end of stream.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Stream ended in the middle of a data packet",
            ));
        }
        self.current_packet_len -= read_len;
        Ok(read_len)
    }
}
impl DemuxRead {
pub fn new(r: Box<dyn Read>) -> DemuxRead {
DemuxRead {
r,
current_packet_len: 0,
}
}
fn read_header_consume_messages(&mut self) -> io::Result<usize> {
loop {
let mut h = [0u8; 4];
if let Err(e) = self.r.read_exact(&mut h) {
match e.kind() {
io::ErrorKind::UnexpectedEof => {
debug!("Clean eof before mux packet");
return Ok(0);
}
_ => return Err(e),
}
}
let h = u32::from_le_bytes(h);
let tag = (h >> 24) as u8;
let len = (h & 0xff_ffff) as usize;
trace!("Read envelope tag {:#04x} length {:#x}", tag, len);
if tag == TAG_DATA {
if len == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Zero-length data packet received",
));
}
return Ok(len);
}
let mut message = vec![0; len];
self.r.read_exact(&mut message)?;
info!("REMOTE: {}", String::from_utf8_lossy(&message).trim_end());
if tag == TAG_FATAL {
return Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"Remote signalled fatal error",
));
}
}
}
}
/// Writer adapter that frames everything written to it as data packets on an
/// underlying stream: a 4-byte envelope header followed by the payload bytes.
#[allow(unused)]
pub struct MuxWrite {
/// Underlying stream that receives the envelope headers and payloads.
w: Box<dyn Write>,
}
impl MuxWrite {
#[allow(unused)]
pub fn new(w: Box<dyn Write>) -> MuxWrite {
MuxWrite { w }
}
}
impl Write for MuxWrite {
    /// Sends `buf` (or a leading part of it) as one data packet.
    ///
    /// The packet is a 4-byte little-endian header (`TAG_DATA` in the top
    /// byte, payload length in the low 24 bits) followed by the payload.
    /// Returns the number of bytes of `buf` that were sent; this is less than
    /// `buf.len()` when `buf` does not fit in a single packet, in which case
    /// the caller (or `write_all`) is expected to call again with the rest.
    ///
    /// An empty `buf` sends nothing and returns `Ok(0)`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying writer. If the error occurs
    /// after the header was written, the output stream is left mid-packet.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // The length field is 24 bits wide. Payloads are kept strictly below
        // 0xff_ffff, the same bound this writer has always enforced.
        const MAX_PAYLOAD: usize = 0xff_ffff - 1;

        if buf.is_empty() {
            // A zero-length data packet is rejected by the matching reader,
            // so nothing is put on the wire for an empty write.
            return Ok(0);
        }
        let payload = &buf[..buf.len().min(MAX_PAYLOAD)];
        // Cannot truncate: `payload.len()` is at most MAX_PAYLOAD (24 bits).
        let header: u32 = payload.len() as u32 | ((TAG_DATA as u32) << 24);
        self.w.write_all(&header.to_le_bytes())?;
        self.w.write_all(payload)?;
        trace!(
            "Send envelope tag {:#x} data {}",
            header,
            hex::encode(payload)
        );
        Ok(payload.len())
    }

    /// Flushes the underlying writer.
    fn flush(&mut self) -> io::Result<()> {
        self.w.flush()
    }
}