Skip to main content

streaming_crypto/core_api/parallelism/
scheduler.rs

// ## 3. `src/parallelism/scheduler.rs`

/// Destination for a dispatched segment: a CPU worker queue or a GPU device.
///
/// Variants carry the zero-based index into the scheduler's corresponding
/// load vector. Derives `Copy`/`Eq`/`Hash` so targets can be passed by
/// value and used as map keys without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkerTarget {
    Cpu(usize), // index of CPU worker
    Gpu(usize), // index of GPU device
}
9/// Decide where to dispatch a segment based on size and load.
10pub fn dispatch_segment(
11    segment_size: usize,
12    cpu_workers: usize,
13    gpu_workers: usize,
14    gpu_threshold: usize, // e.g. 8 MB
15    cpu_load: &[usize],   // queue depth per CPU worker
16    gpu_load: &[usize],   // queue depth per GPU device
17) -> Option<WorkerTarget> {
18    if gpu_workers > 0 && segment_size >= gpu_threshold {
19        // Choose GPU with lowest load
20        let (idx, _) = gpu_load
21            .iter()
22            .enumerate()
23            .min_by_key(|(_, load)| *load)
24            .unwrap();
25       Some(WorkerTarget::Gpu(idx))
26    } 
27    else if cpu_workers > 0 {
28        // Choose CPU with lowest load
29        let (idx, _) = cpu_load
30            .iter()
31            .enumerate()
32            .min_by_key(|(_, load)| *load)
33            .unwrap();
34        Some(WorkerTarget::Cpu(idx))
35    }
36    else {
37        None
38    }
39}
/// Tracks per-worker queue depth and routes segments between CPU and GPU.
///
/// `Debug` and `Clone` are derived so schedulers can be logged and
/// snapshotted (all fields are plain `Vec<usize>` / `usize`).
#[derive(Debug, Clone)]
pub struct Scheduler {
    cpu_load: Vec<usize>, // queue depth per CPU worker
    gpu_load: Vec<usize>, // queue depth per GPU device
    gpu_threshold: usize, // segment size threshold for GPU dispatch
}
47impl Scheduler {
48    pub fn new(cpu_workers: usize, gpu_workers: usize, gpu_threshold: usize) -> Self {
49        Scheduler {
50            cpu_load: vec![0; cpu_workers],
51            gpu_load: vec![0; gpu_workers],
52            gpu_threshold,
53        }
54    }
55
56    /// Dispatch a segment to CPU or GPU based on size and current load
57    pub fn dispatch(&mut self, segment_size: usize) -> WorkerTarget {
58        if !self.gpu_load.is_empty() && segment_size >= self.gpu_threshold {
59            // Choose GPU with lowest load
60            let (idx, _) = self.gpu_load
61                .iter()
62                .enumerate()
63                .min_by_key(|(_, load)| *load)
64                .unwrap();
65            self.gpu_load[idx] += 1; // increment load
66            WorkerTarget::Gpu(idx)
67        } else {
68            // Choose CPU with lowest load
69            let (idx, _) = self.cpu_load
70                .iter()
71                .enumerate()
72                .min_by_key(|(_, load)| *load)
73                .unwrap();
74            self.cpu_load[idx] += 1; // increment load
75            WorkerTarget::Cpu(idx)
76        }
77    }
78
79    /// Mark a worker as finished with a segment
80    pub fn complete(&mut self, target: WorkerTarget) {
81        match target {
82            WorkerTarget::Cpu(idx) => {
83                if self.cpu_load[idx] > 0 {
84                    self.cpu_load[idx] -= 1;
85                }
86            }
87            WorkerTarget::Gpu(idx) => {
88                if self.gpu_load[idx] > 0 {
89                    self.gpu_load[idx] -= 1;
90                }
91            }
92        }
93    }
94}