use alloc::borrow::ToOwned;
use alloc::collections::VecDeque;
use alloc::string::{String, ToString};
use crate::error::Error;
use crate::media::{AudioFrame, VideoFrame};
use crate::rtmp_command::{RtmpCommand, RtmpConnectCommand, RtmpResultCommand, TransactionId};
use crate::rtmp_connection::{
RtmpConnectionEvent, RtmpConnectionOptions, RtmpConnectionState, RtmpMessageChannel,
};
use crate::rtmp_handshake::RtmpClientHandshake;
use crate::rtmp_message::{RtmpMessage, RtmpMessageHeader, RtmpMessageStreamId};
use crate::rtmp_timestamp::RtmpTimestamp;
use crate::rtmp_url::RtmpUrl;
use crate::rtmp_user_control_event::RtmpUserControlEvent;
const FLASH_VER: &str = "FMLE/3.0 (compatible; FME/3.0)";
#[derive(Debug)]
pub struct RtmpPublishClientConnection {
inner: RtmpClientConnection,
}
impl RtmpPublishClientConnection {
pub fn new(url: RtmpUrl) -> Self {
Self {
inner: RtmpClientConnection::new(url),
}
}
pub fn feed_recv_buf(&mut self, buf: &[u8]) -> Result<(), Error> {
self.inner.feed_recv_buf(buf)?;
if self.inner.state == RtmpConnectionState::MediaStreamCreated {
self.publish()?;
}
Ok(())
}
pub fn send_buf(&self) -> &[u8] {
self.inner.send_buf()
}
pub fn advance_send_buf(&mut self, n: usize) {
self.inner.advance_send_buf(n)
}
pub fn state(&self) -> RtmpConnectionState {
self.inner.state
}
pub fn send_audio(&mut self, frame: AudioFrame) -> Result<(), Error> {
self.inner.state.expect(RtmpConnectionState::Publishing)?;
let message = RtmpMessage::Audio {
header: RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: frame.timestamp,
},
frame,
};
self.inner.message_channel.feed_send_message(message);
Ok(())
}
pub fn send_video(&mut self, frame: VideoFrame) -> Result<(), Error> {
self.inner.state.expect(RtmpConnectionState::Publishing)?;
let message = RtmpMessage::Video {
header: RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: frame.timestamp,
},
frame,
};
self.inner.message_channel.feed_send_message(message);
Ok(())
}
pub fn next_event(&mut self) -> Option<RtmpConnectionEvent> {
self.inner.next_event()
}
fn publish(&mut self) -> Result<(), Error> {
self.inner
.change_state(RtmpConnectionState::PublishPending)?;
let command = RtmpCommand::Publish(crate::rtmp_command::RtmpPublishCommand {
transaction_id: self.inner.next_transaction_id,
stream_name: self.inner.url.stream_name.clone(),
});
let message = command.into_message(RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: RtmpTimestamp::ZERO,
})?;
self.inner.message_channel.feed_send_message(message);
self.inner.next_transaction_id.increment();
Ok(())
}
}
#[derive(Debug)]
pub struct RtmpPlayClientConnection {
inner: RtmpClientConnection,
}
impl RtmpPlayClientConnection {
pub fn new(url: RtmpUrl) -> Self {
Self {
inner: RtmpClientConnection::new(url),
}
}
pub fn feed_recv_buf(&mut self, buf: &[u8]) -> Result<(), Error> {
self.inner.feed_recv_buf(buf)?;
if self.inner.state == RtmpConnectionState::MediaStreamCreated {
self.play()?;
}
Ok(())
}
pub fn send_buf(&self) -> &[u8] {
self.inner.send_buf()
}
pub fn advance_send_buf(&mut self, n: usize) {
self.inner.advance_send_buf(n)
}
pub fn state(&self) -> RtmpConnectionState {
self.inner.state
}
pub fn next_event(&mut self) -> Option<RtmpConnectionEvent> {
self.inner.next_event()
}
fn play(&mut self) -> Result<(), Error> {
self.inner.change_state(RtmpConnectionState::PlayPending)?;
let command = RtmpCommand::Play(crate::rtmp_command::RtmpPlayCommand {
transaction_id: self.inner.next_transaction_id,
stream_name: self.inner.url.stream_name.clone(),
start: -1.0, });
let message = command.into_message(RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: RtmpTimestamp::ZERO,
})?;
self.inner.message_channel.feed_send_message(message);
self.inner.next_transaction_id.increment();
Ok(())
}
}
#[derive(Debug)]
struct RtmpClientConnection {
options: RtmpConnectionOptions,
state: RtmpConnectionState,
event_queue: VecDeque<RtmpConnectionEvent>,
handshake: RtmpClientHandshake,
message_channel: RtmpMessageChannel,
next_transaction_id: TransactionId,
url: RtmpUrl,
}
impl RtmpClientConnection {
fn new(url: RtmpUrl) -> Self {
let initial_state = RtmpConnectionState::Handshaking;
let mut event_queue = VecDeque::new();
event_queue.push_back(RtmpConnectionEvent::StateChanged(initial_state));
Self {
options: RtmpConnectionOptions::default(),
state: initial_state,
event_queue,
handshake: RtmpClientHandshake::new(),
message_channel: RtmpMessageChannel::default(),
next_transaction_id: TransactionId::NON_RESERVED_START,
url,
}
}
fn change_state(&mut self, new_state: RtmpConnectionState) -> Result<(), Error> {
self.state = new_state;
self.event_queue
.push_back(RtmpConnectionEvent::StateChanged(new_state));
match new_state {
RtmpConnectionState::Connecting => self.connect(),
RtmpConnectionState::Connected => self.create_stream(),
_ => Ok(()),
}
}
fn connect(&mut self) -> Result<(), Error> {
let message = RtmpMessage::win_ack_size(self.options.ack_window_size);
self.message_channel.feed_send_message(message);
self.message_channel
.set_local_ack_window_size(self.options.ack_window_size);
let message = RtmpMessage::set_peer_bandwidth(self.options.ack_window_size);
self.message_channel.feed_send_message(message);
self.message_channel
.feed_send_message(RtmpMessage::set_chunk_size(self.options.chunk_size));
let tc_url = format!(
"{}://{}:{}/{}",
if self.url.tls { "rtmps" } else { "rtmp" },
self.url.host,
self.url.port,
self.url.app
);
let command = RtmpCommand::Connect(RtmpConnectCommand {
app: self.url.app.clone(),
flash_ver: FLASH_VER.to_owned(),
tc_url,
});
let message = command.into_pcm_message()?;
self.message_channel.feed_send_message(message);
Ok(())
}
fn feed_recv_buf(&mut self, buf: &[u8]) -> Result<(), Error> {
if self.state == RtmpConnectionState::Handshaking {
self.handshake.feed_recv_buf(buf)?;
if self.handshake.is_recv_complete() {
self.change_state(RtmpConnectionState::Connecting)?;
let remaining_recv_buf = self.handshake.take_recv_buf();
self.message_channel.feed_recv_buf(&remaining_recv_buf)?;
}
} else {
if let Some(total_bytes_received) = self.message_channel.feed_recv_buf(buf)? {
let ack_message = RtmpMessage::ack(total_bytes_received);
self.message_channel.feed_send_message(ack_message);
}
while let Some(message) = self.message_channel.next_recv_message() {
self.handle_recv_message(message)?;
}
}
Ok(())
}
fn handle_recv_message(&mut self, message: RtmpMessage) -> Result<(), Error> {
match message {
RtmpMessage::Command {
header,
name,
transaction_id,
object,
args,
..
} => {
let command = RtmpCommand::from_message(&name, transaction_id, object, args)?;
self.handle_command(header, command)
}
RtmpMessage::Audio { frame, .. } => {
self.event_queue
.push_back(RtmpConnectionEvent::AudioReceived(frame));
Ok(())
}
RtmpMessage::Video { frame, .. } => {
self.event_queue
.push_back(RtmpConnectionEvent::VideoReceived(frame));
Ok(())
}
RtmpMessage::WinAckSize { size, .. } => {
self.message_channel.set_peer_ack_window_size(size);
Ok(())
}
RtmpMessage::Ack {
sequence_number, ..
} => {
self.message_channel.notify_ack_received(sequence_number);
Ok(())
}
RtmpMessage::SetPeerBandwidth { size, .. } => {
let message = RtmpMessage::win_ack_size(self.options.ack_window_size);
self.message_channel.feed_send_message(message);
self.message_channel.set_local_ack_window_size(size);
Ok(())
}
RtmpMessage::UserControl {
event: RtmpUserControlEvent::PingRequest { timestamp },
..
} => {
let response = RtmpMessage::UserControl {
header: RtmpMessageHeader::PCM,
event: RtmpUserControlEvent::PingResponse { timestamp },
};
self.message_channel.feed_send_message(response);
Ok(())
}
RtmpMessage::UserControl { event, .. } => {
self.event_queue
.push_back(RtmpConnectionEvent::user_control_event_ignored(&event));
Ok(())
}
RtmpMessage::SetChunkSize { .. } => {
Ok(())
}
message => {
self.event_queue
.push_back(RtmpConnectionEvent::message_ignored(&message));
Ok(())
}
}
}
fn handle_command(
&mut self,
_header: RtmpMessageHeader,
command: RtmpCommand,
) -> Result<(), Error> {
match command {
RtmpCommand::Result(result) => self.handle_result_command(result),
RtmpCommand::OnStatus(status) => self.handle_on_status_command(status),
_ => {
self.event_queue
.push_back(RtmpConnectionEvent::command_ignored(&command));
Ok(())
}
}
}
fn handle_result_command(&mut self, command: RtmpResultCommand) -> Result<(), Error> {
if command.is_error() {
self.event_queue
.push_back(RtmpConnectionEvent::DisconnectedByPeer {
reason: format!(
"Command response error: {}",
self.extract_error_description(&command)
),
});
self.change_state(RtmpConnectionState::Disconnecting)?;
return Ok(());
}
if command.transaction_id == TransactionId::CONNECT {
self.state.expect(RtmpConnectionState::Connecting)?;
self.change_state(RtmpConnectionState::Connected)?;
} else if command.transaction_id == TransactionId::NON_RESERVED_START {
self.state.expect(RtmpConnectionState::Connected)?;
self.change_state(RtmpConnectionState::MediaStreamCreated)?;
} else {
self.event_queue
.push_back(RtmpConnectionEvent::command_ignored(&RtmpCommand::Result(
command,
)));
}
Ok(())
}
fn handle_on_status_command(
&mut self,
command: crate::rtmp_command::RtmpOnStatusCommand,
) -> Result<(), Error> {
if command.is_publish_start() && self.state == RtmpConnectionState::PublishPending {
self.change_state(RtmpConnectionState::Publishing)?;
return Ok(());
}
if command.is_play_start() && self.state == RtmpConnectionState::PlayPending {
self.change_state(RtmpConnectionState::Playing)?;
return Ok(());
}
if command.level == "error" {
let mut reason = format!("OnStatus error: {}", command.code);
if let Some(description) = &command.description {
reason.push_str(&format!(" - {}", description));
}
if let Some(details) = &command.details {
reason.push_str(&format!(" ({})", details));
}
self.event_queue
.push_back(RtmpConnectionEvent::DisconnectedByPeer { reason });
self.change_state(RtmpConnectionState::Disconnecting)?;
return Ok(());
}
self.event_queue
.push_back(RtmpConnectionEvent::command_ignored(
&RtmpCommand::OnStatus(command),
));
Ok(())
}
fn extract_error_description(&self, command: &RtmpResultCommand) -> String {
command
.properties
.expect_object_member("description")
.and_then(|desc| desc.expect_str())
.map(|s| s.to_string())
.unwrap_or_else(|_| "Unknown error".to_string())
}
fn send_buf(&self) -> &[u8] {
if !self.handshake.is_send_complete() {
self.handshake.send_buf()
} else {
self.message_channel.send_buf()
}
}
fn advance_send_buf(&mut self, n: usize) {
if !self.handshake.is_send_complete() {
self.handshake.advance_send_buf(n);
} else if !self.message_channel.advance_send_buf(n) {
self.event_queue
.push_back(RtmpConnectionEvent::DisconnectedByPeer {
reason: "ACK not received within expected interval".to_owned(),
});
self.change_state(RtmpConnectionState::Disconnecting)
.expect("infallible");
}
}
fn create_stream(&mut self) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::Connected)?;
let command = RtmpCommand::CreateStream(crate::rtmp_command::RtmpCreateStreamCommand {
transaction_id: self.next_transaction_id,
});
let message = command.into_pcm_message()?;
self.message_channel.feed_send_message(message);
self.next_transaction_id.increment();
Ok(())
}
fn next_event(&mut self) -> Option<RtmpConnectionEvent> {
self.event_queue.pop_front()
}
}