use crate::object::RtObject;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
/// A single message exchanged between two actors.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct ActorMessage {
    /// Identifier of the sending actor.
    pub from: ActorId,
    /// Identifier of the receiving actor.
    pub to: ActorId,
    /// Runtime object carried by the message.
    pub payload: RtObject,
    /// Sequence number chosen by whoever builds the message.
    pub seq: u64,
}

#[allow(dead_code)]
impl ActorMessage {
    /// Assembles a message from its parts; no validation is performed.
    pub fn new(from: ActorId, to: ActorId, payload: RtObject, seq: u64) -> Self {
        Self {
            seq,
            payload,
            to,
            from,
        }
    }
}
/// Cursor that hands out slot indices `0..slots` in rotating order.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct RoundRobinToken {
    slots: usize,
    current: usize,
}

#[allow(dead_code)]
impl RoundRobinToken {
    /// Creates a token positioned at slot 0.
    ///
    /// # Panics
    ///
    /// Panics if `slots` is zero.
    pub fn new(slots: usize) -> Self {
        assert!(slots > 0, "slots must be > 0");
        Self { current: 0, slots }
    }

    /// Returns the current slot and advances the cursor, wrapping back to 0
    /// after the last slot.
    pub fn next(&mut self) -> usize {
        let upcoming = (self.current + 1) % self.slots;
        std::mem::replace(&mut self.current, upcoming)
    }

    /// Returns the slot the next call to [`next`](Self::next) will yield,
    /// without advancing.
    pub fn peek(&self) -> usize {
        self.current
    }

    /// Moves the cursor back to slot 0.
    pub fn reset(&mut self) {
        self.current = 0;
    }
}
/// Minimal sequential harness: collects submitted actions, runs them in
/// submission order and records each result.
#[allow(dead_code)]
pub struct SchedulerTestHarness {
    /// Actions submitted but not yet executed.
    pub tasks: Vec<(TaskId, RtObject)>,
    /// Ids in the order they were executed by `run_all`.
    pub execution_order: Vec<TaskId>,
    /// Result produced for each executed task.
    pub results: HashMap<TaskId, RtObject>,
    next_id: u64,
}

#[allow(dead_code)]
impl SchedulerTestHarness {
    /// Creates an empty harness whose first task id will be 0.
    pub fn new() -> Self {
        Self {
            next_id: 0,
            results: HashMap::new(),
            execution_order: Vec::new(),
            tasks: Vec::new(),
        }
    }

    /// Queues `action` and returns the freshly assigned id.
    pub fn submit(&mut self, action: RtObject) -> TaskId {
        let raw = self.next_id;
        self.next_id = raw + 1;
        let id = TaskId::new(raw);
        self.tasks.push((id, action));
        id
    }

    /// Runs every pending action through `f` in submission order, recording
    /// the execution order and the results. The pending list is emptied.
    pub fn run_all<F: FnMut(&RtObject) -> RtObject>(&mut self, mut f: F) {
        let pending = std::mem::take(&mut self.tasks);
        for (task_id, payload) in pending {
            let outcome = f(&payload);
            self.execution_order.push(task_id);
            self.results.insert(task_id, outcome);
        }
    }

    /// Looks up the recorded result for `id`, if that task has run.
    pub fn get_result(&self, id: TaskId) -> Option<&RtObject> {
        self.results.get(&id)
    }

    /// Number of tasks with a recorded result.
    pub fn completed(&self) -> usize {
        self.results.len()
    }

    /// Drops all pending tasks and recorded data and restarts ids at 0.
    pub fn reset(&mut self) {
        self.next_id = 0;
        self.results.clear();
        self.execution_order.clear();
        self.tasks.clear();
    }
}
/// Placement constraint of a task relative to the worker pool.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskAffinity {
    /// May run on any worker.
    Any,
    /// Must run on exactly this worker.
    Worker(usize),
    /// Would rather run on this worker, but other workers are acceptable.
    Prefer(usize),
    /// Must run on worker 0.
    MainThread,
}

#[allow(dead_code)]
impl TaskAffinity {
    /// Returns `true` when a task with this affinity may execute on `worker`.
    ///
    /// `Prefer` is a soft hint: it is neither pinned (`is_pinned`) nor
    /// exempt from stealing (`allows_steal`), so it must accept every worker.
    /// Use [`preferred_worker`](Self::preferred_worker) to read the hint.
    pub fn allows(&self, worker: usize) -> bool {
        match self {
            TaskAffinity::Any | TaskAffinity::Prefer(_) => true,
            TaskAffinity::Worker(w) => *w == worker,
            TaskAffinity::MainThread => worker == 0,
        }
    }

    /// Worker this affinity points at, if any: the index for `Worker` and
    /// `Prefer`, 0 for `MainThread`, and `None` for `Any`.
    pub fn preferred_worker(&self) -> Option<usize> {
        match self {
            TaskAffinity::Any => None,
            TaskAffinity::Worker(w) | TaskAffinity::Prefer(w) => Some(*w),
            TaskAffinity::MainThread => Some(0),
        }
    }

    /// Returns `true` for the hard constraints (`Worker` and `MainThread`).
    pub fn is_pinned(&self) -> bool {
        matches!(self, TaskAffinity::Worker(_) | TaskAffinity::MainThread)
    }

    /// Returns `true` when another worker may steal the task (`Any` and
    /// `Prefer`).
    pub fn allows_steal(&self) -> bool {
        matches!(self, TaskAffinity::Any | TaskAffinity::Prefer(_))
    }
}
/// FIFO queues of task ids, one per priority level, drained from the highest
/// level down.
#[allow(dead_code)]
pub struct PriorityTaskQueue {
    /// One queue per priority value (index 0 through 4).
    buckets: [VecDeque<TaskId>; 5],
    /// Number of ids stored across all buckets.
    total: usize,
}

#[allow(dead_code)]
impl PriorityTaskQueue {
    /// Creates a queue with five empty buckets.
    pub fn new() -> Self {
        Self {
            total: 0,
            buckets: Default::default(),
        }
    }

    /// Appends `id` to the bucket selected by `priority.value()`.
    pub fn push(&mut self, id: TaskId, priority: TaskPriority) {
        let slot = usize::from(priority.value());
        self.buckets[slot].push_back(id);
        self.total += 1;
    }

    /// Removes the oldest id from the highest non-empty bucket and returns it
    /// with the priority rebuilt from the bucket index.
    pub fn pop(&mut self) -> Option<(TaskId, TaskPriority)> {
        let (level, id) = self
            .buckets
            .iter_mut()
            .enumerate()
            .rev()
            .find_map(|(level, bucket)| bucket.pop_front().map(|id| (level, id)))?;
        self.total -= 1;
        Some((id, TaskPriority::from_u8(level as u8)))
    }

