// commonware_storage/freezer/storage.rs

1use super::{Config, Error, Identifier};
2use crate::{
3    journal::segmented::oversized::{
4        Config as OversizedConfig, Oversized, Record as OversizedRecord,
5    },
6    kv,
7};
8use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
9use commonware_cryptography::{crc32, Crc32, Hasher};
10use commonware_runtime::{buffer, Blob, Buf, BufMut, BufferPooler, Clock, IoBuf, Metrics, Storage};
11use commonware_utils::{Array, Span};
12use futures::future::{try_join, try_join_all};
13use prometheus_client::metrics::counter::Counter;
14use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
15use tracing::debug;
16
/// The percentage (0-100) of table entries that must reach `table_resize_frequency`
/// before a resize is triggered.
const RESIZE_THRESHOLD: u64 = 50;
20
/// Location of an item in the [Freezer].
///
/// This can be used to directly access the data for a given
/// key-value pair (rather than walking the journal chain).
///
/// Layout (all big-endian): section (u64) | value offset (u64) | value size (u32).
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);
29
30impl Cursor {
31    /// Create a new [Cursor].
32    fn new(section: u64, offset: u64, size: u32) -> Self {
33        let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
34        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
35        buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
36        buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
37        Self(buf)
38    }
39
40    /// Get the section of the cursor.
41    fn section(&self) -> u64 {
42        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
43    }
44
45    /// Get the offset of the cursor.
46    fn offset(&self) -> u64 {
47        u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
48    }
49
50    /// Get the size of the value.
51    fn size(&self) -> u32 {
52        u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
53    }
54}
55
56impl Read for Cursor {
57    type Cfg = ();
58
59    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
60        <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
61    }
62}
63
64impl CodecWrite for Cursor {
65    fn write(&self, buf: &mut impl BufMut) {
66        self.0.write(buf);
67    }
68}
69
impl FixedSize for Cursor {
    // section (u64) | offset (u64) | size (u32), all big-endian.
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}
73
// Marker trait impls: [Cursor] behaves as a fixed-size byte span/array.
impl Span for Cursor {}

