#![allow(dead_code)]
#![allow(clippy::doc_markdown)]
#![allow(clippy::similar_names)]
#![allow(clippy::unreadable_literal)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_lossless)]
#![allow(clippy::cast_sign_loss)]
#![allow(clippy::match_same_arms)]
#![allow(clippy::many_single_char_names)]
#![allow(clippy::unnecessary_wraps)]
#![allow(clippy::range_plus_one)]
#![allow(clippy::needless_pass_by_value)]
#![allow(clippy::manual_div_ceil)]
#![allow(clippy::comparison_chain)]
#![allow(clippy::unused_self)]
#![allow(clippy::trivially_copy_pass_by_ref)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::too_many_arguments)]
#![allow(clippy::struct_excessive_bools)]
#![allow(clippy::needless_range_loop)]
#![allow(clippy::redundant_closure_for_method_calls)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::should_implement_trait)]
#![allow(clippy::items_after_statements)]
#![allow(clippy::if_not_else)]
#![allow(clippy::format_push_string)]
#![allow(clippy::single_match_else)]
#![allow(clippy::redundant_slicing)]
#![allow(clippy::uninlined_format_args)]
#![allow(clippy::map_unwrap_or)]
#![allow(clippy::derivable_impls)]
#![allow(clippy::assigning_clones)]
#![allow(clippy::if_same_then_else)]
#![allow(clippy::format_collect)]
#![allow(clippy::useless_conversion)]
#![allow(clippy::unused_async)]
#![allow(clippy::identity_op)]
use super::{
amf::{AmfDecoder, AmfEncoder, AmfValue},
chunk::{AssembledMessage, ChunkStream, MessageHeader},
handshake::{Handshake, HANDSHAKE_SIZE},
message::{CommandMessage, ControlMessage, DataMessage, MessageType, RtmpMessage},
};
use crate::error::{NetError, NetResult};
use bytes::{Bytes, BytesMut};
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::timeout;
/// TCP port assumed by [`RtmpUrl::parse`] when the URL does not name one.
pub const DEFAULT_RTMP_PORT: u16 = 1935;
/// Timeout (30 seconds) used for connect, read and write in the default configuration.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Chunk size, in bytes, that the default configuration applies to a new chunk stream.
pub const DEFAULT_CHUNK_SIZE: u32 = 4096;
/// Window acknowledgement size stored in the default configuration.
pub const DEFAULT_WINDOW_ACK_SIZE: u32 = 2_500_000;
/// Chunk stream IDs the client passes to `send_message` for each category of
/// outgoing message.
pub mod chunk_stream_id {
/// Used for control messages (chunk size, acknowledgements, bandwidth, user control).
pub const PROTOCOL_CONTROL: u32 = 2;
/// Used for command messages such as connect, createStream, publish and play.
pub const COMMAND: u32 = 3;
/// Used for audio payloads.
pub const AUDIO: u32 = 4;
/// Used for video payloads.
pub const VIDEO: u32 = 5;
/// Used for data (metadata) messages.
pub const DATA: u32 = 6;
}
/// Lifecycle state tracked by the RTMP client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection: the initial state and the state after `close`.
    Disconnected,
    /// TCP connect and RTMP handshake are in progress.
    Handshaking,
    /// The handshake finished; the connect command has not been issued yet.
    Connected,
    /// The connect command was sent and its response is awaited.
    Connecting,
    /// The connect command succeeded; publish/play may be requested.
    Ready,
    /// A publish request was accepted.
    Publishing,
    /// A play request was accepted.
    Playing,
    /// The connection is in a failed state.
    Error,
}

impl ConnectionState {
    /// Returns `true` for `Ready`, `Publishing` and `Playing`.
    #[must_use]
    pub const fn is_ready(&self) -> bool {
        match *self {
            Self::Ready | Self::Publishing | Self::Playing => true,
            Self::Disconnected
            | Self::Handshaking
            | Self::Connected
            | Self::Connecting
            | Self::Error => false,
        }
    }

    /// Returns `true` for every state except `Disconnected` and `Error`.
    #[must_use]
    pub const fn is_connected(&self) -> bool {
        match *self {
            Self::Disconnected | Self::Error => false,
            _ => true,
        }
    }
}
/// Components of a parsed `rtmp://` or `rtmps://` URL.
#[derive(Debug, Clone)]
pub struct RtmpUrl {
    /// URL scheme without the `://` separator: `"rtmp"` or `"rtmps"`.
    pub protocol: String,
    /// Host name or address. Bracketed IPv6 literals keep their brackets so
    /// that [`RtmpUrl::address`] and [`RtmpUrl::tc_url`] stay well-formed.
    pub host: String,
    /// TCP port; `DEFAULT_RTMP_PORT` when the URL does not name one.
    pub port: u16,
    /// Application name: the first path segment.
    pub app: String,
    /// Everything after the application segment (may be empty).
    pub stream: String,
}

impl RtmpUrl {
    /// Parses `scheme://host[:port]/app[/stream]`.
    ///
    /// Leading and trailing whitespace is ignored. The host may be a bracketed
    /// IPv6 literal such as `[::1]`.
    ///
    /// # Errors
    ///
    /// Returns an invalid-URL error when the scheme is not `rtmp`/`rtmps`, the
    /// host or application segment is missing or empty, an IPv6 literal is not
    /// terminated, or the port is not a valid `u16`.
    pub fn parse(url: &str) -> NetResult<Self> {
        let url = url.trim();
        let (protocol, rest) = if let Some(rest) = url.strip_prefix("rtmp://") {
            ("rtmp", rest)
        } else if let Some(rest) = url.strip_prefix("rtmps://") {
            ("rtmps", rest)
        } else {
            return Err(NetError::invalid_url(
                "URL must start with rtmp:// or rtmps://",
            ));
        };

        let (authority, path) = rest
            .split_once('/')
            .ok_or_else(|| NetError::invalid_url("URL must contain application path"))?;

        let (host, port) = Self::split_host_port(authority)?;
        if host.is_empty() {
            return Err(NetError::invalid_url("URL must contain a host"));
        }

        let (app, stream) = path.split_once('/').unwrap_or((path, ""));
        if app.is_empty() {
            // `rtmp://host/` has a path separator but no application name.
            return Err(NetError::invalid_url("URL must contain application path"));
        }

        Ok(Self {
            protocol: protocol.to_string(),
            host: host.to_string(),
            port,
            app: app.to_string(),
            stream: stream.to_string(),
        })
    }

    /// Splits `host[:port]` into its parts, defaulting the port when absent.
    fn split_host_port(authority: &str) -> NetResult<(&str, u16)> {
        let (host, port_text) = if authority.starts_with('[') {
            // Bracketed IPv6 literal: the colons inside the brackets are part
            // of the address, so only a colon after `]` introduces the port.
            let close = authority
                .find(']')
                .ok_or_else(|| NetError::invalid_url("Unterminated IPv6 address"))?;
            let (host, tail) = authority.split_at(close + 1);
            if tail.is_empty() {
                (host, None)
            } else if let Some(port_text) = tail.strip_prefix(':') {
                (host, Some(port_text))
            } else {
                return Err(NetError::invalid_url("Invalid port number"));
            }
        } else {
            match authority.split_once(':') {
                Some((host, port_text)) => (host, Some(port_text)),
                None => (authority, None),
            }
        };

        let port = match port_text {
            Some(text) => text
                .parse::<u16>()
                .map_err(|_| NetError::invalid_url("Invalid port number"))?,
            None => DEFAULT_RTMP_PORT,
        };
        Ok((host, port))
    }

