1use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::{Context, Result, bail};
20use bytes::Bytes;
21
22use codec::audio::{
23 AudioCodec, AudioEncoderConfig, create_decoder as audio_decoder,
24 create_encoder as audio_encoder,
25};
26use codec::encode::{self, EncoderBackend, EncoderConfig};
27use codec::frame::{ColorMetadata, VideoFrame};
28use codec::colorspace;
29use container::cmaf::CmafAudioMuxer;
30use container::demux::AudioTrack;
31use container::hls::{AudioVariantSpec, VideoVariantSpec, write_hls_package};
32use container::mux::Av1Mp4Muxer;
33use container::streaming::{self, DemuxHeader};
34use container::AudioInfo;
35
36use crate::cmaf_util::{self, add_audio_sample_with_segment_flush, keyframe_interval_for_segment};
37use crate::decode_pump::{DecodePumpConfig, run_shared_decode_pump_blocking};
38use crate::multigpu::{self, MultiGpuParams, RungManifest, RungPackets};
39use crate::progress::{JobEvent, ProgressSink, RungProgress, RungStatus};
40use crate::spec::{AudioPolicy, EncodePolicy, OutputMode, OutputSpec, Rung};
41use crate::validate::needs_chroma_downsample;
42
/// Bound of each per-rung frame channel created in `run_single_file`
/// (the capacity passed to `tokio::sync::mpsc::channel`).
const FRAME_CHANNEL_CAPACITY: usize = 8;
45
/// Where the encoded output of one finished rung lives.
#[derive(Debug)]
pub enum RungArtifact {
    /// The complete output held in memory; the single-file paths store the
    /// finalized MP4 bytes here.
    File(Vec<u8>),
    /// An HLS rendition that was written to disk.
    HlsRendition {
        /// Rendition directory; `run_hls` sets it to the asset root joined
        /// with `relative_dir`.
        dir: PathBuf,
        /// Rendition directory relative to the HLS asset root.
        relative_dir: String,
    },
}
57
/// Result of one successfully produced rung.
#[derive(Debug)]
pub struct RungOutput {
    /// Label copied from the rung (or its packet set / manifest).
    pub label: String,
    /// Output width of the rung.
    pub width: u32,
    /// Output height of the rung.
    pub height: u32,
    /// Frame count: frames encoded (serial single-file path), packets muxed
    /// (multi-GPU single-file path), or the total input frame count — taken
    /// from the header or estimated from duration × frame rate — in HLS mode.
    pub frames: u64,
    /// Output size in bytes: the MP4 length, or the summed size of the files
    /// in the rendition directory for HLS.
    pub bytes: u64,
    /// The produced artifact itself.
    pub artifact: RungArtifact,
}
68
/// Summary returned by `run_job` once the whole job has finished.
#[derive(Debug)]
pub struct JobOutput {
    /// One entry per rung that completed; failed rungs are absent.
    pub rungs: Vec<RungOutput>,
    /// Root directory of the HLS package; `None` in single-file mode.
    pub hls_root: Option<PathBuf>,
    /// Path of the master playlist; `None` in single-file mode.
    pub master_playlist: Option<PathBuf>,
    /// Source video codec name from the demux header, lowercased.
    pub source_codec: String,
    /// Source `(width, height)` from the demux header.
    pub source_dims: (u32, u32),
    /// Source frame rate exactly as the demux header reported it (before any
    /// fallback or `max_frame_rate` cap is applied).
    pub source_frame_rate: f64,
    /// Human-readable description of what happened to the audio track;
    /// `"none"` when no audio was prepared.
    pub audio_handling: String,
    /// Time elapsed between entering `run_job` and building this value.
    pub elapsed: Duration,
}
86
87pub async fn run_job(
94 input: Bytes,
95 spec: &OutputSpec,
96 output_dir: Option<&Path>,
97 sink: Arc<dyn ProgressSink>,
98) -> Result<JobOutput> {
99 let started = Instant::now();
100 spec.validate().context("invalid OutputSpec")?;
101
102 let (header, audio_track) = {
103 let demuxer = streaming::demux_streaming(&input).context("demux")?;
104 (demuxer.header().clone(), demuxer.audio().cloned())
105 };
106 let source_codec = header.codec.to_ascii_lowercase();
107 let source_dims = (header.info.width, header.info.height);
108 let source_frame_rate = header.info.frame_rate;
109
110 sink.on_event(JobEvent::Started { rungs: spec.rungs.len() });
111 sink.on_event(JobEvent::Probed {
112 codec: source_codec.clone(),
113 width: header.info.width,
114 height: header.info.height,
115 frame_rate: header.info.frame_rate,
116 audio_codec: audio_track.as_ref().map(|t| t.codec.to_ascii_lowercase()),
117 });
118
119 let frame_rate = {
120 let mut fr = if header.info.frame_rate > 0.0 { header.info.frame_rate } else { 30.0 };
121 if let Some(cap) = spec.max_frame_rate {
122 fr = fr.min(cap);
123 }
124 fr
125 };
126 let frames_total = if header.info.total_frames > 0 {
127 Some(header.info.total_frames)
128 } else {
129 None
130 };
131
132 let prepared_audio = prepare_audio(audio_track.as_ref(), spec.audio).context("preparing audio")?;
133 let audio_handling = prepared_audio
134 .as_ref()
135 .map(|a| a.handling.clone())
136 .unwrap_or_else(|| "none".to_string());
137
138 let (rungs, hls_root, master_playlist) = match &spec.mode {
139 OutputMode::SingleFile => {
140 let rungs = run_single_file(
141 input.clone(),
142 spec,
143 &header,
144 frame_rate,
145 frames_total,
146 prepared_audio.as_ref(),
147 Arc::clone(&sink),
148 )
149 .await?;
150 (rungs, None, None)
151 }
152 OutputMode::Hls { segment_seconds } => {
153 run_hls(
154 input.clone(),
155 spec,
156 *segment_seconds,
157 &header,
158 frame_rate,
159 prepared_audio.as_ref(),
160 output_dir,
161 Arc::clone(&sink),
162 )
163 .await?
164 }
165 };
166
167 let completed = rungs.len();
168 sink.on_event(JobEvent::Finished {
169 rungs_completed: completed,
170 rungs_failed: spec.rungs.len().saturating_sub(completed),
171 });
172
173 Ok(JobOutput {
174 rungs,
175 hls_root,
176 master_playlist,
177 source_codec,
178 source_dims,
179 source_frame_rate,
180 audio_handling,
181 elapsed: started.elapsed(),
182 })
183}
184
185pub fn run_job_blocking(
187 input: &[u8],
188 spec: &OutputSpec,
189 output_dir: Option<&Path>,
190 sink: Arc<dyn ProgressSink>,
191) -> Result<JobOutput> {
192 let rt = tokio::runtime::Builder::new_multi_thread()
193 .enable_all()
194 .build()
195 .context("building Tokio runtime")?;
196 rt.block_on(run_job(Bytes::copy_from_slice(input), spec, output_dir, sink))
197}
198
199#[allow(clippy::too_many_arguments)]
204async fn run_single_file(
205 input: Bytes,
206 spec: &OutputSpec,
207 header: &DemuxHeader,
208 frame_rate: f64,
209 frames_total: Option<u64>,
210 audio: Option<&PreparedAudio>,
211 sink: Arc<dyn ProgressSink>,
212) -> Result<Vec<RungOutput>> {
213 let total_input_frames = if header.info.total_frames > 0 {
220 header.info.total_frames
221 } else {
222 (header.info.duration * frame_rate).round().max(0.0) as u64
223 };
224 let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy);
225 if matches!(
226 spec.encode_policy,
227 EncodePolicy::AllGpus | EncodePolicy::Family(_)
228 ) && total_input_frames > 0
229 && gpu_pool.capacity() > 1
230 && spec.chunk_seam_mode != crate::spec::ChunkSeamMode::Serial
233 {
234 return run_single_file_multigpu(
235 input,
236 spec,
237 header,
238 frame_rate,
239 total_input_frames,
240 audio,
241 gpu_pool,
242 sink,
243 )
244 .await;
245 }
246
247 let encode_gpu = multigpu::serial_gpu_for_policy(spec.encode_policy);
251 let decode_gpu = spec.decode_gpu.or(encode_gpu);
252 let (output_color_metadata, output_pixel_format) =
253 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
254 let backend_override = encoder_backend_override();
255 let base_cfg = EncoderConfig {
256 frame_rate,
257 pixel_format: output_pixel_format,
258 color_metadata: output_color_metadata,
259 gpu_index: encode_gpu,
260 ..EncoderConfig::default()
261 };
262 let pump_cfg = DecodePumpConfig {
263 codec_name: header.codec.clone(),
264 info_for_decoder: header.info.clone(),
265 source_color_metadata: header.info.color_metadata,
266 source_pixel_format: header.info.pixel_format,
267 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
268 tonemap_to_sdr: spec.tonemaps(),
269 gpu_index: decode_gpu,
270 };
271 let rt = tokio::runtime::Handle::current();
272
273 let mut senders = Vec::with_capacity(spec.rungs.len());
274 let mut handles = Vec::with_capacity(spec.rungs.len());
275 for (idx, rung) in spec.rungs.iter().cloned().enumerate() {
276 let (tx, rx) = tokio::sync::mpsc::channel::<VideoFrame>(FRAME_CHANNEL_CAPACITY);
277 senders.push(tx);
278 let sink = Arc::clone(&sink);
279 let base_cfg = base_cfg.clone();
280 let audio = audio.cloned();
281 let handle = tokio::task::spawn_blocking(move || {
282 let r = encode_rung_single_file(
283 idx, &rung, rx, base_cfg, backend_override, frame_rate, frames_total,
284 audio.as_ref(), sink.as_ref(),
285 );
286 (idx, rung, r)
287 });
288 handles.push(handle);
289 }
290
291 let pump_handle = {
292 let input = input.clone();
293 let rt = rt.clone();
294 tokio::task::spawn_blocking(move || {
295 run_shared_decode_pump_blocking(pump_cfg, input, senders, rt)
296 })
297 };
298
299 let mut outputs = Vec::new();
300 for handle in handles {
301 let (idx, rung, r) = handle.await.context("rung worker task panicked")?;
302 match r {
303 Ok(out) => outputs.push(out),
304 Err(e) => {
305 tracing::warn!(rung = %rung.label, error = %e, "rung failed");
306 report_failed(sink.as_ref(), idx, &rung, &e.to_string());
307 }
308 }
309 }
310 let _ = pump_handle.await.context("decode pump panicked")?.context("decode pump failed")?;
311 if outputs.is_empty() {
312 bail!("all {} rung(s) failed", spec.rungs.len());
313 }
314 Ok(outputs)
315}
316
317#[allow(clippy::too_many_arguments)]
323async fn run_single_file_multigpu(
324 input: Bytes,
325 spec: &OutputSpec,
326 header: &DemuxHeader,
327 frame_rate: f64,
328 total_input_frames: u64,
329 audio: Option<&PreparedAudio>,
330 gpu_pool: Arc<crate::gpu_pool::GpuPool>,
331 sink: Arc<dyn ProgressSink>,
332) -> Result<Vec<RungOutput>> {
333 const CHUNK_SECONDS: f64 = 2.0;
334 let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
335 let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
336 let keyframe_interval = keyframe_interval_for_segment(CHUNK_SECONDS, frame_rate);
337 let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
338
339 let (output_color_metadata, output_pixel_format) =
340 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
341 let params = MultiGpuParams {
342 input,
343 rungs: &spec.rungs,
344 header: header.clone(),
345 source_color_metadata: header.info.color_metadata,
346 source_pixel_format: header.info.pixel_format,
347 tonemap_to_sdr: spec.tonemaps(),
348 output_color_metadata,
349 output_pixel_format,
350 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
351 frame_rate,
352 gpu_pool,
353 gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
354 decode_gpu: spec.decode_gpu,
355 output_root: std::env::temp_dir(),
357 timescale,
358 per_frame_ticks,
359 keyframe_interval,
360 segment_target_ticks,
361 total_input_frames,
362 constant_qp: spec.chunk_seam_mode == crate::spec::ChunkSeamMode::ParallelConstQp,
364 };
365 let rung_packets = multigpu::run_multigpu_single_file(params, Arc::clone(&sink)).await?;
366
367 let mut outputs = Vec::new();
368 for rp in rung_packets.into_iter().flatten() {
369 let label = rp.label.clone();
370 match mux_rung_packets_to_mp4(rp, frame_rate, output_color_metadata, audio) {
371 Ok(out) => outputs.push(out),
372 Err(e) => tracing::warn!(rung = %label, error = %e, "stitching rung MP4 failed"),
373 }
374 }
375 if outputs.is_empty() {
376 bail!("multi-GPU single-file: no rung produced a stitched MP4");
377 }
378 Ok(outputs)
379}
380
381fn mux_rung_packets_to_mp4(
383 rp: RungPackets,
384 frame_rate: f64,
385 color_metadata: ColorMetadata,
386 audio: Option<&PreparedAudio>,
387) -> Result<RungOutput> {
388 let mut muxer =
389 Av1Mp4Muxer::new(rp.width, rp.height, frame_rate).context("Av1Mp4Muxer::new")?;
390 muxer.set_color_metadata(color_metadata);
391 if let Some(a) = audio {
392 if let Err(e) = muxer.with_audio(a.info.clone()) {
393 tracing::warn!(rung = %rp.label, "audio rejected ({e}); video-only");
394 } else {
395 for (sample, dur) in &a.samples {
396 muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
397 }
398 }
399 }
400 let frames = rp.packets.len() as u64;
401 for pkt in rp.packets {
402 muxer.add_packet(pkt).context("add_packet")?;
403 }
404 let bytes = muxer.finalize().context("finalize")?.to_vec();
405 let nbytes = bytes.len() as u64;
406 Ok(RungOutput {
407 label: rp.label,
408 width: rp.width,
409 height: rp.height,
410 frames,
411 bytes: nbytes,
412 artifact: RungArtifact::File(bytes),
413 })
414}
415
416#[allow(clippy::too_many_arguments)]
417fn encode_rung_single_file(
418 rung_index: usize,
419 rung: &Rung,
420 mut rx: tokio::sync::mpsc::Receiver<VideoFrame>,
421 mut cfg: EncoderConfig,
422 backend: Option<EncoderBackend>,
423 frame_rate: f64,
424 frames_total: Option<u64>,
425 audio: Option<&PreparedAudio>,
426 sink: &dyn ProgressSink,
427) -> Result<RungOutput> {
428 cfg.width = rung.width;
429 cfg.height = rung.height;
430 rung.quality.apply(&mut cfg, frame_rate);
431
432 let out_color = cfg.color_metadata;
433 let mut encoder = encode::select_encoder(cfg, backend)
434 .with_context(|| format!("creating encoder for rung {}", rung.label))?;
435 let mut muxer = Av1Mp4Muxer::new(rung.width, rung.height, frame_rate).context("Av1Mp4Muxer::new")?;
436 muxer.set_color_metadata(out_color);
437
438 if let Some(a) = audio {
439 if let Err(e) = muxer.with_audio(a.info.clone()) {
440 tracing::warn!(rung = %rung.label, "audio rejected ({e}); video-only");
441 } else {
442 for (sample, dur) in &a.samples {
443 muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
444 }
445 }
446 }
447
448 let mut frames: u64 = 0;
449 report(sink, rung_index, rung, RungStatus::Running, 0, frames_total, 0, 0);
450 while let Some(frame) = rx.blocking_recv() {
451 let scaled = colorspace::scale_frame(&frame, rung.width, rung.height).context("scale_frame")?;
452 encoder.send_frame(&scaled).context("send_frame")?;
453 while let Some(pkt) = encoder.receive_packet().context("receive_packet")? {
454 muxer.add_packet(pkt).context("add_packet")?;
455 }
456 frames += 1;
457 if frames % 30 == 0 {
458 report(sink, rung_index, rung, RungStatus::Running, frames, frames_total, 0, 0);
459 }
460 }
461 encoder.flush().context("encoder flush")?;
462 while let Some(pkt) = encoder.receive_packet().context("receive_packet drain")? {
463 muxer.add_packet(pkt).context("add_packet drain")?;
464 }
465 report(sink, rung_index, rung, RungStatus::Finalizing, frames, frames_total, 0, 0);
466 let bytes = muxer.finalize().context("finalize")?.to_vec();
467 let nbytes = bytes.len() as u64;
468 report(sink, rung_index, rung, RungStatus::Completed, frames, frames_total, 0, nbytes);
469
470 Ok(RungOutput {
471 label: rung.label.clone(),
472 width: rung.width,
473 height: rung.height,
474 frames,
475 bytes: nbytes,
476 artifact: RungArtifact::File(bytes),
477 })
478}
479
480#[allow(clippy::too_many_arguments)]
485async fn run_hls(
486 input: Bytes,
487 spec: &OutputSpec,
488 segment_seconds: f32,
489 header: &DemuxHeader,
490 frame_rate: f64,
491 audio: Option<&PreparedAudio>,
492 output_dir: Option<&Path>,
493 sink: Arc<dyn ProgressSink>,
494) -> Result<(Vec<RungOutput>, Option<PathBuf>, Option<PathBuf>)> {
495 let root = match output_dir {
496 Some(d) => d.to_path_buf(),
497 None => tempfile::Builder::new()
498 .prefix("rivet-hls-")
499 .tempdir()
500 .context("creating HLS temp dir")?
501 .keep(),
502 };
503
504 let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
505 let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
506 let keyframe_interval = keyframe_interval_for_segment(segment_seconds as f64, frame_rate);
507 let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
508 let total_input_frames = if header.info.total_frames > 0 {
509 header.info.total_frames
510 } else {
511 (header.info.duration * frame_rate).round().max(0.0) as u64
512 };
513
514 let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy);
515 let (output_color_metadata, output_pixel_format) =
516 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
517 let params = MultiGpuParams {
518 input,
519 rungs: &spec.rungs,
520 header: header.clone(),
521 source_color_metadata: header.info.color_metadata,
522 source_pixel_format: header.info.pixel_format,
523 tonemap_to_sdr: spec.tonemaps(),
524 output_color_metadata,
525 output_pixel_format,
526 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
527 frame_rate,
528 gpu_pool,
529 gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
530 decode_gpu: spec.decode_gpu,
531 output_root: root.clone(),
532 timescale,
533 per_frame_ticks,
534 keyframe_interval,
535 segment_target_ticks,
536 total_input_frames,
537 constant_qp: false,
539 };
540 let manifests = multigpu::run_multigpu_hls(params, Arc::clone(&sink)).await?;
541
542 let mut rung_outputs = Vec::new();
543 let mut video_specs = Vec::new();
544 for (idx, m) in manifests.into_iter().enumerate() {
545 match m {
546 Some(rm) => {
547 let dir = root.join(&rm.relative_dir);
548 let bytes = dir_size(&dir);
549 video_specs.push(build_video_variant_spec(&rm, frame_rate, bytes));
550 rung_outputs.push(RungOutput {
551 label: rm.label.clone(),
552 width: rm.width,
553 height: rm.height,
554 frames: total_input_frames,
555 bytes,
556 artifact: RungArtifact::HlsRendition {
557 dir,
558 relative_dir: rm.relative_dir,
559 },
560 });
561 }
562 None => {
563 if let Some(rung) = spec.rungs.get(idx) {
564 report_failed(sink.as_ref(), idx, rung, "rung produced no segments");
565 }
566 }
567 }
568 }
569 if rung_outputs.is_empty() {
570 bail!("all {} rung(s) failed", spec.rungs.len());
571 }
572
573 let audio_spec = match audio {
574 Some(a) => build_audio_rendition(&root, a, segment_seconds).context("building HLS audio rendition")?,
575 None => None,
576 };
577 let target_duration = segment_seconds.ceil() as u32;
578 let paths = write_hls_package(&root, &video_specs, audio_spec.as_ref(), target_duration)
579 .context("writing HLS package")?;
580
581 Ok((rung_outputs, Some(root), Some(paths.master_path)))
582}
583
584fn build_video_variant_spec(rm: &RungManifest, frame_rate: f64, bytes: u64) -> VideoVariantSpec {
585 let codec_string = cmaf_util::av1_codec_string_from_init(&rm.manifest.init_path)
586 .unwrap_or_else(|_| "av01.0.08M.08.0.110.01.01.01.0".to_string());
587 let (_avg, peak) = cmaf_util::measure_bandwidth(&rm.manifest);
588 let bandwidth = if peak > 0 {
589 peak
590 } else {
591 let dur = rm.manifest.duration_seconds().max(0.001);
592 ((bytes as f64 * 8.0) / dur) as u32
593 };
594 VideoVariantSpec {
595 width: rm.width,
596 height: rm.height,
597 frame_rate,
598 average_bandwidth_bps: bandwidth,
599 bandwidth_bps: bandwidth,
600 codec_string,
601 supplemental_codecs: None,
602 video_range: None,
603 relative_dir: rm.relative_dir.clone(),
604 manifest: rm.manifest.clone(),
605 }
606}
607
/// Audio that is ready to be muxed, as produced by `prepare_audio`.
#[derive(Clone)]
struct PreparedAudio {
    /// Codec description handed to the muxers.
    info: AudioInfo,
    /// `(payload, duration)` pairs in stream order; empty when the audio was
    /// dropped.
    samples: Vec<(Vec<u8>, u32)>,
    /// Human-readable summary such as `"aac passthrough"`; surfaced through
    /// `JobOutput::audio_handling`.
    handling: String,
}
618
impl PreparedAudio {
    /// Returns `true` when there is at least one audio sample to mux.
    fn has_samples(&self) -> bool {
        !self.samples.is_empty()
    }
}
624
625fn prepare_audio(track: Option<&AudioTrack>, policy: AudioPolicy) -> Result<Option<PreparedAudio>> {
626 let Some(track) = track else {
627 return Ok(None);
628 };
629 if policy == AudioPolicy::Drop {
630 return Ok(None);
631 }
632 let codec = track.codec.to_ascii_lowercase();
633 let passthrough_ok = matches!(codec.as_str(), "aac" | "opus" | "ac3" | "eac3");
634 let force_opus = policy == AudioPolicy::ForceOpus;
635
636 if passthrough_ok && !(force_opus && codec != "opus") {
637 let info = passthrough_info(&codec, track);
638 let samples = track
639 .samples
640 .iter()
641 .cloned()
642 .zip(track.durations.iter().copied())
643 .collect();
644 return Ok(Some(PreparedAudio {
645 info,
646 samples,
647 handling: format!("{codec} passthrough"),
648 }));
649 }
650
651 if matches!(codec.as_str(), "mp3" | "vorbis") || force_opus {
652 if track.channels > 2 {
653 tracing::warn!(codec, channels = track.channels, "multichannel audio dropped");
654 return Ok(Some(dropped(format!("{codec} ({}ch)", track.channels))));
655 }
656 if !matches!(codec.as_str(), "mp3" | "vorbis") {
657 tracing::warn!(codec, "cannot transcode to opus; dropping audio");
658 return Ok(Some(dropped(codec)));
659 }
660 let extra: Option<&[u8]> =
661 if track.codec_private.is_empty() { None } else { Some(track.codec_private.as_slice()) };
662 let mut dec = audio_decoder(&codec, extra, track.sample_rate, track.channels as u8)
663 .context("audio decoder")?;
664 let bitrate = if track.channels == 1 { 64_000 } else { 96_000 };
665 let mut enc = audio_encoder(AudioEncoderConfig {
666 codec: AudioCodec::Opus,
667 sample_rate: track.sample_rate,
668 channels: track.channels as u8,
669 bitrate,
670 })
671 .context("opus encoder")?;
672
673 let mut samples: Vec<(Vec<u8>, u32)> = Vec::new();
674 let mut pts: i64 = 0;
675 for packet in &track.samples {
676 for frame in dec.decode(packet, pts).context("audio decode")? {
677 pts = pts.saturating_add((frame.samples.len() as i64) / frame.channels.max(1) as i64);
678 for pkt in enc.encode(&frame).context("opus encode")? {
679 samples.push((pkt.data, pkt.duration as u32));
680 }
681 }
682 }
683 for frame in dec.flush().context("audio flush")? {
684 for pkt in enc.encode(&frame).context("opus encode flush")? {
685 samples.push((pkt.data, pkt.duration as u32));
686 }
687 }
688 for pkt in enc.flush().context("opus encoder flush")? {
689 samples.push((pkt.data, pkt.duration as u32));
690 }
691 let info = AudioInfo::opus(48_000, track.channels, enc.extra_data());
692 return Ok(Some(PreparedAudio {
693 info,
694 samples,
695 handling: format!("{codec} → opus"),
696 }));
697 }
698
699 Ok(Some(dropped(codec)))
700}
701
702fn dropped(codec: String) -> PreparedAudio {
703 PreparedAudio {
704 info: AudioInfo::aac_lc(48_000, 2, Vec::new()),
705 samples: Vec::new(),
706 handling: format!("{codec} dropped"),
707 }
708}
709
710fn passthrough_info(codec: &str, track: &AudioTrack) -> AudioInfo {
711 match codec {
712 "aac" => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
713 "opus" => AudioInfo::opus(track.sample_rate, track.channels, track.codec_private.clone()),
714 "ac3" => AudioInfo::ac3(track.sample_rate, track.channels, track.codec_private.clone()),
715 "eac3" => AudioInfo::eac3(track.sample_rate, track.channels, track.codec_private.clone()),
716 _ => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
717 }
718}
719
720fn build_audio_rendition(
721 asset_root: &Path,
722 audio: &PreparedAudio,
723 segment_seconds: f32,
724) -> Result<Option<AudioVariantSpec>> {
725 if !audio.has_samples() {
726 return Ok(None);
727 }
728 let audio_dir = asset_root.join("audio");
729 let seg_target_ticks = (segment_seconds as f64 * audio.info.timescale as f64).round() as u64;
730 let mut muxer = CmafAudioMuxer::new(&audio_dir, audio.info.clone()).context("CmafAudioMuxer::new")?;
731 for (payload, dur) in &audio.samples {
732 add_audio_sample_with_segment_flush(&mut muxer, payload.clone(), *dur, seg_target_ticks)?;
733 }
734 muxer.flush_segment().context("final audio flush_segment")?;
735 let manifest = muxer.finalize().context("CmafAudioMuxer finalize")?;
736
737 let codec_string = match audio.info.codec.as_str() {
738 "opus" => "opus".to_string(),
739 _ => codec::codec_strings::AAC_LC_CODEC_STRING.to_string(),
740 };
741 Ok(Some(AudioVariantSpec {
742 codec_string,
743 channels: audio.info.channels,
744 sample_rate: audio.info.sample_rate,
745 relative_dir: "audio".to_string(),
746 language: "und".to_string(),
747 name: "Audio".to_string(),
748 manifest,
749 }))
750}
751
752fn encoder_backend_override() -> Option<EncoderBackend> {
757 std::env::var("TRANSCODE_ENCODER_BACKEND")
758 .ok()
759 .and_then(|s| match s.to_ascii_lowercase().as_str() {
760 "nvenc" => Some(EncoderBackend::Nvenc),
761 "amf" => Some(EncoderBackend::Amf),
762 "qsv" => Some(EncoderBackend::Qsv),
763 _ => None,
764 })
765}
766
/// Sums the sizes of the regular files directly inside `dir`.
///
/// Subdirectories are not descended into. Entries that cannot be read or
/// stat'ed are skipped, and an unreadable or missing directory counts as `0`.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| entry.metadata().ok())
        .filter(|meta| meta.is_file())
        .map(|meta| meta.len())
        .sum()
}
780
781#[allow(clippy::too_many_arguments)]
782fn report(
783 sink: &dyn ProgressSink,
784 rung_index: usize,
785 rung: &Rung,
786 status: RungStatus,
787 frames_done: u64,
788 frames_total: Option<u64>,
789 segments: u32,
790 bytes_out: u64,
791) {
792 let percent = match status {
793 RungStatus::Completed => 100.0,
794 RungStatus::Pending => 0.0,
795 _ => match frames_total {
796 Some(total) if total > 0 => ((frames_done as f32 / total as f32) * 100.0).min(99.0),
797 _ => {
798 if frames_done == 0 { 1.0 } else { 50.0 }
799 }
800 },
801 };
802 sink.on_rung(RungProgress {
803 rung_index,
804 label: rung.label.clone(),
805 width: rung.width,
806 height: rung.height,
807 status,
808 percent,
809 frames_done,
810 frames_total,
811 segments_written: segments,
812 bytes_out,
813 message: None,
814 });
815}
816
817fn report_failed(sink: &dyn ProgressSink, rung_index: usize, rung: &Rung, message: &str) {
818 sink.on_rung(RungProgress {
819 rung_index,
820 label: rung.label.clone(),
821 width: rung.width,
822 height: rung.height,
823 status: RungStatus::Failed,
824 percent: 0.0,
825 frames_done: 0,
826 frames_total: None,
827 segments_written: 0,
828 bytes_out: 0,
829 message: Some(message.to_string()),
830 });
831}