Skip to main content

moq_video/encode/
producer.rs

1//! Encode decoded video frames and publish them as an H.264 moq track.
2//!
3//! Encoding is strictly on demand: the avc3 track and catalog entry are
4//! advertised immediately, but the camera stays closed (LED off, no CPU)
5//! until a subscriber appears. When the last viewer leaves, the camera is
6//! released again. This mirrors `moq-boy`, which pauses its emulator on
7//! `TrackProducer::used()` / `unused()`.
8
9use std::sync::{Arc, Condvar, Mutex};
10
11use moq_mux::container::Timestamp;
12
13use crate::Error;
14use crate::capture::{self, Camera};
15
16use super::encoder::{self, Encoder};
17
/// Framerate used as the final fallback, i.e. when the caller did not request
/// one and the camera does not report one either.
const DEFAULT_FRAMERATE: u32 = 30;
20
21/// Publishes encoded H.264 frames as an avc3 moq track.
22///
23/// Built on the async side so the track is advertised (and the catalog
24/// registered) before the camera opens; this is what lets a subscriber
25/// trigger capture on demand. `moq_mux::codec::h264::Import` handles
26/// catalog registration and framing.
27pub struct Producer {
28	import: moq_mux::codec::h264::Import,
29}
30
31impl Producer {
32	pub fn new(broadcast: moq_net::BroadcastProducer, catalog: moq_mux::catalog::Producer) -> Result<Self, Error> {
33		let import =
34			moq_mux::codec::h264::Import::new(broadcast, catalog).with_mode(moq_mux::codec::h264::Mode::Avc3)?;
35		Ok(Self { import })
36	}
37
38	/// The underlying track producer, eagerly created by avc3 mode. Clone it
39	/// to watch subscription state via [`used`](moq_net::TrackProducer::used) /
40	/// [`unused`](moq_net::TrackProducer::unused).
41	pub fn track(&self) -> Option<&moq_net::TrackProducer> {
42		self.import.track()
43	}
44
45	/// Publish already-encoded Annex-B packets at the given timestamp.
46	pub fn publish(&mut self, packets: Vec<bytes::Bytes>, timestamp: Timestamp) -> Result<(), Error> {
47		for mut packet in packets {
48			self.import.decode_frame(&mut packet, Some(timestamp))?;
49		}
50		Ok(())
51	}
52
53	/// Finalize the track.
54	pub fn finish(&mut self) -> Result<(), Error> {
55		self.import.finish()?;
56		Ok(())
57	}
58}
59
60/// Source-agnostic encode knobs for [`publish_capture`], where the geometry
61/// (width / height / framerate) comes from the capture source, not the caller.
62/// For the bring-your-own-frames [`Encoder`](super::Encoder) path, where you
63/// must specify geometry, use [`Config`](super::Config) instead.
64///
65/// `#[non_exhaustive]`: construct via [`Options::default`] and set fields, so
66/// new knobs can be added without breaking callers.
67#[derive(Clone, Debug, Default)]
68#[non_exhaustive]
69pub struct Options {
70	/// Target bitrate in bits per second; `None` derives from resolution.
71	pub bitrate: Option<u64>,
72	/// Encoder implementation preference.
73	pub kind: encoder::Kind,
74}
75
76/// Capture a webcam and publish it as on-demand H.264.
77///
78/// Returns when the broadcast is dropped (the track stops being announced)
79/// or the capture loop fails. The camera is opened only while at least one
80/// subscriber is watching; frames are stamped from `clock`, so passing the
81/// same [`Clock`](moq_mux::Clock) to a concurrent audio publish keeps the two
82/// tracks aligned.
83pub async fn publish_capture(
84	broadcast: moq_net::BroadcastProducer,
85	catalog: moq_mux::catalog::Producer,
86	capture: capture::Config,
87	encode: Options,
88	clock: moq_mux::Clock,
89) -> Result<(), Error> {
90	// A caller asking for exactly zero is an error; omitting it (None) is
91	// fine and resolves to the camera's reported rate once it's open.
92	if capture.framerate == Some(0) {
93		return Err(Error::InvalidFramerate(0));
94	}
95
96	let producer = Producer::new(broadcast, catalog)?;
97	let track = producer
98		.track()
99		.cloned()
100		.ok_or_else(|| Error::Codec(anyhow::anyhow!("avc3 track was not created")))?;
101
102	let gate = Gate::new();
103
104	// ffmpeg capture + encode is blocking; keep it off the async runtime.
105	let worker_gate = gate.clone();
106	let mut worker = tokio::task::spawn_blocking(move || capture_loop(producer, capture, encode, worker_gate, clock));
107
108	tokio::select! {
109		// Surface a capture/encode failure (e.g. camera open) promptly.
110		res = &mut worker => res.map_err(|e| Error::Codec(anyhow::anyhow!("capture task: {e}")))?,
111		// The broadcast was dropped: stop the worker and wait for it to flush.
112		() = monitor_demand(&track, &gate) => {
113			gate.close();
114			worker
115				.await
116				.map_err(|e| Error::Codec(anyhow::anyhow!("capture task: {e}")))?
117		}
118	}
119}
120
121/// Toggle the gate as viewers subscribe and unsubscribe. Returns once the
122/// track stops being announced (broadcast dropped / aborted).
123async fn monitor_demand(track: &moq_net::TrackProducer, gate: &Gate) {
124	loop {
125		match track.used().await {
126			Ok(()) => gate.set_active(true),
127			Err(err) => return log_track_ended(err),
128		}
129		match track.unused().await {
130			Ok(()) => gate.set_active(false),
131			Err(err) => return log_track_ended(err),
132		}
133	}
134}
135
136/// A dropped or closed track is the normal end of a publish; any other cause is
137/// a real abort (e.g. a transport reset) worth surfacing rather than treating as
138/// a clean exit.
139fn log_track_ended(err: moq_net::Error) {
140	if matches!(err, moq_net::Error::Dropped | moq_net::Error::Closed) {
141		tracing::debug!("video track no longer announced; stopping capture");
142	} else {
143		tracing::warn!(error = %err, "video track aborted; stopping capture");
144	}
145}
146
147/// Blocking capture/encode loop. Captures one frame up front to populate the
148/// catalog (the codec/resolution only exist once the encoder has produced an
149/// SPS), then releases the camera whenever the gate goes idle.
150fn capture_loop(
151	mut producer: Producer,
152	capture: capture::Config,
153	encode: Options,
154	gate: Arc<Gate>,
155	clock: moq_mux::Clock,
156) -> Result<(), Error> {
157	let mut camera: Option<Camera> = None;
158	let mut encoder: Option<Encoder> = None;
159	let mut last_ts = Timestamp::from_micros(0)?;
160	// The catalog video rendition only appears once a frame has been encoded
161	// (the importer reads the SPS). Until then we keep capturing regardless of
162	// the gate, so a catalog-driven subscriber can discover the track and
163	// trigger `used()`. After that we release the camera while unwatched.
164	let mut catalog_ready = false;
165
166	loop {
167		if catalog_ready && !gate.is_active() {
168			// No viewers: drop the camera so its LED turns off and it stops
169			// consuming CPU, then block until someone subscribes.
170			if camera.take().is_some() {
171				encoder = None;
172				tracing::info!("no viewers: released camera");
173			}
174			if !gate.wait_active() {
175				break; // closed
176			}
177			continue;
178		}
179
180		// Open the camera (and an encoder sized to its negotiated mode) the
181		// first time we're watched after being idle.
182		if camera.is_none() {
183			let cam = Camera::open(&capture)?;
184			// Prefer an explicit --fps, otherwise use the camera's reported
185			// rate, falling back only if the backend doesn't expose one.
186			let framerate = capture
187				.framerate
188				.or_else(|| cam.framerate())
189				.unwrap_or(DEFAULT_FRAMERATE);
190			let mut encoder_config = encoder::Config::new(cam.width(), cam.height(), framerate);
191			encoder_config.bitrate = encode.bitrate;
192			encoder_config.kind = encode.kind.clone();
193			let enc = Encoder::new(&encoder_config)?;
194			tracing::info!(
195				encoder = enc.name(),
196				device = cam.device(),
197				"viewer subscribed: capturing"
198			);
199			camera = Some(cam);
200			encoder = Some(enc);
201		}
202
203		let frame = match camera.as_mut().expect("camera open above").read()? {
204			Some(frame) => frame,
205			None => break, // device stopped producing frames
206		};
207
208		let ts = Timestamp::from_micros(clock.micros())?;
209		last_ts = ts;
210
211		let packets = encoder.as_mut().expect("encoder built above").encode(&frame)?;
212		// Once the encoder has emitted a frame, the importer has parsed the SPS
213		// and the catalog rendition exists, so the gate can take over.
214		catalog_ready |= !packets.is_empty();
215		producer.publish(packets, ts)?;
216	}
217
218	// Flush whatever the encoder still holds, then close the track. Log
219	// (don't discard) flush/publish errors at shutdown; they're not worth
220	// aborting the close over, but silently dropping them hides real failures.
221	if let Some(enc) = encoder.as_mut() {
222		match enc.finish() {
223			Ok(packets) => {
224				if let Err(err) = producer.publish(packets, last_ts) {
225					tracing::warn!(error = %err, "failed to publish final video packets");
226				}
227			}
228			Err(err) => tracing::warn!(error = %err, "failed to flush video encoder"),
229		}
230	}
231	producer.finish()?;
232	Ok(())
233}
234
/// Hand-off point between the async demand monitor and the blocking capture
/// thread: the monitor writes the flags, the capture loop polls `active` or
/// parks until it is set.
struct Gate {
	/// Flags shared between the two sides.
	state: Mutex<GateState>,
	/// Signalled on every flag update so a parked waiter re-checks them.
	cond: Condvar,
}

