Skip to main content

lvqr_transcode/
audio_passthrough.rs

1//! [`AudioPassthroughTranscoder`] + [`AudioPassthroughTranscoderFactory`].
2//!
3//! Sibling of [`crate::SoftwareTranscoder`] that covers the audio track for
4//! the 4.6 ABR ladder. Unlike the software video encoder this factory
5//! carries no GStreamer dependency and is always available: the source's
6//! AAC `1.mp4` fragments are forwarded verbatim to
7//! `<source>/<rendition>/1.mp4` so each rendition broadcaster is a
8//! self-contained mp4 that LL-HLS composition can drain directly.
9//!
10//! Session 106 C ships this alongside the CLI wiring so every ladder rung
11//! gets paired video (software-encoded) + audio (passthrough) output
12//! broadcasts without the LL-HLS bridge having to special-case the
13//! missing audio.
14
15use std::sync::Arc;
16
17use lvqr_fragment::{Fragment, FragmentBroadcaster, FragmentBroadcasterRegistry, FragmentMeta};
18use tracing::{debug, info, warn};
19
20use crate::rendition::RenditionSpec;
21use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
22
/// Name of the source track this factory reacts to. By LVQR convention the
/// audio track is called `"1.mp4"` regardless of the ingest protocol.
const SOURCE_TRACK: &str = "1.mp4";

