commonware_storage/freezer/
storage.rs

1use super::{Config, Error, Identifier};
2use crate::journal::variable::{Config as JournalConfig, Journal};
3use bytes::{Buf, BufMut};
4use commonware_codec::{Codec, Encode, EncodeSize, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{buffer, Blob, Clock, Metrics, Storage};
6use commonware_utils::{Array, Span};
7use futures::future::{try_join, try_join_all};
8use prometheus_client::metrics::counter::Counter;
9use std::{
10    cmp::Ordering, collections::BTreeSet, marker::PhantomData, num::NonZeroUsize, ops::Deref,
11};
12use tracing::debug;
13
/// The percentage of table entries that must reach `table_resize_frequency`
/// before a resize is triggered.
///
/// Applied as `table_size * RESIZE_THRESHOLD / 100` when computing the
/// resize threshold at initialization.
const RESIZE_THRESHOLD: u64 = 50;
17
/// Location of an item in the [Freezer].
///
/// This can be used to directly access the data for a given
/// key-value pair (rather than walking the journal chain).
///
/// Layout: big-endian `u64` section followed by big-endian `u32` offset,
/// packed into a fixed byte array so the type can act as an [Array] key.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u32::SIZE]);
25
26impl Cursor {
27    /// Create a new [Cursor].
28    fn new(section: u64, offset: u32) -> Self {
29        let mut buf = [0u8; u64::SIZE + u32::SIZE];
30        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
31        buf[u64::SIZE..].copy_from_slice(&offset.to_be_bytes());
32        Self(buf)
33    }
34
35    /// Get the section of the cursor.
36    fn section(&self) -> u64 {
37        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
38    }
39
40    /// Get the offset of the cursor.
41    fn offset(&self) -> u32 {
42        u32::from_be_bytes(self.0[u64::SIZE..].try_into().unwrap())
43    }
44}
45
46impl Read for Cursor {
47    type Cfg = ();
48
49    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
50        <[u8; u64::SIZE + u32::SIZE]>::read(buf).map(Self)
51    }
52}
53
impl CodecWrite for Cursor {
    /// Serialize the cursor as its raw 12-byte representation.
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}
59
impl FixedSize for Cursor {
    /// Encoded size: 8-byte section + 4-byte offset.
    const SIZE: usize = u64::SIZE + u32::SIZE;
}
63
// Marker traits: a [Cursor] is a fixed-size, ordered byte span and can be
// used anywhere an [Array] key is expected.
impl Span for Cursor {}

impl Array for Cursor {}
67
impl Deref for Cursor {
    type Target = [u8];
    // Expose the raw packed bytes (required by the byte-array abstractions
    // above; Cursor is not a smart pointer).
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
74
impl AsRef<[u8]> for Cursor {
    /// Borrow the raw packed bytes.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
80
impl std::fmt::Debug for Cursor {
    // Render the decoded (section, offset) pair rather than the raw bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={})",
            self.section(),
            self.offset()
        )
    }
}
91
92impl std::fmt::Display for Cursor {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(
95            f,
96            "Cursor(section={}, offset={})",
97            self.section(),
98            self.offset()
99        )
100    }
101}
102
/// Marker of [Freezer] progress.
///
/// This can be used to restore the [Freezer] to a consistent
/// state after shutdown: table entries written after `epoch` are
/// discarded and the journal is rewound to (`section`, `size`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
pub struct Checkpoint {
    /// The epoch of the last committed operation.
    epoch: u64,
    /// The section of the last committed operation.
    section: u64,
    /// The size of the journal in the last committed section.
    size: u64,
    /// The size of the table (in entries; always a power of two).
    table_size: u32,
}
118
119impl Checkpoint {
120    /// Initialize a new [Checkpoint].
121    fn init(table_size: u32) -> Self {
122        Self {
123            table_size,
124            epoch: 0,
125            section: 0,
126            size: 0,
127        }
128    }
129}
130
131impl Read for Checkpoint {
132    type Cfg = ();
133    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
134        let epoch = u64::read(buf)?;
135        let section = u64::read(buf)?;
136        let size = u64::read(buf)?;
137        let table_size = u32::read(buf)?;
138        Ok(Self {
139            epoch,
140            section,
141            size,
142            table_size,
143        })
144    }
145}
146
impl CodecWrite for Checkpoint {
    // Field order must match `Read::read_cfg` above.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.size.write(buf);
        self.table_size.write(buf);
    }
}
155
156impl FixedSize for Checkpoint {
157    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
158}
159
/// Name of the table blob within the table partition.
const TABLE_BLOB_NAME: &[u8] = b"table";
162
/// Single table entry stored in the table blob.
///
/// Each logical table slot holds two consecutive [Entry] records; on read
/// the valid record with the newer epoch wins (see `read_latest_entry`),
/// so overwriting one record never destroys the other.
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    // Epoch in which this slot was written
    epoch: u64,
    // Section in which this slot was written
    section: u64,
    // Offset in the section where this slot was written
    offset: u32,
    // Number of items added to this entry since last resize
    added: u8,
    // CRC of (epoch | section | offset | added)
    crc: u32,
}
177
impl Entry {
    /// The full size of a table entry (2 slots).
    const FULL_SIZE: usize = Self::SIZE * 2;

