arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, an instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, and WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! RTMP **egress relay** — push a live stream to an upstream RTMP server.
//!
//! The client counterpart to the ingest handler: [`RtmpRelay`] dials an
//! `rtmp://host[:port]/app/stream` URL, performs the client handshake, runs the
//! `connect` → `createStream` → `publish` command exchange, then subscribes to a
//! local stream and forwards every H.264/AAC frame as FLV tags over RTMP. Used
//! to republish to a CDN ingest or another media server.
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use arcly_stream::{Engine, StreamKey};
//! # use arcly_stream::protocol::rtmp::RtmpRelay;
//! # async fn run(engine: Arc<Engine>) -> arcly_stream::Result<()> {
//! let relay = RtmpRelay::new(engine.clone(), "rtmp://cdn.example.com/live/backup")?;
//! relay
//!     .run(StreamKey::new("live", "cam"), tokio_util::sync::CancellationToken::new())
//!     .await
//! # }
//! ```

use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;

use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use super::amf::{self, Amf0Value};
use super::chunk::{ChunkReader, ChunkWriter};
use super::{
    annexb_to_avc_config, flv, CSID_AUDIO, CSID_COMMAND, CSID_CONTROL, CSID_VIDEO, MSG_AUDIO,
    MSG_COMMAND_AMF0, MSG_SET_CHUNK_SIZE, MSG_VIDEO, OUT_CHUNK_SIZE,
};
use crate::bus::PlaybackRegistry;
use crate::{MediaFrame, Result, StreamError, StreamKey};

/// Time budget for the relay's setup phase. [`RtmpRelay::run`] wraps that phase
/// in `tokio::time::timeout` with this duration and fails with a protocol
/// error ("rtmp relay setup timed out") when it elapses — keeps a
/// dead/blackhole upstream from wedging the task.
const SETUP_TIMEOUT: Duration = Duration::from_secs(10);

/// Pushes a local stream to an upstream RTMP server.
///
/// Built by [`RtmpRelay::new`] from an `rtmp://host[:port]/app/stream` URL and
/// consumed by [`RtmpRelay::run`], which performs the whole publish session.
pub struct RtmpRelay {
    /// Registry the local stream is looked up in (`get_stream`) when forwarding starts.
    playback: Arc<dyn PlaybackRegistry>,
    /// Upstream host, taken from the URL authority.
    host: String,
    /// Upstream port; `new` passes 1935 as the default when parsing the authority.
    port: u16,
    /// Application name: the URL path up to its first `/`. Sent as `app` in `connect`.
    app: String,
    /// Stream name: the URL path after its first `/`. Sent as the `publish` target.
    stream: String,
}

impl RtmpRelay {
    /// Build a relay that reads from `playback` and pushes to `url`
    /// (`rtmp://host[:port]/app/stream`).
    ///
    /// The authority is parsed with 1935 as the default port. The path is split
    /// at its first `/`: the part before it becomes the application, the part
    /// after it becomes the stream name.
    ///
    /// # Errors
    ///
    /// Returns a protocol error when the URL does not use the `rtmp://` scheme,
    /// when the authority cannot be parsed, or when either the application or
    /// the stream name is missing or empty.
    pub fn new(playback: Arc<dyn PlaybackRegistry>, url: &str) -> Result<Self> {
        let (authority, path) = crate::protocol::url::split_scheme(url, "rtmp://")
            .ok_or_else(|| StreamError::protocol("relay url must be rtmp://host/app/stream"))?;
        let (host, port) = crate::protocol::url::parse_authority(authority, 1935)?;
        let (app, stream) = path
            .split_once('/')
            .ok_or_else(|| StreamError::protocol("relay url needs both app and stream"))?;
        if app.is_empty() || stream.is_empty() {
            return Err(StreamError::protocol("incomplete rtmp relay url"));
        }
        Ok(Self {
            playback,
            host,
            port,
            app: app.to_string(),
            stream: stream.to_string(),
        })
    }

