streaming_crypto/core_api/parallelism/
/// Destination for a dispatched work segment: a CPU or GPU worker,
/// identified by its index into the corresponding load table.
//
// Copy/Eq/Hash added: both variants hold a single `usize`, so the enum is
// trivially copyable, and callers (e.g. `Scheduler::complete`) pass it by
// value already — deriving these is backward-compatible and lets targets
// be used as map keys or compared exhaustively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkerTarget {
    /// Index into the CPU worker load table.
    Cpu(usize),
    /// Index into the GPU worker load table.
    Gpu(usize),
}
8
9pub fn dispatch_segment(
11 segment_size: usize,
12 cpu_workers: usize,
13 gpu_workers: usize,
14 gpu_threshold: usize, cpu_load: &[usize], gpu_load: &[usize], ) -> Option<WorkerTarget> {
18 if gpu_workers > 0 && segment_size >= gpu_threshold {
19 let (idx, _) = gpu_load
21 .iter()
22 .enumerate()
23 .min_by_key(|(_, load)| *load)
24 .unwrap();
25 Some(WorkerTarget::Gpu(idx))
26 }
27 else if cpu_workers > 0 {
28 let (idx, _) = cpu_load
30 .iter()
31 .enumerate()
32 .min_by_key(|(_, load)| *load)
33 .unwrap();
34 Some(WorkerTarget::Cpu(idx))
35 }
36 else {
37 None
38 }
39}
40
/// Load-balancing scheduler tracking in-flight segment counts per worker.
///
/// One counter per worker slot: `dispatch` bumps the least-loaded eligible
/// worker's counter and `complete` releases it again.
//
// `Debug` derived: public types should be debuggable; `Clone` is cheap and
// lets callers snapshot scheduler state. Both are backward-compatible.
#[derive(Debug, Clone)]
pub struct Scheduler {
    /// In-flight segment count per CPU worker (index = worker id).
    cpu_load: Vec<usize>,
    /// In-flight segment count per GPU worker (index = worker id).
    gpu_load: Vec<usize>,
    /// Minimum segment size (bytes) routed to a GPU worker.
    gpu_threshold: usize,
}
46
47impl Scheduler {
48 pub fn new(cpu_workers: usize, gpu_workers: usize, gpu_threshold: usize) -> Self {
49 Scheduler {
50 cpu_load: vec![0; cpu_workers],
51 gpu_load: vec![0; gpu_workers],
52 gpu_threshold,
53 }
54 }
55
56 pub fn dispatch(&mut self, segment_size: usize) -> WorkerTarget {
58 if !self.gpu_load.is_empty() && segment_size >= self.gpu_threshold {
59 let (idx, _) = self.gpu_load
61 .iter()
62 .enumerate()
63 .min_by_key(|(_, load)| *load)
64 .unwrap();
65 self.gpu_load[idx] += 1; WorkerTarget::Gpu(idx)
67 } else {
68 let (idx, _) = self.cpu_load
70 .iter()
71 .enumerate()
72 .min_by_key(|(_, load)| *load)
73 .unwrap();
74 self.cpu_load[idx] += 1; WorkerTarget::Cpu(idx)
76 }
77 }
78
79 pub fn complete(&mut self, target: WorkerTarget) {
81 match target {
82 WorkerTarget::Cpu(idx) => {
83 if self.cpu_load[idx] > 0 {
84 self.cpu_load[idx] -= 1;
85 }
86 }
87 WorkerTarget::Gpu(idx) => {
88 if self.gpu_load[idx] > 0 {
89 self.gpu_load[idx] -= 1;
90 }
91 }
92 }
93 }
94}