1use super::{Config, Error, Identifier};
2use crate::{
3 journal::segmented::oversized::{
4 Config as OversizedConfig, Oversized, Record as OversizedRecord,
5 },
6 kv,
7};
8use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
9use commonware_cryptography::{crc32, Crc32, Hasher};
10use commonware_runtime::{buffer, Blob, Buf, BufMut, BufferPooler, Clock, IoBuf, Metrics, Storage};
11use commonware_utils::{Array, Span};
12use futures::future::{try_join, try_join_all};
13use prometheus_client::metrics::counter::Counter;
14use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
15use tracing::debug;
16
/// Percentage of table slots that must be "resizable" (i.e. have accumulated at
/// least `table_resize_frequency` additions) before a table resize is started.
const RESIZE_THRESHOLD: u64 = 50;

/// Opaque location of a stored value: big-endian `(section, offset, size)`
/// packed into a fixed-size byte array so it can serve as an `Array` key.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);
29
30impl Cursor {
31 fn new(section: u64, offset: u64, size: u32) -> Self {
33 let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
34 buf[..u64::SIZE].copy_from_slice(§ion.to_be_bytes());
35 buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
36 buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
37 Self(buf)
38 }
39
40 fn section(&self) -> u64 {
42 u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
43 }
44
45 fn offset(&self) -> u64 {
47 u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
48 }
49
50 fn size(&self) -> u32 {
52 u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
53 }
54}
55
56impl Read for Cursor {
57 type Cfg = ();
58
59 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
60 <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
61 }
62}
63
impl CodecWrite for Cursor {
    /// Writes the packed byte array verbatim.
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}
69
impl FixedSize for Cursor {
    // section (u64) + offset (u64) + size (u32).
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}
73
// Marker trait impls so `Cursor` can be used wherever fixed-size byte-array
// keys are expected.
impl Span for Cursor {}

