#![allow(dead_code)]
#![allow(clippy::doc_markdown)]
#![allow(clippy::similar_names)]
#![allow(clippy::unreadable_literal)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_lossless)]
#![allow(clippy::cast_sign_loss)]
#![allow(clippy::match_same_arms)]
#![allow(clippy::many_single_char_names)]
#![allow(clippy::unnecessary_wraps)]
#![allow(clippy::range_plus_one)]
#![allow(clippy::needless_pass_by_value)]
#![allow(clippy::manual_div_ceil)]
#![allow(clippy::comparison_chain)]
#![allow(clippy::unused_self)]
#![allow(clippy::trivially_copy_pass_by_ref)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::too_many_arguments)]
#![allow(clippy::struct_excessive_bools)]
#![allow(clippy::needless_range_loop)]
#![allow(clippy::redundant_closure_for_method_calls)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::should_implement_trait)]
#![allow(clippy::items_after_statements)]
#![allow(clippy::if_not_else)]
#![allow(clippy::format_push_string)]
#![allow(clippy::single_match_else)]
#![allow(clippy::redundant_slicing)]
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::map_unwrap_or)]
#![allow(clippy::derivable_impls)]
#![allow(clippy::assigning_clones)]
#![allow(clippy::if_same_then_else)]
#![allow(clippy::format_collect)]
#![allow(clippy::useless_conversion)]
#![allow(clippy::unused_async)]
#![allow(clippy::identity_op)]
use crate::error::{NetError, NetResult};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::collections::HashMap;
/// Chunk payload size, in bytes, that every codec in this file starts out with.
pub const DEFAULT_CHUNK_SIZE: u32 = 128;
/// Upper bound, in bytes, applied by the chunk-size setters in this file; larger
/// requests are reduced to this value.
pub const MAX_CHUNK_SIZE: u32 = 65536;
/// The four message-header layouts a chunk can use, identified on the wire by the
/// two-bit `fmt` value stored in the top bits of the first basic-header byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkHeaderType {
    /// `fmt` 0: timestamp, message length, message type and message stream id.
    Full = 0,
    /// `fmt` 1: timestamp, message length and message type.
    SameStream = 1,
    /// `fmt` 2: timestamp only.
    TimestampOnly = 2,
    /// `fmt` 3: no message header at all.
    Continuation = 3,
}

impl ChunkHeaderType {
    /// Message-header length in bytes, indexed by `fmt` value.
    const HEADER_SIZES: [usize; 4] = [11, 7, 3, 0];

    /// Returns the two-bit `fmt` value for this layout.
    #[must_use]
    pub const fn format_value(&self) -> u8 {
        *self as u8
    }

    /// Maps a `fmt` value back to its layout; anything outside `0..=3` yields `None`.
    #[must_use]
    pub const fn from_format(fmt: u8) -> Option<Self> {
        let kind = match fmt {
            0 => Self::Full,
            1 => Self::SameStream,
            2 => Self::TimestampOnly,
            3 => Self::Continuation,
            _ => return None,
        };
        Some(kind)
    }

    /// Length in bytes of the message header for this layout, excluding the basic
    /// header and any extended timestamp.
    #[must_use]
    pub const fn header_size(&self) -> usize {
        Self::HEADER_SIZES[*self as usize]
    }
}
/// The per-message fields carried by a chunk's message header.
#[derive(Debug, Clone, Copy, Default)]
pub struct MessageHeader {
    /// Message timestamp, stored as a full 32-bit value.
    pub timestamp: u32,
    /// Length of the message payload in bytes.
    pub message_length: u32,
    /// Message type id.
    pub message_type: u8,
    /// Message stream id.
    pub message_stream_id: u32,
}

impl MessageHeader {
    /// Smallest timestamp that no longer fits the 24-bit header field.
    const EXTENDED_TIMESTAMP_THRESHOLD: u32 = 0x00FF_FFFF;

    /// Builds a header from its four fields.
    #[must_use]
    pub const fn new(
        timestamp: u32,
        message_length: u32,
        message_type: u8,
        message_stream_id: u32,
    ) -> Self {
        Self {
            message_stream_id,
            message_type,
            message_length,
            timestamp,
        }
    }

