1use super::resource_types::CpuAllocation;
7use sklears_core::error::{Result as SklResult, SklearsError};
8use std::collections::HashMap;
9use std::time::Duration;
10
/// Central manager for CPU resources: tracks detected topology, per-core
/// state, NUMA layout, active task allocations, and aggregate statistics.
#[derive(Debug)]
pub struct CpuResourceManager {
    /// Detected (best-effort) CPU topology of the host.
    topology: CpuTopology,
    /// Active allocations keyed by task id.
    allocations: HashMap<String, CpuAllocation>,
    /// NUMA nodes and the logical CPUs they own.
    numa_nodes: Vec<NumaNode>,
    /// Per-logical-CPU bookkeeping (state, frequency, utilization, ...).
    cpu_cores: Vec<CpuCore>,
    /// Allocation policy knobs (affinity strategy, DVFS, thermal thresholds).
    config: CpuManagerConfig,
    /// Running counters and utilization aggregates.
    stats: CpuStats,
}
27
/// Best-effort description of the host CPU layout.
#[derive(Debug, Clone)]
pub struct CpuTopology {
    /// Number of physical CPU packages/cores (from `num_cpus::get_physical`).
    pub physical_cpus: u32,
    /// Cores per physical CPU (fixed at 1 in the default detection).
    pub cores_per_cpu: u32,
    /// Hardware threads (SMT siblings) per core.
    pub threads_per_core: u32,
    /// Total logical CPUs visible to the OS.
    pub logical_cpus: u32,
    /// Target architecture string (e.g. "x86_64").
    pub architecture: String,
    /// Advertised SIMD/ISA feature names (defaults, not probed — TODO confirm).
    pub features: Vec<String>,
    /// Cache levels and their parameters.
    pub cache_hierarchy: CacheHierarchy,
}
46
/// Cache levels from L1 (split instruction/data) up to optional L3/L4.
#[derive(Debug, Clone)]
pub struct CacheHierarchy {
    /// Level-1 instruction cache.
    pub l1i_cache: CacheInfo,
    /// Level-1 data cache.
    pub l1d_cache: CacheInfo,
    /// Level-2 cache.
    pub l2_cache: CacheInfo,
    /// Level-3 cache, if present.
    pub l3_cache: Option<CacheInfo>,
    /// Level-4 cache, if present (rare).
    pub l4_cache: Option<CacheInfo>,
}
61
/// Parameters of a single cache level.
#[derive(Debug, Clone)]
pub struct CacheInfo {
    /// Total cache size in bytes.
    pub size: u64,
    /// Cache line size in bytes.
    pub line_size: u32,
    /// Set associativity (ways).
    pub associativity: u32,
    /// Whether the cache is shared between cores.
    pub shared: bool,
}
74
/// A NUMA node: its CPUs, local memory, and distances to other nodes.
#[derive(Debug, Clone)]
pub struct NumaNode {
    /// Node identifier.
    pub node_id: usize,
    /// Logical CPU ids that belong to this node.
    pub cpu_cores: Vec<usize>,
    /// Local memory size in bytes.
    pub memory_size: u64,
    /// Memory bandwidth (presumably MB/s — TODO confirm unit).
    pub memory_bandwidth: f64,
    /// NUMA distance to other nodes, keyed by node id (self-distance included).
    pub distances: HashMap<usize, u32>,
    /// Current utilization of this node.
    pub utilization: f64,
}
91
/// Bookkeeping record for one logical CPU.
#[derive(Debug, Clone)]
pub struct CpuCore {
    /// Logical core id.
    pub core_id: usize,
    /// Physical core this logical CPU belongs to (SMT siblings share one).
    pub physical_id: usize,
    /// Owning NUMA node id.
    pub numa_node: usize,
    /// Current scheduling state.
    pub state: CoreState,
    /// Current operating frequency (MHz by convention here — TODO confirm).
    pub current_frequency: f64,
    /// Frequencies this core can be set to (DVFS steps).
    pub available_frequencies: Vec<f64>,
    /// Frequency governor name (e.g. "ondemand").
    pub governor: String,
    /// Current utilization of this core.
    pub utilization: f64,
    /// Core temperature if a sensor reading is available.
    pub temperature: Option<f64>,
}
114
/// Scheduling state of a logical CPU core.
#[derive(Debug, Clone, PartialEq)]
pub enum CoreState {
    /// Free and available for allocation.
    Idle,
    /// Currently assigned to a task.
    Running,
    /// Taken offline (not schedulable).
    Offline,
    /// Frequency-throttled (e.g. thermal).
    Throttled,
    /// In an error state.
    Error,
}
129
/// Policy configuration for the CPU resource manager.
#[derive(Debug, Clone)]
pub struct CpuManagerConfig {
    /// Whether to take NUMA locality into account.
    pub numa_aware: bool,
    /// How cores are chosen for an allocation.
    pub affinity_strategy: AffinityStrategy,
    /// Enable dynamic voltage/frequency scaling management.
    pub enable_dvfs: bool,
    /// Governor applied to new allocations (e.g. "ondemand").
    pub default_governor: String,
    /// Enable thermal monitoring/response.
    pub enable_thermal_management: bool,
    /// Temperature threshold for thermal action (°C by convention — TODO confirm).
    pub thermal_threshold: f64,
    /// Utilization threshold (percent) used by management policies.
    pub utilization_threshold: f64,
}
148
/// Strategy for picking which cores satisfy an allocation.
#[derive(Debug, Clone)]
pub enum AffinityStrategy {
    /// No preference: take cores in the order they are found.
    None,
    /// Prefer cores from a single NUMA node.
    NumaLocal,
    /// Prefer physical cores before SMT siblings.
    PhysicalCores,
    /// Spread cores evenly across NUMA nodes.
    Spread,
    /// Pack onto the lowest-numbered cores.
    Pack,
    /// Explicit preferred core-id pattern.
    Custom(Vec<usize>),
}
165
/// Aggregate CPU-management statistics.
#[derive(Debug, Clone)]
pub struct CpuStats {
    /// Total allocations ever made.
    pub total_allocations: u64,
    /// Allocations currently outstanding.
    pub active_allocations: u64,
    /// Mean utilization across all cores.
    pub avg_utilization: f64,
    /// Highest single-core utilization observed in the last update.
    pub peak_utilization: f64,
    /// Context switch rate (never updated by the code visible here).
    pub context_switches_per_sec: f64,
    /// Cache hit rate (never updated by the code visible here).
    pub cache_hit_rate: f64,
    /// Count of NUMA-remote accesses (never updated by the code visible here).
    pub numa_misses: u64,
    /// Count of thermal events (never updated by the code visible here).
    pub thermal_events: u64,
}
186
/// A request for CPU cores on behalf of a task.
#[derive(Debug, Clone)]
pub struct CpuAllocationRequest {
    /// Unique task identifier; used as the allocation key.
    pub task_id: String,
    /// Number of cores requested.
    pub cores_requested: u32,
    /// Minimum acceptable core frequency, if any.
    pub min_frequency: Option<f64>,
    /// Preferred NUMA node, if any.
    pub numa_preference: Option<usize>,
    /// Restrict the allocation to these core ids, if set.
    pub affinity_requirements: Option<Vec<usize>>,
    /// Request exclusive access to the cores.
    pub exclusive_access: bool,
    /// Performance constraints for the allocation.
    pub performance_requirements: PerformanceRequirements,
}
205
/// Performance constraints attached to an allocation request.
#[derive(Debug, Clone)]
pub struct PerformanceRequirements {
    /// Minimum required performance level (units unspecified — TODO confirm).
    pub min_performance: f64,
    /// Maximum tolerable latency.
    pub max_latency: Duration,
    /// Whether cache coherency between the allocated cores is required.
    pub cache_coherency: bool,
    /// Required SIMD feature names (e.g. "avx").
    pub simd_requirements: Vec<String>,
}
218
impl Default for CpuResourceManager {
    /// Equivalent to [`CpuResourceManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
224
225impl CpuResourceManager {
    /// Construct a manager by detecting the host topology and NUMA layout,
    /// initializing per-core records, and starting with default config and
    /// zeroed statistics.
    #[must_use]
    pub fn new() -> Self {
        let topology = Self::detect_topology();
        let numa_nodes = Self::detect_numa_topology();
        let cpu_cores = Self::initialize_cores(&topology, &numa_nodes);

        Self {
            topology,
            allocations: HashMap::new(),
            numa_nodes,
            cpu_cores,
            config: CpuManagerConfig::default(),
            stats: CpuStats::default(),
        }
    }
242
243 fn detect_topology() -> CpuTopology {
245 CpuTopology {
249 physical_cpus: num_cpus::get_physical() as u32,
250 cores_per_cpu: 1,
251 threads_per_core: if num_cpus::get() > num_cpus::get_physical() {
252 2
253 } else {
254 1
255 },
256 logical_cpus: num_cpus::get() as u32,
257 architecture: std::env::consts::ARCH.to_string(),
258 features: vec!["sse".to_string(), "sse2".to_string(), "avx".to_string()],
259 cache_hierarchy: CacheHierarchy {
260 l1i_cache: CacheInfo {
261 size: 32 * 1024,
262 line_size: 64,
263 associativity: 8,
264 shared: false,
265 },
266 l1d_cache: CacheInfo {
267 size: 32 * 1024,
268 line_size: 64,
269 associativity: 8,
270 shared: false,
271 },
272 l2_cache: CacheInfo {
273 size: 256 * 1024,
274 line_size: 64,
275 associativity: 8,
276 shared: false,
277 },
278 l3_cache: Some(CacheInfo {
279 size: 8 * 1024 * 1024,
280 line_size: 64,
281 associativity: 16,
282 shared: true,
283 }),
284 l4_cache: None,
285 },
286 }
287 }
288
289 fn detect_numa_topology() -> Vec<NumaNode> {
291 vec![NumaNode {
294 node_id: 0,
295 cpu_cores: (0..num_cpus::get()).collect(),
296 memory_size: 16 * 1024 * 1024 * 1024, memory_bandwidth: 25600.0, distances: [(0, 10)].iter().copied().collect(),
299 utilization: 0.0,
300 }]
301 }
302
303 fn initialize_cores(topology: &CpuTopology, numa_nodes: &[NumaNode]) -> Vec<CpuCore> {
305 (0..topology.logical_cpus)
306 .map(|core_id| {
307 let numa_node = numa_nodes
308 .iter()
309 .find(|node| node.cpu_cores.contains(&(core_id as usize)))
310 .map_or(0, |node| node.node_id);
311
312 CpuCore {
314 core_id: core_id as usize,
315 physical_id: core_id as usize / topology.threads_per_core as usize,
316 numa_node,
317 state: CoreState::Idle,
318 current_frequency: 2400.0, available_frequencies: vec![1200.0, 1800.0, 2400.0, 3200.0],
320 governor: "ondemand".to_string(),
321 utilization: 0.0,
322 temperature: None,
323 }
324 })
325 .collect()
326 }
327
328 pub fn allocate_cpu(&mut self, request: CpuAllocationRequest) -> SklResult<CpuAllocation> {
330 let available_cores = self.find_available_cores(&request)?;
332
333 if available_cores.len() < request.cores_requested as usize {
334 return Err(SklearsError::ResourceAllocationError(format!(
335 "Not enough CPU cores available: requested {}, available {}",
336 request.cores_requested,
337 available_cores.len()
338 )));
339 }
340
341 let selected_cores = self.select_optimal_cores(&available_cores, &request)?;
343
344 let allocation = CpuAllocation {
346 cores: selected_cores.clone(),
347 threads: selected_cores.clone(), affinity_mask: self.create_affinity_mask(&selected_cores),
349 numa_node: self.get_numa_node_for_cores(&selected_cores),
350 frequency: request.min_frequency,
351 governor: Some(self.config.default_governor.clone()),
352 };
353
354 for &core_id in &selected_cores {
356 if let Some(core) = self.cpu_cores.iter_mut().find(|c| c.core_id == core_id) {
357 core.state = CoreState::Running;
358 }
359 }
360
361 self.allocations.insert(request.task_id, allocation.clone());
363 self.stats.total_allocations += 1;
364 self.stats.active_allocations += 1;
365
366 Ok(allocation)
367 }
368
369 pub fn release_cpu(&mut self, task_id: &str) -> SklResult<()> {
371 if let Some(allocation) = self.allocations.remove(task_id) {
372 for &core_id in &allocation.cores {
374 if let Some(core) = self.cpu_cores.iter_mut().find(|c| c.core_id == core_id) {
375 core.state = CoreState::Idle;
376 }
377 }
378 self.stats.active_allocations = self.stats.active_allocations.saturating_sub(1);
379 Ok(())
380 } else {
381 Err(SklearsError::ResourceAllocationError(format!(
382 "No CPU allocation found for task {task_id}"
383 )))
384 }
385 }
386
387 fn find_available_cores(&self, request: &CpuAllocationRequest) -> SklResult<Vec<usize>> {
389 let mut available = Vec::new();
390
391 for core in &self.cpu_cores {
392 if core.state == CoreState::Idle {
393 if let Some(preferred_numa) = request.numa_preference {
395 if core.numa_node != preferred_numa {
396 continue;
397 }
398 }
399
400 if let Some(min_freq) = request.min_frequency {
402 if core.available_frequencies.iter().all(|&f| f < min_freq) {
403 continue;
404 }
405 }
406
407 available.push(core.core_id);
408 }
409 }
410
411 Ok(available)
412 }
413
414 fn select_optimal_cores(
416 &self,
417 available_cores: &[usize],
418 request: &CpuAllocationRequest,
419 ) -> SklResult<Vec<usize>> {
420 let mut selected = Vec::new();
421 let cores_needed = request.cores_requested as usize;
422
423 if available_cores.len() < cores_needed {
424 return Err(SklearsError::ResourceAllocationError(
425 "Not enough cores available".to_string(),
426 ));
427 }
428
429 match &self.config.affinity_strategy {
430 AffinityStrategy::NumaLocal => {
431 let mut numa_groups: HashMap<usize, Vec<usize>> = HashMap::new();
433 for &core_id in available_cores {
434 if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
435 numa_groups.entry(core.numa_node).or_default().push(core_id);
436 }
437 }
438
439 for (_, mut cores) in numa_groups {
441 if cores.len() >= cores_needed {
442 cores.sort_unstable();
443 selected.extend(cores.into_iter().take(cores_needed));
444 break;
445 }
446 }
447
448 if selected.is_empty() {
450 let mut remaining = cores_needed;
451 for &core_id in available_cores {
452 if remaining == 0 {
453 break;
454 }
455 selected.push(core_id);
456 remaining -= 1;
457 }
458 }
459 }
460 AffinityStrategy::PhysicalCores => {
461 let mut physical_cores = Vec::new();
463 let mut hyperthreads = Vec::new();
464
465 for &core_id in available_cores {
466 if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
467 if core.core_id == core.physical_id {
468 physical_cores.push(core_id);
469 } else {
470 hyperthreads.push(core_id);
471 }
472 }
473 }
474
475 let from_physical = physical_cores.into_iter().take(cores_needed);
477 selected.extend(from_physical);
478
479 let remaining = cores_needed - selected.len();
481 if remaining > 0 {
482 selected.extend(hyperthreads.into_iter().take(remaining));
483 }
484 }
485 AffinityStrategy::Spread => {
486 let mut numa_assignment: HashMap<usize, Vec<usize>> = HashMap::new();
488 for &core_id in available_cores {
489 if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
490 numa_assignment
491 .entry(core.numa_node)
492 .or_default()
493 .push(core_id);
494 }
495 }
496
497 let numa_count = numa_assignment.len();
498 let cores_per_numa = cores_needed / numa_count;
499 let mut extra_cores = cores_needed % numa_count;
500
501 for (_, mut cores) in numa_assignment {
502 let take_count = if extra_cores > 0 {
503 extra_cores -= 1;
504 cores_per_numa + 1
505 } else {
506 cores_per_numa
507 };
508
509 cores.sort_unstable();
510 let cores_len = cores.len();
511 selected.extend(cores.into_iter().take(take_count.min(cores_len)));
512 if selected.len() >= cores_needed {
513 break;
514 }
515 }
516 }
517 AffinityStrategy::Pack => {
518 let mut cores_sorted = available_cores.to_vec();
520 cores_sorted.sort_unstable();
521 selected.extend(cores_sorted.into_iter().take(cores_needed));
522 }
523 AffinityStrategy::Custom(pattern) => {
524 for &core_id in pattern {
526 if available_cores.contains(&core_id) && selected.len() < cores_needed {
527 selected.push(core_id);
528 }
529 }
530 for &core_id in available_cores {
532 if !selected.contains(&core_id) && selected.len() < cores_needed {
533 selected.push(core_id);
534 }
535 }
536 }
537 AffinityStrategy::None => {
538 selected.extend(available_cores.iter().take(cores_needed));
540 }
541 }
542
543 if selected.len() < cores_needed {
544 return Err(SklearsError::ResourceAllocationError(format!(
545 "Could not satisfy core allocation: needed {}, got {}",
546 cores_needed,
547 selected.len()
548 )));
549 }
550
551 Ok(selected)
552 }
553
554 fn create_affinity_mask(&self, cores: &[usize]) -> u64 {
556 let mut mask = 0u64;
557 for &core_id in cores {
558 if core_id < 64 {
559 mask |= 1u64 << core_id;
560 }
561 }
562 mask
563 }
564
565 fn get_numa_node_for_cores(&self, cores: &[usize]) -> Option<usize> {
567 let mut numa_counts: HashMap<usize, usize> = HashMap::new();
569
570 for &core_id in cores {
571 if let Some(core) = self.cpu_cores.iter().find(|c| c.core_id == core_id) {
572 *numa_counts.entry(core.numa_node).or_insert(0) += 1;
573 }
574 }
575
576 numa_counts
577 .into_iter()
578 .max_by_key(|(_, count)| *count)
579 .map(|(numa_node, _)| numa_node)
580 }
581
    /// Current aggregate CPU statistics.
    #[must_use]
    pub fn get_stats(&self) -> &CpuStats {
        &self.stats
    }
