Skip to main content

oxilean_runtime/task_scheduler/
types.rs

1//! Concurrent task scheduler types: metrics, adaptive load balancing, work stealing.
2
3use std::collections::HashMap;
4
/// Opaque, copyable handle that uniquely names a task.
///
/// The wrapped `u64` is public, but [`TaskId::new`] and [`TaskId::raw`] are
/// the intended way to move between the handle and its numeric value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TaskId(pub u64);

impl TaskId {
    /// Wraps a raw numeric value as a `TaskId`.
    pub fn new(id: u64) -> Self {
        Self(id)
    }

    /// Unwraps the handle back into its raw numeric value.
    pub fn raw(self) -> u64 {
        let TaskId(value) = self;
        value
    }
}

impl std::fmt::Display for TaskId {
    /// Renders the id as `task#<n>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("task#")?;
        write!(f, "{}", self.0)
    }
}
26
/// Scheduling urgency of a task.
///
/// Variants are declared from most to least urgent with explicit `u8`
/// discriminants, so the derived ordering agrees with [`TaskPriority::level`]:
/// a smaller level means a more urgent task.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(u8)]
pub enum TaskPriority {
    /// Highest priority — must run immediately.
    Critical = 0,
    /// High priority — runs before normal work.
    High = 1,
    /// Default priority.
    Normal = 2,
    /// Lower priority — deferred when system is busy.
    Low = 3,
    /// Runs only when workers are otherwise idle.
    Background = 4,
}

impl TaskPriority {
    /// Every variant, positioned at the index equal to its numeric level.
    const BY_LEVEL: [TaskPriority; 5] = [
        TaskPriority::Critical,
        TaskPriority::High,
        TaskPriority::Normal,
        TaskPriority::Low,
        TaskPriority::Background,
    ];

    /// Numeric level of this priority (0 = `Critical` … 4 = `Background`).
    pub fn level(self) -> u8 {
        // `#[repr(u8)]` makes the discriminant the level itself.
        self as u8
    }

    /// Maps a numeric level back to a priority.
    ///
    /// Any level outside `0..=3` (including 4 and everything above) yields
    /// `Background`, so the conversion never fails.
    pub fn from_level(level: u8) -> Self {
        Self::BY_LEVEL
            .get(usize::from(level))
            .copied()
            .unwrap_or(TaskPriority::Background)
    }
}

impl std::fmt::Display for TaskPriority {
    /// Writes the lowercase name of the priority (e.g. `critical`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label: &str = match *self {
            TaskPriority::Critical => "critical",
            TaskPriority::High => "high",
            TaskPriority::Normal => "normal",
            TaskPriority::Low => "low",
            TaskPriority::Background => "background",
        };
        f.write_str(label)
    }
}
73
/// Lifecycle stage of a task.
///
/// `Pending` and `Running` are live states; `Completed`, `Failed` and
/// `Cancelled` are terminal.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskState {
    /// Queued, waiting to be dispatched.
    Pending,
    /// Actively executing on the given worker.
    Running {
        /// Index of the worker executing this task.
        worker: usize,
    },
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed(String),
    /// Removed from the queue before execution.
    Cancelled,
}

impl TaskState {
    /// Returns `true` for `Completed`, `Failed` and `Cancelled`.
    pub fn is_terminal(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            TaskState::Pending | TaskState::Running { .. } => false,
            TaskState::Completed | TaskState::Failed(_) | TaskState::Cancelled => true,
        }
    }

    /// Returns `true` only for `Pending`, i.e. the task has not been dispatched.
    pub fn is_pending(&self) -> bool {
        *self == TaskState::Pending
    }

    /// Returns `true` when the task is `Running` on any worker.
    pub fn is_running(&self) -> bool {
        matches!(*self, TaskState::Running { worker: _ })
    }
}

