//! `src/parallelism/profiles.rs`
//!
//! GPU detection and parallelism-profile construction for the streaming
//! crypto pipeline (`streaming_crypto/core_api/parallelism/`).
2
use sysinfo::System;
use tracing::debug;
use crate::{headers::{HeaderV1, Strategy}, types::StreamError};
6
/// Default segment-size threshold stored in each profile's `gpu_threshold`
/// field — presumably the minimum payload size worth dispatching to a GPU
/// backend (confirm against the dispatch code that reads it).
pub const GPU_THRESHOLD: usize = 4 * 1024 * 1024; // 4 MB
8
/// Which GPU API family was used to detect devices.
///
/// `PartialEq`/`Eq` are derived so callers can compare the detected backend
/// directly (e.g. in dispatch decisions or tests) instead of matching.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GpuBackend {
    /// No GPU detected (or no GPU feature compiled in).
    None,
    /// NVIDIA CUDA via the `cudarc` crate (feature `cuda`).
    Cuda,
    /// Vulkan/Metal/DX adapters via the `wgpu` crate (feature `metal`).
    Wgpu,
    /// OpenCL devices via the `opencl3` crate (feature `opencl`).
    OpenCL,
}
16
/// Result of GPU detection: how many devices were found, through which
/// backend, and their reported names.
#[derive(Debug, Clone)]
pub struct GpuInfo {
    /// Number of devices detected for `backend` (0 when `backend` is `None`).
    pub count: usize,
    /// The detection path that produced this info.
    pub backend: GpuBackend,
    /// One human-readable name per detected device.
    pub device_names: Vec<String>,
}
23
/// Enumerate CUDA devices via `cudarc` (compiled only with the `cuda` feature).
///
/// Returns `(device_count, device_names)`. A device whose name query fails is
/// kept in the list as "Unknown CUDA device" rather than dropped, so the name
/// list stays the same length as the count.
///
/// NOTE(review): `CudaDevice::devices()` errors are swallowed by
/// `unwrap_or_default()`, so a driver/init failure is indistinguishable from
/// "no devices". Also confirm this method exists in the pinned `cudarc`
/// version — some releases only expose `CudaDevice::count()` + `new(i)`.
#[cfg(feature = "cuda")]
fn detect_cuda_devices() -> (usize, Vec<String>) {
    // cudarc automatically queries available devices
    let devices = cudarc::driver::CudaDevice::devices().unwrap_or_default();
    let count = devices.len();
    let names: Vec<String> = devices
        .iter()
        .map(|d| d.name().unwrap_or_else(|_| "Unknown CUDA device".to_string()))
        .collect();

    if count == 0 {
        debug!("[GPU DETECT] No CUDA devices available");
    }
    (count, names)
}
39
/// Enumerate GPU adapters via `wgpu` (Vulkan/Metal/DX/GL, feature `metal`).
///
/// Returns `(adapter_count, adapter_names)` across all wgpu backends.
///
/// NOTE(review): the feature gate is named `metal` but `Backends::all()` is
/// requested — the feature name undersells what this probes; consider
/// renaming (e.g. `wgpu`).
#[cfg(feature = "metal")]
fn detect_wgpu_devices() -> (usize, Vec<String>) {
    let instance = wgpu::Instance::default();
    // enumerate_adapters returns a Future in wgpu 0.28
    // NOTE(review): confirm against the pinned wgpu version — in several
    // recent releases `enumerate_adapters` is synchronous and returns a
    // Vec directly, which would make this `block_on` fail to compile.
    let adapters: Vec<wgpu::Adapter> =
        pollster::block_on(instance.enumerate_adapters(wgpu::Backends::all()));

    let count = adapters.len();
    let names: Vec<String> = adapters
        .iter()
        .map(|a| a.get_info().name.clone())
        .collect();

    (count, names)
}
55
/// Enumerate OpenCL devices (compiled only with the `opencl` feature).
///
/// Returns `(device_count, device_names)`; a device whose name query fails is
/// reported as "Unknown OpenCL device" instead of being dropped.
#[cfg(feature = "opencl")]
fn detect_opencl_devices() -> (usize, Vec<String>) {
    use opencl3::platform::get_platforms;
    use opencl3::device::{get_all_devices, Device, CL_DEVICE_TYPE_ALL};

    // BUG FIX: `get_all_devices(CL_DEVICE_TYPE_ALL)` does not take a platform
    // argument — it enumerates devices across platforms on its own. The old
    // code called it once per platform inside a loop, so every device was
    // counted (and named) `platforms.len()` times on multi-platform systems.
    // Call it exactly once; the platform query is kept only to log the
    // "no platforms" case distinctly.
    match get_platforms() {
        Ok(platforms) if !platforms.is_empty() => {
            let device_ids = get_all_devices(CL_DEVICE_TYPE_ALL).unwrap_or_default();
            let names: Vec<String> = device_ids
                .iter()
                .map(|&id| {
                    Device::new(id)
                        .name()
                        .unwrap_or_else(|_| "Unknown OpenCL device".to_string())
                })
                .collect();
            (names.len(), names)
        }
        _ => {
            debug!("[GPU DETECT] No OpenCL platforms available");
            (0, Vec::new())
        }
    }
}
81
82pub fn detect_gpu_info() -> GpuInfo {
83    // CUDA
84    #[cfg(feature = "cuda")]
85    {
86        let (count, names) = detect_cuda_devices();
87        if count > 0 {
88            eprintln!( "[GPU DETECT] CUDA devices found: {} ({})", count, names.join(", ") );
89            return GpuInfo {
90                count,
91                backend: GpuBackend::Cuda,
92                device_names: names,
93            };
94        }
95    }
96
97    // Vulkan/Metal/DX via wgpu
98    #[cfg(feature = "metal")]
99    {
100        // let count = pollster::block_on(detect_wgpu_count());
101        let (count, names) = detect_wgpu_devices();
102        if count > 0 {
103            eprintln!( "[GPU DETECT] wgpu adapters found: {} ({})", count, names.join(", ") );
104            return GpuInfo {
105                count,
106                backend: GpuBackend::Wgpu,
107                device_names: names,
108            };
109        }
110    }
111
112    // OpenCL
113    #[cfg(feature = "opencl")]
114    {
115        let (count, names) = detect_opencl_devices();
116        if count > 0 {
117            eprintln!( "[GPU DETECT] OpenCL adapters found: {} ({})", count, names.join(", ") );
118            return GpuInfo {
119                count,
120                backend: GpuBackend::OpenCL,
121                device_names: names,
122            };
123        }
124    }
125
126    // Fallback
127    eprintln!("[GPU DETECT] No GPU devices found");
128    GpuInfo {
129        count: 0,
130        backend: GpuBackend::None,
131        device_names: Vec::new(),
132    }
133}
134
/// User-tunable knobs for building a `HybridParallelismProfile`.
///
/// Values are stored as requested; sanity clamping against the actual
/// machine happens in `HybridParallelismProfile::new`.
#[derive(Debug, Clone)]
pub struct ParallelismConfig {
    /// Requested CPU worker count (clamped to available cores at build time).
    cpu_workers: usize, 
    /// Requested GPU worker count (clamped to detected devices at build time).
    gpu_workers: usize, 
    /// Fraction of available memory to budget for in-flight segments.
    mem_fraction: f64,
    /// Upper bound on concurrently in-flight segments.
    hard_cap: usize,
}
143
/// Conservative defaults: single CPU worker, no GPU workers, a small
/// memory budget, and a low in-flight cap.
impl Default for ParallelismConfig {
    fn default() -> Self {
        Self {
            cpu_workers: 1,
            gpu_workers: 0, 
            mem_fraction: 0.2, // 20% of free memory 
            hard_cap: 4, // Max in-flight segments limit on dynamic
        }
    }
}
154impl ParallelismConfig {
155    pub fn new(cpu_workers: usize, gpu_workers: usize, mem_fraction: f64, hard_cap: usize) -> Self {
156        Self {
157            cpu_workers,
158            gpu_workers,
159            mem_fraction,
160            hard_cap,
161        }
162    }
163}
164
/// A resolved parallelism profile: worker counts and in-flight limits
/// already clamped against the actual machine (cores, memory, GPUs).
///
/// Construct via `with_strategy`, `from_stream_header`, `single_threaded`,
/// `semi_dynamic`, or `dynamic`; read via the accessor methods.
#[derive(Debug, Clone)]
pub struct HybridParallelismProfile {
    /// Number of CPU workers (>= 1).
    cpu_workers: usize,
    /// Number of GPU workers (0 when no device was detected).
    gpu_workers: usize,
    /// How many segments may be processed concurrently.
    inflight_segments: usize,
    /// Segment-size threshold for GPU dispatch (see `GPU_THRESHOLD`).
    gpu_threshold: usize,
    /// GPU detection result captured at construction time.
    gpu: Option<GpuInfo>,
}
174
175impl HybridParallelismProfile {
176    /// Controlled constructor
177    fn new(max_segment_size: u32, opts: ParallelismConfig) -> Self {
178        let gpu = detect_gpu_info();
179
180        // enforce sane limits
181        let cpu_workers = opts.cpu_workers.clamp(1, num_cpus::get().saturating_sub(1));
182        let gpu_workers = opts.gpu_workers.clamp(0, gpu.count);
183
184        // --- memory based inflight_segments calculation ---
185        let mut sys = System::new();
186        sys.refresh_memory();
187
188        // available memory in bytes
189        let avail_bytes = sys.available_memory() * 1024; // KiB → bytes
190
191        // fraction capped to 0.75 (use at most 75% of available memory)
192        let fraction = opts.mem_fraction.min(0.75);
193        let usable_bytes = (avail_bytes as f64 * fraction) as u64;
194
195        // cast max_segment_size to u64 before division
196        let max_segments = (usable_bytes / max_segment_size as u64).max(1) as usize;
197
198        // clamp against hard_cap
199        let inflight_segments_cap = opts.hard_cap.clamp(1, max_segments);
200        // Derive inflight segments from worker count
201        let inflight_cpus = inflight_segments_cap.min(cpu_workers * 4);
202        let inflight_gpus = inflight_segments_cap.min(gpu_workers * 4);
203        let inflight_segments = inflight_cpus.max(inflight_gpus) as usize;
204
205        Self {
206            cpu_workers,
207            gpu_workers,
208            inflight_segments,
209            gpu_threshold: GPU_THRESHOLD,
210            gpu: Some(gpu),
211        }
212    }
213
214    pub fn with_strategy(
215        strategy: Strategy,
216        max_segment_size: u32,
217        config: Option<ParallelismConfig>,
218    ) -> Result<Self, StreamError> {
219        let opts = config.unwrap_or_default();
220        match strategy {
221            Strategy::Auto => Ok(Self::dynamic(max_segment_size)),
222            Strategy::Sequential => Ok(Self::single_threaded()),
223            Strategy::Parallel => Ok(Self::new(max_segment_size, opts)),
224        }
225    }
226
227    pub fn from_stream_header(
228        header: HeaderV1,
229        config: Option<ParallelismConfig>,
230    ) -> Result<Self, StreamError> {
231        let max_segment_size = header.chunk_size;
232        let strategy = Strategy::from(header.strategy).map_err(StreamError::Header)?;
233
234        Self::with_strategy(strategy, max_segment_size, config)
235    }
236
237    /// Read-only accessors
238    pub fn cpu_workers(&self) -> usize {
239        self.cpu_workers
240    }
241
242    pub fn gpu_workers(&self) -> usize {
243        self.gpu_workers
244    }
245
246    pub fn inflight_segments(&self) -> usize {
247        self.inflight_segments
248    }
249
250    pub fn gpu_threshold(&self) -> usize {
251        self.gpu_threshold
252    }
253
254    pub fn gpu(&self) -> Option<GpuInfo> {
255        self.gpu.clone()
256    }
257    
258    pub fn single_threaded() -> Self {
259        let gpu = detect_gpu_info();
260        Self {
261            cpu_workers: 1,
262            gpu_workers: 1.clamp(0, gpu.count),
263            inflight_segments: 1,
264            gpu_threshold: GPU_THRESHOLD,
265            gpu: Some(gpu),
266        }
267    }
268    // * On a machine with 16 cores and 16 GB free RAM:
269    // * `worker_count = 15`
270    // * `budget = 8 GB` (50% of 16 GB)
271    // * `max_segment_size = 32 MB`
272    // * `max_segments = 8192 MB / 32 MB = 256`
273    // * With `hard_cap = 64`, we get `inflight_segments = 64`.
274    pub fn semi_dynamic(max_segment_size: u32, mem_fraction: f64, hard_cap: usize) -> Self {
275        // * Hyperthreads do not double AES throughput.
276        // * Physical cores matter.
277        let cores = num_cpus::get().saturating_sub(2);
278        let cpu_workers = cores.max(1);
279
280        let mut sys = sysinfo::System::new_all();
281        sys.refresh_memory();
282        let avail_bytes = sys.available_memory() * 1024;
283        let budget = (avail_bytes as f64 * mem_fraction) as u32;
284        let max_segments = budget / max_segment_size;
285
286        let gpu = detect_gpu_info();
287        let gpu_workers = gpu.count;
288
289        debug!(
290            "[PROFILE] cpu_workers={}, gpu_workers={}, inflight_segments={}",
291            cpu_workers,
292            gpu_workers,
293            max_segments.min(hard_cap as u32)
294        );
295
296        Self {
297            cpu_workers,
298            gpu_workers,
299            inflight_segments: max_segments.min(hard_cap as u32) as usize,
300            gpu_threshold: GPU_THRESHOLD,
301            gpu: Some(gpu),
302        }
303    }
304
305    // * On a machine with 16 cores and 16 GB free RAM:
306    // * `worker_count = 15`
307    // * `budget = 8 GB` (50% of 16 GB)
308    // * `max_segment_size = 32 MB`
309    // * `max_segments = 8192 MB / 32 MB = 256`
310    // * With `hard_cap = 64`, we get `inflight_segments = 64`.
311    pub fn dynamic(max_segment_size: u32) -> Self {
312        let cores = num_cpus::get().saturating_sub(2);
313        let cpu_workers = cores.max(1);
314
315        let mut sys = sysinfo::System::new_all();
316        sys.refresh_memory();
317        let avail_bytes = sys.available_memory() * 1024;
318
319        // Leave 25% headroom for OS and other processes
320        let budget = (avail_bytes as f64 * 0.75) as u32;
321        let max_segments = budget / max_segment_size;
322
323        let gpu = detect_gpu_info();
324        let gpu_workers = gpu.count;
325
326        // Derive inflight segments from worker count
327        let inflight_cpus = max_segments.min(cpu_workers as u32 * 4);
328        let inflight_gpus = max_segments.min(gpu_workers as u32 * 4);
329        let inflight_segments = inflight_cpus.max(inflight_gpus) as usize;
330
331        debug!(
332            "[PROFILE] cpu_workers={}, gpu_workers={}, inflight_segments={}",
333            cpu_workers, gpu_workers, inflight_segments
334        );
335
336        Self {
337            cpu_workers,
338            gpu_workers,
339            inflight_segments,
340            gpu_threshold: GPU_THRESHOLD,
341            gpu: Some(gpu),
342        }
343    }
344
345}