impl Array for Cursor {}
77
78impl Deref for Cursor {
79    type Target = [u8];
80    fn deref(&self) -> &Self::Target {
81        &self.0
82    }
83}
84
85impl AsRef<[u8]> for Cursor {
86    fn as_ref(&self) -> &[u8] {
87        &self.0
88    }
89}
90
91impl std::fmt::Debug for Cursor {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(
94            f,
95            "Cursor(section={}, offset={}, size={})",
96            self.section(),
97            self.offset(),
98            self.size()
99        )
100    }
101}
102
103impl std::fmt::Display for Cursor {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        write!(
106            f,
107            "Cursor(section={}, offset={}, size={})",
108            self.section(),
109            self.offset(),
110            self.size()
111        )
112    }
113}
114
/// Marker of [Freezer] progress.
///
/// This can be used to restore the [Freezer] to a consistent
/// state after shutdown.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    /// The epoch of the last committed operation.
    epoch: u64,
    /// The section of the last committed operation.
    section: u64,
    /// The size of the oversized index journal in the last committed section.
    oversized_size: u64,
    /// The size of the table (in entries, always a power of two).
    table_size: u32,
}
131
132impl Checkpoint {
133    /// Initialize a new [Checkpoint].
134    const fn init(table_size: u32) -> Self {
135        Self {
136            table_size,
137            epoch: 0,
138            section: 0,
139            oversized_size: 0,
140        }
141    }
142}
143
144impl Read for Checkpoint {
145    type Cfg = ();
146    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
147        let epoch = u64::read(buf)?;
148        let section = u64::read(buf)?;
149        let oversized_size = u64::read(buf)?;
150        let table_size = u32::read(buf)?;
151        Ok(Self {
152            epoch,
153            section,
154            oversized_size,
155            table_size,
156        })
157    }
158}
159
160impl CodecWrite for Checkpoint {
161    fn write(&self, buf: &mut impl BufMut) {
162        self.epoch.write(buf);
163        self.section.write(buf);
164        self.oversized_size.write(buf);
165        self.table_size.write(buf);
166    }
167}
168
impl FixedSize for Checkpoint {
    // epoch (u64) | section (u64) | oversized_size (u64) | table_size (u32).
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}
172
/// Name of the table blob within the table partition.
const TABLE_BLOB_NAME: &[u8] = b"table";
175
/// Single table entry stored in the table blob.
///
/// Each table index stores two of these back-to-back (see [Entry::FULL_SIZE]);
/// the slot with the higher epoch is the live one.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    /// Epoch in which this slot was written.
    epoch: u64,
    /// Section in which this slot was written.
    section: u64,
    /// Position in the key index for this section.
    position: u64,
    /// Number of items added to this entry since last resize.
    added: u8,
    /// CRC of (epoch | section | position | added).
    crc: u32,
}
191
192impl Entry {
193    /// The full size of a table entry (2 slots).
194    const FULL_SIZE: usize = Self::SIZE * 2;
195
196    /// Compute a checksum for [Entry].
197    fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
198        let mut hasher = Crc32::new();
199        hasher.update(&epoch.to_be_bytes());
200        hasher.update(&section.to_be_bytes());
201        hasher.update(&position.to_be_bytes());
202        hasher.update(&added.to_be_bytes());
203        hasher.finalize().as_u32()
204    }
205
206    /// Create a new [Entry].
207    fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
208        Self {
209            epoch,
210            section,
211            position,
212            added,
213            crc: Self::compute_crc(epoch, section, position, added),
214        }
215    }
216
217    /// Create a new empty [Entry].
218    const fn new_empty() -> Self {
219        Self {
220            epoch: 0,
221            section: 0,
222            position: 0,
223            added: 0,
224            crc: 0,
225        }
226    }
227
228    /// Check if this entry is empty (all zeros).
229    const fn is_empty(&self) -> bool {
230        self.epoch == 0
231            && self.section == 0
232            && self.position == 0
233            && self.added == 0
234            && self.crc == 0
235    }
236
237    /// Check if this entry is valid.
238    ///
239    /// An empty entry does not have a valid checksum and is treated as invalid by this function.
240    fn is_valid(&self) -> bool {
241        Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
242    }
243}
244
impl FixedSize for Entry {
    // epoch (u64) | section (u64) | position (u64) | added (u8) | crc (u32).
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}
248
249impl CodecWrite for Entry {
250    fn write(&self, buf: &mut impl BufMut) {
251        self.epoch.write(buf);
252        self.section.write(buf);
253        self.position.write(buf);
254        self.added.write(buf);
255        self.crc.write(buf);
256    }
257}
258
259impl Read for Entry {
260    type Cfg = ();
261    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
262        let epoch = u64::read(buf)?;
263        let section = u64::read(buf)?;
264        let position = u64::read(buf)?;
265        let added = u8::read(buf)?;
266        let crc = u32::read(buf)?;
267
268        Ok(Self {
269            epoch,
270            section,
271            position,
272            added,
273            crc,
274        })
275    }
276}
277
/// Sentinel section indicating no next entry in the collision chain.
///
/// Using `u64::MAX` (together with [NO_NEXT_POSITION]) instead of `Option`
/// keeps [Record] fixed-size.
const NO_NEXT_SECTION: u64 = u64::MAX;
/// Sentinel position indicating no next entry in the collision chain.
const NO_NEXT_POSITION: u64 = u64::MAX;
281
/// Key entry stored in the segmented/fixed key index journal.
///
/// All fields are fixed size, enabling efficient collision chain traversal
/// without reading large values.
///
/// The `next` pointer uses sentinel values (u64::MAX, u64::MAX) to indicate
/// "no next entry" instead of Option, ensuring fixed-size encoding.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The key for this entry.
    key: K,
    /// Pointer to next entry in collision chain (section, position in key index).
    /// Uses (u64::MAX, u64::MAX) as sentinel for "no next".
    next_section: u64,
    next_position: u64,
    /// Byte offset in value journal (same section).
    value_offset: u64,
    /// Size of value data in the value journal.
    value_size: u32,
}
302
303impl<K: Array> Record<K> {
304    /// Create a new [Record].
305    fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
306        let (next_section, next_position) = next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
307        Self {
308            key,
309            next_section,
310            next_position,
311            value_offset,
312            value_size,
313        }
314    }
315
316    /// Get the next entry in the collision chain, if any.
317    const fn next(&self) -> Option<(u64, u64)> {
318        if self.next_section == NO_NEXT_SECTION && self.next_position == NO_NEXT_POSITION {
319            None
320        } else {
321            Some((self.next_section, self.next_position))
322        }
323    }
324}
325
326impl<K: Array> CodecWrite for Record<K> {
327    fn write(&self, buf: &mut impl BufMut) {
328        self.key.write(buf);
329        self.next_section.write(buf);
330        self.next_position.write(buf);
331        self.value_offset.write(buf);
332        self.value_size.write(buf);
333    }
334}
335
336impl<K: Array> Read for Record<K> {
337    type Cfg = ();
338    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
339        let key = K::read(buf)?;
340        let next_section = u64::read(buf)?;
341        let next_position = u64::read(buf)?;
342        let value_offset = u64::read(buf)?;
343        let value_size = u32::read(buf)?;
344
345        Ok(Self {
346            key,
347            next_section,
348            next_position,
349            value_offset,
350            value_size,
351        })
352    }
353}
354
impl<K: Array> FixedSize for Record<K> {
    // key + next_section + next_position + value_offset + value_size
    // (fixed size is what allows chain traversal without variable-length reads)
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}
359
360impl<K: Array> OversizedRecord for Record<K> {
361    fn value_location(&self) -> (u64, u32) {
362        (self.value_offset, self.value_size)
363    }
364
365    fn with_location(mut self, offset: u64, size: u32) -> Self {
366        self.value_offset = offset;
367        self.value_size = size;
368        self
369    }
370}
371
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Draw each field in declaration order from the unstructured input.
        Ok(Self {
            key: u.arbitrary()?,
            next_section: u.arbitrary()?,
            next_position: u.arbitrary()?,
            value_offset: u.arbitrary()?,
            value_size: u.arbitrary()?,
        })
    }
}
387
/// Implementation of [Freezer].
pub struct Freezer<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> {
    // Context for storage operations
    context: E,

