Skip to main content

rivet/
transcode.rs

1//! Single-file transcode: arbitrary input → AV1 + audio MP4.
2//!
3//! Pipeline shape (no S3 / SQS / multi-variant — this is the single-shot
4//! path; for segmented CMAF-HLS or an ABR ladder, drive the `container`
5//! and `codec` crates directly):
6//!
7//! ```text
8//! input bytes → demux_streaming → header/audio extraction
9//!             → create_decoder (GPU dispatch: NVDEC / QSV)
10//!             → for each video sample: push_sample → decode_next loop
11//!                 → colorspace::convert_to_yuv420p_bt709
12//!                 → encoder.send_frame → receive_packet → muxer.add_packet
13//!             → drain decoder → flush encoder → muxer.finalize
14//!             → output bytes
15//! ```
16//!
17//! Audio is handled per source codec: AAC / Opus / AC-3 / E-AC-3 pass
18//! through verbatim; MP3 / Vorbis are transcoded to Opus; anything else is
19//! dropped (video-only output) with a warning.
20
21use std::path::Path;
22use std::time::{Duration, Instant};
23
24use anyhow::{Context, Result};
25
26use codec::audio::{
27    AudioCodec, AudioEncoderConfig, create_decoder as audio_decoder,
28    create_encoder as audio_encoder,
29};
30use codec::colorspace;
31use codec::decode;
32use codec::encode::{self, EncoderBackend, EncoderConfig};
33use container::AudioInfo;
34use container::demux::AudioTrack;
35use container::mux::Av1Mp4Muxer;
36use container::streaming;
37
38/// Outcome of a single in-memory transcode.
39#[derive(Debug, Clone)]
40pub struct TranscodeOutcome {
41    /// Lower-cased input video codec label (e.g. `"h264"`, `"hevc"`, `"av1"`).
42    pub input_codec: String,
43    /// Lower-cased input audio codec label, if the source carried audio.
44    pub input_audio_codec: Option<String>,
45    /// Source video dimensions `(width, height)` in pixels.
46    pub input_dims: (u32, u32),
47    /// Source frame rate in frames per second.
48    pub input_frame_rate: f64,
49    /// Size of the input buffer in bytes.
50    pub input_bytes: usize,
51    /// The encoded AV1/MP4 output buffer.
52    pub output_bytes: Vec<u8>,
53    /// Number of decoded video frames fed to the encoder.
54    pub frames_processed: u64,
55    /// Number of AV1 packets emitted by the encoder.
56    pub packets_emitted: u64,
57    /// How the audio track was handled.
58    pub audio_handling: AudioHandling,
59    /// Wall-clock time spent transcoding.
60    pub elapsed: Duration,
61}
62
/// Disposition of the source audio track in the transcoded output.
///
/// Every variant except [`AudioHandling::None`] carries a short codec
/// label describing the source audio.
#[derive(Debug, Clone)]
pub enum AudioHandling {
    /// The source had no audio track at all.
    None,
    /// The audio samples were copied into the output unchanged
    /// (AAC / Opus / AC-3 / E-AC-3).
    Passthrough(String),
    /// The audio was decoded and re-encoded as Opus (MP3 / Vorbis sources).
    TranscodedToOpus(String),
    /// The audio was left out of the output — unsupported codec or too
    /// many channels.
    Dropped(String),
}