impl Array for Cursor {}
77
impl Deref for Cursor {
    type Target = [u8];
    /// Exposes the packed bytes as a slice.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
84
impl AsRef<[u8]> for Cursor {
    /// Borrows the packed bytes.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
90
impl std::fmt::Debug for Cursor {
    // Decodes the packed fields for readable diagnostics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}
102
impl std::fmt::Display for Cursor {
    // Same rendering as `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}
114
/// Durable state returned by `Freezer::sync`/`close`, sufficient to restore a
/// consistent view via `Freezer::init_with_checkpoint`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    /// Epoch recorded by the sync that produced this checkpoint.
    epoch: u64,
    /// Section that was current when the checkpoint was taken.
    section: u64,
    /// Size of the oversized storage for `section` at checkpoint time.
    oversized_size: u64,
    /// Number of table slots at checkpoint time.
    table_size: u32,
}
131
impl Checkpoint {
    /// Checkpoint describing a freshly initialized (empty) freezer.
    const fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            oversized_size: 0,
        }
    }
}
143
144impl Read for Checkpoint {
145 type Cfg = ();
146 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
147 let epoch = u64::read(buf)?;
148 let section = u64::read(buf)?;
149 let oversized_size = u64::read(buf)?;
150 let table_size = u32::read(buf)?;
151 Ok(Self {
152 epoch,
153 section,
154 oversized_size,
155 table_size,
156 })
157 }
158}
159
impl CodecWrite for Checkpoint {
    // Fields are written in the order `read_cfg` consumes them.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.oversized_size.write(buf);
        self.table_size.write(buf);
    }
}
168
impl FixedSize for Checkpoint {
    // epoch + section + oversized_size (u64 each) + table_size (u32).
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}
172
/// Name of the blob that stores the hash table within the table partition.
const TABLE_BLOB_NAME: &[u8] = b"table";
175
/// One slot of the on-disk table: the head of a record chain plus a CRC.
/// Each table index stores two entries back-to-back; `compute_write_offset`
/// decides which of the two an update overwrites.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    /// Epoch in which this entry was written.
    epoch: u64,
    /// Section containing the chain-head record.
    section: u64,
    /// Position of the chain-head record within its section.
    position: u64,
    /// Saturating count of records added since the last resize reset.
    added: u8,
    /// CRC32 over the other fields (big-endian encoding).
    crc: u32,
}
191
192impl Entry {
193 const FULL_SIZE: usize = Self::SIZE * 2;
195
196 fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
198 let mut hasher = Crc32::new();
199 hasher.update(&epoch.to_be_bytes());
200 hasher.update(§ion.to_be_bytes());
201 hasher.update(&position.to_be_bytes());
202 hasher.update(&added.to_be_bytes());
203 hasher.finalize().as_u32()
204 }
205
206 fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
208 Self {
209 epoch,
210 section,
211 position,
212 added,
213 crc: Self::compute_crc(epoch, section, position, added),
214 }
215 }
216
217 const fn new_empty() -> Self {
219 Self {
220 epoch: 0,
221 section: 0,
222 position: 0,
223 added: 0,
224 crc: 0,
225 }
226 }
227
228 const fn is_empty(&self) -> bool {
230 self.epoch == 0
231 && self.section == 0
232 && self.position == 0
233 && self.added == 0
234 && self.crc == 0
235 }
236
237 fn is_valid(&self) -> bool {
241 Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
242 }
243}
244
impl FixedSize for Entry {
    // epoch + section + position (u64 each) + added (u8) + crc digest.
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}
248
impl CodecWrite for Entry {
    // Fields are written in the order `read_cfg` consumes them.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.position.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}
258
259impl Read for Entry {
260 type Cfg = ();
261 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
262 let epoch = u64::read(buf)?;
263 let section = u64::read(buf)?;
264 let position = u64::read(buf)?;
265 let added = u8::read(buf)?;
266 let crc = u32::read(buf)?;
267
268 Ok(Self {
269 epoch,
270 section,
271 position,
272 added,
273 crc,
274 })
275 }
276}
277
/// Sentinel `next_section` value marking the end of a record chain.
const NO_NEXT_SECTION: u64 = u64::MAX;
/// Sentinel `next_position` value marking the end of a record chain.
const NO_NEXT_POSITION: u64 = u64::MAX;
281
/// A key record stored in oversized index storage: one link of a per-slot
/// hash chain, pointing at its value and (optionally) the next record.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The key this record was written for.
    key: K,
    /// Section of the next record in the chain (or `NO_NEXT_SECTION`).
    next_section: u64,
    /// Position of the next record in the chain (or `NO_NEXT_POSITION`).
    next_position: u64,
    /// Offset of the value in value storage.
    value_offset: u64,
    /// Encoded size of the value.
    value_size: u32,
}
302
303impl<K: Array> Record<K> {
304 fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
306 let (next_section, next_position) = next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
307 Self {
308 key,
309 next_section,
310 next_position,
311 value_offset,
312 value_size,
313 }
314 }
315
316 const fn next(&self) -> Option<(u64, u64)> {
318 if self.next_section == NO_NEXT_SECTION && self.next_position == NO_NEXT_POSITION {
319 None
320 } else {
321 Some((self.next_section, self.next_position))
322 }
323 }
324}
325
impl<K: Array> CodecWrite for Record<K> {
    // Fields are written in the order `read_cfg` consumes them.
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.next_section.write(buf);
        self.next_position.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}
335
336impl<K: Array> Read for Record<K> {
337 type Cfg = ();
338 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
339 let key = K::read(buf)?;
340 let next_section = u64::read(buf)?;
341 let next_position = u64::read(buf)?;
342 let value_offset = u64::read(buf)?;
343 let value_size = u32::read(buf)?;
344
345 Ok(Self {
346 key,
347 next_section,
348 next_position,
349 value_offset,
350 value_size,
351 })
352 }
353}
354
impl<K: Array> FixedSize for Record<K> {
    // key + next_section + next_position + value_offset (u64 each) + value_size (u32).
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}
359
impl<K: Array> OversizedRecord for Record<K> {
    /// Location `(offset, size)` of this record's value in value storage.
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    /// Returns the record with its value location replaced.
    fn with_location(mut self, offset: u64, size: u32) -> Self {
        self.value_offset = offset;
        self.value_size = size;
        self
    }
}
371
// Manual `Arbitrary` impl: a derive would require `K: Arbitrary` on the struct
// definition itself; this keeps the bound local to the fuzzing feature.
#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            next_section: u64::arbitrary(u)?,
            next_position: u64::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}