587
    /// Detected CPU topology of the host.
    #[must_use]
    pub fn get_topology(&self) -> &CpuTopology {
        &self.topology
    }
593
594 pub fn update_utilization(&mut self) -> SklResult<()> {
596 let total_utilization: f64 = self.cpu_cores.iter().map(|core| core.utilization).sum();
599
600 self.stats.avg_utilization = total_utilization / self.cpu_cores.len() as f64;
601 self.stats.peak_utilization = self
602 .cpu_cores
603 .iter()
604 .map(|core| core.utilization)
605 .fold(0.0, f64::max);
606
607 Ok(())
608 }
609}
610
611impl Default for CpuManagerConfig {
612 fn default() -> Self {
613 Self {
614 numa_aware: true,
615 affinity_strategy: AffinityStrategy::NumaLocal,
616 enable_dvfs: true,
617 default_governor: "ondemand".to_string(),
618 enable_thermal_management: true,
619 thermal_threshold: 85.0, utilization_threshold: 80.0, }
622 }
623}
624
impl Default for CpuStats {
    /// All counters and rates start at zero.
    fn default() -> Self {
        Self {
            total_allocations: 0,
            active_allocations: 0,
            avg_utilization: 0.0,
            peak_utilization: 0.0,
            context_switches_per_sec: 0.0,
            cache_hit_rate: 0.0,
            numa_misses: 0,
            thermal_events: 0,
        }
    }
}
638}