    // Table configuration
    table_partition: String,
    // Current number of table entries (always a power of two)
    table_size: u32,
    // Number of resizable entries required to trigger a resize
    // (table_size * RESIZE_THRESHOLD / 100)
    table_resize_threshold: u64,
    table_resize_frequency: u8,
    table_resize_chunk_size: u32,

    // Table blob that maps slots to key index chain heads
    table: E::Blob,

    // Combined key index + value storage with crash recovery
    oversized: Oversized<E, Record<K>, V>,

    // Target size for value blob sections
    blob_target_size: u64,

    // Current section for new writes
    current_section: u64,
    // Epoch to stamp onto the next batch of table writes
    next_epoch: u64,

    // Sections with pending table updates to be synced
    modified_sections: BTreeSet<u64>,
    // Number of entries whose `added` count has reached table_resize_frequency
    resizable: u32,
    // When mid-resize, the table index up to which entries have been migrated
    resize_progress: Option<u32>,

    // Metrics
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}
425
426impl<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> Freezer<E, K, V> {
427    /// Calculate the byte offset for a table index.
428    #[inline]
429    const fn table_offset(table_index: u32) -> u64 {
430        table_index as u64 * Entry::FULL_SIZE as u64
431    }
432
433    /// Parse table entries from a buffer.
434    fn parse_entries(mut buf: impl Buf) -> Result<(Entry, Entry), Error> {
435        let entry1 = Entry::read(&mut buf)?;
436        let entry2 = Entry::read(&mut buf)?;
437        Ok((entry1, entry2))
438    }
439
440    /// Read entries from the table blob.
441    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
442        let offset = Self::table_offset(table_index);
443        let read_buf = blob.read_at(offset, Entry::FULL_SIZE).await?;
444
445        Self::parse_entries(read_buf)
446    }
447
448    /// Recover a single table entry and update tracking.
449    async fn recover_entry(
450        blob: &E::Blob,
451        entry: &mut Entry,
452        entry_offset: u64,
453        max_valid_epoch: Option<u64>,
454        max_epoch: &mut u64,
455        max_section: &mut u64,
456    ) -> Result<bool, Error> {
457        if entry.is_empty() {
458            return Ok(false);
459        }
460
461        if !entry.is_valid()
462            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
463        {
464            debug!(
465                valid_epoch = max_valid_epoch,
466                entry_epoch = entry.epoch,
467                "found invalid table entry"
468            );
469            *entry = Entry::new_empty();
470            let zero_buf = vec![0u8; Entry::SIZE];
471            blob.write_at(entry_offset, zero_buf).await?;
472            Ok(true)
473        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
474            // Only track max epoch if we're discovering it (not validating against a known epoch)
475            *max_epoch = entry.epoch;
476            *max_section = entry.section;
477            Ok(false)
478        } else {
479            Ok(false)
480        }
481    }
482
    /// Validate and clean invalid table entries for a given epoch.
    ///
    /// Scans every table entry sequentially through a buffered reader, delegating
    /// per-slot cleanup to [Self::recover_entry] (which zeroes corrupt or
    /// too-new slots in place on the underlying blob).
    ///
    /// Returns (modified, max_epoch, max_section, resizable) where:
    /// - modified: whether any entries were cleaned
    /// - max_epoch: the maximum valid epoch found
    /// - max_section: the section corresponding to `max_epoch`
    /// - resizable: the number of entries that can be resized
    async fn recover_table(
        pooler: &impl BufferPooler,
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Create a buffered reader for efficient scanning
        let blob_size = Self::table_offset(table_size);
        let mut reader =
            buffer::Read::from_pooler(pooler, blob.clone(), blob_size, table_replay_buffer);

        // Iterate over all table entries and overwrite invalid ones
        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            // Read both entries from the buffer.
            let entry_buf = reader.read(Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(entry_buf)?;

            // Check both entries (writes go to `blob` directly, not the reader)
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                // Second slot lives immediately after the first.
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // If the latest entry has reached the resize frequency, increment the resizable entries
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }
546
547    /// Determine the write offset for a table entry based on current entries and epoch.
548    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
549        // If either entry matches the current epoch, overwrite it
550        if !entry1.is_empty() && entry1.epoch == epoch {
551            return 0;
552        }
553        if !entry2.is_empty() && entry2.epoch == epoch {
554            return Entry::SIZE as u64;
555        }
556
557        // Otherwise, write to the older slot (or empty slot)
558        match (entry1.is_empty(), entry2.is_empty()) {
559            (true, _) => 0,                  // First slot is empty
560            (_, true) => Entry::SIZE as u64, // Second slot is empty
561            (false, false) => {
562                if entry1.epoch < entry2.epoch {
563                    0
564                } else {
565                    Entry::SIZE as u64
566                }
567            }
568        }
569    }
570
571    /// Read the latest valid entry from two table slots.
572    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
573        match (
574            !entry1.is_empty() && entry1.is_valid(),
575            !entry2.is_empty() && entry2.is_valid(),
576        ) {
577            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
578                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
579                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
580                Ordering::Equal => {
581                    unreachable!("two valid entries with the same epoch")
582                }
583            },
584            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
585            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
586            (false, false) => None,
587        }
588    }
589
590    /// Write a table entry to the appropriate slot based on epoch.
591    async fn update_head(
592        table: &E::Blob,
593        table_index: u32,
594        entry1: &Entry,
595        entry2: &Entry,
596        update: Entry,
597    ) -> Result<(), Error> {
598        // Calculate the base offset for this table index
599        let table_offset = Self::table_offset(table_index);
600
601        // Determine which slot to write to based on the provided entries
602        let start = Self::compute_write_offset(entry1, entry2, update.epoch);
603
604        // Write the new entry
605        table
606            .write_at(table_offset + start, update.encode_mut())
607            .await
608            .map_err(Error::Runtime)
609    }
610
611    /// Initialize table with given size and sync.
612    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
613        let table_len = Self::table_offset(table_size);
614        blob.resize(table_len).await?;
615        blob.sync().await?;
616        Ok(())
617    }
618
    /// Initialize a new [Freezer] instance.
    ///
    /// Equivalent to [Self::init_with_checkpoint] with no checkpoint: recovery
    /// state is discovered by scanning the existing table (if any).
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }
623
    /// Initialize a new [Freezer] instance with a [Checkpoint].
    ///
    /// Handles four startup scenarios based on (existing table length, checkpoint):
    /// fresh start, fresh start with an (empty) checkpoint, recovery to a known
    /// checkpoint, and recovery by scanning the table for the max committed epoch.
    ///
    /// # Panics
    ///
    /// Panics if `config.table_initial_size` is not a power of two, or if a
    /// provided checkpoint is inconsistent with the on-disk state.
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Validate that initial_table_size is a power of 2
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Initialize oversized journal (handles crash recovery)
        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_page_cache: config.key_page_cache.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Open table blob
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        // Determine checkpoint based on initialization scenario
        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // New table with no data
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // New table with explicit checkpoint (must be empty)
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing table with checkpoint
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Rewind oversized to the committed section and key size
                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;

                // Sync oversized
                oversized.sync(checkpoint.section).await?;

                // Resize table if needed (e.g. a crash mid-resize left it larger
                // or smaller than the checkpoint expects)
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Validate and clean invalid entries (anything newer than the
                // checkpoint's epoch is uncommitted and gets zeroed)
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &context,
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing table without checkpoint
            (_, None) => {
                // Find max epoch/section and clean invalid entries in a single pass
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &context,
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                // Get sizes from oversized (crash recovery already ran during init)
                let oversized_size = oversized.size(max_section).await?;

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Create metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            // Future writes must land strictly after the recovered epoch.
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }
800
801    /// Compute the table index for a given key.
802    ///
803    /// As the table doubles in size during a resize, each existing entry splits into two:
804    /// one at the original index and another at a new index (original index + previous table size).
805    ///
806    /// For example, with an initial table size of 4 (2^2):
807    /// - Initially: uses 2 bits of the hash, mapping to entries 0, 1, 2, 3.
808    /// - After resizing to 8: uses 3 bits, entry 0 splits into indices 0 and 4.
809    /// - After resizing to 16: uses 4 bits, entry 0 splits into indices 0 and 8, and so on.
810    ///
811    /// To determine the appropriate entry, we AND the key's hash with the current table size.
812    fn table_index(&self, key: &K) -> u32 {
813        let hash = Crc32::checksum(key.as_ref());
814        hash & (self.table_size - 1)
815    }
816
817    /// Determine if the table should be resized.
818    const fn should_resize(&self) -> bool {
819        self.resizable as u64 >= self.table_resize_threshold
820    }
821
822    /// Determine which blob section to write to based on current blob size.
823    async fn update_section(&mut self) -> Result<(), Error> {
824        // Get the current value blob section size
825        let value_size = self.oversized.value_size(self.current_section).await?;
826
827        // If the current section has reached the target size, create a new section
828        if value_size >= self.blob_target_size {
829            self.current_section += 1;
830            debug!(
831                size = value_size,
832                section = self.current_section,
833                "updated section"
834            );
835        }
836
837        Ok(())
838    }
839
    /// Put a key-value pair into the [Freezer].
    /// If the key already exists, the value is updated.
    ///
    /// Returns a [Cursor] pointing directly at the stored value, which can later
    /// be used to fetch it without walking the collision chain.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Update the section if needed
        self.update_section().await?;

        // Get head of the chain from table
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Create key entry with pointer to previous head (value location set by oversized.append)
        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );

        // Write value and key entry (glob first, then index)
        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;

        // Update the number of items added to the entry.
        //
        // We use `saturating_add` to handle overflow (when the table is at max size) gracefully.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // If we've reached the threshold for resizing, increment the resizable entries
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Update the old position
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If we're mid-resize and this entry has already been processed, update the new position too
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // If the previous entry crossed the threshold, so did this one
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // This entry has been processed, so we need to update the new position as well.
                //
                // The entries are still identical to the old ones, so we don't need to read them again.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }
