1extern crate alloc;
24use alloc::collections::BTreeMap;
25use alloc::sync::Arc;
26use alloc::vec::Vec;
27use core::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28
29use crate::wire_types::SequenceNumber;
30
31#[cfg(feature = "inspect")]
32use alloc::borrow::ToOwned;
33
#[cfg(feature = "inspect")]
fn dispatch_rtps_tap(label: &str, sn: SequenceNumber, payload: Vec<u8>) {
    // Nanoseconds since the Unix epoch; saturates to u64::MAX on overflow
    // and falls back to 0 if the system clock reports a pre-epoch time.
    let now_ns = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    };
    // The raw sequence number doubles as the frame correlation id.
    #[allow(clippy::cast_sign_loss)]
    let correlation_id = sn.0 as u64;
    let frame =
        zerodds_inspect_endpoint::Frame::rtps(label.to_owned(), now_ns, correlation_id, payload);
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
45
/// Lifecycle kind attached to a [`CacheChange`] (mirrors the DDS/RTPS
/// change-kind taxonomy — TODO confirm against the spec mapping used here).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// Instance is alive; the payload carries live sample data.
    Alive,
    /// Instance is alive but this change was filtered out (not delivered).
    AliveFiltered,
    /// Instance was disposed by its writer.
    NotAliveDisposed,
    /// Instance was unregistered by its writer.
    NotAliveUnregistered,
    /// Instance was both disposed and unregistered.
    NotAliveDisposedUnregistered,
}
63
64impl ChangeKind {
65 #[must_use]
69 pub fn is_relevant(self) -> bool {
70 !matches!(self, Self::AliveFiltered)
71 }
72
73 #[must_use]
75 pub fn is_alive_kind(self) -> bool {
76 matches!(self, Self::Alive | Self::AliveFiltered)
77 }
78}
79
/// One entry of a history cache: a single writer sample together with its
/// lifecycle metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheChange {
    /// Writer-assigned sequence number; unique within one cache.
    pub sequence_number: SequenceNumber,
    /// Serialized sample bytes; `Arc<[u8]>` makes clones O(1) refcount bumps.
    pub payload: Arc<[u8]>,
    /// Lifecycle kind of this change.
    pub kind: ChangeKind,
    /// Optional 16-byte instance key hash (presumably the RTPS KeyHash
    /// parameter — TODO confirm against the serializer).
    pub key_hash: Option<[u8; 16]>,
}
102
103impl CacheChange {
104 #[must_use]
108 pub fn alive(sn: SequenceNumber, payload: Vec<u8>) -> Self {
109 Self::alive_arc(sn, Arc::from(payload))
110 }
111
112 #[must_use]
121 pub(crate) fn alive_arc(sn: SequenceNumber, payload: Arc<[u8]>) -> Self {
122 Self {
123 sequence_number: sn,
124 payload,
125 kind: ChangeKind::Alive,
126 key_hash: None,
127 }
128 }
129
130 #[must_use]
134 pub fn lifecycle(sn: SequenceNumber, payload: Vec<u8>, kind: ChangeKind) -> Self {
135 Self {
136 sequence_number: sn,
137 payload: Arc::from(payload),
138 kind,
139 key_hash: None,
140 }
141 }
142}
143
/// Retention policy for a history cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HistoryKind {
    /// Keep every change; inserts into a full cache are rejected.
    KeepAll,
    /// Keep only the newest `depth` changes; older ones are evicted on insert.
    KeepLast {
        /// Number of most-recent changes to retain; must be non-zero.
        depth: usize,
    },
}
160
/// Errors returned by cache mutation operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum CacheError {
    /// A `KeepAll` cache is full and cannot accept another change.
    CapacityExceeded,
    /// A change with the same sequence number is already stored.
    DuplicateSequenceNumber,
    /// The cache was configured with `KeepLast { depth: 0 }`, which is invalid.
    ZeroDepth,
}
173
/// Sentinel stored in the stats atomics when no sequence number is present.
/// NOTE(review): assumes valid sequence numbers never equal `i64::MIN` —
/// confirm against `SequenceNumber`'s valid range.
const STATS_SENTINEL_NO_SN: i64 = i64::MIN;
178
/// Shared statistics block updated by the owning cache after each mutation.
///
/// Each field is a separate atomic; they are not updated as one unit, so a
/// concurrent reader may observe a mix of pre- and post-mutation values.
#[derive(Debug)]
pub struct HistoryCacheStats {
    /// Current number of stored changes.
    pub len: AtomicUsize,
    /// Total changes evicted by `KeepLast` since creation.
    pub evicted: AtomicU64,
    /// Highest stored sequence number, or `STATS_SENTINEL_NO_SN` when empty.
    pub max_sn: AtomicI64,
    /// Lowest stored sequence number, or `STATS_SENTINEL_NO_SN` when empty.
    pub min_sn: AtomicI64,
}
211
212impl Default for HistoryCacheStats {
213 fn default() -> Self {
214 Self {
215 len: AtomicUsize::new(0),
216 evicted: AtomicU64::new(0),
217 max_sn: AtomicI64::new(STATS_SENTINEL_NO_SN),
218 min_sn: AtomicI64::new(STATS_SENTINEL_NO_SN),
219 }
220 }
221}
222
223impl HistoryCacheStats {
224 #[must_use]
229 pub fn snapshot(&self) -> HistoryCacheSnapshot {
230 HistoryCacheSnapshot {
231 len: self.len.load(Ordering::Acquire),
232 evicted: self.evicted.load(Ordering::Acquire),
233 max_sn: decode_sn_atom(self.max_sn.load(Ordering::Acquire)),
234 min_sn: decode_sn_atom(self.min_sn.load(Ordering::Acquire)),
235 }
236 }
237}
238
239impl Clone for HistoryCacheStats {
240 fn clone(&self) -> Self {
241 Self {
242 len: AtomicUsize::new(self.len.load(Ordering::Acquire)),
243 evicted: AtomicU64::new(self.evicted.load(Ordering::Acquire)),
244 max_sn: AtomicI64::new(self.max_sn.load(Ordering::Acquire)),
245 min_sn: AtomicI64::new(self.min_sn.load(Ordering::Acquire)),
246 }
247 }
248}
249
250fn decode_sn_atom(v: i64) -> Option<SequenceNumber> {
251 if v == STATS_SENTINEL_NO_SN {
252 None
253 } else {
254 Some(SequenceNumber(v))
255 }
256}
257
258fn encode_sn_atom(sn: Option<SequenceNumber>) -> i64 {
259 sn.map_or(STATS_SENTINEL_NO_SN, |s| s.0)
260}
261
/// Plain-value copy of a [`HistoryCacheStats`] block at one point in time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HistoryCacheSnapshot {
    /// Number of stored changes at snapshot time.
    pub len: usize,
    /// Total `KeepLast` evictions at snapshot time.
    pub evicted: u64,
    /// Highest stored sequence number, `None` if the cache was empty.
    pub max_sn: Option<SequenceNumber>,
    /// Lowest stored sequence number, `None` if the cache was empty.
    pub min_sn: Option<SequenceNumber>,
}
276
/// Single-owner history cache: an ordered map of changes keyed by sequence
/// number, with capacity enforcement and `KeepLast` eviction.
#[derive(Debug)]
pub struct HistoryCache {
    /// Stored changes, ordered by sequence number.
    changes: BTreeMap<SequenceNumber, CacheChange>,
    /// Retention policy.
    kind: HistoryKind,
    /// Hard upper bound on stored changes, regardless of `kind`.
    max_samples: usize,
    /// Running count of `KeepLast` evictions.
    evicted_count: u64,
    /// Shared statistics mirror, refreshed after each mutation.
    stats: Arc<HistoryCacheStats>,
    /// When set, inserted payloads are mirrored to the inspect tap
    /// under this label.
    #[cfg(feature = "inspect")]
    inspect_label: Option<alloc::string::String>,
}
298
299impl Clone for HistoryCache {
300 fn clone(&self) -> Self {
301 Self {
307 changes: self.changes.clone(),
308 kind: self.kind,
309 max_samples: self.max_samples,
310 evicted_count: self.evicted_count,
311 stats: Arc::new((*self.stats).clone()),
312 #[cfg(feature = "inspect")]
313 inspect_label: self.inspect_label.clone(),
314 }
315 }
316}
317
318impl HistoryCache {
319 #[must_use]
323 pub fn new_with_kind(kind: HistoryKind, max_samples: usize) -> Self {
324 Self {
325 changes: BTreeMap::new(),
326 kind,
327 max_samples,
328 evicted_count: 0,
329 stats: Arc::new(HistoryCacheStats::default()),
330 #[cfg(feature = "inspect")]
331 inspect_label: None,
332 }
333 }
334
335 #[cfg(feature = "inspect")]
338 pub fn set_inspect_label(&mut self, label: alloc::string::String) {
339 self.inspect_label = Some(label);
340 }
341
342 #[must_use]
349 pub fn stats(&self) -> Arc<HistoryCacheStats> {
350 Arc::clone(&self.stats)
351 }
352
353 fn refresh_stats(&self) {
356 self.stats.len.store(self.changes.len(), Ordering::Release);
357 self.stats
358 .evicted
359 .store(self.evicted_count, Ordering::Release);
360 let max = self.changes.keys().next_back().copied();
361 let min = self.changes.keys().next().copied();
362 self.stats
363 .max_sn
364 .store(encode_sn_atom(max), Ordering::Release);
365 self.stats
366 .min_sn
367 .store(encode_sn_atom(min), Ordering::Release);
368 }
369
370 #[must_use]
373 pub fn new(max_samples: usize) -> Self {
374 Self::new_with_kind(HistoryKind::KeepAll, max_samples)
375 }
376
377 #[must_use]
379 pub fn kind(&self) -> HistoryKind {
380 self.kind
381 }
382
383 #[must_use]
386 pub fn evicted_count(&self) -> u64 {
387 self.evicted_count
388 }
389
390 pub fn insert(&mut self, change: CacheChange) -> Result<(), CacheError> {
397 if self.changes.contains_key(&change.sequence_number) {
398 return Err(CacheError::DuplicateSequenceNumber);
399 }
400 let cap = self.effective_max_samples()?;
401 if self.changes.len() >= cap {
402 match self.kind {
403 HistoryKind::KeepAll => return Err(CacheError::CapacityExceeded),
404 HistoryKind::KeepLast { .. } => {
405 if let Some((&oldest, _)) = self.changes.iter().next() {
407 self.changes.remove(&oldest);
408 self.evicted_count = self.evicted_count.saturating_add(1);
409 }
410 }
411 }
412 }
413 #[cfg(feature = "inspect")]
414 let tap_view = self.inspect_label.as_ref().map(|label| {
415 (
416 label.clone(),
417 change.sequence_number,
418 change.payload.to_vec(),
419 )
420 });
421 self.changes.insert(change.sequence_number, change);
422 self.refresh_stats();
423 #[cfg(feature = "inspect")]
424 if let Some((label, sn, payload)) = tap_view {
425 dispatch_rtps_tap(&label, sn, payload);
426 }
427 Ok(())
428 }
429
430 fn effective_max_samples(&self) -> Result<usize, CacheError> {
432 match self.kind {
433 HistoryKind::KeepAll => Ok(self.max_samples),
434 HistoryKind::KeepLast { depth } => {
435 if depth == 0 {
436 return Err(CacheError::ZeroDepth);
437 }
438 Ok(core::cmp::min(depth, self.max_samples))
439 }
440 }
441 }
442
443 #[must_use]
445 pub fn get(&self, sn: SequenceNumber) -> Option<&CacheChange> {
446 self.changes.get(&sn)
447 }
448
449 pub fn remove_up_to(&mut self, sn: SequenceNumber) -> usize {
452 let keep = self.changes.split_off(&SequenceNumber(sn.0 + 1));
453 let removed = self.changes.len();
454 self.changes = keep;
455 self.refresh_stats();
456 removed
457 }
458
459 pub fn iter_range(
462 &self,
463 lo: SequenceNumber,
464 hi: SequenceNumber,
465 ) -> impl Iterator<Item = &CacheChange> + '_ {
466 self.changes.range(lo..=hi).map(|(_, v)| v)
467 }
468
469 #[must_use]
471 pub fn min_sn(&self) -> Option<SequenceNumber> {
472 self.changes.keys().next().copied()
473 }
474
475 #[must_use]
477 pub fn max_sn(&self) -> Option<SequenceNumber> {
478 self.changes.keys().next_back().copied()
479 }
480
481 #[must_use]
483 pub fn len(&self) -> usize {
484 self.changes.len()
485 }
486
487 #[must_use]
489 pub fn is_empty(&self) -> bool {
490 self.changes.is_empty()
491 }
492
493 #[must_use]
495 pub fn capacity(&self) -> usize {
496 self.max_samples
497 }
498}
499
/// History cache with lock-free reads: the state lives in an RCU cell, so
/// readers take an immutable snapshot without blocking while writers
/// copy-modify-publish a new version.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct LockFreeReadHistoryCache {
    /// RCU-published cache state; `read()` returns a stable snapshot.
    inner: zerodds_foundation::rcu::RcuCell<LockFreeInner>,
    /// Statistics mirror refreshed after each mutation.
    stats: Arc<HistoryCacheStats>,
}
546
/// The RCU-published state of a [`LockFreeReadHistoryCache`]; cloned in full
/// on every write.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct LockFreeInner {
    /// Stored changes, ordered by sequence number.
    pub changes: BTreeMap<SequenceNumber, CacheChange>,
    /// Retention policy.
    pub kind: HistoryKind,
    /// Hard upper bound on stored changes.
    pub max_samples: usize,
    /// Running count of `KeepLast` evictions.
    pub evicted_count: u64,
}
562
#[cfg(feature = "std")]
impl LockFreeInner {
    /// Effective capacity: the hard cap for `KeepAll`, and the smaller of
    /// `depth` and the hard cap for `KeepLast`. A zero depth is rejected.
    fn effective_max_samples(&self) -> Result<usize, CacheError> {
        let cap = match self.kind {
            HistoryKind::KeepAll => self.max_samples,
            HistoryKind::KeepLast { depth: 0 } => return Err(CacheError::ZeroDepth),
            HistoryKind::KeepLast { depth } => core::cmp::min(depth, self.max_samples),
        };
        Ok(cap)
    }
}
577
#[cfg(feature = "std")]
impl LockFreeReadHistoryCache {
    /// Creates a cache with the given retention `kind` and hard cap.
    #[must_use]
    pub fn new_with_kind(kind: HistoryKind, max_samples: usize) -> Self {
        Self {
            inner: zerodds_foundation::rcu::RcuCell::new(LockFreeInner {
                changes: BTreeMap::new(),
                kind,
                max_samples,
                evicted_count: 0,
            }),
            stats: Arc::new(HistoryCacheStats::default()),
        }
    }

