1use super::{
44 fixed::{Config as FixedConfig, Journal as FixedJournal},
45 glob::{Config as GlobConfig, Glob},
46};
47use crate::journal::Error;
48use commonware_codec::{Codec, CodecFixed, CodecShared};
49use commonware_runtime::{Metrics, Storage};
50use futures::{future::try_join, stream::Stream};
51use std::{collections::HashSet, num::NonZeroUsize};
52use tracing::{debug, warn};
53
54pub trait Record: CodecFixed<Cfg = ()> + Clone {
59 fn value_location(&self) -> (u64, u32);
61
62 fn with_location(self, offset: u64, size: u32) -> Self;
66}
67
68#[derive(Clone)]
70pub struct Config<C> {
71 pub index_partition: String,
73
74 pub value_partition: String,
76
77 pub index_buffer_pool: commonware_runtime::buffer::PoolRef,
79
80 pub index_write_buffer: NonZeroUsize,
82
83 pub value_write_buffer: NonZeroUsize,
85
86 pub compression: Option<u8>,
88
89 pub codec_config: C,
91}
92
93pub struct Oversized<E: Storage + Metrics, I: Record, V: Codec> {
98 index: FixedJournal<E, I>,
99 values: Glob<E, V>,
100}
101
102impl<E: Storage + Metrics, I: Record + Send + Sync, V: CodecShared> Oversized<E, I, V> {
103 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109 let index_cfg = FixedConfig {
111 partition: cfg.index_partition,
112 buffer_pool: cfg.index_buffer_pool,
113 write_buffer: cfg.index_write_buffer,
114 };
115 let index = FixedJournal::init(context.with_label("index"), index_cfg).await?;
116
117 let value_cfg = GlobConfig {
118 partition: cfg.value_partition,
119 compression: cfg.compression,
120 codec_config: cfg.codec_config,
121 write_buffer: cfg.value_write_buffer,
122 };
123 let values = Glob::init(context.with_label("values"), value_cfg).await?;
124
125 let mut oversized = Self { index, values };
126
127 oversized.recover().await?;
129
130 Ok(oversized)
131 }
132
133 async fn recover(&mut self) -> Result<(), Error> {
139 let chunk_size = FixedJournal::<E, I>::CHUNK_SIZE as u64;
140 let sections: Vec<u64> = self.index.sections().collect();
141
142 for section in sections {
143 let index_size = self.index.size(section).await?;
144 if index_size == 0 {
145 continue;
146 }
147
148 let glob_size = match self.values.size(section).await {
149 Ok(size) => size,
150 Err(Error::AlreadyPrunedToSection(oldest)) => {
151 warn!(
156 section,
157 oldest, "index has section that glob already pruned"
158 );
159 0
160 }
161 Err(e) => return Err(e),
162 };
163
164 let entry_count = index_size / chunk_size;
166 let aligned_size = entry_count * chunk_size;
167 if aligned_size < index_size {
168 warn!(
169 section,
170 index_size, aligned_size, "trailing bytes detected: truncating"
171 );
172 self.index.rewind_section(section, aligned_size).await?;
173 }
174
175 if entry_count == 0 {
177 warn!(
178 section,
179 index_size, "trailing bytes detected: truncating to 0"
180 );
181 self.values.rewind_section(section, 0).await?;
182 continue;
183 }
184
185 let (valid_count, glob_target) = self
187 .find_last_valid_entry(section, entry_count, glob_size)
188 .await;
189
190 if valid_count < entry_count {
192 let valid_size = valid_count * chunk_size;
193 debug!(section, entry_count, valid_count, "rewinding index");
194 self.index.rewind_section(section, valid_size).await?;
195 }
196
197 if glob_size > glob_target {
200 debug!(
201 section,
202 glob_size, glob_target, "truncating glob trailing garbage"
203 );
204 self.values.rewind_section(section, glob_target).await?;
205 }
206 }
207
208 self.cleanup_orphan_value_sections().await?;
210
211 Ok(())
212 }
213
214 async fn cleanup_orphan_value_sections(&mut self) -> Result<(), Error> {
221 let index_sections: HashSet<u64> = self.index.sections().collect();
223
224 let orphan_sections: Vec<u64> = self
226 .values
227 .sections()
228 .filter(|s| !index_sections.contains(s))
229 .collect();
230
231 for section in orphan_sections {
233 warn!(section, "removing orphan value section");
234 self.values.remove_section(section).await?;
235 }
236
237 Ok(())
238 }
239
240 async fn find_last_valid_entry(
246 &self,
247 section: u64,
248 entry_count: u64,
249 glob_size: u64,
250 ) -> (u64, u64) {
251 for pos in (0..entry_count).rev() {
252 match self.index.get(section, pos).await {
253 Ok(entry) => {
254 let (offset, size) = entry.value_location();
255 let entry_end = offset.saturating_add(u64::from(size));
256 if entry_end <= glob_size {
257 return (pos + 1, entry_end);
258 }
259 if pos == entry_count - 1 {
260 warn!(
261 section,
262 pos, glob_size, entry_end, "invalid entry: glob truncated"
263 );
264 }
265 }
266 Err(_) => {
267 if pos == entry_count - 1 {
268 warn!(section, pos, "corrupted last entry, scanning backwards");
269 }
270 }
271 }
272 }
273 (0, 0)
274 }
275
276 pub async fn append(
285 &mut self,
286 section: u64,
287 entry: I,
288 value: &V,
289 ) -> Result<(u64, u64, u32), Error> {
290 let (offset, size) = self.values.append(section, value).await?;
293
294 let entry_with_location = entry.with_location(offset, size);
296 let position = self.index.append(section, entry_with_location).await?;
297
298 Ok((position, offset, size))
299 }
300
301 pub async fn get(&self, section: u64, position: u64) -> Result<I, Error> {
303 self.index.get(section, position).await
304 }
305
306 pub async fn last(&self, section: u64) -> Result<Option<I>, Error> {
308 self.index.last(section).await
309 }
310
311 pub async fn get_value(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
315 self.values.get(section, offset, size).await
316 }
317
318 pub async fn replay(
322 &self,
323 start_section: u64,
324 start_position: u64,
325 buffer: NonZeroUsize,
326 ) -> Result<impl Stream<Item = Result<(u64, u64, I), Error>> + Send + '_, Error> {
327 self.index
328 .replay(start_section, start_position, buffer)
329 .await
330 }
331
332 pub async fn sync(&self, section: u64) -> Result<(), Error> {
334 try_join(self.index.sync(section), self.values.sync(section))
335 .await
336 .map(|_| ())
337 }
338
339 pub async fn sync_all(&self) -> Result<(), Error> {
341 try_join(self.index.sync_all(), self.values.sync_all())
342 .await
343 .map(|_| ())
344 }
345
346 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
352 let index_pruned = self.index.prune(min).await?;
353 let value_pruned = self.values.prune(min).await?;
354 Ok(index_pruned || value_pruned)
355 }
356
357 pub async fn rewind(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
362 self.index.rewind(section, index_size).await?;
364
365 let value_size = match self.index.last(section).await? {
367 Some(entry) => {
368 let (offset, size) = entry.value_location();
369 offset
370 .checked_add(u64::from(size))
371 .ok_or(Error::OffsetOverflow)?
372 }
373 None => 0,
374 };
375
376 self.values.rewind(section, value_size).await
378 }
379
380 pub async fn rewind_section(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
385 self.index.rewind_section(section, index_size).await?;
387
388 let value_size = match self.index.last(section).await? {
390 Some(entry) => {
391 let (offset, size) = entry.value_location();
392 offset
393 .checked_add(u64::from(size))
394 .ok_or(Error::OffsetOverflow)?
395 }
396 None => 0,
397 };
398
399 self.values.rewind_section(section, value_size).await
401 }
402
403 pub async fn size(&self, section: u64) -> Result<u64, Error> {
407 self.index.size(section).await
408 }
409
410 pub async fn value_size(&self, section: u64) -> Result<u64, Error> {
412 match self.index.last(section).await {
413 Ok(Some(entry)) => {
414 let (offset, size) = entry.value_location();
415 offset
416 .checked_add(u64::from(size))
417 .ok_or(Error::OffsetOverflow)
418 }
419 Ok(None) => Ok(0),
420 Err(Error::SectionOutOfRange(_)) => Ok(0),
421 Err(e) => Err(e),
422 }
423 }
424
425 pub fn oldest_section(&self) -> Option<u64> {
427 self.index.oldest_section()
428 }
429
430 pub fn newest_section(&self) -> Option<u64> {
432 self.index.newest_section()
433 }
434
435 pub async fn destroy(self) -> Result<(), Error> {
437 try_join(self.index.destroy(), self.values.destroy())
438 .await
439 .map(|_| ())
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446 use bytes::{Buf, BufMut};
447 use commonware_codec::{FixedSize, Read, ReadExt, Write};
448 use commonware_cryptography::Crc32;
449 use commonware_macros::test_traced;
450 use commonware_runtime::{buffer::PoolRef, deterministic, Blob as _, Runner};
451 use commonware_utils::{NZUsize, NZU16};
452
453 fn byte_end(offset: u64, size: u32) -> u64 {
455 offset + u64::from(size)
456 }
457
458 #[derive(Debug, Clone, PartialEq)]
460 struct TestEntry {
461 id: u64,
462 value_offset: u64,
463 value_size: u32,
464 }
465
466 impl TestEntry {
467 fn new(id: u64, value_offset: u64, value_size: u32) -> Self {
468 Self {
469 id,
470 value_offset,
471 value_size,
472 }
473 }
474 }
475
476 impl Write for TestEntry {
477 fn write(&self, buf: &mut impl BufMut) {
478 self.id.write(buf);
479 self.value_offset.write(buf);
480 self.value_size.write(buf);
481 }
482 }
483
484 impl Read for TestEntry {
485 type Cfg = ();
486
487 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
488 let id = u64::read(buf)?;
489 let value_offset = u64::read(buf)?;
490 let value_size = u32::read(buf)?;
491 Ok(Self {
492 id,
493 value_offset,
494 value_size,
495 })
496 }
497 }
498
499 impl FixedSize for TestEntry {
500 const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
501 }
502
503 impl Record for TestEntry {
504 fn value_location(&self) -> (u64, u32) {
505 (self.value_offset, self.value_size)
506 }
507
508 fn with_location(mut self, offset: u64, size: u32) -> Self {
509 self.value_offset = offset;
510 self.value_size = size;
511 self
512 }
513 }
514
515 fn test_cfg() -> Config<()> {
516 Config {
517 index_partition: "test_index".to_string(),
518 value_partition: "test_values".to_string(),
519 index_buffer_pool: PoolRef::new(NZU16!(64), NZUsize!(8)),
520 index_write_buffer: NZUsize!(1024),
521 value_write_buffer: NZUsize!(1024),
522 compression: None,
523 codec_config: (),
524 }
525 }
526
527 type TestValue = [u8; 16];
529
530 #[test_traced]
531 fn test_oversized_append_and_get() {
532 let executor = deterministic::Runner::default();
533 executor.start(|context| async move {
534 let mut oversized: Oversized<_, TestEntry, TestValue> =
535 Oversized::init(context.clone(), test_cfg())
536 .await
537 .expect("Failed to init");
538
539 let value: TestValue = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
541 let entry = TestEntry::new(42, 0, 0);
542 let (position, offset, size) = oversized
543 .append(1, entry, &value)
544 .await
545 .expect("Failed to append");
546
547 assert_eq!(position, 0);
548
549 let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
551 assert_eq!(retrieved_entry.id, 42);
552
553 let retrieved_value = oversized
555 .get_value(1, offset, size)
556 .await
557 .expect("Failed to get value");
558 assert_eq!(retrieved_value, value);
559
560 oversized.destroy().await.expect("Failed to destroy");
561 });
562 }
563
564 #[test_traced]
565 fn test_oversized_crash_recovery() {
566 let executor = deterministic::Runner::default();
567 executor.start(|context| async move {
568 let cfg = test_cfg();
569
570 let mut oversized: Oversized<_, TestEntry, TestValue> =
572 Oversized::init(context.clone(), cfg.clone())
573 .await
574 .expect("Failed to init");
575
576 let mut locations = Vec::new();
578 for i in 0..5u8 {
579 let value: TestValue = [i; 16];
580 let entry = TestEntry::new(i as u64, 0, 0);
581 let (position, offset, size) = oversized
582 .append(1, entry, &value)
583 .await
584 .expect("Failed to append");
585 locations.push((position, offset, size));
586 }
587 oversized.sync(1).await.expect("Failed to sync");
588 drop(oversized);
589
590 let (blob, _) = context
592 .open(&cfg.value_partition, &1u64.to_be_bytes())
593 .await
594 .expect("Failed to open blob");
595
596 let keep_size = byte_end(locations[2].1, locations[2].2);
598 blob.resize(keep_size).await.expect("Failed to truncate");
599 blob.sync().await.expect("Failed to sync");
600 drop(blob);
601
602 let oversized: Oversized<_, TestEntry, TestValue> =
604 Oversized::init(context.clone(), cfg.clone())
605 .await
606 .expect("Failed to reinit");
607
608 for i in 0..3u8 {
610 let (position, offset, size) = locations[i as usize];
611 let entry = oversized.get(1, position).await.expect("Failed to get");
612 assert_eq!(entry.id, i as u64);
613
614 let value = oversized
615 .get_value(1, offset, size)
616 .await
617 .expect("Failed to get value");
618 assert_eq!(value, [i; 16]);
619 }
620
621 let result = oversized.get(1, 3).await;
623 assert!(result.is_err());
624
625 oversized.destroy().await.expect("Failed to destroy");
626 });
627 }
628
629 #[test_traced]
630 fn test_oversized_persistence() {
631 let executor = deterministic::Runner::default();
632 executor.start(|context| async move {
633 let cfg = test_cfg();
634
635 let mut oversized: Oversized<_, TestEntry, TestValue> =
637 Oversized::init(context.clone(), cfg.clone())
638 .await
639 .expect("Failed to init");
640
641 let value: TestValue = [42; 16];
642 let entry = TestEntry::new(123, 0, 0);
643 let (position, offset, size) = oversized
644 .append(1, entry, &value)
645 .await
646 .expect("Failed to append");
647 oversized.sync(1).await.expect("Failed to sync");
648 drop(oversized);
649
650 let oversized: Oversized<_, TestEntry, TestValue> =
652 Oversized::init(context.clone(), cfg)
653 .await
654 .expect("Failed to reinit");
655
656 let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
657 assert_eq!(retrieved_entry.id, 123);
658
659 let retrieved_value = oversized
660 .get_value(1, offset, size)
661 .await
662 .expect("Failed to get value");
663 assert_eq!(retrieved_value, value);
664
665 oversized.destroy().await.expect("Failed to destroy");
666 });
667 }
668
669 #[test_traced]
670 fn test_oversized_prune() {
671 let executor = deterministic::Runner::default();
672 executor.start(|context| async move {
673 let mut oversized: Oversized<_, TestEntry, TestValue> =
674 Oversized::init(context.clone(), test_cfg())
675 .await
676 .expect("Failed to init");
677
678 for section in 1u64..=5 {
680 let value: TestValue = [section as u8; 16];
681 let entry = TestEntry::new(section, 0, 0);
682 oversized
683 .append(section, entry, &value)
684 .await
685 .expect("Failed to append");
686 oversized.sync(section).await.expect("Failed to sync");
687 }
688
689 oversized.prune(3).await.expect("Failed to prune");
691
692 assert!(oversized.get(1, 0).await.is_err());
694 assert!(oversized.get(2, 0).await.is_err());
695
696 assert!(oversized.get(3, 0).await.is_ok());
698 assert!(oversized.get(4, 0).await.is_ok());
699 assert!(oversized.get(5, 0).await.is_ok());
700
701 oversized.destroy().await.expect("Failed to destroy");
702 });
703 }
704
705 #[test_traced]
706 fn test_recovery_empty_section() {
707 let executor = deterministic::Runner::default();
708 executor.start(|context| async move {
709 let cfg = test_cfg();
710
711 let mut oversized: Oversized<_, TestEntry, TestValue> =
713 Oversized::init(context.clone(), cfg.clone())
714 .await
715 .expect("Failed to init");
716
717 let value: TestValue = [42; 16];
719 let entry = TestEntry::new(1, 0, 0);
720 oversized
721 .append(2, entry, &value)
722 .await
723 .expect("Failed to append");
724 oversized.sync(2).await.expect("Failed to sync");
725 drop(oversized);
726
727 let oversized: Oversized<_, TestEntry, TestValue> =
729 Oversized::init(context.clone(), cfg)
730 .await
731 .expect("Failed to reinit");
732
733 let entry = oversized.get(2, 0).await.expect("Failed to get");
735 assert_eq!(entry.id, 1);
736
737 oversized.destroy().await.expect("Failed to destroy");
738 });
739 }
740
741 #[test_traced]
742 fn test_recovery_all_entries_invalid() {
743 let executor = deterministic::Runner::default();
744 executor.start(|context| async move {
745 let cfg = test_cfg();
746
747 let mut oversized: Oversized<_, TestEntry, TestValue> =
749 Oversized::init(context.clone(), cfg.clone())
750 .await
751 .expect("Failed to init");
752
753 for i in 0..5u8 {
755 let value: TestValue = [i; 16];
756 let entry = TestEntry::new(i as u64, 0, 0);
757 oversized
758 .append(1, entry, &value)
759 .await
760 .expect("Failed to append");
761 }
762 oversized.sync(1).await.expect("Failed to sync");
763 drop(oversized);
764
765 let (blob, _) = context
767 .open(&cfg.value_partition, &1u64.to_be_bytes())
768 .await
769 .expect("Failed to open blob");
770 blob.resize(0).await.expect("Failed to truncate");
771 blob.sync().await.expect("Failed to sync");
772 drop(blob);
773
774 let mut oversized: Oversized<_, TestEntry, TestValue> =
776 Oversized::init(context.clone(), cfg)
777 .await
778 .expect("Failed to reinit");
779
780 let result = oversized.get(1, 0).await;
782 assert!(result.is_err());
783
784 let value: TestValue = [99; 16];
786 let entry = TestEntry::new(100, 0, 0);
787 let (pos, offset, size) = oversized
788 .append(1, entry, &value)
789 .await
790 .expect("Failed to append after recovery");
791 assert_eq!(pos, 0);
792
793 let retrieved = oversized.get(1, 0).await.expect("Failed to get");
794 assert_eq!(retrieved.id, 100);
795 let retrieved_value = oversized
796 .get_value(1, offset, size)
797 .await
798 .expect("Failed to get value");
799 assert_eq!(retrieved_value, value);
800
801 oversized.destroy().await.expect("Failed to destroy");
802 });
803 }
804
805 #[test_traced]
806 fn test_recovery_multiple_sections_mixed_validity() {
807 let executor = deterministic::Runner::default();
808 executor.start(|context| async move {
809 let cfg = test_cfg();
810
811 let mut oversized: Oversized<_, TestEntry, TestValue> =
813 Oversized::init(context.clone(), cfg.clone())
814 .await
815 .expect("Failed to init");
816
817 let mut section1_locations = Vec::new();
819 for i in 0..3u8 {
820 let value: TestValue = [i; 16];
821 let entry = TestEntry::new(i as u64, 0, 0);
822 let loc = oversized
823 .append(1, entry, &value)
824 .await
825 .expect("Failed to append");
826 section1_locations.push(loc);
827 }
828 oversized.sync(1).await.expect("Failed to sync");
829
830 let mut section2_locations = Vec::new();
832 for i in 0..5u8 {
833 let value: TestValue = [10 + i; 16];
834 let entry = TestEntry::new(10 + i as u64, 0, 0);
835 let loc = oversized
836 .append(2, entry, &value)
837 .await
838 .expect("Failed to append");
839 section2_locations.push(loc);
840 }
841 oversized.sync(2).await.expect("Failed to sync");
842
843 for i in 0..2u8 {
845 let value: TestValue = [20 + i; 16];
846 let entry = TestEntry::new(20 + i as u64, 0, 0);
847 oversized
848 .append(3, entry, &value)
849 .await
850 .expect("Failed to append");
851 }
852 oversized.sync(3).await.expect("Failed to sync");
853 drop(oversized);
854
855 let (blob, _) = context
857 .open(&cfg.value_partition, &1u64.to_be_bytes())
858 .await
859 .expect("Failed to open blob");
860 let keep_size = byte_end(section1_locations[0].1, section1_locations[0].2);
861 blob.resize(keep_size).await.expect("Failed to truncate");
862 blob.sync().await.expect("Failed to sync");
863 drop(blob);
864
865 let (blob, _) = context
867 .open(&cfg.value_partition, &2u64.to_be_bytes())
868 .await
869 .expect("Failed to open blob");
870 let keep_size = byte_end(section2_locations[2].1, section2_locations[2].2);
871 blob.resize(keep_size).await.expect("Failed to truncate");
872 blob.sync().await.expect("Failed to sync");
873 drop(blob);
874
875 let oversized: Oversized<_, TestEntry, TestValue> =
879 Oversized::init(context.clone(), cfg)
880 .await
881 .expect("Failed to reinit");
882
883 assert!(oversized.get(1, 0).await.is_ok());
885 assert!(oversized.get(1, 1).await.is_err());
886 assert!(oversized.get(1, 2).await.is_err());
887
888 assert!(oversized.get(2, 0).await.is_ok());
890 assert!(oversized.get(2, 1).await.is_ok());
891 assert!(oversized.get(2, 2).await.is_ok());
892 assert!(oversized.get(2, 3).await.is_err());
893 assert!(oversized.get(2, 4).await.is_err());
894
895 assert!(oversized.get(3, 0).await.is_ok());
897 assert!(oversized.get(3, 1).await.is_ok());
898
899 oversized.destroy().await.expect("Failed to destroy");
900 });
901 }
902
903 #[test_traced]
904 fn test_recovery_corrupted_last_index_entry() {
905 let executor = deterministic::Runner::default();
906 executor.start(|context| async move {
907 let cfg = Config {
911 index_partition: "test_index".to_string(),
912 value_partition: "test_values".to_string(),
913 index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
914 index_write_buffer: NZUsize!(1024),
915 value_write_buffer: NZUsize!(1024),
916 compression: None,
917 codec_config: (),
918 };
919
920 let mut oversized: Oversized<_, TestEntry, TestValue> =
922 Oversized::init(context.clone(), cfg.clone())
923 .await
924 .expect("Failed to init");
925
926 for i in 0..5u8 {
928 let value: TestValue = [i; 16];
929 let entry = TestEntry::new(i as u64, 0, 0);
930 oversized
931 .append(1, entry, &value)
932 .await
933 .expect("Failed to append");
934 }
935 oversized.sync(1).await.expect("Failed to sync");
936 drop(oversized);
937
938 let (blob, size) = context
940 .open(&cfg.index_partition, &1u64.to_be_bytes())
941 .await
942 .expect("Failed to open blob");
943
944 assert_eq!(size, 160);
948 let last_page_crc_offset = size - 12;
949 blob.write_at(vec![0xFF; 12], last_page_crc_offset)
950 .await
951 .expect("Failed to corrupt");
952 blob.sync().await.expect("Failed to sync");
953 drop(blob);
954
955 let mut oversized: Oversized<_, TestEntry, TestValue> =
957 Oversized::init(context.clone(), cfg)
958 .await
959 .expect("Failed to reinit");
960
961 for i in 0..4u8 {
963 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
964 assert_eq!(entry.id, i as u64);
965 }
966
967 assert!(oversized.get(1, 4).await.is_err());
969
970 let value: TestValue = [99; 16];
972 let entry = TestEntry::new(100, 0, 0);
973 let (pos, offset, size) = oversized
974 .append(1, entry, &value)
975 .await
976 .expect("Failed to append after recovery");
977 assert_eq!(pos, 4);
978
979 let retrieved = oversized.get(1, 4).await.expect("Failed to get");
980 assert_eq!(retrieved.id, 100);
981 let retrieved_value = oversized
982 .get_value(1, offset, size)
983 .await
984 .expect("Failed to get value");
985 assert_eq!(retrieved_value, value);
986
987 oversized.destroy().await.expect("Failed to destroy");
988 });
989 }
990
991 #[test_traced]
992 fn test_recovery_all_entries_valid() {
993 let executor = deterministic::Runner::default();
994 executor.start(|context| async move {
995 let cfg = test_cfg();
996
997 let mut oversized: Oversized<_, TestEntry, TestValue> =
999 Oversized::init(context.clone(), cfg.clone())
1000 .await
1001 .expect("Failed to init");
1002
1003 for section in 1u64..=3 {
1005 for i in 0..10u8 {
1006 let value: TestValue = [(section as u8) * 10 + i; 16];
1007 let entry = TestEntry::new(section * 100 + i as u64, 0, 0);
1008 oversized
1009 .append(section, entry, &value)
1010 .await
1011 .expect("Failed to append");
1012 }
1013 oversized.sync(section).await.expect("Failed to sync");
1014 }
1015 drop(oversized);
1016
1017 let oversized: Oversized<_, TestEntry, TestValue> =
1019 Oversized::init(context.clone(), cfg)
1020 .await
1021 .expect("Failed to reinit");
1022
1023 for section in 1u64..=3 {
1025 for i in 0..10u8 {
1026 let entry = oversized
1027 .get(section, i as u64)
1028 .await
1029 .expect("Failed to get");
1030 assert_eq!(entry.id, section * 100 + i as u64);
1031 }
1032 }
1033
1034 oversized.destroy().await.expect("Failed to destroy");
1035 });
1036 }
1037
1038 #[test_traced]
1039 fn test_recovery_single_entry_invalid() {
1040 let executor = deterministic::Runner::default();
1041 executor.start(|context| async move {
1042 let cfg = test_cfg();
1043
1044 let mut oversized: Oversized<_, TestEntry, TestValue> =
1046 Oversized::init(context.clone(), cfg.clone())
1047 .await
1048 .expect("Failed to init");
1049
1050 let value: TestValue = [42; 16];
1051 let entry = TestEntry::new(1, 0, 0);
1052 oversized
1053 .append(1, entry, &value)
1054 .await
1055 .expect("Failed to append");
1056 oversized.sync(1).await.expect("Failed to sync");
1057 drop(oversized);
1058
1059 let (blob, _) = context
1061 .open(&cfg.value_partition, &1u64.to_be_bytes())
1062 .await
1063 .expect("Failed to open blob");
1064 blob.resize(0).await.expect("Failed to truncate");
1065 blob.sync().await.expect("Failed to sync");
1066 drop(blob);
1067
1068 let oversized: Oversized<_, TestEntry, TestValue> =
1070 Oversized::init(context.clone(), cfg)
1071 .await
1072 .expect("Failed to reinit");
1073
1074 assert!(oversized.get(1, 0).await.is_err());
1076
1077 oversized.destroy().await.expect("Failed to destroy");
1078 });
1079 }
1080
1081 #[test_traced]
1082 fn test_recovery_last_entry_off_by_one() {
1083 let executor = deterministic::Runner::default();
1084 executor.start(|context| async move {
1085 let cfg = test_cfg();
1086
1087 let mut oversized: Oversized<_, TestEntry, TestValue> =
1089 Oversized::init(context.clone(), cfg.clone())
1090 .await
1091 .expect("Failed to init");
1092
1093 let mut locations = Vec::new();
1094 for i in 0..3u8 {
1095 let value: TestValue = [i; 16];
1096 let entry = TestEntry::new(i as u64, 0, 0);
1097 let loc = oversized
1098 .append(1, entry, &value)
1099 .await
1100 .expect("Failed to append");
1101 locations.push(loc);
1102 }
1103 oversized.sync(1).await.expect("Failed to sync");
1104 drop(oversized);
1105
1106 let (blob, _) = context
1108 .open(&cfg.value_partition, &1u64.to_be_bytes())
1109 .await
1110 .expect("Failed to open blob");
1111
1112 let last = &locations[2];
1115 let truncate_to = byte_end(last.1, last.2) - 1;
1116 blob.resize(truncate_to).await.expect("Failed to truncate");
1117 blob.sync().await.expect("Failed to sync");
1118 drop(blob);
1119
1120 let mut oversized: Oversized<_, TestEntry, TestValue> =
1122 Oversized::init(context.clone(), cfg)
1123 .await
1124 .expect("Failed to reinit");
1125
1126 assert!(oversized.get(1, 0).await.is_ok());
1128 assert!(oversized.get(1, 1).await.is_ok());
1129
1130 assert!(oversized.get(1, 2).await.is_err());
1132
1133 let value: TestValue = [99; 16];
1135 let entry = TestEntry::new(100, 0, 0);
1136 let (pos, offset, size) = oversized
1137 .append(1, entry, &value)
1138 .await
1139 .expect("Failed to append after recovery");
1140 assert_eq!(pos, 2);
1141
1142 let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1143 assert_eq!(retrieved.id, 100);
1144 let retrieved_value = oversized
1145 .get_value(1, offset, size)
1146 .await
1147 .expect("Failed to get value");
1148 assert_eq!(retrieved_value, value);
1149
1150 oversized.destroy().await.expect("Failed to destroy");
1151 });
1152 }
1153
1154 #[test_traced]
1155 fn test_recovery_glob_missing_entirely() {
1156 let executor = deterministic::Runner::default();
1157 executor.start(|context| async move {
1158 let cfg = test_cfg();
1159
1160 let mut oversized: Oversized<_, TestEntry, TestValue> =
1162 Oversized::init(context.clone(), cfg.clone())
1163 .await
1164 .expect("Failed to init");
1165
1166 for i in 0..3u8 {
1167 let value: TestValue = [i; 16];
1168 let entry = TestEntry::new(i as u64, 0, 0);
1169 oversized
1170 .append(1, entry, &value)
1171 .await
1172 .expect("Failed to append");
1173 }
1174 oversized.sync(1).await.expect("Failed to sync");
1175 drop(oversized);
1176
1177 context
1179 .remove(&cfg.value_partition, Some(&1u64.to_be_bytes()))
1180 .await
1181 .expect("Failed to remove");
1182
1183 let oversized: Oversized<_, TestEntry, TestValue> =
1185 Oversized::init(context.clone(), cfg)
1186 .await
1187 .expect("Failed to reinit");
1188
1189 assert!(oversized.get(1, 0).await.is_err());
1191 assert!(oversized.get(1, 1).await.is_err());
1192 assert!(oversized.get(1, 2).await.is_err());
1193
1194 oversized.destroy().await.expect("Failed to destroy");
1195 });
1196 }
1197
1198 #[test_traced]
1199 fn test_recovery_can_append_after_recovery() {
1200 let executor = deterministic::Runner::default();
1201 executor.start(|context| async move {
1202 let cfg = test_cfg();
1203
1204 let mut oversized: Oversized<_, TestEntry, TestValue> =
1206 Oversized::init(context.clone(), cfg.clone())
1207 .await
1208 .expect("Failed to init");
1209
1210 let mut locations = Vec::new();
1211 for i in 0..5u8 {
1212 let value: TestValue = [i; 16];
1213 let entry = TestEntry::new(i as u64, 0, 0);
1214 let loc = oversized
1215 .append(1, entry, &value)
1216 .await
1217 .expect("Failed to append");
1218 locations.push(loc);
1219 }
1220 oversized.sync(1).await.expect("Failed to sync");
1221 drop(oversized);
1222
1223 let (blob, _) = context
1225 .open(&cfg.value_partition, &1u64.to_be_bytes())
1226 .await
1227 .expect("Failed to open blob");
1228 let keep_size = byte_end(locations[1].1, locations[1].2);
1229 blob.resize(keep_size).await.expect("Failed to truncate");
1230 blob.sync().await.expect("Failed to sync");
1231 drop(blob);
1232
1233 let mut oversized: Oversized<_, TestEntry, TestValue> =
1235 Oversized::init(context.clone(), cfg.clone())
1236 .await
1237 .expect("Failed to reinit");
1238
1239 assert!(oversized.get(1, 0).await.is_ok());
1241 assert!(oversized.get(1, 1).await.is_ok());
1242 assert!(oversized.get(1, 2).await.is_err());
1243
1244 for i in 10..15u8 {
1246 let value: TestValue = [i; 16];
1247 let entry = TestEntry::new(i as u64, 0, 0);
1248 oversized
1249 .append(1, entry, &value)
1250 .await
1251 .expect("Failed to append after recovery");
1252 }
1253 oversized.sync(1).await.expect("Failed to sync");
1254
1255 for i in 0..5u8 {
1257 let entry = oversized
1258 .get(1, 2 + i as u64)
1259 .await
1260 .expect("Failed to get new entry");
1261 assert_eq!(entry.id, (10 + i) as u64);
1262 }
1263
1264 oversized.destroy().await.expect("Failed to destroy");
1265 });
1266 }
1267
1268 #[test_traced]
1269 fn test_recovery_glob_pruned_but_index_not() {
1270 let executor = deterministic::Runner::default();
1271 executor.start(|context| async move {
1272 let cfg = test_cfg();
1273
1274 let mut oversized: Oversized<_, TestEntry, TestValue> =
1276 Oversized::init(context.clone(), cfg.clone())
1277 .await
1278 .expect("Failed to init");
1279
1280 for section in 1u64..=3 {
1281 let value: TestValue = [section as u8; 16];
1282 let entry = TestEntry::new(section, 0, 0);
1283 oversized
1284 .append(section, entry, &value)
1285 .await
1286 .expect("Failed to append");
1287 oversized.sync(section).await.expect("Failed to sync");
1288 }
1289 drop(oversized);
1290
1291 use crate::journal::segmented::glob::{Config as GlobConfig, Glob};
1294 let glob_cfg = GlobConfig {
1295 partition: cfg.value_partition.clone(),
1296 compression: cfg.compression,
1297 codec_config: (),
1298 write_buffer: cfg.value_write_buffer,
1299 };
1300 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1301 .await
1302 .expect("Failed to init glob");
1303 glob.prune(2).await.expect("Failed to prune glob");
1304 glob.sync_all().await.expect("Failed to sync glob");
1305 drop(glob);
1306
1307 let oversized: Oversized<_, TestEntry, TestValue> =
1310 Oversized::init(context.clone(), cfg.clone())
1311 .await
1312 .expect("Failed to reinit");
1313
1314 assert!(oversized.get(1, 0).await.is_err());
1316
1317 assert!(oversized.get(2, 0).await.is_ok());
1319 assert!(oversized.get(3, 0).await.is_ok());
1320
1321 oversized.destroy().await.expect("Failed to destroy");
1322 });
1323 }
1324
1325 #[test_traced]
1326 fn test_recovery_index_partition_deleted() {
1327 let executor = deterministic::Runner::default();
1328 executor.start(|context| async move {
1329 let cfg = test_cfg();
1330
1331 let mut oversized: Oversized<_, TestEntry, TestValue> =
1333 Oversized::init(context.clone(), cfg.clone())
1334 .await
1335 .expect("Failed to init");
1336
1337 for section in 1u64..=3 {
1338 let value: TestValue = [section as u8; 16];
1339 let entry = TestEntry::new(section, 0, 0);
1340 oversized
1341 .append(section, entry, &value)
1342 .await
1343 .expect("Failed to append");
1344 oversized.sync(section).await.expect("Failed to sync");
1345 }
1346 drop(oversized);
1347
1348 context
1350 .remove(&cfg.index_partition, Some(&2u64.to_be_bytes()))
1351 .await
1352 .expect("Failed to remove index");
1353
1354 let oversized: Oversized<_, TestEntry, TestValue> =
1357 Oversized::init(context.clone(), cfg.clone())
1358 .await
1359 .expect("Failed to reinit");
1360
1361 assert!(oversized.get(1, 0).await.is_ok());
1363 assert!(oversized.get(3, 0).await.is_ok());
1364
1365 assert!(oversized.get(2, 0).await.is_err());
1367
1368 oversized.destroy().await.expect("Failed to destroy");
1369 });
1370 }
1371
1372 #[test_traced]
1373 fn test_recovery_index_synced_but_glob_not() {
1374 let executor = deterministic::Runner::default();
1375 executor.start(|context| async move {
1376 let cfg = test_cfg();
1377
1378 let mut oversized: Oversized<_, TestEntry, TestValue> =
1380 Oversized::init(context.clone(), cfg.clone())
1381 .await
1382 .expect("Failed to init");
1383
1384 let mut locations = Vec::new();
1386 for i in 0..3u8 {
1387 let value: TestValue = [i; 16];
1388 let entry = TestEntry::new(i as u64, 0, 0);
1389 let loc = oversized
1390 .append(1, entry, &value)
1391 .await
1392 .expect("Failed to append");
1393 locations.push(loc);
1394 }
1395 oversized.sync(1).await.expect("Failed to sync");
1396
1397 for i in 10..15u8 {
1399 let value: TestValue = [i; 16];
1400 let entry = TestEntry::new(i as u64, 0, 0);
1401 oversized
1402 .append(1, entry, &value)
1403 .await
1404 .expect("Failed to append");
1405 }
1406 drop(oversized);
1408
1409 let (blob, _) = context
1412 .open(&cfg.value_partition, &1u64.to_be_bytes())
1413 .await
1414 .expect("Failed to open blob");
1415 let synced_size = byte_end(locations[2].1, locations[2].2);
1416 blob.resize(synced_size).await.expect("Failed to truncate");
1417 blob.sync().await.expect("Failed to sync");
1418 drop(blob);
1419
1420 let oversized: Oversized<_, TestEntry, TestValue> =
1422 Oversized::init(context.clone(), cfg)
1423 .await
1424 .expect("Failed to reinit");
1425
1426 for i in 0..3u8 {
1428 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1429 assert_eq!(entry.id, i as u64);
1430 }
1431
1432 assert!(oversized.get(1, 3).await.is_err());
1434
1435 oversized.destroy().await.expect("Failed to destroy");
1436 });
1437 }
1438
1439 #[test_traced]
1440 fn test_recovery_glob_synced_but_index_not() {
1441 let executor = deterministic::Runner::default();
1442 executor.start(|context| async move {
1443 let cfg = Config {
1447 index_partition: "test_index".to_string(),
1448 value_partition: "test_values".to_string(),
1449 index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1450 index_write_buffer: NZUsize!(1024),
1451 value_write_buffer: NZUsize!(1024),
1452 compression: None,
1453 codec_config: (),
1454 };
1455
1456 let mut oversized: Oversized<_, TestEntry, TestValue> =
1458 Oversized::init(context.clone(), cfg.clone())
1459 .await
1460 .expect("Failed to init");
1461
1462 let mut locations = Vec::new();
1464 for i in 0..3u8 {
1465 let value: TestValue = [i; 16];
1466 let entry = TestEntry::new(i as u64, 0, 0);
1467 let loc = oversized
1468 .append(1, entry, &value)
1469 .await
1470 .expect("Failed to append");
1471 locations.push(loc);
1472 }
1473 oversized.sync(1).await.expect("Failed to sync");
1474 drop(oversized);
1475
1476 let (blob, _size) = context
1479 .open(&cfg.index_partition, &1u64.to_be_bytes())
1480 .await
1481 .expect("Failed to open blob");
1482
1483 let physical_page_size = (TestEntry::SIZE + 12) as u64;
1486 blob.resize(2 * physical_page_size)
1487 .await
1488 .expect("Failed to truncate");
1489 blob.sync().await.expect("Failed to sync");
1490 drop(blob);
1491
1492 let mut oversized: Oversized<_, TestEntry, TestValue> =
1494 Oversized::init(context.clone(), cfg.clone())
1495 .await
1496 .expect("Failed to reinit");
1497
1498 for i in 0..2u8 {
1500 let (position, offset, size) = locations[i as usize];
1501 let entry = oversized.get(1, position).await.expect("Failed to get");
1502 assert_eq!(entry.id, i as u64);
1503
1504 let value = oversized
1505 .get_value(1, offset, size)
1506 .await
1507 .expect("Failed to get value");
1508 assert_eq!(value, [i; 16]);
1509 }
1510
1511 assert!(oversized.get(1, 2).await.is_err());
1513
1514 let mut new_locations = Vec::new();
1516 for i in 10..13u8 {
1517 let value: TestValue = [i; 16];
1518 let entry = TestEntry::new(i as u64, 0, 0);
1519 let (position, offset, size) = oversized
1520 .append(1, entry, &value)
1521 .await
1522 .expect("Failed to append after recovery");
1523
1524 assert_eq!(position, (i - 10 + 2) as u64);
1526 new_locations.push((position, offset, size, i));
1527
1528 let retrieved = oversized.get(1, position).await.expect("Failed to get");
1530 assert_eq!(retrieved.id, i as u64);
1531
1532 let retrieved_value = oversized
1533 .get_value(1, offset, size)
1534 .await
1535 .expect("Failed to get value");
1536 assert_eq!(retrieved_value, value);
1537 }
1538
1539 oversized.sync(1).await.expect("Failed to sync");
1541 drop(oversized);
1542
1543 let oversized: Oversized<_, TestEntry, TestValue> =
1545 Oversized::init(context.clone(), cfg)
1546 .await
1547 .expect("Failed to reinit after append");
1548
1549 for i in 0..2u8 {
1552 let (position, offset, size) = locations[i as usize];
1553 let entry = oversized.get(1, position).await.expect("Failed to get");
1554 assert_eq!(entry.id, i as u64);
1555
1556 let value = oversized
1557 .get_value(1, offset, size)
1558 .await
1559 .expect("Failed to get value");
1560 assert_eq!(value, [i; 16]);
1561 }
1562
1563 for (position, offset, size, expected_id) in &new_locations {
1565 let entry = oversized
1566 .get(1, *position)
1567 .await
1568 .expect("Failed to get new entry after restart");
1569 assert_eq!(entry.id, *expected_id as u64);
1570
1571 let value = oversized
1572 .get_value(1, *offset, *size)
1573 .await
1574 .expect("Failed to get new value after restart");
1575 assert_eq!(value, [*expected_id; 16]);
1576 }
1577
1578 assert!(oversized.get(1, 4).await.is_ok());
1580 assert!(oversized.get(1, 5).await.is_err());
1581
1582 oversized.destroy().await.expect("Failed to destroy");
1583 });
1584 }
1585
1586 #[test_traced]
1587 fn test_recovery_partial_index_entry() {
1588 let executor = deterministic::Runner::default();
1589 executor.start(|context| async move {
1590 let cfg = test_cfg();
1591
1592 let mut oversized: Oversized<_, TestEntry, TestValue> =
1594 Oversized::init(context.clone(), cfg.clone())
1595 .await
1596 .expect("Failed to init");
1597
1598 for i in 0..3u8 {
1600 let value: TestValue = [i; 16];
1601 let entry = TestEntry::new(i as u64, 0, 0);
1602 oversized
1603 .append(1, entry, &value)
1604 .await
1605 .expect("Failed to append");
1606 }
1607 oversized.sync(1).await.expect("Failed to sync");
1608 drop(oversized);
1609
1610 let (blob, _) = context
1614 .open(&cfg.index_partition, &1u64.to_be_bytes())
1615 .await
1616 .expect("Failed to open blob");
1617 let partial_size = 3 * 24 + 10; blob.resize(partial_size).await.expect("Failed to resize");
1619 blob.sync().await.expect("Failed to sync");
1620 drop(blob);
1621
1622 let mut oversized: Oversized<_, TestEntry, TestValue> =
1624 Oversized::init(context.clone(), cfg.clone())
1625 .await
1626 .expect("Failed to reinit");
1627
1628 for i in 0..3u8 {
1630 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1631 assert_eq!(entry.id, i as u64);
1632 }
1633
1634 assert!(oversized.get(1, 3).await.is_err());
1636
1637 let value: TestValue = [42; 16];
1639 let entry = TestEntry::new(100, 0, 0);
1640 let (pos, offset, size) = oversized
1641 .append(1, entry, &value)
1642 .await
1643 .expect("Failed to append after recovery");
1644 assert_eq!(pos, 3);
1645
1646 let retrieved = oversized.get(1, 3).await.expect("Failed to get new entry");
1648 assert_eq!(retrieved.id, 100);
1649 let retrieved_value = oversized
1650 .get_value(1, offset, size)
1651 .await
1652 .expect("Failed to get new value");
1653 assert_eq!(retrieved_value, value);
1654
1655 oversized.destroy().await.expect("Failed to destroy");
1656 });
1657 }
1658
1659 #[test_traced]
1660 fn test_recovery_only_partial_entry() {
1661 let executor = deterministic::Runner::default();
1662 executor.start(|context| async move {
1663 let cfg = test_cfg();
1664
1665 let mut oversized: Oversized<_, TestEntry, TestValue> =
1667 Oversized::init(context.clone(), cfg.clone())
1668 .await
1669 .expect("Failed to init");
1670
1671 let value: TestValue = [42; 16];
1672 let entry = TestEntry::new(1, 0, 0);
1673 oversized
1674 .append(1, entry, &value)
1675 .await
1676 .expect("Failed to append");
1677 oversized.sync(1).await.expect("Failed to sync");
1678 drop(oversized);
1679
1680 let (blob, _) = context
1682 .open(&cfg.index_partition, &1u64.to_be_bytes())
1683 .await
1684 .expect("Failed to open blob");
1685 blob.resize(10).await.expect("Failed to resize"); blob.sync().await.expect("Failed to sync");
1687 drop(blob);
1688
1689 let mut oversized: Oversized<_, TestEntry, TestValue> =
1691 Oversized::init(context.clone(), cfg.clone())
1692 .await
1693 .expect("Failed to reinit");
1694
1695 assert!(oversized.get(1, 0).await.is_err());
1697
1698 let value: TestValue = [99; 16];
1700 let entry = TestEntry::new(100, 0, 0);
1701 let (pos, offset, size) = oversized
1702 .append(1, entry, &value)
1703 .await
1704 .expect("Failed to append after recovery");
1705 assert_eq!(pos, 0);
1706
1707 let retrieved = oversized.get(1, 0).await.expect("Failed to get");
1708 assert_eq!(retrieved.id, 100);
1709 let retrieved_value = oversized
1710 .get_value(1, offset, size)
1711 .await
1712 .expect("Failed to get value");
1713 assert_eq!(retrieved_value, value);
1714
1715 oversized.destroy().await.expect("Failed to destroy");
1716 });
1717 }
1718
1719 #[test_traced]
1720 fn test_recovery_crash_during_rewind_index_ahead() {
1721 let executor = deterministic::Runner::default();
1723 executor.start(|context| async move {
1724 let cfg = Config {
1728 index_partition: "test_index".to_string(),
1729 value_partition: "test_values".to_string(),
1730 index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1731 index_write_buffer: NZUsize!(1024),
1732 value_write_buffer: NZUsize!(1024),
1733 compression: None,
1734 codec_config: (),
1735 };
1736
1737 let mut oversized: Oversized<_, TestEntry, TestValue> =
1739 Oversized::init(context.clone(), cfg.clone())
1740 .await
1741 .expect("Failed to init");
1742
1743 let mut locations = Vec::new();
1744 for i in 0..5u8 {
1745 let value: TestValue = [i; 16];
1746 let entry = TestEntry::new(i as u64, 0, 0);
1747 let loc = oversized
1748 .append(1, entry, &value)
1749 .await
1750 .expect("Failed to append");
1751 locations.push(loc);
1752 }
1753 oversized.sync(1).await.expect("Failed to sync");
1754 drop(oversized);
1755
1756 let (blob, _) = context
1759 .open(&cfg.index_partition, &1u64.to_be_bytes())
1760 .await
1761 .expect("Failed to open blob");
1762 let physical_page_size = (TestEntry::SIZE + 12) as u64;
1764 blob.resize(2 * physical_page_size)
1765 .await
1766 .expect("Failed to truncate");
1767 blob.sync().await.expect("Failed to sync");
1768 drop(blob);
1769
1770 let mut oversized: Oversized<_, TestEntry, TestValue> =
1772 Oversized::init(context.clone(), cfg.clone())
1773 .await
1774 .expect("Failed to reinit");
1775
1776 for i in 0..2u8 {
1778 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1779 assert_eq!(entry.id, i as u64);
1780 }
1781
1782 assert!(oversized.get(1, 2).await.is_err());
1784
1785 let (pos, _, _) = oversized
1787 .append(1, TestEntry::new(100, 0, 0), &[100u8; 16])
1788 .await
1789 .expect("Failed to append");
1790 assert_eq!(pos, 2);
1791
1792 oversized.destroy().await.expect("Failed to destroy");
1793 });
1794 }
1795
1796 #[test_traced]
1797 fn test_recovery_crash_during_rewind_glob_ahead() {
1798 let executor = deterministic::Runner::default();
1800 executor.start(|context| async move {
1801 let cfg = test_cfg();
1802
1803 let mut oversized: Oversized<_, TestEntry, TestValue> =
1805 Oversized::init(context.clone(), cfg.clone())
1806 .await
1807 .expect("Failed to init");
1808
1809 let mut locations = Vec::new();
1810 for i in 0..5u8 {
1811 let value: TestValue = [i; 16];
1812 let entry = TestEntry::new(i as u64, 0, 0);
1813 let loc = oversized
1814 .append(1, entry, &value)
1815 .await
1816 .expect("Failed to append");
1817 locations.push(loc);
1818 }
1819 oversized.sync(1).await.expect("Failed to sync");
1820 drop(oversized);
1821
1822 let (blob, _) = context
1825 .open(&cfg.value_partition, &1u64.to_be_bytes())
1826 .await
1827 .expect("Failed to open blob");
1828 let keep_size = byte_end(locations[1].1, locations[1].2);
1829 blob.resize(keep_size).await.expect("Failed to truncate");
1830 blob.sync().await.expect("Failed to sync");
1831 drop(blob);
1832
1833 let mut oversized: Oversized<_, TestEntry, TestValue> =
1835 Oversized::init(context.clone(), cfg.clone())
1836 .await
1837 .expect("Failed to reinit");
1838
1839 for i in 0..2u8 {
1841 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1842 assert_eq!(entry.id, i as u64);
1843 }
1844
1845 assert!(oversized.get(1, 2).await.is_err());
1847
1848 let value: TestValue = [99; 16];
1850 let entry = TestEntry::new(100, 0, 0);
1851 let (pos, offset, size) = oversized
1852 .append(1, entry, &value)
1853 .await
1854 .expect("Failed to append after recovery");
1855 assert_eq!(pos, 2);
1856
1857 let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1858 assert_eq!(retrieved.id, 100);
1859 let retrieved_value = oversized
1860 .get_value(1, offset, size)
1861 .await
1862 .expect("Failed to get value");
1863 assert_eq!(retrieved_value, value);
1864
1865 oversized.destroy().await.expect("Failed to destroy");
1866 });
1867 }
1868
1869 #[test_traced]
1870 fn test_oversized_get_value_invalid_size() {
1871 let executor = deterministic::Runner::default();
1872 executor.start(|context| async move {
1873 let mut oversized: Oversized<_, TestEntry, TestValue> =
1874 Oversized::init(context.clone(), test_cfg())
1875 .await
1876 .expect("Failed to init");
1877
1878 let value: TestValue = [42; 16];
1879 let entry = TestEntry::new(1, 0, 0);
1880 let (_, offset, _size) = oversized
1881 .append(1, entry, &value)
1882 .await
1883 .expect("Failed to append");
1884 oversized.sync(1).await.expect("Failed to sync");
1885
1886 assert!(oversized.get_value(1, offset, 0).await.is_err());
1888
1889 for size in 1..4u32 {
1892 let result = oversized.get_value(1, offset, size).await;
1893 assert!(
1894 matches!(
1895 result,
1896 Err(Error::Codec(_))
1897 | Err(Error::ChecksumMismatch(_, _))
1898 | Err(Error::Runtime(_))
1899 ),
1900 "expected error, got: {:?}",
1901 result
1902 );
1903 }
1904
1905 oversized.destroy().await.expect("Failed to destroy");
1906 });
1907 }
1908
1909 #[test_traced]
1910 fn test_oversized_get_value_wrong_size() {
1911 let executor = deterministic::Runner::default();
1912 executor.start(|context| async move {
1913 let mut oversized: Oversized<_, TestEntry, TestValue> =
1914 Oversized::init(context.clone(), test_cfg())
1915 .await
1916 .expect("Failed to init");
1917
1918 let value: TestValue = [42; 16];
1919 let entry = TestEntry::new(1, 0, 0);
1920 let (_, offset, correct_size) = oversized
1921 .append(1, entry, &value)
1922 .await
1923 .expect("Failed to append");
1924 oversized.sync(1).await.expect("Failed to sync");
1925
1926 let result = oversized.get_value(1, offset, correct_size - 1).await;
1929 assert!(
1930 matches!(
1931 result,
1932 Err(Error::Codec(_)) | Err(Error::ChecksumMismatch(_, _))
1933 ),
1934 "expected Codec or ChecksumMismatch error, got: {:?}",
1935 result
1936 );
1937
1938 oversized.destroy().await.expect("Failed to destroy");
1939 });
1940 }
1941
1942 #[test_traced]
1943 fn test_recovery_values_has_orphan_section() {
1944 let executor = deterministic::Runner::default();
1945 executor.start(|context| async move {
1946 let cfg = test_cfg();
1947
1948 let mut oversized: Oversized<_, TestEntry, TestValue> =
1950 Oversized::init(context.clone(), cfg.clone())
1951 .await
1952 .expect("Failed to init");
1953
1954 for section in 1u64..=2 {
1955 let value: TestValue = [section as u8; 16];
1956 let entry = TestEntry::new(section, 0, 0);
1957 oversized
1958 .append(section, entry, &value)
1959 .await
1960 .expect("Failed to append");
1961 oversized.sync(section).await.expect("Failed to sync");
1962 }
1963 drop(oversized);
1964
1965 let glob_cfg = GlobConfig {
1967 partition: cfg.value_partition.clone(),
1968 compression: cfg.compression,
1969 codec_config: (),
1970 write_buffer: cfg.value_write_buffer,
1971 };
1972 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1973 .await
1974 .expect("Failed to init glob");
1975 let orphan_value: TestValue = [99; 16];
1976 glob.append(3, &orphan_value)
1977 .await
1978 .expect("Failed to append orphan");
1979 glob.sync(3).await.expect("Failed to sync glob");
1980 drop(glob);
1981
1982 let oversized: Oversized<_, TestEntry, TestValue> =
1984 Oversized::init(context.clone(), cfg.clone())
1985 .await
1986 .expect("Failed to reinit");
1987
1988 assert!(oversized.get(1, 0).await.is_ok());
1990 assert!(oversized.get(2, 0).await.is_ok());
1991
1992 assert_eq!(oversized.newest_section(), Some(2));
1994
1995 oversized.destroy().await.expect("Failed to destroy");
1996 });
1997 }
1998
1999 #[test_traced]
2000 fn test_recovery_values_has_multiple_orphan_sections() {
2001 let executor = deterministic::Runner::default();
2002 executor.start(|context| async move {
2003 let cfg = test_cfg();
2004
2005 let mut oversized: Oversized<_, TestEntry, TestValue> =
2007 Oversized::init(context.clone(), cfg.clone())
2008 .await
2009 .expect("Failed to init");
2010
2011 let value: TestValue = [1; 16];
2012 let entry = TestEntry::new(1, 0, 0);
2013 oversized
2014 .append(1, entry, &value)
2015 .await
2016 .expect("Failed to append");
2017 oversized.sync(1).await.expect("Failed to sync");
2018 drop(oversized);
2019
2020 let glob_cfg = GlobConfig {
2022 partition: cfg.value_partition.clone(),
2023 compression: cfg.compression,
2024 codec_config: (),
2025 write_buffer: cfg.value_write_buffer,
2026 };
2027 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2028 .await
2029 .expect("Failed to init glob");
2030
2031 for section in 2u64..=4 {
2032 let orphan_value: TestValue = [section as u8; 16];
2033 glob.append(section, &orphan_value)
2034 .await
2035 .expect("Failed to append orphan");
2036 glob.sync(section).await.expect("Failed to sync glob");
2037 }
2038 drop(glob);
2039
2040 let oversized: Oversized<_, TestEntry, TestValue> =
2042 Oversized::init(context.clone(), cfg.clone())
2043 .await
2044 .expect("Failed to reinit");
2045
2046 assert!(oversized.get(1, 0).await.is_ok());
2048
2049 assert_eq!(oversized.newest_section(), Some(1));
2051
2052 oversized.destroy().await.expect("Failed to destroy");
2053 });
2054 }
2055
2056 #[test_traced]
2057 fn test_recovery_index_empty_but_values_exist() {
2058 let executor = deterministic::Runner::default();
2059 executor.start(|context| async move {
2060 let cfg = test_cfg();
2061
2062 let glob_cfg = GlobConfig {
2064 partition: cfg.value_partition.clone(),
2065 compression: cfg.compression,
2066 codec_config: (),
2067 write_buffer: cfg.value_write_buffer,
2068 };
2069 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2070 .await
2071 .expect("Failed to init glob");
2072
2073 for section in 1u64..=3 {
2074 let orphan_value: TestValue = [section as u8; 16];
2075 glob.append(section, &orphan_value)
2076 .await
2077 .expect("Failed to append orphan");
2078 glob.sync(section).await.expect("Failed to sync glob");
2079 }
2080 drop(glob);
2081
2082 let oversized: Oversized<_, TestEntry, TestValue> =
2084 Oversized::init(context.clone(), cfg.clone())
2085 .await
2086 .expect("Failed to init");
2087
2088 assert_eq!(oversized.newest_section(), None);
2090 assert_eq!(oversized.oldest_section(), None);
2091
2092 oversized.destroy().await.expect("Failed to destroy");
2093 });
2094 }
2095
2096 #[test_traced]
2097 fn test_recovery_orphan_section_append_after() {
2098 let executor = deterministic::Runner::default();
2099 executor.start(|context| async move {
2100 let cfg = test_cfg();
2101
2102 let mut oversized: Oversized<_, TestEntry, TestValue> =
2104 Oversized::init(context.clone(), cfg.clone())
2105 .await
2106 .expect("Failed to init");
2107
2108 let value: TestValue = [1; 16];
2109 let entry = TestEntry::new(1, 0, 0);
2110 let (_, offset1, size1) = oversized
2111 .append(1, entry, &value)
2112 .await
2113 .expect("Failed to append");
2114 oversized.sync(1).await.expect("Failed to sync");
2115 drop(oversized);
2116
2117 let glob_cfg = GlobConfig {
2119 partition: cfg.value_partition.clone(),
2120 compression: cfg.compression,
2121 codec_config: (),
2122 write_buffer: cfg.value_write_buffer,
2123 };
2124 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2125 .await
2126 .expect("Failed to init glob");
2127
2128 for section in 2u64..=3 {
2129 let orphan_value: TestValue = [section as u8; 16];
2130 glob.append(section, &orphan_value)
2131 .await
2132 .expect("Failed to append orphan");
2133 glob.sync(section).await.expect("Failed to sync glob");
2134 }
2135 drop(glob);
2136
2137 let mut oversized: Oversized<_, TestEntry, TestValue> =
2139 Oversized::init(context.clone(), cfg.clone())
2140 .await
2141 .expect("Failed to reinit");
2142
2143 let entry = oversized.get(1, 0).await.expect("Failed to get");
2145 assert_eq!(entry.id, 1);
2146 let value = oversized
2147 .get_value(1, offset1, size1)
2148 .await
2149 .expect("Failed to get value");
2150 assert_eq!(value, [1; 16]);
2151
2152 let new_value: TestValue = [42; 16];
2154 let new_entry = TestEntry::new(42, 0, 0);
2155 let (pos, offset, size) = oversized
2156 .append(2, new_entry, &new_value)
2157 .await
2158 .expect("Failed to append after recovery");
2159 assert_eq!(pos, 0);
2160
2161 let retrieved = oversized.get(2, 0).await.expect("Failed to get");
2163 assert_eq!(retrieved.id, 42);
2164 let retrieved_value = oversized
2165 .get_value(2, offset, size)
2166 .await
2167 .expect("Failed to get value");
2168 assert_eq!(retrieved_value, new_value);
2169
2170 oversized.sync(2).await.expect("Failed to sync");
2172 drop(oversized);
2173
2174 let oversized: Oversized<_, TestEntry, TestValue> =
2175 Oversized::init(context.clone(), cfg)
2176 .await
2177 .expect("Failed to reinit after append");
2178
2179 assert!(oversized.get(1, 0).await.is_ok());
2181 assert!(oversized.get(2, 0).await.is_ok());
2182 assert_eq!(oversized.newest_section(), Some(2));
2183
2184 oversized.destroy().await.expect("Failed to destroy");
2185 });
2186 }
2187
2188 #[test_traced]
2189 fn test_recovery_no_orphan_sections() {
2190 let executor = deterministic::Runner::default();
2191 executor.start(|context| async move {
2192 let cfg = test_cfg();
2193
2194 let mut oversized: Oversized<_, TestEntry, TestValue> =
2196 Oversized::init(context.clone(), cfg.clone())
2197 .await
2198 .expect("Failed to init");
2199
2200 for section in 1u64..=3 {
2201 let value: TestValue = [section as u8; 16];
2202 let entry = TestEntry::new(section, 0, 0);
2203 oversized
2204 .append(section, entry, &value)
2205 .await
2206 .expect("Failed to append");
2207 oversized.sync(section).await.expect("Failed to sync");
2208 }
2209 drop(oversized);
2210
2211 let oversized: Oversized<_, TestEntry, TestValue> =
2213 Oversized::init(context.clone(), cfg)
2214 .await
2215 .expect("Failed to reinit");
2216
2217 for section in 1u64..=3 {
2219 let entry = oversized.get(section, 0).await.expect("Failed to get");
2220 assert_eq!(entry.id, section);
2221 }
2222 assert_eq!(oversized.newest_section(), Some(3));
2223
2224 oversized.destroy().await.expect("Failed to destroy");
2225 });
2226 }
2227
2228 #[test_traced]
2229 fn test_recovery_orphan_with_empty_index_section() {
2230 let executor = deterministic::Runner::default();
2231 executor.start(|context| async move {
2232 let cfg = test_cfg();
2233
2234 let mut oversized: Oversized<_, TestEntry, TestValue> =
2236 Oversized::init(context.clone(), cfg.clone())
2237 .await
2238 .expect("Failed to init");
2239
2240 let value: TestValue = [1; 16];
2241 let entry = TestEntry::new(1, 0, 0);
2242 oversized
2243 .append(1, entry, &value)
2244 .await
2245 .expect("Failed to append");
2246 oversized.sync(1).await.expect("Failed to sync");
2247 drop(oversized);
2248
2249 let glob_cfg = GlobConfig {
2251 partition: cfg.value_partition.clone(),
2252 compression: cfg.compression,
2253 codec_config: (),
2254 write_buffer: cfg.value_write_buffer,
2255 };
2256 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2257 .await
2258 .expect("Failed to init glob");
2259 let orphan_value: TestValue = [2; 16];
2260 glob.append(2, &orphan_value)
2261 .await
2262 .expect("Failed to append orphan");
2263 glob.sync(2).await.expect("Failed to sync glob");
2264 drop(glob);
2265
2266 let (blob, _) = context
2268 .open(&cfg.index_partition, &1u64.to_be_bytes())
2269 .await
2270 .expect("Failed to open blob");
2271 blob.resize(0).await.expect("Failed to truncate");
2272 blob.sync().await.expect("Failed to sync");
2273 drop(blob);
2274
2275 let oversized: Oversized<_, TestEntry, TestValue> =
2277 Oversized::init(context.clone(), cfg)
2278 .await
2279 .expect("Failed to reinit");
2280
2281 assert!(oversized.get(1, 0).await.is_err());
2283
2284 assert_eq!(oversized.newest_section(), Some(1));
2286
2287 oversized.destroy().await.expect("Failed to destroy");
2288 });
2289 }
2290
2291 #[test_traced]
2292 fn test_recovery_orphan_sections_with_gaps() {
2293 let executor = deterministic::Runner::default();
2296 executor.start(|context| async move {
2297 let cfg = test_cfg();
2298
2299 let mut oversized: Oversized<_, TestEntry, TestValue> =
2301 Oversized::init(context.clone(), cfg.clone())
2302 .await
2303 .expect("Failed to init");
2304
2305 for section in [1u64, 3, 5] {
2306 let value: TestValue = [section as u8; 16];
2307 let entry = TestEntry::new(section, 0, 0);
2308 oversized
2309 .append(section, entry, &value)
2310 .await
2311 .expect("Failed to append");
2312 oversized.sync(section).await.expect("Failed to sync");
2313 }
2314 drop(oversized);
2315
2316 let glob_cfg = GlobConfig {
2318 partition: cfg.value_partition.clone(),
2319 compression: cfg.compression,
2320 codec_config: (),
2321 write_buffer: cfg.value_write_buffer,
2322 };
2323 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2324 .await
2325 .expect("Failed to init glob");
2326
2327 for section in [2u64, 4, 6] {
2328 let orphan_value: TestValue = [section as u8; 16];
2329 glob.append(section, &orphan_value)
2330 .await
2331 .expect("Failed to append orphan");
2332 glob.sync(section).await.expect("Failed to sync glob");
2333 }
2334 drop(glob);
2335
2336 let oversized: Oversized<_, TestEntry, TestValue> =
2338 Oversized::init(context.clone(), cfg)
2339 .await
2340 .expect("Failed to reinit");
2341
2342 for section in [1u64, 3, 5] {
2344 let entry = oversized.get(section, 0).await.expect("Failed to get");
2345 assert_eq!(entry.id, section);
2346 }
2347
2348 assert_eq!(oversized.oldest_section(), Some(1));
2350 assert_eq!(oversized.newest_section(), Some(5));
2351
2352 oversized.destroy().await.expect("Failed to destroy");
2353 });
2354 }
2355
2356 #[test_traced]
2357 fn test_recovery_glob_trailing_garbage_truncated() {
2358 let executor = deterministic::Runner::default();
2362 executor.start(|context| async move {
2363 let cfg = test_cfg();
2364
2365 let mut oversized: Oversized<_, TestEntry, TestValue> =
2367 Oversized::init(context.clone(), cfg.clone())
2368 .await
2369 .expect("Failed to init");
2370
2371 let mut locations = Vec::new();
2373 for i in 0..2u8 {
2374 let value: TestValue = [i; 16];
2375 let entry = TestEntry::new(i as u64, 0, 0);
2376 let loc = oversized
2377 .append(1, entry, &value)
2378 .await
2379 .expect("Failed to append");
2380 locations.push(loc);
2381 }
2382 oversized.sync(1).await.expect("Failed to sync");
2383
2384 let expected_next_offset = byte_end(locations[1].1, locations[1].2);
2386 drop(oversized);
2387
2388 let (blob, size) = context
2390 .open(&cfg.value_partition, &1u64.to_be_bytes())
2391 .await
2392 .expect("Failed to open blob");
2393 assert_eq!(size, expected_next_offset);
2394
2395 let garbage = vec![0xDE; 100];
2397 blob.write_at(garbage, size)
2398 .await
2399 .expect("Failed to write garbage");
2400 blob.sync().await.expect("Failed to sync");
2401 drop(blob);
2402
2403 let (blob, new_size) = context
2405 .open(&cfg.value_partition, &1u64.to_be_bytes())
2406 .await
2407 .expect("Failed to open blob");
2408 assert_eq!(new_size, expected_next_offset + 100);
2409 drop(blob);
2410
2411 let mut oversized: Oversized<_, TestEntry, TestValue> =
2413 Oversized::init(context.clone(), cfg.clone())
2414 .await
2415 .expect("Failed to reinit");
2416
2417 for i in 0..2u8 {
2419 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2420 assert_eq!(entry.id, i as u64);
2421 }
2422
2423 let new_value: TestValue = [99; 16];
2425 let new_entry = TestEntry::new(99, 0, 0);
2426 let (pos, offset, _size) = oversized
2427 .append(1, new_entry, &new_value)
2428 .await
2429 .expect("Failed to append after recovery");
2430
2431 assert_eq!(pos, 2);
2433
2434 assert_eq!(offset, expected_next_offset);
2436
2437 let retrieved = oversized.get(1, 2).await.expect("Failed to get new entry");
2439 assert_eq!(retrieved.id, 99);
2440
2441 oversized.destroy().await.expect("Failed to destroy");
2442 });
2443 }
2444
// Recovery when the only index entry on disk has been overwritten with a
// hand-crafted record whose second field is `u64::MAX - 10` and third field is
// 100, so adding the two overflows a u64. After re-initialization the entry
// must not be readable and the next append must start at position 0 with
// value offset 0.
#[test_traced]
fn test_recovery_entry_with_overflow_offset() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // The index buffer pool is built with TestEntry::SIZE as its first argument
        // (presumably the page size, so the single entry fills one page — confirm against PoolRef).
        let cfg = Config {
            index_partition: "test_index".to_string(),
            value_partition: "test_values".to_string(),
            index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
            index_write_buffer: NZUsize!(1024),
            value_write_buffer: NZUsize!(1024),
            compression: None,
            codec_config: (),
        };

        let mut oversized: Oversized<_, TestEntry, TestValue> =
            Oversized::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to init");

        // Write and sync one ordinary entry so that section 1 exists in both partitions.
        let value: TestValue = [1; 16];
        let entry = TestEntry::new(1, 0, 0);
        oversized
            .append(1, entry, &value)
            .await
            .expect("Failed to append");
        oversized.sync(1).await.expect("Failed to sync");
        drop(oversized);

        // Open the raw index blob of section 1 so it can be overwritten by hand.
        let (blob, _) = context
            .open(&cfg.index_partition, &1u64.to_be_bytes())
            .await
            .expect("Failed to open blob");

        // Encode the replacement entry field by field: a u64, a u64 and a u32
        // (presumably id, value offset and value size, matching TestEntry::new's
        // argument order — verify against TestEntry's codec).
        let mut entry_data = Vec::new();
        1u64.write(&mut entry_data);
        (u64::MAX - 10).write(&mut entry_data);
        100u32.write(&mut entry_data);
        // The hand-written bytes must be exactly one encoded TestEntry long.
        assert_eq!(entry_data.len(), TestEntry::SIZE);

        // Build the 12 bytes that follow the entry data in the page: a u16 length,
        // a u32 CRC32 of the entry bytes, then a zero u16 and a zero u32.
        // NOTE(review): this looks like the page trailer of the runtime's paged
        // blob format (two length/checksum slots, the second unused) — confirm
        // against the buffer pool implementation.
        let crc = Crc32::checksum(&entry_data);
        let len1 = TestEntry::SIZE as u16;
        let mut crc_record = Vec::new();
        crc_record.extend_from_slice(&len1.to_be_bytes());
        crc_record.extend_from_slice(&crc.to_be_bytes());
        crc_record.extend_from_slice(&0u16.to_be_bytes());
        crc_record.extend_from_slice(&0u32.to_be_bytes());
        assert_eq!(crc_record.len(), 12);

        // Entry bytes followed by the 12-byte record, written over the start of the blob.
        let mut page = entry_data;
        page.extend_from_slice(&crc_record);
        blob.write_at(page, 0)
            .await
            .expect("Failed to write corrupted page");
        blob.sync().await.expect("Failed to sync");
        drop(blob);

        // Re-initializing runs recovery; it must succeed despite the bad entry.
        let mut oversized: Oversized<_, TestEntry, TestValue> =
            Oversized::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to reinit");

        // The crafted entry is not readable.
        assert!(oversized.get(1, 0).await.is_err());

        let new_value: TestValue = [99; 16];
        let new_entry = TestEntry::new(99, 0, 0);
        let (pos, new_offset, _) = oversized
            .append(1, new_entry, &new_value)
            .await
            .expect("Failed to append after recovery");

        // Both the index position and the value offset start again from zero.
        assert_eq!(pos, 0);
        assert_eq!(new_offset, 0);

        oversized.destroy().await.expect("Failed to destroy");
    });
}
2538
2539 #[test_traced]
2540 fn test_empty_section_persistence() {
2541 let executor = deterministic::Runner::default();
2544 executor.start(|context| async move {
2545 let cfg = test_cfg();
2546
2547 let mut oversized: Oversized<_, TestEntry, TestValue> =
2549 Oversized::init(context.clone(), cfg.clone())
2550 .await
2551 .expect("Failed to init");
2552
2553 for i in 0..3u8 {
2554 let value: TestValue = [i; 16];
2555 let entry = TestEntry::new(i as u64, 0, 0);
2556 oversized
2557 .append(1, entry, &value)
2558 .await
2559 .expect("Failed to append");
2560 }
2561 oversized.sync(1).await.expect("Failed to sync");
2562
2563 let value2: TestValue = [10; 16];
2565 let entry2 = TestEntry::new(10, 0, 0);
2566 oversized
2567 .append(2, entry2, &value2)
2568 .await
2569 .expect("Failed to append to section 2");
2570 oversized.sync(2).await.expect("Failed to sync section 2");
2571 drop(oversized);
2572
2573 let (blob, _) = context
2575 .open(&cfg.index_partition, &1u64.to_be_bytes())
2576 .await
2577 .expect("Failed to open blob");
2578 blob.resize(0).await.expect("Failed to truncate");
2579 blob.sync().await.expect("Failed to sync");
2580 drop(blob);
2581
2582 let mut oversized: Oversized<_, TestEntry, TestValue> =
2584 Oversized::init(context.clone(), cfg.clone())
2585 .await
2586 .expect("Failed to reinit");
2587
2588 assert!(oversized.get(1, 0).await.is_err());
2590
2591 let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2593 assert_eq!(entry.id, 10);
2594
2595 assert_eq!(oversized.oldest_section(), Some(1));
2597
2598 let new_value: TestValue = [99; 16];
2604 let new_entry = TestEntry::new(99, 0, 0);
2605 let (pos, offset, size) = oversized
2606 .append(1, new_entry, &new_value)
2607 .await
2608 .expect("Failed to append to empty section");
2609 assert_eq!(pos, 0);
2610 assert!(offset > 0);
2612 oversized.sync(1).await.expect("Failed to sync");
2613
2614 let entry = oversized.get(1, 0).await.expect("Failed to get");
2616 assert_eq!(entry.id, 99);
2617 let value = oversized
2618 .get_value(1, offset, size)
2619 .await
2620 .expect("Failed to get value");
2621 assert_eq!(value, new_value);
2622
2623 drop(oversized);
2624
2625 let oversized: Oversized<_, TestEntry, TestValue> =
2627 Oversized::init(context.clone(), cfg.clone())
2628 .await
2629 .expect("Failed to reinit again");
2630
2631 let entry = oversized.get(1, 0).await.expect("Failed to get");
2633 assert_eq!(entry.id, 99);
2634
2635 let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2637 assert_eq!(entry.id, 10);
2638
2639 oversized.destroy().await.expect("Failed to destroy");
2640 });
2641 }
2642
2643 #[test_traced]
2644 fn test_get_value_size_equals_crc_size() {
2645 let executor = deterministic::Runner::default();
2648 executor.start(|context| async move {
2649 let mut oversized: Oversized<_, TestEntry, TestValue> =
2650 Oversized::init(context.clone(), test_cfg())
2651 .await
2652 .expect("Failed to init");
2653
2654 let value: TestValue = [42; 16];
2655 let entry = TestEntry::new(1, 0, 0);
2656 let (_, offset, _) = oversized
2657 .append(1, entry, &value)
2658 .await
2659 .expect("Failed to append");
2660 oversized.sync(1).await.expect("Failed to sync");
2661
2662 let result = oversized.get_value(1, offset, 4).await;
2665 assert!(result.is_err());
2666
2667 oversized.destroy().await.expect("Failed to destroy");
2668 });
2669 }
2670
2671 #[test_traced]
2672 fn test_get_value_size_just_over_crc() {
2673 let executor = deterministic::Runner::default();
2676 executor.start(|context| async move {
2677 let mut oversized: Oversized<_, TestEntry, TestValue> =
2678 Oversized::init(context.clone(), test_cfg())
2679 .await
2680 .expect("Failed to init");
2681
2682 let value: TestValue = [42; 16];
2683 let entry = TestEntry::new(1, 0, 0);
2684 let (_, offset, _) = oversized
2685 .append(1, entry, &value)
2686 .await
2687 .expect("Failed to append");
2688 oversized.sync(1).await.expect("Failed to sync");
2689
2690 let result = oversized.get_value(1, offset, 5).await;
2693 assert!(result.is_err());
2694
2695 oversized.destroy().await.expect("Failed to destroy");
2696 });
2697 }
2698
2699 #[test_traced]
2700 fn test_recovery_maximum_section_numbers() {
2701 let executor = deterministic::Runner::default();
2704 executor.start(|context| async move {
2705 let cfg = test_cfg();
2706
2707 let large_sections = [u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
2709
2710 let mut oversized: Oversized<_, TestEntry, TestValue> =
2712 Oversized::init(context.clone(), cfg.clone())
2713 .await
2714 .expect("Failed to init");
2715
2716 let mut locations = Vec::new();
2717 for §ion in &large_sections {
2718 let value: TestValue = [(section & 0xFF) as u8; 16];
2719 let entry = TestEntry::new(section, 0, 0);
2720 let loc = oversized
2721 .append(section, entry, &value)
2722 .await
2723 .expect("Failed to append");
2724 locations.push((section, loc));
2725 oversized.sync(section).await.expect("Failed to sync");
2726 }
2727 drop(oversized);
2728
2729 let middle_section = large_sections[1];
2731 let (blob, size) = context
2732 .open(&cfg.value_partition, &middle_section.to_be_bytes())
2733 .await
2734 .expect("Failed to open blob");
2735 blob.resize(size / 2).await.expect("Failed to truncate");
2736 blob.sync().await.expect("Failed to sync");
2737 drop(blob);
2738
2739 let oversized: Oversized<_, TestEntry, TestValue> =
2741 Oversized::init(context.clone(), cfg.clone())
2742 .await
2743 .expect("Failed to reinit");
2744
2745 let entry = oversized
2747 .get(large_sections[0], 0)
2748 .await
2749 .expect("Failed to get first section");
2750 assert_eq!(entry.id, large_sections[0]);
2751
2752 let entry = oversized
2753 .get(large_sections[2], 0)
2754 .await
2755 .expect("Failed to get last section");
2756 assert_eq!(entry.id, large_sections[2]);
2757
2758 assert!(oversized.get(middle_section, 0).await.is_err());
2760
2761 let new_value: TestValue = [0xAB; 16];
2763 let new_entry = TestEntry::new(999, 0, 0);
2764 let mut oversized = oversized;
2765 oversized
2766 .append(middle_section, new_entry, &new_value)
2767 .await
2768 .expect("Failed to append after recovery");
2769
2770 oversized.destroy().await.expect("Failed to destroy");
2771 });
2772 }
2773
2774 #[test_traced]
2775 fn test_recovery_crash_during_recovery_rewind() {
2776 let executor = deterministic::Runner::default();
2780 executor.start(|context| async move {
2781 let cfg = test_cfg();
2782
2783 let mut oversized: Oversized<_, TestEntry, TestValue> =
2785 Oversized::init(context.clone(), cfg.clone())
2786 .await
2787 .expect("Failed to init");
2788
2789 let mut locations = Vec::new();
2790 for i in 0..5u8 {
2791 let value: TestValue = [i; 16];
2792 let entry = TestEntry::new(i as u64, 0, 0);
2793 let loc = oversized
2794 .append(1, entry, &value)
2795 .await
2796 .expect("Failed to append");
2797 locations.push(loc);
2798 }
2799 oversized.sync(1).await.expect("Failed to sync");
2800 drop(oversized);
2801
2802 let (blob, _) = context
2804 .open(&cfg.value_partition, &1u64.to_be_bytes())
2805 .await
2806 .expect("Failed to open blob");
2807 let keep_size = byte_end(locations[2].1, locations[2].2);
2808 blob.resize(keep_size).await.expect("Failed to truncate");
2809 blob.sync().await.expect("Failed to sync");
2810 drop(blob);
2811
2812 let chunk_size = FixedJournal::<deterministic::Context, TestEntry>::CHUNK_SIZE as u64;
2817 let (index_blob, _) = context
2818 .open(&cfg.index_partition, &1u64.to_be_bytes())
2819 .await
2820 .expect("Failed to open index blob");
2821 let partial_rewind_size = 4 * chunk_size; index_blob
2823 .resize(partial_rewind_size)
2824 .await
2825 .expect("Failed to resize");
2826 index_blob.sync().await.expect("Failed to sync");
2827 drop(index_blob);
2828
2829 let oversized: Oversized<_, TestEntry, TestValue> =
2832 Oversized::init(context.clone(), cfg.clone())
2833 .await
2834 .expect("Failed to reinit after nested crash");
2835
2836 for i in 0..3u8 {
2838 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2839 assert_eq!(entry.id, i as u64);
2840
2841 let (_, offset, size) = locations[i as usize];
2842 let value = oversized
2843 .get_value(1, offset, size)
2844 .await
2845 .expect("Failed to get value");
2846 assert_eq!(value, [i; 16]);
2847 }
2848
2849 assert!(oversized.get(1, 3).await.is_err());
2851
2852 let new_value: TestValue = [0xFF; 16];
2854 let new_entry = TestEntry::new(100, 0, 0);
2855 let mut oversized = oversized;
2856 let (pos, offset, _size) = oversized
2857 .append(1, new_entry, &new_value)
2858 .await
2859 .expect("Failed to append");
2860 assert_eq!(pos, 3); assert_eq!(offset, byte_end(locations[2].1, locations[2].2));
2864
2865 oversized.destroy().await.expect("Failed to destroy");
2866 });
2867 }
2868
2869 #[test_traced]
2870 fn test_recovery_crash_during_orphan_cleanup() {
2871 let executor = deterministic::Runner::default();
2874 executor.start(|context| async move {
2875 let cfg = test_cfg();
2876
2877 let mut oversized: Oversized<_, TestEntry, TestValue> =
2879 Oversized::init(context.clone(), cfg.clone())
2880 .await
2881 .expect("Failed to init");
2882
2883 let value: TestValue = [1; 16];
2884 let entry = TestEntry::new(1, 0, 0);
2885 let (_, offset1, size1) = oversized
2886 .append(1, entry, &value)
2887 .await
2888 .expect("Failed to append");
2889 oversized.sync(1).await.expect("Failed to sync");
2890 drop(oversized);
2891
2892 let glob_cfg = GlobConfig {
2894 partition: cfg.value_partition.clone(),
2895 compression: cfg.compression,
2896 codec_config: (),
2897 write_buffer: cfg.value_write_buffer,
2898 };
2899 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2900 .await
2901 .expect("Failed to init glob");
2902
2903 for section in 2u64..=4 {
2904 let orphan_value: TestValue = [section as u8; 16];
2905 glob.append(section, &orphan_value)
2906 .await
2907 .expect("Failed to append orphan");
2908 glob.sync(section).await.expect("Failed to sync glob");
2909 }
2910 drop(glob);
2911
2912 context
2915 .remove(&cfg.value_partition, Some(&2u64.to_be_bytes()))
2916 .await
2917 .expect("Failed to remove section 2");
2918
2919 let mut oversized: Oversized<_, TestEntry, TestValue> =
2921 Oversized::init(context.clone(), cfg.clone())
2922 .await
2923 .expect("Failed to reinit");
2924
2925 let entry = oversized.get(1, 0).await.expect("Failed to get");
2927 assert_eq!(entry.id, 1);
2928 let value = oversized
2929 .get_value(1, offset1, size1)
2930 .await
2931 .expect("Failed to get value");
2932 assert_eq!(value, [1; 16]);
2933
2934 assert_eq!(oversized.oldest_section(), Some(1));
2936 assert_eq!(oversized.newest_section(), Some(1));
2937
2938 let new_value: TestValue = [42; 16];
2940 let new_entry = TestEntry::new(42, 0, 0);
2941 let (pos, _, _) = oversized
2942 .append(2, new_entry, &new_value)
2943 .await
2944 .expect("Failed to append to section 2");
2945 assert_eq!(pos, 0); oversized.destroy().await.expect("Failed to destroy");
2948 });
2949 }
2950}