1use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::{Context, Result, bail};
20use bytes::Bytes;
21
22use codec::audio::{
23 AudioCodec, AudioEncoderConfig, create_decoder as audio_decoder,
24 create_encoder as audio_encoder,
25};
26use codec::encode::{self, EncoderBackend, EncoderConfig};
27use codec::frame::{ColorMetadata, VideoFrame};
28use codec::colorspace;
29use container::cmaf::CmafAudioMuxer;
30use container::demux::AudioTrack;
31use container::hls::{AudioVariantSpec, VideoVariantSpec, write_hls_package};
32use container::mux::Av1Mp4Muxer;
33use container::streaming::{self, DemuxHeader};
34use container::AudioInfo;
35
36use crate::cmaf_util::{self, add_audio_sample_with_segment_flush, keyframe_interval_for_segment};
37use crate::decode_pump::{DecodePumpConfig, run_shared_decode_pump_blocking};
38use crate::multigpu::{self, MultiGpuParams, RungManifest, RungPackets};
39use crate::progress::{JobEvent, ProgressSink, RungProgress, RungStatus};
40use crate::spec::{AudioCodecPolicy, EncodePolicy, OutputMode, OutputSpec, Rung};
41use crate::validate::needs_chroma_downsample;
42
/// Capacity of the bounded frame channel created for each rung worker in the serial
/// single-file path (decoded frames are sent by the shared decode pump).
const FRAME_CHANNEL_CAPACITY: usize = 8;
45
/// What a finished rung hands back to the caller.
#[derive(Debug)]
pub enum RungArtifact {
    /// The finalized output file held in memory (the bytes returned by the MP4 muxer).
    File(Vec<u8>),
    /// An HLS rendition written to disk.
    HlsRendition {
        /// Rendition directory: the HLS root joined with `relative_dir`.
        dir: PathBuf,
        /// The rendition directory expressed relative to the HLS root.
        relative_dir: String,
    },
}
57
/// Result of one successfully produced rung.
#[derive(Debug)]
pub struct RungOutput {
    /// Label of the rung this output belongs to.
    pub label: String,
    /// Output width of the rung.
    pub width: u32,
    /// Output height of the rung.
    pub height: u32,
    /// Frame count: frames/packets actually muxed for single-file output; for HLS this is
    /// the (possibly estimated) input frame count.
    pub frames: u64,
    /// Size of the artifact in bytes (file length, or summed file sizes of the HLS rendition
    /// directory).
    pub bytes: u64,
    /// The produced artifact itself.
    pub artifact: RungArtifact,
}
68
/// Summary returned by [`run_job`] once every rung has either completed or failed.
#[derive(Debug)]
pub struct JobOutput {
    /// Outputs of the rungs that succeeded (failed rungs are omitted).
    pub rungs: Vec<RungOutput>,
    /// Root directory of the HLS package; `None` for single-file output.
    pub hls_root: Option<PathBuf>,
    /// Path of the HLS master playlist; `None` for single-file output.
    pub master_playlist: Option<PathBuf>,
    /// Lower-cased codec name reported by the demuxer for the source video.
    pub source_codec: String,
    /// Source dimensions as `(width, height)`.
    pub source_dims: (u32, u32),
    /// Frame rate reported by the demuxer (before any default or cap is applied).
    pub source_frame_rate: f64,
    /// Human-readable description of what happened to the audio track, e.g. `"none"`,
    /// `"aac passthrough"`.
    pub audio_handling: String,
    /// Wall-clock time spent inside [`run_job`].
    pub elapsed: Duration,
}
86
87pub async fn run_job(
94 input: Bytes,
95 spec: &OutputSpec,
96 output_dir: Option<&Path>,
97 sink: Arc<dyn ProgressSink>,
98) -> Result<JobOutput> {
99 let started = Instant::now();
100 spec.validate().context("invalid OutputSpec")?;
101
102 let (header, audio_track) = {
103 let demuxer = streaming::demux_streaming(&input).context("demux")?;
104 (demuxer.header().clone(), demuxer.audio().cloned())
105 };
106 let source_codec = header.codec.to_ascii_lowercase();
107 let source_dims = (header.info.width, header.info.height);
108 let source_frame_rate = header.info.frame_rate;
109
110 sink.on_event(JobEvent::Started { rungs: spec.rungs.len() });
111 sink.on_event(JobEvent::Probed {
112 codec: source_codec.clone(),
113 width: header.info.width,
114 height: header.info.height,
115 frame_rate: header.info.frame_rate,
116 audio_codec: audio_track.as_ref().map(|t| t.codec.to_ascii_lowercase()),
117 });
118
119 let frame_rate = {
120 let mut fr = if header.info.frame_rate > 0.0 { header.info.frame_rate } else { 30.0 };
121 if let Some(cap) = spec.max_frame_rate {
122 fr = fr.min(cap);
123 }
124 fr
125 };
126 let frames_total = if header.info.total_frames > 0 {
127 Some(header.info.total_frames)
128 } else {
129 None
130 };
131
132 let prepared_audio = prepare_audio(audio_track.as_ref(), spec.audio).context("preparing audio")?;
133 let audio_handling = prepared_audio
134 .as_ref()
135 .map(|a| a.handling.clone())
136 .unwrap_or_else(|| "none".to_string());
137
138 let filter_chain = Arc::new(
141 codec::filter::FilterChain::prepare(&spec.filters).context("preparing video filters")?,
142 );
143
144 let (rungs, hls_root, master_playlist) = match &spec.mode {
145 OutputMode::SingleFile => {
146 let rungs = run_single_file(
147 input.clone(),
148 spec,
149 &header,
150 frame_rate,
151 frames_total,
152 prepared_audio.as_ref(),
153 Arc::clone(&filter_chain),
154 Arc::clone(&sink),
155 )
156 .await?;
157 (rungs, None, None)
158 }
159 OutputMode::Hls { segment_seconds } => {
160 run_hls(
161 input.clone(),
162 spec,
163 *segment_seconds,
164 &header,
165 frame_rate,
166 prepared_audio.as_ref(),
167 Arc::clone(&filter_chain),
168 output_dir,
169 Arc::clone(&sink),
170 )
171 .await?
172 }
173 };
174
175 let completed = rungs.len();
176 sink.on_event(JobEvent::Finished {
177 rungs_completed: completed,
178 rungs_failed: spec.rungs.len().saturating_sub(completed),
179 });
180
181 Ok(JobOutput {
182 rungs,
183 hls_root,
184 master_playlist,
185 source_codec,
186 source_dims,
187 source_frame_rate,
188 audio_handling,
189 elapsed: started.elapsed(),
190 })
191}
192
193pub fn run_job_blocking(
195 input: &[u8],
196 spec: &OutputSpec,
197 output_dir: Option<&Path>,
198 sink: Arc<dyn ProgressSink>,
199) -> Result<JobOutput> {
200 let rt = tokio::runtime::Builder::new_multi_thread()
201 .enable_all()
202 .build()
203 .context("building Tokio runtime")?;
204 rt.block_on(run_job(Bytes::copy_from_slice(input), spec, output_dir, sink))
205}
206
207#[allow(clippy::too_many_arguments)]
212async fn run_single_file(
213 input: Bytes,
214 spec: &OutputSpec,
215 header: &DemuxHeader,
216 frame_rate: f64,
217 frames_total: Option<u64>,
218 audio: Option<&PreparedAudio>,
219 filter_chain: Arc<codec::filter::FilterChain>,
220 sink: Arc<dyn ProgressSink>,
221) -> Result<Vec<RungOutput>> {
222 let total_input_frames = if header.info.total_frames > 0 {
229 header.info.total_frames
230 } else {
231 (header.info.duration * frame_rate).round().max(0.0) as u64
232 };
233 let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy, spec.video_codec.codec());
234 if matches!(
235 spec.encode_policy,
236 EncodePolicy::AllGpus | EncodePolicy::Family(_)
237 ) && total_input_frames > 0
238 && gpu_pool.capacity() > 1
239 && spec.chunk_seam_mode != crate::spec::ChunkSeamMode::Serial
242 {
243 return run_single_file_multigpu(
248 input,
249 spec,
250 header,
251 frame_rate,
252 total_input_frames,
253 audio,
254 gpu_pool,
255 filter_chain,
256 sink,
257 )
258 .await;
259 }
260
261 let encode_gpu = multigpu::serial_gpu_for_policy(spec.encode_policy);
265 let decode_gpu = spec.decode_gpu.or(encode_gpu);
266 let (output_color_metadata, output_pixel_format) =
267 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
268 let backend_override = encoder_backend_override();
269 let base_cfg = EncoderConfig {
270 frame_rate,
271 pixel_format: output_pixel_format,
272 color_metadata: output_color_metadata,
273 gpu_index: encode_gpu,
274 codec: spec.video_codec.codec(),
275 ..EncoderConfig::default()
276 };
277 let pump_cfg = DecodePumpConfig {
278 codec_name: header.codec.clone(),
279 info_for_decoder: header.info.clone(),
280 source_color_metadata: header.info.color_metadata,
281 source_pixel_format: header.info.pixel_format,
282 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
283 tonemap_to_sdr: spec.tonemaps(),
284 gpu_index: decode_gpu,
285 filters: Arc::clone(&filter_chain),
286 };
287 let rt = tokio::runtime::Handle::current();
288
289 let mut senders = Vec::with_capacity(spec.rungs.len());
290 let mut handles = Vec::with_capacity(spec.rungs.len());
291 for (idx, rung) in spec.rungs.iter().cloned().enumerate() {
292 let (tx, rx) = tokio::sync::mpsc::channel::<VideoFrame>(FRAME_CHANNEL_CAPACITY);
293 senders.push(tx);
294 let sink = Arc::clone(&sink);
295 let base_cfg = base_cfg.clone();
296 let audio = audio.cloned();
297 let handle = tokio::task::spawn_blocking(move || {
298 let r = encode_rung_single_file(
299 idx, &rung, rx, base_cfg, backend_override, frame_rate, frames_total,
300 audio.as_ref(), sink.as_ref(),
301 );
302 (idx, rung, r)
303 });
304 handles.push(handle);
305 }
306
307 let pump_handle = {
308 let input = input.clone();
309 let rt = rt.clone();
310 tokio::task::spawn_blocking(move || {
311 run_shared_decode_pump_blocking(pump_cfg, input, senders, rt)
312 })
313 };
314
315 let mut outputs = Vec::new();
316 for handle in handles {
317 let (idx, rung, r) = handle.await.context("rung worker task panicked")?;
318 match r {
319 Ok(out) => outputs.push(out),
320 Err(e) => {
321 tracing::warn!(rung = %rung.label, error = %e, "rung failed");
322 report_failed(sink.as_ref(), idx, &rung, &e.to_string());
323 }
324 }
325 }
326 let _ = pump_handle.await.context("decode pump panicked")?.context("decode pump failed")?;
327 if outputs.is_empty() {
328 bail!("all {} rung(s) failed", spec.rungs.len());
329 }
330 Ok(outputs)
331}
332
333#[allow(clippy::too_many_arguments)]
339async fn run_single_file_multigpu(
340 input: Bytes,
341 spec: &OutputSpec,
342 header: &DemuxHeader,
343 frame_rate: f64,
344 total_input_frames: u64,
345 audio: Option<&PreparedAudio>,
346 gpu_pool: Arc<crate::gpu_pool::GpuPool>,
347 filter_chain: Arc<codec::filter::FilterChain>,
348 sink: Arc<dyn ProgressSink>,
349) -> Result<Vec<RungOutput>> {
350 const CHUNK_SECONDS: f64 = 2.0;
351 let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
352 let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
353 let keyframe_interval = keyframe_interval_for_segment(CHUNK_SECONDS, frame_rate);
354 let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
355
356 let (output_color_metadata, output_pixel_format) =
357 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
358 let params = MultiGpuParams {
359 input,
360 codec: spec.video_codec.codec(),
361 rungs: &spec.rungs,
362 header: header.clone(),
363 source_color_metadata: header.info.color_metadata,
364 source_pixel_format: header.info.pixel_format,
365 tonemap_to_sdr: spec.tonemaps(),
366 output_color_metadata,
367 output_pixel_format,
368 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
369 filters: Arc::clone(&filter_chain),
370 frame_rate,
371 gpu_pool,
372 gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
373 decode_gpu: spec.decode_gpu,
374 output_root: std::env::temp_dir(),
376 timescale,
377 per_frame_ticks,
378 keyframe_interval,
379 segment_target_ticks,
380 total_input_frames,
381 constant_qp: spec.chunk_seam_mode == crate::spec::ChunkSeamMode::ParallelConstQp,
383 };
384 let rung_packets = multigpu::run_multigpu_single_file(params, Arc::clone(&sink)).await?;
385
386 let mut outputs = Vec::new();
387 for rp in rung_packets.into_iter().flatten() {
388 let label = rp.label.clone();
389 match mux_rung_packets_to_mp4(rp, frame_rate, output_color_metadata, audio) {
390 Ok(out) => outputs.push(out),
391 Err(e) => tracing::warn!(rung = %label, error = %e, "stitching rung MP4 failed"),
392 }
393 }
394 if outputs.is_empty() {
395 bail!("multi-GPU single-file: no rung produced a stitched MP4");
396 }
397 Ok(outputs)
398}
399
400fn mux_rung_packets_to_mp4(
402 rp: RungPackets,
403 frame_rate: f64,
404 color_metadata: ColorMetadata,
405 audio: Option<&PreparedAudio>,
406) -> Result<RungOutput> {
407 let mut muxer = Av1Mp4Muxer::new_with_codec_inline(rp.width, rp.height, frame_rate, rp.codec)
411 .context("Av1Mp4Muxer::new_with_codec_inline")?;
412 muxer.set_color_metadata(color_metadata);
413 if let Some(a) = audio {
414 if let Err(e) = muxer.with_audio(a.info.clone()) {
415 tracing::warn!(rung = %rp.label, "audio rejected ({e}); video-only");
416 } else {
417 for (sample, dur) in &a.samples {
418 muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
419 }
420 }
421 }
422 let frames = rp.packets.len() as u64;
423 for pkt in rp.packets {
424 muxer.add_packet(pkt).context("add_packet")?;
425 }
426 let bytes = muxer.finalize().context("finalize")?.to_vec();
427 let nbytes = bytes.len() as u64;
428 Ok(RungOutput {
429 label: rp.label,
430 width: rp.width,
431 height: rp.height,
432 frames,
433 bytes: nbytes,
434 artifact: RungArtifact::File(bytes),
435 })
436}
437
438#[allow(clippy::too_many_arguments)]
439fn encode_rung_single_file(
440 rung_index: usize,
441 rung: &Rung,
442 mut rx: tokio::sync::mpsc::Receiver<VideoFrame>,
443 mut cfg: EncoderConfig,
444 backend: Option<EncoderBackend>,
445 frame_rate: f64,
446 frames_total: Option<u64>,
447 audio: Option<&PreparedAudio>,
448 sink: &dyn ProgressSink,
449) -> Result<RungOutput> {
450 cfg.width = rung.width;
451 cfg.height = rung.height;
452 rung.quality.apply(&mut cfg, frame_rate);
453
454 let out_color = cfg.color_metadata;
455 let out_codec = cfg.codec;
456 let mut encoder = encode::select_encoder(cfg, backend)
457 .with_context(|| format!("creating encoder for rung {}", rung.label))?;
458 let mut muxer = Av1Mp4Muxer::new_with_codec(rung.width, rung.height, frame_rate, out_codec)
459 .context("Av1Mp4Muxer::new_with_codec")?;
460 muxer.set_color_metadata(out_color);
461
462 if let Some(a) = audio {
463 if let Err(e) = muxer.with_audio(a.info.clone()) {
464 tracing::warn!(rung = %rung.label, "audio rejected ({e}); video-only");
465 } else {
466 for (sample, dur) in &a.samples {
467 muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
468 }
469 }
470 }
471
472 let mut frames: u64 = 0;
473 report(sink, rung_index, rung, RungStatus::Running, 0, frames_total, 0, 0);
474 while let Some(frame) = rx.blocking_recv() {
475 let scaled = colorspace::scale_frame(&frame, rung.width, rung.height).context("scale_frame")?;
476 encoder.send_frame(&scaled).context("send_frame")?;
477 while let Some(pkt) = encoder.receive_packet().context("receive_packet")? {
478 muxer.add_packet(pkt).context("add_packet")?;
479 }
480 frames += 1;
481 if frames % 30 == 0 {
482 report(sink, rung_index, rung, RungStatus::Running, frames, frames_total, 0, 0);
483 }
484 }
485 encoder.flush().context("encoder flush")?;
486 while let Some(pkt) = encoder.receive_packet().context("receive_packet drain")? {
487 muxer.add_packet(pkt).context("add_packet drain")?;
488 }
489 report(sink, rung_index, rung, RungStatus::Finalizing, frames, frames_total, 0, 0);
490 let bytes = muxer.finalize().context("finalize")?.to_vec();
491 let nbytes = bytes.len() as u64;
492 report(sink, rung_index, rung, RungStatus::Completed, frames, frames_total, 0, nbytes);
493
494 Ok(RungOutput {
495 label: rung.label.clone(),
496 width: rung.width,
497 height: rung.height,
498 frames,
499 bytes: nbytes,
500 artifact: RungArtifact::File(bytes),
501 })
502}
503
504#[allow(clippy::too_many_arguments)]
509async fn run_hls(
510 input: Bytes,
511 spec: &OutputSpec,
512 segment_seconds: f32,
513 header: &DemuxHeader,
514 frame_rate: f64,
515 audio: Option<&PreparedAudio>,
516 filter_chain: Arc<codec::filter::FilterChain>,
517 output_dir: Option<&Path>,
518 sink: Arc<dyn ProgressSink>,
519) -> Result<(Vec<RungOutput>, Option<PathBuf>, Option<PathBuf>)> {
520 let root = match output_dir {
521 Some(d) => d.to_path_buf(),
522 None => tempfile::Builder::new()
523 .prefix("rivet-hls-")
524 .tempdir()
525 .context("creating HLS temp dir")?
526 .keep(),
527 };
528
529 let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
530 let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
531 let keyframe_interval = keyframe_interval_for_segment(segment_seconds as f64, frame_rate);
532 let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
533 let total_input_frames = if header.info.total_frames > 0 {
534 header.info.total_frames
535 } else {
536 (header.info.duration * frame_rate).round().max(0.0) as u64
537 };
538
539 let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy, spec.video_codec.codec());
540 let (output_color_metadata, output_pixel_format) =
541 spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
542 let params = MultiGpuParams {
543 input,
544 codec: spec.video_codec.codec(),
545 rungs: &spec.rungs,
546 header: header.clone(),
547 source_color_metadata: header.info.color_metadata,
548 source_pixel_format: header.info.pixel_format,
549 tonemap_to_sdr: spec.tonemaps(),
550 output_color_metadata,
551 output_pixel_format,
552 needs_downsample: needs_chroma_downsample(header.info.pixel_format),
553 filters: Arc::clone(&filter_chain),
554 frame_rate,
555 gpu_pool,
556 gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
557 decode_gpu: spec.decode_gpu,
558 output_root: root.clone(),
559 timescale,
560 per_frame_ticks,
561 keyframe_interval,
562 segment_target_ticks,
563 total_input_frames,
564 constant_qp: false,
566 };
567 let manifests = multigpu::run_multigpu_hls(params, Arc::clone(&sink)).await?;
568
569 let mut rung_outputs = Vec::new();
570 let mut video_specs = Vec::new();
571 for (idx, m) in manifests.into_iter().enumerate() {
572 match m {
573 Some(rm) => {
574 let dir = root.join(&rm.relative_dir);
575 let bytes = dir_size(&dir);
576 video_specs.push(build_video_variant_spec(&rm, frame_rate, bytes));
577 rung_outputs.push(RungOutput {
578 label: rm.label.clone(),
579 width: rm.width,
580 height: rm.height,
581 frames: total_input_frames,
582 bytes,
583 artifact: RungArtifact::HlsRendition {
584 dir,
585 relative_dir: rm.relative_dir,
586 },
587 });
588 }
589 None => {
590 if let Some(rung) = spec.rungs.get(idx) {
591 report_failed(sink.as_ref(), idx, rung, "rung produced no segments");
592 }
593 }
594 }
595 }
596 if rung_outputs.is_empty() {
597 bail!("all {} rung(s) failed", spec.rungs.len());
598 }
599
600 let audio_spec = match audio {
601 Some(a) => build_audio_rendition(&root, a, segment_seconds).context("building HLS audio rendition")?,
602 None => None,
603 };
604 let target_duration = segment_seconds.ceil() as u32;
605 let paths = write_hls_package(&root, &video_specs, audio_spec.as_ref(), target_duration)
606 .context("writing HLS package")?;
607
608 Ok((rung_outputs, Some(root), Some(paths.master_path)))
609}
610
611fn build_video_variant_spec(rm: &RungManifest, frame_rate: f64, bytes: u64) -> VideoVariantSpec {
612 let codec_string = cmaf_util::codec_string_from_init(&rm.manifest.init_path)
613 .unwrap_or_else(|_| "av01.0.08M.08.0.110.01.01.01.0".to_string());
614 let (_avg, peak) = cmaf_util::measure_bandwidth(&rm.manifest);
615 let bandwidth = if peak > 0 {
616 peak
617 } else {
618 let dur = rm.manifest.duration_seconds().max(0.001);
619 ((bytes as f64 * 8.0) / dur) as u32
620 };
621 VideoVariantSpec {
622 width: rm.width,
623 height: rm.height,
624 frame_rate,
625 average_bandwidth_bps: bandwidth,
626 bandwidth_bps: bandwidth,
627 codec_string,
628 supplemental_codecs: None,
629 video_range: None,
630 relative_dir: rm.relative_dir.clone(),
631 manifest: rm.manifest.clone(),
632 }
633}
634
/// Audio that has been made ready for muxing, either passed through or transcoded, or a
/// sample-less placeholder describing audio that was dropped.
#[derive(Clone)]
struct PreparedAudio {
    /// Codec description handed to the muxers.
    info: AudioInfo,
    /// Encoded samples paired with their durations.
    // NOTE(review): duration units are presumably ticks of the audio timescale — confirm.
    samples: Vec<(Vec<u8>, u32)>,
    /// Human-readable summary of what was done, e.g. `"aac passthrough"` or `"mp3 dropped"`.
    handling: String,
}