387
/// An on-disk key/value store: a fixed-layout hash table maps a key hash to
/// the head of a per-slot record chain held in `Oversized` section storage.
/// The table doubles in size incrementally once enough slots are "resizable".
pub struct Freezer<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> {
    /// Runtime context providing storage, metrics, clock, and buffer pooling.
    context: E,

    /// Partition holding the table blob.
    table_partition: String,
    /// Current number of table slots (a power of 2; see `init` assertions).
    table_size: u32,
    /// Resizable-slot count at which a resize is triggered.
    table_resize_threshold: u64,
    /// Additions to a slot's chain before it counts toward `resizable`.
    table_resize_frequency: u8,
    /// Slots migrated per `advance_resize` step.
    table_resize_chunk_size: u32,

    /// Blob holding two `Entry` slots per table index.
    table: E::Blob,

    /// Section-based storage for key records and values.
    oversized: Oversized<E, Record<K>, V>,

    /// Once a section's value blob reaches this size, writes roll to the next.
    blob_target_size: u64,

    /// Section new records are appended to.
    current_section: u64,
    /// Epoch assigned to table writes until the next `sync`.
    next_epoch: u64,

    /// Sections with unsynced appends.
    modified_sections: BTreeSet<u64>,
    /// Count of slots whose chains grew by at least `table_resize_frequency`.
    resizable: u32,
    /// `Some(next_index)` while an incremental resize is in flight.
    resize_progress: Option<u32>,

    // Metrics.
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}
425
426impl<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> Freezer<E, K, V> {
    /// Byte offset of a table index's entry pair within the table blob.
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }
432
433 fn parse_entries(mut buf: impl Buf) -> Result<(Entry, Entry), Error> {
435 let entry1 = Entry::read(&mut buf)?;
436 let entry2 = Entry::read(&mut buf)?;
437 Ok((entry1, entry2))
438 }
439
    /// Reads and decodes both entry slots for `table_index` from the blob.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let read_buf = blob.read_at(offset, Entry::FULL_SIZE).await?;

        Self::parse_entries(read_buf)
    }
447
448 async fn recover_entry(
450 blob: &E::Blob,
451 entry: &mut Entry,
452 entry_offset: u64,
453 max_valid_epoch: Option<u64>,
454 max_epoch: &mut u64,
455 max_section: &mut u64,
456 ) -> Result<bool, Error> {
457 if entry.is_empty() {
458 return Ok(false);
459 }
460
461 if !entry.is_valid()
462 || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
463 {
464 debug!(
465 valid_epoch = max_valid_epoch,
466 entry_epoch = entry.epoch,
467 "found invalid table entry"
468 );
469 *entry = Entry::new_empty();
470 let zero_buf = vec![0u8; Entry::SIZE];
471 blob.write_at(entry_offset, zero_buf).await?;
472 Ok(true)
473 } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
474 *max_epoch = entry.epoch;
476 *max_section = entry.section;
477 Ok(false)
478 } else {
479 Ok(false)
480 }
481 }
482
    /// Scans the whole table, repairing invalid entries via `recover_entry`.
    ///
    /// Returns `(modified, max_epoch, max_section, resizable)`:
    /// - `modified`: whether any slot was zeroed on disk,
    /// - `max_epoch`/`max_section`: largest valid epoch/section seen (only
    ///   populated when `max_valid_epoch` is `None`),
    /// - `resizable`: count of slots whose head has `added >=
    ///   table_resize_frequency`.
    async fn recover_table(
        pooler: &impl BufferPooler,
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        let blob_size = Self::table_offset(table_size);
        // Sequential buffered read over the entire table blob.
        let mut reader =
            buffer::Read::from_pooler(pooler, blob.clone(), blob_size, table_replay_buffer);

        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            let entry_buf = reader.read(Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(entry_buf)?;

            // Repair each slot independently; repairs write through `blob`,
            // not the buffered reader.
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // Count slots already eligible to contribute toward a resize.
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }
546
547 const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
549 if !entry1.is_empty() && entry1.epoch == epoch {
551 return 0;
552 }
553 if !entry2.is_empty() && entry2.epoch == epoch {
554 return Entry::SIZE as u64;
555 }
556
557 match (entry1.is_empty(), entry2.is_empty()) {
559 (true, _) => 0, (_, true) => Entry::SIZE as u64, (false, false) => {
562 if entry1.epoch < entry2.epoch {
563 0
564 } else {
565 Entry::SIZE as u64
566 }
567 }
568 }
569 }
570
    /// Returns `(section, position, added)` from the valid slot with the
    /// newest epoch, or `None` when neither slot holds a valid entry.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
                Ordering::Equal => {
                    // `compute_write_offset` re-uses the same slot within an
                    // epoch, so two valid slots can never share one.
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
            (false, false) => None,
        }
    }
