1use std::cell::UnsafeCell;
25use std::hash::{BuildHasher, Hasher};
26use std::sync::atomic::{AtomicUsize, Ordering};
27
28use rustc_hash::FxBuildHasher;
29
/// Kind of state mutation recorded in a changelog entry.
///
/// Stored as a single byte (`#[repr(u8)]`) inside [`StateChangelogEntry`];
/// see [`StateOp::to_u8`] / [`StateOp::from_u8`] for the encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StateOp {
    /// A key was inserted or its value updated.
    Put = 0,
    /// A key was removed.
    Delete = 1,
}
39
40impl StateOp {
41 #[inline]
43 #[must_use]
44 pub const fn to_u8(self) -> u8 {
45 self as u8
46 }
47
48 #[inline]
50 #[must_use]
51 pub const fn from_u8(val: u8) -> Self {
52 match val {
53 1 => Self::Delete,
54 _ => Self::Put,
55 }
56 }
57}
58
/// Fixed-size (32-byte, `#[repr(C)]`) record describing one state mutation.
///
/// The key is stored only as a 64-bit hash (see [`StateChangelogEntry::hash_key`]);
/// the value bytes presumably live in an external mmap region addressed by
/// `mmap_offset`/`value_len` — TODO confirm against the mmap writer.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct StateChangelogEntry {
    /// Epoch the mutation was recorded under.
    pub epoch: u64,
    /// FxHash of the key bytes.
    pub key_hash: u64,
    /// Byte offset of the value in the external region (0 for deletes).
    pub mmap_offset: u64,
    /// Length of the value in bytes (0 for deletes).
    pub value_len: u32,
    /// Encoded [`StateOp`] (0 = Put, 1 = Delete); private, read via `op()`.
    op: u8,
    /// Explicit padding so the struct is exactly 32 bytes (checked below).
    _padding: [u8; 3],
}
90
91impl StateChangelogEntry {
92 #[inline]
94 #[must_use]
95 pub fn put(epoch: u64, key_hash: u64, mmap_offset: u64, value_len: u32) -> Self {
96 Self {
97 epoch,
98 key_hash,
99 mmap_offset,
100 value_len,
101 op: StateOp::Put.to_u8(),
102 _padding: [0; 3],
103 }
104 }
105
106 #[inline]
108 #[must_use]
109 pub fn delete(epoch: u64, key_hash: u64) -> Self {
110 Self {
111 epoch,
112 key_hash,
113 mmap_offset: 0,
114 value_len: 0,
115 op: StateOp::Delete.to_u8(),
116 _padding: [0; 3],
117 }
118 }
119
120 #[inline]
122 #[must_use]
123 pub fn from_key(key: &[u8], epoch: u64, mmap_offset: u64, value_len: u32, op: StateOp) -> Self {
124 let key_hash = Self::hash_key(key);
125 Self {
126 epoch,
127 key_hash,
128 mmap_offset,
129 value_len,
130 op: op.to_u8(),
131 _padding: [0; 3],
132 }
133 }
134
135 #[inline]
137 #[must_use]
138 pub fn op(&self) -> StateOp {
139 StateOp::from_u8(self.op)
140 }
141
142 #[inline]
144 #[must_use]
145 pub fn is_put(&self) -> bool {
146 self.op == StateOp::Put.to_u8()
147 }
148
149 #[inline]
151 #[must_use]
152 pub fn is_delete(&self) -> bool {
153 self.op == StateOp::Delete.to_u8()
154 }
155
156 #[inline]
158 #[must_use]
159 pub fn hash_key(key: &[u8]) -> u64 {
160 let hasher_builder = FxBuildHasher;
161 let mut hasher = hasher_builder.build_hasher();
162 hasher.write(key);
163 hasher.finish()
164 }
165}
166
// Compile-time check: the entry must stay exactly 32 bytes so the explicit
// padding and any layout assumptions downstream (ring sizing, external
// serialization of `#[repr(C)]` entries) continue to hold.
const _: () = assert!(std::mem::size_of::<StateChangelogEntry>() == 32);
169
/// Single-producer / single-consumer ring buffer of changelog entries.
///
/// The producer appends via `push*` and the consumer removes via
/// `pop`/`drain`; both sides synchronize only through the two atomic
/// cursors, so no locks are taken on the hot path.
pub struct StateChangelogBuffer {
    /// Ring storage; slots are interior-mutable so the producer can write
    /// through a shared `&self` reference.
    entries: Box<[UnsafeCell<StateChangelogEntry>]>,
    /// Producer cursor (already-masked ring index).
    write_pos: AtomicUsize,
    /// Consumer cursor (already-masked ring index).
    read_pos: AtomicUsize,
    /// Slot count; always a power of two so wrap-around is a bitmask.
    capacity: usize,
    /// Epoch stamped onto entries built by `push_put`/`push_delete`;
    /// mutated only through `&mut self` methods.
    current_epoch: u64,
    /// Lifetime count of successful pushes.
    total_pushed: AtomicUsize,
    /// Lifetime count of entries handed out via `pop`/`drain`.
    total_drained: AtomicUsize,
    /// Lifetime count of pushes rejected because the ring was full.
    overflow_count: AtomicUsize,
    /// Debug-only guard: records the first producer thread so `push` can
    /// assert the single-producer contract.
    #[cfg(debug_assertions)]
    producer_thread_id: std::sync::Mutex<Option<std::thread::ThreadId>>,
}
228
229impl StateChangelogBuffer {
230 pub const DEFAULT_CAPACITY: usize = 16 * 1024;
232
233 #[must_use]
241 pub fn with_capacity(capacity: usize) -> Self {
242 assert!(capacity > 0, "capacity must be > 0");
243
244 let capacity = capacity.next_power_of_two();
246
247 let zero_entry = StateChangelogEntry {
249 epoch: 0,
250 key_hash: 0,
251 mmap_offset: 0,
252 value_len: 0,
253 op: 0,
254 _padding: [0; 3],
255 };
256 let entries: Vec<UnsafeCell<StateChangelogEntry>> =
257 (0..capacity).map(|_| UnsafeCell::new(zero_entry)).collect();
258
259 Self {
260 entries: entries.into_boxed_slice(),
261 write_pos: AtomicUsize::new(0),
262 read_pos: AtomicUsize::new(0),
263 capacity,
264 current_epoch: 0,
265 total_pushed: AtomicUsize::new(0),
266 total_drained: AtomicUsize::new(0),
267 overflow_count: AtomicUsize::new(0),
268 #[cfg(debug_assertions)]
269 producer_thread_id: std::sync::Mutex::new(None),
270 }
271 }
272
273 #[must_use]
275 pub fn new() -> Self {
276 Self::with_capacity(Self::DEFAULT_CAPACITY)
277 }
278
279 pub fn set_epoch(&mut self, epoch: u64) {
281 self.current_epoch = epoch;
282 }
283
284 #[must_use]
286 pub fn epoch(&self) -> u64 {
287 self.current_epoch
288 }
289
290 pub fn advance_epoch(&mut self) -> u64 {
292 self.current_epoch += 1;
293 self.current_epoch
294 }
295
296 #[inline]
300 #[allow(clippy::missing_panics_doc)] pub fn push(&self, entry: StateChangelogEntry) -> bool {
302 #[cfg(debug_assertions)]
303 {
304 let current = std::thread::current().id();
305 let mut guard = self.producer_thread_id.lock().unwrap();
306 if let Some(expected) = *guard {
307 debug_assert_eq!(
308 current, expected,
309 "SPSC violation: push() called from a different thread"
310 );
311 } else {
312 *guard = Some(current);
313 }
314 }
315
316 let write_pos = self.write_pos.load(Ordering::Relaxed);
317 let read_pos = self.read_pos.load(Ordering::Acquire);
318
319 let next_pos = (write_pos + 1) & (self.capacity - 1);
321 if next_pos == read_pos {
322 self.overflow_count.fetch_add(1, Ordering::Relaxed);
323 return false;
324 }
325
326 #[allow(unsafe_code)]
331 unsafe {
332 self.entries[write_pos].get().write(entry);
333 }
334
335 self.write_pos.store(next_pos, Ordering::Release);
337 self.total_pushed.fetch_add(1, Ordering::Relaxed);
338
339 true
340 }
341
342 #[inline]
344 pub fn push_put(&self, key: &[u8], mmap_offset: u64, value_len: u32) -> bool {
345 let entry = StateChangelogEntry::from_key(
346 key,
347 self.current_epoch,
348 mmap_offset,
349 value_len,
350 StateOp::Put,
351 );
352 self.push(entry)
353 }
354
355 #[inline]
357 pub fn push_delete(&self, key: &[u8]) -> bool {
358 let entry = StateChangelogEntry::from_key(key, self.current_epoch, 0, 0, StateOp::Delete);
359 self.push(entry)
360 }
361
362 #[inline]
364 pub fn pop(&self) -> Option<StateChangelogEntry> {
365 let read_pos = self.read_pos.load(Ordering::Relaxed);
366 let write_pos = self.write_pos.load(Ordering::Acquire);
367
368 if read_pos == write_pos {
369 return None;
370 }
371
372 #[allow(unsafe_code)]
378 let entry = unsafe { self.entries[read_pos].get().read() };
379
380 let next_pos = (read_pos + 1) & (self.capacity - 1);
382 self.read_pos.store(next_pos, Ordering::Release);
383 self.total_drained.fetch_add(1, Ordering::Relaxed);
384
385 Some(entry)
386 }
387
388 pub fn drain(&self, max_count: usize) -> impl Iterator<Item = StateChangelogEntry> + '_ {
392 DrainIter {
393 buffer: self,
394 remaining: max_count,
395 }
396 }
397
398 pub fn drain_all(&self) -> impl Iterator<Item = StateChangelogEntry> + '_ {
400 self.drain(usize::MAX)
401 }
402
403 #[must_use]
405 pub fn len(&self) -> usize {
406 let write_pos = self.write_pos.load(Ordering::Acquire);
407 let read_pos = self.read_pos.load(Ordering::Acquire);
408 write_pos.wrapping_sub(read_pos) & (self.capacity - 1)
409 }
410
411 #[must_use]
413 pub fn is_empty(&self) -> bool {
414 self.write_pos.load(Ordering::Acquire) == self.read_pos.load(Ordering::Acquire)
415 }
416
417 #[must_use]
419 pub fn is_full(&self) -> bool {
420 let write_pos = self.write_pos.load(Ordering::Acquire);
421 let read_pos = self.read_pos.load(Ordering::Acquire);
422 ((write_pos + 1) & (self.capacity - 1)) == read_pos
423 }
424
425 #[must_use]
427 pub fn capacity(&self) -> usize {
428 self.capacity
429 }
430
431 #[must_use]
433 pub fn available(&self) -> usize {
434 self.capacity - self.len() - 1
435 }
436
437 #[must_use]
439 pub fn total_pushed(&self) -> usize {
440 self.total_pushed.load(Ordering::Relaxed)
441 }
442
443 #[must_use]
445 pub fn total_drained(&self) -> usize {
446 self.total_drained.load(Ordering::Relaxed)
447 }
448
449 #[must_use]
451 pub fn overflow_count(&self) -> usize {
452 self.overflow_count.load(Ordering::Relaxed)
453 }
454
455 pub fn clear(&self) {
457 let write_pos = self.write_pos.load(Ordering::Acquire);
459 self.read_pos.store(write_pos, Ordering::Release);
460 }
461
462 #[must_use]
466 pub fn checkpoint_barrier(&self) -> (u64, usize) {
467 (self.current_epoch, self.write_pos.load(Ordering::Acquire))
468 }
469}
470
471impl Default for StateChangelogBuffer {
472 fn default() -> Self {
473 Self::new()
474 }
475}
476
#[allow(unsafe_code)]
// SAFETY(review): all cross-thread state is atomic or behind `UnsafeCell`
// guarded by the SPSC cursor protocol in `push`/`pop`; moving the buffer to
// another thread transfers ownership of every slot. Relies on callers
// upholding single-producer/single-consumer usage — TODO confirm call sites.
unsafe impl Send for StateChangelogBuffer {}
483
#[allow(unsafe_code)]
// SAFETY(review): shared access is mediated by the atomic cursors with
// Acquire/Release pairing; the producer writes a slot only after proving the
// consumer cannot be reading it, and vice versa. Sound only under the SPSC
// discipline asserted (in debug builds) inside `push` — TODO confirm usage.
unsafe impl Sync for StateChangelogBuffer {}
492
/// Lazy iterator returned by [`StateChangelogBuffer::drain`]: each `next`
/// pops one entry until the budget is spent or the buffer empties.
struct DrainIter<'a> {
    /// Buffer being drained.
    buffer: &'a StateChangelogBuffer,
    /// Remaining pop budget.
    remaining: usize,
}
498
499impl Iterator for DrainIter<'_> {
500 type Item = StateChangelogEntry;
501
502 fn next(&mut self) -> Option<Self::Item> {
503 if self.remaining == 0 {
504 return None;
505 }
506 self.remaining -= 1;
507 self.buffer.pop()
508 }
509}
510
#[cfg(test)]
mod tests {
    use super::*;

