Skip to main content

rivet/
multigpu.rs

1//! Multi-GPU reactive variant phase — **the rung benefit**.
2//!
3//! Decode the source **once** and dynamically schedule every rung's CMAF
4//! segments across all available GPUs using a fair lease pool with mid-flight
5//! helper dispatch:
6//!
7//! ```text
8//!   decode pump (decode once)
9//!        │  fan out normalized frames
10//!        ▼
11//!   per-rung scaler ──► SegmentChunkQueue ──► encoder worker (holds a GpuLease)
12//!                                        ──► helper worker (claims a freed lease)
13//! ```
14//!
15//! - One encoder per GPU at a time ([`GpuPool`] enforces it — concurrent
16//!   NVENC sessions on one context deadlock).
17//! - A fast rung releases its lease early; the **helper dispatcher** grabs the
18//!   freed lease and attaches an extra worker to a still-busy rung, so a slow
19//!   rung finishes sooner. Segment work is the unit of parallelism.
20//! - Helpers may land on a different GPU **vendor** than the rung's first
21//!   worker; the per-rung AV1 **codec invariant** ([`RungCodecInvariant`])
22//!   guarantees every contributed segment shares the `av1C` contract, so a
23//!   cross-vendor (NVENC + QSV) rendition still decodes cleanly. A mismatched
24//!   helper requeues its chunk and exits — the run never aborts on it.
25//!
26//! Storage/transport specifics stay out of the engine: progress is reported
27//! through the generic [`ProgressSink`], so a consumer can layer an uploader
28//! (object storage, a status queue, …) on top by watching `RungStatus::Completed`.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat, VideoCodec};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48    ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49    run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity passed to every rung's [`SegmentChunkQueue`] (scaler → encoder hand-off).
const QUEUE_CAPACITY: usize = 2;
/// Bound of each per-rung frame channel between the decode pump and the scaler.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// Sleep between two scans of the helper dispatcher.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// Sleep between two rounds of `Running` progress reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
61/// One rung's finalized CMAF manifest.
62#[derive(Debug, Clone)]
63pub struct RungManifest {
64    pub rung_index: usize,
65    pub width: u32,
66    pub height: u32,
67    pub label: String,
68    /// Directory relative to the asset root, e.g. `"video/720p"`.
69    pub relative_dir: String,
70    pub manifest: CmafTrackManifest,
71}
72
73/// Inputs to [`run_multigpu_hls`].
74pub struct MultiGpuParams<'a> {
75    pub input: Bytes,
76    /// Output video codec — drives the per-worker encoder dispatch, the codec
77    /// invariant parse, and the stitch muxer's sample-entry choice.
78    pub codec: VideoCodec,
79    pub rungs: &'a [Rung],
80    pub header: DemuxHeader,
81    pub source_color_metadata: ColorMetadata,
82    pub source_pixel_format: PixelFormat,
83    /// Whether the decode pump tonemaps HDR→SDR (from the spec's `ColorPolicy`).
84    pub tonemap_to_sdr: bool,
85    /// Resolved **output** color metadata + pixel format the encoders target
86    /// (from `OutputSpec::resolve_output`).
87    pub output_color_metadata: ColorMetadata,
88    pub output_pixel_format: PixelFormat,
89    pub needs_downsample: bool,
90    /// Prepared per-frame video filter chain applied in the decode pump (before
91    /// scaling). Overlay images are loaded once at prepare time.
92    pub filters: Arc<codec::filter::FilterChain>,
93    pub frame_rate: f64,
94    pub gpu_pool: Arc<GpuPool>,
95    /// GPU indices the encode policy selected, in detection order. The decode
96    /// pump pins to these (round-robin for per-rung pumps) so decode honors the
97    /// same `Family` / `SingleGpu` / `AllGpus` constraint as encode. Empty ⇒
98    /// the decoder dispatch auto-selects (legacy behavior).
99    pub gpu_indices: Vec<u32>,
100    /// Explicit decode-pump GPU override. `Some(i)` forces every decode pump
101    /// onto GPU `i` regardless of `gpu_indices`; `None` follows the policy.
102    pub decode_gpu: Option<u32>,
103    pub output_root: PathBuf,
104    pub timescale: u32,
105    pub per_frame_ticks: u32,
106    pub keyframe_interval: u32,
107    pub segment_target_ticks: u64,
108    pub total_input_frames: u64,
109    /// Force constant-QP chunk encoding (single-file `ChunkSeamMode::ParallelConstQp`)
110    /// so stitched chunk seams are quality-flat. `false` for HLS (segments are
111    /// independent) and the default `Parallel` single-file mode.
112    pub constant_qp: bool,
113}
114
115impl MultiGpuParams<'_> {
116    /// Resolve the decode-pump GPU for the `i`-th per-rung pump (or the shared
117    /// pump when `i == 0`): the explicit `decode_gpu` override wins, else the
118    /// policy's GPU indices round-robin, else `None` (decoder auto-select).
119    fn decode_gpu_for(&self, i: usize) -> Option<u32> {
120        if self.decode_gpu.is_some() {
121            return self.decode_gpu;
122        }
123        if self.gpu_indices.is_empty() {
124            return None;
125        }
126        Some(self.gpu_indices[i % self.gpu_indices.len()])
127    }
128}
129
130/// Run the reactive multi-GPU variant phase. Returns one `Option<RungManifest>`
131/// per rung (in rung order); `None` means the rung produced no segments.
132pub async fn run_multigpu_hls(
133    params: MultiGpuParams<'_>,
134    sink: Arc<dyn ProgressSink>,
135) -> Result<Vec<Option<RungManifest>>> {
136    let rungs = params.rungs;
137    let n = rungs.len();
138    if n == 0 {
139        return Ok(Vec::new());
140    }
141    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
142    if total_segments == 0 {
143        bail!(
144            "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
145            params.total_input_frames,
146            params.keyframe_interval
147        );
148    }
149
150    // Pre-flight: verify this host can actually construct an encoder for the
151    // requested codec before spawning the orchestration. Fail fast with a clear
152    // error instead of dispatching workers that fail at encoder construction —
153    // and, on drivers that re-init a failed NVENC session badly (e.g. Ampere
154    // with no AV1-encode silicon), would otherwise hang an uncancellable task.
155    {
156        let probe = codec::encode::EncoderConfig {
157            width: rungs[0].width,
158            height: rungs[0].height,
159            frame_rate: params.frame_rate,
160            gpu_index: None,
161            codec: params.codec,
162            ..Default::default()
163        };
164        codec::encode::select_encoder(probe, None).map_err(|e| {
165            anyhow!(
166                "no {:?} encoder available on this host ({e}); need NVENC / AMF / QSV, or build \
167                 with the `ffmpeg` feature for a software encoder",
168                params.codec
169            )
170        })?;
171    }
172
173    tracing::info!(
174        rungs = n,
175        total_segments,
176        gpu_pool_capacity = params.gpu_pool.capacity(),
177        "multi-GPU variant phase starting"
178    );
179
180    // Per-rung shared state.
181    let queues: Vec<Arc<SegmentChunkQueue>> =
182        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
183    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
184    let scaler_active: Vec<Arc<AtomicBool>> =
185        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
186    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
187        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
188    let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
189        Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
190    let active_workers: Arc<Vec<AtomicUsize>> =
191        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
192    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
193    let finalized: Arc<Vec<AtomicBool>> =
194        Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
195
196    // Periodic progress reporter.
197    let progress_stop = Arc::new(AtomicBool::new(false));
198    let progress_handle = spawn_progress_reporter(
199        rungs.to_vec(),
200        frames_encoded.clone(),
201        finalized.clone(),
202        params.total_input_frames,
203        Arc::clone(&sink),
204        Arc::clone(&progress_stop),
205    );
206
207    // Finalizers: one per rung, merges contributions → RungManifest.
208    let total_input_frames = params.total_input_frames;
209    let (finalizer_tx, mut finalizer_rx) =
210        mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
211    let mut finalizer_handles = Vec::with_capacity(n);
212    for idx in 0..n {
213        let contributions_h = Arc::clone(&contributions);
214        let active_h = Arc::clone(&active_workers);
215        let rung_done_h = Arc::clone(&rung_done);
216        let finalized_h = Arc::clone(&finalized);
217        let tx = finalizer_tx.clone();
218        let rung = rungs[idx].clone();
219        let rel_dir = format!("video/{}", rung.label);
220        let output_root = params.output_root.clone();
221        let timescale = params.timescale;
222        let total_segments = total_segments;
223        let sink = Arc::clone(&sink);
224        finalizer_handles.push(tokio::spawn(async move {
225            // Wait for all of this rung's workers + the scaler to finish.
226            loop {
227                let notified = rung_done_h[idx].notified();
228                if active_h[idx].load(Ordering::Acquire) == 0 {
229                    break;
230                }
231                notified.await;
232            }
233            let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
234            if outputs.is_empty() {
235                finalized_h[idx].store(true, Ordering::Release);
236                let _ = tx.send((idx, Ok(None))).await;
237                return;
238            }
239            let init_path = output_root.join(&rel_dir).join("init.mp4");
240            let contribs: Vec<RungContribution> = outputs
241                .into_iter()
242                .map(|wo| RungContribution {
243                    width: rung.width,
244                    height: rung.height,
245                    relative_dir: rel_dir.clone(),
246                    manifest: CmafTrackManifest {
247                        init_path: init_path.clone(),
248                        segments: wo.segments,
249                        timescale,
250                    },
251                })
252                .collect();
253            let result = match merge_rung_contributions(contribs) {
254                Ok(merged) => {
255                    let got = merged.manifest.segments.len();
256                    if got != total_segments as usize {
257                        Err(anyhow!(
258                            "rung {} coverage incomplete: expected {} segments, got {}",
259                            rung.label,
260                            total_segments,
261                            got
262                        ))
263                    } else {
264                        let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
265                        report(
266                            sink.as_ref(),
267                            idx,
268                            &rung,
269                            RungStatus::Completed,
270                            total_input_frames,
271                            Some(total_input_frames),
272                            got as u32,
273                            bytes,
274                            None,
275                        );
276                        Ok(Some(RungManifest {
277                            rung_index: idx,
278                            width: rung.width,
279                            height: rung.height,
280                            label: rung.label.clone(),
281                            relative_dir: rel_dir.clone(),
282                            manifest: merged.manifest,
283                        }))
284                    }
285                }
286                Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
287            };
288            finalized_h[idx].store(true, Ordering::Release);
289            let _ = tx.send((idx, result)).await;
290        }));
291    }
292    drop(finalizer_tx);
293
294    // Smallest-first claim order for initial workers.
295    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
296    indexed.sort_by_key(|(_, r)| r.short_side());
297
298    // Decode pump(s) + fan-out channels.
299    let mut frame_senders = Vec::with_capacity(n);
300    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
301        Vec::with_capacity(n);
302    for _ in 0..n {
303        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
304        frame_senders.push(tx);
305        frame_receivers.push(Some(rx));
306    }
307
308    let use_shared_pump = n <= params.gpu_pool.capacity();
309    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
310    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
311        codec_name: params.header.codec.clone(),
312        info_for_decoder: params.header.info.clone(),
313        source_color_metadata: params.source_color_metadata,
314        source_pixel_format: params.source_pixel_format,
315        needs_downsample: params.needs_downsample,
316        tonemap_to_sdr: params.tonemap_to_sdr,
317        gpu_index,
318        filters: params.filters.clone(),
319    };
320    if use_shared_pump {
321        let cfg = make_pump_cfg(params.decode_gpu_for(0));
322        let senders = frame_senders;
323        let input = params.input.clone();
324        let rt = tokio::runtime::Handle::current();
325        pump_tasks.spawn(async move {
326            tokio::task::spawn_blocking(move || {
327                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
328            })
329            .await
330            .map_err(|e| anyhow!("shared pump join error: {e}"))
331            .and_then(|r| r)
332        });
333    } else {
334        for (idx, sender) in frame_senders.into_iter().enumerate() {
335            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
336            let input = params.input.clone();
337            let rt = tokio::runtime::Handle::current();
338            pump_tasks.spawn(async move {
339                tokio::task::spawn_blocking(move || {
340                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
341                })
342                .await
343                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
344                .and_then(|r| r)
345            });
346        }
347    }
348
349    // Per-rung scalers.
350    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
351    for (idx, rung) in rungs.iter().cloned().enumerate() {
352        let rx = frame_receivers[idx].take().expect("scaler rx slot");
353        let cfg = crate::rung_scaler::RungScalerConfig {
354            rung_idx: idx,
355            target_width: rung.width,
356            target_height: rung.height,
357            frames_per_chunk: params.keyframe_interval,
358        };
359        let queue = Arc::clone(&queues[idx]);
360        let rt = tokio::runtime::Handle::current();
361        let scaler_flag = Arc::clone(&scaler_active[idx]);
362        let active_h = Arc::clone(&active_workers);
363        let rung_done_h = Arc::clone(&rung_done);
364        scaler_flag.store(true, Ordering::Release);
365        active_h[idx].fetch_add(1, Ordering::AcqRel);
366        scaler_tasks.spawn(async move {
367            let result = tokio::task::spawn_blocking(move || {
368                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
369            })
370            .await
371            .map_err(|e| anyhow!("scaler join error: {e}"))
372            .and_then(|r| r);
373            scaler_flag.store(false, Ordering::Release);
374            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
375            if prev == 1 {
376                rung_done_h[idx].notify_one();
377            }
378            (idx, result)
379        });
380    }
381
382    // Initial encoder workers (one per rung, smallest first).
383    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
384    let ctx = WorkerCtx {
385        codec: params.codec,
386        frame_rate: params.frame_rate,
387        output_color_metadata: params.output_color_metadata,
388        output_pixel_format: params.output_pixel_format,
389        timescale: params.timescale,
390        per_frame_ticks: params.per_frame_ticks,
391        keyframe_interval: params.keyframe_interval,
392        segment_target_ticks: params.segment_target_ticks,
393        output_root: params.output_root.clone(),
394        constant_qp: params.constant_qp,
395    };
396    for (idx, rung) in indexed.iter().cloned() {
397        let lease = match Arc::clone(&params.gpu_pool).claim().await {
398            Some(l) => l,
399            None => {
400                progress_stop.store(true, Ordering::Release);
401                let _ = progress_handle.await;
402                bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
403            }
404        };
405        spawn_encoder_worker(
406            &ctx,
407            idx,
408            &rung,
409            Arc::clone(&queues[idx]),
410            Arc::clone(&frames_encoded[idx]),
411            lease,
412            Arc::clone(&contributions),
413            Arc::clone(&active_workers),
414            Arc::clone(&rung_done),
415            Arc::clone(&rung_invariants[idx]),
416            Some(&mut worker_tasks),
417        );
418    }
419
420    // Helper dispatcher.
421    let helper_cancel = Arc::new(AtomicBool::new(false));
422    let helper_handle = {
423        let cancel = Arc::clone(&helper_cancel);
424        let pool = Arc::clone(&params.gpu_pool);
425        let queues = queues.clone();
426        let scaler_active = scaler_active.clone();
427        let frames_encoded = frames_encoded.clone();
428        let contributions = Arc::clone(&contributions);
429        let active_workers = Arc::clone(&active_workers);
430        let rung_done = Arc::clone(&rung_done);
431        let rung_invariants = rung_invariants.clone();
432        let rungs_owned: Vec<Rung> = rungs.to_vec();
433        let ctx = ctx.clone();
434        tokio::spawn(async move {
435            loop {
436                if cancel.load(Ordering::Acquire) {
437                    break;
438                }
439                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
440                if pool.pending_claimers() > 0 {
441                    continue;
442                }
443                let mut target = None;
444                for (idx, q) in queues.iter().enumerate() {
445                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
446                    let has_pending = q.pushed_segments() > q.popped_segments();
447                    if scaler_alive || has_pending {
448                        target = Some(idx);
449                        break;
450                    }
451                }
452                let Some(rung_idx) = target else { break };
453                let lease = match pool.try_claim() {
454                    Some(l) => l,
455                    None => continue,
456                };
457                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
458                spawn_encoder_worker(
459                    &ctx,
460                    rung_idx,
461                    &rungs_owned[rung_idx],
462                    Arc::clone(&queues[rung_idx]),
463                    Arc::clone(&frames_encoded[rung_idx]),
464                    lease,
465                    Arc::clone(&contributions),
466                    Arc::clone(&active_workers),
467                    Arc::clone(&rung_done),
468                    Arc::clone(&rung_invariants[rung_idx]),
469                    None,
470                );
471            }
472        })
473    };
474
475    // Drain everything.
476    let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
477    let mut pumps_remaining = pump_tasks.len();
478    let mut scalers_remaining = n;
479    let mut workers_remaining = n;
480    let mut finalizers_remaining = n;
481
482    macro_rules! teardown_err {
483        ($e:expr) => {{
484            helper_cancel.store(true, Ordering::Release);
485            let _ = helper_handle.await;
486            progress_stop.store(true, Ordering::Release);
487            let _ = progress_handle.await;
488            return Err($e);
489        }};
490    }
491
492    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
493        tokio::select! {
494            biased;
495            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
496                Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
497                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
498                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
499                None => pumps_remaining = 0,
500            },
501            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
502                Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
503                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
504                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
505                None => scalers_remaining = 0,
506            },
507            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
508                Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
509                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
510                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
511                None => workers_remaining = 0,
512            },
513            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
514                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
515                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
516                None => finalizers_remaining = 0,
517            },
518        }
519    }
520
521    helper_cancel.store(true, Ordering::Release);
522    let _ = helper_handle.await;
523    progress_stop.store(true, Ordering::Release);
524    let _ = progress_handle.await;
525    for h in finalizer_handles {
526        let _ = h.await;
527    }
528
529    Ok(completed)
530}
531
532/// Per-job constants shared by every encoder worker.
533#[derive(Clone)]
534struct WorkerCtx {
535    codec: VideoCodec,
536    frame_rate: f64,
537    output_color_metadata: ColorMetadata,
538    output_pixel_format: PixelFormat,
539    timescale: u32,
540    per_frame_ticks: u32,
541    keyframe_interval: u32,
542    segment_target_ticks: u64,
543    output_root: PathBuf,
544    constant_qp: bool,
545}
546
547#[allow(clippy::too_many_arguments)]
548fn spawn_encoder_worker(
549    ctx: &WorkerCtx,
550    rung_idx: usize,
551    rung: &Rung,
552    queue: Arc<SegmentChunkQueue>,
553    frames_encoded: Arc<AtomicU64>,
554    lease: GpuLease,
555    contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
556    active_workers: Arc<Vec<AtomicUsize>>,
557    rung_done: Arc<Vec<Notify>>,
558    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
559    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
560) {
561    let rel_dir = format!("video/{}", rung.label);
562    let output_dir = ctx.output_root.join(&rel_dir);
563    let gpu_index = lease.gpu_index;
564    let gpu_vendor = lease.vendor;
565
566    let cfg = EncoderWorkerConfig {
567        rung_idx,
568        codec: ctx.codec,
569        width: rung.width,
570        height: rung.height,
571        frame_rate: ctx.frame_rate,
572        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
573        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
574        target: rung.quality.target,
575        tier: rung.quality.tier,
576        threads: 0,
577        gpu_index: Some(gpu_index),
578        gpu_vendor: Some(gpu_vendor),
579        output_color_metadata: ctx.output_color_metadata,
580        output_pixel_format: ctx.output_pixel_format,
581        constant_qp: ctx.constant_qp,
582        timescale: ctx.timescale,
583        per_frame_ticks: ctx.per_frame_ticks,
584        keyframe_interval: ctx.keyframe_interval,
585        segment_target_ticks: ctx.segment_target_ticks,
586        output_dir,
587        rung_invariant,
588    };
589
590    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
591    let body = async move {
592        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
593        let cfg_for_worker = cfg.clone();
594        let queue_for_worker = Arc::clone(&queue);
595        let rt = tokio::runtime::Handle::current();
596        let counter = Arc::clone(&frames_encoded);
597        let blocking = tokio::task::spawn_blocking(move || {
598            run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
599        });
600        // Drain the per-frame progress channel (the shared AtomicU64 counter is
601        // the source of truth the reporter task reads; here we just keep the
602        // channel from backpressuring the worker).
603        let drain = async move { while progress_rx.recv().await.is_some() {} };
604        let (_, br) = tokio::join!(drain, blocking);
605
606        let task_status: Result<()> = match br {
607            Ok(Ok(out)) => {
608                contributions[rung_idx].lock().unwrap().push(out);
609                Ok(())
610            }
611            Ok(Err(e)) => Err(e),
612            Err(e) => Err(anyhow!("worker join error: {e}")),
613        };
614        drop(lease);
615        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
616        if prev == 1 {
617            rung_done[rung_idx].notify_one();
618        }
619        (rung_idx, task_status)
620    };
621
622    match worker_tasks {
623        Some(set) => {
624            set.spawn(body);
625        }
626        None => {
627            tokio::spawn(async move {
628                let _ = body.await;
629            });
630        }
631    }
632}
633
634/// Periodic per-rung progress reporter. Reads the shared frame counters and
635/// emits `Running` updates until stopped; skips rungs already finalized.
636fn spawn_progress_reporter(
637    rungs: Vec<Rung>,
638    frames_encoded: Vec<Arc<AtomicU64>>,
639    finalized: Arc<Vec<AtomicBool>>,
640    total_input_frames: u64,
641    sink: Arc<dyn ProgressSink>,
642    stop: Arc<AtomicBool>,
643) -> tokio::task::JoinHandle<()> {
644    tokio::spawn(async move {
645        loop {
646            if stop.load(Ordering::Acquire) {
647                break;
648            }
649            tokio::time::sleep(PROGRESS_TICK).await;
650            for (idx, rung) in rungs.iter().enumerate() {
651                if finalized[idx].load(Ordering::Acquire) {
652                    continue;
653                }
654                let done = frames_encoded[idx].load(Ordering::Relaxed);
655                report(
656                    sink.as_ref(),
657                    idx,
658                    rung,
659                    RungStatus::Running,
660                    done,
661                    Some(total_input_frames),
662                    0,
663                    0,
664                    None,
665                );
666            }
667        }
668    })
669}
670
671#[allow(clippy::too_many_arguments)]
672fn report(
673    sink: &dyn ProgressSink,
674    rung_index: usize,
675    rung: &Rung,
676    status: RungStatus,
677    frames_done: u64,
678    frames_total: Option<u64>,
679    segments: u32,
680    bytes_out: u64,
681    message: Option<String>,
682) {
683    let percent = match status {
684        RungStatus::Completed => 100.0,
685        RungStatus::Pending => 0.0,
686        _ => match frames_total {
687            Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
688            _ => 1.0,
689        },
690    };
691    sink.on_rung(RungProgress {
692        rung_index,
693        label: rung.label.clone(),
694        width: rung.width,
695        height: rung.height,
696        status,
697        percent,
698        frames_done,
699        frames_total,
700        segments_written: segments,
701        bytes_out,
702        message,
703    });
704}
705
706/// Build a [`GpuPool`] from the host's detected GPU inventory.
707pub fn detect_gpu_pool() -> Arc<GpuPool> {
708    Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
709}
710
711fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
712    match fam {
713        GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
714        GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
715        GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
716    }
717}
718
719/// The host GPUs selected by an [`EncodePolicy`]: all of them for `AllGpus`,
720/// the first / pinned index for `SingleGpu`, every device of one vendor for
721/// `Family`.
722fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
723    let gpus = codec::gpu::detect_gpus();
724    match policy {
725        EncodePolicy::AllGpus => gpus,
726        EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
727        EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
728        EncodePolicy::Family(fam) => {
729            let v = policy_vendor(fam);
730            gpus.into_iter().filter(|g| g.vendor == v).collect()
731        }
732    }
733}
734
735/// Build a [`GpuPool`] constrained to the given [`EncodePolicy`]. An empty pool
736/// (e.g. a pinned index or vendor family that isn't present) yields capacity 0,
737/// so the orchestrator's pre-flight probe / lease claim surfaces a clear error.
738///
739/// When more than one GPU is selected, cards that can't actually encode the
740/// REQUESTED `codec` (e.g. a pre-Ada NVIDIA that decodes via NVDEC but has no
741/// AV1 encode silicon — yet can still encode H.264/H.265) are dropped from the
742/// **encode** pool, so a worker never leases an incapable card and hard-fails
743/// the run; the capable cards do the encoding. A single selected GPU is left
744/// as-is, since the serial path's non-pinned encoder dispatch already falls
745/// through vendors. Dropped cards stay available for the decode pump
746/// ([`policy_gpu_indices`] is intentionally NOT filtered).
747pub fn gpu_pool_for_policy(policy: EncodePolicy, codec: codec::frame::VideoCodec) -> Arc<GpuPool> {
748    let selected = select_gpus_for_policy(policy);
749    let pool_gpus = if selected.len() > 1 {
750        selected.into_iter().filter(|g| codec::encode::encode_capable(g, codec)).collect()
751    } else {
752        selected
753    };
754    Arc::new(GpuPool::new(&pool_gpus))
755}
756
757/// The GPU indices an [`EncodePolicy`] selects, in detection order. Used to pin
758/// the decode pump to a device consistent with the policy (so decode honors a
759/// `Family` / `SingleGpu` constraint, not just encode).
760pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
761    select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
762}
763
764/// The GPU index to pin a *serial* (single-GPU) encode/decode to under a
765/// policy: `None` (auto/first-available) for `AllGpus`, the pinned index for
766/// `SingleGpu`, the first device of the vendor for `Family`.
767pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
768    match policy {
769        EncodePolicy::AllGpus => None,
770        EncodePolicy::SingleGpu(idx) => idx,
771        EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
772    }
773}
774
775// ---------------------------------------------------------------------------
776// Single-file multi-GPU: chunk each rung across GPUs, stitch packets into MP4.
777// ---------------------------------------------------------------------------
778
/// One rung's full ordered AV1 packet stream, stitched from chunks encoded
/// across GPUs. The caller muxes these into a single MP4 (+ audio).
#[derive(Debug)]
pub struct RungPackets {
    /// Position of this rung in the rung list handed to
    /// [`run_multigpu_single_file`].
    pub rung_index: usize,
    /// Codec the run was configured with (copied from the run parameters).
    pub codec: VideoCodec,
    /// Rung width, copied from the rung definition.
    pub width: u32,
    /// Rung height, copied from the rung definition.
    pub height: u32,
    /// Rung label, copied from the rung definition.
    pub label: String,
    /// Packets of every chunk, concatenated in ascending segment order.
    pub packets: Vec<EncodedPacket>,
}
790
791/// Single-file counterpart to [`run_multigpu_hls`]: decode once, fan to per-rung
792/// scalers, and dynamically schedule each rung's GOP-sized chunks across all
793/// GPUs (fair lease pool + mid-flight helper dispatch + cross-vendor codec
794/// invariant). Each worker encodes its chunk to packets (a fresh encoder per
795/// chunk → first frame is an IDR); the finalizer concatenates them in segment
796/// order into one ordered packet stream per rung — no disk round-trip.
797pub async fn run_multigpu_single_file(
798    params: MultiGpuParams<'_>,
799    sink: Arc<dyn ProgressSink>,
800) -> Result<Vec<Option<RungPackets>>> {
801    let rungs = params.rungs;
802    let n = rungs.len();
803    if n == 0 {
804        return Ok(Vec::new());
805    }
806    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
807    if total_segments == 0 {
808        bail!(
809            "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
810            params.total_input_frames,
811            params.keyframe_interval
812        );
813    }
814
815    // Pre-flight encoder probe (same fail-fast as the HLS path).
816    {
817        let probe = codec::encode::EncoderConfig {
818            width: rungs[0].width,
819            height: rungs[0].height,
820            frame_rate: params.frame_rate,
821            gpu_index: None,
822            codec: params.codec,
823            ..Default::default()
824        };
825        codec::encode::select_encoder(probe, None).map_err(|e| {
826            anyhow!(
827                "no {:?} encoder available on this host ({e}); need NVENC / AMF / QSV, or build \
828                 with the `ffmpeg` feature",
829                params.codec
830            )
831        })?;
832    }
833
834    tracing::info!(
835        rungs = n,
836        total_segments,
837        gpu_pool_capacity = params.gpu_pool.capacity(),
838        "multi-GPU single-file phase starting"
839    );
840
841    let queues: Vec<Arc<SegmentChunkQueue>> =
842        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
843    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
844    let scaler_active: Vec<Arc<AtomicBool>> =
845        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
846    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
847        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
848    // Per-rung packet collectors (each its own Arc so chunk workers can push).
849    let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
850        (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
851    let active_workers: Arc<Vec<AtomicUsize>> =
852        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
853    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
854    let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
855
856    let progress_stop = Arc::new(AtomicBool::new(false));
857    let progress_handle = spawn_progress_reporter(
858        rungs.to_vec(),
859        frames_encoded.clone(),
860        finalized.clone(),
861        params.total_input_frames,
862        Arc::clone(&sink),
863        Arc::clone(&progress_stop),
864    );
865
866    // Finalizers: stitch each rung's chunks (sorted, deduped) into one stream.
867    let total_input_frames = params.total_input_frames;
868    let codec = params.codec; // Copy; captured by each finalizer closure
869    let (finalizer_tx, mut finalizer_rx) =
870        mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
871    let mut finalizer_handles = Vec::with_capacity(n);
872    for idx in 0..n {
873        let collector = Arc::clone(&contributions[idx]);
874        let active_h = Arc::clone(&active_workers);
875        let rung_done_h = Arc::clone(&rung_done);
876        let finalized_h = Arc::clone(&finalized);
877        let tx = finalizer_tx.clone();
878        let rung = rungs[idx].clone();
879        let total_segments = total_segments;
880        let sink = Arc::clone(&sink);
881        finalizer_handles.push(tokio::spawn(async move {
882            loop {
883                let notified = rung_done_h[idx].notified();
884                if active_h[idx].load(Ordering::Acquire) == 0 {
885                    break;
886                }
887                notified.await;
888            }
889            let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
890            if chunks.is_empty() {
891                finalized_h[idx].store(true, Ordering::Release);
892                let _ = tx.send((idx, Ok(None))).await;
893                return;
894            }
895            chunks.sort_by_key(|c| c.segment_idx);
896            chunks.dedup_by_key(|c| c.segment_idx);
897            // Coverage: contiguous 0..total_segments.
898            let got = chunks.len();
899            let contiguous = chunks
900                .iter()
901                .enumerate()
902                .all(|(i, c)| c.segment_idx == i);
903            let result = if got != total_segments as usize || !contiguous {
904                Err(anyhow!(
905                    "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
906                    rung.label,
907                    total_segments,
908                    got
909                ))
910            } else {
911                let mut packets: Vec<EncodedPacket> = Vec::new();
912                for c in chunks {
913                    packets.extend(c.packets);
914                }
915                let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
916                report(
917                    sink.as_ref(),
918                    idx,
919                    &rung,
920                    RungStatus::Completed,
921                    total_input_frames,
922                    Some(total_input_frames),
923                    got as u32,
924                    bytes,
925                    None,
926                );
927                Ok(Some(RungPackets {
928                    rung_index: idx,
929                    codec,
930                    width: rung.width,
931                    height: rung.height,
932                    label: rung.label.clone(),
933                    packets,
934                }))
935            };
936            finalized_h[idx].store(true, Ordering::Release);
937            let _ = tx.send((idx, result)).await;
938        }));
939    }
940    drop(finalizer_tx);
941
942    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
943    indexed.sort_by_key(|(_, r)| r.short_side());
944
945    // Decode pump(s) + fan-out.
946    let mut frame_senders = Vec::with_capacity(n);
947    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
948        Vec::with_capacity(n);
949    for _ in 0..n {
950        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
951        frame_senders.push(tx);
952        frame_receivers.push(Some(rx));
953    }
954    let use_shared_pump = n <= params.gpu_pool.capacity();
955    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
956    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
957        codec_name: params.header.codec.clone(),
958        info_for_decoder: params.header.info.clone(),
959        source_color_metadata: params.source_color_metadata,
960        source_pixel_format: params.source_pixel_format,
961        needs_downsample: params.needs_downsample,
962        tonemap_to_sdr: params.tonemap_to_sdr,
963        gpu_index,
964        filters: params.filters.clone(),
965    };
966    if use_shared_pump {
967        let cfg = make_pump_cfg(params.decode_gpu_for(0));
968        let senders = frame_senders;
969        let input = params.input.clone();
970        let rt = tokio::runtime::Handle::current();
971        pump_tasks.spawn(async move {
972            tokio::task::spawn_blocking(move || {
973                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
974            })
975            .await
976            .map_err(|e| anyhow!("shared pump join error: {e}"))
977            .and_then(|r| r)
978        });
979    } else {
980        for (idx, sender) in frame_senders.into_iter().enumerate() {
981            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
982            let input = params.input.clone();
983            let rt = tokio::runtime::Handle::current();
984            pump_tasks.spawn(async move {
985                tokio::task::spawn_blocking(move || {
986                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
987                })
988                .await
989                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
990                .and_then(|r| r)
991            });
992        }
993    }
994
995    // Per-rung scalers.
996    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
997    for (idx, rung) in rungs.iter().cloned().enumerate() {
998        let rx = frame_receivers[idx].take().expect("scaler rx slot");
999        let cfg = crate::rung_scaler::RungScalerConfig {
1000            rung_idx: idx,
1001            target_width: rung.width,
1002            target_height: rung.height,
1003            frames_per_chunk: params.keyframe_interval,
1004        };
1005        let queue = Arc::clone(&queues[idx]);
1006        let rt = tokio::runtime::Handle::current();
1007        let scaler_flag = Arc::clone(&scaler_active[idx]);
1008        let active_h = Arc::clone(&active_workers);
1009        let rung_done_h = Arc::clone(&rung_done);
1010        scaler_flag.store(true, Ordering::Release);
1011        active_h[idx].fetch_add(1, Ordering::AcqRel);
1012        scaler_tasks.spawn(async move {
1013            let result = tokio::task::spawn_blocking(move || {
1014                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
1015            })
1016            .await
1017            .map_err(|e| anyhow!("scaler join error: {e}"))
1018            .and_then(|r| r);
1019            scaler_flag.store(false, Ordering::Release);
1020            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
1021            if prev == 1 {
1022                rung_done_h[idx].notify_one();
1023            }
1024            (idx, result)
1025        });
1026    }
1027
1028    // Initial chunk workers.
1029    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
1030    let ctx = WorkerCtx {
1031        codec: params.codec,
1032        frame_rate: params.frame_rate,
1033        output_color_metadata: params.output_color_metadata,
1034        output_pixel_format: params.output_pixel_format,
1035        timescale: params.timescale,
1036        per_frame_ticks: params.per_frame_ticks,
1037        keyframe_interval: params.keyframe_interval,
1038        segment_target_ticks: params.segment_target_ticks,
1039        output_root: params.output_root.clone(),
1040        constant_qp: params.constant_qp,
1041    };
1042    for (idx, rung) in indexed.iter().cloned() {
1043        let lease = match Arc::clone(&params.gpu_pool).claim().await {
1044            Some(l) => l,
1045            None => {
1046                progress_stop.store(true, Ordering::Release);
1047                let _ = progress_handle.await;
1048                bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1049            }
1050        };
1051        spawn_chunk_worker(
1052            &ctx,
1053            idx,
1054            &rung,
1055            Arc::clone(&queues[idx]),
1056            Arc::clone(&frames_encoded[idx]),
1057            lease,
1058            Arc::clone(&contributions[idx]),
1059            Arc::clone(&active_workers),
1060            Arc::clone(&rung_done),
1061            Arc::clone(&rung_invariants[idx]),
1062            Some(&mut worker_tasks),
1063        );
1064    }
1065
1066    // Helper dispatcher.
1067    let helper_cancel = Arc::new(AtomicBool::new(false));
1068    let helper_handle = {
1069        let cancel = Arc::clone(&helper_cancel);
1070        let pool = Arc::clone(&params.gpu_pool);
1071        let queues = queues.clone();
1072        let scaler_active = scaler_active.clone();
1073        let frames_encoded = frames_encoded.clone();
1074        let contributions = contributions.clone();
1075        let active_workers = Arc::clone(&active_workers);
1076        let rung_done = Arc::clone(&rung_done);
1077        let rung_invariants = rung_invariants.clone();
1078        let rungs_owned: Vec<Rung> = rungs.to_vec();
1079        let ctx = ctx.clone();
1080        tokio::spawn(async move {
1081            loop {
1082                if cancel.load(Ordering::Acquire) {
1083                    break;
1084                }
1085                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1086                if pool.pending_claimers() > 0 {
1087                    continue;
1088                }
1089                let mut target = None;
1090                for (idx, q) in queues.iter().enumerate() {
1091                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1092                    let has_pending = q.pushed_segments() > q.popped_segments();
1093                    if scaler_alive || has_pending {
1094                        target = Some(idx);
1095                        break;
1096                    }
1097                }
1098                let Some(rung_idx) = target else { break };
1099                let lease = match pool.try_claim() {
1100                    Some(l) => l,
1101                    None => continue,
1102                };
1103                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1104                spawn_chunk_worker(
1105                    &ctx,
1106                    rung_idx,
1107                    &rungs_owned[rung_idx],
1108                    Arc::clone(&queues[rung_idx]),
1109                    Arc::clone(&frames_encoded[rung_idx]),
1110                    lease,
1111                    Arc::clone(&contributions[rung_idx]),
1112                    Arc::clone(&active_workers),
1113                    Arc::clone(&rung_done),
1114                    Arc::clone(&rung_invariants[rung_idx]),
1115                    None,
1116                );
1117            }
1118        })
1119    };
1120
1121    // Drain.
1122    let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1123    let mut pumps_remaining = pump_tasks.len();
1124    let mut scalers_remaining = n;
1125    let mut workers_remaining = n;
1126    let mut finalizers_remaining = n;
1127    macro_rules! teardown_err {
1128        ($e:expr) => {{
1129            helper_cancel.store(true, Ordering::Release);
1130            let _ = helper_handle.await;
1131            progress_stop.store(true, Ordering::Release);
1132            let _ = progress_handle.await;
1133            return Err($e);
1134        }};
1135    }
1136    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1137        tokio::select! {
1138            biased;
1139            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1140                Some(Ok(Ok(_))) => pumps_remaining -= 1,
1141                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1142                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1143                None => pumps_remaining = 0,
1144            },
1145            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1146                Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1147                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1148                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1149                None => scalers_remaining = 0,
1150            },
1151            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1152                Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1153                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1154                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1155                None => workers_remaining = 0,
1156            },
1157            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1158                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1159                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1160                None => finalizers_remaining = 0,
1161            },
1162        }
1163    }
1164    helper_cancel.store(true, Ordering::Release);
1165    let _ = helper_handle.await;
1166    progress_stop.store(true, Ordering::Release);
1167    let _ = progress_handle.await;
1168    for h in finalizer_handles {
1169        let _ = h.await;
1170    }
1171    Ok(completed)
1172}
1173
1174#[allow(clippy::too_many_arguments)]
1175fn spawn_chunk_worker(
1176    ctx: &WorkerCtx,
1177    rung_idx: usize,
1178    rung: &Rung,
1179    queue: Arc<SegmentChunkQueue>,
1180    frames_encoded: Arc<AtomicU64>,
1181    lease: GpuLease,
1182    collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1183    active_workers: Arc<Vec<AtomicUsize>>,
1184    rung_done: Arc<Vec<Notify>>,
1185    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1186    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1187) {
1188    let gpu_index = lease.gpu_index;
1189    let gpu_vendor = lease.vendor;
1190    let cfg = EncoderWorkerConfig {
1191        rung_idx,
1192        codec: ctx.codec,
1193        width: rung.width,
1194        height: rung.height,
1195        frame_rate: ctx.frame_rate,
1196        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1197        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1198        target: rung.quality.target,
1199        tier: rung.quality.tier,
1200        threads: 0,
1201        gpu_index: Some(gpu_index),
1202        gpu_vendor: Some(gpu_vendor),
1203        output_color_metadata: ctx.output_color_metadata,
1204        output_pixel_format: ctx.output_pixel_format,
1205        constant_qp: ctx.constant_qp,
1206        timescale: ctx.timescale,
1207        per_frame_ticks: ctx.per_frame_ticks,
1208        keyframe_interval: ctx.keyframe_interval,
1209        segment_target_ticks: ctx.segment_target_ticks,
1210        output_dir: ctx.output_root.clone(),
1211        rung_invariant,
1212    };
1213    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1214    let body = async move {
1215        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1216        let cfg_for_worker = cfg.clone();
1217        let queue_for_worker = Arc::clone(&queue);
1218        let rt = tokio::runtime::Handle::current();
1219        let counter = Arc::clone(&frames_encoded);
1220        let out = Arc::clone(&collector);
1221        let blocking = tokio::task::spawn_blocking(move || {
1222            run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1223        });
1224        let drain = async move { while progress_rx.recv().await.is_some() {} };
1225        let (_, br) = tokio::join!(drain, blocking);
1226        let task_status: Result<()> = match br {
1227            Ok(Ok(())) => Ok(()),
1228            Ok(Err(e)) => Err(e),
1229            Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1230        };
1231        drop(lease);
1232        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1233        if prev == 1 {
1234            rung_done[rung_idx].notify_one();
1235        }
1236        (rung_idx, task_status)
1237    };
1238    match worker_tasks {
1239        Some(set) => {
1240            set.spawn(body);
1241        }
1242        None => {
1243            tokio::spawn(async move {
1244                let _ = body.await;
1245            });
1246        }
1247    }
1248}