    /// Number of queued ids across all priorities.
    pub fn len(&self) -> usize {
        self.total
    }

    /// Returns `true` when nothing is queued.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of ids queued at exactly `priority`.
    pub fn count_at(&self, priority: TaskPriority) -> usize {
        let slot = usize::from(priority.value());
        self.buckets[slot].len()
    }

    /// Empties every bucket.
    pub fn clear(&mut self) {
        self.buckets.iter_mut().for_each(VecDeque::clear);
        self.total = 0;
    }
}
#[derive(Clone, Debug)]
pub struct Task {
pub id: TaskId,
pub name: Option<String>,
pub priority: TaskPriority,
pub state: TaskState,
pub action: RtObject,
pub dependencies: Vec<TaskId>,
pub dependents: Vec<TaskId>,
pub created_at: u64,
pub completed_at: Option<u64>,
}
impl Task {
pub fn new(id: TaskId, action: RtObject) -> Self {
Task {
id,
name: None,
priority: TaskPriority::Normal,
state: TaskState::Created,
action,
dependencies: Vec::new(),
dependents: Vec::new(),
created_at: 0,
completed_at: None,
}
}
pub fn named(id: TaskId, name: String, action: RtObject) -> Self {
Task {
id,
name: Some(name),
priority: TaskPriority::Normal,
state: TaskState::Created,
action,
dependencies: Vec::new(),
dependents: Vec::new(),
created_at: 0,
completed_at: None,
}
}
pub fn with_priority(mut self, priority: TaskPriority) -> Self {
self.priority = priority;
self
}
pub fn depends_on(mut self, dep: TaskId) -> Self {
self.dependencies.push(dep);
self
}
pub fn dependencies_satisfied(&self, completed: &[TaskId]) -> bool {
self.dependencies.iter().all(|dep| completed.contains(dep))
}
pub fn complete(&mut self, result: RtObject) {
self.state = TaskState::Completed { result };
}
pub fn fail(&mut self, error: String) {
self.state = TaskState::Failed { error };
}
pub fn cancel(&mut self) {
self.state = TaskState::Cancelled;
}
pub fn result(&self) -> Option<&RtObject> {
if let TaskState::Completed { ref result } = self.state {
Some(result)
} else {
None
}
}
pub fn error(&self) -> Option<&str> {
if let TaskState::Failed { ref error } = self.state {
Some(error)
} else {
None
}
}
}
/// Opaque identifier of an actor, wrapping a `u64`.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ActorId(pub u64);

#[allow(dead_code)]
impl ActorId {
    /// Wraps a raw numeric id.
    pub fn new(id: u64) -> Self {
        Self(id)
    }

    /// Returns the wrapped numeric id.
    pub fn raw(self) -> u64 {
        let Self(value) = self;
        value
    }
}
/// Per-worker bookkeeping: a local deque of task ids plus counters.
#[derive(Debug)]
pub struct Worker {
    /// Index of this worker.
    pub id: usize,
    /// Local queue of task ids assigned to this worker.
    pub deque: WorkStealingDeque,
    /// Incremented by every call to `finish_task`.
    pub tasks_completed: u64,
    /// Number of tasks other workers took from this worker.
    pub tasks_stolen_from: u64,
    /// Number of tasks this worker took from other workers.
    pub tasks_stolen: u64,
    /// `true` while no task is marked as running on this worker.
    pub idle: bool,
    /// Task currently marked as running, if any.
    pub current_task: Option<TaskId>,
}

impl Worker {
    /// Creates an idle worker with an empty deque bounded by
    /// `deque_capacity` and zeroed counters.
    pub fn new(id: usize, deque_capacity: usize) -> Self {
        Self {
            id,
            deque: WorkStealingDeque::new(deque_capacity),
            current_task: None,
            idle: true,
            tasks_completed: 0,
            tasks_stolen: 0,
            tasks_stolen_from: 0,
        }
    }

    /// Queues `task_id` locally; returns `false` when the deque refuses it.
    pub fn push_task(&mut self, task_id: TaskId) -> bool {
        self.deque.push(task_id)
    }

    /// Takes the next task id from the local deque, if any.
    pub fn pop_task(&mut self) -> Option<TaskId> {
        self.deque.pop()
    }

    /// Marks `task_id` as the running task and the worker as busy.
    pub fn start_task(&mut self, task_id: TaskId) {
        self.idle = false;
        self.current_task = Some(task_id);
    }

    /// Clears the running task, marks the worker idle and bumps
    /// `tasks_completed` (unconditionally).
    pub fn finish_task(&mut self) {
        self.tasks_completed += 1;
        self.idle = true;
        self.current_task = None;
    }

    /// Queued task count plus one if a task is currently running.
    pub fn load(&self) -> usize {
        let running = usize::from(self.current_task.is_some());
        self.deque.len() + running
    }
}
/// Policy used by `LoadBalancer::select_worker` to pick a worker index.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadBalanceStrategy {
/// Hand out worker indices in rotating order.
RoundRobin,
/// Pick the index with the smallest entry in the supplied load slice.
LeastLoaded,
/// Pick an index from a deterministic pseudo-random sequence.
Random,
/// `select_worker` always answers 0 for this strategy.
WorkStealing,
}
/// Snapshot of one worker's counters, as produced by `Scheduler::worker_stats`.
#[derive(Clone, Debug)]
pub struct WorkerStats {
/// Index of the worker the snapshot was taken from.
pub id: usize,
/// Copy of the worker's `tasks_completed` counter.
pub tasks_completed: u64,
/// Copy of the worker's `tasks_stolen` counter.
pub tasks_stolen: u64,
/// Copy of the worker's `tasks_stolen_from` counter.
pub tasks_stolen_from: u64,
/// Length of the worker's local deque at snapshot time.
pub queue_length: usize,
/// Whether the worker was idle at snapshot time.
pub idle: bool,
}
/// Handle onto a shared "yield requested" flag.
#[allow(dead_code)]
pub struct YieldHandle {
    requested: Arc<AtomicBool>,
}

#[allow(dead_code)]
impl YieldHandle {
    /// Raises the shared flag (release store).
    pub fn request(&self) {
        let flag: &AtomicBool = &self.requested;
        flag.store(true, Ordering::Release);
    }

