use alloc::borrow::ToOwned;
use alloc::collections::VecDeque;
use crate::error::Error;
use crate::media::{AudioFrame, VideoFrame};
use crate::rtmp_command::{
RtmpCommand, RtmpConnectCommand, RtmpOnStatusCommand, RtmpPlayCommand, RtmpPublishCommand,
RtmpResultCommand, TransactionId,
};
use crate::rtmp_connection::{
RtmpConnectionEvent, RtmpConnectionOptions, RtmpConnectionState, RtmpMessageChannel,
};
use crate::rtmp_handshake::RtmpServerHandshake;
use crate::rtmp_message::{RtmpMessage, RtmpMessageHeader, RtmpMessageStreamId};
use crate::rtmp_timestamp::RtmpTimestamp;
use crate::rtmp_user_control_event::RtmpUserControlEvent;
#[derive(Debug)]
pub struct RtmpServerConnection {
options: RtmpConnectionOptions,
state: RtmpConnectionState,
event_queue: VecDeque<RtmpConnectionEvent>,
handshake: RtmpServerHandshake,
message_channel: RtmpMessageChannel,
connect_command: Option<RtmpConnectCommand>,
pending_transaction_id: Option<TransactionId>,
}
impl RtmpServerConnection {
pub fn new() -> Self {
let mut this = Self {
options: RtmpConnectionOptions::default(),
state: RtmpConnectionState::default(),
event_queue: VecDeque::new(),
handshake: RtmpServerHandshake::new(),
message_channel: RtmpMessageChannel::default(),
connect_command: None,
pending_transaction_id: None,
};
this.change_state(RtmpConnectionState::Handshaking);
this
}
fn change_state(&mut self, new_state: RtmpConnectionState) {
self.state = new_state;
self.event_queue
.push_back(RtmpConnectionEvent::StateChanged(new_state));
}
pub fn feed_recv_buf(&mut self, buf: &[u8]) -> Result<(), Error> {
let feed_result = if self.state == RtmpConnectionState::Handshaking {
self.handshake.feed_recv_buf(buf)?;
if !self.handshake.is_recv_complete() {
return Ok(());
}
self.change_state(RtmpConnectionState::Connecting);
let remaining_recv_buf = self.handshake.take_recv_buf();
self.message_channel.feed_recv_buf(&remaining_recv_buf)?
} else {
self.message_channel.feed_recv_buf(buf)?
};
if let Some(total_bytes_received) = feed_result {
let ack_message = RtmpMessage::ack(total_bytes_received);
self.message_channel.feed_send_message(ack_message);
}
while let Some(message) = self.message_channel.next_recv_message() {
self.handle_recv_message(message)?;
}
Ok(())
}
fn handle_recv_message(&mut self, message: RtmpMessage) -> Result<(), Error> {
match message {
RtmpMessage::Command {
header,
name,
transaction_id,
object,
args,
..
} => {
let command = RtmpCommand::from_message(&name, transaction_id, object, args)?;
self.handle_command(header, command)?;
}
RtmpMessage::Audio { frame, .. } => {
self.event_queue
.push_back(RtmpConnectionEvent::AudioReceived(frame));
}
RtmpMessage::Video { frame, .. } => {
self.event_queue
.push_back(RtmpConnectionEvent::VideoReceived(frame));
}
RtmpMessage::WinAckSize { size, .. } => {
self.message_channel.set_peer_ack_window_size(size);
}
RtmpMessage::Ack {
sequence_number, ..
} => {
self.message_channel.notify_ack_received(sequence_number);
}
RtmpMessage::SetChunkSize { .. } => {
}
RtmpMessage::SetPeerBandwidth { size, .. } => {
let message = RtmpMessage::win_ack_size(self.options.ack_window_size);
self.message_channel.feed_send_message(message);
self.message_channel.set_local_ack_window_size(size);
}
RtmpMessage::UserControl {
event: RtmpUserControlEvent::SetBufferLength { .. },
..
} => {
}
RtmpMessage::UserControl {
event: RtmpUserControlEvent::PingRequest { timestamp },
..
} => {
let response = RtmpMessage::UserControl {
header: RtmpMessageHeader::PCM,
event: RtmpUserControlEvent::PingResponse { timestamp },
};
self.message_channel.feed_send_message(response);
}
RtmpMessage::UserControl { event, .. } => {
self.event_queue
.push_back(RtmpConnectionEvent::user_control_event_ignored(&event));
}
message => {
self.event_queue
.push_back(RtmpConnectionEvent::message_ignored(&message));
}
}
Ok(())
}
fn handle_command(
&mut self,
_header: RtmpMessageHeader,
command: RtmpCommand,
) -> Result<(), Error> {
match command {
RtmpCommand::Connect(c) => self.handle_connect_command(c),
RtmpCommand::CreateStream(c) => self.handle_create_stream_command(c),
RtmpCommand::Publish(c) => self.handle_publish_command(c),
RtmpCommand::Play(c) => self.handle_play_command(c),
RtmpCommand::GetStreamLength(c) => self.handle_get_stream_length_command(c),
_ => {
self.event_queue
.push_back(RtmpConnectionEvent::command_ignored(&command));
Ok(())
}
}
}
fn handle_get_stream_length_command(
&mut self,
command: crate::rtmp_command::RtmpGetStreamLengthCommand,
) -> Result<(), Error> {
let message = RtmpCommand::Result(RtmpResultCommand::get_stream_length_result(
command.transaction_id,
0.0,
))
.into_message(RtmpMessageHeader::PCM)?;
self.message_channel.feed_send_message(message);
Ok(())
}
fn handle_publish_command(
&mut self,
command: crate::rtmp_command::RtmpPublishCommand,
) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::MediaStreamCreated)?;
self.change_state(RtmpConnectionState::PublishPending);
self.pending_transaction_id = Some(command.transaction_id);
let connect = self
.connect_command
.as_ref()
.ok_or_else(|| Error::invalid_state("Connect command not set before publish"))?;
self.event_queue
.push_back(RtmpConnectionEvent::PublishRequested {
app: connect.app.clone(),
tc_url: connect.tc_url.clone(),
stream_name: command.stream_name,
});
Ok(())
}
fn handle_play_command(
&mut self,
command: crate::rtmp_command::RtmpPlayCommand,
) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::MediaStreamCreated)?;
self.change_state(RtmpConnectionState::PlayPending);
self.pending_transaction_id = Some(command.transaction_id);
let connect = self
.connect_command
.as_ref()
.ok_or_else(|| Error::invalid_state("Connect command not set before play"))?;
self.event_queue
.push_back(RtmpConnectionEvent::PlayRequested {
app: connect.app.clone(),
tc_url: connect.tc_url.clone(),
stream_name: command.stream_name,
});
Ok(())
}
fn handle_create_stream_command(
&mut self,
command: crate::rtmp_command::RtmpCreateStreamCommand,
) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::Connected)?;
let message = RtmpCommand::Result(RtmpResultCommand::create_stream_result(
command.transaction_id,
RtmpMessageStreamId::MEDIA,
))
.into_message(RtmpMessageHeader::PCM)?;
self.message_channel.feed_send_message(message);
self.message_channel
.feed_send_message(RtmpMessage::stream_begin(RtmpMessageStreamId::MEDIA));
self.change_state(RtmpConnectionState::MediaStreamCreated);
Ok(())
}
fn handle_connect_command(&mut self, command: RtmpConnectCommand) -> Result<(), Error> {
self.message_channel
.feed_send_message(RtmpMessage::win_ack_size(self.options.ack_window_size));
self.message_channel
.set_local_ack_window_size(self.options.ack_window_size);
self.message_channel
.feed_send_message(RtmpMessage::set_chunk_size(self.options.chunk_size));
self.message_channel
.feed_send_message(RtmpMessage::set_peer_bandwidth(
self.options.ack_window_size,
));
self.message_channel
.feed_send_message(RtmpMessage::stream_begin(RtmpMessageStreamId::FIRST));
self.message_channel.feed_send_message(command.accept()?);
self.change_state(RtmpConnectionState::Connected);
self.connect_command = Some(command);
Ok(())
}
pub fn send_buf(&self) -> &[u8] {
if self.state == RtmpConnectionState::Handshaking {
self.handshake.send_buf()
} else {
self.message_channel.send_buf()
}
}
pub fn advance_send_buf(&mut self, n: usize) {
if !self.handshake.is_send_complete() {
self.handshake.advance_send_buf(n);
} else if !self.message_channel.advance_send_buf(n) {
self.event_queue
.push_back(RtmpConnectionEvent::DisconnectedByPeer {
reason: "ACK not received within expected interval".to_owned(),
});
self.change_state(RtmpConnectionState::Disconnecting);
}
}
pub fn state(&self) -> RtmpConnectionState {
self.state
}
pub fn accept(&mut self) -> Result<(), Error> {
if !matches!(
self.state,
RtmpConnectionState::PublishPending | RtmpConnectionState::PlayPending
) {
return Err(Error::invalid_state(format!(
"Cannot accept in {} state",
self.state
)));
}
let transaction_id = self
.pending_transaction_id
.ok_or_else(|| Error::invalid_state("Pending transaction ID not set"))?;
if self.state == RtmpConnectionState::PublishPending {
self.accept_publish(transaction_id)
} else {
self.accept_play(transaction_id)
}
}
fn accept_publish(
&mut self,
transaction_id: crate::rtmp_command::TransactionId,
) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::PublishPending)?;
self.message_channel
.feed_send_message(RtmpPublishCommand::accept(
transaction_id,
RtmpMessageStreamId::MEDIA,
)?);
let message = RtmpCommand::OnStatus(RtmpOnStatusCommand::publish_start()).into_message(
RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: RtmpTimestamp::ZERO,
},
)?;
self.message_channel.feed_send_message(message);
self.change_state(RtmpConnectionState::Publishing);
Ok(())
}
fn accept_play(
&mut self,
transaction_id: crate::rtmp_command::TransactionId,
) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::PlayPending)?;
self.message_channel
.feed_send_message(RtmpPlayCommand::accept(
transaction_id,
RtmpMessageStreamId::MEDIA,
)?);
let message = RtmpCommand::OnStatus(RtmpOnStatusCommand::play_start()).into_message(
RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: RtmpTimestamp::ZERO,
},
)?;
self.message_channel.feed_send_message(message);
self.change_state(RtmpConnectionState::Playing);
Ok(())
}
pub fn reject(&mut self, reason: &str) -> Result<(), Error> {
if !matches!(
self.state,
RtmpConnectionState::PublishPending | RtmpConnectionState::PlayPending
) {
return Err(Error::invalid_state(format!(
"Cannot reject in {} state",
self.state
)));
}
let message = if self.state == RtmpConnectionState::PublishPending {
RtmpCommand::OnStatus(RtmpOnStatusCommand::publish_bad_name(reason)).into_message(
RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: RtmpTimestamp::ZERO,
},
)?
} else {
RtmpCommand::OnStatus(RtmpOnStatusCommand::play_stream_not_found(reason)).into_message(
RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: RtmpTimestamp::ZERO,
},
)?
};
self.message_channel.feed_send_message(message);
self.change_state(RtmpConnectionState::Disconnecting);
Ok(())
}
pub fn send_audio(&mut self, frame: AudioFrame) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::Playing)?;
let message = RtmpMessage::Audio {
header: RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: frame.timestamp,
},
frame,
};
self.message_channel.feed_send_message(message);
Ok(())
}
pub fn send_video(&mut self, frame: VideoFrame) -> Result<(), Error> {
self.state.expect(RtmpConnectionState::Playing)?;
let message = RtmpMessage::Video {
header: RtmpMessageHeader {
stream_id: RtmpMessageStreamId::MEDIA,
timestamp: frame.timestamp,
},
frame,
};
self.message_channel.feed_send_message(message);
Ok(())
}
pub fn next_event(&mut self) -> Option<RtmpConnectionEvent> {
self.event_queue.pop_front()
}
}
impl Default for RtmpServerConnection {
fn default() -> Self {
Self::new()
}
}