commonware_storage/freezer/
storage.rs

1use super::{Config, Error, Identifier};
2use crate::journal::segmented::variable::{Config as JournalConfig, Journal};
3use bytes::{Buf, BufMut};
4use commonware_codec::{Codec, Encode, EncodeSize, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{buffer, Blob, Clock, Metrics, Storage};
6use commonware_utils::{Array, Span};
7use futures::future::{try_join, try_join_all};
8use prometheus_client::metrics::counter::Counter;
9use std::{
10    cmp::Ordering, collections::BTreeSet, marker::PhantomData, num::NonZeroUsize, ops::Deref,
11};
12use tracing::debug;
13
/// The percentage of table entries that must reach `table_resize_frequency`
/// before a resize is triggered.
///
/// Used in `init_with_checkpoint` as `table_size * RESIZE_THRESHOLD / 100`
/// to derive the resize threshold for the current table size.
const RESIZE_THRESHOLD: u64 = 50;
17
/// Location of an item in the [Freezer].
///
/// This can be used to directly access the data for a given
/// key-value pair (rather than walking the journal chain).
///
/// Layout: big-endian `section` (8 bytes) followed by big-endian
/// `offset` (4 bytes), so byte-wise ordering matches (section, offset) order.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u32::SIZE]);
26
27impl Cursor {
28    /// Create a new [Cursor].
29    fn new(section: u64, offset: u32) -> Self {
30        let mut buf = [0u8; u64::SIZE + u32::SIZE];
31        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
32        buf[u64::SIZE..].copy_from_slice(&offset.to_be_bytes());
33        Self(buf)
34    }
35
36    /// Get the section of the cursor.
37    fn section(&self) -> u64 {
38        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
39    }
40
41    /// Get the offset of the cursor.
42    fn offset(&self) -> u32 {
43        u32::from_be_bytes(self.0[u64::SIZE..].try_into().unwrap())
44    }
45}
46
47impl Read for Cursor {
48    type Cfg = ();
49
50    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
51        <[u8; u64::SIZE + u32::SIZE]>::read(buf).map(Self)
52    }
53}
54
impl CodecWrite for Cursor {
    // Writes the raw 12-byte array; must stay symmetric with `Read::read_cfg`.
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}
60
impl FixedSize for Cursor {
    // Encoded size matches the inner array: section (8) + offset (4).
    const SIZE: usize = u64::SIZE + u32::SIZE;
}
64
65impl Span for Cursor {}
66
67impl Array for Cursor {}
68
impl Deref for Cursor {
    // Expose the raw encoded bytes for slice-based APIs.
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
75
impl AsRef<[u8]> for Cursor {
    // Borrow the raw encoded bytes (same view as `Deref`).
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
81
82impl std::fmt::Debug for Cursor {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        write!(
85            f,
86            "Cursor(section={}, offset={})",
87            self.section(),
88            self.offset()
89        )
90    }
91}
92
93impl std::fmt::Display for Cursor {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(
96            f,
97            "Cursor(section={}, offset={})",
98            self.section(),
99            self.offset()
100        )
101    }
102}
103
/// Marker of [Freezer] progress.
///
/// This can be used to restore the [Freezer] to a consistent
/// state after shutdown.
///
/// NOTE: field order matters for the derived `Ord`/`PartialOrd` and for
/// the codec impls below — do not reorder.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    /// The epoch of the last committed operation.
    epoch: u64,
    /// The section of the last committed operation.
    section: u64,
    /// The size of the journal in the last committed section.
    size: u64,
    /// The size of the table.
    table_size: u32,
}
120
121impl Checkpoint {
122    /// Initialize a new [Checkpoint].
123    const fn init(table_size: u32) -> Self {
124        Self {
125            table_size,
126            epoch: 0,
127            section: 0,
128            size: 0,
129        }
130    }
131}
132
133impl Read for Checkpoint {
134    type Cfg = ();
135    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
136        let epoch = u64::read(buf)?;
137        let section = u64::read(buf)?;
138        let size = u64::read(buf)?;
139        let table_size = u32::read(buf)?;
140        Ok(Self {
141            epoch,
142            section,
143            size,
144            table_size,
145        })
146    }
147}
148
149impl CodecWrite for Checkpoint {
150    fn write(&self, buf: &mut impl BufMut) {
151        self.epoch.write(buf);
152        self.section.write(buf);
153        self.size.write(buf);
154        self.table_size.write(buf);
155    }
156}
157
158impl FixedSize for Checkpoint {
159    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
160}
161
/// Name of the table blob (opened inside the configured table partition).
const TABLE_BLOB_NAME: &[u8] = b"table";
164
/// Single table entry stored in the table blob.
///
/// A slot of all-zero bytes represents "never written" (see `is_empty`).
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    // Epoch in which this slot was written
    epoch: u64,
    // Section in which this slot was written
    section: u64,
    // Offset in the section where this slot was written
    offset: u32,
    // Number of items added to this entry since last resize
    added: u8,
    // CRC of (epoch | section | offset | added)
    crc: u32,
}
180
181impl Entry {
182    /// The full size of a table entry (2 slots).
183    const FULL_SIZE: usize = Self::SIZE * 2;
184
185    /// Compute a checksum for [Entry].
186    fn compute_crc(epoch: u64, section: u64, offset: u32, added: u8) -> u32 {
187        let mut hasher = crc32fast::Hasher::new();
188        hasher.update(&epoch.to_be_bytes());
189        hasher.update(&section.to_be_bytes());
190        hasher.update(&offset.to_be_bytes());
191        hasher.update(&added.to_be_bytes());
192        hasher.finalize()
193    }
194
195    /// Create a new [Entry].
196    fn new(epoch: u64, section: u64, offset: u32, added: u8) -> Self {
197        Self {
198            epoch,
199            section,
200            offset,
201            added,
202            crc: Self::compute_crc(epoch, section, offset, added),
203        }
204    }
205
206    /// Check if this entry is empty (all zeros).
207    const fn is_empty(&self) -> bool {
208        self.section == 0 && self.offset == 0 && self.crc == 0
209    }
210
211    /// Check if this entry is valid.
212    fn is_valid(&self) -> bool {
213        Self::compute_crc(self.epoch, self.section, self.offset, self.added) == self.crc
214    }
215}
216
impl FixedSize for Entry {
    // epoch + section + offset + added + crc, in encode order.
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE + u8::SIZE + u32::SIZE;
}
220
221impl CodecWrite for Entry {
222    fn write(&self, buf: &mut impl BufMut) {
223        self.epoch.write(buf);
224        self.section.write(buf);
225        self.offset.write(buf);
226        self.added.write(buf);
227        self.crc.write(buf);
228    }
229}
230
231impl Read for Entry {
232    type Cfg = ();
233    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
234        let epoch = u64::read(buf)?;
235        let section = u64::read(buf)?;
236        let offset = u32::read(buf)?;
237        let added = u8::read(buf)?;
238        let crc = u32::read(buf)?;
239
240        Ok(Self {
241            epoch,
242            section,
243            offset,
244            added,
245            crc,
246        })
247    }
248}
249
/// A key-value pair stored in the [Journal].
struct Record<K: Array, V: Codec> {
    // The key for this record
    key: K,
    // The value associated with `key`
    value: V,
    // (section, offset) of the previous chain head, if any — records with
    // the same table index form a singly-linked list through the journal
    next: Option<(u64, u32)>,
}
256
impl<K: Array, V: Codec> Record<K, V> {
    /// Create a new [Record].
    ///
    /// `next` points at the previous head of this key's chain (if any).
    const fn new(key: K, value: V, next: Option<(u64, u32)>) -> Self {
        Self { key, value, next }
    }
}
263
264impl<K: Array, V: Codec> CodecWrite for Record<K, V> {
265    fn write(&self, buf: &mut impl BufMut) {
266        self.key.write(buf);
267        self.value.write(buf);
268        self.next.write(buf);
269    }
270}
271
272impl<K: Array, V: Codec> Read for Record<K, V> {
273    type Cfg = V::Cfg;
274    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
275        let key = K::read(buf)?;
276        let value = V::read_cfg(buf, cfg)?;
277        let next = Option::<(u64, u32)>::read_cfg(buf, &((), ()))?;
278
279        Ok(Self { key, value, next })
280    }
281}
282
impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
    // Key is fixed-size; value and next are variable-size.
    fn encode_size(&self) -> usize {
        K::SIZE + self.value.encode_size() + self.next.encode_size()
    }
}
288
#[cfg(feature = "arbitrary")]
// Fuzzing support: derive a random Record from unstructured bytes.
impl<K: Array, V: Codec> arbitrary::Arbitrary<'_> for Record<K, V>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            value: V::arbitrary(u)?,
            next: Option::<(u64, u32)>::arbitrary(u)?,
        })
    }
}
303
/// Implementation of [Freezer].
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: Codec> {
    // Context for storage operations
    context: E,

    // Table configuration
    table_partition: String,
    // Current number of table entries (always a power of 2)
    table_size: u32,
    // Number of resizable entries required before a resize starts
    // (table_size * RESIZE_THRESHOLD / 100)
    table_resize_threshold: u64,
    // Items added to an entry (since last resize) before it counts as resizable
    table_resize_frequency: u8,
    // Table entries processed per resize step
    table_resize_chunk_size: u32,

    // Table blob that maps slots to journal chain heads
    table: E::Blob,

    // Variable journal for storing entries
    journal: Journal<E, Record<K, V>>,
    // Once a section reaches this size (bytes), new writes roll to the next section
    journal_target_size: u64,

    // Current section for new writes
    current_section: u64,
    // Epoch assigned to table entries written next (strictly increasing)
    next_epoch: u64,

    // Sections with pending table updates to be synced
    modified_sections: BTreeSet<u64>,
    // Count of entries that have reached `table_resize_frequency`
    resizable: u32,
    // Index of the next table entry to split while a resize is in progress
    resize_progress: Option<u32>,

    // Metrics
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,

    // Phantom data to satisfy the compiler about generic types
    _phantom: PhantomData<(K, V)>,
}
342
343impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Freezer<E, K, V> {
    /// Calculate the byte offset for a table index.
    ///
    /// Each index owns `Entry::FULL_SIZE` bytes (two slots).
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }
349
350    /// Parse table entries from a buffer.
351    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
352        let mut buf1 = &buf[0..Entry::SIZE];
353        let entry1 = Entry::read(&mut buf1)?;
354        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
355        let entry2 = Entry::read(&mut buf2)?;
356        Ok((entry1, entry2))
357    }
358
359    /// Read entries from the table blob.
360    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
361        let offset = Self::table_offset(table_index);
362        let buf = vec![0u8; Entry::FULL_SIZE];
363        let read_buf = blob.read_at(buf, offset).await?;
364
365        Self::parse_entries(read_buf.as_ref())
366    }
367
368    /// Recover a single table entry and update tracking.
369    async fn recover_entry(
370        blob: &E::Blob,
371        entry: &mut Entry,
372        entry_offset: u64,
373        max_valid_epoch: Option<u64>,
374        max_epoch: &mut u64,
375        max_section: &mut u64,
376    ) -> Result<bool, Error> {
377        if entry.is_empty() {
378            return Ok(false);
379        }
380
381        if !entry.is_valid()
382            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
383        {
384            debug!(
385                valid_epoch = max_valid_epoch,
386                entry_epoch = entry.epoch,
387                "found invalid table entry"
388            );
389            *entry = Entry::new(0, 0, 0, 0);
390            let zero_buf = vec![0u8; Entry::SIZE];
391            blob.write_at(zero_buf, entry_offset).await?;
392            Ok(true)
393        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
394            // Only track max epoch if we're discovering it (not validating against a known epoch)
395            *max_epoch = entry.epoch;
396            *max_section = entry.section;
397            Ok(false)
398        } else {
399            Ok(false)
400        }
401    }
402
    /// Validate and clean invalid table entries for a given epoch.
    ///
    /// Scans every entry in the table blob once, zeroing corrupted slots
    /// and slots written after `max_valid_epoch` (when provided).
    ///
    /// Returns (modified, max_epoch, max_section, resizable) where:
    /// - modified: whether any entries were cleaned
    /// - max_epoch: the maximum valid epoch found
    /// - max_section: the section corresponding to `max_epoch`
    /// - resizable: the number of entries that can be resized
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Create a buffered reader for efficient scanning
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        // Iterate over all table entries and overwrite invalid ones
        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            // Byte offset of this entry pair (used for targeted zeroing writes).
            let offset = Self::table_offset(table_index);

            // Read both entries from the buffer
            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            // Check both entries
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // If the latest entry has reached the resize frequency, increment the resizable entries
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }
465
466    /// Determine the write offset for a table entry based on current entries and epoch.
467    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
468        // If either entry matches the current epoch, overwrite it
469        if !entry1.is_empty() && entry1.epoch == epoch {
470            return 0;
471        }
472        if !entry2.is_empty() && entry2.epoch == epoch {
473            return Entry::SIZE as u64;
474        }
475
476        // Otherwise, write to the older slot (or empty slot)
477        match (entry1.is_empty(), entry2.is_empty()) {
478            (true, _) => 0,                  // First slot is empty
479            (_, true) => Entry::SIZE as u64, // Second slot is empty
480            (false, false) => {
481                if entry1.epoch < entry2.epoch {
482                    0
483                } else {
484                    Entry::SIZE as u64
485                }
486            }
487        }
488    }
489
490    /// Read the latest valid entry from two table slots.
491    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u32, u8)> {
492        match (
493            !entry1.is_empty() && entry1.is_valid(),
494            !entry2.is_empty() && entry2.is_valid(),
495        ) {
496            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
497                Ordering::Greater => Some((entry1.section, entry1.offset, entry1.added)),
498                Ordering::Less => Some((entry2.section, entry2.offset, entry2.added)),
499                Ordering::Equal => {
500                    unreachable!("two valid entries with the same epoch")
501                }
502            },
503            (true, false) => Some((entry1.section, entry1.offset, entry1.added)),
504            (false, true) => Some((entry2.section, entry2.offset, entry2.added)),
505            (false, false) => None,
506        }
507    }
508
509    /// Write a table entry to the appropriate slot based on epoch.
510    async fn update_head(
511        table: &E::Blob,
512        table_index: u32,
513        entry1: &Entry,
514        entry2: &Entry,
515        update: Entry,
516    ) -> Result<(), Error> {
517        // Calculate the base offset for this table index
518        let table_offset = Self::table_offset(table_index);
519
520        // Determine which slot to write to based on the provided entries
521        let start = Self::compute_write_offset(entry1, entry2, update.epoch);
522
523        // Write the new entry
524        table
525            .write_at(update.encode(), table_offset + start)
526            .await
527            .map_err(Error::Runtime)
528    }
529
530    /// Initialize table with given size and sync.
531    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
532        let table_len = Self::table_offset(table_size);
533        blob.resize(table_len).await?;
534        blob.sync().await?;
535        Ok(())
536    }
537
    /// Initialize a new [Freezer] instance.
    ///
    /// Convenience wrapper around [Self::init_with_checkpoint] with no checkpoint:
    /// state is recovered by scanning the existing table (if any).
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }
542
    /// Initialize a new [Freezer] instance with a [Checkpoint].
    ///
    /// Restores the store to a consistent state: when a checkpoint is given,
    /// the journal is rewound to the checkpointed position and table entries
    /// newer than the checkpoint epoch are zeroed; otherwise the latest
    /// consistent state is discovered by scanning the table.
    ///
    /// # Panics
    ///
    /// Panics if `config.table_initial_size` is not a power of 2, if a
    /// checkpoint is supplied for an empty table but is non-zero, or if a
    /// checkpoint for an existing table has a `table_size` that is not a
    /// power of 2.
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Validate that initial_table_size is a power of 2
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Initialize variable journal with a separate partition
        let journal_config = JournalConfig {
            partition: config.journal_partition,
            compression: config.journal_compression,
            codec_config: config.codec_config,
            write_buffer: config.journal_write_buffer,
            buffer_pool: config.journal_buffer_pool,
        };
        let mut journal = Journal::init(context.with_label("journal"), journal_config).await?;

        // Open table blob
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        // Determine checkpoint based on initialization scenario
        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // New table with no data
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // New table with explicit checkpoint (must be empty)
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing table with checkpoint
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Rewind the journal to the committed section and offset
                journal.rewind(checkpoint.section, checkpoint.size).await?;
                journal.sync(checkpoint.section).await?;

                // Resize table if needed (e.g. a resize started after the
                // checkpoint was taken, or the blob was truncated)
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Validate and clean invalid entries (anything written after
                // `checkpoint.epoch` is discarded)
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing table without checkpoint
            (_, None) => {
                // Find max epoch/section and clean invalid entries in a single pass
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        size: journal.size(max_section).await?,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Create metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            journal,
            journal_target_size: config.journal_target_size,
            current_section: checkpoint.section,
            // New writes must use an epoch strictly after the recovered one
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
            _phantom: PhantomData,
        })
    }