impl std::fmt::Display for TaskState {
    /// Writes a short lowercase description, including the worker index or
    /// failure message where the variant carries one.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskState::Pending => f.write_str("pending"),
            TaskState::Running { worker } => write!(f, "running(worker={})", worker),
            TaskState::Completed => f.write_str("completed"),
            TaskState::Failed(msg) => write!(f, "failed({})", msg),
            TaskState::Cancelled => f.write_str("cancelled"),
        }
    }
}
123
124/// A schedulable unit of work.
125#[derive(Clone, Debug)]
126pub struct Task {
127    /// Unique task identifier.
128    pub id: TaskId,
129    /// Scheduling priority.
130    pub priority: TaskPriority,
131    /// Current state of the task.
132    pub state: TaskState,
133    /// Nanosecond timestamp when the task was enqueued.
134    pub enqueued_at: u64,
135    /// Nanosecond timestamp when the task started executing, if at all.
136    pub started_at: Option<u64>,
137    /// Nanosecond timestamp when the task reached a terminal state.
138    pub completed_at: Option<u64>,
139}
140
141impl Task {
142    /// Create a new pending task.
143    pub fn new(id: TaskId, priority: TaskPriority, enqueued_at: u64) -> Self {
144        Task {
145            id,
146            priority,
147            state: TaskState::Pending,
148            enqueued_at,
149            started_at: None,
150            completed_at: None,
151        }
152    }
153
154    /// Latency from enqueue to completion in nanoseconds.
155    pub fn latency_ns(&self) -> Option<u64> {
156        self.completed_at
157            .map(|c| c.saturating_sub(self.enqueued_at))
158    }
159
160    /// Queuing delay (time between enqueue and start) in nanoseconds.
161    pub fn queue_delay_ns(&self) -> Option<u64> {
162        self.started_at.map(|s| s.saturating_sub(self.enqueued_at))
163    }
164
165    /// Execution duration (start to completion) in nanoseconds.
166    pub fn execution_ns(&self) -> Option<u64> {
167        match (self.started_at, self.completed_at) {
168            (Some(s), Some(c)) => Some(c.saturating_sub(s)),
169            _ => None,
170        }
171    }
172}
173
/// Per-worker execution statistics.
#[derive(Clone, Debug, Default)]
pub struct WorkerStats {
    /// Worker index.
    pub id: usize,
    /// Number of tasks this worker has completed.
    pub tasks_completed: u64,
    /// Number of tasks stolen from other workers' queues.
    pub tasks_stolen: u64,
    /// Accumulated nanoseconds this worker spent idle.
    pub idle_time_ns: u64,
    /// Accumulated nanoseconds this worker spent executing tasks.
    pub busy_time_ns: u64,
}

impl WorkerStats {
    /// Creates all-zero statistics for the worker with index `id`.
    pub fn new(id: usize) -> Self {
        WorkerStats {
            id,
            ..Default::default()
        }
    }

    /// Utilization ratio: `busy_time / (busy_time + idle_time)`, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when no busy or idle time has been recorded yet.
    pub fn utilization(&self) -> f64 {
        // The fields are public and unbounded, so the `u64` sum can overflow.
        // That would panic in debug builds or wrap in release builds. Fall
        // back to `f64` arithmetic only in that case; all other inputs produce
        // exactly the same result as the plain integer sum.
        let total = match self.busy_time_ns.checked_add(self.idle_time_ns) {
            Some(0) => return 0.0,
            Some(sum) => sum as f64,
            None => self.busy_time_ns as f64 + self.idle_time_ns as f64,
        };
        self.busy_time_ns as f64 / total
    }

    /// Total tasks handled (`tasks_completed + tasks_stolen`), saturating at
    /// `u64::MAX` instead of overflowing.
    ///
    /// NOTE(review): if the code that updates these counters also bumps
    /// `tasks_completed` for tasks it stole, stolen tasks are counted twice
    /// here — confirm against the scheduler's bookkeeping.
    pub fn total_tasks(&self) -> u64 {
        self.tasks_completed.saturating_add(self.tasks_stolen)
    }
}

impl std::fmt::Display for WorkerStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Worker[{}] completed={} stolen={} util={:.2}",
            self.id,
            self.tasks_completed,
            self.tasks_stolen,
            self.utilization()
        )
    }
}

