1use super::{Config, Error, Identifier};
2use crate::journal::variable::{Config as JournalConfig, Journal};
3use bytes::{Buf, BufMut};
4use commonware_codec::{Codec, Encode, EncodeSize, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{buffer, Blob, Clock, Metrics, Storage};
6use commonware_utils::{Array, Span};
7use futures::future::{try_join, try_join_all};
8use prometheus_client::metrics::counter::Counter;
9use std::{
10 cmp::Ordering, collections::BTreeSet, marker::PhantomData, num::NonZeroUsize, ops::Deref,
11};
12use tracing::debug;
13
/// Percentage of table slots that must qualify as resizable before a table
/// resize is triggered (threshold = table_size * RESIZE_THRESHOLD / 100).
const RESIZE_THRESHOLD: u64 = 50;
17
18#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
23#[repr(transparent)]
24pub struct Cursor([u8; u64::SIZE + u32::SIZE]);
25
26impl Cursor {
27 fn new(section: u64, offset: u32) -> Self {
29 let mut buf = [0u8; u64::SIZE + u32::SIZE];
30 buf[..u64::SIZE].copy_from_slice(§ion.to_be_bytes());
31 buf[u64::SIZE..].copy_from_slice(&offset.to_be_bytes());
32 Self(buf)
33 }
34
35 fn section(&self) -> u64 {
37 u64::from_be_bytes(self.0[..u64::SIZE].try_into().unwrap())
38 }
39
40 fn offset(&self) -> u32 {
42 u32::from_be_bytes(self.0[u64::SIZE..].try_into().unwrap())
43 }
44}
45
46impl Read for Cursor {
47 type Cfg = ();
48
49 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
50 <[u8; u64::SIZE + u32::SIZE]>::read(buf).map(Self)
51 }
52}
53
impl CodecWrite for Cursor {
    /// Serializes the raw big-endian byte array.
    fn write(&self, buf: &mut impl BufMut) {
        self.0.write(buf);
    }
}
59
impl FixedSize for Cursor {
    // Big-endian section (u64) followed by offset (u32).
    const SIZE: usize = u64::SIZE + u32::SIZE;
}
63
// Marker trait impls required for Cursor to be usable as a fixed-size
// key/array type by the surrounding storage machinery.
impl Span for Cursor {}

