1use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48 ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49 run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity passed to every per-rung `SegmentChunkQueue::new`.
// NOTE(review): presumably counted in segment chunks — confirm against `SegmentChunkQueue`.
const QUEUE_CAPACITY: usize = 2;
/// Bound of each per-rung `mpsc` channel that carries decoded frames to a scaler.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// Sleep between two polls of the helper task that dispatches extra encoder workers.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// Sleep between two rounds of `Running` progress reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
/// Result of encoding one rung: its geometry, label, output directory and the
/// merged CMAF track manifest.
#[derive(Debug, Clone)]
pub struct RungManifest {
    /// Position of the rung in the caller-supplied rung slice.
    pub rung_index: usize,
    /// Encoded width, copied from the rung.
    pub width: u32,
    /// Encoded height, copied from the rung.
    pub height: u32,
    /// Rung label, copied from the rung.
    pub label: String,
    /// Directory of the rung's output relative to the output root (`video/<label>`).
    pub relative_dir: String,
    /// Manifest produced by merging the contributions of all workers of this rung.
    pub manifest: CmafTrackManifest,
}
72
/// Inputs shared by the multi-GPU encode entry points.
pub struct MultiGpuParams<'a> {
    /// Source bitstream; cloned into every decode pump.
    pub input: Bytes,
    /// Ladder rungs to produce; results are reported in this order.
    pub rungs: &'a [Rung],
    /// Demux header; its `codec` and `info` are forwarded to the decode pump.
    pub header: DemuxHeader,
    /// Color metadata of the source, forwarded to the decode pump.
    pub source_color_metadata: ColorMetadata,
    /// Pixel format of the source, forwarded to the decode pump.
    pub source_pixel_format: PixelFormat,
    /// Forwarded to the decode pump configuration.
    pub tonemap_to_sdr: bool,
    /// Color metadata handed to the encoder workers.
    pub output_color_metadata: ColorMetadata,
    /// Pixel format handed to the encoder workers.
    pub output_pixel_format: PixelFormat,
    /// Forwarded to the decode pump configuration.
    pub needs_downsample: bool,
    /// Filter chain forwarded to the decode pump configuration.
    pub filters: Arc<codec::filter::FilterChain>,
    /// Frame rate used for the encoder probe and every encoder worker.
    pub frame_rate: f64,
    /// Pool from which encoder workers claim GPU leases; its capacity also
    /// decides between one shared decode pump and one pump per rung.
    pub gpu_pool: Arc<GpuPool>,
    /// GPU indices used round-robin for decoding when `decode_gpu` is `None`.
    pub gpu_indices: Vec<u32>,
    /// When set, every decode pump uses this GPU index.
    pub decode_gpu: Option<u32>,
    /// Root directory; each rung writes below `video/<label>` inside it.
    pub output_root: PathBuf,
    /// Timescale forwarded to the encoder workers and written into manifests.
    pub timescale: u32,
    /// Forwarded to the encoder workers.
    pub per_frame_ticks: u32,
    /// Keyframe interval; also used as the scaler's frames-per-chunk and, with
    /// `total_input_frames`, to compute the expected segment count.
    pub keyframe_interval: u32,
    /// Forwarded to the encoder workers.
    pub segment_target_ticks: u64,
    /// Total number of input frames; used for segment counting and progress totals.
    pub total_input_frames: u64,
    /// Forwarded to the encoder workers.
    pub constant_qp: bool,
}
111
112impl MultiGpuParams<'_> {
113 fn decode_gpu_for(&self, i: usize) -> Option<u32> {
117 if self.decode_gpu.is_some() {
118 return self.decode_gpu;
119 }
120 if self.gpu_indices.is_empty() {
121 return None;
122 }
123 Some(self.gpu_indices[i % self.gpu_indices.len()])
124 }
125}
126
127pub async fn run_multigpu_hls(
130 params: MultiGpuParams<'_>,
131 sink: Arc<dyn ProgressSink>,
132) -> Result<Vec<Option<RungManifest>>> {
133 let rungs = params.rungs;
134 let n = rungs.len();
135 if n == 0 {
136 return Ok(Vec::new());
137 }
138 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
139 if total_segments == 0 {
140 bail!(
141 "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
142 params.total_input_frames,
143 params.keyframe_interval
144 );
145 }
146
147 {
153 let probe = codec::encode::EncoderConfig {
154 width: rungs[0].width,
155 height: rungs[0].height,
156 frame_rate: params.frame_rate,
157 gpu_index: None,
158 ..Default::default()
159 };
160 codec::encode::select_encoder(probe, None).map_err(|e| {
161 anyhow!(
162 "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
163 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature for a software encoder"
164 )
165 })?;
166 }
167
168 tracing::info!(
169 rungs = n,
170 total_segments,
171 gpu_pool_capacity = params.gpu_pool.capacity(),
172 "multi-GPU variant phase starting"
173 );
174
175 let queues: Vec<Arc<SegmentChunkQueue>> =
177 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
178 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
179 let scaler_active: Vec<Arc<AtomicBool>> =
180 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
181 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
182 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
183 let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
184 Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
185 let active_workers: Arc<Vec<AtomicUsize>> =
186 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
187 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
188 let finalized: Arc<Vec<AtomicBool>> =
189 Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
190
191 let progress_stop = Arc::new(AtomicBool::new(false));
193 let progress_handle = spawn_progress_reporter(
194 rungs.to_vec(),
195 frames_encoded.clone(),
196 finalized.clone(),
197 params.total_input_frames,
198 Arc::clone(&sink),
199 Arc::clone(&progress_stop),
200 );
201
202 let total_input_frames = params.total_input_frames;
204 let (finalizer_tx, mut finalizer_rx) =
205 mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
206 let mut finalizer_handles = Vec::with_capacity(n);
207 for idx in 0..n {
208 let contributions_h = Arc::clone(&contributions);
209 let active_h = Arc::clone(&active_workers);
210 let rung_done_h = Arc::clone(&rung_done);
211 let finalized_h = Arc::clone(&finalized);
212 let tx = finalizer_tx.clone();
213 let rung = rungs[idx].clone();
214 let rel_dir = format!("video/{}", rung.label);
215 let output_root = params.output_root.clone();
216 let timescale = params.timescale;
217 let total_segments = total_segments;
218 let sink = Arc::clone(&sink);
219 finalizer_handles.push(tokio::spawn(async move {
220 loop {
222 let notified = rung_done_h[idx].notified();
223 if active_h[idx].load(Ordering::Acquire) == 0 {
224 break;
225 }
226 notified.await;
227 }
228 let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
229 if outputs.is_empty() {
230 finalized_h[idx].store(true, Ordering::Release);
231 let _ = tx.send((idx, Ok(None))).await;
232 return;
233 }
234 let init_path = output_root.join(&rel_dir).join("init.mp4");
235 let contribs: Vec<RungContribution> = outputs
236 .into_iter()
237 .map(|wo| RungContribution {
238 width: rung.width,
239 height: rung.height,
240 relative_dir: rel_dir.clone(),
241 manifest: CmafTrackManifest {
242 init_path: init_path.clone(),
243 segments: wo.segments,
244 timescale,
245 },
246 })
247 .collect();
248 let result = match merge_rung_contributions(contribs) {
249 Ok(merged) => {
250 let got = merged.manifest.segments.len();
251 if got != total_segments as usize {
252 Err(anyhow!(
253 "rung {} coverage incomplete: expected {} segments, got {}",
254 rung.label,
255 total_segments,
256 got
257 ))
258 } else {
259 let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
260 report(
261 sink.as_ref(),
262 idx,
263 &rung,
264 RungStatus::Completed,
265 total_input_frames,
266 Some(total_input_frames),
267 got as u32,
268 bytes,
269 None,
270 );
271 Ok(Some(RungManifest {
272 rung_index: idx,
273 width: rung.width,
274 height: rung.height,
275 label: rung.label.clone(),
276 relative_dir: rel_dir.clone(),
277 manifest: merged.manifest,
278 }))
279 }
280 }
281 Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
282 };
283 finalized_h[idx].store(true, Ordering::Release);
284 let _ = tx.send((idx, result)).await;
285 }));
286 }
287 drop(finalizer_tx);
288
289 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
291 indexed.sort_by_key(|(_, r)| r.short_side());
292
293 let mut frame_senders = Vec::with_capacity(n);
295 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
296 Vec::with_capacity(n);
297 for _ in 0..n {
298 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
299 frame_senders.push(tx);
300 frame_receivers.push(Some(rx));
301 }
302
303 let use_shared_pump = n <= params.gpu_pool.capacity();
304 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
305 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
306 codec_name: params.header.codec.clone(),
307 info_for_decoder: params.header.info.clone(),
308 source_color_metadata: params.source_color_metadata,
309 source_pixel_format: params.source_pixel_format,
310 needs_downsample: params.needs_downsample,
311 tonemap_to_sdr: params.tonemap_to_sdr,
312 gpu_index,
313 filters: params.filters.clone(),
314 };
315 if use_shared_pump {
316 let cfg = make_pump_cfg(params.decode_gpu_for(0));
317 let senders = frame_senders;
318 let input = params.input.clone();
319 let rt = tokio::runtime::Handle::current();
320 pump_tasks.spawn(async move {
321 tokio::task::spawn_blocking(move || {
322 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
323 })
324 .await
325 .map_err(|e| anyhow!("shared pump join error: {e}"))
326 .and_then(|r| r)
327 });
328 } else {
329 for (idx, sender) in frame_senders.into_iter().enumerate() {
330 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
331 let input = params.input.clone();
332 let rt = tokio::runtime::Handle::current();
333 pump_tasks.spawn(async move {
334 tokio::task::spawn_blocking(move || {
335 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
336 })
337 .await
338 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
339 .and_then(|r| r)
340 });
341 }
342 }
343
344 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
346 for (idx, rung) in rungs.iter().cloned().enumerate() {
347 let rx = frame_receivers[idx].take().expect("scaler rx slot");
348 let cfg = crate::rung_scaler::RungScalerConfig {
349 rung_idx: idx,
350 target_width: rung.width,
351 target_height: rung.height,
352 frames_per_chunk: params.keyframe_interval,
353 };
354 let queue = Arc::clone(&queues[idx]);
355 let rt = tokio::runtime::Handle::current();
356 let scaler_flag = Arc::clone(&scaler_active[idx]);
357 let active_h = Arc::clone(&active_workers);
358 let rung_done_h = Arc::clone(&rung_done);
359 scaler_flag.store(true, Ordering::Release);
360 active_h[idx].fetch_add(1, Ordering::AcqRel);
361 scaler_tasks.spawn(async move {
362 let result = tokio::task::spawn_blocking(move || {
363 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
364 })
365 .await
366 .map_err(|e| anyhow!("scaler join error: {e}"))
367 .and_then(|r| r);
368 scaler_flag.store(false, Ordering::Release);
369 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
370 if prev == 1 {
371 rung_done_h[idx].notify_one();
372 }
373 (idx, result)
374 });
375 }
376
377 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
379 let ctx = WorkerCtx {
380 frame_rate: params.frame_rate,
381 output_color_metadata: params.output_color_metadata,
382 output_pixel_format: params.output_pixel_format,
383 timescale: params.timescale,
384 per_frame_ticks: params.per_frame_ticks,
385 keyframe_interval: params.keyframe_interval,
386 segment_target_ticks: params.segment_target_ticks,
387 output_root: params.output_root.clone(),
388 constant_qp: params.constant_qp,
389 };
390 for (idx, rung) in indexed.iter().cloned() {
391 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
392 Some(l) => l,
393 None => {
394 progress_stop.store(true, Ordering::Release);
395 let _ = progress_handle.await;
396 bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
397 }
398 };
399 spawn_encoder_worker(
400 &ctx,
401 idx,
402 &rung,
403 Arc::clone(&queues[idx]),
404 Arc::clone(&frames_encoded[idx]),
405 lease,
406 Arc::clone(&contributions),
407 Arc::clone(&active_workers),
408 Arc::clone(&rung_done),
409 Arc::clone(&rung_invariants[idx]),
410 Some(&mut worker_tasks),
411 );
412 }
413
414 let helper_cancel = Arc::new(AtomicBool::new(false));
416 let helper_handle = {
417 let cancel = Arc::clone(&helper_cancel);
418 let pool = Arc::clone(¶ms.gpu_pool);
419 let queues = queues.clone();
420 let scaler_active = scaler_active.clone();
421 let frames_encoded = frames_encoded.clone();
422 let contributions = Arc::clone(&contributions);
423 let active_workers = Arc::clone(&active_workers);
424 let rung_done = Arc::clone(&rung_done);
425 let rung_invariants = rung_invariants.clone();
426 let rungs_owned: Vec<Rung> = rungs.to_vec();
427 let ctx = ctx.clone();
428 tokio::spawn(async move {
429 loop {
430 if cancel.load(Ordering::Acquire) {
431 break;
432 }
433 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
434 if pool.pending_claimers() > 0 {
435 continue;
436 }
437 let mut target = None;
438 for (idx, q) in queues.iter().enumerate() {
439 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
440 let has_pending = q.pushed_segments() > q.popped_segments();
441 if scaler_alive || has_pending {
442 target = Some(idx);
443 break;
444 }
445 }
446 let Some(rung_idx) = target else { break };
447 let lease = match pool.try_claim() {
448 Some(l) => l,
449 None => continue,
450 };
451 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
452 spawn_encoder_worker(
453 &ctx,
454 rung_idx,
455 &rungs_owned[rung_idx],
456 Arc::clone(&queues[rung_idx]),
457 Arc::clone(&frames_encoded[rung_idx]),
458 lease,
459 Arc::clone(&contributions),
460 Arc::clone(&active_workers),
461 Arc::clone(&rung_done),
462 Arc::clone(&rung_invariants[rung_idx]),
463 None,
464 );
465 }
466 })
467 };
468
469 let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
471 let mut pumps_remaining = pump_tasks.len();
472 let mut scalers_remaining = n;
473 let mut workers_remaining = n;
474 let mut finalizers_remaining = n;
475
476 macro_rules! teardown_err {
477 ($e:expr) => {{
478 helper_cancel.store(true, Ordering::Release);
479 let _ = helper_handle.await;
480 progress_stop.store(true, Ordering::Release);
481 let _ = progress_handle.await;
482 return Err($e);
483 }};
484 }
485
486 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
487 tokio::select! {
488 biased;
489 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
490 Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
491 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
492 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
493 None => pumps_remaining = 0,
494 },
495 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
496 Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
497 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
498 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
499 None => scalers_remaining = 0,
500 },
501 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
502 Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
503 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
504 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
505 None => workers_remaining = 0,
506 },
507 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
508 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
509 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
510 None => finalizers_remaining = 0,
511 },
512 }
513 }
514
515 helper_cancel.store(true, Ordering::Release);
516 let _ = helper_handle.await;
517 progress_stop.store(true, Ordering::Release);
518 let _ = progress_handle.await;
519 for h in finalizer_handles {
520 let _ = h.await;
521 }
522
523 Ok(completed)
524}
525
/// Rung-independent encoder settings, copied into every `EncoderWorkerConfig`
/// built by `spawn_encoder_worker`.
#[derive(Clone)]
struct WorkerCtx {
    /// Frame rate handed to the encoder worker.
    frame_rate: f64,
    /// Color metadata handed to the encoder worker.
    output_color_metadata: ColorMetadata,
    /// Pixel format handed to the encoder worker.
    output_pixel_format: PixelFormat,
    /// Timescale handed to the encoder worker.
    timescale: u32,
    /// Forwarded unchanged to the encoder worker.
    per_frame_ticks: u32,
    /// Forwarded unchanged to the encoder worker.
    keyframe_interval: u32,
    /// Forwarded unchanged to the encoder worker.
    segment_target_ticks: u64,
    /// Root under which the worker's `video/<label>` output directory is created.
    output_root: PathBuf,
    /// Forwarded unchanged to the encoder worker.
    constant_qp: bool,
}
539
540#[allow(clippy::too_many_arguments)]
541fn spawn_encoder_worker(
542 ctx: &WorkerCtx,
543 rung_idx: usize,
544 rung: &Rung,
545 queue: Arc<SegmentChunkQueue>,
546 frames_encoded: Arc<AtomicU64>,
547 lease: GpuLease,
548 contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
549 active_workers: Arc<Vec<AtomicUsize>>,
550 rung_done: Arc<Vec<Notify>>,
551 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
552 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
553) {
554 let rel_dir = format!("video/{}", rung.label);
555 let output_dir = ctx.output_root.join(&rel_dir);
556 let gpu_index = lease.gpu_index;
557 let gpu_vendor = lease.vendor;
558
559 let cfg = EncoderWorkerConfig {
560 rung_idx,
561 width: rung.width,
562 height: rung.height,
563 frame_rate: ctx.frame_rate,
564 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
565 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
566 target: rung.quality.target,
567 tier: rung.quality.tier,
568 threads: 0,
569 gpu_index: Some(gpu_index),
570 gpu_vendor: Some(gpu_vendor),
571 output_color_metadata: ctx.output_color_metadata,
572 output_pixel_format: ctx.output_pixel_format,
573 constant_qp: ctx.constant_qp,
574 timescale: ctx.timescale,
575 per_frame_ticks: ctx.per_frame_ticks,
576 keyframe_interval: ctx.keyframe_interval,
577 segment_target_ticks: ctx.segment_target_ticks,
578 output_dir,
579 rung_invariant,
580 };
581
582 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
583 let body = async move {
584 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
585 let cfg_for_worker = cfg.clone();
586 let queue_for_worker = Arc::clone(&queue);
587 let rt = tokio::runtime::Handle::current();
588 let counter = Arc::clone(&frames_encoded);
589 let blocking = tokio::task::spawn_blocking(move || {
590 run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
591 });
592 let drain = async move { while progress_rx.recv().await.is_some() {} };
596 let (_, br) = tokio::join!(drain, blocking);
597
598 let task_status: Result<()> = match br {
599 Ok(Ok(out)) => {
600 contributions[rung_idx].lock().unwrap().push(out);
601 Ok(())
602 }
603 Ok(Err(e)) => Err(e),
604 Err(e) => Err(anyhow!("worker join error: {e}")),
605 };
606 drop(lease);
607 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
608 if prev == 1 {
609 rung_done[rung_idx].notify_one();
610 }
611 (rung_idx, task_status)
612 };
613
614 match worker_tasks {
615 Some(set) => {
616 set.spawn(body);
617 }
618 None => {
619 tokio::spawn(async move {
620 let _ = body.await;
621 });
622 }
623 }
624}
625
626fn spawn_progress_reporter(
629 rungs: Vec<Rung>,
630 frames_encoded: Vec<Arc<AtomicU64>>,
631 finalized: Arc<Vec<AtomicBool>>,
632 total_input_frames: u64,
633 sink: Arc<dyn ProgressSink>,
634 stop: Arc<AtomicBool>,
635) -> tokio::task::JoinHandle<()> {
636 tokio::spawn(async move {
637 loop {
638 if stop.load(Ordering::Acquire) {
639 break;
640 }
641 tokio::time::sleep(PROGRESS_TICK).await;
642 for (idx, rung) in rungs.iter().enumerate() {
643 if finalized[idx].load(Ordering::Acquire) {
644 continue;
645 }
646 let done = frames_encoded[idx].load(Ordering::Relaxed);
647 report(
648 sink.as_ref(),
649 idx,
650 rung,
651 RungStatus::Running,
652 done,
653 Some(total_input_frames),
654 0,
655 0,
656 None,
657 );
658 }
659 }
660 })
661}
662
663#[allow(clippy::too_many_arguments)]
664fn report(
665 sink: &dyn ProgressSink,
666 rung_index: usize,
667 rung: &Rung,
668 status: RungStatus,
669 frames_done: u64,
670 frames_total: Option<u64>,
671 segments: u32,
672 bytes_out: u64,
673 message: Option<String>,
674) {
675 let percent = match status {
676 RungStatus::Completed => 100.0,
677 RungStatus::Pending => 0.0,
678 _ => match frames_total {
679 Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
680 _ => 1.0,
681 },
682 };
683 sink.on_rung(RungProgress {
684 rung_index,
685 label: rung.label.clone(),
686 width: rung.width,
687 height: rung.height,
688 status,
689 percent,
690 frames_done,
691 frames_total,
692 segments_written: segments,
693 bytes_out,
694 message,
695 });
696}
697
698pub fn detect_gpu_pool() -> Arc<GpuPool> {
700 Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
701}
702
703fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
704 match fam {
705 GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
706 GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
707 GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
708 }
709}
710
711fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
715 let gpus = codec::gpu::detect_gpus();
716 match policy {
717 EncodePolicy::AllGpus => gpus,
718 EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
719 EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
720 EncodePolicy::Family(fam) => {
721 let v = policy_vendor(fam);
722 gpus.into_iter().filter(|g| g.vendor == v).collect()
723 }
724 }
725}
726
727pub fn gpu_pool_for_policy(policy: EncodePolicy) -> Arc<GpuPool> {
731 Arc::new(GpuPool::new(&select_gpus_for_policy(policy)))
732}
733
734pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
738 select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
739}
740
741pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
745 match policy {
746 EncodePolicy::AllGpus => None,
747 EncodePolicy::SingleGpu(idx) => idx,
748 EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
749 }
750}
751
/// Encoded packets of one rung, as returned by `run_multigpu_single_file`.
#[derive(Debug)]
pub struct RungPackets {
    /// Position of the rung in the caller-supplied rung slice.
    pub rung_index: usize,
    /// Encoded width, copied from the rung.
    pub width: u32,
    /// Encoded height, copied from the rung.
    pub height: u32,
    /// Rung label, copied from the rung.
    pub label: String,
    /// Packets of all chunks, concatenated in ascending segment-index order.
    pub packets: Vec<EncodedPacket>,
}
766
767pub async fn run_multigpu_single_file(
774 params: MultiGpuParams<'_>,
775 sink: Arc<dyn ProgressSink>,
776) -> Result<Vec<Option<RungPackets>>> {
777 let rungs = params.rungs;
778 let n = rungs.len();
779 if n == 0 {
780 return Ok(Vec::new());
781 }
782 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
783 if total_segments == 0 {
784 bail!(
785 "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
786 params.total_input_frames,
787 params.keyframe_interval
788 );
789 }
790
791 {
793 let probe = codec::encode::EncoderConfig {
794 width: rungs[0].width,
795 height: rungs[0].height,
796 frame_rate: params.frame_rate,
797 gpu_index: None,
798 ..Default::default()
799 };
800 codec::encode::select_encoder(probe, None).map_err(|e| {
801 anyhow!(
802 "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
803 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature"
804 )
805 })?;
806 }
807
808 tracing::info!(
809 rungs = n,
810 total_segments,
811 gpu_pool_capacity = params.gpu_pool.capacity(),
812 "multi-GPU single-file phase starting"
813 );
814
815 let queues: Vec<Arc<SegmentChunkQueue>> =
816 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
817 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
818 let scaler_active: Vec<Arc<AtomicBool>> =
819 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
820 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
821 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
822 let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
824 (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
825 let active_workers: Arc<Vec<AtomicUsize>> =
826 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
827 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
828 let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
829
830 let progress_stop = Arc::new(AtomicBool::new(false));
831 let progress_handle = spawn_progress_reporter(
832 rungs.to_vec(),
833 frames_encoded.clone(),
834 finalized.clone(),
835 params.total_input_frames,
836 Arc::clone(&sink),
837 Arc::clone(&progress_stop),
838 );
839
840 let total_input_frames = params.total_input_frames;
842 let (finalizer_tx, mut finalizer_rx) =
843 mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
844 let mut finalizer_handles = Vec::with_capacity(n);
845 for idx in 0..n {
846 let collector = Arc::clone(&contributions[idx]);
847 let active_h = Arc::clone(&active_workers);
848 let rung_done_h = Arc::clone(&rung_done);
849 let finalized_h = Arc::clone(&finalized);
850 let tx = finalizer_tx.clone();
851 let rung = rungs[idx].clone();
852 let total_segments = total_segments;
853 let sink = Arc::clone(&sink);
854 finalizer_handles.push(tokio::spawn(async move {
855 loop {
856 let notified = rung_done_h[idx].notified();
857 if active_h[idx].load(Ordering::Acquire) == 0 {
858 break;
859 }
860 notified.await;
861 }
862 let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
863 if chunks.is_empty() {
864 finalized_h[idx].store(true, Ordering::Release);
865 let _ = tx.send((idx, Ok(None))).await;
866 return;
867 }
868 chunks.sort_by_key(|c| c.segment_idx);
869 chunks.dedup_by_key(|c| c.segment_idx);
870 let got = chunks.len();
872 let contiguous = chunks
873 .iter()
874 .enumerate()
875 .all(|(i, c)| c.segment_idx == i);
876 let result = if got != total_segments as usize || !contiguous {
877 Err(anyhow!(
878 "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
879 rung.label,
880 total_segments,
881 got
882 ))
883 } else {
884 let mut packets: Vec<EncodedPacket> = Vec::new();
885 for c in chunks {
886 packets.extend(c.packets);
887 }
888 let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
889 report(
890 sink.as_ref(),
891 idx,
892 &rung,
893 RungStatus::Completed,
894 total_input_frames,
895 Some(total_input_frames),
896 got as u32,
897 bytes,
898 None,
899 );
900 Ok(Some(RungPackets {
901 rung_index: idx,
902 width: rung.width,
903 height: rung.height,
904 label: rung.label.clone(),
905 packets,
906 }))
907 };
908 finalized_h[idx].store(true, Ordering::Release);
909 let _ = tx.send((idx, result)).await;
910 }));
911 }
912 drop(finalizer_tx);
913
914 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
915 indexed.sort_by_key(|(_, r)| r.short_side());
916
917 let mut frame_senders = Vec::with_capacity(n);
919 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
920 Vec::with_capacity(n);
921 for _ in 0..n {
922 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
923 frame_senders.push(tx);
924 frame_receivers.push(Some(rx));
925 }
926 let use_shared_pump = n <= params.gpu_pool.capacity();
927 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
928 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
929 codec_name: params.header.codec.clone(),
930 info_for_decoder: params.header.info.clone(),
931 source_color_metadata: params.source_color_metadata,
932 source_pixel_format: params.source_pixel_format,
933 needs_downsample: params.needs_downsample,
934 tonemap_to_sdr: params.tonemap_to_sdr,
935 gpu_index,
936 filters: params.filters.clone(),
937 };
938 if use_shared_pump {
939 let cfg = make_pump_cfg(params.decode_gpu_for(0));
940 let senders = frame_senders;
941 let input = params.input.clone();
942 let rt = tokio::runtime::Handle::current();
943 pump_tasks.spawn(async move {
944 tokio::task::spawn_blocking(move || {
945 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
946 })
947 .await
948 .map_err(|e| anyhow!("shared pump join error: {e}"))
949 .and_then(|r| r)
950 });
951 } else {
952 for (idx, sender) in frame_senders.into_iter().enumerate() {
953 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
954 let input = params.input.clone();
955 let rt = tokio::runtime::Handle::current();
956 pump_tasks.spawn(async move {
957 tokio::task::spawn_blocking(move || {
958 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
959 })
960 .await
961 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
962 .and_then(|r| r)
963 });
964 }
965 }
966
967 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
969 for (idx, rung) in rungs.iter().cloned().enumerate() {
970 let rx = frame_receivers[idx].take().expect("scaler rx slot");
971 let cfg = crate::rung_scaler::RungScalerConfig {
972 rung_idx: idx,
973 target_width: rung.width,
974 target_height: rung.height,
975 frames_per_chunk: params.keyframe_interval,
976 };
977 let queue = Arc::clone(&queues[idx]);
978 let rt = tokio::runtime::Handle::current();
979 let scaler_flag = Arc::clone(&scaler_active[idx]);
980 let active_h = Arc::clone(&active_workers);
981 let rung_done_h = Arc::clone(&rung_done);
982 scaler_flag.store(true, Ordering::Release);
983 active_h[idx].fetch_add(1, Ordering::AcqRel);
984 scaler_tasks.spawn(async move {
985 let result = tokio::task::spawn_blocking(move || {
986 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
987 })
988 .await
989 .map_err(|e| anyhow!("scaler join error: {e}"))
990 .and_then(|r| r);
991 scaler_flag.store(false, Ordering::Release);
992 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
993 if prev == 1 {
994 rung_done_h[idx].notify_one();
995 }
996 (idx, result)
997 });
998 }
999
1000 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
1002 let ctx = WorkerCtx {
1003 frame_rate: params.frame_rate,
1004 output_color_metadata: params.output_color_metadata,
1005 output_pixel_format: params.output_pixel_format,
1006 timescale: params.timescale,
1007 per_frame_ticks: params.per_frame_ticks,
1008 keyframe_interval: params.keyframe_interval,
1009 segment_target_ticks: params.segment_target_ticks,
1010 output_root: params.output_root.clone(),
1011 constant_qp: params.constant_qp,
1012 };
1013 for (idx, rung) in indexed.iter().cloned() {
1014 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
1015 Some(l) => l,
1016 None => {
1017 progress_stop.store(true, Ordering::Release);
1018 let _ = progress_handle.await;
1019 bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1020 }
1021 };
1022 spawn_chunk_worker(
1023 &ctx,
1024 idx,
1025 &rung,
1026 Arc::clone(&queues[idx]),
1027 Arc::clone(&frames_encoded[idx]),
1028 lease,
1029 Arc::clone(&contributions[idx]),
1030 Arc::clone(&active_workers),
1031 Arc::clone(&rung_done),
1032 Arc::clone(&rung_invariants[idx]),
1033 Some(&mut worker_tasks),
1034 );
1035 }
1036
1037 let helper_cancel = Arc::new(AtomicBool::new(false));
1039 let helper_handle = {
1040 let cancel = Arc::clone(&helper_cancel);
1041 let pool = Arc::clone(¶ms.gpu_pool);
1042 let queues = queues.clone();
1043 let scaler_active = scaler_active.clone();
1044 let frames_encoded = frames_encoded.clone();
1045 let contributions = contributions.clone();
1046 let active_workers = Arc::clone(&active_workers);
1047 let rung_done = Arc::clone(&rung_done);
1048 let rung_invariants = rung_invariants.clone();
1049 let rungs_owned: Vec<Rung> = rungs.to_vec();
1050 let ctx = ctx.clone();
1051 tokio::spawn(async move {
1052 loop {
1053 if cancel.load(Ordering::Acquire) {
1054 break;
1055 }
1056 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1057 if pool.pending_claimers() > 0 {
1058 continue;
1059 }
1060 let mut target = None;
1061 for (idx, q) in queues.iter().enumerate() {
1062 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1063 let has_pending = q.pushed_segments() > q.popped_segments();
1064 if scaler_alive || has_pending {
1065 target = Some(idx);
1066 break;
1067 }
1068 }
1069 let Some(rung_idx) = target else { break };
1070 let lease = match pool.try_claim() {
1071 Some(l) => l,
1072 None => continue,
1073 };
1074 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1075 spawn_chunk_worker(
1076 &ctx,
1077 rung_idx,
1078 &rungs_owned[rung_idx],
1079 Arc::clone(&queues[rung_idx]),
1080 Arc::clone(&frames_encoded[rung_idx]),
1081 lease,
1082 Arc::clone(&contributions[rung_idx]),
1083 Arc::clone(&active_workers),
1084 Arc::clone(&rung_done),
1085 Arc::clone(&rung_invariants[rung_idx]),
1086 None,
1087 );
1088 }
1089 })
1090 };
1091
1092 let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1094 let mut pumps_remaining = pump_tasks.len();
1095 let mut scalers_remaining = n;
1096 let mut workers_remaining = n;
1097 let mut finalizers_remaining = n;
1098 macro_rules! teardown_err {
1099 ($e:expr) => {{
1100 helper_cancel.store(true, Ordering::Release);
1101 let _ = helper_handle.await;
1102 progress_stop.store(true, Ordering::Release);
1103 let _ = progress_handle.await;
1104 return Err($e);
1105 }};
1106 }
1107 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1108 tokio::select! {
1109 biased;
1110 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1111 Some(Ok(Ok(_))) => pumps_remaining -= 1,
1112 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1113 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1114 None => pumps_remaining = 0,
1115 },
1116 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1117 Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1118 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1119 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1120 None => scalers_remaining = 0,
1121 },
1122 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1123 Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1124 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1125 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1126 None => workers_remaining = 0,
1127 },
1128 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1129 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1130 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1131 None => finalizers_remaining = 0,
1132 },
1133 }
1134 }
1135 helper_cancel.store(true, Ordering::Release);
1136 let _ = helper_handle.await;
1137 progress_stop.store(true, Ordering::Release);
1138 let _ = progress_handle.await;
1139 for h in finalizer_handles {
1140 let _ = h.await;
1141 }
1142 Ok(completed)
1143}
1144
1145#[allow(clippy::too_many_arguments)]
1146fn spawn_chunk_worker(
1147 ctx: &WorkerCtx,
1148 rung_idx: usize,
1149 rung: &Rung,
1150 queue: Arc<SegmentChunkQueue>,
1151 frames_encoded: Arc<AtomicU64>,
1152 lease: GpuLease,
1153 collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1154 active_workers: Arc<Vec<AtomicUsize>>,
1155 rung_done: Arc<Vec<Notify>>,
1156 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1157 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1158) {
1159 let gpu_index = lease.gpu_index;
1160 let gpu_vendor = lease.vendor;
1161 let cfg = EncoderWorkerConfig {
1162 rung_idx,
1163 width: rung.width,
1164 height: rung.height,
1165 frame_rate: ctx.frame_rate,
1166 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1167 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1168 target: rung.quality.target,
1169 tier: rung.quality.tier,
1170 threads: 0,
1171 gpu_index: Some(gpu_index),
1172 gpu_vendor: Some(gpu_vendor),
1173 output_color_metadata: ctx.output_color_metadata,
1174 output_pixel_format: ctx.output_pixel_format,
1175 constant_qp: ctx.constant_qp,
1176 timescale: ctx.timescale,
1177 per_frame_ticks: ctx.per_frame_ticks,
1178 keyframe_interval: ctx.keyframe_interval,
1179 segment_target_ticks: ctx.segment_target_ticks,
1180 output_dir: ctx.output_root.clone(),
1181 rung_invariant,
1182 };
1183 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1184 let body = async move {
1185 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1186 let cfg_for_worker = cfg.clone();
1187 let queue_for_worker = Arc::clone(&queue);
1188 let rt = tokio::runtime::Handle::current();
1189 let counter = Arc::clone(&frames_encoded);
1190 let out = Arc::clone(&collector);
1191 let blocking = tokio::task::spawn_blocking(move || {
1192 run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1193 });
1194 let drain = async move { while progress_rx.recv().await.is_some() {} };
1195 let (_, br) = tokio::join!(drain, blocking);
1196 let task_status: Result<()> = match br {
1197 Ok(Ok(())) => Ok(()),
1198 Ok(Err(e)) => Err(e),
1199 Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1200 };
1201 drop(lease);
1202 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1203 if prev == 1 {
1204 rung_done[rung_idx].notify_one();
1205 }
1206 (rung_idx, task_status)
1207 };
1208 match worker_tasks {
1209 Some(set) => {
1210 set.spawn(body);
1211 }
1212 None => {
1213 tokio::spawn(async move {
1214 let _ = body.await;
1215 });
1216 }
1217 }
1218}