impl AudioHandling {
    /// One-line, human-readable description of how the audio was handled.
    pub fn label(&self) -> String {
        match self {
            AudioHandling::None => String::from("no audio track"),
            AudioHandling::Passthrough(codec) => format!("{codec} passthrough"),
            AudioHandling::TranscodedToOpus(codec) => format!("{codec} → opus transcode"),
            AudioHandling::Dropped(codec) => format!("{codec} dropped (unsupported)"),
        }
    }
}
87
88/// Read `input`, transcode to AV1/MP4, and write the result to `output`.
89///
90/// Returns the [`TranscodeOutcome`]; `outcome.output_bytes` also holds the
91/// bytes that were written to disk.
92pub fn transcode_file(input: impl AsRef<Path>, output: impl AsRef<Path>) -> Result<TranscodeOutcome> {
93    let input = input.as_ref();
94    let output = output.as_ref();
95    let bytes = std::fs::read(input)
96        .with_context(|| format!("reading input file {}", input.display()))?;
97    let outcome = transcode_bytes(&bytes)?;
98    std::fs::write(output, &outcome.output_bytes)
99        .with_context(|| format!("writing output file {}", output.display()))?;
100    Ok(outcome)
101}
102
103/// Transcode an in-memory input buffer to an AV1/MP4 output buffer.
104///
105/// This is the primary library entry point.
106pub fn transcode_bytes(input: &[u8]) -> Result<TranscodeOutcome> {
107    let started = Instant::now();
108    let input_bytes = input.len();
109
110    let mut demuxer = streaming::demux_streaming(input).context("demux")?;
111    let header = demuxer.header().clone();
112    let codec_lower = header.codec.to_ascii_lowercase();
113    let input_dims = (header.info.width, header.info.height);
114    let input_frame_rate = header.info.frame_rate;
115
116    // GPU-only dispatch: NVDEC for NVIDIA, QSV for Intel, hard-fail otherwise.
117    let mut decoder: Box<dyn codec::decode::Decoder> =
118        decode::create_decoder(&header.codec, header.info.clone()).context("create_decoder")?;
119    tracing::debug!(codec = %header.codec, "decoder constructed");
120
121    let target_width = header.info.width;
122    let target_height = header.info.height;
123    let frame_rate = if header.info.frame_rate > 0.0 {
124        header.info.frame_rate.min(60.0)
125    } else {
126        30.0
127    };
128
129    let config = EncoderConfig {
130        width: target_width,
131        height: target_height,
132        frame_rate,
133        keyframe_interval: (frame_rate * 2.0) as u32,
134        pixel_format: header.info.pixel_format,
135        color_metadata: header.info.color_metadata,
136        ..EncoderConfig::default()
137    };
138
139    // GPU-only encoders. Dev override: set
140    // `TRANSCODE_ENCODER_BACKEND=nvenc|amf|qsv` to force a backend;
141    // otherwise the auto-select chain (NVENC → AMF → QSV) runs.
142    let backend_override = std::env::var("TRANSCODE_ENCODER_BACKEND")
143        .ok()
144        .and_then(|s| match s.to_ascii_lowercase().as_str() {
145            "nvenc" => Some(EncoderBackend::Nvenc),
146            "amf" => Some(EncoderBackend::Amf),
147            "qsv" => Some(EncoderBackend::Qsv),
148            _ => None,
149        });
150    tracing::debug!(?backend_override, "encoder backend selection");
151    let mut encoder = encode::select_encoder(config, backend_override).context("select_encoder")?;
152
153    let mut muxer =
154        Av1Mp4Muxer::new(target_width, target_height, frame_rate).context("Av1Mp4Muxer::new")?;
155    muxer.set_color_metadata(header.info.color_metadata);
156
157    let audio_track = demuxer.audio().cloned();
158    let input_audio_codec = audio_track.as_ref().map(|t| t.codec.to_ascii_lowercase());
159    let audio_handling = wire_audio(&mut muxer, audio_track.as_ref())?;
160
161    let mut frames_processed: u64 = 0;
162    let mut packets_emitted: u64 = 0;
163
164    loop {
165        match demuxer.next_video_sample().context("next_video_sample")? {
166            Some(sample) => {
167                decoder.push_sample(&sample.data).context("push_sample")?;
168                while let Some(frame) = decoder.decode_next().context("decode_next")? {
169                    pump_frame(&mut encoder, &mut muxer, frame, &mut packets_emitted)?;
170                    frames_processed += 1;
171                }
172            }
173            None => {
174                decoder.finish().context("decoder.finish")?;
175                while let Some(frame) = decoder.decode_next().context("decode_next drain")? {
176                    pump_frame(&mut encoder, &mut muxer, frame, &mut packets_emitted)?;
177                    frames_processed += 1;
178                }
179                encoder.flush().context("encoder.flush")?;
180                while let Some(pkt) = encoder.receive_packet().context("receive_packet drain")? {
181                    muxer.add_packet(pkt).context("muxer.add_packet drain")?;
182                    packets_emitted += 1;
183                }
184                break;
185            }
186        }
187    }
188
189    tracing::debug!(
190        frames_processed,
191        packets_emitted,
192        "decode loop complete"
193    );
194    let output_bytes = muxer.finalize().context("muxer.finalize")?.to_vec();
195
196    Ok(TranscodeOutcome {
197        input_codec: codec_lower,
198        input_audio_codec,
199        input_dims,
200        input_frame_rate,
201        input_bytes,
202        output_bytes,
203        frames_processed,
204        packets_emitted,
205        audio_handling,
206        elapsed: started.elapsed(),
207    })
208}
209
210fn pump_frame(
211    encoder: &mut Box<dyn encode::Encoder>,
212    muxer: &mut Av1Mp4Muxer,
213    frame: codec::frame::VideoFrame,
214    packets_out: &mut u64,
215) -> Result<()> {
216    let normalized =
217        colorspace::convert_to_yuv420p_bt709(&frame).context("colorspace conversion")?;
218    encoder
219        .send_frame(&normalized)
220        .context("encoder.send_frame")?;
221    while let Some(pkt) = encoder.receive_packet().context("receive_packet")? {
222        muxer.add_packet(pkt).context("muxer.add_packet")?;
223        *packets_out += 1;
224    }
225    Ok(())
226}
227
228fn wire_audio(muxer: &mut Av1Mp4Muxer, track: Option<&AudioTrack>) -> Result<AudioHandling> {
229    let Some(track) = track else {
230        return Ok(AudioHandling::None);
231    };
232    let codec_lower = track.codec.to_ascii_lowercase();
233
234    match codec_lower.as_str() {
235        "aac" | "opus" | "ac3" | "eac3" => {
236            let info = build_passthrough_info(&codec_lower, track);
237            if let Err(e) = muxer.with_audio(info) {
238                tracing::warn!("with_audio rejected ({e}); emitting video-only");
239                return Ok(AudioHandling::Dropped(codec_lower));
240            }
241            for (sample, dur) in track.samples.iter().zip(track.durations.iter().copied()) {
242                muxer
243                    .add_audio_sample(sample, 0, dur)
244                    .context("muxer.add_audio_sample")?;
245            }
246            Ok(AudioHandling::Passthrough(codec_lower))
247        }
248        "mp3" | "vorbis" => {
249            if track.channels > 2 {
250                return Ok(AudioHandling::Dropped(format!(
251                    "{codec_lower} ({}ch)",
252                    track.channels
253                )));
254            }
255            let extra: Option<&[u8]> = if track.codec_private.is_empty() {
256                None
257            } else {
258                Some(track.codec_private.as_slice())
259            };
260            let mut dec =
261                audio_decoder(&codec_lower, extra, track.sample_rate, track.channels as u8)
262                    .context("codec::audio::create_decoder")?;
263            let bitrate = if track.channels == 1 { 64_000 } else { 96_000 };
264            let mut enc = audio_encoder(AudioEncoderConfig {
265                codec: AudioCodec::Opus,
266                sample_rate: track.sample_rate,
267                channels: track.channels as u8,
268                bitrate,
269            })
270            .context("codec::audio::create_encoder (opus)")?;
271
272            let mut out: Vec<(Vec<u8>, u32)> = Vec::new();
273            let mut pts: i64 = 0;
274            for packet in &track.samples {
275                for frame in dec.decode(packet, pts).context("mp3/vorbis decode")? {
276                    pts = pts.saturating_add(
277                        (frame.samples.len() as i64) / frame.channels.max(1) as i64,
278                    );
279                    for pkt in enc.encode(&frame).context("opus encode")? {
280                        out.push((pkt.data, pkt.duration as u32));
281                    }
282                }
283            }
284            for frame in dec.flush().context("mp3/vorbis flush")? {
285                for pkt in enc.encode(&frame).context("opus encode (flush)")? {
286                    out.push((pkt.data, pkt.duration as u32));
287                }
288            }
289            for pkt in enc.flush().context("opus encoder flush")? {
290                out.push((pkt.data, pkt.duration as u32));
291            }
292            let info = AudioInfo {
293                codec: "opus".into(),
294                sample_rate: 48_000,
295                channels: track.channels,
296                timescale: 48_000,
297                asc_bytes: Vec::new(),
298                codec_private: enc.extra_data(),
299            };
300            if let Err(e) = muxer.with_audio(info) {
301                tracing::warn!("with_audio rejected ({e}); emitting video-only");
302                return Ok(AudioHandling::Dropped(codec_lower));
303            }
304            for (sample, dur) in out {
305                muxer
306                    .add_audio_sample(&sample, 0, dur)
307                    .context("muxer.add_audio_sample (opus)")?;
308            }
309            Ok(AudioHandling::TranscodedToOpus(codec_lower))
310        }
311        other => Ok(AudioHandling::Dropped(other.into())),
312    }
313}
314
315fn build_passthrough_info(codec_lower: &str, track: &AudioTrack) -> AudioInfo {
316    let timescale = if codec_lower == "opus" {
317        48_000
318    } else {
319        track.timescale
320    };
321    AudioInfo {
322        codec: codec_lower.into(),
323        sample_rate: track.sample_rate,
324        channels: track.channels,
325        timescale,
326        asc_bytes: if codec_lower == "aac" {
327            track.asc.clone()
328        } else {
329            Vec::new()
330        },
331        codec_private: if codec_lower == "aac" {
332            Vec::new()
333        } else {
334            track.codec_private.clone()
335        },
336    }
337}