Skip to main content

commonware_storage/cache/
mod.rs

1//! A prunable cache for ordered data with index-based lookups.
2//!
3//! Data is stored in [crate::journal::segmented::variable::Journal] (an append-only log) and the location of
4//! written data is tracked in-memory by index to enable **single-read lookups** for cached data.
5//!
6//! Unlike [crate::archive::Archive], the [Cache] is optimized for simplicity and does
7//! not support key-based lookups (only index-based access is provided). This makes it ideal for
8//! caching sequential data where you know the exact index of the item you want to retrieve.
9//!
10//! # Memory Overhead
11//!
12//! [Cache] maintains a single in-memory map to track the location of each index item. The memory
13//! used to track each item is `8 + 4 + 4` bytes (where `8` is the index, `4` is the offset, and
14//! `4` is the length). This results in approximately `16` bytes of memory overhead per cached item.
15//!
16//! # Pruning
17//!
18//! [Cache] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
19//! called on a `section`, all interaction with a `section` less than the pruned `section` will
20//! return an error. The pruning granularity is determined by `items_per_blob` in the configuration.
21//!
22//! # Single Operation Reads
23//!
24//! To enable single operation reads (i.e. reading all of an item in a single call to
25//! [commonware_runtime::Blob]), [Cache] stores the length of each item in its in-memory index.
26//! This ensures that reading a cached item requires only one disk operation.
27//!
28//! # Compression
29//!
30//! [Cache] supports compressing data before storing it on disk. This can be enabled by setting
31//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
32//! can be changed between initializations of [Cache], however, it must remain populated if any
33//! data was written with compression enabled.
34//!
35//! # Querying for Gaps
36//!
37//! [Cache] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
38//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
39//!
40//! # Example
41//!
42//! ```rust
43//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
44//! use commonware_storage::cache::{Cache, Config};
45//! use commonware_utils::{NZUsize, NZU16, NZU64};
46//!
47//! let executor = deterministic::Runner::default();
48//! executor.start(|context| async move {
49//!     // Create a cache
50//!     let cfg = Config {
51//!         partition: "cache".into(),
52//!         compression: Some(3),
53//!         codec_config: (),
54//!         items_per_blob: NZU64!(1024),
55//!         write_buffer: NZUsize!(1024 * 1024),
56//!         replay_buffer: NZUsize!(4096),
57//!         page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
58//!     };
59//!     let mut cache = Cache::init(context, cfg).await.unwrap();
60//!
61//!     // Put data at index
62//!     cache.put(1, 100u32).await.unwrap();
63//!
64//!     // Get data by index
65//!     let data: Option<u32> = cache.get(1).await.unwrap();
66//!     assert_eq!(data, Some(100));
67//!
68//!     // Check for gaps in the index space
69//!     cache.put(10, 200u32).await.unwrap();
70//!     let (current_end, start_next) = cache.next_gap(5);
71//!     assert!(current_end.is_none());
72//!     assert_eq!(start_next, Some(10));
73//!
74//!     // Sync the cache
75//!     cache.sync().await.unwrap();
76//! });
77//! ```
78
79use commonware_runtime::buffer::paged::CacheRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86/// Errors that can occur when interacting with the cache.
87#[derive(Debug, Error)]
88pub enum Error {
89    #[error("journal error: {0}")]
90    Journal(#[from] crate::journal::Error),
91    #[error("record corrupted")]
92    RecordCorrupted,
93    #[error("already pruned to: {0}")]
94    AlreadyPrunedTo(u64),
95    #[error("record too large")]
96    RecordTooLarge,
97}
98
99/// Configuration for [Cache] storage.
100#[derive(Clone)]
101pub struct Config<C> {
102    /// The partition to use for the cache's [crate::journal] storage.
103    pub partition: String,
104
105    /// The compression level to use for the cache's [crate::journal] storage.
106    pub compression: Option<u8>,
107
108    /// The [commonware_codec::Codec] configuration to use for the value stored in the cache.
109    pub codec_config: C,
110
111    /// The number of items per section (the granularity of pruning).
112    pub items_per_blob: NonZeroU64,
113
114    /// The amount of bytes that can be buffered in a section before being written to a
115    /// [commonware_runtime::Blob].
116    pub write_buffer: NonZeroUsize,
117
118    /// The buffer size to use when replaying a [commonware_runtime::Blob].
119    pub replay_buffer: NonZeroUsize,
120
121    /// The page cache to use for the underlying [crate::journal] storage.
122    pub page_cache: CacheRef,
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use crate::journal::Error as JournalError;
129    use commonware_macros::{test_group, test_traced};
130    use commonware_runtime::{
131        deterministic, telemetry::metrics::has_metric_value, Metrics as _, Runner, Supervisor as _,
132    };
133    use commonware_utils::{NZUsize, NZU16, NZU64};
134    use rand::Rng;
135    use std::{collections::BTreeMap, num::NonZeroU16};
136
137    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
138    const DEFAULT_WRITE_BUFFER: usize = 1024;
139    const DEFAULT_REPLAY_BUFFER: usize = 4096;
140    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
141    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
142
143    #[test_traced]
144    fn test_cache_compression_then_none() {
145        // Initialize the deterministic context
146        let executor = deterministic::Runner::default();
147        executor.start(|context| async move {
148            // Initialize the cache
149            let cfg = Config {
150                partition: "test-partition".into(),
151                codec_config: (),
152                compression: Some(3),
153                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
154                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
155                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
156                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
157            };
158            let mut cache = Cache::init(context.child("first"), cfg.clone())
159                .await
160                .expect("Failed to initialize cache");
161
162            // Put the data
163            let index = 1u64;
164            let data = 1;
165            cache.put(index, data).await.expect("Failed to put data");
166
167            // Sync and drop the cache
168            cache.sync().await.expect("Failed to sync cache");
169            drop(cache);
170
171            // Initialize the cache again without compression
172            let cfg = Config {
173                partition: "test-partition".into(),
174                codec_config: (),
175                compression: None,
176                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
177                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
178                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
179                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
180            };
181            let result = Cache::<_, i32>::init(context.child("second"), cfg.clone()).await;
182            assert!(matches!(
183                result,
184                Err(Error::Journal(JournalError::Codec(_)))
185            ));
186        });
187    }
188
189    #[test_traced]
190    fn test_cache_prune() {
191        // Initialize the deterministic context
192        let executor = deterministic::Runner::default();
193        executor.start(|context| async move {
194            // Initialize the cache
195            let cfg = Config {
196                partition: "test-partition".into(),
197                codec_config: (),
198                compression: None,
199                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
200                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
201                items_per_blob: NZU64!(1), // no mask - each item is its own section
202                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
203            };
204            let mut cache = Cache::init(context.child("storage"), cfg.clone())
205                .await
206                .expect("Failed to initialize cache");
207
208            // Insert multiple items across different sections
209            let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
210            for (index, data) in &items {
211                cache.put(*index, *data).await.expect("Failed to put data");
212            }
213            assert_eq!(cache.first(), Some(1));
214
215            // Check metrics
216            let buffer = context.encode();
217            assert!(has_metric_value(&buffer, "items_tracked", 5));
218
219            // Prune sections less than 3
220            cache.prune(3).await.expect("Failed to prune");
221
222            // Ensure items 1 and 2 are no longer present
223            for (index, data) in items {
224                let retrieved = cache.get(index).await.expect("Failed to get data");
225                if index < 3 {
226                    assert!(retrieved.is_none());
227                } else {
228                    assert_eq!(retrieved.expect("Data not found"), data);
229                }
230            }
231            assert_eq!(cache.first(), Some(3));
232
233            // Check metrics
234            let buffer = context.encode();
235            assert!(has_metric_value(&buffer, "items_tracked", 3));
236
237            // Try to prune older section
238            cache.prune(2).await.expect("Failed to prune");
239            assert_eq!(cache.first(), Some(3));
240
241            // Try to prune current section again
242            cache.prune(3).await.expect("Failed to prune");
243            assert_eq!(cache.first(), Some(3));
244
245            // Try to put older index
246            let result = cache.put(1, 1).await;
247            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
248        });
249    }
250
251    fn test_cache_restart(num_items: usize) -> String {
252        // Initialize the deterministic context
253        let executor = deterministic::Runner::default();
254        executor.start(|mut context| async move {
255            // Initialize the cache
256            let items_per_blob = 256u64;
257            let cfg = Config {
258                partition: "test-partition".into(),
259                codec_config: (),
260                compression: None,
261                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
262                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
263                items_per_blob: NZU64!(items_per_blob),
264                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
265            };
266            let mut cache = Cache::init(
267                context.child("init").with_attribute("index", 1),
268                cfg.clone(),
269            )
270            .await
271            .expect("Failed to initialize cache");
272
273            // Insert multiple items
274            let mut items = BTreeMap::new();
275            while items.len() < num_items {
276                let index = items.len() as u64;
277                let mut data = [0u8; 1024];
278                context.fill(&mut data);
279                items.insert(index, data);
280
281                cache.put(index, data).await.expect("Failed to put data");
282            }
283
284            // Ensure all items can be retrieved
285            for (index, data) in &items {
286                let retrieved = cache
287                    .get(*index)
288                    .await
289                    .expect("Failed to get data")
290                    .expect("Data not found");
291                assert_eq!(retrieved, *data);
292            }
293
294            // Check metrics
295            let buffer = context.encode();
296            assert!(has_metric_value(&buffer, "items_tracked", num_items));
297
298            // Sync and drop the cache
299            cache.sync().await.expect("Failed to sync cache");
300            drop(cache);
301
302            // Reinitialize the cache
303            let cfg = Config {
304                partition: "test-partition".into(),
305                codec_config: (),
306                compression: None,
307                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
308                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
309                items_per_blob: NZU64!(items_per_blob),
310                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
311            };
312            let mut cache = Cache::<_, [u8; 1024]>::init(
313                context.child("init").with_attribute("index", 2),
314                cfg.clone(),
315            )
316            .await
317            .expect("Failed to initialize cache");
318
319            // Ensure all items can be retrieved
320            for (index, data) in &items {
321                let retrieved = cache
322                    .get(*index)
323                    .await
324                    .expect("Failed to get data")
325                    .expect("Data not found");
326                assert_eq!(&retrieved, data);
327            }
328
329            // Prune first half
330            let min = (items.len() / 2) as u64;
331            cache.prune(min).await.expect("Failed to prune");
332
333            // Ensure all items can be retrieved that haven't been pruned
334            let min = (min / items_per_blob) * items_per_blob;
335            let mut removed = 0;
336            for (index, data) in items {
337                if index >= min {
338                    let retrieved = cache
339                        .get(index)
340                        .await
341                        .expect("Failed to get data")
342                        .expect("Data not found");
343                    assert_eq!(retrieved, data);
344                } else {
345                    let retrieved = cache.get(index).await.expect("Failed to get data");
346                    assert!(retrieved.is_none());
347                    removed += 1;
348                }
349            }
350
351            // Check metrics
352            let buffer = context.encode();
353            assert!(has_metric_value(
354                &buffer,
355                "items_tracked",
356                num_items - removed
357            ));
358
359            context.auditor().state()
360        })
361    }
362
363    #[test_group("slow")]
364    #[test_traced]
365    fn test_cache_many_items_and_restart() {
366        test_cache_restart(100_000);
367    }
368
369    #[test_group("slow")]
370    #[test_traced]
371    fn test_determinism() {
372        let state1 = test_cache_restart(5_000);
373        let state2 = test_cache_restart(5_000);
374        assert_eq!(state1, state2);
375    }
376
377    #[test_traced]
378    fn test_cache_next_gap() {
379        let executor = deterministic::Runner::default();
380        executor.start(|context| async move {
381            let cfg = Config {
382                partition: "test-partition".into(),
383                codec_config: (),
384                compression: None,
385                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
386                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
387                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
388                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
389            };
390            let mut cache = Cache::init(context.child("storage"), cfg.clone())
391                .await
392                .expect("Failed to initialize cache");
393
394            // Check first
395            assert_eq!(cache.first(), None);
396
397            // Insert values with gaps
398            cache.put(1, 1).await.unwrap();
399            cache.put(10, 10).await.unwrap();
400            cache.put(11, 11).await.unwrap();
401            cache.put(14, 14).await.unwrap();
402
403            // Check gaps
404            let (current_end, start_next) = cache.next_gap(0);
405            assert!(current_end.is_none());
406            assert_eq!(start_next, Some(1));
407            assert_eq!(cache.first(), Some(1));
408
409            let (current_end, start_next) = cache.next_gap(1);
410            assert_eq!(current_end, Some(1));
411            assert_eq!(start_next, Some(10));
412
413            let (current_end, start_next) = cache.next_gap(10);
414            assert_eq!(current_end, Some(11));
415            assert_eq!(start_next, Some(14));
416
417            let (current_end, start_next) = cache.next_gap(11);
418            assert_eq!(current_end, Some(11));
419            assert_eq!(start_next, Some(14));
420
421            let (current_end, start_next) = cache.next_gap(12);
422            assert!(current_end.is_none());
423            assert_eq!(start_next, Some(14));
424
425            let (current_end, start_next) = cache.next_gap(14);
426            assert_eq!(current_end, Some(14));
427            assert!(start_next.is_none());
428        });
429    }
430
431    #[test_traced]
432    fn test_cache_missing_items() {
433        let executor = deterministic::Runner::default();
434        executor.start(|context| async move {
435            let cfg = Config {
436                partition: "test-partition".into(),
437                codec_config: (),
438                compression: None,
439                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
441                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
442                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
443            };
444            let mut cache = Cache::init(context.child("storage"), cfg.clone())
445                .await
446                .expect("Failed to initialize cache");
447
448            // Test 1: Empty cache - should return no items
449            assert_eq!(cache.first(), None);
450            assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
451            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
452
453            // Test 2: Insert values with gaps
454            cache.put(1, 1).await.unwrap();
455            cache.put(2, 2).await.unwrap();
456            cache.put(5, 5).await.unwrap();
457            cache.put(6, 6).await.unwrap();
458            cache.put(10, 10).await.unwrap();
459
460            // Test 3: Find missing items from the beginning
461            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
462            assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
463            assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
464
465            // Test 4: Find missing items from within a gap
466            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
467            assert_eq!(cache.missing_items(4, 2), vec![4, 7]);
468
469            // Test 5: Find missing items from within a range
470            assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
471            assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
472            assert_eq!(cache.missing_items(5, 2), vec![7, 8]);
473
474            // Test 6: Find missing items after the last range (no more gaps)
475            assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
476            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
477
478            // Test 7: Large gap scenario
479            cache.put(1000, 1000).await.unwrap();
480
481            // Gap between 10 and 1000
482            let items = cache.missing_items(11, 10);
483            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
484
485            // Request more items than available in gap
486            let items = cache.missing_items(990, 15);
487            assert_eq!(
488                items,
489                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
490            );
491
492            // Test 8: After syncing (data should remain consistent)
493            cache.sync().await.unwrap();
494            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
495            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
496
497            // Test 9: Cross-section boundary scenario
498            cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
499            cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();
500
501            // Find missing items across section boundary
502            let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
503            assert_eq!(
504                items,
505                vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
506            );
507        });
508    }
509
510    #[test_traced]
511    fn test_cache_intervals_after_restart() {
512        let executor = deterministic::Runner::default();
513        executor.start(|context| async move {
514            let cfg = Config {
515                partition: "test-partition".into(),
516                codec_config: (),
517                compression: None,
518                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
519                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
520                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
521                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
522            };
523
524            // Insert data and sync
525            {
526                let mut cache = Cache::init(context.child("first"), cfg.clone())
527                    .await
528                    .expect("Failed to initialize cache");
529
530                cache.put(0, 0).await.expect("Failed to put data");
531                cache.put(100, 100).await.expect("Failed to put data");
532                cache.put(1000, 1000).await.expect("Failed to put data");
533
534                cache.sync().await.expect("Failed to sync cache");
535            }
536
537            // Reopen and verify intervals are preserved
538            {
539                let cache = Cache::<_, i32>::init(context.child("second"), cfg.clone())
540                    .await
541                    .expect("Failed to initialize cache");
542
543                // Check gaps are preserved
544                let (current_end, start_next) = cache.next_gap(0);
545                assert_eq!(current_end, Some(0));
546                assert_eq!(start_next, Some(100));
547
548                let (current_end, start_next) = cache.next_gap(100);
549                assert_eq!(current_end, Some(100));
550                assert_eq!(start_next, Some(1000));
551
552                // Check missing items
553                let items = cache.missing_items(1, 5);
554                assert_eq!(items, vec![1, 2, 3, 4, 5]);
555            }
556        });
557    }
558
559    #[test_traced]
560    fn test_cache_intervals_with_pruning() {
561        let executor = deterministic::Runner::default();
562        executor.start(|context| async move {
563            let cfg = Config {
564                partition: "test-partition".into(),
565                codec_config: (),
566                compression: None,
567                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
568                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
569                items_per_blob: NZU64!(100), // Smaller sections for easier testing
570                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
571            };
572            let mut cache = Cache::init(context.child("storage"), cfg.clone())
573                .await
574                .expect("Failed to initialize cache");
575
576            // Insert values across multiple sections
577            cache.put(50, 50).await.unwrap();
578            cache.put(150, 150).await.unwrap();
579            cache.put(250, 250).await.unwrap();
580            cache.put(350, 350).await.unwrap();
581
582            // Check gaps before pruning
583            let (current_end, start_next) = cache.next_gap(0);
584            assert!(current_end.is_none());
585            assert_eq!(start_next, Some(50));
586
587            // Prune sections less than 200
588            cache.prune(200).await.expect("Failed to prune");
589
590            // Check that pruned indices are not accessible
591            assert!(!cache.has(50));
592            assert!(!cache.has(150));
593
594            // Check gaps after pruning - should not include pruned ranges
595            let (current_end, start_next) = cache.next_gap(200);
596            assert!(current_end.is_none());
597            assert_eq!(start_next, Some(250));
598
599            // Missing items should not include pruned ranges
600            let items = cache.missing_items(200, 5);
601            assert_eq!(items, vec![200, 201, 202, 203, 204]);
602
603            // Verify remaining data is still accessible
604            assert!(cache.has(250));
605            assert!(cache.has(350));
606            assert_eq!(cache.get(250).await.unwrap(), Some(250));
607            assert_eq!(cache.get(350).await.unwrap(), Some(350));
608        });
609    }
610
611    #[test_traced]
612    fn test_cache_sparse_indices() {
613        let executor = deterministic::Runner::default();
614        executor.start(|context| async move {
615            let cfg = Config {
616                partition: "test-partition".into(),
617                codec_config: (),
618                compression: None,
619                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
620                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
621                items_per_blob: NZU64!(100), // Smaller sections for testing
622                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
623            };
624            let mut cache = Cache::init(context.child("storage"), cfg.clone())
625                .await
626                .expect("Failed to initialize cache");
627
628            // Insert sparse values
629            let indices = vec![
630                (0u64, 0),
631                (99u64, 99),   // End of first section
632                (100u64, 100), // Start of second section
633                (500u64, 500), // Start of sixth section
634            ];
635
636            for (index, value) in &indices {
637                cache.put(*index, *value).await.expect("Failed to put data");
638            }
639
640            // Check that intermediate indices don't exist
641            assert!(!cache.has(1));
642            assert!(!cache.has(50));
643            assert!(!cache.has(101));
644            assert!(!cache.has(499));
645
646            // Verify gap detection works correctly
647            let (current_end, start_next) = cache.next_gap(50);
648            assert!(current_end.is_none());
649            assert_eq!(start_next, Some(99));
650
651            let (current_end, start_next) = cache.next_gap(99);
652            assert_eq!(current_end, Some(100));
653            assert_eq!(start_next, Some(500));
654
655            // Sync and verify
656            cache.sync().await.expect("Failed to sync");
657
658            for (index, value) in &indices {
659                let retrieved = cache
660                    .get(*index)
661                    .await
662                    .expect("Failed to get data")
663                    .expect("Data not found");
664                assert_eq!(retrieved, *value);
665            }
666        });
667    }
668
669    #[test_traced]
670    fn test_cache_intervals_edge_cases() {
671        let executor = deterministic::Runner::default();
672        executor.start(|context| async move {
673            let cfg = Config {
674                partition: "test-partition".into(),
675                codec_config: (),
676                compression: None,
677                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
678                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
679                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
680                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
681            };
682            let mut cache = Cache::init(context.child("storage"), cfg.clone())
683                .await
684                .expect("Failed to initialize cache");
685
686            // Test edge case: single item
687            cache.put(42, 42).await.unwrap();
688
689            let (current_end, start_next) = cache.next_gap(42);
690            assert_eq!(current_end, Some(42));
691            assert!(start_next.is_none());
692
693            let (current_end, start_next) = cache.next_gap(41);
694            assert!(current_end.is_none());
695            assert_eq!(start_next, Some(42));
696
697            let (current_end, start_next) = cache.next_gap(43);
698            assert!(current_end.is_none());
699            assert!(start_next.is_none());
700
701            // Test edge case: consecutive items
702            cache.put(43, 43).await.unwrap();
703            cache.put(44, 44).await.unwrap();
704
705            let (current_end, start_next) = cache.next_gap(42);
706            assert_eq!(current_end, Some(44));
707            assert!(start_next.is_none());
708
709            // Test edge case: boundary values
710            cache.put(u64::MAX - 1, 999).await.unwrap();
711
712            let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
713            assert!(current_end.is_none());
714            assert_eq!(start_next, Some(u64::MAX - 1));
715
716            let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
717            assert_eq!(current_end, Some(u64::MAX - 1));
718            assert!(start_next.is_none());
719        });
720    }
721
722    #[test_traced]
723    fn test_cache_intervals_duplicate_inserts() {
724        let executor = deterministic::Runner::default();
725        executor.start(|context| async move {
726            let cfg = Config {
727                partition: "test-partition".into(),
728                codec_config: (),
729                compression: None,
730                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
731                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
732                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
733                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
734            };
735            let mut cache = Cache::init(context.child("storage"), cfg.clone())
736                .await
737                .expect("Failed to initialize cache");
738
739            // Insert initial value
740            cache.put(10, 10).await.unwrap();
741            assert!(cache.has(10));
742            assert_eq!(cache.get(10).await.unwrap(), Some(10));
743
744            // Try to insert duplicate - should be no-op
745            cache.put(10, 20).await.unwrap();
746            assert!(cache.has(10));
747            assert_eq!(cache.get(10).await.unwrap(), Some(10)); // Should still be original value
748
749            // Verify intervals are correct
750            let (current_end, start_next) = cache.next_gap(10);
751            assert_eq!(current_end, Some(10));
752            assert!(start_next.is_none());
753
754            // Insert adjacent values
755            cache.put(9, 9).await.unwrap();
756            cache.put(11, 11).await.unwrap();
757
758            // Verify intervals updated correctly
759            let (current_end, start_next) = cache.next_gap(9);
760            assert_eq!(current_end, Some(11));
761            assert!(start_next.is_none());
762        });
763    }
764}