    /// Returns `true` when the timestamp is `0x00FF_FFFF` or larger, i.e. when it has
    /// to be carried in the separate 4-byte extended timestamp field.
    #[must_use]
    pub const fn needs_extended_timestamp(&self) -> bool {
        Self::EXTENDED_TIMESTAMP_THRESHOLD <= self.timestamp
    }
}
#[derive(Debug, Clone)]
pub struct ChunkHeader {
pub format: ChunkHeaderType,
pub chunk_stream_id: u32,
pub message: MessageHeader,
pub extended_timestamp: Option<u32>,
}
impl ChunkHeader {
#[must_use]
pub fn new(format: ChunkHeaderType, chunk_stream_id: u32, message: MessageHeader) -> Self {
let extended_timestamp = if message.needs_extended_timestamp() {
Some(message.timestamp)
} else {
None
};
Self {
format,
chunk_stream_id,
message,
extended_timestamp,
}
}
#[must_use]
pub const fn timestamp(&self) -> u32 {
match self.extended_timestamp {
Some(ts) => ts,
None => self.message.timestamp,
}
}
pub fn encode_basic_header(&self, buf: &mut BytesMut) {
let fmt = self.format.format_value();
let csid = self.chunk_stream_id;
if csid < 64 {
buf.put_u8((fmt << 6) | (csid as u8));
} else if csid < 320 {
buf.put_u8(fmt << 6);
buf.put_u8((csid - 64) as u8);
} else {
buf.put_u8((fmt << 6) | 1);
let adjusted = csid - 64;
buf.put_u8((adjusted & 0xFF) as u8);
buf.put_u8(((adjusted >> 8) & 0xFF) as u8);
}
}
pub fn encode(&self, buf: &mut BytesMut) {
self.encode_basic_header(buf);
let timestamp = if self.message.needs_extended_timestamp() {
0x00FF_FFFF
} else {
self.message.timestamp
};
match self.format {
ChunkHeaderType::Full => {
buf.put_u8(((timestamp >> 16) & 0xFF) as u8);
buf.put_u8(((timestamp >> 8) & 0xFF) as u8);
buf.put_u8((timestamp & 0xFF) as u8);
buf.put_u8(((self.message.message_length >> 16) & 0xFF) as u8);
buf.put_u8(((self.message.message_length >> 8) & 0xFF) as u8);
buf.put_u8((self.message.message_length & 0xFF) as u8);
buf.put_u8(self.message.message_type);
buf.put_u32_le(self.message.message_stream_id);
}
ChunkHeaderType::SameStream => {
buf.put_u8(((timestamp >> 16) & 0xFF) as u8);
buf.put_u8(((timestamp >> 8) & 0xFF) as u8);
buf.put_u8((timestamp & 0xFF) as u8);
buf.put_u8(((self.message.message_length >> 16) & 0xFF) as u8);
buf.put_u8(((self.message.message_length >> 8) & 0xFF) as u8);
buf.put_u8((self.message.message_length & 0xFF) as u8);
buf.put_u8(self.message.message_type);
}
ChunkHeaderType::TimestampOnly => {
buf.put_u8(((timestamp >> 16) & 0xFF) as u8);
buf.put_u8(((timestamp >> 8) & 0xFF) as u8);
buf.put_u8((timestamp & 0xFF) as u8);
}
ChunkHeaderType::Continuation => {
}
}
if let Some(ext) = self.extended_timestamp {
buf.put_u32(ext);
}
}
pub fn decode(buf: &mut &[u8], prev_headers: &HashMap<u32, ChunkHeader>) -> NetResult<Self> {
if buf.is_empty() {
return Err(NetError::parse(0, "Empty chunk header"));
}
let first_byte = buf.get_u8();
let fmt = first_byte >> 6;
let csid_marker = first_byte & 0x3F;
let chunk_stream_id = match csid_marker {
0 => {
if buf.is_empty() {
return Err(NetError::parse(0, "Incomplete chunk stream ID"));
}
64 + u32::from(buf.get_u8())
}
1 => {
if buf.len() < 2 {
return Err(NetError::parse(0, "Incomplete chunk stream ID"));
}
let b0 = buf.get_u8();
let b1 = buf.get_u8();
64 + u32::from(b0) + (u32::from(b1) << 8)
}
_ => u32::from(csid_marker),
};
let format = ChunkHeaderType::from_format(fmt)
.ok_or_else(|| NetError::parse(0, "Invalid chunk format"))?;
let prev = prev_headers.get(&chunk_stream_id);
let mut message = prev.map(|h| h.message).unwrap_or_default();
#[allow(unused_assignments)]
let mut timestamp_field = 0u32;
match format {
ChunkHeaderType::Full => {
if buf.len() < 11 {
return Err(NetError::parse(0, "Incomplete type 0 header"));
}
timestamp_field = (u32::from(buf.get_u8()) << 16)
| (u32::from(buf.get_u8()) << 8)
| u32::from(buf.get_u8());
message.message_length = (u32::from(buf.get_u8()) << 16)
| (u32::from(buf.get_u8()) << 8)
| u32::from(buf.get_u8());
message.message_type = buf.get_u8();
message.message_stream_id = buf.get_u32_le();
message.timestamp = timestamp_field;
}
ChunkHeaderType::SameStream => {
if buf.len() < 7 {
return Err(NetError::parse(0, "Incomplete type 1 header"));
}
timestamp_field = (u32::from(buf.get_u8()) << 16)
| (u32::from(buf.get_u8()) << 8)
| u32::from(buf.get_u8());
message.message_length = (u32::from(buf.get_u8()) << 16)
| (u32::from(buf.get_u8()) << 8)
| u32::from(buf.get_u8());
message.message_type = buf.get_u8();
message.timestamp = timestamp_field;
}
ChunkHeaderType::TimestampOnly => {
if buf.len() < 3 {
return Err(NetError::parse(0, "Incomplete type 2 header"));
}
timestamp_field = (u32::from(buf.get_u8()) << 16)
| (u32::from(buf.get_u8()) << 8)
| u32::from(buf.get_u8());
message.timestamp = timestamp_field;
}
ChunkHeaderType::Continuation => {
timestamp_field = prev.map(|h| h.message.timestamp).unwrap_or(0);
}
}
let extended_timestamp = if timestamp_field == 0x00FF_FFFF {
if buf.len() < 4 {
return Err(NetError::parse(0, "Incomplete extended timestamp"));
}
let ext = buf.get_u32();
message.timestamp = ext;
Some(ext)
} else {
None
};
Ok(Self {
format,
chunk_stream_id,
message,
extended_timestamp,
})
}
}
/// Reassembly state kept for one chunk stream id.
#[derive(Debug)]
struct ChunkStreamState {
    /// Most recent header recorded for this chunk stream, if any.
    last_header: Option<ChunkHeader>,
    /// Payload bytes collected so far for the message in progress.
    message_buffer: BytesMut,
    /// Declared length of the message in progress; `0` when idle.
    message_length: u32,
    /// Number of payload bytes collected so far.
    bytes_received: u32,
}

