Skip to main content

oxideav_rtmp/
message.rs

1//! RTMP message-type constants + tiny builders for the protocol
2//! control and command messages we send during publish setup.
3//!
4//! Each builder returns a [`chunk::Message`] ready to feed to
5//! [`chunk::ChunkWriter::write_message`].
6
7use crate::amf::{encode_command, Amf0Value};
8use crate::caps::ConnectCapabilities;
9use crate::chunk::Message;
10use crate::error::{Error, Result};
11
12// §6.1 "Message Header" — type ids.
13pub const MSG_SET_CHUNK_SIZE: u8 = 1;
14pub const MSG_ABORT: u8 = 2;
15pub const MSG_ACK: u8 = 3;
16pub const MSG_USER_CONTROL: u8 = 4;
17pub const MSG_WINDOW_ACK_SIZE: u8 = 5;
18pub const MSG_SET_PEER_BANDWIDTH: u8 = 6;
19pub const MSG_AUDIO: u8 = 8;
20pub const MSG_VIDEO: u8 = 9;
21pub const MSG_DATA_AMF3: u8 = 15;
22pub const MSG_SHARED_OBJECT_AMF3: u8 = 16;
23pub const MSG_COMMAND_AMF3: u8 = 17;
24pub const MSG_DATA_AMF0: u8 = 18;
25pub const MSG_SHARED_OBJECT_AMF0: u8 = 19;
26pub const MSG_COMMAND_AMF0: u8 = 20;
27pub const MSG_AGGREGATE: u8 = 22;
28
29// §7.1.7 "User Control Message Events"
30pub const USR_STREAM_BEGIN: u16 = 0;
31pub const USR_STREAM_EOF: u16 = 1;
32pub const USR_STREAM_DRY: u16 = 2;
33pub const USR_SET_BUFFER_LENGTH: u16 = 3;
34pub const USR_STREAM_IS_RECORDED: u16 = 4;
35pub const USR_PING_REQUEST: u16 = 6;
36pub const USR_PING_RESPONSE: u16 = 7;
37
38// Chunk stream id conventions — not mandated by spec but used by every
39// major commodity implementation we have interoperated with, so we match.
40pub const CSID_PROTOCOL_CONTROL: u32 = 2;
41pub const CSID_COMMAND: u32 = 3;
42pub const CSID_AUDIO: u32 = 4;
43pub const CSID_VIDEO: u32 = 5;
44pub const CSID_DATA: u32 = 6;
45
46// ---------------------------------------------------------------------------
47// Protocol control builders
48// ---------------------------------------------------------------------------
49
50pub fn build_set_chunk_size(size: u32) -> Message {
51    // Bit 31 is reserved → mask to 31 bits.
52    let size = size & 0x7FFF_FFFF;
53    Message {
54        msg_type_id: MSG_SET_CHUNK_SIZE,
55        msg_stream_id: 0,
56        timestamp: 0,
57        payload: size.to_be_bytes().to_vec(),
58    }
59}
60
61/// Abort Message (protocol control type 2, RTMP 1.0 §5.2).
62///
63/// Per the spec, "Protocol control message 2, Abort Message, is used to
64/// notify the peer if it is waiting for chunks to complete a message,
65/// then to discard the partially received message over a chunk stream
66/// and abort processing of that message. The peer receives the chunk
67/// stream ID of the message to be discarded as payload of this protocol
68/// message." The body is a single 4-byte big-endian chunk stream ID
69/// (Figure 3). Like every protocol-control message it travels on the
70/// control stream (`msg_stream_id == 0`).
71pub fn build_abort(chunk_stream_id: u32) -> Message {
72    Message {
73        msg_type_id: MSG_ABORT,
74        msg_stream_id: 0,
75        timestamp: 0,
76        payload: chunk_stream_id.to_be_bytes().to_vec(),
77    }
78}
79
80pub fn build_window_ack_size(size: u32) -> Message {
81    Message {
82        msg_type_id: MSG_WINDOW_ACK_SIZE,
83        msg_stream_id: 0,
84        timestamp: 0,
85        payload: size.to_be_bytes().to_vec(),
86    }
87}
88
89pub fn build_set_peer_bandwidth(size: u32, limit_type: u8) -> Message {
90    let mut p = Vec::with_capacity(5);
91    p.extend_from_slice(&size.to_be_bytes());
92    p.push(limit_type);
93    Message {
94        msg_type_id: MSG_SET_PEER_BANDWIDTH,
95        msg_stream_id: 0,
96        timestamp: 0,
97        payload: p,
98    }
99}
100
101pub fn build_user_control_stream_begin(stream_id: u32) -> Message {
102    let mut p = Vec::with_capacity(6);
103    p.extend_from_slice(&USR_STREAM_BEGIN.to_be_bytes());
104    p.extend_from_slice(&stream_id.to_be_bytes());
105    Message {
106        msg_type_id: MSG_USER_CONTROL,
107        msg_stream_id: 0,
108        timestamp: 0,
109        payload: p,
110    }
111}
112
113/// User-control `StreamEOF` event (`UCM` type 1).
114///
115/// Per RTMP 1.0 §7.1.7, the server uses this to tell the peer that
116/// "playback of data is over as requested ... that the stream is dry."
117/// In the publish direction we re-use it as the symmetric end-of-stream
118/// signal so the peer learns the publisher is done before observing the
119/// TCP FIN. The 4-byte event body is the stream id of the dry stream.
120pub fn build_user_control_stream_eof(stream_id: u32) -> Message {
121    let mut p = Vec::with_capacity(6);
122    p.extend_from_slice(&USR_STREAM_EOF.to_be_bytes());
123    p.extend_from_slice(&stream_id.to_be_bytes());
124    Message {
125        msg_type_id: MSG_USER_CONTROL,
126        msg_stream_id: 0,
127        timestamp: 0,
128        payload: p,
129    }
130}
131
132/// User-control `StreamDry` event (`UCM` type 2).
133///
134/// Per RTMP 1.0 §3.7 ("Commands Messages" — User Control message table),
135/// "the server sends this event to notify the client that there is no
136/// more data on the stream. If the server does not detect any message
137/// for a time period, it can notify the subscribed clients that the
138/// stream is dry." The 4-byte event body is the stream id of the dry
139/// stream. Distinct from `StreamEOF`: `StreamDry` is "no data right
140/// now," `StreamEOF` is "playback finished."
141pub fn build_user_control_stream_dry(stream_id: u32) -> Message {
142    let mut p = Vec::with_capacity(6);
143    p.extend_from_slice(&USR_STREAM_DRY.to_be_bytes());
144    p.extend_from_slice(&stream_id.to_be_bytes());
145    Message {
146        msg_type_id: MSG_USER_CONTROL,
147        msg_stream_id: 0,
148        timestamp: 0,
149        payload: p,
150    }
151}
152
153/// User-control `SetBufferLength` event (`UCM` type 3).
154///
155/// Per RTMP 1.0 §3.7, "the client sends this event to inform the server
156/// of the buffer size (in milliseconds) that is used to buffer any data
157/// coming over a stream. This event is sent before the server starts
158/// processing the stream. The first 4 bytes of the event data represent
159/// the stream ID and the next 4 bytes represent the buffer length, in
160/// milliseconds." This is the only standard UCM event with a non-4-byte
161/// event-data body (8 bytes total).
162pub fn build_user_control_set_buffer_length(stream_id: u32, buffer_ms: u32) -> Message {
163    let mut p = Vec::with_capacity(10);
164    p.extend_from_slice(&USR_SET_BUFFER_LENGTH.to_be_bytes());
165    p.extend_from_slice(&stream_id.to_be_bytes());
166    p.extend_from_slice(&buffer_ms.to_be_bytes());
167    Message {
168        msg_type_id: MSG_USER_CONTROL,
169        msg_stream_id: 0,
170        timestamp: 0,
171        payload: p,
172    }
173}
174
175/// User-control `StreamIsRecorded` event (`UCM` type 4).
176///
177/// Per RTMP 1.0 §3.7, "the server sends this event to notify the client
178/// that the stream is a recorded stream. The 4 bytes event data
179/// represent the stream ID of the recorded stream." Servers typically
180/// emit this right after `StreamBegin` for an on-demand stream.
181pub fn build_user_control_stream_is_recorded(stream_id: u32) -> Message {
182    let mut p = Vec::with_capacity(6);
183    p.extend_from_slice(&USR_STREAM_IS_RECORDED.to_be_bytes());
184    p.extend_from_slice(&stream_id.to_be_bytes());
185    Message {
186        msg_type_id: MSG_USER_CONTROL,
187        msg_stream_id: 0,
188        timestamp: 0,
189        payload: p,
190    }
191}
192
193/// User-control `PingRequest` event (`UCM` type 6).
194///
195/// Per RTMP 1.0 §3.7, "the server sends this event to test whether the
196/// client is reachable. Event data is a 4-byte timestamp, representing
197/// the local server time when the server dispatched the command. The
198/// client responds with kMsgPingResponse on receiving kMsgPingRequest."
199pub fn build_user_control_ping_request(timestamp_ms: u32) -> Message {
200    let mut p = Vec::with_capacity(6);
201    p.extend_from_slice(&USR_PING_REQUEST.to_be_bytes());
202    p.extend_from_slice(&timestamp_ms.to_be_bytes());
203    Message {
204        msg_type_id: MSG_USER_CONTROL,
205        msg_stream_id: 0,
206        timestamp: 0,
207        payload: p,
208    }
209}
210
211/// User-control `PingResponse` event (`UCM` type 7).
212///
213/// Per RTMP 1.0 §3.7, "the client sends this event to the server in
214/// response to the ping request. The event data is a 4-byte timestamp,
215/// which was received with the kMsgPingRequest request." The caller is
216/// responsible for echoing back the exact timestamp the peer's
217/// `PingRequest` carried.
218pub fn build_user_control_ping_response(timestamp_ms: u32) -> Message {
219    let mut p = Vec::with_capacity(6);
220    p.extend_from_slice(&USR_PING_RESPONSE.to_be_bytes());
221    p.extend_from_slice(&timestamp_ms.to_be_bytes());
222    Message {
223        msg_type_id: MSG_USER_CONTROL,
224        msg_stream_id: 0,
225        timestamp: 0,
226        payload: p,
227    }
228}
229
230// ---------------------------------------------------------------------------
231// User Control Message typed accessor (round-trip parser)
232// ---------------------------------------------------------------------------
233
234/// Strongly-typed view of a User Control Message body per RTMP 1.0
235/// §3.7 / §7.1.7.
236///
237/// The `build_user_control_*` family above produces a [`Message`]
238/// with `msg_type_id == MSG_USER_CONTROL` and a payload shaped
239/// `event_type:U16BE | event_data:..`. [`UserControlEvent::parse`]
240/// is the inverse: lift such a payload into one of the seven
241/// spec-defined variants (or the catch-all [`Self::Unknown`] for
242/// forward compatibility — the spec leaves event types 5, 8..,
243/// reserved).
244///
245/// `Unknown` carries both the raw `event_type` and the unconsumed
246/// `event_data` bytes so a forwarding ingest can route unrecognised
247/// UCMs without losing information; a strict consumer can refuse
248/// the message by matching on it.
249///
250/// Round-trip helper: [`UserControlEvent::to_message`] produces the
251/// same [`Message`] the matching `build_user_control_*` builder
252/// emits, so `parse(build_x().payload) == Ok(x)` for every variant.
253#[derive(Debug, Clone, PartialEq, Eq)]
254pub enum UserControlEvent {
255    /// UCM type 0 — server tells the client that a stream is ready to
256    /// receive messages on the given stream id. Emitted right after
257    /// `_result(createStream)` from the server side; carried as the
258    /// 4-byte BE stream id in the event data.
259    StreamBegin { stream_id: u32 },
260    /// UCM type 1 — playback / publish finished on the given stream
261    /// id. The publisher emits this before tearing down its socket so
262    /// the peer learns "EOF was intentional" rather than guessing
263    /// whether the TCP FIN was a crash. 4-byte BE stream id.
264    StreamEof { stream_id: u32 },
265    /// UCM type 2 — server has not seen any data on the given stream
266    /// for a while. Distinct from [`Self::StreamEof`]: this is a
267    /// transient "no data right now" signal; the stream may resume.
268    /// 4-byte BE stream id.
269    StreamDry { stream_id: u32 },
270    /// UCM type 3 — client tells the server how many milliseconds of
271    /// buffer it is willing to keep filled. The only standard UCM
272    /// event with an 8-byte event-data body: 4 bytes BE stream id
273    /// followed by 4 bytes BE buffer length in ms.
274    SetBufferLength { stream_id: u32, buffer_ms: u32 },
275    /// UCM type 4 — server announces that the stream is recorded
276    /// (on-demand / archival). 4-byte BE stream id. Typically emitted
277    /// right after [`Self::StreamBegin`] on a play request.
278    StreamIsRecorded { stream_id: u32 },
279    /// UCM type 6 — sender's local time in ms; receiver must echo the
280    /// same value back in a [`Self::PingResponse`]. Used for liveness
281    /// probing + RTT measurement. 4-byte BE timestamp.
282    PingRequest { timestamp_ms: u32 },
283    /// UCM type 7 — exact echo of the timestamp from a paired
284    /// [`Self::PingRequest`]. 4-byte BE timestamp.
285    PingResponse { timestamp_ms: u32 },
286    /// Any event type not assigned by RTMP 1.0 §3.7 — UCM 5 is
287    /// reserved, and any UCM type ≥ 8 is forward-compatible space
288    /// the spec leaves unspecified. `data` holds the unconsumed
289    /// event-data bytes verbatim so a forwarding ingest can route
290    /// the message through without re-encoding.
291    Unknown { event_type: u16, data: Vec<u8> },
292}
293
294impl UserControlEvent {
295    /// Decode a UCM payload (the contents of a [`Message`] with
296    /// `msg_type_id == MSG_USER_CONTROL`) into a [`UserControlEvent`]
297    /// per RTMP 1.0 §3.7 / §7.1.7.
298    ///
299    /// Returns [`Error::ProtocolViolation`] if the payload is shorter
300    /// than the 2-byte event-type header, or if one of the
301    /// fixed-shape spec-defined variants is truncated below its
302    /// declared event-data size (4 bytes for the stream-id-carrying
303    /// variants and ping, 8 bytes for `SetBufferLength`). Unknown
304    /// event types accept any tail length, including zero, so a
305    /// forwarding ingest never rejects forward-compatible messages.
306    pub fn parse(payload: &[u8]) -> Result<Self> {
307        if payload.len() < 2 {
308            return Err(Error::ProtocolViolation(
309                "UserControl: payload < 2 bytes (need event type)".into(),
310            ));
311        }
312        let event_type = u16::from_be_bytes([payload[0], payload[1]]);
313        let data = &payload[2..];
314        match event_type {
315            USR_STREAM_BEGIN => Ok(Self::StreamBegin {
316                stream_id: read_u32_be(data, "StreamBegin")?,
317            }),
318            USR_STREAM_EOF => Ok(Self::StreamEof {
319                stream_id: read_u32_be(data, "StreamEOF")?,
320            }),
321            USR_STREAM_DRY => Ok(Self::StreamDry {
322                stream_id: read_u32_be(data, "StreamDry")?,
323            }),
324            USR_SET_BUFFER_LENGTH => {
325                if data.len() < 8 {
326                    return Err(Error::ProtocolViolation(format!(
327                        "UserControl SetBufferLength: event data {} < 8 bytes",
328                        data.len()
329                    )));
330                }
331                let stream_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
332                let buffer_ms = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
333                Ok(Self::SetBufferLength {
334                    stream_id,
335                    buffer_ms,
336                })
337            }
338            USR_STREAM_IS_RECORDED => Ok(Self::StreamIsRecorded {
339                stream_id: read_u32_be(data, "StreamIsRecorded")?,
340            }),
341            USR_PING_REQUEST => Ok(Self::PingRequest {
342                timestamp_ms: read_u32_be(data, "PingRequest")?,
343            }),
344            USR_PING_RESPONSE => Ok(Self::PingResponse {
345                timestamp_ms: read_u32_be(data, "PingResponse")?,
346            }),
347            other => Ok(Self::Unknown {
348                event_type: other,
349                data: data.to_vec(),
350            }),
351        }
352    }
353
354    /// 2-byte BE event-type identifier per §7.1.7. Matches the value
355    /// the wire form embeds in its first two bytes.
356    pub fn event_type(&self) -> u16 {
357        match self {
358            Self::StreamBegin { .. } => USR_STREAM_BEGIN,
359            Self::StreamEof { .. } => USR_STREAM_EOF,
360            Self::StreamDry { .. } => USR_STREAM_DRY,
361            Self::SetBufferLength { .. } => USR_SET_BUFFER_LENGTH,
362            Self::StreamIsRecorded { .. } => USR_STREAM_IS_RECORDED,
363            Self::PingRequest { .. } => USR_PING_REQUEST,
364            Self::PingResponse { .. } => USR_PING_RESPONSE,
365            Self::Unknown { event_type, .. } => *event_type,
366        }
367    }
368
369    /// True iff this is one of the seven event types §3.7 / §7.1.7
370    /// assigns a fixed shape to. [`Self::Unknown`] returns false.
371    pub fn is_spec_defined(&self) -> bool {
372        !matches!(self, Self::Unknown { .. })
373    }
374
375    /// Inverse of [`Self::parse`]: produce the matching protocol
376    /// control [`Message`] (msg_type_id = 4, msg_stream_id = 0,
377    /// timestamp = 0). For the seven spec-defined variants this
378    /// emits byte-for-byte the same payload the corresponding
379    /// `build_user_control_*` builder would; for [`Self::Unknown`]
380    /// the event-type bytes and the carried `data` are concatenated
381    /// verbatim, so a parse / re-encode cycle is byte-stable.
382    pub fn to_message(&self) -> Message {
383        match self {
384            Self::StreamBegin { stream_id } => build_user_control_stream_begin(*stream_id),
385            Self::StreamEof { stream_id } => build_user_control_stream_eof(*stream_id),
386            Self::StreamDry { stream_id } => build_user_control_stream_dry(*stream_id),
387            Self::SetBufferLength {
388                stream_id,
389                buffer_ms,
390            } => build_user_control_set_buffer_length(*stream_id, *buffer_ms),
391            Self::StreamIsRecorded { stream_id } => {
392                build_user_control_stream_is_recorded(*stream_id)
393            }
394            Self::PingRequest { timestamp_ms } => build_user_control_ping_request(*timestamp_ms),
395            Self::PingResponse { timestamp_ms } => build_user_control_ping_response(*timestamp_ms),
396            Self::Unknown { event_type, data } => {
397                let mut p = Vec::with_capacity(2 + data.len());
398                p.extend_from_slice(&event_type.to_be_bytes());
399                p.extend_from_slice(data);
400                Message {
401                    msg_type_id: MSG_USER_CONTROL,
402                    msg_stream_id: 0,
403                    timestamp: 0,
404                    payload: p,
405                }
406            }
407        }
408    }
409}
410
411/// Helper for [`UserControlEvent::parse`] — read a 4-byte BE field
412/// out of `event_data` or surface [`Error::ProtocolViolation`] with
413/// the variant name in the message.
414fn read_u32_be(data: &[u8], variant: &str) -> Result<u32> {
415    if data.len() < 4 {
416        return Err(Error::ProtocolViolation(format!(
417            "UserControl {variant}: event data {} < 4 bytes",
418            data.len()
419        )));
420    }
421    Ok(u32::from_be_bytes([data[0], data[1], data[2], data[3]]))
422}
423
424pub fn build_ack(bytes_received: u32) -> Message {
425    Message {
426        msg_type_id: MSG_ACK,
427        msg_stream_id: 0,
428        timestamp: 0,
429        payload: bytes_received.to_be_bytes().to_vec(),
430    }
431}
432
433// ---------------------------------------------------------------------------
434// Command (AMF0) builders
435// ---------------------------------------------------------------------------
436
437/// `connect` — sent by the client right after handshake to open a
438/// NetConnection onto the server's `app`. `tc_url` is the full
439/// `rtmp://host[:port]/app` string; `app` is the last path segment.
440///
441/// Legacy publisher shape: no Enhanced RTMP capabilities advertised.
442/// For an E-RTMP-aware publisher use
443/// [`build_connect_with_caps`] which extends the Command Object with
444/// `fourCcList` / `audio|videoFourCcInfoMap` / `capsEx`.
445pub fn build_connect(transaction_id: f64, app: &str, tc_url: &str, flash_ver: &str) -> Message {
446    build_connect_with_caps(
447        transaction_id,
448        app,
449        tc_url,
450        flash_ver,
451        &ConnectCapabilities::default(),
452    )
453}
454
455/// `connect` with Enhanced RTMP v1+v2 capability advertisements
456/// (`enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect Command").
457///
458/// The legacy Command Object properties (`app` / `type` / `flashVer` /
459/// `tcUrl` / `fpad` / `capabilities` / `audioCodecs` / `videoCodecs` /
460/// `videoFunction`) are emitted in the historical order every commodity
461/// peer expects, and the non-default `ConnectCapabilities` entries are
462/// appended after them via [`ConnectCapabilities::encode_into`]. The
463/// per-property emission order is the documented one:
464/// `objectEncoding` → `fourCcList` → `videoFourCcInfoMap` →
465/// `audioFourCcInfoMap` → `capsEx`. Empty / default fields are skipped,
466/// so an empty `caps` block produces exactly the byte layout
467/// [`build_connect`] would.
468pub fn build_connect_with_caps(
469    transaction_id: f64,
470    app: &str,
471    tc_url: &str,
472    flash_ver: &str,
473    caps: &ConnectCapabilities,
474) -> Message {
475    let mut pairs: Vec<(String, Amf0Value)> = vec![
476        ("app".into(), Amf0Value::String(app.into())),
477        ("type".into(), Amf0Value::String("nonprivate".into())),
478        ("flashVer".into(), Amf0Value::String(flash_ver.into())),
479        ("tcUrl".into(), Amf0Value::String(tc_url.into())),
480        ("fpad".into(), Amf0Value::Boolean(false)),
481        ("capabilities".into(), Amf0Value::Number(15.0)),
482        ("audioCodecs".into(), Amf0Value::Number(0x0FFF as f64)),
483        ("videoCodecs".into(), Amf0Value::Number(0x00FF as f64)),
484        ("videoFunction".into(), Amf0Value::Number(1.0)),
485    ];
486    caps.encode_into(&mut pairs);
487    let payload = encode_command("connect", transaction_id, Amf0Value::Object(pairs), &[]);
488    Message {
489        msg_type_id: MSG_COMMAND_AMF0,
490        msg_stream_id: 0,
491        timestamp: 0,
492        payload,
493    }
494}
495
496/// `_result` for the connect transaction. Standard server reply carries
497/// the server's flashVer + a NetConnection.Connect.Success info object.
498///
499/// Legacy server shape: no Enhanced RTMP capability advertisement.
500/// E-RTMP-aware servers should use [`build_connect_result_with_caps`]
501/// to echo `videoFourCcInfoMap` / `capsEx` etc. back at the client per
502/// `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect Command".
503pub fn build_connect_result(transaction_id: f64) -> Message {
504    build_connect_result_with_caps(transaction_id, &ConnectCapabilities::default())
505}
506
507/// `_result` for the connect transaction with Enhanced RTMP capability
508/// advertisement.
509///
510/// The Command Object slot (the first AMF0 value after the transaction
511/// id) is the server's properties bag (`fmsVer` / `capabilities` /
512/// `mode`); the trailing single argument is the
513/// `NetConnection.Connect.Success` info object. Any non-default
514/// `ConnectCapabilities` properties are appended to the info object —
515/// `enhanced-rtmp-v2.pdf` is explicit: "the server provides some
516/// properties within an Object as one of the parameters" and gives
517/// `videoFourCcInfoMap` / `capsEx` as the canonical names. The info
518/// object's existing `level` / `code` / `description` /
519/// `objectEncoding` block is preserved, so a pre-2023 client still sees
520/// the success status it expects and a v2-aware client lifts the
521/// capability properties off the same object via
522/// [`crate::caps::ConnectCapabilities::from_amf0`].
523pub fn build_connect_result_with_caps(transaction_id: f64, caps: &ConnectCapabilities) -> Message {
524    let props = Amf0Value::Object(vec![
525        ("fmsVer".into(), Amf0Value::String("FMS/3,0,1,123".into())),
526        ("capabilities".into(), Amf0Value::Number(31.0)),
527        ("mode".into(), Amf0Value::Number(1.0)),
528    ]);
529    // Info object carries the success status + the capability block.
530    // `objectEncoding` is encoded twice when the caller sets it — once
531    // in our default `0.0` slot and once via `encode_into`. We pick
532    // whichever the caller asks for: drop the default if they set their
533    // own.
534    let mut info_pairs: Vec<(String, Amf0Value)> = vec![
535        ("level".into(), Amf0Value::String("status".into())),
536        (
537            "code".into(),
538            Amf0Value::String("NetConnection.Connect.Success".into()),
539        ),
540        (
541            "description".into(),
542            Amf0Value::String("Connection accepted.".into()),
543        ),
544    ];
545    if caps.object_encoding.is_none() {
546        info_pairs.push(("objectEncoding".into(), Amf0Value::Number(0.0)));
547    }
548    caps.encode_into(&mut info_pairs);
549    let payload = encode_command(
550        "_result",
551        transaction_id,
552        props,
553        &[Amf0Value::Object(info_pairs)],
554    );
555    Message {
556        msg_type_id: MSG_COMMAND_AMF0,
557        msg_stream_id: 0,
558        timestamp: 0,
559        payload,
560    }
561}
562
563/// `releaseStream` — client advisory sent right before publish. The
564/// server's reply isn't required for correctness.
565pub fn build_release_stream(transaction_id: f64, stream_name: &str) -> Message {
566    let payload = encode_command(
567        "releaseStream",
568        transaction_id,
569        Amf0Value::Null,
570        &[Amf0Value::String(stream_name.into())],
571    );
572    Message {
573        msg_type_id: MSG_COMMAND_AMF0,
574        msg_stream_id: 0,
575        timestamp: 0,
576        payload,
577    }
578}
579
580/// `FCPublish` — another pre-publish advisory from Flash Media Live
581/// Encoder. Many servers don't care; we send it for compatibility
582/// with the few that do.
583pub fn build_fc_publish(transaction_id: f64, stream_name: &str) -> Message {
584    let payload = encode_command(
585        "FCPublish",
586        transaction_id,
587        Amf0Value::Null,
588        &[Amf0Value::String(stream_name.into())],
589    );
590    Message {
591        msg_type_id: MSG_COMMAND_AMF0,
592        msg_stream_id: 0,
593        timestamp: 0,
594        payload,
595    }
596}
597
598/// `createStream` — client requests a new NetStream handle. Server
599/// replies with `_result` carrying a fresh stream id the client uses
600/// for subsequent audio/video messages.
601pub fn build_create_stream(transaction_id: f64) -> Message {
602    let payload = encode_command("createStream", transaction_id, Amf0Value::Null, &[]);
603    Message {
604        msg_type_id: MSG_COMMAND_AMF0,
605        msg_stream_id: 0,
606        timestamp: 0,
607        payload,
608    }
609}
610
611pub fn build_create_stream_result(transaction_id: f64, stream_id: f64) -> Message {
612    let payload = encode_command(
613        "_result",
614        transaction_id,
615        Amf0Value::Null,
616        &[Amf0Value::Number(stream_id)],
617    );
618    Message {
619        msg_type_id: MSG_COMMAND_AMF0,
620        msg_stream_id: 0,
621        timestamp: 0,
622        payload,
623    }
624}
625
626/// `publish` — client tells the server which stream name it's about
627/// to feed. `publish_type` is usually `"live"`, `"record"`, or
628/// `"append"`.
629pub fn build_publish(
630    transaction_id: f64,
631    stream_id: u32,
632    stream_name: &str,
633    publish_type: &str,
634) -> Message {
635    let payload = encode_command(
636        "publish",
637        transaction_id,
638        Amf0Value::Null,
639        &[
640            Amf0Value::String(stream_name.into()),
641            Amf0Value::String(publish_type.into()),
642        ],
643    );
644    Message {
645        msg_type_id: MSG_COMMAND_AMF0,
646        msg_stream_id: stream_id,
647        timestamp: 0,
648        payload,
649    }
650}
651
652/// `onStatus` — server pushes this on the NetStream to signal state
653/// changes (e.g. `NetStream.Publish.Start`). `code` / `description`
654/// vary per event.
655pub fn build_on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Message {
656    let info = Amf0Value::Object(vec![
657        ("level".into(), Amf0Value::String(level.into())),
658        ("code".into(), Amf0Value::String(code.into())),
659        ("description".into(), Amf0Value::String(description.into())),
660    ]);
661    let payload = encode_command("onStatus", 0.0, Amf0Value::Null, &[info]);
662    Message {
663        msg_type_id: MSG_COMMAND_AMF0,
664        msg_stream_id: stream_id,
665        timestamp: 0,
666        payload,
667    }
668}
669
670/// The `code` string a server MUST set in the onStatus Info Object to
671/// request a client reconnect, per `enhanced-rtmp-v2.pdf` §"Reconnect
672/// Request" (table "Info Object parameter for onStatus command when
673/// handling reconnect").
674pub const RECONNECT_REQUEST_CODE: &str = "NetConnection.Connect.ReconnectRequest";
675
676/// `onStatus(NetConnection.Connect.ReconnectRequest)` — Enhanced RTMP
677/// v2 §"Reconnect Request". A server emits this NetConnection-level
678/// onStatus command to ask the client to reconnect — e.g. ahead of a
679/// live-streaming-server update, or to remap the client to a
680/// different server instance for load balancing / geolocation.
681///
682/// Per the spec's Info Object table:
683///
684/// * `code` MUST be `NetConnection.Connect.ReconnectRequest`
685///   ([`RECONNECT_REQUEST_CODE`]).
686/// * `level` MUST be `status`.
687/// * `tcUrl` (optional) — "absolute or relative URI reference of the
688///   server to which to reconnect. If not specified, use the tcUrl
689///   for the current connection." A server that aims to remap the
690///   client MUST set it.
691/// * `description` (optional) — human-readable information about the
692///   message.
693///
694/// The command rides the NetConnection (message stream id 0) with
695/// transaction id 0 ("no response needed") and a null Command Object,
696/// matching the spec's "Server to client, NetConnection onStatus
697/// command" table.
698pub fn build_reconnect_request(tc_url: Option<&str>, description: Option<&str>) -> Message {
699    let mut props = vec![
700        (
701            "code".into(),
702            Amf0Value::String(RECONNECT_REQUEST_CODE.into()),
703        ),
704        ("level".into(), Amf0Value::String("status".into())),
705    ];
706    if let Some(desc) = description {
707        props.push(("description".into(), Amf0Value::String(desc.into())));
708    }
709    if let Some(url) = tc_url {
710        props.push(("tcUrl".into(), Amf0Value::String(url.into())));
711    }
712    let payload = encode_command(
713        "onStatus",
714        0.0,
715        Amf0Value::Null,
716        &[Amf0Value::Object(props)],
717    );
718    Message {
719        msg_type_id: MSG_COMMAND_AMF0,
720        // NetConnection commands live on the control stream (message
721        // stream id 0) — this is a connection-level status event, not
722        // a NetStream one.
723        msg_stream_id: 0,
724        timestamp: 0,
725        payload,
726    }
727}
728
729/// `@setDataFrame("onMetaData", …)` — the standard way to publish
730/// per-stream metadata (width, height, video/audio codec ids,
731/// duration, …) before the first audio/video packet.
732pub fn build_set_data_frame(stream_id: u32, metadata: Amf0Value) -> Message {
733    let mut payload = Vec::new();
734    crate::amf::encode(&mut payload, &Amf0Value::String("@setDataFrame".into()));
735    crate::amf::encode(&mut payload, &Amf0Value::String("onMetaData".into()));
736    crate::amf::encode(&mut payload, &metadata);
737    Message {
738        msg_type_id: MSG_DATA_AMF0,
739        msg_stream_id: stream_id,
740        timestamp: 0,
741        payload,
742    }
743}
744
745#[cfg(test)]
746mod tests {
747    use super::*;
748
749    /// Exact wire bytes for a `UserControl StreamBegin` per RTMP 1.0
750    /// §7.1.7: 2-byte event type (0x0000) + 4-byte stream id BE.
751    #[test]
752    fn user_control_stream_begin_wire_bytes() {
753        let m = build_user_control_stream_begin(1);
754        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
755        assert_eq!(m.msg_stream_id, 0);
756        assert_eq!(m.payload, vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x01]);
757    }
758
759    /// Symmetric wire bytes for `UserControl StreamEOF` (type 1): a
760    /// publisher-side close emits this to signal end-of-publish before
761    /// the TCP FIN, so the peer doesn't have to guess whether the
762    /// connection dropped or terminated cleanly.
763    #[test]
764    fn user_control_stream_eof_wire_bytes() {
765        let m = build_user_control_stream_eof(7);
766        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
767        assert_eq!(m.msg_stream_id, 0);
768        assert_eq!(m.timestamp, 0);
769        // Event type 1 (StreamEOF) | stream id 7.
770        assert_eq!(m.payload, vec![0x00, 0x01, 0x00, 0x00, 0x00, 0x07]);
771    }
772
773    /// Wire bytes for `UserControl StreamDry` (type 2). Same six-byte
774    /// frame as StreamBegin / StreamEOF: 2-byte BE event type, 4-byte BE
775    /// stream id.
776    #[test]
777    fn user_control_stream_dry_wire_bytes() {
778        let m = build_user_control_stream_dry(0x0010_2030);
779        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
780        assert_eq!(m.msg_stream_id, 0);
781        assert_eq!(m.payload, vec![0x00, 0x02, 0x00, 0x10, 0x20, 0x30]);
782    }
783
784    /// Wire bytes for `UserControl SetBufferLength` (type 3). The only
785    /// UCM event with an 8-byte event-data payload: 4 bytes stream id +
786    /// 4 bytes buffer length in milliseconds.
787    #[test]
788    fn user_control_set_buffer_length_wire_bytes() {
789        let m = build_user_control_set_buffer_length(1, 3000);
790        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
791        assert_eq!(m.msg_stream_id, 0);
792        // Event type 3 (SetBufferLength) | stream id 1 | buffer 3000 ms.
793        assert_eq!(
794            m.payload,
795            vec![0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x0B, 0xB8],
796        );
797    }
798
799    /// Wire bytes for `UserControl StreamIsRecorded` (type 4): 2-byte BE
800    /// event type, 4-byte BE stream id.
801    #[test]
802    fn user_control_stream_is_recorded_wire_bytes() {
803        let m = build_user_control_stream_is_recorded(5);
804        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
805        assert_eq!(m.msg_stream_id, 0);
806        assert_eq!(m.payload, vec![0x00, 0x04, 0x00, 0x00, 0x00, 0x05]);
807    }
808
809    /// Wire bytes for `UserControl PingRequest` (type 6): 2-byte BE
810    /// event type, 4-byte BE local-server-time timestamp.
811    #[test]
812    fn user_control_ping_request_wire_bytes() {
813        let m = build_user_control_ping_request(0xDEAD_BEEF);
814        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
815        assert_eq!(m.msg_stream_id, 0);
816        assert_eq!(m.payload, vec![0x00, 0x06, 0xDE, 0xAD, 0xBE, 0xEF]);
817    }
818
819    /// Wire bytes for `UserControl PingResponse` (type 7): the same
820    /// 4-byte timestamp the matching PingRequest carried, prefixed with
821    /// the type-7 event header.
822    #[test]
823    fn user_control_ping_response_wire_bytes() {
824        let m = build_user_control_ping_response(0xDEAD_BEEF);
825        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
826        assert_eq!(m.msg_stream_id, 0);
827        assert_eq!(m.payload, vec![0x00, 0x07, 0xDE, 0xAD, 0xBE, 0xEF]);
828    }
829
830    /// Wire bytes for an Abort Message (protocol control type 2, RTMP
831    /// 1.0 §5.2): a bare 4-byte big-endian chunk stream ID on the
832    /// control stream (`msg_stream_id == 0`, `timestamp == 0`).
833    #[test]
834    fn abort_wire_bytes() {
835        let m = build_abort(0x0001_0203);
836        assert_eq!(m.msg_type_id, MSG_ABORT);
837        assert_eq!(m.msg_stream_id, 0);
838        assert_eq!(m.timestamp, 0);
839        assert_eq!(m.payload, vec![0x00, 0x01, 0x02, 0x03]);
840    }
841
842    /// `build_connect_with_caps` with a default capability block emits
843    /// byte-identical output to the legacy `build_connect` builder.
844    #[test]
845    fn connect_with_empty_caps_matches_legacy() {
846        let legacy = build_connect(1.0, "live", "rtmp://srv/live", "FMLE/3.0");
847        let caps_empty = build_connect_with_caps(
848            1.0,
849            "live",
850            "rtmp://srv/live",
851            "FMLE/3.0",
852            &ConnectCapabilities::default(),
853        );
854        assert_eq!(legacy.payload, caps_empty.payload);
855    }
856
857    /// `build_connect_with_caps` appends the Enhanced RTMP properties
858    /// onto the Command Object in the documented v1+v2 order, after the
859    /// legacy `videoFunction` field.
860    #[test]
861    fn connect_with_caps_appends_in_documented_order() {
862        let mut video = crate::caps::FourCcInfoMap::new();
863        video.insert("*", crate::caps::FOURCC_INFO_CAN_FORWARD);
864        let mut audio = crate::caps::FourCcInfoMap::new();
865        audio.insert("Opus", crate::caps::FOURCC_INFO_CAN_DECODE);
866        let caps = ConnectCapabilities {
867            object_encoding: Some(crate::caps::OBJECT_ENCODING_AMF3),
868            fourcc_list: vec!["av01".into(), "hvc1".into()],
869            video_fourcc_info_map: video,
870            audio_fourcc_info_map: audio,
871            caps_ex: crate::caps::CAPS_EX_RECONNECT | crate::caps::CAPS_EX_MULTITRACK,
872        };
873
874        let msg = build_connect_with_caps(1.0, "live", "rtmp://srv/live", "FMLE/3.0", &caps);
875        // Walk the AMF0 payload and pull the Command Object's property
876        // names. The third value is the Command Object (post-name,
877        // post-tx-id).
878        let vals = crate::amf::decode_all(&msg.payload).unwrap();
879        assert_eq!(vals[0].as_str(), Some("connect"));
880        let cmd_obj = match &vals[2] {
881            Amf0Value::Object(p) => p,
882            other => panic!("expected Object for command object, got {other:?}"),
883        };
884        let names: Vec<&str> = cmd_obj.iter().map(|(k, _)| k.as_str()).collect();
885        let legacy_count = names
886            .iter()
887            .position(|n| *n == "videoFunction")
888            .expect("legacy block must end with videoFunction")
889            + 1;
890        let extras = &names[legacy_count..];
891        assert_eq!(
892            extras,
893            &[
894                "objectEncoding",
895                "fourCcList",
896                "videoFourCcInfoMap",
897                "audioFourCcInfoMap",
898                "capsEx",
899            ],
900        );
901    }
902
903    /// `build_connect_result_with_caps` echoes the capability block back
904    /// inside the trailing info object alongside the
905    /// `NetConnection.Connect.Success` status.
906    #[test]
907    fn connect_result_with_caps_emits_info_block() {
908        let mut video = crate::caps::FourCcInfoMap::new();
909        video.insert("hvc1", crate::caps::FOURCC_INFO_CAN_DECODE);
910        let caps = ConnectCapabilities {
911            video_fourcc_info_map: video,
912            caps_ex: crate::caps::CAPS_EX_MULTITRACK | crate::caps::CAPS_EX_MOD_EX,
913            ..Default::default()
914        };
915        let msg = build_connect_result_with_caps(1.0, &caps);
916
917        let vals = crate::amf::decode_all(&msg.payload).unwrap();
918        assert_eq!(vals[0].as_str(), Some("_result"));
919        // Info object is the fourth AMF0 value.
920        let info = &vals[3];
921        assert_eq!(
922            info.get("code").and_then(Amf0Value::as_str),
923            Some("NetConnection.Connect.Success"),
924        );
925        let parsed = ConnectCapabilities::from_amf0(info);
926        assert_eq!(parsed.caps_ex, caps.caps_ex);
927        assert_eq!(parsed.video_fourcc_info_map.get("hvc1"), Some(1));
928    }
929
930    /// `build_connect_result_with_caps` with an empty capability block
931    /// emits the legacy bytes verbatim — pre-2023 clients keep parsing
932    /// the same status info object they've always seen.
933    #[test]
934    fn connect_result_with_empty_caps_matches_legacy() {
935        let legacy = build_connect_result(7.0);
936        let empty = build_connect_result_with_caps(7.0, &ConnectCapabilities::default());
937        assert_eq!(legacy.payload, empty.payload);
938    }
939
940    // ---- UserControlEvent typed accessor (parse + round-trip) -----------
941
942    /// `UserControlEvent::parse` classifies each spec-defined event
943    /// type into its strongly-typed variant. Spot-check all seven.
944    #[test]
945    fn user_control_event_parse_recognises_spec_types() {
946        let cases: &[(&[u8], UserControlEvent)] = &[
947            (
948                &[0x00, 0x00, 0x00, 0x00, 0x00, 0x01],
949                UserControlEvent::StreamBegin { stream_id: 1 },
950            ),
951            (
952                &[0x00, 0x01, 0x00, 0x00, 0x00, 0x07],
953                UserControlEvent::StreamEof { stream_id: 7 },
954            ),
955            (
956                &[0x00, 0x02, 0x00, 0x10, 0x20, 0x30],
957                UserControlEvent::StreamDry {
958                    stream_id: 0x0010_2030,
959                },
960            ),
961            (
962                &[0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x0B, 0xB8],
963                UserControlEvent::SetBufferLength {
964                    stream_id: 1,
965                    buffer_ms: 3000,
966                },
967            ),
968            (
969                &[0x00, 0x04, 0x00, 0x00, 0x00, 0x05],
970                UserControlEvent::StreamIsRecorded { stream_id: 5 },
971            ),
972            (
973                &[0x00, 0x06, 0xDE, 0xAD, 0xBE, 0xEF],
974                UserControlEvent::PingRequest {
975                    timestamp_ms: 0xDEAD_BEEF,
976                },
977            ),
978            (
979                &[0x00, 0x07, 0xDE, 0xAD, 0xBE, 0xEF],
980                UserControlEvent::PingResponse {
981                    timestamp_ms: 0xDEAD_BEEF,
982                },
983            ),
984        ];
985        for (wire, expected) in cases {
986            let parsed = UserControlEvent::parse(wire).expect("parse UCM");
987            assert_eq!(&parsed, expected);
988            assert!(parsed.is_spec_defined());
989            assert_eq!(
990                parsed.event_type() as usize,
991                ((wire[0] as usize) << 8) | wire[1] as usize
992            );
993        }
994    }
995
996    /// Parse → re-encode of each spec-defined builder output is
997    /// byte-identical to the original. Locks the inverse property
998    /// against accidental wire-format drift.
999    #[test]
1000    fn user_control_event_round_trip_matches_builder_bytes() {
1001        let originals = [
1002            build_user_control_stream_begin(1),
1003            build_user_control_stream_eof(7),
1004            build_user_control_stream_dry(0x0010_2030),
1005            build_user_control_set_buffer_length(1, 3000),
1006            build_user_control_stream_is_recorded(5),
1007            build_user_control_ping_request(0xDEAD_BEEF),
1008            build_user_control_ping_response(0xDEAD_BEEF),
1009        ];
1010        for m in &originals {
1011            let parsed = UserControlEvent::parse(&m.payload).expect("parse UCM");
1012            let rebuilt = parsed.to_message();
1013            assert_eq!(rebuilt.msg_type_id, MSG_USER_CONTROL);
1014            assert_eq!(rebuilt.msg_stream_id, 0);
1015            assert_eq!(rebuilt.timestamp, 0);
1016            assert_eq!(rebuilt.payload, m.payload);
1017        }
1018    }
1019
1020    /// UCM event type 5 (spec-reserved) and any value ≥ 8 surface as
1021    /// [`UserControlEvent::Unknown`] with the unconsumed tail bytes
1022    /// preserved verbatim. Round-tripping an `Unknown` rebuilds the
1023    /// exact same payload — forwarding ingests stay format-neutral.
1024    #[test]
1025    fn user_control_event_unknown_preserves_event_type_and_tail() {
1026        // §7.1.7 leaves event type 5 reserved.
1027        let wire: &[u8] = &[0x00, 0x05, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE];
1028        let parsed = UserControlEvent::parse(wire).expect("parse reserved UCM");
1029        assert_eq!(
1030            parsed,
1031            UserControlEvent::Unknown {
1032                event_type: 5,
1033                data: vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE],
1034            },
1035        );
1036        assert!(!parsed.is_spec_defined());
1037        assert_eq!(parsed.event_type(), 5);
1038        // Round-trip: the rebuilt payload is byte-identical.
1039        let rebuilt = parsed.to_message();
1040        assert_eq!(rebuilt.payload, wire);
1041
1042        // Forward-compat: any event type ≥ 8 also lands in Unknown,
1043        // even with an empty event-data tail (no truncation refusal).
1044        let future: &[u8] = &[0xFF, 0xFE];
1045        let parsed_future = UserControlEvent::parse(future).expect("parse future UCM");
1046        assert_eq!(
1047            parsed_future,
1048            UserControlEvent::Unknown {
1049                event_type: 0xFFFE,
1050                data: Vec::new(),
1051            },
1052        );
1053        assert_eq!(parsed_future.to_message().payload, future);
1054    }
1055
1056    /// `parse` refuses payloads truncated below the 2-byte event-type
1057    /// header AND below the fixed event-data size of each spec-defined
1058    /// variant (`SetBufferLength` needs 8 bytes, every other
1059    /// spec-defined variant needs 4 bytes).
1060    #[test]
1061    fn user_control_event_parse_rejects_truncated_payload() {
1062        // < 2 bytes — can't even read the event type.
1063        assert!(matches!(
1064            UserControlEvent::parse(&[]),
1065            Err(Error::ProtocolViolation(_))
1066        ));
1067        assert!(matches!(
1068            UserControlEvent::parse(&[0x00]),
1069            Err(Error::ProtocolViolation(_))
1070        ));
1071        // event type present but spec-defined variant body truncated.
1072        for type_byte in [
1073            USR_STREAM_BEGIN,
1074            USR_STREAM_EOF,
1075            USR_STREAM_DRY,
1076            USR_STREAM_IS_RECORDED,
1077            USR_PING_REQUEST,
1078            USR_PING_RESPONSE,
1079        ] {
1080            let wire = [(type_byte >> 8) as u8, type_byte as u8, 0x00, 0x00, 0x00];
1081            assert!(matches!(
1082                UserControlEvent::parse(&wire),
1083                Err(Error::ProtocolViolation(_))
1084            ));
1085        }
1086        // SetBufferLength's 8-byte rule: 7 bytes refused, 8 accepted.
1087        let too_short = [0x00, 0x03, 0, 0, 0, 1, 0, 0, 11];
1088        assert!(matches!(
1089            UserControlEvent::parse(&too_short),
1090            Err(Error::ProtocolViolation(_))
1091        ));
1092    }
1093
1094    /// `build_reconnect_request` shape per `enhanced-rtmp-v2.pdf`
1095    /// §"Reconnect Request": `["onStatus", 0.0, null, info]`, where
1096    /// info carries `code = NetConnection.Connect.ReconnectRequest`,
1097    /// `level = status`, plus the optional `tcUrl` / `description`
1098    /// pairs — and the command rides message stream 0 (NetConnection,
1099    /// not NetStream).
1100    #[test]
1101    fn reconnect_request_full_info_object() {
1102        let m = build_reconnect_request(
1103            Some("rtmp://foo.mydomain.com:1935/realtimeapp"),
1104            Some("The streaming server is undergoing updates."),
1105        );
1106        assert_eq!(m.msg_type_id, MSG_COMMAND_AMF0);
1107        assert_eq!(m.msg_stream_id, 0, "NetConnection command stream");
1108        let vals = crate::amf::decode_all(&m.payload).unwrap();
1109        assert_eq!(vals[0].as_str(), Some("onStatus"));
1110        assert_eq!(vals[1].as_f64(), Some(0.0), "transaction id 0");
1111        assert_eq!(vals[2], Amf0Value::Null, "no command object");
1112        let info = &vals[3];
1113        assert_eq!(
1114            info.get("code").and_then(Amf0Value::as_str),
1115            Some(RECONNECT_REQUEST_CODE)
1116        );
1117        assert_eq!(
1118            info.get("level").and_then(Amf0Value::as_str),
1119            Some("status")
1120        );
1121        assert_eq!(
1122            info.get("tcUrl").and_then(Amf0Value::as_str),
1123            Some("rtmp://foo.mydomain.com:1935/realtimeapp")
1124        );
1125        assert_eq!(
1126            info.get("description").and_then(Amf0Value::as_str),
1127            Some("The streaming server is undergoing updates.")
1128        );
1129    }
1130
1131    /// Both Info Object extras are optional per spec — when neither is
1132    /// supplied the info object carries exactly the two mandatory
1133    /// pairs (`code`, `level`).
1134    #[test]
1135    fn reconnect_request_minimal_info_object() {
1136        let m = build_reconnect_request(None, None);
1137        let vals = crate::amf::decode_all(&m.payload).unwrap();
1138        let info = &vals[3];
1139        assert_eq!(
1140            info.get("code").and_then(Amf0Value::as_str),
1141            Some(RECONNECT_REQUEST_CODE)
1142        );
1143        assert_eq!(
1144            info.get("level").and_then(Amf0Value::as_str),
1145            Some("status")
1146        );
1147        assert!(info.get("tcUrl").is_none(), "tcUrl omitted when None");
1148        assert!(
1149            info.get("description").is_none(),
1150            "description omitted when None"
1151        );
1152    }
1153}