ringkernel_core/hybrid/
dispatcher.rs1use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use super::config::{HybridConfig, ProcessingMode};
8use super::error::{HybridError, HybridResult};
9use super::stats::HybridStats;
10use super::traits::HybridWorkload;
11
12pub struct HybridDispatcher {
18 config: HybridConfig,
20 stats: Arc<HybridStats>,
22 adaptive_threshold: AtomicUsize,
24}
25
26impl HybridDispatcher {
27 #[must_use]
29 pub fn new(config: HybridConfig) -> Self {
30 let initial_threshold = match config.mode {
31 ProcessingMode::Hybrid { gpu_threshold } => gpu_threshold,
32 _ => 10_000,
33 };
34
35 Self {
36 config,
37 stats: Arc::new(HybridStats::new()),
38 adaptive_threshold: AtomicUsize::new(initial_threshold),
39 }
40 }
41
42 #[must_use]
44 pub fn with_defaults() -> Self {
45 Self::new(HybridConfig::default())
46 }
47
48 #[must_use]
50 pub fn should_use_gpu(&self, workload_size: usize) -> bool {
51 if !self.config.gpu_available {
52 return false;
53 }
54
55 match self.config.mode {
56 ProcessingMode::GpuOnly => true,
57 ProcessingMode::CpuOnly => false,
58 ProcessingMode::Hybrid { gpu_threshold } => workload_size >= gpu_threshold,
59 ProcessingMode::Adaptive => {
60 workload_size >= self.adaptive_threshold.load(Ordering::Relaxed)
61 }
62 }
63 }
64
65 pub fn execute<W: HybridWorkload>(&self, workload: &W) -> HybridResult<W::Result> {
69 let size = workload.workload_size();
70
71 if self.config.max_workload_size > 0 && size > self.config.max_workload_size {
73 return Err(HybridError::WorkloadTooLarge {
74 requested: size,
75 maximum: self.config.max_workload_size,
76 });
77 }
78
79 let use_gpu = self.should_use_gpu(size) && workload.supports_gpu();
80
81 if use_gpu {
82 let start = Instant::now();
83 let result = workload.execute_gpu()?;
84 let elapsed = start.elapsed();
85 self.stats.record_gpu_execution(elapsed, size);
86 Ok(result)
87 } else {
88 let start = Instant::now();
89 let result = workload.execute_cpu();
90 let elapsed = start.elapsed();
91 self.stats.record_cpu_execution(elapsed, size);
92 Ok(result)
93 }
94 }
95
96 pub fn execute_measured<W: HybridWorkload>(&self, workload: &W) -> HybridResult<W::Result>
101 where
102 W::Result: Clone,
103 {
104 let size = workload.workload_size();
105
106 if !self.config.gpu_available || !workload.supports_gpu() {
107 let start = Instant::now();
108 let result = workload.execute_cpu();
109 let elapsed = start.elapsed();
110 self.stats.record_cpu_execution(elapsed, size);
111 return Ok(result);
112 }
113
114 let cpu_start = Instant::now();
116 let cpu_result = workload.execute_cpu();
117 let cpu_time = cpu_start.elapsed();
118
119 let gpu_start = Instant::now();
121 let gpu_result = workload.execute_gpu()?;
122 let gpu_time = gpu_start.elapsed();
123
124 self.update_adaptive_threshold(size, cpu_time, gpu_time);
126
127 if gpu_time < cpu_time {
129 self.stats.record_gpu_execution(gpu_time, size);
130 Ok(gpu_result)
131 } else {
132 self.stats.record_cpu_execution(cpu_time, size);
133 Ok(cpu_result)
134 }
135 }
136
137 pub fn update_adaptive_threshold(
139 &self,
140 _workload_size: usize,
141 cpu_time: Duration,
142 gpu_time: Duration,
143 ) {
144 if !matches!(self.config.mode, ProcessingMode::Adaptive) {
145 return;
146 }
147
148 let current = self.adaptive_threshold.load(Ordering::Relaxed);
149 let ratio = cpu_time.as_nanos() as f32 / gpu_time.as_nanos().max(1) as f32;
150
151 let new_threshold = if ratio > 1.5 {
152 let adjustment = (current as f32 * self.config.learning_rate) as usize;
154 current
155 .saturating_sub(adjustment)
156 .max(self.config.min_adaptive_threshold)
157 } else if ratio < 0.7 {
158 let adjustment = (current as f32 * self.config.learning_rate) as usize;
160 current
161 .saturating_add(adjustment)
162 .min(self.config.max_adaptive_threshold)
163 } else {
164 current
165 };
166
167 self.adaptive_threshold
168 .store(new_threshold, Ordering::Relaxed);
169 self.stats.set_learned_threshold(new_threshold);
170 }
171
172 pub fn execute_cpu<W: HybridWorkload>(&self, workload: &W) -> W::Result {
174 let start = Instant::now();
175 let result = workload.execute_cpu();
176 let elapsed = start.elapsed();
177 self.stats
178 .record_cpu_execution(elapsed, workload.workload_size());
179 result
180 }
181
182 pub fn execute_gpu<W: HybridWorkload>(&self, workload: &W) -> HybridResult<W::Result> {
184 if !self.config.gpu_available {
185 return Err(HybridError::GpuNotAvailable);
186 }
187
188 let start = Instant::now();
189 let result = workload.execute_gpu()?;
190 let elapsed = start.elapsed();
191 self.stats
192 .record_gpu_execution(elapsed, workload.workload_size());
193 Ok(result)
194 }
195
196 #[must_use]
198 pub fn config(&self) -> &HybridConfig {
199 &self.config
200 }
201
202 #[must_use]
204 pub fn stats(&self) -> &HybridStats {
205 &self.stats
206 }
207
208 #[must_use]
210 pub fn stats_arc(&self) -> Arc<HybridStats> {
211 Arc::clone(&self.stats)
212 }
213
214 #[must_use]
216 pub fn adaptive_threshold(&self) -> usize {
217 self.adaptive_threshold.load(Ordering::Relaxed)
218 }
219
220 pub fn set_adaptive_threshold(&self, threshold: usize) {
222 let clamped = threshold
223 .max(self.config.min_adaptive_threshold)
224 .min(self.config.max_adaptive_threshold);
225 self.adaptive_threshold.store(clamped, Ordering::Relaxed);
226 self.stats.set_learned_threshold(clamped);
227 }
228}
229
/// A computed value bundled with metadata about the run that produced it.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HybridExecutionResult<T> {
    /// The value produced by the workload.
    pub value: T,
    /// Wall-clock duration of the run.
    pub execution_time: Duration,
    /// `true` when the GPU backend produced `value`.
    pub used_gpu: bool,
    /// Size of the workload that was processed.
    pub workload_size: usize,
}

