pub mod rtcp;
pub mod sdp;
pub use sdp::{MediaDirection, SdpAnswerParams, SdpOffer};
use crate::bus::PlaybackRegistry;
use crate::inbound::{IngestContext, PublishSession};
use crate::protocol::rtp::{H264Depacketizer, RtpHeader, RtpPacketizer};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::sync::Arc;
/// Transport abstraction the WHIP/WHEP resources in this module use to exchange
/// RTP/RTCP packets with a peer and to describe themselves in an SDP answer.
///
/// NOTE(review): the name suggests implementors terminate DTLS-SRTP so that the
/// byte slices exchanged here are plain RTP/RTCP — confirm against implementors.
#[async_trait]
pub trait DtlsSrtpTransport: Send + Sync {
/// Returns the fingerprint string that `accept_offer` copies into the SDP
/// answer parameters.
fn fingerprint(&self) -> String;
/// Returns the ICE credentials as `(ufrag, pwd)`; `accept_offer` copies them
/// into the SDP answer parameters.
fn ice_credentials(&self) -> (String, String);
/// Receives the next inbound RTP packet.
///
/// `WhipResource::pump` stops reading as soon as this yields `None`.
/// The default implementation always returns `None`.
async fn recv_rtp(&self) -> Option<Vec<u8>> {
None
}
/// Sends one outbound RTP packet.
///
/// The default implementation discards the packet and reports success.
async fn send_rtp(&self, _packet: &[u8]) -> Result<()> {
Ok(())
}
/// Sends one outbound RTCP packet; there is no default implementation.
async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
}
/// WHIP (publish-side) endpoint: turns an SDP offer into an SDP answer plus a
/// [`WhipResource`] that ingests the media.
#[derive(Clone)]
pub struct WhipEndpoint {
/// Ingest context; cloned into every resource created by `accept_offer`.
ctx: IngestContext,
}
impl WhipEndpoint {
pub fn new(ctx: IngestContext) -> Self {
Self { ctx }
}
pub fn accept_offer(
&self,
offer_sdp: &str,
key: StreamKey,
transport: std::sync::Arc<dyn DtlsSrtpTransport>,
) -> Result<(WhipResource, String)> {
let offer = SdpOffer::parse(offer_sdp)
.ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
let (ufrag, pwd) = transport.ice_credentials();
let answer = sdp::build_answer(
&offer,
&SdpAnswerParams {
fingerprint: transport.fingerprint(),
ice_ufrag: ufrag,
ice_pwd: pwd,
},
);
let resource = WhipResource {
ctx: self.ctx.clone(),
key,
transport,
};
Ok((resource, answer))
}
}
/// One accepted WHIP publish session; consumed by [`WhipResource::pump`].
pub struct WhipResource {
/// Ingest context used to open the publish session in `pump`.
ctx: IngestContext,
/// Stream key the received media is published under.
key: StreamKey,
/// Transport RTP is read from and RTCP feedback is written to.
transport: std::sync::Arc<dyn DtlsSrtpTransport>,
}
impl WhipResource {
    /// Reads RTP from the transport, reassembles H.264 access units and
    /// publishes them under this resource's stream key until the transport
    /// stops yielding packets.
    ///
    /// Packets whose RTP header cannot be parsed, or whose reported payload
    /// offset lies beyond the end of the packet, are skipped. While a
    /// keyframe is considered needed (at start-up and after a depacketizer
    /// error) a PLI is sent on a best-effort basis after each packet.
    ///
    /// # Errors
    ///
    /// Fails if the publish session cannot be opened, if publishing a frame
    /// fails, or if finishing the session fails. When publishing fails the
    /// session is still finished and the publish error is the one returned.
    pub async fn pump(self) -> Result<()> {
        let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
        let mut depack = H264Depacketizer::new();
        let mut needs_keyframe = true;
        // Remembers a publish failure so the session can still be finished
        // instead of being abandoned by an early return.
        let mut publish_outcome: Result<()> = Ok(());

        while let Some(pkt) = self.transport.recv_rtp().await {
            let Some(header) = RtpHeader::parse(&pkt) else {
                continue;
            };
            // Checked slicing: the packet comes from the network, so a bad
            // offset must drop the packet rather than panic the task.
            let Some(payload) = pkt.get(header.payload_offset..) else {
                continue;
            };
            match depack.push(payload, header.marker, header.timestamp, header.sequence) {
                Ok(Some(au)) => {
                    // NOTE(review): any completed access unit clears the flag,
                    // not only a keyframe — confirm this is intended.
                    needs_keyframe = false;
                    // Presumably 90 kHz RTP ticks -> milliseconds; confirm the
                    // unit `MediaFrame` expects.
                    let pts = (au.timestamp / 90) as i64;
                    let frame =
                        MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
                    if let Err(err) = session.publish_frame(frame) {
                        publish_outcome = Err(err);
                        break;
                    }
                }
                Ok(None) => {}
                Err(_) => {
                    needs_keyframe = true;
                }
            }
            if needs_keyframe {
                // NOTE(review): this fires for every packet while the flag is
                // set; consider rate-limiting if PLI bursts become a problem.
                let pli = rtcp::build_pli(0, header.ssrc);
                // Feedback is best-effort: a failed PLI must not stop ingest.
                let _ = self.transport.send_rtcp(&pli).await;
            }
        }

        let finished = session.finish().await;
        // A publish error takes precedence over whatever `finish` reported.
        publish_outcome.and(finished)
    }

