Skip to main content

arcly_stream/protocol/rtmp/
mod.rs

1//! A working **RTMP** [`InboundProtocol`](crate::InboundProtocol): publish
2//! (ingest) and play (egress) over the real Adobe wire protocol.
3//!
4//! This is the crate's **reference implementation** of the
5//! [multi-protocol ingestion architecture](crate::inbound) — a real, end-to-end
6//! template for the shape an RTSP, SRT, or WHIP worker would take.
7//!
8//! Gated behind the `rtmp` feature. This is not a stub over the generic TCP
9//! accept loop — it performs the simple handshake, reassembles the chunk stream,
10//! decodes AMF0 commands, and bridges FLV audio/video tags to the engine's
11//! lock-free bus:
12//!
13//! * **Publish** — `connect` → `createStream` → `publish` brings up a
14//!   [`StreamHandle`] via [`PublishRegistry::start_publish`]. Incoming AVC NALUs
15//!   are converted to Annex-B (keyframes self-contained with SPS/PPS) and AAC to
16//!   ADTS, then pushed through [`StreamHandle::publish_frame`] — so the GOP cache,
17//!   QoS counters, and any [`Muxer`](crate::packager::Muxer) (e.g.
18//!   [`MpegTsMuxer`](crate::packager::MpegTsMuxer)) work unchanged.
19//! * **Play** — `play` resolves an existing stream, replays the cached configs +
20//!   current GOP for an instant start, then forwards live frames as FLV tags.
21//!
22//! ```no_run
23//! # async fn demo() -> arcly_stream::Result<()> {
24//! use arcly_stream::prelude::*;
25//! use arcly_stream::protocol::rtmp::RtmpHandler;
26//!
27//! let engine = Engine::builder()
28//!     .application(AppSpec::new("live").gop_cache(120))
29//!     .build();
30//! // `.with_playback` enables `play`; `Arc<Engine>` coerces to both registry traits.
31//! let handler = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
32//!     .with_playback(engine.clone());
33//! // Publish to rtmp://host/live/<stream>, play from the same URL.
34//! engine.serve(vec![Box::new(handler)], CancellationToken::new()).await
35//! # }
36//! ```
37
38mod amf;
39mod chunk;
40mod flv;
41mod handshake;
42mod relay;
43
44pub use relay::RtmpRelay;
45
46use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
47use crate::{MediaFrame, Result, StreamError, StreamKey};
48use amf::Amf0Value;
49use async_trait::async_trait;
50use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
51use std::net::SocketAddr;
52use std::sync::Arc;
53use tokio::net::TcpStream;
54use tokio_util::sync::CancellationToken;
55use tracing::{debug, info, warn};
56
/// Default RTMP port.
pub const DEFAULT_RTMP_PORT: u16 = 1935;
/// Chunk size the server announces to peers (larger than the 128-byte default
/// reduces per-chunk header overhead on the media path).
const OUT_CHUNK_SIZE: usize = 4096;

// RTMP message type ids.
// Set Chunk Size: applied to the reader when received; sent once after `connect`.
const MSG_SET_CHUNK_SIZE: u8 = 1;
// The next four are accepted and ignored on receive; 4-6 are also sent by this server.
const MSG_ACK: u8 = 3;
const MSG_USER_CONTROL: u8 = 4;
const MSG_WINDOW_ACK_SIZE: u8 = 5;
const MSG_SET_PEER_BANDWIDTH: u8 = 6;
// FLV audio/video tag bodies (ingested while publishing, emitted while playing).
const MSG_AUDIO: u8 = 8;
const MSG_VIDEO: u8 = 9;
// Data messages (e.g. onMetaData) are accepted and dropped.
const MSG_DATA_AMF3: u8 = 15;
const MSG_DATA_AMF0: u8 = 18;
// Command messages; the AMF3 variant is decoded as AMF0 after skipping one byte.
const MSG_COMMAND_AMF3: u8 = 17;
const MSG_COMMAND_AMF0: u8 = 20;

// Chunk-stream ids we emit on.
const CSID_CONTROL: u8 = 2;
const CSID_COMMAND: u8 = 3;
const CSID_AUDIO: u8 = 4;
const CSID_VIDEO: u8 = 6;

