commonware_storage/freezer/storage.rs

use super::{Config, Error, Identifier};
use crate::{
    journal::segmented::oversized::{
        Config as OversizedConfig, Oversized, Record as OversizedRecord,
    },
    kv, Persistable,
};
use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
use commonware_cryptography::{crc32, Crc32, Hasher};
use commonware_runtime::{buffer, Blob, Buf, BufMut, Clock, IoBufMut, Metrics, Storage};
use commonware_utils::{Array, Span};
use futures::future::{try_join, try_join_all};
use prometheus_client::metrics::counter::Counter;
use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
use tracing::debug;

/// The percentage of table entries that must reach `table_resize_frequency`
/// before a resize is triggered.
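///
/// For example, with a table of 1,024 entries, a resize is triggered once 512
/// of them (50%) have each accumulated `table_resize_frequency` new items
/// since the last resize.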
const RESIZE_THRESHOLD: u64 = 50;

/// Location of an item in the [Freezer].
///
/// This can be used to directly access the data for a given
/// key-value pair (rather than walking the journal chain).
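///
/// Encoded as 20 big-endian bytes: section (8) | offset (8) | size (4).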
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);

impl Cursor {
    /// Create a new [Cursor].
    fn new(section: u64, offset: u64, size: u32) -> Self {
        let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
        buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
        buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
        Self(buf)
    }

    /// Get the section of the cursor.
    fn section(&self) -> u64 {
        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
    }

    /// Get the offset of the cursor.
    fn offset(&self) -> u64 {
        u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
    }

    /// Get the size of the value.
    fn size(&self) -> u32 {
        u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
    }
}

impl Read for Cursor {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
    }
}

impl CodecWrite for Cursor {
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}

impl FixedSize for Cursor {
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}

impl Span for Cursor {}

impl Array for Cursor {}

impl Deref for Cursor {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<[u8]> for Cursor {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl std::fmt::Debug for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

impl std::fmt::Display for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

/// Marker of [Freezer] progress.
///
/// This can be used to restore the [Freezer] to a consistent
/// state after shutdown.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    /// The epoch of the last committed operation.
    epoch: u64,
    /// The section of the last committed operation.
    section: u64,
    /// The size of the oversized index journal in the last committed section.
    oversized_size: u64,
    /// The size of the table.
    table_size: u32,
}

impl Checkpoint {
    /// Initialize a new [Checkpoint].
    const fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            oversized_size: 0,
        }
    }
}

impl Read for Checkpoint {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let oversized_size = u64::read(buf)?;
        let table_size = u32::read(buf)?;
        Ok(Self {
            epoch,
            section,
            oversized_size,
            table_size,
        })
    }
}

impl CodecWrite for Checkpoint {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.oversized_size.write(buf);
        self.table_size.write(buf);
    }
}

impl FixedSize for Checkpoint {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

/// Name of the table blob.
const TABLE_BLOB_NAME: &[u8] = b"table";

/// Single table entry stored in the table blob.
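///
/// Each table index holds two adjacent slots. Writes overwrite the slot from
/// the same epoch, else the empty or older slot; reads select the valid slot
/// with the higher epoch. A torn write therefore never destroys the most
/// recently committed chain head.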
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    // Epoch in which this slot was written
    epoch: u64,
    // Section in which this slot was written
    section: u64,
    // Position in the key index for this section
    position: u64,
    // Number of items added to this entry since last resize
    added: u8,
    // CRC of (epoch | section | position | added)
    crc: u32,
}

impl Entry {
    /// The full size of a table entry (2 slots).
    const FULL_SIZE: usize = Self::SIZE * 2;

    /// Compute a checksum for [Entry].
    fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
        let mut hasher = Crc32::new();
        hasher.update(&epoch.to_be_bytes());
        hasher.update(&section.to_be_bytes());
        hasher.update(&position.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize().as_u32()
    }

    /// Create a new [Entry].
    fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
        Self {
            epoch,
            section,
            position,
            added,
            crc: Self::compute_crc(epoch, section, position, added),
        }
    }

    /// Create a new empty [Entry].
    const fn new_empty() -> Self {
        Self {
            epoch: 0,
            section: 0,
            position: 0,
            added: 0,
            crc: 0,
        }
    }

    /// Check if this entry is empty (all zeros).
    const fn is_empty(&self) -> bool {
        self.epoch == 0
            && self.section == 0
            && self.position == 0
            && self.added == 0
            && self.crc == 0
    }

    /// Check if this entry is valid.
    ///
    /// An empty entry does not have a valid checksum and is treated as invalid by this function.
    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
    }
}

impl FixedSize for Entry {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}

impl CodecWrite for Entry {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.position.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}

impl Read for Entry {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let position = u64::read(buf)?;
        let added = u8::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self {
            epoch,
            section,
            position,
            added,
            crc,
        })
    }
}

/// Sentinel values indicating no next entry in the collision chain.
const NO_NEXT_SECTION: u64 = u64::MAX;
const NO_NEXT_POSITION: u64 = u64::MAX;

/// Key entry stored in the segmented/fixed key index journal.
///
/// All fields are fixed size, enabling efficient collision chain traversal
/// without reading large values.
///
/// The `next` pointer uses sentinel values (u64::MAX, u64::MAX) to indicate
/// "no next entry" instead of Option, ensuring fixed-size encoding.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The key for this entry.
    key: K,
    /// Pointer to next entry in collision chain (section, position in key index).
    /// Uses (u64::MAX, u64::MAX) as sentinel for "no next".
    next_section: u64,
    next_position: u64,
    /// Byte offset in value journal (same section).
    value_offset: u64,
    /// Size of value data in the value journal.
    value_size: u32,
}

impl<K: Array> Record<K> {
    /// Create a new [Record].
    fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
        let (next_section, next_position) = next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
        Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        }
    }

    /// Get the next entry in the collision chain, if any.
    const fn next(&self) -> Option<(u64, u64)> {
        if self.next_section == NO_NEXT_SECTION && self.next_position == NO_NEXT_POSITION {
            None
        } else {
            Some((self.next_section, self.next_position))
        }
    }
}

impl<K: Array> CodecWrite for Record<K> {
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.next_section.write(buf);
        self.next_position.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}

impl<K: Array> Read for Record<K> {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let key = K::read(buf)?;
        let next_section = u64::read(buf)?;
        let next_position = u64::read(buf)?;
        let value_offset = u64::read(buf)?;
        let value_size = u32::read(buf)?;

        Ok(Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        })
    }
}

impl<K: Array> FixedSize for Record<K> {
    // key + next_section + next_position + value_offset + value_size
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

impl<K: Array> OversizedRecord for Record<K> {
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    fn with_location(mut self, offset: u64, size: u32) -> Self {
        self.value_offset = offset;
        self.value_size = size;
        self
    }
}

#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            next_section: u64::arbitrary(u)?,
            next_position: u64::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}

/// Implementation of [Freezer].
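///
/// # Example
///
/// A minimal usage sketch (configuration and key construction elided; see
/// [Config] and the tests below for concrete values):
///
/// ```ignore
/// // Open (or recover) the freezer.
/// let mut freezer = Freezer::<_, U64, u64>::init(context, config).await?;
///
/// // Insert a value; the returned cursor enables direct reads later.
/// let cursor = freezer.put(key.clone(), 42u64).await?;
///
/// // Look up by key (walks the collision chain)...
/// let by_key = freezer.get(Identifier::Key(&key)).await?;
/// // ...or by cursor (direct access, no chain walk).
/// let by_cursor = freezer.get(Identifier::Cursor(cursor)).await?;
///
/// // Persist pending writes and obtain a checkpoint for restart.
/// let checkpoint = freezer.close().await?;
/// ```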
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
    // Context for storage operations
    context: E,

    // Table configuration
    table_partition: String,
    table_size: u32,
    table_resize_threshold: u64,
    table_resize_frequency: u8,
    table_resize_chunk_size: u32,

    // Table blob that maps slots to key index chain heads
    table: E::Blob,

    // Combined key index + value storage with crash recovery
    oversized: Oversized<E, Record<K>, V>,

    // Target size for value blob sections
    blob_target_size: u64,

    // Current section for new writes
    current_section: u64,
    next_epoch: u64,

    // Sections with pending table updates to be synced
    modified_sections: BTreeSet<u64>,
    resizable: u32,
    resize_progress: Option<u32>,

    // Metrics
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Freezer<E, K, V> {
    /// Calculate the byte offset for a table index.
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }

    /// Parse table entries from a buffer.
    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
        let mut buf1 = &buf[0..Entry::SIZE];
        let entry1 = Entry::read(&mut buf1)?;
        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
        let entry2 = Entry::read(&mut buf2)?;
        Ok((entry1, entry2))
    }

    /// Read entries from the table blob.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let read_buf = blob
            .read_at(offset, IoBufMut::zeroed(Entry::FULL_SIZE))
            .await?;

        Self::parse_entries(read_buf.coalesce().as_ref())
    }

    /// Recover a single table entry and update tracking.
    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        if entry.is_empty() {
            return Ok(false);
        }

        if !entry.is_valid()
            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
        {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            *entry = Entry::new_empty();
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(entry_offset, zero_buf).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            // Only track max epoch if we're discovering it (not validating against a known epoch)
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }

    /// Validate and clean invalid table entries for a given epoch.
    ///
    /// Returns (modified, max_epoch, max_section, resizable) where:
    /// - modified: whether any entries were cleaned
    /// - max_epoch: the maximum valid epoch found
    /// - max_section: the section corresponding to `max_epoch`
    /// - resizable: the number of entries that can be resized
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Create a buffered reader for efficient scanning
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        // Iterate over all table entries and overwrite invalid ones
        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            // Read both entries from the buffer
            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            // Check both entries
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // If the latest entry has reached the resize frequency, increment the resizable entries
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }

    /// Determine the write offset for a table entry based on current entries and epoch.
    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
        // If either entry matches the current epoch, overwrite it
        if !entry1.is_empty() && entry1.epoch == epoch {
            return 0;
        }
        if !entry2.is_empty() && entry2.epoch == epoch {
            return Entry::SIZE as u64;
        }

        // Otherwise, write to the older slot (or empty slot)
        match (entry1.is_empty(), entry2.is_empty()) {
            (true, _) => 0,                  // First slot is empty
            (_, true) => Entry::SIZE as u64, // Second slot is empty
            (false, false) => {
                if entry1.epoch < entry2.epoch {
                    0
                } else {
                    Entry::SIZE as u64
                }
            }
        }
    }

    /// Read the latest valid entry from two table slots.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
                Ordering::Equal => {
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
            (false, false) => None,
        }
    }

    /// Write a table entry to the appropriate slot based on epoch.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        // Calculate the base offset for this table index
        let table_offset = Self::table_offset(table_index);

        // Determine which slot to write to based on the provided entries
        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        // Write the new entry
        table
            .write_at(table_offset + start, update.encode_mut())
            .await
            .map_err(Error::Runtime)
    }

    /// Initialize table with given size and sync.
    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }

    /// Initialize a new [Freezer] instance.
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }

    /// Initialize a new [Freezer] instance with a [Checkpoint].
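    ///
    /// On restart, pass back the [Checkpoint] returned by [Freezer::sync] or
    /// [Freezer::close] to restore the freezer to that committed state.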
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Validate that table_initial_size is a power of 2
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Initialize oversized journal (handles crash recovery)
        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_page_cache: config.key_page_cache.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Open table blob
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        // Determine checkpoint based on initialization scenario
        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // New table with no data
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // New table with explicit checkpoint (must be empty)
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing table with checkpoint
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Rewind oversized to the committed section and index journal size
                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;

                // Sync oversized
                oversized.sync(checkpoint.section).await?;

                // Resize table if needed
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Validate and clean invalid entries
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing table without checkpoint
            (_, None) => {
                // Find max epoch/section and clean invalid entries in a single pass
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                // Get size from oversized (crash recovery already ran during init)
                let oversized_size = oversized.size(max_section).await?;

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Create metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }

    /// Compute the table index for a given key.
    ///
    /// As the table doubles in size during a resize, each existing entry splits into two:
    /// one at the original index and another at a new index (original index + previous table size).
    ///
    /// For example, with an initial table size of 4 (2^2):
    /// - Initially: uses 2 bits of the hash, mapping to entries 0, 1, 2, 3.
    /// - After resizing to 8: uses 3 bits, entry 0 splits into indices 0 and 4.
    /// - After resizing to 16: uses 4 bits, entry 0 splits into indices 0 and 8, and so on.
    ///
    /// To determine the appropriate entry, we AND the key's hash with `table_size - 1`
    /// (equivalent to `hash % table_size`, since the table size is a power of two).
    fn table_index(&self, key: &K) -> u32 {
        let hash = Crc32::checksum(key.as_ref());
        hash & (self.table_size - 1)
    }

    /// Determine if the table should be resized.
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }

    /// Determine which blob section to write to based on current blob size.
    async fn update_section(&mut self) -> Result<(), Error> {
        // Get the current value blob section size
        let value_size = self.oversized.value_size(self.current_section).await?;

        // If the current section has reached the target size, create a new section
        if value_size >= self.blob_target_size {
            self.current_section += 1;
            debug!(
                size = value_size,
                section = self.current_section,
                "updated section"
            );
        }

        Ok(())
    }

    /// Put a key-value pair into the [Freezer].
    ///
    /// If the key already exists, the new value shadows the old one: prior
    /// records remain on disk, but lookups by key return the most recent value.
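    ///
    /// The returned [Cursor] can later be passed to [Freezer::get] via
    /// [Identifier::Cursor] to read the value back without a key lookup.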
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Update the section if needed
        self.update_section().await?;

        // Get head of the chain from table
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Create key entry with pointer to previous head (value location set by oversized.append)
        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );

        // Write the value and key entry (value blob first, then index)
        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;

        // Update the number of items added to the entry.
        //
        // We use `saturating_add` to handle overflow (when the table is at max size) gracefully.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // If we've reached the threshold for resizing, increment the resizable entries
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Update the old position
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If we're mid-resize and this entry has already been processed, update the new position too
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // If the previous entry crossed the threshold, so did this one
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // This entry has been processed, so we need to update the new position as well.
                //
                // The entries are still identical to the old ones, so we don't need to read them again.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }

    /// Get the value for a given [Cursor].
    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
        let value = self
            .oversized
            .get_value(cursor.section(), cursor.offset(), cursor.size())
            .await?;

        Ok(value)
    }

    /// Get the first value for a given key.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Get head of the chain from table
        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        // Follow the linked list chain to find the first matching key
        loop {
            // Get the key entry from the fixed key index (efficient, good cache locality)
            let key_entry = self.oversized.get(section, position).await?;

            // Check if this key matches
            if key_entry.key.as_ref() == key.as_ref() {
                let value = self
                    .oversized
                    .get_value(section, key_entry.value_offset, key_entry.value_size)
                    .await?;
                return Ok(Some(value));
            }

            // Increment unnecessary reads
            self.unnecessary_reads.inc();

            // Follow the chain
            let Some(next) = key_entry.next() else {
                break; // End of chain
            };
            section = next.0;
            position = next.1;
        }

        Ok(None)
    }

    /// Get the value for a given [Identifier].
    ///
    /// If a [Cursor] is known for the required key, using it is much
    /// faster than searching by `key`.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Resize the table by doubling its size, splitting each entry in two.
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        // Double the table size (if not already at the max size)
        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        // Start the resize
        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }

    /// Write a pair of entries to a buffer, replacing one slot with the new entry.
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }

    /// Continue a resize operation by processing the next chunk of entries.
    ///
    /// This function processes `table_resize_chunk_size` entries at a time, allowing the resize to
    /// be spread across multiple sync operations to avoid latency spikes.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        // Compute the range to update
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the entire chunk
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = self
            .table
            .read_at(read_offset, IoBufMut::zeroed(chunk_bytes))
            .await?
            .coalesce();

        // Process each entry in the chunk
        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            // Get the entry
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf.as_ref()[entry_offset..entry_end];

            // Parse the two slots
            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            // Get the current head
            let head = Self::read_latest_entry(&entry1, &entry2);

            // Get the reset entry (may be empty)
            let reset_entry = match head {
                Some((section, position, added)) => {
                    // If the entry was at or over the threshold, decrement the resizable entries.
                    if added >= self.table_resize_frequency {
                        self.resizable -= 1;
                    }
                    Entry::new(self.next_epoch, section, position, 0)
                }
                None => Entry::new_empty(),
            };

            // Rewrite the entries
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Put the writes into the table
        let old_write = self.table.write_at(read_offset, writes.clone());
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(new_offset, writes);
        try_join(old_write, new_write).await?;

        // Update progress
        if chunk_end >= old_size {
            // Resize complete
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            // More chunks to process
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }

    /// Sync all pending data in [Freezer].
    ///
    /// If the table needs to be resized, the resize will begin during this sync.
    /// The resize operation is performed incrementally across multiple sync calls
    /// to avoid a large latency spike (or unexpectedly long latency for [Freezer::put]).
    /// Each sync will process up to `table_resize_chunk_size` entries until the resize
    /// is complete.
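    ///
    /// For example, with `table_resize_chunk_size = 1024` and a table of 4096
    /// entries, the resize completes over four syncs (1024 entries per sync).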
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync all modified sections for oversized journal
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();

        // Start a resize (if needed)
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        // Continue a resize (if ongoing)
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync updated table entries
        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        // Get size from oversized
        let oversized_size = self.oversized.size(self.current_section).await?;

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }

    /// Close the [Freezer] and return a [Checkpoint] for recovery.
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        // If we're mid-resize, complete it
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync any pending updates before closing
        let checkpoint = self.sync().await?;

        Ok(checkpoint)
    }

    /// Close and remove any underlying blobs created by the [Freezer].
    pub async fn destroy(self) -> Result<(), Error> {
        // Destroy oversized journal
        self.oversized.destroy().await?;

        // Destroy the table
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }

    /// Get the current progress of the resize operation.
    ///
    /// Returns `None` if the [Freezer] is not resizing.
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    /// Get the number of resizable entries.
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Gettable for Freezer<E, K, V> {
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(Identifier::Key(key)).await
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Updatable for Freezer<E, K, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await?;
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Persistable for Freezer<E, K, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await?;
        Ok(())
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable, test_key};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, deterministic::Context, IoBufMut, Runner, Storage,
    };
    use commonware_utils::{
        sequence::{FixedBytes, U64},
        NZUsize, NZU16,
    };

    type TestFreezer = Freezer<Context, U64, u64>;

    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        assert_gettable(freezer, &key);
        assert_updatable(freezer, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        assert_send(freezer.destroy());
    }

    #[test_traced]
    fn issue_2966_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test_key_index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
                value_partition: "test_value_journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test_table".into(),
                // Use 4 entries but only insert to 2, leaving 2 empty
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
                    .await
                    .unwrap();

            // Insert only 2 keys to different entries. With table_size=4, entries 2 and 3
            // should remain empty.
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key2"), 1).await.unwrap();
            freezer.close().await.unwrap();

            let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
            let table_data = blob
                .read_at(0, IoBufMut::zeroed(size as usize))
                .await
                .unwrap()
                .coalesce();

            // Verify resize happened (table doubled from 4 to 8)
            let num_entries = size as usize / Entry::FULL_SIZE;
            assert_eq!(num_entries, 8);

            // Count entries where both slots are truly empty. The bug would cause empty
            // entries to have one slot with epoch != 0 and valid CRC.
            let mut both_empty_count = 0;
            for entry_idx in 0..num_entries {
                let offset = entry_idx * Entry::FULL_SIZE;
                let buf = &table_data.as_ref()[offset..offset + Entry::FULL_SIZE];
                let (slot0, slot1) =
                    Freezer::<Context, FixedBytes<64>, i32>::parse_entries(buf).unwrap();
                if slot0.is_empty() && slot1.is_empty() {
                    both_empty_count += 1;
                }
            }
            // 2 keys in 4 entries = 2 empty. After resize to 8, those become 4 empty.
            assert_eq!(both_empty_count, 4);
        });
    }

    #[test_traced]
    fn issue_2955_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test_key_index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
                value_partition: "test_value_journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test_table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };

            // Create freezer with data
            let checkpoint = {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
                    context.with_label("first"),
                    cfg.clone(),
                )
                .await
                .unwrap();
                freezer.put(test_key("key0"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Corrupt the CRC in both slots of the table entry
            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                let entry_data = blob
                    .read_at(0, IoBufMut::zeroed(Entry::FULL_SIZE))
                    .await
                    .unwrap();
                let mut corrupted = entry_data.coalesce();
                // Corrupt CRC of first slot (last 4 bytes of first slot)
                corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF;
                // Corrupt CRC of second slot (last 4 bytes of second slot)
                corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF;
                blob.write_at(0, corrupted).await.unwrap();
                blob.sync().await.unwrap();
            }

            // Reopen to trigger recovery. The bug would set both cleared entries to
            // Entry::new(0,0,0,0) which has is_empty()=false and is_valid()=true.
            // read_latest_entry would then see two "valid" entries with epoch=0 and
            // panic on unreachable!().
            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            drop(freezer);
        });
    }
    }
}