impl ChunkStreamState {
    /// Creates an idle state with no header and an empty buffer.
    fn new() -> Self {
        Self {
            bytes_received: 0,
            message_length: 0,
            message_buffer: BytesMut::new(),
            last_header: None,
        }
    }

    /// Drops any partially collected message. `last_header` is left untouched.
    fn reset(&mut self) {
        self.bytes_received = 0;
        self.message_length = 0;
        self.message_buffer.clear();
    }
}
/// A complete message reassembled from one or more chunks.
#[derive(Debug, Clone)]
pub struct AssembledMessage {
/// Chunk stream id the message arrived on.
pub chunk_stream_id: u32,
/// Message header taken from the chunk that completed the message.
pub header: MessageHeader,
/// The concatenated chunk payloads.
pub payload: Bytes,
}
#[derive(Debug)]
pub struct ChunkStream {
rx_chunk_size: u32,
tx_chunk_size: u32,
streams: HashMap<u32, ChunkStreamState>,
prev_headers: HashMap<u32, ChunkHeader>,
}
impl ChunkStream {
#[must_use]
pub fn new() -> Self {
Self {
rx_chunk_size: DEFAULT_CHUNK_SIZE,
tx_chunk_size: DEFAULT_CHUNK_SIZE,
streams: HashMap::new(),
prev_headers: HashMap::new(),
}
}
pub fn set_rx_chunk_size(&mut self, size: u32) {
self.rx_chunk_size = size.min(MAX_CHUNK_SIZE);
}
pub fn set_tx_chunk_size(&mut self, size: u32) {
self.tx_chunk_size = size.min(MAX_CHUNK_SIZE);
}
#[must_use]
pub const fn rx_chunk_size(&self) -> u32 {
self.rx_chunk_size
}
#[must_use]
pub const fn tx_chunk_size(&self) -> u32 {
self.tx_chunk_size
}
pub fn process_chunk(&mut self, data: &[u8]) -> NetResult<Vec<AssembledMessage>> {
let mut messages = Vec::new();
let mut buf = data;
while !buf.is_empty() {
let header = ChunkHeader::decode(&mut buf, &self.prev_headers)?;
let csid = header.chunk_stream_id;
let state = self
.streams
.entry(csid)
.or_insert_with(ChunkStreamState::new);
if !matches!(header.format, ChunkHeaderType::Continuation) || state.message_length == 0
{
state.reset();
state.message_length = header.message.message_length;
state.message_buffer.reserve(state.message_length as usize);
}
let remaining = state.message_length - state.bytes_received;
let chunk_size = remaining.min(self.rx_chunk_size);
if buf.len() < chunk_size as usize {
return Err(NetError::parse(0, "Incomplete chunk payload"));
}
state.message_buffer.put_slice(&buf[..chunk_size as usize]);
state.bytes_received += chunk_size;
buf = &buf[chunk_size as usize..];
if state.bytes_received >= state.message_length {
messages.push(AssembledMessage {
chunk_stream_id: csid,
header: header.message,
payload: state.message_buffer.split().freeze(),
});
state.reset();
}
state.last_header = Some(header.clone());
self.prev_headers.insert(csid, header);
}
Ok(messages)
}
#[must_use]
pub fn encode_message(
&mut self,
chunk_stream_id: u32,
message: &MessageHeader,
payload: &[u8],
) -> Bytes {
let mut buf = BytesMut::new();
let format = match self.prev_headers.get(&chunk_stream_id) {
None => ChunkHeaderType::Full,
Some(prev) => {
if prev.message.message_stream_id != message.message_stream_id {
ChunkHeaderType::Full
} else if prev.message.message_length != message.message_length
|| prev.message.message_type != message.message_type
{
ChunkHeaderType::SameStream
} else if prev.message.timestamp != message.timestamp {
ChunkHeaderType::TimestampOnly
} else {
ChunkHeaderType::Continuation
}
}
};
let header = ChunkHeader::new(format, chunk_stream_id, *message);
header.encode(&mut buf);
let chunk_size = self.tx_chunk_size as usize;
let first_chunk_size = chunk_size.min(payload.len());
buf.put_slice(&payload[..first_chunk_size]);
let mut offset = first_chunk_size;
while offset < payload.len() {
let cont_header =
ChunkHeader::new(ChunkHeaderType::Continuation, chunk_stream_id, *message);
cont_header.encode_basic_header(&mut buf);
if message.needs_extended_timestamp() {
buf.put_u32(message.timestamp);
}
let chunk_end = (offset + chunk_size).min(payload.len());
buf.put_slice(&payload[offset..chunk_end]);
offset = chunk_end;
}
self.prev_headers.insert(chunk_stream_id, header);
buf.freeze()
}
}
impl Default for ChunkStream {
fn default() -> Self {
Self::new()
}
}
/// The subset of AMF0 values this file can encode.
#[derive(Debug, Clone, PartialEq)]
pub enum Amf0Value {
/// Encoded with marker `0x00` followed by the `f64`.
Number(f64),
/// Encoded with marker `0x01` followed by one byte.
Boolean(bool),
/// Encoded with marker `0x02` (16-bit length) or `0x0C` (32-bit length).
String(String),
/// Ordered key/value pairs, encoded with marker `0x03`.
Object(Vec<(String, Amf0Value)>),
/// Encoded as the single marker byte `0x05`.
Null,
/// Ordered key/value pairs, encoded with marker `0x08` and a 32-bit pair count.
Array(Vec<(String, Amf0Value)>),
}
impl Amf0Value {
    /// Appends the AMF0 encoding of this value to `buf`.
    ///
    /// Strings up to `0xFFFF` bytes use the short form (marker `0x02`), longer ones
    /// the long form (marker `0x0C`). Property names always use a 16-bit length, so
    /// a name longer than `0xFFFF` bytes has its length field truncated.
    pub fn encode_into(&self, buf: &mut BytesMut) {
        match self {
            Self::Null => buf.put_u8(0x05),
            Self::Boolean(flag) => {
                buf.put_u8(0x01);
                buf.put_u8(u8::from(*flag));
            }
            Self::Number(value) => {
                buf.put_u8(0x00);
                buf.put_f64(*value);
            }
            Self::String(text) => {
                let raw = text.as_bytes();
                match u16::try_from(raw.len()) {
                    Ok(short_len) => {
                        buf.put_u8(0x02);
                        buf.put_u16(short_len);
                    }
                    Err(_) => {
                        buf.put_u8(0x0C);
                        buf.put_u32(raw.len() as u32);
                    }
                }
                buf.put_slice(raw);
            }
            Self::Object(members) => {
                buf.put_u8(0x03);
                Self::put_members(members, buf);
            }
            Self::Array(members) => {
                buf.put_u8(0x08);
                buf.put_u32(members.len() as u32);
                Self::put_members(members, buf);
            }
        }
    }

