#![cfg_attr(not(feature = "std"), no_std)]

//! A lightweight, tick-driven task scheduler with optional `wasm` bindings,
//! optional `parallel` execution, and simple temporal/convergence metrics.

extern crate alloc;
use alloc::{vec::Vec, collections::BinaryHeap, sync::Arc};
use core::{
    cmp::Ordering,
    sync::atomic::{AtomicU64, AtomicBool, Ordering as AtomicOrdering},
    time::Duration,
};

#[cfg(feature = "wasm")]
use wasm_bindgen::prelude::*;

use parking_lot::{RwLock, Mutex};
use smallvec::SmallVec;

#[cfg(feature = "serde")]
use serde::{Serialize, Deserialize};

/// A point in time, stored as a raw `u64` whose unit depends on the backend
/// (TSC ticks on x86_64, nanoseconds on wasm32/`std`, a counter otherwise).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Timestamp(u64);

impl Timestamp {
    /// Returns the current time using the cheapest clock available for the
    /// target: RDTSC on x86_64, `performance.now()` on wasm32, `SystemTime`
    /// with `std`, and an atomic counter as a last resort.
    #[inline(always)]
    pub fn now() -> Self {
        #[cfg(target_arch = "x86_64")]
        {
            // RDTSC returns the low 32 bits in EAX and the high 32 bits in
            // EDX; combine them into a single 64-bit tick count.
            let lo: u32;
            let hi: u32;
            unsafe {
                core::arch::asm!("rdtsc", out("eax") lo, out("edx") hi, options(nomem, nostack));
            }
            return Timestamp(((hi as u64) << 32) | lo as u64);
        }

        #[cfg(target_arch = "wasm32")]
        {
            // `performance.now()` is in milliseconds; convert to nanoseconds.
            let perf = web_sys::window()
                .expect("no window")
                .performance()
                .expect("no performance");
            return Timestamp((perf.now() * 1_000_000.0) as u64);
        }

        #[cfg(all(feature = "std", not(any(target_arch = "x86_64", target_arch = "wasm32"))))]
        {
            use std::time::{SystemTime, UNIX_EPOCH};
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64;
            return Timestamp(nanos);
        }

        #[cfg(all(not(feature = "std"), not(any(target_arch = "x86_64", target_arch = "wasm32"))))]
        {
            // No usable clock: fall back to a monotonically increasing counter.
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            return Timestamp(COUNTER.fetch_add(1, AtomicOrdering::SeqCst));
        }
    }

    /// Returns the raw counter value. This is nanoseconds on the wasm32 and
    /// `std` backends, but TSC ticks on x86_64.
    #[inline(always)]
    pub fn as_nanos(&self) -> u64 {
        self.0
    }

    /// Time elapsed since this timestamp, interpreting the raw difference
    /// as nanoseconds.
    #[inline(always)]
    pub fn elapsed(&self) -> Duration {
        let now = Self::now();
        let diff = now.0.saturating_sub(self.0);
        Duration::from_nanos(diff)
    }
}
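
// Usage sketch (illustrative only): `spin_for` is a hypothetical helper, not
// part of this crate.
//
//     fn spin_for(budget: Duration) {
//         let start = Timestamp::now();
//         while start.elapsed() < budget {
//             core::hint::spin_loop();
//         }
//     }
//
// Keep in mind that on x86_64 `elapsed()` treats TSC ticks as nanoseconds, so
// wall-clock accuracy depends on the TSC frequency.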

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Priority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

#[derive(Clone)]
pub struct Task {
    id: u64,
    execute_at: Timestamp,
    priority: Priority,
    callback: Arc<dyn Fn() + Send + Sync>,
}

impl Task {
    /// Creates a task that becomes due `delay` after the current timestamp,
    /// with `Priority::Normal`.
    pub fn new<F>(callback: F, delay: Duration) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        static TASK_ID: AtomicU64 = AtomicU64::new(0);
        // The delay is added in nanoseconds to the raw timestamp value; on
        // x86_64 (TSC ticks) this is only approximate.
        let execute_at = Timestamp::now();
        let execute_at = Timestamp(execute_at.0 + delay.as_nanos() as u64);

        Self {
            id: TASK_ID.fetch_add(1, AtomicOrdering::SeqCst),
            execute_at,
            priority: Priority::Normal,
            callback: Arc::new(callback),
        }
    }

    /// Returns the task with its priority replaced.
    pub fn with_priority(mut self, priority: Priority) -> Self {
        self.priority = priority;
        self
    }
}
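
// Construction sketch: schedule a closure to become due roughly 1 ms from now
// at high priority. The closure body here is a placeholder.
//
//     let task = Task::new(|| { /* do work */ }, Duration::from_millis(1))
//         .with_priority(Priority::High);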

impl PartialEq for Task {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

impl Eq for Task {}

impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Task {
    fn cmp(&self, other: &Self) -> Ordering {
        // `BinaryHeap` is a max-heap, so the deadline comparison is reversed
        // to pop the earliest `execute_at` first; ties prefer higher priority,
        // then lower id, keeping the order total and consistent with `Eq`.
        other.execute_at.cmp(&self.execute_at)
            .then_with(|| self.priority.cmp(&other.priority))
            .then_with(|| other.id.cmp(&self.id))
    }
}

/// Scheduler configuration.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Config {
    /// Target duration of one tick, in nanoseconds.
    pub tick_rate_ns: u64,
    /// Maximum number of due tasks executed per tick.
    pub max_tasks_per_tick: usize,
    /// Execute due tasks in parallel (requires the `parallel` feature).
    pub parallel: bool,
    /// Contraction factor `k` in `(0, 1)` for the strange-loop state update.
    pub lipschitz_constant: f64,
    /// Number of recent tick timestamps kept for `temporal_overlap`.
    pub window_size: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            tick_rate_ns: 1000,
            max_tasks_per_tick: 100,
            parallel: cfg!(not(target_arch = "wasm32")),
            lipschitz_constant: 0.9,
            window_size: 100,
        }
    }
}
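
// Configuration sketch: override individual fields and keep the remaining
// defaults. The values below are illustrative, not recommendations.
//
//     let config = Config {
//         tick_rate_ns: 1_000_000,
//         max_tasks_per_tick: 32,
//         ..Config::default()
//     };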

#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Metrics {
    pub total_ticks: u64,
    pub total_tasks: u64,
    pub avg_tick_time_ns: u64,
    pub min_tick_time_ns: u64,
    pub max_tick_time_ns: u64,
    pub tasks_per_second: f64,
}

/// A tick-driven task scheduler with deadline- and priority-ordered execution.
pub struct Scheduler {
    config: Config,
    task_queue: Arc<Mutex<BinaryHeap<Task>>>,
    running: Arc<AtomicBool>,
    metrics: Arc<RwLock<Metrics>>,
    temporal_windows: Arc<RwLock<Vec<Timestamp>>>,
    strange_loop_state: Arc<RwLock<f64>>,
}

impl Scheduler {
    /// Creates an idle scheduler with the given configuration.
    pub fn new(config: Config) -> Self {
        Self {
            config,
            task_queue: Arc::new(Mutex::new(BinaryHeap::new())),
            running: Arc::new(AtomicBool::new(false)),
            metrics: Arc::new(RwLock::new(Metrics::default())),
            temporal_windows: Arc::new(RwLock::new(Vec::new())),
            strange_loop_state: Arc::new(RwLock::new(0.5)),
        }
    }

    /// Adds a task to the queue; it runs on a later `tick` once it is due.
    pub fn schedule(&self, task: Task) {
        let mut queue = self.task_queue.lock();
        queue.push(task);
    }

    /// Runs the tick loop until `stop` is called. On wasm32 this performs a
    /// single tick, since the thread cannot be blocked; the host is expected
    /// to call `tick` repeatedly instead.
    pub fn run(&self) {
        self.running.store(true, AtomicOrdering::SeqCst);

        #[cfg(target_arch = "wasm32")]
        {
            self.tick();
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            while self.running.load(AtomicOrdering::SeqCst) {
                self.tick();

                // Busy-wait until the configured tick period has elapsed.
                let target_duration = Duration::from_nanos(self.config.tick_rate_ns);
                let start = Timestamp::now();
                while start.elapsed() < target_duration {
                    core::hint::spin_loop();
                }
            }
        }
    }

    /// Executes one scheduler tick: updates internal state, runs due tasks,
    /// and refreshes metrics.
    #[inline(always)]
    pub fn tick(&self) {
        let tick_start = Timestamp::now();
        let now = tick_start;

        // Contractive "strange loop" update: with `k < 1` the state converges
        // toward the fixed point 0.5.
        {
            let mut state = self.strange_loop_state.write();
            let k = self.config.lipschitz_constant;
            *state = k * (*state) + (1.0 - k) * 0.5;
        }

        // Record this tick's timestamp in the sliding temporal window.
        {
            let mut windows = self.temporal_windows.write();
            windows.push(now);
            if windows.len() > self.config.window_size {
                windows.remove(0);
            }
        }

        // Pop every task that is due, up to the per-tick budget.
        let mut executed = 0;
        let mut tasks_to_execute = SmallVec::<[Task; 16]>::new();

        {
            let mut queue = self.task_queue.lock();
            while executed < self.config.max_tasks_per_tick {
                match queue.peek() {
                    Some(task) if task.execute_at <= now => {
                        if let Some(task) = queue.pop() {
                            tasks_to_execute.push(task);
                            executed += 1;
                        }
                    }
                    _ => break,
                }
            }
        }

        // Run the collected tasks, in parallel when both the `parallel`
        // feature and the runtime `parallel` flag are enabled.
        #[cfg(all(feature = "parallel", not(target_arch = "wasm32")))]
        {
            use rayon::prelude::*;
            if self.config.parallel {
                tasks_to_execute.par_iter().for_each(|task| (task.callback)());
            } else {
                for task in &tasks_to_execute {
                    (task.callback)();
                }
            }
        }

        #[cfg(not(all(feature = "parallel", not(target_arch = "wasm32"))))]
        {
            for task in &tasks_to_execute {
                (task.callback)();
            }
        }

        // Update running metrics for this tick.
        let tick_duration = tick_start.elapsed().as_nanos() as u64;
        {
            let mut metrics = self.metrics.write();
            metrics.total_ticks += 1;
            metrics.total_tasks += executed as u64;

            if metrics.min_tick_time_ns == 0 || tick_duration < metrics.min_tick_time_ns {
                metrics.min_tick_time_ns = tick_duration;
            }
            if tick_duration > metrics.max_tick_time_ns {
                metrics.max_tick_time_ns = tick_duration;
            }

            // Exponential moving average of the tick time.
            let alpha = 0.1;
            metrics.avg_tick_time_ns = ((1.0 - alpha) * metrics.avg_tick_time_ns as f64
                + alpha * tick_duration as f64) as u64;

            if metrics.avg_tick_time_ns > 0 {
                metrics.tasks_per_second = (executed as f64 * 1_000_000_000.0)
                    / metrics.avg_tick_time_ns as f64;
            }
        }
    }

    /// Signals the `run` loop to exit after the current tick.
    pub fn stop(&self) {
        self.running.store(false, AtomicOrdering::SeqCst);
    }

    /// Returns a snapshot of the current metrics.
    pub fn metrics(&self) -> Metrics {
        self.metrics.read().clone()
    }

    /// Fraction of consecutive ticks in the window that landed within twice
    /// the configured tick period of each other.
    pub fn temporal_overlap(&self) -> f64 {
        let windows = self.temporal_windows.read();
        if windows.len() < 2 {
            return 0.0;
        }

        let mut overlaps = 0;
        for i in 1..windows.len() {
            let diff = windows[i].0.saturating_sub(windows[i - 1].0);
            if diff < self.config.tick_rate_ns * 2 {
                overlaps += 1;
            }
        }

        (overlaps as f64) / (windows.len() as f64 - 1.0)
    }

    /// Current value of the contractive strange-loop state.
    pub fn strange_loop_state(&self) -> f64 {
        *self.strange_loop_state.read()
    }
}
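
// End-to-end sketch (std targets): schedule an immediate task, drive one tick
// by hand, and read back the metrics. The closure body is a placeholder.
//
//     let scheduler = Scheduler::new(Config::default());
//     scheduler.schedule(Task::new(|| { /* do work */ }, Duration::from_nanos(0)));
//     scheduler.tick();
//     assert_eq!(scheduler.metrics().total_ticks, 1);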

/// `wasm-bindgen` wrapper around [`Scheduler`] for driving it from JavaScript.
#[cfg(feature = "wasm")]
#[wasm_bindgen]
pub struct WasmScheduler {
    inner: Scheduler,
}

#[cfg(feature = "wasm")]
#[wasm_bindgen]
impl WasmScheduler {
    #[wasm_bindgen(constructor)]
    pub fn new() -> Self {
        Self {
            inner: Scheduler::new(Config::default()),
        }
    }

    #[wasm_bindgen]
    pub fn tick(&self) {
        self.inner.tick();
    }

    #[cfg(feature = "serde")]
    #[wasm_bindgen]
    pub fn get_metrics(&self) -> JsValue {
        let metrics = self.inner.metrics();
        serde_wasm_bindgen::to_value(&metrics).unwrap()
    }
}

pub mod bench_utils;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ordering() {
        let t1 = Timestamp::now();
        let t2 = Timestamp::now();
        assert!(t2 >= t1);
    }

    #[test]
    fn test_task_scheduling() {
        let scheduler = Scheduler::new(Config::default());

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        scheduler.schedule(Task::new(
            move || {
                counter_clone.fetch_add(1, AtomicOrdering::SeqCst);
            },
            Duration::from_nanos(0),
        ));

        scheduler.tick();
        assert_eq!(counter.load(AtomicOrdering::SeqCst), 1);
    }

    #[test]
    fn test_strange_loop_convergence() {
        let scheduler = Scheduler::new(Config {
            lipschitz_constant: 0.9,
            ..Default::default()
        });

        for _ in 0..100 {
            scheduler.tick();
        }

        let state = scheduler.strange_loop_state();
        assert!((state - 0.5).abs() < 0.1);
    }
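
    // Extra check (sketch): a task that is not yet due must not run on the
    // first tick. The one-hour delay is arbitrary but safely in the future
    // for every timestamp backend.
    #[test]
    fn test_future_task_not_executed() {
        let scheduler = Scheduler::new(Config::default());

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        scheduler.schedule(Task::new(
            move || {
                counter_clone.fetch_add(1, AtomicOrdering::SeqCst);
            },
            Duration::from_secs(3600),
        ));

        scheduler.tick();
        assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
        assert_eq!(scheduler.metrics().total_ticks, 1);
    }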
}