    /// Reports whether the shared flag is currently raised (acquire load).
    pub fn is_pending(&self) -> bool {
        let flag: &AtomicBool = &self.requested;
        flag.load(Ordering::Acquire)
    }
}
/// Extended scheduler counters: task totals, utilisation samples and latency
/// figures.
#[allow(dead_code)]
#[derive(Clone, Debug, Default)]
pub struct ExtSchedulerStats {
    /// Tasks recorded as created.
    pub tasks_created: u64,
    /// Tasks recorded as completed.
    pub tasks_completed: u64,
    /// Tasks recorded as cancelled.
    pub tasks_cancelled: u64,
    /// Steals recorded.
    pub tasks_stolen: u64,
    /// Samples taken while idle.
    pub idle_samples: u64,
    /// Samples taken while busy.
    pub busy_samples: u64,
    /// Sum of all recorded completion latencies.
    pub total_latency_ticks: u64,
    /// Largest recorded completion latency.
    pub max_latency_ticks: u64,
    /// Completions whose latency exceeded the threshold.
    pub latency_violations: u64,
    /// Latency threshold; 0 disables violation counting.
    pub latency_threshold_ticks: u64,
}

#[allow(dead_code)]
impl ExtSchedulerStats {
    /// Creates a zeroed statistics record.
    pub fn new() -> Self {
        Self::default()
    }

    /// Counts one created task.
    pub fn record_created(&mut self) {
        self.tasks_created += 1;
    }

    /// Counts one completed task and folds `latency_ticks` into the total,
    /// the maximum and (when a non-zero threshold is exceeded) the violation
    /// counter.
    pub fn record_completed(&mut self, latency_ticks: u64) {
        self.tasks_completed += 1;
        self.total_latency_ticks += latency_ticks;
        self.max_latency_ticks = self.max_latency_ticks.max(latency_ticks);

        let threshold = self.latency_threshold_ticks;
        if threshold != 0 && latency_ticks > threshold {
            self.latency_violations += 1;
        }
    }

    /// Counts one cancelled task.
    pub fn record_cancelled(&mut self) {
        self.tasks_cancelled += 1;
    }

    /// Counts one steal.
    pub fn record_steal(&mut self) {
        self.tasks_stolen += 1;
    }

    /// Counts one utilisation sample as busy or idle.
    pub fn record_sample(&mut self, busy: bool) {
        let counter = if busy {
            &mut self.busy_samples
        } else {
            &mut self.idle_samples
        };
        *counter += 1;
    }

    /// Fraction of samples that were busy; 0.0 when no samples exist.
    pub fn utilization(&self) -> f64 {
        match self.busy_samples + self.idle_samples {
            0 => 0.0,
            samples => self.busy_samples as f64 / samples as f64,
        }
    }

    /// Mean completion latency; 0.0 when nothing has completed.
    pub fn avg_latency(&self) -> f64 {
        match self.tasks_completed {
            0 => 0.0,
            done => self.total_latency_ticks as f64 / done as f64,
        }
    }

    /// Adds `other`'s counters to this record and keeps the larger maximum
    /// latency. The latency threshold is left untouched.
    pub fn merge(&mut self, other: &ExtSchedulerStats) {
        self.tasks_created += other.tasks_created;
        self.tasks_completed += other.tasks_completed;
        self.tasks_cancelled += other.tasks_cancelled;
        self.tasks_stolen += other.tasks_stolen;
        self.idle_samples += other.idle_samples;
        self.busy_samples += other.busy_samples;
        self.total_latency_ticks += other.total_latency_ticks;
        self.latency_violations += other.latency_violations;
        self.max_latency_ticks = self.max_latency_ticks.max(other.max_latency_ticks);
    }

    /// Resets every field, including the latency threshold, to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
/// Task priority levels, ordered from lowest (`Background`) to highest
/// (`Critical`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum TaskPriority {
    Background = 0,
    Low = 1,
    #[default]
    Normal = 2,
    High = 3,
    Critical = 4,
}

impl TaskPriority {
    /// Maps a numeric level to a priority; anything above 3 becomes
    /// `Critical`.
    pub fn from_u8(v: u8) -> Self {
        const BELOW_CRITICAL: [TaskPriority; 4] = [
            TaskPriority::Background,
            TaskPriority::Low,
            TaskPriority::Normal,
            TaskPriority::High,
        ];
        BELOW_CRITICAL
            .get(usize::from(v))
            .copied()
            .unwrap_or(TaskPriority::Critical)
    }

    /// Numeric level of this priority (0 through 4).
    pub fn value(self) -> u8 {
        self as u8
    }

    /// `true` for `High` and `Critical`.
    pub fn is_high(self) -> bool {
        matches!(self, TaskPriority::High | TaskPriority::Critical)
    }

    /// `true` only for `Background`.
    pub fn is_background(self) -> bool {
        matches!(self, TaskPriority::Background)
    }
}
/// Watermark-based throttle: switches on when the depth reaches the high
/// watermark and off again once it falls to the low watermark.
#[allow(dead_code)]
pub struct BackpressureController {
    /// Depth at or above which throttling starts.
    pub high_watermark: usize,
    /// Depth at or below which throttling stops.
    pub low_watermark: usize,
    /// Current tracked depth.
    pub current_depth: usize,
    throttled: bool,
    /// Number of times throttling has been switched on.
    pub throttle_events: u64,
}

#[allow(dead_code)]
impl BackpressureController {
    /// Creates an unthrottled controller at depth 0. The watermarks are
    /// stored as given, without validation.
    pub fn new(high_watermark: usize, low_watermark: usize) -> Self {
        Self {
            high_watermark,
            low_watermark,
            throttle_events: 0,
            throttled: false,
            current_depth: 0,
        }
    }

    /// Records one enqueued item and starts throttling when the depth
    /// reaches the high watermark (counting an event only on the transition).
    pub fn enqueue(&mut self) {
        self.current_depth += 1;
        if self.throttled || self.current_depth < self.high_watermark {
            return;
        }
        self.throttled = true;
        self.throttle_events += 1;
    }

    /// Records one dequeued item (never going below 0) and stops throttling
    /// once the depth is at or below the low watermark.
    pub fn dequeue(&mut self) {
        self.current_depth = self.current_depth.saturating_sub(1);
        if self.current_depth <= self.low_watermark {
            self.throttled = false;
        }
    }

    /// Whether throttling is currently active.
    pub fn is_throttled(&self) -> bool {
        self.throttled
    }

    /// Depth relative to the high watermark, capped at 1.0; a zero high
    /// watermark reports 1.0.
    pub fn fill_ratio(&self) -> f64 {
        match self.high_watermark {
            0 => 1.0,
            high => (self.current_depth as f64 / high as f64).min(1.0),
        }
    }

    /// Clears the depth and the throttle flag; `throttle_events` is kept.
    pub fn reset(&mut self) {
        self.throttled = false;
        self.current_depth = 0;
    }
}
/// Convenience helpers that spawn groups of tasks on a [`Scheduler`].
pub struct ParallelEval;

