1mod amf;
39mod chunk;
40mod flv;
41mod handshake;
42
43use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
44use crate::{MediaFrame, Result, StreamError, StreamKey};
45use amf::Amf0Value;
46use async_trait::async_trait;
47use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
48use std::net::SocketAddr;
49use std::sync::Arc;
50use tokio::net::TcpStream;
51use tokio_util::sync::CancellationToken;
52use tracing::{debug, info, warn};
53
54pub const DEFAULT_RTMP_PORT: u16 = 1935;
56const OUT_CHUNK_SIZE: usize = 4096;
59
60const MSG_SET_CHUNK_SIZE: u8 = 1;
62const MSG_ACK: u8 = 3;
63const MSG_USER_CONTROL: u8 = 4;
64const MSG_WINDOW_ACK_SIZE: u8 = 5;
65const MSG_SET_PEER_BANDWIDTH: u8 = 6;
66const MSG_AUDIO: u8 = 8;
67const MSG_VIDEO: u8 = 9;
68const MSG_DATA_AMF3: u8 = 15;
69const MSG_DATA_AMF0: u8 = 18;
70const MSG_COMMAND_AMF3: u8 = 17;
71const MSG_COMMAND_AMF0: u8 = 20;
72
73const CSID_CONTROL: u8 = 2;
75const CSID_COMMAND: u8 = 3;
76const CSID_AUDIO: u8 = 4;
77const CSID_VIDEO: u8 = 6;
78
79const STREAM_ID: u32 = 1;
82
83pub struct RtmpHandler {
94 addr: SocketAddr,
95 max_connections: usize,
96 playback: Option<Arc<dyn PlaybackRegistry>>,
97}
98
99impl RtmpHandler {
100 pub fn new(addr: SocketAddr) -> Self {
102 Self {
103 addr,
104 max_connections: 1024,
105 playback: None,
106 }
107 }
108
109 pub fn max_connections(mut self, max: usize) -> Self {
111 self.max_connections = max;
112 self
113 }
114
115 pub fn with_playback(mut self, playback: Arc<dyn PlaybackRegistry>) -> Self {
118 self.playback = Some(playback);
119 self
120 }
121}
122
123#[async_trait]
131impl crate::inbound::InboundProtocol for RtmpHandler {
132 fn name(&self) -> &'static str {
133 "rtmp"
134 }
135
136 async fn serve(
137 &self,
138 ctx: crate::inbound::IngestContext,
139 shutdown: CancellationToken,
140 ) -> Result<()> {
141 info!(addr = %self.addr, "rtmp handler listening");
142 let registry = Arc::clone(ctx.registry());
147 let playback = self.playback.clone();
148 crate::protocol::run_tcp_ingest_server(
149 self.addr,
150 self.max_connections,
151 shutdown,
152 move |sock, peer| {
153 let registry = Arc::clone(®istry);
154 let playback = playback.clone();
155 async move {
156 if let Err(e) = handle_connection(sock, peer, registry, playback).await {
157 warn!(%peer, error = %e, "rtmp session ended with error");
158 }
159 }
160 },
161 )
162 .await
163 }
164}
165
166async fn handle_connection(
168 mut sock: TcpStream,
169 peer: SocketAddr,
170 registry: Arc<dyn PublishRegistry>,
171 playback: Option<Arc<dyn PlaybackRegistry>>,
172) -> Result<()> {
173 handshake::accept(&mut sock).await?;
174 debug!(%peer, "rtmp handshake complete");
175 let (read_half, write_half) = sock.into_split();
176 let mut session = Session::new(
177 ChunkReader::new(read_half),
178 ChunkWriter::new(write_half),
179 registry,
180 playback,
181 );
182 session.run().await
183}
184
185#[derive(PartialEq)]
187enum Role {
188 Pending,
189 Publishing,
190 Playing,
191}
192
193struct Session<R, W> {
195 reader: ChunkReader<R>,
196 writer: ChunkWriter<W>,
197 registry: Arc<dyn PublishRegistry>,
198 playback: Option<Arc<dyn PlaybackRegistry>>,
199 app: Option<String>,
200 role: Role,
201 publish_key: Option<StreamKey>,
202 handle: Option<StreamHandle>,
203 avc: Option<flv::AvcConfig>,
204 aac: Option<flv::AudioConfig>,
205 audio_seq_sent: bool,
207 stop: bool,
208}
209
210impl<R, W> Session<R, W>
211where
212 R: tokio::io::AsyncRead + Unpin,
213 W: tokio::io::AsyncWrite + Unpin,
214{
215 fn new(
216 reader: ChunkReader<R>,
217 writer: ChunkWriter<W>,
218 registry: Arc<dyn PublishRegistry>,
219 playback: Option<Arc<dyn PlaybackRegistry>>,
220 ) -> Self {
221 Self {
222 reader,
223 writer,
224 registry,
225 playback,
226 app: None,
227 role: Role::Pending,
228 publish_key: None,
229 handle: None,
230 avc: None,
231 aac: None,
232 audio_seq_sent: false,
233 stop: false,
234 }
235 }
236
237 async fn run(&mut self) -> Result<()> {
239 let result = self.event_loop().await;
240 if let Some(key) = self.publish_key.take() {
241 let _ = self.registry.end_publish(&key).await;
242 }
243 match result {
244 Ok(()) => Ok(()),
245 Err(e) if is_disconnect(&e) => Ok(()),
246 Err(e) => Err(e),
247 }
248 }
249
250 async fn event_loop(&mut self) -> Result<()> {
251 while !self.stop {
252 let msg = self.reader.read_message().await?;
253 self.process(msg).await?;
254 }
255 Ok(())
256 }
257
258 async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
259 match msg.type_id {
260 MSG_SET_CHUNK_SIZE => {
261 if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
262 self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
263 }
264 }
265 MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
266 MSG_COMMAND_AMF3 => {
267 self.handle_command(&msg.payload[1.min(msg.payload.len())..])
270 .await?
271 }
272 MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
273 MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
274 MSG_DATA_AMF0 | MSG_DATA_AMF3 => { }
275 MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
276 other => debug!(
277 type_id = other,
278 stream = msg.msg_stream_id,
279 "ignoring RTMP message"
280 ),
281 }
282 Ok(())
283 }
284
285 async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
288 let values = amf::decode_all(payload);
289 let Some(name) = values.first().and_then(Amf0Value::as_str) else {
290 return Ok(());
291 };
292 let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
293 match name {
294 "connect" => self.on_connect(txn, values.get(2)).await,
295 "createStream" => self.on_create_stream(txn).await,
296 "publish" => self.on_publish(values.get(3)).await,
297 "play" => self.on_play(values.get(3)).await,
298 "releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
299 | "FCSubscribe" => Ok(()),
300 other => {
301 debug!(command = other, "unhandled RTMP command");
302 Ok(())
303 }
304 }
305 }
306
307 async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
308 let app = cmd_obj
309 .and_then(|o| o.get("app"))
310 .and_then(Amf0Value::as_str)
311 .unwrap_or("")
312 .split('?')
313 .next()
314 .unwrap_or("")
315 .to_string();
316 self.app = Some(app);
317
318 self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
320 .await?;
321 let mut bw = Vec::from(2_500_000u32.to_be_bytes());
322 bw.push(2); self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
324 self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
325 .await?;
326 self.writer.set_chunk_size(OUT_CHUNK_SIZE);
327
328 let props = amf::object(vec![
329 ("fmsVer", amf::string("FMS/3,0,1,123")),
330 ("capabilities", Amf0Value::Number(31.0)),
331 ]);
332 let info = amf::object(vec![
333 ("level", amf::string("status")),
334 ("code", amf::string("NetConnection.Connect.Success")),
335 ("description", amf::string("Connection succeeded.")),
336 ("objectEncoding", Amf0Value::Number(0.0)),
337 ]);
338 self.send_command(
339 0,
340 &[amf::string("_result"), Amf0Value::Number(txn), props, info],
341 )
342 .await
343 }
344
345 async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
346 self.send_command(
347 0,
348 &[
349 amf::string("_result"),
350 Amf0Value::Number(txn),
351 Amf0Value::Null,
352 Amf0Value::Number(STREAM_ID as f64),
353 ],
354 )
355 .await
356 }
357
358 async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
359 let key = self.stream_key(name)?;
360 let handle = self.registry.start_publish(&key).await?;
361 info!(stream = %key, "rtmp publish started");
362 self.handle = Some(handle);
363 self.publish_key = Some(key);
364 self.role = Role::Publishing;
365 self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
366 .await
367 }
368
369 async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
370 let key = self.stream_key(name)?;
371 let Some(playback) = self.playback.clone() else {
372 self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
373 .await?;
374 self.stop = true;
375 return Ok(());
376 };
377 let handle = playback.get_stream(&key)?;
378 info!(stream = %key, "rtmp play started");
379 self.role = Role::Playing;
380
381 let mut begin = Vec::from(0u16.to_be_bytes()); begin.extend_from_slice(&STREAM_ID.to_be_bytes());
384 self.send_control(MSG_USER_CONTROL, &begin).await?;
385 self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
386 .await?;
387 self.send_status("status", "NetStream.Play.Start", "Started playing.")
388 .await?;
389
390 self.serve_play(handle).await?;
391 self.stop = true;
392 Ok(())
393 }
394
395 fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
398 let Some(header) = flv::parse_video_header(&msg.payload) else {
399 return Ok(()); };
401 let body = &msg.payload[header.body_offset..];
402 let ts = msg.timestamp as i64;
403
404 if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
405 let cfg = flv::AvcConfig::parse(body)
406 .ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
407 let mut frame =
408 MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
409 frame.flags |= crate::FrameFlags::CONFIG;
410 self.avc = Some(cfg);
411 self.publish(frame);
412 } else if header.avc_packet_type == flv::PKT_RAW {
413 let Some(cfg) = self.avc.as_ref() else {
414 return Ok(()); };
416 let annexb = cfg
417 .avcc_to_annexb(body, header.is_keyframe)
418 .ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
419 let pts = ts + header.composition_time as i64;
420 let frame =
421 MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
422 self.publish(frame);
423 }
424 Ok(())
425 }
426
427 fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
428 let payload = &msg.payload;
429 let Some(&b0) = payload.first() else {
430 return Ok(());
431 };
432 if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
433 return Ok(()); }
435 let aac_packet_type = *payload.get(1).unwrap_or(&0);
436 let body = &payload[2.min(payload.len())..];
437 let ts = msg.timestamp as i64;
438
439 if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
440 let cfg = flv::AudioConfig::parse(body)
441 .ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
442 self.aac = Some(cfg);
443 let mut frame =
444 MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
445 frame.flags |= crate::FrameFlags::CONFIG;
446 self.publish(frame);
447 } else if let Some(cfg) = self.aac.as_ref() {
448 let adts = cfg.to_adts(body);
449 self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
450 }
451 Ok(())
452 }
453
454 fn publish(&self, frame: MediaFrame) {
455 if let Some(handle) = self.handle.as_ref() {
456 let _ = handle.publish_frame(frame);
457 }
458 }
459
460 async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
463 for frame in handle.replay_buffer() {
465 self.send_media(&frame).await?;
466 }
467 let mut sub = handle.subscribe_resilient();
468 while let Some(frame) = sub.recv().await {
469 self.send_media(&frame).await?;
470 }
471 Ok(())
472 }
473
474 async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
475 let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
476 match frame.codec {
477 crate::CodecId::H264 => {
478 let ts = frame.dts.max(0) as u32;
479 let tag = if is_config {
480 let cfg = annexb_to_avc_config(&frame.data);
481 flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
482 } else {
483 let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
484 let cts = (frame.pts - frame.dts) as i32;
485 flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
486 };
487 self.writer
488 .write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
489 .await
490 }
491 crate::CodecId::AAC => {
492 let ts = frame.pts.max(0) as u32;
493 if is_config {
494 self.audio_seq_sent = true;
495 let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
496 return self
497 .writer
498 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
499 .await;
500 }
501 if !self.audio_seq_sent {
505 if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
506 let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
507 self.writer
508 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
509 .await?;
510 self.audio_seq_sent = true;
511 }
512 }
513 let raw = frame.data.get(7..).unwrap_or(&[]);
515 let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
516 self.writer
517 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
518 .await
519 }
520 _ => Ok(()), }
522 }
523
524 fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
528 let app = self
529 .app
530 .clone()
531 .ok_or_else(|| StreamError::protocol("stream command before connect"))?;
532 let stream = name
533 .and_then(Amf0Value::as_str)
534 .ok_or_else(|| StreamError::protocol("missing stream name"))?
535 .split('?')
536 .next()
537 .unwrap_or("")
538 .to_string();
539 Ok(StreamKey::new(app, stream))
540 }
541
542 async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
543 self.writer
544 .write_message(CSID_CONTROL, type_id, 0, 0, payload)
545 .await
546 }
547
548 async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
549 let mut buf = bytes::BytesMut::new();
550 for v in values {
551 v.encode(&mut buf);
552 }
553 self.writer
554 .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
555 .await
556 }
557
558 async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
559 let info = amf::object(vec![
560 ("level", amf::string(level)),
561 ("code", amf::string(code)),
562 ("description", amf::string(description)),
563 ]);
564 self.send_command(
565 STREAM_ID,
566 &[
567 amf::string("onStatus"),
568 Amf0Value::Number(0.0),
569 Amf0Value::Null,
570 info,
571 ],
572 )
573 .await
574 }
575}
576
577fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
579 let mut cfg = flv::AvcConfig {
580 nal_length_size: 4,
581 ..Default::default()
582 };
583 for nal in crate::codec::h264::iter_nals_annexb(annexb) {
584 match nal.first().map(|b| b & 0x1F) {
585 Some(7) => cfg.sps.push(nal.to_vec()),
586 Some(8) => cfg.pps.push(nal.to_vec()),
587 _ => {}
588 }
589 }
590 cfg
591}
592
593fn is_disconnect(e: &StreamError) -> bool {
595 matches!(e, StreamError::Io(io) if matches!(
596 io.kind(),
597 std::io::ErrorKind::UnexpectedEof
598 | std::io::ErrorKind::ConnectionReset
599 | std::io::ErrorKind::BrokenPipe
600 | std::io::ErrorKind::ConnectionAborted
601 ))
602}
603
604#[cfg(test)]
605mod tests {
606 use super::*;
607 use crate::{AppSpec, Engine};
608 use bytes::Bytes;
609
610 fn avcc_idr() -> Vec<u8> {
612 let slice = [0x65u8, 0x88, 0x84, 0x00];
613 let mut v = (slice.len() as u32).to_be_bytes().to_vec();
614 v.extend_from_slice(&slice);
615 v
616 }
617
618 fn avc_decoder_config() -> Vec<u8> {
619 let sps = [0x67u8, 0x42, 0x00, 0x1F, 0xAA];
620 let pps = [0x68u8, 0xCE, 0x3C, 0x80];
621 let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
622 r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
623 r.extend_from_slice(&sps);
624 r.push(1);
625 r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
626 r.extend_from_slice(&pps);
627 r
628 }
629
630 async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
633 let engine = Engine::builder()
634 .application(AppSpec::new("live").gop_cache(64))
635 .build();
636 let key = StreamKey::new("live", "cam");
637 let handle = engine.start_publish(&key).await.unwrap();
638
639 let mut session = Session::new(
640 ChunkReader::new(tokio::io::empty()),
641 ChunkWriter::new(tokio::io::sink()),
642 engine.clone(),
643 Some(engine.clone()),
644 );
645 session.app = Some("live".into());
646 session.handle = Some(handle);
647 session.role = Role::Publishing;
648 (session, engine)
649 }
650
651 #[tokio::test]
652 async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
653 let (mut session, _engine) = publishing_session().await;
654
655 let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
657 session
658 .on_video(&RtmpMessage {
659 type_id: MSG_VIDEO,
660 timestamp: 0,
661 msg_stream_id: STREAM_ID,
662 payload: seq,
663 })
664 .unwrap();
665
666 let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
668 session
669 .on_video(&RtmpMessage {
670 type_id: MSG_VIDEO,
671 timestamp: 40,
672 msg_stream_id: STREAM_ID,
673 payload: kf,
674 })
675 .unwrap();
676
677 let handle = session.handle.as_ref().unwrap();
678 let (vcfg, _) = handle.cached_configs();
679 let vcfg = vcfg.expect("video config cached");
680 assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
681 assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);
683
684 let replay = handle.replay_buffer();
687 let key = replay
688 .iter()
689 .find(|f| f.is_keyframe())
690 .expect("keyframe in GOP");
691 assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
692 }
693
694 #[tokio::test]
695 async fn publish_aac_builds_adts_frames() {
696 let (mut session, _engine) = publishing_session().await;
697 let asc = [0x12u8, 0x10];
699 let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
700 session
701 .on_audio(&RtmpMessage {
702 type_id: MSG_AUDIO,
703 timestamp: 0,
704 msg_stream_id: STREAM_ID,
705 payload: seq,
706 })
707 .unwrap();
708
709 let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
710 session
711 .on_audio(&RtmpMessage {
712 type_id: MSG_AUDIO,
713 timestamp: 23,
714 msg_stream_id: STREAM_ID,
715 payload: raw,
716 })
717 .unwrap();
718
719 let handle = session.handle.as_ref().unwrap();
721 let (_, acfg) = handle.cached_configs();
722 assert_eq!(&acfg.unwrap().data[..], &asc);
723 }
724
725 #[tokio::test]
726 async fn full_publish_session_drives_real_chunk_reader() {
727 use tokio::io::AsyncReadExt;
728
729 let engine = Engine::builder()
730 .application(AppSpec::new("live").gop_cache(64))
731 .build();
732
733 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
737 let addr = listener.local_addr().unwrap();
738 let engine_for_server = engine.clone();
739 let server = tokio::spawn(async move {
740 let (sock, _) = listener.accept().await.unwrap();
741 let (r, w) = sock.into_split();
742 let mut session = Session::new(
743 ChunkReader::new(r),
744 ChunkWriter::new(w),
745 engine_for_server,
746 None,
747 );
748 session.run().await.unwrap(); let handle = session.handle.as_ref().expect("publish handle");
750 let has_config = handle.cached_configs().0.is_some();
751 let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
752 (has_config, has_keyframe)
753 });
754
755 let client = tokio::spawn(async move {
756 let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
757 let (mut read_half, write_half) = stream.into_split();
758 tokio::spawn(async move {
760 let mut buf = [0u8; 4096];
761 while let Ok(n) = read_half.read(&mut buf).await {
762 if n == 0 {
763 break;
764 }
765 }
766 });
767
768 let cmd = |values: &[Amf0Value]| {
769 let mut b = bytes::BytesMut::new();
770 for v in values {
771 v.encode(&mut b);
772 }
773 b
774 };
775 let mut w = ChunkWriter::new(write_half);
776 let connect = cmd(&[
777 amf::string("connect"),
778 Amf0Value::Number(1.0),
779 amf::object(vec![("app", amf::string("live"))]),
780 ]);
781 w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &connect)
782 .await
783 .unwrap();
784 let create = cmd(&[
785 amf::string("createStream"),
786 Amf0Value::Number(2.0),
787 Amf0Value::Null,
788 ]);
789 w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &create)
790 .await
791 .unwrap();
792 let publish = cmd(&[
793 amf::string("publish"),
794 Amf0Value::Number(3.0),
795 Amf0Value::Null,
796 amf::string("cam"),
797 amf::string("live"),
798 ]);
799 w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &publish)
800 .await
801 .unwrap();
802
803 let seq =
805 flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
806 w.write_message(CSID_VIDEO, MSG_VIDEO, 0, STREAM_ID, &seq)
807 .await
808 .unwrap();
809 let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
810 w.write_message(CSID_VIDEO, MSG_VIDEO, 40, STREAM_ID, &kf)
811 .await
812 .unwrap();
813 });
815
816 client.await.unwrap();
817 let (has_config, has_keyframe) = server.await.unwrap();
818 assert!(has_config, "video config reached the engine over the wire");
819 assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
820 }
821
822 #[test]
823 fn annexb_config_splits_sps_and_pps() {
824 let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
825 let cfg = annexb_to_avc_config(&annexb);
826 assert_eq!(cfg.sps.len(), 1);
827 assert_eq!(cfg.pps.len(), 1);
828 assert_eq!(cfg.sps[0][0] & 0x1F, 7);
829 assert_eq!(cfg.pps[0][0] & 0x1F, 8);
830 }
831}