commonware_storage/cache/
mod.rs

1//! A prunable cache for ordered data with index-based lookups.
2//!
3//! Data is stored in [crate::journal::segmented::variable::Journal] (an append-only log) and the location of
4//! written data is tracked in-memory by index to enable **single-read lookups** for cached data.
5//!
6//! Unlike [crate::archive::Archive], the [Cache] is optimized for simplicity and does
7//! not support key-based lookups (only index-based access is provided). This makes it ideal for
8//! caching sequential data where you know the exact index of the item you want to retrieve.
9//!
10//! # Memory Overhead
11//!
12//! [Cache] maintains a single in-memory map to track the location of each index item. The memory
13//! used to track each item is `8 + 4 + 4` bytes (where `8` is the index, `4` is the offset, and
14//! `4` is the length). This results in approximately `16` bytes of memory overhead per cached item.
15//!
16//! # Pruning
17//!
18//! [Cache] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
19//! called on a `section`, all interaction with a `section` less than the pruned `section` will
20//! return an error. The pruning granularity is determined by `items_per_blob` in the configuration.
21//!
22//! # Single Operation Reads
23//!
24//! To enable single operation reads (i.e. reading all of an item in a single call to
25//! [commonware_runtime::Blob]), [Cache] stores the length of each item in its in-memory index.
26//! This ensures that reading a cached item requires only one disk operation.
27//!
28//! # Compression
29//!
30//! [Cache] supports compressing data before storing it on disk. This can be enabled by setting
31//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
32//! can be changed between initializations of [Cache], however, it must remain populated if any
33//! data was written with compression enabled.
34//!
35//! # Querying for Gaps
36//!
37//! [Cache] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
38//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
39//!
40//! # Example
41//!
42//! ```rust
43//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
44//! use commonware_storage::cache::{Cache, Config};
45//! use commonware_utils::{NZUsize, NZU16, NZU64};
46//!
47//! let executor = deterministic::Runner::default();
48//! executor.start(|context| async move {
49//!     // Create a cache
50//!     let cfg = Config {
51//!         partition: "cache".into(),
52//!         compression: Some(3),
53//!         codec_config: (),
54//!         items_per_blob: NZU64!(1024),
55//!         write_buffer: NZUsize!(1024 * 1024),
56//!         replay_buffer: NZUsize!(4096),
57//!         buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
58//!     };
59//!     let mut cache = Cache::init(context, cfg).await.unwrap();
60//!
61//!     // Put data at index
62//!     cache.put(1, 100u32).await.unwrap();
63//!
64//!     // Get data by index
65//!     let data: Option<u32> = cache.get(1).await.unwrap();
66//!     assert_eq!(data, Some(100));
67//!
68//!     // Check for gaps in the index space
69//!     cache.put(10, 200u32).await.unwrap();
70//!     let (current_end, start_next) = cache.next_gap(5);
71//!     assert!(current_end.is_none());
72//!     assert_eq!(start_next, Some(10));
73//!
74//!     // Sync the cache
75//!     cache.sync().await.unwrap();
76//! });
77//! ```
78
79use commonware_runtime::buffer::PoolRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86/// Errors that can occur when interacting with the cache.
87#[derive(Debug, Error)]
88pub enum Error {
89    #[error("journal error: {0}")]
90    Journal(#[from] crate::journal::Error),
91    #[error("record corrupted")]
92    RecordCorrupted,
93    #[error("already pruned to: {0}")]
94    AlreadyPrunedTo(u64),
95    #[error("record too large")]
96    RecordTooLarge,
97}
98
99/// Configuration for [Cache] storage.
100#[derive(Clone)]
101pub struct Config<C> {
102    /// The partition to use for the cache's [crate::journal] storage.
103    pub partition: String,
104
105    /// The compression level to use for the cache's [crate::journal] storage.
106    pub compression: Option<u8>,
107
108    /// The [commonware_codec::Codec] configuration to use for the value stored in the cache.
109    pub codec_config: C,
110
111    /// The number of items per section (the granularity of pruning).
112    pub items_per_blob: NonZeroU64,
113
114    /// The amount of bytes that can be buffered in a section before being written to a
115    /// [commonware_runtime::Blob].
116    pub write_buffer: NonZeroUsize,
117
118    /// The buffer size to use when replaying a [commonware_runtime::Blob].
119    pub replay_buffer: NonZeroUsize,
120
121    /// The buffer pool to use for the cache's [crate::journal] storage.
122    pub buffer_pool: PoolRef,
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128    use crate::journal::Error as JournalError;
129    use commonware_macros::{test_group, test_traced};
130    use commonware_runtime::{deterministic, Metrics, Runner};
131    use commonware_utils::{NZUsize, NZU16, NZU64};
132    use rand::Rng;
133    use std::{collections::BTreeMap, num::NonZeroU16};
134
135    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
136    const DEFAULT_WRITE_BUFFER: usize = 1024;
137    const DEFAULT_REPLAY_BUFFER: usize = 4096;
138    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
139    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
140
141    #[test_traced]
142    fn test_cache_compression_then_none() {
143        // Initialize the deterministic context
144        let executor = deterministic::Runner::default();
145        executor.start(|context| async move {
146            // Initialize the cache
147            let cfg = Config {
148                partition: "test_partition".into(),
149                codec_config: (),
150                compression: Some(3),
151                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
152                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
153                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
154                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
155            };
156            let mut cache = Cache::init(context.clone(), cfg.clone())
157                .await
158                .expect("Failed to initialize cache");
159
160            // Put the data
161            let index = 1u64;
162            let data = 1;
163            cache.put(index, data).await.expect("Failed to put data");
164
165            // Sync and drop the cache
166            cache.sync().await.expect("Failed to sync cache");
167            drop(cache);
168
169            // Initialize the cache again without compression
170            let cfg = Config {
171                partition: "test_partition".into(),
172                codec_config: (),
173                compression: None,
174                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
175                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
176                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
177                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
178            };
179            let result = Cache::<_, i32>::init(context, cfg.clone()).await;
180            assert!(matches!(
181                result,
182                Err(Error::Journal(JournalError::Codec(_)))
183            ));
184        });
185    }
186
187    #[test_traced]
188    fn test_cache_prune() {
189        // Initialize the deterministic context
190        let executor = deterministic::Runner::default();
191        executor.start(|context| async move {
192            // Initialize the cache
193            let cfg = Config {
194                partition: "test_partition".into(),
195                codec_config: (),
196                compression: None,
197                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
198                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
199                items_per_blob: NZU64!(1), // no mask - each item is its own section
200                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
201            };
202            let mut cache = Cache::init(context.clone(), cfg.clone())
203                .await
204                .expect("Failed to initialize cache");
205
206            // Insert multiple items across different sections
207            let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
208            for (index, data) in &items {
209                cache.put(*index, *data).await.expect("Failed to put data");
210            }
211            assert_eq!(cache.first(), Some(1));
212
213            // Check metrics
214            let buffer = context.encode();
215            assert!(buffer.contains("items_tracked 5"));
216
217            // Prune sections less than 3
218            cache.prune(3).await.expect("Failed to prune");
219
220            // Ensure items 1 and 2 are no longer present
221            for (index, data) in items {
222                let retrieved = cache.get(index).await.expect("Failed to get data");
223                if index < 3 {
224                    assert!(retrieved.is_none());
225                } else {
226                    assert_eq!(retrieved.expect("Data not found"), data);
227                }
228            }
229            assert_eq!(cache.first(), Some(3));
230
231            // Check metrics
232            let buffer = context.encode();
233            assert!(buffer.contains("items_tracked 3"));
234
235            // Try to prune older section
236            cache.prune(2).await.expect("Failed to prune");
237            assert_eq!(cache.first(), Some(3));
238
239            // Try to prune current section again
240            cache.prune(3).await.expect("Failed to prune");
241            assert_eq!(cache.first(), Some(3));
242
243            // Try to put older index
244            let result = cache.put(1, 1).await;
245            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
246        });
247    }
248
249    fn test_cache_restart(num_items: usize) -> String {
250        // Initialize the deterministic context
251        let executor = deterministic::Runner::default();
252        executor.start(|mut context| async move {
253            // Initialize the cache
254            let items_per_blob = 256u64;
255            let cfg = Config {
256                partition: "test_partition".into(),
257                codec_config: (),
258                compression: None,
259                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
260                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
261                items_per_blob: NZU64!(items_per_blob),
262                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
263            };
264            let mut cache = Cache::init(context.clone(), cfg.clone())
265                .await
266                .expect("Failed to initialize cache");
267
268            // Insert multiple items
269            let mut items = BTreeMap::new();
270            while items.len() < num_items {
271                let index = items.len() as u64;
272                let mut data = [0u8; 1024];
273                context.fill(&mut data);
274                items.insert(index, data);
275
276                cache.put(index, data).await.expect("Failed to put data");
277            }
278
279            // Ensure all items can be retrieved
280            for (index, data) in &items {
281                let retrieved = cache
282                    .get(*index)
283                    .await
284                    .expect("Failed to get data")
285                    .expect("Data not found");
286                assert_eq!(retrieved, *data);
287            }
288
289            // Check metrics
290            let buffer = context.encode();
291            let tracked = format!("items_tracked {num_items:?}");
292            assert!(buffer.contains(&tracked));
293
294            // Sync and drop the cache
295            cache.sync().await.expect("Failed to sync cache");
296            drop(cache);
297
298            // Reinitialize the cache
299            let cfg = Config {
300                partition: "test_partition".into(),
301                codec_config: (),
302                compression: None,
303                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
304                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
305                items_per_blob: NZU64!(items_per_blob),
306                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
307            };
308            let mut cache = Cache::<_, [u8; 1024]>::init(context.clone(), cfg.clone())
309                .await
310                .expect("Failed to initialize cache");
311
312            // Ensure all items can be retrieved
313            for (index, data) in &items {
314                let retrieved = cache
315                    .get(*index)
316                    .await
317                    .expect("Failed to get data")
318                    .expect("Data not found");
319                assert_eq!(&retrieved, data);
320            }
321
322            // Prune first half
323            let min = (items.len() / 2) as u64;
324            cache.prune(min).await.expect("Failed to prune");
325
326            // Ensure all items can be retrieved that haven't been pruned
327            let min = (min / items_per_blob) * items_per_blob;
328            let mut removed = 0;
329            for (index, data) in items {
330                if index >= min {
331                    let retrieved = cache
332                        .get(index)
333                        .await
334                        .expect("Failed to get data")
335                        .expect("Data not found");
336                    assert_eq!(retrieved, data);
337                } else {
338                    let retrieved = cache.get(index).await.expect("Failed to get data");
339                    assert!(retrieved.is_none());
340                    removed += 1;
341                }
342            }
343
344            // Check metrics
345            let buffer = context.encode();
346            let tracked = format!("items_tracked {:?}", num_items - removed);
347            assert!(buffer.contains(&tracked));
348
349            context.auditor().state()
350        })
351    }
352
353    #[test_group("slow")]
354    #[test_traced]
355    fn test_cache_many_items_and_restart() {
356        test_cache_restart(100_000);
357    }
358
359    #[test_group("slow")]
360    #[test_traced]
361    fn test_determinism() {
362        let state1 = test_cache_restart(5_000);
363        let state2 = test_cache_restart(5_000);
364        assert_eq!(state1, state2);
365    }
366
367    #[test_traced]
368    fn test_cache_next_gap() {
369        let executor = deterministic::Runner::default();
370        executor.start(|context| async move {
371            let cfg = Config {
372                partition: "test_partition".into(),
373                codec_config: (),
374                compression: None,
375                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
376                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
377                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
378                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
379            };
380            let mut cache = Cache::init(context.clone(), cfg.clone())
381                .await
382                .expect("Failed to initialize cache");
383
384            // Check first
385            assert_eq!(cache.first(), None);
386
387            // Insert values with gaps
388            cache.put(1, 1).await.unwrap();
389            cache.put(10, 10).await.unwrap();
390            cache.put(11, 11).await.unwrap();
391            cache.put(14, 14).await.unwrap();
392
393            // Check gaps
394            let (current_end, start_next) = cache.next_gap(0);
395            assert!(current_end.is_none());
396            assert_eq!(start_next, Some(1));
397            assert_eq!(cache.first(), Some(1));
398
399            let (current_end, start_next) = cache.next_gap(1);
400            assert_eq!(current_end, Some(1));
401            assert_eq!(start_next, Some(10));
402
403            let (current_end, start_next) = cache.next_gap(10);
404            assert_eq!(current_end, Some(11));
405            assert_eq!(start_next, Some(14));
406
407            let (current_end, start_next) = cache.next_gap(11);
408            assert_eq!(current_end, Some(11));
409            assert_eq!(start_next, Some(14));
410
411            let (current_end, start_next) = cache.next_gap(12);
412            assert!(current_end.is_none());
413            assert_eq!(start_next, Some(14));
414
415            let (current_end, start_next) = cache.next_gap(14);
416            assert_eq!(current_end, Some(14));
417            assert!(start_next.is_none());
418        });
419    }
420
421    #[test_traced]
422    fn test_cache_missing_items() {
423        let executor = deterministic::Runner::default();
424        executor.start(|context| async move {
425            let cfg = Config {
426                partition: "test_partition".into(),
427                codec_config: (),
428                compression: None,
429                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
430                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
431                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
432                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
433            };
434            let mut cache = Cache::init(context.clone(), cfg.clone())
435                .await
436                .expect("Failed to initialize cache");
437
438            // Test 1: Empty cache - should return no items
439            assert_eq!(cache.first(), None);
440            assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
441            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
442
443            // Test 2: Insert values with gaps
444            cache.put(1, 1).await.unwrap();
445            cache.put(2, 2).await.unwrap();
446            cache.put(5, 5).await.unwrap();
447            cache.put(6, 6).await.unwrap();
448            cache.put(10, 10).await.unwrap();
449
450            // Test 3: Find missing items from the beginning
451            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
452            assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
453            assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
454
455            // Test 4: Find missing items from within a gap
456            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
457            assert_eq!(cache.missing_items(4, 2), vec![4, 7]);
458
459            // Test 5: Find missing items from within a range
460            assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
461            assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
462            assert_eq!(cache.missing_items(5, 2), vec![7, 8]);
463
464            // Test 6: Find missing items after the last range (no more gaps)
465            assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
466            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
467
468            // Test 7: Large gap scenario
469            cache.put(1000, 1000).await.unwrap();
470
471            // Gap between 10 and 1000
472            let items = cache.missing_items(11, 10);
473            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
474
475            // Request more items than available in gap
476            let items = cache.missing_items(990, 15);
477            assert_eq!(
478                items,
479                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
480            );
481
482            // Test 8: After syncing (data should remain consistent)
483            cache.sync().await.unwrap();
484            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
485            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
486
487            // Test 9: Cross-section boundary scenario
488            cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
489            cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();
490
491            // Find missing items across section boundary
492            let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
493            assert_eq!(
494                items,
495                vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
496            );
497        });
498    }
499
500    #[test_traced]
501    fn test_cache_intervals_after_restart() {
502        let executor = deterministic::Runner::default();
503        executor.start(|context| async move {
504            let cfg = Config {
505                partition: "test_partition".into(),
506                codec_config: (),
507                compression: None,
508                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
509                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
510                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
511                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
512            };
513
514            // Insert data and sync
515            {
516                let mut cache = Cache::init(context.clone(), cfg.clone())
517                    .await
518                    .expect("Failed to initialize cache");
519
520                cache.put(0, 0).await.expect("Failed to put data");
521                cache.put(100, 100).await.expect("Failed to put data");
522                cache.put(1000, 1000).await.expect("Failed to put data");
523
524                cache.sync().await.expect("Failed to sync cache");
525            }
526
527            // Reopen and verify intervals are preserved
528            {
529                let cache = Cache::<_, i32>::init(context.clone(), cfg.clone())
530                    .await
531                    .expect("Failed to initialize cache");
532
533                // Check gaps are preserved
534                let (current_end, start_next) = cache.next_gap(0);
535                assert_eq!(current_end, Some(0));
536                assert_eq!(start_next, Some(100));
537
538                let (current_end, start_next) = cache.next_gap(100);
539                assert_eq!(current_end, Some(100));
540                assert_eq!(start_next, Some(1000));
541
542                // Check missing items
543                let items = cache.missing_items(1, 5);
544                assert_eq!(items, vec![1, 2, 3, 4, 5]);
545            }
546        });
547    }
548
549    #[test_traced]
550    fn test_cache_intervals_with_pruning() {
551        let executor = deterministic::Runner::default();
552        executor.start(|context| async move {
553            let cfg = Config {
554                partition: "test_partition".into(),
555                codec_config: (),
556                compression: None,
557                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
558                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
559                items_per_blob: NZU64!(100), // Smaller sections for easier testing
560                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
561            };
562            let mut cache = Cache::init(context.clone(), cfg.clone())
563                .await
564                .expect("Failed to initialize cache");
565
566            // Insert values across multiple sections
567            cache.put(50, 50).await.unwrap();
568            cache.put(150, 150).await.unwrap();
569            cache.put(250, 250).await.unwrap();
570            cache.put(350, 350).await.unwrap();
571
572            // Check gaps before pruning
573            let (current_end, start_next) = cache.next_gap(0);
574            assert!(current_end.is_none());
575            assert_eq!(start_next, Some(50));
576
577            // Prune sections less than 200
578            cache.prune(200).await.expect("Failed to prune");
579
580            // Check that pruned indices are not accessible
581            assert!(!cache.has(50));
582            assert!(!cache.has(150));
583
584            // Check gaps after pruning - should not include pruned ranges
585            let (current_end, start_next) = cache.next_gap(200);
586            assert!(current_end.is_none());
587            assert_eq!(start_next, Some(250));
588
589            // Missing items should not include pruned ranges
590            let items = cache.missing_items(200, 5);
591            assert_eq!(items, vec![200, 201, 202, 203, 204]);
592
593            // Verify remaining data is still accessible
594            assert!(cache.has(250));
595            assert!(cache.has(350));
596            assert_eq!(cache.get(250).await.unwrap(), Some(250));
597            assert_eq!(cache.get(350).await.unwrap(), Some(350));
598        });
599    }
600
601    #[test_traced]
602    fn test_cache_sparse_indices() {
603        let executor = deterministic::Runner::default();
604        executor.start(|context| async move {
605            let cfg = Config {
606                partition: "test_partition".into(),
607                codec_config: (),
608                compression: None,
609                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
610                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
611                items_per_blob: NZU64!(100), // Smaller sections for testing
612                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
613            };
614            let mut cache = Cache::init(context.clone(), cfg.clone())
615                .await
616                .expect("Failed to initialize cache");
617
618            // Insert sparse values
619            let indices = vec![
620                (0u64, 0),
621                (99u64, 99),   // End of first section
622                (100u64, 100), // Start of second section
623                (500u64, 500), // Start of sixth section
624            ];
625
626            for (index, value) in &indices {
627                cache.put(*index, *value).await.expect("Failed to put data");
628            }
629
630            // Check that intermediate indices don't exist
631            assert!(!cache.has(1));
632            assert!(!cache.has(50));
633            assert!(!cache.has(101));
634            assert!(!cache.has(499));
635
636            // Verify gap detection works correctly
637            let (current_end, start_next) = cache.next_gap(50);
638            assert!(current_end.is_none());
639            assert_eq!(start_next, Some(99));
640
641            let (current_end, start_next) = cache.next_gap(99);
642            assert_eq!(current_end, Some(100));
643            assert_eq!(start_next, Some(500));
644
645            // Sync and verify
646            cache.sync().await.expect("Failed to sync");
647
648            for (index, value) in &indices {
649                let retrieved = cache
650                    .get(*index)
651                    .await
652                    .expect("Failed to get data")
653                    .expect("Data not found");
654                assert_eq!(retrieved, *value);
655            }
656        });
657    }
658
659    #[test_traced]
660    fn test_cache_intervals_edge_cases() {
661        let executor = deterministic::Runner::default();
662        executor.start(|context| async move {
663            let cfg = Config {
664                partition: "test_partition".into(),
665                codec_config: (),
666                compression: None,
667                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
668                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
669                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
670                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
671            };
672            let mut cache = Cache::init(context.clone(), cfg.clone())
673                .await
674                .expect("Failed to initialize cache");
675
676            // Test edge case: single item
677            cache.put(42, 42).await.unwrap();
678
679            let (current_end, start_next) = cache.next_gap(42);
680            assert_eq!(current_end, Some(42));
681            assert!(start_next.is_none());
682
683            let (current_end, start_next) = cache.next_gap(41);
684            assert!(current_end.is_none());
685            assert_eq!(start_next, Some(42));
686
687            let (current_end, start_next) = cache.next_gap(43);
688            assert!(current_end.is_none());
689            assert!(start_next.is_none());
690
691            // Test edge case: consecutive items
692            cache.put(43, 43).await.unwrap();
693            cache.put(44, 44).await.unwrap();
694
695            let (current_end, start_next) = cache.next_gap(42);
696            assert_eq!(current_end, Some(44));
697            assert!(start_next.is_none());
698
699            // Test edge case: boundary values
700            cache.put(u64::MAX - 1, 999).await.unwrap();
701
702            let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
703            assert!(current_end.is_none());
704            assert_eq!(start_next, Some(u64::MAX - 1));
705
706            let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
707            assert_eq!(current_end, Some(u64::MAX - 1));
708            assert!(start_next.is_none());
709        });
710    }
711
712    #[test_traced]
713    fn test_cache_intervals_duplicate_inserts() {
714        let executor = deterministic::Runner::default();
715        executor.start(|context| async move {
716            let cfg = Config {
717                partition: "test_partition".into(),
718                codec_config: (),
719                compression: None,
720                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
721                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
722                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
723                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
724            };
725            let mut cache = Cache::init(context.clone(), cfg.clone())
726                .await
727                .expect("Failed to initialize cache");
728
729            // Insert initial value
730            cache.put(10, 10).await.unwrap();
731            assert!(cache.has(10));
732            assert_eq!(cache.get(10).await.unwrap(), Some(10));
733
734            // Try to insert duplicate - should be no-op
735            cache.put(10, 20).await.unwrap();
736            assert!(cache.has(10));
737            assert_eq!(cache.get(10).await.unwrap(), Some(10)); // Should still be original value
738
739            // Verify intervals are correct
740            let (current_end, start_next) = cache.next_gap(10);
741            assert_eq!(current_end, Some(10));
742            assert!(start_next.is_none());
743
744            // Insert adjacent values
745            cache.put(9, 9).await.unwrap();
746            cache.put(11, 11).await.unwrap();
747
748            // Verify intervals updated correctly
749            let (current_end, start_next) = cache.next_gap(9);
750            assert_eq!(current_end, Some(11));
751            assert!(start_next.is_none());
752        });
753    }
754}