589
    /// Writes `update` into whichever slot of `table_index`
    /// `compute_write_offset` selects, leaving the other slot untouched.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        let table_offset = Self::table_offset(table_index);

        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        table
            .write_at(table_offset + start, update.encode_mut())
            .await
            .map_err(Error::Runtime)
    }
610
    /// Sizes a fresh table blob to hold `table_size` zeroed slots and syncs it.
    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }
618
    /// Initializes a freezer with no checkpoint (recovery scans the table).
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }
623
    /// Initializes a freezer, optionally restoring to a prior [`Checkpoint`].
    ///
    /// Four cases based on (existing table length, checkpoint):
    /// - empty + no checkpoint: create a fresh table;
    /// - empty + checkpoint: checkpoint must be the zero checkpoint; create
    ///   a fresh table;
    /// - existing + checkpoint: rewind oversized storage to the checkpoint,
    ///   then clear any table entries newer than the checkpoint epoch;
    /// - existing + no checkpoint: recover the latest consistent state from
    ///   the table itself.
    ///
    /// # Panics
    /// If `table_initial_size` (or a checkpoint's `table_size`) is not a
    /// nonzero power of 2, or on epoch overflow.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Power-of-2 size is required by the mask in `table_index`.
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_page_cache: config.key_page_cache.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // Fresh store, no prior state.
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Fresh store restored "to" a checkpoint: only the zero
            // checkpoint is coherent with an empty table.
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing store restored to a checkpoint.
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Drop anything appended after the checkpoint.
                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;

                oversized.sync(checkpoint.section).await?;

                // Undo a partially-applied resize by restoring the table's
                // expected length.
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Clear entries written after the checkpoint epoch.
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &context,
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing store, no checkpoint: recover latest state from disk.
            (_, None) => {
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &context,
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                if modified {
                    table.sync().await?;
                }

                let oversized_size = oversized.size(max_section).await?;

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Register metrics against the runtime context.
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            // New writes happen in the epoch after the checkpoint's.
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }
800
    /// Maps a key to its table slot: CRC32 of the key masked by
    /// `table_size - 1` (valid because `table_size` is a power of 2).
    fn table_index(&self, key: &K) -> u32 {
        let hash = Crc32::checksum(key.as_ref());
        hash & (self.table_size - 1)
    }
816
    /// Whether enough slots have grown to warrant doubling the table.
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }
821
    /// Rolls `current_section` forward once its value blob reaches the
    /// configured target size.
    async fn update_section(&mut self) -> Result<(), Error> {
        let value_size = self.oversized.value_size(self.current_section).await?;

        if value_size >= self.blob_target_size {
            self.current_section += 1;
            debug!(
                size = value_size,
                section = self.current_section,
                "updated section"
            );
        }

        Ok(())
    }