impl PreparedAudio {
    /// Returns `true` when there is at least one sample to mux (i.e. the audio was not dropped
    /// and the track was not empty).
    fn has_samples(&self) -> bool {
        !self.samples.is_empty()
    }
}
651
652fn prepare_audio(track: Option<&AudioTrack>, policy: AudioCodecPolicy) -> Result<Option<PreparedAudio>> {
653 let Some(track) = track else {
654 return Ok(None);
655 };
656 if policy == AudioCodecPolicy::Drop {
657 return Ok(None);
658 }
659 let codec = track.codec.to_ascii_lowercase();
660 let passthrough_ok = matches!(codec.as_str(), "aac" | "opus" | "ac3" | "eac3");
661 let force_opus = policy == AudioCodecPolicy::ForceOpus;
662
663 if passthrough_ok && !(force_opus && codec != "opus") {
664 let info = passthrough_info(&codec, track);
665 let samples = track
666 .samples
667 .iter()
668 .cloned()
669 .zip(track.durations.iter().copied())
670 .collect();
671 return Ok(Some(PreparedAudio {
672 info,
673 samples,
674 handling: format!("{codec} passthrough"),
675 }));
676 }
677
678 if matches!(codec.as_str(), "mp3" | "vorbis") || force_opus {
679 if track.channels > 2 {
680 tracing::warn!(codec, channels = track.channels, "multichannel audio dropped");
681 return Ok(Some(dropped(format!("{codec} ({}ch)", track.channels))));
682 }
683 if !matches!(codec.as_str(), "mp3" | "vorbis") {
684 tracing::warn!(codec, "cannot transcode to opus; dropping audio");
685 return Ok(Some(dropped(codec)));
686 }
687 let extra: Option<&[u8]> =
688 if track.codec_private.is_empty() { None } else { Some(track.codec_private.as_slice()) };
689 let mut dec = audio_decoder(&codec, extra, track.sample_rate, track.channels as u8)
690 .context("audio decoder")?;
691 let bitrate = if track.channels == 1 { 64_000 } else { 96_000 };
692 let mut enc = audio_encoder(AudioEncoderConfig {
693 codec: AudioCodec::Opus,
694 sample_rate: track.sample_rate,
695 channels: track.channels as u8,
696 bitrate,
697 })
698 .context("opus encoder")?;
699
700 let mut samples: Vec<(Vec<u8>, u32)> = Vec::new();
701 let mut pts: i64 = 0;
702 for packet in &track.samples {
703 for frame in dec.decode(packet, pts).context("audio decode")? {
704 pts = pts.saturating_add((frame.samples.len() as i64) / frame.channels.max(1) as i64);
705 for pkt in enc.encode(&frame).context("opus encode")? {
706 samples.push((pkt.data, pkt.duration as u32));
707 }
708 }
709 }
710 for frame in dec.flush().context("audio flush")? {
711 for pkt in enc.encode(&frame).context("opus encode flush")? {
712 samples.push((pkt.data, pkt.duration as u32));
713 }
714 }
715 for pkt in enc.flush().context("opus encoder flush")? {
716 samples.push((pkt.data, pkt.duration as u32));
717 }
718 let info = AudioInfo::opus(48_000, track.channels, enc.extra_data());
719 return Ok(Some(PreparedAudio {
720 info,
721 samples,
722 handling: format!("{codec} → opus"),
723 }));
724 }
725
726 Ok(Some(dropped(codec)))
727}
728
729fn dropped(codec: String) -> PreparedAudio {
730 PreparedAudio {
731 info: AudioInfo::aac_lc(48_000, 2, Vec::new()),
732 samples: Vec::new(),
733 handling: format!("{codec} dropped"),
734 }
735}
736
737fn passthrough_info(codec: &str, track: &AudioTrack) -> AudioInfo {
738 match codec {
739 "aac" => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
740 "opus" => AudioInfo::opus(track.sample_rate, track.channels, track.codec_private.clone()),
741 "ac3" => AudioInfo::ac3(track.sample_rate, track.channels, track.codec_private.clone()),
742 "eac3" => AudioInfo::eac3(track.sample_rate, track.channels, track.codec_private.clone()),
743 _ => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
744 }
745}
746
747fn build_audio_rendition(
748 asset_root: &Path,
749 audio: &PreparedAudio,
750 segment_seconds: f32,
751) -> Result<Option<AudioVariantSpec>> {
752 if !audio.has_samples() {
753 return Ok(None);
754 }
755 let audio_dir = asset_root.join("audio");
756 let seg_target_ticks = (segment_seconds as f64 * audio.info.timescale as f64).round() as u64;
757 let mut muxer = CmafAudioMuxer::new(&audio_dir, audio.info.clone()).context("CmafAudioMuxer::new")?;
758 for (payload, dur) in &audio.samples {
759 add_audio_sample_with_segment_flush(&mut muxer, payload.clone(), *dur, seg_target_ticks)?;
760 }
761 muxer.flush_segment().context("final audio flush_segment")?;
762 let manifest = muxer.finalize().context("CmafAudioMuxer finalize")?;
763
764 let codec_string = match audio.info.codec.as_str() {
765 "opus" => "opus".to_string(),
766 _ => codec::codec_strings::AAC_LC_CODEC_STRING.to_string(),
767 };
768 Ok(Some(AudioVariantSpec {
769 codec_string,
770 channels: audio.info.channels,
771 sample_rate: audio.info.sample_rate,
772 relative_dir: "audio".to_string(),
773 language: "und".to_string(),
774 name: "Audio".to_string(),
775 manifest,
776 }))
777}
778
779fn encoder_backend_override() -> Option<EncoderBackend> {
784 std::env::var("TRANSCODE_ENCODER_BACKEND")
785 .ok()
786 .and_then(|s| match s.to_ascii_lowercase().as_str() {
787 "nvenc" => Some(EncoderBackend::Nvenc),
788 "amf" => Some(EncoderBackend::Amf),
789 "qsv" => Some(EncoderBackend::Qsv),
790 _ => None,
791 })
792}
793
/// Sums the sizes of the regular files directly inside `dir`.
///
/// Subdirectories are not descended into. An unreadable directory counts as `0`, and entries
/// whose metadata cannot be read are skipped.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| entry.metadata().ok())
        .filter(|meta| meta.is_file())
        .map(|meta| meta.len())
        .sum()
}
807
808#[allow(clippy::too_many_arguments)]
809fn report(
810 sink: &dyn ProgressSink,
811 rung_index: usize,
812 rung: &Rung,
813 status: RungStatus,
814 frames_done: u64,
815 frames_total: Option<u64>,
816 segments: u32,
817 bytes_out: u64,
818) {
819 let percent = match status {
820 RungStatus::Completed => 100.0,
821 RungStatus::Pending => 0.0,
822 _ => match frames_total {
823 Some(total) if total > 0 => ((frames_done as f32 / total as f32) * 100.0).min(99.0),
824 _ => {
825 if frames_done == 0 { 1.0 } else { 50.0 }
826 }
827 },
828 };
829 sink.on_rung(RungProgress {
830 rung_index,
831 label: rung.label.clone(),
832 width: rung.width,
833 height: rung.height,
834 status,
835 percent,
836 frames_done,
837 frames_total,
838 segments_written: segments,
839 bytes_out,
840 message: None,
841 });
842}
843
844fn report_failed(sink: &dyn ProgressSink, rung_index: usize, rung: &Rung, message: &str) {
845 sink.on_rung(RungProgress {
846 rung_index,
847 label: rung.label.clone(),
848 width: rung.width,
849 height: rung.height,
850 status: RungStatus::Failed,
851 percent: 0.0,
852 frames_done: 0,
853 frames_total: None,
854 segments_written: 0,
855 bytes_out: 0,
856 message: Some(message.to_string()),
857 });
858}