/// The flags guarded by the gate's mutex. Both start out `false`.
#[derive(Default)]
struct GateState {
	/// Set while capture is wanted.
	active: bool,
	/// Set once by `close`; waiters stop waiting when they see it.
	closed: bool,
}

impl Gate {
	/// Create a shared gate that is neither active nor closed.
	fn new() -> Arc<Self> {
		let gate = Self {
			state: Mutex::default(),
			cond: Condvar::default(),
		};
		Arc::new(gate)
	}

	/// Apply `apply` to the flags under the lock, then wake every waiter
	/// while the lock is still held.
	fn update(&self, apply: impl FnOnce(&mut GateState)) {
		let mut guard = self.state.lock().unwrap();
		apply(&mut guard);
		self.cond.notify_all();
	}

	/// Record whether capture is currently wanted and wake any waiter.
	fn set_active(&self, active: bool) {
		self.update(|flags| flags.active = active);
	}

	/// Mark the gate closed and wake any waiter.
	fn close(&self) {
		self.update(|flags| {
			// `active` is cleared as well: if shutdown races a track that is
			// still subscribed, the worker would otherwise stay on the
			// capture path, which does not look at `closed` until a publish
			// fails.
			flags.active = false;
			flags.closed = true;
		});
	}

	/// Current value of the `active` flag.
	fn is_active(&self) -> bool {
		let guard = self.state.lock().unwrap();
		guard.active
	}

	/// Park the calling thread until the gate is active or closed.
	/// Returns `false` if the gate is closed.
	fn wait_active(&self) -> bool {
		let guard = self.state.lock().unwrap();
		let guard = self
			.cond
			.wait_while(guard, |flags| !(flags.active || flags.closed))
			.unwrap();
		!guard.closed
	}
}
284}