708
    /// Compute the table index for a given key.
    ///
    /// As the table doubles in size during a resize, each existing entry splits into two:
    /// one at the original index and another at a new index (original index + previous table size).
    ///
    /// For example, with an initial table size of 4 (2^2):
    /// - Initially: uses 2 bits of the hash, mapping to entries 0, 1, 2, 3.
    /// - After resizing to 8: uses 3 bits, entry 0 splits into indices 0 and 4.
    /// - After resizing to 16: uses 4 bits, entry 0 splits into indices 0 and 8, and so on.
    ///
    /// To determine the appropriate entry, we AND the key's hash with the current table size.
    fn table_index(&self, key: &K) -> u32 {
        let hash = crc32fast::hash(key.as_ref());
        // `table_size` is always a power of 2, so this masks the low bits.
        hash & (self.table_size - 1)
    }
724
    /// Determine if the table should be resized.
    ///
    /// True once enough entries have reached `table_resize_frequency`
    /// (the threshold is RESIZE_THRESHOLD percent of the table).
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }
729
    /// Determine which journal section to write to based on current journal size.
    async fn update_section(&mut self) -> Result<(), Error> {
        // Get the current section size
        let size = self.journal.size(self.current_section).await?;

        // If the current section has reached the target size, create a new section
        // (sections are numbered sequentially)
        if size >= self.journal_target_size {
            self.current_section += 1;
            debug!(size, section = self.current_section, "updated section");
        }

        Ok(())
    }
