oxideav_rtmp/server.rs
1//! RTMP server: accepts an incoming publisher.
2//!
3//! The exposed flow is intentionally two-phase so consumers can
4//! verify stream keys / auth:
5//!
6//! ```text
7//! let server = RtmpServer::bind("0.0.0.0:1935")?;
8//! loop {
9//! let req = server.accept()?;
10//! if !my_auth(&req.app, &req.stream_name) {
11//! req.reject("unauthorized")?;
12//! continue;
13//! }
14//! let mut session = req.accept()?;
15//! while let Some(pkt) = session.next_packet()? { … }
16//! }
17//! ```
18//!
19//! [`RtmpServer::serve`] wraps the above in a thread-per-connection
20//! loop for callers who want to handle many publishers at once.
21//! Single-client use — the typical oxideav case — just calls
22//! [`RtmpServer::accept`] directly.
23
24use std::collections::VecDeque;
25use std::io::{Read, Write};
26use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
27use std::thread;
28use std::time::Duration;
29
30use crate::aggregate::parse_aggregate;
31use crate::amf::{self, Amf0Value};
32use crate::amf3;
33use crate::caps::ConnectCapabilities;
34use crate::chunk::{ChunkReader, ChunkWriter, Message};
35use crate::error::{Error, Result};
36use crate::flv::{parse_audio, parse_video, AudioTag, VideoTag};
37use crate::message::*;
38
39/// After-connect server chunk size. Larger = fewer chunk headers per
40/// message. 4 KiB is what most commodity ingest paths negotiate in practice.
41const SERVER_CHUNK_SIZE: u32 = 4096;
42/// Initial window-ack size advertised to the peer. Values of this
43/// order are what "normal" RTMP servers announce.
44const WINDOW_ACK_SIZE: u32 = 5_000_000;
45/// `limit_type` for SetPeerBandwidth — 2 = "dynamic".
46const PEER_BW_LIMIT_DYNAMIC: u8 = 2;
47
48/// Listening socket for incoming RTMP publishers.
49pub struct RtmpServer {
50 listener: TcpListener,
51 /// Enhanced RTMP capability block this server advertises in the
52 /// `_result(connect)` info object (`videoFourCcInfoMap` / `capsEx`
53 /// etc., per `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection
54 /// connect Command"). Defaults to empty so legacy publishers see
55 /// the pre-2023 byte layout exactly. Mutate with
56 /// [`set_capabilities`](Self::set_capabilities).
57 capabilities: ConnectCapabilities,
58}
59
60impl RtmpServer {
61 pub fn bind(addr: impl ToSocketAddrs) -> Result<Self> {
62 let listener = TcpListener::bind(addr)?;
63 Ok(Self {
64 listener,
65 capabilities: ConnectCapabilities::default(),
66 })
67 }
68
69 pub fn local_addr(&self) -> Result<SocketAddr> {
70 Ok(self.listener.local_addr()?)
71 }
72
73 /// Advertise the given Enhanced RTMP v1+v2 capabilities to every
74 /// subsequent `accept`-ed publisher. The block is appended to the
75 /// `_result(connect)` info object alongside the standard
76 /// `NetConnection.Connect.Success` status; legacy publishers ignore
77 /// the unknown properties and stay on the pre-2023 path. Pre-2023
78 /// is also what `set_capabilities(ConnectCapabilities::default())`
79 /// (or never calling this method) wires up.
80 pub fn set_capabilities(&mut self, caps: ConnectCapabilities) -> &mut Self {
81 self.capabilities = caps;
82 self
83 }
84
85 /// Capability block this server currently advertises.
86 pub fn capabilities(&self) -> &ConnectCapabilities {
87 &self.capabilities
88 }
89
90 /// Accept one connection, run the handshake + connect + publish
91 /// setup, and return the first point where the consumer gets to
92 /// decide whether to take the stream.
93 pub fn accept(&self) -> Result<PublishRequest> {
94 loop {
95 let (stream, peer_addr) = self.listener.accept()?;
96 // Individual parse failures shouldn't bring down the
97 // server — log via Err(...) once, then keep listening. A
98 // caller that wants fine-grained control uses `incoming()`
99 // plus their own handshake.
100 match drive_until_publish(stream, peer_addr, &self.capabilities) {
101 Ok(req) => return Ok(req),
102 Err(e) => {
103 eprintln!("oxideav-rtmp: dropped connection from {peer_addr}: {e}");
104 }
105 }
106 }
107 }
108
109 /// Loop forever, spawning one thread per accepted publisher. The
110 /// `handler` is called after `accept()` — i.e. it receives a
111 /// `PublishRequest` it can accept / reject the same way the
112 /// single-client path does.
113 ///
114 /// The handler should do its own work on the returned
115 /// [`RtmpSession`] (call `next_packet` until it returns `None`,
116 /// then drop). Panics in the handler are caught by the per-thread
117 /// panic boundary.
118 pub fn serve<F>(&self, handler: F) -> Result<()>
119 where
120 F: Fn(PublishRequest) + Send + Sync + 'static,
121 {
122 use std::sync::Arc;
123 let handler = Arc::new(handler);
124 let caps = Arc::new(self.capabilities.clone());
125 for conn in self.listener.incoming() {
126 let stream = match conn {
127 Ok(s) => s,
128 Err(e) => {
129 eprintln!("oxideav-rtmp: accept failed: {e}");
130 continue;
131 }
132 };
133 let peer_addr = match stream.peer_addr() {
134 Ok(a) => a,
135 Err(_) => continue,
136 };
137 let h = handler.clone();
138 let c = caps.clone();
139 thread::Builder::new()
140 .name(format!("oxideav-rtmp-session-{peer_addr}"))
141 .spawn(move || match drive_until_publish(stream, peer_addr, &c) {
142 Ok(req) => h(req),
143 Err(e) => {
144 eprintln!("oxideav-rtmp: dropped connection from {peer_addr}: {e}");
145 }
146 })
147 .map_err(|e| Error::Other(format!("spawn session thread: {e}")))?;
148 }
149 Ok(())
150 }
151}
152
153/// The protocol has gotten through `publish` — we know which app the
154/// client connected to and the stream name (commonly the stream key).
155/// Consumer decides whether to accept.
156pub struct PublishRequest {
157 pub app: String,
158 pub stream_name: String,
159 /// Usually `"live"`; occasionally `"record"` or `"append"`.
160 pub publish_type: String,
161 pub peer_addr: SocketAddr,
162 /// The `tcUrl` field from the client's connect command — useful
163 /// when consumers want the full url for logging.
164 pub tc_url: String,
165 /// Enhanced RTMP v1+v2 capability block lifted from the publisher's
166 /// `connect` Command Object (`fourCcList` /
167 /// `audio|videoFourCcInfoMap` / `capsEx`, per
168 /// `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect
169 /// Command"). Empty for legacy publishers that don't advertise any
170 /// E-RTMP capabilities.
171 pub capabilities: ConnectCapabilities,
172 pending: PendingSession,
173}
174
175struct PendingSession {
176 stream: TcpStream,
177 reader: ChunkReader<TcpStream>,
178 writer: ChunkWriter<TcpStream>,
179 stream_id: u32,
180 /// Kept in the struct so a future "send _result for publish"
181 /// tweak can reference the right tx id. Currently we skip the
182 /// _result and go straight to onStatus.
183 #[allow(dead_code)]
184 publish_tx_id: f64,
185}
186
187impl PublishRequest {
188 /// Take the stream: send `NetStream.Publish.Start` and return a
189 /// session the caller pumps via [`RtmpSession::next_packet`].
190 pub fn accept(self) -> Result<RtmpSession> {
191 let PublishRequest {
192 app,
193 stream_name,
194 publish_type,
195 peer_addr,
196 tc_url: _,
197 capabilities: _,
198 pending,
199 } = self;
200 let PendingSession {
201 stream,
202 reader,
203 mut writer,
204 stream_id,
205 publish_tx_id: _,
206 } = pending;
207
208 writer.write_message(
209 CSID_PROTOCOL_CONTROL,
210 &build_user_control_stream_begin(stream_id),
211 )?;
212 writer.write_message(
213 CSID_COMMAND,
214 &build_on_status(
215 stream_id,
216 "status",
217 "NetStream.Publish.Start",
218 &format!("Started publishing {stream_name}"),
219 ),
220 )?;
221 writer.flush()?;
222
223 Ok(RtmpSession {
224 stream,
225 reader,
226 writer,
227 app,
228 stream_name,
229 publish_type,
230 peer_addr,
231 stream_id,
232 ended: false,
233 pending_subs: VecDeque::new(),
234 })
235 }
236
237 /// Politely reject the publish: emit `NetStream.Publish.BadName`
238 /// with `reason` as the description, then drop the connection.
239 pub fn reject(self, reason: &str) -> Result<()> {
240 let PublishRequest { pending, .. } = self;
241 let PendingSession {
242 stream,
243 mut writer,
244 stream_id,
245 ..
246 } = pending;
247 let _ = writer.write_message(
248 CSID_COMMAND,
249 &build_on_status(stream_id, "error", "NetStream.Publish.BadName", reason),
250 );
251 let _ = writer.flush();
252 let _ = stream.shutdown(Shutdown::Both);
253 Err(Error::Rejected(reason.to_string()))
254 }
255}
256
257/// Active publish after `accept`. Iterate via [`RtmpSession::next_packet`].
258pub struct RtmpSession {
259 stream: TcpStream,
260 reader: ChunkReader<TcpStream>,
261 writer: ChunkWriter<TcpStream>,
262 app: String,
263 stream_name: String,
264 publish_type: String,
265 peer_addr: SocketAddr,
266 stream_id: u32,
267 ended: bool,
268 /// Sub-messages decomposed out of an Aggregate Message (type 22)
269 /// per RTMP 1.0 §7.1.6 but not yet surfaced as a [`StreamPacket`].
270 /// When [`next_packet`](Self::next_packet) sees a `MSG_AGGREGATE`
271 /// on the wire, [`parse_aggregate`] splits the body into
272 /// FLV-shaped sub-messages (audio / video / data / command) with
273 /// the §7.1.6 timestamp re-normalisation already applied and the
274 /// `msg_stream_id` override resolved to the aggregate's; those
275 /// subs land here and the dispatch loop drains the queue ahead of
276 /// every subsequent wire read so the caller observes the
277 /// per-sub packets in the order the publisher packed them.
278 pending_subs: VecDeque<Message>,
279}
280
281/// One media-layer event reported to the caller.
282#[derive(Debug, Clone)]
283pub enum StreamPacket {
284 Audio {
285 timestamp: u32,
286 tag: AudioTag,
287 },
288 Video {
289 timestamp: u32,
290 tag: VideoTag,
291 },
292 /// `@setDataFrame("onMetaData", <amf0>)`. The AMF0 value is the
293 /// metadata object (usually width, height, codec ids, framerate,
294 /// bitrate, audiodatarate, ...).
295 Metadata(Amf0Value),
296}
297
298impl RtmpSession {
299 pub fn app(&self) -> &str {
300 &self.app
301 }
302 pub fn stream_name(&self) -> &str {
303 &self.stream_name
304 }
305 pub fn publish_type(&self) -> &str {
306 &self.publish_type
307 }
308 pub fn peer_addr(&self) -> SocketAddr {
309 self.peer_addr
310 }
311
312 /// Configure a read timeout on the underlying TCP socket — helpful
313 /// when you want `next_packet` to return periodically so an outer
314 /// shutdown signal can be observed. Passes through to
315 /// [`TcpStream::set_read_timeout`].
316 ///
317 /// The timeout is applied to the chunk reader's actual socket
318 /// clone (the one [`next_packet`](Self::next_packet) reads
319 /// through) rather than the session's bookkeeping clone. On
320 /// Linux a sockopt set through one `try_clone` descriptor carries
321 /// to its sibling clones because they share one file description;
322 /// Windows assigns each clone its own kernel handle with
323 /// independent socket options, so the timeout must be installed
324 /// on the exact socket that will issue the `recv` call.
325 pub fn set_read_timeout(&mut self, d: Option<Duration>) -> Result<()> {
326 self.reader.inner_mut().set_read_timeout(d)?;
327 // Also apply to the bookkeeping clone for any future direct
328 // reads through `self.stream` (none today, but defensive).
329 let _ = self.stream.set_read_timeout(d);
330 Ok(())
331 }
332
333 /// Emit a `UserControl StreamDry(stream_id)` event on the publish
334 /// stream (RTMP 1.0 §3.7, UCM type 2).
335 ///
336 /// Per spec: "the server sends this event to notify the client
337 /// that there is no more data on the stream. If the server does
338 /// not detect any message for a time period, it can notify the
339 /// subscribed clients that the stream is dry." Distinct from
340 /// [`close`](Self::close)'s `StreamEOF`: `StreamDry` is a
341 /// transient "we have nothing right now" signal that may resolve
342 /// when more data arrives, not a teardown.
343 pub fn send_stream_dry(&mut self) -> Result<()> {
344 self.writer.write_message(
345 CSID_PROTOCOL_CONTROL,
346 &build_user_control_stream_dry(self.stream_id),
347 )?;
348 self.writer.flush()?;
349 Ok(())
350 }
351
352 /// Emit a `UserControl StreamIsRecorded(stream_id)` event on the
353 /// publish stream (RTMP 1.0 §3.7, UCM type 4).
354 ///
355 /// Per spec: "the server sends this event to notify the client
356 /// that the stream is a recorded stream." A server fronting an
357 /// archival recorder may want to advertise this after the publish
358 /// handshake settles so a forwarding peer knows the captured
359 /// stream is replayable rather than ephemeral.
360 pub fn send_stream_is_recorded(&mut self) -> Result<()> {
361 self.writer.write_message(
362 CSID_PROTOCOL_CONTROL,
363 &build_user_control_stream_is_recorded(self.stream_id),
364 )?;
365 self.writer.flush()?;
366 Ok(())
367 }
368
369 /// Emit a `UserControl PingRequest(timestamp_ms)` event (RTMP 1.0
370 /// §3.7, UCM type 6).
371 ///
372 /// Per spec, "the server sends this event to test whether the
373 /// client is reachable. Event data is a 4-byte timestamp,
374 /// representing the local server time when the server dispatched
375 /// the command." The client (our [`RtmpClient`]) replies with the
376 /// matching `PingResponse` carrying the same 4 bytes —
377 /// `RtmpClient::poll_event` answers the ping internally without
378 /// surfacing the request to the publisher caller.
379 pub fn send_ping_request(&mut self, timestamp_ms: u32) -> Result<()> {
380 self.writer.write_message(
381 CSID_PROTOCOL_CONTROL,
382 &build_user_control_ping_request(timestamp_ms),
383 )?;
384 self.writer.flush()?;
385 Ok(())
386 }
387
388 /// Ask the publisher to reconnect — Enhanced RTMP v2 §"Reconnect
389 /// Request".
390 ///
391 /// Emits the `onStatus(NetConnection.Connect.ReconnectRequest)`
392 /// NetConnection command (message stream 0, transaction id 0, null
393 /// Command Object). Per the spec's message flow, a server does
394 /// this "prior to the shutdown of the live streaming server or
395 /// when the server intends to remap the client to another server
396 /// instance" — and when remapping, it MUST pass the target via
397 /// `tc_url` (absolute or relative URI reference; `None` tells the
398 /// client to re-dial the tcUrl of the current connection).
399 ///
400 /// After sending, the spec requires the old server to "continue
401 /// processing messages from the client until the client
402 /// disconnects" — so keep pumping
403 /// [`next_packet`](Self::next_packet) as usual; the publisher
404 /// drains up to its next appropriate media boundary (such as a
405 /// keyframe) before it actually moves.
406 ///
407 /// Note: per §"Enhancing NetConnection connect Command" the peer
408 /// advertises reconnect support via the `capsEx`
409 /// [`CAPS_EX_RECONNECT`](crate::caps::CAPS_EX_RECONNECT) bit —
410 /// check [`PublishRequest::capabilities`] before relying on the
411 /// client honouring this event.
412 pub fn send_reconnect_request(
413 &mut self,
414 tc_url: Option<&str>,
415 description: Option<&str>,
416 ) -> Result<()> {
417 self.writer
418 .write_message(CSID_COMMAND, &build_reconnect_request(tc_url, description))?;
419 self.writer.flush()?;
420 Ok(())
421 }
422
423 /// Close the session politely.
424 ///
425 /// On the wire we emit, in order:
426 ///
427 /// 1. A `UserControl StreamEOF(stream_id)` event so the peer's
428 /// chunk-stream state machine learns the publish is done before
429 /// it observes the TCP FIN (RTMP 1.0 §7.1.7).
430 /// 2. `onStatus(NetStream.Unpublish.Success)` on the publish stream.
431 /// 3. A chunk-writer `flush()` so every buffered chunk reaches the
432 /// kernel before the half-close.
433 ///
434 /// Then we send a write-half FIN (`Shutdown::Write`) rather than
435 /// tearing both halves down at once. `Shutdown::Both` instantly
436 /// closes the read half too, which on some platforms makes the
437 /// kernel answer the peer's still-unacked data with a RST and
438 /// discard any A/V messages the peer hasn't yet drained from its
439 /// receive buffer — closeStream / the StreamEOF event / the last
440 /// frames just written can be thrown away mid-stream. A write-half
441 /// FIN lets the peer read everything we just wrote, then observe
442 /// EOF cleanly. The read half closes when `self` (and its owned
443 /// `TcpStream`) drops at end of scope.
444 pub fn close(mut self) -> Result<()> {
445 let _ = self.writer.write_message(
446 CSID_PROTOCOL_CONTROL,
447 &build_user_control_stream_eof(self.stream_id),
448 );
449 let _ = self.writer.write_message(
450 CSID_COMMAND,
451 &build_on_status(
452 self.stream_id,
453 "status",
454 "NetStream.Unpublish.Success",
455 "Stream closed.",
456 ),
457 );
458 let _ = self.writer.flush();
459 let _ = self.stream.shutdown(Shutdown::Write);
460 Ok(())
461 }
462
463 /// Read the next audio / video / metadata packet from the
464 /// publisher. Returns `Ok(None)` when the peer cleanly closed the
465 /// stream (via `closeStream` / `deleteStream` / `FCUnpublish`).
466 ///
467 /// Aggregate Messages (RTMP 1.0 §7.1.6, message type id `22`) are
468 /// decomposed transparently: the sub-messages enter an internal
469 /// queue and the dispatch loop drains them in publish order ahead
470 /// of any further wire read, so a publisher that bundles several
471 /// frames into one aggregate (fewer chunk headers on the wire)
472 /// surfaces the same per-frame `StreamPacket` sequence as a
473 /// publisher that sends them individually.
474 pub fn next_packet(&mut self) -> Result<Option<StreamPacket>> {
475 while !self.ended {
476 // Drain queued aggregate sub-messages ahead of any further
477 // wire read so the publisher's pack order is preserved.
478 if let Some(sub) = self.pending_subs.pop_front() {
479 if let Some(pkt) = self.handle_message(sub)? {
480 return Ok(Some(pkt));
481 }
482 continue;
483 }
484 let msg = match self.reader.read_message() {
485 Ok(m) => m,
486 Err(Error::Io(e))
487 if matches!(
488 e.kind(),
489 std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionReset
490 ) =>
491 {
492 return Ok(None);
493 }
494 Err(e) => return Err(e),
495 };
496 // §5.3: once the publisher has sent a full window of bytes,
497 // owe it an Acknowledgement carrying the running sequence
498 // number. Send before dispatching so the ack reflects the
499 // bytes through this message.
500 self.maybe_send_ack()?;
501 if let Some(pkt) = self.handle_message(msg)? {
502 return Ok(Some(pkt));
503 }
504 }
505 Ok(None)
506 }
507
508 /// Emit a §5.3 Acknowledgement if the reader's received-byte count
509 /// has crossed the peer-negotiated §5.5 window since the last one.
510 /// No-op until a window has been negotiated (`Window Acknowledgement
511 /// Size` / `Set Peer Bandwidth` from the publisher).
512 fn maybe_send_ack(&mut self) -> Result<()> {
513 if let Some(seq) = self.reader.ack_due() {
514 self.writer
515 .write_message(CSID_PROTOCOL_CONTROL, &build_ack(seq))?;
516 self.writer.flush()?;
517 }
518 Ok(())
519 }
520
521 /// Per-message dispatch shared between the wire path and the
522 /// aggregate-sub-drain path. Returns `Ok(Some(packet))` if the
523 /// message produced a user-visible event, `Ok(None)` if it was
524 /// consumed silently (protocol control, command teardown setting
525 /// `self.ended`, etc.) and the loop should keep reading.
526 fn handle_message(&mut self, msg: Message) -> Result<Option<StreamPacket>> {
527 match msg.msg_type_id {
528 MSG_AUDIO => {
529 let tag = parse_audio(&msg.payload)?;
530 Ok(Some(StreamPacket::Audio {
531 timestamp: msg.timestamp,
532 tag,
533 }))
534 }
535 MSG_VIDEO => {
536 let tag = parse_video(&msg.payload)?;
537 Ok(Some(StreamPacket::Video {
538 timestamp: msg.timestamp,
539 tag,
540 }))
541 }
542 MSG_DATA_AMF0 => {
543 // @setDataFrame + onMetaData + <object>
544 let values = amf::decode_all(&msg.payload)?;
545 // Common shape: ["@setDataFrame", "onMetaData",
546 // <meta>]. Some clients omit "@setDataFrame" and
547 // just send ["onMetaData", <meta>]. Accept both.
548 Ok(metadata_object(&values).map(StreamPacket::Metadata))
549 }
550 MSG_DATA_AMF3 => {
551 // AMF3-encoded data message (type 15). Per AMF3 §4.1
552 // the body is an AMF0 frame switching to AMF3 via the
553 // avmplus marker; decode it and bridge each value onto
554 // the AMF0 shape so metadata flows through the same
555 // path as MSG_DATA_AMF0.
556 let values: Vec<Amf0Value> = amf3::decode_data_message(&msg.payload)?
557 .iter()
558 .map(amf3::Amf3Value::to_amf0)
559 .collect();
560 Ok(metadata_object(&values).map(StreamPacket::Metadata))
561 }
562 MSG_COMMAND_AMF0 => {
563 // Likely closeStream / deleteStream /
564 // FCUnpublish — peer is shutting down.
565 let values = amf::decode_all(&msg.payload)?;
566 if let Some(name) = values.first().and_then(Amf0Value::as_str) {
567 if matches!(name, "closeStream" | "deleteStream" | "FCUnpublish") {
568 self.ended = true;
569 }
570 }
571 Ok(None)
572 }
573 MSG_COMMAND_AMF3 => {
574 // AMF3-encoded command (type 17). Same teardown
575 // detection as the AMF0 command path.
576 let values: Vec<Amf0Value> = amf3::decode_data_message(&msg.payload)?
577 .iter()
578 .map(amf3::Amf3Value::to_amf0)
579 .collect();
580 if let Some(name) = values.first().and_then(Amf0Value::as_str) {
581 if matches!(name, "closeStream" | "deleteStream" | "FCUnpublish") {
582 self.ended = true;
583 }
584 }
585 Ok(None)
586 }
587 MSG_AGGREGATE => {
588 // RTMP 1.0 §7.1.6 Aggregate Message. Split into
589 // FLV-shaped sub-messages with the §7.1.6 timestamp
590 // re-normalisation applied and the message-stream-id
591 // override resolved; queue them so subsequent calls
592 // surface the per-sub packets in publish order. Sub
593 // ordering is preserved verbatim. A nested aggregate
594 // (sub `msg_type_id == 22`) is forwarded to the queue
595 // and the next dispatch tick recurses through the same
596 // `MSG_AGGREGATE` arm so a bounded depth of nesting
597 // resolves transparently; an unbounded chain would
598 // surface as repeated parser work, not stack growth.
599 let subs = parse_aggregate(&msg)?;
600 self.pending_subs.extend(subs);
601 Ok(None)
602 }
603 MSG_SET_CHUNK_SIZE => {
604 let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
605 self.reader.set_chunk_size(size as usize);
606 Ok(None)
607 }
608 MSG_WINDOW_ACK_SIZE => {
609 // §5.5: the peer is telling us which window size to use
610 // when sending Acknowledgements. Honour it so our §5.3
611 // ack cadence matches what the publisher expects.
612 let size = read_u32_be(&msg.payload)?;
613 self.reader.set_window_ack_size(size);
614 Ok(None)
615 }
616 MSG_SET_PEER_BANDWIDTH => {
617 // §5.6: "The output bandwidth value is the same as the
618 // window size for the peer." The first 4 bytes carry
619 // that window size; adopt it as our send-side ack
620 // window too. (The trailing Limit type byte is
621 // advisory and doesn't change our framing.)
622 if msg.payload.len() >= 4 {
623 let size = read_u32_be(&msg.payload[..4])?;
624 self.reader.set_window_ack_size(size);
625 }
626 Ok(None)
627 }
628 MSG_ACK | MSG_USER_CONTROL => {
629 // Informational — the peer's §5.3 sequence number (ACK)
630 // or a user-control event we don't surface as a packet.
631 Ok(None)
632 }
633 _ => {
634 // Unknown / unhandled — swallow and keep going.
635 Ok(None)
636 }
637 }
638 }
639}
640
641// ---------------------------------------------------------------------------
642// Protocol driver: handshake → connect → createStream → publish
643// ---------------------------------------------------------------------------
644
645fn drive_until_publish(
646 stream: TcpStream,
647 peer_addr: SocketAddr,
648 server_caps: &ConnectCapabilities,
649) -> Result<PublishRequest> {
650 // TCP-level defaults: nodelay (RTMP is command-heavy during setup),
651 // keepalive so idle publishers are detected.
652 let _ = stream.set_nodelay(true);
653
654 // Run the handshake on a plain clone of the stream (no chunk state
655 // yet).
656 let mut hs_stream = stream.try_clone()?;
657 crate::handshake::server_handshake(&mut hs_stream)?;
658
659 // Reader / writer share the same TCP stream via `try_clone`.
660 let reader_stream = stream.try_clone()?;
661 let writer_stream = stream.try_clone()?;
662 let mut reader = ChunkReader::new(reader_stream);
663 let mut writer = ChunkWriter::new(writer_stream);
664
665 // Wait for connect. These get populated when we see the
666 // `connect` command below.
667 let tc_url;
668 let app;
669 let client_capabilities;
670 loop {
671 let msg = reader.read_message()?;
672 match msg.msg_type_id {
673 MSG_SET_CHUNK_SIZE => {
674 let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
675 reader.set_chunk_size(size as usize);
676 }
677 MSG_WINDOW_ACK_SIZE => {
678 let size = read_u32_be(&msg.payload)?;
679 reader.set_window_ack_size(size);
680 }
681 MSG_SET_PEER_BANDWIDTH if msg.payload.len() >= 4 => {
682 let size = read_u32_be(&msg.payload[..4])?;
683 reader.set_window_ack_size(size);
684 }
685 MSG_COMMAND_AMF0 => {
686 let values = amf::decode_all(&msg.payload)?;
687 let name = values
688 .first()
689 .and_then(Amf0Value::as_str)
690 .ok_or_else(|| Error::InvalidCommand("missing command name".into()))?;
691 if name != "connect" {
692 return Err(Error::InvalidCommand(format!(
693 "expected `connect` first, got `{name}`"
694 )));
695 }
696 let tx_id = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(1.0);
697 let cmd_obj = values.get(2).ok_or_else(|| {
698 Error::InvalidCommand("`connect` missing command object".into())
699 })?;
700 tc_url = cmd_obj
701 .get("tcUrl")
702 .and_then(Amf0Value::as_str)
703 .unwrap_or("")
704 .to_owned();
705 app = cmd_obj
706 .get("app")
707 .and_then(Amf0Value::as_str)
708 .unwrap_or("")
709 .to_owned();
710 // Lift Enhanced RTMP v1+v2 capability advertisement out
711 // of the Command Object. Legacy publishers leave this
712 // empty.
713 client_capabilities = ConnectCapabilities::from_amf0(cmd_obj);
714
715 // Reply: WindowAckSize + SetPeerBandwidth + StreamBegin
716 // + _result + SetChunkSize. Order matches what most
717 // commodity ingest servers send. The server's own
718 // capability advertisement rides inside the _result
719 // info object — see `build_connect_result_with_caps`.
720 writer.write_message(
721 CSID_PROTOCOL_CONTROL,
722 &build_window_ack_size(WINDOW_ACK_SIZE),
723 )?;
724 writer.write_message(
725 CSID_PROTOCOL_CONTROL,
726 &build_set_peer_bandwidth(WINDOW_ACK_SIZE, PEER_BW_LIMIT_DYNAMIC),
727 )?;
728 writer.write_message(CSID_PROTOCOL_CONTROL, &build_user_control_stream_begin(0))?;
729 writer.write_message(
730 CSID_COMMAND,
731 &build_connect_result_with_caps(tx_id, server_caps),
732 )?;
733 writer.write_message(
734 CSID_PROTOCOL_CONTROL,
735 &build_set_chunk_size(SERVER_CHUNK_SIZE),
736 )?;
737 writer.set_chunk_size(SERVER_CHUNK_SIZE as usize);
738 writer.flush()?;
739 break;
740 }
741 _ => {
742 // Silently accept other pre-connect messages (usually
743 // nothing but SetChunkSize).
744 }
745 }
746 }
747
748 // Handle releaseStream / FCPublish / createStream / publish until
749 // we see publish.
750 let mut next_stream_id: u32 = 1;
751 loop {
752 let msg = reader.read_message()?;
753 match msg.msg_type_id {
754 MSG_SET_CHUNK_SIZE => {
755 let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
756 reader.set_chunk_size(size as usize);
757 continue;
758 }
759 MSG_WINDOW_ACK_SIZE => {
760 let size = read_u32_be(&msg.payload)?;
761 reader.set_window_ack_size(size);
762 continue;
763 }
764 MSG_SET_PEER_BANDWIDTH if msg.payload.len() >= 4 => {
765 let size = read_u32_be(&msg.payload[..4])?;
766 reader.set_window_ack_size(size);
767 continue;
768 }
769 MSG_COMMAND_AMF0 => {
770 let values = amf::decode_all(&msg.payload)?;
771 let name = values
772 .first()
773 .and_then(Amf0Value::as_str)
774 .ok_or_else(|| Error::InvalidCommand("missing command name".into()))?
775 .to_owned();
776 let tx_id = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
777 match name.as_str() {
778 "releaseStream" | "FCPublish" => {
779 // Many peers want a _result back; send a minimal
780 // one. Arg slot [3] is the stream name we can
781 // echo.
782 let payload = amf::encode_command(
783 "_result",
784 tx_id,
785 Amf0Value::Null,
786 &[Amf0Value::Undefined],
787 );
788 let reply = Message {
789 msg_type_id: MSG_COMMAND_AMF0,
790 msg_stream_id: 0,
791 timestamp: 0,
792 payload,
793 };
794 writer.write_message(CSID_COMMAND, &reply)?;
795 writer.flush()?;
796 }
797 "createStream" => {
798 let sid = next_stream_id;
799 next_stream_id += 1;
800 writer.write_message(
801 CSID_COMMAND,
802 &build_create_stream_result(tx_id, sid as f64),
803 )?;
804 writer.flush()?;
805 }
806 "publish" => {
807 // Args: [stream_name, publish_type].
808 let stream_name = values
809 .get(3)
810 .and_then(Amf0Value::as_str)
811 .ok_or_else(|| {
812 Error::InvalidCommand("publish missing stream_name".into())
813 })?
814 .to_owned();
815 let publish_type = values
816 .get(4)
817 .and_then(Amf0Value::as_str)
818 .unwrap_or("live")
819 .to_owned();
820 return Ok(PublishRequest {
821 app,
822 stream_name,
823 publish_type,
824 peer_addr,
825 tc_url,
826 capabilities: client_capabilities,
827 pending: PendingSession {
828 stream,
829 reader,
830 writer,
831 stream_id: msg.msg_stream_id.max(1),
832 publish_tx_id: tx_id,
833 },
834 });
835 }
836 _ => {
837 // Unknown command — keep listening.
838 }
839 }
840 }
841 _ => {
842 // Ignore audio / video / data / control messages
843 // arriving before publish — not strictly legal but
844 // seen in the wild.
845 }
846 }
847 }
848}
849
850/// Pull the metadata object out of a decoded data-message value list.
851///
852/// `@setDataFrame("onMetaData", <meta>)` is the standard publish shape;
853/// some clients omit the leading `@setDataFrame` and send just
854/// `["onMetaData", <meta>]`. Either way the payload object is the last
855/// Object / ECMA-array value in the list, so search from the back.
856fn metadata_object(values: &[Amf0Value]) -> Option<Amf0Value> {
857 values
858 .iter()
859 .rev()
860 .find(|v| matches!(v, Amf0Value::Object(_) | Amf0Value::EcmaArray(_)))
861 .cloned()
862}
863
864fn read_u32_be(buf: &[u8]) -> Result<u32> {
865 if buf.len() < 4 {
866 return Err(Error::ProtocolViolation("need 4 bytes for u32be".into()));
867 }
868 Ok(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
869}
870
871/// Free TCP-level helper for `stream`-owner code to read pending
872/// writes synchronously.
873#[allow(dead_code)]
874fn flush_writer<W: Write>(w: &mut W) -> Result<()> {
875 w.flush()?;
876 Ok(())
877}
878
879#[allow(dead_code)]
880fn read_exact<R: Read>(r: &mut R, n: usize) -> Result<Vec<u8>> {
881 let mut buf = vec![0u8; n];
882 r.read_exact(&mut buf)?;
883 Ok(buf)
884}