1use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use bytes::{Bytes, BytesMut};
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::broadcast;
18use tokio::time::timeout;
19
20use crate::registry::{BroadcastFrame, FrameType, StreamKey, StreamRegistry};
21
22use crate::amf::AmfValue;
23use crate::error::{Error, ProtocolError, Result};
24use crate::media::flv::FlvTag;
25use crate::media::{AacData, H264Data};
26use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
27use crate::protocol::constants::*;
28use crate::protocol::handshake::{Handshake, HandshakeRole};
29use crate::protocol::message::{
30 Command, ConnectParams, DataMessage, PlayParams, PublishParams, RtmpMessage, UserControlEvent,
31};
32use crate::protocol::quirks::EncoderType;
33use crate::server::config::ServerConfig;
34use crate::server::handler::{AuthResult, MediaDeliveryMode, RtmpHandler};
35use crate::session::context::{SessionContext, StreamContext};
36use crate::session::state::SessionState;
37
/// How this connection currently treats frames from the stream it plays.
///
/// A connection starts in [`SubscriberState::Normal`]. It is switched to
/// [`SubscriberState::SkippingToKeyframe`] when broadcast lag reaches the
/// registry's low threshold and when playback is resumed after a pause.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SubscriberState {
    /// Default state assigned when the connection is created.
    Normal,
    /// Set after significant lag or an unpause.
    // NOTE(review): the code that consumes this state is outside this chunk;
    // presumably frames are withheld until the next keyframe — confirm.
    SkippingToKeyframe,
}
46
/// State for one client connection, generic over the application handler.
///
/// The same connection can act as a publisher (media received from the peer is
/// broadcast through the registry) or as a subscriber (frames received from
/// the registry are written to the peer).
pub struct Connection<H: RtmpHandler> {
    /// Session bookkeeping: id, received-byte accounting, chunk/ack sizes and
    /// the message streams allocated on this connection.
    state: SessionState,

    /// Context value handed to every handler callback.
    context: SessionContext,

    /// Buffered read half of the socket.
    reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
    /// Buffered write half of the socket.
    writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,

    /// Bytes read from the socket that the handshake or chunk decoder has not
    /// consumed yet.
    read_buf: BytesMut,

    /// Turns the bytes in `read_buf` into chunks.
    chunk_decoder: ChunkDecoder,
    /// Encodes outgoing chunks into `write_buf`.
    chunk_encoder: ChunkEncoder,

    /// Scratch buffer an outgoing chunk is encoded into before it is written.
    write_buf: BytesMut,

    /// Server configuration (buffer sizes, timeouts, chunk size, ...).
    config: ServerConfig,

    /// Application callbacks.
    handler: Arc<H>,

    /// Shared registry used to register publishers, subscribe and broadcast.
    registry: Arc<StreamRegistry>,

    /// Stream key -> transaction id, recorded when an FCPublish is accepted and
    /// removed again on FCUnpublish.
    pending_fc: HashMap<String, f64>,

    /// Registry key this connection publishes to, set once a publish succeeds.
    publishing_to: Option<StreamKey>,

    /// Registry key this connection is subscribed to, set once a play succeeds.
    subscribed_to: Option<StreamKey>,

    /// Timestamp of the most recent audio message (used for trace output).
    last_audio_ts: Option<u32>,

    /// Timestamp of the most recent video message (used for trace output).
    last_video_ts: Option<u32>,

    /// Broadcast receiver for the subscribed stream; `None` when not playing.
    frame_rx: Option<broadcast::Receiver<BroadcastFrame>>,

    /// Current subscriber delivery state.
    subscriber_state: SubscriberState,

    /// Lag notifications seen since the last frame was received successfully.
    consecutive_lag_count: u32,

    /// Message stream id the accepted `play` command was issued on.
    playback_stream_id: Option<u32>,

    /// Whether the subscriber has paused playback.
    is_paused: bool,

    /// Counter reset on pause and reported/reset on unpause.
    // NOTE(review): the code that increments it is outside this chunk.
    frames_dropped_while_paused: u64,