    /// Returns `host:port`, the form passed to `TcpStream::connect`.
    #[must_use]
    pub fn address(&self) -> String {
        format!("{}:{}", self.host, self.port)
    }

    /// Returns the `tcUrl` value sent with the connect command.
    ///
    /// The port is included only when it differs from `DEFAULT_RTMP_PORT`,
    /// i.e. exactly when the original URL had to spell it out.
    #[must_use]
    pub fn tc_url(&self) -> String {
        if self.port == DEFAULT_RTMP_PORT {
            format!("{}://{}/{}", self.protocol, self.host, self.app)
        } else {
            format!(
                "{}://{}:{}/{}",
                self.protocol, self.host, self.port, self.app
            )
        }
    }
}
/// Tunable settings for an RTMP client.
#[derive(Debug, Clone)]
pub struct RtmpClientConfig {
/// Upper bound on establishing the TCP connection.
pub connect_timeout: Duration,
/// Upper bound on each socket read.
pub read_timeout: Duration,
/// Upper bound on each socket write or flush.
pub write_timeout: Duration,
/// Initial chunk size applied to both directions of the chunk stream.
pub chunk_size: u32,
/// Window acknowledgement size.
/// NOTE(review): not read by any code visible in this file — confirm where it is applied.
pub window_ack_size: u32,
/// Whether extended timestamps are enabled.
/// NOTE(review): not read by any code visible in this file — confirm where it is applied.
pub extended_timestamp: bool,
}
impl Default for RtmpClientConfig {
/// Uses `DEFAULT_TIMEOUT` for all three timeouts, `DEFAULT_CHUNK_SIZE`,
/// `DEFAULT_WINDOW_ACK_SIZE`, and enables extended timestamps.
fn default() -> Self {
Self {
connect_timeout: DEFAULT_TIMEOUT,
read_timeout: DEFAULT_TIMEOUT,
write_timeout: DEFAULT_TIMEOUT,
chunk_size: DEFAULT_CHUNK_SIZE,
window_ack_size: DEFAULT_WINDOW_ACK_SIZE,
extended_timestamp: true,
}
}
}
/// Per-connection bookkeeping: identifiers, negotiated bandwidths and counters.
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Transaction ID that the next command will use; starts at 1.0.
    pub transaction_id: f64,
    /// Message stream ID placed in outgoing message headers.
    pub stream_id: u32,
    /// Size from the most recent Window Acknowledgement Size message, if any.
    pub server_bandwidth: Option<u32>,
    /// Size from the most recent Set Peer Bandwidth message, if any.
    pub client_bandwidth: Option<u32>,
    /// Total bytes written to the socket.
    pub bytes_sent: u64,
    /// Total bytes read from the socket.
    pub bytes_received: u64,
    /// Low 32 bits of the wall-clock time in milliseconds at the last update.
    pub timestamp: u32,
}

impl SessionInfo {
    /// Creates a fresh session with zeroed counters and transaction ID 1.0.
    fn new() -> Self {
        Self {
            timestamp: 0,
            bytes_received: 0,
            bytes_sent: 0,
            client_bandwidth: None,
            server_bandwidth: None,
            stream_id: 0,
            transaction_id: 1.0,
        }
    }

    /// Returns the current transaction ID and advances the counter by one.
    fn next_transaction_id(&mut self) -> f64 {
        let following = self.transaction_id + 1.0;
        std::mem::replace(&mut self.transaction_id, following)
    }

