1use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use bytes::{Bytes, BytesMut};
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::broadcast;
18use tokio::time::timeout;
19
20use crate::registry::{BroadcastFrame, FrameType, StreamKey, StreamRegistry};
21
22use crate::amf::AmfValue;
23use crate::error::{Error, ProtocolError, Result};
24use crate::media::flv::FlvTag;
25use crate::media::{AacData, H264Data};
26use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
27use crate::protocol::constants::*;
28use crate::protocol::handshake::{Handshake, HandshakeRole};
29use crate::protocol::message::{
30 Command, ConnectParams, DataMessage, PlayParams, PublishParams, RtmpMessage, UserControlEvent,
31};
32use crate::protocol::quirks::EncoderType;
33use crate::server::config::ServerConfig;
34use crate::server::handler::{AuthResult, MediaDeliveryMode, RtmpHandler};
35use crate::session::context::{SessionContext, StreamContext};
36use crate::session::state::SessionState;
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40enum SubscriberState {
41 Normal,
43 SkippingToKeyframe,
45}
46
47pub struct Connection<H: RtmpHandler> {
49 state: SessionState,
51
52 context: SessionContext,
54
55 reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
57 writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,
58
59 read_buf: BytesMut,
61
62 chunk_decoder: ChunkDecoder,
64 chunk_encoder: ChunkEncoder,
65
66 write_buf: BytesMut,
68
69 config: ServerConfig,
71
72 handler: Arc<H>,
74
75 registry: Arc<StreamRegistry>,
77
78 pending_fc: HashMap<String, f64>,
80
81 publishing_to: Option<StreamKey>,
83
84 subscribed_to: Option<StreamKey>,
86
87 last_audio_ts: Option<u32>,
88
89 last_video_ts: Option<u32>,
90
91 frame_rx: Option<broadcast::Receiver<BroadcastFrame>>,
93
94 subscriber_state: SubscriberState,
96
97 consecutive_lag_count: u32,
99
100 playback_stream_id: Option<u32>,
102
103 is_paused: bool,
105
106 frames_dropped_while_paused: u64,
108
109 skip_audio_until_keyframe: bool,
112}
113
114impl<H: RtmpHandler> Connection<H> {
115 pub fn new(
117 session_id: u64,
118 socket: TcpStream,
119 peer_addr: SocketAddr,
120 config: ServerConfig,
121 handler: Arc<H>,
122 registry: Arc<StreamRegistry>,
123 ) -> Self {
124 let (read_half, write_half) = tokio::io::split(socket);
125
126 Self {
127 state: SessionState::new(session_id, peer_addr),
128 context: SessionContext::new(session_id, peer_addr),
129 reader: BufReader::with_capacity(config.read_buffer_size, read_half),
130 writer: BufWriter::with_capacity(config.write_buffer_size, write_half),
131 read_buf: BytesMut::with_capacity(config.read_buffer_size),
132 chunk_decoder: ChunkDecoder::new(),
133 chunk_encoder: ChunkEncoder::new(),
134 write_buf: BytesMut::with_capacity(config.write_buffer_size),
135 config,
136 handler,
137 registry,
138 pending_fc: HashMap::new(),
139 publishing_to: None,
140 subscribed_to: None,
141 last_audio_ts: None,
142 last_video_ts: None,
143 frame_rx: None,
144 subscriber_state: SubscriberState::Normal,
145 consecutive_lag_count: 0,
146 playback_stream_id: None,
147 is_paused: false,
148 frames_dropped_while_paused: 0,
149 skip_audio_until_keyframe: false,
150 }
151 }
152
153 pub async fn run(&mut self) -> Result<()> {
155 if !self.handler.on_connection(&self.context).await {
157 return Err(Error::Rejected("Connection rejected by handler".into()));
158 }
159
160 self.do_handshake().await?;
162 self.handler.on_handshake_complete(&self.context).await;
163
164 tracing::debug!(
166 session_id = self.state.id,
167 chunk_size = self.config.chunk_size,
168 "Sending set chunk size"
169 );
170 self.send_set_chunk_size(self.config.chunk_size).await?;
171 self.chunk_encoder.set_chunk_size(self.config.chunk_size);
172
173 tracing::debug!(session_id = self.state.id, "Entering main message loop");
174
175 let idle_timeout = self.config.idle_timeout;
177 let result = loop {
178 let mut frame_rx = self.frame_rx.take();
180
181 let loop_result = if let Some(ref mut rx) = frame_rx {
183 tokio::select! {
185 biased;
186
187 frame_result = rx.recv() => {
189 match frame_result {
190 Ok(frame) => {
191 self.frame_rx = frame_rx;
193 self.consecutive_lag_count = 0;
195 if let Err(e) = self.send_broadcast_frame(frame).await {
196 tracing::debug!(error = %e, "Failed to send frame");
197 Err(e)
198 } else {
199 Ok(true)
200 }
201 }
202 Err(broadcast::error::RecvError::Lagged(n)) => {
203 self.frame_rx = frame_rx;
204 self.handle_lag(n).await.map(|_| true)
205 }
206 Err(broadcast::error::RecvError::Closed) => {
207 self.frame_rx = frame_rx;
208 if let Err(e) = self.handle_stream_ended().await {
210 tracing::debug!(error = %e, "Error handling stream end");
211 }
212 Ok(false) }
214 }
215 }
216
217 result = timeout(idle_timeout, self.read_and_process()) => {
219 self.frame_rx = frame_rx;
220 match result {
221 Ok(Ok(continue_loop)) => Ok(continue_loop),
222 Ok(Err(e)) => {
223 tracing::debug!(error = %e, "Processing error");
224 Err(e)
225 }
226 Err(_) => {
227 tracing::debug!("Idle timeout");
228 Ok(false)
229 }
230 }
231 }
232 }
233 } else {
234 self.frame_rx = frame_rx;
236 match timeout(idle_timeout, self.read_and_process()).await {
237 Ok(Ok(continue_loop)) => Ok(continue_loop),
238 Ok(Err(e)) => {
239 tracing::debug!(error = %e, "Processing error");
240 Err(e)
241 }
242 Err(_) => {
243 tracing::debug!("Idle timeout");
244 Ok(false)
245 }
246 }
247 };
248
249 match loop_result {
250 Ok(true) => continue,
251 Ok(false) => break Ok(()),
252 Err(e) => break Err(e),
253 }
254 };
255
256 self.cleanup_on_disconnect().await;
258
259 self.handler.on_disconnect(&self.context).await;
261
262 result
263 }
264
265 async fn cleanup_on_disconnect(&mut self) {
267 if let Some(ref key) = self.publishing_to {
269 self.registry.unregister_publisher(key, self.state.id).await;
270 tracing::debug!(
271 session_id = self.state.id,
272 stream = %key,
273 "Unregistered publisher on disconnect"
274 );
275 }
276
277 if let Some(ref key) = self.subscribed_to {
279 self.registry.unsubscribe(key).await;
280 tracing::debug!(
281 session_id = self.state.id,
282 stream = %key,
283 "Unsubscribed on disconnect"
284 );
285 }
286 }
287
288 async fn handle_lag(&mut self, skipped: u64) -> Result<()> {
290 let config = self.registry.config();
291
292 self.consecutive_lag_count += 1;
293
294 if skipped < config.lag_threshold_low {
295 tracing::debug!(
297 session_id = self.state.id,
298 skipped = skipped,
299 "Minor broadcast lag, continuing"
300 );
301 return Ok(());
302 }
303
304 if self.subscriber_state != SubscriberState::SkippingToKeyframe {
306 self.subscriber_state = SubscriberState::SkippingToKeyframe;
307 tracing::warn!(
308 session_id = self.state.id,
309 skipped = skipped,
310 "Subscriber lagging, skipping to next keyframe"
311 );
312 }
313
314 if self.consecutive_lag_count >= config.max_consecutive_lag_events {
316 tracing::warn!(
317 session_id = self.state.id,
318 consecutive_lags = self.consecutive_lag_count,
319 "Disconnecting slow subscriber"
320 );
321 return Err(Error::Rejected("Subscriber too slow".into()));
322 }
323
324 Ok(())
325 }
326
327 async fn handle_stream_ended(&mut self) -> Result<()> {
329 if self.is_paused {
331 self.is_paused = false;
332 }
333
334 if let Some(stream_id) = self.playback_stream_id {
335 self.send_user_control(UserControlEvent::StreamEof(stream_id))
337 .await?;
338
339 let status = Command::on_status(stream_id, "status", NS_PLAY_STOP, "Stream ended");
341 self.send_command(CSID_COMMAND, stream_id, &status).await?;
342
343 tracing::info!(
344 session_id = self.state.id,
345 stream_id = stream_id,
346 "Stream ended, notified subscriber"
347 );
348 }
349
350 Ok(())
351 }
352
353 async fn do_handshake(&mut self) -> Result<()> {
355 let mut handshake = Handshake::new(HandshakeRole::Server);
356
357 handshake.generate_initial();
359 self.state.start_handshake();
360
361 let connection_timeout = self.config.connection_timeout;
363 timeout(connection_timeout, async {
364 loop {
365 let bytes_needed = handshake.bytes_needed();
367 if bytes_needed > 0 && self.read_buf.len() < bytes_needed {
368 let n = self.reader.read_buf(&mut self.read_buf).await?;
369 if n == 0 {
370 return Err(Error::ConnectionClosed);
371 }
372 }
373
374 let mut buf = Bytes::copy_from_slice(&self.read_buf);
376 let response = handshake.process(&mut buf)?;
377
378 let consumed = self.read_buf.len() - buf.len();
380 if consumed > 0 {
381 self.read_buf.advance(consumed);
382 }
383
384 if let Some(data) = response {
386 self.writer.write_all(&data).await?;
387 self.writer.flush().await?;
388 }
389
390 if handshake.is_done() {
391 break;
392 }
393 }
394
395 Ok::<_, Error>(())
396 })
397 .await
398 .map_err(|_| Error::Timeout)??;
399
400 self.state.complete_handshake();
401 tracing::debug!(
402 session_id = self.state.id,
403 remaining_buf = self.read_buf.len(),
404 "Handshake complete"
405 );
406
407 if !self.read_buf.is_empty() {
409 let dump_len = self.read_buf.len().min(32);
410 let hex: String = self.read_buf[..dump_len]
411 .iter()
412 .map(|b| format!("{:02x}", b))
413 .collect::<Vec<_>>()
414 .join(" ");
415 tracing::debug!(
416 session_id = self.state.id,
417 first_bytes = %hex,
418 "Buffer after handshake"
419 );
420 }
421
422 Ok(())
423 }
424
425 async fn read_and_process(&mut self) -> Result<bool> {
427 tracing::trace!(
428 session_id = self.state.id,
429 buf_len = self.read_buf.len(),
430 "read_and_process called"
431 );
432
433 loop {
437 let buf_len_before = self.read_buf.len();
438
439 if let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
441 tracing::trace!(
442 session_id = self.state.id,
443 csid = chunk.csid,
444 msg_type = chunk.message_type,
445 "Decoded chunk from buffer"
446 );
447 self.handle_chunk(chunk).await?;
448 continue;
450 }
451
452 let buf_len_after = self.read_buf.len();
455 if buf_len_after < buf_len_before {
456 continue;
458 }
459
460 break;
462 }
463
464 tracing::trace!(
465 session_id = self.state.id,
466 buf_len = self.read_buf.len(),
467 "Waiting for more data"
468 );
469
470 let n = self.reader.read_buf(&mut self.read_buf).await?;
472 if n == 0 {
473 return Ok(false); }
475
476 tracing::trace!(
477 session_id = self.state.id,
478 bytes_read = n,
479 buf_len = self.read_buf.len(),
480 "Read data from socket"
481 );
482
483 let needs_ack = self.state.add_bytes_received(n as u64);
484
485 while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
487 tracing::trace!(
488 session_id = self.state.id,
489 csid = chunk.csid,
490 msg_type = chunk.message_type,
491 "Decoded chunk after read"
492 );
493 self.handle_chunk(chunk).await?;
494 }
495
496 if needs_ack {
498 self.send_acknowledgement().await?;
499 }
500
501 Ok(true)
502 }
503
504 async fn handle_chunk(&mut self, chunk: RtmpChunk) -> Result<()> {
506 let message = RtmpMessage::from_chunk(&chunk)?;
507
508 match message {
509 RtmpMessage::SetChunkSize(size) => {
510 tracing::debug!(size = size, "Peer set chunk size");
511 self.chunk_decoder.set_chunk_size(size);
512 self.state.in_chunk_size = size;
513 }
514
515 RtmpMessage::Abort { csid } => {
516 self.chunk_decoder.abort(csid);
517 }
518
519 RtmpMessage::Acknowledgement { sequence: _ } => {
520 }
522
523 RtmpMessage::WindowAckSize(size) => {
524 self.state.window_ack_size = size;
525 }
526
527 RtmpMessage::SetPeerBandwidth {
528 size,
529 limit_type: _,
530 } => {
531 self.send_window_ack_size(size).await?;
533 }
534
535 RtmpMessage::UserControl(event) => {
536 self.handle_user_control(event).await?;
537 }
538
539 RtmpMessage::Command(cmd) | RtmpMessage::CommandAmf3(cmd) => {
540 self.handle_command(cmd).await?;
541 }
542
543 RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
544 self.handle_data(data).await?;
545 }
546
547 RtmpMessage::Audio { timestamp, data } => {
548 self.handle_audio(timestamp, data).await?;
549 }
550
551 RtmpMessage::Video { timestamp, data } => {
552 self.handle_video(timestamp, data).await?;
553 }
554
555 _ => {
556 tracing::trace!(message = ?message, "Unhandled message");
557 }
558 }
559
560 Ok(())
561 }
562
563 async fn handle_user_control(&mut self, event: UserControlEvent) -> Result<()> {
565 match event {
566 UserControlEvent::PingRequest(timestamp) => {
567 self.send_ping_response(timestamp).await?;
568 }
569 UserControlEvent::SetBufferLength {
570 stream_id: _,
571 buffer_ms: _,
572 } => {
573 }
575 _ => {}
576 }
577 Ok(())
578 }
579
580 async fn handle_command(&mut self, cmd: Command) -> Result<()> {
582 tracing::debug!(
583 session_id = self.state.id,
584 command = cmd.name,
585 transaction_id = cmd.transaction_id,
586 stream_id = cmd.stream_id,
587 args = ?cmd.arguments,
588 "Received command"
589 );
590 match cmd.name.as_str() {
591 CMD_CONNECT => self.handle_connect(cmd).await?,
592 CMD_CREATE_STREAM => self.handle_create_stream(cmd).await?,
593 CMD_DELETE_STREAM => self.handle_delete_stream(cmd).await?,
594 CMD_PUBLISH => self.handle_publish(cmd).await?,
595 CMD_PLAY => self.handle_play(cmd).await?,
596 CMD_FC_PUBLISH => self.handle_fc_publish(cmd).await?,
597 CMD_FC_UNPUBLISH => self.handle_fc_unpublish(cmd).await?,
598 CMD_RELEASE_STREAM => self.handle_release_stream(cmd).await?,
599 CMD_PAUSE => self.handle_pause(cmd).await?,
600 CMD_CLOSE | "closeStream" => self.handle_close_stream(cmd).await?,
601 _ => {
602 tracing::debug!(command = cmd.name, "Unknown command");
603 }
604 }
605 Ok(())
606 }
607
608 async fn handle_connect(&mut self, cmd: Command) -> Result<()> {
610 let params = ConnectParams::from_amf(&cmd.command_object);
611 let encoder_type = params
612 .flash_ver
613 .as_deref()
614 .map(EncoderType::from_flash_ver)
615 .unwrap_or(EncoderType::Unknown);
616
617 let result = self.handler.on_connect(&self.context, ¶ms).await;
619
620 match result {
621 AuthResult::Accept => {
622 self.state.on_connect(params.clone(), encoder_type);
623 self.context.with_connect(params, encoder_type);
624
625 self.send_window_ack_size(self.config.window_ack_size)
627 .await?;
628
629 self.send_peer_bandwidth(self.config.peer_bandwidth).await?;
631
632 self.send_user_control(UserControlEvent::StreamBegin(0))
634 .await?;
635
636 self.send_connect_result(cmd.transaction_id).await?;
638
639 tracing::info!(
640 session_id = self.state.id,
641 app = self.context.app,
642 "Connected"
643 );
644 }
645 AuthResult::Reject(reason) => {
646 self.send_connect_error(cmd.transaction_id, &reason).await?;
647 return Err(Error::Rejected(reason));
648 }
649 AuthResult::Redirect { url } => {
650 self.send_connect_redirect(cmd.transaction_id, &url).await?;
651 return Err(Error::Rejected(format!("Redirected to {}", url)));
652 }
653 }
654
655 Ok(())
656 }
657
658 async fn handle_create_stream(&mut self, cmd: Command) -> Result<()> {
660 let stream_id = self.state.allocate_stream_id();
661
662 let result = Command::result(
663 cmd.transaction_id,
664 AmfValue::Null,
665 AmfValue::Number(stream_id as f64),
666 );
667
668 self.send_command(CSID_COMMAND, 0, &result).await?;
669
670 tracing::debug!(stream_id = stream_id, "Stream created");
671 Ok(())
672 }
673
674 async fn handle_delete_stream(&mut self, cmd: Command) -> Result<()> {
676 let stream_id = cmd
677 .arguments
678 .first()
679 .and_then(|v| v.as_number())
680 .unwrap_or(0.0) as u32;
681
682 if let Some(stream) = self.state.remove_stream(stream_id) {
683 if stream.is_publishing() {
684 let stream_ctx = StreamContext::new(
685 self.context.clone(),
686 stream_id,
687 stream.stream_key.unwrap_or_default(),
688 true,
689 );
690 #[allow(deprecated)]
691 self.handler.on_publish_stop(&stream_ctx).await;
692 self.handler.on_unpublish(&stream_ctx).await;
693 }
694 }
695
696 Ok(())
697 }
698
699 async fn handle_fc_publish(&mut self, cmd: Command) -> Result<()> {
701 let stream_key = cmd
702 .arguments
703 .first()
704 .and_then(|v| v.as_str())
705 .unwrap_or("")
706 .to_string();
707
708 let result = self.handler.on_fc_publish(&self.context, &stream_key).await;
709
710 match result {
711 AuthResult::Accept => {
712 self.pending_fc
714 .insert(stream_key.clone(), cmd.transaction_id);
715
716 let response = Command {
718 name: CMD_ON_FC_PUBLISH.to_string(),
719 transaction_id: 0.0,
720 command_object: AmfValue::Null,
721 arguments: vec![],
722 stream_id: 0,
723 };
724 self.send_command(CSID_COMMAND, 0, &response).await?;
725 }
726 AuthResult::Reject(reason) => {
727 return Err(Error::Rejected(reason));
728 }
729 AuthResult::Redirect { url } => {
730 return Err(Error::Rejected(format!("Redirected to {}", url)));
731 }
732 }
733
734 Ok(())
735 }
736
737 async fn handle_fc_unpublish(&mut self, cmd: Command) -> Result<()> {
739 let stream_key = cmd.arguments.first().and_then(|v| v.as_str()).unwrap_or("");
740
741 self.pending_fc.remove(stream_key);
742
743 let response = Command {
745 name: CMD_ON_FC_UNPUBLISH.to_string(),
746 transaction_id: 0.0,
747 command_object: AmfValue::Null,
748 arguments: vec![],
749 stream_id: 0,
750 };
751 self.send_command(CSID_COMMAND, 0, &response).await?;
752
753 Ok(())
754 }
755
756 async fn handle_release_stream(&mut self, _cmd: Command) -> Result<()> {
758 Ok(())
760 }
761
762 async fn handle_publish(&mut self, cmd: Command) -> Result<()> {
764 let stream_key = cmd
765 .arguments
766 .first()
767 .and_then(|v| v.as_str())
768 .unwrap_or("")
769 .to_string();
770
771 let publish_type = cmd
772 .arguments
773 .get(1)
774 .and_then(|v| v.as_str())
775 .unwrap_or("live")
776 .to_string();
777
778 let params = PublishParams {
779 stream_key: stream_key.clone(),
780 publish_type: publish_type.clone(),
781 stream_id: cmd.stream_id,
782 };
783
784 let result = self.handler.on_publish(&self.context, ¶ms).await;
785
786 match result {
787 AuthResult::Accept => {
788 let app = self.context.app.clone();
790 let registry_key = StreamKey::new(&app, &stream_key);
791
792 if let Err(e) = self
794 .registry
795 .register_publisher(®istry_key, self.state.id)
796 .await
797 {
798 tracing::warn!(
799 session_id = self.state.id,
800 stream = %registry_key,
801 error = %e,
802 "Failed to register publisher"
803 );
804 let status = Command::on_status(
805 cmd.stream_id,
806 "error",
807 NS_PUBLISH_BAD_NAME,
808 &format!("Stream already publishing: {}", e),
809 );
810 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
811 .await?;
812 return Err(Error::Rejected(format!("Stream already publishing: {}", e)));
813 }
814
815 self.publishing_to = Some(registry_key);
817
818 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
820 stream.start_publish(stream_key.clone(), publish_type);
821 }
822
823 self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
825 .await?;
826
827 let status = Command::on_status(
829 cmd.stream_id,
830 "status",
831 NS_PUBLISH_START,
832 &format!("{} is now published", stream_key),
833 );
834 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
835 .await?;
836
837 tracing::info!(
838 session_id = self.state.id,
839 stream_key = stream_key,
840 "Publishing started"
841 );
842 }
843 AuthResult::Reject(reason) => {
844 let status =
845 Command::on_status(cmd.stream_id, "error", NS_PUBLISH_BAD_NAME, &reason);
846 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
847 .await?;
848 return Err(Error::Rejected(reason));
849 }
850 AuthResult::Redirect { url } => {
851 return Err(Error::Rejected(format!("Redirected to {}", url)));
852 }
853 }
854
855 Ok(())
856 }
857
858 async fn handle_play(&mut self, cmd: Command) -> Result<()> {
860 let stream_name = cmd
861 .arguments
862 .first()
863 .and_then(|v| v.as_str())
864 .unwrap_or("")
865 .to_string();
866
867 let start = cmd
868 .arguments
869 .get(1)
870 .and_then(|v| v.as_number())
871 .unwrap_or(-2.0);
872
873 let duration = cmd
874 .arguments
875 .get(2)
876 .and_then(|v| v.as_number())
877 .unwrap_or(-1.0);
878
879 let reset = cmd
880 .arguments
881 .get(3)
882 .and_then(|v| v.as_bool())
883 .unwrap_or(true);
884
885 let params = PlayParams {
886 stream_name: stream_name.clone(),
887 start,
888 duration,
889 reset,
890 stream_id: cmd.stream_id,
891 };
892
893 let result = self.handler.on_play(&self.context, ¶ms).await;
894
895 match result {
896 AuthResult::Accept => {
897 let app = self.context.app.clone();
899 let registry_key = StreamKey::new(&app, &stream_name);
900
901 let (rx, catchup_frames) = match self.registry.subscribe(®istry_key).await {
903 Ok(result) => result,
904 Err(e) => {
905 tracing::debug!(
906 session_id = self.state.id,
907 stream = %registry_key,
908 error = %e,
909 "Stream not found for play"
910 );
911 let status = Command::on_status(
912 cmd.stream_id,
913 "error",
914 NS_PLAY_STREAM_NOT_FOUND,
915 &format!("Stream not found: {}", stream_name),
916 );
917 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
918 .await?;
919 return Ok(());
920 }
921 };
922
923 self.subscribed_to = Some(registry_key.clone());
925 self.frame_rx = Some(rx);
926 self.playback_stream_id = Some(cmd.stream_id);
927
928 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
929 stream.start_play(stream_name.clone());
930 }
931
932 self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
934 .await?;
935
936 if reset {
938 let status = Command::on_status(
939 cmd.stream_id,
940 "status",
941 NS_PLAY_RESET,
942 "Playing and resetting",
943 );
944 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
945 .await?;
946 }
947
948 let status = Command::on_status(
950 cmd.stream_id,
951 "status",
952 NS_PLAY_START,
953 &format!("Started playing {}", stream_name),
954 );
955 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
956 .await?;
957
958 tracing::debug!(
960 session_id = self.state.id,
961 stream = %registry_key,
962 catchup_frames = catchup_frames.len(),
963 "Sending catchup frames"
964 );
965
966 for frame in catchup_frames {
967 self.send_broadcast_frame(frame).await?;
968 }
969
970 tracing::info!(
971 session_id = self.state.id,
972 stream_name = stream_name,
973 "Playing started"
974 );
975 }
976 AuthResult::Reject(reason) => {
977 let status =
978 Command::on_status(cmd.stream_id, "error", NS_PLAY_STREAM_NOT_FOUND, &reason);
979 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
980 .await?;
981 }
982 AuthResult::Redirect { url: _ } => {
983 }
985 }
986
987 Ok(())
988 }
989
990 async fn handle_close_stream(&mut self, cmd: Command) -> Result<()> {
992 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
993 stream.stop();
994 }
995 Ok(())
996 }
997
998 async fn handle_pause(&mut self, cmd: Command) -> Result<()> {
1000 let stream_id = match self.playback_stream_id {
1001 Some(id) => id,
1002 None => return Ok(()), };
1004
1005 let pause_flag = cmd
1007 .arguments
1008 .first()
1009 .and_then(|v| v.as_bool())
1010 .unwrap_or(true);
1011
1012 if pause_flag {
1013 self.do_pause(stream_id).await
1014 } else {
1015 self.do_unpause(stream_id).await
1016 }
1017 }
1018
1019 async fn do_pause(&mut self, stream_id: u32) -> Result<()> {
1021 if self.is_paused {
1022 return Ok(()); }
1024
1025 self.is_paused = true;
1026 self.frames_dropped_while_paused = 0;
1027
1028 let status = Command::on_status(stream_id, "status", NS_PAUSE_NOTIFY, "Playback paused");
1030 self.send_command(CSID_COMMAND, stream_id, &status).await?;
1031
1032 self.send_user_control(UserControlEvent::StreamDry(stream_id))
1034 .await?;
1035
1036 let stream_key = self
1038 .subscribed_to
1039 .as_ref()
1040 .map(|k| k.name.clone())
1041 .unwrap_or_default();
1042 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1043 self.handler.on_pause(&stream_ctx).await;
1044
1045 tracing::info!(session_id = self.state.id, "Subscriber paused");
1046 Ok(())
1047 }
1048
1049 async fn do_unpause(&mut self, stream_id: u32) -> Result<()> {
1051 if !self.is_paused {
1052 return Ok(()); }
1054
1055 self.is_paused = false;
1056
1057 self.subscriber_state = SubscriberState::SkippingToKeyframe;
1060 self.skip_audio_until_keyframe = true;
1061
1062 let status = Command::on_status(stream_id, "status", NS_UNPAUSE_NOTIFY, "Playback resumed");
1064 self.send_command(CSID_COMMAND, stream_id, &status).await?;
1065
1066 self.send_user_control(UserControlEvent::StreamBegin(stream_id))
1068 .await?;
1069
1070 if let Some(ref key) = self.subscribed_to {
1073 let headers = self.registry.get_sequence_headers(key).await;
1074 tracing::debug!(
1075 session_id = self.state.id,
1076 header_count = headers.len(),
1077 "Resending sequence headers after unpause"
1078 );
1079 for frame in headers {
1080 match frame.frame_type {
1081 FrameType::Video => {
1082 self.send_video(stream_id, frame.timestamp, frame.data)
1083 .await?;
1084 }
1085 FrameType::Audio => {
1086 self.send_audio(stream_id, frame.timestamp, frame.data)
1087 .await?;
1088 }
1089 _ => {}
1090 }
1091 }
1092 self.writer.flush().await?;
1093 }
1094
1095 let stream_key = self
1097 .subscribed_to
1098 .as_ref()
1099 .map(|k| k.name.clone())
1100 .unwrap_or_default();
1101 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1102 self.handler.on_unpause(&stream_ctx).await;
1103
1104 tracing::info!(
1105 session_id = self.state.id,
1106 frames_dropped = self.frames_dropped_while_paused,
1107 "Subscriber unpaused (waiting for keyframe)"
1108 );
1109 self.frames_dropped_while_paused = 0;
1110 Ok(())
1111 }
1112
1113 async fn handle_data(&mut self, data: DataMessage) -> Result<()> {
1115 match data.name.as_str() {
1116 CMD_SET_DATA_FRAME => {
1117 if let Some(AmfValue::String(name)) = data.values.first() {
1119 if name == CMD_ON_METADATA {
1120 self.handle_metadata(data.stream_id, &data.values[1..])
1121 .await?;
1122 }
1123 }
1124 }
1125 CMD_ON_METADATA => {
1126 self.handle_metadata(data.stream_id, &data.values).await?;
1127 }
1128 _ => {
1129 tracing::trace!(name = data.name, "Unknown data message");
1130 }
1131 }
1132 Ok(())
1133 }
1134
1135 async fn handle_metadata(&mut self, stream_id: u32, values: &[AmfValue]) -> Result<()> {
1137 let metadata: HashMap<String, AmfValue> = values
1139 .first()
1140 .and_then(|v| v.as_object().cloned())
1141 .unwrap_or_default();
1142
1143 if let Some(stream) = self.state.get_stream_mut(stream_id) {
1144 stream.on_metadata();
1145 }
1146
1147 if let Some(stream) = self.state.get_stream(stream_id) {
1148 if stream.is_publishing() {
1149 let stream_ctx = StreamContext::new(
1150 self.context.clone(),
1151 stream_id,
1152 stream.stream_key.clone().unwrap_or_default(),
1153 true,
1154 );
1155 self.handler.on_metadata(&stream_ctx, &metadata).await;
1156 }
1157 }
1158
1159 Ok(())
1160 }
1161
1162 async fn handle_audio(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1164 if data.is_empty() {
1165 return Ok(());
1166 }
1167
1168 if let Some(prev_audio_ts) = self.last_audio_ts {
1169 let timestamp_delta = timestamp - prev_audio_ts;
1170 tracing::trace!(
1171 timestamp = timestamp,
1172 last_audio_ts = prev_audio_ts,
1173 timestamp_delta = timestamp_delta,
1174 "connection handle_audio"
1175 );
1176 }
1177 self.last_audio_ts = Some(timestamp);
1178
1179 let stream_id = self.find_publishing_stream()?;
1181 let stream = self
1182 .state
1183 .get_stream_mut(stream_id)
1184 .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1185
1186 let is_header = data.len() >= 2 && (data[0] >> 4) == 10 && data[1] == 0;
1187 stream.on_audio(timestamp, is_header, data.len());
1188
1189 if is_header {
1191 let tag = FlvTag::audio(timestamp, data.clone());
1192 stream.gop_buffer.set_audio_header(tag);
1193 }
1194
1195 let stream_key = stream.stream_key.clone().unwrap_or_default();
1197 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1198
1199 let mode = self.handler.media_delivery_mode();
1201
1202 if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1203 let tag = FlvTag::audio(timestamp, data.clone());
1204 self.handler.on_media_tag(&stream_ctx, &tag).await;
1205 }
1206
1207 if matches!(
1208 mode,
1209 MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1210 ) {
1211 if data.len() >= 2 && (data[0] >> 4) == 10 {
1212 if let Ok(aac_data) = AacData::parse(data.slice(1..)) {
1214 self.handler
1215 .on_audio_frame(&stream_ctx, &aac_data, timestamp)
1216 .await;
1217 }
1218 }
1219 }
1220
1221 if let Some(ref key) = self.publishing_to {
1223 let frame = BroadcastFrame::audio(timestamp, data, is_header);
1224 self.registry.broadcast(key, frame).await;
1225 }
1226
1227 Ok(())
1228 }
1229
1230 async fn handle_video(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1232 if data.is_empty() {
1233 return Ok(());
1234 }
1235
1236 if let Some(prev_video_ts) = self.last_video_ts {
1237 let timestamp_delta = timestamp - prev_video_ts;
1238 tracing::trace!(
1239 timestamp = timestamp,
1240 last_video_ts = prev_video_ts,
1241 timestamp_delta = timestamp_delta,
1242 "connection handle_video"
1243 );
1244 }
1245 self.last_video_ts = Some(timestamp);
1246
1247 let stream_id = self.find_publishing_stream()?;
1249 let stream = self
1250 .state
1251 .get_stream_mut(stream_id)
1252 .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1253
1254 let is_keyframe = (data[0] >> 4) == 1;
1255 let is_header = data.len() >= 2 && (data[0] & 0x0F) == 7 && data[1] == 0;
1256 stream.on_video(timestamp, is_keyframe, is_header, data.len());
1257
1258 let tag = FlvTag::video(timestamp, data.clone());
1260
1261 if is_header {
1263 stream.gop_buffer.set_video_header(tag.clone());
1264 } else {
1265 stream.gop_buffer.push(tag.clone());
1267 }
1268
1269 let stream_key = stream.stream_key.clone().unwrap_or_default();
1271 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1272
1273 if is_keyframe && !is_header {
1275 self.handler.on_keyframe(&stream_ctx, timestamp).await;
1276 }
1277
1278 let mode = self.handler.media_delivery_mode();
1280
1281 if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1282 self.handler.on_media_tag(&stream_ctx, &tag).await;
1283 }
1284
1285 if matches!(
1286 mode,
1287 MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1288 ) {
1289 if data.len() >= 2 && (data[0] & 0x0F) == 7 {
1290 if let Ok(h264_data) = H264Data::parse(data.slice(1..)) {
1292 self.handler
1293 .on_video_frame(&stream_ctx, &h264_data, timestamp)
1294 .await;
1295 }
1296 }
1297 }
1298
1299 if let Some(ref key) = self.publishing_to {
1301 let frame = BroadcastFrame::video(timestamp, data, is_keyframe, is_header);
1302 self.registry.broadcast(key, frame).await;
1303 }
1304
1305 Ok(())
1306 }
1307
1308 fn find_publishing_stream(&self) -> Result<u32> {
1310 for (id, stream) in &self.state.streams {
1311 if stream.is_publishing() {
1312 return Ok(*id);
1313 }
1314 }
1315 Err(ProtocolError::StreamNotFound(0).into())
1316 }
1317
1318 async fn send_command(&mut self, csid: u32, stream_id: u32, cmd: &Command) -> Result<()> {
1321 let (msg_type, payload) = RtmpMessage::Command(cmd.clone()).encode();
1322
1323 let chunk = RtmpChunk {
1324 csid,
1325 timestamp: 0,
1326 message_type: msg_type,
1327 stream_id,
1328 payload,
1329 };
1330
1331 self.write_buf.clear();
1332 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1333 self.writer.write_all(&self.write_buf).await?;
1334 self.writer.flush().await?;
1335
1336 Ok(())
1337 }
1338
1339 async fn send_connect_result(&mut self, transaction_id: f64) -> Result<()> {
1340 let mut props = HashMap::new();
1341 props.insert(
1342 "fmsVer".to_string(),
1343 AmfValue::String("FMS/3,5,7,7009".into()),
1344 );
1345 props.insert("capabilities".to_string(), AmfValue::Number(31.0));
1346 props.insert("mode".to_string(), AmfValue::Number(1.0));
1347
1348 let mut info = HashMap::new();
1349 info.insert("level".to_string(), AmfValue::String("status".into()));
1350 info.insert(
1351 "code".to_string(),
1352 AmfValue::String(NC_CONNECT_SUCCESS.into()),
1353 );
1354 info.insert(
1355 "description".to_string(),
1356 AmfValue::String("Connection succeeded".into()),
1357 );
1358 info.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
1359
1360 let result = Command::result(
1361 transaction_id,
1362 AmfValue::Object(props),
1363 AmfValue::Object(info),
1364 );
1365
1366 self.send_command(CSID_COMMAND, 0, &result).await
1367 }
1368
1369 async fn send_connect_error(&mut self, transaction_id: f64, reason: &str) -> Result<()> {
1370 let mut info = HashMap::new();
1371 info.insert("level".to_string(), AmfValue::String("error".into()));
1372 info.insert(
1373 "code".to_string(),
1374 AmfValue::String(NC_CONNECT_REJECTED.into()),
1375 );
1376 info.insert("description".to_string(), AmfValue::String(reason.into()));
1377
1378 let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1379
1380 self.send_command(CSID_COMMAND, 0, &error).await
1381 }
1382
1383 async fn send_connect_redirect(&mut self, transaction_id: f64, url: &str) -> Result<()> {
1384 let mut info = HashMap::new();
1385 info.insert("level".to_string(), AmfValue::String("error".into()));
1386 info.insert(
1387 "code".to_string(),
1388 AmfValue::String(NC_CONNECT_REJECTED.into()),
1389 );
1390 info.insert(
1391 "description".to_string(),
1392 AmfValue::String("Redirect".into()),
1393 );
1394 info.insert("ex.redirect".to_string(), AmfValue::String(url.into()));
1395
1396 let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1397
1398 self.send_command(CSID_COMMAND, 0, &error).await
1399 }
1400
1401 async fn send_set_chunk_size(&mut self, size: u32) -> Result<()> {
1402 let (msg_type, payload) = RtmpMessage::SetChunkSize(size).encode();
1403
1404 let chunk = RtmpChunk {
1405 csid: CSID_PROTOCOL_CONTROL,
1406 timestamp: 0,
1407 message_type: msg_type,
1408 stream_id: 0,
1409 payload,
1410 };
1411
1412 self.write_buf.clear();
1413 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1414 self.writer.write_all(&self.write_buf).await?;
1415 self.writer.flush().await?;
1416
1417 Ok(())
1418 }
1419
1420 async fn send_window_ack_size(&mut self, size: u32) -> Result<()> {
1421 let (msg_type, payload) = RtmpMessage::WindowAckSize(size).encode();
1422
1423 let chunk = RtmpChunk {
1424 csid: CSID_PROTOCOL_CONTROL,
1425 timestamp: 0,
1426 message_type: msg_type,
1427 stream_id: 0,
1428 payload,
1429 };
1430
1431 self.write_buf.clear();
1432 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1433 self.writer.write_all(&self.write_buf).await?;
1434 self.writer.flush().await?;
1435
1436 Ok(())
1437 }
1438
1439 async fn send_peer_bandwidth(&mut self, size: u32) -> Result<()> {
1440 let (msg_type, payload) = RtmpMessage::SetPeerBandwidth {
1441 size,
1442 limit_type: BANDWIDTH_LIMIT_DYNAMIC,
1443 }
1444 .encode();
1445
1446 let chunk = RtmpChunk {
1447 csid: CSID_PROTOCOL_CONTROL,
1448 timestamp: 0,
1449 message_type: msg_type,
1450 stream_id: 0,
1451 payload,
1452 };
1453
1454 self.write_buf.clear();
1455 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1456 self.writer.write_all(&self.write_buf).await?;
1457 self.writer.flush().await?;
1458
1459 Ok(())
1460 }
1461
1462 async fn send_user_control(&mut self, event: UserControlEvent) -> Result<()> {
1463 let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
1464
1465 let chunk = RtmpChunk {
1466 csid: CSID_PROTOCOL_CONTROL,
1467 timestamp: 0,
1468 message_type: msg_type,
1469 stream_id: 0,
1470 payload,
1471 };
1472
1473 self.write_buf.clear();
1474 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1475 self.writer.write_all(&self.write_buf).await?;
1476 self.writer.flush().await?;
1477
1478 Ok(())
1479 }
1480
1481 async fn send_acknowledgement(&mut self) -> Result<()> {
1482 let sequence = self.state.bytes_received as u32;
1483
1484 let (msg_type, payload) = RtmpMessage::Acknowledgement { sequence }.encode();
1485
1486 let chunk = RtmpChunk {
1487 csid: CSID_PROTOCOL_CONTROL,
1488 timestamp: 0,
1489 message_type: msg_type,
1490 stream_id: 0,
1491 payload,
1492 };
1493
1494 self.write_buf.clear();
1495 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1496 self.writer.write_all(&self.write_buf).await?;
1497 self.writer.flush().await?;
1498
1499 self.state.mark_ack_sent();
1500 Ok(())
1501 }
1502
1503 async fn send_ping_response(&mut self, timestamp: u32) -> Result<()> {
1504 self.send_user_control(UserControlEvent::PingResponse(timestamp))
1505 .await
1506 }
1507
1508 async fn send_video(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1512 let (msg_type, payload) = RtmpMessage::Video {
1513 timestamp,
1514 data: data.clone(),
1515 }
1516 .encode();
1517
1518 let chunk = RtmpChunk {
1519 csid: CSID_VIDEO,
1520 timestamp,
1521 message_type: msg_type,
1522 stream_id,
1523 payload,
1524 };
1525
1526 self.write_buf.clear();
1527 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1528 self.writer.write_all(&self.write_buf).await?;
1529 Ok(())
1531 }
1532
1533 async fn send_audio(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1535 let (msg_type, payload) = RtmpMessage::Audio {
1536 timestamp,
1537 data: data.clone(),
1538 }
1539 .encode();
1540
1541 let chunk = RtmpChunk {
1542 csid: CSID_AUDIO,
1543 timestamp,
1544 message_type: msg_type,
1545 stream_id,
1546 payload,
1547 };
1548
1549 self.write_buf.clear();
1550 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1551 self.writer.write_all(&self.write_buf).await?;
1552 Ok(())
1554 }
1555
1556 async fn send_broadcast_frame(&mut self, frame: BroadcastFrame) -> Result<()> {
1561 let stream_id = self.playback_stream_id.unwrap_or(1);
1562
1563 if self.is_paused {
1565 self.frames_dropped_while_paused += 1;
1566 tracing::trace!(session_id = self.state.id, "Frame dropped (paused)");
1567 return Ok(());
1568 }
1569
1570 if self.subscriber_state == SubscriberState::SkippingToKeyframe {
1572 match frame.frame_type {
1573 FrameType::Video => {
1574 if frame.is_keyframe || frame.is_header {
1575 self.subscriber_state = SubscriberState::Normal;
1577 self.skip_audio_until_keyframe = false;
1578 tracing::debug!(
1579 session_id = self.state.id,
1580 "Received keyframe, resuming normal playback"
1581 );
1582 } else {
1583 return Ok(());
1585 }
1586 }
1587 FrameType::Audio => {
1588 if self.skip_audio_until_keyframe {
1591 return Ok(());
1592 }
1593 }
1594 FrameType::Metadata => {
1595 }
1597 }
1598 }
1599
1600 match frame.frame_type {
1602 FrameType::Video => {
1603 self.send_video(stream_id, frame.timestamp, frame.data)
1604 .await?;
1605 }
1606 FrameType::Audio => {
1607 self.send_audio(stream_id, frame.timestamp, frame.data)
1608 .await?;
1609 }
1610 FrameType::Metadata => {
1611 self.send_metadata_frame(stream_id, frame.data).await?;
1613 }
1614 }
1615
1616 self.writer.flush().await?;
1619
1620 Ok(())
1621 }
1622
1623 async fn send_metadata_frame(&mut self, stream_id: u32, data: Bytes) -> Result<()> {
1625 let data_msg = DataMessage {
1627 name: CMD_ON_METADATA.to_string(),
1628 values: vec![AmfValue::String("onMetaData".to_string())],
1629 stream_id,
1630 };
1631
1632 let (msg_type, payload) = RtmpMessage::Data(data_msg).encode();
1633
1634 let chunk = RtmpChunk {
1637 csid: CSID_COMMAND,
1638 timestamp: 0,
1639 message_type: msg_type,
1640 stream_id,
1641 payload: if data.is_empty() { payload } else { data },
1642 };
1643
1644 self.write_buf.clear();
1645 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1646 self.writer.write_all(&self.write_buf).await?;
1647
1648 Ok(())
1649 }
1650}
1651
1652use bytes::Buf;