/// The message stream id assigned to every published/played stream. RTMP allows
/// many; one per connection is sufficient for a live origin.
const STREAM_ID: u32 = 1;
85
86/// RTMP protocol handler. One instance serves an address; each connection is an
87/// independent publish or play session.
88///
89/// Publish (ingest) works through the [`InboundProtocol`](crate::InboundProtocol)
90/// contract alone — the [`IngestContext`](crate::inbound::IngestContext) handed to
91/// [`serve`](crate::InboundProtocol::serve) carries the publish registry. Play
92/// (egress) additionally needs read access to live streams, which the ingest
93/// context does not provide, so supply a [`PlaybackRegistry`] via
94/// [`with_playback`](Self::with_playback) (the bundled [`Engine`](crate::Engine)
95/// implements both). Without it, `play` requests are answered with a failure status.
96pub struct RtmpHandler {
97    addr: SocketAddr,
98    max_connections: usize,
99    playback: Option<Arc<dyn PlaybackRegistry>>,
100}
101
102impl RtmpHandler {
103    /// Create a handler bound to `addr` (typically `0.0.0.0:1935`).
104    pub fn new(addr: SocketAddr) -> Self {
105        Self {
106            addr,
107            max_connections: 1024,
108            playback: None,
109        }
110    }
111
112    /// Cap the number of concurrent connections (default 1024).
113    pub fn max_connections(mut self, max: usize) -> Self {
114        self.max_connections = max;
115        self
116    }
117
118    /// Enable `play` (egress) by providing a [`PlaybackRegistry`] to resolve live
119    /// streams from. Pass the same engine you publish into.
120    pub fn with_playback(mut self, playback: Arc<dyn PlaybackRegistry>) -> Self {
121        self.playback = Some(playback);
122        self
123    }
124}
125
126/// `RtmpHandler` is the **reference [`InboundProtocol`](crate::InboundProtocol)
127/// implementation**: a real-world template for the multi-protocol ingestion
128/// architecture. It owns a TCP listener via
129/// [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server), performs the
130/// RTMP handshake per connection, and bridges FLV AVC/AAC onto the bus through the
131/// [`IngestContext`](crate::inbound::IngestContext)'s registry — exactly the shape
132/// an RTSP, SRT, or WHIP worker would take.
133#[async_trait]
134impl crate::inbound::InboundProtocol for RtmpHandler {
135    fn name(&self) -> &'static str {
136        "rtmp"
137    }
138
139    async fn serve(
140        &self,
141        ctx: crate::inbound::IngestContext,
142        shutdown: CancellationToken,
143    ) -> Result<()> {
144        info!(addr = %self.addr, "rtmp handler listening");
145        // Per-connection sessions own their own multi-stream state machine, so
146        // the handler talks to the registry directly rather than a single
147        // PublishSession; the ergonomic session path is shown in the
148        // `custom_protocol_plugin` example.
149        let registry = Arc::clone(ctx.registry());
150        let playback = self.playback.clone();
151        crate::protocol::run_tcp_ingest_server(
152            self.addr,
153            self.max_connections,
154            shutdown,
155            move |sock, peer| {
156                let registry = Arc::clone(&registry);
157                let playback = playback.clone();
158                async move {
159                    if let Err(e) = handle_connection(sock, peer, registry, playback).await {
160                        warn!(%peer, error = %e, "rtmp session ended with error");
161                    }
162                }
163            },
164        )
165        .await
166    }
167}
168
169/// Handshake, split the socket, and run the session state machine.
170async fn handle_connection(
171    mut sock: TcpStream,
172    peer: SocketAddr,
173    registry: Arc<dyn PublishRegistry>,
174    playback: Option<Arc<dyn PlaybackRegistry>>,
175) -> Result<()> {
176    handshake::accept(&mut sock).await?;
177    debug!(%peer, "rtmp handshake complete");
178    let (read_half, write_half) = sock.into_split();
179    let mut session = Session::new(
180        ChunkReader::new(read_half),
181        ChunkWriter::new(write_half),
182        registry,
183        playback,
184    );
185    session.run().await
186}
187
/// The decoded role of a session, determined by the first `publish`/`play` command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    /// No `publish` or `play` command has been accepted yet.
    Pending,
    /// The peer is publishing; audio/video messages are ingested.
    Publishing,
    /// The peer is playing; media is sent to it.
    Playing,
}
195
/// Per-connection RTMP session state.
struct Session<R, W> {
    /// Source of reassembled RTMP messages from the peer.
    reader: ChunkReader<R>,
    /// Sink for messages sent to the peer.
    writer: ChunkWriter<W>,
    /// Registry used to start and end publishes.
    registry: Arc<dyn PublishRegistry>,
    /// Registry used to resolve streams for `play`; `None` means playback is disabled.
    playback: Option<Arc<dyn PlaybackRegistry>>,
    /// Application name taken from `connect` (query string stripped).
    app: Option<String>,
    /// Current role; gates whether audio/video messages are ingested.
    role: Role,
    /// Key of the active publish, released via `end_publish` when the session ends.
    publish_key: Option<StreamKey>,
    /// Handle frames are published through while publishing.
    handle: Option<StreamHandle>,
    /// Ingest: AVC configuration from the last video sequence header.
    avc: Option<flv::AvcConfig>,
    /// Ingest: AAC configuration from the last audio sequence header.
    aac: Option<flv::AudioConfig>,
    /// Egress: whether an AAC sequence header has been sent to a play client yet.
    audio_seq_sent: bool,
    /// Set to end the event loop after the current message.
    stop: bool,
}
212
213impl<R, W> Session<R, W>
214where
215    R: tokio::io::AsyncRead + Unpin,
216    W: tokio::io::AsyncWrite + Unpin,
217{
    /// Create a session in the `Pending` role over the given chunk reader/writer.
    ///
    /// No application, publish key, stream handle, or codec configuration is set
    /// yet; those are filled in as commands and sequence headers arrive.
    fn new(
        reader: ChunkReader<R>,
        writer: ChunkWriter<W>,
        registry: Arc<dyn PublishRegistry>,
        playback: Option<Arc<dyn PlaybackRegistry>>,
    ) -> Self {
        Self {
            reader,
            writer,
            registry,
            playback,
            app: None,
            role: Role::Pending,
            publish_key: None,
            handle: None,
            avc: None,
            aac: None,
            audio_seq_sent: false,
            stop: false,
        }
    }
239
240    /// Drive the session to completion, releasing the publish slot on exit.
241    async fn run(&mut self) -> Result<()> {
242        let result = self.event_loop().await;
243        if let Some(key) = self.publish_key.take() {
244            let _ = self.registry.end_publish(&key).await;
245        }
246        match result {
247            Ok(()) => Ok(()),
248            Err(e) if is_disconnect(&e) => Ok(()),
249            Err(e) => Err(e),
250        }
251    }
252
253    async fn event_loop(&mut self) -> Result<()> {
254        while !self.stop {
255            let msg = self.reader.read_message().await?;
256            self.process(msg).await?;
257        }
258        Ok(())
259    }
260
261    async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
262        match msg.type_id {
263            MSG_SET_CHUNK_SIZE => {
264                if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
265                    self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
266                }
267            }
268            MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
269            MSG_COMMAND_AMF3 => {
270                // AMF3 command payloads are prefixed with a single 0x00 byte then
271                // carry an AMF0 body in practice.
272                self.handle_command(&msg.payload[1.min(msg.payload.len())..])
273                    .await?
274            }
275            MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
276            MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
277            MSG_DATA_AMF0 | MSG_DATA_AMF3 => { /* onMetaData — not required for transport */ }
278            MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
279            other => debug!(
280                type_id = other,
281                stream = msg.msg_stream_id,
282                "ignoring RTMP message"
283            ),
284        }
285        Ok(())
286    }
287
288    // ── Command channel (AMF0) ──────────────────────────────────────────────
289
290    async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
291        let values = amf::decode_all(payload);
292        let Some(name) = values.first().and_then(Amf0Value::as_str) else {
293            return Ok(());
294        };
295        let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
296        match name {
297            "connect" => self.on_connect(txn, values.get(2)).await,
298            "createStream" => self.on_create_stream(txn).await,
299            "publish" => self.on_publish(values.get(3)).await,
300            "play" => self.on_play(values.get(3)).await,
301            "releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
302            | "FCSubscribe" => Ok(()),
303            other => {
304                debug!(command = other, "unhandled RTMP command");
305                Ok(())
306            }
307        }
308    }
309
310    async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
311        let app = cmd_obj
312            .and_then(|o| o.get("app"))
313            .and_then(Amf0Value::as_str)
314            .unwrap_or("")
315            .split('?')
316            .next()
317            .unwrap_or("")
318            .to_string();
319        self.app = Some(app);
320
321        // Window Ack Size, Set Peer Bandwidth, Set Chunk Size — protocol prelude.
322        self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
323            .await?;
324        let mut bw = Vec::from(2_500_000u32.to_be_bytes());
325        bw.push(2); // limit type: dynamic
326        self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
327        self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
328            .await?;
329        self.writer.set_chunk_size(OUT_CHUNK_SIZE);
330
331        let props = amf::object(vec![
332            ("fmsVer", amf::string("FMS/3,0,1,123")),
333            ("capabilities", Amf0Value::Number(31.0)),
334        ]);
335        let info = amf::object(vec![
336            ("level", amf::string("status")),
337            ("code", amf::string("NetConnection.Connect.Success")),
338            ("description", amf::string("Connection succeeded.")),
339            ("objectEncoding", Amf0Value::Number(0.0)),
340        ]);
341        self.send_command(
342            0,
343            &[amf::string("_result"), Amf0Value::Number(txn), props, info],
344        )
345        .await
346    }
347
348    async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
349        self.send_command(
350            0,
351            &[
352                amf::string("_result"),
353                Amf0Value::Number(txn),
354                Amf0Value::Null,
355                Amf0Value::Number(STREAM_ID as f64),
356            ],
357        )
358        .await
359    }
360
361    async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
362        let key = self.stream_key(name)?;
363        let handle = self.registry.start_publish(&key).await?;
364        info!(stream = %key, "rtmp publish started");
365        self.handle = Some(handle);
366        self.publish_key = Some(key);
367        self.role = Role::Publishing;
368        self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
369            .await
370    }
371
372    async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
373        let key = self.stream_key(name)?;
374        let Some(playback) = self.playback.clone() else {
375            self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
376                .await?;
377            self.stop = true;
378            return Ok(());
379        };
380        let handle = playback.get_stream(&key)?;
381        info!(stream = %key, "rtmp play started");
382        self.role = Role::Playing;
383
384        // StreamBegin user-control event, then the play status events.
385        let mut begin = Vec::from(0u16.to_be_bytes()); // event type 0 = StreamBegin
386        begin.extend_from_slice(&STREAM_ID.to_be_bytes());
387        self.send_control(MSG_USER_CONTROL, &begin).await?;
388        self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
389            .await?;
390        self.send_status("status", "NetStream.Play.Start", "Started playing.")
391            .await?;
392
393        self.serve_play(handle).await?;
394        self.stop = true;
395        Ok(())
396    }
397
398    // ── Publish: FLV tag → MediaFrame ───────────────────────────────────────
399
400    fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
401        let Some(header) = flv::parse_video_header(&msg.payload) else {
402            return Ok(()); // non-AVC video, ignored
403        };
404        let body = &msg.payload[header.body_offset..];
405        let ts = msg.timestamp as i64;
406
407        if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
408            let cfg = flv::AvcConfig::parse(body)
409                .ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
410            let mut frame =
411                MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
412            frame.flags |= crate::FrameFlags::CONFIG;
413            self.avc = Some(cfg);
414            self.publish(frame);
415        } else if header.avc_packet_type == flv::PKT_RAW {
416            let Some(cfg) = self.avc.as_ref() else {
417                return Ok(()); // NALUs before the sequence header; drop
418            };
419            let annexb = cfg
420                .avcc_to_annexb(body, header.is_keyframe)
421                .ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
422            let pts = ts + header.composition_time as i64;
423            let frame =
424                MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
425            self.publish(frame);
426        }
427        Ok(())
428    }
429
430    fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
431        let payload = &msg.payload;
432        let Some(&b0) = payload.first() else {
433            return Ok(());
434        };
435        if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
436            return Ok(()); // non-AAC audio, ignored
437        }
438        let aac_packet_type = *payload.get(1).unwrap_or(&0);
439        let body = &payload[2.min(payload.len())..];
440        let ts = msg.timestamp as i64;
441
442        if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
443            let cfg = flv::AudioConfig::parse(body)
444                .ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
445            self.aac = Some(cfg);
446            let mut frame =
447                MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
448            frame.flags |= crate::FrameFlags::CONFIG;
449            self.publish(frame);
450        } else if let Some(cfg) = self.aac.as_ref() {
451            let adts = cfg.to_adts(body);
452            self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
453        }
454        Ok(())
455    }
456
457    fn publish(&self, frame: MediaFrame) {
458        if let Some(handle) = self.handle.as_ref() {
459            let _ = handle.publish_frame(frame);
460        }
461    }
462
463    // ── Play: MediaFrame → FLV tag ──────────────────────────────────────────
464
465    async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
466        // Instant start: replay cached configs + current GOP, then go live.
467        for frame in handle.replay_buffer() {
468            self.send_media(&frame).await?;
469        }
470        let mut sub = handle.subscribe_resilient();
471        while let Some(frame) = sub.recv().await {
472            self.send_media(&frame).await?;
473        }
474        Ok(())
475    }
476
477    async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
478        let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
479        match frame.codec {
480            crate::CodecId::H264 => {
481                let ts = frame.dts.max(0) as u32;
482                let tag = if is_config {
483                    let cfg = annexb_to_avc_config(&frame.data);
484                    flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
485                } else {
486                    let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
487                    let cts = (frame.pts - frame.dts) as i32;
488                    flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
489                };
490                self.writer
491                    .write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
492                    .await
493            }
494            crate::CodecId::AAC => {
495                let ts = frame.pts.max(0) as u32;
496                if is_config {
497                    self.audio_seq_sent = true;
498                    let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
499                    return self
500                        .writer
501                        .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
502                        .await;
503                }
504                // A play client needs the AAC sequence header before raw frames.
505                // If the source provided no config frame, synthesize one from the
506                // ADTS header so egress is self-sufficient from any AAC source.
507                if !self.audio_seq_sent {
508                    if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
509                        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
510                        self.writer
511                            .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
512                            .await?;
513                        self.audio_seq_sent = true;
514                    }
515                }
516                // Engine stores ADTS; strip the 7-byte header back to raw AAC.
517                let raw = frame.data.get(7..).unwrap_or(&[]);
518                let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
519                self.writer
520                    .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
521                    .await
522            }
523            _ => Ok(()), // only H.264 + AAC are carried over RTMP
524        }
525    }
526
527    // ── Low-level senders ───────────────────────────────────────────────────
528
529    /// Resolve the [`StreamKey`] for this session from a publish/play stream name.
530    fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
531        let app = self
532            .app
533            .clone()
534            .ok_or_else(|| StreamError::protocol("stream command before connect"))?;
535        let stream = name
536            .and_then(Amf0Value::as_str)
537            .ok_or_else(|| StreamError::protocol("missing stream name"))?
538            .split('?')
539            .next()
540            .unwrap_or("")
541            .to_string();
542        Ok(StreamKey::new(app, stream))
543    }
544
545    async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
546        self.writer
547            .write_message(CSID_CONTROL, type_id, 0, 0, payload)
548            .await
549    }
550
551    async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
552        let mut buf = bytes::BytesMut::new();
553        for v in values {
554            v.encode(&mut buf);
555        }
556        self.writer
557            .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
558            .await
559    }
560
561    async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
562        let info = amf::object(vec![
563            ("level", amf::string(level)),
564            ("code", amf::string(code)),
565            ("description", amf::string(description)),
566        ]);
567        self.send_command(
568            STREAM_ID,
569            &[
570                amf::string("onStatus"),
571                Amf0Value::Number(0.0),
572                Amf0Value::Null,
573                info,
574            ],
575        )
576        .await
577    }
578}
579
580/// Split an Annex-B SPS/PPS access unit into an [`flv::AvcConfig`] for egress.
581fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
582    let mut cfg = flv::AvcConfig {
583        nal_length_size: 4,
584        ..Default::default()
585    };
586    for nal in crate::codec::h264::iter_nals_annexb(annexb) {
587        match nal.first().map(|b| b & 0x1F) {
588            Some(7) => cfg.sps.push(nal.to_vec()),
589            Some(8) => cfg.pps.push(nal.to_vec()),
590            _ => {}
591        }
592    }
593    cfg
594}
595
596/// Whether an error is a benign peer disconnect (vs. a real protocol fault).
597fn is_disconnect(e: &StreamError) -> bool {
598    matches!(e, StreamError::Io(io) if matches!(
599        io.kind(),
600        std::io::ErrorKind::UnexpectedEof
601            | std::io::ErrorKind::ConnectionReset
602            | std::io::ErrorKind::BrokenPipe
603            | std::io::ErrorKind::ConnectionAborted
604    ))
605}
606
607#[cfg(test)]
608mod tests {
609    use super::*;
610    use crate::{AppSpec, Engine};
611    use bytes::Bytes;
612
613    /// A keyframe AVCC NAL: 4-byte length + IDR slice bytes.
614    fn avcc_idr() -> Vec<u8> {
615        let slice = [0x65u8, 0x88, 0x84, 0x00];
616        let mut v = (slice.len() as u32).to_be_bytes().to_vec();
617        v.extend_from_slice(&slice);
618        v
619    }
620
621    fn avc_decoder_config() -> Vec<u8> {
622        let sps = [0x67u8, 0x42, 0x00, 0x1F, 0xAA];
623        let pps = [0x68u8, 0xCE, 0x3C, 0x80];
624        let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
625        r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
626        r.extend_from_slice(&sps);
627        r.push(1);
628        r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
629        r.extend_from_slice(&pps);
630        r
631    }
632
    /// Build a session whose writer discards output and whose reader is empty,
    /// publishing into a freshly registered engine stream.
    ///
    /// The publish is started directly on the engine for `live/cam`, and the
    /// session is put into the `Publishing` role by setting its fields, so no
    /// RTMP commands are exchanged. `publish_key` is left unset on the session.
    async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(64))
            .build();
        let key = StreamKey::new("live", "cam");
        let handle = engine.start_publish(&key).await.unwrap();

        // The engine serves as both the publish and the playback registry.
        let mut session = Session::new(
            ChunkReader::new(tokio::io::empty()),
            ChunkWriter::new(tokio::io::sink()),
            engine.clone(),
            Some(engine.clone()),
        );
        session.app = Some("live".into());
        session.handle = Some(handle);
        session.role = Role::Publishing;
        (session, engine)
    }
