1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::OnceLock;
10use std::time::Instant;
11
/// Sentinel value stored in a worker's `pinned` slot when the worker is not
/// pinned to any epoch. Using `u64::MAX` means idle workers naturally drop
/// out of `min_pinned_epoch` (they can never be the minimum unless everyone
/// is idle, in which case the sentinel itself is returned).
pub const EPOCH_UNPINNED: u64 = u64::MAX;
14
/// Global, monotonically increasing epoch counter shared by all workers.
pub struct EpochCounter {
    /// Current global epoch; starts at 0 and is only ever incremented
    /// (see `advance`).
    current: AtomicU64,
}
22
23impl Default for EpochCounter {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
// Compile-time assertion that `EpochCounter` is `Send + Sync`, i.e. safe to
// share across worker threads. The closure is never called; if the bounds
// ever stop holding, this fails to compile.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<EpochCounter>();
};
34
35impl EpochCounter {
36 pub fn new() -> Self {
38 Self {
39 current: AtomicU64::new(0),
40 }
41 }
42
43 pub fn advance(&self) -> u64 {
46 self.current.fetch_add(1, Ordering::Release) + 1
47 }
48
49 pub fn current(&self) -> u64 {
51 self.current.load(Ordering::Acquire)
52 }
53}
54
// 128-byte alignment pads each slot so adjacent workers' atomics land on
// separate cache lines (avoids false sharing when slots live in one slice —
// the alignment test below pins this requirement).
#[repr(align(128))]
pub struct WorkerEpoch {
    /// Epoch this worker is currently pinned to, or `EPOCH_UNPINNED` if idle.
    pinned: AtomicU64,

    /// Monotonic timestamp (ns, from `monotonic_nanos`) of the most recent
    /// `pin()`; seeded with the construction time.
    pin_start_ns: AtomicU64,

    /// Monotonic timestamp (ns) of the most recent `unpin()`; seeded with
    /// the construction time so age calculations start sane.
    last_quiesce_ns: AtomicU64,

    /// Cooperative cancellation flag (set by `request_cancel`).
    cancel: AtomicBool,