    /// Refreshes `timestamp` from the system clock (milliseconds since the
    /// Unix epoch, truncated to 32 bits). Leaves it untouched if the clock
    /// reads earlier than the epoch.
    fn update_timestamp(&mut self) {
        let Ok(since_epoch) = SystemTime::now().duration_since(UNIX_EPOCH) else {
            return;
        };
        self.timestamp = since_epoch.as_millis() as u32;
    }
}
/// Boxed one-shot callback that receives the outcome of a request.
/// NOTE(review): nothing visible in this file invokes these callbacks — confirm intended use.
type ResponseCallback = Box<dyn FnOnce(NetResult<RtmpMessage>) + Send>;
/// Asynchronous RTMP client driving a single TCP connection.
pub struct RtmpClient {
// Open socket, or `None` before `connect` / after `close`.
stream: Option<TcpStream>,
// Timeouts and initial chunk settings.
config: RtmpClientConfig,
// Current lifecycle state.
state: ConnectionState,
// Handshake state machine (C0/C1/C2 generation, S0/S1/S2 parsing).
handshake: Handshake,
// Chunk encoder/decoder for both directions.
chunk_stream: ChunkStream,
// Counters and identifiers for the current session.
session: SessionInfo,
// URL given to the most recent `connect` call.
url: Option<RtmpUrl>,
// NOTE(review): never read or written by the methods visible in this file — confirm intended use.
pending_responses: HashMap<u32, ResponseCallback>,
// Bytes read from the socket and not yet handed to the chunk decoder.
read_buffer: BytesMut,
// Most recently applied chunk size; also used to size the socket read buffer.
chunk_size: u32,
}
impl RtmpClient {
/// Creates a disconnected client with the default configuration.
#[must_use]
pub fn new() -> Self {
Self::with_config(RtmpClientConfig::default())
}
/// Creates a disconnected client using `config`.
///
/// The configured chunk size is applied to both the transmit and receive
/// sides of the chunk stream.
#[must_use]
pub fn with_config(config: RtmpClientConfig) -> Self {
    let mut chunk_stream = ChunkStream::new();
    chunk_stream.set_tx_chunk_size(config.chunk_size);
    chunk_stream.set_rx_chunk_size(config.chunk_size);

    Self {
        // Read the chunk size before `config` is moved into the struct.
        chunk_size: config.chunk_size,
        read_buffer: BytesMut::with_capacity(8192),
        pending_responses: HashMap::new(),
        url: None,
        session: SessionInfo::new(),
        chunk_stream,
        handshake: Handshake::new(),
        state: ConnectionState::Disconnected,
        config,
        stream: None,
    }
}
/// Returns the current connection state.
#[must_use]
pub const fn state(&self) -> ConnectionState {
self.state
}
/// Returns `true` unless the state is `Disconnected` or `Error`.
#[must_use]
pub const fn is_connected(&self) -> bool {
self.state.is_connected()
}
/// Returns `true` when the state is `Ready`, `Publishing` or `Playing`.
#[must_use]
pub const fn is_ready(&self) -> bool {
self.state.is_ready()
}
/// Returns the session counters and identifiers.
#[must_use]
pub const fn session_info(&self) -> &SessionInfo {
&self.session
}
pub async fn connect(&mut self, url: &str) -> NetResult<()> {
let rtmp_url = RtmpUrl::parse(url)?;
let address = rtmp_url.address();
self.url = Some(rtmp_url.clone());
self.state = ConnectionState::Handshaking;
let stream = timeout(self.config.connect_timeout, TcpStream::connect(&address))
.await
.map_err(|_| NetError::timeout(format!("Connection to {address} timed out")))?
.map_err(|e| NetError::connection(format!("Failed to connect to {address}: {e}")))?;
self.stream = Some(stream);
self.perform_handshake().await?;
self.state = ConnectionState::Connected;
self.send_connect_command(&rtmp_url).await?;
Ok(())
}
/// Runs the client side of the handshake on the open socket: sends C0+C1,
/// reads S0+S1+S2 in one exact read, then sends C2.
///
/// # Errors
///
/// Fails when there is no socket, a read/write fails or times out, the server
/// response does not parse, or the handshake does not report completion.
async fn perform_handshake(&mut self) -> NetResult<()> {
    let Some(stream) = self.stream.as_mut() else {
        return Err(NetError::invalid_state("No active connection"));
    };

    // The handshake epoch is the current session timestamp.
    self.session.update_timestamp();
    self.handshake.set_epoch(self.session.timestamp);

    let c0c1 = self.handshake.generate_c0c1();
    timeout(self.config.write_timeout, stream.write_all(&c0c1))
        .await
        .map_err(|_| NetError::timeout("Handshake write timeout"))?
        .map_err(|e| NetError::handshake(format!("Failed to send C0+C1: {e}")))?;
    self.session.bytes_sent += c0c1.len() as u64;

    // S0 is one byte; S1 and S2 are HANDSHAKE_SIZE bytes each.
    let s0s1_len = 1 + HANDSHAKE_SIZE;
    let mut response = vec![0u8; s0s1_len + HANDSHAKE_SIZE];
    timeout(self.config.read_timeout, stream.read_exact(&mut response))
        .await
        .map_err(|_| NetError::timeout("Handshake read timeout"))?
        .map_err(|e| NetError::handshake(format!("Failed to read S0+S1+S2: {e}")))?;
    self.session.bytes_received += response.len() as u64;

    let (s0s1, s2) = response.split_at(s0s1_len);
    self.handshake.parse_s0s1(s0s1)?;
    self.handshake.parse_s2(s2)?;

    let c2 = self.handshake.generate_c2();
    timeout(self.config.write_timeout, stream.write_all(&c2))
        .await
        .map_err(|_| NetError::timeout("Handshake C2 write timeout"))?
        .map_err(|e| NetError::handshake(format!("Failed to send C2: {e}")))?;
    self.session.bytes_sent += c2.len() as u64;

    if self.handshake.is_done() {
        Ok(())
    } else {
        Err(NetError::handshake("Handshake incomplete"))
    }
}
/// Sends the connect command for `url` and waits for the server's reply.
///
/// The state is `Connecting` while the reply is pending and `Ready` once it
/// has been accepted.
async fn send_connect_command(&mut self, url: &RtmpUrl) -> NetResult<()> {
    self.state = ConnectionState::Connecting;

    let tc_url = url.tc_url();
    let connect = CommandMessage::connect(&url.app, &tc_url);
    self.send_command(connect, chunk_stream_id::COMMAND).await?;
    self.wait_for_connect_response().await?;

    self.state = ConnectionState::Ready;
    Ok(())
}
/// Reads messages until a command settles the connect request.
///
/// `_result` and `onBWDone` count as success, `_error` as failure; every
/// other message is skipped. Read errors (including timeouts) propagate.
async fn wait_for_connect_response(&mut self) -> NetResult<()> {
    loop {
        for msg in self.read_messages().await? {
            let RtmpMessage::Command(reply) = msg else {
                continue;
            };
            match reply.name.as_str() {
                "_result" | "onBWDone" => return Ok(()),
                "_error" => return Err(NetError::protocol("Connect command failed")),
                _ => {}
            }
        }
    }
}
/// Creates a stream and asks the server to start publishing `stream_name`.
///
/// `publish_type` is forwarded verbatim in the publish command. On success
/// the state becomes `Publishing`.
///
/// # Errors
///
/// Fails with an invalid-state error when the client is not ready, or with
/// whatever error stream creation, sending, or the server's status produces.
pub async fn publish(&mut self, stream_name: &str, publish_type: &str) -> NetResult<()> {
    if !self.is_ready() {
        return Err(NetError::invalid_state("Client not ready for publishing"));
    }

    let new_stream_id = self.create_stream().await?;
    self.session.stream_id = new_stream_id;

    let publish_cmd = CommandMessage::publish(
        stream_name,
        publish_type,
        self.session.next_transaction_id(),
    );
    self.send_command(publish_cmd, chunk_stream_id::COMMAND)
        .await?;
    self.wait_for_publish_response().await?;

    self.state = ConnectionState::Publishing;
    Ok(())
}
/// Reads messages until an `onStatus` command settles the publish request.
///
/// The status code is taken from the `code` entry of the first argument
/// object. A code containing `Success` or `Start` is success; one containing
/// `Error` or `Fail` is failure. Anything else is skipped.
async fn wait_for_publish_response(&mut self) -> NetResult<()> {
    loop {
        for msg in self.read_messages().await? {
            let RtmpMessage::Command(cmd) = msg else {
                continue;
            };
            if cmd.name != "onStatus" {
                continue;
            }
            let Some(info) = cmd.args.first() else {
                continue;
            };
            let Some(obj) = info.as_object() else {
                continue;
            };
            let Some(code) = obj.get("code") else {
                continue;
            };
            let Some(code_str) = code.as_str() else {
                continue;
            };

            // Success markers are checked first, as a code could match both sets.
            if code_str.contains("Success") || code_str.contains("Start") {
                return Ok(());
            }
            if code_str.contains("Error") || code_str.contains("Fail") {
                return Err(NetError::protocol(format!("Publish failed: {code_str}")));
            }
        }
    }
}
/// Creates a stream and asks the server to start playing `stream_name`.
///
/// On success the state becomes `Playing`.
///
/// # Errors
///
/// Fails with an invalid-state error when the client is not ready, or with
/// whatever error stream creation, sending, or the server's status produces.
pub async fn play(&mut self, stream_name: &str) -> NetResult<()> {
    if !self.is_ready() {
        return Err(NetError::invalid_state("Client not ready for playing"));
    }

    let new_stream_id = self.create_stream().await?;
    self.session.stream_id = new_stream_id;

    let play_cmd = CommandMessage::play(stream_name, self.session.next_transaction_id());
    self.send_command(play_cmd, chunk_stream_id::COMMAND).await?;
    self.wait_for_play_response().await?;

    self.state = ConnectionState::Playing;
    Ok(())
}
/// Reads messages until an `onStatus` command settles the play request.
///
/// The status code is taken from the `code` entry of the first argument
/// object. A code containing `Start` or `Reset` is success; one containing
/// `Error` or `Fail` is failure. Anything else is skipped.
async fn wait_for_play_response(&mut self) -> NetResult<()> {
    loop {
        for msg in self.read_messages().await? {
            let RtmpMessage::Command(cmd) = msg else {
                continue;
            };
            if cmd.name != "onStatus" {
                continue;
            }
            let Some(info) = cmd.args.first() else {
                continue;
            };
            let Some(obj) = info.as_object() else {
                continue;
            };
            let Some(code) = obj.get("code") else {
                continue;
            };
            let Some(code_str) = code.as_str() else {
                continue;
            };

            // Success markers are checked first, as a code could match both sets.
            if code_str.contains("Start") || code_str.contains("Reset") {
                return Ok(());
            }
            if code_str.contains("Error") || code_str.contains("Fail") {
                return Err(NetError::protocol(format!("Play failed: {code_str}")));
            }
        }
    }
}
async fn create_stream(&mut self) -> NetResult<u32> {
let transaction_id = self.session.next_transaction_id();
let cmd = CommandMessage::create_stream(transaction_id);
self.send_command(cmd, chunk_stream_id::COMMAND).await?;
loop {
let messages = self.read_messages().await?;
for msg in messages {
if let RtmpMessage::Command(cmd) = msg {
if cmd.name == "_result" && !cmd.args.is_empty() {
if let Some(stream_id) = cmd.args[0].as_number() {
return Ok(stream_id as u32);
}
}
}
}
}
}
/// Sends a command message on chunk stream `csid`.
async fn send_command(&mut self, cmd: CommandMessage, csid: u32) -> NetResult<()> {
    self.send_message(RtmpMessage::Command(cmd), csid).await
}

/// Sends a data message on the data chunk stream.
pub async fn send_data(&mut self, data: DataMessage) -> NetResult<()> {
    self.send_message(RtmpMessage::Data(data), chunk_stream_id::DATA)
        .await
}

/// Sends an audio payload on the audio chunk stream.
pub async fn send_audio(&mut self, data: Bytes) -> NetResult<()> {
    self.send_message(RtmpMessage::Audio(data), chunk_stream_id::AUDIO)
        .await
}

/// Sends a video payload on the video chunk stream.
pub async fn send_video(&mut self, data: Bytes) -> NetResult<()> {
    self.send_message(RtmpMessage::Video(data), chunk_stream_id::VIDEO)
        .await
}
/// Encodes `message`, splits it into chunks for chunk stream `csid`, and
/// writes the chunks to the socket.
///
/// The message header carries the freshly updated session timestamp and the
/// session's current stream ID.
pub async fn send_message(&mut self, message: RtmpMessage, csid: u32) -> NetResult<()> {
    let body = self.encode_message_payload(&message)?;

    self.session.update_timestamp();
    let header = MessageHeader::new(
        self.session.timestamp,
        body.len() as u32,
        message.type_id(),
        self.session.stream_id,
    );

    let wire_bytes = self.chunk_stream.encode_message(csid, &header, &body);
    self.write_bytes(&wire_bytes).await
}
/// Produces the payload bytes for `message`.
///
/// Control, command and data messages are serialised; audio, video and
/// unknown messages already hold their payload, which is cloned.
fn encode_message_payload(&self, message: &RtmpMessage) -> NetResult<Bytes> {
    match message {
        RtmpMessage::Command(cmd) => self.encode_command(cmd),
        RtmpMessage::Data(data) => self.encode_data(data),
        RtmpMessage::Control(ctrl) => Ok(ctrl.encode()),
        RtmpMessage::Audio(raw)
        | RtmpMessage::Video(raw)
        | RtmpMessage::Unknown { payload: raw, .. } => Ok(raw.clone()),
    }
}
/// Serialises a command as: name, transaction ID, command object (or null
/// when absent), then each argument in order.
fn encode_command(&self, cmd: &CommandMessage) -> NetResult<Bytes> {
    let mut enc = AmfEncoder::new();
    enc.encode(&AmfValue::String(cmd.name.clone()));
    enc.encode(&AmfValue::Number(cmd.transaction_id));
    match &cmd.command_object {
        Some(obj) => {
            enc.encode(obj);
        }
        None => {
            enc.encode(&AmfValue::Null);
        }
    }
    cmd.args.iter().for_each(|arg| {
        enc.encode(arg);
    });
    Ok(enc.finish())
}

/// Serialises a data message as: handler name, then each value in order.
fn encode_data(&self, data: &DataMessage) -> NetResult<Bytes> {
    let mut enc = AmfEncoder::new();
    enc.encode(&AmfValue::String(data.handler.clone()));
    data.values.iter().for_each(|value| {
        enc.encode(value);
    });
    Ok(enc.finish())
}
/// Performs one socket read and returns every message the chunk decoder
/// assembled from it (possibly none).
///
/// Control messages are acted on before being returned alongside the rest.
/// The read buffer is cleared once the chunk decoder has accepted its contents.
pub async fn read_messages(&mut self) -> NetResult<Vec<RtmpMessage>> {
    self.read_chunk_data().await?;

    let assembled = self.chunk_stream.process_chunk(&self.read_buffer)?;
    self.read_buffer.clear();

    let mut decoded = Vec::new();
    for raw in assembled {
        let message = self.decode_message(raw)?;
        if let RtmpMessage::Control(ctrl) = &message {
            self.handle_control_message(ctrl).await?;
        }
        decoded.push(message);
    }
    Ok(decoded)
}
/// Reads once from the socket and appends the bytes to the read buffer.
///
/// The temporary buffer is sized at four times the current chunk size, but
/// clamped to a sane range. That size follows a value the peer can set, so an
/// unclamped zero would yield a zero-length read (misreported as EOF) and a
/// huge value would trigger an enormous allocation.
///
/// # Errors
///
/// Fails when there is no socket, the read fails or times out, or the peer
/// closed the connection (`NetError::Eof`).
async fn read_chunk_data(&mut self) -> NetResult<()> {
    const MIN_READ_BUFFER: usize = 4 * 1024;
    const MAX_READ_BUFFER: usize = 1024 * 1024;

    let capacity = (self.chunk_size as usize)
        .saturating_mul(4)
        .clamp(MIN_READ_BUFFER, MAX_READ_BUFFER);

    let stream = self
        .stream
        .as_mut()
        .ok_or_else(|| NetError::invalid_state("No active connection"))?;

    let mut temp_buf = vec![0u8; capacity];
    let n = timeout(self.config.read_timeout, stream.read(&mut temp_buf))
        .await
        .map_err(|_| NetError::timeout("Read timeout"))?
        .map_err(|e| NetError::connection(format!("Read failed: {e}")))?;
    if n == 0 {
        // The buffer is never empty, so a zero-byte read means end of stream.
        return Err(NetError::Eof);
    }

    self.read_buffer.extend_from_slice(&temp_buf[..n]);
    self.session.bytes_received += n as u64;
    Ok(())
}
/// Converts an assembled message into an `RtmpMessage` based on its type ID.
///
/// # Errors
///
/// Returns a protocol error for a type ID that `MessageType::from_id` does
/// not recognise, or whatever error the payload decoder reports.
fn decode_message(&self, msg: AssembledMessage) -> NetResult<RtmpMessage> {
    let type_id = msg.header.message_type;
    let Some(msg_type) = MessageType::from_id(type_id) else {
        return Err(NetError::protocol(format!(
            "Unknown message type: {}",
            msg.header.message_type
        )));
    };

    if msg_type.is_control() {
        let ctrl = ControlMessage::decode(msg_type, &msg.payload)?;
        return Ok(RtmpMessage::Control(ctrl));
    }
    if msg_type.is_command() {
        return self.decode_command(&msg.payload);
    }
    if msg_type == MessageType::DataAmf0 {
        return self.decode_data(&msg.payload);
    }
    if msg_type == MessageType::Audio {
        return Ok(RtmpMessage::Audio(msg.payload));
    }
    if msg_type == MessageType::Video {
        return Ok(RtmpMessage::Video(msg.payload));
    }

    // Recognised type with no dedicated representation: keep the raw payload.
    Ok(RtmpMessage::Unknown {
        type_id,
        payload: msg.payload,
    })
}
/// Decodes a command payload: name (string), transaction ID (number), an
/// optional command object, then any remaining values as arguments.
///
/// # Errors
///
/// Returns an encoding error when the name is not a string or the
/// transaction ID is not a number; decoder errors propagate.
fn decode_command(&self, data: &[u8]) -> NetResult<RtmpMessage> {
    let mut dec = AmfDecoder::new(data);

    let name_value = dec.decode()?;
    let Some(name) = name_value.as_str() else {
        return Err(NetError::encoding("Command name must be string"));
    };
    let name = name.to_string();

    let Some(transaction_id) = dec.decode()?.as_number() else {
        return Err(NetError::encoding("Transaction ID must be number"));
    };

    // The value right after the transaction ID, when present, is the command object.
    let command_object = match dec.has_remaining() {
        true => Some(dec.decode()?),
        false => None,
    };

    let mut args = Vec::new();
    loop {
        if !dec.has_remaining() {
            break;
        }
        args.push(dec.decode()?);
    }

    Ok(RtmpMessage::Command(CommandMessage {
        name,
        transaction_id,
        command_object,
        args,
    }))
}

/// Decodes a data payload: handler name (string) followed by any values.
///
/// # Errors
///
/// Returns an encoding error when the handler is not a string; decoder
/// errors propagate.
fn decode_data(&self, data: &[u8]) -> NetResult<RtmpMessage> {
    let mut dec = AmfDecoder::new(data);

    let handler_value = dec.decode()?;
    let Some(handler) = handler_value.as_str() else {
        return Err(NetError::encoding("Handler must be string"));
    };
    let handler = handler.to_string();

    let mut values = Vec::new();
    loop {
        if !dec.has_remaining() {
            break;
        }
        values.push(dec.decode()?);
    }

    Ok(RtmpMessage::Data(DataMessage { handler, values }))
}
async fn handle_control_message(&mut self, ctrl: &ControlMessage) -> NetResult<()> {
match ctrl {
ControlMessage::SetChunkSize(size) => {
self.chunk_stream.set_rx_chunk_size(*size);
self.chunk_size = *size;
}
ControlMessage::WindowAckSize(size) => {
self.session.server_bandwidth = Some(*size);
}
ControlMessage::SetPeerBandwidth { size, .. } => {
self.session.client_bandwidth = Some(*size);
let ack = ControlMessage::WindowAckSize(*size);
let msg = RtmpMessage::Control(ack);
self.send_message(msg, chunk_stream_id::PROTOCOL_CONTROL)
.await?;
}
ControlMessage::UserControl { .. } => {
}
_ => {}
}
Ok(())
}
/// Writes all of `data` to the socket, bounded by the write timeout, and
/// adds its length to the sent-bytes counter.
///
/// # Errors
///
/// Fails when there is no socket, or the write fails or times out.
async fn write_bytes(&mut self, data: &[u8]) -> NetResult<()> {
    let Some(socket) = self.stream.as_mut() else {
        return Err(NetError::invalid_state("No active connection"));
    };

    let write_result = timeout(self.config.write_timeout, socket.write_all(data))
        .await
        .map_err(|_| NetError::timeout("Write timeout"))?;
    write_result.map_err(|e| NetError::connection(format!("Write failed: {e}")))?;

    self.session.bytes_sent += data.len() as u64;
    Ok(())
}
/// Shuts down and drops the socket (if any) and marks the client
/// `Disconnected`.
///
/// Shutdown is best-effort: its error is ignored because the socket is being
/// discarded anyway. Unparsed bytes left in the read buffer are dropped so a
/// later connection on the same client does not start from stale data.
pub async fn close(&mut self) -> NetResult<()> {
    if let Some(mut stream) = self.stream.take() {
        let _ = stream.shutdown().await;
    }
    self.read_buffer.clear();
    self.state = ConnectionState::Disconnected;
    Ok(())
}
/// Wraps `metadata` in an AMF object and sends it as an on-metadata data
/// message.
pub async fn send_metadata(&mut self, metadata: HashMap<String, AmfValue>) -> NetResult<()> {
    let payload = AmfValue::Object(metadata);
    self.send_data(DataMessage::on_metadata(payload)).await
}
pub async fn set_chunk_size(&mut self, size: u32) -> NetResult<()> {
let ctrl = ControlMessage::SetChunkSize(size);
let msg = RtmpMessage::Control(ctrl);
self.send_message(msg, chunk_stream_id::PROTOCOL_CONTROL)
.await?;
self.chunk_stream.set_tx_chunk_size(size);
self.chunk_size = size;
Ok(())
}
/// Sends an Acknowledgement control message carrying `sequence`.
pub async fn send_acknowledgement(&mut self, sequence: u32) -> NetResult<()> {
    let message = RtmpMessage::Control(ControlMessage::Acknowledgement(sequence));
    self.send_message(message, chunk_stream_id::PROTOCOL_CONTROL)
        .await
}

/// Sends a Window Acknowledgement Size control message carrying `size`.
pub async fn send_window_ack_size(&mut self, size: u32) -> NetResult<()> {
    let message = RtmpMessage::Control(ControlMessage::WindowAckSize(size));
    self.send_message(message, chunk_stream_id::PROTOCOL_CONTROL)
        .await
}

/// Sends a Set Peer Bandwidth control message with `size` and `limit_type`.
pub async fn send_peer_bandwidth(&mut self, size: u32, limit_type: u8) -> NetResult<()> {
    let message = RtmpMessage::Control(ControlMessage::SetPeerBandwidth { size, limit_type });
    self.send_message(message, chunk_stream_id::PROTOCOL_CONTROL)
        .await
}
/// Sends a User Control message with the given event, data and optional
/// extra value on the protocol-control chunk stream.
#[allow(clippy::too_many_arguments)]
pub async fn send_user_control_event(
    &mut self,
    event: super::message::UserControlEvent,
    data: u32,
    extra: Option<u32>,
) -> NetResult<()> {
    let message = RtmpMessage::Control(ControlMessage::UserControl { event, data, extra });
    self.send_message(message, chunk_stream_id::PROTOCOL_CONTROL)
        .await
}
/// Sends a `StreamBegin` user-control event whose data is `stream_id`.
pub async fn send_stream_begin(&mut self, stream_id: u32) -> NetResult<()> {
self.send_user_control_event(
super::message::UserControlEvent::StreamBegin,
stream_id,
None,
)
.await
}
/// Sends a `PingRequest` user-control event whose data is `timestamp`.
pub async fn send_ping_request(&mut self, timestamp: u32) -> NetResult<()> {
self.send_user_control_event(
super::message::UserControlEvent::PingRequest,
timestamp,
None,
)
.await
}
/// Sends a `PingResponse` user-control event whose data is `timestamp`.
pub async fn send_ping_response(&mut self, timestamp: u32) -> NetResult<()> {
self.send_user_control_event(
super::message::UserControlEvent::PingResponse,
timestamp,
None,
)
.await
}
pub async fn receive_message(&mut self) -> NetResult<RtmpMessage> {
let messages = self.read_messages().await?;
messages
.into_iter()
.next()
.ok_or_else(|| NetError::protocol("No message received"))
}
/// Performs one `read_messages` call and keeps only the messages for which
/// `filter` returns `true`, preserving their order.
pub async fn receive_messages_filtered<F>(
    &mut self,
    mut filter: F,
) -> NetResult<Vec<RtmpMessage>>
where
    F: FnMut(&RtmpMessage) -> bool,
{
    let mut kept = self.read_messages().await?;
    kept.retain(|message| filter(message));
    Ok(kept)
}
/// Flushes the socket, bounded by the write timeout. Does nothing when no
/// socket is open.
///
/// # Errors
///
/// Fails when the flush fails or times out.
pub async fn flush(&mut self) -> NetResult<()> {
    let Some(socket) = self.stream.as_mut() else {
        return Ok(());
    };

    let flush_result = timeout(self.config.write_timeout, socket.flush())
        .await
        .map_err(|_| NetError::timeout("Flush timeout"))?;
    flush_result.map_err(|e| NetError::connection(format!("Flush failed: {e}")))?;
    Ok(())
}
/// Returns the URL passed to the most recent `connect` call, if any.
#[must_use]
pub const fn url(&self) -> Option<&RtmpUrl> {
self.url.as_ref()
}
/// Returns the total number of bytes written to the socket this session.
#[must_use]
pub const fn bytes_sent(&self) -> u64 {
self.session.bytes_sent
}
/// Returns the total number of bytes read from the socket this session.
#[must_use]
pub const fn bytes_received(&self) -> u64 {
self.session.bytes_received
}
/// Returns the stream ID used in outgoing message headers.
#[must_use]
pub const fn stream_id(&self) -> u32 {
self.session.stream_id
}
}
impl Default for RtmpClient {
/// Equivalent to [`RtmpClient::new`].
fn default() -> Self {
Self::new()
}
}
/// Fluent builder that assembles an `RtmpClientConfig` and creates the client.
#[derive(Debug, Clone)]
pub struct RtmpClientBuilder {
    /// Configuration accumulated so far.
    config: RtmpClientConfig,
}

