#![allow(dead_code)]
#[derive(Debug, Clone)]
pub struct StreamChannel {
pub id: u8,
pub label: String,
pub max_frame_size: usize,
active: bool,
}
impl StreamChannel {
#[must_use]
pub fn new(id: u8, label: impl Into<String>, max_frame_size: usize) -> Self {
Self {
id,
label: label.into(),
max_frame_size,
active: true,
}
}
#[must_use]
pub fn is_active(&self) -> bool {
self.active
}
pub fn activate(&mut self) {
self.active = true;
}
pub fn deactivate(&mut self) {
self.active = false;
}
}
#[derive(Debug, Clone)]
pub struct MuxConfig {
pub max_channels: usize,
pub default_max_frame: usize,
}
impl Default for MuxConfig {
fn default() -> Self {
Self {
max_channels: 16,
default_max_frame: 65_535,
}
}
}
impl MuxConfig {
#[must_use]
pub fn new(max_channels: usize, default_max_frame: usize) -> Self {
Self {
max_channels,
default_max_frame,
}
}
#[must_use]
pub fn channel_count(&self) -> usize {
self.max_channels
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MuxError {
UnknownChannel(u8),
InactiveChannel(u8),
PayloadTooLarge,
CapacityExceeded,
FrameTooShort,
FrameLengthMismatch,
}
const HEADER_LEN: usize = 5;
#[derive(Debug)]
pub struct StreamMux {
channels: Vec<StreamChannel>,
config: MuxConfig,
frames_muxed: u64,
frames_demuxed: u64,
}
impl StreamMux {
#[must_use]
pub fn new(config: MuxConfig) -> Self {
Self {
channels: Vec::new(),
config,
frames_muxed: 0,
frames_demuxed: 0,
}
}
#[must_use]
pub fn channel_count(&self) -> usize {
self.channels.len()
}
#[must_use]
pub fn frames_muxed(&self) -> u64 {
self.frames_muxed
}
#[must_use]
pub fn frames_demuxed(&self) -> u64 {
self.frames_demuxed
}
pub fn add_channel(&mut self, channel: StreamChannel) -> Result<(), MuxError> {
if self.channels.len() >= self.config.max_channels {
return Err(MuxError::CapacityExceeded);
}
if self.channels.iter().any(|c| c.id == channel.id) {
return Err(MuxError::UnknownChannel(channel.id));
}
self.channels.push(channel);
Ok(())
}
pub fn remove_channel(&mut self, id: u8) {
self.channels.retain(|c| c.id != id);
}
pub fn mux_frame(&mut self, channel_id: u8, payload: &[u8]) -> Result<Vec<u8>, MuxError> {
let ch = self
.channels
.iter()
.find(|c| c.id == channel_id)
.ok_or(MuxError::UnknownChannel(channel_id))?;
if !ch.is_active() {
return Err(MuxError::InactiveChannel(channel_id));
}
if payload.len() > ch.max_frame_size {
return Err(MuxError::PayloadTooLarge);
}
let len = payload.len() as u32;
let mut frame = Vec::with_capacity(HEADER_LEN + payload.len());
frame.push(channel_id);
frame.extend_from_slice(&len.to_be_bytes());
frame.extend_from_slice(payload);
self.frames_muxed += 1;
Ok(frame)
}
pub fn demux_frame<'a>(&mut self, frame: &'a [u8]) -> Result<(u8, &'a [u8]), MuxError> {
if frame.len() < HEADER_LEN {
return Err(MuxError::FrameTooShort);
}
let channel_id = frame[0];
let len = u32::from_be_bytes([frame[1], frame[2], frame[3], frame[4]]) as usize;
if frame.len() != HEADER_LEN + len {
return Err(MuxError::FrameLengthMismatch);
}
let payload = &frame[HEADER_LEN..];
self.frames_demuxed += 1;
Ok((channel_id, payload))
}
}
impl Default for StreamMux {
fn default() -> Self {
Self::new(MuxConfig::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn basic_mux() -> StreamMux {
let mut mux = StreamMux::default();
mux.add_channel(StreamChannel::new(1, "video", 1_000_000))
.expect("should succeed in test");
mux.add_channel(StreamChannel::new(2, "audio", 65_535))
.expect("should succeed in test");
mux
}
#[test]
fn test_channel_is_active_default() {
let ch = StreamChannel::new(0, "test", 1024);
assert!(ch.is_active());
}
#[test]
fn test_channel_deactivate() {
let mut ch = StreamChannel::new(0, "test", 1024);
ch.deactivate();
assert!(!ch.is_active());
}
#[test]
fn test_channel_activate() {
let mut ch = StreamChannel::new(0, "test", 1024);
ch.deactivate();
ch.activate();
assert!(ch.is_active());
}
#[test]
fn test_mux_config_channel_count() {
let cfg = MuxConfig::new(8, 4096);
assert_eq!(cfg.channel_count(), 8);
}
#[test]
fn test_mux_add_channel() {
let mux = basic_mux();
assert_eq!(mux.channel_count(), 2);
}
#[test]
fn test_mux_add_duplicate_id_error() {
let mut mux = basic_mux();
let result = mux.add_channel(StreamChannel::new(1, "dup", 512));
assert!(result.is_err());
}
#[test]
fn test_mux_frame_roundtrip() {
let mut mux = basic_mux();
let payload = b"hello world";
let frame = mux.mux_frame(1, payload).expect("should succeed in test");
let (ch_id, recovered) = mux.demux_frame(&frame).expect("should succeed in test");
assert_eq!(ch_id, 1);
assert_eq!(recovered, payload);
}
#[test]
fn test_mux_unknown_channel() {
let mut mux = basic_mux();
assert_eq!(
mux.mux_frame(99, b"data").unwrap_err(),
MuxError::UnknownChannel(99)
);
}
#[test]
fn test_mux_inactive_channel() {
let mut mux = basic_mux();
mux.channels[0].deactivate();
assert_eq!(
mux.mux_frame(1, b"data").unwrap_err(),
MuxError::InactiveChannel(1)
);
}
#[test]
fn test_mux_payload_too_large() {
let mut mux = StreamMux::default();
mux.add_channel(StreamChannel::new(5, "small", 4))
.expect("should succeed in test");
assert_eq!(
mux.mux_frame(5, b"toolarge").unwrap_err(),
MuxError::PayloadTooLarge
);
}
#[test]
fn test_demux_frame_too_short() {
let mut mux = basic_mux();
assert_eq!(
mux.demux_frame(&[1u8; 3]).unwrap_err(),
MuxError::FrameTooShort
);
}
#[test]
fn test_demux_frame_length_mismatch() {
let mut mux = basic_mux();
let mut bad = vec![1u8, 0, 0, 0, 100];
bad.extend_from_slice(&[0u8; 2]);
assert_eq!(
mux.demux_frame(&bad).unwrap_err(),
MuxError::FrameLengthMismatch
);
}
#[test]
fn test_mux_frame_counter() {
let mut mux = basic_mux();
mux.mux_frame(1, b"a").expect("should succeed in test");
mux.mux_frame(2, b"b").expect("should succeed in test");
assert_eq!(mux.frames_muxed(), 2);
}
#[test]
fn test_demux_frame_counter() {
let mut mux = basic_mux();
let frame = mux.mux_frame(1, b"test").expect("should succeed in test");
mux.demux_frame(&frame).expect("should succeed in test");
assert_eq!(mux.frames_demuxed(), 1);
}
}