impl ParallelEval {
    /// Spawns one task per action, in order, and returns the ids in the same
    /// order.
    pub fn par_map(scheduler: &mut Scheduler, actions: Vec<RtObject>) -> Vec<TaskId> {
        let mut ids = Vec::with_capacity(actions.len());
        for action in actions {
            ids.push(scheduler.spawn(action));
        }
        ids
    }

    /// Spawns `action_a` then `action_b` and returns both ids.
    pub fn par_pair(
        scheduler: &mut Scheduler,
        action_a: RtObject,
        action_b: RtObject,
    ) -> (TaskId, TaskId) {
        let first = scheduler.spawn(action_a);
        let second = scheduler.spawn(action_b);
        (first, second)
    }

    /// Spawns `continuation` as a task depending on all of `deps`.
    pub fn when_all(
        scheduler: &mut Scheduler,
        deps: Vec<TaskId>,
        continuation: RtObject,
    ) -> TaskId {
        scheduler.spawn_with_deps(continuation, deps)
    }

    /// Spawns every action in `dep_actions`, then spawns `continuation`
    /// depending on all of them. Returns the dependency ids and the id of
    /// the continuation task.
    pub fn barrier(
        scheduler: &mut Scheduler,
        dep_actions: Vec<RtObject>,
        continuation: RtObject,
    ) -> (Vec<TaskId>, TaskId) {
        let dep_ids = Self::par_map(scheduler, dep_actions);
        let barrier_id = scheduler.spawn_with_deps(continuation, dep_ids.clone());
        (dep_ids, barrier_id)
    }
}
/// Timing and bookkeeping record for a single task.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct TaskProfile {
    /// Task the profile belongs to.
    pub id: TaskId,
    /// Optional label; `new` leaves it unset.
    pub name: Option<String>,
    /// Tick at which the task was created.
    pub created_at: u64,
    /// Tick at which execution started, once recorded.
    pub started_at: Option<u64>,
    /// Tick at which execution finished, once recorded.
    pub completed_at: Option<u64>,
    /// Yield counter; `new` starts it at 0.
    pub yield_count: u32,
    /// Steal counter; `new` starts it at 0.
    pub steal_count: u32,
    /// Worker index recorded by `complete`.
    pub completed_by: Option<usize>,
}

#[allow(dead_code)]
impl TaskProfile {
    /// Creates a profile for `id` created at tick `created_at`, with no
    /// start or completion recorded.
    pub fn new(id: TaskId, created_at: u64) -> Self {
        TaskProfile {
            id,
            name: None,
            created_at,
            started_at: None,
            completed_at: None,
            yield_count: 0,
            steal_count: 0,
            completed_by: None,
        }
    }

    /// Records the tick at which execution started.
    pub fn start(&mut self, tick: u64) {
        self.started_at = Some(tick);
    }

    /// Records the completion tick and the worker that finished the task.
    pub fn complete(&mut self, tick: u64, worker: usize) {
        self.completed_at = Some(tick);
        self.completed_by = Some(worker);
    }

    /// Ticks between creation and start, or `None` if not started.
    ///
    /// Ticks are caller-supplied, so the difference saturates at 0 instead
    /// of overflowing when the start tick precedes the creation tick.
    pub fn queue_latency(&self) -> Option<u64> {
        self.started_at
            .map(|started| started.saturating_sub(self.created_at))
    }

    /// Ticks between start and completion, or `None` unless both are
    /// recorded. Saturates at 0 for out-of-order ticks.
    pub fn execution_time(&self) -> Option<u64> {
        let started = self.started_at?;
        let completed = self.completed_at?;
        Some(completed.saturating_sub(started))
    }

    /// Ticks between creation and completion, or `None` if not completed.
    /// Saturates at 0 for out-of-order ticks.
    pub fn total_latency(&self) -> Option<u64> {
        self.completed_at
            .map(|completed| completed.saturating_sub(self.created_at))
    }
}
/// Chooses a worker index for new work according to a fixed strategy.
pub struct LoadBalancer {
    strategy: LoadBalanceStrategy,
    /// Round-robin cursor, or the generator state for the random strategy.
    rr_counter: usize,
    num_workers: usize,
}

impl LoadBalancer {
    /// Creates a balancer for `num_workers` workers using `strategy`.
    pub fn new(strategy: LoadBalanceStrategy, num_workers: usize) -> Self {
        LoadBalancer {
            strategy,
            rr_counter: 0,
            num_workers,
        }
    }

    /// Picks the worker index that should receive the next task.
    ///
    /// * `RoundRobin` cycles through `0..num_workers`.
    /// * `LeastLoaded` returns the index of the smallest entry of
    ///   `worker_loads` (the first one on ties), or 0 for an empty slice.
    /// * `Random` draws from a deterministic linear congruential sequence.
    /// * `WorkStealing` always returns 0.
    ///
    /// A balancer built with zero workers returns 0 for the modulo-based
    /// strategies instead of dividing by zero.
    pub fn select_worker(&mut self, worker_loads: &[usize]) -> usize {
        // Written as a u64 literal so it also compiles where `usize` is
        // narrower than 64 bits; the cast is a no-op on 64-bit targets.
        const LCG_MULTIPLIER: usize = 6364136223846793005_u64 as usize;

        // Treat an empty pool as a single slot so `%` can never see zero.
        let pool = self.num_workers.max(1);

        match self.strategy {
            LoadBalanceStrategy::RoundRobin => {
                let worker = self.rr_counter % pool;
                // Keep the cursor reduced so it can never overflow.
                self.rr_counter = (worker + 1) % pool;
                worker
            }
            LoadBalanceStrategy::LeastLoaded => worker_loads
                .iter()
                .enumerate()
                .min_by_key(|(_, load)| **load)
                .map(|(index, _)| index)
                .unwrap_or(0),
            LoadBalanceStrategy::Random => {
                self.rr_counter = self
                    .rr_counter
                    .wrapping_mul(LCG_MULTIPLIER)
                    .wrapping_add(1);
                // Drop the low bits before reducing to a worker index.
                (self.rr_counter >> 16) % pool
            }
            LoadBalanceStrategy::WorkStealing => 0,
        }
    }
}
/// State shared between threads: a shutdown flag, a task-id counter, a
/// global task queue and a result table.
pub struct SharedState {
    /// Set once shutdown has been requested.
    pub shutdown: Arc<AtomicBool>,
    /// Source of fresh task ids.
    pub task_counter: Arc<AtomicU64>,
    /// FIFO queue of task ids.
    pub global_queue: Arc<Mutex<VecDeque<TaskId>>>,
    /// Results keyed by task id.
    pub results: Arc<Mutex<HashMap<TaskId, RtObject>>>,
}

