1use anyhow::{Context, Result, anyhow};
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::RwLock;
21
22use codec::encode::{self, EncoderConfig};
23use codec::frame::{ColorMetadata, PixelFormat};
24use codec::pixel_format::{Av1SequenceHeader, parse_av1_sequence_header};
25use container::cmaf::{CmafVideoMuxer, CmafVideoMuxerOptions, SegmentInfo};
26use tokio::sync::mpsc;
27
28use crate::cmaf_util::add_packet_with_segment_flush;
29use crate::frame_queue::{SegmentChunk, SegmentChunkQueue};
30
/// Sequence-header parameters on which all workers encoding one rung are compared.
///
/// Every field mirrors the same-named field of [`Av1SequenceHeader`] (see
/// [`RungCodecInvariant::from_sequence_header`]). Two values are treated as
/// compatible only when every field is equal, via the derived `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RungCodecInvariant {
    // Profile / level / tier values as parsed from the sequence header.
    pub seq_profile: u8,
    pub seq_level_idx_0: u8,
    pub seq_tier_0: u8,
    // Sample format: bit depth, monochrome flag and chroma subsampling flags.
    pub bit_depth: u8,
    pub monochrome: bool,
    pub chroma_subsampling_x: bool,
    pub chroma_subsampling_y: bool,
    // Colour description values carried by the sequence header.
    pub color_primaries: u8,
    pub transfer_characteristics: u8,
    pub matrix_coefficients: u8,
    pub color_range: bool,
    // Maximum frame dimensions, stored "minus one" exactly as in the parsed header.
    pub max_frame_width_minus1: u32,
    pub max_frame_height_minus1: u32,
    pub still_picture: bool,
}
67
68impl RungCodecInvariant {
69 pub fn from_sequence_header(sh: &Av1SequenceHeader) -> Self {
70 Self {
71 seq_profile: sh.seq_profile,
72 seq_level_idx_0: sh.seq_level_idx_0,
73 seq_tier_0: sh.seq_tier_0,
74 bit_depth: sh.bit_depth,
75 monochrome: sh.monochrome,
76 chroma_subsampling_x: sh.chroma_subsampling_x,
77 chroma_subsampling_y: sh.chroma_subsampling_y,
78 color_primaries: sh.color_primaries,
79 transfer_characteristics: sh.transfer_characteristics,
80 matrix_coefficients: sh.matrix_coefficients,
81 color_range: sh.color_range,
82 max_frame_width_minus1: sh.max_frame_width_minus1,
83 max_frame_height_minus1: sh.max_frame_height_minus1,
84 still_picture: sh.still_picture,
85 }
86 }
87
88 fn describe_diff(&self, other: &Self) -> String {
90 let mut diffs = Vec::new();
91 macro_rules! diff_field {
92 ($field:ident) => {
93 if self.$field != other.$field {
94 diffs.push(format!(
95 "{}: rung={:?}, this worker={:?}",
96 stringify!($field),
97 self.$field,
98 other.$field
99 ));
100 }
101 };
102 }
103 diff_field!(seq_profile);
104 diff_field!(seq_level_idx_0);
105 diff_field!(seq_tier_0);
106 diff_field!(bit_depth);
107 diff_field!(monochrome);
108 diff_field!(chroma_subsampling_x);
109 diff_field!(chroma_subsampling_y);
110 diff_field!(color_primaries);
111 diff_field!(transfer_characteristics);
112 diff_field!(matrix_coefficients);
113 diff_field!(color_range);
114 diff_field!(max_frame_width_minus1);
115 diff_field!(max_frame_height_minus1);
116 diff_field!(still_picture);
117 diffs.join("; ")
118 }
119}
120
/// Result of comparing a worker's first packet against the rung's shared invariant.
#[derive(Debug)]
pub enum InvariantCheck {
    /// The shared slot was empty; this call stored the observed invariant.
    SetByThisWorker,
    /// The shared slot already held an invariant equal to the observed one.
    Matched,
    /// The shared slot held a different invariant; `diff` lists the differing fields.
    Mismatched { diff: String },
}
139
140pub fn validate_or_set_rung_invariant(
147 rung_idx: usize,
148 gpu_vendor: Option<codec::gpu::GpuVendor>,
149 slot: &RwLock<Option<RungCodecInvariant>>,
150 first_packet: &[u8],
151) -> Result<InvariantCheck> {
152 let parsed = parse_av1_sequence_header(first_packet).ok_or_else(|| {
153 anyhow!(
154 "rung {} (vendor {:?}): could not parse AV1 sequence header from first encoded packet; \
155 encoder did not emit OBU_SEQUENCE_HEADER as required for CMAF segment alignment",
156 rung_idx,
157 gpu_vendor,
158 )
159 })?;
160 let observed = RungCodecInvariant::from_sequence_header(&parsed);
161
162 if let Some(existing) = &*slot.read().unwrap() {
164 if existing == &observed {
165 return Ok(InvariantCheck::Matched);
166 }
167 return Ok(InvariantCheck::Mismatched {
168 diff: existing.describe_diff(&observed),
169 });
170 }
171 let mut w = slot.write().unwrap();
174 match &*w {
175 Some(existing) if existing != &observed => Ok(InvariantCheck::Mismatched {
176 diff: existing.describe_diff(&observed),
177 }),
178 Some(_) => Ok(InvariantCheck::Matched),
179 None => {
180 tracing::info!(
181 rung_idx,
182 gpu_vendor = ?gpu_vendor,
183 seq_profile = observed.seq_profile,
184 seq_level_idx_0 = observed.seq_level_idx_0,
185 bit_depth = observed.bit_depth,
186 "rung codec invariant captured from first worker"
187 );
188 *w = Some(observed);
189 Ok(InvariantCheck::SetByThisWorker)
190 }
191 }
192}
193
/// Per-worker settings for encoding the segments of one rung.
///
/// The encoder-related fields are forwarded to `EncoderConfig` by
/// `build_enc_config`; the timing and output fields are consumed by the
/// CMAF muxing path in `encode_one_segment`.
#[derive(Clone)]
pub struct EncoderWorkerConfig {
    /// Rung index; used in log fields and error messages.
    pub rung_idx: usize,
    /// Frame width, passed to both the encoder config and the CMAF muxer.
    pub width: u32,
    /// Frame height, passed to both the encoder config and the CMAF muxer.
    pub height: u32,
    // The following are forwarded unchanged to the same-named `EncoderConfig` fields.
    pub frame_rate: f64,
    pub quality: u8,
    pub speed_preset: u8,
    pub target: codec::encode::tuning::QualityTarget,
    pub tier: codec::encode::tuning::SpeedTier,
    pub threads: usize,
    pub gpu_index: Option<u32>,
    /// Forwarded to the encoder config and included in invariant-mismatch diagnostics.
    pub gpu_vendor: Option<codec::gpu::GpuVendor>,
    /// Forwarded as `EncoderConfig::color_metadata` and also handed to the muxer.
    pub output_color_metadata: ColorMetadata,
    /// Forwarded as `EncoderConfig::pixel_format`.
    pub output_pixel_format: PixelFormat,
    pub constant_qp: bool,
    /// Timescale handed to the CMAF muxer.
    pub timescale: u32,
    /// Per-frame duration passed to `add_packet_with_segment_flush`.
    pub per_frame_ticks: u32,
    pub keyframe_interval: u32,
    /// Segment length in ticks; `segment_idx * segment_target_ticks` is used as
    /// the base decode time of a segment.
    pub segment_target_ticks: u64,
    /// Directory handed to the CMAF muxer.
    pub output_dir: PathBuf,
    /// Slot shared between workers, holding the rung's codec invariant once the
    /// first packet has been validated (see `validate_or_set_rung_invariant`).
    pub rung_invariant: Arc<RwLock<Option<RungCodecInvariant>>>,
}
233
/// What a segment-encoding worker returns when its loop ends.
#[derive(Debug, Clone)]
pub struct WorkerOutput {
    /// Copy of the worker's configured `gpu_index`.
    pub gpu_index: Option<u32>,
    /// One entry per segment this worker wrote, in the order they were finished.
    pub segments: Vec<SegmentInfo>,
}
239
240#[allow(clippy::too_many_arguments)]
248pub fn run_encoder_worker_blocking(
249 cfg: EncoderWorkerConfig,
250 queue: Arc<SegmentChunkQueue>,
251 rt: tokio::runtime::Handle,
252 shared_frames_encoded: Arc<std::sync::atomic::AtomicU64>,
253 progress_tx: mpsc::Sender<u64>,
254) -> Result<WorkerOutput> {
255 let enc_config = build_enc_config(&cfg);
256 let encoder_color_metadata = cfg.output_color_metadata;
257
258 let mut segments_written: Vec<SegmentInfo> = Vec::new();
259 let mut init_segment_written = false;
260
261 tracing::debug!(rung_idx = cfg.rung_idx, gpu_index = ?cfg.gpu_index, "encoder worker started; awaiting first chunk");
262 loop {
263 let chunk = match rt.block_on(queue.pop()) {
264 Some(c) => c,
265 None => break,
266 };
267 tracing::debug!(rung_idx = cfg.rung_idx, segment = chunk.segment_idx, frames = chunk.frames.len(), "encoder worker popped chunk");
268 match encode_one_segment(
269 &cfg,
270 &enc_config,
271 encoder_color_metadata,
272 chunk,
273 &mut init_segment_written,
274 &shared_frames_encoded,
275 &progress_tx,
276 )? {
277 SegmentOutcome::Wrote {
278 info,
279 segment_idx,
280 frames,
281 } => {
282 let role = if segment_idx == 0 {
283 "primary"
284 } else {
285 "worker"
286 };
287 tracing::info!(
288 rung_idx = cfg.rung_idx,
289 gpu_index = ?cfg.gpu_index,
290 role,
291 segment = segment_idx,
292 frames_encoded = frames,
293 "rung segment flushed",
294 );
295 segments_written.push(info);
296 }
297 SegmentOutcome::RequeuedOnMismatch {
298 chunk: rejected,
299 diff,
300 } => {
301 tracing::warn!(
307 rung_idx = cfg.rung_idx,
308 gpu_index = ?cfg.gpu_index,
309 gpu_vendor = ?cfg.gpu_vendor,
310 rejected_segment = rejected.segment_idx,
311 diff = %diff,
312 "encoder worker: codec invariant mismatch on first packet — \
313 requeuing chunk for a matching-vendor worker and exiting",
314 );
315 let _ = queue.push_front(rejected);
316 break;
317 }
318 }
319 }
320
321 Ok(WorkerOutput {
322 gpu_index: cfg.gpu_index,
323 segments: segments_written,
324 })
325}
326
/// Result of attempting to encode and mux a single segment chunk.
enum SegmentOutcome {
    /// The segment was encoded and the muxer was finalized.
    Wrote {
        /// Descriptor of the last segment listed in the muxer's manifest.
        info: SegmentInfo,
        /// Index of the chunk that was encoded.
        segment_idx: usize,
        /// Number of input frames in the chunk.
        frames: usize,
    },
    /// The first packet did not match the rung invariant; the untouched chunk is
    /// handed back so the caller can requeue it.
    RequeuedOnMismatch {
        chunk: SegmentChunk,
        /// Human-readable list of the differing sequence-header fields.
        diff: String,
    },
}
341
342fn encode_one_segment(
343 cfg: &EncoderWorkerConfig,
344 enc_config: &EncoderConfig,
345 encoder_color_metadata: ColorMetadata,
346 chunk: SegmentChunk,
347 init_segment_written: &mut bool,
348 shared_frames_encoded: &std::sync::atomic::AtomicU64,
349 progress_tx: &mpsc::Sender<u64>,
350) -> Result<SegmentOutcome> {
351 let write_init = chunk.segment_idx == 0 && !*init_segment_written;
352 let muxer_options = CmafVideoMuxerOptions {
353 first_segment_index: (chunk.segment_idx as u32) + 1,
354 first_segment_base_decode_time: chunk.segment_idx as u64 * cfg.segment_target_ticks,
355 write_init_segment: write_init,
356 };
357 let mut muxer = CmafVideoMuxer::new_with_options(
358 &cfg.output_dir,
359 cfg.width,
360 cfg.height,
361 cfg.timescale,
362 encoder_color_metadata,
363 muxer_options,
364 )
365 .with_context(|| {
366 format!(
367 "creating CmafVideoMuxer for segment {} in {}",
368 chunk.segment_idx,
369 cfg.output_dir.display()
370 )
371 })?;
372
373 let mut encoder =
374 encode::select_encoder(enc_config.clone(), None).context("creating encoder for segment")?;
375
376 let mut pending_packets: Vec<codec::encode::EncodedPacket> = Vec::new();
381 let mut first_packet_decision: Option<bool> = None; let segment_idx = chunk.segment_idx;
384 let frame_count = chunk.frames.len();
385
386 for frame in &chunk.frames {
387 encoder
388 .send_frame(frame)
389 .context("encoder.send_frame in worker")?;
390 while let Some(packet) = encoder
391 .receive_packet()
392 .context("encoder.receive_packet in worker")?
393 {
394 if first_packet_decision.is_none() {
395 match validate_or_set_rung_invariant(
396 cfg.rung_idx,
397 cfg.gpu_vendor,
398 &cfg.rung_invariant,
399 &packet.data,
400 )? {
401 InvariantCheck::Matched | InvariantCheck::SetByThisWorker => {
402 first_packet_decision = Some(true);
403 }
404 InvariantCheck::Mismatched { diff } => {
405 return Ok(SegmentOutcome::RequeuedOnMismatch { chunk, diff });
412 }
413 }
414 pending_packets.push(packet);
415 continue;
416 }
417 if !pending_packets.is_empty() {
421 for held in pending_packets.drain(..) {
422 add_packet_with_segment_flush(
423 &mut muxer,
424 &held,
425 cfg.per_frame_ticks,
426 cfg.segment_target_ticks,
427 )
428 .context("CMAF segment-flush add (held)")?;
429 }
430 }
431 add_packet_with_segment_flush(
432 &mut muxer,
433 &packet,
434 cfg.per_frame_ticks,
435 cfg.segment_target_ticks,
436 )
437 .context("CMAF segment-flush add (worker)")?;
438 }
439 let n = shared_frames_encoded.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1;
440 let _ = progress_tx.try_send(n);
441 }
442
443 if first_packet_decision == Some(true) && !pending_packets.is_empty() {
446 for held in pending_packets.drain(..) {
447 add_packet_with_segment_flush(
448 &mut muxer,
449 &held,
450 cfg.per_frame_ticks,
451 cfg.segment_target_ticks,
452 )
453 .context("CMAF segment-flush add (final-held)")?;
454 }
455 }
456
457 encoder.flush().context("encoder.flush in worker")?;
458 while let Some(packet) = encoder
459 .receive_packet()
460 .context("encoder.receive_packet after flush")?
461 {
462 add_packet_with_segment_flush(
463 &mut muxer,
464 &packet,
465 cfg.per_frame_ticks,
466 cfg.segment_target_ticks,
467 )
468 .context("CMAF segment-flush add post-flush (worker)")?;
469 }
470
471 let manifest = muxer
472 .finalize()
473 .context("finalize CmafVideoMuxer (per-segment worker)")?;
474
475 if write_init {
476 *init_segment_written = true;
477 }
478
479 let info = manifest
480 .segments
481 .last()
482 .ok_or_else(|| {
483 anyhow::anyhow!(
484 "encoder worker produced no segment for chunk idx {} (rung {}, gpu {:?}); \
485 frames in chunk = {}",
486 segment_idx,
487 cfg.rung_idx,
488 cfg.gpu_index,
489 frame_count,
490 )
491 })?
492 .clone();
493 Ok(SegmentOutcome::Wrote {
494 info,
495 segment_idx,
496 frames: frame_count,
497 })
498}
499
/// Encoded packets for one chunk, produced by the chunk-encoder worker path.
#[derive(Debug)]
pub struct ChunkPackets {
    /// Index of the chunk these packets were encoded from.
    pub segment_idx: usize,
    /// Packets in the order the encoder emitted them (including post-flush packets).
    pub packets: Vec<encode::EncodedPacket>,
}
511
512fn build_enc_config(cfg: &EncoderWorkerConfig) -> EncoderConfig {
515 EncoderConfig {
516 width: cfg.width,
517 height: cfg.height,
518 frame_rate: cfg.frame_rate,
519 quality: cfg.quality,
520 speed_preset: cfg.speed_preset,
521 keyframe_interval: cfg.keyframe_interval,
522 threads: cfg.threads,
523 pixel_format: cfg.output_pixel_format,
524 color_metadata: cfg.output_color_metadata,
525 gpu_index: cfg.gpu_index,
526 gpu_vendor: cfg.gpu_vendor,
527 target: cfg.target,
528 tier: cfg.tier,
529 constant_qp: cfg.constant_qp,
530 ..EncoderConfig::default()
531 }
532}
533
534#[allow(clippy::too_many_arguments)]
539pub fn run_chunk_encoder_worker_blocking(
540 cfg: EncoderWorkerConfig,
541 queue: Arc<SegmentChunkQueue>,
542 rt: tokio::runtime::Handle,
543 shared_frames_encoded: Arc<std::sync::atomic::AtomicU64>,
544 progress_tx: mpsc::Sender<u64>,
545 out: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
546) -> Result<()> {
547 let enc_config = build_enc_config(&cfg);
548 loop {
549 let chunk = match rt.block_on(queue.pop()) {
550 Some(c) => c,
551 None => break,
552 };
553 match encode_chunk_to_packets(&cfg, &enc_config, chunk, &shared_frames_encoded, &progress_tx)?
554 {
555 ChunkOutcome::Encoded(c) => out.lock().unwrap().push(c),
556 ChunkOutcome::RequeuedOnMismatch { chunk, diff } => {
557 tracing::warn!(
558 rung_idx = cfg.rung_idx,
559 gpu_vendor = ?cfg.gpu_vendor,
560 diff = %diff,
561 "chunk worker: codec invariant mismatch — requeuing chunk and exiting"
562 );
563 let _ = queue.push_front(chunk);
564 break;
565 }
566 }
567 }
568 Ok(())
569}
570
/// Result of attempting to encode a single chunk to packets.
enum ChunkOutcome {
    /// The chunk was encoded; carries the collected packets.
    Encoded(ChunkPackets),
    /// The first packet did not match the rung invariant; the chunk is handed
    /// back for requeueing together with a description of the differing fields.
    RequeuedOnMismatch { chunk: SegmentChunk, diff: String },
}
575
576fn encode_chunk_to_packets(
577 cfg: &EncoderWorkerConfig,
578 enc_config: &EncoderConfig,
579 chunk: SegmentChunk,
580 shared_frames_encoded: &std::sync::atomic::AtomicU64,
581 progress_tx: &mpsc::Sender<u64>,
582) -> Result<ChunkOutcome> {
583 let mut encoder =
584 encode::select_encoder(enc_config.clone(), None).context("creating encoder for chunk")?;
585 let segment_idx = chunk.segment_idx;
586 let mut packets: Vec<encode::EncodedPacket> = Vec::new();
587 let mut pending: Vec<encode::EncodedPacket> = Vec::new();
588 let mut decided = false;
589
590 for frame in &chunk.frames {
591 encoder.send_frame(frame).context("send_frame in chunk worker")?;
592 while let Some(packet) = encoder.receive_packet().context("receive_packet in chunk worker")? {
593 if !decided {
594 match validate_or_set_rung_invariant(
595 cfg.rung_idx,
596 cfg.gpu_vendor,
597 &cfg.rung_invariant,
598 &packet.data,
599 )? {
600 InvariantCheck::Matched | InvariantCheck::SetByThisWorker => decided = true,
601 InvariantCheck::Mismatched { diff } => {
602 return Ok(ChunkOutcome::RequeuedOnMismatch { chunk, diff });
603 }
604 }
605 pending.push(packet);
606 continue;
607 }
608 packets.append(&mut pending);
609 packets.push(packet);
610 }
611 let n = shared_frames_encoded.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1;
612 let _ = progress_tx.try_send(n);
613 }
614 if decided {
615 packets.append(&mut pending);
616 }
617 encoder.flush().context("flush in chunk worker")?;
618 while let Some(packet) = encoder
619 .receive_packet()
620 .context("receive_packet after flush in chunk worker")?
621 {
622 packets.push(packet);
623 }
624 Ok(ChunkOutcome::Encoded(ChunkPackets { segment_idx, packets }))
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture shared by the invariant tests: an 8-bit 4:2:0 1920x1080 header.
    fn baseline_invariant() -> RungCodecInvariant {
        RungCodecInvariant {
            seq_profile: 0,
            seq_level_idx_0: 8,
            seq_tier_0: 0,
            bit_depth: 8,
            monochrome: false,
            chroma_subsampling_x: true,
            chroma_subsampling_y: true,
            color_primaries: 1,
            transfer_characteristics: 1,
            matrix_coefficients: 1,
            color_range: false,
            max_frame_width_minus1: 1919,
            max_frame_height_minus1: 1079,
            still_picture: false,
        }
    }

    #[test]
    fn config_clone_preserves_fields() {
        let original = EncoderWorkerConfig {
            rung_idx: 2,
            width: 1280,
            height: 720,
            frame_rate: 30.0,
            quality: 32,
            speed_preset: u8::MAX,
            target: codec::encode::tuning::QualityTarget::Standard,
            tier: codec::encode::tuning::SpeedTier::Standard,
            threads: 4,
            gpu_index: Some(1),
            gpu_vendor: None,
            output_color_metadata: ColorMetadata::default(),
            output_pixel_format: PixelFormat::Yuv420p,
            constant_qp: false,
            timescale: 30000,
            per_frame_ticks: 1000,
            keyframe_interval: 60,
            segment_target_ticks: 60_000,
            output_dir: PathBuf::from("/tmp/x"),
            rung_invariant: Arc::new(RwLock::new(None)),
        };
        let cloned = original.clone();
        assert_eq!(cloned.rung_idx, 2);
        assert_eq!(cloned.keyframe_interval, 60);
    }

    #[test]
    fn invariant_matches_itself() {
        let invariant = baseline_invariant();
        assert_eq!(invariant.clone(), invariant);
        assert_eq!(invariant.describe_diff(&invariant), "");
    }

    #[test]
    fn invariant_diff_lists_changed_fields() {
        let rung = baseline_invariant();
        let worker = RungCodecInvariant {
            bit_depth: 10,
            color_primaries: 9,
            ..rung.clone()
        };
        let diff = rung.describe_diff(&worker);
        assert!(diff.contains("bit_depth"));
        assert!(diff.contains("color_primaries"));
        assert!(!diff.contains("seq_profile"));
    }

    #[test]
    fn validator_parse_error_returns_err_not_mismatch() {
        let slot: RwLock<Option<RungCodecInvariant>> = RwLock::new(None);
        // Eight zero bytes contain no parsable sequence header.
        let junk = [0u8; 8];
        let err =
            validate_or_set_rung_invariant(0, Some(codec::gpu::GpuVendor::Intel), &slot, &junk)
                .unwrap_err();
        assert!(
            err.to_string()
                .contains("could not parse AV1 sequence header")
        );
        // A parse failure must leave the shared slot untouched.
        assert!(slot.read().unwrap().is_none());
    }

    #[test]
    fn mismatched_diff_includes_changed_field() {
        let existing = baseline_invariant();
        let other = RungCodecInvariant {
            bit_depth: 10,
            ..existing.clone()
        };
        let diff = existing.describe_diff(&other);
        assert!(
            diff.contains("bit_depth"),
            "diff should mention bit_depth; got {diff}"
        );
    }
}