905
906    /// Get the value for a given [Cursor].
907    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
908        let value = self
909            .oversized
910            .get_value(cursor.section(), cursor.offset(), cursor.size())
911            .await?;
912
913        Ok(value)
914    }
915
916    /// Get the first value for a given key.
917    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
918        self.gets.inc();
919
920        // Get head of the chain from table
921        let table_index = self.table_index(key);
922        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
923        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
924            return Ok(None);
925        };
926
927        // Follow the linked list chain to find the first matching key
928        loop {
929            // Get the key entry from the fixed key index (efficient, good cache locality)
930            let key_entry = self.oversized.get(section, position).await?;
931
932            // Check if this key matches
933            if key_entry.key.as_ref() == key.as_ref() {
934                let value = self
935                    .oversized
936                    .get_value(section, key_entry.value_offset, key_entry.value_size)
937                    .await?;
938                return Ok(Some(value));
939            }
940
941            // Increment unnecessary reads
942            self.unnecessary_reads.inc();
943
944            // Follow the chain
945            let Some(next) = key_entry.next() else {
946                break; // End of chain
947            };
948            section = next.0;
949            position = next.1;
950        }
951
952        Ok(None)
953    }
954
955    /// Get the value for a given [Identifier].
956    ///
957    /// If a [Cursor] is known for the required key, it
958    /// is much faster to use it than searching for a `key`.
959    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
960        match identifier {
961            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
962            Identifier::Key(key) => self.get_key(key).await,
963        }
964    }
965
966    /// Resize the table by doubling its size and split each entry into two.
967    async fn start_resize(&mut self) -> Result<(), Error> {
968        self.resizes.inc();
969
970        // Double the table size (if not already at the max size)
971        let old_size = self.table_size;
972        let Some(new_size) = old_size.checked_mul(2) else {
973            return Ok(());
974        };
975        self.table.resize(Self::table_offset(new_size)).await?;
976
977        // Start the resize
978        self.resize_progress = Some(0);
979        debug!(old = old_size, new = new_size, "table resize started");
980
981        Ok(())
982    }
983
984    /// Write a pair of entries to a buffer, replacing one slot with the new entry.
985    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
986        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
987            buf.extend_from_slice(&new_entry.encode());
988            buf.extend_from_slice(&entry2.encode());
989        } else {
990            buf.extend_from_slice(&entry1.encode());
991            buf.extend_from_slice(&new_entry.encode());
992        }
993    }
994
    /// Continue a resize operation by processing the next chunk of entries.
    ///
    /// This function processes `table_resize_chunk_size` entries at a time, allowing the resize to
    /// be spread across multiple sync operations to avoid latency spikes.
    ///
    /// Each processed entry has its head re-stamped at `next_epoch` with a
    /// reset `added` counter, and is written to BOTH the old half and the
    /// mirrored slot in the new (doubled) half of the table.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        // Compute the range to update
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the entire chunk
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let mut read_buf = self.table.read_at(read_offset, chunk_bytes).await?;

        // Process each entry in the chunk
        let mut writes = Vec::with_capacity(chunk_bytes);
        for _ in 0..chunk_size {
            // Parse the next two slots directly from the read stream.
            let (entry1, entry2) = Self::parse_entries(&mut read_buf)?;

            // Get the current head
            let head = Self::read_latest_entry(&entry1, &entry2);

            // Get the reset entry (may be empty)
            let reset_entry = match head {
                Some((section, position, added)) => {
                    // If the entry was at or over the threshold, decrement the resizable entries.
                    if added >= self.table_resize_frequency {
                        self.resizable -= 1;
                    }
                    // Re-stamp the head at the upcoming epoch with `added` reset to 0.
                    Entry::new(self.next_epoch, section, position, 0)
                }
                None => Entry::new_empty(),
            };

            // Rewrite the entries
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Put the writes into the table.
        //
        // The same bytes are written to the old region and to the mirrored
        // offset in the new half, concurrently.
        let writes = IoBuf::from(writes);
        let old_write = self.table.write_at(read_offset, writes.clone());
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(new_offset, writes);
        try_join(old_write, new_write).await?;

        // Update progress
        if chunk_end >= old_size {
            // Resize complete
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            // More chunks to process
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
1062
    /// Sync all pending data in [Freezer].
    ///
    /// If the table needs to be resized, the resize will begin during this sync.
    /// The resize operation is performed incrementally across multiple sync calls
    /// to avoid a large latency spike (or unexpected long latency for [Freezer::put]).
    /// Each sync will process up to `table_resize_chunk_size` entries until the resize
    /// is complete.
    ///
    /// Returns a [Checkpoint] capturing the epoch, current section, oversized
    /// journal size, and table size for later recovery.
    //
    // TODO:(<https://github.com/commonwarexyz/monorepo/issues/2910>): Make this non &mut.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync all modified sections for oversized journal
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();

        // Start a resize (if needed)
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        // Continue a resize (if ongoing)
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync updated table entries
        self.table.sync().await?;

        // Capture the epoch for the checkpoint, then advance it so writes
        // after this sync are distinguishable from the captured state.
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        // Get size from oversized
        let oversized_size = self.oversized.size(self.current_section).await?;

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }
1107
1108    /// Close the [Freezer] and return a [Checkpoint] for recovery.
1109    pub async fn close(mut self) -> Result<Checkpoint, Error> {
1110        // If we're mid-resize, complete it
1111        while self.resize_progress.is_some() {
1112            self.advance_resize().await?;
1113        }
1114
1115        // Sync any pending updates before closing
1116        let checkpoint = self.sync().await?;
1117
1118        Ok(checkpoint)
1119    }
1120
1121    /// Close and remove any underlying blobs created by the [Freezer].
1122    pub async fn destroy(self) -> Result<(), Error> {
1123        // Destroy oversized journal
1124        self.oversized.destroy().await?;
1125
1126        // Destroy the table
1127        drop(self.table);
1128        self.context
1129            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
1130            .await?;
1131        self.context.remove(&self.table_partition, None).await?;
1132
1133        Ok(())
1134    }
1135
    /// Get the current progress of the resize operation.
    ///
    /// Returns `None` if the [Freezer] is not resizing.
    ///
    /// Test-only accessor (compiled under `#[cfg(test)]`).
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }
1143
    /// Get the number of resizable entries.
    ///
    /// Test-only accessor (compiled under `#[cfg(test)]`).
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
1149}
1150
1151impl<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Gettable
1152    for Freezer<E, K, V>
1153{
1154    type Key = K;
1155    type Value = V;
1156    type Error = Error;
1157
1158    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
1159        self.get(Identifier::Key(key)).await
1160    }
1161}
1162
1163impl<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Updatable
1164    for Freezer<E, K, V>
1165{
1166    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
1167        self.put(key, value).await?;
1168        Ok(())
1169    }
1170}
1171
// Codec round-trip conformance tests for every on-disk record type,
// driven by `arbitrary`-generated inputs.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}
1185
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable, test_key};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, deterministic::Context, Runner, Storage,
    };
    use commonware_utils::{
        sequence::{FixedBytes, U64},
        NZUsize, NZU16,
    };

    type TestFreezer = Freezer<Context, U64, u64>;

    // Compile-time check that the get/update futures are `Send`; never invoked.
    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        assert_gettable(freezer, &key);
        assert_updatable(freezer, key, 0u64);
    }

    // Compile-time check that the destroy future is `Send`; never invoked.
    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        assert_send(freezer.destroy());
    }

    // Regression: after a resize, entries that were empty before the resize
    // must remain fully empty (both slots) rather than carrying a stamped
    // epoch with a valid CRC.
    #[test_traced]
    fn issue_2966_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test-key-index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
                value_partition: "test-value-journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test-table".into(),
                // Use 4 entries but only insert to 2, leaving 2 empty
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
                    .await
                    .unwrap();

            // Insert only 2 keys to different entries. With table_size=4, entries 2 and 3
            // should remain empty.
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key2"), 1).await.unwrap();
            freezer.close().await.unwrap();

            // Inspect the raw table blob directly.
            let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
            let table_data = blob.read_at(0, size as usize).await.unwrap().coalesce();

            // Verify resize happened (table doubled from 4 to 8)
            let num_entries = size as usize / Entry::FULL_SIZE;
            assert_eq!(num_entries, 8);

            // Count entries where both slots are truly empty. The bug would cause empty
            // entries to have one slot with epoch != 0 and valid CRC.
            let mut both_empty_count = 0;
            for entry_idx in 0..num_entries {
                let offset = entry_idx * Entry::FULL_SIZE;
                let buf = &table_data.as_ref()[offset..offset + Entry::FULL_SIZE];
                let (slot0, slot1) =
                    Freezer::<Context, FixedBytes<64>, i32>::parse_entries(buf).unwrap();
                if slot0.is_empty() && slot1.is_empty() {
                    both_empty_count += 1;
                }
            }
            // 2 keys in 4 entries = 2 empty. After resize to 8, those become 4 empty.
            assert_eq!(both_empty_count, 4);
        });
    }

    // Regression: recovery after CRC corruption in both slots of a table
    // entry must clear the slots to genuinely-empty entries instead of
    // producing two "valid" epoch-0 entries that panic in read_latest_entry.
    #[test_traced]
    fn issue_2955_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test-key-index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
                value_partition: "test-value-journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test-table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };

            // Create freezer with data
            let checkpoint = {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
                    context.with_label("first"),
                    cfg.clone(),
                )
                .await
                .unwrap();
                freezer.put(test_key("key0"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Corrupt the CRC in both slots of the table entry
            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                let entry_data = blob.read_at(0, Entry::FULL_SIZE).await.unwrap();
                let mut corrupted = entry_data.coalesce();
                // Corrupt CRC of first slot (last 4 bytes of first slot)
                corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF;
                // Corrupt CRC of second slot (last 4 bytes of second slot)
                corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF;
                blob.write_at(0, corrupted).await.unwrap();
                blob.sync().await.unwrap();
            }

            // Reopen to trigger recovery. The bug would set both cleared entries to
            // Entry::new(0,0,0,0) which has is_empty()=false and is_valid()=true.
            // read_latest_entry would then see two "valid" entries with epoch=0 and
            // panic on unreachable!().
            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            drop(freezer);
        });
    }
}