Skip to main content

rivet/
encoder_worker.rs

//! Per-segment encoder worker: pop a chunk → encode K frames →
//! emit one CMAF segment file → repeat.
//!
//! v2 multi-GPU model (2026-05-11): each worker owns one GPU lease
//! and one `EncoderConfig` for its lifetime, but builds a fresh
//! encoder and a fresh `CmafVideoMuxer` per claimed segment. The
//! muxer is configured with the segment's index + base decode time
//! so the on-disk filename + tfdt match what a single-encoder
//! pipeline would produce. Helpers attaching mid-flight just start
//! popping from the queue's current head; no decode-and-discard.
//!
//! Workers exit when `queue.pop()` returns `None` (pump closed +
//! queue drained), or after requeuing a chunk because their first
//! packet disagreed with the rung's codec invariant. The returned
//! `WorkerOutput` lists every segment the worker wrote so the
//! orchestrator can merge contributions into the per-rung manifest.
16
17use anyhow::{Context, Result, anyhow};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::RwLock;
21
22use codec::encode::{self, EncoderConfig};
23use codec::frame::{ColorMetadata, PixelFormat};
24use codec::pixel_format::{Av1SequenceHeader, parse_av1_sequence_header};
25use container::cmaf::{CmafVideoMuxer, CmafVideoMuxerOptions, SegmentInfo};
26use tokio::sync::mpsc;
27
28use crate::cmaf_util::add_packet_with_segment_flush;
29use crate::frame_queue::{SegmentChunk, SegmentChunkQueue};
30
/// The set of AV1 sequence-header fields that all encoders feeding
/// segments into one rendition are required to agree on.
///
/// Rationale for this particular selection: every field below belongs
/// to the codec-init contract a player establishes once from `av1C`
/// and then assumes for all segments. Each segment's IDR carries an
/// inline OBU sequence header that the decoder re-parses; when any of
/// these values differs from the av1C in `init.mp4`, strict decoders
/// (dav1d in conformance mode, Safari AVFoundation, hls.js+libdav1d)
/// reject the segment. Optional fields that are NOT listed (timing
/// info presence, decoder model presence, film grain `present` flag,
/// operating-point details) are tolerated by every major player, so
/// they are intentionally left unchecked: this lets NVENC + QSV +
/// AMF + rav1e share a rendition without cosmetic byte differences
/// causing false rejections.
///
/// The first worker on a rung SETS the invariant. Every later worker
/// (a helper from any vendor) COMPARES against it; a mismatch is
/// reported loudly rather than silently corrupting the output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RungCodecInvariant {
    /// `seq_profile` from the sequence header.
    pub seq_profile: u8,
    /// `seq_level_idx` of operating point 0.
    pub seq_level_idx_0: u8,
    /// `seq_tier` of operating point 0.
    pub seq_tier_0: u8,
    /// Sample bit depth signalled by the header.
    pub bit_depth: u8,
    /// Monochrome flag.
    pub monochrome: bool,
    /// Horizontal chroma subsampling flag.
    pub chroma_subsampling_x: bool,
    /// Vertical chroma subsampling flag.
    pub chroma_subsampling_y: bool,
    /// Colour primaries code point.
    pub color_primaries: u8,
    /// Transfer characteristics code point.
    pub transfer_characteristics: u8,
    /// Matrix coefficients code point.
    pub matrix_coefficients: u8,
    /// Colour range flag.
    pub color_range: bool,
    /// Maximum frame width, minus one.
    pub max_frame_width_minus1: u32,
    /// Maximum frame height, minus one.
    pub max_frame_height_minus1: u32,
    /// Still-picture flag.
    pub still_picture: bool,
}
67
68impl RungCodecInvariant {
69    pub fn from_sequence_header(sh: &Av1SequenceHeader) -> Self {
70        Self {
71            seq_profile: sh.seq_profile,
72            seq_level_idx_0: sh.seq_level_idx_0,
73            seq_tier_0: sh.seq_tier_0,
74            bit_depth: sh.bit_depth,
75            monochrome: sh.monochrome,
76            chroma_subsampling_x: sh.chroma_subsampling_x,
77            chroma_subsampling_y: sh.chroma_subsampling_y,
78            color_primaries: sh.color_primaries,
79            transfer_characteristics: sh.transfer_characteristics,
80            matrix_coefficients: sh.matrix_coefficients,
81            color_range: sh.color_range,
82            max_frame_width_minus1: sh.max_frame_width_minus1,
83            max_frame_height_minus1: sh.max_frame_height_minus1,
84            still_picture: sh.still_picture,
85        }
86    }
87
88    /// Human-readable diff for error messages.
89    fn describe_diff(&self, other: &Self) -> String {
90        let mut diffs = Vec::new();
91        macro_rules! diff_field {
92            ($field:ident) => {
93                if self.$field != other.$field {
94                    diffs.push(format!(
95                        "{}: rung={:?}, this worker={:?}",
96                        stringify!($field),
97                        self.$field,
98                        other.$field
99                    ));
100                }
101            };
102        }
103        diff_field!(seq_profile);
104        diff_field!(seq_level_idx_0);
105        diff_field!(seq_tier_0);
106        diff_field!(bit_depth);
107        diff_field!(monochrome);
108        diff_field!(chroma_subsampling_x);
109        diff_field!(chroma_subsampling_y);
110        diff_field!(color_primaries);
111        diff_field!(transfer_characteristics);
112        diff_field!(matrix_coefficients);
113        diff_field!(color_range);
114        diff_field!(max_frame_width_minus1);
115        diff_field!(max_frame_height_minus1);
116        diff_field!(still_picture);
117        diffs.join("; ")
118    }
119}
120
/// Result of checking a worker's first packet against the rung's
/// codec invariant. The per-chunk encode functions
/// (`encode_one_segment`, `encode_chunk_to_packets`) branch on this
/// to decide whether to keep encoding or to soft-fail by handing the
/// chunk back for requeue. A hard failure (sequence header could not
/// be parsed) is not a variant here — the validator reports it as
/// `Err` instead.
#[derive(Debug)]
pub enum InvariantCheck {
    /// This worker was the first on the rung; the invariant is now
    /// recorded from its packet.
    SetByThisWorker,
    /// The packet agrees with the recorded invariant. Proceed to
    /// publish.
    Matched,
    /// One or more mandatory fields disagree. The worker should
    /// requeue its chunk and exit cleanly; the rung carries on with
    /// workers whose vendors agree with the invariant set by the
    /// first worker. **Mission-critical jobs DO NOT abort on this** —
    /// only this helper's contribution is lost, and another worker
    /// picks the chunk up.
    Mismatched {
        /// Human-readable list of the differing fields.
        diff: String,
    },
}
139
140/// Parse a worker's first packet, derive the codec invariant, and
141/// compare-or-set it against the per-rung slot. Returns
142/// [`InvariantCheck`] on a successful parse; an `Err` only on
143/// malformed bitstream (the encoder failed to emit an
144/// `OBU_SEQUENCE_HEADER` at all, which is a configuration bug that
145/// nothing downstream can recover from).
146pub fn validate_or_set_rung_invariant(
147    rung_idx: usize,
148    gpu_vendor: Option<codec::gpu::GpuVendor>,
149    slot: &RwLock<Option<RungCodecInvariant>>,
150    first_packet: &[u8],
151) -> Result<InvariantCheck> {
152    let parsed = parse_av1_sequence_header(first_packet).ok_or_else(|| {
153        anyhow!(
154            "rung {} (vendor {:?}): could not parse AV1 sequence header from first encoded packet; \
155             encoder did not emit OBU_SEQUENCE_HEADER as required for CMAF segment alignment",
156            rung_idx,
157            gpu_vendor,
158        )
159    })?;
160    let observed = RungCodecInvariant::from_sequence_header(&parsed);
161
162    // Fast path: read lock, check if set + matches.
163    if let Some(existing) = &*slot.read().unwrap() {
164        if existing == &observed {
165            return Ok(InvariantCheck::Matched);
166        }
167        return Ok(InvariantCheck::Mismatched {
168            diff: existing.describe_diff(&observed),
169        });
170    }
171    // First worker — write under write-lock with double-check (race
172    // against another worker setting the slot between read and write).
173    let mut w = slot.write().unwrap();
174    match &*w {
175        Some(existing) if existing != &observed => Ok(InvariantCheck::Mismatched {
176            diff: existing.describe_diff(&observed),
177        }),
178        Some(_) => Ok(InvariantCheck::Matched),
179        None => {
180            tracing::info!(
181                rung_idx,
182                gpu_vendor = ?gpu_vendor,
183                seq_profile = observed.seq_profile,
184                seq_level_idx_0 = observed.seq_level_idx_0,
185                bit_depth = observed.bit_depth,
186                "rung codec invariant captured from first worker"
187            );
188            *w = Some(observed);
189            Ok(InvariantCheck::SetByThisWorker)
190        }
191    }
192}
193
194#[derive(Clone)]
195pub struct EncoderWorkerConfig {
196    pub rung_idx: usize,
197    pub width: u32,
198    pub height: u32,
199    pub frame_rate: f64,
200    /// Legacy CRF escape hatch (`u8::MAX` = derive from `target`).
201    pub quality: u8,
202    /// Speed preset escape hatch (`u8::MAX` = derive from `tier`).
203    pub speed_preset: u8,
204    /// Perceptual quality target (used when `quality` is the sentinel).
205    pub target: codec::encode::tuning::QualityTarget,
206    /// Speed tier (used when `speed_preset` is the sentinel).
207    pub tier: codec::encode::tuning::SpeedTier,
208    pub threads: usize,
209    pub gpu_index: Option<u32>,
210    pub gpu_vendor: Option<codec::gpu::GpuVendor>,
211    /// Resolved **output** color metadata + pixel format (the encoder's input
212    /// format and bitstream signaling). The engine computes these from the
213    /// `OutputSpec`'s `ColorPolicy` / `BitDepth` via `resolve_output`, so the
214    /// worker no longer folds HDR→SDR itself — it just encodes to this format.
215    pub output_color_metadata: ColorMetadata,
216    pub output_pixel_format: PixelFormat,
217    /// Prefer constant-QP rate control (seam-flat chunked single-file under
218    /// `ChunkSeamMode::ParallelConstQp`). Forwarded to `EncoderConfig.constant_qp`.
219    pub constant_qp: bool,
220    pub timescale: u32,
221    pub per_frame_ticks: u32,
222    pub keyframe_interval: u32,
223    pub segment_target_ticks: u64,
224    pub output_dir: PathBuf,
225    /// Shared per-rung codec invariant slot. First worker on the rung
226    /// SETS it; helpers (any vendor) COMPARE on their first packet.
227    /// On mismatch the helper requeues its chunk and exits cleanly so
228    /// the run continues without it — never aborts mission-critical
229    /// jobs. See `validate_or_set_rung_invariant` + the requeue path
230    /// in `run_encoder_worker_blocking`.
231    pub rung_invariant: Arc<RwLock<Option<RungCodecInvariant>>>,
232}
233
234#[derive(Debug, Clone)]
235pub struct WorkerOutput {
236    pub gpu_index: Option<u32>,
237    pub segments: Vec<SegmentInfo>,
238}
239
240/// Run the encoder loop until the chunk queue is closed and drained.
241/// Designed to be wrapped in `tokio::task::spawn_blocking`.
242///
243/// `progress_tx` receives the shared cumulative `frames_encoded_total`
244/// after every encoded frame; the caller's drain task fires wire
245/// events from this stream. Multiple workers bump the same counter,
246/// so the progress reading stays monotonic across worker handoffs.
247#[allow(clippy::too_many_arguments)]
248pub fn run_encoder_worker_blocking(
249    cfg: EncoderWorkerConfig,
250    queue: Arc<SegmentChunkQueue>,
251    rt: tokio::runtime::Handle,
252    shared_frames_encoded: Arc<std::sync::atomic::AtomicU64>,
253    progress_tx: mpsc::Sender<u64>,
254) -> Result<WorkerOutput> {
255    let enc_config = build_enc_config(&cfg);
256    let encoder_color_metadata = cfg.output_color_metadata;
257
258    let mut segments_written: Vec<SegmentInfo> = Vec::new();
259    let mut init_segment_written = false;
260
261    tracing::debug!(rung_idx = cfg.rung_idx, gpu_index = ?cfg.gpu_index, "encoder worker started; awaiting first chunk");
262    loop {
263        let chunk = match rt.block_on(queue.pop()) {
264            Some(c) => c,
265            None => break,
266        };
267        tracing::debug!(rung_idx = cfg.rung_idx, segment = chunk.segment_idx, frames = chunk.frames.len(), "encoder worker popped chunk");
268        match encode_one_segment(
269            &cfg,
270            &enc_config,
271            encoder_color_metadata,
272            chunk,
273            &mut init_segment_written,
274            &shared_frames_encoded,
275            &progress_tx,
276        )? {
277            SegmentOutcome::Wrote {
278                info,
279                segment_idx,
280                frames,
281            } => {
282                let role = if segment_idx == 0 {
283                    "primary"
284                } else {
285                    "worker"
286                };
287                tracing::info!(
288                    rung_idx = cfg.rung_idx,
289                    gpu_index = ?cfg.gpu_index,
290                    role,
291                    segment = segment_idx,
292                    frames_encoded = frames,
293                    "rung segment flushed",
294                );
295                segments_written.push(info);
296            }
297            SegmentOutcome::RequeuedOnMismatch {
298                chunk: rejected,
299                diff,
300            } => {
301                // Helper from a vendor whose AV1 sequence header diverges
302                // from the rung's invariant on mandatory fields. Put the
303                // chunk back at the head of the queue so a matching-vendor
304                // worker (always at least the initial worker) picks it up.
305                // Exit clean — the run completes without this helper.
306                tracing::warn!(
307                    rung_idx = cfg.rung_idx,
308                    gpu_index = ?cfg.gpu_index,
309                    gpu_vendor = ?cfg.gpu_vendor,
310                    rejected_segment = rejected.segment_idx,
311                    diff = %diff,
312                    "encoder worker: codec invariant mismatch on first packet — \
313                     requeuing chunk for a matching-vendor worker and exiting",
314                );
315                let _ = queue.push_front(rejected);
316                break;
317            }
318        }
319    }
320
321    Ok(WorkerOutput {
322        gpu_index: cfg.gpu_index,
323        segments: segments_written,
324    })
325}
326
327/// Outcome of an `encode_one_segment` call. `Wrote` is the happy
328/// path; `RequeuedOnMismatch` returns the chunk verbatim so the outer
329/// loop can put it back at the head of the queue for another worker.
330enum SegmentOutcome {
331    Wrote {
332        info: SegmentInfo,
333        segment_idx: usize,
334        frames: usize,
335    },
336    RequeuedOnMismatch {
337        chunk: SegmentChunk,
338        diff: String,
339    },
340}
341
342fn encode_one_segment(
343    cfg: &EncoderWorkerConfig,
344    enc_config: &EncoderConfig,
345    encoder_color_metadata: ColorMetadata,
346    chunk: SegmentChunk,
347    init_segment_written: &mut bool,
348    shared_frames_encoded: &std::sync::atomic::AtomicU64,
349    progress_tx: &mpsc::Sender<u64>,
350) -> Result<SegmentOutcome> {
351    let write_init = chunk.segment_idx == 0 && !*init_segment_written;
352    let muxer_options = CmafVideoMuxerOptions {
353        first_segment_index: (chunk.segment_idx as u32) + 1,
354        first_segment_base_decode_time: chunk.segment_idx as u64 * cfg.segment_target_ticks,
355        write_init_segment: write_init,
356    };
357    let mut muxer = CmafVideoMuxer::new_with_options(
358        &cfg.output_dir,
359        cfg.width,
360        cfg.height,
361        cfg.timescale,
362        encoder_color_metadata,
363        muxer_options,
364    )
365    .with_context(|| {
366        format!(
367            "creating CmafVideoMuxer for segment {} in {}",
368            chunk.segment_idx,
369            cfg.output_dir.display()
370        )
371    })?;
372
373    let mut encoder =
374        encode::select_encoder(enc_config.clone(), None).context("creating encoder for segment")?;
375
376    // Buffered packets emitted from the encoder, awaiting either
377    // commit-to-muxer (after invariant validation passes) or discard
378    // (on mismatch). The first packet's bytes are the AV1 sequence
379    // header OBU that we feed to the invariant validator.
380    let mut pending_packets: Vec<codec::encode::EncodedPacket> = Vec::new();
381    let mut first_packet_decision: Option<bool> = None; // None=undecided, Some(true)=commit, Some(false)=reject
382
383    let segment_idx = chunk.segment_idx;
384    let frame_count = chunk.frames.len();
385
386    for frame in &chunk.frames {
387        encoder
388            .send_frame(frame)
389            .context("encoder.send_frame in worker")?;
390        while let Some(packet) = encoder
391            .receive_packet()
392            .context("encoder.receive_packet in worker")?
393        {
394            if first_packet_decision.is_none() {
395                match validate_or_set_rung_invariant(
396                    cfg.rung_idx,
397                    cfg.gpu_vendor,
398                    &cfg.rung_invariant,
399                    &packet.data,
400                )? {
401                    InvariantCheck::Matched | InvariantCheck::SetByThisWorker => {
402                        first_packet_decision = Some(true);
403                    }
404                    InvariantCheck::Mismatched { diff } => {
405                        // Discard everything in flight. The muxer hasn't
406                        // flushed any segment yet (first packet of a
407                        // chunk is far below the segment-duration target),
408                        // and init.mp4 is only written by finalize() —
409                        // which we don't call. Drop muxer + encoder
410                        // implicitly when we return.
411                        return Ok(SegmentOutcome::RequeuedOnMismatch { chunk, diff });
412                    }
413                }
414                pending_packets.push(packet);
415                continue;
416            }
417            // first_packet_decision == Some(true): commit
418            // First drain any buffered packets we held back during
419            // validation.
420            if !pending_packets.is_empty() {
421                for held in pending_packets.drain(..) {
422                    add_packet_with_segment_flush(
423                        &mut muxer,
424                        &held,
425                        cfg.per_frame_ticks,
426                        cfg.segment_target_ticks,
427                    )
428                    .context("CMAF segment-flush add (held)")?;
429                }
430            }
431            add_packet_with_segment_flush(
432                &mut muxer,
433                &packet,
434                cfg.per_frame_ticks,
435                cfg.segment_target_ticks,
436            )
437            .context("CMAF segment-flush add (worker)")?;
438        }
439        let n = shared_frames_encoded.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1;
440        let _ = progress_tx.try_send(n);
441    }
442
443    // Drain remaining held packets (e.g. if the only packets emitted
444    // were buffered during the single validation step).
445    if first_packet_decision == Some(true) && !pending_packets.is_empty() {
446        for held in pending_packets.drain(..) {
447            add_packet_with_segment_flush(
448                &mut muxer,
449                &held,
450                cfg.per_frame_ticks,
451                cfg.segment_target_ticks,
452            )
453            .context("CMAF segment-flush add (final-held)")?;
454        }
455    }
456
457    encoder.flush().context("encoder.flush in worker")?;
458    while let Some(packet) = encoder
459        .receive_packet()
460        .context("encoder.receive_packet after flush")?
461    {
462        add_packet_with_segment_flush(
463            &mut muxer,
464            &packet,
465            cfg.per_frame_ticks,
466            cfg.segment_target_ticks,
467        )
468        .context("CMAF segment-flush add post-flush (worker)")?;
469    }
470
471    let manifest = muxer
472        .finalize()
473        .context("finalize CmafVideoMuxer (per-segment worker)")?;
474
475    if write_init {
476        *init_segment_written = true;
477    }
478
479    let info = manifest
480        .segments
481        .last()
482        .ok_or_else(|| {
483            anyhow::anyhow!(
484                "encoder worker produced no segment for chunk idx {} (rung {}, gpu {:?}); \
485                 frames in chunk = {}",
486                segment_idx,
487                cfg.rung_idx,
488                cfg.gpu_index,
489                frame_count,
490            )
491        })?
492        .clone();
493    Ok(SegmentOutcome::Wrote {
494        info,
495        segment_idx,
496        frames: frame_count,
497    })
498}
499
500// ---------------------------------------------------------------------------
501// Single-file chunked encode: workers collect packets (instead of writing CMAF
502// segments) so the orchestrator can stitch them, in segment order, into one MP4.
503// ---------------------------------------------------------------------------
504
505/// One chunk's encoded packets, in encode (= display, no B-frames) order.
506#[derive(Debug)]
507pub struct ChunkPackets {
508    pub segment_idx: usize,
509    pub packets: Vec<encode::EncodedPacket>,
510}
511
512/// Build the per-rung `EncoderConfig` from the resolved output format + quality
513/// knobs. Shared by the CMAF and packet workers.
514fn build_enc_config(cfg: &EncoderWorkerConfig) -> EncoderConfig {
515    EncoderConfig {
516        width: cfg.width,
517        height: cfg.height,
518        frame_rate: cfg.frame_rate,
519        quality: cfg.quality,
520        speed_preset: cfg.speed_preset,
521        keyframe_interval: cfg.keyframe_interval,
522        threads: cfg.threads,
523        pixel_format: cfg.output_pixel_format,
524        color_metadata: cfg.output_color_metadata,
525        gpu_index: cfg.gpu_index,
526        gpu_vendor: cfg.gpu_vendor,
527        target: cfg.target,
528        tier: cfg.tier,
529        constant_qp: cfg.constant_qp,
530        ..EncoderConfig::default()
531    }
532}
533
534/// Encoder worker that COLLECTS packets per chunk (single-file path). Each
535/// chunk is encoded by a fresh encoder (first frame an IDR); the cross-vendor
536/// codec invariant is enforced on the first packet (mismatch → requeue + exit,
537/// exactly like the CMAF worker). Ordered `ChunkPackets` are pushed to `out`.
538#[allow(clippy::too_many_arguments)]
539pub fn run_chunk_encoder_worker_blocking(
540    cfg: EncoderWorkerConfig,
541    queue: Arc<SegmentChunkQueue>,
542    rt: tokio::runtime::Handle,
543    shared_frames_encoded: Arc<std::sync::atomic::AtomicU64>,
544    progress_tx: mpsc::Sender<u64>,
545    out: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
546) -> Result<()> {
547    let enc_config = build_enc_config(&cfg);
548    loop {
549        let chunk = match rt.block_on(queue.pop()) {
550            Some(c) => c,
551            None => break,
552        };
553        match encode_chunk_to_packets(&cfg, &enc_config, chunk, &shared_frames_encoded, &progress_tx)?
554        {
555            ChunkOutcome::Encoded(c) => out.lock().unwrap().push(c),
556            ChunkOutcome::RequeuedOnMismatch { chunk, diff } => {
557                tracing::warn!(
558                    rung_idx = cfg.rung_idx,
559                    gpu_vendor = ?cfg.gpu_vendor,
560                    diff = %diff,
561                    "chunk worker: codec invariant mismatch — requeuing chunk and exiting"
562                );
563                let _ = queue.push_front(chunk);
564                break;
565            }
566        }
567    }
568    Ok(())
569}
570
571enum ChunkOutcome {
572    Encoded(ChunkPackets),
573    RequeuedOnMismatch { chunk: SegmentChunk, diff: String },
574}
575
576fn encode_chunk_to_packets(
577    cfg: &EncoderWorkerConfig,
578    enc_config: &EncoderConfig,
579    chunk: SegmentChunk,
580    shared_frames_encoded: &std::sync::atomic::AtomicU64,
581    progress_tx: &mpsc::Sender<u64>,
582) -> Result<ChunkOutcome> {
583    let mut encoder =
584        encode::select_encoder(enc_config.clone(), None).context("creating encoder for chunk")?;
585    let segment_idx = chunk.segment_idx;
586    let mut packets: Vec<encode::EncodedPacket> = Vec::new();
587    let mut pending: Vec<encode::EncodedPacket> = Vec::new();
588    let mut decided = false;
589
590    for frame in &chunk.frames {
591        encoder.send_frame(frame).context("send_frame in chunk worker")?;
592        while let Some(packet) = encoder.receive_packet().context("receive_packet in chunk worker")? {
593            if !decided {
594                match validate_or_set_rung_invariant(
595                    cfg.rung_idx,
596                    cfg.gpu_vendor,
597                    &cfg.rung_invariant,
598                    &packet.data,
599                )? {
600                    InvariantCheck::Matched | InvariantCheck::SetByThisWorker => decided = true,
601                    InvariantCheck::Mismatched { diff } => {
602                        return Ok(ChunkOutcome::RequeuedOnMismatch { chunk, diff });
603                    }
604                }
605                pending.push(packet);
606                continue;
607            }
608            packets.append(&mut pending);
609            packets.push(packet);
610        }
611        let n = shared_frames_encoded.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1;
612        let _ = progress_tx.try_send(n);
613    }
614    if decided {
615        packets.append(&mut pending);
616    }
617    encoder.flush().context("flush in chunk worker")?;
618    while let Some(packet) = encoder
619        .receive_packet()
620        .context("receive_packet after flush in chunk worker")?
621    {
622        packets.push(packet);
623    }
624    Ok(ChunkOutcome::Encoded(ChunkPackets { segment_idx, packets }))
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference invariant shared by the tests below (8-bit, both
    /// chroma axes subsampled, max frame size 1920x1080).
    fn baseline_invariant() -> RungCodecInvariant {
        RungCodecInvariant {
            seq_profile: 0,
            seq_level_idx_0: 8,
            seq_tier_0: 0,
            bit_depth: 8,
            monochrome: false,
            chroma_subsampling_x: true,
            chroma_subsampling_y: true,
            color_primaries: 1,
            transfer_characteristics: 1,
            matrix_coefficients: 1,
            color_range: false,
            max_frame_width_minus1: 1919,
            max_frame_height_minus1: 1079,
            still_picture: false,
        }
    }

    #[test]
    fn config_clone_preserves_fields() {
        let original = EncoderWorkerConfig {
            rung_idx: 2,
            width: 1280,
            height: 720,
            frame_rate: 30.0,
            quality: 32,
            speed_preset: u8::MAX,
            target: codec::encode::tuning::QualityTarget::Standard,
            tier: codec::encode::tuning::SpeedTier::Standard,
            threads: 4,
            gpu_index: Some(1),
            gpu_vendor: None,
            output_color_metadata: ColorMetadata::default(),
            output_pixel_format: PixelFormat::Yuv420p,
            constant_qp: false,
            timescale: 30000,
            per_frame_ticks: 1000,
            keyframe_interval: 60,
            segment_target_ticks: 60_000,
            output_dir: PathBuf::from("/tmp/x"),
            rung_invariant: Arc::new(RwLock::new(None)),
        };
        let cloned = original.clone();
        assert_eq!(cloned.rung_idx, 2);
        assert_eq!(cloned.keyframe_interval, 60);
    }

    #[test]
    fn invariant_matches_itself() {
        let invariant = baseline_invariant();
        assert_eq!(invariant.clone(), invariant);
        assert_eq!(invariant.describe_diff(&invariant), "");
    }

    #[test]
    fn invariant_diff_lists_changed_fields() {
        let rung = baseline_invariant();
        let worker = RungCodecInvariant {
            bit_depth: 10,
            color_primaries: 9,
            ..rung.clone()
        };
        let diff = rung.describe_diff(&worker);
        assert!(diff.contains("bit_depth"));
        assert!(diff.contains("color_primaries"));
        assert!(!diff.contains("seq_profile"));
    }

    #[test]
    fn validator_parse_error_returns_err_not_mismatch() {
        // All-zero junk: no recognisable AV1 sequence header OBU. This
        // is a malformed-bitstream condition, distinct from a field
        // mismatch — nothing downstream can recover from it, so the
        // validator returns `Err` (which the worker propagates, failing
        // the run) rather than the soft-fail `Mismatched`. The slot
        // must stay untouched.
        let slot: RwLock<Option<RungCodecInvariant>> = RwLock::new(None);
        let junk = vec![0u8; 8];
        let err =
            validate_or_set_rung_invariant(0, Some(codec::gpu::GpuVendor::Intel), &slot, &junk)
                .unwrap_err();
        assert!(
            err.to_string()
                .contains("could not parse AV1 sequence header")
        );
        assert!(slot.read().unwrap().is_none());
    }

    #[test]
    fn mismatched_diff_includes_changed_field() {
        let existing = baseline_invariant();
        let other = RungCodecInvariant {
            bit_depth: 10,
            ..existing.clone()
        };
        let diff = existing.describe_diff(&other);
        assert!(
            diff.contains("bit_depth"),
            "diff should mention bit_depth; got {diff}"
        );
    }
}