#![allow(dead_code)]
#![allow(clippy::doc_markdown)]
#![allow(clippy::similar_names)]
#![allow(clippy::unreadable_literal)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_lossless)]
#![allow(clippy::cast_sign_loss)]
#![allow(clippy::match_same_arms)]
#![allow(clippy::many_single_char_names)]
#![allow(clippy::unnecessary_wraps)]
#![allow(clippy::range_plus_one)]
#![allow(clippy::needless_pass_by_value)]
#![allow(clippy::manual_div_ceil)]
#![allow(clippy::comparison_chain)]
#![allow(clippy::unused_self)]
#![allow(clippy::trivially_copy_pass_by_ref)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::too_many_arguments)]
#![allow(clippy::struct_excessive_bools)]
#![allow(clippy::needless_range_loop)]
#![allow(clippy::redundant_closure_for_method_calls)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::should_implement_trait)]
#![allow(clippy::items_after_statements)]
#![allow(clippy::if_not_else)]
#![allow(clippy::format_push_string)]
#![allow(clippy::single_match_else)]
#![allow(clippy::redundant_slicing)]
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::map_unwrap_or)]
#![allow(clippy::derivable_impls)]
#![allow(clippy::assigning_clones)]
#![allow(clippy::if_same_then_else)]
#![allow(clippy::format_collect)]
#![allow(clippy::useless_conversion)]
#![allow(clippy::unused_async)]
#![allow(clippy::identity_op)]
use crate::error::{NetError, NetResult};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// Message type identifiers recognised by this module.
///
/// The discriminant of every variant is the `u8` id that
/// [`MessageType::from_id`] maps back to that variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MessageType {
    /// Set Chunk Size control message (id 1).
    SetChunkSize = 1,
    /// Abort control message (id 2).
    Abort = 2,
    /// Acknowledgement control message (id 3).
    Acknowledgement = 3,
    /// User Control message (id 4).
    UserControl = 4,
    /// Window Acknowledgement Size control message (id 5).
    WindowAckSize = 5,
    /// Set Peer Bandwidth control message (id 6).
    SetPeerBandwidth = 6,
    /// Audio message (id 8).
    Audio = 8,
    /// Video message (id 9).
    Video = 9,
    /// AMF3 data message (id 15).
    DataAmf3 = 15,
    /// AMF3 shared object message (id 16).
    SharedObjectAmf3 = 16,
    /// AMF3 command message (id 17).
    CommandAmf3 = 17,
    /// AMF0 data message (id 18).
    DataAmf0 = 18,
    /// AMF0 shared object message (id 19).
    SharedObjectAmf0 = 19,
    /// AMF0 command message (id 20).
    CommandAmf0 = 20,
    /// Aggregate message (id 22).
    Aggregate = 22,
}

