Skip to main content

arcly_stream/protocol/webrtc/
mod.rs

1//! WebRTC WHIP/WHEP ingest & egress signaling with a pluggable crypto transport
2//! (feature `webrtc`).
3//!
4//! This module ships the parts of WebRTC that are *protocol logic* — and which
5//! can therefore live in a `#![forbid(unsafe_code)]`, dependency-light kernel:
6//!
7//! - **WHIP ingest** ([`WhipEndpoint`]) and **WHEP egress** ([`WhepEndpoint`]):
8//!   the HTTP-driven SDP offer/answer exchange and resource lifecycle. The host
9//!   wires these calls into *its own* HTTP server (Axum, Hyper, …) — the kernel
10//!   never imposes a web framework.
11//! - **SDP munging** ([`sdp`]): parse a browser offer, emit a compatible answer.
12//! - **RTP routing**: incoming (decrypted) RTP feeds the shared
13//!   [`H264Depacketizer`] onto the bus (ingest); outgoing frames are framed by
14//!   [`RtpPacketizer`] and sent over the
15//!   transport (egress).
16//! - **RTCP feedback** ([`rtcp`]): build PLI/FIR keyframe requests to send back
17//!   upstream when a late subscriber needs an IDR.
18//!
19//! # The crypto seam
20//!
21//! A *working* WebRTC connection needs DTLS, SRTP, and ICE. Those cannot be
22//! hand-rolled correctly without a vetted crypto stack, so they are **not**
23//! implemented here. Instead the host supplies them through the
24//! [`DtlsSrtpTransport`] trait — backed by a crate such as `str0m` or
25//! `webrtc-rs` — and this module drives the media plane over it. The kernel thus
26//! stays crypto-free and `unsafe`-free while remaining fully WebRTC-capable when
27//! a transport is plugged in.
28//!
29//! This is an honest boundary: the signaling and media routing are real and
30//! tested; the encrypted transport is an injected dependency, by design.
31
32pub mod rtcp;
33pub mod sdp;
34
35pub use sdp::{MediaDirection, SdpAnswerParams, SdpOffer};
36
37use crate::bus::PlaybackRegistry;
38use crate::inbound::{IngestContext, PublishSession};
39use crate::protocol::rtp::{H264Depacketizer, RtpHeader, RtpPacketizer};
40use crate::{CodecId, MediaFrame, Result, StreamKey};
41use async_trait::async_trait;
42use std::sync::Arc;
43
44/// The host-supplied DTLS-SRTP transport for one peer connection.
45///
46/// Implement this over a vetted WebRTC crypto stack. The kernel calls it to pull
47/// decrypted RTP and to push RTCP feedback; it never sees keys or handshakes.
48#[async_trait]
49pub trait DtlsSrtpTransport: Send + Sync {
50    /// The DTLS certificate fingerprint (`sha-256 AA:BB:…`) to advertise in the
51    /// SDP answer's `a=fingerprint` line.
52    fn fingerprint(&self) -> String;
53
54    /// The ICE ufrag/pwd pair to advertise in the SDP answer.
55    fn ice_credentials(&self) -> (String, String);
56
57    /// Receive the next decrypted RTP packet, or `None` when the peer closes.
58    /// Used by the WHIP **ingest** path; a send-only WHEP transport may leave the
59    /// default (returns `None` immediately).
60    async fn recv_rtp(&self) -> Option<Vec<u8>> {
61        None
62    }
63
64    /// Send an RTP packet to the peer (SRTP-encrypted by the transport). Used by
65    /// the WHEP **egress** path.
66    async fn send_rtp(&self, _packet: &[u8]) -> Result<()> {
67        Ok(())
68    }
69
70    /// Send an RTCP packet (e.g. a PLI/FIR built by [`rtcp`]) back to the peer.
71    async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
72}
73
74/// A WHIP/WHEP signaling endpoint the host drives from its HTTP layer.
75///
76/// `POST` of an SDP offer → [`accept_offer`](Self::accept_offer) returns the SDP
77/// answer to write back with `201 Created`. The returned [`WhipResource`] is the
78/// handle the host stores (keyed by the `Location` URL) and later
79/// [`close`](WhipResource::close)s on `DELETE`.
80#[derive(Clone)]
81pub struct WhipEndpoint {
82    ctx: IngestContext,
83}
84
85impl WhipEndpoint {
86    /// Build an endpoint that publishes ingested media through `ctx`.
87    pub fn new(ctx: IngestContext) -> Self {
88        Self { ctx }
89    }
90
91    /// Handle a WHIP `POST`: validate the offer, mint the answer from the
92    /// transport's credentials, and return the resource handle plus the answer
93    /// SDP. The host then runs [`WhipResource::pump`] (typically `tokio::spawn`)
94    /// to move media for the connection's lifetime.
95    pub fn accept_offer(
96        &self,
97        offer_sdp: &str,
98        key: StreamKey,
99        transport: std::sync::Arc<dyn DtlsSrtpTransport>,
100    ) -> Result<(WhipResource, String)> {
101        let offer = SdpOffer::parse(offer_sdp)
102            .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
103        let (ufrag, pwd) = transport.ice_credentials();
104        let answer = sdp::build_answer(
105            &offer,
106            &SdpAnswerParams {
107                fingerprint: transport.fingerprint(),
108                ice_ufrag: ufrag,
109                ice_pwd: pwd,
110            },
111        );
112        let resource = WhipResource {
113            ctx: self.ctx.clone(),
114            key,
115            transport,
116        };
117        Ok((resource, answer))
118    }
119}
120
121/// An accepted WHIP connection: pumps decrypted RTP onto the bus until the peer
122/// or transport closes.
123pub struct WhipResource {
124    ctx: IngestContext,
125    key: StreamKey,
126    transport: std::sync::Arc<dyn DtlsSrtpTransport>,
127}
128
129impl WhipResource {
130    /// Drive the media plane: depacketize incoming RTP into H.264 access units
131    /// and publish them. Returns when the transport yields `None` (peer gone).
132    pub async fn pump(self) -> Result<()> {
133        let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
134        let mut depack = H264Depacketizer::new();
135        let mut needs_keyframe = true;
136
137        while let Some(pkt) = self.transport.recv_rtp().await {
138            let Some(header) = RtpHeader::parse(&pkt) else {
139                continue;
140            };
141            let payload = &pkt[header.payload_offset..];
142            match depack.push(payload, header.marker, header.timestamp, header.sequence) {
143                Ok(Some(au)) => {
144                    needs_keyframe = false;
145                    let pts = (au.timestamp / 90) as i64;
146                    let frame =
147                        MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
148                    let _ = session.publish_frame(frame)?;
149                }
150                Ok(None) => {}
151                Err(_) => {
152                    // Loss/gap: ask the sender for a fresh IDR via RTCP PLI.
153                    needs_keyframe = true;
154                }
155            }
156            if needs_keyframe {
157                let pli = rtcp::build_pli(0, header.ssrc);
158                let _ = self.transport.send_rtcp(&pli).await;
159            }
160        }
161
162        session.finish().await
163    }
164
165    /// Tear the resource down on a WHIP `DELETE` without pumping media.
166    pub async fn close(self) -> Result<()> {
167        Ok(())
168    }
169}
170
171/// A WHEP (egress) signaling endpoint — the playback counterpart to
172/// [`WhipEndpoint`].
173///
174/// A viewer `POST`s an SDP offer; [`accept_offer`](Self::accept_offer) returns a
175/// `sendonly` answer and a [`WhepResource`]. The host then runs
176/// [`WhepResource::pump`], which subscribes to the requested live stream,
177/// packetizes each H.264 access unit into RTP, and sends it over the peer's
178/// [`DtlsSrtpTransport`] — sub-second WebRTC playback.
179#[derive(Clone)]
180pub struct WhepEndpoint {
181    playback: Arc<dyn PlaybackRegistry>,
182}
183
184impl WhepEndpoint {
185    /// Build an endpoint that serves media from `playback` (e.g. an `Arc<Engine>`).
186    pub fn new(playback: Arc<dyn PlaybackRegistry>) -> Self {
187        Self { playback }
188    }
189
190    /// Handle a WHEP `POST`: validate the offer and mint a `sendonly` answer.
191    /// Returns the resource handle (to `pump`) and the answer SDP.
192    pub fn accept_offer(
193        &self,
194        offer_sdp: &str,
195        key: StreamKey,
196        transport: Arc<dyn DtlsSrtpTransport>,
197    ) -> Result<(WhepResource, String)> {
198        let offer = SdpOffer::parse(offer_sdp)
199            .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
200        let (ufrag, pwd) = transport.ice_credentials();
201        let answer = sdp::build_answer_directed(
202            &offer,
203            &SdpAnswerParams {
204                fingerprint: transport.fingerprint(),
205                ice_ufrag: ufrag,
206                ice_pwd: pwd,
207            },
208            MediaDirection::SendOnly,
209        );
210        let resource = WhepResource {
211            playback: Arc::clone(&self.playback),
212            key,
213            transport,
214            payload_type: offer.payload_type,
215            warned_unsupported: std::sync::atomic::AtomicBool::new(false),
216        };
217        Ok((resource, answer))
218    }
219}
220
221/// An accepted WHEP connection: streams one live stream out to the viewer as RTP
222/// until the stream ends or the peer disconnects.
223pub struct WhepResource {
224    playback: Arc<dyn PlaybackRegistry>,
225    key: StreamKey,
226    transport: Arc<dyn DtlsSrtpTransport>,
227    payload_type: u8,
228    /// Set once we have warned about an unsupported egress video codec, so the
229    /// log line fires a single time per connection instead of per frame.
230    warned_unsupported: std::sync::atomic::AtomicBool,
231}
232
233impl WhepResource {
234    /// Drive egress: subscribe to the stream, replay the cached config + GOP for
235    /// an instant start, then packetize and send every published video frame.
236    /// Returns when the stream closes or the subscription lags out.
237    ///
238    /// The RTP payload format is selected from the stream's video codec: H.264
239    /// (RFC 6184) and H.265 (RFC 7798) are packetized; other video codecs are
240    /// skipped with a single warning per connection, and audio is skipped.
241    pub async fn pump(self) -> Result<()> {
242        let handle = self.playback.get_stream(&self.key)?;
243        // SSRC derived from the key so retries are stable; real deployments may
244        // randomize per PeerConnection.
245        let ssrc = 0x5745_4850; // "WEHP"
246        let mut sub = handle.subscribe_resilient();
247
248        // Instant start: send the cached config frame + GOP before live frames.
249        let (vcfg, _) = handle.cached_configs();
250        let replay = handle.replay_buffer();
251        // Release the handle once setup is done. With the registry-owned sender
252        // (`StreamHandle::close`) the channel closes on publish-end regardless,
253        // but dropping eagerly keeps this pump from pinning per-stream state.
254        drop(handle);
255
256        // Pick the payload format from the stream's video codec (config frame
257        // first, else the first replayed video frame; defaulting to H.264).
258        let video_codec = vcfg
259            .as_ref()
260            .map(|c| c.codec)
261            .or_else(|| replay.iter().find(|f| f.is_video()).map(|f| f.codec))
262            .unwrap_or(CodecId::H264);
263        let mut packetizer = match video_codec {
264            CodecId::H265 => RtpPacketizer::new_h265(self.payload_type, ssrc, 1200),
265            _ => RtpPacketizer::new(self.payload_type, ssrc, 1200),
266        };
267
268        if let Some(cfg) = vcfg {
269            self.send_frame(&cfg, &mut packetizer).await?;
270        }
271        for frame in replay {
272            self.send_frame(&frame, &mut packetizer).await?;
273        }
274
275        while let Some(frame) = sub.recv().await {
276            self.send_frame(&frame, &mut packetizer).await?;
277        }
278        Ok(())
279    }
280
281    /// Packetize one video frame and send each RTP packet over the transport.
282    ///
283    /// Audio frames are skipped (egress is video-only for now). Video is
284    /// packetized for the NAL-based codecs WHEP egress supports — H.264 (RFC
285    /// 6184) and H.265 (RFC 7798); any other video codec (AV1/VP9, whose RTP
286    /// payload formats are structurally different) is skipped with a single
287    /// warning per connection — an *observable* skip, never a silent drop.
288    async fn send_frame(&self, frame: &MediaFrame, packetizer: &mut RtpPacketizer) -> Result<()> {
289        if !frame.is_video() {
290            return Ok(()); // egress is video-only for now
291        }
292        if !matches!(frame.codec, CodecId::H264 | CodecId::H265) {
293            use std::sync::atomic::Ordering;
294            if !self.warned_unsupported.swap(true, Ordering::Relaxed) {
295                tracing::warn!(
296                    stream = %self.key,
297                    codec = ?frame.codec,
298                    "WHEP egress: unsupported video codec; frames skipped (H.264/H.265 only)",
299                );
300            }
301            return Ok(());
302        }
303        let timestamp = (frame.pts as u64).wrapping_mul(90) as u32; // ms → 90 kHz
304        for packet in packetizer.packetize(&frame.data, timestamp) {
305            self.transport.send_rtp(&packet).await?;
306        }
307        Ok(())
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Transport double: hands out a scripted queue of RTP packets and keeps a
    /// copy of everything the code under test sends.
    struct FakeTransport {
        /// Packets still to be returned by `recv_rtp`, front first.
        packets: Mutex<VecDeque<Vec<u8>>>,
        /// Every RTCP packet passed to `send_rtcp`.
        rtcp: Mutex<Vec<Vec<u8>>>,
        /// Every RTP packet passed to `send_rtp`.
        sent_rtp: Mutex<Vec<Vec<u8>>>,
    }

    impl FakeTransport {
        /// Build a transport that will replay `packets` and then report close.
        fn with_packets(packets: VecDeque<Vec<u8>>) -> Self {
            Self {
                packets: Mutex::new(packets),
                rtcp: Mutex::new(Vec::new()),
                sent_rtp: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl DtlsSrtpTransport for FakeTransport {
        fn fingerprint(&self) -> String {
            "sha-256 AA:BB".into()
        }

        fn ice_credentials(&self) -> (String, String) {
            ("ufrag".into(), "pwd".into())
        }

        async fn recv_rtp(&self) -> Option<Vec<u8>> {
            let mut script = self.packets.lock().await;
            script.pop_front()
        }

        async fn send_rtp(&self, packet: &[u8]) -> Result<()> {
            let mut log = self.sent_rtp.lock().await;
            log.push(packet.to_vec());
            Ok(())
        }

        async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
            let mut log = self.rtcp.lock().await;
            log.push(packet.to_vec());
            Ok(())
        }
    }

    /// Assemble a minimal RTP packet: version 2, payload type 96, SSRC 7, no
    /// CSRCs or extensions, followed by `payload`.
    fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let marker_and_pt: u8 = if marker { 0x80 | 96 } else { 96 };
        let mut bytes = Vec::with_capacity(12 + payload.len());
        bytes.push(0x80);
        bytes.push(marker_and_pt);
        bytes.extend_from_slice(&seq.to_be_bytes());
        bytes.extend_from_slice(&ts.to_be_bytes());
        bytes.extend_from_slice(&[0, 0, 0, 7]);
        bytes.extend_from_slice(payload);
        bytes
    }

    #[tokio::test]
    async fn accept_offer_builds_answer_with_transport_credentials() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let whip = WhipEndpoint::new(IngestContext::new(engine));
        let idle = Arc::new(FakeTransport::with_packets(VecDeque::new()));
        let offer_sdp = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";

        let (_resource, answer) = whip
            .accept_offer(offer_sdp, StreamKey::new("live", "web"), idle)
            .unwrap();

        assert!(answer.contains("a=ice-ufrag:ufrag"));
        assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
        assert!(answer.contains("a=setup:passive"));
    }

    #[tokio::test]
    async fn pump_publishes_idr_then_releases_slot() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "web");

        // The whole script is one single-NAL IDR with the marker bit set.
        let script = VecDeque::from(vec![rtp_packet(1, 0, true, &[0x65, 0x11])]);
        let resource = WhipResource {
            ctx: IngestContext::new(engine.clone()),
            key: key.clone(),
            transport: Arc::new(FakeTransport::with_packets(script)),
        };
        resource.pump().await.unwrap();

        // A complete keyframe was published, so no PLI was needed; the publish
        // slot is released once the transport drained.
        assert!(engine.get_stream(&key).is_err());
    }

    #[tokio::test]
    async fn pump_requests_keyframe_on_a_depacketize_gap() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();

        // A mid FU-A fragment with no start bit forces an OutOfOrder error → PLI.
        let orphan_fragment = rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11]); // FU-A, S=0
        let transport = Arc::new(FakeTransport::with_packets(VecDeque::from(vec![
            orphan_fragment,
        ])));

        let resource = WhipResource {
            ctx: IngestContext::new(engine),
            key: StreamKey::new("live", "web2"),
            transport: transport.clone(),
        };
        resource.pump().await.unwrap();

        assert!(
            !transport.rtcp.lock().await.is_empty(),
            "a PLI was sent after the depacketize gap"
        );
    }

    #[tokio::test]
    async fn whep_egress_packetizes_published_frames_as_rtp() {
        use crate::FrameFlags;
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "show");

        // Publish a config + keyframe into the stream via an ingest session.
        let session = IngestContext::new(engine.clone())
            .open_publish(key.clone())
            .await
            .unwrap();

        let mut config_frame = MediaFrame::new_video(
            0,
            0,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42]),
            CodecId::H264,
            false,
        );
        config_frame.flags |= FrameFlags::CONFIG;
        session.publish_frame(config_frame).unwrap();

        let keyframe = MediaFrame::new_video(
            10,
            10,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88, 0x99]),
            CodecId::H264,
            true,
        );
        session.publish_frame(keyframe).unwrap();

        // A WHEP viewer subscribes and pumps; the bus closes when we finish().
        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(VecDeque::new()));
        let offer_sdp = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
        let (resource, answer) = whep
            .accept_offer(offer_sdp, key.clone(), transport.clone())
            .unwrap();
        assert!(answer.contains("a=sendonly"), "WHEP answer is sendonly");

        let pump = tokio::spawn(resource.pump());
        // Let the instant-start replay (config + keyframe) flush, then end the stream.
        for _ in 0..32 {
            // The guard is released at the end of this statement, before yielding.
            let flushed = !transport.sent_rtp.lock().await.is_empty();
            if flushed {
                break;
            }
            tokio::task::yield_now().await;
        }
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();

        let sent = transport.sent_rtp.lock().await;
        assert!(!sent.is_empty(), "egress sent RTP packets");
        // The packets parse as RTP with our payload type.
        let first = RtpHeader::parse(&sent[0]).unwrap();
        assert_eq!(first.payload_type, 96);
    }
}
485}