1pub mod rtcp;
33pub mod sdp;
34
35pub use sdp::{MediaDirection, SdpAnswerParams, SdpOffer};
36
37use crate::bus::PlaybackRegistry;
38use crate::inbound::{IngestContext, PublishSession};
39#[cfg(feature = "codec-av1")]
40use crate::protocol::rtp::Av1Packetizer;
41use crate::protocol::rtp::{
42 H264Depacketizer, OpusPacketizer, RtpHeader, RtpPacketizer, Vp9Packetizer,
43};
44use crate::{CodecId, MediaFrame, Result, StreamKey};
45use async_trait::async_trait;
46use std::sync::Arc;
47
48#[async_trait]
53pub trait DtlsSrtpTransport: Send + Sync {
54 fn fingerprint(&self) -> String;
57
58 fn ice_credentials(&self) -> (String, String);
65
66 async fn recv_rtp(&self) -> Option<Vec<u8>> {
70 None
71 }
72
73 async fn send_rtp(&self, _packet: &[u8]) -> Result<()> {
76 Ok(())
77 }
78
79 async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
81
82 fn answer(&self, offer_sdp: &str, direction: MediaDirection) -> String {
92 let Some(offer) = SdpOffer::parse(offer_sdp) else {
93 return String::new();
94 };
95 let (ice_ufrag, ice_pwd) = self.ice_credentials();
96 sdp::build_answer_directed(
97 &offer,
98 &SdpAnswerParams {
99 fingerprint: self.fingerprint(),
100 ice_ufrag,
101 ice_pwd,
102 },
103 direction,
104 )
105 }
106}
107
108#[derive(Clone)]
115pub struct WhipEndpoint {
116 ctx: IngestContext,
117}
118
119impl WhipEndpoint {
120 pub fn new(ctx: IngestContext) -> Self {
122 Self { ctx }
123 }
124
125 pub fn accept_offer(
130 &self,
131 offer_sdp: &str,
132 key: StreamKey,
133 transport: std::sync::Arc<dyn DtlsSrtpTransport>,
134 ) -> Result<(WhipResource, String)> {
135 let offer = SdpOffer::parse(offer_sdp)
138 .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
139 let answer = transport.answer(offer_sdp, MediaDirection::RecvOnly);
141 let resource = WhipResource {
142 ctx: self.ctx.clone(),
143 key,
144 transport,
145 video_pt: offer.payload_type,
146 audio_pt: offer.audio_payload_type,
147 };
148 Ok((resource, answer))
149 }
150}
151
152pub struct WhipResource {
155 ctx: IngestContext,
156 key: StreamKey,
157 transport: std::sync::Arc<dyn DtlsSrtpTransport>,
158 video_pt: u8,
161 audio_pt: Option<u8>,
164}
165
166impl WhipResource {
167 pub async fn pump(self) -> Result<()> {
172 let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
173 let mut depack = H264Depacketizer::new();
174 let mut needs_keyframe = true;
175
176 while let Some(pkt) = self.transport.recv_rtp().await {
177 let Some(header) = RtpHeader::parse(&pkt) else {
178 continue;
179 };
180 let payload = &pkt[header.payload_offset..];
181
182 if self.audio_pt == Some(header.payload_type) {
185 if !payload.is_empty() {
186 let pts = (header.timestamp / 48) as i64;
187 let data = bytes::Bytes::copy_from_slice(payload);
188 let frame = MediaFrame::new_audio(pts, data, CodecId::Opus);
189 let _ = session.publish_frame(frame)?;
190 }
191 continue;
192 }
193
194 let _ = self.video_pt; match depack.push(payload, header.marker, header.timestamp, header.sequence) {
197 Ok(Some(au)) => {
198 needs_keyframe = false;
199 let pts = (au.timestamp / 90) as i64;
200 let frame =
201 MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
202 let _ = session.publish_frame(frame)?;
203 }
204 Ok(None) => {}
205 Err(_) => {
206 needs_keyframe = true;
208 }
209 }
210 if needs_keyframe {
211 let pli = rtcp::build_pli(0, header.ssrc);
212 let _ = self.transport.send_rtcp(&pli).await;
213 }
214 }
215
216 session.finish().await
217 }
218
219 pub async fn close(self) -> Result<()> {
221 Ok(())
222 }
223}
224
225#[derive(Clone)]
234pub struct WhepEndpoint {
235 playback: Arc<dyn PlaybackRegistry>,
236}
237
238impl WhepEndpoint {
239 pub fn new(playback: Arc<dyn PlaybackRegistry>) -> Self {
241 Self { playback }
242 }
243
244 pub fn accept_offer(
247 &self,
248 offer_sdp: &str,
249 key: StreamKey,
250 transport: Arc<dyn DtlsSrtpTransport>,
251 ) -> Result<(WhepResource, String)> {
252 let offer = SdpOffer::parse(offer_sdp)
253 .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
254 let answer = transport.answer(offer_sdp, MediaDirection::SendOnly);
256 let resource = WhepResource {
257 playback: Arc::clone(&self.playback),
258 key,
259 transport,
260 payload_type: offer.payload_type,
261 audio_payload_type: offer.audio_payload_type,
262 warned_unsupported: std::sync::atomic::AtomicBool::new(false),
263 };
264 Ok((resource, answer))
265 }
266}
267
268pub struct WhepResource {
271 playback: Arc<dyn PlaybackRegistry>,
272 key: StreamKey,
273 transport: Arc<dyn DtlsSrtpTransport>,
274 payload_type: u8,
275 audio_payload_type: Option<u8>,
278 warned_unsupported: std::sync::atomic::AtomicBool,
281}
282
283enum EgressPacketizer {
288 Nal { p: RtpPacketizer, codec: CodecId },
290 Vp9(Vp9Packetizer),
292 #[cfg(feature = "codec-av1")]
294 Av1(Av1Packetizer),
295}
296
297impl EgressPacketizer {
298 fn for_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: CodecId) -> Self {
302 match codec {
303 CodecId::H265 => EgressPacketizer::Nal {
304 p: RtpPacketizer::new_h265(payload_type, ssrc, mtu),
305 codec: CodecId::H265,
306 },
307 CodecId::VP9 => EgressPacketizer::Vp9(Vp9Packetizer::new(payload_type, ssrc, mtu)),
308 #[cfg(feature = "codec-av1")]
309 CodecId::AV1 => EgressPacketizer::Av1(Av1Packetizer::new(payload_type, ssrc, mtu)),
310 _ => EgressPacketizer::Nal {
311 p: RtpPacketizer::new(payload_type, ssrc, mtu),
312 codec: CodecId::H264,
313 },
314 }
315 }
316
317 fn packetize_into(&mut self, frame: &MediaFrame, out: &mut Vec<Vec<u8>>) -> bool {
321 let ts = (frame.pts.max(0) as u64).wrapping_mul(90) as u32; match self {
325 EgressPacketizer::Nal { p, codec } if frame.codec == *codec => {
326 p.packetize_into(&frame.data, ts, out);
327 true
328 }
329 EgressPacketizer::Vp9(p) if frame.codec == CodecId::VP9 => {
330 p.packetize_into(&frame.data, ts, frame.is_keyframe(), out);
331 true
332 }
333 #[cfg(feature = "codec-av1")]
334 EgressPacketizer::Av1(p) if frame.codec == CodecId::AV1 => {
335 p.packetize_into(&frame.data, ts, out);
336 true
337 }
338 _ => false,
339 }
340 }
341}
342
343impl WhepResource {
344 pub async fn pump(self) -> Result<()> {
353 let handle = self.playback.get_stream(&self.key)?;
354 let ssrc = 0x5745_4850; let mut sub = handle.subscribe_resilient();
358
359 let (vcfg, _) = handle.cached_configs();
361 let replay = handle.replay_buffer();
362 drop(handle);
366
367 let video_codec = vcfg
370 .as_ref()
371 .map(|c| c.codec)
372 .or_else(|| replay.iter().find(|f| f.is_video()).map(|f| f.codec))
373 .unwrap_or(CodecId::H264);
374 let mut packetizer =
375 EgressPacketizer::for_codec(self.payload_type, ssrc, 1200, video_codec);
376 let mut audio = self
380 .audio_payload_type
381 .map(|pt| OpusPacketizer::new(pt, 0x5745_4151)); let mut pkts: Vec<Vec<u8>> = Vec::new();
385
386 if let Some(cfg) = vcfg {
387 self.send_frame(&cfg, &mut packetizer, &mut audio, &mut pkts)
388 .await?;
389 }
390 for frame in replay {
391 self.send_frame(&frame, &mut packetizer, &mut audio, &mut pkts)
392 .await?;
393 }
394
395 while let Some(frame) = sub.recv().await {
396 self.send_frame(&frame, &mut packetizer, &mut audio, &mut pkts)
397 .await?;
398 }
399 Ok(())
400 }
401
402 async fn send_frame(
410 &self,
411 frame: &MediaFrame,
412 packetizer: &mut EgressPacketizer,
413 audio: &mut Option<OpusPacketizer>,
414 pkts: &mut Vec<Vec<u8>>,
415 ) -> Result<()> {
416 if frame.is_audio() {
417 if let Some(ap) = audio.as_mut() {
418 if frame.codec == CodecId::Opus {
419 let ts = (frame.pts.max(0) as u64).wrapping_mul(48) as u32; ap.packetize_into(&frame.data, ts, pkts);
421 for packet in pkts.iter() {
422 self.transport.send_rtp(packet).await?;
423 }
424 }
425 }
426 return Ok(());
427 }
428 if !frame.is_video() {
429 return Ok(());
430 }
431 if packetizer.packetize_into(frame, pkts) {
432 for packet in pkts.iter() {
433 self.transport.send_rtp(packet).await?;
434 }
435 } else {
436 use std::sync::atomic::Ordering;
437 if !self.warned_unsupported.swap(true, Ordering::Relaxed) {
438 tracing::warn!(
439 stream = %self.key,
440 codec = ?frame.codec,
441 "WHEP egress: unsupported video codec; frames skipped",
442 );
443 }
444 }
445 Ok(())
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452 use crate::bus::PlaybackRegistry;
453 use std::sync::Arc;
454 use tokio::sync::Mutex;
455
456 struct FakeTransport {
458 packets: Mutex<std::collections::VecDeque<Vec<u8>>>,
459 rtcp: Mutex<Vec<Vec<u8>>>,
460 sent_rtp: Mutex<Vec<Vec<u8>>>,
461 }
462
463 impl FakeTransport {
464 fn with_packets(packets: std::collections::VecDeque<Vec<u8>>) -> Self {
465 Self {
466 packets: Mutex::new(packets),
467 rtcp: Mutex::new(Vec::new()),
468 sent_rtp: Mutex::new(Vec::new()),
469 }
470 }
471 }
472
473 #[async_trait]
474 impl DtlsSrtpTransport for FakeTransport {
475 fn fingerprint(&self) -> String {
476 "sha-256 AA:BB".into()
477 }
478 fn ice_credentials(&self) -> (String, String) {
479 ("ufrag".into(), "pwd".into())
480 }
481 async fn recv_rtp(&self) -> Option<Vec<u8>> {
482 self.packets.lock().await.pop_front()
483 }
484 async fn send_rtp(&self, packet: &[u8]) -> Result<()> {
485 self.sent_rtp.lock().await.push(packet.to_vec());
486 Ok(())
487 }
488 async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
489 self.rtcp.lock().await.push(packet.to_vec());
490 Ok(())
491 }
492 }
493
494 fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
495 rtp_packet_pt(96, seq, ts, marker, payload)
496 }
497
498 fn rtp_packet_pt(pt: u8, seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
499 let mut p = vec![0x80, if marker { 0x80 | pt } else { pt & 0x7F }];
500 p.extend_from_slice(&seq.to_be_bytes());
501 p.extend_from_slice(&ts.to_be_bytes());
502 p.extend_from_slice(&[0, 0, 0, 7]);
503 p.extend_from_slice(payload);
504 p
505 }
506
507 #[tokio::test]
508 async fn accept_offer_builds_answer_with_transport_credentials() {
509 let engine = crate::Engine::builder()
510 .application(crate::AppSpec::new("live"))
511 .build();
512 let endpoint = WhipEndpoint::new(IngestContext::new(engine));
513 let transport = Arc::new(FakeTransport::with_packets(Default::default()));
514 let offer = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
515 let (_res, answer) = endpoint
516 .accept_offer(offer, StreamKey::new("live", "web"), transport)
517 .unwrap();
518 assert!(answer.contains("a=ice-ufrag:ufrag"));
519 assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
520 assert!(answer.contains("a=setup:passive"));
521 }
522
523 #[tokio::test]
524 async fn pump_publishes_idr_then_releases_slot() {
525 let engine = crate::Engine::builder()
526 .application(crate::AppSpec::new("live").gop_cache(4))
527 .build();
528 let key = StreamKey::new("live", "web");
529 let ctx = IngestContext::new(engine.clone());
530
531 let mut q = std::collections::VecDeque::new();
532 q.push_back(rtp_packet(1, 0, true, &[0x65, 0x11])); let transport = Arc::new(FakeTransport::with_packets(q));
534
535 let resource = WhipResource {
536 ctx,
537 key: key.clone(),
538 transport,
539 video_pt: 96,
540 audio_pt: None,
541 };
542 resource.pump().await.unwrap();
543
544 assert!(engine.get_stream(&key).is_err());
547 }
548
549 #[tokio::test]
550 async fn pump_requests_keyframe_on_a_depacketize_gap() {
551 let engine = crate::Engine::builder()
552 .application(crate::AppSpec::new("live").gop_cache(4))
553 .build();
554 let ctx = IngestContext::new(engine);
555
556 let mut q = std::collections::VecDeque::new();
558 q.push_back(rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11])); let transport = Arc::new(FakeTransport::with_packets(q));
560
561 let resource = WhipResource {
562 ctx,
563 key: StreamKey::new("live", "web2"),
564 transport: transport.clone(),
565 video_pt: 96,
566 audio_pt: None,
567 };
568 resource.pump().await.unwrap();
569 assert!(
570 !transport.rtcp.lock().await.is_empty(),
571 "a PLI was sent after the depacketize gap"
572 );
573 }
574
575 #[tokio::test]
579 async fn pump_routes_opus_audio_onto_the_bus() {
580 let engine = crate::Engine::builder()
581 .application(crate::AppSpec::new("live").gop_cache(8))
582 .build();
583 let key = StreamKey::new("live", "av");
584 let ctx = IngestContext::new(engine.clone());
585
586 let handle = engine.get_stream(&key);
588 assert!(handle.is_err(), "stream not live until pump opens publish");
589
590 let mut q = std::collections::VecDeque::new();
591 q.push_back(rtp_packet_pt(111, 7, 4800, true, &[0xAA, 0xBB, 0xCC]));
593 let transport = Arc::new(FakeTransport::with_packets(q));
594
595 let resource = WhipResource {
596 ctx,
597 key: key.clone(),
598 transport,
599 video_pt: 96,
600 audio_pt: Some(111),
601 };
602 let pump = tokio::spawn(async move { resource.pump().await });
604 let _ = pump.await.unwrap();
606 assert!(engine.get_stream(&key).is_err());
609 }
610
611 #[tokio::test]
612 async fn whep_egress_packetizes_published_frames_as_rtp() {
613 use crate::FrameFlags;
614 let engine = crate::Engine::builder()
615 .application(crate::AppSpec::new("live").gop_cache(8))
616 .build();
617 let key = StreamKey::new("live", "show");
618
619 let ctx = IngestContext::new(engine.clone());
621 let session = ctx.open_publish(key.clone()).await.unwrap();
622 let mut cfg = MediaFrame::new_video(
623 0,
624 0,
625 bytes::Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42]),
626 CodecId::H264,
627 false,
628 );
629 cfg.flags |= FrameFlags::CONFIG;
630 session.publish_frame(cfg).unwrap();
631 session
632 .publish_frame(MediaFrame::new_video(
633 10,
634 10,
635 bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88, 0x99]),
636 CodecId::H264,
637 true,
638 ))
639 .unwrap();
640
641 let whep = WhepEndpoint::new(engine.clone());
643 let transport = Arc::new(FakeTransport::with_packets(Default::default()));
644 let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
645 let (resource, answer) = whep
646 .accept_offer(offer, key.clone(), transport.clone())
647 .unwrap();
648 assert!(answer.contains("a=sendonly"), "WHEP answer is sendonly");
649
650 let pump = tokio::spawn(resource.pump());
651 for _ in 0..32 {
653 if !transport.sent_rtp.lock().await.is_empty() {
654 break;
655 }
656 tokio::task::yield_now().await;
657 }
658 session.finish().await.unwrap();
659 let _ = pump.await.unwrap();
660
661 let sent = transport.sent_rtp.lock().await;
662 assert!(!sent.is_empty(), "egress sent RTP packets");
663 let h = RtpHeader::parse(&sent[0]).unwrap();
665 assert_eq!(h.payload_type, 96);
666 }
667
668 #[tokio::test]
671 async fn whep_egress_packetizes_opus_audio() {
672 let engine = crate::Engine::builder()
673 .application(crate::AppSpec::new("live").gop_cache(8))
674 .build();
675 let key = StreamKey::new("live", "aud");
676
677 let ctx = IngestContext::new(engine.clone());
678 let session = ctx.open_publish(key.clone()).await.unwrap();
679 session
681 .publish_frame(MediaFrame::new_video(
682 0,
683 0,
684 bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88]),
685 CodecId::H264,
686 true,
687 ))
688 .unwrap();
689 session
690 .publish_frame(MediaFrame::new_audio(
691 20,
692 bytes::Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
693 CodecId::Opus,
694 ))
695 .unwrap();
696
697 let whep = WhepEndpoint::new(engine.clone());
698 let transport = Arc::new(FakeTransport::with_packets(Default::default()));
699 let offer = "v=0\r\n\
701m=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n\
702m=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\n";
703 let (resource, _answer) = whep
704 .accept_offer(offer, key.clone(), transport.clone())
705 .unwrap();
706
707 let pump = tokio::spawn(resource.pump());
708 for _ in 0..64 {
709 if transport
710 .sent_rtp
711 .lock()
712 .await
713 .iter()
714 .any(|p| RtpHeader::parse(p).is_some_and(|h| h.payload_type == 111))
715 {
716 break;
717 }
718 tokio::task::yield_now().await;
719 }
720 session.finish().await.unwrap();
721 let _ = pump.await.unwrap();
722
723 let sent = transport.sent_rtp.lock().await;
724 assert!(
725 sent.iter()
726 .any(|p| RtpHeader::parse(p).is_some_and(|h| h.payload_type == 111)),
727 "egress sent an Opus audio RTP packet on PT 111"
728 );
729 }
730
731 #[tokio::test]
732 async fn whep_egress_packetizes_vp9_frames() {
733 let engine = crate::Engine::builder()
734 .application(crate::AppSpec::new("live").gop_cache(8))
735 .build();
736 let key = StreamKey::new("live", "vp9");
737
738 let ctx = IngestContext::new(engine.clone());
740 let session = ctx.open_publish(key.clone()).await.unwrap();
741 let frame_data = bytes::Bytes::from_static(&[0xAA, 0xBB, 0xCC, 0xDD, 0xEE]);
742 session
743 .publish_frame(MediaFrame::new_video(
744 0,
745 0,
746 frame_data.clone(),
747 CodecId::VP9,
748 true,
749 ))
750 .unwrap();
751
752 let whep = WhepEndpoint::new(engine.clone());
753 let transport = Arc::new(FakeTransport::with_packets(Default::default()));
754 let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP9/90000\r\n";
755 let (resource, _answer) = whep
756 .accept_offer(offer, key.clone(), transport.clone())
757 .unwrap();
758
759 let pump = tokio::spawn(resource.pump());
760 for _ in 0..32 {
761 if !transport.sent_rtp.lock().await.is_empty() {
762 break;
763 }
764 tokio::task::yield_now().await;
765 }
766 session.finish().await.unwrap();
767 let _ = pump.await.unwrap();
768
769 let sent = transport.sent_rtp.lock().await;
771 assert!(!sent.is_empty(), "VP9 egress sent RTP packets");
772 let mut depack = crate::protocol::rtp::Vp9Depacketizer::new();
773 let mut out = None;
774 for p in sent.iter() {
775 let h = RtpHeader::parse(p).unwrap();
776 if let Some(f) = depack
777 .push(&p[h.payload_offset..], h.marker, h.timestamp)
778 .unwrap()
779 {
780 out = Some(f);
781 }
782 }
783 let out = out.expect("VP9 frame completed");
784 assert_eq!(&out.data[..], &frame_data[..], "VP9 frame reconstructed");
785 assert!(out.keyframe);
786 }
787}