arcly_stream/protocol/rtmp/
relay.rs1use std::ops::ControlFlow;
22use std::sync::Arc;
23use std::time::Duration;
24
25use tokio::io::AsyncWrite;
26use tokio::net::TcpStream;
27use tokio_util::sync::CancellationToken;
28use tracing::{debug, info, warn};
29
30use super::amf::{self, Amf0Value};
31use super::chunk::{ChunkReader, ChunkWriter};
32use super::{
33 annexb_to_avc_config, flv, CSID_AUDIO, CSID_COMMAND, CSID_CONTROL, CSID_VIDEO, MSG_AUDIO,
34 MSG_COMMAND_AMF0, MSG_SET_CHUNK_SIZE, MSG_VIDEO, OUT_CHUNK_SIZE,
35};
36use crate::bus::PlaybackRegistry;
37use crate::{MediaFrame, Result, StreamError, StreamKey};
38
39const SETUP_TIMEOUT: Duration = Duration::from_secs(10);
42
43pub struct RtmpRelay {
45 playback: Arc<dyn PlaybackRegistry>,
46 host: String,
47 port: u16,
48 app: String,
49 stream: String,
50}
51
52impl RtmpRelay {
53 pub fn new(playback: Arc<dyn PlaybackRegistry>, url: &str) -> Result<Self> {
56 let (authority, path) = crate::protocol::url::split_scheme(url, "rtmp://")
57 .ok_or_else(|| StreamError::protocol("relay url must be rtmp://host/app/stream"))?;
58 let (host, port) = crate::protocol::url::parse_authority(authority, 1935)?;
59 let (app, stream) = path
60 .split_once('/')
61 .ok_or_else(|| StreamError::protocol("relay url needs both app and stream"))?;
62 if app.is_empty() || stream.is_empty() {
63 return Err(StreamError::protocol("incomplete rtmp relay url"));
64 }
65 Ok(Self {
66 playback,
67 host,
68 port,
69 app: app.to_string(),
70 stream: stream.to_string(),
71 })
72 }
73
74 pub async fn run(self, key: StreamKey, shutdown: CancellationToken) -> Result<()> {
77 let addr = format!("{}:{}", self.host, self.port);
78 let mut tcp = TcpStream::connect(&addr).await?;
79 super::handshake::initiate(&mut tcp).await?;
80 let (rd, wr) = tcp.into_split();
81 let mut reader = ChunkReader::new(rd);
82 let mut writer = ChunkWriter::new(wr);
83
84 let stream_id = tokio::time::timeout(SETUP_TIMEOUT, async {
86 writer
88 .write_message(
89 CSID_CONTROL,
90 MSG_SET_CHUNK_SIZE,
91 0,
92 0,
93 &(OUT_CHUNK_SIZE as u32).to_be_bytes(),
94 )
95 .await?;
96 writer.set_chunk_size(OUT_CHUNK_SIZE);
97
98 let tc_url = format!("rtmp://{addr}/{}", self.app);
99 send_command(
100 &mut writer,
101 0,
102 &[
103 amf::string("connect"),
104 Amf0Value::Number(1.0),
105 amf::object(vec![
106 ("app", amf::string(self.app.as_str())),
107 ("type", amf::string("nonprivate")),
108 ("flashVer", amf::string("FMLE/3.0 (compatible; arcly)")),
109 ("tcUrl", amf::string(tc_url.as_str())),
110 ]),
111 ],
112 )
113 .await?;
114 read_until_command(&mut reader, "_result").await?;
115
116 send_command(
117 &mut writer,
118 0,
119 &[
120 amf::string("createStream"),
121 Amf0Value::Number(2.0),
122 Amf0Value::Null,
123 ],
124 )
125 .await?;
126 let values = read_until_command(&mut reader, "_result").await?;
127 let stream_id = values
129 .get(3)
130 .and_then(Amf0Value::as_number)
131 .map(|n| n as u32)
132 .unwrap_or(super::STREAM_ID);
133
134 send_command(
135 &mut writer,
136 stream_id,
137 &[
138 amf::string("publish"),
139 Amf0Value::Number(3.0),
140 Amf0Value::Null,
141 amf::string(self.stream.as_str()),
142 amf::string("live"),
143 ],
144 )
145 .await?;
146 Ok::<u32, StreamError>(stream_id)
147 })
148 .await
149 .map_err(|_| StreamError::protocol("rtmp relay setup timed out"))??;
150
151 info!(%addr, app = %self.app, stream = %self.stream, stream_id, "rtmp relay publishing");
152
153 let handle = self.playback.get_stream(&key)?;
155 let mut sink = RtmpRelaySink {
156 writer: &mut writer,
157 stream_id,
158 audio_seq_sent: false,
159 };
160 handle.drive_to(&shutdown, &mut sink).await?;
161 debug!(%addr, "rtmp relay finished");
162 Ok(())
163 }
164}
165
166struct RtmpRelaySink<'a, W: AsyncWrite + Unpin> {
168 writer: &'a mut ChunkWriter<W>,
169 stream_id: u32,
170 audio_seq_sent: bool,
171}
172
173#[async_trait::async_trait]
174impl<W: AsyncWrite + Unpin + Send> crate::bus::FrameSink for RtmpRelaySink<'_, W> {
175 async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<std::ops::ControlFlow<()>> {
176 write_media(
177 self.writer,
178 &frame,
179 self.stream_id,
180 &mut self.audio_seq_sent,
181 )
182 .await?;
183 Ok(ControlFlow::Continue(()))
184 }
185}
186
187async fn send_command<W: AsyncWrite + Unpin>(
189 writer: &mut ChunkWriter<W>,
190 msg_stream_id: u32,
191 values: &[Amf0Value],
192) -> Result<()> {
193 let mut buf = bytes::BytesMut::new();
194 for v in values {
195 v.encode(&mut buf);
196 }
197 writer
198 .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
199 .await
200}
201
202async fn read_until_command<R: tokio::io::AsyncRead + Unpin>(
206 reader: &mut ChunkReader<R>,
207 want: &str,
208) -> Result<Vec<Amf0Value>> {
209 for _ in 0..32 {
210 let msg = reader.read_message().await?;
211 match msg.type_id {
212 MSG_SET_CHUNK_SIZE => {
213 if let Some(b) = msg.payload.get(..4) {
214 let size = u32::from_be_bytes([b[0], b[1], b[2], b[3]]) as usize;
215 reader.set_chunk_size(size.max(1));
216 }
217 }
218 MSG_COMMAND_AMF0 => {
219 let values = amf::decode_all(&msg.payload);
220 match values.first().and_then(Amf0Value::as_str) {
221 Some(name) if name == want => return Ok(values),
222 Some("_error") => {
223 return Err(StreamError::protocol("rtmp relay rejected by upstream"))
224 }
225 _ => {}
226 }
227 }
228 _ => {} }
230 }
231 warn!(want, "rtmp relay: expected command not seen");
232 Err(StreamError::protocol(
233 "rtmp relay: upstream handshake stalled",
234 ))
235}
236
237async fn write_media<W: AsyncWrite + Unpin>(
240 writer: &mut ChunkWriter<W>,
241 frame: &MediaFrame,
242 stream_id: u32,
243 audio_seq_sent: &mut bool,
244) -> Result<()> {
245 let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
246 match frame.codec {
247 crate::CodecId::H264 => {
248 let ts = frame.dts.max(0) as u32;
249 let tag = if is_config {
250 let cfg = annexb_to_avc_config(&frame.data);
251 flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
252 } else {
253 let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
254 let cts = (frame.pts - frame.dts) as i32;
255 flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
256 };
257 writer
258 .write_message(CSID_VIDEO, MSG_VIDEO, ts, stream_id, &tag)
259 .await
260 }
261 crate::CodecId::AAC => {
262 let ts = frame.pts.max(0) as u32;
263 if is_config {
264 *audio_seq_sent = true;
265 let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
266 return writer
267 .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
268 .await;
269 }
270 if !*audio_seq_sent {
271 if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
272 let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
273 writer
274 .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &seq)
275 .await?;
276 *audio_seq_sent = true;
277 }
278 }
279 let raw = frame.data.get(7..).unwrap_or(&[]);
280 let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
281 writer
282 .write_message(CSID_AUDIO, MSG_AUDIO, ts, stream_id, &tag)
283 .await
284 }
285 _ => Ok(()), }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use crate::Engine;
293
294 fn relay(url: &str) -> Result<RtmpRelay> {
295 let engine = Engine::builder()
296 .application(crate::AppSpec::new("live"))
297 .build();
298 RtmpRelay::new(engine, url)
299 }
300
301 #[test]
302 fn parses_rtmp_url_with_and_without_port() {
303 let r = relay("rtmp://cdn.example.com/live/backup").unwrap();
304 assert_eq!(
305 (r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str()),
306 ("cdn.example.com", 1935, "live", "backup")
307 );
308 let r = relay("rtmp://10.0.0.5:1936/app/key123").unwrap();
309 assert_eq!(
310 (r.host.as_str(), r.port, r.app.as_str(), r.stream.as_str()),
311 ("10.0.0.5", 1936, "app", "key123")
312 );
313 }
314
315 #[test]
316 fn rejects_malformed_urls() {
317 assert!(relay("http://x/live/s").is_err());
318 assert!(relay("rtmp://hostonly").is_err());
319 assert!(relay("rtmp://host/liveonly").is_err());
320 }
321}