// trustformers_training/resource_scheduling.rs

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct ResourceRequest {
9    pub request_id: String,
10    pub job_id: String,
11    pub resource_type: ResourceType,
12    pub quantity: u64,
13    pub duration: Option<Duration>,
14    pub priority: Priority,
15    pub constraints: ResourceConstraints,
16    pub created_at: u64,
17    pub deadline: Option<u64>,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub enum ResourceType {
22    CPU {
23        cores: u32,
24    },
25    Memory {
26        gb: u32,
27    },
28    GPU {
29        count: u32,
30        memory_gb: u32,
31        model: Option<String>,
32    },
33    Storage {
34        gb: u64,
35        speed: StorageSpeed,
36    },
37    Network {
38        bandwidth_mbps: u32,
39    },
40    Custom {
41        name: String,
42        units: u64,
43    },
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum StorageSpeed {
48    HDD,
49    SSD,
50    NVMe,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ResourceConstraints {
55    pub node_affinity: Vec<String>,
56    pub node_anti_affinity: Vec<String>,
57    pub require_dedicated: bool,
58    pub allow_preemption: bool,
59    pub max_nodes: Option<u32>,
60    pub locality_preference: LocalityPreference,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub enum LocalityPreference {
65    None,
66    SameRack,
67    SameDatacenter,
68    SameRegion,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
72pub enum Priority {
73    Critical = 4,
74    High = 3,
75    Normal = 2,
76    Low = 1,
77    BestEffort = 0,
78}
79
80#[derive(Debug, Clone)]
81pub struct ResourcePool {
82    pub pool_id: String,
83    pub name: String,
84    pub resource_type: ResourceType,
85    pub total_capacity: u64,
86    pub available_capacity: u64,
87    pub reserved_capacity: u64,
88    pub allocations: HashMap<String, ResourceAllocation>,
89    pub maintenance_windows: Vec<MaintenanceWindow>,
90    pub utilization_history: VecDeque<UtilizationSnapshot>,
91    pub cost_per_unit: f64,
92    pub tags: HashMap<String, String>,
93}
94
95#[derive(Debug, Clone)]
96pub struct ResourceAllocation {
97    pub allocation_id: String,
98    pub request_id: String,
99    pub job_id: String,
100    pub allocated_amount: u64,
101    pub start_time: Instant,
102    pub end_time: Option<Instant>,
103    pub actual_usage: Option<u64>,
104    pub status: AllocationStatus,
105}
106
/// Lifecycle state of a `ResourceAllocation`.
#[derive(Debug, Clone)]
pub enum AllocationStatus {
    Pending,
    Active,
    Completed,
    Failed,
    Preempted,
}
115
/// A scheduled maintenance period on a pool.
#[derive(Debug, Clone)]
pub struct MaintenanceWindow {
    /// Identifier of the window.
    pub window_id: String,
    /// Start of the window; compared against times in seconds since the Unix epoch.
    pub start_time: u64,
    /// End of the window; compared against times in seconds since the Unix epoch.
    pub end_time: u64,
    /// Free-form description.
    pub description: String,
    /// Only windows with this flag set can block a request.
    pub affects_capacity: bool,
    /// Not consulted by the scheduling code visible in this file.
    pub capacity_reduction: Option<u64>,
}
125
/// A point-in-time record of how much of a pool was in use.
#[derive(Debug, Clone)]
pub struct UtilizationSnapshot {
    /// When the snapshot was taken.
    pub timestamp: Instant,
    /// Capacity in use at that moment.
    pub utilized_capacity: u64,
    /// Number of requests counted in the snapshot.
    pub total_requests: u32,
    /// Mean size of those requests.
    pub average_request_size: f64,
}
133
134pub struct ResourceScheduler {
135    pools: Arc<RwLock<HashMap<String, ResourcePool>>>,
136    pending_requests: Arc<RwLock<BTreeMap<u64, ResourceRequest>>>, // Ordered by priority and timestamp
137    active_allocations: Arc<RwLock<HashMap<String, ResourceAllocation>>>,
138    scheduling_policies: Arc<RwLock<SchedulingPolicies>>,
139    statistics: Arc<RwLock<SchedulingStatistics>>,
140    cost_optimizer: Arc<RwLock<CostOptimizer>>,
141}
142
143#[derive(Debug, Clone)]
144pub struct SchedulingPolicies {
145    pub default_scheduling_algorithm: SchedulingAlgorithm,
146    pub preemption_enabled: bool,
147    pub overcommit_ratio: f32,
148    pub fragmentation_threshold: f32,
149    pub load_balancing_enabled: bool,
150    pub cost_optimization_enabled: bool,
151    pub priority_weights: HashMap<Priority, f32>,
152}
153
/// Placement algorithm selector.
///
/// In the scheduler code visible in this file, `NextFit`, `QuickFit` and `BuddySystem` have
/// no dedicated implementation and are handled like `FirstFit`.
#[derive(Debug, Clone)]
pub enum SchedulingAlgorithm {
    FirstFit,
    BestFit,
    WorstFit,
    NextFit,
    QuickFit,
    BuddySystem,
    LoadBalanced,
    CostOptimized,
}
165
/// Counters and aggregates describing scheduler activity. `Default` yields all zeros.
#[derive(Debug, Default, Clone)]
pub struct SchedulingStatistics {
    /// Incremented once per submitted request.
    pub total_requests: u64,
    /// Incremented once per allocation created.
    pub successful_allocations: u64,
    /// Not updated by the scheduling code visible in this file.
    pub failed_allocations: u64,
    /// Not updated by the scheduling code visible in this file.
    pub preempted_allocations: u64,
    /// Not updated by the scheduling code visible in this file.
    pub average_allocation_time: Duration,
    /// Not updated by the scheduling code visible in this file.
    pub average_utilization: f64,
    /// Not updated by the scheduling code visible in this file.
    pub fragmentation_ratio: f64,
    /// Not updated by the scheduling code visible in this file.
    pub cost_efficiency: f64,
}
177
178pub struct CostOptimizer {
179    pub optimization_strategy: CostOptimizationStrategy,
180    pub cost_history: VecDeque<CostSnapshot>,
181    pub budget_limit: Option<f64>,
182    pub cost_alerts: Vec<CostAlert>,
183}
184
/// Strategy selector stored in `CostOptimizer`.
#[derive(Debug, Clone)]
pub enum CostOptimizationStrategy {
    MinimizeCost,
    MaximizeUtilization,
    BalanceCostPerformance,
    SpotInstanceOptimization,
}
192
/// A point-in-time record of cost figures.
#[derive(Debug, Clone)]
pub struct CostSnapshot {
    /// When the snapshot was taken.
    pub timestamp: Instant,
    /// Total cost at that moment.
    pub total_cost: f64,
    /// Cost broken down by a resource-type label.
    pub cost_by_resource_type: HashMap<String, f64>,
    /// Efficiency score recorded with the snapshot.
    pub efficiency_score: f64,
}
200
201#[derive(Debug, Clone)]
202pub struct CostAlert {
203    pub alert_id: String,
204    pub threshold: f64,
205    pub current_value: f64,
206    pub message: String,
207    pub severity: AlertSeverity,
208}
209
/// Severity level of a `CostAlert`.
#[derive(Debug, Clone)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}
216
217impl Default for ResourceScheduler {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223impl ResourceScheduler {
224    pub fn new() -> Self {
225        Self {
226            pools: Arc::new(RwLock::new(HashMap::new())),
227            pending_requests: Arc::new(RwLock::new(BTreeMap::new())),
228            active_allocations: Arc::new(RwLock::new(HashMap::new())),
229            scheduling_policies: Arc::new(RwLock::new(SchedulingPolicies::default())),
230            statistics: Arc::new(RwLock::new(SchedulingStatistics::default())),
231            cost_optimizer: Arc::new(RwLock::new(CostOptimizer::new())),
232        }
233    }
234
235    pub fn register_resource_pool(&self, pool: ResourcePool) -> Result<()> {
236        let mut pools = self
237            .pools
238            .write()
239            .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pools"))?;
240
241        pools.insert(pool.pool_id.clone(), pool);
242        Ok(())
243    }
244
245    pub fn submit_resource_request(&self, mut request: ResourceRequest) -> Result<String> {
246        request.created_at = SystemTime::now()
247            .duration_since(UNIX_EPOCH)
248            .expect("SystemTime should be after UNIX_EPOCH")
249            .as_secs();
250
251        let priority_score = self.calculate_priority_score(&request)?;
252
253        {
254            let mut pending = self
255                .pending_requests
256                .write()
257                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pending requests"))?;
258            pending.insert(priority_score, request.clone());
259        }
260
261        // Update statistics
262        {
263            let mut stats = self
264                .statistics
265                .write()
266                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on statistics"))?;
267            stats.total_requests += 1;
268        }
269
270        // Try immediate scheduling
271        self.schedule_pending_requests()?;
272
273        Ok(request.request_id)
274    }
275
276    fn calculate_priority_score(&self, request: &ResourceRequest) -> Result<u64> {
277        let policies = self
278            .scheduling_policies
279            .read()
280            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on scheduling policies"))?;
281
282        let priority_weight = policies.priority_weights.get(&request.priority).unwrap_or(&1.0);
283
284        // Combine priority, age, and deadline urgency
285        let age_factor = (SystemTime::now()
286            .duration_since(UNIX_EPOCH)
287            .expect("SystemTime should be after UNIX_EPOCH")
288            .as_secs()
289            - request.created_at) as f64;
290
291        let deadline_urgency = if let Some(deadline) = request.deadline {
292            let time_to_deadline = deadline.saturating_sub(
293                SystemTime::now()
294                    .duration_since(UNIX_EPOCH)
295                    .expect("SystemTime should be after UNIX_EPOCH")
296                    .as_secs(),
297            ) as f64;
298            1.0 / (time_to_deadline + 1.0) // Higher urgency as deadline approaches
299        } else {
300            0.0
301        };
302
303        let score =
304            (*priority_weight as f64 * 1000000.0) + age_factor + (deadline_urgency * 100000.0);
305        Ok(score as u64)
306    }
307
308    pub fn schedule_pending_requests(&self) -> Result<Vec<String>> {
309        let mut allocated_requests = Vec::new();
310
311        let pending_requests = {
312            let pending = self
313                .pending_requests
314                .read()
315                .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pending requests"))?;
316            pending.values().cloned().collect::<Vec<_>>()
317        };
318
319        for request in pending_requests.iter().rev() {
320            // Process highest priority first
321            if let Ok(allocation_id) = self.try_allocate_request(request) {
322                allocated_requests.push(allocation_id);
323
324                // Remove from pending
325                let priority_score = self.calculate_priority_score(request)?;
326                let mut pending = self.pending_requests.write().map_err(|_| {
327                    anyhow::anyhow!("Failed to acquire write lock on pending requests")
328                })?;
329                pending.remove(&priority_score);
330            }
331        }
332
333        Ok(allocated_requests)
334    }
335
336    fn try_allocate_request(&self, request: &ResourceRequest) -> Result<String> {
337        let policies = self
338            .scheduling_policies
339            .read()
340            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on scheduling policies"))?;
341
342        match policies.default_scheduling_algorithm {
343            SchedulingAlgorithm::FirstFit => self.first_fit_allocation(request),
344            SchedulingAlgorithm::BestFit => self.best_fit_allocation(request),
345            SchedulingAlgorithm::WorstFit => self.worst_fit_allocation(request),
346            SchedulingAlgorithm::LoadBalanced => self.load_balanced_allocation(request),
347            SchedulingAlgorithm::CostOptimized => self.cost_optimized_allocation(request),
348            _ => self.first_fit_allocation(request), // Default fallback
349        }
350    }
351
352    fn first_fit_allocation(&self, request: &ResourceRequest) -> Result<String> {
353        let pools = self
354            .pools
355            .read()
356            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
357
358        for (pool_id, pool) in pools.iter() {
359            if self.can_satisfy_request(pool, request)? {
360                let pool_id_clone = pool_id.clone();
361                drop(pools); // Release read lock before acquiring write lock
362                return self.create_allocation(&pool_id_clone, request);
363            }
364        }
365
366        Err(anyhow::anyhow!("No suitable pool found for request"))
367    }
368
369    fn best_fit_allocation(&self, request: &ResourceRequest) -> Result<String> {
370        let pools = self
371            .pools
372            .read()
373            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
374
375        let mut best_pool = None;
376        let mut min_waste = u64::MAX;
377
378        for (pool_id, pool) in pools.iter() {
379            if self.can_satisfy_request(pool, request)? {
380                let waste = pool.available_capacity.saturating_sub(request.quantity);
381                if waste < min_waste {
382                    min_waste = waste;
383                    best_pool = Some(pool_id.clone());
384                }
385            }
386        }
387
388        if let Some(pool_id) = best_pool {
389            drop(pools);
390            self.create_allocation(&pool_id, request)
391        } else {
392            Err(anyhow::anyhow!("No suitable pool found for request"))
393        }
394    }
395
396    fn worst_fit_allocation(&self, request: &ResourceRequest) -> Result<String> {
397        let pools = self
398            .pools
399            .read()
400            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
401
402        let mut best_pool = None;
403        let mut max_remaining = 0;
404
405        for (pool_id, pool) in pools.iter() {
406            if self.can_satisfy_request(pool, request)? {
407                let remaining = pool.available_capacity.saturating_sub(request.quantity);
408                if remaining > max_remaining {
409                    max_remaining = remaining;
410                    best_pool = Some(pool_id.clone());
411                }
412            }
413        }
414
415        if let Some(pool_id) = best_pool {
416            drop(pools);
417            self.create_allocation(&pool_id, request)
418        } else {
419            Err(anyhow::anyhow!("No suitable pool found for request"))
420        }
421    }
422
423    fn load_balanced_allocation(&self, request: &ResourceRequest) -> Result<String> {
424        let pools = self
425            .pools
426            .read()
427            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
428
429        let mut best_pool = None;
430        let mut min_utilization = f64::MAX;
431
432        for (pool_id, pool) in pools.iter() {
433            if self.can_satisfy_request(pool, request)? {
434                let utilization = (pool.total_capacity - pool.available_capacity) as f64
435                    / pool.total_capacity as f64;
436                if utilization < min_utilization {
437                    min_utilization = utilization;
438                    best_pool = Some(pool_id.clone());
439                }
440            }
441        }
442
443        if let Some(pool_id) = best_pool {
444            drop(pools);
445            self.create_allocation(&pool_id, request)
446        } else {
447            Err(anyhow::anyhow!("No suitable pool found for request"))
448        }
449    }
450
451    fn cost_optimized_allocation(&self, request: &ResourceRequest) -> Result<String> {
452        let pools = self
453            .pools
454            .read()
455            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
456
457        let mut best_pool = None;
458        let mut min_cost = f64::MAX;
459
460        for (pool_id, pool) in pools.iter() {
461            if self.can_satisfy_request(pool, request)? {
462                let cost = pool.cost_per_unit * request.quantity as f64;
463                if cost < min_cost {
464                    min_cost = cost;
465                    best_pool = Some(pool_id.clone());
466                }
467            }
468        }
469
470        if let Some(pool_id) = best_pool {
471            drop(pools);
472            self.create_allocation(&pool_id, request)
473        } else {
474            Err(anyhow::anyhow!("No suitable pool found for request"))
475        }
476    }
477
478    fn can_satisfy_request(&self, pool: &ResourcePool, request: &ResourceRequest) -> Result<bool> {
479        // Check basic capacity
480        if pool.available_capacity < request.quantity {
481            return Ok(false);
482        }
483
484        // Check resource type compatibility
485        if !self.is_resource_type_compatible(&pool.resource_type, &request.resource_type) {
486            return Ok(false);
487        }
488
489        // Check constraints
490        if !request.constraints.node_affinity.is_empty() {
491            let has_affinity =
492                request.constraints.node_affinity.iter().any(|tag| pool.tags.contains_key(tag));
493            if !has_affinity {
494                return Ok(false);
495            }
496        }
497
498        if !request.constraints.node_anti_affinity.is_empty() {
499            let has_anti_affinity = request
500                .constraints
501                .node_anti_affinity
502                .iter()
503                .any(|tag| pool.tags.contains_key(tag));
504            if has_anti_affinity {
505                return Ok(false);
506            }
507        }
508
509        // Check maintenance windows
510        if self.is_maintenance_conflicting(pool, request)? {
511            return Ok(false);
512        }
513
514        Ok(true)
515    }
516
517    fn is_resource_type_compatible(
518        &self,
519        pool_type: &ResourceType,
520        request_type: &ResourceType,
521    ) -> bool {
522        match (pool_type, request_type) {
523            (ResourceType::CPU { .. }, ResourceType::CPU { .. }) => true,
524            (ResourceType::Memory { .. }, ResourceType::Memory { .. }) => true,
525            (ResourceType::GPU { .. }, ResourceType::GPU { .. }) => true,
526            (ResourceType::Storage { .. }, ResourceType::Storage { .. }) => true,
527            (ResourceType::Network { .. }, ResourceType::Network { .. }) => true,
528            (
529                ResourceType::Custom {
530                    name: pool_name, ..
531                },
532                ResourceType::Custom { name: req_name, .. },
533            ) => pool_name == req_name,
534            _ => false,
535        }
536    }
537
538    fn is_maintenance_conflicting(
539        &self,
540        pool: &ResourcePool,
541        request: &ResourceRequest,
542    ) -> Result<bool> {
543        let current_time = SystemTime::now()
544            .duration_since(UNIX_EPOCH)
545            .expect("SystemTime should be after UNIX_EPOCH")
546            .as_secs();
547
548        let request_end_time = if let Some(duration) = request.duration {
549            current_time + duration.as_secs()
550        } else {
551            request.deadline.unwrap_or(current_time + 3600) // Default 1 hour if no duration or deadline
552        };
553
554        for window in &pool.maintenance_windows {
555            if window.affects_capacity
556                && current_time < window.end_time
557                && request_end_time > window.start_time
558            {
559                return Ok(true);
560            }
561        }
562
563        Ok(false)
564    }
565
566    fn create_allocation(&self, pool_id: &str, request: &ResourceRequest) -> Result<String> {
567        let allocation_id = uuid::Uuid::new_v4().to_string();
568
569        // Update pool capacity
570        {
571            let mut pools = self
572                .pools
573                .write()
574                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pools"))?;
575
576            if let Some(pool) = pools.get_mut(pool_id) {
577                if pool.available_capacity < request.quantity {
578                    return Err(anyhow::anyhow!("Insufficient capacity"));
579                }
580
581                pool.available_capacity -= request.quantity;
582
583                let allocation = ResourceAllocation {
584                    allocation_id: allocation_id.clone(),
585                    request_id: request.request_id.clone(),
586                    job_id: request.job_id.clone(),
587                    allocated_amount: request.quantity,
588                    start_time: Instant::now(),
589                    end_time: request.duration.map(|d| Instant::now() + d),
590                    actual_usage: None,
591                    status: AllocationStatus::Active,
592                };
593
594                pool.allocations.insert(allocation_id.clone(), allocation.clone());
595
596                // Add to active allocations
597                let mut active = self.active_allocations.write().map_err(|_| {
598                    anyhow::anyhow!("Failed to acquire write lock on active allocations")
599                })?;
600                active.insert(allocation_id.clone(), allocation);
601            } else {
602                return Err(anyhow::anyhow!("Pool not found: {}", pool_id));
603            }
604        }
605
606        // Update statistics
607        {
608            let mut stats = self
609                .statistics
610                .write()
611                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on statistics"))?;
612            stats.successful_allocations += 1;
613        }
614
615        Ok(allocation_id)
616    }
617
618    pub fn release_allocation(&self, allocation_id: &str) -> Result<()> {
619        // Find and remove from active allocations
620        let allocation = {
621            let mut active = self.active_allocations.write().map_err(|_| {
622                anyhow::anyhow!("Failed to acquire write lock on active allocations")
623            })?;
624            active
625                .remove(allocation_id)
626                .ok_or_else(|| anyhow::anyhow!("Allocation not found: {}", allocation_id))?
627        };
628
629        // Find the pool and release capacity
630        {
631            let mut pools = self
632                .pools
633                .write()
634                .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on pools"))?;
635
636            for pool in pools.values_mut() {
637                if let Some(mut pool_allocation) = pool.allocations.remove(allocation_id) {
638                    pool_allocation.status = AllocationStatus::Completed;
639                    pool_allocation.end_time = Some(Instant::now());
640
641                    pool.available_capacity += allocation.allocated_amount;
642                    break;
643                }
644            }
645        }
646
647        // Try to schedule pending requests now that resources are available
648        self.schedule_pending_requests()?;
649
650        Ok(())
651    }
652
653    pub fn update_actual_usage(&self, allocation_id: &str, actual_usage: u64) -> Result<()> {
654        let mut active = self
655            .active_allocations
656            .write()
657            .map_err(|_| anyhow::anyhow!("Failed to acquire write lock on active allocations"))?;
658
659        if let Some(allocation) = active.get_mut(allocation_id) {
660            allocation.actual_usage = Some(actual_usage);
661        } else {
662            return Err(anyhow::anyhow!("Allocation not found: {}", allocation_id));
663        }
664
665        Ok(())
666    }
667
668    pub fn get_resource_utilization(&self) -> Result<HashMap<String, f64>> {
669        let pools = self
670            .pools
671            .read()
672            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
673
674        let mut utilization = HashMap::new();
675
676        for (pool_id, pool) in pools.iter() {
677            let util =
678                (pool.total_capacity - pool.available_capacity) as f64 / pool.total_capacity as f64;
679            utilization.insert(pool_id.clone(), util);
680        }
681
682        Ok(utilization)
683    }
684
685    pub fn get_pending_requests_count(&self) -> Result<usize> {
686        let pending = self
687            .pending_requests
688            .read()
689            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pending requests"))?;
690        Ok(pending.len())
691    }
692
693    pub fn get_active_allocations_count(&self) -> Result<usize> {
694        let active = self
695            .active_allocations
696            .read()
697            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on active allocations"))?;
698        Ok(active.len())
699    }
700
701    pub fn get_statistics(&self) -> Result<SchedulingStatistics> {
702        let stats = self
703            .statistics
704            .read()
705            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on statistics"))?;
706        Ok((*stats).clone())
707    }
708
709    pub fn optimize_costs(&self) -> Result<Vec<CostOptimizationRecommendation>> {
710        let _cost_optimizer = self
711            .cost_optimizer
712            .read()
713            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on cost optimizer"))?;
714
715        let pools = self
716            .pools
717            .read()
718            .map_err(|_| anyhow::anyhow!("Failed to acquire read lock on pools"))?;
719
720        let mut recommendations = Vec::new();
721
722        // Identify underutilized resources
723        for (pool_id, pool) in pools.iter() {
724            let utilization =
725                (pool.total_capacity - pool.available_capacity) as f64 / pool.total_capacity as f64;
726
727            if utilization < 0.3 {
728                // Less than 30% utilized
729                recommendations.push(CostOptimizationRecommendation {
730                    pool_id: pool_id.clone(),
731                    recommendation_type: RecommendationType::ScaleDown,
732                    description: format!(
733                        "Pool {} is only {:.1}% utilized. Consider scaling down.",
734                        pool_id,
735                        utilization * 100.0
736                    ),
737                    potential_savings: pool.cost_per_unit * (pool.available_capacity as f64 * 0.7),
738                });
739            }
740
741            if utilization > 0.9 {
742                // More than 90% utilized
743                recommendations.push(CostOptimizationRecommendation {
744                    pool_id: pool_id.clone(),
745                    recommendation_type: RecommendationType::ScaleUp,
746                    description: format!(
747                        "Pool {} is {:.1}% utilized. Consider scaling up to avoid bottlenecks.",
748                        pool_id,
749                        utilization * 100.0
750                    ),
751                    potential_savings: 0.0, // This is actually a cost increase but prevents performance issues
752                });
753            }
754        }
755
756        Ok(recommendations)
757    }
758}
759
760#[derive(Debug, Clone)]
761pub struct CostOptimizationRecommendation {
762    pub pool_id: String,
763    pub recommendation_type: RecommendationType,
764    pub description: String,
765    pub potential_savings: f64,
766}
767
/// Kind of action a `CostOptimizationRecommendation` suggests.
///
/// Only `ScaleUp` and `ScaleDown` are produced by the code visible in this file.
#[derive(Debug, Clone)]
pub enum RecommendationType {
    ScaleUp,
    ScaleDown,
    ChangeInstanceType,
    UseSpotInstances,
    ConsolidateWorkloads,
}
776
777impl SchedulingPolicies {
778    fn default() -> Self {
779        let mut priority_weights = HashMap::new();
780        priority_weights.insert(Priority::Critical, 4.0);
781        priority_weights.insert(Priority::High, 3.0);
782        priority_weights.insert(Priority::Normal, 2.0);
783        priority_weights.insert(Priority::Low, 1.0);
784        priority_weights.insert(Priority::BestEffort, 0.5);
785
786        Self {
787            default_scheduling_algorithm: SchedulingAlgorithm::FirstFit,
788            preemption_enabled: false,
789            overcommit_ratio: 1.0,
790            fragmentation_threshold: 0.1,
791            load_balancing_enabled: true,
792            cost_optimization_enabled: false,
793            priority_weights,
794        }
795    }
796}
797
798impl CostOptimizer {
799    fn new() -> Self {
800        Self {
801            optimization_strategy: CostOptimizationStrategy::BalanceCostPerformance,
802            cost_history: VecDeque::with_capacity(1000),
803            budget_limit: None,
804            cost_alerts: Vec::new(),
805        }
806    }
807}
808
809#[cfg(test)]
810mod tests {
811    use super::*;
812
813    #[test]
814    fn test_resource_scheduler_creation() {
815        let scheduler = ResourceScheduler::new();
816        assert_eq!(
817            scheduler.get_pending_requests_count().expect("operation failed in test"),
818            0
819        );
820        assert_eq!(
821            scheduler.get_active_allocations_count().expect("operation failed in test"),
822            0
823        );
824    }
825
826    #[test]
827    fn test_resource_pool_registration() {
828        let scheduler = ResourceScheduler::new();
829
830        let pool = ResourcePool {
831            pool_id: "cpu-pool-1".to_string(),
832            name: "CPU Pool 1".to_string(),
833            resource_type: ResourceType::CPU { cores: 16 },
834            total_capacity: 16,
835            available_capacity: 16,
836            reserved_capacity: 0,
837            allocations: HashMap::new(),
838            maintenance_windows: Vec::new(),
839            utilization_history: VecDeque::new(),
840            cost_per_unit: 0.1,
841            tags: HashMap::new(),
842        };
843
844        scheduler.register_resource_pool(pool).expect("operation failed in test");
845
846        let utilization = scheduler.get_resource_utilization().expect("operation failed in test");
847        assert_eq!(utilization.get("cpu-pool-1"), Some(&0.0));
848    }
849
850    #[test]
851    fn test_resource_request_submission() {
852        let scheduler = ResourceScheduler::new();
853
854        // Register a pool first
855        let pool = ResourcePool {
856            pool_id: "cpu-pool-1".to_string(),
857            name: "CPU Pool 1".to_string(),
858            resource_type: ResourceType::CPU { cores: 16 },
859            total_capacity: 16,
860            available_capacity: 16,
861            reserved_capacity: 0,
862            allocations: HashMap::new(),
863            maintenance_windows: Vec::new(),
864            utilization_history: VecDeque::new(),
865            cost_per_unit: 0.1,
866            tags: HashMap::new(),
867        };
868        scheduler.register_resource_pool(pool).expect("operation failed in test");
869
870        let request = ResourceRequest {
871            request_id: "req-1".to_string(),
872            job_id: "job-1".to_string(),
873            resource_type: ResourceType::CPU { cores: 4 },
874            quantity: 4,
875            duration: Some(Duration::from_secs(3600)),
876            priority: Priority::Normal,
877            constraints: ResourceConstraints {
878                node_affinity: vec![],
879                node_anti_affinity: vec![],
880                require_dedicated: false,
881                allow_preemption: false,
882                max_nodes: None,
883                locality_preference: LocalityPreference::None,
884            },
885            created_at: 0,
886            deadline: None,
887        };
888
889        let request_id =
890            scheduler.submit_resource_request(request).expect("operation failed in test");
891        assert_eq!(request_id, "req-1");
892
893        // Check that resources were allocated
894        assert_eq!(
895            scheduler.get_active_allocations_count().expect("operation failed in test"),
896            1
897        );
898
899        let utilization = scheduler.get_resource_utilization().expect("operation failed in test");
900        assert_eq!(utilization.get("cpu-pool-1"), Some(&0.25)); // 4/16 = 0.25
901    }
902
903    #[test]
904    fn test_allocation_release() {
905        let scheduler = ResourceScheduler::new();
906
907        // Register a pool
908        let pool = ResourcePool {
909            pool_id: "cpu-pool-1".to_string(),
910            name: "CPU Pool 1".to_string(),
911            resource_type: ResourceType::CPU { cores: 16 },
912            total_capacity: 16,
913            available_capacity: 16,
914            reserved_capacity: 0,
915            allocations: HashMap::new(),
916            maintenance_windows: Vec::new(),
917            utilization_history: VecDeque::new(),
918            cost_per_unit: 0.1,
919            tags: HashMap::new(),
920        };
921        scheduler.register_resource_pool(pool).expect("operation failed in test");
922
923        // Submit a request
924        let request = ResourceRequest {
925            request_id: "req-1".to_string(),
926            job_id: "job-1".to_string(),
927            resource_type: ResourceType::CPU { cores: 4 },
928            quantity: 4,
929            duration: Some(Duration::from_secs(3600)),
930            priority: Priority::Normal,
931            constraints: ResourceConstraints {
932                node_affinity: vec![],
933                node_anti_affinity: vec![],
934                require_dedicated: false,
935                allow_preemption: false,
936                max_nodes: None,
937                locality_preference: LocalityPreference::None,
938            },
939            created_at: 0,
940            deadline: None,
941        };
942
943        scheduler.submit_resource_request(request).expect("operation failed in test");
944
945        // Get allocation ID (in a real implementation, this would be returned)
946        let active_allocations =
947            scheduler.active_allocations.read().expect("lock should not be poisoned");
948        let allocation_id =
949            active_allocations.keys().next().expect("operation failed in test").clone();
950        drop(active_allocations);
951
952        // Release the allocation
953        scheduler.release_allocation(&allocation_id).expect("operation failed in test");
954
955        // Check that resources were freed
956        assert_eq!(
957            scheduler.get_active_allocations_count().expect("operation failed in test"),
958            0
959        );
960
961        let utilization = scheduler.get_resource_utilization().expect("operation failed in test");
962        assert_eq!(utilization.get("cpu-pool-1"), Some(&0.0));
963    }
964
965    #[test]
966    fn test_priority_scheduling() {
967        let scheduler = ResourceScheduler::new();
968
969        // Register a small pool to force queueing
970        let pool = ResourcePool {
971            pool_id: "cpu-pool-1".to_string(),
972            name: "CPU Pool 1".to_string(),
973            resource_type: ResourceType::CPU { cores: 4 },
974            total_capacity: 4,
975            available_capacity: 4,
976            reserved_capacity: 0,
977            allocations: HashMap::new(),
978            maintenance_windows: Vec::new(),
979            utilization_history: VecDeque::new(),
980            cost_per_unit: 0.1,
981            tags: HashMap::new(),
982        };
983        scheduler.register_resource_pool(pool).expect("operation failed in test");
984
985        // Submit low priority request first
986        let low_priority_request = ResourceRequest {
987            request_id: "req-low".to_string(),
988            job_id: "job-low".to_string(),
989            resource_type: ResourceType::CPU { cores: 4 },
990            quantity: 4,
991            duration: Some(Duration::from_secs(3600)),
992            priority: Priority::Low,
993            constraints: ResourceConstraints {
994                node_affinity: vec![],
995                node_anti_affinity: vec![],
996                require_dedicated: false,
997                allow_preemption: false,
998                max_nodes: None,
999                locality_preference: LocalityPreference::None,
1000            },
1001            created_at: 0,
1002            deadline: None,
1003        };
1004
1005        scheduler
1006            .submit_resource_request(low_priority_request)
1007            .expect("operation failed in test");
1008
1009        // Submit high priority request that should be prioritized
1010        let high_priority_request = ResourceRequest {
1011            request_id: "req-high".to_string(),
1012            job_id: "job-high".to_string(),
1013            resource_type: ResourceType::CPU { cores: 2 },
1014            quantity: 2,
1015            duration: Some(Duration::from_secs(1800)),
1016            priority: Priority::High,
1017            constraints: ResourceConstraints {
1018                node_affinity: vec![],
1019                node_anti_affinity: vec![],
1020                require_dedicated: false,
1021                allow_preemption: false,
1022                max_nodes: None,
1023                locality_preference: LocalityPreference::None,
1024            },
1025            created_at: 0,
1026            deadline: None,
1027        };
1028
1029        scheduler
1030            .submit_resource_request(high_priority_request)
1031            .expect("operation failed in test");
1032
1033        // In a real scenario, we would verify that the high priority request
1034        // gets allocated first when resources become available
1035        assert!(scheduler.get_pending_requests_count().expect("operation failed in test") > 0);
1036    }
1037}