impl MessageType {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` when no variant uses that id (for example 0, 7 or 99).
    #[must_use]
    pub const fn from_id(id: u8) -> Option<Self> {
        let kind = match id {
            1 => Self::SetChunkSize,
            2 => Self::Abort,
            3 => Self::Acknowledgement,
            4 => Self::UserControl,
            5 => Self::WindowAckSize,
            6 => Self::SetPeerBandwidth,
            8 => Self::Audio,
            9 => Self::Video,
            15 => Self::DataAmf3,
            16 => Self::SharedObjectAmf3,
            17 => Self::CommandAmf3,
            18 => Self::DataAmf0,
            19 => Self::SharedObjectAmf0,
            20 => Self::CommandAmf0,
            22 => Self::Aggregate,
            // Every other id is unassigned here.
            _ => return None,
        };
        Some(kind)
    }

    /// Returns `true` for the six control variants (ids 1 through 6).
    #[must_use]
    pub const fn is_control(&self) -> bool {
        // Spelled out exhaustively so that adding a variant forces a decision here.
        match self {
            Self::SetChunkSize
            | Self::Abort
            | Self::Acknowledgement
            | Self::UserControl
            | Self::WindowAckSize
            | Self::SetPeerBandwidth => true,
            Self::Audio
            | Self::Video
            | Self::DataAmf3
            | Self::SharedObjectAmf3
            | Self::CommandAmf3
            | Self::DataAmf0
            | Self::SharedObjectAmf0
            | Self::CommandAmf0
            | Self::Aggregate => false,
        }
    }

    /// Returns `true` for the AMF0 and AMF3 command variants.
    #[must_use]
    pub const fn is_command(&self) -> bool {
        // Discriminants are compared because `==` on the enum is not usable in `const fn`.
        let id = *self as u8;
        id == Self::CommandAmf0 as u8 || id == Self::CommandAmf3 as u8
    }

    /// Returns `true` for the audio and video variants.
    #[must_use]
    pub const fn is_media(&self) -> bool {
        let id = *self as u8;
        id == Self::Audio as u8 || id == Self::Video as u8
    }
}
/// Event kinds carried by a user control message.
///
/// The discriminant of every variant is the `u16` event id that
/// [`UserControlEvent::from_id`] maps back to that variant. Id 5 has no variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum UserControlEvent {
    /// Stream Begin (event id 0).
    StreamBegin = 0,
    /// Stream EOF (event id 1).
    StreamEof = 1,
    /// Stream Dry (event id 2).
    StreamDry = 2,
    /// Set Buffer Length (event id 3).
    SetBufferLength = 3,
    /// Stream Is Recorded (event id 4).
    StreamIsRecorded = 4,
    /// Ping Request (event id 6).
    PingRequest = 6,
    /// Ping Response (event id 7).
    PingResponse = 7,
}

impl UserControlEvent {
    /// Looks up the variant whose discriminant equals `id`.
    ///
    /// Returns `None` when no variant uses that id (5 and everything above 7).
    #[must_use]
    pub const fn from_id(id: u16) -> Option<Self> {
        let event = match id {
            0 => Self::StreamBegin,
            1 => Self::StreamEof,
            2 => Self::StreamDry,
            3 => Self::SetBufferLength,
            4 => Self::StreamIsRecorded,
            6 => Self::PingRequest,
            7 => Self::PingResponse,
            _ => return None,
        };
        Some(event)
    }
}
/// Control messages that can be converted to and from payload bytes with
/// [`ControlMessage::encode`] and [`ControlMessage::decode`].
#[derive(Debug, Clone)]
pub enum ControlMessage {
    /// New chunk size. The top bit is cleared on both encode and decode, so
    /// only the low 31 bits are preserved.
    SetChunkSize(u32),
    /// Abort, carrying a chunk stream id.
    Abort(u32),
    /// Acknowledgement, carrying a sequence number.
    Acknowledgement(u32),
    /// Window acknowledgement size.
    WindowAckSize(u32),
    /// Peer bandwidth setting.
    SetPeerBandwidth {
        /// Bandwidth value; the first four payload bytes.
        size: u32,
        /// Limit type; the fifth payload byte.
        limit_type: u8,
    },
    /// User control event.
    UserControl {
        /// Event kind, written as its 16-bit id.
        event: UserControlEvent,
        /// First 32-bit event argument.
        data: u32,
        /// Optional second 32-bit argument; written only when `Some`.
        extra: Option<u32>,
    },
}

impl ControlMessage {
    /// Serialises the message payload (without any header) into a fresh buffer.
    ///
    /// Integers are written with the `bytes` crate's `put_u16`/`put_u32`,
    /// i.e. in big-endian order.
    #[must_use]
    pub fn encode(&self) -> Bytes {
        let mut out = BytesMut::new();
        match self {
            // Only the low 31 bits of the chunk size are transmitted.
            Self::SetChunkSize(size) => out.put_u32(*size & 0x7FFF_FFFF),
            // These three carry a single 32-bit word verbatim.
            Self::Abort(word) | Self::Acknowledgement(word) | Self::WindowAckSize(word) => {
                out.put_u32(*word);
            }
            Self::SetPeerBandwidth { size, limit_type } => {
                out.put_u32(*size);
                out.put_u8(*limit_type);
            }
            Self::UserControl { event, data, extra } => {
                out.put_u16(*event as u16);
                out.put_u32(*data);
                if let Some(second) = *extra {
                    out.put_u32(second);
                }
            }
        }
        out.freeze()
    }

    /// Parses the payload `data` of a message whose type is `message_type`.
    ///
    /// Bytes beyond what the message needs are ignored.
    ///
    /// # Errors
    ///
    /// Returns a parse error when the payload is shorter than the message
    /// requires, when a user control event id is not recognised, or when
    /// `message_type` is not one of the six control types.
    pub fn decode(message_type: MessageType, data: &[u8]) -> NetResult<Self> {
        // Length guard shared by every arm: fails with `complaint` unless
        // `payload` holds at least `needed` bytes.
        fn require(payload: &[u8], needed: usize, complaint: &'static str) -> NetResult<()> {
            if payload.len() < needed {
                Err(NetError::parse(0, complaint))
            } else {
                Ok(())
            }
        }

        // `Buf` reads advance this slice as values are consumed.
        let mut cursor = data;
        match message_type {
            MessageType::SetChunkSize => {
                require(cursor, 4, "SetChunkSize too short")?;
                Ok(Self::SetChunkSize(cursor.get_u32() & 0x7FFF_FFFF))
            }
            MessageType::Abort => {
                require(cursor, 4, "Abort too short")?;
                Ok(Self::Abort(cursor.get_u32()))
            }
            MessageType::Acknowledgement => {
                require(cursor, 4, "Ack too short")?;
                Ok(Self::Acknowledgement(cursor.get_u32()))
            }
            MessageType::WindowAckSize => {
                require(cursor, 4, "WindowAckSize too short")?;
                Ok(Self::WindowAckSize(cursor.get_u32()))
            }
            MessageType::SetPeerBandwidth => {
                require(cursor, 5, "SetPeerBandwidth too short")?;
                let size = cursor.get_u32();
                let limit_type = cursor.get_u8();
                Ok(Self::SetPeerBandwidth { size, limit_type })
            }
            MessageType::UserControl => {
                // Two bytes of event id plus one mandatory 32-bit argument.
                require(cursor, 6, "UserControl too short")?;
                let event_id = cursor.get_u16();
                let Some(event) = UserControlEvent::from_id(event_id) else {
                    return Err(NetError::parse(
                        0,
                        format!("Unknown user control event: {event_id}"),
                    ));
                };
                let data = cursor.get_u32();
                // A second argument is read only if a full word is still left.
                let extra = (cursor.remaining() >= 4).then(|| cursor.get_u32());
                Ok(Self::UserControl { event, data, extra })
            }
            _ => Err(NetError::parse(
                0,
                format!("Not a control message: {:?}", message_type),
            )),
        }
    }

    /// Returns the [`MessageType`] that corresponds to this variant.
    #[must_use]
    pub const fn message_type(&self) -> MessageType {
        match *self {
            Self::SetChunkSize(_) => MessageType::SetChunkSize,
            Self::Abort(_) => MessageType::Abort,
            Self::Acknowledgement(_) => MessageType::Acknowledgement,
            Self::WindowAckSize(_) => MessageType::WindowAckSize,
            Self::SetPeerBandwidth { .. } => MessageType::SetPeerBandwidth,
            Self::UserControl { .. } => MessageType::UserControl,
        }
    }
}
/// A named command together with its transaction id, an optional command
/// object and any further arguments.
#[derive(Debug, Clone)]
pub struct CommandMessage {
    /// Command name, such as `"connect"` or `"_result"`.
    pub name: String,
    /// Transaction id supplied by the caller.
    pub transaction_id: f64,
    /// Command object, if one has been attached.
    pub command_object: Option<super::amf::AmfValue>,
    /// Further arguments, in the order they were appended.
    pub args: Vec<super::amf::AmfValue>,
}

impl CommandMessage {
    /// Creates a command with no command object and no arguments.
    #[must_use]
    pub fn new(name: impl Into<String>, transaction_id: f64) -> Self {
        let name = name.into();
        Self {
            name,
            transaction_id,
            command_object: None,
            args: Vec::new(),
        }
    }

    /// Returns the command with `obj` as its command object, replacing any
    /// previous one.
    #[must_use]
    pub fn with_command_object(self, obj: super::amf::AmfValue) -> Self {
        Self {
            command_object: Some(obj),
            ..self
        }
    }

    /// Returns the command with `arg` appended to its argument list.
    #[must_use]
    pub fn with_arg(self, arg: super::amf::AmfValue) -> Self {
        let mut cmd = self;
        cmd.args.push(arg);
        cmd
    }

    /// Builds a `connect` command with transaction id `1.0`.
    ///
    /// The command object holds `app` and `tcUrl` from the arguments plus a
    /// fixed set of properties (`fpad`, `capabilities`, `audioCodecs`,
    /// `videoCodecs`, `videoFunction`).
    #[must_use]
    pub fn connect(app: &str, tc_url: &str) -> Self {
        use super::amf::AmfValue;

        let text = |s: &str| AmfValue::String(s.to_owned());
        let props = std::collections::HashMap::from([
            ("app".to_string(), text(app)),
            ("tcUrl".to_string(), text(tc_url)),
            ("fpad".to_string(), AmfValue::Boolean(false)),
            ("capabilities".to_string(), AmfValue::Number(239.0)),
            ("audioCodecs".to_string(), AmfValue::Number(3575.0)),
            ("videoCodecs".to_string(), AmfValue::Number(252.0)),
            ("videoFunction".to_string(), AmfValue::Number(1.0)),
        ]);

        Self::new("connect", 1.0).with_command_object(AmfValue::Object(props))
    }

    /// Builds a `createStream` command with a null command object.
    #[must_use]
    pub fn create_stream(transaction_id: f64) -> Self {
        use super::amf::AmfValue;

        let base = Self::new("createStream", transaction_id);
        base.with_command_object(AmfValue::Null)
    }

    /// Builds a `play` command: null command object, then the stream name.
    #[must_use]
    pub fn play(stream_name: &str, transaction_id: f64) -> Self {
        use super::amf::AmfValue;

        let name_arg = AmfValue::String(String::from(stream_name));
        Self::new("play", transaction_id)
            .with_command_object(AmfValue::Null)
            .with_arg(name_arg)
    }

    /// Builds a `publish` command: null command object, then the stream name
    /// and the publish type.
    #[must_use]
    pub fn publish(stream_name: &str, publish_type: &str, transaction_id: f64) -> Self {
        use super::amf::AmfValue;

        let name_arg = AmfValue::String(String::from(stream_name));
        let type_arg = AmfValue::String(String::from(publish_type));
        Self::new("publish", transaction_id)
            .with_command_object(AmfValue::Null)
            .with_arg(name_arg)
            .with_arg(type_arg)
    }

    /// Builds a `_result` reply: null command object, then `result`.
    #[must_use]
    pub fn result(transaction_id: f64, result: super::amf::AmfValue) -> Self {
        use super::amf::AmfValue;

        Self::new("_result", transaction_id)
            .with_command_object(AmfValue::Null)
            .with_arg(result)
    }

    /// Builds an `_error` reply: null command object, then `error`.
    #[must_use]
    pub fn error(transaction_id: f64, error: super::amf::AmfValue) -> Self {
        use super::amf::AmfValue;

        Self::new("_error", transaction_id)
            .with_command_object(AmfValue::Null)
            .with_arg(error)
    }
}
/// A data message: a handler name followed by a list of values.
#[derive(Debug, Clone)]
pub struct DataMessage {
    /// Handler name, such as `"onMetaData"`.
    pub handler: String,
    /// Values attached to the message, in the order they were appended.
    pub values: Vec<super::amf::AmfValue>,
}

impl DataMessage {
    /// Creates a data message for `handler` with no values.
    #[must_use]
    pub fn new(handler: impl Into<String>) -> Self {
        let handler = handler.into();
        Self {
            handler,
            values: Vec::new(),
        }
    }

    /// Returns the message with `value` appended to its value list.
    #[must_use]
    pub fn with_value(self, value: super::amf::AmfValue) -> Self {
        let mut msg = self;
        msg.values.push(value);
        msg
    }

    /// Builds an `onMetaData` message whose only value is `metadata`.
    #[must_use]
    pub fn on_metadata(metadata: super::amf::AmfValue) -> Self {
        let msg = Self::new("onMetaData");
        msg.with_value(metadata)
    }
}
/// A decoded RTMP message, grouped by the kind of payload it carries.
#[derive(Debug, Clone)]
pub enum RtmpMessage {
    /// A control message.
    Control(ControlMessage),
    /// A command message.
    Command(CommandMessage),
    /// A data message.
    Data(DataMessage),
    /// Raw audio payload bytes.
    Audio(Bytes),
    /// Raw video payload bytes.
    Video(Bytes),
    /// A message whose type id has no dedicated variant.
    Unknown {
        /// The raw message type id.
        type_id: u8,
        /// The uninterpreted payload.
        payload: Bytes,
    },
}

impl RtmpMessage {
    /// Returns the numeric message type id for this message.
    ///
    /// `Command` and `Data` always report the AMF0 ids (20 and 18);
    /// `Unknown` reports the id it stores.
    #[must_use]
    pub fn type_id(&self) -> u8 {
        let known = match self {
            // No `MessageType` exists for this case; hand back the raw id.
            Self::Unknown { type_id, .. } => return *type_id,
            Self::Control(ctrl) => ctrl.message_type(),
            Self::Command(_) => MessageType::CommandAmf0,
            Self::Data(_) => MessageType::DataAmf0,
            Self::Audio(_) => MessageType::Audio,
            Self::Video(_) => MessageType::Video,
        };
        known as u8
    }

    /// Returns `true` if this is the `Control` variant.
    #[must_use]
    pub const fn is_control(&self) -> bool {
        match self {
            Self::Control(_) => true,
            Self::Command(_)
            | Self::Data(_)
            | Self::Audio(_)
            | Self::Video(_)
            | Self::Unknown { .. } => false,
        }
    }

    /// Returns `true` if this is the `Audio` or `Video` variant.
    #[must_use]
    pub const fn is_media(&self) -> bool {
        match self {
            Self::Audio(_) | Self::Video(_) => true,
            Self::Control(_) | Self::Command(_) | Self::Data(_) | Self::Unknown { .. } => false,
        }
    }

    /// Returns `true` if this is the `Command` variant.
    #[must_use]
    pub const fn is_command(&self) -> bool {
        match self {
            Self::Command(_) => true,
            Self::Control(_)
            | Self::Data(_)
            | Self::Audio(_)
            | Self::Video(_)
            | Self::Unknown { .. } => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `msg`, then decodes the resulting bytes as `kind`.
    fn round_trip(msg: &ControlMessage, kind: MessageType) -> ControlMessage {
        let wire = msg.encode();
        ControlMessage::decode(kind, &wire).expect("should succeed in test")
    }

    // Known ids map to their variants; an unassigned id maps to `None`.
    #[test]
    fn test_message_type_from_id() {
        let set_chunk = MessageType::from_id(1);
        let command = MessageType::from_id(20);
        let unassigned = MessageType::from_id(99);

        assert_eq!(set_chunk, Some(MessageType::SetChunkSize));
        assert_eq!(command, Some(MessageType::CommandAmf0));
        assert_eq!(unassigned, None);
    }

    // Spot-checks the control / command / media predicates.
    #[test]
    fn test_message_type_classification() {
        let chunk = MessageType::SetChunkSize;
        assert!(chunk.is_control());
        assert!(!chunk.is_command());

        assert!(MessageType::CommandAmf0.is_command());
        assert!(MessageType::Video.is_media());
    }

    // A chunk size survives an encode/decode round trip.
    #[test]
    fn test_control_message_encode_decode() {
        let sent = ControlMessage::SetChunkSize(4096);
        match round_trip(&sent, MessageType::SetChunkSize) {
            ControlMessage::SetChunkSize(size) => assert_eq!(size, 4096),
            _ => panic!("Wrong message type"),
        }
    }

    // A window acknowledgement size survives an encode/decode round trip.
    #[test]
    fn test_window_ack_size() {
        let sent = ControlMessage::WindowAckSize(2_500_000);
        match round_trip(&sent, MessageType::WindowAckSize) {
            ControlMessage::WindowAckSize(size) => assert_eq!(size, 2_500_000),
            _ => panic!("Wrong message type"),
        }
    }

    // Both peer-bandwidth fields survive an encode/decode round trip.
    #[test]
    fn test_peer_bandwidth() {
        let sent = ControlMessage::SetPeerBandwidth {
            size: 5_000_000,
            limit_type: 2,
        };
        match round_trip(&sent, MessageType::SetPeerBandwidth) {
            ControlMessage::SetPeerBandwidth { size, limit_type } => {
                assert_eq!(size, 5_000_000);
                assert_eq!(limit_type, 2);
            }
            _ => panic!("Wrong message type"),
        }
    }

    // The event kind and first argument of a user control message round-trip.
    #[test]
    fn test_user_control_event() {
        let sent = ControlMessage::UserControl {
            event: UserControlEvent::StreamBegin,
            data: 1,
            extra: None,
        };
        match round_trip(&sent, MessageType::UserControl) {
            ControlMessage::UserControl { event, data, .. } => {
                assert_eq!(event, UserControlEvent::StreamBegin);
                assert_eq!(data, 1);
            }
            _ => panic!("Wrong message type"),
        }
    }

    // `connect` uses the fixed name and transaction id and attaches an object.
    #[test]
    fn test_command_message_connect() {
        let connect = CommandMessage::connect("live", "rtmp://localhost/live");
        assert_eq!(connect.name, "connect");
        assert_eq!(connect.transaction_id, 1.0);
        assert!(connect.command_object.is_some());
    }

    // `play` carries exactly one argument.
    #[test]
    fn test_command_message_play() {
        let play = CommandMessage::play("stream1", 5.0);
        assert_eq!(play.name, "play");
        assert_eq!(play.transaction_id, 5.0);
        assert_eq!(play.args.len(), 1);
    }

    // Type ids and predicates of the top-level message enum.
    #[test]
    fn test_rtmp_message_type_id() {
        let control = RtmpMessage::Control(ControlMessage::SetChunkSize(128));
        assert_eq!(control.type_id(), 1);
        assert!(control.is_control());

        let audio = RtmpMessage::Audio(Bytes::new());
        assert_eq!(audio.type_id(), 8);
        assert!(audio.is_media());
    }
}