#[allow(dead_code)]
impl<T> HybridExecutionResult<T> {
    /// Bundles `value` with the metadata of its run.
    pub fn new(value: T, execution_time: Duration, used_gpu: bool, workload_size: usize) -> Self {
        Self {
            value,
            execution_time,
            used_gpu,
            workload_size,
        }
    }

    /// Returns the processing rate in workload units per second.
    ///
    /// A zero `execution_time` yields `0.0` rather than dividing by zero.
    #[must_use]
    pub fn throughput(&self) -> f64 {
        let elapsed = self.execution_time;
        if elapsed.is_zero() {
            0.0
        } else {
            self.workload_size as f64 / elapsed.as_secs_f64()
        }
    }
}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Workload whose CPU path doubles and whose GPU path triples its size, so
    /// the returned value reveals which backend ran.
    struct TestWorkload {
        size: usize,
        supports_gpu: bool,
    }

    impl HybridWorkload for TestWorkload {
        type Result = usize;

        fn workload_size(&self) -> usize {
            self.size
        }

        fn execute_cpu(&self) -> Self::Result {
            self.size * 2
        }

        fn execute_gpu(&self) -> HybridResult<Self::Result> {
            Ok(self.size * 3)
        }

        fn supports_gpu(&self) -> bool {
            self.supports_gpu
        }
    }

    /// Shorthand for building a [`TestWorkload`].
    fn workload(size: usize, supports_gpu: bool) -> TestWorkload {
        TestWorkload { size, supports_gpu }
    }

    /// Adaptive-mode dispatcher with a GPU and a learning rate of 0.5.
    fn adaptive_dispatcher() -> HybridDispatcher {
        HybridDispatcher::new(
            HybridConfig::builder()
                .mode(ProcessingMode::Adaptive)
                .gpu_available(true)
                .learning_rate(0.5)
                .build(),
        )
    }

    #[test]
    fn test_dispatcher_new() {
        let dispatcher = HybridDispatcher::new(HybridConfig::default());
        assert!(!dispatcher.config().gpu_available);
    }

    #[test]
    fn test_should_use_gpu_hybrid() {
        let dispatcher = HybridDispatcher::new(
            HybridConfig::builder()
                .mode(ProcessingMode::Hybrid {
                    gpu_threshold: 1000,
                })
                .gpu_available(true)
                .build(),
        );

        // Below, exactly at, and above the threshold.
        assert!(!dispatcher.should_use_gpu(500));
        assert!(dispatcher.should_use_gpu(1000));
        assert!(dispatcher.should_use_gpu(5000));
    }

    #[test]
    fn test_should_use_gpu_cpu_only() {
        let dispatcher = HybridDispatcher::new(HybridConfig::cpu_only());

        assert!(!dispatcher.should_use_gpu(1_000_000));
    }

    #[test]
    fn test_should_use_gpu_gpu_only() {
        let dispatcher = HybridDispatcher::new(HybridConfig::gpu_only());

        assert!(dispatcher.should_use_gpu(1));
    }

    #[test]
    fn test_execute_cpu_path() {
        let dispatcher = HybridDispatcher::new(HybridConfig::cpu_only());

        let output = dispatcher.execute(&workload(100, true)).unwrap();

        // CPU path doubles the size.
        assert_eq!(output, 200);
        assert_eq!(dispatcher.stats().cpu_executions(), 1);
    }

    #[test]
    fn test_execute_gpu_path() {
        let dispatcher = HybridDispatcher::new(HybridConfig::gpu_only());

        let output = dispatcher.execute(&workload(100, true)).unwrap();

        // GPU path triples the size.
        assert_eq!(output, 300);
        assert_eq!(dispatcher.stats().gpu_executions(), 1);
    }

    #[test]
    fn test_execute_falls_back_if_gpu_unsupported() {
        let dispatcher = HybridDispatcher::new(HybridConfig::gpu_only());

        let output = dispatcher.execute(&workload(100, false)).unwrap();

        // The workload refuses the GPU, so the CPU result comes back.
        assert_eq!(output, 200);
        assert_eq!(dispatcher.stats().cpu_executions(), 1);
    }

    #[test]
    fn test_workload_too_large() {
        let dispatcher =
            HybridDispatcher::new(HybridConfig::builder().max_workload_size(100).build());

        let outcome = dispatcher.execute(&workload(1000, true));

        assert!(matches!(
            outcome,
            Err(HybridError::WorkloadTooLarge {
                requested: 1000,
                maximum: 100
            })
        ));
    }

    #[test]
    fn test_adaptive_threshold_update_gpu_faster() {
        let dispatcher = adaptive_dispatcher();
        let before = dispatcher.adaptive_threshold();

        // CPU took 100 ms, GPU 10 ms: the threshold must drop.
        dispatcher.update_adaptive_threshold(
            5000,
            Duration::from_millis(100),
            Duration::from_millis(10),
        );

        assert!(dispatcher.adaptive_threshold() < before);
    }

    #[test]
    fn test_adaptive_threshold_update_cpu_faster() {
        let dispatcher = adaptive_dispatcher();
        let before = dispatcher.adaptive_threshold();

        // CPU took 10 ms, GPU 100 ms: the threshold must rise.
        dispatcher.update_adaptive_threshold(
            5000,
            Duration::from_millis(10),
            Duration::from_millis(100),
        );

        assert!(dispatcher.adaptive_threshold() > before);
    }

    #[test]
    fn test_set_adaptive_threshold_clamping() {
        let dispatcher = HybridDispatcher::new(
            HybridConfig::builder()
                .mode(ProcessingMode::Adaptive)
                .min_adaptive_threshold(100)
                .max_adaptive_threshold(10000)
                .build(),
        );

        dispatcher.set_adaptive_threshold(50);
        assert_eq!(dispatcher.adaptive_threshold(), 100);

        dispatcher.set_adaptive_threshold(50000);
        assert_eq!(dispatcher.adaptive_threshold(), 10000);
    }

    #[test]
    fn test_execution_result_throughput() {
        let measured = HybridExecutionResult::new(42, Duration::from_secs(1), false, 1000);
        assert!((measured.throughput() - 1000.0).abs() < f64::EPSILON);
    }
}