rtmp_rs/server/
connection.rs

1//! Per-connection RTMP handler
2//!
3//! Manages the lifecycle of a single RTMP connection:
4//! 1. Handshake
5//! 2. Connect command
6//! 3. Stream commands (publish/play)
7//! 4. Media handling
8//! 5. Disconnect
9
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use bytes::{Bytes, BytesMut};
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::broadcast;
18use tokio::time::timeout;
19
20use crate::registry::{BroadcastFrame, FrameType, StreamKey, StreamRegistry};
21
22use crate::amf::AmfValue;
23use crate::error::{Error, ProtocolError, Result};
24use crate::media::flv::FlvTag;
25use crate::media::{AacData, H264Data};
26use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
27use crate::protocol::constants::*;
28use crate::protocol::handshake::{Handshake, HandshakeRole};
29use crate::protocol::message::{
30    Command, ConnectParams, DataMessage, PlayParams, PublishParams, RtmpMessage, UserControlEvent,
31};
32use crate::protocol::quirks::EncoderType;
33use crate::server::config::ServerConfig;
34use crate::server::handler::{AuthResult, MediaDeliveryMode, RtmpHandler};
35use crate::session::context::{SessionContext, StreamContext};
36use crate::session::state::SessionState;
37
/// Backpressure mode of a subscribing connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SubscriberState {
    /// Frames are being handled normally.
    Normal,
    /// The subscriber fell behind; frames are skipped until the next keyframe.
    SkippingToKeyframe,
}
46
/// Per-connection handler
///
/// Owns the buffered halves of one TCP socket together with the chunk codec
/// state and the publish/subscribe bookkeeping for that session. `H` is the
/// application callback implementation shared through an `Arc`.
pub struct Connection<H: RtmpHandler> {
    /// Session state
    state: SessionState,

    /// Session context for callbacks
    context: SessionContext,

    /// TCP stream (buffered)
    reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
    writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,

    /// Read buffer
    read_buf: BytesMut,

    /// Chunk decoder/encoder
    chunk_decoder: ChunkDecoder,
    chunk_encoder: ChunkEncoder,

    /// Write buffer for outgoing chunks
    write_buf: BytesMut,

    /// Server configuration
    config: ServerConfig,

    /// Application handler
    handler: Arc<H>,

    /// Stream registry for pub/sub routing
    registry: Arc<StreamRegistry>,

    /// Pending FC commands (stream key -> transaction ID)
    pending_fc: HashMap<String, f64>,

    /// Stream key we are publishing to (if any)
    publishing_to: Option<StreamKey>,

    /// Stream key we are subscribed to (if any)
    subscribed_to: Option<StreamKey>,

    // NOTE(review): presumably the timestamp of the most recent audio message;
    // initialised to `None` and not otherwise touched in the visible part of
    // the file — confirm against the media handlers.
    last_audio_ts: Option<u32>,

    // NOTE(review): presumably the video counterpart of `last_audio_ts` —
    // confirm against the media handlers.
    last_video_ts: Option<u32>,

    /// Broadcast receiver for subscriber mode
    frame_rx: Option<broadcast::Receiver<BroadcastFrame>>,

    /// Subscriber state for backpressure handling
    subscriber_state: SubscriberState,

    /// Count of consecutive lag events (for disconnection threshold)
    consecutive_lag_count: u32,

    /// RTMP stream ID we're using for playback (subscriber mode)
    playback_stream_id: Option<u32>,

    /// Whether the subscriber is paused
    is_paused: bool,

    /// Frames dropped while paused (for logging)
    frames_dropped_while_paused: u64,

    /// Skip audio until keyframe (set on unpause, cleared on keyframe)
    /// This prevents the jarring experience of audio playing while video is frozen
    skip_audio_until_keyframe: bool,
}
113
114impl<H: RtmpHandler> Connection<H> {
115    /// Create a new connection handler
116    pub fn new(
117        session_id: u64,
118        socket: TcpStream,
119        peer_addr: SocketAddr,
120        config: ServerConfig,
121        handler: Arc<H>,
122        registry: Arc<StreamRegistry>,
123    ) -> Self {
124        let (read_half, write_half) = tokio::io::split(socket);
125
126        Self {
127            state: SessionState::new(session_id, peer_addr),
128            context: SessionContext::new(session_id, peer_addr),
129            reader: BufReader::with_capacity(config.read_buffer_size, read_half),
130            writer: BufWriter::with_capacity(config.write_buffer_size, write_half),
131            read_buf: BytesMut::with_capacity(config.read_buffer_size),
132            chunk_decoder: ChunkDecoder::new(),
133            chunk_encoder: ChunkEncoder::new(),
134            write_buf: BytesMut::with_capacity(config.write_buffer_size),
135            config,
136            handler,
137            registry,
138            pending_fc: HashMap::new(),
139            publishing_to: None,
140            subscribed_to: None,
141            last_audio_ts: None,
142            last_video_ts: None,
143            frame_rx: None,
144            subscriber_state: SubscriberState::Normal,
145            consecutive_lag_count: 0,
146            playback_stream_id: None,
147            is_paused: false,
148            frames_dropped_while_paused: 0,
149            skip_audio_until_keyframe: false,
150        }
151    }
152
    /// Run the connection
    ///
    /// Asks the handler to admit the connection, performs the handshake,
    /// announces our chunk size, and then loops processing socket input (and,
    /// once a broadcast receiver is installed, broadcast frames) until the
    /// peer disconnects, the idle timeout fires, or an error occurs.
    ///
    /// # Errors
    ///
    /// Returns `Error::Rejected` when the handler refuses the connection and
    /// propagates errors from the handshake, from sending the chunk size, and
    /// from the message loop. Cleanup and `on_disconnect` run only after the
    /// message loop has been entered; the earlier `?` returns skip them.
    pub async fn run(&mut self) -> Result<()> {
        // Check if handler allows connection
        if !self.handler.on_connection(&self.context).await {
            return Err(Error::Rejected("Connection rejected by handler".into()));
        }

        // Perform handshake
        self.do_handshake().await?;
        self.handler.on_handshake_complete(&self.context).await;

        // Set our chunk size
        tracing::debug!(
            session_id = self.state.id,
            chunk_size = self.config.chunk_size,
            "Sending set chunk size"
        );
        self.send_set_chunk_size(self.config.chunk_size).await?;
        self.chunk_encoder.set_chunk_size(self.config.chunk_size);

        tracing::debug!(session_id = self.state.id, "Entering main message loop");

        // Main message loop
        // Each iteration yields Ok(true) to keep going, Ok(false) for a clean
        // exit, or Err to abort with that error.
        let idle_timeout = self.config.idle_timeout;
        let result = loop {
            // Handle subscriber mode: take frame_rx out to avoid borrow conflicts
            let mut frame_rx = self.frame_rx.take();

            // Use select! to handle both TCP input and broadcast frames
            let loop_result = if let Some(ref mut rx) = frame_rx {
                // Subscriber mode: listen for both TCP and broadcast frames
                // NOTE(review): `tokio::select!` drops the branch that loses. If a
                // frame becomes ready while `read_and_process` is suspended inside
                // a command handler, that handler is abandoned part-way — confirm
                // the handlers tolerate this.
                tokio::select! {
                    biased;

                    // Receive broadcast frames for subscribers (higher priority)
                    frame_result = rx.recv() => {
                        match frame_result {
                            Ok(frame) => {
                                // Put receiver back before processing
                                self.frame_rx = frame_rx;
                                // Reset lag count on successful receive
                                self.consecutive_lag_count = 0;
                                if let Err(e) = self.send_broadcast_frame(frame).await {
                                    tracing::debug!(error = %e, "Failed to send frame");
                                    Err(e)
                                } else {
                                    Ok(true)
                                }
                            }
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                self.frame_rx = frame_rx;
                                self.handle_lag(n).await.map(|_| true)
                            }
                            Err(broadcast::error::RecvError::Closed) => {
                                self.frame_rx = frame_rx;
                                // Publisher ended, notify subscriber
                                // A failure to notify is only logged; the loop exits either way.
                                if let Err(e) = self.handle_stream_ended().await {
                                    tracing::debug!(error = %e, "Error handling stream end");
                                }
                                Ok(false) // Signal to exit loop
                            }
                        }
                    }

                    // Read from TCP
                    result = timeout(idle_timeout, self.read_and_process()) => {
                        // NOTE(review): this unconditionally restores the receiver taken
                        // above, overwriting anything a command handler stored in
                        // `self.frame_rx` during `read_and_process` — verify against
                        // the handlers that assign it.
                        self.frame_rx = frame_rx;
                        match result {
                            Ok(Ok(continue_loop)) => Ok(continue_loop),
                            Ok(Err(e)) => {
                                tracing::debug!(error = %e, "Processing error");
                                Err(e)
                            }
                            Err(_) => {
                                // Timeout elapsed: treated as a clean exit, not an error.
                                tracing::debug!("Idle timeout");
                                Ok(false)
                            }
                        }
                    }
                }
            } else {
                // Publisher mode: only listen for TCP
                // `frame_rx` is None here; it is written back before processing so a
                // receiver installed during `read_and_process` is kept.
                self.frame_rx = frame_rx;
                match timeout(idle_timeout, self.read_and_process()).await {
                    Ok(Ok(continue_loop)) => Ok(continue_loop),
                    Ok(Err(e)) => {
                        tracing::debug!(error = %e, "Processing error");
                        Err(e)
                    }
                    Err(_) => {
                        tracing::debug!("Idle timeout");
                        Ok(false)
                    }
                }
            };

            match loop_result {
                Ok(true) => continue,
                Ok(false) => break Ok(()),
                Err(e) => break Err(e),
            }
        };

        // Cleanup: unregister publisher or unsubscribe
        self.cleanup_on_disconnect().await;

        // Notify handler of disconnect
        self.handler.on_disconnect(&self.context).await;

        result
    }
