Skip to main content

rivet/
job.rs

//! The transcode job engine.
//!
//! [`run_job`] takes an input buffer and an [`OutputSpec`] and drives the
//! whole pipeline: demux → decode once → fan out to per-rung work →
//! assemble the requested output mode. Progress is streamed through a
//! [`ProgressSink`] as a uniform [`RungProgress`] per rung.
//!
//! - **SingleFile** mode: when the encode policy spans several GPUs (and the
//!   frame count is known and the chunk seam mode is not `Serial`), the
//!   [`crate::multigpu`] engine decodes once, encodes each rung in GOP-aligned
//!   chunks across all GPUs, and this module stitches each rung's packets into
//!   one MP4. Otherwise the shared decode pump fans frames to one worker per
//!   rung that scales, encodes, and muxes a self-contained MP4.
//! - **Hls** mode: the [`crate::multigpu`] orchestrator decodes once and
//!   schedules every rung's CMAF segments across all GPUs (fair lease pool +
//!   mid-flight helper dispatch + cross-vendor codec invariant), then this
//!   module assembles the HLS package (audio rendition + playlists).
14
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use anyhow::{Context, Result, bail};
20use bytes::Bytes;
21
22use codec::audio::{
23    AudioCodec, AudioEncoderConfig, create_decoder as audio_decoder,
24    create_encoder as audio_encoder,
25};
26use codec::encode::{self, EncoderBackend, EncoderConfig};
27use codec::frame::{ColorMetadata, VideoFrame};
28use codec::colorspace;
29use container::cmaf::CmafAudioMuxer;
30use container::demux::AudioTrack;
31use container::hls::{AudioVariantSpec, VideoVariantSpec, write_hls_package};
32use container::mux::Av1Mp4Muxer;
33use container::streaming::{self, DemuxHeader};
34use container::AudioInfo;
35
36use crate::cmaf_util::{self, add_audio_sample_with_segment_flush, keyframe_interval_for_segment};
37use crate::decode_pump::{DecodePumpConfig, run_shared_decode_pump_blocking};
38use crate::multigpu::{self, MultiGpuParams, RungManifest, RungPackets};
39use crate::progress::{JobEvent, ProgressSink, RungProgress, RungStatus};
40use crate::spec::{AudioCodecPolicy, EncodePolicy, OutputMode, OutputSpec, Rung};
41use crate::validate::needs_chroma_downsample;
42
/// Capacity of each per-rung frame channel on the serial single-file path.
///
/// Kept small on purpose: once a rung's encoder falls this many frames
/// behind, the shared decode pump has to wait (backpressure), which bounds
/// the number of decoded frames buffered per rung.
const FRAME_CHANNEL_CAPACITY: usize = 8;
45
/// What a single rung wrote out.
#[derive(Debug)]
pub enum RungArtifact {
    /// The complete bytes of one self-contained MP4 file (single-file mode).
    File(Vec<u8>),
    /// One HLS rendition on disk: a directory of CMAF segments plus its
    /// media playlist.
    HlsRendition {
        /// The rendition directory (the asset root joined with `relative_dir`).
        dir: PathBuf,
        /// The rendition directory relative to the HLS asset root.
        relative_dir: String,
    },
}
57
58/// Result for one completed rung.
59#[derive(Debug)]
60pub struct RungOutput {
61    pub label: String,
62    pub width: u32,
63    pub height: u32,
64    pub frames: u64,
65    pub bytes: u64,
66    pub artifact: RungArtifact,
67}
68
69/// The full job result.
70#[derive(Debug)]
71pub struct JobOutput {
72    /// One entry per rung that completed successfully (failed rungs are
73    /// reported via the progress sink with [`RungStatus::Failed`]).
74    pub rungs: Vec<RungOutput>,
75    /// HLS mode only: the asset root directory.
76    pub hls_root: Option<PathBuf>,
77    /// HLS mode only: path to the master playlist.
78    pub master_playlist: Option<PathBuf>,
79    pub source_codec: String,
80    pub source_dims: (u32, u32),
81    pub source_frame_rate: f64,
82    /// How the audio was handled.
83    pub audio_handling: String,
84    pub elapsed: Duration,
85}
86
87/// Run a transcode job. Async — call from within a Tokio runtime.
88///
89/// For [`OutputMode::Hls`], `output_dir` is the asset root the HLS package is
90/// written under; `None` uses a fresh temp directory (returned in
91/// [`JobOutput::hls_root`]). For [`OutputMode::SingleFile`] `output_dir` is
92/// ignored (bytes are returned).
93pub async fn run_job(
94    input: Bytes,
95    spec: &OutputSpec,
96    output_dir: Option<&Path>,
97    sink: Arc<dyn ProgressSink>,
98) -> Result<JobOutput> {
99    let started = Instant::now();
100    spec.validate().context("invalid OutputSpec")?;
101
102    let (header, audio_track) = {
103        let demuxer = streaming::demux_streaming(&input).context("demux")?;
104        (demuxer.header().clone(), demuxer.audio().cloned())
105    };
106    let source_codec = header.codec.to_ascii_lowercase();
107    let source_dims = (header.info.width, header.info.height);
108    let source_frame_rate = header.info.frame_rate;
109
110    sink.on_event(JobEvent::Started { rungs: spec.rungs.len() });
111    sink.on_event(JobEvent::Probed {
112        codec: source_codec.clone(),
113        width: header.info.width,
114        height: header.info.height,
115        frame_rate: header.info.frame_rate,
116        audio_codec: audio_track.as_ref().map(|t| t.codec.to_ascii_lowercase()),
117    });
118
119    let frame_rate = {
120        let mut fr = if header.info.frame_rate > 0.0 { header.info.frame_rate } else { 30.0 };
121        if let Some(cap) = spec.max_frame_rate {
122            fr = fr.min(cap);
123        }
124        fr
125    };
126    let frames_total = if header.info.total_frames > 0 {
127        Some(header.info.total_frames)
128    } else {
129        None
130    };
131
132    let prepared_audio = prepare_audio(audio_track.as_ref(), spec.audio).context("preparing audio")?;
133    let audio_handling = prepared_audio
134        .as_ref()
135        .map(|a| a.handling.clone())
136        .unwrap_or_else(|| "none".to_string());
137
138    // Prepare the video filter chain once (loads any overlay images), then share
139    // the Arc with every decode pump / multi-GPU param built below.
140    let filter_chain = Arc::new(
141        codec::filter::FilterChain::prepare(&spec.filters).context("preparing video filters")?,
142    );
143
144    let (rungs, hls_root, master_playlist) = match &spec.mode {
145        OutputMode::SingleFile => {
146            let rungs = run_single_file(
147                input.clone(),
148                spec,
149                &header,
150                frame_rate,
151                frames_total,
152                prepared_audio.as_ref(),
153                Arc::clone(&filter_chain),
154                Arc::clone(&sink),
155            )
156            .await?;
157            (rungs, None, None)
158        }
159        OutputMode::Hls { segment_seconds } => {
160            run_hls(
161                input.clone(),
162                spec,
163                *segment_seconds,
164                &header,
165                frame_rate,
166                prepared_audio.as_ref(),
167                Arc::clone(&filter_chain),
168                output_dir,
169                Arc::clone(&sink),
170            )
171            .await?
172        }
173    };
174
175    let completed = rungs.len();
176    sink.on_event(JobEvent::Finished {
177        rungs_completed: completed,
178        rungs_failed: spec.rungs.len().saturating_sub(completed),
179    });
180
181    Ok(JobOutput {
182        rungs,
183        hls_root,
184        master_playlist,
185        source_codec,
186        source_dims,
187        source_frame_rate,
188        audio_handling,
189        elapsed: started.elapsed(),
190    })
191}
192
193/// Synchronous wrapper that builds a multi-threaded Tokio runtime.
194pub fn run_job_blocking(
195    input: &[u8],
196    spec: &OutputSpec,
197    output_dir: Option<&Path>,
198    sink: Arc<dyn ProgressSink>,
199) -> Result<JobOutput> {
200    let rt = tokio::runtime::Builder::new_multi_thread()
201        .enable_all()
202        .build()
203        .context("building Tokio runtime")?;
204    rt.block_on(run_job(Bytes::copy_from_slice(input), spec, output_dir, sink))
205}
206
207// ---------------------------------------------------------------------------
208// SingleFile: decode-once fan-out to per-rung MP4 workers
209// ---------------------------------------------------------------------------
210
211#[allow(clippy::too_many_arguments)]
212async fn run_single_file(
213    input: Bytes,
214    spec: &OutputSpec,
215    header: &DemuxHeader,
216    frame_rate: f64,
217    frames_total: Option<u64>,
218    audio: Option<&PreparedAudio>,
219    filter_chain: Arc<codec::filter::FilterChain>,
220    sink: Arc<dyn ProgressSink>,
221) -> Result<Vec<RungOutput>> {
222    // When the frame count is known and the host has more than one GPU, run the
223    // multi-GPU engine for single-file too: decode once, chunk each rung at
224    // GOP boundaries, encode the chunks across all GPUs (fair lease pool +
225    // helper dispatch + cross-vendor codec invariant), then stitch the packets,
226    // in segment order, into one MP4 per rung. On a single-GPU host (or unknown
227    // frame count) the serial path below is used unchanged — no chunk overhead.
228    let total_input_frames = if header.info.total_frames > 0 {
229        header.info.total_frames
230    } else {
231        (header.info.duration * frame_rate).round().max(0.0) as u64
232    };
233    let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy, spec.video_codec.codec());
234    if matches!(
235        spec.encode_policy,
236        EncodePolicy::AllGpus | EncodePolicy::Family(_)
237    ) && total_input_frames > 0
238        && gpu_pool.capacity() > 1
239        // `ChunkSeamMode::Serial` forces one encoder (seam-free) even on a
240        // multi-GPU host — skip the chunk-and-stitch path entirely.
241        && spec.chunk_seam_mode != crate::spec::ChunkSeamMode::Serial
242    {
243        // The chunk-and-stitch path's codec invariant now handles av1C / avcC /
244        // hvcC, so AV1, H.264, and H.265 all chunk across GPUs. Each chunk is a
245        // closed GOP (first frame an IDR), so stitched H.264/H.265 streams reset
246        // refs cleanly at every chunk boundary.
247        return run_single_file_multigpu(
248            input,
249            spec,
250            header,
251            frame_rate,
252            total_input_frames,
253            audio,
254            gpu_pool,
255            filter_chain,
256            sink,
257        )
258        .await;
259    }
260
261    // Serial path: encode on the policy's GPU (the vendor's first device for
262    // Family, the pinned index for SingleGpu, auto for AllGpus); decode follows
263    // the explicit decode_gpu override, else the same GPU as encode.
264    let encode_gpu = multigpu::serial_gpu_for_policy(spec.encode_policy);
265    let decode_gpu = spec.decode_gpu.or(encode_gpu);
266    let (output_color_metadata, output_pixel_format) =
267        spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
268    let backend_override = encoder_backend_override();
269    let base_cfg = EncoderConfig {
270        frame_rate,
271        pixel_format: output_pixel_format,
272        color_metadata: output_color_metadata,
273        gpu_index: encode_gpu,
274        codec: spec.video_codec.codec(),
275        ..EncoderConfig::default()
276    };
277    let pump_cfg = DecodePumpConfig {
278        codec_name: header.codec.clone(),
279        info_for_decoder: header.info.clone(),
280        source_color_metadata: header.info.color_metadata,
281        source_pixel_format: header.info.pixel_format,
282        needs_downsample: needs_chroma_downsample(header.info.pixel_format),
283        tonemap_to_sdr: spec.tonemaps(),
284        gpu_index: decode_gpu,
285        filters: Arc::clone(&filter_chain),
286    };
287    let rt = tokio::runtime::Handle::current();
288
289    let mut senders = Vec::with_capacity(spec.rungs.len());
290    let mut handles = Vec::with_capacity(spec.rungs.len());
291    for (idx, rung) in spec.rungs.iter().cloned().enumerate() {
292        let (tx, rx) = tokio::sync::mpsc::channel::<VideoFrame>(FRAME_CHANNEL_CAPACITY);
293        senders.push(tx);
294        let sink = Arc::clone(&sink);
295        let base_cfg = base_cfg.clone();
296        let audio = audio.cloned();
297        let handle = tokio::task::spawn_blocking(move || {
298            let r = encode_rung_single_file(
299                idx, &rung, rx, base_cfg, backend_override, frame_rate, frames_total,
300                audio.as_ref(), sink.as_ref(),
301            );
302            (idx, rung, r)
303        });
304        handles.push(handle);
305    }
306
307    let pump_handle = {
308        let input = input.clone();
309        let rt = rt.clone();
310        tokio::task::spawn_blocking(move || {
311            run_shared_decode_pump_blocking(pump_cfg, input, senders, rt)
312        })
313    };
314
315    let mut outputs = Vec::new();
316    for handle in handles {
317        let (idx, rung, r) = handle.await.context("rung worker task panicked")?;
318        match r {
319            Ok(out) => outputs.push(out),
320            Err(e) => {
321                tracing::warn!(rung = %rung.label, error = %e, "rung failed");
322                report_failed(sink.as_ref(), idx, &rung, &e.to_string());
323            }
324        }
325    }
326    let _ = pump_handle.await.context("decode pump panicked")?.context("decode pump failed")?;
327    if outputs.is_empty() {
328        bail!("all {} rung(s) failed", spec.rungs.len());
329    }
330    Ok(outputs)
331}
332
333/// Single-file via the multi-GPU engine: chunk each rung across GPUs, then
334/// stitch the packets into one MP4 per rung (no disk round-trip — packets stay
335/// in memory). Chunk length is a 2 s GOP so each chunk is an independently
336/// decodable IDR sequence; the cross-vendor codec invariant keeps every chunk's
337/// `av1C` contract identical so cross-GPU/-vendor stitching is bit-safe.
338#[allow(clippy::too_many_arguments)]
339async fn run_single_file_multigpu(
340    input: Bytes,
341    spec: &OutputSpec,
342    header: &DemuxHeader,
343    frame_rate: f64,
344    total_input_frames: u64,
345    audio: Option<&PreparedAudio>,
346    gpu_pool: Arc<crate::gpu_pool::GpuPool>,
347    filter_chain: Arc<codec::filter::FilterChain>,
348    sink: Arc<dyn ProgressSink>,
349) -> Result<Vec<RungOutput>> {
350    const CHUNK_SECONDS: f64 = 2.0;
351    let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
352    let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
353    let keyframe_interval = keyframe_interval_for_segment(CHUNK_SECONDS, frame_rate);
354    let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
355
356    let (output_color_metadata, output_pixel_format) =
357        spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
358    let params = MultiGpuParams {
359        input,
360        codec: spec.video_codec.codec(),
361        rungs: &spec.rungs,
362        header: header.clone(),
363        source_color_metadata: header.info.color_metadata,
364        source_pixel_format: header.info.pixel_format,
365        tonemap_to_sdr: spec.tonemaps(),
366        output_color_metadata,
367        output_pixel_format,
368        needs_downsample: needs_chroma_downsample(header.info.pixel_format),
369        filters: Arc::clone(&filter_chain),
370        frame_rate,
371        gpu_pool,
372        gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
373        decode_gpu: spec.decode_gpu,
374        // Chunk workers collect packets in memory; output_root is unused.
375        output_root: std::env::temp_dir(),
376        timescale,
377        per_frame_ticks,
378        keyframe_interval,
379        segment_target_ticks,
380        total_input_frames,
381        // ParallelConstQp ⇒ force constant-QP chunks so stitched seams are flat.
382        constant_qp: spec.chunk_seam_mode == crate::spec::ChunkSeamMode::ParallelConstQp,
383    };
384    let rung_packets = multigpu::run_multigpu_single_file(params, Arc::clone(&sink)).await?;
385
386    let mut outputs = Vec::new();
387    for rp in rung_packets.into_iter().flatten() {
388        let label = rp.label.clone();
389        match mux_rung_packets_to_mp4(rp, frame_rate, output_color_metadata, audio) {
390            Ok(out) => outputs.push(out),
391            Err(e) => tracing::warn!(rung = %label, error = %e, "stitching rung MP4 failed"),
392        }
393    }
394    if outputs.is_empty() {
395        bail!("multi-GPU single-file: no rung produced a stitched MP4");
396    }
397    Ok(outputs)
398}
399
400/// Stitch one rung's ordered AV1 packets (+ optional audio) into an MP4.
401fn mux_rung_packets_to_mp4(
402    rp: RungPackets,
403    frame_rate: f64,
404    color_metadata: ColorMetadata,
405    audio: Option<&PreparedAudio>,
406) -> Result<RungOutput> {
407    // Multi-GPU stitch: chunks come from independent encoders (possibly
408    // different vendors), so keep parameter sets inline per access unit
409    // (avc3/hev1 for H.264/H.265). AV1 ignores the flag (it stores OBUs verbatim).
410    let mut muxer = Av1Mp4Muxer::new_with_codec_inline(rp.width, rp.height, frame_rate, rp.codec)
411        .context("Av1Mp4Muxer::new_with_codec_inline")?;
412    muxer.set_color_metadata(color_metadata);
413    if let Some(a) = audio {
414        if let Err(e) = muxer.with_audio(a.info.clone()) {
415            tracing::warn!(rung = %rp.label, "audio rejected ({e}); video-only");
416        } else {
417            for (sample, dur) in &a.samples {
418                muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
419            }
420        }
421    }
422    let frames = rp.packets.len() as u64;
423    for pkt in rp.packets {
424        muxer.add_packet(pkt).context("add_packet")?;
425    }
426    let bytes = muxer.finalize().context("finalize")?.to_vec();
427    let nbytes = bytes.len() as u64;
428    Ok(RungOutput {
429        label: rp.label,
430        width: rp.width,
431        height: rp.height,
432        frames,
433        bytes: nbytes,
434        artifact: RungArtifact::File(bytes),
435    })
436}
437
438#[allow(clippy::too_many_arguments)]
439fn encode_rung_single_file(
440    rung_index: usize,
441    rung: &Rung,
442    mut rx: tokio::sync::mpsc::Receiver<VideoFrame>,
443    mut cfg: EncoderConfig,
444    backend: Option<EncoderBackend>,
445    frame_rate: f64,
446    frames_total: Option<u64>,
447    audio: Option<&PreparedAudio>,
448    sink: &dyn ProgressSink,
449) -> Result<RungOutput> {
450    cfg.width = rung.width;
451    cfg.height = rung.height;
452    rung.quality.apply(&mut cfg, frame_rate);
453
454    let out_color = cfg.color_metadata;
455    let out_codec = cfg.codec;
456    let mut encoder = encode::select_encoder(cfg, backend)
457        .with_context(|| format!("creating encoder for rung {}", rung.label))?;
458    let mut muxer = Av1Mp4Muxer::new_with_codec(rung.width, rung.height, frame_rate, out_codec)
459        .context("Av1Mp4Muxer::new_with_codec")?;
460    muxer.set_color_metadata(out_color);
461
462    if let Some(a) = audio {
463        if let Err(e) = muxer.with_audio(a.info.clone()) {
464            tracing::warn!(rung = %rung.label, "audio rejected ({e}); video-only");
465        } else {
466            for (sample, dur) in &a.samples {
467                muxer.add_audio_sample(sample, 0, *dur).context("add_audio_sample")?;
468            }
469        }
470    }
471
472    let mut frames: u64 = 0;
473    report(sink, rung_index, rung, RungStatus::Running, 0, frames_total, 0, 0);
474    while let Some(frame) = rx.blocking_recv() {
475        let scaled = colorspace::scale_frame(&frame, rung.width, rung.height).context("scale_frame")?;
476        encoder.send_frame(&scaled).context("send_frame")?;
477        while let Some(pkt) = encoder.receive_packet().context("receive_packet")? {
478            muxer.add_packet(pkt).context("add_packet")?;
479        }
480        frames += 1;
481        if frames % 30 == 0 {
482            report(sink, rung_index, rung, RungStatus::Running, frames, frames_total, 0, 0);
483        }
484    }
485    encoder.flush().context("encoder flush")?;
486    while let Some(pkt) = encoder.receive_packet().context("receive_packet drain")? {
487        muxer.add_packet(pkt).context("add_packet drain")?;
488    }
489    report(sink, rung_index, rung, RungStatus::Finalizing, frames, frames_total, 0, 0);
490    let bytes = muxer.finalize().context("finalize")?.to_vec();
491    let nbytes = bytes.len() as u64;
492    report(sink, rung_index, rung, RungStatus::Completed, frames, frames_total, 0, nbytes);
493
494    Ok(RungOutput {
495        label: rung.label.clone(),
496        width: rung.width,
497        height: rung.height,
498        frames,
499        bytes: nbytes,
500        artifact: RungArtifact::File(bytes),
501    })
502}
503
504// ---------------------------------------------------------------------------
505// Hls: multi-GPU orchestrator + package assembly
506// ---------------------------------------------------------------------------
507
508#[allow(clippy::too_many_arguments)]
509async fn run_hls(
510    input: Bytes,
511    spec: &OutputSpec,
512    segment_seconds: f32,
513    header: &DemuxHeader,
514    frame_rate: f64,
515    audio: Option<&PreparedAudio>,
516    filter_chain: Arc<codec::filter::FilterChain>,
517    output_dir: Option<&Path>,
518    sink: Arc<dyn ProgressSink>,
519) -> Result<(Vec<RungOutput>, Option<PathBuf>, Option<PathBuf>)> {
520    let root = match output_dir {
521        Some(d) => d.to_path_buf(),
522        None => tempfile::Builder::new()
523            .prefix("rivet-hls-")
524            .tempdir()
525            .context("creating HLS temp dir")?
526            .keep(),
527    };
528
529    let timescale = (frame_rate * 1000.0).round().max(1.0) as u32;
530    let per_frame_ticks = (timescale as f64 / frame_rate.max(1.0)).round().max(1.0) as u32;
531    let keyframe_interval = keyframe_interval_for_segment(segment_seconds as f64, frame_rate);
532    let segment_target_ticks = (keyframe_interval as u64) * (per_frame_ticks as u64);
533    let total_input_frames = if header.info.total_frames > 0 {
534        header.info.total_frames
535    } else {
536        (header.info.duration * frame_rate).round().max(0.0) as u64
537    };
538
539    let gpu_pool = multigpu::gpu_pool_for_policy(spec.encode_policy, spec.video_codec.codec());
540    let (output_color_metadata, output_pixel_format) =
541        spec.resolve_output(header.info.color_metadata, header.info.pixel_format);
542    let params = MultiGpuParams {
543        input,
544        codec: spec.video_codec.codec(),
545        rungs: &spec.rungs,
546        header: header.clone(),
547        source_color_metadata: header.info.color_metadata,
548        source_pixel_format: header.info.pixel_format,
549        tonemap_to_sdr: spec.tonemaps(),
550        output_color_metadata,
551        output_pixel_format,
552        needs_downsample: needs_chroma_downsample(header.info.pixel_format),
553        filters: Arc::clone(&filter_chain),
554        frame_rate,
555        gpu_pool,
556        gpu_indices: multigpu::policy_gpu_indices(spec.encode_policy),
557        decode_gpu: spec.decode_gpu,
558        output_root: root.clone(),
559        timescale,
560        per_frame_ticks,
561        keyframe_interval,
562        segment_target_ticks,
563        total_input_frames,
564        // HLS segments are independent files — no stitched seams to flatten.
565        constant_qp: false,
566    };
567    let manifests = multigpu::run_multigpu_hls(params, Arc::clone(&sink)).await?;
568
569    let mut rung_outputs = Vec::new();
570    let mut video_specs = Vec::new();
571    for (idx, m) in manifests.into_iter().enumerate() {
572        match m {
573            Some(rm) => {
574                let dir = root.join(&rm.relative_dir);
575                let bytes = dir_size(&dir);
576                video_specs.push(build_video_variant_spec(&rm, frame_rate, bytes));
577                rung_outputs.push(RungOutput {
578                    label: rm.label.clone(),
579                    width: rm.width,
580                    height: rm.height,
581                    frames: total_input_frames,
582                    bytes,
583                    artifact: RungArtifact::HlsRendition {
584                        dir,
585                        relative_dir: rm.relative_dir,
586                    },
587                });
588            }
589            None => {
590                if let Some(rung) = spec.rungs.get(idx) {
591                    report_failed(sink.as_ref(), idx, rung, "rung produced no segments");
592                }
593            }
594        }
595    }
596    if rung_outputs.is_empty() {
597        bail!("all {} rung(s) failed", spec.rungs.len());
598    }
599
600    let audio_spec = match audio {
601        Some(a) => build_audio_rendition(&root, a, segment_seconds).context("building HLS audio rendition")?,
602        None => None,
603    };
604    let target_duration = segment_seconds.ceil() as u32;
605    let paths = write_hls_package(&root, &video_specs, audio_spec.as_ref(), target_duration)
606        .context("writing HLS package")?;
607
608    Ok((rung_outputs, Some(root), Some(paths.master_path)))
609}
610
611fn build_video_variant_spec(rm: &RungManifest, frame_rate: f64, bytes: u64) -> VideoVariantSpec {
612    let codec_string = cmaf_util::codec_string_from_init(&rm.manifest.init_path)
613        .unwrap_or_else(|_| "av01.0.08M.08.0.110.01.01.01.0".to_string());
614    let (_avg, peak) = cmaf_util::measure_bandwidth(&rm.manifest);
615    let bandwidth = if peak > 0 {
616        peak
617    } else {
618        let dur = rm.manifest.duration_seconds().max(0.001);
619        ((bytes as f64 * 8.0) / dur) as u32
620    };
621    VideoVariantSpec {
622        width: rm.width,
623        height: rm.height,
624        frame_rate,
625        average_bandwidth_bps: bandwidth,
626        bandwidth_bps: bandwidth,
627        codec_string,
628        supplemental_codecs: None,
629        video_range: None,
630        relative_dir: rm.relative_dir.clone(),
631        manifest: rm.manifest.clone(),
632    }
633}
634
635// ---------------------------------------------------------------------------
636// Audio
637// ---------------------------------------------------------------------------
638
639#[derive(Clone)]
640struct PreparedAudio {
641    info: AudioInfo,
642    samples: Vec<(Vec<u8>, u32)>,
643    handling: String,
644}
645
646impl PreparedAudio {
647    fn has_samples(&self) -> bool {
648        !self.samples.is_empty()
649    }
650}
651
652fn prepare_audio(track: Option<&AudioTrack>, policy: AudioCodecPolicy) -> Result<Option<PreparedAudio>> {
653    let Some(track) = track else {
654        return Ok(None);
655    };
656    if policy == AudioCodecPolicy::Drop {
657        return Ok(None);
658    }
659    let codec = track.codec.to_ascii_lowercase();
660    let passthrough_ok = matches!(codec.as_str(), "aac" | "opus" | "ac3" | "eac3");
661    let force_opus = policy == AudioCodecPolicy::ForceOpus;
662
663    if passthrough_ok && !(force_opus && codec != "opus") {
664        let info = passthrough_info(&codec, track);
665        let samples = track
666            .samples
667            .iter()
668            .cloned()
669            .zip(track.durations.iter().copied())
670            .collect();
671        return Ok(Some(PreparedAudio {
672            info,
673            samples,
674            handling: format!("{codec} passthrough"),
675        }));
676    }
677
678    if matches!(codec.as_str(), "mp3" | "vorbis") || force_opus {
679        if track.channels > 2 {
680            tracing::warn!(codec, channels = track.channels, "multichannel audio dropped");
681            return Ok(Some(dropped(format!("{codec} ({}ch)", track.channels))));
682        }
683        if !matches!(codec.as_str(), "mp3" | "vorbis") {
684            tracing::warn!(codec, "cannot transcode to opus; dropping audio");
685            return Ok(Some(dropped(codec)));
686        }
687        let extra: Option<&[u8]> =
688            if track.codec_private.is_empty() { None } else { Some(track.codec_private.as_slice()) };
689        let mut dec = audio_decoder(&codec, extra, track.sample_rate, track.channels as u8)
690            .context("audio decoder")?;
691        let bitrate = if track.channels == 1 { 64_000 } else { 96_000 };
692        let mut enc = audio_encoder(AudioEncoderConfig {
693            codec: AudioCodec::Opus,
694            sample_rate: track.sample_rate,
695            channels: track.channels as u8,
696            bitrate,
697        })
698        .context("opus encoder")?;
699
700        let mut samples: Vec<(Vec<u8>, u32)> = Vec::new();
701        let mut pts: i64 = 0;
702        for packet in &track.samples {
703            for frame in dec.decode(packet, pts).context("audio decode")? {
704                pts = pts.saturating_add((frame.samples.len() as i64) / frame.channels.max(1) as i64);
705                for pkt in enc.encode(&frame).context("opus encode")? {
706                    samples.push((pkt.data, pkt.duration as u32));
707                }
708            }
709        }
710        for frame in dec.flush().context("audio flush")? {
711            for pkt in enc.encode(&frame).context("opus encode flush")? {
712                samples.push((pkt.data, pkt.duration as u32));
713            }
714        }
715        for pkt in enc.flush().context("opus encoder flush")? {
716            samples.push((pkt.data, pkt.duration as u32));
717        }
718        let info = AudioInfo::opus(48_000, track.channels, enc.extra_data());
719        return Ok(Some(PreparedAudio {
720            info,
721            samples,
722            handling: format!("{codec} → opus"),
723        }));
724    }
725
726    Ok(Some(dropped(codec)))
727}
728
729fn dropped(codec: String) -> PreparedAudio {
730    PreparedAudio {
731        info: AudioInfo::aac_lc(48_000, 2, Vec::new()),
732        samples: Vec::new(),
733        handling: format!("{codec} dropped"),
734    }
735}
736
737fn passthrough_info(codec: &str, track: &AudioTrack) -> AudioInfo {
738    match codec {
739        "aac" => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
740        "opus" => AudioInfo::opus(track.sample_rate, track.channels, track.codec_private.clone()),
741        "ac3" => AudioInfo::ac3(track.sample_rate, track.channels, track.codec_private.clone()),
742        "eac3" => AudioInfo::eac3(track.sample_rate, track.channels, track.codec_private.clone()),
743        _ => AudioInfo::aac_lc(track.sample_rate, track.channels, track.asc.clone()),
744    }
745}
746
747fn build_audio_rendition(
748    asset_root: &Path,
749    audio: &PreparedAudio,
750    segment_seconds: f32,
751) -> Result<Option<AudioVariantSpec>> {
752    if !audio.has_samples() {
753        return Ok(None);
754    }
755    let audio_dir = asset_root.join("audio");
756    let seg_target_ticks = (segment_seconds as f64 * audio.info.timescale as f64).round() as u64;
757    let mut muxer = CmafAudioMuxer::new(&audio_dir, audio.info.clone()).context("CmafAudioMuxer::new")?;
758    for (payload, dur) in &audio.samples {
759        add_audio_sample_with_segment_flush(&mut muxer, payload.clone(), *dur, seg_target_ticks)?;
760    }
761    muxer.flush_segment().context("final audio flush_segment")?;
762    let manifest = muxer.finalize().context("CmafAudioMuxer finalize")?;
763
764    let codec_string = match audio.info.codec.as_str() {
765        "opus" => "opus".to_string(),
766        _ => codec::codec_strings::AAC_LC_CODEC_STRING.to_string(),
767    };
768    Ok(Some(AudioVariantSpec {
769        codec_string,
770        channels: audio.info.channels,
771        sample_rate: audio.info.sample_rate,
772        relative_dir: "audio".to_string(),
773        language: "und".to_string(),
774        name: "Audio".to_string(),
775        manifest,
776    }))
777}
778
779// ---------------------------------------------------------------------------
780// Misc helpers
781// ---------------------------------------------------------------------------
782
783fn encoder_backend_override() -> Option<EncoderBackend> {
784    std::env::var("TRANSCODE_ENCODER_BACKEND")
785        .ok()
786        .and_then(|s| match s.to_ascii_lowercase().as_str() {
787            "nvenc" => Some(EncoderBackend::Nvenc),
788            "amf" => Some(EncoderBackend::Amf),
789            "qsv" => Some(EncoderBackend::Qsv),
790            _ => None,
791        })
792}
793
/// Total size in bytes of the regular files directly inside `dir`.
///
/// Not recursive: subdirectories are skipped. Best-effort by design:
/// - an unreadable or missing directory counts as 0;
/// - entries whose metadata cannot be read are ignored.
fn dir_size(dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(dir) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| entry.metadata().ok())
        .filter(|meta| meta.is_file())
        .map(|meta| meta.len())
        .sum()
}
807
808#[allow(clippy::too_many_arguments)]
809fn report(
810    sink: &dyn ProgressSink,
811    rung_index: usize,
812    rung: &Rung,
813    status: RungStatus,
814    frames_done: u64,
815    frames_total: Option<u64>,
816    segments: u32,
817    bytes_out: u64,
818) {
819    let percent = match status {
820        RungStatus::Completed => 100.0,
821        RungStatus::Pending => 0.0,
822        _ => match frames_total {
823            Some(total) if total > 0 => ((frames_done as f32 / total as f32) * 100.0).min(99.0),
824            _ => {
825                if frames_done == 0 { 1.0 } else { 50.0 }
826            }
827        },
828    };
829    sink.on_rung(RungProgress {
830        rung_index,
831        label: rung.label.clone(),
832        width: rung.width,
833        height: rung.height,
834        status,
835        percent,
836        frames_done,
837        frames_total,
838        segments_written: segments,
839        bytes_out,
840        message: None,
841    });
842}
843
844fn report_failed(sink: &dyn ProgressSink, rung_index: usize, rung: &Rung, message: &str) {
845    sink.on_rung(RungProgress {
846        rung_index,
847        label: rung.label.clone(),
848        width: rung.width,
849        height: rung.height,
850        status: RungStatus::Failed,
851        percent: 0.0,
852        frames_done: 0,
853        frames_total: None,
854        segments_written: 0,
855        bytes_out: 0,
856        message: Some(message.to_string()),
857    });
858}