Skip to main content

rivet/
multigpu.rs

1//! Multi-GPU reactive variant phase — **the rung benefit**.
2//!
3//! Decode the source **once** and dynamically schedule every rung's CMAF
4//! segments across all available GPUs using a fair lease pool with mid-flight
5//! helper dispatch:
6//!
7//! ```text
8//!   decode pump (decode once)
9//!        │  fan out normalized frames
10//!        ▼
11//!   per-rung scaler ──► SegmentChunkQueue ──► encoder worker (holds a GpuLease)
12//!                                        ──► helper worker (claims a freed lease)
13//! ```
14//!
15//! - One encoder per GPU at a time ([`GpuPool`] enforces it — concurrent
16//!   NVENC sessions on one context deadlock).
17//! - A fast rung releases its lease early; the **helper dispatcher** grabs the
18//!   freed lease and attaches an extra worker to a still-busy rung, so a slow
19//!   rung finishes sooner. Segment work is the unit of parallelism.
20//! - Helpers may land on a different GPU **vendor** than the rung's first
21//!   worker; the per-rung AV1 **codec invariant** ([`RungCodecInvariant`])
22//!   guarantees every contributed segment shares the `av1C` contract, so a
23//!   cross-vendor (NVENC + QSV) rendition still decodes cleanly. A mismatched
24//!   helper requeues its chunk and exits — the run never aborts on it.
25//!
26//! Storage/transport specifics stay out of the engine: progress is reported
27//! through the generic [`ProgressSink`], so a consumer can layer an uploader
28//! (object storage, a status queue, …) on top by watching `RungStatus::Completed`.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48    ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49    run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity handed to each rung's [`SegmentChunkQueue`] (one queue per rung),
/// i.e. how many scaled chunks may sit between a rung's scaler and its
/// encoder workers.
const QUEUE_CAPACITY: usize = 2;
/// Bound of each per-rung frame channel between the decode pump and that
/// rung's scaler.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// How long the helper dispatcher sleeps between scans for a rung that could
/// use an extra worker.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// How long the progress reporter sleeps between rounds of `Running` updates.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
/// One rung's finalized CMAF manifest.
///
/// Built by the rung's finalizer once every worker contribution for the rung
/// has been merged and the merged segment count matches the expected total.
#[derive(Debug, Clone)]
pub struct RungManifest {
    /// Position of the rung in the `rungs` slice the job was started with.
    pub rung_index: usize,
    /// Rung width, copied from the rung definition.
    pub width: u32,
    /// Rung height, copied from the rung definition.
    pub height: u32,
    /// Rung label, copied from the rung definition; also the last component
    /// of `relative_dir`.
    pub label: String,
    /// Directory relative to the asset root, e.g. `"video/720p"`.
    pub relative_dir: String,
    /// Merged track manifest (init path, segments, timescale) for this rung.
    pub manifest: CmafTrackManifest,
}
72
/// Inputs to [`run_multigpu_hls`].
pub struct MultiGpuParams<'a> {
    /// Source bytes handed (by cheap clone) to every decode pump.
    pub input: Bytes,
    /// Rungs to encode. Results are returned in this order.
    pub rungs: &'a [Rung],
    /// Demux header; its `codec` and `info` configure the decode pump's decoder.
    pub header: DemuxHeader,
    /// Color metadata of the source, forwarded to the decode pump.
    pub source_color_metadata: ColorMetadata,
    /// Pixel format of the source, forwarded to the decode pump.
    pub source_pixel_format: PixelFormat,
    /// Whether the decode pump tonemaps HDR→SDR (from the spec's `ColorPolicy`).
    pub tonemap_to_sdr: bool,
    /// Resolved **output** color metadata + pixel format the encoders target
    /// (from `OutputSpec::resolve_output`).
    pub output_color_metadata: ColorMetadata,
    pub output_pixel_format: PixelFormat,
    /// Forwarded verbatim to the decode pump configuration.
    // NOTE(review): presumably requests a bit-depth/chroma downsample in the
    // pump — confirm against `DecodePumpConfig`.
    pub needs_downsample: bool,
    /// Prepared per-frame video filter chain applied in the decode pump (before
    /// scaling). Overlay images are loaded once at prepare time.
    pub filters: Arc<codec::filter::FilterChain>,
    /// Frame rate used for the pre-flight encoder probe and every encoder worker.
    pub frame_rate: f64,
    /// Lease pool the encoder workers and helpers claim GPUs from. A single
    /// shared decode pump is used when the rung count fits within its capacity;
    /// otherwise one pump is started per rung.
    pub gpu_pool: Arc<GpuPool>,
    /// GPU indices the encode policy selected, in detection order. The decode
    /// pump pins to these (round-robin for per-rung pumps) so decode honors the
    /// same `Family` / `SingleGpu` / `AllGpus` constraint as encode. Empty ⇒
    /// the decoder dispatch auto-selects (legacy behavior).
    pub gpu_indices: Vec<u32>,
    /// Explicit decode-pump GPU override. `Some(i)` forces every decode pump
    /// onto GPU `i` regardless of `gpu_indices`; `None` follows the policy.
    pub decode_gpu: Option<u32>,
    /// Asset root. Each rung writes under `output_root/video/<rung label>`.
    pub output_root: PathBuf,
    /// Timescale recorded in each rung's manifest and passed to the workers.
    pub timescale: u32,
    /// Forwarded to every encoder worker.
    // NOTE(review): presumably the duration of one frame in `timescale`
    // ticks — confirm against `EncoderWorkerConfig`.
    pub per_frame_ticks: u32,
    /// Frames per chunk produced by each rung scaler; together with
    /// `total_input_frames` it determines the expected segment count.
    pub keyframe_interval: u32,
    /// Forwarded to every encoder worker.
    // NOTE(review): presumably the target segment duration in `timescale`
    // ticks — confirm against `EncoderWorkerConfig`.
    pub segment_target_ticks: u64,
    /// Total number of source frames; used as the progress denominator and to
    /// compute the expected number of segments per rung.
    pub total_input_frames: u64,
    /// Force constant-QP chunk encoding (single-file `ChunkSeamMode::ParallelConstQp`)
    /// so stitched chunk seams are quality-flat. `false` for HLS (segments are
    /// independent) and the default `Parallel` single-file mode.
    pub constant_qp: bool,
}
111
112impl MultiGpuParams<'_> {
113    /// Resolve the decode-pump GPU for the `i`-th per-rung pump (or the shared
114    /// pump when `i == 0`): the explicit `decode_gpu` override wins, else the
115    /// policy's GPU indices round-robin, else `None` (decoder auto-select).
116    fn decode_gpu_for(&self, i: usize) -> Option<u32> {
117        if self.decode_gpu.is_some() {
118            return self.decode_gpu;
119        }
120        if self.gpu_indices.is_empty() {
121            return None;
122        }
123        Some(self.gpu_indices[i % self.gpu_indices.len()])
124    }
125}
126
127/// Run the reactive multi-GPU variant phase. Returns one `Option<RungManifest>`
128/// per rung (in rung order); `None` means the rung produced no segments.
129pub async fn run_multigpu_hls(
130    params: MultiGpuParams<'_>,
131    sink: Arc<dyn ProgressSink>,
132) -> Result<Vec<Option<RungManifest>>> {
133    let rungs = params.rungs;
134    let n = rungs.len();
135    if n == 0 {
136        return Ok(Vec::new());
137    }
138    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
139    if total_segments == 0 {
140        bail!(
141            "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
142            params.total_input_frames,
143            params.keyframe_interval
144        );
145    }
146
147    // Pre-flight: verify this host can actually construct an AV1 encoder
148    // before spawning the orchestration. Fail fast with a clear error instead
149    // of dispatching workers that fail at encoder construction — and, on
150    // drivers that re-init a failed NVENC session badly (e.g. Ampere with no
151    // AV1-encode silicon), would otherwise hang an uncancellable blocking task.
152    {
153        let probe = codec::encode::EncoderConfig {
154            width: rungs[0].width,
155            height: rungs[0].height,
156            frame_rate: params.frame_rate,
157            gpu_index: None,
158            ..Default::default()
159        };
160        codec::encode::select_encoder(probe, None).map_err(|e| {
161            anyhow!(
162                "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
163                 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature for a software encoder"
164            )
165        })?;
166    }
167
168    tracing::info!(
169        rungs = n,
170        total_segments,
171        gpu_pool_capacity = params.gpu_pool.capacity(),
172        "multi-GPU variant phase starting"
173    );
174
175    // Per-rung shared state.
176    let queues: Vec<Arc<SegmentChunkQueue>> =
177        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
178    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
179    let scaler_active: Vec<Arc<AtomicBool>> =
180        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
181    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
182        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
183    let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
184        Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
185    let active_workers: Arc<Vec<AtomicUsize>> =
186        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
187    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
188    let finalized: Arc<Vec<AtomicBool>> =
189        Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
190
191    // Periodic progress reporter.
192    let progress_stop = Arc::new(AtomicBool::new(false));
193    let progress_handle = spawn_progress_reporter(
194        rungs.to_vec(),
195        frames_encoded.clone(),
196        finalized.clone(),
197        params.total_input_frames,
198        Arc::clone(&sink),
199        Arc::clone(&progress_stop),
200    );
201
202    // Finalizers: one per rung, merges contributions → RungManifest.
203    let total_input_frames = params.total_input_frames;
204    let (finalizer_tx, mut finalizer_rx) =
205        mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
206    let mut finalizer_handles = Vec::with_capacity(n);
207    for idx in 0..n {
208        let contributions_h = Arc::clone(&contributions);
209        let active_h = Arc::clone(&active_workers);
210        let rung_done_h = Arc::clone(&rung_done);
211        let finalized_h = Arc::clone(&finalized);
212        let tx = finalizer_tx.clone();
213        let rung = rungs[idx].clone();
214        let rel_dir = format!("video/{}", rung.label);
215        let output_root = params.output_root.clone();
216        let timescale = params.timescale;
217        let total_segments = total_segments;
218        let sink = Arc::clone(&sink);
219        finalizer_handles.push(tokio::spawn(async move {
220            // Wait for all of this rung's workers + the scaler to finish.
221            loop {
222                let notified = rung_done_h[idx].notified();
223                if active_h[idx].load(Ordering::Acquire) == 0 {
224                    break;
225                }
226                notified.await;
227            }
228            let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
229            if outputs.is_empty() {
230                finalized_h[idx].store(true, Ordering::Release);
231                let _ = tx.send((idx, Ok(None))).await;
232                return;
233            }
234            let init_path = output_root.join(&rel_dir).join("init.mp4");
235            let contribs: Vec<RungContribution> = outputs
236                .into_iter()
237                .map(|wo| RungContribution {
238                    width: rung.width,
239                    height: rung.height,
240                    relative_dir: rel_dir.clone(),
241                    manifest: CmafTrackManifest {
242                        init_path: init_path.clone(),
243                        segments: wo.segments,
244                        timescale,
245                    },
246                })
247                .collect();
248            let result = match merge_rung_contributions(contribs) {
249                Ok(merged) => {
250                    let got = merged.manifest.segments.len();
251                    if got != total_segments as usize {
252                        Err(anyhow!(
253                            "rung {} coverage incomplete: expected {} segments, got {}",
254                            rung.label,
255                            total_segments,
256                            got
257                        ))
258                    } else {
259                        let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
260                        report(
261                            sink.as_ref(),
262                            idx,
263                            &rung,
264                            RungStatus::Completed,
265                            total_input_frames,
266                            Some(total_input_frames),
267                            got as u32,
268                            bytes,
269                            None,
270                        );
271                        Ok(Some(RungManifest {
272                            rung_index: idx,
273                            width: rung.width,
274                            height: rung.height,
275                            label: rung.label.clone(),
276                            relative_dir: rel_dir.clone(),
277                            manifest: merged.manifest,
278                        }))
279                    }
280                }
281                Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
282            };
283            finalized_h[idx].store(true, Ordering::Release);
284            let _ = tx.send((idx, result)).await;
285        }));
286    }
287    drop(finalizer_tx);
288
289    // Smallest-first claim order for initial workers.
290    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
291    indexed.sort_by_key(|(_, r)| r.short_side());
292
293    // Decode pump(s) + fan-out channels.
294    let mut frame_senders = Vec::with_capacity(n);
295    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
296        Vec::with_capacity(n);
297    for _ in 0..n {
298        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
299        frame_senders.push(tx);
300        frame_receivers.push(Some(rx));
301    }
302
303    let use_shared_pump = n <= params.gpu_pool.capacity();
304    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
305    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
306        codec_name: params.header.codec.clone(),
307        info_for_decoder: params.header.info.clone(),
308        source_color_metadata: params.source_color_metadata,
309        source_pixel_format: params.source_pixel_format,
310        needs_downsample: params.needs_downsample,
311        tonemap_to_sdr: params.tonemap_to_sdr,
312        gpu_index,
313        filters: params.filters.clone(),
314    };
315    if use_shared_pump {
316        let cfg = make_pump_cfg(params.decode_gpu_for(0));
317        let senders = frame_senders;
318        let input = params.input.clone();
319        let rt = tokio::runtime::Handle::current();
320        pump_tasks.spawn(async move {
321            tokio::task::spawn_blocking(move || {
322                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
323            })
324            .await
325            .map_err(|e| anyhow!("shared pump join error: {e}"))
326            .and_then(|r| r)
327        });
328    } else {
329        for (idx, sender) in frame_senders.into_iter().enumerate() {
330            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
331            let input = params.input.clone();
332            let rt = tokio::runtime::Handle::current();
333            pump_tasks.spawn(async move {
334                tokio::task::spawn_blocking(move || {
335                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
336                })
337                .await
338                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
339                .and_then(|r| r)
340            });
341        }
342    }
343
344    // Per-rung scalers.
345    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
346    for (idx, rung) in rungs.iter().cloned().enumerate() {
347        let rx = frame_receivers[idx].take().expect("scaler rx slot");
348        let cfg = crate::rung_scaler::RungScalerConfig {
349            rung_idx: idx,
350            target_width: rung.width,
351            target_height: rung.height,
352            frames_per_chunk: params.keyframe_interval,
353        };
354        let queue = Arc::clone(&queues[idx]);
355        let rt = tokio::runtime::Handle::current();
356        let scaler_flag = Arc::clone(&scaler_active[idx]);
357        let active_h = Arc::clone(&active_workers);
358        let rung_done_h = Arc::clone(&rung_done);
359        scaler_flag.store(true, Ordering::Release);
360        active_h[idx].fetch_add(1, Ordering::AcqRel);
361        scaler_tasks.spawn(async move {
362            let result = tokio::task::spawn_blocking(move || {
363                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
364            })
365            .await
366            .map_err(|e| anyhow!("scaler join error: {e}"))
367            .and_then(|r| r);
368            scaler_flag.store(false, Ordering::Release);
369            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
370            if prev == 1 {
371                rung_done_h[idx].notify_one();
372            }
373            (idx, result)
374        });
375    }
376
377    // Initial encoder workers (one per rung, smallest first).
378    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
379    let ctx = WorkerCtx {
380        frame_rate: params.frame_rate,
381        output_color_metadata: params.output_color_metadata,
382        output_pixel_format: params.output_pixel_format,
383        timescale: params.timescale,
384        per_frame_ticks: params.per_frame_ticks,
385        keyframe_interval: params.keyframe_interval,
386        segment_target_ticks: params.segment_target_ticks,
387        output_root: params.output_root.clone(),
388        constant_qp: params.constant_qp,
389    };
390    for (idx, rung) in indexed.iter().cloned() {
391        let lease = match Arc::clone(&params.gpu_pool).claim().await {
392            Some(l) => l,
393            None => {
394                progress_stop.store(true, Ordering::Release);
395                let _ = progress_handle.await;
396                bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
397            }
398        };
399        spawn_encoder_worker(
400            &ctx,
401            idx,
402            &rung,
403            Arc::clone(&queues[idx]),
404            Arc::clone(&frames_encoded[idx]),
405            lease,
406            Arc::clone(&contributions),
407            Arc::clone(&active_workers),
408            Arc::clone(&rung_done),
409            Arc::clone(&rung_invariants[idx]),
410            Some(&mut worker_tasks),
411        );
412    }
413
414    // Helper dispatcher.
415    let helper_cancel = Arc::new(AtomicBool::new(false));
416    let helper_handle = {
417        let cancel = Arc::clone(&helper_cancel);
418        let pool = Arc::clone(&params.gpu_pool);
419        let queues = queues.clone();
420        let scaler_active = scaler_active.clone();
421        let frames_encoded = frames_encoded.clone();
422        let contributions = Arc::clone(&contributions);
423        let active_workers = Arc::clone(&active_workers);
424        let rung_done = Arc::clone(&rung_done);
425        let rung_invariants = rung_invariants.clone();
426        let rungs_owned: Vec<Rung> = rungs.to_vec();
427        let ctx = ctx.clone();
428        tokio::spawn(async move {
429            loop {
430                if cancel.load(Ordering::Acquire) {
431                    break;
432                }
433                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
434                if pool.pending_claimers() > 0 {
435                    continue;
436                }
437                let mut target = None;
438                for (idx, q) in queues.iter().enumerate() {
439                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
440                    let has_pending = q.pushed_segments() > q.popped_segments();
441                    if scaler_alive || has_pending {
442                        target = Some(idx);
443                        break;
444                    }
445                }
446                let Some(rung_idx) = target else { break };
447                let lease = match pool.try_claim() {
448                    Some(l) => l,
449                    None => continue,
450                };
451                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
452                spawn_encoder_worker(
453                    &ctx,
454                    rung_idx,
455                    &rungs_owned[rung_idx],
456                    Arc::clone(&queues[rung_idx]),
457                    Arc::clone(&frames_encoded[rung_idx]),
458                    lease,
459                    Arc::clone(&contributions),
460                    Arc::clone(&active_workers),
461                    Arc::clone(&rung_done),
462                    Arc::clone(&rung_invariants[rung_idx]),
463                    None,
464                );
465            }
466        })
467    };
468
469    // Drain everything.
470    let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
471    let mut pumps_remaining = pump_tasks.len();
472    let mut scalers_remaining = n;
473    let mut workers_remaining = n;
474    let mut finalizers_remaining = n;
475
476    macro_rules! teardown_err {
477        ($e:expr) => {{
478            helper_cancel.store(true, Ordering::Release);
479            let _ = helper_handle.await;
480            progress_stop.store(true, Ordering::Release);
481            let _ = progress_handle.await;
482            return Err($e);
483        }};
484    }
485
486    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
487        tokio::select! {
488            biased;
489            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
490                Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
491                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
492                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
493                None => pumps_remaining = 0,
494            },
495            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
496                Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
497                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
498                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
499                None => scalers_remaining = 0,
500            },
501            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
502                Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
503                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
504                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
505                None => workers_remaining = 0,
506            },
507            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
508                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
509                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
510                None => finalizers_remaining = 0,
511            },
512        }
513    }
514
515    helper_cancel.store(true, Ordering::Release);
516    let _ = helper_handle.await;
517    progress_stop.store(true, Ordering::Release);
518    let _ = progress_handle.await;
519    for h in finalizer_handles {
520        let _ = h.await;
521    }
522
523    Ok(completed)
524}
525
/// Per-job constants shared by every encoder worker.
///
/// Filled once from the job parameters and copied field-by-field into each
/// worker's `EncoderWorkerConfig`; cloned for the helper dispatcher task.
#[derive(Clone)]
struct WorkerCtx {
    /// Frame rate passed to every encoder.
    frame_rate: f64,
    /// Color metadata the encoders target.
    output_color_metadata: ColorMetadata,
    /// Pixel format the encoders target.
    output_pixel_format: PixelFormat,
    /// Timescale passed to every worker.
    timescale: u32,
    /// Passed through to every worker unchanged.
    per_frame_ticks: u32,
    /// Passed through to every worker unchanged.
    keyframe_interval: u32,
    /// Passed through to every worker unchanged.
    segment_target_ticks: u64,
    /// Asset root; a worker's output directory is `output_root/video/<rung label>`.
    output_root: PathBuf,
    /// Whether workers encode with constant QP.
    constant_qp: bool,
}
539
540#[allow(clippy::too_many_arguments)]
541fn spawn_encoder_worker(
542    ctx: &WorkerCtx,
543    rung_idx: usize,
544    rung: &Rung,
545    queue: Arc<SegmentChunkQueue>,
546    frames_encoded: Arc<AtomicU64>,
547    lease: GpuLease,
548    contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
549    active_workers: Arc<Vec<AtomicUsize>>,
550    rung_done: Arc<Vec<Notify>>,
551    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
552    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
553) {
554    let rel_dir = format!("video/{}", rung.label);
555    let output_dir = ctx.output_root.join(&rel_dir);
556    let gpu_index = lease.gpu_index;
557    let gpu_vendor = lease.vendor;
558
559    let cfg = EncoderWorkerConfig {
560        rung_idx,
561        width: rung.width,
562        height: rung.height,
563        frame_rate: ctx.frame_rate,
564        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
565        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
566        target: rung.quality.target,
567        tier: rung.quality.tier,
568        threads: 0,
569        gpu_index: Some(gpu_index),
570        gpu_vendor: Some(gpu_vendor),
571        output_color_metadata: ctx.output_color_metadata,
572        output_pixel_format: ctx.output_pixel_format,
573        constant_qp: ctx.constant_qp,
574        timescale: ctx.timescale,
575        per_frame_ticks: ctx.per_frame_ticks,
576        keyframe_interval: ctx.keyframe_interval,
577        segment_target_ticks: ctx.segment_target_ticks,
578        output_dir,
579        rung_invariant,
580    };
581
582    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
583    let body = async move {
584        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
585        let cfg_for_worker = cfg.clone();
586        let queue_for_worker = Arc::clone(&queue);
587        let rt = tokio::runtime::Handle::current();
588        let counter = Arc::clone(&frames_encoded);
589        let blocking = tokio::task::spawn_blocking(move || {
590            run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
591        });
592        // Drain the per-frame progress channel (the shared AtomicU64 counter is
593        // the source of truth the reporter task reads; here we just keep the
594        // channel from backpressuring the worker).
595        let drain = async move { while progress_rx.recv().await.is_some() {} };
596        let (_, br) = tokio::join!(drain, blocking);
597
598        let task_status: Result<()> = match br {
599            Ok(Ok(out)) => {
600                contributions[rung_idx].lock().unwrap().push(out);
601                Ok(())
602            }
603            Ok(Err(e)) => Err(e),
604            Err(e) => Err(anyhow!("worker join error: {e}")),
605        };
606        drop(lease);
607        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
608        if prev == 1 {
609            rung_done[rung_idx].notify_one();
610        }
611        (rung_idx, task_status)
612    };
613
614    match worker_tasks {
615        Some(set) => {
616            set.spawn(body);
617        }
618        None => {
619            tokio::spawn(async move {
620                let _ = body.await;
621            });
622        }
623    }
624}
625
626/// Periodic per-rung progress reporter. Reads the shared frame counters and
627/// emits `Running` updates until stopped; skips rungs already finalized.
628fn spawn_progress_reporter(
629    rungs: Vec<Rung>,
630    frames_encoded: Vec<Arc<AtomicU64>>,
631    finalized: Arc<Vec<AtomicBool>>,
632    total_input_frames: u64,
633    sink: Arc<dyn ProgressSink>,
634    stop: Arc<AtomicBool>,
635) -> tokio::task::JoinHandle<()> {
636    tokio::spawn(async move {
637        loop {
638            if stop.load(Ordering::Acquire) {
639                break;
640            }
641            tokio::time::sleep(PROGRESS_TICK).await;
642            for (idx, rung) in rungs.iter().enumerate() {
643                if finalized[idx].load(Ordering::Acquire) {
644                    continue;
645                }
646                let done = frames_encoded[idx].load(Ordering::Relaxed);
647                report(
648                    sink.as_ref(),
649                    idx,
650                    rung,
651                    RungStatus::Running,
652                    done,
653                    Some(total_input_frames),
654                    0,
655                    0,
656                    None,
657                );
658            }
659        }
660    })
661}
662
663#[allow(clippy::too_many_arguments)]
664fn report(
665    sink: &dyn ProgressSink,
666    rung_index: usize,
667    rung: &Rung,
668    status: RungStatus,
669    frames_done: u64,
670    frames_total: Option<u64>,
671    segments: u32,
672    bytes_out: u64,
673    message: Option<String>,
674) {
675    let percent = match status {
676        RungStatus::Completed => 100.0,
677        RungStatus::Pending => 0.0,
678        _ => match frames_total {
679            Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
680            _ => 1.0,
681        },
682    };
683    sink.on_rung(RungProgress {
684        rung_index,
685        label: rung.label.clone(),
686        width: rung.width,
687        height: rung.height,
688        status,
689        percent,
690        frames_done,
691        frames_total,
692        segments_written: segments,
693        bytes_out,
694        message,
695    });
696}
697
698/// Build a [`GpuPool`] from the host's detected GPU inventory.
699pub fn detect_gpu_pool() -> Arc<GpuPool> {
700    Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
701}
702
703fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
704    match fam {
705        GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
706        GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
707        GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
708    }
709}
710
711/// The host GPUs selected by an [`EncodePolicy`]: all of them for `AllGpus`,
712/// the first / pinned index for `SingleGpu`, every device of one vendor for
713/// `Family`.
714fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
715    let gpus = codec::gpu::detect_gpus();
716    match policy {
717        EncodePolicy::AllGpus => gpus,
718        EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
719        EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
720        EncodePolicy::Family(fam) => {
721            let v = policy_vendor(fam);
722            gpus.into_iter().filter(|g| g.vendor == v).collect()
723        }
724    }
725}
726
727/// Build a [`GpuPool`] constrained to the given [`EncodePolicy`]. An empty pool
728/// (e.g. a pinned index or vendor family that isn't present) yields capacity 0,
729/// so the orchestrator's pre-flight probe / lease claim surfaces a clear error.
730///
731/// When more than one GPU is selected, cards that can't actually encode AV1
732/// (e.g. a pre-Ada NVIDIA that decodes via NVDEC but has no AV1 encode silicon)
733/// are dropped from the **encode** pool — so a worker never leases an incapable
734/// card and hard-fails the run; the capable cards (e.g. the Arc) do the encoding.
735/// A single selected GPU is left as-is, since the serial path's non-pinned
736/// encoder dispatch already falls through vendors. Dropped cards stay available
737/// for the decode pump ([`policy_gpu_indices`] is intentionally NOT filtered).
738pub fn gpu_pool_for_policy(policy: EncodePolicy) -> Arc<GpuPool> {
739    let selected = select_gpus_for_policy(policy);
740    let pool_gpus = if selected.len() > 1 {
741        selected.into_iter().filter(|g| codec::encode::av1_encode_capable(g)).collect()
742    } else {
743        selected
744    };
745    Arc::new(GpuPool::new(&pool_gpus))
746}
747
748/// The GPU indices an [`EncodePolicy`] selects, in detection order. Used to pin
749/// the decode pump to a device consistent with the policy (so decode honors a
750/// `Family` / `SingleGpu` constraint, not just encode).
751pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
752    select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
753}
754
755/// The GPU index to pin a *serial* (single-GPU) encode/decode to under a
756/// policy: `None` (auto/first-available) for `AllGpus`, the pinned index for
757/// `SingleGpu`, the first device of the vendor for `Family`.
758pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
759    match policy {
760        EncodePolicy::AllGpus => None,
761        EncodePolicy::SingleGpu(idx) => idx,
762        EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
763    }
764}
765
766// ---------------------------------------------------------------------------
767// Single-file multi-GPU: chunk each rung across GPUs, stitch packets into MP4.
768// ---------------------------------------------------------------------------
769
770/// One rung's full ordered AV1 packet stream, stitched from chunks encoded
771/// across GPUs. The caller muxes these into a single MP4 (+ audio).
772#[derive(Debug)]
773pub struct RungPackets {
774    pub rung_index: usize,
775    pub width: u32,
776    pub height: u32,
777    pub label: String,
778    pub packets: Vec<EncodedPacket>,
779}
780
781/// Single-file counterpart to [`run_multigpu_hls`]: decode once, fan to per-rung
782/// scalers, and dynamically schedule each rung's GOP-sized chunks across all
783/// GPUs (fair lease pool + mid-flight helper dispatch + cross-vendor codec
784/// invariant). Each worker encodes its chunk to packets (a fresh encoder per
785/// chunk → first frame is an IDR); the finalizer concatenates them in segment
786/// order into one ordered packet stream per rung — no disk round-trip.
787pub async fn run_multigpu_single_file(
788    params: MultiGpuParams<'_>,
789    sink: Arc<dyn ProgressSink>,
790) -> Result<Vec<Option<RungPackets>>> {
791    let rungs = params.rungs;
792    let n = rungs.len();
793    if n == 0 {
794        return Ok(Vec::new());
795    }
796    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
797    if total_segments == 0 {
798        bail!(
799            "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
800            params.total_input_frames,
801            params.keyframe_interval
802        );
803    }
804
805    // Pre-flight encoder probe (same fail-fast as the HLS path).
806    {
807        let probe = codec::encode::EncoderConfig {
808            width: rungs[0].width,
809            height: rungs[0].height,
810            frame_rate: params.frame_rate,
811            gpu_index: None,
812            ..Default::default()
813        };
814        codec::encode::select_encoder(probe, None).map_err(|e| {
815            anyhow!(
816                "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
817                 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature"
818            )
819        })?;
820    }
821
822    tracing::info!(
823        rungs = n,
824        total_segments,
825        gpu_pool_capacity = params.gpu_pool.capacity(),
826        "multi-GPU single-file phase starting"
827    );
828
829    let queues: Vec<Arc<SegmentChunkQueue>> =
830        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
831    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
832    let scaler_active: Vec<Arc<AtomicBool>> =
833        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
834    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
835        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
836    // Per-rung packet collectors (each its own Arc so chunk workers can push).
837    let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
838        (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
839    let active_workers: Arc<Vec<AtomicUsize>> =
840        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
841    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
842    let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
843
844    let progress_stop = Arc::new(AtomicBool::new(false));
845    let progress_handle = spawn_progress_reporter(
846        rungs.to_vec(),
847        frames_encoded.clone(),
848        finalized.clone(),
849        params.total_input_frames,
850        Arc::clone(&sink),
851        Arc::clone(&progress_stop),
852    );
853
854    // Finalizers: stitch each rung's chunks (sorted, deduped) into one stream.
855    let total_input_frames = params.total_input_frames;
856    let (finalizer_tx, mut finalizer_rx) =
857        mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
858    let mut finalizer_handles = Vec::with_capacity(n);
859    for idx in 0..n {
860        let collector = Arc::clone(&contributions[idx]);
861        let active_h = Arc::clone(&active_workers);
862        let rung_done_h = Arc::clone(&rung_done);
863        let finalized_h = Arc::clone(&finalized);
864        let tx = finalizer_tx.clone();
865        let rung = rungs[idx].clone();
866        let total_segments = total_segments;
867        let sink = Arc::clone(&sink);
868        finalizer_handles.push(tokio::spawn(async move {
869            loop {
870                let notified = rung_done_h[idx].notified();
871                if active_h[idx].load(Ordering::Acquire) == 0 {
872                    break;
873                }
874                notified.await;
875            }
876            let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
877            if chunks.is_empty() {
878                finalized_h[idx].store(true, Ordering::Release);
879                let _ = tx.send((idx, Ok(None))).await;
880                return;
881            }
882            chunks.sort_by_key(|c| c.segment_idx);
883            chunks.dedup_by_key(|c| c.segment_idx);
884            // Coverage: contiguous 0..total_segments.
885            let got = chunks.len();
886            let contiguous = chunks
887                .iter()
888                .enumerate()
889                .all(|(i, c)| c.segment_idx == i);
890            let result = if got != total_segments as usize || !contiguous {
891                Err(anyhow!(
892                    "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
893                    rung.label,
894                    total_segments,
895                    got
896                ))
897            } else {
898                let mut packets: Vec<EncodedPacket> = Vec::new();
899                for c in chunks {
900                    packets.extend(c.packets);
901                }
902                let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
903                report(
904                    sink.as_ref(),
905                    idx,
906                    &rung,
907                    RungStatus::Completed,
908                    total_input_frames,
909                    Some(total_input_frames),
910                    got as u32,
911                    bytes,
912                    None,
913                );
914                Ok(Some(RungPackets {
915                    rung_index: idx,
916                    width: rung.width,
917                    height: rung.height,
918                    label: rung.label.clone(),
919                    packets,
920                }))
921            };
922            finalized_h[idx].store(true, Ordering::Release);
923            let _ = tx.send((idx, result)).await;
924        }));
925    }
926    drop(finalizer_tx);
927
928    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
929    indexed.sort_by_key(|(_, r)| r.short_side());
930
931    // Decode pump(s) + fan-out.
932    let mut frame_senders = Vec::with_capacity(n);
933    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
934        Vec::with_capacity(n);
935    for _ in 0..n {
936        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
937        frame_senders.push(tx);
938        frame_receivers.push(Some(rx));
939    }
940    let use_shared_pump = n <= params.gpu_pool.capacity();
941    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
942    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
943        codec_name: params.header.codec.clone(),
944        info_for_decoder: params.header.info.clone(),
945        source_color_metadata: params.source_color_metadata,
946        source_pixel_format: params.source_pixel_format,
947        needs_downsample: params.needs_downsample,
948        tonemap_to_sdr: params.tonemap_to_sdr,
949        gpu_index,
950        filters: params.filters.clone(),
951    };
952    if use_shared_pump {
953        let cfg = make_pump_cfg(params.decode_gpu_for(0));
954        let senders = frame_senders;
955        let input = params.input.clone();
956        let rt = tokio::runtime::Handle::current();
957        pump_tasks.spawn(async move {
958            tokio::task::spawn_blocking(move || {
959                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
960            })
961            .await
962            .map_err(|e| anyhow!("shared pump join error: {e}"))
963            .and_then(|r| r)
964        });
965    } else {
966        for (idx, sender) in frame_senders.into_iter().enumerate() {
967            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
968            let input = params.input.clone();
969            let rt = tokio::runtime::Handle::current();
970            pump_tasks.spawn(async move {
971                tokio::task::spawn_blocking(move || {
972                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
973                })
974                .await
975                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
976                .and_then(|r| r)
977            });
978        }
979    }
980
981    // Per-rung scalers.
982    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
983    for (idx, rung) in rungs.iter().cloned().enumerate() {
984        let rx = frame_receivers[idx].take().expect("scaler rx slot");
985        let cfg = crate::rung_scaler::RungScalerConfig {
986            rung_idx: idx,
987            target_width: rung.width,
988            target_height: rung.height,
989            frames_per_chunk: params.keyframe_interval,
990        };
991        let queue = Arc::clone(&queues[idx]);
992        let rt = tokio::runtime::Handle::current();
993        let scaler_flag = Arc::clone(&scaler_active[idx]);
994        let active_h = Arc::clone(&active_workers);
995        let rung_done_h = Arc::clone(&rung_done);
996        scaler_flag.store(true, Ordering::Release);
997        active_h[idx].fetch_add(1, Ordering::AcqRel);
998        scaler_tasks.spawn(async move {
999            let result = tokio::task::spawn_blocking(move || {
1000                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
1001            })
1002            .await
1003            .map_err(|e| anyhow!("scaler join error: {e}"))
1004            .and_then(|r| r);
1005            scaler_flag.store(false, Ordering::Release);
1006            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
1007            if prev == 1 {
1008                rung_done_h[idx].notify_one();
1009            }
1010            (idx, result)
1011        });
1012    }
1013
1014    // Initial chunk workers.
1015    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
1016    let ctx = WorkerCtx {
1017        frame_rate: params.frame_rate,
1018        output_color_metadata: params.output_color_metadata,
1019        output_pixel_format: params.output_pixel_format,
1020        timescale: params.timescale,
1021        per_frame_ticks: params.per_frame_ticks,
1022        keyframe_interval: params.keyframe_interval,
1023        segment_target_ticks: params.segment_target_ticks,
1024        output_root: params.output_root.clone(),
1025        constant_qp: params.constant_qp,
1026    };
1027    for (idx, rung) in indexed.iter().cloned() {
1028        let lease = match Arc::clone(&params.gpu_pool).claim().await {
1029            Some(l) => l,
1030            None => {
1031                progress_stop.store(true, Ordering::Release);
1032                let _ = progress_handle.await;
1033                bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1034            }
1035        };
1036        spawn_chunk_worker(
1037            &ctx,
1038            idx,
1039            &rung,
1040            Arc::clone(&queues[idx]),
1041            Arc::clone(&frames_encoded[idx]),
1042            lease,
1043            Arc::clone(&contributions[idx]),
1044            Arc::clone(&active_workers),
1045            Arc::clone(&rung_done),
1046            Arc::clone(&rung_invariants[idx]),
1047            Some(&mut worker_tasks),
1048        );
1049    }
1050
1051    // Helper dispatcher.
1052    let helper_cancel = Arc::new(AtomicBool::new(false));
1053    let helper_handle = {
1054        let cancel = Arc::clone(&helper_cancel);
1055        let pool = Arc::clone(&params.gpu_pool);
1056        let queues = queues.clone();
1057        let scaler_active = scaler_active.clone();
1058        let frames_encoded = frames_encoded.clone();
1059        let contributions = contributions.clone();
1060        let active_workers = Arc::clone(&active_workers);
1061        let rung_done = Arc::clone(&rung_done);
1062        let rung_invariants = rung_invariants.clone();
1063        let rungs_owned: Vec<Rung> = rungs.to_vec();
1064        let ctx = ctx.clone();
1065        tokio::spawn(async move {
1066            loop {
1067                if cancel.load(Ordering::Acquire) {
1068                    break;
1069                }
1070                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1071                if pool.pending_claimers() > 0 {
1072                    continue;
1073                }
1074                let mut target = None;
1075                for (idx, q) in queues.iter().enumerate() {
1076                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1077                    let has_pending = q.pushed_segments() > q.popped_segments();
1078                    if scaler_alive || has_pending {
1079                        target = Some(idx);
1080                        break;
1081                    }
1082                }
1083                let Some(rung_idx) = target else { break };
1084                let lease = match pool.try_claim() {
1085                    Some(l) => l,
1086                    None => continue,
1087                };
1088                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1089                spawn_chunk_worker(
1090                    &ctx,
1091                    rung_idx,
1092                    &rungs_owned[rung_idx],
1093                    Arc::clone(&queues[rung_idx]),
1094                    Arc::clone(&frames_encoded[rung_idx]),
1095                    lease,
1096                    Arc::clone(&contributions[rung_idx]),
1097                    Arc::clone(&active_workers),
1098                    Arc::clone(&rung_done),
1099                    Arc::clone(&rung_invariants[rung_idx]),
1100                    None,
1101                );
1102            }
1103        })
1104    };
1105
1106    // Drain.
1107    let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1108    let mut pumps_remaining = pump_tasks.len();
1109    let mut scalers_remaining = n;
1110    let mut workers_remaining = n;
1111    let mut finalizers_remaining = n;
1112    macro_rules! teardown_err {
1113        ($e:expr) => {{
1114            helper_cancel.store(true, Ordering::Release);
1115            let _ = helper_handle.await;
1116            progress_stop.store(true, Ordering::Release);
1117            let _ = progress_handle.await;
1118            return Err($e);
1119        }};
1120    }
1121    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1122        tokio::select! {
1123            biased;
1124            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1125                Some(Ok(Ok(_))) => pumps_remaining -= 1,
1126                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1127                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1128                None => pumps_remaining = 0,
1129            },
1130            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1131                Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1132                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1133                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1134                None => scalers_remaining = 0,
1135            },
1136            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1137                Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1138                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1139                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1140                None => workers_remaining = 0,
1141            },
1142            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1143                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1144                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1145                None => finalizers_remaining = 0,
1146            },
1147        }
1148    }
1149    helper_cancel.store(true, Ordering::Release);
1150    let _ = helper_handle.await;
1151    progress_stop.store(true, Ordering::Release);
1152    let _ = progress_handle.await;
1153    for h in finalizer_handles {
1154        let _ = h.await;
1155    }
1156    Ok(completed)
1157}
1158
1159#[allow(clippy::too_many_arguments)]
1160fn spawn_chunk_worker(
1161    ctx: &WorkerCtx,
1162    rung_idx: usize,
1163    rung: &Rung,
1164    queue: Arc<SegmentChunkQueue>,
1165    frames_encoded: Arc<AtomicU64>,
1166    lease: GpuLease,
1167    collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1168    active_workers: Arc<Vec<AtomicUsize>>,
1169    rung_done: Arc<Vec<Notify>>,
1170    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1171    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1172) {
1173    let gpu_index = lease.gpu_index;
1174    let gpu_vendor = lease.vendor;
1175    let cfg = EncoderWorkerConfig {
1176        rung_idx,
1177        width: rung.width,
1178        height: rung.height,
1179        frame_rate: ctx.frame_rate,
1180        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1181        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1182        target: rung.quality.target,
1183        tier: rung.quality.tier,
1184        threads: 0,
1185        gpu_index: Some(gpu_index),
1186        gpu_vendor: Some(gpu_vendor),
1187        output_color_metadata: ctx.output_color_metadata,
1188        output_pixel_format: ctx.output_pixel_format,
1189        constant_qp: ctx.constant_qp,
1190        timescale: ctx.timescale,
1191        per_frame_ticks: ctx.per_frame_ticks,
1192        keyframe_interval: ctx.keyframe_interval,
1193        segment_target_ticks: ctx.segment_target_ticks,
1194        output_dir: ctx.output_root.clone(),
1195        rung_invariant,
1196    };
1197    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1198    let body = async move {
1199        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1200        let cfg_for_worker = cfg.clone();
1201        let queue_for_worker = Arc::clone(&queue);
1202        let rt = tokio::runtime::Handle::current();
1203        let counter = Arc::clone(&frames_encoded);
1204        let out = Arc::clone(&collector);
1205        let blocking = tokio::task::spawn_blocking(move || {
1206            run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1207        });
1208        let drain = async move { while progress_rx.recv().await.is_some() {} };
1209        let (_, br) = tokio::join!(drain, blocking);
1210        let task_status: Result<()> = match br {
1211            Ok(Ok(())) => Ok(()),
1212            Ok(Err(e)) => Err(e),
1213            Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1214        };
1215        drop(lease);
1216        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1217        if prev == 1 {
1218            rung_done[rung_idx].notify_one();
1219        }
1220        (rung_idx, task_status)
1221    };
1222    match worker_tasks {
1223        Some(set) => {
1224            set.spawn(body);
1225        }
1226        None => {
1227            tokio::spawn(async move {
1228                let _ = body.await;
1229            });
1230        }
1231    }
1232}