839
    /// Appends `value` under `key`, prepending a new record to the slot's
    /// chain, and returns a [`Cursor`] for direct retrieval.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Roll to a new section first if the current one is full.
        self.update_section().await?;

        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // New record links to the previous chain head (if any); its value
        // location is filled in by the oversized store on append.
        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );

        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;

        // Track chain growth; saturating so long chains don't wrap.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // Count the slot as resizable exactly once, when it first crosses
        // the frequency threshold.
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If a resize is in flight and this slot was already migrated, mirror
        // the update into the corresponding slot of the new half so the
        // migrated copy doesn't go stale. The second `resizable` bump covers
        // the mirrored slot, which becomes independent once the resize
        // completes.
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }
905
906 async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
908 let value = self
909 .oversized
910 .get_value(cursor.section(), cursor.offset(), cursor.size())
911 .await?;
912
913 Ok(value)
914 }
915
    /// Looks `key` up by walking its slot's record chain from the latest head.
    /// Returns the most recently written value for the key, or `None`.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        // Follow chain links until the key matches or the chain ends.
        loop {
            let key_entry = self.oversized.get(section, position).await?;

            if key_entry.key.as_ref() == key.as_ref() {
                let value = self
                    .oversized
                    .get_value(section, key_entry.value_offset, key_entry.value_size)
                    .await?;
                return Ok(Some(value));
            }

            // A non-matching record means a hash-collision read.
            self.unnecessary_reads.inc();

            let Some(next) = key_entry.next() else {
                break;
            };
            section = next.0;
            position = next.1;
        }

        Ok(None)
    }
954
    /// Retrieves a value either directly by [`Cursor`] or by key lookup.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }
965
    /// Begins an incremental doubling of the table: grows the blob and marks
    /// migration progress at slot 0. `advance_resize` does the copying.
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        let old_size = self.table_size;
        // At u32::MAX slots doubling would overflow; silently skip resizing.
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }
983
    /// Serializes an entry pair with `new_entry` placed in the slot
    /// `compute_write_offset` selects, preserving the other original slot.
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }
994
    /// Migrates one chunk of slots for an in-flight resize: each migrated
    /// slot's head is rewritten with `added` reset to 0 and mirrored into the
    /// corresponding slot of the new (upper) half of the table.
    ///
    /// # Panics
    /// If called while no resize is in progress (`resize_progress` is `None`).
    async fn advance_resize(&mut self) -> Result<(), Error> {
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the whole chunk of entry pairs in one request.
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let mut read_buf = self.table.read_at(read_offset, chunk_bytes).await?;

        let mut writes = Vec::with_capacity(chunk_bytes);
        for _ in 0..chunk_size {
            let (entry1, entry2) = Self::parse_entries(&mut read_buf)?;

            let head = Self::read_latest_entry(&entry1, &entry2);

            let reset_entry = match head {
                Some((section, position, added)) => {
                    // The reset (added = 0) makes the slot no longer count
                    // toward the next resize.
                    if added >= self.table_resize_frequency {
                        self.resizable -= 1;
                    }
                    Entry::new(self.next_epoch, section, position, 0)
                }
                None => Entry::new_empty(),
            };

            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Write the rewritten chunk to both the old slots and their mirrors
        // in the new half, concurrently.
        let writes = IoBuf::from(writes);
        let old_write = self.table.write_at(read_offset, writes.clone());
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(new_offset, writes);
        try_join(old_write, new_write).await?;

        if chunk_end >= old_size {
            // Migration complete: adopt the doubled size and new threshold.
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
1062
    /// Durably persists all pending appends, advances any in-flight resize by
    /// one chunk (starting one if warranted), bumps the epoch, and returns a
    /// [`Checkpoint`] describing the synced state.
    ///
    /// # Panics
    /// On epoch overflow.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync all modified oversized sections concurrently.
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();

        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        // Resizing is amortized: one chunk per sync.
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        self.table.sync().await?;
        // The epoch just synced becomes the checkpoint epoch; later writes
        // use the next one so recovery can discard them.
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        let oversized_size = self.oversized.size(self.current_section).await?;

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }
1107
    /// Drives any in-flight resize to completion, then syncs and returns the
    /// final [`Checkpoint`].
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        let checkpoint = self.sync().await?;

        Ok(checkpoint)
    }
1120
    /// Permanently removes all on-disk state (oversized storage, the table
    /// blob, and the table partition itself).
    pub async fn destroy(self) -> Result<(), Error> {
        self.oversized.destroy().await?;

        // Release the blob handle before removing its backing storage.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }
1135
    /// Test-only: current resize progress (`None` when no resize in flight).
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    /// Test-only: number of slots currently counted toward a resize.
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
1149}
1150
impl<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Gettable
    for Freezer<E, K, V>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    /// Delegates to `Freezer::get` with a key identifier.
    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(Identifier::Key(key)).await
    }
}
1162
impl<E: BufferPooler + Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Updatable
    for Freezer<E, K, V>
{
    /// Delegates to `Freezer::put`, discarding the returned cursor.
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await?;
        Ok(())
    }
}
1171
// Codec round-trip conformance tests for all serialized types in this module.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}
1185
1186#[cfg(test)]
1187mod tests {
1188 use super::*;
1189 use crate::kv::tests::{assert_gettable, assert_send, assert_updatable, test_key};
1190 use commonware_macros::test_traced;
1191 use commonware_runtime::{
1192 buffer::paged::CacheRef, deterministic, deterministic::Context, Runner, Storage,
1193 };
1194 use commonware_utils::{
1195 sequence::{FixedBytes, U64},
1196 NZUsize, NZU16,
1197 };
1198
    type TestFreezer = Freezer<Context, U64, u64>;

    // Compile-time check that freezer futures are `Send`.
    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        assert_gettable(freezer, &key);
        assert_updatable(freezer, key, 0u64);
    }

    // Compile-time check that `destroy`'s future is `Send`.
    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        assert_send(freezer.destroy());
    }
