1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::OnceLock;
10use std::time::Instant;
11
/// Sentinel stored in a worker's pinned-epoch slot meaning "not currently
/// pinned". Chosen as `u64::MAX` so unpinned workers never win a `min`.
pub const EPOCH_UNPINNED: u64 = u64::MAX;
14
/// Global, monotonically increasing epoch counter shared by all workers.
pub struct EpochCounter {
    // Current epoch value; starts at 0 and only ever moves forward.
    current: AtomicU64,
}

impl Default for EpochCounter {
    fn default() -> Self {
        Self::new()
    }
}

// Compile-time proof that `EpochCounter` may be shared across threads.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<EpochCounter>();
};

impl EpochCounter {
    /// Creates a counter starting at epoch 0.
    pub fn new() -> Self {
        EpochCounter {
            current: AtomicU64::new(0),
        }
    }

    /// Bumps the global epoch and returns the new (post-increment) value.
    ///
    /// Release ordering pairs with the Acquire load in [`current`] so that
    /// writes made before an advance are visible to readers of the new epoch.
    /// (NOTE: after 2^64 - 1 advances the value would reach the
    /// `EPOCH_UNPINNED` sentinel — not reachable in practice.)
    pub fn advance(&self) -> u64 {
        let previous = self.current.fetch_add(1, Ordering::Release);
        previous + 1
    }

    /// Reads the current global epoch.
    pub fn current(&self) -> u64 {
        self.current.load(Ordering::Acquire)
    }
}
54
/// Per-worker epoch state. 128-byte alignment keeps each worker's atomics on
/// their own cache line(s) — the alignment test below pins this invariant —
/// presumably to avoid false sharing between workers.
#[repr(align(128))]
pub struct WorkerEpoch {
    /// Epoch this worker is currently pinned at, or `EPOCH_UNPINNED`.
    pinned: AtomicU64,

    /// Monotonic timestamp (ns, via `monotonic_nanos`) of the most recent
    /// `pin()`; seeded with the construction time.
    pin_start_ns: AtomicU64,

    /// Monotonic timestamp (ns) of the most recent `unpin()`; seeded with
    /// the construction time.
    last_quiesce_ns: AtomicU64,

    /// Cooperative cancellation flag (set/cleared by external requesters).
    cancel: AtomicBool,

    /// Owning worker's id; currently unread, kept for debugging.
    #[allow(dead_code)]
    worker_id: u32,
}
84
// Compile-time proof that `WorkerEpoch` may be shared across threads.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<WorkerEpoch>();
};
90
91impl WorkerEpoch {
92 pub fn new(worker_id: u32) -> Self {
98 let now = monotonic_nanos();
99 Self {
100 pinned: AtomicU64::new(EPOCH_UNPINNED),
101 pin_start_ns: AtomicU64::new(now),
102 last_quiesce_ns: AtomicU64::new(now),
103 cancel: AtomicBool::new(false),
104 worker_id,
105 }
106 }
107
108 pub fn pin(&self, epoch: u64) {
111 self.pin_start_ns
112 .store(monotonic_nanos(), Ordering::Release);
113 self.pinned.store(epoch, Ordering::Release);
114 }
115
116 pub fn unpin(&self) {
119 self.pinned.store(EPOCH_UNPINNED, Ordering::Release);
120 let now_ns = monotonic_nanos();
121 self.last_quiesce_ns.store(now_ns, Ordering::Release);
122 }
123
124 pub fn is_pinned(&self) -> bool {
126 self.pinned.load(Ordering::Acquire) != EPOCH_UNPINNED
127 }
128
129 pub fn pinned_epoch(&self) -> u64 {
131 self.pinned.load(Ordering::Acquire)
132 }
133
134 pub fn pin_start_ns(&self) -> u64 {
137 self.pin_start_ns.load(Ordering::Acquire)
138 }
139
140 pub fn last_quiesce_ns(&self) -> u64 {
142 self.last_quiesce_ns.load(Ordering::Acquire)
143 }
144
145 pub fn is_cancelled(&self) -> bool {
147 self.cancel.load(Ordering::Acquire)
148 }
149
150 pub fn request_cancel(&self) {
152 self.cancel.store(true, Ordering::Release);
153 }
154
155 pub fn clear_cancel(&self) {
157 self.cancel.store(false, Ordering::Release);
158 }
159
160 pub fn pin_snapshot(&self) -> Option<(u64, u64)> {
167 for _ in 0..4 {
168 let epoch1 = self.pinned.load(Ordering::Acquire);
169 if epoch1 == EPOCH_UNPINNED {
170 return None;
171 }
172 let start_ns = self.pin_start_ns.load(Ordering::Acquire);
173 let epoch2 = self.pinned.load(Ordering::Acquire);
174 if epoch1 == epoch2 {
175 return Some((epoch1, start_ns));
176 }
177 }
179 None
181 }
182}
183
184pub fn min_pinned_epoch(workers: &[WorkerEpoch]) -> u64 {
188 workers
189 .iter()
190 .map(|w| w.pinned_epoch())
191 .min()
192 .unwrap_or(EPOCH_UNPINNED)
193}
194
/// Nanoseconds elapsed since the first call to this function.
///
/// Backed by `Instant`, so the value is monotonic and immune to wall-clock
/// adjustments. The process-local zero point is fixed lazily on first use
/// via `OnceLock`, making every later reading strictly relative to it.
pub(crate) fn monotonic_nanos() -> u64 {
    static BASE: OnceLock<Instant> = OnceLock::new();
    BASE.get_or_init(Instant::now).elapsed().as_nanos() as u64
}
208
#[cfg(test)]
mod tests {
    use super::*;

