// murk_engine/epoch.rs
1//! Epoch-based reclamation primitives for RealtimeAsync mode.
2//!
3//! Provides [`EpochCounter`] (global monotonic epoch) and [`WorkerEpoch`]
4//! (per-worker pin/unpin state with cache-line padding). These are the
5//! building blocks for the epoch reclamation protocol described in
6//! the [Architecture documentation](https://tachyon-beep.github.io/murk/architecture.html).
7
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::OnceLock;
10use std::time::Instant;
11
/// Sentinel value meaning "this worker is not pinned to any epoch."
///
/// Chosen as `u64::MAX` so unpinned workers can never lower the result of
/// [`min_pinned_epoch`]: the `min` fold over pinned values ignores them.
pub const EPOCH_UNPINNED: u64 = u64::MAX;
14
/// Global epoch counter, incremented by TickEngine at each snapshot publication.
///
/// Monotonically increasing. Never wraps in practice (u64 overflow at 60 Hz
/// would take ~9.7 billion years).
pub struct EpochCounter {
    /// Current epoch value; starts at 0 and only moves forward via `advance()`.
    current: AtomicU64,
}
22
23impl Default for EpochCounter {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29// Compile-time assertion: EpochCounter must be Send + Sync.
30const _: fn() = || {
31    fn assert<T: Send + Sync>() {}
32    assert::<EpochCounter>();
33};
34
35impl EpochCounter {
36    /// Create a new epoch counter starting at 0.
37    pub fn new() -> Self {
38        Self {
39            current: AtomicU64::new(0),
40        }
41    }
42
43    /// Advance the epoch. Called by TickEngine after publishing a snapshot.
44    /// Returns the new epoch value.
45    pub fn advance(&self) -> u64 {
46        self.current.fetch_add(1, Ordering::Release) + 1
47    }
48
49    /// Read the current epoch value.
50    pub fn current(&self) -> u64 {
51        self.current.load(Ordering::Acquire)
52    }
53}
54
/// Per-worker epoch state, padded to avoid false sharing.
///
/// Each egress worker holds one of these. The TickEngine reads all `pinned`
/// fields during reclamation checks; without padding, adjacent workers'
/// writes would invalidate each other's cache lines.
///
/// 128-byte alignment covers both 64-byte (x86) and 128-byte (Apple M-series)
/// cache line sizes.
#[repr(align(128))]
pub struct WorkerEpoch {
    /// The epoch this worker is currently pinned to.
    /// `EPOCH_UNPINNED` means not holding any generation.
    pinned: AtomicU64,

    /// Monotonic timestamp (nanos) when `pin()` was called.
    /// Used for stalled-worker detection: the tick thread computes
    /// `now - pin_start_ns` to get the actual pin hold duration.
    /// Written *before* `pinned` in `pin()`, so readers pairing the two
    /// via `pin_snapshot()` never see a fresh epoch with a stale start time.
    pin_start_ns: AtomicU64,

    /// Monotonic timestamp (nanos) of the last unpin.
    /// Seeded to creation time in `new()` so the first pin is never
    /// misclassified as a stall.
    last_quiesce_ns: AtomicU64,

    /// Cooperative cancellation flag. Set via `request_cancel()`, polled
    /// with `is_cancelled()`, reset with `clear_cancel()`.
    cancel: AtomicBool,

    /// Worker index (for diagnostics in stalled-worker reporting).
    #[allow(dead_code)]
    worker_id: u32,
}
84
85// Compile-time assertion: WorkerEpoch must be Send + Sync.
86const _: fn() = || {
87    fn assert<T: Send + Sync>() {}
88    assert::<WorkerEpoch>();
89};
90
91impl WorkerEpoch {
92    /// Create a new worker epoch in the unpinned state.
93    ///
94    /// `last_quiesce_ns` is seeded to the current monotonic time so
95    /// that the first pin is never misclassified as a stall due to
96    /// elapsed process uptime.
97    pub fn new(worker_id: u32) -> Self {
98        let now = monotonic_nanos();
99        Self {
100            pinned: AtomicU64::new(EPOCH_UNPINNED),
101            pin_start_ns: AtomicU64::new(now),
102            last_quiesce_ns: AtomicU64::new(now),
103            cancel: AtomicBool::new(false),
104            worker_id,
105        }
106    }
107
108    /// Pin this worker to the given epoch before accessing a snapshot.
109    /// Records the pin-start timestamp for stall detection.
110    pub fn pin(&self, epoch: u64) {
111        self.pin_start_ns
112            .store(monotonic_nanos(), Ordering::Release);
113        self.pinned.store(epoch, Ordering::Release);
114    }
115
116    /// Unpin this worker after finishing with the snapshot.
117    /// Updates the quiescence timestamp.
118    pub fn unpin(&self) {
119        self.pinned.store(EPOCH_UNPINNED, Ordering::Release);
120        let now_ns = monotonic_nanos();
121        self.last_quiesce_ns.store(now_ns, Ordering::Release);
122    }
123
124    /// Whether this worker is currently pinned to an epoch.
125    pub fn is_pinned(&self) -> bool {
126        self.pinned.load(Ordering::Acquire) != EPOCH_UNPINNED
127    }
128
129    /// The epoch this worker is pinned to, or `EPOCH_UNPINNED`.
130    pub fn pinned_epoch(&self) -> u64 {
131        self.pinned.load(Ordering::Acquire)
132    }
133
134    /// Monotonic nanoseconds when `pin()` was last called.
135    /// Used by the tick thread to measure actual pin hold duration.
136    pub fn pin_start_ns(&self) -> u64 {
137        self.pin_start_ns.load(Ordering::Acquire)
138    }
139
140    /// Monotonic nanoseconds of the last unpin event.
141    pub fn last_quiesce_ns(&self) -> u64 {
142        self.last_quiesce_ns.load(Ordering::Acquire)
143    }
144
145    /// Check if cancellation has been requested.
146    pub fn is_cancelled(&self) -> bool {
147        self.cancel.load(Ordering::Acquire)
148    }
149
150    /// Request cancellation (called by TickEngine or shutdown).
151    pub fn request_cancel(&self) {
152        self.cancel.store(true, Ordering::Release);
153    }
154
155    /// Clear cancellation flag (called when worker is restarted/recycled).
156    pub fn clear_cancel(&self) {
157        self.cancel.store(false, Ordering::Release);
158    }
159
160    /// Read a consistent (pinned_epoch, pin_start_ns) snapshot.
161    ///
162    /// Uses a seqlock-like double-check: read pinned, read pin_start_ns,
163    /// re-read pinned. If the pinned value changed between the two reads
164    /// (concurrent unpin/repin), retry. Returns `None` if the worker is
165    /// not pinned, or `Some((epoch, pin_start_ns))` for a consistent pair.
166    pub fn pin_snapshot(&self) -> Option<(u64, u64)> {
167        for _ in 0..4 {
168            let epoch1 = self.pinned.load(Ordering::Acquire);
169            if epoch1 == EPOCH_UNPINNED {
170                return None;
171            }
172            let start_ns = self.pin_start_ns.load(Ordering::Acquire);
173            let epoch2 = self.pinned.load(Ordering::Acquire);
174            if epoch1 == epoch2 {
175                return Some((epoch1, start_ns));
176            }
177            // Pinned value changed mid-read; retry.
178        }
179        // After retries, conservatively treat as unpinned (don't false-positive).
180        None
181    }
182}
183
184/// Compute the minimum pinned epoch across all workers.
185///
186/// Returns [`EPOCH_UNPINNED`] if no worker is pinned (all reclaimable).
187pub fn min_pinned_epoch(workers: &[WorkerEpoch]) -> u64 {
188    workers
189        .iter()
190        .map(|w| w.pinned_epoch())
191        .min()
192        .unwrap_or(EPOCH_UNPINNED)
193}
194
/// Returns monotonic nanoseconds since an arbitrary process-local epoch.
///
/// Lazily captures an `Instant` baseline in a `OnceLock` on first call
/// and reports time elapsed relative to it. NOT wall-clock time — only
/// for relative duration comparisons (stall detection).
///
/// This is the single source of truth for monotonic timestamps in the
/// engine. All callers (epoch, tick_thread, egress) must use this
/// function to avoid clock-skew between independent `OnceLock` statics.
pub(crate) fn monotonic_nanos() -> u64 {
    static BASELINE: OnceLock<Instant> = OnceLock::new();
    let origin = *BASELINE.get_or_init(Instant::now);
    origin.elapsed().as_nanos() as u64
}
208
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_epoch_advance() {
        let counter = EpochCounter::new();

        // Fresh counter starts at zero and counts up by one per advance.
        assert_eq!(counter.current(), 0);
        for expected in 1..=3u64 {
            assert_eq!(counter.advance(), expected);
        }
        assert_eq!(counter.current(), 3);

        // Monotonicity: each advance returns a strictly larger value.
        let mut last_seen = counter.current();
        for _ in 0..100 {
            let advanced = counter.advance();
            assert!(advanced > last_seen);
            last_seen = advanced;
        }
    }

    #[test]
    fn test_worker_pin_unpin() {
        let worker = WorkerEpoch::new(0);

        // Starts unpinned, with a seeded quiesce timestamp.
        assert!(!worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), EPOCH_UNPINNED);
        let seeded_quiesce = worker.last_quiesce_ns();
        assert!(
            seeded_quiesce > 0,
            "quiesce time should be seeded at creation"
        );

        // Pinning makes the epoch observable.
        worker.pin(5);
        assert!(worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), 5);

        // Unpinning restores the sentinel and bumps the quiesce timestamp.
        worker.unpin();
        assert!(!worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), EPOCH_UNPINNED);
        assert!(worker.last_quiesce_ns() >= seeded_quiesce);
    }

    #[test]
    fn test_min_pinned_no_workers() {
        let empty_pool: Vec<WorkerEpoch> = Vec::new();
        assert_eq!(min_pinned_epoch(&empty_pool), EPOCH_UNPINNED);
    }

    #[test]
    fn test_min_pinned_mixed() {
        let pool: Vec<WorkerEpoch> = (0..4).map(WorkerEpoch::new).collect();

        // Workers 0 and 2 are pinned, 1 and 3 are unpinned.
        pool[0].pin(10);
        pool[2].pin(5);
        assert_eq!(min_pinned_epoch(&pool), 5);

        // Releasing the lowest pin (worker 2) raises the minimum to 10.
        pool[2].unpin();
        assert_eq!(min_pinned_epoch(&pool), 10);

        // With every worker unpinned the result is the sentinel.
        pool[0].unpin();
        assert_eq!(min_pinned_epoch(&pool), EPOCH_UNPINNED);
    }

    #[test]
    fn test_cancel_flag() {
        let worker = WorkerEpoch::new(0);

        // Flag starts clear, is set by request_cancel, and can be cleared.
        assert!(!worker.is_cancelled());
        worker.request_cancel();
        assert!(worker.is_cancelled());
        worker.clear_cancel();
        assert!(!worker.is_cancelled());
    }

    #[test]
    fn test_worker_epoch_alignment() {
        let alignment = std::mem::align_of::<WorkerEpoch>();
        assert!(
            alignment >= 128,
            "WorkerEpoch must be cache-line aligned (>= 128 bytes)"
        );
    }

    #[test]
    fn test_pin_start_ns_records_pin_time() {
        let worker = WorkerEpoch::new(0);

        // pin_start_ns is seeded at construction (not zero).
        let initial = worker.pin_start_ns();
        assert!(initial > 0, "pin_start_ns should be seeded at construction");

        // Sleep briefly then pin — pin_start_ns should advance.
        std::thread::sleep(std::time::Duration::from_millis(5));
        worker.pin(42);
        let after_pin = worker.pin_start_ns();
        assert!(
            after_pin > initial,
            "pin_start_ns should advance on pin(): initial={initial}, after={after_pin}"
        );

        // Unpin must leave pin_start_ns untouched (only last_quiesce_ns moves).
        worker.unpin();
        let after_unpin = worker.pin_start_ns();
        assert_eq!(
            after_pin, after_unpin,
            "pin_start_ns should not change on unpin()"
        );
    }
}