1mod amf;
37mod chunk;
38mod flv;
39mod handshake;
40
41use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
42use crate::{MediaFrame, Result, StreamError, StreamKey};
43use amf::Amf0Value;
44use async_trait::async_trait;
45use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
46use std::net::SocketAddr;
47use std::sync::Arc;
48use tokio::net::TcpStream;
49use tokio_util::sync::CancellationToken;
50use tracing::{debug, info, warn};
51
/// TCP port RTMP servers listen on by default.
pub const DEFAULT_RTMP_PORT: u16 = 1935;

/// Chunk size announced to the peer during `connect` and then used by our
/// chunk writer for everything sent afterwards.
const OUT_CHUNK_SIZE: usize = 4096;

// RTMP message type ids, matched against `RtmpMessage::type_id` on input and
// passed to the chunk writer on output.

/// Protocol control: peer changes the chunk size it sends with.
const MSG_SET_CHUNK_SIZE: u8 = 1;
/// Protocol control: acknowledgement of received bytes.
const MSG_ACK: u8 = 3;
/// User control event (e.g. the "stream begin" event sent before playback).
const MSG_USER_CONTROL: u8 = 4;
/// Protocol control: window acknowledgement size.
const MSG_WINDOW_ACK_SIZE: u8 = 5;
/// Protocol control: peer bandwidth limit.
const MSG_SET_PEER_BANDWIDTH: u8 = 6;
/// Audio payload in FLV tag-body layout.
const MSG_AUDIO: u8 = 8;
/// Video payload in FLV tag-body layout.
const MSG_VIDEO: u8 = 9;
/// Data message encoded as AMF3.
const MSG_DATA_AMF3: u8 = 15;
/// Command message encoded as AMF3 (one leading format byte, then AMF0).
const MSG_COMMAND_AMF3: u8 = 17;
/// Data message encoded as AMF0.
const MSG_DATA_AMF0: u8 = 18;
/// Command message encoded as AMF0.
const MSG_COMMAND_AMF0: u8 = 20;

// Chunk stream ids used for outgoing messages.

/// Chunk stream carrying protocol/user control messages.
const CSID_CONTROL: u8 = 2;
/// Chunk stream carrying AMF command replies.
const CSID_COMMAND: u8 = 3;
/// Chunk stream carrying audio messages.
const CSID_AUDIO: u8 = 4;
/// Chunk stream carrying video messages.
const CSID_VIDEO: u8 = 6;