    /// Creates a `KeepAll` cache capped at `max_samples`.
    #[must_use]
    pub fn new(max_samples: usize) -> Self {
        Self::new_with_kind(HistoryKind::KeepAll, max_samples)
    }

    /// Shared handle to the live statistics block.
    #[must_use]
    pub fn stats(&self) -> Arc<HistoryCacheStats> {
        Arc::clone(&self.stats)
    }

    /// Immutable snapshot of the full cache state; it remains valid (and
    /// unchanged) even while writers publish newer versions.
    #[must_use]
    pub fn snapshot(&self) -> Arc<LockFreeInner> {
        self.inner.read()
    }

    /// Retention policy of this cache.
    #[must_use]
    pub fn kind(&self) -> HistoryKind {
        self.inner.read().kind
    }

    /// Total `KeepLast` evictions so far (read from the stats mirror,
    /// no RCU snapshot taken).
    #[must_use]
    pub fn evicted_count(&self) -> u64 {
        self.stats.evicted.load(Ordering::Acquire)
    }

    /// Number of stored changes (read from the stats mirror).
    #[must_use]
    pub fn len(&self) -> usize {
        self.stats.len.load(Ordering::Acquire)
    }

    /// `true` when no changes are stored.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Lowest stored sequence number, if any (stats mirror).
    #[must_use]
    pub fn min_sn(&self) -> Option<SequenceNumber> {
        decode_sn_atom(self.stats.min_sn.load(Ordering::Acquire))
    }

