Skip to main content

oxideav_rtmp/
server.rs

1//! RTMP server: accepts an incoming publisher.
2//!
3//! The exposed flow is intentionally two-phase so consumers can
4//! verify stream keys / auth:
5//!
6//! ```text
7//!   let server = RtmpServer::bind("0.0.0.0:1935")?;
8//!   loop {
9//!       let req = server.accept()?;
10//!       if !my_auth(&req.app, &req.stream_name) {
11//!           let _ = req.reject("unauthorized"); // always returns Err(Error::Rejected)
12//!           continue;
13//!       }
14//!       let mut session = req.accept()?;
15//!       while let Some(pkt) = session.next_packet()? { … }
16//!   }
17//! ```
18//!
19//! [`RtmpServer::serve`] wraps the above in a thread-per-connection
20//! loop for callers who want to handle many publishers at once.
21//! Single-client use — the typical oxideav case — just calls
22//! [`RtmpServer::accept`] directly.
23
24use std::collections::VecDeque;
25use std::io::{Read, Write};
26use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
27use std::thread;
28use std::time::Duration;
29
30use crate::aggregate::parse_aggregate;
31use crate::amf::{self, Amf0Value};
32use crate::amf3;
33use crate::caps::ConnectCapabilities;
34use crate::chunk::{ChunkReader, ChunkWriter, Message};
35use crate::error::{Error, Result};
36use crate::flv::{parse_audio, parse_video, AudioTag, VideoTag};
37use crate::message::*;
38
/// Chunk size the server switches to once `connect` has been answered.
/// A larger chunk means fewer chunk headers per message; 4 KiB is what
/// most commodity ingest paths negotiate in practice.
const SERVER_CHUNK_SIZE: u32 = 4 * 1024;
/// Window-acknowledgement size initially advertised to the peer. Values
/// of this order are what "normal" RTMP servers announce.
const WINDOW_ACK_SIZE: u32 = 5 * 1_000_000;
/// `limit_type` byte sent with SetPeerBandwidth — 2 means "dynamic".
const PEER_BW_LIMIT_DYNAMIC: u8 = 0x02;
47
48/// Listening socket for incoming RTMP publishers.
49pub struct RtmpServer {
50    listener: TcpListener,
51    /// Enhanced RTMP capability block this server advertises in the
52    /// `_result(connect)` info object (`videoFourCcInfoMap` / `capsEx`
53    /// etc., per `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection
54    /// connect Command"). Defaults to empty so legacy publishers see
55    /// the pre-2023 byte layout exactly. Mutate with
56    /// [`set_capabilities`](Self::set_capabilities).
57    capabilities: ConnectCapabilities,
58}
59
60impl RtmpServer {
61    pub fn bind(addr: impl ToSocketAddrs) -> Result<Self> {
62        let listener = TcpListener::bind(addr)?;
63        Ok(Self {
64            listener,
65            capabilities: ConnectCapabilities::default(),
66        })
67    }
68
69    pub fn local_addr(&self) -> Result<SocketAddr> {
70        Ok(self.listener.local_addr()?)
71    }
72
73    /// Advertise the given Enhanced RTMP v1+v2 capabilities to every
74    /// subsequent `accept`-ed publisher. The block is appended to the
75    /// `_result(connect)` info object alongside the standard
76    /// `NetConnection.Connect.Success` status; legacy publishers ignore
77    /// the unknown properties and stay on the pre-2023 path. Pre-2023
78    /// is also what `set_capabilities(ConnectCapabilities::default())`
79    /// (or never calling this method) wires up.
80    pub fn set_capabilities(&mut self, caps: ConnectCapabilities) -> &mut Self {
81        self.capabilities = caps;
82        self
83    }
84
85    /// Capability block this server currently advertises.
86    pub fn capabilities(&self) -> &ConnectCapabilities {
87        &self.capabilities
88    }
89
90    /// Accept one connection, run the handshake + connect + publish
91    /// setup, and return the first point where the consumer gets to
92    /// decide whether to take the stream.
93    pub fn accept(&self) -> Result<PublishRequest> {
94        loop {
95            let (stream, peer_addr) = self.listener.accept()?;
96            // Individual parse failures shouldn't bring down the
97            // server — log via Err(...) once, then keep listening. A
98            // caller that wants fine-grained control uses `incoming()`
99            // plus their own handshake.
100            match drive_until_publish(stream, peer_addr, &self.capabilities) {
101                Ok(req) => return Ok(req),
102                Err(e) => {
103                    eprintln!("oxideav-rtmp: dropped connection from {peer_addr}: {e}");
104                }
105            }
106        }
107    }
108
109    /// Loop forever, spawning one thread per accepted publisher. The
110    /// `handler` is called after `accept()` — i.e. it receives a
111    /// `PublishRequest` it can accept / reject the same way the
112    /// single-client path does.
113    ///
114    /// The handler should do its own work on the returned
115    /// [`RtmpSession`] (call `next_packet` until it returns `None`,
116    /// then drop). Panics in the handler are caught by the per-thread
117    /// panic boundary.
118    pub fn serve<F>(&self, handler: F) -> Result<()>
119    where
120        F: Fn(PublishRequest) + Send + Sync + 'static,
121    {
122        use std::sync::Arc;
123        let handler = Arc::new(handler);
124        let caps = Arc::new(self.capabilities.clone());
125        for conn in self.listener.incoming() {
126            let stream = match conn {
127                Ok(s) => s,
128                Err(e) => {
129                    eprintln!("oxideav-rtmp: accept failed: {e}");
130                    continue;
131                }
132            };
133            let peer_addr = match stream.peer_addr() {
134                Ok(a) => a,
135                Err(_) => continue,
136            };
137            let h = handler.clone();
138            let c = caps.clone();
139            thread::Builder::new()
140                .name(format!("oxideav-rtmp-session-{peer_addr}"))
141                .spawn(move || match drive_until_publish(stream, peer_addr, &c) {
142                    Ok(req) => h(req),
143                    Err(e) => {
144                        eprintln!("oxideav-rtmp: dropped connection from {peer_addr}: {e}");
145                    }
146                })
147                .map_err(|e| Error::Other(format!("spawn session thread: {e}")))?;
148        }
149        Ok(())
150    }
151}
152
153/// The protocol has gotten through `publish` — we know which app the
154/// client connected to and the stream name (commonly the stream key).
155/// Consumer decides whether to accept.
156pub struct PublishRequest {
157    pub app: String,
158    pub stream_name: String,
159    /// Usually `"live"`; occasionally `"record"` or `"append"`.
160    pub publish_type: String,
161    pub peer_addr: SocketAddr,
162    /// The `tcUrl` field from the client's connect command — useful
163    /// when consumers want the full url for logging.
164    pub tc_url: String,
165    /// Enhanced RTMP v1+v2 capability block lifted from the publisher's
166    /// `connect` Command Object (`fourCcList` /
167    /// `audio|videoFourCcInfoMap` / `capsEx`, per
168    /// `enhanced-rtmp-v2.pdf` §"Enhancing NetConnection connect
169    /// Command"). Empty for legacy publishers that don't advertise any
170    /// E-RTMP capabilities.
171    pub capabilities: ConnectCapabilities,
172    pending: PendingSession,
173}
174
175struct PendingSession {
176    stream: TcpStream,
177    reader: ChunkReader<TcpStream>,
178    writer: ChunkWriter<TcpStream>,
179    stream_id: u32,
180    /// Kept in the struct so a future "send _result for publish"
181    /// tweak can reference the right tx id. Currently we skip the
182    /// _result and go straight to onStatus.
183    #[allow(dead_code)]
184    publish_tx_id: f64,
185}
186
187impl PublishRequest {
188    /// Take the stream: send `NetStream.Publish.Start` and return a
189    /// session the caller pumps via [`RtmpSession::next_packet`].
190    pub fn accept(self) -> Result<RtmpSession> {
191        let PublishRequest {
192            app,
193            stream_name,
194            publish_type,
195            peer_addr,
196            tc_url: _,
197            capabilities: _,
198            pending,
199        } = self;
200        let PendingSession {
201            stream,
202            reader,
203            mut writer,
204            stream_id,
205            publish_tx_id: _,
206        } = pending;
207
208        writer.write_message(
209            CSID_PROTOCOL_CONTROL,
210            &build_user_control_stream_begin(stream_id),
211        )?;
212        writer.write_message(
213            CSID_COMMAND,
214            &build_on_status(
215                stream_id,
216                "status",
217                "NetStream.Publish.Start",
218                &format!("Started publishing {stream_name}"),
219            ),
220        )?;
221        writer.flush()?;
222
223        Ok(RtmpSession {
224            stream,
225            reader,
226            writer,
227            app,
228            stream_name,
229            publish_type,
230            peer_addr,
231            stream_id,
232            ended: false,
233            pending_subs: VecDeque::new(),
234        })
235    }
236
237    /// Politely reject the publish: emit `NetStream.Publish.BadName`
238    /// with `reason` as the description, then drop the connection.
239    pub fn reject(self, reason: &str) -> Result<()> {
240        let PublishRequest { pending, .. } = self;
241        let PendingSession {
242            stream,
243            mut writer,
244            stream_id,
245            ..
246        } = pending;
247        let _ = writer.write_message(
248            CSID_COMMAND,
249            &build_on_status(stream_id, "error", "NetStream.Publish.BadName", reason),
250        );
251        let _ = writer.flush();
252        let _ = stream.shutdown(Shutdown::Both);
253        Err(Error::Rejected(reason.to_string()))
254    }
255}
256
257/// Active publish after `accept`. Iterate via [`RtmpSession::next_packet`].
258pub struct RtmpSession {
259    stream: TcpStream,
260    reader: ChunkReader<TcpStream>,
261    writer: ChunkWriter<TcpStream>,
262    app: String,
263    stream_name: String,
264    publish_type: String,
265    peer_addr: SocketAddr,
266    stream_id: u32,
267    ended: bool,
268    /// Sub-messages decomposed out of an Aggregate Message (type 22)
269    /// per RTMP 1.0 §7.1.6 but not yet surfaced as a [`StreamPacket`].
270    /// When [`next_packet`](Self::next_packet) sees a `MSG_AGGREGATE`
271    /// on the wire, [`parse_aggregate`] splits the body into
272    /// FLV-shaped sub-messages (audio / video / data / command) with
273    /// the §7.1.6 timestamp re-normalisation already applied and the
274    /// `msg_stream_id` override resolved to the aggregate's; those
275    /// subs land here and the dispatch loop drains the queue ahead of
276    /// every subsequent wire read so the caller observes the
277    /// per-sub packets in the order the publisher packed them.
278    pending_subs: VecDeque<Message>,
279}
280
281/// One media-layer event reported to the caller.
282#[derive(Debug, Clone)]
283pub enum StreamPacket {
284    Audio {
285        timestamp: u32,
286        tag: AudioTag,
287    },
288    Video {
289        timestamp: u32,
290        tag: VideoTag,
291    },
292    /// `@setDataFrame("onMetaData", <amf0>)`. The AMF0 value is the
293    /// metadata object (usually width, height, codec ids, framerate,
294    /// bitrate, audiodatarate, ...).
295    Metadata(Amf0Value),
296}
297
298impl RtmpSession {
299    pub fn app(&self) -> &str {
300        &self.app
301    }
302    pub fn stream_name(&self) -> &str {
303        &self.stream_name
304    }
305    pub fn publish_type(&self) -> &str {
306        &self.publish_type
307    }
308    pub fn peer_addr(&self) -> SocketAddr {
309        self.peer_addr
310    }
311
312    /// Configure a read timeout on the underlying TCP socket — helpful
313    /// when you want `next_packet` to return periodically so an outer
314    /// shutdown signal can be observed. Passes through to
315    /// [`TcpStream::set_read_timeout`].
316    ///
317    /// The timeout is applied to the chunk reader's actual socket
318    /// clone (the one [`next_packet`](Self::next_packet) reads
319    /// through) rather than the session's bookkeeping clone. On
320    /// Linux a sockopt set through one `try_clone` descriptor carries
321    /// to its sibling clones because they share one file description;
322    /// Windows assigns each clone its own kernel handle with
323    /// independent socket options, so the timeout must be installed
324    /// on the exact socket that will issue the `recv` call.
325    pub fn set_read_timeout(&mut self, d: Option<Duration>) -> Result<()> {
326        self.reader.inner_mut().set_read_timeout(d)?;
327        // Also apply to the bookkeeping clone for any future direct
328        // reads through `self.stream` (none today, but defensive).
329        let _ = self.stream.set_read_timeout(d);
330        Ok(())
331    }
332
333    /// Emit a `UserControl StreamDry(stream_id)` event on the publish
334    /// stream (RTMP 1.0 §3.7, UCM type 2).
335    ///
336    /// Per spec: "the server sends this event to notify the client
337    /// that there is no more data on the stream. If the server does
338    /// not detect any message for a time period, it can notify the
339    /// subscribed clients that the stream is dry." Distinct from
340    /// [`close`](Self::close)'s `StreamEOF`: `StreamDry` is a
341    /// transient "we have nothing right now" signal that may resolve
342    /// when more data arrives, not a teardown.
343    pub fn send_stream_dry(&mut self) -> Result<()> {
344        self.writer.write_message(
345            CSID_PROTOCOL_CONTROL,
346            &build_user_control_stream_dry(self.stream_id),
347        )?;
348        self.writer.flush()?;
349        Ok(())
350    }
351
352    /// Emit a `UserControl StreamIsRecorded(stream_id)` event on the
353    /// publish stream (RTMP 1.0 §3.7, UCM type 4).
354    ///
355    /// Per spec: "the server sends this event to notify the client
356    /// that the stream is a recorded stream." A server fronting an
357    /// archival recorder may want to advertise this after the publish
358    /// handshake settles so a forwarding peer knows the captured
359    /// stream is replayable rather than ephemeral.
360    pub fn send_stream_is_recorded(&mut self) -> Result<()> {
361        self.writer.write_message(
362            CSID_PROTOCOL_CONTROL,
363            &build_user_control_stream_is_recorded(self.stream_id),
364        )?;
365        self.writer.flush()?;
366        Ok(())
367    }
368
369    /// Emit a `UserControl PingRequest(timestamp_ms)` event (RTMP 1.0
370    /// §3.7, UCM type 6).
371    ///
372    /// Per spec, "the server sends this event to test whether the
373    /// client is reachable. Event data is a 4-byte timestamp,
374    /// representing the local server time when the server dispatched
375    /// the command." The client (our [`RtmpClient`]) replies with the
376    /// matching `PingResponse` carrying the same 4 bytes —
377    /// `RtmpClient::poll_event` answers the ping internally without
378    /// surfacing the request to the publisher caller.
379    pub fn send_ping_request(&mut self, timestamp_ms: u32) -> Result<()> {
380        self.writer.write_message(
381            CSID_PROTOCOL_CONTROL,
382            &build_user_control_ping_request(timestamp_ms),
383        )?;
384        self.writer.flush()?;
385        Ok(())
386    }
387
388    /// Ask the publisher to reconnect — Enhanced RTMP v2 §"Reconnect
389    /// Request".
390    ///
391    /// Emits the `onStatus(NetConnection.Connect.ReconnectRequest)`
392    /// NetConnection command (message stream 0, transaction id 0, null
393    /// Command Object). Per the spec's message flow, a server does
394    /// this "prior to the shutdown of the live streaming server or
395    /// when the server intends to remap the client to another server
396    /// instance" — and when remapping, it MUST pass the target via
397    /// `tc_url` (absolute or relative URI reference; `None` tells the
398    /// client to re-dial the tcUrl of the current connection).
399    ///
400    /// After sending, the spec requires the old server to "continue
401    /// processing messages from the client until the client
402    /// disconnects" — so keep pumping
403    /// [`next_packet`](Self::next_packet) as usual; the publisher
404    /// drains up to its next appropriate media boundary (such as a
405    /// keyframe) before it actually moves.
406    ///
407    /// Note: per §"Enhancing NetConnection connect Command" the peer
408    /// advertises reconnect support via the `capsEx`
409    /// [`CAPS_EX_RECONNECT`](crate::caps::CAPS_EX_RECONNECT) bit —
410    /// check [`PublishRequest::capabilities`] before relying on the
411    /// client honouring this event.
412    pub fn send_reconnect_request(
413        &mut self,
414        tc_url: Option<&str>,
415        description: Option<&str>,
416    ) -> Result<()> {
417        self.writer
418            .write_message(CSID_COMMAND, &build_reconnect_request(tc_url, description))?;
419        self.writer.flush()?;
420        Ok(())
421    }
422
423    /// Close the session politely.
424    ///
425    /// On the wire we emit, in order:
426    ///
427    /// 1. A `UserControl StreamEOF(stream_id)` event so the peer's
428    ///    chunk-stream state machine learns the publish is done before
429    ///    it observes the TCP FIN (RTMP 1.0 §7.1.7).
430    /// 2. `onStatus(NetStream.Unpublish.Success)` on the publish stream.
431    /// 3. A chunk-writer `flush()` so every buffered chunk reaches the
432    ///    kernel before the half-close.
433    ///
434    /// Then we send a write-half FIN (`Shutdown::Write`) rather than
435    /// tearing both halves down at once. `Shutdown::Both` instantly
436    /// closes the read half too, which on some platforms makes the
437    /// kernel answer the peer's still-unacked data with a RST and
438    /// discard any A/V messages the peer hasn't yet drained from its
439    /// receive buffer — closeStream / the StreamEOF event / the last
440    /// frames just written can be thrown away mid-stream. A write-half
441    /// FIN lets the peer read everything we just wrote, then observe
442    /// EOF cleanly. The read half closes when `self` (and its owned
443    /// `TcpStream`) drops at end of scope.
444    pub fn close(mut self) -> Result<()> {
445        let _ = self.writer.write_message(
446            CSID_PROTOCOL_CONTROL,
447            &build_user_control_stream_eof(self.stream_id),
448        );
449        let _ = self.writer.write_message(
450            CSID_COMMAND,
451            &build_on_status(
452                self.stream_id,
453                "status",
454                "NetStream.Unpublish.Success",
455                "Stream closed.",
456            ),
457        );
458        let _ = self.writer.flush();
459        let _ = self.stream.shutdown(Shutdown::Write);
460        Ok(())
461    }
462
463    /// Read the next audio / video / metadata packet from the
464    /// publisher. Returns `Ok(None)` when the peer cleanly closed the
465    /// stream (via `closeStream` / `deleteStream` / `FCUnpublish`).
466    ///
467    /// Aggregate Messages (RTMP 1.0 §7.1.6, message type id `22`) are
468    /// decomposed transparently: the sub-messages enter an internal
469    /// queue and the dispatch loop drains them in publish order ahead
470    /// of any further wire read, so a publisher that bundles several
471    /// frames into one aggregate (fewer chunk headers on the wire)
472    /// surfaces the same per-frame `StreamPacket` sequence as a
473    /// publisher that sends them individually.
474    pub fn next_packet(&mut self) -> Result<Option<StreamPacket>> {
475        while !self.ended {
476            // Drain queued aggregate sub-messages ahead of any further
477            // wire read so the publisher's pack order is preserved.
478            if let Some(sub) = self.pending_subs.pop_front() {
479                if let Some(pkt) = self.handle_message(sub)? {
480                    return Ok(Some(pkt));
481                }
482                continue;
483            }
484            let msg = match self.reader.read_message() {
485                Ok(m) => m,
486                Err(Error::Io(e))
487                    if matches!(
488                        e.kind(),
489                        std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionReset
490                    ) =>
491                {
492                    return Ok(None);
493                }
494                Err(e) => return Err(e),
495            };
496            // §5.3: once the publisher has sent a full window of bytes,
497            // owe it an Acknowledgement carrying the running sequence
498            // number. Send before dispatching so the ack reflects the
499            // bytes through this message.
500            self.maybe_send_ack()?;
501            if let Some(pkt) = self.handle_message(msg)? {
502                return Ok(Some(pkt));
503            }
504        }
505        Ok(None)
506    }
507
508    /// Emit a §5.3 Acknowledgement if the reader's received-byte count
509    /// has crossed the peer-negotiated §5.5 window since the last one.
510    /// No-op until a window has been negotiated (`Window Acknowledgement
511    /// Size` / `Set Peer Bandwidth` from the publisher).
512    fn maybe_send_ack(&mut self) -> Result<()> {
513        if let Some(seq) = self.reader.ack_due() {
514            self.writer
515                .write_message(CSID_PROTOCOL_CONTROL, &build_ack(seq))?;
516            self.writer.flush()?;
517        }
518        Ok(())
519    }
520
521    /// Per-message dispatch shared between the wire path and the
522    /// aggregate-sub-drain path. Returns `Ok(Some(packet))` if the
523    /// message produced a user-visible event, `Ok(None)` if it was
524    /// consumed silently (protocol control, command teardown setting
525    /// `self.ended`, etc.) and the loop should keep reading.
526    fn handle_message(&mut self, msg: Message) -> Result<Option<StreamPacket>> {
527        match msg.msg_type_id {
528            MSG_AUDIO => {
529                let tag = parse_audio(&msg.payload)?;
530                Ok(Some(StreamPacket::Audio {
531                    timestamp: msg.timestamp,
532                    tag,
533                }))
534            }
535            MSG_VIDEO => {
536                let tag = parse_video(&msg.payload)?;
537                Ok(Some(StreamPacket::Video {
538                    timestamp: msg.timestamp,
539                    tag,
540                }))
541            }
542            MSG_DATA_AMF0 => {
543                // @setDataFrame + onMetaData + <object>
544                let values = amf::decode_all(&msg.payload)?;
545                // Common shape: ["@setDataFrame", "onMetaData",
546                // <meta>]. Some clients omit "@setDataFrame" and
547                // just send ["onMetaData", <meta>]. Accept both.
548                Ok(metadata_object(&values).map(StreamPacket::Metadata))
549            }
550            MSG_DATA_AMF3 => {
551                // AMF3-encoded data message (type 15). Per AMF3 §4.1
552                // the body is an AMF0 frame switching to AMF3 via the
553                // avmplus marker; decode it and bridge each value onto
554                // the AMF0 shape so metadata flows through the same
555                // path as MSG_DATA_AMF0.
556                let values: Vec<Amf0Value> = amf3::decode_data_message(&msg.payload)?
557                    .iter()
558                    .map(amf3::Amf3Value::to_amf0)
559                    .collect();
560                Ok(metadata_object(&values).map(StreamPacket::Metadata))
561            }
562            MSG_COMMAND_AMF0 => {
563                // Likely closeStream / deleteStream /
564                // FCUnpublish — peer is shutting down.
565                let values = amf::decode_all(&msg.payload)?;
566                if let Some(name) = values.first().and_then(Amf0Value::as_str) {
567                    if matches!(name, "closeStream" | "deleteStream" | "FCUnpublish") {
568                        self.ended = true;
569                    }
570                }
571                Ok(None)
572            }
573            MSG_COMMAND_AMF3 => {
574                // AMF3-encoded command (type 17). Same teardown
575                // detection as the AMF0 command path.
576                let values: Vec<Amf0Value> = amf3::decode_data_message(&msg.payload)?
577                    .iter()
578                    .map(amf3::Amf3Value::to_amf0)
579                    .collect();
580                if let Some(name) = values.first().and_then(Amf0Value::as_str) {
581                    if matches!(name, "closeStream" | "deleteStream" | "FCUnpublish") {
582                        self.ended = true;
583                    }
584                }
585                Ok(None)
586            }
587            MSG_AGGREGATE => {
588                // RTMP 1.0 §7.1.6 Aggregate Message. Split into
589                // FLV-shaped sub-messages with the §7.1.6 timestamp
590                // re-normalisation applied and the message-stream-id
591                // override resolved; queue them so subsequent calls
592                // surface the per-sub packets in publish order. Sub
593                // ordering is preserved verbatim. A nested aggregate
594                // (sub `msg_type_id == 22`) is forwarded to the queue
595                // and the next dispatch tick recurses through the same
596                // `MSG_AGGREGATE` arm so a bounded depth of nesting
597                // resolves transparently; an unbounded chain would
598                // surface as repeated parser work, not stack growth.
599                let subs = parse_aggregate(&msg)?;
600                self.pending_subs.extend(subs);
601                Ok(None)
602            }
603            MSG_SET_CHUNK_SIZE => {
604                let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
605                self.reader.set_chunk_size(size as usize);
606                Ok(None)
607            }
608            MSG_WINDOW_ACK_SIZE => {
609                // §5.5: the peer is telling us which window size to use
610                // when sending Acknowledgements. Honour it so our §5.3
611                // ack cadence matches what the publisher expects.
612                let size = read_u32_be(&msg.payload)?;
613                self.reader.set_window_ack_size(size);
614                Ok(None)
615            }
616            MSG_SET_PEER_BANDWIDTH => {
617                // §5.6: "The output bandwidth value is the same as the
618                // window size for the peer." The first 4 bytes carry
619                // that window size; adopt it as our send-side ack
620                // window too. (The trailing Limit type byte is
621                // advisory and doesn't change our framing.)
622                if msg.payload.len() >= 4 {
623                    let size = read_u32_be(&msg.payload[..4])?;
624                    self.reader.set_window_ack_size(size);
625                }
626                Ok(None)
627            }
628            MSG_ACK | MSG_USER_CONTROL => {
629                // Informational — the peer's §5.3 sequence number (ACK)
630                // or a user-control event we don't surface as a packet.
631                Ok(None)
632            }
633            _ => {
634                // Unknown / unhandled — swallow and keep going.
635                Ok(None)
636            }
637        }
638    }
639}
640
641// ---------------------------------------------------------------------------
642// Protocol driver: handshake → connect → createStream → publish
643// ---------------------------------------------------------------------------
644
645fn drive_until_publish(
646    stream: TcpStream,
647    peer_addr: SocketAddr,
648    server_caps: &ConnectCapabilities,
649) -> Result<PublishRequest> {
650    // TCP-level defaults: nodelay (RTMP is command-heavy during setup),
651    // keepalive so idle publishers are detected.
652    let _ = stream.set_nodelay(true);
653
654    // Run the handshake on a plain clone of the stream (no chunk state
655    // yet).
656    let mut hs_stream = stream.try_clone()?;
657    crate::handshake::server_handshake(&mut hs_stream)?;
658
659    // Reader / writer share the same TCP stream via `try_clone`.
660    let reader_stream = stream.try_clone()?;
661    let writer_stream = stream.try_clone()?;
662    let mut reader = ChunkReader::new(reader_stream);
663    let mut writer = ChunkWriter::new(writer_stream);
664
665    // Wait for connect. These get populated when we see the
666    // `connect` command below.
667    let tc_url;
668    let app;
669    let client_capabilities;
670    loop {
671        let msg = reader.read_message()?;
672        match msg.msg_type_id {
673            MSG_SET_CHUNK_SIZE => {
674                let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
675                reader.set_chunk_size(size as usize);
676            }
677            MSG_WINDOW_ACK_SIZE => {
678                let size = read_u32_be(&msg.payload)?;
679                reader.set_window_ack_size(size);
680            }
681            MSG_SET_PEER_BANDWIDTH if msg.payload.len() >= 4 => {
682                let size = read_u32_be(&msg.payload[..4])?;
683                reader.set_window_ack_size(size);
684            }
685            MSG_COMMAND_AMF0 => {
686                let values = amf::decode_all(&msg.payload)?;
687                let name = values
688                    .first()
689                    .and_then(Amf0Value::as_str)
690                    .ok_or_else(|| Error::InvalidCommand("missing command name".into()))?;
691                if name != "connect" {
692                    return Err(Error::InvalidCommand(format!(
693                        "expected `connect` first, got `{name}`"
694                    )));
695                }
696                let tx_id = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(1.0);
697                let cmd_obj = values.get(2).ok_or_else(|| {
698                    Error::InvalidCommand("`connect` missing command object".into())
699                })?;
700                tc_url = cmd_obj
701                    .get("tcUrl")
702                    .and_then(Amf0Value::as_str)
703                    .unwrap_or("")
704                    .to_owned();
705                app = cmd_obj
706                    .get("app")
707                    .and_then(Amf0Value::as_str)
708                    .unwrap_or("")
709                    .to_owned();
710                // Lift Enhanced RTMP v1+v2 capability advertisement out
711                // of the Command Object. Legacy publishers leave this
712                // empty.
713                client_capabilities = ConnectCapabilities::from_amf0(cmd_obj);
714
715                // Reply: WindowAckSize + SetPeerBandwidth + StreamBegin
716                // + _result + SetChunkSize. Order matches what most
717                // commodity ingest servers send. The server's own
718                // capability advertisement rides inside the _result
719                // info object — see `build_connect_result_with_caps`.
720                writer.write_message(
721                    CSID_PROTOCOL_CONTROL,
722                    &build_window_ack_size(WINDOW_ACK_SIZE),
723                )?;
724                writer.write_message(
725                    CSID_PROTOCOL_CONTROL,
726                    &build_set_peer_bandwidth(WINDOW_ACK_SIZE, PEER_BW_LIMIT_DYNAMIC),
727                )?;
728                writer.write_message(CSID_PROTOCOL_CONTROL, &build_user_control_stream_begin(0))?;
729                writer.write_message(
730                    CSID_COMMAND,
731                    &build_connect_result_with_caps(tx_id, server_caps),
732                )?;
733                writer.write_message(
734                    CSID_PROTOCOL_CONTROL,
735                    &build_set_chunk_size(SERVER_CHUNK_SIZE),
736                )?;
737                writer.set_chunk_size(SERVER_CHUNK_SIZE as usize);
738                writer.flush()?;
739                break;
740            }
741            _ => {
742                // Silently accept other pre-connect messages (usually
743                // nothing but SetChunkSize).
744            }
745        }
746    }
747
748    // Handle releaseStream / FCPublish / createStream / publish until
749    // we see publish.
750    let mut next_stream_id: u32 = 1;
751    loop {
752        let msg = reader.read_message()?;
753        match msg.msg_type_id {
754            MSG_SET_CHUNK_SIZE => {
755                let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
756                reader.set_chunk_size(size as usize);
757                continue;
758            }
759            MSG_WINDOW_ACK_SIZE => {
760                let size = read_u32_be(&msg.payload)?;
761                reader.set_window_ack_size(size);
762                continue;
763            }
764            MSG_SET_PEER_BANDWIDTH if msg.payload.len() >= 4 => {
765                let size = read_u32_be(&msg.payload[..4])?;
766                reader.set_window_ack_size(size);
767                continue;
768            }
769            MSG_COMMAND_AMF0 => {
770                let values = amf::decode_all(&msg.payload)?;
771                let name = values
772                    .first()
773                    .and_then(Amf0Value::as_str)
774                    .ok_or_else(|| Error::InvalidCommand("missing command name".into()))?
775                    .to_owned();
776                let tx_id = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
777                match name.as_str() {
778                    "releaseStream" | "FCPublish" => {
779                        // Many peers want a _result back; send a minimal
780                        // one. Arg slot [3] is the stream name we can
781                        // echo.
782                        let payload = amf::encode_command(
783                            "_result",
784                            tx_id,
785                            Amf0Value::Null,
786                            &[Amf0Value::Undefined],
787                        );
788                        let reply = Message {
789                            msg_type_id: MSG_COMMAND_AMF0,
790                            msg_stream_id: 0,
791                            timestamp: 0,
792                            payload,
793                        };
794                        writer.write_message(CSID_COMMAND, &reply)?;
795                        writer.flush()?;
796                    }
797                    "createStream" => {
798                        let sid = next_stream_id;
799                        next_stream_id += 1;
800                        writer.write_message(
801                            CSID_COMMAND,
802                            &build_create_stream_result(tx_id, sid as f64),
803                        )?;
804                        writer.flush()?;
805                    }
806                    "publish" => {
807                        // Args: [stream_name, publish_type].
808                        let stream_name = values
809                            .get(3)
810                            .and_then(Amf0Value::as_str)
811                            .ok_or_else(|| {
812                                Error::InvalidCommand("publish missing stream_name".into())
813                            })?
814                            .to_owned();
815                        let publish_type = values
816                            .get(4)
817                            .and_then(Amf0Value::as_str)
818                            .unwrap_or("live")
819                            .to_owned();
820                        return Ok(PublishRequest {
821                            app,
822                            stream_name,
823                            publish_type,
824                            peer_addr,
825                            tc_url,
826                            capabilities: client_capabilities,
827                            pending: PendingSession {
828                                stream,
829                                reader,
830                                writer,
831                                stream_id: msg.msg_stream_id.max(1),
832                                publish_tx_id: tx_id,
833                            },
834                        });
835                    }
836                    _ => {
837                        // Unknown command — keep listening.
838                    }
839                }
840            }
841            _ => {
842                // Ignore audio / video / data / control messages
843                // arriving before publish — not strictly legal but
844                // seen in the wild.
845            }
846        }
847    }
848}
849
850/// Pull the metadata object out of a decoded data-message value list.
851///
852/// `@setDataFrame("onMetaData", <meta>)` is the standard publish shape;
853/// some clients omit the leading `@setDataFrame` and send just
854/// `["onMetaData", <meta>]`. Either way the payload object is the last
855/// Object / ECMA-array value in the list, so search from the back.
856fn metadata_object(values: &[Amf0Value]) -> Option<Amf0Value> {
857    values
858        .iter()
859        .rev()
860        .find(|v| matches!(v, Amf0Value::Object(_) | Amf0Value::EcmaArray(_)))
861        .cloned()
862}
863
864fn read_u32_be(buf: &[u8]) -> Result<u32> {
865    if buf.len() < 4 {
866        return Err(Error::ProtocolViolation("need 4 bytes for u32be".into()));
867    }
868    Ok(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
869}
870
871/// Free TCP-level helper for `stream`-owner code to read pending
872/// writes synchronously.
873#[allow(dead_code)]
874fn flush_writer<W: Write>(w: &mut W) -> Result<()> {
875    w.flush()?;
876    Ok(())
877}
878
879#[allow(dead_code)]
880fn read_exact<R: Read>(r: &mut R, n: usize) -> Result<Vec<u8>> {
881    let mut buf = vec![0u8; n];
882    r.read_exact(&mut buf)?;
883    Ok(buf)
884}