Skip to main content

zerodds_rtps/
history_cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `HistoryCache` — geordnete Sample-Ablage fuer Reliable Writer/Reader.
4//!
5//! DDSI-RTPS 2.5 §8.4.8. Beide Seiten (Writer + Reader) halten je eine
6//! eigene Cache-Instanz:
7//!
8//! - **Writer-Cache**: per `write()` abgelegte `CacheChange`s, aus denen
9//!   auf AckNack-Request hin re-gesendet wird. Entfernt Samples erst,
10//!   wenn **alle** matched Reader sie via AckNack bestaetigt haben.
11//! - **Reader-Cache**: empfangene `CacheChange`s in SN-Reihenfolge, fuer
12//!   in-order Delivery an die Applikations-Schicht. Kann via
13//!   `remove_up_to` nach Delivery geleert werden.
14//!
15//! **History-QoS (WP 1.4 T3-Follow-up):** der Cache wird ueber
16//! [`HistoryKind`] konfiguriert — `KeepAll` (hart-begrenzt, Error bei
17//! Overflow) vs. `KeepLast(depth)` (Ring-Buffer, aeltestes Sample faellt
18//! bei Overflow heraus). KeepLast ist Spec-gerecht (§8.7.4) und
19//! entkoppelt Writer-Cache-GC von Reader-ACKNACK-Progress — ein
20//! stalled Reader verhindert damit nicht mehr, dass andere
21//! Reader weitere Samples bekommen ("per-destination queue"-Modell).
22
23extern crate alloc;
24use alloc::collections::BTreeMap;
25use alloc::sync::Arc;
26use alloc::vec::Vec;
27use core::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
28
29use crate::wire_types::SequenceNumber;
30
31#[cfg(feature = "inspect")]
32use alloc::borrow::ToOwned;
33
34#[cfg(feature = "inspect")]
35fn dispatch_rtps_tap(label: &str, sn: SequenceNumber, payload: Vec<u8>) {
36    let ts_ns = std::time::SystemTime::now()
37        .duration_since(std::time::UNIX_EPOCH)
38        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
39        .unwrap_or(0);
40    #[allow(clippy::cast_sign_loss)]
41    let corr = sn.0 as u64;
42    let frame = zerodds_inspect_endpoint::Frame::rtps(label.to_owned(), ts_ns, corr, payload);
43    zerodds_inspect_endpoint::tap::dispatch(&frame);
44}
45
46/// Art eines Cache-Eintrags (DDSI-RTPS §8.2.1.2 / §8.7.2.2.2).
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum ChangeKind {
49    /// Gueltiges Sample, von DDS-DataReader-Filter akzeptiert.
50    Alive,
51    /// Gueltiges Sample, vom Reader-side Filter (TIME_BASED_FILTER /
52    /// ContentFilteredTopic) verworfen — bleibt aber im NACK-Pfad
53    /// "available", damit Reliable-Writer es nicht erneut sendet
54    /// (filteredCount-Zaehler in `ChangeFromWriter`).
55    AliveFiltered,
56    /// `dispose`-Marker.
57    NotAliveDisposed,
58    /// `unregister`-Marker.
59    NotAliveUnregistered,
60    /// Kombinierter dispose+unregister.
61    NotAliveDisposedUnregistered,
62}
63
64impl ChangeKind {
65    /// Spec §8.4.10.5 — `is_relevant` true fuer alle Live-Kinds; nur
66    /// `AliveFiltered` ist explizit *nicht* relevant fuer den DDS-
67    /// User-API-Path (zaehlt aber im NACK-Pfad als "received").
68    #[must_use]
69    pub fn is_relevant(self) -> bool {
70        !matches!(self, Self::AliveFiltered)
71    }
72
73    /// Spec §8.4.10.5 — `is_alive_kind` umfasst Alive + AliveFiltered.
74    #[must_use]
75    pub fn is_alive_kind(self) -> bool {
76        matches!(self, Self::Alive | Self::AliveFiltered)
77    }
78}
79
80/// Einzelner Cache-Eintrag.
81///
82/// `payload` wird als `Arc<[u8]>` gehalten — Cache, Writer-Build-
83/// Datagram-Pfad und Reader-Delivery teilen sich eine einzige
84/// Allocation. Das spart im Reliable-Writer-Tick den n-fachen
85/// `Vec::clone()` pro Reader-Proxy (Perf-Audit F7/F8/F10, ~30-50 %
86/// Throughput-Gewinn bei grossen Payloads).
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct CacheChange {
89    /// Sequence-Number (writer-lokal eindeutig).
90    pub sequence_number: SequenceNumber,
91    /// Nutzlast (serialisierter Sample), referenzgezaehlt.
92    pub payload: Arc<[u8]>,
93    /// Art des Events.
94    pub kind: ChangeKind,
95    /// Optionaler `PID_KEY_HASH` aus dem Inline-QoS (Spec §9.6.4.8).
96    /// Reader-Side: bei keyed Topics + ALIVE-Samples mit Inline-Hash
97    /// oder bei Lifecycle-Markern (Disposed/Unregistered) gefuellt
98    /// vom Reader-Pfad. Writer-Side: gesetzt vom Writer wenn der Pfad
99    /// den Hash entlang der Sample-Pipeline propagiert.
100    pub key_hash: Option<[u8; 16]>,
101}
102
103impl CacheChange {
104    /// Erstellt ein Alive-Change. Nimmt `Vec<u8>` entgegen und
105    /// konvertiert einmalig in `Arc<[u8]>`. Fuer Call-Sites, die die
106    /// Allocation bereits als Arc haben, gibt es [`Self::alive_arc`].
107    #[must_use]
108    pub fn alive(sn: SequenceNumber, payload: Vec<u8>) -> Self {
109        Self::alive_arc(sn, Arc::from(payload))
110    }
111
112    /// Erstellt ein Alive-Change aus einer bereits existierenden
113    /// `Arc<[u8]>`-Payload. Zero-Copy-Pfad fuer Caller, die die
114    /// Allocation teilen (Writer ↔ Cache ↔ Datagram).
115    ///
116    /// **Crate-intern:** externe Nutzer sollen via [`Self::alive`] mit
117    /// `Vec<u8>` eintreten — der Arc-Pfad ist reine Writer/Reader-
118    /// interne Optimierung und soll nicht versehentlich Teil der
119    /// ffentlichen API werden.
120    #[must_use]
121    pub(crate) fn alive_arc(sn: SequenceNumber, payload: Arc<[u8]>) -> Self {
122        Self {
123            sequence_number: sn,
124            payload,
125            kind: ChangeKind::Alive,
126            key_hash: None,
127        }
128    }
129
130    /// Erstellt einen Lifecycle-Marker (Spec §8.2.1.2). `payload` ist die
131    /// Key-Only-Serialisierung der disposed/unregistered Instanz —
132    /// genau das, was als `PID_KEY_HASH` in der Inline-QoS landet.
133    #[must_use]
134    pub fn lifecycle(sn: SequenceNumber, payload: Vec<u8>, kind: ChangeKind) -> Self {
135        Self {
136            sequence_number: sn,
137            payload: Arc::from(payload),
138            kind,
139            key_hash: None,
140        }
141    }
142}
143
144/// History-QoS (DDSI-RTPS §8.7.4, Spec Table 17).
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub enum HistoryKind {
147    /// Cache ist hart begrenzt; bei Overflow liefert `insert` einen
148    /// `CapacityExceeded`-Fehler. Nuetzlich fuer No-Loss-Szenarien, in
149    /// denen der Writer eher blockieren als Daten verwerfen will
150    /// (Dateitransfer, Logging).
151    KeepAll,
152    /// Cache haelt maximal `depth` neueste Samples. Bei Overflow faellt
153    /// automatisch das **aelteste** Sample raus — Writer-`insert`
154    /// schlaegt nie wegen Kapazitaet fehl. Spec-Default fuer DDS.
155    KeepLast {
156        /// Maximalzahl Samples im Cache.
157        depth: usize,
158    },
159}
160
161/// Fehler-Varianten fuer Cache-Operationen.
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163#[non_exhaustive]
164pub enum CacheError {
165    /// `KeepAll`-Cache hat seine Kapazitaet erreicht.
166    CapacityExceeded,
167    /// Dieselbe SN wurde bereits eingefuegt.
168    DuplicateSequenceNumber,
169    /// `KeepLast` mit `depth == 0` — jedes Insert waere sofortiges
170    /// Drop. Der Fall wird bei `insert` abgelehnt statt silent akzeptiert.
171    ZeroDepth,
172}
173
174/// Sentinel-Wert fuer "kein Eintrag" in den `AtomicI64`-Stats. Wird
175/// gewaehlt damit jeder gueltige `SequenceNumber.0` (>= 1 per Spec)
176/// als positiver Wert eindeutig unterscheidbar ist.
177const STATS_SENTINEL_NO_SN: i64 = i64::MIN;
178
179/// Atomar-aktualisierte Snapshot-Statistik eines [`HistoryCache`].
180///
181/// **** das eigentliche `BTreeMap`-Storage des Caches
182/// braucht weiter `&mut self` zum Mutieren (lese-/schreib-konkurrente
183/// `BTreeMap`-Mutation gibt's in `std` nicht), aber **Statistik-Werte**
184/// (Laenge, max/min SN, Eviction-Counter) werden parallel in einem
185/// `Arc<HistoryCacheStats>` mitgefuehrt. Monitoring-Threads, SEDP-Tick-
186/// Loops und Telemetrie koennen so zustimmungsfrei pollen, ohne den
187/// Writer/Reader-Lock zu nehmen.
188///
189/// Konsistenz-Garantie: jede mutierende Methode des Caches updated die
190/// Atomics **nach** der `BTreeMap`-Mutation, mit `Release`-Ordering.
191/// Reader nutzen `Acquire`-Ordering — sie sehen einen konsistenten
192/// Stand der **letzten** abgeschlossenen Cache-Operation, nie einen
193/// halb-aktualisierten Zustand der einzelnen Atomics.
194///
195/// Was *nicht* garantiert ist: cross-field-Konsistenz. Wenn ein
196/// Reader `len` und dann `max_sn` liest, koennen zwischen den
197/// Loads weitere Inserts passiert sein. Das ist akzeptabel fuer
198/// Monitoring; fuer harte Wire-Pfade (Heartbeat-Build) wird weiter
199/// der Writer-Lock genommen.
200#[derive(Debug)]
201pub struct HistoryCacheStats {
202    /// Anzahl Changes im Cache (entspricht `BTreeMap::len`).
203    pub len: AtomicUsize,
204    /// Anzahl per `KeepLast`-Eviction verworfener Samples seit Start.
205    pub evicted: AtomicU64,
206    /// Hoechste SN im Cache, oder [`STATS_SENTINEL_NO_SN`] wenn leer.
207    pub max_sn: AtomicI64,
208    /// Niedrigste SN im Cache, oder [`STATS_SENTINEL_NO_SN`] wenn leer.
209    pub min_sn: AtomicI64,
210}
211
212impl Default for HistoryCacheStats {
213    fn default() -> Self {
214        Self {
215            len: AtomicUsize::new(0),
216            evicted: AtomicU64::new(0),
217            max_sn: AtomicI64::new(STATS_SENTINEL_NO_SN),
218            min_sn: AtomicI64::new(STATS_SENTINEL_NO_SN),
219        }
220    }
221}
222
223impl HistoryCacheStats {
224    /// Snapshot der vier Atomics als Plain-Old-Data-Struct. Wird mit
225    /// `Acquire`-Ordering geladen — synchronisiert mit der
226    /// `Release`-Speicheroperation in [`HistoryCache::insert`] /
227    /// [`HistoryCache::remove_up_to`].
228    #[must_use]
229    pub fn snapshot(&self) -> HistoryCacheSnapshot {
230        HistoryCacheSnapshot {
231            len: self.len.load(Ordering::Acquire),
232            evicted: self.evicted.load(Ordering::Acquire),
233            max_sn: decode_sn_atom(self.max_sn.load(Ordering::Acquire)),
234            min_sn: decode_sn_atom(self.min_sn.load(Ordering::Acquire)),
235        }
236    }
237}
238
239impl Clone for HistoryCacheStats {
240    fn clone(&self) -> Self {
241        Self {
242            len: AtomicUsize::new(self.len.load(Ordering::Acquire)),
243            evicted: AtomicU64::new(self.evicted.load(Ordering::Acquire)),
244            max_sn: AtomicI64::new(self.max_sn.load(Ordering::Acquire)),
245            min_sn: AtomicI64::new(self.min_sn.load(Ordering::Acquire)),
246        }
247    }
248}
249
250fn decode_sn_atom(v: i64) -> Option<SequenceNumber> {
251    if v == STATS_SENTINEL_NO_SN {
252        None
253    } else {
254        Some(SequenceNumber(v))
255    }
256}
257
258fn encode_sn_atom(sn: Option<SequenceNumber>) -> i64 {
259    sn.map_or(STATS_SENTINEL_NO_SN, |s| s.0)
260}
261
262/// Plain-Old-Data-Snapshot der `HistoryCache`-Statistiken zu einem
263/// einzelnen Zeitpunkt. Wird von [`HistoryCacheStats::snapshot`]
264/// erzeugt; jede Komponente ist mit `Acquire`-Ordering geladen.
265#[derive(Debug, Clone, Copy, PartialEq, Eq)]
266pub struct HistoryCacheSnapshot {
267    /// Anzahl Changes.
268    pub len: usize,
269    /// Anzahl per `KeepLast`-Eviction verworfener Samples.
270    pub evicted: u64,
271    /// Hoechste SN, falls vorhanden.
272    pub max_sn: Option<SequenceNumber>,
273    /// Niedrigste SN, falls vorhanden.
274    pub min_sn: Option<SequenceNumber>,
275}
276
277/// Geordnete Sample-Ablage.
278///
279/// Interner Storage: `BTreeMap` fuer O(log n) Insert/Lookup und effizienten
280/// Range-Iterator.
281///
282/// **** die schreibenden Methoden brauchen weiter
283/// `&mut self` (BTreeMap ist nicht concurrent-safe), aber Stats sind
284/// in einem `Arc<HistoryCacheStats>` parallel verfuegbar — siehe
285/// [`stats`](Self::stats).
286#[derive(Debug)]
287pub struct HistoryCache {
288    changes: BTreeMap<SequenceNumber, CacheChange>,
289    kind: HistoryKind,
290    max_samples: usize,
291    evicted_count: u64,
292    stats: Arc<HistoryCacheStats>,
293    /// Optionaler Label fuer Inspect-Endpoint-Tap (R-026). Nur mit
294    /// Cargo-Feature `inspect` gesetzt; sonst phantom.
295    #[cfg(feature = "inspect")]
296    inspect_label: Option<alloc::string::String>,
297}
298
299impl Clone for HistoryCache {
300    fn clone(&self) -> Self {
301        // Jeder Klon bekommt ein **eigenes** `Arc<HistoryCacheStats>`,
302        // damit Mutationen am Klon nicht die Stats des Originals
303        // verfaelschen. Der initiale Wert wird aus dem Original
304        // uebernommen, sodass ein Cache.clone() einen aequivalenten
305        // Snapshot zeigt.
306        Self {
307            changes: self.changes.clone(),
308            kind: self.kind,
309            max_samples: self.max_samples,
310            evicted_count: self.evicted_count,
311            stats: Arc::new((*self.stats).clone()),
312            #[cfg(feature = "inspect")]
313            inspect_label: self.inspect_label.clone(),
314        }
315    }
316}
317
318impl HistoryCache {
319    /// Erzeugt einen neuen Cache. `max_samples` ist die obere Grenze:
320    /// bei `KeepAll` fuehrt Ueberschreitung zu `CapacityExceeded`, bei
321    /// `KeepLast` zu LRU-Eviction des aeltesten Samples.
322    #[must_use]
323    pub fn new_with_kind(kind: HistoryKind, max_samples: usize) -> Self {
324        Self {
325            changes: BTreeMap::new(),
326            kind,
327            max_samples,
328            evicted_count: 0,
329            stats: Arc::new(HistoryCacheStats::default()),
330            #[cfg(feature = "inspect")]
331            inspect_label: None,
332        }
333    }
334
335    /// Setzt das Inspect-Endpoint-Label fuer den RTPS-Layer-Tap.
336    /// No-op wenn Feature `inspect` aus ist.
337    #[cfg(feature = "inspect")]
338    pub fn set_inspect_label(&mut self, label: alloc::string::String) {
339        self.inspect_label = Some(label);
340    }
341
342    /// Geteilter Stats-Handle fuer **lock-freies** Monitoring.
343    ///
344    /// Konsumenten halten einen `Arc<HistoryCacheStats>` und pollen die
345    /// Atomics mit `Acquire`-Ordering. Werte spiegeln immer eine
346    /// abgeschlossene Cache-Mutation; cross-field-Konsistenz zwischen
347    /// `len` und `max_sn` ist nicht garantiert (Tear-Risiko).
348    #[must_use]
349    pub fn stats(&self) -> Arc<HistoryCacheStats> {
350        Arc::clone(&self.stats)
351    }
352
353    /// Synchronisiert die Atomics mit dem aktuellen `BTreeMap`-Zustand.
354    /// Wird von allen mutierenden Methoden am Ende aufgerufen.
355    fn refresh_stats(&self) {
356        self.stats.len.store(self.changes.len(), Ordering::Release);
357        self.stats
358            .evicted
359            .store(self.evicted_count, Ordering::Release);
360        let max = self.changes.keys().next_back().copied();
361        let min = self.changes.keys().next().copied();
362        self.stats
363            .max_sn
364            .store(encode_sn_atom(max), Ordering::Release);
365        self.stats
366            .min_sn
367            .store(encode_sn_atom(min), Ordering::Release);
368    }
369
370    /// Legacy-Konstruktor — erzeugt einen `KeepAll`-Cache mit harter
371    /// Kapazitaets-Grenze. Fuer neue Nutzer [`new_with_kind`] bevorzugen.
372    #[must_use]
373    pub fn new(max_samples: usize) -> Self {
374        Self::new_with_kind(HistoryKind::KeepAll, max_samples)
375    }
376
377    /// History-Kind dieses Caches.
378    #[must_use]
379    pub fn kind(&self) -> HistoryKind {
380        self.kind
381    }
382
383    /// Anzahl per `KeepLast`-Eviction verworfener Samples seit
384    /// Start.
385    #[must_use]
386    pub fn evicted_count(&self) -> u64 {
387        self.evicted_count
388    }
389
390    /// Fuegt ein Change ein.
391    ///
392    /// # Errors
393    /// - `CapacityExceeded`: nur bei `KeepAll`, Cache voll.
394    /// - `DuplicateSequenceNumber`: SN bereits vorhanden.
395    /// - `ZeroDepth`: `KeepLast { depth: 0 }`.
396    pub fn insert(&mut self, change: CacheChange) -> Result<(), CacheError> {
397        if self.changes.contains_key(&change.sequence_number) {
398            return Err(CacheError::DuplicateSequenceNumber);
399        }
400        let cap = self.effective_max_samples()?;
401        if self.changes.len() >= cap {
402            match self.kind {
403                HistoryKind::KeepAll => return Err(CacheError::CapacityExceeded),
404                HistoryKind::KeepLast { .. } => {
405                    // Aeltestes Sample raus (LRU nach SN-Reihenfolge).
406                    if let Some((&oldest, _)) = self.changes.iter().next() {
407                        self.changes.remove(&oldest);
408                        self.evicted_count = self.evicted_count.saturating_add(1);
409                    }
410                }
411            }
412        }
413        #[cfg(feature = "inspect")]
414        let tap_view = self.inspect_label.as_ref().map(|label| {
415            (
416                label.clone(),
417                change.sequence_number,
418                change.payload.to_vec(),
419            )
420        });
421        self.changes.insert(change.sequence_number, change);
422        self.refresh_stats();
423        #[cfg(feature = "inspect")]
424        if let Some((label, sn, payload)) = tap_view {
425            dispatch_rtps_tap(&label, sn, payload);
426        }
427        Ok(())
428    }
429
430    /// Effective max-samples = min(max_samples, depth).
431    fn effective_max_samples(&self) -> Result<usize, CacheError> {
432        match self.kind {
433            HistoryKind::KeepAll => Ok(self.max_samples),
434            HistoryKind::KeepLast { depth } => {
435                if depth == 0 {
436                    return Err(CacheError::ZeroDepth);
437                }
438                Ok(core::cmp::min(depth, self.max_samples))
439            }
440        }
441    }
442
443    /// Holt ein Change per SN.
444    #[must_use]
445    pub fn get(&self, sn: SequenceNumber) -> Option<&CacheChange> {
446        self.changes.get(&sn)
447    }
448
449    /// Entfernt alle Changes mit SN ≤ `sn`.
450    /// Liefert die Anzahl entfernter Eintraege.
451    pub fn remove_up_to(&mut self, sn: SequenceNumber) -> usize {
452        let keep = self.changes.split_off(&SequenceNumber(sn.0 + 1));
453        let removed = self.changes.len();
454        self.changes = keep;
455        self.refresh_stats();
456        removed
457    }
458
459    /// Iteriert in SN-Reihenfolge ueber Changes im Bereich `[lo, hi]`
460    /// (beide inklusiv).
461    pub fn iter_range(
462        &self,
463        lo: SequenceNumber,
464        hi: SequenceNumber,
465    ) -> impl Iterator<Item = &CacheChange> + '_ {
466        self.changes.range(lo..=hi).map(|(_, v)| v)
467    }
468
469    /// Kleinste SN im Cache.
470    #[must_use]
471    pub fn min_sn(&self) -> Option<SequenceNumber> {
472        self.changes.keys().next().copied()
473    }
474
475    /// Groesste SN im Cache.
476    #[must_use]
477    pub fn max_sn(&self) -> Option<SequenceNumber> {
478        self.changes.keys().next_back().copied()
479    }
480
481    /// Anzahl Changes.
482    #[must_use]
483    pub fn len(&self) -> usize {
484        self.changes.len()
485    }
486
487    /// True wenn keine Changes.
488    #[must_use]
489    pub fn is_empty(&self) -> bool {
490        self.changes.is_empty()
491    }
492
493    /// Maximale Kapazitaet.
494    #[must_use]
495    pub fn capacity(&self) -> usize {
496        self.max_samples
497    }
498}
499
500// ============================================================================
501// LockFreeReadHistoryCache — // ============================================================================
502
503/// `HistoryCache`-Variante mit lock-free Lese-Pfad via RCU/Copy-on-Write.
504///
505/// **** Reader (z.B. SEDP-Tick, Heartbeat-Bau,
506/// Resend-Iteration) sehen einen `Arc`-stabilen Snapshot des
507/// `BTreeMap`-Storage; sie greifen ohne weiteren Lock-Touch darauf zu.
508/// Writer serialisieren ueber einen internen [`RcuCell`]-Mutex und
509/// publizieren copy-on-write.
510///
511/// # Trade-Offs
512///
513/// * **Read-Pfad**: 1× Mutex-Acquire fuer Refcount-Inc + 0 weitere
514///   Locks. Concurrent Reader sehen denselben Arc; Read-Iteration ist
515///   im Wesentlichen Lock-Frei.
516/// * **Write-Pfad**: O(n) pro Insert/Remove, weil der `BTreeMap`-
517///   Inhalt geklont werden muss. Akzeptabel fuer kleine Caches
518///   (<= 1000 Samples). Fuer write-heavy Pfade weiter [`HistoryCache`]
519///   nutzen.
520/// * **Memory**: aktive Snapshots verbrauchen Speicher bis sie freigegeben
521///   werden (Reader-Lebensdauer). Cache-Mutationen invalidieren keine
522///   bestehenden Reader-Snapshots.
523///
524/// # Wann verwenden
525///
526/// * Discovery-Caches, die haeufig von SEDP-Tick + Match-Loops gelesen,
527///   aber nur bei `announce_publication`/`subscription` mutiert werden.
528/// * Monitoring-/Tooling-Pfade, die ohne Lock-Touch ueber den Cache
529///   iterieren wollen.
530///
531/// # Wann NICHT verwenden
532///
533/// * Reliable-Writer-Cache mit hoher Insert-Rate (jede Insert klont
534///   den ganzen BTreeMap). Da bleibt [`HistoryCache`] besser.
535///
536/// Persistente Datenstrukturen (`im::OrdMap`) wuerden den
537/// Write-Cost-Aufwand auf O(log n) druecken — das ist die
538/// natuerliche Folge-Optimierung wenn diese Variante in Production
539/// landet.
540#[cfg(feature = "std")]
541#[derive(Debug)]
542pub struct LockFreeReadHistoryCache {
543    inner: zerodds_foundation::rcu::RcuCell<LockFreeInner>,
544    stats: Arc<HistoryCacheStats>,
545}
546
547/// Innerer Zustand des [`LockFreeReadHistoryCache`]. Wird von
548/// [`LockFreeReadHistoryCache::snapshot`] als `Arc<LockFreeInner>`
549/// nach aussen gegeben — Reader iterieren ueber `changes` direkt.
550#[cfg(feature = "std")]
551#[derive(Debug, Clone)]
552pub struct LockFreeInner {
553    /// Sample-Map keyed by SequenceNumber.
554    pub changes: BTreeMap<SequenceNumber, CacheChange>,
555    /// History-QoS-Kind.
556    pub kind: HistoryKind,
557    /// Cap aus QoS.
558    pub max_samples: usize,
559    /// Eviction-Counter.
560    pub evicted_count: u64,
561}
562
563#[cfg(feature = "std")]
564impl LockFreeInner {
565    fn effective_max_samples(&self) -> Result<usize, CacheError> {
566        match self.kind {
567            HistoryKind::KeepAll => Ok(self.max_samples),
568            HistoryKind::KeepLast { depth } => {
569                if depth == 0 {
570                    return Err(CacheError::ZeroDepth);
571                }
572                Ok(core::cmp::min(depth, self.max_samples))
573            }
574        }
575    }
576}
577
578#[cfg(feature = "std")]
579impl LockFreeReadHistoryCache {
580    /// Erzeugt eine neue Lock-Free-Read-Cache.
581    #[must_use]
582    pub fn new_with_kind(kind: HistoryKind, max_samples: usize) -> Self {
583        Self {
584            inner: zerodds_foundation::rcu::RcuCell::new(LockFreeInner {
585                changes: BTreeMap::new(),
586                kind,
587                max_samples,
588                evicted_count: 0,
589            }),
590            stats: Arc::new(HistoryCacheStats::default()),
591        }
592    }
593
594    /// Legacy-Konstruktor — `KeepAll`.
595    #[must_use]
596    pub fn new(max_samples: usize) -> Self {
597        Self::new_with_kind(HistoryKind::KeepAll, max_samples)
598    }
599
600    /// Lock-free Read-Snapshot von Stats.
601    #[must_use]
602    pub fn stats(&self) -> Arc<HistoryCacheStats> {
603        Arc::clone(&self.stats)
604    }
605
606    /// Liefert einen `Arc`-Snapshot des aktuellen Cache-Stand fuer
607    /// lock-free-Iteration.
608    #[must_use]
609    pub fn snapshot(&self) -> Arc<LockFreeInner> {
610        self.inner.read()
611    }
612
613    /// History-Kind.
614    #[must_use]
615    pub fn kind(&self) -> HistoryKind {
616        self.inner.read().kind
617    }
618
619    /// Anzahl per `KeepLast`-Eviction verworfener Samples.
620    #[must_use]
621    pub fn evicted_count(&self) -> u64 {
622        self.stats.evicted.load(Ordering::Acquire)
623    }
624
625    /// Anzahl Changes (Acquire-Load des Atomic).
626    #[must_use]
627    pub fn len(&self) -> usize {
628        self.stats.len.load(Ordering::Acquire)
629    }
630
631    /// True wenn keine Changes.
632    #[must_use]
633    pub fn is_empty(&self) -> bool {
634        self.len() == 0
635    }
636
637    /// Kleinste SN aus Atom — lock-frei.
638    #[must_use]
639    pub fn min_sn(&self) -> Option<SequenceNumber> {
640        decode_sn_atom(self.stats.min_sn.load(Ordering::Acquire))
641    }
642
643    /// Groesste SN aus Atom — lock-frei.
644    #[must_use]
645    pub fn max_sn(&self) -> Option<SequenceNumber> {
646        decode_sn_atom(self.stats.max_sn.load(Ordering::Acquire))
647    }
648
649    /// Maximale Kapazitaet.
650    #[must_use]
651    pub fn capacity(&self) -> usize {
652        self.inner.read().max_samples
653    }
654
655    /// Holt ein Change per SN — geklont (CacheChange ist Arc-payload-
656    /// gewrapped, also ein Refcount-Inc).
657    #[must_use]
658    pub fn get(&self, sn: SequenceNumber) -> Option<CacheChange> {
659        self.inner.read().changes.get(&sn).cloned()
660    }
661
662    /// Sample-Snapshot im SN-Bereich `[lo, hi]`. Liefert Vec
663    /// — wir koennen keine Iter `<'a>` ueber einen Arc-Snapshot
664    /// zurueckgeben, wenn der Snapshot nicht referenziert wird.
665    #[must_use]
666    pub fn iter_range_snapshot(&self, lo: SequenceNumber, hi: SequenceNumber) -> Vec<CacheChange> {
667        let snap = self.inner.read();
668        snap.changes
669            .range(lo..=hi)
670            .map(|(_, v)| v.clone())
671            .collect()
672    }
673
674    /// Fuegt ein Change ein. Copy-on-Write des BTreeMap.
675    ///
676    /// # Errors
677    /// Wie [`HistoryCache::insert`].
678    pub fn insert(&self, change: CacheChange) -> Result<(), CacheError> {
679        // Pre-Check: BTreeMap-Klon vermeiden, wenn wir schon wissen dass
680        // der Insert fehlschlaegt (Duplicate, ZeroDepth, KeepAll-Full).
681        let dup_or_full: Result<(), CacheError> = {
682            let snap = self.inner.read();
683            if snap.changes.contains_key(&change.sequence_number) {
684                Err(CacheError::DuplicateSequenceNumber)
685            } else {
686                let cap = snap.effective_max_samples()?;
687                if snap.changes.len() >= cap {
688                    if matches!(snap.kind, HistoryKind::KeepAll) {
689                        Err(CacheError::CapacityExceeded)
690                    } else {
691                        Ok(()) // KeepLast: evict im write-with
692                    }
693                } else {
694                    Ok(())
695                }
696            }
697        };
698        dup_or_full?;
699        self.inner.modify(|inner| {
700            // Eviction-Logik: KeepLast laesst aelteste fallen.
701            let cap = match inner.effective_max_samples() {
702                Ok(c) => c,
703                Err(_) => return,
704            };
705            if inner.changes.len() >= cap {
706                if let HistoryKind::KeepLast { .. } = inner.kind {
707                    if let Some((&oldest, _)) = inner.changes.iter().next() {
708                        inner.changes.remove(&oldest);
709                        inner.evicted_count = inner.evicted_count.saturating_add(1);
710                    }
711                }
712            }
713            inner.changes.insert(change.sequence_number, change.clone());
714        });
715        self.refresh_stats();
716        Ok(())
717    }
718
719    /// Entfernt alle Changes mit SN ≤ `sn`. Liefert Anzahl entfernter.
720    pub fn remove_up_to(&self, sn: SequenceNumber) -> usize {
721        let mut removed = 0;
722        self.inner.modify(|inner| {
723            let keep = inner.changes.split_off(&SequenceNumber(sn.0 + 1));
724            removed = inner.changes.len();
725            inner.changes = keep;
726        });
727        self.refresh_stats();
728        removed
729    }
730
731    fn refresh_stats(&self) {
732        let snap = self.inner.read();
733        self.stats.len.store(snap.changes.len(), Ordering::Release);
734        self.stats
735            .evicted
736            .store(snap.evicted_count, Ordering::Release);
737        let max = snap.changes.keys().next_back().copied();
738        let min = snap.changes.keys().next().copied();
739        self.stats
740            .max_sn
741            .store(encode_sn_atom(max), Ordering::Release);
742        self.stats
743            .min_sn
744            .store(encode_sn_atom(min), Ordering::Release);
745    }
746}
747
748#[cfg(test)]
749#[allow(clippy::expect_used, clippy::unwrap_used)]
750mod tests {
751    use super::*;
752
753    fn sn(n: i64) -> SequenceNumber {
754        SequenceNumber(n)
755    }
756
757    fn alive(n: i64) -> CacheChange {
758        CacheChange::alive(sn(n), alloc::vec![n as u8])
759    }
760
761    #[test]
762    fn new_cache_is_empty() {
763        let c = HistoryCache::new(10);
764        assert_eq!(c.len(), 0);
765        assert!(c.is_empty());
766        assert_eq!(c.min_sn(), None);
767        assert_eq!(c.max_sn(), None);
768    }
769
770    #[test]
771    fn insert_and_get() {
772        let mut c = HistoryCache::new(10);
773        c.insert(alive(1)).expect("insert");
774        c.insert(alive(2)).expect("insert");
775        assert_eq!(
776            c.get(sn(1)).map(|ch| ch.payload.as_ref().to_vec()),
777            Some(alloc::vec![1])
778        );
779        assert_eq!(c.get(sn(3)), None);
780        assert_eq!(c.len(), 2);
781    }
782
783    #[test]
784    fn insert_duplicate_is_err() {
785        let mut c = HistoryCache::new(10);
786        c.insert(alive(1)).expect("insert");
787        assert_eq!(c.insert(alive(1)), Err(CacheError::DuplicateSequenceNumber));
788    }
789
790    #[test]
791    fn insert_at_capacity_is_err() {
792        let mut c = HistoryCache::new(2);
793        c.insert(alive(1)).expect("insert");
794        c.insert(alive(2)).expect("insert");
795        assert_eq!(c.insert(alive(3)), Err(CacheError::CapacityExceeded));
796    }
797
798    #[test]
799    fn min_max_sn_reflect_content() {
800        let mut c = HistoryCache::new(10);
801        c.insert(alive(5)).unwrap();
802        c.insert(alive(3)).unwrap();
803        c.insert(alive(7)).unwrap();
804        assert_eq!(c.min_sn(), Some(sn(3)));
805        assert_eq!(c.max_sn(), Some(sn(7)));
806    }
807
808    #[test]
809    fn remove_up_to_inclusive() {
810        let mut c = HistoryCache::new(10);
811        for i in 1..=5 {
812            c.insert(alive(i)).unwrap();
813        }
814        let removed = c.remove_up_to(sn(3));
815        assert_eq!(removed, 3);
816        assert_eq!(c.len(), 2);
817        assert_eq!(c.min_sn(), Some(sn(4)));
818    }
819
820    #[test]
821    fn remove_up_to_with_no_matches_is_noop() {
822        let mut c = HistoryCache::new(10);
823        c.insert(alive(10)).unwrap();
824        assert_eq!(c.remove_up_to(sn(5)), 0);
825        assert_eq!(c.len(), 1);
826    }
827
828    #[test]
829    fn iter_range_is_ordered() {
830        let mut c = HistoryCache::new(10);
831        for i in [5, 1, 3, 8, 2] {
832            c.insert(alive(i)).unwrap();
833        }
834        let collected: alloc::vec::Vec<i64> = c
835            .iter_range(sn(2), sn(5))
836            .map(|ch| ch.sequence_number.0)
837            .collect();
838        assert_eq!(collected, alloc::vec![2, 3, 5]);
839    }
840
841    #[test]
842    fn iter_range_empty_when_no_overlap() {
843        let mut c = HistoryCache::new(10);
844        c.insert(alive(1)).unwrap();
845        c.insert(alive(2)).unwrap();
846        assert_eq!(c.iter_range(sn(10), sn(20)).count(), 0);
847    }
848
849    #[test]
850    fn capacity_accessor() {
851        let c = HistoryCache::new(42);
852        assert_eq!(c.capacity(), 42);
853    }
854
855    #[test]
856    fn cache_change_alive_constructor() {
857        let ch = CacheChange::alive(sn(1), alloc::vec![1, 2, 3]);
858        assert_eq!(ch.kind, ChangeKind::Alive);
859        assert_eq!(ch.sequence_number, sn(1));
860        assert_eq!(ch.payload.as_ref(), &[1, 2, 3][..]);
861    }
862
863    // ---- §8.2.1.2 ChangeKind_t alle 4 Spec-Varianten + AliveFiltered ----
864
865    #[test]
866    fn change_kind_alive_is_relevant_and_alive() {
867        assert!(ChangeKind::Alive.is_relevant());
868        assert!(ChangeKind::Alive.is_alive_kind());
869    }
870
871    #[test]
872    fn change_kind_alive_filtered_is_alive_but_not_relevant() {
873        // Spec §8.4.10.5: AliveFiltered ist alive_kind aber !is_relevant.
874        assert!(ChangeKind::AliveFiltered.is_alive_kind());
875        assert!(!ChangeKind::AliveFiltered.is_relevant());
876    }
877
878    #[test]
879    fn change_kind_not_alive_kinds_are_not_alive() {
880        for k in [
881            ChangeKind::NotAliveDisposed,
882            ChangeKind::NotAliveUnregistered,
883            ChangeKind::NotAliveDisposedUnregistered,
884        ] {
885            assert!(!k.is_alive_kind(), "{k:?}");
886            assert!(k.is_relevant(), "{k:?}");
887        }
888    }
889
890    #[test]
891    fn change_kind_distinct_variants() {
892        // Identity-Sanity — alle 5 Varianten sind verschieden.
893        let v = [
894            ChangeKind::Alive,
895            ChangeKind::AliveFiltered,
896            ChangeKind::NotAliveDisposed,
897            ChangeKind::NotAliveUnregistered,
898            ChangeKind::NotAliveDisposedUnregistered,
899        ];
900        for (i, a) in v.iter().enumerate() {
901            for (j, b) in v.iter().enumerate() {
902                if i == j {
903                    assert_eq!(a, b);
904                } else {
905                    assert_ne!(a, b);
906                }
907            }
908        }
909    }
910
911    // ========================================================================
912    // D.4 Phase A — Atomic Stats / Lock-Free Snapshot Tests
913    // ========================================================================
914
915    #[test]
916    fn stats_default_is_empty_with_no_sn() {
917        let c = HistoryCache::new(10);
918        let snap = c.stats().snapshot();
919        assert_eq!(snap.len, 0);
920        assert_eq!(snap.evicted, 0);
921        assert_eq!(snap.max_sn, None);
922        assert_eq!(snap.min_sn, None);
923    }
924
925    #[test]
926    fn stats_track_insert_and_remove() {
927        let mut c = HistoryCache::new(10);
928        c.insert(alive(3)).unwrap();
929        c.insert(alive(5)).unwrap();
930        c.insert(alive(7)).unwrap();
931        let snap = c.stats().snapshot();
932        assert_eq!(snap.len, 3);
933        assert_eq!(snap.min_sn, Some(sn(3)));
934        assert_eq!(snap.max_sn, Some(sn(7)));
935        assert_eq!(snap.evicted, 0);
936
937        c.remove_up_to(sn(5));
938        let snap = c.stats().snapshot();
939        assert_eq!(snap.len, 1);
940        assert_eq!(snap.min_sn, Some(sn(7)));
941        assert_eq!(snap.max_sn, Some(sn(7)));
942    }
943
944    #[test]
945    fn stats_track_keeplast_eviction() {
946        let mut c = HistoryCache::new_with_kind(HistoryKind::KeepLast { depth: 2 }, 100);
947        c.insert(alive(1)).unwrap();
948        c.insert(alive(2)).unwrap();
949        c.insert(alive(3)).unwrap(); // evicts alive(1)
950        let snap = c.stats().snapshot();
951        assert_eq!(snap.len, 2);
952        assert_eq!(snap.evicted, 1);
953        assert_eq!(snap.min_sn, Some(sn(2)));
954        assert_eq!(snap.max_sn, Some(sn(3)));
955    }
956
957    #[test]
958    fn stats_arc_is_shared_across_clones_of_handle() {
959        // Mehrere `cache.stats()`-Aufrufe liefern denselben Arc — sodass
960        // ein Reader, der den Handle einmalig zieht, alle nachfolgenden
961        // Cache-Mutationen sieht.
962        let mut c = HistoryCache::new(10);
963        let s1 = c.stats();
964        let s2 = c.stats();
965        assert!(Arc::ptr_eq(&s1, &s2));
966        c.insert(alive(1)).unwrap();
967        assert_eq!(s1.snapshot().len, 1);
968        assert_eq!(s2.snapshot().len, 1);
969    }
970
971    #[test]
972    fn stats_reader_thread_sees_inserts_concurrently() {
973        // Lock-Free-Read aus einem zweiten Thread waehrend der Writer
974        // mutiert. Korrektheits-Test fuer die Acquire/Release-Ordering.
975        use std::sync::Arc as StdArc;
976        use std::sync::Mutex as StdMutex;
977        use std::thread;
978        use std::time::Duration;
979
980        let cache = StdArc::new(StdMutex::new(HistoryCache::new(2_000)));
981        let stats = cache.lock().expect("init lock").stats();
982
983        let writer_cache = StdArc::clone(&cache);
984        let writer = thread::spawn(move || {
985            for i in 1..=1_000 {
986                let mut c = writer_cache.lock().expect("write lock");
987                c.insert(alive(i)).expect("insert");
988            }
989        });
990
991        let reader_stats = StdArc::clone(&stats);
992        let reader = thread::spawn(move || {
993            // Lese 100x Stats, ohne den Writer-Lock zu nehmen.
994            for _ in 0..100 {
995                let snap = reader_stats.snapshot();
996                // len darf nur monoton wachsen waehrend Writer laeuft.
997                assert!(snap.len <= 1_000);
998                if let Some(max) = snap.max_sn {
999                    assert!(max.0 >= 1 && max.0 <= 1_000);
1000                }
1001                thread::sleep(Duration::from_micros(50));
1002            }
1003        });
1004
1005        writer.join().expect("writer joined");
1006        reader.join().expect("reader joined");
1007
1008        let final_snap = stats.snapshot();
1009        assert_eq!(final_snap.len, 1_000);
1010        assert_eq!(final_snap.max_sn, Some(sn(1_000)));
1011        assert_eq!(final_snap.min_sn, Some(sn(1)));
1012    }
1013
1014    #[test]
1015    fn clone_creates_independent_stats_handles() {
1016        // Cache.clone() darf nicht den Stats-Arc des Originals teilen,
1017        // sonst wuerden Mutationen am Klon das Original verfaelschen.
1018        let mut a = HistoryCache::new(10);
1019        a.insert(alive(1)).unwrap();
1020        let b = a.clone();
1021        assert!(!Arc::ptr_eq(&a.stats(), &b.stats()));
1022        assert_eq!(a.stats().snapshot().len, 1);
1023        assert_eq!(b.stats().snapshot().len, 1);
1024
1025        let mut a_mut = a;
1026        a_mut.insert(alive(2)).unwrap();
1027        assert_eq!(a_mut.stats().snapshot().len, 2);
1028        assert_eq!(b.stats().snapshot().len, 1, "clone unaffected");
1029    }
1030
1031    // ========================================================================
1032    // D.4 Phase C — LockFreeReadHistoryCache Tests
1033    // ========================================================================
1034
1035    #[cfg(feature = "std")]
1036    mod lock_free_tests {
1037        use super::*;
1038
1039        #[test]
1040        fn lock_free_new_is_empty() {
1041            let c = LockFreeReadHistoryCache::new(10);
1042            assert_eq!(c.len(), 0);
1043            assert!(c.is_empty());
1044            assert_eq!(c.min_sn(), None);
1045            assert_eq!(c.max_sn(), None);
1046        }
1047
1048        #[test]
1049        fn lock_free_insert_and_get() {
1050            let c = LockFreeReadHistoryCache::new(10);
1051            // Insert ohne &mut self — reine Interior-Mutability.
1052            c.insert(alive(1)).unwrap();
1053            c.insert(alive(2)).unwrap();
1054            assert_eq!(
1055                c.get(sn(1)).map(|ch| ch.payload.as_ref().to_vec()),
1056                Some(alloc::vec![1])
1057            );
1058            assert_eq!(c.get(sn(3)), None);
1059            assert_eq!(c.len(), 2);
1060        }
1061
1062        #[test]
1063        fn lock_free_min_max_lock_free_loads() {
1064            let c = LockFreeReadHistoryCache::new(10);
1065            c.insert(alive(5)).unwrap();
1066            c.insert(alive(3)).unwrap();
1067            c.insert(alive(7)).unwrap();
1068            assert_eq!(c.min_sn(), Some(sn(3)));
1069            assert_eq!(c.max_sn(), Some(sn(7)));
1070        }
1071
1072        #[test]
1073        fn lock_free_keeplast_evicts_oldest() {
1074            let c =
1075                LockFreeReadHistoryCache::new_with_kind(HistoryKind::KeepLast { depth: 2 }, 100);
1076            c.insert(alive(1)).unwrap();
1077            c.insert(alive(2)).unwrap();
1078            c.insert(alive(3)).unwrap(); // evicts 1
1079            assert_eq!(c.len(), 2);
1080            assert_eq!(c.min_sn(), Some(sn(2)));
1081            assert_eq!(c.max_sn(), Some(sn(3)));
1082            assert_eq!(c.evicted_count(), 1);
1083        }
1084
1085        #[test]
1086        fn lock_free_keepall_full_rejects() {
1087            let c = LockFreeReadHistoryCache::new(2);
1088            c.insert(alive(1)).unwrap();
1089            c.insert(alive(2)).unwrap();
1090            assert_eq!(c.insert(alive(3)), Err(CacheError::CapacityExceeded));
1091        }
1092
1093        #[test]
1094        fn lock_free_duplicate_sn_rejected() {
1095            let c = LockFreeReadHistoryCache::new(10);
1096            c.insert(alive(1)).unwrap();
1097            assert_eq!(c.insert(alive(1)), Err(CacheError::DuplicateSequenceNumber));
1098        }
1099
1100        #[test]
1101        fn lock_free_remove_up_to() {
1102            let c = LockFreeReadHistoryCache::new(10);
1103            for i in 1..=5 {
1104                c.insert(alive(i)).unwrap();
1105            }
1106            let removed = c.remove_up_to(sn(3));
1107            assert_eq!(removed, 3);
1108            assert_eq!(c.len(), 2);
1109            assert_eq!(c.min_sn(), Some(sn(4)));
1110        }
1111
1112        #[test]
1113        fn lock_free_iter_range_snapshot() {
1114            let c = LockFreeReadHistoryCache::new(10);
1115            for i in 1..=5 {
1116                c.insert(alive(i)).unwrap();
1117            }
1118            let mid: alloc::vec::Vec<_> = c
1119                .iter_range_snapshot(sn(2), sn(4))
1120                .iter()
1121                .map(|ch| ch.sequence_number)
1122                .collect();
1123            assert_eq!(mid, alloc::vec![sn(2), sn(3), sn(4)]);
1124        }
1125
1126        #[test]
1127        fn lock_free_snapshot_outlives_writes() {
1128            // Snapshot-API-Garantie: ein Reader-Arc bleibt unveraendert,
1129            // auch wenn der Writer den Cache spaeter mutiert.
1130            let c = LockFreeReadHistoryCache::new(10);
1131            c.insert(alive(1)).unwrap();
1132            let snap = c.snapshot();
1133            assert_eq!(snap.changes.len(), 1);
1134
1135            c.insert(alive(2)).unwrap();
1136            c.insert(alive(3)).unwrap();
1137            // Original-Snapshot: immer noch nur SN 1.
1138            assert_eq!(snap.changes.len(), 1);
1139            assert!(snap.changes.contains_key(&sn(1)));
1140            // Live-Cache: 3 Eintraege.
1141            assert_eq!(c.len(), 3);
1142        }
1143
1144        #[test]
1145        fn lock_free_concurrent_readers_writers_smoke() {
1146            use std::sync::Arc as StdArc;
1147            use std::thread;
1148
1149            let cache: StdArc<LockFreeReadHistoryCache> =
1150                StdArc::new(LockFreeReadHistoryCache::new(2_000));
1151            let cache_w = StdArc::clone(&cache);
1152            let writer = thread::spawn(move || {
1153                for i in 1..=500 {
1154                    cache_w.insert(alive(i)).expect("insert");
1155                }
1156            });
1157
1158            let cache_r = StdArc::clone(&cache);
1159            let reader = thread::spawn(move || {
1160                for _ in 0..200 {
1161                    let snap = cache_r.snapshot();
1162                    // Snapshot ist intern konsistent: changes.len matches
1163                    // den Range zwischen min und max.
1164                    if let (Some(min), Some(max)) = (
1165                        snap.changes.keys().next().copied(),
1166                        snap.changes.keys().next_back().copied(),
1167                    ) {
1168                        let inferred_count = (max.0 - min.0 + 1) as usize;
1169                        assert!(
1170                            snap.changes.len() <= inferred_count,
1171                            "snapshot inkonsistent"
1172                        );
1173                    }
1174                }
1175            });
1176
1177            writer.join().expect("writer joined");
1178            reader.join().expect("reader joined");
1179
1180            assert_eq!(cache.len(), 500);
1181            assert_eq!(cache.max_sn(), Some(sn(500)));
1182        }
1183    }
1184}