seq_runtime/scheduler/registry.rs
1//! Lock-free strand registry (diagnostics feature only).
2//!
3//! A fixed-size array of slots for tracking active strands without locks.
4//! Each slot stores a strand ID (0 = free) and spawn timestamp.
5//!
6//! Design principles:
7//! - Fixed size: No dynamic allocation, predictable memory footprint
8//! - Lock-free: All operations use atomic CAS, no mutex contention
9//! - Bounded: If registry is full, strands still run but aren't tracked
10//! - Zero cost when not querying: Only diagnostics reads the registry
11//!
12//! Slot encoding:
13//! - `strand_id == 0`: slot is free
14//! - `strand_id > 0`: slot contains an active strand
15//!
16//! The registry size can be configured via `SEQ_STRAND_REGISTRY_SIZE` env var.
17//! Default is 1024 slots, which is sufficient for most applications.
18//!
19//! When the "diagnostics" feature is disabled, the registry is not compiled,
20//! eliminating the `SystemTime::now()` syscall and O(n) scans on every spawn.
21
22use std::sync::OnceLock;
23use std::sync::atomic::{AtomicU64, Ordering};
24
/// Default strand registry size (number of trackable concurrent strands).
///
/// Used by `strand_registry()` when the `SEQ_STRAND_REGISTRY_SIZE` environment
/// variable is unset or cannot be parsed as a `usize`.
const DEFAULT_REGISTRY_SIZE: usize = 1024;
27
/// One entry of the strand registry.
///
/// The slot is described by two independent atomics so it can be read and
/// updated without taking a lock. A `strand_id` of zero marks the slot as free.
pub struct StrandSlot {
    /// ID of the strand occupying this slot (0 = free, >0 = active strand).
    pub strand_id: AtomicU64,
    /// When the occupying strand was spawned, in whole seconds since the UNIX
    /// epoch. Diagnostics use it to spot strands that look stuck.
    pub spawn_time: AtomicU64,
}

impl StrandSlot {
    /// Builds a free slot: both the strand ID and the spawn time start at zero.
    ///
    /// `const` so it can also be evaluated at compile time.
    const fn new() -> Self {
        let strand_id = AtomicU64::new(0);
        let spawn_time = AtomicU64::new(0);
        Self {
            strand_id,
            spawn_time,
        }
    }
}
47
/// Lock-free strand registry
///
/// Provides O(n) registration (scan for free slot) and O(n) unregistration.
/// This is acceptable because:
/// 1. N is bounded (default 1024)
/// 2. Registration/unregistration are infrequent compared to strand work
/// 3. No locks means no contention, just atomic ops
pub struct StrandRegistry {
    /// Fixed-size slot table; its length is the registry capacity and never changes
    /// after construction.
    slots: Box<[StrandSlot]>,
    /// Number of strands that could not be registered because every slot was taken.
    /// Incremented by `register()` each time it finds no free slot.
    pub overflow_count: AtomicU64,
}
60
61impl StrandRegistry {
62 /// Create a new registry with the given capacity
63 pub(super) fn new(capacity: usize) -> Self {
64 let mut slots = Vec::with_capacity(capacity);
65 for _ in 0..capacity {
66 slots.push(StrandSlot::new());
67 }
68 Self {
69 slots: slots.into_boxed_slice(),
70 overflow_count: AtomicU64::new(0),
71 }
72 }
73
74 /// Register a strand, returning the slot index if successful
75 ///
76 /// Uses CAS to atomically claim a free slot.
77 /// Returns None if the registry is full (strand still runs, just not tracked).
78 pub fn register(&self, strand_id: u64) -> Option<usize> {
79 let spawn_time = std::time::SystemTime::now()
80 .duration_since(std::time::UNIX_EPOCH)
81 .map(|d| d.as_secs())
82 .unwrap_or(0);
83
84 // Scan for a free slot
85 for (idx, slot) in self.slots.iter().enumerate() {
86 // Set spawn time first, before claiming the slot
87 // This prevents a race where a reader sees strand_id != 0 but spawn_time == 0
88 // If we fail to claim the slot, the owner will overwrite this value anyway
89 slot.spawn_time.store(spawn_time, Ordering::Relaxed);
90
91 // Try to claim this slot (CAS from 0 to strand_id)
92 // AcqRel ensures the spawn_time write above is visible before strand_id becomes non-zero
93 if slot
94 .strand_id
95 .compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
96 .is_ok()
97 {
98 return Some(idx);
99 }
100 }
101
102 // Registry full - track overflow but strand still runs
103 self.overflow_count.fetch_add(1, Ordering::Relaxed);
104 None
105 }
106
107 /// Unregister a strand by ID
108 ///
109 /// Scans for the slot containing this strand ID and clears it.
110 /// Returns true if found and cleared, false if not found.
111 ///
112 /// Note: ABA problem is not a concern here because strand IDs are monotonically
113 /// increasing u64 values. ID reuse would require 2^64 spawns, which is practically
114 /// impossible (at 1 billion spawns/sec, it would take ~584 years).
115 pub fn unregister(&self, strand_id: u64) -> bool {
116 for slot in self.slots.iter() {
117 // Check if this slot contains our strand
118 if slot
119 .strand_id
120 .compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
121 .is_ok()
122 {
123 // Successfully cleared the slot
124 slot.spawn_time.store(0, Ordering::Release);
125 return true;
126 }
127 }
128 false
129 }
130
131 /// Iterate over active strands (for diagnostics)
132 ///
133 /// Returns an iterator of (strand_id, spawn_time) for non-empty slots.
134 /// Note: This is a snapshot and may be slightly inconsistent due to concurrent updates.
135 pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
136 self.slots.iter().filter_map(|slot| {
137 // Acquire on strand_id synchronizes with the Release in register()
138 let id = slot.strand_id.load(Ordering::Acquire);
139 if id > 0 {
140 // Relaxed is sufficient here - we've already synchronized via strand_id Acquire
141 // and spawn_time is written before strand_id in register()
142 let time = slot.spawn_time.load(Ordering::Relaxed);
143 Some((id, time))
144 } else {
145 None
146 }
147 })
148 }
149
150 /// Get the registry capacity
151 pub fn capacity(&self) -> usize {
152 self.slots.len()
153 }
154}
155
// Global strand registry (lazily initialized).
// Starts empty; `strand_registry()` fills it on first use via `get_or_init`, after
// which every caller receives the same `&'static StrandRegistry`.
static STRAND_REGISTRY: OnceLock<StrandRegistry> = OnceLock::new();
158
159/// Get or initialize the global strand registry
160pub fn strand_registry() -> &'static StrandRegistry {
161 STRAND_REGISTRY.get_or_init(|| {
162 let size = std::env::var("SEQ_STRAND_REGISTRY_SIZE")
163 .ok()
164 .and_then(|s| s.parse().ok())
165 .unwrap_or(DEFAULT_REGISTRY_SIZE);
166 StrandRegistry::new(size)
167 })
168}