1use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48 ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49 run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity passed to every per-rung [`SegmentChunkQueue`] sitting between a
/// rung's scaler and its encoder worker(s).
const QUEUE_CAPACITY: usize = 2;
/// Capacity of each per-rung `mpsc` channel that carries decoded frames from a
/// decode pump to that rung's scaler.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// Sleep between two iterations of the helper-dispatch loop that looks for a
/// free GPU to lend to a rung that still has work.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// Sleep between two rounds of `Running` progress reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
/// Result of the HLS variant phase for one rung: the rung's identity plus the
/// merged CMAF track manifest covering all of its segments.
#[derive(Debug, Clone)]
pub struct RungManifest {
    /// Index of the rung in the `rungs` slice supplied by the caller.
    pub rung_index: usize,
    /// Rung width, copied from the rung definition.
    pub width: u32,
    /// Rung height, copied from the rung definition.
    pub height: u32,
    /// Rung label, copied from the rung definition.
    pub label: String,
    /// Directory of this rung relative to the output root (`video/<label>`).
    pub relative_dir: String,
    /// Manifest obtained by merging the contributions of every worker that
    /// encoded segments for this rung.
    pub manifest: CmafTrackManifest,
}
72
/// Inputs shared by the multi-GPU entry points in this module.
pub struct MultiGpuParams<'a> {
    /// Source bitstream; a clone is handed to every decode pump.
    pub input: Bytes,
    /// Encoding ladder. Results are returned index-aligned with this slice.
    pub rungs: &'a [Rung],
    /// Demux header; its `codec` and `info` are forwarded to the decode pump.
    pub header: DemuxHeader,
    /// Forwarded unchanged to the decode pump configuration.
    pub source_color_metadata: ColorMetadata,
    /// Forwarded unchanged to the decode pump configuration.
    pub source_pixel_format: PixelFormat,
    /// Forwarded unchanged to the decode pump configuration.
    pub tonemap_to_sdr: bool,
    /// Forwarded unchanged to every encoder worker configuration.
    pub output_color_metadata: ColorMetadata,
    /// Forwarded unchanged to every encoder worker configuration.
    pub output_pixel_format: PixelFormat,
    /// Forwarded unchanged to the decode pump configuration.
    pub needs_downsample: bool,
    /// Filter chain forwarded to the decode pump configuration.
    pub filters: Arc<codec::filter::FilterChain>,
    /// Frame rate used for the encoder availability probe and the workers.
    pub frame_rate: f64,
    /// Pool from which encoder workers claim GPU leases.
    pub gpu_pool: Arc<GpuPool>,
    /// GPU indices cycled through (by pump index) to pick a decode GPU when
    /// `decode_gpu` is `None`.
    pub gpu_indices: Vec<u32>,
    /// When `Some`, the GPU used by every decode pump.
    pub decode_gpu: Option<u32>,
    /// Root output directory; each rung writes under `video/<label>`.
    pub output_root: PathBuf,
    /// Track timescale, forwarded to workers and written into the manifest.
    pub timescale: u32,
    /// Forwarded unchanged to every encoder worker configuration.
    pub per_frame_ticks: u32,
    /// Keyframe interval; also used as the scaler's frames-per-chunk and to
    /// derive the expected segment count.
    pub keyframe_interval: u32,
    /// Forwarded unchanged to every encoder worker configuration.
    pub segment_target_ticks: u64,
    /// Total number of input frames; drives the expected segment count and
    /// progress reporting.
    pub total_input_frames: u64,
    /// Forwarded unchanged to every encoder worker configuration.
    pub constant_qp: bool,
}
111
112impl MultiGpuParams<'_> {
113 fn decode_gpu_for(&self, i: usize) -> Option<u32> {
117 if self.decode_gpu.is_some() {
118 return self.decode_gpu;
119 }
120 if self.gpu_indices.is_empty() {
121 return None;
122 }
123 Some(self.gpu_indices[i % self.gpu_indices.len()])
124 }
125}
126
127pub async fn run_multigpu_hls(
130 params: MultiGpuParams<'_>,
131 sink: Arc<dyn ProgressSink>,
132) -> Result<Vec<Option<RungManifest>>> {
133 let rungs = params.rungs;
134 let n = rungs.len();
135 if n == 0 {
136 return Ok(Vec::new());
137 }
138 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
139 if total_segments == 0 {
140 bail!(
141 "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
142 params.total_input_frames,
143 params.keyframe_interval
144 );
145 }
146
147 {
153 let probe = codec::encode::EncoderConfig {
154 width: rungs[0].width,
155 height: rungs[0].height,
156 frame_rate: params.frame_rate,
157 gpu_index: None,
158 ..Default::default()
159 };
160 codec::encode::select_encoder(probe, None).map_err(|e| {
161 anyhow!(
162 "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
163 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature for a software encoder"
164 )
165 })?;
166 }
167
168 tracing::info!(
169 rungs = n,
170 total_segments,
171 gpu_pool_capacity = params.gpu_pool.capacity(),
172 "multi-GPU variant phase starting"
173 );
174
175 let queues: Vec<Arc<SegmentChunkQueue>> =
177 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
178 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
179 let scaler_active: Vec<Arc<AtomicBool>> =
180 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
181 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
182 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
183 let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
184 Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
185 let active_workers: Arc<Vec<AtomicUsize>> =
186 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
187 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
188 let finalized: Arc<Vec<AtomicBool>> =
189 Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
190
191 let progress_stop = Arc::new(AtomicBool::new(false));
193 let progress_handle = spawn_progress_reporter(
194 rungs.to_vec(),
195 frames_encoded.clone(),
196 finalized.clone(),
197 params.total_input_frames,
198 Arc::clone(&sink),
199 Arc::clone(&progress_stop),
200 );
201
202 let total_input_frames = params.total_input_frames;
204 let (finalizer_tx, mut finalizer_rx) =
205 mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
206 let mut finalizer_handles = Vec::with_capacity(n);
207 for idx in 0..n {
208 let contributions_h = Arc::clone(&contributions);
209 let active_h = Arc::clone(&active_workers);
210 let rung_done_h = Arc::clone(&rung_done);
211 let finalized_h = Arc::clone(&finalized);
212 let tx = finalizer_tx.clone();
213 let rung = rungs[idx].clone();
214 let rel_dir = format!("video/{}", rung.label);
215 let output_root = params.output_root.clone();
216 let timescale = params.timescale;
217 let total_segments = total_segments;
218 let sink = Arc::clone(&sink);
219 finalizer_handles.push(tokio::spawn(async move {
220 loop {
222 let notified = rung_done_h[idx].notified();
223 if active_h[idx].load(Ordering::Acquire) == 0 {
224 break;
225 }
226 notified.await;
227 }
228 let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
229 if outputs.is_empty() {
230 finalized_h[idx].store(true, Ordering::Release);
231 let _ = tx.send((idx, Ok(None))).await;
232 return;
233 }
234 let init_path = output_root.join(&rel_dir).join("init.mp4");
235 let contribs: Vec<RungContribution> = outputs
236 .into_iter()
237 .map(|wo| RungContribution {
238 width: rung.width,
239 height: rung.height,
240 relative_dir: rel_dir.clone(),
241 manifest: CmafTrackManifest {
242 init_path: init_path.clone(),
243 segments: wo.segments,
244 timescale,
245 },
246 })
247 .collect();
248 let result = match merge_rung_contributions(contribs) {
249 Ok(merged) => {
250 let got = merged.manifest.segments.len();
251 if got != total_segments as usize {
252 Err(anyhow!(
253 "rung {} coverage incomplete: expected {} segments, got {}",
254 rung.label,
255 total_segments,
256 got
257 ))
258 } else {
259 let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
260 report(
261 sink.as_ref(),
262 idx,
263 &rung,
264 RungStatus::Completed,
265 total_input_frames,
266 Some(total_input_frames),
267 got as u32,
268 bytes,
269 None,
270 );
271 Ok(Some(RungManifest {
272 rung_index: idx,
273 width: rung.width,
274 height: rung.height,
275 label: rung.label.clone(),
276 relative_dir: rel_dir.clone(),
277 manifest: merged.manifest,
278 }))
279 }
280 }
281 Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
282 };
283 finalized_h[idx].store(true, Ordering::Release);
284 let _ = tx.send((idx, result)).await;
285 }));
286 }
287 drop(finalizer_tx);
288
289 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
291 indexed.sort_by_key(|(_, r)| r.short_side());
292
293 let mut frame_senders = Vec::with_capacity(n);
295 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
296 Vec::with_capacity(n);
297 for _ in 0..n {
298 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
299 frame_senders.push(tx);
300 frame_receivers.push(Some(rx));
301 }
302
303 let use_shared_pump = n <= params.gpu_pool.capacity();
304 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
305 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
306 codec_name: params.header.codec.clone(),
307 info_for_decoder: params.header.info.clone(),
308 source_color_metadata: params.source_color_metadata,
309 source_pixel_format: params.source_pixel_format,
310 needs_downsample: params.needs_downsample,
311 tonemap_to_sdr: params.tonemap_to_sdr,
312 gpu_index,
313 filters: params.filters.clone(),
314 };
315 if use_shared_pump {
316 let cfg = make_pump_cfg(params.decode_gpu_for(0));
317 let senders = frame_senders;
318 let input = params.input.clone();
319 let rt = tokio::runtime::Handle::current();
320 pump_tasks.spawn(async move {
321 tokio::task::spawn_blocking(move || {
322 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
323 })
324 .await
325 .map_err(|e| anyhow!("shared pump join error: {e}"))
326 .and_then(|r| r)
327 });
328 } else {
329 for (idx, sender) in frame_senders.into_iter().enumerate() {
330 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
331 let input = params.input.clone();
332 let rt = tokio::runtime::Handle::current();
333 pump_tasks.spawn(async move {
334 tokio::task::spawn_blocking(move || {
335 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
336 })
337 .await
338 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
339 .and_then(|r| r)
340 });
341 }
342 }
343
344 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
346 for (idx, rung) in rungs.iter().cloned().enumerate() {
347 let rx = frame_receivers[idx].take().expect("scaler rx slot");
348 let cfg = crate::rung_scaler::RungScalerConfig {
349 rung_idx: idx,
350 target_width: rung.width,
351 target_height: rung.height,
352 frames_per_chunk: params.keyframe_interval,
353 };
354 let queue = Arc::clone(&queues[idx]);
355 let rt = tokio::runtime::Handle::current();
356 let scaler_flag = Arc::clone(&scaler_active[idx]);
357 let active_h = Arc::clone(&active_workers);
358 let rung_done_h = Arc::clone(&rung_done);
359 scaler_flag.store(true, Ordering::Release);
360 active_h[idx].fetch_add(1, Ordering::AcqRel);
361 scaler_tasks.spawn(async move {
362 let result = tokio::task::spawn_blocking(move || {
363 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
364 })
365 .await
366 .map_err(|e| anyhow!("scaler join error: {e}"))
367 .and_then(|r| r);
368 scaler_flag.store(false, Ordering::Release);
369 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
370 if prev == 1 {
371 rung_done_h[idx].notify_one();
372 }
373 (idx, result)
374 });
375 }
376
377 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
379 let ctx = WorkerCtx {
380 frame_rate: params.frame_rate,
381 output_color_metadata: params.output_color_metadata,
382 output_pixel_format: params.output_pixel_format,
383 timescale: params.timescale,
384 per_frame_ticks: params.per_frame_ticks,
385 keyframe_interval: params.keyframe_interval,
386 segment_target_ticks: params.segment_target_ticks,
387 output_root: params.output_root.clone(),
388 constant_qp: params.constant_qp,
389 };
390 for (idx, rung) in indexed.iter().cloned() {
391 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
392 Some(l) => l,
393 None => {
394 progress_stop.store(true, Ordering::Release);
395 let _ = progress_handle.await;
396 bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
397 }
398 };
399 spawn_encoder_worker(
400 &ctx,
401 idx,
402 &rung,
403 Arc::clone(&queues[idx]),
404 Arc::clone(&frames_encoded[idx]),
405 lease,
406 Arc::clone(&contributions),
407 Arc::clone(&active_workers),
408 Arc::clone(&rung_done),
409 Arc::clone(&rung_invariants[idx]),
410 Some(&mut worker_tasks),
411 );
412 }
413
414 let helper_cancel = Arc::new(AtomicBool::new(false));
416 let helper_handle = {
417 let cancel = Arc::clone(&helper_cancel);
418 let pool = Arc::clone(¶ms.gpu_pool);
419 let queues = queues.clone();
420 let scaler_active = scaler_active.clone();
421 let frames_encoded = frames_encoded.clone();
422 let contributions = Arc::clone(&contributions);
423 let active_workers = Arc::clone(&active_workers);
424 let rung_done = Arc::clone(&rung_done);
425 let rung_invariants = rung_invariants.clone();
426 let rungs_owned: Vec<Rung> = rungs.to_vec();
427 let ctx = ctx.clone();
428 tokio::spawn(async move {
429 loop {
430 if cancel.load(Ordering::Acquire) {
431 break;
432 }
433 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
434 if pool.pending_claimers() > 0 {
435 continue;
436 }
437 let mut target = None;
438 for (idx, q) in queues.iter().enumerate() {
439 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
440 let has_pending = q.pushed_segments() > q.popped_segments();
441 if scaler_alive || has_pending {
442 target = Some(idx);
443 break;
444 }
445 }
446 let Some(rung_idx) = target else { break };
447 let lease = match pool.try_claim() {
448 Some(l) => l,
449 None => continue,
450 };
451 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
452 spawn_encoder_worker(
453 &ctx,
454 rung_idx,
455 &rungs_owned[rung_idx],
456 Arc::clone(&queues[rung_idx]),
457 Arc::clone(&frames_encoded[rung_idx]),
458 lease,
459 Arc::clone(&contributions),
460 Arc::clone(&active_workers),
461 Arc::clone(&rung_done),
462 Arc::clone(&rung_invariants[rung_idx]),
463 None,
464 );
465 }
466 })
467 };
468
469 let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
471 let mut pumps_remaining = pump_tasks.len();
472 let mut scalers_remaining = n;
473 let mut workers_remaining = n;
474 let mut finalizers_remaining = n;
475
476 macro_rules! teardown_err {
477 ($e:expr) => {{
478 helper_cancel.store(true, Ordering::Release);
479 let _ = helper_handle.await;
480 progress_stop.store(true, Ordering::Release);
481 let _ = progress_handle.await;
482 return Err($e);
483 }};
484 }
485
486 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
487 tokio::select! {
488 biased;
489 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
490 Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
491 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
492 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
493 None => pumps_remaining = 0,
494 },
495 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
496 Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
497 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
498 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
499 None => scalers_remaining = 0,
500 },
501 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
502 Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
503 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
504 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
505 None => workers_remaining = 0,
506 },
507 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
508 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
509 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
510 None => finalizers_remaining = 0,
511 },
512 }
513 }
514
515 helper_cancel.store(true, Ordering::Release);
516 let _ = helper_handle.await;
517 progress_stop.store(true, Ordering::Release);
518 let _ = progress_handle.await;
519 for h in finalizer_handles {
520 let _ = h.await;
521 }
522
523 Ok(completed)
524}
525
/// Run-wide encoder settings copied out of [`MultiGpuParams`] so they can be
/// cloned into spawned tasks and turned into an `EncoderWorkerConfig` for
/// each worker.
#[derive(Clone)]
struct WorkerCtx {
    /// Frame rate handed to the encoder.
    frame_rate: f64,
    /// Color metadata handed to the encoder.
    output_color_metadata: ColorMetadata,
    /// Pixel format handed to the encoder.
    output_pixel_format: PixelFormat,
    /// Track timescale handed to the encoder worker.
    timescale: u32,
    /// Forwarded unchanged to the encoder worker configuration.
    per_frame_ticks: u32,
    /// Forwarded unchanged to the encoder worker configuration.
    keyframe_interval: u32,
    /// Forwarded unchanged to the encoder worker configuration.
    segment_target_ticks: u64,
    /// Root output directory; workers write under `video/<label>` inside it.
    output_root: PathBuf,
    /// Forwarded unchanged to the encoder worker configuration.
    constant_qp: bool,
}
539
540#[allow(clippy::too_many_arguments)]
541fn spawn_encoder_worker(
542 ctx: &WorkerCtx,
543 rung_idx: usize,
544 rung: &Rung,
545 queue: Arc<SegmentChunkQueue>,
546 frames_encoded: Arc<AtomicU64>,
547 lease: GpuLease,
548 contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
549 active_workers: Arc<Vec<AtomicUsize>>,
550 rung_done: Arc<Vec<Notify>>,
551 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
552 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
553) {
554 let rel_dir = format!("video/{}", rung.label);
555 let output_dir = ctx.output_root.join(&rel_dir);
556 let gpu_index = lease.gpu_index;
557 let gpu_vendor = lease.vendor;
558
559 let cfg = EncoderWorkerConfig {
560 rung_idx,
561 width: rung.width,
562 height: rung.height,
563 frame_rate: ctx.frame_rate,
564 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
565 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
566 target: rung.quality.target,
567 tier: rung.quality.tier,
568 threads: 0,
569 gpu_index: Some(gpu_index),
570 gpu_vendor: Some(gpu_vendor),
571 output_color_metadata: ctx.output_color_metadata,
572 output_pixel_format: ctx.output_pixel_format,
573 constant_qp: ctx.constant_qp,
574 timescale: ctx.timescale,
575 per_frame_ticks: ctx.per_frame_ticks,
576 keyframe_interval: ctx.keyframe_interval,
577 segment_target_ticks: ctx.segment_target_ticks,
578 output_dir,
579 rung_invariant,
580 };
581
582 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
583 let body = async move {
584 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
585 let cfg_for_worker = cfg.clone();
586 let queue_for_worker = Arc::clone(&queue);
587 let rt = tokio::runtime::Handle::current();
588 let counter = Arc::clone(&frames_encoded);
589 let blocking = tokio::task::spawn_blocking(move || {
590 run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
591 });
592 let drain = async move { while progress_rx.recv().await.is_some() {} };
596 let (_, br) = tokio::join!(drain, blocking);
597
598 let task_status: Result<()> = match br {
599 Ok(Ok(out)) => {
600 contributions[rung_idx].lock().unwrap().push(out);
601 Ok(())
602 }
603 Ok(Err(e)) => Err(e),
604 Err(e) => Err(anyhow!("worker join error: {e}")),
605 };
606 drop(lease);
607 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
608 if prev == 1 {
609 rung_done[rung_idx].notify_one();
610 }
611 (rung_idx, task_status)
612 };
613
614 match worker_tasks {
615 Some(set) => {
616 set.spawn(body);
617 }
618 None => {
619 tokio::spawn(async move {
620 let _ = body.await;
621 });
622 }
623 }
624}
625
626fn spawn_progress_reporter(
629 rungs: Vec<Rung>,
630 frames_encoded: Vec<Arc<AtomicU64>>,
631 finalized: Arc<Vec<AtomicBool>>,
632 total_input_frames: u64,
633 sink: Arc<dyn ProgressSink>,
634 stop: Arc<AtomicBool>,
635) -> tokio::task::JoinHandle<()> {
636 tokio::spawn(async move {
637 loop {
638 if stop.load(Ordering::Acquire) {
639 break;
640 }
641 tokio::time::sleep(PROGRESS_TICK).await;
642 for (idx, rung) in rungs.iter().enumerate() {
643 if finalized[idx].load(Ordering::Acquire) {
644 continue;
645 }
646 let done = frames_encoded[idx].load(Ordering::Relaxed);
647 report(
648 sink.as_ref(),
649 idx,
650 rung,
651 RungStatus::Running,
652 done,
653 Some(total_input_frames),
654 0,
655 0,
656 None,
657 );
658 }
659 }
660 })
661}
662
663#[allow(clippy::too_many_arguments)]
664fn report(
665 sink: &dyn ProgressSink,
666 rung_index: usize,
667 rung: &Rung,
668 status: RungStatus,
669 frames_done: u64,
670 frames_total: Option<u64>,
671 segments: u32,
672 bytes_out: u64,
673 message: Option<String>,
674) {
675 let percent = match status {
676 RungStatus::Completed => 100.0,
677 RungStatus::Pending => 0.0,
678 _ => match frames_total {
679 Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
680 _ => 1.0,
681 },
682 };
683 sink.on_rung(RungProgress {
684 rung_index,
685 label: rung.label.clone(),
686 width: rung.width,
687 height: rung.height,
688 status,
689 percent,
690 frames_done,
691 frames_total,
692 segments_written: segments,
693 bytes_out,
694 message,
695 });
696}
697
698pub fn detect_gpu_pool() -> Arc<GpuPool> {
700 Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
701}
702
703fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
704 match fam {
705 GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
706 GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
707 GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
708 }
709}
710
711fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
715 let gpus = codec::gpu::detect_gpus();
716 match policy {
717 EncodePolicy::AllGpus => gpus,
718 EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
719 EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
720 EncodePolicy::Family(fam) => {
721 let v = policy_vendor(fam);
722 gpus.into_iter().filter(|g| g.vendor == v).collect()
723 }
724 }
725}
726
727pub fn gpu_pool_for_policy(policy: EncodePolicy) -> Arc<GpuPool> {
739 let selected = select_gpus_for_policy(policy);
740 let pool_gpus = if selected.len() > 1 {
741 selected.into_iter().filter(|g| codec::encode::av1_encode_capable(g)).collect()
742 } else {
743 selected
744 };
745 Arc::new(GpuPool::new(&pool_gpus))
746}
747
748pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
752 select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
753}
754
755pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
759 match policy {
760 EncodePolicy::AllGpus => None,
761 EncodePolicy::SingleGpu(idx) => idx,
762 EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
763 }
764}
765
/// Result of the single-file path for one rung: the rung's identity plus all
/// of its encoded packets.
#[derive(Debug)]
pub struct RungPackets {
    /// Index of the rung in the `rungs` slice supplied by the caller.
    pub rung_index: usize,
    /// Rung width, copied from the rung definition.
    pub width: u32,
    /// Rung height, copied from the rung definition.
    pub height: u32,
    /// Rung label, copied from the rung definition.
    pub label: String,
    /// Encoded packets of every chunk, concatenated in ascending segment order.
    pub packets: Vec<EncodedPacket>,
}
780
781pub async fn run_multigpu_single_file(
788 params: MultiGpuParams<'_>,
789 sink: Arc<dyn ProgressSink>,
790) -> Result<Vec<Option<RungPackets>>> {
791 let rungs = params.rungs;
792 let n = rungs.len();
793 if n == 0 {
794 return Ok(Vec::new());
795 }
796 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
797 if total_segments == 0 {
798 bail!(
799 "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
800 params.total_input_frames,
801 params.keyframe_interval
802 );
803 }
804
805 {
807 let probe = codec::encode::EncoderConfig {
808 width: rungs[0].width,
809 height: rungs[0].height,
810 frame_rate: params.frame_rate,
811 gpu_index: None,
812 ..Default::default()
813 };
814 codec::encode::select_encoder(probe, None).map_err(|e| {
815 anyhow!(
816 "no AV1 encoder available on this host ({e}); need NVENC (Ada+) / AMF \
817 (RDNA3+) / QSV (Arc+), or build with the `ffmpeg` feature"
818 )
819 })?;
820 }
821
822 tracing::info!(
823 rungs = n,
824 total_segments,
825 gpu_pool_capacity = params.gpu_pool.capacity(),
826 "multi-GPU single-file phase starting"
827 );
828
829 let queues: Vec<Arc<SegmentChunkQueue>> =
830 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
831 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
832 let scaler_active: Vec<Arc<AtomicBool>> =
833 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
834 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
835 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
836 let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
838 (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
839 let active_workers: Arc<Vec<AtomicUsize>> =
840 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
841 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
842 let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
843
844 let progress_stop = Arc::new(AtomicBool::new(false));
845 let progress_handle = spawn_progress_reporter(
846 rungs.to_vec(),
847 frames_encoded.clone(),
848 finalized.clone(),
849 params.total_input_frames,
850 Arc::clone(&sink),
851 Arc::clone(&progress_stop),
852 );
853
854 let total_input_frames = params.total_input_frames;
856 let (finalizer_tx, mut finalizer_rx) =
857 mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
858 let mut finalizer_handles = Vec::with_capacity(n);
859 for idx in 0..n {
860 let collector = Arc::clone(&contributions[idx]);
861 let active_h = Arc::clone(&active_workers);
862 let rung_done_h = Arc::clone(&rung_done);
863 let finalized_h = Arc::clone(&finalized);
864 let tx = finalizer_tx.clone();
865 let rung = rungs[idx].clone();
866 let total_segments = total_segments;
867 let sink = Arc::clone(&sink);
868 finalizer_handles.push(tokio::spawn(async move {
869 loop {
870 let notified = rung_done_h[idx].notified();
871 if active_h[idx].load(Ordering::Acquire) == 0 {
872 break;
873 }
874 notified.await;
875 }
876 let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
877 if chunks.is_empty() {
878 finalized_h[idx].store(true, Ordering::Release);
879 let _ = tx.send((idx, Ok(None))).await;
880 return;
881 }
882 chunks.sort_by_key(|c| c.segment_idx);
883 chunks.dedup_by_key(|c| c.segment_idx);
884 let got = chunks.len();
886 let contiguous = chunks
887 .iter()
888 .enumerate()
889 .all(|(i, c)| c.segment_idx == i);
890 let result = if got != total_segments as usize || !contiguous {
891 Err(anyhow!(
892 "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
893 rung.label,
894 total_segments,
895 got
896 ))
897 } else {
898 let mut packets: Vec<EncodedPacket> = Vec::new();
899 for c in chunks {
900 packets.extend(c.packets);
901 }
902 let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
903 report(
904 sink.as_ref(),
905 idx,
906 &rung,
907 RungStatus::Completed,
908 total_input_frames,
909 Some(total_input_frames),
910 got as u32,
911 bytes,
912 None,
913 );
914 Ok(Some(RungPackets {
915 rung_index: idx,
916 width: rung.width,
917 height: rung.height,
918 label: rung.label.clone(),
919 packets,
920 }))
921 };
922 finalized_h[idx].store(true, Ordering::Release);
923 let _ = tx.send((idx, result)).await;
924 }));
925 }
926 drop(finalizer_tx);
927
928 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
929 indexed.sort_by_key(|(_, r)| r.short_side());
930
931 let mut frame_senders = Vec::with_capacity(n);
933 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
934 Vec::with_capacity(n);
935 for _ in 0..n {
936 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
937 frame_senders.push(tx);
938 frame_receivers.push(Some(rx));
939 }
940 let use_shared_pump = n <= params.gpu_pool.capacity();
941 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
942 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
943 codec_name: params.header.codec.clone(),
944 info_for_decoder: params.header.info.clone(),
945 source_color_metadata: params.source_color_metadata,
946 source_pixel_format: params.source_pixel_format,
947 needs_downsample: params.needs_downsample,
948 tonemap_to_sdr: params.tonemap_to_sdr,
949 gpu_index,
950 filters: params.filters.clone(),
951 };
952 if use_shared_pump {
953 let cfg = make_pump_cfg(params.decode_gpu_for(0));
954 let senders = frame_senders;
955 let input = params.input.clone();
956 let rt = tokio::runtime::Handle::current();
957 pump_tasks.spawn(async move {
958 tokio::task::spawn_blocking(move || {
959 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
960 })
961 .await
962 .map_err(|e| anyhow!("shared pump join error: {e}"))
963 .and_then(|r| r)
964 });
965 } else {
966 for (idx, sender) in frame_senders.into_iter().enumerate() {
967 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
968 let input = params.input.clone();
969 let rt = tokio::runtime::Handle::current();
970 pump_tasks.spawn(async move {
971 tokio::task::spawn_blocking(move || {
972 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
973 })
974 .await
975 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
976 .and_then(|r| r)
977 });
978 }
979 }
980
981 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
983 for (idx, rung) in rungs.iter().cloned().enumerate() {
984 let rx = frame_receivers[idx].take().expect("scaler rx slot");
985 let cfg = crate::rung_scaler::RungScalerConfig {
986 rung_idx: idx,
987 target_width: rung.width,
988 target_height: rung.height,
989 frames_per_chunk: params.keyframe_interval,
990 };
991 let queue = Arc::clone(&queues[idx]);
992 let rt = tokio::runtime::Handle::current();
993 let scaler_flag = Arc::clone(&scaler_active[idx]);
994 let active_h = Arc::clone(&active_workers);
995 let rung_done_h = Arc::clone(&rung_done);
996 scaler_flag.store(true, Ordering::Release);
997 active_h[idx].fetch_add(1, Ordering::AcqRel);
998 scaler_tasks.spawn(async move {
999 let result = tokio::task::spawn_blocking(move || {
1000 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
1001 })
1002 .await
1003 .map_err(|e| anyhow!("scaler join error: {e}"))
1004 .and_then(|r| r);
1005 scaler_flag.store(false, Ordering::Release);
1006 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
1007 if prev == 1 {
1008 rung_done_h[idx].notify_one();
1009 }
1010 (idx, result)
1011 });
1012 }
1013
1014 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
1016 let ctx = WorkerCtx {
1017 frame_rate: params.frame_rate,
1018 output_color_metadata: params.output_color_metadata,
1019 output_pixel_format: params.output_pixel_format,
1020 timescale: params.timescale,
1021 per_frame_ticks: params.per_frame_ticks,
1022 keyframe_interval: params.keyframe_interval,
1023 segment_target_ticks: params.segment_target_ticks,
1024 output_root: params.output_root.clone(),
1025 constant_qp: params.constant_qp,
1026 };
1027 for (idx, rung) in indexed.iter().cloned() {
1028 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
1029 Some(l) => l,
1030 None => {
1031 progress_stop.store(true, Ordering::Release);
1032 let _ = progress_handle.await;
1033 bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1034 }
1035 };
1036 spawn_chunk_worker(
1037 &ctx,
1038 idx,
1039 &rung,
1040 Arc::clone(&queues[idx]),
1041 Arc::clone(&frames_encoded[idx]),
1042 lease,
1043 Arc::clone(&contributions[idx]),
1044 Arc::clone(&active_workers),
1045 Arc::clone(&rung_done),
1046 Arc::clone(&rung_invariants[idx]),
1047 Some(&mut worker_tasks),
1048 );
1049 }
1050
1051 let helper_cancel = Arc::new(AtomicBool::new(false));
1053 let helper_handle = {
1054 let cancel = Arc::clone(&helper_cancel);
1055 let pool = Arc::clone(¶ms.gpu_pool);
1056 let queues = queues.clone();
1057 let scaler_active = scaler_active.clone();
1058 let frames_encoded = frames_encoded.clone();
1059 let contributions = contributions.clone();
1060 let active_workers = Arc::clone(&active_workers);
1061 let rung_done = Arc::clone(&rung_done);
1062 let rung_invariants = rung_invariants.clone();
1063 let rungs_owned: Vec<Rung> = rungs.to_vec();
1064 let ctx = ctx.clone();
1065 tokio::spawn(async move {
1066 loop {
1067 if cancel.load(Ordering::Acquire) {
1068 break;
1069 }
1070 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1071 if pool.pending_claimers() > 0 {
1072 continue;
1073 }
1074 let mut target = None;
1075 for (idx, q) in queues.iter().enumerate() {
1076 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1077 let has_pending = q.pushed_segments() > q.popped_segments();
1078 if scaler_alive || has_pending {
1079 target = Some(idx);
1080 break;
1081 }
1082 }
1083 let Some(rung_idx) = target else { break };
1084 let lease = match pool.try_claim() {
1085 Some(l) => l,
1086 None => continue,
1087 };
1088 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1089 spawn_chunk_worker(
1090 &ctx,
1091 rung_idx,
1092 &rungs_owned[rung_idx],
1093 Arc::clone(&queues[rung_idx]),
1094 Arc::clone(&frames_encoded[rung_idx]),
1095 lease,
1096 Arc::clone(&contributions[rung_idx]),
1097 Arc::clone(&active_workers),
1098 Arc::clone(&rung_done),
1099 Arc::clone(&rung_invariants[rung_idx]),
1100 None,
1101 );
1102 }
1103 })
1104 };
1105
1106 let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1108 let mut pumps_remaining = pump_tasks.len();
1109 let mut scalers_remaining = n;
1110 let mut workers_remaining = n;
1111 let mut finalizers_remaining = n;
1112 macro_rules! teardown_err {
1113 ($e:expr) => {{
1114 helper_cancel.store(true, Ordering::Release);
1115 let _ = helper_handle.await;
1116 progress_stop.store(true, Ordering::Release);
1117 let _ = progress_handle.await;
1118 return Err($e);
1119 }};
1120 }
1121 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1122 tokio::select! {
1123 biased;
1124 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1125 Some(Ok(Ok(_))) => pumps_remaining -= 1,
1126 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1127 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1128 None => pumps_remaining = 0,
1129 },
1130 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1131 Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1132 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1133 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1134 None => scalers_remaining = 0,
1135 },
1136 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1137 Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1138 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1139 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1140 None => workers_remaining = 0,
1141 },
1142 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1143 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1144 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1145 None => finalizers_remaining = 0,
1146 },
1147 }
1148 }
1149 helper_cancel.store(true, Ordering::Release);
1150 let _ = helper_handle.await;
1151 progress_stop.store(true, Ordering::Release);
1152 let _ = progress_handle.await;
1153 for h in finalizer_handles {
1154 let _ = h.await;
1155 }
1156 Ok(completed)
1157}
1158
1159#[allow(clippy::too_many_arguments)]
1160fn spawn_chunk_worker(
1161 ctx: &WorkerCtx,
1162 rung_idx: usize,
1163 rung: &Rung,
1164 queue: Arc<SegmentChunkQueue>,
1165 frames_encoded: Arc<AtomicU64>,
1166 lease: GpuLease,
1167 collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1168 active_workers: Arc<Vec<AtomicUsize>>,
1169 rung_done: Arc<Vec<Notify>>,
1170 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1171 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1172) {
1173 let gpu_index = lease.gpu_index;
1174 let gpu_vendor = lease.vendor;
1175 let cfg = EncoderWorkerConfig {
1176 rung_idx,
1177 width: rung.width,
1178 height: rung.height,
1179 frame_rate: ctx.frame_rate,
1180 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1181 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1182 target: rung.quality.target,
1183 tier: rung.quality.tier,
1184 threads: 0,
1185 gpu_index: Some(gpu_index),
1186 gpu_vendor: Some(gpu_vendor),
1187 output_color_metadata: ctx.output_color_metadata,
1188 output_pixel_format: ctx.output_pixel_format,
1189 constant_qp: ctx.constant_qp,
1190 timescale: ctx.timescale,
1191 per_frame_ticks: ctx.per_frame_ticks,
1192 keyframe_interval: ctx.keyframe_interval,
1193 segment_target_ticks: ctx.segment_target_ticks,
1194 output_dir: ctx.output_root.clone(),
1195 rung_invariant,
1196 };
1197 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1198 let body = async move {
1199 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1200 let cfg_for_worker = cfg.clone();
1201 let queue_for_worker = Arc::clone(&queue);
1202 let rt = tokio::runtime::Handle::current();
1203 let counter = Arc::clone(&frames_encoded);
1204 let out = Arc::clone(&collector);
1205 let blocking = tokio::task::spawn_blocking(move || {
1206 run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1207 });
1208 let drain = async move { while progress_rx.recv().await.is_some() {} };
1209 let (_, br) = tokio::join!(drain, blocking);
1210 let task_status: Result<()> = match br {
1211 Ok(Ok(())) => Ok(()),
1212 Ok(Err(e)) => Err(e),
1213 Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1214 };
1215 drop(lease);
1216 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1217 if prev == 1 {
1218 rung_done[rung_idx].notify_one();
1219 }
1220 (rung_idx, task_status)
1221 };
1222 match worker_tasks {
1223 Some(set) => {
1224 set.spawn(body);
1225 }
1226 None => {
1227 tokio::spawn(async move {
1228 let _ = body.await;
1229 });
1230 }
1231 }
1232}