moq_video/encode/
producer.rs1use std::sync::{Arc, Condvar, Mutex};
10
11use moq_mux::container::Timestamp;
12
13use crate::Error;
14use crate::capture::{self, Camera};
15
16use super::encoder::{self, Encoder};
17
/// Frame rate handed to the encoder when neither the capture config nor the
/// opened camera reports one (presumably frames per second; confirm against
/// `encoder::Config::new`).
const DEFAULT_FRAMERATE: u32 = 30;
20
21pub struct Producer {
28 import: moq_mux::codec::h264::Import,
29}
30
31impl Producer {
32 pub fn new(broadcast: moq_net::BroadcastProducer, catalog: moq_mux::catalog::Producer) -> Result<Self, Error> {
33 let import =
34 moq_mux::codec::h264::Import::new(broadcast, catalog).with_mode(moq_mux::codec::h264::Mode::Avc3)?;
35 Ok(Self { import })
36 }
37
38 pub fn track(&self) -> Option<&moq_net::TrackProducer> {
42 self.import.track()
43 }
44
45 pub fn publish(&mut self, packets: Vec<bytes::Bytes>, timestamp: Timestamp) -> Result<(), Error> {
47 for mut packet in packets {
48 self.import.decode_frame(&mut packet, Some(timestamp))?;
49 }
50 Ok(())
51 }
52
53 pub fn finish(&mut self) -> Result<(), Error> {
55 self.import.finish()?;
56 Ok(())
57 }
58}
59
/// Encoder settings supplied by the caller of [`publish_capture`].
///
/// Both fields are copied onto the `encoder::Config` built when the camera is
/// opened.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct Options {
    /// Requested bitrate, forwarded unchanged to `encoder::Config::bitrate`.
    /// NOTE(review): the unit and the meaning of `None` are defined by the
    /// encoder module — confirm there.
    pub bitrate: Option<u64>,
    /// Which encoder to use, forwarded to `encoder::Config::kind`.
    pub kind: encoder::Kind,
}
75
76pub async fn publish_capture(
84 broadcast: moq_net::BroadcastProducer,
85 catalog: moq_mux::catalog::Producer,
86 capture: capture::Config,
87 encode: Options,
88 clock: moq_mux::Clock,
89) -> Result<(), Error> {
90 if capture.framerate == Some(0) {
93 return Err(Error::InvalidFramerate(0));
94 }
95
96 let producer = Producer::new(broadcast, catalog)?;
97 let track = producer
98 .track()
99 .cloned()
100 .ok_or_else(|| Error::Codec(anyhow::anyhow!("avc3 track was not created")))?;
101
102 let gate = Gate::new();
103
104 let worker_gate = gate.clone();
106 let mut worker = tokio::task::spawn_blocking(move || capture_loop(producer, capture, encode, worker_gate, clock));
107
108 tokio::select! {
109 res = &mut worker => res.map_err(|e| Error::Codec(anyhow::anyhow!("capture task: {e}")))?,
111 () = monitor_demand(&track, &gate) => {
113 gate.close();
114 worker
115 .await
116 .map_err(|e| Error::Codec(anyhow::anyhow!("capture task: {e}")))?
117 }
118 }
119}
120
121async fn monitor_demand(track: &moq_net::TrackProducer, gate: &Gate) {
124 loop {
125 match track.used().await {
126 Ok(()) => gate.set_active(true),
127 Err(err) => return log_track_ended(err),
128 }
129 match track.unused().await {
130 Ok(()) => gate.set_active(false),
131 Err(err) => return log_track_ended(err),
132 }
133 }
134}
135
136fn log_track_ended(err: moq_net::Error) {
140 if matches!(err, moq_net::Error::Dropped | moq_net::Error::Closed) {
141 tracing::debug!("video track no longer announced; stopping capture");
142 } else {
143 tracing::warn!(error = %err, "video track aborted; stopping capture");
144 }
145}
146
147fn capture_loop(
151 mut producer: Producer,
152 capture: capture::Config,
153 encode: Options,
154 gate: Arc<Gate>,
155 clock: moq_mux::Clock,
156) -> Result<(), Error> {
157 let mut camera: Option<Camera> = None;
158 let mut encoder: Option<Encoder> = None;
159 let mut last_ts = Timestamp::from_micros(0)?;
160 let mut catalog_ready = false;
165
166 loop {
167 if catalog_ready && !gate.is_active() {
168 if camera.take().is_some() {
171 encoder = None;
172 tracing::info!("no viewers: released camera");
173 }
174 if !gate.wait_active() {
175 break; }
177 continue;
178 }
179
180 if camera.is_none() {
183 let cam = Camera::open(&capture)?;
184 let framerate = capture
187 .framerate
188 .or_else(|| cam.framerate())
189 .unwrap_or(DEFAULT_FRAMERATE);
190 let mut encoder_config = encoder::Config::new(cam.width(), cam.height(), framerate);
191 encoder_config.bitrate = encode.bitrate;
192 encoder_config.kind = encode.kind.clone();
193 let enc = Encoder::new(&encoder_config)?;
194 tracing::info!(
195 encoder = enc.name(),
196 device = cam.device(),
197 "viewer subscribed: capturing"
198 );
199 camera = Some(cam);
200 encoder = Some(enc);
201 }
202
203 let frame = match camera.as_mut().expect("camera open above").read()? {
204 Some(frame) => frame,
205 None => break, };
207
208 let ts = Timestamp::from_micros(clock.micros())?;
209 last_ts = ts;
210
211 let packets = encoder.as_mut().expect("encoder built above").encode(&frame)?;
212 catalog_ready |= !packets.is_empty();
215 producer.publish(packets, ts)?;
216 }
217
218 if let Some(enc) = encoder.as_mut() {
222 match enc.finish() {
223 Ok(packets) => {
224 if let Err(err) = producer.publish(packets, last_ts) {
225 tracing::warn!(error = %err, "failed to publish final video packets");
226 }
227 }
228 Err(err) => tracing::warn!(error = %err, "failed to flush video encoder"),
229 }
230 }
231 producer.finish()?;
232 Ok(())
233}
234
/// Blocking on/off switch shared between the async demand monitor and the
/// capture thread.
///
/// The monitor toggles `active`; the capture thread polls it with
/// [`Gate::is_active`] and parks in [`Gate::wait_active`] while it is off.
/// [`Gate::close`] shuts the gate permanently.
struct Gate {
    /// Flags guarded by the mutex; every change is paired with a notify.
    state: Mutex<GateState>,
    /// Wakes threads parked in [`Gate::wait_active`].
    cond: Condvar,
}