    /// Identifier of the owning worker. Stored but not read anywhere in this
    /// file — presumably for diagnostics; hence the dead_code allowance.
    #[allow(dead_code)]
    worker_id: u32,
}
84
// Compile-time assertion that `WorkerEpoch` is `Send + Sync` (shared between
// the worker and any monitor/reclaimer threads). Fails to compile otherwise.
const _: fn() = || {
    fn assert<T: Send + Sync>() {}
    assert::<WorkerEpoch>();
};
90
91impl WorkerEpoch {
92 pub fn new(worker_id: u32) -> Self {
98 let now = monotonic_nanos();
99 Self {
100 pinned: AtomicU64::new(EPOCH_UNPINNED),
101 pin_start_ns: AtomicU64::new(now),
102 last_quiesce_ns: AtomicU64::new(now),
103 cancel: AtomicBool::new(false),
104 worker_id,
105 }
106 }
107
108 pub fn pin(&self, epoch: u64) {
111 self.pin_start_ns
112 .store(monotonic_nanos(), Ordering::Release);
113 self.pinned.store(epoch, Ordering::Release);
114 }
115
116 pub fn unpin(&self) {
119 self.pinned.store(EPOCH_UNPINNED, Ordering::Release);
120 let now_ns = monotonic_nanos();
121 self.last_quiesce_ns.store(now_ns, Ordering::Release);
122 }
123
124 pub fn is_pinned(&self) -> bool {
126 self.pinned.load(Ordering::Acquire) != EPOCH_UNPINNED
127 }
128
129 pub fn pinned_epoch(&self) -> u64 {
131 self.pinned.load(Ordering::Acquire)
132 }
133
134 pub fn pin_start_ns(&self) -> u64 {
137 self.pin_start_ns.load(Ordering::Acquire)
138 }
139
140 pub fn last_quiesce_ns(&self) -> u64 {
142 self.last_quiesce_ns.load(Ordering::Acquire)
143 }
144
145 pub fn is_cancelled(&self) -> bool {
147 self.cancel.load(Ordering::Acquire)
148 }
149
150 pub fn request_cancel(&self) {
152 self.cancel.store(true, Ordering::Release);
153 }
154
155 pub fn clear_cancel(&self) {
157 self.cancel.store(false, Ordering::Release);
158 }
159}
160
161pub fn min_pinned_epoch(workers: &[WorkerEpoch]) -> u64 {
165 workers
166 .iter()
167 .map(|w| w.pinned_epoch())
168 .min()
169 .unwrap_or(EPOCH_UNPINNED)
170}
171
/// Nanoseconds elapsed since the first call to this function in the process.
///
/// Backed by `Instant`, so it is monotonic and immune to wall-clock jumps.
/// Note the very first call (which initializes the shared start point) may
/// return 0. The `as u64` truncation is safe for ~584 years of uptime.
pub(crate) fn monotonic_nanos() -> u64 {
    static START: OnceLock<Instant> = OnceLock::new();
    let start = *START.get_or_init(Instant::now);
    start.elapsed().as_nanos() as u64
}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// `advance()` returns the new epoch value and is strictly increasing.
    #[test]
    fn test_epoch_advance() {
        let counter = EpochCounter::new();
        assert_eq!(counter.current(), 0);
        assert_eq!(counter.advance(), 1);
        assert_eq!(counter.advance(), 2);
        assert_eq!(counter.advance(), 3);
        assert_eq!(counter.current(), 3);

        let mut prev = counter.current();
        for _ in 0..100 {
            let next = counter.advance();
            assert!(next > prev);
            prev = next;
        }
    }

    #[test]
    fn test_worker_pin_unpin() {
        // Warm the shared monotonic clock and let it advance before creating
        // the worker: the very first monotonic_nanos() call in the process
        // initializes the OnceLock start point and can legitimately return 0,
        // which made the `> 0` seed assertion below flaky depending on which
        // test ran first.
        let _ = monotonic_nanos();
        std::thread::sleep(std::time::Duration::from_millis(1));

        let worker = WorkerEpoch::new(0);

        assert!(!worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), EPOCH_UNPINNED);
        let initial_quiesce = worker.last_quiesce_ns();
        assert!(
            initial_quiesce > 0,
            "quiesce time should be seeded at creation"
        );

        worker.pin(5);
        assert!(worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), 5);

        worker.unpin();
        assert!(!worker.is_pinned());
        assert_eq!(worker.pinned_epoch(), EPOCH_UNPINNED);
        assert!(worker.last_quiesce_ns() >= initial_quiesce);
    }

    /// Empty worker set: the sentinel is the identity of the min-fold.
    #[test]
    fn test_min_pinned_no_workers() {
        let workers: Vec<WorkerEpoch> = vec![];
        assert_eq!(min_pinned_epoch(&workers), EPOCH_UNPINNED);
    }

    /// Mixed pinned/idle workers: idle slots never win the minimum.
    #[test]
    fn test_min_pinned_mixed() {
        let workers: Vec<WorkerEpoch> = (0..4).map(WorkerEpoch::new).collect();

        workers[0].pin(10);
        workers[2].pin(5);

        assert_eq!(min_pinned_epoch(&workers), 5);

        workers[2].unpin();
        assert_eq!(min_pinned_epoch(&workers), 10);

        workers[0].unpin();
        assert_eq!(min_pinned_epoch(&workers), EPOCH_UNPINNED);
    }

    #[test]
    fn test_cancel_flag() {
        let worker = WorkerEpoch::new(0);

        assert!(!worker.is_cancelled());

        worker.request_cancel();
        assert!(worker.is_cancelled());

        worker.clear_cancel();
        assert!(!worker.is_cancelled());
    }

    /// The 128-byte alignment is what keeps adjacent slots on separate cache
    /// lines when workers live in a contiguous slice.
    #[test]
    fn test_worker_epoch_alignment() {
        assert!(
            std::mem::align_of::<WorkerEpoch>() >= 128,
            "WorkerEpoch must be cache-line aligned (>= 128 bytes)"
        );
    }

    #[test]
    fn test_pin_start_ns_records_pin_time() {
        // Same warm-up as test_worker_pin_unpin: guarantees the seeded
        // pin_start_ns is non-zero regardless of test execution order.
        let _ = monotonic_nanos();
        std::thread::sleep(std::time::Duration::from_millis(1));

        let worker = WorkerEpoch::new(0);

        let initial = worker.pin_start_ns();
        assert!(initial > 0, "pin_start_ns should be seeded at construction");

        // Sleep long enough that pin() observably advances the timestamp.
        std::thread::sleep(std::time::Duration::from_millis(5));
        worker.pin(42);
        let after_pin = worker.pin_start_ns();
        assert!(
            after_pin > initial,
            "pin_start_ns should advance on pin(): initial={initial}, after={after_pin}"
        );

        worker.unpin();
        let after_unpin = worker.pin_start_ns();
        assert_eq!(
            after_pin, after_unpin,
            "pin_start_ns should not change on unpin()"
        );
    }
}