// scirs2_core/distributed/cluster/allocator.rs

use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::HashMap;

#[cfg(feature = "logging")]
use log;

use super::types::{
    AllocationId, AllocationStrategy, ComputeCapacity, NodeInfo, NodeStatus, ResourceAllocation,
    ResourceRequirements, TaskId,
};

/// Tracks live per-task resource allocations and the cluster capacity
/// still available for new work.
#[derive(Debug)]
pub struct ResourceAllocator {
    // Live allocations keyed by the task that owns them.
    allocations: HashMap<TaskId, ResourceAllocation>,
    // Aggregate capacity of healthy nodes minus what is already allocated;
    // refreshed by `update_available_resources`.
    available_resources: ComputeCapacity,
    // Strategy applied by `optimize_resource_allocation`.
    allocation_strategy: AllocationStrategy,
}

25impl Default for ResourceAllocator {
26 fn default() -> Self {
27 Self::new()
28 }
29}
30
#[allow(dead_code)]
impl ResourceAllocator {
    /// Creates an empty allocator: no allocations, zero known capacity,
    /// and the first-fit strategy selected.
    pub fn new() -> Self {
        let allocations = HashMap::new();
        let available_resources = ComputeCapacity::default();
        Self {
            allocations,
            available_resources,
            allocation_strategy: AllocationStrategy::FirstFit,
        }
    }

42 pub fn update_available_resources(&mut self, nodes: &[NodeInfo]) -> CoreResult<()> {
44 self.available_resources = ComputeCapacity::default();
45
46 for node in nodes {
47 if node.status == NodeStatus::Healthy {
48 self.available_resources.cpu_cores += node.capabilities.cpu_cores;
49 self.available_resources.memory_gb += node.capabilities.memory_gb;
50 self.available_resources.gpu_count += node.capabilities.gpu_count;
51 self.available_resources.disk_space_gb += node.capabilities.disk_space_gb;
52 }
53 }
54
55 for allocation in self.allocations.values() {
57 self.available_resources.cpu_cores = self
58 .available_resources
59 .cpu_cores
60 .saturating_sub(allocation.allocated_resources.cpu_cores);
61 self.available_resources.memory_gb = self
62 .available_resources
63 .memory_gb
64 .saturating_sub(allocation.allocated_resources.memory_gb);
65 self.available_resources.gpu_count = self
66 .available_resources
67 .gpu_count
68 .saturating_sub(allocation.allocated_resources.gpu_count);
69 self.available_resources.disk_space_gb = self
70 .available_resources
71 .disk_space_gb
72 .saturating_sub(allocation.allocated_resources.disk_space_gb);
73 }
74
75 Ok(())
76 }
77
78 pub fn allocate_resources(
80 &self,
81 requirements: &ResourceRequirements,
82 ) -> CoreResult<ResourceAllocation> {
83 if !self.can_satisfy_requirements(requirements) {
85 return Err(CoreError::ResourceError(ErrorContext::new(
86 "Insufficient resources available",
87 )));
88 }
89
90 Ok(ResourceAllocation {
92 allocation_id: AllocationId::generate(),
93 allocated_resources: ComputeCapacity {
94 cpu_cores: requirements.cpu_cores,
95 memory_gb: requirements.memory_gb,
96 gpu_count: requirements.gpu_count,
97 disk_space_gb: requirements.disk_space_gb,
98 },
99 assigned_nodes: Vec::new(), created_at: std::time::Instant::now(),
101 expires_at: None,
102 })
103 }
104
105 fn can_satisfy_requirements(&self, requirements: &ResourceRequirements) -> bool {
107 self.available_resources.cpu_cores >= requirements.cpu_cores
108 && self.available_resources.memory_gb >= requirements.memory_gb
109 && self.available_resources.gpu_count >= requirements.gpu_count
110 && self.available_resources.disk_space_gb >= requirements.disk_space_gb
111 }
112
113 pub fn optimize_resource_allocation(&mut self) -> CoreResult<()> {
115 match self.allocation_strategy {
117 AllocationStrategy::FirstFit => {
118 }
120 AllocationStrategy::BestFit => {
121 self.optimize_best_fit()?;
123 }
124 AllocationStrategy::LoadBalanced => {
125 self.optimize_load_balanced()?;
127 }
128 }
129 Ok(())
130 }
131
132 fn optimize_best_fit(&mut self) -> CoreResult<()> {
134 let mut allocations: Vec<(TaskId, ResourceAllocation)> = self
139 .allocations
140 .iter()
141 .map(|(k, v)| (k.clone(), v.clone()))
142 .collect();
143
144 allocations.sort_by(|a, b| {
147 let weight_a = a.1.allocated_resources.cpu_cores
148 + a.1.allocated_resources.memory_gb
149 + a.1.allocated_resources.gpu_count * 4 + a.1.allocated_resources.disk_space_gb / 10; let weight_b = b.1.allocated_resources.cpu_cores
152 + b.1.allocated_resources.memory_gb
153 + b.1.allocated_resources.gpu_count * 4
154 + b.1.allocated_resources.disk_space_gb / 10;
155 weight_b.cmp(&weight_a)
156 });
157
158 let mut optimizations_made = 0;
163 let fragmentation_score_before = self.calculate_fragmentation_score();
164
165 let (large_allocations, medium_allocations, small_allocations): (Vec<_>, Vec<_>, Vec<_>) = {
167 let mut large = Vec::new();
168 let mut medium = Vec::new();
169 let mut small = Vec::new();
170
171 for (taskid, allocation) in allocations {
172 let total_resources = allocation.allocated_resources.cpu_cores
173 + allocation.allocated_resources.memory_gb
174 + allocation.allocated_resources.gpu_count * 4;
175
176 if total_resources >= 32 {
177 large.push((taskid.clone(), allocation.clone()));
178 } else if total_resources >= 8 {
179 medium.push((taskid.clone(), allocation.clone()));
180 } else {
181 small.push((taskid.clone(), allocation.clone()));
182 }
183 }
184
185 (large, medium, small)
186 };
187
188 for (taskid, allocation) in large_allocations {
191 if allocation.assigned_nodes.len() > 1 {
192 if self.attempt_consolidation(&taskid, &allocation)? {
194 optimizations_made += 1;
195 }
196 }
197 }
198
199 for (taskid, allocation) in medium_allocations {
202 if self.attempt_best_fit_pairing(&taskid, &allocation)? {
203 optimizations_made += 1;
204 }
205 }
206
207 for (taskid, allocation) in small_allocations {
210 if self.attempt_small_allocation_packing(&taskid, &allocation)? {
211 optimizations_made += 1;
212 }
213 }
214
215 let fragmentation_score_after = self.calculate_fragmentation_score();
217 let _improvement = fragmentation_score_before - fragmentation_score_after;
218
219 if optimizations_made > 0 {
220 #[cfg(feature = "logging")]
221 log::info!(
222 "Best-fit optimization completed: {optimizations_made} optimizations, fragmentation improved by {_improvement:.2}"
223 );
224 }
225
226 Ok(())
227 }
228
229 fn optimize_load_balanced(&mut self) -> CoreResult<()> {
231 let mut nodeloads = HashMap::new();
236 let mut total_load = 0.0f64;
237
238 for allocation in self.allocations.values() {
240 for nodeid in &allocation.assigned_nodes {
241 let load_weight =
242 self.calculate_allocation_load_weight(&allocation.allocated_resources);
243 *nodeloads.entry(nodeid.clone()).or_insert(0.0) += load_weight;
244 total_load += load_weight;
245 }
246 }
247
248 let num_active_nodes = nodeloads.len().max(1);
250 let target_load_per_node = total_load / num_active_nodes as f64;
251 let load_variance_threshold = target_load_per_node * 0.15f64; let mut overloaded_nodes = Vec::new();
255 let mut underloaded_nodes = Vec::new();
256
257 for (nodeid, ¤t_load) in &nodeloads {
258 let load_diff = current_load - target_load_per_node;
259 if load_diff > load_variance_threshold {
260 overloaded_nodes.push((nodeid.clone(), current_load, load_diff));
261 } else if load_diff < -load_variance_threshold {
262 underloaded_nodes.push((nodeid.clone(), current_load, -load_diff));
263 }
264 }
265
266 overloaded_nodes.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());
268 underloaded_nodes.sort_by(|a, b| b.2.partial_cmp(&a.2).unwrap());
269
270 let mut rebalancing_actions = 0;
271 let initial_variance = self.calculate_load_variance(&nodeloads);
272
273 for (overloaded_node, current_load, overloaded_amount) in overloaded_nodes {
275 let moveable_allocations = self.find_moveable_allocations(&overloaded_node);
277
278 for (taskid, allocation) in moveable_allocations {
279 if let Some((target_node, _)) = self.find_best_target_node(
281 &allocation.allocated_resources,
282 &underloaded_nodes
283 .iter()
284 .map(|(nodeid, load, _)| (nodeid.clone(), *load))
285 .collect::<Vec<_>>(),
286 )? {
287 if self.attempt_allocation_migration(&taskid, &target_node)? {
289 rebalancing_actions += 1;
290
291 let allocation_weight =
293 self.calculate_allocation_load_weight(&allocation.allocated_resources);
294 if let Some(old_load) = nodeloads.get_mut(&overloaded_node) {
295 *old_load -= allocation_weight;
296 }
297 if let Some(new_load) = nodeloads.get_mut(&target_node) {
298 *new_load += allocation_weight;
299 }
300
301 if nodeloads.get(&overloaded_node).copied().unwrap_or(0.0)
303 <= target_load_per_node + load_variance_threshold
304 {
305 break; }
307 }
308 }
309 }
310 }
311
312 let single_node_allocations: Vec<(TaskId, ResourceAllocation)> = self
314 .allocations
315 .iter()
316 .filter(|(_, allocation)| allocation.assigned_nodes.len() == 1)
317 .map(|(k, v)| (k.clone(), v.clone()))
318 .collect();
319
320 for (taskid, allocation) in single_node_allocations {
321 let load_weight =
322 self.calculate_allocation_load_weight(&allocation.allocated_resources);
323 if load_weight > target_load_per_node * 0.6 {
324 if self.attempt_allocation_spreading(&taskid, &allocation)? {
326 rebalancing_actions += 1;
327 }
328 }
329 }
330
331 let final_variance = self.calculate_load_variance(&nodeloads);
333 let _variance_improvement = initial_variance - final_variance;
334
335 if rebalancing_actions > 0 {
336 #[cfg(feature = "logging")]
337 log::info!(
338 "Load-balanced optimization completed: {rebalancing_actions} rebalancing actions, \
339 load variance improved by {_variance_improvement:.2}"
340 );
341 }
342
343 Ok(())
344 }
345
    /// Returns an owned snapshot of the remaining (unallocated) capacity.
    /// See `available_capacity` for a borrowing accessor.
    pub fn get_available_capacity(&self) -> ComputeCapacity {
        self.available_resources.clone()
    }

