pub mod rtcp;
pub mod sdp;
pub use sdp::{MediaDirection, SdpAnswerParams, SdpOffer};
use crate::bus::PlaybackRegistry;
use crate::inbound::{IngestContext, PublishSession};
#[cfg(feature = "codec-av1")]
use crate::protocol::rtp::Av1Packetizer;
use crate::protocol::rtp::{
H264Depacketizer, OpusPacketizer, RtpHeader, RtpPacketizer, Vp9Packetizer,
};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::sync::Arc;
/// Secure media transport (DTLS-SRTP) used by the WHIP/WHEP endpoints.
///
/// Implementors supply their ICE credentials and certificate fingerprint so an
/// SDP answer can be produced, plus RTP/RTCP I/O. The RTP methods have no-op
/// defaults so signalling-only transports only need to implement the rest.
#[async_trait]
pub trait DtlsSrtpTransport: Send + Sync {
    /// Certificate fingerprint written into the SDP answer's fingerprint attribute.
    fn fingerprint(&self) -> String;

    /// ICE credentials as `(ufrag, pwd)` for the SDP answer.
    fn ice_credentials(&self) -> (String, String);

    /// Next inbound RTP packet. Returning `None` ends a WHIP ingest loop.
    /// The default implementation always returns `None`.
    async fn recv_rtp(&self) -> Option<Vec<u8>> {
        Option::None
    }

    /// Sends one outbound RTP packet. The default implementation drops it and
    /// reports success.
    async fn send_rtp(&self, _packet: &[u8]) -> Result<()> {
        Result::Ok(())
    }

    /// Sends one outbound RTCP packet (e.g. a PLI keyframe request).
    async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;

    /// Builds the SDP answer for `offer_sdp` with the requested media
    /// `direction`, using this transport's ICE credentials and fingerprint.
    ///
    /// Returns an empty string when the offer cannot be parsed.
    fn answer(&self, offer_sdp: &str, direction: MediaDirection) -> String {
        match SdpOffer::parse(offer_sdp) {
            None => String::new(),
            Some(offer) => {
                // Credentials are fetched before the fingerprint, as before.
                let (ice_ufrag, ice_pwd) = self.ice_credentials();
                let params = SdpAnswerParams {
                    fingerprint: self.fingerprint(),
                    ice_ufrag,
                    ice_pwd,
                };
                sdp::build_answer_directed(&offer, &params, direction)
            }
        }
    }
}
/// WHIP ingest endpoint: accepts a publisher's SDP offer and produces a
/// [`WhipResource`] that pumps the publisher's RTP into the ingest pipeline.
#[derive(Clone)]
pub struct WhipEndpoint {
    ctx: IngestContext,
}
impl WhipEndpoint {
    /// Creates an endpoint whose resources open publish sessions via `ctx`.
    pub fn new(ctx: IngestContext) -> Self {
        WhipEndpoint { ctx }
    }

    /// Parses `offer_sdp`, builds a receive-only SDP answer through
    /// `transport`, and returns the ingest resource together with that answer.
    ///
    /// # Errors
    /// Returns a protocol error when the offer is not valid SDP.
    pub fn accept_offer(
        &self,
        offer_sdp: &str,
        key: StreamKey,
        transport: std::sync::Arc<dyn DtlsSrtpTransport>,
    ) -> Result<(WhipResource, String)> {
        let parsed = SdpOffer::parse(offer_sdp);
        let offer =
            parsed.ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
        let (video_pt, audio_pt) = (offer.payload_type, offer.audio_payload_type);
        let answer = transport.answer(offer_sdp, MediaDirection::RecvOnly);
        let ctx = self.ctx.clone();
        Ok((
            WhipResource {
                ctx,
                key,
                transport,
                video_pt,
                audio_pt,
            },
            answer,
        ))
    }
}
pub struct WhipResource {
ctx: IngestContext,
key: StreamKey,
transport: std::sync::Arc<dyn DtlsSrtpTransport>,
video_pt: u8,
audio_pt: Option<u8>,
}
impl WhipResource {
pub async fn pump(self) -> Result<()> {
let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
let mut depack = H264Depacketizer::new();
let mut needs_keyframe = true;
while let Some(pkt) = self.transport.recv_rtp().await {
let Some(header) = RtpHeader::parse(&pkt) else {
continue;
};
let payload = &pkt[header.payload_offset..];
if self.audio_pt == Some(header.payload_type) {
if !payload.is_empty() {
let pts = (header.timestamp / 48) as i64;
let data = bytes::Bytes::copy_from_slice(payload);
let frame = MediaFrame::new_audio(pts, data, CodecId::Opus);
let _ = session.publish_frame(frame)?;
}
continue;
}
let _ = self.video_pt; match depack.push(payload, header.marker, header.timestamp, header.sequence) {
Ok(Some(au)) => {
needs_keyframe = false;
let pts = (au.timestamp / 90) as i64;
let frame =
MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
let _ = session.publish_frame(frame)?;
}
Ok(None) => {}
Err(_) => {
needs_keyframe = true;
}
}
if needs_keyframe {
let pli = rtcp::build_pli(0, header.ssrc);
let _ = self.transport.send_rtcp(&pli).await;
}
}
session.finish().await
}
pub async fn close(self) -> Result<()> {
Ok(())
}
}
/// WHEP egress endpoint: accepts a viewer's SDP offer and produces a
/// [`WhepResource`] that streams a registered playback stream as RTP.
#[derive(Clone)]
pub struct WhepEndpoint {
    playback: Arc<dyn PlaybackRegistry>,
}
impl WhepEndpoint {
    /// Creates an endpoint that resolves streams through `playback`.
    pub fn new(playback: Arc<dyn PlaybackRegistry>) -> Self {
        WhepEndpoint { playback }
    }