/// The single message stream id handed out by `createStream` and used for all
/// media and status messages of a session.
const STREAM_ID: u32 = 1;
80
81pub struct RtmpHandler {
90 addr: SocketAddr,
91 max_connections: usize,
92 playback: Option<Arc<dyn PlaybackRegistry>>,
93}
94
95impl RtmpHandler {
96 pub fn new(addr: SocketAddr) -> Self {
98 Self {
99 addr,
100 max_connections: 1024,
101 playback: None,
102 }
103 }
104
105 pub fn max_connections(mut self, max: usize) -> Self {
107 self.max_connections = max;
108 self
109 }
110
111 pub fn with_playback(mut self, playback: Arc<dyn PlaybackRegistry>) -> Self {
114 self.playback = Some(playback);
115 self
116 }
117}
118
119#[async_trait]
120impl crate::ProtocolHandler for RtmpHandler {
121 fn name(&self) -> &'static str {
122 "rtmp"
123 }
124
125 async fn run(
126 &self,
127 registry: Arc<dyn PublishRegistry>,
128 shutdown: CancellationToken,
129 ) -> Result<()> {
130 info!(addr = %self.addr, "rtmp handler listening");
131 let playback = self.playback.clone();
132 crate::protocol::run_tcp_ingest_server(
133 self.addr,
134 self.max_connections,
135 shutdown,
136 move |sock, peer| {
137 let registry = Arc::clone(®istry);
138 let playback = playback.clone();
139 async move {
140 if let Err(e) = handle_connection(sock, peer, registry, playback).await {
141 warn!(%peer, error = %e, "rtmp session ended with error");
142 }
143 }
144 },
145 )
146 .await
147 }
148}
149
150async fn handle_connection(
152 mut sock: TcpStream,
153 peer: SocketAddr,
154 registry: Arc<dyn PublishRegistry>,
155 playback: Option<Arc<dyn PlaybackRegistry>>,
156) -> Result<()> {
157 handshake::accept(&mut sock).await?;
158 debug!(%peer, "rtmp handshake complete");
159 let (read_half, write_half) = sock.into_split();
160 let mut session = Session::new(
161 ChunkReader::new(read_half),
162 ChunkWriter::new(write_half),
163 registry,
164 playback,
165 );
166 session.run().await
167}
168
/// What the connected client is currently doing on this session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    /// No `publish` or `play` command has been accepted yet.
    Pending,
    /// The client is pushing media; audio/video messages are ingested.
    Publishing,
    /// The client requested playback and is being sent media.
    Playing,
}
176
177struct Session<R, W> {
179 reader: ChunkReader<R>,
180 writer: ChunkWriter<W>,
181 registry: Arc<dyn PublishRegistry>,
182 playback: Option<Arc<dyn PlaybackRegistry>>,
183 app: Option<String>,
184 role: Role,
185 publish_key: Option<StreamKey>,
186 handle: Option<StreamHandle>,
187 avc: Option<flv::AvcConfig>,
188 aac: Option<flv::AudioConfig>,
189 audio_seq_sent: bool,
191 stop: bool,
192}
193
194impl<R, W> Session<R, W>
195where
196 R: tokio::io::AsyncRead + Unpin,
197 W: tokio::io::AsyncWrite + Unpin,
198{
199 fn new(
200 reader: ChunkReader<R>,
201 writer: ChunkWriter<W>,
202 registry: Arc<dyn PublishRegistry>,
203 playback: Option<Arc<dyn PlaybackRegistry>>,
204 ) -> Self {
205 Self {
206 reader,
207 writer,
208 registry,
209 playback,
210 app: None,
211 role: Role::Pending,
212 publish_key: None,
213 handle: None,
214 avc: None,
215 aac: None,
216 audio_seq_sent: false,
217 stop: false,
218 }
219 }
220
221 async fn run(&mut self) -> Result<()> {
223 let result = self.event_loop().await;
224 if let Some(key) = self.publish_key.take() {
225 let _ = self.registry.end_publish(&key).await;
226 }
227 match result {
228 Ok(()) => Ok(()),
229 Err(e) if is_disconnect(&e) => Ok(()),
230 Err(e) => Err(e),
231 }
232 }
233
234 async fn event_loop(&mut self) -> Result<()> {
235 while !self.stop {
236 let msg = self.reader.read_message().await?;
237 self.process(msg).await?;
238 }
239 Ok(())
240 }
241
242 async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
243 match msg.type_id {
244 MSG_SET_CHUNK_SIZE => {
245 if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
246 self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
247 }
248 }
249 MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
250 MSG_COMMAND_AMF3 => {
251 self.handle_command(&msg.payload[1.min(msg.payload.len())..])
254 .await?
255 }
256 MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
257 MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
258 MSG_DATA_AMF0 | MSG_DATA_AMF3 => { }
259 MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
260 other => debug!(
261 type_id = other,
262 stream = msg.msg_stream_id,
263 "ignoring RTMP message"
264 ),
265 }
266 Ok(())
267 }
268
269 async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
272 let values = amf::decode_all(payload);
273 let Some(name) = values.first().and_then(Amf0Value::as_str) else {
274 return Ok(());
275 };
276 let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
277 match name {
278 "connect" => self.on_connect(txn, values.get(2)).await,
279 "createStream" => self.on_create_stream(txn).await,
280 "publish" => self.on_publish(values.get(3)).await,
281 "play" => self.on_play(values.get(3)).await,
282 "releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
283 | "FCSubscribe" => Ok(()),
284 other => {
285 debug!(command = other, "unhandled RTMP command");
286 Ok(())
287 }
288 }
289 }
290
291 async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
292 let app = cmd_obj
293 .and_then(|o| o.get("app"))
294 .and_then(Amf0Value::as_str)
295 .unwrap_or("")
296 .split('?')
297 .next()
298 .unwrap_or("")
299 .to_string();
300 self.app = Some(app);
301
302 self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
304 .await?;
305 let mut bw = Vec::from(2_500_000u32.to_be_bytes());
306 bw.push(2); self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
308 self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
309 .await?;
310 self.writer.set_chunk_size(OUT_CHUNK_SIZE);
311
312 let props = amf::object(vec![
313 ("fmsVer", amf::string("FMS/3,0,1,123")),
314 ("capabilities", Amf0Value::Number(31.0)),
315 ]);
316 let info = amf::object(vec![
317 ("level", amf::string("status")),
318 ("code", amf::string("NetConnection.Connect.Success")),
319 ("description", amf::string("Connection succeeded.")),
320 ("objectEncoding", Amf0Value::Number(0.0)),
321 ]);
322 self.send_command(
323 0,
324 &[amf::string("_result"), Amf0Value::Number(txn), props, info],
325 )
326 .await
327 }
328
329 async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
330 self.send_command(
331 0,
332 &[
333 amf::string("_result"),
334 Amf0Value::Number(txn),
335 Amf0Value::Null,
336 Amf0Value::Number(STREAM_ID as f64),
337 ],
338 )
339 .await
340 }
341
342 async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
343 let key = self.stream_key(name)?;
344 let handle = self.registry.start_publish(&key).await?;
345 info!(stream = %key, "rtmp publish started");
346 self.handle = Some(handle);
347 self.publish_key = Some(key);
348 self.role = Role::Publishing;
349 self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
350 .await
351 }
352
353 async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
354 let key = self.stream_key(name)?;
355 let Some(playback) = self.playback.clone() else {
356 self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
357 .await?;
358 self.stop = true;
359 return Ok(());
360 };
361 let handle = playback.get_stream(&key)?;
362 info!(stream = %key, "rtmp play started");
363 self.role = Role::Playing;
364
365 let mut begin = Vec::from(0u16.to_be_bytes()); begin.extend_from_slice(&STREAM_ID.to_be_bytes());
368 self.send_control(MSG_USER_CONTROL, &begin).await?;
369 self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
370 .await?;
371 self.send_status("status", "NetStream.Play.Start", "Started playing.")
372 .await?;
373
374 self.serve_play(handle).await?;
375 self.stop = true;
376 Ok(())
377 }
378
379 fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
382 let Some(header) = flv::parse_video_header(&msg.payload) else {
383 return Ok(()); };
385 let body = &msg.payload[header.body_offset..];
386 let ts = msg.timestamp as i64;
387
388 if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
389 let cfg = flv::AvcConfig::parse(body)
390 .ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
391 let mut frame =
392 MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
393 frame.flags |= crate::FrameFlags::CONFIG;
394 self.avc = Some(cfg);
395 self.publish(frame);
396 } else if header.avc_packet_type == flv::PKT_RAW {
397 let Some(cfg) = self.avc.as_ref() else {
398 return Ok(()); };
400 let annexb = cfg
401 .avcc_to_annexb(body, header.is_keyframe)
402 .ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
403 let pts = ts + header.composition_time as i64;
404 let frame =
405 MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
406 self.publish(frame);
407 }
408 Ok(())
409 }
410
411 fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
412 let payload = &msg.payload;
413 let Some(&b0) = payload.first() else {
414 return Ok(());
415 };
416 if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
417 return Ok(()); }
419 let aac_packet_type = *payload.get(1).unwrap_or(&0);
420 let body = &payload[2.min(payload.len())..];
421 let ts = msg.timestamp as i64;
422
423 if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
424 let cfg = flv::AudioConfig::parse(body)
425 .ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
426 self.aac = Some(cfg);
427 let mut frame =
428 MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
429 frame.flags |= crate::FrameFlags::CONFIG;
430 self.publish(frame);
431 } else if let Some(cfg) = self.aac.as_ref() {
432 let adts = cfg.to_adts(body);
433 self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
434 }
435 Ok(())
436 }
437
438 fn publish(&self, frame: MediaFrame) {
439 if let Some(handle) = self.handle.as_ref() {
440 let _ = handle.publish_frame(frame);
441 }
442 }
443
444 async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
447 for frame in handle.replay_buffer() {
449 self.send_media(&frame).await?;
450 }
451 let mut sub = handle.subscribe_resilient();
452 while let Some(frame) = sub.recv().await {
453 self.send_media(&frame).await?;
454 }
455 Ok(())
456 }
457
458 async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
459 let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
460 match frame.codec {
461 crate::CodecId::H264 => {
462 let ts = frame.dts.max(0) as u32;
463 let tag = if is_config {
464 let cfg = annexb_to_avc_config(&frame.data);
465 flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
466 } else {
467 let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
468 let cts = (frame.pts - frame.dts) as i32;
469 flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
470 };
471 self.writer
472 .write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
473 .await
474 }
475 crate::CodecId::AAC => {
476 let ts = frame.pts.max(0) as u32;
477 if is_config {
478 self.audio_seq_sent = true;
479 let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
480 return self
481 .writer
482 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
483 .await;
484 }
485 if !self.audio_seq_sent {
489 if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
490 let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
491 self.writer
492 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
493 .await?;
494 self.audio_seq_sent = true;
495 }
496 }
497 let raw = frame.data.get(7..).unwrap_or(&[]);
499 let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
500 self.writer
501 .write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
502 .await
503 }
504 _ => Ok(()), }
506 }
507
508 fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
512 let app = self
513 .app
514 .clone()
515 .ok_or_else(|| StreamError::protocol("stream command before connect"))?;
516 let stream = name
517 .and_then(Amf0Value::as_str)
518 .ok_or_else(|| StreamError::protocol("missing stream name"))?
519 .split('?')
520 .next()
521 .unwrap_or("")
522 .to_string();
523 Ok(StreamKey::new(app, stream))
524 }
525
526 async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
527 self.writer
528 .write_message(CSID_CONTROL, type_id, 0, 0, payload)
529 .await
530 }
531
532 async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
533 let mut buf = bytes::BytesMut::new();
534 for v in values {
535 v.encode(&mut buf);
536 }
537 self.writer
538 .write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
539 .await
540 }
541
542 async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
543 let info = amf::object(vec![
544 ("level", amf::string(level)),
545 ("code", amf::string(code)),
546 ("description", amf::string(description)),
547 ]);
548 self.send_command(
549 STREAM_ID,
550 &[
551 amf::string("onStatus"),
552 Amf0Value::Number(0.0),
553 Amf0Value::Null,
554 info,
555 ],
556 )
557 .await
558 }
559}
560
561fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
563 let mut cfg = flv::AvcConfig {
564 nal_length_size: 4,
565 ..Default::default()
566 };
567 for nal in crate::codec::h264::iter_nals_annexb(annexb) {
568 match nal.first().map(|b| b & 0x1F) {
569 Some(7) => cfg.sps.push(nal.to_vec()),
570 Some(8) => cfg.pps.push(nal.to_vec()),
571 _ => {}
572 }
573 }
574 cfg
575}
576
577fn is_disconnect(e: &StreamError) -> bool {
579 matches!(e, StreamError::Io(io) if matches!(
580 io.kind(),
581 std::io::ErrorKind::UnexpectedEof
582 | std::io::ErrorKind::ConnectionReset
583 | std::io::ErrorKind::BrokenPipe
584 | std::io::ErrorKind::ConnectionAborted
585 ))
586}
587
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AppSpec, Engine};
    use bytes::Bytes;

    /// One IDR slice NAL in AVCC framing: 4-byte big-endian length, then the NAL.
    fn avcc_idr() -> Vec<u8> {
        const IDR_SLICE: [u8; 4] = [0x65, 0x88, 0x84, 0x00];
        let mut framed = Vec::with_capacity(4 + IDR_SLICE.len());
        framed.extend_from_slice(&(IDR_SLICE.len() as u32).to_be_bytes());
        framed.extend_from_slice(&IDR_SLICE);
        framed
    }

    /// A minimal decoder configuration record holding one SPS and one PPS.
    fn avc_decoder_config() -> Vec<u8> {
        const SPS: [u8; 5] = [0x67, 0x42, 0x00, 0x1F, 0xAA];
        const PPS: [u8; 4] = [0x68, 0xCE, 0x3C, 0x80];

        let mut record = Vec::new();
        // Version, three bytes copied from the SPS, then two fixed bytes.
        record.extend_from_slice(&[1, SPS[1], SPS[2], SPS[3], 0xFF, 0xE1]);
        record.extend_from_slice(&(SPS.len() as u16).to_be_bytes());
        record.extend_from_slice(&SPS);
        // One PPS follows.
        record.push(1);
        record.extend_from_slice(&(PPS.len() as u16).to_be_bytes());
        record.extend_from_slice(&PPS);
        record
    }

    /// Builds a session over empty/sink transports that is already publishing
    /// `live/cam` into a fresh engine.
    async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(64))
            .build();
        let handle = engine
            .start_publish(&StreamKey::new("live", "cam"))
            .await
            .unwrap();

        let reader = ChunkReader::new(tokio::io::empty());
        let writer = ChunkWriter::new(tokio::io::sink());
        let mut session = Session::new(reader, writer, engine.clone(), Some(engine.clone()));
        session.app = Some("live".into());
        session.handle = Some(handle);
        session.role = Role::Publishing;
        (session, engine)
    }

    #[tokio::test]
    async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
        let (mut session, _engine) = publishing_session().await;

        let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
        let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
        for (timestamp, payload) in [(0, seq), (40, kf)] {
            session
                .on_video(&RtmpMessage {
                    type_id: MSG_VIDEO,
                    timestamp,
                    msg_stream_id: STREAM_ID,
                    payload,
                })
                .unwrap();
        }

        let handle = session.handle.as_ref().unwrap();
        let vcfg = handle.cached_configs().0.expect("video config cached");
        assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
        // The cached config is Annex B: start code followed by the SPS.
        assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);

        let replay = handle.replay_buffer();
        let key = replay
            .iter()
            .find(|f| f.is_keyframe())
            .expect("keyframe in GOP");
        assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
    }

    #[tokio::test]
    async fn publish_aac_builds_adts_frames() {
        let (mut session, _engine) = publishing_session().await;
        let asc = [0x12u8, 0x10];

        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
        let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
        for (timestamp, payload) in [(0, seq), (23, raw)] {
            session
                .on_audio(&RtmpMessage {
                    type_id: MSG_AUDIO,
                    timestamp,
                    msg_stream_id: STREAM_ID,
                    payload,
                })
                .unwrap();
        }

        let handle = session.handle.as_ref().unwrap();
        let audio_cfg = handle.cached_configs().1.unwrap();
        assert_eq!(&audio_cfg.data[..], &asc);
    }

    #[tokio::test]
    async fn full_publish_session_drives_real_chunk_reader() {
        use tokio::io::AsyncReadExt;

        /// Encodes AMF0 values back to back into one command payload.
        fn encode(values: &[Amf0Value]) -> bytes::BytesMut {
            let mut out = bytes::BytesMut::new();
            for value in values {
                value.encode(&mut out);
            }
            out
        }

        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(64))
            .build();

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Server side: a session without the handshake, run until the client
        // disconnects, then report what reached the engine.
        let engine_for_server = engine.clone();
        let server = tokio::spawn(async move {
            let (sock, _) = listener.accept().await.unwrap();
            let (r, w) = sock.into_split();
            let mut session = Session::new(
                ChunkReader::new(r),
                ChunkWriter::new(w),
                engine_for_server,
                None,
            );
            session.run().await.unwrap();
            let handle = session.handle.as_ref().expect("publish handle");
            let has_config = handle.cached_configs().0.is_some();
            let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
            (has_config, has_keyframe)
        });

        let client = tokio::spawn(async move {
            let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (mut read_half, write_half) = stream.into_split();
            // Drain server replies until EOF or a read error.
            tokio::spawn(async move {
                let mut buf = [0u8; 4096];
                while matches!(read_half.read(&mut buf).await, Ok(n) if n > 0) {}
            });

            let mut w = ChunkWriter::new(write_half);

            let connect = encode(&[
                amf::string("connect"),
                Amf0Value::Number(1.0),
                amf::object(vec![("app", amf::string("live"))]),
            ]);
            let create = encode(&[
                amf::string("createStream"),
                Amf0Value::Number(2.0),
                Amf0Value::Null,
            ]);
            let publish = encode(&[
                amf::string("publish"),
                Amf0Value::Number(3.0),
                Amf0Value::Null,
                amf::string("cam"),
                amf::string("live"),
            ]);
            for command in [connect, create, publish] {
                w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &command)
                    .await
                    .unwrap();
            }

            let seq =
                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
            let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
            for (timestamp, tag) in [(0, seq), (40, kf)] {
                w.write_message(CSID_VIDEO, MSG_VIDEO, timestamp, STREAM_ID, &tag)
                    .await
                    .unwrap();
            }
        });

        client.await.unwrap();
        let (has_config, has_keyframe) = server.await.unwrap();
        assert!(has_config, "video config reached the engine over the wire");
        assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
    }

    #[test]
    fn annexb_config_splits_sps_and_pps() {
        let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
        let cfg = annexb_to_avc_config(&annexb);
        assert_eq!(cfg.sps.len(), 1);
        assert_eq!(cfg.pps.len(), 1);
        assert_eq!(cfg.sps[0][0] & 0x1F, 7);
        assert_eq!(cfg.pps[0][0] & 0x1F, 8);
    }
}