1use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::{Context, Result, bail};
20use bytes::Bytes;
21
22use codec::audio::{
23 AudioCodec, AudioEncoderConfig, create_decoder as audio_decoder,
24 create_encoder as audio_encoder,
25};
26use codec::encode::{self, EncoderBackend, EncoderConfig};
27use codec::frame::{ColorMetadata, VideoFrame};
28use codec::colorspace;
29use container::cmaf::CmafAudioMuxer;
30use container::demux::AudioTrack;
31use container::hls::{AudioVariantSpec, VideoVariantSpec, write_hls_package};
32use container::mux::Av1Mp4Muxer;
33use container::streaming::{self, DemuxHeader};
34use container::AudioInfo;
35
36use crate::cmaf_util::{self, add_audio_sample_with_segment_flush, keyframe_interval_for_segment};
37use crate::decode_pump::{DecodePumpConfig, run_shared_decode_pump_blocking};
38use crate::multigpu::{self, MultiGpuParams, RungManifest, RungPackets};
39use crate::progress::{JobEvent, ProgressSink, RungProgress, RungStatus};
40use crate::spec::{AudioPolicy, EncodePolicy, OutputMode, OutputSpec, Rung};
41use crate::validate::needs_chroma_downsample;
42
/// Bounded capacity of each per-rung frame channel.
///
/// `run_single_file` creates one `tokio::sync::mpsc` channel of this size per
/// rung, so this is the maximum number of decoded frames that can be queued
/// for any single rung worker at a time.
const FRAME_CHANNEL_CAPACITY: usize = 8;
45
/// Where the encoded result of a single rung ended up.
#[derive(Debug)]
pub enum RungArtifact {
    /// The complete output held in memory; the single-file paths store the
    /// finalized MP4 bytes here.
    File(Vec<u8>),
    /// An HLS rendition that lives on disk.
    HlsRendition {
        /// Rendition directory (the HLS root joined with `relative_dir`).
        dir: PathBuf,
        /// Rendition directory expressed relative to the HLS root.
        relative_dir: String,
    },
}
57
58#[derive(Debug)]
60pub struct RungOutput {
61 pub label: String,
62 pub width: u32,
63 pub height: u32,
64 pub frames: u64,
65 pub bytes: u64,
66 pub artifact: RungArtifact,
67}
68
69#[derive(Debug)]
71pub struct JobOutput {
72 pub rungs: Vec<RungOutput>,
75 pub hls_root: Option<PathBuf>,
77 pub master_playlist: Option<PathBuf>,
79 pub source_codec: String,
80 pub source_dims: (u32, u32),
81 pub source_frame_rate: f64,
82 pub audio_handling: String,
84 pub elapsed: Duration,
85}
86
87pub async fn run_job(
94 input: Bytes,
95 spec: &OutputSpec,
96 output_dir: Option<&Path>,
97 sink: Arc<dyn ProgressSink>,
98) -> Result<JobOutput> {
99 let started = Instant::now();
100 spec.validate().context("invalid OutputSpec")?;
101
102 let (header, audio_track) = {
103 let demuxer = streaming::demux_streaming(&input).context("demux")?;
104 (demuxer.header().clone(), demuxer.audio().cloned())
105 };
106 let source_codec = header.codec.to_ascii_lowercase();
107 let source_dims = (header.info.width, header.info.height);
108 let source_frame_rate = header.info.frame_rate;
109
110 sink.on_event(JobEvent::Started { rungs: spec.rungs.len() });
111 sink.on_event(JobEvent::Probed {
112 codec: source_codec.clone(),
113 width: header.info.width,
114 height: header.info.height,
115 frame_rate: header.info.frame_rate,
116 audio_codec: audio_track.as_ref().map(|t| t.codec.to_ascii_lowercase()),
117 });
118
119 let frame_rate = {
120 let mut fr = if header.info.frame_rate > 0.0 { header.info.frame_rate } else { 30.0 };
121 if let Some(cap) = spec.max_frame_rate {
122 fr = fr.min(cap);
123 }
124 fr
125 };
126 let frames_total = if header.info.total_frames > 0 {
127 Some(header.info.total_frames)
128 } else {
129 None
130 };
131
132 let prepared_audio = prepare_audio(audio_track.as_ref(), spec.audio).context("preparing audio")?;
133 let audio_handling = prepared_audio
134 .as_ref()
135 .map(|a| a.handling.clone())
136 .unwrap_or_else(|| "none".to_string());
137
138 let filter_chain = Arc::new(
141 codec::filter::FilterChain::prepare(&spec.filters).context("preparing video filters")?,
142 );
143
144 let (rungs, hls_root, master_playlist) = match &spec.mode {
145 OutputMode::SingleFile => {
146 let rungs = run_single_file(
147 input.clone(),
148 spec,
149 &header,
150 frame_rate,
151 frames_total,
152 prepared_audio.as_ref(),
153 Arc::clone(&filter_chain),
154 Arc::clone(&sink),
155 )
156 .await?;
157 (rungs, None, None)
158 }
159 OutputMode::Hls { segment_seconds } => {
160 run_hls(
161 input.clone(),
162 spec,
163 *segment_seconds,
164 &header,
165 frame_rate,
166 prepared_audio.as_ref(),
167 Arc::clone(&filter_chain),
168 output_dir,
169 Arc::clone(&sink),
170 )
171 .await?
172 }
173 };
174
175 let completed = rungs.len();
176 sink.on_event(JobEvent::Finished {
177 rungs_completed: completed,
178 rungs_failed: spec.rungs.len().saturating_sub(completed),
179 });
180
181 Ok(JobOutput {
182 rungs,
183 hls_root,
184 master_playlist,
185 source_codec,
186 source_dims,
187 source_frame_rate,
188 audio_handling,
189 elapsed: started.elapsed(),
190 })
191}
192
193pub fn run_job_blocking(
195 input: &[u8],
196 spec: &OutputSpec,
197 output_dir: Option<&Path>,
198 sink: Arc<dyn ProgressSink>,
199) -> Result<JobOutput> {
200 let rt = tokio::runtime::Builder::new_multi_thread()
201 .enable_all()
202 .build()
203 .context("building Tokio runtime")?;
204 rt.block_on(run_job(Bytes::copy_from_slice(input), spec, output_dir, sink))
205}
206
207#[allow(clippy::too_many_arguments)]
212async fn run_single_file(
213 input: Bytes,
214 spec: &OutputSpec,
215 header: &DemuxHeader,
216 frame_rate: f64,
217 frames_total: Option<u64>,
218 audio: Option<&PreparedAudio>,
219 filter_chain: Arc<codec::filter::FilterChain>,
220 sink: Arc<dyn ProgressSink>,
221) -> Result<Vec<RungOutput>> {
222 let total_input_frames = if header.info.total_frames > 0 {
229 header.info.total_frames
230 } else {
231 (header.info.duration * frame_rate).round().max(0.0) as u64
232 };
233 let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy);
234 if matches!(
235 spec.encode_policy,
236 EncodePolicy::AllGpus | EncodePolicy::Family(_)
237 ) && total_input_frames > 0
238 && gpu_pool.capacity() > 1
239 && spec.chunk_seam_mode != crate::spec::ChunkSeamMode::Serial
242 {
243 return run_single_file_multigpu(
244 input,
245 spec,
246 header,
247 frame_rate,
248 total_input_frames,
249 audio,
250 gpu_pool,
251 filter_chain,
252 sink,
253 )
254 .await;
255 }
256
257 let encode_gpu = multigpu::serial_gpu_for_policy(spec.encode_policy);
261 let decode_gpu = spec.decode_gpu.or(encode_gpu);
262 let (output_color_metadata, output_pixel_format) =
263 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
264 let backend_override = encoder_backend_override();
265 let base_cfg = EncoderConfig {
266 frame_rate,
267 pixel_format: output_pixel_format,
268 color_metadata: output_color_metadata,
269 gpu_index: encode_gpu,
270 ..EncoderConfig::default()
271 };
272 let pump_cfg = DecodePumpConfig {
273 codec_name: header.codec.clone(),
274 info_for_decoder: header.info.clone(),
275 source_color_metadata: header.info.color_metadata,
276 source_pixel_format: header.info.pixel_format,
277 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
278 tonemap_to_sdr: spec.tonemaps(),
279 gpu_index: decode_gpu,
280 filters: Arc::clone(&filter_chain),
281 };
282 let rt = tokio::runtime::Handle::current();
283
284 let mut senders = Vec::with_capacity(spec.rungs.len());
285 let mut handles = Vec::with_capacity(spec.rungs.len());
286 for (idx, rung) in spec.rungs.iter().cloned().enumerate() {
287 let (tx, rx) = tokio::sync::mpsc::channel::<VideoFrame>(FRAME_CHANNEL_CAPACITY);
288 senders.push(tx);
289 let sink = Arc::clone(&sink);
290 let base_cfg = base_cfg.clone();
291 let audio = audio.cloned();
292 let handle = tokio::task::spawn_blocking(move || {
293 let r = encode_rung_single_file(
294 idx, &rung, rx, base_cfg, backend_override, frame_rate, frames_total,
295 audio.as_ref(), sink.as_ref(),
296 );
297 (idx, rung, r)
298 });
299 handles.push(handle);
300 }
301
302 let pump_handle = {
303 let input = input.clone();
304 let rt = rt.clone();
305 tokio::task::spawn_blocking(move || {
306 run_shared_decode_pump_blocking(pump_cfg, input, senders, rt)
307 })
308 };
309
310 let mut outputs = Vec::new();
311 for handle in handles {
312 let (idx, rung, r) = handle.await.context("rung worker task panicked")?;
313 match r {
314 Ok(out) => outputs.push(out),
315 Err(e) => {
316 tracing::warn!(rung = %rung.label, error = %e, "rung failed");
317 report_failed(sink.as_ref(), idx, &rung, &e.to_string());
318 }
319 }
320 }
321 let _ = pump_handle.await.context("decode pump panicked")?.context("decode pump failed")?;
322 if outputs.is_empty() {
323 bail!("all {} rung(s) failed", spec.rungs.len());
324 }
325 Ok(outputs)
326}
327
328#[allow(clippy::too_many_arguments)]
334async fn run_single_file_multigpu(
335 input: Bytes,
336 spec: &OutputSpec,
337 header: &DemuxHeader,
338 frame_rate: f64,
339 total_input_frames: u64,
340 audio: Option<&PreparedAudio>,
341 gpu_pool: Arc<crate::gpu_pool::GpuPool>,
342 filter_chain: Arc<codec::filter::FilterChain>,
343 sink: Arc<dyn ProgressSink>,
344) -> Result<Vec<RungOutput>> {
345 const CHUNK_SECONDS: f64 = 2.0;
346 let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
347 let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
348 let keyframe_interval = keyframe_interval_for_segment(CHUNK_SECONDS, frame_rate);
349 let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
350
351 let (output_color_metadata, output_pixel_format) =
352 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
353 let params = MultiGpuParams {
354 input,
355 rungs: &spec.rungs,
356 header: header.clone(),
357 source_color_metadata: header.info.color_metadata,
358 source_pixel_format: header.info.pixel_format,
359 tonemap_to_sdr: spec.tonemaps(),
360 output_color_metadata,
361 output_pixel_format,
362 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
363 filters: Arc::clone(&filter_chain),
364 frame_rate,
365 gpu_pool,
366 gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
367 decode_gpu: spec.decode_gpu,
368 output_root: std::env::temp_dir(),
370 timescale,
371 per_frame_ticks,
372 keyframe_interval,
373 segment_target_ticks,
374 total_input_frames,
375 constant_qp: spec.chunk_seam_mode == crate::spec::ChunkSeamMode::ParallelConstQp,
377 };
378 let rung_packets = multigpu::run_multigpu_single_file(params, Arc::clone(&sink)).await?;
379
380 let mut outputs = Vec::new();
381 for rp in rung_packets.into_iter().flatten() {
382 let label = rp.label.clone();
383 match mux_rung_packets_to_mp4(rp, frame_rate, output_color_metadata, audio) {
384 Ok(out) => outputs.push(out),
385 Err(e) => tracing::warn!(rung = %label, error = %e, "stitching rung MP4 failed"),
386 }
387 }
388 if outputs.is_empty() {
389 bail!("multi-GPU single-file: no rung produced a stitched MP4");
390 }
391 Ok(outputs)
392}
393
394fn mux_rung_packets_to_mp4(
396 rp: RungPackets,
397 frame_rate: f64,
398 color_metadata: ColorMetadata,
399 audio: Option<&PreparedAudio>,
400) -> Result<RungOutput> {
401 let mut muxer =
402 Av1Mp4Muxer::new(rp.width, rp.height, frame_rate).context("Av1Mp4Muxer::new")?;
403 muxer.set_color_metadata(color_metadata);
404 if let Some(a) = audio {
405 if let Err(e) = muxer.with_audio(a.info.clone()) {
406 tracing::warn!(rung = %rp.label, "audio rejected ({e}); video-only");
407 } else {
408 for (sample, dur) in &a.samples {
409 muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
410 }
411 }
412 }
413 let frames = rp.packets.len() as u64;
414 for pkt in rp.packets {
415 muxer.add_packet(pkt).context("add_packet")?;
416 }
417 let bytes = muxer.finalize().context("finalize")?.to_vec();
418 let nbytes = bytes.len() as u64;
419 Ok(RungOutput {
420 label: rp.label,
421 width: rp.width,
422 height: rp.height,
423 frames,
424 bytes: nbytes,
425 artifact: RungArtifact::File(bytes),
426 })
427}
428
429#[allow(clippy::too_many_arguments)]
430fn encode_rung_single_file(
431 rung_index: usize,
432 rung: &Rung,
433 mut rx: tokio::sync::mpsc::Receiver<VideoFrame>,
434 mut cfg: EncoderConfig,
435 backend: Option<EncoderBackend>,
436 frame_rate: f64,
437 frames_total: Option<u64>,
438 audio: Option<&PreparedAudio>,
439 sink: &dyn ProgressSink,
440) -> Result<RungOutput> {
441 cfg.width = rung.width;
442 cfg.height = rung.height;
443 rung.quality.apply(&mut cfg, frame_rate);
444
445 let out_color = cfg.color_metadata;
446 let mut encoder = encode::select_encoder(cfg, backend)
447 .with_context(|| format!("creating encoder for rung {}", rung.label))?;
448 let mut muxer = Av1Mp4Muxer::new(rung.width, rung.height, frame_rate).context("Av1Mp4Muxer::new")?;
449 muxer.set_color_metadata(out_color);
450
451 if let Some(a) = audio {
452 if let Err(e) = muxer.with_audio(a.info.clone()) {
453 tracing::warn!(rung = %rung.label, "audio rejected ({e}); video-only");
454 } else {
455 for (sample, dur) in &a.samples {
456 muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
457 }
458 }
459 }
460
461 let mut frames: u64 = 0;
462 report(sink, rung_index, rung, RungStatus::Running, 0, frames_total, 0, 0);
463 while let Some(frame) = rx.blocking_recv() {
464 let scaled = colorspace::scale_frame(&frame, rung.width, rung.height).context("scale_frame")?;
465 encoder.send_frame(&scaled).context("send_frame")?;
466 while let Some(pkt) = encoder.receive_packet().context("receive_packet")? {
467 muxer.add_packet(pkt).context("add_packet")?;
468 }
469 frames += 1;
470 if frames % 30 == 0 {
471 report(sink, rung_index, rung, RungStatus::Running, frames, frames_total, 0, 0);
472 }
473 }
474 encoder.flush().context("encoder flush")?;
475 while let Some(pkt) = encoder.receive_packet().context("receive_packet drain")? {
476 muxer.add_packet(pkt).context("add_packet drain")?;
477 }
478 report(sink, rung_index, rung, RungStatus::Finalizing, frames, frames_total, 0, 0);
479 let bytes = muxer.finalize().context("finalize")?.to_vec();
480 let nbytes = bytes.len() as u64;
481 report(sink, rung_index, rung, RungStatus::Completed, frames, frames_total, 0, nbytes);
482
483 Ok(RungOutput {
484 label: rung.label.clone(),
485 width: rung.width,
486 height: rung.height,
487 frames,
488 bytes: nbytes,
489 artifact: RungArtifact::File(bytes),
490 })
491}
492
493#[allow(clippy::too_many_arguments)]
498async fn run_hls(
499 input: Bytes,
500 spec: &OutputSpec,
501 segment_seconds: f32,
502 header: &DemuxHeader,
503 frame_rate: f64,
504 audio: Option<&PreparedAudio>,
505 filter_chain: Arc<codec::filter::FilterChain>,
506 output_dir: Option<&Path>,
507 sink: Arc<dyn ProgressSink>,
508) -> Result<(Vec<RungOutput>, Option<PathBuf>, Option<PathBuf>)> {
509 let root = match output_dir {
510 Some(d) => d.to_path_buf(),
511 None => tempfile::Builder::new()
512 .prefix("rivet-hls-")
513 .tempdir()
514 .context("creating HLS temp dir")?
515 .keep(),
516 };
517
518 let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
519 let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
520 let keyframe_interval = keyframe_interval_for_segment(segment_seconds as f64, frame_rate);
521 let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
522 let total_input_frames = if header.info.total_frames > 0 {
523 header.info.total_frames
524 } else {
525 (header.info.duration * frame_rate).round().max(0.0) as u64
526 };
527
528 let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy);
529 let (output_color_metadata, output_pixel_format) =
530 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
531 let params = MultiGpuParams {
532 input,
533 rungs: &spec.rungs,
534 header: header.clone(),
535 source_color_metadata: header.info.color_metadata,
536 source_pixel_format: header.info.pixel_format,
537 tonemap_to_sdr: spec.tonemaps(),
538 output_color_metadata,
539 output_pixel_format,
540 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
541 filters: Arc::clone(&filter_chain),
542 frame_rate,
543 gpu_pool,
544 gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
545 decode_gpu: spec.decode_gpu,
546 output_root: root.clone(),
547 timescale,
548 per_frame_ticks,
549 keyframe_interval,
550 segment_target_ticks,
551 total_input_frames,
552 constant_qp: false,
554 };
555 let manifests = multigpu::run_multigpu_hls(params, Arc::clone(&sink)).await?;
556
557 let mut rung_outputs = Vec::new();
558 let mut video_specs = Vec::new();
559 for (idx, m) in manifests.into_iter().enumerate() {
560 match m {
561 Some(rm) => {
562 let dir = root.join(&rm.relative_dir);
563 let bytes = dir_size(&dir);
564 video_specs.push(build_video_variant_spec(&rm, frame_rate, bytes));
565 rung_outputs.push(RungOutput {
566 label: rm.label.clone(),
567 width: rm.width,
568 height: rm.height,
569 frames: total_input_frames,
570 bytes,
571 artifact: RungArtifact::HlsRendition {
572 dir,
573 relative_dir: rm.relative_dir,
574 },
575 });
576 }
577 None => {
578 if let Some(rung) = spec.rungs.get(idx) {
579 report_failed(sink.as_ref(), idx, rung, "rung produced no segments");
580 }
581 }
582 }
583 }
584 if rung_outputs.is_empty() {
585 bail!("all {} rung(s) failed", spec.rungs.len());
586 }
587
588 let audio_spec = match audio {
589 Some(a) => build_audio_rendition(&root, a, segment_seconds).context("building HLS audio rendition")?,
590 None => None,
591 };
592 let target_duration = segment_seconds.ceil() as u32;
593 let paths = write_hls_package(&root, &video_specs, audio_spec.as_ref(), target_duration)
594 .context("writing HLS package")?;
595
596 Ok((rung_outputs, Some(root), Some(paths.master_path)))
597}
598
599fn build_video_variant_spec(rm: &RungManifest, frame_rate: f64, bytes: u64) -> VideoVariantSpec {
600 let codec_string = cmaf_util::av1_codec_string_from_init(&rm.manifest.init_path)
601 .unwrap_or_else(|_| "av01.0.08M.08.0.110.01.01.01.0".to_string());
602 let (_avg, peak) = cmaf_util::measure_bandwidth(&rm.manifest);
603 let bandwidth = if peak > 0 {
604 peak
605 } else {
606 let dur = rm.manifest.duration_seconds().max(0.001);
607 ((bytes as f64 * 8.0) / dur) as u32
608 };
609 VideoVariantSpec {
610 width: rm.width,
611 height: rm.height,
612 frame_rate,
613 average_bandwidth_bps: bandwidth,
614 bandwidth_bps: bandwidth,
615 codec_string,
616 supplemental_codecs: None,
617 video_range: None,
618 relative_dir: rm.relative_dir.clone(),
619 manifest: rm.manifest.clone(),
620 }
621}
622
623#[derive(Clone)]
628struct PreparedAudio {
629 info: AudioInfo,
630 samples: Vec<(Vec<u8>, u32)>,
631 handling: String,
632}
633
634impl PreparedAudio {
635 fn has_samples(&self) -> bool {
636 !self.samples.is_empty()
637 }
638}
639
640fn prepare_audio(track: Option<&AudioTrack>, policy: AudioPolicy) -> Result<Option<PreparedAudio>> {
641 let Some(track) = track else {
642 return Ok(None);
643 };
644 if policy == AudioPolicy::Drop {
645 return Ok(None);
646 }
647 let codec = track.codec.to_ascii_lowercase();
648 let passthrough_ok = matches!(codec.as_str(), "aac" | "opus" | "ac3" | "eac3");
649 let force_opus = policy == AudioPolicy::ForceOpus;
650
651 if passthrough_ok && !(force_opus && codec != "opus") {
652 let info = passthrough_info(&codec, track);
653 let samples = track
654 .samples
655 .iter()
656 .cloned()
657 .zip(track.durations.iter().copied())
658 .collect();
659 return Ok(Some(PreparedAudio {
660 info,
661 samples,
662 handling: format!("{codec} passthrough"),
663 }));
664 }
665
666 if matches!(codec.as_str(), "mp3" | "vorbis") || force_opus {
667 if track.channels > 2 {
668 tracing::warn!(codec, channels = track.channels, "multichannel audio dropped");
669 return Ok(Some(dropped(format!("{codec} ({}ch)", track.channels))));
670 }
671 if !matches!(codec.as_str(), "mp3" | "vorbis") {
672 tracing::warn!(codec, "cannot transcode to opus; dropping audio");
673 return Ok(Some(dropped(codec)));
674 }
675 let extra: Option<&[u8]> =
676 if track.codec_private.is_empty() { None } else { Some(track.codec_private.as_slice()) };
677 let mut dec = audio_decoder(&codec, extra, track.sample_rate, track.channels as u8)
678 .context("audio decoder")?;
679 let bitrate = if track.channels == 1 { 64_000 } else { 96_000 };
680 let mut enc = audio_encoder(AudioEncoderConfig {
681 codec: AudioCodec::Opus,
682 sample_rate: track.sample_rate,
683 channels: track.channels as u8,
684 bitrate,
685 })
686 .context("opus encoder")?;
687
688 let mut samples: Vec<(Vec<u8>, u32)> = Vec::new();
689 let mut pts: i64 = 0;
690 for packet in &track.samples {
691 for frame in dec.decode(packet, pts).context("audio decode")? {
692 pts = pts.saturating_add((frame.samples.len() as i64) / frame.channels.max(1) as i64);
693 for pkt in enc.encode(&frame).context("opus encode")? {
694 samples.push((pkt.data, pkt.duration as u32));
695 }
696 }
697 }
698 for frame in dec.flush().context("audio flush")? {
699 for pkt in enc.encode(&frame).context("opus encode flush")? {
700 samples.push((pkt.data, pkt.duration as u32));
701 }
702 }
703 for pkt in enc.flush().context("opus encoder flush")? {
704 samples.push((pkt.data, pkt.duration as u32));
705 }
706 let info = AudioInfo::opus(48_000, track.channels, enc.extra_data());
707 return Ok(Some(PreparedAudio {
708 info,
709 samples,
710 handling: format!("{codec} → opus"),
711 }));
712 }
713
714 Ok(Some(dropped(codec)))
715}
716
717fn dropped(codec: String) -> PreparedAudio {
718 PreparedAudio {
719 info: AudioInfo::aac_lc(48_000, 2, Vec::new()),
720 samples: Vec::new(),
721 handling: format!("{codec} dropped"),
722 }
723}
724
725fn passthrough_info(codec: &str, track: &AudioTrack) -> AudioInfo {
726 match codec {
727 "aac" => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
728 "opus" => AudioInfo::opus(track.sample_rate, track.channels, track.codec_private.clone()),
729 "ac3" => AudioInfo::ac3(track.sample_rate, track.channels, track.codec_private.clone()),
730 "eac3" => AudioInfo::eac3(track.sample_rate, track.channels, track.codec_private.clone()),
731 _ => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
732 }
733}
734
735fn build_audio_rendition(
736 asset_root: &Path,
737 audio: &PreparedAudio,
738 segment_seconds: f32,
739) -> Result<Option<AudioVariantSpec>> {
740 if !audio.has_samples() {
741 return Ok(None);
742 }
743 let audio_dir = asset_root.join("audio");
744 let seg_target_ticks = (segment_seconds as f64 * audio.info.timescale as f64).round() as u64;
745 let mut muxer = CmafAudioMuxer::new(&audio_dir, audio.info.clone()).context("CmafAudioMuxer::new")?;
746 for (payload, dur) in &audio.samples {
747 add_audio_sample_with_segment_flush(&mut muxer, payload.clone(), *dur, seg_target_ticks)?;
748 }
749 muxer.flush_segment().context("final audio flush_segment")?;
750 let manifest = muxer.finalize().context("CmafAudioMuxer finalize")?;
751
752 let codec_string = match audio.info.codec.as_str() {
753 "opus" => "opus".to_string(),
754 _ => codec::codec_strings::AAC_LC_CODEC_STRING.to_string(),
755 };
756 Ok(Some(AudioVariantSpec {
757 codec_string,
758 channels: audio.info.channels,
759 sample_rate: audio.info.sample_rate,
760 relative_dir: "audio".to_string(),
761 language: "und".to_string(),
762 name: "Audio".to_string(),
763 manifest,
764 }))
765}
766
767fn encoder_backend_override() -> Option<EncoderBackend> {
772 std::env::var("TRANSCODE_ENCODER_BACKEND")
773 .ok()
774 .and_then(|s| match s.to_ascii_lowercase().as_str() {
775 "nvenc" => Some(EncoderBackend::Nvenc),
776 "amf" => Some(EncoderBackend::Amf),
777 "qsv" => Some(EncoderBackend::Qsv),
778 _ => None,
779 })
780}
781
/// Sums the sizes of the regular files directly inside `dir`.
///
/// Subdirectories are not descended into. A directory that cannot be read
/// counts as 0, and entries whose metadata cannot be read are skipped.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| entry.metadata().ok())
        .filter(|meta| meta.is_file())
        .map(|meta| meta.len())
        .sum()
}
795
796#[allow(clippy::too_many_arguments)]
797fn report(
798 sink: &dyn ProgressSink,
799 rung_index: usize,
800 rung: &Rung,
801 status: RungStatus,
802 frames_done: u64,
803 frames_total: Option<u64>,
804 segments: u32,
805 bytes_out: u64,
806) {
807 let percent = match status {
808 RungStatus::Completed => 100.0,
809 RungStatus::Pending => 0.0,
810 _ => match frames_total {
811 Some(total) if total > 0 => ((frames_done as f32 / total as f32) * 100.0).min(99.0),
812 _ => {
813 if frames_done == 0 { 1.0 } else { 50.0 }
814 }
815 },
816 };
817 sink.on_rung(RungProgress {
818 rung_index,
819 label: rung.label.clone(),
820 width: rung.width,
821 height: rung.height,
822 status,
823 percent,
824 frames_done,
825 frames_total,
826 segments_written: segments,
827 bytes_out,
828 message: None,
829 });
830}
831
832fn report_failed(sink: &dyn ProgressSink, rung_index: usize, rung: &Rung, message: &str) {
833 sink.on_rung(RungProgress {
834 rung_index,
835 label: rung.label.clone(),
836 width: rung.width,
837 height: rung.height,
838 status: RungStatus::Failed,
839 percent: 0.0,
840 frames_done: 0,
841 frames_total: None,
842 segments_written: 0,
843 bytes_out: 0,
844 message: Some(message.to_string()),
845 });
846}