use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use rustrtc::config::MediaCapabilities;
use rustrtc::media::{MediaKind, VideoFrame, sample_track};
use rustrtc::{
IceCandidate, PeerConnection, RtcConfiguration, RtpCodecParameters,
SdpType, SessionDescription,
};
use tokio::runtime::Handle;
use crate::h264::H264Frame;
/// Clock rate advertised in the video track's codec parameters; also divided
/// by the frame rate to obtain the per-frame RTP timestamp increment.
const RTP_CLOCK_RATE: u32 = 90000;
/// Size limit, in bytes, handed to `packetize_nal` when sending a frame; NAL
/// units longer than this are split into fragments.
const RTP_MTU: usize = 1200;
/// A peer connection carrying one H.264 video track, driven from synchronous
/// code by blocking on a Tokio runtime handle.
pub struct WebRtcHandle {
/// SDP text of the local offer, read back from the peer connection in `new`
/// after the (bounded) wait for ICE gathering.
pub offer: String,
/// The underlying peer connection; cloned whenever an operation needs it
/// inside an async block.
pc: PeerConnection,
/// Source half returned by `sample_track`; `send_frame` pushes packetized
/// video through it.
sample_source: rustrtc::media::track::SampleStreamSource,
/// RTP timestamp for the next frame. Starts at 0 and is advanced by
/// `rtp_ts_per_frame` on every `send_frame` call.
rtp_timestamp: AtomicU32,
/// Timestamp increment per frame: `RTP_CLOCK_RATE / fps` (with `fps`
/// clamped to at least 1).
rtp_ts_per_frame: u32,
/// Runtime handle used to `block_on` the async peer-connection operations.
rt: Handle,
}
impl WebRtcHandle {
pub fn new(rt: Handle, fps: u32, stop: Arc<AtomicBool>) -> Result<Self, String> {
let (pc, sample_source, offer_sdp) = rt.block_on(async {
let mut h264 = rustrtc::config::VideoCapability::h264();
h264.fmtp = Some(
"level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f"
.to_string(),
);
let mut caps = MediaCapabilities::default();
caps.video = vec![h264];
let config = RtcConfiguration {
ice_servers: vec![],
media_capabilities: Some(caps),
..Default::default()
};
let pc = PeerConnection::new(config);
let (source, track, _feedback_rx) = sample_track(MediaKind::Video, 120);
pc.add_track(
track,
RtpCodecParameters {
payload_type: 96,
clock_rate: RTP_CLOCK_RATE,
channels: 0,
},
)
.map_err(|e| e.to_string())?;
let offer = pc.create_offer().await.map_err(|e| e.to_string())?;
pc.set_local_description(offer)
.map_err(|e| e.to_string())?;
tokio::time::timeout(Duration::from_secs(3), pc.wait_for_gathering_complete())
.await
.ok();
let offer_sdp = pc
.local_description()
.ok_or("no local desc")?
.to_sdp_string();
let pc_c = pc.clone();
let stop_c = stop.clone();
tokio::spawn(async move {
while !stop_c.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(500)).await;
}
let _ = pc_c.close();
});
Ok::<_, String>((pc, source, offer_sdp))
})?;
Ok(WebRtcHandle {
offer: offer_sdp,
pc,
sample_source,
rtp_timestamp: AtomicU32::new(0),
rtp_ts_per_frame: RTP_CLOCK_RATE / fps.max(1),
rt,
})
}
/// Parses `answer_sdp` as an SDP answer and installs it as the remote
/// description, blocking on the stored runtime handle until done.
///
/// # Errors
/// Returns the stringified parse error or the stringified error from
/// `set_remote_description`.
pub fn set_answer(&self, answer_sdp: String) -> Result<(), String> {
    let peer = self.pc.clone();
    self.rt.block_on(async move {
        let remote = SessionDescription::parse(SdpType::Answer, &answer_sdp)
            .map_err(|err| err.to_string())?;
        let applied = peer.set_remote_description(remote).await;
        applied.map_err(|err| err.to_string())
    })
}
/// Parses a remote ICE candidate line and hands it to the peer connection.
///
/// `_sdp_mid` and `_sdp_mline_index` are accepted but not used.
///
/// # Errors
/// Returns `"ICE parse error: …"` when the candidate cannot be parsed, or the
/// stringified error from `add_ice_candidate`.
pub fn add_candidate(
    &self,
    candidate: &str,
    _sdp_mid: Option<String>,
    _sdp_mline_index: Option<u16>,
) -> Result<(), String> {
    match IceCandidate::from_sdp(candidate) {
        Ok(parsed) => self
            .pc
            .add_ice_candidate(parsed)
            .map_err(|err| err.to_string()),
        Err(err) => Err(format!("ICE parse error: {}", err)),
    }
}
/// Closes the peer connection, running the call inside the stored runtime and
/// blocking until it returns. Whatever `close` returns is discarded.
pub fn close(&self) {
    let peer = self.pc.clone();
    let shutdown = async move {
        peer.close();
    };
    self.rt.block_on(shutdown);
}
pub fn send_frame(&self, frame: &H264Frame) -> Result<(), String> {
let mut nal_units: Vec<(Vec<u8>, bool)> = Vec::new();
if frame.is_keyframe {
if let Some(ref sps) = frame.sps {
nal_units.push((sps.clone(), false));
}
if let Some(ref pps) = frame.pps {
nal_units.push((pps.clone(), false));
}
}
nal_units.extend(crate::h264::avcc_nal_units(&frame.data));
let ts = self
.rtp_timestamp
.fetch_add(self.rtp_ts_per_frame, Ordering::Relaxed);
let mut rtp_packets: Vec<(Vec<u8>, bool)> = Vec::new();
for (nal_data, is_last_nal) in nal_units {
rtp_packets.extend(packetize_nal(nal_data, is_last_nal, RTP_MTU));
}
let source = self.sample_source.clone();
self.rt.block_on(async move {
for (payload, is_last) in rtp_packets {
source
.send_video(VideoFrame {
rtp_timestamp: ts,
data: Bytes::from(payload),
is_last_packet: is_last,
..Default::default()
})
.await
.map_err(|e| e.to_string())?;
}
Ok(())
})
}
}
/// Turns one H.264 NAL unit into RTP payloads no larger than `mtu` bytes.
///
/// * An empty `data` yields no packets.
/// * A NAL unit that fits within `mtu` is returned unchanged as one packet.
/// * Anything larger is split into FU-A fragments: each carries a two-byte
///   prefix (FU indicator, FU header) followed by a slice of the NAL body; the
///   original one-byte NAL header is not repeated.
///
/// The `bool` paired with each payload is `true` only for the final packet of
/// a NAL unit passed with `is_last_nal == true`.
///
/// `mtu` values below 3 are treated as 3, the smallest size that can hold the
/// two FU bytes plus one byte of payload. Without this clamp, `mtu < 2` would
/// underflow and `mtu == 2` would produce zero-length chunks and never
/// terminate.
pub(crate) fn packetize_nal(
    data: Vec<u8>,
    is_last_nal: bool,
    mtu: usize,
) -> Vec<(Vec<u8>, bool)> {
    /// Bytes prepended to every fragment: FU indicator + FU header.
    const FU_PREFIX_LEN: usize = 2;
    /// NAL unit type value identifying an FU-A fragment.
    const FU_A_TYPE: u8 = 0x1c;
    /// FU header flag marking the first fragment of a NAL unit.
    const FU_START_BIT: u8 = 0x80;
    /// FU header flag marking the last fragment of a NAL unit.
    const FU_END_BIT: u8 = 0x40;

    if data.is_empty() {
        return Vec::new();
    }

    // Guarantee room for the FU prefix and at least one payload byte.
    let mtu = mtu.max(FU_PREFIX_LEN + 1);
    if data.len() <= mtu {
        return vec![(data, is_last_nal)];
    }

    let nal_header = data[0];
    // Keep the NRI bits of the original header, replace the type with FU-A.
    let fu_indicator = FU_A_TYPE | (nal_header & 0x60);
    let nal_type = nal_header & 0x1f;

    // Fragment the body (everything after the NAL header). The body holds at
    // least `mtu` bytes here, so there is always at least one chunk.
    let fragments = data[1..].chunks(mtu - FU_PREFIX_LEN);
    let last_index = fragments.len() - 1;

    fragments
        .enumerate()
        .map(|(index, chunk)| {
            let is_final = index == last_index;
            let mut fu_header = nal_type;
            if index == 0 {
                fu_header |= FU_START_BIT;
            }
            if is_final {
                fu_header |= FU_END_BIT;
            }

            let mut payload = Vec::with_capacity(FU_PREFIX_LEN + chunk.len());
            payload.push(fu_indicator);
            payload.push(fu_header);
            payload.extend_from_slice(chunk);
            (payload, is_final && is_last_nal)
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a NAL unit: `header` followed by `body_len` copies of `fill`.
    fn filled_nal(header: u8, fill: u8, body_len: usize) -> Vec<u8> {
        let mut nal = vec![fill; body_len + 1];
        nal[0] = header;
        nal
    }

    /// An empty NAL unit produces no packets.
    #[test]
    fn packetize_empty() {
        let packets = packetize_nal(Vec::new(), false, 1200);
        assert!(packets.is_empty());
    }

    /// A small NAL unit is passed through untouched with its last-NAL flag.
    #[test]
    fn packetize_single_small() {
        let packets = packetize_nal(vec![0x41, 0x01, 0x02, 0x03], true, 1200);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].0, &[0x41, 0x01, 0x02, 0x03]);
        assert!(packets[0].1);
    }

    /// A NAL unit of exactly MTU bytes is not fragmented.
    #[test]
    fn packetize_exact_mtu() {
        let packets = packetize_nal(filled_nal(0x41, 0xFF, 1199), true, 1200);
        assert_eq!(packets.len(), 1);
    }

    /// `is_last_nal == false` is carried through for unfragmented NAL units.
    #[test]
    fn packetize_is_last_propagated() {
        let packets = packetize_nal(vec![0x41, 0x01, 0x02, 0x03], false, 1200);
        assert_eq!(packets.len(), 1);
        assert!(!packets[0].1);
    }

    /// Oversized NAL units are fragmented within the MTU, with start/end bits
    /// on the first/last fragment and the flag only on the last one.
    #[test]
    fn packetize_fua_basic() {
        let nal_type = 0x65;
        let packets = packetize_nal(filled_nal(nal_type, 0xFF, 2400), true, 1200);
        assert!(packets.len() >= 2, "should be fragmented into {} packets", packets.len());
        for (payload, _) in &packets {
            assert!(payload.len() <= 1200, "packet too large: {}", payload.len());
        }

        let (first_payload, first_flag) = &packets[0];
        assert_ne!(first_payload[1] & 0x80, 0);
        assert!(!*first_flag);

        let (last_payload, last_flag) = packets.last().unwrap();
        assert_ne!(last_payload[1] & 0x40, 0);
        assert!(*last_flag);
    }

    /// Every fragment's FU header keeps the original NAL type.
    #[test]
    fn packetize_fua_nal_type_preserved() {
        let packets = packetize_nal(filled_nal(0x21, 0xFF, 2000), true, 1200);
        for (payload, _) in &packets {
            assert_eq!(payload[1] & 0x1f, 1, "NAL type should be 1");
        }
    }

    /// The final fragment is not flagged when the NAL unit is not the last.
    #[test]
    fn packetize_fua_is_last_nal_false() {
        let packets = packetize_nal(filled_nal(0x65, 0xFF, 2400), false, 1200);
        let (_, flag) = packets.last().unwrap();
        assert!(!*flag);
    }

    /// A 2400-byte body at MTU 1200 yields start, middle and end fragments.
    #[test]
    fn packetize_fua_three_fragments() {
        let packets = packetize_nal(filled_nal(0x41, 0xAA, 2400), true, 1200);
        assert_eq!(packets.len(), 3);

        let fu_headers: Vec<u8> = packets.iter().map(|(payload, _)| payload[1]).collect();
        // First: start bit only.
        assert_ne!(fu_headers[0] & 0x80, 0);
        assert_eq!(fu_headers[0] & 0x40, 0);
        // Middle: neither bit.
        assert_eq!(fu_headers[1] & 0xC0, 0);
        // Last: end bit only.
        assert_eq!(fu_headers[2] & 0x80, 0);
        assert_ne!(fu_headers[2] & 0x40, 0);
    }
}