streaming_crypto/core_api/parallelism/
profiles.rs1use sysinfo::{System};
4use tracing::debug;
5use crate::{headers::{HeaderV1, Strategy}, types::StreamError};
6
/// Default value for `HybridParallelismProfile::gpu_threshold` (4 MiB).
/// Segments below this size are presumably not worth GPU offload —
/// TODO confirm the intent with the dispatch code that reads it.
pub const GPU_THRESHOLD: usize = 4 * 1024 * 1024;

/// Which GPU compute backend a probe found (see `detect_gpu_info`).
// `PartialEq`/`Eq` added so callers can compare detected backends directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GpuBackend {
    /// No usable GPU device was found.
    None,
    /// NVIDIA CUDA devices (enumerated via `cudarc`).
    Cuda,
    /// Adapters enumerated through `wgpu` (all backends).
    Wgpu,
    /// Devices enumerated through OpenCL platforms.
    OpenCL,
}
16
/// Result of a GPU probe: device count and names for one chosen backend.
#[derive(Debug, Clone)]
pub struct GpuInfo {
    /// Number of devices the selected backend reported (0 when `backend` is `None`).
    pub count: usize,
    /// The backend that produced this detection result.
    pub backend: GpuBackend,
    /// Human-readable device names, one per detected device.
    pub device_names: Vec<String>,
}
23
/// Enumerate CUDA devices via `cudarc`.
///
/// Returns `(device_count, device_names)`. Driver enumeration errors are
/// swallowed (`unwrap_or_default`) and treated as "no devices"; a device
/// whose name query fails is reported as "Unknown CUDA device".
///
/// NOTE(review): `cudarc`'s API surface varies by version — confirm
/// `CudaDevice::devices()` / `.name()` exist in the pinned version.
#[cfg(feature = "cuda")]
fn detect_cuda_devices() -> (usize, Vec<String>) {
    let devices = cudarc::driver::CudaDevice::devices().unwrap_or_default();
    let count = devices.len();
    let names: Vec<String> = devices
        .iter()
        .map(|d| d.name().unwrap_or_else(|_| "Unknown CUDA device".to_string()))
        .collect();

    if count == 0 {
        debug!("[GPU DETECT] No CUDA devices available");
    }
    (count, names)
}
39
/// Enumerate GPU adapters through `wgpu` and return `(count, adapter_names)`.
///
/// NOTE(review): this is gated on the "metal" feature, yet it enumerates
/// `wgpu::Backends::all()` (not just Metal) — confirm the feature name
/// matches the intent.
#[cfg(feature = "metal")]
fn detect_wgpu_devices() -> (usize, Vec<String>) {
    let instance = wgpu::Instance::default();
    // Blocks on the async enumeration; acceptable during one-shot startup probing.
    let adapters: Vec<wgpu::Adapter> =
        pollster::block_on(instance.enumerate_adapters(wgpu::Backends::all()));

    let count = adapters.len();
    let names: Vec<String> = adapters
        .iter()
        .map(|a| a.get_info().name.clone())
        .collect();

    (count, names)
}
55
/// Enumerate OpenCL devices across all platforms.
///
/// Returns `(device_count, device_names)`. A missing platform list is
/// logged and treated as "no devices"; a device whose name query fails is
/// reported as "Unknown OpenCL device".
#[cfg(feature = "opencl")]
fn detect_opencl_devices() -> (usize, Vec<String>) {
    use opencl3::platform::get_platforms;
    use opencl3::device::{Device, CL_DEVICE_TYPE_ALL};

    let mut names = Vec::new();
    let mut count = 0;

    match get_platforms() {
        Ok(platforms) => {
            for platform in platforms {
                // Enumerate the devices of THIS platform. The previous code
                // called `get_all_devices`, which returns every device on
                // every platform, so a system with N platforms counted each
                // device N times.
                if let Ok(device_ids) = platform.get_devices(CL_DEVICE_TYPE_ALL) {
                    count += device_ids.len();
                    names.extend(device_ids.iter().map(|&id| {
                        let dev = Device::new(id);
                        dev.name().unwrap_or_else(|_| "Unknown OpenCL device".to_string())
                    }));
                }
            }
        }
        Err(_) => debug!("[GPU DETECT] No OpenCL platforms available"),
    }
    (count, names)
}
81
82pub fn detect_gpu_info() -> GpuInfo {
83 #[cfg(feature = "cuda")]
85 {
86 let (count, names) = detect_cuda_devices();
87 if count > 0 {
88 eprintln!( "[GPU DETECT] CUDA devices found: {} ({})", count, names.join(", ") );
89 return GpuInfo {
90 count,
91 backend: GpuBackend::Cuda,
92 device_names: names,
93 };
94 }
95 }
96
97 #[cfg(feature = "metal")]
99 {
100 let (count, names) = detect_wgpu_devices();
102 if count > 0 {
103 eprintln!( "[GPU DETECT] wgpu adapters found: {} ({})", count, names.join(", ") );
104 return GpuInfo {
105 count,
106 backend: GpuBackend::Wgpu,
107 device_names: names,
108 };
109 }
110 }
111
112 #[cfg(feature = "opencl")]
114 {
115 let (count, names) = detect_opencl_devices();
116 if count > 0 {
117 eprintln!( "[GPU DETECT] OpenCL adapters found: {} ({})", count, names.join(", ") );
118 return GpuInfo {
119 count,
120 backend: GpuBackend::OpenCL,
121 device_names: names,
122 };
123 }
124 }
125
126 eprintln!("[GPU DETECT] No GPU devices found");
128 GpuInfo {
129 count: 0,
130 backend: GpuBackend::None,
131 device_names: Vec::new(),
132 }
133}
134
/// User-supplied tuning knobs consumed by `HybridParallelismProfile`.
#[derive(Debug, Clone)]
pub struct ParallelismConfig {
    /// Requested number of CPU worker threads (clamped to the host at profile build time).
    cpu_workers: usize,
    /// Requested number of GPU workers (clamped to detected devices).
    gpu_workers: usize,
    /// Fraction of available memory the pipeline may budget for segments.
    mem_fraction: f64,
    /// Upper bound on segments kept in flight simultaneously.
    hard_cap: usize,
}
143
144impl Default for ParallelismConfig {
145 fn default() -> Self {
146 Self {
147 cpu_workers: 1,
148 gpu_workers: 0,
149 mem_fraction: 0.2, hard_cap: 4, }
152 }
153}
154impl ParallelismConfig {
155 pub fn new(cpu_workers: usize, gpu_workers: usize, mem_fraction: f64, hard_cap: usize) -> Self {
156 Self {
157 cpu_workers,
158 gpu_workers,
159 mem_fraction,
160 hard_cap,
161 }
162 }
163}
164
/// Resolved parallelism plan: how many CPU/GPU workers to run and how many
/// segments may be in flight, derived from hardware probing and config.
#[derive(Debug, Clone)]
pub struct HybridParallelismProfile {
    /// CPU worker threads to spawn.
    cpu_workers: usize,
    /// GPU workers to spawn (0 when no device was detected).
    gpu_workers: usize,
    /// Maximum segments processed concurrently.
    inflight_segments: usize,
    /// Minimum segment size (bytes) considered for GPU work (see `GPU_THRESHOLD`).
    gpu_threshold: usize,
    /// Detection result the profile was built from, if any.
    gpu: Option<GpuInfo>,
}
174
175impl HybridParallelismProfile {
176 fn new(max_segment_size: u32, opts: ParallelismConfig) -> Self {
178 let gpu = detect_gpu_info();
179
180 let cpu_workers = opts.cpu_workers.clamp(1, num_cpus::get().saturating_sub(1));
182 let gpu_workers = opts.gpu_workers.clamp(0, gpu.count);
183
184 let mut sys = System::new();
186 sys.refresh_memory();
187
188 let avail_bytes = sys.available_memory() * 1024; let fraction = opts.mem_fraction.min(0.75);
193 let usable_bytes = (avail_bytes as f64 * fraction) as u64;
194
195 let max_segments = (usable_bytes / max_segment_size as u64).max(1) as usize;
197
198 let inflight_segments_cap = opts.hard_cap.clamp(1, max_segments);
200 let inflight_cpus = inflight_segments_cap.min(cpu_workers * 4);
202 let inflight_gpus = inflight_segments_cap.min(gpu_workers * 4);
203 let inflight_segments = inflight_cpus.max(inflight_gpus) as usize;
204
205 Self {
206 cpu_workers,
207 gpu_workers,
208 inflight_segments,
209 gpu_threshold: GPU_THRESHOLD,
210 gpu: Some(gpu),
211 }
212 }
213
214 pub fn with_strategy(
215 strategy: Strategy,
216 max_segment_size: u32,
217 config: Option<ParallelismConfig>,
218 ) -> Result<Self, StreamError> {
219 let opts = config.unwrap_or_default();
220 match strategy {
221 Strategy::Auto => Ok(Self::dynamic(max_segment_size)),
222 Strategy::Sequential => Ok(Self::single_threaded()),
223 Strategy::Parallel => Ok(Self::new(max_segment_size, opts)),
224 }
225 }
226
227 pub fn from_stream_header(
228 header: HeaderV1,
229 config: Option<ParallelismConfig>,
230 ) -> Result<Self, StreamError> {
231 let max_segment_size = header.chunk_size;
232 let strategy = Strategy::from(header.strategy).map_err(StreamError::Header)?;
233
234 Self::with_strategy(strategy, max_segment_size, config)
235 }
236
237 pub fn cpu_workers(&self) -> usize {
239 self.cpu_workers
240 }
241
242 pub fn gpu_workers(&self) -> usize {
243 self.gpu_workers
244 }
245
246 pub fn inflight_segments(&self) -> usize {
247 self.inflight_segments
248 }
249
250 pub fn gpu_threshold(&self) -> usize {
251 self.gpu_threshold
252 }
253
254 pub fn gpu(&self) -> Option<GpuInfo> {
255 self.gpu.clone()
256 }
257
258 pub fn single_threaded() -> Self {
259 let gpu = detect_gpu_info();
260 Self {
261 cpu_workers: 1,
262 gpu_workers: 1.clamp(0, gpu.count),
263 inflight_segments: 1,
264 gpu_threshold: GPU_THRESHOLD,
265 gpu: Some(gpu),
266 }
267 }
268 pub fn semi_dynamic(max_segment_size: u32, mem_fraction: f64, hard_cap: usize) -> Self {
275 let cores = num_cpus::get().saturating_sub(2);
278 let cpu_workers = cores.max(1);
279
280 let mut sys = sysinfo::System::new_all();
281 sys.refresh_memory();
282 let avail_bytes = sys.available_memory() * 1024;
283 let budget = (avail_bytes as f64 * mem_fraction) as u32;
284 let max_segments = budget / max_segment_size;
285
286 let gpu = detect_gpu_info();
287 let gpu_workers = gpu.count;
288
289 debug!(
290 "[PROFILE] cpu_workers={}, gpu_workers={}, inflight_segments={}",
291 cpu_workers,
292 gpu_workers,
293 max_segments.min(hard_cap as u32)
294 );
295
296 Self {
297 cpu_workers,
298 gpu_workers,
299 inflight_segments: max_segments.min(hard_cap as u32) as usize,
300 gpu_threshold: GPU_THRESHOLD,
301 gpu: Some(gpu),
302 }
303 }
304
305 pub fn dynamic(max_segment_size: u32) -> Self {
312 let cores = num_cpus::get().saturating_sub(2);
313 let cpu_workers = cores.max(1);
314
315 let mut sys = sysinfo::System::new_all();
316 sys.refresh_memory();
317 let avail_bytes = sys.available_memory() * 1024;
318
319 let budget = (avail_bytes as f64 * 0.75) as u32;
321 let max_segments = budget / max_segment_size;
322
323 let gpu = detect_gpu_info();
324 let gpu_workers = gpu.count;
325
326 let inflight_cpus = max_segments.min(cpu_workers as u32 * 4);
328 let inflight_gpus = max_segments.min(gpu_workers as u32 * 4);
329 let inflight_segments = inflight_cpus.max(inflight_gpus) as usize;
330
331 debug!(
332 "[PROFILE] cpu_workers={}, gpu_workers={}, inflight_segments={}",
333 cpu_workers, gpu_workers, inflight_segments
334 );
335
336 Self {
337 cpu_workers,
338 gpu_workers,
339 inflight_segments,
340 gpu_threshold: GPU_THRESHOLD,
341 gpu: Some(gpu),
342 }
343 }
344
345}