use alloc::borrow::ToOwned;
use alloc::collections::VecDeque;
use alloc::string::String;
use core::fmt;
use crate::bytes::Buf;
use crate::error::Error;
use crate::media::{AudioFrame, VideoFrame};
use crate::rtmp_chunk::{RtmpChunkSize, RtmpChunkStreamId};
use crate::rtmp_command::RtmpCommand;
use crate::rtmp_message::RtmpMessage;
use crate::rtmp_message_decoder::RtmpMessageDecoder;
use crate::rtmp_message_encoder::RtmpMessageEncoder;
#[cfg(doc)]
use crate::rtmp_server_connection::RtmpServerConnection;
use crate::rtmp_user_control_event::RtmpUserControlEvent;
/// Configuration values for an RTMP connection.
///
/// NOTE(review): how these values are applied to a connection is not visible in this file;
/// confirm against the connection implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtmpConnectionOptions {
    /// Chunk size setting. The [`Default`] implementation uses
    /// `RtmpChunkSize::saturating_new(4096)`.
    pub chunk_size: RtmpChunkSize,

    /// Acknowledgement window size setting. The [`Default`] implementation uses `5_000_000`
    /// (presumably bytes — confirm against the RTMP specification).
    pub ack_window_size: u32,
}
impl Default for RtmpConnectionOptions {
fn default() -> Self {
Self {
chunk_size: RtmpChunkSize::saturating_new(4096),
ack_window_size: 5_000_000, }
}
}
/// Events reported by an RTMP connection.
///
/// NOTE(review): the code that emits most of these events is not visible in this file; the
/// per-variant notes below describe only the payload, not when each event fires.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RtmpConnectionEvent {
    /// A publish request, carrying the application name, the `tcUrl` and the stream name
    /// (presumably taken from the peer's connect/publish commands — confirm).
    PublishRequested {
        app: String,
        tc_url: String,
        stream_name: String,
    },

    /// A play request, carrying the application name, the `tcUrl` and the stream name
    /// (presumably taken from the peer's connect/play commands — confirm).
    PlayRequested {
        app: String,
        tc_url: String,
        stream_name: String,
    },

    /// An audio frame was received.
    AudioReceived(AudioFrame),

    /// A video frame was received.
    VideoReceived(VideoFrame),

    /// Carries a connection state (presumably the state just entered — confirm).
    StateChanged(RtmpConnectionState),

    /// The peer ended the connection; `reason` is a human-readable description.
    DisconnectedByPeer { reason: String },

    /// A command was ignored. `name` is the command name and `detail` its `Debug` rendering.
    CommandIgnored { name: String, detail: String },

    /// A message was ignored. `name` is the `Debug` rendering of the message type and
    /// `detail` the `Debug` rendering of the whole message.
    MessageIgnored { name: String, detail: String },

    /// A user control event was ignored. `name` is the event name and `detail` its `Debug`
    /// rendering.
    UserControlEventIgnored { name: String, detail: String },
}
impl RtmpConnectionEvent {
pub(crate) fn command_ignored(command: &RtmpCommand) -> Self {
Self::CommandIgnored {
name: command.name().to_owned(),
detail: format!("{command:?}"),
}
}
pub(crate) fn message_ignored(message: &RtmpMessage) -> Self {
Self::MessageIgnored {
name: format!("{:?}", message.message_type()),
detail: format!("{message:?}"),
}
}
pub(crate) fn user_control_event_ignored(event: &RtmpUserControlEvent) -> Self {
Self::UserControlEventIgnored {
name: event.name().to_owned(),
detail: format!("{event:?}"),
}
}
}
/// State of an RTMP connection.
///
/// NOTE(review): the allowed transitions between states are implemented elsewhere; the
/// variant order here presumably mirrors the usual session progression — confirm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RtmpConnectionState {
    /// Initial state; this is the [`Default`] value. Displayed as `HANDSHAKING`.
    #[default]
    Handshaking,

    /// Displayed as `CONNECTING`.
    Connecting,

    /// Displayed as `CONNECTED`.
    Connected,

    /// Displayed as `MEDIA_STREAM_CREATED`.
    MediaStreamCreated,

    /// Displayed as `PUBLISH_PENDING`.
    PublishPending,

    /// Displayed as `PUBLISHING`.
    Publishing,

    /// Displayed as `PLAY_PENDING`.
    PlayPending,

    /// Displayed as `PLAYING`.
    Playing,

    /// Displayed as `DISCONNECTING`.
    Disconnecting,
}
impl RtmpConnectionState {
#[track_caller]
pub(crate) fn expect(self, expected: Self) -> Result<(), Error> {
if self == expected {
Ok(())
} else {
Err(Error::invalid_state(format!(
"expected connection state {expected}, but current state is {self}"
)))
}
}
}
impl fmt::Display for RtmpConnectionState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RtmpConnectionState::Handshaking => write!(f, "HANDSHAKING"),
RtmpConnectionState::Connecting => write!(f, "CONNECTING"),
RtmpConnectionState::Connected => write!(f, "CONNECTED"),
RtmpConnectionState::MediaStreamCreated => write!(f, "MEDIA_STREAM_CREATED"),
RtmpConnectionState::PublishPending => write!(f, "PUBLISH_PENDING"),
RtmpConnectionState::Publishing => write!(f, "PUBLISHING"),
RtmpConnectionState::PlayPending => write!(f, "PLAY_PENDING"),
RtmpConnectionState::Playing => write!(f, "PLAYING"),
RtmpConnectionState::Disconnecting => write!(f, "DISCONNECTING"),
}
}
}
/// Message-level channel: decodes received bytes into messages, encodes outgoing messages
/// into a send buffer, and keeps the byte counters used for acknowledgement bookkeeping.
///
/// All counters and window sizes start at `0` with [`Default`].
#[derive(Debug, Default)]
pub struct RtmpMessageChannel {
    /// Decoder fed with received bytes.
    decoder: RtmpMessageDecoder,