/// Name of the track published on the rendition broadcaster. It is
/// deliberately the same string as the source track so the LL-HLS bridge's
/// `ensure_audio` path finds it with no special-casing.
const OUTPUT_TRACK: &str = "1.mp4";
31
32/// Factory that builds one [`AudioPassthroughTranscoder`] per source
33/// audio track, republishing fragments onto `<source>/<rendition>/1.mp4`.
34///
35/// Ships without the `transcode` feature on purpose: the factory carries
36/// no GStreamer dependency, so operators without the ladder build still
37/// get audio passthrough when the CLI wiring installs it alongside the
38/// software video encoder.
39pub struct AudioPassthroughTranscoderFactory {
40    rendition: RenditionSpec,
41    output_registry: FragmentBroadcasterRegistry,
42    skip_source_suffixes: Vec<String>,
43}
44
45impl AudioPassthroughTranscoderFactory {
46    /// Build a factory for `rendition` that republishes source audio
47    /// fragments into `output_registry` under `<source>/<rendition>/1.mp4`.
48    pub fn new(rendition: RenditionSpec, output_registry: FragmentBroadcasterRegistry) -> Self {
49        Self {
50            rendition,
51            output_registry,
52            skip_source_suffixes: Vec::new(),
53        }
54    }
55
56    /// Register additional trailing-component suffixes that the factory
57    /// should treat as already-transcoded outputs and skip. Appends to the
58    /// built-in `\d+p` heuristic; the default recursion guard (an all-
59    /// digits + trailing `p` suffix like `720p`) stays in effect.
60    ///
61    /// Operators running custom rendition names (`ultra`, `low-motion`,
62    /// etc.) pass them here so the factory does not rebuild transcoders
63    /// on its own outputs.
64    pub fn skip_source_suffixes(mut self, suffixes: impl IntoIterator<Item = impl Into<String>>) -> Self {
65        self.skip_source_suffixes.extend(suffixes.into_iter().map(Into::into));
66        self
67    }
68}
69
70impl TranscoderFactory for AudioPassthroughTranscoderFactory {
71    fn name(&self) -> &str {
72        "audio-passthrough"
73    }
74
75    fn rendition(&self) -> &RenditionSpec {
76        &self.rendition
77    }
78
79    fn build(&self, ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
80        if ctx.track != SOURCE_TRACK {
81            return None;
82        }
83        if looks_like_rendition_output(&ctx.broadcast, &self.skip_source_suffixes) {
84            debug!(
85                broadcast = %ctx.broadcast,
86                rendition = %self.rendition.name,
87                "AudioPassthroughTranscoderFactory: skipping already-transcoded broadcast",
88            );
89            return None;
90        }
91        Some(Box::new(AudioPassthroughTranscoder::new(
92            self.rendition.clone(),
93            ctx.broadcast.clone(),
94            self.output_registry.clone(),
95        )))
96    }
97}
98
99/// Per-`(source, rendition)` audio passthrough. Forwards every source
100/// `Fragment` verbatim to the rendition broadcaster.
101pub struct AudioPassthroughTranscoder {
102    rendition: RenditionSpec,
103    source_broadcast: String,
104    output_registry: FragmentBroadcasterRegistry,
105    output_bc: Option<Arc<FragmentBroadcaster>>,
106    forwarded: u64,
107}
108
109impl AudioPassthroughTranscoder {
110    fn new(rendition: RenditionSpec, source_broadcast: String, output_registry: FragmentBroadcasterRegistry) -> Self {
111        Self {
112            rendition,
113            source_broadcast,
114            output_registry,
115            output_bc: None,
116            forwarded: 0,
117        }
118    }
119
120    fn output_broadcast_name(&self) -> String {
121        format!("{}/{}", self.source_broadcast, self.rendition.name)
122    }
123
124    /// Count of fragments forwarded to the output broadcaster since
125    /// construction. Test-facing.
126    pub fn forwarded(&self) -> u64 {
127        self.forwarded
128    }
129}
130
131impl Transcoder for AudioPassthroughTranscoder {
132    fn on_start(&mut self, ctx: &TranscoderContext) {
133        let output_name = self.output_broadcast_name();
134        let output_meta = FragmentMeta {
135            codec: ctx.meta.codec.clone(),
136            timescale: ctx.meta.timescale,
137            init_segment: ctx.meta.init_segment.clone(),
138        };
139        let bc = self
140            .output_registry
141            .get_or_create(&output_name, OUTPUT_TRACK, output_meta);
142        if let Some(ref init) = ctx.meta.init_segment {
143            bc.set_init_segment(init.clone());
144        }
145        info!(
146            broadcast = %self.source_broadcast,
147            output = %output_name,
148            rendition = %self.rendition.name,
149            codec = %ctx.meta.codec,
150            timescale = ctx.meta.timescale,
151            "AudioPassthroughTranscoder started",
152        );
153        self.output_bc = Some(bc);
154    }
155
156    fn on_fragment(&mut self, fragment: &Fragment) {
157        let Some(bc) = self.output_bc.as_ref() else {
158            warn!(
159                rendition = %self.rendition.name,
160                broadcast = %self.source_broadcast,
161                "AudioPassthroughTranscoder: on_fragment before on_start; dropping",
162            );
163            return;
164        };
165        let clone = Fragment::new(
166            OUTPUT_TRACK,
167            fragment.group_id,
168            fragment.object_id,
169            fragment.priority,
170            fragment.dts,
171            fragment.pts,
172            fragment.duration,
173            fragment.flags,
174            fragment.payload.clone(),
175        );
176        bc.emit(clone);
177        self.forwarded = self.forwarded.saturating_add(1);
178        metrics::counter!(
179            "lvqr_transcode_output_fragments_total",
180            "transcoder" => "audio-passthrough",
181            "rendition" => self.rendition.name.clone(),
182        )
183        .increment(1);
184        metrics::counter!(
185            "lvqr_transcode_output_bytes_total",
186            "transcoder" => "audio-passthrough",
187            "rendition" => self.rendition.name.clone(),
188        )
189        .increment(fragment.payload.len() as u64);
190    }
191
192    fn on_stop(&mut self) {
193        info!(
194            broadcast = %self.source_broadcast,
195            rendition = %self.rendition.name,
196            forwarded = self.forwarded,
197            "AudioPassthroughTranscoder stopped",
198        );
199        self.output_bc = None;
200    }
201}
202
/// Decide whether `broadcast` names a rendition output that this factory
/// must not transcode again.
///
/// Only the trailing `/`-separated component is inspected. The result is
/// `true` when that component is non-empty and either
///
/// * equals one of the entries in `extra` (custom rendition names
///   registered through
///   [`AudioPassthroughTranscoderFactory::skip_source_suffixes`]), or
/// * follows the built-in `\d+p` convention: one or more ASCII digits
///   followed by a single `p` (`720p`, `480p`, `1080p`).
///
/// An empty trailing component (for example a name ending in `/`) is never
/// treated as a rendition output.
fn looks_like_rendition_output(broadcast: &str, extra: &[String]) -> bool {
    // `rsplit` always yields at least one item; the fallback only keeps the
    // expression total.
    let tail = broadcast.rsplit('/').next().unwrap_or("");
    if tail.is_empty() {
        return false;
    }

    // Custom names win before the numeric heuristic is consulted.
    if extra.iter().any(|candidate| candidate.as_str() == tail) {
        return true;
    }

    match tail.strip_suffix('p') {
        // A bare "p" leaves no digits and must not match.
        Some(digits) => !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
227
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta, FragmentStream};

    /// Context for `broadcast`/`track` carrying AAC-LC meta at 48 kHz and
    /// no init segment.
    fn ctx(broadcast: &str, track: &str, rendition: RenditionSpec) -> TranscoderContext {
        let meta = FragmentMeta::new("mp4a.40.2", 48_000);
        TranscoderContext {
            broadcast: broadcast.into(),
            track: track.into(),
            meta,
            rendition,
        }
    }

    /// Keyframe-flagged audio fragment number `idx` on track `1.mp4`, with
    /// a 1024-tick duration and dts/pts of `idx * 1024`.
    fn audio_frag(idx: u64, payload: &[u8]) -> Fragment {
        let timestamp = idx * 1024;
        Fragment::new(
            "1.mp4",
            idx,
            0,
            0,
            timestamp,
            timestamp,
            1024,
            FragmentFlags::KEYFRAME,
            Bytes::copy_from_slice(payload),
        )
    }

    /// Factory for `rendition` backed by a fresh, private registry.
    fn factory_for(rendition: RenditionSpec) -> AudioPassthroughTranscoderFactory {
        AudioPassthroughTranscoderFactory::new(rendition, FragmentBroadcasterRegistry::new())
    }

    #[test]
    fn factory_opts_out_of_non_audio_tracks() {
        let factory = factory_for(RenditionSpec::preset_720p());
        for track in ["0.mp4", "captions", "catalog", ".catalog"] {
            let context = ctx("live/demo", track, factory.rendition().clone());
            let built = factory.build(&context);
            assert!(built.is_none(), "factory must opt out of non-audio track {track}",);
        }
    }

    #[test]
    fn factory_builds_transcoder_for_audio_track() {
        let factory = factory_for(RenditionSpec::preset_480p());
        let context = ctx("live/demo", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&context).is_some());
    }

    #[test]
    fn factory_skips_already_transcoded_broadcast() {
        let factory = factory_for(RenditionSpec::preset_720p());
        for broadcast in ["live/demo/720p", "live/demo/480p", "cam/1080p"] {
            let context = ctx(broadcast, "1.mp4", factory.rendition().clone());
            let built = factory.build(&context);
            assert!(
                built.is_none(),
                "factory must skip already-transcoded broadcast {broadcast}",
            );
        }
    }

    #[test]
    fn factory_honors_custom_skip_suffix() {
        let factory = factory_for(RenditionSpec::preset_720p()).skip_source_suffixes(["ultra"]);

        // The custom suffix is skipped.
        let custom = ctx("live/demo/ultra", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&custom).is_none());

        // The default `\d+p` heuristic still applies alongside the custom list.
        let numeric = ctx("live/demo/720p", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&numeric).is_none());

        // A regular source broadcast still builds.
        let source = ctx("live/demo", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&source).is_some());
    }

    #[test]
    fn transcoder_forwards_fragments_verbatim() {
        let registry = FragmentBroadcasterRegistry::new();
        let mut transcoder =
            AudioPassthroughTranscoder::new(RenditionSpec::preset_720p(), "live/demo".into(), registry.clone());
        let context = ctx("live/demo", "1.mp4", RenditionSpec::preset_720p());
        transcoder.on_start(&context);

        // Subscribe to the output broadcast AFTER on_start so the bc exists.
        let output = registry
            .get("live/demo/720p", "1.mp4")
            .expect("audio output broadcaster must exist after on_start");
        let mut sub = output.subscribe();

        let payloads: [&[u8]; 3] = [b"aac0", b"aac1xx", b"aac2xxxx"];
        for (index, payload) in payloads.iter().enumerate() {
            transcoder.on_fragment(&audio_frag(index as u64, payload));
        }

        // `next_fragment` is awaited, so drive it on a tiny tokio
        // current-thread runtime with a per-fragment timeout.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("runtime");
        let per_fragment_timeout = std::time::Duration::from_millis(200);
        runtime.block_on(async {
            for (index, payload) in payloads.iter().enumerate() {
                let got = tokio::time::timeout(per_fragment_timeout, sub.next_fragment())
                    .await
                    .expect("timeout")
                    .expect("closed");
                assert_eq!(got.group_id, index as u64);
                assert_eq!(got.payload.as_ref(), *payload);
                assert_eq!(got.track_id.as_str(), "1.mp4");
            }
        });

        assert_eq!(transcoder.forwarded(), 3);
        transcoder.on_stop();
    }

    #[test]
    fn transcoder_propagates_init_segment_to_output() {
        let registry = FragmentBroadcasterRegistry::new();
        let mut transcoder =
            AudioPassthroughTranscoder::new(RenditionSpec::preset_480p(), "live/demo".into(), registry.clone());

        // Same context as the `ctx` helper builds, plus an init segment.
        let mut context = ctx("live/demo", "1.mp4", RenditionSpec::preset_480p());
        context.meta.init_segment = Some(Bytes::from_static(b"AAC-ASC"));
        transcoder.on_start(&context);

        let output = registry
            .get("live/demo/480p", "1.mp4")
            .expect("output broadcaster exists after on_start");
        let snapshot = output.meta();
        assert_eq!(
            snapshot.init_segment.as_deref(),
            Some(b"AAC-ASC" as &[u8]),
            "passthrough must copy the source ASC onto the output broadcast",
        );
    }
}