use std::sync::{Arc, Condvar, Mutex};
use moq_mux::container::Timestamp;
use crate::Error;
use crate::capture::{self, Camera};
use super::encoder::{self, Encoder};
/// Framerate used by `capture_loop` when neither the capture config nor the
/// opened camera supplies one.
const DEFAULT_FRAMERATE: u32 = 30;
pub struct Producer {
import: moq_mux::codec::h264::Import,
}
impl Producer {
pub fn new(broadcast: moq_net::BroadcastProducer, catalog: moq_mux::catalog::Producer) -> Result<Self, Error> {
let import =
moq_mux::codec::h264::Import::new(broadcast, catalog).with_mode(moq_mux::codec::h264::Mode::Avc3)?;
Ok(Self { import })
}
pub fn track(&self) -> Option<&moq_net::TrackProducer> {
self.import.track()
}
pub fn publish(&mut self, packets: Vec<bytes::Bytes>, timestamp: Timestamp) -> Result<(), Error> {
for mut packet in packets {
self.import.decode_frame(&mut packet, Some(timestamp))?;
}
Ok(())
}
pub fn finish(&mut self) -> Result<(), Error> {
self.import.finish()?;
Ok(())
}
}
/// Encoder settings accepted by `publish_capture`; each field is copied into
/// the `encoder::Config` built when the camera is opened.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct Options {
/// Copied into `encoder::Config::bitrate`.
/// NOTE(review): units are not visible here (presumably bits per second) — confirm against the encoder.
pub bitrate: Option<u64>,
/// Copied into `encoder::Config::kind`; presumably selects the encoder implementation — confirm.
pub kind: encoder::Kind,
}
pub async fn publish_capture(
broadcast: moq_net::BroadcastProducer,
catalog: moq_mux::catalog::Producer,
capture: capture::Config,
encode: Options,
clock: moq_mux::Clock,
) -> Result<(), Error> {
if capture.framerate == Some(0) {
return Err(Error::InvalidFramerate(0));
}
let producer = Producer::new(broadcast, catalog)?;
let track = producer
.track()
.cloned()
.ok_or_else(|| Error::Codec(anyhow::anyhow!("avc3 track was not created")))?;
let gate = Gate::new();
let worker_gate = gate.clone();
let mut worker = tokio::task::spawn_blocking(move || capture_loop(producer, capture, encode, worker_gate, clock));
tokio::select! {
res = &mut worker => res.map_err(|e| Error::Codec(anyhow::anyhow!("capture task: {e}")))?,
() = monitor_demand(&track, &gate) => {
gate.close();
worker
.await
.map_err(|e| Error::Codec(anyhow::anyhow!("capture task: {e}")))?
}
}
}
/// Mirrors track demand onto `gate` until the track reports an error.
///
/// Alternates between waiting for `track.used()` (then marking the gate
/// active) and `track.unused()` (then marking it inactive). The first error
/// from either wait is passed to `log_track_ended` and ends the loop.
async fn monitor_demand(track: &moq_net::TrackProducer, gate: &Gate) {
    loop {
        if let Err(err) = track.used().await {
            log_track_ended(err);
            return;
        }
        gate.set_active(true);

        if let Err(err) = track.unused().await {
            log_track_ended(err);
            return;
        }
        gate.set_active(false);
    }
}
fn log_track_ended(err: moq_net::Error) {
if matches!(err, moq_net::Error::Dropped | moq_net::Error::Closed) {
tracing::debug!("video track no longer announced; stopping capture");
} else {
tracing::warn!(error = %err, "video track aborted; stopping capture");
}
}
fn capture_loop(
mut producer: Producer,
capture: capture::Config,
encode: Options,
gate: Arc<Gate>,
clock: moq_mux::Clock,
) -> Result<(), Error> {
let mut camera: Option<Camera> = None;
let mut encoder: Option<Encoder> = None;
let mut last_ts = Timestamp::from_micros(0)?;
let mut catalog_ready = false;
loop {
if catalog_ready && !gate.is_active() {
if camera.take().is_some() {
encoder = None;
tracing::info!("no viewers: released camera");
}
if !gate.wait_active() {
break; }
continue;
}
if camera.is_none() {
let cam = Camera::open(&capture)?;
let framerate = capture
.framerate
.or_else(|| cam.framerate())
.unwrap_or(DEFAULT_FRAMERATE);
let mut encoder_config = encoder::Config::new(cam.width(), cam.height(), framerate);
encoder_config.bitrate = encode.bitrate;
encoder_config.kind = encode.kind.clone();
let enc = Encoder::new(&encoder_config)?;
tracing::info!(
encoder = enc.name(),
device = cam.device(),
"viewer subscribed: capturing"
);
camera = Some(cam);
encoder = Some(enc);
}
let frame = match camera.as_mut().expect("camera open above").read()? {
Some(frame) => frame,
None => break, };
let ts = Timestamp::from_micros(clock.micros())?;
last_ts = ts;
let packets = encoder.as_mut().expect("encoder built above").encode(&frame)?;
catalog_ready |= !packets.is_empty();
producer.publish(packets, ts)?;
}
if let Some(enc) = encoder.as_mut() {
match enc.finish() {
Ok(packets) => {
if let Err(err) = producer.publish(packets, last_ts) {
tracing::warn!(error = %err, "failed to publish final video packets");
}
}
Err(err) => tracing::warn!(error = %err, "failed to flush video encoder"),
}
}
producer.finish()?;
Ok(())
}
/// Demand switch shared between the async demand monitor and the blocking
/// capture thread.
///
/// The gate is either active or inactive, and can be closed for good. Closing
/// is terminal: a closed gate never reports active again.
struct Gate {
    /// Flags guarded by the mutex; every change is announced through `cond`.
    state: Mutex<GateState>,
    /// Wakes threads blocked in [`Gate::wait_active`].
    cond: Condvar,
}

/// Flags guarded by [`Gate`]'s mutex. Both start out `false`.
#[derive(Default)]
struct GateState {
    /// Whether capture is currently wanted.
    active: bool,
    /// Whether the gate has been shut down permanently.
    closed: bool,
}

impl Gate {
    /// Creates an inactive, open gate behind an `Arc` so it can be shared.
    fn new() -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(GateState::default()),
            cond: Condvar::new(),
        })
    }

    /// Locks the state, recovering the guard if the mutex was poisoned.
    ///
    /// The state is two independent flags, so it is still consistent after a
    /// panic elsewhere; recovering avoids a second panic here.
    fn lock(&self) -> std::sync::MutexGuard<'_, GateState> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Sets the active flag and wakes all waiters.
    ///
    /// Does nothing once the gate is closed, so a late demand update cannot
    /// reactivate a gate that has been shut down.
    fn set_active(&self, active: bool) {
        let mut state = self.lock();
        if state.closed {
            return;
        }
        state.active = active;
        self.cond.notify_all();
    }

    /// Permanently closes the gate, marks it inactive, and wakes all waiters.
    /// Calling it again has no further effect.
    fn close(&self) {
        let mut state = self.lock();
        state.active = false;
        state.closed = true;
        self.cond.notify_all();
    }

    /// Returns whether the gate is currently active.
    fn is_active(&self) -> bool {
        self.lock().active
    }

    /// Blocks until the gate becomes active or is closed.
    ///
    /// Returns `true` if the gate is open (and therefore active) on return,
    /// and `false` if it has been closed.
    fn wait_active(&self) -> bool {
        let mut state = self.lock();
        while !state.active && !state.closed {
            state = self
                .cond
                .wait(state)
                .unwrap_or_else(std::sync::PoisonError::into_inner);
        }
        !state.closed
    }
}