1use std::sync::Arc;
16
17use lvqr_fragment::{Fragment, FragmentBroadcaster, FragmentBroadcasterRegistry, FragmentMeta};
18use tracing::{debug, info, warn};
19
20use crate::rendition::RenditionSpec;
21use crate::transcoder::{Transcoder, TranscoderContext, TranscoderFactory};
22
/// Track id the factory accepts as its source: `build` returns `None` for any
/// context whose `track` differs from this value. The tests in this file treat
/// it as the audio track (and `0.mp4` as a non-audio track).
const SOURCE_TRACK: &str = "1.mp4";
26
/// Track id used on the output side: the output broadcaster is looked up or
/// created under this track, and every forwarded fragment is rebuilt with it.
const OUTPUT_TRACK: &str = "1.mp4";
31
/// Factory that builds an [`AudioPassthroughTranscoder`] for one rendition.
///
/// `build` only produces a transcoder when the context's track equals
/// `SOURCE_TRACK` and the broadcast name does not already look like a
/// rendition output (see `looks_like_rendition_output`).
pub struct AudioPassthroughTranscoderFactory {
    /// Rendition this factory was configured with; a clone is handed to every
    /// transcoder it builds.
    rendition: RenditionSpec,
    /// Registry cloned into every transcoder built; the transcoder looks up or
    /// creates its output broadcaster in it.
    output_registry: FragmentBroadcasterRegistry,
    /// Extra trailing broadcast-name components that mark a broadcast as
    /// already-transcoded output, in addition to the built-in `<digits>p` check.
    skip_source_suffixes: Vec<String>,
}
44
45impl AudioPassthroughTranscoderFactory {
46 pub fn new(rendition: RenditionSpec, output_registry: FragmentBroadcasterRegistry) -> Self {
49 Self {
50 rendition,
51 output_registry,
52 skip_source_suffixes: Vec::new(),
53 }
54 }
55
56 pub fn skip_source_suffixes(mut self, suffixes: impl IntoIterator<Item = impl Into<String>>) -> Self {
65 self.skip_source_suffixes.extend(suffixes.into_iter().map(Into::into));
66 self
67 }
68}
69
70impl TranscoderFactory for AudioPassthroughTranscoderFactory {
71 fn name(&self) -> &str {
72 "audio-passthrough"
73 }
74
75 fn rendition(&self) -> &RenditionSpec {
76 &self.rendition
77 }
78
79 fn build(&self, ctx: &TranscoderContext) -> Option<Box<dyn Transcoder>> {
80 if ctx.track != SOURCE_TRACK {
81 return None;
82 }
83 if looks_like_rendition_output(&ctx.broadcast, &self.skip_source_suffixes) {
84 debug!(
85 broadcast = %ctx.broadcast,
86 rendition = %self.rendition.name,
87 "AudioPassthroughTranscoderFactory: skipping already-transcoded broadcast",
88 );
89 return None;
90 }
91 Some(Box::new(AudioPassthroughTranscoder::new(
92 self.rendition.clone(),
93 ctx.broadcast.clone(),
94 self.output_registry.clone(),
95 )))
96 }
97}
98
/// Transcoder that re-emits each source fragment onto a per-rendition output
/// broadcast named `<source_broadcast>/<rendition.name>`, keeping the ids,
/// timestamps, flags and payload and using `OUTPUT_TRACK` as the track id.
pub struct AudioPassthroughTranscoder {
    /// Rendition whose `name` suffixes the output broadcast name and labels
    /// the emitted metrics.
    rendition: RenditionSpec,
    /// Name of the broadcast the fragments originate from.
    source_broadcast: String,
    /// Registry in which `on_start` looks up or creates the output broadcaster.
    output_registry: FragmentBroadcasterRegistry,
    /// Output broadcaster; `None` until `on_start` runs and again after `on_stop`.
    output_bc: Option<Arc<FragmentBroadcaster>>,
    /// Count of fragments emitted to the output so far (saturating add).
    forwarded: u64,
}
108
109impl AudioPassthroughTranscoder {
110 fn new(rendition: RenditionSpec, source_broadcast: String, output_registry: FragmentBroadcasterRegistry) -> Self {
111 Self {
112 rendition,
113 source_broadcast,
114 output_registry,
115 output_bc: None,
116 forwarded: 0,
117 }
118 }
119
120 fn output_broadcast_name(&self) -> String {
121 format!("{}/{}", self.source_broadcast, self.rendition.name)
122 }
123
124 pub fn forwarded(&self) -> u64 {
127 self.forwarded
128 }
129}
130
131impl Transcoder for AudioPassthroughTranscoder {
132 fn on_start(&mut self, ctx: &TranscoderContext) {
133 let output_name = self.output_broadcast_name();
134 let output_meta = FragmentMeta {
135 codec: ctx.meta.codec.clone(),
136 timescale: ctx.meta.timescale,
137 init_segment: ctx.meta.init_segment.clone(),
138 };
139 let bc = self
140 .output_registry
141 .get_or_create(&output_name, OUTPUT_TRACK, output_meta);
142 if let Some(ref init) = ctx.meta.init_segment {
143 bc.set_init_segment(init.clone());
144 }
145 info!(
146 broadcast = %self.source_broadcast,
147 output = %output_name,
148 rendition = %self.rendition.name,
149 codec = %ctx.meta.codec,
150 timescale = ctx.meta.timescale,
151 "AudioPassthroughTranscoder started",
152 );
153 self.output_bc = Some(bc);
154 }
155
156 fn on_fragment(&mut self, fragment: &Fragment) {
157 let Some(bc) = self.output_bc.as_ref() else {
158 warn!(
159 rendition = %self.rendition.name,
160 broadcast = %self.source_broadcast,
161 "AudioPassthroughTranscoder: on_fragment before on_start; dropping",
162 );
163 return;
164 };
165 let clone = Fragment::new(
166 OUTPUT_TRACK,
167 fragment.group_id,
168 fragment.object_id,
169 fragment.priority,
170 fragment.dts,
171 fragment.pts,
172 fragment.duration,
173 fragment.flags,
174 fragment.payload.clone(),
175 );
176 bc.emit(clone);
177 self.forwarded = self.forwarded.saturating_add(1);
178 metrics::counter!(
179 "lvqr_transcode_output_fragments_total",
180 "transcoder" => "audio-passthrough",
181 "rendition" => self.rendition.name.clone(),
182 )
183 .increment(1);
184 metrics::counter!(
185 "lvqr_transcode_output_bytes_total",
186 "transcoder" => "audio-passthrough",
187 "rendition" => self.rendition.name.clone(),
188 )
189 .increment(fragment.payload.len() as u64);
190 }
191
192 fn on_stop(&mut self) {
193 info!(
194 broadcast = %self.source_broadcast,
195 rendition = %self.rendition.name,
196 forwarded = self.forwarded,
197 "AudioPassthroughTranscoder stopped",
198 );
199 self.output_bc = None;
200 }
201}
202
/// Reports whether the last `/`-separated component of `broadcast` marks it
/// as a rendition output.
///
/// The trailing component (the whole name when there is no `/`) matches when
/// it is non-empty and either
/// - equals one of the caller-supplied `extra` suffixes, or
/// - consists of one or more ASCII digits followed by a single `p`
///   (e.g. `720p`, `1080p`).
///
/// An empty trailing component (`"live/"`, `""`) never matches, even when
/// `extra` contains an empty string.
fn looks_like_rendition_output(broadcast: &str, extra: &[String]) -> bool {
    let tail = broadcast.rsplit_once('/').map_or(broadcast, |(_, last)| last);
    if tail.is_empty() {
        return false;
    }
    if extra.iter().any(|candidate| candidate == tail) {
        return true;
    }
    match tail.strip_suffix('p') {
        // A bare "p" leaves no digits and must not count as a rendition name.
        Some(digits) => !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
227
// Unit tests for the audio passthrough factory and transcoder: factory
// opt-in/opt-out rules, verbatim fragment forwarding, and init-segment
// propagation to the output broadcaster.
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use lvqr_fragment::{Fragment, FragmentFlags, FragmentMeta, FragmentStream};

    /// Builds a `TranscoderContext` for `broadcast`/`track` with AAC-style
    /// meta (`mp4a.40.2`, timescale 48000) and no init segment set here.
    fn ctx(broadcast: &str, track: &str, rendition: RenditionSpec) -> TranscoderContext {
        TranscoderContext {
            broadcast: broadcast.into(),
            track: track.into(),
            meta: FragmentMeta::new("mp4a.40.2", 48_000),
            rendition,
        }
    }

    /// Builds a fragment on track `1.mp4` whose group id is `idx`, whose
    /// dts/pts are `idx * 1024`, with duration 1024 and the given payload.
    fn audio_frag(idx: u64, payload: &[u8]) -> Fragment {
        Fragment::new(
            "1.mp4",
            idx,
            0,
            0,
            idx * 1024,
            idx * 1024,
            1024,
            FragmentFlags::KEYFRAME,
            Bytes::copy_from_slice(payload),
        )
    }

    // Any track other than `1.mp4` must make the factory return `None`.
    #[test]
    fn factory_opts_out_of_non_audio_tracks() {
        let registry = FragmentBroadcasterRegistry::new();
        let factory = AudioPassthroughTranscoderFactory::new(RenditionSpec::preset_720p(), registry);
        for track in ["0.mp4", "captions", "catalog", ".catalog"] {
            let c = ctx("live/demo", track, factory.rendition().clone());
            assert!(
                factory.build(&c).is_none(),
                "factory must opt out of non-audio track {track}",
            );
        }
    }

    // A plain source broadcast on the audio track yields a transcoder.
    #[test]
    fn factory_builds_transcoder_for_audio_track() {
        let registry = FragmentBroadcasterRegistry::new();
        let factory = AudioPassthroughTranscoderFactory::new(RenditionSpec::preset_480p(), registry);
        let c = ctx("live/demo", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&c).is_some());
    }

    // Broadcast names ending in `<digits>p` are treated as rendition output
    // and skipped, regardless of which rendition the factory itself targets.
    #[test]
    fn factory_skips_already_transcoded_broadcast() {
        let registry = FragmentBroadcasterRegistry::new();
        let factory = AudioPassthroughTranscoderFactory::new(RenditionSpec::preset_720p(), registry);
        for broadcast in ["live/demo/720p", "live/demo/480p", "cam/1080p"] {
            let c = ctx(broadcast, "1.mp4", factory.rendition().clone());
            assert!(
                factory.build(&c).is_none(),
                "factory must skip already-transcoded broadcast {broadcast}",
            );
        }
    }

    #[test]
    fn factory_honors_custom_skip_suffix() {
        let registry = FragmentBroadcasterRegistry::new();
        let factory = AudioPassthroughTranscoderFactory::new(RenditionSpec::preset_720p(), registry)
            .skip_source_suffixes(["ultra"]);
        // The custom suffix is skipped.
        let c = ctx("live/demo/ultra", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&c).is_none());
        // The built-in `<digits>p` rule still applies alongside custom suffixes.
        let c = ctx("live/demo/720p", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&c).is_none());
        // An ordinary source broadcast is still accepted.
        let c = ctx("live/demo", "1.mp4", factory.rendition().clone());
        assert!(factory.build(&c).is_some());
    }

    #[test]
    fn transcoder_forwards_fragments_verbatim() {
        let registry = FragmentBroadcasterRegistry::new();
        let mut transcoder =
            AudioPassthroughTranscoder::new(RenditionSpec::preset_720p(), "live/demo".into(), registry.clone());
        let c = ctx("live/demo", "1.mp4", RenditionSpec::preset_720p());
        transcoder.on_start(&c);

        // Subscribe before any fragment is emitted.
        // NOTE(review): presumably a subscriber attached later would miss
        // earlier fragments -- confirm against FragmentBroadcaster semantics.
        let bc = registry
            .get("live/demo/720p", "1.mp4")
            .expect("audio output broadcaster must exist after on_start");
        let mut sub = bc.subscribe();

        let payloads: Vec<&[u8]> = vec![b"aac0", b"aac1xx", b"aac2xxxx"];
        for (i, p) in payloads.iter().enumerate() {
            transcoder.on_fragment(&audio_frag(i as u64, p));
        }

        // A current-thread runtime with the time driver enabled is needed for
        // `tokio::time::timeout` below.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("runtime");
        rt.block_on(async {
            // Each forwarded fragment must arrive in order with the same
            // group id and payload, on the output track id.
            for (i, p) in payloads.iter().enumerate() {
                let got = tokio::time::timeout(std::time::Duration::from_millis(200), sub.next_fragment())
                    .await
                    .expect("timeout")
                    .expect("closed");
                assert_eq!(got.group_id, i as u64);
                assert_eq!(got.payload.as_ref(), *p);
                assert_eq!(got.track_id.as_str(), "1.mp4");
            }
        });
        assert_eq!(transcoder.forwarded(), 3);
        transcoder.on_stop();
    }

    #[test]
    fn transcoder_propagates_init_segment_to_output() {
        let registry = FragmentBroadcasterRegistry::new();
        let mut transcoder =
            AudioPassthroughTranscoder::new(RenditionSpec::preset_480p(), "live/demo".into(), registry.clone());
        // Source meta carrying an init segment that on_start must copy over.
        let mut meta = FragmentMeta::new("mp4a.40.2", 48_000);
        meta.init_segment = Some(Bytes::from_static(b"AAC-ASC"));
        let c = TranscoderContext {
            broadcast: "live/demo".into(),
            track: "1.mp4".into(),
            meta,
            rendition: RenditionSpec::preset_480p(),
        };
        transcoder.on_start(&c);
        let bc = registry
            .get("live/demo/480p", "1.mp4")
            .expect("output broadcaster exists after on_start");
        let snapshot = bc.meta();
        assert_eq!(
            snapshot.init_segment.as_deref(),
            Some(b"AAC-ASC" as &[u8]),
            "passthrough must copy the source ASC onto the output broadcast",
        );
    }
}