impl SharedState {
    /// Creates a state with the flag cleared, the counter at 0 and empty
    /// queue and result table.
    pub fn new() -> Self {
        SharedState {
            shutdown: Arc::new(AtomicBool::new(false)),
            task_counter: Arc::new(AtomicU64::new(0)),
            global_queue: Arc::new(Mutex::new(VecDeque::new())),
            results: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Raises the shutdown flag.
    pub fn request_shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Reports whether shutdown has been requested.
    pub fn should_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::Acquire)
    }

    /// Returns a fresh task id, advancing the shared counter by one.
    pub fn next_task_id(&self) -> TaskId {
        let id = self.task_counter.fetch_add(1, Ordering::Relaxed);
        TaskId::new(id)
    }

    /// Appends `task_id` to the global queue.
    ///
    /// A poisoned lock is recovered rather than treated as failure, so a
    /// panic on another thread can no longer make this call silently drop
    /// the task.
    pub fn push_task(&self, task_id: TaskId) {
        self.global_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push_back(task_id);
    }

    /// Removes and returns the oldest queued task id, if any. A poisoned
    /// lock is recovered, so queued tasks stay reachable.
    pub fn pop_task(&self) -> Option<TaskId> {
        self.global_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .pop_front()
    }

    /// Stores `result` for `task_id`, replacing any earlier entry. A
    /// poisoned lock is recovered, so the result is never silently lost.
    pub fn store_result(&self, task_id: TaskId, result: RtObject) {
        self.results
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .insert(task_id, result);
    }

    /// Returns a clone of the stored result for `task_id`, if present. A
    /// poisoned lock is recovered.
    pub fn get_result(&self, task_id: TaskId) -> Option<RtObject> {
        self.results
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .get(&task_id)
            .cloned()
    }
}
/// Tick-driven model of time-slice preemption for a single active task.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct PreemptionSimulator {
    /// Number of ticks a task may use before being preempted.
    pub time_slice: u64,
    /// Ticks consumed in the current slice.
    pub ticks_used: u64,
    /// Number of preemptions that have occurred.
    pub preemptions: u64,
    /// Task currently considered active, if any.
    pub active_task: Option<TaskId>,
}

#[allow(dead_code)]
impl PreemptionSimulator {
    /// Creates a simulator with the given slice length and no active task.
    pub fn new(time_slice: u64) -> Self {
        Self {
            time_slice,
            active_task: None,
            preemptions: 0,
            ticks_used: 0,
        }
    }

    /// Makes `id` the active task and starts a fresh slice.
    pub fn set_active(&mut self, id: TaskId) {
        self.ticks_used = 0;
        self.active_task = Some(id);
    }

    /// Advances one tick. When the slice is used up, counts a preemption,
    /// clears the active task, restarts the slice and returns `true`;
    /// otherwise returns `false`.
    pub fn tick(&mut self) -> bool {
        self.ticks_used += 1;
        if self.ticks_used < self.time_slice {
            return false;
        }
        self.active_task = None;
        self.ticks_used = 0;
        self.preemptions += 1;
        true
    }

    /// Ticks left in the current slice (never negative).
    pub fn remaining(&self) -> u64 {
        self.time_slice.saturating_sub(self.ticks_used)
    }
}
/// Tunable parameters of a scheduler.
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Number of workers to create.
    pub num_workers: usize,
    /// Capacity of each worker's local deque.
    pub deque_capacity: usize,
    /// Configured task limit.
    pub max_tasks: usize,
    /// Whether work stealing is enabled.
    pub work_stealing: bool,
    /// Configured steal batch size.
    pub steal_batch_size: usize,
    /// Whether priorities influence queue placement.
    pub priority_scheduling: bool,
    /// Configured retry limit.
    pub max_retries: u32,
}

impl SchedulerConfig {
    /// Default multi-worker configuration: 4 workers, deques of 1024,
    /// 100 000 tasks, stealing and priorities enabled, 3 retries.
    pub fn new() -> Self {
        Self {
            max_retries: 3,
            priority_scheduling: true,
            steal_batch_size: 4,
            work_stealing: true,
            max_tasks: 100_000,
            deque_capacity: 1024,
            num_workers: 4,
        }
    }

    /// Single-worker configuration with stealing, priorities and retries
    /// switched off; deque capacity and task limit match [`new`](Self::new).
    pub fn single_threaded() -> Self {
        Self {
            num_workers: 1,
            work_stealing: false,
            steal_batch_size: 1,
            priority_scheduling: false,
            max_retries: 0,
            ..Self::new()
        }
    }

    /// Sets the worker count, raising 0 to 1.
    pub fn with_workers(self, n: usize) -> Self {
        Self {
            num_workers: n.max(1),
            ..self
        }
    }

    /// Sets the per-worker deque capacity.
    pub fn with_deque_capacity(self, cap: usize) -> Self {
        Self {
            deque_capacity: cap,
            ..self
        }
    }

    /// Sets the task limit.
    pub fn with_max_tasks(self, max: usize) -> Self {
        Self {
            max_tasks: max,
            ..self
        }
    }

    /// Enables or disables work stealing.
    pub fn with_work_stealing(self, enabled: bool) -> Self {
        Self {
            work_stealing: enabled,
            ..self
        }
    }
}
/// Single-owner task scheduler: tracks tasks and their dependencies,
/// distributes queued task ids to worker deques and records statistics.
pub struct Scheduler {
    config: SchedulerConfig,
    /// Worker bookkeeping, one entry per configured worker.
    pub(super) workers: Vec<Worker>,
    /// Every task ever spawned, keyed by id.
    tasks: HashMap<TaskId, Task>,
    /// Task ids waiting to be handed to a worker.
    pub(super) global_queue: VecDeque<TaskId>,
    /// Ids of tasks that completed successfully, in completion order.
    pub(super) completed: Vec<TaskId>,
    next_task_id: u64,
    running: bool,
    stats: SchedulerStats,
}

impl Scheduler {
    /// Creates a scheduler with `config.num_workers` workers, each with a
    /// deque of `config.deque_capacity`.
    pub fn new(config: SchedulerConfig) -> Self {
        let workers: Vec<Worker> = (0..config.num_workers)
            .map(|id| Worker::new(id, config.deque_capacity))
            .collect();
        Scheduler {
            config,
            workers,
            tasks: HashMap::new(),
            global_queue: VecDeque::new(),
            completed: Vec::new(),
            next_task_id: 0,
            running: false,
            stats: SchedulerStats::default(),
        }
    }

    /// Creates a scheduler from [`SchedulerConfig::single_threaded`].
    pub fn single_threaded() -> Self {
        Scheduler::new(SchedulerConfig::single_threaded())
    }

    /// Hands out the next sequential task id.
    fn allocate_task_id(&mut self) -> TaskId {
        let id = TaskId::new(self.next_task_id);
        self.next_task_id += 1;
        id
    }

