1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use std::collections::{BTreeMap, HashMap, VecDeque};
4use std::sync::{Arc, RwLock};
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
/// A request for a quantity of some resource on behalf of a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRequest {
    /// Unique identifier of this request.
    pub request_id: String,
    /// Job the resources are requested for.
    pub job_id: String,
    /// Kind of resource being requested; must match the pool's variant.
    pub resource_type: ResourceType,
    /// Amount requested, in the pool's capacity units.
    pub quantity: u64,
    /// Expected hold time; open-ended if `None`.
    pub duration: Option<Duration>,
    /// Scheduling priority.
    pub priority: Priority,
    /// Placement constraints (affinity, preemption, locality).
    pub constraints: ResourceConstraints,
    /// Creation time, seconds since the UNIX epoch. Overwritten by the
    /// scheduler when the request is submitted.
    pub created_at: u64,
    /// Optional completion deadline, seconds since the UNIX epoch.
    pub deadline: Option<u64>,
}
19
/// The kind and shape of a resource. A pool and a request are compatible
/// when they use the same variant (and, for `Custom`, the same `name`);
/// the per-variant fields are descriptive — capacity is tracked separately
/// via `quantity`/`total_capacity`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResourceType {
    /// CPU cores.
    CPU {
        cores: u32,
    },
    /// Main memory, in gigabytes.
    Memory {
        gb: u32,
    },
    /// GPUs, optionally pinned to a specific model.
    GPU {
        count: u32,
        memory_gb: u32,
        model: Option<String>,
    },
    /// Block storage with a speed class.
    Storage {
        gb: u64,
        speed: StorageSpeed,
    },
    /// Network bandwidth, in megabits per second.
    Network {
        bandwidth_mbps: u32,
    },
    /// User-defined resource; matched by `name`.
    Custom {
        name: String,
        units: u64,
    },
}
45
/// Storage device speed class.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageSpeed {
    HDD,
    SSD,
    NVMe,
}
52
/// Placement constraints attached to a request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceConstraints {
    /// The pool must carry at least one of these tag keys (empty = no constraint).
    pub node_affinity: Vec<String>,
    /// The pool must carry none of these tag keys.
    pub node_anti_affinity: Vec<String>,
    /// Request dedicated (unshared) resources — not consulted by the
    /// scheduling code in this file.
    pub require_dedicated: bool,
    /// Whether the resulting allocation may be preempted — not consulted by
    /// the scheduling code in this file.
    pub allow_preemption: bool,
    /// Upper bound on nodes to spread across — not consulted in this file.
    pub max_nodes: Option<u32>,
    /// Preferred placement locality — not consulted in this file.
    pub locality_preference: LocalityPreference,
}
62
/// Preferred placement locality scope for a job's resources.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LocalityPreference {
    None,
    SameRack,
    SameDatacenter,
    SameRegion,
}
70
/// Request priority. Higher discriminant values are more urgent; the
/// per-priority multipliers used for scoring live in
/// `SchedulingPolicies::priority_weights`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub enum Priority {
    Critical = 4,
    High = 3,
    Normal = 2,
    Low = 1,
    BestEffort = 0,
}
79
80#[derive(Debug, Clone)]
81pub struct ResourcePool {
82 pub pool_id: String,
83 pub name: String,
84 pub resource_type: ResourceType,
85 pub total_capacity: u64,
86 pub available_capacity: u64,
87 pub reserved_capacity: u64,
88 pub allocations: HashMap<String, ResourceAllocation>,
89 pub maintenance_windows: Vec<MaintenanceWindow>,
90 pub utilization_history: VecDeque<UtilizationSnapshot>,
91 pub cost_per_unit: f64,
92 pub tags: HashMap<String, String>,
93}
94
/// A slice of a pool's capacity granted to a request.
#[derive(Debug, Clone)]
pub struct ResourceAllocation {
    /// Unique id (UUID) assigned when the allocation is created.
    pub allocation_id: String,
    /// The request this allocation satisfies.
    pub request_id: String,
    /// Job the resources belong to.
    pub job_id: String,
    /// Units of capacity deducted from the pool.
    pub allocated_amount: u64,
    /// When the allocation was created.
    pub start_time: Instant,
    /// Expected end (start + requested duration), if a duration was given.
    pub end_time: Option<Instant>,
    /// Usage reported via `update_actual_usage`, if any.
    pub actual_usage: Option<u64>,
    /// Lifecycle state.
    pub status: AllocationStatus,
}
106
/// Lifecycle states of a resource allocation.
#[derive(Debug, Clone)]
pub enum AllocationStatus {
    /// Created but not yet holding capacity.
    Pending,
    /// Currently holding capacity.
    Active,
    /// Finished normally.
    Completed,
    /// Ended in error.
    Failed,
    /// Evicted in favor of another request.
    Preempted,
}
115
/// A planned maintenance period for a pool. Windows flagged with
/// `affects_capacity` cause overlapping requests to be rejected during
/// scheduling.
#[derive(Debug, Clone)]
pub struct MaintenanceWindow {
    /// Unique identifier of the window.
    pub window_id: String,
    /// Window start, seconds since the UNIX epoch.
    pub start_time: u64,
    /// Window end, seconds since the UNIX epoch.
    pub end_time: u64,
    /// Human-readable reason for the maintenance.
    pub description: String,
    /// If true, requests whose run window overlaps this one are rejected.
    pub affects_capacity: bool,
    /// Units taken offline during the window, if known — informational only;
    /// not consulted by the scheduling code in this file.
    pub capacity_reduction: Option<u64>,
}
125
/// A point-in-time utilization sample for a pool.
/// NOTE(review): nothing in this file populates these snapshots — the field
/// semantics below are assumed from the names; confirm against the producer.
#[derive(Debug, Clone)]
pub struct UtilizationSnapshot {
    /// When the sample was taken.
    pub timestamp: Instant,
    /// Units in use at `timestamp`.
    pub utilized_capacity: u64,
    /// Request count associated with this sample.
    pub total_requests: u32,
    /// Mean requested quantity for this sample.
    pub average_request_size: f64,
}
133
134pub struct ResourceScheduler {
135 pools: Arc<RwLock<HashMap<String, ResourcePool>>>,
136 pending_requests: Arc<RwLock<BTreeMap<u64, ResourceRequest>>>, active_allocations: Arc<RwLock<HashMap<String, ResourceAllocation>>>,
138 scheduling_policies: Arc<RwLock<SchedulingPolicies>>,
139 statistics: Arc<RwLock<SchedulingStatistics>>,
140 cost_optimizer: Arc<RwLock<CostOptimizer>>,
141}
142
/// Tunable knobs controlling scheduling behavior.
#[derive(Debug, Clone)]
pub struct SchedulingPolicies {
    /// Algorithm used to pick a pool for every request.
    pub default_scheduling_algorithm: SchedulingAlgorithm,
    /// Allow evicting lower-priority allocations — not consulted by the
    /// scheduling code in this file.
    pub preemption_enabled: bool,
    /// Capacity overcommit factor — not consulted in this file.
    pub overcommit_ratio: f32,
    /// Fragmentation trigger level — not consulted in this file.
    pub fragmentation_threshold: f32,
    /// Not consulted in this file.
    pub load_balancing_enabled: bool,
    /// Not consulted in this file.
    pub cost_optimization_enabled: bool,
    /// Per-priority multipliers for the request scoring function.
    pub priority_weights: HashMap<Priority, f32>,
}
153
/// Pool-selection strategies. Only `FirstFit`, `BestFit`, `WorstFit`,
/// `LoadBalanced` and `CostOptimized` are implemented; the remaining
/// variants currently fall back to first fit in `try_allocate_request`.
#[derive(Debug, Clone)]
pub enum SchedulingAlgorithm {
    FirstFit,
    BestFit,
    WorstFit,
    NextFit,
    QuickFit,
    BuddySystem,
    LoadBalanced,
    CostOptimized,
}
165
/// Aggregate scheduler counters. Only `total_requests` and
/// `successful_allocations` are updated by the code in this file; the other
/// fields stay at their defaults.
#[derive(Debug, Default, Clone)]
pub struct SchedulingStatistics {
    /// Requests ever submitted.
    pub total_requests: u64,
    /// Allocations successfully created.
    pub successful_allocations: u64,
    pub failed_allocations: u64,
    pub preempted_allocations: u64,
    pub average_allocation_time: Duration,
    pub average_utilization: f64,
    pub fragmentation_ratio: f64,
    pub cost_efficiency: f64,
}
177
/// Cost-tracking state. Constructed with defaults; its fields are not read
/// by the scheduling code in this file.
pub struct CostOptimizer {
    /// Active optimization strategy.
    pub optimization_strategy: CostOptimizationStrategy,
    /// Rolling history of cost samples.
    pub cost_history: VecDeque<CostSnapshot>,
    /// Optional spending cap.
    pub budget_limit: Option<f64>,
    /// Raised cost alerts.
    pub cost_alerts: Vec<CostAlert>,
}
184
/// High-level cost optimization goals.
#[derive(Debug, Clone)]
pub enum CostOptimizationStrategy {
    MinimizeCost,
    MaximizeUtilization,
    BalanceCostPerformance,
    SpotInstanceOptimization,
}
192
/// A point-in-time cost sample.
#[derive(Debug, Clone)]
pub struct CostSnapshot {
    /// When the sample was taken.
    pub timestamp: Instant,
    /// Total cost at `timestamp`.
    pub total_cost: f64,
    /// Cost broken down by resource-type name.
    pub cost_by_resource_type: HashMap<String, f64>,
    /// Aggregate efficiency metric for this sample.
    pub efficiency_score: f64,
}
200
/// An alert raised when a cost metric crosses a threshold.
#[derive(Debug, Clone)]
pub struct CostAlert {
    /// Unique identifier of the alert.
    pub alert_id: String,
    /// Threshold that was crossed.
    pub threshold: f64,
    /// Value observed when the alert fired.
    pub current_value: f64,
    /// Human-readable explanation.
    pub message: String,
    /// How urgent the alert is.
    pub severity: AlertSeverity,
}
209
/// Urgency levels for cost alerts, least to most severe.
#[derive(Debug, Clone)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}
216
217impl Default for ResourceScheduler {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223impl ResourceScheduler {
224 pub fn new() -> Self {
225 Self {
226 pools: Arc::new(RwLock::new(HashMap::new())),
227 pending_requests: Arc::new(RwLock::new(BTreeMap::new())),
228 active_allocations: Arc::new(RwLock::new(HashMap::new())),
229 scheduling_policies: Arc::new(RwLock::new(SchedulingPolicies::default())),
230 statistics: Arc::new(RwLock::new(SchedulingStatistics::default())),
231 cost_optimizer: Arc::new(RwLock::new(CostOptimizer::new())),
232 }
233 }
234
235 pub fn register_resource_pool(&self, pool: ResourcePool) -> Result<()> {
236 let mut pools = self
237 .pools
238 .write()
239 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pools"))?;
240
241 pools.insert(pool.pool_id.clone(), pool);
242 Ok(())
243 }
244
245 pub fn submit_resource_request(&self, mut request: ResourceRequest) -> Result<String> {
246 request.created_at = SystemTime::now()
247 .duration_since(UNIX_EPOCH)
248 .expect("SystemTime should be after UNIX_EPOCH")
249 .as_secs();
250
251 let priority_score = self.calculate_priority_score(&request)?;
252
253 {
254 let mut pending = self
255 .pending_requests
256 .write()
257 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pending requests"))?;
258 pending.insert(priority_score, request.clone());
259 }
260
261 {
263 let mut stats = self
264 .statistics
265 .write()
266 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on statistics"))?;
267 stats.total_requests += 1;
268 }
269
270 self.schedule_pending_requests()?;
272
273 Ok(request.request_id)
274 }
275
276 fn calculate_priority_score(&self, request: &ResourceRequest) -> Result<u64> {
277 let policies = self
278 .scheduling_policies
279 .read()
280 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on scheduling policies"))?;
281
282 let priority_weight = policies.priority_weights.get(&request.priority).unwrap_or(&1.0);
283
284 let age_factor = (SystemTime::now()
286 .duration_since(UNIX_EPOCH)
287 .expect("SystemTime should be after UNIX_EPOCH")
288 .as_secs()
289 - request.created_at) as f64;
290
291 let deadline_urgency = if let Some(deadline) = request.deadline {
292 let time_to_deadline = deadline.saturating_sub(
293 SystemTime::now()
294 .duration_since(UNIX_EPOCH)
295 .expect("SystemTime should be after UNIX_EPOCH")
296 .as_secs(),
297 ) as f64;
298 1.0 / (time_to_deadline + 1.0) } else {
300 0.0
301 };
302
303 let score =
304 (*priority_weight as f64 * 1000000.0) + age_factor + (deadline_urgency * 100000.0);
305 Ok(score as u64)
306 }
307
308 pub fn schedule_pending_requests(&self) -> Result<Vec<String>> {
309 let mut allocated_requests = Vec::new();
310
311 let pending_requests = {
312 let pending = self
313 .pending_requests
314 .read()
315 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pending requests"))?;
316 pending.values().cloned().collect::<Vec<_>>()
317 };
318
319 for request in pending_requests.iter().rev() {
320 if let Ok(allocation_id) = self.try_allocate_request(request) {
322 allocated_requests.push(allocation_id);
323
324 let priority_score = self.calculate_priority_score(request)?;
326 let mut pending = self.pending_requests.write().map_err(|_| {
327 anyhow::anyhow!("Failed to acquire write lock on pending requests")
328 })?;
329 pending.remove(&priority_score);
330 }
331 }
332
333 Ok(allocated_requests)
334 }
335
336 fn try_allocate_request(&self, request: &ResourceRequest) -> Result<String> {
337 let policies = self
338 .scheduling_policies
339 .read()
340 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on scheduling policies"))?;
341
342 match policies.default_scheduling_algorithm {
343 SchedulingAlgorithm::FirstFit => self.first_fit_allocation(request),
344 SchedulingAlgorithm::BestFit => self.best_fit_allocation(request),
345 SchedulingAlgorithm::WorstFit => self.worst_fit_allocation(request),
346 SchedulingAlgorithm::LoadBalanced => self.load_balanced_allocation(request),
347 SchedulingAlgorithm::CostOptimized => self.cost_optimized_allocation(request),
348 _ => self.first_fit_allocation(request), }
350 }
351
352 fn first_fit_allocation(&self, request: &ResourceRequest) -> Result<String> {
353 let pools = self
354 .pools
355 .read()
356 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
357
358 for (pool_id, pool) in pools.iter() {
359 if self.can_satisfy_request(pool, request)? {
360 let pool_id_clone = pool_id.clone();
361 drop(pools); return self.create_allocation(&pool_id_clone, request);
363 }
364 }
365
366 Err(anyhow::anyhow!("No suitable pool found for request"))
367 }
368
369 fn best_fit_allocation(&self, request: &ResourceRequest) -> Result<String> {
370 let pools = self
371 .pools
372 .read()
373 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
374
375 let mut best_pool = None;
376 let mut min_waste = u64::MAX;
377
378 for (pool_id, pool) in pools.iter() {
379 if self.can_satisfy_request(pool, request)? {
380 let waste = pool.available_capacity.saturating_sub(request.quantity);
381 if waste < min_waste {
382 min_waste = waste;
383 best_pool = Some(pool_id.clone());
384 }
385 }
386 }
387
388 if let Some(pool_id) = best_pool {
389 drop(pools);
390 self.create_allocation(&pool_id, request)
391 } else {
392 Err(anyhow::anyhow!("No suitable pool found for request"))
393 }
394 }
395
396 fn worst_fit_allocation(&self, request: &ResourceRequest) -> Result<String> {
397 let pools = self
398 .pools
399 .read()
400 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
401
402 let mut best_pool = None;
403 let mut max_remaining = 0;
404
405 for (pool_id, pool) in pools.iter() {
406 if self.can_satisfy_request(pool, request)? {
407 let remaining = pool.available_capacity.saturating_sub(request.quantity);
408 if remaining > max_remaining {
409 max_remaining = remaining;
410 best_pool = Some(pool_id.clone());
411 }
412 }
413 }
414
415 if let Some(pool_id) = best_pool {
416 drop(pools);
417 self.create_allocation(&pool_id, request)
418 } else {
419 Err(anyhow::anyhow!("No suitable pool found for request"))
420 }
421 }
422
423 fn load_balanced_allocation(&self, request: &ResourceRequest) -> Result<String> {
424 let pools = self
425 .pools
426 .read()
427 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
428
429 let mut best_pool = None;
430 let mut min_utilization = f64::MAX;
431
432 for (pool_id, pool) in pools.iter() {
433 if self.can_satisfy_request(pool, request)? {
434 let utilization = (pool.total_capacity - pool.available_capacity) as f64
435 / pool.total_capacity as f64;
436 if utilization < min_utilization {
437 min_utilization = utilization;
438 best_pool = Some(pool_id.clone());
439 }
440 }
441 }
442
443 if let Some(pool_id) = best_pool {
444 drop(pools);
445 self.create_allocation(&pool_id, request)
446 } else {
447 Err(anyhow::anyhow!("No suitable pool found for request"))
448 }
449 }
450
451 fn cost_optimized_allocation(&self, request: &ResourceRequest) -> Result<String> {
452 let pools = self
453 .pools
454 .read()
455 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
456
457 let mut best_pool = None;
458 let mut min_cost = f64::MAX;
459
460 for (pool_id, pool) in pools.iter() {
461 if self.can_satisfy_request(pool, request)? {
462 let cost = pool.cost_per_unit * request.quantity as f64;
463 if cost < min_cost {
464 min_cost = cost;
465 best_pool = Some(pool_id.clone());
466 }
467 }
468 }
469
470 if let Some(pool_id) = best_pool {
471 drop(pools);
472 self.create_allocation(&pool_id, request)
473 } else {
474 Err(anyhow::anyhow!("No suitable pool found for request"))
475 }
476 }
477
478 fn can_satisfy_request(&self, pool: &ResourcePool, request: &ResourceRequest) -> Result<bool> {
479 if pool.available_capacity < request.quantity {
481 return Ok(false);
482 }
483
484 if !self.is_resource_type_compatible(&pool.resource_type, &request.resource_type) {
486 return Ok(false);
487 }
488
489 if !request.constraints.node_affinity.is_empty() {
491 let has_affinity =
492 request.constraints.node_affinity.iter().any(|tag| pool.tags.contains_key(tag));
493 if !has_affinity {
494 return Ok(false);
495 }
496 }
497
498 if !request.constraints.node_anti_affinity.is_empty() {
499 let has_anti_affinity = request
500 .constraints
501 .node_anti_affinity
502 .iter()
503 .any(|tag| pool.tags.contains_key(tag));
504 if has_anti_affinity {
505 return Ok(false);
506 }
507 }
508
509 if self.is_maintenance_conflicting(pool, request)? {
511 return Ok(false);
512 }
513
514 Ok(true)
515 }
516
517 fn is_resource_type_compatible(
518 &self,
519 pool_type: &ResourceType,
520 request_type: &ResourceType,
521 ) -> bool {
522 match (pool_type, request_type) {
523 (ResourceType::CPU { .. }, ResourceType::CPU { .. }) => true,
524 (ResourceType::Memory { .. }, ResourceType::Memory { .. }) => true,
525 (ResourceType::GPU { .. }, ResourceType::GPU { .. }) => true,
526 (ResourceType::Storage { .. }, ResourceType::Storage { .. }) => true,
527 (ResourceType::Network { .. }, ResourceType::Network { .. }) => true,
528 (
529 ResourceType::Custom {
530 name: pool_name, ..
531 },
532 ResourceType::Custom { name: req_name, .. },
533 ) => pool_name == req_name,
534 _ => false,
535 }
536 }
537
538 fn is_maintenance_conflicting(
539 &self,
540 pool: &ResourcePool,
541 request: &ResourceRequest,
542 ) -> Result<bool> {
543 let current_time = SystemTime::now()
544 .duration_since(UNIX_EPOCH)
545 .expect("SystemTime should be after UNIX_EPOCH")
546 .as_secs();
547
548 let request_end_time = if let Some(duration) = request.duration {
549 current_time + duration.as_secs()
550 } else {
551 request.deadline.unwrap_or(current_time + 3600) };
553
554 for window in &pool.maintenance_windows {
555 if window.affects_capacity
556 && current_time < window.end_time
557 && request_end_time > window.start_time
558 {
559 return Ok(true);
560 }
561 }
562
563 Ok(false)
564 }
565
566 fn create_allocation(&self, pool_id: &str, request: &ResourceRequest) -> Result<String> {
567 let allocation_id = uuid::Uuid::new_v4().to_string();
568
569 {
571 let mut pools = self
572 .pools
573 .write()
574 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pools"))?;
575
576 if let Some(pool) = pools.get_mut(pool_id) {
577 if pool.available_capacity < request.quantity {
578 return Err(anyhow::anyhow!("Insufficient capacity"));
579 }
580
581 pool.available_capacity -= request.quantity;
582
583 let allocation = ResourceAllocation {
584 allocation_id: allocation_id.clone(),
585 request_id: request.request_id.clone(),
586 job_id: request.job_id.clone(),
587 allocated_amount: request.quantity,
588 start_time: Instant::now(),
589 end_time: request.duration.map(|d| Instant::now() + d),
590 actual_usage: None,
591 status: AllocationStatus::Active,
592 };
593
594 pool.allocations.insert(allocation_id.clone(), allocation.clone());
595
596 let mut active = self.active_allocations.write().map_err(|_| {
598 anyhow::anyhow!("Failed to acquire write lock on active allocations")
599 })?;
600 active.insert(allocation_id.clone(), allocation);
601 } else {
602 return Err(anyhow::anyhow!("Pool not found: {}", pool_id));
603 }
604 }
605
606 {
608 let mut stats = self
609 .statistics
610 .write()
611 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on statistics"))?;
612 stats.successful_allocations += 1;
613 }
614
615 Ok(allocation_id)
616 }
617
618 pub fn release_allocation(&self, allocation_id: &str) -> Result<()> {
619 let allocation = {
621 let mut active = self.active_allocations.write().map_err(|_| {
622 anyhow::anyhow!("Failed to acquire write lock on active allocations")
623 })?;
624 active
625 .remove(allocation_id)
626 .ok_or_else(|| anyhow::anyhow!("Allocation not found: {}", allocation_id))?
627 };
628
629 {
631 let mut pools = self
632 .pools
633 .write()
634 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pools"))?;
635
636 for pool in pools.values_mut() {
637 if let Some(mut pool_allocation) = pool.allocations.remove(allocation_id) {
638 pool_allocation.status = AllocationStatus::Completed;
639 pool_allocation.end_time = Some(Instant::now());
640
641 pool.available_capacity += allocation.allocated_amount;
642 break;
643 }
644 }
645 }
646
647 self.schedule_pending_requests()?;
649
650 Ok(())
651 }
652
653 pub fn update_actual_usage(&self, allocation_id: &str, actual_usage: u64) -> Result<()> {
654 let mut active = self
655 .active_allocations
656 .write()
657 .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on active allocations"))?;
658
659 if let Some(allocation) = active.get_mut(allocation_id) {
660 allocation.actual_usage = Some(actual_usage);
661 } else {
662 return Err(anyhow::anyhow!("Allocation not found: {}", allocation_id));
663 }
664
665 Ok(())
666 }
667
668 pub fn get_resource_utilization(&self) -> Result<HashMap<String, f64>> {
669 let pools = self
670 .pools
671 .read()
672 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
673
674 let mut utilization = HashMap::new();
675
676 for (pool_id, pool) in pools.iter() {
677 let util =
678 (pool.total_capacity - pool.available_capacity) as f64 / pool.total_capacity as f64;
679 utilization.insert(pool_id.clone(), util);
680 }
681
682 Ok(utilization)
683 }
684
685 pub fn get_pending_requests_count(&self) -> Result<usize> {
686 let pending = self
687 .pending_requests
688 .read()
689 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pending requests"))?;
690 Ok(pending.len())
691 }
692
693 pub fn get_active_allocations_count(&self) -> Result<usize> {
694 let active = self
695 .active_allocations
696 .read()
697 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on active allocations"))?;
698 Ok(active.len())
699 }
700
701 pub fn get_statistics(&self) -> Result<SchedulingStatistics> {
702 let stats = self
703 .statistics
704 .read()
705 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on statistics"))?;
706 Ok((*stats).clone())
707 }
708
709 pub fn optimize_costs(&self) -> Result<Vec<CostOptimizationRecommendation>> {
710 let _cost_optimizer = self
711 .cost_optimizer
712 .read()
713 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on cost optimizer"))?;
714
715 let pools = self
716 .pools
717 .read()
718 .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
719
720 let mut recommendations = Vec::new();
721
722 for (pool_id, pool) in pools.iter() {
724 let utilization =
725 (pool.total_capacity - pool.available_capacity) as f64 / pool.total_capacity as f64;
726
727 if utilization < 0.3 {
728 recommendations.push(CostOptimizationRecommendation {
730 pool_id: pool_id.clone(),
731 recommendation_type: RecommendationType::ScaleDown,
732 description: format!(
733 "Pool {} is only {:.1}% utilized. Consider scaling down.",
734 pool_id,
735 utilization * 100.0
736 ),
737 potential_savings: pool.cost_per_unit * (pool.available_capacity as f64 * 0.7),
738 });
739 }
740
741 if utilization > 0.9 {
742 recommendations.push(CostOptimizationRecommendation {
744 pool_id: pool_id.clone(),
745 recommendation_type: RecommendationType::ScaleUp,
746 description: format!(
747 "Pool {} is {:.1}% utilized. Consider scaling up to avoid bottlenecks.",
748 pool_id,
749 utilization * 100.0
750 ),
751 potential_savings: 0.0, });
753 }
754 }
755
756 Ok(recommendations)
757 }
758}
759
/// A single cost-optimization suggestion produced by `optimize_costs`.
#[derive(Debug, Clone)]
pub struct CostOptimizationRecommendation {
    /// Pool the recommendation applies to.
    pub pool_id: String,
    /// What kind of change is suggested.
    pub recommendation_type: RecommendationType,
    /// Human-readable explanation.
    pub description: String,
    /// Estimated savings if the recommendation is followed (0.0 when the
    /// change costs money rather than saving it).
    pub potential_savings: f64,
}
767
/// Kinds of cost-optimization actions. Only `ScaleUp` and `ScaleDown` are
/// currently emitted by `optimize_costs`.
#[derive(Debug, Clone)]
pub enum RecommendationType {
    ScaleUp,
    ScaleDown,
    ChangeInstanceType,
    UseSpotInstances,
    ConsolidateWorkloads,
}
776
777impl SchedulingPolicies {
778 fn default() -> Self {
779 let mut priority_weights = HashMap::new();
780 priority_weights.insert(Priority::Critical, 4.0);
781 priority_weights.insert(Priority::High, 3.0);
782 priority_weights.insert(Priority::Normal, 2.0);
783 priority_weights.insert(Priority::Low, 1.0);
784 priority_weights.insert(Priority::BestEffort, 0.5);
785
786 Self {
787 default_scheduling_algorithm: SchedulingAlgorithm::FirstFit,
788 preemption_enabled: false,
789 overcommit_ratio: 1.0,
790 fragmentation_threshold: 0.1,
791 load_balancing_enabled: true,
792 cost_optimization_enabled: false,
793 priority_weights,
794 }
795 }
796}
797
798impl CostOptimizer {
799 fn new() -> Self {
800 Self {
801 optimization_strategy: CostOptimizationStrategy::BalanceCostPerformance,
802 cost_history: VecDeque::with_capacity(1000),
803 budget_limit: None,
804 cost_alerts: Vec::new(),
805 }
806 }
807}
808
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a CPU pool with `cores` units of capacity, all available.
    /// Extracted to remove the four near-identical pool literals the tests
    /// previously repeated.
    fn cpu_pool(pool_id: &str, cores: u32) -> ResourcePool {
        ResourcePool {
            pool_id: pool_id.to_string(),
            name: format!("CPU pool {}", pool_id),
            resource_type: ResourceType::CPU { cores },
            total_capacity: u64::from(cores),
            available_capacity: u64::from(cores),
            reserved_capacity: 0,
            allocations: HashMap::new(),
            maintenance_windows: Vec::new(),
            utilization_history: VecDeque::new(),
            cost_per_unit: 0.1,
            tags: HashMap::new(),
        }
    }

    /// Build an unconstrained CPU request for `cores` cores with a one-hour
    /// duration and no deadline.
    fn cpu_request(request_id: &str, job_id: &str, cores: u32, priority: Priority) -> ResourceRequest {
        ResourceRequest {
            request_id: request_id.to_string(),
            job_id: job_id.to_string(),
            resource_type: ResourceType::CPU { cores },
            quantity: u64::from(cores),
            duration: Some(Duration::from_secs(3600)),
            priority,
            constraints: ResourceConstraints {
                node_affinity: vec![],
                node_anti_affinity: vec![],
                require_dedicated: false,
                allow_preemption: false,
                max_nodes: None,
                locality_preference: LocalityPreference::None,
            },
            created_at: 0,
            deadline: None,
        }
    }

    #[test]
    fn test_resource_scheduler_creation() {
        let scheduler = ResourceScheduler::new();
        assert_eq!(scheduler.get_pending_requests_count().expect("pending count"), 0);
        assert_eq!(scheduler.get_active_allocations_count().expect("active count"), 0);
    }

    #[test]
    fn test_resource_pool_registration() {
        let scheduler = ResourceScheduler::new();
        scheduler.register_resource_pool(cpu_pool("cpu-pool-1", 16)).expect("register pool");

        // A freshly registered pool is completely idle.
        let utilization = scheduler.get_resource_utilization().expect("utilization");
        assert_eq!(utilization.get("cpu-pool-1"), Some(&0.0));
    }

    #[test]
    fn test_resource_request_submission() {
        let scheduler = ResourceScheduler::new();
        scheduler.register_resource_pool(cpu_pool("cpu-pool-1", 16)).expect("register pool");

        let request = cpu_request("req-1", "job-1", 4, Priority::Normal);
        let request_id = scheduler.submit_resource_request(request).expect("submit request");
        assert_eq!(request_id, "req-1");

        // The request fits, so it is allocated as part of submission.
        assert_eq!(scheduler.get_active_allocations_count().expect("active count"), 1);

        // 4 of 16 cores in use.
        let utilization = scheduler.get_resource_utilization().expect("utilization");
        assert_eq!(utilization.get("cpu-pool-1"), Some(&0.25));
    }

    #[test]
    fn test_allocation_release() {
        let scheduler = ResourceScheduler::new();
        scheduler.register_resource_pool(cpu_pool("cpu-pool-1", 16)).expect("register pool");
        scheduler
            .submit_resource_request(cpu_request("req-1", "job-1", 4, Priority::Normal))
            .expect("submit request");

        // Fish the generated allocation id out of the scheduler's state.
        let allocation_id = {
            let active = scheduler.active_allocations.read().expect("lock should not be poisoned");
            active.keys().next().expect("one active allocation").clone()
        };

        scheduler.release_allocation(&allocation_id).expect("release allocation");

        // Capacity is fully returned to the pool.
        assert_eq!(scheduler.get_active_allocations_count().expect("active count"), 0);

        let utilization = scheduler.get_resource_utilization().expect("utilization");
        assert_eq!(utilization.get("cpu-pool-1"), Some(&0.0));
    }

    #[test]
    fn test_priority_scheduling() {
        let scheduler = ResourceScheduler::new();
        scheduler.register_resource_pool(cpu_pool("cpu-pool-1", 4)).expect("register pool");

        // The low-priority request consumes the entire pool...
        scheduler
            .submit_resource_request(cpu_request("req-low", "job-low", 4, Priority::Low))
            .expect("submit low-priority request");

        // ...so the high-priority request cannot fit and must stay queued.
        scheduler
            .submit_resource_request(cpu_request("req-high", "job-high", 2, Priority::High))
            .expect("submit high-priority request");

        assert!(scheduler.get_pending_requests_count().expect("pending count") > 0);
    }
}