Skip to main content

rivet/
multigpu.rs

//! Multi-GPU reactive variant phase — **the rung benefit**.
//!
//! Decode the source **once** (when there are no more rungs than GPU leases;
//! otherwise each rung gets its own decode pump) and dynamically schedule
//! every rung's CMAF segments across all available GPUs using a fair lease
//! pool with mid-flight helper dispatch:
//!
//! ```text
//!   decode pump (decode once)
//!        │  fan out normalized frames
//!        ▼
//!   per-rung scaler ──► SegmentChunkQueue ──► encoder worker (holds a GpuLease)
//!                                        ──► helper worker (claims a freed lease)
//! ```
//!
//! - One encoder per GPU at a time ([`GpuPool`] enforces it — concurrent
//!   NVENC sessions on one context deadlock).
//! - A fast rung releases its lease early; the **helper dispatcher** grabs the
//!   freed lease and attaches an extra worker to a still-busy rung, so a slow
//!   rung finishes sooner. Segment work is the unit of parallelism.
//! - Helpers may land on a different GPU **vendor** than the rung's first
//!   worker; the per-rung AV1 **codec invariant** ([`RungCodecInvariant`])
//!   guarantees every contributed segment shares the `av1C` contract, so a
//!   cross-vendor (NVENC + QSV) rendition still decodes cleanly. A mismatched
//!   helper requeues its chunk and exits — the run never aborts on it.
//!
//! Storage/transport specifics stay out of the engine: progress is reported
//! through the generic [`ProgressSink`], so a consumer can layer an uploader
//! (object storage, a status queue, …) on top by watching `RungStatus::Completed`.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48    ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49    run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity passed to `SegmentChunkQueue::new` for each rung's chunk queue.
const QUEUE_CAPACITY: usize = 2;
/// Capacity of each per-rung decoded-frame fan-out channel (pump → scaler).
const FANOUT_CHANNEL_CAPACITY: usize = 4;

