Skip to main content

arcly_stream/protocol/webrtc/
mod.rs

1//! WebRTC WHIP/WHEP ingest & egress signaling with a pluggable crypto transport
2//! (feature `webrtc`).
3//!
4//! This module ships the parts of WebRTC that are *protocol logic* — and which
5//! can therefore live in a `#![forbid(unsafe_code)]`, dependency-light kernel:
6//!
7//! - **WHIP ingest** ([`WhipEndpoint`]) and **WHEP egress** ([`WhepEndpoint`]):
8//!   the HTTP-driven SDP offer/answer exchange and resource lifecycle. The host
9//!   wires these calls into *its own* HTTP server (Axum, Hyper, …) — the kernel
10//!   never imposes a web framework.
11//! - **SDP munging** ([`sdp`]): parse a browser offer, emit a compatible answer.
12//! - **RTP routing**: incoming (decrypted) RTP feeds the shared
13//!   [`H264Depacketizer`] onto the bus (ingest); outgoing frames are framed by
14//!   [`RtpPacketizer`] and sent over the
15//!   transport (egress).
16//! - **RTCP feedback** ([`rtcp`]): build PLI/FIR keyframe requests to send back
17//!   upstream when a late subscriber needs an IDR.
18//!
19//! # The crypto seam
20//!
21//! A *working* WebRTC connection needs DTLS, SRTP, and ICE. Those cannot be
22//! hand-rolled correctly without a vetted crypto stack, so they are **not**
23//! implemented here. Instead the host supplies them through the
24//! [`DtlsSrtpTransport`] trait — backed by a crate such as `str0m` or
25//! `webrtc-rs` — and this module drives the media plane over it. The kernel thus
26//! stays crypto-free and `unsafe`-free while remaining fully WebRTC-capable when
27//! a transport is plugged in.
28//!
29//! This is an honest boundary: the signaling and media routing are real and
30//! tested; the encrypted transport is an injected dependency, by design.
31
32pub mod rtcp;
33pub mod sdp;
34
35pub use sdp::{MediaDirection, SdpAnswerParams, SdpOffer};
36
37use crate::bus::PlaybackRegistry;
38use crate::inbound::{IngestContext, PublishSession};
39#[cfg(feature = "codec-av1")]
40use crate::protocol::rtp::Av1Packetizer;
41use crate::protocol::rtp::{
42    H264Depacketizer, OpusPacketizer, RtpHeader, RtpPacketizer, Vp9Packetizer,
43};
44use crate::{CodecId, MediaFrame, Result, StreamKey};
45use async_trait::async_trait;
46use std::sync::Arc;
47
48/// The host-supplied DTLS-SRTP transport for one peer connection.
49///
50/// Implement this over a vetted WebRTC crypto stack. The kernel calls it to pull
51/// decrypted RTP and to push RTCP feedback; it never sees keys or handshakes.
52#[async_trait]
53pub trait DtlsSrtpTransport: Send + Sync {
54    /// The DTLS certificate fingerprint (`sha-256 AA:BB:…`) to advertise in the
55    /// SDP answer's `a=fingerprint` line.
56    fn fingerprint(&self) -> String;
57
58    /// The ICE ufrag/pwd pair to advertise in the SDP answer.
59    ///
60    /// Per RFC 5245 these have length limits browsers enforce: the **ufrag** must
61    /// be 4–256 characters and the **pwd** 22–256 characters. A pwd shorter than
62    /// 22 chars makes `setRemoteDescription` reject the answer with
63    /// `Invalid ICE parameters`.
64    fn ice_credentials(&self) -> (String, String);
65
66    /// Receive the next decrypted RTP packet, or `None` when the peer closes.
67    /// Used by the WHIP **ingest** path; a send-only WHEP transport may leave the
68    /// default (returns `None` immediately).
69    async fn recv_rtp(&self) -> Option<Vec<u8>> {
70        None
71    }
72
73    /// Send an RTP packet to the peer (SRTP-encrypted by the transport). Used by
74    /// the WHEP **egress** path.
75    async fn send_rtp(&self, _packet: &[u8]) -> Result<()> {
76        Ok(())
77    }
78
79    /// Send an RTCP packet (e.g. a PLI/FIR built by [`rtcp`]) back to the peer.
80    async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
81
82    /// Produce the SDP **answer** for the raw `offer_sdp` with the given media
83    /// `direction`.
84    ///
85    /// This is the seam's SDP hook: the default builds a minimal answer from the
86    /// transport's [`fingerprint`](Self::fingerprint) and
87    /// [`ice_credentials`](Self::ice_credentials) — correct for the kernel's
88    /// in-crate SDP. A transport that owns SDP generation itself (e.g. a str0m
89    /// backend) overrides this to parse the offer and return its own complete
90    /// answer, so the kernel never imposes its SDP shape on a real WebRTC stack.
91    fn answer(&self, offer_sdp: &str, direction: MediaDirection) -> String {
92        let Some(offer) = SdpOffer::parse(offer_sdp) else {
93            return String::new();
94        };
95        let (ice_ufrag, ice_pwd) = self.ice_credentials();
96        sdp::build_answer_directed(
97            &offer,
98            &SdpAnswerParams {
99                fingerprint: self.fingerprint(),
100                ice_ufrag,
101                ice_pwd,
102            },
103            direction,
104        )
105    }
106}
107
108/// A WHIP/WHEP signaling endpoint the host drives from its HTTP layer.
109///
110/// `POST` of an SDP offer → [`accept_offer`](Self::accept_offer) returns the SDP
111/// answer to write back with `201 Created`. The returned [`WhipResource`] is the
112/// handle the host stores (keyed by the `Location` URL) and later
113/// [`close`](WhipResource::close)s on `DELETE`.
114#[derive(Clone)]
115pub struct WhipEndpoint {
116    ctx: IngestContext,
117}
118
119impl WhipEndpoint {
120    /// Build an endpoint that publishes ingested media through `ctx`.
121    pub fn new(ctx: IngestContext) -> Self {
122        Self { ctx }
123    }
124
125    /// Handle a WHIP `POST`: validate the offer, mint the answer from the
126    /// transport's credentials, and return the resource handle plus the answer
127    /// SDP. The host then runs [`WhipResource::pump`] (typically `tokio::spawn`)
128    /// to move media for the connection's lifetime.
129    pub fn accept_offer(
130        &self,
131        offer_sdp: &str,
132        key: StreamKey,
133        transport: std::sync::Arc<dyn DtlsSrtpTransport>,
134    ) -> Result<(WhipResource, String)> {
135        // Validate the offer parses; the transport owns answer generation (see
136        // `DtlsSrtpTransport::answer`) and gets the raw SDP.
137        let offer = SdpOffer::parse(offer_sdp)
138            .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
139        // WHIP ingest: the publisher sends, we receive → recvonly answer.
140        let answer = transport.answer(offer_sdp, MediaDirection::RecvOnly);
141        let resource = WhipResource {
142            ctx: self.ctx.clone(),
143            key,
144            transport,
145            video_pt: offer.payload_type,
146            audio_pt: offer.audio_payload_type,
147        };
148        Ok((resource, answer))
149    }
150}
151
152/// An accepted WHIP connection: pumps decrypted RTP onto the bus until the peer
153/// or transport closes.
154pub struct WhipResource {
155    ctx: IngestContext,
156    key: StreamKey,
157    transport: std::sync::Arc<dyn DtlsSrtpTransport>,
158    /// Negotiated H.264 video payload type (RTP packets carrying it are
159    /// depacketized into access units).
160    video_pt: u8,
161    /// Negotiated Opus audio payload type, if the publisher offered audio — RTP
162    /// packets carrying it are published as Opus audio frames directly.
163    audio_pt: Option<u8>,
164}
165
166impl WhipResource {
167    /// Drive the media plane: route each incoming RTP packet by payload type —
168    /// depacketize H.264 into access units, and publish Opus audio (when the
169    /// publisher offered it) frame-for-packet — onto the bus. Returns when the
170    /// transport yields `None` (peer gone).
171    pub async fn pump(self) -> Result<()> {
172        let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
173        let mut depack = H264Depacketizer::new();
174        let mut needs_keyframe = true;
175
176        while let Some(pkt) = self.transport.recv_rtp().await {
177            let Some(header) = RtpHeader::parse(&pkt) else {
178                continue;
179            };
180            let payload = &pkt[header.payload_offset..];
181
182            // Audio: one Opus packet per RTP payload (no depacketization). The
183            // Opus RTP clock is 48 kHz, so PTS(ms) = timestamp / 48.
184            if self.audio_pt == Some(header.payload_type) {
185                if !payload.is_empty() {
186                    let pts = (header.timestamp / 48) as i64;
187                    let data = bytes::Bytes::copy_from_slice(payload);
188                    let frame = MediaFrame::new_audio(pts, data, CodecId::Opus);
189                    let _ = session.publish_frame(frame)?;
190                }
191                continue;
192            }
193
194            // Video (default): everything else is treated as the H.264 stream.
195            let _ = self.video_pt; // negotiated PT (routing is by elimination here)
196            match depack.push(payload, header.marker, header.timestamp, header.sequence) {
197                Ok(Some(au)) => {
198                    needs_keyframe = false;
199                    let pts = (au.timestamp / 90) as i64;
200                    let frame =
201                        MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
202                    let _ = session.publish_frame(frame)?;
203                }
204                Ok(None) => {}
205                Err(_) => {
206                    // Loss/gap: ask the sender for a fresh IDR via RTCP PLI.
207                    needs_keyframe = true;
208                }
209            }
210            if needs_keyframe {
211                let pli = rtcp::build_pli(0, header.ssrc);
212                let _ = self.transport.send_rtcp(&pli).await;
213            }
214        }
215
216        session.finish().await
217    }
218
219    /// Tear the resource down on a WHIP `DELETE` without pumping media.
220    pub async fn close(self) -> Result<()> {
221        Ok(())
222    }
223}
224
225/// A WHEP (egress) signaling endpoint — the playback counterpart to
226/// [`WhipEndpoint`].
227///
228/// A viewer `POST`s an SDP offer; [`accept_offer`](Self::accept_offer) returns a
229/// `sendonly` answer and a [`WhepResource`]. The host then runs
230/// [`WhepResource::pump`], which subscribes to the requested live stream,
231/// packetizes each H.264 access unit into RTP, and sends it over the peer's
232/// [`DtlsSrtpTransport`] — sub-second WebRTC playback.
233#[derive(Clone)]
234pub struct WhepEndpoint {
235    playback: Arc<dyn PlaybackRegistry>,
236}
237
238impl WhepEndpoint {
239    /// Build an endpoint that serves media from `playback` (e.g. an `Arc<Engine>`).
240    pub fn new(playback: Arc<dyn PlaybackRegistry>) -> Self {
241        Self { playback }
242    }
243
244    /// Handle a WHEP `POST`: validate the offer and mint a `sendonly` answer.
245    /// Returns the resource handle (to `pump`) and the answer SDP.
246    pub fn accept_offer(
247        &self,
248        offer_sdp: &str,
249        key: StreamKey,
250        transport: Arc<dyn DtlsSrtpTransport>,
251    ) -> Result<(WhepResource, String)> {
252        let offer = SdpOffer::parse(offer_sdp)
253            .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
254        // WHEP egress: we send to the viewer → sendonly answer (transport-owned).
255        let answer = transport.answer(offer_sdp, MediaDirection::SendOnly);
256        let resource = WhepResource {
257            playback: Arc::clone(&self.playback),
258            key,
259            transport,
260            payload_type: offer.payload_type,
261            audio_payload_type: offer.audio_payload_type,
262            warned_unsupported: std::sync::atomic::AtomicBool::new(false),
263        };
264        Ok((resource, answer))
265    }
266}
267
/// An accepted WHEP connection: streams one live stream out to the viewer as RTP
/// until the stream ends or the peer disconnects.
///
/// Built by `WhepEndpoint::accept_offer`; the host consumes it by calling
/// `pump`.
pub struct WhepResource {
    /// Registry the requested stream is looked up in when the pump starts.
    playback: Arc<dyn PlaybackRegistry>,
    /// Key of the live stream this viewer asked for; also used to label the
    /// unsupported-codec warning.
    key: StreamKey,
    /// Peer transport the outgoing RTP packets are sent over.
    transport: Arc<dyn DtlsSrtpTransport>,
    /// Video payload type taken from the viewer's parsed offer; stamped on the
    /// video RTP packets by the egress packetizer.
    payload_type: u8,
    /// Negotiated Opus audio payload type, when the viewer's offer carried audio.
    /// `None` disables audio egress (video-only viewer or non-Opus source).
    audio_payload_type: Option<u8>,
    /// Set once we have warned about an unsupported egress video codec, so the
    /// log line fires a single time per connection instead of per frame.
    warned_unsupported: std::sync::atomic::AtomicBool,
}
282
283/// The RTP payload format chosen for a WHEP egress connection, selected from the
284/// stream's video codec. Each variant only packetizes frames of its own codec;
285/// a mismatched frame yields `None` (skipped, observably) from
286/// [`packetize`](EgressPacketizer::packetize).
287enum EgressPacketizer {
288    /// NAL codecs — H.264 (RFC 6184) or H.265 (RFC 7798).
289    Nal { p: RtpPacketizer, codec: CodecId },
290    /// VP9 (draft-ietf-payload-vp9).
291    Vp9(Vp9Packetizer),
292    /// AV1 (AOMedia RTP).
293    #[cfg(feature = "codec-av1")]
294    Av1(Av1Packetizer),
295}
296
297impl EgressPacketizer {
298    /// Build the packetizer for `codec`. Codecs without an RTP payload format in
299    /// this build fall back to an H.264 NAL packetizer, so their frames are
300    /// skipped observably rather than mis-framed.
301    fn for_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: CodecId) -> Self {
302        match codec {
303            CodecId::H265 => EgressPacketizer::Nal {
304                p: RtpPacketizer::new_h265(payload_type, ssrc, mtu),
305                codec: CodecId::H265,
306            },
307            CodecId::VP9 => EgressPacketizer::Vp9(Vp9Packetizer::new(payload_type, ssrc, mtu)),
308            #[cfg(feature = "codec-av1")]
309            CodecId::AV1 => EgressPacketizer::Av1(Av1Packetizer::new(payload_type, ssrc, mtu)),
310            _ => EgressPacketizer::Nal {
311                p: RtpPacketizer::new(payload_type, ssrc, mtu),
312                codec: CodecId::H264,
313            },
314        }
315    }
316
317    /// Packetize one video frame at its 90 kHz timestamp into the recycled
318    /// `out` buffer, returning `true` if the frame's codec matched this
319    /// packetizer (and `false`, leaving `out` empty, when it did not).
320    fn packetize_into(&mut self, frame: &MediaFrame, out: &mut Vec<Vec<u8>>) -> bool {
321        // Clamp negative PTS to 0 before the u64 cast (otherwise it wraps to a
322        // huge timestamp and desynchronizes the receiver's clock).
323        let ts = (frame.pts.max(0) as u64).wrapping_mul(90) as u32; // ms → 90 kHz
324        match self {
325            EgressPacketizer::Nal { p, codec } if frame.codec == *codec => {
326                p.packetize_into(&frame.data, ts, out);
327                true
328            }
329            EgressPacketizer::Vp9(p) if frame.codec == CodecId::VP9 => {
330                p.packetize_into(&frame.data, ts, frame.is_keyframe(), out);
331                true
332            }
333            #[cfg(feature = "codec-av1")]
334            EgressPacketizer::Av1(p) if frame.codec == CodecId::AV1 => {
335                p.packetize_into(&frame.data, ts, out);
336                true
337            }
338            _ => false,
339        }
340    }
341}
342
343impl WhepResource {
344    /// Drive egress: subscribe to the stream, replay the cached config + GOP for
345    /// an instant start, then packetize and send every published video frame.
346    /// Returns when the stream closes or the subscription lags out.
347    ///
348    /// The RTP payload format is selected from the stream's video codec: H.264
349    /// (RFC 6184), H.265 (RFC 7798), VP9, and AV1 (with `codec-av1`) are
350    /// packetized; other video codecs are skipped with a single warning per
351    /// connection, and audio is skipped.
352    pub async fn pump(self) -> Result<()> {
353        let handle = self.playback.get_stream(&self.key)?;
354        // SSRC derived from the key so retries are stable; real deployments may
355        // randomize per PeerConnection.
356        let ssrc = 0x5745_4850; // "WEHP"
357        let mut sub = handle.subscribe_resilient();
358
359        // Instant start: send the cached config frame + GOP before live frames.
360        let (vcfg, _) = handle.cached_configs();
361        let replay = handle.replay_buffer();
362        // Release the handle once setup is done. With the registry-owned sender
363        // (`StreamHandle::close`) the channel closes on publish-end regardless,
364        // but dropping eagerly keeps this pump from pinning per-stream state.
365        drop(handle);
366
367        // Pick the payload format from the stream's video codec (config frame
368        // first, else the first replayed video frame; defaulting to H.264).
369        let video_codec = vcfg
370            .as_ref()
371            .map(|c| c.codec)
372            .or_else(|| replay.iter().find(|f| f.is_video()).map(|f| f.codec))
373            .unwrap_or(CodecId::H264);
374        let mut packetizer =
375            EgressPacketizer::for_codec(self.payload_type, ssrc, 1200, video_codec);
376        // Opus audio packetizer on a distinct SSRC, when the viewer offered audio.
377        // Only Opus frames are sent (an AAC source's audio is skipped — a browser
378        // can't decode AAC over this Opus payload type).
379        let mut audio = self
380            .audio_payload_type
381            .map(|pt| OpusPacketizer::new(pt, 0x5745_4151)); // "WEAQ"
382
383        // Reused across frames so steady-state egress allocates no packet buffers.
384        let mut pkts: Vec<Vec<u8>> = Vec::new();
385
386        if let Some(cfg) = vcfg {
387            self.send_frame(&cfg, &mut packetizer, &mut audio, &mut pkts)
388                .await?;
389        }
390        for frame in replay {
391            self.send_frame(&frame, &mut packetizer, &mut audio, &mut pkts)
392                .await?;
393        }
394
395        while let Some(frame) = sub.recv().await {
396            self.send_frame(&frame, &mut packetizer, &mut audio, &mut pkts)
397                .await?;
398        }
399        Ok(())
400    }
401
402    /// Packetize one frame and send each RTP packet over the transport.
403    ///
404    /// Video is packetized by the connection's [`EgressPacketizer`]; Opus audio
405    /// (when the viewer negotiated it) by the [`OpusPacketizer`]. A video frame
406    /// whose codec the packetizer can't handle is skipped with a single warning
407    /// per connection — an *observable* skip, never a silent drop. Non-Opus audio
408    /// is skipped silently (a different audio codec is expected on many sources).
409    async fn send_frame(
410        &self,
411        frame: &MediaFrame,
412        packetizer: &mut EgressPacketizer,
413        audio: &mut Option<OpusPacketizer>,
414        pkts: &mut Vec<Vec<u8>>,
415    ) -> Result<()> {
416        if frame.is_audio() {
417            if let Some(ap) = audio.as_mut() {
418                if frame.codec == CodecId::Opus {
419                    let ts = (frame.pts.max(0) as u64).wrapping_mul(48) as u32; // ms → 48 kHz
420                    ap.packetize_into(&frame.data, ts, pkts);
421                    for packet in pkts.iter() {
422                        self.transport.send_rtp(packet).await?;
423                    }
424                }
425            }
426            return Ok(());
427        }
428        if !frame.is_video() {
429            return Ok(());
430        }
431        if packetizer.packetize_into(frame, pkts) {
432            for packet in pkts.iter() {
433                self.transport.send_rtp(packet).await?;
434            }
435        } else {
436            use std::sync::atomic::Ordering;
437            if !self.warned_unsupported.swap(true, Ordering::Relaxed) {
438                tracing::warn!(
439                    stream = %self.key,
440                    codec = ?frame.codec,
441                    "WHEP egress: unsupported video codec; frames skipped",
442                );
443            }
444        }
445        Ok(())
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// A fake transport that replays a fixed RTP script and records what is sent.
    struct FakeTransport {
        /// Scripted inbound RTP, handed out front-to-back by `recv_rtp`.
        packets: Mutex<std::collections::VecDeque<Vec<u8>>>,
        /// Every RTCP packet passed to `send_rtcp`.
        rtcp: Mutex<Vec<Vec<u8>>>,
        /// Every RTP packet passed to `send_rtp`.
        sent_rtp: Mutex<Vec<Vec<u8>>>,
    }

    impl FakeTransport {
        fn with_packets(packets: std::collections::VecDeque<Vec<u8>>) -> Self {
            Self {
                packets: Mutex::new(packets),
                rtcp: Mutex::new(Vec::new()),
                sent_rtp: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl DtlsSrtpTransport for FakeTransport {
        fn fingerprint(&self) -> String {
            "sha-256 AA:BB".into()
        }
        fn ice_credentials(&self) -> (String, String) {
            ("ufrag".into(), "pwd".into())
        }
        async fn recv_rtp(&self) -> Option<Vec<u8>> {
            self.packets.lock().await.pop_front()
        }
        async fn send_rtp(&self, packet: &[u8]) -> Result<()> {
            self.sent_rtp.lock().await.push(packet.to_vec());
            Ok(())
        }
        async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
            self.rtcp.lock().await.push(packet.to_vec());
            Ok(())
        }
    }

    /// Build a minimal RTP packet on payload type 96 (the video PT in these tests).
    fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        rtp_packet_pt(96, seq, ts, marker, payload)
    }

    /// Build a minimal 12-byte-header RTP packet (version 2, SSRC 7) followed by
    /// `payload`.
    fn rtp_packet_pt(pt: u8, seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let mut p = vec![0x80, if marker { 0x80 | pt } else { pt & 0x7F }];
        p.extend_from_slice(&seq.to_be_bytes());
        p.extend_from_slice(&ts.to_be_bytes());
        p.extend_from_slice(&[0, 0, 0, 7]);
        p.extend_from_slice(payload);
        p
    }

    #[tokio::test]
    async fn accept_offer_builds_answer_with_transport_credentials() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let endpoint = WhipEndpoint::new(IngestContext::new(engine));
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
        let (_res, answer) = endpoint
            .accept_offer(offer, StreamKey::new("live", "web"), transport)
            .unwrap();
        assert!(answer.contains("a=ice-ufrag:ufrag"));
        assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
        assert!(answer.contains("a=setup:passive"));
    }

    #[tokio::test]
    async fn pump_publishes_idr_then_releases_slot() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "web");
        let ctx = IngestContext::new(engine.clone());

        let mut q = std::collections::VecDeque::new();
        q.push_back(rtp_packet(1, 0, true, &[0x65, 0x11])); // single IDR, marker
        let transport = Arc::new(FakeTransport::with_packets(q));

        let resource = WhipResource {
            ctx,
            key: key.clone(),
            transport,
            video_pt: 96,
            audio_pt: None,
        };
        resource.pump().await.unwrap();

        // A complete keyframe was published, so no PLI was needed; the publish
        // slot is released once the transport drained.
        assert!(engine.get_stream(&key).is_err());
    }

    #[tokio::test]
    async fn pump_requests_keyframe_on_a_depacketize_gap() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let ctx = IngestContext::new(engine);

        // A mid FU-A fragment with no start bit forces an OutOfOrder error → PLI.
        let mut q = std::collections::VecDeque::new();
        q.push_back(rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11])); // FU-A, S=0
        let transport = Arc::new(FakeTransport::with_packets(q));

        let resource = WhipResource {
            ctx,
            key: StreamKey::new("live", "web2"),
            transport: transport.clone(),
            video_pt: 96,
            audio_pt: None,
        };
        resource.pump().await.unwrap();
        assert!(
            !transport.rtcp.lock().await.is_empty(),
            "a PLI was sent after the depacketize gap"
        );
    }

    /// WHIP audio: an RTP packet on the negotiated Opus PT takes the audio path
    /// (one packet per frame, 48 kHz → ms PTS) instead of the H.264
    /// depacketizer. The published frame itself is not observed here; the test
    /// checks that the pump succeeds, that the audio packet triggers no keyframe
    /// request, and that the publish slot is released afterwards.
    #[tokio::test]
    async fn pump_routes_opus_audio_onto_the_bus() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "av");
        let ctx = IngestContext::new(engine.clone());

        // Nothing is publishing yet, so the stream must not be live.
        assert!(
            engine.get_stream(&key).is_err(),
            "stream not live until pump opens publish"
        );

        let mut q = std::collections::VecDeque::new();
        // PT 111 (Opus), ts 4800 → 100 ms; payload is an opaque Opus packet.
        q.push_back(rtp_packet_pt(111, 7, 4800, true, &[0xAA, 0xBB, 0xCC]));
        let transport = Arc::new(FakeTransport::with_packets(q));

        let resource = WhipResource {
            ctx,
            key: key.clone(),
            transport: transport.clone(),
            video_pt: 96,
            audio_pt: Some(111),
        };
        // The script holds a single packet, so the pump drains it and returns;
        // a publish failure must fail the test rather than be discarded.
        resource.pump().await.unwrap();

        // Audio never drives keyframe requests.
        assert!(
            transport.rtcp.lock().await.is_empty(),
            "an audio packet must not trigger a PLI"
        );
        // The stream closed cleanly after the single packet.
        assert!(engine.get_stream(&key).is_err());
    }

    #[tokio::test]
    async fn whep_egress_packetizes_published_frames_as_rtp() {
        use crate::FrameFlags;
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "show");

        // Publish a config + keyframe into the stream via an ingest session.
        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();
        let mut cfg = MediaFrame::new_video(
            0,
            0,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42]),
            CodecId::H264,
            false,
        );
        cfg.flags |= FrameFlags::CONFIG;
        session.publish_frame(cfg).unwrap();
        session
            .publish_frame(MediaFrame::new_video(
                10,
                10,
                bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88, 0x99]),
                CodecId::H264,
                true,
            ))
            .unwrap();

        // A WHEP viewer subscribes and pumps; the bus closes when we finish().
        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
        let (resource, answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();
        assert!(answer.contains("a=sendonly"), "WHEP answer is sendonly");

        let pump = tokio::spawn(resource.pump());
        // Let the instant-start replay (config + keyframe) flush, then end the stream.
        for _ in 0..32 {
            if !transport.sent_rtp.lock().await.is_empty() {
                break;
            }
            tokio::task::yield_now().await;
        }
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();

        let sent = transport.sent_rtp.lock().await;
        assert!(!sent.is_empty(), "egress sent RTP packets");
        // The packets parse as RTP with our payload type.
        let h = RtpHeader::parse(&sent[0]).unwrap();
        assert_eq!(h.payload_type, 96);
    }

    /// WHEP audio: when the viewer's offer carries an Opus audio line, published
    /// Opus audio frames are RTP-packetized on the audio payload type and sent.
    #[tokio::test]
    async fn whep_egress_packetizes_opus_audio() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "aud");

        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();
        // A keyframe (so the GOP replay has video) plus an Opus audio frame.
        session
            .publish_frame(MediaFrame::new_video(
                0,
                0,
                bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88]),
                CodecId::H264,
                true,
            ))
            .unwrap();
        session
            .publish_frame(MediaFrame::new_audio(
                20,
                bytes::Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
                CodecId::Opus,
            ))
            .unwrap();

        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        // Offer with both video and Opus audio (PT 111).
        let offer = "v=0\r\n\
m=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n\
m=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\n";
        let (resource, _answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();

        let pump = tokio::spawn(resource.pump());
        for _ in 0..64 {
            if transport
                .sent_rtp
                .lock()
                .await
                .iter()
                .any(|p| RtpHeader::parse(p).is_some_and(|h| h.payload_type == 111))
            {
                break;
            }
            tokio::task::yield_now().await;
        }
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();

        let sent = transport.sent_rtp.lock().await;
        assert!(
            sent.iter()
                .any(|p| RtpHeader::parse(p).is_some_and(|h| h.payload_type == 111)),
            "egress sent an Opus audio RTP packet on PT 111"
        );
    }

    #[tokio::test]
    async fn whep_egress_packetizes_vp9_frames() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "vp9");

        // Publish a VP9 keyframe (no config AU — codec is inferred from the frame).
        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();
        let frame_data = bytes::Bytes::from_static(&[0xAA, 0xBB, 0xCC, 0xDD, 0xEE]);
        session
            .publish_frame(MediaFrame::new_video(
                0,
                0,
                frame_data.clone(),
                CodecId::VP9,
                true,
            ))
            .unwrap();

        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP9/90000\r\n";
        let (resource, _answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();

        let pump = tokio::spawn(resource.pump());
        for _ in 0..32 {
            if !transport.sent_rtp.lock().await.is_empty() {
                break;
            }
            tokio::task::yield_now().await;
        }
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();

        // The egress RTP round-trips back to the original VP9 frame.
        let sent = transport.sent_rtp.lock().await;
        assert!(!sent.is_empty(), "VP9 egress sent RTP packets");
        let mut depack = crate::protocol::rtp::Vp9Depacketizer::new();
        let mut out = None;
        for p in sent.iter() {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(f) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp)
                .unwrap()
            {
                out = Some(f);
            }
        }
        let out = out.expect("VP9 frame completed");
        assert_eq!(&out.data[..], &frame_data[..], "VP9 frame reconstructed");
        assert!(out.keyframe);
    }
}