// commonware_storage/cache/mod.rs

//! A prunable cache for ordered data with index-based lookups.
//!
//! Data is stored in a [crate::journal::variable::Journal] (an append-only log), and the
//! location of written data is tracked in memory by index to enable **single-read lookups** for
//! cached data.
//!
//! Unlike [crate::archive::Archive], [Cache] is optimized for simplicity and does not support
//! key-based lookups (only index-based access is provided). This makes it ideal for caching
//! sequential data where you know the exact index of the item you want to retrieve.
//!
//! # Memory Overhead
//!
//! [Cache] maintains a single in-memory map to track the location of each item by index. The
//! memory used to track each item is `8 + 4 + 4` bytes (where `8` is the index, `4` is the offset,
//! and `4` is the length). This results in approximately `16` bytes of memory overhead per cached
//! item.
//!
//! # Pruning
//!
//! [Cache] supports pruning up to a minimum `index` using the `prune` method. Pruning removes
//! whole sections, whose size is determined by `items_per_blob` in the configuration, so items in
//! the section containing `index` are retained. After pruning, reading a pruned index returns
//! `None` and writing to one returns [Error::AlreadyPrunedTo]. Pruning to an earlier `index` than
//! a previous call does not move the pruning boundary backwards.
//!
//! # Single Operation Reads
//!
//! To enable single operation reads (i.e. reading all of an item in a single call to
//! [commonware_runtime::Blob]), [Cache] stores the length of each item in its in-memory index.
//! This ensures that reading a cached item requires only one disk operation.
//!
//! # Compression
//!
//! [Cache] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This
//! setting can be changed between initializations of [Cache]; however, it must remain set if any
//! data was written with compression enabled (reopening such data without compression fails).
//!
//! # Querying for Gaps
//!
//! [Cache] tracks gaps in the index space so the caller can efficiently find missing indices
//! using `next_gap` and `missing_items`. This is a very common pattern when syncing blocks in a
//! blockchain.
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::cache::{Cache, Config};
//! use commonware_utils::{NZUsize, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a cache
//!     let cfg = Config {
//!         partition: "cache".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_blob: NZU64!(1024),
//!         write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!     };
//!     let mut cache = Cache::init(context, cfg).await.unwrap();
//!
//!     // Put data at index
//!     cache.put(1, 100u32).await.unwrap();
//!
//!     // Get data by index
//!     let data: Option<u32> = cache.get(1).await.unwrap();
//!     assert_eq!(data, Some(100));
//!
//!     // Check for gaps in the index space
//!     cache.put(10, 200u32).await.unwrap();
//!     let (current_end, start_next) = cache.next_gap(5);
//!     assert!(current_end.is_none());
//!     assert_eq!(start_next, Some(10));
//!
//!     // Close the cache (also closes the journal)
//!     cache.close().await.unwrap();
//! });
//! ```