    /// Compute a checksum for [Entry] over (epoch | section | offset | added),
    /// each in big-endian byte order.
    fn compute_crc(epoch: u64, section: u64, offset: u32, added: u8) -> u32 {
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&epoch.to_be_bytes());
        hasher.update(&section.to_be_bytes());
        hasher.update(&offset.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize()
    }

    /// Create a new [Entry] with its CRC computed from the other fields.
    fn new(epoch: u64, section: u64, offset: u32, added: u8) -> Self {
        Self {
            epoch,
            section,
            offset,
            added,
            crc: Self::compute_crc(epoch, section, offset, added),
        }
    }

    /// Check if this entry is empty (all zeros).
    //
    // NOTE(review): `epoch` and `added` are not inspected, and an in-memory
    // `Entry::new(0, 0, 0, 0)` carries a non-zero CRC (so it is NOT empty
    // by this test); only slots zero-filled on disk match. Confirm this
    // asymmetry is intentional.
    fn is_empty(&self) -> bool {
        self.section == 0 && self.offset == 0 && self.crc == 0
    }

    /// Check if this entry is valid (stored CRC matches the recomputed one).
    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.offset, self.added) == self.crc
    }
}
213
214impl FixedSize for Entry {
215    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE + u8::SIZE + u32::SIZE;
216}
217
impl CodecWrite for Entry {
    // Field order must match `Read::read_cfg` below and
    // `Entry::compute_crc`'s hashing order.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.offset.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}
227
228impl Read for Entry {
229    type Cfg = ();
230    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
231        let epoch = u64::read(buf)?;
232        let section = u64::read(buf)?;
233        let offset = u32::read(buf)?;
234        let added = u8::read(buf)?;
235        let crc = u32::read(buf)?;
236
237        Ok(Self {
238            epoch,
239            section,
240            offset,
241            added,
242            crc,
243        })
244    }
245}
246
/// A key-value pair stored in the [Journal].
///
/// `next` holds the `(section, offset)` of the previous head of this key's
/// table chain (if any), forming a singly-linked list through the journal.
struct Record<K: Array, V: Codec> {
    key: K,
    value: V,
    next: Option<(u64, u32)>,
}
253
impl<K: Array, V: Codec> Record<K, V> {
    /// Create a new [Record], linking to the previous chain head via `next`.
    fn new(key: K, value: V, next: Option<(u64, u32)>) -> Self {
        Self { key, value, next }
    }
}
260
impl<K: Array, V: Codec> CodecWrite for Record<K, V> {
    // Serialization order must match `Read::read_cfg`: key, value, next.
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.value.write(buf);
        self.next.write(buf);
    }
}
268
269impl<K: Array, V: Codec> Read for Record<K, V> {
270    type Cfg = V::Cfg;
271    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
272        let key = K::read(buf)?;
273        let value = V::read_cfg(buf, cfg)?;
274        let next = Option::<(u64, u32)>::read_cfg(buf, &((), ()))?;
275
276        Ok(Self { key, value, next })
277    }
278}
279
impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
    // Key is fixed-size; value and next contribute their own encoded sizes.
    fn encode_size(&self) -> usize {
        K::SIZE + self.value.encode_size() + self.next.encode_size()
    }
}
285
/// Implementation of [Freezer].
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: Codec> {
    // Context for storage operations
    context: E,

    // Table configuration
    table_partition: String,
    // Current number of table entries (always a power of two)
    table_size: u32,
    // Number of saturated entries that triggers a resize
    // (RESIZE_THRESHOLD percent of table_size)
    table_resize_threshold: u64,
    // Items added to an entry before it counts as resizable
    table_resize_frequency: u8,
    // Entries processed per resize step
    table_resize_chunk_size: u32,

    // Table blob that maps slots to journal chain heads
    table: E::Blob,

    // Variable journal for storing entries
    journal: Journal<E, Record<K, V>>,
    // Section size at which writes roll over to a new section
    journal_target_size: u64,

    // Current section for new writes
    current_section: u64,
    // Epoch stamped on new table entries
    next_epoch: u64,

    // Sections with pending table updates to be synced
    modified_sections: BTreeSet<u64>,
    // Count of entries that have reached `table_resize_frequency`
    resizable: u32,
    // Next table index to process during an in-flight resize
    // (None when no resize is in progress)
    resize_progress: Option<u32>,

    // Metrics
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,

    // Phantom data to satisfy the compiler about generic types
    //
    // NOTE(review): K and V already appear in `journal`'s type; confirm this
    // marker is still required.
    _phantom: PhantomData<(K, V)>,
}
324
325impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Freezer<E, K, V> {
326    /// Calculate the byte offset for a table index.
327    #[inline]
328    fn table_offset(table_index: u32) -> u64 {
329        table_index as u64 * Entry::FULL_SIZE as u64
330    }
331
332    /// Parse table entries from a buffer.
333    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
334        let mut buf1 = &buf[0..Entry::SIZE];
335        let entry1 = Entry::read(&mut buf1)?;
336        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
337        let entry2 = Entry::read(&mut buf2)?;
338        Ok((entry1, entry2))
339    }
340
341    /// Read entries from the table blob.
342    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
343        let offset = Self::table_offset(table_index);
344        let buf = vec![0u8; Entry::FULL_SIZE];
345        let read_buf = blob.read_at(buf, offset).await?;
346
347        Self::parse_entries(read_buf.as_ref())
348    }
349
350    /// Recover a single table entry and update tracking.
351    async fn recover_entry(
352        blob: &E::Blob,
353        entry: &mut Entry,
354        entry_offset: u64,
355        max_valid_epoch: Option<u64>,
356        max_epoch: &mut u64,
357        max_section: &mut u64,
358    ) -> Result<bool, Error> {
359        if entry.is_empty() {
360            return Ok(false);
361        }
362
363        if !entry.is_valid()
364            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
365        {
366            debug!(
367                valid_epoch = max_valid_epoch,
368                entry_epoch = entry.epoch,
369                "found invalid table entry"
370            );
371            *entry = Entry::new(0, 0, 0, 0);
372            let zero_buf = vec![0u8; Entry::SIZE];
373            blob.write_at(zero_buf, entry_offset).await?;
374            Ok(true)
375        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
376            // Only track max epoch if we're discovering it (not validating against a known epoch)
377            *max_epoch = entry.epoch;
378            *max_section = entry.section;
379            Ok(false)
380        } else {
381            Ok(false)
382        }
383    }
384
    /// Validate and clean invalid table entries for a given epoch.
    ///
    /// Scans every table entry sequentially through a buffered reader;
    /// corrupt or post-checkpoint slots are zeroed on disk via
    /// [Self::recover_entry].
    ///
    /// Returns (modified, max_epoch, max_section, resizable) where:
    /// - modified: whether any entries were cleaned
    /// - max_epoch: the maximum valid epoch found
    /// - max_section: the section corresponding to `max_epoch`
    /// - resizable: the number of entries that can be resized
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Create a buffered reader for efficient scanning
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        // Iterate over all table entries and overwrite invalid ones
        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            // Byte offset of this entry, used for targeted write-backs of
            // cleared slots.
            let offset = Self::table_offset(table_index);

            // Read both entries from the buffer
            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            // Check both entries (each slot may be cleared independently)
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // If the latest entry has reached the resize frequency, increment the resizable entries
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }
447
448    /// Determine the write offset for a table entry based on current entries and epoch.
449    fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
450        // If either entry matches the current epoch, overwrite it
451        if !entry1.is_empty() && entry1.epoch == epoch {
452            return 0;
453        }
454        if !entry2.is_empty() && entry2.epoch == epoch {
455            return Entry::SIZE as u64;
456        }
457
458        // Otherwise, write to the older slot (or empty slot)
459        match (entry1.is_empty(), entry2.is_empty()) {
460            (true, _) => 0,                  // First slot is empty
461            (_, true) => Entry::SIZE as u64, // Second slot is empty
462            (false, false) => {
463                if entry1.epoch < entry2.epoch {
464                    0
465                } else {
466                    Entry::SIZE as u64
467                }
468            }
469        }
470    }
471
472    /// Read the latest valid entry from two table slots.
473    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u32, u8)> {
474        match (
475            !entry1.is_empty() && entry1.is_valid(),
476            !entry2.is_empty() && entry2.is_valid(),
477        ) {
478            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
479                Ordering::Greater => Some((entry1.section, entry1.offset, entry1.added)),
480                Ordering::Less => Some((entry2.section, entry2.offset, entry2.added)),
481                Ordering::Equal => {
482                    unreachable!("two valid entries with the same epoch")
483                }
484            },
485            (true, false) => Some((entry1.section, entry1.offset, entry1.added)),
486            (false, true) => Some((entry2.section, entry2.offset, entry2.added)),
487            (false, false) => None,
488        }
489    }
490
491    /// Write a table entry to the appropriate slot based on epoch.
492    async fn update_head(
493        table: &E::Blob,
494        table_index: u32,
495        entry1: &Entry,
496        entry2: &Entry,
497        update: Entry,
498    ) -> Result<(), Error> {
499        // Calculate the base offset for this table index
500        let table_offset = Self::table_offset(table_index);
501
502        // Determine which slot to write to based on the provided entries
503        let start = Self::compute_write_offset(entry1, entry2, update.epoch);
504
505        // Write the new entry
506        table
507            .write_at(update.encode(), table_offset + start)
508            .await
509            .map_err(Error::Runtime)
510    }
511
512    /// Initialize table with given size and sync.
513    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
514        let table_len = Self::table_offset(table_size);
515        blob.resize(table_len).await?;
516        blob.sync().await?;
517        Ok(())
518    }
519
    /// Initialize a new [Freezer] instance.
    ///
    /// Equivalent to [Self::init_with_checkpoint] without a checkpoint:
    /// state is recovered by scanning the existing table (if any).
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }
524
    /// Initialize a new [Freezer] instance with a [Checkpoint].
    ///
    /// The checkpoint (if provided) pins the last committed epoch, journal
    /// section/size, and table size; table entries written after the
    /// checkpointed epoch are discarded during recovery. Without a
    /// checkpoint, state is rediscovered by scanning the table.
    ///
    /// # Panics
    ///
    /// Panics if `config.table_initial_size` is not a power of two, or if a
    /// provided checkpoint is inconsistent with the on-disk state.
    // TODO(#1227): Hide this complexity from the caller.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Validate that initial_table_size is a power of 2 (required by the
        // mask-based `table_index` computation).
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Initialize variable journal with a separate partition
        let journal_config = JournalConfig {
            partition: config.journal_partition,
            compression: config.journal_compression,
            codec_config: config.codec_config,
            write_buffer: config.journal_write_buffer,
            buffer_pool: config.journal_buffer_pool,
        };
        let mut journal = Journal::init(context.clone(), journal_config).await?;

        // Open table blob
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        // Determine checkpoint based on initialization scenario
        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // New table with no data
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // New table with explicit checkpoint (must be empty)
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing table with checkpoint
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Rewind the journal to the committed section and offset
                journal.rewind(checkpoint.section, checkpoint.size).await?;
                journal.sync(checkpoint.section).await?;

                // Resize table if needed (its on-disk length may not match
                // the checkpointed size)
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Validate and clean invalid entries (anything newer than
                // the checkpointed epoch was never committed)
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing table without checkpoint
            (_, None) => {
                // Find max epoch/section and clean invalid entries in a single pass
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                // Sync table if needed
                if modified {
                    table.sync().await?;
                }

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        size: journal.size(max_section).await?,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Create metrics
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            // Resize once RESIZE_THRESHOLD percent of entries are saturated.
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            journal,
            journal_target_size: config.journal_target_size,
            current_section: checkpoint.section,
            // New table writes are stamped strictly after the recovered epoch.
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
            _phantom: PhantomData,
        })
    }
