scirs2_core/distributed/cluster/

//! Resource allocation and management for cluster nodes
//!
//! This module provides comprehensive resource allocation capabilities
//! including various allocation strategies and optimization algorithms.
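//!
//! A minimal usage sketch (illustrative only and not compiled as a doc test,
//! since it assumes `NodeInfo` values, a `ResourceRequirements`, and a `TaskId`
//! are constructed elsewhere):
//!
//! ```ignore
//! let mut allocator = ResourceAllocator::new();
//! allocator.update_available_resources(&nodes)?; // `nodes: Vec<NodeInfo>` from cluster discovery
//! let allocation = allocator.allocate_resources(&requirements)?;
//! allocator.add_allocation(task_id, allocation);
//! ```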

use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::HashMap;

#[cfg(feature = "logging")]
use log;

use super::types::{
    AllocationId, AllocationStrategy, ComputeCapacity, NodeInfo, NodeStatus, ResourceAllocation,
    ResourceRequirements, TaskId,
};

/// Resource allocation and management
#[derive(Debug)]
pub struct ResourceAllocator {
    allocations: HashMap<TaskId, ResourceAllocation>,
    available_resources: ComputeCapacity,
    allocation_strategy: AllocationStrategy,
}

impl Default for ResourceAllocator {
    fn default() -> Self {
        Self::new()
    }
}

#[allow(dead_code)]
impl ResourceAllocator {
    /// Create a new resource allocator
    pub fn new() -> Self {
        Self {
            allocations: HashMap::new(),
            available_resources: ComputeCapacity::default(),
            allocation_strategy: AllocationStrategy::FirstFit,
        }
    }

    /// Update available resources based on current node status
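    ///
    /// Capacity from all healthy nodes is summed, then everything held by
    /// already-tracked allocations is subtracted (saturating at zero). Worked
    /// example with hypothetical numbers: two healthy nodes with 8 CPU cores
    /// each and one tracked allocation holding 4 cores leave 8 + 8 - 4 = 12
    /// cores available.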
    pub fn update_available_resources(&mut self, nodes: &[NodeInfo]) -> CoreResult<()> {
        self.available_resources = ComputeCapacity::default();

        for node in nodes {
            if node.status == NodeStatus::Healthy {
                self.available_resources.cpu_cores += node.capabilities.cpu_cores;
                self.available_resources.memory_gb += node.capabilities.memory_gb;
                self.available_resources.gpu_count += node.capabilities.gpu_count;
                self.available_resources.disk_space_gb += node.capabilities.disk_space_gb;
            }
        }

        // Subtract already allocated resources
        for allocation in self.allocations.values() {
            self.available_resources.cpu_cores = self
                .available_resources
                .cpu_cores
                .saturating_sub(allocation.allocated_resources.cpu_cores);
            self.available_resources.memory_gb = self
                .available_resources
                .memory_gb
                .saturating_sub(allocation.allocated_resources.memory_gb);
            self.available_resources.gpu_count = self
                .available_resources
                .gpu_count
                .saturating_sub(allocation.allocated_resources.gpu_count);
            self.available_resources.disk_space_gb = self
                .available_resources
                .disk_space_gb
                .saturating_sub(allocation.allocated_resources.disk_space_gb);
        }

        Ok(())
    }

    /// Allocate resources based on requirements
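    ///
    /// Returns an error when the currently available capacity cannot cover the
    /// request. Note that this method takes `&self` and does not record the
    /// returned allocation; call `add_allocation` to start tracking it.
    ///
    /// A hedged sketch (not compiled as a doc test, since constructing
    /// `ResourceRequirements` and a `TaskId` is left to the caller):
    ///
    /// ```ignore
    /// let allocation = allocator.allocate_resources(&requirements)?;
    /// allocator.add_allocation(task_id, allocation);
    /// ```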
    pub fn allocate_resources(
        &self,
        requirements: &ResourceRequirements,
    ) -> CoreResult<ResourceAllocation> {
        // Check if resources are available
        if !self.can_satisfy_requirements(requirements) {
            return Err(CoreError::ResourceError(ErrorContext::new(
                "Insufficient resources available",
            )));
        }

        // Create allocation
        Ok(ResourceAllocation {
            allocation_id: AllocationId::generate(),
            allocated_resources: ComputeCapacity {
                cpu_cores: requirements.cpu_cores,
                memory_gb: requirements.memory_gb,
                gpu_count: requirements.gpu_count,
                disk_space_gb: requirements.disk_space_gb,
            },
            assigned_nodes: Vec::new(), // Would be populated with actual nodes
            created_at: std::time::Instant::now(),
            expires_at: None,
        })
    }

    /// Check if requirements can be satisfied with available resources
    fn can_satisfy_requirements(&self, requirements: &ResourceRequirements) -> bool {
        self.available_resources.cpu_cores >= requirements.cpu_cores
            && self.available_resources.memory_gb >= requirements.memory_gb
            && self.available_resources.gpu_count >= requirements.gpu_count
            && self.available_resources.disk_space_gb >= requirements.disk_space_gb
    }

    /// Optimize resource allocation using the configured strategy
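    ///
    /// A hedged sketch of the intended call pattern (assumes an allocator that
    /// already tracks allocations; not compiled as a doc test):
    ///
    /// ```ignore
    /// allocator.set_allocation_strategy(AllocationStrategy::BestFit);
    /// allocator.optimize_resource_allocation()?;
    /// ```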
    pub fn optimize_resource_allocation(&mut self) -> CoreResult<()> {
        // Implement resource optimization strategies
        match self.allocation_strategy {
            AllocationStrategy::FirstFit => {
                // First-fit allocation (already implemented)
            }
            AllocationStrategy::BestFit => {
                // Best-fit allocation
                self.optimize_best_fit()?;
            }
            AllocationStrategy::LoadBalanced => {
                // Load-balanced allocation
                self.optimize_load_balanced()?;
            }
        }
        Ok(())
    }

    /// Optimize using best-fit strategy
    fn optimize_best_fit(&mut self) -> CoreResult<()> {
        // Best-fit optimization: minimize resource fragmentation by allocating
        // to nodes that most closely match the resource requirements

        // Get all current allocations sorted by resource usage
        let mut allocations: Vec<(TaskId, ResourceAllocation)> = self
            .allocations
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        // Sort allocations by total resource "weight" (descending)
        // This helps identify heavy allocations that could be better placed
        allocations.sort_by(|a, b| {
            let weight_a = a.1.allocated_resources.cpu_cores
                + a.1.allocated_resources.memory_gb
                + a.1.allocated_resources.gpu_count * 4 // Weight GPUs more heavily
                + a.1.allocated_resources.disk_space_gb / 10; // Weight disk less
            let weight_b = b.1.allocated_resources.cpu_cores
                + b.1.allocated_resources.memory_gb
                + b.1.allocated_resources.gpu_count * 4
                + b.1.allocated_resources.disk_space_gb / 10;
            weight_b.cmp(&weight_a)
        });
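
        // Worked example with hypothetical numbers: an allocation holding
        // 8 CPU cores, 32 GB of memory, 2 GPUs, and 100 GB of disk weighs
        // 8 + 32 + 2 * 4 + 100 / 10 = 58 under this formula.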

        // Optimization strategy: consolidate small allocations onto fewer nodes
        // and ensure large allocations get dedicated resources

        // Track optimization improvements
        let mut optimizations_made = 0;
        let fragmentation_score_before = self.calculate_fragmentation_score();

        // Group allocations by size category
        let (large_allocations, medium_allocations, small_allocations): (Vec<_>, Vec<_>, Vec<_>) = {
            let mut large = Vec::new();
            let mut medium = Vec::new();
            let mut small = Vec::new();

            for (task_id, allocation) in allocations {
                let total_resources = allocation.allocated_resources.cpu_cores
                    + allocation.allocated_resources.memory_gb
                    + allocation.allocated_resources.gpu_count * 4;

                if total_resources >= 32 {
                    large.push((task_id, allocation));
                } else if total_resources >= 8 {
                    medium.push((task_id, allocation));
                } else {
                    small.push((task_id, allocation));
                }
            }

            (large, medium, small)
        };
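
        // Size thresholds illustrated with hypothetical numbers: 4 cores,
        // 16 GB, and 1 GPU score 4 + 16 + 4 = 24, landing in the "medium"
        // bucket (>= 8 but < 32), while 16 cores and 64 GB score 80 ("large").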

        // Best-fit strategy for large allocations:
        // Ensure they get dedicated, high-capacity nodes
        for (task_id, allocation) in large_allocations {
            if allocation.assigned_nodes.len() > 1 {
                // Try to consolidate onto a single high-capacity node
                if self.attempt_consolidation(&task_id, &allocation)? {
                    optimizations_made += 1;
                }
            }
        }

        // Best-fit strategy for medium allocations:
        // Pair them efficiently to minimize waste
        for (task_id, allocation) in medium_allocations {
            if self.attempt_best_fit_pairing(&task_id, &allocation)? {
                optimizations_made += 1;
            }
        }

        // Best-fit strategy for small allocations:
        // Pack them tightly onto shared nodes
        for (task_id, allocation) in small_allocations {
            if self.attempt_small_allocation_packing(&task_id, &allocation)? {
                optimizations_made += 1;
            }
        }

        // Calculate improvement
        let fragmentation_score_after = self.calculate_fragmentation_score();
        let _improvement = fragmentation_score_before - fragmentation_score_after;

        if optimizations_made > 0 {
            #[cfg(feature = "logging")]
            log::info!(
                "Best-fit optimization completed: {optimizations_made} optimizations, fragmentation improved by {_improvement:.2}"
            );
        }

        Ok(())
    }

    /// Optimize using load-balanced strategy
    fn optimize_load_balanced(&mut self) -> CoreResult<()> {
        // Load-balanced optimization: distribute workload evenly across nodes
        // to prevent hot spots and maximize overall cluster throughput

        // Calculate current load distribution across nodes
        let mut node_loads = HashMap::new();
        let mut total_load = 0.0f64;

        // Calculate load for each node based on current allocations
        for allocation in self.allocations.values() {
            for node_id in &allocation.assigned_nodes {
                let load_weight =
                    self.calculate_allocation_load_weight(&allocation.allocated_resources);
                *node_loads.entry(node_id.clone()).or_insert(0.0) += load_weight;
                total_load += load_weight;
            }
        }

        // Identify the target load per node (assuming uniform node capabilities)
        let num_active_nodes = node_loads.len().max(1);
        let target_load_per_node = total_load / num_active_nodes as f64;
        let load_variance_threshold = target_load_per_node * 0.15f64; // 15% variance allowed
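
        // Worked example with hypothetical numbers: a total load of 120 spread
        // over 4 active nodes gives a target of 30 per node, and the 15%
        // threshold tolerates loads between roughly 25.5 and 34.5 before a
        // node is treated as under- or overloaded.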

        // Find overloaded and underloaded nodes
        let mut overloaded_nodes = Vec::new();
        let mut underloaded_nodes = Vec::new();

        for (node_id, &current_load) in &node_loads {
            let load_diff = current_load - target_load_per_node;
            if load_diff > load_variance_threshold {
                overloaded_nodes.push((node_id.clone(), current_load, load_diff));
            } else if load_diff < -load_variance_threshold {
                underloaded_nodes.push((node_id.clone(), current_load, -load_diff));
            }
        }

        // Sort by load difference (most extreme first)
        overloaded_nodes.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());
        underloaded_nodes.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());

        let mut rebalancing_actions = 0;
        let initial_variance = self.calculate_load_variance(&node_loads);

        // Rebalancing algorithm: move allocations from overloaded to underloaded nodes
        for (overloaded_node, _current_load, _overloaded_amount) in overloaded_nodes {
            // Find allocations on this overloaded node that can be moved
            let moveable_allocations = self.find_moveable_allocations(&overloaded_node);

            for (task_id, allocation) in moveable_allocations {
                // Find the best underloaded node for this allocation
                if let Some((target_node, _)) = self.find_best_target_node(
                    &allocation.allocated_resources,
                    &underloaded_nodes
                        .iter()
                        .map(|(node_id, load, _)| (node_id.clone(), *load))
                        .collect::<Vec<_>>(),
                )? {
                    // Attempt to move the allocation
                    if self.attempt_allocation_migration(&task_id, &target_node)? {
                        rebalancing_actions += 1;

                        // Update node loads tracking
                        let allocation_weight =
                            self.calculate_allocation_load_weight(&allocation.allocated_resources);
                        if let Some(old_load) = node_loads.get_mut(&overloaded_node) {
                            *old_load -= allocation_weight;
                        }
                        if let Some(new_load) = node_loads.get_mut(&target_node) {
                            *new_load += allocation_weight;
                        }

                        // Check if we've balanced enough
                        if node_loads.get(&overloaded_node).copied().unwrap_or(0.0)
                            <= target_load_per_node + load_variance_threshold
                        {
                            break; // This node is now balanced
                        }
                    }
                }
            }
        }

        // Secondary optimization: spread single large allocations across multiple nodes
        let single_node_allocations: Vec<(TaskId, ResourceAllocation)> = self
            .allocations
            .iter()
            .filter(|(_, allocation)| allocation.assigned_nodes.len() == 1)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        for (task_id, allocation) in single_node_allocations {
            let load_weight =
                self.calculate_allocation_load_weight(&allocation.allocated_resources);
            if load_weight > target_load_per_node * 0.6 {
                // Large allocation
                if self.attempt_allocation_spreading(&task_id, &allocation)? {
                    rebalancing_actions += 1;
                }
            }
        }

        // Calculate improvement in load balance
        let final_variance = self.calculate_load_variance(&node_loads);
        let _variance_improvement = initial_variance - final_variance;

        if rebalancing_actions > 0 {
            #[cfg(feature = "logging")]
            log::info!(
                "Load-balanced optimization completed: {rebalancing_actions} rebalancing actions, \
                 load variance improved by {_variance_improvement:.2}"
            );
        }

        Ok(())
    }

    /// Get available capacity
    pub fn get_available_capacity(&self) -> ComputeCapacity {
        self.available_resources.clone()
    }

    // Helper methods for optimization algorithms

    /// Calculate fragmentation score (lower is better)
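    ///
    /// The score combines the share of allocations split across nodes (weight
    /// 0.6) with average resource inefficiency (weight 0.4), scaled to 0-100.
    /// Worked example with hypothetical numbers: if 1 of 4 allocations is
    /// split (ratio 0.25) and average efficiency is 0.8, the score is
    /// (0.25 * 0.6 + 0.2 * 0.4) * 100 = 23.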
    fn calculate_fragmentation_score(&self) -> f64 {
        // Calculate how fragmented the resource allocation is
        // Lower score = better (less fragmented)
        let allocation_count = self.allocations.len() as f64;
        if allocation_count == 0.0 {
            return 0.0f64;
        }

        // Count allocations that are split across multiple nodes
        let split_allocations = self
            .allocations
            .values()
            .filter(|alloc| alloc.assigned_nodes.len() > 1)
            .count() as f64;

        // Calculate average resource utilization efficiency
        let mut total_efficiency = 0.0f64;
        for allocation in self.allocations.values() {
            let resource_efficiency =
                self.calculate_resource_efficiency(&allocation.allocated_resources);
            total_efficiency += resource_efficiency;
        }
        let avg_efficiency = total_efficiency / allocation_count;

        // Fragmentation score: high split ratio + low efficiency = high fragmentation
        let split_ratio = split_allocations / allocation_count;
        (split_ratio * 0.6 + (1.0 - avg_efficiency) * 0.4f64) * 100.0
    }

    /// Calculate resource efficiency (1.0 = perfect, 0.0 = inefficient)
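    ///
    /// Efficiency is judged against a nominal 4 GB of memory per CPU-core
    /// equivalent (a GPU counting as 8 cores), with a penalty for very small
    /// allocations. Worked example with hypothetical numbers: 4 cores with
    /// 16 GB and no GPU is perfectly balanced (memory term 1.0), while 4 cores
    /// with 32 GB scores 16/32 = 0.5 on the memory term.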
    fn calculate_resource_efficiency(&self, resources: &ComputeCapacity) -> f64 {
        // Calculate how efficiently resources are being used
        // 1.0 = perfect efficiency, 0.0 = completely inefficient

        // Check resource balance (CPU:Memory:GPU ratio)
        let cpu_ratio = resources.cpu_cores as f64;
        let _memory_ratio = resources.memory_gb as f64 / 4.0f64; // Assume 4GB per CPU core is balanced
        let gpu_ratio = resources.gpu_count as f64 * 8.0f64; // Each GPU equivalent to 8 CPU cores

        let total_compute = cpu_ratio + gpu_ratio;
        let balanced_memory = total_compute * 4.0f64;

        // Efficiency is higher when memory allocation matches compute needs
        let memory_efficiency = if resources.memory_gb as f64 > 0.0 {
            balanced_memory.min(resources.memory_gb as f64)
                / balanced_memory.max(resources.memory_gb as f64)
        } else {
            1.0
        };

        // Also consider if resources are "too small" (overhead penalty)
        let scale_efficiency = if total_compute < 2.0 {
            total_compute / 2.0 // Penalty for very small allocations
        } else {
            1.0
        };

        let combined_efficiency = memory_efficiency * 0.7 + scale_efficiency * 0.3f64;
        combined_efficiency.min(1.0)
    }

    /// Calculate the load weight of an allocation
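    ///
    /// Worked example with hypothetical numbers: 8 cores, 32 GB of memory,
    /// 1 GPU, and 200 GB of disk weigh 8 + 32 * 0.25 + 1 * 8 + 200 * 0.01 = 26.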
    fn calculate_allocation_load_weight(&self, resources: &ComputeCapacity) -> f64 {
        // Calculate the "load weight" of an allocation for load balancing
        // Higher weight = more demanding allocation
        let cpu_weight = resources.cpu_cores as f64;
        let memory_weight = resources.memory_gb as f64 * 0.25f64; // Memory is less constraining than CPU
        let gpu_weight = resources.gpu_count as f64 * 8.0f64; // GPUs are very constraining
        let disk_weight = resources.disk_space_gb as f64 * 0.01f64; // Disk is least constraining

        cpu_weight + memory_weight + gpu_weight + disk_weight
    }

    /// Calculate the spread of load across nodes (returned as a standard deviation)
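    ///
    /// A value of 0.0 means the load is perfectly even. Worked example with
    /// hypothetical numbers: loads of 10.0 and 30.0 on two nodes have mean
    /// 20.0, so the returned spread is sqrt(((10-20)^2 + (30-20)^2) / 2) = 10.0.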
    fn calculate_load_variance(&self, node_loads: &HashMap<String, f64>) -> f64 {
        // Measure how unevenly the load is distributed across nodes
        if node_loads.len() <= 1 {
            return 0.0f64;
        }

        let total_load: f64 = node_loads.values().sum();
        let mean_load = total_load / node_loads.len() as f64;

        let variance = node_loads
            .values()
            .map(|&load| (load - mean_load).powi(2))
            .sum::<f64>()
            / node_loads.len() as f64;

        variance.sqrt() // Return standard deviation
    }

    /// Find allocations that can be moved from a specific node
    fn find_moveable_allocations(&self, node_id: &str) -> Vec<(TaskId, ResourceAllocation)> {
        // Find allocations on a specific node that can potentially be moved
        self.allocations
            .iter()
            .filter(|(_, allocation)| allocation.assigned_nodes.contains(&node_id.to_string()))
            .map(|(task_id, allocation)| (task_id.clone(), allocation.clone()))
            .collect()
    }

    /// Get reference to available capacity
    pub fn available_capacity(&self) -> &ComputeCapacity {
        &self.available_resources
    }

    /// Attempt to consolidate an allocation onto fewer nodes
    pub fn attempt_consolidation(
        &mut self,
        _task_id: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Placeholder implementation
        Ok(false)
    }

    /// Attempt to pair allocations for better fit
    pub fn attempt_best_fit_pairing(
        &mut self,
        _task_id: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Placeholder implementation
        Ok(false)
    }

    /// Attempt to pack small allocations tightly
    pub fn attempt_small_allocation_packing(
        &mut self,
        _task_id: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Placeholder implementation
        Ok(false)
    }

    /// Find the best target node for an allocation
    pub fn find_best_target_node(
        &mut self,
        _resources: &ComputeCapacity,
        _underloaded_nodes: &[(String, f64)],
    ) -> CoreResult<Option<(String, f64)>> {
        // Placeholder implementation
        Ok(None)
    }

    /// Attempt to migrate an allocation to a different node
    pub fn attempt_allocation_migration(
        &mut self,
        _task_id: &TaskId,
        _to_node: &str,
    ) -> CoreResult<bool> {
        // Placeholder implementation
        Ok(false)
    }

    /// Attempt to spread an allocation across multiple nodes
    pub fn attempt_allocation_spreading(
        &mut self,
        _task_id: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        // Placeholder implementation
        Ok(false)
    }

    /// Set allocation strategy
    pub fn set_allocation_strategy(&mut self, strategy: AllocationStrategy) {
        self.allocation_strategy = strategy;
    }

    /// Get current allocation strategy
    pub fn get_allocation_strategy(&self) -> AllocationStrategy {
        self.allocation_strategy
    }

    /// Add an allocation
    pub fn add_allocation(&mut self, task_id: TaskId, allocation: ResourceAllocation) {
        self.allocations.insert(task_id, allocation);
    }

    /// Remove an allocation
    pub fn remove_allocation(&mut self, task_id: &TaskId) -> Option<ResourceAllocation> {
        self.allocations.remove(task_id)
    }

    /// Get all current allocations
    pub fn get_allocations(&self) -> &HashMap<TaskId, ResourceAllocation> {
        &self.allocations
    }
}