653
654    #[tokio::test]
655    async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
656        let (mut session, _engine) = publishing_session().await;
657
658        // 1) AVC sequence header → cached video config (Annex-B SPS/PPS).
659        let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
660        session
661            .on_video(&RtmpMessage {
662                type_id: MSG_VIDEO,
663                timestamp: 0,
664                msg_stream_id: STREAM_ID,
665                payload: seq,
666            })
667            .unwrap();
668
669        // 2) Keyframe NALU → GOP-anchoring frame, self-contained with SPS/PPS.
670        let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
671        session
672            .on_video(&RtmpMessage {
673                type_id: MSG_VIDEO,
674                timestamp: 40,
675                msg_stream_id: STREAM_ID,
676                payload: kf,
677            })
678            .unwrap();
679
680        let handle = session.handle.as_ref().unwrap();
681        let (vcfg, _) = handle.cached_configs();
682        let vcfg = vcfg.expect("video config cached");
683        assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
684        // Config is Annex-B SPS then PPS.
685        assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);
686
687        // The keyframe is present in the replay buffer and starts with SPS (params
688        // were prepended so a player can start decoding immediately).
689        let replay = handle.replay_buffer();
690        let key = replay
691            .iter()
692            .find(|f| f.is_keyframe())
693            .expect("keyframe in GOP");
694        assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
695    }
696
    /// Feeds an AAC sequence header and one raw AAC packet through `on_audio` and
    /// checks that the cached audio config equals the raw AudioSpecificConfig.
    ///
    /// NOTE(review): despite the name, nothing here asserts on the ADTS frame
    /// produced for the raw packet — only the cached config is checked. Consider
    /// asserting on the published frame as well.
    #[tokio::test]
    async fn publish_aac_builds_adts_frames() {
        let (mut session, _engine) = publishing_session().await;
        // AudioSpecificConfig: AAC-LC, 44.1 kHz, stereo → bytes 0x12 0x10.
        let asc = [0x12u8, 0x10];
        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
        session
            .on_audio(&RtmpMessage {
                type_id: MSG_AUDIO,
                timestamp: 0,
                msg_stream_id: STREAM_ID,
                payload: seq,
            })
            .unwrap();

        // A raw packet after the sequence header takes the ADTS path in `on_audio`.
        let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
        session
            .on_audio(&RtmpMessage {
                type_id: MSG_AUDIO,
                timestamp: 23,
                msg_stream_id: STREAM_ID,
                payload: raw,
            })
            .unwrap();

        // Subscribe after the fact: the cached audio config is the raw ASC.
        let handle = session.handle.as_ref().unwrap();
        let (_, acfg) = handle.cached_configs();
        assert_eq!(&acfg.unwrap().data[..], &asc);
    }