    /// Connect, publish, and forward `key`'s frames until the stream ends or
    /// `shutdown` fires.
    ///
    /// The whole setup phase — TCP connect, RTMP handshake, and the
    /// `connect` → `createStream` → `publish` command exchange — shares a single
    /// [`SETUP_TIMEOUT`] budget, so an upstream that never accepts the
    /// connection or never answers the handshake cannot wedge the task.
    ///
    /// # Errors
    ///
    /// Fails when the upstream cannot be reached, the handshake or a command
    /// write fails, the upstream answers with `_error`, setup exceeds
    /// [`SETUP_TIMEOUT`], the local stream cannot be resolved, or forwarding a
    /// frame fails.
    pub async fn run(self, key: StreamKey, shutdown: CancellationToken) -> Result<()> {
        let addr = format!("{}:{}", self.host, self.port);

        // Setup is bounded so a silent upstream can't hang the relay forever.
        // Connect and handshake live inside the timeout too: a blackholed
        // address otherwise blocks for the OS connect timeout, and a peer that
        // accepts but never replies would block the handshake indefinitely.
        let (stream_id, _reader, mut writer) = tokio::time::timeout(SETUP_TIMEOUT, async {
            let mut tcp = TcpStream::connect(&addr).await?;
            super::handshake::initiate(&mut tcp).await?;
            let (rd, wr) = tcp.into_split();
            let mut reader = ChunkReader::new(rd);
            let mut writer = ChunkWriter::new(wr);

            // Announce our outbound chunk size.
            writer
                .write_message(
                    CSID_CONTROL,
                    MSG_SET_CHUNK_SIZE,
                    0,
                    0,
                    &(OUT_CHUNK_SIZE as u32).to_be_bytes(),
                )
                .await?;
            writer.set_chunk_size(OUT_CHUNK_SIZE);

            let tc_url = format!("rtmp://{addr}/{}", self.app);
            send_command(
                &mut writer,
                0,
                &[
                    amf::string("connect"),
                    Amf0Value::Number(1.0),
                    amf::object(vec![
                        ("app", amf::string(self.app.as_str())),
                        ("type", amf::string("nonprivate")),
                        ("flashVer", amf::string("FMLE/3.0 (compatible; arcly)")),
                        ("tcUrl", amf::string(tc_url.as_str())),
                    ]),
                ],
            )
            .await?;
            read_until_command(&mut reader, "_result").await?;

            send_command(
                &mut writer,
                0,
                &[
                    amf::string("createStream"),
                    Amf0Value::Number(2.0),
                    Amf0Value::Null,
                ],
            )
            .await?;
            let values = read_until_command(&mut reader, "_result").await?;
            // createStream's result carries the assigned stream id as its 4th value.
            let stream_id = values
                .get(3)
                .and_then(Amf0Value::as_number)
                .map(|n| n as u32)
                .unwrap_or(super::STREAM_ID);

            send_command(
                &mut writer,
                stream_id,
                &[
                    amf::string("publish"),
                    Amf0Value::Number(3.0),
                    Amf0Value::Null,
                    amf::string(self.stream.as_str()),
                    amf::string("live"),
                ],
            )
            .await?;
            // Hand both halves back out. The reader is bound to a named
            // `_reader` (not `_`) so it lives until `run` returns.
            Ok::<_, StreamError>((stream_id, reader, writer))
        })
        .await
        .map_err(|_| StreamError::protocol("rtmp relay setup timed out"))??;

        info!(%addr, app = %self.app, stream = %self.stream, stream_id, "rtmp relay publishing");

        // Forward media: instant-start replay (configs + GOP), then live frames.
        let handle = self.playback.get_stream(&key)?;
        let mut sink = RtmpRelaySink {
            writer: &mut writer,
            stream_id,
            audio_seq_sent: false,
        };
        handle.drive_to(&shutdown, &mut sink).await?;
        debug!(%addr, "rtmp relay finished");
        Ok(())
    }
}

/// Forwards a stream's frames to one upstream RTMP server as FLV tags.
///
/// Borrows the relay's chunk writer for the duration of the forwarding phase;
/// each frame handed to `send` is passed on to `write_media`.
struct RtmpRelaySink<'a, W: AsyncWrite + Unpin> {
    /// Chunk writer of the outbound (upstream) connection.
    writer: &'a mut ChunkWriter<W>,
    /// Message stream id every audio/video message is written with.
    stream_id: u32,
    /// Whether an AAC sequence header has already gone out; updated by `write_media`.
    audio_seq_sent: bool,
}

#[async_trait::async_trait]
impl<W: AsyncWrite + Unpin + Send> crate::bus::FrameSink for RtmpRelaySink<'_, W> {
    /// Write one frame to the upstream connection.
    ///
    /// Always asks the driver to continue on success; a failed write is
    /// propagated as the error.
    async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<ControlFlow<()>> {
        let Self {
            writer,
            stream_id,
            audio_seq_sent,
        } = self;
        write_media(&mut **writer, &frame, *stream_id, audio_seq_sent)
            .await
            .map(|()| ControlFlow::Continue(()))
    }
}

/// Serialize `values` back-to-back as AMF0 and send them as a single command
/// message on the command chunk stream.
///
/// `msg_stream_id` is the message stream id written into the message header;
/// the message timestamp is always 0.
async fn send_command<W: AsyncWrite + Unpin>(
    writer: &mut ChunkWriter<W>,
    msg_stream_id: u32,
    values: &[Amf0Value],
) -> Result<()> {
    let mut payload = bytes::BytesMut::new();
    values.iter().for_each(|value| {
        value.encode(&mut payload);
    });
    let sent = writer
        .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &payload)
        .await;
    sent
}