743
    /// Put a key-value pair into the [Freezer].
    /// If the key already exists, the value is updated.
    ///
    /// The new record is appended to the journal and becomes the head of its
    /// table entry's chain (the record links back to the previous head).
    /// Returns a [Cursor] pointing directly at the stored record.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Update the section if needed
        self.update_section().await?;

        // Get head of the chain from table
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Create new head of the chain
        let entry = Record::new(
            key,
            value,
            head.map(|(section, offset, _)| (section, offset)),
        );

        // Append entry to the variable journal
        let (offset, _) = self.journal.append(self.current_section, entry).await?;

        // Update the number of items added to the entry.
        //
        // We use `saturating_add` to handle overflow (when the table is at max size) gracefully.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // If we've reached the threshold for resizing, increment the resizable entries
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Update the old position
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If we're mid-resize and this entry has already been processed, update the new position too
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // If the previous entry crossed the threshold, so did this one
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // This entry has been processed, so we need to update the new position as well.
                //
                // The entries are still identical to the old ones, so we don't need to read them again.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, offset))
    }
805
806    /// Get the value for a given [Cursor].
807    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
808        let entry = self.journal.get(cursor.section(), cursor.offset()).await?;
809
810        Ok(entry.value)
811    }
812
813    /// Get the first value for a given key.
814    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
815        self.gets.inc();
816
817        // Get head of the chain from table
818        let table_index = self.table_index(key);
819        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
820        let Some((mut section, mut offset, _)) = Self::read_latest_entry(&entry1, &entry2) else {
821            return Ok(None);
822        };
823
824        // Follow the linked list chain to find the first matching key
825        loop {
826            // Get the entry from the variable journal
827            let entry = self.journal.get(section, offset).await?;
828
829            // Check if this key matches
830            if entry.key.as_ref() == key.as_ref() {
831                return Ok(Some(entry.value));
832            }
833
834            // Increment unnecessary reads
835            self.unnecessary_reads.inc();
836
837            // Follow the chain
838            let Some(next) = entry.next else {
839                break; // End of chain
840            };
841            section = next.0;
842            offset = next.1;
843        }
844
845        Ok(None)
846    }
847
    /// Get the value for a given [Identifier].
    ///
    /// If a [Cursor] is known for the required key, it
    /// is much faster to use it than searching for a `key`.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            // Direct journal access (no chain walk)
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            // Table lookup followed by a chain walk
            Identifier::Key(key) => self.get_key(key).await,
        }
    }