    /// Encodes this value into a fresh buffer.
    #[must_use]
    pub fn encode(&self) -> Bytes {
        let mut out = BytesMut::new();
        self.encode_into(&mut out);
        out.freeze()
    }

    /// Writes each `(name, value)` pair followed by the end marker: an empty name
    /// and the byte `0x09`.
    fn put_members(members: &[(String, Self)], buf: &mut BytesMut) {
        for (name, value) in members {
            let raw = name.as_bytes();
            buf.put_u16(raw.len() as u16);
            buf.put_slice(raw);
            value.encode_into(buf);
        }
        buf.put_u16(0);
        buf.put_u8(0x09);
    }
}
/// Streaming chunk decoder: accepts arbitrary byte slices, buffers incomplete
/// chunks and yields messages as they complete.
#[derive(Debug)]
pub struct ChunkDecoder {
    /// Bytes received but not yet consumed as whole chunks.
    buffer: BytesMut,
    /// Per-chunk-stream reassembly state.
    streams: HashMap<u32, ChunkStreamState>,
    /// Last header decoded on each chunk stream.
    prev_headers: HashMap<u32, ChunkHeader>,
    /// Chunk size currently expected from the peer; always at least 1.
    chunk_size: u32,
}

impl ChunkDecoder {
    /// Creates a decoder expecting [`DEFAULT_CHUNK_SIZE`]-byte chunks.
    #[must_use]
    pub fn new() -> Self {
        Self {
            buffer: BytesMut::new(),
            streams: HashMap::new(),
            prev_headers: HashMap::new(),
            chunk_size: DEFAULT_CHUNK_SIZE,
        }
    }

    /// Sets the expected chunk size, clamped to `1..=MAX_CHUNK_SIZE`.
    ///
    /// Zero is raised to 1, matching the in-band handling in [`ChunkDecoder::decode`]
    /// which ignores a zero size: with a zero chunk size no payload byte would ever
    /// be consumed and no message could complete.
    pub fn set_chunk_size(&mut self, size: u32) {
        self.chunk_size = size.clamp(1, MAX_CHUNK_SIZE);
    }

    /// Chunk size currently expected from the peer.
    #[must_use]
    pub const fn chunk_size(&self) -> u32 {
        self.chunk_size
    }

