rtmp_rs/server/
connection.rs

1//! Per-connection RTMP handler
2//!
3//! Manages the lifecycle of a single RTMP connection:
4//! 1. Handshake
5//! 2. Connect command
6//! 3. Stream commands (publish/play)
7//! 4. Media handling
8//! 5. Disconnect
9
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use bytes::{Bytes, BytesMut};
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::broadcast;
18use tokio::time::timeout;
19
20use crate::registry::{BroadcastFrame, FrameType, StreamKey, StreamRegistry};
21
22use crate::amf::AmfValue;
23use crate::error::{Error, ProtocolError, Result};
24use crate::media::flv::FlvTag;
25use crate::media::{AacData, H264Data};
26use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
27use crate::protocol::constants::*;
28use crate::protocol::handshake::{Handshake, HandshakeRole};
29use crate::protocol::message::{
30    Command, ConnectParams, DataMessage, PlayParams, PublishParams, RtmpMessage, UserControlEvent,
31};
32use crate::protocol::quirks::EncoderType;
33use crate::server::config::ServerConfig;
34use crate::server::handler::{AuthResult, MediaDeliveryMode, RtmpHandler};
35use crate::session::context::{SessionContext, StreamContext};
36use crate::session::state::SessionState;
37
38/// Subscriber state for backpressure handling
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40enum SubscriberState {
41    /// Normal operation
42    Normal,
43    /// Skipping frames until next keyframe (due to lag)
44    SkippingToKeyframe,
45}
46
47/// Per-connection handler
48pub struct Connection<H: RtmpHandler> {
49    /// Session state
50    state: SessionState,
51
52    /// Session context for callbacks
53    context: SessionContext,
54
55    /// TCP stream (buffered)
56    reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
57    writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,
58
59    /// Read buffer
60    read_buf: BytesMut,
61
62    /// Chunk decoder/encoder
63    chunk_decoder: ChunkDecoder,
64    chunk_encoder: ChunkEncoder,
65
66    /// Write buffer for outgoing chunks
67    write_buf: BytesMut,
68
69    /// Server configuration
70    config: ServerConfig,
71
72    /// Application handler
73    handler: Arc<H>,
74
75    /// Stream registry for pub/sub routing
76    registry: Arc<StreamRegistry>,
77
78    /// Pending FC commands (stream key -> transaction ID)
79    pending_fc: HashMap<String, f64>,
80
81    /// Stream key we are publishing to (if any)
82    publishing_to: Option<StreamKey>,
83
84    /// Stream key we are subscribed to (if any)
85    subscribed_to: Option<StreamKey>,
86
87    last_audio_ts: Option<u32>,
88
89    last_video_ts: Option<u32>,
90
91    /// Broadcast receiver for subscriber mode
92    frame_rx: Option<broadcast::Receiver<BroadcastFrame>>,
93
94    /// Subscriber state for backpressure handling
95    subscriber_state: SubscriberState,
96
97    /// Count of consecutive lag events (for disconnection threshold)
98    consecutive_lag_count: u32,
99
100    /// RTMP stream ID we're using for playback (subscriber mode)
101    playback_stream_id: Option<u32>,
102
103    /// Whether the subscriber is paused
104    is_paused: bool,
105
106    /// Frames dropped while paused (for logging)
107    frames_dropped_while_paused: u64,
108
109    /// Skip audio until keyframe (set on unpause, cleared on keyframe)
110    /// This prevents the jarring experience of audio playing while video is frozen
111    skip_audio_until_keyframe: bool,
112}
113
114impl<H: RtmpHandler> Connection<H> {
115    /// Create a new connection handler
116    pub fn new(
117        session_id: u64,
118        socket: TcpStream,
119        peer_addr: SocketAddr,
120        config: ServerConfig,
121        handler: Arc<H>,
122        registry: Arc<StreamRegistry>,
123    ) -> Self {
124        let (read_half, write_half) = tokio::io::split(socket);
125
126        Self {
127            state: SessionState::new(session_id, peer_addr),
128            context: SessionContext::new(session_id, peer_addr),
129            reader: BufReader::with_capacity(config.read_buffer_size, read_half),
130            writer: BufWriter::with_capacity(config.write_buffer_size, write_half),
131            read_buf: BytesMut::with_capacity(config.read_buffer_size),
132            chunk_decoder: ChunkDecoder::new(),
133            chunk_encoder: ChunkEncoder::new(),
134            write_buf: BytesMut::with_capacity(config.write_buffer_size),
135            config,
136            handler,
137            registry,
138            pending_fc: HashMap::new(),
139            publishing_to: None,
140            subscribed_to: None,
141            last_audio_ts: None,
142            last_video_ts: None,
143            frame_rx: None,
144            subscriber_state: SubscriberState::Normal,
145            consecutive_lag_count: 0,
146            playback_stream_id: None,
147            is_paused: false,
148            frames_dropped_while_paused: 0,
149            skip_audio_until_keyframe: false,
150        }
151    }
152
153    /// Run the connection
154    pub async fn run(&mut self) -> Result<()> {
155        // Check if handler allows connection
156        if !self.handler.on_connection(&self.context).await {
157            return Err(Error::Rejected("Connection rejected by handler".into()));
158        }
159
160        // Perform handshake
161        self.do_handshake().await?;
162        self.handler.on_handshake_complete(&self.context).await;
163
164        // Set our chunk size
165        tracing::debug!(
166            session_id = self.state.id,
167            chunk_size = self.config.chunk_size,
168            "Sending set chunk size"
169        );
170        self.send_set_chunk_size(self.config.chunk_size).await?;
171        self.chunk_encoder.set_chunk_size(self.config.chunk_size);
172
173        tracing::debug!(session_id = self.state.id, "Entering main message loop");
174
175        // Main message loop
176        let idle_timeout = self.config.idle_timeout;
177        let result = loop {
178            // Handle subscriber mode: take frame_rx out to avoid borrow conflicts
179            let mut frame_rx = self.frame_rx.take();
180
181            // Use select! to handle both TCP input and broadcast frames
182            let loop_result = if let Some(ref mut rx) = frame_rx {
183                // Subscriber mode: listen for both TCP and broadcast frames
184                tokio::select! {
185                    biased;
186
187                    // Receive broadcast frames for subscribers (higher priority)
188                    frame_result = rx.recv() => {
189                        match frame_result {
190                            Ok(frame) => {
191                                // Put receiver back before processing
192                                self.frame_rx = frame_rx;
193                                // Reset lag count on successful receive
194                                self.consecutive_lag_count = 0;
195                                if let Err(e) = self.send_broadcast_frame(frame).await {
196                                    tracing::debug!(error = %e, "Failed to send frame");
197                                    Err(e)
198                                } else {
199                                    Ok(true)
200                                }
201                            }
202                            Err(broadcast::error::RecvError::Lagged(n)) => {
203                                self.frame_rx = frame_rx;
204                                self.handle_lag(n).await.map(|_| true)
205                            }
206                            Err(broadcast::error::RecvError::Closed) => {
207                                self.frame_rx = frame_rx;
208                                // Publisher ended, notify subscriber
209                                if let Err(e) = self.handle_stream_ended().await {
210                                    tracing::debug!(error = %e, "Error handling stream end");
211                                }
212                                Ok(false) // Signal to exit loop
213                            }
214                        }
215                    }
216
217                    // Read from TCP
218                    result = timeout(idle_timeout, self.read_and_process()) => {
219                        self.frame_rx = frame_rx;
220                        match result {
221                            Ok(Ok(continue_loop)) => Ok(continue_loop),
222                            Ok(Err(e)) => {
223                                tracing::debug!(error = %e, "Processing error");
224                                Err(e)
225                            }
226                            Err(_) => {
227                                tracing::debug!("Idle timeout");
228                                Ok(false)
229                            }
230                        }
231                    }
232                }
233            } else {
234                // Publisher mode: only listen for TCP
235                self.frame_rx = frame_rx;
236                match timeout(idle_timeout, self.read_and_process()).await {
237                    Ok(Ok(continue_loop)) => Ok(continue_loop),
238                    Ok(Err(e)) => {
239                        tracing::debug!(error = %e, "Processing error");
240                        Err(e)
241                    }
242                    Err(_) => {
243                        tracing::debug!("Idle timeout");
244                        Ok(false)
245                    }
246                }
247            };
248
249            match loop_result {
250                Ok(true) => continue,
251                Ok(false) => break Ok(()),
252                Err(e) => break Err(e),
253            }
254        };
255
256        // Cleanup: unregister publisher or unsubscribe
257        self.cleanup_on_disconnect().await;
258
259        // Notify handler of disconnect
260        self.handler.on_disconnect(&self.context).await;
261
262        result
263    }
264
265    /// Cleanup when connection disconnects
266    async fn cleanup_on_disconnect(&mut self) {
267        // Unregister as publisher if we were publishing
268        if let Some(ref key) = self.publishing_to {
269            self.registry.unregister_publisher(key, self.state.id).await;
270            tracing::debug!(
271                session_id = self.state.id,
272                stream = %key,
273                "Unregistered publisher on disconnect"
274            );
275        }
276
277        // Unsubscribe if we were subscribed
278        if let Some(ref key) = self.subscribed_to {
279            self.registry.unsubscribe(key).await;
280            tracing::debug!(
281                session_id = self.state.id,
282                stream = %key,
283                "Unsubscribed on disconnect"
284            );
285        }
286    }
287
288    /// Handle lag event from broadcast channel
289    async fn handle_lag(&mut self, skipped: u64) -> Result<()> {
290        let config = self.registry.config();
291
292        self.consecutive_lag_count += 1;
293
294        if skipped < config.lag_threshold_low {
295            // Minor lag, continue normally
296            tracing::debug!(
297                session_id = self.state.id,
298                skipped = skipped,
299                "Minor broadcast lag, continuing"
300            );
301            return Ok(());
302        }
303
304        // Significant lag - enter skip mode
305        if self.subscriber_state != SubscriberState::SkippingToKeyframe {
306            self.subscriber_state = SubscriberState::SkippingToKeyframe;
307            tracing::warn!(
308                session_id = self.state.id,
309                skipped = skipped,
310                "Subscriber lagging, skipping to next keyframe"
311            );
312        }
313
314        // Check if we should disconnect slow subscriber
315        if self.consecutive_lag_count >= config.max_consecutive_lag_events {
316            tracing::warn!(
317                session_id = self.state.id,
318                consecutive_lags = self.consecutive_lag_count,
319                "Disconnecting slow subscriber"
320            );
321            return Err(Error::Rejected("Subscriber too slow".into()));
322        }
323
324        Ok(())
325    }
326
327    /// Handle stream ended (publisher closed broadcast channel)
328    async fn handle_stream_ended(&mut self) -> Result<()> {
329        // Reset pause state on stream end
330        if self.is_paused {
331            self.is_paused = false;
332        }
333
334        if let Some(stream_id) = self.playback_stream_id {
335            // Send StreamEOF
336            self.send_user_control(UserControlEvent::StreamEof(stream_id))
337                .await?;
338
339            // Send onStatus
340            let status = Command::on_status(stream_id, "status", NS_PLAY_STOP, "Stream ended");
341            self.send_command(CSID_COMMAND, stream_id, &status).await?;
342
343            tracing::info!(
344                session_id = self.state.id,
345                stream_id = stream_id,
346                "Stream ended, notified subscriber"
347            );
348        }
349
350        Ok(())
351    }
352
353    /// Perform RTMP handshake
354    async fn do_handshake(&mut self) -> Result<()> {
355        let mut handshake = Handshake::new(HandshakeRole::Server);
356
357        // Move to waiting state
358        handshake.generate_initial();
359        self.state.start_handshake();
360
361        // Wait for C0C1
362        let connection_timeout = self.config.connection_timeout;
363        timeout(connection_timeout, async {
364            loop {
365                // Read more data
366                let bytes_needed = handshake.bytes_needed();
367                if bytes_needed > 0 && self.read_buf.len() < bytes_needed {
368                    let n = self.reader.read_buf(&mut self.read_buf).await?;
369                    if n == 0 {
370                        return Err(Error::ConnectionClosed);
371                    }
372                }
373
374                // Process handshake
375                let mut buf = Bytes::copy_from_slice(&self.read_buf);
376                let response = handshake.process(&mut buf)?;
377
378                // Always consume processed bytes (even if no response to send)
379                let consumed = self.read_buf.len() - buf.len();
380                if consumed > 0 {
381                    self.read_buf.advance(consumed);
382                }
383
384                // Send response if any (S0S1S2 for C0C1, nothing for C2)
385                if let Some(data) = response {
386                    self.writer.write_all(&data).await?;
387                    self.writer.flush().await?;
388                }
389
390                if handshake.is_done() {
391                    break;
392                }
393            }
394
395            Ok::<_, Error>(())
396        })
397        .await
398        .map_err(|_| Error::Timeout)??;
399
400        self.state.complete_handshake();
401        tracing::debug!(
402            session_id = self.state.id,
403            remaining_buf = self.read_buf.len(),
404            "Handshake complete"
405        );
406
407        // Debug: dump first 32 bytes of remaining buffer
408        if !self.read_buf.is_empty() {
409            let dump_len = self.read_buf.len().min(32);
410            let hex: String = self.read_buf[..dump_len]
411                .iter()
412                .map(|b| format!("{:02x}", b))
413                .collect::<Vec<_>>()
414                .join(" ");
415            tracing::debug!(
416                session_id = self.state.id,
417                first_bytes = %hex,
418                "Buffer after handshake"
419            );
420        }
421
422        Ok(())
423    }
424
425    /// Read data and process messages
426    async fn read_and_process(&mut self) -> Result<bool> {
427        tracing::trace!(
428            session_id = self.state.id,
429            buf_len = self.read_buf.len(),
430            "read_and_process called"
431        );
432
433        // Keep trying to decode until we need more data
434        // This is important for multi-chunk messages where multiple chunks
435        // may be in the buffer but only the last one completes the message
436        loop {
437            let buf_len_before = self.read_buf.len();
438
439            // Try to decode a complete message
440            if let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
441                tracing::trace!(
442                    session_id = self.state.id,
443                    csid = chunk.csid,
444                    msg_type = chunk.message_type,
445                    "Decoded chunk from buffer"
446                );
447                self.handle_chunk(chunk).await?;
448                // Continue to try decoding more messages
449                continue;
450            }
451
452            // decode() returned None - check if it consumed any data
453            // If it did, there's a partial chunk being assembled, keep trying
454            let buf_len_after = self.read_buf.len();
455            if buf_len_after < buf_len_before {
456                // Some data was consumed (partial chunk), try again
457                continue;
458            }
459
460            // No progress - either buffer is empty or we need more data
461            break;
462        }
463
464        tracing::trace!(
465            session_id = self.state.id,
466            buf_len = self.read_buf.len(),
467            "Waiting for more data"
468        );
469
470        // Wait for more data from the socket
471        let n = self.reader.read_buf(&mut self.read_buf).await?;
472        if n == 0 {
473            return Ok(false); // Connection closed
474        }
475
476        tracing::trace!(
477            session_id = self.state.id,
478            bytes_read = n,
479            buf_len = self.read_buf.len(),
480            "Read data from socket"
481        );
482
483        let needs_ack = self.state.add_bytes_received(n as u64);
484
485        // Process any complete messages with the new data
486        while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
487            tracing::trace!(
488                session_id = self.state.id,
489                csid = chunk.csid,
490                msg_type = chunk.message_type,
491                "Decoded chunk after read"
492            );
493            self.handle_chunk(chunk).await?;
494        }
495
496        // Send acknowledgement if needed
497        if needs_ack {
498            self.send_acknowledgement().await?;
499        }
500
501        Ok(true)
502    }
503
504    /// Handle a decoded chunk
505    async fn handle_chunk(&mut self, chunk: RtmpChunk) -> Result<()> {
506        let message = RtmpMessage::from_chunk(&chunk)?;
507
508        match message {
509            RtmpMessage::SetChunkSize(size) => {
510                tracing::debug!(size = size, "Peer set chunk size");
511                self.chunk_decoder.set_chunk_size(size);
512                self.state.in_chunk_size = size;
513            }
514
515            RtmpMessage::Abort { csid } => {
516                self.chunk_decoder.abort(csid);
517            }
518
519            RtmpMessage::Acknowledgement { sequence: _ } => {
520                // Peer acknowledged bytes - we can track this for flow control
521            }
522
523            RtmpMessage::WindowAckSize(size) => {
524                self.state.window_ack_size = size;
525            }
526
527            RtmpMessage::SetPeerBandwidth {
528                size,
529                limit_type: _,
530            } => {
531                // Send window ack size back
532                self.send_window_ack_size(size).await?;
533            }
534
535            RtmpMessage::UserControl(event) => {
536                self.handle_user_control(event).await?;
537            }
538
539            RtmpMessage::Command(cmd) | RtmpMessage::CommandAmf3(cmd) => {
540                self.handle_command(cmd).await?;
541            }
542
543            RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
544                self.handle_data(data).await?;
545            }
546
547            RtmpMessage::Audio { timestamp, data } => {
548                self.handle_audio(timestamp, data).await?;
549            }
550
551            RtmpMessage::Video { timestamp, data } => {
552                self.handle_video(timestamp, data).await?;
553            }
554
555            _ => {
556                tracing::trace!(message = ?message, "Unhandled message");
557            }
558        }
559
560        Ok(())
561    }
562
563    /// Handle user control event
564    async fn handle_user_control(&mut self, event: UserControlEvent) -> Result<()> {
565        match event {
566            UserControlEvent::PingRequest(timestamp) => {
567                self.send_ping_response(timestamp).await?;
568            }
569            UserControlEvent::SetBufferLength {
570                stream_id: _,
571                buffer_ms: _,
572            } => {
573                // Client's buffer length - we can use this for flow control
574            }
575            _ => {}
576        }
577        Ok(())
578    }
579
580    /// Handle command message
581    async fn handle_command(&mut self, cmd: Command) -> Result<()> {
582        tracing::debug!(
583            session_id = self.state.id,
584            command = cmd.name,
585            transaction_id = cmd.transaction_id,
586            stream_id = cmd.stream_id,
587            args = ?cmd.arguments,
588            "Received command"
589        );
590        match cmd.name.as_str() {
591            CMD_CONNECT => self.handle_connect(cmd).await?,
592            CMD_CREATE_STREAM => self.handle_create_stream(cmd).await?,
593            CMD_DELETE_STREAM => self.handle_delete_stream(cmd).await?,
594            CMD_PUBLISH => self.handle_publish(cmd).await?,
595            CMD_PLAY => self.handle_play(cmd).await?,
596            CMD_FC_PUBLISH => self.handle_fc_publish(cmd).await?,
597            CMD_FC_UNPUBLISH => self.handle_fc_unpublish(cmd).await?,
598            CMD_RELEASE_STREAM => self.handle_release_stream(cmd).await?,
599            CMD_PAUSE => self.handle_pause(cmd).await?,
600            CMD_CLOSE | "closeStream" => self.handle_close_stream(cmd).await?,
601            _ => {
602                tracing::debug!(command = cmd.name, "Unknown command");
603            }
604        }
605        Ok(())
606    }
607
608    /// Handle connect command
609    async fn handle_connect(&mut self, cmd: Command) -> Result<()> {
610        let params = ConnectParams::from_amf(&cmd.command_object);
611        let encoder_type = params
612            .flash_ver
613            .as_deref()
614            .map(EncoderType::from_flash_ver)
615            .unwrap_or(EncoderType::Unknown);
616
617        // Call handler
618        let result = self.handler.on_connect(&self.context, &params).await;
619
620        match result {
621            AuthResult::Accept => {
622                self.state.on_connect(params.clone(), encoder_type);
623                self.context.with_connect(params, encoder_type);
624
625                // Send window ack size
626                self.send_window_ack_size(self.config.window_ack_size)
627                    .await?;
628
629                // Send peer bandwidth
630                self.send_peer_bandwidth(self.config.peer_bandwidth).await?;
631
632                // Send stream begin
633                self.send_user_control(UserControlEvent::StreamBegin(0))
634                    .await?;
635
636                // Send connect result
637                self.send_connect_result(cmd.transaction_id).await?;
638
639                tracing::info!(
640                    session_id = self.state.id,
641                    app = self.context.app,
642                    "Connected"
643                );
644            }
645            AuthResult::Reject(reason) => {
646                self.send_connect_error(cmd.transaction_id, &reason).await?;
647                return Err(Error::Rejected(reason));
648            }
649            AuthResult::Redirect { url } => {
650                self.send_connect_redirect(cmd.transaction_id, &url).await?;
651                return Err(Error::Rejected(format!("Redirected to {}", url)));
652            }
653        }
654
655        Ok(())
656    }
657
658    /// Handle createStream command
659    async fn handle_create_stream(&mut self, cmd: Command) -> Result<()> {
660        let stream_id = self.state.allocate_stream_id();
661
662        let result = Command::result(
663            cmd.transaction_id,
664            AmfValue::Null,
665            AmfValue::Number(stream_id as f64),
666        );
667
668        self.send_command(CSID_COMMAND, 0, &result).await?;
669
670        tracing::debug!(stream_id = stream_id, "Stream created");
671        Ok(())
672    }
673
674    /// Handle deleteStream command
675    async fn handle_delete_stream(&mut self, cmd: Command) -> Result<()> {
676        let stream_id = cmd
677            .arguments
678            .first()
679            .and_then(|v| v.as_number())
680            .unwrap_or(0.0) as u32;
681
682        if let Some(stream) = self.state.remove_stream(stream_id) {
683            if stream.is_publishing() {
684                let stream_ctx = StreamContext::new(
685                    self.context.clone(),
686                    stream_id,
687                    stream.stream_key.unwrap_or_default(),
688                    true,
689                );
690                #[allow(deprecated)]
691                self.handler.on_publish_stop(&stream_ctx).await;
692                self.handler.on_unpublish(&stream_ctx).await;
693            }
694        }
695
696        Ok(())
697    }
698
699    /// Handle FCPublish command (OBS/Twitch compatibility)
700    async fn handle_fc_publish(&mut self, cmd: Command) -> Result<()> {
701        let stream_key = cmd
702            .arguments
703            .first()
704            .and_then(|v| v.as_str())
705            .unwrap_or("")
706            .to_string();
707
708        let result = self.handler.on_fc_publish(&self.context, &stream_key).await;
709
710        match result {
711            AuthResult::Accept => {
712                // Store for later publish command
713                self.pending_fc
714                    .insert(stream_key.clone(), cmd.transaction_id);
715
716                // Send onFCPublish response
717                let response = Command {
718                    name: CMD_ON_FC_PUBLISH.to_string(),
719                    transaction_id: 0.0,
720                    command_object: AmfValue::Null,
721                    arguments: vec![],
722                    stream_id: 0,
723                };
724                self.send_command(CSID_COMMAND, 0, &response).await?;
725            }
726            AuthResult::Reject(reason) => {
727                return Err(Error::Rejected(reason));
728            }
729            AuthResult::Redirect { url } => {
730                return Err(Error::Rejected(format!("Redirected to {}", url)));
731            }
732        }
733
734        Ok(())
735    }
736
737    /// Handle FCUnpublish command
738    async fn handle_fc_unpublish(&mut self, cmd: Command) -> Result<()> {
739        let stream_key = cmd.arguments.first().and_then(|v| v.as_str()).unwrap_or("");
740
741        self.pending_fc.remove(stream_key);
742
743        // Send onFCUnpublish response
744        let response = Command {
745            name: CMD_ON_FC_UNPUBLISH.to_string(),
746            transaction_id: 0.0,
747            command_object: AmfValue::Null,
748            arguments: vec![],
749            stream_id: 0,
750        };
751        self.send_command(CSID_COMMAND, 0, &response).await?;
752
753        Ok(())
754    }
755
756    /// Handle releaseStream command
757    async fn handle_release_stream(&mut self, _cmd: Command) -> Result<()> {
758        // No response needed, this is just cleanup notification
759        Ok(())
760    }
761
762    /// Handle publish command
763    async fn handle_publish(&mut self, cmd: Command) -> Result<()> {
764        let stream_key = cmd
765            .arguments
766            .first()
767            .and_then(|v| v.as_str())
768            .unwrap_or("")
769            .to_string();
770
771        let publish_type = cmd
772            .arguments
773            .get(1)
774            .and_then(|v| v.as_str())
775            .unwrap_or("live")
776            .to_string();
777
778        let params = PublishParams {
779            stream_key: stream_key.clone(),
780            publish_type: publish_type.clone(),
781            stream_id: cmd.stream_id,
782        };
783
784        let result = self.handler.on_publish(&self.context, &params).await;
785
786        match result {
787            AuthResult::Accept => {
788                // Create stream key for registry
789                let app = self.context.app.clone();
790                let registry_key = StreamKey::new(&app, &stream_key);
791
792                // Register as publisher in the registry
793                if let Err(e) = self
794                    .registry
795                    .register_publisher(&registry_key, self.state.id)
796                    .await
797                {
798                    tracing::warn!(
799                        session_id = self.state.id,
800                        stream = %registry_key,
801                        error = %e,
802                        "Failed to register publisher"
803                    );
804                    let status = Command::on_status(
805                        cmd.stream_id,
806                        "error",
807                        NS_PUBLISH_BAD_NAME,
808                        &format!("Stream already publishing: {}", e),
809                    );
810                    self.send_command(CSID_COMMAND, cmd.stream_id, &status)
811                        .await?;
812                    return Err(Error::Rejected(format!("Stream already publishing: {}", e)));
813                }
814
815                // Track that we're publishing to this stream
816                self.publishing_to = Some(registry_key);
817
818                // Update stream state
819                if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
820                    stream.start_publish(stream_key.clone(), publish_type);
821                }
822
823                // Send StreamBegin
824                self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
825                    .await?;
826
827                // Send onStatus
828                let status = Command::on_status(
829                    cmd.stream_id,
830                    "status",
831                    NS_PUBLISH_START,
832                    &format!("{} is now published", stream_key),
833                );
834                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
835                    .await?;
836
837                tracing::info!(
838                    session_id = self.state.id,
839                    stream_key = stream_key,
840                    "Publishing started"
841                );
842            }
843            AuthResult::Reject(reason) => {
844                let status =
845                    Command::on_status(cmd.stream_id, "error", NS_PUBLISH_BAD_NAME, &reason);
846                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
847                    .await?;
848                return Err(Error::Rejected(reason));
849            }
850            AuthResult::Redirect { url } => {
851                return Err(Error::Rejected(format!("Redirected to {}", url)));
852            }
853        }
854
855        Ok(())
856    }
857
858    /// Handle play command
859    async fn handle_play(&mut self, cmd: Command) -> Result<()> {
860        let stream_name = cmd
861            .arguments
862            .first()
863            .and_then(|v| v.as_str())
864            .unwrap_or("")
865            .to_string();
866
867        let start = cmd
868            .arguments
869            .get(1)
870            .and_then(|v| v.as_number())
871            .unwrap_or(-2.0);
872
873        let duration = cmd
874            .arguments
875            .get(2)
876            .and_then(|v| v.as_number())
877            .unwrap_or(-1.0);
878
879        let reset = cmd
880            .arguments
881            .get(3)
882            .and_then(|v| v.as_bool())
883            .unwrap_or(true);
884
885        let params = PlayParams {
886            stream_name: stream_name.clone(),
887            start,
888            duration,
889            reset,
890            stream_id: cmd.stream_id,
891        };
892
893        let result = self.handler.on_play(&self.context, &params).await;
894
895        match result {
896            AuthResult::Accept => {
897                // Create stream key for registry
898                let app = self.context.app.clone();
899                let registry_key = StreamKey::new(&app, &stream_name);
900
901                // Subscribe to the stream in registry
902                let (rx, catchup_frames) = match self.registry.subscribe(&registry_key).await {
903                    Ok(result) => result,
904                    Err(e) => {
905                        tracing::debug!(
906                            session_id = self.state.id,
907                            stream = %registry_key,
908                            error = %e,
909                            "Stream not found for play"
910                        );
911                        let status = Command::on_status(
912                            cmd.stream_id,
913                            "error",
914                            NS_PLAY_STREAM_NOT_FOUND,
915                            &format!("Stream not found: {}", stream_name),
916                        );
917                        self.send_command(CSID_COMMAND, cmd.stream_id, &status)
918                            .await?;
919                        return Ok(());
920                    }
921                };
922
923                // Store subscription info
924                self.subscribed_to = Some(registry_key.clone());
925                self.frame_rx = Some(rx);
926                self.playback_stream_id = Some(cmd.stream_id);
927
928                if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
929                    stream.start_play(stream_name.clone());
930                }
931
932                // Send StreamBegin
933                self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
934                    .await?;
935
936                // Send onStatus Reset
937                if reset {
938                    let status = Command::on_status(
939                        cmd.stream_id,
940                        "status",
941                        NS_PLAY_RESET,
942                        "Playing and resetting",
943                    );
944                    self.send_command(CSID_COMMAND, cmd.stream_id, &status)
945                        .await?;
946                }
947
948                // Send onStatus Start
949                let status = Command::on_status(
950                    cmd.stream_id,
951                    "status",
952                    NS_PLAY_START,
953                    &format!("Started playing {}", stream_name),
954                );
955                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
956                    .await?;
957
958                // Send catchup frames (sequence headers + GOP)
959                tracing::debug!(
960                    session_id = self.state.id,
961                    stream = %registry_key,
962                    catchup_frames = catchup_frames.len(),
963                    "Sending catchup frames"
964                );
965
966                for frame in catchup_frames {
967                    self.send_broadcast_frame(frame).await?;
968                }
969
970                tracing::info!(
971                    session_id = self.state.id,
972                    stream_name = stream_name,
973                    "Playing started"
974                );
975            }
976            AuthResult::Reject(reason) => {
977                let status =
978                    Command::on_status(cmd.stream_id, "error", NS_PLAY_STREAM_NOT_FOUND, &reason);
979                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
980                    .await?;
981            }
982            AuthResult::Redirect { url: _ } => {
983                // Handle redirect
984            }
985        }
986
987        Ok(())
988    }
989
990    /// Handle closeStream command
991    async fn handle_close_stream(&mut self, cmd: Command) -> Result<()> {
992        if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
993            stream.stop();
994        }
995        Ok(())
996    }
997
998    /// Handle pause command from subscriber
999    async fn handle_pause(&mut self, cmd: Command) -> Result<()> {
1000        let stream_id = match self.playback_stream_id {
1001            Some(id) => id,
1002            None => return Ok(()), // Not in play mode
1003        };
1004
1005        // The pause flag is the first argument (true = pause, false = unpause)
1006        let pause_flag = cmd
1007            .arguments
1008            .first()
1009            .and_then(|v| v.as_bool())
1010            .unwrap_or(true);
1011
1012        if pause_flag {
1013            self.do_pause(stream_id).await
1014        } else {
1015            self.do_unpause(stream_id).await
1016        }
1017    }
1018
1019    /// Pause playback for subscriber
1020    async fn do_pause(&mut self, stream_id: u32) -> Result<()> {
1021        if self.is_paused {
1022            return Ok(()); // Already paused
1023        }
1024
1025        self.is_paused = true;
1026        self.frames_dropped_while_paused = 0;
1027
1028        // Send onStatus(NetStream.Pause.Notify)
1029        let status = Command::on_status(stream_id, "status", NS_PAUSE_NOTIFY, "Playback paused");
1030        self.send_command(CSID_COMMAND, stream_id, &status).await?;
1031
1032        // Send StreamDry to indicate no data coming
1033        self.send_user_control(UserControlEvent::StreamDry(stream_id))
1034            .await?;
1035
1036        // Notify handler
1037        let stream_key = self
1038            .subscribed_to
1039            .as_ref()
1040            .map(|k| k.name.clone())
1041            .unwrap_or_default();
1042        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1043        self.handler.on_pause(&stream_ctx).await;
1044
1045        tracing::info!(session_id = self.state.id, "Subscriber paused");
1046        Ok(())
1047    }
1048
1049    /// Unpause playback for subscriber
1050    async fn do_unpause(&mut self, stream_id: u32) -> Result<()> {
1051        if !self.is_paused {
1052            return Ok(()); // Not paused
1053        }
1054
1055        self.is_paused = false;
1056
1057        // Force keyframe sync for clean video resumption
1058        // Also skip audio to avoid hearing audio while video is frozen
1059        self.subscriber_state = SubscriberState::SkippingToKeyframe;
1060        self.skip_audio_until_keyframe = true;
1061
1062        // Send onStatus(NetStream.Unpause.Notify)
1063        let status = Command::on_status(stream_id, "status", NS_UNPAUSE_NOTIFY, "Playback resumed");
1064        self.send_command(CSID_COMMAND, stream_id, &status).await?;
1065
1066        // Send StreamBegin to indicate data resuming
1067        self.send_user_control(UserControlEvent::StreamBegin(stream_id))
1068            .await?;
1069
1070        // Resend sequence headers to reinitialize decoder
1071        // This is critical for clean playback resumption
1072        if let Some(ref key) = self.subscribed_to {
1073            let headers = self.registry.get_sequence_headers(key).await;
1074            tracing::debug!(
1075                session_id = self.state.id,
1076                header_count = headers.len(),
1077                "Resending sequence headers after unpause"
1078            );
1079            for frame in headers {
1080                match frame.frame_type {
1081                    FrameType::Video => {
1082                        self.send_video(stream_id, frame.timestamp, frame.data)
1083                            .await?;
1084                    }
1085                    FrameType::Audio => {
1086                        self.send_audio(stream_id, frame.timestamp, frame.data)
1087                            .await?;
1088                    }
1089                    _ => {}
1090                }
1091            }
1092            self.writer.flush().await?;
1093        }
1094
1095        // Notify handler
1096        let stream_key = self
1097            .subscribed_to
1098            .as_ref()
1099            .map(|k| k.name.clone())
1100            .unwrap_or_default();
1101        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1102        self.handler.on_unpause(&stream_ctx).await;
1103
1104        tracing::info!(
1105            session_id = self.state.id,
1106            frames_dropped = self.frames_dropped_while_paused,
1107            "Subscriber unpaused (waiting for keyframe)"
1108        );
1109        self.frames_dropped_while_paused = 0;
1110        Ok(())
1111    }
1112
1113    /// Handle data message
1114    async fn handle_data(&mut self, data: DataMessage) -> Result<()> {
1115        match data.name.as_str() {
1116            CMD_SET_DATA_FRAME => {
1117                // @setDataFrame usually has "onMetaData" as first value
1118                if let Some(AmfValue::String(name)) = data.values.first() {
1119                    if name == CMD_ON_METADATA {
1120                        self.handle_metadata(data.stream_id, &data.values[1..])
1121                            .await?;
1122                    }
1123                }
1124            }
1125            CMD_ON_METADATA => {
1126                self.handle_metadata(data.stream_id, &data.values).await?;
1127            }
1128            _ => {
1129                tracing::trace!(name = data.name, "Unknown data message");
1130            }
1131        }
1132        Ok(())
1133    }
1134
1135    /// Handle metadata
1136    async fn handle_metadata(&mut self, stream_id: u32, values: &[AmfValue]) -> Result<()> {
1137        // Extract metadata object
1138        let metadata: HashMap<String, AmfValue> = values
1139            .first()
1140            .and_then(|v| v.as_object().cloned())
1141            .unwrap_or_default();
1142
1143        if let Some(stream) = self.state.get_stream_mut(stream_id) {
1144            stream.on_metadata();
1145        }
1146
1147        if let Some(stream) = self.state.get_stream(stream_id) {
1148            if stream.is_publishing() {
1149                let stream_ctx = StreamContext::new(
1150                    self.context.clone(),
1151                    stream_id,
1152                    stream.stream_key.clone().unwrap_or_default(),
1153                    true,
1154                );
1155                self.handler.on_metadata(&stream_ctx, &metadata).await;
1156            }
1157        }
1158
1159        Ok(())
1160    }
1161
1162    /// Handle audio message
1163    async fn handle_audio(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1164        if data.is_empty() {
1165            return Ok(());
1166        }
1167
1168        if let Some(prev_audio_ts) = self.last_audio_ts {
1169            let timestamp_delta = timestamp - prev_audio_ts;
1170            tracing::trace!(
1171                timestamp = timestamp,
1172                last_audio_ts = prev_audio_ts,
1173                timestamp_delta = timestamp_delta,
1174                "connection handle_audio"
1175            );
1176        }
1177        self.last_audio_ts = Some(timestamp);
1178
1179        // Find publishing stream
1180        let stream_id = self.find_publishing_stream()?;
1181        let stream = self
1182            .state
1183            .get_stream_mut(stream_id)
1184            .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1185
1186        let is_header = data.len() >= 2 && (data[0] >> 4) == 10 && data[1] == 0;
1187        stream.on_audio(timestamp, is_header, data.len());
1188
1189        // Store sequence header
1190        if is_header {
1191            let tag = FlvTag::audio(timestamp, data.clone());
1192            stream.gop_buffer.set_audio_header(tag);
1193        }
1194
1195        // Create stream context for callbacks
1196        let stream_key = stream.stream_key.clone().unwrap_or_default();
1197        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1198
1199        // Deliver based on mode
1200        let mode = self.handler.media_delivery_mode();
1201
1202        if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1203            let tag = FlvTag::audio(timestamp, data.clone());
1204            self.handler.on_media_tag(&stream_ctx, &tag).await;
1205        }
1206
1207        if matches!(
1208            mode,
1209            MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1210        ) {
1211            if data.len() >= 2 && (data[0] >> 4) == 10 {
1212                // AAC
1213                if let Ok(aac_data) = AacData::parse(data.slice(1..)) {
1214                    self.handler
1215                        .on_audio_frame(&stream_ctx, &aac_data, timestamp)
1216                        .await;
1217                }
1218            }
1219        }
1220
1221        // Broadcast to subscribers via registry
1222        if let Some(ref key) = self.publishing_to {
1223            let frame = BroadcastFrame::audio(timestamp, data, is_header);
1224            self.registry.broadcast(key, frame).await;
1225        }
1226
1227        Ok(())
1228    }
1229
1230    /// Handle video message
1231    async fn handle_video(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1232        if data.is_empty() {
1233            return Ok(());
1234        }
1235
1236        if let Some(prev_video_ts) = self.last_video_ts {
1237            let timestamp_delta = timestamp - prev_video_ts;
1238            tracing::trace!(
1239                timestamp = timestamp,
1240                last_video_ts = prev_video_ts,
1241                timestamp_delta = timestamp_delta,
1242                "connection handle_video"
1243            );
1244        }
1245        self.last_video_ts = Some(timestamp);
1246
1247        // Find publishing stream
1248        let stream_id = self.find_publishing_stream()?;
1249        let stream = self
1250            .state
1251            .get_stream_mut(stream_id)
1252            .ok_or_else(|| ProtocolError::StreamNotFound(stream_id))?;
1253
1254        let is_keyframe = (data[0] >> 4) == 1;
1255        let is_header = data.len() >= 2 && (data[0] & 0x0F) == 7 && data[1] == 0;
1256        stream.on_video(timestamp, is_keyframe, is_header, data.len());
1257
1258        // Create FLV tag
1259        let tag = FlvTag::video(timestamp, data.clone());
1260
1261        // Store sequence header
1262        if is_header {
1263            stream.gop_buffer.set_video_header(tag.clone());
1264        } else {
1265            // Add to GOP buffer
1266            stream.gop_buffer.push(tag.clone());
1267        }
1268
1269        // Create stream context for callbacks
1270        let stream_key = stream.stream_key.clone().unwrap_or_default();
1271        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1272
1273        // Notify keyframe
1274        if is_keyframe && !is_header {
1275            self.handler.on_keyframe(&stream_ctx, timestamp).await;
1276        }
1277
1278        // Deliver based on mode
1279        let mode = self.handler.media_delivery_mode();
1280
1281        if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1282            self.handler.on_media_tag(&stream_ctx, &tag).await;
1283        }
1284
1285        if matches!(
1286            mode,
1287            MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1288        ) {
1289            if data.len() >= 2 && (data[0] & 0x0F) == 7 {
1290                // AVC/H.264
1291                if let Ok(h264_data) = H264Data::parse(data.slice(1..)) {
1292                    self.handler
1293                        .on_video_frame(&stream_ctx, &h264_data, timestamp)
1294                        .await;
1295                }
1296            }
1297        }
1298
1299        // Broadcast to subscribers via registry
1300        if let Some(ref key) = self.publishing_to {
1301            let frame = BroadcastFrame::video(timestamp, data, is_keyframe, is_header);
1302            self.registry.broadcast(key, frame).await;
1303        }
1304
1305        Ok(())
1306    }
1307
1308    /// Find the publishing stream (assumes single publish per connection)
1309    fn find_publishing_stream(&self) -> Result<u32> {
1310        for (id, stream) in &self.state.streams {
1311            if stream.is_publishing() {
1312                return Ok(*id);
1313            }
1314        }
1315        Err(ProtocolError::StreamNotFound(0).into())
1316    }
1317
1318    // === Message sending helpers ===
1319
1320    async fn send_command(&mut self, csid: u32, stream_id: u32, cmd: &Command) -> Result<()> {
1321        let (msg_type, payload) = RtmpMessage::Command(cmd.clone()).encode();
1322
1323        let chunk = RtmpChunk {
1324            csid,
1325            timestamp: 0,
1326            message_type: msg_type,
1327            stream_id,
1328            payload,
1329        };
1330
1331        self.write_buf.clear();
1332        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1333        self.writer.write_all(&self.write_buf).await?;
1334        self.writer.flush().await?;
1335
1336        Ok(())
1337    }
1338
1339    async fn send_connect_result(&mut self, transaction_id: f64) -> Result<()> {
1340        let mut props = HashMap::new();
1341        props.insert(
1342            "fmsVer".to_string(),
1343            AmfValue::String("FMS/3,5,7,7009".into()),
1344        );
1345        props.insert("capabilities".to_string(), AmfValue::Number(31.0));
1346        props.insert("mode".to_string(), AmfValue::Number(1.0));
1347
1348        let mut info = HashMap::new();
1349        info.insert("level".to_string(), AmfValue::String("status".into()));
1350        info.insert(
1351            "code".to_string(),
1352            AmfValue::String(NC_CONNECT_SUCCESS.into()),
1353        );
1354        info.insert(
1355            "description".to_string(),
1356            AmfValue::String("Connection succeeded".into()),
1357        );
1358        info.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
1359
1360        let result = Command::result(
1361            transaction_id,
1362            AmfValue::Object(props),
1363            AmfValue::Object(info),
1364        );
1365
1366        self.send_command(CSID_COMMAND, 0, &result).await
1367    }
1368
1369    async fn send_connect_error(&mut self, transaction_id: f64, reason: &str) -> Result<()> {
1370        let mut info = HashMap::new();
1371        info.insert("level".to_string(), AmfValue::String("error".into()));
1372        info.insert(
1373            "code".to_string(),
1374            AmfValue::String(NC_CONNECT_REJECTED.into()),
1375        );
1376        info.insert("description".to_string(), AmfValue::String(reason.into()));
1377
1378        let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1379
1380        self.send_command(CSID_COMMAND, 0, &error).await
1381    }
1382
1383    async fn send_connect_redirect(&mut self, transaction_id: f64, url: &str) -> Result<()> {
1384        let mut info = HashMap::new();
1385        info.insert("level".to_string(), AmfValue::String("error".into()));
1386        info.insert(
1387            "code".to_string(),
1388            AmfValue::String(NC_CONNECT_REJECTED.into()),
1389        );
1390        info.insert(
1391            "description".to_string(),
1392            AmfValue::String("Redirect".into()),
1393        );
1394        info.insert("ex.redirect".to_string(), AmfValue::String(url.into()));
1395
1396        let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1397
1398        self.send_command(CSID_COMMAND, 0, &error).await
1399    }
1400
1401    async fn send_set_chunk_size(&mut self, size: u32) -> Result<()> {
1402        let (msg_type, payload) = RtmpMessage::SetChunkSize(size).encode();
1403
1404        let chunk = RtmpChunk {
1405            csid: CSID_PROTOCOL_CONTROL,
1406            timestamp: 0,
1407            message_type: msg_type,
1408            stream_id: 0,
1409            payload,
1410        };
1411
1412        self.write_buf.clear();
1413        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1414        self.writer.write_all(&self.write_buf).await?;
1415        self.writer.flush().await?;
1416
1417        Ok(())
1418    }
1419
1420    async fn send_window_ack_size(&mut self, size: u32) -> Result<()> {
1421        let (msg_type, payload) = RtmpMessage::WindowAckSize(size).encode();
1422
1423        let chunk = RtmpChunk {
1424            csid: CSID_PROTOCOL_CONTROL,
1425            timestamp: 0,
1426            message_type: msg_type,
1427            stream_id: 0,
1428            payload,
1429        };
1430
1431        self.write_buf.clear();
1432        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1433        self.writer.write_all(&self.write_buf).await?;
1434        self.writer.flush().await?;
1435
1436        Ok(())
1437    }
1438
1439    async fn send_peer_bandwidth(&mut self, size: u32) -> Result<()> {
1440        let (msg_type, payload) = RtmpMessage::SetPeerBandwidth {
1441            size,
1442            limit_type: BANDWIDTH_LIMIT_DYNAMIC,
1443        }
1444        .encode();
1445
1446        let chunk = RtmpChunk {
1447            csid: CSID_PROTOCOL_CONTROL,
1448            timestamp: 0,
1449            message_type: msg_type,
1450            stream_id: 0,
1451            payload,
1452        };
1453
1454        self.write_buf.clear();
1455        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1456        self.writer.write_all(&self.write_buf).await?;
1457        self.writer.flush().await?;
1458
1459        Ok(())
1460    }
1461
1462    async fn send_user_control(&mut self, event: UserControlEvent) -> Result<()> {
1463        let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
1464
1465        let chunk = RtmpChunk {
1466            csid: CSID_PROTOCOL_CONTROL,
1467            timestamp: 0,
1468            message_type: msg_type,
1469            stream_id: 0,
1470            payload,
1471        };
1472
1473        self.write_buf.clear();
1474        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1475        self.writer.write_all(&self.write_buf).await?;
1476        self.writer.flush().await?;
1477
1478        Ok(())
1479    }
1480
1481    async fn send_acknowledgement(&mut self) -> Result<()> {
1482        let sequence = self.state.bytes_received as u32;
1483
1484        let (msg_type, payload) = RtmpMessage::Acknowledgement { sequence }.encode();
1485
1486        let chunk = RtmpChunk {
1487            csid: CSID_PROTOCOL_CONTROL,
1488            timestamp: 0,
1489            message_type: msg_type,
1490            stream_id: 0,
1491            payload,
1492        };
1493
1494        self.write_buf.clear();
1495        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1496        self.writer.write_all(&self.write_buf).await?;
1497        self.writer.flush().await?;
1498
1499        self.state.mark_ack_sent();
1500        Ok(())
1501    }
1502
1503    async fn send_ping_response(&mut self, timestamp: u32) -> Result<()> {
1504        self.send_user_control(UserControlEvent::PingResponse(timestamp))
1505            .await
1506    }
1507
1508    // === Media sending methods for subscriber mode ===
1509
1510    /// Send a video message to the client
1511    async fn send_video(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1512        let (msg_type, payload) = RtmpMessage::Video {
1513            timestamp,
1514            data: data.clone(),
1515        }
1516        .encode();
1517
1518        let chunk = RtmpChunk {
1519            csid: CSID_VIDEO,
1520            timestamp,
1521            message_type: msg_type,
1522            stream_id,
1523            payload,
1524        };
1525
1526        self.write_buf.clear();
1527        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1528        self.writer.write_all(&self.write_buf).await?;
1529        // Don't flush after every frame - batch writes for efficiency
1530        Ok(())
1531    }
1532
1533    /// Send an audio message to the client
1534    async fn send_audio(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1535        let (msg_type, payload) = RtmpMessage::Audio {
1536            timestamp,
1537            data: data.clone(),
1538        }
1539        .encode();
1540
1541        let chunk = RtmpChunk {
1542            csid: CSID_AUDIO,
1543            timestamp,
1544            message_type: msg_type,
1545            stream_id,
1546            payload,
1547        };
1548
1549        self.write_buf.clear();
1550        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1551        self.writer.write_all(&self.write_buf).await?;
1552        // Don't flush after every frame - batch writes for efficiency
1553        Ok(())
1554    }
1555
1556    /// Send a broadcast frame to the subscriber client
1557    ///
1558    /// Handles backpressure by skipping non-keyframes when in skip mode.
1559    /// Also handles pause state by consuming frames without sending.
1560    async fn send_broadcast_frame(&mut self, frame: BroadcastFrame) -> Result<()> {
1561        let stream_id = self.playback_stream_id.unwrap_or(1);
1562
1563        // PAUSE: Consume frame but don't send
1564        if self.is_paused {
1565            self.frames_dropped_while_paused += 1;
1566            tracing::trace!(session_id = self.state.id, "Frame dropped (paused)");
1567            return Ok(());
1568        }
1569
1570        // Backpressure handling: skip non-keyframes if we're lagging
1571        if self.subscriber_state == SubscriberState::SkippingToKeyframe {
1572            match frame.frame_type {
1573                FrameType::Video => {
1574                    if frame.is_keyframe || frame.is_header {
1575                        // Got a keyframe or header, resume normal operation
1576                        self.subscriber_state = SubscriberState::Normal;
1577                        self.skip_audio_until_keyframe = false;
1578                        tracing::debug!(
1579                            session_id = self.state.id,
1580                            "Received keyframe, resuming normal playback"
1581                        );
1582                    } else {
1583                        // Skip non-keyframe video
1584                        return Ok(());
1585                    }
1586                }
1587                FrameType::Audio => {
1588                    // Skip audio after unpause to avoid audio playing while video frozen
1589                    // But keep audio during lag recovery (glitches worse than brief desync)
1590                    if self.skip_audio_until_keyframe {
1591                        return Ok(());
1592                    }
1593                }
1594                FrameType::Metadata => {
1595                    // Always forward metadata
1596                }
1597            }
1598        }
1599
1600        // Send the frame based on type
1601        match frame.frame_type {
1602            FrameType::Video => {
1603                self.send_video(stream_id, frame.timestamp, frame.data)
1604                    .await?;
1605            }
1606            FrameType::Audio => {
1607                self.send_audio(stream_id, frame.timestamp, frame.data)
1608                    .await?;
1609            }
1610            FrameType::Metadata => {
1611                // Send metadata as data message
1612                self.send_metadata_frame(stream_id, frame.data).await?;
1613            }
1614        }
1615
1616        // Periodically flush to ensure data is sent
1617        // The flush happens less frequently when sending many frames
1618        self.writer.flush().await?;
1619
1620        Ok(())
1621    }
1622
1623    /// Send metadata frame to subscriber
1624    async fn send_metadata_frame(&mut self, stream_id: u32, data: Bytes) -> Result<()> {
1625        // Metadata is sent as a data message with onMetaData
1626        let data_msg = DataMessage {
1627            name: CMD_ON_METADATA.to_string(),
1628            values: vec![AmfValue::String("onMetaData".to_string())],
1629            stream_id,
1630        };
1631
1632        let (msg_type, payload) = RtmpMessage::Data(data_msg).encode();
1633
1634        // If the original data is available, use it directly
1635        // Otherwise, encode the metadata
1636        let chunk = RtmpChunk {
1637            csid: CSID_COMMAND,
1638            timestamp: 0,
1639            message_type: msg_type,
1640            stream_id,
1641            payload: if data.is_empty() { payload } else { data },
1642        };
1643
1644        self.write_buf.clear();
1645        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1646        self.writer.write_all(&self.write_buf).await?;
1647
1648        Ok(())
1649    }
1650}
1651
1652use bytes::Buf;