/// Read messages until an AMF0 command whose name matches `want` arrives,
/// honoring the server's Set Chunk Size and ignoring protocol-control chatter.
/// Returns the matched command's decoded values.
async fn read_until_command<R: tokio::io::AsyncRead + Unpin>(
    reader: &mut ChunkReader<R>,
    want: &str,
) -> Result<Vec<Amf0Value>> {
    for _ in 0..32 {
        let msg = reader.read_message().await?;
        match msg.type_id {
            MSG_SET_CHUNK_SIZE => {
                if let Some(b) = msg.payload.get(..4) {
                    let size = u32::from_be_bytes([b[0], b[1], b[2], b[3]]) as usize;
                    reader.set_chunk_size(size.max(1));
                }
            }
            MSG_COMMAND_AMF0 => {
                let values = amf::decode_all(&msg.payload);
                match values.first().and_then(Amf0Value::as_str) {
                    Some(name) if name == want => return Ok(values),
                    Some("_error") => {
                        return Err(StreamError::protocol("rtmp relay rejected by upstream"))
                    }
                    _ => {}
                }
            }
            _ => {} // window-ack-size, set-peer-bandwidth, user-control: ignore
        }
    }
    warn!(want, "rtmp relay: expected command not seen");
    Err(StreamError::protocol(
        "rtmp relay: upstream handshake stalled",
    ))
}

/// Frame → FLV tag → RTMP audio/video message (the egress framing the play path
/// uses, applied to the relay's outbound connection).
///
/// * H.264: a `CONFIG` frame becomes an AVC sequence header built from the
///   Annex-B parameter sets; any other frame is converted to AVCC and sent with
///   its composition offset (`pts - dts`). The message timestamp is the DTS.
/// * AAC: a `CONFIG` frame is sent verbatim as the audio sequence header. If no
///   sequence header has gone out yet, one is derived from the first frame's
///   ADTS header. Each frame is then sent with its ADTS header stripped; the
///   message timestamp is the PTS.
/// * Any other codec is silently skipped.
///
/// `audio_seq_sent` is set once an AAC sequence header has been written, so a
/// header is derived from ADTS at most once per connection.
///
/// Negative timestamps are clamped to 0 before being narrowed to the 32-bit
/// RTMP timestamp.
async fn write_media<W: AsyncWrite + Unpin>(
    writer: &mut ChunkWriter<W>,
    frame: &MediaFrame,
    stream_id: u32,
    audio_seq_sent: &mut bool,
) -> Result<()> {
    let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
    match frame.codec {
        crate::CodecId::H264 => {
            let ts = frame.dts.max(0) as u32;
            let tag = if is_config {
                let cfg = annexb_to_avc_config(&frame.data);
                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
            } else {
                let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
                let cts = (frame.pts - frame.dts) as i32;
                flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
            };
            writer
                .write_message(CSID_VIDEO, MSG_VIDEO, ts, stream_id, &tag)
                .await
        }
        crate::CodecId::AAC => {
            let ts = frame.pts.max(0) as u32;
            if is_config {
                *audio_seq_sent = true;
                let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
                return writer
                    .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
                    .await;
            }
            if !*audio_seq_sent {
                if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
                    let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
                    writer
                        .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &seq)
                        .await?;
                    *audio_seq_sent = true;
                }
            }
            // An ADTS header is 7 bytes, or 9 when `protection_absent` (the low
            // bit of byte 1) is 0 and a 16-bit CRC follows. Stripping only 7
            // bytes from a CRC-protected frame would leave the CRC glued to the
            // front of the raw AAC payload. Data that does not start with the
            // 0xFFF syncword keeps the previous fixed 7-byte strip.
            // NOTE(review): assumes non-config AAC frames are ADTS-framed — confirm
            // against the ingest side.
            let header_len = match frame.data.get(..2) {
                Some(&[0xFF, b1]) if b1 & 0xF0 == 0xF0 && b1 & 0x01 == 0 => 9,
                _ => 7,
            };
            let raw = frame.data.get(header_len..).unwrap_or(&[]);
            let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
            writer
                .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
                .await
        }
        _ => Ok(()), // only H.264 + AAC are carried over RTMP
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Engine;

    /// Build a relay for `url` on top of an engine configured with a single
    /// `live` application.
    fn relay(url: &str) -> Result<RtmpRelay> {
        let spec = crate::AppSpec::new("live");
        let engine = Engine::builder().application(spec).build();
        RtmpRelay::new(engine, url)
    }

    /// The parsed target of `r` as `(host, port, app, stream)`.
    fn target(r: &RtmpRelay) -> (&str, u16, &str, &str) {
        (r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str())
    }

    #[test]
    fn parses_rtmp_url_with_and_without_port() {
        // (input url, expected (host, port, app, stream))
        let cases = [
            (
                "rtmp://cdn.example.com/live/backup",
                ("cdn.example.com", 1935, "live", "backup"),
            ),
            (
                "rtmp://10.0.0.5:1936/app/key123",
                ("10.0.0.5", 1936, "app", "key123"),
            ),
        ];
        for (url, expected) in cases {
            let parsed = relay(url).unwrap();
            assert_eq!(target(&parsed), expected);
        }
    }

    #[test]
    fn rejects_malformed_urls() {
        // Wrong scheme, no path at all, and a path without a stream name.
        let malformed = [
            "http://x/live/s",
            "rtmp://hostonly",
            "rtmp://host/liveonly",
        ];
        for url in malformed {
            assert!(relay(url).is_err(), "{url} should be rejected");
        }
    }
}