use super::{Config, Error, Identifier};
use crate::{
    journal::segmented::oversized::{
        Config as OversizedConfig, Oversized, Record as OversizedRecord,
    },
    kv, Persistable,
};
use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
use commonware_cryptography::{crc32, Crc32, Hasher};
use commonware_runtime::{buffer, Blob, Buf, BufMut, Clock, IoBufMut, Metrics, Storage};
use commonware_utils::{Array, Span};
use futures::future::{try_join, try_join_all};
use prometheus_client::metrics::counter::Counter;
use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
use tracing::debug;

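/// Percentage of table indices whose chains must reach `table_resize_frequency`
/// appends before a table resize is triggered.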
const RESIZE_THRESHOLD: u64 = 50;

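/// Location of a value in the freezer: section, byte offset, and size packed as
/// big-endian bytes, so cursors sort by their position in storage.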
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);

impl Cursor {
    fn new(section: u64, offset: u64, size: u32) -> Self {
        let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
        buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
        buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
        Self(buf)
    }

    fn section(&self) -> u64 {
        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
    }

    fn offset(&self) -> u64 {
        u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
    }

    fn size(&self) -> u32 {
        u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
    }
}

impl Read for Cursor {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
    }
}

impl CodecWrite for Cursor {
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}

impl FixedSize for Cursor {
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}

impl Span for Cursor {}

impl Array for Cursor {}

impl Deref for Cursor {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<[u8]> for Cursor {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl std::fmt::Debug for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

impl std::fmt::Display for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

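/// Durable state of a [Freezer] at a sync point: the epoch written by that sync,
/// the active section, the size of the oversized journal in that section, and
/// the table size. Restoring from a checkpoint discards anything written after
/// the corresponding sync.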
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    epoch: u64,
    section: u64,
    oversized_size: u64,
    table_size: u32,
}

impl Checkpoint {
    const fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            oversized_size: 0,
        }
    }
}

impl Read for Checkpoint {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let oversized_size = u64::read(buf)?;
        let table_size = u32::read(buf)?;
        Ok(Self {
            epoch,
            section,
            oversized_size,
            table_size,
        })
    }
}

impl CodecWrite for Checkpoint {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.oversized_size.write(buf);
        self.table_size.write(buf);
    }
}

impl FixedSize for Checkpoint {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

const TABLE_BLOB_NAME: &[u8] = b"table";

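/// One slot of a table index. Each index stores two consecutive slots
/// ([Entry::FULL_SIZE] bytes) that are written alternately, so the most recent
/// durable slot always survives a torn write. The CRC covers all other fields
/// and is verified during recovery.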
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    epoch: u64,
    section: u64,
    position: u64,
    added: u8,
    crc: u32,
}

impl Entry {
    const FULL_SIZE: usize = Self::SIZE * 2;

    fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
        let mut hasher = Crc32::new();
        hasher.update(&epoch.to_be_bytes());
        hasher.update(&section.to_be_bytes());
        hasher.update(&position.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize().as_u32()
    }

    fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
        Self {
            epoch,
            section,
            position,
            added,
            crc: Self::compute_crc(epoch, section, position, added),
        }
    }

    const fn new_empty() -> Self {
        Self {
            epoch: 0,
            section: 0,
            position: 0,
            added: 0,
            crc: 0,
        }
    }

    const fn is_empty(&self) -> bool {
        self.epoch == 0
            && self.section == 0
            && self.position == 0
            && self.added == 0
            && self.crc == 0
    }

    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
    }
}

impl FixedSize for Entry {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}

impl CodecWrite for Entry {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.position.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}

impl Read for Entry {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let position = u64::read(buf)?;
        let added = u8::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self {
            epoch,
            section,
            position,
            added,
            crc,
        })
    }
}

const NO_NEXT_SECTION: u64 = u64::MAX;
const NO_NEXT_POSITION: u64 = u64::MAX;

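/// A key record in the oversized journal. Records form a singly linked list per
/// table index, newest first: `next_section`/`next_position` point at the
/// previous chain head, or hold the `u64::MAX` sentinels at the end of a chain.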
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    key: K,
    next_section: u64,
    next_position: u64,
    value_offset: u64,
    value_size: u32,
}

impl<K: Array> Record<K> {
    fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
        let (next_section, next_position) = next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
        Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        }
    }

    const fn next(&self) -> Option<(u64, u64)> {
        if self.next_section == NO_NEXT_SECTION && self.next_position == NO_NEXT_POSITION {
            None
        } else {
            Some((self.next_section, self.next_position))
        }
    }
}

impl<K: Array> CodecWrite for Record<K> {
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.next_section.write(buf);
        self.next_position.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}

impl<K: Array> Read for Record<K> {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let key = K::read(buf)?;
        let next_section = u64::read(buf)?;
        let next_position = u64::read(buf)?;
        let value_offset = u64::read(buf)?;
        let value_size = u32::read(buf)?;

        Ok(Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        })
    }
}

impl<K: Array> FixedSize for Record<K> {
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

impl<K: Array> OversizedRecord for Record<K> {
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    fn with_location(mut self, offset: u64, size: u32) -> Self {
        self.value_offset = offset;
        self.value_size = size;
        self
    }
}

#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            next_section: u64::arbitrary(u)?,
            next_position: u64::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}

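/// An append-only key-value store built from a power-of-two hash table of chain
/// heads and an oversized journal of records. Lookups walk a per-index chain
/// newest-first; once `RESIZE_THRESHOLD` percent of indices accumulate
/// `table_resize_frequency` appends, the table doubles incrementally, copying
/// `table_resize_chunk_size` indices per [Freezer::sync].
///
/// A minimal usage sketch (assuming a runtime `context` and a populated
/// [Config]; the key and value types are illustrative):
///
/// ```ignore
/// let mut freezer = Freezer::<_, U64, u64>::init(context, config).await?;
/// let cursor = freezer.put(key, value).await?; // append and get a direct handle
/// let by_key = freezer.get(Identifier::Key(&key)).await?; // walks the chain
/// let direct = freezer.get(Identifier::Cursor(cursor)).await?; // single read
/// let checkpoint = freezer.close().await?; // finish any resize and checkpoint
/// ```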
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
    context: E,

    table_partition: String,
    table_size: u32,
    table_resize_threshold: u64,
    table_resize_frequency: u8,
    table_resize_chunk_size: u32,

    table: E::Blob,

    oversized: Oversized<E, Record<K>, V>,

    blob_target_size: u64,

    current_section: u64,
    next_epoch: u64,

    modified_sections: BTreeSet<u64>,
    resizable: u32,
    resize_progress: Option<u32>,

    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Freezer<E, K, V> {
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }

    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
        let mut buf1 = &buf[0..Entry::SIZE];
        let entry1 = Entry::read(&mut buf1)?;
        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
        let entry2 = Entry::read(&mut buf2)?;
        Ok((entry1, entry2))
    }

    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let read_buf = blob
            .read_at(offset, IoBufMut::zeroed(Entry::FULL_SIZE))
            .await?;

        Self::parse_entries(read_buf.coalesce().as_ref())
    }

    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        if entry.is_empty() {
            return Ok(false);
        }

        if !entry.is_valid()
            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
        {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            *entry = Entry::new_empty();
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(entry_offset, zero_buf).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }

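    /// Replays every table index, clearing slots that fail their CRC check or
    /// carry an epoch newer than `max_valid_epoch`. Returns whether any slot was
    /// cleared, the highest surviving epoch and its section, and the number of
    /// indices already eligible for resize.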
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }

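    /// Picks which of an index's two slots to overwrite: a slot already written
    /// this epoch is reused, then an empty slot is preferred, and otherwise the
    /// slot with the older epoch is replaced, so the newest durable head is
    /// never clobbered.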
    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
        if !entry1.is_empty() && entry1.epoch == epoch {
            return 0;
        }
        if !entry2.is_empty() && entry2.epoch == epoch {
            return Entry::SIZE as u64;
        }

        match (entry1.is_empty(), entry2.is_empty()) {
            (true, _) => 0,
            (_, true) => Entry::SIZE as u64,
            (false, false) => {
                if entry1.epoch < entry2.epoch {
                    0
                } else {
                    Entry::SIZE as u64
                }
            }
        }
    }

    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
                Ordering::Equal => {
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
            (false, false) => None,
        }
    }

    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        let table_offset = Self::table_offset(table_index);

        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        table
            .write_at(table_offset + start, update.encode_mut())
            .await
            .map_err(Error::Runtime)
    }

    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }

    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }

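    /// Initializes the freezer, optionally restoring from a prior [Checkpoint].
    /// With a checkpoint, the oversized journal is rewound to the checkpointed
    /// size and table entries written after the checkpointed epoch are cleared;
    /// without one, the latest consistent state is recovered by replaying the
    /// entire table.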
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_page_cache: config.key_page_cache.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        let (checkpoint, resizable) = match (table_len, checkpoint) {
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;

                oversized.sync(checkpoint.section).await?;

                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            (_, None) => {
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                if modified {
                    table.sync().await?;
                }

                let oversized_size = oversized.size(max_section).await?;

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }

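    /// Maps a key to a table index. The table size is always a power of two, so
    /// masking the key's CRC32 with `table_size - 1` is equivalent to reducing
    /// the hash modulo the table size.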
    fn table_index(&self, key: &K) -> u32 {
        let hash = Crc32::checksum(key.as_ref());
        hash & (self.table_size - 1)
    }

    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }

    async fn update_section(&mut self) -> Result<(), Error> {
        let value_size = self.oversized.value_size(self.current_section).await?;

        if value_size >= self.blob_target_size {
            self.current_section += 1;
            debug!(
                size = value_size,
                section = self.current_section,
                "updated section"
            );
        }

        Ok(())
    }

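    /// Appends `value` under `key`, linking the new record in front of the
    /// current chain head, and returns a [Cursor] for direct reads. While a
    /// resize is in progress, writes to indices that have already been copied
    /// are mirrored to the corresponding slot in the new half of the table.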
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        self.update_section().await?;

        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );

        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;

        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }

    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
        let value = self
            .oversized
            .get_value(cursor.section(), cursor.offset(), cursor.size())
            .await?;

        Ok(value)
    }

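    /// Walks the hash chain for `key`, newest record first, and returns the
    /// first matching value. Every non-matching record read along the way is
    /// counted as an unnecessary read.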
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        loop {
            let key_entry = self.oversized.get(section, position).await?;

            if key_entry.key.as_ref() == key.as_ref() {
                let value = self
                    .oversized
                    .get_value(section, key_entry.value_offset, key_entry.value_size)
                    .await?;
                return Ok(Some(value));
            }

            self.unnecessary_reads.inc();

            let Some(next) = key_entry.next() else {
                break;
            };
            section = next.0;
            position = next.1;
        }

        Ok(None)
    }

    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }

    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }

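    /// Copies the next chunk of table entries into both halves of the enlarged
    /// table, resetting each head's `added` counter so copied indices start a
    /// fresh resize cycle. Once the final chunk is written, doubles `table_size`
    /// and recomputes the resize threshold.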
    async fn advance_resize(&mut self) -> Result<(), Error> {
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = self
            .table
            .read_at(read_offset, IoBufMut::zeroed(chunk_bytes))
            .await?
            .coalesce();

        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf.as_ref()[entry_offset..entry_end];

            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            let head = Self::read_latest_entry(&entry1, &entry2);

            let reset_entry = match head {
                Some((section, position, added)) => {
                    if added >= self.table_resize_frequency {
                        self.resizable -= 1;
                    }
                    Entry::new(self.next_epoch, section, position, 0)
                }
                None => Entry::new_empty(),
            };

            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        let old_write = self.table.write_at(read_offset, writes.clone());
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(new_offset, writes);
        try_join(old_write, new_write).await?;

        if chunk_end >= old_size {
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }

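    /// Syncs all modified sections and the table, starting and/or advancing an
    /// incremental resize by one chunk, and returns a [Checkpoint] that restores
    /// exactly this state when passed to [Freezer::init_with_checkpoint].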
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();

        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        let oversized_size = self.oversized.size(self.current_section).await?;

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }

    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        let checkpoint = self.sync().await?;

        Ok(checkpoint)
    }

    pub async fn destroy(self) -> Result<(), Error> {
        self.oversized.destroy().await?;

        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }

    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Gettable for Freezer<E, K, V> {
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(Identifier::Key(key)).await
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Updatable for Freezer<E, K, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await?;
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Persistable for Freezer<E, K, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await?;
        Ok(())
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable, test_key};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, deterministic::Context, IoBufMut, Runner, Storage,
    };
    use commonware_utils::{
        sequence::{FixedBytes, U64},
        NZUsize, NZU16,
    };

    type TestFreezer = Freezer<Context, U64, u64>;

    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        assert_gettable(freezer, &key);
        assert_updatable(freezer, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        assert_send(freezer.destroy());
    }

    #[test_traced]
    fn issue_2966_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test_key_index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
                value_partition: "test_value_journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test_table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
                    .await
                    .unwrap();

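            // With table_resize_frequency = 1, both puts (which land in distinct
            // indices) make their index resizable, so close() doubles the table
            // from 4 to 8 indices; the assertions below rely on this.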
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key2"), 1).await.unwrap();
            freezer.close().await.unwrap();

            let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
            let table_data = blob
                .read_at(0, IoBufMut::zeroed(size as usize))
                .await
                .unwrap()
                .coalesce();

            let num_entries = size as usize / Entry::FULL_SIZE;
            assert_eq!(num_entries, 8);

            let mut both_empty_count = 0;
            for entry_idx in 0..num_entries {
                let offset = entry_idx * Entry::FULL_SIZE;
                let buf = &table_data.as_ref()[offset..offset + Entry::FULL_SIZE];
                let (slot0, slot1) =
                    Freezer::<Context, FixedBytes<64>, i32>::parse_entries(buf).unwrap();
                if slot0.is_empty() && slot1.is_empty() {
                    both_empty_count += 1;
                }
            }
            assert_eq!(both_empty_count, 4);
        });
    }

    #[test_traced]
    fn issue_2955_regression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = super::super::Config {
                key_partition: "test_key_index".into(),
                key_write_buffer: NZUsize!(1024),
                key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
                value_partition: "test_value_journal".into(),
                value_compression: None,
                value_write_buffer: NZUsize!(1024),
                value_target_size: 10 * 1024 * 1024,
                table_partition: "test_table".into(),
                table_initial_size: 4,
                table_resize_frequency: 1,
                table_resize_chunk_size: 4,
                table_replay_buffer: NZUsize!(64 * 1024),
                codec_config: (),
            };

            let checkpoint = {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
                    context.with_label("first"),
                    cfg.clone(),
                )
                .await
                .unwrap();
                freezer.put(test_key("key0"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                let entry_data = blob
                    .read_at(0, IoBufMut::zeroed(Entry::FULL_SIZE))
                    .await
                    .unwrap();
                let mut corrupted = entry_data.coalesce();
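                // Corrupt a CRC byte in both slots of the first table index so
                // neither slot survives recovery.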
                corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF;
                corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF;
                blob.write_at(0, corrupted).await.unwrap();
                blob.sync().await.unwrap();
            }

            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.with_label("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            drop(freezer);
        });
    }
}