Skip to main content

cbtop/predictive_scheduler/
types.rs

1//! Core types for predictive scheduling: config, host profiles, workloads, and metrics.
2
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
/// Tunable parameters for the predictive scheduler.
///
/// Start from [`PredictiveSchedulerConfig::default`] and override individual
/// fields with struct-update syntax when different settings are needed.
#[derive(Debug, Clone)]
pub struct PredictiveSchedulerConfig {
    /// Desired fraction of workloads that meet their SLO (0.0-1.0).
    pub target_slo_compliance: f64,
    /// Upper bound on the cost of a single operation, in normalized units.
    pub max_cost_per_op: f64,
    /// Whether spot instances may be used as scheduling targets.
    pub enable_spot_instances: bool,
    /// Safety margin before a host's preemption deadline within which
    /// scheduling should be avoided.
    pub preemption_buffer: Duration,
    /// Decay factor for load-balancing weights.
    pub load_decay_factor: f64,
    /// Host capacity utilization that must be reached before overflowing.
    pub min_capacity_threshold: f64,
    /// Multiplier applied to SLO violations in the cost function.
    pub slo_violation_penalty: f64,
    /// Size of the history window used for performance tracking.
    pub history_window: usize,
}

impl Default for PredictiveSchedulerConfig {
    /// Stock settings: 99% SLO target, unit cost cap, spot instances enabled,
    /// a five-minute preemption buffer, 0.9 load decay, 0.8 overflow threshold,
    /// a 10x SLO-violation penalty, and a 100-entry history window.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(5 * 60);
        PredictiveSchedulerConfig {
            target_slo_compliance: 0.99,
            max_cost_per_op: 1.0,
            enable_spot_instances: true,
            preemption_buffer: five_minutes,
            load_decay_factor: 0.9,
            min_capacity_threshold: 0.8,
            slo_violation_penalty: 10.0,
            history_window: 100,
        }
    }
}
41
/// Purchasing model of a host; drives both price and availability modeling.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InstanceType {
    /// Pay-as-you-go capacity with guaranteed availability.
    OnDemand,
    /// Discounted spare capacity that can be preempted.
    Spot,
    /// Capacity bought under a commitment.
    Reserved,
    /// Discounted capacity with a scheduled termination.
    Preemptible,
}

impl InstanceType {
    /// Price factor relative to an on-demand instance (on-demand = 1.0).
    ///
    /// Reserved is 0.6 (40% off), Spot 0.3 (70% off), Preemptible 0.2 (80% off).
    pub fn cost_multiplier(&self) -> f64 {
        match *self {
            InstanceType::OnDemand => 1.0,
            InstanceType::Reserved => 0.6,
            InstanceType::Spot => 0.3,
            InstanceType::Preemptible => 0.2,
        }
    }

