//! Hybrid CPU-GPU dispatcher.
//!
//! File: ringkernel_core/hybrid/dispatcher.rs
2
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use super::config::{HybridConfig, ProcessingMode};
8use super::error::{HybridError, HybridResult};
9use super::stats::HybridStats;
10use super::traits::HybridWorkload;
11
/// Dispatcher for routing workloads between CPU and GPU.
///
/// The dispatcher uses the configured `ProcessingMode` to decide where to
/// execute each workload. In `Adaptive` mode, it learns the optimal threshold
/// from runtime measurements.
pub struct HybridDispatcher {
    /// Configuration: processing mode, GPU availability, workload limits,
    /// learning rate, and adaptive-threshold bounds.
    config: HybridConfig,
    /// Shared execution statistics (CPU/GPU run counters, learned threshold).
    stats: Arc<HybridStats>,
    /// Workload-size cutoff used in `Adaptive` mode: sizes at or above this
    /// value are routed to the GPU. Updated from runtime measurements.
    adaptive_threshold: AtomicUsize,
}
25
26impl HybridDispatcher {
27    /// Creates a new hybrid dispatcher.
28    #[must_use]
29    pub fn new(config: HybridConfig) -> Self {
30        let initial_threshold = match config.mode {
31            ProcessingMode::Hybrid { gpu_threshold } => gpu_threshold,
32            _ => 10_000,
33        };
34
35        Self {
36            config,
37            stats: Arc::new(HybridStats::new()),
38            adaptive_threshold: AtomicUsize::new(initial_threshold),
39        }
40    }
41
42    /// Creates a dispatcher with default configuration.
43    #[must_use]
44    pub fn with_defaults() -> Self {
45        Self::new(HybridConfig::default())
46    }
47
48    /// Returns whether GPU should be used for the given workload size.
49    #[must_use]
50    pub fn should_use_gpu(&self, workload_size: usize) -> bool {
51        if !self.config.gpu_available {
52            return false;
53        }
54
55        match self.config.mode {
56            ProcessingMode::GpuOnly => true,
57            ProcessingMode::CpuOnly => false,
58            ProcessingMode::Hybrid { gpu_threshold } => workload_size >= gpu_threshold,
59            ProcessingMode::Adaptive => {
60                workload_size >= self.adaptive_threshold.load(Ordering::Relaxed)
61            }
62        }
63    }
64
65    /// Executes a workload using the appropriate backend.
66    ///
67    /// Returns the result and records execution statistics.
68    pub fn execute<W: HybridWorkload>(&self, workload: &W) -> HybridResult<W::Result> {
69        let size = workload.workload_size();
70
71        // Check workload size limit
72        if self.config.max_workload_size > 0 && size > self.config.max_workload_size {
73            return Err(HybridError::WorkloadTooLarge {
74                requested: size,
75                maximum: self.config.max_workload_size,
76            });
77        }
78
79        let use_gpu = self.should_use_gpu(size) && workload.supports_gpu();
80
81        if use_gpu {
82            let start = Instant::now();
83            let result = workload.execute_gpu()?;
84            let elapsed = start.elapsed();
85            self.stats.record_gpu_execution(elapsed, size);
86            Ok(result)
87        } else {
88            let start = Instant::now();
89            let result = workload.execute_cpu();
90            let elapsed = start.elapsed();
91            self.stats.record_cpu_execution(elapsed, size);
92            Ok(result)
93        }
94    }
95
96    /// Executes a workload and measures both backends for comparison.
97    ///
98    /// In `Adaptive` mode, this updates the threshold based on measurements.
99    /// Returns the result from the faster backend.
100    pub fn execute_measured<W: HybridWorkload>(&self, workload: &W) -> HybridResult<W::Result>
101    where
102        W::Result: Clone,
103    {
104        let size = workload.workload_size();
105
106        if !self.config.gpu_available || !workload.supports_gpu() {
107            let start = Instant::now();
108            let result = workload.execute_cpu();
109            let elapsed = start.elapsed();
110            self.stats.record_cpu_execution(elapsed, size);
111            return Ok(result);
112        }
113
114        // Execute on CPU
115        let cpu_start = Instant::now();
116        let cpu_result = workload.execute_cpu();
117        let cpu_time = cpu_start.elapsed();
118
119        // Execute on GPU
120        let gpu_start = Instant::now();
121        let gpu_result = workload.execute_gpu()?;
122        let gpu_time = gpu_start.elapsed();
123
124        // Update adaptive threshold
125        self.update_adaptive_threshold(size, cpu_time, gpu_time);
126
127        // Record whichever was faster
128        if gpu_time < cpu_time {
129            self.stats.record_gpu_execution(gpu_time, size);
130            Ok(gpu_result)
131        } else {
132            self.stats.record_cpu_execution(cpu_time, size);
133            Ok(cpu_result)
134        }
135    }
136
137    /// Updates the adaptive threshold based on runtime measurements.
138    pub fn update_adaptive_threshold(
139        &self,
140        _workload_size: usize,
141        cpu_time: Duration,
142        gpu_time: Duration,
143    ) {
144        if !matches!(self.config.mode, ProcessingMode::Adaptive) {
145            return;
146        }
147
148        let current = self.adaptive_threshold.load(Ordering::Relaxed);
149        let ratio = cpu_time.as_nanos() as f32 / gpu_time.as_nanos().max(1) as f32;
150
151        let new_threshold = if ratio > 1.5 {
152            // GPU significantly faster - lower threshold
153            let adjustment = (current as f32 * self.config.learning_rate) as usize;
154            current
155                .saturating_sub(adjustment)
156                .max(self.config.min_adaptive_threshold)
157        } else if ratio < 0.7 {
158            // CPU significantly faster - raise threshold
159            let adjustment = (current as f32 * self.config.learning_rate) as usize;
160            current
161                .saturating_add(adjustment)
162                .min(self.config.max_adaptive_threshold)
163        } else {
164            current
165        };
166
167        self.adaptive_threshold
168            .store(new_threshold, Ordering::Relaxed);
169        self.stats.set_learned_threshold(new_threshold);
170    }
171
172    /// Forces execution on CPU regardless of mode.
173    pub fn execute_cpu<W: HybridWorkload>(&self, workload: &W) -> W::Result {
174        let start = Instant::now();
175        let result = workload.execute_cpu();
176        let elapsed = start.elapsed();
177        self.stats
178            .record_cpu_execution(elapsed, workload.workload_size());
179        result
180    }
181
182    /// Forces execution on GPU regardless of mode.
183    pub fn execute_gpu<W: HybridWorkload>(&self, workload: &W) -> HybridResult<W::Result> {
184        if !self.config.gpu_available {
185            return Err(HybridError::GpuNotAvailable);
186        }
187
188        let start = Instant::now();
189        let result = workload.execute_gpu()?;
190        let elapsed = start.elapsed();
191        self.stats
192            .record_gpu_execution(elapsed, workload.workload_size());
193        Ok(result)
194    }
195
    /// Returns the configuration this dispatcher was built with.
    #[must_use]
    pub fn config(&self) -> &HybridConfig {
        &self.config
    }

    /// Returns a borrowed view of the execution statistics.
    #[must_use]
    pub fn stats(&self) -> &HybridStats {
        &self.stats
    }

    /// Returns a shared (reference-counted) handle to the statistics,
    /// e.g. for observing them from another thread.
    #[must_use]
    pub fn stats_arc(&self) -> Arc<HybridStats> {
        Arc::clone(&self.stats)
    }

    /// Returns the current adaptive threshold: the workload size at or above
    /// which the GPU is preferred in `Adaptive` mode.
    #[must_use]
    pub fn adaptive_threshold(&self) -> usize {
        self.adaptive_threshold.load(Ordering::Relaxed)
    }
219
220    /// Manually sets the adaptive threshold.
221    pub fn set_adaptive_threshold(&self, threshold: usize) {
222        let clamped = threshold
223            .max(self.config.min_adaptive_threshold)
224            .min(self.config.max_adaptive_threshold);
225        self.adaptive_threshold.store(clamped, Ordering::Relaxed);
226        self.stats.set_learned_threshold(clamped);
227    }
228}
229
/// Result of a hybrid execution.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HybridExecutionResult<T> {
    /// The result value.
    pub value: T,
    /// Execution time.
    pub execution_time: Duration,
    /// Whether GPU was used.
    pub used_gpu: bool,
    /// Workload size.
    pub workload_size: usize,
}