    /// Appends `data` to the internal buffer and returns every message that can now
    /// be completed. Incomplete trailing data stays buffered for the next call.
    ///
    /// A completed message of type `1` with at least four payload bytes updates the
    /// expected chunk size before the following chunks are parsed.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`ChunkHeader::decode`].
    pub fn decode(&mut self, data: &[u8]) -> NetResult<Vec<AssembledMessage>> {
        self.buffer.put_slice(data);
        let mut messages = Vec::new();
        loop {
            let buffered_before = self.buffer.len();
            if let Some(msg) = self.try_decode_one()? {
                self.apply_set_chunk_size(&msg);
                messages.push(msg);
            }
            // No bytes consumed means the next chunk is still incomplete.
            if self.buffer.len() >= buffered_before {
                break;
            }
        }
        Ok(messages)
    }

    /// Applies an in-band chunk-size change carried by `msg`, if it is one.
    fn apply_set_chunk_size(&mut self, msg: &AssembledMessage) {
        if msg.header.message_type != 1 {
            return;
        }
        if let Some(raw) = msg.payload.get(..4) {
            // The top bit is masked off before the value is used.
            let new_size = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]) & 0x7FFF_FFFF;
            if new_size > 0 {
                // NOTE(review): a peer announcing more than MAX_CHUNK_SIZE will still
                // send chunks of the size it announced; clamping here then mis-frames
                // the stream. Confirm whether this should be an error instead.
                self.chunk_size = new_size.min(MAX_CHUNK_SIZE);
            }
        }
    }

    /// Tries to consume exactly one chunk from the front of the buffer.
    ///
    /// Returns `Ok(None)` both when more data is needed (nothing consumed) and when
    /// a chunk was consumed without completing its message.
    fn try_decode_one(&mut self) -> NetResult<Option<AssembledMessage>> {
        if self.buffer.is_empty() {
            return Ok(None);
        }
        let first = self.buffer[0];
        let basic_header_size: usize = match first & 0x3F {
            0 => 2,
            1 => 3,
            _ => 1,
        };
        let msg_header_size: usize = match first >> 6 {
            0 => 11,
            1 => 7,
            2 => 3,
            _ => 0,
        };
        let min_needed = basic_header_size + msg_header_size;
        if self.buffer.len() < min_needed {
            return Ok(None);
        }

        let needs_ext_ts = if msg_header_size >= 3 {
            let ts = &self.buffer[basic_header_size..basic_header_size + 3];
            u32::from_be_bytes([0, ts[0], ts[1], ts[2]]) == 0x00FF_FFFF
        } else {
            // A continuation chunk repeats the extended timestamp iff the previous
            // header on the same chunk stream carried one.
            let csid = self.compute_csid(&self.buffer);
            self.prev_headers
                .get(&csid)
                .and_then(|h| h.extended_timestamp)
                .is_some()
        };
        let total_header_size = min_needed + if needs_ext_ts { 4 } else { 0 };
        if self.buffer.len() < total_header_size {
            return Ok(None);
        }

        // Decode straight from the buffered header bytes; copying the whole buffer
        // for every chunk would make large reads quadratic.
        let mut header_bytes: &[u8] = &self.buffer[..total_header_size];
        let mut header = ChunkHeader::decode(&mut header_bytes, &self.prev_headers)?;
        if needs_ext_ts && header.extended_timestamp.is_none() {
            // Record the extended timestamp of a continuation chunk so the chunks
            // after it are still framed with the extra four bytes.
            let raw = &self.buffer[min_needed..total_header_size];
            let ext = u32::from_be_bytes([raw[0], raw[1], raw[2], raw[3]]);
            header.extended_timestamp = Some(ext);
            header.message.timestamp = ext;
        }

        let csid = header.chunk_stream_id;
        let state = self
            .streams
            .entry(csid)
            .or_insert_with(ChunkStreamState::new);
        if !matches!(header.format, ChunkHeaderType::Continuation) || state.message_length == 0 {
            state.reset();
            state.message_length = header.message.message_length;
        }
        let remaining = state.message_length.saturating_sub(state.bytes_received);
        let chunk_payload_size = remaining.min(self.chunk_size) as usize;
        if self.buffer.len() < total_header_size + chunk_payload_size {
            // Nothing is consumed; the same header is parsed again on the next call.
            return Ok(None);
        }

        drop(self.buffer.split_to(total_header_size));
        let payload = self.buffer.split_to(chunk_payload_size);
        state.message_buffer.put_slice(&payload);
        // `chunk_payload_size` was derived from a `u32`, so the cast is lossless.
        state.bytes_received += chunk_payload_size as u32;
        let result = if state.bytes_received >= state.message_length {
            let assembled = AssembledMessage {
                chunk_stream_id: csid,
                header: header.message,
                payload: state.message_buffer.split().freeze(),
            };
            state.reset();
            Some(assembled)
        } else {
            None
        };
        self.prev_headers.insert(csid, header);
        Ok(result)
    }

    /// Reads the chunk stream id from a basic header without consuming it.
    /// Returns `0` when `buf` is too short to hold the id.
    fn compute_csid(&self, buf: &[u8]) -> u32 {
        if buf.is_empty() {
            return 0;
        }
        let marker = buf[0] & 0x3F;
        match marker {
            0 => {
                if buf.len() >= 2 {
                    64 + u32::from(buf[1])
                } else {
                    0
                }
            }
            1 => {
                if buf.len() >= 3 {
                    64 + u32::from(buf[1]) + (u32::from(buf[2]) << 8)
                } else {
                    0
                }
            }
            _ => u32::from(marker),
        }
    }
}

