1use super::{Config, Error, Identifier};
2use crate::journal::segmented::variable::{Config as JournalConfig, Journal};
3use bytes::{Buf, BufMut};
4use commonware_codec::{Codec, Encode, EncodeSize, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{buffer, Blob, Clock, Metrics, Storage};
6use commonware_utils::{Array, Span};
7use futures::future::{try_join, try_join_all};
8use prometheus_client::metrics::counter::Counter;
9use std::{
10 cmp::Ordering, collections::BTreeSet, marker::PhantomData, num::NonZeroUsize, ops::Deref,
11};
12use tracing::debug;
13
/// Percentage of table slots that must be "resizable" (their head chain has
/// accumulated `table_resize_frequency` or more appends) before a table
/// doubling is triggered. Used as `table_size * RESIZE_THRESHOLD / 100`.
const RESIZE_THRESHOLD: u64 = 50;
17
/// Stable location of a record in the freezer: a big-endian `section` (u64)
/// followed by a big-endian `offset` (u32) into that journal section.
///
/// Stored as raw bytes so it can implement `Array`/`Span` and be used
/// directly as a fixed-size key.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[repr(transparent)]
pub struct Cursor([u8; u64::SIZE + u32::SIZE]);
26
27impl Cursor {
28 fn new(section: u64, offset: u32) -> Self {
30 let mut buf = [0u8; u64::SIZE + u32::SIZE];
31 buf[..u64::SIZE].copy_from_slice(§ion.to_be_bytes());
32 buf[u64::SIZE..].copy_from_slice(&offset.to_be_bytes());
33 Self(buf)
34 }
35
36 fn section(&self) -> u64 {
38 u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
39 }
40
41 fn offset(&self) -> u32 {
43 u32::from_be_bytes(self.0[u64::SIZE..].try_into().unwrap())
44 }
45}
46
47impl Read for Cursor {
48 type Cfg = ();
49
50 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
51 <[u8; u64::SIZE + u32::SIZE]>::read(buf).map(Self)
52 }
53}
54
impl CodecWrite for Cursor {
    /// Encodes the cursor as its raw 12-byte representation.
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}
60
impl FixedSize for Cursor {
    // Encoded size: 8-byte section + 4-byte offset.
    const SIZE: usize = u64::SIZE + u32::SIZE;
}
64
// Marker impl: a Cursor is a fixed-size byte span.
impl Span for Cursor {}
66
// Marker impl: allows Cursor to be used wherever a fixed-size array key is expected.
impl Array for Cursor {}
68
impl Deref for Cursor {
    type Target = [u8];
    // Expose the raw encoded bytes (required by the `Array` contract).
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
75
impl AsRef<[u8]> for Cursor {
    // Borrow the raw encoded bytes.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
81
82impl std::fmt::Debug for Cursor {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 write!(
85 f,
86 "Cursor(section={}, offset={})",
87 self.section(),
88 self.offset()
89 )
90 }
91}
92
93impl std::fmt::Display for Cursor {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 write!(
96 f,
97 "Cursor(section={}, offset={})",
98 self.section(),
99 self.offset()
100 )
101 }
102}
103
/// Durable snapshot of freezer state, returned by `sync`/`close` and accepted
/// by `init_with_checkpoint` to roll the store back to a known-consistent point.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub struct Checkpoint {
    // Epoch at which this checkpoint was taken; table entries newer than this
    // are discarded during recovery.
    epoch: u64,
    // Journal section that was current when the checkpoint was taken.
    section: u64,
    // Size of that journal section (used to rewind partial writes).
    size: u64,
    // Number of table slots at checkpoint time (always a power of two).
    table_size: u32,
}
120
121impl Checkpoint {
122 const fn init(table_size: u32) -> Self {
124 Self {
125 table_size,
126 epoch: 0,
127 section: 0,
128 size: 0,
129 }
130 }
131}
132
133impl Read for Checkpoint {
134 type Cfg = ();
135 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
136 let epoch = u64::read(buf)?;
137 let section = u64::read(buf)?;
138 let size = u64::read(buf)?;
139 let table_size = u32::read(buf)?;
140 Ok(Self {
141 epoch,
142 section,
143 size,
144 table_size,
145 })
146 }
147}
148
impl CodecWrite for Checkpoint {
    // Field order here is the wire format; `Read::read_cfg` must match.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.size.write(buf);
        self.table_size.write(buf);
    }
}
157
158impl FixedSize for Checkpoint {
159 const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
160}
161
/// Name of the single blob (within the table partition) holding the hash table.
const TABLE_BLOB_NAME: &[u8] = b"table";
164
/// One half of a table slot. Each slot holds two entries written alternately
/// (ping-pong) so a torn write can never destroy the last good head pointer.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
struct Entry {
    // Epoch at which this entry was written (used to pick the newer of the pair).
    epoch: u64,
    // Journal section of the chain head record.
    section: u64,
    // Offset of the chain head record within `section`.
    offset: u32,
    // Number of records appended to this chain since the last resize reset;
    // drives the resize heuristic.
    added: u8,
    // CRC32 over (epoch, section, offset, added) to detect torn/corrupt writes.
    crc: u32,
}
180
impl Entry {
    /// On-disk size of a full slot: the two ping-pong entries back to back.
    const FULL_SIZE: usize = Self::SIZE * 2;