impl RtmpClientBuilder {
    /// Starts from the default configuration.
    #[must_use]
    pub fn new() -> Self {
        let config = RtmpClientConfig::default();
        Self { config }
    }

    /// Sets the TCP connect timeout.
    #[must_use]
    pub const fn connect_timeout(self, timeout: Duration) -> Self {
        let mut builder = self;
        builder.config.connect_timeout = timeout;
        builder
    }

    /// Sets the per-read timeout.
    #[must_use]
    pub const fn read_timeout(self, timeout: Duration) -> Self {
        let mut builder = self;
        builder.config.read_timeout = timeout;
        builder
    }

    /// Sets the per-write timeout.
    #[must_use]
    pub const fn write_timeout(self, timeout: Duration) -> Self {
        let mut builder = self;
        builder.config.write_timeout = timeout;
        builder
    }

    /// Sets the initial chunk size.
    #[must_use]
    pub const fn chunk_size(self, size: u32) -> Self {
        let mut builder = self;
        builder.config.chunk_size = size;
        builder
    }

    /// Sets the window acknowledgement size.
    #[must_use]
    pub const fn window_ack_size(self, size: u32) -> Self {
        let mut builder = self;
        builder.config.window_ack_size = size;
        builder
    }

    /// Enables or disables extended timestamps.
    #[must_use]
    pub const fn extended_timestamp(self, enabled: bool) -> Self {
        let mut builder = self;
        builder.config.extended_timestamp = enabled;
        builder
    }

