use super::*;
/// Server-side state for a single accepted RTMP client connection.
///
/// Created by `new` and consumed by `run`, which performs the handshake and
/// then processes messages until the peer disconnects or an error occurs.
pub struct ServerConnection {
/// Connection record: id, peer address, state, app, stream name/id and byte counters.
info: ConnectionInfo,
/// The TCP socket for this client.
stream: TcpStream,
/// Handshake helper used to parse C0/C1/C2 and generate S0/S1/S2.
handshake: Handshake,
/// Chunk encoder/decoder; both chunk sizes start at `config.chunk_size`.
chunk_stream: ChunkStream,
/// Server configuration (timeouts, chunk size, window ack size).
config: RtmpServerConfig,
/// Bytes read from the socket, handed to the chunk decoder and then cleared.
read_buffer: BytesMut,
/// Last wall-clock timestamp (milliseconds, truncated to 32 bits) from `update_timestamp`.
timestamp: u32,
/// Initialised to `1.0`; not read by any code visible in this file section.
transaction_id: f64,
/// Sending half of the outgoing-message queue; clones are handed out by `message_sender`.
message_tx: mpsc::UnboundedSender<OutgoingMessage>,
/// Receiving half of the outgoing-message queue, drained inside `run`.
message_rx: mpsc::UnboundedReceiver<OutgoingMessage>,
/// Shared registry used to register, look up and unregister published streams.
stream_registry: Arc<StreamRegistry>,
/// Authorisation hooks consulted for connect, publish and play.
auth_handler: Arc<dyn AuthHandler>,
/// Set once this connection publishes: channel that incoming media is sent to.
media_broadcaster: Option<broadcast::Sender<MediaPacket>>,
/// Set once this connection plays: subscription to a publisher's media.
media_receiver: Option<broadcast::Receiver<MediaPacket>>,
/// Metadata of the stream being published, if any.
stream_metadata: Option<StreamMetadata>,
}
impl ServerConnection {
/// Builds a connection for a freshly accepted socket.
///
/// Both the transmit and receive chunk sizes are initialised from
/// `config.chunk_size`, and a new unbounded outgoing-message queue is created.
/// No I/O happens here; call `run` to start the session.
pub(super) fn new(
    id: u64,
    stream: TcpStream,
    address: SocketAddr,
    config: RtmpServerConfig,
    stream_registry: Arc<StreamRegistry>,
    auth_handler: Arc<dyn AuthHandler>,
) -> Self {
    let mut chunks = ChunkStream::new();
    chunks.set_tx_chunk_size(config.chunk_size);
    chunks.set_rx_chunk_size(config.chunk_size);

    let (outgoing_tx, outgoing_rx) = mpsc::unbounded_channel();

    Self {
        info: ConnectionInfo::new(id, address),
        stream,
        handshake: Handshake::new(),
        chunk_stream: chunks,
        config,
        read_buffer: BytesMut::with_capacity(8192),
        timestamp: 0,
        transaction_id: 1.0,
        message_tx: outgoing_tx,
        message_rx: outgoing_rx,
        stream_registry,
        auth_handler,
        media_broadcaster: None,
        media_receiver: None,
        stream_metadata: None,
    }
}
/// Returns a reference to this connection's [`ConnectionInfo`] record.
#[must_use]
pub const fn info(&self) -> &ConnectionInfo {
&self.info
}
pub async fn run(mut self) -> NetResult<()> {
if let Err(e) = self.perform_handshake().await {
self.cleanup().await;
return Err(e);
}
self.info.state = ServerConnectionState::WaitingConnect;
self.send_initial_messages().await?;
loop {
match self.read_and_process_messages().await {
Ok(()) => {}
Err(NetError::Eof) => break,
Err(e) => {
self.cleanup().await;
return Err(e);
}
}
loop {
match self.message_rx.try_recv() {
Ok(msg) => {
if let Err(e) = self
.send_message_internal(msg.message, msg.chunk_stream_id)
.await
{
self.cleanup().await;
return Err(e);
}
}
Err(_) => break,
}
}
{
enum RecvOutcome {
Packet(MediaPacket),
Empty,
Closed,
}
loop {
let outcome = if let Some(rx) = &mut self.media_receiver {
match rx.try_recv() {
Ok(pkt) => RecvOutcome::Packet(pkt),
Err(broadcast::error::TryRecvError::Closed) => RecvOutcome::Closed,
Err(_) => RecvOutcome::Empty,
}
} else {
RecvOutcome::Empty
};
match outcome {
RecvOutcome::Packet(packet) => {
if let Err(e) = self.send_media_packet(packet).await {
self.cleanup().await;
return Err(e);
}
}
RecvOutcome::Closed => {
self.cleanup().await;
return Ok(());
}
RecvOutcome::Empty => break,
}
}
}
}
self.cleanup().await;
self.info.state = ServerConnectionState::Closed;
Ok(())
}
/// Unregisters the stream this connection is publishing, if any.
///
/// Does nothing unless the state is `Publishing` and both the application
/// name and the stream name are non-empty.
async fn cleanup(&self) {
    if self.info.state != ServerConnectionState::Publishing {
        return;
    }
    if self.info.app.is_empty() || self.info.stream_name.is_empty() {
        return;
    }
    let key = format!("{}/{}", self.info.app, self.info.stream_name);
    self.stream_registry.unregister_stream(&key).await;
}
/// Performs the server side of the RTMP handshake.
///
/// Reads C0+C1, replies with S0+S1+S2, then reads C2. Every socket operation
/// is bounded by the configured read/write timeout, and the byte counters in
/// `info` are updated after each successful transfer.
///
/// # Errors
///
/// Returns a timeout error when a step exceeds its deadline, a handshake
/// error when the socket fails, or whatever the handshake parser reports.
async fn perform_handshake(&mut self) -> NetResult<()> {
    self.update_timestamp();
    self.handshake.set_epoch(self.timestamp);

    // C0 + C1 from the client.
    let mut c0c1 = vec![0u8; C0_SIZE + HANDSHAKE_SIZE];
    let read_c0c1 = timeout(self.config.read_timeout, self.stream.read_exact(&mut c0c1)).await;
    read_c0c1
        .map_err(|_| NetError::timeout("Handshake read timeout"))?
        .map_err(|e| NetError::handshake(format!("Failed to read C0+C1: {e}")))?;
    self.info.bytes_received += c0c1.len() as u64;
    self.handshake.parse_c0c1(&c0c1)?;

    // S0 + S1 + S2 back to the client.
    let reply = self.handshake.generate_s0s1s2();
    let write_reply = timeout(self.config.write_timeout, self.stream.write_all(&reply)).await;
    write_reply
        .map_err(|_| NetError::timeout("Handshake write timeout"))?
        .map_err(|e| NetError::handshake(format!("Failed to send S0+S1+S2: {e}")))?;
    self.info.bytes_sent += reply.len() as u64;

    // C2 from the client completes the exchange.
    let mut c2_bytes = vec![0u8; HANDSHAKE_SIZE];
    let read_c2 = timeout(self.config.read_timeout, self.stream.read_exact(&mut c2_bytes)).await;
    read_c2
        .map_err(|_| NetError::timeout("Handshake C2 read timeout"))?
        .map_err(|e| NetError::handshake(format!("Failed to read C2: {e}")))?;
    self.info.bytes_received += c2_bytes.len() as u64;
    self.handshake.parse_c2(&c2_bytes)?;

    Ok(())
}
/// Sends the control messages that open every session, in this order:
/// Window Acknowledgement Size, Set Peer Bandwidth (limit type 2) and
/// Set Chunk Size, all on chunk stream 2.
///
/// # Errors
///
/// Returns the first encode or write error.
async fn send_initial_messages(&mut self) -> NetResult<()> {
    let window = self.config.window_ack_size;
    let opening = [
        ControlMessage::WindowAckSize(window),
        ControlMessage::SetPeerBandwidth {
            size: window,
            limit_type: 2,
        },
        ControlMessage::SetChunkSize(self.config.chunk_size),
    ];
    for control in opening {
        self.send_message_internal(RtmpMessage::Control(control), 2)
            .await?;
    }
    Ok(())
}
/// Performs one socket read and handles every message it completes.
///
/// Reads at most `chunk_size * 4` bytes within the read timeout, feeds them
/// to the chunk decoder and dispatches each assembled message in order.
///
/// # Errors
///
/// Returns `NetError::Eof` when the peer closed the socket (zero-byte read),
/// a timeout or connection error for socket failures, and any error from
/// chunk processing or message handling.
async fn read_and_process_messages(&mut self) -> NetResult<()> {
    let capacity = self.config.chunk_size as usize * 4;
    let mut scratch = vec![0u8; capacity];

    let read_outcome = timeout(self.config.read_timeout, self.stream.read(&mut scratch)).await;
    let received = read_outcome
        .map_err(|_| NetError::timeout("Read timeout"))?
        .map_err(|e| NetError::connection(format!("Read failed: {e}")))?;
    if received == 0 {
        return Err(NetError::Eof);
    }

    self.read_buffer.extend_from_slice(&scratch[..received]);
    self.info.bytes_received += received as u64;

    // NOTE(review): the buffer is cleared unconditionally, which assumes the
    // chunk decoder retains any partial chunk internally. Confirm.
    let messages = self.chunk_stream.process_chunk(&self.read_buffer)?;
    self.read_buffer.clear();

    for message in messages {
        self.handle_message(message).await?;
    }
    Ok(())
}
/// Decodes one assembled message and routes it.
///
/// Control, command and data messages go to their dedicated handlers. Audio
/// and video payloads are broadcast as `MediaPacket`s, carrying the message's
/// timestamp and stream id, but only while this connection is publishing and
/// has a broadcaster. Any other message type is ignored.
///
/// # Errors
///
/// Returns decode errors and whatever the selected handler returns. A failed
/// broadcast send is deliberately ignored.
async fn handle_message(&mut self, msg: AssembledMessage) -> NetResult<()> {
    let timestamp = msg.header.timestamp;
    let stream_id = msg.header.message_stream_id;
    let decoded = self.decode_message(msg)?;

    let (packet_type, payload) = match &decoded {
        RtmpMessage::Control(ctrl) => return self.handle_control_message(ctrl).await,
        RtmpMessage::Command(cmd) => return self.handle_command_message(cmd).await,
        RtmpMessage::Data(data) => return self.handle_data_message(data).await,
        RtmpMessage::Audio(bytes) => (MediaPacketType::Audio, bytes),
        RtmpMessage::Video(bytes) => (MediaPacketType::Video, bytes),
        _ => return Ok(()),
    };

    if self.info.state != ServerConnectionState::Publishing {
        return Ok(());
    }
    if let Some(broadcaster) = &self.media_broadcaster {
        let _ = broadcaster.send(MediaPacket {
            packet_type,
            timestamp,
            stream_id,
            data: payload.clone(),
        });
    }
    Ok(())
}
async fn handle_data_message(&mut self, data: &DataMessage) -> NetResult<()> {
if data.handler == "@setDataFrame" || data.handler == "onMetaData" {
if let Some(metadata) = data.values.first() {
if let Some(stream_metadata) = &mut self.stream_metadata {
stream_metadata.update_from_amf(metadata);
}
if self.info.state == ServerConnectionState::Publishing {
if let Some(tx) = &self.media_broadcaster {
let mut encoder = AmfEncoder::new();
encoder.encode(&AmfValue::String(data.handler.clone()));
for value in &data.values {
encoder.encode(value);
}
let packet = MediaPacket {
packet_type: MediaPacketType::Data,
timestamp: 0,
stream_id: self.info.stream_id,
data: encoder.finish(),
};
let _ = tx.send(packet);
}
}
}
}
Ok(())
}
/// Writes one relayed media packet to the client.
///
/// Audio goes out on chunk stream 4, video on 5 and data on 6, using the
/// packet's own timestamp and stream id. `bytes_sent` is updated after a
/// successful write.
///
/// # Errors
///
/// Returns a timeout error when the write exceeds `write_timeout`, or a
/// connection error when the socket write fails.
async fn send_media_packet(&mut self, packet: MediaPacket) -> NetResult<()> {
    let (csid, type_id) = match packet.packet_type {
        MediaPacketType::Audio => (4, MessageType::Audio as u8),
        MediaPacketType::Video => (5, MessageType::Video as u8),
        MediaPacketType::Data => (6, MessageType::DataAmf0 as u8),
    };

    let payload_len = packet.data.len() as u32;
    let header = MessageHeader::new(packet.timestamp, payload_len, type_id, packet.stream_id);
    let encoded = self
        .chunk_stream
        .encode_message(csid, &header, &packet.data);

    let write_outcome = timeout(self.config.write_timeout, self.stream.write_all(&encoded)).await;
    write_outcome
        .map_err(|_| NetError::timeout("Media write timeout"))?
        .map_err(|e| NetError::connection(format!("Media write failed: {e}")))?;

    self.info.bytes_sent += encoded.len() as u64;
    Ok(())
}
/// Converts an assembled message into a typed [`RtmpMessage`].
///
/// Control and command payloads are decoded, AMF0 data messages are parsed,
/// audio and video payloads are passed through untouched, and every other
/// known type is wrapped as `RtmpMessage::Unknown`.
///
/// # Errors
///
/// Returns a protocol error for an unrecognised message type id, or the
/// decoder's error when a payload is malformed.
fn decode_message(&self, msg: AssembledMessage) -> NetResult<RtmpMessage> {
    let type_id = msg.header.message_type;
    let kind = MessageType::from_id(type_id)
        .ok_or_else(|| NetError::protocol(format!("Unknown message type: {}", type_id)))?;

    match kind {
        control if control.is_control() => Ok(RtmpMessage::Control(ControlMessage::decode(
            control,
            &msg.payload,
        )?)),
        command if command.is_command() => self.decode_command(&msg.payload),
        MessageType::DataAmf0 => self.decode_data(&msg.payload),
        MessageType::Audio => Ok(RtmpMessage::Audio(msg.payload)),
        MessageType::Video => Ok(RtmpMessage::Video(msg.payload)),
        _ => Ok(RtmpMessage::Unknown {
            type_id,
            payload: msg.payload,
        }),
    }
}
/// Parses an AMF command payload: name, transaction id, an optional command
/// object, then any number of trailing arguments.
///
/// # Errors
///
/// Returns an encoding error when the name is not a string or the
/// transaction id is not a number, and propagates AMF decode errors.
fn decode_command(&self, data: &[u8]) -> NetResult<RtmpMessage> {
    let mut reader = AmfDecoder::new(data);

    let name_value = reader.decode()?;
    let name = name_value
        .as_str()
        .ok_or_else(|| NetError::encoding("Command name must be string"))?
        .to_string();

    let id_value = reader.decode()?;
    let transaction_id = id_value
        .as_number()
        .ok_or_else(|| NetError::encoding("Transaction ID must be number"))?;

    let command_object = match reader.has_remaining() {
        true => Some(reader.decode()?),
        false => None,
    };

    let mut args = Vec::new();
    loop {
        if !reader.has_remaining() {
            break;
        }
        args.push(reader.decode()?);
    }

    Ok(RtmpMessage::Command(CommandMessage {
        name,
        transaction_id,
        command_object,
        args,
    }))
}
/// Parses an AMF data payload: a handler name followed by zero or more values.
///
/// # Errors
///
/// Returns an encoding error when the handler is not a string, and
/// propagates AMF decode errors.
fn decode_data(&self, data: &[u8]) -> NetResult<RtmpMessage> {
    let mut reader = AmfDecoder::new(data);

    let handler_value = reader.decode()?;
    let handler = handler_value
        .as_str()
        .ok_or_else(|| NetError::encoding("Handler must be string"))?
        .to_string();

    let mut values = Vec::new();
    loop {
        if !reader.has_remaining() {
            break;
        }
        values.push(reader.decode()?);
    }

    Ok(RtmpMessage::Data(DataMessage { handler, values }))
}
async fn handle_control_message(&mut self, ctrl: &ControlMessage) -> NetResult<()> {
match ctrl {
ControlMessage::SetChunkSize(size) => {
self.chunk_stream.set_rx_chunk_size(*size);
}
ControlMessage::Acknowledgement(_seq) => {
}
ControlMessage::WindowAckSize(size) => {
let _ = size;
}
ControlMessage::UserControl { event, data, .. } => {
let _ = (event, data);
}
_ => {}
}
Ok(())
}
async fn handle_command_message(&mut self, cmd: &CommandMessage) -> NetResult<()> {
match cmd.name.as_str() {
"connect" => self.handle_connect(cmd).await?,
"createStream" => self.handle_create_stream(cmd).await?,
"publish" => self.handle_publish(cmd).await?,
"play" => self.handle_play(cmd).await?,
"deleteStream" => self.handle_delete_stream(cmd).await?,
"closeStream" => self.handle_close_stream(cmd).await?,
"releaseStream" => self.handle_release_stream(cmd).await?,
"FCPublish" => self.handle_fc_publish(cmd).await?,
"FCUnpublish" => self.handle_fc_unpublish(cmd).await?,
_ => {}
}
Ok(())
}
/// Handles the `connect` command.
///
/// Reads `app` and `tcUrl` from the command object (each defaults to an
/// empty string), asks the auth handler to authorise the connection, and
/// replies with `_result` / `NetConnection.Connect.Success` or `_error` /
/// `NetConnection.Connect.Rejected`. On success the application name is
/// stored and the state becomes `Connected`.
///
/// The parameter map passed to the auth handler is always empty; nothing is
/// parsed out of `tcUrl` here.
///
/// # Errors
///
/// Returns an authentication error after sending the rejection when the auth
/// handler refuses the connection, and propagates send failures.
async fn handle_connect(&mut self, cmd: &CommandMessage) -> NetResult<()> {
    let mut app = String::new();
    let mut tc_url = String::new();
    let params = HashMap::new();

    if let Some(obj) = &cmd.command_object {
        if let Some(app_val) = obj.get("app").and_then(AmfValue::as_str) {
            app = app_val.to_string();
        }
        if let Some(tc_url_val) = obj.get("tcUrl").and_then(AmfValue::as_str) {
            tc_url = tc_url_val.to_string();
        }
    }

    let auth_result = self
        .auth_handler
        .authenticate_connect(&app, &tc_url, &params)
        .await;

    match auth_result {
        AuthResult::Success => {
            self.info.app = app;
            self.info.state = ServerConnectionState::Connected;

            // Server properties, sent as the command object of the reply.
            let mut props = HashMap::new();
            props.insert(
                "fmsVer".to_string(),
                AmfValue::String("OxiMedia/1.0".to_string()),
            );
            props.insert("capabilities".to_string(), AmfValue::Number(127.0));
            props.insert("mode".to_string(), AmfValue::Number(1.0));

            // Status information object.
            let mut info = HashMap::new();
            info.insert("level".to_string(), AmfValue::String("status".to_string()));
            info.insert(
                "code".to_string(),
                AmfValue::String("NetConnection.Connect.Success".to_string()),
            );
            info.insert(
                "description".to_string(),
                AmfValue::String("Connection succeeded".to_string()),
            );
            info.insert("objectEncoding".to_string(), AmfValue::Number(0.0));

            let result = CommandMessage::result(cmd.transaction_id, AmfValue::Object(info))
                .with_command_object(AmfValue::Object(props));
            self.send_command(result, 3).await?;
            Ok(())
        }
        AuthResult::Failed(reason) => {
            let mut info = HashMap::new();
            info.insert("level".to_string(), AmfValue::String("error".to_string()));
            info.insert(
                "code".to_string(),
                AmfValue::String("NetConnection.Connect.Rejected".to_string()),
            );
            info.insert("description".to_string(), AmfValue::String(reason));

            let error = CommandMessage::error(cmd.transaction_id, AmfValue::Object(info));
            self.send_command(error, 3).await?;
            Err(NetError::authentication("Connection rejected"))
        }
    }
}
/// Handles `createStream`: always assigns stream id 1 to this connection and
/// returns that id to the client in a `_result` reply.
///
/// # Errors
///
/// Propagates a failure to send the reply.
async fn handle_create_stream(&mut self, cmd: &CommandMessage) -> NetResult<()> {
    let assigned_id = 1;
    self.info.stream_id = assigned_id;

    let reply = CommandMessage::result(
        cmd.transaction_id,
        AmfValue::Number(f64::from(assigned_id)),
    );
    self.send_command(reply, 3).await
}
/// Handles the `publish` command.
///
/// The first argument is the stream name and the optional second argument is
/// the publish type (defaults to live). After the auth handler approves, the
/// stream is registered under `"{app}/{stream_name}"`, the connection enters
/// the `Publishing` state, and the client receives Stream Begin followed by
/// `NetStream.Publish.Start`.
///
/// An empty stream name is rejected up front. It would register a key that
/// `cleanup` never unregisters, because `cleanup` skips empty names.
///
/// # Errors
///
/// * protocol error for an empty stream name (after sending `BadName`),
/// * authentication error when the auth handler refuses (after `BadName`),
/// * the registry's error when registration fails (a `BadName` status is
///   attempted first so the client learns why it is being dropped),
/// * any send failure.
async fn handle_publish(&mut self, cmd: &CommandMessage) -> NetResult<()> {
    /// Builds an `onStatus` command carrying a level/code/description object.
    fn publish_status(level: &str, code: &str, description: String) -> CommandMessage {
        let mut info = HashMap::new();
        info.insert("level".to_string(), AmfValue::String(level.to_string()));
        info.insert("code".to_string(), AmfValue::String(code.to_string()));
        info.insert("description".to_string(), AmfValue::String(description));
        CommandMessage::new("onStatus", 0.0)
            .with_command_object(AmfValue::Null)
            .with_arg(AmfValue::Object(info))
    }

    let stream_name = cmd.args.first().and_then(AmfValue::as_str).unwrap_or("");
    if stream_name.is_empty() {
        let status = publish_status(
            "error",
            "NetStream.Publish.BadName",
            "Stream name must not be empty".to_string(),
        );
        self.send_command(status, 3).await?;
        return Err(NetError::protocol(String::from(
            "Publish rejected: empty stream name",
        )));
    }

    let publish_type_str = cmd.args.get(1).and_then(AmfValue::as_str).unwrap_or("live");
    let publish_type = PublishType::from_str(publish_type_str).unwrap_or(PublishType::Live);

    let auth_result = self
        .auth_handler
        .authenticate_publish(&self.info.app, stream_name, publish_type)
        .await;

    match auth_result {
        AuthResult::Success => {
            self.info.stream_name = stream_name.to_string();
            let metadata = StreamMetadata::new(stream_name, &self.info.app);
            let stream_key = format!("{}/{}", self.info.app, stream_name);

            let registration = self
                .stream_registry
                .register_stream(stream_key, metadata.clone(), self.info.id)
                .await;
            let media_tx = match registration {
                Ok(tx) => tx,
                Err(e) => {
                    let status = publish_status(
                        "error",
                        "NetStream.Publish.BadName",
                        format!("Cannot publish {stream_name}"),
                    );
                    // Best effort: the registration error is the one worth
                    // reporting, so a failure to deliver the status is ignored.
                    let _ = self.send_command(status, 3).await;
                    return Err(e.into());
                }
            };

            self.stream_metadata = Some(metadata);
            self.media_broadcaster = Some(media_tx);
            self.info.state = ServerConnectionState::Publishing;

            let user_ctrl = RtmpMessage::Control(ControlMessage::UserControl {
                event: crate::rtmp::message::UserControlEvent::StreamBegin,
                data: self.info.stream_id,
                extra: None,
            });
            self.send_message_internal(user_ctrl, 2).await?;

            let status = publish_status(
                "status",
                "NetStream.Publish.Start",
                format!("Publishing {stream_name}"),
            );
            self.send_command(status, 3).await?;
            Ok(())
        }
        AuthResult::Failed(reason) => {
            let status = publish_status("error", "NetStream.Publish.BadName", reason);
            self.send_command(status, 3).await?;
            Err(NetError::authentication("Publish rejected"))
        }
    }
}
async fn handle_play(&mut self, cmd: &CommandMessage) -> NetResult<()> {
let stream_name = cmd.args.first().and_then(AmfValue::as_str).unwrap_or("");
let auth_result = self
.auth_handler
.authenticate_play(&self.info.app, stream_name)
.await;
match auth_result {
AuthResult::Success => {
let stream_key = format!("{}/{}", self.info.app, stream_name);
let active_stream = self.stream_registry.get_stream(&stream_key).await;
if let Some(stream) = active_stream {
self.info.stream_name = stream_name.to_string();
self.info.state = ServerConnectionState::Playing;
let media_rx = stream.media_tx.subscribe();
self.media_receiver = Some(media_rx);
let user_ctrl = RtmpMessage::Control(ControlMessage::UserControl {
event: crate::rtmp::message::UserControlEvent::StreamBegin,
data: self.info.stream_id,
extra: None,
});
self.send_message_internal(user_ctrl, 2).await?;
let mut reset_info = HashMap::new();
reset_info.insert("level".to_string(), AmfValue::String("status".to_string()));
reset_info.insert(
"code".to_string(),
AmfValue::String("NetStream.Play.Reset".to_string()),
);
reset_info.insert(
"description".to_string(),
AmfValue::String(format!("Playing {stream_name}")),
);
let reset_status = CommandMessage::new("onStatus", 0.0)
.with_command_object(AmfValue::Null)
.with_arg(AmfValue::Object(reset_info));
self.send_command(reset_status, 3).await?;
let mut start_info = HashMap::new();
start_info.insert("level".to_string(), AmfValue::String("status".to_string()));
start_info.insert(
"code".to_string(),
AmfValue::String("NetStream.Play.Start".to_string()),
);
start_info.insert(
"description".to_string(),
AmfValue::String(format!("Started playing {stream_name}")),
);
let start_status = CommandMessage::new("onStatus", 0.0)
.with_command_object(AmfValue::Null)
.with_arg(AmfValue::Object(start_info));
self.send_command(start_status, 3).await?;
} else {
let mut info = HashMap::new();
info.insert("level".to_string(), AmfValue::String("error".to_string()));
info.insert(
"code".to_string(),
AmfValue::String("NetStream.Play.StreamNotFound".to_string()),
);
info.insert(
"description".to_string(),
AmfValue::String(format!("Stream not found: {stream_name}")),
);
let status = CommandMessage::new("onStatus", 0.0)
.with_command_object(AmfValue::Null)
.with_arg(AmfValue::Object(info));
self.send_command(status, 3).await?;
return Err(NetError::not_found(format!(
"Stream not found: {stream_name}"
)));
}
}
AuthResult::Failed(reason) => {
let mut info = HashMap::new();
info.insert("level".to_string(), AmfValue::String("error".to_string()));
info.insert(
"code".to_string(),
AmfValue::String("NetStream.Play.Failed".to_string()),
);
info.insert("description".to_string(), AmfValue::String(reason));
let status = CommandMessage::new("onStatus", 0.0)
.with_command_object(AmfValue::Null)
.with_arg(AmfValue::Object(info));
self.send_command(status, 3).await?;
return Err(NetError::authentication("Play rejected"));
}
}
Ok(())
}
/// Handles `deleteStream`: tears down the active stream and resets the
/// stream id to 0.
///
/// A published stream is unregistered before the state changes. `cleanup`
/// only acts while the state is `Publishing`, so leaving that state first
/// would keep the stream in the registry after the connection ends. The media
/// channels and stored metadata are dropped, so subscribers see the channel
/// close and this connection stops relaying.
///
/// The state returns to `Connected` only from `Publishing` or `Playing`; a
/// peer that has not connected is not promoted.
///
/// # Errors
///
/// Currently never fails; the `NetResult` return matches the other handlers.
async fn handle_delete_stream(&mut self, _cmd: &CommandMessage) -> NetResult<()> {
    // Must run while the state still reads `Publishing`.
    self.cleanup().await;

    self.media_broadcaster = None;
    self.media_receiver = None;
    self.stream_metadata = None;
    self.info.stream_id = 0;

    if matches!(
        self.info.state,
        ServerConnectionState::Publishing | ServerConnectionState::Playing
    ) {
        self.info.state = ServerConnectionState::Connected;
    }
    Ok(())
}
/// Handles `closeStream`: tears down the active stream but keeps the stream id.
///
/// A published stream is unregistered before the state changes. `cleanup`
/// only acts while the state is `Publishing`, so leaving that state first
/// would keep the stream in the registry after the connection ends. The media
/// channels and stored metadata are dropped, so subscribers see the channel
/// close and this connection stops relaying.
///
/// The state returns to `Connected` only from `Publishing` or `Playing`; a
/// peer that has not connected is not promoted.
///
/// # Errors
///
/// Currently never fails; the `NetResult` return matches the other handlers.
async fn handle_close_stream(&mut self, _cmd: &CommandMessage) -> NetResult<()> {
    // Must run while the state still reads `Publishing`.
    self.cleanup().await;

    self.media_broadcaster = None;
    self.media_receiver = None;
    self.stream_metadata = None;

    if matches!(
        self.info.state,
        ServerConnectionState::Publishing | ServerConnectionState::Playing
    ) {
        self.info.state = ServerConnectionState::Connected;
    }
    Ok(())
}
async fn handle_release_stream(&mut self, cmd: &CommandMessage) -> NetResult<()> {
let result = CommandMessage::result(cmd.transaction_id, AmfValue::Undefined);
self.send_command(result, 3).await?;
Ok(())
}
/// Handles `FCPublish`. Currently a no-op: the command is accepted and no
/// reply is sent.
async fn handle_fc_publish(&mut self, _cmd: &CommandMessage) -> NetResult<()> {
Ok(())
}
/// Handles `FCUnpublish`. Currently a no-op: the command is accepted and no
/// reply is sent.
async fn handle_fc_unpublish(&mut self, _cmd: &CommandMessage) -> NetResult<()> {
Ok(())
}
/// Wraps `cmd` as an [`RtmpMessage::Command`] and sends it on chunk stream `csid`.
///
/// # Errors
///
/// Propagates encode and write errors from `send_message_internal`.
async fn send_command(&mut self, cmd: CommandMessage, csid: u32) -> NetResult<()> {
    self.send_message_internal(RtmpMessage::Command(cmd), csid)
        .await
}
async fn send_message_internal(&mut self, message: RtmpMessage, csid: u32) -> NetResult<()> {
let payload = self.encode_message_payload(&message)?;
self.update_timestamp();
let header = MessageHeader::new(
self.timestamp,
payload.len() as u32,
message.type_id(),
self.info.stream_id,
);
let chunks = self.chunk_stream.encode_message(csid, &header, &payload);
timeout(self.config.write_timeout, self.stream.write_all(&chunks))
.await
.map_err(|_| NetError::timeout("Write timeout"))?
.map_err(|e| NetError::connection(format!("Write failed: {e}")))?;
self.info.bytes_sent += chunks.len() as u64;
Ok(())
}
/// Produces the wire payload for `message`.
///
/// Control messages use their own encoder, command and data messages are
/// AMF-encoded, and audio, video and unknown payloads are returned as-is.
///
/// # Errors
///
/// Propagates errors from the command and data encoders.
fn encode_message_payload(&self, message: &RtmpMessage) -> NetResult<Bytes> {
    match message {
        RtmpMessage::Command(cmd) => self.encode_command(cmd),
        RtmpMessage::Data(data) => self.encode_data(data),
        RtmpMessage::Control(ctrl) => Ok(ctrl.encode()),
        RtmpMessage::Audio(raw)
        | RtmpMessage::Video(raw)
        | RtmpMessage::Unknown { payload: raw, .. } => Ok(raw.clone()),
    }
}
/// AMF-encodes a command: name, transaction id, the command object (`Null`
/// when absent), then every argument in order.
///
/// # Errors
///
/// Never fails in the visible code; the `NetResult` return matches the other
/// encoders.
fn encode_command(&self, cmd: &CommandMessage) -> NetResult<Bytes> {
    let mut encoder = AmfEncoder::new();
    encoder.encode(&AmfValue::String(cmd.name.clone()));
    encoder.encode(&AmfValue::Number(cmd.transaction_id));

    match cmd.command_object.as_ref() {
        Some(object) => {
            encoder.encode(object);
        }
        None => {
            encoder.encode(&AmfValue::Null);
        }
    }

    cmd.args.iter().for_each(|arg| {
        encoder.encode(arg);
    });

    Ok(encoder.finish())
}
/// AMF-encodes a data message: the handler name followed by each value in order.
///
/// # Errors
///
/// Never fails in the visible code; the `NetResult` return matches the other
/// encoders.
fn encode_data(&self, data: &DataMessage) -> NetResult<Bytes> {
    let mut encoder = AmfEncoder::new();
    encoder.encode(&AmfValue::String(data.handler.clone()));

    data.values.iter().for_each(|value| {
        encoder.encode(value);
    });

    Ok(encoder.finish())
}
/// Refreshes `self.timestamp` with the milliseconds elapsed since the Unix
/// epoch, truncated to 32 bits. The previous value is kept when the system
/// clock reports a time before the epoch.
fn update_timestamp(&mut self) {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => return,
    };
    self.timestamp = since_epoch.as_millis() as u32;
}
/// Returns a clone of the sending half of this connection's outgoing-message
/// queue. Messages sent through it are written to the client by `run`.
#[must_use]
pub fn message_sender(&self) -> mpsc::UnboundedSender<OutgoingMessage> {
self.message_tx.clone()
}
}