79use commonware_runtime::buffer::PoolRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86/// Errors that can occur when interacting with the cache.
87#[derive(Debug, Error)]
88pub enum Error {
89    #[error("journal error: {0}")]
90    Journal(#[from] crate::journal::Error),
91    #[error("record corrupted")]
92    RecordCorrupted,
93    #[error("already pruned to: {0}")]
94    AlreadyPrunedTo(u64),
95    #[error("record too large")]
96    RecordTooLarge,
97}
98
99/// Configuration for [Cache] storage.
100#[derive(Clone)]
101pub struct Config<C> {
102    /// The partition to use for the cache's [crate::journal] storage.
103    pub partition: String,
104
105    /// The compression level to use for the cache's [crate::journal] storage.
106    pub compression: Option<u8>,
107
108    /// The [commonware_codec::Codec] configuration to use for the value stored in the cache.
109    pub codec_config: C,
110
111    /// The number of items per section (the granularity of pruning).
112    pub items_per_blob: NonZeroU64,
113
114    /// The amount of bytes that can be buffered in a section before being written to a
115    /// [commonware_runtime::Blob].
116    pub write_buffer: NonZeroUsize,
117
118    /// The buffer size to use when replaying a [commonware_runtime::Blob].
119    pub replay_buffer: NonZeroUsize,
120
121    /// The buffer pool to use for the cache's [crate::journal] storage.
122    pub buffer_pool: PoolRef,
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::Error as JournalError;
    use commonware_codec::{varint::UInt, EncodeSize};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
    use commonware_utils::{NZUsize, NZU64};
    use rand::Rng;
    use std::collections::BTreeMap;

    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds the configuration used by every test in this module.
    ///
    /// All tests share the same partition and buffer sizes; only the compression level and the
    /// section size (`items_per_blob`) vary. Each call creates a fresh [PoolRef], so callers that
    /// want two cache instances to share a buffer pool must clone a single [Config] instead.
    fn test_config(compression: Option<u8>, items_per_blob: u64) -> Config<()> {
        Config {
            partition: "test_partition".into(),
            codec_config: (),
            compression,
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_blob: NZU64!(items_per_blob),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    #[test_traced]
    fn test_cache_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the cache with compression enabled
            let cfg = test_config(Some(3), DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Put the data
            let index = 1u64;
            let data = 1;
            cache.put(index, data).await.expect("Failed to put data");

            // Close the cache
            cache.close().await.expect("Failed to close cache");

            // Reopening without compression must fail: the stored record cannot be decoded
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let result = Cache::<_, i32>::init(context, cfg).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(_)))
            ));
        });
    }

    #[test_traced]
    fn test_cache_record_corruption() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the cache
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            let index = 1u64;
            let data = 1;

            // Put the data
            cache.put(index, data).await.expect("Failed to put data");

            // Close the cache
            cache.close().await.expect("Failed to close cache");

            // Corrupt the value
            let section = (index / DEFAULT_ITEMS_PER_BLOB) * DEFAULT_ITEMS_PER_BLOB;
            let (blob, _) = context
                .open("test_partition", &section.to_be_bytes())
                .await
                .unwrap();
            let value_location = 4 // journal size
                + UInt(1u64).encode_size() as u64 // index
                + 4; // value length
            blob.write_at(b"testdaty".to_vec(), value_location)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Initialize the cache again
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let cache = Cache::<_, i32>::init(context, cfg)
                .await
                .expect("Failed to initialize cache");

            // The corrupted record is not recovered on restart, so the lookup finds nothing
            let retrieved: Option<i32> = cache.get(index).await.expect("Failed to get data");
            assert!(retrieved.is_none());
        });
    }

    #[test_traced]
    fn test_cache_prune() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the cache with one item per section (each item is its own section)
            let cfg = test_config(None, 1);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert multiple items across different sections
            let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
            for (index, data) in &items {
                cache.put(*index, *data).await.expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 5"));

            // Prune sections less than 3
            cache.prune(3).await.expect("Failed to prune");

            // Ensure items 1 and 2 are no longer present
            for (index, data) in items {
                let retrieved = cache.get(index).await.expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 3"));

            // Pruning an older section succeeds
            cache.prune(2).await.expect("Failed to prune");

            // Pruning the current section again succeeds
            cache.prune(3).await.expect("Failed to prune");

            // Writing below the pruning boundary is rejected
            let result = cache.put(1, 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }

    /// Writes `num_items` random items, restarts the cache, prunes half of them, and returns the
    /// deterministic auditor state so callers can compare runs.
    fn test_cache_restart(num_items: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the cache
            let items_per_blob = 256u64;
            let cfg = test_config(None, items_per_blob);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert multiple items
            let mut items = BTreeMap::new();
            while items.len() < num_items {
                let index = items.len() as u64;
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                items.insert(index, data);

                cache.put(index, data).await.expect("Failed to put data");
            }

            // Ensure all items can be retrieved
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_items:?}");
            assert!(buffer.contains(&tracked));

            // Close the cache
            cache.close().await.expect("Failed to close cache");

            // Reinitialize the cache
            let cfg = test_config(None, items_per_blob);
            let mut cache = Cache::<_, [u8; 1024]>::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Ensure all items can be retrieved
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (items.len() / 2) as u64;
            cache.prune(min).await.expect("Failed to prune");

            // Pruning is section-granular: everything from the start of `min`'s section survives
            let min = (min / items_per_blob) * items_per_blob;
            let mut removed = 0;
            for (index, data) in items {
                if index >= min {
                    let retrieved = cache
                        .get(index)
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);
                } else {
                    let retrieved = cache.get(index).await.expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_items - removed);
            assert!(buffer.contains(&tracked));

            context.auditor().state()
        })
    }

    #[test_traced]
    #[ignore]
    fn test_cache_many_items_and_restart() {
        test_cache_restart(100_000);
    }

    #[test_traced]
    #[ignore]
    fn test_determinism() {
        let state1 = test_cache_restart(5_000);
        let state2 = test_cache_restart(5_000);
        assert_eq!(state1, state2);
    }

    #[test_traced]
    fn test_cache_next_gap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert values with gaps
            cache.put(1, 1).await.unwrap();
            cache.put(10, 10).await.unwrap();
            cache.put(11, 11).await.unwrap();
            cache.put(14, 14).await.unwrap();

            // Check gaps
            let (current_end, start_next) = cache.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(1));

            let (current_end, start_next) = cache.next_gap(1);
            assert_eq!(current_end, Some(1));
            assert_eq!(start_next, Some(10));

            let (current_end, start_next) = cache.next_gap(10);
            assert_eq!(current_end, Some(11));
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(11);
            assert_eq!(current_end, Some(11));
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(12);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(14);
            assert_eq!(current_end, Some(14));
            assert!(start_next.is_none());
        });
    }

    #[test_traced]
    fn test_cache_missing_items() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Test 1: Empty cache - should return no items
            assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            // Test 2: Insert values with gaps
            cache.put(1, 1).await.unwrap();
            cache.put(2, 2).await.unwrap();
            cache.put(5, 5).await.unwrap();
            cache.put(6, 6).await.unwrap();
            cache.put(10, 10).await.unwrap();

            // Test 3: Find missing items from the beginning
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
            assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);

            // Test 4: Find missing items from within a gap
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(4, 2), vec![4, 7]);

            // Test 5: Find missing items from within a range
            assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
            assert_eq!(cache.missing_items(5, 2), vec![7, 8]);

            // Test 6: Find missing items after the last range (no more gaps)
            assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            // Test 7: Large gap scenario
            cache.put(1000, 1000).await.unwrap();

            // Gap between 10 and 1000
            let items = cache.missing_items(11, 10);
            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);

            // Request more items than available in gap
            let items = cache.missing_items(990, 15);
            assert_eq!(
                items,
                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
            );

            // Test 8: After syncing (data should remain consistent)
            cache.sync().await.unwrap();
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);

            // Test 9: Cross-section boundary scenario
            cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
            cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();

            // Find missing items across section boundary
            let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
            assert_eq!(
                items,
                vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
            );

            cache.close().await.expect("Failed to close cache");
        });
    }

    #[test_traced]
    fn test_cache_intervals_after_restart() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // A single config (and therefore a single buffer pool) is shared by both instances
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);

            // Insert data and close
            {
                let mut cache = Cache::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize cache");

                cache.put(0, 0).await.expect("Failed to put data");
                cache.put(100, 100).await.expect("Failed to put data");
                cache.put(1000, 1000).await.expect("Failed to put data");

                cache.close().await.expect("Failed to close cache");
            }

            // Reopen and verify intervals are preserved
            {
                let cache = Cache::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize cache");

                // Check gaps are preserved
                let (current_end, start_next) = cache.next_gap(0);
                assert_eq!(current_end, Some(0));
                assert_eq!(start_next, Some(100));

                let (current_end, start_next) = cache.next_gap(100);
                assert_eq!(current_end, Some(100));
                assert_eq!(start_next, Some(1000));

                // Check missing items
                let items = cache.missing_items(1, 5);
                assert_eq!(items, vec![1, 2, 3, 4, 5]);
            }
        });
    }

    #[test_traced]
    fn test_cache_intervals_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Smaller sections for easier testing
            let cfg = test_config(None, 100);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert values across multiple sections
            cache.put(50, 50).await.unwrap();
            cache.put(150, 150).await.unwrap();
            cache.put(250, 250).await.unwrap();
            cache.put(350, 350).await.unwrap();

            // Check gaps before pruning
            let (current_end, start_next) = cache.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(50));

            // Prune sections less than 200
            cache.prune(200).await.expect("Failed to prune");

            // Check that pruned indices are not accessible
            assert!(!cache.has(50));
            assert!(!cache.has(150));

            // Check gaps after pruning - should not include pruned ranges
            let (current_end, start_next) = cache.next_gap(200);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(250));

            // Missing items should not include pruned ranges
            let items = cache.missing_items(200, 5);
            assert_eq!(items, vec![200, 201, 202, 203, 204]);

            // Verify remaining data is still accessible
            assert!(cache.has(250));
            assert!(cache.has(350));
            assert_eq!(cache.get(250).await.unwrap(), Some(250));
            assert_eq!(cache.get(350).await.unwrap(), Some(350));
        });
    }

    #[test_traced]
    fn test_cache_sparse_indices() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Smaller sections for testing
            let cfg = test_config(None, 100);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert sparse values
            let indices = vec![
                (0u64, 0),
                (99u64, 99),   // End of first section
                (100u64, 100), // Start of second section
                (500u64, 500), // Start of sixth section
            ];

            for (index, value) in &indices {
                cache.put(*index, *value).await.expect("Failed to put data");
            }

            // Check that intermediate indices don't exist
            assert!(!cache.has(1));
            assert!(!cache.has(50));
            assert!(!cache.has(101));
            assert!(!cache.has(499));

            // Verify gap detection works correctly
            let (current_end, start_next) = cache.next_gap(50);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(99));

            let (current_end, start_next) = cache.next_gap(99);
            assert_eq!(current_end, Some(100));
            assert_eq!(start_next, Some(500));

            // Sync and verify
            cache.sync().await.expect("Failed to sync");

            for (index, value) in &indices {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *value);
            }
        });
    }

    #[test_traced]
    fn test_cache_intervals_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Test edge case: single item
            cache.put(42, 42).await.unwrap();

            let (current_end, start_next) = cache.next_gap(42);
            assert_eq!(current_end, Some(42));
            assert!(start_next.is_none());

            let (current_end, start_next) = cache.next_gap(41);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(42));

            let (current_end, start_next) = cache.next_gap(43);
            assert!(current_end.is_none());
            assert!(start_next.is_none());

            // Test edge case: consecutive items
            cache.put(43, 43).await.unwrap();
            cache.put(44, 44).await.unwrap();

            let (current_end, start_next) = cache.next_gap(42);
            assert_eq!(current_end, Some(44));
            assert!(start_next.is_none());

            // Test edge case: boundary values
            cache.put(u64::MAX - 1, 999).await.unwrap();

            let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(u64::MAX - 1));

            let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
            assert_eq!(current_end, Some(u64::MAX - 1));
            assert!(start_next.is_none());
        });
    }

    #[test_traced]
    fn test_cache_intervals_duplicate_inserts() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert initial value
            cache.put(10, 10).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            // Try to insert duplicate - should be no-op
            cache.put(10, 20).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10)); // Should still be original value

            // Verify intervals are correct
            let (current_end, start_next) = cache.next_gap(10);
            assert_eq!(current_end, Some(10));
            assert!(start_next.is_none());

            // Insert adjacent values
            cache.put(9, 9).await.unwrap();
            cache.put(11, 11).await.unwrap();

            // Verify intervals updated correctly
            let (current_end, start_next) = cache.next_gap(9);
            assert_eq!(current_end, Some(11));
            assert!(start_next.is_none());
        });
    }
}