    /// Consumes the builder and creates a client with the accumulated settings.
    #[must_use]
    pub fn build(self) -> RtmpClient {
        let Self { config } = self;
        RtmpClient::with_config(config)
    }
}

impl Default for RtmpClientBuilder {
    /// Equivalent to [`RtmpClientBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Application name and stream key identifying a publish target.
#[derive(Debug, Clone)]
pub struct StreamKey {
    /// Application name (first URL path segment).
    pub app: String,
    /// Stream key (remainder of the URL path; may be empty).
    pub key: String,
}

impl StreamKey {
    /// Creates a stream key from its two parts.
    pub fn new(app: impl Into<String>, key: impl Into<String>) -> Self {
        Self {
            app: app.into(),
            key: key.into(),
        }
    }

    /// Extracts the application and key from `rtmp(s)://host[:port]/app[/key]`.
    ///
    /// Surrounding whitespace is ignored, matching `RtmpUrl::parse`. The key
    /// is everything after the application segment and may contain `/`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the original `url` when the scheme is not
    /// `rtmp`/`rtmps`, there is no path, or the application segment is empty.
    pub fn from_url(url: &str) -> Result<Self, String> {
        let trimmed = url.trim();
        let rest = trimmed
            .strip_prefix("rtmp://")
            .or_else(|| trimmed.strip_prefix("rtmps://"))
            .ok_or_else(|| format!("URL must start with rtmp:// or rtmps://: {url}"))?;

        let (_authority, path) = rest
            .split_once('/')
            .ok_or_else(|| format!("URL must contain application path: {url}"))?;

        let (app, key) = path.split_once('/').unwrap_or((path, ""));
        if app.is_empty() {
            return Err(format!("URL must contain app name: {url}"));
        }
        Ok(Self::new(app, key))
    }
}
/// Publish type sent with a publish command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishMode {
    /// Encoded as `"live"`.
    Live,
    /// Encoded as `"record"`.
    Record,
    /// Encoded as `"append"`.
    Append,
}