    /// Stores `task` and updates the creation statistics. Shared by every
    /// spawn variant so that `peak_active_tasks` is tracked consistently.
    fn register_task(&mut self, task: Task) {
        self.tasks.insert(task.id, task);
        self.stats.tasks_created += 1;
        let active = self.active_task_count() as u64;
        if active > self.stats.peak_active_tasks {
            self.stats.peak_active_tasks = active;
        }
    }

    /// Marks `task_id` as running on `worker_id` and returns the pair.
    fn begin_task(&mut self, worker_id: usize, task_id: TaskId) -> (usize, TaskId) {
        self.workers[worker_id].start_task(task_id);
        if let Some(task) = self.tasks.get_mut(&task_id) {
            task.state = TaskState::Running { worker_id };
        }
        (worker_id, task_id)
    }

    /// Frees the worker (if any) whose current task is `task_id`.
    fn release_worker(&mut self, task_id: TaskId) {
        if let Some(worker) = self
            .workers
            .iter_mut()
            .find(|w| w.current_task == Some(task_id))
        {
            worker.finish_task();
        }
    }

    /// Spawns an unnamed task with `Normal` priority at the back of the
    /// global queue.
    pub fn spawn(&mut self, action: RtObject) -> TaskId {
        let id = self.allocate_task_id();
        self.global_queue.push_back(id);
        self.register_task(Task::new(id, action));
        id
    }

    /// Like [`spawn`](Self::spawn), but attaches `name` to the task.
    pub fn spawn_named(&mut self, name: String, action: RtObject) -> TaskId {
        let id = self.allocate_task_id();
        self.global_queue.push_back(id);
        self.register_task(Task::named(id, name, action));
        id
    }

    /// Spawns a task with the given priority. When priority scheduling is
    /// enabled, `High` and `Critical` tasks go to the front of the global
    /// queue; everything else goes to the back.
    pub fn spawn_with_priority(&mut self, action: RtObject, priority: TaskPriority) -> TaskId {
        let id = self.allocate_task_id();
        if self.config.priority_scheduling && priority >= TaskPriority::High {
            self.global_queue.push_front(id);
        } else {
            self.global_queue.push_back(id);
        }
        self.register_task(Task::new(id, action).with_priority(priority));
        id
    }

    /// Spawns a task that depends on `deps`.
    ///
    /// If every dependency is already in the completed list the task is
    /// queued immediately; otherwise it is suspended until its dependencies
    /// complete. The new id is added to the dependents of each known
    /// dependency.
    pub fn spawn_with_deps(&mut self, action: RtObject, deps: Vec<TaskId>) -> TaskId {
        let id = self.allocate_task_id();
        let mut task = Task::new(id, action);
        let all_satisfied = deps.iter().all(|dep| self.completed.contains(dep));
        if all_satisfied {
            // `Task::new` already left the task in the `Created` state.
            self.global_queue.push_back(id);
        } else {
            task.state = TaskState::Suspended {
                waiting_on: deps.clone(),
            };
        }
        for dep in &deps {
            if let Some(dep_task) = self.tasks.get_mut(dep) {
                dep_task.dependents.push(id);
            }
        }
        task.dependencies = deps;
        self.register_task(task);
        id
    }

    /// Looks up a task by id.
    pub fn get_task(&self, id: TaskId) -> Option<&Task> {
        self.tasks.get(&id)
    }

    /// Looks up a task by id for mutation.
    pub fn get_task_mut(&mut self, id: TaskId) -> Option<&mut Task> {
        self.tasks.get_mut(&id)
    }

    /// Cancels a task that has not reached a terminal state. Returns `true`
    /// when the task was cancelled by this call.
    ///
    /// The id is not removed from any queue here; `schedule_step` discards
    /// finished tasks when it encounters them.
    pub fn cancel(&mut self, id: TaskId) -> bool {
        match self.tasks.get_mut(&id) {
            Some(task) if !task.state.is_terminal() => {
                task.cancel();
                self.stats.tasks_cancelled += 1;
                true
            }
            _ => false,
        }
    }

    /// Returns `true` when the task exists and is in a terminal state.
    pub fn is_complete(&self, id: TaskId) -> bool {
        self.tasks
            .get(&id)
            .map_or(false, |t| t.state.is_terminal())
    }

    /// Result of a successfully completed task.
    pub fn get_result(&self, id: TaskId) -> Option<&RtObject> {
        self.tasks.get(&id).and_then(|t| t.result())
    }

    /// Performs one scheduling round and returns the `(worker, task)` pair
    /// that should run next, or `None` when nothing is runnable.
    ///
    /// The round drains the global queue onto the least-loaded workers, then
    /// takes a task from the first worker whose deque has one, and finally
    /// (if enabled) tries to steal. Tasks already in a terminal state, such
    /// as cancelled ones still sitting in a queue, are discarded instead of
    /// being moved back to `Queued`/`Running`. A scheduler without workers
    /// reports an idle round instead of indexing an empty worker list.
    pub fn schedule_step(&mut self) -> Option<(usize, TaskId)> {
        self.stats.scheduling_rounds += 1;

        if self.workers.is_empty() {
            self.stats.idle_cycles += 1;
            return None;
        }

        while let Some(task_id) = self.global_queue.pop_front() {
            if self.is_complete(task_id) {
                continue;
            }
            let target_worker = self.find_least_loaded_worker();
            if !self.workers[target_worker].push_task(task_id) {
                // The least-loaded deque is full: keep the task for later.
                self.global_queue.push_front(task_id);
                break;
            }
            if let Some(task) = self.tasks.get_mut(&task_id) {
                task.state = TaskState::Queued;
            }
        }

        for worker_id in 0..self.workers.len() {
            while let Some(task_id) = self.workers[worker_id].pop_task() {
                if self.is_complete(task_id) {
                    continue;
                }
                return Some(self.begin_task(worker_id, task_id));
            }
        }

        if self.config.work_stealing {
            if let Some((worker_id, task_id)) = self.try_steal() {
                if !self.is_complete(task_id) {
                    return Some(self.begin_task(worker_id, task_id));
                }
            }
        }

        self.stats.idle_cycles += 1;
        None
    }

    /// Marks `task_id` as completed with `result`, frees the worker that was
    /// running it and wakes dependents whose dependencies are now all
    /// complete.
    pub fn complete_task(&mut self, task_id: TaskId, result: RtObject) {
        let dependents = match self.tasks.get_mut(&task_id) {
            Some(task) => {
                let dependents = task.dependents.clone();
                task.complete(result);
                self.completed.push(task_id);
                self.stats.tasks_completed += 1;
                dependents
            }
            None => Vec::new(),
        };
        self.release_worker(task_id);
        for dep_id in dependents {
            self.try_wake_task(dep_id);
        }
    }