    /// Parses `offer_sdp`, builds a send-only SDP answer through `transport`,
    /// and returns the egress resource together with that answer.
    ///
    /// # Errors
    /// Returns a protocol error when the offer is not valid SDP.
    pub fn accept_offer(
        &self,
        offer_sdp: &str,
        key: StreamKey,
        transport: Arc<dyn DtlsSrtpTransport>,
    ) -> Result<(WhepResource, String)> {
        let parsed = SdpOffer::parse(offer_sdp);
        let offer =
            parsed.ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
        let (payload_type, audio_payload_type) = (offer.payload_type, offer.audio_payload_type);
        let answer = transport.answer(offer_sdp, MediaDirection::SendOnly);
        let playback = Arc::clone(&self.playback);
        Ok((
            WhepResource {
                playback,
                key,
                transport,
                payload_type,
                audio_payload_type,
                warned_unsupported: std::sync::atomic::AtomicBool::new(false),
            },
            answer,
        ))
    }
}
/// A WHEP egress session for one viewer: frames of the stream registered under
/// `key` are packetized to RTP and written to `transport`.
pub struct WhepResource {
    /// Registry used to look up the stream being played.
    playback: Arc<dyn PlaybackRegistry>,
    /// Stream to play.
    key: StreamKey,
    /// Transport that outgoing RTP packets are sent on.
    transport: Arc<dyn DtlsSrtpTransport>,
    /// Video payload type taken from the viewer's offer; used for video RTP.
    payload_type: u8,
    /// Opus payload type from the offer; `None` means audio is not sent.
    audio_payload_type: Option<u8>,
    /// Set after the "unsupported video codec" warning is logged so it is
    /// emitted at most once per session.
    warned_unsupported: std::sync::atomic::AtomicBool,
}
/// Video RTP packetizer selected once per WHEP session from the stream codec.
enum EgressPacketizer {
    /// NAL-unit based packetizer (H.264 or H.265); `codec` records which one.
    Nal { p: RtpPacketizer, codec: CodecId },
    Vp9(Vp9Packetizer),
    #[cfg(feature = "codec-av1")]
    Av1(Av1Packetizer),
}
impl EgressPacketizer {
    /// Picks the packetizer for `codec`. Codecs without a dedicated packetizer
    /// fall back to the H.264 NAL packetizer, whose codec check then rejects
    /// their frames.
    fn for_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: CodecId) -> Self {
        match codec {
            CodecId::VP9 => Self::Vp9(Vp9Packetizer::new(payload_type, ssrc, mtu)),
            #[cfg(feature = "codec-av1")]
            CodecId::AV1 => Self::Av1(Av1Packetizer::new(payload_type, ssrc, mtu)),
            CodecId::H265 => Self::Nal {
                p: RtpPacketizer::new_h265(payload_type, ssrc, mtu),
                codec: CodecId::H265,
            },
            _ => Self::Nal {
                p: RtpPacketizer::new(payload_type, ssrc, mtu),
                codec: CodecId::H264,
            },
        }
    }

    /// Packetizes `frame` into `out` when its codec matches this packetizer.
    ///
    /// Returns `false`, leaving `out` untouched, when the codec does not match.
    fn packetize_into(&mut self, frame: &MediaFrame, out: &mut Vec<Vec<u8>>) -> bool {
        // 90 kHz RTP video clock. The x90 scaling assumes `pts` is in
        // milliseconds, mirroring the /90 applied on WHIP ingest.
        let rtp_ts = (frame.pts.max(0) as u64).wrapping_mul(90) as u32;
        match self {
            Self::Nal { p, codec } => {
                if frame.codec != *codec {
                    return false;
                }
                p.packetize_into(&frame.data, rtp_ts, out);
            }
            Self::Vp9(p) => {
                if frame.codec != CodecId::VP9 {
                    return false;
                }
                p.packetize_into(&frame.data, rtp_ts, frame.is_keyframe(), out);
            }
            #[cfg(feature = "codec-av1")]
            Self::Av1(p) => {
                if frame.codec != CodecId::AV1 {
                    return false;
                }
                p.packetize_into(&frame.data, rtp_ts, out);
            }
        }
        true
    }
}
impl WhepResource {
pub async fn pump(self) -> Result<()> {
let handle = self.playback.get_stream(&self.key)?;
let ssrc = 0x5745_4850; let mut sub = handle.subscribe_resilient();
let (vcfg, _) = handle.cached_configs();
let replay = handle.replay_buffer();
drop(handle);
let video_codec = vcfg
.as_ref()
.map(|c| c.codec)
.or_else(|| replay.iter().find(|f| f.is_video()).map(|f| f.codec))
.unwrap_or(CodecId::H264);
let mut packetizer =
EgressPacketizer::for_codec(self.payload_type, ssrc, 1200, video_codec);
let mut audio = self
.audio_payload_type
.map(|pt| OpusPacketizer::new(pt, 0x5745_4151));
let mut pkts: Vec<Vec<u8>> = Vec::new();
if let Some(cfg) = vcfg {
self.send_frame(&cfg, &mut packetizer, &mut audio, &mut pkts)
.await?;
}
for frame in replay {
self.send_frame(&frame, &mut packetizer, &mut audio, &mut pkts)
.await?;
}
while let Some(frame) = sub.recv().await {
self.send_frame(&frame, &mut packetizer, &mut audio, &mut pkts)
.await?;
}
Ok(())
}
async fn send_frame(
&self,
frame: &MediaFrame,
packetizer: &mut EgressPacketizer,
audio: &mut Option<OpusPacketizer>,
pkts: &mut Vec<Vec<u8>>,
) -> Result<()> {
if frame.is_audio() {
if let Some(ap) = audio.as_mut() {
if frame.codec == CodecId::Opus {
let ts = (frame.pts.max(0) as u64).wrapping_mul(48) as u32; ap.packetize_into(&frame.data, ts, pkts);
for packet in pkts.iter() {
self.transport.send_rtp(packet).await?;
}
}
}
return Ok(());
}
if !frame.is_video() {
return Ok(());
}
if packetizer.packetize_into(frame, pkts) {
for packet in pkts.iter() {
self.transport.send_rtp(packet).await?;
}
} else {
use std::sync::atomic::Ordering;
if !self.warned_unsupported.swap(true, Ordering::Relaxed) {
tracing::warn!(
stream = %self.key,
codec = ?frame.codec,
"WHEP egress: unsupported video codec; frames skipped",
);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    /// In-memory transport: `recv_rtp` drains a pre-loaded queue, and every
    /// RTP/RTCP packet handed to it is recorded for later assertions.
    struct FakeTransport {
        // Inbound packets returned by `recv_rtp`, front first; `None` once empty.
        packets: Mutex<std::collections::VecDeque<Vec<u8>>>,
        // Packets passed to `send_rtcp`.
        rtcp: Mutex<Vec<Vec<u8>>>,
        // Packets passed to `send_rtp`.
        sent_rtp: Mutex<Vec<Vec<u8>>>,
    }
    impl FakeTransport {
        /// Creates a transport that will deliver `packets` in order.
        fn with_packets(packets: std::collections::VecDeque<Vec<u8>>) -> Self {
            Self {
                packets: Mutex::new(packets),
                rtcp: Mutex::new(Vec::new()),
                sent_rtp: Mutex::new(Vec::new()),
            }
        }
    }
    #[async_trait]
    impl DtlsSrtpTransport for FakeTransport {
        fn fingerprint(&self) -> String {
            "sha-256 AA:BB".into()
        }
        fn ice_credentials(&self) -> (String, String) {
            ("ufrag".into(), "pwd".into())
        }
        async fn recv_rtp(&self) -> Option<Vec<u8>> {
            self.packets.lock().await.pop_front()
        }
        async fn send_rtp(&self, packet: &[u8]) -> Result<()> {
            self.sent_rtp.lock().await.push(packet.to_vec());
            Ok(())
        }
        async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
            self.rtcp.lock().await.push(packet.to_vec());
            Ok(())
        }
    }
    /// RTP packet on payload type 96; see `rtp_packet_pt`.
    fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        rtp_packet_pt(96, seq, ts, marker, payload)
    }
    /// Builds an RTP packet with a 12-byte header and then `payload`.
    /// Header: version 2, no padding/extension/CSRCs, the given marker, PT,
    /// sequence number and timestamp, and SSRC 7.
    fn rtp_packet_pt(pt: u8, seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let mut p = vec![0x80, if marker { 0x80 | pt } else { pt & 0x7F }];
        p.extend_from_slice(&seq.to_be_bytes());
        p.extend_from_slice(&ts.to_be_bytes());
        p.extend_from_slice(&[0, 0, 0, 7]);
        p.extend_from_slice(payload);
        p
    }
    /// The WHIP answer carries the transport's ICE ufrag and fingerprint, plus
    /// `a=setup:passive`.
    #[tokio::test]
    async fn accept_offer_builds_answer_with_transport_credentials() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let endpoint = WhipEndpoint::new(IngestContext::new(engine));
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
        let (_res, answer) = endpoint
            .accept_offer(offer, StreamKey::new("live", "web"), transport)
            .unwrap();
        assert!(answer.contains("a=ice-ufrag:ufrag"));
        assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
        assert!(answer.contains("a=setup:passive"));
    }
    /// A single-packet IDR (NAL header byte 0x65, marker set) is ingested. Once
    /// the queue is drained, `pump` finishes the session and the stream is no
    /// longer live.
    #[tokio::test]
    async fn pump_publishes_idr_then_releases_slot() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "web");
        let ctx = IngestContext::new(engine.clone());
        let mut q = std::collections::VecDeque::new();
        // Queue the IDR packet, then build the transport that delivers it.
        q.push_back(rtp_packet(1, 0, true, &[0x65, 0x11])); let transport = Arc::new(FakeTransport::with_packets(q));
        let resource = WhipResource {
            ctx,
            key: key.clone(),
            transport,
            video_pt: 96,
            audio_pt: None,
        };
        resource.pump().await.unwrap();
        assert!(engine.get_stream(&key).is_err());
    }
    /// 0x7C is an FU-A indicator (NAL type 28). The FU header 0x05 has no start
    /// bit, so this is a fragment whose start was never seen. No keyframe
    /// results, so `pump` is expected to send a PLI.
    #[tokio::test]
    async fn pump_requests_keyframe_on_a_depacketize_gap() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let ctx = IngestContext::new(engine);
        let mut q = std::collections::VecDeque::new();
        // Queue the orphan FU-A fragment, then build the transport that delivers it.
        q.push_back(rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11])); let transport = Arc::new(FakeTransport::with_packets(q));
        let resource = WhipResource {
            ctx,
            key: StreamKey::new("live", "web2"),
            transport: transport.clone(),
            video_pt: 96,
            audio_pt: None,
        };
        resource.pump().await.unwrap();
        assert!(
            !transport.rtcp.lock().await.is_empty(),
            "a PLI was sent after the depacketize gap"
        );
    }
    /// Feeds one Opus packet on PT 111 through `pump`.
    ///
    /// NOTE(review): the pump's own `Result` is discarded and nothing inspects
    /// the bus. The test only shows the audio path does not panic and that the
    /// stream is torn down afterwards. TODO: assert on the published frame.
    #[tokio::test]
    async fn pump_routes_opus_audio_onto_the_bus() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "av");
        let ctx = IngestContext::new(engine.clone());
        let handle = engine.get_stream(&key);
        assert!(handle.is_err(), "stream not live until pump opens publish");
        let mut q = std::collections::VecDeque::new();
        q.push_back(rtp_packet_pt(111, 7, 4800, true, &[0xAA, 0xBB, 0xCC]));
        let transport = Arc::new(FakeTransport::with_packets(q));
        let resource = WhipResource {
            ctx,
            key: key.clone(),
            transport,
            video_pt: 96,
            audio_pt: Some(111),
        };
        let pump = tokio::spawn(async move { resource.pump().await });
        let _ = pump.await.unwrap();
        assert!(engine.get_stream(&key).is_err());
    }
    /// Publishes an H.264 config frame and an IDR, then runs a WHEP pump.
    /// The first RTP packet sent must carry the offered payload type 96.
    #[tokio::test]
    async fn whep_egress_packetizes_published_frames_as_rtp() {
        use crate::FrameFlags;
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "show");
        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();
        let mut cfg = MediaFrame::new_video(
            0,
            0,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42]),
            CodecId::H264,
            false,
        );
        cfg.flags |= FrameFlags::CONFIG;
        session.publish_frame(cfg).unwrap();
        session
            .publish_frame(MediaFrame::new_video(
                10,
                10,
                bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88, 0x99]),
                CodecId::H264,
                true,
            ))
            .unwrap();
        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";
        let (resource, answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();
        assert!(answer.contains("a=sendonly"), "WHEP answer is sendonly");
        let pump = tokio::spawn(resource.pump());
        // Yield (bounded) until the spawned pump has sent something.
        for _ in 0..32 {
            if !transport.sent_rtp.lock().await.is_empty() {
                break;
            }
            tokio::task::yield_now().await;
        }
        // Presumably finishing the publish session ends the subscription so
        // the pump returns; TODO confirm against the bus implementation.
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();
        let sent = transport.sent_rtp.lock().await;
        assert!(!sent.is_empty(), "egress sent RTP packets");
        let h = RtpHeader::parse(&sent[0]).unwrap();
        assert_eq!(h.payload_type, 96);
    }
    /// With an offer carrying Opus on PT 111, a published Opus frame must be
    /// sent as RTP on PT 111.
    #[tokio::test]
    async fn whep_egress_packetizes_opus_audio() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "aud");
        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();
        session
            .publish_frame(MediaFrame::new_video(
                0,
                0,
                bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65, 0x88]),
                CodecId::H264,
                true,
            ))
            .unwrap();
        session
            .publish_frame(MediaFrame::new_audio(
                20,
                bytes::Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
                CodecId::Opus,
            ))
            .unwrap();
        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\n\