impl Default for ChunkDecoder {
    /// Equivalent to [`ChunkDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[derive(Debug)]
pub struct ChunkEncoder {
chunk_size: u32,
prev_headers: HashMap<u32, ChunkHeader>,
}
impl ChunkEncoder {
#[must_use]
pub fn new() -> Self {
Self {
chunk_size: DEFAULT_CHUNK_SIZE,
prev_headers: HashMap::new(),
}
}
#[must_use]
pub fn with_chunk_size(chunk_size: u32) -> Self {
Self {
chunk_size: chunk_size.min(MAX_CHUNK_SIZE),
prev_headers: HashMap::new(),
}
}
#[must_use]
pub const fn chunk_size(&self) -> u32 {
self.chunk_size
}
pub fn set_chunk_size(&mut self, size: u32) {
self.chunk_size = size.min(MAX_CHUNK_SIZE);
}
pub fn encode(
&mut self,
chunk_stream_id: u32,
message: &MessageHeader,
payload: &[u8],
) -> Bytes {
let mut buf = BytesMut::new();
let format = match self.prev_headers.get(&chunk_stream_id) {
None => ChunkHeaderType::Full,
Some(prev) => {
if prev.message.message_stream_id != message.message_stream_id {
ChunkHeaderType::Full
} else if prev.message.message_length != message.message_length
|| prev.message.message_type != message.message_type
{
ChunkHeaderType::SameStream
} else if prev.message.timestamp != message.timestamp {
ChunkHeaderType::TimestampOnly
} else {
ChunkHeaderType::Continuation
}
}
};
let header = ChunkHeader::new(format, chunk_stream_id, *message);
header.encode(&mut buf);
let chunk_size = self.chunk_size as usize;
let first_end = chunk_size.min(payload.len());
buf.put_slice(&payload[..first_end]);
let mut offset = first_end;
while offset < payload.len() {
let cont = ChunkHeader::new(ChunkHeaderType::Continuation, chunk_stream_id, *message);
cont.encode_basic_header(&mut buf);
if message.needs_extended_timestamp() {
buf.put_u32(message.timestamp);
}
let end = (offset + chunk_size).min(payload.len());
buf.put_slice(&payload[offset..end]);
offset = end;
}
self.prev_headers.insert(chunk_stream_id, header);
buf.freeze()
}
pub fn encode_set_chunk_size(&mut self, new_size: u32) -> Bytes {
self.chunk_size = new_size.min(MAX_CHUNK_SIZE);
let payload_val = new_size & 0x7FFF_FFFF;
let mut payload = BytesMut::with_capacity(4);
payload.put_u32(payload_val);
let msg = MessageHeader::new(0, 4, 1, 0);
self.encode(2, &msg, &payload)
}
}
impl Default for ChunkEncoder {
fn default() -> Self {
Self::new()
}
}
// Unit tests for the chunk header types, the chunk codecs and the AMF0 encoder.
#[cfg(test)]
mod tests {
use super::*;
// Format values map to 0..=3 and out-of-range values are rejected.
#[test]
fn test_chunk_header_type() {
assert_eq!(ChunkHeaderType::Full.format_value(), 0);
assert_eq!(ChunkHeaderType::SameStream.format_value(), 1);
assert_eq!(ChunkHeaderType::TimestampOnly.format_value(), 2);
assert_eq!(ChunkHeaderType::Continuation.format_value(), 3);
assert_eq!(ChunkHeaderType::from_format(0), Some(ChunkHeaderType::Full));
assert_eq!(ChunkHeaderType::from_format(4), None);
}
// A timestamp of exactly 0x00FF_FFFF already requires the extended field.
#[test]
fn test_message_header() {
let msg = MessageHeader::new(1000, 500, 20, 1);
assert_eq!(msg.timestamp, 1000);
assert_eq!(msg.message_length, 500);
assert!(!msg.needs_extended_timestamp());
let msg2 = MessageHeader::new(0x00FF_FFFF, 100, 20, 1);
assert!(msg2.needs_extended_timestamp());
}
// A full header survives an encode/decode round trip with no previous headers.
#[test]
fn test_chunk_header_encode_decode() {
let msg = MessageHeader::new(1000, 100, 20, 1);
let header = ChunkHeader::new(ChunkHeaderType::Full, 3, msg);
let mut buf = BytesMut::new();
header.encode(&mut buf);
let encoded = buf.freeze();
let mut slice = &encoded[..];
let prev = HashMap::new();
let decoded = ChunkHeader::decode(&mut slice, &prev).expect("should succeed in test");
assert_eq!(decoded.chunk_stream_id, 3);
assert_eq!(decoded.message.timestamp, 1000);
assert_eq!(decoded.message.message_length, 100);
assert_eq!(decoded.message.message_type, 20);
}
// Chunk stream id 100 uses the two-byte basic header form.
#[test]
fn test_chunk_header_large_csid() {
let msg = MessageHeader::new(0, 100, 1, 0);
let header = ChunkHeader::new(ChunkHeaderType::Full, 100, msg);
let mut buf = BytesMut::new();
header.encode(&mut buf);
let encoded = buf.freeze();
let mut slice = &encoded[..];
let prev = HashMap::new();
let decoded = ChunkHeader::decode(&mut slice, &prev).expect("should succeed in test");
assert_eq!(decoded.chunk_stream_id, 100);
}
// A payload smaller than the chunk size fits in a single chunk.
#[test]
fn test_chunk_stream_encode_decode() {
let mut cs = ChunkStream::new();
cs.set_tx_chunk_size(128);
cs.set_rx_chunk_size(128);
let msg = MessageHeader::new(0, 50, 20, 1);
let payload = vec![1u8; 50];
let encoded = cs.encode_message(3, &msg, &payload);
let messages = cs.process_chunk(&encoded).expect("should succeed in test");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload.len(), 50);
}
// A 300-byte payload at chunk size 128 spans three chunks and is reassembled intact.
#[test]
fn test_chunk_stream_large_message() {
let mut cs = ChunkStream::new();
cs.set_tx_chunk_size(128);
cs.set_rx_chunk_size(128);
let msg = MessageHeader::new(0, 300, 20, 1);
let payload = vec![0xAB; 300];
let encoded = cs.encode_message(3, &msg, &payload);
let messages = cs.process_chunk(&encoded).expect("should succeed in test");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload.len(), 300);
assert_eq!(&messages[0].payload[..], &payload[..]);
}
// Timestamps above the 24-bit range populate `extended_timestamp`.
#[test]
fn test_extended_timestamp() {
let msg = MessageHeader::new(0x01000000, 10, 1, 0);
let header = ChunkHeader::new(ChunkHeaderType::Full, 3, msg);
assert!(header.extended_timestamp.is_some());
assert_eq!(header.timestamp(), 0x01000000);
}
// Encoder output for a single-chunk message decodes back to the same payload.
#[test]
fn test_chunk_decoder_simple_message() {
let mut encoder = ChunkEncoder::with_chunk_size(128);
let msg = MessageHeader::new(100, 32, 20, 1);
let payload = vec![0xBB; 32];
let encoded = encoder.encode(3, &msg, &payload);
let mut decoder = ChunkDecoder::new();
let messages = decoder.decode(&encoded).expect("should succeed in test");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload.len(), 32);
assert_eq!(&messages[0].payload[..], &payload[..]);
}
// A 512-byte message split into four 128-byte chunks is reassembled byte for byte.
#[test]
fn test_chunk_decoder_large_message() {
let mut encoder = ChunkEncoder::with_chunk_size(128);
let msg = MessageHeader::new(0, 512, 9, 1);
let payload = (0u8..=255u8).cycle().take(512).collect::<Vec<_>>();
let encoded = encoder.encode(4, &msg, &payload);
let mut decoder = ChunkDecoder::new();
let messages = decoder.decode(&encoded).expect("should succeed in test");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload.len(), 512);
assert_eq!(&messages[0].payload[..], &payload[..]);
}
// Feeding the encoded bytes in 10-byte slices still yields exactly one message.
#[test]
fn test_chunk_decoder_incremental_feed() {
let mut encoder = ChunkEncoder::with_chunk_size(64);
let msg = MessageHeader::new(500, 128, 8, 1);
let payload = vec![0xCC; 128];
let encoded = encoder.encode(3, &msg, &payload);
let mut decoder = ChunkDecoder::new();
decoder.set_chunk_size(64);
let mut all_messages = Vec::new();
for chunk in encoded.chunks(10) {
let msgs = decoder.decode(chunk).expect("should succeed in test");
all_messages.extend(msgs);
}
assert_eq!(all_messages.len(), 1);
assert_eq!(&all_messages[0].payload[..], &payload[..]);
}
// Two messages on different chunk streams in one buffer are both returned.
#[test]
fn test_chunk_decoder_multiple_messages() {
let mut encoder = ChunkEncoder::with_chunk_size(128);
let msg1 = MessageHeader::new(0, 20, 8, 1);
let payload1 = vec![0x11; 20];
let msg2 = MessageHeader::new(100, 20, 9, 1);
let payload2 = vec![0x22; 20];
let mut combined = Vec::new();
combined.extend_from_slice(&encoder.encode(3, &msg1, &payload1));
combined.extend_from_slice(&encoder.encode(4, &msg2, &payload2));
let mut decoder = ChunkDecoder::new();
let messages = decoder.decode(&combined).expect("should succeed in test");
assert_eq!(messages.len(), 2);
}
// The decoder's chunk size setter caps values at MAX_CHUNK_SIZE.
#[test]
fn test_chunk_decoder_set_chunk_size() {
let mut decoder = ChunkDecoder::new();
assert_eq!(decoder.chunk_size(), DEFAULT_CHUNK_SIZE);
decoder.set_chunk_size(4096);
assert_eq!(decoder.chunk_size(), 4096);
decoder.set_chunk_size(MAX_CHUNK_SIZE + 1000);
assert_eq!(decoder.chunk_size(), MAX_CHUNK_SIZE);
}
// Decoding an empty slice succeeds and produces no messages.
#[test]
fn test_chunk_decoder_empty_input() {
let mut decoder = ChunkDecoder::new();
let messages = decoder.decode(&[]).expect("should succeed in test");
assert!(messages.is_empty());
}
// A new encoder starts at DEFAULT_CHUNK_SIZE.
#[test]
fn test_chunk_encoder_default_chunk_size() {
let enc = ChunkEncoder::new();
assert_eq!(enc.chunk_size(), DEFAULT_CHUNK_SIZE);
}
// `with_chunk_size` stores an in-range size unchanged.
#[test]
fn test_chunk_encoder_with_chunk_size() {
let enc = ChunkEncoder::with_chunk_size(4096);
assert_eq!(enc.chunk_size(), 4096);
}
// The encoder's chunk size setter caps values at MAX_CHUNK_SIZE.
#[test]
fn test_chunk_encoder_set_chunk_size_capped() {
let mut enc = ChunkEncoder::new();
enc.set_chunk_size(MAX_CHUNK_SIZE + 1);
assert_eq!(enc.chunk_size(), MAX_CHUNK_SIZE);
}
// Encoder and decoder agree when both use a chunk size of 256.
#[test]
fn test_chunk_encoder_encode_small_payload() {
let mut enc = ChunkEncoder::with_chunk_size(256);
let msg = MessageHeader::new(0, 10, 20, 1);
let payload = vec![0xAA; 10];
let encoded = enc.encode(3, &msg, &payload);
let mut dec = ChunkDecoder::new();
dec.set_chunk_size(256);
let messages = dec.decode(&encoded).expect("should succeed in test");
assert_eq!(messages.len(), 1);
assert_eq!(&messages[0].payload[..], &payload[..]);
}
// A second message differing only in timestamp gets a shorter header than the first.
#[test]
fn test_chunk_encoder_header_compression() {
let mut enc = ChunkEncoder::with_chunk_size(512);
let msg1 = MessageHeader::new(0, 10, 8, 1);
let msg2 = MessageHeader::new(33, 10, 8, 1);
let payload = vec![0u8; 10];
let enc1 = enc.encode(3, &msg1, &payload);
let enc2 = enc.encode(3, &msg2, &payload);
assert!(enc2.len() < enc1.len());
}
// Emitting the chunk-size control message also updates the encoder's own size.
#[test]
fn test_chunk_encoder_encode_set_chunk_size() {
let mut enc = ChunkEncoder::new();
let encoded = enc.encode_set_chunk_size(4096);
assert!(!encoded.is_empty());
assert_eq!(enc.chunk_size(), 4096);
}
// A number is one marker byte plus eight bytes of f64.
#[test]
fn test_amf0_number_encoding() {
let val = Amf0Value::Number(42.5);
let encoded = val.encode();
assert_eq!(encoded[0], 0x00); assert_eq!(encoded.len(), 9); }
// Booleans are marker 0x01 followed by 1 or 0.
#[test]
fn test_amf0_boolean_encoding() {
let val_true = Amf0Value::Boolean(true);
let encoded_true = val_true.encode();
assert_eq!(encoded_true[0], 0x01);
assert_eq!(encoded_true[1], 1);
let val_false = Amf0Value::Boolean(false);
let encoded_false = val_false.encode();
assert_eq!(encoded_false[1], 0);
}
// Short strings are marker 0x02, a big-endian 16-bit length, then the bytes.
#[test]
fn test_amf0_string_encoding() {
let val = Amf0Value::String("hello".to_string());
let encoded = val.encode();
assert_eq!(encoded[0], 0x02); assert_eq!(encoded[1], 0); assert_eq!(encoded[2], 5); assert_eq!(&encoded[3..], b"hello");
}
// Null is the single marker byte 0x05.
#[test]
fn test_amf0_null_encoding() {
let val = Amf0Value::Null;
let encoded = val.encode();
assert_eq!(encoded[0], 0x05);
assert_eq!(encoded.len(), 1);
}
// Objects start with marker 0x03 and finish with the end marker byte 0x09.
#[test]
fn test_amf0_object_encoding() {
let props = vec![
("width".to_string(), Amf0Value::Number(1920.0)),
("height".to_string(), Amf0Value::Number(1080.0)),
];
let val = Amf0Value::Object(props);
let encoded = val.encode();
assert_eq!(encoded[0], 0x03); assert_eq!(*encoded.last().expect("should succeed in test"), 0x09);
}
// Arrays start with marker 0x08 followed by the big-endian 32-bit pair count.
#[test]
fn test_amf0_array_encoding() {
let props = vec![("fps".to_string(), Amf0Value::Number(30.0))];
let val = Amf0Value::Array(props);
let encoded = val.encode();
assert_eq!(encoded[0], 0x08); assert_eq!(encoded[1], 0); assert_eq!(encoded[2], 0);
assert_eq!(encoded[3], 0);
assert_eq!(encoded[4], 1); }
// `encode_into` appends to the buffer: 9 bytes for the number, 7 for "test".
#[test]
fn test_amf0_encode_into_buf() {
let mut buf = BytesMut::new();
let num = Amf0Value::Number(1.0);
let s = Amf0Value::String("test".to_string());
num.encode_into(&mut buf);
s.encode_into(&mut buf);
assert_eq!(buf.len(), 9 + 7);
}
}