264
265    /// Cleanup when connection disconnects
266    async fn cleanup_on_disconnect(&mut self) {
267        // Unregister as publisher if we were publishing
268        if let Some(ref key) = self.publishing_to {
269            self.registry.unregister_publisher(key, self.state.id).await;
270            tracing::debug!(
271                session_id = self.state.id,
272                stream = %key,
273                "Unregistered publisher on disconnect"
274            );
275        }
276
277        // Unsubscribe if we were subscribed
278        if let Some(ref key) = self.subscribed_to {
279            self.registry.unsubscribe(key).await;
280            tracing::debug!(
281                session_id = self.state.id,
282                stream = %key,
283                "Unsubscribed on disconnect"
284            );
285        }
286    }
287
288    /// Handle lag event from broadcast channel
289    async fn handle_lag(&mut self, skipped: u64) -> Result<()> {
290        let config = self.registry.config();
291
292        self.consecutive_lag_count += 1;
293
294        if skipped < config.lag_threshold_low {
295            // Minor lag, continue normally
296            tracing::debug!(
297                session_id = self.state.id,
298                skipped = skipped,
299                "Minor broadcast lag, continuing"
300            );
301            return Ok(());
302        }
303
304        // Significant lag - enter skip mode
305        if self.subscriber_state != SubscriberState::SkippingToKeyframe {
306            self.subscriber_state = SubscriberState::SkippingToKeyframe;
307            tracing::warn!(
308                session_id = self.state.id,
309                skipped = skipped,
310                "Subscriber lagging, skipping to next keyframe"
311            );
312        }
313
314        // Check if we should disconnect slow subscriber
315        if self.consecutive_lag_count >= config.max_consecutive_lag_events {
316            tracing::warn!(
317                session_id = self.state.id,
318                consecutive_lags = self.consecutive_lag_count,
319                "Disconnecting slow subscriber"
320            );
321            return Err(Error::Rejected("Subscriber too slow".into()));
322        }
323
324        Ok(())
325    }
326
327    /// Handle stream ended (publisher closed broadcast channel)
328    async fn handle_stream_ended(&mut self) -> Result<()> {
329        // Reset pause state on stream end
330        if self.is_paused {
331            self.is_paused = false;
332        }
333
334        if let Some(stream_id) = self.playback_stream_id {
335            // Send StreamEOF
336            self.send_user_control(UserControlEvent::StreamEof(stream_id))
337                .await?;
338
339            // Send onStatus
340            let status = Command::on_status(stream_id, "status", NS_PLAY_STOP, "Stream ended");
341            self.send_command(CSID_COMMAND, stream_id, &status).await?;
342
343            tracing::info!(
344                session_id = self.state.id,
345                stream_id = stream_id,
346                "Stream ended, notified subscriber"
347            );
348        }
349
350        Ok(())
351    }
352
353    /// Perform RTMP handshake
354    async fn do_handshake(&mut self) -> Result<()> {
355        let mut handshake = Handshake::new(HandshakeRole::Server);
356
357        // Move to waiting state
358        handshake.generate_initial();
359        self.state.start_handshake();
360
361        // Wait for C0C1
362        let connection_timeout = self.config.connection_timeout;
363        timeout(connection_timeout, async {
364            loop {
365                // Read more data
366                let bytes_needed = handshake.bytes_needed();
367                if bytes_needed > 0 && self.read_buf.len() < bytes_needed {
368                    let n = self.reader.read_buf(&mut self.read_buf).await?;
369                    if n == 0 {
370                        return Err(Error::ConnectionClosed);
371                    }
372                }
373
374                // Process handshake
375                let mut buf = Bytes::copy_from_slice(&self.read_buf);
376                let response = handshake.process(&mut buf)?;
377
378                // Always consume processed bytes (even if no response to send)
379                let consumed = self.read_buf.len() - buf.len();
380                if consumed > 0 {
381                    self.read_buf.advance(consumed);
382                }
383
384                // Send response if any (S0S1S2 for C0C1, nothing for C2)
385                if let Some(data) = response {
386                    self.writer.write_all(&data).await?;
387                    self.writer.flush().await?;
388                }
389
390                if handshake.is_done() {
391                    break;
392                }
393            }
394
395            Ok::<_, Error>(())
396        })
397        .await
398        .map_err(|_| Error::Timeout)??;
399
400        self.state.complete_handshake();
401        tracing::debug!(
402            session_id = self.state.id,
403            remaining_buf = self.read_buf.len(),
404            "Handshake complete"
405        );
406
407        // Debug: dump first 32 bytes of remaining buffer
408        if !self.read_buf.is_empty() {
409            let dump_len = self.read_buf.len().min(32);
410            let hex: String = self.read_buf[..dump_len]
411                .iter()
412                .map(|b| format!("{:02x}", b))
413                .collect::<Vec<_>>()
414                .join(" ");
415            tracing::debug!(
416                session_id = self.state.id,
417                first_bytes = %hex,
418                "Buffer after handshake"
419            );
420        }
421
422        Ok(())
423    }
424
425    /// Read data and process messages
426    async fn read_and_process(&mut self) -> Result<bool> {
427        tracing::trace!(
428            session_id = self.state.id,
429            buf_len = self.read_buf.len(),
430            "read_and_process called"
431        );
432
433        // Keep trying to decode until we need more data
434        // This is important for multi-chunk messages where multiple chunks
435        // may be in the buffer but only the last one completes the message
436        loop {
437            let buf_len_before = self.read_buf.len();
438
439            // Try to decode a complete message
440            if let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
441                tracing::trace!(
442                    session_id = self.state.id,
443                    csid = chunk.csid,
444                    msg_type = chunk.message_type,
445                    "Decoded chunk from buffer"
446                );
447                self.handle_chunk(chunk).await?;
448                // Continue to try decoding more messages
449                continue;
450            }
451
452            // decode() returned None - check if it consumed any data
453            // If it did, there's a partial chunk being assembled, keep trying
454            let buf_len_after = self.read_buf.len();
455            if buf_len_after < buf_len_before {
456                // Some data was consumed (partial chunk), try again
457                continue;
458            }
459
460            // No progress - either buffer is empty or we need more data
461            break;
462        }
463
464        tracing::trace!(
465            session_id = self.state.id,
466            buf_len = self.read_buf.len(),
467            "Waiting for more data"
468        );
469
470        // Wait for more data from the socket
471        let n = self.reader.read_buf(&mut self.read_buf).await?;
472        if n == 0 {
473            return Ok(false); // Connection closed
474        }
475
476        tracing::trace!(
477            session_id = self.state.id,
478            bytes_read = n,
479            buf_len = self.read_buf.len(),
480            "Read data from socket"
481        );
482
483        let needs_ack = self.state.add_bytes_received(n as u64);
484
485        // Process any complete messages with the new data
486        while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
487            tracing::trace!(
488                session_id = self.state.id,
489                csid = chunk.csid,
490                msg_type = chunk.message_type,
491                "Decoded chunk after read"
492            );
493            self.handle_chunk(chunk).await?;
494        }
495
496        // Send acknowledgement if needed
497        if needs_ack {
498            self.send_acknowledgement().await?;
499        }
500
501        Ok(true)
502    }
503
504    /// Handle a decoded chunk
505    async fn handle_chunk(&mut self, chunk: RtmpChunk) -> Result<()> {
506        let message = RtmpMessage::from_chunk(&chunk)?;
507
508        match message {
509            RtmpMessage::SetChunkSize(size) => {
510                tracing::debug!(size = size, "Peer set chunk size");
511                self.chunk_decoder.set_chunk_size(size);
512                self.state.in_chunk_size = size;
513            }
514
515            RtmpMessage::Abort { csid } => {
516                self.chunk_decoder.abort(csid);
517            }
518
519            RtmpMessage::Acknowledgement { sequence: _ } => {
520                // Peer acknowledged bytes - we can track this for flow control
521            }
522
523            RtmpMessage::WindowAckSize(size) => {
524                self.state.window_ack_size = size;
525            }
526
527            RtmpMessage::SetPeerBandwidth {
528                size,
529                limit_type: _,
530            } => {
531                // Send window ack size back
532                self.send_window_ack_size(size).await?;
533            }
534
535            RtmpMessage::UserControl(event) => {
536                self.handle_user_control(event).await?;
537            }
538
539            RtmpMessage::Command(cmd) | RtmpMessage::CommandAmf3(cmd) => {
540                self.handle_command(cmd).await?;
541            }
542
543            RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
544                self.handle_data(data).await?;
545            }
546
547            RtmpMessage::Audio { timestamp, data } => {
548                self.handle_audio(timestamp, data).await?;
549            }
550
551            RtmpMessage::Video { timestamp, data } => {
552                self.handle_video(timestamp, data).await?;
553            }
554
555            _ => {
556                tracing::trace!(message = ?message, "Unhandled message");
557            }
558        }
559
560        Ok(())
561    }
562
563    /// Handle user control event
564    async fn handle_user_control(&mut self, event: UserControlEvent) -> Result<()> {
565        match event {
566            UserControlEvent::PingRequest(timestamp) => {
567                self.send_ping_response(timestamp).await?;
568            }
569            UserControlEvent::SetBufferLength {
570                stream_id: _,
571                buffer_ms: _,
572            } => {
573                // Client's buffer length - we can use this for flow control
574            }
575            _ => {}
576        }
577        Ok(())
578    }
579
580    /// Handle command message
581    async fn handle_command(&mut self, cmd: Command) -> Result<()> {
582        tracing::debug!(
583            session_id = self.state.id,
584            command = cmd.name,
585            transaction_id = cmd.transaction_id,
586            stream_id = cmd.stream_id,
587            args = ?cmd.arguments,
588            "Received command"
589        );
590        match cmd.name.as_str() {
591            CMD_CONNECT => self.handle_connect(cmd).await?,
592            CMD_CREATE_STREAM => self.handle_create_stream(cmd).await?,
593            CMD_DELETE_STREAM => self.handle_delete_stream(cmd).await?,
594            CMD_PUBLISH => self.handle_publish(cmd).await?,
595            CMD_PLAY => self.handle_play(cmd).await?,
596            CMD_FC_PUBLISH => self.handle_fc_publish(cmd).await?,
597            CMD_FC_UNPUBLISH => self.handle_fc_unpublish(cmd).await?,
598            CMD_RELEASE_STREAM => self.handle_release_stream(cmd).await?,
599            CMD_PAUSE => self.handle_pause(cmd).await?,
600            CMD_CLOSE | "closeStream" => self.handle_close_stream(cmd).await?,
601            _ => {
602                tracing::debug!(command = cmd.name, "Unknown command");
603            }
604        }
605        Ok(())
606    }
607
608    /// Handle connect command
609    async fn handle_connect(&mut self, cmd: Command) -> Result<()> {
610        let params = ConnectParams::from_amf(&cmd.command_object);
611        let encoder_type = params
612            .flash_ver
613            .as_deref()
614            .map(EncoderType::from_flash_ver)
615            .unwrap_or(EncoderType::Unknown);
616
617        // Call handler
618        let result = self.handler.on_connect(&self.context, &params).await;
619
620        match result {
621            AuthResult::Accept => {
622                self.state.on_connect(params.clone(), encoder_type);
623                self.context.with_connect(params, encoder_type);
624
625                // Send window ack size
626                self.send_window_ack_size(self.config.window_ack_size)
627                    .await?;
628
629                // Send peer bandwidth
630                self.send_peer_bandwidth(self.config.peer_bandwidth).await?;
631
632                // Send stream begin
633                self.send_user_control(UserControlEvent::StreamBegin(0))
634                    .await?;
635
636                // Send connect result
637                self.send_connect_result(cmd.transaction_id).await?;
638
639                tracing::info!(
640                    session_id = self.state.id,
641                    app = self.context.app,
642                    "Connected"
643                );
644            }
645            AuthResult::Reject(reason) => {
646                self.send_connect_error(cmd.transaction_id, &reason).await?;
647                return Err(Error::Rejected(reason));
648            }
649            AuthResult::Redirect { url } => {
650                self.send_connect_redirect(cmd.transaction_id, &url).await?;
651                return Err(Error::Rejected(format!("Redirected to {}", url)));
652            }
653        }
654
655        Ok(())
656    }
657
658    /// Handle createStream command
659    async fn handle_create_stream(&mut self, cmd: Command) -> Result<()> {
660        let stream_id = self.state.allocate_stream_id();
661
662        let result = Command::result(
663            cmd.transaction_id,
664            AmfValue::Null,
665            AmfValue::Number(stream_id as f64),
666        );
667
668        self.send_command(CSID_COMMAND, 0, &result).await?;
669
670        tracing::debug!(stream_id = stream_id, "Stream created");
671        Ok(())
672    }
673
674    /// Handle deleteStream command
675    async fn handle_delete_stream(&mut self, cmd: Command) -> Result<()> {
676        let stream_id = cmd
677            .arguments
678            .first()
679            .and_then(|v| v.as_number())
680            .unwrap_or(0.0) as u32;
681
682        if let Some(stream) = self.state.remove_stream(stream_id) {
683            if stream.is_publishing() {
684                let stream_ctx = StreamContext::new(
685                    self.context.clone(),
686                    stream_id,
687                    stream.stream_key.unwrap_or_default(),
688                    true,
689                );
690                self.handler.on_publish_stop(&stream_ctx).await;
691            }
692        }
693
694        Ok(())
695    }
696
697    /// Handle FCPublish command (OBS/Twitch compatibility)
698    async fn handle_fc_publish(&mut self, cmd: Command) -> Result<()> {
699        let stream_key = cmd
700            .arguments
701            .first()
702            .and_then(|v| v.as_str())
703            .unwrap_or("")
704            .to_string();
705
706        let result = self.handler.on_fc_publish(&self.context, &stream_key).await;
707
708        match result {
709            AuthResult::Accept => {
710                // Store for later publish command
711                self.pending_fc
712                    .insert(stream_key.clone(), cmd.transaction_id);
713
714                // Send onFCPublish response
715                let response = Command {
716                    name: CMD_ON_FC_PUBLISH.to_string(),
717                    transaction_id: 0.0,
718                    command_object: AmfValue::Null,
719                    arguments: vec![],
720                    stream_id: 0,
721                };
722                self.send_command(CSID_COMMAND, 0, &response).await?;
723            }
724            AuthResult::Reject(reason) => {
725                return Err(Error::Rejected(reason));
726            }
727            AuthResult::Redirect { url } => {
728                return Err(Error::Rejected(format!("Redirected to {}", url)));
729            }
730        }
731
732        Ok(())
733    }
734
735    /// Handle FCUnpublish command
736    async fn handle_fc_unpublish(&mut self, cmd: Command) -> Result<()> {
737        let stream_key = cmd.arguments.first().and_then(|v| v.as_str()).unwrap_or("");
738
739        self.pending_fc.remove(stream_key);
740
741        // Send onFCUnpublish response
742        let response = Command {
743            name: CMD_ON_FC_UNPUBLISH.to_string(),
744            transaction_id: 0.0,
745            command_object: AmfValue::Null,
746            arguments: vec![],
747            stream_id: 0,
748        };
749        self.send_command(CSID_COMMAND, 0, &response).await?;
750
751        Ok(())
752    }
753
    /// Handle releaseStream command
    ///
    /// Intentionally a no-op: the command is accepted and ignored, and no
    /// reply is sent to the peer. Always returns `Ok(())`.
    async fn handle_release_stream(&mut self, _cmd: Command) -> Result<()> {
        // No response needed, this is just cleanup notification
        Ok(())
    }
