sklears_compose/resource_management/
cpu_manager.rs

1//! CPU resource management
2//!
3//! This module provides comprehensive CPU resource allocation, NUMA awareness,
4//! and performance optimization for the resource management system.
5
6use super::resource_types::CpuAllocation;
7use sklears_core::error::{Result as SklResult, SklearsError};
8use std::collections::HashMap;
9use std::time::Duration;
10
/// CPU resource manager for core allocation and NUMA optimization
///
/// Owns a snapshot of the host CPU topology plus the book-keeping needed
/// to hand out cores to tasks (`allocate_cpu`) and return them to the
/// idle pool (`release_cpu`).
#[derive(Debug)]
pub struct CpuResourceManager {
    /// Detected CPU topology (core counts, caches, features)
    topology: CpuTopology,
    /// Active core allocations, keyed by task id
    allocations: HashMap<String, CpuAllocation>,
    /// NUMA nodes detected on the host
    numa_nodes: Vec<NumaNode>,
    /// Per-logical-core state table
    cpu_cores: Vec<CpuCore>,
    /// Manager configuration (affinity strategy, thresholds, governor)
    config: CpuManagerConfig,
    /// Aggregate performance statistics
    stats: CpuStats,
}
27
/// CPU topology information
///
/// Static description of the host processor layout, captured once at
/// manager construction time by `detect_topology`.
#[derive(Debug, Clone)]
pub struct CpuTopology {
    /// Number of physical CPU cores (as reported by `num_cpus::get_physical`)
    pub physical_cpus: u32,
    /// Cores per physical CPU package
    // NOTE(review): detection currently hard-codes this to 1 — confirm.
    pub cores_per_cpu: u32,
    /// Hardware threads (SMT siblings) per core
    pub threads_per_core: u32,
    /// Total logical CPUs visible to the OS
    pub logical_cpus: u32,
    /// CPU architecture string (from `std::env::consts::ARCH`, e.g. "x86_64")
    pub architecture: String,
    /// CPU feature flags (e.g. "sse", "avx")
    pub features: Vec<String>,
    /// Cache hierarchy description
    pub cache_hierarchy: CacheHierarchy,
}
46
/// CPU cache hierarchy
///
/// Per-level cache descriptions; L3 and L4 are optional because not all
/// CPUs expose those levels.
#[derive(Debug, Clone)]
pub struct CacheHierarchy {
    /// L1 instruction cache
    pub l1i_cache: CacheInfo,
    /// L1 data cache
    pub l1d_cache: CacheInfo,
    /// L2 cache
    pub l2_cache: CacheInfo,
    /// L3 cache, if present
    pub l3_cache: Option<CacheInfo>,
    /// L4 cache, if present
    pub l4_cache: Option<CacheInfo>,
}
61
/// CPU cache information
#[derive(Debug, Clone)]
pub struct CacheInfo {
    /// Cache size in bytes
    pub size: u64,
    /// Cache line size in bytes
    pub line_size: u32,
    /// Cache associativity (number of ways)
    pub associativity: u32,
    /// Whether the cache is shared among multiple cores
    pub shared: bool,
}
74
/// NUMA node information
#[derive(Debug, Clone)]
pub struct NumaNode {
    /// Node ID
    pub node_id: usize,
    /// Logical CPU core ids belonging to this node
    pub cpu_cores: Vec<usize>,
    /// Node-local memory size in bytes
    pub memory_size: u64,
    /// Memory bandwidth in MB/s
    pub memory_bandwidth: f64,
    /// Relative distance to other nodes, keyed by node id
    /// (lower = closer; the detector records 10 for the local node)
    pub distances: HashMap<usize, u32>,
    /// Node utilization (initialized to 0.0; scale not defined here —
    /// TODO confirm units with the producer)
    pub utilization: f64,
}
91
/// Individual CPU core
#[derive(Debug, Clone)]
pub struct CpuCore {
    /// Logical core ID
    pub core_id: usize,
    /// Physical core ID (`core_id / threads_per_core` at init time)
    pub physical_id: usize,
    /// NUMA node this core belongs to
    pub numa_node: usize,
    /// Current core state
    pub state: CoreState,
    /// Current frequency in MHz
    pub current_frequency: f64,
    /// Frequencies the core can be scaled to, in MHz
    pub available_frequencies: Vec<f64>,
    /// Current frequency-scaling governor (e.g. "ondemand")
    pub governor: String,
    /// Core utilization (initialized to 0.0)
    pub utilization: f64,
    /// Core temperature, if a sensor reading is available
    pub temperature: Option<f64>,
}
114
/// CPU core states
///
/// Lifecycle state of a single logical core as tracked by the manager.
/// Fieldless, so it derives `Copy` and `Eq` in addition to
/// `Clone`/`PartialEq` — this avoids needless `.clone()` calls and lets
/// the type satisfy `Eq`/`Hash`-style bounds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoreState {
    /// Idle — available for allocation
    Idle,
    /// Running — currently assigned to a task
    Running,
    /// Offline — not usable
    Offline,
    /// Throttled — frequency reduced (e.g. thermal limits)
    Throttled,
    /// Error — core is in a faulted state
    Error,
}
129
/// CPU manager configuration
#[derive(Debug, Clone)]
pub struct CpuManagerConfig {
    /// Enable NUMA awareness
    pub numa_aware: bool,
    /// Strategy used to pick cores when satisfying a request
    pub affinity_strategy: AffinityStrategy,
    /// Enable dynamic voltage/frequency scaling (DVFS)
    pub enable_dvfs: bool,
    /// Default CPU frequency governor (e.g. "ondemand")
    pub default_governor: String,
    /// Enable thermal management
    pub enable_thermal_management: bool,
    /// Thermal throttling threshold in degrees Celsius
    pub thermal_threshold: f64,
    /// Core utilization threshold in percent
    pub utilization_threshold: f64,
}
148
/// CPU affinity strategies
///
/// Controls how `select_optimal_cores` picks cores from the idle pool.
#[derive(Debug, Clone)]
pub enum AffinityStrategy {
    /// No specific affinity: take the first available cores
    None,
    /// Prefer cores on the same NUMA node
    NumaLocal,
    /// Prefer physical cores over hyperthreads (SMT siblings)
    PhysicalCores,
    /// Spread the allocation across all NUMA nodes
    Spread,
    /// Pack cores together (lowest core ids first)
    Pack,
    /// Custom affinity pattern: preferred core ids, in order
    Custom(Vec<usize>),
}
165
/// CPU performance statistics
#[derive(Debug, Clone)]
pub struct CpuStats {
    /// Total allocations performed since the manager was created
    pub total_allocations: u64,
    /// Allocations currently live (allocated but not yet released)
    pub active_allocations: u64,
    /// Average core utilization (recomputed by `update_utilization`)
    pub avg_utilization: f64,
    /// Highest single-core utilization from the last update
    pub peak_utilization: f64,
    /// Context switches per second
    // NOTE(review): never written by this module — confirm the producer.
    pub context_switches_per_sec: f64,
    /// Cache hit rate
    // NOTE(review): never written by this module — confirm the producer.
    pub cache_hit_rate: f64,
    /// NUMA locality misses
    pub numa_misses: u64,
    /// Thermal throttling events
    pub thermal_events: u64,
}
186
/// CPU allocation request
#[derive(Debug, Clone)]
pub struct CpuAllocationRequest {
    /// Task ID the resulting allocation is recorded under
    pub task_id: String,
    /// Number of cores requested
    pub cores_requested: u32,
    /// Minimum frequency requirement in MHz, if any
    pub min_frequency: Option<f64>,
    /// Preferred NUMA node, if any
    pub numa_preference: Option<usize>,
    /// Required CPU affinity (specific core ids)
    // NOTE(review): not currently consulted by `allocate_cpu` — confirm.
    pub affinity_requirements: Option<Vec<usize>>,
    /// Request exclusive core access
    // NOTE(review): not currently enforced by `allocate_cpu` — confirm.
    pub exclusive_access: bool,
    /// Performance requirements
    pub performance_requirements: PerformanceRequirements,
}
205
/// Performance requirements for CPU allocation
// NOTE(review): these fields travel with the request but are not yet
// consulted during allocation in this module — confirm intended behavior.
#[derive(Debug, Clone)]
pub struct PerformanceRequirements {
    /// Minimum CPU performance
    pub min_performance: f64,
    /// Maximum acceptable latency
    pub max_latency: Duration,
    /// Whether cache coherency is required
    pub cache_coherency: bool,
    /// Required SIMD instruction sets (e.g. "avx")
    pub simd_requirements: Vec<String>,
}
218
impl Default for CpuResourceManager {
    /// Equivalent to [`CpuResourceManager::new`]: detects the host
    /// topology and starts with no active allocations.
    fn default() -> Self {
        Self::new()
    }
}
224
225impl CpuResourceManager {
226    /// Create a new CPU resource manager
227    #[must_use]
228    pub fn new() -> Self {
229        let topology = Self::detect_topology();
230        let numa_nodes = Self::detect_numa_topology();
231        let cpu_cores = Self::initialize_cores(&topology, &numa_nodes);
232
233        Self {
234            topology,
235            allocations: HashMap::new(),
236            numa_nodes,
237            cpu_cores,
238            config: CpuManagerConfig::default(),
239            stats: CpuStats::default(),
240        }
241    }
242
243    /// Detect CPU topology
244    fn detect_topology() -> CpuTopology {
245        // In a real implementation, this would query the system
246        // For now, return a default topology
247        /// CpuTopology
248        CpuTopology {
249            physical_cpus: num_cpus::get_physical() as u32,
250            cores_per_cpu: 1,
251            threads_per_core: if num_cpus::get() > num_cpus::get_physical() {
252                2
253            } else {
254                1
255            },
256            logical_cpus: num_cpus::get() as u32,
257            architecture: std::env::consts::ARCH.to_string(),
258            features: vec!["sse".to_string(), "sse2".to_string(), "avx".to_string()],
259            cache_hierarchy: CacheHierarchy {
260                l1i_cache: CacheInfo {
261                    size: 32 * 1024,
262                    line_size: 64,
263                    associativity: 8,
264                    shared: false,
265                },
266                l1d_cache: CacheInfo {
267                    size: 32 * 1024,
268                    line_size: 64,
269                    associativity: 8,
270                    shared: false,
271                },
272                l2_cache: CacheInfo {
273                    size: 256 * 1024,
274                    line_size: 64,
275                    associativity: 8,
276                    shared: false,
277                },
278                l3_cache: Some(CacheInfo {
279                    size: 8 * 1024 * 1024,
280                    line_size: 64,
281                    associativity: 16,
282                    shared: true,
283                }),
284                l4_cache: None,
285            },
286        }
287    }
288
289    /// Detect NUMA topology
290    fn detect_numa_topology() -> Vec<NumaNode> {
291        // In a real implementation, this would query NUMA information
292        // For now, return a single NUMA node
293        vec![NumaNode {
294            node_id: 0,
295            cpu_cores: (0..num_cpus::get()).collect(),
296            memory_size: 16 * 1024 * 1024 * 1024, // 16GB default
297            memory_bandwidth: 25600.0,            // MB/s
298            distances: [(0, 10)].iter().copied().collect(),
299            utilization: 0.0,
300        }]
301    }
302
303    /// Initialize CPU cores
304    fn initialize_cores(topology: &CpuTopology, numa_nodes: &[NumaNode]) -> Vec<CpuCore> {
305        (0..topology.logical_cpus)
306            .map(|core_id| {
307                let numa_node = numa_nodes
308                    .iter()
309                    .find(|node| node.cpu_cores.contains(&(core_id as usize)))
310                    .map_or(0, |node| node.node_id);
311
312                /// CpuCore
313                CpuCore {
314                    core_id: core_id as usize,
315                    physical_id: core_id as usize / topology.threads_per_core as usize,
316                    numa_node,
317                    state: CoreState::Idle,
318                    current_frequency: 2400.0, // MHz
319                    available_frequencies: vec![1200.0, 1800.0, 2400.0, 3200.0],
320                    governor: "ondemand".to_string(),
321                    utilization: 0.0,
322                    temperature: None,
323                }
324            })
325            .collect()
326    }
327
328    /// Allocate CPU resources
329    pub fn allocate_cpu(&mut self, request: CpuAllocationRequest) -> SklResult<CpuAllocation> {
330        // Find available cores
331        let available_cores = self.find_available_cores(&request)?;
332
333        if available_cores.len() < request.cores_requested as usize {
334            return Err(SklearsError::ResourceAllocationError(format!(
335                "Not enough CPU cores available: requested {}, available {}",
336                request.cores_requested,
337                available_cores.len()
338            )));
339        }
340
341        // Select optimal cores based on strategy
342        let selected_cores = self.select_optimal_cores(&available_cores, &request)?;
343
344        // Create allocation
345        let allocation = CpuAllocation {
346            cores: selected_cores.clone(),
347            threads: selected_cores.clone(), // Simplified: assume 1:1 mapping
348            affinity_mask: self.create_affinity_mask(&selected_cores),
349            numa_node: self.get_numa_node_for_cores(&selected_cores),
350            frequency: request.min_frequency,
351            governor: Some(self.config.default_governor.clone()),
352        };
353
354        // Update core states
355        for &core_id in &selected_cores {
356            if let Some(core) = self.cpu_cores.iter_mut().find(|c| c.core_id == core_id) {
357                core.state = CoreState::Running;
358            }
359        }
360
361        // Store allocation
362        self.allocations.insert(request.task_id, allocation.clone());
363        self.stats.total_allocations += 1;
364        self.stats.active_allocations += 1;
365
366        Ok(allocation)
367    }
368
369    /// Release CPU allocation
370    pub fn release_cpu(&mut self, task_id: &str) -> SklResult<()> {
371        if let Some(allocation) = self.allocations.remove(task_id) {
372            // Update core states back to idle
373            for &core_id in &allocation.cores {
374                if let Some(core) = self.cpu_cores.iter_mut().find(|c| c.core_id == core_id) {
375                    core.state = CoreState::Idle;
376                }
377            }
378            self.stats.active_allocations = self.stats.active_allocations.saturating_sub(1);
379            Ok(())
380        } else {
381            Err(SklearsError::ResourceAllocationError(format!(
382                "No CPU allocation found for task {task_id}"
383            )))
384        }
385    }
386
387    /// Find available CPU cores
388    fn find_available_cores(&self, request: &CpuAllocationRequest) -> SklResult<Vec<usize>> {
389        let mut available = Vec::new();
390
391        for core in &self.cpu_cores {
392            if core.state == CoreState::Idle {
393                // Check NUMA preference
394                if let Some(preferred_numa) = request.numa_preference {
395                    if core.numa_node != preferred_numa {
396                        continue;
397                    }
398                }
399
400                // Check frequency requirement
401                if let Some(min_freq) = request.min_frequency {
402                    if core.available_frequencies.iter().all(|&f| f < min_freq) {
403                        continue;
404                    }
405                }
406
407                available.push(core.core_id);
408            }
409        }
410
411        Ok(available)
412    }
413
414    /// Select optimal cores based on affinity strategy
415    fn select_optimal_cores(
416        &self,
417        available_cores: &[usize],
418        request: &CpuAllocationRequest,
419    ) -> SklResult<Vec<usize>> {
420        let mut selected = Vec::new();
421        let cores_needed = request.cores_requested as usize;
422
423        if available_cores.len() < cores_needed {
424            return Err(SklearsError::ResourceAllocationError(
425                "Not enough cores available".to_string(),
426            ));
427        }
428
429        match &self.config.affinity_strategy {
430            AffinityStrategy::NumaLocal => {
431                // Group cores by NUMA node and prefer local allocation
432                let mut numa_groups: HashMap<usize, Vec<usize>> = HashMap::new();
433                for &core_id in available_cores {
434                    if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
435                        numa_groups.entry(core.numa_node).or_default().push(core_id);
436                    }
437                }
438
439                // Try to satisfy the request from a single NUMA node first
440                for (_, mut cores) in numa_groups {
441                    if cores.len() >= cores_needed {
442                        cores.sort_unstable();
443                        selected.extend(cores.into_iter().take(cores_needed));
444                        break;
445                    }
446                }
447
448                // If no single NUMA node can satisfy, distribute across nodes
449                if selected.is_empty() {
450                    let mut remaining = cores_needed;
451                    for &core_id in available_cores {
452                        if remaining == 0 {
453                            break;
454                        }
455                        selected.push(core_id);
456                        remaining -= 1;
457                    }
458                }
459            }
460            AffinityStrategy::PhysicalCores => {
461                // Prefer physical cores over hyperthreads
462                let mut physical_cores = Vec::new();
463                let mut hyperthreads = Vec::new();
464
465                for &core_id in available_cores {
466                    if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
467                        if core.core_id == core.physical_id {
468                            physical_cores.push(core_id);
469                        } else {
470                            hyperthreads.push(core_id);
471                        }
472                    }
473                }
474
475                // First use physical cores
476                let from_physical = physical_cores.into_iter().take(cores_needed);
477                selected.extend(from_physical);
478
479                // Then use hyperthreads if needed
480                let remaining = cores_needed - selected.len();
481                if remaining > 0 {
482                    selected.extend(hyperthreads.into_iter().take(remaining));
483                }
484            }
485            AffinityStrategy::Spread => {
486                // Distribute across all available NUMA nodes
487                let mut numa_assignment: HashMap<usize, Vec<usize>> = HashMap::new();
488                for &core_id in available_cores {
489                    if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
490                        numa_assignment
491                            .entry(core.numa_node)
492                            .or_default()
493                            .push(core_id);
494                    }
495                }
496
497                let numa_count = numa_assignment.len();
498                let cores_per_numa = cores_needed / numa_count;
499                let mut extra_cores = cores_needed % numa_count;
500
501                for (_, mut cores) in numa_assignment {
502                    let take_count = if extra_cores > 0 {
503                        extra_cores -= 1;
504                        cores_per_numa + 1
505                    } else {
506                        cores_per_numa
507                    };
508
509                    cores.sort_unstable();
510                    let cores_len = cores.len();
511                    selected.extend(cores.into_iter().take(take_count.min(cores_len)));
512                    if selected.len() >= cores_needed {
513                        break;
514                    }
515                }
516            }
517            AffinityStrategy::Pack => {
518                // Pack cores together on same NUMA node
519                let mut cores_sorted = available_cores.to_vec();
520                cores_sorted.sort_unstable();
521                selected.extend(cores_sorted.into_iter().take(cores_needed));
522            }
523            AffinityStrategy::Custom(pattern) => {
524                // Use custom affinity pattern
525                for &core_id in pattern {
526                    if available_cores.contains(&core_id) && selected.len() < cores_needed {
527                        selected.push(core_id);
528                    }
529                }
530                // Fill remaining with any available cores
531                for &core_id in available_cores {
532                    if !selected.contains(&core_id) && selected.len() < cores_needed {
533                        selected.push(core_id);
534                    }
535                }
536            }
537            AffinityStrategy::None => {
538                // Just take first available cores
539                selected.extend(available_cores.iter().take(cores_needed));
540            }
541        }
542
543        if selected.len() < cores_needed {
544            return Err(SklearsError::ResourceAllocationError(format!(
545                "Could not satisfy core allocation: needed {}, got {}",
546                cores_needed,
547                selected.len()
548            )));
549        }
550
551        Ok(selected)
552    }
553
554    /// Create CPU affinity mask from core list
555    fn create_affinity_mask(&self, cores: &[usize]) -> u64 {
556        let mut mask = 0u64;
557        for &core_id in cores {
558            if core_id < 64 {
559                mask |= 1u64 << core_id;
560            }
561        }
562        mask
563    }
564
565    /// Get NUMA node for a set of cores
566    fn get_numa_node_for_cores(&self, cores: &[usize]) -> Option<usize> {
567        // Return the most common NUMA node among the cores
568        let mut numa_counts: HashMap<usize, usize> = HashMap::new();
569
570        for &core_id in cores {
571            if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
572                *numa_counts.entry(core.numa_node).or_insert(0) += 1;
573            }
574        }
575
576        numa_counts
577            .into_iter()
578            .max_by_key(|(_, count)| *count)
579            .map(|(numa_node, _)| numa_node)
580    }
581
    /// Get current CPU statistics
    ///
    /// Values reflect the allocation history plus the most recent call to
    /// `update_utilization`.
    #[must_use]
    pub fn get_stats(&self) -> &CpuStats {
        &self.stats
    }