    /// Marks `task_id` as failed with `error` and frees the worker that was
    /// running it. Dependents are not woken.
    pub fn fail_task(&mut self, task_id: TaskId, error: String) {
        if let Some(task) = self.tasks.get_mut(&task_id) {
            task.fail(error);
            self.stats.tasks_failed += 1;
        }
        self.release_worker(task_id);
    }

    /// Queues a suspended task once everything it waits on has completed.
    fn try_wake_task(&mut self, task_id: TaskId) {
        let should_wake = match self.tasks.get(&task_id).map(|t| &t.state) {
            Some(TaskState::Suspended { waiting_on }) => {
                waiting_on.iter().all(|dep| self.completed.contains(dep))
            }
            _ => false,
        };
        if should_wake {
            if let Some(task) = self.tasks.get_mut(&task_id) {
                task.state = TaskState::Queued;
            }
            self.global_queue.push_back(task_id);
        }
    }

    /// Index of the worker with the smallest load (0 if there are none).
    fn find_least_loaded_worker(&self) -> usize {
        self.workers
            .iter()
            .enumerate()
            .min_by_key(|(_, w)| w.load())
            .map(|(i, _)| i)
            .unwrap_or(0)
    }

    /// Lets the first idle worker steal one task from the other worker with
    /// the longest deque. Returns the thief and the stolen task id.
    fn try_steal(&mut self) -> Option<(usize, TaskId)> {
        self.stats.steal_attempts += 1;
        let idle_worker = self.workers.iter().position(|w| w.idle)?;
        let busy_worker = self
            .workers
            .iter()
            .enumerate()
            .filter(|(i, _)| *i != idle_worker)
            .max_by_key(|(_, w)| w.deque.len())?
            .0;
        if self.workers[busy_worker].deque.is_empty() {
            return None;
        }
        let stolen = self.workers[busy_worker].deque.steal()?;
        self.workers[busy_worker].tasks_stolen_from += 1;
        self.workers[idle_worker].tasks_stolen += 1;
        self.stats.total_steals += 1;
        Some((idle_worker, stolen))
    }

    /// Number of tasks that are not in a terminal state.
    pub fn active_task_count(&self) -> usize {
        self.tasks
            .values()
            .filter(|t| !t.state.is_terminal())
            .count()
    }

    /// Number of successfully completed tasks.
    pub fn completed_count(&self) -> usize {
        self.completed.len()
    }

    /// Number of workers.
    pub fn num_workers(&self) -> usize {
        self.workers.len()
    }

    /// Snapshot of every worker's counters.
    pub fn worker_stats(&self) -> Vec<WorkerStats> {
        self.workers
            .iter()
            .map(|w| WorkerStats {
                id: w.id,
                tasks_completed: w.tasks_completed,
                tasks_stolen: w.tasks_stolen,
                tasks_stolen_from: w.tasks_stolen_from,
                queue_length: w.deque.len(),
                idle: w.idle,
            })
            .collect()
    }

    /// Scheduler-wide statistics.
    pub fn stats(&self) -> &SchedulerStats {
        &self.stats
    }

    /// Configuration this scheduler was built with.
    pub fn config(&self) -> &SchedulerConfig {
        &self.config
    }

    /// Whether the scheduler is marked as running.
    pub fn is_running(&self) -> bool {
        self.running
    }

    /// Marks the scheduler as running.
    pub fn start(&mut self) {
        self.running = true;
    }

    /// Marks the scheduler as stopped.
    pub fn stop(&mut self) {
        self.running = false;
    }

    /// Drops all tasks, queues and statistics and returns every worker to
    /// its initial idle state. The running flag is left unchanged.
    pub fn reset(&mut self) {
        self.tasks.clear();
        self.global_queue.clear();
        self.completed.clear();
        self.next_task_id = 0;
        self.stats = SchedulerStats::default();
        for worker in &mut self.workers {
            worker.deque.clear();
            worker.tasks_completed = 0;
            worker.tasks_stolen = 0;
            worker.tasks_stolen_from = 0;
            worker.idle = true;
            worker.current_task = None;
        }
    }

    /// Runs tasks one at a time through `executor` until no active task is
    /// left or no further progress is possible.
    ///
    /// When a round yields nothing and the global queue is empty, every task
    /// still suspended is failed with `"deadlock detected"`; the loop then
    /// stops either way.
    ///
    /// # Panics
    ///
    /// Panics if `schedule_step` returns an id that is not in the task map.
    pub fn run_all(&mut self, mut executor: impl FnMut(&Task) -> Result<RtObject, String>) {
        self.start();
        while self.active_task_count() > 0 {
            if let Some((_worker_id, task_id)) = self.schedule_step() {
                let result = {
                    let task = self
                        .tasks
                        .get(&task_id)
                        .expect("task_id returned by schedule_step must exist in the tasks map");
                    executor(task)
                };
                match result {
                    Ok(value) => self.complete_task(task_id, value),
                    Err(error) => self.fail_task(task_id, error),
                }
            } else {
                if self.global_queue.is_empty() {
                    // Nothing runnable and nothing queued: whatever is still
                    // suspended can never be woken.
                    let suspended: Vec<TaskId> = self
                        .tasks
                        .iter()
                        .filter(|(_, t)| t.state.is_suspended())
                        .map(|(id, _)| *id)
                        .collect();
                    for id in suspended {
                        self.fail_task(id, "deadlock detected".to_string());
                    }
                }
                break;
            }
        }
        self.stop();
    }
}
/// Bounded double-ended queue of task ids: the owner pushes and pops at the
/// back, thieves take from the front.
pub struct WorkStealingDeque {
    pub(super) deque: VecDeque<TaskId>,
    pub(super) capacity: usize,
}