    // `advance` returns the post-increment value and is strictly increasing.
    #[test]
    fn test_epoch_advance() {
        let counter = EpochCounter::new();
        assert_eq!(counter.current(), 0);
        assert_eq!(counter.advance(), 1);
        assert_eq!(counter.advance(), 2);
        assert_eq!(counter.advance(), 3);
        assert_eq!(counter.current(), 3);

        let mut prev = counter.current();
        for _ in 0..100 {
            let next = counter.advance();
            assert!(next > prev);
            prev = next;
        }
    }

    // pin/unpin round-trip: pinned state, epoch value, and the quiesce
    // timestamp (seeded at construction, refreshed on unpin).
    #[test]
    fn test_worker_pin_unpin() {
        let worker = WorkerEpoch::new(0);

        assert!(!worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), EPOCH_UNPINNED);
        let initial_quiesce = worker.last_quiesce_ns();
        assert!(
            initial_quiesce > 0,
            "quiesce time should be seeded at creation"
        );

        worker.pin(5);
        assert!(worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), 5);

        worker.unpin();
        assert!(!worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), EPOCH_UNPINNED);
        assert!(worker.last_quiesce_ns() >= initial_quiesce);
    }

    // With no workers, the minimum falls back to the sentinel.
    #[test]
    fn test_min_pinned_no_workers() {
        let workers: Vec<WorkerEpoch> = vec![];
        assert_eq!(min_pinned_epoch(&workers), EPOCH_UNPINNED);
    }

    // Unpinned workers (EPOCH_UNPINNED = u64::MAX) never win the minimum.
    #[test]
    fn test_min_pinned_mixed() {
        let workers: Vec<WorkerEpoch> = (0..4).map(WorkerEpoch::new).collect();

        workers[0].pin(10);
        workers[2].pin(5);

        assert_eq!(min_pinned_epoch(&workers), 5);

        workers[2].unpin();
        assert_eq!(min_pinned_epoch(&workers), 10);

        workers[0].unpin();
        assert_eq!(min_pinned_epoch(&workers), EPOCH_UNPINNED);
    }

    // Cancellation flag is settable and clearable.
    #[test]
    fn test_cancel_flag() {
        let worker = WorkerEpoch::new(0);

        assert!(!worker.is_cancelled());

        worker.request_cancel();
        assert!(worker.is_cancelled());

        worker.clear_cancel();
        assert!(!worker.is_cancelled());
    }

    // Guards the #[repr(align(128))] on WorkerEpoch.
    #[test]
    fn test_worker_epoch_alignment() {
        assert!(
            std::mem::align_of::<WorkerEpoch>() >= 128,
            "WorkerEpoch must be cache-line aligned (>= 128 bytes)"
        );
    }

    // pin() refreshes pin_start_ns; unpin() leaves it untouched.
    // NOTE(review): the 5 ms sleep assumes the monotonic clock ticks at
    // finer-than-5ms resolution — could be flaky on very coarse clocks.
    #[test]
    fn test_pin_start_ns_records_pin_time() {
        let worker = WorkerEpoch::new(0);

        let initial = worker.pin_start_ns();
        assert!(initial > 0, "pin_start_ns should be seeded at construction");

        std::thread::sleep(std::time::Duration::from_millis(5));
        worker.pin(42);
        let after_pin = worker.pin_start_ns();
        assert!(
            after_pin > initial,
            "pin_start_ns should advance on pin(): initial={initial}, after={after_pin}"
        );

        worker.unpin();
        let after_unpin = worker.pin_start_ns();
        assert_eq!(
            after_pin, after_unpin,
            "pin_start_ns should not change on unpin()"
        );
    }
}