Skip to main content

oxideav_rtmp/
client.rs

1//! RTMP client: push a live stream to a remote RTMP server.
2//!
3//! ```text
4//!   let mut client = RtmpClient::connect("rtmp://remote/live/key")?;
5//!   client.send_video_sequence_header(&avcc_bytes)?;
6//!   client.send_audio_sequence_header(&aac_config)?;
7//!   loop {
8//!       client.send_video(ts_ms, keyframe, &nalu_bytes)?;
9//!       client.send_audio(ts_ms, &aac_frame)?;
10//!   }
11//! ```
12//!
13//! This crate emits one H.264 NAL per video call (no re-fragmentation
14//! into AVCC length-prefixed packets beyond the single-NAL case).
15//! Callers with multiple NALUs per sample can concatenate them into
16//! one body — RTMP just forwards bytes on the video channel.
17
18use std::collections::VecDeque;
19use std::io::{Read, Write};
20use std::net::{Shutdown, TcpStream, ToSocketAddrs};
21use std::time::Duration;
22
23use crate::aggregate::parse_aggregate;
24use crate::amf::{self, Amf0Value};
25use crate::amf3;
26use crate::caps::ConnectCapabilities;
27use crate::chunk::{ChunkReader, ChunkWriter, Message};
28use crate::error::{Error, Result};
29use crate::flv::{self, AudioTag, VideoTag};
30use crate::message::*;
31
/// Server-originated event observed by an [`RtmpClient`] in publish
/// mode, as returned by [`RtmpClient::poll_event`].
///
/// During an active publish the client mostly writes audio / video /
/// data and the server stays mostly silent — but a few server→client
/// notifications matter end-to-end. The most important is
/// [`StreamEof`](Self::StreamEof): the server signalling, per RTMP 1.0
/// §7.1.7, that "the stream is dry, no more data will be sent without
/// additional commands." A symmetric publish-side server uses the same
/// `UserControl StreamEOF` event to mark end-of-publish before closing
/// the TCP write half — and the client should treat that as a clean
/// stream end rather than as an unexpected FIN.
///
/// Only `PartialEq` is derived (no `Eq`): the transaction ids carried
/// by [`Result`](Self::Result) / [`ErrorReply`](Self::ErrorReply) are
/// `f64`, so a NaN id never compares equal to itself.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientEvent {
    /// The server emitted `UserControl StreamBegin(stream_id)`
    /// (UCM event type 0). Informational — most servers send this once
    /// right after `createStream` succeeds.
    StreamBegin { stream_id: u32 },
    /// The server emitted `UserControl StreamEOF(stream_id)`
    /// (UCM event type 1). End-of-stream from the server side. After
    /// observing this, the caller should stop writing and shut the
    /// client down via [`RtmpClient::close`].
    StreamEof { stream_id: u32 },
    /// The server emitted `UserControl StreamDry(stream_id)`
    /// (UCM event type 2). Per RTMP 1.0 §3.7, the server uses this to
    /// notify the client "that there is no more data on the stream. If
    /// the server does not detect any message for a time period, it
    /// can notify the subscribed clients that the stream is dry."
    /// Distinct from [`StreamEof`](Self::StreamEof): `StreamDry` is a
    /// "no data right now" signal that may resolve once more data
    /// arrives; `StreamEof` is "playback finished, no more without
    /// further commands."
    StreamDry { stream_id: u32 },
    /// The server emitted `UserControl StreamIsRecorded(stream_id)`
    /// (UCM event type 4). Per RTMP 1.0 §3.7, "the server sends this
    /// event to notify the client that the stream is a recorded
    /// stream." Servers typically emit this right after `StreamBegin`
    /// for an on-demand stream; for a publish-only client the event is
    /// informational and usually ignored.
    StreamIsRecorded { stream_id: u32 },
    /// The server emitted `UserControl PingResponse(timestamp_ms)`
    /// (UCM event type 7). Per RTMP 1.0 §3.7, a `PingResponse` is sent
    /// "in response to the ping request. The event data is a 4-byte
    /// timestamp, which was received with the kMsgPingRequest
    /// request." A server that emits `PingResponse` is typically
    /// echoing back our own (publisher-side) `PingRequest` — useful
    /// for measuring round-trip latency. The variant carries the
    /// echoed timestamp verbatim.
    PingResponse { timestamp_ms: u32 },
    /// The server emitted `onStatus(...)` carrying NetStream state.
    /// `level` is typically `"status"` / `"warning"` / `"error"`;
    /// `code` is e.g. `"NetStream.Publish.Start"` /
    /// `"NetStream.Unpublish.Success"` / `"NetStream.Publish.BadName"`.
    OnStatus {
        level: String,
        code: String,
        description: String,
    },
    /// The server emitted
    /// `onStatus(NetConnection.Connect.ReconnectRequest)` — Enhanced
    /// RTMP v2 §"Reconnect Request". The server is asking us to
    /// reconnect, e.g. ahead of a server update or to remap us to a
    /// different server instance.
    ///
    /// Per the spec's message flow, on receipt the client "persists
    /// in streaming to/from the current server up to the next
    /// appropriate media boundary, such as a keyframe. Subsequently,
    /// it establishes a connection with a new server and disconnects
    /// from the old server." So: finish the current GOP, then dial
    /// [`RtmpClient::resolve_reconnect_url`]`(tc_url.as_deref())` with
    /// a fresh [`RtmpClient::connect`] and drop this client.
    ///
    /// `tc_url` is the optional Info-Object property naming where to
    /// reconnect — an absolute (`rtmp://host/app`) or relative
    /// (`//host/app`, `/app`) URI reference. `None` means "use the
    /// tcUrl for the current connection" per spec.
    ReconnectRequest {
        tc_url: Option<String>,
        description: String,
    },
    /// The server emitted `_result(transaction_id, ...)` for a command
    /// the client issued. The publish-time `connect` / `createStream`
    /// transactions are consumed internally by [`RtmpClient::connect`];
    /// any subsequent `_result` (e.g. a custom RPC sent after publish
    /// started) surfaces here so the caller can match it against its
    /// own transaction id.
    Result {
        transaction_id: f64,
        values: Vec<Amf0Value>,
    },
    /// The server emitted `_error(transaction_id, ...)`. Symmetric to
    /// [`Result`](Self::Result) but for the failure path.
    ErrorReply {
        transaction_id: f64,
        values: Vec<Amf0Value>,
    },
    /// Any other server-originated message (ping, ack, set-chunk-size,
    /// bandwidth — most of which the client handles transparently
    /// inside [`RtmpClient::poll_event`] before this variant ever fires).
    /// The variant exists so the caller's `match` arm can keep going.
    ///
    /// `poll_event` does not surface this variant for sub-messages it
    /// unpacked from an Aggregate Message; those are skipped silently.
    Other,
}
134
/// Chunk size announced to the server with a Set Chunk Size message
/// and then applied to the local chunk writer during connection setup.
const CLIENT_CHUNK_SIZE: u32 = 4096;
/// Client version string handed to the `connect` command builder.
const FLASH_VER: &str = "FMLE/3.0 (compatible; oxideav-rtmp)";
137
/// Blocking RTMP publish client over a [`TcpStream`].
///
/// Constructed by [`RtmpClient::connect`] and its siblings, which run
/// the handshake and the connect / createStream / publish exchange
/// before handing the client back.
pub struct RtmpClient {
    /// The socket originally opened for this connection. `reader` and
    /// `writer` each operate on their own `try_clone` of it.
    stream: TcpStream,
    /// Kept around so `recv` helpers (ack, onStatus replies, the
    /// server-side `UserControl StreamEOF` mirror of our own
    /// publish-side teardown) have somewhere to drain the server's
    /// side. Surfaced through [`poll_event`](Self::poll_event).
    reader: ChunkReader<TcpStream>,
    /// Chunk writer used for every outbound message. Its chunk size is
    /// raised to `CLIENT_CHUNK_SIZE` during connection setup.
    writer: ChunkWriter<TcpStream>,
    /// Message stream id obtained from the server's `createStream`
    /// reply; stamped on every audio / video / data message sent.
    stream_id: u32,
    /// Monotonic counter used for AMF command transaction ids.
    /// Initialised to `10.0`, above the fixed ids `1.0`–`5.0` spent
    /// during connection setup.
    next_tx: f64,
    /// Set once we've observed the read half drain (EOF / connection
    /// reset) so subsequent `poll_event` calls return `Ok(None)` rather
    /// than re-entering [`ChunkReader::read_message`] on a dead socket.
    /// Distinct from a `StreamEOF` user-control event — the server
    /// normally sends `StreamEOF` *first* then a trailing onStatus, so
    /// `poll_event` keeps reading until the kernel reports EOF.
    read_eof: bool,
    /// Enhanced RTMP capability block lifted from the server's
    /// `_result(connect)` info object. Empty when the server didn't
    /// advertise any v1+v2 capabilities (the historical pre-2023
    /// shape). Inspect via [`server_capabilities`](Self::server_capabilities).
    server_caps: ConnectCapabilities,
    /// The `tcUrl` this client dialled, kept so an Enhanced RTMP v2
    /// `NetConnection.Connect.ReconnectRequest` whose Info Object
    /// omits `tcUrl` — or names a *relative* URI reference — can be
    /// resolved per spec ("if not specified, use the tcUrl for the
    /// current connection. A relative URI reference should be
    /// resolved relative to the tcUrl for the current connection").
    tc_url: String,
    /// Sub-messages decomposed out of a server-originated Aggregate
    /// Message (type 22) per RTMP 1.0 §7.1.6 but not yet routed
    /// through the [`poll_event`](Self::poll_event) classify path.
    /// In publish mode a remote server rarely batches its replies
    /// this way, but `poll_event` decomposes the aggregate
    /// transparently so a publisher that opted into a server-side
    /// aggregate digest (`@enableEnhancedRTMP`-style negotiation, or
    /// a peer reflecting its own ack stream as an aggregate) still
    /// sees the per-event classification.
    pending_subs: VecDeque<Message>,
}
179
180/// Parsed RTMP URL: `rtmp://host[:port]/app/stream_name`.
181#[derive(Debug, Clone)]
182pub struct RtmpUrl {
183    pub host: String,
184    pub port: u16,
185    pub app: String,
186    pub stream_name: String,
187    pub tc_url: String,
188}
189
190impl RtmpUrl {
191    pub fn parse(url: &str) -> Result<Self> {
192        let s = url
193            .strip_prefix("rtmp://")
194            .ok_or_else(|| Error::Other(format!("not an rtmp:// URL: {url}")))?;
195        // authority/path
196        let slash = s
197            .find('/')
198            .ok_or_else(|| Error::Other("missing /app in rtmp URL".into()))?;
199        let authority = &s[..slash];
200        let path = &s[slash + 1..];
201        let (host, port) = match authority.rsplit_once(':') {
202            Some((h, p)) => (
203                h.to_owned(),
204                p.parse::<u16>()
205                    .map_err(|e| Error::Other(format!("rtmp URL bad port: {e}")))?,
206            ),
207            None => (authority.to_owned(), 1935),
208        };
209        let (app, stream_name) = match path.find('/') {
210            Some(i) => (path[..i].to_owned(), path[i + 1..].to_owned()),
211            None => (path.to_owned(), String::new()),
212        };
213        let tc_url = format!("rtmp://{authority}/{app}");
214        Ok(Self {
215            host,
216            port,
217            app,
218            stream_name,
219            tc_url,
220        })
221    }
222}
223
224impl RtmpClient {
225    /// Dial the given `rtmp://host[:port]/app/stream_name` URL,
226    /// perform the full handshake + connect + createStream + publish
227    /// sequence, and return a ready-to-send client.
228    pub fn connect(url: &str) -> Result<Self> {
229        let parsed = RtmpUrl::parse(url)?;
230        Self::connect_parsed(&parsed, "live", &ConnectCapabilities::default())
231    }
232
233    /// Same as [`connect`](Self::connect) but lets the caller pick the
234    /// RTMP `publish` type (typically `"live"`, `"record"`, or
235    /// `"append"`).
236    pub fn connect_with_type(url: &str, publish_type: &str) -> Result<Self> {
237        let parsed = RtmpUrl::parse(url)?;
238        Self::connect_parsed(&parsed, publish_type, &ConnectCapabilities::default())
239    }
240
241    /// Connect and advertise the supplied Enhanced RTMP v1+v2
242    /// capability block in the NetConnection `connect` command
243    /// (`enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect
244    /// Command"). The block is appended to the legacy Command Object
245    /// in the documented order — peers that don't speak E-RTMP keep
246    /// parsing the message correctly because the extras are tacked on
247    /// after the historical `videoFunction` field.
248    ///
249    /// The server's `_result` properties object is parsed for the same
250    /// set of keys and stashed on the client; retrieve it via
251    /// [`server_capabilities`](Self::server_capabilities) to learn
252    /// which v1+v2 features the peer agreed to support.
253    pub fn connect_with_capabilities(
254        url: &str,
255        publish_type: &str,
256        caps: &ConnectCapabilities,
257    ) -> Result<Self> {
258        let parsed = RtmpUrl::parse(url)?;
259        Self::connect_parsed(&parsed, publish_type, caps)
260    }
261
262    /// Capability block advertised by the server in `_result(connect)`.
263    ///
264    /// Empty when the peer is pre-2023 / unaware of E-RTMP — callers can
265    /// detect that with [`ConnectCapabilities::is_empty`]. Otherwise
266    /// describes which FourCC codecs the server reports it can decode /
267    /// encode / forward, plus the v2 `capsEx` bitfield (Reconnect,
268    /// Multitrack, ModEx, TimestampNanoOffset).
269    pub fn server_capabilities(&self) -> &ConnectCapabilities {
270        &self.server_caps
271    }
272
273    /// The `tcUrl` this client dialled (e.g.
274    /// `rtmp://host:1935/app`) — the base every Enhanced RTMP v2
275    /// reconnect target resolves against.
276    pub fn tc_url(&self) -> &str {
277        &self.tc_url
278    }
279
280    /// Resolve the `tcUrl` carried by an Enhanced RTMP v2
281    /// [`ClientEvent::ReconnectRequest`] into the absolute URL to
282    /// re-dial, per `enhanced-rtmp-v2.pdf` §"Reconnect Request":
283    ///
284    /// * `None` → "use the tcUrl for the current connection".
285    /// * `Some(reference)` → "absolute or relative URI reference of
286    ///   the server to which to reconnect. A relative URI reference
287    ///   should be resolved relative to the tcUrl for the current
288    ///   connection." All four spec example shapes are honoured:
289    ///   `rtmp://foo.mydomain.com:1935/realtimeapp` (absolute),
290    ///   `//192.0.2.0/realtimeapp` (network-path: keep our scheme),
291    ///   `/realtimeapp` (absolute-path: keep scheme + authority), and
292    ///   `realtimeapp` (relative-path: merge onto our tcUrl's path).
293    ///
294    /// Append the stream key (`/{stream_name}`) and feed the result to
295    /// [`RtmpClient::connect`] to complete the spec's reconnect flow.
296    pub fn resolve_reconnect_url(&self, tc_url: Option<&str>) -> String {
297        match tc_url {
298            Some(reference) => resolve_tc_url(&self.tc_url, reference),
299            None => self.tc_url.clone(),
300        }
301    }
302
303    fn connect_parsed(u: &RtmpUrl, publish_type: &str, caps: &ConnectCapabilities) -> Result<Self> {
304        let sock_addr = (u.host.as_str(), u.port)
305            .to_socket_addrs()
306            .map_err(Error::from)?
307            .next()
308            .ok_or_else(|| Error::Other(format!("resolved no addresses for {}", u.host)))?;
309        let stream = TcpStream::connect_timeout(&sock_addr, Duration::from_secs(15))?;
310        let _ = stream.set_nodelay(true);
311
312        // Handshake on a fresh clone — no chunk state is shared with
313        // it.
314        let mut hs = stream.try_clone()?;
315        crate::handshake::client_handshake(&mut hs)?;
316
317        let mut reader = ChunkReader::new(stream.try_clone()?);
318        let mut writer = ChunkWriter::new(stream.try_clone()?);
319
320        // We bump chunk size immediately — most commodity publishers
321        // do this too. Saves a bunch of chunk headers over the A/V path.
322        writer.write_message(
323            CSID_PROTOCOL_CONTROL,
324            &build_set_chunk_size(CLIENT_CHUNK_SIZE),
325        )?;
326        writer.set_chunk_size(CLIENT_CHUNK_SIZE as usize);
327
328        // Send connect.
329        let tx = 1.0;
330        writer.write_message(
331            CSID_COMMAND,
332            &build_connect_with_caps(tx, &u.app, &u.tc_url, FLASH_VER, caps),
333        )?;
334        writer.flush()?;
335
336        // Drain until we see the _result for connect; lift the server's
337        // capability advertisement out of the info object.
338        let connect_result = wait_for_result(&mut reader, &mut writer, tx)?;
339        // Info object is the last Object/EcmaArray AMF0 value after the
340        // properties slot; per §"Enhancing NetConnection connect Command"
341        // the server stamps its capabilities into one of the `_result`
342        // parameters. Walk back to the first Object/ECMA-array carrying
343        // any of the documented capability keys.
344        let server_caps = extract_server_caps(&connect_result);
345
346        // releaseStream + FCPublish — optional but standard.
347        let tx_release = 2.0;
348        writer.write_message(
349            CSID_COMMAND,
350            &build_release_stream(tx_release, &u.stream_name),
351        )?;
352        let tx_fc = 3.0;
353        writer.write_message(CSID_COMMAND, &build_fc_publish(tx_fc, &u.stream_name))?;
354
355        // createStream.
356        let tx_cs = 4.0;
357        writer.write_message(CSID_COMMAND, &build_create_stream(tx_cs))?;
358        writer.flush()?;
359
360        let stream_id = wait_for_create_stream_result(&mut reader, &mut writer, tx_cs)?;
361
362        // publish.
363        let tx_pub = 5.0;
364        writer.write_message(
365            CSID_COMMAND,
366            &build_publish(tx_pub, stream_id, &u.stream_name, publish_type),
367        )?;
368        writer.flush()?;
369
370        // Wait for Publish.Start. Ignore any interleaved control
371        // messages. Some servers don't bother sending onStatus —
372        // don't block forever, just wait briefly.
373        wait_for_publish_start(&mut reader, &mut writer)?;
374
375        Ok(Self {
376            stream,
377            reader,
378            writer,
379            stream_id,
380            next_tx: 10.0,
381            read_eof: false,
382            server_caps,
383            tc_url: u.tc_url.clone(),
384            pending_subs: VecDeque::new(),
385        })
386    }
387
388    /// Send the AVC sequence header (`AVCDecoderConfigurationRecord`
389    /// aka avcC). Must be called once before any NALU-carrying
390    /// [`send_video`](Self::send_video).
391    pub fn send_video_sequence_header(&mut self, avc_c: &[u8]) -> Result<()> {
392        let tag = VideoTag {
393            mod_ex: Vec::new(),
394            frame_type: flv::VIDEO_FRAME_KEYFRAME,
395            codec_id: flv::VIDEO_CODEC_AVC,
396            avc_packet_type: Some(flv::AVC_PACKET_TYPE_SEQUENCE_HEADER),
397            composition_time: 0,
398            body: avc_c.to_vec(),
399            ex_packet_type: None,
400            fourcc: None,
401
402            multitrack: None,
403        };
404        self.send_video_tag(0, &tag)
405    }
406
407    /// Send one video access unit. `body` is the AVCC-formatted
408    /// content (one or more `[u32 length BE][NALU bytes]` pairs).
409    /// `is_keyframe` drives the FLV frame_type bits.
410    pub fn send_video(&mut self, timestamp_ms: u32, is_keyframe: bool, body: &[u8]) -> Result<()> {
411        let tag = VideoTag {
412            mod_ex: Vec::new(),
413            frame_type: if is_keyframe {
414                flv::VIDEO_FRAME_KEYFRAME
415            } else {
416                flv::VIDEO_FRAME_INTER
417            },
418            codec_id: flv::VIDEO_CODEC_AVC,
419            avc_packet_type: Some(flv::AVC_PACKET_TYPE_NALU),
420            composition_time: 0,
421            body: body.to_vec(),
422            ex_packet_type: None,
423            fourcc: None,
424
425            multitrack: None,
426        };
427        self.send_video_tag(timestamp_ms, &tag)
428    }
429
430    fn send_video_tag(&mut self, ts: u32, tag: &VideoTag) -> Result<()> {
431        let payload = flv::build_video(tag);
432        self.writer.write_message(
433            CSID_VIDEO,
434            &Message {
435                msg_type_id: MSG_VIDEO,
436                msg_stream_id: self.stream_id,
437                timestamp: ts,
438                payload,
439            },
440        )?;
441        self.writer.flush()?;
442        Ok(())
443    }
444
445    /// Send the AAC `AudioSpecificConfig` (2 bytes for LC-AAC 44.1k
446    /// stereo: `0x12 0x10`). Must be called once before any
447    /// raw-frame [`send_audio`](Self::send_audio).
448    pub fn send_audio_sequence_header(&mut self, asc: &[u8]) -> Result<()> {
449        let tag = AudioTag {
450            mod_ex: Vec::new(),
451            sound_format: flv::AUDIO_FORMAT_AAC,
452            sound_rate: 3,
453            sound_size_16bit: true,
454            stereo: true,
455            aac_packet_type: Some(flv::AAC_PACKET_TYPE_SEQUENCE_HEADER),
456            body: asc.to_vec(),
457            ex_packet_type: None,
458            audio_fourcc: None,
459
460            multitrack: None,
461        };
462        self.send_audio_tag(0, &tag)
463    }
464
465    /// Send one raw AAC frame.
466    pub fn send_audio(&mut self, timestamp_ms: u32, aac_frame: &[u8]) -> Result<()> {
467        let tag = AudioTag {
468            mod_ex: Vec::new(),
469            sound_format: flv::AUDIO_FORMAT_AAC,
470            sound_rate: 3,
471            sound_size_16bit: true,
472            stereo: true,
473            aac_packet_type: Some(flv::AAC_PACKET_TYPE_RAW),
474            body: aac_frame.to_vec(),
475            ex_packet_type: None,
476            audio_fourcc: None,
477
478            multitrack: None,
479        };
480        self.send_audio_tag(timestamp_ms, &tag)
481    }
482
483    fn send_audio_tag(&mut self, ts: u32, tag: &AudioTag) -> Result<()> {
484        let payload = flv::build_audio(tag);
485        self.writer.write_message(
486            CSID_AUDIO,
487            &Message {
488                msg_type_id: MSG_AUDIO,
489                msg_stream_id: self.stream_id,
490                timestamp: ts,
491                payload,
492            },
493        )?;
494        self.writer.flush()?;
495        Ok(())
496    }
497
498    /// Send `@setDataFrame("onMetaData", metadata)`. Metadata is an
499    /// AMF0 value, typically an ECMA array or object populated with
500    /// `width`, `height`, `duration`, `videodatarate`, `framerate`,
501    /// `videocodecid`, `audiodatarate`, `audiocodecid`, etc.
502    pub fn send_metadata(&mut self, metadata: Amf0Value) -> Result<()> {
503        let msg = build_set_data_frame(self.stream_id, metadata);
504        self.writer.write_message(CSID_DATA, &msg)?;
505        self.writer.flush()?;
506        Ok(())
507    }
508
509    /// Send `onMetaData` as an AMF3-encoded data message (RTMP message
510    /// type 15) instead of the AMF0 default.
511    ///
512    /// The body is framed per AMF 3 spec §4.1 / AMF 0 spec §3.1: the
513    /// outer NetConnection message structure is AMF0, and each value
514    /// switches to AMF3 by prefixing it with the `avmplus-object-marker`
515    /// (`0x11`). Most ingest endpoints stay on AMF0, so prefer
516    /// [`send_metadata`](Self::send_metadata); this exists for peers that
517    /// negotiated an AMF3 channel.
518    pub fn send_metadata_amf3(&mut self, metadata: amf3::Amf3Value) -> Result<()> {
519        let mut payload = Vec::new();
520        payload.push(amf3::AVMPLUS_OBJECT_MARKER);
521        amf3::encode(&mut payload, &amf3::Amf3Value::String("onMetaData".into()));
522        payload.push(amf3::AVMPLUS_OBJECT_MARKER);
523        amf3::encode(&mut payload, &metadata);
524        let msg = Message {
525            msg_type_id: MSG_DATA_AMF3,
526            msg_stream_id: self.stream_id,
527            timestamp: 0,
528            payload,
529        };
530        self.writer.write_message(CSID_DATA, &msg)?;
531        self.writer.flush()?;
532        Ok(())
533    }
534
535    /// Send a batch of audio / video / data sub-messages as one
536    /// Aggregate Message (RTMP 1.0 §7.1.6, message type id 22).
537    ///
538    /// An aggregate trades one extra 11-byte sub-header per sub-message
539    /// (plus a 4-byte back-pointer) for the chunk-header overhead the
540    /// chunk writer would emit on each sub if it sent them
541    /// individually. For an active publish with several A/V messages
542    /// queued at the same timestamp this can cut the chunk-header
543    /// surface in half.
544    ///
545    /// `subs` carries pre-built FLV-shaped messages — typically AVC
546    /// video (type 9) and AAC audio (type 8) bodies the caller has
547    /// already framed via [`flv::build_video`] / [`flv::build_audio`].
548    /// Caller-supplied `msg_stream_id` fields are overridden to this
549    /// client's publish stream id per §7.1.6 ("the message stream ID
550    /// of the aggregate message overrides the message stream IDs of
551    /// the sub-messages"). The aggregate's own wire timestamp is set
552    /// to the first sub's timestamp so the §7.1.6 re-normalisation
553    /// offset is zero on the wire.
554    ///
555    /// Returns the same errors as
556    /// [`build_aggregate`](crate::aggregate::build_aggregate) plus any
557    /// I/O error from the underlying chunk writer. An empty `subs`
558    /// slice is a no-op.
559    pub fn send_aggregate(&mut self, subs: &[Message]) -> Result<()> {
560        if subs.is_empty() {
561            return Ok(());
562        }
563        // Override every sub's msg_stream_id to ours so the §7.1.6
564        // override invariant holds without surprising the caller.
565        let normalized: Vec<Message> = subs
566            .iter()
567            .map(|s| Message {
568                msg_type_id: s.msg_type_id,
569                msg_stream_id: self.stream_id,
570                timestamp: s.timestamp,
571                payload: s.payload.clone(),
572            })
573            .collect();
574        let agg = crate::aggregate::build_aggregate(self.stream_id, &normalized)?;
575        // CSID_DATA (6) is the natural data-channel id — aggregates
576        // aren't a protocol-control event.
577        self.writer.write_message(CSID_DATA, &agg)?;
578        self.writer.flush()?;
579        Ok(())
580    }
581
582    /// Send a `UserControl PingRequest` (RTMP 1.0 §3.7, UCM type 6)
583    /// carrying the supplied 4-byte timestamp.
584    ///
585    /// The peer is expected to echo the value back as a `PingResponse`
586    /// (UCM type 7), which surfaces from
587    /// [`poll_event`](Self::poll_event) as
588    /// [`ClientEvent::PingResponse`]. Typical use is round-trip-time
589    /// measurement: stamp the local monotonic clock into the request,
590    /// then subtract from the response timestamp once the matching
591    /// `PingResponse` arrives. The publish direction normally never
592    /// needs this — but a publisher pumping a low-bandwidth feed over
593    /// a flaky link may want to probe liveness explicitly rather than
594    /// wait for TCP keepalive.
595    pub fn send_ping_request(&mut self, timestamp_ms: u32) -> Result<()> {
596        self.writer.write_message(
597            CSID_PROTOCOL_CONTROL,
598            &build_user_control_ping_request(timestamp_ms),
599        )?;
600        self.writer.flush()?;
601        Ok(())
602    }
603
604    /// Emit a §5.3 Acknowledgement if the reader's received-byte count
605    /// has crossed the server-negotiated §5.5 window since the last
606    /// one. No-op until the server advertises a Window Acknowledgement
607    /// Size / Set Peer Bandwidth (every commodity ingest does so right
608    /// after `connect`).
609    fn maybe_send_ack(&mut self) -> Result<()> {
610        if let Some(seq) = self.reader.ack_due() {
611            self.writer
612                .write_message(CSID_PROTOCOL_CONTROL, &build_ack(seq))?;
613            self.writer.flush()?;
614        }
615        Ok(())
616    }
617
618    /// Poll for one server-originated event.
619    ///
620    /// Reads up to one inbound RTMP message from the server, applies
621    /// protocol-level housekeeping internally (set-chunk-size,
622    /// window-ack-size, set-peer-bandwidth, ping-request/response,
623    /// acks), and surfaces externally-visible notifications as a
624    /// [`ClientEvent`]:
625    ///
626    /// * `UserControl StreamBegin(sid)` → [`ClientEvent::StreamBegin`]
627    /// * `UserControl StreamEOF(sid)`   → [`ClientEvent::StreamEof`]
628    ///   (mirror of [`RtmpSession::close`](crate::RtmpSession::close)'s
629    ///   server-side teardown; RTMP 1.0 §7.1.7)
630    /// * `onStatus(...)`                → [`ClientEvent::OnStatus`]
631    /// * `_result(tx_id, ...)`          → [`ClientEvent::Result`]
632    /// * `_error(tx_id, ...)`           → [`ClientEvent::ErrorReply`]
633    /// * everything else                → [`ClientEvent::Other`]
634    ///
635    /// Returns `Ok(None)` once the server has signalled a clean stream
636    /// end (`StreamEOF`) or once the TCP read half observes EOF /
637    /// connection-reset. After `Ok(None)` is returned the caller
638    /// should stop writing and finish the session with
639    /// [`close`](Self::close).
640    ///
641    /// This is a blocking call. Set a finite read timeout on
642    /// [`inner_mut`](Self::inner_mut) ahead of time if you want
643    /// `poll_event` to return periodically with an `Err(Error::Io)`
644    /// kind `WouldBlock` / `TimedOut` so an outer event loop can do
645    /// other work between polls — the underlying TCP read deadline is
646    /// the timeout granularity, not a poll interval.
647    pub fn poll_event(&mut self) -> Result<Option<ClientEvent>> {
648        loop {
649            if self.read_eof {
650                return Ok(None);
651            }
652            // Drain any sub-messages decomposed from a prior
653            // server-originated Aggregate Message (RTMP 1.0 §7.1.6)
654            // ahead of any further wire read so the publish order is
655            // preserved.
656            if let Some(sub) = self.pending_subs.pop_front() {
657                let ev = self.classify_message(sub)?;
658                if matches!(ev, ClientEvent::Other) {
659                    // Don't return `Other` for a queued sub — the caller
660                    // typically pumps `poll_event` to observe semantic
661                    // events; an aggregate full of `Other` would
662                    // otherwise return N `Other`s in a row.
663                    continue;
664                }
665                return Ok(Some(ev));
666            }
667            let msg = match self.reader.read_message() {
668                Ok(m) => m,
669                Err(Error::Io(e))
670                    if matches!(
671                        e.kind(),
672                        std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionReset
673                    ) =>
674                {
675                    self.read_eof = true;
676                    return Ok(None);
677                }
678                Err(Error::UnexpectedEof) => {
679                    self.read_eof = true;
680                    return Ok(None);
681                }
682                Err(e) => return Err(e),
683            };
684            // §5.3: acknowledge once the server has sent a full window
685            // of bytes since our last ack (window set by the server's
686            // §5.5 Window Acknowledgement Size / §5.6 Set Peer
687            // Bandwidth).
688            self.maybe_send_ack()?;
689            if msg.msg_type_id == MSG_AGGREGATE {
690                // RTMP 1.0 §7.1.6 Aggregate Message. Decompose into
691                // FLV-shaped sub-messages with the §7.1.6 timestamp
692                // re-normalisation applied and the message-stream-id
693                // override resolved; queue them so subsequent calls
694                // surface the per-sub events in publish order. Don't
695                // return on the aggregate itself — drain the queue.
696                let subs = parse_aggregate(&msg)?;
697                self.pending_subs.extend(subs);
698                continue;
699            }
700            return Ok(Some(self.classify_message(msg)?));
701        }
702    }
703
704    /// Per-message classification shared between the wire-read path
705    /// and the aggregate-sub-drain path.
706    fn classify_message(&mut self, msg: Message) -> Result<ClientEvent> {
707        match msg.msg_type_id {
708            MSG_SET_CHUNK_SIZE => {
709                let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
710                self.reader.set_chunk_size(size as usize);
711                Ok(ClientEvent::Other)
712            }
713            MSG_WINDOW_ACK_SIZE => {
714                // §5.5: the server tells us which window size to use
715                // when sending §5.3 Acknowledgements. Honour it so our
716                // ack cadence matches the server's expectation.
717                let size = read_u32_be(&msg.payload)?;
718                self.reader.set_window_ack_size(size);
719                Ok(ClientEvent::Other)
720            }
721            MSG_SET_PEER_BANDWIDTH => {
722                // §5.6: "The output bandwidth value is the same as the
723                // window size for the peer." Adopt the leading 4-byte
724                // window as our send-side ack window. (Trailing Limit
725                // type byte is advisory.)
726                if msg.payload.len() >= 4 {
727                    let size = read_u32_be(&msg.payload[..4])?;
728                    self.reader.set_window_ack_size(size);
729                }
730                Ok(ClientEvent::Other)
731            }
732            MSG_ACK => {
733                // The server's §5.3 sequence number (bytes it has
734                // received from us). Informational for a publisher.
735                Ok(ClientEvent::Other)
736            }
737            MSG_USER_CONTROL => {
738                let (event_type, event_data) = parse_user_control(&msg.payload)?;
739                match event_type {
740                    USR_STREAM_BEGIN => {
741                        let sid = ucm_stream_id(event_data)?;
742                        Ok(ClientEvent::StreamBegin { stream_id: sid })
743                    }
744                    USR_STREAM_EOF => {
745                        let sid = ucm_stream_id(event_data)?;
746                        // Don't latch here: the server typically sends
747                        // a trailing onStatus / Unpublish.Success after
748                        // StreamEOF, then half-closes; we let the
749                        // subsequent read drain those and report EOF
750                        // naturally.
751                        Ok(ClientEvent::StreamEof { stream_id: sid })
752                    }
753                    USR_STREAM_DRY => {
754                        // Per RTMP 1.0 §3.7, StreamDry is a transient
755                        // "no data on the stream right now" signal —
756                        // distinct from StreamEOF, which terminates
757                        // playback. Surface to the caller so an outer
758                        // event loop can react (e.g. warn UI, switch
759                        // to a fallback stream); don't latch read_eof.
760                        let sid = ucm_stream_id(event_data)?;
761                        Ok(ClientEvent::StreamDry { stream_id: sid })
762                    }
763                    USR_SET_BUFFER_LENGTH => {
764                        // Per RTMP 1.0 §3.7, SetBufferLength is the only
765                        // standard UCM event with an 8-byte event-data
766                        // body: 4-byte stream id + 4-byte buffer length
767                        // in milliseconds. It is sent from a *playback*
768                        // client to the server — a publish-direction
769                        // client almost never sees it inbound. We
770                        // validate the payload size so a malformed
771                        // SetBufferLength from a confused peer surfaces
772                        // a clean error instead of silently truncating;
773                        // and we surface it as Other (no action required
774                        // on the publisher side).
775                        if event_data.len() < 8 {
776                            return Err(Error::ProtocolViolation(
777                                "UserControl SetBufferLength: event data < 8 bytes".into(),
778                            ));
779                        }
780                        Ok(ClientEvent::Other)
781                    }
782                    USR_STREAM_IS_RECORDED => {
783                        // Per RTMP 1.0 §3.7, server announces that the
784                        // stream is a recorded (on-demand) stream.
785                        // Surface to the caller — a publish client may
786                        // want to log this if the server marks our own
787                        // publish stream as recorded after we asked for
788                        // "live" (mismatched publish type).
789                        let sid = ucm_stream_id(event_data)?;
790                        Ok(ClientEvent::StreamIsRecorded { stream_id: sid })
791                    }
792                    USR_PING_REQUEST => {
793                        // Server pings — reply with PingResponse echoing
794                        // the same 4-byte timestamp body so the server's
795                        // liveness probe succeeds.
796                        let ts_bytes = event_data;
797                        if ts_bytes.len() >= 4 {
798                            let mut p = Vec::with_capacity(6);
799                            p.extend_from_slice(&USR_PING_RESPONSE.to_be_bytes());
800                            p.extend_from_slice(&ts_bytes[..4]);
801                            let _ = self.writer.write_message(
802                                CSID_PROTOCOL_CONTROL,
803                                &Message {
804                                    msg_type_id: MSG_USER_CONTROL,
805                                    msg_stream_id: 0,
806                                    timestamp: 0,
807                                    payload: p,
808                                },
809                            );
810                            let _ = self.writer.flush();
811                        }
812                        Ok(ClientEvent::Other)
813                    }
814                    USR_PING_RESPONSE => {
815                        // Per RTMP 1.0 §3.7, this echoes back the 4-byte
816                        // timestamp the publisher carried in a prior
817                        // PingRequest. Surface to the caller so a
818                        // round-trip-time measurement loop can compare
819                        // the echoed value to its own send-time clock.
820                        if event_data.len() < 4 {
821                            return Err(Error::ProtocolViolation(
822                                "UserControl PingResponse: event data < 4 bytes".into(),
823                            ));
824                        }
825                        let ts = u32::from_be_bytes([
826                            event_data[0],
827                            event_data[1],
828                            event_data[2],
829                            event_data[3],
830                        ]);
831                        Ok(ClientEvent::PingResponse { timestamp_ms: ts })
832                    }
833                    _ => {
834                        // Unknown / reserved UCM event type — surface as
835                        // Other; forwarding ingest may receive future
836                        // event types we don't model yet.
837                        Ok(ClientEvent::Other)
838                    }
839                }
840            }
841            MSG_COMMAND_AMF0 => {
842                let values = amf::decode_all(&msg.payload)?;
843                Ok(classify_command(values))
844            }
845            MSG_COMMAND_AMF3 => {
846                let values: Vec<Amf0Value> = amf3::decode_data_message(&msg.payload)?
847                    .iter()
848                    .map(amf3::Amf3Value::to_amf0)
849                    .collect();
850                Ok(classify_command(values))
851            }
852            MSG_AGGREGATE => {
853                // A sub-message inside an aggregate is itself an
854                // aggregate. Forward to the same queue so the next
855                // `poll_event` tick decomposes it. The wire path above
856                // already handles top-level aggregates directly.
857                let subs = parse_aggregate(&msg)?;
858                self.pending_subs.extend(subs);
859                Ok(ClientEvent::Other)
860            }
861            _ => Ok(ClientEvent::Other),
862        }
863    }
864
865    /// Send `closeStream` / `deleteStream` and shut the TCP socket.
866    pub fn close(mut self) -> Result<()> {
867        let tx = self.next_tx;
868        self.next_tx += 1.0;
869        let payload = amf::encode_command(
870            "closeStream",
871            tx,
872            Amf0Value::Null,
873            &[Amf0Value::Number(self.stream_id as f64)],
874        );
875        let _ = self.writer.write_message(
876            CSID_COMMAND,
877            &Message {
878                msg_type_id: MSG_COMMAND_AMF0,
879                msg_stream_id: self.stream_id,
880                timestamp: 0,
881                payload,
882            },
883        );
884        let _ = self.writer.flush();
885        // Shut down the write half only (send a graceful FIN) rather
886        // than the whole socket. `Shutdown::Both` tears the read half
887        // down at the same instant, which on some platforms makes the
888        // kernel answer the peer's still-unacked data with a RST; that
889        // RST discards any A/V messages the peer hasn't yet drained
890        // from its receive buffer — closeStream and the last frames we
891        // just wrote can be thrown away mid-stream. A write-half FIN
892        // lets the peer read every buffered frame plus our closeStream
893        // command, then observe EOF cleanly. The read half closes when
894        // `self` (and its owned `TcpStream`) drops at end of scope.
895        let _ = self.stream.shutdown(Shutdown::Write);
896        Ok(())
897    }
898
899    pub fn inner_mut(&mut self) -> &mut TcpStream {
900        &mut self.stream
901    }
902
903    /// Apply a `recv` timeout to the chunk reader's actual socket
904    /// clone (the one [`poll_event`](Self::poll_event) blocks on)
905    /// rather than the session's bookkeeping clone.
906    ///
907    /// On Linux a sockopt set through one `try_clone` descriptor
908    /// carries to its sibling clones because they share one file
909    /// description; on Windows each clone has its own kernel handle
910    /// with independent socket options, so the timeout has to be
911    /// installed on the exact socket the `recv` call will issue
912    /// against. Use this rather than
913    /// [`inner_mut`](Self::inner_mut)`.set_read_timeout(...)` when the
914    /// goal is to bound `poll_event`'s block time.
915    pub fn set_read_timeout(&mut self, d: Option<Duration>) -> Result<()> {
916        self.reader.inner_mut().set_read_timeout(d)?;
917        let _ = self.stream.set_read_timeout(d);
918        Ok(())
919    }
920}
921
922/// Consume messages from `reader` until we see a command named
923/// `_result` for `expected_tx`. Forward relevant protocol-control
924/// updates (SetChunkSize) to the reader.
925fn wait_for_result<R: Read, W: Write>(
926    reader: &mut ChunkReader<R>,
927    _writer: &mut ChunkWriter<W>,
928    expected_tx: f64,
929) -> Result<Vec<Amf0Value>> {
930    loop {
931        let msg = reader.read_message()?;
932        match msg.msg_type_id {
933            MSG_SET_CHUNK_SIZE => {
934                let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
935                reader.set_chunk_size(size as usize);
936            }
937            MSG_WINDOW_ACK_SIZE => {
938                // §5.5: capture the server's window during setup so the
939                // §5.3 ack obligation is live before the first media
940                // frame flows.
941                let size = read_u32_be(&msg.payload)?;
942                reader.set_window_ack_size(size);
943            }
944            MSG_SET_PEER_BANDWIDTH if msg.payload.len() >= 4 => {
945                // §5.6: output bandwidth equals the peer's window size.
946                let size = read_u32_be(&msg.payload[..4])?;
947                reader.set_window_ack_size(size);
948            }
949            MSG_COMMAND_AMF0 => {
950                let values = amf::decode_all(&msg.payload)?;
951                let name = values.first().and_then(Amf0Value::as_str).unwrap_or("");
952                let tx = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(-1.0);
953                if name == "_result" && tx == expected_tx {
954                    return Ok(values);
955                }
956                if name == "_error" {
957                    return Err(Error::Other(format!(
958                        "RTMP _error from server: {:?}",
959                        values.get(3)
960                    )));
961                }
962                // Any other status notifications before our _result
963                // (StreamBegin, bandwidth negotiations, etc.) — ignore.
964            }
965            _ => {}
966        }
967    }
968}
969
970fn wait_for_create_stream_result<R: Read, W: Write>(
971    reader: &mut ChunkReader<R>,
972    writer: &mut ChunkWriter<W>,
973    expected_tx: f64,
974) -> Result<u32> {
975    let values = wait_for_result(reader, writer, expected_tx)?;
976    // _result carries the new stream id as the last AMF0 value
977    // (either arg slot [3] or further back if the peer sent an extra
978    // props object).
979    let sid = values
980        .iter()
981        .rev()
982        .find_map(Amf0Value::as_f64)
983        .ok_or_else(|| Error::InvalidCommand("createStream result has no stream id".into()))?;
984    Ok(sid as u32)
985}
986
987fn wait_for_publish_start<R: Read, W: Write>(
988    reader: &mut ChunkReader<R>,
989    _writer: &mut ChunkWriter<W>,
990) -> Result<()> {
991    // Be lenient: the spec says the server SHOULD send an onStatus
992    // with NetStream.Publish.Start, but some servers skip it. Bail
993    // after we've seen a user-control StreamBegin OR an onStatus on
994    // the publish stream.
995    for _ in 0..20 {
996        let msg = match reader.read_message() {
997            Ok(m) => m,
998            Err(Error::Io(ref e))
999                if matches!(
1000                    e.kind(),
1001                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
1002                ) =>
1003            {
1004                return Ok(());
1005            }
1006            Err(e) => return Err(e),
1007        };
1008        match msg.msg_type_id {
1009            MSG_USER_CONTROL => return Ok(()),
1010            MSG_COMMAND_AMF0 => {
1011                let values = amf::decode_all(&msg.payload)?;
1012                if values
1013                    .first()
1014                    .and_then(Amf0Value::as_str)
1015                    .map(|n| n == "onStatus" || n == "_result")
1016                    .unwrap_or(false)
1017                {
1018                    return Ok(());
1019                }
1020            }
1021            _ => {}
1022        }
1023    }
1024    Ok(())
1025}
1026
1027fn read_u32_be(buf: &[u8]) -> Result<u32> {
1028    if buf.len() < 4 {
1029        return Err(Error::ProtocolViolation("need 4 bytes for u32be".into()));
1030    }
1031    Ok(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
1032}
1033
1034/// Locate the Enhanced RTMP capability block in a server's
1035/// `_result(connect, ...)` AMF0 value list.
1036///
1037/// Per `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect Command"
1038/// the server stamps `videoFourCcInfoMap` / `audioFourCcInfoMap` /
1039/// `capsEx` etc. into one of the `_result` parameters; in practice
1040/// every implementation we have seen drops them into the trailing info
1041/// object (the slot that carries `NetConnection.Connect.Success`). Some
1042/// servers also drop them into the properties slot (the second AMF0
1043/// value, right after the transaction id), so we walk every
1044/// Object / ECMA-array in the response and return the first one whose
1045/// `ConnectCapabilities::from_amf0` carries an Enhanced-RTMP property.
1046///
1047/// `objectEncoding` alone doesn't count: every pre-2023 server stamps
1048/// `objectEncoding = 0` into its info object as part of the legacy
1049/// `NetConnection.Connect.Success` shape, so picking that up would make
1050/// every legacy server look like a v2 advertisement. A fully-legacy
1051/// server therefore returns the empty capability block.
1052fn extract_server_caps(values: &[Amf0Value]) -> ConnectCapabilities {
1053    for v in values {
1054        if matches!(v, Amf0Value::Object(_) | Amf0Value::EcmaArray(_)) {
1055            let caps = ConnectCapabilities::from_amf0(v);
1056            if has_enhanced_rtmp_property(&caps) {
1057                return caps;
1058            }
1059        }
1060    }
1061    ConnectCapabilities::default()
1062}
1063
1064/// True if `caps` carries any of the Enhanced RTMP v1+v2 capability
1065/// properties (`fourCcList`, `videoFourCcInfoMap`,
1066/// `audioFourCcInfoMap`, `capsEx`). `objectEncoding` alone — the
1067/// legacy `_result(connect)` field — does not count.
1068fn has_enhanced_rtmp_property(caps: &ConnectCapabilities) -> bool {
1069    !caps.fourcc_list.is_empty()
1070        || !caps.video_fourcc_info_map.is_empty()
1071        || !caps.audio_fourcc_info_map.is_empty()
1072        || caps.caps_ex != 0
1073}
1074
1075/// Decode the `User Control` payload framing per RTMP 1.0 §6.2 /
1076/// §7.1.7: a 16-bit BE event type followed by variable-length event
1077/// data. Returns `(event_type, event_data)` borrowed from the input.
1078fn parse_user_control(buf: &[u8]) -> Result<(u16, &[u8])> {
1079    if buf.len() < 2 {
1080        return Err(Error::ProtocolViolation(
1081            "UserControl: payload < 2 bytes".into(),
1082        ));
1083    }
1084    let event_type = u16::from_be_bytes([buf[0], buf[1]]);
1085    Ok((event_type, &buf[2..]))
1086}
1087
1088/// Stream-id-carrying UCM events (Stream Begin / Stream EOF / Stream
1089/// Dry / StreamIsRecorded) all use a 4-byte BE stream id as their
1090/// event data.
1091fn ucm_stream_id(event_data: &[u8]) -> Result<u32> {
1092    if event_data.len() < 4 {
1093        return Err(Error::ProtocolViolation(
1094            "UserControl: event data < 4 bytes (need stream id)".into(),
1095        ));
1096    }
1097    Ok(u32::from_be_bytes([
1098        event_data[0],
1099        event_data[1],
1100        event_data[2],
1101        event_data[3],
1102    ]))
1103}
1104
/// Resolve a reconnect-target URI reference against the current
/// connection's `tcUrl`, per `enhanced-rtmp-v2.pdf` §"Reconnect
/// Request" ("a relative URI reference should be resolved relative to
/// the tcUrl for the current connection"). Handles the four reference
/// shapes the spec's Info Object table gives as examples:
///
/// 1. `rtmp://foo.mydomain.com:1935/realtimeapp` — absolute (starts
///    with `scheme://`): taken verbatim.
/// 2. `//192.0.2.0/realtimeapp` — network-path reference: inherits
///    only the base's scheme.
/// 3. `/realtimeapp` — absolute-path reference: inherits the base's
///    scheme + authority.
/// 4. `realtimeapp` — relative-path reference: merged onto the base's
///    path with the last segment replaced.
///
/// An empty reference resolves to the base itself. A base without a
/// `://` is treated as `rtmp://<base>`.
///
/// Two details keep odd inputs from being misread:
///
/// * A reference counts as absolute only when it *starts* with a
///   syntactically valid scheme followed by `://`. A `://` that merely
///   appears later (for example inside a query string such as
///   `/app?next=rtmp://other/x`) does not make it absolute.
/// * The base's query and fragment take no part in resolution, so a
///   `/` inside the base's query string cannot be mistaken for a path
///   separator.
///
/// Dot segments (`.` / `..`) in the reference are not collapsed.
pub fn resolve_tc_url(base: &str, reference: &str) -> String {
    /// True when `uri` begins with `scheme://`, where the scheme is a
    /// letter followed by letters, digits, `+`, `-` or `.`.
    fn has_scheme_prefix(uri: &str) -> bool {
        let scheme = match uri.find("://") {
            Some(i) => &uri[..i],
            None => return false,
        };
        let mut chars = scheme.chars();
        chars.next().map_or(false, |c| c.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'))
    }

    if reference.is_empty() {
        return base.to_owned();
    }
    if has_scheme_prefix(reference) {
        // Absolute URI reference — already carries its own scheme.
        return reference.to_owned();
    }
    let (scheme, after_scheme) = match base.find("://") {
        Some(i) => (&base[..i], &base[i + 3..]),
        None => ("rtmp", base),
    };
    if let Some(net_path) = reference.strip_prefix("//") {
        // Network-path reference: keep only our scheme.
        return format!("{scheme}://{net_path}");
    }
    // Cut the base at its query / fragment before looking for path
    // separators: only authority + path are inherited.
    let hier_end = after_scheme
        .find(|c: char| c == '?' || c == '#')
        .unwrap_or(after_scheme.len());
    let hier_part = &after_scheme[..hier_end];
    let (authority, base_path) = match hier_part.find('/') {
        Some(i) => hier_part.split_at(i),
        None => (hier_part, ""),
    };
    if reference.starts_with('/') {
        // Absolute-path reference: keep scheme + authority.
        return format!("{scheme}://{authority}{reference}");
    }
    // Relative-path reference: merge — drop the base path's last
    // segment, keep everything up to (and including) its final '/'.
    // A base with no path at all resolves at the root.
    let dir = match base_path.rfind('/') {
        Some(i) => &base_path[..=i],
        None => "/",
    };
    format!("{scheme}://{authority}{dir}{reference}")
}
1153
1154/// Classify a decoded AMF0 command message into a [`ClientEvent`].
1155/// Matches `onStatus` / `_result` / `_error` by name and pulls the
1156/// transaction id / info object out of the expected slots.
1157fn classify_command(values: Vec<Amf0Value>) -> ClientEvent {
1158    let name = values.first().and_then(Amf0Value::as_str).unwrap_or("");
1159    match name {
1160        "onStatus" => {
1161            // ["onStatus", 0.0, null, <info-object>]
1162            if let Some(info) = values.get(3) {
1163                let level = info
1164                    .get("level")
1165                    .and_then(Amf0Value::as_str)
1166                    .unwrap_or("")
1167                    .to_owned();
1168                let code = info
1169                    .get("code")
1170                    .and_then(Amf0Value::as_str)
1171                    .unwrap_or("")
1172                    .to_owned();
1173                let description = info
1174                    .get("description")
1175                    .and_then(Amf0Value::as_str)
1176                    .unwrap_or("")
1177                    .to_owned();
1178                // Enhanced RTMP v2 §"Reconnect Request": the server
1179                // asks us to reconnect via a NetConnection-level
1180                // onStatus whose code MUST be
1181                // NetConnection.Connect.ReconnectRequest and whose
1182                // level MUST be "status". Lift the optional tcUrl so
1183                // the caller can re-dial; an event with the right code
1184                // but the wrong level is NOT a valid reconnect request
1185                // per spec, so it falls through as a plain OnStatus.
1186                if code == crate::message::RECONNECT_REQUEST_CODE && level == "status" {
1187                    let tc_url = info
1188                        .get("tcUrl")
1189                        .and_then(Amf0Value::as_str)
1190                        .map(str::to_owned);
1191                    return ClientEvent::ReconnectRequest {
1192                        tc_url,
1193                        description,
1194                    };
1195                }
1196                return ClientEvent::OnStatus {
1197                    level,
1198                    code,
1199                    description,
1200                };
1201            }
1202            ClientEvent::Other
1203        }
1204        "_result" => {
1205            let tx = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
1206            ClientEvent::Result {
1207                transaction_id: tx,
1208                values,
1209            }
1210        }
1211        "_error" => {
1212            let tx = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
1213            ClientEvent::ErrorReply {
1214                transaction_id: tx,
1215                values,
1216            }
1217        }
1218        _ => ClientEvent::Other,
1219    }
1220}
1221
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrap `info` in the four-slot value list of an `onStatus`
    /// command: name, transaction id 0, null command object, info.
    fn on_status_values(info: Amf0Value) -> Vec<Amf0Value> {
        vec![
            Amf0Value::String("onStatus".into()),
            Amf0Value::Number(0.0),
            Amf0Value::Null,
            info,
        ]
    }

    #[test]
    fn parse_user_control_stream_eof_recovers_stream_id() {
        // Six-byte body: 16-bit BE event type (0x0001) followed by a
        // 32-bit BE stream id (7).
        let payload = [0x00u8, 0x01, 0x00, 0x00, 0x00, 0x07];
        let (event_type, event_data) = parse_user_control(&payload).expect("parse UCM");
        assert_eq!(event_type, USR_STREAM_EOF);
        assert_eq!(ucm_stream_id(event_data).expect("sid"), 7);
    }

    #[test]
    fn parse_user_control_rejects_truncated_payload() {
        // Fewer than 2 bytes: the event type itself is unreadable.
        let too_short: [&[u8]; 2] = [&[0x00], &[]];
        for payload in too_short.iter() {
            assert!(parse_user_control(payload).is_err());
        }
        // Exactly 2 bytes: the event type parses, but a
        // stream-id-carrying event then has no event data, so the
        // stream-id extractor refuses.
        let (event_type, event_data) = parse_user_control(&[0x00, 0x01]).expect("parse UCM");
        assert_eq!(event_type, USR_STREAM_EOF);
        assert!(ucm_stream_id(event_data).is_err());
    }

    #[test]
    fn classify_command_recognises_on_status() {
        let info = Amf0Value::Object(vec![
            ("level".into(), Amf0Value::String("status".into())),
            (
                "code".into(),
                Amf0Value::String("NetStream.Publish.Start".into()),
            ),
            ("description".into(), Amf0Value::String("ready".into())),
        ]);
        let event = classify_command(on_status_values(info));
        if let ClientEvent::OnStatus {
            level,
            code,
            description,
        } = &event
        {
            assert_eq!(level, "status");
            assert_eq!(code, "NetStream.Publish.Start");
            assert_eq!(description, "ready");
        } else {
            panic!("expected OnStatus, got {event:?}");
        }
    }

    /// Enhanced RTMP v2 §"Reconnect Request": an onStatus with code
    /// `NetConnection.Connect.ReconnectRequest` and level `status`
    /// becomes [`ClientEvent::ReconnectRequest`], carrying the optional
    /// `tcUrl`. The same code under any other level is not a valid
    /// reconnect request ("to reconnect the level MUST be set to
    /// status") and stays a plain OnStatus.
    #[test]
    fn classify_command_recognises_reconnect_request() {
        // tcUrl and description both present.
        let msg = crate::message::build_reconnect_request(
            Some("//192.0.2.0/realtimeapp"),
            Some("server update"),
        );
        let event = classify_command(amf::decode_all(&msg.payload).unwrap());
        if let ClientEvent::ReconnectRequest {
            tc_url,
            description,
        } = &event
        {
            assert_eq!(tc_url.as_deref(), Some("//192.0.2.0/realtimeapp"));
            assert_eq!(description, "server update");
        } else {
            panic!("expected ReconnectRequest, got {event:?}");
        }

        // tcUrl omitted → None ("use the tcUrl for the current
        // connection").
        let msg = crate::message::build_reconnect_request(None, None);
        let event = classify_command(amf::decode_all(&msg.payload).unwrap());
        if let ClientEvent::ReconnectRequest { tc_url, .. } = &event {
            assert_eq!(*tc_url, None);
        } else {
            panic!("expected ReconnectRequest, got {event:?}");
        }

        // Right code, wrong level → passes through as OnStatus.
        let info = Amf0Value::Object(vec![
            ("level".into(), Amf0Value::String("error".into())),
            (
                "code".into(),
                Amf0Value::String(crate::message::RECONNECT_REQUEST_CODE.into()),
            ),
        ]);
        let event = classify_command(on_status_values(info));
        if let ClientEvent::OnStatus { level, code, .. } = &event {
            assert_eq!(level, "error");
            assert_eq!(code, crate::message::RECONNECT_REQUEST_CODE);
        } else {
            panic!("expected OnStatus, got {event:?}");
        }
    }

    /// The four reference shapes from the spec's Info Object table
    /// (`enhanced-rtmp-v2.pdf` §"Reconnect Request") plus a few
    /// boundary cases, as `(base, reference, expected)` rows.
    #[test]
    fn resolve_tc_url_spec_example_shapes() {
        let base = "rtmp://origin.example.com:1935/live";
        let cases: [(&str, &str, &str); 7] = [
            // 1. Absolute URI reference — verbatim.
            (
                base,
                "rtmp://foo.mydomain.com:1935/realtimeapp",
                "rtmp://foo.mydomain.com:1935/realtimeapp",
            ),
            // 2. Network-path reference — inherits only our scheme.
            (
                base,
                "//192.0.2.0/realtimeapp",
                "rtmp://192.0.2.0/realtimeapp",
            ),
            // 3. Absolute-path reference — inherits scheme + authority.
            (
                base,
                "/realtimeapp",
                "rtmp://origin.example.com:1935/realtimeapp",
            ),
            // 4. Relative-path reference — last base segment replaced.
            (
                base,
                "realtimeapp",
                "rtmp://origin.example.com:1935/realtimeapp",
            ),
            // Deeper base path: only the last segment is replaced.
            ("rtmp://h/app/inst", "other", "rtmp://h/app/other"),
            // Empty reference → base itself.
            (base, "", base),
            // Base without any path: relative ref lands at the root.
            ("rtmp://h:1935", "realtimeapp", "rtmp://h:1935/realtimeapp"),
        ];
        for &(base_url, reference, expected) in cases.iter() {
            assert_eq!(
                resolve_tc_url(base_url, reference),
                expected,
                "base={base_url:?} reference={reference:?}"
            );
        }
    }

    #[test]
    fn classify_command_recognises_result_and_error() {
        let reply = classify_command(vec![
            Amf0Value::String("_result".into()),
            Amf0Value::Number(42.0),
            Amf0Value::Null,
            Amf0Value::Number(7.0),
        ]);
        if let ClientEvent::Result {
            transaction_id,
            values,
        } = &reply
        {
            assert_eq!(*transaction_id, 42.0);
            assert_eq!(values.len(), 4);
        } else {
            panic!("expected Result, got {reply:?}");
        }

        let failure = classify_command(vec![
            Amf0Value::String("_error".into()),
            Amf0Value::Number(99.0),
            Amf0Value::Null,
            Amf0Value::Null,
        ]);
        if let ClientEvent::ErrorReply { transaction_id, .. } = &failure {
            assert_eq!(*transaction_id, 99.0);
        } else {
            panic!("expected ErrorReply, got {failure:?}");
        }
    }
}