1use crate::object::RtObject;
6use std::collections::{HashMap, VecDeque};
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9
10#[allow(dead_code)]
12#[derive(Clone, Debug)]
13pub struct ActorMessage {
14 pub from: ActorId,
16 pub to: ActorId,
18 pub payload: RtObject,
20 pub seq: u64,
22}
23#[allow(dead_code)]
24impl ActorMessage {
25 pub fn new(from: ActorId, to: ActorId, payload: RtObject, seq: u64) -> Self {
27 ActorMessage {
28 from,
29 to,
30 payload,
31 seq,
32 }
33 }
34}
/// Hands out slot indices `0..slots` in a repeating cycle.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct RoundRobinToken {
    slots: usize,
    current: usize,
}
#[allow(dead_code)]
impl RoundRobinToken {
    /// Creates a token over `slots` positions, starting at slot 0.
    ///
    /// # Panics
    /// Panics if `slots` is zero.
    pub fn new(slots: usize) -> Self {
        assert!(slots > 0, "slots must be > 0");
        Self { slots, current: 0 }
    }
    /// Returns the current slot and advances, wrapping back to 0 after
    /// the last slot.
    pub fn next(&mut self) -> usize {
        let handed_out = self.current;
        self.current = if handed_out + 1 == self.slots {
            0
        } else {
            handed_out + 1
        };
        handed_out
    }
    /// The slot the next call to `next` will hand out.
    pub fn peek(&self) -> usize {
        self.current
    }
    /// Rewinds the cycle back to slot 0.
    pub fn reset(&mut self) {
        self.current = 0;
    }
}
63#[allow(dead_code)]
65pub struct SchedulerTestHarness {
66 pub tasks: Vec<(TaskId, RtObject)>,
68 pub execution_order: Vec<TaskId>,
70 pub results: HashMap<TaskId, RtObject>,
72 next_id: u64,
74}
75#[allow(dead_code)]
76impl SchedulerTestHarness {
77 pub fn new() -> Self {
79 SchedulerTestHarness {
80 tasks: Vec::new(),
81 execution_order: Vec::new(),
82 results: HashMap::new(),
83 next_id: 0,
84 }
85 }
86 pub fn submit(&mut self, action: RtObject) -> TaskId {
88 let id = TaskId::new(self.next_id);
89 self.next_id += 1;
90 self.tasks.push((id, action));
91 id
92 }
93 pub fn run_all<F: FnMut(&RtObject) -> RtObject>(&mut self, mut f: F) {
95 let tasks = std::mem::take(&mut self.tasks);
96 for (id, action) in tasks {
97 let result = f(&action);
98 self.execution_order.push(id);
99 self.results.insert(id, result);
100 }
101 }
102 pub fn get_result(&self, id: TaskId) -> Option<&RtObject> {
104 self.results.get(&id)
105 }
106 pub fn completed(&self) -> usize {
108 self.results.len()
109 }
110 pub fn reset(&mut self) {
112 self.tasks.clear();
113 self.execution_order.clear();
114 self.results.clear();
115 self.next_id = 0;
116 }
117}
/// Placement constraint describing which workers may run a task.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskAffinity {
    /// May run anywhere.
    Any,
    /// Pinned to exactly this worker.
    Worker(usize),
    /// Prefers this worker, but stealing may move it.
    Prefer(usize),
    /// Pinned to worker 0 (treated as the main thread).
    MainThread,
}
#[allow(dead_code)]
impl TaskAffinity {
    /// True when `worker` satisfies this affinity. Note that `Prefer`
    /// matches only its named worker here, same as `Worker`.
    pub fn allows(&self, worker: usize) -> bool {
        match self {
            TaskAffinity::Any => true,
            TaskAffinity::Worker(w) | TaskAffinity::Prefer(w) => *w == worker,
            TaskAffinity::MainThread => worker == 0,
        }
    }
    /// True for affinities that bind the task to one specific worker.
    pub fn is_pinned(&self) -> bool {
        match self {
            TaskAffinity::Worker(_) | TaskAffinity::MainThread => true,
            TaskAffinity::Any | TaskAffinity::Prefer(_) => false,
        }
    }
    /// True when work stealing may relocate the task; exactly the
    /// complement of `is_pinned` over the four variants.
    pub fn allows_steal(&self) -> bool {
        !self.is_pinned()
    }
}
151#[allow(dead_code)]
153pub struct PriorityTaskQueue {
154 buckets: [VecDeque<TaskId>; 5],
156 total: usize,
158}
159#[allow(dead_code)]
160impl PriorityTaskQueue {
161 pub fn new() -> Self {
163 PriorityTaskQueue {
164 buckets: [
165 VecDeque::new(),
166 VecDeque::new(),
167 VecDeque::new(),
168 VecDeque::new(),
169 VecDeque::new(),
170 ],
171 total: 0,
172 }
173 }
174 pub fn push(&mut self, id: TaskId, priority: TaskPriority) {
176 self.buckets[priority.value() as usize].push_back(id);
177 self.total += 1;
178 }
179 pub fn pop(&mut self) -> Option<(TaskId, TaskPriority)> {
181 for level in (0..5).rev() {
182 if let Some(id) = self.buckets[level].pop_front() {
183 self.total -= 1;
184 return Some((id, TaskPriority::from_u8(level as u8)));
185 }
186 }
187 None
188 }
189 pub fn len(&self) -> usize {
191 self.total
192 }
193 pub fn is_empty(&self) -> bool {
195 self.total == 0
196 }
197 pub fn count_at(&self, priority: TaskPriority) -> usize {
199 self.buckets[priority.value() as usize].len()
200 }
201 pub fn clear(&mut self) {
203 for bucket in &mut self.buckets {
204 bucket.clear();
205 }
206 self.total = 0;
207 }
208}
209#[derive(Clone, Debug)]
211pub struct Task {
212 pub id: TaskId,
214 pub name: Option<String>,
216 pub priority: TaskPriority,
218 pub state: TaskState,
220 pub action: RtObject,
222 pub dependencies: Vec<TaskId>,
224 pub dependents: Vec<TaskId>,
226 pub created_at: u64,
228 pub completed_at: Option<u64>,
230}
231impl Task {
232 pub fn new(id: TaskId, action: RtObject) -> Self {
234 Task {
235 id,
236 name: None,
237 priority: TaskPriority::Normal,
238 state: TaskState::Created,
239 action,
240 dependencies: Vec::new(),
241 dependents: Vec::new(),
242 created_at: 0,
243 completed_at: None,
244 }
245 }
246 pub fn named(id: TaskId, name: String, action: RtObject) -> Self {
248 Task {
249 id,
250 name: Some(name),
251 priority: TaskPriority::Normal,
252 state: TaskState::Created,
253 action,
254 dependencies: Vec::new(),
255 dependents: Vec::new(),
256 created_at: 0,
257 completed_at: None,
258 }
259 }
260 pub fn with_priority(mut self, priority: TaskPriority) -> Self {
262 self.priority = priority;
263 self
264 }
265 pub fn depends_on(mut self, dep: TaskId) -> Self {
267 self.dependencies.push(dep);
268 self
269 }
270 pub fn dependencies_satisfied(&self, completed: &[TaskId]) -> bool {
272 self.dependencies.iter().all(|dep| completed.contains(dep))
273 }
274 pub fn complete(&mut self, result: RtObject) {
276 self.state = TaskState::Completed { result };
277 }
278 pub fn fail(&mut self, error: String) {
280 self.state = TaskState::Failed { error };
281 }
282 pub fn cancel(&mut self) {
284 self.state = TaskState::Cancelled;
285 }
286 pub fn result(&self) -> Option<&RtObject> {
288 if let TaskState::Completed { ref result } = self.state {
289 Some(result)
290 } else {
291 None
292 }
293 }
294 pub fn error(&self) -> Option<&str> {
296 if let TaskState::Failed { ref error } = self.state {
297 Some(error)
298 } else {
299 None
300 }
301 }
302}
/// Opaque identifier for an actor, wrapping a raw `u64`.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ActorId(pub u64);
#[allow(dead_code)]
impl ActorId {
    /// Wraps a raw id.
    pub fn new(id: u64) -> Self {
        Self(id)
    }
    /// Unwraps back to the raw `u64`.
    pub fn raw(self) -> u64 {
        let Self(inner) = self;
        inner
    }
}
318#[derive(Debug)]
320pub struct Worker {
321 pub id: usize,
323 pub deque: WorkStealingDeque,
325 pub tasks_completed: u64,
327 pub tasks_stolen_from: u64,
329 pub tasks_stolen: u64,
331 pub idle: bool,
333 pub current_task: Option<TaskId>,
335}
336impl Worker {
337 pub fn new(id: usize, deque_capacity: usize) -> Self {
339 Worker {
340 id,
341 deque: WorkStealingDeque::new(deque_capacity),
342 tasks_completed: 0,
343 tasks_stolen_from: 0,
344 tasks_stolen: 0,
345 idle: true,
346 current_task: None,
347 }
348 }
349 pub fn push_task(&mut self, task_id: TaskId) -> bool {
351 self.deque.push(task_id)
352 }
353 pub fn pop_task(&mut self) -> Option<TaskId> {
355 self.deque.pop()
356 }
357 pub fn start_task(&mut self, task_id: TaskId) {
359 self.current_task = Some(task_id);
360 self.idle = false;
361 }
362 pub fn finish_task(&mut self) {
364 self.current_task = None;
365 self.idle = true;
366 self.tasks_completed += 1;
367 }
368 pub fn load(&self) -> usize {
370 self.deque.len() + if self.current_task.is_some() { 1 } else { 0 }
371 }
372}
/// Policy used by `LoadBalancer::select_worker` to pick a target worker.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadBalanceStrategy {
    /// Cycle through workers in index order.
    RoundRobin,
    /// Pick the worker with the smallest reported load.
    LeastLoaded,
    /// Deterministic pseudo-random pick (LCG driven by the balancer's counter).
    Random,
    /// Always picks worker 0; actual placement is left to work stealing.
    WorkStealing,
}
/// Point-in-time snapshot of one worker's counters, as produced by
/// `Scheduler::worker_stats`.
#[derive(Clone, Debug)]
pub struct WorkerStats {
    /// Worker index.
    pub id: usize,
    /// Tasks this worker has finished.
    pub tasks_completed: u64,
    /// Tasks this worker stole from others.
    pub tasks_stolen: u64,
    /// Tasks other workers stole from this one.
    pub tasks_stolen_from: u64,
    /// Local deque length at snapshot time.
    pub queue_length: usize,
    /// Whether the worker had no task in flight at snapshot time.
    pub idle: bool,
}
401#[allow(dead_code)]
403pub struct YieldHandle {
404 requested: Arc<AtomicBool>,
405}
406#[allow(dead_code)]
407impl YieldHandle {
408 pub fn request(&self) {
410 self.requested.store(true, Ordering::Release);
411 }
412 pub fn is_pending(&self) -> bool {
414 self.requested.load(Ordering::Acquire)
415 }
416}
/// Extended scheduler counters with latency tracking and utilization
/// sampling.
#[allow(dead_code)]
#[derive(Clone, Debug, Default)]
pub struct ExtSchedulerStats {
    /// Tasks recorded as created.
    pub tasks_created: u64,
    /// Tasks recorded as completed.
    pub tasks_completed: u64,
    /// Tasks recorded as cancelled.
    pub tasks_cancelled: u64,
    /// Successful steals recorded.
    pub tasks_stolen: u64,
    /// Samples taken while idle.
    pub idle_samples: u64,
    /// Samples taken while busy.
    pub busy_samples: u64,
    /// Sum of completion latencies, in ticks.
    pub total_latency_ticks: u64,
    /// Largest single completion latency seen.
    pub max_latency_ticks: u64,
    /// Completions whose latency exceeded the threshold.
    pub latency_violations: u64,
    /// Violation threshold; 0 disables violation counting.
    pub latency_threshold_ticks: u64,
}
#[allow(dead_code)]
impl ExtSchedulerStats {
    /// All-zero stats.
    pub fn new() -> Self {
        Self::default()
    }
    /// Counts one task creation.
    pub fn record_created(&mut self) {
        self.tasks_created += 1;
    }
    /// Counts one completion and folds its latency into the totals;
    /// latencies strictly above a non-zero threshold count as violations.
    pub fn record_completed(&mut self, latency_ticks: u64) {
        self.tasks_completed += 1;
        self.total_latency_ticks += latency_ticks;
        self.max_latency_ticks = self.max_latency_ticks.max(latency_ticks);
        let threshold = self.latency_threshold_ticks;
        if threshold > 0 && latency_ticks > threshold {
            self.latency_violations += 1;
        }
    }
    /// Counts one cancellation.
    pub fn record_cancelled(&mut self) {
        self.tasks_cancelled += 1;
    }
    /// Counts one successful steal.
    pub fn record_steal(&mut self) {
        self.tasks_stolen += 1;
    }
    /// Records one busy/idle utilization sample.
    pub fn record_sample(&mut self, busy: bool) {
        let counter = if busy {
            &mut self.busy_samples
        } else {
            &mut self.idle_samples
        };
        *counter += 1;
    }
    /// Fraction of samples that were busy; 0.0 before any sample.
    pub fn utilization(&self) -> f64 {
        match self.busy_samples + self.idle_samples {
            0 => 0.0,
            total => self.busy_samples as f64 / total as f64,
        }
    }
    /// Mean completion latency in ticks; 0.0 before any completion.
    pub fn avg_latency(&self) -> f64 {
        match self.tasks_completed {
            0 => 0.0,
            n => self.total_latency_ticks as f64 / n as f64,
        }
    }
    /// Folds `other` into `self`: counters add, the max latency takes the
    /// larger value. The local threshold setting is left untouched.
    pub fn merge(&mut self, other: &ExtSchedulerStats) {
        self.tasks_created += other.tasks_created;
        self.tasks_completed += other.tasks_completed;
        self.tasks_cancelled += other.tasks_cancelled;
        self.tasks_stolen += other.tasks_stolen;
        self.idle_samples += other.idle_samples;
        self.busy_samples += other.busy_samples;
        self.total_latency_ticks += other.total_latency_ticks;
        self.max_latency_ticks = self.max_latency_ticks.max(other.max_latency_ticks);
        self.latency_violations += other.latency_violations;
    }
    /// Resets every counter (including the threshold) to zero.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
/// Five ordered priority levels; `Normal` is the default and higher
/// discriminants mean more urgent.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum TaskPriority {
    Background = 0,
    Low = 1,
    #[default]
    Normal = 2,
    High = 3,
    Critical = 4,
}
impl TaskPriority {
    /// Maps a raw level back to a priority; anything above 4 saturates
    /// to `Critical`.
    pub fn from_u8(v: u8) -> Self {
        match v {
            0 => Self::Background,
            1 => Self::Low,
            2 => Self::Normal,
            3 => Self::High,
            _ => Self::Critical,
        }
    }
    /// The raw discriminant (0..=4).
    pub fn value(self) -> u8 {
        self as u8
    }
    /// True for `High` and `Critical`.
    pub fn is_high(self) -> bool {
        matches!(self, Self::High | Self::Critical)
    }
    /// True only for `Background`.
    pub fn is_background(self) -> bool {
        matches!(self, Self::Background)
    }
}
/// Watermark-based backpressure with hysteresis: throttling engages when
/// the depth reaches `high_watermark` and releases once it drains to
/// `low_watermark` or below.
#[allow(dead_code)]
pub struct BackpressureController {
    /// Depth at which throttling engages.
    pub high_watermark: usize,
    /// Depth at or below which throttling releases.
    pub low_watermark: usize,
    /// Current queue depth.
    pub current_depth: usize,
    // Whether throttling is currently engaged.
    throttled: bool,
    /// Times throttling has newly engaged.
    pub throttle_events: u64,
}
#[allow(dead_code)]
impl BackpressureController {
    /// Creates an un-throttled controller at depth 0.
    pub fn new(high_watermark: usize, low_watermark: usize) -> Self {
        Self {
            high_watermark,
            low_watermark,
            current_depth: 0,
            throttled: false,
            throttle_events: 0,
        }
    }
    /// Records one enqueue; engages throttling (once) when the depth
    /// reaches the high watermark.
    pub fn enqueue(&mut self) {
        self.current_depth += 1;
        let should_engage = self.current_depth >= self.high_watermark;
        if should_engage && !self.throttled {
            self.throttled = true;
            self.throttle_events += 1;
        }
    }
    /// Records one dequeue (depth never goes below 0); releases throttling
    /// once the depth is at or below the low watermark.
    pub fn dequeue(&mut self) {
        self.current_depth = self.current_depth.saturating_sub(1);
        if self.current_depth <= self.low_watermark {
            self.throttled = false;
        }
    }
    /// Whether throttling is currently engaged.
    pub fn is_throttled(&self) -> bool {
        self.throttled
    }
    /// Depth as a fraction of the high watermark, capped at 1.0; a zero
    /// watermark reports 1.0 (always full).
    pub fn fill_ratio(&self) -> f64 {
        match self.high_watermark {
            0 => 1.0,
            high => (self.current_depth as f64 / high as f64).min(1.0),
        }
    }
    /// Drops back to depth 0 and clears throttling; counters are kept.
    pub fn reset(&mut self) {
        self.current_depth = 0;
        self.throttled = false;
    }
}
612pub struct ParallelEval;
614impl ParallelEval {
615 pub fn par_map(scheduler: &mut Scheduler, actions: Vec<RtObject>) -> Vec<TaskId> {
617 actions
618 .into_iter()
619 .map(|action| scheduler.spawn(action))
620 .collect()
621 }
622 pub fn par_pair(
624 scheduler: &mut Scheduler,
625 action_a: RtObject,
626 action_b: RtObject,
627 ) -> (TaskId, TaskId) {
628 let a = scheduler.spawn(action_a);
629 let b = scheduler.spawn(action_b);
630 (a, b)
631 }
632 pub fn when_all(
634 scheduler: &mut Scheduler,
635 deps: Vec<TaskId>,
636 continuation: RtObject,
637 ) -> TaskId {
638 scheduler.spawn_with_deps(continuation, deps)
639 }
640 pub fn barrier(
642 scheduler: &mut Scheduler,
643 dep_actions: Vec<RtObject>,
644 continuation: RtObject,
645 ) -> (Vec<TaskId>, TaskId) {
646 let dep_ids: Vec<TaskId> = dep_actions
647 .into_iter()
648 .map(|action| scheduler.spawn(action))
649 .collect();
650 let barrier_id = scheduler.spawn_with_deps(continuation, dep_ids.clone());
651 (dep_ids, barrier_id)
652 }
653}
654#[allow(dead_code)]
656#[derive(Clone, Debug)]
657pub struct TaskProfile {
658 pub id: TaskId,
660 pub name: Option<String>,
662 pub created_at: u64,
664 pub started_at: Option<u64>,
666 pub completed_at: Option<u64>,
668 pub yield_count: u32,
670 pub steal_count: u32,
672 pub completed_by: Option<usize>,
674}
675#[allow(dead_code)]
676impl TaskProfile {
677 pub fn new(id: TaskId, created_at: u64) -> Self {
679 TaskProfile {
680 id,
681 name: None,
682 created_at,
683 started_at: None,
684 completed_at: None,
685 yield_count: 0,
686 steal_count: 0,
687 completed_by: None,
688 }
689 }
690 pub fn start(&mut self, tick: u64) {
692 self.started_at = Some(tick);
693 }
694 pub fn complete(&mut self, tick: u64, worker: usize) {
696 self.completed_at = Some(tick);
697 self.completed_by = Some(worker);
698 }
699 pub fn queue_latency(&self) -> Option<u64> {
701 self.started_at.map(|s| s - self.created_at)
702 }
703 pub fn execution_time(&self) -> Option<u64> {
705 match (self.started_at, self.completed_at) {
706 (Some(s), Some(c)) => Some(c - s),
707 _ => None,
708 }
709 }
710 pub fn total_latency(&self) -> Option<u64> {
712 self.completed_at.map(|c| c - self.created_at)
713 }
714}
715pub struct LoadBalancer {
717 strategy: LoadBalanceStrategy,
719 rr_counter: usize,
721 num_workers: usize,
723}
724impl LoadBalancer {
725 pub fn new(strategy: LoadBalanceStrategy, num_workers: usize) -> Self {
727 LoadBalancer {
728 strategy,
729 rr_counter: 0,
730 num_workers,
731 }
732 }
733 pub fn select_worker(&mut self, worker_loads: &[usize]) -> usize {
735 match self.strategy {
736 LoadBalanceStrategy::RoundRobin => {
737 let worker = self.rr_counter % self.num_workers;
738 self.rr_counter += 1;
739 worker
740 }
741 LoadBalanceStrategy::LeastLoaded => worker_loads
742 .iter()
743 .enumerate()
744 .min_by_key(|(_, load)| *load)
745 .map(|(i, _)| i)
746 .unwrap_or(0),
747 LoadBalanceStrategy::Random => {
748 self.rr_counter = self
749 .rr_counter
750 .wrapping_mul(6364136223846793005)
751 .wrapping_add(1);
752 (self.rr_counter >> 16) % self.num_workers
753 }
754 LoadBalanceStrategy::WorkStealing => 0,
755 }
756 }
757}
758pub struct SharedState {
763 pub shutdown: Arc<AtomicBool>,
765 pub task_counter: Arc<AtomicU64>,
767 pub global_queue: Arc<Mutex<VecDeque<TaskId>>>,
769 pub results: Arc<Mutex<HashMap<TaskId, RtObject>>>,
771}
772impl SharedState {
773 pub fn new() -> Self {
775 SharedState {
776 shutdown: Arc::new(AtomicBool::new(false)),
777 task_counter: Arc::new(AtomicU64::new(0)),
778 global_queue: Arc::new(Mutex::new(VecDeque::new())),
779 results: Arc::new(Mutex::new(HashMap::new())),
780 }
781 }
782 pub fn request_shutdown(&self) {
784 self.shutdown.store(true, Ordering::Release);
785 }
786 pub fn should_shutdown(&self) -> bool {
788 self.shutdown.load(Ordering::Acquire)
789 }
790 pub fn next_task_id(&self) -> TaskId {
792 let id = self.task_counter.fetch_add(1, Ordering::Relaxed);
793 TaskId::new(id)
794 }
795 pub fn push_task(&self, task_id: TaskId) {
797 if let Ok(mut queue) = self.global_queue.lock() {
798 queue.push_back(task_id);
799 }
800 }
801 pub fn pop_task(&self) -> Option<TaskId> {
803 self.global_queue.lock().ok()?.pop_front()
804 }
805 pub fn store_result(&self, task_id: TaskId, result: RtObject) {
807 if let Ok(mut results) = self.results.lock() {
808 results.insert(task_id, result);
809 }
810 }
811 pub fn get_result(&self, task_id: TaskId) -> Option<RtObject> {
813 self.results.lock().ok()?.get(&task_id).cloned()
814 }
815}
816#[allow(dead_code)]
818#[derive(Clone, Debug)]
819pub struct PreemptionSimulator {
820 pub time_slice: u64,
822 pub ticks_used: u64,
824 pub preemptions: u64,
826 pub active_task: Option<TaskId>,
828}
829#[allow(dead_code)]
830impl PreemptionSimulator {
831 pub fn new(time_slice: u64) -> Self {
833 PreemptionSimulator {
834 time_slice,
835 ticks_used: 0,
836 preemptions: 0,
837 active_task: None,
838 }
839 }
840 pub fn set_active(&mut self, id: TaskId) {
842 self.active_task = Some(id);
843 self.ticks_used = 0;
844 }
845 pub fn tick(&mut self) -> bool {
847 self.ticks_used += 1;
848 if self.ticks_used >= self.time_slice {
849 self.preemptions += 1;
850 self.ticks_used = 0;
851 self.active_task = None;
852 true
853 } else {
854 false
855 }
856 }
857 pub fn remaining(&self) -> u64 {
859 self.time_slice.saturating_sub(self.ticks_used)
860 }
861}
/// Tunable parameters for building a `Scheduler`.
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Number of logical workers.
    pub num_workers: usize,
    /// Capacity of each worker's local deque.
    pub deque_capacity: usize,
    /// Upper bound on tracked tasks (advisory; not enforced in this file).
    pub max_tasks: usize,
    /// Whether idle workers may steal from busy ones.
    pub work_stealing: bool,
    /// Tasks taken per steal attempt (advisory; not consulted here).
    pub steal_batch_size: usize,
    /// Whether High/Critical tasks jump the global queue.
    pub priority_scheduling: bool,
    /// Retry budget (advisory; not consulted in this file).
    pub max_retries: u32,
}
impl SchedulerConfig {
    /// Default multi-worker setup: 4 workers with stealing and priority
    /// scheduling enabled.
    pub fn new() -> Self {
        Self {
            num_workers: 4,
            deque_capacity: 1024,
            max_tasks: 100_000,
            work_stealing: true,
            steal_batch_size: 4,
            priority_scheduling: true,
            max_retries: 3,
        }
    }
    /// One worker, no stealing, no priority scheduling, no retries;
    /// capacity limits match `new`.
    pub fn single_threaded() -> Self {
        Self {
            num_workers: 1,
            work_stealing: false,
            steal_batch_size: 1,
            priority_scheduling: false,
            max_retries: 0,
            ..Self::new()
        }
    }
    /// Sets the worker count, clamped to at least 1.
    pub fn with_workers(mut self, n: usize) -> Self {
        self.num_workers = n.max(1);
        self
    }
    /// Sets the per-worker deque capacity.
    pub fn with_deque_capacity(mut self, cap: usize) -> Self {
        self.deque_capacity = cap;
        self
    }
    /// Sets the maximum tracked task count.
    pub fn with_max_tasks(mut self, max: usize) -> Self {
        self.max_tasks = max;
        self
    }
    /// Enables or disables work stealing.
    pub fn with_work_stealing(mut self, enabled: bool) -> Self {
        self.work_stealing = enabled;
        self
    }
}
/// Task scheduler with per-worker deques, dependency tracking, and
/// optional work stealing; execution is driven synchronously through
/// `schedule_step` / `run_all`.
pub struct Scheduler {
    // Construction parameters; read-only after `new`.
    config: SchedulerConfig,
    /// One logical worker per `config.num_workers`.
    pub(super) workers: Vec<Worker>,
    // Every task ever spawned (terminal tasks included), keyed by id.
    tasks: HashMap<TaskId, Task>,
    /// Tasks spawned or woken but not yet handed to a worker deque.
    pub(super) global_queue: VecDeque<TaskId>,
    /// Ids of successfully completed tasks, in completion order.
    pub(super) completed: Vec<TaskId>,
    // Next id to hand out; monotonically increasing.
    next_task_id: u64,
    // Set by `start`/`stop`; only read back via `is_running` here.
    running: bool,
    // Aggregate counters; see `SchedulerStats`.
    stats: SchedulerStats,
}
impl Scheduler {
    /// Builds a scheduler with one `Worker` per `config.num_workers`,
    /// each with a deque of `config.deque_capacity`.
    pub fn new(config: SchedulerConfig) -> Self {
        let workers: Vec<Worker> = (0..config.num_workers)
            .map(|id| Worker::new(id, config.deque_capacity))
            .collect();
        Scheduler {
            config,
            workers,
            tasks: HashMap::new(),
            global_queue: VecDeque::new(),
            completed: Vec::new(),
            next_task_id: 0,
            running: false,
            stats: SchedulerStats::default(),
        }
    }
    /// Scheduler with one worker and stealing/priorities disabled.
    pub fn single_threaded() -> Self {
        Scheduler::new(SchedulerConfig::single_threaded())
    }
    /// Spawns an unnamed normal-priority task onto the global queue and
    /// returns its id.
    pub fn spawn(&mut self, action: RtObject) -> TaskId {
        let id = TaskId::new(self.next_task_id);
        self.next_task_id += 1;
        let task = Task::new(id, action);
        self.tasks.insert(id, task);
        self.global_queue.push_back(id);
        self.stats.tasks_created += 1;
        // High-water mark of live (non-terminal) tasks.
        // NOTE(review): only this spawn variant updates peak_active_tasks;
        // spawn_named / spawn_with_priority / spawn_with_deps do not —
        // confirm whether that asymmetry is intentional.
        let active = self.active_task_count() as u64;
        if active > self.stats.peak_active_tasks {
            self.stats.peak_active_tasks = active;
        }
        id
    }
    /// Like `spawn` but attaches a human-readable name to the task.
    pub fn spawn_named(&mut self, name: String, action: RtObject) -> TaskId {
        let id = TaskId::new(self.next_task_id);
        self.next_task_id += 1;
        let task = Task::named(id, name, action);
        self.tasks.insert(id, task);
        self.global_queue.push_back(id);
        self.stats.tasks_created += 1;
        id
    }
    /// Spawns a task at `priority`. With priority scheduling enabled,
    /// High/Critical tasks go to the *front* of the global queue.
    pub fn spawn_with_priority(&mut self, action: RtObject, priority: TaskPriority) -> TaskId {
        let id = TaskId::new(self.next_task_id);
        self.next_task_id += 1;
        let task = Task::new(id, action).with_priority(priority);
        self.tasks.insert(id, task);
        if self.config.priority_scheduling && priority >= TaskPriority::High {
            self.global_queue.push_front(id);
        } else {
            self.global_queue.push_back(id);
        }
        self.stats.tasks_created += 1;
        id
    }
    /// Spawns a task gated on `deps`: queued immediately when every
    /// dependency is already in `completed`, otherwise parked as
    /// `Suspended` until `complete_task` wakes it.
    pub fn spawn_with_deps(&mut self, action: RtObject, deps: Vec<TaskId>) -> TaskId {
        let id = TaskId::new(self.next_task_id);
        self.next_task_id += 1;
        let mut task = Task::new(id, action);
        task.dependencies = deps.clone();
        let all_satisfied = deps.iter().all(|dep| self.completed.contains(dep));
        if all_satisfied {
            task.state = TaskState::Created;
            self.global_queue.push_back(id);
        } else {
            task.state = TaskState::Suspended {
                waiting_on: deps.clone(),
            };
        }
        // Register this task as a dependent of each dependency so
        // complete_task can wake it. Unknown dep ids are silently skipped.
        for dep in &deps {
            if let Some(dep_task) = self.tasks.get_mut(dep) {
                dep_task.dependents.push(id);
            }
        }
        self.tasks.insert(id, task);
        self.stats.tasks_created += 1;
        id
    }
    /// Immutable lookup of a task by id.
    pub fn get_task(&self, id: TaskId) -> Option<&Task> {
        self.tasks.get(&id)
    }
    /// Mutable lookup of a task by id.
    pub fn get_task_mut(&mut self, id: TaskId) -> Option<&mut Task> {
        self.tasks.get_mut(&id)
    }
    /// Cancels a non-terminal task; false when the task is unknown or
    /// already terminal.
    pub fn cancel(&mut self, id: TaskId) -> bool {
        if let Some(task) = self.tasks.get_mut(&id) {
            if !task.state.is_terminal() {
                task.cancel();
                self.stats.tasks_cancelled += 1;
                return true;
            }
        }
        false
    }
    /// True when the task exists and is terminal (completed, failed, or
    /// cancelled).
    pub fn is_complete(&self, id: TaskId) -> bool {
        self.tasks
            .get(&id)
            .map(|t| t.state.is_terminal())
            .unwrap_or(false)
    }
    /// The task's completion result, when it completed successfully.
    pub fn get_result(&self, id: TaskId) -> Option<&RtObject> {
        self.tasks.get(&id).and_then(|t| t.result())
    }
    /// One scheduling round: drain the global queue onto the least-loaded
    /// workers, then pick one task to run (local pop first, then a steal
    /// if enabled). Returns the chosen `(worker_id, task_id)`, or `None`
    /// when nothing is runnable (counted as an idle cycle).
    pub fn schedule_step(&mut self) -> Option<(usize, TaskId)> {
        self.stats.scheduling_rounds += 1;
        // Distribute queued tasks; stop early if the target deque is full.
        while let Some(task_id) = self.global_queue.pop_front() {
            let target_worker = self.find_least_loaded_worker();
            if !self.workers[target_worker].push_task(task_id) {
                self.global_queue.push_front(task_id);
                break;
            }
            if let Some(task) = self.tasks.get_mut(&task_id) {
                task.state = TaskState::Queued;
            }
        }
        // First worker with local work wins this round.
        for worker_id in 0..self.workers.len() {
            if let Some(task_id) = self.workers[worker_id].pop_task() {
                self.workers[worker_id].start_task(task_id);
                if let Some(task) = self.tasks.get_mut(&task_id) {
                    task.state = TaskState::Running { worker_id };
                }
                return Some((worker_id, task_id));
            }
        }
        // No local work found: try handing a task to an idle worker.
        if self.config.work_stealing {
            if let Some((worker_id, task_id)) = self.try_steal() {
                self.workers[worker_id].start_task(task_id);
                if let Some(task) = self.tasks.get_mut(&task_id) {
                    task.state = TaskState::Running { worker_id };
                }
                return Some((worker_id, task_id));
            }
        }
        self.stats.idle_cycles += 1;
        None
    }
    /// Marks `task_id` completed with `result`, releases the worker that
    /// was running it, and wakes any dependents whose dependencies are
    /// now all satisfied.
    pub fn complete_task(&mut self, task_id: TaskId, result: RtObject) {
        // Snapshot dependents first; waking happens after state updates.
        let dependents = self
            .tasks
            .get(&task_id)
            .map(|t| t.dependents.clone())
            .unwrap_or_default();
        if let Some(task) = self.tasks.get_mut(&task_id) {
            task.complete(result);
            self.completed.push(task_id);
            self.stats.tasks_completed += 1;
        }
        for worker in &mut self.workers {
            if worker.current_task == Some(task_id) {
                worker.finish_task();
                break;
            }
        }
        for dep_id in &dependents {
            self.try_wake_task(*dep_id);
        }
    }
    /// Marks `task_id` failed and releases its worker.
    ///
    /// NOTE(review): failed tasks are never added to `completed`, so
    /// dependents of a failed task stay `Suspended` until the deadlock
    /// sweep in `run_all` fails them — confirm this cascade is intended.
    pub fn fail_task(&mut self, task_id: TaskId, error: String) {
        if let Some(task) = self.tasks.get_mut(&task_id) {
            task.fail(error);
            self.stats.tasks_failed += 1;
        }
        for worker in &mut self.workers {
            if worker.current_task == Some(task_id) {
                worker.finish_task();
                break;
            }
        }
    }
    /// Re-queues a suspended task once every id it waits on has completed.
    fn try_wake_task(&mut self, task_id: TaskId) {
        let should_wake = if let Some(task) = self.tasks.get(&task_id) {
            if let TaskState::Suspended { ref waiting_on } = task.state {
                waiting_on.iter().all(|dep| self.completed.contains(dep))
            } else {
                false
            }
        } else {
            false
        };
        if should_wake {
            if let Some(task) = self.tasks.get_mut(&task_id) {
                task.state = TaskState::Queued;
            }
            self.global_queue.push_back(task_id);
        }
    }
    /// Index of the worker with the smallest load (deque + in-flight);
    /// 0 when there are no workers.
    fn find_least_loaded_worker(&self) -> usize {
        self.workers
            .iter()
            .enumerate()
            .min_by_key(|(_, w)| w.load())
            .map(|(i, _)| i)
            .unwrap_or(0)
    }
    /// Moves one task from the fullest busy worker's deque directly to
    /// the first idle worker (the task is handed over, not re-queued).
    /// Returns `None` when no worker is idle or no task is stealable.
    fn try_steal(&mut self) -> Option<(usize, TaskId)> {
        self.stats.steal_attempts += 1;
        let idle_worker = self.workers.iter().position(|w| w.idle)?;
        let busy_worker = self
            .workers
            .iter()
            .enumerate()
            .filter(|(i, _)| *i != idle_worker)
            .max_by_key(|(_, w)| w.deque.len())?
            .0;
        if self.workers[busy_worker].deque.is_empty() {
            return None;
        }
        let stolen = self.workers[busy_worker].deque.steal()?;
        self.workers[busy_worker].tasks_stolen_from += 1;
        self.workers[idle_worker].tasks_stolen += 1;
        self.stats.total_steals += 1;
        Some((idle_worker, stolen))
    }
    /// Number of tasks not yet in a terminal state.
    pub fn active_task_count(&self) -> usize {
        self.tasks
            .values()
            .filter(|t| !t.state.is_terminal())
            .count()
    }
    /// Number of successfully completed tasks.
    pub fn completed_count(&self) -> usize {
        self.completed.len()
    }
    /// Number of workers.
    pub fn num_workers(&self) -> usize {
        self.workers.len()
    }
    /// Snapshot of every worker's counters.
    pub fn worker_stats(&self) -> Vec<WorkerStats> {
        self.workers
            .iter()
            .map(|w| WorkerStats {
                id: w.id,
                tasks_completed: w.tasks_completed,
                tasks_stolen: w.tasks_stolen,
                tasks_stolen_from: w.tasks_stolen_from,
                queue_length: w.deque.len(),
                idle: w.idle,
            })
            .collect()
    }
    /// Aggregate scheduler counters.
    pub fn stats(&self) -> &SchedulerStats {
        &self.stats
    }
    /// The configuration this scheduler was built with.
    pub fn config(&self) -> &SchedulerConfig {
        &self.config
    }
    /// True between `start` and `stop`.
    pub fn is_running(&self) -> bool {
        self.running
    }
    /// Marks the scheduler running.
    pub fn start(&mut self) {
        self.running = true;
    }
    /// Marks the scheduler stopped.
    pub fn stop(&mut self) {
        self.running = false;
    }
    /// Clears all tasks, queues, and stats, and zeroes worker counters;
    /// the configuration and the `running` flag are left as-is.
    pub fn reset(&mut self) {
        self.tasks.clear();
        self.global_queue.clear();
        self.completed.clear();
        self.next_task_id = 0;
        self.stats = SchedulerStats::default();
        for worker in &mut self.workers {
            worker.deque.clear();
            worker.tasks_completed = 0;
            worker.tasks_stolen = 0;
            worker.tasks_stolen_from = 0;
            worker.idle = true;
            worker.current_task = None;
        }
    }
    /// Drives the scheduler to quiescence: repeatedly schedules one task
    /// and runs it through `executor`, completing or failing it with the
    /// returned `Result`. When no task is runnable but suspended tasks
    /// remain with an empty global queue, those tasks are failed with
    /// "deadlock detected" and the loop exits.
    pub fn run_all(&mut self, mut executor: impl FnMut(&Task) -> Result<RtObject, String>) {
        self.start();
        while self.active_task_count() > 0 {
            if let Some((_worker_id, task_id)) = self.schedule_step() {
                let result = {
                    let task = self
                        .tasks
                        .get(&task_id)
                        .expect("task_id returned by schedule_step must exist in the tasks map");
                    executor(task)
                };
                match result {
                    Ok(value) => self.complete_task(task_id, value),
                    Err(error) => self.fail_task(task_id, error),
                }
            } else {
                let has_suspended = self.tasks.values().any(|t| t.state.is_suspended());
                if has_suspended && self.global_queue.is_empty() {
                    let suspended: Vec<TaskId> = self
                        .tasks
                        .iter()
                        .filter(|(_, t)| t.state.is_suspended())
                        .map(|(id, _)| *id)
                        .collect();
                    for id in suspended {
                        self.fail_task(id, "deadlock detected".to_string());
                    }
                }
                // Nothing schedulable this round: stop rather than spin.
                break;
            }
        }
        self.stop();
    }
}
1279pub struct WorkStealingDeque {
1285 pub(super) deque: VecDeque<TaskId>,
1287 pub(super) capacity: usize,
1289}
1290impl WorkStealingDeque {
1291 pub fn new(capacity: usize) -> Self {
1293 WorkStealingDeque {
1294 deque: VecDeque::with_capacity(capacity),
1295 capacity,
1296 }
1297 }
1298 pub fn push(&mut self, task_id: TaskId) -> bool {
1300 if self.deque.len() >= self.capacity {
1301 return false;
1302 }
1303 self.deque.push_back(task_id);
1304 true
1305 }
1306 pub fn pop(&mut self) -> Option<TaskId> {
1308 self.deque.pop_back()
1309 }
1310 pub fn steal(&mut self) -> Option<TaskId> {
1312 self.deque.pop_front()
1313 }
1314 pub fn len(&self) -> usize {
1316 self.deque.len()
1317 }
1318 pub fn is_empty(&self) -> bool {
1320 self.deque.is_empty()
1321 }
1322 pub fn is_full(&self) -> bool {
1324 self.deque.len() >= self.capacity
1325 }
1326 pub fn clear(&mut self) {
1328 self.deque.clear();
1329 }
1330 pub fn peek(&self) -> Option<&TaskId> {
1332 self.deque.back()
1333 }
1334 pub fn steal_batch(&mut self, n: usize) -> Vec<TaskId> {
1336 let count = n.min(self.deque.len() / 2).max(1).min(self.deque.len());
1337 let mut stolen = Vec::with_capacity(count);
1338 for _ in 0..count {
1339 if let Some(task_id) = self.deque.pop_front() {
1340 stolen.push(task_id);
1341 } else {
1342 break;
1343 }
1344 }
1345 stolen
1346 }
1347}
/// Aggregate counters maintained by `Scheduler`.
#[derive(Clone, Debug, Default)]
pub struct SchedulerStats {
    /// Tasks spawned via any `spawn_*` method.
    pub tasks_created: u64,
    /// Tasks that reached `Completed`.
    pub tasks_completed: u64,
    /// Tasks that reached `Failed`.
    pub tasks_failed: u64,
    /// Tasks cancelled before reaching a terminal state.
    pub tasks_cancelled: u64,
    /// Successful steals.
    pub total_steals: u64,
    /// Steal attempts, successful or not.
    pub steal_attempts: u64,
    /// `schedule_step` calls that found nothing runnable.
    pub idle_cycles: u64,
    /// High-water mark of live tasks (updated by `spawn` only).
    pub peak_active_tasks: u64,
    /// Total `schedule_step` invocations.
    pub scheduling_rounds: u64,
}
1370#[allow(dead_code)]
1372pub struct ActorMailbox {
1373 pub id: ActorId,
1375 messages: VecDeque<ActorMessage>,
1377 pub total_received: u64,
1379 pub total_processed: u64,
1381}
1382#[allow(dead_code)]
1383impl ActorMailbox {
1384 pub fn new(id: ActorId) -> Self {
1386 ActorMailbox {
1387 id,
1388 messages: VecDeque::new(),
1389 total_received: 0,
1390 total_processed: 0,
1391 }
1392 }
1393 pub fn send(&mut self, msg: ActorMessage) {
1395 self.messages.push_back(msg);
1396 self.total_received += 1;
1397 }
1398 pub fn receive(&mut self) -> Option<ActorMessage> {
1400 let msg = self.messages.pop_front();
1401 if msg.is_some() {
1402 self.total_processed += 1;
1403 }
1404 msg
1405 }
1406 pub fn pending(&self) -> usize {
1408 self.messages.len()
1409 }
1410 pub fn is_empty(&self) -> bool {
1412 self.messages.is_empty()
1413 }
1414}
/// Opaque identifier for a scheduled task, wrapping a raw `u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TaskId(pub u64);
impl TaskId {
    /// Wraps a raw id.
    pub fn new(id: u64) -> Self {
        Self(id)
    }
    /// Unwraps back to the raw `u64`.
    pub fn raw(self) -> u64 {
        let Self(inner) = self;
        inner
    }
}
/// Lifecycle of a task; `Completed`, `Failed`, and `Cancelled` are terminal.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskState {
    /// Spawned but not yet placed on a worker deque.
    Created,
    /// Waiting in a worker deque or the global queue.
    Queued,
    /// Currently executing on `worker_id`.
    Running {
        worker_id: usize,
    },
    /// Blocked until every task in `waiting_on` completes.
    Suspended {
        waiting_on: Vec<TaskId>,
    },
    /// Finished successfully with `result`.
    Completed {
        result: RtObject,
    },
    /// Finished with an error message.
    Failed {
        error: String,
    },
    /// Cancelled before completing.
    Cancelled,
}
impl TaskState {
    /// True for states a task can never leave.
    pub fn is_terminal(&self) -> bool {
        matches!(
            self,
            TaskState::Completed { .. } | TaskState::Failed { .. } | TaskState::Cancelled
        )
    }
    /// True when the task is eligible for scheduling.
    pub fn is_runnable(&self) -> bool {
        matches!(self, TaskState::Created | TaskState::Queued)
    }
    /// True while a worker is executing the task.
    pub fn is_running(&self) -> bool {
        matches!(self, TaskState::Running { .. })
    }
    /// True while the task is blocked on dependencies.
    pub fn is_suspended(&self) -> bool {
        matches!(self, TaskState::Suspended { .. })
    }
}
1479#[allow(dead_code)]
1481pub struct YieldPoint {
1482 requested: Arc<AtomicBool>,
1484 pub check_count: u64,
1486 pub yield_count: u64,
1488 pub check_interval: u64,
1490}
1491#[allow(dead_code)]
1492impl YieldPoint {
1493 pub fn new() -> Self {
1495 YieldPoint {
1496 requested: Arc::new(AtomicBool::new(false)),
1497 check_count: 0,
1498 yield_count: 0,
1499 check_interval: 100,
1500 }
1501 }
1502 pub fn with_interval(check_interval: u64) -> Self {
1504 YieldPoint {
1505 requested: Arc::new(AtomicBool::new(false)),
1506 check_count: 0,
1507 yield_count: 0,
1508 check_interval,
1509 }
1510 }
1511 pub fn request_yield(&self) {
1513 self.requested.store(true, Ordering::Release);
1514 }
1515 pub fn clear_request(&self) {
1517 self.requested.store(false, Ordering::Release);
1518 }
1519 pub fn should_yield(&mut self) -> bool {
1521 self.check_count += 1;
1522 if self.requested.load(Ordering::Acquire) {
1523 self.yield_count += 1;
1524 self.clear_request();
1525 true
1526 } else {
1527 false
1528 }
1529 }
1530 pub fn handle(&self) -> YieldHandle {
1532 YieldHandle {
1533 requested: Arc::clone(&self.requested),
1534 }
1535 }
1536}