    // Op byte encoding round-trips; unknown bytes decode as Put.
    #[test]
    fn test_state_op_roundtrip() {
        assert_eq!(StateOp::from_u8(StateOp::Put.to_u8()), StateOp::Put);
        assert_eq!(StateOp::from_u8(StateOp::Delete.to_u8()), StateOp::Delete);
        assert_eq!(StateOp::from_u8(255), StateOp::Put); }

    // Entry layout must stay exactly 32 bytes (matches the const assert).
    #[test]
    fn test_changelog_entry_size() {
        assert_eq!(std::mem::size_of::<StateChangelogEntry>(), 32);
    }

    // `put` stores all fields and flags the entry as a Put.
    #[test]
    fn test_changelog_entry_put() {
        let entry = StateChangelogEntry::put(1, 12345, 100, 50);
        assert_eq!(entry.epoch, 1);
        assert_eq!(entry.key_hash, 12345);
        assert_eq!(entry.mmap_offset, 100);
        assert_eq!(entry.value_len, 50);
        assert!(entry.is_put());
        assert!(!entry.is_delete());
    }

    // `delete` zeroes offset/length and flags the entry as a Delete.
    #[test]
    fn test_changelog_entry_delete() {
        let entry = StateChangelogEntry::delete(2, 67890);
        assert_eq!(entry.epoch, 2);
        assert_eq!(entry.key_hash, 67890);
        assert_eq!(entry.mmap_offset, 0);
        assert_eq!(entry.value_len, 0);
        assert!(entry.is_delete());
        assert!(!entry.is_put());
    }

