Skip to main content

arcly_stream/protocol/rtmp/
mod.rs

1//! A working **RTMP** [`InboundProtocol`](crate::InboundProtocol): publish
2//! (ingest) and play (egress) over the real Adobe wire protocol.
3//!
4//! This is the crate's **reference implementation** of the
5//! [multi-protocol ingestion architecture](crate::inbound) — a real, end-to-end
6//! template for the shape an RTSP, SRT, or WHIP worker would take.
7//!
8//! Gated behind the `rtmp` feature. This is not a stub over the generic TCP
9//! accept loop — it performs the simple handshake, reassembles the chunk stream,
10//! decodes AMF0 commands, and bridges FLV audio/video tags to the engine's
11//! lock-free bus:
12//!
13//! * **Publish** — `connect` → `createStream` → `publish` brings up a
14//!   [`StreamHandle`] via [`PublishRegistry::start_publish`]. Incoming AVC NALUs
15//!   are converted to Annex-B (keyframes self-contained with SPS/PPS) and AAC to
16//!   ADTS, then pushed through [`StreamHandle::publish_frame`] — so the GOP cache,
17//!   QoS counters, and any [`Muxer`](crate::packager::Muxer) (e.g.
18//!   [`MpegTsMuxer`](crate::packager::MpegTsMuxer)) work unchanged.
19//! * **Play** — `play` resolves an existing stream, replays the cached configs +
20//!   current GOP for an instant start, then forwards live frames as FLV tags.
21//!
22//! ```no_run
23//! # async fn demo() -> arcly_stream::Result<()> {
24//! use arcly_stream::prelude::*;
25//! use arcly_stream::protocol::rtmp::RtmpHandler;
26//!
27//! let engine = Engine::builder()
28//!     .application(AppSpec::new("live").gop_cache(120))
29//!     .build();
30//! // `.with_playback` enables `play`; `Arc<Engine>` coerces to both registry traits.
31//! let handler = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
32//!     .with_playback(engine.clone());
33//! // Publish to rtmp://host/live/<stream>, play from the same URL.
34//! engine.serve(vec![Box::new(handler)], CancellationToken::new()).await
35//! # }
36//! ```
37
38mod amf;
39mod chunk;
40mod flv;
41mod handshake;
42
43use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
44use crate::{MediaFrame, Result, StreamError, StreamKey};
45use amf::Amf0Value;
46use async_trait::async_trait;
47use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
48use std::net::SocketAddr;
49use std::sync::Arc;
50use tokio::net::TcpStream;
51use tokio_util::sync::CancellationToken;
52use tracing::{debug, info, warn};
53
54/// Default RTMP port.
55pub const DEFAULT_RTMP_PORT: u16 = 1935;
56/// Chunk size the server announces to peers (larger than the 128-byte default
57/// reduces per-chunk header overhead on the media path).
58const OUT_CHUNK_SIZE: usize = 4096;
59
60// RTMP message type ids.
61const MSG_SET_CHUNK_SIZE: u8 = 1;
62const MSG_ACK: u8 = 3;
63const MSG_USER_CONTROL: u8 = 4;
64const MSG_WINDOW_ACK_SIZE: u8 = 5;
65const MSG_SET_PEER_BANDWIDTH: u8 = 6;
66const MSG_AUDIO: u8 = 8;
67const MSG_VIDEO: u8 = 9;
68const MSG_DATA_AMF3: u8 = 15;
69const MSG_DATA_AMF0: u8 = 18;
70const MSG_COMMAND_AMF3: u8 = 17;
71const MSG_COMMAND_AMF0: u8 = 20;
72
73// Chunk-stream ids we emit on.
74const CSID_CONTROL: u8 = 2;
75const CSID_COMMAND: u8 = 3;
76const CSID_AUDIO: u8 = 4;
77const CSID_VIDEO: u8 = 6;
78
79/// The message stream id assigned to every published/played stream. RTMP allows
80/// many; one per connection is sufficient for a live origin.
81const STREAM_ID: u32 = 1;
82
83/// RTMP protocol handler. One instance serves an address; each connection is an
84/// independent publish or play session.
85///
86/// Publish (ingest) works through the [`InboundProtocol`](crate::InboundProtocol)
87/// contract alone — the [`IngestContext`](crate::inbound::IngestContext) handed to
88/// [`serve`](crate::InboundProtocol::serve) carries the publish registry. Play
89/// (egress) additionally needs read access to live streams, which the ingest
90/// context does not provide, so supply a [`PlaybackRegistry`] via
91/// [`with_playback`](Self::with_playback) (the bundled [`Engine`](crate::Engine)
92/// implements both). Without it, `play` requests are answered with a failure status.
93pub struct RtmpHandler {
94    addr: SocketAddr,
95    max_connections: usize,
96    playback: Option<Arc<dyn PlaybackRegistry>>,
97}
98
99impl RtmpHandler {
100    /// Create a handler bound to `addr` (typically `0.0.0.0:1935`).
101    pub fn new(addr: SocketAddr) -> Self {
102        Self {
103            addr,
104            max_connections: 1024,
105            playback: None,
106        }
107    }
108
109    /// Cap the number of concurrent connections (default 1024).
110    pub fn max_connections(mut self, max: usize) -> Self {
111        self.max_connections = max;
112        self
113    }
114
115    /// Enable `play` (egress) by providing a [`PlaybackRegistry`] to resolve live
116    /// streams from. Pass the same engine you publish into.
117    pub fn with_playback(mut self, playback: Arc<dyn PlaybackRegistry>) -> Self {
118        self.playback = Some(playback);
119        self
120    }
121}
122
123/// `RtmpHandler` is the **reference [`InboundProtocol`](crate::InboundProtocol)
124/// implementation**: a real-world template for the multi-protocol ingestion
125/// architecture. It owns a TCP listener via
126/// [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server), performs the
127/// RTMP handshake per connection, and bridges FLV AVC/AAC onto the bus through the
128/// [`IngestContext`](crate::inbound::IngestContext)'s registry — exactly the shape
129/// an RTSP, SRT, or WHIP worker would take.
130#[async_trait]
131impl crate::inbound::InboundProtocol for RtmpHandler {
132    fn name(&self) -> &'static str {
133        "rtmp"
134    }
135
136    async fn serve(
137        &self,
138        ctx: crate::inbound::IngestContext,
139        shutdown: CancellationToken,
140    ) -> Result<()> {
141        info!(addr = %self.addr, "rtmp handler listening");
142        // Per-connection sessions own their own multi-stream state machine, so
143        // the handler talks to the registry directly rather than a single
144        // PublishSession; the ergonomic session path is shown in the
145        // `custom_protocol_plugin` example.
146        let registry = Arc::clone(ctx.registry());
147        let playback = self.playback.clone();
148        crate::protocol::run_tcp_ingest_server(
149            self.addr,
150            self.max_connections,
151            shutdown,
152            move |sock, peer| {
153                let registry = Arc::clone(&registry);
154                let playback = playback.clone();
155                async move {
156                    if let Err(e) = handle_connection(sock, peer, registry, playback).await {
157                        warn!(%peer, error = %e, "rtmp session ended with error");
158                    }
159                }
160            },
161        )
162        .await
163    }
164}
165
166/// Handshake, split the socket, and run the session state machine.
167async fn handle_connection(
168    mut sock: TcpStream,
169    peer: SocketAddr,
170    registry: Arc<dyn PublishRegistry>,
171    playback: Option<Arc<dyn PlaybackRegistry>>,
172) -> Result<()> {
173    handshake::accept(&mut sock).await?;
174    debug!(%peer, "rtmp handshake complete");
175    let (read_half, write_half) = sock.into_split();
176    let mut session = Session::new(
177        ChunkReader::new(read_half),
178        ChunkWriter::new(write_half),
179        registry,
180        playback,
181    );
182    session.run().await
183}
184
185/// The decoded role of a session, determined by the first `publish`/`play` command.
186#[derive(PartialEq)]
187enum Role {
188    Pending,
189    Publishing,
190    Playing,
191}
192
193/// Per-connection RTMP session state.
194struct Session<R, W> {
195    reader: ChunkReader<R>,
196    writer: ChunkWriter<W>,
197    registry: Arc<dyn PublishRegistry>,
198    playback: Option<Arc<dyn PlaybackRegistry>>,
199    app: Option<String>,
200    role: Role,
201    publish_key: Option<StreamKey>,
202    handle: Option<StreamHandle>,
203    avc: Option<flv::AvcConfig>,
204    aac: Option<flv::AudioConfig>,
205    /// Egress: whether an AAC sequence header has been sent to a play client yet.
206    audio_seq_sent: bool,
207    stop: bool,
208}
209
210impl<R, W> Session<R, W>
211where
212    R: tokio::io::AsyncRead + Unpin,
213    W: tokio::io::AsyncWrite + Unpin,
214{
215    fn new(
216        reader: ChunkReader<R>,
217        writer: ChunkWriter<W>,
218        registry: Arc<dyn PublishRegistry>,
219        playback: Option<Arc<dyn PlaybackRegistry>>,
220    ) -> Self {
221        Self {
222            reader,
223            writer,
224            registry,
225            playback,
226            app: None,
227            role: Role::Pending,
228            publish_key: None,
229            handle: None,
230            avc: None,
231            aac: None,
232            audio_seq_sent: false,
233            stop: false,
234        }
235    }
236
237    /// Drive the session to completion, releasing the publish slot on exit.
238    async fn run(&mut self) -> Result<()> {
239        let result = self.event_loop().await;
240        if let Some(key) = self.publish_key.take() {
241            let _ = self.registry.end_publish(&key).await;
242        }
243        match result {
244            Ok(()) => Ok(()),
245            Err(e) if is_disconnect(&e) => Ok(()),
246            Err(e) => Err(e),
247        }
248    }
249
250    async fn event_loop(&mut self) -> Result<()> {
251        while !self.stop {
252            let msg = self.reader.read_message().await?;
253            self.process(msg).await?;
254        }
255        Ok(())
256    }
257
258    async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
259        match msg.type_id {
260            MSG_SET_CHUNK_SIZE => {
261                if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
262                    self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
263                }
264            }
265            MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
266            MSG_COMMAND_AMF3 => {
267                // AMF3 command payloads are prefixed with a single 0x00 byte then
268                // carry an AMF0 body in practice.
269                self.handle_command(&msg.payload[1.min(msg.payload.len())..])
270                    .await?
271            }
272            MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
273            MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
274            MSG_DATA_AMF0 | MSG_DATA_AMF3 => { /* onMetaData — not required for transport */ }
275            MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
276            other => debug!(
277                type_id = other,
278                stream = msg.msg_stream_id,
279                "ignoring RTMP message"
280            ),
281        }
282        Ok(())
283    }
284
285    // ── Command channel (AMF0) ──────────────────────────────────────────────
286
287    async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
288        let values = amf::decode_all(payload);
289        let Some(name) = values.first().and_then(Amf0Value::as_str) else {
290            return Ok(());
291        };
292        let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
293        match name {
294            "connect" => self.on_connect(txn, values.get(2)).await,
295            "createStream" => self.on_create_stream(txn).await,
296            "publish" => self.on_publish(values.get(3)).await,
297            "play" => self.on_play(values.get(3)).await,
298            "releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
299            | "FCSubscribe" => Ok(()),
300            other => {
301                debug!(command = other, "unhandled RTMP command");
302                Ok(())
303            }
304        }
305    }
306
307    async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
308        let app = cmd_obj
309            .and_then(|o| o.get("app"))
310            .and_then(Amf0Value::as_str)
311            .unwrap_or("")
312            .split('?')
313            .next()
314            .unwrap_or("")
315            .to_string();
316        self.app = Some(app);
317
318        // Window Ack Size, Set Peer Bandwidth, Set Chunk Size — protocol prelude.
319        self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
320            .await?;
321        let mut bw = Vec::from(2_500_000u32.to_be_bytes());
322        bw.push(2); // limit type: dynamic
323        self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
324        self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
325            .await?;
326        self.writer.set_chunk_size(OUT_CHUNK_SIZE);
327
328        let props = amf::object(vec![
329            ("fmsVer", amf::string("FMS/3,0,1,123")),
330            ("capabilities", Amf0Value::Number(31.0)),
331        ]);
332        let info = amf::object(vec![
333            ("level", amf::string("status")),
334            ("code", amf::string("NetConnection.Connect.Success")),
335            ("description", amf::string("Connection succeeded.")),
336            ("objectEncoding", Amf0Value::Number(0.0)),
337        ]);
338        self.send_command(
339            0,
340            &[amf::string("_result"), Amf0Value::Number(txn), props, info],
341        )
342        .await
343    }
344
345    async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
346        self.send_command(
347            0,
348            &[
349                amf::string("_result"),
350                Amf0Value::Number(txn),
351                Amf0Value::Null,
352                Amf0Value::Number(STREAM_ID as f64),
353            ],
354        )
355        .await
356    }
357
358    async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
359        let key = self.stream_key(name)?;
360        let handle = self.registry.start_publish(&key).await?;
361        info!(stream = %key, "rtmp publish started");
362        self.handle = Some(handle);
363        self.publish_key = Some(key);
364        self.role = Role::Publishing;
365        self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
366            .await
367    }
368
369    async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
370        let key = self.stream_key(name)?;
371        let Some(playback) = self.playback.clone() else {
372            self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
373                .await?;
374            self.stop = true;
375            return Ok(());
376        };
377        let handle = playback.get_stream(&key)?;
378        info!(stream = %key, "rtmp play started");
379        self.role = Role::Playing;
380
381        // StreamBegin user-control event, then the play status events.
382        let mut begin = Vec::from(0u16.to_be_bytes()); // event type 0 = StreamBegin
383        begin.extend_from_slice(&STREAM_ID.to_be_bytes());
384        self.send_control(MSG_USER_CONTROL, &begin).await?;
385        self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
386            .await?;
387        self.send_status("status", "NetStream.Play.Start", "Started playing.")
388            .await?;
389
390        self.serve_play(handle).await?;
391        self.stop = true;
392        Ok(())
393    }
394
395    // ── Publish: FLV tag → MediaFrame ───────────────────────────────────────
396
397    fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
398        let Some(header) = flv::parse_video_header(&msg.payload) else {
399            return Ok(()); // non-AVC video, ignored
400        };
401        let body = &msg.payload[header.body_offset..];
402        let ts = msg.timestamp as i64;
403
404        if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
405            let cfg = flv::AvcConfig::parse(body)
406                .ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
407            let mut frame =
408                MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
409            frame.flags |= crate::FrameFlags::CONFIG;
410            self.avc = Some(cfg);
411            self.publish(frame);
412        } else if header.avc_packet_type == flv::PKT_RAW {
413            let Some(cfg) = self.avc.as_ref() else {
414                return Ok(()); // NALUs before the sequence header; drop
415            };
416            let annexb = cfg
417                .avcc_to_annexb(body, header.is_keyframe)
418                .ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
419            let pts = ts + header.composition_time as i64;
420            let frame =
421                MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
422            self.publish(frame);
423        }
424        Ok(())
425    }
426
427    fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
428        let payload = &msg.payload;
429        let Some(&b0) = payload.first() else {
430            return Ok(());
431        };
432        if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
433            return Ok(()); // non-AAC audio, ignored
434        }
435        let aac_packet_type = *payload.get(1).unwrap_or(&0);
436        let body = &payload[2.min(payload.len())..];
437        let ts = msg.timestamp as i64;
438
439        if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
440            let cfg = flv::AudioConfig::parse(body)
441                .ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
442            self.aac = Some(cfg);
443            let mut frame =
444                MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
445            frame.flags |= crate::FrameFlags::CONFIG;
446            self.publish(frame);
447        } else if let Some(cfg) = self.aac.as_ref() {
448            let adts = cfg.to_adts(body);
449            self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
450        }
451        Ok(())
452    }
453
454    fn publish(&self, frame: MediaFrame) {
455        if let Some(handle) = self.handle.as_ref() {
456            let _ = handle.publish_frame(frame);
457        }
458    }
459
460    // ── Play: MediaFrame → FLV tag ──────────────────────────────────────────
461
462    async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
463        // Instant start: replay cached configs + current GOP, then go live.
464        for frame in handle.replay_buffer() {
465            self.send_media(&frame).await?;
466        }
467        let mut sub = handle.subscribe_resilient();
468        while let Some(frame) = sub.recv().await {
469            self.send_media(&frame).await?;
470        }
471        Ok(())
472    }
473
474    async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
475        let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
476        match frame.codec {
477            crate::CodecId::H264 => {
478                let ts = frame.dts.max(0) as u32;
479                let tag = if is_config {
480                    let cfg = annexb_to_avc_config(&frame.data);
481                    flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
482                } else {
483                    let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
484                    let cts = (frame.pts - frame.dts) as i32;
485                    flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
486                };
487                self.writer
488                    .write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
489                    .await
490            }
491            crate::CodecId::AAC => {
492                let ts = frame.pts.max(0) as u32;
493                if is_config {
494                    self.audio_seq_sent = true;
495                    let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
496                    return self
497                        .writer
498                        .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
499                        .await;
500                }
501                // A play client needs the AAC sequence header before raw frames.
502                // If the source provided no config frame, synthesize one from the
503                // ADTS header so egress is self-sufficient from any AAC source.
504                if !self.audio_seq_sent {
505                    if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
506                        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
507                        self.writer
508                            .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
509                            .await?;
510                        self.audio_seq_sent = true;
511                    }
512                }
513                // Engine stores ADTS; strip the 7-byte header back to raw AAC.
514                let raw = frame.data.get(7..).unwrap_or(&[]);
515                let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
516                self.writer
517                    .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
518                    .await
519            }
520            _ => Ok(()), // only H.264 + AAC are carried over RTMP
521        }
522    }
523
524    // ── Low-level senders ───────────────────────────────────────────────────
525
526    /// Resolve the [`StreamKey`] for this session from a publish/play stream name.
527    fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
528        let app = self
529            .app
530            .clone()
531            .ok_or_else(|| StreamError::protocol("stream command before connect"))?;
532        let stream = name
533            .and_then(Amf0Value::as_str)
534            .ok_or_else(|| StreamError::protocol("missing stream name"))?
535            .split('?')
536            .next()
537            .unwrap_or("")
538            .to_string();
539        Ok(StreamKey::new(app, stream))
540    }
541
542    async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
543        self.writer
544            .write_message(CSID_CONTROL, type_id, 0, 0, payload)
545            .await
546    }
547
548    async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
549        let mut buf = bytes::BytesMut::new();
550        for v in values {
551            v.encode(&mut buf);
552        }
553        self.writer
554            .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
555            .await
556    }
557
558    async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
559        let info = amf::object(vec![
560            ("level", amf::string(level)),
561            ("code", amf::string(code)),
562            ("description", amf::string(description)),
563        ]);
564        self.send_command(
565            STREAM_ID,
566            &[
567                amf::string("onStatus"),
568                Amf0Value::Number(0.0),
569                Amf0Value::Null,
570                info,
571            ],
572        )
573        .await
574    }
575}
576
577/// Split an Annex-B SPS/PPS access unit into an [`flv::AvcConfig`] for egress.
578fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
579    let mut cfg = flv::AvcConfig {
580        nal_length_size: 4,
581        ..Default::default()
582    };
583    for nal in crate::codec::h264::iter_nals_annexb(annexb) {
584        match nal.first().map(|b| b & 0x1F) {
585            Some(7) => cfg.sps.push(nal.to_vec()),
586            Some(8) => cfg.pps.push(nal.to_vec()),
587            _ => {}
588        }
589    }
590    cfg
591}
592
593/// Whether an error is a benign peer disconnect (vs. a real protocol fault).
594fn is_disconnect(e: &StreamError) -> bool {
595    matches!(e, StreamError::Io(io) if matches!(
596        io.kind(),
597        std::io::ErrorKind::UnexpectedEof
598            | std::io::ErrorKind::ConnectionReset
599            | std::io::ErrorKind::BrokenPipe
600            | std::io::ErrorKind::ConnectionAborted
601    ))
602}
603
604#[cfg(test)]
605mod tests {
606    use super::*;
607    use crate::{AppSpec, Engine};
608    use bytes::Bytes;
609
610    /// A keyframe AVCC NAL: 4-byte length + IDR slice bytes.
611    fn avcc_idr() -> Vec<u8> {
612        let slice = [0x65u8, 0x88, 0x84, 0x00];
613        let mut v = (slice.len() as u32).to_be_bytes().to_vec();
614        v.extend_from_slice(&slice);
615        v
616    }
617
618    fn avc_decoder_config() -> Vec<u8> {
619        let sps = [0x67u8, 0x42, 0x00, 0x1F, 0xAA];
620        let pps = [0x68u8, 0xCE, 0x3C, 0x80];
621        let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
622        r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
623        r.extend_from_slice(&sps);
624        r.push(1);
625        r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
626        r.extend_from_slice(&pps);
627        r
628    }
629
630    /// Build a session whose writer discards output and whose reader is empty,
631    /// publishing into a freshly registered engine stream.
632    async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
633        let engine = Engine::builder()
634            .application(AppSpec::new("live").gop_cache(64))
635            .build();
636        let key = StreamKey::new("live", "cam");
637        let handle = engine.start_publish(&key).await.unwrap();
638
639        let mut session = Session::new(
640            ChunkReader::new(tokio::io::empty()),
641            ChunkWriter::new(tokio::io::sink()),
642            engine.clone(),
643            Some(engine.clone()),
644        );
645        session.app = Some("live".into());
646        session.handle = Some(handle);
647        session.role = Role::Publishing;
648        (session, engine)
649    }
650
651    #[tokio::test]
652    async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
653        let (mut session, _engine) = publishing_session().await;
654
655        // 1) AVC sequence header → cached video config (Annex-B SPS/PPS).
656        let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
657        session
658            .on_video(&RtmpMessage {
659                type_id: MSG_VIDEO,
660                timestamp: 0,
661                msg_stream_id: STREAM_ID,
662                payload: seq,
663            })
664            .unwrap();
665
666        // 2) Keyframe NALU → GOP-anchoring frame, self-contained with SPS/PPS.
667        let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
668        session
669            .on_video(&RtmpMessage {
670                type_id: MSG_VIDEO,
671                timestamp: 40,
672                msg_stream_id: STREAM_ID,
673                payload: kf,
674            })
675            .unwrap();
676
677        let handle = session.handle.as_ref().unwrap();
678        let (vcfg, _) = handle.cached_configs();
679        let vcfg = vcfg.expect("video config cached");
680        assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
681        // Config is Annex-B SPS then PPS.
682        assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);
683
684        // The keyframe is present in the replay buffer and starts with SPS (params
685        // were prepended so a player can start decoding immediately).
686        let replay = handle.replay_buffer();
687        let key = replay
688            .iter()
689            .find(|f| f.is_keyframe())
690            .expect("keyframe in GOP");
691        assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
692    }
693
694    #[tokio::test]
695    async fn publish_aac_builds_adts_frames() {
696        let (mut session, _engine) = publishing_session().await;
697        // AudioSpecificConfig: AAC-LC, 44.1 kHz, stereo → bytes 0x12 0x10.
698        let asc = [0x12u8, 0x10];
699        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
700        session
701            .on_audio(&RtmpMessage {
702                type_id: MSG_AUDIO,
703                timestamp: 0,
704                msg_stream_id: STREAM_ID,
705                payload: seq,
706            })
707            .unwrap();
708
709        let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
710        session
711            .on_audio(&RtmpMessage {
712                type_id: MSG_AUDIO,
713                timestamp: 23,
714                msg_stream_id: STREAM_ID,
715                payload: raw,
716            })
717            .unwrap();
718
719        // Subscribe after the fact: the cached audio config is the raw ASC.
720        let handle = session.handle.as_ref().unwrap();
721        let (_, acfg) = handle.cached_configs();
722        assert_eq!(&acfg.unwrap().data[..], &asc);
723    }
724
725    #[tokio::test]
726    async fn full_publish_session_drives_real_chunk_reader() {
727        use tokio::io::AsyncReadExt;
728
729        let engine = Engine::builder()
730            .application(AppSpec::new("live").gop_cache(64))
731            .build();
732
733        // Real TCP loopback so the client can half-close (FIN) its write side while
734        // still draining the server's control responses — exactly how OBS/FFmpeg
735        // behave. The session's post-handshake event loop is driven directly.
736        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
737        let addr = listener.local_addr().unwrap();
738        let engine_for_server = engine.clone();
739        let server = tokio::spawn(async move {
740            let (sock, _) = listener.accept().await.unwrap();
741            let (r, w) = sock.into_split();
742            let mut session = Session::new(
743                ChunkReader::new(r),
744                ChunkWriter::new(w),
745                engine_for_server,
746                None,
747            );
748            session.run().await.unwrap(); // EOF on client FIN → Ok
749            let handle = session.handle.as_ref().expect("publish handle");
750            let has_config = handle.cached_configs().0.is_some();
751            let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
752            (has_config, has_keyframe)
753        });
754
755        let client = tokio::spawn(async move {
756            let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
757            let (mut read_half, write_half) = stream.into_split();
758            // Drain server→client responses so its writes never fail.
759            tokio::spawn(async move {
760                let mut buf = [0u8; 4096];
761                while let Ok(n) = read_half.read(&mut buf).await {
762                    if n == 0 {
763                        break;
764                    }
765                }
766            });
767
768            let cmd = |values: &[Amf0Value]| {
769                let mut b = bytes::BytesMut::new();
770                for v in values {
771                    v.encode(&mut b);
772                }
773                b
774            };
775            let mut w = ChunkWriter::new(write_half);
776            let connect = cmd(&[
777                amf::string("connect"),
778                Amf0Value::Number(1.0),
779                amf::object(vec![("app", amf::string("live"))]),
780            ]);
781            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &connect)
782                .await
783                .unwrap();
784            let create = cmd(&[
785                amf::string("createStream"),
786                Amf0Value::Number(2.0),
787                Amf0Value::Null,
788            ]);
789            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &create)
790                .await
791                .unwrap();
792            let publish = cmd(&[
793                amf::string("publish"),
794                Amf0Value::Number(3.0),
795                Amf0Value::Null,
796                amf::string("cam"),
797                amf::string("live"),
798            ]);
799            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &publish)
800                .await
801                .unwrap();
802
803            // Media: AVC sequence header then a keyframe.
804            let seq =
805                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
806            w.write_message(CSID_VIDEO, MSG_VIDEO, 0, STREAM_ID, &seq)
807                .await
808                .unwrap();
809            let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
810            w.write_message(CSID_VIDEO, MSG_VIDEO, 40, STREAM_ID, &kf)
811                .await
812                .unwrap();
813            // Dropping the OwnedWriteHalf sends FIN — the server reader sees EOF.
814        });
815
816        client.await.unwrap();
817        let (has_config, has_keyframe) = server.await.unwrap();
818        assert!(has_config, "video config reached the engine over the wire");
819        assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
820    }
821
822    #[test]
823    fn annexb_config_splits_sps_and_pps() {
824        let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
825        let cfg = annexb_to_avc_config(&annexb);
826        assert_eq!(cfg.sps.len(), 1);
827        assert_eq!(cfg.pps.len(), 1);
828        assert_eq!(cfg.sps[0][0] & 0x1F, 7);
829        assert_eq!(cfg.pps[0][0] & 0x1F, 8);
830    }
831}