1211
    // Regression: after close (which completes a pending resize), untouched
    // slots in both halves of the doubled table must remain empty.
    #[test_traced]
    fn issue_2966_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test-key-index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
                value_partition: "test-value-journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test-table".into(),
                table_initial_size: 4,
                // frequency 1 makes every touched slot resizable immediately.
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
                    .await
                    .unwrap();

            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key2"), 1).await.unwrap();
            freezer.close().await.unwrap();

            // Inspect the raw table blob after close.
            let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
            let table_data = blob.read_at(0, size as usize).await.unwrap().coalesce();

            // Table should have doubled from 4 to 8 slots.
            let num_entries = size as usize / Entry::FULL_SIZE;
            assert_eq!(num_entries, 8);

            let mut both_empty_count = 0;
            for entry_idx in 0..num_entries {
                let offset = entry_idx * Entry::FULL_SIZE;
                let buf = &table_data.as_ref()[offset..offset + Entry::FULL_SIZE];
                let (slot0, slot1) =
                    Freezer::<Context, FixedBytes<64>, i32>::parse_entries(buf).unwrap();
                if slot0.is_empty() && slot1.is_empty() {
                    both_empty_count += 1;
                }
            }
            // Two keys occupy two slots in each half; the rest stay empty.
            assert_eq!(both_empty_count, 4);
        });
    }
1266
    // Regression: restoring from a checkpoint must tolerate both slots of a
    // table index having corrupted CRCs (recovery zeroes them instead of
    // failing).
    #[test_traced]
    fn issue_2955_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test-key-index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
                value_partition: "test-value-journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test-table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };

            // Write one key and capture a checkpoint.
            let checkpoint = {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
                    context.with_label("first"),
                    cfg.clone(),
                )
                .await
                .unwrap();
                freezer.put(test_key("key0"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Corrupt the CRC bytes of both slots of the first table index.
            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                let entry_data = blob.read_at(0, Entry::FULL_SIZE).await.unwrap();
                let mut corrupted = entry_data.coalesce();
                corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF;
                corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF;
                blob.write_at(0, corrupted).await.unwrap();
                blob.sync().await.unwrap();
            }

            // Restore must succeed despite the corruption.
            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            drop(freezer);
        });
    }
1327}