// murk_engine/epoch.rs
//! Epoch-based reclamation primitives for RealtimeAsync mode.
//!
//! Provides [`EpochCounter`] (global monotonic epoch) and [`WorkerEpoch`]
//! (per-worker pin/unpin state with cache-line padding). These are the
//! building blocks for the epoch reclamation protocol described in
//! the [Architecture documentation](https://tachyon-beep.github.io/murk/architecture.html).

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::Instant;

/// Sentinel epoch value: a worker storing this is not pinned to any epoch.
pub const EPOCH_UNPINNED: u64 = u64::MAX;
/// Global epoch counter, incremented by TickEngine at each snapshot publication.
///
/// Monotonically increasing. Never wraps in practice (u64 overflow at 60 Hz
/// would take ~9.7 billion years).
#[derive(Debug)]
pub struct EpochCounter {
    /// Current epoch value; starts at 0.
    current: AtomicU64,
}

impl Default for EpochCounter {
    fn default() -> Self {
        Self::new()
    }
}

// Compile-time assertion: EpochCounter must be Send + Sync.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<EpochCounter>();
};

impl EpochCounter {
    /// Create a new epoch counter starting at 0.
    pub fn new() -> Self {
        Self {
            current: AtomicU64::new(0),
        }
    }

    /// Advance the epoch. Called by TickEngine after publishing a snapshot.
    /// Returns the new epoch value.
    ///
    /// The `Release` RMW pairs with the `Acquire` load in [`Self::current`],
    /// so writes made before an advance are visible to threads that observe
    /// the new epoch value.
    pub fn advance(&self) -> u64 {
        self.current.fetch_add(1, Ordering::Release) + 1
    }

    /// Read the current epoch value.
    pub fn current(&self) -> u64 {
        self.current.load(Ordering::Acquire)
    }
}
/// Per-worker epoch state, padded to avoid false sharing.
///
/// Each egress worker holds one of these. The TickEngine reads all `pinned`
/// fields during reclamation checks; without padding, adjacent workers'
/// writes would invalidate each other's cache lines.
///
/// 128-byte alignment covers both 64-byte (x86) and 128-byte (Apple M-series)
/// cache line sizes.
#[derive(Debug)]
#[repr(align(128))]
pub struct WorkerEpoch {
    /// The epoch this worker is currently pinned to.
    /// `EPOCH_UNPINNED` means not holding any generation.
    pinned: AtomicU64,

    /// Monotonic timestamp (nanos) when `pin()` was called.
    /// Used for stalled-worker detection: the tick thread computes
    /// `now - pin_start_ns` to get the actual pin hold duration.
    pin_start_ns: AtomicU64,

    /// Monotonic timestamp (nanos) of the last unpin.
    last_quiesce_ns: AtomicU64,

    /// Cooperative cancellation flag.
    cancel: AtomicBool,

    /// Worker index (for diagnostics in stalled-worker reporting).
    #[allow(dead_code)]
    worker_id: u32,
}