impl PublishMode {
    /// Returns the lowercase wire name of the mode.
    pub fn as_str(self) -> &'static str {
        // Indexed by declaration order of the variants above.
        const WIRE_NAMES: [&str; 3] = ["live", "record", "append"];
        WIRE_NAMES[self as usize]
    }
}
/// Traffic counters kept by `SimpleRtmpClient`; all fields start at zero.
#[derive(Debug, Clone, Default)]
pub struct RtmpClientStats {
/// Sum of the byte counts passed to `record_send`.
pub bytes_sent: u64,
/// Sum of the byte counts passed to `record_receive`.
pub bytes_received: u64,
/// Number of `record_send` calls.
pub packets_sent: u64,
/// Number of `record_receive` calls.
pub packets_received: u64,
/// Dropped-frame counter.
/// NOTE(review): never updated by code visible in this file.
pub dropped_frames: u64,
/// Connection duration in milliseconds.
/// NOTE(review): never updated by code visible in this file.
pub connection_duration_ms: u64,
}
/// Lifecycle state of a `SimpleRtmpClient`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClientState {
    /// Not connected.
    Disconnected,
    /// Connection attempt in progress.
    Connecting,
    /// Handshake in progress.
    Handshaking,
    /// Connected and idle.
    Connected,
    /// Publishing a stream.
    Publishing,
    /// Playing a stream.
    Playing,
    /// Tear-down in progress.
    Disconnecting,
    /// Failed.
    Error,
}

impl ClientState {
    /// Returns `true` for `Connected`, `Publishing` and `Playing`.
    pub fn is_active(self) -> bool {
        match self {
            Self::Connected | Self::Publishing | Self::Playing => true,
            Self::Disconnected
            | Self::Connecting
            | Self::Handshaking
            | Self::Disconnecting
            | Self::Error => false,
        }
    }

