1use super::{
44 fixed::{Config as FixedConfig, Journal as FixedJournal},
45 glob::{Config as GlobConfig, Glob},
46};
47use crate::journal::Error;
48use commonware_codec::{Codec, CodecFixed, CodecShared};
49use commonware_runtime::{Metrics, Storage};
50use futures::{future::try_join, stream::Stream};
51use std::{collections::HashSet, num::NonZeroUsize};
52use tracing::{debug, warn};
53
54pub trait Record: CodecFixed<Cfg = ()> + Clone {
59 fn value_location(&self) -> (u64, u32);
61
62 fn with_location(self, offset: u64, size: u32) -> Self;
66}
67
68#[derive(Clone)]
70pub struct Config<C> {
71 pub index_partition: String,
73
74 pub value_partition: String,
76
77 pub index_page_cache: commonware_runtime::buffer::paged::CacheRef,
79
80 pub index_write_buffer: NonZeroUsize,
82
83 pub value_write_buffer: NonZeroUsize,
85
86 pub compression: Option<u8>,
88
89 pub codec_config: C,
91}
92
93pub struct Oversized<E: Storage + Metrics, I: Record, V: Codec> {
98 index: FixedJournal<E, I>,
99 values: Glob<E, V>,
100}
101
102impl<E: Storage + Metrics, I: Record + Send + Sync, V: CodecShared> Oversized<E, I, V> {
103 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109 let index_cfg = FixedConfig {
111 partition: cfg.index_partition,
112 page_cache: cfg.index_page_cache,
113 write_buffer: cfg.index_write_buffer,
114 };
115 let index = FixedJournal::init(context.with_label("index"), index_cfg).await?;
116
117 let value_cfg = GlobConfig {
118 partition: cfg.value_partition,
119 compression: cfg.compression,
120 codec_config: cfg.codec_config,
121 write_buffer: cfg.value_write_buffer,
122 };
123 let values = Glob::init(context.with_label("values"), value_cfg).await?;
124
125 let mut oversized = Self { index, values };
126
127 oversized.recover().await?;
129
130 Ok(oversized)
131 }
132
133 async fn recover(&mut self) -> Result<(), Error> {
139 let chunk_size = FixedJournal::<E, I>::CHUNK_SIZE as u64;
140 let sections: Vec<u64> = self.index.sections().collect();
141
142 for section in sections {
143 let index_size = self.index.size(section).await?;
144 if index_size == 0 {
145 continue;
146 }
147
148 let glob_size = match self.values.size(section).await {
149 Ok(size) => size,
150 Err(Error::AlreadyPrunedToSection(oldest)) => {
151 warn!(
156 section,
157 oldest, "index has section that glob already pruned"
158 );
159 0
160 }
161 Err(e) => return Err(e),
162 };
163
164 let entry_count = index_size / chunk_size;
166 let aligned_size = entry_count * chunk_size;
167 if aligned_size < index_size {
168 warn!(
169 section,
170 index_size, aligned_size, "trailing bytes detected: truncating"
171 );
172 self.index.rewind_section(section, aligned_size).await?;
173 }
174
175 if entry_count == 0 {
177 warn!(
178 section,
179 index_size, "trailing bytes detected: truncating to 0"
180 );
181 self.values.rewind_section(section, 0).await?;
182 continue;
183 }
184
185 let (valid_count, glob_target) = self
187 .find_last_valid_entry(section, entry_count, glob_size)
188 .await;
189
190 if valid_count < entry_count {
192 let valid_size = valid_count * chunk_size;
193 debug!(section, entry_count, valid_count, "rewinding index");
194 self.index.rewind_section(section, valid_size).await?;
195 }
196
197 if glob_size > glob_target {
200 debug!(
201 section,
202 glob_size, glob_target, "truncating glob trailing garbage"
203 );
204 self.values.rewind_section(section, glob_target).await?;
205 }
206 }
207
208 self.cleanup_orphan_value_sections().await?;
210
211 Ok(())
212 }
213
214 async fn cleanup_orphan_value_sections(&mut self) -> Result<(), Error> {
221 let index_sections: HashSet<u64> = self.index.sections().collect();
223
224 let orphan_sections: Vec<u64> = self
226 .values
227 .sections()
228 .filter(|s| !index_sections.contains(s))
229 .collect();
230
231 for section in orphan_sections {
233 warn!(section, "removing orphan value section");
234 self.values.remove_section(section).await?;
235 }
236
237 Ok(())
238 }
239
240 async fn find_last_valid_entry(
246 &self,
247 section: u64,
248 entry_count: u64,
249 glob_size: u64,
250 ) -> (u64, u64) {
251 for pos in (0..entry_count).rev() {
252 match self.index.get(section, pos).await {
253 Ok(entry) => {
254 let (offset, size) = entry.value_location();
255 let entry_end = offset.saturating_add(u64::from(size));
256 if entry_end <= glob_size {
257 return (pos + 1, entry_end);
258 }
259 if pos == entry_count - 1 {
260 warn!(
261 section,
262 pos, glob_size, entry_end, "invalid entry: glob truncated"
263 );
264 }
265 }
266 Err(_) => {
267 if pos == entry_count - 1 {
268 warn!(section, pos, "corrupted last entry, scanning backwards");
269 }
270 }
271 }
272 }
273 (0, 0)
274 }
275
276 pub async fn append(
285 &mut self,
286 section: u64,
287 entry: I,
288 value: &V,
289 ) -> Result<(u64, u64, u32), Error> {
290 let (offset, size) = self.values.append(section, value).await?;
293
294 let entry_with_location = entry.with_location(offset, size);
296 let position = self.index.append(section, entry_with_location).await?;
297
298 Ok((position, offset, size))
299 }
300
301 pub async fn get(&self, section: u64, position: u64) -> Result<I, Error> {
303 self.index.get(section, position).await
304 }
305
306 pub async fn last(&self, section: u64) -> Result<Option<I>, Error> {
308 self.index.last(section).await
309 }
310
311 pub async fn get_value(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
315 self.values.get(section, offset, size).await
316 }
317
318 pub async fn replay(
322 &self,
323 start_section: u64,
324 start_position: u64,
325 buffer: NonZeroUsize,
326 ) -> Result<impl Stream<Item = Result<(u64, u64, I), Error>> + Send + '_, Error> {
327 self.index
328 .replay(start_section, start_position, buffer)
329 .await
330 }
331
332 pub async fn sync(&self, section: u64) -> Result<(), Error> {
334 try_join(self.index.sync(section), self.values.sync(section))
335 .await
336 .map(|_| ())
337 }
338
339 pub async fn sync_all(&self) -> Result<(), Error> {
341 try_join(self.index.sync_all(), self.values.sync_all())
342 .await
343 .map(|_| ())
344 }
345
346 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
352 let index_pruned = self.index.prune(min).await?;
353 let value_pruned = self.values.prune(min).await?;
354 Ok(index_pruned || value_pruned)
355 }
356
357 pub async fn rewind(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
362 self.index.rewind(section, index_size).await?;
364
365 let value_size = match self.index.last(section).await? {
367 Some(entry) => {
368 let (offset, size) = entry.value_location();
369 offset
370 .checked_add(u64::from(size))
371 .ok_or(Error::OffsetOverflow)?
372 }
373 None => 0,
374 };
375
376 self.values.rewind(section, value_size).await
378 }
379
380 pub async fn rewind_section(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
385 self.index.rewind_section(section, index_size).await?;
387
388 let value_size = match self.index.last(section).await? {
390 Some(entry) => {
391 let (offset, size) = entry.value_location();
392 offset
393 .checked_add(u64::from(size))
394 .ok_or(Error::OffsetOverflow)?
395 }
396 None => 0,
397 };
398
399 self.values.rewind_section(section, value_size).await
401 }
402
403 pub async fn size(&self, section: u64) -> Result<u64, Error> {
407 self.index.size(section).await
408 }
409
410 pub async fn value_size(&self, section: u64) -> Result<u64, Error> {
412 match self.index.last(section).await {
413 Ok(Some(entry)) => {
414 let (offset, size) = entry.value_location();
415 offset
416 .checked_add(u64::from(size))
417 .ok_or(Error::OffsetOverflow)
418 }
419 Ok(None) => Ok(0),
420 Err(Error::SectionOutOfRange(_)) => Ok(0),
421 Err(e) => Err(e),
422 }
423 }
424
425 pub fn oldest_section(&self) -> Option<u64> {
427 self.index.oldest_section()
428 }
429
430 pub fn newest_section(&self) -> Option<u64> {
432 self.index.newest_section()
433 }
434
435 pub async fn destroy(self) -> Result<(), Error> {
437 try_join(self.index.destroy(), self.values.destroy())
438 .await
439 .map(|_| ())
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446 use commonware_codec::{FixedSize, Read, ReadExt, Write};
447 use commonware_cryptography::Crc32;
448 use commonware_macros::test_traced;
449 use commonware_runtime::{
450 buffer::paged::CacheRef, deterministic, Blob as _, Buf, BufMut, Metrics, Runner,
451 };
452 use commonware_utils::{NZUsize, NZU16};
453
454 fn byte_end(offset: u64, size: u32) -> u64 {
456 offset + u64::from(size)
457 }
458
459 #[derive(Debug, Clone, PartialEq)]
461 struct TestEntry {
462 id: u64,
463 value_offset: u64,
464 value_size: u32,
465 }
466
467 impl TestEntry {
468 fn new(id: u64, value_offset: u64, value_size: u32) -> Self {
469 Self {
470 id,
471 value_offset,
472 value_size,
473 }
474 }
475 }
476
477 impl Write for TestEntry {
478 fn write(&self, buf: &mut impl BufMut) {
479 self.id.write(buf);
480 self.value_offset.write(buf);
481 self.value_size.write(buf);
482 }
483 }
484
485 impl Read for TestEntry {
486 type Cfg = ();
487
488 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
489 let id = u64::read(buf)?;
490 let value_offset = u64::read(buf)?;
491 let value_size = u32::read(buf)?;
492 Ok(Self {
493 id,
494 value_offset,
495 value_size,
496 })
497 }
498 }
499
500 impl FixedSize for TestEntry {
501 const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
502 }
503
504 impl Record for TestEntry {
505 fn value_location(&self) -> (u64, u32) {
506 (self.value_offset, self.value_size)
507 }
508
509 fn with_location(mut self, offset: u64, size: u32) -> Self {
510 self.value_offset = offset;
511 self.value_size = size;
512 self
513 }
514 }
515
516 fn test_cfg() -> Config<()> {
517 Config {
518 index_partition: "test_index".to_string(),
519 value_partition: "test_values".to_string(),
520 index_page_cache: CacheRef::new(NZU16!(64), NZUsize!(8)),
521 index_write_buffer: NZUsize!(1024),
522 value_write_buffer: NZUsize!(1024),
523 compression: None,
524 codec_config: (),
525 }
526 }
527
528 type TestValue = [u8; 16];
530
531 #[test_traced]
532 fn test_oversized_append_and_get() {
533 let executor = deterministic::Runner::default();
534 executor.start(|context| async move {
535 let mut oversized: Oversized<_, TestEntry, TestValue> =
536 Oversized::init(context.clone(), test_cfg())
537 .await
538 .expect("Failed to init");
539
540 let value: TestValue = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
542 let entry = TestEntry::new(42, 0, 0);
543 let (position, offset, size) = oversized
544 .append(1, entry, &value)
545 .await
546 .expect("Failed to append");
547
548 assert_eq!(position, 0);
549
550 let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
552 assert_eq!(retrieved_entry.id, 42);
553
554 let retrieved_value = oversized
556 .get_value(1, offset, size)
557 .await
558 .expect("Failed to get value");
559 assert_eq!(retrieved_value, value);
560
561 oversized.destroy().await.expect("Failed to destroy");
562 });
563 }
564
565 #[test_traced]
566 fn test_oversized_crash_recovery() {
567 let executor = deterministic::Runner::default();
568 executor.start(|context| async move {
569 let cfg = test_cfg();
570
571 let mut oversized: Oversized<_, TestEntry, TestValue> =
573 Oversized::init(context.with_label("first"), cfg.clone())
574 .await
575 .expect("Failed to init");
576
577 let mut locations = Vec::new();
579 for i in 0..5u8 {
580 let value: TestValue = [i; 16];
581 let entry = TestEntry::new(i as u64, 0, 0);
582 let (position, offset, size) = oversized
583 .append(1, entry, &value)
584 .await
585 .expect("Failed to append");
586 locations.push((position, offset, size));
587 }
588 oversized.sync(1).await.expect("Failed to sync");
589 drop(oversized);
590
591 let (blob, _) = context
593 .open(&cfg.value_partition, &1u64.to_be_bytes())
594 .await
595 .expect("Failed to open blob");
596
597 let keep_size = byte_end(locations[2].1, locations[2].2);
599 blob.resize(keep_size).await.expect("Failed to truncate");
600 blob.sync().await.expect("Failed to sync");
601 drop(blob);
602
603 let oversized: Oversized<_, TestEntry, TestValue> =
605 Oversized::init(context.with_label("second"), cfg.clone())
606 .await
607 .expect("Failed to reinit");
608
609 for i in 0..3u8 {
611 let (position, offset, size) = locations[i as usize];
612 let entry = oversized.get(1, position).await.expect("Failed to get");
613 assert_eq!(entry.id, i as u64);
614
615 let value = oversized
616 .get_value(1, offset, size)
617 .await
618 .expect("Failed to get value");
619 assert_eq!(value, [i; 16]);
620 }
621
622 let result = oversized.get(1, 3).await;
624 assert!(result.is_err());
625
626 oversized.destroy().await.expect("Failed to destroy");
627 });
628 }
629
630 #[test_traced]
631 fn test_oversized_persistence() {
632 let executor = deterministic::Runner::default();
633 executor.start(|context| async move {
634 let cfg = test_cfg();
635
636 let mut oversized: Oversized<_, TestEntry, TestValue> =
638 Oversized::init(context.with_label("first"), cfg.clone())
639 .await
640 .expect("Failed to init");
641
642 let value: TestValue = [42; 16];
643 let entry = TestEntry::new(123, 0, 0);
644 let (position, offset, size) = oversized
645 .append(1, entry, &value)
646 .await
647 .expect("Failed to append");
648 oversized.sync(1).await.expect("Failed to sync");
649 drop(oversized);
650
651 let oversized: Oversized<_, TestEntry, TestValue> =
653 Oversized::init(context.with_label("second"), cfg)
654 .await
655 .expect("Failed to reinit");
656
657 let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
658 assert_eq!(retrieved_entry.id, 123);
659
660 let retrieved_value = oversized
661 .get_value(1, offset, size)
662 .await
663 .expect("Failed to get value");
664 assert_eq!(retrieved_value, value);
665
666 oversized.destroy().await.expect("Failed to destroy");
667 });
668 }
669
670 #[test_traced]
671 fn test_oversized_prune() {
672 let executor = deterministic::Runner::default();
673 executor.start(|context| async move {
674 let mut oversized: Oversized<_, TestEntry, TestValue> =
675 Oversized::init(context.clone(), test_cfg())
676 .await
677 .expect("Failed to init");
678
679 for section in 1u64..=5 {
681 let value: TestValue = [section as u8; 16];
682 let entry = TestEntry::new(section, 0, 0);
683 oversized
684 .append(section, entry, &value)
685 .await
686 .expect("Failed to append");
687 oversized.sync(section).await.expect("Failed to sync");
688 }
689
690 oversized.prune(3).await.expect("Failed to prune");
692
693 assert!(oversized.get(1, 0).await.is_err());
695 assert!(oversized.get(2, 0).await.is_err());
696
697 assert!(oversized.get(3, 0).await.is_ok());
699 assert!(oversized.get(4, 0).await.is_ok());
700 assert!(oversized.get(5, 0).await.is_ok());
701
702 oversized.destroy().await.expect("Failed to destroy");
703 });
704 }
705
706 #[test_traced]
707 fn test_recovery_empty_section() {
708 let executor = deterministic::Runner::default();
709 executor.start(|context| async move {
710 let cfg = test_cfg();
711
712 let mut oversized: Oversized<_, TestEntry, TestValue> =
714 Oversized::init(context.with_label("first"), cfg.clone())
715 .await
716 .expect("Failed to init");
717
718 let value: TestValue = [42; 16];
720 let entry = TestEntry::new(1, 0, 0);
721 oversized
722 .append(2, entry, &value)
723 .await
724 .expect("Failed to append");
725 oversized.sync(2).await.expect("Failed to sync");
726 drop(oversized);
727
728 let oversized: Oversized<_, TestEntry, TestValue> =
730 Oversized::init(context.with_label("second"), cfg)
731 .await
732 .expect("Failed to reinit");
733
734 let entry = oversized.get(2, 0).await.expect("Failed to get");
736 assert_eq!(entry.id, 1);
737
738 oversized.destroy().await.expect("Failed to destroy");
739 });
740 }
741
742 #[test_traced]
743 fn test_recovery_all_entries_invalid() {
744 let executor = deterministic::Runner::default();
745 executor.start(|context| async move {
746 let cfg = test_cfg();
747
748 let mut oversized: Oversized<_, TestEntry, TestValue> =
750 Oversized::init(context.with_label("first"), cfg.clone())
751 .await
752 .expect("Failed to init");
753
754 for i in 0..5u8 {
756 let value: TestValue = [i; 16];
757 let entry = TestEntry::new(i as u64, 0, 0);
758 oversized
759 .append(1, entry, &value)
760 .await
761 .expect("Failed to append");
762 }
763 oversized.sync(1).await.expect("Failed to sync");
764 drop(oversized);
765
766 let (blob, _) = context
768 .open(&cfg.value_partition, &1u64.to_be_bytes())
769 .await
770 .expect("Failed to open blob");
771 blob.resize(0).await.expect("Failed to truncate");
772 blob.sync().await.expect("Failed to sync");
773 drop(blob);
774
775 let mut oversized: Oversized<_, TestEntry, TestValue> =
777 Oversized::init(context.with_label("second"), cfg)
778 .await
779 .expect("Failed to reinit");
780
781 let result = oversized.get(1, 0).await;
783 assert!(result.is_err());
784
785 let value: TestValue = [99; 16];
787 let entry = TestEntry::new(100, 0, 0);
788 let (pos, offset, size) = oversized
789 .append(1, entry, &value)
790 .await
791 .expect("Failed to append after recovery");
792 assert_eq!(pos, 0);
793
794 let retrieved = oversized.get(1, 0).await.expect("Failed to get");
795 assert_eq!(retrieved.id, 100);
796 let retrieved_value = oversized
797 .get_value(1, offset, size)
798 .await
799 .expect("Failed to get value");
800 assert_eq!(retrieved_value, value);
801
802 oversized.destroy().await.expect("Failed to destroy");
803 });
804 }
805
806 #[test_traced]
807 fn test_recovery_multiple_sections_mixed_validity() {
808 let executor = deterministic::Runner::default();
809 executor.start(|context| async move {
810 let cfg = test_cfg();
811
812 let mut oversized: Oversized<_, TestEntry, TestValue> =
814 Oversized::init(context.with_label("first"), cfg.clone())
815 .await
816 .expect("Failed to init");
817
818 let mut section1_locations = Vec::new();
820 for i in 0..3u8 {
821 let value: TestValue = [i; 16];
822 let entry = TestEntry::new(i as u64, 0, 0);
823 let loc = oversized
824 .append(1, entry, &value)
825 .await
826 .expect("Failed to append");
827 section1_locations.push(loc);
828 }
829 oversized.sync(1).await.expect("Failed to sync");
830
831 let mut section2_locations = Vec::new();
833 for i in 0..5u8 {
834 let value: TestValue = [10 + i; 16];
835 let entry = TestEntry::new(10 + i as u64, 0, 0);
836 let loc = oversized
837 .append(2, entry, &value)
838 .await
839 .expect("Failed to append");
840 section2_locations.push(loc);
841 }
842 oversized.sync(2).await.expect("Failed to sync");
843
844 for i in 0..2u8 {
846 let value: TestValue = [20 + i; 16];
847 let entry = TestEntry::new(20 + i as u64, 0, 0);
848 oversized
849 .append(3, entry, &value)
850 .await
851 .expect("Failed to append");
852 }
853 oversized.sync(3).await.expect("Failed to sync");
854 drop(oversized);
855
856 let (blob, _) = context
858 .open(&cfg.value_partition, &1u64.to_be_bytes())
859 .await
860 .expect("Failed to open blob");
861 let keep_size = byte_end(section1_locations[0].1, section1_locations[0].2);
862 blob.resize(keep_size).await.expect("Failed to truncate");
863 blob.sync().await.expect("Failed to sync");
864 drop(blob);
865
866 let (blob, _) = context
868 .open(&cfg.value_partition, &2u64.to_be_bytes())
869 .await
870 .expect("Failed to open blob");
871 let keep_size = byte_end(section2_locations[2].1, section2_locations[2].2);
872 blob.resize(keep_size).await.expect("Failed to truncate");
873 blob.sync().await.expect("Failed to sync");
874 drop(blob);
875
876 let oversized: Oversized<_, TestEntry, TestValue> =
880 Oversized::init(context.with_label("second"), cfg)
881 .await
882 .expect("Failed to reinit");
883
884 assert!(oversized.get(1, 0).await.is_ok());
886 assert!(oversized.get(1, 1).await.is_err());
887 assert!(oversized.get(1, 2).await.is_err());
888
889 assert!(oversized.get(2, 0).await.is_ok());
891 assert!(oversized.get(2, 1).await.is_ok());
892 assert!(oversized.get(2, 2).await.is_ok());
893 assert!(oversized.get(2, 3).await.is_err());
894 assert!(oversized.get(2, 4).await.is_err());
895
896 assert!(oversized.get(3, 0).await.is_ok());
898 assert!(oversized.get(3, 1).await.is_ok());
899
900 oversized.destroy().await.expect("Failed to destroy");
901 });
902 }
903
904 #[test_traced]
905 fn test_recovery_corrupted_last_index_entry() {
906 let executor = deterministic::Runner::default();
907 executor.start(|context| async move {
908 let cfg = Config {
912 index_partition: "test_index".to_string(),
913 value_partition: "test_values".to_string(),
914 index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
915 index_write_buffer: NZUsize!(1024),
916 value_write_buffer: NZUsize!(1024),
917 compression: None,
918 codec_config: (),
919 };
920
921 let mut oversized: Oversized<_, TestEntry, TestValue> =
923 Oversized::init(context.with_label("first"), cfg.clone())
924 .await
925 .expect("Failed to init");
926
927 for i in 0..5u8 {
929 let value: TestValue = [i; 16];
930 let entry = TestEntry::new(i as u64, 0, 0);
931 oversized
932 .append(1, entry, &value)
933 .await
934 .expect("Failed to append");
935 }
936 oversized.sync(1).await.expect("Failed to sync");
937 drop(oversized);
938
939 let (blob, size) = context
941 .open(&cfg.index_partition, &1u64.to_be_bytes())
942 .await
943 .expect("Failed to open blob");
944
945 assert_eq!(size, 160);
949 let last_page_crc_offset = size - 12;
950 blob.write_at(last_page_crc_offset, vec![0xFF; 12])
951 .await
952 .expect("Failed to corrupt");
953 blob.sync().await.expect("Failed to sync");
954 drop(blob);
955
956 let mut oversized: Oversized<_, TestEntry, TestValue> =
958 Oversized::init(context.with_label("second"), cfg)
959 .await
960 .expect("Failed to reinit");
961
962 for i in 0..4u8 {
964 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
965 assert_eq!(entry.id, i as u64);
966 }
967
968 assert!(oversized.get(1, 4).await.is_err());
970
971 let value: TestValue = [99; 16];
973 let entry = TestEntry::new(100, 0, 0);
974 let (pos, offset, size) = oversized
975 .append(1, entry, &value)
976 .await
977 .expect("Failed to append after recovery");
978 assert_eq!(pos, 4);
979
980 let retrieved = oversized.get(1, 4).await.expect("Failed to get");
981 assert_eq!(retrieved.id, 100);
982 let retrieved_value = oversized
983 .get_value(1, offset, size)
984 .await
985 .expect("Failed to get value");
986 assert_eq!(retrieved_value, value);
987
988 oversized.destroy().await.expect("Failed to destroy");
989 });
990 }
991
992 #[test_traced]
993 fn test_recovery_all_entries_valid() {
994 let executor = deterministic::Runner::default();
995 executor.start(|context| async move {
996 let cfg = test_cfg();
997
998 let mut oversized: Oversized<_, TestEntry, TestValue> =
1000 Oversized::init(context.with_label("first"), cfg.clone())
1001 .await
1002 .expect("Failed to init");
1003
1004 for section in 1u64..=3 {
1006 for i in 0..10u8 {
1007 let value: TestValue = [(section as u8) * 10 + i; 16];
1008 let entry = TestEntry::new(section * 100 + i as u64, 0, 0);
1009 oversized
1010 .append(section, entry, &value)
1011 .await
1012 .expect("Failed to append");
1013 }
1014 oversized.sync(section).await.expect("Failed to sync");
1015 }
1016 drop(oversized);
1017
1018 let oversized: Oversized<_, TestEntry, TestValue> =
1020 Oversized::init(context.with_label("second"), cfg)
1021 .await
1022 .expect("Failed to reinit");
1023
1024 for section in 1u64..=3 {
1026 for i in 0..10u8 {
1027 let entry = oversized
1028 .get(section, i as u64)
1029 .await
1030 .expect("Failed to get");
1031 assert_eq!(entry.id, section * 100 + i as u64);
1032 }
1033 }
1034
1035 oversized.destroy().await.expect("Failed to destroy");
1036 });
1037 }
1038
1039 #[test_traced]
1040 fn test_recovery_single_entry_invalid() {
1041 let executor = deterministic::Runner::default();
1042 executor.start(|context| async move {
1043 let cfg = test_cfg();
1044
1045 let mut oversized: Oversized<_, TestEntry, TestValue> =
1047 Oversized::init(context.with_label("first"), cfg.clone())
1048 .await
1049 .expect("Failed to init");
1050
1051 let value: TestValue = [42; 16];
1052 let entry = TestEntry::new(1, 0, 0);
1053 oversized
1054 .append(1, entry, &value)
1055 .await
1056 .expect("Failed to append");
1057 oversized.sync(1).await.expect("Failed to sync");
1058 drop(oversized);
1059
1060 let (blob, _) = context
1062 .open(&cfg.value_partition, &1u64.to_be_bytes())
1063 .await
1064 .expect("Failed to open blob");
1065 blob.resize(0).await.expect("Failed to truncate");
1066 blob.sync().await.expect("Failed to sync");
1067 drop(blob);
1068
1069 let oversized: Oversized<_, TestEntry, TestValue> =
1071 Oversized::init(context.with_label("second"), cfg)
1072 .await
1073 .expect("Failed to reinit");
1074
1075 assert!(oversized.get(1, 0).await.is_err());
1077
1078 oversized.destroy().await.expect("Failed to destroy");
1079 });
1080 }
1081
1082 #[test_traced]
1083 fn test_recovery_last_entry_off_by_one() {
1084 let executor = deterministic::Runner::default();
1085 executor.start(|context| async move {
1086 let cfg = test_cfg();
1087
1088 let mut oversized: Oversized<_, TestEntry, TestValue> =
1090 Oversized::init(context.with_label("first"), cfg.clone())
1091 .await
1092 .expect("Failed to init");
1093
1094 let mut locations = Vec::new();
1095 for i in 0..3u8 {
1096 let value: TestValue = [i; 16];
1097 let entry = TestEntry::new(i as u64, 0, 0);
1098 let loc = oversized
1099 .append(1, entry, &value)
1100 .await
1101 .expect("Failed to append");
1102 locations.push(loc);
1103 }
1104 oversized.sync(1).await.expect("Failed to sync");
1105 drop(oversized);
1106
1107 let (blob, _) = context
1109 .open(&cfg.value_partition, &1u64.to_be_bytes())
1110 .await
1111 .expect("Failed to open blob");
1112
1113 let last = &locations[2];
1116 let truncate_to = byte_end(last.1, last.2) - 1;
1117 blob.resize(truncate_to).await.expect("Failed to truncate");
1118 blob.sync().await.expect("Failed to sync");
1119 drop(blob);
1120
1121 let mut oversized: Oversized<_, TestEntry, TestValue> =
1123 Oversized::init(context.with_label("second"), cfg)
1124 .await
1125 .expect("Failed to reinit");
1126
1127 assert!(oversized.get(1, 0).await.is_ok());
1129 assert!(oversized.get(1, 1).await.is_ok());
1130
1131 assert!(oversized.get(1, 2).await.is_err());
1133
1134 let value: TestValue = [99; 16];
1136 let entry = TestEntry::new(100, 0, 0);
1137 let (pos, offset, size) = oversized
1138 .append(1, entry, &value)
1139 .await
1140 .expect("Failed to append after recovery");
1141 assert_eq!(pos, 2);
1142
1143 let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1144 assert_eq!(retrieved.id, 100);
1145 let retrieved_value = oversized
1146 .get_value(1, offset, size)
1147 .await
1148 .expect("Failed to get value");
1149 assert_eq!(retrieved_value, value);
1150
1151 oversized.destroy().await.expect("Failed to destroy");
1152 });
1153 }
1154
1155 #[test_traced]
1156 fn test_recovery_glob_missing_entirely() {
1157 let executor = deterministic::Runner::default();
1158 executor.start(|context| async move {
1159 let cfg = test_cfg();
1160
1161 let mut oversized: Oversized<_, TestEntry, TestValue> =
1163 Oversized::init(context.with_label("first"), cfg.clone())
1164 .await
1165 .expect("Failed to init");
1166
1167 for i in 0..3u8 {
1168 let value: TestValue = [i; 16];
1169 let entry = TestEntry::new(i as u64, 0, 0);
1170 oversized
1171 .append(1, entry, &value)
1172 .await
1173 .expect("Failed to append");
1174 }
1175 oversized.sync(1).await.expect("Failed to sync");
1176 drop(oversized);
1177
1178 context
1180 .remove(&cfg.value_partition, Some(&1u64.to_be_bytes()))
1181 .await
1182 .expect("Failed to remove");
1183
1184 let oversized: Oversized<_, TestEntry, TestValue> =
1186 Oversized::init(context.with_label("second"), cfg)
1187 .await
1188 .expect("Failed to reinit");
1189
1190 assert!(oversized.get(1, 0).await.is_err());
1192 assert!(oversized.get(1, 1).await.is_err());
1193 assert!(oversized.get(1, 2).await.is_err());
1194
1195 oversized.destroy().await.expect("Failed to destroy");
1196 });
1197 }
1198
1199 #[test_traced]
1200 fn test_recovery_can_append_after_recovery() {
1201 let executor = deterministic::Runner::default();
1202 executor.start(|context| async move {
1203 let cfg = test_cfg();
1204
1205 let mut oversized: Oversized<_, TestEntry, TestValue> =
1207 Oversized::init(context.with_label("first"), cfg.clone())
1208 .await
1209 .expect("Failed to init");
1210
1211 let mut locations = Vec::new();
1212 for i in 0..5u8 {
1213 let value: TestValue = [i; 16];
1214 let entry = TestEntry::new(i as u64, 0, 0);
1215 let loc = oversized
1216 .append(1, entry, &value)
1217 .await
1218 .expect("Failed to append");
1219 locations.push(loc);
1220 }
1221 oversized.sync(1).await.expect("Failed to sync");
1222 drop(oversized);
1223
1224 let (blob, _) = context
1226 .open(&cfg.value_partition, &1u64.to_be_bytes())
1227 .await
1228 .expect("Failed to open blob");
1229 let keep_size = byte_end(locations[1].1, locations[1].2);
1230 blob.resize(keep_size).await.expect("Failed to truncate");
1231 blob.sync().await.expect("Failed to sync");
1232 drop(blob);
1233
1234 let mut oversized: Oversized<_, TestEntry, TestValue> =
1236 Oversized::init(context.with_label("second"), cfg.clone())
1237 .await
1238 .expect("Failed to reinit");
1239
1240 assert!(oversized.get(1, 0).await.is_ok());
1242 assert!(oversized.get(1, 1).await.is_ok());
1243 assert!(oversized.get(1, 2).await.is_err());
1244
1245 for i in 10..15u8 {
1247 let value: TestValue = [i; 16];
1248 let entry = TestEntry::new(i as u64, 0, 0);
1249 oversized
1250 .append(1, entry, &value)
1251 .await
1252 .expect("Failed to append after recovery");
1253 }
1254 oversized.sync(1).await.expect("Failed to sync");
1255
1256 for i in 0..5u8 {
1258 let entry = oversized
1259 .get(1, 2 + i as u64)
1260 .await
1261 .expect("Failed to get new entry");
1262 assert_eq!(entry.id, (10 + i) as u64);
1263 }
1264
1265 oversized.destroy().await.expect("Failed to destroy");
1266 });
1267 }
1268
1269 #[test_traced]
1270 fn test_recovery_glob_pruned_but_index_not() {
1271 let executor = deterministic::Runner::default();
1272 executor.start(|context| async move {
1273 let cfg = test_cfg();
1274
1275 let mut oversized: Oversized<_, TestEntry, TestValue> =
1277 Oversized::init(context.with_label("first"), cfg.clone())
1278 .await
1279 .expect("Failed to init");
1280
1281 for section in 1u64..=3 {
1282 let value: TestValue = [section as u8; 16];
1283 let entry = TestEntry::new(section, 0, 0);
1284 oversized
1285 .append(section, entry, &value)
1286 .await
1287 .expect("Failed to append");
1288 oversized.sync(section).await.expect("Failed to sync");
1289 }
1290 drop(oversized);
1291
1292 use crate::journal::segmented::glob::{Config as GlobConfig, Glob};
1295 let glob_cfg = GlobConfig {
1296 partition: cfg.value_partition.clone(),
1297 compression: cfg.compression,
1298 codec_config: (),
1299 write_buffer: cfg.value_write_buffer,
1300 };
1301 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1302 .await
1303 .expect("Failed to init glob");
1304 glob.prune(2).await.expect("Failed to prune glob");
1305 glob.sync_all().await.expect("Failed to sync glob");
1306 drop(glob);
1307
1308 let oversized: Oversized<_, TestEntry, TestValue> =
1311 Oversized::init(context.with_label("second"), cfg.clone())
1312 .await
1313 .expect("Failed to reinit");
1314
1315 assert!(oversized.get(1, 0).await.is_err());
1317
1318 assert!(oversized.get(2, 0).await.is_ok());
1320 assert!(oversized.get(3, 0).await.is_ok());
1321
1322 oversized.destroy().await.expect("Failed to destroy");
1323 });
1324 }
1325
1326 #[test_traced]
1327 fn test_recovery_index_partition_deleted() {
1328 let executor = deterministic::Runner::default();
1329 executor.start(|context| async move {
1330 let cfg = test_cfg();
1331
1332 let mut oversized: Oversized<_, TestEntry, TestValue> =
1334 Oversized::init(context.with_label("first"), cfg.clone())
1335 .await
1336 .expect("Failed to init");
1337
1338 for section in 1u64..=3 {
1339 let value: TestValue = [section as u8; 16];
1340 let entry = TestEntry::new(section, 0, 0);
1341 oversized
1342 .append(section, entry, &value)
1343 .await
1344 .expect("Failed to append");
1345 oversized.sync(section).await.expect("Failed to sync");
1346 }
1347 drop(oversized);
1348
1349 context
1351 .remove(&cfg.index_partition, Some(&2u64.to_be_bytes()))
1352 .await
1353 .expect("Failed to remove index");
1354
1355 let oversized: Oversized<_, TestEntry, TestValue> =
1358 Oversized::init(context.with_label("second"), cfg.clone())
1359 .await
1360 .expect("Failed to reinit");
1361
1362 assert!(oversized.get(1, 0).await.is_ok());
1364 assert!(oversized.get(3, 0).await.is_ok());
1365
1366 assert!(oversized.get(2, 0).await.is_err());
1368
1369 oversized.destroy().await.expect("Failed to destroy");
1370 });
1371 }
1372
1373 #[test_traced]
1374 fn test_recovery_index_synced_but_glob_not() {
1375 let executor = deterministic::Runner::default();
1376 executor.start(|context| async move {
1377 let cfg = test_cfg();
1378
1379 let mut oversized: Oversized<_, TestEntry, TestValue> =
1381 Oversized::init(context.with_label("first"), cfg.clone())
1382 .await
1383 .expect("Failed to init");
1384
1385 let mut locations = Vec::new();
1387 for i in 0..3u8 {
1388 let value: TestValue = [i; 16];
1389 let entry = TestEntry::new(i as u64, 0, 0);
1390 let loc = oversized
1391 .append(1, entry, &value)
1392 .await
1393 .expect("Failed to append");
1394 locations.push(loc);
1395 }
1396 oversized.sync(1).await.expect("Failed to sync");
1397
1398 for i in 10..15u8 {
1400 let value: TestValue = [i; 16];
1401 let entry = TestEntry::new(i as u64, 0, 0);
1402 oversized
1403 .append(1, entry, &value)
1404 .await
1405 .expect("Failed to append");
1406 }
1407 drop(oversized);
1409
1410 let (blob, _) = context
1413 .open(&cfg.value_partition, &1u64.to_be_bytes())
1414 .await
1415 .expect("Failed to open blob");
1416 let synced_size = byte_end(locations[2].1, locations[2].2);
1417 blob.resize(synced_size).await.expect("Failed to truncate");
1418 blob.sync().await.expect("Failed to sync");
1419 drop(blob);
1420
1421 let oversized: Oversized<_, TestEntry, TestValue> =
1423 Oversized::init(context.with_label("second"), cfg)
1424 .await
1425 .expect("Failed to reinit");
1426
1427 for i in 0..3u8 {
1429 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1430 assert_eq!(entry.id, i as u64);
1431 }
1432
1433 assert!(oversized.get(1, 3).await.is_err());
1435
1436 oversized.destroy().await.expect("Failed to destroy");
1437 });
1438 }
1439
1440 #[test_traced]
1441 fn test_recovery_glob_synced_but_index_not() {
1442 let executor = deterministic::Runner::default();
1443 executor.start(|context| async move {
1444 let cfg = Config {
1448 index_partition: "test_index".to_string(),
1449 value_partition: "test_values".to_string(),
1450 index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1451 index_write_buffer: NZUsize!(1024),
1452 value_write_buffer: NZUsize!(1024),
1453 compression: None,
1454 codec_config: (),
1455 };
1456
1457 let mut oversized: Oversized<_, TestEntry, TestValue> =
1459 Oversized::init(context.with_label("first"), cfg.clone())
1460 .await
1461 .expect("Failed to init");
1462
1463 let mut locations = Vec::new();
1465 for i in 0..3u8 {
1466 let value: TestValue = [i; 16];
1467 let entry = TestEntry::new(i as u64, 0, 0);
1468 let loc = oversized
1469 .append(1, entry, &value)
1470 .await
1471 .expect("Failed to append");
1472 locations.push(loc);
1473 }
1474 oversized.sync(1).await.expect("Failed to sync");
1475 drop(oversized);
1476
1477 let (blob, _size) = context
1480 .open(&cfg.index_partition, &1u64.to_be_bytes())
1481 .await
1482 .expect("Failed to open blob");
1483
1484 let physical_page_size = (TestEntry::SIZE + 12) as u64;
1487 blob.resize(2 * physical_page_size)
1488 .await
1489 .expect("Failed to truncate");
1490 blob.sync().await.expect("Failed to sync");
1491 drop(blob);
1492
1493 let mut oversized: Oversized<_, TestEntry, TestValue> =
1495 Oversized::init(context.with_label("second"), cfg.clone())
1496 .await
1497 .expect("Failed to reinit");
1498
1499 for i in 0..2u8 {
1501 let (position, offset, size) = locations[i as usize];
1502 let entry = oversized.get(1, position).await.expect("Failed to get");
1503 assert_eq!(entry.id, i as u64);
1504
1505 let value = oversized
1506 .get_value(1, offset, size)
1507 .await
1508 .expect("Failed to get value");
1509 assert_eq!(value, [i; 16]);
1510 }
1511
1512 assert!(oversized.get(1, 2).await.is_err());
1514
1515 let mut new_locations = Vec::new();
1517 for i in 10..13u8 {
1518 let value: TestValue = [i; 16];
1519 let entry = TestEntry::new(i as u64, 0, 0);
1520 let (position, offset, size) = oversized
1521 .append(1, entry, &value)
1522 .await
1523 .expect("Failed to append after recovery");
1524
1525 assert_eq!(position, (i - 10 + 2) as u64);
1527 new_locations.push((position, offset, size, i));
1528
1529 let retrieved = oversized.get(1, position).await.expect("Failed to get");
1531 assert_eq!(retrieved.id, i as u64);
1532
1533 let retrieved_value = oversized
1534 .get_value(1, offset, size)
1535 .await
1536 .expect("Failed to get value");
1537 assert_eq!(retrieved_value, value);
1538 }
1539
1540 oversized.sync(1).await.expect("Failed to sync");
1542 drop(oversized);
1543
1544 let oversized: Oversized<_, TestEntry, TestValue> =
1546 Oversized::init(context.with_label("third"), cfg)
1547 .await
1548 .expect("Failed to reinit after append");
1549
1550 for i in 0..2u8 {
1553 let (position, offset, size) = locations[i as usize];
1554 let entry = oversized.get(1, position).await.expect("Failed to get");
1555 assert_eq!(entry.id, i as u64);
1556
1557 let value = oversized
1558 .get_value(1, offset, size)
1559 .await
1560 .expect("Failed to get value");
1561 assert_eq!(value, [i; 16]);
1562 }
1563
1564 for (position, offset, size, expected_id) in &new_locations {
1566 let entry = oversized
1567 .get(1, *position)
1568 .await
1569 .expect("Failed to get new entry after restart");
1570 assert_eq!(entry.id, *expected_id as u64);
1571
1572 let value = oversized
1573 .get_value(1, *offset, *size)
1574 .await
1575 .expect("Failed to get new value after restart");
1576 assert_eq!(value, [*expected_id; 16]);
1577 }
1578
1579 assert!(oversized.get(1, 4).await.is_ok());
1581 assert!(oversized.get(1, 5).await.is_err());
1582
1583 oversized.destroy().await.expect("Failed to destroy");
1584 });
1585 }
1586
1587 #[test_traced]
1588 fn test_recovery_partial_index_entry() {
1589 let executor = deterministic::Runner::default();
1590 executor.start(|context| async move {
1591 let cfg = test_cfg();
1592
1593 let mut oversized: Oversized<_, TestEntry, TestValue> =
1595 Oversized::init(context.with_label("first"), cfg.clone())
1596 .await
1597 .expect("Failed to init");
1598
1599 for i in 0..3u8 {
1601 let value: TestValue = [i; 16];
1602 let entry = TestEntry::new(i as u64, 0, 0);
1603 oversized
1604 .append(1, entry, &value)
1605 .await
1606 .expect("Failed to append");
1607 }
1608 oversized.sync(1).await.expect("Failed to sync");
1609 drop(oversized);
1610
1611 let (blob, _) = context
1615 .open(&cfg.index_partition, &1u64.to_be_bytes())
1616 .await
1617 .expect("Failed to open blob");
1618 let partial_size = 3 * 24 + 10; blob.resize(partial_size).await.expect("Failed to resize");
1620 blob.sync().await.expect("Failed to sync");
1621 drop(blob);
1622
1623 let mut oversized: Oversized<_, TestEntry, TestValue> =
1625 Oversized::init(context.with_label("second"), cfg.clone())
1626 .await
1627 .expect("Failed to reinit");
1628
1629 for i in 0..3u8 {
1631 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1632 assert_eq!(entry.id, i as u64);
1633 }
1634
1635 assert!(oversized.get(1, 3).await.is_err());
1637
1638 let value: TestValue = [42; 16];
1640 let entry = TestEntry::new(100, 0, 0);
1641 let (pos, offset, size) = oversized
1642 .append(1, entry, &value)
1643 .await
1644 .expect("Failed to append after recovery");
1645 assert_eq!(pos, 3);
1646
1647 let retrieved = oversized.get(1, 3).await.expect("Failed to get new entry");
1649 assert_eq!(retrieved.id, 100);
1650 let retrieved_value = oversized
1651 .get_value(1, offset, size)
1652 .await
1653 .expect("Failed to get new value");
1654 assert_eq!(retrieved_value, value);
1655
1656 oversized.destroy().await.expect("Failed to destroy");
1657 });
1658 }
1659
1660 #[test_traced]
1661 fn test_recovery_only_partial_entry() {
1662 let executor = deterministic::Runner::default();
1663 executor.start(|context| async move {
1664 let cfg = test_cfg();
1665
1666 let mut oversized: Oversized<_, TestEntry, TestValue> =
1668 Oversized::init(context.with_label("first"), cfg.clone())
1669 .await
1670 .expect("Failed to init");
1671
1672 let value: TestValue = [42; 16];
1673 let entry = TestEntry::new(1, 0, 0);
1674 oversized
1675 .append(1, entry, &value)
1676 .await
1677 .expect("Failed to append");
1678 oversized.sync(1).await.expect("Failed to sync");
1679 drop(oversized);
1680
1681 let (blob, _) = context
1683 .open(&cfg.index_partition, &1u64.to_be_bytes())
1684 .await
1685 .expect("Failed to open blob");
1686 blob.resize(10).await.expect("Failed to resize"); blob.sync().await.expect("Failed to sync");
1688 drop(blob);
1689
1690 let mut oversized: Oversized<_, TestEntry, TestValue> =
1692 Oversized::init(context.with_label("second"), cfg.clone())
1693 .await
1694 .expect("Failed to reinit");
1695
1696 assert!(oversized.get(1, 0).await.is_err());
1698
1699 let value: TestValue = [99; 16];
1701 let entry = TestEntry::new(100, 0, 0);
1702 let (pos, offset, size) = oversized
1703 .append(1, entry, &value)
1704 .await
1705 .expect("Failed to append after recovery");
1706 assert_eq!(pos, 0);
1707
1708 let retrieved = oversized.get(1, 0).await.expect("Failed to get");
1709 assert_eq!(retrieved.id, 100);
1710 let retrieved_value = oversized
1711 .get_value(1, offset, size)
1712 .await
1713 .expect("Failed to get value");
1714 assert_eq!(retrieved_value, value);
1715
1716 oversized.destroy().await.expect("Failed to destroy");
1717 });
1718 }
1719
1720 #[test_traced]
1721 fn test_recovery_crash_during_rewind_index_ahead() {
1722 let executor = deterministic::Runner::default();
1724 executor.start(|context| async move {
1725 let cfg = Config {
1729 index_partition: "test_index".to_string(),
1730 value_partition: "test_values".to_string(),
1731 index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1732 index_write_buffer: NZUsize!(1024),
1733 value_write_buffer: NZUsize!(1024),
1734 compression: None,
1735 codec_config: (),
1736 };
1737
1738 let mut oversized: Oversized<_, TestEntry, TestValue> =
1740 Oversized::init(context.with_label("first"), cfg.clone())
1741 .await
1742 .expect("Failed to init");
1743
1744 let mut locations = Vec::new();
1745 for i in 0..5u8 {
1746 let value: TestValue = [i; 16];
1747 let entry = TestEntry::new(i as u64, 0, 0);
1748 let loc = oversized
1749 .append(1, entry, &value)
1750 .await
1751 .expect("Failed to append");
1752 locations.push(loc);
1753 }
1754 oversized.sync(1).await.expect("Failed to sync");
1755 drop(oversized);
1756
1757 let (blob, _) = context
1760 .open(&cfg.index_partition, &1u64.to_be_bytes())
1761 .await
1762 .expect("Failed to open blob");
1763 let physical_page_size = (TestEntry::SIZE + 12) as u64;
1765 blob.resize(2 * physical_page_size)
1766 .await
1767 .expect("Failed to truncate");
1768 blob.sync().await.expect("Failed to sync");
1769 drop(blob);
1770
1771 let mut oversized: Oversized<_, TestEntry, TestValue> =
1773 Oversized::init(context.with_label("second"), cfg.clone())
1774 .await
1775 .expect("Failed to reinit");
1776
1777 for i in 0..2u8 {
1779 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1780 assert_eq!(entry.id, i as u64);
1781 }
1782
1783 assert!(oversized.get(1, 2).await.is_err());
1785
1786 let (pos, _, _) = oversized
1788 .append(1, TestEntry::new(100, 0, 0), &[100u8; 16])
1789 .await
1790 .expect("Failed to append");
1791 assert_eq!(pos, 2);
1792
1793 oversized.destroy().await.expect("Failed to destroy");
1794 });
1795 }
1796
1797 #[test_traced]
1798 fn test_recovery_crash_during_rewind_glob_ahead() {
1799 let executor = deterministic::Runner::default();
1801 executor.start(|context| async move {
1802 let cfg = test_cfg();
1803
1804 let mut oversized: Oversized<_, TestEntry, TestValue> =
1806 Oversized::init(context.with_label("first"), cfg.clone())
1807 .await
1808 .expect("Failed to init");
1809
1810 let mut locations = Vec::new();
1811 for i in 0..5u8 {
1812 let value: TestValue = [i; 16];
1813 let entry = TestEntry::new(i as u64, 0, 0);
1814 let loc = oversized
1815 .append(1, entry, &value)
1816 .await
1817 .expect("Failed to append");
1818 locations.push(loc);
1819 }
1820 oversized.sync(1).await.expect("Failed to sync");
1821 drop(oversized);
1822
1823 let (blob, _) = context
1826 .open(&cfg.value_partition, &1u64.to_be_bytes())
1827 .await
1828 .expect("Failed to open blob");
1829 let keep_size = byte_end(locations[1].1, locations[1].2);
1830 blob.resize(keep_size).await.expect("Failed to truncate");
1831 blob.sync().await.expect("Failed to sync");
1832 drop(blob);
1833
1834 let mut oversized: Oversized<_, TestEntry, TestValue> =
1836 Oversized::init(context.with_label("second"), cfg.clone())
1837 .await
1838 .expect("Failed to reinit");
1839
1840 for i in 0..2u8 {
1842 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1843 assert_eq!(entry.id, i as u64);
1844 }
1845
1846 assert!(oversized.get(1, 2).await.is_err());
1848
1849 let value: TestValue = [99; 16];
1851 let entry = TestEntry::new(100, 0, 0);
1852 let (pos, offset, size) = oversized
1853 .append(1, entry, &value)
1854 .await
1855 .expect("Failed to append after recovery");
1856 assert_eq!(pos, 2);
1857
1858 let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1859 assert_eq!(retrieved.id, 100);
1860 let retrieved_value = oversized
1861 .get_value(1, offset, size)
1862 .await
1863 .expect("Failed to get value");
1864 assert_eq!(retrieved_value, value);
1865
1866 oversized.destroy().await.expect("Failed to destroy");
1867 });
1868 }
1869
1870 #[test_traced]
1871 fn test_oversized_get_value_invalid_size() {
1872 let executor = deterministic::Runner::default();
1873 executor.start(|context| async move {
1874 let mut oversized: Oversized<_, TestEntry, TestValue> =
1875 Oversized::init(context.clone(), test_cfg())
1876 .await
1877 .expect("Failed to init");
1878
1879 let value: TestValue = [42; 16];
1880 let entry = TestEntry::new(1, 0, 0);
1881 let (_, offset, _size) = oversized
1882 .append(1, entry, &value)
1883 .await
1884 .expect("Failed to append");
1885 oversized.sync(1).await.expect("Failed to sync");
1886
1887 assert!(oversized.get_value(1, offset, 0).await.is_err());
1889
1890 for size in 1..4u32 {
1893 let result = oversized.get_value(1, offset, size).await;
1894 assert!(
1895 matches!(
1896 result,
1897 Err(Error::Codec(_))
1898 | Err(Error::ChecksumMismatch(_, _))
1899 | Err(Error::Runtime(_))
1900 ),
1901 "expected error, got: {:?}",
1902 result
1903 );
1904 }
1905
1906 oversized.destroy().await.expect("Failed to destroy");
1907 });
1908 }
1909
1910 #[test_traced]
1911 fn test_oversized_get_value_wrong_size() {
1912 let executor = deterministic::Runner::default();
1913 executor.start(|context| async move {
1914 let mut oversized: Oversized<_, TestEntry, TestValue> =
1915 Oversized::init(context.clone(), test_cfg())
1916 .await
1917 .expect("Failed to init");
1918
1919 let value: TestValue = [42; 16];
1920 let entry = TestEntry::new(1, 0, 0);
1921 let (_, offset, correct_size) = oversized
1922 .append(1, entry, &value)
1923 .await
1924 .expect("Failed to append");
1925 oversized.sync(1).await.expect("Failed to sync");
1926
1927 let result = oversized.get_value(1, offset, correct_size - 1).await;
1930 assert!(
1931 matches!(
1932 result,
1933 Err(Error::Codec(_)) | Err(Error::ChecksumMismatch(_, _))
1934 ),
1935 "expected Codec or ChecksumMismatch error, got: {:?}",
1936 result
1937 );
1938
1939 oversized.destroy().await.expect("Failed to destroy");
1940 });
1941 }
1942
1943 #[test_traced]
1944 fn test_recovery_values_has_orphan_section() {
1945 let executor = deterministic::Runner::default();
1946 executor.start(|context| async move {
1947 let cfg = test_cfg();
1948
1949 let mut oversized: Oversized<_, TestEntry, TestValue> =
1951 Oversized::init(context.with_label("first"), cfg.clone())
1952 .await
1953 .expect("Failed to init");
1954
1955 for section in 1u64..=2 {
1956 let value: TestValue = [section as u8; 16];
1957 let entry = TestEntry::new(section, 0, 0);
1958 oversized
1959 .append(section, entry, &value)
1960 .await
1961 .expect("Failed to append");
1962 oversized.sync(section).await.expect("Failed to sync");
1963 }
1964 drop(oversized);
1965
1966 let glob_cfg = GlobConfig {
1968 partition: cfg.value_partition.clone(),
1969 compression: cfg.compression,
1970 codec_config: (),
1971 write_buffer: cfg.value_write_buffer,
1972 };
1973 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1974 .await
1975 .expect("Failed to init glob");
1976 let orphan_value: TestValue = [99; 16];
1977 glob.append(3, &orphan_value)
1978 .await
1979 .expect("Failed to append orphan");
1980 glob.sync(3).await.expect("Failed to sync glob");
1981 drop(glob);
1982
1983 let oversized: Oversized<_, TestEntry, TestValue> =
1985 Oversized::init(context.with_label("second"), cfg.clone())
1986 .await
1987 .expect("Failed to reinit");
1988
1989 assert!(oversized.get(1, 0).await.is_ok());
1991 assert!(oversized.get(2, 0).await.is_ok());
1992
1993 assert_eq!(oversized.newest_section(), Some(2));
1995
1996 oversized.destroy().await.expect("Failed to destroy");
1997 });
1998 }
1999
2000 #[test_traced]
2001 fn test_recovery_values_has_multiple_orphan_sections() {
2002 let executor = deterministic::Runner::default();
2003 executor.start(|context| async move {
2004 let cfg = test_cfg();
2005
2006 let mut oversized: Oversized<_, TestEntry, TestValue> =
2008 Oversized::init(context.with_label("first"), cfg.clone())
2009 .await
2010 .expect("Failed to init");
2011
2012 let value: TestValue = [1; 16];
2013 let entry = TestEntry::new(1, 0, 0);
2014 oversized
2015 .append(1, entry, &value)
2016 .await
2017 .expect("Failed to append");
2018 oversized.sync(1).await.expect("Failed to sync");
2019 drop(oversized);
2020
2021 let glob_cfg = GlobConfig {
2023 partition: cfg.value_partition.clone(),
2024 compression: cfg.compression,
2025 codec_config: (),
2026 write_buffer: cfg.value_write_buffer,
2027 };
2028 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2029 .await
2030 .expect("Failed to init glob");
2031
2032 for section in 2u64..=4 {
2033 let orphan_value: TestValue = [section as u8; 16];
2034 glob.append(section, &orphan_value)
2035 .await
2036 .expect("Failed to append orphan");
2037 glob.sync(section).await.expect("Failed to sync glob");
2038 }
2039 drop(glob);
2040
2041 let oversized: Oversized<_, TestEntry, TestValue> =
2043 Oversized::init(context.with_label("second"), cfg.clone())
2044 .await
2045 .expect("Failed to reinit");
2046
2047 assert!(oversized.get(1, 0).await.is_ok());
2049
2050 assert_eq!(oversized.newest_section(), Some(1));
2052
2053 oversized.destroy().await.expect("Failed to destroy");
2054 });
2055 }
2056
2057 #[test_traced]
2058 fn test_recovery_index_empty_but_values_exist() {
2059 let executor = deterministic::Runner::default();
2060 executor.start(|context| async move {
2061 let cfg = test_cfg();
2062
2063 let glob_cfg = GlobConfig {
2065 partition: cfg.value_partition.clone(),
2066 compression: cfg.compression,
2067 codec_config: (),
2068 write_buffer: cfg.value_write_buffer,
2069 };
2070 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2071 .await
2072 .expect("Failed to init glob");
2073
2074 for section in 1u64..=3 {
2075 let orphan_value: TestValue = [section as u8; 16];
2076 glob.append(section, &orphan_value)
2077 .await
2078 .expect("Failed to append orphan");
2079 glob.sync(section).await.expect("Failed to sync glob");
2080 }
2081 drop(glob);
2082
2083 let oversized: Oversized<_, TestEntry, TestValue> =
2085 Oversized::init(context.with_label("first"), cfg.clone())
2086 .await
2087 .expect("Failed to init");
2088
2089 assert_eq!(oversized.newest_section(), None);
2091 assert_eq!(oversized.oldest_section(), None);
2092
2093 oversized.destroy().await.expect("Failed to destroy");
2094 });
2095 }
2096
2097 #[test_traced]
2098 fn test_recovery_orphan_section_append_after() {
2099 let executor = deterministic::Runner::default();
2100 executor.start(|context| async move {
2101 let cfg = test_cfg();
2102
2103 let mut oversized: Oversized<_, TestEntry, TestValue> =
2105 Oversized::init(context.with_label("first"), cfg.clone())
2106 .await
2107 .expect("Failed to init");
2108
2109 let value: TestValue = [1; 16];
2110 let entry = TestEntry::new(1, 0, 0);
2111 let (_, offset1, size1) = oversized
2112 .append(1, entry, &value)
2113 .await
2114 .expect("Failed to append");
2115 oversized.sync(1).await.expect("Failed to sync");
2116 drop(oversized);
2117
2118 let glob_cfg = GlobConfig {
2120 partition: cfg.value_partition.clone(),
2121 compression: cfg.compression,
2122 codec_config: (),
2123 write_buffer: cfg.value_write_buffer,
2124 };
2125 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2126 .await
2127 .expect("Failed to init glob");
2128
2129 for section in 2u64..=3 {
2130 let orphan_value: TestValue = [section as u8; 16];
2131 glob.append(section, &orphan_value)
2132 .await
2133 .expect("Failed to append orphan");
2134 glob.sync(section).await.expect("Failed to sync glob");
2135 }
2136 drop(glob);
2137
2138 let mut oversized: Oversized<_, TestEntry, TestValue> =
2140 Oversized::init(context.with_label("second"), cfg.clone())
2141 .await
2142 .expect("Failed to reinit");
2143
2144 let entry = oversized.get(1, 0).await.expect("Failed to get");
2146 assert_eq!(entry.id, 1);
2147 let value = oversized
2148 .get_value(1, offset1, size1)
2149 .await
2150 .expect("Failed to get value");
2151 assert_eq!(value, [1; 16]);
2152
2153 let new_value: TestValue = [42; 16];
2155 let new_entry = TestEntry::new(42, 0, 0);
2156 let (pos, offset, size) = oversized
2157 .append(2, new_entry, &new_value)
2158 .await
2159 .expect("Failed to append after recovery");
2160 assert_eq!(pos, 0);
2161
2162 let retrieved = oversized.get(2, 0).await.expect("Failed to get");
2164 assert_eq!(retrieved.id, 42);
2165 let retrieved_value = oversized
2166 .get_value(2, offset, size)
2167 .await
2168 .expect("Failed to get value");
2169 assert_eq!(retrieved_value, new_value);
2170
2171 oversized.sync(2).await.expect("Failed to sync");
2173 drop(oversized);
2174
2175 let oversized: Oversized<_, TestEntry, TestValue> =
2176 Oversized::init(context.with_label("third"), cfg)
2177 .await
2178 .expect("Failed to reinit after append");
2179
2180 assert!(oversized.get(1, 0).await.is_ok());
2182 assert!(oversized.get(2, 0).await.is_ok());
2183 assert_eq!(oversized.newest_section(), Some(2));
2184
2185 oversized.destroy().await.expect("Failed to destroy");
2186 });
2187 }
2188
2189 #[test_traced]
2190 fn test_recovery_no_orphan_sections() {
2191 let executor = deterministic::Runner::default();
2192 executor.start(|context| async move {
2193 let cfg = test_cfg();
2194
2195 let mut oversized: Oversized<_, TestEntry, TestValue> =
2197 Oversized::init(context.with_label("first"), cfg.clone())
2198 .await
2199 .expect("Failed to init");
2200
2201 for section in 1u64..=3 {
2202 let value: TestValue = [section as u8; 16];
2203 let entry = TestEntry::new(section, 0, 0);
2204 oversized
2205 .append(section, entry, &value)
2206 .await
2207 .expect("Failed to append");
2208 oversized.sync(section).await.expect("Failed to sync");
2209 }
2210 drop(oversized);
2211
2212 let oversized: Oversized<_, TestEntry, TestValue> =
2214 Oversized::init(context.with_label("second"), cfg)
2215 .await
2216 .expect("Failed to reinit");
2217
2218 for section in 1u64..=3 {
2220 let entry = oversized.get(section, 0).await.expect("Failed to get");
2221 assert_eq!(entry.id, section);
2222 }
2223 assert_eq!(oversized.newest_section(), Some(3));
2224
2225 oversized.destroy().await.expect("Failed to destroy");
2226 });
2227 }
2228
2229 #[test_traced]
2230 fn test_recovery_orphan_with_empty_index_section() {
2231 let executor = deterministic::Runner::default();
2232 executor.start(|context| async move {
2233 let cfg = test_cfg();
2234
2235 let mut oversized: Oversized<_, TestEntry, TestValue> =
2237 Oversized::init(context.with_label("first"), cfg.clone())
2238 .await
2239 .expect("Failed to init");
2240
2241 let value: TestValue = [1; 16];
2242 let entry = TestEntry::new(1, 0, 0);
2243 oversized
2244 .append(1, entry, &value)
2245 .await
2246 .expect("Failed to append");
2247 oversized.sync(1).await.expect("Failed to sync");
2248 drop(oversized);
2249
2250 let glob_cfg = GlobConfig {
2252 partition: cfg.value_partition.clone(),
2253 compression: cfg.compression,
2254 codec_config: (),
2255 write_buffer: cfg.value_write_buffer,
2256 };
2257 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2258 .await
2259 .expect("Failed to init glob");
2260 let orphan_value: TestValue = [2; 16];
2261 glob.append(2, &orphan_value)
2262 .await
2263 .expect("Failed to append orphan");
2264 glob.sync(2).await.expect("Failed to sync glob");
2265 drop(glob);
2266
2267 let (blob, _) = context
2269 .open(&cfg.index_partition, &1u64.to_be_bytes())
2270 .await
2271 .expect("Failed to open blob");
2272 blob.resize(0).await.expect("Failed to truncate");
2273 blob.sync().await.expect("Failed to sync");
2274 drop(blob);
2275
2276 let oversized: Oversized<_, TestEntry, TestValue> =
2278 Oversized::init(context.with_label("second"), cfg)
2279 .await
2280 .expect("Failed to reinit");
2281
2282 assert!(oversized.get(1, 0).await.is_err());
2284
2285 assert_eq!(oversized.newest_section(), Some(1));
2287
2288 oversized.destroy().await.expect("Failed to destroy");
2289 });
2290 }
2291
2292 #[test_traced]
2293 fn test_recovery_orphan_sections_with_gaps() {
2294 let executor = deterministic::Runner::default();
2297 executor.start(|context| async move {
2298 let cfg = test_cfg();
2299
2300 let mut oversized: Oversized<_, TestEntry, TestValue> =
2302 Oversized::init(context.with_label("first"), cfg.clone())
2303 .await
2304 .expect("Failed to init");
2305
2306 for section in [1u64, 3, 5] {
2307 let value: TestValue = [section as u8; 16];
2308 let entry = TestEntry::new(section, 0, 0);
2309 oversized
2310 .append(section, entry, &value)
2311 .await
2312 .expect("Failed to append");
2313 oversized.sync(section).await.expect("Failed to sync");
2314 }
2315 drop(oversized);
2316
2317 let glob_cfg = GlobConfig {
2319 partition: cfg.value_partition.clone(),
2320 compression: cfg.compression,
2321 codec_config: (),
2322 write_buffer: cfg.value_write_buffer,
2323 };
2324 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2325 .await
2326 .expect("Failed to init glob");
2327
2328 for section in [2u64, 4, 6] {
2329 let orphan_value: TestValue = [section as u8; 16];
2330 glob.append(section, &orphan_value)
2331 .await
2332 .expect("Failed to append orphan");
2333 glob.sync(section).await.expect("Failed to sync glob");
2334 }
2335 drop(glob);
2336
2337 let oversized: Oversized<_, TestEntry, TestValue> =
2339 Oversized::init(context.with_label("second"), cfg)
2340 .await
2341 .expect("Failed to reinit");
2342
2343 for section in [1u64, 3, 5] {
2345 let entry = oversized.get(section, 0).await.expect("Failed to get");
2346 assert_eq!(entry.id, section);
2347 }
2348
2349 assert_eq!(oversized.oldest_section(), Some(1));
2351 assert_eq!(oversized.newest_section(), Some(5));
2352
2353 oversized.destroy().await.expect("Failed to destroy");
2354 });
2355 }
2356
2357 #[test_traced]
2358 fn test_recovery_glob_trailing_garbage_truncated() {
2359 let executor = deterministic::Runner::default();
2363 executor.start(|context| async move {
2364 let cfg = test_cfg();
2365
2366 let mut oversized: Oversized<_, TestEntry, TestValue> =
2368 Oversized::init(context.with_label("first"), cfg.clone())
2369 .await
2370 .expect("Failed to init");
2371
2372 let mut locations = Vec::new();
2374 for i in 0..2u8 {
2375 let value: TestValue = [i; 16];
2376 let entry = TestEntry::new(i as u64, 0, 0);
2377 let loc = oversized
2378 .append(1, entry, &value)
2379 .await
2380 .expect("Failed to append");
2381 locations.push(loc);
2382 }
2383 oversized.sync(1).await.expect("Failed to sync");
2384
2385 let expected_next_offset = byte_end(locations[1].1, locations[1].2);
2387 drop(oversized);
2388
2389 let (blob, size) = context
2391 .open(&cfg.value_partition, &1u64.to_be_bytes())
2392 .await
2393 .expect("Failed to open blob");
2394 assert_eq!(size, expected_next_offset);
2395
2396 let garbage = vec![0xDE; 100];
2398 blob.write_at(size, garbage)
2399 .await
2400 .expect("Failed to write garbage");
2401 blob.sync().await.expect("Failed to sync");
2402 drop(blob);
2403
2404 let (blob, new_size) = context
2406 .open(&cfg.value_partition, &1u64.to_be_bytes())
2407 .await
2408 .expect("Failed to open blob");
2409 assert_eq!(new_size, expected_next_offset + 100);
2410 drop(blob);
2411
2412 let mut oversized: Oversized<_, TestEntry, TestValue> =
2414 Oversized::init(context.with_label("second"), cfg.clone())
2415 .await
2416 .expect("Failed to reinit");
2417
2418 for i in 0..2u8 {
2420 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2421 assert_eq!(entry.id, i as u64);
2422 }
2423
2424 let new_value: TestValue = [99; 16];
2426 let new_entry = TestEntry::new(99, 0, 0);
2427 let (pos, offset, _size) = oversized
2428 .append(1, new_entry, &new_value)
2429 .await
2430 .expect("Failed to append after recovery");
2431
2432 assert_eq!(pos, 2);
2434
2435 assert_eq!(offset, expected_next_offset);
2437
2438 let retrieved = oversized.get(1, 2).await.expect("Failed to get new entry");
2440 assert_eq!(retrieved.id, 99);
2441
2442 oversized.destroy().await.expect("Failed to destroy");
2443 });
2444 }
2445
2446 #[test_traced]
2447 fn test_recovery_entry_with_overflow_offset() {
2448 let executor = deterministic::Runner::default();
2451 executor.start(|context| async move {
2452 let cfg = Config {
2454 index_partition: "test_index".to_string(),
2455 value_partition: "test_values".to_string(),
2456 index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
2457 index_write_buffer: NZUsize!(1024),
2458 value_write_buffer: NZUsize!(1024),
2459 compression: None,
2460 codec_config: (),
2461 };
2462
2463 let mut oversized: Oversized<_, TestEntry, TestValue> =
2465 Oversized::init(context.with_label("first"), cfg.clone())
2466 .await
2467 .expect("Failed to init");
2468
2469 let value: TestValue = [1; 16];
2470 let entry = TestEntry::new(1, 0, 0);
2471 oversized
2472 .append(1, entry, &value)
2473 .await
2474 .expect("Failed to append");
2475 oversized.sync(1).await.expect("Failed to sync");
2476 drop(oversized);
2477
2478 let (blob, _) = context
2482 .open(&cfg.index_partition, &1u64.to_be_bytes())
2483 .await
2484 .expect("Failed to open blob");
2485
2486 let mut entry_data = Vec::new();
2488 1u64.write(&mut entry_data); (u64::MAX - 10).write(&mut entry_data); 100u32.write(&mut entry_data); assert_eq!(entry_data.len(), TestEntry::SIZE);
2492
2493 let crc = Crc32::checksum(&entry_data);
2496 let len1 = TestEntry::SIZE as u16;
2497 let mut crc_record = Vec::new();
2498 crc_record.extend_from_slice(&len1.to_be_bytes()); crc_record.extend_from_slice(&crc.to_be_bytes()); crc_record.extend_from_slice(&0u16.to_be_bytes()); crc_record.extend_from_slice(&0u32.to_be_bytes()); assert_eq!(crc_record.len(), 12);
2503
2504 let mut page = entry_data;
2506 page.extend_from_slice(&crc_record);
2507 blob.write_at(0, page)
2508 .await
2509 .expect("Failed to write corrupted page");
2510 blob.sync().await.expect("Failed to sync");
2511 drop(blob);
2512
2513 let mut oversized: Oversized<_, TestEntry, TestValue> =
2516 Oversized::init(context.with_label("second"), cfg.clone())
2517 .await
2518 .expect("Failed to reinit");
2519
2520 assert!(oversized.get(1, 0).await.is_err());
2522
2523 let new_value: TestValue = [99; 16];
2525 let new_entry = TestEntry::new(99, 0, 0);
2526 let (pos, new_offset, _) = oversized
2527 .append(1, new_entry, &new_value)
2528 .await
2529 .expect("Failed to append after recovery");
2530
2531 assert_eq!(pos, 0);
2533 assert_eq!(new_offset, 0);
2535
2536 oversized.destroy().await.expect("Failed to destroy");
2537 });
2538 }
2539
2540 #[test_traced]
2541 fn test_empty_section_persistence() {
2542 let executor = deterministic::Runner::default();
2545 executor.start(|context| async move {
2546 let cfg = test_cfg();
2547
2548 let mut oversized: Oversized<_, TestEntry, TestValue> =
2550 Oversized::init(context.with_label("first"), cfg.clone())
2551 .await
2552 .expect("Failed to init");
2553
2554 for i in 0..3u8 {
2555 let value: TestValue = [i; 16];
2556 let entry = TestEntry::new(i as u64, 0, 0);
2557 oversized
2558 .append(1, entry, &value)
2559 .await
2560 .expect("Failed to append");
2561 }
2562 oversized.sync(1).await.expect("Failed to sync");
2563
2564 let value2: TestValue = [10; 16];
2566 let entry2 = TestEntry::new(10, 0, 0);
2567 oversized
2568 .append(2, entry2, &value2)
2569 .await
2570 .expect("Failed to append to section 2");
2571 oversized.sync(2).await.expect("Failed to sync section 2");
2572 drop(oversized);
2573
2574 let (blob, _) = context
2576 .open(&cfg.index_partition, &1u64.to_be_bytes())
2577 .await
2578 .expect("Failed to open blob");
2579 blob.resize(0).await.expect("Failed to truncate");
2580 blob.sync().await.expect("Failed to sync");
2581 drop(blob);
2582
2583 let mut oversized: Oversized<_, TestEntry, TestValue> =
2585 Oversized::init(context.with_label("second"), cfg.clone())
2586 .await
2587 .expect("Failed to reinit");
2588
2589 assert!(oversized.get(1, 0).await.is_err());
2591
2592 let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2594 assert_eq!(entry.id, 10);
2595
2596 assert_eq!(oversized.oldest_section(), Some(1));
2598
2599 let new_value: TestValue = [99; 16];
2605 let new_entry = TestEntry::new(99, 0, 0);
2606 let (pos, offset, size) = oversized
2607 .append(1, new_entry, &new_value)
2608 .await
2609 .expect("Failed to append to empty section");
2610 assert_eq!(pos, 0);
2611 assert!(offset > 0);
2613 oversized.sync(1).await.expect("Failed to sync");
2614
2615 let entry = oversized.get(1, 0).await.expect("Failed to get");
2617 assert_eq!(entry.id, 99);
2618 let value = oversized
2619 .get_value(1, offset, size)
2620 .await
2621 .expect("Failed to get value");
2622 assert_eq!(value, new_value);
2623
2624 drop(oversized);
2625
2626 let oversized: Oversized<_, TestEntry, TestValue> =
2628 Oversized::init(context.with_label("third"), cfg.clone())
2629 .await
2630 .expect("Failed to reinit again");
2631
2632 let entry = oversized.get(1, 0).await.expect("Failed to get");
2634 assert_eq!(entry.id, 99);
2635
2636 let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2638 assert_eq!(entry.id, 10);
2639
2640 oversized.destroy().await.expect("Failed to destroy");
2641 });
2642 }
2643
2644 #[test_traced]
2645 fn test_get_value_size_equals_crc_size() {
2646 let executor = deterministic::Runner::default();
2649 executor.start(|context| async move {
2650 let mut oversized: Oversized<_, TestEntry, TestValue> =
2651 Oversized::init(context.clone(), test_cfg())
2652 .await
2653 .expect("Failed to init");
2654
2655 let value: TestValue = [42; 16];
2656 let entry = TestEntry::new(1, 0, 0);
2657 let (_, offset, _) = oversized
2658 .append(1, entry, &value)
2659 .await
2660 .expect("Failed to append");
2661 oversized.sync(1).await.expect("Failed to sync");
2662
2663 let result = oversized.get_value(1, offset, 4).await;
2666 assert!(result.is_err());
2667
2668 oversized.destroy().await.expect("Failed to destroy");
2669 });
2670 }
2671
2672 #[test_traced]
2673 fn test_get_value_size_just_over_crc() {
2674 let executor = deterministic::Runner::default();
2677 executor.start(|context| async move {
2678 let mut oversized: Oversized<_, TestEntry, TestValue> =
2679 Oversized::init(context.clone(), test_cfg())
2680 .await
2681 .expect("Failed to init");
2682
2683 let value: TestValue = [42; 16];
2684 let entry = TestEntry::new(1, 0, 0);
2685 let (_, offset, _) = oversized
2686 .append(1, entry, &value)
2687 .await
2688 .expect("Failed to append");
2689 oversized.sync(1).await.expect("Failed to sync");
2690
2691 let result = oversized.get_value(1, offset, 5).await;
2694 assert!(result.is_err());
2695
2696 oversized.destroy().await.expect("Failed to destroy");
2697 });
2698 }
2699
2700 #[test_traced]
2701 fn test_recovery_maximum_section_numbers() {
2702 let executor = deterministic::Runner::default();
2705 executor.start(|context| async move {
2706 let cfg = test_cfg();
2707
2708 let large_sections = [u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
2710
2711 let mut oversized: Oversized<_, TestEntry, TestValue> =
2713 Oversized::init(context.with_label("first"), cfg.clone())
2714 .await
2715 .expect("Failed to init");
2716
2717 let mut locations = Vec::new();
2718 for §ion in &large_sections {
2719 let value: TestValue = [(section & 0xFF) as u8; 16];
2720 let entry = TestEntry::new(section, 0, 0);
2721 let loc = oversized
2722 .append(section, entry, &value)
2723 .await
2724 .expect("Failed to append");
2725 locations.push((section, loc));
2726 oversized.sync(section).await.expect("Failed to sync");
2727 }
2728 drop(oversized);
2729
2730 let middle_section = large_sections[1];
2732 let (blob, size) = context
2733 .open(&cfg.value_partition, &middle_section.to_be_bytes())
2734 .await
2735 .expect("Failed to open blob");
2736 blob.resize(size / 2).await.expect("Failed to truncate");
2737 blob.sync().await.expect("Failed to sync");
2738 drop(blob);
2739
2740 let oversized: Oversized<_, TestEntry, TestValue> =
2742 Oversized::init(context.with_label("second"), cfg.clone())
2743 .await
2744 .expect("Failed to reinit");
2745
2746 let entry = oversized
2748 .get(large_sections[0], 0)
2749 .await
2750 .expect("Failed to get first section");
2751 assert_eq!(entry.id, large_sections[0]);
2752
2753 let entry = oversized
2754 .get(large_sections[2], 0)
2755 .await
2756 .expect("Failed to get last section");
2757 assert_eq!(entry.id, large_sections[2]);
2758
2759 assert!(oversized.get(middle_section, 0).await.is_err());
2761
2762 let new_value: TestValue = [0xAB; 16];
2764 let new_entry = TestEntry::new(999, 0, 0);
2765 let mut oversized = oversized;
2766 oversized
2767 .append(middle_section, new_entry, &new_value)
2768 .await
2769 .expect("Failed to append after recovery");
2770
2771 oversized.destroy().await.expect("Failed to destroy");
2772 });
2773 }
2774
2775 #[test_traced]
2776 fn test_recovery_crash_during_recovery_rewind() {
2777 let executor = deterministic::Runner::default();
2781 executor.start(|context| async move {
2782 let cfg = test_cfg();
2783
2784 let mut oversized: Oversized<_, TestEntry, TestValue> =
2786 Oversized::init(context.with_label("first"), cfg.clone())
2787 .await
2788 .expect("Failed to init");
2789
2790 let mut locations = Vec::new();
2791 for i in 0..5u8 {
2792 let value: TestValue = [i; 16];
2793 let entry = TestEntry::new(i as u64, 0, 0);
2794 let loc = oversized
2795 .append(1, entry, &value)
2796 .await
2797 .expect("Failed to append");
2798 locations.push(loc);
2799 }
2800 oversized.sync(1).await.expect("Failed to sync");
2801 drop(oversized);
2802
2803 let (blob, _) = context
2805 .open(&cfg.value_partition, &1u64.to_be_bytes())
2806 .await
2807 .expect("Failed to open blob");
2808 let keep_size = byte_end(locations[2].1, locations[2].2);
2809 blob.resize(keep_size).await.expect("Failed to truncate");
2810 blob.sync().await.expect("Failed to sync");
2811 drop(blob);
2812
2813 let chunk_size = FixedJournal::<deterministic::Context, TestEntry>::CHUNK_SIZE as u64;
2818 let (index_blob, _) = context
2819 .open(&cfg.index_partition, &1u64.to_be_bytes())
2820 .await
2821 .expect("Failed to open index blob");
2822 let partial_rewind_size = 4 * chunk_size; index_blob
2824 .resize(partial_rewind_size)
2825 .await
2826 .expect("Failed to resize");
2827 index_blob.sync().await.expect("Failed to sync");
2828 drop(index_blob);
2829
2830 let oversized: Oversized<_, TestEntry, TestValue> =
2833 Oversized::init(context.with_label("second"), cfg.clone())
2834 .await
2835 .expect("Failed to reinit after nested crash");
2836
2837 for i in 0..3u8 {
2839 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2840 assert_eq!(entry.id, i as u64);
2841
2842 let (_, offset, size) = locations[i as usize];
2843 let value = oversized
2844 .get_value(1, offset, size)
2845 .await
2846 .expect("Failed to get value");
2847 assert_eq!(value, [i; 16]);
2848 }
2849
2850 assert!(oversized.get(1, 3).await.is_err());
2852
2853 let new_value: TestValue = [0xFF; 16];
2855 let new_entry = TestEntry::new(100, 0, 0);
2856 let mut oversized = oversized;
2857 let (pos, offset, _size) = oversized
2858 .append(1, new_entry, &new_value)
2859 .await
2860 .expect("Failed to append");
2861 assert_eq!(pos, 3); assert_eq!(offset, byte_end(locations[2].1, locations[2].2));
2865
2866 oversized.destroy().await.expect("Failed to destroy");
2867 });
2868 }
2869
2870 #[test_traced]
2871 fn test_recovery_crash_during_orphan_cleanup() {
2872 let executor = deterministic::Runner::default();
2875 executor.start(|context| async move {
2876 let cfg = test_cfg();
2877
2878 let mut oversized: Oversized<_, TestEntry, TestValue> =
2880 Oversized::init(context.with_label("first"), cfg.clone())
2881 .await
2882 .expect("Failed to init");
2883
2884 let value: TestValue = [1; 16];
2885 let entry = TestEntry::new(1, 0, 0);
2886 let (_, offset1, size1) = oversized
2887 .append(1, entry, &value)
2888 .await
2889 .expect("Failed to append");
2890 oversized.sync(1).await.expect("Failed to sync");
2891 drop(oversized);
2892
2893 let glob_cfg = GlobConfig {
2895 partition: cfg.value_partition.clone(),
2896 compression: cfg.compression,
2897 codec_config: (),
2898 write_buffer: cfg.value_write_buffer,
2899 };
2900 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2901 .await
2902 .expect("Failed to init glob");
2903
2904 for section in 2u64..=4 {
2905 let orphan_value: TestValue = [section as u8; 16];
2906 glob.append(section, &orphan_value)
2907 .await
2908 .expect("Failed to append orphan");
2909 glob.sync(section).await.expect("Failed to sync glob");
2910 }
2911 drop(glob);
2912
2913 context
2916 .remove(&cfg.value_partition, Some(&2u64.to_be_bytes()))
2917 .await
2918 .expect("Failed to remove section 2");
2919
2920 let mut oversized: Oversized<_, TestEntry, TestValue> =
2922 Oversized::init(context.with_label("second"), cfg.clone())
2923 .await
2924 .expect("Failed to reinit");
2925
2926 let entry = oversized.get(1, 0).await.expect("Failed to get");
2928 assert_eq!(entry.id, 1);
2929 let value = oversized
2930 .get_value(1, offset1, size1)
2931 .await
2932 .expect("Failed to get value");
2933 assert_eq!(value, [1; 16]);
2934
2935 assert_eq!(oversized.oldest_section(), Some(1));
2937 assert_eq!(oversized.newest_section(), Some(1));
2938
2939 let new_value: TestValue = [42; 16];
2941 let new_entry = TestEntry::new(42, 0, 0);
2942 let (pos, _, _) = oversized
2943 .append(2, new_entry, &new_value)
2944 .await
2945 .expect("Failed to append to section 2");
2946 assert_eq!(pos, 0); oversized.destroy().await.expect("Failed to destroy");
2949 });
2950 }
2951}