351 fn calculate_fragmentation_score(&self) -> f64 {
355 let total_allocated_resources = self.allocations.len() as f64;
358 if total_allocated_resources == 0.0 {
359 return 0.0f64;
360 }
361
362 let split_allocations = self
364 .allocations
365 .values()
366 .filter(|alloc| alloc.assigned_nodes.len() > 1)
367 .count() as f64;
368
369 let mut total_efficiency = 0.0f64;
371 for allocation in self.allocations.values() {
372 let resource_efficiency =
373 self.calculate_resource_efficiency(&allocation.allocated_resources);
374 total_efficiency += resource_efficiency;
375 }
376 let avg_efficiency = total_efficiency / total_allocated_resources;
377
378 let split_ratio = split_allocations / total_allocated_resources;
380 (split_ratio * 0.6 + (1.0 - avg_efficiency) * 0.4f64) * 100.0
381 }
382
383 fn calculate_resource_efficiency(&self, resources: &ComputeCapacity) -> f64 {
385 let cpu_ratio = resources.cpu_cores as f64;
390 let _memory_ratio = resources.memory_gb as f64 / 4.0f64; let gpu_ratio = resources.gpu_count as f64 * 8.0f64; let total_compute = cpu_ratio + gpu_ratio;
394 let balanced_memory = total_compute * 4.0f64;
395
396 let memory_efficiency = if resources.memory_gb as f64 > 0.0 {
398 balanced_memory.min(resources.memory_gb as f64)
399 / balanced_memory.max(resources.memory_gb as f64)
400 } else {
401 1.0
402 };
403
404 let scale_efficiency = if total_compute < 2.0 {
406 total_compute / 2.0 } else {
408 1.0
409 };
410
411 let combined_efficiency = memory_efficiency * 0.7 + scale_efficiency * 0.3f64;
412 combined_efficiency.min(1.0)
413 }
414
415 fn calculate_allocation_load_weight(&self, resources: &ComputeCapacity) -> f64 {
417 let cpu_weight = resources.cpu_cores as f64;
420 let memory_weight = resources.memory_gb as f64 * 0.25f64; let gpu_weight = resources.gpu_count as f64 * 8.0f64; let disk_weight = resources.disk_space_gb as f64 * 0.01f64; cpu_weight + memory_weight + gpu_weight + disk_weight
425 }
426
427 fn calculate_load_variance(&self, nodeloads: &HashMap<String, f64>) -> f64 {
429 if nodeloads.len() <= 1 {
431 return 0.0f64;
432 }
433
434 let total_load: f64 = nodeloads.values().sum();
435 let mean_load = total_load / nodeloads.len() as f64;
436
437 let variance = nodeloads
438 .values()
439 .map(|&load| (load - mean_load).powi(2))
440 .sum::<f64>()
441 / nodeloads.len() as f64;
442
443 variance.sqrt() }
445
446 fn find_moveable_allocations(&self, nodeid: &str) -> Vec<(TaskId, ResourceAllocation)> {
448 self.allocations
450 .iter()
451 .filter(|(_, allocation)| allocation.assigned_nodes.contains(&nodeid.to_string()))
452 .map(|(taskid, allocation)| (taskid.clone(), allocation.clone()))
453 .collect()
454 }
455
    /// Borrowing accessor for the remaining capacity (see
    /// `get_available_capacity` for an owned copy).
    pub fn available_capacity(&self) -> &ComputeCapacity {
        &self.available_resources
    }

    /// Placeholder for consolidating a multi-node allocation onto fewer
    /// nodes. Currently unimplemented: always returns `Ok(false)`
    /// (no change was made).
    pub fn attempt_consolidation(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        Ok(false)
    }

    /// Placeholder for pairing a medium allocation with a complementary
    /// workload. Currently unimplemented: always returns `Ok(false)`
    /// (no change was made).
    pub fn attempt_best_fit_pairing(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        Ok(false)
    }

    /// Placeholder for densely packing small allocations. Currently
    /// unimplemented: always returns `Ok(false)` (no change was made).
    pub fn attempt_small_allocation_packing(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        Ok(false)
    }

    /// Placeholder for choosing the best underloaded node (with its load)
    /// for the given resources. Currently unimplemented: always returns
    /// `Ok(None)` (no suitable node).
    pub fn find_best_target_node(
        &mut self,
        _resources: &ComputeCapacity,
        _underloaded_nodes: &[(String, f64)],
    ) -> CoreResult<Option<(String, f64)>> {
        Ok(None)
    }

    /// Placeholder for migrating a task's allocation to `_to_node`.
    /// Currently unimplemented: always returns `Ok(false)`
    /// (no migration performed).
    pub fn attempt_allocation_migration(
        &mut self,
        _taskid: &TaskId,
        _to_node: &str,
    ) -> CoreResult<bool> {
        Ok(false)
    }

    /// Placeholder for spreading a heavy single-node allocation across
    /// multiple nodes. Currently unimplemented: always returns `Ok(false)`
    /// (no change was made).
    pub fn attempt_allocation_spreading(
        &mut self,
        _taskid: &TaskId,
        _allocation: &ResourceAllocation,
    ) -> CoreResult<bool> {
        Ok(false)
    }

    /// Sets the strategy used by `optimize_resource_allocation`.
    pub fn set_allocation_strategy(&mut self, strategy: AllocationStrategy) {
        self.allocation_strategy = strategy;
    }

    /// Returns the currently configured allocation strategy.
    pub fn get_allocation_strategy(&self) -> AllocationStrategy {
        self.allocation_strategy
    }

    /// Registers (or replaces) the allocation for `task_id`. Does not
    /// adjust `available_resources`; callers refresh via
    /// `update_available_resources`.
    pub fn add_allocation(&mut self, task_id: TaskId, allocation: ResourceAllocation) {
        self.allocations.insert(task_id, allocation);
    }

    /// Removes and returns the allocation for `task_id`, if present.
    pub fn remove_allocation(&mut self, task_id: &TaskId) -> Option<ResourceAllocation> {
        self.allocations.remove(task_id)
    }

    /// Read-only view of all live allocations, keyed by task.
    pub fn get_allocations(&self) -> &HashMap<TaskId, ResourceAllocation> {
        &self.allocations
    }
}