commonware_storage/cache/
mod.rs

1//! A prunable cache for ordered data with index-based lookups.
2//!
3//! Data is stored in [crate::journal::segmented::variable::Journal] (an append-only log) and the location of
4//! written data is tracked in-memory by index to enable **single-read lookups** for cached data.
5//!
6//! Unlike [crate::archive::Archive], the [Cache] is optimized for simplicity and does
7//! not support key-based lookups (only index-based access is provided). This makes it ideal for
8//! caching sequential data where you know the exact index of the item you want to retrieve.
9//!
10//! # Memory Overhead
11//!
12//! [Cache] maintains a single in-memory map to track the location of each index item. The memory
13//! used to track each item is `8 + 4 + 4` bytes (where `8` is the index, `4` is the offset, and
14//! `4` is the length). This results in approximately `16` bytes of memory overhead per cached item.
15//!
16//! # Pruning
17//!
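//! [Cache] supports pruning up to a minimum `index` using the `prune` method. Pruning operates on
//! whole sections (each section holds `items_per_blob` consecutive indices, as set in the
//! configuration), so only items in sections strictly before the section containing `index` are
//! removed; items in that section below `index` are retained. After pruning, `get` and `has`
//! report pruned indices as absent, `put` on a pruned index returns [Error::AlreadyPrunedTo], and
//! pruning to an older (or the same) index is a no-op.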
21//!
22//! # Single Operation Reads
23//!
24//! To enable single operation reads (i.e. reading all of an item in a single call to
25//! [commonware_runtime::Blob]), [Cache] stores the length of each item in its in-memory index.
26//! This ensures that reading a cached item requires only one disk operation.
27//!
28//! # Compression
29//!
//! [Cache] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of [Cache]; however, it must remain populated if any
//! data was written with compression enabled (otherwise initialization fails with a journal codec
//! error).
34//!
35//! # Querying for Gaps
36//!
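//! [Cache] tracks gaps in the index space so the caller can efficiently find indices it has not
//! yet stored. `next_gap` returns the end of the contiguous stored range containing an index (if
//! any) together with the start of the next stored range (if any), and `missing_items` returns up
//! to a requested number of missing indices at or after a given index (only gaps before the
//! highest stored index are reported). This is a very common pattern when syncing blocks in a
//! blockchain.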
39//!
40//! # Example
41//!
42//! ```rust
43//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
44//! use commonware_storage::cache::{Cache, Config};
45//! use commonware_utils::{NZUsize, NZU64};
46//!
47//! let executor = deterministic::Runner::default();
48//! executor.start(|context| async move {
49//!     // Create a cache
50//!     let cfg = Config {
51//!         partition: "cache".into(),
52//!         compression: Some(3),
53//!         codec_config: (),
54//!         items_per_blob: NZU64!(1024),
55//!         write_buffer: NZUsize!(1024 * 1024),
56//!         replay_buffer: NZUsize!(4096),
57//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
58//!     };
59//!     let mut cache = Cache::init(context, cfg).await.unwrap();
60//!
61//!     // Put data at index
62//!     cache.put(1, 100u32).await.unwrap();
63//!
64//!     // Get data by index
65//!     let data: Option<u32> = cache.get(1).await.unwrap();
66//!     assert_eq!(data, Some(100));
67//!
68//!     // Check for gaps in the index space
69//!     cache.put(10, 200u32).await.unwrap();
70//!     let (current_end, start_next) = cache.next_gap(5);
71//!     assert!(current_end.is_none());
72//!     assert_eq!(start_next, Some(10));
73//!
74//!     // Close the cache (also closes the journal)
75//!     cache.close().await.unwrap();
76//! });
77//! ```
78
79use commonware_runtime::buffer::PoolRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86/// Errors that can occur when interacting with the cache.
87#[derive(Debug, Error)]
88pub enum Error {
89    #[error("journal error: {0}")]
90    Journal(#[from] crate::journal::Error),
91    #[error("record corrupted")]
92    RecordCorrupted,
93    #[error("already pruned to: {0}")]
94    AlreadyPrunedTo(u64),
95    #[error("record too large")]
96    RecordTooLarge,
97}
98
99/// Configuration for [Cache] storage.
100#[derive(Clone)]
101pub struct Config<C> {
102    /// The partition to use for the cache's [crate::journal] storage.
103    pub partition: String,
104
105    /// The compression level to use for the cache's [crate::journal] storage.
106    pub compression: Option<u8>,
107
108    /// The [commonware_codec::Codec] configuration to use for the value stored in the cache.
109    pub codec_config: C,
110
111    /// The number of items per section (the granularity of pruning).
112    pub items_per_blob: NonZeroU64,
113
114    /// The amount of bytes that can be buffered in a section before being written to a
115    /// [commonware_runtime::Blob].
116    pub write_buffer: NonZeroUsize,
117
118    /// The buffer size to use when replaying a [commonware_runtime::Blob].
119    pub replay_buffer: NonZeroUsize,
120
121    /// The buffer pool to use for the cache's [crate::journal] storage.
122    pub buffer_pool: PoolRef,
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::Error as JournalError;
    use commonware_codec::{varint::UInt, EncodeSize};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
    use commonware_utils::{NZUsize, NZU64};
    use rand::Rng;
    use std::collections::BTreeMap;

    /// Partition every test writes to (also opened directly by the corruption test).
    const PARTITION: &str = "test_partition";
    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds the [Config] used by these tests. Only the compression level and the section size
    /// vary between tests; each call creates a fresh buffer pool.
    fn test_config(compression: Option<u8>, items_per_blob: u64) -> Config<()> {
        Config {
            partition: PARTITION.into(),
            codec_config: (),
            compression,
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_blob: NZU64!(items_per_blob),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Data written with compression cannot be replayed once compression is disabled.
    #[test_traced]
    fn test_cache_compression_then_none() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Write a single item with compression enabled
            let cfg = test_config(Some(3), DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");
            let index = 1u64;
            let data = 1;
            cache.put(index, data).await.expect("Failed to put data");
            cache.close().await.expect("Failed to close cache");

            // Reopening without compression must fail to decode the stored record
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let result = Cache::<_, i32>::init(context, cfg).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(_)))
            ));
        });
    }

    /// A record whose on-disk bytes were overwritten is not returned after a restart.
    #[test_traced]
    fn test_cache_record_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Put the data
            let index = 1u64;
            let data = 1;
            cache.put(index, data).await.expect("Failed to put data");
            cache.close().await.expect("Failed to close cache");

            // Corrupt the record by overwriting bytes directly in the section's blob
            let section = (index / DEFAULT_ITEMS_PER_BLOB) * DEFAULT_ITEMS_PER_BLOB;
            let (blob, _) = context
                .open(PARTITION, &section.to_be_bytes())
                .await
                .unwrap();
            let value_location = 4 /* journal size */
                + UInt(index).encode_size() as u64 /* index */
                + 4 /* value length */;
            blob.write_at(b"testdaty".to_vec(), value_location)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Reopen the cache: the corrupted record must not be served
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let cache = Cache::<_, i32>::init(context, cfg)
                .await
                .expect("Failed to initialize cache");
            let retrieved: Option<i32> = cache.get(index).await.expect("Failed to get data");
            assert!(retrieved.is_none());
        });
    }

    /// Pruning removes whole sections, is idempotent, and rejects writes below the boundary.
    #[test_traced]
    fn test_cache_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // One item per section, so every index lives in its own section
            let cfg = test_config(None, 1);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert multiple items across different sections
            let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
            for (index, data) in &items {
                cache.put(*index, *data).await.expect("Failed to put data");
            }
            assert_eq!(cache.first(), Some(1));
            assert!(context.encode().contains("items_tracked 5"));

            // Prune sections less than 3
            cache.prune(3).await.expect("Failed to prune");

            // Items 1 and 2 are gone; 3..=5 are still readable
            for (index, data) in items {
                let retrieved = cache.get(index).await.expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }
            assert_eq!(cache.first(), Some(3));
            assert!(context.encode().contains("items_tracked 3"));

            // Pruning to an older or to the same index is a no-op
            cache.prune(2).await.expect("Failed to prune");
            assert_eq!(cache.first(), Some(3));
            cache.prune(3).await.expect("Failed to prune");
            assert_eq!(cache.first(), Some(3));

            // Writing below the pruning boundary is rejected
            let result = cache.put(1, 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }

    /// Writes `num_items` random 1 KiB values at consecutive indices, checks they survive a
    /// restart and a prune of the first half, and returns the runtime auditor state (used by
    /// the determinism test).
    fn test_cache_restart(num_items: usize) -> String {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let items_per_blob = 256u64;
            let cfg = test_config(None, items_per_blob);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert items at consecutive indices with random contents
            let mut items = BTreeMap::new();
            for index in 0..num_items as u64 {
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                items.insert(index, data);

                cache.put(index, data).await.expect("Failed to put data");
            }

            // Ensure all items can be retrieved
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }
            let tracked = format!("items_tracked {num_items:?}");
            assert!(context.encode().contains(&tracked));

            cache.close().await.expect("Failed to close cache");

            // Reinitialize and ensure every item survived the restart
            let cfg = test_config(None, items_per_blob);
            let mut cache = Cache::<_, [u8; 1024]>::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune the first half. Pruning is section-granular, so everything from the start
            // of the section containing `min` onward must still be present.
            let min = (items.len() / 2) as u64;
            cache.prune(min).await.expect("Failed to prune");
            let section_start = (min / items_per_blob) * items_per_blob;
            let mut removed = 0;
            for (index, data) in items {
                let retrieved = cache.get(index).await.expect("Failed to get data");
                if index >= section_start {
                    assert_eq!(retrieved.expect("Data not found"), data);
                } else {
                    assert!(retrieved.is_none());
                    removed += 1;
                }
            }
            let tracked = format!("items_tracked {:?}", num_items - removed);
            assert!(context.encode().contains(&tracked));

            context.auditor().state()
        })
    }

    #[test_traced]
    #[ignore]
    fn test_cache_many_items_and_restart() {
        test_cache_restart(100_000);
    }

    /// Two identical runs must produce identical auditor state.
    #[test_traced]
    #[ignore]
    fn test_determinism() {
        let state1 = test_cache_restart(5_000);
        let state2 = test_cache_restart(5_000);
        assert_eq!(state1, state2);
    }

    /// `next_gap` reports the end of the current stored range and the start of the next one.
    #[test_traced]
    fn test_cache_next_gap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Check first
            assert_eq!(cache.first(), None);

            // Insert values with gaps
            cache.put(1, 1).await.unwrap();
            cache.put(10, 10).await.unwrap();
            cache.put(11, 11).await.unwrap();
            cache.put(14, 14).await.unwrap();

            // Check gaps
            assert_eq!(cache.next_gap(0), (None, Some(1)));
            assert_eq!(cache.first(), Some(1));
            assert_eq!(cache.next_gap(1), (Some(1), Some(10)));
            assert_eq!(cache.next_gap(10), (Some(11), Some(14)));
            assert_eq!(cache.next_gap(11), (Some(11), Some(14)));
            assert_eq!(cache.next_gap(12), (None, Some(14)));
            assert_eq!(cache.next_gap(14), (Some(14), None));
        });
    }

    /// `missing_items` lists absent indices, bounded by the requested count and the highest
    /// stored index.
    #[test_traced]
    fn test_cache_missing_items() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Test 1: Empty cache - should return no items
            assert_eq!(cache.first(), None);
            assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            // Test 2: Insert values with gaps
            cache.put(1, 1).await.unwrap();
            cache.put(2, 2).await.unwrap();
            cache.put(5, 5).await.unwrap();
            cache.put(6, 6).await.unwrap();
            cache.put(10, 10).await.unwrap();

            // Test 3: Find missing items from the beginning
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
            assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);

            // Test 4: Find missing items from within a gap
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(4, 2), vec![4, 7]);

            // Test 5: Find missing items from within a range
            assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
            assert_eq!(cache.missing_items(5, 2), vec![7, 8]);

            // Test 6: Find missing items after the last range (no more gaps)
            assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            // Test 7: Large gap scenario
            cache.put(1000, 1000).await.unwrap();

            // Gap between 10 and 1000
            let items = cache.missing_items(11, 10);
            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);

            // Request more items than available in gap
            let items = cache.missing_items(990, 15);
            assert_eq!(
                items,
                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
            );

            // Test 8: After syncing (data should remain consistent)
            cache.sync().await.unwrap();
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);

            // Test 9: Cross-section boundary scenario
            cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
            cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();

            // Find missing items across section boundary
            let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
            assert_eq!(
                items,
                vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
            );

            cache.close().await.expect("Failed to close cache");
        });
    }

    /// Gap tracking is rebuilt correctly when the cache is reopened.
    #[test_traced]
    fn test_cache_intervals_after_restart() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);

            // Insert data and close
            {
                let mut cache = Cache::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize cache");

                cache.put(0, 0).await.expect("Failed to put data");
                cache.put(100, 100).await.expect("Failed to put data");
                cache.put(1000, 1000).await.expect("Failed to put data");

                cache.close().await.expect("Failed to close cache");
            }

            // Reopen and verify intervals are preserved
            {
                let cache = Cache::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize cache");

                // Check gaps are preserved
                assert_eq!(cache.next_gap(0), (Some(0), Some(100)));
                assert_eq!(cache.next_gap(100), (Some(100), Some(1000)));

                // Check missing items
                assert_eq!(cache.missing_items(1, 5), vec![1, 2, 3, 4, 5]);
            }
        });
    }

    /// Pruned indices disappear from lookups and gap queries.
    #[test_traced]
    fn test_cache_intervals_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Smaller sections for easier testing
            let cfg = test_config(None, 100);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert values across multiple sections
            cache.put(50, 50).await.unwrap();
            cache.put(150, 150).await.unwrap();
            cache.put(250, 250).await.unwrap();
            cache.put(350, 350).await.unwrap();

            // Check gaps before pruning
            assert_eq!(cache.next_gap(0), (None, Some(50)));

            // Prune sections less than 200
            cache.prune(200).await.expect("Failed to prune");

            // Check that pruned indices are not accessible
            assert!(!cache.has(50));
            assert!(!cache.has(150));

            // Gaps after pruning must not include pruned ranges
            assert_eq!(cache.next_gap(200), (None, Some(250)));

            // Missing items should not include pruned ranges
            let items = cache.missing_items(200, 5);
            assert_eq!(items, vec![200, 201, 202, 203, 204]);

            // Verify remaining data is still accessible
            assert!(cache.has(250));
            assert!(cache.has(350));
            assert_eq!(cache.get(250).await.unwrap(), Some(250));
            assert_eq!(cache.get(350).await.unwrap(), Some(350));
        });
    }

    /// Sparse indices spanning several sections are stored and gap-tracked correctly.
    #[test_traced]
    fn test_cache_sparse_indices() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Smaller sections for testing
            let cfg = test_config(None, 100);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert sparse values
            let indices = vec![
                (0u64, 0),
                (99u64, 99),   // End of first section
                (100u64, 100), // Start of second section
                (500u64, 500), // Start of sixth section
            ];
            for (index, value) in &indices {
                cache.put(*index, *value).await.expect("Failed to put data");
            }

            // Check that intermediate indices don't exist
            assert!(!cache.has(1));
            assert!(!cache.has(50));
            assert!(!cache.has(101));
            assert!(!cache.has(499));

            // Verify gap detection works correctly
            assert_eq!(cache.next_gap(50), (None, Some(99)));
            assert_eq!(cache.next_gap(99), (Some(100), Some(500)));

            // Sync and verify
            cache.sync().await.expect("Failed to sync");
            for (index, value) in &indices {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *value);
            }
        });
    }

    /// Gap queries around single items, consecutive runs, and indices near `u64::MAX`.
    #[test_traced]
    fn test_cache_intervals_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Single item
            cache.put(42, 42).await.unwrap();
            assert_eq!(cache.next_gap(42), (Some(42), None));
            assert_eq!(cache.next_gap(41), (None, Some(42)));
            assert_eq!(cache.next_gap(43), (None, None));

            // Consecutive items merge into one range
            cache.put(43, 43).await.unwrap();
            cache.put(44, 44).await.unwrap();
            assert_eq!(cache.next_gap(42), (Some(44), None));

            // Boundary values
            cache.put(u64::MAX - 1, 999).await.unwrap();
            assert_eq!(cache.next_gap(u64::MAX - 2), (None, Some(u64::MAX - 1)));
            assert_eq!(cache.next_gap(u64::MAX - 1), (Some(u64::MAX - 1), None));
        });
    }

    /// Re-inserting an existing index keeps the original value; neighbours merge ranges.
    #[test_traced]
    fn test_cache_intervals_duplicate_inserts() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(None, DEFAULT_ITEMS_PER_BLOB);
            let mut cache = Cache::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize cache");

            // Insert initial value
            cache.put(10, 10).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            // Duplicate insert is a no-op: the original value is kept
            cache.put(10, 20).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            // Verify intervals are correct
            assert_eq!(cache.next_gap(10), (Some(10), None));

            // Insert adjacent values
            cache.put(9, 9).await.unwrap();
            cache.put(11, 11).await.unwrap();

            // Verify intervals updated correctly
            assert_eq!(cache.next_gap(9), (Some(11), None));
        });
    }
}