Skip to main content

rtmp_rs/server/
connection.rs

1//! Per-connection RTMP handler
2//!
3//! Manages the lifecycle of a single RTMP connection:
4//! 1. Handshake
5//! 2. Connect command
6//! 3. Stream commands (publish/play)
7//! 4. Media handling
8//! 5. Disconnect
9
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13
14use bytes::{Bytes, BytesMut};
15use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
16use tokio::net::TcpStream;
17use tokio::sync::broadcast;
18use tokio::time::timeout;
19
20use crate::registry::{BroadcastFrame, FrameType, StreamKey, StreamRegistry};
21
22use crate::amf::AmfValue;
23use crate::error::{Error, ProtocolError, Result};
24use crate::media::enhanced_audio::EnhancedAudioData;
25use crate::media::enhanced_video::EnhancedVideoData;
26use crate::media::flv::FlvTag;
27use crate::media::fourcc::{AudioFourCc, VideoFourCc};
28use crate::media::{AacData, H264Data};
29use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
30use crate::protocol::constants::*;
31use crate::protocol::enhanced::EnhancedRtmpMode;
32use crate::protocol::handshake::{Handshake, HandshakeRole};
33use crate::protocol::message::{
34    Command, ConnectParams, ConnectResponseBuilder, DataMessage, PlayParams, PublishParams,
35    RtmpMessage, UserControlEvent,
36};
37use crate::protocol::quirks::EncoderType;
38use crate::server::config::ServerConfig;
39use crate::server::handler::{AuthResult, MediaDeliveryMode, RtmpHandler};
40use crate::session::context::{SessionContext, StreamContext};
41use crate::session::state::SessionState;
42
43/// Detected codec for logging purposes.
44#[derive(Debug, Clone, PartialEq, Eq)]
45enum DetectedCodec {
46    /// Legacy video codec (H.264/AVC)
47    LegacyVideo(u8),
48    /// Enhanced video codec (HEVC, AV1, VP9, etc.)
49    EnhancedVideo(VideoFourCc),
50    /// Legacy audio codec (AAC, MP3, etc.)
51    LegacyAudio(u8),
52    /// Enhanced audio codec (Opus, FLAC, AC-3, etc.)
53    EnhancedAudio(AudioFourCc),
54}
55
56impl DetectedCodec {
57    /// Get a human-friendly name for the codec.
58    fn name(&self) -> &'static str {
59        match self {
60            DetectedCodec::LegacyVideo(id) => match id {
61                7 => "H.264/AVC",
62                4 => "VP6",
63                2 => "H.263",
64                _ => "Unknown Video",
65            },
66            DetectedCodec::EnhancedVideo(fourcc) => match fourcc {
67                VideoFourCc::Avc => "H.264/AVC (E-RTMP)",
68                VideoFourCc::Hevc => "H.265/HEVC",
69                VideoFourCc::Av1 => "AV1",
70                VideoFourCc::Vp9 => "VP9",
71                VideoFourCc::Vp8 => "VP8",
72            },
73            DetectedCodec::LegacyAudio(id) => match id {
74                10 => "AAC",
75                2 => "MP3",
76                11 => "Speex",
77                1 => "ADPCM",
78                0 => "PCM",
79                _ => "Unknown Audio",
80            },
81            DetectedCodec::EnhancedAudio(fourcc) => match fourcc {
82                AudioFourCc::Aac => "AAC (E-RTMP)",
83                AudioFourCc::Opus => "Opus",
84                AudioFourCc::Flac => "FLAC",
85                AudioFourCc::Ac3 => "AC-3",
86                AudioFourCc::Eac3 => "E-AC-3",
87                AudioFourCc::Mp3 => "MP3 (E-RTMP)",
88            },
89        }
90    }
91
92    /// Check if this is an enhanced (E-RTMP) codec.
93    fn is_enhanced(&self) -> bool {
94        matches!(
95            self,
96            DetectedCodec::EnhancedVideo(_) | DetectedCodec::EnhancedAudio(_)
97        )
98    }
99}
100
101/// Subscriber state for backpressure handling
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103enum SubscriberState {
104    /// Normal operation
105    Normal,
106    /// Skipping frames until next keyframe (due to lag)
107    SkippingToKeyframe,
108}
109
110/// Per-connection handler
111pub struct Connection<H: RtmpHandler> {
112    /// Session state
113    state: SessionState,
114
115    /// Session context for callbacks
116    context: SessionContext,
117
118    /// TCP stream (buffered)
119    reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
120    writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,
121
122    /// Read buffer
123    read_buf: BytesMut,
124
125    /// Chunk decoder/encoder
126    chunk_decoder: ChunkDecoder,
127    chunk_encoder: ChunkEncoder,
128
129    /// Write buffer for outgoing chunks
130    write_buf: BytesMut,
131
132    /// Server configuration
133    config: ServerConfig,
134
135    /// Application handler
136    handler: Arc<H>,
137
138    /// Stream registry for pub/sub routing
139    registry: Arc<StreamRegistry>,
140
141    /// Pending FC commands (stream key -> transaction ID)
142    pending_fc: HashMap<String, f64>,
143
144    /// Stream key we are publishing to (if any)
145    publishing_to: Option<StreamKey>,
146
147    /// Stream key we are subscribed to (if any)
148    subscribed_to: Option<StreamKey>,
149
150    last_audio_ts: Option<u32>,
151
152    last_video_ts: Option<u32>,
153
154    /// Detected video codec
155    detected_video_codec: Option<DetectedCodec>,
156
157    /// Detected audio codec
158    detected_audio_codec: Option<DetectedCodec>,
159
160    /// Broadcast receiver for subscriber mode
161    frame_rx: Option<broadcast::Receiver<BroadcastFrame>>,
162
163    /// Subscriber state for backpressure handling
164    subscriber_state: SubscriberState,
165
166    /// Count of consecutive lag events (for disconnection threshold)
167    consecutive_lag_count: u32,
168
169    /// RTMP stream ID we're using for playback (subscriber mode)
170    playback_stream_id: Option<u32>,
171
172    /// Whether the subscriber is paused
173    is_paused: bool,
174
175    /// Frames dropped while paused (for logging)
176    frames_dropped_while_paused: u64,
177
178    /// Skip audio until keyframe (set on unpause, cleared on keyframe)
179    /// This prevents the jarring experience of audio playing while video is frozen
180    skip_audio_until_keyframe: bool,
181}
182
183impl<H: RtmpHandler> Connection<H> {
184    /// Create a new connection handler
185    pub fn new(
186        session_id: u64,
187        socket: TcpStream,
188        peer_addr: SocketAddr,
189        config: ServerConfig,
190        handler: Arc<H>,
191        registry: Arc<StreamRegistry>,
192    ) -> Self {
193        let (read_half, write_half) = tokio::io::split(socket);
194
195        Self {
196            state: SessionState::new(session_id, peer_addr),
197            context: SessionContext::new(session_id, peer_addr),
198            reader: BufReader::with_capacity(config.read_buffer_size, read_half),
199            writer: BufWriter::with_capacity(config.write_buffer_size, write_half),
200            read_buf: BytesMut::with_capacity(config.read_buffer_size),
201            chunk_decoder: ChunkDecoder::new(),
202            chunk_encoder: ChunkEncoder::new(),
203            write_buf: BytesMut::with_capacity(config.write_buffer_size),
204            config,
205            handler,
206            registry,
207            pending_fc: HashMap::new(),
208            publishing_to: None,
209            subscribed_to: None,
210            last_audio_ts: None,
211            last_video_ts: None,
212            detected_video_codec: None,
213            detected_audio_codec: None,
214            frame_rx: None,
215            subscriber_state: SubscriberState::Normal,
216            consecutive_lag_count: 0,
217            playback_stream_id: None,
218            is_paused: false,
219            frames_dropped_while_paused: 0,
220            skip_audio_until_keyframe: false,
221        }
222    }
223
224    /// Run the connection
225    pub async fn run(&mut self) -> Result<()> {
226        // Check if handler allows connection
227        if !self.handler.on_connection(&self.context).await {
228            return Err(Error::Rejected("Connection rejected by handler".into()));
229        }
230
231        // Perform handshake
232        self.do_handshake().await?;
233        self.handler.on_handshake_complete(&self.context).await;
234
235        // Set our chunk size
236        tracing::debug!(
237            session_id = self.state.id,
238            chunk_size = self.config.chunk_size,
239            "Sending set chunk size"
240        );
241        self.send_set_chunk_size(self.config.chunk_size).await?;
242        self.chunk_encoder.set_chunk_size(self.config.chunk_size);
243
244        tracing::debug!(session_id = self.state.id, "Entering main message loop");
245
246        // Main message loop
247        let idle_timeout = self.config.idle_timeout;
248        let result = loop {
249            // Handle subscriber mode: take frame_rx out to avoid borrow conflicts
250            let mut frame_rx = self.frame_rx.take();
251
252            // Use select! to handle both TCP input and broadcast frames
253            let loop_result = if let Some(ref mut rx) = frame_rx {
254                // Subscriber mode: listen for both TCP and broadcast frames
255                tokio::select! {
256                    biased;
257
258                    // Receive broadcast frames for subscribers (higher priority)
259                    frame_result = rx.recv() => {
260                        match frame_result {
261                            Ok(frame) => {
262                                // Put receiver back before processing
263                                self.frame_rx = frame_rx;
264                                // Reset lag count on successful receive
265                                self.consecutive_lag_count = 0;
266                                if let Err(e) = self.send_broadcast_frame(frame).await {
267                                    tracing::debug!(error = %e, "Failed to send frame");
268                                    Err(e)
269                                } else {
270                                    Ok(true)
271                                }
272                            }
273                            Err(broadcast::error::RecvError::Lagged(n)) => {
274                                self.frame_rx = frame_rx;
275                                self.handle_lag(n).await.map(|_| true)
276                            }
277                            Err(broadcast::error::RecvError::Closed) => {
278                                self.frame_rx = frame_rx;
279                                // Publisher ended, notify subscriber
280                                if let Err(e) = self.handle_stream_ended().await {
281                                    tracing::debug!(error = %e, "Error handling stream end");
282                                }
283                                Ok(false) // Signal to exit loop
284                            }
285                        }
286                    }
287
288                    // Read from TCP
289                    result = timeout(idle_timeout, self.read_and_process()) => {
290                        self.frame_rx = frame_rx;
291                        match result {
292                            Ok(Ok(continue_loop)) => Ok(continue_loop),
293                            Ok(Err(e)) => {
294                                tracing::debug!(error = %e, "Processing error");
295                                Err(e)
296                            }
297                            Err(_) => {
298                                tracing::debug!("Idle timeout");
299                                Ok(false)
300                            }
301                        }
302                    }
303                }
304            } else {
305                // Publisher mode: only listen for TCP
306                self.frame_rx = frame_rx;
307                match timeout(idle_timeout, self.read_and_process()).await {
308                    Ok(Ok(continue_loop)) => Ok(continue_loop),
309                    Ok(Err(e)) => {
310                        tracing::debug!(error = %e, "Processing error");
311                        Err(e)
312                    }
313                    Err(_) => {
314                        tracing::debug!("Idle timeout");
315                        Ok(false)
316                    }
317                }
318            };
319
320            match loop_result {
321                Ok(true) => continue,
322                Ok(false) => break Ok(()),
323                Err(e) => break Err(e),
324            }
325        };
326
327        // Cleanup: unregister publisher or unsubscribe
328        self.cleanup_on_disconnect().await;
329
330        // Notify handler of disconnect
331        self.handler.on_disconnect(&self.context).await;
332
333        result
334    }
335
336    /// Cleanup when connection disconnects
337    async fn cleanup_on_disconnect(&mut self) {
338        // Unregister as publisher if we were publishing
339        if let Some(ref key) = self.publishing_to {
340            self.registry.unregister_publisher(key, self.state.id).await;
341            tracing::debug!(
342                session_id = self.state.id,
343                stream = %key,
344                "Unregistered publisher on disconnect"
345            );
346        }
347
348        // Unsubscribe if we were subscribed
349        if let Some(ref key) = self.subscribed_to {
350            self.registry.unsubscribe(key).await;
351            tracing::debug!(
352                session_id = self.state.id,
353                stream = %key,
354                "Unsubscribed on disconnect"
355            );
356        }
357    }
358
359    /// Handle lag event from broadcast channel
360    async fn handle_lag(&mut self, skipped: u64) -> Result<()> {
361        let config = self.registry.config();
362
363        self.consecutive_lag_count += 1;
364
365        if skipped < config.lag_threshold_low {
366            // Minor lag, continue normally
367            tracing::debug!(
368                session_id = self.state.id,
369                skipped = skipped,
370                "Minor broadcast lag, continuing"
371            );
372            return Ok(());
373        }
374
375        // Significant lag - enter skip mode
376        if self.subscriber_state != SubscriberState::SkippingToKeyframe {
377            self.subscriber_state = SubscriberState::SkippingToKeyframe;
378            tracing::warn!(
379                session_id = self.state.id,
380                skipped = skipped,
381                "Subscriber lagging, skipping to next keyframe"
382            );
383        }
384
385        // Check if we should disconnect slow subscriber
386        if self.consecutive_lag_count >= config.max_consecutive_lag_events {
387            tracing::warn!(
388                session_id = self.state.id,
389                consecutive_lags = self.consecutive_lag_count,
390                "Disconnecting slow subscriber"
391            );
392            return Err(Error::Rejected("Subscriber too slow".into()));
393        }
394
395        Ok(())
396    }
397
398    /// Handle stream ended (publisher closed broadcast channel)
399    async fn handle_stream_ended(&mut self) -> Result<()> {
400        // Reset pause state on stream end
401        if self.is_paused {
402            self.is_paused = false;
403        }
404
405        if let Some(stream_id) = self.playback_stream_id {
406            // Send StreamEOF
407            self.send_user_control(UserControlEvent::StreamEof(stream_id))
408                .await?;
409
410            // Send onStatus
411            let status = Command::on_status(stream_id, "status", NS_PLAY_STOP, "Stream ended");
412            self.send_command(CSID_COMMAND, stream_id, &status).await?;
413
414            tracing::info!(
415                session_id = self.state.id,
416                stream_id = stream_id,
417                "Stream ended, notified subscriber"
418            );
419        }
420
421        Ok(())
422    }
423
424    /// Perform RTMP handshake
425    async fn do_handshake(&mut self) -> Result<()> {
426        let mut handshake = Handshake::new(HandshakeRole::Server);
427
428        // Move to waiting state
429        handshake.generate_initial();
430        self.state.start_handshake();
431
432        // Wait for C0C1
433        let connection_timeout = self.config.connection_timeout;
434        timeout(connection_timeout, async {
435            loop {
436                // Read more data
437                let bytes_needed = handshake.bytes_needed();
438                if bytes_needed > 0 && self.read_buf.len() < bytes_needed {
439                    let n = self.reader.read_buf(&mut self.read_buf).await?;
440                    if n == 0 {
441                        return Err(Error::ConnectionClosed);
442                    }
443                }
444
445                // Process handshake
446                let mut buf = Bytes::copy_from_slice(&self.read_buf);
447                let response = handshake.process(&mut buf)?;
448
449                // Always consume processed bytes (even if no response to send)
450                let consumed = self.read_buf.len() - buf.len();
451                if consumed > 0 {
452                    self.read_buf.advance(consumed);
453                }
454
455                // Send response if any (S0S1S2 for C0C1, nothing for C2)
456                if let Some(data) = response {
457                    self.writer.write_all(&data).await?;
458                    self.writer.flush().await?;
459                }
460
461                if handshake.is_done() {
462                    break;
463                }
464            }
465
466            Ok::<_, Error>(())
467        })
468        .await
469        .map_err(|_| Error::Timeout)??;
470
471        self.state.complete_handshake();
472        tracing::debug!(
473            session_id = self.state.id,
474            remaining_buf = self.read_buf.len(),
475            "Handshake complete"
476        );
477
478        // Debug: dump first 32 bytes of remaining buffer
479        if !self.read_buf.is_empty() {
480            let dump_len = self.read_buf.len().min(32);
481            let hex: String = self.read_buf[..dump_len]
482                .iter()
483                .map(|b| format!("{:02x}", b))
484                .collect::<Vec<_>>()
485                .join(" ");
486            tracing::debug!(
487                session_id = self.state.id,
488                first_bytes = %hex,
489                "Buffer after handshake"
490            );
491        }
492
493        Ok(())
494    }
495
496    /// Read data and process messages
497    async fn read_and_process(&mut self) -> Result<bool> {
498        tracing::trace!(
499            session_id = self.state.id,
500            buf_len = self.read_buf.len(),
501            "read_and_process called"
502        );
503
504        // Keep trying to decode until we need more data
505        // This is important for multi-chunk messages where multiple chunks
506        // may be in the buffer but only the last one completes the message
507        loop {
508            let buf_len_before = self.read_buf.len();
509
510            // Try to decode a complete message
511            if let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
512                tracing::trace!(
513                    session_id = self.state.id,
514                    csid = chunk.csid,
515                    msg_type = chunk.message_type,
516                    "Decoded chunk from buffer"
517                );
518                self.handle_chunk(chunk).await?;
519                // Continue to try decoding more messages
520                continue;
521            }
522
523            // decode() returned None - check if it consumed any data
524            // If it did, there's a partial chunk being assembled, keep trying
525            let buf_len_after = self.read_buf.len();
526            if buf_len_after < buf_len_before {
527                // Some data was consumed (partial chunk), try again
528                continue;
529            }
530
531            // No progress - either buffer is empty or we need more data
532            break;
533        }
534
535        tracing::trace!(
536            session_id = self.state.id,
537            buf_len = self.read_buf.len(),
538            "Waiting for more data"
539        );
540
541        // Wait for more data from the socket
542        let n = self.reader.read_buf(&mut self.read_buf).await?;
543        if n == 0 {
544            return Ok(false); // Connection closed
545        }
546
547        tracing::trace!(
548            session_id = self.state.id,
549            bytes_read = n,
550            buf_len = self.read_buf.len(),
551            "Read data from socket"
552        );
553
554        let needs_ack = self.state.add_bytes_received(n as u64);
555
556        // Process any complete messages with the new data
557        while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
558            tracing::trace!(
559                session_id = self.state.id,
560                csid = chunk.csid,
561                msg_type = chunk.message_type,
562                "Decoded chunk after read"
563            );
564            self.handle_chunk(chunk).await?;
565        }
566
567        // Send acknowledgement if needed
568        if needs_ack {
569            self.send_acknowledgement().await?;
570        }
571
572        Ok(true)
573    }
574
575    /// Handle a decoded chunk
576    async fn handle_chunk(&mut self, chunk: RtmpChunk) -> Result<()> {
577        let message = RtmpMessage::from_chunk(&chunk)?;
578
579        match message {
580            RtmpMessage::SetChunkSize(size) => {
581                tracing::debug!(size = size, "Peer set chunk size");
582                self.chunk_decoder.set_chunk_size(size);
583                self.state.in_chunk_size = size;
584            }
585
586            RtmpMessage::Abort { csid } => {
587                self.chunk_decoder.abort(csid);
588            }
589
590            RtmpMessage::Acknowledgement { sequence: _ } => {
591                // Peer acknowledged bytes - we can track this for flow control
592            }
593
594            RtmpMessage::WindowAckSize(size) => {
595                self.state.window_ack_size = size;
596            }
597
598            RtmpMessage::SetPeerBandwidth {
599                size,
600                limit_type: _,
601            } => {
602                // Send window ack size back
603                self.send_window_ack_size(size).await?;
604            }
605
606            RtmpMessage::UserControl(event) => {
607                self.handle_user_control(event).await?;
608            }
609
610            RtmpMessage::Command(cmd) | RtmpMessage::CommandAmf3(cmd) => {
611                self.handle_command(cmd).await?;
612            }
613
614            RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
615                self.handle_data(data).await?;
616            }
617
618            RtmpMessage::Audio { timestamp, data } => {
619                self.handle_audio(timestamp, data).await?;
620            }
621
622            RtmpMessage::Video { timestamp, data } => {
623                self.handle_video(timestamp, data).await?;
624            }
625
626            _ => {
627                tracing::trace!(message = ?message, "Unhandled message");
628            }
629        }
630
631        Ok(())
632    }
633
634    /// Handle user control event
635    async fn handle_user_control(&mut self, event: UserControlEvent) -> Result<()> {
636        match event {
637            UserControlEvent::PingRequest(timestamp) => {
638                self.send_ping_response(timestamp).await?;
639            }
640            UserControlEvent::SetBufferLength {
641                stream_id: _,
642                buffer_ms: _,
643            } => {
644                // Client's buffer length - we can use this for flow control
645            }
646            _ => {}
647        }
648        Ok(())
649    }
650
651    /// Handle command message
652    async fn handle_command(&mut self, cmd: Command) -> Result<()> {
653        tracing::debug!(
654            session_id = self.state.id,
655            command = cmd.name,
656            transaction_id = cmd.transaction_id,
657            stream_id = cmd.stream_id,
658            args = ?cmd.arguments,
659            "Received command"
660        );
661        match cmd.name.as_str() {
662            CMD_CONNECT => self.handle_connect(cmd).await?,
663            CMD_CREATE_STREAM => self.handle_create_stream(cmd).await?,
664            CMD_DELETE_STREAM => self.handle_delete_stream(cmd).await?,
665            CMD_PUBLISH => self.handle_publish(cmd).await?,
666            CMD_PLAY => self.handle_play(cmd).await?,
667            CMD_FC_PUBLISH => self.handle_fc_publish(cmd).await?,
668            CMD_FC_UNPUBLISH => self.handle_fc_unpublish(cmd).await?,
669            CMD_RELEASE_STREAM => self.handle_release_stream(cmd).await?,
670            CMD_PAUSE => self.handle_pause(cmd).await?,
671            CMD_CLOSE | "closeStream" => self.handle_close_stream(cmd).await?,
672            _ => {
673                tracing::debug!(command = cmd.name, "Unknown command");
674            }
675        }
676        Ok(())
677    }
678
679    /// Handle connect command
680    async fn handle_connect(&mut self, cmd: Command) -> Result<()> {
681        let params = ConnectParams::from_amf(&cmd.command_object);
682        let encoder_type = params
683            .flash_ver
684            .as_deref()
685            .map(EncoderType::from_flash_ver)
686            .unwrap_or(EncoderType::Unknown);
687
688        // E-RTMP capability negotiation
689        let client_has_ertmp = params.has_enhanced_rtmp();
690        let negotiated_caps = self.negotiate_enhanced_rtmp(&params)?;
691
692        // Call handler
693        let result = self.handler.on_connect(&self.context, &params).await;
694
695        match result {
696            AuthResult::Accept => {
697                self.state.on_connect(params.clone(), encoder_type);
698                self.context.with_connect(params, encoder_type);
699
700                // Store negotiated E-RTMP capabilities
701                if let Some(ref caps) = negotiated_caps {
702                    self.context.with_enhanced_capabilities(caps.clone());
703                }
704
705                // Send window ack size
706                self.send_window_ack_size(self.config.window_ack_size)
707                    .await?;
708
709                // Send peer bandwidth
710                self.send_peer_bandwidth(self.config.peer_bandwidth).await?;
711
712                // Send stream begin
713                self.send_user_control(UserControlEvent::StreamBegin(0))
714                    .await?;
715
716                // Send connect result with E-RTMP capabilities if negotiated
717                self.send_connect_result_with_ertmp(cmd.transaction_id, negotiated_caps.as_ref())
718                    .await?;
719
720                tracing::info!(
721                    session_id = self.state.id,
722                    app = self.context.app,
723                    enhanced_rtmp = client_has_ertmp
724                        && negotiated_caps.as_ref().map(|c| c.enabled).unwrap_or(false),
725                    "Connected"
726                );
727            }
728            AuthResult::Reject(reason) => {
729                self.send_connect_error(cmd.transaction_id, &reason).await?;
730                return Err(Error::Rejected(reason));
731            }
732            AuthResult::Redirect { url } => {
733                self.send_connect_redirect(cmd.transaction_id, &url).await?;
734                return Err(Error::Rejected(format!("Redirected to {}", url)));
735            }
736        }
737
738        Ok(())
739    }
740
741    /// Negotiate E-RTMP capabilities with client.
742    ///
743    /// Returns the negotiated capabilities if E-RTMP is active, or None for legacy mode.
744    fn negotiate_enhanced_rtmp(
745        &self,
746        params: &ConnectParams,
747    ) -> Result<Option<crate::protocol::enhanced::EnhancedCapabilities>> {
748        let client_has_ertmp = params.has_enhanced_rtmp();
749
750        match self.config.enhanced_rtmp {
751            EnhancedRtmpMode::LegacyOnly => {
752                // Server configured for legacy only - ignore client E-RTMP fields
753                tracing::debug!(
754                    session_id = self.state.id,
755                    "E-RTMP disabled (LegacyOnly mode)"
756                );
757                Ok(None)
758            }
759            EnhancedRtmpMode::EnhancedOnly => {
760                if !client_has_ertmp {
761                    // Client doesn't support E-RTMP but server requires it
762                    tracing::warn!(
763                        session_id = self.state.id,
764                        "Rejecting legacy client (EnhancedOnly mode)"
765                    );
766                    return Err(Error::Rejected(
767                        "Server requires Enhanced RTMP support".into(),
768                    ));
769                }
770
771                // Negotiate capabilities
772                let server_caps = self.config.enhanced_capabilities.to_enhanced_capabilities();
773                let client_caps = params.to_enhanced_capabilities();
774                let negotiated = server_caps.intersect(&client_caps);
775
776                tracing::debug!(
777                    session_id = self.state.id,
778                    video_codecs = negotiated.video_codecs.len(),
779                    audio_codecs = negotiated.audio_codecs.len(),
780                    caps_ex = negotiated.caps_ex.bits(),
781                    "E-RTMP negotiated (EnhancedOnly mode)"
782                );
783
784                Ok(Some(negotiated))
785            }
786            EnhancedRtmpMode::Auto => {
787                if !client_has_ertmp {
788                    // Client doesn't support E-RTMP, use legacy mode
789                    tracing::debug!(
790                        session_id = self.state.id,
791                        "Using legacy RTMP (client doesn't support E-RTMP). The client may still send enhanced audio/video"
792                    );
793                    return Ok(None);
794                }
795
796                // Negotiate capabilities
797                let server_caps = self.config.enhanced_capabilities.to_enhanced_capabilities();
798                let client_caps = params.to_enhanced_capabilities();
799                let negotiated = server_caps.intersect(&client_caps);
800
801                if !negotiated.enabled {
802                    // No common capabilities, fall back to legacy
803                    tracing::debug!(
804                        session_id = self.state.id,
805                        "Falling back to legacy RTMP (no common E-RTMP capabilities)"
806                    );
807                    return Ok(None);
808                }
809
810                tracing::debug!(
811                    session_id = self.state.id,
812                    video_codecs = negotiated.video_codecs.len(),
813                    audio_codecs = negotiated.audio_codecs.len(),
814                    caps_ex = negotiated.caps_ex.bits(),
815                    "E-RTMP negotiated (Auto mode)"
816                );
817
818                Ok(Some(negotiated))
819            }
820        }
821    }
822
823    /// Handle createStream command
824    async fn handle_create_stream(&mut self, cmd: Command) -> Result<()> {
825        let stream_id = self.state.allocate_stream_id();
826
827        let result = Command::result(
828            cmd.transaction_id,
829            AmfValue::Null,
830            AmfValue::Number(stream_id as f64),
831        );
832
833        self.send_command(CSID_COMMAND, 0, &result).await?;
834
835        tracing::debug!(stream_id = stream_id, "Stream created");
836        Ok(())
837    }
838
839    /// Handle deleteStream command
840    async fn handle_delete_stream(&mut self, cmd: Command) -> Result<()> {
841        let stream_id = cmd
842            .arguments
843            .first()
844            .and_then(|v| v.as_number())
845            .unwrap_or(0.0) as u32;
846
847        if let Some(stream) = self.state.remove_stream(stream_id) {
848            if stream.is_publishing() {
849                let stream_ctx = StreamContext::new(
850                    self.context.clone(),
851                    stream_id,
852                    stream.stream_key.unwrap_or_default(),
853                    true,
854                );
855                #[allow(deprecated)]
856                self.handler.on_publish_stop(&stream_ctx).await;
857                self.handler.on_unpublish(&stream_ctx).await;
858            }
859        }
860
861        Ok(())
862    }
863
864    /// Handle FCPublish command (OBS/Twitch compatibility)
865    async fn handle_fc_publish(&mut self, cmd: Command) -> Result<()> {
866        let stream_key = cmd
867            .arguments
868            .first()
869            .and_then(|v| v.as_str())
870            .unwrap_or("")
871            .to_string();
872
873        let result = self.handler.on_fc_publish(&self.context, &stream_key).await;
874
875        match result {
876            AuthResult::Accept => {
877                // Store for later publish command
878                self.pending_fc
879                    .insert(stream_key.clone(), cmd.transaction_id);
880
881                // Send onFCPublish response
882                let response = Command {
883                    name: CMD_ON_FC_PUBLISH.to_string(),
884                    transaction_id: 0.0,
885                    command_object: AmfValue::Null,
886                    arguments: vec![],
887                    stream_id: 0,
888                };
889                self.send_command(CSID_COMMAND, 0, &response).await?;
890            }
891            AuthResult::Reject(reason) => {
892                return Err(Error::Rejected(reason));
893            }
894            AuthResult::Redirect { url } => {
895                return Err(Error::Rejected(format!("Redirected to {}", url)));
896            }
897        }
898
899        Ok(())
900    }
901
902    /// Handle FCUnpublish command
903    async fn handle_fc_unpublish(&mut self, cmd: Command) -> Result<()> {
904        let stream_key = cmd.arguments.first().and_then(|v| v.as_str()).unwrap_or("");
905
906        self.pending_fc.remove(stream_key);
907
908        // Send onFCUnpublish response
909        let response = Command {
910            name: CMD_ON_FC_UNPUBLISH.to_string(),
911            transaction_id: 0.0,
912            command_object: AmfValue::Null,
913            arguments: vec![],
914            stream_id: 0,
915        };
916        self.send_command(CSID_COMMAND, 0, &response).await?;
917
918        Ok(())
919    }
920
921    /// Handle releaseStream command
922    async fn handle_release_stream(&mut self, _cmd: Command) -> Result<()> {
923        // No response needed, this is just cleanup notification
924        Ok(())
925    }
926
927    /// Handle publish command
928    async fn handle_publish(&mut self, cmd: Command) -> Result<()> {
929        let stream_key = cmd
930            .arguments
931            .first()
932            .and_then(|v| v.as_str())
933            .unwrap_or("")
934            .to_string();
935
936        let publish_type = cmd
937            .arguments
938            .get(1)
939            .and_then(|v| v.as_str())
940            .unwrap_or("live")
941            .to_string();
942
943        let params = PublishParams {
944            stream_key: stream_key.clone(),
945            publish_type: publish_type.clone(),
946            stream_id: cmd.stream_id,
947        };
948
949        let result = self.handler.on_publish(&self.context, &params).await;
950
951        match result {
952            AuthResult::Accept => {
953                // Create stream key for registry
954                let app = self.context.app.clone();
955                let registry_key = StreamKey::new(&app, &stream_key);
956
957                // Register as publisher in the registry
958                if let Err(e) = self
959                    .registry
960                    .register_publisher(&registry_key, self.state.id)
961                    .await
962                {
963                    tracing::warn!(
964                        session_id = self.state.id,
965                        stream = %registry_key,
966                        error = %e,
967                        "Failed to register publisher"
968                    );
969                    let status = Command::on_status(
970                        cmd.stream_id,
971                        "error",
972                        NS_PUBLISH_BAD_NAME,
973                        &format!("Stream already publishing: {}", e),
974                    );
975                    self.send_command(CSID_COMMAND, cmd.stream_id, &status)
976                        .await?;
977                    return Err(Error::Rejected(format!("Stream already publishing: {}", e)));
978                }
979
980                // Track that we're publishing to this stream
981                self.publishing_to = Some(registry_key);
982
983                // Update stream state
984                if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
985                    stream.start_publish(stream_key.clone(), publish_type);
986                }
987
988                // Send StreamBegin
989                self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
990                    .await?;
991
992                // Send onStatus
993                let status = Command::on_status(
994                    cmd.stream_id,
995                    "status",
996                    NS_PUBLISH_START,
997                    &format!("{} is now published", stream_key),
998                );
999                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1000                    .await?;
1001
1002                tracing::info!(
1003                    session_id = self.state.id,
1004                    stream_key = stream_key,
1005                    "Publishing started"
1006                );
1007            }
1008            AuthResult::Reject(reason) => {
1009                let status =
1010                    Command::on_status(cmd.stream_id, "error", NS_PUBLISH_BAD_NAME, &reason);
1011                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1012                    .await?;
1013                return Err(Error::Rejected(reason));
1014            }
1015            AuthResult::Redirect { url } => {
1016                return Err(Error::Rejected(format!("Redirected to {}", url)));
1017            }
1018        }
1019
1020        Ok(())
1021    }
1022
1023    /// Handle play command
1024    async fn handle_play(&mut self, cmd: Command) -> Result<()> {
1025        let stream_name = cmd
1026            .arguments
1027            .first()
1028            .and_then(|v| v.as_str())
1029            .unwrap_or("")
1030            .to_string();
1031
1032        let start = cmd
1033            .arguments
1034            .get(1)
1035            .and_then(|v| v.as_number())
1036            .unwrap_or(-2.0);
1037
1038        let duration = cmd
1039            .arguments
1040            .get(2)
1041            .and_then(|v| v.as_number())
1042            .unwrap_or(-1.0);
1043
1044        let reset = cmd
1045            .arguments
1046            .get(3)
1047            .and_then(|v| v.as_bool())
1048            .unwrap_or(true);
1049
1050        let params = PlayParams {
1051            stream_name: stream_name.clone(),
1052            start,
1053            duration,
1054            reset,
1055            stream_id: cmd.stream_id,
1056        };
1057
1058        let result = self.handler.on_play(&self.context, &params).await;
1059
1060        match result {
1061            AuthResult::Accept => {
1062                // Create stream key for registry
1063                let app = self.context.app.clone();
1064                let registry_key = StreamKey::new(&app, &stream_name);
1065
1066                // Subscribe to the stream in registry
1067                let (rx, catchup_frames) = match self.registry.subscribe(&registry_key).await {
1068                    Ok(result) => result,
1069                    Err(e) => {
1070                        tracing::debug!(
1071                            session_id = self.state.id,
1072                            stream = %registry_key,
1073                            error = %e,
1074                            "Stream not found for play"
1075                        );
1076                        let status = Command::on_status(
1077                            cmd.stream_id,
1078                            "error",
1079                            NS_PLAY_STREAM_NOT_FOUND,
1080                            &format!("Stream not found: {}", stream_name),
1081                        );
1082                        self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1083                            .await?;
1084                        return Ok(());
1085                    }
1086                };
1087
1088                // Store subscription info
1089                self.subscribed_to = Some(registry_key.clone());
1090                self.frame_rx = Some(rx);
1091                self.playback_stream_id = Some(cmd.stream_id);
1092
1093                if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
1094                    stream.start_play(stream_name.clone());
1095                }
1096
1097                // Send StreamBegin
1098                self.send_user_control(UserControlEvent::StreamBegin(cmd.stream_id))
1099                    .await?;
1100
1101                // Send onStatus Reset
1102                if reset {
1103                    let status = Command::on_status(
1104                        cmd.stream_id,
1105                        "status",
1106                        NS_PLAY_RESET,
1107                        "Playing and resetting",
1108                    );
1109                    self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1110                        .await?;
1111                }
1112
1113                // Send onStatus Start
1114                let status = Command::on_status(
1115                    cmd.stream_id,
1116                    "status",
1117                    NS_PLAY_START,
1118                    &format!("Started playing {}", stream_name),
1119                );
1120                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1121                    .await?;
1122
1123                // Send catchup frames (sequence headers + GOP)
1124                tracing::debug!(
1125                    session_id = self.state.id,
1126                    stream = %registry_key,
1127                    catchup_frames = catchup_frames.len(),
1128                    "Sending catchup frames"
1129                );
1130
1131                for frame in catchup_frames {
1132                    self.send_broadcast_frame(frame).await?;
1133                }
1134
1135                tracing::info!(
1136                    session_id = self.state.id,
1137                    stream_name = stream_name,
1138                    "Playing started"
1139                );
1140            }
1141            AuthResult::Reject(reason) => {
1142                let status =
1143                    Command::on_status(cmd.stream_id, "error", NS_PLAY_STREAM_NOT_FOUND, &reason);
1144                self.send_command(CSID_COMMAND, cmd.stream_id, &status)
1145                    .await?;
1146            }
1147            AuthResult::Redirect { url: _ } => {
1148                // Handle redirect
1149            }
1150        }
1151
1152        Ok(())
1153    }
1154
1155    /// Handle closeStream command
1156    async fn handle_close_stream(&mut self, cmd: Command) -> Result<()> {
1157        if let Some(stream) = self.state.get_stream_mut(cmd.stream_id) {
1158            stream.stop();
1159        }
1160        Ok(())
1161    }
1162
1163    /// Handle pause command from subscriber
1164    async fn handle_pause(&mut self, cmd: Command) -> Result<()> {
1165        let stream_id = match self.playback_stream_id {
1166            Some(id) => id,
1167            None => return Ok(()), // Not in play mode
1168        };
1169
1170        // The pause flag is the first argument (true = pause, false = unpause)
1171        let pause_flag = cmd
1172            .arguments
1173            .first()
1174            .and_then(|v| v.as_bool())
1175            .unwrap_or(true);
1176
1177        if pause_flag {
1178            self.do_pause(stream_id).await
1179        } else {
1180            self.do_unpause(stream_id).await
1181        }
1182    }
1183
1184    /// Pause playback for subscriber
1185    async fn do_pause(&mut self, stream_id: u32) -> Result<()> {
1186        if self.is_paused {
1187            return Ok(()); // Already paused
1188        }
1189
1190        self.is_paused = true;
1191        self.frames_dropped_while_paused = 0;
1192
1193        // Send onStatus(NetStream.Pause.Notify)
1194        let status = Command::on_status(stream_id, "status", NS_PAUSE_NOTIFY, "Playback paused");
1195        self.send_command(CSID_COMMAND, stream_id, &status).await?;
1196
1197        // Send StreamDry to indicate no data coming
1198        self.send_user_control(UserControlEvent::StreamDry(stream_id))
1199            .await?;
1200
1201        // Notify handler
1202        let stream_key = self
1203            .subscribed_to
1204            .as_ref()
1205            .map(|k| k.name.clone())
1206            .unwrap_or_default();
1207        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1208        self.handler.on_pause(&stream_ctx).await;
1209
1210        tracing::info!(session_id = self.state.id, "Subscriber paused");
1211        Ok(())
1212    }
1213
1214    /// Unpause playback for subscriber
1215    async fn do_unpause(&mut self, stream_id: u32) -> Result<()> {
1216        if !self.is_paused {
1217            return Ok(()); // Not paused
1218        }
1219
1220        self.is_paused = false;
1221
1222        // Force keyframe sync for clean video resumption
1223        // Also skip audio to avoid hearing audio while video is frozen
1224        self.subscriber_state = SubscriberState::SkippingToKeyframe;
1225        self.skip_audio_until_keyframe = true;
1226
1227        // Send onStatus(NetStream.Unpause.Notify)
1228        let status = Command::on_status(stream_id, "status", NS_UNPAUSE_NOTIFY, "Playback resumed");
1229        self.send_command(CSID_COMMAND, stream_id, &status).await?;
1230
1231        // Send StreamBegin to indicate data resuming
1232        self.send_user_control(UserControlEvent::StreamBegin(stream_id))
1233            .await?;
1234
1235        // Resend sequence headers to reinitialize decoder
1236        // This is critical for clean playback resumption
1237        if let Some(ref key) = self.subscribed_to {
1238            let headers = self.registry.get_sequence_headers(key).await;
1239            tracing::debug!(
1240                session_id = self.state.id,
1241                header_count = headers.len(),
1242                "Resending sequence headers after unpause"
1243            );
1244            for frame in headers {
1245                match frame.frame_type {
1246                    FrameType::Video => {
1247                        self.send_video(stream_id, frame.timestamp, frame.data)
1248                            .await?;
1249                    }
1250                    FrameType::Audio => {
1251                        self.send_audio(stream_id, frame.timestamp, frame.data)
1252                            .await?;
1253                    }
1254                    _ => {}
1255                }
1256            }
1257            self.writer.flush().await?;
1258        }
1259
1260        // Notify handler
1261        let stream_key = self
1262            .subscribed_to
1263            .as_ref()
1264            .map(|k| k.name.clone())
1265            .unwrap_or_default();
1266        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, false);
1267        self.handler.on_unpause(&stream_ctx).await;
1268
1269        tracing::info!(
1270            session_id = self.state.id,
1271            frames_dropped = self.frames_dropped_while_paused,
1272            "Subscriber unpaused (waiting for keyframe)"
1273        );
1274        self.frames_dropped_while_paused = 0;
1275        Ok(())
1276    }
1277
1278    /// Handle data message
1279    async fn handle_data(&mut self, data: DataMessage) -> Result<()> {
1280        match data.name.as_str() {
1281            CMD_SET_DATA_FRAME => {
1282                // @setDataFrame usually has "onMetaData" as first value
1283                if let Some(AmfValue::String(name)) = data.values.first() {
1284                    if name == CMD_ON_METADATA {
1285                        self.handle_metadata(data.stream_id, &data.values[1..])
1286                            .await?;
1287                    }
1288                }
1289            }
1290            CMD_ON_METADATA => {
1291                self.handle_metadata(data.stream_id, &data.values).await?;
1292            }
1293            _ => {
1294                tracing::trace!(name = data.name, "Unknown data message");
1295            }
1296        }
1297        Ok(())
1298    }
1299
1300    /// Handle metadata
1301    async fn handle_metadata(&mut self, stream_id: u32, values: &[AmfValue]) -> Result<()> {
1302        // Extract metadata object
1303        let metadata: HashMap<String, AmfValue> = values
1304            .first()
1305            .and_then(|v| v.as_object().cloned())
1306            .unwrap_or_default();
1307
1308        if let Some(stream) = self.state.get_stream_mut(stream_id) {
1309            stream.on_metadata();
1310        }
1311
1312        if let Some(stream) = self.state.get_stream(stream_id) {
1313            if stream.is_publishing() {
1314                let stream_ctx = StreamContext::new(
1315                    self.context.clone(),
1316                    stream_id,
1317                    stream.stream_key.clone().unwrap_or_default(),
1318                    true,
1319                );
1320                self.handler.on_metadata(&stream_ctx, &metadata).await;
1321            }
1322        }
1323
1324        Ok(())
1325    }
1326
1327    /// Handle audio message
1328    async fn handle_audio(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1329        if data.is_empty() {
1330            return Ok(());
1331        }
1332
1333        if let Some(prev_audio_ts) = self.last_audio_ts {
1334            // Use wrapping_sub to handle timestamp wraparound (RTMP timestamps are 32-bit)
1335            let timestamp_delta = timestamp.wrapping_sub(prev_audio_ts);
1336            tracing::trace!(
1337                timestamp = timestamp,
1338                last_audio_ts = prev_audio_ts,
1339                timestamp_delta = timestamp_delta,
1340                "connection handle_audio"
1341            );
1342        }
1343        self.last_audio_ts = Some(timestamp);
1344
1345        // Detect Enhanced RTMP audio (SoundFormat=9) vs legacy FLV
1346        let is_enhanced = EnhancedAudioData::is_enhanced(data[0]);
1347
1348        // Detect and log codec on first frame or codec change
1349        let detected = if is_enhanced {
1350            // Enhanced audio: FourCC is at bytes 1-4 (after header byte)
1351            if data.len() >= 5 {
1352                AudioFourCc::from_bytes(&data[1..]).map(DetectedCodec::EnhancedAudio)
1353            } else {
1354                None
1355            }
1356        } else {
1357            // Legacy audio: sound format is in upper 4 bits
1358            Some(DetectedCodec::LegacyAudio(data[0] >> 4))
1359        };
1360
1361        if let Some(ref codec) = detected {
1362            if self.detected_audio_codec.as_ref() != Some(codec) {
1363                tracing::info!(
1364                    session_id = self.state.id,
1365                    codec = codec.name(),
1366                    enhanced = codec.is_enhanced(),
1367                    "Audio codec detected"
1368                );
1369                self.detected_audio_codec = Some(codec.clone());
1370            }
1371        }
1372
1373        // Find publishing stream
1374        let stream_id = self.find_publishing_stream()?;
1375        let stream = self
1376            .state
1377            .get_stream_mut(stream_id)
1378            .ok_or(ProtocolError::StreamNotFound(stream_id))?;
1379
1380        let is_header = if is_enhanced {
1381            // Enhanced audio: packet type 0 = SequenceStart
1382            (data[0] & 0x0F) == 0
1383        } else {
1384            // Legacy AAC: SoundFormat=10 (0xA_) and AACPacketType=0
1385            data.len() >= 2 && (data[0] >> 4) == 10 && data[1] == 0
1386        };
1387        stream.on_audio(timestamp, is_header, data.len());
1388
1389        // Store sequence header
1390        if is_header {
1391            let tag = FlvTag::audio(timestamp, data.clone());
1392            stream.gop_buffer.set_audio_header(tag);
1393        }
1394
1395        // Create stream context for callbacks
1396        let stream_key = stream.stream_key.clone().unwrap_or_default();
1397        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1398
1399        // Deliver based on mode
1400        let mode = self.handler.media_delivery_mode();
1401
1402        if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1403            let tag = FlvTag::audio(timestamp, data.clone());
1404            self.handler.on_media_tag(&stream_ctx, &tag).await;
1405        }
1406
1407        if matches!(
1408            mode,
1409            MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1410        ) {
1411            if is_enhanced {
1412                // Enhanced RTMP audio (Opus, FLAC, AC-3, etc.)
1413                if let Ok(enhanced_data) = EnhancedAudioData::parse(data.clone()) {
1414                    self.handler
1415                        .on_enhanced_audio_frame(&stream_ctx, &enhanced_data, timestamp)
1416                        .await;
1417                }
1418            } else if data.len() >= 2 && (data[0] >> 4) == 10 {
1419                // Legacy AAC
1420                if let Ok(aac_data) = AacData::parse(data.slice(1..)) {
1421                    self.handler
1422                        .on_audio_frame(&stream_ctx, &aac_data, timestamp)
1423                        .await;
1424                }
1425            }
1426        }
1427
1428        // Broadcast to subscribers via registry
1429        if let Some(ref key) = self.publishing_to {
1430            let frame = BroadcastFrame::audio(timestamp, data, is_header);
1431            self.registry.broadcast(key, frame).await;
1432        }
1433
1434        Ok(())
1435    }
1436
1437    /// Handle video message
1438    async fn handle_video(&mut self, timestamp: u32, data: Bytes) -> Result<()> {
1439        if data.is_empty() {
1440            return Ok(());
1441        }
1442
1443        if let Some(prev_video_ts) = self.last_video_ts {
1444            // Use wrapping_sub to handle timestamp wraparound (RTMP timestamps are 32-bit)
1445            let timestamp_delta = timestamp.wrapping_sub(prev_video_ts);
1446            tracing::trace!(
1447                timestamp = timestamp,
1448                last_video_ts = prev_video_ts,
1449                timestamp_delta = timestamp_delta,
1450                "connection handle_video"
1451            );
1452        }
1453        self.last_video_ts = Some(timestamp);
1454
1455        // Detect Enhanced RTMP video (ExVideoTagHeader) vs legacy FLV
1456        let is_enhanced = EnhancedVideoData::is_enhanced(data[0]);
1457
1458        // Detect and log codec on first frame or codec change
1459        let detected = if is_enhanced {
1460            // Enhanced video: FourCC is at bytes 1-4 (after header byte)
1461            if data.len() >= 5 {
1462                VideoFourCc::from_bytes(&data[1..]).map(DetectedCodec::EnhancedVideo)
1463            } else {
1464                None
1465            }
1466        } else {
1467            // Legacy video: codec ID is in lower 4 bits
1468            Some(DetectedCodec::LegacyVideo(data[0] & 0x0F))
1469        };
1470
1471        if let Some(ref codec) = detected {
1472            if self.detected_video_codec.as_ref() != Some(codec) {
1473                tracing::info!(
1474                    session_id = self.state.id,
1475                    codec = codec.name(),
1476                    enhanced = codec.is_enhanced(),
1477                    "Video codec detected"
1478                );
1479                self.detected_video_codec = Some(codec.clone());
1480            }
1481        }
1482
1483        // Find publishing stream
1484        let stream_id = self.find_publishing_stream()?;
1485        let stream = self
1486            .state
1487            .get_stream_mut(stream_id)
1488            .ok_or(ProtocolError::StreamNotFound(stream_id))?;
1489
1490        let (is_keyframe, is_header) = if is_enhanced {
1491            // ExVideoTagHeader format: bit 7 set, bits 4-6 are frame type, bits 0-3 are packet type
1492            let frame_type = (data[0] >> 4) & 0x07;
1493            let packet_type = data[0] & 0x0F;
1494            let is_keyframe = frame_type == 1 || frame_type == 4; // Keyframe or GeneratedKeyframe
1495            let is_header = packet_type == 0; // SequenceStart
1496            (is_keyframe, is_header)
1497        } else {
1498            // Legacy FLV format: bits 4-7 are frame type, bits 0-3 are codec ID
1499            let is_keyframe = (data[0] >> 4) == 1;
1500            let is_header = data.len() >= 2 && (data[0] & 0x0F) == 7 && data[1] == 0;
1501            (is_keyframe, is_header)
1502        };
1503        stream.on_video(timestamp, is_keyframe, is_header, data.len());
1504
1505        // Create FLV tag
1506        let tag = FlvTag::video(timestamp, data.clone());
1507
1508        // Store sequence header
1509        if is_header {
1510            stream.gop_buffer.set_video_header(tag.clone());
1511        } else {
1512            // Add to GOP buffer
1513            stream.gop_buffer.push(tag.clone());
1514        }
1515
1516        // Create stream context for callbacks
1517        let stream_key = stream.stream_key.clone().unwrap_or_default();
1518        let stream_ctx = StreamContext::new(self.context.clone(), stream_id, stream_key, true);
1519
1520        // Notify keyframe
1521        if is_keyframe && !is_header {
1522            self.handler.on_keyframe(&stream_ctx, timestamp).await;
1523        }
1524
1525        // Deliver based on mode
1526        let mode = self.handler.media_delivery_mode();
1527
1528        if matches!(mode, MediaDeliveryMode::RawFlv | MediaDeliveryMode::Both) {
1529            self.handler.on_media_tag(&stream_ctx, &tag).await;
1530        }
1531
1532        if matches!(
1533            mode,
1534            MediaDeliveryMode::ParsedFrames | MediaDeliveryMode::Both
1535        ) {
1536            if is_enhanced {
1537                // Enhanced RTMP video (HEVC, AV1, VP9, etc.)
1538                if let Ok(enhanced_data) = EnhancedVideoData::parse(data.clone()) {
1539                    self.handler
1540                        .on_enhanced_video_frame(&stream_ctx, &enhanced_data, timestamp)
1541                        .await;
1542                }
1543            } else if data.len() >= 2 && (data[0] & 0x0F) == 7 {
1544                // Legacy AVC/H.264
1545                if let Ok(h264_data) = H264Data::parse(data.slice(1..)) {
1546                    self.handler
1547                        .on_video_frame(&stream_ctx, &h264_data, timestamp)
1548                        .await;
1549                }
1550            }
1551        }
1552
1553        // Broadcast to subscribers via registry
1554        if let Some(ref key) = self.publishing_to {
1555            let frame = BroadcastFrame::video(timestamp, data, is_keyframe, is_header);
1556            self.registry.broadcast(key, frame).await;
1557        }
1558
1559        Ok(())
1560    }
1561
1562    /// Find the publishing stream (assumes single publish per connection)
1563    fn find_publishing_stream(&self) -> Result<u32> {
1564        for (id, stream) in &self.state.streams {
1565            if stream.is_publishing() {
1566                return Ok(*id);
1567            }
1568        }
1569        Err(ProtocolError::StreamNotFound(0).into())
1570    }
1571
1572    // === Message sending helpers ===
1573
1574    async fn send_command(&mut self, csid: u32, stream_id: u32, cmd: &Command) -> Result<()> {
1575        let (msg_type, payload) = RtmpMessage::Command(cmd.clone()).encode();
1576
1577        let chunk = RtmpChunk {
1578            csid,
1579            timestamp: 0,
1580            message_type: msg_type,
1581            stream_id,
1582            payload,
1583        };
1584
1585        self.write_buf.clear();
1586        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1587        self.writer.write_all(&self.write_buf).await?;
1588        self.writer.flush().await?;
1589
1590        Ok(())
1591    }
1592
1593    async fn send_connect_result_with_ertmp(
1594        &mut self,
1595        transaction_id: f64,
1596        negotiated_caps: Option<&crate::protocol::enhanced::EnhancedCapabilities>,
1597    ) -> Result<()> {
1598        // Build connect response using the builder
1599        let mut builder = ConnectResponseBuilder::new()
1600            .fms_ver("FMS/3,5,7,7009")
1601            .capabilities(31);
1602
1603        // Add E-RTMP capabilities if negotiated
1604        if let Some(caps) = negotiated_caps {
1605            if caps.enabled {
1606                builder = builder.enhanced_capabilities(caps);
1607            }
1608        }
1609
1610        let result = builder.build(transaction_id);
1611
1612        self.send_command(CSID_COMMAND, 0, &result).await
1613    }
1614
1615    async fn send_connect_error(&mut self, transaction_id: f64, reason: &str) -> Result<()> {
1616        let mut info = HashMap::new();
1617        info.insert("level".to_string(), AmfValue::String("error".into()));
1618        info.insert(
1619            "code".to_string(),
1620            AmfValue::String(NC_CONNECT_REJECTED.into()),
1621        );
1622        info.insert("description".to_string(), AmfValue::String(reason.into()));
1623
1624        let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1625
1626        self.send_command(CSID_COMMAND, 0, &error).await
1627    }
1628
1629    async fn send_connect_redirect(&mut self, transaction_id: f64, url: &str) -> Result<()> {
1630        let mut info = HashMap::new();
1631        info.insert("level".to_string(), AmfValue::String("error".into()));
1632        info.insert(
1633            "code".to_string(),
1634            AmfValue::String(NC_CONNECT_REJECTED.into()),
1635        );
1636        info.insert(
1637            "description".to_string(),
1638            AmfValue::String("Redirect".into()),
1639        );
1640        info.insert("ex.redirect".to_string(), AmfValue::String(url.into()));
1641
1642        let error = Command::error(transaction_id, AmfValue::Null, AmfValue::Object(info));
1643
1644        self.send_command(CSID_COMMAND, 0, &error).await
1645    }
1646
1647    async fn send_set_chunk_size(&mut self, size: u32) -> Result<()> {
1648        let (msg_type, payload) = RtmpMessage::SetChunkSize(size).encode();
1649
1650        let chunk = RtmpChunk {
1651            csid: CSID_PROTOCOL_CONTROL,
1652            timestamp: 0,
1653            message_type: msg_type,
1654            stream_id: 0,
1655            payload,
1656        };
1657
1658        self.write_buf.clear();
1659        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1660        self.writer.write_all(&self.write_buf).await?;
1661        self.writer.flush().await?;
1662
1663        Ok(())
1664    }
1665
1666    async fn send_window_ack_size(&mut self, size: u32) -> Result<()> {
1667        let (msg_type, payload) = RtmpMessage::WindowAckSize(size).encode();
1668
1669        let chunk = RtmpChunk {
1670            csid: CSID_PROTOCOL_CONTROL,
1671            timestamp: 0,
1672            message_type: msg_type,
1673            stream_id: 0,
1674            payload,
1675        };
1676
1677        self.write_buf.clear();
1678        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1679        self.writer.write_all(&self.write_buf).await?;
1680        self.writer.flush().await?;
1681
1682        Ok(())
1683    }
1684
1685    async fn send_peer_bandwidth(&mut self, size: u32) -> Result<()> {
1686        let (msg_type, payload) = RtmpMessage::SetPeerBandwidth {
1687            size,
1688            limit_type: BANDWIDTH_LIMIT_DYNAMIC,
1689        }
1690        .encode();
1691
1692        let chunk = RtmpChunk {
1693            csid: CSID_PROTOCOL_CONTROL,
1694            timestamp: 0,
1695            message_type: msg_type,
1696            stream_id: 0,
1697            payload,
1698        };
1699
1700        self.write_buf.clear();
1701        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1702        self.writer.write_all(&self.write_buf).await?;
1703        self.writer.flush().await?;
1704
1705        Ok(())
1706    }
1707
1708    async fn send_user_control(&mut self, event: UserControlEvent) -> Result<()> {
1709        let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
1710
1711        let chunk = RtmpChunk {
1712            csid: CSID_PROTOCOL_CONTROL,
1713            timestamp: 0,
1714            message_type: msg_type,
1715            stream_id: 0,
1716            payload,
1717        };
1718
1719        self.write_buf.clear();
1720        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1721        self.writer.write_all(&self.write_buf).await?;
1722        self.writer.flush().await?;
1723
1724        Ok(())
1725    }
1726
1727    async fn send_acknowledgement(&mut self) -> Result<()> {
1728        let sequence = self.state.bytes_received as u32;
1729
1730        let (msg_type, payload) = RtmpMessage::Acknowledgement { sequence }.encode();
1731
1732        let chunk = RtmpChunk {
1733            csid: CSID_PROTOCOL_CONTROL,
1734            timestamp: 0,
1735            message_type: msg_type,
1736            stream_id: 0,
1737            payload,
1738        };
1739
1740        self.write_buf.clear();
1741        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1742        self.writer.write_all(&self.write_buf).await?;
1743        self.writer.flush().await?;
1744
1745        self.state.mark_ack_sent();
1746        Ok(())
1747    }
1748
1749    async fn send_ping_response(&mut self, timestamp: u32) -> Result<()> {
1750        self.send_user_control(UserControlEvent::PingResponse(timestamp))
1751            .await
1752    }
1753
1754    // === Media sending methods for subscriber mode ===
1755
1756    /// Send a video message to the client
1757    async fn send_video(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1758        let (msg_type, payload) = RtmpMessage::Video {
1759            timestamp,
1760            data: data.clone(),
1761        }
1762        .encode();
1763
1764        let chunk = RtmpChunk {
1765            csid: CSID_VIDEO,
1766            timestamp,
1767            message_type: msg_type,
1768            stream_id,
1769            payload,
1770        };
1771
1772        self.write_buf.clear();
1773        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1774        self.writer.write_all(&self.write_buf).await?;
1775        // Don't flush after every frame - batch writes for efficiency
1776        Ok(())
1777    }
1778
1779    /// Send an audio message to the client
1780    async fn send_audio(&mut self, stream_id: u32, timestamp: u32, data: Bytes) -> Result<()> {
1781        let (msg_type, payload) = RtmpMessage::Audio {
1782            timestamp,
1783            data: data.clone(),
1784        }
1785        .encode();
1786
1787        let chunk = RtmpChunk {
1788            csid: CSID_AUDIO,
1789            timestamp,
1790            message_type: msg_type,
1791            stream_id,
1792            payload,
1793        };
1794
1795        self.write_buf.clear();
1796        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1797        self.writer.write_all(&self.write_buf).await?;
1798        // Don't flush after every frame - batch writes for efficiency
1799        Ok(())
1800    }
1801
1802    /// Send a broadcast frame to the subscriber client
1803    ///
1804    /// Handles backpressure by skipping non-keyframes when in skip mode.
1805    /// Also handles pause state by consuming frames without sending.
1806    async fn send_broadcast_frame(&mut self, frame: BroadcastFrame) -> Result<()> {
1807        let stream_id = self.playback_stream_id.unwrap_or(1);
1808
1809        // PAUSE: Consume frame but don't send
1810        if self.is_paused {
1811            self.frames_dropped_while_paused += 1;
1812            tracing::trace!(session_id = self.state.id, "Frame dropped (paused)");
1813            return Ok(());
1814        }
1815
1816        // Backpressure handling: skip non-keyframes if we're lagging
1817        if self.subscriber_state == SubscriberState::SkippingToKeyframe {
1818            match frame.frame_type {
1819                FrameType::Video => {
1820                    if frame.is_keyframe || frame.is_header {
1821                        // Got a keyframe or header, resume normal operation
1822                        self.subscriber_state = SubscriberState::Normal;
1823                        self.skip_audio_until_keyframe = false;
1824                        tracing::debug!(
1825                            session_id = self.state.id,
1826                            "Received keyframe, resuming normal playback"
1827                        );
1828                    } else {
1829                        // Skip non-keyframe video
1830                        return Ok(());
1831                    }
1832                }
1833                FrameType::Audio => {
1834                    // Skip audio after unpause to avoid audio playing while video frozen
1835                    // But keep audio during lag recovery (glitches worse than brief desync)
1836                    if self.skip_audio_until_keyframe {
1837                        return Ok(());
1838                    }
1839                }
1840                FrameType::Metadata => {
1841                    // Always forward metadata
1842                }
1843            }
1844        }
1845
1846        // Send the frame based on type
1847        match frame.frame_type {
1848            FrameType::Video => {
1849                self.send_video(stream_id, frame.timestamp, frame.data)
1850                    .await?;
1851            }
1852            FrameType::Audio => {
1853                self.send_audio(stream_id, frame.timestamp, frame.data)
1854                    .await?;
1855            }
1856            FrameType::Metadata => {
1857                // Send metadata as data message
1858                self.send_metadata_frame(stream_id, frame.data).await?;
1859            }
1860        }
1861
1862        // Periodically flush to ensure data is sent
1863        // The flush happens less frequently when sending many frames
1864        self.writer.flush().await?;
1865
1866        Ok(())
1867    }
1868
1869    /// Send metadata frame to subscriber
1870    async fn send_metadata_frame(&mut self, stream_id: u32, data: Bytes) -> Result<()> {
1871        // Metadata is sent as a data message with onMetaData
1872        let data_msg = DataMessage {
1873            name: CMD_ON_METADATA.to_string(),
1874            values: vec![AmfValue::String("onMetaData".to_string())],
1875            stream_id,
1876        };
1877
1878        let (msg_type, payload) = RtmpMessage::Data(data_msg).encode();
1879
1880        // If the original data is available, use it directly
1881        // Otherwise, encode the metadata
1882        let chunk = RtmpChunk {
1883            csid: CSID_COMMAND,
1884            timestamp: 0,
1885            message_type: msg_type,
1886            stream_id,
1887            payload: if data.is_empty() { payload } else { data },
1888        };
1889
1890        self.write_buf.clear();
1891        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
1892        self.writer.write_all(&self.write_buf).await?;
1893
1894        Ok(())
1895    }
1896}
1897
1898use bytes::Buf;