    /// Consumes the resource without doing any work; always returns `Ok(())`.
    pub async fn close(self) -> Result<()> {
        Ok(())
    }
}
/// WHEP (playback-side) endpoint: turns an SDP offer into a send-only SDP
/// answer plus a [`WhepResource`] that streams media to the peer.
#[derive(Clone)]
pub struct WhepEndpoint {
/// Registry used by resources to look up the stream to play back.
playback: Arc<dyn PlaybackRegistry>,
}
impl WhepEndpoint {
pub fn new(playback: Arc<dyn PlaybackRegistry>) -> Self {
Self { playback }
}
pub fn accept_offer(
&self,
offer_sdp: &str,
key: StreamKey,
transport: Arc<dyn DtlsSrtpTransport>,
) -> Result<(WhepResource, String)> {
let offer = SdpOffer::parse(offer_sdp)
.ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
let (ufrag, pwd) = transport.ice_credentials();
let answer = sdp::build_answer_directed(
&offer,
&SdpAnswerParams {
fingerprint: transport.fingerprint(),
ice_ufrag: ufrag,
ice_pwd: pwd,
},
MediaDirection::SendOnly,
);
let resource = WhepResource {
playback: Arc::clone(&self.playback),
key,
transport,
payload_type: offer.payload_type,
warned_unsupported: std::sync::atomic::AtomicBool::new(false),
};
Ok((resource, answer))
}
}
/// One accepted WHEP playback session; consumed by [`WhepResource::pump`].
pub struct WhepResource {
/// Registry the stream is looked up in when `pump` starts.
playback: Arc<dyn PlaybackRegistry>,
/// Key of the stream to play back.
key: StreamKey,
/// Transport the generated RTP packets are written to.
transport: Arc<dyn DtlsSrtpTransport>,
/// RTP payload type taken from the parsed SDP offer and handed to the packetizer.
payload_type: u8,
/// Set once the unsupported-codec warning has been logged, so it is emitted only once.
warned_unsupported: std::sync::atomic::AtomicBool,
}
impl WhepResource {
    /// Streams the keyed stream's video to the peer as RTP until the live
    /// subscription ends.
    ///
    /// Frames are sent in this order: the cached video config (if any), the
    /// replay buffer, then frames from the live subscription. The
    /// subscription is opened before the cached config and replay buffer are
    /// read.
    ///
    /// # Errors
    ///
    /// Fails if the stream lookup fails or if sending an RTP packet fails.
    pub async fn pump(self) -> Result<()> {
        let stream = self.playback.get_stream(&self.key)?;

        // Fixed SSRC; the bytes are ASCII "WEHP".
        let ssrc = 0x5745_4850;
        let mut rtp = RtpPacketizer::new(self.payload_type, ssrc, 1200);

        let mut live = stream.subscribe_resilient();
        let (video_config, _) = stream.cached_configs();
        let backlog = stream.replay_buffer();
        // The handle is not needed once the subscription and snapshots exist.
        drop(stream);

        if let Some(config) = video_config {
            self.send_frame(&config, &mut rtp).await?;
        }
        for past in backlog {
            self.send_frame(&past, &mut rtp).await?;
        }
        while let Some(current) = live.recv().await {
            self.send_frame(&current, &mut rtp).await?;
        }
        Ok(())
    }