587
    /// Get CPU topology information
    ///
    /// Returns the topology snapshot detected when the manager was built.
    #[must_use]
    pub fn get_topology(&self) -> &CpuTopology {
        &self.topology
    }
593
594    /// Update CPU utilization statistics
595    pub fn update_utilization(&mut self) -> SklResult<()> {
596        // In a real implementation, this would query system CPU usage
597        // For now, simulate some utilization
598        let total_utilization: f64 = self.cpu_cores.iter().map(|core| core.utilization).sum();
599
600        self.stats.avg_utilization = total_utilization / self.cpu_cores.len() as f64;
601        self.stats.peak_utilization = self
602            .cpu_cores
603            .iter()
604            .map(|core| core.utilization)
605            .fold(0.0, f64::max);
606
607        Ok(())
608    }
609}
610
611impl Default for CpuManagerConfig {
612    fn default() -> Self {
613        Self {
614            numa_aware: true,
615            affinity_strategy: AffinityStrategy::NumaLocal,
616            enable_dvfs: true,
617            default_governor: "ondemand".to_string(),
618            enable_thermal_management: true,
619            thermal_threshold: 85.0,     // Celsius
620            utilization_threshold: 80.0, // Percent
621        }
622    }
623}
624
625impl Default for CpuStats {
626    fn default() -> Self {
627        Self {
628            total_allocations: 0,
629            active_allocations: 0,
630            avg_utilization: 0.0,
631            peak_utilization: 0.0,
632            context_switches_per_sec: 0.0,
633            cache_hit_rate: 0.0,
634            numa_misses: 0,
635            thermal_events: 0,
636        }
637    }
638}