858
    /// Resize the table by doubling its size and split each entry into two.
    ///
    /// Only grows the blob and marks the resize as started; the entry
    /// splitting itself is performed incrementally elsewhere (see
    /// `resize_progress`). `table_size` is not changed here.
    async fn start_resize(&mut self) -> Result<(), Error> {
        // NOTE(review): incremented before the overflow check below, so this
        // counter also ticks when the table is already at max size — confirm
        // intended.
        self.resizes.inc();

        // Double the table size (if not already at the max size)
        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        // Start the resize
        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }
876
877    /// Write a pair of entries to a buffer, replacing one slot with the new entry.
878    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
879        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
880            buf.extend_from_slice(&new_entry.encode());
881            buf.extend_from_slice(&entry2.encode());
882        } else {
883            buf.extend_from_slice(&entry1.encode());
884            buf.extend_from_slice(&new_entry.encode());
885        }
886    }
887
    /// Continue a resize operation by processing the next chunk of entries.
    ///
    /// This function processes `table_resize_chunk_size` entries at a time,
    /// allowing the resize to be spread across multiple sync operations to
    /// avoid latency spikes.
    ///
    /// # Panics
    ///
    /// Panics if no resize is in progress (`resize_progress` is `None`);
    /// callers must check before invoking.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        // Compute the range of table indices to update in this call
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the entire chunk in one blob read (instead of one per entry)
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = vec![0u8; chunk_bytes];
        let read_buf: Vec<u8> = self.table.read_at(read_buf, read_offset).await?.into();

        // Process each entry in the chunk
        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            // Slice out the i-th table entry (both slots)
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf[entry_offset..entry_end];

            // Parse the two slots
            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            // Get the current chain head (defaults to zeros when both slots
            // are empty)
            let (section, offset, added) =
                Self::read_latest_entry(&entry1, &entry2).unwrap_or((0, 0, 0));

            // If the entry was over the threshold, decrement the resizable entries
            if added >= self.table_resize_frequency {
                self.resizable -= 1;
            }

            // Rewrite the entries with a fresh entry at the current epoch:
            // same chain head, but the `added` counter reset to 0
            let reset_entry = Entry::new(self.next_epoch, section, offset, 0);
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Write the rewritten chunk back to its old range and mirror the same
        // bytes into the newly-added upper half of the table, concurrently
        let old_write = self.table.write_at(writes.clone(), read_offset);
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(writes, new_offset);
        try_join(old_write, new_write).await?;

        // Update progress
        if chunk_end >= old_size {
            // Resize complete: commit the doubled size and recompute the
            // resize threshold as RESIZE_THRESHOLD percent of the new size
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            // More chunks to process on a subsequent call
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
956
957    /// Sync all pending data in [Freezer].
958    ///
959    /// If the table needs to be resized, the resize will begin during this sync.
960    /// The resize operation is performed incrementally across multiple sync calls
961    /// to avoid a large latency spike (or unexpected long latency for [Freezer::put]).
962    /// Each sync will process up to `table_resize_chunk_size` entries until the resize
963    /// is complete.
964    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
965        // Sync all modified journal sections
966        let mut updates = Vec::with_capacity(self.modified_sections.len());
967        for section in &self.modified_sections {
968            updates.push(self.journal.sync(*section));
969        }
970        try_join_all(updates).await?;
971        self.modified_sections.clear();
972
973        // Start a resize (if needed)
974        if self.should_resize() && self.resize_progress.is_none() {
975            self.start_resize().await?;
976        }
977
978        // Continue a resize (if ongoing)
979        if self.resize_progress.is_some() {
980            self.advance_resize().await?;
981        }
982
983        // Sync updated table entries
984        self.table.sync().await?;
985        let stored_epoch = self.next_epoch;
986        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");
987
988        Ok(Checkpoint {
989            epoch: stored_epoch,
990            section: self.current_section,
991            size: self.journal.size(self.current_section).await?,
992            table_size: self.table_size,
993        })
994    }
995
    /// Close the [Freezer] and return a [Checkpoint] for recovery.
    ///
    /// Any in-progress table resize is driven to completion before the final
    /// sync, so the returned checkpoint reflects a fully-resized table.
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        // If we're mid-resize, complete it (each call processes one chunk)
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync any pending updates before closing
        let checkpoint = self.sync().await?;

        self.journal.close().await?;
        // NOTE(review): `sync()` above already syncs the table; this second
        // table sync appears redundant — confirm before removing.
        self.table.sync().await?;
        Ok(checkpoint)
    }