    /// Highest stored sequence number, if any (stats mirror).
    #[must_use]
    pub fn max_sn(&self) -> Option<SequenceNumber> {
        decode_sn_atom(self.stats.max_sn.load(Ordering::Acquire))
    }

    /// Hard capacity limit.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.inner.read().max_samples
    }

    /// Clone of the change stored under `sn`, if any.
    #[must_use]
    pub fn get(&self, sn: SequenceNumber) -> Option<CacheChange> {
        self.inner.read().changes.get(&sn).cloned()
    }

    /// Owned, ordered copies of all changes with SNs in `lo..=hi`, taken
    /// from one consistent snapshot.
    #[must_use]
    pub fn iter_range_snapshot(&self, lo: SequenceNumber, hi: SequenceNumber) -> Vec<CacheChange> {
        let snap = self.inner.read();
        snap.changes
            .range(lo..=hi)
            .map(|(_, v)| v.clone())
            .collect()
    }

    /// Inserts `change`, evicting the oldest entry first when a `KeepLast`
    /// cache is at capacity.
    ///
    /// # Errors
    /// - [`CacheError::DuplicateSequenceNumber`] if the SN is already stored.
    /// - [`CacheError::CapacityExceeded`] if a `KeepAll` cache is full.
    /// - [`CacheError::ZeroDepth`] for `KeepLast { depth: 0 }`.
    ///
    /// NOTE(review): the duplicate/capacity pre-check reads one RCU snapshot
    /// and the mutation happens in a later `modify`; with multiple concurrent
    /// writers, a duplicate SN could slip between the two and silently
    /// overwrite. This appears to assume a single writer — confirm callers.
    pub fn insert(&self, change: CacheChange) -> Result<(), CacheError> {
        // Validate against the current snapshot before paying for `modify`.
        {
            let snap = self.inner.read();
            if snap.changes.contains_key(&change.sequence_number) {
                return Err(CacheError::DuplicateSequenceNumber);
            }
            // Always evaluated so `ZeroDepth` is reported even below capacity.
            let cap = snap.effective_max_samples()?;
            if snap.changes.len() >= cap && matches!(snap.kind, HistoryKind::KeepAll) {
                return Err(CacheError::CapacityExceeded);
            }
        }
        self.inner.modify(|inner| {
            // Validated above; `kind` cannot change, so a failure here is
            // unreachable in practice — bail without inserting rather than panic.
            let cap = match inner.effective_max_samples() {
                Ok(c) => c,
                Err(_) => return,
            };
            if inner.changes.len() >= cap {
                if let HistoryKind::KeepLast { .. } = inner.kind {
                    // Make room by dropping the oldest change.
                    if let Some((&oldest, _)) = inner.changes.iter().next() {
                        inner.changes.remove(&oldest);
                        inner.evicted_count = inner.evicted_count.saturating_add(1);
                    }
                }
            }
            // `clone` because the closure may run more than once on RCU retry.
            inner.changes.insert(change.sequence_number, change.clone());
        });
        self.refresh_stats();
        Ok(())
    }

    /// Removes every change with sequence number `<= sn` (inclusive) and
    /// returns how many were removed. Removals do not count as evictions.
    pub fn remove_up_to(&self, sn: SequenceNumber) -> usize {
        let mut removed = 0;
        self.inner.modify(|inner| {
            // `split_off` keeps entries >= the split key, so split at `sn + 1`.
            // When `sn.0 == i64::MAX` there is no successor (the old `sn.0 + 1`
            // overflowed here): everything is <= sn, so drop the whole map.
            if let Some(next) = sn.0.checked_add(1) {
                let keep = inner.changes.split_off(&SequenceNumber(next));
                removed = inner.changes.len();
                inner.changes = keep;
            } else {
                removed = inner.changes.len();
                inner.changes.clear();
            }
        });
        self.refresh_stats();
        removed
    }

    /// Publishes the latest snapshot's state into the stats atomics.
    /// Each field is a separate `Release` store, so readers may observe a
    /// partially refreshed set.
    fn refresh_stats(&self) {
        let snap = self.inner.read();
        self.stats.len.store(snap.changes.len(), Ordering::Release);
        self.stats
            .evicted
            .store(snap.evicted_count, Ordering::Release);
        let max = snap.changes.keys().next_back().copied();
        let min = snap.changes.keys().next().copied();
        self.stats
            .max_sn
            .store(encode_sn_atom(max), Ordering::Release);
        self.stats
            .min_sn
            .store(encode_sn_atom(min), Ordering::Release);
    }
}
747
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Shorthand constructor for a `SequenceNumber`.
    fn sn(n: i64) -> SequenceNumber {
        SequenceNumber(n)
    }

    /// ALIVE change whose one-byte payload mirrors the (truncated) SN.
    fn alive(n: i64) -> CacheChange {
        CacheChange::alive(sn(n), alloc::vec![n as u8])
    }

    #[test]
    fn new_cache_is_empty() {
        let c = HistoryCache::new(10);
        assert_eq!(c.len(), 0);
        assert!(c.is_empty());
        assert_eq!(c.min_sn(), None);
        assert_eq!(c.max_sn(), None);
    }

    #[test]
    fn insert_and_get() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(1)).expect("insert");
        c.insert(alive(2)).expect("insert");
        assert_eq!(
            c.get(sn(1)).map(|ch| ch.payload.as_ref().to_vec()),
            Some(alloc::vec![1])
        );
        assert_eq!(c.get(sn(3)), None);
        assert_eq!(c.len(), 2);
    }

    #[test]
    fn insert_duplicate_is_err() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(1)).expect("insert");
        assert_eq!(c.insert(alive(1)), Err(CacheError::DuplicateSequenceNumber));
    }

    #[test]
    fn insert_at_capacity_is_err() {
        let mut c = HistoryCache::new(2);
        c.insert(alive(1)).expect("insert");
        c.insert(alive(2)).expect("insert");
        assert_eq!(c.insert(alive(3)), Err(CacheError::CapacityExceeded));
    }

    #[test]
    fn min_max_sn_reflect_content() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(5)).unwrap();
        c.insert(alive(3)).unwrap();
        c.insert(alive(7)).unwrap();
        assert_eq!(c.min_sn(), Some(sn(3)));
        assert_eq!(c.max_sn(), Some(sn(7)));
    }

    #[test]
    fn remove_up_to_inclusive() {
        let mut c = HistoryCache::new(10);
        for i in 1..=5 {
            c.insert(alive(i)).unwrap();
        }
        let removed = c.remove_up_to(sn(3));
        assert_eq!(removed, 3);
        assert_eq!(c.len(), 2);
        assert_eq!(c.min_sn(), Some(sn(4)));
    }

    #[test]
    fn remove_up_to_with_no_matches_is_noop() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(10)).unwrap();
        assert_eq!(c.remove_up_to(sn(5)), 0);
        assert_eq!(c.len(), 1);
    }

    #[test]
    fn iter_range_is_ordered() {
        let mut c = HistoryCache::new(10);
        for i in [5, 1, 3, 8, 2] {
            c.insert(alive(i)).unwrap();
        }
        let collected: alloc::vec::Vec<i64> = c
            .iter_range(sn(2), sn(5))
            .map(|ch| ch.sequence_number.0)
            .collect();
        assert_eq!(collected, alloc::vec![2, 3, 5]);
    }

    #[test]
    fn iter_range_empty_when_no_overlap() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(1)).unwrap();
        c.insert(alive(2)).unwrap();
        assert_eq!(c.iter_range(sn(10), sn(20)).count(), 0);
    }

    #[test]
    fn capacity_accessor() {
        let c = HistoryCache::new(42);
        assert_eq!(c.capacity(), 42);
    }

    #[test]
    fn cache_change_alive_constructor() {
        let ch = CacheChange::alive(sn(1), alloc::vec![1, 2, 3]);
        assert_eq!(ch.kind, ChangeKind::Alive);
        assert_eq!(ch.sequence_number, sn(1));
        assert_eq!(ch.payload.as_ref(), &[1, 2, 3][..]);
    }

    #[test]
    fn change_kind_alive_is_relevant_and_alive() {
        assert!(ChangeKind::Alive.is_relevant());
        assert!(ChangeKind::Alive.is_alive_kind());
    }

    #[test]
    fn change_kind_alive_filtered_is_alive_but_not_relevant() {
        assert!(ChangeKind::AliveFiltered.is_alive_kind());
        assert!(!ChangeKind::AliveFiltered.is_relevant());
    }

    #[test]
    fn change_kind_not_alive_kinds_are_not_alive() {
        for k in [
            ChangeKind::NotAliveDisposed,
            ChangeKind::NotAliveUnregistered,
            ChangeKind::NotAliveDisposedUnregistered,
        ] {
            assert!(!k.is_alive_kind(), "{k:?}");
            assert!(k.is_relevant(), "{k:?}");
        }
    }

    #[test]
    fn change_kind_distinct_variants() {
        let v = [
            ChangeKind::Alive,
            ChangeKind::AliveFiltered,
            ChangeKind::NotAliveDisposed,
            ChangeKind::NotAliveUnregistered,
            ChangeKind::NotAliveDisposedUnregistered,
        ];
        for (i, a) in v.iter().enumerate() {
            for (j, b) in v.iter().enumerate() {
                if i == j {
                    assert_eq!(a, b);
                } else {
                    assert_ne!(a, b);
                }
            }
        }
    }

    #[test]
    fn stats_default_is_empty_with_no_sn() {
        let c = HistoryCache::new(10);
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 0);
        assert_eq!(snap.evicted, 0);
        assert_eq!(snap.max_sn, None);
        assert_eq!(snap.min_sn, None);
    }

    #[test]
    fn stats_track_insert_and_remove() {
        let mut c = HistoryCache::new(10);
        c.insert(alive(3)).unwrap();
        c.insert(alive(5)).unwrap();
        c.insert(alive(7)).unwrap();
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 3);
        assert_eq!(snap.min_sn, Some(sn(3)));
        assert_eq!(snap.max_sn, Some(sn(7)));
        assert_eq!(snap.evicted, 0);

        c.remove_up_to(sn(5));
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 1);
        assert_eq!(snap.min_sn, Some(sn(7)));
        assert_eq!(snap.max_sn, Some(sn(7)));
    }

    #[test]
    fn stats_track_keeplast_eviction() {
        let mut c = HistoryCache::new_with_kind(HistoryKind::KeepLast { depth: 2 }, 100);
        c.insert(alive(1)).unwrap();
        c.insert(alive(2)).unwrap();
        c.insert(alive(3)).unwrap();
        let snap = c.stats().snapshot();
        assert_eq!(snap.len, 2);
        assert_eq!(snap.evicted, 1);
        assert_eq!(snap.min_sn, Some(sn(2)));
        assert_eq!(snap.max_sn, Some(sn(3)));
    }

    #[test]
    fn stats_arc_is_shared_across_clones_of_handle() {
        let mut c = HistoryCache::new(10);
        let s1 = c.stats();
        let s2 = c.stats();
        assert!(Arc::ptr_eq(&s1, &s2));
        c.insert(alive(1)).unwrap();
        assert_eq!(s1.snapshot().len, 1);
        assert_eq!(s2.snapshot().len, 1);
    }

    /// A stats handle must stay readable while a writer thread mutates the
    /// cache behind a mutex; the snapshot may lag but never exceeds bounds.
    #[test]
    fn stats_reader_thread_sees_inserts_concurrently() {
        use std::sync::Arc as StdArc;
        use std::sync::Mutex as StdMutex;
        use std::thread;
        use std::time::Duration;

        let cache = StdArc::new(StdMutex::new(HistoryCache::new(2_000)));
        let stats = cache.lock().expect("init lock").stats();

        let writer_cache = StdArc::clone(&cache);
        let writer = thread::spawn(move || {
            for i in 1..=1_000 {
                let mut c = writer_cache.lock().expect("write lock");
                c.insert(alive(i)).expect("insert");
            }
        });

        let reader_stats = StdArc::clone(&stats);
        let reader = thread::spawn(move || {
            for _ in 0..100 {
                let snap = reader_stats.snapshot();
                assert!(snap.len <= 1_000);
                if let Some(max) = snap.max_sn {
                    assert!(max.0 >= 1 && max.0 <= 1_000);
                }
                thread::sleep(Duration::from_micros(50));
            }
        });

        writer.join().expect("writer joined");
        reader.join().expect("reader joined");

        let final_snap = stats.snapshot();
        assert_eq!(final_snap.len, 1_000);
        assert_eq!(final_snap.max_sn, Some(sn(1_000)));
        assert_eq!(final_snap.min_sn, Some(sn(1)));
    }

    #[test]
    fn clone_creates_independent_stats_handles() {
        let mut a = HistoryCache::new(10);
        a.insert(alive(1)).unwrap();
        let b = a.clone();
        assert!(!Arc::ptr_eq(&a.stats(), &b.stats()));
        assert_eq!(a.stats().snapshot().len, 1);
        assert_eq!(b.stats().snapshot().len, 1);

        let mut a_mut = a;
        a_mut.insert(alive(2)).unwrap();
        assert_eq!(a_mut.stats().snapshot().len, 2);
        assert_eq!(b.stats().snapshot().len, 1, "clone unaffected");
    }

    #[cfg(feature = "std")]
    mod lock_free_tests {
        use super::*;

        #[test]
        fn lock_free_new_is_empty() {
            let c = LockFreeReadHistoryCache::new(10);
            assert_eq!(c.len(), 0);
            assert!(c.is_empty());
            assert_eq!(c.min_sn(), None);
            assert_eq!(c.max_sn(), None);
        }

        #[test]
        fn lock_free_insert_and_get() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(1)).unwrap();
            c.insert(alive(2)).unwrap();
            assert_eq!(
                c.get(sn(1)).map(|ch| ch.payload.as_ref().to_vec()),
                Some(alloc::vec![1])
            );
            assert_eq!(c.get(sn(3)), None);
            assert_eq!(c.len(), 2);
        }

        #[test]
        fn lock_free_min_max_lock_free_loads() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(5)).unwrap();
            c.insert(alive(3)).unwrap();
            c.insert(alive(7)).unwrap();
            assert_eq!(c.min_sn(), Some(sn(3)));
            assert_eq!(c.max_sn(), Some(sn(7)));
        }

        #[test]
        fn lock_free_keeplast_evicts_oldest() {
            let c =
                LockFreeReadHistoryCache::new_with_kind(HistoryKind::KeepLast { depth: 2 }, 100);
            c.insert(alive(1)).unwrap();
            c.insert(alive(2)).unwrap();
            c.insert(alive(3)).unwrap();
            assert_eq!(c.len(), 2);
            assert_eq!(c.min_sn(), Some(sn(2)));
            assert_eq!(c.max_sn(), Some(sn(3)));
            assert_eq!(c.evicted_count(), 1);
        }

        #[test]
        fn lock_free_keepall_full_rejects() {
            let c = LockFreeReadHistoryCache::new(2);
            c.insert(alive(1)).unwrap();
            c.insert(alive(2)).unwrap();
            assert_eq!(c.insert(alive(3)), Err(CacheError::CapacityExceeded));
        }

        #[test]
        fn lock_free_duplicate_sn_rejected() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(1)).unwrap();
            assert_eq!(c.insert(alive(1)), Err(CacheError::DuplicateSequenceNumber));
        }

        #[test]
        fn lock_free_remove_up_to() {
            let c = LockFreeReadHistoryCache::new(10);
            for i in 1..=5 {
                c.insert(alive(i)).unwrap();
            }
            let removed = c.remove_up_to(sn(3));
            assert_eq!(removed, 3);
            assert_eq!(c.len(), 2);
            assert_eq!(c.min_sn(), Some(sn(4)));
        }

        #[test]
        fn lock_free_iter_range_snapshot() {
            let c = LockFreeReadHistoryCache::new(10);
            for i in 1..=5 {
                c.insert(alive(i)).unwrap();
            }
            let mid: alloc::vec::Vec<_> = c
                .iter_range_snapshot(sn(2), sn(4))
                .iter()
                .map(|ch| ch.sequence_number)
                .collect();
            assert_eq!(mid, alloc::vec![sn(2), sn(3), sn(4)]);
        }

        /// RCU guarantee: a snapshot taken before further inserts must keep
        /// showing the old state.
        #[test]
        fn lock_free_snapshot_outlives_writes() {
            let c = LockFreeReadHistoryCache::new(10);
            c.insert(alive(1)).unwrap();
            let snap = c.snapshot();
            assert_eq!(snap.changes.len(), 1);

            c.insert(alive(2)).unwrap();
            c.insert(alive(3)).unwrap();
            assert_eq!(snap.changes.len(), 1);
            assert!(snap.changes.contains_key(&sn(1)));
            assert_eq!(c.len(), 3);
        }

        /// Smoke test: concurrent snapshot readers never see a state with
        /// more entries than its own SN span allows.
        #[test]
        fn lock_free_concurrent_readers_writers_smoke() {
            use std::sync::Arc as StdArc;
            use std::thread;

            let cache: StdArc<LockFreeReadHistoryCache> =
                StdArc::new(LockFreeReadHistoryCache::new(2_000));
            let cache_w = StdArc::clone(&cache);
            let writer = thread::spawn(move || {
                for i in 1..=500 {
                    cache_w.insert(alive(i)).expect("insert");
                }
            });

            let cache_r = StdArc::clone(&cache);
            let reader = thread::spawn(move || {
                for _ in 0..200 {
                    let snap = cache_r.snapshot();
                    if let (Some(min), Some(max)) = (
                        snap.changes.keys().next().copied(),
                        snap.changes.keys().next_back().copied(),
                    ) {
                        let inferred_count = (max.0 - min.0 + 1) as usize;
                        assert!(
                            snap.changes.len() <= inferred_count,
                            "snapshot inconsistent"
                        );
                    }
                }
            });

            writer.join().expect("writer joined");
            reader.join().expect("reader joined");

            assert_eq!(cache.len(), 500);
            assert_eq!(cache.max_sn(), Some(sn(500)));
        }
    }
}