pub mod rtcp;
pub mod sdp;
pub use sdp::{SdpAnswerParams, SdpOffer};
use crate::inbound::{IngestContext, PublishSession};
use crate::protocol::rtp::{H264Depacketizer, RtpHeader};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
/// Transport abstraction the WHIP endpoint uses to exchange media packets with a publisher.
///
/// `WhipEndpoint::accept_offer` reads the fingerprint and ICE credentials to fill in the
/// SDP answer; `WhipResource::pump` then reads RTP from, and writes RTCP feedback to, the
/// same transport. Implementations must be `Send + Sync` because the transport is shared
/// through an `Arc`.
// NOTE(review): the name suggests packets are already SRTP-decrypted when `recv_rtp`
// returns them and are encrypted by `send_rtcp` — confirm against the implementations.
#[async_trait]
pub trait DtlsSrtpTransport: Send + Sync {
/// Returns the fingerprint string placed in the SDP answer.
fn fingerprint(&self) -> String;
/// Returns the local ICE credentials as `(ufrag, pwd)`.
fn ice_credentials(&self) -> (String, String);
/// Waits for the next inbound RTP packet.
///
/// Returning `None` ends the `WhipResource::pump` loop.
async fn recv_rtp(&self) -> Option<Vec<u8>>;
/// Sends one serialized RTCP packet to the remote peer.
async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
}
/// Entry point for WHIP publishing: turns SDP offers into [`WhipResource`]s.
///
/// Cloning the endpoint clones the wrapped [`IngestContext`].
#[derive(Clone)]
pub struct WhipEndpoint {
/// Ingest context cloned into every accepted resource.
ctx: IngestContext,
}
impl WhipEndpoint {
pub fn new(ctx: IngestContext) -> Self {
Self { ctx }
}
pub fn accept_offer(
&self,
offer_sdp: &str,
key: StreamKey,
transport: std::sync::Arc<dyn DtlsSrtpTransport>,
) -> Result<(WhipResource, String)> {
let offer = SdpOffer::parse(offer_sdp)
.ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
let (ufrag, pwd) = transport.ice_credentials();
let answer = sdp::build_answer(
&offer,
&SdpAnswerParams {
fingerprint: transport.fingerprint(),
ice_ufrag: ufrag,
ice_pwd: pwd,
},
);
let resource = WhipResource {
ctx: self.ctx.clone(),
key,
transport,
};
Ok((resource, answer))
}
}
/// One accepted WHIP publish, created by `WhipEndpoint::accept_offer`.
///
/// Holds everything `pump` needs to open a publish session and forward media into it.
pub struct WhipResource {
/// Context used to open the publish session.
ctx: IngestContext,
/// Stream key the publish session is opened under.
key: StreamKey,
/// Source of inbound RTP packets and sink for outbound RTCP feedback.
transport: std::sync::Arc<dyn DtlsSrtpTransport>,
}
impl WhipResource {
pub async fn pump(self) -> Result<()> {
let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
let mut depack = H264Depacketizer::new();
let mut needs_keyframe = true;
while let Some(pkt) = self.transport.recv_rtp().await {
let Some(header) = RtpHeader::parse(&pkt) else {
continue;
};
let payload = &pkt[header.payload_offset..];
match depack.push(payload, header.marker, header.timestamp, header.sequence) {
Ok(Some(au)) => {
needs_keyframe = false;
let pts = (au.timestamp / 90) as i64;
let frame =
MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
let _ = session.publish_frame(frame)?;
}
Ok(None) => {}
Err(_) => {
needs_keyframe = true;
}
}
if needs_keyframe {
let pli = rtcp::build_pli(0, header.ssrc);
let _ = self.transport.send_rtcp(&pli).await;
}
}
session.finish().await
}
pub async fn close(self) -> Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// In-memory transport: serves a fixed queue of RTP packets and records every RTCP
    /// packet it is asked to send.
    struct FakeTransport {
        packets: Mutex<VecDeque<Vec<u8>>>,
        rtcp: Mutex<Vec<Vec<u8>>>,
    }

    impl FakeTransport {
        /// Builds a shared transport preloaded with `packets`, delivered in order.
        fn with_packets(packets: Vec<Vec<u8>>) -> Arc<Self> {
            Arc::new(Self {
                packets: Mutex::new(VecDeque::from(packets)),
                rtcp: Mutex::new(Vec::new()),
            })
        }
    }

    #[async_trait]
    impl DtlsSrtpTransport for FakeTransport {
        fn fingerprint(&self) -> String {
            String::from("sha-256 AA:BB")
        }

        fn ice_credentials(&self) -> (String, String) {
            (String::from("ufrag"), String::from("pwd"))
        }

        /// Pops the next queued packet; an empty queue yields `None`, which ends `pump`.
        async fn recv_rtp(&self) -> Option<Vec<u8>> {
            let mut queue = self.packets.lock().await;
            queue.pop_front()
        }

        async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
            let mut sent = self.rtcp.lock().await;
            sent.push(packet.to_vec());
            Ok(())
        }
    }

    /// Serializes a 12-byte RTP header (version 2, payload type 96, SSRC 7) followed by
    /// `payload`.
    fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        const VERSION_2: u8 = 0x80;
        const PAYLOAD_TYPE: u8 = 96;
        const SSRC: [u8; 4] = [0, 0, 0, 7];

        let marker_bit: u8 = if marker { 0x80 } else { 0x00 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        packet.push(VERSION_2);
        packet.push(marker_bit | PAYLOAD_TYPE);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&SSRC);
        packet.extend_from_slice(payload);
        packet
    }

    #[tokio::test]
    async fn accept_offer_builds_answer_with_transport_credentials() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let endpoint = WhipEndpoint::new(IngestContext::new(engine));
        let transport = FakeTransport::with_packets(Vec::new());
        let offer = "v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\nm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 H264/90000\r\n";

        let (_res, answer) = endpoint
            .accept_offer(offer, StreamKey::new("live", "web"), transport)
            .unwrap();

        for expected in [
            "a=ice-ufrag:ufrag",
            "a=fingerprint:sha-256 AA:BB",
            "a=setup:passive",
        ] {
            assert!(answer.contains(expected));
        }
    }

    #[tokio::test]
    async fn pump_publishes_idr_then_releases_slot() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "web");
        // One single-NAL packet (NAL type 5) with the marker bit set.
        let transport = FakeTransport::with_packets(vec![rtp_packet(1, 0, true, &[0x65, 0x11])]);
        let resource = WhipResource {
            ctx: IngestContext::new(engine.clone()),
            key: key.clone(),
            transport,
        };

        resource.pump().await.unwrap();

        assert!(engine.get_stream(&key).is_err());
    }

    #[tokio::test]
    async fn pump_requests_keyframe_on_a_depacketize_gap() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        // An FU-A fragment (indicator 0x7C) whose FU header has no start bit, i.e. the
        // beginning of the NAL unit was never delivered.
        let transport =
            FakeTransport::with_packets(vec![rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11])]);
        let resource = WhipResource {
            ctx: IngestContext::new(engine),
            key: StreamKey::new("live", "web2"),
            transport: transport.clone(),
        };

        resource.pump().await.unwrap();

        let sent_rtcp = transport.rtcp.lock().await;
        assert!(
            !sent_rtcp.is_empty(),
            "a PLI was sent after the depacketize gap"
        );
    }
}