1use std::path::PathBuf;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
34use anyhow::{Result, anyhow, bail};
35use bytes::Bytes;
36use codec::encode::AUTO_FROM_TARGET;
37use codec::frame::{ColorMetadata, PixelFormat, VideoCodec};
38use container::cmaf::CmafTrackManifest;
39use container::streaming::DemuxHeader;
40use tokio::sync::{Notify, mpsc};
41use tokio::task::JoinSet;
42
43use codec::encode::EncodedPacket;
44
45use crate::cmaf_util::{RungContribution, merge_rung_contributions, total_segments_for_rung};
46use crate::decode_pump::DecodePumpConfig;
47use crate::encoder_worker::{
48 ChunkPackets, EncoderWorkerConfig, RungCodecInvariant, WorkerOutput,
49 run_chunk_encoder_worker_blocking, run_encoder_worker_blocking,
50};
51use crate::frame_queue::SegmentChunkQueue;
52use crate::gpu_pool::{GpuLease, GpuPool};
53use crate::progress::{ProgressSink, RungProgress, RungStatus};
54use crate::spec::{EncodePolicy, GpuFamily, Rung};
55
/// Capacity handed to each rung's `SegmentChunkQueue::new` (scaler -> encoder workers).
const QUEUE_CAPACITY: usize = 2;
/// Bound of each per-rung decoded-frame channel between the decode pump(s) and the rung scaler.
const FANOUT_CHANNEL_CAPACITY: usize = 4;
/// How often the helper dispatcher wakes up to look for a free GPU and a rung that still has work.
const HELPER_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(200);
/// Interval between per-rung `Running` progress reports.
const PROGRESS_TICK: std::time::Duration = std::time::Duration::from_millis(500);
60
/// Merged CMAF output of one rung, built by the HLS finalizer once every
/// encoder worker of that rung has finished.
#[derive(Debug, Clone)]
pub struct RungManifest {
    /// Position of the rung in the `rungs` slice passed to `run_multigpu_hls`.
    pub rung_index: usize,
    /// Output width (copied from the rung).
    pub width: u32,
    /// Output height (copied from the rung).
    pub height: u32,
    /// Rung label; also used to name the rung's output directory.
    pub label: String,
    /// Rung directory relative to the output root (`video/{label}`).
    pub relative_dir: String,
    /// Track manifest merged from all worker contributions of this rung.
    pub manifest: CmafTrackManifest,
}
72
/// Inputs shared by the multi-GPU variant phases (`run_multigpu_hls`,
/// `run_multigpu_single_file`).
pub struct MultiGpuParams<'a> {
    /// Source bitstream handed (cheaply cloned) to every decode pump.
    pub input: Bytes,
    /// Output codec for every rung.
    pub codec: VideoCodec,
    /// Ladder to encode; results are returned in this order.
    pub rungs: &'a [Rung],
    /// Demux header; its `codec` and `info` configure the decoder.
    pub header: DemuxHeader,
    pub source_color_metadata: ColorMetadata,
    pub source_pixel_format: PixelFormat,
    /// Forwarded to the decode pump configuration.
    pub tonemap_to_sdr: bool,
    /// Color metadata written by the encoder workers.
    pub output_color_metadata: ColorMetadata,
    /// Pixel format fed to the encoder workers.
    pub output_pixel_format: PixelFormat,
    /// Forwarded to the decode pump configuration.
    pub needs_downsample: bool,
    /// Filter chain applied by the decode pump(s).
    pub filters: Arc<codec::filter::FilterChain>,
    pub frame_rate: f64,
    /// Pool that hands out GPU leases to encoder workers.
    pub gpu_pool: Arc<GpuPool>,
    /// GPUs that decode pumps are spread over round-robin when `decode_gpu` is unset.
    pub gpu_indices: Vec<u32>,
    /// When set, every decode pump runs on this GPU.
    pub decode_gpu: Option<u32>,
    /// Root directory; each rung writes below `video/{label}`.
    pub output_root: PathBuf,
    pub timescale: u32,
    pub per_frame_ticks: u32,
    /// Frames per scaler chunk; also determines the expected segment count.
    pub keyframe_interval: u32,
    pub segment_target_ticks: u64,
    /// Total decoded frames; used for segment accounting and progress percentages.
    pub total_input_frames: u64,
    /// Forwarded to every encoder worker configuration.
    pub constant_qp: bool,
}
114
115impl MultiGpuParams<'_> {
116 fn decode_gpu_for(&self, i: usize) -> Option<u32> {
120 if self.decode_gpu.is_some() {
121 return self.decode_gpu;
122 }
123 if self.gpu_indices.is_empty() {
124 return None;
125 }
126 Some(self.gpu_indices[i % self.gpu_indices.len()])
127 }
128}
129
130pub async fn run_multigpu_hls(
133 params: MultiGpuParams<'_>,
134 sink: Arc<dyn ProgressSink>,
135) -> Result<Vec<Option<RungManifest>>> {
136 let rungs = params.rungs;
137 let n = rungs.len();
138 if n == 0 {
139 return Ok(Vec::new());
140 }
141 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
142 if total_segments == 0 {
143 bail!(
144 "multigpu: total_segments == 0 (total_input_frames={}, keyframe_interval={})",
145 params.total_input_frames,
146 params.keyframe_interval
147 );
148 }
149
150 {
156 let probe = codec::encode::EncoderConfig {
157 width: rungs[0].width,
158 height: rungs[0].height,
159 frame_rate: params.frame_rate,
160 gpu_index: None,
161 codec: params.codec,
162 ..Default::default()
163 };
164 codec::encode::select_encoder(probe, None).map_err(|e| {
165 anyhow!(
166 "no {:?} encoder available on this host ({e}); need NVENC / AMF / QSV, or build \
167 with the `ffmpeg` feature for a software encoder",
168 params.codec
169 )
170 })?;
171 }
172
173 tracing::info!(
174 rungs = n,
175 total_segments,
176 gpu_pool_capacity = params.gpu_pool.capacity(),
177 "multi-GPU variant phase starting"
178 );
179
180 let queues: Vec<Arc<SegmentChunkQueue>> =
182 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
183 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
184 let scaler_active: Vec<Arc<AtomicBool>> =
185 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
186 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
187 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
188 let contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>> =
189 Arc::new((0..n).map(|_| std::sync::Mutex::new(Vec::new())).collect());
190 let active_workers: Arc<Vec<AtomicUsize>> =
191 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
192 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
193 let finalized: Arc<Vec<AtomicBool>> =
194 Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
195
196 let progress_stop = Arc::new(AtomicBool::new(false));
198 let progress_handle = spawn_progress_reporter(
199 rungs.to_vec(),
200 frames_encoded.clone(),
201 finalized.clone(),
202 params.total_input_frames,
203 Arc::clone(&sink),
204 Arc::clone(&progress_stop),
205 );
206
207 let total_input_frames = params.total_input_frames;
209 let (finalizer_tx, mut finalizer_rx) =
210 mpsc::channel::<(usize, Result<Option<RungManifest>>)>(n.max(1));
211 let mut finalizer_handles = Vec::with_capacity(n);
212 for idx in 0..n {
213 let contributions_h = Arc::clone(&contributions);
214 let active_h = Arc::clone(&active_workers);
215 let rung_done_h = Arc::clone(&rung_done);
216 let finalized_h = Arc::clone(&finalized);
217 let tx = finalizer_tx.clone();
218 let rung = rungs[idx].clone();
219 let rel_dir = format!("video/{}", rung.label);
220 let output_root = params.output_root.clone();
221 let timescale = params.timescale;
222 let total_segments = total_segments;
223 let sink = Arc::clone(&sink);
224 finalizer_handles.push(tokio::spawn(async move {
225 loop {
227 let notified = rung_done_h[idx].notified();
228 if active_h[idx].load(Ordering::Acquire) == 0 {
229 break;
230 }
231 notified.await;
232 }
233 let outputs: Vec<WorkerOutput> = std::mem::take(&mut *contributions_h[idx].lock().unwrap());
234 if outputs.is_empty() {
235 finalized_h[idx].store(true, Ordering::Release);
236 let _ = tx.send((idx, Ok(None))).await;
237 return;
238 }
239 let init_path = output_root.join(&rel_dir).join("init.mp4");
240 let contribs: Vec<RungContribution> = outputs
241 .into_iter()
242 .map(|wo| RungContribution {
243 width: rung.width,
244 height: rung.height,
245 relative_dir: rel_dir.clone(),
246 manifest: CmafTrackManifest {
247 init_path: init_path.clone(),
248 segments: wo.segments,
249 timescale,
250 },
251 })
252 .collect();
253 let result = match merge_rung_contributions(contribs) {
254 Ok(merged) => {
255 let got = merged.manifest.segments.len();
256 if got != total_segments as usize {
257 Err(anyhow!(
258 "rung {} coverage incomplete: expected {} segments, got {}",
259 rung.label,
260 total_segments,
261 got
262 ))
263 } else {
264 let bytes: u64 = merged.manifest.segments.iter().map(|s| s.byte_size).sum();
265 report(
266 sink.as_ref(),
267 idx,
268 &rung,
269 RungStatus::Completed,
270 total_input_frames,
271 Some(total_input_frames),
272 got as u32,
273 bytes,
274 None,
275 );
276 Ok(Some(RungManifest {
277 rung_index: idx,
278 width: rung.width,
279 height: rung.height,
280 label: rung.label.clone(),
281 relative_dir: rel_dir.clone(),
282 manifest: merged.manifest,
283 }))
284 }
285 }
286 Err(e) => Err(anyhow!("merging contributions for rung {}: {e}", rung.label)),
287 };
288 finalized_h[idx].store(true, Ordering::Release);
289 let _ = tx.send((idx, result)).await;
290 }));
291 }
292 drop(finalizer_tx);
293
294 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
296 indexed.sort_by_key(|(_, r)| r.short_side());
297
298 let mut frame_senders = Vec::with_capacity(n);
300 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
301 Vec::with_capacity(n);
302 for _ in 0..n {
303 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
304 frame_senders.push(tx);
305 frame_receivers.push(Some(rx));
306 }
307
308 let use_shared_pump = n <= params.gpu_pool.capacity();
309 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
310 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
311 codec_name: params.header.codec.clone(),
312 info_for_decoder: params.header.info.clone(),
313 source_color_metadata: params.source_color_metadata,
314 source_pixel_format: params.source_pixel_format,
315 needs_downsample: params.needs_downsample,
316 tonemap_to_sdr: params.tonemap_to_sdr,
317 gpu_index,
318 filters: params.filters.clone(),
319 };
320 if use_shared_pump {
321 let cfg = make_pump_cfg(params.decode_gpu_for(0));
322 let senders = frame_senders;
323 let input = params.input.clone();
324 let rt = tokio::runtime::Handle::current();
325 pump_tasks.spawn(async move {
326 tokio::task::spawn_blocking(move || {
327 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
328 })
329 .await
330 .map_err(|e| anyhow!("shared pump join error: {e}"))
331 .and_then(|r| r)
332 });
333 } else {
334 for (idx, sender) in frame_senders.into_iter().enumerate() {
335 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
336 let input = params.input.clone();
337 let rt = tokio::runtime::Handle::current();
338 pump_tasks.spawn(async move {
339 tokio::task::spawn_blocking(move || {
340 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
341 })
342 .await
343 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
344 .and_then(|r| r)
345 });
346 }
347 }
348
349 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
351 for (idx, rung) in rungs.iter().cloned().enumerate() {
352 let rx = frame_receivers[idx].take().expect("scaler rx slot");
353 let cfg = crate::rung_scaler::RungScalerConfig {
354 rung_idx: idx,
355 target_width: rung.width,
356 target_height: rung.height,
357 frames_per_chunk: params.keyframe_interval,
358 };
359 let queue = Arc::clone(&queues[idx]);
360 let rt = tokio::runtime::Handle::current();
361 let scaler_flag = Arc::clone(&scaler_active[idx]);
362 let active_h = Arc::clone(&active_workers);
363 let rung_done_h = Arc::clone(&rung_done);
364 scaler_flag.store(true, Ordering::Release);
365 active_h[idx].fetch_add(1, Ordering::AcqRel);
366 scaler_tasks.spawn(async move {
367 let result = tokio::task::spawn_blocking(move || {
368 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
369 })
370 .await
371 .map_err(|e| anyhow!("scaler join error: {e}"))
372 .and_then(|r| r);
373 scaler_flag.store(false, Ordering::Release);
374 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
375 if prev == 1 {
376 rung_done_h[idx].notify_one();
377 }
378 (idx, result)
379 });
380 }
381
382 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
384 let ctx = WorkerCtx {
385 codec: params.codec,
386 frame_rate: params.frame_rate,
387 output_color_metadata: params.output_color_metadata,
388 output_pixel_format: params.output_pixel_format,
389 timescale: params.timescale,
390 per_frame_ticks: params.per_frame_ticks,
391 keyframe_interval: params.keyframe_interval,
392 segment_target_ticks: params.segment_target_ticks,
393 output_root: params.output_root.clone(),
394 constant_qp: params.constant_qp,
395 };
396 for (idx, rung) in indexed.iter().cloned() {
397 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
398 Some(l) => l,
399 None => {
400 progress_stop.store(true, Ordering::Release);
401 let _ = progress_handle.await;
402 bail!("multigpu: GPU pool returned no lease on a CPU-only host; at least one GPU is required");
403 }
404 };
405 spawn_encoder_worker(
406 &ctx,
407 idx,
408 &rung,
409 Arc::clone(&queues[idx]),
410 Arc::clone(&frames_encoded[idx]),
411 lease,
412 Arc::clone(&contributions),
413 Arc::clone(&active_workers),
414 Arc::clone(&rung_done),
415 Arc::clone(&rung_invariants[idx]),
416 Some(&mut worker_tasks),
417 );
418 }
419
420 let helper_cancel = Arc::new(AtomicBool::new(false));
422 let helper_handle = {
423 let cancel = Arc::clone(&helper_cancel);
424 let pool = Arc::clone(¶ms.gpu_pool);
425 let queues = queues.clone();
426 let scaler_active = scaler_active.clone();
427 let frames_encoded = frames_encoded.clone();
428 let contributions = Arc::clone(&contributions);
429 let active_workers = Arc::clone(&active_workers);
430 let rung_done = Arc::clone(&rung_done);
431 let rung_invariants = rung_invariants.clone();
432 let rungs_owned: Vec<Rung> = rungs.to_vec();
433 let ctx = ctx.clone();
434 tokio::spawn(async move {
435 loop {
436 if cancel.load(Ordering::Acquire) {
437 break;
438 }
439 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
440 if pool.pending_claimers() > 0 {
441 continue;
442 }
443 let mut target = None;
444 for (idx, q) in queues.iter().enumerate() {
445 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
446 let has_pending = q.pushed_segments() > q.popped_segments();
447 if scaler_alive || has_pending {
448 target = Some(idx);
449 break;
450 }
451 }
452 let Some(rung_idx) = target else { break };
453 let lease = match pool.try_claim() {
454 Some(l) => l,
455 None => continue,
456 };
457 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "multigpu helper dispatch");
458 spawn_encoder_worker(
459 &ctx,
460 rung_idx,
461 &rungs_owned[rung_idx],
462 Arc::clone(&queues[rung_idx]),
463 Arc::clone(&frames_encoded[rung_idx]),
464 lease,
465 Arc::clone(&contributions),
466 Arc::clone(&active_workers),
467 Arc::clone(&rung_done),
468 Arc::clone(&rung_invariants[rung_idx]),
469 None,
470 );
471 }
472 })
473 };
474
475 let mut completed: Vec<Option<RungManifest>> = (0..n).map(|_| None).collect();
477 let mut pumps_remaining = pump_tasks.len();
478 let mut scalers_remaining = n;
479 let mut workers_remaining = n;
480 let mut finalizers_remaining = n;
481
482 macro_rules! teardown_err {
483 ($e:expr) => {{
484 helper_cancel.store(true, Ordering::Release);
485 let _ = helper_handle.await;
486 progress_stop.store(true, Ordering::Release);
487 let _ = progress_handle.await;
488 return Err($e);
489 }};
490 }
491
492 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
493 tokio::select! {
494 biased;
495 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
496 Some(Ok(Ok(n))) => { tracing::info!(frames = n, "decode pump finished"); pumps_remaining -= 1; }
497 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
498 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
499 None => pumps_remaining = 0,
500 },
501 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
502 Some(Ok((idx, Ok(n)))) => { tracing::info!(idx, chunks = n, "scaler finished"); scalers_remaining -= 1; }
503 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
504 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
505 None => scalers_remaining = 0,
506 },
507 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
508 Some(Ok((idx, Ok(())))) => { tracing::info!(idx, "initial worker finished"); workers_remaining -= 1; }
509 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("worker for rung {idx} failed: {e}")),
510 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
511 None => workers_remaining = 0,
512 },
513 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
514 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
515 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
516 None => finalizers_remaining = 0,
517 },
518 }
519 }
520
521 helper_cancel.store(true, Ordering::Release);
522 let _ = helper_handle.await;
523 progress_stop.store(true, Ordering::Release);
524 let _ = progress_handle.await;
525 for h in finalizer_handles {
526 let _ = h.await;
527 }
528
529 Ok(completed)
530}
531
/// Encoder settings shared by every worker of a run, copied out of
/// `MultiGpuParams` so detached helper workers can own them.
#[derive(Clone)]
struct WorkerCtx {
    codec: VideoCodec,
    frame_rate: f64,
    output_color_metadata: ColorMetadata,
    output_pixel_format: PixelFormat,
    timescale: u32,
    per_frame_ticks: u32,
    keyframe_interval: u32,
    segment_target_ticks: u64,
    /// Root below which each rung writes to `video/{label}`.
    output_root: PathBuf,
    constant_qp: bool,
}
546
547#[allow(clippy::too_many_arguments)]
548fn spawn_encoder_worker(
549 ctx: &WorkerCtx,
550 rung_idx: usize,
551 rung: &Rung,
552 queue: Arc<SegmentChunkQueue>,
553 frames_encoded: Arc<AtomicU64>,
554 lease: GpuLease,
555 contributions: Arc<Vec<std::sync::Mutex<Vec<WorkerOutput>>>>,
556 active_workers: Arc<Vec<AtomicUsize>>,
557 rung_done: Arc<Vec<Notify>>,
558 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
559 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
560) {
561 let rel_dir = format!("video/{}", rung.label);
562 let output_dir = ctx.output_root.join(&rel_dir);
563 let gpu_index = lease.gpu_index;
564 let gpu_vendor = lease.vendor;
565
566 let cfg = EncoderWorkerConfig {
567 rung_idx,
568 codec: ctx.codec,
569 width: rung.width,
570 height: rung.height,
571 frame_rate: ctx.frame_rate,
572 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
573 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
574 target: rung.quality.target,
575 tier: rung.quality.tier,
576 threads: 0,
577 gpu_index: Some(gpu_index),
578 gpu_vendor: Some(gpu_vendor),
579 output_color_metadata: ctx.output_color_metadata,
580 output_pixel_format: ctx.output_pixel_format,
581 constant_qp: ctx.constant_qp,
582 timescale: ctx.timescale,
583 per_frame_ticks: ctx.per_frame_ticks,
584 keyframe_interval: ctx.keyframe_interval,
585 segment_target_ticks: ctx.segment_target_ticks,
586 output_dir,
587 rung_invariant,
588 };
589
590 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
591 let body = async move {
592 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
593 let cfg_for_worker = cfg.clone();
594 let queue_for_worker = Arc::clone(&queue);
595 let rt = tokio::runtime::Handle::current();
596 let counter = Arc::clone(&frames_encoded);
597 let blocking = tokio::task::spawn_blocking(move || {
598 run_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx)
599 });
600 let drain = async move { while progress_rx.recv().await.is_some() {} };
604 let (_, br) = tokio::join!(drain, blocking);
605
606 let task_status: Result<()> = match br {
607 Ok(Ok(out)) => {
608 contributions[rung_idx].lock().unwrap().push(out);
609 Ok(())
610 }
611 Ok(Err(e)) => Err(e),
612 Err(e) => Err(anyhow!("worker join error: {e}")),
613 };
614 drop(lease);
615 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
616 if prev == 1 {
617 rung_done[rung_idx].notify_one();
618 }
619 (rung_idx, task_status)
620 };
621
622 match worker_tasks {
623 Some(set) => {
624 set.spawn(body);
625 }
626 None => {
627 tokio::spawn(async move {
628 let _ = body.await;
629 });
630 }
631 }
632}
633
634fn spawn_progress_reporter(
637 rungs: Vec<Rung>,
638 frames_encoded: Vec<Arc<AtomicU64>>,
639 finalized: Arc<Vec<AtomicBool>>,
640 total_input_frames: u64,
641 sink: Arc<dyn ProgressSink>,
642 stop: Arc<AtomicBool>,
643) -> tokio::task::JoinHandle<()> {
644 tokio::spawn(async move {
645 loop {
646 if stop.load(Ordering::Acquire) {
647 break;
648 }
649 tokio::time::sleep(PROGRESS_TICK).await;
650 for (idx, rung) in rungs.iter().enumerate() {
651 if finalized[idx].load(Ordering::Acquire) {
652 continue;
653 }
654 let done = frames_encoded[idx].load(Ordering::Relaxed);
655 report(
656 sink.as_ref(),
657 idx,
658 rung,
659 RungStatus::Running,
660 done,
661 Some(total_input_frames),
662 0,
663 0,
664 None,
665 );
666 }
667 }
668 })
669}
670
671#[allow(clippy::too_many_arguments)]
672fn report(
673 sink: &dyn ProgressSink,
674 rung_index: usize,
675 rung: &Rung,
676 status: RungStatus,
677 frames_done: u64,
678 frames_total: Option<u64>,
679 segments: u32,
680 bytes_out: u64,
681 message: Option<String>,
682) {
683 let percent = match status {
684 RungStatus::Completed => 100.0,
685 RungStatus::Pending => 0.0,
686 _ => match frames_total {
687 Some(t) if t > 0 => ((frames_done as f32 / t as f32) * 100.0).min(99.0),
688 _ => 1.0,
689 },
690 };
691 sink.on_rung(RungProgress {
692 rung_index,
693 label: rung.label.clone(),
694 width: rung.width,
695 height: rung.height,
696 status,
697 percent,
698 frames_done,
699 frames_total,
700 segments_written: segments,
701 bytes_out,
702 message,
703 });
704}
705
706pub fn detect_gpu_pool() -> Arc<GpuPool> {
708 Arc::new(GpuPool::new(&codec::gpu::detect_gpus()))
709}
710
711fn policy_vendor(fam: GpuFamily) -> codec::gpu::GpuVendor {
712 match fam {
713 GpuFamily::Nvidia => codec::gpu::GpuVendor::Nvidia,
714 GpuFamily::Amd => codec::gpu::GpuVendor::Amd,
715 GpuFamily::Intel => codec::gpu::GpuVendor::Intel,
716 }
717}
718
719fn select_gpus_for_policy(policy: EncodePolicy) -> Vec<codec::gpu::GpuDevice> {
723 let gpus = codec::gpu::detect_gpus();
724 match policy {
725 EncodePolicy::AllGpus => gpus,
726 EncodePolicy::SingleGpu(None) => gpus.into_iter().take(1).collect(),
727 EncodePolicy::SingleGpu(Some(idx)) => gpus.into_iter().filter(|g| g.index == idx).collect(),
728 EncodePolicy::Family(fam) => {
729 let v = policy_vendor(fam);
730 gpus.into_iter().filter(|g| g.vendor == v).collect()
731 }
732 }
733}
734
735pub fn gpu_pool_for_policy(policy: EncodePolicy, codec: codec::frame::VideoCodec) -> Arc<GpuPool> {
748 let selected = select_gpus_for_policy(policy);
749 let pool_gpus = if selected.len() > 1 {
750 selected.into_iter().filter(|g| codec::encode::encode_capable(g, codec)).collect()
751 } else {
752 selected
753 };
754 Arc::new(GpuPool::new(&pool_gpus))
755}
756
757pub fn policy_gpu_indices(policy: EncodePolicy) -> Vec<u32> {
761 select_gpus_for_policy(policy).into_iter().map(|g| g.index).collect()
762}
763
764pub fn serial_gpu_for_policy(policy: EncodePolicy) -> Option<u32> {
768 match policy {
769 EncodePolicy::AllGpus => None,
770 EncodePolicy::SingleGpu(idx) => idx,
771 EncodePolicy::Family(_) => select_gpus_for_policy(policy).first().map(|g| g.index),
772 }
773}
774
/// Encoded output of one rung in the single-file flow: the packets of all
/// chunks, concatenated in segment order by the rung's finalizer.
#[derive(Debug)]
pub struct RungPackets {
    /// Position of the rung in the `rungs` slice of the run.
    pub rung_index: usize,
    /// Codec every packet was encoded with.
    pub codec: VideoCodec,
    /// Output width (copied from the rung).
    pub width: u32,
    /// Output height (copied from the rung).
    pub height: u32,
    /// Rung label.
    pub label: String,
    /// Encoded packets, chunk by chunk in ascending segment order.
    pub packets: Vec<EncodedPacket>,
}
790
791pub async fn run_multigpu_single_file(
798 params: MultiGpuParams<'_>,
799 sink: Arc<dyn ProgressSink>,
800) -> Result<Vec<Option<RungPackets>>> {
801 let rungs = params.rungs;
802 let n = rungs.len();
803 if n == 0 {
804 return Ok(Vec::new());
805 }
806 let total_segments = total_segments_for_rung(params.total_input_frames, params.keyframe_interval);
807 if total_segments == 0 {
808 bail!(
809 "multigpu single-file: total_segments == 0 (frames={}, keyframe_interval={})",
810 params.total_input_frames,
811 params.keyframe_interval
812 );
813 }
814
815 {
817 let probe = codec::encode::EncoderConfig {
818 width: rungs[0].width,
819 height: rungs[0].height,
820 frame_rate: params.frame_rate,
821 gpu_index: None,
822 codec: params.codec,
823 ..Default::default()
824 };
825 codec::encode::select_encoder(probe, None).map_err(|e| {
826 anyhow!(
827 "no {:?} encoder available on this host ({e}); need NVENC / AMF / QSV, or build \
828 with the `ffmpeg` feature",
829 params.codec
830 )
831 })?;
832 }
833
834 tracing::info!(
835 rungs = n,
836 total_segments,
837 gpu_pool_capacity = params.gpu_pool.capacity(),
838 "multi-GPU single-file phase starting"
839 );
840
841 let queues: Vec<Arc<SegmentChunkQueue>> =
842 (0..n).map(|_| Arc::new(SegmentChunkQueue::new(QUEUE_CAPACITY))).collect();
843 let frames_encoded: Vec<Arc<AtomicU64>> = (0..n).map(|_| Arc::new(AtomicU64::new(0))).collect();
844 let scaler_active: Vec<Arc<AtomicBool>> =
845 (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
846 let rung_invariants: Vec<Arc<std::sync::RwLock<Option<RungCodecInvariant>>>> =
847 (0..n).map(|_| Arc::new(std::sync::RwLock::new(None))).collect();
848 let contributions: Vec<Arc<std::sync::Mutex<Vec<ChunkPackets>>>> =
850 (0..n).map(|_| Arc::new(std::sync::Mutex::new(Vec::new()))).collect();
851 let active_workers: Arc<Vec<AtomicUsize>> =
852 Arc::new((0..n).map(|_| AtomicUsize::new(0)).collect());
853 let rung_done: Arc<Vec<Notify>> = Arc::new((0..n).map(|_| Notify::new()).collect());
854 let finalized: Arc<Vec<AtomicBool>> = Arc::new((0..n).map(|_| AtomicBool::new(false)).collect());
855
856 let progress_stop = Arc::new(AtomicBool::new(false));
857 let progress_handle = spawn_progress_reporter(
858 rungs.to_vec(),
859 frames_encoded.clone(),
860 finalized.clone(),
861 params.total_input_frames,
862 Arc::clone(&sink),
863 Arc::clone(&progress_stop),
864 );
865
866 let total_input_frames = params.total_input_frames;
868 let codec = params.codec; let (finalizer_tx, mut finalizer_rx) =
870 mpsc::channel::<(usize, Result<Option<RungPackets>>)>(n.max(1));
871 let mut finalizer_handles = Vec::with_capacity(n);
872 for idx in 0..n {
873 let collector = Arc::clone(&contributions[idx]);
874 let active_h = Arc::clone(&active_workers);
875 let rung_done_h = Arc::clone(&rung_done);
876 let finalized_h = Arc::clone(&finalized);
877 let tx = finalizer_tx.clone();
878 let rung = rungs[idx].clone();
879 let total_segments = total_segments;
880 let sink = Arc::clone(&sink);
881 finalizer_handles.push(tokio::spawn(async move {
882 loop {
883 let notified = rung_done_h[idx].notified();
884 if active_h[idx].load(Ordering::Acquire) == 0 {
885 break;
886 }
887 notified.await;
888 }
889 let mut chunks: Vec<ChunkPackets> = std::mem::take(&mut *collector.lock().unwrap());
890 if chunks.is_empty() {
891 finalized_h[idx].store(true, Ordering::Release);
892 let _ = tx.send((idx, Ok(None))).await;
893 return;
894 }
895 chunks.sort_by_key(|c| c.segment_idx);
896 chunks.dedup_by_key(|c| c.segment_idx);
897 let got = chunks.len();
899 let contiguous = chunks
900 .iter()
901 .enumerate()
902 .all(|(i, c)| c.segment_idx == i);
903 let result = if got != total_segments as usize || !contiguous {
904 Err(anyhow!(
905 "rung {} chunk coverage incomplete: expected {} contiguous chunks, got {}",
906 rung.label,
907 total_segments,
908 got
909 ))
910 } else {
911 let mut packets: Vec<EncodedPacket> = Vec::new();
912 for c in chunks {
913 packets.extend(c.packets);
914 }
915 let bytes: u64 = packets.iter().map(|p| p.data.len() as u64).sum();
916 report(
917 sink.as_ref(),
918 idx,
919 &rung,
920 RungStatus::Completed,
921 total_input_frames,
922 Some(total_input_frames),
923 got as u32,
924 bytes,
925 None,
926 );
927 Ok(Some(RungPackets {
928 rung_index: idx,
929 codec,
930 width: rung.width,
931 height: rung.height,
932 label: rung.label.clone(),
933 packets,
934 }))
935 };
936 finalized_h[idx].store(true, Ordering::Release);
937 let _ = tx.send((idx, result)).await;
938 }));
939 }
940 drop(finalizer_tx);
941
942 let mut indexed: Vec<(usize, Rung)> = rungs.iter().cloned().enumerate().collect();
943 indexed.sort_by_key(|(_, r)| r.short_side());
944
945 let mut frame_senders = Vec::with_capacity(n);
947 let mut frame_receivers: Vec<Option<tokio::sync::mpsc::Receiver<codec::frame::VideoFrame>>> =
948 Vec::with_capacity(n);
949 for _ in 0..n {
950 let (tx, rx) = tokio::sync::mpsc::channel(FANOUT_CHANNEL_CAPACITY);
951 frame_senders.push(tx);
952 frame_receivers.push(Some(rx));
953 }
954 let use_shared_pump = n <= params.gpu_pool.capacity();
955 let mut pump_tasks: JoinSet<Result<u64>> = JoinSet::new();
956 let make_pump_cfg = |gpu_index: Option<u32>| DecodePumpConfig {
957 codec_name: params.header.codec.clone(),
958 info_for_decoder: params.header.info.clone(),
959 source_color_metadata: params.source_color_metadata,
960 source_pixel_format: params.source_pixel_format,
961 needs_downsample: params.needs_downsample,
962 tonemap_to_sdr: params.tonemap_to_sdr,
963 gpu_index,
964 filters: params.filters.clone(),
965 };
966 if use_shared_pump {
967 let cfg = make_pump_cfg(params.decode_gpu_for(0));
968 let senders = frame_senders;
969 let input = params.input.clone();
970 let rt = tokio::runtime::Handle::current();
971 pump_tasks.spawn(async move {
972 tokio::task::spawn_blocking(move || {
973 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, senders, rt)
974 })
975 .await
976 .map_err(|e| anyhow!("shared pump join error: {e}"))
977 .and_then(|r| r)
978 });
979 } else {
980 for (idx, sender) in frame_senders.into_iter().enumerate() {
981 let cfg = make_pump_cfg(params.decode_gpu_for(idx));
982 let input = params.input.clone();
983 let rt = tokio::runtime::Handle::current();
984 pump_tasks.spawn(async move {
985 tokio::task::spawn_blocking(move || {
986 crate::decode_pump::run_shared_decode_pump_blocking(cfg, input, vec![sender], rt)
987 })
988 .await
989 .map_err(|e| anyhow!("per-rung pump {idx} join error: {e}"))
990 .and_then(|r| r)
991 });
992 }
993 }
994
995 let mut scaler_tasks: JoinSet<(usize, Result<usize>)> = JoinSet::new();
997 for (idx, rung) in rungs.iter().cloned().enumerate() {
998 let rx = frame_receivers[idx].take().expect("scaler rx slot");
999 let cfg = crate::rung_scaler::RungScalerConfig {
1000 rung_idx: idx,
1001 target_width: rung.width,
1002 target_height: rung.height,
1003 frames_per_chunk: params.keyframe_interval,
1004 };
1005 let queue = Arc::clone(&queues[idx]);
1006 let rt = tokio::runtime::Handle::current();
1007 let scaler_flag = Arc::clone(&scaler_active[idx]);
1008 let active_h = Arc::clone(&active_workers);
1009 let rung_done_h = Arc::clone(&rung_done);
1010 scaler_flag.store(true, Ordering::Release);
1011 active_h[idx].fetch_add(1, Ordering::AcqRel);
1012 scaler_tasks.spawn(async move {
1013 let result = tokio::task::spawn_blocking(move || {
1014 crate::rung_scaler::run_rung_scaler_blocking(cfg, rx, queue, rt)
1015 })
1016 .await
1017 .map_err(|e| anyhow!("scaler join error: {e}"))
1018 .and_then(|r| r);
1019 scaler_flag.store(false, Ordering::Release);
1020 let prev = active_h[idx].fetch_sub(1, Ordering::AcqRel);
1021 if prev == 1 {
1022 rung_done_h[idx].notify_one();
1023 }
1024 (idx, result)
1025 });
1026 }
1027
1028 let mut worker_tasks: JoinSet<(usize, Result<()>)> = JoinSet::new();
1030 let ctx = WorkerCtx {
1031 codec: params.codec,
1032 frame_rate: params.frame_rate,
1033 output_color_metadata: params.output_color_metadata,
1034 output_pixel_format: params.output_pixel_format,
1035 timescale: params.timescale,
1036 per_frame_ticks: params.per_frame_ticks,
1037 keyframe_interval: params.keyframe_interval,
1038 segment_target_ticks: params.segment_target_ticks,
1039 output_root: params.output_root.clone(),
1040 constant_qp: params.constant_qp,
1041 };
1042 for (idx, rung) in indexed.iter().cloned() {
1043 let lease = match Arc::clone(¶ms.gpu_pool).claim().await {
1044 Some(l) => l,
1045 None => {
1046 progress_stop.store(true, Ordering::Release);
1047 let _ = progress_handle.await;
1048 bail!("multigpu single-file: GPU pool returned no lease; at least one GPU required");
1049 }
1050 };
1051 spawn_chunk_worker(
1052 &ctx,
1053 idx,
1054 &rung,
1055 Arc::clone(&queues[idx]),
1056 Arc::clone(&frames_encoded[idx]),
1057 lease,
1058 Arc::clone(&contributions[idx]),
1059 Arc::clone(&active_workers),
1060 Arc::clone(&rung_done),
1061 Arc::clone(&rung_invariants[idx]),
1062 Some(&mut worker_tasks),
1063 );
1064 }
1065
1066 let helper_cancel = Arc::new(AtomicBool::new(false));
1068 let helper_handle = {
1069 let cancel = Arc::clone(&helper_cancel);
1070 let pool = Arc::clone(¶ms.gpu_pool);
1071 let queues = queues.clone();
1072 let scaler_active = scaler_active.clone();
1073 let frames_encoded = frames_encoded.clone();
1074 let contributions = contributions.clone();
1075 let active_workers = Arc::clone(&active_workers);
1076 let rung_done = Arc::clone(&rung_done);
1077 let rung_invariants = rung_invariants.clone();
1078 let rungs_owned: Vec<Rung> = rungs.to_vec();
1079 let ctx = ctx.clone();
1080 tokio::spawn(async move {
1081 loop {
1082 if cancel.load(Ordering::Acquire) {
1083 break;
1084 }
1085 tokio::time::sleep(HELPER_POLL_INTERVAL).await;
1086 if pool.pending_claimers() > 0 {
1087 continue;
1088 }
1089 let mut target = None;
1090 for (idx, q) in queues.iter().enumerate() {
1091 let scaler_alive = scaler_active[idx].load(Ordering::Acquire);
1092 let has_pending = q.pushed_segments() > q.popped_segments();
1093 if scaler_alive || has_pending {
1094 target = Some(idx);
1095 break;
1096 }
1097 }
1098 let Some(rung_idx) = target else { break };
1099 let lease = match pool.try_claim() {
1100 Some(l) => l,
1101 None => continue,
1102 };
1103 tracing::info!(rung_idx, gpu_index = lease.gpu_index, "single-file helper dispatch");
1104 spawn_chunk_worker(
1105 &ctx,
1106 rung_idx,
1107 &rungs_owned[rung_idx],
1108 Arc::clone(&queues[rung_idx]),
1109 Arc::clone(&frames_encoded[rung_idx]),
1110 lease,
1111 Arc::clone(&contributions[rung_idx]),
1112 Arc::clone(&active_workers),
1113 Arc::clone(&rung_done),
1114 Arc::clone(&rung_invariants[rung_idx]),
1115 None,
1116 );
1117 }
1118 })
1119 };
1120
1121 let mut completed: Vec<Option<RungPackets>> = (0..n).map(|_| None).collect();
1123 let mut pumps_remaining = pump_tasks.len();
1124 let mut scalers_remaining = n;
1125 let mut workers_remaining = n;
1126 let mut finalizers_remaining = n;
1127 macro_rules! teardown_err {
1128 ($e:expr) => {{
1129 helper_cancel.store(true, Ordering::Release);
1130 let _ = helper_handle.await;
1131 progress_stop.store(true, Ordering::Release);
1132 let _ = progress_handle.await;
1133 return Err($e);
1134 }};
1135 }
1136 while pumps_remaining > 0 || scalers_remaining > 0 || workers_remaining > 0 || finalizers_remaining > 0 {
1137 tokio::select! {
1138 biased;
1139 p = pump_tasks.join_next(), if pumps_remaining > 0 => match p {
1140 Some(Ok(Ok(_))) => pumps_remaining -= 1,
1141 Some(Ok(Err(e))) => teardown_err!(anyhow!("decode pump failed: {e}")),
1142 Some(Err(je)) => teardown_err!(anyhow!("pump join error: {je}")),
1143 None => pumps_remaining = 0,
1144 },
1145 s = scaler_tasks.join_next(), if scalers_remaining > 0 => match s {
1146 Some(Ok((_, Ok(_)))) => scalers_remaining -= 1,
1147 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("scaler {idx} failed: {e}")),
1148 Some(Err(je)) => teardown_err!(anyhow!("scaler join error: {je}")),
1149 None => scalers_remaining = 0,
1150 },
1151 w = worker_tasks.join_next(), if workers_remaining > 0 => match w {
1152 Some(Ok((_, Ok(())))) => workers_remaining -= 1,
1153 Some(Ok((idx, Err(e)))) => teardown_err!(anyhow!("chunk worker for rung {idx} failed: {e}")),
1154 Some(Err(je)) => teardown_err!(anyhow!("worker join error: {je}")),
1155 None => workers_remaining = 0,
1156 },
1157 f = finalizer_rx.recv(), if finalizers_remaining > 0 => match f {
1158 Some((idx, Ok(opt))) => { completed[idx] = opt; finalizers_remaining -= 1; }
1159 Some((idx, Err(e))) => teardown_err!(anyhow!("finalizer for rung {idx} failed: {e}")),
1160 None => finalizers_remaining = 0,
1161 },
1162 }
1163 }
1164 helper_cancel.store(true, Ordering::Release);
1165 let _ = helper_handle.await;
1166 progress_stop.store(true, Ordering::Release);
1167 let _ = progress_handle.await;
1168 for h in finalizer_handles {
1169 let _ = h.await;
1170 }
1171 Ok(completed)
1172}
1173
1174#[allow(clippy::too_many_arguments)]
1175fn spawn_chunk_worker(
1176 ctx: &WorkerCtx,
1177 rung_idx: usize,
1178 rung: &Rung,
1179 queue: Arc<SegmentChunkQueue>,
1180 frames_encoded: Arc<AtomicU64>,
1181 lease: GpuLease,
1182 collector: Arc<std::sync::Mutex<Vec<ChunkPackets>>>,
1183 active_workers: Arc<Vec<AtomicUsize>>,
1184 rung_done: Arc<Vec<Notify>>,
1185 rung_invariant: Arc<std::sync::RwLock<Option<RungCodecInvariant>>>,
1186 worker_tasks: Option<&mut JoinSet<(usize, Result<()>)>>,
1187) {
1188 let gpu_index = lease.gpu_index;
1189 let gpu_vendor = lease.vendor;
1190 let cfg = EncoderWorkerConfig {
1191 rung_idx,
1192 codec: ctx.codec,
1193 width: rung.width,
1194 height: rung.height,
1195 frame_rate: ctx.frame_rate,
1196 quality: rung.quality.crf.unwrap_or(AUTO_FROM_TARGET),
1197 speed_preset: rung.quality.speed_preset.unwrap_or(AUTO_FROM_TARGET),
1198 target: rung.quality.target,
1199 tier: rung.quality.tier,
1200 threads: 0,
1201 gpu_index: Some(gpu_index),
1202 gpu_vendor: Some(gpu_vendor),
1203 output_color_metadata: ctx.output_color_metadata,
1204 output_pixel_format: ctx.output_pixel_format,
1205 constant_qp: ctx.constant_qp,
1206 timescale: ctx.timescale,
1207 per_frame_ticks: ctx.per_frame_ticks,
1208 keyframe_interval: ctx.keyframe_interval,
1209 segment_target_ticks: ctx.segment_target_ticks,
1210 output_dir: ctx.output_root.clone(),
1211 rung_invariant,
1212 };
1213 active_workers[rung_idx].fetch_add(1, Ordering::AcqRel);
1214 let body = async move {
1215 let (progress_tx, mut progress_rx) = mpsc::channel::<u64>(32);
1216 let cfg_for_worker = cfg.clone();
1217 let queue_for_worker = Arc::clone(&queue);
1218 let rt = tokio::runtime::Handle::current();
1219 let counter = Arc::clone(&frames_encoded);
1220 let out = Arc::clone(&collector);
1221 let blocking = tokio::task::spawn_blocking(move || {
1222 run_chunk_encoder_worker_blocking(cfg_for_worker, queue_for_worker, rt, counter, progress_tx, out)
1223 });
1224 let drain = async move { while progress_rx.recv().await.is_some() {} };
1225 let (_, br) = tokio::join!(drain, blocking);
1226 let task_status: Result<()> = match br {
1227 Ok(Ok(())) => Ok(()),
1228 Ok(Err(e)) => Err(e),
1229 Err(e) => Err(anyhow!("chunk worker join error: {e}")),
1230 };
1231 drop(lease);
1232 let prev = active_workers[rung_idx].fetch_sub(1, Ordering::AcqRel);
1233 if prev == 1 {
1234 rung_done[rung_idx].notify_one();
1235 }
1236 (rung_idx, task_status)
1237 };
1238 match worker_tasks {
1239 Some(set) => {
1240 set.spawn(body);
1241 }
1242 None => {
1243 tokio::spawn(async move {
1244 let _ = body.await;
1245 });
1246 }
1247 }
1248}