use crate::codec::{create_audio_encoder, create_video_encoder, AudioSamples, VideoFrame};
use crate::discovery::DiscoveryServer;
use crate::error::VideoIpResult;
use crate::fec::FecEncoder;
use crate::metadata::MetadataPacket;
use crate::packet::{PacketBuilder, PacketFlags};
use crate::ptz::PtzMessage;
use crate::stats::StatsTracker;
use crate::tally::TallyMessage;
use crate::transport::UdpTransport;
use crate::types::{AudioConfig, StreamType, VideoConfig};
use bytes::Bytes;
use parking_lot::RwLock;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, Interval};
#[allow(dead_code)]
const FEC_GROUP_SIZE: usize = 20;
pub struct VideoIpSource {
name: String,
video_config: VideoConfig,
audio_config: AudioConfig,
transport: UdpTransport,
discovery: Option<DiscoveryServer>,
video_encoder: Box<dyn crate::codec::VideoEncoder>,
audio_encoder: Box<dyn crate::codec::AudioEncoder>,
fec_encoder: Option<FecEncoder>,
sequence: u16,
destinations: Arc<RwLock<Vec<SocketAddr>>>,
stats: StatsTracker,
control_rx: mpsc::Receiver<ControlMessage>,
control_tx: mpsc::Sender<ControlMessage>,
frame_interval: Option<Interval>,
}
#[derive(Debug, Clone)]
pub enum ControlMessage {
AddDestination(SocketAddr),
RemoveDestination(SocketAddr),
Ptz(PtzMessage),
Tally(TallyMessage),
Metadata(MetadataPacket),
}
impl VideoIpSource {
pub async fn new(
name: impl Into<String>,
video_config: VideoConfig,
audio_config: AudioConfig,
) -> VideoIpResult<Self> {
let name = name.into();
let bind_addr = "0.0.0.0:0".parse().expect("should succeed in test");
let transport = UdpTransport::bind(bind_addr).await?;
let video_encoder = create_video_encoder(
video_config.format.codec,
video_config.format.resolution.width,
video_config.format.resolution.height,
video_config.target_bitrate,
)?;
let audio_encoder = create_audio_encoder(
audio_config.format.codec,
audio_config.format.sample_rate,
audio_config.format.channels,
)?;
let (control_tx, control_rx) = mpsc::channel(100);
Ok(Self {
name,
video_config,
audio_config,
transport,
discovery: None,
video_encoder,
audio_encoder,
fec_encoder: None,
sequence: 0,
destinations: Arc::new(RwLock::new(Vec::new())),
stats: StatsTracker::new(),
control_rx,
control_tx,
frame_interval: None,
})
}
pub fn start_broadcasting(&mut self) -> VideoIpResult<()> {
let mut discovery = DiscoveryServer::new()?;
let port = self.transport.local_addr().port();
discovery.announce(
&self.name,
port,
&self.video_config.format,
&self.audio_config.format,
)?;
self.discovery = Some(discovery);
let fps = self.video_config.format.frame_rate.to_float();
let frame_duration = Duration::from_secs_f64(1.0 / fps);
self.frame_interval = Some(interval(frame_duration));
Ok(())
}
pub fn stop_broadcasting(&mut self) -> VideoIpResult<()> {
if let Some(mut discovery) = self.discovery.take() {
discovery.stop_announce()?;
}
Ok(())
}
pub fn enable_fec(&mut self, ratio: f32) -> VideoIpResult<()> {
self.fec_encoder = Some(FecEncoder::with_ratio(ratio)?);
Ok(())
}
pub fn add_destination(&self, addr: SocketAddr) {
self.destinations.write().push(addr);
}
pub fn remove_destination(&self, addr: SocketAddr) {
self.destinations.write().retain(|a| *a != addr);
}
#[must_use]
pub fn control_sender(&self) -> mpsc::Sender<ControlMessage> {
self.control_tx.clone()
}
pub async fn send_frame(
&mut self,
video_frame: VideoFrame,
audio_samples: Option<AudioSamples>,
) -> VideoIpResult<()> {
if let Some(ref mut interval) = self.frame_interval {
interval.tick().await;
}
let video_data = self.video_encoder.encode(&video_frame)?;
let is_keyframe = video_frame.is_keyframe;
self.send_video_data(video_data, is_keyframe, video_frame.pts)
.await?;
if let Some(samples) = audio_samples {
let audio_data = self.audio_encoder.encode(&samples)?;
self.send_audio_data(audio_data, samples.pts).await?;
}
self.process_control_messages().await?;
Ok(())
}
async fn send_video_data(
&mut self,
data: Bytes,
is_keyframe: bool,
pts: u64,
) -> VideoIpResult<()> {
const MAX_PAYLOAD: usize = 8000;
let chunks: Vec<_> = data.chunks(MAX_PAYLOAD).collect();
let chunk_count = chunks.len();
for (i, chunk) in chunks.into_iter().enumerate() {
let mut flags = PacketFlags::VIDEO;
if is_keyframe {
flags |= PacketFlags::KEYFRAME;
}
if i == 0 {
flags |= PacketFlags::START_OF_FRAME;
}
if i == chunk_count - 1 {
flags |= PacketFlags::END_OF_FRAME;
}
let mut builder = PacketBuilder::new(self.sequence)
.with_timestamp(pts)
.with_stream_type(StreamType::Program);
if is_keyframe {
builder = builder.keyframe();
}
if i == 0 {
builder = builder.start_of_frame();
}
if i == chunk_count - 1 {
builder = builder.end_of_frame();
}
let packet = builder.video().build(Bytes::copy_from_slice(chunk))?;
self.send_packet(&packet).await?;
self.sequence = self.sequence.wrapping_add(1);
}
Ok(())
}
async fn send_audio_data(&mut self, data: Bytes, pts: u64) -> VideoIpResult<()> {
let packet = PacketBuilder::new(self.sequence)
.audio()
.with_timestamp(pts)
.with_stream_type(StreamType::Program)
.build(data)?;
self.send_packet(&packet).await?;
self.sequence = self.sequence.wrapping_add(1);
Ok(())
}
async fn send_packet(&mut self, packet: &crate::packet::Packet) -> VideoIpResult<()> {
let destinations = self.destinations.read().clone();
let packet_size = packet.size();
for dest in &destinations {
self.transport.send_packet(packet, *dest).await?;
}
self.stats.record_sent(packet_size);
Ok(())
}
async fn process_control_messages(&mut self) -> VideoIpResult<()> {
while let Ok(msg) = self.control_rx.try_recv() {
match msg {
ControlMessage::AddDestination(addr) => {
self.add_destination(addr);
}
ControlMessage::RemoveDestination(addr) => {
self.remove_destination(addr);
}
ControlMessage::Ptz(ptz_msg) => {
let data = ptz_msg.encode();
let packet = PacketBuilder::new(self.sequence)
.metadata()
.with_current_timestamp()
.build(data)?;
self.send_packet(&packet).await?;
self.sequence = self.sequence.wrapping_add(1);
}
ControlMessage::Tally(tally_msg) => {
let data = tally_msg.encode();
let packet = PacketBuilder::new(self.sequence)
.metadata()
.with_current_timestamp()
.build(data)?;
self.send_packet(&packet).await?;
self.sequence = self.sequence.wrapping_add(1);
}
ControlMessage::Metadata(metadata) => {
let data = metadata.encode();
let packet = PacketBuilder::new(self.sequence)
.metadata()
.with_current_timestamp()
.build(data)?;
self.send_packet(&packet).await?;
self.sequence = self.sequence.wrapping_add(1);
}
}
}
Ok(())
}
#[must_use]
pub fn stats(&self) -> crate::stats::NetworkStats {
self.stats.get_stats()
}
#[must_use]
pub fn local_addr(&self) -> SocketAddr {
self.transport.local_addr()
}
}
impl Drop for VideoIpSource {
fn drop(&mut self) {
let _ = self.stop_broadcasting();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_source_creation() {
let video_config = VideoConfig::new(1920, 1080, 30.0).expect("should succeed in test");
let audio_config = AudioConfig::new(48000, 2).expect("should succeed in test");
let source = VideoIpSource::new("Test Source", video_config, audio_config).await;
assert!(source.is_ok());
}
#[tokio::test]
async fn test_source_add_destination() {
let video_config = VideoConfig::new(1920, 1080, 30.0).expect("should succeed in test");
let audio_config = AudioConfig::new(48000, 2).expect("should succeed in test");
let source = VideoIpSource::new("Test", video_config, audio_config)
.await
.expect("should succeed in test");
let dest = "127.0.0.1:5000".parse().expect("should succeed in test");
source.add_destination(dest);
assert_eq!(source.destinations.read().len(), 1);
}
#[tokio::test]
async fn test_source_enable_fec() {
let video_config = VideoConfig::new(1920, 1080, 30.0).expect("should succeed in test");
let audio_config = AudioConfig::new(48000, 2).expect("should succeed in test");
let mut source = VideoIpSource::new("Test", video_config, audio_config)
.await
.expect("should succeed in test");
assert!(source.enable_fec(0.1).is_ok());
assert!(source.fec_encoder.is_some());
}
#[tokio::test]
async fn test_send_frame() {
let video_config = VideoConfig::new(640, 480, 30.0).expect("should succeed in test");
let audio_config = AudioConfig::new(48000, 2).expect("should succeed in test");
let mut source = VideoIpSource::new("Test", video_config, audio_config)
.await
.expect("should succeed in test");
let frame = VideoFrame::new(Bytes::from_static(b"test video data"), 640, 480, true, 0);
let samples = AudioSamples::new(Bytes::from_static(b"test audio"), 1024, 2, 48000, 0);
let result = source.send_frame(frame, Some(samples)).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_control_messages() {
let video_config = VideoConfig::new(640, 480, 30.0).expect("should succeed in test");
let audio_config = AudioConfig::new(48000, 2).expect("should succeed in test");
let source = VideoIpSource::new("Test", video_config, audio_config)
.await
.expect("should succeed in test");
let control_tx = source.control_sender();
let dest = "127.0.0.1:5000".parse().expect("should succeed in test");
control_tx
.send(ControlMessage::AddDestination(dest))
.await
.expect("should succeed in test");
}
}