    /// Returns `true` only for `Connected`.
    pub fn can_publish(self) -> bool {
        self == Self::Connected
    }
}
/// Category tag carried by an `RtmpMediaPacket`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtmpMediaPacketType {
/// Video keyframe.
VideoKeyframe,
/// Video inter-frame.
VideoInterframe,
/// AAC audio.
AudioAAC,
/// Opus audio.
AudioOpus,
/// Stream metadata.
Metadata,
}
/// A media payload plus the fields `build_video_message` needs to frame it.
#[derive(Debug, Clone)]
pub struct RtmpMediaPacket {
/// Category of the payload.
pub packet_type: RtmpMediaPacketType,
/// Timestamp in milliseconds.
pub timestamp_ms: u32,
/// Raw payload bytes.
pub data: Vec<u8>,
/// Selects the keyframe flag byte when building a video message.
pub is_keyframe: bool,
/// Composition time offset; only its low 24 bits are written to video messages.
pub composition_time_offset: i32,
}
/// Socket-free client model: tracks state and statistics and builds message
/// payloads, without performing any I/O in the code visible here.
pub struct SimpleRtmpClient {
// Configuration supplied at construction.
config: RtmpClientConfig,
// Current simulated lifecycle state.
state: ClientState,
// Traffic counters updated by `record_send` / `record_receive`.
stats: RtmpClientStats,
// Stream key set by `simulate_connect`, cleared by `simulate_disconnect`.
stream_key: Option<StreamKey>,
// Wrapping counter handed out by `next_sequence`.
chunk_sequence_number: u32,
}
impl SimpleRtmpClient {
/// Creates a disconnected client with zeroed statistics and no stream key.
pub fn new(config: RtmpClientConfig) -> Self {
Self {
config,
state: ClientState::Disconnected,
stats: RtmpClientStats::default(),
stream_key: None,
chunk_sequence_number: 0,
}
}
/// Creates a client using `RtmpClientConfig::default()`.
pub fn with_default_config() -> Self {
Self::new(RtmpClientConfig::default())
}
/// Returns the current state.
pub fn state(&self) -> ClientState {
self.state
}
/// Returns the traffic counters.
pub fn stats(&self) -> &RtmpClientStats {
&self.stats
}
/// Returns the stream key set by `simulate_connect`, if any.
pub fn stream_key(&self) -> Option<&StreamKey> {
self.stream_key.as_ref()
}
/// Builds the AMF0 payload of a connect command.
///
/// Layout: string `"connect"`, number `1.0`, then an object with the string
/// properties `app`, `tcUrl` and `type` (`"nonprivate"`), in that order.
pub fn build_connect_message(&self, app: &str, tc_url: &str) -> Vec<u8> {
    let mut payload = amf0::encode_string("connect");
    payload.extend(amf0::encode_number(1.0));

    payload.extend(amf0::start_object());
    for (key, value) in [("app", app), ("tcUrl", tc_url), ("type", "nonprivate")] {
        payload.extend(amf0::encode_property(key, &amf0::encode_string(value)));
    }
    payload.extend(amf0::end_object());

    payload
}
/// Builds the AMF0 payload of a createStream command: string
/// `"createStream"`, the transaction ID, then null.
pub fn build_create_stream_message(&self, transaction_id: f64) -> Vec<u8> {
    [
        amf0::encode_string("createStream"),
        amf0::encode_number(transaction_id),
        amf0::encode_null(),
    ]
    .concat()
}
/// Builds the AMF0 payload of a publish command: string `"publish"`, number
/// `0.0`, null, the stream name, then the publish type's wire name.
///
/// `stream_id` is accepted but not encoded into the payload.
pub fn build_publish_message(
    &self,
    stream_id: u32,
    stream_name: &str,
    publish_type: PublishMode,
) -> Vec<u8> {
    let _ = stream_id;
    [
        amf0::encode_string("publish"),
        amf0::encode_number(0.0),
        amf0::encode_null(),
        amf0::encode_string(stream_name),
        amf0::encode_string(publish_type.as_str()),
    ]
    .concat()
}
/// Builds a video message body from `packet`.
///
/// Layout: 4-byte big-endian timestamp, one flag byte (`0x17` for keyframes,
/// `0x27` otherwise), the low 24 bits of the composition time offset in
/// big-endian order, then the packet data.
///
/// `stream_id` is accepted but not encoded into the body.
// NOTE(review): the flag bytes look like FLV frame-type/codec-id values — confirm against the consumer.
pub fn build_video_message(&self, packet: &RtmpMediaPacket, stream_id: u32) -> Vec<u8> {
    let _ = stream_id;

    let frame_flags: u8 = if packet.is_keyframe { 0x17 } else { 0x27 };
    // Dropping the most significant byte leaves the low 24 bits, big-endian.
    let composition_time = packet.composition_time_offset.to_be_bytes();

    let mut body = Vec::new();
    body.extend_from_slice(&packet.timestamp_ms.to_be_bytes());
    body.push(frame_flags);
    body.extend_from_slice(&composition_time[1..]);
    body.extend_from_slice(&packet.data);
    body
}
/// Builds the AMF0 payload of a metadata message.
///
/// Layout: string `"@setDataFrame"`, string `"onMetaData"`, then an object
/// with the numeric properties `width`, `height` and `framerate`, in that
/// order.
///
/// `stream_id` is accepted but not encoded into the payload.
pub fn build_metadata_message(
    &self,
    width: u32,
    height: u32,
    frame_rate: f64,
    stream_id: u32,
) -> Vec<u8> {
    let _ = stream_id;

    let numeric_properties = [
        ("width", f64::from(width)),
        ("height", f64::from(height)),
        ("framerate", frame_rate),
    ];

    let mut payload = amf0::encode_string("@setDataFrame");
    payload.extend(amf0::encode_string("onMetaData"));
    payload.extend(amf0::start_object());
    for (key, value) in numeric_properties {
        payload.extend(amf0::encode_property(key, &amf0::encode_number(value)));
    }
    payload.extend(amf0::end_object());
    payload
}
/// Adds `bytes` to the sent-bytes counter and counts one sent packet.
pub fn record_send(&mut self, bytes: u64) {
self.stats.bytes_sent += bytes;
self.stats.packets_sent += 1;
}
/// Adds `bytes` to the received-bytes counter and counts one received packet.
pub fn record_receive(&mut self, bytes: u64) {
self.stats.bytes_received += bytes;
self.stats.packets_received += 1;
}
/// Marks the client `Connected` and stores `stream_key`; no I/O is performed.
pub fn simulate_connect(&mut self, stream_key: StreamKey) {
self.state = ClientState::Connected;
self.stream_key = Some(stream_key);
}
/// Moves from `Connected` to `Publishing`; does nothing in any other state.
pub fn simulate_publish(&mut self) {
if self.state == ClientState::Connected {
self.state = ClientState::Publishing;
}
}
/// Marks the client `Disconnected` and forgets the stream key.
pub fn simulate_disconnect(&mut self) {
self.state = ClientState::Disconnected;
self.stream_key = None;
}
/// Returns the current sequence number and advances it, wrapping on overflow.
fn next_sequence(&mut self) -> u32 {
let n = self.chunk_sequence_number;
self.chunk_sequence_number = self.chunk_sequence_number.wrapping_add(1);
n
}
}
/// Minimal AMF0 encoders that return freshly allocated byte vectors.
pub mod amf0 {
    const NUMBER_TYPE: u8 = 0x00;
    const BOOL_TYPE: u8 = 0x01;
    const STRING_TYPE: u8 = 0x02;
    const OBJECT_TYPE: u8 = 0x03;
    const NULL_TYPE: u8 = 0x05;
    /// Marker for strings whose length does not fit the 16-bit short form.
    const LONG_STRING_TYPE: u8 = 0x0C;
    /// Empty key (two zero length bytes) followed by the object-end marker.
    const OBJECT_END_MARKER: [u8; 3] = [0x00, 0x00, 0x09];

