nanosecond_scheduler/
lib.rs

//! # Nanosecond Scheduler
//!
//! Ultra-low latency scheduler with nanosecond precision for temporal consciousness applications.
//! Designed for both native and WASM environments with <1μs tick overhead.
//!
//! ## Features
//! - Hardware TSC-based timing (x86_64) or high-resolution timers (WASM)
//! - Priority task queue guarded by lightweight locks, with atomic counters
//! - Strange loop convergence with Lipschitz constraints (see the update rule below)
//! - Temporal window overlap management
//! - Identity continuity tracking
//! - SIMD optimizations (when available)
//!
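//! ## Strange loop update
//!
//! Each tick applies the contraction mapping `state = k * state + (1 - k) * 0.5`,
//! where `k` is the configured Lipschitz constant. For `k < 1` the state converges
//! to the fixed point `0.5`, which `strange_loop_state()` exposes.
//!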
//! ## Example
//! ```
//! use nanosecond_scheduler::{Scheduler, Task, Config};
//! use std::time::Duration;
//!
//! let config = Config::default();
//! let scheduler = Scheduler::new(config);
//!
//! scheduler.schedule(Task::new(
//!     || { println!("Task executed!"); },
//!     Duration::from_nanos(100)
//! ));
//!
//! // Just tick once for the example
//! scheduler.tick();
//! ```

#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;
use alloc::{vec::Vec, collections::BinaryHeap, sync::Arc};
use core::{
    cmp::Ordering,
    sync::atomic::{AtomicU64, AtomicBool, Ordering as AtomicOrdering},
    time::Duration,
};

#[cfg(feature = "wasm")]
use wasm_bindgen::prelude::*;

use cfg_if::cfg_if;
use parking_lot::{RwLock, Mutex};
use smallvec::SmallVec;

#[cfg(feature = "serde")]
use serde::{Serialize, Deserialize};

/// High-precision timestamp using native or WASM timing
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Timestamp(u64);

impl Timestamp {
    /// Get current timestamp with nanosecond precision
    #[inline(always)]
    pub fn now() -> Self {
        cfg_if! {
            if #[cfg(all(target_arch = "x86_64", not(target_arch = "wasm32")))] {
                // Use TSC on x86_64 for lowest overhead.
                // RDTSC returns the counter split across EDX (high 32 bits) and
                // EAX (low 32 bits); combine both halves into one 64-bit value.
                // Note: raw TSC ticks are cycle counts, not calibrated nanoseconds.
                unsafe {
                    let lo: u32;
                    let hi: u32;
                    core::arch::asm!("rdtsc", out("eax") lo, out("edx") hi, options(nostack, nomem));
                    Timestamp(((hi as u64) << 32) | (lo as u64))
                }
            } else if #[cfg(target_arch = "wasm32")] {
                // Use performance.now() in WASM
                let perf = web_sys::window()
                    .expect("no window")
                    .performance()
                    .expect("no performance");
                Timestamp((perf.now() * 1_000_000.0) as u64) // Convert ms to ns
            } else {
                // Fallback to std time
                #[cfg(feature = "std")]
                {
                    use std::time::{SystemTime, UNIX_EPOCH};
                    let nanos = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_nanos() as u64;
                    Timestamp(nanos)
                }
                #[cfg(not(feature = "std"))]
                {
                    // No std, use a monotonically increasing counter
                    static COUNTER: AtomicU64 = AtomicU64::new(0);
                    Timestamp(COUNTER.fetch_add(1, AtomicOrdering::SeqCst))
                }
            }
        }
    }

    /// Get the raw timestamp value
    #[inline(always)]
    pub fn as_nanos(&self) -> u64 {
        self.0
    }

    /// Calculate elapsed time since this timestamp
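    ///
    /// Illustrative usage (units depend on the active clock source; raw TSC
    /// ticks are treated as the nanosecond-scale unit):
    /// ```
    /// use nanosecond_scheduler::Timestamp;
    ///
    /// let start = Timestamp::now();
    /// let _elapsed = start.elapsed(); // Duration since `start`
    /// assert!(Timestamp::now() >= start);
    /// ```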
    #[inline(always)]
    pub fn elapsed(&self) -> Duration {
        let now = Self::now();
        let diff = now.0.saturating_sub(self.0);
        Duration::from_nanos(diff)
    }
}

/// Task priority for scheduling
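///
/// Priorities are ordered from `Low` to `Critical` (illustrative):
/// ```
/// use nanosecond_scheduler::Priority;
///
/// assert!(Priority::Critical > Priority::Low);
/// ```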
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Priority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

/// A schedulable task
#[derive(Clone)]
pub struct Task {
    id: u64,
    execute_at: Timestamp,
    priority: Priority,
    callback: Arc<dyn Fn() + Send + Sync>,
}

impl Task {
    /// Create a new task
    pub fn new<F>(callback: F, delay: Duration) -> Self
    where
        F: Fn() + Send + Sync + 'static,
    {
        static TASK_ID: AtomicU64 = AtomicU64::new(0);
        let execute_at = Timestamp::now();
        let execute_at = Timestamp(execute_at.0 + delay.as_nanos() as u64);

        Self {
            id: TASK_ID.fetch_add(1, AtomicOrdering::SeqCst),
            execute_at,
            priority: Priority::Normal,
            callback: Arc::new(callback),
        }
    }

    /// Set task priority
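    ///
    /// Illustrative builder-style usage:
    /// ```
    /// use nanosecond_scheduler::{Task, Priority};
    /// use std::time::Duration;
    ///
    /// let _task = Task::new(|| {}, Duration::from_nanos(500))
    ///     .with_priority(Priority::Critical);
    /// ```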
    pub fn with_priority(mut self, priority: Priority) -> Self {
        self.priority = priority;
        self
    }
}

impl PartialEq for Task {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

impl Eq for Task {}

impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Task {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap, so reverse the time comparison for
        // min-heap behavior (earliest execute_at pops first); ties break
        // toward higher priority.
        other.execute_at.cmp(&self.execute_at)
            .then_with(|| self.priority.cmp(&other.priority))
    }
}

/// Scheduler configuration
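///
/// A typical override of the defaults (illustrative):
/// ```
/// use nanosecond_scheduler::Config;
///
/// let config = Config {
///     tick_rate_ns: 500,
///     ..Config::default()
/// };
/// assert_eq!(config.tick_rate_ns, 500);
/// ```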
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Config {
    /// Target tick rate in nanoseconds
    pub tick_rate_ns: u64,
    /// Maximum tasks per tick
    pub max_tasks_per_tick: usize,
    /// Enable parallel execution (native only)
    pub parallel: bool,
    /// Strange loop Lipschitz constant
    pub lipschitz_constant: f64,
    /// Temporal window size (number of retained tick timestamps)
    pub window_size: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            tick_rate_ns: 1000, // 1 microsecond
            max_tasks_per_tick: 100,
            parallel: cfg!(not(target_arch = "wasm32")),
            lipschitz_constant: 0.9,
            window_size: 100,
        }
    }
}

/// Performance metrics
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Metrics {
    /// Total ticks performed since creation
    pub total_ticks: u64,
    /// Total tasks executed since creation
    pub total_tasks: u64,
    /// Exponentially weighted moving average of tick duration
    pub avg_tick_time_ns: u64,
    /// Shortest observed tick duration
    pub min_tick_time_ns: u64,
    /// Longest observed tick duration
    pub max_tick_time_ns: u64,
    /// Estimated task throughput based on the averaged tick time
    pub tasks_per_second: f64,
}

/// The main nanosecond scheduler
pub struct Scheduler {
    config: Config,
    task_queue: Arc<Mutex<BinaryHeap<Task>>>,
    running: Arc<AtomicBool>,
    metrics: Arc<RwLock<Metrics>>,
    temporal_windows: Arc<RwLock<Vec<Timestamp>>>,
    strange_loop_state: Arc<RwLock<f64>>,
}

impl Scheduler {
    /// Create a new scheduler with the given configuration
    pub fn new(config: Config) -> Self {
        Self {
            config,
            task_queue: Arc::new(Mutex::new(BinaryHeap::new())),
            running: Arc::new(AtomicBool::new(false)),
            metrics: Arc::new(RwLock::new(Metrics::default())),
            temporal_windows: Arc::new(RwLock::new(Vec::new())),
            strange_loop_state: Arc::new(RwLock::new(0.5)),
        }
    }

    /// Schedule a task for execution
    pub fn schedule(&self, task: Task) {
        let mut queue = self.task_queue.lock();
        queue.push(task);
    }

    /// Start the scheduler (blocks in native, returns immediately in WASM)
    pub fn run(&self) {
        self.running.store(true, AtomicOrdering::SeqCst);

        cfg_if! {
            if #[cfg(target_arch = "wasm32")] {
                // In WASM, ticks should be driven externally via setInterval or
                // requestAnimationFrame; this simplified version performs a single tick.
                self.tick();
            } else {
                // Native blocking loop
                while self.running.load(AtomicOrdering::SeqCst) {
                    self.tick();

                    // Spin-wait for sub-microsecond precision; each iteration
                    // waits tick_rate_ns after the tick completes.
                    let target_duration = Duration::from_nanos(self.config.tick_rate_ns);
                    let start = Timestamp::now();
                    while start.elapsed() < target_duration {
                        core::hint::spin_loop();
                    }
                }
            }
        }
    }

    /// Perform one scheduler tick
    #[inline(always)]
    pub fn tick(&self) {
        let tick_start = Timestamp::now();
        let now = tick_start;

        // Update strange loop state via the contraction mapping
        // state = k * state + (1 - k) * 0.5, which converges to 0.5 for k < 1
        {
            let mut state = self.strange_loop_state.write();
            let k = self.config.lipschitz_constant;
            *state = k * (*state) + (1.0 - k) * 0.5;
        }

        // Update temporal windows
        {
            let mut windows = self.temporal_windows.write();
            windows.push(now);
            if windows.len() > self.config.window_size {
                windows.remove(0);
            }
        }

        // Process ready tasks
        let mut executed = 0;
        let mut tasks_to_execute = SmallVec::<[Task; 16]>::new();

        {
            let mut queue = self.task_queue.lock();
            while executed < self.config.max_tasks_per_tick {
                match queue.peek() {
                    Some(task) if task.execute_at <= now => {
                        if let Some(task) = queue.pop() {
                            tasks_to_execute.push(task);
                            executed += 1;
                        }
                    }
                    _ => break,
                }
            }
        }

        // Execute tasks (parallel if enabled and not in WASM)
        cfg_if! {
            if #[cfg(all(feature = "parallel", not(target_arch = "wasm32")))] {
                use rayon::prelude::*;
                // Honor the runtime `parallel` flag in addition to the compile-time feature
                if self.config.parallel {
                    tasks_to_execute.par_iter().for_each(|task| {
                        (task.callback)();
                    });
                } else {
                    for task in &tasks_to_execute {
                        (task.callback)();
                    }
                }
            } else {
                for task in tasks_to_execute {
                    (task.callback)();
                }
            }
        }

        // Update metrics
        let tick_duration = tick_start.elapsed().as_nanos() as u64;
        {
            let mut metrics = self.metrics.write();
            metrics.total_ticks += 1;
            metrics.total_tasks += executed as u64;

            // Update min/max
            if metrics.min_tick_time_ns == 0 || tick_duration < metrics.min_tick_time_ns {
                metrics.min_tick_time_ns = tick_duration;
            }
            if tick_duration > metrics.max_tick_time_ns {
                metrics.max_tick_time_ns = tick_duration;
            }

            // Update average
            let alpha = 0.1; // EWMA factor
            metrics.avg_tick_time_ns = ((1.0 - alpha) * metrics.avg_tick_time_ns as f64
                + alpha * tick_duration as f64) as u64;

            // Calculate throughput
            if metrics.avg_tick_time_ns > 0 {
                metrics.tasks_per_second = (executed as f64 * 1_000_000_000.0)
                    / metrics.avg_tick_time_ns as f64;
            }
        }
    }

    /// Stop the scheduler
    pub fn stop(&self) {
        self.running.store(false, AtomicOrdering::SeqCst);
    }

    /// Get current metrics
    pub fn metrics(&self) -> Metrics {
        self.metrics.read().clone()
    }

    /// Get the temporal window overlap ratio
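    ///
    /// Returns the fraction (0.0 to 1.0) of consecutive tick timestamps that lie
    /// closer together than twice the configured tick rate, or 0.0 if fewer than
    /// two windows have been recorded. Illustrative usage:
    /// ```
    /// use nanosecond_scheduler::{Scheduler, Config};
    ///
    /// let scheduler = Scheduler::new(Config::default());
    /// scheduler.tick();
    /// scheduler.tick();
    /// let overlap = scheduler.temporal_overlap();
    /// assert!((0.0..=1.0).contains(&overlap));
    /// ```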
    pub fn temporal_overlap(&self) -> f64 {
        let windows = self.temporal_windows.read();
        if windows.len() < 2 {
            return 0.0;
        }

        let mut overlaps = 0;
        for i in 1..windows.len() {
            let diff = windows[i].0.saturating_sub(windows[i-1].0);
            if diff < self.config.tick_rate_ns * 2 {
                overlaps += 1;
            }
        }

        (overlaps as f64) / (windows.len() as f64 - 1.0)
    }

    /// Get strange loop convergence state
    pub fn strange_loop_state(&self) -> f64 {
        *self.strange_loop_state.read()
    }
}

/// WASM bindings
#[cfg(feature = "wasm")]
#[wasm_bindgen]
pub struct WasmScheduler {
    inner: Scheduler,
}

#[cfg(feature = "wasm")]
#[wasm_bindgen]
impl WasmScheduler {
    #[wasm_bindgen(constructor)]
    pub fn new() -> Self {
        Self {
            inner: Scheduler::new(Config::default()),
        }
    }

    #[wasm_bindgen]
    pub fn tick(&self) {
        self.inner.tick();
    }

    #[cfg(feature = "serde")]
    #[wasm_bindgen]
    pub fn get_metrics(&self) -> JsValue {
        let metrics = self.inner.metrics();
        serde_wasm_bindgen::to_value(&metrics).unwrap()
    }
}

pub mod bench_utils;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ordering() {
        let t1 = Timestamp::now();
        let t2 = Timestamp::now();
        assert!(t2 >= t1);
    }

    #[test]
    fn test_task_scheduling() {
        let scheduler = Scheduler::new(Config::default());

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        scheduler.schedule(Task::new(
            move || {
                counter_clone.fetch_add(1, AtomicOrdering::SeqCst);
            },
            Duration::from_nanos(0)
        ));

        scheduler.tick();
        assert_eq!(counter.load(AtomicOrdering::SeqCst), 1);
    }

    #[test]
    fn test_strange_loop_convergence() {
        let scheduler = Scheduler::new(Config {
            lipschitz_constant: 0.9,
            ..Default::default()
        });

        for _ in 0..100 {
            scheduler.tick();
        }

        let state = scheduler.strange_loop_state();
        assert!((state - 0.5).abs() < 0.1);
    }
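
    // Illustrative additional test: metrics should reflect tick and task counts
    // after a single tick (assumes the default Config).
    #[test]
    fn test_metrics_accounting() {
        let scheduler = Scheduler::new(Config::default());

        scheduler.schedule(Task::new(|| {}, Duration::from_nanos(0)));
        scheduler.tick();

        let metrics = scheduler.metrics();
        assert_eq!(metrics.total_ticks, 1);
        assert_eq!(metrics.total_tasks, 1);
        assert!(metrics.max_tick_time_ns >= metrics.min_tick_time_ns);
    }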
}