1mod amf;
39mod chunk;
40mod flv;
41mod handshake;
42mod relay;
43
44pub use relay::RtmpRelay;
45
46use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
47use crate::{MediaFrame, Result, StreamError, StreamKey};
48use amf::Amf0Value;
49use async_trait::async_trait;
50use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
51use std::net::SocketAddr;
52use std::sync::Arc;
53use tokio::net::TcpStream;
54use tokio_util::sync::CancellationToken;
55use tracing::{debug, info, warn};
56
57pub const DEFAULT_RTMP_PORT: u16 = 1935;
59const OUT_CHUNK_SIZE: usize = 4096;
62
63const MSG_SET_CHUNK_SIZE: u8 = 1;
65const MSG_ACK: u8 = 3;
66const MSG_USER_CONTROL: u8 = 4;
67const MSG_WINDOW_ACK_SIZE: u8 = 5;
68const MSG_SET_PEER_BANDWIDTH: u8 = 6;
69const MSG_AUDIO: u8 = 8;
70const MSG_VIDEO: u8 = 9;
71const MSG_DATA_AMF3: u8 = 15;
72const MSG_DATA_AMF0: u8 = 18;
73const MSG_COMMAND_AMF3: u8 = 17;
74const MSG_COMMAND_AMF0: u8 = 20;
75
76const CSID_CONTROL: u8 = 2;
78const CSID_COMMAND: u8 = 3;
79const CSID_AUDIO: u8 = 4;
80const CSID_VIDEO: u8 = 6;
81
82const STREAM_ID: u32 = 1;
85
86pub struct RtmpHandler {
97 addr: SocketAddr,
98 max_connections: usize,
99 playback: Option<Arc<dyn PlaybackRegistry>>,
100}
101
102impl RtmpHandler {
103 pub fn new(addr: SocketAddr) -> Self {
105 Self {
106 addr,
107 max_connections: 1024,
108 playback: None,
109 }
110 }
111
112 pub fn max_connections(mut self, max: usize) -> Self {
114 self.max_connections = max;
115 self
116 }
117
118 pub fn with_playback(mut self, playback: Arc<dyn PlaybackRegistry>) -> Self {
121 self.playback = Some(playback);
122 self
123 }
124}
125
126#[async_trait]
134impl crate::inbound::InboundProtocol for RtmpHandler {
135 fn name(&self) -> &'static str {
136 "rtmp"
137 }
138
139 async fn serve(
140 &self,
141 ctx: crate::inbound::IngestContext,
142 shutdown: CancellationToken,
143 ) -> Result<()> {
144 info!(addr = %self.addr, "rtmp handler listening");
145 let registry = Arc::clone(ctx.registry());
150 let playback = self.playback.clone();
151 crate::protocol::run_tcp_ingest_server(
152 self.addr,
153 self.max_connections,
154 shutdown,
155 move |sock, peer| {
156 let registry = Arc::clone(®istry);
157 let playback = playback.clone();
158 async move {
159 if let Err(e) = handle_connection(sock, peer, registry, playback).await {
160 warn!(%peer, error = %e, "rtmp session ended with error");
161 }
162 }
163 },
164 )
165 .await
166 }
167}
168
169async fn handle_connection(
171 mut sock: TcpStream,
172 peer: SocketAddr,
173 registry: Arc<dyn PublishRegistry>,
174 playback: Option<Arc<dyn PlaybackRegistry>>,
175) -> Result<()> {
176 handshake::accept(&mut sock).await?;
177 debug!(%peer, "rtmp handshake complete");
178 let (read_half, write_half) = sock.into_split();
179 let mut session = Session::new(
180 ChunkReader::new(read_half),
181 ChunkWriter::new(write_half),
182 registry,
183 playback,
184 );
185 session.run().await
186}
187
188#[derive(PartialEq)]
190enum Role {
191 Pending,
192 Publishing,
193 Playing,
194}
195
196struct Session<R, W> {
198 reader: ChunkReader<R>,
199 writer: ChunkWriter<W>,
200 registry: Arc<dyn PublishRegistry>,
201 playback: Option<Arc<dyn PlaybackRegistry>>,
202 app: Option<String>,
203 role: Role,
204 publish_key: Option<StreamKey>,
205 handle: Option<StreamHandle>,
206 avc: Option<flv::AvcConfig>,
207 aac: Option<flv::AudioConfig>,
208 audio_seq_sent: bool,
210 stop: bool,
211}
212
213impl<R, W> Session<R, W>
214where
215 R: tokio::io::AsyncRead + Unpin,
216 W: tokio::io::AsyncWrite + Unpin,
217{
218 fn new(
219 reader: ChunkReader<R>,
220 writer: ChunkWriter<W>,
221 registry: Arc<dyn PublishRegistry>,
222 playback: Option<Arc<dyn PlaybackRegistry>>,
223 ) -> Self {
224 Self {
225 reader,
226 writer,
227 registry,
228 playback,
229 app: None,
230 role: Role::Pending,
231 publish_key: None,
232 handle: None,
233 avc: None,
234 aac: None,
235 audio_seq_sent: false,
236 stop: false,
237 }
238 }
239
240 async fn run(&mut self) -> Result<()> {
242 let result = self.event_loop().await;
243 if let Some(key) = self.publish_key.take() {
244 let _ = self.registry.end_publish(&key).await;
245 }
246 match result {
247 Ok(()) => Ok(()),
248 Err(e) if is_disconnect(&e) => Ok(()),
249 Err(e) => Err(e),
250 }
251 }
252
253 async fn event_loop(&mut self) -> Result<()> {
254 while !self.stop {
255 let msg = self.reader.read_message().await?;
256 self.process(msg).await?;
257 }
258 Ok(())
259 }
260
261 async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
262 match msg.type_id {
263 MSG_SET_CHUNK_SIZE => {
264 if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
265 self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
266 }
267 }
268 MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
269 MSG_COMMAND_AMF3 => {
270 self.handle_command(&msg.payload[1.min(msg.payload.len())..])
273 .await?
274 }
275 MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
276 MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
277 MSG_DATA_AMF0 | MSG_DATA_AMF3 => { }
278 MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
279 other => debug!(
280 type_id = other,
281 stream = msg.msg_stream_id,
282 "ignoring RTMP message"
283 ),
284 }
285 Ok(())
286 }
287
288 async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
291 let values = amf::decode_all(payload);
292 let Some(name) = values.first().and_then(Amf0Value::as_str) else {
293 return Ok(());
294 };
295 let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
296 match name {
297 "connect" => self.on_connect(txn, values.get(2)).await,
298 "createStream" => self.on_create_stream(txn).await,
299 "publish" => self.on_publish(values.get(3)).await,
300 "play" => self.on_play(values.get(3)).await,
301 "releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
302 | "FCSubscribe" => Ok(()),
303 other => {
304 debug!(command = other, "unhandled RTMP command");
305 Ok(())
306 }
307 }
308 }
309
310 async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
311 let app = cmd_obj
312 .and_then(|o| o.get("app"))
313 .and_then(Amf0Value::as_str)
314 .unwrap_or("")
315 .split('?')
316 .next()
317 .unwrap_or("")
318 .to_string();
319 self.app = Some(app);
320
321 self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
323 .await?;
324 let mut bw = Vec::from(2_500_000u32.to_be_bytes());
325 bw.push(2); self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
327 self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
328 .await?;
329 self.writer.set_chunk_size(OUT_CHUNK_SIZE);
330
331 let props = amf::object(vec![
332 ("fmsVer", amf::string("FMS/3,0,1,123")),
333 ("capabilities", Amf0Value::Number(31.0)),
334 ]);
335 let info = amf::object(vec![
336 ("level", amf::string("status")),
337 ("code", amf::string("NetConnection.Connect.Success")),
338 ("description", amf::string("Connection succeeded.")),
339 ("objectEncoding", Amf0Value::Number(0.0)),
340 ]);
341 self.send_command(
342 0,
343 &[amf::string("_result"), Amf0Value::Number(txn), props, info],
344 )
345 .await
346 }
347
348 async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
349 self.send_command(
350 0,
351 &[
352 amf::string("_result"),
353 Amf0Value::Number(txn),
354 Amf0Value::Null,
355 Amf0Value::Number(STREAM_ID as f64),
356 ],
357 )
358 .await
359 }
360
361 async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
362 let key = self.stream_key(name)?;
363 let handle = self.registry.start_publish(&key).await?;
364 info!(stream = %key, "rtmp publish started");
365 self.handle = Some(handle);
366 self.publish_key = Some(key);
367 self.role = Role::Publishing;
368 self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
369 .await
370 }
371
372 async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
373 let key = self.stream_key(name)?;
374 let Some(playback) = self.playback.clone() else {
375 self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
376 .await?;
377 self.stop = true;
378 return Ok(());
379 };
380 let handle = playback.get_stream(&key)?;
381 info!(stream = %key, "rtmp play started");
382 self.role = Role::Playing;
383
384 let mut begin = Vec::from(0u16.to_be_bytes()); begin.extend_from_slice(&STREAM_ID.to_be_bytes());
387 self.send_control(MSG_USER_CONTROL, &begin).await?;
388 self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
389 .await?;
390 self.send_status("status", "NetStream.Play.Start", "Started playing.")
391 .await?;
392
393 self.serve_play(handle).await?;
394 self.stop = true;
395 Ok(())
396 }
397
398 fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
401 let Some(header) = flv::parse_video_header(&msg.payload) else {
402 return Ok(()); };
404 let body = &msg.payload[header.body_offset..];
405 let ts = msg.timestamp as i64;
406
407 if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
408 let cfg = flv::AvcConfig::parse(body)
409 .ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
410 let mut frame =
411 MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
412 frame.flags |= crate::FrameFlags::CONFIG;
413 self.avc = Some(cfg);
414 self.publish(frame);
415 } else if header.avc_packet_type == flv::PKT_RAW {
416 let Some(cfg) = self.avc.as_ref() else {
417 return Ok(()); };
419 let annexb = cfg
420 .avcc_to_annexb(body, header.is_keyframe)
421 .ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
422 let pts = ts + header.composition_time as i64;
423 let frame =
424 MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
425 self.publish(frame);
426 }
427 Ok(())
428 }
429
430 fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
431 let payload = &msg.payload;
432 let Some(&b0) = payload.first() else {
433 return Ok(());
434 };
435 if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
436 return Ok(()); }
438 let aac_packet_type = *payload.get(1).unwrap_or(&0);
439 let body = &payload[2.min(payload.len())..];
440 let ts = msg.timestamp as i64;
441
442 if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
443 let cfg = flv::AudioConfig::parse(body)
444 .ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
445 self.aac = Some(cfg);
446 let mut frame =
447 MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
448 frame.flags |= crate::FrameFlags::CONFIG;
449 self.publish(frame);
450 } else if let Some(cfg) = self.aac.as_ref() {
451 let adts = cfg.to_adts(body);
452 self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
453 }
454 Ok(())
455 }
456
457 fn publish(&self, frame: MediaFrame) {
458 if let Some(handle) = self.handle.as_ref() {
459 let _ = handle.publish_frame(frame);
460 }
461 }
462
463 async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
466 for frame in handle.replay_buffer() {
468 self.send_media(&frame).await?;
469 }
470 let mut sub = handle.subscribe_resilient();
471 while let Some(frame) = sub.recv().await {
472 self.send_media(&frame).await?;
473 }
474 Ok(())
475 }
476
477 async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
478 let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
479 match frame.codec {
480 crate::CodecId::H264 => {
481 let ts = frame.dts.max(0) as u32;
482 let tag = if is_config {
483 let cfg = annexb_to_avc_config(&frame.data);
484 flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
485 } else {
486 let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
487 let cts = (frame.pts - frame.dts) as i32;
488 flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
489 };
490 self.writer
491 .write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
492 .await
493 }
494 crate::CodecId::AAC => {
495 let ts = frame.pts.max(0) as u32;
496 if is_config {
497 self.audio_seq_sent = true;
498 let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
499 return self
500 .writer
501 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
502 .await;
503 }
504 if !self.audio_seq_sent {
508 if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
509 let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
510 self.writer
511 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
512 .await?;
513 self.audio_seq_sent = true;
514 }
515 }
516 let raw = frame.data.get(7..).unwrap_or(&[]);
518 let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
519 self.writer
520 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
521 .await
522 }
523 _ => Ok(()), }
525 }
526
527 fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
531 let app = self
532 .app
533 .clone()
534 .ok_or_else(|| StreamError::protocol("stream command before connect"))?;
535 let stream = name
536 .and_then(Amf0Value::as_str)
537 .ok_or_else(|| StreamError::protocol("missing stream name"))?
538 .split('?')
539 .next()
540 .unwrap_or("")
541 .to_string();
542 Ok(StreamKey::new(app, stream))
543 }
544
545 async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
546 self.writer
547 .write_message(CSID_CONTROL, type_id, 0, 0, payload)
548 .await
549 }
550
551 async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
552 let mut buf = bytes::BytesMut::new();
553 for v in values {
554 v.encode(&mut buf);
555 }
556 self.writer
557 .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
558 .await
559 }
560
561 async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
562 let info = amf::object(vec![
563 ("level", amf::string(level)),
564 ("code", amf::string(code)),
565 ("description", amf::string(description)),
566 ]);
567 self.send_command(
568 STREAM_ID,
569 &[
570 amf::string("onStatus"),
571 Amf0Value::Number(0.0),
572 Amf0Value::Null,
573 info,
574 ],
575 )
576 .await
577 }
578}
579
580fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
582 let mut cfg = flv::AvcConfig {
583 nal_length_size: 4,
584 ..Default::default()
585 };
586 for nal in crate::codec::h264::iter_nals_annexb(annexb) {
587 match nal.first().map(|b| b & 0x1F) {
588 Some(7) => cfg.sps.push(nal.to_vec()),
589 Some(8) => cfg.pps.push(nal.to_vec()),
590 _ => {}
591 }
592 }
593 cfg
594}
595
596fn is_disconnect(e: &StreamError) -> bool {
598 matches!(e, StreamError::Io(io) if matches!(
599 io.kind(),
600 std::io::ErrorKind::UnexpectedEof
601 | std::io::ErrorKind::ConnectionReset
602 | std::io::ErrorKind::BrokenPipe
603 | std::io::ErrorKind::ConnectionAborted
604 ))
605}
606
607#[cfg(test)]
608mod tests {
609 use super::*;
610 use crate::{AppSpec, Engine};
611 use bytes::Bytes;
612
613 fn avcc_idr() -> Vec<u8> {
615 let slice = [0x65u8, 0x88, 0x84, 0x00];
616 let mut v = (slice.len() as u32).to_be_bytes().to_vec();
617 v.extend_from_slice(&slice);
618 v
619 }
620
621 fn avc_decoder_config() -> Vec<u8> {
622 let sps = [0x67u8, 0x42, 0x00, 0x1F, 0xAA];
623 let pps = [0x68u8, 0xCE, 0x3C, 0x80];
624 let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
625 r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
626 r.extend_from_slice(&sps);
627 r.push(1);
628 r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
629 r.extend_from_slice(&pps);
630 r
631 }
632
633 async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
636 let engine = Engine::builder()
637 .application(AppSpec::new("live").gop_cache(64))
638 .build();
639 let key = StreamKey::new("live", "cam");
640 let handle = engine.start_publish(&key).await.unwrap();
641
642 let mut session = Session::new(
643 ChunkReader::new(tokio::io::empty()),
644 ChunkWriter::new(tokio::io::sink()),
645 engine.clone(),
646 Some(engine.clone()),
647 );
648 session.app = Some("live".into());
649 session.handle = Some(handle);
650 session.role = Role::Publishing;
651 (session, engine)
652 }
653
654 #[tokio::test]
655 async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
656 let (mut session, _engine) = publishing_session().await;
657
658 let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
660 session
661 .on_video(&RtmpMessage {
662 type_id: MSG_VIDEO,
663 timestamp: 0,
664 msg_stream_id: STREAM_ID,
665 payload: seq,
666 })
667 .unwrap();
668
669 let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
671 session
672 .on_video(&RtmpMessage {
673 type_id: MSG_VIDEO,
674 timestamp: 40,
675 msg_stream_id: STREAM_ID,
676 payload: kf,
677 })
678 .unwrap();
679
680 let handle = session.handle.as_ref().unwrap();
681 let (vcfg, _) = handle.cached_configs();
682 let vcfg = vcfg.expect("video config cached");
683 assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
684 assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);
686
687 let replay = handle.replay_buffer();
690 let key = replay
691 .iter()
692 .find(|f| f.is_keyframe())
693 .expect("keyframe in GOP");
694 assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
695 }
696
697 #[tokio::test]
698 async fn publish_aac_builds_adts_frames() {
699 let (mut session, _engine) = publishing_session().await;
700 let asc = [0x12u8, 0x10];
702 let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
703 session
704 .on_audio(&RtmpMessage {
705 type_id: MSG_AUDIO,
706 timestamp: 0,
707 msg_stream_id: STREAM_ID,
708 payload: seq,
709 })
710 .unwrap();
711
712 let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
713 session
714 .on_audio(&RtmpMessage {
715 type_id: MSG_AUDIO,
716 timestamp: 23,
717 msg_stream_id: STREAM_ID,
718 payload: raw,
719 })
720 .unwrap();
721
722 let handle = session.handle.as_ref().unwrap();
724 let (_, acfg) = handle.cached_configs();
725 assert_eq!(&acfg.unwrap().data[..], &asc);
726 }
727
728 #[tokio::test]
729 async fn full_publish_session_drives_real_chunk_reader() {
730 use tokio::io::AsyncReadExt;
731
732 let engine = Engine::builder()
733 .application(AppSpec::new("live").gop_cache(64))
734 .build();
735
736 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
740 let addr = listener.local_addr().unwrap();
741 let engine_for_server = engine.clone();
742 let server = tokio::spawn(async move {
743 let (sock, _) = listener.accept().await.unwrap();
744 let (r, w) = sock.into_split();
745 let mut session = Session::new(
746 ChunkReader::new(r),
747 ChunkWriter::new(w),
748 engine_for_server,
749 None,
750 );
751 session.run().await.unwrap(); let handle = session.handle.as_ref().expect("publish handle");
753 let has_config = handle.cached_configs().0.is_some();
754 let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
755 (has_config, has_keyframe)
756 });
757
758 let client = tokio::spawn(async move {
759 let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
760 let (mut read_half, write_half) = stream.into_split();
761 tokio::spawn(async move {
763 let mut buf = [0u8; 4096];
764 while let Ok(n) = read_half.read(&mut buf).await {
765 if n == 0 {
766 break;
767 }
768 }
769 });
770
771 let cmd = |values: &[Amf0Value]| {
772 let mut b = bytes::BytesMut::new();
773 for v in values {
774 v.encode(&mut b);
775 }
776 b
777 };
778 let mut w = ChunkWriter::new(write_half);
779 let connect = cmd(&[
780 amf::string("connect"),
781 Amf0Value::Number(1.0),
782 amf::object(vec![("app", amf::string("live"))]),
783 ]);
784 w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &connect)
785 .await
786 .unwrap();
787 let create = cmd(&[
788 amf::string("createStream"),
789 Amf0Value::Number(2.0),
790 Amf0Value::Null,
791 ]);
792 w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &create)
793 .await
794 .unwrap();
795 let publish = cmd(&[
796 amf::string("publish"),
797 Amf0Value::Number(3.0),
798 Amf0Value::Null,
799 amf::string("cam"),
800 amf::string("live"),
801 ]);
802 w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &publish)
803 .await
804 .unwrap();
805
806 let seq =
808 flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
809 w.write_message(CSID_VIDEO, MSG_VIDEO, 0, STREAM_ID, &seq)
810 .await
811 .unwrap();
812 let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
813 w.write_message(CSID_VIDEO, MSG_VIDEO, 40, STREAM_ID, &kf)
814 .await
815 .unwrap();
816 });
818
819 client.await.unwrap();
820 let (has_config, has_keyframe) = server.await.unwrap();
821 assert!(has_config, "video config reached the engine over the wire");
822 assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
823 }
824
825 #[test]
826 fn annexb_config_splits_sps_and_pps() {
827 let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
828 let cfg = annexb_to_avc_config(&annexb);
829 assert_eq!(cfg.sps.len(), 1);
830 assert_eq!(cfg.pps.len(), 1);
831 assert_eq!(cfg.sps[0][0] & 0x1F, 7);
832 assert_eq!(cfg.pps[0][0] & 0x1F, 8);
833 }
834}