Skip to main content

rivet/
multigpu.rs

1//! Multi-GPU reactive variant phase — **the rung benefit**.
2//!
3//! Decode the source **once** and dynamically schedule every rung's CMAF
4//! segments across all available GPUs using a fair lease pool with mid-flight
5//! helper dispatch:
6//!
7//! ```text
8//!   decode pump (decode once)
9//!        │  fan out normalized frames
10//!        ▼
11//!   per-rung scaler ──► SegmentChunkQueue ──► encoder worker (holds a GpuLease)
12//!                                        ──► helper worker (claims a freed lease)
13//! ```
14//!
15//! - One encoder per GPU at a time ([`GpuPool`] enforces it — concurrent
16//!   NVENC sessions on one context deadlock).
17//! - A fast rung releases its lease early; the **helper dispatcher** grabs the
18//!   freed lease and attaches an extra worker to a still-busy rung, so a slow
19//!   rung finishes sooner. Segment work is the unit of parallelism.
20//! - Helpers may land on a different GPU **vendor** than the rung's first
21//!   worker; the per-rung AV1 **codec invariant** ([`RungCodecInvariant`])
22//!   guarantees every contributed segment shares the `av1C` contract, so a
23//!   cross-vendor (NVENC + QSV) rendition still decodes cleanly. A mismatched
24//!   helper requeues its chunk and exits — the run never aborts on it.
25//!
26//! Storage/transport specifics stay out of the engine: progress is reported
27//! through the generic [`ProgressSink`], so a consumer can layer an uploader
28//! (object storage, a status queue, …) on top by watching `RungStatus::Completed`.
29
30use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48    ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49    run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity passed to every rung's [`SegmentChunkQueue::new`].
const QUEUE_CAPACITY: usize = 2;
/// Buffer depth of each decode-pump → rung-scaler frame channel.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// Sleep between passes of the helper dispatcher looking for a freed lease.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// Sleep between rounds of periodic `Running` progress reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
61/// One rung's finalized CMAF manifest.
62#[derive(Debug, Clone)]
63pub struct RungManifest {
64    pub rung_index: usize,
65    pub width: u32,
66    pub height: u32,
67    pub label: String,
68    /// Directory relative to the asset root, e.g. `"video/720p"`.
69    pub relative_dir: String,
70    pub manifest: CmafTrackManifest,
71}
72
73/// Inputs to [`run_multigpu_hls`].
74pub struct MultiGpuParams<'a> {
75    pub input: Bytes,
76    pub rungs: &'a [Rung],
77    pub header: DemuxHeader,
78    pub source_color_metadata: ColorMetadata,
79    pub source_pixel_format: PixelFormat,
80    /// Whether the decode pump tonemaps HDR→SDR (from the spec's `ColorPolicy`).
81    pub tonemap_to_sdr: bool,
82    /// Resolved **output** color metadata + pixel format the encoders target
83    /// (from `OutputSpec::resolve_output`).
84    pub output_color_metadata: ColorMetadata,
85    pub output_pixel_format: PixelFormat,
86    pub needs_downsample: bool,
87    /// Prepared per-frame video filter chain applied in the decode pump (before
88    /// scaling). Overlay images are loaded once at prepare time.
89    pub filters: Arc<codec::filter::FilterChain>,
90    pub frame_rate: f64,
91    pub gpu_pool: Arc<GpuPool>,
92    /// GPU indices the encode policy selected, in detection order. The decode
93    /// pump pins to these (round-robin for per-rung pumps) so decode honors the
94    /// same `Family` / `SingleGpu` / `AllGpus` constraint as encode. Empty ⇒
95    /// the decoder dispatch auto-selects (legacy behavior).
96    pub gpu_indices: Vec<u32>,
97    /// Explicit decode-pump GPU override. `Some(i)` forces every decode pump
98    /// onto GPU `i` regardless of `gpu_indices`; `None` follows the policy.
99    pub decode_gpu: Option<u32>,
100    pub output_root: PathBuf,
101    pub timescale: u32,
102    pub per_frame_ticks: u32,
103    pub keyframe_interval: u32,
104    pub segment_target_ticks: u64,
105    pub total_input_frames: u64,
106    /// Force constant-QP chunk encoding (single-file `ChunkSeamMode::ParallelConstQp`)
107    /// so stitched chunk seams are quality-flat. `false` for HLS (segments are
108    /// independent) and the default `Parallel` single-file mode.
109    pub constant_qp: bool,
110}
111
112impl MultiGpuParams<'_> {
113    /// Resolve the decode-pump GPU for the `i`-th per-rung pump (or the shared
114    /// pump when `i == 0`): the explicit `decode_gpu` override wins, else the
115    /// policy's GPU indices round-robin, else `None` (decoder auto-select).
116    fn decode_gpu_for(&self, i: usize) -> Option<u32> {
117        if self.decode_gpu.is_some() {
118            return self.decode_gpu;
119        }
120        if self.gpu_indices.is_empty() {
121            return None;
122        }
123        Some(self.gpu_indices[i % self.gpu_indices.len()])
124    }
125}
126
127/// Run the reactive multi-GPU variant phase. Returns one `Option<RungManifest>`
128/// per rung (in rung order); `None` means the rung produced no segments.
129pub async fn run_multigpu_hls(
130    params: MultiGpuParams<'_>,
131    sink: Arc<dyn ProgressSink>,
132) -> Result<Vec<Option<RungManifest>>> {
133    let rungs = params.rungs;
134    let n = rungs.len();
135    if n == 0 {
136        return Ok(Vec::new());
137    }
138    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
139    if total_segments == 0 {
140        bail!(
141            "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
142            params.total_input_frames,
143            params.keyframe_interval
144        );
145    }
146
147    // Pre-flight: verify this host can actually construct an AV1 encoder
148    // before spawning the orchestration. Fail fast with a clear error instead
149    // of dispatching workers that fail at encoder construction — and, on
150    // drivers that re-init a failed NVENC session badly (e.g. Ampere with no
151    // AV1-encode silicon), would otherwise hang an uncancellable blocking task.
152    {
153        let probe = codec::encode::EncoderConfig {
154            width: rungs[0].width,
155            height: rungs[0].height,
156            frame_rate: params.frame_rate,
157            gpu_index: None,
158            ..Default::default()
159        };
160        codec::encode::select_encoder(probe, None).map_err(|e| {
161            anyhow!(
162                "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
163                 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature for a software encoder"
164            )
165        })?;
166    }
167
168    tracing::info!(
169        rungs = n,
170        total_segments,
171        gpu_pool_capacity = params.gpu_pool.capacity(),
172        "multi-GPU variant phase starting"
173    );
174
175    // Per-rung shared state.
176    let queues: Vec<Arc<SegmentChunkQueue>> =
177        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
178    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
179    let scaler_active: Vec<Arc<AtomicBool>> =
180        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
181    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
182        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
183    let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
184        Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
185    let active_workers: Arc<Vec<AtomicUsize>> =
186        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
187    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
188    let finalized: Arc<Vec<AtomicBool>> =
189        Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
190
191    // Periodic progress reporter.
192    let progress_stop = Arc::new(AtomicBool::new(false));
193    let progress_handle = spawn_progress_reporter(
194        rungs.to_vec(),
195        frames_encoded.clone(),
196        finalized.clone(),
197        params.total_input_frames,
198        Arc::clone(&sink),
199        Arc::clone(&progress_stop),
200    );
201
202    // Finalizers: one per rung, merges contributions → RungManifest.
203    let total_input_frames = params.total_input_frames;
204    let (finalizer_tx, mut finalizer_rx) =
205        mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
206    let mut finalizer_handles = Vec::with_capacity(n);
207    for idx in 0..n {
208        let contributions_h = Arc::clone(&contributions);
209        let active_h = Arc::clone(&active_workers);
210        let rung_done_h = Arc::clone(&rung_done);
211        let finalized_h = Arc::clone(&finalized);
212        let tx = finalizer_tx.clone();
213        let rung = rungs[idx].clone();
214        let rel_dir = format!("video/{}", rung.label);
215        let output_root = params.output_root.clone();
216        let timescale = params.timescale;
217        let total_segments = total_segments;
218        let sink = Arc::clone(&sink);
219        finalizer_handles.push(tokio::spawn(async move {
220            // Wait for all of this rung's workers + the scaler to finish.
221            loop {
222                let notified = rung_done_h[idx].notified();
223                if active_h[idx].load(Ordering::Acquire) == 0 {
224                    break;
225                }
226                notified.await;
227            }
228            let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
229            if outputs.is_empty() {
230                finalized_h[idx].store(true, Ordering::Release);
231                let _ = tx.send((idx, Ok(None))).await;
232                return;
233            }
234            let init_path = output_root.join(&rel_dir).join("init.mp4");
235            let contribs: Vec<RungContribution> = outputs
236                .into_iter()
237                .map(|wo| RungContribution {
238                    width: rung.width,
239                    height: rung.height,
240                    relative_dir: rel_dir.clone(),
241                    manifest: CmafTrackManifest {
242                        init_path: init_path.clone(),
243                        segments: wo.segments,
244                        timescale,
245                    },
246                })
247                .collect();
248            let result = match merge_rung_contributions(contribs) {
249                Ok(merged) => {
250                    let got = merged.manifest.segments.len();
251                    if got != total_segments as usize {
252                        Err(anyhow!(
253                            "rung {} coverage incomplete: expected {} segments, got {}",
254                            rung.label,
255                            total_segments,
256                            got
257                        ))
258                    } else {
259                        let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
260                        report(
261                            sink.as_ref(),
262                            idx,
263                            &rung,
264                            RungStatus::Completed,
265                            total_input_frames,
266                            Some(total_input_frames),
267                            got as u32,
268                            bytes,
269                            None,
270                        );
271                        Ok(Some(RungManifest {
272                            rung_index: idx,
273                            width: rung.width,
274                            height: rung.height,
275                            label: rung.label.clone(),
276                            relative_dir: rel_dir.clone(),
277                            manifest: merged.manifest,
278                        }))
279                    }
280                }
281                Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
282            };
283            finalized_h[idx].store(true, Ordering::Release);
284            let _ = tx.send((idx, result)).await;
285        }));
286    }
287    drop(finalizer_tx);
288
289    // Smallest-first claim order for initial workers.
290    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
291    indexed.sort_by_key(|(_, r)| r.short_side());
292
293    // Decode pump(s) + fan-out channels.
294    let mut frame_senders = Vec::with_capacity(n);
295    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
296        Vec::with_capacity(n);
297    for _ in 0..n {
298        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
299        frame_senders.push(tx);
300        frame_receivers.push(Some(rx));
301    }
302
303    let use_shared_pump = n <= params.gpu_pool.capacity();
304    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
305    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
306        codec_name: params.header.codec.clone(),
307        info_for_decoder: params.header.info.clone(),
308        source_color_metadata: params.source_color_metadata,
309        source_pixel_format: params.source_pixel_format,
310        needs_downsample: params.needs_downsample,
311        tonemap_to_sdr: params.tonemap_to_sdr,
312        gpu_index,
313        filters: params.filters.clone(),
314    };
315    if use_shared_pump {
316        let cfg = make_pump_cfg(params.decode_gpu_for(0));
317        let senders = frame_senders;
318        let input = params.input.clone();
319        let rt = tokio::runtime::Handle::current();
320        pump_tasks.spawn(async move {
321            tokio::task::spawn_blocking(move || {
322                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
323            })
324            .await
325            .map_err(|e| anyhow!("shared pump join error: {e}"))
326            .and_then(|r| r)
327        });
328    } else {
329        for (idx, sender) in frame_senders.into_iter().enumerate() {
330            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
331            let input = params.input.clone();
332            let rt = tokio::runtime::Handle::current();
333            pump_tasks.spawn(async move {
334                tokio::task::spawn_blocking(move || {
335                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
336                })
337                .await
338                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
339                .and_then(|r| r)
340            });
341        }
342    }
343
344    // Per-rung scalers.
345    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
346    for (idx, rung) in rungs.iter().cloned().enumerate() {
347        let rx = frame_receivers[idx].take().expect("scaler rx slot");
348        let cfg = crate::rung_scaler::RungScalerConfig {
349            rung_idx: idx,
350            target_width: rung.width,
351            target_height: rung.height,
352            frames_per_chunk: params.keyframe_interval,
353        };
354        let queue = Arc::clone(&queues[idx]);
355        let rt = tokio::runtime::Handle::current();
356        let scaler_flag = Arc::clone(&scaler_active[idx]);
357        let active_h = Arc::clone(&active_workers);
358        let rung_done_h = Arc::clone(&rung_done);
359        scaler_flag.store(true, Ordering::Release);
360        active_h[idx].fetch_add(1, Ordering::AcqRel);
361        scaler_tasks.spawn(async move {
362            let result = tokio::task::spawn_blocking(move || {
363                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
364            })
365            .await
366            .map_err(|e| anyhow!("scaler join error: {e}"))
367            .and_then(|r| r);
368            scaler_flag.store(false, Ordering::Release);
369            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
370            if prev == 1 {
371                rung_done_h[idx].notify_one();
372            }
373            (idx, result)
374        });
375    }
376
377    // Initial encoder workers (one per rung, smallest first).
378    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
379    let ctx = WorkerCtx {
380        frame_rate: params.frame_rate,
381        output_color_metadata: params.output_color_metadata,
382        output_pixel_format: params.output_pixel_format,
383        timescale: params.timescale,
384        per_frame_ticks: params.per_frame_ticks,
385        keyframe_interval: params.keyframe_interval,
386        segment_target_ticks: params.segment_target_ticks,
387        output_root: params.output_root.clone(),
388        constant_qp: params.constant_qp,
389    };
390    for (idx, rung) in indexed.iter().cloned() {
391        let lease = match Arc::clone(&params.gpu_pool).claim().await {
392            Some(l) => l,
393            None => {
394                progress_stop.store(true, Ordering::Release);
395                let _ = progress_handle.await;
396                bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
397            }
398        };
399        spawn_encoder_worker(
400            &ctx,
401            idx,
402            &rung,
403            Arc::clone(&queues[idx]),
404            Arc::clone(&frames_encoded[idx]),
405            lease,
406            Arc::clone(&contributions),
407            Arc::clone(&active_workers),
408            Arc::clone(&rung_done),
409            Arc::clone(&rung_invariants[idx]),
410            Some(&mut worker_tasks),
411        );
412    }
413
414    // Helper dispatcher.
415    let helper_cancel = Arc::new(AtomicBool::new(false));
416    let helper_handle = {
417        let cancel = Arc::clone(&helper_cancel);
418        let pool = Arc::clone(&params.gpu_pool);
419        let queues = queues.clone();
420        let scaler_active = scaler_active.clone();
421        let frames_encoded = frames_encoded.clone();
422        let contributions = Arc::clone(&contributions);
423        let active_workers = Arc::clone(&active_workers);
424        let rung_done = Arc::clone(&rung_done);
425        let rung_invariants = rung_invariants.clone();
426        let rungs_owned: Vec<Rung> = rungs.to_vec();
427        let ctx = ctx.clone();
428        tokio::spawn(async move {
429            loop {
430                if cancel.load(Ordering::Acquire) {
431                    break;
432                }
433                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
434                if pool.pending_claimers() > 0 {
435                    continue;
436                }
437                let mut target = None;
438                for (idx, q) in queues.iter().enumerate() {
439                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
440                    let has_pending = q.pushed_segments() > q.popped_segments();
441                    if scaler_alive || has_pending {
442                        target = Some(idx);
443                        break;
444                    }
445                }
446                let Some(rung_idx) = target else { break };
447                let lease = match pool.try_claim() {
448                    Some(l) => l,
449                    None => continue,
450                };
451                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
452                spawn_encoder_worker(
453                    &ctx,
454                    rung_idx,
455                    &rungs_owned[rung_idx],
456                    Arc::clone(&queues[rung_idx]),
457                    Arc::clone(&frames_encoded[rung_idx]),
458                    lease,
459                    Arc::clone(&contributions),
460                    Arc::clone(&active_workers),
461                    Arc::clone(&rung_done),
462                    Arc::clone(&rung_invariants[rung_idx]),
463                    None,
464                );
465            }
466        })
467    };
468
469    // Drain everything.
470    let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
471    let mut pumps_remaining = pump_tasks.len();
472    let mut scalers_remaining = n;
473    let mut workers_remaining = n;
474    let mut finalizers_remaining = n;
475
476    macro_rules! teardown_err {
477        ($e:expr) => {{
478            helper_cancel.store(true, Ordering::Release);
479            let _ = helper_handle.await;
480            progress_stop.store(true, Ordering::Release);
481            let _ = progress_handle.await;
482            return Err($e);
483        }};
484    }
485
486    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
487        tokio::select! {
488            biased;
489            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
490                Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
491                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
492                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
493                None => pumps_remaining = 0,
494            },
495            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
496                Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
497                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
498                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
499                None => scalers_remaining = 0,
500            },
501            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
502                Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
503                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
504                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
505                None => workers_remaining = 0,
506            },
507            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
508                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
509                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
510                None => finalizers_remaining = 0,
511            },
512        }
513    }
514
515    helper_cancel.store(true, Ordering::Release);
516    let _ = helper_handle.await;
517    progress_stop.store(true, Ordering::Release);
518    let _ = progress_handle.await;
519    for h in finalizer_handles {
520        let _ = h.await;
521    }
522
523    Ok(completed)
524}
525
526/// Per-job constants shared by every encoder worker.
527#[derive(Clone)]
528struct WorkerCtx {
529    frame_rate: f64,
530    output_color_metadata: ColorMetadata,
531    output_pixel_format: PixelFormat,
532    timescale: u32,
533    per_frame_ticks: u32,
534    keyframe_interval: u32,
535    segment_target_ticks: u64,
536    output_root: PathBuf,
537    constant_qp: bool,
538}
539
540#[allow(clippy::too_many_arguments)]
541fn spawn_encoder_worker(
542    ctx: &WorkerCtx,
543    rung_idx: usize,
544    rung: &Rung,
545    queue: Arc<SegmentChunkQueue>,
546    frames_encoded: Arc<AtomicU64>,
547    lease: GpuLease,
548    contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
549    active_workers: Arc<Vec<AtomicUsize>>,
550    rung_done: Arc<Vec<Notify>>,
551    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
552    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
553) {
554    let rel_dir = format!("video/{}", rung.label);
555    let output_dir = ctx.output_root.join(&rel_dir);
556    let gpu_index = lease.gpu_index;
557    let gpu_vendor = lease.vendor;
558
559    let cfg = EncoderWorkerConfig {
560        rung_idx,
561        width: rung.width,
562        height: rung.height,
563        frame_rate: ctx.frame_rate,
564        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
565        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
566        target: rung.quality.target,
567        tier: rung.quality.tier,
568        threads: 0,
569        gpu_index: Some(gpu_index),
570        gpu_vendor: Some(gpu_vendor),
571        output_color_metadata: ctx.output_color_metadata,
572        output_pixel_format: ctx.output_pixel_format,
573        constant_qp: ctx.constant_qp,
574        timescale: ctx.timescale,
575        per_frame_ticks: ctx.per_frame_ticks,
576        keyframe_interval: ctx.keyframe_interval,
577        segment_target_ticks: ctx.segment_target_ticks,
578        output_dir,
579        rung_invariant,
580    };
581
582    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
583    let body = async move {
584        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
585        let cfg_for_worker = cfg.clone();
586        let queue_for_worker = Arc::clone(&queue);
587        let rt = tokio::runtime::Handle::current();
588        let counter = Arc::clone(&frames_encoded);
589        let blocking = tokio::task::spawn_blocking(move || {
590            run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
591        });
592        // Drain the per-frame progress channel (the shared AtomicU64 counter is
593        // the source of truth the reporter task reads; here we just keep the
594        // channel from backpressuring the worker).
595        let drain = async move { while progress_rx.recv().await.is_some() {} };
596        let (_, br) = tokio::join!(drain, blocking);
597
598        let task_status: Result<()> = match br {
599            Ok(Ok(out)) => {
600                contributions[rung_idx].lock().unwrap().push(out);
601                Ok(())
602            }
603            Ok(Err(e)) => Err(e),
604            Err(e) => Err(anyhow!("worker join error: {e}")),
605        };
606        drop(lease);
607        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
608        if prev == 1 {
609            rung_done[rung_idx].notify_one();
610        }
611        (rung_idx, task_status)
612    };
613
614    match worker_tasks {
615        Some(set) => {
616            set.spawn(body);
617        }
618        None => {
619            tokio::spawn(async move {
620                let _ = body.await;
621            });
622        }
623    }
624}
625
626/// Periodic per-rung progress reporter. Reads the shared frame counters and
627/// emits `Running` updates until stopped; skips rungs already finalized.
628fn spawn_progress_reporter(
629    rungs: Vec<Rung>,
630    frames_encoded: Vec<Arc<AtomicU64>>,
631    finalized: Arc<Vec<AtomicBool>>,
632    total_input_frames: u64,
633    sink: Arc<dyn ProgressSink>,
634    stop: Arc<AtomicBool>,
635) -> tokio::task::JoinHandle<()> {
636    tokio::spawn(async move {
637        loop {
638            if stop.load(Ordering::Acquire) {
639                break;
640            }
641            tokio::time::sleep(PROGRESS_TICK).await;
642            for (idx, rung) in rungs.iter().enumerate() {
643                if finalized[idx].load(Ordering::Acquire) {
644                    continue;
645                }
646                let done = frames_encoded[idx].load(Ordering::Relaxed);
647                report(
648                    sink.as_ref(),
649                    idx,
650                    rung,
651                    RungStatus::Running,
652                    done,
653                    Some(total_input_frames),
654                    0,
655                    0,
656                    None,
657                );
658            }
659        }
660    })
661}
662
663#[allow(clippy::too_many_arguments)]
664fn report(
665    sink: &dyn ProgressSink,
666    rung_index: usize,
667    rung: &Rung,
668    status: RungStatus,
669    frames_done: u64,
670    frames_total: Option<u64>,
671    segments: u32,
672    bytes_out: u64,
673    message: Option<String>,
674) {
675    let percent = match status {
676        RungStatus::Completed => 100.0,
677        RungStatus::Pending => 0.0,
678        _ => match frames_total {
679            Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
680            _ => 1.0,
681        },
682    };
683    sink.on_rung(RungProgress {
684        rung_index,
685        label: rung.label.clone(),
686        width: rung.width,
687        height: rung.height,
688        status,
689        percent,
690        frames_done,
691        frames_total,
692        segments_written: segments,
693        bytes_out,
694        message,
695    });
696}
697
698/// Build a [`GpuPool`] from the host's detected GPU inventory.
699pub fn detect_gpu_pool() -> Arc<GpuPool> {
700    Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
701}
702
703fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
704    match fam {
705        GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
706        GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
707        GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
708    }
709}
710
711/// The host GPUs selected by an [`EncodePolicy`]: all of them for `AllGpus`,
712/// the first / pinned index for `SingleGpu`, every device of one vendor for
713/// `Family`.
714fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
715    let gpus = codec::gpu::detect_gpus();
716    match policy {
717        EncodePolicy::AllGpus => gpus,
718        EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
719        EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
720        EncodePolicy::Family(fam) => {
721            let v = policy_vendor(fam);
722            gpus.into_iter().filter(|g| g.vendor == v).collect()
723        }
724    }
725}
726
727/// Build a [`GpuPool`] constrained to the given [`EncodePolicy`]. An empty pool
728/// (e.g. a pinned index or vendor family that isn't present) yields capacity 0,
729/// so the orchestrator's pre-flight probe / lease claim surfaces a clear error.
730pub fn gpu_pool_for_policy(policy: EncodePolicy) -> Arc<GpuPool> {
731    Arc::new(GpuPool::new(&select_gpus_for_policy(policy)))
732}
733
734/// The GPU indices an [`EncodePolicy`] selects, in detection order. Used to pin
735/// the decode pump to a device consistent with the policy (so decode honors a
736/// `Family` / `SingleGpu` constraint, not just encode).
737pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
738    select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
739}
740
741/// The GPU index to pin a *serial* (single-GPU) encode/decode to under a
742/// policy: `None` (auto/first-available) for `AllGpus`, the pinned index for
743/// `SingleGpu`, the first device of the vendor for `Family`.
744pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
745    match policy {
746        EncodePolicy::AllGpus => None,
747        EncodePolicy::SingleGpu(idx) => idx,
748        EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
749    }
750}
751
752// ---------------------------------------------------------------------------
753// Single-file multi-GPU: chunk each rung across GPUs, stitch packets into MP4.
754// ---------------------------------------------------------------------------
755
756/// One rung's full ordered AV1 packet stream, stitched from chunks encoded
757/// across GPUs. The caller muxes these into a single MP4 (+ audio).
758#[derive(Debug)]
759pub struct RungPackets {
760    pub rung_index: usize,
761    pub width: u32,
762    pub height: u32,
763    pub label: String,
764    pub packets: Vec<EncodedPacket>,
765}
766
767/// Single-file counterpart to [`run_multigpu_hls`]: decode once, fan to per-rung
768/// scalers, and dynamically schedule each rung's GOP-sized chunks across all
769/// GPUs (fair lease pool + mid-flight helper dispatch + cross-vendor codec
770/// invariant). Each worker encodes its chunk to packets (a fresh encoder per
771/// chunk → first frame is an IDR); the finalizer concatenates them in segment
772/// order into one ordered packet stream per rung — no disk round-trip.
773pub async fn run_multigpu_single_file(
774    params: MultiGpuParams<'_>,
775    sink: Arc<dyn ProgressSink>,
776) -> Result<Vec<Option<RungPackets>>> {
777    let rungs = params.rungs;
778    let n = rungs.len();
779    if n == 0 {
780        return Ok(Vec::new());
781    }
782    let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
783    if total_segments == 0 {
784        bail!(
785            "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
786            params.total_input_frames,
787            params.keyframe_interval
788        );
789    }
790
791    // Pre-flight encoder probe (same fail-fast as the HLS path).
792    {
793        let probe = codec::encode::EncoderConfig {
794            width: rungs[0].width,
795            height: rungs[0].height,
796            frame_rate: params.frame_rate,
797            gpu_index: None,
798            ..Default::default()
799        };
800        codec::encode::select_encoder(probe, None).map_err(|e| {
801            anyhow!(
802                "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
803                 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature"
804            )
805        })?;
806    }
807
808    tracing::info!(
809        rungs = n,
810        total_segments,
811        gpu_pool_capacity = params.gpu_pool.capacity(),
812        "multi-GPU single-file phase starting"
813    );
814
815    let queues: Vec<Arc<SegmentChunkQueue>> =
816        (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
817    let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
818    let scaler_active: Vec<Arc<AtomicBool>> =
819        (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
820    let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
821        (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
822    // Per-rung packet collectors (each its own Arc so chunk workers can push).
823    let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
824        (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
825    let active_workers: Arc<Vec<AtomicUsize>> =
826        Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
827    let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
828    let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
829
830    let progress_stop = Arc::new(AtomicBool::new(false));
831    let progress_handle = spawn_progress_reporter(
832        rungs.to_vec(),
833        frames_encoded.clone(),
834        finalized.clone(),
835        params.total_input_frames,
836        Arc::clone(&sink),
837        Arc::clone(&progress_stop),
838    );
839
840    // Finalizers: stitch each rung's chunks (sorted, deduped) into one stream.
841    let total_input_frames = params.total_input_frames;
842    let (finalizer_tx, mut finalizer_rx) =
843        mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
844    let mut finalizer_handles = Vec::with_capacity(n);
845    for idx in 0..n {
846        let collector = Arc::clone(&contributions[idx]);
847        let active_h = Arc::clone(&active_workers);
848        let rung_done_h = Arc::clone(&rung_done);
849        let finalized_h = Arc::clone(&finalized);
850        let tx = finalizer_tx.clone();
851        let rung = rungs[idx].clone();
852        let total_segments = total_segments;
853        let sink = Arc::clone(&sink);
854        finalizer_handles.push(tokio::spawn(async move {
855            loop {
856                let notified = rung_done_h[idx].notified();
857                if active_h[idx].load(Ordering::Acquire) == 0 {
858                    break;
859                }
860                notified.await;
861            }
862            let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
863            if chunks.is_empty() {
864                finalized_h[idx].store(true, Ordering::Release);
865                let _ = tx.send((idx, Ok(None))).await;
866                return;
867            }
868            chunks.sort_by_key(|c| c.segment_idx);
869            chunks.dedup_by_key(|c| c.segment_idx);
870            // Coverage: contiguous 0..total_segments.
871            let got = chunks.len();
872            let contiguous = chunks
873                .iter()
874                .enumerate()
875                .all(|(i, c)| c.segment_idx == i);
876            let result = if got != total_segments as usize || !contiguous {
877                Err(anyhow!(
878                    "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
879                    rung.label,
880                    total_segments,
881                    got
882                ))
883            } else {
884                let mut packets: Vec<EncodedPacket> = Vec::new();
885                for c in chunks {
886                    packets.extend(c.packets);
887                }
888                let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
889                report(
890                    sink.as_ref(),
891                    idx,
892                    &rung,
893                    RungStatus::Completed,
894                    total_input_frames,
895                    Some(total_input_frames),
896                    got as u32,
897                    bytes,
898                    None,
899                );
900                Ok(Some(RungPackets {
901                    rung_index: idx,
902                    width: rung.width,
903                    height: rung.height,
904                    label: rung.label.clone(),
905                    packets,
906                }))
907            };
908            finalized_h[idx].store(true, Ordering::Release);
909            let _ = tx.send((idx, result)).await;
910        }));
911    }
912    drop(finalizer_tx);
913
914    let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
915    indexed.sort_by_key(|(_, r)| r.short_side());
916
917    // Decode pump(s) + fan-out.
918    let mut frame_senders = Vec::with_capacity(n);
919    let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
920        Vec::with_capacity(n);
921    for _ in 0..n {
922        let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
923        frame_senders.push(tx);
924        frame_receivers.push(Some(rx));
925    }
926    let use_shared_pump = n <= params.gpu_pool.capacity();
927    let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
928    let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
929        codec_name: params.header.codec.clone(),
930        info_for_decoder: params.header.info.clone(),
931        source_color_metadata: params.source_color_metadata,
932        source_pixel_format: params.source_pixel_format,
933        needs_downsample: params.needs_downsample,
934        tonemap_to_sdr: params.tonemap_to_sdr,
935        gpu_index,
936        filters: params.filters.clone(),
937    };
938    if use_shared_pump {
939        let cfg = make_pump_cfg(params.decode_gpu_for(0));
940        let senders = frame_senders;
941        let input = params.input.clone();
942        let rt = tokio::runtime::Handle::current();
943        pump_tasks.spawn(async move {
944            tokio::task::spawn_blocking(move || {
945                crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
946            })
947            .await
948            .map_err(|e| anyhow!("shared pump join error: {e}"))
949            .and_then(|r| r)
950        });
951    } else {
952        for (idx, sender) in frame_senders.into_iter().enumerate() {
953            let cfg = make_pump_cfg(params.decode_gpu_for(idx));
954            let input = params.input.clone();
955            let rt = tokio::runtime::Handle::current();
956            pump_tasks.spawn(async move {
957                tokio::task::spawn_blocking(move || {
958                    crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
959                })
960                .await
961                .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
962                .and_then(|r| r)
963            });
964        }
965    }
966
967    // Per-rung scalers.
968    let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
969    for (idx, rung) in rungs.iter().cloned().enumerate() {
970        let rx = frame_receivers[idx].take().expect("scaler rx slot");
971        let cfg = crate::rung_scaler::RungScalerConfig {
972            rung_idx: idx,
973            target_width: rung.width,
974            target_height: rung.height,
975            frames_per_chunk: params.keyframe_interval,
976        };
977        let queue = Arc::clone(&queues[idx]);
978        let rt = tokio::runtime::Handle::current();
979        let scaler_flag = Arc::clone(&scaler_active[idx]);
980        let active_h = Arc::clone(&active_workers);
981        let rung_done_h = Arc::clone(&rung_done);
982        scaler_flag.store(true, Ordering::Release);
983        active_h[idx].fetch_add(1, Ordering::AcqRel);
984        scaler_tasks.spawn(async move {
985            let result = tokio::task::spawn_blocking(move || {
986                crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
987            })
988            .await
989            .map_err(|e| anyhow!("scaler join error: {e}"))
990            .and_then(|r| r);
991            scaler_flag.store(false, Ordering::Release);
992            let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
993            if prev == 1 {
994                rung_done_h[idx].notify_one();
995            }
996            (idx, result)
997        });
998    }
999
1000    // Initial chunk workers.
1001    let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
1002    let ctx = WorkerCtx {
1003        frame_rate: params.frame_rate,
1004        output_color_metadata: params.output_color_metadata,
1005        output_pixel_format: params.output_pixel_format,
1006        timescale: params.timescale,
1007        per_frame_ticks: params.per_frame_ticks,
1008        keyframe_interval: params.keyframe_interval,
1009        segment_target_ticks: params.segment_target_ticks,
1010        output_root: params.output_root.clone(),
1011        constant_qp: params.constant_qp,
1012    };
1013    for (idx, rung) in indexed.iter().cloned() {
1014        let lease = match Arc::clone(&params.gpu_pool).claim().await {
1015            Some(l) => l,
1016            None => {
1017                progress_stop.store(true, Ordering::Release);
1018                let _ = progress_handle.await;
1019                bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1020            }
1021        };
1022        spawn_chunk_worker(
1023            &ctx,
1024            idx,
1025            &rung,
1026            Arc::clone(&queues[idx]),
1027            Arc::clone(&frames_encoded[idx]),
1028            lease,
1029            Arc::clone(&contributions[idx]),
1030            Arc::clone(&active_workers),
1031            Arc::clone(&rung_done),
1032            Arc::clone(&rung_invariants[idx]),
1033            Some(&mut worker_tasks),
1034        );
1035    }
1036
1037    // Helper dispatcher.
1038    let helper_cancel = Arc::new(AtomicBool::new(false));
1039    let helper_handle = {
1040        let cancel = Arc::clone(&helper_cancel);
1041        let pool = Arc::clone(&params.gpu_pool);
1042        let queues = queues.clone();
1043        let scaler_active = scaler_active.clone();
1044        let frames_encoded = frames_encoded.clone();
1045        let contributions = contributions.clone();
1046        let active_workers = Arc::clone(&active_workers);
1047        let rung_done = Arc::clone(&rung_done);
1048        let rung_invariants = rung_invariants.clone();
1049        let rungs_owned: Vec<Rung> = rungs.to_vec();
1050        let ctx = ctx.clone();
1051        tokio::spawn(async move {
1052            loop {
1053                if cancel.load(Ordering::Acquire) {
1054                    break;
1055                }
1056                tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1057                if pool.pending_claimers() > 0 {
1058                    continue;
1059                }
1060                let mut target = None;
1061                for (idx, q) in queues.iter().enumerate() {
1062                    let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1063                    let has_pending = q.pushed_segments() > q.popped_segments();
1064                    if scaler_alive || has_pending {
1065                        target = Some(idx);
1066                        break;
1067                    }
1068                }
1069                let Some(rung_idx) = target else { break };
1070                let lease = match pool.try_claim() {
1071                    Some(l) => l,
1072                    None => continue,
1073                };
1074                tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1075                spawn_chunk_worker(
1076                    &ctx,
1077                    rung_idx,
1078                    &rungs_owned[rung_idx],
1079                    Arc::clone(&queues[rung_idx]),
1080                    Arc::clone(&frames_encoded[rung_idx]),
1081                    lease,
1082                    Arc::clone(&contributions[rung_idx]),
1083                    Arc::clone(&active_workers),
1084                    Arc::clone(&rung_done),
1085                    Arc::clone(&rung_invariants[rung_idx]),
1086                    None,
1087                );
1088            }
1089        })
1090    };
1091
1092    // Drain.
1093    let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1094    let mut pumps_remaining = pump_tasks.len();
1095    let mut scalers_remaining = n;
1096    let mut workers_remaining = n;
1097    let mut finalizers_remaining = n;
1098    macro_rules! teardown_err {
1099        ($e:expr) => {{
1100            helper_cancel.store(true, Ordering::Release);
1101            let _ = helper_handle.await;
1102            progress_stop.store(true, Ordering::Release);
1103            let _ = progress_handle.await;
1104            return Err($e);
1105        }};
1106    }
1107    while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1108        tokio::select! {
1109            biased;
1110            p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1111                Some(Ok(Ok(_))) => pumps_remaining -= 1,
1112                Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1113                Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1114                None => pumps_remaining = 0,
1115            },
1116            s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1117                Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1118                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1119                Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1120                None => scalers_remaining = 0,
1121            },
1122            w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1123                Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1124                Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1125                Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1126                None => workers_remaining = 0,
1127            },
1128            f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1129                Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1130                Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1131                None => finalizers_remaining = 0,
1132            },
1133        }
1134    }
1135    helper_cancel.store(true, Ordering::Release);
1136    let _ = helper_handle.await;
1137    progress_stop.store(true, Ordering::Release);
1138    let _ = progress_handle.await;
1139    for h in finalizer_handles {
1140        let _ = h.await;
1141    }
1142    Ok(completed)
1143}
1144
1145#[allow(clippy::too_many_arguments)]
1146fn spawn_chunk_worker(
1147    ctx: &WorkerCtx,
1148    rung_idx: usize,
1149    rung: &Rung,
1150    queue: Arc<SegmentChunkQueue>,
1151    frames_encoded: Arc<AtomicU64>,
1152    lease: GpuLease,
1153    collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1154    active_workers: Arc<Vec<AtomicUsize>>,
1155    rung_done: Arc<Vec<Notify>>,
1156    rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1157    worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1158) {
1159    let gpu_index = lease.gpu_index;
1160    let gpu_vendor = lease.vendor;
1161    let cfg = EncoderWorkerConfig {
1162        rung_idx,
1163        width: rung.width,
1164        height: rung.height,
1165        frame_rate: ctx.frame_rate,
1166        quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1167        speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1168        target: rung.quality.target,
1169        tier: rung.quality.tier,
1170        threads: 0,
1171        gpu_index: Some(gpu_index),
1172        gpu_vendor: Some(gpu_vendor),
1173        output_color_metadata: ctx.output_color_metadata,
1174        output_pixel_format: ctx.output_pixel_format,
1175        constant_qp: ctx.constant_qp,
1176        timescale: ctx.timescale,
1177        per_frame_ticks: ctx.per_frame_ticks,
1178        keyframe_interval: ctx.keyframe_interval,
1179        segment_target_ticks: ctx.segment_target_ticks,
1180        output_dir: ctx.output_root.clone(),
1181        rung_invariant,
1182    };
1183    active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1184    let body = async move {
1185        let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1186        let cfg_for_worker = cfg.clone();
1187        let queue_for_worker = Arc::clone(&queue);
1188        let rt = tokio::runtime::Handle::current();
1189        let counter = Arc::clone(&frames_encoded);
1190        let out = Arc::clone(&collector);
1191        let blocking = tokio::task::spawn_blocking(move || {
1192            run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1193        });
1194        let drain = async move { while progress_rx.recv().await.is_some() {} };
1195        let (_, br) = tokio::join!(drain, blocking);
1196        let task_status: Result<()> = match br {
1197            Ok(Ok(())) => Ok(()),
1198            Ok(Err(e)) => Err(e),
1199            Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1200        };
1201        drop(lease);
1202        let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1203        if prev == 1 {
1204            rung_done[rung_idx].notify_one();
1205        }
1206        (rung_idx, task_status)
1207    };
1208    match worker_tasks {
1209        Some(set) => {
1210            set.spawn(body);
1211        }
1212        None => {
1213            tokio::spawn(async move {
1214                let _ = body.await;
1215            });
1216        }
1217    }
1218}