impl Array for Cursor {}
67
impl Deref for Cursor {
    type Target = [u8];
    /// Exposes the raw byte representation.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
74
impl AsRef<[u8]> for Cursor {
    /// Borrows the raw byte representation.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
80
81impl std::fmt::Debug for Cursor {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(
84 f,
85 "Cursor(section={}, offset={})",
86 self.section(),
87 self.offset()
88 )
89 }
90}
91
92impl std::fmt::Display for Cursor {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(
95 f,
96 "Cursor(section={}, offset={})",
97 self.section(),
98 self.offset()
99 )
100 }
101}
102
/// A durable snapshot of freezer state returned by `sync`/`close`; passing it
/// back to `init_with_checkpoint` restores the freezer to exactly this point.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy)]
pub struct Checkpoint {
    // Epoch at which this checkpoint was taken.
    epoch: u64,
    // Journal section that was current at the checkpoint.
    section: u64,
    // Size of that journal section at the checkpoint.
    size: u64,
    // Number of table slots at the checkpoint.
    table_size: u32,
}
118
impl Checkpoint {
    /// Creates the checkpoint of an empty freezer with the given table size:
    /// epoch, section, and journal size all start at zero.
    fn init(table_size: u32) -> Self {
        Self {
            table_size,
            epoch: 0,
            section: 0,
            size: 0,
        }
    }
}
130
131impl Read for Checkpoint {
132 type Cfg = ();
133 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, commonware_codec::Error> {
134 let epoch = u64::read(buf)?;
135 let section = u64::read(buf)?;
136 let size = u64::read(buf)?;
137 let table_size = u32::read(buf)?;
138 Ok(Self {
139 epoch,
140 section,
141 size,
142 table_size,
143 })
144 }
145}
146
impl CodecWrite for Checkpoint {
    /// Serializes all fields in declaration order (mirrors `read_cfg`).
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.size.write(buf);
        self.table_size.write(buf);
    }
}
155
156impl FixedSize for Checkpoint {
157 const SIZE: usize = u64::SIZE + u64::SIZE + u64::SIZE + u32::SIZE;
158}
159
/// Name of the blob that stores the hash table within the table partition.
const TABLE_BLOB_NAME: &[u8] = b"table";
162
/// A single table entry: a pointer to the head of a key chain in the journal,
/// protected by a CRC so torn or partial writes can be detected.
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    // Epoch (sync generation) in which this entry was written.
    epoch: u64,
    // Journal section of the chain head.
    section: u64,
    // Byte offset of the chain head within its section.
    offset: u32,
    // Number of records added to this slot since the last resize.
    added: u8,
    // CRC32 over the fields above.
    crc: u32,
}
177
178impl Entry {
179 const FULL_SIZE: usize = Self::SIZE * 2;
181
182 fn compute_crc(epoch: u64, section: u64, offset: u32, added: u8) -> u32 {
184 let mut hasher = crc32fast::Hasher::new();
185 hasher.update(&epoch.to_be_bytes());
186 hasher.update(§ion.to_be_bytes());
187 hasher.update(&offset.to_be_bytes());
188 hasher.update(&added.to_be_bytes());
189 hasher.finalize()
190 }
191
192 fn new(epoch: u64, section: u64, offset: u32, added: u8) -> Self {
194 Self {
195 epoch,
196 section,
197 offset,
198 added,
199 crc: Self::compute_crc(epoch, section, offset, added),
200 }
201 }
202
203 fn is_empty(&self) -> bool {
205 self.section == 0 && self.offset == 0 && self.crc == 0
206 }
207
208 fn is_valid(&self) -> bool {
210 Self::compute_crc(self.epoch, self.section, self.offset, self.added) == self.crc
211 }
212}
213
impl FixedSize for Entry {
    // epoch + section + offset + added + crc.
    const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE + u8::SIZE + u32::SIZE;
}
217
impl CodecWrite for Entry {
    /// Serializes all fields (including the CRC) in declaration order.
    fn write(&self, buf: &mut impl BufMut) {
        self.epoch.write(buf);
        self.section.write(buf);
        self.offset.write(buf);
        self.added.write(buf);
        self.crc.write(buf);
    }
}
227
228impl Read for Entry {
229 type Cfg = ();
230 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
231 let epoch = u64::read(buf)?;
232 let section = u64::read(buf)?;
233 let offset = u32::read(buf)?;
234 let added = u8::read(buf)?;
235 let crc = u32::read(buf)?;
236
237 Ok(Self {
238 epoch,
239 section,
240 offset,
241 added,
242 crc,
243 })
244 }
245}
246
/// A journal record: a key-value pair plus an optional link to the previous
/// record in the same table slot's collision chain.
struct Record<K: Array, V: Codec> {
    key: K,
    value: V,
    // `(section, offset)` of the next (older) record in the chain, if any.
    next: Option<(u64, u32)>,
}
253
impl<K: Array, V: Codec> Record<K, V> {
    /// Creates a record linking to the previous chain head (if any).
    fn new(key: K, value: V, next: Option<(u64, u32)>) -> Self {
        Self { key, value, next }
    }
}
260
impl<K: Array, V: Codec> CodecWrite for Record<K, V> {
    /// Serializes key, value, then the optional chain link.
    fn write(&self, buf: &mut impl BufMut) {
        self.key.write(buf);
        self.value.write(buf);
        self.next.write(buf);
    }
}
268
269impl<K: Array, V: Codec> Read for Record<K, V> {
270 type Cfg = V::Cfg;
271 fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
272 let key = K::read(buf)?;
273 let value = V::read_cfg(buf, cfg)?;
274 let next = Option::<(u64, u32)>::read_cfg(buf, &((), ()))?;
275
276 Ok(Self { key, value, next })
277 }
278}
279
impl<K: Array, V: Codec> EncodeSize for Record<K, V> {
    /// Fixed-size key plus variable-size value plus the optional chain link.
    fn encode_size(&self) -> usize {
        K::SIZE + self.value.encode_size() + self.next.encode_size()
    }
}
285
/// An append-only key-value store backed by an on-disk hash table (for
/// lookups) and a sectioned journal (for record storage). Colliding keys are
/// chained together through `Record::next` links into the journal.
pub struct Freezer<E: Storage + Metrics + Clock, K: Array, V: Codec> {
    // Runtime context providing storage, metrics, and clock capabilities.
    context: E,

