use super::{Config, Error, Identifier};
use crate::{
    journal::segmented::oversized::{
        Config as OversizedConfig, Oversized, Record as OversizedRecord,
    },
    kv, Persistable,
};
use bytes::{Buf, BufMut};
use commonware_codec::{CodecShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
use commonware_cryptography::{crc32, Crc32, Hasher};
use commonware_runtime::{buffer, Blob, Clock, Metrics, Storage};
use commonware_utils::{Array, Span};
use futures::future::{try_join, try_join_all};
use prometheus_client::metrics::counter::Counter;
use std::{cmp::Ordering, collections::BTreeSet, num::NonZeroUsize, ops::Deref};
use tracing::debug;

/// Resize the table once at least this percentage of slots have reached
/// `table_resize_frequency` additions since the last resize.
const RESIZE_THRESHOLD: u64 = 50;

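// Threshold arithmetic (illustrative): with a table of 1,024 slots, a resize
// is triggered on `sync` once 1024 * 50 / 100 = 512 slots are resizable.
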
/// The location of a value in the freezer.
///
/// Packs the section, the byte offset of the value within that section, and
/// the size of the value into a fixed-size big-endian byte array.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u64::SIZE + u32::SIZE]);

impl Cursor {
    /// Pack `section`, `offset`, and `size` into a new [Cursor].
    fn new(section: u64, offset: u64, size: u32) -> Self {
        let mut buf = [0u8; u64::SIZE + u64::SIZE + u32::SIZE];
        buf[..u64::SIZE].copy_from_slice(&section.to_be_bytes());
        buf[u64::SIZE..u64::SIZE + u64::SIZE].copy_from_slice(&offset.to_be_bytes());
        buf[u64::SIZE + u64::SIZE..].copy_from_slice(&size.to_be_bytes());
        Self(buf)
    }

    /// The section containing the value.
    fn section(&self) -> u64 {
        u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
    }

    /// The byte offset of the value within its section.
    fn offset(&self) -> u64 {
        u64::from_be_bytes(self.0[u64::SIZE..u64::SIZE + u64::SIZE].try_into().unwrap())
    }

    /// The size of the value in bytes.
    fn size(&self) -> u32 {
        u32::from_be_bytes(self.0[u64::SIZE + u64::SIZE..].try_into().unwrap())
    }
}
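
// Byte layout (illustrative): [section: 8][offset: 8][size: 4], all
// big-endian. Because the most-significant bytes come first, the derived
// lexicographic `Ord` on the byte array matches ordering by
// (section, offset, size); e.g. Cursor::new(3, 1024, 16) < Cursor::new(4, 0, 0).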

impl Read for Cursor {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        <[u8; u64::SIZE + u64::SIZE + u32::SIZE]>::read(buf).map(Self)
    }
}

impl CodecWrite for Cursor {
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}

impl FixedSize for Cursor {
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
}

impl Span for Cursor {}

impl Array for Cursor {}

impl Deref for Cursor {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<[u8]> for Cursor {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl std::fmt::Debug for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

impl std::fmt::Display for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Cursor(section={}, offset={}, size={})",
            self.section(),
            self.offset(),
            self.size()
        )
    }
}

/// A consistent point-in-time description of the freezer, returned by
/// [Freezer::sync] and accepted by [Freezer::init_with_checkpoint].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    /// The epoch at which the checkpoint was taken.
    epoch: u64,
    /// The section that was current when the checkpoint was taken.
    section: u64,
    /// The size of the oversized journal in that section.
    oversized_size: u64,
    /// The number of slots in the table.
    table_size: u32,
}

impl Checkpoint {
    /// The checkpoint for a freshly initialized freezer with `table_size` slots.
    const fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            oversized_size: 0,
        }
    }
}

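// Checkpoint flow (a sketch): the checkpoint returned by `Freezer::sync` can
// be persisted externally and later passed to `Freezer::init_with_checkpoint`
// to restore the freezer to that exact state, discarding any table entries
// written at later epochs.
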
impl Read for Checkpoint {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let oversized_size = u64::read(buf)?;
        let table_size = u32::read(buf)?;
        Ok(Self {
            epoch,
            section,
            oversized_size,
            table_size,
        })
    }
}

impl CodecWrite for Checkpoint {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.oversized_size.write(buf);
        self.table_size.write(buf);
    }
}

impl FixedSize for Checkpoint {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

/// The name of the blob that stores the table.
const TABLE_BLOB_NAME: &[u8] = b"table";

/// An entry in the table, pointing at the head of a slot's record chain.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    /// The epoch at which the entry was written.
    epoch: u64,
    /// The section containing the head record.
    section: u64,
    /// The position of the head record within its section.
    position: u64,
    /// The number of records added to the chain since the slot was last reset.
    added: u8,
    /// A CRC over the preceding fields, used to detect torn or corrupt writes.
    crc: u32,
}

impl Entry {
    /// Each slot stores two entries (written alternately) so that a torn
    /// write can never destroy the last durable head.
    const FULL_SIZE: usize = Self::SIZE * 2;

    /// Compute the CRC over all non-CRC fields.
    fn compute_crc(epoch: u64, section: u64, position: u64, added: u8) -> u32 {
        let mut hasher = Crc32::new();
        hasher.update(&epoch.to_be_bytes());
        hasher.update(&section.to_be_bytes());
        hasher.update(&position.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize().as_u32()
    }

    /// Create a new [Entry] with a valid CRC.
    fn new(epoch: u64, section: u64, position: u64, added: u8) -> Self {
        Self {
            epoch,
            section,
            position,
            added,
            crc: Self::compute_crc(epoch, section, position, added),
        }
    }

    /// Whether this entry has never been written (all zeroes on disk).
    const fn is_empty(&self) -> bool {
        self.section == 0 && self.position == 0 && self.crc == 0
    }

    /// Whether the stored CRC matches the entry's contents.
    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.position, self.added) == self.crc
    }
}
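
// Dual-entry recovery (illustrative): if entry1 holds epoch 5 and entry2
// holds epoch 7, a write of epoch 9 that tears mid-flight fails its CRC on
// restart and is zeroed, while entry2 (epoch 7) remains valid and is
// recovered as the latest head by `read_latest_entry`.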

impl FixedSize for Entry {
    const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u8::SIZE + crc32::Digest::SIZE;
}

impl CodecWrite for Entry {
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.position.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}

impl Read for Entry {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let epoch = u64::read(buf)?;
        let section = u64::read(buf)?;
        let position = u64::read(buf)?;
        let added = u8::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self {
            epoch,
            section,
            position,
            added,
            crc,
        })
    }
}

/// Sentinel section marking a record with no successor in its chain.
const NO_NEXT_SECTION: u64 = u64::MAX;
/// Sentinel position marking a record with no successor in its chain.
const NO_NEXT_POSITION: u64 = u64::MAX;

/// A record in the oversized journal: a key, a link to the previous head of
/// the slot's chain, and the location of the value.
#[derive(Debug, Clone, PartialEq)]
struct Record<K: Array> {
    /// The key.
    key: K,
    /// The section of the next record in the chain, or [NO_NEXT_SECTION].
    next_section: u64,
    /// The position of the next record in the chain, or [NO_NEXT_POSITION].
    next_position: u64,
    /// The offset of the value within its section's value blob.
    value_offset: u64,
    /// The size of the encoded value.
    value_size: u32,
}

impl<K: Array> Record<K> {
    /// Create a new [Record], linking it to `next` (the previous chain head), if any.
    fn new(key: K, next: Option<(u64, u64)>, value_offset: u64, value_size: u32) -> Self {
        let (next_section, next_position) = next.unwrap_or((NO_NEXT_SECTION, NO_NEXT_POSITION));
        Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        }
    }

    /// The location of the next record in the chain, if any.
    const fn next(&self) -> Option<(u64, u64)> {
        if self.next_section == NO_NEXT_SECTION && self.next_position == NO_NEXT_POSITION {
            None
        } else {
            Some((self.next_section, self.next_position))
        }
    }
}
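
// Chain layout (illustrative): the table slot always points at the newest
// record, and each record links back to its predecessor, including records
// for other keys that hash to the same slot:
//
//   table[slot] -> r3 { next: (s2, p2) } -> r2 { next: (s1, p1) } -> r1 { next: None }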

impl<K: Array> CodecWrite for Record<K> {
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.next_section.write(buf);
        self.next_position.write(buf);
        self.value_offset.write(buf);
        self.value_size.write(buf);
    }
}

impl<K: Array> Read for Record<K> {
    type Cfg = ();
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let key = K::read(buf)?;
        let next_section = u64::read(buf)?;
        let next_position = u64::read(buf)?;
        let value_offset = u64::read(buf)?;
        let value_size = u32::read(buf)?;

        Ok(Self {
            key,
            next_section,
            next_position,
            value_offset,
            value_size,
        })
    }
}

impl<K: Array> FixedSize for Record<K> {
    const SIZE: usize = K::SIZE + u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
}

impl<K: Array> OversizedRecord for Record<K> {
    fn value_location(&self) -> (u64, u32) {
        (self.value_offset, self.value_size)
    }

    fn with_location(mut self, offset: u64, size: u32) -> Self {
        self.value_offset = offset;
        self.value_size = size;
        self
    }
}

#[cfg(feature = "arbitrary")]
impl<K: Array> arbitrary::Arbitrary<'_> for Record<K>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            next_section: u64::arbitrary(u)?,
            next_position: u64::arbitrary(u)?,
            value_offset: u64::arbitrary(u)?,
            value_size: u32::arbitrary(u)?,
        })
    }
}

/// An on-disk key-value store backed by a hash table of record chains and a
/// segmented "oversized" journal.
///
/// Keys hash to table slots; each slot points at the head of a chain of
/// records stored in the journal, and the table doubles incrementally once
/// enough slots exceed the configured resize frequency.
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
    /// The runtime context used for storage and metrics.
    context: E,

    /// The partition holding the table blob.
    table_partition: String,
    /// The current number of table slots (always a power of two).
    table_size: u32,
    /// Resize once this many slots are resizable.
    table_resize_threshold: u64,
    /// The number of chain additions after which a slot is considered resizable.
    table_resize_frequency: u8,
    /// The number of slots migrated per call to [Self::advance_resize].
    table_resize_chunk_size: u32,

    /// The blob storing the table.
    table: E::Blob,

    /// The journal storing keys (index) and values (oversized).
    oversized: Oversized<E, Record<K>, V>,

    /// Advance to a new section once the current value blob reaches this size.
    blob_target_size: u64,

    /// The section new records are appended to.
    current_section: u64,
    /// The epoch assigned to table entries written before the next sync.
    next_epoch: u64,

    /// Sections with appends that have not yet been synced.
    modified_sections: BTreeSet<u64>,
    /// The number of slots whose chains have reached the resize frequency.
    resizable: u32,
    /// When a resize is in progress, the index of the next slot to migrate.
    resize_progress: Option<u32>,

    // Metrics.
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Freezer<E, K, V> {
    /// The byte offset of `table_index` in the table blob.
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }
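
    // Offset arithmetic (illustrative, assuming the 4-byte CRC32 digest):
    // Entry::SIZE = 8 + 8 + 8 + 1 + 4 = 29 bytes, so Entry::FULL_SIZE = 58
    // and slot `i` begins at byte i * 58 (e.g. slot 4 starts at offset 232).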

    /// Parse the two entries stored in a slot's buffer.
    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
        let mut buf1 = &buf[0..Entry::SIZE];
        let entry1 = Entry::read(&mut buf1)?;
        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
        let entry2 = Entry::read(&mut buf2)?;
        Ok((entry1, entry2))
    }

    /// Read both entries for `table_index` from the table blob.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let buf = vec![0u8; Entry::FULL_SIZE];
        let read_buf = blob.read_at(buf, offset).await?;

        Self::parse_entries(read_buf.as_ref())
    }

    /// Validate a single entry during recovery, zeroing it on disk if it is
    /// corrupt or was written after `max_valid_epoch`. Returns whether the
    /// blob was modified and tracks the maximum epoch/section observed.
    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        if entry.is_empty() {
            return Ok(false);
        }

        if !entry.is_valid() || max_valid_epoch.is_some_and(|max| entry.epoch > max) {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            // Mirror the on-disk state (all zeroes) so the cleared entry also
            // reads as empty in memory.
            *entry = Entry {
                epoch: 0,
                section: 0,
                position: 0,
                added: 0,
                crc: 0,
            };
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(zero_buf, entry_offset).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }

    /// Scan the full table during recovery, clearing any invalid entries (or
    /// entries newer than `max_valid_epoch`, when provided).
    ///
    /// Returns whether the blob was modified, the maximum epoch and section
    /// observed, and the number of resizable slots.
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            // Read both entries for the slot.
            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            // Validate each entry, zeroing any that are corrupt or too new.
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // Count slots whose chains have already reached the resize frequency.
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }

    /// Choose which of a slot's two entries the update for `epoch` should
    /// overwrite: reuse the entry already holding that epoch, else prefer an
    /// empty entry, else overwrite the older entry.
    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
        // Reuse the entry already written during this epoch, if any.
        if !entry1.is_empty() && entry1.epoch == epoch {
            return 0;
        }
        if !entry2.is_empty() && entry2.epoch == epoch {
            return Entry::SIZE as u64;
        }

        // Otherwise, prefer an empty entry, falling back to the older one.
        match (entry1.is_empty(), entry2.is_empty()) {
            (true, _) => 0,
            (_, true) => Entry::SIZE as u64,
            (false, false) => {
                if entry1.epoch < entry2.epoch {
                    0
                } else {
                    Entry::SIZE as u64
                }
            }
        }
    }
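
    // Write placement (illustrative): if entry1.epoch = 5 and entry2.epoch = 7,
    // an update at epoch 9 overwrites entry1 (the older entry), so a crash
    // mid-write still leaves entry2 intact; subsequent updates within epoch 9
    // then keep overwriting entry1 in place.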

    /// Return the latest valid entry's `(section, position, added)`, if any.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u64, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.position, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.position, entry2.added)),
                Ordering::Equal => {
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.position, entry1.added)),
            (false, true) => Some((entry2.section, entry2.position, entry2.added)),
            (false, false) => None,
        }
    }

    /// Write `update` as the new head for `table_index`, overwriting the
    /// slot's stale entry.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        let table_offset = Self::table_offset(table_index);

        // Pick which of the slot's two entries to overwrite.
        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        table
            .write_at(update.encode_mut(), table_offset + start)
            .await
            .map_err(Error::Runtime)
    }

    /// Size the table blob for `table_size` slots and sync it.
    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }

    /// Initialize a [Freezer], recovering whatever durable state exists on disk.
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }

    /// Initialize a [Freezer], restoring to `checkpoint` if one is provided
    /// (otherwise recovering the latest durable state from disk).
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Initialize the oversized journal that stores keys and values.
        let oversized_cfg = OversizedConfig {
            index_partition: config.key_partition.clone(),
            value_partition: config.value_partition.clone(),
            index_buffer_pool: config.key_buffer_pool.clone(),
            index_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            compression: config.value_compression,
            codec_config: config.codec_config,
        };
        let mut oversized: Oversized<E, Record<K>, V> =
            Oversized::init(context.with_label("oversized"), oversized_cfg).await?;

        // Open the table blob.
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // Fresh start: no table on disk and no checkpoint to restore.
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // No table on disk: the provided checkpoint must be the initial one.
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.oversized_size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Restore to the provided checkpoint, discarding anything newer.
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Rewind the journal to the checkpointed section and size.
                oversized
                    .rewind(checkpoint.section, checkpoint.oversized_size)
                    .await?;
                oversized.sync(checkpoint.section).await?;

                // Resize the table blob if it does not match the checkpoint.
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Clear any table entries written after the checkpoint epoch.
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // No checkpoint: recover the latest durable state from the table.
            (_, None) => {
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                if modified {
                    table.sync().await?;
                }

                let oversized_size = oversized.size(max_section).await?;

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        oversized_size,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Register metrics.
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            oversized,
            blob_target_size: config.value_target_size,
            current_section: checkpoint.section,
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
        })
    }

    /// Map `key` to a table slot by masking its CRC32 hash with
    /// `table_size - 1` (valid because `table_size` is a power of two).
    /// During a resize this still reflects the old size; writes to
    /// already-migrated slots are mirrored to the new half in [Self::put].
    fn table_index(&self, key: &K) -> u32 {
        let hash = Crc32::checksum(key.as_ref());
        hash & (self.table_size - 1)
    }
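
    // Slot arithmetic (illustrative): with table_size = 8, a key whose CRC32
    // hash is 43 maps to slot 43 & 7 = 3; after the table doubles to 16, the
    // same key maps to 43 & 15 = 11, i.e. slot 3 + old size 8, which is why
    // migration copies each old slot to both halves of the doubled table.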

    /// Whether enough slots are resizable to warrant doubling the table.
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }

    /// Advance to the next section if the current section's value blob has
    /// reached the target size.
    async fn update_section(&mut self) -> Result<(), Error> {
        let value_size = self.oversized.value_size(self.current_section).await?;

        if value_size >= self.blob_target_size {
            self.current_section += 1;
            debug!(
                size = value_size,
                section = self.current_section,
                "updated section"
            );
        }

        Ok(())
    }

    /// Append `value` under `key` and return a [Cursor] for direct access.
    /// The new record becomes the head of the key's chain.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Roll over to a new section if the current one has reached its target size.
        self.update_section().await?;

        // Read the slot so the new record can link to the current chain head.
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Append the record to the journal (which fills in the value location).
        let key_entry = Record::new(
            key,
            head.map(|(section, position, _)| (section, position)),
            0,
            0,
        );
        let (position, value_offset, value_size) = self
            .oversized
            .append(self.current_section, key_entry, &value)
            .await?;

        // Track chain growth for resize accounting.
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Point the slot at the new head.
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If a resize is in progress and this slot has already been migrated,
        // mirror the update to the corresponding slot in the new half so the
        // migrated copy stays current.
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, position, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, value_offset, value_size))
    }
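
    // Resize mirroring (illustrative): with an old table_size of 8 and
    // resize_progress = Some(4), a put hashing to slot 2 (< 4, already
    // migrated) is mirrored to slot 8 + 2 = 10, while a put hashing to slot 6
    // is not mirrored because advance_resize will copy slot 6 later.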

    /// Fetch a value directly from its [Cursor], without touching the table.
    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
        let value = self
            .oversized
            .get_value(cursor.section(), cursor.offset(), cursor.size())
            .await?;

        Ok(value)
    }

    /// Look up `key` by walking its slot's chain from the newest record to
    /// the oldest.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Find the head of the slot's chain.
        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut position, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        // Walk the chain until the key matches or the chain ends.
        loop {
            let key_entry = self.oversized.get(section, position).await?;

            if key_entry.key.as_ref() == key.as_ref() {
                let value = self
                    .oversized
                    .get_value(section, key_entry.value_offset, key_entry.value_size)
                    .await?;
                return Ok(Some(value));
            }

            // A record for a colliding key; keep walking.
            self.unnecessary_reads.inc();
            let Some(next) = key_entry.next() else {
                break;
            };
            section = next.0;
            position = next.1;
        }

        Ok(None)
    }

    /// Retrieve a value, either directly by [Cursor] or by key lookup.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }
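
    // Usage sketch (hypothetical `freezer`, `key`, and `value` bindings):
    //
    //   let cursor = freezer.put(key.clone(), value).await?;
    //   let direct = freezer.get(Identifier::Cursor(cursor)).await?; // one read
    //   let walked = freezer.get(Identifier::Key(&key)).await?;      // chain walk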

    /// Begin doubling the table: extend the blob to twice the current size
    /// and mark migration as starting at slot 0. If doubling would overflow
    /// `u32`, the resize is skipped.
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        // Extend the table blob to hold the doubled table.
        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }

    /// Serialize a slot's two entries into `buf`, replacing the stale entry
    /// with `new_entry` (as chosen by [Self::compute_write_offset]).
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }

    /// Migrate the next chunk of slots to the new half of the table,
    /// resetting each slot's `added` counter. Completes the resize once every
    /// old slot has been migrated.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the chunk of slots to migrate.
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = vec![0u8; chunk_bytes];
        let read_buf: Vec<u8> = self.table.read_at(read_buf, read_offset).await?.into();

        // Rewrite each slot with its `added` counter reset.
        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf[entry_offset..entry_end];
            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            let (section, position, added) =
                Self::read_latest_entry(&entry1, &entry2).unwrap_or((0, 0, 0));

            // The slot is no longer resizable once its counter is reset.
            if added >= self.table_resize_frequency {
                self.resizable -= 1;
            }

            let reset_entry = Entry::new(self.next_epoch, section, position, 0);
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Write the migrated chunk to both the old and new halves of the table.
        let old_write = self.table.write_at(writes.clone(), read_offset);
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(writes, new_offset);
        try_join(old_write, new_write).await?;

        if chunk_end >= old_size {
            // All slots migrated: adopt the doubled table.
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
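
    // Resize pacing (illustrative): with old_size = 1024 and
    // table_resize_chunk_size = 128, a resize completes after 1024 / 128 = 8
    // calls to `advance_resize` (one per `sync`); each migrated slot `i` is
    // rewritten in place and copied to slot old_size + i with `added` reset to 0.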

    /// Durably persist all pending appends, advance any in-progress resize by
    /// one chunk, and return a [Checkpoint] describing the synced state.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync all modified journal sections.
        let syncs: Vec<_> = self
            .modified_sections
            .iter()
            .map(|section| self.oversized.sync(*section))
            .collect();
        try_join_all(syncs).await?;
        self.modified_sections.clear();

        // Start a resize if warranted, then advance it by one chunk.
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Sync the table and advance the epoch.
        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        // Capture the journal size for the checkpoint.
        let oversized_size = self.oversized.size(self.current_section).await?;

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            oversized_size,
            table_size: self.table_size,
        })
    }

    /// Finish any in-progress resize, sync, and return the final [Checkpoint].
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        // Drive any in-progress resize to completion.
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        let checkpoint = self.sync().await?;

        Ok(checkpoint)
    }

    /// Remove all data stored by the freezer.
    pub async fn destroy(self) -> Result<(), Error> {
        self.oversized.destroy().await?;

        // Close the table blob before removing its partition.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }

    /// The index of the next slot to migrate, if a resize is in progress.
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    /// The number of resizable slots.
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Gettable for Freezer<E, K, V> {
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(Identifier::Key(key)).await
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> kv::Updatable for Freezer<E, K, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await?;
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Persistable for Freezer<E, K, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await?;
        Ok(())
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;
    use commonware_utils::sequence::U64;

    type TestFreezer = Freezer<Context, U64, u64>;

    #[allow(dead_code)]
    fn assert_freezer_futures_are_send(freezer: &mut TestFreezer, key: U64) {
        assert_gettable(freezer, &key);
        assert_updatable(freezer, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_freezer_destroy_is_send(freezer: TestFreezer) {
        assert_send(freezer.destroy());
    }
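
    // A minimal sketch verifying cursor packing: fields round-trip through
    // the big-endian byte layout, and the derived ordering follows
    // (section, offset, size).
    #[test]
    fn test_cursor_roundtrip() {
        let cursor = Cursor::new(3, 1024, 16);
        assert_eq!(cursor.section(), 3);
        assert_eq!(cursor.offset(), 1024);
        assert_eq!(cursor.size(), 16);
        assert!(cursor < Cursor::new(4, 0, 0));
    }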
}