    /// Set when playback resumes after a pause.
    // NOTE(review): the code that reads this flag is outside this chunk.
    skip_audio_until_keyframe: bool,
}
113
114impl<H: RtmpHandler> Connection<H> {
115 pub fn new(
117 session_id: u64,
118 socket: TcpStream,
119 peer_addr: SocketAddr,
120 config: ServerConfig,
121 handler: Arc<H>,
122 registry: Arc<StreamRegistry>,
123 ) -> Self {
124 let (read_half, write_half) = tokio::io::split(socket);
125
126 Self {
127 state: SessionState::new(session_id, peer_addr),
128 context: SessionContext::new(session_id, peer_addr),
129 reader: BufReader::with_capacity(config.read_buffer_size, read_half),
130 writer: BufWriter::with_capacity(config.write_buffer_size, write_half),
131 read_buf: BytesMut::with_capacity(config.read_buffer_size),
132 chunk_decoder: ChunkDecoder::new(),
133 chunk_encoder: ChunkEncoder::new(),
134 write_buf: BytesMut::with_capacity(config.write_buffer_size),
135 config,
136 handler,
137 registry,
138 pending_fc: HashMap::new(),
139 publishing_to: None,
140 subscribed_to: None,
141 last_audio_ts: None,
142 last_video_ts: None,
143 frame_rx: None,
144 subscriber_state: SubscriberState::Normal,
145 consecutive_lag_count: 0,
146 playback_stream_id: None,
147 is_paused: false,
148 frames_dropped_while_paused: 0,
149 skip_audio_until_keyframe: false,
150 }
151 }
152
153 pub async fn run(&mut self) -> Result<()> {
155 if !self.handler.on_connection(&self.context).await {
157 return Err(Error::Rejected("Connection rejected by handler".into()));
158 }
159
160 self.do_handshake().await?;
162 self.handler.on_handshake_complete(&self.context).await;
163
164 tracing::debug!(
166 session_id = self.state.id,
167 chunk_size = self.config.chunk_size,
168 "Sending set chunk size"
169 );
170 self.send_set_chunk_size(self.config.chunk_size).await?;
171 self.chunk_encoder.set_chunk_size(self.config.chunk_size);
172
173 tracing::debug!(session_id = self.state.id, "Entering main message loop");
174
175 let idle_timeout = self.config.idle_timeout;
177 let result = loop {
178 let mut frame_rx = self.frame_rx.take();
180
181 let loop_result = if let Some(ref mut rx) = frame_rx {
183 tokio::select! {
185 biased;
186
187 frame_result = rx.recv() => {
189 match frame_result {
190 Ok(frame) => {
191 self.frame_rx = frame_rx;
193 self.consecutive_lag_count = 0;
195 if let Err(e) = self.send_broadcast_frame(frame).await {
196 tracing::debug!(error = %e, "Failed to send frame");
197 Err(e)
198 } else {
199 Ok(true)
200 }
201 }
202 Err(broadcast::error::RecvError::Lagged(n)) => {
203 self.frame_rx = frame_rx;
204 self.handle_lag(n).await.map(|_| true)
205 }
206 Err(broadcast::error::RecvError::Closed) => {
207 self.frame_rx = frame_rx;
208 if let Err(e) = self.handle_stream_ended().await {
210 tracing::debug!(error = %e, "Error handling stream end");
211 }
212 Ok(false) }
214 }
215 }
216
217 result = timeout(idle_timeout, self.read_and_process()) => {
219 self.frame_rx = frame_rx;
220 match result {
221 Ok(Ok(continue_loop)) => Ok(continue_loop),
222 Ok(Err(e)) => {
223 tracing::debug!(error = %e, "Processing error");
224 Err(e)
225 }
226 Err(_) => {
227 tracing::debug!("Idle timeout");
228 Ok(false)
229 }
230 }
231 }
232 }
233 } else {
234 self.frame_rx = frame_rx;
236 match timeout(idle_timeout, self.read_and_process()).await {
237 Ok(Ok(continue_loop)) => Ok(continue_loop),
238 Ok(Err(e)) => {
239 tracing::debug!(error = %e, "Processing error");
240 Err(e)
241 }
242 Err(_) => {
243 tracing::debug!("Idle timeout");
244 Ok(false)
245 }
246 }
247 };
248
249 match loop_result {
250 Ok(true) => continue,
251 Ok(false) => break Ok(()),
252 Err(e) => break Err(e),
253 }
254 };
255
256 self.cleanup_on_disconnect().await;
258
259 self.handler.on_disconnect(&self.context).await;
261
262 result
263 }
264
265 async fn cleanup_on_disconnect(&mut self) {
267 if let Some(ref key) = self.publishing_to {
269 self.registry.unregister_publisher(key, self.state.id).await;
270 tracing::debug!(
271 session_id = self.state.id,
272 stream = %key,
273 "Unregistered publisher on disconnect"
274 );
275 }
276
277 if let Some(ref key) = self.subscribed_to {
279 self.registry.unsubscribe(key).await;
280 tracing::debug!(
281 session_id = self.state.id,
282 stream = %key,
283 "Unsubscribed on disconnect"
284 );
285 }
286 }
287
288 async fn handle_lag(&mut self, skipped: u64) -> Result<()> {
290 let config = self.registry.config();
291
292 self.consecutive_lag_count += 1;
293
294 if skipped < config.lag_threshold_low {
295 tracing::debug!(
297 session_id = self.state.id,
298 skipped = skipped,
299 "Minor broadcast lag, continuing"
300 );
301 return Ok(());
302 }
303
304 if self.subscriber_state != SubscriberState::SkippingToKeyframe {
306 self.subscriber_state = SubscriberState::SkippingToKeyframe;
307 tracing::warn!(
308 session_id = self.state.id,
309 skipped = skipped,
310 "Subscriber lagging, skipping to next keyframe"
311 );
312 }
313
314 if self.consecutive_lag_count >= config.max_consecutive_lag_events {
316 tracing::warn!(
317 session_id = self.state.id,
318 consecutive_lags = self.consecutive_lag_count,
319 "Disconnecting slow subscriber"
320 );
321 return Err(Error::Rejected("Subscriber too slow".into()));
322 }
323
324 Ok(())
325 }
326
327 async fn handle_stream_ended(&mut self) -> Result<()> {
329 if self.is_paused {
331 self.is_paused = false;
332 }
333
334 if let Some(stream_id) = self.playback_stream_id {
335 self.send_user_control(UserControlEvent::StreamEof(stream_id))
337 .await?;
338
339 let status = Command::on_status(stream_id, "status", NS_PLAY_STOP, "Stream ended");
341 self.send_command(CSID_COMMAND, stream_id, &status).await?;
342
343 tracing::info!(
344 session_id = self.state.id,
345 stream_id = stream_id,
346 "Stream ended, notified subscriber"
347 );
348 }
349
350 Ok(())
351 }
352
353 async fn do_handshake(&mut self) -> Result<()> {
355 let mut handshake = Handshake::new(HandshakeRole::Server);
356
357 handshake.generate_initial();
359 self.state.start_handshake();
360
361 let connection_timeout = self.config.connection_timeout;
363 timeout(connection_timeout, async {
364 loop {
365 let bytes_needed = handshake.bytes_needed();
367 if bytes_needed > 0 && self.read_buf.len() < bytes_needed {
368 let n = self.reader.read_buf(&mut self.read_buf).await?;
369 if n == 0 {
370 return Err(Error::ConnectionClosed);
371 }
372 }
373
374 let mut buf = Bytes::copy_from_slice(&self.read_buf);
376 let response = handshake.process(&mut buf)?;
377
378 let consumed = self.read_buf.len() - buf.len();
380 if consumed > 0 {
381 self.read_buf.advance(consumed);
382 }
383
384 if let Some(data) = response {
386 self.writer.write_all(&data).await?;
387 self.writer.flush().await?;
388 }
389
390 if handshake.is_done() {
391 break;
392 }
393 }
394
395 Ok::<_, Error>(())
396 })
397 .await
398 .map_err(|_| Error::Timeout)??;
399
400 self.state.complete_handshake();
401 tracing::debug!(
402 session_id = self.state.id,
403 remaining_buf = self.read_buf.len(),
404 "Handshake complete"
405 );
406
407 if !self.read_buf.is_empty() {
409 let dump_len = self.read_buf.len().min(32);
410 let hex: String = self.read_buf[..dump_len]
411 .iter()
412 .map(|b| format!("{:02x}", b))
413 .collect::<Vec<_>>()
414 .join(" ");
415 tracing::debug!(
416 session_id = self.state.id,
417 first_bytes = %hex,
418 "Buffer after handshake"
419 );
420 }
421
422 Ok(())
423 }
424
425 async fn read_and_process(&mut self) -> Result<bool> {
427 tracing::trace!(
428 session_id = self.state.id,
429 buf_len = self.read_buf.len(),
430 "read_and_process called"
431 );
432
433 loop {
437 let buf_len_before = self.read_buf.len();
438
439 if let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
441 tracing::trace!(
442 session_id = self.state.id,
443 csid = chunk.csid,
444 msg_type = chunk.message_type,
445 "Decoded chunk from buffer"
446 );
447 self.handle_chunk(chunk).await?;
448 continue;
450 }
451
452 let buf_len_after = self.read_buf.len();
455 if buf_len_after < buf_len_before {
456 continue;
458 }
459
460 break;
462 }
463
464 tracing::trace!(
465 session_id = self.state.id,
466 buf_len = self.read_buf.len(),
467 "Waiting for more data"
468 );
469
470 let n = self.reader.read_buf(&mut self.read_buf).await?;
472 if n == 0 {
473 return Ok(false); }
475
476 tracing::trace!(
477 session_id = self.state.id,
478 bytes_read = n,
479 buf_len = self.read_buf.len(),
480 "Read data from socket"
481 );
482
483 let needs_ack = self.state.add_bytes_received(n as u64);
484
485 while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
487 tracing::trace!(
488 session_id = self.state.id,
489 csid = chunk.csid,
490 msg_type = chunk.message_type,
491 "Decoded chunk after read"
492 );
493 self.handle_chunk(chunk).await?;
494 }
495
496 if needs_ack {
498 self.send_acknowledgement().await?;
499 }
500
501 Ok(true)
502 }
503
504 async fn handle_chunk(&mut self, chunk: RtmpChunk) -> Result<()> {
506 let message = RtmpMessage::from_chunk(&chunk)?;
507
508 match message {
509 RtmpMessage::SetChunkSize(size) => {
510 tracing::debug!(size = size, "Peer set chunk size");
511 self.chunk_decoder.set_chunk_size(size);
512 self.state.in_chunk_size = size;
513 }
514
515 RtmpMessage::Abort { csid } => {
516 self.chunk_decoder.abort(csid);
517 }
518
519 RtmpMessage::Acknowledgement { sequence: _ } => {
520 }
522
523 RtmpMessage::WindowAckSize(size) => {
524 self.state.window_ack_size = size;
525 }
526
527 RtmpMessage::SetPeerBandwidth {
528 size,
529 limit_type: _,
530 } => {
531 self.send_window_ack_size(size).await?;
533 }
534
535 RtmpMessage::UserControl(event) => {
536 self.handle_user_control(event).await?;
537 }
538
539 RtmpMessage::Command(cmd) | RtmpMessage::CommandAmf3(cmd) => {
540 self.handle_command(cmd).await?;
541 }
542
543 RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
544 self.handle_data(data).await?;
545 }
546
547 RtmpMessage::Audio { timestamp, data } => {
548 self.handle_audio(timestamp, data).await?;
549 }
550
551 RtmpMessage::Video { timestamp, data } => {
552 self.handle_video(timestamp, data).await?;
553 }
554
555 _ => {
556 tracing::trace!(message = ?message, "Unhandled message");
557 }
558 }
559
560 Ok(())
561 }
562
563 async fn handle_user_control(&mut self, event: UserControlEvent) -> Result<()> {
565 match event {
566 UserControlEvent::PingRequest(timestamp) => {
567 self.send_ping_response(timestamp).await?;
568 }
569 UserControlEvent::SetBufferLength {
570 stream_id: _,
571 buffer_ms: _,
572 } => {
573 }
575 _ => {}
576 }
577 Ok(())
578 }
579
580 async fn handle_command(&mut self, cmd: Command) -> Result<()> {
582 tracing::debug!(
583 session_id = self.state.id,
584 command = cmd.name,
585 transaction_id = cmd.transaction_id,
586 stream_id = cmd.stream_id,
587 args = ?cmd.arguments,
588 "Received command"
589 );
590 match cmd.name.as_str() {
591 CMD_CONNECT => self.handle_connect(cmd).await?,
592 CMD_CREATE_STREAM => self.handle_create_stream(cmd).await?,
593 CMD_DELETE_STREAM => self.handle_delete_stream(cmd).await?,
594 CMD_PUBLISH => self.handle_publish(cmd).await?,
595 CMD_PLAY => self.handle_play(cmd).await?,
596 CMD_FC_PUBLISH => self.handle_fc_publish(cmd).await?,
597 CMD_FC_UNPUBLISH => self.handle_fc_unpublish(cmd).await?,
598 CMD_RELEASE_STREAM => self.handle_release_stream(cmd).await?,
599 CMD_PAUSE => self.handle_pause(cmd).await?,
600 CMD_CLOSE | "closeStream" => self.handle_close_stream(cmd).await?,
601 _ => {
602 tracing::debug!(command = cmd.name, "Unknown command");
603 }
604 }
605 Ok(())
606 }
607
608 async fn handle_connect(&mut self, cmd: Command) -> Result<()> {
610 let params = ConnectParams::from_amf(&cmd.command_object);
611 let encoder_type = params
612 .flash_ver
613 .as_deref()
614 .map(EncoderType::from_flash_ver)
615 .unwrap_or(EncoderType::Unknown);
616
617 let result = self.handler.on_connect(&self.context, ¶ms).await;
619
620 match result {
621 AuthResult::Accept => {
622 self.state.on_connect(params.clone(), encoder_type);
623 self.context.with_connect(params, encoder_type);
624
625 self.send_window_ack_size(self.config.window_ack_size)
627 .await?;
628
629 self.send_peer_bandwidth(self.config.peer_bandwidth).await?;
631
632 self.send_user_control(UserControlEvent::StreamBegin(0))
634 .await?;
635
636 self.send_connect_result(cmd.transaction_id).await?;
638
639 tracing::info!(
640 session_id = self.state.id,
641 app = self.context.app,
642 "Connected"
643 );
644 }
645 AuthResult::Reject(reason) => {
646 self.send_connect_error(cmd.transaction_id, &reason).await?;
647 return Err(Error::Rejected(reason));
648 }
649 AuthResult::Redirect { url } => {
650 self.send_connect_redirect(cmd.transaction_id, &url).await?;
651 return Err(Error::Rejected(format!("Redirected to {}", url)));
652 }
653 }
654
655 Ok(())
656 }
657
658 async fn handle_create_stream(&mut self, cmd: Command) -> Result<()> {
660 let stream_id = self.state.allocate_stream_id();
661
662 let result = Command::result(
663 cmd.transaction_id,
664 AmfValue::Null,
665 AmfValue::Number(stream_id as f64),
666 );
667
668 self.send_command(CSID_COMMAND, 0, &result).await?;
669
670 tracing::debug!(stream_id = stream_id, "Stream created");
671 Ok(())
672 }
673
674 async fn handle_delete_stream(&mut self, cmd: Command) -> Result<()> {
676 let stream_id = cmd
677 .arguments
678 .first()
679 .and_then(|v| v.as_number())
680 .unwrap_or(0.0) as u32;
681
682 if let Some(stream) = self.state.remove_stream(stream_id) {
683 if stream.is_publishing() {
684 let stream_ctx = StreamContext::new(
685 self.context.clone(),
686 stream_id,
687 stream.stream_key.unwrap_or_default(),
688 true,
689 );
690 self.handler.on_publish_stop(&stream_ctx).await;
691 }
692 }
693
694 Ok(())
695 }
696
697 async fn handle_fc_publish(&mut self, cmd: Command) -> Result<()> {
699 let stream_key = cmd
700 .arguments
701 .first()
702 .and_then(|v| v.as_str())
703 .unwrap_or("")
704 .to_string();
705
706 let result = self.handler.on_fc_publish(&self.context, &stream_key).await;
707
708 match result {
709 AuthResult::Accept => {
710 self.pending_fc
712 .insert(stream_key.clone(), cmd.transaction_id);
713
714 let response = Command {
716 name: CMD_ON_FC_PUBLISH.to_string(),
717 transaction_id: 0.0,
718 command_object: AmfValue::Null,
719 arguments: vec![],
720 stream_id: 0,
721 };
722 self.send_command(CSID_COMMAND, 0, &response).await?;
723 }
724 AuthResult::Reject(reason) => {
725 return Err(Error::Rejected(reason));
726 }
727 AuthResult::Redirect { url } => {
728 return Err(Error::Rejected(format!("Redirected to {}", url)));
729 }
730 }
731
732 Ok(())
733 }
734
735 async fn handle_fc_unpublish(&mut self, cmd: Command) -> Result<()> {
737 let stream_key = cmd.arguments.first().and_then(|v| v.as_str()).unwrap_or("");
738
739 self.pending_fc.remove(stream_key);
740
741 let response = Command {
743 name: CMD_ON_FC_UNPUBLISH.to_string(),
744 transaction_id: 0.0,
745 command_object: AmfValue::Null,
746 arguments: vec![],
747 stream_id: 0,
748 };
749 self.send_command(CSID_COMMAND, 0, &response).await?;
750
751 Ok(())
752 }
753
    /// Handles the command dispatched for `CMD_RELEASE_STREAM`.
    ///
    /// The command is accepted as a no-op: no state changes and nothing is
    /// sent back to the peer.
    async fn handle_release_stream(&mut self, _cmd: Command) -> Result<()> {
        Ok(())
    }
