use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use super::amf::{self, Amf0Value};
use super::chunk::{ChunkReader, ChunkWriter};
use super::{
annexb_to_avc_config, flv, CSID_AUDIO, CSID_COMMAND, CSID_CONTROL, CSID_VIDEO, MSG_AUDIO,
MSG_COMMAND_AMF0, MSG_SET_CHUNK_SIZE, MSG_VIDEO, OUT_CHUNK_SIZE,
};
use crate::bus::PlaybackRegistry;
use crate::{MediaFrame, Result, StreamError, StreamKey};
const SETUP_TIMEOUT: Duration = Duration::from_secs(10);
pub struct RtmpRelay {
playback: Arc<dyn PlaybackRegistry>,
host: String,
port: u16,
app: String,
stream: String,
}
impl RtmpRelay {
pub fn new(playback: Arc<dyn PlaybackRegistry>, url: &str) -> Result<Self> {
let (authority, path) = crate::protocol::url::split_scheme(url, "rtmp://")
.ok_or_else(|| StreamError::protocol("relay url must be rtmp://host/app/stream"))?;
let (host, port) = crate::protocol::url::parse_authority(authority, 1935)?;
let (app, stream) = path
.split_once('/')
.ok_or_else(|| StreamError::protocol("relay url needs both app and stream"))?;
if app.is_empty() || stream.is_empty() {
return Err(StreamError::protocol("incomplete rtmp relay url"));
}
Ok(Self {
playback,
host,
port,
app: app.to_string(),
stream: stream.to_string(),
})
}
pub async fn run(self, key: StreamKey, shutdown: CancellationToken) -> Result<()> {
let addr = format!("{}:{}", self.host, self.port);
let mut tcp = TcpStream::connect(&addr).await?;
super::handshake::initiate(&mut tcp).await?;
let (rd, wr) = tcp.into_split();
let mut reader = ChunkReader::new(rd);
let mut writer = ChunkWriter::new(wr);
let stream_id = tokio::time::timeout(SETUP_TIMEOUT, async {
writer
.write_message(
CSID_CONTROL,
MSG_SET_CHUNK_SIZE,
0,
0,
&(OUT_CHUNK_SIZE as u32).to_be_bytes(),
)
.await?;
writer.set_chunk_size(OUT_CHUNK_SIZE);
let tc_url = format!("rtmp://{addr}/{}", self.app);
send_command(
&mut writer,
0,
&[
amf::string("connect"),
Amf0Value::Number(1.0),
amf::object(vec![
("app", amf::string(self.app.as_str())),
("type", amf::string("nonprivate")),
("flashVer", amf::string("FMLE/3.0 (compatible; arcly)")),
("tcUrl", amf::string(tc_url.as_str())),
]),
],
)
.await?;
read_until_command(&mut reader, "_result").await?;
send_command(
&mut writer,
0,
&[
amf::string("createStream"),
Amf0Value::Number(2.0),
Amf0Value::Null,
],
)
.await?;
let values = read_until_command(&mut reader, "_result").await?;
let stream_id = values
.get(3)
.and_then(Amf0Value::as_number)
.map(|n| n as u32)
.unwrap_or(super::STREAM_ID);
send_command(
&mut writer,
stream_id,
&[
amf::string("publish"),
Amf0Value::Number(3.0),
Amf0Value::Null,
amf::string(self.stream.as_str()),
amf::string("live"),
],
)
.await?;
Ok::<u32, StreamError>(stream_id)
})
.await
.map_err(|_| StreamError::protocol("rtmp relay setup timed out"))??;
info!(%addr, app = %self.app, stream = %self.stream, stream_id, "rtmp relay publishing");
let handle = self.playback.get_stream(&key)?;
let mut sink = RtmpRelaySink {
writer: &mut writer,
stream_id,
audio_seq_sent: false,
};
handle.drive_to(&shutdown, &mut sink).await?;
debug!(%addr, "rtmp relay finished");
Ok(())
}
}
struct RtmpRelaySink<'a, W: AsyncWrite + Unpin> {
writer: &'a mut ChunkWriter<W>,
stream_id: u32,
audio_seq_sent: bool,
}
#[async_trait::async_trait]
impl<W: AsyncWrite + Unpin + Send> crate::bus::FrameSink for RtmpRelaySink<'_, W> {
async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<std::ops::ControlFlow<()>> {
write_media(
self.writer,
&frame,
self.stream_id,
&mut self.audio_seq_sent,
)
.await?;
Ok(ControlFlow::Continue(()))
}
}
async fn send_command<W: AsyncWrite + Unpin>(
writer: &mut ChunkWriter<W>,
msg_stream_id: u32,
values: &[Amf0Value],
) -> Result<()> {
let mut buf = bytes::BytesMut::new();
for v in values {
v.encode(&mut buf);
}
writer
.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
.await
}
async fn read_until_command<R: tokio::io::AsyncRead + Unpin>(
reader: &mut ChunkReader<R>,
want: &str,
) -> Result<Vec<Amf0Value>> {
for _ in 0..32 {
let msg = reader.read_message().await?;
match msg.type_id {
MSG_SET_CHUNK_SIZE => {
if let Some(b) = msg.payload.get(..4) {
let size = u32::from_be_bytes([b[0], b[1], b[2], b[3]]) as usize;
reader.set_chunk_size(size.max(1));
}
}
MSG_COMMAND_AMF0 => {
let values = amf::decode_all(&msg.payload);
match values.first().and_then(Amf0Value::as_str) {
Some(name) if name == want => return Ok(values),
Some("_error") => {
return Err(StreamError::protocol("rtmp relay rejected by upstream"))
}
_ => {}
}
}
_ => {} }
}
warn!(want, "rtmp relay: expected command not seen");
Err(StreamError::protocol(
"rtmp relay: upstream handshake stalled",
))
}
async fn write_media<W: AsyncWrite + Unpin>(
writer: &mut ChunkWriter<W>,
frame: &MediaFrame,
stream_id: u32,
audio_seq_sent: &mut bool,
) -> Result<()> {
let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
match frame.codec {
crate::CodecId::H264 => {
let ts = frame.dts.max(0) as u32;
let tag = if is_config {
let cfg = annexb_to_avc_config(&frame.data);
flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
} else {
let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
let cts = (frame.pts - frame.dts) as i32;
flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
};
writer
.write_message(CSID_VIDEO, MSG_VIDEO, ts, stream_id, &tag)
.await
}
crate::CodecId::AAC => {
let ts = frame.pts.max(0) as u32;
if is_config {
*audio_seq_sent = true;
let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
return writer
.write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
.await;
}
if !*audio_seq_sent {
if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
writer
.write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &seq)
.await?;
*audio_seq_sent = true;
}
}
let raw = frame.data.get(7..).unwrap_or(&[]);
let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
writer
.write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
.await
}
_ => Ok(()), }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Engine;
fn relay(url: &str) -> Result<RtmpRelay> {
let engine = Engine::builder()
.application(crate::AppSpec::new("live"))
.build();
RtmpRelay::new(engine, url)
}
#[test]
fn parses_rtmp_url_with_and_without_port() {
let r = relay("rtmp://cdn.example.com/live/backup").unwrap();
assert_eq!(
(r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str()),
("cdn.example.com", 1935, "live", "backup")
);
let r = relay("rtmp://10.0.0.5:1936/app/key123").unwrap();
assert_eq!(
(r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str()),
("10.0.0.5", 1936, "app", "key123")
);
}
#[test]
fn rejects_malformed_urls() {
assert!(relay("http://x/live/s").is_err());
assert!(relay("rtmp://hostonly").is_err());
assert!(relay("rtmp://host/liveonly").is_err());
}
}