1use std::collections::VecDeque;
84use std::net::ToSocketAddrs;
85use std::time::Duration;
86
87use oxideav_core::{
88 BytesSource, CodecId, CodecParameters, Error as CoreError, Packet, PacketSource,
89 Result as CoreResult, SourceRegistry, StreamInfo, TimeBase,
90};
91
92use crate::amf::Amf0Value;
93use crate::error::{Error as RtmpError, Result as RtmpResult};
94use crate::flv::{
95 self, AudioTag, VideoTag, AAC_PACKET_TYPE_SEQUENCE_HEADER, AUDIO_FORMAT_AAC,
96 AUDIO_FORMAT_ADPCM, AUDIO_FORMAT_G711_ALAW, AUDIO_FORMAT_G711_MULAW, AUDIO_FORMAT_MP3,
97 AUDIO_FORMAT_NELLYMOSER, AUDIO_FORMAT_NELLYMOSER_16K_MONO, AUDIO_FORMAT_NELLYMOSER_8K_MONO,
98 AUDIO_FORMAT_PCM_LE, AUDIO_FORMAT_PCM_LE_8BIT, AUDIO_FORMAT_SPEEX, VIDEO_CODEC_AVC,
99 VIDEO_CODEC_H263, VIDEO_CODEC_SCREEN, VIDEO_CODEC_SCREEN_V2, VIDEO_CODEC_VP6, VIDEO_CODEC_VP6A,
100};
101use crate::server::{RtmpServer, RtmpSession, StreamPacket};
102
/// Stream index stamped on every audio stream and audio packet produced here.
pub const AUDIO_STREAM_INDEX: u32 = 0;
/// Stream index stamped on every video stream and video packet produced here.
pub const VIDEO_STREAM_INDEX: u32 = 1;
/// Time base used for all streams and packets of this source: 1 / 1_000_000_000,
/// i.e. timestamps are expressed in nanosecond ticks.
pub const RTMP_TIME_BASE: TimeBase = TimeBase::new(1, 1_000_000_000);

/// Factor that scales a millisecond RTMP timestamp into [`RTMP_TIME_BASE`] ticks.
pub const RTMP_MS_TO_NS: i64 = 1_000_000;

/// Maximum number of session events `from_session_with_probe` reads while
/// looking for the first audio and video packets.
pub const PROBE_LIMIT: usize = 32;