m=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n\
m=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\n";
        let (resource, _answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();
        let pump = tokio::spawn(resource.pump());
        // Yield (bounded) until an audio packet on PT 111 has been sent.
        for _ in 0..64 {
            if transport
                .sent_rtp
                .lock()
                .await
                .iter()
                .any(|p| RtpHeader::parse(p).is_some_and(|h| h.payload_type == 111))
            {
                break;
            }
            tokio::task::yield_now().await;
        }
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();
        let sent = transport.sent_rtp.lock().await;
        assert!(
            sent.iter()
                .any(|p| RtpHeader::parse(p).is_some_and(|h| h.payload_type == 111)),
            "egress sent an Opus audio RTP packet on PT 111"
        );
    }
    /// A published VP9 keyframe is sent via the VP9 packetizer. The test
    /// round-trips it through `Vp9Depacketizer` and requires the original
    /// bytes and the keyframe flag.
    #[tokio::test]
    async fn whep_egress_packetizes_vp9_frames() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "vp9");
        let ctx = IngestContext::new(engine.clone());
        let session = ctx.open_publish(key.clone()).await.unwrap();
        let frame_data = bytes::Bytes::from_static(&[0xAA, 0xBB, 0xCC, 0xDD, 0xEE]);
        session
            .publish_frame(MediaFrame::new_video(
                0,
                0,
                frame_data.clone(),
                CodecId::VP9,
                true,
            ))
            .unwrap();
        let whep = WhepEndpoint::new(engine.clone());
        let transport = Arc::new(FakeTransport::with_packets(Default::default()));
        let offer = "v=0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP9/90000\r\n";
        let (resource, _answer) = whep
            .accept_offer(offer, key.clone(), transport.clone())
            .unwrap();
        let pump = tokio::spawn(resource.pump());
        // Yield (bounded) until the spawned pump has sent something.
        for _ in 0..32 {
            if !transport.sent_rtp.lock().await.is_empty() {
                break;
            }
            tokio::task::yield_now().await;
        }
        session.finish().await.unwrap();
        let _ = pump.await.unwrap();
        let sent = transport.sent_rtp.lock().await;
        assert!(!sent.is_empty(), "VP9 egress sent RTP packets");
        // Reassemble everything that was sent; keep the last completed frame.
        let mut depack = crate::protocol::rtp::Vp9Depacketizer::new();
        let mut out = None;
        for p in sent.iter() {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(f) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp)
                .unwrap()
            {
                out = Some(f);
            }
        }
        let out = out.expect("VP9 frame completed");
        assert_eq!(&out.data[..], &frame_data[..], "VP9 frame reconstructed");
        assert!(out.keyframe);
    }
}