Skip to main content

arcly_stream/protocol/rtmp/
mod.rs

1//! A working **RTMP** [`ProtocolHandler`](crate::ProtocolHandler): publish
2//! (ingest) and play (egress) over the real Adobe wire protocol.
3//!
4//! Gated behind the `rtmp` feature. This is not a stub over the generic TCP
5//! accept loop — it performs the simple handshake, reassembles the chunk stream,
6//! decodes AMF0 commands, and bridges FLV audio/video tags to the engine's
7//! lock-free bus:
8//!
9//! * **Publish** — `connect` → `createStream` → `publish` brings up a
10//!   [`StreamHandle`] via [`PublishRegistry::start_publish`]. Incoming AVC NALUs
11//!   are converted to Annex-B (keyframes self-contained with SPS/PPS) and AAC to
12//!   ADTS, then pushed through [`StreamHandle::publish_frame`] — so the GOP cache,
13//!   QoS counters, and any [`Muxer`](crate::packager::Muxer) (e.g.
14//!   [`MpegTsMuxer`](crate::packager::MpegTsMuxer)) work unchanged.
15//! * **Play** — `play` resolves an existing stream, replays the cached configs +
16//!   current GOP for an instant start, then forwards live frames as FLV tags.
17//!
18//! ```no_run
19//! # async fn demo() -> arcly_stream::Result<()> {
20//! use arcly_stream::prelude::*;
21//! use arcly_stream::protocol::rtmp::RtmpHandler;
22//! use std::sync::Arc;
23//! use tokio_util::sync::CancellationToken;
24//!
25//! let engine = Engine::builder()
26//!     .application(AppSpec::new("live").gop_cache(120))
27//!     .build();
28//! // `.with_playback` enables `play`; `Arc<Engine>` coerces to both registry traits.
29//! let handler = RtmpHandler::new("0.0.0.0:1935".parse().unwrap())
30//!     .with_playback(engine.clone());
31//! // Publish to rtmp://host/live/<stream>, play from the same URL.
32//! handler.run(engine, CancellationToken::new()).await
33//! # }
34//! ```
35
36mod amf;
37mod chunk;
38mod flv;
39mod handshake;
40
41use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
42use crate::{MediaFrame, Result, StreamError, StreamKey};
43use amf::Amf0Value;
44use async_trait::async_trait;
45use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
46use std::net::SocketAddr;
47use std::sync::Arc;
48use tokio::net::TcpStream;
49use tokio_util::sync::CancellationToken;
50use tracing::{debug, info, warn};
51
52/// Default RTMP port.
53pub const DEFAULT_RTMP_PORT: u16 = 1935;
54/// Chunk size the server announces to peers (larger than the 128-byte default
55/// reduces per-chunk header overhead on the media path).
56const OUT_CHUNK_SIZE: usize = 4096;
57
58// RTMP message type ids.
59const MSG_SET_CHUNK_SIZE: u8 = 1;
60const MSG_ACK: u8 = 3;
61const MSG_USER_CONTROL: u8 = 4;
62const MSG_WINDOW_ACK_SIZE: u8 = 5;
63const MSG_SET_PEER_BANDWIDTH: u8 = 6;
64const MSG_AUDIO: u8 = 8;
65const MSG_VIDEO: u8 = 9;
66const MSG_DATA_AMF3: u8 = 15;
67const MSG_DATA_AMF0: u8 = 18;
68const MSG_COMMAND_AMF3: u8 = 17;
69const MSG_COMMAND_AMF0: u8 = 20;
70
71// Chunk-stream ids we emit on.
72const CSID_CONTROL: u8 = 2;
73const CSID_COMMAND: u8 = 3;
74const CSID_AUDIO: u8 = 4;
75const CSID_VIDEO: u8 = 6;
76
77/// The message stream id assigned to every published/played stream. RTMP allows
78/// many; one per connection is sufficient for a live origin.
79const STREAM_ID: u32 = 1;
80
81/// RTMP protocol handler. One instance serves an address; each connection is an
82/// independent publish or play session.
83///
84/// Publish (ingest) works through the [`ProtocolHandler`](crate::ProtocolHandler)
85/// contract alone. Play (egress) additionally needs read access to live streams,
86/// which `run` does not provide, so supply a [`PlaybackRegistry`] via
87/// [`with_playback`](Self::with_playback) (the bundled [`Engine`](crate::Engine)
88/// implements both). Without it, `play` requests are answered with a failure status.
89pub struct RtmpHandler {
90    addr: SocketAddr,
91    max_connections: usize,
92    playback: Option<Arc<dyn PlaybackRegistry>>,
93}
94
95impl RtmpHandler {
96    /// Create a handler bound to `addr` (typically `0.0.0.0:1935`).
97    pub fn new(addr: SocketAddr) -> Self {
98        Self {
99            addr,
100            max_connections: 1024,
101            playback: None,
102        }
103    }
104
105    /// Cap the number of concurrent connections (default 1024).
106    pub fn max_connections(mut self, max: usize) -> Self {
107        self.max_connections = max;
108        self
109    }
110
111    /// Enable `play` (egress) by providing a [`PlaybackRegistry`] to resolve live
112    /// streams from. Pass the same engine you publish into.
113    pub fn with_playback(mut self, playback: Arc<dyn PlaybackRegistry>) -> Self {
114        self.playback = Some(playback);
115        self
116    }
117}
118
119#[async_trait]
120impl crate::ProtocolHandler for RtmpHandler {
121    fn name(&self) -> &'static str {
122        "rtmp"
123    }
124
125    async fn run(
126        &self,
127        registry: Arc<dyn PublishRegistry>,
128        shutdown: CancellationToken,
129    ) -> Result<()> {
130        info!(addr = %self.addr, "rtmp handler listening");
131        let playback = self.playback.clone();
132        crate::protocol::run_tcp_ingest_server(
133            self.addr,
134            self.max_connections,
135            shutdown,
136            move |sock, peer| {
137                let registry = Arc::clone(&registry);
138                let playback = playback.clone();
139                async move {
140                    if let Err(e) = handle_connection(sock, peer, registry, playback).await {
141                        warn!(%peer, error = %e, "rtmp session ended with error");
142                    }
143                }
144            },
145        )
146        .await
147    }
148}
149
150/// Handshake, split the socket, and run the session state machine.
151async fn handle_connection(
152    mut sock: TcpStream,
153    peer: SocketAddr,
154    registry: Arc<dyn PublishRegistry>,
155    playback: Option<Arc<dyn PlaybackRegistry>>,
156) -> Result<()> {
157    handshake::accept(&mut sock).await?;
158    debug!(%peer, "rtmp handshake complete");
159    let (read_half, write_half) = sock.into_split();
160    let mut session = Session::new(
161        ChunkReader::new(read_half),
162        ChunkWriter::new(write_half),
163        registry,
164        playback,
165    );
166    session.run().await
167}
168
169/// The decoded role of a session, determined by the first `publish`/`play` command.
170#[derive(PartialEq)]
171enum Role {
172    Pending,
173    Publishing,
174    Playing,
175}
176
177/// Per-connection RTMP session state.
178struct Session<R, W> {
179    reader: ChunkReader<R>,
180    writer: ChunkWriter<W>,
181    registry: Arc<dyn PublishRegistry>,
182    playback: Option<Arc<dyn PlaybackRegistry>>,
183    app: Option<String>,
184    role: Role,
185    publish_key: Option<StreamKey>,
186    handle: Option<StreamHandle>,
187    avc: Option<flv::AvcConfig>,
188    aac: Option<flv::AudioConfig>,
189    /// Egress: whether an AAC sequence header has been sent to a play client yet.
190    audio_seq_sent: bool,
191    stop: bool,
192}
193
194impl<R, W> Session<R, W>
195where
196    R: tokio::io::AsyncRead + Unpin,
197    W: tokio::io::AsyncWrite + Unpin,
198{
199    fn new(
200        reader: ChunkReader<R>,
201        writer: ChunkWriter<W>,
202        registry: Arc<dyn PublishRegistry>,
203        playback: Option<Arc<dyn PlaybackRegistry>>,
204    ) -> Self {
205        Self {
206            reader,
207            writer,
208            registry,
209            playback,
210            app: None,
211            role: Role::Pending,
212            publish_key: None,
213            handle: None,
214            avc: None,
215            aac: None,
216            audio_seq_sent: false,
217            stop: false,
218        }
219    }
220
221    /// Drive the session to completion, releasing the publish slot on exit.
222    async fn run(&mut self) -> Result<()> {
223        let result = self.event_loop().await;
224        if let Some(key) = self.publish_key.take() {
225            let _ = self.registry.end_publish(&key).await;
226        }
227        match result {
228            Ok(()) => Ok(()),
229            Err(e) if is_disconnect(&e) => Ok(()),
230            Err(e) => Err(e),
231        }
232    }
233
234    async fn event_loop(&mut self) -> Result<()> {
235        while !self.stop {
236            let msg = self.reader.read_message().await?;
237            self.process(msg).await?;
238        }
239        Ok(())
240    }
241
242    async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
243        match msg.type_id {
244            MSG_SET_CHUNK_SIZE => {
245                if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
246                    self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
247                }
248            }
249            MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
250            MSG_COMMAND_AMF3 => {
251                // AMF3 command payloads are prefixed with a single 0x00 byte then
252                // carry an AMF0 body in practice.
253                self.handle_command(&msg.payload[1.min(msg.payload.len())..])
254                    .await?
255            }
256            MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
257            MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
258            MSG_DATA_AMF0 | MSG_DATA_AMF3 => { /* onMetaData — not required for transport */ }
259            MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
260            other => debug!(
261                type_id = other,
262                stream = msg.msg_stream_id,
263                "ignoring RTMP message"
264            ),
265        }
266        Ok(())
267    }
268
269    // ── Command channel (AMF0) ──────────────────────────────────────────────
270
271    async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
272        let values = amf::decode_all(payload);
273        let Some(name) = values.first().and_then(Amf0Value::as_str) else {
274            return Ok(());
275        };
276        let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
277        match name {
278            "connect" => self.on_connect(txn, values.get(2)).await,
279            "createStream" => self.on_create_stream(txn).await,
280            "publish" => self.on_publish(values.get(3)).await,
281            "play" => self.on_play(values.get(3)).await,
282            "releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
283            | "FCSubscribe" => Ok(()),
284            other => {
285                debug!(command = other, "unhandled RTMP command");
286                Ok(())
287            }
288        }
289    }
290
291    async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
292        let app = cmd_obj
293            .and_then(|o| o.get("app"))
294            .and_then(Amf0Value::as_str)
295            .unwrap_or("")
296            .split('?')
297            .next()
298            .unwrap_or("")
299            .to_string();
300        self.app = Some(app);
301
302        // Window Ack Size, Set Peer Bandwidth, Set Chunk Size — protocol prelude.
303        self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
304            .await?;
305        let mut bw = Vec::from(2_500_000u32.to_be_bytes());
306        bw.push(2); // limit type: dynamic
307        self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
308        self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
309            .await?;
310        self.writer.set_chunk_size(OUT_CHUNK_SIZE);
311
312        let props = amf::object(vec![
313            ("fmsVer", amf::string("FMS/3,0,1,123")),
314            ("capabilities", Amf0Value::Number(31.0)),
315        ]);
316        let info = amf::object(vec![
317            ("level", amf::string("status")),
318            ("code", amf::string("NetConnection.Connect.Success")),
319            ("description", amf::string("Connection succeeded.")),
320            ("objectEncoding", Amf0Value::Number(0.0)),
321        ]);
322        self.send_command(
323            0,
324            &[amf::string("_result"), Amf0Value::Number(txn), props, info],
325        )
326        .await
327    }
328
329    async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
330        self.send_command(
331            0,
332            &[
333                amf::string("_result"),
334                Amf0Value::Number(txn),
335                Amf0Value::Null,
336                Amf0Value::Number(STREAM_ID as f64),
337            ],
338        )
339        .await
340    }
341
342    async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
343        let key = self.stream_key(name)?;
344        let handle = self.registry.start_publish(&key).await?;
345        info!(stream = %key, "rtmp publish started");
346        self.handle = Some(handle);
347        self.publish_key = Some(key);
348        self.role = Role::Publishing;
349        self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
350            .await
351    }
352
353    async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
354        let key = self.stream_key(name)?;
355        let Some(playback) = self.playback.clone() else {
356            self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
357                .await?;
358            self.stop = true;
359            return Ok(());
360        };
361        let handle = playback.get_stream(&key)?;
362        info!(stream = %key, "rtmp play started");
363        self.role = Role::Playing;
364
365        // StreamBegin user-control event, then the play status events.
366        let mut begin = Vec::from(0u16.to_be_bytes()); // event type 0 = StreamBegin
367        begin.extend_from_slice(&STREAM_ID.to_be_bytes());
368        self.send_control(MSG_USER_CONTROL, &begin).await?;
369        self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
370            .await?;
371        self.send_status("status", "NetStream.Play.Start", "Started playing.")
372            .await?;
373
374        self.serve_play(handle).await?;
375        self.stop = true;
376        Ok(())
377    }
378
379    // ── Publish: FLV tag → MediaFrame ───────────────────────────────────────
380
381    fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
382        let Some(header) = flv::parse_video_header(&msg.payload) else {
383            return Ok(()); // non-AVC video, ignored
384        };
385        let body = &msg.payload[header.body_offset..];
386        let ts = msg.timestamp as i64;
387
388        if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
389            let cfg = flv::AvcConfig::parse(body)
390                .ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
391            let mut frame =
392                MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
393            frame.flags |= crate::FrameFlags::CONFIG;
394            self.avc = Some(cfg);
395            self.publish(frame);
396        } else if header.avc_packet_type == flv::PKT_RAW {
397            let Some(cfg) = self.avc.as_ref() else {
398                return Ok(()); // NALUs before the sequence header; drop
399            };
400            let annexb = cfg
401                .avcc_to_annexb(body, header.is_keyframe)
402                .ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
403            let pts = ts + header.composition_time as i64;
404            let frame =
405                MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
406            self.publish(frame);
407        }
408        Ok(())
409    }
410
411    fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
412        let payload = &msg.payload;
413        let Some(&b0) = payload.first() else {
414            return Ok(());
415        };
416        if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
417            return Ok(()); // non-AAC audio, ignored
418        }
419        let aac_packet_type = *payload.get(1).unwrap_or(&0);
420        let body = &payload[2.min(payload.len())..];
421        let ts = msg.timestamp as i64;
422
423        if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
424            let cfg = flv::AudioConfig::parse(body)
425                .ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
426            self.aac = Some(cfg);
427            let mut frame =
428                MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
429            frame.flags |= crate::FrameFlags::CONFIG;
430            self.publish(frame);
431        } else if let Some(cfg) = self.aac.as_ref() {
432            let adts = cfg.to_adts(body);
433            self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
434        }
435        Ok(())
436    }
437
438    fn publish(&self, frame: MediaFrame) {
439        if let Some(handle) = self.handle.as_ref() {
440            let _ = handle.publish_frame(frame);
441        }
442    }
443
444    // ── Play: MediaFrame → FLV tag ──────────────────────────────────────────
445
446    async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
447        // Instant start: replay cached configs + current GOP, then go live.
448        for frame in handle.replay_buffer() {
449            self.send_media(&frame).await?;
450        }
451        let mut sub = handle.subscribe_resilient();
452        while let Some(frame) = sub.recv().await {
453            self.send_media(&frame).await?;
454        }
455        Ok(())
456    }
457
458    async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
459        let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
460        match frame.codec {
461            crate::CodecId::H264 => {
462                let ts = frame.dts.max(0) as u32;
463                let tag = if is_config {
464                    let cfg = annexb_to_avc_config(&frame.data);
465                    flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
466                } else {
467                    let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
468                    let cts = (frame.pts - frame.dts) as i32;
469                    flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
470                };
471                self.writer
472                    .write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
473                    .await
474            }
475            crate::CodecId::AAC => {
476                let ts = frame.pts.max(0) as u32;
477                if is_config {
478                    self.audio_seq_sent = true;
479                    let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
480                    return self
481                        .writer
482                        .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
483                        .await;
484                }
485                // A play client needs the AAC sequence header before raw frames.
486                // If the source provided no config frame, synthesize one from the
487                // ADTS header so egress is self-sufficient from any AAC source.
488                if !self.audio_seq_sent {
489                    if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
490                        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
491                        self.writer
492                            .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
493                            .await?;
494                        self.audio_seq_sent = true;
495                    }
496                }
497                // Engine stores ADTS; strip the 7-byte header back to raw AAC.
498                let raw = frame.data.get(7..).unwrap_or(&[]);
499                let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
500                self.writer
501                    .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
502                    .await
503            }
504            _ => Ok(()), // only H.264 + AAC are carried over RTMP
505        }
506    }
507
508    // ── Low-level senders ───────────────────────────────────────────────────
509
510    /// Resolve the [`StreamKey`] for this session from a publish/play stream name.
511    fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
512        let app = self
513            .app
514            .clone()
515            .ok_or_else(|| StreamError::protocol("stream command before connect"))?;
516        let stream = name
517            .and_then(Amf0Value::as_str)
518            .ok_or_else(|| StreamError::protocol("missing stream name"))?
519            .split('?')
520            .next()
521            .unwrap_or("")
522            .to_string();
523        Ok(StreamKey::new(app, stream))
524    }
525
526    async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
527        self.writer
528            .write_message(CSID_CONTROL, type_id, 0, 0, payload)
529            .await
530    }
531
532    async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
533        let mut buf = bytes::BytesMut::new();
534        for v in values {
535            v.encode(&mut buf);
536        }
537        self.writer
538            .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
539            .await
540    }
541
542    async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
543        let info = amf::object(vec![
544            ("level", amf::string(level)),
545            ("code", amf::string(code)),
546            ("description", amf::string(description)),
547        ]);
548        self.send_command(
549            STREAM_ID,
550            &[
551                amf::string("onStatus"),
552                Amf0Value::Number(0.0),
553                Amf0Value::Null,
554                info,
555            ],
556        )
557        .await
558    }
559}
560
561/// Split an Annex-B SPS/PPS access unit into an [`flv::AvcConfig`] for egress.
562fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
563    let mut cfg = flv::AvcConfig {
564        nal_length_size: 4,
565        ..Default::default()
566    };
567    for nal in crate::codec::h264::iter_nals_annexb(annexb) {
568        match nal.first().map(|b| b & 0x1F) {
569            Some(7) => cfg.sps.push(nal.to_vec()),
570            Some(8) => cfg.pps.push(nal.to_vec()),
571            _ => {}
572        }
573    }
574    cfg
575}
576
577/// Whether an error is a benign peer disconnect (vs. a real protocol fault).
578fn is_disconnect(e: &StreamError) -> bool {
579    matches!(e, StreamError::Io(io) if matches!(
580        io.kind(),
581        std::io::ErrorKind::UnexpectedEof
582            | std::io::ErrorKind::ConnectionReset
583            | std::io::ErrorKind::BrokenPipe
584            | std::io::ErrorKind::ConnectionAborted
585    ))
586}
587
588#[cfg(test)]
589mod tests {
590    use super::*;
591    use crate::{AppSpec, Engine};
592    use bytes::Bytes;
593
594    /// A keyframe AVCC NAL: 4-byte length + IDR slice bytes.
595    fn avcc_idr() -> Vec<u8> {
596        let slice = [0x65u8, 0x88, 0x84, 0x00];
597        let mut v = (slice.len() as u32).to_be_bytes().to_vec();
598        v.extend_from_slice(&slice);
599        v
600    }
601
602    fn avc_decoder_config() -> Vec<u8> {
603        let sps = [0x67u8, 0x42, 0x00, 0x1F, 0xAA];
604        let pps = [0x68u8, 0xCE, 0x3C, 0x80];
605        let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
606        r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
607        r.extend_from_slice(&sps);
608        r.push(1);
609        r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
610        r.extend_from_slice(&pps);
611        r
612    }
613
614    /// Build a session whose writer discards output and whose reader is empty,
615    /// publishing into a freshly registered engine stream.
616    async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
617        let engine = Engine::builder()
618            .application(AppSpec::new("live").gop_cache(64))
619            .build();
620        let key = StreamKey::new("live", "cam");
621        let handle = engine.start_publish(&key).await.unwrap();
622
623        let mut session = Session::new(
624            ChunkReader::new(tokio::io::empty()),
625            ChunkWriter::new(tokio::io::sink()),
626            engine.clone(),
627            Some(engine.clone()),
628        );
629        session.app = Some("live".into());
630        session.handle = Some(handle);
631        session.role = Role::Publishing;
632        (session, engine)
633    }
634
635    #[tokio::test]
636    async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
637        let (mut session, _engine) = publishing_session().await;
638
639        // 1) AVC sequence header → cached video config (Annex-B SPS/PPS).
640        let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
641        session
642            .on_video(&RtmpMessage {
643                type_id: MSG_VIDEO,
644                timestamp: 0,
645                msg_stream_id: STREAM_ID,
646                payload: seq,
647            })
648            .unwrap();
649
650        // 2) Keyframe NALU → GOP-anchoring frame, self-contained with SPS/PPS.
651        let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
652        session
653            .on_video(&RtmpMessage {
654                type_id: MSG_VIDEO,
655                timestamp: 40,
656                msg_stream_id: STREAM_ID,
657                payload: kf,
658            })
659            .unwrap();
660
661        let handle = session.handle.as_ref().unwrap();
662        let (vcfg, _) = handle.cached_configs();
663        let vcfg = vcfg.expect("video config cached");
664        assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
665        // Config is Annex-B SPS then PPS.
666        assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);
667
668        // The keyframe is present in the replay buffer and starts with SPS (params
669        // were prepended so a player can start decoding immediately).
670        let replay = handle.replay_buffer();
671        let key = replay
672            .iter()
673            .find(|f| f.is_keyframe())
674            .expect("keyframe in GOP");
675        assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
676    }
677
678    #[tokio::test]
679    async fn publish_aac_builds_adts_frames() {
680        let (mut session, _engine) = publishing_session().await;
681        // AudioSpecificConfig: AAC-LC, 44.1 kHz, stereo → bytes 0x12 0x10.
682        let asc = [0x12u8, 0x10];
683        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
684        session
685            .on_audio(&RtmpMessage {
686                type_id: MSG_AUDIO,
687                timestamp: 0,
688                msg_stream_id: STREAM_ID,
689                payload: seq,
690            })
691            .unwrap();
692
693        let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
694        session
695            .on_audio(&RtmpMessage {
696                type_id: MSG_AUDIO,
697                timestamp: 23,
698                msg_stream_id: STREAM_ID,
699                payload: raw,
700            })
701            .unwrap();
702
703        // Subscribe after the fact: the cached audio config is the raw ASC.
704        let handle = session.handle.as_ref().unwrap();
705        let (_, acfg) = handle.cached_configs();
706        assert_eq!(&acfg.unwrap().data[..], &asc);
707    }
708
709    #[tokio::test]
710    async fn full_publish_session_drives_real_chunk_reader() {
711        use tokio::io::AsyncReadExt;
712
713        let engine = Engine::builder()
714            .application(AppSpec::new("live").gop_cache(64))
715            .build();
716
717        // Real TCP loopback so the client can half-close (FIN) its write side while
718        // still draining the server's control responses — exactly how OBS/FFmpeg
719        // behave. The session's post-handshake event loop is driven directly.
720        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
721        let addr = listener.local_addr().unwrap();
722        let engine_for_server = engine.clone();
723        let server = tokio::spawn(async move {
724            let (sock, _) = listener.accept().await.unwrap();
725            let (r, w) = sock.into_split();
726            let mut session = Session::new(
727                ChunkReader::new(r),
728                ChunkWriter::new(w),
729                engine_for_server,
730                None,
731            );
732            session.run().await.unwrap(); // EOF on client FIN → Ok
733            let handle = session.handle.as_ref().expect("publish handle");
734            let has_config = handle.cached_configs().0.is_some();
735            let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
736            (has_config, has_keyframe)
737        });
738
739        let client = tokio::spawn(async move {
740            let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
741            let (mut read_half, write_half) = stream.into_split();
742            // Drain server→client responses so its writes never fail.
743            tokio::spawn(async move {
744                let mut buf = [0u8; 4096];
745                while let Ok(n) = read_half.read(&mut buf).await {
746                    if n == 0 {
747                        break;
748                    }
749                }
750            });
751
752            let cmd = |values: &[Amf0Value]| {
753                let mut b = bytes::BytesMut::new();
754                for v in values {
755                    v.encode(&mut b);
756                }
757                b
758            };
759            let mut w = ChunkWriter::new(write_half);
760            let connect = cmd(&[
761                amf::string("connect"),
762                Amf0Value::Number(1.0),
763                amf::object(vec![("app", amf::string("live"))]),
764            ]);
765            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &connect)
766                .await
767                .unwrap();
768            let create = cmd(&[
769                amf::string("createStream"),
770                Amf0Value::Number(2.0),
771                Amf0Value::Null,
772            ]);
773            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &create)
774                .await
775                .unwrap();
776            let publish = cmd(&[
777                amf::string("publish"),
778                Amf0Value::Number(3.0),
779                Amf0Value::Null,
780                amf::string("cam"),
781                amf::string("live"),
782            ]);
783            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &publish)
784                .await
785                .unwrap();
786
787            // Media: AVC sequence header then a keyframe.
788            let seq =
789                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
790            w.write_message(CSID_VIDEO, MSG_VIDEO, 0, STREAM_ID, &seq)
791                .await
792                .unwrap();
793            let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
794            w.write_message(CSID_VIDEO, MSG_VIDEO, 40, STREAM_ID, &kf)
795                .await
796                .unwrap();
797            // Dropping the OwnedWriteHalf sends FIN — the server reader sees EOF.
798        });
799
800        client.await.unwrap();
801        let (has_config, has_keyframe) = server.await.unwrap();
802        assert!(has_config, "video config reached the engine over the wire");
803        assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
804    }
805
806    #[test]
807    fn annexb_config_splits_sps_and_pps() {
808        let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
809        let cfg = annexb_to_avc_config(&annexb);
810        assert_eq!(cfg.sps.len(), 1);
811        assert_eq!(cfg.pps.len(), 1);
812        assert_eq!(cfg.sps[0][0] & 0x1F, 7);
813        assert_eq!(cfg.pps[0][0] & 0x1F, 8);
814    }
815}