Skip to main content

seq_runtime/scheduler/
registry.rs

//! Lock-free strand registry (diagnostics feature only).
//!
//! A fixed-size array of slots that tracks active strands without locks.
//! Each slot stores a strand ID (0 = free) and that strand's spawn timestamp.
//!
//! Design principles:
//! - Fixed size: slots are allocated once, when the registry is created, so the
//!   memory footprint is predictable
//! - Lock-free: all operations are atomic loads, stores, and CAS; no mutex contention
//! - Bounded: if the registry is full, strands still run but aren't tracked
//! - Read on demand: only diagnostics reads the registry; the spawn path pays for a
//!   slot scan and a timestamp, never for a lock
//!
//! Slot encoding:
//! - `strand_id == 0`: slot is free
//! - `strand_id > 0`: slot contains an active strand
//!
//! The registry size can be configured via the `SEQ_STRAND_REGISTRY_SIZE` environment
//! variable, which is read once when the registry is first accessed. The default is
//! 1024 slots, which is sufficient for most applications.
//!
//! When the "diagnostics" feature is disabled, the registry is not compiled,
//! eliminating the `SystemTime::now()` call and O(n) slot scans on every spawn.
21
22use std::sync::OnceLock;
23use std::sync::atomic::{AtomicU64, Ordering};
24
/// Slot count used when `SEQ_STRAND_REGISTRY_SIZE` is unset or not a valid `usize`;
/// this is the maximum number of concurrently tracked strands.
const DEFAULT_REGISTRY_SIZE: usize = 1_024;
27
/// One entry in the strand registry.
///
/// The entry is held in two independent atomics so it can be read and written
/// without locks. `strand_id == 0` marks the slot as free.
#[derive(Debug)]
pub struct StrandSlot {
    /// ID of the strand occupying this slot, or 0 when the slot is free.
    pub strand_id: AtomicU64,
    /// Spawn time in whole seconds since the UNIX epoch, used to spot stuck strands.
    /// Only meaningful while `strand_id != 0`.
    pub spawn_time: AtomicU64,
}
38
39impl StrandSlot {
40    const fn new() -> Self {
41        Self {
42            strand_id: AtomicU64::new(0),
43            spawn_time: AtomicU64::new(0),
44        }
45    }
46}
47
48/// Lock-free strand registry
49///
50/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
51/// This is acceptable because:
52/// 1. N is bounded (default 1024)
53/// 2. Registration/unregistration are infrequent compared to strand work
54/// 3. No locks means no contention, just atomic ops
55pub struct StrandRegistry {
56    slots: Box<[StrandSlot]>,
57    /// Number of slots that couldn't be registered (registry full)
58    pub overflow_count: AtomicU64,
59}
60
61impl StrandRegistry {
62    /// Create a new registry with the given capacity
63    pub(super) fn new(capacity: usize) -> Self {
64        let mut slots = Vec::with_capacity(capacity);
65        for _ in 0..capacity {
66            slots.push(StrandSlot::new());
67        }
68        Self {
69            slots: slots.into_boxed_slice(),
70            overflow_count: AtomicU64::new(0),
71        }
72    }
73
74    /// Register a strand, returning the slot index if successful
75    ///
76    /// Uses CAS to atomically claim a free slot.
77    /// Returns None if the registry is full (strand still runs, just not tracked).
78    pub fn register(&self, strand_id: u64) -> Option<usize> {
79        let spawn_time = std::time::SystemTime::now()
80            .duration_since(std::time::UNIX_EPOCH)
81            .map(|d| d.as_secs())
82            .unwrap_or(0);
83
84        // Scan for a free slot
85        for (idx, slot) in self.slots.iter().enumerate() {
86            // Set spawn time first, before claiming the slot
87            // This prevents a race where a reader sees strand_id != 0 but spawn_time == 0
88            // If we fail to claim the slot, the owner will overwrite this value anyway
89            slot.spawn_time.store(spawn_time, Ordering::Relaxed);
90
91            // Try to claim this slot (CAS from 0 to strand_id)
92            // AcqRel ensures the spawn_time write above is visible before strand_id becomes non-zero
93            if slot
94                .strand_id
95                .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
96                .is_ok()
97            {
98                return Some(idx);
99            }
100        }
101
102        // Registry full - track overflow but strand still runs
103        self.overflow_count.fetch_add(1, Ordering::Relaxed);
104        None
105    }
106
107    /// Unregister a strand by ID
108    ///
109    /// Scans for the slot containing this strand ID and clears it.
110    /// Returns true if found and cleared, false if not found.
111    ///
112    /// Note: ABA problem is not a concern here because strand IDs are monotonically
113    /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
114    /// impossible (at 1 billion spawns/sec, it would take ~584 years).
115    pub fn unregister(&self, strand_id: u64) -> bool {
116        for slot in self.slots.iter() {
117            // Check if this slot contains our strand
118            if slot
119                .strand_id
120                .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
121                .is_ok()
122            {
123                // Successfully cleared the slot
124                slot.spawn_time.store(0, Ordering::Release);
125                return true;
126            }
127        }
128        false
129    }
130
131    /// Iterate over active strands (for diagnostics)
132    ///
133    /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
134    /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
135    pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
136        self.slots.iter().filter_map(|slot| {
137            // Acquire on strand_id synchronizes with the Release in register()
138            let id = slot.strand_id.load(Ordering::Acquire);
139            if id > 0 {
140                // Relaxed is sufficient here - we've already synchronized via strand_id Acquire
141                // and spawn_time is written before strand_id in register()
142                let time = slot.spawn_time.load(Ordering::Relaxed);
143                Some((id, time))
144            } else {
145                None
146            }
147        })
148    }
149
150    /// Get the registry capacity
151    pub fn capacity(&self) -> usize {
152        self.slots.len()
153    }
154}
155
156// Global strand registry (lazy initialized)
157static STRAND_REGISTRY: OnceLock<StrandRegistry> = OnceLock::new();
158
159/// Get or initialize the global strand registry
160pub fn strand_registry() -> &'static StrandRegistry {
161    STRAND_REGISTRY.get_or_init(|| {
162        let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
163            .ok()
164            .and_then(|s| s.parse().ok())
165            .unwrap_or(DEFAULT_REGISTRY_SIZE);
166        StrandRegistry::new(size)
167    })
168}