759
760    /// Handle publish command
761    async fn handle_publish(&mut self, cmd: Command) -> Result<()> {
762        let stream_key = cmd
763            .arguments
764            .first()
765            .and_then(|v| v.as_str())
766            .unwrap_or("")
767            .to_string();
768
769        let publish_type = cmd
770            .arguments
771            .get(1)
772            .and_then(|v| v.as_str())
773            .unwrap_or("live")
774            .to_string();
775
776        let params = PublishParams {
777            stream_key: stream_key.clone(),
778            publish_type: publish_type.clone(),
779            stream_id: cmd.stream_id,
780        };
781
782        let result = self.handler.on_publish(&self.context, &params).await;
783
784        match result {
785            AuthResult::Accept => {
786                // Create stream key for registry
787                let app = self.context.app.clone();
788                let registry_key = StreamKey::new(&app, &stream_key);
789
790                // Register as publisher in the registry
791                if let Err(e) = self
792                    .registry
793                    .register_publisher(&registry_key, self.state.id)
794                    .await
795                {
796                    tracing::warn!(
797                        session_id = self.state.id,
798                        stream = %registry_key,
799                        error = %e,
800                        "Failed to register publisher"
801                    );
802                    let status = Command::on_status(
803                        cmd.stream_id,
804                        "error",
805                        NS_PUBLISH_BAD_NAME,
806                        &format!("Stream already publishing: {}", e),
807                    );
808                    self.send_command(CSID_COMMAND, cmd.stream_id, &status)
809                        .await?;
810                    return Err(Error::Rejected(format!("Stream already publishing: {}", e)));
811                }
812
813                // Track that we're publishing to this stream
814                self.publishing_to = Some(registry_key);
815
816                // Update stream state
817                if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
818                    stream.start_publish(stream_key.clone(), publish_type);
819                }
820
821                // Send StreamBegin
822                self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
823                    .await?;
824
825                // Send onStatus
826                let status = Command::on_status(
827                    cmd.stream_id,
828                    "status",
829                    NS_PUBLISH_START,
830                    &format!("{} is now published", stream_key),
831                );
832                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
833                    .await?;
834
835                tracing::info!(
836                    session_id = self.state.id,
837                    stream_key = stream_key,
838                    "Publishing started"
839                );
840            }
841            AuthResult::Reject(reason) => {
842                let status =
843                    Command::on_status(cmd.stream_id, "error", NS_PUBLISH_BAD_NAME, &reason);
844                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
845                    .await?;
846                return Err(Error::Rejected(reason));
847            }
848            AuthResult::Redirect { url } => {
849                return Err(Error::Rejected(format!("Redirected to {}", url)));
850            }
851        }
852
853        Ok(())
854    }
855
856    /// Handle play command
857    async fn handle_play(&mut self, cmd: Command) -> Result<()> {
858        let stream_name = cmd
859            .arguments
860            .first()
861            .and_then(|v| v.as_str())
862            .unwrap_or("")
863            .to_string();
864
865        let start = cmd
866            .arguments
867            .get(1)
868            .and_then(|v| v.as_number())
869            .unwrap_or(-2.0);
870
871        let duration = cmd
872            .arguments
873            .get(2)
874            .and_then(|v| v.as_number())
875            .unwrap_or(-1.0);
876
877        let reset = cmd
878            .arguments
879            .get(3)
880            .and_then(|v| v.as_bool())
881            .unwrap_or(true);
882
883        let params = PlayParams {
884            stream_name: stream_name.clone(),
885            start,
886            duration,
887            reset,
888            stream_id: cmd.stream_id,
889        };
890
891        let result = self.handler.on_play(&self.context, &params).await;
892
893        match result {
894            AuthResult::Accept => {
895                // Create stream key for registry
896                let app = self.context.app.clone();
897                let registry_key = StreamKey::new(&app, &stream_name);
898
899                // Subscribe to the stream in registry
900                let (rx, catchup_frames) = match self.registry.subscribe(&registry_key).await {
901                    Ok(result) => result,
902                    Err(e) => {
903                        tracing::debug!(
904                            session_id = self.state.id,
905                            stream = %registry_key,
906                            error = %e,
907                            "Stream not found for play"
908                        );
909                        let status = Command::on_status(
910                            cmd.stream_id,
911                            "error",
912                            NS_PLAY_STREAM_NOT_FOUND,
913                            &format!("Stream not found: {}", stream_name),
914                        );
915                        self.send_command(CSID_COMMAND, cmd.stream_id, &status)
916                            .await?;
917                        return Ok(());
918                    }
919                };
920
921                // Store subscription info
922                self.subscribed_to = Some(registry_key.clone());
923                self.frame_rx = Some(rx);
924                self.playback_stream_id = Some(cmd.stream_id);
925
926                if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
927                    stream.start_play(stream_name.clone());
928                }
929
930                // Send StreamBegin
931                self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
932                    .await?;
933
934                // Send onStatus Reset
935                if reset {
936                    let status = Command::on_status(
937                        cmd.stream_id,
938                        "status",
939                        NS_PLAY_RESET,
940                        "Playing and resetting",
941                    );
942                    self.send_command(CSID_COMMAND, cmd.stream_id, &status)
943                        .await?;
944                }
945
946                // Send onStatus Start
947                let status = Command::on_status(
948                    cmd.stream_id,
949                    "status",
950                    NS_PLAY_START,
951                    &format!("Started playing {}", stream_name),
952                );
953                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
954                    .await?;
955
956                // Send catchup frames (sequence headers + GOP)
957                tracing::debug!(
958                    session_id = self.state.id,
959                    stream = %registry_key,
960                    catchup_frames = catchup_frames.len(),
961                    "Sending catchup frames"
962                );
963
964                for frame in catchup_frames {
965                    self.send_broadcast_frame(frame).await?;
966                }
967
968                tracing::info!(
969                    session_id = self.state.id,
970                    stream_name = stream_name,
971                    "Playing started"
972                );
973            }
974            AuthResult::Reject(reason) => {
975                let status =
976                    Command::on_status(cmd.stream_id, "error", NS_PLAY_STREAM_NOT_FOUND, &reason);
977                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
978                    .await?;
979            }
980            AuthResult::Redirect { url: _ } => {
981                // Handle redirect
982            }
983        }
984
985        Ok(())
986    }
987
988    /// Handle closeStream command
989    async fn handle_close_stream(&mut self, cmd: Command) -> Result<()> {
990        if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
991            stream.stop();
992        }
993        Ok(())
994    }
995
996    /// Handle pause command from subscriber
997    async fn handle_pause(&mut self, cmd: Command) -> Result<()> {
998        let stream_id = match self.playback_stream_id {
999            Some(id) => id,
1000            None => return Ok(()), // Not in play mode
1001        };
1002
1003        // The pause flag is the first argument (true = pause, false = unpause)
1004        let pause_flag = cmd
1005            .arguments
1006            .first()
1007            .and_then(|v| v.as_bool())
1008            .unwrap_or(true);
1009
1010        if pause_flag {
1011            self.do_pause(stream_id).await
1012        } else {
1013            self.do_unpause(stream_id).await
1014        }
1015    }
1016
1017    /// Pause playback for subscriber
1018    async fn do_pause(&mut self, stream_id: u32) -> Result<()> {
1019        if self.is_paused {
1020            return Ok(()); // Already paused
1021        }
1022
1023        self.is_paused = true;
1024        self.frames_dropped_while_paused = 0;
1025
1026        // Send onStatus(NetStream.Pause.Notify)
1027        let status = Command::on_status(stream_id, "status", NS_PAUSE_NOTIFY, "Playback paused");
1028        self.send_command(CSID_COMMAND, stream_id, &status).await?;
1029
1030        // Send StreamDry to indicate no data coming
1031        self.send_user_control(UserControlEvent::StreamDry(stream_id))
1032            .await?;
1033
1034        // Notify handler
1035        let stream_key = self
1036            .subscribed_to
1037            .as_ref()
1038            .map(|k| k.name.clone())
1039            .unwrap_or_default();
1040        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1041        self.handler.on_pause(&stream_ctx).await;
1042
1043        tracing::info!(session_id = self.state.id, "Subscriber paused");
1044        Ok(())
1045    }
1046
1047    /// Unpause playback for subscriber
1048    async fn do_unpause(&mut self, stream_id: u32) -> Result<()> {
1049        if !self.is_paused {
1050            return Ok(()); // Not paused
1051        }
1052
1053        self.is_paused = false;
1054
1055        // Force keyframe sync for clean video resumption
1056        // Also skip audio to avoid hearing audio while video is frozen
1057        self.subscriber_state = SubscriberState::SkippingToKeyframe;
1058        self.skip_audio_until_keyframe = true;
1059
1060        // Send onStatus(NetStream.Unpause.Notify)
1061        let status = Command::on_status(stream_id, "status", NS_UNPAUSE_NOTIFY, "Playback resumed");
1062        self.send_command(CSID_COMMAND, stream_id, &status).await?;
1063
1064        // Send StreamBegin to indicate data resuming
1065        self.send_user_control(UserControlEvent::StreamBegin(stream_id))
1066            .await?;
1067
1068        // Resend sequence headers to reinitialize decoder
1069        // This is critical for clean playback resumption
1070        if let Some(ref key) = self.subscribed_to {
1071            let headers = self.registry.get_sequence_headers(key).await;
1072            tracing::debug!(
1073                session_id = self.state.id,
1074                header_count = headers.len(),
1075                "Resending sequence headers after unpause"
1076            );
1077            for frame in headers {
1078                match frame.frame_type {
1079                    FrameType::Video => {
1080                        self.send_video(stream_id, frame.timestamp, frame.data)
1081                            .await?;
1082                    }
1083                    FrameType::Audio => {
1084                        self.send_audio(stream_id, frame.timestamp, frame.data)
1085                            .await?;
1086                    }
1087                    _ => {}
1088                }
1089            }
1090            self.writer.flush().await?;
1091        }
1092
1093        // Notify handler
1094        let stream_key = self
1095            .subscribed_to
1096            .as_ref()
1097            .map(|k| k.name.clone())
1098            .unwrap_or_default();
1099        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1100        self.handler.on_unpause(&stream_ctx).await;
1101
1102        tracing::info!(
1103            session_id = self.state.id,
1104            frames_dropped = self.frames_dropped_while_paused,
1105            "Subscriber unpaused (waiting for keyframe)"
1106        );
1107        self.frames_dropped_while_paused = 0;
1108        Ok(())
1109    }
1110
1111    /// Handle data message
1112    async fn handle_data(&mut self, data: DataMessage) -> Result<()> {
1113        match data.name.as_str() {
1114            CMD_SET_DATA_FRAME => {
1115                // @setDataFrame usually has "onMetaData" as first value
1116                if let Some(AmfValue::String(name)) = data.values.first() {
1117                    if name == CMD_ON_METADATA {
1118                        self.handle_metadata(data.stream_id, &data.values[1..])
1119                            .await?;
1120                    }
1121                }
1122            }
1123            CMD_ON_METADATA => {
1124                self.handle_metadata(data.stream_id, &data.values).await?;
1125            }
1126            _ => {
1127                tracing::trace!(name = data.name, "Unknown data message");
1128            }
1129        }
1130        Ok(())
1131    }
1132
1133    /// Handle metadata
1134    async fn handle_metadata(&mut self, stream_id: u32, values: &[AmfValue]) -> Result<()> {
1135        // Extract metadata object
1136        let metadata: HashMap<String, AmfValue> = values
1137            .first()
1138            .and_then(|v| v.as_object().cloned())
1139            .unwrap_or_default();
1140
1141        if let Some(stream) = self.state.get_stream_mut(stream_id) {
1142            stream.on_metadata();
1143        }
1144
1145        if let Some(stream) = self.state.get_stream(stream_id) {
1146            if stream.is_publishing() {
1147                let stream_ctx = StreamContext::new(
1148                    self.context.clone(),
1149                    stream_id,
1150                    stream.stream_key.clone().unwrap_or_default(),
1151                    true,
1152                );
1153                self.handler.on_metadata(&stream_ctx, &metadata).await;
1154            }
1155        }
1156
1157        Ok(())
1158    }
1159
1160    /// Handle audio message
1161    async fn handle_audio(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1162        if data.is_empty() {
1163            return Ok(());
1164        }
1165
1166        if let Some(prev_audio_ts) = self.last_audio_ts {
1167            let timestamp_delta = timestamp - prev_audio_ts;
1168            tracing::trace!(
1169                timestamp = timestamp,
1170                last_audio_ts = prev_audio_ts,
1171                timestamp_delta = timestamp_delta,
1172                "connection handle_audio"
1173            );
1174        }
1175        self.last_audio_ts = Some(timestamp);
1176
1177        // Find publishing stream
1178        let stream_id = self.find_publishing_stream()?;
1179        let stream = self
1180            .state
1181            .get_stream_mut(stream_id)
1182            .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1183
1184        let is_header = data.len() >= 2 && (data[0] >> 4) == 10 && data[1] == 0;
1185        stream.on_audio(timestamp, is_header, data.len());
1186
1187        // Store sequence header
1188        if is_header {
1189            let tag = FlvTag::audio(timestamp, data.clone());
1190            stream.gop_buffer.set_audio_header(tag);
1191        }
1192
1193        // Create stream context for callbacks
1194        let stream_key = stream.stream_key.clone().unwrap_or_default();
1195        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1196
1197        // Deliver based on mode
1198        let mode = self.handler.media_delivery_mode();
1199
1200        if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1201            let tag = FlvTag::audio(timestamp, data.clone());
1202            self.handler.on_media_tag(&stream_ctx, &tag).await;
1203        }
1204
1205        if matches!(
1206            mode,
1207            MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1208        ) {
1209            if data.len() >= 2 && (data[0] >> 4) == 10 {
1210                // AAC
1211                if let Ok(aac_data) = AacData::parse(data.slice(1..)) {
1212                    self.handler
1213                        .on_audio_frame(&stream_ctx, &aac_data, timestamp)
1214                        .await;
1215                }
1216            }
1217        }
1218
1219        // Broadcast to subscribers via registry
1220        if let Some(ref key) = self.publishing_to {
1221            let frame = BroadcastFrame::audio(timestamp, data, is_header);
1222            self.registry.broadcast(key, frame).await;
1223        }
1224
1225        Ok(())
1226    }
1227
1228    /// Handle video message
1229    async fn handle_video(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1230        if data.is_empty() {
1231            return Ok(());
1232        }
1233
1234        if let Some(prev_video_ts) = self.last_video_ts {
1235            let timestamp_delta = timestamp - prev_video_ts;
1236            tracing::trace!(
1237                timestamp = timestamp,
1238                last_video_ts = prev_video_ts,
1239                timestamp_delta = timestamp_delta,
1240                "connection handle_video"
1241            );
1242        }
1243        self.last_video_ts = Some(timestamp);
1244
1245        // Find publishing stream
1246        let stream_id = self.find_publishing_stream()?;
1247        let stream = self
1248            .state
1249            .get_stream_mut(stream_id)
1250            .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1251
1252        let is_keyframe = (data[0] >> 4) == 1;
1253        let is_header = data.len() >= 2 && (data[0] & 0x0F) == 7 && data[1] == 0;
1254        stream.on_video(timestamp, is_keyframe, is_header, data.len());
1255
1256        // Create FLV tag
1257        let tag = FlvTag::video(timestamp, data.clone());
1258
1259        // Store sequence header
1260        if is_header {
1261            stream.gop_buffer.set_video_header(tag.clone());
1262        } else {
1263            // Add to GOP buffer
1264            stream.gop_buffer.push(tag.clone());
1265        }
1266
1267        // Create stream context for callbacks
1268        let stream_key = stream.stream_key.clone().unwrap_or_default();
1269        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1270
1271        // Notify keyframe
1272        if is_keyframe && !is_header {
1273            self.handler.on_keyframe(&stream_ctx, timestamp).await;
1274        }
1275
1276        // Deliver based on mode
1277        let mode = self.handler.media_delivery_mode();
1278
1279        if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1280            self.handler.on_media_tag(&stream_ctx, &tag).await;
1281        }
1282
1283        if matches!(
1284            mode,
1285            MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1286        ) {
1287            if data.len() >= 2 && (data[0] & 0x0F) == 7 {
1288                // AVC/H.264
1289                if let Ok(h264_data) = H264Data::parse(data.slice(1..)) {
1290                    self.handler
1291                        .on_video_frame(&stream_ctx, &h264_data, timestamp)
1292                        .await;
1293                }
1294            }
1295        }
1296
1297        // Broadcast to subscribers via registry
1298        if let Some(ref key) = self.publishing_to {
1299            let frame = BroadcastFrame::video(timestamp, data, is_keyframe, is_header);
1300            self.registry.broadcast(key, frame).await;
1301        }
1302
1303        Ok(())
1304    }
1305
1306    /// Find the publishing stream (assumes single publish per connection)
1307    fn find_publishing_stream(&self) -> Result<u32> {
1308        for (id, stream) in &self.state.streams {
1309            if stream.is_publishing() {
1310                return Ok(*id);
1311            }
1312        }
1313        Err(ProtocolError::StreamNotFound(0).into())
1314    }
1315
1316    // === Message sending helpers ===
1317
1318    async fn send_command(&mut self, csid: u32, stream_id: u32, cmd: &Command) -> Result<()> {
1319        let (msg_type, payload) = RtmpMessage::Command(cmd.clone()).encode();
1320
1321        let chunk = RtmpChunk {
1322            csid,
1323            timestamp: 0,
1324            message_type: msg_type,
1325            stream_id,
1326            payload,
1327        };
1328
1329        self.write_buf.clear();
1330        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1331        self.writer.write_all(&self.write_buf).await?;
1332        self.writer.flush().await?;
1333
1334        Ok(())
1335    }
1336
1337    async fn send_connect_result(&mut self, transaction_id: f64) -> Result<()> {
1338        let mut props = HashMap::new();
1339        props.insert(
1340            "fmsVer".to_string(),
1341            AmfValue::String("FMS/3,5,7,7009".into()),
1342        );
1343        props.insert("capabilities".to_string(), AmfValue::Number(31.0));
1344        props.insert("mode".to_string(), AmfValue::Number(1.0));
1345
1346        let mut info = HashMap::new();
1347        info.insert("level".to_string(), AmfValue::String("status".into()));
1348        info.insert(
1349            "code".to_string(),
1350            AmfValue::String(NC_CONNECT_SUCCESS.into()),
1351        );
1352        info.insert(
1353            "description".to_string(),
1354            AmfValue::String("Connection succeeded".into()),
1355        );
1356        info.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
1357
1358        let result = Command::result(
1359            transaction_id,
1360            AmfValue::Object(props),
1361            AmfValue::Object(info),
1362        );
1363
1364        self.send_command(CSID_COMMAND, 0, &result).await
1365    }
1366
1367    async fn send_connect_error(&mut self, transaction_id: f64, reason: &str) -> Result<()> {
1368        let mut info = HashMap::new();
1369        info.insert("level".to_string(), AmfValue::String("error".into()));
1370        info.insert(
1371            "code".to_string(),
1372            AmfValue::String(NC_CONNECT_REJECTED.into()),
1373        );
1374        info.insert("description".to_string(), AmfValue::String(reason.into()));
1375
1376        let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1377
1378        self.send_command(CSID_COMMAND, 0, &error).await
1379    }
1380
1381    async fn send_connect_redirect(&mut self, transaction_id: f64, url: &str) -> Result<()> {
1382        let mut info = HashMap::new();
1383        info.insert("level".to_string(), AmfValue::String("error".into()));
1384        info.insert(
1385            "code".to_string(),
1386            AmfValue::String(NC_CONNECT_REJECTED.into()),
1387        );
1388        info.insert(
1389            "description".to_string(),
1390            AmfValue::String("Redirect".into()),
1391        );
1392        info.insert("ex.redirect".to_string(), AmfValue::String(url.into()));
1393
1394        let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1395
1396        self.send_command(CSID_COMMAND, 0, &error).await
1397    }
1398
1399    async fn send_set_chunk_size(&mut self, size: u32) -> Result<()> {
1400        let (msg_type, payload) = RtmpMessage::SetChunkSize(size).encode();
1401
1402        let chunk = RtmpChunk {
1403            csid: CSID_PROTOCOL_CONTROL,
1404            timestamp: 0,
1405            message_type: msg_type,
1406            stream_id: 0,
1407            payload,
1408        };
1409
1410        self.write_buf.clear();
1411        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1412        self.writer.write_all(&self.write_buf).await?;
1413        self.writer.flush().await?;
1414
1415        Ok(())
1416    }
1417
1418    async fn send_window_ack_size(&mut self, size: u32) -> Result<()> {
1419        let (msg_type, payload) = RtmpMessage::WindowAckSize(size).encode();
1420
1421        let chunk = RtmpChunk {
1422            csid: CSID_PROTOCOL_CONTROL,
1423            timestamp: 0,
1424            message_type: msg_type,
1425            stream_id: 0,
1426            payload,
1427        };
1428
1429        self.write_buf.clear();
1430        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1431        self.writer.write_all(&self.write_buf).await?;
1432        self.writer.flush().await?;
1433
1434        Ok(())
1435    }
1436
1437    async fn send_peer_bandwidth(&mut self, size: u32) -> Result<()> {
1438        let (msg_type, payload) = RtmpMessage::SetPeerBandwidth {
1439            size,
1440            limit_type: BANDWIDTH_LIMIT_DYNAMIC,
1441        }
1442        .encode();
1443
1444        let chunk = RtmpChunk {
1445            csid: CSID_PROTOCOL_CONTROL,
1446            timestamp: 0,
1447            message_type: msg_type,
1448            stream_id: 0,
1449            payload,
1450        };
1451
1452        self.write_buf.clear();
1453        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1454        self.writer.write_all(&self.write_buf).await?;
1455        self.writer.flush().await?;
1456
1457        Ok(())
1458    }
1459
1460    async fn send_user_control(&mut self, event: UserControlEvent) -> Result<()> {
1461        let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
1462
1463        let chunk = RtmpChunk {
1464            csid: CSID_PROTOCOL_CONTROL,
1465            timestamp: 0,
1466            message_type: msg_type,
1467            stream_id: 0,
1468            payload,
1469        };
1470
1471        self.write_buf.clear();
1472        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1473        self.writer.write_all(&self.write_buf).await?;
1474        self.writer.flush().await?;
1475
1476        Ok(())
1477    }
1478
1479    async fn send_acknowledgement(&mut self) -> Result<()> {
1480        let sequence = self.state.bytes_received as u32;
1481
1482        let (msg_type, payload) = RtmpMessage::Acknowledgement { sequence }.encode();
1483
1484        let chunk = RtmpChunk {
1485            csid: CSID_PROTOCOL_CONTROL,
1486            timestamp: 0,
1487            message_type: msg_type,
1488            stream_id: 0,
1489            payload,
1490        };
1491
1492        self.write_buf.clear();
1493        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1494        self.writer.write_all(&self.write_buf).await?;
1495        self.writer.flush().await?;
1496
1497        self.state.mark_ack_sent();
1498        Ok(())
1499    }
1500
1501    async fn send_ping_response(&mut self, timestamp: u32) -> Result<()> {
1502        self.send_user_control(UserControlEvent::PingResponse(timestamp))
1503            .await
1504    }
1505
1506    // === Media sending methods for subscriber mode ===
1507
1508    /// Send a video message to the client
1509    async fn send_video(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1510        let (msg_type, payload) = RtmpMessage::Video {
1511            timestamp,
1512            data: data.clone(),
1513        }
1514        .encode();
1515
1516        let chunk = RtmpChunk {
1517            csid: CSID_VIDEO,
1518            timestamp,
1519            message_type: msg_type,
1520            stream_id,
1521            payload,
1522        };
1523
1524        self.write_buf.clear();
1525        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1526        self.writer.write_all(&self.write_buf).await?;
1527        // Don't flush after every frame - batch writes for efficiency
1528        Ok(())
1529    }
1530
1531    /// Send an audio message to the client
1532    async fn send_audio(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1533        let (msg_type, payload) = RtmpMessage::Audio {
1534            timestamp,
1535            data: data.clone(),
1536        }
1537        .encode();
1538
1539        let chunk = RtmpChunk {
1540            csid: CSID_AUDIO,
1541            timestamp,
1542            message_type: msg_type,
1543            stream_id,
1544            payload,
1545        };
1546
1547        self.write_buf.clear();
1548        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1549        self.writer.write_all(&self.write_buf).await?;
1550        // Don't flush after every frame - batch writes for efficiency
1551        Ok(())
1552    }
1553
1554    /// Send a broadcast frame to the subscriber client
1555    ///
1556    /// Handles backpressure by skipping non-keyframes when in skip mode.
1557    /// Also handles pause state by consuming frames without sending.
1558    async fn send_broadcast_frame(&mut self, frame: BroadcastFrame) -> Result<()> {
1559        let stream_id = self.playback_stream_id.unwrap_or(1);
1560
1561        // PAUSE: Consume frame but don't send
1562        if self.is_paused {
1563            self.frames_dropped_while_paused += 1;
1564            tracing::trace!(session_id = self.state.id, "Frame dropped (paused)");
1565            return Ok(());
1566        }
1567
1568        // Backpressure handling: skip non-keyframes if we're lagging
1569        if self.subscriber_state == SubscriberState::SkippingToKeyframe {
1570            match frame.frame_type {
1571                FrameType::Video => {
1572                    if frame.is_keyframe || frame.is_header {
1573                        // Got a keyframe or header, resume normal operation
1574                        self.subscriber_state = SubscriberState::Normal;
1575                        self.skip_audio_until_keyframe = false;
1576                        tracing::debug!(
1577                            session_id = self.state.id,
1578                            "Received keyframe, resuming normal playback"
1579                        );
1580                    } else {
1581                        // Skip non-keyframe video
1582                        return Ok(());
1583                    }
1584                }
1585                FrameType::Audio => {
1586                    // Skip audio after unpause to avoid audio playing while video frozen
1587                    // But keep audio during lag recovery (glitches worse than brief desync)
1588                    if self.skip_audio_until_keyframe {
1589                        return Ok(());
1590                    }
1591                }
1592                FrameType::Metadata => {
1593                    // Always forward metadata
1594                }
1595            }
1596        }
1597
1598        // Send the frame based on type
1599        match frame.frame_type {
1600            FrameType::Video => {
1601                self.send_video(stream_id, frame.timestamp, frame.data)
1602                    .await?;
1603            }
1604            FrameType::Audio => {
1605                self.send_audio(stream_id, frame.timestamp, frame.data)
1606                    .await?;
1607            }
1608            FrameType::Metadata => {
1609                // Send metadata as data message
1610                self.send_metadata_frame(stream_id, frame.data).await?;
1611            }
1612        }
1613
1614        // Periodically flush to ensure data is sent
1615        // The flush happens less frequently when sending many frames
1616        self.writer.flush().await?;
1617
1618        Ok(())
1619    }
1620
1621    /// Send metadata frame to subscriber
1622    async fn send_metadata_frame(&mut self, stream_id: u32, data: Bytes) -> Result<()> {
1623        // Metadata is sent as a data message with onMetaData
1624        let data_msg = DataMessage {
1625            name: CMD_ON_METADATA.to_string(),
1626            values: vec![AmfValue::String("onMetaData".to_string())],
1627            stream_id,
1628        };
1629
1630        let (msg_type, payload) = RtmpMessage::Data(data_msg).encode();
1631
1632        // If the original data is available, use it directly
1633        // Otherwise, encode the metadata
1634        let chunk = RtmpChunk {
1635            csid: CSID_COMMAND,
1636            timestamp: 0,
1637            message_type: msg_type,
1638            stream_id,
1639            payload: if data.is_empty() { payload } else { data },
1640        };
1641
1642        self.write_buf.clear();
1643        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1644        self.writer.write_all(&self.write_buf).await?;
1645
1646        Ok(())
1647    }
1648}
1649
1650use bytes::Buf;