oxilean_runtime/task_scheduler/
types.rs1use std::collections::HashMap;
4
5#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
7pub struct TaskId(pub u64);
8
9impl TaskId {
10 pub fn new(id: u64) -> Self {
12 TaskId(id)
13 }
14
15 pub fn raw(self) -> u64 {
17 self.0
18 }
19}
20
21impl std::fmt::Display for TaskId {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 write!(f, "task#{}", self.0)
24 }
25}
26
27#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
29#[repr(u8)]
30pub enum TaskPriority {
31 Critical = 0,
33 High = 1,
35 Normal = 2,
37 Low = 3,
39 Background = 4,
41}
42
43impl TaskPriority {
44 pub fn level(self) -> u8 {
46 self as u8
47 }
48
49 pub fn from_level(level: u8) -> Self {
51 match level {
52 0 => TaskPriority::Critical,
53 1 => TaskPriority::High,
54 2 => TaskPriority::Normal,
55 3 => TaskPriority::Low,
56 _ => TaskPriority::Background,
57 }
58 }
59}
60
61impl std::fmt::Display for TaskPriority {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 let s = match self {
64 TaskPriority::Critical => "critical",
65 TaskPriority::High => "high",
66 TaskPriority::Normal => "normal",
67 TaskPriority::Low => "low",
68 TaskPriority::Background => "background",
69 };
70 write!(f, "{}", s)
71 }
72}
73
74#[derive(Clone, Debug, PartialEq, Eq)]
76pub enum TaskState {
77 Pending,
79 Running {
81 worker: usize,
83 },
84 Completed,
86 Failed(String),
88 Cancelled,
90}
91
92impl TaskState {
93 pub fn is_terminal(&self) -> bool {
95 matches!(
96 self,
97 TaskState::Completed | TaskState::Failed(_) | TaskState::Cancelled
98 )
99 }
100
101 pub fn is_pending(&self) -> bool {
103 matches!(self, TaskState::Pending)
104 }
105
106 pub fn is_running(&self) -> bool {
108 matches!(self, TaskState::Running { .. })
109 }
110}
111
112impl std::fmt::Display for TaskState {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 match self {
115 TaskState::Pending => write!(f, "pending"),
116 TaskState::Running { worker } => write!(f, "running(worker={})", worker),
117 TaskState::Completed => write!(f, "completed"),
118 TaskState::Failed(msg) => write!(f, "failed({})", msg),
119 TaskState::Cancelled => write!(f, "cancelled"),
120 }
121 }
122}
123
124#[derive(Clone, Debug)]
126pub struct Task {
127 pub id: TaskId,
129 pub priority: TaskPriority,
131 pub state: TaskState,
133 pub enqueued_at: u64,
135 pub started_at: Option<u64>,
137 pub completed_at: Option<u64>,
139}
140
141impl Task {
142 pub fn new(id: TaskId, priority: TaskPriority, enqueued_at: u64) -> Self {
144 Task {
145 id,
146 priority,
147 state: TaskState::Pending,
148 enqueued_at,
149 started_at: None,
150 completed_at: None,
151 }
152 }
153
154 pub fn latency_ns(&self) -> Option<u64> {
156 self.completed_at
157 .map(|c| c.saturating_sub(self.enqueued_at))
158 }
159
160 pub fn queue_delay_ns(&self) -> Option<u64> {
162 self.started_at.map(|s| s.saturating_sub(self.enqueued_at))
163 }
164
165 pub fn execution_ns(&self) -> Option<u64> {
167 match (self.started_at, self.completed_at) {
168 (Some(s), Some(c)) => Some(c.saturating_sub(s)),
169 _ => None,
170 }
171 }
172}
173
174#[derive(Clone, Debug, Default)]
176pub struct WorkerStats {
177 pub id: usize,
179 pub tasks_completed: u64,
181 pub tasks_stolen: u64,
183 pub idle_time_ns: u64,
185 pub busy_time_ns: u64,
187}
188
189impl WorkerStats {
190 pub fn new(id: usize) -> Self {
192 WorkerStats {
193 id,
194 ..Default::default()
195 }
196 }
197
198 pub fn utilization(&self) -> f64 {
200 let total = self.busy_time_ns + self.idle_time_ns;
201 if total == 0 {
202 return 0.0;
203 }
204 self.busy_time_ns as f64 / total as f64
205 }
206
207 pub fn total_tasks(&self) -> u64 {
209 self.tasks_completed + self.tasks_stolen
210 }
211}
212
213impl std::fmt::Display for WorkerStats {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 write!(
216 f,
217 "Worker[{}] completed={} stolen={} util={:.2}",
218 self.id,
219 self.tasks_completed,
220 self.tasks_stolen,
221 self.utilization()
222 )
223 }
224}
225
226#[derive(Clone, Debug, Default)]
228pub struct SchedulerMetrics {
229 pub total_tasks: u64,
231 pub completed: u64,
233 pub failed: u64,
235 pub workers: Vec<WorkerStats>,
237 pub throughput_per_sec: f64,
239 pub avg_latency_ns: u64,
241}
242
243impl SchedulerMetrics {
244 pub fn in_flight(&self) -> u64 {
246 self.total_tasks
247 .saturating_sub(self.completed + self.failed)
248 }
249
250 pub fn success_rate(&self) -> f64 {
252 let terminal = self.completed + self.failed;
253 if terminal == 0 {
254 return 1.0;
255 }
256 self.completed as f64 / terminal as f64
257 }
258
259 pub fn busiest_worker(&self) -> Option<&WorkerStats> {
261 self.workers.iter().max_by_key(|w| w.tasks_completed)
262 }
263
264 pub fn least_loaded_worker(&self) -> Option<&WorkerStats> {
266 self.workers.iter().min_by_key(|w| w.tasks_completed)
267 }
268}
269
270#[derive(Clone, Debug, PartialEq, Eq)]
272pub enum LoadBalancePolicy {
273 RoundRobin,
275 LeastLoaded,
277 WorkStealing,
279 PriorityFirst,
281}
282
283impl std::fmt::Display for LoadBalancePolicy {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 let s = match self {
286 LoadBalancePolicy::RoundRobin => "round-robin",
287 LoadBalancePolicy::LeastLoaded => "least-loaded",
288 LoadBalancePolicy::WorkStealing => "work-stealing",
289 LoadBalancePolicy::PriorityFirst => "priority-first",
290 };
291 write!(f, "{}", s)
292 }
293}
294
295pub struct AdaptiveScheduler {
297 pub policy: LoadBalancePolicy,
299 pub workers: Vec<WorkerStats>,
301 pub metrics: SchedulerMetrics,
303 pub(super) tasks: HashMap<TaskId, Task>,
305 pub(super) next_task_id: u64,
307 pub(super) clock: u64,
309 pub(super) rr_cursor: usize,
311 pub(super) total_latency_ns: u64,
313 pub(super) latency_sample_count: u64,
315}