Skip to main content

arcly_stream/protocol/rtmp/
relay.rs

1//! RTMP **egress relay** — push a live stream to an upstream RTMP server.
2//!
3//! The client counterpart to the ingest handler: [`RtmpRelay`] dials an
4//! `rtmp://host[:port]/app/stream` URL, performs the client handshake, runs the
5//! `connect` → `createStream` → `publish` command exchange, then subscribes to a
6//! local stream and forwards every H.264/AAC frame as FLV tags over RTMP. Used
7//! to republish to a CDN ingest or another media server.
8//!
9//! ```no_run
10//! # use std::sync::Arc;
11//! # use arcly_stream::{Engine, StreamKey};
12//! # use arcly_stream::protocol::rtmp::RtmpRelay;
13//! # async fn run(engine: Arc<Engine>) -> arcly_stream::Result<()> {
14//! let relay = RtmpRelay::new(engine.clone(), "rtmp://cdn.example.com/live/backup")?;
15//! relay
16//!     .run(StreamKey::new("live", "cam"), tokio_util::sync::CancellationToken::new())
17//!     .await
18//! # }
19//! ```
20
21use std::ops::ControlFlow;
22use std::sync::Arc;
23use std::time::Duration;
24
25use tokio::io::AsyncWrite;
26use tokio::net::TcpStream;
27use tokio_util::sync::CancellationToken;
28use tracing::{debug, info, warn};
29
30use super::amf::{self, Amf0Value};
31use super::chunk::{ChunkReader, ChunkWriter};
32use super::{
33    annexb_to_avc_config, flv, CSID_AUDIO, CSID_COMMAND, CSID_CONTROL, CSID_VIDEO, MSG_AUDIO,
34    MSG_COMMAND_AMF0, MSG_SET_CHUNK_SIZE, MSG_VIDEO, OUT_CHUNK_SIZE,
35};
36use crate::bus::PlaybackRegistry;
37use crate::{MediaFrame, Result, StreamError, StreamKey};
38
39/// How long the setup phase (connect + command exchange) may take before the
40/// relay gives up — keeps a dead/blackhole upstream from wedging the task.
41const SETUP_TIMEOUT: Duration = Duration::from_secs(10);
42
43/// Pushes a local stream to an upstream RTMP server.
44pub struct RtmpRelay {
45    playback: Arc<dyn PlaybackRegistry>,
46    host: String,
47    port: u16,
48    app: String,
49    stream: String,
50}
51
52impl RtmpRelay {
53    /// Build a relay that reads from `playback` and pushes to `url`
54    /// (`rtmp://host[:port]/app/stream`).
55    pub fn new(playback: Arc<dyn PlaybackRegistry>, url: &str) -> Result<Self> {
56        let (authority, path) = crate::protocol::url::split_scheme(url, "rtmp://")
57            .ok_or_else(|| StreamError::protocol("relay url must be rtmp://host/app/stream"))?;
58        let (host, port) = crate::protocol::url::parse_authority(authority, 1935)?;
59        let (app, stream) = path
60            .split_once('/')
61            .ok_or_else(|| StreamError::protocol("relay url needs both app and stream"))?;
62        if app.is_empty() || stream.is_empty() {
63            return Err(StreamError::protocol("incomplete rtmp relay url"));
64        }
65        Ok(Self {
66            playback,
67            host,
68            port,
69            app: app.to_string(),
70            stream: stream.to_string(),
71        })
72    }
73
74    /// Connect, publish, and forward `key`'s frames until the stream ends or
75    /// `shutdown` fires.
76    pub async fn run(self, key: StreamKey, shutdown: CancellationToken) -> Result<()> {
77        let addr = format!("{}:{}", self.host, self.port);
78        let mut tcp = TcpStream::connect(&addr).await?;
79        super::handshake::initiate(&mut tcp).await?;
80        let (rd, wr) = tcp.into_split();
81        let mut reader = ChunkReader::new(rd);
82        let mut writer = ChunkWriter::new(wr);
83
84        // Setup is bounded so a silent upstream can't hang the relay forever.
85        let stream_id = tokio::time::timeout(SETUP_TIMEOUT, async {
86            // Announce our outbound chunk size.
87            writer
88                .write_message(
89                    CSID_CONTROL,
90                    MSG_SET_CHUNK_SIZE,
91                    0,
92                    0,
93                    &(OUT_CHUNK_SIZE as u32).to_be_bytes(),
94                )
95                .await?;
96            writer.set_chunk_size(OUT_CHUNK_SIZE);
97
98            let tc_url = format!("rtmp://{addr}/{}", self.app);
99            send_command(
100                &mut writer,
101                0,
102                &[
103                    amf::string("connect"),
104                    Amf0Value::Number(1.0),
105                    amf::object(vec![
106                        ("app", amf::string(self.app.as_str())),
107                        ("type", amf::string("nonprivate")),
108                        ("flashVer", amf::string("FMLE/3.0 (compatible; arcly)")),
109                        ("tcUrl", amf::string(tc_url.as_str())),
110                    ]),
111                ],
112            )
113            .await?;
114            read_until_command(&mut reader, "_result").await?;
115
116            send_command(
117                &mut writer,
118                0,
119                &[
120                    amf::string("createStream"),
121                    Amf0Value::Number(2.0),
122                    Amf0Value::Null,
123                ],
124            )
125            .await?;
126            let values = read_until_command(&mut reader, "_result").await?;
127            // createStream's result carries the assigned stream id as its 4th value.
128            let stream_id = values
129                .get(3)
130                .and_then(Amf0Value::as_number)
131                .map(|n| n as u32)
132                .unwrap_or(super::STREAM_ID);
133
134            send_command(
135                &mut writer,
136                stream_id,
137                &[
138                    amf::string("publish"),
139                    Amf0Value::Number(3.0),
140                    Amf0Value::Null,
141                    amf::string(self.stream.as_str()),
142                    amf::string("live"),
143                ],
144            )
145            .await?;
146            Ok::<u32, StreamError>(stream_id)
147        })
148        .await
149        .map_err(|_| StreamError::protocol("rtmp relay setup timed out"))??;
150
151        info!(%addr, app = %self.app, stream = %self.stream, stream_id, "rtmp relay publishing");
152
153        // Forward media: instant-start replay (configs + GOP), then live frames.
154        let handle = self.playback.get_stream(&key)?;
155        let mut sink = RtmpRelaySink {
156            writer: &mut writer,
157            stream_id,
158            audio_seq_sent: false,
159        };
160        handle.drive_to(&shutdown, &mut sink).await?;
161        debug!(%addr, "rtmp relay finished");
162        Ok(())
163    }
164}
165
166/// Forwards a stream's frames to one upstream RTMP server as FLV tags.
167struct RtmpRelaySink<'a, W: AsyncWrite + Unpin> {
168    writer: &'a mut ChunkWriter<W>,
169    stream_id: u32,
170    audio_seq_sent: bool,
171}
172
173#[async_trait::async_trait]
174impl<W: AsyncWrite + Unpin + Send> crate::bus::FrameSink for RtmpRelaySink<'_, W> {
175    async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<std::ops::ControlFlow<()>> {
176        write_media(
177            self.writer,
178            &frame,
179            self.stream_id,
180            &mut self.audio_seq_sent,
181        )
182        .await?;
183        Ok(ControlFlow::Continue(()))
184    }
185}
186
187/// Encode and send an AMF0 command message on the command chunk stream.
188async fn send_command<W: AsyncWrite + Unpin>(
189    writer: &mut ChunkWriter<W>,
190    msg_stream_id: u32,
191    values: &[Amf0Value],
192) -> Result<()> {
193    let mut buf = bytes::BytesMut::new();
194    for v in values {
195        v.encode(&mut buf);
196    }
197    writer
198        .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
199        .await
200}
201
202/// Read messages until an AMF0 command whose name matches `want` arrives,
203/// honoring the server's Set Chunk Size and ignoring protocol-control chatter.
204/// Returns the matched command's decoded values.
205async fn read_until_command<R: tokio::io::AsyncRead + Unpin>(
206    reader: &mut ChunkReader<R>,
207    want: &str,
208) -> Result<Vec<Amf0Value>> {
209    for _ in 0..32 {
210        let msg = reader.read_message().await?;
211        match msg.type_id {
212            MSG_SET_CHUNK_SIZE => {
213                if let Some(b) = msg.payload.get(..4) {
214                    let size = u32::from_be_bytes([b[0], b[1], b[2], b[3]]) as usize;
215                    reader.set_chunk_size(size.max(1));
216                }
217            }
218            MSG_COMMAND_AMF0 => {
219                let values = amf::decode_all(&msg.payload);
220                match values.first().and_then(Amf0Value::as_str) {
221                    Some(name) if name == want => return Ok(values),
222                    Some("_error") => {
223                        return Err(StreamError::protocol("rtmp relay rejected by upstream"))
224                    }
225                    _ => {}
226                }
227            }
228            _ => {} // window-ack-size, set-peer-bandwidth, user-control: ignore
229        }
230    }
231    warn!(want, "rtmp relay: expected command not seen");
232    Err(StreamError::protocol(
233        "rtmp relay: upstream handshake stalled",
234    ))
235}
236
237/// Frame → FLV tag → RTMP audio/video message (the egress framing the play path
238/// uses, applied to the relay's outbound connection).
239async fn write_media<W: AsyncWrite + Unpin>(
240    writer: &mut ChunkWriter<W>,
241    frame: &MediaFrame,
242    stream_id: u32,
243    audio_seq_sent: &mut bool,
244) -> Result<()> {
245    let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
246    match frame.codec {
247        crate::CodecId::H264 => {
248            let ts = frame.dts.max(0) as u32;
249            let tag = if is_config {
250                let cfg = annexb_to_avc_config(&frame.data);
251                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
252            } else {
253                let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
254                let cts = (frame.pts - frame.dts) as i32;
255                flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
256            };
257            writer
258                .write_message(CSID_VIDEO, MSG_VIDEO, ts, stream_id, &tag)
259                .await
260        }
261        crate::CodecId::AAC => {
262            let ts = frame.pts.max(0) as u32;
263            if is_config {
264                *audio_seq_sent = true;
265                let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
266                return writer
267                    .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
268                    .await;
269            }
270            if !*audio_seq_sent {
271                if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
272                    let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
273                    writer
274                        .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &seq)
275                        .await?;
276                    *audio_seq_sent = true;
277                }
278            }
279            let raw = frame.data.get(7..).unwrap_or(&[]);
280            let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
281            writer
282                .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
283                .await
284        }
285        _ => Ok(()), // only H.264 + AAC are carried over RTMP
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292    use crate::Engine;
293
294    fn relay(url: &str) -> Result<RtmpRelay> {
295        let engine = Engine::builder()
296            .application(crate::AppSpec::new("live"))
297            .build();
298        RtmpRelay::new(engine, url)
299    }
300
301    #[test]
302    fn parses_rtmp_url_with_and_without_port() {
303        let r = relay("rtmp://cdn.example.com/live/backup").unwrap();
304        assert_eq!(
305            (r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str()),
306            ("cdn.example.com", 1935, "live", "backup")
307        );
308        let r = relay("rtmp://10.0.0.5:1936/app/key123").unwrap();
309        assert_eq!(
310            (r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str()),
311            ("10.0.0.5", 1936, "app", "key123")
312        );
313    }
314
315    #[test]
316    fn rejects_malformed_urls() {
317        assert!(relay("http://x/live/s").is_err());
318        assert!(relay("rtmp://hostonly").is_err());
319        assert!(relay("rtmp://host/liveonly").is_err());
320    }
321}