    /// Packetizes one frame and sends the resulting RTP packets.
    ///
    /// Non-video frames are skipped silently. Video frames in a codec other
    /// than H.264 are skipped as well, with a warning logged the first time
    /// that happens for this resource.
    async fn send_frame(&self, frame: &MediaFrame, packetizer: &mut RtpPacketizer) -> Result<()> {
        if !frame.is_video() {
            return Ok(());
        }

        if frame.codec == CodecId::H264 {
            // Multiply by 90 in u64 and truncate to the 32-bit RTP timestamp
            // (presumably milliseconds -> 90 kHz ticks).
            let rtp_timestamp = (frame.pts as u64).wrapping_mul(90) as u32;
            for packet in packetizer.packetize(&frame.data, rtp_timestamp) {
                self.transport.send_rtp(&packet).await?;
            }
            return Ok(());
        }

        // `swap` returns the previous value, so only the first caller logs.
        let already_warned = self
            .warned_unsupported
            .swap(true, std::sync::atomic::Ordering::Relaxed);
        if !already_warned {
            tracing::warn!(
                stream = %self.key,
                codec = ?frame.codec,
                "WHEP egress: unsupported video codec; frames skipped (only H.264 is packetized today)",
            );
        }
        Ok(())
    }
}
// Unit tests for the WHIP ingest and WHEP egress paths, driven by an in-memory
// transport.
#[cfg(test)]
mod tests {
use super::*;
use crate::bus::PlaybackRegistry;
use std::sync::Arc;
use tokio::sync::Mutex;
// In-memory transport: feeds queued packets to `recv_rtp` and records
// everything passed to `send_rtp` / `send_rtcp`.
struct FakeTransport {
// Inbound RTP packets, handed out front-to-back by `recv_rtp`.
packets: Mutex<std::collections::VecDeque<Vec<u8>>>,
// Every RTCP packet passed to `send_rtcp`.
rtcp: Mutex<Vec<Vec<u8>>>,
// Every RTP packet passed to `send_rtp`.
sent_rtp: Mutex<Vec<Vec<u8>>>,
}
impl FakeTransport {
// Builds a transport preloaded with `packets` and empty send logs.
fn with_packets(packets: std::collections::VecDeque<Vec<u8>>) -> Self {
Self {
packets: Mutex::new(packets),
rtcp: Mutex::new(Vec::new()),
sent_rtp: Mutex::new(Vec::new()),
}
}
}
#[async_trait]
impl DtlsSrtpTransport for FakeTransport {
fn fingerprint(&self) -> String {
"sha-256 AA:BB".into()
}
fn ice_credentials(&self) -> (String, String) {
("ufrag".into(), "pwd".into())
}
// Returns `None` once the queue is empty, which ends `WhipResource::pump`.
async fn recv_rtp(&self) -> Option<Vec<u8>> {
self.packets.lock().await.pop_front()
}
async fn send_rtp(&self, packet: &[u8]) -> Result<()> {
self.sent_rtp.lock().await.push(packet.to_vec());
Ok(())
}
async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
self.rtcp.lock().await.push(packet.to_vec());
Ok(())
}
}
// Builds a raw RTP packet: first byte 0x80, payload type 96 with the marker
// in the top bit of the second byte, big-endian sequence number and
// timestamp, the four bytes `0, 0, 0, 7` (the SSRC position in the fixed RTP
// header), then `payload`.
fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
p.extend_from_slice(&seq.to_be_bytes());
p.extend_from_slice(&ts.to_be_bytes());
p.extend_from_slice(&[0, 0, 0, 7]);
p.extend_from_slice(payload);
p
}
// The WHIP answer must carry the transport's ICE ufrag and fingerprint and
// an `a=setup:passive` line.
#[tokio::test]
async fn accept_offer_builds_answer_with_transport_credentials() {
let engine = crate::Engine::builder()
.application(crate::AppSpec::new("live"))
.build();
let endpoint = WhipEndpoint::new(IngestContext::new(engine));
let transport = Arc::new(FakeTransport::with_packets(Default::default()));
let offer = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
let (_res, answer) = endpoint
.accept_offer(offer, StreamKey::new("live", "web"), transport)
.unwrap();
assert!(answer.contains("a=ice-ufrag:ufrag"));
assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
assert!(answer.contains("a=setup:passive"));
}
// Feeds one packet with the marker bit set and payload `[0x65, 0x11]`
// (NAL header 0x65 = type 5, IDR slice), then checks the stream can no
// longer be looked up once `pump` has returned.
#[tokio::test]
async fn pump_publishes_idr_then_releases_slot() {
let engine = crate::Engine::builder()
.application(crate::AppSpec::new("live").gop_cache(4))
.build();
let key = StreamKey::new("live", "web");
let ctx = IngestContext::new(engine.clone());
let mut q = std::collections::VecDeque::new();
q.push_back(rtp_packet(1, 0, true, &[0x65, 0x11])); let transport = Arc::new(FakeTransport::with_packets(q));
let resource = WhipResource {
ctx,
key: key.clone(),
transport,
};
resource.pump().await.unwrap();
assert!(engine.get_stream(&key).is_err());
}
// Feeds a single FU-A fragment (indicator 0x7C, FU header 0x05: start bit
// clear) with no preceding start fragment and expects at least one RTCP
// packet to have been sent.
// NOTE(review): `pump` starts with `needs_keyframe = true`, so a PLI is sent
// for the first packet unless it completes an access unit; this test
// therefore does not isolate the depacketizer-error path.
#[tokio::test]
async fn pump_requests_keyframe_on_a_depacketize_gap() {
let engine = crate::Engine::builder()
.application(crate::AppSpec::new("live").gop_cache(4))
.build();
let ctx = IngestContext::new(engine);
let mut q = std::collections::VecDeque::new();
q.push_back(rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11])); let transport = Arc::new(FakeTransport::with_packets(q));
let resource = WhipResource {
ctx,
key: StreamKey::new("live", "web2"),
transport: transport.clone(),
};
resource.pump().await.unwrap();
assert!(
!transport.rtcp.lock().await.is_empty(),
"a PLI was sent after the depacketize gap"
);
}
// Publishes a config frame and a keyframe, runs the WHEP pump against the
// same engine, and checks that the answer is send-only and that the first
// RTP packet sent uses payload type 96 from the offer.
#[tokio::test]
async fn whep_egress_packetizes_published_frames_as_rtp() {
use crate::FrameFlags;
let engine = crate::Engine::builder()
.application(crate::AppSpec::new("live").gop_cache(8))
.build();
let key = StreamKey::new("live", "show");
let ctx = IngestContext::new(engine.clone());
let session = ctx.open_publish(key.clone()).await.unwrap();
let mut cfg = MediaFrame::new_video(
0,
0,
bytes::Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42]),
CodecId::H264,
false,
);
cfg.flags |= FrameFlags::CONFIG;
session.publish_frame(cfg).unwrap();
session
.publish_frame(MediaFrame::new_video(
10,
10,
bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88, 0x99]),
CodecId::H264,
true,
))
.unwrap();
let whep = WhepEndpoint::new(engine.clone());
let transport = Arc::new(FakeTransport::with_packets(Default::default()));
let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
let (resource, answer) = whep
.accept_offer(offer, key.clone(), transport.clone())
.unwrap();
assert!(answer.contains("a=sendonly"), "WHEP answer is sendonly");
let pump = tokio::spawn(resource.pump());
// Yield to the scheduler (at most 32 times) until the pump has sent
// something.
for _ in 0..32 {
if !transport.sent_rtp.lock().await.is_empty() {
break;
}
tokio::task::yield_now().await;
}
// Finishing the publisher is presumably what ends the pump's
// subscription and lets the spawned task return.
session.finish().await.unwrap();
let _ = pump.await.unwrap();
let sent = transport.sent_rtp.lock().await;
assert!(!sent.is_empty(), "egress sent RTP packets");
let h = RtpHeader::parse(&sent[0]).unwrap();
assert_eq!(h.payload_type, 96);
}
}