    /// Encodes `s` as an AMF0 string value.
    ///
    /// Strings of up to 65535 bytes use the short form (marker `0x02`,
    /// 16-bit big-endian length). Longer strings use the long-string form
    /// (marker `0x0C`, 32-bit big-endian length), so the length prefix always
    /// matches the bytes that follow. Input beyond `u32::MAX` bytes is cut
    /// off at that limit.
    pub fn encode_string(s: &str) -> Vec<u8> {
        let bytes = s.as_bytes();
        match u16::try_from(bytes.len()) {
            Ok(short_len) => {
                let mut buf = Vec::with_capacity(3 + bytes.len());
                buf.push(STRING_TYPE);
                buf.extend_from_slice(&short_len.to_be_bytes());
                buf.extend_from_slice(bytes);
                buf
            }
            Err(_) => {
                let long_len = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
                let body = &bytes[..long_len as usize];
                let mut buf = Vec::with_capacity(5 + body.len());
                buf.push(LONG_STRING_TYPE);
                buf.extend_from_slice(&long_len.to_be_bytes());
                buf.extend_from_slice(body);
                buf
            }
        }
    }

    /// Encodes `n` as an AMF0 number: marker `0x00` plus the 8-byte
    /// big-endian IEEE-754 representation.
    pub fn encode_number(n: f64) -> Vec<u8> {
        let mut buf = Vec::with_capacity(9);
        buf.push(NUMBER_TYPE);
        buf.extend_from_slice(&n.to_bits().to_be_bytes());
        buf
    }

    /// Encodes `b` as an AMF0 boolean: marker `0x01` plus `1` or `0`.
    pub fn encode_bool(b: bool) -> Vec<u8> {
        vec![BOOL_TYPE, u8::from(b)]
    }

    /// Encodes the AMF0 null value (marker `0x05`).
    pub fn encode_null() -> Vec<u8> {
        vec![NULL_TYPE]
    }

    /// Returns the marker that opens an AMF0 object (`0x03`).
    pub fn start_object() -> Vec<u8> {
        vec![OBJECT_TYPE]
    }

    /// Returns the three bytes that close an AMF0 object.
    pub fn end_object() -> Vec<u8> {
        OBJECT_END_MARKER.to_vec()
    }

    /// Encodes one object property: 16-bit big-endian key length, the key
    /// bytes, then the already-encoded `value`.
    ///
    /// Property keys have no long form. A key longer than 65535 bytes is
    /// therefore truncated at the nearest character boundary that fits, so
    /// the length prefix and key bytes stay consistent.
    pub fn encode_property(key: &str, value: &[u8]) -> Vec<u8> {
        let mut end = key.len().min(usize::from(u16::MAX));
        while !key.is_char_boundary(end) {
            end -= 1;
        }
        let key_bytes = &key.as_bytes()[..end];
        // `end` is at most u16::MAX here, so the conversion cannot fail.
        let key_len = u16::try_from(end).unwrap_or(u16::MAX);

        let mut buf = Vec::with_capacity(2 + key_bytes.len() + value.len());
        buf.extend_from_slice(&key_len.to_be_bytes());
        buf.extend_from_slice(key_bytes);
        buf.extend_from_slice(value);
        buf
    }
}
// Unit tests for the socket-free pieces of this file: `StreamKey`,
// `PublishMode`, `ClientState`, `SimpleRtmpClient` and the `amf0` encoders.
#[cfg(test)]
mod client_spec_tests {
use super::*;
// `StreamKey::new` stores both parts unchanged.
#[test]
fn test_stream_key_new() {
let sk = StreamKey::new("live", "stream1");
assert_eq!(sk.app, "live");
assert_eq!(sk.key, "stream1");
}
// `StreamKey::from_url` splits the path into app and key.
#[test]
fn test_stream_key_from_url() {
let sk =
StreamKey::from_url("rtmp://localhost/live/stream1").expect("should succeed in test");
assert_eq!(sk.app, "live");
assert_eq!(sk.key, "stream1");
}
// Each publish mode maps to its lowercase wire name.
#[test]
fn test_publish_mode_str() {
assert_eq!(PublishMode::Live.as_str(), "live");
assert_eq!(PublishMode::Record.as_str(), "record");
assert_eq!(PublishMode::Append.as_str(), "append");
}
// Only Connected, Publishing and Playing count as active.
#[test]
fn test_client_state_is_active() {
assert!(!ClientState::Disconnected.is_active());
assert!(!ClientState::Connecting.is_active());
assert!(!ClientState::Handshaking.is_active());
assert!(ClientState::Connected.is_active());
assert!(ClientState::Publishing.is_active());
assert!(ClientState::Playing.is_active());
assert!(!ClientState::Disconnecting.is_active());
assert!(!ClientState::Error.is_active());
}
// Simulated connect -> publish -> disconnect walks through the expected states.
#[test]
fn test_client_simulate_lifecycle() {
let mut client = SimpleRtmpClient::with_default_config();
assert_eq!(client.state(), ClientState::Disconnected);
client.simulate_connect(StreamKey::new("live", "test"));
assert_eq!(client.state(), ClientState::Connected);
assert!(client.stream_key().is_some());
client.simulate_publish();
assert_eq!(client.state(), ClientState::Publishing);
client.simulate_disconnect();
assert_eq!(client.state(), ClientState::Disconnected);
assert!(client.stream_key().is_none());
}
// The connect payload starts with the AMF0 string marker.
#[test]
fn test_build_connect_message_not_empty() {
let client = SimpleRtmpClient::with_default_config();
let msg = client.build_connect_message("live", "rtmp://localhost/live");
assert!(!msg.is_empty());
assert_eq!(msg[0], 0x02);
}
// The video body starts with the big-endian timestamp.
#[test]
fn test_build_video_message() {
let client = SimpleRtmpClient::with_default_config();
let packet = RtmpMediaPacket {
packet_type: RtmpMediaPacketType::VideoKeyframe,
timestamp_ms: 1234,
data: vec![0xAA, 0xBB, 0xCC],
is_keyframe: true,
composition_time_offset: 0,
};
let msg = client.build_video_message(&packet, 1);
assert_eq!(&msg[0..4], &1234u32.to_be_bytes());
assert!(!msg.is_empty());
}
// Short strings: marker, 16-bit big-endian length, then the bytes.
#[test]
fn test_amf0_string() {
let encoded = amf0::encode_string("hello");
assert_eq!(encoded[0], 0x02);
assert_eq!(encoded[1], 0x00);
assert_eq!(encoded[2], 0x05);
assert_eq!(&encoded[3..], b"hello");
}
// Numbers: marker plus eight bytes.
#[test]
fn test_amf0_number() {
let encoded = amf0::encode_number(1.0);
assert_eq!(encoded.len(), 9);
assert_eq!(encoded[0], 0x00);
}
// `record_send` accumulates bytes and counts packets.
#[test]
fn test_record_stats() {
let mut client = SimpleRtmpClient::with_default_config();
assert_eq!(client.stats().bytes_sent, 0);
client.record_send(1024);
assert_eq!(client.stats().bytes_sent, 1024);
assert_eq!(client.stats().packets_sent, 1);
client.record_send(512);
assert_eq!(client.stats().bytes_sent, 1536);
assert_eq!(client.stats().packets_sent, 2);
}
}