    // Same key must hash identically regardless of epoch/op.
    #[test]
    fn test_changelog_entry_from_key() {
        let entry = StateChangelogEntry::from_key(b"test_key", 5, 200, 75, StateOp::Put);
        assert_eq!(entry.epoch, 5);
        assert_eq!(entry.mmap_offset, 200);
        assert_eq!(entry.value_len, 75);
        assert!(entry.is_put());

        let entry2 = StateChangelogEntry::from_key(b"test_key", 6, 300, 80, StateOp::Delete);
        assert_eq!(entry.key_hash, entry2.key_hash);
    }

    // Basic push/pop round-trip on an otherwise empty buffer.
    #[test]
    fn test_buffer_basic_operations() {
        let buffer = StateChangelogBuffer::with_capacity(16);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 16);

        let entry = StateChangelogEntry::put(1, 100, 0, 10);
        assert!(buffer.push(entry));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);

        let popped = buffer.pop().unwrap();
        assert_eq!(popped.key_hash, 100);
        assert!(buffer.is_empty());
    }

    // Usable capacity is capacity - 1; overflowing pushes are counted.
    #[test]
    fn test_buffer_full() {
        let buffer = StateChangelogBuffer::with_capacity(4);

        for i in 0..3 {
            assert!(buffer.push(StateChangelogEntry::put(1, i, 0, 10)));
        }

        assert!(buffer.is_full());
        assert!(!buffer.push(StateChangelogEntry::put(1, 999, 0, 10)));
        assert_eq!(buffer.overflow_count(), 1);
    }

    // Bounded drain removes exactly max_count; drain_all empties the rest.
    #[test]
    fn test_buffer_drain() {
        let buffer = StateChangelogBuffer::with_capacity(16);

        for i in 0..5 {
            buffer.push(StateChangelogEntry::put(1, i, 0, 10));
        }

        assert_eq!(buffer.len(), 5);

        let drained: Vec<_> = buffer.drain(3).collect();
        assert_eq!(drained.len(), 3);
        assert_eq!(buffer.len(), 2);

        let remaining: Vec<_> = buffer.drain_all().collect();
        assert_eq!(remaining.len(), 2);
        assert!(buffer.is_empty());
    }

    // set_epoch overrides; advance_epoch increments and returns the new value.
    #[test]
    fn test_buffer_epoch() {
        let mut buffer = StateChangelogBuffer::with_capacity(16);
        assert_eq!(buffer.epoch(), 0);

        buffer.set_epoch(10);
        assert_eq!(buffer.epoch(), 10);

        assert_eq!(buffer.advance_epoch(), 11);
        assert_eq!(buffer.epoch(), 11);
    }

    // push_put/push_delete build the right op kinds in FIFO order.
    #[test]
    fn test_buffer_push_helpers() {
        let buffer = StateChangelogBuffer::with_capacity(16);

        assert!(buffer.push_put(b"key1", 100, 50));
        assert!(buffer.push_delete(b"key2"));

        let entries: Vec<_> = buffer.drain_all().collect();
        assert_eq!(entries.len(), 2);
        assert!(entries[0].is_put());
        assert!(entries[1].is_delete());
    }

    // clear() discards all buffered entries.
    #[test]
    fn test_buffer_clear() {
        let buffer = StateChangelogBuffer::with_capacity(16);

        for i in 0..5 {
            buffer.push(StateChangelogEntry::put(1, i, 0, 10));
        }
        assert_eq!(buffer.len(), 5);

        buffer.clear();
        assert!(buffer.is_empty());
    }

    // Barrier snapshot reports the epoch and the (masked) write cursor.
    #[test]
    fn test_buffer_checkpoint_barrier() {
        let mut buffer = StateChangelogBuffer::with_capacity(16);
        buffer.set_epoch(42);

        buffer.push(StateChangelogEntry::put(42, 1, 0, 10));
        buffer.push(StateChangelogEntry::put(42, 2, 0, 10));

        let (epoch, pos) = buffer.checkpoint_barrier();
        assert_eq!(epoch, 42);
        assert_eq!(pos, 2);
    }

    // total_pushed / total_drained track lifetime successes only.
    #[test]
    fn test_buffer_metrics() {
        let buffer = StateChangelogBuffer::with_capacity(8);

        for i in 0..5 {
            buffer.push(StateChangelogEntry::put(1, i, 0, 10));
        }
        assert_eq!(buffer.total_pushed(), 5);
        assert_eq!(buffer.total_drained(), 0);

        let _ = buffer.pop();
        let _ = buffer.pop();
        assert_eq!(buffer.total_drained(), 2);
    }

    // Put and delete built from the same hash agree on key_hash.
    #[test]
    fn test_entry_from_key() {
        let key_hash = StateChangelogEntry::hash_key(b"mykey");

        let put = StateChangelogEntry::put(100, key_hash, 500, 75);
        assert_eq!(put.epoch, 100);
        assert_eq!(put.mmap_offset, 500);
        assert_eq!(put.value_len, 75);
        assert!(put.is_put());

        let delete = StateChangelogEntry::delete(100, key_hash);
        assert_eq!(delete.epoch, 100);
        assert!(delete.is_delete());

        assert_eq!(put.key_hash, delete.key_hash);
    }

    // Repeated fill/drain cycles exercise cursor wrap-around (capacity 4).
    #[test]
    fn test_buffer_wraparound() {
        let buffer = StateChangelogBuffer::with_capacity(4);

        for iteration in 0..5 {
            for i in 0..3 {
                assert!(
                    buffer.push(StateChangelogEntry::put(1, i + iteration * 10, 0, 10)),
                    "Failed at iteration {iteration}, entry {i}"
                );
            }

            let drained: Vec<_> = buffer.drain_all().collect();
            assert_eq!(drained.len(), 3, "Failed at iteration {iteration}");
        }
    }

    // Hashing is deterministic and distinguishes different keys.
    #[test]
    fn test_key_hash_consistency() {
        let key = b"consistent_key";
        let hash1 = StateChangelogEntry::hash_key(key);
        let hash2 = StateChangelogEntry::hash_key(key);
        assert_eq!(hash1, hash2);

        let different_key = b"different_key";
        let hash3 = StateChangelogEntry::hash_key(different_key);
        assert_ne!(hash1, hash3);
    }
}