690
691    /// Compute the table index for a given key.
692    ///
693    /// As the table doubles in size during a resize, each existing entry splits into two:
694    /// one at the original index and another at a new index (original index + previous table size).
695    ///
696    /// For example, with an initial table size of 4 (2^2):
697    /// - Initially: uses 2 bits of the hash, mapping to entries 0, 1, 2, 3.
698    /// - After resizing to 8: uses 3 bits, entry 0 splits into indices 0 and 4.
699    /// - After resizing to 16: uses 4 bits, entry 0 splits into indices 0 and 8, and so on.
700    ///
701    /// To determine the appropriate entry, we AND the key's hash with the current table size.
702    fn table_index(&self, key: &K) -> u32 {
703        let hash = crc32fast::hash(key.as_ref());
704        hash & (self.table_size - 1)
705    }
706
707    /// Determine if the table should be resized.
708    fn should_resize(&self) -> bool {
709        self.resizable as u64 >= self.table_resize_threshold
710    }
711
712    /// Determine which journal section to write to based on current journal size.
713    async fn update_section(&mut self) -> Result<(), Error> {
714        // Get the current section size
715        let size = self.journal.size(self.current_section).await?;
716
717        // If the current section has reached the target size, create a new section
718        if size >= self.journal_target_size {
719            self.current_section += 1;
720            debug!(size, section = self.current_section, "updated section");
721        }
722
723        Ok(())
724    }
725
    /// Put a key-value pair into the [Freezer].
    ///
    /// The record is appended to the journal and becomes the head of the
    /// key's table chain, with the previous head (if any) linked as `next`.
    /// Returns a [Cursor] pointing directly at the appended record.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Update the section if needed
        self.update_section().await?;

        // Get head of the chain from table
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Create new head of the chain (linking back to the old head)
        let entry = Record::new(
            key,
            value,
            head.map(|(section, offset, _)| (section, offset)),
        );

        // Append entry to the variable journal
        let (offset, _) = self.journal.append(self.current_section, entry).await?;

        // Update the number of items added to the entry.
        //
        // We use `saturating_add` to handle overflow (when the table is at max size) gracefully.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // If we've reached the threshold for resizing, increment the resizable entries
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Update the old position
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If we're mid-resize and this entry has already been processed, update the new position too
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // If the previous entry crossed the threshold, so did this one
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // This entry has been processed, so we need to update the new position as well.
                //
                // The entries are still identical to the old ones, so we don't need to read them again.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, offset))
    }