759
760 async fn handle_publish(&mut self, cmd: Command) -> Result<()> {
762 let stream_key = cmd
763 .arguments
764 .first()
765 .and_then(|v| v.as_str())
766 .unwrap_or("")
767 .to_string();
768
769 let publish_type = cmd
770 .arguments
771 .get(1)
772 .and_then(|v| v.as_str())
773 .unwrap_or("live")
774 .to_string();
775
776 let params = PublishParams {
777 stream_key: stream_key.clone(),
778 publish_type: publish_type.clone(),
779 stream_id: cmd.stream_id,
780 };
781
782 let result = self.handler.on_publish(&self.context, ¶ms).await;
783
784 match result {
785 AuthResult::Accept => {
786 let app = self.context.app.clone();
788 let registry_key = StreamKey::new(&app, &stream_key);
789
790 if let Err(e) = self
792 .registry
793 .register_publisher(®istry_key, self.state.id)
794 .await
795 {
796 tracing::warn!(
797 session_id = self.state.id,
798 stream = %registry_key,
799 error = %e,
800 "Failed to register publisher"
801 );
802 let status = Command::on_status(
803 cmd.stream_id,
804 "error",
805 NS_PUBLISH_BAD_NAME,
806 &format!("Stream already publishing: {}", e),
807 );
808 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
809 .await?;
810 return Err(Error::Rejected(format!("Stream already publishing: {}", e)));
811 }
812
813 self.publishing_to = Some(registry_key);
815
816 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
818 stream.start_publish(stream_key.clone(), publish_type);
819 }
820
821 self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
823 .await?;
824
825 let status = Command::on_status(
827 cmd.stream_id,
828 "status",
829 NS_PUBLISH_START,
830 &format!("{} is now published", stream_key),
831 );
832 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
833 .await?;
834
835 tracing::info!(
836 session_id = self.state.id,
837 stream_key = stream_key,
838 "Publishing started"
839 );
840 }
841 AuthResult::Reject(reason) => {
842 let status =
843 Command::on_status(cmd.stream_id, "error", NS_PUBLISH_BAD_NAME, &reason);
844 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
845 .await?;
846 return Err(Error::Rejected(reason));
847 }
848 AuthResult::Redirect { url } => {
849 return Err(Error::Rejected(format!("Redirected to {}", url)));
850 }
851 }
852
853 Ok(())
854 }
855
856 async fn handle_play(&mut self, cmd: Command) -> Result<()> {
858 let stream_name = cmd
859 .arguments
860 .first()
861 .and_then(|v| v.as_str())
862 .unwrap_or("")
863 .to_string();
864
865 let start = cmd
866 .arguments
867 .get(1)
868 .and_then(|v| v.as_number())
869 .unwrap_or(-2.0);
870
871 let duration = cmd
872 .arguments
873 .get(2)
874 .and_then(|v| v.as_number())
875 .unwrap_or(-1.0);
876
877 let reset = cmd
878 .arguments
879 .get(3)
880 .and_then(|v| v.as_bool())
881 .unwrap_or(true);
882
883 let params = PlayParams {
884 stream_name: stream_name.clone(),
885 start,
886 duration,
887 reset,
888 stream_id: cmd.stream_id,
889 };
890
891 let result = self.handler.on_play(&self.context, ¶ms).await;
892
893 match result {
894 AuthResult::Accept => {
895 let app = self.context.app.clone();
897 let registry_key = StreamKey::new(&app, &stream_name);
898
899 let (rx, catchup_frames) = match self.registry.subscribe(®istry_key).await {
901 Ok(result) => result,
902 Err(e) => {
903 tracing::debug!(
904 session_id = self.state.id,
905 stream = %registry_key,
906 error = %e,
907 "Stream not found for play"
908 );
909 let status = Command::on_status(
910 cmd.stream_id,
911 "error",
912 NS_PLAY_STREAM_NOT_FOUND,
913 &format!("Stream not found: {}", stream_name),
914 );
915 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
916 .await?;
917 return Ok(());
918 }
919 };
920
921 self.subscribed_to = Some(registry_key.clone());
923 self.frame_rx = Some(rx);
924 self.playback_stream_id = Some(cmd.stream_id);
925
926 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
927 stream.start_play(stream_name.clone());
928 }
929
930 self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
932 .await?;
933
934 if reset {
936 let status = Command::on_status(
937 cmd.stream_id,
938 "status",
939 NS_PLAY_RESET,
940 "Playing and resetting",
941 );
942 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
943 .await?;
944 }
945
946 let status = Command::on_status(
948 cmd.stream_id,
949 "status",
950 NS_PLAY_START,
951 &format!("Started playing {}", stream_name),
952 );
953 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
954 .await?;
955
956 tracing::debug!(
958 session_id = self.state.id,
959 stream = %registry_key,
960 catchup_frames = catchup_frames.len(),
961 "Sending catchup frames"
962 );
963
964 for frame in catchup_frames {
965 self.send_broadcast_frame(frame).await?;
966 }
967
968 tracing::info!(
969 session_id = self.state.id,
970 stream_name = stream_name,
971 "Playing started"
972 );
973 }
974 AuthResult::Reject(reason) => {
975 let status =
976 Command::on_status(cmd.stream_id, "error", NS_PLAY_STREAM_NOT_FOUND, &reason);
977 self.send_command(CSID_COMMAND, cmd.stream_id, &status)
978 .await?;
979 }
980 AuthResult::Redirect { url: _ } => {
981 }
983 }
984
985 Ok(())
986 }
987
988 async fn handle_close_stream(&mut self, cmd: Command) -> Result<()> {
990 if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
991 stream.stop();
992 }
993 Ok(())
994 }
995
996 async fn handle_pause(&mut self, cmd: Command) -> Result<()> {
998 let stream_id = match self.playback_stream_id {
999 Some(id) => id,
1000 None => return Ok(()), };
1002
1003 let pause_flag = cmd
1005 .arguments
1006 .first()
1007 .and_then(|v| v.as_bool())
1008 .unwrap_or(true);
1009
1010 if pause_flag {
1011 self.do_pause(stream_id).await
1012 } else {
1013 self.do_unpause(stream_id).await
1014 }
1015 }
1016
1017 async fn do_pause(&mut self, stream_id: u32) -> Result<()> {
1019 if self.is_paused {
1020 return Ok(()); }
1022
1023 self.is_paused = true;
1024 self.frames_dropped_while_paused = 0;
1025
1026 let status = Command::on_status(stream_id, "status", NS_PAUSE_NOTIFY, "Playback paused");
1028 self.send_command(CSID_COMMAND, stream_id, &status).await?;
1029
1030 self.send_user_control(UserControlEvent::StreamDry(stream_id))
1032 .await?;
1033
1034 let stream_key = self
1036 .subscribed_to
1037 .as_ref()
1038 .map(|k| k.name.clone())
1039 .unwrap_or_default();
1040 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1041 self.handler.on_pause(&stream_ctx).await;
1042
1043 tracing::info!(session_id = self.state.id, "Subscriber paused");
1044 Ok(())
1045 }
1046
1047 async fn do_unpause(&mut self, stream_id: u32) -> Result<()> {
1049 if !self.is_paused {
1050 return Ok(()); }
1052
1053 self.is_paused = false;
1054
1055 self.subscriber_state = SubscriberState::SkippingToKeyframe;
1058 self.skip_audio_until_keyframe = true;
1059
1060 let status = Command::on_status(stream_id, "status", NS_UNPAUSE_NOTIFY, "Playback resumed");
1062 self.send_command(CSID_COMMAND, stream_id, &status).await?;
1063
1064 self.send_user_control(UserControlEvent::StreamBegin(stream_id))
1066 .await?;
1067
1068 if let Some(ref key) = self.subscribed_to {
1071 let headers = self.registry.get_sequence_headers(key).await;
1072 tracing::debug!(
1073 session_id = self.state.id,
1074 header_count = headers.len(),
1075 "Resending sequence headers after unpause"
1076 );
1077 for frame in headers {
1078 match frame.frame_type {
1079 FrameType::Video => {
1080 self.send_video(stream_id, frame.timestamp, frame.data)
1081 .await?;
1082 }
1083 FrameType::Audio => {
1084 self.send_audio(stream_id, frame.timestamp, frame.data)
1085 .await?;
1086 }
1087 _ => {}
1088 }
1089 }
1090 self.writer.flush().await?;
1091 }
1092
1093 let stream_key = self
1095 .subscribed_to
1096 .as_ref()
1097 .map(|k| k.name.clone())
1098 .unwrap_or_default();
1099 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1100 self.handler.on_unpause(&stream_ctx).await;
1101
1102 tracing::info!(
1103 session_id = self.state.id,
1104 frames_dropped = self.frames_dropped_while_paused,
1105 "Subscriber unpaused (waiting for keyframe)"
1106 );
1107 self.frames_dropped_while_paused = 0;
1108 Ok(())
1109 }
1110
1111 async fn handle_data(&mut self, data: DataMessage) -> Result<()> {
1113 match data.name.as_str() {
1114 CMD_SET_DATA_FRAME => {
1115 if let Some(AmfValue::String(name)) = data.values.first() {
1117 if name == CMD_ON_METADATA {
1118 self.handle_metadata(data.stream_id, &data.values[1..])
1119 .await?;
1120 }
1121 }
1122 }
1123 CMD_ON_METADATA => {
1124 self.handle_metadata(data.stream_id, &data.values).await?;
1125 }
1126 _ => {
1127 tracing::trace!(name = data.name, "Unknown data message");
1128 }
1129 }
1130 Ok(())
1131 }
1132
1133 async fn handle_metadata(&mut self, stream_id: u32, values: &[AmfValue]) -> Result<()> {
1135 let metadata: HashMap<String, AmfValue> = values
1137 .first()
1138 .and_then(|v| v.as_object().cloned())
1139 .unwrap_or_default();
1140
1141 if let Some(stream) = self.state.get_stream_mut(stream_id) {
1142 stream.on_metadata();
1143 }
1144
1145 if let Some(stream) = self.state.get_stream(stream_id) {
1146 if stream.is_publishing() {
1147 let stream_ctx = StreamContext::new(
1148 self.context.clone(),
1149 stream_id,
1150 stream.stream_key.clone().unwrap_or_default(),
1151 true,
1152 );
1153 self.handler.on_metadata(&stream_ctx, &metadata).await;
1154 }
1155 }
1156
1157 Ok(())
1158 }
1159
1160 async fn handle_audio(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1162 if data.is_empty() {
1163 return Ok(());
1164 }
1165
1166 if let Some(prev_audio_ts) = self.last_audio_ts {
1167 let timestamp_delta = timestamp - prev_audio_ts;
1168 tracing::trace!(
1169 timestamp = timestamp,
1170 last_audio_ts = prev_audio_ts,
1171 timestamp_delta = timestamp_delta,
1172 "connection handle_audio"
1173 );
1174 }
1175 self.last_audio_ts = Some(timestamp);
1176
1177 let stream_id = self.find_publishing_stream()?;
1179 let stream = self
1180 .state
1181 .get_stream_mut(stream_id)
1182 .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1183
1184 let is_header = data.len() >= 2 && (data[0] >> 4) == 10 && data[1] == 0;
1185 stream.on_audio(timestamp, is_header, data.len());
1186
1187 if is_header {
1189 let tag = FlvTag::audio(timestamp, data.clone());
1190 stream.gop_buffer.set_audio_header(tag);
1191 }
1192
1193 let stream_key = stream.stream_key.clone().unwrap_or_default();
1195 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1196
1197 let mode = self.handler.media_delivery_mode();
1199
1200 if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1201 let tag = FlvTag::audio(timestamp, data.clone());
1202 self.handler.on_media_tag(&stream_ctx, &tag).await;
1203 }
1204
1205 if matches!(
1206 mode,
1207 MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1208 ) {
1209 if data.len() >= 2 && (data[0] >> 4) == 10 {
1210 if let Ok(aac_data) = AacData::parse(data.slice(1..)) {
1212 self.handler
1213 .on_audio_frame(&stream_ctx, &aac_data, timestamp)
1214 .await;
1215 }
1216 }
1217 }
1218
1219 if let Some(ref key) = self.publishing_to {
1221 let frame = BroadcastFrame::audio(timestamp, data, is_header);
1222 self.registry.broadcast(key, frame).await;
1223 }
1224
1225 Ok(())
1226 }
1227
1228 async fn handle_video(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1230 if data.is_empty() {
1231 return Ok(());
1232 }
1233
1234 if let Some(prev_video_ts) = self.last_video_ts {
1235 let timestamp_delta = timestamp - prev_video_ts;
1236 tracing::trace!(
1237 timestamp = timestamp,
1238 last_video_ts = prev_video_ts,
1239 timestamp_delta = timestamp_delta,
1240 "connection handle_video"
1241 );
1242 }
1243 self.last_video_ts = Some(timestamp);
1244
1245 let stream_id = self.find_publishing_stream()?;
1247 let stream = self
1248 .state
1249 .get_stream_mut(stream_id)
1250 .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1251
1252 let is_keyframe = (data[0] >> 4) == 1;
1253 let is_header = data.len() >= 2 && (data[0] & 0x0F) == 7 && data[1] == 0;
1254 stream.on_video(timestamp, is_keyframe, is_header, data.len());
1255
1256 let tag = FlvTag::video(timestamp, data.clone());
1258
1259 if is_header {
1261 stream.gop_buffer.set_video_header(tag.clone());
1262 } else {
1263 stream.gop_buffer.push(tag.clone());
1265 }
1266
1267 let stream_key = stream.stream_key.clone().unwrap_or_default();
1269 let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1270
1271 if is_keyframe && !is_header {
1273 self.handler.on_keyframe(&stream_ctx, timestamp).await;
1274 }
1275
1276 let mode = self.handler.media_delivery_mode();
1278
1279 if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1280 self.handler.on_media_tag(&stream_ctx, &tag).await;
1281 }
1282
1283 if matches!(
1284 mode,
1285 MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1286 ) {
1287 if data.len() >= 2 && (data[0] & 0x0F) == 7 {
1288 if let Ok(h264_data) = H264Data::parse(data.slice(1..)) {
1290 self.handler
1291 .on_video_frame(&stream_ctx, &h264_data, timestamp)
1292 .await;
1293 }
1294 }
1295 }
1296
1297 if let Some(ref key) = self.publishing_to {
1299 let frame = BroadcastFrame::video(timestamp, data, is_keyframe, is_header);
1300 self.registry.broadcast(key, frame).await;
1301 }
1302
1303 Ok(())
1304 }
1305
1306 fn find_publishing_stream(&self) -> Result<u32> {
1308 for (id, stream) in &self.state.streams {
1309 if stream.is_publishing() {
1310 return Ok(*id);
1311 }
1312 }
1313 Err(ProtocolError::StreamNotFound(0).into())
1314 }
1315
1316 async fn send_command(&mut self, csid: u32, stream_id: u32, cmd: &Command) -> Result<()> {
1319 let (msg_type, payload) = RtmpMessage::Command(cmd.clone()).encode();
1320
1321 let chunk = RtmpChunk {
1322 csid,
1323 timestamp: 0,
1324 message_type: msg_type,
1325 stream_id,
1326 payload,
1327 };
1328
1329 self.write_buf.clear();
1330 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1331 self.writer.write_all(&self.write_buf).await?;
1332 self.writer.flush().await?;
1333
1334 Ok(())
1335 }
1336
1337 async fn send_connect_result(&mut self, transaction_id: f64) -> Result<()> {
1338 let mut props = HashMap::new();
1339 props.insert(
1340 "fmsVer".to_string(),
1341 AmfValue::String("FMS/3,5,7,7009".into()),
1342 );
1343 props.insert("capabilities".to_string(), AmfValue::Number(31.0));
1344 props.insert("mode".to_string(), AmfValue::Number(1.0));
1345
1346 let mut info = HashMap::new();
1347 info.insert("level".to_string(), AmfValue::String("status".into()));
1348 info.insert(
1349 "code".to_string(),
1350 AmfValue::String(NC_CONNECT_SUCCESS.into()),
1351 );
1352 info.insert(
1353 "description".to_string(),
1354 AmfValue::String("Connection succeeded".into()),
1355 );
1356 info.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
1357
1358 let result = Command::result(
1359 transaction_id,
1360 AmfValue::Object(props),
1361 AmfValue::Object(info),
1362 );
1363
1364 self.send_command(CSID_COMMAND, 0, &result).await
1365 }
1366
1367 async fn send_connect_error(&mut self, transaction_id: f64, reason: &str) -> Result<()> {
1368 let mut info = HashMap::new();
1369 info.insert("level".to_string(), AmfValue::String("error".into()));
1370 info.insert(
1371 "code".to_string(),
1372 AmfValue::String(NC_CONNECT_REJECTED.into()),
1373 );
1374 info.insert("description".to_string(), AmfValue::String(reason.into()));
1375
1376 let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1377
1378 self.send_command(CSID_COMMAND, 0, &error).await
1379 }
1380
1381 async fn send_connect_redirect(&mut self, transaction_id: f64, url: &str) -> Result<()> {
1382 let mut info = HashMap::new();
1383 info.insert("level".to_string(), AmfValue::String("error".into()));
1384 info.insert(
1385 "code".to_string(),
1386 AmfValue::String(NC_CONNECT_REJECTED.into()),
1387 );
1388 info.insert(
1389 "description".to_string(),
1390 AmfValue::String("Redirect".into()),
1391 );
1392 info.insert("ex.redirect".to_string(), AmfValue::String(url.into()));
1393
1394 let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1395
1396 self.send_command(CSID_COMMAND, 0, &error).await
1397 }
1398
1399 async fn send_set_chunk_size(&mut self, size: u32) -> Result<()> {
1400 let (msg_type, payload) = RtmpMessage::SetChunkSize(size).encode();
1401
1402 let chunk = RtmpChunk {
1403 csid: CSID_PROTOCOL_CONTROL,
1404 timestamp: 0,
1405 message_type: msg_type,
1406 stream_id: 0,
1407 payload,
1408 };
1409
1410 self.write_buf.clear();
1411 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1412 self.writer.write_all(&self.write_buf).await?;
1413 self.writer.flush().await?;
1414
1415 Ok(())
1416 }
1417
1418 async fn send_window_ack_size(&mut self, size: u32) -> Result<()> {
1419 let (msg_type, payload) = RtmpMessage::WindowAckSize(size).encode();
1420
1421 let chunk = RtmpChunk {
1422 csid: CSID_PROTOCOL_CONTROL,
1423 timestamp: 0,
1424 message_type: msg_type,
1425 stream_id: 0,
1426 payload,
1427 };
1428
1429 self.write_buf.clear();
1430 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1431 self.writer.write_all(&self.write_buf).await?;
1432 self.writer.flush().await?;
1433
1434 Ok(())
1435 }
1436
1437 async fn send_peer_bandwidth(&mut self, size: u32) -> Result<()> {
1438 let (msg_type, payload) = RtmpMessage::SetPeerBandwidth {
1439 size,
1440 limit_type: BANDWIDTH_LIMIT_DYNAMIC,
1441 }
1442 .encode();
1443
1444 let chunk = RtmpChunk {
1445 csid: CSID_PROTOCOL_CONTROL,
1446 timestamp: 0,
1447 message_type: msg_type,
1448 stream_id: 0,
1449 payload,
1450 };
1451
1452 self.write_buf.clear();
1453 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1454 self.writer.write_all(&self.write_buf).await?;
1455 self.writer.flush().await?;
1456
1457 Ok(())
1458 }
1459
1460 async fn send_user_control(&mut self, event: UserControlEvent) -> Result<()> {
1461 let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
1462
1463 let chunk = RtmpChunk {
1464 csid: CSID_PROTOCOL_CONTROL,
1465 timestamp: 0,
1466 message_type: msg_type,
1467 stream_id: 0,
1468 payload,
1469 };
1470
1471 self.write_buf.clear();
1472 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1473 self.writer.write_all(&self.write_buf).await?;
1474 self.writer.flush().await?;
1475
1476 Ok(())
1477 }
1478
1479 async fn send_acknowledgement(&mut self) -> Result<()> {
1480 let sequence = self.state.bytes_received as u32;
1481
1482 let (msg_type, payload) = RtmpMessage::Acknowledgement { sequence }.encode();
1483
1484 let chunk = RtmpChunk {
1485 csid: CSID_PROTOCOL_CONTROL,
1486 timestamp: 0,
1487 message_type: msg_type,
1488 stream_id: 0,
1489 payload,
1490 };
1491
1492 self.write_buf.clear();
1493 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1494 self.writer.write_all(&self.write_buf).await?;
1495 self.writer.flush().await?;
1496
1497 self.state.mark_ack_sent();
1498 Ok(())
1499 }
1500
1501 async fn send_ping_response(&mut self, timestamp: u32) -> Result<()> {
1502 self.send_user_control(UserControlEvent::PingResponse(timestamp))
1503 .await
1504 }
1505
1506 async fn send_video(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1510 let (msg_type, payload) = RtmpMessage::Video {
1511 timestamp,
1512 data: data.clone(),
1513 }
1514 .encode();
1515
1516 let chunk = RtmpChunk {
1517 csid: CSID_VIDEO,
1518 timestamp,
1519 message_type: msg_type,
1520 stream_id,
1521 payload,
1522 };
1523
1524 self.write_buf.clear();
1525 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1526 self.writer.write_all(&self.write_buf).await?;
1527 Ok(())
1529 }
1530
1531 async fn send_audio(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1533 let (msg_type, payload) = RtmpMessage::Audio {
1534 timestamp,
1535 data: data.clone(),
1536 }
1537 .encode();
1538
1539 let chunk = RtmpChunk {
1540 csid: CSID_AUDIO,
1541 timestamp,
1542 message_type: msg_type,
1543 stream_id,
1544 payload,
1545 };
1546
1547 self.write_buf.clear();
1548 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1549 self.writer.write_all(&self.write_buf).await?;
1550 Ok(())
1552 }
1553
1554 async fn send_broadcast_frame(&mut self, frame: BroadcastFrame) -> Result<()> {
1559 let stream_id = self.playback_stream_id.unwrap_or(1);
1560
1561 if self.is_paused {
1563 self.frames_dropped_while_paused += 1;
1564 tracing::trace!(session_id = self.state.id, "Frame dropped (paused)");
1565 return Ok(());
1566 }
1567
1568 if self.subscriber_state == SubscriberState::SkippingToKeyframe {
1570 match frame.frame_type {
1571 FrameType::Video => {
1572 if frame.is_keyframe || frame.is_header {
1573 self.subscriber_state = SubscriberState::Normal;
1575 self.skip_audio_until_keyframe = false;
1576 tracing::debug!(
1577 session_id = self.state.id,
1578 "Received keyframe, resuming normal playback"
1579 );
1580 } else {
1581 return Ok(());
1583 }
1584 }
1585 FrameType::Audio => {
1586 if self.skip_audio_until_keyframe {
1589 return Ok(());
1590 }
1591 }
1592 FrameType::Metadata => {
1593 }
1595 }
1596 }
1597
1598 match frame.frame_type {
1600 FrameType::Video => {
1601 self.send_video(stream_id, frame.timestamp, frame.data)
1602 .await?;
1603 }
1604 FrameType::Audio => {
1605 self.send_audio(stream_id, frame.timestamp, frame.data)
1606 .await?;
1607 }
1608 FrameType::Metadata => {
1609 self.send_metadata_frame(stream_id, frame.data).await?;
1611 }
1612 }
1613
1614 self.writer.flush().await?;
1617
1618 Ok(())
1619 }
1620
1621 async fn send_metadata_frame(&mut self, stream_id: u32, data: Bytes) -> Result<()> {
1623 let data_msg = DataMessage {
1625 name: CMD_ON_METADATA.to_string(),
1626 values: vec![AmfValue::String("onMetaData".to_string())],
1627 stream_id,
1628 };
1629
1630 let (msg_type, payload) = RtmpMessage::Data(data_msg).encode();
1631
1632 let chunk = RtmpChunk {
1635 csid: CSID_COMMAND,
1636 timestamp: 0,
1637 message_type: msg_type,
1638 stream_id,
1639 payload: if data.is_empty() { payload } else { data },
1640 };
1641
1642 self.write_buf.clear();
1643 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1644 self.writer.write_all(&self.write_buf).await?;
1645
1646 Ok(())
1647 }
1648}
1649
1650use bytes::Buf;