oxideav_rtmp/client.rs
1//! RTMP client: push a live stream to a remote RTMP server.
2//!
3//! ```text
4//! let mut client = RtmpClient::connect("rtmp://remote/live/key")?;
5//! client.send_video_sequence_header(&avcc_bytes)?;
6//! client.send_audio_sequence_header(&aac_config)?;
7//! loop {
8//! client.send_video(ts_ms, keyframe, &nalu_bytes)?;
9//! client.send_audio(ts_ms, &aac_frame)?;
10//! }
11//! ```
12//!
13//! This crate emits one H.264 NAL per video call (no re-fragmentation
14//! into AVCC length-prefixed packets beyond the single-NAL case).
15//! Callers with multiple NALUs per sample can concatenate them into
16//! one body — RTMP just forwards bytes on the video channel.
17
18use std::collections::VecDeque;
19use std::io::{Read, Write};
20use std::net::{Shutdown, TcpStream, ToSocketAddrs};
21use std::time::Duration;
22
23use crate::aggregate::parse_aggregate;
24use crate::amf::{self, Amf0Value};
25use crate::amf3;
26use crate::caps::ConnectCapabilities;
27use crate::chunk::{ChunkReader, ChunkWriter, Message};
28use crate::error::{Error, Result};
29use crate::flv::{self, AudioTag, VideoTag};
30use crate::message::*;
31
32/// Server-originated event observed by an [`RtmpClient`] in publish
33/// mode.
34///
35/// During an active publish the client mostly writes audio / video /
36/// data and the server stays mostly silent — but a few server→client
37/// notifications matter end-to-end. The most important is
38/// [`StreamEof`](Self::StreamEof): the server signalling, per RTMP 1.0
39/// §7.1.7, that "the stream is dry, no more data will be sent without
40/// additional commands." A symmetric publish-side server uses the same
41/// `UserControl StreamEOF` event to mark end-of-publish before closing
42/// the TCP write half — and the client should treat that as a clean
43/// stream end rather than as an unexpected FIN.
44#[derive(Debug, Clone, PartialEq)]
45pub enum ClientEvent {
46 /// The server emitted `UserControl StreamBegin(stream_id)`
47 /// (UCM event type 0). Informational — most servers send this once
48 /// right after `createStream` succeeds.
49 StreamBegin { stream_id: u32 },
50 /// The server emitted `UserControl StreamEOF(stream_id)`
51 /// (UCM event type 1). End-of-stream from the server side. After
52 /// observing this, the caller should stop writing and shut the
53 /// client down via [`RtmpClient::close`].
54 StreamEof { stream_id: u32 },
55 /// The server emitted `UserControl StreamDry(stream_id)`
56 /// (UCM event type 2). Per RTMP 1.0 §3.7, the server uses this to
57 /// notify the client "that there is no more data on the stream. If
58 /// the server does not detect any message for a time period, it
59 /// can notify the subscribed clients that the stream is dry."
60 /// Distinct from [`StreamEof`](Self::StreamEof): `StreamDry` is a
61 /// "no data right now" signal that may resolve once more data
62 /// arrives; `StreamEof` is "playback finished, no more without
63 /// further commands."
64 StreamDry { stream_id: u32 },
65 /// The server emitted `UserControl StreamIsRecorded(stream_id)`
66 /// (UCM event type 4). Per RTMP 1.0 §3.7, "the server sends this
67 /// event to notify the client that the stream is a recorded
68 /// stream." Servers typically emit this right after `StreamBegin`
69 /// for an on-demand stream; for a publish-only client the event is
70 /// informational and usually ignored.
71 StreamIsRecorded { stream_id: u32 },
72 /// The server emitted `UserControl PingResponse(timestamp_ms)`
73 /// (UCM event type 7). Per RTMP 1.0 §3.7, the client sends a
74 /// `PingResponse` "in response to the ping request. The event
75 /// data is a 4-byte timestamp, which was received with the
76 /// kMsgPingRequest request." A server that emits `PingResponse`
77 /// is typically echoing back our own (publisher-side) `PingRequest`
78 /// — useful for measuring round-trip latency. The variant carries
79 /// the echoed timestamp verbatim.
80 PingResponse { timestamp_ms: u32 },
81 /// The server emitted `onStatus(...)` carrying NetStream state.
82 /// `level` is typically `"status"` / `"warning"` / `"error"`;
83 /// `code` is e.g. `"NetStream.Publish.Start"` /
84 /// `"NetStream.Unpublish.Success"` / `"NetStream.Publish.BadName"`.
85 OnStatus {
86 level: String,
87 code: String,
88 description: String,
89 },
90 /// The server emitted
91 /// `onStatus(NetConnection.Connect.ReconnectRequest)` — Enhanced
92 /// RTMP v2 §"Reconnect Request". The server is asking us to
93 /// reconnect, e.g. ahead of a server update or to remap us to a
94 /// different server instance.
95 ///
96 /// Per the spec's message flow, on receipt the client "persists
97 /// in streaming to/from the current server up to the next
98 /// appropriate media boundary, such as a keyframe. Subsequently,
99 /// it establishes a connection with a new server and disconnects
100 /// from the old server." So: finish the current GOP, then dial
101 /// [`RtmpClient::resolve_reconnect_url`]`(tc_url.as_deref())` with
102 /// a fresh [`RtmpClient::connect`] and drop this client.
103 ///
104 /// `tc_url` is the optional Info-Object property naming where to
105 /// reconnect — an absolute (`rtmp://host/app`) or relative
106 /// (`//host/app`, `/app`) URI reference. `None` means "use the
107 /// tcUrl for the current connection" per spec.
108 ReconnectRequest {
109 tc_url: Option<String>,
110 description: String,
111 },
112 /// The server emitted `_result(transaction_id, ...)` for a command
113 /// the client issued. The publish-time `connect` / `createStream`
114 /// transactions are consumed internally by [`RtmpClient::connect`];
115 /// any subsequent `_result` (e.g. a custom RPC sent after publish
116 /// started) surfaces here so the caller can match it against its
117 /// own transaction id.
118 Result {
119 transaction_id: f64,
120 values: Vec<Amf0Value>,
121 },
122 /// The server emitted `_error(transaction_id, ...)`. Symmetric to
123 /// [`Result`](Self::Result) but for the failure path.
124 ErrorReply {
125 transaction_id: f64,
126 values: Vec<Amf0Value>,
127 },
128 /// Any other server-originated message (ping, ack, set-chunk-size,
129 /// bandwidth — most of which the client handles transparently
130 /// inside [`RtmpClient::poll_event`] before this variant ever fires).
131 /// The variant exists so the caller's `match` arm can keep going.
132 Other,
133}
134
135const CLIENT_CHUNK_SIZE: u32 = 4096;
136const FLASH_VER: &str = "FMLE/3.0 (compatible; oxideav-rtmp)";
137
138pub struct RtmpClient {
139 stream: TcpStream,
140 /// Kept around so `recv` helpers (ack, onStatus replies, the
141 /// server-side `UserControl StreamEOF` mirror of our own
142 /// publish-side teardown) have somewhere to drain the server's
143 /// side. Surfaced through [`poll_event`](Self::poll_event).
144 reader: ChunkReader<TcpStream>,
145 writer: ChunkWriter<TcpStream>,
146 stream_id: u32,
147 /// Monotonic counter used for AMF command transaction ids.
148 next_tx: f64,
149 /// Set once we've observed the read half drain (EOF / connection
150 /// reset) so subsequent `poll_event` calls return `Ok(None)` rather
151 /// than re-entering [`ChunkReader::read_message`] on a dead socket.
152 /// Distinct from a `StreamEOF` user-control event — the server
153 /// normally sends `StreamEOF` *first* then a trailing onStatus, so
154 /// `poll_event` keeps reading until the kernel reports EOF.
155 read_eof: bool,
156 /// Enhanced RTMP capability block lifted from the server's
157 /// `_result(connect)` info object. Empty when the server didn't
158 /// advertise any v1+v2 capabilities (the historical pre-2023
159 /// shape). Inspect via [`server_capabilities`](Self::server_capabilities).
160 server_caps: ConnectCapabilities,
161 /// The `tcUrl` this client dialled, kept so an Enhanced RTMP v2
162 /// `NetConnection.Connect.ReconnectRequest` whose Info Object
163 /// omits `tcUrl` — or names a *relative* URI reference — can be
164 /// resolved per spec ("if not specified, use the tcUrl for the
165 /// current connection. A relative URI reference should be
166 /// resolved relative to the tcUrl for the current connection").
167 tc_url: String,
168 /// Sub-messages decomposed out of a server-originated Aggregate
169 /// Message (type 22) per RTMP 1.0 §7.1.6 but not yet routed
170 /// through the [`poll_event`](Self::poll_event) classify path.
171 /// In publish mode a remote server rarely batches its replies
172 /// this way, but `poll_event` decomposes the aggregate
173 /// transparently so a publisher that opted into a server-side
174 /// aggregate digest (`@enableEnhancedRTMP`-style negotiation, or
175 /// a peer reflecting its own ack stream as an aggregate) still
176 /// sees the per-event classification.
177 pending_subs: VecDeque<Message>,
178}
179
180/// Parsed RTMP URL: `rtmp://host[:port]/app/stream_name`.
181#[derive(Debug, Clone)]
182pub struct RtmpUrl {
183 pub host: String,
184 pub port: u16,
185 pub app: String,
186 pub stream_name: String,
187 pub tc_url: String,
188}
189
190impl RtmpUrl {
191 pub fn parse(url: &str) -> Result<Self> {
192 let s = url
193 .strip_prefix("rtmp://")
194 .ok_or_else(|| Error::Other(format!("not an rtmp:// URL: {url}")))?;
195 // authority/path
196 let slash = s
197 .find('/')
198 .ok_or_else(|| Error::Other("missing /app in rtmp URL".into()))?;
199 let authority = &s[..slash];
200 let path = &s[slash + 1..];
201 let (host, port) = match authority.rsplit_once(':') {
202 Some((h, p)) => (
203 h.to_owned(),
204 p.parse::<u16>()
205 .map_err(|e| Error::Other(format!("rtmp URL bad port: {e}")))?,
206 ),
207 None => (authority.to_owned(), 1935),
208 };
209 let (app, stream_name) = match path.find('/') {
210 Some(i) => (path[..i].to_owned(), path[i + 1..].to_owned()),
211 None => (path.to_owned(), String::new()),
212 };
213 let tc_url = format!("rtmp://{authority}/{app}");
214 Ok(Self {
215 host,
216 port,
217 app,
218 stream_name,
219 tc_url,
220 })
221 }
222}
223
224impl RtmpClient {
225 /// Dial the given `rtmp://host[:port]/app/stream_name` URL,
226 /// perform the full handshake + connect + createStream + publish
227 /// sequence, and return a ready-to-send client.
228 pub fn connect(url: &str) -> Result<Self> {
229 let parsed = RtmpUrl::parse(url)?;
230 Self::connect_parsed(&parsed, "live", &ConnectCapabilities::default())
231 }
232
233 /// Same as [`connect`](Self::connect) but lets the caller pick the
234 /// RTMP `publish` type (typically `"live"`, `"record"`, or
235 /// `"append"`).
236 pub fn connect_with_type(url: &str, publish_type: &str) -> Result<Self> {
237 let parsed = RtmpUrl::parse(url)?;
238 Self::connect_parsed(&parsed, publish_type, &ConnectCapabilities::default())
239 }
240
241 /// Connect and advertise the supplied Enhanced RTMP v1+v2
242 /// capability block in the NetConnection `connect` command
243 /// (`enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect
244 /// Command"). The block is appended to the legacy Command Object
245 /// in the documented order — peers that don't speak E-RTMP keep
246 /// parsing the message correctly because the extras are tacked on
247 /// after the historical `videoFunction` field.
248 ///
249 /// The server's `_result` properties object is parsed for the same
250 /// set of keys and stashed on the client; retrieve it via
251 /// [`server_capabilities`](Self::server_capabilities) to learn
252 /// which v1+v2 features the peer agreed to support.
253 pub fn connect_with_capabilities(
254 url: &str,
255 publish_type: &str,
256 caps: &ConnectCapabilities,
257 ) -> Result<Self> {
258 let parsed = RtmpUrl::parse(url)?;
259 Self::connect_parsed(&parsed, publish_type, caps)
260 }
261
262 /// Capability block advertised by the server in `_result(connect)`.
263 ///
264 /// Empty when the peer is pre-2023 / unaware of E-RTMP — callers can
265 /// detect that with [`ConnectCapabilities::is_empty`]. Otherwise
266 /// describes which FourCC codecs the server reports it can decode /
267 /// encode / forward, plus the v2 `capsEx` bitfield (Reconnect,
268 /// Multitrack, ModEx, TimestampNanoOffset).
269 pub fn server_capabilities(&self) -> &ConnectCapabilities {
270 &self.server_caps
271 }
272
273 /// The `tcUrl` this client dialled (e.g.
274 /// `rtmp://host:1935/app`) — the base every Enhanced RTMP v2
275 /// reconnect target resolves against.
276 pub fn tc_url(&self) -> &str {
277 &self.tc_url
278 }
279
280 /// Resolve the `tcUrl` carried by an Enhanced RTMP v2
281 /// [`ClientEvent::ReconnectRequest`] into the absolute URL to
282 /// re-dial, per `enhanced-rtmp-v2.pdf` §"Reconnect Request":
283 ///
284 /// * `None` → "use the tcUrl for the current connection".
285 /// * `Some(reference)` → "absolute or relative URI reference of
286 /// the server to which to reconnect. A relative URI reference
287 /// should be resolved relative to the tcUrl for the current
288 /// connection." All four spec example shapes are honoured:
289 /// `rtmp://foo.mydomain.com:1935/realtimeapp` (absolute),
290 /// `//192.0.2.0/realtimeapp` (network-path: keep our scheme),
291 /// `/realtimeapp` (absolute-path: keep scheme + authority), and
292 /// `realtimeapp` (relative-path: merge onto our tcUrl's path).
293 ///
294 /// Append the stream key (`/{stream_name}`) and feed the result to
295 /// [`RtmpClient::connect`] to complete the spec's reconnect flow.
296 pub fn resolve_reconnect_url(&self, tc_url: Option<&str>) -> String {
297 match tc_url {
298 Some(reference) => resolve_tc_url(&self.tc_url, reference),
299 None => self.tc_url.clone(),
300 }
301 }
302
303 fn connect_parsed(u: &RtmpUrl, publish_type: &str, caps: &ConnectCapabilities) -> Result<Self> {
304 let sock_addr = (u.host.as_str(), u.port)
305 .to_socket_addrs()
306 .map_err(Error::from)?
307 .next()
308 .ok_or_else(|| Error::Other(format!("resolved no addresses for {}", u.host)))?;
309 let stream = TcpStream::connect_timeout(&sock_addr, Duration::from_secs(15))?;
310 let _ = stream.set_nodelay(true);
311
312 // Handshake on a fresh clone — no chunk state is shared with
313 // it.
314 let mut hs = stream.try_clone()?;
315 crate::handshake::client_handshake(&mut hs)?;
316
317 let mut reader = ChunkReader::new(stream.try_clone()?);
318 let mut writer = ChunkWriter::new(stream.try_clone()?);
319
320 // We bump chunk size immediately — most commodity publishers
321 // do this too. Saves a bunch of chunk headers over the A/V path.
322 writer.write_message(
323 CSID_PROTOCOL_CONTROL,
324 &build_set_chunk_size(CLIENT_CHUNK_SIZE),
325 )?;
326 writer.set_chunk_size(CLIENT_CHUNK_SIZE as usize);
327
328 // Send connect.
329 let tx = 1.0;
330 writer.write_message(
331 CSID_COMMAND,
332 &build_connect_with_caps(tx, &u.app, &u.tc_url, FLASH_VER, caps),
333 )?;
334 writer.flush()?;
335
336 // Drain until we see the _result for connect; lift the server's
337 // capability advertisement out of the info object.
338 let connect_result = wait_for_result(&mut reader, &mut writer, tx)?;
339 // Info object is the last Object/EcmaArray AMF0 value after the
340 // properties slot; per §"Enhancing NetConnection connect Command"
341 // the server stamps its capabilities into one of the `_result`
342 // parameters. Walk back to the first Object/ECMA-array carrying
343 // any of the documented capability keys.
344 let server_caps = extract_server_caps(&connect_result);
345
346 // releaseStream + FCPublish — optional but standard.
347 let tx_release = 2.0;
348 writer.write_message(
349 CSID_COMMAND,
350 &build_release_stream(tx_release, &u.stream_name),
351 )?;
352 let tx_fc = 3.0;
353 writer.write_message(CSID_COMMAND, &build_fc_publish(tx_fc, &u.stream_name))?;
354
355 // createStream.
356 let tx_cs = 4.0;
357 writer.write_message(CSID_COMMAND, &build_create_stream(tx_cs))?;
358 writer.flush()?;
359
360 let stream_id = wait_for_create_stream_result(&mut reader, &mut writer, tx_cs)?;
361
362 // publish.
363 let tx_pub = 5.0;
364 writer.write_message(
365 CSID_COMMAND,
366 &build_publish(tx_pub, stream_id, &u.stream_name, publish_type),
367 )?;
368 writer.flush()?;
369
370 // Wait for Publish.Start. Ignore any interleaved control
371 // messages. Some servers don't bother sending onStatus —
372 // don't block forever, just wait briefly.
373 wait_for_publish_start(&mut reader, &mut writer)?;
374
375 Ok(Self {
376 stream,
377 reader,
378 writer,
379 stream_id,
380 next_tx: 10.0,
381 read_eof: false,
382 server_caps,
383 tc_url: u.tc_url.clone(),
384 pending_subs: VecDeque::new(),
385 })
386 }
387
388 /// Send the AVC sequence header (`AVCDecoderConfigurationRecord`
389 /// aka avcC). Must be called once before any NALU-carrying
390 /// [`send_video`](Self::send_video).
391 pub fn send_video_sequence_header(&mut self, avc_c: &[u8]) -> Result<()> {
392 let tag = VideoTag {
393 mod_ex: Vec::new(),
394 frame_type: flv::VIDEO_FRAME_KEYFRAME,
395 codec_id: flv::VIDEO_CODEC_AVC,
396 avc_packet_type: Some(flv::AVC_PACKET_TYPE_SEQUENCE_HEADER),
397 composition_time: 0,
398 body: avc_c.to_vec(),
399 ex_packet_type: None,
400 fourcc: None,
401
402 multitrack: None,
403 };
404 self.send_video_tag(0, &tag)
405 }
406
407 /// Send one video access unit. `body` is the AVCC-formatted
408 /// content (one or more `[u32 length BE][NALU bytes]` pairs).
409 /// `is_keyframe` drives the FLV frame_type bits.
410 pub fn send_video(&mut self, timestamp_ms: u32, is_keyframe: bool, body: &[u8]) -> Result<()> {
411 let tag = VideoTag {
412 mod_ex: Vec::new(),
413 frame_type: if is_keyframe {
414 flv::VIDEO_FRAME_KEYFRAME
415 } else {
416 flv::VIDEO_FRAME_INTER
417 },
418 codec_id: flv::VIDEO_CODEC_AVC,
419 avc_packet_type: Some(flv::AVC_PACKET_TYPE_NALU),
420 composition_time: 0,
421 body: body.to_vec(),
422 ex_packet_type: None,
423 fourcc: None,
424
425 multitrack: None,
426 };
427 self.send_video_tag(timestamp_ms, &tag)
428 }
429
430 fn send_video_tag(&mut self, ts: u32, tag: &VideoTag) -> Result<()> {
431 let payload = flv::build_video(tag);
432 self.writer.write_message(
433 CSID_VIDEO,
434 &Message {
435 msg_type_id: MSG_VIDEO,
436 msg_stream_id: self.stream_id,
437 timestamp: ts,
438 payload,
439 },
440 )?;
441 self.writer.flush()?;
442 Ok(())
443 }
444
445 /// Send the AAC `AudioSpecificConfig` (2 bytes for LC-AAC 44.1k
446 /// stereo: `0x12 0x10`). Must be called once before any
447 /// raw-frame [`send_audio`](Self::send_audio).
448 pub fn send_audio_sequence_header(&mut self, asc: &[u8]) -> Result<()> {
449 let tag = AudioTag {
450 mod_ex: Vec::new(),
451 sound_format: flv::AUDIO_FORMAT_AAC,
452 sound_rate: 3,
453 sound_size_16bit: true,
454 stereo: true,
455 aac_packet_type: Some(flv::AAC_PACKET_TYPE_SEQUENCE_HEADER),
456 body: asc.to_vec(),
457 ex_packet_type: None,
458 audio_fourcc: None,
459
460 multitrack: None,
461 };
462 self.send_audio_tag(0, &tag)
463 }
464
465 /// Send one raw AAC frame.
466 pub fn send_audio(&mut self, timestamp_ms: u32, aac_frame: &[u8]) -> Result<()> {
467 let tag = AudioTag {
468 mod_ex: Vec::new(),
469 sound_format: flv::AUDIO_FORMAT_AAC,
470 sound_rate: 3,
471 sound_size_16bit: true,
472 stereo: true,
473 aac_packet_type: Some(flv::AAC_PACKET_TYPE_RAW),
474 body: aac_frame.to_vec(),
475 ex_packet_type: None,
476 audio_fourcc: None,
477
478 multitrack: None,
479 };
480 self.send_audio_tag(timestamp_ms, &tag)
481 }
482
483 fn send_audio_tag(&mut self, ts: u32, tag: &AudioTag) -> Result<()> {
484 let payload = flv::build_audio(tag);
485 self.writer.write_message(
486 CSID_AUDIO,
487 &Message {
488 msg_type_id: MSG_AUDIO,
489 msg_stream_id: self.stream_id,
490 timestamp: ts,
491 payload,
492 },
493 )?;
494 self.writer.flush()?;
495 Ok(())
496 }
497
498 /// Send `@setDataFrame("onMetaData", metadata)`. Metadata is an
499 /// AMF0 value, typically an ECMA array or object populated with
500 /// `width`, `height`, `duration`, `videodatarate`, `framerate`,
501 /// `videocodecid`, `audiodatarate`, `audiocodecid`, etc.
502 pub fn send_metadata(&mut self, metadata: Amf0Value) -> Result<()> {
503 let msg = build_set_data_frame(self.stream_id, metadata);
504 self.writer.write_message(CSID_DATA, &msg)?;
505 self.writer.flush()?;
506 Ok(())
507 }
508
509 /// Send `onMetaData` as an AMF3-encoded data message (RTMP message
510 /// type 15) instead of the AMF0 default.
511 ///
512 /// The body is framed per AMF 3 spec §4.1 / AMF 0 spec §3.1: the
513 /// outer NetConnection message structure is AMF0, and each value
514 /// switches to AMF3 by prefixing it with the `avmplus-object-marker`
515 /// (`0x11`). Most ingest endpoints stay on AMF0, so prefer
516 /// [`send_metadata`](Self::send_metadata); this exists for peers that
517 /// negotiated an AMF3 channel.
518 pub fn send_metadata_amf3(&mut self, metadata: amf3::Amf3Value) -> Result<()> {
519 let mut payload = Vec::new();
520 payload.push(amf3::AVMPLUS_OBJECT_MARKER);
521 amf3::encode(&mut payload, &amf3::Amf3Value::String("onMetaData".into()));
522 payload.push(amf3::AVMPLUS_OBJECT_MARKER);
523 amf3::encode(&mut payload, &metadata);
524 let msg = Message {
525 msg_type_id: MSG_DATA_AMF3,
526 msg_stream_id: self.stream_id,
527 timestamp: 0,
528 payload,
529 };
530 self.writer.write_message(CSID_DATA, &msg)?;
531 self.writer.flush()?;
532 Ok(())
533 }
534
535 /// Send a batch of audio / video / data sub-messages as one
536 /// Aggregate Message (RTMP 1.0 §7.1.6, message type id 22).
537 ///
538 /// An aggregate trades one extra 11-byte sub-header per sub-message
539 /// (plus a 4-byte back-pointer) for the chunk-header overhead the
540 /// chunk writer would emit on each sub if it sent them
541 /// individually. For an active publish with several A/V messages
542 /// queued at the same timestamp this can cut the chunk-header
543 /// surface in half.
544 ///
545 /// `subs` carries pre-built FLV-shaped messages — typically AVC
546 /// video (type 9) and AAC audio (type 8) bodies the caller has
547 /// already framed via [`flv::build_video`] / [`flv::build_audio`].
548 /// Caller-supplied `msg_stream_id` fields are overridden to this
549 /// client's publish stream id per §7.1.6 ("the message stream ID
550 /// of the aggregate message overrides the message stream IDs of
551 /// the sub-messages"). The aggregate's own wire timestamp is set
552 /// to the first sub's timestamp so the §7.1.6 re-normalisation
553 /// offset is zero on the wire.
554 ///
555 /// Returns the same errors as
556 /// [`build_aggregate`](crate::aggregate::build_aggregate) plus any
557 /// I/O error from the underlying chunk writer. An empty `subs`
558 /// slice is a no-op.
559 pub fn send_aggregate(&mut self, subs: &[Message]) -> Result<()> {
560 if subs.is_empty() {
561 return Ok(());
562 }
563 // Override every sub's msg_stream_id to ours so the §7.1.6
564 // override invariant holds without surprising the caller.
565 let normalized: Vec<Message> = subs
566 .iter()
567 .map(|s| Message {
568 msg_type_id: s.msg_type_id,
569 msg_stream_id: self.stream_id,
570 timestamp: s.timestamp,
571 payload: s.payload.clone(),
572 })
573 .collect();
574 let agg = crate::aggregate::build_aggregate(self.stream_id, &normalized)?;
575 // CSID_DATA (6) is the natural data-channel id — aggregates
576 // aren't a protocol-control event.
577 self.writer.write_message(CSID_DATA, &agg)?;
578 self.writer.flush()?;
579 Ok(())
580 }
581
582 /// Send a `UserControl PingRequest` (RTMP 1.0 §3.7, UCM type 6)
583 /// carrying the supplied 4-byte timestamp.
584 ///
585 /// The peer is expected to echo the value back as a `PingResponse`
586 /// (UCM type 7), which surfaces from
587 /// [`poll_event`](Self::poll_event) as
588 /// [`ClientEvent::PingResponse`]. Typical use is round-trip-time
589 /// measurement: stamp the local monotonic clock into the request,
590 /// then subtract from the response timestamp once the matching
591 /// `PingResponse` arrives. The publish direction normally never
592 /// needs this — but a publisher pumping a low-bandwidth feed over
593 /// a flaky link may want to probe liveness explicitly rather than
594 /// wait for TCP keepalive.
595 pub fn send_ping_request(&mut self, timestamp_ms: u32) -> Result<()> {
596 self.writer.write_message(
597 CSID_PROTOCOL_CONTROL,
598 &build_user_control_ping_request(timestamp_ms),
599 )?;
600 self.writer.flush()?;
601 Ok(())
602 }
603
604 /// Emit a §5.3 Acknowledgement if the reader's received-byte count
605 /// has crossed the server-negotiated §5.5 window since the last
606 /// one. No-op until the server advertises a Window Acknowledgement
607 /// Size / Set Peer Bandwidth (every commodity ingest does so right
608 /// after `connect`).
609 fn maybe_send_ack(&mut self) -> Result<()> {
610 if let Some(seq) = self.reader.ack_due() {
611 self.writer
612 .write_message(CSID_PROTOCOL_CONTROL, &build_ack(seq))?;
613 self.writer.flush()?;
614 }
615 Ok(())
616 }
617
618 /// Poll for one server-originated event.
619 ///
620 /// Reads up to one inbound RTMP message from the server, applies
621 /// protocol-level housekeeping internally (set-chunk-size,
622 /// window-ack-size, set-peer-bandwidth, ping-request/response,
623 /// acks), and surfaces externally-visible notifications as a
624 /// [`ClientEvent`]:
625 ///
626 /// * `UserControl StreamBegin(sid)` → [`ClientEvent::StreamBegin`]
627 /// * `UserControl StreamEOF(sid)` → [`ClientEvent::StreamEof`]
628 /// (mirror of [`RtmpSession::close`](crate::RtmpSession::close)'s
629 /// server-side teardown; RTMP 1.0 §7.1.7)
630 /// * `onStatus(...)` → [`ClientEvent::OnStatus`]
631 /// * `_result(tx_id, ...)` → [`ClientEvent::Result`]
632 /// * `_error(tx_id, ...)` → [`ClientEvent::ErrorReply`]
633 /// * everything else → [`ClientEvent::Other`]
634 ///
635 /// Returns `Ok(None)` once the server has signalled a clean stream
636 /// end (`StreamEOF`) or once the TCP read half observes EOF /
637 /// connection-reset. After `Ok(None)` is returned the caller
638 /// should stop writing and finish the session with
639 /// [`close`](Self::close).
640 ///
641 /// This is a blocking call. Set a finite read timeout on
642 /// [`inner_mut`](Self::inner_mut) ahead of time if you want
643 /// `poll_event` to return periodically with an `Err(Error::Io)`
644 /// kind `WouldBlock` / `TimedOut` so an outer event loop can do
645 /// other work between polls — the underlying TCP read deadline is
646 /// the timeout granularity, not a poll interval.
647 pub fn poll_event(&mut self) -> Result<Option<ClientEvent>> {
648 loop {
649 if self.read_eof {
650 return Ok(None);
651 }
652 // Drain any sub-messages decomposed from a prior
653 // server-originated Aggregate Message (RTMP 1.0 §7.1.6)
654 // ahead of any further wire read so the publish order is
655 // preserved.
656 if let Some(sub) = self.pending_subs.pop_front() {
657 let ev = self.classify_message(sub)?;
658 if matches!(ev, ClientEvent::Other) {
659 // Don't return `Other` for a queued sub — the caller
660 // typically pumps `poll_event` to observe semantic
661 // events; an aggregate full of `Other` would
662 // otherwise return N `Other`s in a row.
663 continue;
664 }
665 return Ok(Some(ev));
666 }
667 let msg = match self.reader.read_message() {
668 Ok(m) => m,
669 Err(Error::Io(e))
670 if matches!(
671 e.kind(),
672 std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionReset
673 ) =>
674 {
675 self.read_eof = true;
676 return Ok(None);
677 }
678 Err(Error::UnexpectedEof) => {
679 self.read_eof = true;
680 return Ok(None);
681 }
682 Err(e) => return Err(e),
683 };
684 // §5.3: acknowledge once the server has sent a full window
685 // of bytes since our last ack (window set by the server's
686 // §5.5 Window Acknowledgement Size / §5.6 Set Peer
687 // Bandwidth).
688 self.maybe_send_ack()?;
689 if msg.msg_type_id == MSG_AGGREGATE {
690 // RTMP 1.0 §7.1.6 Aggregate Message. Decompose into
691 // FLV-shaped sub-messages with the §7.1.6 timestamp
692 // re-normalisation applied and the message-stream-id
693 // override resolved; queue them so subsequent calls
694 // surface the per-sub events in publish order. Don't
695 // return on the aggregate itself — drain the queue.
696 let subs = parse_aggregate(&msg)?;
697 self.pending_subs.extend(subs);
698 continue;
699 }
700 return Ok(Some(self.classify_message(msg)?));
701 }
702 }
703
704 /// Per-message classification shared between the wire-read path
705 /// and the aggregate-sub-drain path.
706 fn classify_message(&mut self, msg: Message) -> Result<ClientEvent> {
707 match msg.msg_type_id {
708 MSG_SET_CHUNK_SIZE => {
709 let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
710 self.reader.set_chunk_size(size as usize);
711 Ok(ClientEvent::Other)
712 }
713 MSG_WINDOW_ACK_SIZE => {
714 // §5.5: the server tells us which window size to use
715 // when sending §5.3 Acknowledgements. Honour it so our
716 // ack cadence matches the server's expectation.
717 let size = read_u32_be(&msg.payload)?;
718 self.reader.set_window_ack_size(size);
719 Ok(ClientEvent::Other)
720 }
721 MSG_SET_PEER_BANDWIDTH => {
722 // §5.6: "The output bandwidth value is the same as the
723 // window size for the peer." Adopt the leading 4-byte
724 // window as our send-side ack window. (Trailing Limit
725 // type byte is advisory.)
726 if msg.payload.len() >= 4 {
727 let size = read_u32_be(&msg.payload[..4])?;
728 self.reader.set_window_ack_size(size);
729 }
730 Ok(ClientEvent::Other)
731 }
732 MSG_ACK => {
733 // The server's §5.3 sequence number (bytes it has
734 // received from us). Informational for a publisher.
735 Ok(ClientEvent::Other)
736 }
737 MSG_USER_CONTROL => {
738 let (event_type, event_data) = parse_user_control(&msg.payload)?;
739 match event_type {
740 USR_STREAM_BEGIN => {
741 let sid = ucm_stream_id(event_data)?;
742 Ok(ClientEvent::StreamBegin { stream_id: sid })
743 }
744 USR_STREAM_EOF => {
745 let sid = ucm_stream_id(event_data)?;
746 // Don't latch here: the server typically sends
747 // a trailing onStatus / Unpublish.Success after
748 // StreamEOF, then half-closes; we let the
749 // subsequent read drain those and report EOF
750 // naturally.
751 Ok(ClientEvent::StreamEof { stream_id: sid })
752 }
753 USR_STREAM_DRY => {
754 // Per RTMP 1.0 §3.7, StreamDry is a transient
755 // "no data on the stream right now" signal —
756 // distinct from StreamEOF, which terminates
757 // playback. Surface to the caller so an outer
758 // event loop can react (e.g. warn UI, switch
759 // to a fallback stream); don't latch read_eof.
760 let sid = ucm_stream_id(event_data)?;
761 Ok(ClientEvent::StreamDry { stream_id: sid })
762 }
763 USR_SET_BUFFER_LENGTH => {
764 // Per RTMP 1.0 §3.7, SetBufferLength is the only
765 // standard UCM event with an 8-byte event-data
766 // body: 4-byte stream id + 4-byte buffer length
767 // in milliseconds. It is sent from a *playback*
768 // client to the server — a publish-direction
769 // client almost never sees it inbound. We
770 // validate the payload size so a malformed
771 // SetBufferLength from a confused peer surfaces
772 // a clean error instead of silently truncating;
773 // and we surface it as Other (no action required
774 // on the publisher side).
775 if event_data.len() < 8 {
776 return Err(Error::ProtocolViolation(
777 "UserControl SetBufferLength: event data < 8 bytes".into(),
778 ));
779 }
780 Ok(ClientEvent::Other)
781 }
782 USR_STREAM_IS_RECORDED => {
783 // Per RTMP 1.0 §3.7, server announces that the
784 // stream is a recorded (on-demand) stream.
785 // Surface to the caller — a publish client may
786 // want to log this if the server marks our own
787 // publish stream as recorded after we asked for
788 // "live" (mismatched publish type).
789 let sid = ucm_stream_id(event_data)?;
790 Ok(ClientEvent::StreamIsRecorded { stream_id: sid })
791 }
792 USR_PING_REQUEST => {
793 // Server pings — reply with PingResponse echoing
794 // the same 4-byte timestamp body so the server's
795 // liveness probe succeeds.
796 let ts_bytes = event_data;
797 if ts_bytes.len() >= 4 {
798 let mut p = Vec::with_capacity(6);
799 p.extend_from_slice(&USR_PING_RESPONSE.to_be_bytes());
800 p.extend_from_slice(&ts_bytes[..4]);
801 let _ = self.writer.write_message(
802 CSID_PROTOCOL_CONTROL,
803 &Message {
804 msg_type_id: MSG_USER_CONTROL,
805 msg_stream_id: 0,
806 timestamp: 0,
807 payload: p,
808 },
809 );
810 let _ = self.writer.flush();
811 }
812 Ok(ClientEvent::Other)
813 }
814 USR_PING_RESPONSE => {
815 // Per RTMP 1.0 §3.7, this echoes back the 4-byte
816 // timestamp the publisher carried in a prior
817 // PingRequest. Surface to the caller so a
818 // round-trip-time measurement loop can compare
819 // the echoed value to its own send-time clock.
820 if event_data.len() < 4 {
821 return Err(Error::ProtocolViolation(
822 "UserControl PingResponse: event data < 4 bytes".into(),
823 ));
824 }
825 let ts = u32::from_be_bytes([
826 event_data[0],
827 event_data[1],
828 event_data[2],
829 event_data[3],
830 ]);
831 Ok(ClientEvent::PingResponse { timestamp_ms: ts })
832 }
833 _ => {
834 // Unknown / reserved UCM event type — surface as
835 // Other; forwarding ingest may receive future
836 // event types we don't model yet.
837 Ok(ClientEvent::Other)
838 }
839 }
840 }
841 MSG_COMMAND_AMF0 => {
842 let values = amf::decode_all(&msg.payload)?;
843 Ok(classify_command(values))
844 }
845 MSG_COMMAND_AMF3 => {
846 let values: Vec<Amf0Value> = amf3::decode_data_message(&msg.payload)?
847 .iter()
848 .map(amf3::Amf3Value::to_amf0)
849 .collect();
850 Ok(classify_command(values))
851 }
852 MSG_AGGREGATE => {
853 // A sub-message inside an aggregate is itself an
854 // aggregate. Forward to the same queue so the next
855 // `poll_event` tick decomposes it. The wire path above
856 // already handles top-level aggregates directly.
857 let subs = parse_aggregate(&msg)?;
858 self.pending_subs.extend(subs);
859 Ok(ClientEvent::Other)
860 }
861 _ => Ok(ClientEvent::Other),
862 }
863 }
864
865 /// Send `closeStream` / `deleteStream` and shut the TCP socket.
866 pub fn close(mut self) -> Result<()> {
867 let tx = self.next_tx;
868 self.next_tx += 1.0;
869 let payload = amf::encode_command(
870 "closeStream",
871 tx,
872 Amf0Value::Null,
873 &[Amf0Value::Number(self.stream_id as f64)],
874 );
875 let _ = self.writer.write_message(
876 CSID_COMMAND,
877 &Message {
878 msg_type_id: MSG_COMMAND_AMF0,
879 msg_stream_id: self.stream_id,
880 timestamp: 0,
881 payload,
882 },
883 );
884 let _ = self.writer.flush();
885 // Shut down the write half only (send a graceful FIN) rather
886 // than the whole socket. `Shutdown::Both` tears the read half
887 // down at the same instant, which on some platforms makes the
888 // kernel answer the peer's still-unacked data with a RST; that
889 // RST discards any A/V messages the peer hasn't yet drained
890 // from its receive buffer — closeStream and the last frames we
891 // just wrote can be thrown away mid-stream. A write-half FIN
892 // lets the peer read every buffered frame plus our closeStream
893 // command, then observe EOF cleanly. The read half closes when
894 // `self` (and its owned `TcpStream`) drops at end of scope.
895 let _ = self.stream.shutdown(Shutdown::Write);
896 Ok(())
897 }
898
899 pub fn inner_mut(&mut self) -> &mut TcpStream {
900 &mut self.stream
901 }
902
903 /// Apply a `recv` timeout to the chunk reader's actual socket
904 /// clone (the one [`poll_event`](Self::poll_event) blocks on)
905 /// rather than the session's bookkeeping clone.
906 ///
907 /// On Linux a sockopt set through one `try_clone` descriptor
908 /// carries to its sibling clones because they share one file
909 /// description; on Windows each clone has its own kernel handle
910 /// with independent socket options, so the timeout has to be
911 /// installed on the exact socket the `recv` call will issue
912 /// against. Use this rather than
913 /// [`inner_mut`](Self::inner_mut)`.set_read_timeout(...)` when the
914 /// goal is to bound `poll_event`'s block time.
915 pub fn set_read_timeout(&mut self, d: Option<Duration>) -> Result<()> {
916 self.reader.inner_mut().set_read_timeout(d)?;
917 let _ = self.stream.set_read_timeout(d);
918 Ok(())
919 }
920}
921
922/// Consume messages from `reader` until we see a command named
923/// `_result` for `expected_tx`. Forward relevant protocol-control
924/// updates (SetChunkSize) to the reader.
925fn wait_for_result<R: Read, W: Write>(
926 reader: &mut ChunkReader<R>,
927 _writer: &mut ChunkWriter<W>,
928 expected_tx: f64,
929) -> Result<Vec<Amf0Value>> {
930 loop {
931 let msg = reader.read_message()?;
932 match msg.msg_type_id {
933 MSG_SET_CHUNK_SIZE => {
934 let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
935 reader.set_chunk_size(size as usize);
936 }
937 MSG_WINDOW_ACK_SIZE => {
938 // §5.5: capture the server's window during setup so the
939 // §5.3 ack obligation is live before the first media
940 // frame flows.
941 let size = read_u32_be(&msg.payload)?;
942 reader.set_window_ack_size(size);
943 }
944 MSG_SET_PEER_BANDWIDTH if msg.payload.len() >= 4 => {
945 // §5.6: output bandwidth equals the peer's window size.
946 let size = read_u32_be(&msg.payload[..4])?;
947 reader.set_window_ack_size(size);
948 }
949 MSG_COMMAND_AMF0 => {
950 let values = amf::decode_all(&msg.payload)?;
951 let name = values.first().and_then(Amf0Value::as_str).unwrap_or("");
952 let tx = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(-1.0);
953 if name == "_result" && tx == expected_tx {
954 return Ok(values);
955 }
956 if name == "_error" {
957 return Err(Error::Other(format!(
958 "RTMP _error from server: {:?}",
959 values.get(3)
960 )));
961 }
962 // Any other status notifications before our _result
963 // (StreamBegin, bandwidth negotiations, etc.) — ignore.
964 }
965 _ => {}
966 }
967 }
968}
969
970fn wait_for_create_stream_result<R: Read, W: Write>(
971 reader: &mut ChunkReader<R>,
972 writer: &mut ChunkWriter<W>,
973 expected_tx: f64,
974) -> Result<u32> {
975 let values = wait_for_result(reader, writer, expected_tx)?;
976 // _result carries the new stream id as the last AMF0 value
977 // (either arg slot [3] or further back if the peer sent an extra
978 // props object).
979 let sid = values
980 .iter()
981 .rev()
982 .find_map(Amf0Value::as_f64)
983 .ok_or_else(|| Error::InvalidCommand("createStream result has no stream id".into()))?;
984 Ok(sid as u32)
985}
986
987fn wait_for_publish_start<R: Read, W: Write>(
988 reader: &mut ChunkReader<R>,
989 _writer: &mut ChunkWriter<W>,
990) -> Result<()> {
991 // Be lenient: the spec says the server SHOULD send an onStatus
992 // with NetStream.Publish.Start, but some servers skip it. Bail
993 // after we've seen a user-control StreamBegin OR an onStatus on
994 // the publish stream.
995 for _ in 0..20 {
996 let msg = match reader.read_message() {
997 Ok(m) => m,
998 Err(Error::Io(ref e))
999 if matches!(
1000 e.kind(),
1001 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
1002 ) =>
1003 {
1004 return Ok(());
1005 }
1006 Err(e) => return Err(e),
1007 };
1008 match msg.msg_type_id {
1009 MSG_USER_CONTROL => return Ok(()),
1010 MSG_COMMAND_AMF0 => {
1011 let values = amf::decode_all(&msg.payload)?;
1012 if values
1013 .first()
1014 .and_then(Amf0Value::as_str)
1015 .map(|n| n == "onStatus" || n == "_result")
1016 .unwrap_or(false)
1017 {
1018 return Ok(());
1019 }
1020 }
1021 _ => {}
1022 }
1023 }
1024 Ok(())
1025}
1026
1027fn read_u32_be(buf: &[u8]) -> Result<u32> {
1028 if buf.len() < 4 {
1029 return Err(Error::ProtocolViolation("need 4 bytes for u32be".into()));
1030 }
1031 Ok(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
1032}
1033
1034/// Locate the Enhanced RTMP capability block in a server's
1035/// `_result(connect, ...)` AMF0 value list.
1036///
1037/// Per `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect Command"
1038/// the server stamps `videoFourCcInfoMap` / `audioFourCcInfoMap` /
1039/// `capsEx` etc. into one of the `_result` parameters; in practice
1040/// every implementation we have seen drops them into the trailing info
1041/// object (the slot that carries `NetConnection.Connect.Success`). Some
1042/// servers also drop them into the properties slot (the second AMF0
1043/// value, right after the transaction id), so we walk every
1044/// Object / ECMA-array in the response and return the first one whose
1045/// `ConnectCapabilities::from_amf0` carries an Enhanced-RTMP property.
1046///
1047/// `objectEncoding` alone doesn't count: every pre-2023 server stamps
1048/// `objectEncoding = 0` into its info object as part of the legacy
1049/// `NetConnection.Connect.Success` shape, so picking that up would make
1050/// every legacy server look like a v2 advertisement. A fully-legacy
1051/// server therefore returns the empty capability block.
1052fn extract_server_caps(values: &[Amf0Value]) -> ConnectCapabilities {
1053 for v in values {
1054 if matches!(v, Amf0Value::Object(_) | Amf0Value::EcmaArray(_)) {
1055 let caps = ConnectCapabilities::from_amf0(v);
1056 if has_enhanced_rtmp_property(&caps) {
1057 return caps;
1058 }
1059 }
1060 }
1061 ConnectCapabilities::default()
1062}
1063
1064/// True if `caps` carries any of the Enhanced RTMP v1+v2 capability
1065/// properties (`fourCcList`, `videoFourCcInfoMap`,
1066/// `audioFourCcInfoMap`, `capsEx`). `objectEncoding` alone — the
1067/// legacy `_result(connect)` field — does not count.
1068fn has_enhanced_rtmp_property(caps: &ConnectCapabilities) -> bool {
1069 !caps.fourcc_list.is_empty()
1070 || !caps.video_fourcc_info_map.is_empty()
1071 || !caps.audio_fourcc_info_map.is_empty()
1072 || caps.caps_ex != 0
1073}
1074
1075/// Decode the `User Control` payload framing per RTMP 1.0 §6.2 /
1076/// §7.1.7: a 16-bit BE event type followed by variable-length event
1077/// data. Returns `(event_type, event_data)` borrowed from the input.
1078fn parse_user_control(buf: &[u8]) -> Result<(u16, &[u8])> {
1079 if buf.len() < 2 {
1080 return Err(Error::ProtocolViolation(
1081 "UserControl: payload < 2 bytes".into(),
1082 ));
1083 }
1084 let event_type = u16::from_be_bytes([buf[0], buf[1]]);
1085 Ok((event_type, &buf[2..]))
1086}
1087
1088/// Stream-id-carrying UCM events (Stream Begin / Stream EOF / Stream
1089/// Dry / StreamIsRecorded) all use a 4-byte BE stream id as their
1090/// event data.
1091fn ucm_stream_id(event_data: &[u8]) -> Result<u32> {
1092 if event_data.len() < 4 {
1093 return Err(Error::ProtocolViolation(
1094 "UserControl: event data < 4 bytes (need stream id)".into(),
1095 ));
1096 }
1097 Ok(u32::from_be_bytes([
1098 event_data[0],
1099 event_data[1],
1100 event_data[2],
1101 event_data[3],
1102 ]))
1103}
1104
1105/// Resolve a reconnect-target URI reference against the current
1106/// connection's `tcUrl`, per `enhanced-rtmp-v2.pdf` §"Reconnect
1107/// Request" ("a relative URI reference should be resolved relative to
1108/// the tcUrl for the current connection"). Handles the four reference
1109/// shapes the spec's Info Object table gives as examples:
1110///
1111/// 1. `rtmp://foo.mydomain.com:1935/realtimeapp` — absolute (has a
1112/// scheme): taken verbatim.
1113/// 2. `//192.0.2.0/realtimeapp` — network-path reference: inherits
1114/// only the base's scheme.
1115/// 3. `/realtimeapp` — absolute-path reference: inherits the base's
1116/// scheme + authority.
1117/// 4. `realtimeapp` — relative-path reference: merged onto the base's
1118/// path with the last segment replaced.
1119///
1120/// An empty reference resolves to the base itself.
1121pub fn resolve_tc_url(base: &str, reference: &str) -> String {
1122 if reference.is_empty() {
1123 return base.to_owned();
1124 }
1125 if reference.contains("://") {
1126 // Absolute URI reference — already carries its own scheme.
1127 return reference.to_owned();
1128 }
1129 let (scheme, after_scheme) = match base.find("://") {
1130 Some(i) => (&base[..i], &base[i + 3..]),
1131 None => ("rtmp", base),
1132 };
1133 if let Some(net_path) = reference.strip_prefix("//") {
1134 // Network-path reference: keep only our scheme.
1135 return format!("{scheme}://{net_path}");
1136 }
1137 let (authority, base_path) = match after_scheme.find('/') {
1138 Some(i) => (&after_scheme[..i], &after_scheme[i..]),
1139 None => (after_scheme, ""),
1140 };
1141 if reference.starts_with('/') {
1142 // Absolute-path reference: keep scheme + authority.
1143 return format!("{scheme}://{authority}{reference}");
1144 }
1145 // Relative-path reference: merge — drop the base path's last
1146 // segment, keep everything up to (and including) its final '/'.
1147 let dir = match base_path.rfind('/') {
1148 Some(i) => &base_path[..=i],
1149 None => "/",
1150 };
1151 format!("{scheme}://{authority}{dir}{reference}")
1152}
1153
1154/// Classify a decoded AMF0 command message into a [`ClientEvent`].
1155/// Matches `onStatus` / `_result` / `_error` by name and pulls the
1156/// transaction id / info object out of the expected slots.
1157fn classify_command(values: Vec<Amf0Value>) -> ClientEvent {
1158 let name = values.first().and_then(Amf0Value::as_str).unwrap_or("");
1159 match name {
1160 "onStatus" => {
1161 // ["onStatus", 0.0, null, <info-object>]
1162 if let Some(info) = values.get(3) {
1163 let level = info
1164 .get("level")
1165 .and_then(Amf0Value::as_str)
1166 .unwrap_or("")
1167 .to_owned();
1168 let code = info
1169 .get("code")
1170 .and_then(Amf0Value::as_str)
1171 .unwrap_or("")
1172 .to_owned();
1173 let description = info
1174 .get("description")
1175 .and_then(Amf0Value::as_str)
1176 .unwrap_or("")
1177 .to_owned();
1178 // Enhanced RTMP v2 §"Reconnect Request": the server
1179 // asks us to reconnect via a NetConnection-level
1180 // onStatus whose code MUST be
1181 // NetConnection.Connect.ReconnectRequest and whose
1182 // level MUST be "status". Lift the optional tcUrl so
1183 // the caller can re-dial; an event with the right code
1184 // but the wrong level is NOT a valid reconnect request
1185 // per spec, so it falls through as a plain OnStatus.
1186 if code == crate::message::RECONNECT_REQUEST_CODE && level == "status" {
1187 let tc_url = info
1188 .get("tcUrl")
1189 .and_then(Amf0Value::as_str)
1190 .map(str::to_owned);
1191 return ClientEvent::ReconnectRequest {
1192 tc_url,
1193 description,
1194 };
1195 }
1196 return ClientEvent::OnStatus {
1197 level,
1198 code,
1199 description,
1200 };
1201 }
1202 ClientEvent::Other
1203 }
1204 "_result" => {
1205 let tx = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
1206 ClientEvent::Result {
1207 transaction_id: tx,
1208 values,
1209 }
1210 }
1211 "_error" => {
1212 let tx = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
1213 ClientEvent::ErrorReply {
1214 transaction_id: tx,
1215 values,
1216 }
1217 }
1218 _ => ClientEvent::Other,
1219 }
1220}
1221
1222#[cfg(test)]
1223mod tests {
1224 use super::*;
1225
1226 #[test]
1227 fn parse_user_control_stream_eof_recovers_stream_id() {
1228 // Wire layout per RTMP 1.0 §7.1.7: [event_type=0x0001 BE]
1229 // [stream_id BE]. Our build_user_control_stream_eof emits this
1230 // exact 6-byte body — same six bytes the auditor's
1231 // session_close test asserts on.
1232 let payload: [u8; 6] = [0x00, 0x01, 0x00, 0x00, 0x00, 0x07];
1233 let (event_type, event_data) = parse_user_control(&payload).expect("parse UCM");
1234 assert_eq!(event_type, USR_STREAM_EOF);
1235 assert_eq!(ucm_stream_id(event_data).expect("sid"), 7);
1236 }
1237
1238 #[test]
1239 fn parse_user_control_rejects_truncated_payload() {
1240 // < 2 bytes — can't even read the event type.
1241 assert!(parse_user_control(&[0x00]).is_err());
1242 assert!(parse_user_control(&[]).is_err());
1243 // 2 bytes (event type only) but the event type is a
1244 // stream-id-carrying variant: event-data is empty so the SID
1245 // extractor refuses.
1246 let (event_type, event_data) = parse_user_control(&[0x00, 0x01]).expect("parse UCM");
1247 assert_eq!(event_type, USR_STREAM_EOF);
1248 assert!(ucm_stream_id(event_data).is_err());
1249 }
1250
1251 #[test]
1252 fn classify_command_recognises_on_status() {
1253 let info = Amf0Value::Object(vec![
1254 ("level".into(), Amf0Value::String("status".into())),
1255 (
1256 "code".into(),
1257 Amf0Value::String("NetStream.Publish.Start".into()),
1258 ),
1259 ("description".into(), Amf0Value::String("ready".into())),
1260 ]);
1261 let values = vec![
1262 Amf0Value::String("onStatus".into()),
1263 Amf0Value::Number(0.0),
1264 Amf0Value::Null,
1265 info,
1266 ];
1267 match classify_command(values) {
1268 ClientEvent::OnStatus {
1269 level,
1270 code,
1271 description,
1272 } => {
1273 assert_eq!(level, "status");
1274 assert_eq!(code, "NetStream.Publish.Start");
1275 assert_eq!(description, "ready");
1276 }
1277 other => panic!("expected OnStatus, got {other:?}"),
1278 }
1279 }
1280
1281 /// Enhanced RTMP v2 §"Reconnect Request": an onStatus whose code
1282 /// is `NetConnection.Connect.ReconnectRequest` (level `status`)
1283 /// lifts to [`ClientEvent::ReconnectRequest`] with the optional
1284 /// `tcUrl` extracted — and the same code under a non-`status`
1285 /// level is NOT a valid reconnect request per spec ("to reconnect
1286 /// the level MUST be set to status"), so it stays a plain
1287 /// OnStatus.
1288 #[test]
1289 fn classify_command_recognises_reconnect_request() {
1290 let msg = crate::message::build_reconnect_request(
1291 Some("//192.0.2.0/realtimeapp"),
1292 Some("server update"),
1293 );
1294 let values = amf::decode_all(&msg.payload).unwrap();
1295 match classify_command(values) {
1296 ClientEvent::ReconnectRequest {
1297 tc_url,
1298 description,
1299 } => {
1300 assert_eq!(tc_url.as_deref(), Some("//192.0.2.0/realtimeapp"));
1301 assert_eq!(description, "server update");
1302 }
1303 other => panic!("expected ReconnectRequest, got {other:?}"),
1304 }
1305
1306 // tcUrl omitted → None ("use the tcUrl for the current
1307 // connection").
1308 let msg = crate::message::build_reconnect_request(None, None);
1309 let values = amf::decode_all(&msg.payload).unwrap();
1310 match classify_command(values) {
1311 ClientEvent::ReconnectRequest { tc_url, .. } => assert_eq!(tc_url, None),
1312 other => panic!("expected ReconnectRequest, got {other:?}"),
1313 }
1314
1315 // Wrong level → plain OnStatus passthrough.
1316 let info = Amf0Value::Object(vec![
1317 ("level".into(), Amf0Value::String("error".into())),
1318 (
1319 "code".into(),
1320 Amf0Value::String(crate::message::RECONNECT_REQUEST_CODE.into()),
1321 ),
1322 ]);
1323 let values = vec![
1324 Amf0Value::String("onStatus".into()),
1325 Amf0Value::Number(0.0),
1326 Amf0Value::Null,
1327 info,
1328 ];
1329 match classify_command(values) {
1330 ClientEvent::OnStatus { level, code, .. } => {
1331 assert_eq!(level, "error");
1332 assert_eq!(code, crate::message::RECONNECT_REQUEST_CODE);
1333 }
1334 other => panic!("expected OnStatus, got {other:?}"),
1335 }
1336 }
1337
1338 /// The four reference shapes from the spec's Info Object table
1339 /// (`enhanced-rtmp-v2.pdf` §"Reconnect Request"), resolved against
1340 /// a typical publisher tcUrl.
1341 #[test]
1342 fn resolve_tc_url_spec_example_shapes() {
1343 let base = "rtmp://origin.example.com:1935/live";
1344 // 1. Absolute URI reference — verbatim.
1345 assert_eq!(
1346 resolve_tc_url(base, "rtmp://foo.mydomain.com:1935/realtimeapp"),
1347 "rtmp://foo.mydomain.com:1935/realtimeapp"
1348 );
1349 // 2. Network-path reference — inherits only our scheme.
1350 assert_eq!(
1351 resolve_tc_url(base, "//192.0.2.0/realtimeapp"),
1352 "rtmp://192.0.2.0/realtimeapp"
1353 );
1354 // 3. Absolute-path reference — inherits scheme + authority.
1355 assert_eq!(
1356 resolve_tc_url(base, "/realtimeapp"),
1357 "rtmp://origin.example.com:1935/realtimeapp"
1358 );
1359 // 4. Relative-path reference — merged onto the base path with
1360 // the last segment replaced.
1361 assert_eq!(
1362 resolve_tc_url(base, "realtimeapp"),
1363 "rtmp://origin.example.com:1935/realtimeapp"
1364 );
1365 // Deeper base path: only the last segment is replaced.
1366 assert_eq!(
1367 resolve_tc_url("rtmp://h/app/inst", "other"),
1368 "rtmp://h/app/other"
1369 );
1370 // Empty reference → base itself.
1371 assert_eq!(resolve_tc_url(base, ""), base);
1372 // Base without any path: relative ref lands at the root.
1373 assert_eq!(
1374 resolve_tc_url("rtmp://h:1935", "realtimeapp"),
1375 "rtmp://h:1935/realtimeapp"
1376 );
1377 }
1378
1379 #[test]
1380 fn classify_command_recognises_result_and_error() {
1381 let result = classify_command(vec![
1382 Amf0Value::String("_result".into()),
1383 Amf0Value::Number(42.0),
1384 Amf0Value::Null,
1385 Amf0Value::Number(7.0),
1386 ]);
1387 match result {
1388 ClientEvent::Result {
1389 transaction_id,
1390 values,
1391 } => {
1392 assert_eq!(transaction_id, 42.0);
1393 assert_eq!(values.len(), 4);
1394 }
1395 other => panic!("expected Result, got {other:?}"),
1396 }
1397
1398 let err = classify_command(vec![
1399 Amf0Value::String("_error".into()),
1400 Amf0Value::Number(99.0),
1401 Amf0Value::Null,
1402 Amf0Value::Null,
1403 ]);
1404 match err {
1405 ClientEvent::ErrorReply { transaction_id, .. } => {
1406 assert_eq!(transaction_id, 99.0);
1407 }
1408 other => panic!("expected ErrorReply, got {other:?}"),
1409 }
1410 }
1411}