    // Partition holding the table blob.
    table_partition: String,
    // Current number of table slots (always a power of two).
    table_size: u32,
    // Number of resizable slots required before a resize starts.
    table_resize_threshold: u64,
    // Number of additions to a slot before it counts as resizable.
    table_resize_frequency: u8,
    // Number of slots migrated per `advance_resize` step.
    table_resize_chunk_size: u32,

    // Blob storing the hash table of entries.
    table: E::Blob,

    // Journal storing the actual key/value records.
    journal: Journal<E, Record<K, V>>,
    // Target size of a journal section before rolling to the next one.
    journal_target_size: u64,

    // Section new records are currently appended to.
    current_section: u64,
    // Epoch assigned to table writes until the next sync.
    next_epoch: u64,

    // Sections with unsynced appends.
    modified_sections: BTreeSet<u64>,
    // Count of slots that have reached the resize frequency.
    resizable: u32,
    // Index of the next slot to migrate while a resize is in progress.
    resize_progress: Option<u32>,

    // Metrics.
    puts: Counter,
    gets: Counter,
    unnecessary_reads: Counter,
    unnecessary_writes: Counter,
    resizes: Counter,

    _phantom: PhantomData<(K, V)>,
}
324
325impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Freezer<E, K, V> {
326 #[inline]
328 fn table_offset(table_index: u32) -> u64 {
329 table_index as u64 * Entry::FULL_SIZE as u64
330 }
331
332 fn parse_entries(buf: &[u8]) -> Result<(Entry, Entry), Error> {
334 let mut buf1 = &buf[0..Entry::SIZE];
335 let entry1 = Entry::read(&mut buf1)?;
336 let mut buf2 = &buf[Entry::SIZE..Entry::FULL_SIZE];
337 let entry2 = Entry::read(&mut buf2)?;
338 Ok((entry1, entry2))
339 }
340
    /// Reads both entries of the table slot at `table_index` from the blob.
    async fn read_table(blob: &E::Blob, table_index: u32) -> Result<(Entry, Entry), Error> {
        let offset = Self::table_offset(table_index);
        let buf = vec![0u8; Entry::FULL_SIZE];
        let read_buf = blob.read_at(buf, offset).await?;

        Self::parse_entries(read_buf.as_ref())
    }
349
    /// Validates a single table entry during recovery, zeroing it on disk if
    /// it is corrupt or from an epoch newer than `max_valid_epoch`.
    ///
    /// Returns `Ok(true)` when the entry was cleared (i.e. the blob was
    /// modified). When `max_valid_epoch` is `None`, also tracks the highest
    /// epoch observed (and its section) via `max_epoch`/`max_section`.
    async fn recover_entry(
        blob: &E::Blob,
        entry: &mut Entry,
        entry_offset: u64,
        max_valid_epoch: Option<u64>,
        max_epoch: &mut u64,
        max_section: &mut u64,
    ) -> Result<bool, Error> {
        // Empty slots need no validation.
        if entry.is_empty() {
            return Ok(false);
        }

        if !entry.is_valid()
            || (max_valid_epoch.is_some() && entry.epoch > max_valid_epoch.unwrap())
        {
            debug!(
                valid_epoch = max_valid_epoch,
                entry_epoch = entry.epoch,
                "found invalid table entry"
            );
            // Clear in memory and zero the on-disk copy so later reads see it
            // as empty.
            *entry = Entry::new(0, 0, 0, 0);
            let zero_buf = vec![0u8; Entry::SIZE];
            blob.write_at(zero_buf, entry_offset).await?;
            Ok(true)
        } else if max_valid_epoch.is_none() && entry.epoch > *max_epoch {
            // No trusted checkpoint: infer the latest committed state from the
            // highest epoch present in the table.
            *max_epoch = entry.epoch;
            *max_section = entry.section;
            Ok(false)
        } else {
            Ok(false)
        }
    }
384
    /// Scans the entire table during startup, clearing invalid or too-new
    /// entries via `recover_entry`.
    ///
    /// Returns `(modified, max_epoch, max_section, resizable)`: whether the
    /// blob was modified, the highest epoch and its section observed (only
    /// populated when `max_valid_epoch` is `None`), and how many slots already
    /// qualify for a resize.
    async fn recover_table(
        blob: &E::Blob,
        table_size: u32,
        table_resize_frequency: u8,
        max_valid_epoch: Option<u64>,
        table_replay_buffer: NonZeroUsize,
    ) -> Result<(bool, u64, u64, u32), Error> {
        // Stream the table sequentially through a read buffer.
        let blob_size = Self::table_offset(table_size);
        let mut reader = buffer::Read::new(blob.clone(), blob_size, table_replay_buffer);

        let mut modified = false;
        let mut max_epoch = 0u64;
        let mut max_section = 0u64;
        let mut resizable = 0u32;
        for table_index in 0..table_size {
            let offset = Self::table_offset(table_index);

            // Read both entries of the slot.
            let mut buf = [0u8; Entry::FULL_SIZE];
            reader.read_exact(&mut buf, Entry::FULL_SIZE).await?;
            let (mut entry1, mut entry2) = Self::parse_entries(&buf)?;

            // Validate (and possibly clear) each entry independently.
            let entry1_cleared = Self::recover_entry(
                blob,
                &mut entry1,
                offset,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            let entry2_cleared = Self::recover_entry(
                blob,
                &mut entry2,
                offset + Entry::SIZE as u64,
                max_valid_epoch,
                &mut max_epoch,
                &mut max_section,
            )
            .await?;
            modified |= entry1_cleared || entry2_cleared;

            // Count slots whose surviving head has reached the resize
            // frequency.
            if let Some((_, _, added)) = Self::read_latest_entry(&entry1, &entry2) {
                if added >= table_resize_frequency {
                    resizable += 1;
                }
            }
        }

        Ok((modified, max_epoch, max_section, resizable))
    }
447
448 fn compute_write_offset(entry1: &Entry, entry2: &Entry, epoch: u64) -> u64 {
450 if !entry1.is_empty() && entry1.epoch == epoch {
452 return 0;
453 }
454 if !entry2.is_empty() && entry2.epoch == epoch {
455 return Entry::SIZE as u64;
456 }
457
458 match (entry1.is_empty(), entry2.is_empty()) {
460 (true, _) => 0, (_, true) => Entry::SIZE as u64, (false, false) => {
463 if entry1.epoch < entry2.epoch {
464 0
465 } else {
466 Entry::SIZE as u64
467 }
468 }
469 }
470 }
471
    /// Returns `(section, offset, added)` from the newer of the two slot
    /// entries, ignoring empty or corrupt (CRC-mismatched) entries. Returns
    /// `None` when neither entry is usable.
    fn read_latest_entry(entry1: &Entry, entry2: &Entry) -> Option<(u64, u32, u8)> {
        match (
            !entry1.is_empty() && entry1.is_valid(),
            !entry2.is_empty() && entry2.is_valid(),
        ) {
            (true, true) => match entry1.epoch.cmp(&entry2.epoch) {
                Ordering::Greater => Some((entry1.section, entry1.offset, entry1.added)),
                Ordering::Less => Some((entry2.section, entry2.offset, entry2.added)),
                Ordering::Equal => {
                    // `compute_write_offset` never targets both positions with
                    // the same epoch, so this state cannot occur.
                    unreachable!("two valid entries with the same epoch")
                }
            },
            (true, false) => Some((entry1.section, entry1.offset, entry1.added)),
            (false, true) => Some((entry2.section, entry2.offset, entry2.added)),
            (false, false) => None,
        }
    }
490
    /// Writes `update` into the slot at `table_index`, choosing which of the
    /// two entry positions to overwrite from the existing pair so the newest
    /// committed entry is never destroyed.
    async fn update_head(
        table: &E::Blob,
        table_index: u32,
        entry1: &Entry,
        entry2: &Entry,
        update: Entry,
    ) -> Result<(), Error> {
        let table_offset = Self::table_offset(table_index);

        // Same-epoch, empty, or oldest position — in that order of preference.
        let start = Self::compute_write_offset(entry1, entry2, update.epoch);

        table
            .write_at(update.encode(), table_offset + start)
            .await
            .map_err(Error::Runtime)
    }
511
512 async fn init_table(blob: &E::Blob, table_size: u32) -> Result<(), Error> {
514 let table_len = Self::table_offset(table_size);
515 blob.resize(table_len).await?;
516 blob.sync().await?;
517 Ok(())
518 }
519
    /// Initializes a [Freezer] from durable storage, recovering state without
    /// a checkpoint (the latest committed state is inferred from the table).
    pub async fn init(context: E, config: Config<V::Cfg>) -> Result<Self, Error> {
        Self::init_with_checkpoint(context, config, None).await
    }
524
    /// Initializes the freezer, optionally restoring state to a previously
    /// returned [Checkpoint].
    ///
    /// With a checkpoint, the journal is rewound to the checkpointed size and
    /// any table entries newer than the checkpointed epoch are cleared.
    /// Without one, the latest committed state is inferred from the table.
    ///
    /// # Panics
    ///
    /// Panics if `table_initial_size` is not a power of two, or if a provided
    /// checkpoint is inconsistent with the on-disk state.
    pub async fn init_with_checkpoint(
        context: E,
        config: Config<V::Cfg>,
        checkpoint: Option<Checkpoint>,
    ) -> Result<Self, Error> {
        // Slot selection masks a hash (see `table_index`), which requires a
        // power-of-two table size.
        assert!(
            config.table_initial_size > 0 && config.table_initial_size.is_power_of_two(),
            "table_initial_size must be a power of 2"
        );

        // Open the journal that stores the actual records.
        let journal_config = JournalConfig {
            partition: config.journal_partition,
            compression: config.journal_compression,
            codec_config: config.codec_config,
            write_buffer: config.journal_write_buffer,
            buffer_pool: config.journal_buffer_pool,
        };
        let mut journal = Journal::init(context.clone(), journal_config).await?;

        // Open the table blob.
        let (table, table_len) = context
            .open(&config.table_partition, TABLE_BLOB_NAME)
            .await?;

        let (checkpoint, resizable) = match (table_len, checkpoint) {
            // Fresh store, no checkpoint: allocate an empty table.
            (0, None) => {
                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Fresh store with a checkpoint: only an all-zero checkpoint is
            // compatible with an empty table.
            (0, Some(checkpoint)) => {
                assert_eq!(checkpoint.epoch, 0);
                assert_eq!(checkpoint.section, 0);
                assert_eq!(checkpoint.size, 0);
                assert_eq!(checkpoint.table_size, 0);

                Self::init_table(&table, config.table_initial_size).await?;
                (Checkpoint::init(config.table_initial_size), 0)
            }

            // Existing store with a checkpoint: roll everything back to the
            // checkpointed state.
            (_, Some(checkpoint)) => {
                assert!(
                    checkpoint.table_size > 0 && checkpoint.table_size.is_power_of_two(),
                    "table_size must be a power of 2"
                );

                // Discard journal data written after the checkpoint.
                journal.rewind(checkpoint.section, checkpoint.size).await?;
                journal.sync(checkpoint.section).await?;

                // Undo any partially applied table resize by restoring the
                // checkpointed table length.
                let expected_table_len = Self::table_offset(checkpoint.table_size);
                let mut modified = if table_len != expected_table_len {
                    table.resize(expected_table_len).await?;
                    true
                } else {
                    false
                };

                // Clear table entries written after the checkpointed epoch.
                let (table_modified, _, _, resizable) = Self::recover_table(
                    &table,
                    checkpoint.table_size,
                    config.table_resize_frequency,
                    Some(checkpoint.epoch),
                    config.table_replay_buffer,
                )
                .await?;
                if table_modified {
                    modified = true;
                }

                if modified {
                    table.sync().await?;
                }

                (checkpoint, resizable)
            }

            // Existing store, no checkpoint: recover the latest committed
            // state from the table contents themselves.
            (_, None) => {
                let table_size = (table_len / Entry::FULL_SIZE as u64) as u32;
                let (modified, max_epoch, max_section, resizable) = Self::recover_table(
                    &table,
                    table_size,
                    config.table_resize_frequency,
                    None,
                    config.table_replay_buffer,
                )
                .await?;

                if modified {
                    table.sync().await?;
                }

                (
                    Checkpoint {
                        epoch: max_epoch,
                        section: max_section,
                        size: journal.size(max_section).await?,
                        table_size,
                    },
                    resizable,
                )
            }
        };

        // Register metrics with the runtime context.
        let puts = Counter::default();
        let gets = Counter::default();
        let unnecessary_reads = Counter::default();
        let unnecessary_writes = Counter::default();
        let resizes = Counter::default();
        context.register("puts", "number of put operations", puts.clone());
        context.register("gets", "number of get operations", gets.clone());
        context.register(
            "unnecessary_reads",
            "number of unnecessary reads performed during key lookups",
            unnecessary_reads.clone(),
        );
        context.register(
            "unnecessary_writes",
            "number of unnecessary writes performed during resize",
            unnecessary_writes.clone(),
        );
        context.register(
            "resizes",
            "number of table resizing operations",
            resizes.clone(),
        );

        Ok(Self {
            context,
            table_partition: config.table_partition,
            table_size: checkpoint.table_size,
            table_resize_threshold: checkpoint.table_size as u64 * RESIZE_THRESHOLD / 100,
            table_resize_frequency: config.table_resize_frequency,
            table_resize_chunk_size: config.table_resize_chunk_size,
            table,
            journal,
            journal_target_size: config.journal_target_size,
            current_section: checkpoint.section,
            // Writes after recovery must be distinguishable from checkpointed
            // state, so the epoch advances past the checkpoint's.
            next_epoch: checkpoint.epoch.checked_add(1).expect("epoch overflow"),
            modified_sections: BTreeSet::new(),
            resizable,
            resize_progress: None,
            puts,
            gets,
            unnecessary_reads,
            unnecessary_writes,
            resizes,
            _phantom: PhantomData,
        })
    }
690
691 fn table_index(&self, key: &K) -> u32 {
703 let hash = crc32fast::hash(key.as_ref());
704 hash & (self.table_size - 1)
705 }
706
    /// Whether enough slots have reached the resize frequency to justify
    /// doubling the table.
    fn should_resize(&self) -> bool {
        self.resizable as u64 >= self.table_resize_threshold
    }
711
    /// Advances to a new journal section once the current one has reached the
    /// target size, bounding how much data any single section holds.
    async fn update_section(&mut self) -> Result<(), Error> {
        let size = self.journal.size(self.current_section).await?;

        if size >= self.journal_target_size {
            self.current_section += 1;
            debug!(size, section = self.current_section, "updated section");
        }

        Ok(())
    }
725
    /// Appends a key-value pair and links it into the table, returning a
    /// [Cursor] for direct retrieval later.
    pub async fn put(&mut self, key: K, value: V) -> Result<Cursor, Error> {
        self.puts.inc();

        // Roll to the next section if the current one reached its target size.
        self.update_section().await?;

        // Read the current head of this key's slot from the table.
        let table_index = self.table_index(&key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let head = Self::read_latest_entry(&entry1, &entry2);

        // Append the record, linking it to the previous chain head (if any).
        let entry = Record::new(
            key,
            value,
            head.map(|(section, offset, _)| (section, offset)),
        );

        let (offset, _) = self.journal.append(self.current_section, entry).await?;

        // Bump the slot's addition counter (saturating so it never wraps).
        let mut added = head.map(|(_, _, added)| added).unwrap_or(0);
        added = added.saturating_add(1);

        // The slot just crossed the resize frequency: count it as resizable.
        if added == self.table_resize_frequency {
            self.resizable += 1;
        }

        // Point the table slot at the new chain head.
        self.modified_sections.insert(self.current_section);
        let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
        Self::update_head(&self.table, table_index, &entry1, &entry2, new_entry).await?;

        // If a resize is in progress and this slot was already migrated, the
        // write must also be mirrored to the slot's copy in the upper half of
        // the doubled table.
        if let Some(resize_progress) = self.resize_progress {
            if table_index < resize_progress {
                self.unnecessary_writes.inc();

                // The mirrored copy crosses the frequency threshold as well.
                if added == self.table_resize_frequency {
                    self.resizable += 1;
                }

                // NOTE(review): the write offset for the mirrored slot is
                // computed from the OLD slot's entry pair — this relies on
                // `advance_resize` having written identical pairs to both
                // halves; confirm against that invariant.
                let new_table_index = self.table_size + table_index;
                let new_entry = Entry::new(self.next_epoch, self.current_section, offset, added);
                Self::update_head(&self.table, new_table_index, &entry1, &entry2, new_entry)
                    .await?;
            }
        }

        Ok(Cursor::new(self.current_section, offset))
    }
786
787 async fn get_cursor(&self, cursor: Cursor) -> Result<Option<V>, Error> {
789 let entry = self.journal.get(cursor.section(), cursor.offset()).await?;
790 let Some(entry) = entry else {
791 return Ok(None);
792 };
793
794 Ok(Some(entry.value))
795 }
796
    /// Looks up a key by walking its collision chain from the table head back
    /// through the journal.
    async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Locate the head of the chain for this key's slot.
        let table_index = self.table_index(key);
        let (entry1, entry2) = Self::read_table(&self.table, table_index).await?;
        let Some((mut section, mut offset, _)) = Self::read_latest_entry(&entry1, &entry2) else {
            return Ok(None);
        };

        // Walk the chain until the key matches or the chain ends.
        loop {
            let entry = match self.journal.get(section, offset).await? {
                Some(entry) => entry,
                // Table entries always point at live journal records.
                None => unreachable!("missing entry"),
            };

            if entry.key.as_ref() == key.as_ref() {
                return Ok(Some(entry.value));
            }

            // A key mismatch means a slot collision: count it and follow the
            // link to the next (older) record.
            self.unnecessary_reads.inc();

            let Some(next) = entry.next else {
                break;
            };
            section = next.0;
            offset = next.1;
        }

        Ok(None)
    }
834
    /// Retrieves a value either directly by [Cursor] or by key lookup through
    /// the table.
    pub async fn get<'a>(&'a self, identifier: Identifier<'a, K>) -> Result<Option<V>, Error> {
        match identifier {
            Identifier::Cursor(cursor) => self.get_cursor(cursor).await,
            Identifier::Key(key) => self.get_key(key).await,
        }
    }
845
    /// Begins doubling the table: extends the blob to twice its size and marks
    /// the resize as in progress. Slots are then migrated incrementally by
    /// `advance_resize`.
    async fn start_resize(&mut self) -> Result<(), Error> {
        self.resizes.inc();

        // NOTE(review): the counter above is incremented even when the
        // doubling below would overflow and no resize actually starts —
        // confirm this is intended.
        let old_size = self.table_size;
        let Some(new_size) = old_size.checked_mul(2) else {
            return Ok(());
        };
        self.table.resize(Self::table_offset(new_size)).await?;

        // Migration starts at slot 0; the logical table size stays `old_size`
        // until migration completes (see `advance_resize`).
        self.resize_progress = Some(0);
        debug!(old = old_size, new = new_size, "table resize started");

        Ok(())
    }
863
    /// Serializes the slot pair into `buf` with `new_entry` placed in the
    /// position `compute_write_offset` selects for its epoch, preserving the
    /// other entry unchanged.
    fn rewrite_entries(buf: &mut Vec<u8>, entry1: &Entry, entry2: &Entry, new_entry: &Entry) {
        if Self::compute_write_offset(entry1, entry2, new_entry.epoch) == 0 {
            buf.extend_from_slice(&new_entry.encode());
            buf.extend_from_slice(&entry2.encode());
        } else {
            buf.extend_from_slice(&entry1.encode());
            buf.extend_from_slice(&new_entry.encode());
        }
    }
874
    /// Migrates one chunk of slots for an in-progress resize: each slot's head
    /// is rewritten with a reset `added` counter and mirrored into the upper
    /// half of the doubled table.
    ///
    /// # Panics
    ///
    /// Panics if no resize is in progress.
    async fn advance_resize(&mut self) -> Result<(), Error> {
        let current_index = self.resize_progress.unwrap();
        let old_size = self.table_size;
        let chunk_end = (current_index + self.table_resize_chunk_size).min(old_size);
        let chunk_size = chunk_end - current_index;

        // Read the whole chunk in a single blob access.
        let chunk_bytes = chunk_size as usize * Entry::FULL_SIZE;
        let read_offset = Self::table_offset(current_index);
        let read_buf = vec![0u8; chunk_bytes];
        let read_buf: Vec<u8> = self.table.read_at(read_buf, read_offset).await?.into();

        let mut writes = Vec::with_capacity(chunk_bytes);
        for i in 0..chunk_size {
            let entry_offset = i as usize * Entry::FULL_SIZE;
            let entry_end = entry_offset + Entry::FULL_SIZE;
            let entry_buf = &read_buf[entry_offset..entry_end];

            let (entry1, entry2) = Self::parse_entries(entry_buf)?;

            // Carry the latest head forward (or an empty placeholder).
            let (section, offset, added) =
                Self::read_latest_entry(&entry1, &entry2).unwrap_or((0, 0, 0));

            // The slot's counter is reset below, so it no longer contributes
            // to the resizable total.
            if added >= self.table_resize_frequency {
                self.resizable -= 1;
            }

            let reset_entry = Entry::new(self.next_epoch, section, offset, 0);
            Self::rewrite_entries(&mut writes, &entry1, &entry2, &reset_entry);
        }

        // Write the migrated chunk to both the original slots and their
        // mirrors in the upper half, concurrently.
        let old_write = self.table.write_at(writes.clone(), read_offset);
        let new_offset = (old_size as usize * Entry::FULL_SIZE) as u64 + read_offset;
        let new_write = self.table.write_at(writes, new_offset);
        try_join(old_write, new_write).await?;

        if chunk_end >= old_size {
            // All slots migrated: adopt the doubled size and recompute the
            // resize threshold against it.
            self.table_size = old_size * 2;
            self.table_resize_threshold = self.table_size as u64 * RESIZE_THRESHOLD / 100;
            self.resize_progress = None;
            debug!(
                old = old_size,
                new = self.table_size,
                "table resize completed"
            );
        } else {
            self.resize_progress = Some(chunk_end);
            debug!(current = current_index, chunk_end, "table resize progress");
        }

        Ok(())
    }
943
    /// Durably persists all pending state and returns a [Checkpoint] that can
    /// later restore the freezer to exactly this point.
    pub async fn sync(&mut self) -> Result<Checkpoint, Error> {
        // Sync all journal sections with unsynced appends, concurrently.
        let mut updates = Vec::with_capacity(self.modified_sections.len());
        for section in &self.modified_sections {
            updates.push(self.journal.sync(*section));
        }
        try_join_all(updates).await?;
        self.modified_sections.clear();

        // Kick off a resize if enough slots have accumulated additions.
        if self.should_resize() && self.resize_progress.is_none() {
            self.start_resize().await?;
        }

        // Make incremental progress on any in-flight resize.
        if self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        // Persist the table, then advance the epoch so future writes are
        // distinguishable from this checkpointed state.
        self.table.sync().await?;
        let stored_epoch = self.next_epoch;
        self.next_epoch = self.next_epoch.checked_add(1).expect("epoch overflow");

        Ok(Checkpoint {
            epoch: stored_epoch,
            section: self.current_section,
            size: self.journal.size(self.current_section).await?,
            table_size: self.table_size,
        })
    }
982
    /// Completes any in-progress resize, syncs all state, and shuts down the
    /// underlying journal and table, returning the final [Checkpoint].
    pub async fn close(mut self) -> Result<Checkpoint, Error> {
        // Finish migrating so the stored table size is consistent on restart.
        while self.resize_progress.is_some() {
            self.advance_resize().await?;
        }

        let checkpoint = self.sync().await?;

        self.journal.close().await?;
        self.table.sync().await?;
        Ok(checkpoint)
    }
997
    /// Permanently removes all data owned by this freezer (journal, table
    /// blob, and the table partition itself).
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await?;

        // Release the open handle before removing the underlying blob.
        drop(self.table);
        self.context
            .remove(&self.table_partition, Some(TABLE_BLOB_NAME))
            .await?;
        self.context.remove(&self.table_partition, None).await?;

        Ok(())
    }
1012
    /// Test-only: returns the index of the next slot to migrate if a resize is
    /// currently in progress.
    #[cfg(test)]
    pub fn resizing(&self) -> Option<u32> {
        self.resize_progress
    }
1020
    /// Test-only: returns the number of slots currently counted as resizable.
    #[cfg(test)]
    pub fn resizable(&self) -> u32 {
        self.resizable
    }
1026}