// Compile-time assertion: WorkerEpoch must be Send + Sync.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<WorkerEpoch>();
};
91impl WorkerEpoch {
92    /// Create a new worker epoch in the unpinned state.
93    ///
94    /// `last_quiesce_ns` is seeded to the current monotonic time so
95    /// that the first pin is never misclassified as a stall due to
96    /// elapsed process uptime.
97    pub fn new(worker_id: u32) -> Self {
98        let now = monotonic_nanos();
99        Self {
100            pinned: AtomicU64::new(EPOCH_UNPINNED),
101            pin_start_ns: AtomicU64::new(now),
102            last_quiesce_ns: AtomicU64::new(now),
103            cancel: AtomicBool::new(false),
104            worker_id,
105        }
106    }
107
108    /// Pin this worker to the given epoch before accessing a snapshot.
109    /// Records the pin-start timestamp for stall detection.
110    pub fn pin(&self, epoch: u64) {
111        self.pin_start_ns
112            .store(monotonic_nanos(), Ordering::Release);
113        self.pinned.store(epoch, Ordering::Release);
114    }
115
116    /// Unpin this worker after finishing with the snapshot.
117    /// Updates the quiescence timestamp.
118    pub fn unpin(&self) {
119        self.pinned.store(EPOCH_UNPINNED, Ordering::Release);
120        let now_ns = monotonic_nanos();
121        self.last_quiesce_ns.store(now_ns, Ordering::Release);
122    }
123
124    /// Whether this worker is currently pinned to an epoch.
125    pub fn is_pinned(&self) -> bool {
126        self.pinned.load(Ordering::Acquire) != EPOCH_UNPINNED
127    }
128
129    /// The epoch this worker is pinned to, or `EPOCH_UNPINNED`.
130    pub fn pinned_epoch(&self) -> u64 {
131        self.pinned.load(Ordering::Acquire)
132    }
133
134    /// Monotonic nanoseconds when `pin()` was last called.
135    /// Used by the tick thread to measure actual pin hold duration.
136    pub fn pin_start_ns(&self) -> u64 {
137        self.pin_start_ns.load(Ordering::Acquire)
138    }
139
140    /// Monotonic nanoseconds of the last unpin event.
141    pub fn last_quiesce_ns(&self) -> u64 {
142        self.last_quiesce_ns.load(Ordering::Acquire)
143    }
144
145    /// Check if cancellation has been requested.
146    pub fn is_cancelled(&self) -> bool {
147        self.cancel.load(Ordering::Acquire)
148    }
149
150    /// Request cancellation (called by TickEngine or shutdown).
151    pub fn request_cancel(&self) {
152        self.cancel.store(true, Ordering::Release);
153    }
154
155    /// Clear cancellation flag (called when worker is restarted/recycled).
156    pub fn clear_cancel(&self) {
157        self.cancel.store(false, Ordering::Release);
158    }
159}
161/// Compute the minimum pinned epoch across all workers.
162///
163/// Returns [`EPOCH_UNPINNED`] if no worker is pinned (all reclaimable).
164pub fn min_pinned_epoch(workers: &[WorkerEpoch]) -> u64 {
165    workers
166        .iter()
167        .map(|w| w.pinned_epoch())
168        .min()
169        .unwrap_or(EPOCH_UNPINNED)
170}
/// Returns monotonic nanoseconds since an arbitrary process-local epoch.
///
/// Lazily captures a baseline `Instant` in a `OnceLock` on first use and
/// reports time elapsed since it. NOT wall-clock time — only meaningful
/// for relative duration comparisons (stall detection).
///
/// This is the single source of truth for monotonic timestamps in the
/// engine. All callers (epoch, tick_thread, egress) must use this
/// function to avoid clock-skew between independent `OnceLock` statics.
pub(crate) fn monotonic_nanos() -> u64 {
    static BASELINE: OnceLock<Instant> = OnceLock::new();
    // `elapsed()` is `Instant::now().duration_since(baseline)`.
    BASELINE.get_or_init(Instant::now).elapsed().as_nanos() as u64
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_epoch_advance() {
        let counter = EpochCounter::new();
        assert_eq!(counter.current(), 0);

        // advance() returns the post-increment value.
        for expected in 1..=3u64 {
            assert_eq!(counter.advance(), expected);
        }
        assert_eq!(counter.current(), 3);

        // Monotonicity: each advance returns a strictly larger value.
        let mut prev = counter.current();
        for _ in 0..100 {
            let next = counter.advance();
            assert!(next > prev);
            prev = next;
        }
    }

    #[test]
    fn test_worker_pin_unpin() {
        let w = WorkerEpoch::new(0);

        // Freshly created: unpinned, quiesce timestamp already seeded.
        assert!(!w.is_pinned());
        assert_eq!(w.pinned_epoch(), EPOCH_UNPINNED);
        let seeded_quiesce = w.last_quiesce_ns();
        assert!(seeded_quiesce > 0, "quiesce time should be seeded at creation");

        // Pin to epoch 5 and observe it.
        w.pin(5);
        assert!(w.is_pinned());
        assert_eq!(w.pinned_epoch(), 5);

        // Unpin — back to the sentinel, quiesce timestamp advances.
        w.unpin();
        assert!(!w.is_pinned());
        assert_eq!(w.pinned_epoch(), EPOCH_UNPINNED);
        assert!(w.last_quiesce_ns() >= seeded_quiesce);
    }

    #[test]
    fn test_min_pinned_no_workers() {
        assert_eq!(min_pinned_epoch(&[]), EPOCH_UNPINNED);
    }

    #[test]
    fn test_min_pinned_mixed() {
        let workers: Vec<WorkerEpoch> = (0..4).map(WorkerEpoch::new).collect();

        // Pin workers 0 and 2; 1 and 3 stay unpinned.
        workers[0].pin(10);
        workers[2].pin(5);
        assert_eq!(min_pinned_epoch(&workers), 5);

        // Releasing the lowest-pinned worker raises the minimum.
        workers[2].unpin();
        assert_eq!(min_pinned_epoch(&workers), 10);

        // With everyone unpinned the sentinel comes back.
        workers[0].unpin();
        assert_eq!(min_pinned_epoch(&workers), EPOCH_UNPINNED);
    }

    #[test]
    fn test_cancel_flag() {
        let w = WorkerEpoch::new(0);
        assert!(!w.is_cancelled());

        w.request_cancel();
        assert!(w.is_cancelled());

        w.clear_cancel();
        assert!(!w.is_cancelled());
    }

    #[test]
    fn test_worker_epoch_alignment() {
        assert!(
            std::mem::align_of::<WorkerEpoch>() >= 128,
            "WorkerEpoch must be cache-line aligned (>= 128 bytes)"
        );
    }

    #[test]
    fn test_pin_start_ns_records_pin_time() {
        let w = WorkerEpoch::new(0);

        // pin_start_ns is seeded at construction (not zero).
        let initial = w.pin_start_ns();
        assert!(initial > 0, "pin_start_ns should be seeded at construction");

        // Sleep briefly then pin — pin_start_ns should advance.
        std::thread::sleep(std::time::Duration::from_millis(5));
        w.pin(42);
        let after_pin = w.pin_start_ns();
        assert!(
            after_pin > initial,
            "pin_start_ns should advance on pin(): initial={initial}, after={after_pin}"
        );

        // Unpin must leave pin_start_ns untouched (only last_quiesce_ns moves).
        w.unpin();
        let after_unpin = w.pin_start_ns();
        assert_eq!(
            after_pin, after_unpin,
            "pin_start_ns should not change on unpin()"
        );
    }
}