    /// CRC32 over the big-endian encodings of all payload fields, in the same
    /// order they are written to disk.
    fn compute_crc(epoch: u64, section: u64, offset: u32, added: u8) -> u32 {
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&epoch.to_be_bytes());
        hasher.update(&section.to_be_bytes());
        hasher.update(&offset.to_be_bytes());
        hasher.update(&added.to_be_bytes());
        hasher.finalize()
    }

    /// Builds an entry with its CRC precomputed.
    fn new(epoch: u64, section: u64, offset: u32, added: u8) -> Self {
        Self {
            epoch,
            section,
            offset,
            added,
            crc: Self::compute_crc(epoch, section, offset, added),
        }
    }

    /// Whether this entry is the all-zero sentinel (never written / cleared).
    // NOTE(review): only section, offset, and crc are checked — epoch and
    // added are ignored. Presumably cleared entries are always fully zeroed
    // (see `recover_entry`, which writes `Entry::new(0, 0, 0, 0)`); confirm no
    // path writes an entry with zero section/offset/crc but nonzero epoch.
    const fn is_empty(&self) -> bool {
        self.section == 0 && self.offset == 0 && self.crc == 0
    }

    /// Whether the stored CRC matches the payload (i.e. the write was not torn).
    fn is_valid(&self) -> bool {
        Self::compute_crc(self.epoch, self.section, self.offset, self.added) == self.crc
    }
}
216
impl FixedSize for Entry {
    // epoch (8) + section (8) + offset (4) + added (1) + crc (4) = 25 bytes.
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE + u8::SIZE + u32::SIZE;
}
220
impl CodecWrite for Entry {
    // Field order is the wire format and also the order hashed by
    // `Entry::compute_crc`; `Read` must match.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.offset.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}
230
231impl Read for Entry {
232 type Cfg = ();
233 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
234 let epoch = u64::read(buf)?;
235 let section = u64::read(buf)?;
236 let offset = u32::read(buf)?;
237 let added = u8::read(buf)?;
238 let crc = u32::read(buf)?;
239
240 Ok(Self {
241 epoch,
242 section,
243 offset,
244 added,
245 crc,
246 })
247 }
248}
249
/// A key/value pair stored in the journal, with an optional link to the
/// previous record that hashed to the same table slot (forming a singly
/// linked chain newest-to-oldest).
struct Record<K: Array, V: Codec> {
    key: K,
    value: V,
    // (section, offset) of the next older record in this slot's chain.
    next: Option<(u64, u32)>,
}
256
impl<K: Array, V: Codec> Record<K, V> {
    /// Creates a record linking to the previous chain head (if any).
    const fn new(key: K, value: V, next: Option<(u64, u32)>) -> Self {
        Self { key, value, next }
    }
}
263
impl<K: Array, V: Codec> CodecWrite for Record<K, V> {
    // Wire format: key, value, then the optional back-link; `Read` must match.
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.value.write(buf);
        self.next.write(buf);
    }
}
271
272impl<K: Array, V: Codec> Read for Record<K, V> {
273 type Cfg = V::Cfg;
274 fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
275 let key = K::read(buf)?;
276 let value = V::read_cfg(buf, cfg)?;
277 let next = Option::<(u64, u32)>::read_cfg(buf, &((), ()))?;
278
279 Ok(Self { key, value, next })
280 }
281}
282
impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
    // Fixed-size key + variable-size value + optional back-link.
    fn encode_size(&self) -> usize {
        K::SIZE + self.value.encode_size() + self.next.encode_size()
    }
}
288
// Manual Arbitrary impl (rather than derive) because the generic parameters
// need their own Arbitrary bounds.
#[cfg(feature = "arbitrary")]
impl<K: Array, V: Codec> arbitrary::Arbitrary<'_> for Record<K, V>
where
    K: for<'a> arbitrary::Arbitrary<'a>,
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            key: K::arbitrary(u)?,
            value: V::arbitrary(u)?,
            next: Option::<(u64, u32)>::arbitrary(u)?,
        })
    }
}
303
/// An append-only keyed store: values are appended to a segmented journal and
/// indexed by an on-disk hash table of chain heads that doubles in size as
/// slots fill up.
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: Codec> {
    // Runtime context (storage, metrics, clock).
    context: E,

    // Partition holding the table blob.
    table_partition: String,
    // Current number of table slots (always a power of two).
    table_size: u32,
    // Slot count at which a resize is triggered (table_size * RESIZE_THRESHOLD / 100).
    table_resize_threshold: u64,
    // Appends to a slot's chain before that slot counts toward the threshold.
    table_resize_frequency: u8,
    // Slots migrated per `advance_resize` step (incremental resize).
    table_resize_chunk_size: u32,

    // Blob holding the hash table of ping-pong entry pairs.
    table: E::Blob,

    // Segmented journal holding the actual records.
    journal: Journal<E, Record<K, V>>,
    // Once a section reaches this size, new appends roll to the next section.
    journal_target_size: u64,

    // Section currently receiving appends.
    current_section: u64,
    // Epoch assigned to table writes since the last sync (monotonic).
    next_epoch: u64,

    // Sections with unsynced appends (flushed on `sync`).
    modified_sections: BTreeSet<u64>,
    // Count of slots whose chains have reached `table_resize_frequency`.
    resizable: u32,
    // When Some(i), a resize is in flight and slots `0..i` have been migrated.
    resize_progress: Option<u32>,

    // Metrics.
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,

    _phantom: PhantomData<(K, V)>,
}
342
impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Freezer<E, K, V> {
    /// Byte offset of slot `table_index` within the table blob.
    #[inline]
    const fn table_offset(table_index: u32) -> u64 {
        table_index as u64 * Entry::FULL_SIZE as u64
    }

    /// Decodes the two ping-pong entries of one slot from `buf`
    /// (which must hold at least `Entry::FULL_SIZE` bytes).
    fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
        let mut buf1 = &buf[0..Entry::SIZE];
        let entry1 = Entry::read(&mut buf1)?;
        let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
        let entry2 = Entry::read(&mut buf2)?;
        Ok((entry1, entry2))
    }

    /// Reads both entries of slot `table_index` from the table blob.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let buf = vec![0u8; Entry::FULL_SIZE];
        let read_buf = blob.read_at(buf, offset).await?;

        Self::parse_entries(read_buf.as_ref())
    }

    /// Validates one entry during recovery.
    ///
    /// Invalid entries (bad CRC, or newer than `max_valid_epoch` when one is
    /// given) are zeroed both in memory and on disk; returns `true` in that
    /// case so the caller knows the blob was modified. When no
    /// `max_valid_epoch` is given (recovery without a checkpoint), tracks the
    /// maximum epoch/section seen across all valid entries.
    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        // Never-written entries are fine as-is.
        if entry.is_empty() {
            return Ok(false);
        }

        if !entry.is_valid()
            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
        {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            // Clear the entry in memory and zero it on disk so future reads
            // see it as empty.
            *entry = Entry::new(0, 0, 0, 0);
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(zero_buf, entry_offset).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            // Checkpoint-less recovery: remember the newest valid entry seen.
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }

    /// Scans the whole table, repairing invalid entries and gathering stats.
    ///
    /// Returns `(blob_modified, max_epoch, max_section, resizable_slots)`:
    /// `max_epoch`/`max_section` are only meaningful when `max_valid_epoch`
    /// is `None` (recovery without a checkpoint).
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Stream the table sequentially through a read buffer; repairs are
        // written back via direct `write_at` on the underlying blob.
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // Count slots whose chains already qualify for a resize.
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }

    /// Chooses which half of a slot to overwrite for a write at `epoch`.
    ///
    /// Re-writes within the same epoch reuse the same half (so a slot is
    /// updated at most once per epoch on disk); otherwise the empty half, or
    /// the half holding the older epoch, is overwritten — preserving the
    /// newest committed entry if this write tears.
    const fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
        if !entry1.is_empty() && entry1.epoch == epoch {
            return 0;
        }
        if !entry2.is_empty() && entry2.epoch == epoch {
            return Entry::SIZE as u64;
        }

        match (entry1.is_empty(), entry2.is_empty()) {
            (true, _) => 0,
            (_, true) => Entry::SIZE as u64,
            (false, false) => {
                if entry1.epoch < entry2.epoch {
                    0
                } else {
                    Entry::SIZE as u64
                }
            }
        }
    }

    /// Returns `(section, offset, added)` of the newest valid entry in a
    /// slot, or `None` if both halves are empty/corrupt.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u32, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.offset, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.offset, entry2.added)),
                Ordering::Equal => {
                    // compute_write_offset reuses the same half within an
                    // epoch, so two valid entries can never share one.
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.offset, entry1.added)),
            (false, true) => Some((entry2.section, entry2.offset, entry2.added)),
            (false, false) => None,
        }
    }

    /// Writes `update` into slot `table_index`, overwriting the half chosen
    /// by `compute_write_offset` given the slot's current contents.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        let table_offset = Self::table_offset(table_index);

        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        table
            .write_at(update.encode(), table_offset + start)
            .await
            .map_err(Error::Runtime)
    }

    /// Sizes a fresh table blob to hold `table_size` zeroed slots and syncs it.
    async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
        let table_len = Self::table_offset(table_size);
        blob.resize(table_len).await?;
        blob.sync().await?;
        Ok(())
    }

    /// Opens (or creates) a freezer, recovering from whatever state is on disk.
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }

    /// Opens (or creates) a freezer, optionally rolling back to `checkpoint`.
    ///
    /// With a checkpoint, the journal is rewound to the checkpointed
    /// section/size and any table entries newer than the checkpoint epoch are
    /// discarded. Without one, recovery scans the table to find the newest
    /// consistent state.
    ///
    /// # Panics
    ///
    /// If `table_initial_size` (or a checkpoint's `table_size`) is not a
    /// nonzero power of two, or on epoch overflow.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Power-of-two table size lets `table_index` use a bitmask.
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        let journal_config = JournalConfig {
            partition: config.journal_partition,
            compression: config.journal_compression,
            codec_config: config.codec_config,
            write_buffer: config.journal_write_buffer,
            buffer_pool: config.journal_buffer_pool,
        };
        let mut journal = Journal::init(context.with_label("journal"), journal_config).await?;

        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // Fresh store: create an empty table.
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Fresh store with a checkpoint: only the zero checkpoint is legal.
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing store, roll back to the checkpoint.
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Discard journal data written after the checkpoint.
                journal.rewind(checkpoint.section, checkpoint.size).await?;
                journal.sync(checkpoint.section).await?;

                // Shrink/grow the table back to the checkpointed size.
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Drop table entries newer than the checkpoint epoch.
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing store, no checkpoint: derive state from the table itself.
            (_, None) => {
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                if modified {
                    table.sync().await?;
                }

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        size: journal.size(max_section).await?,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Register metrics.
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            journal,
            journal_target_size: config.journal_target_size,
            current_section: checkpoint.section,
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
            _phantom: PhantomData,
        })
    }

    /// Maps a key to its table slot: CRC32 of the key masked by the
    /// (power-of-two) table size.
    fn table_index(&self, key: &K) -> u32 {
        let hash = crc32fast::hash(key.as_ref());
        hash & (self.table_size - 1)
    }

    /// Whether enough slots have filled up to warrant doubling the table.
    const fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }

    /// Rolls to the next journal section once the current one reaches the
    /// configured target size.
    async fn update_section(&mut self) -> Result<(), Error> {
        let size = self.journal.size(self.current_section).await?;

        if size >= self.journal_target_size {
            self.current_section += 1;
            debug!(size, section = self.current_section, "updated section");
        }

        Ok(())
    }

    /// Appends `value` under `key` and returns a [`Cursor`] to the new record.
    ///
    /// The record is appended to the journal with a back-link to the slot's
    /// previous chain head, then the slot head is updated to point at it.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        self.update_section().await?;

        // Read the current chain head for this key's slot.
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Link the new record to the previous head (if any).
        let entry = Record::new(
            key,
            value,
            head.map(|(section, offset, _)| (section, offset)),
        );

        let (offset, _) = self.journal.append(self.current_section, entry).await?;

        // Bump the chain-growth counter (saturating at u8::MAX).
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // Exactly-once increment when the chain first hits the frequency.
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If a resize is in flight and this slot was already migrated, mirror
        // the head update into the slot's image in the new half of the table.
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // NOTE(review): second increment for the same `added`
                // transition — presumably counts the mirrored slot in the new
                // half as resizable too; confirm against advance_resize's
                // decrement accounting.
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // NOTE(review): the write offset for the mirrored slot is
                // computed from the OLD slot's entries — this assumes the
                // migrated copy matches them (advance_resize copies the slot
                // verbatim); verify.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, offset))
    }

    /// Fetches the record a cursor points at directly from the journal.
    async fn get_cursor(&self, cursor: Cursor) -> Result<V, Error> {
        let entry = self.journal.get(cursor.section(), cursor.offset()).await?;

        Ok(entry.value)
    }

    /// Looks up the newest value for `key` by walking its slot's chain
    /// (newest to oldest); returns `None` if no record matches.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut offset, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        loop {
            let entry = self.journal.get(section, offset).await?;

            if entry.key.as_ref() == key.as_ref() {
                return Ok(Some(entry.value));
            }

            // Hash collision: keep walking the chain.
            self.unnecessary_reads.inc();

            let Some(next) = entry.next else {
                break;
            };
            section = next.0;
            offset = next.1;
        }

        Ok(None)
    }

    /// Fetches by cursor (direct) or by key (chain walk).
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await.map(Some),
            Identifier::Key(key) => self.get_key(key).await,
        }
    }

    /// Begins an incremental table doubling: grows the blob to twice the slot
    /// count and marks migration as started. No-op if doubling would overflow.
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            // Cannot grow past u32::MAX slots; silently skip.
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }

    /// Appends a slot's rewritten pair to `buf`: `new_entry` replaces
    /// whichever half `compute_write_offset` selects; the other half is
    /// preserved verbatim.
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }

    /// Migrates the next chunk of slots into the new (doubled) table half.
    ///
    /// Each migrated slot's head is rewritten with `added` reset to 0 and
    /// copied identically to old and new positions; completes the resize when
    /// the last chunk is done.
    ///
    /// # Panics
    ///
    /// If called while no resize is in progress.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the whole chunk of slots in one I/O.
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = vec![0u8; chunk_bytes];
        let read_buf: Vec<u8> = self.table.read_at(read_buf, read_offset).await?.into();

        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf[entry_offset..entry_end];

            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            // Empty slots migrate as all-zero heads.
            let (section, offset, added) =
                Self::read_latest_entry(&entry1, &entry2).unwrap_or((0, 0, 0));

            // The reset below clears `added`, so this slot no longer counts.
            if added >= self.table_resize_frequency {
                self.resizable -= 1;
            }

            // Rewrite the head at the current epoch with `added` reset to 0.
            let reset_entry = Entry::new(self.next_epoch, section, offset, 0);
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Write the identical chunk to both the old position and its mirror
        // in the new half, concurrently.
        let old_write = self.table.write_at(writes.clone(), read_offset);
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(writes, new_offset);
        try_join(old_write, new_write).await?;

        if chunk_end >= old_size {
            // All slots migrated: adopt the doubled size and recompute the
            // resize threshold.
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }

    /// Durably flushes all pending state and returns a [`Checkpoint`] that
    /// `init_with_checkpoint` can later roll back to.
    ///
    /// Order matters for crash consistency: journal sections are synced
    /// before the table, so every table head points at durable data. Also
    /// advances any in-flight resize by one chunk and bumps the epoch.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Flush all journal sections touched since the last sync, concurrently.
        let mut updates = Vec::with_capacity(self.modified_sections.len());
        for section in &self.modified_sections {
            updates.push(self.journal.sync(*section));
        }
        try_join_all(updates).await?;
        self.modified_sections.clear();

        // Kick off a resize if warranted and none is running.
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        // Make incremental resize progress (at most one chunk per sync).
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            size: self.journal.size(self.current_section).await?,
            table_size: self.table_size,
        })
    }

    /// Finishes any in-flight resize, syncs everything, and shuts down,
    /// returning the final [`Checkpoint`].
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        // Drain the resize so the table is in a stable (non-migrating) state.
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        let checkpoint = self.sync().await?;

        self.journal.close().await?;
        self.table.sync().await?;
        Ok(checkpoint)
    }

    /// Permanently deletes all journal and table data for this freezer.
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await?;

        // Release the blob handle before removing it from storage.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }

    /// Test hook: current resize progress (`None` when no resize is running).
    #[cfg(test)]
    pub const fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }

    /// Test hook: number of slots currently counted toward the resize threshold.
    #[cfg(test)]
    pub const fn resizable(&self) -> u32 {
        self.resizable
    }
}
1040
// Adapter: key-based lookup via the generic `Store` trait.
impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::store::Store for Freezer<E, K, V> {
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(Identifier::Key(key)).await
    }
}
1050
// Adapter: `update` appends a new record; the returned cursor is discarded.
impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::store::StoreMut for Freezer<E, K, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await?;
        Ok(())
    }
}
1057
// Adapter: `commit` syncs (dropping the checkpoint); `destroy` deletes all data.
impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::store::StorePersistable
    for Freezer<E, K, V>
{
    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await?;
        Ok(())
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}
1070
// Codec round-trip conformance tests for all on-disk types, driven by
// `arbitrary`-generated inputs.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;
    use commonware_utils::sequence::U64;

    commonware_conformance::conformance_tests! {
        CodecConformance<Cursor>,
        CodecConformance<Checkpoint>,
        CodecConformance<Entry>,
        CodecConformance<Record<U64, U64>>
    }
}
1083}