#[allow(dead_code)]
impl<T> HybridExecutionResult<T> {
    /// Creates a new execution result.
    pub fn new(value: T, execution_time: Duration, used_gpu: bool, workload_size: usize) -> Self {
        Self {
            value,
            execution_time,
            used_gpu,
            workload_size,
        }
    }

    /// Returns throughput in elements per second.
    ///
    /// A zero execution time yields `0.0` rather than dividing by zero.
    #[must_use]
    pub fn throughput(&self) -> f64 {
        let seconds = self.execution_time.as_secs_f64();
        if seconds == 0.0 {
            0.0
        } else {
            self.workload_size as f64 / seconds
        }
    }
}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal workload for exercising the dispatcher: the CPU path returns
    /// `size * 2` and the GPU path returns `size * 3`, so assertions can
    /// tell which backend actually ran.
    struct TestWorkload {
        size: usize,
        supports_gpu: bool,
    }

    impl HybridWorkload for TestWorkload {
        type Result = usize;

        fn workload_size(&self) -> usize {
            self.size
        }

        fn execute_cpu(&self) -> Self::Result {
            // Distinct from the GPU result so tests can detect the backend.
            self.size * 2
        }

        fn execute_gpu(&self) -> HybridResult<Self::Result> {
            Ok(self.size * 3)
        }

        fn supports_gpu(&self) -> bool {
            self.supports_gpu
        }
    }

    #[test]
    fn test_dispatcher_new() {
        // Default configuration reports no GPU available.
        let dispatcher = HybridDispatcher::new(HybridConfig::default());
        assert!(!dispatcher.config().gpu_available);
    }

    #[test]
    fn test_should_use_gpu_hybrid() {
        let config = HybridConfig::builder()
            .mode(ProcessingMode::Hybrid {
                gpu_threshold: 1000,
            })
            .gpu_available(true)
            .build();
        let dispatcher = HybridDispatcher::new(config);

        // The threshold is inclusive: exactly 1000 routes to the GPU.
        assert!(!dispatcher.should_use_gpu(500));
        assert!(dispatcher.should_use_gpu(1000));
        assert!(dispatcher.should_use_gpu(5000));
    }

    #[test]
    fn test_should_use_gpu_cpu_only() {
        let config = HybridConfig::cpu_only();
        let dispatcher = HybridDispatcher::new(config);

        // CPU-only mode never selects the GPU, regardless of size.
        assert!(!dispatcher.should_use_gpu(1_000_000));
    }

    #[test]
    fn test_should_use_gpu_gpu_only() {
        let config = HybridConfig::gpu_only();
        let dispatcher = HybridDispatcher::new(config);

        // GPU-only mode selects the GPU even for a tiny workload.
        assert!(dispatcher.should_use_gpu(1));
    }

    #[test]
    fn test_execute_cpu_path() {
        let config = HybridConfig::cpu_only();
        let dispatcher = HybridDispatcher::new(config);

        let workload = TestWorkload {
            size: 100,
            supports_gpu: true,
        };
        let result = dispatcher.execute(&workload).unwrap();

        assert_eq!(result, 200); // CPU result
        assert_eq!(dispatcher.stats().cpu_executions(), 1);
    }

    #[test]
    fn test_execute_gpu_path() {
        let config = HybridConfig::gpu_only();
        let dispatcher = HybridDispatcher::new(config);

        let workload = TestWorkload {
            size: 100,
            supports_gpu: true,
        };
        let result = dispatcher.execute(&workload).unwrap();

        assert_eq!(result, 300); // GPU result
        assert_eq!(dispatcher.stats().gpu_executions(), 1);
    }

    #[test]
    fn test_execute_falls_back_if_gpu_unsupported() {
        // Even in GPU-only mode, a workload that does not support the GPU
        // must run on the CPU.
        let config = HybridConfig::gpu_only();
        let dispatcher = HybridDispatcher::new(config);

        let workload = TestWorkload {
            size: 100,
            supports_gpu: false,
        };
        let result = dispatcher.execute(&workload).unwrap();

        assert_eq!(result, 200); // CPU result
        assert_eq!(dispatcher.stats().cpu_executions(), 1);
    }

    #[test]
    fn test_workload_too_large() {
        let config = HybridConfig::builder().max_workload_size(100).build();
        let dispatcher = HybridDispatcher::new(config);

        let workload = TestWorkload {
            size: 1000,
            supports_gpu: true,
        };
        let result = dispatcher.execute(&workload);

        assert!(matches!(
            result,
            Err(HybridError::WorkloadTooLarge {
                requested: 1000,
                maximum: 100
            })
        ));
    }

    #[test]
    fn test_adaptive_threshold_update_gpu_faster() {
        let config = HybridConfig::builder()
            .mode(ProcessingMode::Adaptive)
            .gpu_available(true)
            .learning_rate(0.5)
            .build();
        let dispatcher = HybridDispatcher::new(config);

        let initial = dispatcher.adaptive_threshold();

        // GPU significantly faster (ratio 10:1) should lower the threshold.
        dispatcher.update_adaptive_threshold(
            5000,
            Duration::from_millis(100),
            Duration::from_millis(10),
        );

        assert!(dispatcher.adaptive_threshold() < initial);
    }

    #[test]
    fn test_adaptive_threshold_update_cpu_faster() {
        let config = HybridConfig::builder()
            .mode(ProcessingMode::Adaptive)
            .gpu_available(true)
            .learning_rate(0.5)
            .build();
        let dispatcher = HybridDispatcher::new(config);

        let initial = dispatcher.adaptive_threshold();

        // CPU significantly faster (ratio 1:10) should raise the threshold.
        dispatcher.update_adaptive_threshold(
            5000,
            Duration::from_millis(10),
            Duration::from_millis(100),
        );

        assert!(dispatcher.adaptive_threshold() > initial);
    }

    #[test]
    fn test_set_adaptive_threshold_clamping() {
        let config = HybridConfig::builder()
            .mode(ProcessingMode::Adaptive)
            .min_adaptive_threshold(100)
            .max_adaptive_threshold(10000)
            .build();
        let dispatcher = HybridDispatcher::new(config);

        // Below the minimum: clamped up.
        dispatcher.set_adaptive_threshold(50);
        assert_eq!(dispatcher.adaptive_threshold(), 100);

        // Above the maximum: clamped down.
        dispatcher.set_adaptive_threshold(50000);
        assert_eq!(dispatcher.adaptive_threshold(), 10000);
    }

    #[test]
    fn test_execution_result_throughput() {
        // 1000 elements in 1 second = 1000 elements/second.
        let result = HybridExecutionResult::new(42, Duration::from_secs(1), false, 1000);
        assert!((result.throughput() - 1000.0).abs() < f64::EPSILON);
    }
}