use crate::chunk::Message;
use crate::error::{Error, Result};
use crate::message::MSG_AGGREGATE;
pub const SUB_HEADER_SIZE: usize = 11;
pub const BACK_POINTER_SIZE: usize = 4;
const UI24_MAX: u32 = 0x00FF_FFFF;
pub fn parse_aggregate(msg: &Message) -> Result<Vec<Message>> {
if msg.msg_type_id != MSG_AGGREGATE {
return Err(Error::InvalidChunk(format!(
"aggregate: expected message type {MSG_AGGREGATE}, got {}",
msg.msg_type_id
)));
}
let body = &msg.payload[..];
let mut pos = 0usize;
let mut subs: Vec<Message> = Vec::new();
let mut offset: Option<i64> = None;
while pos < body.len() {
if pos + SUB_HEADER_SIZE > body.len() {
return Err(Error::UnexpectedEof);
}
let tag_type = body[pos];
let data_size =
((body[pos + 1] as u32) << 16) | ((body[pos + 2] as u32) << 8) | (body[pos + 3] as u32);
let ts_lo =
((body[pos + 4] as u32) << 16) | ((body[pos + 5] as u32) << 8) | (body[pos + 6] as u32);
let ts_hi = body[pos + 7] as u32;
let wire_ts = (ts_hi << 24) | ts_lo;
let _wire_stream_id = ((body[pos + 8] as u32) << 16)
| ((body[pos + 9] as u32) << 8)
| (body[pos + 10] as u32);
pos += SUB_HEADER_SIZE;
let payload_end = pos
.checked_add(data_size as usize)
.ok_or_else(|| Error::InvalidChunk("aggregate: sub payload length overflow".into()))?;
if payload_end > body.len() {
return Err(Error::UnexpectedEof);
}
let payload = body[pos..payload_end].to_vec();
pos = payload_end;
if pos + BACK_POINTER_SIZE > body.len() {
return Err(Error::UnexpectedEof);
}
let prev_tag_size =
u32::from_be_bytes([body[pos], body[pos + 1], body[pos + 2], body[pos + 3]]);
pos += BACK_POINTER_SIZE;
let expected = (SUB_HEADER_SIZE as u32).saturating_add(data_size);
if prev_tag_size != expected {
return Err(Error::InvalidChunk(format!(
"aggregate: back pointer {prev_tag_size} != 11 + DataSize {expected} (§7.1.6 / §E.3)"
)));
}
let off = match offset {
Some(o) => o,
None => {
let o = (msg.timestamp as i64) - (wire_ts as i64);
offset = Some(o);
o
}
};
let normalized = (wire_ts as i64).wrapping_add(off) as u32;
subs.push(Message {
msg_type_id: tag_type,
msg_stream_id: msg.msg_stream_id,
timestamp: normalized,
payload,
});
}
Ok(subs)
}
pub fn build_aggregate(stream_id: u32, subs: &[Message]) -> Result<Message> {
let agg_ts = subs.first().map(|s| s.timestamp).unwrap_or(0);
let mut body: Vec<u8> = Vec::new();
for sub in subs {
if sub.payload.len() > UI24_MAX as usize {
return Err(Error::InvalidChunk(format!(
"aggregate: sub payload {} exceeds UI24 max {}",
sub.payload.len(),
UI24_MAX
)));
}
let data_size = sub.payload.len() as u32;
let inc = (SUB_HEADER_SIZE as u64) + (data_size as u64) + (BACK_POINTER_SIZE as u64);
if (body.len() as u64).saturating_add(inc) > u32::MAX as u64 {
return Err(Error::InvalidChunk(
"aggregate: cumulative body would exceed u32".into(),
));
}
body.push(sub.msg_type_id);
body.push(((data_size >> 16) & 0xFF) as u8);
body.push(((data_size >> 8) & 0xFF) as u8);
body.push((data_size & 0xFF) as u8);
let ts = sub.timestamp;
body.push(((ts >> 16) & 0xFF) as u8);
body.push(((ts >> 8) & 0xFF) as u8);
body.push((ts & 0xFF) as u8);
body.push(((ts >> 24) & 0xFF) as u8);
body.push(0);
body.push(0);
body.push(0);
body.extend_from_slice(&sub.payload);
let prev_tag_size = (SUB_HEADER_SIZE as u32).saturating_add(data_size);
body.extend_from_slice(&prev_tag_size.to_be_bytes());
}
Ok(Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: stream_id,
timestamp: agg_ts,
payload: body,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::{MSG_AUDIO, MSG_DATA_AMF0, MSG_VIDEO};
#[test]
fn round_trip_three_subs_no_offset() {
let subs = vec![
Message {
msg_type_id: MSG_VIDEO,
msg_stream_id: 999,
timestamp: 1000,
payload: vec![0x17, 0x01, 0xAA, 0xBB, 0xCC],
},
Message {
msg_type_id: MSG_AUDIO,
msg_stream_id: 999,
timestamp: 1020,
payload: vec![0xAF, 0x01, 0xDE, 0xAD, 0xBE, 0xEF],
},
Message {
msg_type_id: MSG_VIDEO,
msg_stream_id: 999,
timestamp: 1040,
payload: vec![0x27, 0x01, 0x42],
},
];
let agg = build_aggregate(7, &subs).unwrap();
assert_eq!(agg.msg_type_id, MSG_AGGREGATE);
assert_eq!(agg.msg_stream_id, 7);
assert_eq!(agg.timestamp, 1000);
let parsed = parse_aggregate(&agg).unwrap();
assert_eq!(parsed.len(), 3);
for (got, want) in parsed.iter().zip(subs.iter()) {
assert_eq!(got.msg_type_id, want.msg_type_id);
assert_eq!(got.timestamp, want.timestamp);
assert_eq!(got.payload, want.payload);
assert_eq!(got.msg_stream_id, 7);
}
}
#[test]
fn timestamp_renormalisation_shifts_every_sub() {
let mut body: Vec<u8> = Vec::new();
for (ty, ts, pld) in &[
(MSG_VIDEO, 100u32, vec![0xAAu8]),
(MSG_AUDIO, 120u32, vec![0xBBu8, 0xCC]),
] {
body.push(*ty);
body.push(0);
body.push(0);
body.push(pld.len() as u8);
body.push(((ts >> 16) & 0xFF) as u8);
body.push(((ts >> 8) & 0xFF) as u8);
body.push((ts & 0xFF) as u8);
body.push(((ts >> 24) & 0xFF) as u8);
body.push(0);
body.push(0);
body.push(0);
body.extend_from_slice(pld);
let prev = 11u32 + pld.len() as u32;
body.extend_from_slice(&prev.to_be_bytes());
}
let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: 5,
timestamp: 5000, payload: body,
};
let parsed = parse_aggregate(&agg).unwrap();
assert_eq!(parsed.len(), 2);
assert_eq!(parsed[0].timestamp, 5000);
assert_eq!(parsed[1].timestamp, 120 + 4900);
assert_eq!(parsed[0].msg_stream_id, 5);
assert_eq!(parsed[1].msg_stream_id, 5);
}
#[test]
fn empty_aggregate_round_trip() {
let agg = build_aggregate(3, &[]).unwrap();
assert!(agg.payload.is_empty());
assert_eq!(agg.timestamp, 0);
let parsed = parse_aggregate(&agg).unwrap();
assert!(parsed.is_empty());
}
#[test]
fn parse_rejects_wrong_outer_type() {
let msg = Message {
msg_type_id: MSG_VIDEO,
msg_stream_id: 1,
timestamp: 0,
payload: vec![],
};
match parse_aggregate(&msg) {
Err(Error::InvalidChunk(_)) => {}
other => panic!("expected InvalidChunk, got {other:?}"),
}
}
#[test]
fn truncated_sub_header_is_eof() {
let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: 0,
timestamp: 0,
payload: vec![0x09, 0x00, 0x00, 0x01, 0x00],
};
match parse_aggregate(&agg) {
Err(Error::UnexpectedEof) => {}
other => panic!("expected UnexpectedEof, got {other:?}"),
}
}
#[test]
fn truncated_sub_payload_is_eof() {
let mut body = vec![MSG_VIDEO, 0, 0, 10];
body.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0]); body.extend_from_slice(&[0xAA, 0xBB, 0xCC]);
let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: 0,
timestamp: 0,
payload: body,
};
match parse_aggregate(&agg) {
Err(Error::UnexpectedEof) => {}
other => panic!("expected UnexpectedEof, got {other:?}"),
}
}
#[test]
fn truncated_back_pointer_is_eof() {
let mut body = vec![MSG_VIDEO, 0, 0, 1];
body.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0]); body.push(0xAA); body.extend_from_slice(&[0, 0]); let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: 0,
timestamp: 0,
payload: body,
};
match parse_aggregate(&agg) {
Err(Error::UnexpectedEof) => {}
other => panic!("expected UnexpectedEof, got {other:?}"),
}
}
#[test]
fn mismatched_back_pointer_is_invalid_chunk() {
let mut body = vec![MSG_VIDEO, 0, 0, 1];
body.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0]); body.push(0xAA); body.extend_from_slice(&999u32.to_be_bytes()); let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: 0,
timestamp: 0,
payload: body,
};
match parse_aggregate(&agg) {
Err(Error::InvalidChunk(_)) => {}
other => panic!("expected InvalidChunk, got {other:?}"),
}
}
#[test]
fn build_emits_zero_stream_ids_on_sub_headers() {
let sub = Message {
msg_type_id: MSG_DATA_AMF0,
msg_stream_id: 0xABCDEF, timestamp: 50,
payload: vec![0x02, 0x00, 0x04, b'P', b'i', b'n', b'g'],
};
let agg = build_aggregate(7, &[sub]).unwrap();
let sid_bytes = &agg.payload[8..11];
assert_eq!(sid_bytes, &[0, 0, 0]);
let parsed = parse_aggregate(&agg).unwrap();
assert_eq!(parsed[0].msg_stream_id, 7);
}
#[test]
fn build_sets_outer_timestamp_to_first_sub() {
let subs = vec![
Message {
msg_type_id: MSG_VIDEO,
msg_stream_id: 0,
timestamp: 12345,
payload: vec![0x17],
},
Message {
msg_type_id: MSG_AUDIO,
msg_stream_id: 0,
timestamp: 12365,
payload: vec![0xAF],
},
];
let agg = build_aggregate(1, &subs).unwrap();
assert_eq!(agg.timestamp, 12345);
let parsed = parse_aggregate(&agg).unwrap();
assert_eq!(parsed[0].timestamp, 12345);
assert_eq!(parsed[1].timestamp, 12365);
}
#[test]
fn many_small_subs_round_trip() {
let subs: Vec<Message> = (0..100)
.map(|i| Message {
msg_type_id: MSG_VIDEO,
msg_stream_id: 0,
timestamp: i * 33,
payload: vec![i as u8; 4],
})
.collect();
let agg = build_aggregate(1, &subs).unwrap();
let parsed = parse_aggregate(&agg).unwrap();
assert_eq!(parsed.len(), subs.len());
for (got, want) in parsed.iter().zip(subs.iter()) {
assert_eq!(got.msg_type_id, want.msg_type_id);
assert_eq!(got.timestamp, want.timestamp);
assert_eq!(got.payload, want.payload);
}
}
#[test]
fn build_rejects_oversize_sub_payload() {
let oversize = Message {
msg_type_id: MSG_VIDEO,
msg_stream_id: 0,
timestamp: 0,
payload: vec![0u8; (UI24_MAX as usize) + 1],
};
match build_aggregate(0, &[oversize]) {
Err(Error::InvalidChunk(_)) => {}
other => panic!("expected InvalidChunk, got {other:?}"),
}
}
#[test]
fn parse_rejects_overflowing_data_size() {
let mut body = vec![MSG_VIDEO, 0xFF, 0xFF, 0xFF];
body.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0]);
let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: 0,
timestamp: 0,
payload: body,
};
match parse_aggregate(&agg) {
Err(Error::UnexpectedEof) => {}
other => panic!("expected UnexpectedEof, got {other:?}"),
}
}
#[test]
fn fuzz_random_bodies_never_panic() {
let mut state: u32 = 0xCAFEF00D;
let mut next = || {
state ^= state << 13;
state ^= state >> 17;
state ^= state << 5;
state
};
for _ in 0..1024 {
let len = (next() % 512) as usize;
let mut body = vec![0u8; len];
for byte in body.iter_mut() {
*byte = (next() & 0xFF) as u8;
}
let agg = Message {
msg_type_id: MSG_AGGREGATE,
msg_stream_id: next(),
timestamp: next(),
payload: body,
};
let _ = parse_aggregate(&agg);
}
}
}