727
    /// End-to-end publish over a loopback TCP socket: a client writes `connect`,
    /// `createStream`, `publish`, an AVC sequence header, and a keyframe through
    /// a real `ChunkWriter`; the server-side `Session` reads them through a real
    /// `ChunkReader`. The RTMP handshake is skipped — the session is driven from
    /// the post-handshake state.
    #[tokio::test]
    async fn full_publish_session_drives_real_chunk_reader() {
        use tokio::io::AsyncReadExt;

        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(64))
            .build();

        // Real TCP loopback so the client can half-close (FIN) its write side while
        // still draining the server's control responses — exactly how OBS/FFmpeg
        // behave. The session's post-handshake event loop is driven directly.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let engine_for_server = engine.clone();
        let server = tokio::spawn(async move {
            let (sock, _) = listener.accept().await.unwrap();
            let (r, w) = sock.into_split();
            // No playback registry: this session only publishes.
            let mut session = Session::new(
                ChunkReader::new(r),
                ChunkWriter::new(w),
                engine_for_server,
                None,
            );
            session.run().await.unwrap(); // EOF on client FIN → Ok
            // `run` clears `publish_key` but leaves `handle` in place, so the
            // stream can still be inspected here.
            let handle = session.handle.as_ref().expect("publish handle");
            let has_config = handle.cached_configs().0.is_some();
            let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
            (has_config, has_keyframe)
        });

        let client = tokio::spawn(async move {
            let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (mut read_half, write_half) = stream.into_split();
            // Drain server→client responses so its writes never fail.
            tokio::spawn(async move {
                let mut buf = [0u8; 4096];
                while let Ok(n) = read_half.read(&mut buf).await {
                    if n == 0 {
                        break;
                    }
                }
            });

            // Encode a sequence of AMF0 values into one command payload.
            let cmd = |values: &[Amf0Value]| {
                let mut b = bytes::BytesMut::new();
                for v in values {
                    v.encode(&mut b);
                }
                b
            };
            let mut w = ChunkWriter::new(write_half);
            let connect = cmd(&[
                amf::string("connect"),
                Amf0Value::Number(1.0),
                amf::object(vec![("app", amf::string("live"))]),
            ]);
            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &connect)
                .await
                .unwrap();
            let create = cmd(&[
                amf::string("createStream"),
                Amf0Value::Number(2.0),
                Amf0Value::Null,
            ]);
            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &create)
                .await
                .unwrap();
            // The stream name is the fourth value, which is what the server reads.
            let publish = cmd(&[
                amf::string("publish"),
                Amf0Value::Number(3.0),
                Amf0Value::Null,
                amf::string("cam"),
                amf::string("live"),
            ]);
            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &publish)
                .await
                .unwrap();

            // Media: AVC sequence header then a keyframe.
            let seq =
                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
            w.write_message(CSID_VIDEO, MSG_VIDEO, 0, STREAM_ID, &seq)
                .await
                .unwrap();
            let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
            w.write_message(CSID_VIDEO, MSG_VIDEO, 40, STREAM_ID, &kf)
                .await
                .unwrap();
            // Dropping the OwnedWriteHalf sends FIN — the server reader sees EOF.
        });

        client.await.unwrap();
        let (has_config, has_keyframe) = server.await.unwrap();
        assert!(has_config, "video config reached the engine over the wire");
        assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
    }
824
825    #[test]
826    fn annexb_config_splits_sps_and_pps() {
827        let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
828        let cfg = annexb_to_avc_config(&annexb);
829        assert_eq!(cfg.sps.len(), 1);
830        assert_eq!(cfg.pps.len(), 1);
831        assert_eq!(cfg.sps[0][0] & 0x1F, 7);
832        assert_eq!(cfg.pps[0][0] & 0x1F, 8);
833    }
834}