1use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use bytes::{Bytes, BytesMut};
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::broadcast;
18use tokio::time::timeout;
19
20use crate::registry::{BroadcastFrame, FrameType, StreamKey, StreamRegistry};
21
22use crate::amf::AmfValue;
23use crate::error::{Error, ProtocolError, Result};
24use crate::media::enhanced_audio::EnhancedAudioData;
25use crate::media::enhanced_video::EnhancedVideoData;
26use crate::media::flv::FlvTag;
27use crate::media::fourcc::{AudioFourCc, VideoFourCc};
28use crate::media::{AacData, H264Data};
29use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
30use crate::protocol::constants::*;
31use crate::protocol::enhanced::EnhancedRtmpMode;
32use crate::protocol::handshake::{Handshake, HandshakeRole};
33use crate::protocol::message::{
34 Command, ConnectParams, ConnectResponseBuilder, DataMessage, PlayParams, PublishParams,
35 RtmpMessage, UserControlEvent,
36};
37use crate::protocol::quirks::EncoderType;
38use crate::server::config::ServerConfig;
39use crate::server::handler::{AuthResult, MediaDeliveryMode, RtmpHandler};
40use crate::session::context::{SessionContext, StreamContext};
41use crate::session::state::SessionState;
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45enum DetectedCodec {
46 LegacyVideo(u8),
48 EnhancedVideo(VideoFourCc),
50 LegacyAudio(u8),
52 EnhancedAudio(AudioFourCc),
54}
55
56impl DetectedCodec {
57 fn name(&self) -> &'static str {
59 match self {
60 DetectedCodec::LegacyVideo(id) => match id {
61 7 => "H.264/AVC",
62 4 => "VP6",
63 2 => "H.263",
64 _ => "Unknown Video",
65 },
66 DetectedCodec::EnhancedVideo(fourcc) => match fourcc {
67 VideoFourCc::Avc => "H.264/AVC (E-RTMP)",
68 VideoFourCc::Hevc => "H.265/HEVC",
69 VideoFourCc::Av1 => "AV1",
70 VideoFourCc::Vp9 => "VP9",
71 VideoFourCc::Vp8 => "VP8",
72 },
73 DetectedCodec::LegacyAudio(id) => match id {
74 10 => "AAC",
75 2 => "MP3",
76 11 => "Speex",
77 1 => "ADPCM",
78 0 => "PCM",
79 _ => "Unknown Audio",
80 },
81 DetectedCodec::EnhancedAudio(fourcc) => match fourcc {
82 AudioFourCc::Aac => "AAC (E-RTMP)",
83 AudioFourCc::Opus => "Opus",
84 AudioFourCc::Flac => "FLAC",
85 AudioFourCc::Ac3 => "AC-3",
86 AudioFourCc::Eac3 => "E-AC-3",
87 AudioFourCc::Mp3 => "MP3 (E-RTMP)",
88 },
89 }
90 }
91
92 fn is_enhanced(&self) -> bool {
94 matches!(
95 self,
96 DetectedCodec::EnhancedVideo(_) | DetectedCodec::EnhancedAudio(_)
97 )
98 }
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103enum SubscriberState {
104 Normal,
106 SkippingToKeyframe,
108}
109
110pub struct Connection<H: RtmpHandler> {
112 state: SessionState,
114
115 context: SessionContext,
117
118 reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
120 writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,
121
122 read_buf: BytesMut,
124
125 chunk_decoder: ChunkDecoder,
127 chunk_encoder: ChunkEncoder,
128
129 write_buf: BytesMut,
131
132 config: ServerConfig,
134
135 handler: Arc<H>,
137
138 registry: Arc<StreamRegistry>,
140
141 pending_fc: HashMap<String, f64>,
143
144 publishing_to: Option<StreamKey>,
146
147 subscribed_to: Option<StreamKey>,
149
150 last_audio_ts: Option<u32>,
151
152 last_video_ts: Option<u32>,
153
154 detected_video_codec: Option<DetectedCodec>,
156
157 detected_audio_codec: Option<DetectedCodec>,
159
160 frame_rx: Option<broadcast::Receiver<BroadcastFrame>>,
162
163 subscriber_state: SubscriberState,
165
166 consecutive_lag_count: u32,
168
169 playback_stream_id: Option<u32>,
171
172 is_paused: bool,
174
175 frames_dropped_while_paused: u64,
177
178 skip_audio_until_keyframe: bool,
181}
182
183impl<H: RtmpHandler> Connection<H> {
184 pub fn new(
186 session_id: u64,
187 socket: TcpStream,
188 peer_addr: SocketAddr,
189 config: ServerConfig,
190 handler: Arc<H>,
191 registry: Arc<StreamRegistry>,
192 ) -> Self {
193 let (read_half, write_half) = tokio::io::split(socket);
194
195 Self {
196 state: SessionState::new(session_id, peer_addr),
197 context: SessionContext::new(session_id, peer_addr),
198 reader: BufReader::with_capacity(config.read_buffer_size, read_half),
199 writer: BufWriter::with_capacity(config.write_buffer_size, write_half),
200 read_buf: BytesMut::with_capacity(config.read_buffer_size),
201 chunk_decoder: ChunkDecoder::new(),
202 chunk_encoder: ChunkEncoder::new(),
203 write_buf: BytesMut::with_capacity(config.write_buffer_size),
204 config,
205 handler,
206 registry,
207 pending_fc: HashMap::new(),
208 publishing_to: None,
209 subscribed_to: None,
210 last_audio_ts: None,
211 last_video_ts: None,
212 detected_video_codec: None,
213 detected_audio_codec: None,
214 frame_rx: None,
215 subscriber_state: SubscriberState::Normal,
216 consecutive_lag_count: 0,
217 playback_stream_id: None,
218 is_paused: false,
219 frames_dropped_while_paused: 0,
220 skip_audio_until_keyframe: false,
221 }
222 }
223
224 pub async fn run(&mut self) -> Result<()> {
226 if !self.handler.on_connection(&self.context).await {
228 return Err(Error::Rejected("Connection rejected by handler".into()));
229 }
230
231 self.do_handshake().await?;
233 self.handler.on_handshake_complete(&self.context).await;
234
235 tracing::debug!(
237 session_id = self.state.id,
238 chunk_size = self.config.chunk_size,
239 "Sending set chunk size"
240 );
241 self.send_set_chunk_size(self.config.chunk_size).await?;
242 self.chunk_encoder.set_chunk_size(self.config.chunk_size);
243
244 tracing::debug!(session_id = self.state.id, "Entering main message loop");
245
246 let idle_timeout = self.config.idle_timeout;
248 let result = loop {
249 let mut frame_rx = self.frame_rx.take();
251
252 let loop_result = if let Some(ref mut rx) = frame_rx {
254 tokio::select! {
256 biased;
257
258 frame_result = rx.recv() => {
260 match frame_result {
261 Ok(frame) => {
262 self.frame_rx = frame_rx;
264 self.consecutive_lag_count = 0;
266 if let Err(e) = self.send_broadcast_frame(frame).await {
267 tracing::debug!(error = %e, "Failed to send frame");
268 Err(e)
269 } else {
270 Ok(true)
271 }
272 }
273 Err(broadcast::error::RecvError::Lagged(n)) => {
274 self.frame_rx = frame_rx;
275 self.handle_lag(n).await.map(|_| true)
276 }
277 Err(broadcast::error::RecvError::Closed) => {
278 self.frame_rx = frame_rx;
279 if let Err(e) = self.handle_stream_ended().await {
281 tracing::debug!(error = %e, "Error handling stream end");
282 }
283 Ok(false) }
285 }
286 }
287
288 result = timeout(idle_timeout, self.read_and_process()) => {
290 self.frame_rx = frame_rx;
291 match result {
292 Ok(Ok(continue_loop)) => Ok(continue_loop),
293 Ok(Err(e)) => {
294 tracing::debug!(error = %e, "Processing error");
295 Err(e)
296 }
297 Err(_) => {
298 tracing::debug!("Idle timeout");
299 Ok(false)
300 }
301 }
302 }
303 }
304 } else {
305 self.frame_rx = frame_rx;
307 match timeout(idle_timeout, self.read_and_process()).await {
308 Ok(Ok(continue_loop)) => Ok(continue_loop),
309 Ok(Err(e)) => {
310 tracing::debug!(error = %e, "Processing error");
311 Err(e)
312 }
313 Err(_) => {
314 tracing::debug!("Idle timeout");
315 Ok(false)
316 }
317 }
318 };
319
320 match loop_result {
321 Ok(true) => continue,
322 Ok(false) => break Ok(()),
323 Err(e) => break Err(e),
324 }
325 };
326
327 self.cleanup_on_disconnect().await;
329
330 self.handler.on_disconnect(&self.context).await;
332
333 result
334 }
335
336 async fn cleanup_on_disconnect(&mut self) {
338 if let Some(ref key) = self.publishing_to {
340 self.registry.unregister_publisher(key, self.state.id).await;
341 tracing::debug!(
342 session_id = self.state.id,
343 stream = %key,
344 "Unregistered publisher on disconnect"
345 );
346 }
347
348 if let Some(ref key) = self.subscribed_to {
350 self.registry.unsubscribe(key).await;
351 tracing::debug!(
352 session_id = self.state.id,
353 stream = %key,
354 "Unsubscribed on disconnect"
355 );
356 }
357 }
358
359 async fn handle_lag(&mut self, skipped: u64) -> Result<()> {
361 let config = self.registry.config();
362
363 self.consecutive_lag_count += 1;
364
365 if skipped < config.lag_threshold_low {
366 tracing::debug!(
368 session_id = self.state.id,
369 skipped = skipped,
370 "Minor broadcast lag, continuing"
371 );
372 return Ok(());
373 }
374
375 if self.subscriber_state != SubscriberState::SkippingToKeyframe {
377 self.subscriber_state = SubscriberState::SkippingToKeyframe;
378 tracing::warn!(
379 session_id = self.state.id,
380 skipped = skipped,
381 "Subscriber lagging, skipping to next keyframe"
382 );
383 }
384
385 if self.consecutive_lag_count >= config.max_consecutive_lag_events {
387 tracing::warn!(
388 session_id = self.state.id,
389 consecutive_lags = self.consecutive_lag_count,
390 "Disconnecting slow subscriber"
391 );
392 return Err(Error::Rejected("Subscriber too slow".into()));
393 }
394
395 Ok(())
396 }
397
398 async fn handle_stream_ended(&mut self) -> Result<()> {
400 if self.is_paused {
402 self.is_paused = false;
403 }
404
405 if let Some(stream_id) = self.playback_stream_id {
406 self.send_user_control(UserControlEvent::StreamEof(stream_id))
408 .await?;
409
410 let status = Command::on_status(stream_id, "status", NS_PLAY_STOP, "Stream ended");
412 self.send_command(CSID_COMMAND, stream_id, &status).await?;
413
414 tracing::info!(
415 session_id = self.state.id,
416 stream_id = stream_id,
417 "Stream ended, notified subscriber"
418 );
419 }
420
421 Ok(())
422 }
423
424 async fn do_handshake(&mut self) -> Result<()> {
426 let mut handshake = Handshake::new(HandshakeRole::Server);
427
428 handshake.generate_initial();
430 self.state.start_handshake();
431
432 let connection_timeout = self.config.connection_timeout;
434 timeout(connection_timeout, async {
435 loop {
436 let bytes_needed = handshake.bytes_needed();
438 if bytes_needed > 0 && self.read_buf.len() < bytes_needed {
439 let n = self.reader.read_buf(&mut self.read_buf).await?;
440 if n == 0 {
441 return Err(Error::ConnectionClosed);
442 }
443 }
444
445 let mut buf = Bytes::copy_from_slice(&self.read_buf);
447 let response = handshake.process(&mut buf)?;
448
449 let consumed = self.read_buf.len() - buf.len();
451 if consumed > 0 {
452 self.read_buf.advance(consumed);
453 }
454
455 if let Some(data) = response {
457 self.writer.write_all(&data).await?;
458 self.writer.flush().await?;
459 }
460
461 if handshake.is_done() {
462 break;
463 }
464 }
465
466 Ok::<_, Error>(())
467 })
468 .await
469 .map_err(|_| Error::Timeout)??;
470
471 self.state.complete_handshake();
472 tracing::debug!(
473 session_id = self.state.id,
474 remaining_buf = self.read_buf.len(),
475 "Handshake complete"
476 );
477
478 if !self.read_buf.is_empty() {
480 let dump_len = self.read_buf.len().min(32);
481 let hex: String = self.read_buf[..dump_len]
482 .iter()
483 .map(|b| format!("{:02x}", b))
484 .collect::<Vec<_>>()
485 .join(" ");
486 tracing::debug!(
487 session_id = self.state.id,
488 first_bytes = %hex,
489 "Buffer after handshake"
490 );
491 }
492
493 Ok(())
494 }
495
496 async fn read_and_process(&mut self) -> Result<bool> {
498 tracing::trace!(
499 session_id = self.state.id,
500 buf_len = self.read_buf.len(),
501 "read_and_process called"
502 );
503
504 loop {
508 let buf_len_before = self.read_buf.len();
509
510 if let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
512 tracing::trace!(
513 session_id = self.state.id,
514 csid = chunk.csid,
515 msg_type = chunk.message_type,
516 "Decoded chunk from buffer"
517 );
518 self.handle_chunk(chunk).await?;
519 continue;
521 }
522
523 let buf_len_after = self.read_buf.len();
526 if buf_len_after < buf_len_before {
527 continue;
529 }
530
531 break;
533 }
534
535 tracing::trace!(
536 session_id = self.state.id,
537 buf_len = self.read_buf.len(),
538 "Waiting for more data"
539 );
540
541 let n = self.reader.read_buf(&mut self.read_buf).await?;
543 if n == 0 {
544 return Ok(false); }
546
547 tracing::trace!(
548 session_id = self.state.id,
549 bytes_read = n,
550 buf_len = self.read_buf.len(),
551 "Read data from socket"
552 );
553
554 let needs_ack = self.state.add_bytes_received(n as u64);
555
556 while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
558 tracing::trace!(
559 session_id = self.state.id,
560 csid = chunk.csid,
561 msg_type = chunk.message_type,
562 "Decoded chunk after read"
563 );
564 self.handle_chunk(chunk).await?;
565 }
566
567 if needs_ack {
569 self.send_acknowledgement().await?;
570 }
571
572 Ok(true)
573 }
574
575 async fn handle_chunk(&mut self, chunk: RtmpChunk) -> Result<()> {
577 let message = RtmpMessage::from_chunk(&chunk)?;
578
579 match message {
580 RtmpMessage::SetChunkSize(size) => {
581 tracing::debug!(size = size, "Peer set chunk size");
582 self.chunk_decoder.set_chunk_size(size);
583 self.state.in_chunk_size = size;
584 }
585
586 RtmpMessage::Abort { csid } => {
587 self.chunk_decoder.abort(csid);
588 }
589
590 RtmpMessage::Acknowledgement { sequence: _ } => {
591 }
593
594 RtmpMessage::WindowAckSize(size) => {
595 self.state.window_ack_size = size;
596 }
597
598 RtmpMessage::SetPeerBandwidth {
599 size,
600 limit_type: _,
601 } => {
602 self.send_window_ack_size(size).await?;
604 }
605
606 RtmpMessage::UserControl(event) => {
607 self.handle_user_control(event).await?;
608 }
609
610 RtmpMessage::Command(cmd) | RtmpMessage::CommandAmf3(cmd) => {
611 self.handle_command(cmd).await?;
612 }
613
614 RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
615 self.handle_data(data).await?;
616 }
617
618 RtmpMessage::Audio { timestamp, data } => {
619 self.handle_audio(timestamp, data).await?;
620 }
621
622 RtmpMessage::Video { timestamp, data } => {
623 self.handle_video(timestamp, data).await?;
624 }
625
626 _ => {
627 tracing::trace!(message = ?message, "Unhandled message");
628 }
629 }
630
631 Ok(())
632 }
633
634 async fn handle_user_control(&mut self, event: UserControlEvent) -> Result<()> {
636 match event {
637 UserControlEvent::PingRequest(timestamp) => {
638 self.send_ping_response(timestamp).await?;
639 }
640 UserControlEvent::SetBufferLength {
641 stream_id: _,
642 buffer_ms: _,
643 } => {
644 }
646 _ => {}
647 }
648 Ok(())
649 }
650
651 async fn handle_command(&mut self, cmd: Command) -> Result<()> {
653 tracing::debug!(
654 session_id = self.state.id,
655 command = cmd.name,
656 transaction_id = cmd.transaction_id,
657 stream_id = cmd.stream_id,
658 args = ?cmd.arguments,
659 "Received command"
660 );
661 match cmd.name.as_str() {
662 CMD_CONNECT => self.handle_connect(cmd).await?,
663 CMD_CREATE_STREAM => self.handle_create_stream(cmd).await?,
664 CMD_DELETE_STREAM => self.handle_delete_stream(cmd).await?,
665 CMD_PUBLISH => self.handle_publish(cmd).await?,
666 CMD_PLAY => self.handle_play(cmd).await?,
667 CMD_FC_PUBLISH => self.handle_fc_publish(cmd).await?,
668 CMD_FC_UNPUBLISH => self.handle_fc_unpublish(cmd).await?,
669 CMD_RELEASE_STREAM => self.handle_release_stream(cmd).await?,
670 CMD_PAUSE => self.handle_pause(cmd).await?,
671 CMD_CLOSE | "closeStream" => self.handle_close_stream(cmd).await?,
672 _ => {
673 tracing::debug!(command = cmd.name, "Unknown command");
674 }
675 }
676 Ok(())
677 }
678
679 async fn handle_connect(&mut self, cmd: Command) -> Result<()> {
681 let params = ConnectParams::from_amf(&cmd.command_object);
682 let encoder_type = params
683 .flash_ver
684 .as_deref()
685 .map(EncoderType::from_flash_ver)
686 .unwrap_or(EncoderType::Unknown);
687
688 let client_has_ertmp = params.has_enhanced_rtmp();
690 let negotiated_caps = self.negotiate_enhanced_rtmp(¶ms)?;
691
692 let result = self.handler.on_connect(&self.context, ¶ms).await;
694
695 match result {
696 AuthResult::Accept => {
697 self.state.on_connect(params.clone(), encoder_type);
698 self.context.with_connect(params, encoder_type);
699
700 if let Some(ref caps) = negotiated_caps {
702 self.context.with_enhanced_capabilities(caps.clone());
703 }
704
705 self.send_window_ack_size(self.config.window_ack_size)
707 .await?;
708
709 self.send_peer_bandwidth(self.config.peer_bandwidth).await?;
711
712 self.send_user_control(UserControlEvent::StreamBegin(0))
714 .await?;
715
716 self.send_connect_result_with_ertmp(cmd.transaction_id, negotiated_caps.as_ref())
718 .await?;
719
720 tracing::info!(
721 session_id = self.state.id,
722 app = self.context.app,
723 enhanced_rtmp = client_has_ertmp
724 && negotiated_caps.as_ref().map(|c| c.enabled).unwrap_or(false),
725 "Connected"
726 );
727 }
728 AuthResult::Reject(reason) => {
729 self.send_connect_error(cmd.transaction_id, &reason).await?;
730 return Err(Error::Rejected(reason));
731 }
732 AuthResult::Redirect { url } => {
733 self.send_connect_redirect(cmd.transaction_id, &url).await?;
734 return Err(Error::Rejected(format!("Redirected to {}", url)));
735 }
736 }
737
738 Ok(())
739 }
740
741 fn negotiate_enhanced_rtmp(
745 &self,
746 params: &ConnectParams,
747 ) -> Result<Option<crate::protocol::enhanced::EnhancedCapabilities>> {
748 let client_has_ertmp = params.has_enhanced_rtmp();
749
750 match self.config.enhanced_rtmp {
751 EnhancedRtmpMode::LegacyOnly => {
752 tracing::debug!(
754 session_id = self.state.id,
755 "E-RTMP disabled (LegacyOnly mode)"
756 );
757 Ok(None)
758 }
759 EnhancedRtmpMode::EnhancedOnly => {
760 if !client_has_ertmp {
761 tracing::warn!(
763 session_id = self.state.id,
764 "Rejecting legacy client (EnhancedOnly mode)"
765 );
766 return Err(Error::Rejected(
767 "Server requires Enhanced RTMP support".into(),
768 ));
769 }
770
771 let server_caps = self.config.enhanced_capabilities.to_enhanced_capabilities();
773 let client_caps = params.to_enhanced_capabilities();
774 let negotiated = server_caps.intersect(&client_caps);
775
776 tracing::debug!(
777 session_id = self.state.id,
778 video_codecs = negotiated.video_codecs.len(),
779 audio_codecs = negotiated.audio_codecs.len(),
780 caps_ex = negotiated.caps_ex.bits(),
781 "E-RTMP negotiated (EnhancedOnly mode)"
782 );
783
784 Ok(Some(negotiated))
785 }
786 EnhancedRtmpMode::Auto => {
787 if !client_has_ertmp {
788 tracing::debug!(
790 session_id = self.state.id,
791 "Using legacy RTMP (client doesn't support E-RTMP). The client may still send enhanced audio/video"
792 );
793 return Ok(None);
794 }
795
796 let server_caps = self.config.enhanced_capabilities.to_enhanced_capabilities();
798 let client_caps = params.to_enhanced_capabilities();
799 let negotiated = server_caps.intersect(&client_caps);
800
801 if !negotiated.enabled {
802 tracing::debug!(
804 session_id = self.state.id,
805 "Falling back to legacy RTMP (no common E-RTMP capabilities)"
806 );
807 return Ok(None);
808 }
809
810 tracing::debug!(
811 session_id = self.state.id,
812 video_codecs = negotiated.video_codecs.len(),
813 audio_codecs = negotiated.audio_codecs.len(),
814 caps_ex = negotiated.caps_ex.bits(),
815 "E-RTMP negotiated (Auto mode)"
816 );
817
818 Ok(Some(negotiated))
819 }
820 }
821 }
822
823 async fn handle_create_stream(&mut self, cmd: Command) -> Result<()> {
825 let stream_id = self.state.allocate_stream_id();
826
827 let result = Command::result(
828 cmd.transaction_id,
829 AmfValue::Null,
830 AmfValue::Number(stream_id as f64),
831 );
832
833 self.send_command(CSID_COMMAND, 0, &result).await?;
834
835 tracing::debug!(stream_id = stream_id, "Stream created");
836 Ok(())
837 }
838
839 async fn handle_delete_stream(&mut self, cmd: Command) -> Result<()> {
841 let stream_id = cmd
842 .arguments
843 .first()
844 .and_then(|v| v.as_number())
845 .unwrap_or(0.0) as u32;
846
847 if let Some(stream) = self.state.remove_stream(stream_id) {
848 if stream.is_publishing() {
849 let stream_ctx = StreamContext::new(
850 self.context.clone(),
851 stream_id,
852 stream.stream_key.unwrap_or_default(),
853 true,
854 );
855 #[allow(deprecated)]
856 self.handler.on_publish_stop(&stream_ctx).await;
857 self.handler.on_unpublish(&stream_ctx).await;
858 }
859 }
860
861 Ok(())
862 }
863
864 async fn handle_fc_publish(&mut self, cmd: Command) -> Result<()> {
866 let stream_key = cmd
867 .arguments
868 .first()
869 .and_then(|v| v.as_str())
870 .unwrap_or("")
871 .to_string();
872
873 let result = self.handler.on_fc_publish(&self.context, &stream_key).await;
874
875 match result {
876 AuthResult::Accept => {
877 self.pending_fc
879 .insert(stream_key.clone(), cmd.transaction_id);
880
881 let response = Command {
883 name: CMD_ON_FC_PUBLISH.to_string(),
884 transaction_id: 0.0,
885 command_object: AmfValue::Null,
886 arguments: vec![],
887 stream_id: 0,
888 };
889 self.send_command(CSID_COMMAND, 0, &response).await?;
890 }
891 AuthResult::Reject(reason) => {
892 return Err(Error::Rejected(reason));
893 }
894 AuthResult::Redirect { url } => {
895 return Err(Error::Rejected(format!("Redirected to {}", url)));
896 }
897 }
898
899 Ok(())
900 }
901
902 async fn handle_fc_unpublish(&mut self, cmd: Command) -> Result<()> {
904 let stream_key = cmd.arguments.first().and_then(|v| v.as_str()).unwrap_or("");
905
906 self.pending_fc.remove(stream_key);
907
908 let response = Command {
910 name: CMD_ON_FC_UNPUBLISH.to_string(),
911 transaction_id: 0.0,
912 command_object: AmfValue::Null,
913 arguments: vec![],
914 stream_id: 0,
915 };
916 self.send_command(CSID_COMMAND, 0, &response).await?;
917
918 Ok(())
919 }
920
921 async fn handle_release_stream(&mut self, _cmd: Command) -> Result<()> {
923 Ok(())
925 }
926
927 async fn handle_publish(&mut self, cmd: Command) -> Result<()> {
929 let stream_key = cmd
930 .arguments
931 .first()
932 .and_then(|v| v.as_str())
933 .unwrap_or("")
934 .to_string();
935
936 let publish_type = cmd
937 .arguments
938 .get(1)
939 .and_then(|v| v.as_str())
940 .unwrap_or("live")
941 .to_string();
942
943 let params = PublishParams {
944 stream_key: stream_key.clone(),
945 publish_type: publish_type.clone(),
946 stream_id: cmd.stream_id,
947 };
948
949 let result = self.handler.on_publish(&self.context, ¶ms).await;
950
951 match result {
952 AuthResult::Accept => {
953 let app = self.context.app.clone();
955 let registry_key = StreamKey::new(&app, &stream_key);
956
957 if let Err(e) = self
959 .registry
960 .register_publisher(®istry_key, self.state.id)
961 .await
962 {
963 tracing::warn!(
964 session_id = self.state.id,
965 stream = %registry_key,
966 error = %e,
967 "Failed to register publisher"
968 );
969 let status = Command::on_status(
970 cmd.stream_id,
971 "error",
972 NS_PUBLISH_BAD_NAME,
973 &format!("Stream already publishing: {}", e),
974 );
975 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
976 .await?;
977 return Err(Error::Rejected(format!("Stream already publishing: {}", e)));
978 }
979
980 self.publishing_to = Some(registry_key);
982
983 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
985 stream.start_publish(stream_key.clone(), publish_type);
986 }
987
988 self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
990 .await?;
991
992 let status = Command::on_status(
994 cmd.stream_id,
995 "status",
996 NS_PUBLISH_START,
997 &format!("{} is now published", stream_key),
998 );
999 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1000 .await?;
1001
1002 tracing::info!(
1003 session_id = self.state.id,
1004 stream_key = stream_key,
1005 "Publishing started"
1006 );
1007 }
1008 AuthResult::Reject(reason) => {
1009 let status =
1010 Command::on_status(cmd.stream_id, "error", NS_PUBLISH_BAD_NAME, &reason);
1011 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1012 .await?;
1013 return Err(Error::Rejected(reason));
1014 }
1015 AuthResult::Redirect { url } => {
1016 return Err(Error::Rejected(format!("Redirected to {}", url)));
1017 }
1018 }
1019
1020 Ok(())
1021 }
1022
1023 async fn handle_play(&mut self, cmd: Command) -> Result<()> {
1025 let stream_name = cmd
1026 .arguments
1027 .first()
1028 .and_then(|v| v.as_str())
1029 .unwrap_or("")
1030 .to_string();
1031
1032 let start = cmd
1033 .arguments
1034 .get(1)
1035 .and_then(|v| v.as_number())
1036 .unwrap_or(-2.0);
1037
1038 let duration = cmd
1039 .arguments
1040 .get(2)
1041 .and_then(|v| v.as_number())
1042 .unwrap_or(-1.0);
1043
1044 let reset = cmd
1045 .arguments
1046 .get(3)
1047 .and_then(|v| v.as_bool())
1048 .unwrap_or(true);
1049
1050 let params = PlayParams {
1051 stream_name: stream_name.clone(),
1052 start,
1053 duration,
1054 reset,
1055 stream_id: cmd.stream_id,
1056 };
1057
1058 let result = self.handler.on_play(&self.context, ¶ms).await;
1059
1060 match result {
1061 AuthResult::Accept => {
1062 let app = self.context.app.clone();
1064 let registry_key = StreamKey::new(&app, &stream_name);
1065
1066 let (rx, catchup_frames) = match self.registry.subscribe(®istry_key).await {
1068 Ok(result) => result,
1069 Err(e) => {
1070 tracing::debug!(
1071 session_id = self.state.id,
1072 stream = %registry_key,
1073 error = %e,
1074 "Stream not found for play"
1075 );
1076 let status = Command::on_status(
1077 cmd.stream_id,
1078 "error",
1079 NS_PLAY_STREAM_NOT_FOUND,
1080 &format!("Stream not found: {}", stream_name),
1081 );
1082 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1083 .await?;
1084 return Ok(());
1085 }
1086 };
1087
1088 self.subscribed_to = Some(registry_key.clone());
1090 self.frame_rx = Some(rx);
1091 self.playback_stream_id = Some(cmd.stream_id);
1092
1093 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
1094 stream.start_play(stream_name.clone());
1095 }
1096
1097 self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
1099 .await?;
1100
1101 if reset {
1103 let status = Command::on_status(
1104 cmd.stream_id,
1105 "status",
1106 NS_PLAY_RESET,
1107 "Playing and resetting",
1108 );
1109 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1110 .await?;
1111 }
1112
1113 let status = Command::on_status(
1115 cmd.stream_id,
1116 "status",
1117 NS_PLAY_START,
1118 &format!("Started playing {}", stream_name),
1119 );
1120 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1121 .await?;
1122
1123 tracing::debug!(
1125 session_id = self.state.id,
1126 stream = %registry_key,
1127 catchup_frames = catchup_frames.len(),
1128 "Sending catchup frames"
1129 );
1130
1131 for frame in catchup_frames {
1132 self.send_broadcast_frame(frame).await?;
1133 }
1134
1135 tracing::info!(
1136 session_id = self.state.id,
1137 stream_name = stream_name,
1138 "Playing started"
1139 );
1140 }
1141 AuthResult::Reject(reason) => {
1142 let status =
1143 Command::on_status(cmd.stream_id, "error", NS_PLAY_STREAM_NOT_FOUND, &reason);
1144 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1145 .await?;
1146 }
1147 AuthResult::Redirect { url: _ } => {
1148 }
1150 }
1151
1152 Ok(())
1153 }
1154
1155 async fn handle_close_stream(&mut self, cmd: Command) -> Result<()> {
1157 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
1158 stream.stop();
1159 }
1160 Ok(())
1161 }
1162
1163 async fn handle_pause(&mut self, cmd: Command) -> Result<()> {
1165 let stream_id = match self.playback_stream_id {
1166 Some(id) => id,
1167 None => return Ok(()), };
1169
1170 let pause_flag = cmd
1172 .arguments
1173 .first()
1174 .and_then(|v| v.as_bool())
1175 .unwrap_or(true);
1176
1177 if pause_flag {
1178 self.do_pause(stream_id).await
1179 } else {
1180 self.do_unpause(stream_id).await
1181 }
1182 }
1183
1184 async fn do_pause(&mut self, stream_id: u32) -> Result<()> {
1186 if self.is_paused {
1187 return Ok(()); }
1189
1190 self.is_paused = true;
1191 self.frames_dropped_while_paused = 0;
1192
1193 let status = Command::on_status(stream_id, "status", NS_PAUSE_NOTIFY, "Playback paused");
1195 self.send_command(CSID_COMMAND, stream_id, &status).await?;
1196
1197 self.send_user_control(UserControlEvent::StreamDry(stream_id))
1199 .await?;
1200
1201 let stream_key = self
1203 .subscribed_to
1204 .as_ref()
1205 .map(|k| k.name.clone())
1206 .unwrap_or_default();
1207 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1208 self.handler.on_pause(&stream_ctx).await;
1209
1210 tracing::info!(session_id = self.state.id, "Subscriber paused");
1211 Ok(())
1212 }
1213
1214 async fn do_unpause(&mut self, stream_id: u32) -> Result<()> {
1216 if !self.is_paused {
1217 return Ok(()); }
1219
1220 self.is_paused = false;
1221
1222 self.subscriber_state = SubscriberState::SkippingToKeyframe;
1225 self.skip_audio_until_keyframe = true;
1226
1227 let status = Command::on_status(stream_id, "status", NS_UNPAUSE_NOTIFY, "Playback resumed");
1229 self.send_command(CSID_COMMAND, stream_id, &status).await?;
1230
1231 self.send_user_control(UserControlEvent::StreamBegin(stream_id))
1233 .await?;
1234
1235 if let Some(ref key) = self.subscribed_to {
1238 let headers = self.registry.get_sequence_headers(key).await;
1239 tracing::debug!(
1240 session_id = self.state.id,
1241 header_count = headers.len(),
1242 "Resending sequence headers after unpause"
1243 );
1244 for frame in headers {
1245 match frame.frame_type {
1246 FrameType::Video => {
1247 self.send_video(stream_id, frame.timestamp, frame.data)
1248 .await?;
1249 }
1250 FrameType::Audio => {
1251 self.send_audio(stream_id, frame.timestamp, frame.data)
1252 .await?;
1253 }
1254 _ => {}
1255 }
1256 }
1257 self.writer.flush().await?;
1258 }
1259
1260 let stream_key = self
1262 .subscribed_to
1263 .as_ref()
1264 .map(|k| k.name.clone())
1265 .unwrap_or_default();
1266 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1267 self.handler.on_unpause(&stream_ctx).await;
1268
1269 tracing::info!(
1270 session_id = self.state.id,
1271 frames_dropped = self.frames_dropped_while_paused,
1272 "Subscriber unpaused (waiting for keyframe)"
1273 );
1274 self.frames_dropped_while_paused = 0;
1275 Ok(())
1276 }
1277
1278 async fn handle_data(&mut self, data: DataMessage) -> Result<()> {
1280 match data.name.as_str() {
1281 CMD_SET_DATA_FRAME => {
1282 if let Some(AmfValue::String(name)) = data.values.first() {
1284 if name == CMD_ON_METADATA {
1285 self.handle_metadata(data.stream_id, &data.values[1..])
1286 .await?;
1287 }
1288 }
1289 }
1290 CMD_ON_METADATA => {
1291 self.handle_metadata(data.stream_id, &data.values).await?;
1292 }
1293 _ => {
1294 tracing::trace!(name = data.name, "Unknown data message");
1295 }
1296 }
1297 Ok(())
1298 }
1299
1300 async fn handle_metadata(&mut self, stream_id: u32, values: &[AmfValue]) -> Result<()> {
1302 let metadata: HashMap<String, AmfValue> = values
1304 .first()
1305 .and_then(|v| v.as_object().cloned())
1306 .unwrap_or_default();
1307
1308 if let Some(stream) = self.state.get_stream_mut(stream_id) {
1309 stream.on_metadata();
1310 }
1311
1312 if let Some(stream) = self.state.get_stream(stream_id) {
1313 if stream.is_publishing() {
1314 let stream_ctx = StreamContext::new(
1315 self.context.clone(),
1316 stream_id,
1317 stream.stream_key.clone().unwrap_or_default(),
1318 true,
1319 );
1320 self.handler.on_metadata(&stream_ctx, &metadata).await;
1321 }
1322 }
1323
1324 Ok(())
1325 }
1326
1327 async fn handle_audio(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1329 if data.is_empty() {
1330 return Ok(());
1331 }
1332
1333 if let Some(prev_audio_ts) = self.last_audio_ts {
1334 let timestamp_delta = timestamp.wrapping_sub(prev_audio_ts);
1336 tracing::trace!(
1337 timestamp = timestamp,
1338 last_audio_ts = prev_audio_ts,
1339 timestamp_delta = timestamp_delta,
1340 "connection handle_audio"
1341 );
1342 }
1343 self.last_audio_ts = Some(timestamp);
1344
1345 let is_enhanced = EnhancedAudioData::is_enhanced(data[0]);
1347
1348 let detected = if is_enhanced {
1350 if data.len() >= 5 {
1352 AudioFourCc::from_bytes(&data[1..]).map(DetectedCodec::EnhancedAudio)
1353 } else {
1354 None
1355 }
1356 } else {
1357 Some(DetectedCodec::LegacyAudio(data[0] >> 4))
1359 };
1360
1361 if let Some(ref codec) = detected {
1362 if self.detected_audio_codec.as_ref() != Some(codec) {
1363 tracing::info!(
1364 session_id = self.state.id,
1365 codec = codec.name(),
1366 enhanced = codec.is_enhanced(),
1367 "Audio codec detected"
1368 );
1369 self.detected_audio_codec = Some(codec.clone());
1370 }
1371 }
1372
1373 let stream_id = self.find_publishing_stream()?;
1375 let stream = self
1376 .state
1377 .get_stream_mut(stream_id)
1378 .ok_or(ProtocolError::StreamNotFound(stream_id))?;
1379
1380 let is_header = if is_enhanced {
1381 (data[0] & 0x0F) == 0
1383 } else {
1384 data.len() >= 2 && (data[0] >> 4) == 10 && data[1] == 0
1386 };
1387 stream.on_audio(timestamp, is_header, data.len());
1388
1389 if is_header {
1391 let tag = FlvTag::audio(timestamp, data.clone());
1392 stream.gop_buffer.set_audio_header(tag);
1393 }
1394
1395 let stream_key = stream.stream_key.clone().unwrap_or_default();
1397 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1398
1399 let mode = self.handler.media_delivery_mode();
1401
1402 if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1403 let tag = FlvTag::audio(timestamp, data.clone());
1404 self.handler.on_media_tag(&stream_ctx, &tag).await;
1405 }
1406
1407 if matches!(
1408 mode,
1409 MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1410 ) {
1411 if is_enhanced {
1412 if let Ok(enhanced_data) = EnhancedAudioData::parse(data.clone()) {
1414 self.handler
1415 .on_enhanced_audio_frame(&stream_ctx, &enhanced_data, timestamp)
1416 .await;
1417 }
1418 } else if data.len() >= 2 && (data[0] >> 4) == 10 {
1419 if let Ok(aac_data) = AacData::parse(data.slice(1..)) {
1421 self.handler
1422 .on_audio_frame(&stream_ctx, &aac_data, timestamp)
1423 .await;
1424 }
1425 }
1426 }
1427
1428 if let Some(ref key) = self.publishing_to {
1430 let frame = BroadcastFrame::audio(timestamp, data, is_header);
1431 self.registry.broadcast(key, frame).await;
1432 }
1433
1434 Ok(())
1435 }
1436
1437 async fn handle_video(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1439 if data.is_empty() {
1440 return Ok(());
1441 }
1442
1443 if let Some(prev_video_ts) = self.last_video_ts {
1444 let timestamp_delta = timestamp.wrapping_sub(prev_video_ts);
1446 tracing::trace!(
1447 timestamp = timestamp,
1448 last_video_ts = prev_video_ts,
1449 timestamp_delta = timestamp_delta,
1450 "connection handle_video"
1451 );
1452 }
1453 self.last_video_ts = Some(timestamp);
1454
1455 let is_enhanced = EnhancedVideoData::is_enhanced(data[0]);
1457
1458 let detected = if is_enhanced {
1460 if data.len() >= 5 {
1462 VideoFourCc::from_bytes(&data[1..]).map(DetectedCodec::EnhancedVideo)
1463 } else {
1464 None
1465 }
1466 } else {
1467 Some(DetectedCodec::LegacyVideo(data[0] & 0x0F))
1469 };
1470
1471 if let Some(ref codec) = detected {
1472 if self.detected_video_codec.as_ref() != Some(codec) {
1473 tracing::info!(
1474 session_id = self.state.id,
1475 codec = codec.name(),
1476 enhanced = codec.is_enhanced(),
1477 "Video codec detected"
1478 );
1479 self.detected_video_codec = Some(codec.clone());
1480 }
1481 }
1482
1483 let stream_id = self.find_publishing_stream()?;
1485 let stream = self
1486 .state
1487 .get_stream_mut(stream_id)
1488 .ok_or(ProtocolError::StreamNotFound(stream_id))?;
1489
1490 let (is_keyframe, is_header) = if is_enhanced {
1491 let frame_type = (data[0] >> 4) & 0x07;
1493 let packet_type = data[0] & 0x0F;
1494 let is_keyframe = frame_type == 1 || frame_type == 4; let is_header = packet_type == 0; (is_keyframe, is_header)
1497 } else {
1498 let is_keyframe = (data[0] >> 4) == 1;
1500 let is_header = data.len() >= 2 && (data[0] & 0x0F) == 7 && data[1] == 0;
1501 (is_keyframe, is_header)
1502 };
1503 stream.on_video(timestamp, is_keyframe, is_header, data.len());
1504
1505 let tag = FlvTag::video(timestamp, data.clone());
1507
1508 if is_header {
1510 stream.gop_buffer.set_video_header(tag.clone());
1511 } else {
1512 stream.gop_buffer.push(tag.clone());
1514 }
1515
1516 let stream_key = stream.stream_key.clone().unwrap_or_default();
1518 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1519
1520 if is_keyframe && !is_header {
1522 self.handler.on_keyframe(&stream_ctx, timestamp).await;
1523 }
1524
1525 let mode = self.handler.media_delivery_mode();
1527
1528 if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1529 self.handler.on_media_tag(&stream_ctx, &tag).await;
1530 }
1531
1532 if matches!(
1533 mode,
1534 MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1535 ) {
1536 if is_enhanced {
1537 if let Ok(enhanced_data) = EnhancedVideoData::parse(data.clone()) {
1539 self.handler
1540 .on_enhanced_video_frame(&stream_ctx, &enhanced_data, timestamp)
1541 .await;
1542 }
1543 } else if data.len() >= 2 && (data[0] & 0x0F) == 7 {
1544 if let Ok(h264_data) = H264Data::parse(data.slice(1..)) {
1546 self.handler
1547 .on_video_frame(&stream_ctx, &h264_data, timestamp)
1548 .await;
1549 }
1550 }
1551 }
1552
1553 if let Some(ref key) = self.publishing_to {
1555 let frame = BroadcastFrame::video(timestamp, data, is_keyframe, is_header);
1556 self.registry.broadcast(key, frame).await;
1557 }
1558
1559 Ok(())
1560 }
1561
1562 fn find_publishing_stream(&self) -> Result<u32> {
1564 for (id, stream) in &self.state.streams {
1565 if stream.is_publishing() {
1566 return Ok(*id);
1567 }
1568 }
1569 Err(ProtocolError::StreamNotFound(0).into())
1570 }
1571
1572 async fn send_command(&mut self, csid: u32, stream_id: u32, cmd: &Command) -> Result<()> {
1575 let (msg_type, payload) = RtmpMessage::Command(cmd.clone()).encode();
1576
1577 let chunk = RtmpChunk {
1578 csid,
1579 timestamp: 0,
1580 message_type: msg_type,
1581 stream_id,
1582 payload,
1583 };
1584
1585 self.write_buf.clear();
1586 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1587 self.writer.write_all(&self.write_buf).await?;
1588 self.writer.flush().await?;
1589
1590 Ok(())
1591 }
1592
1593 async fn send_connect_result_with_ertmp(
1594 &mut self,
1595 transaction_id: f64,
1596 negotiated_caps: Option<&crate::protocol::enhanced::EnhancedCapabilities>,
1597 ) -> Result<()> {
1598 let mut builder = ConnectResponseBuilder::new()
1600 .fms_ver("FMS/3,5,7,7009")
1601 .capabilities(31);
1602
1603 if let Some(caps) = negotiated_caps {
1605 if caps.enabled {
1606 builder = builder.enhanced_capabilities(caps);
1607 }
1608 }
1609
1610 let result = builder.build(transaction_id);
1611
1612 self.send_command(CSID_COMMAND, 0, &result).await
1613 }
1614
1615 async fn send_connect_error(&mut self, transaction_id: f64, reason: &str) -> Result<()> {
1616 let mut info = HashMap::new();
1617 info.insert("level".to_string(), AmfValue::String("error".into()));
1618 info.insert(
1619 "code".to_string(),
1620 AmfValue::String(NC_CONNECT_REJECTED.into()),
1621 );
1622 info.insert("description".to_string(), AmfValue::String(reason.into()));
1623
1624 let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1625
1626 self.send_command(CSID_COMMAND, 0, &error).await
1627 }
1628
1629 async fn send_connect_redirect(&mut self, transaction_id: f64, url: &str) -> Result<()> {
1630 let mut info = HashMap::new();
1631 info.insert("level".to_string(), AmfValue::String("error".into()));
1632 info.insert(
1633 "code".to_string(),
1634 AmfValue::String(NC_CONNECT_REJECTED.into()),
1635 );
1636 info.insert(
1637 "description".to_string(),
1638 AmfValue::String("Redirect".into()),
1639 );
1640 info.insert("ex.redirect".to_string(), AmfValue::String(url.into()));
1641
1642 let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1643
1644 self.send_command(CSID_COMMAND, 0, &error).await
1645 }
1646
1647 async fn send_set_chunk_size(&mut self, size: u32) -> Result<()> {
1648 let (msg_type, payload) = RtmpMessage::SetChunkSize(size).encode();
1649
1650 let chunk = RtmpChunk {
1651 csid: CSID_PROTOCOL_CONTROL,
1652 timestamp: 0,
1653 message_type: msg_type,
1654 stream_id: 0,
1655 payload,
1656 };
1657
1658 self.write_buf.clear();
1659 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1660 self.writer.write_all(&self.write_buf).await?;
1661 self.writer.flush().await?;
1662
1663 Ok(())
1664 }
1665
1666 async fn send_window_ack_size(&mut self, size: u32) -> Result<()> {
1667 let (msg_type, payload) = RtmpMessage::WindowAckSize(size).encode();
1668
1669 let chunk = RtmpChunk {
1670 csid: CSID_PROTOCOL_CONTROL,
1671 timestamp: 0,
1672 message_type: msg_type,
1673 stream_id: 0,
1674 payload,
1675 };
1676
1677 self.write_buf.clear();
1678 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1679 self.writer.write_all(&self.write_buf).await?;
1680 self.writer.flush().await?;
1681
1682 Ok(())
1683 }
1684
1685 async fn send_peer_bandwidth(&mut self, size: u32) -> Result<()> {
1686 let (msg_type, payload) = RtmpMessage::SetPeerBandwidth {
1687 size,
1688 limit_type: BANDWIDTH_LIMIT_DYNAMIC,
1689 }
1690 .encode();
1691
1692 let chunk = RtmpChunk {
1693 csid: CSID_PROTOCOL_CONTROL,
1694 timestamp: 0,
1695 message_type: msg_type,
1696 stream_id: 0,
1697 payload,
1698 };
1699
1700 self.write_buf.clear();
1701 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1702 self.writer.write_all(&self.write_buf).await?;
1703 self.writer.flush().await?;
1704
1705 Ok(())
1706 }
1707
1708 async fn send_user_control(&mut self, event: UserControlEvent) -> Result<()> {
1709 let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
1710
1711 let chunk = RtmpChunk {
1712 csid: CSID_PROTOCOL_CONTROL,
1713 timestamp: 0,
1714 message_type: msg_type,
1715 stream_id: 0,
1716 payload,
1717 };
1718
1719 self.write_buf.clear();
1720 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1721 self.writer.write_all(&self.write_buf).await?;
1722 self.writer.flush().await?;
1723
1724 Ok(())
1725 }
1726
1727 async fn send_acknowledgement(&mut self) -> Result<()> {
1728 let sequence = self.state.bytes_received as u32;
1729
1730 let (msg_type, payload) = RtmpMessage::Acknowledgement { sequence }.encode();
1731
1732 let chunk = RtmpChunk {
1733 csid: CSID_PROTOCOL_CONTROL,
1734 timestamp: 0,
1735 message_type: msg_type,
1736 stream_id: 0,
1737 payload,
1738 };
1739
1740 self.write_buf.clear();
1741 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1742 self.writer.write_all(&self.write_buf).await?;
1743 self.writer.flush().await?;
1744
1745 self.state.mark_ack_sent();
1746 Ok(())
1747 }
1748
1749 async fn send_ping_response(&mut self, timestamp: u32) -> Result<()> {
1750 self.send_user_control(UserControlEvent::PingResponse(timestamp))
1751 .await
1752 }
1753
1754 async fn send_video(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1758 let (msg_type, payload) = RtmpMessage::Video {
1759 timestamp,
1760 data: data.clone(),
1761 }
1762 .encode();
1763
1764 let chunk = RtmpChunk {
1765 csid: CSID_VIDEO,
1766 timestamp,
1767 message_type: msg_type,
1768 stream_id,
1769 payload,
1770 };
1771
1772 self.write_buf.clear();
1773 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1774 self.writer.write_all(&self.write_buf).await?;
1775 Ok(())
1777 }
1778
1779 async fn send_audio(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1781 let (msg_type, payload) = RtmpMessage::Audio {
1782 timestamp,
1783 data: data.clone(),
1784 }
1785 .encode();
1786
1787 let chunk = RtmpChunk {
1788 csid: CSID_AUDIO,
1789 timestamp,
1790 message_type: msg_type,
1791 stream_id,
1792 payload,
1793 };
1794
1795 self.write_buf.clear();
1796 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1797 self.writer.write_all(&self.write_buf).await?;
1798 Ok(())
1800 }
1801
1802 async fn send_broadcast_frame(&mut self, frame: BroadcastFrame) -> Result<()> {
1807 let stream_id = self.playback_stream_id.unwrap_or(1);
1808
1809 if self.is_paused {
1811 self.frames_dropped_while_paused += 1;
1812 tracing::trace!(session_id = self.state.id, "Frame dropped (paused)");
1813 return Ok(());
1814 }
1815
1816 if self.subscriber_state == SubscriberState::SkippingToKeyframe {
1818 match frame.frame_type {
1819 FrameType::Video => {
1820 if frame.is_keyframe || frame.is_header {
1821 self.subscriber_state = SubscriberState::Normal;
1823 self.skip_audio_until_keyframe = false;
1824 tracing::debug!(
1825 session_id = self.state.id,
1826 "Received keyframe, resuming normal playback"
1827 );
1828 } else {
1829 return Ok(());
1831 }
1832 }
1833 FrameType::Audio => {
1834 if self.skip_audio_until_keyframe {
1837 return Ok(());
1838 }
1839 }
1840 FrameType::Metadata => {
1841 }
1843 }
1844 }
1845
1846 match frame.frame_type {
1848 FrameType::Video => {
1849 self.send_video(stream_id, frame.timestamp, frame.data)
1850 .await?;
1851 }
1852 FrameType::Audio => {
1853 self.send_audio(stream_id, frame.timestamp, frame.data)
1854 .await?;
1855 }
1856 FrameType::Metadata => {
1857 self.send_metadata_frame(stream_id, frame.data).await?;
1859 }
1860 }
1861
1862 self.writer.flush().await?;
1865
1866 Ok(())
1867 }
1868
1869 async fn send_metadata_frame(&mut self, stream_id: u32, data: Bytes) -> Result<()> {
1871 let data_msg = DataMessage {
1873 name: CMD_ON_METADATA.to_string(),
1874 values: vec![AmfValue::String("onMetaData".to_string())],
1875 stream_id,
1876 };
1877
1878 let (msg_type, payload) = RtmpMessage::Data(data_msg).encode();
1879
1880 let chunk = RtmpChunk {
1883 csid: CSID_COMMAND,
1884 timestamp: 0,
1885 message_type: msg_type,
1886 stream_id,
1887 payload: if data.is_empty() { payload } else { data },
1888 };
1889
1890 self.write_buf.clear();
1891 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1892 self.writer.write_all(&self.write_buf).await?;
1893
1894 Ok(())
1895 }
1896}
1897
1898use bytes::Buf;