/// Sleep between two scans of the helper dispatcher for a still-busy rung.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// Interval between two rounds of periodic `Running` progress reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
61/// One rung's finalized CMAF manifest.
62#[derive(Debug, Clone)]
63pub struct RungManifest {
64    pub rung_index: usize,
65    pub width: u32,
66    pub height: u32,
67    pub label: String,
68    /// Directory relative to the asset root, e.g. `"video/720p"`.
69    pub relative_dir: String,
70    pub manifest: CmafTrackManifest,
71}
72
73/// Inputs to [`run_multigpu_hls`].
74pub struct MultiGpuParams<'a> {
75    pub input: Bytes,
76    pub rungs: &'a [Rung],
77    pub header: DemuxHeader,
78    pub source_color_metadata: ColorMetadata,
79    pub source_pixel_format: PixelFormat,
80    /// Whether the decode pump tonemaps HDR→SDR (from the spec's `ColorPolicy`).
81    pub tonemap_to_sdr: bool,
82    /// Resolved **output** color metadata + pixel format the encoders target
83    /// (from `OutputSpec::resolve_output`).
84    pub output_color_metadata: ColorMetadata,
85    pub output_pixel_format: PixelFormat,
86    pub needs_downsample: bool,
87    pub frame_rate: f64,
88    pub gpu_pool: Arc<GpuPool>,
89    /// GPU indices the encode policy selected, in detection order. The decode
90    /// pump pins to these (round-robin for per-rung pumps) so decode honors the
91    /// same `Family` / `SingleGpu` / `AllGpus` constraint as encode. Empty ⇒
92    /// the decoder dispatch auto-selects (legacy behavior).
93    pub gpu_indices: Vec<u32>,
94    /// Explicit decode-pump GPU override. `Some(i)` forces every decode pump
95    /// onto GPU `i` regardless of `gpu_indices`; `None` follows the policy.
96    pub decode_gpu: Option<u32>,
97    pub output_root: PathBuf,
98    pub timescale: u32,
99    pub per_frame_ticks: u32,
100    pub keyframe_interval: u32,
101    pub segment_target_ticks: u64,
102    pub total_input_frames: u64,
103    /// Force constant-QP chunk encoding (single-file `ChunkSeamMode::ParallelConstQp`)
104    /// so stitched chunk seams are quality-flat. `false` for HLS (segments are
105    /// independent) and the default `Parallel` single-file mode.
106    pub constant_qp: bool,
107}
108
109impl MultiGpuParams<'_> {
110    /// Resolve the decode-pump GPU for the `i`-th per-rung pump (or the shared
111    /// pump when `i == 0`): the explicit `decode_gpu` override wins, else the
112    /// policy's GPU indices round-robin, else `None` (decoder auto-select).
113    fn decode_gpu_for(&self, i: usize) -> Option<u32> {
114        if self.decode_gpu.is_some() {
115            return self.decode_gpu;
116        }
117        if self.gpu_indices.is_empty() {
118            return None;
119        }
120        Some(self.gpu_indices[i % self.gpu_indices.len()])
121    }
122}
123
124/// Run the reactive multi-GPU variant phase. Returns one `Option<RungManifest>`
125/// per rung (in rung order); `None` means the rung produced no segments.
126pub async fn run_multigpu_hls(
127    params: MultiGpuParams<'_>,
128    sink: Arc<dyn ProgressSink>,
129) -> Result<Vec<Option<RungManifest>>> {
130    let rungs = params.rungs;
131    let n = rungs.len();
132    if n == 0 {
133        return Ok(Vec::new());
134    }
135    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
136    if total_segments == 0 {
137        bail!(
138            "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
139            params.total_input_frames,
140            params.keyframe_interval
141        );
142    }
143
144    // Pre-flight: verify this host can actually construct an AV1 encoder
145    // before spawning the orchestration. Fail fast with a clear error instead
146    // of dispatching workers that fail at encoder construction — and, on
147    // drivers that re-init a failed NVENC session badly (e.g. Ampere with no
148    // AV1-encode silicon), would otherwise hang an uncancellable blocking task.
149    {
150        let probe = codec::encode::EncoderConfig {
151            width: rungs[0].width,
152            height: rungs[0].height,
153            frame_rate: params.frame_rate,
154            gpu_index: None,
155            ..Default::default()
156        };
157        codec::encode::select_encoder(probe, None).map_err(|e| {
158            anyhow!(
159                "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
160                 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature for a software encoder"
161            )
162        })?;
163    }
164
165    tracing::info!(
166        rungs = n,
167        total_segments,
168        gpu_pool_capacity = params.gpu_pool.capacity(),
169        "multi-GPU variant phase starting"
170    );
171
172    // Per-rung shared state.
173    let queues: Vec<Arc<SegmentChunkQueue>> =
174        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
175    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
176    let scaler_active: Vec<Arc<AtomicBool>> =
177        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
178    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
179        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
180    let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
181        Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
182    let active_workers: Arc<Vec<AtomicUsize>> =
183        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
184    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
185    let finalized: Arc<Vec<AtomicBool>> =
186        Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
187
188    // Periodic progress reporter.
189    let progress_stop = Arc::new(AtomicBool::new(false));
190    let progress_handle = spawn_progress_reporter(
191        rungs.to_vec(),
192        frames_encoded.clone(),
193        finalized.clone(),
194        params.total_input_frames,
195        Arc::clone(&sink),
196        Arc::clone(&progress_stop),
197    );
198
199    // Finalizers: one per rung, merges contributions → RungManifest.
200    let total_input_frames = params.total_input_frames;
201    let (finalizer_tx, mut finalizer_rx) =
202        mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
203    let mut finalizer_handles = Vec::with_capacity(n);
204    for idx in 0..n {
205        let contributions_h = Arc::clone(&contributions);
206        let active_h = Arc::clone(&active_workers);
207        let rung_done_h = Arc::clone(&rung_done);
208        let finalized_h = Arc::clone(&finalized);
209        let tx = finalizer_tx.clone();
210        let rung = rungs[idx].clone();
211        let rel_dir = format!("video/{}", rung.label);
212        let output_root = params.output_root.clone();
213        let timescale = params.timescale;
214        let total_segments = total_segments;
215        let sink = Arc::clone(&sink);
216        finalizer_handles.push(tokio::spawn(async move {
217            // Wait for all of this rung's workers + the scaler to finish.
218            loop {
219                let notified = rung_done_h[idx].notified();
220                if active_h[idx].load(Ordering::Acquire) == 0 {
221                    break;
222                }
223                notified.await;
224            }
225            let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
226            if outputs.is_empty() {
227                finalized_h[idx].store(true, Ordering::Release);
228                let _ = tx.send((idx, Ok(None))).await;
229                return;
230            }
231            let init_path = output_root.join(&rel_dir).join("init.mp4");
232            let contribs: Vec<RungContribution> = outputs
233                .into_iter()
234                .map(|wo| RungContribution {
235                    width: rung.width,
236                    height: rung.height,
237                    relative_dir: rel_dir.clone(),
238                    manifest: CmafTrackManifest {
239                        init_path: init_path.clone(),
240                        segments: wo.segments,
241                        timescale,
242                    },
243                })
244                .collect();
245            let result = match merge_rung_contributions(contribs) {
246                Ok(merged) => {
247                    let got = merged.manifest.segments.len();
248                    if got != total_segments as usize {
249                        Err(anyhow!(
250                            "rung {} coverage incomplete: expected {} segments, got {}",
251                            rung.label,
252                            total_segments,
253                            got
254                        ))
255                    } else {
256                        let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
257                        report(
258                            sink.as_ref(),
259                            idx,
260                            &rung,
261                            RungStatus::Completed,
262                            total_input_frames,
263                            Some(total_input_frames),
264                            got as u32,
265                            bytes,
266                            None,
267                        );
268                        Ok(Some(RungManifest {
269                            rung_index: idx,
270                            width: rung.width,
271                            height: rung.height,
272                            label: rung.label.clone(),
273                            relative_dir: rel_dir.clone(),
274                            manifest: merged.manifest,
275                        }))
276                    }
277                }
278                Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
279            };
280            finalized_h[idx].store(true, Ordering::Release);
281            let _ = tx.send((idx, result)).await;
282        }));
283    }
284    drop(finalizer_tx);
285
286    // Smallest-first claim order for initial workers.
287    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
288    indexed.sort_by_key(|(_, r)| r.short_side());
289
290    // Decode pump(s) + fan-out channels.
291    let mut frame_senders = Vec::with_capacity(n);
292    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
293        Vec::with_capacity(n);
294    for _ in 0..n {
295        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
296        frame_senders.push(tx);
297        frame_receivers.push(Some(rx));
298    }
299
300    let use_shared_pump = n <= params.gpu_pool.capacity();
301    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
302    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
303        codec_name: params.header.codec.clone(),
304        info_for_decoder: params.header.info.clone(),
305        source_color_metadata: params.source_color_metadata,
306        source_pixel_format: params.source_pixel_format,
307        needs_downsample: params.needs_downsample,
308        tonemap_to_sdr: params.tonemap_to_sdr,
309        gpu_index,
310    };
311    if use_shared_pump {
312        let cfg = make_pump_cfg(params.decode_gpu_for(0));
313        let senders = frame_senders;
314        let input = params.input.clone();
315        let rt = tokio::runtime::Handle::current();
316        pump_tasks.spawn(async move {
317            tokio::task::spawn_blocking(move || {
318                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
319            })
320            .await
321            .map_err(|e| anyhow!("shared pump join error: {e}"))
322            .and_then(|r| r)
323        });
324    } else {
325        for (idx, sender) in frame_senders.into_iter().enumerate() {
326            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
327            let input = params.input.clone();
328            let rt = tokio::runtime::Handle::current();
329            pump_tasks.spawn(async move {
330                tokio::task::spawn_blocking(move || {
331                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
332                })
333                .await
334                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
335                .and_then(|r| r)
336            });
337        }
338    }
339
340    // Per-rung scalers.
341    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
342    for (idx, rung) in rungs.iter().cloned().enumerate() {
343        let rx = frame_receivers[idx].take().expect("scaler rx slot");
344        let cfg = crate::rung_scaler::RungScalerConfig {
345            rung_idx: idx,
346            target_width: rung.width,
347            target_height: rung.height,
348            frames_per_chunk: params.keyframe_interval,
349        };
350        let queue = Arc::clone(&queues[idx]);
351        let rt = tokio::runtime::Handle::current();
352        let scaler_flag = Arc::clone(&scaler_active[idx]);
353        let active_h = Arc::clone(&active_workers);
354        let rung_done_h = Arc::clone(&rung_done);
355        scaler_flag.store(true, Ordering::Release);
356        active_h[idx].fetch_add(1, Ordering::AcqRel);
357        scaler_tasks.spawn(async move {
358            let result = tokio::task::spawn_blocking(move || {
359                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
360            })
361            .await
362            .map_err(|e| anyhow!("scaler join error: {e}"))
363            .and_then(|r| r);
364            scaler_flag.store(false, Ordering::Release);
365            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
366            if prev == 1 {
367                rung_done_h[idx].notify_one();
368            }
369            (idx, result)
370        });
371    }
372
373    // Initial encoder workers (one per rung, smallest first).
374    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
375    let ctx = WorkerCtx {
376        frame_rate: params.frame_rate,
377        output_color_metadata: params.output_color_metadata,
378        output_pixel_format: params.output_pixel_format,
379        timescale: params.timescale,
380        per_frame_ticks: params.per_frame_ticks,
381        keyframe_interval: params.keyframe_interval,
382        segment_target_ticks: params.segment_target_ticks,
383        output_root: params.output_root.clone(),
384        constant_qp: params.constant_qp,
385    };
386    for (idx, rung) in indexed.iter().cloned() {
387        let lease = match Arc::clone(&params.gpu_pool).claim().await {
388            Some(l) => l,
389            None => {
390                progress_stop.store(true, Ordering::Release);
391                let _ = progress_handle.await;
392                bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
393            }
394        };
395        spawn_encoder_worker(
396            &ctx,
397            idx,
398            &rung,
399            Arc::clone(&queues[idx]),
400            Arc::clone(&frames_encoded[idx]),
401            lease,
402            Arc::clone(&contributions),
403            Arc::clone(&active_workers),
404            Arc::clone(&rung_done),
405            Arc::clone(&rung_invariants[idx]),
406            Some(&mut worker_tasks),
407        );
408    }
409
410    // Helper dispatcher.
411    let helper_cancel = Arc::new(AtomicBool::new(false));
412    let helper_handle = {
413        let cancel = Arc::clone(&helper_cancel);
414        let pool = Arc::clone(&params.gpu_pool);
415        let queues = queues.clone();
416        let scaler_active = scaler_active.clone();
417        let frames_encoded = frames_encoded.clone();
418        let contributions = Arc::clone(&contributions);
419        let active_workers = Arc::clone(&active_workers);
420        let rung_done = Arc::clone(&rung_done);
421        let rung_invariants = rung_invariants.clone();
422        let rungs_owned: Vec<Rung> = rungs.to_vec();
423        let ctx = ctx.clone();
424        tokio::spawn(async move {
425            loop {
426                if cancel.load(Ordering::Acquire) {
427                    break;
428                }
429                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
430                if pool.pending_claimers() > 0 {
431                    continue;
432                }
433                let mut target = None;
434                for (idx, q) in queues.iter().enumerate() {
435                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
436                    let has_pending = q.pushed_segments() > q.popped_segments();
437                    if scaler_alive || has_pending {
438                        target = Some(idx);
439                        break;
440                    }
441                }
442                let Some(rung_idx) = target else { break };
443                let lease = match pool.try_claim() {
444                    Some(l) => l,
445                    None => continue,
446                };
447                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
448                spawn_encoder_worker(
449                    &ctx,
450                    rung_idx,
451                    &rungs_owned[rung_idx],
452                    Arc::clone(&queues[rung_idx]),
453                    Arc::clone(&frames_encoded[rung_idx]),
454                    lease,
455                    Arc::clone(&contributions),
456                    Arc::clone(&active_workers),
457                    Arc::clone(&rung_done),
458                    Arc::clone(&rung_invariants[rung_idx]),
459                    None,
460                );
461            }
462        })
463    };
464
465    // Drain everything.
466    let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
467    let mut pumps_remaining = pump_tasks.len();
468    let mut scalers_remaining = n;
469    let mut workers_remaining = n;
470    let mut finalizers_remaining = n;
471
472    macro_rules! teardown_err {
473        ($e:expr) => {{
474            helper_cancel.store(true, Ordering::Release);
475            let _ = helper_handle.await;
476            progress_stop.store(true, Ordering::Release);
477            let _ = progress_handle.await;
478            return Err($e);
479        }};
480    }
481
482    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
483        tokio::select! {
484            biased;
485            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
486                Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
487                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
488                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
489                None => pumps_remaining = 0,
490            },
491            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
492                Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
493                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
494                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
495                None => scalers_remaining = 0,
496            },
497            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
498                Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
499                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
500                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
501                None => workers_remaining = 0,
502            },
503            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
504                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
505                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
506                None => finalizers_remaining = 0,
507            },
508        }
509    }
510
511    helper_cancel.store(true, Ordering::Release);
512    let _ = helper_handle.await;
513    progress_stop.store(true, Ordering::Release);
514    let _ = progress_handle.await;
515    for h in finalizer_handles {
516        let _ = h.await;
517    }
518
519    Ok(completed)
520}
521
522/// Per-job constants shared by every encoder worker.
523#[derive(Clone)]
524struct WorkerCtx {
525    frame_rate: f64,
526    output_color_metadata: ColorMetadata,
527    output_pixel_format: PixelFormat,
528    timescale: u32,
529    per_frame_ticks: u32,
530    keyframe_interval: u32,
531    segment_target_ticks: u64,
532    output_root: PathBuf,
533    constant_qp: bool,
534}
535
536#[allow(clippy::too_many_arguments)]
537fn spawn_encoder_worker(
538    ctx: &WorkerCtx,
539    rung_idx: usize,
540    rung: &Rung,
541    queue: Arc<SegmentChunkQueue>,
542    frames_encoded: Arc<AtomicU64>,
543    lease: GpuLease,
544    contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
545    active_workers: Arc<Vec<AtomicUsize>>,
546    rung_done: Arc<Vec<Notify>>,
547    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
548    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
549) {
550    let rel_dir = format!("video/{}", rung.label);
551    let output_dir = ctx.output_root.join(&rel_dir);
552    let gpu_index = lease.gpu_index;
553    let gpu_vendor = lease.vendor;
554
555    let cfg = EncoderWorkerConfig {
556        rung_idx,
557        width: rung.width,
558        height: rung.height,
559        frame_rate: ctx.frame_rate,
560        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
561        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
562        target: rung.quality.target,
563        tier: rung.quality.tier,
564        threads: 0,
565        gpu_index: Some(gpu_index),
566        gpu_vendor: Some(gpu_vendor),
567        output_color_metadata: ctx.output_color_metadata,
568        output_pixel_format: ctx.output_pixel_format,
569        constant_qp: ctx.constant_qp,
570        timescale: ctx.timescale,
571        per_frame_ticks: ctx.per_frame_ticks,
572        keyframe_interval: ctx.keyframe_interval,
573        segment_target_ticks: ctx.segment_target_ticks,
574        output_dir,
575        rung_invariant,
576    };
577
578    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
579    let body = async move {
580        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
581        let cfg_for_worker = cfg.clone();
582        let queue_for_worker = Arc::clone(&queue);
583        let rt = tokio::runtime::Handle::current();
584        let counter = Arc::clone(&frames_encoded);
585        let blocking = tokio::task::spawn_blocking(move || {
586            run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
587        });
588        // Drain the per-frame progress channel (the shared AtomicU64 counter is
589        // the source of truth the reporter task reads; here we just keep the
590        // channel from backpressuring the worker).
591        let drain = async move { while progress_rx.recv().await.is_some() {} };
592        let (_, br) = tokio::join!(drain, blocking);
593
594        let task_status: Result<()> = match br {
595            Ok(Ok(out)) => {
596                contributions[rung_idx].lock().unwrap().push(out);
597                Ok(())
598            }
599            Ok(Err(e)) => Err(e),
600            Err(e) => Err(anyhow!("worker join error: {e}")),
601        };
602        drop(lease);
603        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
604        if prev == 1 {
605            rung_done[rung_idx].notify_one();
606        }
607        (rung_idx, task_status)
608    };
609
610    match worker_tasks {
611        Some(set) => {
612            set.spawn(body);
613        }
614        None => {
615            tokio::spawn(async move {
616                let _ = body.await;
617            });
618        }
619    }
620}
621
622/// Periodic per-rung progress reporter. Reads the shared frame counters and
623/// emits `Running` updates until stopped; skips rungs already finalized.
624fn spawn_progress_reporter(
625    rungs: Vec<Rung>,
626    frames_encoded: Vec<Arc<AtomicU64>>,
627    finalized: Arc<Vec<AtomicBool>>,
628    total_input_frames: u64,
629    sink: Arc<dyn ProgressSink>,
630    stop: Arc<AtomicBool>,
631) -> tokio::task::JoinHandle<()> {
632    tokio::spawn(async move {
633        loop {
634            if stop.load(Ordering::Acquire) {
635                break;
636            }
637            tokio::time::sleep(PROGRESS_TICK).await;
638            for (idx, rung) in rungs.iter().enumerate() {
639                if finalized[idx].load(Ordering::Acquire) {
640                    continue;
641                }
642                let done = frames_encoded[idx].load(Ordering::Relaxed);
643                report(
644                    sink.as_ref(),
645                    idx,
646                    rung,
647                    RungStatus::Running,
648                    done,
649                    Some(total_input_frames),
650                    0,
651                    0,
652                    None,
653                );
654            }
655        }
656    })
657}
658
659#[allow(clippy::too_many_arguments)]
660fn report(
661    sink: &dyn ProgressSink,
662    rung_index: usize,
663    rung: &Rung,
664    status: RungStatus,
665    frames_done: u64,
666    frames_total: Option<u64>,
667    segments: u32,
668    bytes_out: u64,
669    message: Option<String>,
670) {
671    let percent = match status {
672        RungStatus::Completed => 100.0,
673        RungStatus::Pending => 0.0,
674        _ => match frames_total {
675            Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
676            _ => 1.0,
677        },
678    };
679    sink.on_rung(RungProgress {
680        rung_index,
681        label: rung.label.clone(),
682        width: rung.width,
683        height: rung.height,
684        status,
685        percent,
686        frames_done,
687        frames_total,
688        segments_written: segments,
689        bytes_out,
690        message,
691    });
692}
693
694/// Build a [`GpuPool`] from the host's detected GPU inventory.
695pub fn detect_gpu_pool() -> Arc<GpuPool> {
696    Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
697}
698
699fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
700    match fam {
701        GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
702        GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
703        GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
704    }
705}
706
707/// The host GPUs selected by an [`EncodePolicy`]: all of them for `AllGpus`,
708/// the first / pinned index for `SingleGpu`, every device of one vendor for
709/// `Family`.
710fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
711    let gpus = codec::gpu::detect_gpus();
712    match policy {
713        EncodePolicy::AllGpus => gpus,
714        EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
715        EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
716        EncodePolicy::Family(fam) => {
717            let v = policy_vendor(fam);
718            gpus.into_iter().filter(|g| g.vendor == v).collect()
719        }
720    }
721}
722
723/// Build a [`GpuPool`] constrained to the given [`EncodePolicy`]. An empty pool
724/// (e.g. a pinned index or vendor family that isn't present) yields capacity 0,
725/// so the orchestrator's pre-flight probe / lease claim surfaces a clear error.
726pub fn gpu_pool_for_policy(policy: EncodePolicy) -> Arc<GpuPool> {
727    Arc::new(GpuPool::new(&select_gpus_for_policy(policy)))
728}
729
730/// The GPU indices an [`EncodePolicy`] selects, in detection order. Used to pin
731/// the decode pump to a device consistent with the policy (so decode honors a
732/// `Family` / `SingleGpu` constraint, not just encode).
733pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
734    select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
735}
736
737/// The GPU index to pin a *serial* (single-GPU) encode/decode to under a
738/// policy: `None` (auto/first-available) for `AllGpus`, the pinned index for
739/// `SingleGpu`, the first device of the vendor for `Family`.
740pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
741    match policy {
742        EncodePolicy::AllGpus => None,
743        EncodePolicy::SingleGpu(idx) => idx,
744        EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
745    }
746}
747
748// ---------------------------------------------------------------------------
749// Single-file multi-GPU: chunk each rung across GPUs, stitch packets into MP4.
750// ---------------------------------------------------------------------------
751
/// One rung's full ordered AV1 packet stream, stitched from chunks encoded
/// across GPUs. The caller muxes these into a single MP4 (+ audio).
#[derive(Debug)]
pub struct RungPackets {
    /// Position of this rung in the `rungs` slice passed to the orchestrator.
    pub rung_index: usize,
    /// Encoded width, copied from the rung definition.
    pub width: u32,
    /// Encoded height, copied from the rung definition.
    pub height: u32,
    /// Rung label, copied from the rung definition.
    pub label: String,
    /// Every packet for the rung, concatenated chunk-by-chunk in ascending
    /// segment order.
    pub packets: Vec<EncodedPacket>,
}
762
763/// Single-file counterpart to [`run_multigpu_hls`]: decode once, fan to per-rung
764/// scalers, and dynamically schedule each rung's GOP-sized chunks across all
765/// GPUs (fair lease pool + mid-flight helper dispatch + cross-vendor codec
766/// invariant). Each worker encodes its chunk to packets (a fresh encoder per
767/// chunk → first frame is an IDR); the finalizer concatenates them in segment
768/// order into one ordered packet stream per rung — no disk round-trip.
769pub async fn run_multigpu_single_file(
770    params: MultiGpuParams<'_>,
771    sink: Arc<dyn ProgressSink>,
772) -> Result<Vec<Option<RungPackets>>> {
773    let rungs = params.rungs;
774    let n = rungs.len();
775    if n == 0 {
776        return Ok(Vec::new());
777    }
778    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
779    if total_segments == 0 {
780        bail!(
781            "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
782            params.total_input_frames,
783            params.keyframe_interval
784        );
785    }
786
787    // Pre-flight encoder probe (same fail-fast as the HLS path).
788    {
789        let probe = codec::encode::EncoderConfig {
790            width: rungs[0].width,
791            height: rungs[0].height,
792            frame_rate: params.frame_rate,
793            gpu_index: None,
794            ..Default::default()
795        };
796        codec::encode::select_encoder(probe, None).map_err(|e| {
797            anyhow!(
798                "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
799                 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature"
800            )
801        })?;
802    }
803
804    tracing::info!(
805        rungs = n,
806        total_segments,
807        gpu_pool_capacity = params.gpu_pool.capacity(),
808        "multi-GPU single-file phase starting"
809    );
810
811    let queues: Vec<Arc<SegmentChunkQueue>> =
812        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
813    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
814    let scaler_active: Vec<Arc<AtomicBool>> =
815        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
816    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
817        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
818    // Per-rung packet collectors (each its own Arc so chunk workers can push).
819    let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
820        (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
821    let active_workers: Arc<Vec<AtomicUsize>> =
822        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
823    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
824    let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
825
826    let progress_stop = Arc::new(AtomicBool::new(false));
827    let progress_handle = spawn_progress_reporter(
828        rungs.to_vec(),
829        frames_encoded.clone(),
830        finalized.clone(),
831        params.total_input_frames,
832        Arc::clone(&sink),
833        Arc::clone(&progress_stop),
834    );
835
836    // Finalizers: stitch each rung's chunks (sorted, deduped) into one stream.
837    let total_input_frames = params.total_input_frames;
838    let (finalizer_tx, mut finalizer_rx) =
839        mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
840    let mut finalizer_handles = Vec::with_capacity(n);
841    for idx in 0..n {
842        let collector = Arc::clone(&contributions[idx]);
843        let active_h = Arc::clone(&active_workers);
844        let rung_done_h = Arc::clone(&rung_done);
845        let finalized_h = Arc::clone(&finalized);
846        let tx = finalizer_tx.clone();
847        let rung = rungs[idx].clone();
848        let total_segments = total_segments;
849        let sink = Arc::clone(&sink);
850        finalizer_handles.push(tokio::spawn(async move {
851            loop {
852                let notified = rung_done_h[idx].notified();
853                if active_h[idx].load(Ordering::Acquire) == 0 {
854                    break;
855                }
856                notified.await;
857            }
858            let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
859            if chunks.is_empty() {
860                finalized_h[idx].store(true, Ordering::Release);
861                let _ = tx.send((idx, Ok(None))).await;
862                return;
863            }
864            chunks.sort_by_key(|c| c.segment_idx);
865            chunks.dedup_by_key(|c| c.segment_idx);
866            // Coverage: contiguous 0..total_segments.
867            let got = chunks.len();
868            let contiguous = chunks
869                .iter()
870                .enumerate()
871                .all(|(i, c)| c.segment_idx == i);
872            let result = if got != total_segments as usize || !contiguous {
873                Err(anyhow!(
874                    "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
875                    rung.label,
876                    total_segments,
877                    got
878                ))
879            } else {
880                let mut packets: Vec<EncodedPacket> = Vec::new();
881                for c in chunks {
882                    packets.extend(c.packets);
883                }
884                let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
885                report(
886                    sink.as_ref(),
887                    idx,
888                    &rung,
889                    RungStatus::Completed,
890                    total_input_frames,
891                    Some(total_input_frames),
892                    got as u32,
893                    bytes,
894                    None,
895                );
896                Ok(Some(RungPackets {
897                    rung_index: idx,
898                    width: rung.width,
899                    height: rung.height,
900                    label: rung.label.clone(),
901                    packets,
902                }))
903            };
904            finalized_h[idx].store(true, Ordering::Release);
905            let _ = tx.send((idx, result)).await;
906        }));
907    }
908    drop(finalizer_tx);
909
910    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
911    indexed.sort_by_key(|(_, r)| r.short_side());
912
913    // Decode pump(s) + fan-out.
914    let mut frame_senders = Vec::with_capacity(n);
915    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
916        Vec::with_capacity(n);
917    for _ in 0..n {
918        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
919        frame_senders.push(tx);
920        frame_receivers.push(Some(rx));
921    }
922    let use_shared_pump = n <= params.gpu_pool.capacity();
923    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
924    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
925        codec_name: params.header.codec.clone(),
926        info_for_decoder: params.header.info.clone(),
927        source_color_metadata: params.source_color_metadata,
928        source_pixel_format: params.source_pixel_format,
929        needs_downsample: params.needs_downsample,
930        tonemap_to_sdr: params.tonemap_to_sdr,
931        gpu_index,
932    };
933    if use_shared_pump {
934        let cfg = make_pump_cfg(params.decode_gpu_for(0));
935        let senders = frame_senders;
936        let input = params.input.clone();
937        let rt = tokio::runtime::Handle::current();
938        pump_tasks.spawn(async move {
939            tokio::task::spawn_blocking(move || {
940                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
941            })
942            .await
943            .map_err(|e| anyhow!("shared pump join error: {e}"))
944            .and_then(|r| r)
945        });
946    } else {
947        for (idx, sender) in frame_senders.into_iter().enumerate() {
948            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
949            let input = params.input.clone();
950            let rt = tokio::runtime::Handle::current();
951            pump_tasks.spawn(async move {
952                tokio::task::spawn_blocking(move || {
953                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
954                })
955                .await
956                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
957                .and_then(|r| r)
958            });
959        }
960    }
961
962    // Per-rung scalers.
963    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
964    for (idx, rung) in rungs.iter().cloned().enumerate() {
965        let rx = frame_receivers[idx].take().expect("scaler rx slot");
966        let cfg = crate::rung_scaler::RungScalerConfig {
967            rung_idx: idx,
968            target_width: rung.width,
969            target_height: rung.height,
970            frames_per_chunk: params.keyframe_interval,
971        };
972        let queue = Arc::clone(&queues[idx]);
973        let rt = tokio::runtime::Handle::current();
974        let scaler_flag = Arc::clone(&scaler_active[idx]);
975        let active_h = Arc::clone(&active_workers);
976        let rung_done_h = Arc::clone(&rung_done);
977        scaler_flag.store(true, Ordering::Release);
978        active_h[idx].fetch_add(1, Ordering::AcqRel);
979        scaler_tasks.spawn(async move {
980            let result = tokio::task::spawn_blocking(move || {
981                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
982            })
983            .await
984            .map_err(|e| anyhow!("scaler join error: {e}"))
985            .and_then(|r| r);
986            scaler_flag.store(false, Ordering::Release);
987            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
988            if prev == 1 {
989                rung_done_h[idx].notify_one();
990            }
991            (idx, result)
992        });
993    }
994
995    // Initial chunk workers.
996    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
997    let ctx = WorkerCtx {
998        frame_rate: params.frame_rate,
999        output_color_metadata: params.output_color_metadata,
1000        output_pixel_format: params.output_pixel_format,
1001        timescale: params.timescale,
1002        per_frame_ticks: params.per_frame_ticks,
1003        keyframe_interval: params.keyframe_interval,
1004        segment_target_ticks: params.segment_target_ticks,
1005        output_root: params.output_root.clone(),
1006        constant_qp: params.constant_qp,
1007    };
1008    for (idx, rung) in indexed.iter().cloned() {
1009        let lease = match Arc::clone(&params.gpu_pool).claim().await {
1010            Some(l) => l,
1011            None => {
1012                progress_stop.store(true, Ordering::Release);
1013                let _ = progress_handle.await;
1014                bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1015            }
1016        };
1017        spawn_chunk_worker(
1018            &ctx,
1019            idx,
1020            &rung,
1021            Arc::clone(&queues[idx]),
1022            Arc::clone(&frames_encoded[idx]),
1023            lease,
1024            Arc::clone(&contributions[idx]),
1025            Arc::clone(&active_workers),
1026            Arc::clone(&rung_done),
1027            Arc::clone(&rung_invariants[idx]),
1028            Some(&mut worker_tasks),
1029        );
1030    }
1031
1032    // Helper dispatcher.
1033    let helper_cancel = Arc::new(AtomicBool::new(false));
1034    let helper_handle = {
1035        let cancel = Arc::clone(&helper_cancel);
1036        let pool = Arc::clone(&params.gpu_pool);
1037        let queues = queues.clone();
1038        let scaler_active = scaler_active.clone();
1039        let frames_encoded = frames_encoded.clone();
1040        let contributions = contributions.clone();
1041        let active_workers = Arc::clone(&active_workers);
1042        let rung_done = Arc::clone(&rung_done);
1043        let rung_invariants = rung_invariants.clone();
1044        let rungs_owned: Vec<Rung> = rungs.to_vec();
1045        let ctx = ctx.clone();
1046        tokio::spawn(async move {
1047            loop {
1048                if cancel.load(Ordering::Acquire) {
1049                    break;
1050                }
1051                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1052                if pool.pending_claimers() > 0 {
1053                    continue;
1054                }
1055                let mut target = None;
1056                for (idx, q) in queues.iter().enumerate() {
1057                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1058                    let has_pending = q.pushed_segments() > q.popped_segments();
1059                    if scaler_alive || has_pending {
1060                        target = Some(idx);
1061                        break;
1062                    }
1063                }
1064                let Some(rung_idx) = target else { break };
1065                let lease = match pool.try_claim() {
1066                    Some(l) => l,
1067                    None => continue,
1068                };
1069                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1070                spawn_chunk_worker(
1071                    &ctx,
1072                    rung_idx,
1073                    &rungs_owned[rung_idx],
1074                    Arc::clone(&queues[rung_idx]),
1075                    Arc::clone(&frames_encoded[rung_idx]),
1076                    lease,
1077                    Arc::clone(&contributions[rung_idx]),
1078                    Arc::clone(&active_workers),
1079                    Arc::clone(&rung_done),
1080                    Arc::clone(&rung_invariants[rung_idx]),
1081                    None,
1082                );
1083            }
1084        })
1085    };
1086
1087    // Drain.
1088    let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1089    let mut pumps_remaining = pump_tasks.len();
1090    let mut scalers_remaining = n;
1091    let mut workers_remaining = n;
1092    let mut finalizers_remaining = n;
1093    macro_rules! teardown_err {
1094        ($e:expr) => {{
1095            helper_cancel.store(true, Ordering::Release);
1096            let _ = helper_handle.await;
1097            progress_stop.store(true, Ordering::Release);
1098            let _ = progress_handle.await;
1099            return Err($e);
1100        }};
1101    }
1102    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1103        tokio::select! {
1104            biased;
1105            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1106                Some(Ok(Ok(_))) => pumps_remaining -= 1,
1107                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1108                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1109                None => pumps_remaining = 0,
1110            },
1111            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1112                Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1113                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1114                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1115                None => scalers_remaining = 0,
1116            },
1117            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1118                Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1119                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1120                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1121                None => workers_remaining = 0,
1122            },
1123            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1124                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1125                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1126                None => finalizers_remaining = 0,
1127            },
1128        }
1129    }
1130    helper_cancel.store(true, Ordering::Release);
1131    let _ = helper_handle.await;
1132    progress_stop.store(true, Ordering::Release);
1133    let _ = progress_handle.await;
1134    for h in finalizer_handles {
1135        let _ = h.await;
1136    }
1137    Ok(completed)
1138}
1139
1140#[allow(clippy::too_many_arguments)]
1141fn spawn_chunk_worker(
1142    ctx: &WorkerCtx,
1143    rung_idx: usize,
1144    rung: &Rung,
1145    queue: Arc<SegmentChunkQueue>,
1146    frames_encoded: Arc<AtomicU64>,
1147    lease: GpuLease,
1148    collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1149    active_workers: Arc<Vec<AtomicUsize>>,
1150    rung_done: Arc<Vec<Notify>>,
1151    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1152    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1153) {
1154    let gpu_index = lease.gpu_index;
1155    let gpu_vendor = lease.vendor;
1156    let cfg = EncoderWorkerConfig {
1157        rung_idx,
1158        width: rung.width,
1159        height: rung.height,
1160        frame_rate: ctx.frame_rate,
1161        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1162        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1163        target: rung.quality.target,
1164        tier: rung.quality.tier,
1165        threads: 0,
1166        gpu_index: Some(gpu_index),
1167        gpu_vendor: Some(gpu_vendor),
1168        output_color_metadata: ctx.output_color_metadata,
1169        output_pixel_format: ctx.output_pixel_format,
1170        constant_qp: ctx.constant_qp,
1171        timescale: ctx.timescale,
1172        per_frame_ticks: ctx.per_frame_ticks,
1173        keyframe_interval: ctx.keyframe_interval,
1174        segment_target_ticks: ctx.segment_target_ticks,
1175        output_dir: ctx.output_root.clone(),
1176        rung_invariant,
1177    };
1178    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1179    let body = async move {
1180        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1181        let cfg_for_worker = cfg.clone();
1182        let queue_for_worker = Arc::clone(&queue);
1183        let rt = tokio::runtime::Handle::current();
1184        let counter = Arc::clone(&frames_encoded);
1185        let out = Arc::clone(&collector);
1186        let blocking = tokio::task::spawn_blocking(move || {
1187            run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1188        });
1189        let drain = async move { while progress_rx.recv().await.is_some() {} };
1190        let (_, br) = tokio::join!(drain, blocking);
1191        let task_status: Result<()> = match br {
1192            Ok(Ok(())) => Ok(()),
1193            Ok(Err(e)) => Err(e),
1194            Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1195        };
1196        drop(lease);
1197        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1198        if prev == 1 {
1199            rung_done[rung_idx].notify_one();
1200        }
1201        (rung_idx, task_status)
1202    };
1203    match worker_tasks {
1204        Some(set) => {
1205            set.spawn(body);
1206        }
1207        None => {
1208            tokio::spawn(async move {
1209                let _ = body.await;
1210            });
1211        }
1212    }
1213}