use crate::amf::{encode_command, Amf0Value};
use crate::caps::ConnectCapabilities;
use crate::chunk::Message;
use crate::error::{Error, Result};
pub const MSG_SET_CHUNK_SIZE: u8 = 1;
pub const MSG_ABORT: u8 = 2;
pub const MSG_ACK: u8 = 3;
pub const MSG_USER_CONTROL: u8 = 4;
pub const MSG_WINDOW_ACK_SIZE: u8 = 5;
pub const MSG_SET_PEER_BANDWIDTH: u8 = 6;
pub const MSG_AUDIO: u8 = 8;
pub const MSG_VIDEO: u8 = 9;
pub const MSG_DATA_AMF3: u8 = 15;
pub const MSG_SHARED_OBJECT_AMF3: u8 = 16;
pub const MSG_COMMAND_AMF3: u8 = 17;
pub const MSG_DATA_AMF0: u8 = 18;
pub const MSG_SHARED_OBJECT_AMF0: u8 = 19;
pub const MSG_COMMAND_AMF0: u8 = 20;
pub const MSG_AGGREGATE: u8 = 22;
pub const USR_STREAM_BEGIN: u16 = 0;
pub const USR_STREAM_EOF: u16 = 1;
pub const USR_STREAM_DRY: u16 = 2;
pub const USR_SET_BUFFER_LENGTH: u16 = 3;
pub const USR_STREAM_IS_RECORDED: u16 = 4;
pub const USR_PING_REQUEST: u16 = 6;
pub const USR_PING_RESPONSE: u16 = 7;
pub const CSID_PROTOCOL_CONTROL: u32 = 2;
pub const CSID_COMMAND: u32 = 3;
pub const CSID_AUDIO: u32 = 4;
pub const CSID_VIDEO: u32 = 5;
pub const CSID_DATA: u32 = 6;
pub fn build_set_chunk_size(size: u32) -> Message {
let size = size & 0x7FFF_FFFF;
Message {
msg_type_id: MSG_SET_CHUNK_SIZE,
msg_stream_id: 0,
timestamp: 0,
payload: size.to_be_bytes().to_vec(),
}
}
pub fn build_abort(chunk_stream_id: u32) -> Message {
Message {
msg_type_id: MSG_ABORT,
msg_stream_id: 0,
timestamp: 0,
payload: chunk_stream_id.to_be_bytes().to_vec(),
}
}
pub fn build_window_ack_size(size: u32) -> Message {
Message {
msg_type_id: MSG_WINDOW_ACK_SIZE,
msg_stream_id: 0,
timestamp: 0,
payload: size.to_be_bytes().to_vec(),
}
}
pub fn build_set_peer_bandwidth(size: u32, limit_type: u8) -> Message {
let mut p = Vec::with_capacity(5);
p.extend_from_slice(&size.to_be_bytes());
p.push(limit_type);
Message {
msg_type_id: MSG_SET_PEER_BANDWIDTH,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_stream_begin(stream_id: u32) -> Message {
let mut p = Vec::with_capacity(6);
p.extend_from_slice(&USR_STREAM_BEGIN.to_be_bytes());
p.extend_from_slice(&stream_id.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_stream_eof(stream_id: u32) -> Message {
let mut p = Vec::with_capacity(6);
p.extend_from_slice(&USR_STREAM_EOF.to_be_bytes());
p.extend_from_slice(&stream_id.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_stream_dry(stream_id: u32) -> Message {
let mut p = Vec::with_capacity(6);
p.extend_from_slice(&USR_STREAM_DRY.to_be_bytes());
p.extend_from_slice(&stream_id.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_set_buffer_length(stream_id: u32, buffer_ms: u32) -> Message {
let mut p = Vec::with_capacity(10);
p.extend_from_slice(&USR_SET_BUFFER_LENGTH.to_be_bytes());
p.extend_from_slice(&stream_id.to_be_bytes());
p.extend_from_slice(&buffer_ms.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_stream_is_recorded(stream_id: u32) -> Message {
let mut p = Vec::with_capacity(6);
p.extend_from_slice(&USR_STREAM_IS_RECORDED.to_be_bytes());
p.extend_from_slice(&stream_id.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_ping_request(timestamp_ms: u32) -> Message {
let mut p = Vec::with_capacity(6);
p.extend_from_slice(&USR_PING_REQUEST.to_be_bytes());
p.extend_from_slice(×tamp_ms.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
pub fn build_user_control_ping_response(timestamp_ms: u32) -> Message {
let mut p = Vec::with_capacity(6);
p.extend_from_slice(&USR_PING_RESPONSE.to_be_bytes());
p.extend_from_slice(×tamp_ms.to_be_bytes());
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserControlEvent {
StreamBegin { stream_id: u32 },
StreamEof { stream_id: u32 },
StreamDry { stream_id: u32 },
SetBufferLength { stream_id: u32, buffer_ms: u32 },
StreamIsRecorded { stream_id: u32 },
PingRequest { timestamp_ms: u32 },
PingResponse { timestamp_ms: u32 },
Unknown { event_type: u16, data: Vec<u8> },
}
impl UserControlEvent {
pub fn parse(payload: &[u8]) -> Result<Self> {
if payload.len() < 2 {
return Err(Error::ProtocolViolation(
"UserControl: payload < 2 bytes (need event type)".into(),
));
}
let event_type = u16::from_be_bytes([payload[0], payload[1]]);
let data = &payload[2..];
match event_type {
USR_STREAM_BEGIN => Ok(Self::StreamBegin {
stream_id: read_u32_be(data, "StreamBegin")?,
}),
USR_STREAM_EOF => Ok(Self::StreamEof {
stream_id: read_u32_be(data, "StreamEOF")?,
}),
USR_STREAM_DRY => Ok(Self::StreamDry {
stream_id: read_u32_be(data, "StreamDry")?,
}),
USR_SET_BUFFER_LENGTH => {
if data.len() < 8 {
return Err(Error::ProtocolViolation(format!(
"UserControl SetBufferLength: event data {} < 8 bytes",
data.len()
)));
}
let stream_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
let buffer_ms = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
Ok(Self::SetBufferLength {
stream_id,
buffer_ms,
})
}
USR_STREAM_IS_RECORDED => Ok(Self::StreamIsRecorded {
stream_id: read_u32_be(data, "StreamIsRecorded")?,
}),
USR_PING_REQUEST => Ok(Self::PingRequest {
timestamp_ms: read_u32_be(data, "PingRequest")?,
}),
USR_PING_RESPONSE => Ok(Self::PingResponse {
timestamp_ms: read_u32_be(data, "PingResponse")?,
}),
other => Ok(Self::Unknown {
event_type: other,
data: data.to_vec(),
}),
}
}
pub fn event_type(&self) -> u16 {
match self {
Self::StreamBegin { .. } => USR_STREAM_BEGIN,
Self::StreamEof { .. } => USR_STREAM_EOF,
Self::StreamDry { .. } => USR_STREAM_DRY,
Self::SetBufferLength { .. } => USR_SET_BUFFER_LENGTH,
Self::StreamIsRecorded { .. } => USR_STREAM_IS_RECORDED,
Self::PingRequest { .. } => USR_PING_REQUEST,
Self::PingResponse { .. } => USR_PING_RESPONSE,
Self::Unknown { event_type, .. } => *event_type,
}
}
pub fn is_spec_defined(&self) -> bool {
!matches!(self, Self::Unknown { .. })
}
pub fn to_message(&self) -> Message {
match self {
Self::StreamBegin { stream_id } => build_user_control_stream_begin(*stream_id),
Self::StreamEof { stream_id } => build_user_control_stream_eof(*stream_id),
Self::StreamDry { stream_id } => build_user_control_stream_dry(*stream_id),
Self::SetBufferLength {
stream_id,
buffer_ms,
} => build_user_control_set_buffer_length(*stream_id, *buffer_ms),
Self::StreamIsRecorded { stream_id } => {
build_user_control_stream_is_recorded(*stream_id)
}
Self::PingRequest { timestamp_ms } => build_user_control_ping_request(*timestamp_ms),
Self::PingResponse { timestamp_ms } => build_user_control_ping_response(*timestamp_ms),
Self::Unknown { event_type, data } => {
let mut p = Vec::with_capacity(2 + data.len());
p.extend_from_slice(&event_type.to_be_bytes());
p.extend_from_slice(data);
Message {
msg_type_id: MSG_USER_CONTROL,
msg_stream_id: 0,
timestamp: 0,
payload: p,
}
}
}
}
}
fn read_u32_be(data: &[u8], variant: &str) -> Result<u32> {
if data.len() < 4 {
return Err(Error::ProtocolViolation(format!(
"UserControl {variant}: event data {} < 4 bytes",
data.len()
)));
}
Ok(u32::from_be_bytes([data[0], data[1], data[2], data[3]]))
}
pub fn build_ack(bytes_received: u32) -> Message {
Message {
msg_type_id: MSG_ACK,
msg_stream_id: 0,
timestamp: 0,
payload: bytes_received.to_be_bytes().to_vec(),
}
}
pub fn build_connect(transaction_id: f64, app: &str, tc_url: &str, flash_ver: &str) -> Message {
build_connect_with_caps(
transaction_id,
app,
tc_url,
flash_ver,
&ConnectCapabilities::default(),
)
}
pub fn build_connect_with_caps(
transaction_id: f64,
app: &str,
tc_url: &str,
flash_ver: &str,
caps: &ConnectCapabilities,
) -> Message {
let mut pairs: Vec<(String, Amf0Value)> = vec![
("app".into(), Amf0Value::String(app.into())),
("type".into(), Amf0Value::String("nonprivate".into())),
("flashVer".into(), Amf0Value::String(flash_ver.into())),
("tcUrl".into(), Amf0Value::String(tc_url.into())),
("fpad".into(), Amf0Value::Boolean(false)),
("capabilities".into(), Amf0Value::Number(15.0)),
("audioCodecs".into(), Amf0Value::Number(0x0FFF as f64)),
("videoCodecs".into(), Amf0Value::Number(0x00FF as f64)),
("videoFunction".into(), Amf0Value::Number(1.0)),
];
caps.encode_into(&mut pairs);
let payload = encode_command("connect", transaction_id, Amf0Value::Object(pairs), &[]);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_connect_result(transaction_id: f64) -> Message {
build_connect_result_with_caps(transaction_id, &ConnectCapabilities::default())
}
pub fn build_connect_result_with_caps(transaction_id: f64, caps: &ConnectCapabilities) -> Message {
let props = Amf0Value::Object(vec![
("fmsVer".into(), Amf0Value::String("FMS/3,0,1,123".into())),
("capabilities".into(), Amf0Value::Number(31.0)),
("mode".into(), Amf0Value::Number(1.0)),
]);
let mut info_pairs: Vec<(String, Amf0Value)> = vec![
("level".into(), Amf0Value::String("status".into())),
(
"code".into(),
Amf0Value::String("NetConnection.Connect.Success".into()),
),
(
"description".into(),
Amf0Value::String("Connection accepted.".into()),
),
];
if caps.object_encoding.is_none() {
info_pairs.push(("objectEncoding".into(), Amf0Value::Number(0.0)));
}
caps.encode_into(&mut info_pairs);
let payload = encode_command(
"_result",
transaction_id,
props,
&[Amf0Value::Object(info_pairs)],
);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_release_stream(transaction_id: f64, stream_name: &str) -> Message {
let payload = encode_command(
"releaseStream",
transaction_id,
Amf0Value::Null,
&[Amf0Value::String(stream_name.into())],
);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_fc_publish(transaction_id: f64, stream_name: &str) -> Message {
let payload = encode_command(
"FCPublish",
transaction_id,
Amf0Value::Null,
&[Amf0Value::String(stream_name.into())],
);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_create_stream(transaction_id: f64) -> Message {
let payload = encode_command("createStream", transaction_id, Amf0Value::Null, &[]);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_create_stream_result(transaction_id: f64, stream_id: f64) -> Message {
let payload = encode_command(
"_result",
transaction_id,
Amf0Value::Null,
&[Amf0Value::Number(stream_id)],
);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_publish(
transaction_id: f64,
stream_id: u32,
stream_name: &str,
publish_type: &str,
) -> Message {
let payload = encode_command(
"publish",
transaction_id,
Amf0Value::Null,
&[
Amf0Value::String(stream_name.into()),
Amf0Value::String(publish_type.into()),
],
);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: stream_id,
timestamp: 0,
payload,
}
}
pub fn build_on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Message {
let info = Amf0Value::Object(vec![
("level".into(), Amf0Value::String(level.into())),
("code".into(), Amf0Value::String(code.into())),
("description".into(), Amf0Value::String(description.into())),
]);
let payload = encode_command("onStatus", 0.0, Amf0Value::Null, &[info]);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: stream_id,
timestamp: 0,
payload,
}
}
pub const RECONNECT_REQUEST_CODE: &str = "NetConnection.Connect.ReconnectRequest";
pub fn build_reconnect_request(tc_url: Option<&str>, description: Option<&str>) -> Message {
let mut props = vec![
(
"code".into(),
Amf0Value::String(RECONNECT_REQUEST_CODE.into()),
),
("level".into(), Amf0Value::String("status".into())),
];
if let Some(desc) = description {
props.push(("description".into(), Amf0Value::String(desc.into())));
}
if let Some(url) = tc_url {
props.push(("tcUrl".into(), Amf0Value::String(url.into())));
}
let payload = encode_command(
"onStatus",
0.0,
Amf0Value::Null,
&[Amf0Value::Object(props)],
);
Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
}
}
pub fn build_set_data_frame(stream_id: u32, metadata: Amf0Value) -> Message {
let mut payload = Vec::new();
crate::amf::encode(&mut payload, &Amf0Value::String("@setDataFrame".into()));
crate::amf::encode(&mut payload, &Amf0Value::String("onMetaData".into()));
crate::amf::encode(&mut payload, &metadata);
Message {
msg_type_id: MSG_DATA_AMF0,
msg_stream_id: stream_id,
timestamp: 0,
payload,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn user_control_stream_begin_wire_bytes() {
let m = build_user_control_stream_begin(1);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.payload, vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x01]);
}
#[test]
fn user_control_stream_eof_wire_bytes() {
let m = build_user_control_stream_eof(7);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.timestamp, 0);
assert_eq!(m.payload, vec![0x00, 0x01, 0x00, 0x00, 0x00, 0x07]);
}
#[test]
fn user_control_stream_dry_wire_bytes() {
let m = build_user_control_stream_dry(0x0010_2030);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.payload, vec![0x00, 0x02, 0x00, 0x10, 0x20, 0x30]);
}
#[test]
fn user_control_set_buffer_length_wire_bytes() {
let m = build_user_control_set_buffer_length(1, 3000);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(
m.payload,
vec![0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x0B, 0xB8],
);
}
#[test]
fn user_control_stream_is_recorded_wire_bytes() {
let m = build_user_control_stream_is_recorded(5);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.payload, vec![0x00, 0x04, 0x00, 0x00, 0x00, 0x05]);
}
#[test]
fn user_control_ping_request_wire_bytes() {
let m = build_user_control_ping_request(0xDEAD_BEEF);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.payload, vec![0x00, 0x06, 0xDE, 0xAD, 0xBE, 0xEF]);
}
#[test]
fn user_control_ping_response_wire_bytes() {
let m = build_user_control_ping_response(0xDEAD_BEEF);
assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.payload, vec![0x00, 0x07, 0xDE, 0xAD, 0xBE, 0xEF]);
}
#[test]
fn abort_wire_bytes() {
let m = build_abort(0x0001_0203);
assert_eq!(m.msg_type_id, MSG_ABORT);
assert_eq!(m.msg_stream_id, 0);
assert_eq!(m.timestamp, 0);
assert_eq!(m.payload, vec![0x00, 0x01, 0x02, 0x03]);
}
#[test]
fn connect_with_empty_caps_matches_legacy() {
let legacy = build_connect(1.0, "live", "rtmp://srv/live", "FMLE/3.0");
let caps_empty = build_connect_with_caps(
1.0,
"live",
"rtmp://srv/live",
"FMLE/3.0",
&ConnectCapabilities::default(),
);
assert_eq!(legacy.payload, caps_empty.payload);
}
#[test]
fn connect_with_caps_appends_in_documented_order() {
let mut video = crate::caps::FourCcInfoMap::new();
video.insert("*", crate::caps::FOURCC_INFO_CAN_FORWARD);
let mut audio = crate::caps::FourCcInfoMap::new();
audio.insert("Opus", crate::caps::FOURCC_INFO_CAN_DECODE);
let caps = ConnectCapabilities {
object_encoding: Some(crate::caps::OBJECT_ENCODING_AMF3),
fourcc_list: vec!["av01".into(), "hvc1".into()],
video_fourcc_info_map: video,
audio_fourcc_info_map: audio,
caps_ex: crate::caps::CAPS_EX_RECONNECT | crate::caps::CAPS_EX_MULTITRACK,
};
let msg = build_connect_with_caps(1.0, "live", "rtmp://srv/live", "FMLE/3.0", &caps);
let vals = crate::amf::decode_all(&msg.payload).unwrap();
assert_eq!(vals[0].as_str(), Some("connect"));
let cmd_obj = match &vals[2] {
Amf0Value::Object(p) => p,
other => panic!("expected Object for command object, got {other:?}"),
};
let names: Vec<&str> = cmd_obj.iter().map(|(k, _)| k.as_str()).collect();
let legacy_count = names
.iter()
.position(|n| *n == "videoFunction")
.expect("legacy block must end with videoFunction")
+ 1;
let extras = &names[legacy_count..];
assert_eq!(
extras,
&[
"objectEncoding",
"fourCcList",
"videoFourCcInfoMap",
"audioFourCcInfoMap",
"capsEx",
],
);
}
#[test]
fn connect_result_with_caps_emits_info_block() {
let mut video = crate::caps::FourCcInfoMap::new();
video.insert("hvc1", crate::caps::FOURCC_INFO_CAN_DECODE);
let caps = ConnectCapabilities {
video_fourcc_info_map: video,
caps_ex: crate::caps::CAPS_EX_MULTITRACK | crate::caps::CAPS_EX_MOD_EX,
..Default::default()
};
let msg = build_connect_result_with_caps(1.0, &caps);
let vals = crate::amf::decode_all(&msg.payload).unwrap();
assert_eq!(vals[0].as_str(), Some("_result"));
let info = &vals[3];
assert_eq!(
info.get("code").and_then(Amf0Value::as_str),
Some("NetConnection.Connect.Success"),
);
let parsed = ConnectCapabilities::from_amf0(info);
assert_eq!(parsed.caps_ex, caps.caps_ex);
assert_eq!(parsed.video_fourcc_info_map.get("hvc1"), Some(1));
}
#[test]
fn connect_result_with_empty_caps_matches_legacy() {
let legacy = build_connect_result(7.0);
let empty = build_connect_result_with_caps(7.0, &ConnectCapabilities::default());
assert_eq!(legacy.payload, empty.payload);
}
#[test]
fn user_control_event_parse_recognises_spec_types() {
let cases: &[(&[u8], UserControlEvent)] = &[
(
&[0x00, 0x00, 0x00, 0x00, 0x00, 0x01],
UserControlEvent::StreamBegin { stream_id: 1 },
),
(
&[0x00, 0x01, 0x00, 0x00, 0x00, 0x07],
UserControlEvent::StreamEof { stream_id: 7 },
),
(
&[0x00, 0x02, 0x00, 0x10, 0x20, 0x30],
UserControlEvent::StreamDry {
stream_id: 0x0010_2030,
},
),
(
&[0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x0B, 0xB8],
UserControlEvent::SetBufferLength {
stream_id: 1,
buffer_ms: 3000,
},
),
(
&[0x00, 0x04, 0x00, 0x00, 0x00, 0x05],
UserControlEvent::StreamIsRecorded { stream_id: 5 },
),
(
&[0x00, 0x06, 0xDE, 0xAD, 0xBE, 0xEF],
UserControlEvent::PingRequest {
timestamp_ms: 0xDEAD_BEEF,
},
),
(
&[0x00, 0x07, 0xDE, 0xAD, 0xBE, 0xEF],
UserControlEvent::PingResponse {
timestamp_ms: 0xDEAD_BEEF,
},
),
];
for (wire, expected) in cases {
let parsed = UserControlEvent::parse(wire).expect("parse UCM");
assert_eq!(&parsed, expected);
assert!(parsed.is_spec_defined());
assert_eq!(
parsed.event_type() as usize,
((wire[0] as usize) << 8) | wire[1] as usize
);
}
}
#[test]
fn user_control_event_round_trip_matches_builder_bytes() {
let originals = [
build_user_control_stream_begin(1),
build_user_control_stream_eof(7),
build_user_control_stream_dry(0x0010_2030),
build_user_control_set_buffer_length(1, 3000),
build_user_control_stream_is_recorded(5),
build_user_control_ping_request(0xDEAD_BEEF),
build_user_control_ping_response(0xDEAD_BEEF),
];
for m in &originals {
let parsed = UserControlEvent::parse(&m.payload).expect("parse UCM");
let rebuilt = parsed.to_message();
assert_eq!(rebuilt.msg_type_id, MSG_USER_CONTROL);
assert_eq!(rebuilt.msg_stream_id, 0);
assert_eq!(rebuilt.timestamp, 0);
assert_eq!(rebuilt.payload, m.payload);
}
}
#[test]
fn user_control_event_unknown_preserves_event_type_and_tail() {
let wire: &[u8] = &[0x00, 0x05, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE];
let parsed = UserControlEvent::parse(wire).expect("parse reserved UCM");
assert_eq!(
parsed,
UserControlEvent::Unknown {
event_type: 5,
data: vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE],
},
);
assert!(!parsed.is_spec_defined());
assert_eq!(parsed.event_type(), 5);
let rebuilt = parsed.to_message();
assert_eq!(rebuilt.payload, wire);
let future: &[u8] = &[0xFF, 0xFE];
let parsed_future = UserControlEvent::parse(future).expect("parse future UCM");
assert_eq!(
parsed_future,
UserControlEvent::Unknown {
event_type: 0xFFFE,
data: Vec::new(),
},
);
assert_eq!(parsed_future.to_message().payload, future);
}
#[test]
fn user_control_event_parse_rejects_truncated_payload() {
assert!(matches!(
UserControlEvent::parse(&[]),
Err(Error::ProtocolViolation(_))
));
assert!(matches!(
UserControlEvent::parse(&[0x00]),
Err(Error::ProtocolViolation(_))
));
for type_byte in [
USR_STREAM_BEGIN,
USR_STREAM_EOF,
USR_STREAM_DRY,
USR_STREAM_IS_RECORDED,
USR_PING_REQUEST,
USR_PING_RESPONSE,
] {
let wire = [(type_byte >> 8) as u8, type_byte as u8, 0x00, 0x00, 0x00];
assert!(matches!(
UserControlEvent::parse(&wire),
Err(Error::ProtocolViolation(_))
));
}
let too_short = [0x00, 0x03, 0, 0, 0, 1, 0, 0, 11];
assert!(matches!(
UserControlEvent::parse(&too_short),
Err(Error::ProtocolViolation(_))
));
}
#[test]
fn reconnect_request_full_info_object() {
let m = build_reconnect_request(
Some("rtmp://foo.mydomain.com:1935/realtimeapp"),
Some("The streaming server is undergoing updates."),
);
assert_eq!(m.msg_type_id, MSG_COMMAND_AMF0);
assert_eq!(m.msg_stream_id, 0, "NetConnection command stream");
let vals = crate::amf::decode_all(&m.payload).unwrap();
assert_eq!(vals[0].as_str(), Some("onStatus"));
assert_eq!(vals[1].as_f64(), Some(0.0), "transaction id 0");
assert_eq!(vals[2], Amf0Value::Null, "no command object");
let info = &vals[3];
assert_eq!(
info.get("code").and_then(Amf0Value::as_str),
Some(RECONNECT_REQUEST_CODE)
);
assert_eq!(
info.get("level").and_then(Amf0Value::as_str),
Some("status")
);
assert_eq!(
info.get("tcUrl").and_then(Amf0Value::as_str),
Some("rtmp://foo.mydomain.com:1935/realtimeapp")
);
assert_eq!(
info.get("description").and_then(Amf0Value::as_str),
Some("The streaming server is undergoing updates.")
);
}
#[test]
fn reconnect_request_minimal_info_object() {
let m = build_reconnect_request(None, None);
let vals = crate::amf::decode_all(&m.payload).unwrap();
let info = &vals[3];
assert_eq!(
info.get("code").and_then(Amf0Value::as_str),
Some(RECONNECT_REQUEST_CODE)
);
assert_eq!(
info.get("level").and_then(Amf0Value::as_str),
Some("status")
);
assert!(info.get("tcUrl").is_none(), "tcUrl omitted when None");
assert!(
info.get("description").is_none(),
"description omitted when None"
);
}
}