1use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48 ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49 run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity handed to each per-rung `SegmentChunkQueue::new`.
const QUEUE_CAPACITY: usize = 2;
/// Capacity of each per-rung `mpsc` channel that carries decoded frames to that rung's scaler.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// How long the helper-dispatch loop sleeps between scans for a rung that could use another worker.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// How long the progress reporter sleeps between `Running` reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
/// Result of encoding one rung: its identity plus the merged CMAF track manifest.
#[derive(Debug, Clone)]
pub struct RungManifest {
    /// Position of the rung in the caller's `rungs` slice.
    pub rung_index: usize,
    /// Rung width, copied from the `Rung`.
    pub width: u32,
    /// Rung height, copied from the `Rung`.
    pub height: u32,
    /// Rung label, copied from the `Rung`.
    pub label: String,
    /// Directory of this rung's output; built as `video/<label>` and joined onto the
    /// caller's output root when paths are formed.
    pub relative_dir: String,
    /// Manifest produced by merging every worker's contribution for this rung.
    pub manifest: CmafTrackManifest,
}
72
/// Inputs shared by the multi-GPU entry points in this file.
pub struct MultiGpuParams<'a> {
    /// Source bytes; a clone is handed to every decode pump.
    pub input: Bytes,
    /// Rungs to encode; one scaler, one initial worker and one finalizer are created per rung.
    pub rungs: &'a [Rung],
    /// Demux header; its `codec` and `info` are forwarded to the decode pump configuration.
    pub header: DemuxHeader,
    /// Forwarded to the decode pump configuration.
    pub source_color_metadata: ColorMetadata,
    /// Forwarded to the decode pump configuration.
    pub source_pixel_format: PixelFormat,
    /// Forwarded to the decode pump configuration.
    pub tonemap_to_sdr: bool,
    /// Forwarded to every encoder worker configuration.
    pub output_color_metadata: ColorMetadata,
    /// Forwarded to every encoder worker configuration.
    pub output_pixel_format: PixelFormat,
    /// Forwarded to the decode pump configuration.
    pub needs_downsample: bool,
    /// Used for the encoder availability probe and forwarded to every encoder worker.
    pub frame_rate: f64,
    /// Pool from which each encoder worker claims a GPU lease.
    pub gpu_pool: Arc<GpuPool>,
    /// GPU indices used round-robin for decode pumps when `decode_gpu` is `None`.
    pub gpu_indices: Vec<u32>,
    /// When `Some`, every decode pump uses this GPU index regardless of `gpu_indices`.
    pub decode_gpu: Option<u32>,
    /// Root output directory; each rung's output directory is `output_root/video/<label>`.
    pub output_root: PathBuf,
    /// Forwarded to encoder workers and written into each rung's manifest.
    pub timescale: u32,
    /// Forwarded to every encoder worker configuration.
    pub per_frame_ticks: u32,
    /// Forwarded to encoder workers, used as the scaler's `frames_per_chunk`, and used
    /// together with `total_input_frames` to compute the expected segment count.
    pub keyframe_interval: u32,
    /// Forwarded to every encoder worker configuration.
    pub segment_target_ticks: u64,
    /// Total frame count; used as the progress denominator and for the expected segment count.
    pub total_input_frames: u64,
    /// Forwarded to every encoder worker configuration.
    pub constant_qp: bool,
}
108
109impl MultiGpuParams<'_> {
110 fn decode_gpu_for(&self, i: usize) -> Option<u32> {
114 if self.decode_gpu.is_some() {
115 return self.decode_gpu;
116 }
117 if self.gpu_indices.is_empty() {
118 return None;
119 }
120 Some(self.gpu_indices[i % self.gpu_indices.len()])
121 }
122}
123
124pub async fn run_multigpu_hls(
127 params: MultiGpuParams<'_>,
128 sink: Arc<dyn ProgressSink>,
129) -> Result<Vec<Option<RungManifest>>> {
130 let rungs = params.rungs;
131 let n = rungs.len();
132 if n == 0 {
133 return Ok(Vec::new());
134 }
135 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
136 if total_segments == 0 {
137 bail!(
138 "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
139 params.total_input_frames,
140 params.keyframe_interval
141 );
142 }
143
144 {
150 let probe = codec::encode::EncoderConfig {
151 width: rungs[0].width,
152 height: rungs[0].height,
153 frame_rate: params.frame_rate,
154 gpu_index: None,
155 ..Default::default()
156 };
157 codec::encode::select_encoder(probe, None).map_err(|e| {
158 anyhow!(
159 "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
160 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature for a software encoder"
161 )
162 })?;
163 }
164
165 tracing::info!(
166 rungs = n,
167 total_segments,
168 gpu_pool_capacity = params.gpu_pool.capacity(),
169 "multi-GPU variant phase starting"
170 );
171
172 let queues: Vec<Arc<SegmentChunkQueue>> =
174 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
175 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
176 let scaler_active: Vec<Arc<AtomicBool>> =
177 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
178 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
179 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
180 let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
181 Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
182 let active_workers: Arc<Vec<AtomicUsize>> =
183 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
184 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
185 let finalized: Arc<Vec<AtomicBool>> =
186 Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
187
188 let progress_stop = Arc::new(AtomicBool::new(false));
190 let progress_handle = spawn_progress_reporter(
191 rungs.to_vec(),
192 frames_encoded.clone(),
193 finalized.clone(),
194 params.total_input_frames,
195 Arc::clone(&sink),
196 Arc::clone(&progress_stop),
197 );
198
199 let total_input_frames = params.total_input_frames;
201 let (finalizer_tx, mut finalizer_rx) =
202 mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
203 let mut finalizer_handles = Vec::with_capacity(n);
204 for idx in 0..n {
205 let contributions_h = Arc::clone(&contributions);
206 let active_h = Arc::clone(&active_workers);
207 let rung_done_h = Arc::clone(&rung_done);
208 let finalized_h = Arc::clone(&finalized);
209 let tx = finalizer_tx.clone();
210 let rung = rungs[idx].clone();
211 let rel_dir = format!("video/{}", rung.label);
212 let output_root = params.output_root.clone();
213 let timescale = params.timescale;
214 let total_segments = total_segments;
215 let sink = Arc::clone(&sink);
216 finalizer_handles.push(tokio::spawn(async move {
217 loop {
219 let notified = rung_done_h[idx].notified();
220 if active_h[idx].load(Ordering::Acquire) == 0 {
221 break;
222 }
223 notified.await;
224 }
225 let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
226 if outputs.is_empty() {
227 finalized_h[idx].store(true, Ordering::Release);
228 let _ = tx.send((idx, Ok(None))).await;
229 return;
230 }
231 let init_path = output_root.join(&rel_dir).join("init.mp4");
232 let contribs: Vec<RungContribution> = outputs
233 .into_iter()
234 .map(|wo| RungContribution {
235 width: rung.width,
236 height: rung.height,
237 relative_dir: rel_dir.clone(),
238 manifest: CmafTrackManifest {
239 init_path: init_path.clone(),
240 segments: wo.segments,
241 timescale,
242 },
243 })
244 .collect();
245 let result = match merge_rung_contributions(contribs) {
246 Ok(merged) => {
247 let got = merged.manifest.segments.len();
248 if got != total_segments as usize {
249 Err(anyhow!(
250 "rung {} coverage incomplete: expected {} segments, got {}",
251 rung.label,
252 total_segments,
253 got
254 ))
255 } else {
256 let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
257 report(
258 sink.as_ref(),
259 idx,
260 &rung,
261 RungStatus::Completed,
262 total_input_frames,
263 Some(total_input_frames),
264 got as u32,
265 bytes,
266 None,
267 );
268 Ok(Some(RungManifest {
269 rung_index: idx,
270 width: rung.width,
271 height: rung.height,
272 label: rung.label.clone(),
273 relative_dir: rel_dir.clone(),
274 manifest: merged.manifest,
275 }))
276 }
277 }
278 Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
279 };
280 finalized_h[idx].store(true, Ordering::Release);
281 let _ = tx.send((idx, result)).await;
282 }));
283 }
284 drop(finalizer_tx);
285
286 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
288 indexed.sort_by_key(|(_, r)| r.short_side());
289
290 let mut frame_senders = Vec::with_capacity(n);
292 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
293 Vec::with_capacity(n);
294 for _ in 0..n {
295 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
296 frame_senders.push(tx);
297 frame_receivers.push(Some(rx));
298 }
299
300 let use_shared_pump = n <= params.gpu_pool.capacity();
301 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
302 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
303 codec_name: params.header.codec.clone(),
304 info_for_decoder: params.header.info.clone(),
305 source_color_metadata: params.source_color_metadata,
306 source_pixel_format: params.source_pixel_format,
307 needs_downsample: params.needs_downsample,
308 tonemap_to_sdr: params.tonemap_to_sdr,
309 gpu_index,
310 };
311 if use_shared_pump {
312 let cfg = make_pump_cfg(params.decode_gpu_for(0));
313 let senders = frame_senders;
314 let input = params.input.clone();
315 let rt = tokio::runtime::Handle::current();
316 pump_tasks.spawn(async move {
317 tokio::task::spawn_blocking(move || {
318 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
319 })
320 .await
321 .map_err(|e| anyhow!("shared pump join error: {e}"))
322 .and_then(|r| r)
323 });
324 } else {
325 for (idx, sender) in frame_senders.into_iter().enumerate() {
326 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
327 let input = params.input.clone();
328 let rt = tokio::runtime::Handle::current();
329 pump_tasks.spawn(async move {
330 tokio::task::spawn_blocking(move || {
331 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
332 })
333 .await
334 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
335 .and_then(|r| r)
336 });
337 }
338 }
339
340 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
342 for (idx, rung) in rungs.iter().cloned().enumerate() {
343 let rx = frame_receivers[idx].take().expect("scaler rx slot");
344 let cfg = crate::rung_scaler::RungScalerConfig {
345 rung_idx: idx,
346 target_width: rung.width,
347 target_height: rung.height,
348 frames_per_chunk: params.keyframe_interval,
349 };
350 let queue = Arc::clone(&queues[idx]);
351 let rt = tokio::runtime::Handle::current();
352 let scaler_flag = Arc::clone(&scaler_active[idx]);
353 let active_h = Arc::clone(&active_workers);
354 let rung_done_h = Arc::clone(&rung_done);
355 scaler_flag.store(true, Ordering::Release);
356 active_h[idx].fetch_add(1, Ordering::AcqRel);
357 scaler_tasks.spawn(async move {
358 let result = tokio::task::spawn_blocking(move || {
359 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
360 })
361 .await
362 .map_err(|e| anyhow!("scaler join error: {e}"))
363 .and_then(|r| r);
364 scaler_flag.store(false, Ordering::Release);
365 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
366 if prev == 1 {
367 rung_done_h[idx].notify_one();
368 }
369 (idx, result)
370 });
371 }
372
373 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
375 let ctx = WorkerCtx {
376 frame_rate: params.frame_rate,
377 output_color_metadata: params.output_color_metadata,
378 output_pixel_format: params.output_pixel_format,
379 timescale: params.timescale,
380 per_frame_ticks: params.per_frame_ticks,
381 keyframe_interval: params.keyframe_interval,
382 segment_target_ticks: params.segment_target_ticks,
383 output_root: params.output_root.clone(),
384 constant_qp: params.constant_qp,
385 };
386 for (idx, rung) in indexed.iter().cloned() {
387 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
388 Some(l) => l,
389 None => {
390 progress_stop.store(true, Ordering::Release);
391 let _ = progress_handle.await;
392 bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
393 }
394 };
395 spawn_encoder_worker(
396 &ctx,
397 idx,
398 &rung,
399 Arc::clone(&queues[idx]),
400 Arc::clone(&frames_encoded[idx]),
401 lease,
402 Arc::clone(&contributions),
403 Arc::clone(&active_workers),
404 Arc::clone(&rung_done),
405 Arc::clone(&rung_invariants[idx]),
406 Some(&mut worker_tasks),
407 );
408 }
409
410 let helper_cancel = Arc::new(AtomicBool::new(false));
412 let helper_handle = {
413 let cancel = Arc::clone(&helper_cancel);
414 let pool = Arc::clone(¶ms.gpu_pool);
415 let queues = queues.clone();
416 let scaler_active = scaler_active.clone();
417 let frames_encoded = frames_encoded.clone();
418 let contributions = Arc::clone(&contributions);
419 let active_workers = Arc::clone(&active_workers);
420 let rung_done = Arc::clone(&rung_done);
421 let rung_invariants = rung_invariants.clone();
422 let rungs_owned: Vec<Rung> = rungs.to_vec();
423 let ctx = ctx.clone();
424 tokio::spawn(async move {
425 loop {
426 if cancel.load(Ordering::Acquire) {
427 break;
428 }
429 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
430 if pool.pending_claimers() > 0 {
431 continue;
432 }
433 let mut target = None;
434 for (idx, q) in queues.iter().enumerate() {
435 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
436 let has_pending = q.pushed_segments() > q.popped_segments();
437 if scaler_alive || has_pending {
438 target = Some(idx);
439 break;
440 }
441 }
442 let Some(rung_idx) = target else { break };
443 let lease = match pool.try_claim() {
444 Some(l) => l,
445 None => continue,
446 };
447 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
448 spawn_encoder_worker(
449 &ctx,
450 rung_idx,
451 &rungs_owned[rung_idx],
452 Arc::clone(&queues[rung_idx]),
453 Arc::clone(&frames_encoded[rung_idx]),
454 lease,
455 Arc::clone(&contributions),
456 Arc::clone(&active_workers),
457 Arc::clone(&rung_done),
458 Arc::clone(&rung_invariants[rung_idx]),
459 None,
460 );
461 }
462 })
463 };
464
465 let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
467 let mut pumps_remaining = pump_tasks.len();
468 let mut scalers_remaining = n;
469 let mut workers_remaining = n;
470 let mut finalizers_remaining = n;
471
472 macro_rules! teardown_err {
473 ($e:expr) => {{
474 helper_cancel.store(true, Ordering::Release);
475 let _ = helper_handle.await;
476 progress_stop.store(true, Ordering::Release);
477 let _ = progress_handle.await;
478 return Err($e);
479 }};
480 }
481
482 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
483 tokio::select! {
484 biased;
485 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
486 Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
487 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
488 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
489 None => pumps_remaining = 0,
490 },
491 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
492 Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
493 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
494 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
495 None => scalers_remaining = 0,
496 },
497 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
498 Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
499 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
500 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
501 None => workers_remaining = 0,
502 },
503 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
504 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
505 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
506 None => finalizers_remaining = 0,
507 },
508 }
509 }
510
511 helper_cancel.store(true, Ordering::Release);
512 let _ = helper_handle.await;
513 progress_stop.store(true, Ordering::Release);
514 let _ = progress_handle.await;
515 for h in finalizer_handles {
516 let _ = h.await;
517 }
518
519 Ok(completed)
520}
521
/// Rung-independent encoder settings, copied out of `MultiGpuParams` once and
/// reused for every encoder worker that gets spawned.
#[derive(Clone)]
struct WorkerCtx {
    /// Forwarded to the worker configuration as `frame_rate`.
    frame_rate: f64,
    /// Forwarded to the worker configuration.
    output_color_metadata: ColorMetadata,
    /// Forwarded to the worker configuration.
    output_pixel_format: PixelFormat,
    /// Forwarded to the worker configuration.
    timescale: u32,
    /// Forwarded to the worker configuration.
    per_frame_ticks: u32,
    /// Forwarded to the worker configuration.
    keyframe_interval: u32,
    /// Forwarded to the worker configuration.
    segment_target_ticks: u64,
    /// Root output directory; a worker's output directory is `output_root/video/<label>`.
    output_root: PathBuf,
    /// Forwarded to the worker configuration.
    constant_qp: bool,
}
535
536#[allow(clippy::too_many_arguments)]
537fn spawn_encoder_worker(
538 ctx: &WorkerCtx,
539 rung_idx: usize,
540 rung: &Rung,
541 queue: Arc<SegmentChunkQueue>,
542 frames_encoded: Arc<AtomicU64>,
543 lease: GpuLease,
544 contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
545 active_workers: Arc<Vec<AtomicUsize>>,
546 rung_done: Arc<Vec<Notify>>,
547 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
548 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
549) {
550 let rel_dir = format!("video/{}", rung.label);
551 let output_dir = ctx.output_root.join(&rel_dir);
552 let gpu_index = lease.gpu_index;
553 let gpu_vendor = lease.vendor;
554
555 let cfg = EncoderWorkerConfig {
556 rung_idx,
557 width: rung.width,
558 height: rung.height,
559 frame_rate: ctx.frame_rate,
560 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
561 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
562 target: rung.quality.target,
563 tier: rung.quality.tier,
564 threads: 0,
565 gpu_index: Some(gpu_index),
566 gpu_vendor: Some(gpu_vendor),
567 output_color_metadata: ctx.output_color_metadata,
568 output_pixel_format: ctx.output_pixel_format,
569 constant_qp: ctx.constant_qp,
570 timescale: ctx.timescale,
571 per_frame_ticks: ctx.per_frame_ticks,
572 keyframe_interval: ctx.keyframe_interval,
573 segment_target_ticks: ctx.segment_target_ticks,
574 output_dir,
575 rung_invariant,
576 };
577
578 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
579 let body = async move {
580 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
581 let cfg_for_worker = cfg.clone();
582 let queue_for_worker = Arc::clone(&queue);
583 let rt = tokio::runtime::Handle::current();
584 let counter = Arc::clone(&frames_encoded);
585 let blocking = tokio::task::spawn_blocking(move || {
586 run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
587 });
588 let drain = async move { while progress_rx.recv().await.is_some() {} };
592 let (_, br) = tokio::join!(drain, blocking);
593
594 let task_status: Result<()> = match br {
595 Ok(Ok(out)) => {
596 contributions[rung_idx].lock().unwrap().push(out);
597 Ok(())
598 }
599 Ok(Err(e)) => Err(e),
600 Err(e) => Err(anyhow!("worker join error: {e}")),
601 };
602 drop(lease);
603 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
604 if prev == 1 {
605 rung_done[rung_idx].notify_one();
606 }
607 (rung_idx, task_status)
608 };
609
610 match worker_tasks {
611 Some(set) => {
612 set.spawn(body);
613 }
614 None => {
615 tokio::spawn(async move {
616 let _ = body.await;
617 });
618 }
619 }
620}
621
622fn spawn_progress_reporter(
625 rungs: Vec<Rung>,
626 frames_encoded: Vec<Arc<AtomicU64>>,
627 finalized: Arc<Vec<AtomicBool>>,
628 total_input_frames: u64,
629 sink: Arc<dyn ProgressSink>,
630 stop: Arc<AtomicBool>,
631) -> tokio::task::JoinHandle<()> {
632 tokio::spawn(async move {
633 loop {
634 if stop.load(Ordering::Acquire) {
635 break;
636 }
637 tokio::time::sleep(PROGRESS_TICK).await;
638 for (idx, rung) in rungs.iter().enumerate() {
639 if finalized[idx].load(Ordering::Acquire) {
640 continue;
641 }
642 let done = frames_encoded[idx].load(Ordering::Relaxed);
643 report(
644 sink.as_ref(),
645 idx,
646 rung,
647 RungStatus::Running,
648 done,
649 Some(total_input_frames),
650 0,
651 0,
652 None,
653 );
654 }
655 }
656 })
657}
658
659#[allow(clippy::too_many_arguments)]
660fn report(
661 sink: &dyn ProgressSink,
662 rung_index: usize,
663 rung: &Rung,
664 status: RungStatus,
665 frames_done: u64,
666 frames_total: Option<u64>,
667 segments: u32,
668 bytes_out: u64,
669 message: Option<String>,
670) {
671 let percent = match status {
672 RungStatus::Completed => 100.0,
673 RungStatus::Pending => 0.0,
674 _ => match frames_total {
675 Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
676 _ => 1.0,
677 },
678 };
679 sink.on_rung(RungProgress {
680 rung_index,
681 label: rung.label.clone(),
682 width: rung.width,
683 height: rung.height,
684 status,
685 percent,
686 frames_done,
687 frames_total,
688 segments_written: segments,
689 bytes_out,
690 message,
691 });
692}
693
694pub fn detect_gpu_pool() -> Arc<GpuPool> {
696 Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
697}
698
699fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
700 match fam {
701 GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
702 GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
703 GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
704 }
705}
706
707fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
711 let gpus = codec::gpu::detect_gpus();
712 match policy {
713 EncodePolicy::AllGpus => gpus,
714 EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
715 EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
716 EncodePolicy::Family(fam) => {
717 let v = policy_vendor(fam);
718 gpus.into_iter().filter(|g| g.vendor == v).collect()
719 }
720 }
721}
722
723pub fn gpu_pool_for_policy(policy: EncodePolicy) -> Arc<GpuPool> {
727 Arc::new(GpuPool::new(&select_gpus_for_policy(policy)))
728}
729
730pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
734 select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
735}
736
737pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
741 match policy {
742 EncodePolicy::AllGpus => None,
743 EncodePolicy::SingleGpu(idx) => idx,
744 EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
745 }
746}
747
/// Result of encoding one rung in single-file mode: its identity plus the
/// encoded packets.
#[derive(Debug)]
pub struct RungPackets {
    /// Position of the rung in the caller's `rungs` slice.
    pub rung_index: usize,
    /// Rung width, copied from the `Rung`.
    pub width: u32,
    /// Rung height, copied from the `Rung`.
    pub height: u32,
    /// Rung label, copied from the `Rung`.
    pub label: String,
    /// Packets of every chunk, concatenated in ascending `segment_idx` order.
    pub packets: Vec<EncodedPacket>,
}
762
763pub async fn run_multigpu_single_file(
770 params: MultiGpuParams<'_>,
771 sink: Arc<dyn ProgressSink>,
772) -> Result<Vec<Option<RungPackets>>> {
773 let rungs = params.rungs;
774 let n = rungs.len();
775 if n == 0 {
776 return Ok(Vec::new());
777 }
778 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
779 if total_segments == 0 {
780 bail!(
781 "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
782 params.total_input_frames,
783 params.keyframe_interval
784 );
785 }
786
787 {
789 let probe = codec::encode::EncoderConfig {
790 width: rungs[0].width,
791 height: rungs[0].height,
792 frame_rate: params.frame_rate,
793 gpu_index: None,
794 ..Default::default()
795 };
796 codec::encode::select_encoder(probe, None).map_err(|e| {
797 anyhow!(
798 "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
799 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature"
800 )
801 })?;
802 }
803
804 tracing::info!(
805 rungs = n,
806 total_segments,
807 gpu_pool_capacity = params.gpu_pool.capacity(),
808 "multi-GPU single-file phase starting"
809 );
810
811 let queues: Vec<Arc<SegmentChunkQueue>> =
812 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
813 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
814 let scaler_active: Vec<Arc<AtomicBool>> =
815 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
816 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
817 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
818 let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
820 (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
821 let active_workers: Arc<Vec<AtomicUsize>> =
822 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
823 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
824 let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
825
826 let progress_stop = Arc::new(AtomicBool::new(false));
827 let progress_handle = spawn_progress_reporter(
828 rungs.to_vec(),
829 frames_encoded.clone(),
830 finalized.clone(),
831 params.total_input_frames,
832 Arc::clone(&sink),
833 Arc::clone(&progress_stop),
834 );
835
836 let total_input_frames = params.total_input_frames;
838 let (finalizer_tx, mut finalizer_rx) =
839 mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
840 let mut finalizer_handles = Vec::with_capacity(n);
841 for idx in 0..n {
842 let collector = Arc::clone(&contributions[idx]);
843 let active_h = Arc::clone(&active_workers);
844 let rung_done_h = Arc::clone(&rung_done);
845 let finalized_h = Arc::clone(&finalized);
846 let tx = finalizer_tx.clone();
847 let rung = rungs[idx].clone();
848 let total_segments = total_segments;
849 let sink = Arc::clone(&sink);
850 finalizer_handles.push(tokio::spawn(async move {
851 loop {
852 let notified = rung_done_h[idx].notified();
853 if active_h[idx].load(Ordering::Acquire) == 0 {
854 break;
855 }
856 notified.await;
857 }
858 let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
859 if chunks.is_empty() {
860 finalized_h[idx].store(true, Ordering::Release);
861 let _ = tx.send((idx, Ok(None))).await;
862 return;
863 }
864 chunks.sort_by_key(|c| c.segment_idx);
865 chunks.dedup_by_key(|c| c.segment_idx);
866 let got = chunks.len();
868 let contiguous = chunks
869 .iter()
870 .enumerate()
871 .all(|(i, c)| c.segment_idx == i);
872 let result = if got != total_segments as usize || !contiguous {
873 Err(anyhow!(
874 "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
875 rung.label,
876 total_segments,
877 got
878 ))
879 } else {
880 let mut packets: Vec<EncodedPacket> = Vec::new();
881 for c in chunks {
882 packets.extend(c.packets);
883 }
884 let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
885 report(
886 sink.as_ref(),
887 idx,
888 &rung,
889 RungStatus::Completed,
890 total_input_frames,
891 Some(total_input_frames),
892 got as u32,
893 bytes,
894 None,
895 );
896 Ok(Some(RungPackets {
897 rung_index: idx,
898 width: rung.width,
899 height: rung.height,
900 label: rung.label.clone(),
901 packets,
902 }))
903 };
904 finalized_h[idx].store(true, Ordering::Release);
905 let _ = tx.send((idx, result)).await;
906 }));
907 }
908 drop(finalizer_tx);
909
910 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
911 indexed.sort_by_key(|(_, r)| r.short_side());
912
913 let mut frame_senders = Vec::with_capacity(n);
915 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
916 Vec::with_capacity(n);
917 for _ in 0..n {
918 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
919 frame_senders.push(tx);
920 frame_receivers.push(Some(rx));
921 }
922 let use_shared_pump = n <= params.gpu_pool.capacity();
923 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
924 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
925 codec_name: params.header.codec.clone(),
926 info_for_decoder: params.header.info.clone(),
927 source_color_metadata: params.source_color_metadata,
928 source_pixel_format: params.source_pixel_format,
929 needs_downsample: params.needs_downsample,
930 tonemap_to_sdr: params.tonemap_to_sdr,
931 gpu_index,
932 };
933 if use_shared_pump {
934 let cfg = make_pump_cfg(params.decode_gpu_for(0));
935 let senders = frame_senders;
936 let input = params.input.clone();
937 let rt = tokio::runtime::Handle::current();
938 pump_tasks.spawn(async move {
939 tokio::task::spawn_blocking(move || {
940 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
941 })
942 .await
943 .map_err(|e| anyhow!("shared pump join error: {e}"))
944 .and_then(|r| r)
945 });
946 } else {
947 for (idx, sender) in frame_senders.into_iter().enumerate() {
948 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
949 let input = params.input.clone();
950 let rt = tokio::runtime::Handle::current();
951 pump_tasks.spawn(async move {
952 tokio::task::spawn_blocking(move || {
953 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
954 })
955 .await
956 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
957 .and_then(|r| r)
958 });
959 }
960 }
961
962 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
964 for (idx, rung) in rungs.iter().cloned().enumerate() {
965 let rx = frame_receivers[idx].take().expect("scaler rx slot");
966 let cfg = crate::rung_scaler::RungScalerConfig {
967 rung_idx: idx,
968 target_width: rung.width,
969 target_height: rung.height,
970 frames_per_chunk: params.keyframe_interval,
971 };
972 let queue = Arc::clone(&queues[idx]);
973 let rt = tokio::runtime::Handle::current();
974 let scaler_flag = Arc::clone(&scaler_active[idx]);
975 let active_h = Arc::clone(&active_workers);
976 let rung_done_h = Arc::clone(&rung_done);
977 scaler_flag.store(true, Ordering::Release);
978 active_h[idx].fetch_add(1, Ordering::AcqRel);
979 scaler_tasks.spawn(async move {
980 let result = tokio::task::spawn_blocking(move || {
981 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
982 })
983 .await
984 .map_err(|e| anyhow!("scaler join error: {e}"))
985 .and_then(|r| r);
986 scaler_flag.store(false, Ordering::Release);
987 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
988 if prev == 1 {
989 rung_done_h[idx].notify_one();
990 }
991 (idx, result)
992 });
993 }
994
995 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
997 let ctx = WorkerCtx {
998 frame_rate: params.frame_rate,
999 output_color_metadata: params.output_color_metadata,
1000 output_pixel_format: params.output_pixel_format,
1001 timescale: params.timescale,
1002 per_frame_ticks: params.per_frame_ticks,
1003 keyframe_interval: params.keyframe_interval,
1004 segment_target_ticks: params.segment_target_ticks,
1005 output_root: params.output_root.clone(),
1006 constant_qp: params.constant_qp,
1007 };
1008 for (idx, rung) in indexed.iter().cloned() {
1009 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
1010 Some(l) => l,
1011 None => {
1012 progress_stop.store(true, Ordering::Release);
1013 let _ = progress_handle.await;
1014 bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1015 }
1016 };
1017 spawn_chunk_worker(
1018 &ctx,
1019 idx,
1020 &rung,
1021 Arc::clone(&queues[idx]),
1022 Arc::clone(&frames_encoded[idx]),
1023 lease,
1024 Arc::clone(&contributions[idx]),
1025 Arc::clone(&active_workers),
1026 Arc::clone(&rung_done),
1027 Arc::clone(&rung_invariants[idx]),
1028 Some(&mut worker_tasks),
1029 );
1030 }
1031
1032 let helper_cancel = Arc::new(AtomicBool::new(false));
1034 let helper_handle = {
1035 let cancel = Arc::clone(&helper_cancel);
1036 let pool = Arc::clone(¶ms.gpu_pool);
1037 let queues = queues.clone();
1038 let scaler_active = scaler_active.clone();
1039 let frames_encoded = frames_encoded.clone();
1040 let contributions = contributions.clone();
1041 let active_workers = Arc::clone(&active_workers);
1042 let rung_done = Arc::clone(&rung_done);
1043 let rung_invariants = rung_invariants.clone();
1044 let rungs_owned: Vec<Rung> = rungs.to_vec();
1045 let ctx = ctx.clone();
1046 tokio::spawn(async move {
1047 loop {
1048 if cancel.load(Ordering::Acquire) {
1049 break;
1050 }
1051 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1052 if pool.pending_claimers() > 0 {
1053 continue;
1054 }
1055 let mut target = None;
1056 for (idx, q) in queues.iter().enumerate() {
1057 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1058 let has_pending = q.pushed_segments() > q.popped_segments();
1059 if scaler_alive || has_pending {
1060 target = Some(idx);
1061 break;
1062 }
1063 }
1064 let Some(rung_idx) = target else { break };
1065 let lease = match pool.try_claim() {
1066 Some(l) => l,
1067 None => continue,
1068 };
1069 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1070 spawn_chunk_worker(
1071 &ctx,
1072 rung_idx,
1073 &rungs_owned[rung_idx],
1074 Arc::clone(&queues[rung_idx]),
1075 Arc::clone(&frames_encoded[rung_idx]),
1076 lease,
1077 Arc::clone(&contributions[rung_idx]),
1078 Arc::clone(&active_workers),
1079 Arc::clone(&rung_done),
1080 Arc::clone(&rung_invariants[rung_idx]),
1081 None,
1082 );
1083 }
1084 })
1085 };
1086
1087 let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1089 let mut pumps_remaining = pump_tasks.len();
1090 let mut scalers_remaining = n;
1091 let mut workers_remaining = n;
1092 let mut finalizers_remaining = n;
1093 macro_rules! teardown_err {
1094 ($e:expr) => {{
1095 helper_cancel.store(true, Ordering::Release);
1096 let _ = helper_handle.await;
1097 progress_stop.store(true, Ordering::Release);
1098 let _ = progress_handle.await;
1099 return Err($e);
1100 }};
1101 }
1102 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1103 tokio::select! {
1104 biased;
1105 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1106 Some(Ok(Ok(_))) => pumps_remaining -= 1,
1107 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1108 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1109 None => pumps_remaining = 0,
1110 },
1111 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1112 Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1113 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1114 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1115 None => scalers_remaining = 0,
1116 },
1117 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1118 Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1119 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1120 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1121 None => workers_remaining = 0,
1122 },
1123 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1124 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1125 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1126 None => finalizers_remaining = 0,
1127 },
1128 }
1129 }
1130 helper_cancel.store(true, Ordering::Release);
1131 let _ = helper_handle.await;
1132 progress_stop.store(true, Ordering::Release);
1133 let _ = progress_handle.await;
1134 for h in finalizer_handles {
1135 let _ = h.await;
1136 }
1137 Ok(completed)
1138}
1139
1140#[allow(clippy::too_many_arguments)]
1141fn spawn_chunk_worker(
1142 ctx: &WorkerCtx,
1143 rung_idx: usize,
1144 rung: &Rung,
1145 queue: Arc<SegmentChunkQueue>,
1146 frames_encoded: Arc<AtomicU64>,
1147 lease: GpuLease,
1148 collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1149 active_workers: Arc<Vec<AtomicUsize>>,
1150 rung_done: Arc<Vec<Notify>>,
1151 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1152 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1153) {
1154 let gpu_index = lease.gpu_index;
1155 let gpu_vendor = lease.vendor;
1156 let cfg = EncoderWorkerConfig {
1157 rung_idx,
1158 width: rung.width,
1159 height: rung.height,
1160 frame_rate: ctx.frame_rate,
1161 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1162 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1163 target: rung.quality.target,
1164 tier: rung.quality.tier,
1165 threads: 0,
1166 gpu_index: Some(gpu_index),
1167 gpu_vendor: Some(gpu_vendor),
1168 output_color_metadata: ctx.output_color_metadata,
1169 output_pixel_format: ctx.output_pixel_format,
1170 constant_qp: ctx.constant_qp,
1171 timescale: ctx.timescale,
1172 per_frame_ticks: ctx.per_frame_ticks,
1173 keyframe_interval: ctx.keyframe_interval,
1174 segment_target_ticks: ctx.segment_target_ticks,
1175 output_dir: ctx.output_root.clone(),
1176 rung_invariant,
1177 };
1178 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1179 let body = async move {
1180 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1181 let cfg_for_worker = cfg.clone();
1182 let queue_for_worker = Arc::clone(&queue);
1183 let rt = tokio::runtime::Handle::current();
1184 let counter = Arc::clone(&frames_encoded);
1185 let out = Arc::clone(&collector);
1186 let blocking = tokio::task::spawn_blocking(move || {
1187 run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1188 });
1189 let drain = async move { while progress_rx.recv().await.is_some() {} };
1190 let (_, br) = tokio::join!(drain, blocking);
1191 let task_status: Result<()> = match br {
1192 Ok(Ok(())) => Ok(()),
1193 Ok(Err(e)) => Err(e),
1194 Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1195 };
1196 drop(lease);
1197 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1198 if prev == 1 {
1199 rung_done[rung_idx].notify_one();
1200 }
1201 (rung_idx, task_status)
1202 };
1203 match worker_tasks {
1204 Some(set) => {
1205 set.spawn(body);
1206 }
1207 None => {
1208 tokio::spawn(async move {
1209 let _ = body.await;
1210 });
1211 }
1212 }
1213}