commonware_storage/freezer/
storage.rs

use super::{Config, Error, Identifier};
use crate::{
    journal::segmented::oversized::{
        Config as OversizedConfig, Oversized, Record as OversizedRecord,
    },
    kv, Persistable,
};
use bytes::{Buf, BufMut};
use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
use commonware_cryptography::{crc32, Crc32, Hasher};
use commonware_runtime::{buffer, Blob, Clock, Metrics, Storage};
use commonware_utils::{Array, Span};
use futures::future::{try_join, try_join_all};
use prometheus_client::metrics::counter::Counter;
use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
use tracing::debug;

/// The percentage of table entries that must reach `table_resize_frequency`
/// before a resize is triggered.
const RESIZE_THRESHOLD: u64 = 50;

/// Location of an item in the [Freezer].
///
/// This can be used to directly access the data for a given
/// key-value pair (rather than walking the journal chain).
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);

impl Cursor {
    /// Create a new [Cursor].
    fn new(section: u64, offset: u64, size: u32) -> Self {
        let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
        buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
        buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
        Self(buf)
    }

    /// Get the section of the cursor.
    fn section(&self) -> u64 {
        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
    }

    /// Get the offset of the cursor.
    fn offset(&self) -> u64 {
        u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
    }

    /// Get the size of the value.
    fn size(&self) -> u32 {
        u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
    }
}
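
// A minimal sketch (test-only) of the layout described above: `Cursor::new`
// packs section, offset, and size back to back as big-endian integers, so the
// accessors recover the original fields. The values used here are arbitrary
// examples.
#[cfg(test)]
mod cursor_layout_sketch {
    use super::*;

    #[test]
    fn roundtrip() {
        let cursor = Cursor::new(3, 4096, 42);
        assert_eq!(cursor.section(), 3);
        assert_eq!(cursor.offset(), 4096);
        assert_eq!(cursor.size(), 42);
        // 8 (section) + 8 (offset) + 4 (size) bytes.
        assert_eq!(cursor.as_ref().len(), 20);
    }
}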

impl Read for Cursor {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
    }
}

impl CodecWrite for Cursor {
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}

impl FixedSize for Cursor {
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}

impl Span for Cursor {}

impl Array for Cursor {}

impl Deref for Cursor {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<[u8]> for Cursor {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl std::fmt::Debug for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

impl std::fmt::Display for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

/// Marker of [Freezer] progress.
///
/// This can be used to restore the [Freezer] to a consistent
/// state after shutdown.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    /// The epoch of the last committed operation.
    epoch: u64,
    /// The section of the last committed operation.
    section: u64,
    /// The size of the oversized index journal in the last committed section.
    oversized_size: u64,
    /// The size of the table.
    table_size: u32,
}

impl Checkpoint {
    /// Initialize a new [Checkpoint].
    const fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            oversized_size: 0,
        }
    }
}

impl Read for Checkpoint {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let oversized_size = u64::read(buf)?;
        let table_size = u32::read(buf)?;
        Ok(Self {
            epoch,
            section,
            oversized_size,
            table_size,
        })
    }
}

impl CodecWrite for Checkpoint {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.oversized_size.write(buf);
        self.table_size.write(buf);
    }
}

impl FixedSize for Checkpoint {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

/// Name of the table blob.
const TABLE_BLOB_NAME: &[u8] = b"table";

/// Single table entry stored in the table blob.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    // Epoch in which this slot was written
    epoch: u64,
    // Section in which this slot was written
    section: u64,
    // Position in the key index for this section
    position: u64,
    // Number of items added to this entry since last resize
    added: u8,
    // CRC of (epoch | section | position | added)
    crc: u32,
}

impl Entry {
    /// The full size of a table entry (2 slots).
    const FULL_SIZE: usize = Self::SIZE * 2;

    /// Compute a checksum for [Entry].
    fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
        let mut hasher = Crc32::new();
        hasher.update(&epoch.to_be_bytes());
        hasher.update(&section.to_be_bytes());
        hasher.update(&position.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize().as_u32()
    }

    /// Create a new [Entry].
    fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
        Self {
            epoch,
            section,
            position,
            added,
            crc: Self::compute_crc(epoch, section, position, added),
        }
    }

    /// Check if this entry is empty (all zeros).
    const fn is_empty(&self) -> bool {
        self.section == 0 && self.position == 0 && self.crc == 0
    }

    /// Check if this entry is valid.
    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
    }
}

impl FixedSize for Entry {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}

impl CodecWrite for Entry {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.position.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}

impl Read for Entry {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let position = u64::read(buf)?;
        let added = u8::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self {
            epoch,
            section,
            position,
            added,
            crc,
        })
    }
}

/// Sentinel value indicating no next entry in the collision chain.
const NO_NEXT_SECTION: u64 = u64::MAX;
const NO_NEXT_POSITION: u64 = u64::MAX;

/// Key entry stored in the segmented/fixed key index journal.
///
/// All fields are fixed size, enabling efficient collision chain traversal
/// without reading large values.
///
/// The `next` pointer uses sentinel values (u64::MAX, u64::MAX) to indicate
/// "no next entry" instead of Option, ensuring fixed-size encoding.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The key for this entry.
    key: K,
    /// Pointer to next entry in collision chain (section, position in key index).
    /// Uses (u64::MAX, u64::MAX) as sentinel for "no next".
    next_section: u64,
    next_position: u64,
    /// Byte offset in value journal (same section).
    value_offset: u64,
    /// Size of value data in the value journal.
    value_size: u32,
}

impl<K: Array> Record<K> {
    /// Create a new [Record].
    fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
        let (next_section, next_position) = next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
        Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        }
    }

    /// Get the next entry in the collision chain, if any.
    const fn next(&self) -> Option<(u64, u64)> {
        if self.next_section == NO_NEXT_SECTION && self.next_position == NO_NEXT_POSITION {
            None
        } else {
            Some((self.next_section, self.next_position))
        }
    }
}
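
// A minimal sketch (test-only, standalone) mirroring the sentinel convention
// described above: an absent `next` pointer is stored as
// (NO_NEXT_SECTION, NO_NEXT_POSITION) and mapped back to `None` on read. The
// closures here are illustrative stand-ins for `Record::new` and
// `Record::next`, not part of the Freezer API.
#[cfg(test)]
mod next_sentinel_sketch {
    use super::{NO_NEXT_POSITION, NO_NEXT_SECTION};

    #[test]
    fn roundtrip() {
        // Encode: an absent pointer becomes the sentinel pair.
        let encode =
            |next: Option<(u64, u64)>| next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
        // Decode: the sentinel pair maps back to `None`.
        let decode = |(section, position): (u64, u64)| {
            if section == NO_NEXT_SECTION && position == NO_NEXT_POSITION {
                None
            } else {
                Some((section, position))
            }
        };
        assert_eq!(decode(encode(None)), None);
        assert_eq!(decode(encode(Some((2, 7)))), Some((2, 7)));
    }
}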

impl<K: Array> CodecWrite for Record<K> {
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.next_section.write(buf);
        self.next_position.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}

impl<K: Array> Read for Record<K> {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let key = K::read(buf)?;
        let next_section = u64::read(buf)?;
        let next_position = u64::read(buf)?;
        let value_offset = u64::read(buf)?;
        let value_size = u32::read(buf)?;

        Ok(Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        })
    }
}

impl<K: Array> FixedSize for Record<K> {
    // key + next_section + next_position + value_offset + value_size
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

impl<K: Array> OversizedRecord for Record<K> {
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    fn with_location(mut self, offset: u64, size: u32) -> Self {
        self.value_offset = offset;
        self.value_size = size;
        self
    }
}

#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            next_section: u64::arbitrary(u)?,
            next_position: u64::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}

/// Implementation of [Freezer].
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
    // Context for storage operations
    context: E,

    // Table configuration
    table_partition: String,
    table_size: u32,
    table_resize_threshold: u64,
    table_resize_frequency: u8,
    table_resize_chunk_size: u32,

    // Table blob that maps slots to key index chain heads
    table: E::Blob,

    // Combined key index + value storage with crash recovery
    oversized: Oversized<E, Record<K>, V>,

    // Target size for value blob sections
    blob_target_size: u64,

    // Current section for new writes
    current_section: u64,
    next_epoch: u64,

    // Sections with pending table updates to be synced
    modified_sections: BTreeSet<u64>,
    resizable: u32,
    resize_progress: Option<u32>,

    // Metrics
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Freezer<E, K, V> {
    /// Calculate the byte offset for a table index.
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }

    /// Parse table entries from a buffer.
    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
        let mut buf1 = &buf[0..Entry::SIZE];
        let entry1 = Entry::read(&mut buf1)?;
        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
        let entry2 = Entry::read(&mut buf2)?;
        Ok((entry1, entry2))
    }

    /// Read entries from the table blob.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let buf = vec![0u8; Entry::FULL_SIZE];
        let read_buf = blob.read_at(buf, offset).await?;

        Self::parse_entries(read_buf.as_ref())
    }

    /// Recover a single table entry and update tracking.
    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        if entry.is_empty() {
            return Ok(false);
        }

        if !entry.is_valid()
            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
        {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            *entry = Entry::new(0, 0, 0, 0);
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(zero_buf, entry_offset).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            // Only track max epoch if we're discovering it (not validating against a known epoch)
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }

    /// Validate and clean invalid table entries for a given epoch.
    ///
    /// Returns (modified, max_epoch, max_section, resizable) where:
    /// - modified: whether any entries were cleaned
    /// - max_epoch: the maximum valid epoch found
    /// - max_section: the section corresponding to `max_epoch`
    /// - resizable: the number of entries that can be resized
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Create a buffered reader for efficient scanning
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        // Iterate over all table entries and overwrite invalid ones
        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            // Read both entries from the buffer
            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            // Check both entries
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // If the latest entry has reached the resize frequency, increment the resizable entries
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }

    /// Determine the write offset for a table entry based on current entries and epoch.
    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
        // If either entry matches the current epoch, overwrite it
        if !entry1.is_empty() && entry1.epoch == epoch {
            return 0;
        }
        if !entry2.is_empty() && entry2.epoch == epoch {
            return Entry::SIZE as u64;
        }

        // Otherwise, write to the older slot (or empty slot)
        match (entry1.is_empty(), entry2.is_empty()) {
            (true, _) => 0,                  // First slot is empty
            (_, true) => Entry::SIZE as u64, // Second slot is empty
            (false, false) => {
                if entry1.epoch < entry2.epoch {
                    0
                } else {
                    Entry::SIZE as u64
                }
            }
        }
    }
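
    // A minimal sketch (test-only) of the slot-selection rule above, using
    // arbitrary example epochs/sections/positions: a slot already written in
    // the current epoch is overwritten in place; otherwise the empty or older
    // slot is chosen.
    #[cfg(test)]
    #[allow(dead_code)]
    fn compute_write_offset_sketch() {
        let empty = Entry {
            epoch: 0,
            section: 0,
            position: 0,
            added: 0,
            crc: 0,
        };
        let older = Entry::new(1, 7, 3, 0);
        let newer = Entry::new(2, 8, 4, 0);
        // Both slots empty: the first slot is used.
        assert_eq!(Self::compute_write_offset(&empty, &empty, 3), 0);
        // Both slots occupied by prior epochs: the older slot is overwritten.
        assert_eq!(Self::compute_write_offset(&older, &newer, 3), 0);
        // A slot written in the current epoch is overwritten in place.
        assert_eq!(
            Self::compute_write_offset(&older, &newer, 2),
            Entry::SIZE as u64
        );
    }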

    /// Read the latest valid entry from two table slots.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
                Ordering::Equal => {
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
            (false, false) => None,
        }
    }

    /// Write a table entry to the appropriate slot based on epoch.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        // Calculate the base offset for this table index
        let table_offset = Self::table_offset(table_index);

        // Determine which slot to write to based on the provided entries
        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        // Write the new entry
        table
            .write_at(update.encode_mut(), table_offset + start)
            .await
            .map_err(Error::Runtime)
    }

    /// Initialize table with given size and sync.
    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }

    /// Initialize a new [Freezer] instance.
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }

    /// Initialize a new [Freezer] instance with a [Checkpoint].
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Validate that initial_table_size is a power of 2
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Initialize oversized journal (handles crash recovery)
        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_buffer_pool: config.key_buffer_pool.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Open table blob
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        // Determine checkpoint based on initialization scenario
        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // New table with no data
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // New table with explicit checkpoint (must be empty)
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing table with checkpoint
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Rewind oversized to the committed section and key size
                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;

                // Sync oversized
                oversized.sync(checkpoint.section).await?;

                // Resize table if needed
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Validate and clean invalid entries
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing table without checkpoint
            (_, None) => {
                // Find max epoch/section and clean invalid entries in a single pass
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                // Get sizes from oversized (crash recovery already ran during init)
                let oversized_size = oversized.size(max_section).await?;

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Create metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }

    /// Compute the table index for a given key.
    ///
    /// As the table doubles in size during a resize, each existing entry splits into two:
    /// one at the original index and another at a new index (original index + previous table size).
    ///
    /// For example, with an initial table size of 4 (2^2):
    /// - Initially: uses 2 bits of the hash, mapping to entries 0, 1, 2, 3.
    /// - After resizing to 8: uses 3 bits, entry 0 splits into indices 0 and 4.
    /// - After resizing to 16: uses 4 bits, entry 0 splits into indices 0 and 8, and so on.
    ///
    /// To determine the appropriate entry, we AND the key's hash with a mask equal to the
    /// current table size minus one.
    fn table_index(&self, key: &K) -> u32 {
        let hash = Crc32::checksum(key.as_ref());
        hash & (self.table_size - 1)
    }
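
    // A minimal sketch (test-only) of the masking rule above with an example
    // hash value of 13: a 4-entry table maps it to index 13 & 3 = 1, and after
    // doubling to 8 entries it maps to 13 & 7 = 5, i.e. old index + old size.
    #[cfg(test)]
    #[allow(dead_code)]
    fn table_index_sketch() {
        let hash: u32 = 13;
        let (old_size, new_size): (u32, u32) = (4, 8);
        let old_index = hash & (old_size - 1);
        let new_index = hash & (new_size - 1);
        assert_eq!(old_index, 1);
        assert_eq!(new_index, old_index + old_size);
    }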

    /// Determine if the table should be resized.
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }
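
    // A minimal sketch (test-only) of the resize trigger arithmetic: with
    // RESIZE_THRESHOLD at 50, a 1024-entry table starts resizing once 512
    // entries have each accumulated `table_resize_frequency` additions.
    #[cfg(test)]
    #[allow(dead_code)]
    fn resize_threshold_sketch() {
        let table_size: u32 = 1024;
        let threshold = table_size as u64 * RESIZE_THRESHOLD / 100;
        assert_eq!(threshold, 512);
    }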

    /// Determine which blob section to write to based on current blob size.
    async fn update_section(&mut self) -> Result<(), Error> {
        // Get the current value blob section size
        let value_size = self.oversized.value_size(self.current_section).await?;

        // If the current section has reached the target size, create a new section
        if value_size >= self.blob_target_size {
            self.current_section += 1;
            debug!(
                size = value_size,
                section = self.current_section,
                "updated section"
            );
        }

        Ok(())
    }

    /// Put a key-value pair into the [Freezer].
    /// If the key already exists, the value is updated.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Update the section if needed
        self.update_section().await?;

        // Get head of the chain from table
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Create key entry with pointer to previous head (value location set by oversized.append)
        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );

        // Write value and key entry (value blob first, then key index)
        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;

        // Update the number of items added to the entry.
        //
        // We use `saturating_add` to handle overflow (when the table is at max size) gracefully.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // If we've reached the threshold for resizing, increment the resizable entries
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Update the old position
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If we're mid-resize and this entry has already been processed, update the new position too
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // If the previous entry crossed the threshold, so did this one
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // This entry has been processed, so we need to update the new position as well.
                //
                // The entries are still identical to the old ones, so we don't need to read them again.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }

    /// Get the value for a given [Cursor].
    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
        let value = self
            .oversized
            .get_value(cursor.section(), cursor.offset(), cursor.size())
            .await?;

        Ok(value)
    }

    /// Get the first value for a given key.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Get head of the chain from table
        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        // Follow the linked list chain to find the first matching key
        loop {
            // Get the key entry from the fixed key index (efficient, good cache locality)
            let key_entry = self.oversized.get(section, position).await?;

            // Check if this key matches
            if key_entry.key.as_ref() == key.as_ref() {
                let value = self
                    .oversized
                    .get_value(section, key_entry.value_offset, key_entry.value_size)
                    .await?;
                return Ok(Some(value));
            }

            // Increment unnecessary reads
            self.unnecessary_reads.inc();

            // Follow the chain
            let Some(next) = key_entry.next() else {
                break; // End of chain
            };
            section = next.0;
            position = next.1;
        }

        Ok(None)
    }

    /// Get the value for a given [Identifier].
    ///
    /// If a [Cursor] is known for the required key, using it
    /// is much faster than searching by `key`.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Resize the table by doubling its size, splitting each entry into two.
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        // Double the table size (if not already at the max size)
        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        // Start the resize
        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }

    /// Write a pair of entries to a buffer, replacing one slot with the new entry.
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }

    /// Continue a resize operation by processing the next chunk of entries.
    ///
    /// This function processes `table_resize_chunk_size` entries at a time,
    /// allowing the resize to be spread across multiple sync operations to
    /// avoid latency spikes.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        // Compute the range to update
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the entire chunk
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = vec![0u8; chunk_bytes];
        let read_buf: Vec<u8> = self.table.read_at(read_buf, read_offset).await?.into();

        // Process each entry in the chunk
        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            // Get the entry
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf[entry_offset..entry_end];

            // Parse the two slots
            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            // Get the current head
            let (section, position, added) =
                Self::read_latest_entry(&entry1, &entry2).unwrap_or((0, 0, 0));

            // If the entry was over the threshold, decrement the resizable entries
            if added >= self.table_resize_frequency {
                self.resizable -= 1;
            }

            // Rewrite the entries
            let reset_entry = Entry::new(self.next_epoch, section, position, 0);
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Put the writes into the table
        let old_write = self.table.write_at(writes.clone(), read_offset);
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(writes, new_offset);
        try_join(old_write, new_write).await?;

        // Update progress
        if chunk_end >= old_size {
            // Resize complete
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            // More chunks to process
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
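
    // A minimal sketch (test-only) of how a resize advances: each call
    // processes at most `table_resize_chunk_size` entries, so an 8-entry table
    // with a chunk size of 3 finishes after three calls covering [0, 3),
    // [3, 6), and [6, 8).
    #[cfg(test)]
    #[allow(dead_code)]
    fn resize_chunking_sketch() {
        let (old_size, chunk_size): (u32, u32) = (8, 3);
        let mut progress = 0u32;
        let mut calls = 0u32;
        while progress < old_size {
            progress = (progress + chunk_size).min(old_size);
            calls += 1;
        }
        assert_eq!(calls, 3);
    }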

    /// Sync all pending data in [Freezer].
    ///
    /// If the table needs to be resized, the resize will begin during this sync.
    /// The resize operation is performed incrementally across multiple sync calls
    /// to avoid a large latency spike (or unexpected long latency for [Freezer::put]).
    /// Each sync will process up to `table_resize_chunk_size` entries until the resize
    /// is complete.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync all modified sections for oversized journal
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();

        // Start a resize (if needed)
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        // Continue a resize (if ongoing)
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync updated table entries
        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        // Get size from oversized
        let oversized_size = self.oversized.size(self.current_section).await?;

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }

    /// Close the [Freezer] and return a [Checkpoint] for recovery.
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        // If we're mid-resize, complete it
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync any pending updates before closing
        let checkpoint = self.sync().await?;

        Ok(checkpoint)
    }

    /// Close and remove any underlying blobs created by the [Freezer].
    pub async fn destroy(self) -> Result<(), Error> {
        // Destroy oversized journal
        self.oversized.destroy().await?;

        // Destroy the table
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }

    /// Get the current progress of the resize operation.
    ///
    /// Returns `None` if the [Freezer] is not resizing.
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    /// Get the number of resizable entries.
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Gettable for Freezer<E, K, V> {
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(Identifier::Key(key)).await
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Updatable for Freezer<E, K, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await?;
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Persistable for Freezer<E, K, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await?;
        Ok(())
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;
    use commonware_utils::sequence::U64;

    type TestFreezer = Freezer<Context, U64, u64>;

    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        assert_gettable(freezer, &key);
        assert_updatable(freezer, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        assert_send(freezer.destroy());
    }
1203}