    /// Modeled probability that an instance of this type remains available.
    pub fn reliability(&self) -> f64 {
        match *self {
            // Both guaranteed-capacity models share the same availability figure.
            InstanceType::OnDemand | InstanceType::Reserved => 0.9999,
            InstanceType::Preemptible => 0.90,
            InstanceType::Spot => 0.85,
        }
    }
}
76
77/// Performance profile for a host
78#[derive(Debug, Clone)]
79pub struct HostProfile {
80    /// Unique host identifier
81    pub host_id: String,
82    /// Instance type for cost modeling
83    pub instance_type: InstanceType,
84    /// Compute capacity (operations per second)
85    pub compute_capacity: f64,
86    /// Memory capacity (bytes)
87    pub memory_capacity: u64,
88    /// Current load (0.0-1.0)
89    pub current_load: f64,
90    /// Base cost per hour (normalized units)
91    pub hourly_cost: f64,
92    /// Network latency to coordinator (ms)
93    pub network_latency_ms: f64,
94    /// Historical SLO compliance rate
95    pub historical_slo_compliance: f64,
96    /// Preemption deadline (None = not preemptible)
97    pub preemption_deadline: Option<Instant>,
98    /// Performance variance (CV of operation times)
99    pub performance_cv: f64,
100}
101
102impl HostProfile {
103    /// Create a new host profile
104    pub fn new(host_id: impl Into<String>, instance_type: InstanceType) -> Self {
105        Self {
106            host_id: host_id.into(),
107            instance_type,
108            compute_capacity: 1000.0,
109            memory_capacity: 8 * 1024 * 1024 * 1024, // 8GB
110            current_load: 0.0,
111            hourly_cost: 1.0,
112            network_latency_ms: 1.0,
113            historical_slo_compliance: 0.99,
114            preemption_deadline: None,
115            performance_cv: 0.1,
116        }
117    }
118
119    /// Effective cost per operation
120    pub fn cost_per_op(&self) -> f64 {
121        let base_cost = self.hourly_cost * self.instance_type.cost_multiplier();
122        // Cost per op = hourly cost / ops per hour, adjusted for current load
123        let effective_capacity = self.compute_capacity * (1.0 - self.current_load);
124        if effective_capacity > 0.0 {
125            base_cost / (effective_capacity * 3600.0)
126        } else {
127            f64::MAX
128        }
129    }
130
131    /// Available capacity (0.0-1.0)
132    pub fn available_capacity(&self) -> f64 {
133        (1.0 - self.current_load).max(0.0)
134    }
135
136    /// Time until preemption (if applicable)
137    pub fn time_until_preemption(&self) -> Option<Duration> {
138        self.preemption_deadline.map(|deadline| {
139            let now = Instant::now();
140            if deadline > now {
141                deadline - now
142            } else {
143                Duration::ZERO
144            }
145        })
146    }
147
148    /// Check if host is safe for scheduling (not near preemption)
149    pub fn is_safe_for_scheduling(&self, buffer: Duration) -> bool {
150        match self.time_until_preemption() {
151            Some(time_left) => time_left > buffer,
152            None => true, // Non-preemptible hosts are always safe
153        }
154    }
155}
156
157/// Workload characteristics for scheduling decisions
158#[derive(Debug, Clone)]
159pub struct WorkloadSpec {
160    /// Unique workload identifier
161    pub workload_id: String,
162    /// Estimated operation count
163    pub operation_count: u64,
164    /// Memory requirement (bytes)
165    pub memory_required: u64,
166    /// SLO deadline
167    pub slo_deadline: Duration,
168    /// Priority (higher = more important)
169    pub priority: u32,
170    /// Whether workload can be preempted
171    pub preemptible: bool,
172    /// Estimated compute intensity (ops per byte)
173    pub compute_intensity: f64,
174}
175
176impl WorkloadSpec {
177    /// Create a new workload specification
178    pub fn new(workload_id: impl Into<String>, operation_count: u64) -> Self {
179        Self {
180            workload_id: workload_id.into(),
181            operation_count,
182            memory_required: 1024 * 1024, // 1MB default
183            slo_deadline: Duration::from_millis(100),
184            priority: 1,
185            preemptible: true,
186            compute_intensity: 100.0,
187        }
188    }
189
190    /// Estimated execution time on a host
191    pub fn estimated_execution_time(&self, host: &HostProfile) -> Duration {
192        let ops_per_sec = host.compute_capacity * host.available_capacity();
193        if ops_per_sec > 0.0 {
194            let seconds = self.operation_count as f64 / ops_per_sec;
195            Duration::from_secs_f64(seconds)
196        } else {
197            Duration::MAX
198        }
199    }
200}
201
/// Outcome of scheduling a single workload: where it goes and what is expected.
#[derive(Debug, Clone)]
pub struct SchedulingDecision {
    /// Identifier of the host chosen to run the workload.
    pub host_id: String,
    /// Execution time predicted for the workload on that host.
    pub predicted_time: Duration,
    /// Cost predicted for running the workload on that host.
    pub predicted_cost: f64,
    /// Probability that the workload meets its SLO.
    pub slo_compliance_prob: f64,
    /// Ranking score for this choice; higher is better.
    pub score: f64,
    /// Human-readable explanation of why this host was selected.
    pub reason: String,
}
218
/// Running counters describing scheduler behavior, for monitoring.
#[derive(Debug, Clone, Default)]
pub struct SchedulerMetrics {
    /// Number of scheduling decisions made so far.
    pub total_decisions: u64,
    /// Number of those decisions that ended in an SLO violation.
    pub slo_violations: u64,
    /// Cumulative cost incurred.
    pub total_cost: f64,
    /// Average scheduling latency (the `_us` suffix suggests microseconds).
    pub avg_scheduling_latency_us: f64,
    /// Per-host utilization; presumably keyed by host identifier.
    pub host_utilization: HashMap<String, f64>,
    /// Savings attributed to spot instances.
    pub spot_savings: f64,
}

impl SchedulerMetrics {
    /// Fraction of decisions that did not violate their SLO.
    ///
    /// Returns 1.0 when no decisions have been recorded yet.
    pub fn slo_compliance_rate(&self) -> f64 {
        if self.total_decisions == 0 {
            return 1.0;
        }
        let violation_rate = self.slo_violations as f64 / self.total_decisions as f64;
        1.0 - violation_rate
    }

    /// Mean cost per recorded decision.
    ///
    /// Returns 0.0 when no decisions have been recorded yet.
    pub fn avg_cost_per_decision(&self) -> f64 {
        match self.total_decisions {
            0 => 0.0,
            n => self.total_cost / n as f64,
        }
    }
}