/// Aggregated scheduler-level metrics.
#[derive(Clone, Debug, Default)]
pub struct SchedulerMetrics {
    /// Total tasks ever submitted.
    pub total_tasks: u64,
    /// Total tasks that completed successfully.
    pub completed: u64,
    /// Total tasks that failed.
    pub failed: u64,
    /// Per-worker statistics.
    pub workers: Vec<WorkerStats>,
    /// Rolling throughput estimate (tasks per second).
    pub throughput_per_sec: f64,
    /// Rolling average task latency (nanoseconds).
    pub avg_latency_ns: u64,
}

impl SchedulerMetrics {
    /// Number of tasks still in flight: `total_tasks - (completed + failed)`,
    /// clamped at zero and computed without overflow.
    ///
    /// NOTE(review): there is no cancelled counter here, so tasks that end up
    /// cancelled stay counted as in flight unless the caller adjusts
    /// `total_tasks` — confirm this is intended.
    pub fn in_flight(&self) -> u64 {
        self.total_tasks
            .saturating_sub(self.completed.saturating_add(self.failed))
    }

    /// Success rate `completed / (completed + failed)` in the range `[0.0, 1.0]`.
    ///
    /// Returns `1.0` when no task has completed or failed yet.
    pub fn success_rate(&self) -> f64 {
        // Same overflow-safe scheme as `WorkerStats::utilization`: exact
        // integer sum when it fits, `f64` sum only when `u64` would overflow.
        let terminal = match self.completed.checked_add(self.failed) {
            Some(0) => return 1.0,
            Some(sum) => sum as f64,
            None => self.completed as f64 + self.failed as f64,
        };
        self.completed as f64 / terminal
    }

    /// Worker with the highest `tasks_completed`, or `None` if there are no workers.
    ///
    /// On ties, the last such worker in `workers` is returned
    /// (`Iterator::max_by_key` semantics).
    pub fn busiest_worker(&self) -> Option<&WorkerStats> {
        self.workers.iter().max_by_key(|w| w.tasks_completed)
    }

    /// Worker with the lowest `tasks_completed`, or `None` if there are no workers.
    ///
    /// On ties, the first such worker in `workers` is returned
    /// (`Iterator::min_by_key` semantics).
    pub fn least_loaded_worker(&self) -> Option<&WorkerStats> {
        self.workers.iter().min_by_key(|w| w.tasks_completed)
    }
}
269
/// Strategy the adaptive scheduler uses to spread work across workers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LoadBalancePolicy {
    /// Assign tasks to workers in a rotating order.
    RoundRobin,
    /// Always assign to the worker with the fewest pending tasks.
    LeastLoaded,
    /// Idle workers steal tasks from overloaded peers.
    WorkStealing,
    /// Highest-priority tasks are dispatched first, regardless of worker.
    PriorityFirst,
}

impl std::fmt::Display for LoadBalancePolicy {
    /// Writes the kebab-case name of the policy (e.g. `round-robin`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match *self {
            LoadBalancePolicy::RoundRobin => "round-robin",
            LoadBalancePolicy::LeastLoaded => "least-loaded",
            LoadBalancePolicy::WorkStealing => "work-stealing",
            LoadBalancePolicy::PriorityFirst => "priority-first",
        })
    }
}
294
295/// Adaptive scheduler that tracks task lifecycle and worker metrics.
296pub struct AdaptiveScheduler {
297    /// Active load-balance policy.
298    pub policy: LoadBalancePolicy,
299    /// Per-worker statistics (index = worker id).
300    pub workers: Vec<WorkerStats>,
301    /// Global scheduler metrics.
302    pub metrics: SchedulerMetrics,
303    /// All submitted tasks, keyed by TaskId.
304    pub(super) tasks: HashMap<TaskId, Task>,
305    /// Monotonically increasing task id counter.
306    pub(super) next_task_id: u64,
307    /// Monotonically increasing clock (simulated nanoseconds).
308    pub(super) clock: u64,
309    /// Round-robin cursor for `RoundRobin` policy.
310    pub(super) rr_cursor: usize,
311    /// Accumulated latency for computing the rolling average.
312    pub(super) total_latency_ns: u64,
313    /// Number of completed tasks that contributed to the latency sum.
314    pub(super) latency_sample_count: u64,
315}