786
787    /// Get the value for a given [Cursor].
788    async fn get_cursor(&self, cursor: Cursor) -> Result<Option<V>, Error> {
789        let entry = self.journal.get(cursor.section(), cursor.offset()).await?;
790        let Some(entry) = entry else {
791            return Ok(None);
792        };
793
794        Ok(Some(entry.value))
795    }
796
797    /// Get the first value for a given key.
798    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
799        self.gets.inc();
800
801        // Get head of the chain from table
802        let table_index = self.table_index(key);
803        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
804        let Some((mut section, mut offset, _)) = Self::read_latest_entry(&entry1, &entry2) else {
805            return Ok(None);
806        };
807
808        // Follow the linked list chain to find the first matching key
809        loop {
810            // Get the entry from the variable journal
811            let entry = match self.journal.get(section, offset).await? {
812                Some(entry) => entry,
813                None => unreachable!("missing entry"),
814            };
815
816            // Check if this key matches
817            if entry.key.as_ref() == key.as_ref() {
818                return Ok(Some(entry.value));
819            }
820
821            // Increment unnecessary reads
822            self.unnecessary_reads.inc();
823
824            // Follow the chain
825            let Some(next) = entry.next else {
826                break; // End of chain
827            };
828            section = next.0;
829            offset = next.1;
830        }
831
832        Ok(None)
833    }
834
    /// Get the value for a given [Identifier].
    ///
    /// If a [Cursor] is known for the required key, it
    /// is much faster to use it than searching for a `key`:
    /// a cursor is a single journal read, while a key lookup
    /// may walk a hash chain.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }
845
846    /// Resize the table by doubling its size and split each entry into two.
847    async fn start_resize(&mut self) -> Result<(), Error> {
848        self.resizes.inc();
849
850        // Double the table size (if not already at the max size)
851        let old_size = self.table_size;
852        let Some(new_size) = old_size.checked_mul(2) else {
853            return Ok(());
854        };
855        self.table.resize(Self::table_offset(new_size)).await?;
856
857        // Start the resize
858        self.resize_progress = Some(0);
859        debug!(old = old_size, new = new_size, "table resize started");
860
861        Ok(())
862    }
863
864    /// Write a pair of entries to a buffer, replacing one slot with the new entry.
865    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
866        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
867            buf.extend_from_slice(&new_entry.encode());
868            buf.extend_from_slice(&entry2.encode());
869        } else {
870            buf.extend_from_slice(&entry1.encode());
871            buf.extend_from_slice(&new_entry.encode());
872        }
873    }
874
    /// Continue a resize operation by processing the next chunk of entries.
    ///
    /// This function processes `table_resize_chunk_size` entries at a time,
    /// allowing the resize to be spread across multiple sync operations to
    /// avoid latency spikes.
    ///
    /// Each processed entry is rewritten (with its head preserved but its
    /// `added` count reset to 0 at the next epoch) to BOTH its original slot
    /// and the mirrored slot in the newly-added upper half of the table.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        // Compute the range to update. `resize_progress` must be `Some` here:
        // callers only invoke this while a resize is in flight.
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the entire chunk in one blob read.
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = vec![0u8; chunk_bytes];
        let read_buf: Vec<u8> = self.table.read_at(read_buf, read_offset).await?.into();

        // Process each entry in the chunk, accumulating the rewritten bytes.
        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            // Locate this entry's bytes within the chunk buffer.
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf[entry_offset..entry_end];

            // Parse the two slots
            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            // Get the current head (falling back to an all-zero head when
            // neither slot yields one).
            let (section, offset, added) =
                Self::read_latest_entry(&entry1, &entry2).unwrap_or((0, 0, 0));

            // If the entry was over the threshold, decrement the resizable
            // entries (its `added` count is about to be reset to 0 below).
            if added >= self.table_resize_frequency {
                self.resizable -= 1;
            }

            // Rewrite the entries, replacing one slot with a fresh entry at
            // `next_epoch` that keeps the head but resets `added` to 0.
            let reset_entry = Entry::new(self.next_epoch, section, offset, 0);
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Put the writes into the table: once at the original offset and once
        // at the mirrored offset in the doubled half, concurrently.
        let old_write = self.table.write_at(writes.clone(), read_offset);
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(writes, new_offset);
        try_join(old_write, new_write).await?;

        // Update progress
        if chunk_end >= old_size {
            // Resize complete: double the recorded size, recompute the resize
            // threshold from RESIZE_THRESHOLD, and clear the progress marker.
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            // More chunks to process on the next call.
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
943
944    /// Sync all pending data in [Freezer].
945    ///
946    /// If the table needs to be resized, the resize will begin during this sync.
947    /// The resize operation is performed incrementally across multiple sync calls
948    /// to avoid a large latency spike (or unexpected long latency for [Freezer::put]).
949    /// Each sync will process up to `table_resize_chunk_size` entries until the resize
950    /// is complete.
951    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
952        // Sync all modified journal sections
953        let mut updates = Vec::with_capacity(self.modified_sections.len());
954        for section in &self.modified_sections {
955            updates.push(self.journal.sync(*section));
956        }
957        try_join_all(updates).await?;
958        self.modified_sections.clear();
959
960        // Start a resize (if needed)
961        if self.should_resize() && self.resize_progress.is_none() {
962            self.start_resize().await?;
963        }
964
965        // Continue a resize (if ongoing)
966        if self.resize_progress.is_some() {
967            self.advance_resize().await?;
968        }
969
970        // Sync updated table entries
971        self.table.sync().await?;
972        let stored_epoch = self.next_epoch;
973        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");
974
975        Ok(Checkpoint {
976            epoch: stored_epoch,
977            section: self.current_section,
978            size: self.journal.size(self.current_section).await?,
979            table_size: self.table_size,
980        })
981    }
982
983    /// Close the [Freezer] and return a [Checkpoint] for recovery.
984    pub async fn close(mut self) -> Result<Checkpoint, Error> {
985        // If we're mid-resize, complete it
986        while self.resize_progress.is_some() {
987            self.advance_resize().await?;
988        }
989
990        // Sync any pending updates before closing
991        let checkpoint = self.sync().await?;
992
993        self.journal.close().await?;
994        self.table.sync().await?;
995        Ok(checkpoint)
996    }
997
    /// Close and remove any underlying blobs created by the [Freezer].
    pub async fn destroy(self) -> Result<(), Error> {
        // Destroy the journal (removes all journal sections)
        self.journal.destroy().await?;

        // Destroy the table: release our handle to the blob first, then remove
        // the blob itself and finally the table partition.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }
1012
    /// Get the current progress of the resize operation.
    ///
    /// Returns `None` if the [Freezer] is not resizing; otherwise the index of
    /// the next table entry to be migrated by `advance_resize`.
    #[cfg(test)]
    pub fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }
1020
    /// Get the number of resizable entries.
    ///
    /// Entries whose `added` count has reached `table_resize_frequency` are
    /// counted here; `advance_resize` decrements this count as entries are
    /// reset during a resize.
    #[cfg(test)]
    pub fn resizable(&self) -> u32 {
        self.resizable
    }
1026}