    /// Encoder used to serialise outgoing messages into `send_buf`.
    encoder: RtmpMessageEncoder,

    /// Decoded messages not yet handed out through `next_recv_message`.
    decoded_messages: VecDeque<RtmpMessage>,

    /// Encoded bytes waiting to be written out.
    send_buf: Buf,

    /// Window size that the received-byte acknowledgement check is compared against
    /// (an acknowledgement is due once more than half of it is unacknowledged).
    peer_ack_window_size: u32,

    /// Running count of received bytes; wraps around on `u32` overflow.
    total_bytes_received: u32,

    /// Value of `total_bytes_received` at the last acknowledgement that was reported.
    last_ack_sent: u32,

    /// Window size that the sent-byte check in `advance_send_buf` is compared against.
    local_ack_window_size: u32,

    /// Running count of sent bytes; wraps around on `u32` overflow.
    total_bytes_sent: u32,

    /// Last acknowledged byte count reported through `notify_ack_received`.
    last_ack_received: u32,
}
impl RtmpMessageChannel {
    /// Feeds bytes received from the peer into the decoder and queues every message that can
    /// now be decoded (retrieve them with [`Self::next_recv_message`]).
    ///
    /// Returns `Ok(Some(n))` when the number of bytes received since the last reported
    /// acknowledgement point exceeds half of the peer acknowledgement window; `n` is the
    /// running received-byte counter, which also becomes the new acknowledgement point.
    /// Returns `Ok(None)` otherwise.
    ///
    /// NOTE(review): the caller is presumably expected to send an Acknowledgement message
    /// carrying `n` — confirm against the connection implementation.
    ///
    /// # Errors
    ///
    /// Propagates any decoder error. The received-byte counter has already been updated and
    /// messages decoded before the failure stay queued.
    pub fn feed_recv_buf(&mut self, buf: &[u8]) -> Result<Option<u32>, Error> {
        self.decoder.feed_buf(buf);

        // The counter is a wrapping 32-bit sequence number, so the truncating `as` cast is
        // intentional: it is equivalent to adding `buf.len()` modulo 2^32.
        self.total_bytes_received = self.total_bytes_received.wrapping_add(buf.len() as u32);

        while let Some(message) = self.decoder.decode()? {
            self.decoded_messages.push_back(message);
        }

        let unacked_bytes = self.total_bytes_received.wrapping_sub(self.last_ack_sent);
        if unacked_bytes > self.peer_ack_window_size / 2 {
            self.last_ack_sent = self.total_bytes_received;
            Ok(Some(self.total_bytes_received))
        } else {
            Ok(None)
        }
    }

    /// Returns the encoded bytes that are waiting to be written out.
    pub fn send_buf(&self) -> &[u8] {
        self.send_buf.get()
    }