/// Read timeout [`open_rtmp`] hands to the probe so a quiet publisher cannot
/// stall stream discovery indefinitely.
pub const PROBE_READ_TIMEOUT: Duration = Duration::from_secs(10);
130
/// A packet that was read during stream probing and is waiting to be handed to
/// the consumer.
struct BufferedPacket {
    /// The already-converted packet; returned as-is when it is popped.
    packet: Packet,
    /// `true` when the packet came from an audio event, `false` for video.
    /// Not read anywhere in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    is_audio: bool,
}
140
141pub struct RtmpPacketSource {
149 session: RtmpSession,
150 streams: Vec<StreamInfo>,
151 metadata: Vec<(String, String)>,
152 buffered: VecDeque<BufferedPacket>,
153 ended: bool,
158}
159
160impl RtmpPacketSource {
161 pub fn from_session(session: RtmpSession) -> Self {
166 Self {
167 session,
168 streams: Vec::new(),
169 metadata: Vec::new(),
170 buffered: VecDeque::new(),
171 ended: false,
172 }
173 }
174
175 pub fn from_session_with_probe(
186 mut session: RtmpSession,
187 read_timeout: Option<Duration>,
188 ) -> RtmpResult<Self> {
189 if let Some(d) = read_timeout {
190 let _ = session.set_read_timeout(Some(d));
192 }
193 let mut streams: Vec<StreamInfo> = Vec::new();
194 let mut metadata: Vec<(String, String)> = Vec::new();
195 let mut buffered: VecDeque<BufferedPacket> = VecDeque::new();
196 let mut have_audio = false;
197 let mut have_video = false;
198 let mut ended = false;
199
200 for _ in 0..PROBE_LIMIT {
201 if have_audio && have_video {
202 break;
203 }
204 let next = match session.next_packet() {
205 Ok(Some(p)) => p,
206 Ok(None) => {
207 ended = true;
208 break;
209 }
210 Err(RtmpError::Io(e))
211 if matches!(
212 e.kind(),
213 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
214 ) =>
215 {
216 break;
219 }
220 Err(e) => return Err(e),
221 };
222 match next {
223 StreamPacket::Audio { timestamp, tag } => {
224 if !have_audio {
225 let params = audio_codec_params(&tag);
226 streams.push(StreamInfo {
227 index: AUDIO_STREAM_INDEX,
228 time_base: RTMP_TIME_BASE,
229 duration: None,
230 start_time: None,
231 params,
232 });
233 have_audio = true;
234 }
235 let pkt = audio_to_packet(timestamp, &tag);
236 buffered.push_back(BufferedPacket {
237 packet: pkt,
238 is_audio: true,
239 });
240 }
241 StreamPacket::Video { timestamp, tag } => {
242 if !have_video {
243 let params = video_codec_params(&tag);
244 streams.push(StreamInfo {
245 index: VIDEO_STREAM_INDEX,
246 time_base: RTMP_TIME_BASE,
247 duration: None,
248 start_time: None,
249 params,
250 });
251 have_video = true;
252 }
253 let pkt = video_to_packet(timestamp, &tag);
254 buffered.push_back(BufferedPacket {
255 packet: pkt,
256 is_audio: false,
257 });
258 }
259 StreamPacket::Metadata(value) => {
260 flatten_metadata(&value, &mut metadata);
261 }
262 }
263 }
264
265 let _ = session.set_read_timeout(None);
268
269 streams.sort_by_key(|s| s.index);
271
272 Ok(Self {
273 session,
274 streams,
275 metadata,
276 buffered,
277 ended,
278 })
279 }
280
281 pub fn session(&self) -> &RtmpSession {
286 &self.session
287 }
288}
289
290impl PacketSource for RtmpPacketSource {
291 fn streams(&self) -> &[StreamInfo] {
292 &self.streams
293 }
294
295 fn next_packet(&mut self) -> CoreResult<Packet> {
296 if let Some(buf) = self.buffered.pop_front() {
297 return Ok(buf.packet);
298 }
299 if self.ended {
300 return Err(CoreError::Eof);
301 }
302 loop {
303 let event = self.session.next_packet().map_err(rtmp_to_core_err)?;
304 match event {
305 Some(StreamPacket::Audio { timestamp, tag }) => {
306 if self.streams.iter().all(|s| s.index != AUDIO_STREAM_INDEX) {
307 let params = audio_codec_params(&tag);
308 self.streams.push(StreamInfo {
309 index: AUDIO_STREAM_INDEX,
310 time_base: RTMP_TIME_BASE,
311 duration: None,
312 start_time: None,
313 params,
314 });
315 self.streams.sort_by_key(|s| s.index);
316 }
317 return Ok(audio_to_packet(timestamp, &tag));
318 }
319 Some(StreamPacket::Video { timestamp, tag }) => {
320 if self.streams.iter().all(|s| s.index != VIDEO_STREAM_INDEX) {
321 let params = video_codec_params(&tag);
322 self.streams.push(StreamInfo {
323 index: VIDEO_STREAM_INDEX,
324 time_base: RTMP_TIME_BASE,
325 duration: None,
326 start_time: None,
327 params,
328 });
329 self.streams.sort_by_key(|s| s.index);
330 }
331 return Ok(video_to_packet(timestamp, &tag));
332 }
333 Some(StreamPacket::Metadata(value)) => {
334 flatten_metadata(&value, &mut self.metadata);
335 continue;
337 }
338 None => {
339 self.ended = true;
340 return Err(CoreError::Eof);
341 }
342 }
343 }
344 }
345
346 fn metadata(&self) -> &[(String, String)] {
347 &self.metadata
348 }
349
350 fn duration_micros(&self) -> Option<i64> {
351 None
353 }
354}
355
356pub fn audio_to_packet(timestamp_ms: u32, tag: &AudioTag) -> Packet {
384 let ts_ns = (timestamp_ms as i64) * RTMP_MS_TO_NS;
385 let nano_offset = tag.timestamp_offset_nano() as i64;
386 let presentation_ns = ts_ns + nano_offset;
387 let (data, is_header) = if tag.audio_fourcc.is_some() {
388 let header = matches!(
394 tag.ex_packet_type,
395 Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START) | Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_END)
396 );
397 (tag.body.clone(), header)
398 } else {
399 let mut data = Vec::with_capacity(tag.body.len() + 1);
400 if tag.sound_format == AUDIO_FORMAT_AAC {
401 data.push(tag.aac_packet_type.unwrap_or(flv::AAC_PACKET_TYPE_RAW));
402 }
403 data.extend_from_slice(&tag.body);
404 let header = tag.sound_format == AUDIO_FORMAT_AAC
405 && tag.aac_packet_type == Some(AAC_PACKET_TYPE_SEQUENCE_HEADER);
406 (data, header)
407 };
408 let flags = oxideav_core::packet::PacketFlags {
409 header: is_header,
410 ..Default::default()
411 };
412 Packet {
413 stream_index: AUDIO_STREAM_INDEX,
414 time_base: RTMP_TIME_BASE,
415 pts: Some(presentation_ns),
416 dts: Some(presentation_ns),
417 duration: None,
418 flags,
419 data,
420 }
421}
422
423pub fn video_to_packet(timestamp_ms: u32, tag: &VideoTag) -> Packet {
442 let dts_ns = (timestamp_ms as i64) * RTMP_MS_TO_NS;
443 let has_cts =
453 tag.codec_id == VIDEO_CODEC_AVC || (tag.fourcc.is_some() && tag.composition_time != 0);
454 let cts_ns = if has_cts {
455 (tag.composition_time as i64) * RTMP_MS_TO_NS
456 } else {
457 0
458 };
459 let nano_offset = tag.timestamp_offset_nano() as i64;
464 let pts_ns = dts_ns + cts_ns + nano_offset;
465 let is_header = tag.is_avc_sequence_header() || tag.is_ex_sequence_header();
470 let is_metadata = tag.is_ex_metadata();
477 let flags = oxideav_core::packet::PacketFlags {
478 keyframe: !is_metadata && tag.is_keyframe(),
479 header: is_header || is_metadata,
480 ..Default::default()
481 };
482 Packet {
483 stream_index: VIDEO_STREAM_INDEX,
484 time_base: RTMP_TIME_BASE,
485 pts: Some(pts_ns),
486 dts: Some(dts_ns),
487 duration: None,
488 flags,
489 data: tag.body.clone(),
490 }
491}
492
493pub fn audio_codec_id(sound_format: u8) -> CodecId {
503 let s = match sound_format {
504 AUDIO_FORMAT_PCM_LE => "pcm_s16le",
505 AUDIO_FORMAT_ADPCM => "adpcm_swf",
506 AUDIO_FORMAT_MP3 => "mp3",
507 AUDIO_FORMAT_PCM_LE_8BIT => "pcm_u8",
508 AUDIO_FORMAT_NELLYMOSER_16K_MONO => "nellymoser",
509 AUDIO_FORMAT_NELLYMOSER_8K_MONO => "nellymoser",
510 AUDIO_FORMAT_NELLYMOSER => "nellymoser",
511 AUDIO_FORMAT_G711_ALAW => "pcm_alaw",
512 AUDIO_FORMAT_G711_MULAW => "pcm_mulaw",
513 AUDIO_FORMAT_AAC => "aac",
514 AUDIO_FORMAT_SPEEX => "speex",
515 _ => "unknown",
516 };
517 CodecId::new(s)
518}
519
520pub fn audio_fourcc_codec_id(fourcc: [u8; 4]) -> CodecId {
525 let s = match &fourcc {
526 b"Opus" => "opus",
527 b"fLaC" => "flac",
528 b"ac-3" => "ac3",
529 b"ec-3" => "eac3",
530 b".mp3" => "mp3",
531 b"mp4a" => "aac",
532 _ => "unknown",
533 };
534 CodecId::new(s)
535}
536
537pub fn audio_codec_id_for_tag(tag: &AudioTag) -> CodecId {
541 if let Some(fcc) = tag.audio_fourcc {
542 audio_fourcc_codec_id(fcc)
543 } else {
544 audio_codec_id(tag.sound_format)
545 }
546}
547
548pub fn video_codec_id(codec_id: u8) -> CodecId {
553 let s = match codec_id {
554 VIDEO_CODEC_H263 => "h263",
555 VIDEO_CODEC_SCREEN => "flashsv",
556 VIDEO_CODEC_VP6 => "vp6f",
557 VIDEO_CODEC_VP6A => "vp6a",
558 VIDEO_CODEC_SCREEN_V2 => "flashsv2",
559 VIDEO_CODEC_AVC => "h264",
560 _ => "unknown",
561 };
562 CodecId::new(s)
563}
564
565pub fn video_fourcc_codec_id(fourcc: [u8; 4]) -> CodecId {
572 let s = match &fourcc {
573 b"av01" => "av1",
574 b"vp09" => "vp9",
575 b"hvc1" => "hevc",
576 b"vp08" => "vp8",
578 b"avc1" => "h264",
579 b"vvc1" => "vvc",
580 _ => "unknown",
581 };
582 CodecId::new(s)
583}
584
585pub fn video_codec_id_for_tag(tag: &VideoTag) -> CodecId {
589 if let Some(fcc) = tag.fourcc {
590 video_fourcc_codec_id(fcc)
591 } else {
592 video_codec_id(tag.codec_id)
593 }
594}
595
596fn audio_codec_params(tag: &AudioTag) -> CodecParameters {
614 let mut p = CodecParameters::audio(audio_codec_id_for_tag(tag));
615 if tag.audio_fourcc.is_some() {
616 if tag.ex_packet_type == Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START) {
617 p.extradata = tag.body.clone();
618 }
619 return p;
620 }
621 if tag.sound_format != AUDIO_FORMAT_AAC {
622 let rate = match tag.sound_rate {
624 0 => 5_512,
625 1 => 11_025,
626 2 => 22_050,
627 _ => 44_100,
628 };
629 p.sample_rate = Some(rate);
630 p.channels = Some(if tag.stereo { 2 } else { 1 });
631 }
632 if tag.sound_format == AUDIO_FORMAT_AAC
633 && tag.aac_packet_type == Some(AAC_PACKET_TYPE_SEQUENCE_HEADER)
634 {
635 p.extradata = tag.body.clone();
636 }
637 p
638}
639
640fn video_codec_params(tag: &VideoTag) -> CodecParameters {
646 let mut p = CodecParameters::video(video_codec_id_for_tag(tag));
647 if tag.is_avc_sequence_header() || tag.is_ex_sequence_header() {
656 p.extradata = tag.body.clone();
657 }
658 p
659}
660
661fn flatten_metadata(value: &Amf0Value, out: &mut Vec<(String, String)>) {
666 let pairs: &[(String, Amf0Value)] = match value {
667 Amf0Value::Object(p) => p.as_slice(),
668 Amf0Value::EcmaArray(p) => p.as_slice(),
669 _ => return,
670 };
671 for (k, v) in pairs {
672 let s = match v {
673 Amf0Value::Number(n) => format!("{n}"),
674 Amf0Value::Boolean(b) => b.to_string(),
675 Amf0Value::String(s) => s.clone(),
676 _ => continue,
679 };
680 out.push((k.clone(), s));
681 }
682}
683
684pub(crate) fn rtmp_to_core_err(e: RtmpError) -> CoreError {
689 match e {
690 RtmpError::Io(io) => CoreError::Io(io),
691 RtmpError::UnexpectedEof => CoreError::Eof,
692 RtmpError::Timeout => CoreError::Other("rtmp: timeout".to_string()),
693 RtmpError::Rejected(r) => CoreError::Other(format!("rtmp: rejected: {r}")),
694 RtmpError::ProtocolViolation(m) => CoreError::InvalidData(format!("rtmp protocol: {m}")),
695 RtmpError::InvalidAmf0(m) => CoreError::InvalidData(format!("rtmp amf0: {m}")),
696 RtmpError::InvalidChunk(m) => CoreError::InvalidData(format!("rtmp chunk: {m}")),
697 RtmpError::InvalidCommand(m) => CoreError::InvalidData(format!("rtmp command: {m}")),
698 RtmpError::UnsupportedHandshakeVersion(v) => {
699 CoreError::Unsupported(format!("rtmp handshake version 0x{v:02x}"))
700 }
701 RtmpError::Other(m) => CoreError::Other(format!("rtmp: {m}")),
702 }
703}
704
705#[derive(Debug, Clone)]
712struct ListenUrl {
713 bind_addr: String,
714 expected_app: String,
715 expected_stream: String,
716}
717
718impl ListenUrl {
719 fn parse(uri: &str) -> CoreResult<Self> {
720 let s = uri
721 .strip_prefix("rtmp://")
722 .ok_or_else(|| CoreError::InvalidData(format!("not an rtmp:// URL: {uri}")))?;
723 let slash = s
724 .find('/')
725 .ok_or_else(|| CoreError::InvalidData(format!("rtmp URL missing /app: {uri}")))?;
726 let authority = &s[..slash];
727 let path = &s[slash + 1..];
728 let (host, port_str) = match authority.rsplit_once(':') {
729 Some((h, p)) => (h, p),
730 None => (authority, "1935"),
731 };
732 let port: u16 = port_str
733 .parse()
734 .map_err(|e| CoreError::InvalidData(format!("rtmp URL bad port {port_str:?}: {e}")))?;
735 let bind_host = if host.is_empty() { "0.0.0.0" } else { host };
736 let bind_addr = format!("{bind_host}:{port}");
737 let (app, stream_name) = match path.find('/') {
738 Some(i) => (path[..i].to_owned(), path[i + 1..].to_owned()),
739 None => (path.to_owned(), String::new()),
740 };
741 Ok(Self {
742 bind_addr,
743 expected_app: app,
744 expected_stream: stream_name,
745 })
746 }
747}
748
749pub fn open_rtmp(uri: &str) -> CoreResult<Box<dyn PacketSource>> {
761 let url = ListenUrl::parse(uri)?;
762 let resolved = url
765 .bind_addr
766 .to_socket_addrs()
767 .map_err(CoreError::Io)?
768 .next()
769 .ok_or_else(|| {
770 CoreError::InvalidData(format!("rtmp URL resolved no addresses: {}", url.bind_addr))
771 })?;
772 let server = RtmpServer::bind(resolved).map_err(rtmp_to_core_err)?;
773 let req = server.accept().map_err(rtmp_to_core_err)?;
774 if !url.expected_app.is_empty() && req.app != url.expected_app {
775 let actual = req.app.clone();
776 let expected = url.expected_app.clone();
777 let _ = req.reject("unexpected app");
778 return Err(CoreError::InvalidData(format!(
779 "rtmp publisher app mismatch: expected {expected:?}, got {actual:?}"
780 )));
781 }
782 if !url.expected_stream.is_empty() && req.stream_name != url.expected_stream {
783 let actual = req.stream_name.clone();
784 let expected = url.expected_stream.clone();
785 let _ = req.reject("unexpected stream key");
786 return Err(CoreError::InvalidData(format!(
787 "rtmp publisher stream-name mismatch: expected {expected:?}, got {actual:?}"
788 )));
789 }
790 let session = req.accept().map_err(rtmp_to_core_err)?;
791 let source = RtmpPacketSource::from_session_with_probe(session, Some(PROBE_READ_TIMEOUT))
792 .map_err(rtmp_to_core_err)?;
793 Ok(Box::new(source))
794}
795
/// Registers [`open_rtmp`] with `registry` as the packet-source opener for
/// the `"rtmp"` key (presumably the URI scheme — confirm against
/// `SourceRegistry::register_packets`).
pub fn register(registry: &mut SourceRegistry) {
    registry.register_packets("rtmp", open_rtmp);
}
805
// NOTE(review): never called in this file. It appears to exist only so the
// `BytesSource` import stays referenced, hence the `dead_code` allowance —
// confirm before removing either.
#[allow(dead_code)]
fn _bytes_source_anchor(_: Box<dyn BytesSource>) {}
810
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flv::{
        AAC_PACKET_TYPE_RAW, AUDIO_FORMAT_EX_HEADER, AVC_PACKET_TYPE_NALU,
        AVC_PACKET_TYPE_SEQUENCE_HEADER, EX_PACKET_TYPE_CODED_FRAMES, EX_PACKET_TYPE_METADATA,
        EX_PACKET_TYPE_SEQUENCE_START, FOURCC_AV1, FOURCC_HEVC, FOURCC_VP9, VIDEO_FRAME_INTER,
        VIDEO_FRAME_KEYFRAME,
    };

    // ---- fixtures -------------------------------------------------------
    //
    // Each builder returns a complete tag; tests override only the fields
    // they care about with struct-update syntax.

    /// Legacy AAC tag: rate index 3, 16-bit, stereo, no packet type, empty body.
    fn legacy_audio_tag() -> AudioTag {
        AudioTag {
            mod_ex: Vec::new(),
            sound_format: AUDIO_FORMAT_AAC,
            sound_rate: 3,
            sound_size_16bit: true,
            stereo: true,
            aac_packet_type: None,
            body: Vec::new(),
            ex_packet_type: None,
            audio_fourcc: None,
            multitrack: None,
        }
    }

    /// FourCC-tagged coded-frames audio tag with an empty body.
    fn ex_audio_tag(fourcc: [u8; 4]) -> AudioTag {
        AudioTag {
            mod_ex: Vec::new(),
            sound_format: AUDIO_FORMAT_EX_HEADER,
            sound_rate: 0,
            sound_size_16bit: false,
            stereo: false,
            aac_packet_type: None,
            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_CODED_FRAMES),
            audio_fourcc: Some(fourcc),
            body: Vec::new(),
            multitrack: None,
        }
    }

    /// Legacy AVC keyframe carrying a NALU packet, zero CTS, empty body.
    fn legacy_video_tag() -> VideoTag {
        VideoTag {
            mod_ex: Vec::new(),
            frame_type: VIDEO_FRAME_KEYFRAME,
            codec_id: VIDEO_CODEC_AVC,
            avc_packet_type: Some(AVC_PACKET_TYPE_NALU),
            composition_time: 0,
            body: Vec::new(),
            ex_packet_type: None,
            fourcc: None,
            multitrack: None,
        }
    }

    /// FourCC-tagged coded-frames keyframe, zero CTS, empty body.
    fn ex_video_tag(fourcc: [u8; 4]) -> VideoTag {
        VideoTag {
            mod_ex: Vec::new(),
            frame_type: VIDEO_FRAME_KEYFRAME,
            codec_id: 0,
            avc_packet_type: None,
            composition_time: 0,
            body: Vec::new(),
            ex_packet_type: Some(EX_PACKET_TYPE_CODED_FRAMES),
            fourcc: Some(fourcc),
            multitrack: None,
        }
    }

    // ---- codec id tables -------------------------------------------------

    #[test]
    fn audio_codec_id_maps_aac_and_mp3() {
        assert_eq!(audio_codec_id(AUDIO_FORMAT_AAC).as_str(), "aac");
        assert_eq!(audio_codec_id(AUDIO_FORMAT_MP3).as_str(), "mp3");
        assert_eq!(audio_codec_id(AUDIO_FORMAT_PCM_LE).as_str(), "pcm_s16le");
        assert_eq!(audio_codec_id(0xFF).as_str(), "unknown");
    }

    #[test]
    fn video_codec_id_maps_avc_and_h263() {
        assert_eq!(video_codec_id(VIDEO_CODEC_AVC).as_str(), "h264");
        assert_eq!(video_codec_id(VIDEO_CODEC_H263).as_str(), "h263");
        assert_eq!(video_codec_id(VIDEO_CODEC_VP6).as_str(), "vp6f");
        assert_eq!(video_codec_id(0xFF).as_str(), "unknown");
    }

    // ---- legacy audio packets ---------------------------------------------

    #[test]
    fn audio_aac_seq_header_packet_carries_marker_and_header_flag() {
        let tag = AudioTag {
            aac_packet_type: Some(AAC_PACKET_TYPE_SEQUENCE_HEADER),
            body: vec![0x12, 0x10],
            ..legacy_audio_tag()
        };
        let pkt = audio_to_packet(0, &tag);
        assert_eq!(pkt.stream_index, AUDIO_STREAM_INDEX);
        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
        assert_eq!(pkt.pts, Some(0));
        assert_eq!(pkt.dts, Some(0));
        assert!(pkt.flags.header);
        assert_eq!(pkt.data, vec![0x00, 0x12, 0x10]);
    }

    #[test]
    fn audio_aac_raw_packet_keeps_packet_type_byte() {
        let tag = AudioTag {
            aac_packet_type: Some(AAC_PACKET_TYPE_RAW),
            body: vec![0xAB, 0xCD, 0xEF],
            ..legacy_audio_tag()
        };
        let pkt = audio_to_packet(123, &tag);
        assert_eq!(pkt.pts, Some(123 * RTMP_MS_TO_NS));
        assert_eq!(pkt.dts, Some(123 * RTMP_MS_TO_NS));
        assert!(!pkt.flags.header);
        assert_eq!(pkt.data, vec![0x01, 0xAB, 0xCD, 0xEF]);
    }

    #[test]
    fn audio_mp3_packet_strips_flv_header_only() {
        let tag = AudioTag {
            sound_format: AUDIO_FORMAT_MP3,
            body: vec![0xFF, 0xFB, 0x90, 0x00],
            ..legacy_audio_tag()
        };
        let pkt = audio_to_packet(40, &tag);
        assert_eq!(pkt.data, vec![0xFF, 0xFB, 0x90, 0x00]);
        assert_eq!(pkt.pts, Some(40 * RTMP_MS_TO_NS));
    }

    // ---- FourCC audio -----------------------------------------------------

    #[test]
    fn audio_codec_id_for_tag_dispatches_legacy_vs_fourcc() {
        let legacy_aac = AudioTag {
            aac_packet_type: Some(AAC_PACKET_TYPE_SEQUENCE_HEADER),
            ..legacy_audio_tag()
        };
        assert_eq!(audio_codec_id_for_tag(&legacy_aac).as_str(), "aac");

        let cases = [
            (flv::FOURCC_OPUS, "opus"),
            (flv::FOURCC_FLAC, "flac"),
            (flv::FOURCC_AC3, "ac3"),
            (flv::FOURCC_EAC3, "eac3"),
            (flv::FOURCC_MP3, "mp3"),
            (flv::FOURCC_AAC, "aac"),
        ];
        for (fcc, expected) in cases {
            let t = ex_audio_tag(fcc);
            assert_eq!(audio_codec_id_for_tag(&t).as_str(), expected);
        }
    }

    #[test]
    fn audio_fourcc_codec_id_unknown_collapses() {
        assert_eq!(audio_fourcc_codec_id(*b"xxxx").as_str(), "unknown");
    }

    #[test]
    fn ex_opus_sequence_start_packet_sets_header_flag_and_strips_fourcc() {
        let tag = AudioTag {
            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START),
            body: b"OpusHead\x01\x02\x38\x01\x80\xbb\x00\x00\x00\x00\x00".to_vec(),
            ..ex_audio_tag(flv::FOURCC_OPUS)
        };
        let pkt = audio_to_packet(0, &tag);
        assert_eq!(pkt.stream_index, AUDIO_STREAM_INDEX);
        assert!(pkt.flags.header);
        assert_eq!(
            pkt.data,
            b"OpusHead\x01\x02\x38\x01\x80\xbb\x00\x00\x00\x00\x00".to_vec()
        );
    }

    #[test]
    fn ex_ac3_coded_frames_packet_strips_fourcc_and_keeps_body() {
        let tag = AudioTag {
            body: vec![0x0B, 0x77, 0xAB, 0xCD, 0xEF],
            ..ex_audio_tag(flv::FOURCC_AC3)
        };
        let pkt = audio_to_packet(200, &tag);
        assert!(!pkt.flags.header);
        assert_eq!(pkt.dts, Some(200 * RTMP_MS_TO_NS));
        assert_eq!(pkt.pts, Some(200 * RTMP_MS_TO_NS));
        assert_eq!(pkt.data, vec![0x0B, 0x77, 0xAB, 0xCD, 0xEF]);
    }

    #[test]
    fn ex_audio_sequence_end_packet_flagged_header_with_empty_body() {
        let tag = AudioTag {
            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_END),
            ..ex_audio_tag(flv::FOURCC_OPUS)
        };
        let pkt = audio_to_packet(999, &tag);
        assert!(pkt.flags.header);
        assert!(pkt.data.is_empty());
        assert_eq!(pkt.dts, Some(999 * RTMP_MS_TO_NS));
    }

    #[test]
    fn ex_audio_codec_params_copies_sequence_start_to_extradata() {
        let tag = AudioTag {
            ex_packet_type: Some(flv::AUDIO_PACKET_TYPE_SEQUENCE_START),
            body: b"fLaC\x80\x00\x00\x22streaminfo-body-bytes".to_vec(),
            ..ex_audio_tag(flv::FOURCC_FLAC)
        };
        let p = audio_codec_params(&tag);
        assert_eq!(p.codec_id.as_str(), "flac");
        assert_eq!(p.extradata, tag.body);
        assert_eq!(p.sample_rate, None);
        assert_eq!(p.channels, None);
    }

    #[test]
    fn ex_audio_codec_params_ac3_coded_frames_leaves_extradata_empty() {
        let tag = AudioTag {
            body: vec![0x0B, 0x77, 0xAB, 0xCD],
            ..ex_audio_tag(flv::FOURCC_AC3)
        };
        let p = audio_codec_params(&tag);
        assert_eq!(p.codec_id.as_str(), "ac3");
        assert!(p.extradata.is_empty());
    }

    // ---- legacy video packets ---------------------------------------------

    #[test]
    fn video_avc_keyframe_packet_keyframe_flag_and_no_pts_offset() {
        let tag = VideoTag {
            body: b"\x00\x00\x00\x05hello".to_vec(),
            ..legacy_video_tag()
        };
        let pkt = video_to_packet(33, &tag);
        assert_eq!(pkt.stream_index, VIDEO_STREAM_INDEX);
        assert!(pkt.flags.keyframe);
        assert!(!pkt.flags.header);
        assert_eq!(pkt.pts, Some(33 * RTMP_MS_TO_NS));
        assert_eq!(pkt.dts, Some(33 * RTMP_MS_TO_NS));
        assert_eq!(pkt.data, b"\x00\x00\x00\x05hello".to_vec());
    }

    #[test]
    fn video_avc_inter_packet_with_negative_cts_offsets_pts() {
        let tag = VideoTag {
            frame_type: VIDEO_FRAME_INTER,
            composition_time: -10,
            body: vec![1, 2, 3],
            ..legacy_video_tag()
        };
        let pkt = video_to_packet(100, &tag);
        assert!(!pkt.flags.keyframe);
        assert_eq!(pkt.dts, Some(100 * RTMP_MS_TO_NS));
        assert_eq!(pkt.pts, Some(90 * RTMP_MS_TO_NS));
    }

    #[test]
    fn video_avc_seq_header_marks_header_flag() {
        let tag = VideoTag {
            avc_packet_type: Some(AVC_PACKET_TYPE_SEQUENCE_HEADER),
            body: b"\x01\x42\x80\x1e".to_vec(),
            ..legacy_video_tag()
        };
        let pkt = video_to_packet(0, &tag);
        assert!(pkt.flags.keyframe);
        assert!(pkt.flags.header);
        assert_eq!(pkt.data, b"\x01\x42\x80\x1e".to_vec());
    }

    #[test]
    fn video_h263_packet_keeps_body_and_pts_eq_dts() {
        let tag = VideoTag {
            frame_type: VIDEO_FRAME_INTER,
            codec_id: VIDEO_CODEC_H263,
            avc_packet_type: None,
            body: vec![0xAA, 0xBB, 0xCC],
            ..legacy_video_tag()
        };
        let pkt = video_to_packet(50, &tag);
        assert_eq!(pkt.pts, pkt.dts);
        assert_eq!(pkt.data, vec![0xAA, 0xBB, 0xCC]);
    }

    // ---- FourCC video -----------------------------------------------------

    #[test]
    fn video_codec_id_for_tag_dispatches_legacy_vs_fourcc() {
        let avc = legacy_video_tag();
        assert_eq!(video_codec_id_for_tag(&avc).as_str(), "h264");

        let cases = [
            (FOURCC_HEVC, "hevc"),
            (FOURCC_AV1, "av1"),
            (FOURCC_VP9, "vp9"),
        ];
        for (fcc, expected) in cases {
            let t = ex_video_tag(fcc);
            assert_eq!(video_codec_id_for_tag(&t).as_str(), expected);
        }
    }

    #[test]
    fn ex_hevc_sequence_start_packet_sets_header_flag() {
        let tag = VideoTag {
            body: b"\x01hvcc-stub".to_vec(),
            ex_packet_type: Some(EX_PACKET_TYPE_SEQUENCE_START),
            ..ex_video_tag(FOURCC_HEVC)
        };
        let pkt = video_to_packet(0, &tag);
        assert!(pkt.flags.header);
        assert!(pkt.flags.keyframe);
        assert_eq!(pkt.dts, Some(0));
        assert_eq!(pkt.pts, Some(0));
        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
        assert_eq!(pkt.data, b"\x01hvcc-stub".to_vec());
    }

    #[test]
    fn ex_hevc_coded_frames_with_cts_offsets_pts() {
        let tag = VideoTag {
            frame_type: VIDEO_FRAME_INTER,
            composition_time: 17,
            body: b"\x00\x00\x00\x04NALU".to_vec(),
            ..ex_video_tag(FOURCC_HEVC)
        };
        let pkt = video_to_packet(200, &tag);
        assert!(!pkt.flags.keyframe);
        assert!(!pkt.flags.header);
        assert_eq!(pkt.dts, Some(200 * RTMP_MS_TO_NS));
        assert_eq!(pkt.pts, Some(217 * RTMP_MS_TO_NS));
    }

    #[test]
    fn ex_av1_coded_frames_no_cts_offset() {
        let tag = VideoTag {
            body: vec![0x0a, 0x0b],
            ..ex_video_tag(FOURCC_AV1)
        };
        let pkt = video_to_packet(500, &tag);
        assert!(pkt.flags.keyframe);
        assert!(!pkt.flags.header);
        assert_eq!(pkt.dts, pkt.pts);
        assert_eq!(pkt.dts, Some(500 * RTMP_MS_TO_NS));
    }

    #[test]
    fn ex_metadata_packet_ignores_frame_type_flags() {
        let tag = VideoTag {
            body: b"amf-payload".to_vec(),
            ex_packet_type: Some(EX_PACKET_TYPE_METADATA),
            ..ex_video_tag(FOURCC_HEVC)
        };
        let pkt = video_to_packet(123, &tag);
        assert!(!pkt.flags.keyframe);
        assert!(pkt.flags.header);
        assert_eq!(pkt.data, b"amf-payload".to_vec());
    }

    // ---- nanosecond timestamp offsets -------------------------------------

    #[test]
    fn audio_timestamp_offset_nano_folds_into_presentation_time() {
        let tag = AudioTag {
            mod_ex: vec![crate::flv::ModEx::timestamp_offset_nano_entry(750_000)],
            body: vec![0x12, 0x34, 0x56],
            ..ex_audio_tag(flv::FOURCC_OPUS)
        };
        let pkt = audio_to_packet(40, &tag);
        assert_eq!(pkt.pts, Some(40_750_000));
        assert_eq!(pkt.dts, Some(40_750_000));
        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
    }

    #[test]
    fn video_timestamp_offset_nano_folds_into_pts_only() {
        let tag = VideoTag {
            mod_ex: vec![crate::flv::ModEx::timestamp_offset_nano_entry(123_456)],
            frame_type: VIDEO_FRAME_INTER,
            body: b"\x00\x00\x00\x04NALU".to_vec(),
            ..ex_video_tag(FOURCC_AV1)
        };
        let pkt = video_to_packet(60, &tag);
        assert_eq!(pkt.dts, Some(60_000_000));
        assert_eq!(pkt.pts, Some(60_123_456));
        assert_eq!(pkt.time_base, RTMP_TIME_BASE);
    }

    #[test]
    fn video_timestamp_offset_nano_stacks_on_cts_and_dts_unchanged() {
        let tag = VideoTag {
            mod_ex: vec![crate::flv::ModEx::timestamp_offset_nano_entry(500_000)],
            frame_type: VIDEO_FRAME_INTER,
            composition_time: 17,
            body: b"\x00\x00\x00\x04NALU".to_vec(),
            ..ex_video_tag(FOURCC_HEVC)
        };
        let pkt = video_to_packet(200, &tag);
        assert_eq!(pkt.dts, Some(200_000_000));
        assert_eq!(pkt.pts, Some(217_500_000));
    }

    #[test]
    fn video_timestamp_offset_nano_sums_multiple_modex_entries() {
        let tag = VideoTag {
            mod_ex: vec![
                crate::flv::ModEx::timestamp_offset_nano_entry(200_000),
                // An unrelated entry sits between the two offsets.
                crate::flv::ModEx {
                    mod_ex_type: 0x0F,
                    data: vec![0xAA, 0xBB, 0xCC],
                },
                crate::flv::ModEx::timestamp_offset_nano_entry(300_000),
            ],
            body: vec![0x0A],
            ..ex_video_tag(FOURCC_VP9)
        };
        let pkt = video_to_packet(10, &tag);
        assert_eq!(pkt.pts, Some(10_500_000));
        assert_eq!(pkt.dts, Some(10_000_000));
    }

    #[test]
    fn time_base_is_nanoseconds() {
        assert_eq!(RTMP_TIME_BASE, TimeBase::new(1, 1_000_000_000));
        assert_eq!(RTMP_MS_TO_NS, 1_000_000);
    }

    #[test]
    fn legacy_avc_seq_header_constant_still_referenced() {
        let _ = AVC_PACKET_TYPE_SEQUENCE_HEADER;
    }

    // ---- listen URL parsing -----------------------------------------------

    #[test]
    fn listen_url_parses_host_port_app_key() {
        let u = ListenUrl::parse("rtmp://127.0.0.1:1935/live/secret").expect("parse");
        assert_eq!(u.bind_addr, "127.0.0.1:1935");
        assert_eq!(u.expected_app, "live");
        assert_eq!(u.expected_stream, "secret");
    }

    #[test]
    fn listen_url_default_port_is_1935() {
        let u = ListenUrl::parse("rtmp://0.0.0.0/live/key").expect("parse");
        assert_eq!(u.bind_addr, "0.0.0.0:1935");
    }

    #[test]
    fn listen_url_accepts_app_only_path() {
        let u = ListenUrl::parse("rtmp://127.0.0.1:1935/live").expect("parse");
        assert_eq!(u.expected_app, "live");
        assert_eq!(u.expected_stream, "");
    }

    #[test]
    fn listen_url_rejects_non_rtmp_scheme() {
        assert!(ListenUrl::parse("http://x/y").is_err());
    }

    #[test]
    fn listen_url_rejects_missing_path() {
        assert!(ListenUrl::parse("rtmp://127.0.0.1:1935").is_err());
    }

    // ---- metadata and error mapping ---------------------------------------

    #[test]
    fn flatten_metadata_keeps_scalars_and_drops_objects() {
        let v = Amf0Value::Object(vec![
            ("width".into(), Amf0Value::Number(1280.0)),
            ("height".into(), Amf0Value::Number(720.0)),
            ("encoder".into(), Amf0Value::String("oxideav".into())),
            ("vhost".into(), Amf0Value::Object(vec![])),
            ("live".into(), Amf0Value::Boolean(true)),
        ]);
        let mut out = Vec::new();
        flatten_metadata(&v, &mut out);
        assert_eq!(
            out,
            vec![
                ("width".to_string(), "1280".to_string()),
                ("height".to_string(), "720".to_string()),
                ("encoder".to_string(), "oxideav".to_string()),
                ("live".to_string(), "true".to_string()),
            ]
        );
    }

    #[test]
    fn rtmp_to_core_err_maps_unexpected_eof_to_eof() {
        let core = rtmp_to_core_err(RtmpError::UnexpectedEof);
        assert!(matches!(core, CoreError::Eof));
    }

    #[test]
    fn rtmp_to_core_err_maps_protocol_violation_to_invalid_data() {
        let core = rtmp_to_core_err(RtmpError::ProtocolViolation("bad chunk size".into()));
        match core {
            CoreError::InvalidData(s) => assert!(s.contains("bad chunk size")),
            _ => panic!("expected InvalidData"),
        }
    }
}