/// Flags protected by [`Gate::state`]. Both start out `false`.
#[derive(Default)]
struct GateState {
    /// Whether capture is currently wanted.
    active: bool,
    /// Set once by [`Gate::close`]; never cleared.
    closed: bool,
}

impl Gate {
    /// Creates an inactive, open gate ready to be shared across threads.
    fn new() -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(GateState::default()),
            cond: Condvar::new(),
        })
    }

    /// Sets the active flag and wakes every waiter.
    ///
    /// Has no effect once the gate is closed: closing is terminal, so a late
    /// activation must not make [`Gate::is_active`] report `true` again.
    fn set_active(&self, active: bool) {
        let mut state = self.state.lock().unwrap();
        if state.closed {
            return;
        }
        state.active = active;
        self.cond.notify_all();
    }

    /// Permanently closes the gate: clears the active flag and wakes every
    /// waiter so it can observe the closure. Safe to call more than once.
    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.active = false;
        state.closed = true;
        self.cond.notify_all();
    }

    /// Returns whether the gate is currently active (always `false` once closed).
    fn is_active(&self) -> bool {
        self.state.lock().unwrap().active
    }

    /// Blocks the calling thread until the gate is active or closed.
    ///
    /// Returns `true` if the gate became active and `false` if it was closed.
    fn wait_active(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        // Loop to tolerate spurious wake-ups from the condition variable.
        while !state.active && !state.closed {
            state = self.cond.wait(state).unwrap();
        }
        !state.closed
    }
}