Skip to main content

rivet/
encoder_worker.rs

1//! Per-segment encoder worker: pop a chunk → encode K frames →
2//! emit one CMAF segment file → repeat.
3//!
4//! v2 multi-GPU model (2026-05-11): each worker owns one GPU lease
5//! and one encoder for its lifetime, but builds a fresh
6//! `CmafVideoMuxer` per claimed segment. The muxer is configured
7//! with the segment's index + base decode time so the on-disk
8//! filename + tfdt match what a single-encoder pipeline would
9//! produce. Helpers attaching mid-flight just start popping from
10//! the queue's current head; no decode-and-discard.
11//!
12//! Workers exit when `queue.pop()` returns `None` (pump closed +
13//! queue drained). The returned `WorkerOutput` lists every segment
14//! the worker wrote so the orchestrator can merge contributions
15//! into the per-rung manifest.
16
17use anyhow::{Context, Result, anyhow};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::RwLock;
21
22use codec::encode::{self, EncoderConfig};
23use codec::frame::{ColorMetadata, PixelFormat, VideoCodec};
24use codec::pixel_format::{
25    Av1SequenceHeader, H264SpsInfo, HevcSpsInfo, parse_av1_sequence_header, parse_h264_sps,
26    parse_hevc_sps,
27};
28use container::cmaf::{CmafVideoMuxer, CmafVideoMuxerOptions, SegmentInfo};
29use tokio::sync::mpsc;
30
31use crate::cmaf_util::add_packet_with_segment_flush;
32use crate::frame_queue::{SegmentChunk, SegmentChunkQueue};
33
34/// Mandatory AV1 sequence-header fields that every encoder
35/// contributing segments to a single rendition MUST agree on.
36///
37/// Why these specific fields: each is part of the codec-init contract
38/// that the player sets up once from `av1C` and expects to hold for
39/// every segment. The decoder re-parses the inline OBU sequence
40/// header in each segment's IDR; if its parsed values disagree with
41/// the av1C from `init.mp4` on any of these fields, strict decoders
42/// (dav1d in conformance mode, Safari AVFoundation, hls.js+libdav1d)
43/// will reject the segment. Optional fields not listed here (timing
44/// info presence, decoder model presence, film grain `present` flag,
45/// operating-point details) are tolerated by every major player; we
46/// deliberately don't check them so that NVENC + QSV + AMF + rav1e
47/// can co-exist on one rendition without cosmetic byte differences
48/// triggering false rejections.
49///
50/// First worker on a rung SETS the invariant. Subsequent workers
51/// (helpers from any vendor) COMPARE; mismatch fails the run loudly
52/// instead of silently corrupting output.
53/// Per-rung codec invariant. Each chunk encoded on a different GPU must agree on
54/// these decode-init fields, or strict players reject the stitched stream. AV1
55/// compares sequence-header fields; H.264/H.265 compare the SPS profile / level
56/// / chroma / bit-depth / dims (the `avcC`/`hvcC` decode-init contract).
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub enum RungCodecInvariant {
59    Av1(Av1Invariant),
60    /// Shared by H.264 + H.265 — a rung is single-codec, so the variant only
61    /// ever compares chunks of the same codec.
62    H26x(H26xInvariant),
63}
64
65impl RungCodecInvariant {
66    /// Human-readable diff for error messages. Empty when the two agree.
67    fn describe_diff(&self, other: &Self) -> String {
68        if self == other {
69            return String::new();
70        }
71        match (self, other) {
72            (RungCodecInvariant::Av1(a), RungCodecInvariant::Av1(b)) => a.describe_diff(b),
73            _ => format!("rung={self:?}, this worker={other:?}"),
74        }
75    }
76}
77
78/// H.264 / H.265 decode-init invariant, derived from the SPS.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct H26xInvariant {
81    pub profile_idc: u8,
82    pub level_idc: u8,
83    pub chroma_format_idc: u8,
84    pub bit_depth_luma: u8,
85    pub bit_depth_chroma: u8,
86    pub width: u32,
87    pub height: u32,
88}
89
90impl H26xInvariant {
91    fn from_h264(sps: &H264SpsInfo) -> Self {
92        Self {
93            profile_idc: sps.profile_idc,
94            level_idc: sps.level_idc,
95            chroma_format_idc: sps.chroma_format_idc,
96            bit_depth_luma: sps.bit_depth_luma,
97            bit_depth_chroma: sps.bit_depth_chroma,
98            width: sps.width.unwrap_or(0),
99            height: sps.height.unwrap_or(0),
100        }
101    }
102
103    fn from_h265(sps: &HevcSpsInfo) -> Self {
104        Self {
105            profile_idc: sps.profile_idc,
106            level_idc: sps.level_idc,
107            chroma_format_idc: sps.chroma_format_idc,
108            bit_depth_luma: sps.bit_depth_luma,
109            bit_depth_chroma: sps.bit_depth_chroma,
110            width: sps.width.unwrap_or(0),
111            height: sps.height.unwrap_or(0),
112        }
113    }
114}
115
116/// AV1 sequence-header invariant — every encoder contributing segments to a
117/// single rendition MUST agree on these fields.
118///
119/// Why these specific fields: each is part of the codec-init contract that the
120/// player sets up once from `av1C` and expects to hold for every segment. The
121/// decoder re-parses the inline OBU sequence header in each segment's IDR; if
122/// its parsed values disagree with the av1C from `init.mp4`, strict decoders
123/// (dav1d in conformance mode, Safari AVFoundation, hls.js+libdav1d) reject the
124/// segment. Optional fields (timing info, decoder model, film grain present,
125/// operating points) are tolerated by every major player; we deliberately don't
126/// check them so NVENC + QSV + AMF + rav1e co-exist on one rendition without
127/// cosmetic byte differences triggering false rejections.
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct Av1Invariant {
130    pub seq_profile: u8,
131    pub seq_level_idx_0: u8,
132    pub seq_tier_0: u8,
133    pub bit_depth: u8,
134    pub monochrome: bool,
135    pub chroma_subsampling_x: bool,
136    pub chroma_subsampling_y: bool,
137    pub color_primaries: u8,
138    pub transfer_characteristics: u8,
139    pub matrix_coefficients: u8,
140    pub color_range: bool,
141    pub max_frame_width_minus1: u32,
142    pub max_frame_height_minus1: u32,
143    pub still_picture: bool,
144}
145
146impl Av1Invariant {
147    pub fn from_sequence_header(sh: &Av1SequenceHeader) -> Self {
148        Self {
149            seq_profile: sh.seq_profile,
150            seq_level_idx_0: sh.seq_level_idx_0,
151            seq_tier_0: sh.seq_tier_0,
152            bit_depth: sh.bit_depth,
153            monochrome: sh.monochrome,
154            chroma_subsampling_x: sh.chroma_subsampling_x,
155            chroma_subsampling_y: sh.chroma_subsampling_y,
156            color_primaries: sh.color_primaries,
157            transfer_characteristics: sh.transfer_characteristics,
158            matrix_coefficients: sh.matrix_coefficients,
159            color_range: sh.color_range,
160            max_frame_width_minus1: sh.max_frame_width_minus1,
161            max_frame_height_minus1: sh.max_frame_height_minus1,
162            still_picture: sh.still_picture,
163        }
164    }
165
166    /// Human-readable diff for error messages.
167    fn describe_diff(&self, other: &Self) -> String {
168        let mut diffs = Vec::new();
169        macro_rules! diff_field {
170            ($field:ident) => {
171                if self.$field != other.$field {
172                    diffs.push(format!(
173                        "{}: rung={:?}, this worker={:?}",
174                        stringify!($field),
175                        self.$field,
176                        other.$field
177                    ));
178                }
179            };
180        }
181        diff_field!(seq_profile);
182        diff_field!(seq_level_idx_0);
183        diff_field!(seq_tier_0);
184        diff_field!(bit_depth);
185        diff_field!(monochrome);
186        diff_field!(chroma_subsampling_x);
187        diff_field!(chroma_subsampling_y);
188        diff_field!(color_primaries);
189        diff_field!(transfer_characteristics);
190        diff_field!(matrix_coefficients);
191        diff_field!(color_range);
192        diff_field!(max_frame_width_minus1);
193        diff_field!(max_frame_height_minus1);
194        diff_field!(still_picture);
195        diffs.join("; ")
196    }
197}
198
/// Outcome of comparing a worker's first encoded packet against the rung's
/// codec invariant, as produced by [`validate_or_set_rung_invariant`].
///
/// The per-chunk encode paths keep encoding on `SetByThisWorker` and
/// `Matched`, and hand the chunk back for requeueing on `Mismatched`. A
/// packet that cannot be parsed at all is not represented here: the
/// validator returns `Err` for it instead, which fails the worker.
#[derive(Debug)]
pub enum InvariantCheck {
    /// The rung had no invariant yet; this worker's packet has just set it.
    SetByThisWorker,
    /// Equal to the rung's recorded invariant, so the output may be published.
    Matched,
    /// One or more mandatory fields differ from the rung's invariant.
    ///
    /// The worker requeues its chunk and exits cleanly so another worker can
    /// encode it. This does **not** abort the job — only this helper's
    /// contribution is lost.
    Mismatched {
        /// Human-readable description of the differing fields.
        diff: String,
    },
}
217
218/// Parse a worker's first packet, derive the codec invariant, and
219/// compare-or-set it against the per-rung slot. Returns
220/// [`InvariantCheck`] on a successful parse; an `Err` only on
221/// malformed bitstream (the encoder failed to emit an
222/// `OBU_SEQUENCE_HEADER` at all, which is a configuration bug that
223/// nothing downstream can recover from).
224pub fn validate_or_set_rung_invariant(
225    rung_idx: usize,
226    gpu_vendor: Option<codec::gpu::GpuVendor>,
227    slot: &RwLock<Option<RungCodecInvariant>>,
228    first_packet: &[u8],
229    codec: VideoCodec,
230) -> Result<InvariantCheck> {
231    // Derive the codec invariant from the worker's first encoded packet: AV1
232    // from the OBU sequence header, H.264/H.265 from the SPS in the Annex-B AU.
233    let observed = match codec {
234        VideoCodec::Av1 => {
235            let parsed = parse_av1_sequence_header(first_packet).ok_or_else(|| {
236                anyhow!(
237                    "rung {} (vendor {:?}): could not parse AV1 sequence header from first \
238                     encoded packet; encoder did not emit OBU_SEQUENCE_HEADER as required for \
239                     segment alignment",
240                    rung_idx,
241                    gpu_vendor,
242                )
243            })?;
244            RungCodecInvariant::Av1(Av1Invariant::from_sequence_header(&parsed))
245        }
246        VideoCodec::H264 => {
247            let sps = parse_h264_sps(first_packet).ok_or_else(|| {
248                anyhow!(
249                    "rung {} (vendor {:?}): could not parse H.264 SPS from first encoded packet; \
250                     encoder did not emit an SPS NAL on the first IDR",
251                    rung_idx,
252                    gpu_vendor,
253                )
254            })?;
255            RungCodecInvariant::H26x(H26xInvariant::from_h264(&sps))
256        }
257        VideoCodec::H265 => {
258            let sps = parse_hevc_sps(first_packet).ok_or_else(|| {
259                anyhow!(
260                    "rung {} (vendor {:?}): could not parse H.265 SPS from first encoded packet; \
261                     encoder did not emit an SPS NAL on the first IRAP",
262                    rung_idx,
263                    gpu_vendor,
264                )
265            })?;
266            RungCodecInvariant::H26x(H26xInvariant::from_h265(&sps))
267        }
268    };
269
270    // Fast path: read lock, check if set + matches.
271    if let Some(existing) = &*slot.read().unwrap() {
272        if existing == &observed {
273            return Ok(InvariantCheck::Matched);
274        }
275        return Ok(InvariantCheck::Mismatched {
276            diff: existing.describe_diff(&observed),
277        });
278    }
279    // First worker — write under write-lock with double-check (race
280    // against another worker setting the slot between read and write).
281    let mut w = slot.write().unwrap();
282    match &*w {
283        Some(existing) if existing != &observed => Ok(InvariantCheck::Mismatched {
284            diff: existing.describe_diff(&observed),
285        }),
286        Some(_) => Ok(InvariantCheck::Matched),
287        None => {
288            tracing::info!(
289                rung_idx,
290                gpu_vendor = ?gpu_vendor,
291                ?codec,
292                invariant = ?observed,
293                "rung codec invariant captured from first worker"
294            );
295            *w = Some(observed);
296            Ok(InvariantCheck::SetByThisWorker)
297        }
298    }
299}
300
301#[derive(Clone)]
302pub struct EncoderWorkerConfig {
303    pub rung_idx: usize,
304    /// Output video codec (AV1 / H.264 / H.265) — drives the encoder dispatch
305    /// and the per-rung codec invariant parse.
306    pub codec: VideoCodec,
307    pub width: u32,
308    pub height: u32,
309    pub frame_rate: f64,
310    /// Legacy CRF escape hatch (`u8::MAX` = derive from `target`).
311    pub quality: u8,
312    /// Speed preset escape hatch (`u8::MAX` = derive from `tier`).
313    pub speed_preset: u8,
314    /// Perceptual quality target (used when `quality` is the sentinel).
315    pub target: codec::encode::tuning::QualityTarget,
316    /// Speed tier (used when `speed_preset` is the sentinel).
317    pub tier: codec::encode::tuning::SpeedTier,
318    pub threads: usize,
319    pub gpu_index: Option<u32>,
320    pub gpu_vendor: Option<codec::gpu::GpuVendor>,
321    /// Resolved **output** color metadata + pixel format (the encoder's input
322    /// format and bitstream signaling). The engine computes these from the
323    /// `OutputSpec`'s `ColorPolicy` / `BitDepth` via `resolve_output`, so the
324    /// worker no longer folds HDR→SDR itself — it just encodes to this format.
325    pub output_color_metadata: ColorMetadata,
326    pub output_pixel_format: PixelFormat,
327    /// Prefer constant-QP rate control (seam-flat chunked single-file under
328    /// `ChunkSeamMode::ParallelConstQp`). Forwarded to `EncoderConfig.constant_qp`.
329    pub constant_qp: bool,
330    pub timescale: u32,
331    pub per_frame_ticks: u32,
332    pub keyframe_interval: u32,
333    pub segment_target_ticks: u64,
334    pub output_dir: PathBuf,
335    /// Shared per-rung codec invariant slot. First worker on the rung
336    /// SETS it; helpers (any vendor) COMPARE on their first packet.
337    /// On mismatch the helper requeues its chunk and exits cleanly so
338    /// the run continues without it — never aborts mission-critical
339    /// jobs. See `validate_or_set_rung_invariant` + the requeue path
340    /// in `run_encoder_worker_blocking`.
341    pub rung_invariant: Arc<RwLock<Option<RungCodecInvariant>>>,
342}
343
344#[derive(Debug, Clone)]
345pub struct WorkerOutput {
346    pub gpu_index: Option<u32>,
347    pub segments: Vec<SegmentInfo>,
348}
349
350/// Run the encoder loop until the chunk queue is closed and drained.
351/// Designed to be wrapped in `tokio::task::spawn_blocking`.
352///
353/// `progress_tx` receives the shared cumulative `frames_encoded_total`
354/// after every encoded frame; the caller's drain task fires wire
355/// events from this stream. Multiple workers bump the same counter,
356/// so the progress reading stays monotonic across worker handoffs.
357#[allow(clippy::too_many_arguments)]
358pub fn run_encoder_worker_blocking(
359    cfg: EncoderWorkerConfig,
360    queue: Arc<SegmentChunkQueue>,
361    rt: tokio::runtime::Handle,
362    shared_frames_encoded: Arc<std::sync::atomic::AtomicU64>,
363    progress_tx: mpsc::Sender<u64>,
364) -> Result<WorkerOutput> {
365    let enc_config = build_enc_config(&cfg);
366    let encoder_color_metadata = cfg.output_color_metadata;
367
368    let mut segments_written: Vec<SegmentInfo> = Vec::new();
369    let mut init_segment_written = false;
370
371    tracing::debug!(rung_idx = cfg.rung_idx, gpu_index = ?cfg.gpu_index, "encoder worker started; awaiting first chunk");
372    loop {
373        let chunk = match rt.block_on(queue.pop()) {
374            Some(c) => c,
375            None => break,
376        };
377        tracing::debug!(rung_idx = cfg.rung_idx, segment = chunk.segment_idx, frames = chunk.frames.len(), "encoder worker popped chunk");
378        match encode_one_segment(
379            &cfg,
380            &enc_config,
381            encoder_color_metadata,
382            chunk,
383            &mut init_segment_written,
384            &shared_frames_encoded,
385            &progress_tx,
386        )? {
387            SegmentOutcome::Wrote {
388                info,
389                segment_idx,
390                frames,
391            } => {
392                let role = if segment_idx == 0 {
393                    "primary"
394                } else {
395                    "worker"
396                };
397                tracing::info!(
398                    rung_idx = cfg.rung_idx,
399                    gpu_index = ?cfg.gpu_index,
400                    role,
401                    segment = segment_idx,
402                    frames_encoded = frames,
403                    "rung segment flushed",
404                );
405                segments_written.push(info);
406            }
407            SegmentOutcome::RequeuedOnMismatch {
408                chunk: rejected,
409                diff,
410            } => {
411                // Helper from a vendor whose AV1 sequence header diverges
412                // from the rung's invariant on mandatory fields. Put the
413                // chunk back at the head of the queue so a matching-vendor
414                // worker (always at least the initial worker) picks it up.
415                // Exit clean — the run completes without this helper.
416                tracing::warn!(
417                    rung_idx = cfg.rung_idx,
418                    gpu_index = ?cfg.gpu_index,
419                    gpu_vendor = ?cfg.gpu_vendor,
420                    rejected_segment = rejected.segment_idx,
421                    diff = %diff,
422                    "encoder worker: codec invariant mismatch on first packet — \
423                     requeuing chunk for a matching-vendor worker and exiting",
424                );
425                let _ = queue.push_front(rejected);
426                break;
427            }
428        }
429    }
430
431    Ok(WorkerOutput {
432        gpu_index: cfg.gpu_index,
433        segments: segments_written,
434    })
435}
436
437/// Outcome of an `encode_one_segment` call. `Wrote` is the happy
438/// path; `RequeuedOnMismatch` returns the chunk verbatim so the outer
439/// loop can put it back at the head of the queue for another worker.
440enum SegmentOutcome {
441    Wrote {
442        info: SegmentInfo,
443        segment_idx: usize,
444        frames: usize,
445    },
446    RequeuedOnMismatch {
447        chunk: SegmentChunk,
448        diff: String,
449    },
450}
451
452fn encode_one_segment(
453    cfg: &EncoderWorkerConfig,
454    enc_config: &EncoderConfig,
455    encoder_color_metadata: ColorMetadata,
456    chunk: SegmentChunk,
457    init_segment_written: &mut bool,
458    shared_frames_encoded: &std::sync::atomic::AtomicU64,
459    progress_tx: &mpsc::Sender<u64>,
460) -> Result<SegmentOutcome> {
461    let write_init = chunk.segment_idx == 0 && !*init_segment_written;
462    let muxer_options = CmafVideoMuxerOptions {
463        first_segment_index: (chunk.segment_idx as u32) + 1,
464        first_segment_base_decode_time: chunk.segment_idx as u64 * cfg.segment_target_ticks,
465        write_init_segment: write_init,
466    };
467    let mut muxer = CmafVideoMuxer::new_with_codec_options(
468        &cfg.output_dir,
469        cfg.width,
470        cfg.height,
471        cfg.timescale,
472        encoder_color_metadata,
473        cfg.codec,
474        muxer_options,
475    )
476    .with_context(|| {
477        format!(
478            "creating CmafVideoMuxer for segment {} in {}",
479            chunk.segment_idx,
480            cfg.output_dir.display()
481        )
482    })?;
483
484    let mut encoder =
485        encode::select_encoder(enc_config.clone(), None).context("creating encoder for segment")?;
486
487    // Buffered packets emitted from the encoder, awaiting either
488    // commit-to-muxer (after invariant validation passes) or discard
489    // (on mismatch). The first packet's bytes are the AV1 sequence
490    // header OBU that we feed to the invariant validator.
491    let mut pending_packets: Vec<codec::encode::EncodedPacket> = Vec::new();
492    let mut first_packet_decision: Option<bool> = None; // None=undecided, Some(true)=commit, Some(false)=reject
493
494    let segment_idx = chunk.segment_idx;
495    let frame_count = chunk.frames.len();
496
497    for frame in &chunk.frames {
498        encoder
499            .send_frame(frame)
500            .context("encoder.send_frame in worker")?;
501        while let Some(packet) = encoder
502            .receive_packet()
503            .context("encoder.receive_packet in worker")?
504        {
505            if first_packet_decision.is_none() {
506                match validate_or_set_rung_invariant(
507                    cfg.rung_idx,
508                    cfg.gpu_vendor,
509                    &cfg.rung_invariant,
510                    &packet.data,
511                    cfg.codec,
512                )? {
513                    InvariantCheck::Matched | InvariantCheck::SetByThisWorker => {
514                        first_packet_decision = Some(true);
515                    }
516                    InvariantCheck::Mismatched { diff } => {
517                        // Discard everything in flight. The muxer hasn't
518                        // flushed any segment yet (first packet of a
519                        // chunk is far below the segment-duration target),
520                        // and init.mp4 is only written by finalize() —
521                        // which we don't call. Drop muxer + encoder
522                        // implicitly when we return.
523                        return Ok(SegmentOutcome::RequeuedOnMismatch { chunk, diff });
524                    }
525                }
526                pending_packets.push(packet);
527                continue;
528            }
529            // first_packet_decision == Some(true): commit
530            // First drain any buffered packets we held back during
531            // validation.
532            if !pending_packets.is_empty() {
533                for held in pending_packets.drain(..) {
534                    add_packet_with_segment_flush(
535                        &mut muxer,
536                        &held,
537                        cfg.per_frame_ticks,
538                        cfg.segment_target_ticks,
539                    )
540                    .context("CMAF segment-flush add (held)")?;
541                }
542            }
543            add_packet_with_segment_flush(
544                &mut muxer,
545                &packet,
546                cfg.per_frame_ticks,
547                cfg.segment_target_ticks,
548            )
549            .context("CMAF segment-flush add (worker)")?;
550        }
551        let n = shared_frames_encoded.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1;
552        let _ = progress_tx.try_send(n);
553    }
554
555    // Drain remaining held packets (e.g. if the only packets emitted
556    // were buffered during the single validation step).
557    if first_packet_decision == Some(true) && !pending_packets.is_empty() {
558        for held in pending_packets.drain(..) {
559            add_packet_with_segment_flush(
560                &mut muxer,
561                &held,
562                cfg.per_frame_ticks,
563                cfg.segment_target_ticks,
564            )
565            .context("CMAF segment-flush add (final-held)")?;
566        }
567    }
568
569    encoder.flush().context("encoder.flush in worker")?;
570    while let Some(packet) = encoder
571        .receive_packet()
572        .context("encoder.receive_packet after flush")?
573    {
574        add_packet_with_segment_flush(
575            &mut muxer,
576            &packet,
577            cfg.per_frame_ticks,
578            cfg.segment_target_ticks,
579        )
580        .context("CMAF segment-flush add post-flush (worker)")?;
581    }
582
583    let manifest = muxer
584        .finalize()
585        .context("finalize CmafVideoMuxer (per-segment worker)")?;
586
587    if write_init {
588        *init_segment_written = true;
589    }
590
591    let info = manifest
592        .segments
593        .last()
594        .ok_or_else(|| {
595            anyhow::anyhow!(
596                "encoder worker produced no segment for chunk idx {} (rung {}, gpu {:?}); \
597                 frames in chunk = {}",
598                segment_idx,
599                cfg.rung_idx,
600                cfg.gpu_index,
601                frame_count,
602            )
603        })?
604        .clone();
605    Ok(SegmentOutcome::Wrote {
606        info,
607        segment_idx,
608        frames: frame_count,
609    })
610}
611
612// ---------------------------------------------------------------------------
613// Single-file chunked encode: workers collect packets (instead of writing CMAF
614// segments) so the orchestrator can stitch them, in segment order, into one MP4.
615// ---------------------------------------------------------------------------
616
617/// One chunk's encoded packets, in encode (= display, no B-frames) order.
618#[derive(Debug)]
619pub struct ChunkPackets {
620    pub segment_idx: usize,
621    pub packets: Vec<encode::EncodedPacket>,
622}
623
624/// Build the per-rung `EncoderConfig` from the resolved output format + quality
625/// knobs. Shared by the CMAF and packet workers.
626fn build_enc_config(cfg: &EncoderWorkerConfig) -> EncoderConfig {
627    EncoderConfig {
628        codec: cfg.codec,
629        width: cfg.width,
630        height: cfg.height,
631        frame_rate: cfg.frame_rate,
632        quality: cfg.quality,
633        speed_preset: cfg.speed_preset,
634        keyframe_interval: cfg.keyframe_interval,
635        threads: cfg.threads,
636        pixel_format: cfg.output_pixel_format,
637        color_metadata: cfg.output_color_metadata,
638        gpu_index: cfg.gpu_index,
639        gpu_vendor: cfg.gpu_vendor,
640        target: cfg.target,
641        tier: cfg.tier,
642        constant_qp: cfg.constant_qp,
643        ..EncoderConfig::default()
644    }
645}
646
647/// Encoder worker that COLLECTS packets per chunk (single-file path). Each
648/// chunk is encoded by a fresh encoder (first frame an IDR); the cross-vendor
649/// codec invariant is enforced on the first packet (mismatch → requeue + exit,
650/// exactly like the CMAF worker). Ordered `ChunkPackets` are pushed to `out`.
651#[allow(clippy::too_many_arguments)]
652pub fn run_chunk_encoder_worker_blocking(
653    cfg: EncoderWorkerConfig,
654    queue: Arc<SegmentChunkQueue>,
655    rt: tokio::runtime::Handle,
656    shared_frames_encoded: Arc<std::sync::atomic::AtomicU64>,
657    progress_tx: mpsc::Sender<u64>,
658    out: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
659) -> Result<()> {
660    let enc_config = build_enc_config(&cfg);
661    loop {
662        let chunk = match rt.block_on(queue.pop()) {
663            Some(c) => c,
664            None => break,
665        };
666        match encode_chunk_to_packets(&cfg, &enc_config, chunk, &shared_frames_encoded, &progress_tx)?
667        {
668            ChunkOutcome::Encoded(c) => out.lock().unwrap().push(c),
669            ChunkOutcome::RequeuedOnMismatch { chunk, diff } => {
670                tracing::warn!(
671                    rung_idx = cfg.rung_idx,
672                    gpu_vendor = ?cfg.gpu_vendor,
673                    diff = %diff,
674                    "chunk worker: codec invariant mismatch — requeuing chunk and exiting"
675                );
676                let _ = queue.push_front(chunk);
677                break;
678            }
679        }
680    }
681    Ok(())
682}
683
684enum ChunkOutcome {
685    Encoded(ChunkPackets),
686    RequeuedOnMismatch { chunk: SegmentChunk, diff: String },
687}
688
689fn encode_chunk_to_packets(
690    cfg: &EncoderWorkerConfig,
691    enc_config: &EncoderConfig,
692    chunk: SegmentChunk,
693    shared_frames_encoded: &std::sync::atomic::AtomicU64,
694    progress_tx: &mpsc::Sender<u64>,
695) -> Result<ChunkOutcome> {
696    let mut encoder =
697        encode::select_encoder(enc_config.clone(), None).context("creating encoder for chunk")?;
698    let segment_idx = chunk.segment_idx;
699    let mut packets: Vec<encode::EncodedPacket> = Vec::new();
700    let mut pending: Vec<encode::EncodedPacket> = Vec::new();
701    let mut decided = false;
702
703    for frame in &chunk.frames {
704        encoder.send_frame(frame).context("send_frame in chunk worker")?;
705        while let Some(packet) = encoder.receive_packet().context("receive_packet in chunk worker")? {
706            if !decided {
707                match validate_or_set_rung_invariant(
708                    cfg.rung_idx,
709                    cfg.gpu_vendor,
710                    &cfg.rung_invariant,
711                    &packet.data,
712                    cfg.codec,
713                )? {
714                    InvariantCheck::Matched | InvariantCheck::SetByThisWorker => decided = true,
715                    InvariantCheck::Mismatched { diff } => {
716                        return Ok(ChunkOutcome::RequeuedOnMismatch { chunk, diff });
717                    }
718                }
719                pending.push(packet);
720                continue;
721            }
722            packets.append(&mut pending);
723            packets.push(packet);
724        }
725        let n = shared_frames_encoded.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1;
726        let _ = progress_tx.try_send(n);
727    }
728    if decided {
729        packets.append(&mut pending);
730    }
731    encoder.flush().context("flush in chunk worker")?;
732    while let Some(packet) = encoder
733        .receive_packet()
734        .context("receive_packet after flush in chunk worker")?
735    {
736        packets.push(packet);
737    }
738    Ok(ChunkOutcome::Encoded(ChunkPackets { segment_idx, packets }))
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline AV1 sequence-header invariant shared by the tests below:
    /// profile 0, `seq_level_idx_0 = 8`, main tier, 8-bit, 4:2:0
    /// (subsampling x/y set, not monochrome), colour description codes
    /// all `1`, `color_range = false`, 1920x1080 max frame size.
    ///
    /// Tests derive variants with struct-update syntax
    /// (`Av1Invariant { bit_depth: 10, ..av1_1080p() }`) so each case
    /// spells out only the fields it deliberately changes.
    fn av1_1080p() -> Av1Invariant {
        Av1Invariant {
            seq_profile: 0,
            seq_level_idx_0: 8,
            seq_tier_0: 0,
            bit_depth: 8,
            monochrome: false,
            chroma_subsampling_x: true,
            chroma_subsampling_y: true,
            color_primaries: 1,
            transfer_characteristics: 1,
            matrix_coefficients: 1,
            color_range: false,
            max_frame_width_minus1: 1919,
            max_frame_height_minus1: 1079,
            still_picture: false,
        }
    }

    #[test]
    fn config_clone_preserves_fields() {
        let cfg = EncoderWorkerConfig {
            rung_idx: 2,
            codec: VideoCodec::Av1,
            width: 1280,
            height: 720,
            frame_rate: 30.0,
            quality: 32,
            speed_preset: u8::MAX,
            target: codec::encode::tuning::QualityTarget::Standard,
            tier: codec::encode::tuning::SpeedTier::Standard,
            threads: 4,
            gpu_index: Some(1),
            gpu_vendor: None,
            output_color_metadata: ColorMetadata::default(),
            output_pixel_format: PixelFormat::Yuv420p,
            constant_qp: false,
            timescale: 30000,
            per_frame_ticks: 1000,
            keyframe_interval: 60,
            segment_target_ticks: 60_000,
            output_dir: PathBuf::from("/tmp/x"),
            rung_invariant: Arc::new(RwLock::new(None)),
        };
        let copy = cfg.clone();
        assert_eq!(copy.rung_idx, 2);
        assert_eq!(copy.keyframe_interval, 60);
        assert_eq!((copy.width, copy.height), (1280, 720));
        assert_eq!(copy.gpu_index, Some(1));
        assert_eq!(copy.threads, 4);
        assert_eq!(copy.timescale, 30000);
        assert_eq!(copy.per_frame_ticks, 1000);
        assert_eq!(copy.segment_target_ticks, 60_000);
        assert_eq!(copy.output_dir, PathBuf::from("/tmp/x"));
        // The invariant slot is how encoders contributing to one rung agree
        // on codec-init parameters. A clone must share the slot (same Arc)
        // rather than get a private copy. Otherwise each worker would record
        // its own first invariant and mismatches would go undetected.
        assert!(Arc::ptr_eq(&copy.rung_invariant, &cfg.rung_invariant));
    }

    #[test]
    fn invariant_matches_itself() {
        let a = RungCodecInvariant::Av1(av1_1080p());
        assert_eq!(a.clone(), a);
        assert_eq!(a.describe_diff(&a), "");
    }

    #[test]
    fn invariant_diff_lists_changed_fields() {
        let a = RungCodecInvariant::Av1(av1_1080p());
        let b = RungCodecInvariant::Av1(Av1Invariant {
            bit_depth: 10,
            color_primaries: 9,
            ..av1_1080p()
        });
        let diff = a.describe_diff(&b);
        assert!(diff.contains("bit_depth"));
        assert!(diff.contains("color_primaries"));
        // Unchanged fields must not be reported.
        assert!(!diff.contains("seq_profile"));
    }

    #[test]
    fn validator_parse_error_returns_err_not_mismatch() {
        // Junk bytes with no recognisable AV1 sequence header OBU.
        // This is a malformed-bitstream condition, distinct from a
        // mismatch, and nothing downstream can recover from it. The
        // worker propagates this Err and fails the run, unlike the
        // soft-fail Mismatched case. The slot must also stay unset.
        let slot: RwLock<Option<RungCodecInvariant>> = RwLock::new(None);
        let junk = vec![0u8; 8];
        let err = validate_or_set_rung_invariant(
            0,
            Some(codec::gpu::GpuVendor::Intel),
            &slot,
            &junk,
            VideoCodec::Av1,
        )
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("could not parse AV1 sequence header")
        );
        assert!(slot.read().unwrap().is_none());
    }

    #[test]
    fn mismatched_diff_includes_changed_field() {
        let existing = RungCodecInvariant::Av1(av1_1080p());
        let other = RungCodecInvariant::Av1(Av1Invariant {
            bit_depth: 10,
            ..av1_1080p()
        });
        let diff = existing.describe_diff(&other);
        assert!(
            diff.contains("bit_depth"),
            "diff should mention bit_depth; got {diff}"
        );
    }

    #[test]
    fn h26x_invariant_equality_and_diff() {
        // Two H.264/H.265 chunks agree iff their SPS-derived fields match.
        let a = RungCodecInvariant::H26x(H26xInvariant {
            profile_idc: 100,
            level_idc: 31,
            chroma_format_idc: 1,
            bit_depth_luma: 8,
            bit_depth_chroma: 8,
            width: 1280,
            height: 720,
        });
        assert_eq!(a.clone(), a);
        assert_eq!(a.describe_diff(&a), "");
        let b = RungCodecInvariant::H26x(H26xInvariant {
            profile_idc: 100,
            level_idc: 31,
            chroma_format_idc: 1,
            bit_depth_luma: 10, // a 10-bit chunk must NOT match an 8-bit one
            bit_depth_chroma: 10,
            width: 1280,
            height: 720,
        });
        assert_ne!(a, b);
        assert!(!a.describe_diff(&b).is_empty());
        // An AV1 invariant never equals an H26x one. This is defensive,
        // since rungs are single-codec; the dimensions match for good measure.
        let av1 = RungCodecInvariant::Av1(Av1Invariant {
            max_frame_width_minus1: 1279,
            max_frame_height_minus1: 719,
            ..av1_1080p()
        });
        assert_ne!(a, av1);
    }
}