    /// Records that `n` bytes of the send buffer were written and advances the buffer.
    ///
    /// Returns `true` while the number of sent-but-unacknowledged bytes is at most twice the
    /// local acknowledgement window, and `false` once it exceeds that limit.
    ///
    /// NOTE(review): `false` is presumably a hint that the peer has stopped acknowledging and
    /// the caller should throttle or give up — confirm against the callers.
    pub fn advance_send_buf(&mut self, n: usize) -> bool {
        // Wrapping 32-bit counter; the truncating cast is intentional (see `feed_recv_buf`).
        self.total_bytes_sent = self.total_bytes_sent.wrapping_add(n as u32);
        self.send_buf.advance(n);

        let unacked_bytes = self.total_bytes_sent.wrapping_sub(self.last_ack_received);

        // Saturate rather than multiply directly: a window of 2^31 or more would overflow
        // `u32` (panicking with overflow checks, or wrapping to a tiny limit without them).
        let unacked_limit = self.local_ack_window_size.saturating_mul(2);
        unacked_bytes <= unacked_limit
    }

    /// Pops the oldest decoded message, or returns `None` when none is queued.
    pub fn next_recv_message(&mut self) -> Option<RtmpMessage> {
        self.decoded_messages.pop_front()
    }

    /// Encodes `message` into the send buffer, on the chunk stream derived from the
    /// message's stream id.
    pub fn feed_send_message(&mut self, message: RtmpMessage) {
        let chunk_stream_id = RtmpChunkStreamId::from_message_stream_id(message.header().stream_id);
        self.encoder
            .encode(self.send_buf.inner_mut(), chunk_stream_id, message);
    }

    /// Sets the window size used by the sent-byte check in [`Self::advance_send_buf`].
    pub fn set_local_ack_window_size(&mut self, size: u32) {
        self.local_ack_window_size = size;
    }

    /// Sets the window size used by the acknowledgement check in [`Self::feed_recv_buf`].
    pub fn set_peer_ack_window_size(&mut self, size: u32) {
        self.peer_ack_window_size = size;
    }

    /// Records the byte count most recently acknowledged by the peer.
    pub fn notify_ack_received(&mut self, acked_bytes: u32) {
        self.last_ack_received = acked_bytes;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec::Vec;
    use crate::media::{
        AudioFormat, AudioFrame, AudioSampleRate, VideoCodec, VideoFrame, VideoFrameType,
    };
    use crate::rtmp_message::{RtmpMessageHeader, RtmpMessageStreamId};
    use crate::rtmp_message_decoder::RtmpMessageDecoder;
    use crate::rtmp_timestamp::{RtmpTimestamp, RtmpTimestampDelta};

    /// Encodes a fixed sequence of messages through the channel and checks that a fresh
    /// decoder reads back exactly the same sequence.
    ///
    /// NOTE(review): the `_regression_pbt` suffix suggests these inputs were found by
    /// property-based testing — confirm before changing any of the values.
    #[test]
    fn message_channel_send_roundtrip_regression_pbt() {
        // A chunk-size change to 1 comes first, followed by a video and an audio message.
        let set_chunk_size = RtmpMessage::SetChunkSize {
            header: RtmpMessageHeader {
                stream_id: RtmpMessageStreamId::PCM,
                timestamp: RtmpTimestamp::ZERO,
            },
            size: RtmpChunkSize::new(1).unwrap(),
        };
        let video = RtmpMessage::Video {
            header: RtmpMessageHeader {
                stream_id: RtmpMessageStreamId::new(65_597),
                timestamp: RtmpTimestamp::from_millis(1),
            },
            frame: VideoFrame {
                timestamp: RtmpTimestamp::from_millis(1),
                composition_timestamp_offset: RtmpTimestampDelta::ZERO,
                frame_type: VideoFrameType::KeyFrame,
                codec: VideoCodec::Jpeg,
                avc_packet_type: None,
                data: vec![],
            },
        };
        let audio = RtmpMessage::Audio {
            header: RtmpMessageHeader {
                stream_id: RtmpMessageStreamId::new(65_598),
                timestamp: RtmpTimestamp::from_millis(16_777_215),
            },
            frame: AudioFrame {
                timestamp: RtmpTimestamp::from_millis(16_777_215),
                format: AudioFormat::Mp3,
                sample_rate: AudioSampleRate::Khz5,
                is_8bit_sample: false,
                is_stereo: false,
                is_aac_sequence_header: false,
                data: vec![0],
            },
        };
        let expected = vec![set_chunk_size, video, audio];

        // Encode everything through the channel under test.
        let mut channel = RtmpMessageChannel::default();
        for message in expected.iter().cloned() {
            channel.feed_send_message(message);
        }
        let encoded = channel.send_buf().to_vec();

        // Decode with an independent decoder and compare.
        let mut decoder = RtmpMessageDecoder::default();
        decoder.feed_buf(&encoded);
        let mut actual = Vec::new();
        while let Some(message) = decoder.decode().unwrap() {
            actual.push(message);
        }
        assert_eq!(actual, expected);
    }
}