arcly_stream/protocol/webrtc/
mod.rs1pub mod rtcp;
33pub mod sdp;
34
35pub use sdp::{MediaDirection, SdpAnswerParams, SdpOffer};
36
37use crate::bus::PlaybackRegistry;
38use crate::inbound::{IngestContext, PublishSession};
39use crate::protocol::rtp::{H264Depacketizer, RtpHeader, RtpPacketizer};
40use crate::{CodecId, MediaFrame, Result, StreamKey};
41use async_trait::async_trait;
42use std::sync::Arc;
43
44#[async_trait]
49pub trait DtlsSrtpTransport: Send + Sync {
50 fn fingerprint(&self) -> String;
53
54 fn ice_credentials(&self) -> (String, String);
56
57 async fn recv_rtp(&self) -> Option<Vec<u8>> {
61 None
62 }
63
64 async fn send_rtp(&self, _packet: &[u8]) -> Result<()> {
67 Ok(())
68 }
69
70 async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
72}
73
74#[derive(Clone)]
81pub struct WhipEndpoint {
82 ctx: IngestContext,
83}
84
85impl WhipEndpoint {
86 pub fn new(ctx: IngestContext) -> Self {
88 Self { ctx }
89 }
90
91 pub fn accept_offer(
96 &self,
97 offer_sdp: &str,
98 key: StreamKey,
99 transport: std::sync::Arc<dyn DtlsSrtpTransport>,
100 ) -> Result<(WhipResource, String)> {
101 let offer = SdpOffer::parse(offer_sdp)
102 .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
103 let (ufrag, pwd) = transport.ice_credentials();
104 let answer = sdp::build_answer(
105 &offer,
106 &SdpAnswerParams {
107 fingerprint: transport.fingerprint(),
108 ice_ufrag: ufrag,
109 ice_pwd: pwd,
110 },
111 );
112 let resource = WhipResource {
113 ctx: self.ctx.clone(),
114 key,
115 transport,
116 };
117 Ok((resource, answer))
118 }
119}
120
121pub struct WhipResource {
124 ctx: IngestContext,
125 key: StreamKey,
126 transport: std::sync::Arc<dyn DtlsSrtpTransport>,
127}
128
129impl WhipResource {
130 pub async fn pump(self) -> Result<()> {
133 let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
134 let mut depack = H264Depacketizer::new();
135 let mut needs_keyframe = true;
136
137 while let Some(pkt) = self.transport.recv_rtp().await {
138 let Some(header) = RtpHeader::parse(&pkt) else {
139 continue;
140 };
141 let payload = &pkt[header.payload_offset..];
142 match depack.push(payload, header.marker, header.timestamp, header.sequence) {
143 Ok(Some(au)) => {
144 needs_keyframe = false;
145 let pts = (au.timestamp / 90) as i64;
146 let frame =
147 MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
148 let _ = session.publish_frame(frame)?;
149 }
150 Ok(None) => {}
151 Err(_) => {
152 needs_keyframe = true;
154 }
155 }
156 if needs_keyframe {
157 let pli = rtcp::build_pli(0, header.ssrc);
158 let _ = self.transport.send_rtcp(&pli).await;
159 }
160 }
161
162 session.finish().await
163 }
164
165 pub async fn close(self) -> Result<()> {
167 Ok(())
168 }
169}
170
171#[derive(Clone)]
180pub struct WhepEndpoint {
181 playback: Arc<dyn PlaybackRegistry>,
182}
183
184impl WhepEndpoint {
185 pub fn new(playback: Arc<dyn PlaybackRegistry>) -> Self {
187 Self { playback }
188 }
189
190 pub fn accept_offer(
193 &self,
194 offer_sdp: &str,
195 key: StreamKey,
196 transport: Arc<dyn DtlsSrtpTransport>,
197 ) -> Result<(WhepResource, String)> {
198 let offer = SdpOffer::parse(offer_sdp)
199 .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
200 let (ufrag, pwd) = transport.ice_credentials();
201 let answer = sdp::build_answer_directed(
202 &offer,
203 &SdpAnswerParams {
204 fingerprint: transport.fingerprint(),
205 ice_ufrag: ufrag,
206 ice_pwd: pwd,
207 },
208 MediaDirection::SendOnly,
209 );
210 let resource = WhepResource {
211 playback: Arc::clone(&self.playback),
212 key,
213 transport,
214 payload_type: offer.payload_type,
215 warned_unsupported: std::sync::atomic::AtomicBool::new(false),
216 };
217 Ok((resource, answer))
218 }
219}
220
221pub struct WhepResource {
224 playback: Arc<dyn PlaybackRegistry>,
225 key: StreamKey,
226 transport: Arc<dyn DtlsSrtpTransport>,
227 payload_type: u8,
228 warned_unsupported: std::sync::atomic::AtomicBool,
231}
232
233impl WhepResource {
234 pub async fn pump(self) -> Result<()> {
242 let handle = self.playback.get_stream(&self.key)?;
243 let ssrc = 0x5745_4850; let mut sub = handle.subscribe_resilient();
247
248 let (vcfg, _) = handle.cached_configs();
250 let replay = handle.replay_buffer();
251 drop(handle);
255
256 let video_codec = vcfg
259 .as_ref()
260 .map(|c| c.codec)
261 .or_else(|| replay.iter().find(|f| f.is_video()).map(|f| f.codec))
262 .unwrap_or(CodecId::H264);
263 let mut packetizer = match video_codec {
264 CodecId::H265 => RtpPacketizer::new_h265(self.payload_type, ssrc, 1200),
265 _ => RtpPacketizer::new(self.payload_type, ssrc, 1200),
266 };
267
268 if let Some(cfg) = vcfg {
269 self.send_frame(&cfg, &mut packetizer).await?;
270 }
271 for frame in replay {
272 self.send_frame(&frame, &mut packetizer).await?;
273 }
274
275 while let Some(frame) = sub.recv().await {
276 self.send_frame(&frame, &mut packetizer).await?;
277 }
278 Ok(())
279 }
280
281 async fn send_frame(&self, frame: &MediaFrame, packetizer: &mut RtpPacketizer) -> Result<()> {
289 if !frame.is_video() {
290 return Ok(()); }
292 if !matches!(frame.codec, CodecId::H264 | CodecId::H265) {
293 use std::sync::atomic::Ordering;
294 if !self.warned_unsupported.swap(true, Ordering::Relaxed) {
295 tracing::warn!(
296 stream = %self.key,
297 codec = ?frame.codec,
298 "WHEP egress: unsupported video codec; frames skipped (H.264/H.265 only)",
299 );
300 }
301 return Ok(());
302 }
303 let timestamp = (frame.pts as u64).wrapping_mul(90) as u32; for packet in packetizer.packetize(&frame.data, timestamp) {
305 self.transport.send_rtp(&packet).await?;
306 }
307 Ok(())
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// In-memory transport: serves queued inbound packets and records
    /// everything that is sent through it.
    struct FakeTransport {
        /// Inbound RTP packets handed out by `recv_rtp`, front first.
        packets: Mutex<VecDeque<Vec<u8>>>,
        /// Every RTCP packet passed to `send_rtcp`.
        rtcp: Mutex<Vec<Vec<u8>>>,
        /// Every RTP packet passed to `send_rtp`.
        sent_rtp: Mutex<Vec<Vec<u8>>>,
    }

    impl FakeTransport {
        /// Builds a transport that will yield `packets` and then `None`.
        fn with_packets(packets: VecDeque<Vec<u8>>) -> Self {
            FakeTransport {
                packets: Mutex::new(packets),
                rtcp: Mutex::new(Vec::new()),
                sent_rtp: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait]
    impl DtlsSrtpTransport for FakeTransport {
        fn fingerprint(&self) -> String {
            String::from("sha-256 AA:BB")
        }

        fn ice_credentials(&self) -> (String, String) {
            (String::from("ufrag"), String::from("pwd"))
        }

        async fn recv_rtp(&self) -> Option<Vec<u8>> {
            let mut queue = self.packets.lock().await;
            queue.pop_front()
        }

        async fn send_rtp(&self, packet: &[u8]) -> Result<()> {
            let mut log = self.sent_rtp.lock().await;
            log.push(packet.to_vec());
            Ok(())
        }

        async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
            let mut log = self.rtcp.lock().await;
            log.push(packet.to_vec());
            Ok(())
        }
    }

    /// Builds a 12-byte RTP header (first byte 0x80, payload type 96,
    /// SSRC 7) followed by `payload`.
    fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let marker_bit: u8 = if marker { 0x80 } else { 0x00 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        packet.push(0x80);
        packet.push(marker_bit | 96);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&[0, 0, 0, 7]);
        packet.extend_from_slice(payload);
        packet
    }

    /// The WHIP answer carries the transport's ufrag and fingerprint and a
    /// passive setup role.
    #[tokio::test]
    async fn accept_offer_builds_answer_with_transport_credentials() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let endpoint = WhipEndpoint::new(IngestContext::new(engine));
        let transport = Arc::new(FakeTransport::with_packets(VecDeque::new()));
        let offer = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";

        let (_res, answer) = endpoint
            .accept_offer(offer, StreamKey::new("live", "web"), transport)
            .unwrap();

        assert!(answer.contains("a=ice-ufrag:ufrag"));
        assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
        assert!(answer.contains("a=setup:passive"));
    }

    /// After the pump drains its single packet and returns, the stream is no
    /// longer registered with the engine.
    #[tokio::test]
    async fn pump_publishes_idr_then_releases_slot() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "web");
        let ctx = IngestContext::new(engine.clone());

        let inbound = VecDeque::from([rtp_packet(1, 0, true, &[0x65, 0x11])]);
        let transport = Arc::new(FakeTransport::with_packets(inbound));

        let resource = WhipResource {
            ctx,
            key: key.clone(),
            transport,
        };
        resource.pump().await.unwrap();

        assert!(engine.get_stream(&key).is_err());
    }

    /// A lone FU-A style fragment makes the pump emit at least one RTCP
    /// packet.
    #[tokio::test]
    async fn pump_requests_keyframe_on_a_depacketize_gap() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let ctx = IngestContext::new(engine);

        let inbound = VecDeque::from([rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11])]);
        let transport = Arc::new(FakeTransport::with_packets(inbound));

        let resource = WhipResource {
            ctx,
            key: StreamKey::new("live", "web2"),
            transport: transport.clone(),
        };
        resource.pump().await.unwrap();

        let feedback_sent = !transport.rtcp.lock().await.is_empty();
        assert!(
            feedback_sent,
            "a PLI was sent after the depacketize gap"
        );
    }

    /// Frames published before a WHEP viewer attaches are sent to it as RTP
    /// with the payload type from the offer.
    #[tokio::test]
    async fn whep_egress_packetizes_published_frames_as_rtp() {
        use crate::FrameFlags;

        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "show");

        // Publish a config frame followed by one keyframe.
        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();

        let mut cfg = MediaFrame::new_video(
            0,
            0,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42]),
            CodecId::H264,
            false,
        );
        cfg.flags |= FrameFlags::CONFIG;
        session.publish_frame(cfg).unwrap();

        let idr = MediaFrame::new_video(
            10,
            10,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88, 0x99]),
            CodecId::H264,
            true,
        );
        session.publish_frame(idr).unwrap();

        // Attach a viewer.
        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(VecDeque::new()));
        let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
        let (resource, answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();
        assert!(answer.contains("a=sendonly"), "WHEP answer is sendonly");

        let pump = tokio::spawn(resource.pump());
        // Give the spawned pump up to 32 scheduler turns to send something.
        for _ in 0..32 {
            let delivered = !transport.sent_rtp.lock().await.is_empty();
            if delivered {
                break;
            }
            tokio::task::yield_now().await;
        }
        // Ending the publish session lets the pump's subscription finish.
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();

        let sent = transport.sent_rtp.lock().await;
        assert!(!sent.is_empty(), "egress sent RTP packets");
        let h = RtpHeader::parse(&sent[0]).unwrap();
        assert_eq!(h.payload_type, 96);
    }
}