impl WorkStealingDeque {
    /// Creates an empty deque that accepts at most `capacity` ids; storage
    /// for `capacity` entries is reserved up front.
    pub fn new(capacity: usize) -> Self {
        WorkStealingDeque {
            deque: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Appends `task_id` at the back. Returns `false`, leaving the deque
    /// unchanged, when it is already at capacity.
    pub fn push(&mut self, task_id: TaskId) -> bool {
        if self.is_full() {
            return false;
        }
        self.deque.push_back(task_id);
        true
    }

    /// Removes the most recently pushed id (owner side).
    pub fn pop(&mut self) -> Option<TaskId> {
        self.deque.pop_back()
    }

    /// Removes the oldest id (thief side).
    pub fn steal(&mut self) -> Option<TaskId> {
        self.deque.pop_front()
    }

    /// Number of queued ids.
    pub fn len(&self) -> usize {
        self.deque.len()
    }

    /// Returns `true` when nothing is queued.
    pub fn is_empty(&self) -> bool {
        self.deque.is_empty()
    }

    /// Returns `true` when no further id can be pushed.
    pub fn is_full(&self) -> bool {
        self.deque.len() >= self.capacity
    }

    /// Removes every queued id.
    pub fn clear(&mut self) {
        self.deque.clear();
    }

    /// The id `pop` would return next, without removing it.
    pub fn peek(&self) -> Option<&TaskId> {
        self.deque.back()
    }

    /// Steals up to `n` ids from the front, oldest first.
    ///
    /// At most half of the queued ids are taken, but a non-empty deque always
    /// gives up at least one id when `n > 0`. Asking for zero ids takes
    /// nothing.
    pub fn steal_batch(&mut self, n: usize) -> Vec<TaskId> {
        if n == 0 {
            return Vec::new();
        }
        let available = self.deque.len();
        let count = n.min(available / 2).max(1).min(available);
        self.deque.drain(..count).collect()
    }
}
/// Counters maintained by `Scheduler`; all start at zero.
#[derive(Clone, Debug, Default)]
pub struct SchedulerStats {
/// Tasks registered through any of the scheduler's spawn methods.
pub tasks_created: u64,
/// Tasks marked completed via `Scheduler::complete_task`.
pub tasks_completed: u64,
/// Tasks marked failed via `Scheduler::fail_task`.
pub tasks_failed: u64,
/// Tasks cancelled via `Scheduler::cancel`.
pub tasks_cancelled: u64,
/// Steal attempts that actually moved a task.
pub total_steals: u64,
/// Steal attempts made, successful or not.
pub steal_attempts: u64,
/// Scheduling rounds that found nothing to run.
pub idle_cycles: u64,
/// Highest count of non-terminal tasks observed at spawn time.
pub peak_active_tasks: u64,
/// Number of calls to `Scheduler::schedule_step`.
pub scheduling_rounds: u64,
}
/// FIFO inbox of an actor with receive/process counters.
#[allow(dead_code)]
pub struct ActorMailbox {
    /// Actor that owns this mailbox.
    pub id: ActorId,
    messages: VecDeque<ActorMessage>,
    /// Messages ever delivered to this mailbox.
    pub total_received: u64,
    /// Messages ever taken out of this mailbox.
    pub total_processed: u64,
}

#[allow(dead_code)]
impl ActorMailbox {
    /// Creates an empty mailbox for actor `id`.
    pub fn new(id: ActorId) -> Self {
        Self {
            id,
            total_processed: 0,
            total_received: 0,
            messages: VecDeque::new(),
        }
    }

    /// Appends `msg` to the inbox and counts it as received.
    pub fn send(&mut self, msg: ActorMessage) {
        self.total_received += 1;
        self.messages.push_back(msg);
    }

    /// Takes the oldest message, counting it as processed; returns `None`
    /// (and counts nothing) when the inbox is empty.
    pub fn receive(&mut self) -> Option<ActorMessage> {
        let oldest = self.messages.pop_front()?;
        self.total_processed += 1;
        Some(oldest)
    }

    /// Number of messages waiting in the inbox.
    pub fn pending(&self) -> usize {
        self.messages.len()
    }

    /// Returns `true` when no message is waiting.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }
}
/// Opaque identifier of a task, wrapping a `u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TaskId(pub u64);

impl TaskId {
    /// Wraps a raw numeric id.
    pub fn new(id: u64) -> Self {
        Self(id)
    }

    /// Returns the wrapped numeric id.
    pub fn raw(self) -> u64 {
        let Self(value) = self;
        value
    }
}
/// Lifecycle state of a task.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TaskState {
    /// Freshly created.
    Created,
    /// Waiting in a queue.
    Queued,
    /// Executing on the given worker.
    Running {
        worker_id: usize,
    },
    /// Blocked until the listed tasks complete.
    Suspended {
        waiting_on: Vec<TaskId>,
    },
    /// Finished successfully with a result.
    Completed {
        result: RtObject,
    },
    /// Finished with an error message.
    Failed {
        error: String,
    },
    /// Cancelled.
    Cancelled,
}

impl TaskState {
    /// `true` for `Completed`, `Failed` and `Cancelled`.
    pub fn is_terminal(&self) -> bool {
        match self {
            TaskState::Completed { .. } | TaskState::Failed { .. } | TaskState::Cancelled => true,
            TaskState::Created
            | TaskState::Queued
            | TaskState::Running { .. }
            | TaskState::Suspended { .. } => false,
        }
    }

    /// `true` for `Created` and `Queued`.
    pub fn is_runnable(&self) -> bool {
        match self {
            TaskState::Created | TaskState::Queued => true,
            TaskState::Running { .. }
            | TaskState::Suspended { .. }
            | TaskState::Completed { .. }
            | TaskState::Failed { .. }
            | TaskState::Cancelled => false,
        }
    }

    /// `true` only for `Running`.
    pub fn is_running(&self) -> bool {
        if let TaskState::Running { .. } = self {
            true
        } else {
            false
        }
    }

    /// `true` only for `Suspended`.
    pub fn is_suspended(&self) -> bool {
        if let TaskState::Suspended { .. } = self {
            true
        } else {
            false
        }
    }
}
/// Cooperative yield point: owns a shared "yield requested" flag and counts
/// how often it was checked and honoured.
#[allow(dead_code)]
pub struct YieldPoint {
    requested: Arc<AtomicBool>,
    /// Number of calls to `should_yield`.
    pub check_count: u64,
    /// Number of calls to `should_yield` that returned `true`.
    pub yield_count: u64,
    /// Configured check interval; stored only, no method in this type reads it.
    pub check_interval: u64,
}

#[allow(dead_code)]
impl YieldPoint {
    /// Creates a yield point with a check interval of 100.
    pub fn new() -> Self {
        Self::with_interval(100)
    }

    /// Creates a yield point with the given check interval, a cleared flag
    /// and zeroed counters.
    pub fn with_interval(check_interval: u64) -> Self {
        Self {
            check_interval,
            yield_count: 0,
            check_count: 0,
            requested: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Raises the yield flag.
    pub fn request_yield(&self) {
        self.requested.store(true, Ordering::Release);
    }

    /// Lowers the yield flag.
    pub fn clear_request(&self) {
        self.requested.store(false, Ordering::Release);
    }

    /// Counts a check and reports whether a yield was requested. A pending
    /// request is consumed: the flag is cleared and `yield_count` goes up.
    pub fn should_yield(&mut self) -> bool {
        self.check_count += 1;
        if !self.requested.load(Ordering::Acquire) {
            return false;
        }
        self.yield_count += 1;
        self.clear_request();
        true
    }

    /// Returns a handle sharing this yield point's flag.
    pub fn handle(&self) -> YieldHandle {
        let requested = Arc::clone(&self.requested);
        YieldHandle { requested }
    }
}