1010
    /// Close and remove any underlying blobs created by the [Freezer].
    pub async fn destroy(self) -> Result<(), Error> {
        // Destroy the journal (removes all journal sections)
        self.journal.destroy().await?;

        // Destroy the table: release the open blob handle first, then remove
        // the table blob, and finally the partition itself.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }
1025
    /// Get the current progress of the resize operation (the index of the
    /// next table entry to migrate).
    ///
    /// Returns `None` if the [Freezer] is not resizing.
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }
1033
    /// Get the number of resizable entries (entries whose `added` count has
    /// reached `table_resize_frequency`).
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
1039}
1040
1041impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::store::Store for Freezer<E, K, V> {
1042    type Key = K;
1043    type Value = V;
1044    type Error = Error;
1045
1046    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
1047        self.get(Identifier::Key(key)).await
1048    }
1049}
1050
1051impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::store::StoreMut for Freezer<E, K, V> {
1052    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
1053        self.put(key, value).await?;
1054        Ok(())
1055    }
1056}
1057
1058impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::store::StorePersistable
1059    for Freezer<E, K, V>
1060{
1061    async fn commit(&mut self) -> Result<(), Self::Error> {
1062        self.sync().await?;
1063        Ok(())
1064    }
1065
1066    async fn destroy(self) -> Result<(), Self::Error> {
1067        self.destroy().await
1068    }
1069}
1070
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    // Codec conformance tests (presumably encode/decode round-trips over
    // arbitrary-generated inputs — see commonware_conformance) for every
    // codec type defined in this module.
    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64, U64>>
    }
}