1use super::{
44 fixed::{Config as FixedConfig, Journal as FixedJournal},
45 glob::{Config as GlobConfig, Glob},
46};
47use crate::journal::Error;
48use commonware_codec::{Codec, CodecFixed, CodecShared};
49use commonware_runtime::{BufferPooler, Metrics, Storage};
50use futures::{future::try_join, stream::Stream};
51use std::{collections::HashSet, num::NonZeroUsize};
52use tracing::{debug, warn};
53
54pub trait Record: CodecFixed<Cfg = ()> + Clone {
59 fn value_location(&self) -> (u64, u32);
61
62 fn with_location(self, offset: u64, size: u32) -> Self;
66}
67
68#[derive(Clone)]
70pub struct Config<C> {
71 pub index_partition: String,
73
74 pub value_partition: String,
76
77 pub index_page_cache: commonware_runtime::buffer::paged::CacheRef,
79
80 pub index_write_buffer: NonZeroUsize,
82
83 pub value_write_buffer: NonZeroUsize,
85
86 pub compression: Option<u8>,
88
89 pub codec_config: C,
91}
92
93pub struct Oversized<E: BufferPooler + Storage + Metrics, I: Record, V: Codec> {
98 index: FixedJournal<E, I>,
99 values: Glob<E, V>,
100}
101
102impl<E: BufferPooler + Storage + Metrics, I: Record + Send + Sync, V: CodecShared>
103 Oversized<E, I, V>
104{
105 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
111 let index_cfg = FixedConfig {
113 partition: cfg.index_partition,
114 page_cache: cfg.index_page_cache,
115 write_buffer: cfg.index_write_buffer,
116 };
117 let index = FixedJournal::init(context.with_label("index"), index_cfg).await?;
118
119 let value_cfg = GlobConfig {
120 partition: cfg.value_partition,
121 compression: cfg.compression,
122 codec_config: cfg.codec_config,
123 write_buffer: cfg.value_write_buffer,
124 };
125 let values = Glob::init(context.with_label("values"), value_cfg).await?;
126
127 let mut oversized = Self { index, values };
128
129 oversized.recover().await?;
131
132 Ok(oversized)
133 }
134
135 async fn recover(&mut self) -> Result<(), Error> {
141 let chunk_size = FixedJournal::<E, I>::CHUNK_SIZE as u64;
142 let sections: Vec<u64> = self.index.sections().collect();
143
144 for section in sections {
145 let index_size = self.index.size(section).await?;
146 if index_size == 0 {
147 continue;
148 }
149
150 let glob_size = match self.values.size(section).await {
151 Ok(size) => size,
152 Err(Error::AlreadyPrunedToSection(oldest)) => {
153 warn!(
158 section,
159 oldest, "index has section that glob already pruned"
160 );
161 0
162 }
163 Err(e) => return Err(e),
164 };
165
166 let entry_count = index_size / chunk_size;
168 let aligned_size = entry_count * chunk_size;
169 if aligned_size < index_size {
170 warn!(
171 section,
172 index_size, aligned_size, "trailing bytes detected: truncating"
173 );
174 self.index.rewind_section(section, aligned_size).await?;
175 }
176
177 if entry_count == 0 {
179 warn!(
180 section,
181 index_size, "trailing bytes detected: truncating to 0"
182 );
183 self.values.rewind_section(section, 0).await?;
184 continue;
185 }
186
187 let (valid_count, glob_target) = self
189 .find_last_valid_entry(section, entry_count, glob_size)
190 .await;
191
192 if valid_count < entry_count {
194 let valid_size = valid_count * chunk_size;
195 debug!(section, entry_count, valid_count, "rewinding index");
196 self.index.rewind_section(section, valid_size).await?;
197 }
198
199 if glob_size > glob_target {
202 debug!(
203 section,
204 glob_size, glob_target, "truncating glob trailing garbage"
205 );
206 self.values.rewind_section(section, glob_target).await?;
207 }
208 }
209
210 self.cleanup_orphan_value_sections().await?;
212
213 Ok(())
214 }
215
216 async fn cleanup_orphan_value_sections(&mut self) -> Result<(), Error> {
223 let index_sections: HashSet<u64> = self.index.sections().collect();
225
226 let orphan_sections: Vec<u64> = self
228 .values
229 .sections()
230 .filter(|s| !index_sections.contains(s))
231 .collect();
232
233 for section in orphan_sections {
235 warn!(section, "removing orphan value section");
236 self.values.remove_section(section).await?;
237 }
238
239 Ok(())
240 }
241
242 async fn find_last_valid_entry(
248 &self,
249 section: u64,
250 entry_count: u64,
251 glob_size: u64,
252 ) -> (u64, u64) {
253 for pos in (0..entry_count).rev() {
254 match self.index.get(section, pos).await {
255 Ok(entry) => {
256 let (offset, size) = entry.value_location();
257 let entry_end = offset.saturating_add(u64::from(size));
258 if entry_end <= glob_size {
259 return (pos + 1, entry_end);
260 }
261 if pos == entry_count - 1 {
262 warn!(
263 section,
264 pos, glob_size, entry_end, "invalid entry: glob truncated"
265 );
266 }
267 }
268 Err(_) => {
269 if pos == entry_count - 1 {
270 warn!(section, pos, "corrupted last entry, scanning backwards");
271 }
272 }
273 }
274 }
275 (0, 0)
276 }
277
278 pub async fn append(
287 &mut self,
288 section: u64,
289 entry: I,
290 value: &V,
291 ) -> Result<(u64, u64, u32), Error> {
292 let (offset, size) = self.values.append(section, value).await?;
295
296 let entry_with_location = entry.with_location(offset, size);
298 let position = self.index.append(section, &entry_with_location).await?;
299
300 Ok((position, offset, size))
301 }
302
303 pub async fn get(&self, section: u64, position: u64) -> Result<I, Error> {
305 self.index.get(section, position).await
306 }
307
308 pub async fn last(&self, section: u64) -> Result<Option<I>, Error> {
317 self.index.last(section).await
318 }
319
320 pub async fn get_value(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
324 self.values.get(section, offset, size).await
325 }
326
327 pub async fn replay(
331 &self,
332 start_section: u64,
333 start_position: u64,
334 buffer: NonZeroUsize,
335 ) -> Result<impl Stream<Item = Result<(u64, u64, I), Error>> + Send + '_, Error> {
336 self.index
337 .replay(start_section, start_position, buffer)
338 .await
339 }
340
341 pub async fn sync(&self, section: u64) -> Result<(), Error> {
343 try_join(self.index.sync(section), self.values.sync(section))
344 .await
345 .map(|_| ())
346 }
347
348 pub async fn sync_all(&self) -> Result<(), Error> {
350 try_join(self.index.sync_all(), self.values.sync_all())
351 .await
352 .map(|_| ())
353 }
354
355 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
361 let index_pruned = self.index.prune(min).await?;
362 let value_pruned = self.values.prune(min).await?;
363 Ok(index_pruned || value_pruned)
364 }
365
366 pub async fn rewind(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
371 self.index.rewind(section, index_size).await?;
373
374 let value_size = match self.index.last(section).await {
376 Ok(Some(entry)) => {
377 let (offset, size) = entry.value_location();
378 offset
379 .checked_add(u64::from(size))
380 .ok_or(Error::OffsetOverflow)?
381 }
382 Ok(None) => 0,
383 Err(Error::SectionOutOfRange(_)) if index_size == 0 => 0,
384 Err(e) => return Err(e),
385 };
386
387 self.values.rewind(section, value_size).await
389 }
390
391 pub async fn rewind_section(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
396 self.index.rewind_section(section, index_size).await?;
398
399 let value_size = match self.index.last(section).await {
401 Ok(Some(entry)) => {
402 let (offset, size) = entry.value_location();
403 offset
404 .checked_add(u64::from(size))
405 .ok_or(Error::OffsetOverflow)?
406 }
407 Ok(None) => 0,
408 Err(Error::SectionOutOfRange(_)) if index_size == 0 => 0,
409 Err(e) => return Err(e),
410 };
411
412 self.values.rewind_section(section, value_size).await
414 }
415
416 pub async fn size(&self, section: u64) -> Result<u64, Error> {
420 self.index.size(section).await
421 }
422
423 pub async fn value_size(&self, section: u64) -> Result<u64, Error> {
425 match self.index.last(section).await {
426 Ok(Some(entry)) => {
427 let (offset, size) = entry.value_location();
428 offset
429 .checked_add(u64::from(size))
430 .ok_or(Error::OffsetOverflow)
431 }
432 Ok(None) | Err(Error::SectionOutOfRange(_)) => Ok(0),
433 Err(e) => Err(e),
434 }
435 }
436
437 pub fn oldest_section(&self) -> Option<u64> {
439 self.index.oldest_section()
440 }
441
442 pub fn newest_section(&self) -> Option<u64> {
444 self.index.newest_section()
445 }
446
447 pub async fn destroy(self) -> Result<(), Error> {
449 try_join(self.index.destroy(), self.values.destroy())
450 .await
451 .map(|_| ())
452 }
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458 use commonware_codec::{FixedSize, Read, ReadExt, Write};
459 use commonware_cryptography::Crc32;
460 use commonware_macros::test_traced;
461 use commonware_runtime::{
462 buffer::paged::CacheRef, deterministic, Blob as _, Buf, BufMut, BufferPooler, Metrics,
463 Runner,
464 };
465 use commonware_utils::{NZUsize, NZU16};
466
/// Returns the exclusive end offset of a byte range starting at `offset` and
/// spanning `size` bytes.
fn byte_end(offset: u64, size: u32) -> u64 {
    let length = u64::from(size);
    offset + length
}
471
/// Minimal index entry used by the tests: an id plus the location of its value.
#[derive(Debug, Clone, PartialEq)]
struct TestEntry {
    /// Caller-chosen identifier checked by the tests.
    id: u64,
    /// Offset component of the value location.
    value_offset: u64,
    /// Size component of the value location.
    value_size: u32,
}

impl TestEntry {
    /// Builds an entry from its three fields.
    fn new(id: u64, value_offset: u64, value_size: u32) -> Self {
        TestEntry {
            id,
            value_offset,
            value_size,
        }
    }
}
489
490 impl Write for TestEntry {
491 fn write(&self, buf: &mut impl BufMut) {
492 self.id.write(buf);
493 self.value_offset.write(buf);
494 self.value_size.write(buf);
495 }
496 }
497
498 impl Read for TestEntry {
499 type Cfg = ();
500
501 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
502 let id = u64::read(buf)?;
503 let value_offset = u64::read(buf)?;
504 let value_size = u32::read(buf)?;
505 Ok(Self {
506 id,
507 value_offset,
508 value_size,
509 })
510 }
511 }
512
513 impl FixedSize for TestEntry {
514 const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
515 }
516
517 impl Record for TestEntry {
518 fn value_location(&self) -> (u64, u32) {
519 (self.value_offset, self.value_size)
520 }
521
522 fn with_location(mut self, offset: u64, size: u32) -> Self {
523 self.value_offset = offset;
524 self.value_size = size;
525 self
526 }
527 }
528
529 fn test_cfg(pooler: &impl BufferPooler) -> Config<()> {
530 Config {
531 index_partition: "test-index".into(),
532 value_partition: "test-values".into(),
533 index_page_cache: CacheRef::from_pooler(pooler, NZU16!(64), NZUsize!(8)),
534 index_write_buffer: NZUsize!(1024),
535 value_write_buffer: NZUsize!(1024),
536 compression: None,
537 codec_config: (),
538 }
539 }
540
/// Value type stored by the tests: a fixed 16-byte array.
type TestValue = [u8; 16];
543
544 #[test_traced]
545 fn test_oversized_append_and_get() {
546 let executor = deterministic::Runner::default();
547 executor.start(|context| async move {
548 let cfg = test_cfg(&context);
549 let mut oversized: Oversized<_, TestEntry, TestValue> =
550 Oversized::init(context, cfg).await.expect("Failed to init");
551
552 let value: TestValue = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
554 let entry = TestEntry::new(42, 0, 0);
555 let (position, offset, size) = oversized
556 .append(1, entry, &value)
557 .await
558 .expect("Failed to append");
559
560 assert_eq!(position, 0);
561
562 let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
564 assert_eq!(retrieved_entry.id, 42);
565
566 let retrieved_value = oversized
568 .get_value(1, offset, size)
569 .await
570 .expect("Failed to get value");
571 assert_eq!(retrieved_value, value);
572
573 oversized.destroy().await.expect("Failed to destroy");
574 });
575 }
576
577 #[test_traced]
578 fn test_oversized_crash_recovery() {
579 let executor = deterministic::Runner::default();
580 executor.start(|context| async move {
581 let cfg = test_cfg(&context);
582
583 let mut oversized: Oversized<_, TestEntry, TestValue> =
585 Oversized::init(context.with_label("first"), cfg.clone())
586 .await
587 .expect("Failed to init");
588
589 let mut locations = Vec::new();
591 for i in 0..5u8 {
592 let value: TestValue = [i; 16];
593 let entry = TestEntry::new(i as u64, 0, 0);
594 let (position, offset, size) = oversized
595 .append(1, entry, &value)
596 .await
597 .expect("Failed to append");
598 locations.push((position, offset, size));
599 }
600 oversized.sync(1).await.expect("Failed to sync");
601 drop(oversized);
602
603 let (blob, _) = context
605 .open(&cfg.value_partition, &1u64.to_be_bytes())
606 .await
607 .expect("Failed to open blob");
608
609 let keep_size = byte_end(locations[2].1, locations[2].2);
611 blob.resize(keep_size).await.expect("Failed to truncate");
612 blob.sync().await.expect("Failed to sync");
613 drop(blob);
614
615 let oversized: Oversized<_, TestEntry, TestValue> =
617 Oversized::init(context.with_label("second"), cfg.clone())
618 .await
619 .expect("Failed to reinit");
620
621 for i in 0..3u8 {
623 let (position, offset, size) = locations[i as usize];
624 let entry = oversized.get(1, position).await.expect("Failed to get");
625 assert_eq!(entry.id, i as u64);
626
627 let value = oversized
628 .get_value(1, offset, size)
629 .await
630 .expect("Failed to get value");
631 assert_eq!(value, [i; 16]);
632 }
633
634 let result = oversized.get(1, 3).await;
636 assert!(result.is_err());
637
638 oversized.destroy().await.expect("Failed to destroy");
639 });
640 }
641
642 #[test_traced]
643 fn test_oversized_persistence() {
644 let executor = deterministic::Runner::default();
645 executor.start(|context| async move {
646 let cfg = test_cfg(&context);
647
648 let mut oversized: Oversized<_, TestEntry, TestValue> =
650 Oversized::init(context.with_label("first"), cfg.clone())
651 .await
652 .expect("Failed to init");
653
654 let value: TestValue = [42; 16];
655 let entry = TestEntry::new(123, 0, 0);
656 let (position, offset, size) = oversized
657 .append(1, entry, &value)
658 .await
659 .expect("Failed to append");
660 oversized.sync(1).await.expect("Failed to sync");
661 drop(oversized);
662
663 let oversized: Oversized<_, TestEntry, TestValue> =
665 Oversized::init(context.with_label("second"), cfg)
666 .await
667 .expect("Failed to reinit");
668
669 let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
670 assert_eq!(retrieved_entry.id, 123);
671
672 let retrieved_value = oversized
673 .get_value(1, offset, size)
674 .await
675 .expect("Failed to get value");
676 assert_eq!(retrieved_value, value);
677
678 oversized.destroy().await.expect("Failed to destroy");
679 });
680 }
681
682 #[test_traced]
683 fn test_oversized_prune() {
684 let executor = deterministic::Runner::default();
685 executor.start(|context| async move {
686 let cfg = test_cfg(&context);
687 let mut oversized: Oversized<_, TestEntry, TestValue> =
688 Oversized::init(context, cfg).await.expect("Failed to init");
689
690 for section in 1u64..=5 {
692 let value: TestValue = [section as u8; 16];
693 let entry = TestEntry::new(section, 0, 0);
694 oversized
695 .append(section, entry, &value)
696 .await
697 .expect("Failed to append");
698 oversized.sync(section).await.expect("Failed to sync");
699 }
700
701 oversized.prune(3).await.expect("Failed to prune");
703
704 assert!(oversized.get(1, 0).await.is_err());
706 assert!(oversized.get(2, 0).await.is_err());
707
708 assert!(oversized.get(3, 0).await.is_ok());
710 assert!(oversized.get(4, 0).await.is_ok());
711 assert!(oversized.get(5, 0).await.is_ok());
712
713 oversized.destroy().await.expect("Failed to destroy");
714 });
715 }
716
717 #[test_traced]
718 fn test_recovery_empty_section() {
719 let executor = deterministic::Runner::default();
720 executor.start(|context| async move {
721 let cfg = test_cfg(&context);
722
723 let mut oversized: Oversized<_, TestEntry, TestValue> =
725 Oversized::init(context.with_label("first"), cfg.clone())
726 .await
727 .expect("Failed to init");
728
729 let value: TestValue = [42; 16];
731 let entry = TestEntry::new(1, 0, 0);
732 oversized
733 .append(2, entry, &value)
734 .await
735 .expect("Failed to append");
736 oversized.sync(2).await.expect("Failed to sync");
737 drop(oversized);
738
739 let oversized: Oversized<_, TestEntry, TestValue> =
741 Oversized::init(context.with_label("second"), cfg)
742 .await
743 .expect("Failed to reinit");
744
745 let entry = oversized.get(2, 0).await.expect("Failed to get");
747 assert_eq!(entry.id, 1);
748
749 oversized.destroy().await.expect("Failed to destroy");
750 });
751 }
752
753 #[test_traced]
754 fn test_recovery_all_entries_invalid() {
755 let executor = deterministic::Runner::default();
756 executor.start(|context| async move {
757 let cfg = test_cfg(&context);
758
759 let mut oversized: Oversized<_, TestEntry, TestValue> =
761 Oversized::init(context.with_label("first"), cfg.clone())
762 .await
763 .expect("Failed to init");
764
765 for i in 0..5u8 {
767 let value: TestValue = [i; 16];
768 let entry = TestEntry::new(i as u64, 0, 0);
769 oversized
770 .append(1, entry, &value)
771 .await
772 .expect("Failed to append");
773 }
774 oversized.sync(1).await.expect("Failed to sync");
775 drop(oversized);
776
777 let (blob, _) = context
779 .open(&cfg.value_partition, &1u64.to_be_bytes())
780 .await
781 .expect("Failed to open blob");
782 blob.resize(0).await.expect("Failed to truncate");
783 blob.sync().await.expect("Failed to sync");
784 drop(blob);
785
786 let mut oversized: Oversized<_, TestEntry, TestValue> =
788 Oversized::init(context.with_label("second"), cfg)
789 .await
790 .expect("Failed to reinit");
791
792 let result = oversized.get(1, 0).await;
794 assert!(result.is_err());
795
796 let value: TestValue = [99; 16];
798 let entry = TestEntry::new(100, 0, 0);
799 let (pos, offset, size) = oversized
800 .append(1, entry, &value)
801 .await
802 .expect("Failed to append after recovery");
803 assert_eq!(pos, 0);
804
805 let retrieved = oversized.get(1, 0).await.expect("Failed to get");
806 assert_eq!(retrieved.id, 100);
807 let retrieved_value = oversized
808 .get_value(1, offset, size)
809 .await
810 .expect("Failed to get value");
811 assert_eq!(retrieved_value, value);
812
813 oversized.destroy().await.expect("Failed to destroy");
814 });
815 }
816
817 #[test_traced]
818 fn test_recovery_multiple_sections_mixed_validity() {
819 let executor = deterministic::Runner::default();
820 executor.start(|context| async move {
821 let cfg = test_cfg(&context);
822
823 let mut oversized: Oversized<_, TestEntry, TestValue> =
825 Oversized::init(context.with_label("first"), cfg.clone())
826 .await
827 .expect("Failed to init");
828
829 let mut section1_locations = Vec::new();
831 for i in 0..3u8 {
832 let value: TestValue = [i; 16];
833 let entry = TestEntry::new(i as u64, 0, 0);
834 let loc = oversized
835 .append(1, entry, &value)
836 .await
837 .expect("Failed to append");
838 section1_locations.push(loc);
839 }
840 oversized.sync(1).await.expect("Failed to sync");
841
842 let mut section2_locations = Vec::new();
844 for i in 0..5u8 {
845 let value: TestValue = [10 + i; 16];
846 let entry = TestEntry::new(10 + i as u64, 0, 0);
847 let loc = oversized
848 .append(2, entry, &value)
849 .await
850 .expect("Failed to append");
851 section2_locations.push(loc);
852 }
853 oversized.sync(2).await.expect("Failed to sync");
854
855 for i in 0..2u8 {
857 let value: TestValue = [20 + i; 16];
858 let entry = TestEntry::new(20 + i as u64, 0, 0);
859 oversized
860 .append(3, entry, &value)
861 .await
862 .expect("Failed to append");
863 }
864 oversized.sync(3).await.expect("Failed to sync");
865 drop(oversized);
866
867 let (blob, _) = context
869 .open(&cfg.value_partition, &1u64.to_be_bytes())
870 .await
871 .expect("Failed to open blob");
872 let keep_size = byte_end(section1_locations[0].1, section1_locations[0].2);
873 blob.resize(keep_size).await.expect("Failed to truncate");
874 blob.sync().await.expect("Failed to sync");
875 drop(blob);
876
877 let (blob, _) = context
879 .open(&cfg.value_partition, &2u64.to_be_bytes())
880 .await
881 .expect("Failed to open blob");
882 let keep_size = byte_end(section2_locations[2].1, section2_locations[2].2);
883 blob.resize(keep_size).await.expect("Failed to truncate");
884 blob.sync().await.expect("Failed to sync");
885 drop(blob);
886
887 let oversized: Oversized<_, TestEntry, TestValue> =
891 Oversized::init(context.with_label("second"), cfg)
892 .await
893 .expect("Failed to reinit");
894
895 assert!(oversized.get(1, 0).await.is_ok());
897 assert!(oversized.get(1, 1).await.is_err());
898 assert!(oversized.get(1, 2).await.is_err());
899
900 assert!(oversized.get(2, 0).await.is_ok());
902 assert!(oversized.get(2, 1).await.is_ok());
903 assert!(oversized.get(2, 2).await.is_ok());
904 assert!(oversized.get(2, 3).await.is_err());
905 assert!(oversized.get(2, 4).await.is_err());
906
907 assert!(oversized.get(3, 0).await.is_ok());
909 assert!(oversized.get(3, 1).await.is_ok());
910
911 oversized.destroy().await.expect("Failed to destroy");
912 });
913 }
914
915 #[test_traced]
916 fn test_recovery_corrupted_last_index_entry() {
917 let executor = deterministic::Runner::default();
918 executor.start(|context| async move {
919 let cfg = Config {
923 index_partition: "test-index".into(),
924 value_partition: "test-values".into(),
925 index_page_cache: CacheRef::from_pooler(
926 &context,
927 NZU16!(TestEntry::SIZE as u16),
928 NZUsize!(8),
929 ),
930 index_write_buffer: NZUsize!(1024),
931 value_write_buffer: NZUsize!(1024),
932 compression: None,
933 codec_config: (),
934 };
935
936 let mut oversized: Oversized<_, TestEntry, TestValue> =
938 Oversized::init(context.with_label("first"), cfg.clone())
939 .await
940 .expect("Failed to init");
941
942 for i in 0..5u8 {
944 let value: TestValue = [i; 16];
945 let entry = TestEntry::new(i as u64, 0, 0);
946 oversized
947 .append(1, entry, &value)
948 .await
949 .expect("Failed to append");
950 }
951 oversized.sync(1).await.expect("Failed to sync");
952 drop(oversized);
953
954 let (blob, size) = context
956 .open(&cfg.index_partition, &1u64.to_be_bytes())
957 .await
958 .expect("Failed to open blob");
959
960 assert_eq!(size, 160);
964 let last_page_crc_offset = size - 12;
965 blob.write_at(last_page_crc_offset, vec![0xFF; 12])
966 .await
967 .expect("Failed to corrupt");
968 blob.sync().await.expect("Failed to sync");
969 drop(blob);
970
971 let mut oversized: Oversized<_, TestEntry, TestValue> =
973 Oversized::init(context.with_label("second"), cfg)
974 .await
975 .expect("Failed to reinit");
976
977 for i in 0..4u8 {
979 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
980 assert_eq!(entry.id, i as u64);
981 }
982
983 assert!(oversized.get(1, 4).await.is_err());
985
986 let value: TestValue = [99; 16];
988 let entry = TestEntry::new(100, 0, 0);
989 let (pos, offset, size) = oversized
990 .append(1, entry, &value)
991 .await
992 .expect("Failed to append after recovery");
993 assert_eq!(pos, 4);
994
995 let retrieved = oversized.get(1, 4).await.expect("Failed to get");
996 assert_eq!(retrieved.id, 100);
997 let retrieved_value = oversized
998 .get_value(1, offset, size)
999 .await
1000 .expect("Failed to get value");
1001 assert_eq!(retrieved_value, value);
1002
1003 oversized.destroy().await.expect("Failed to destroy");
1004 });
1005 }
1006
1007 #[test_traced]
1008 fn test_recovery_all_entries_valid() {
1009 let executor = deterministic::Runner::default();
1010 executor.start(|context| async move {
1011 let cfg = test_cfg(&context);
1012
1013 let mut oversized: Oversized<_, TestEntry, TestValue> =
1015 Oversized::init(context.with_label("first"), cfg.clone())
1016 .await
1017 .expect("Failed to init");
1018
1019 for section in 1u64..=3 {
1021 for i in 0..10u8 {
1022 let value: TestValue = [(section as u8) * 10 + i; 16];
1023 let entry = TestEntry::new(section * 100 + i as u64, 0, 0);
1024 oversized
1025 .append(section, entry, &value)
1026 .await
1027 .expect("Failed to append");
1028 }
1029 oversized.sync(section).await.expect("Failed to sync");
1030 }
1031 drop(oversized);
1032
1033 let oversized: Oversized<_, TestEntry, TestValue> =
1035 Oversized::init(context.with_label("second"), cfg)
1036 .await
1037 .expect("Failed to reinit");
1038
1039 for section in 1u64..=3 {
1041 for i in 0..10u8 {
1042 let entry = oversized
1043 .get(section, i as u64)
1044 .await
1045 .expect("Failed to get");
1046 assert_eq!(entry.id, section * 100 + i as u64);
1047 }
1048 }
1049
1050 oversized.destroy().await.expect("Failed to destroy");
1051 });
1052 }
1053
1054 #[test_traced]
1055 fn test_recovery_single_entry_invalid() {
1056 let executor = deterministic::Runner::default();
1057 executor.start(|context| async move {
1058 let cfg = test_cfg(&context);
1059
1060 let mut oversized: Oversized<_, TestEntry, TestValue> =
1062 Oversized::init(context.with_label("first"), cfg.clone())
1063 .await
1064 .expect("Failed to init");
1065
1066 let value: TestValue = [42; 16];
1067 let entry = TestEntry::new(1, 0, 0);
1068 oversized
1069 .append(1, entry, &value)
1070 .await
1071 .expect("Failed to append");
1072 oversized.sync(1).await.expect("Failed to sync");
1073 drop(oversized);
1074
1075 let (blob, _) = context
1077 .open(&cfg.value_partition, &1u64.to_be_bytes())
1078 .await
1079 .expect("Failed to open blob");
1080 blob.resize(0).await.expect("Failed to truncate");
1081 blob.sync().await.expect("Failed to sync");
1082 drop(blob);
1083
1084 let oversized: Oversized<_, TestEntry, TestValue> =
1086 Oversized::init(context.with_label("second"), cfg)
1087 .await
1088 .expect("Failed to reinit");
1089
1090 assert!(oversized.get(1, 0).await.is_err());
1092
1093 oversized.destroy().await.expect("Failed to destroy");
1094 });
1095 }
1096
1097 #[test_traced]
1098 fn test_recovery_last_entry_off_by_one() {
1099 let executor = deterministic::Runner::default();
1100 executor.start(|context| async move {
1101 let cfg = test_cfg(&context);
1102
1103 let mut oversized: Oversized<_, TestEntry, TestValue> =
1105 Oversized::init(context.with_label("first"), cfg.clone())
1106 .await
1107 .expect("Failed to init");
1108
1109 let mut locations = Vec::new();
1110 for i in 0..3u8 {
1111 let value: TestValue = [i; 16];
1112 let entry = TestEntry::new(i as u64, 0, 0);
1113 let loc = oversized
1114 .append(1, entry, &value)
1115 .await
1116 .expect("Failed to append");
1117 locations.push(loc);
1118 }
1119 oversized.sync(1).await.expect("Failed to sync");
1120 drop(oversized);
1121
1122 let (blob, _) = context
1124 .open(&cfg.value_partition, &1u64.to_be_bytes())
1125 .await
1126 .expect("Failed to open blob");
1127
1128 let last = &locations[2];
1131 let truncate_to = byte_end(last.1, last.2) - 1;
1132 blob.resize(truncate_to).await.expect("Failed to truncate");
1133 blob.sync().await.expect("Failed to sync");
1134 drop(blob);
1135
1136 let mut oversized: Oversized<_, TestEntry, TestValue> =
1138 Oversized::init(context.with_label("second"), cfg)
1139 .await
1140 .expect("Failed to reinit");
1141
1142 assert!(oversized.get(1, 0).await.is_ok());
1144 assert!(oversized.get(1, 1).await.is_ok());
1145
1146 assert!(oversized.get(1, 2).await.is_err());
1148
1149 let value: TestValue = [99; 16];
1151 let entry = TestEntry::new(100, 0, 0);
1152 let (pos, offset, size) = oversized
1153 .append(1, entry, &value)
1154 .await
1155 .expect("Failed to append after recovery");
1156 assert_eq!(pos, 2);
1157
1158 let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1159 assert_eq!(retrieved.id, 100);
1160 let retrieved_value = oversized
1161 .get_value(1, offset, size)
1162 .await
1163 .expect("Failed to get value");
1164 assert_eq!(retrieved_value, value);
1165
1166 oversized.destroy().await.expect("Failed to destroy");
1167 });
1168 }
1169
1170 #[test_traced]
1171 fn test_recovery_glob_missing_entirely() {
1172 let executor = deterministic::Runner::default();
1173 executor.start(|context| async move {
1174 let cfg = test_cfg(&context);
1175
1176 let mut oversized: Oversized<_, TestEntry, TestValue> =
1178 Oversized::init(context.with_label("first"), cfg.clone())
1179 .await
1180 .expect("Failed to init");
1181
1182 for i in 0..3u8 {
1183 let value: TestValue = [i; 16];
1184 let entry = TestEntry::new(i as u64, 0, 0);
1185 oversized
1186 .append(1, entry, &value)
1187 .await
1188 .expect("Failed to append");
1189 }
1190 oversized.sync(1).await.expect("Failed to sync");
1191 drop(oversized);
1192
1193 context
1195 .remove(&cfg.value_partition, Some(&1u64.to_be_bytes()))
1196 .await
1197 .expect("Failed to remove");
1198
1199 let oversized: Oversized<_, TestEntry, TestValue> =
1201 Oversized::init(context.with_label("second"), cfg)
1202 .await
1203 .expect("Failed to reinit");
1204
1205 assert!(oversized.get(1, 0).await.is_err());
1207 assert!(oversized.get(1, 1).await.is_err());
1208 assert!(oversized.get(1, 2).await.is_err());
1209
1210 oversized.destroy().await.expect("Failed to destroy");
1211 });
1212 }
1213
1214 #[test_traced]
1215 fn test_recovery_can_append_after_recovery() {
1216 let executor = deterministic::Runner::default();
1217 executor.start(|context| async move {
1218 let cfg = test_cfg(&context);
1219
1220 let mut oversized: Oversized<_, TestEntry, TestValue> =
1222 Oversized::init(context.with_label("first"), cfg.clone())
1223 .await
1224 .expect("Failed to init");
1225
1226 let mut locations = Vec::new();
1227 for i in 0..5u8 {
1228 let value: TestValue = [i; 16];
1229 let entry = TestEntry::new(i as u64, 0, 0);
1230 let loc = oversized
1231 .append(1, entry, &value)
1232 .await
1233 .expect("Failed to append");
1234 locations.push(loc);
1235 }
1236 oversized.sync(1).await.expect("Failed to sync");
1237 drop(oversized);
1238
1239 let (blob, _) = context
1241 .open(&cfg.value_partition, &1u64.to_be_bytes())
1242 .await
1243 .expect("Failed to open blob");
1244 let keep_size = byte_end(locations[1].1, locations[1].2);
1245 blob.resize(keep_size).await.expect("Failed to truncate");
1246 blob.sync().await.expect("Failed to sync");
1247 drop(blob);
1248
1249 let mut oversized: Oversized<_, TestEntry, TestValue> =
1251 Oversized::init(context.with_label("second"), cfg.clone())
1252 .await
1253 .expect("Failed to reinit");
1254
1255 assert!(oversized.get(1, 0).await.is_ok());
1257 assert!(oversized.get(1, 1).await.is_ok());
1258 assert!(oversized.get(1, 2).await.is_err());
1259
1260 for i in 10..15u8 {
1262 let value: TestValue = [i; 16];
1263 let entry = TestEntry::new(i as u64, 0, 0);
1264 oversized
1265 .append(1, entry, &value)
1266 .await
1267 .expect("Failed to append after recovery");
1268 }
1269 oversized.sync(1).await.expect("Failed to sync");
1270
1271 for i in 0..5u8 {
1273 let entry = oversized
1274 .get(1, 2 + i as u64)
1275 .await
1276 .expect("Failed to get new entry");
1277 assert_eq!(entry.id, (10 + i) as u64);
1278 }
1279
1280 oversized.destroy().await.expect("Failed to destroy");
1281 });
1282 }
1283
1284 #[test_traced]
1285 fn test_recovery_glob_pruned_but_index_not() {
1286 let executor = deterministic::Runner::default();
1287 executor.start(|context| async move {
1288 let cfg = test_cfg(&context);
1289
1290 let mut oversized: Oversized<_, TestEntry, TestValue> =
1292 Oversized::init(context.with_label("first"), cfg.clone())
1293 .await
1294 .expect("Failed to init");
1295
1296 for section in 1u64..=3 {
1297 let value: TestValue = [section as u8; 16];
1298 let entry = TestEntry::new(section, 0, 0);
1299 oversized
1300 .append(section, entry, &value)
1301 .await
1302 .expect("Failed to append");
1303 oversized.sync(section).await.expect("Failed to sync");
1304 }
1305 drop(oversized);
1306
1307 use crate::journal::segmented::glob::{Config as GlobConfig, Glob};
1310 let glob_cfg = GlobConfig {
1311 partition: cfg.value_partition.clone(),
1312 compression: cfg.compression,
1313 codec_config: (),
1314 write_buffer: cfg.value_write_buffer,
1315 };
1316 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1317 .await
1318 .expect("Failed to init glob");
1319 glob.prune(2).await.expect("Failed to prune glob");
1320 glob.sync_all().await.expect("Failed to sync glob");
1321 drop(glob);
1322
1323 let oversized: Oversized<_, TestEntry, TestValue> =
1326 Oversized::init(context.with_label("second"), cfg.clone())
1327 .await
1328 .expect("Failed to reinit");
1329
1330 assert!(oversized.get(1, 0).await.is_err());
1332
1333 assert!(oversized.get(2, 0).await.is_ok());
1335 assert!(oversized.get(3, 0).await.is_ok());
1336
1337 oversized.destroy().await.expect("Failed to destroy");
1338 });
1339 }
1340
1341 #[test_traced]
1342 fn test_recovery_index_partition_deleted() {
1343 let executor = deterministic::Runner::default();
1344 executor.start(|context| async move {
1345 let cfg = test_cfg(&context);
1346
1347 let mut oversized: Oversized<_, TestEntry, TestValue> =
1349 Oversized::init(context.with_label("first"), cfg.clone())
1350 .await
1351 .expect("Failed to init");
1352
1353 for section in 1u64..=3 {
1354 let value: TestValue = [section as u8; 16];
1355 let entry = TestEntry::new(section, 0, 0);
1356 oversized
1357 .append(section, entry, &value)
1358 .await
1359 .expect("Failed to append");
1360 oversized.sync(section).await.expect("Failed to sync");
1361 }
1362 drop(oversized);
1363
1364 context
1366 .remove(&cfg.index_partition, Some(&2u64.to_be_bytes()))
1367 .await
1368 .expect("Failed to remove index");
1369
1370 let oversized: Oversized<_, TestEntry, TestValue> =
1373 Oversized::init(context.with_label("second"), cfg.clone())
1374 .await
1375 .expect("Failed to reinit");
1376
1377 assert!(oversized.get(1, 0).await.is_ok());
1379 assert!(oversized.get(3, 0).await.is_ok());
1380
1381 assert!(oversized.get(2, 0).await.is_err());
1383
1384 oversized.destroy().await.expect("Failed to destroy");
1385 });
1386 }
1387
1388 #[test_traced]
1389 fn test_recovery_index_synced_but_glob_not() {
1390 let executor = deterministic::Runner::default();
1391 executor.start(|context| async move {
1392 let cfg = test_cfg(&context);
1393
1394 let mut oversized: Oversized<_, TestEntry, TestValue> =
1396 Oversized::init(context.with_label("first"), cfg.clone())
1397 .await
1398 .expect("Failed to init");
1399
1400 let mut locations = Vec::new();
1402 for i in 0..3u8 {
1403 let value: TestValue = [i; 16];
1404 let entry = TestEntry::new(i as u64, 0, 0);
1405 let loc = oversized
1406 .append(1, entry, &value)
1407 .await
1408 .expect("Failed to append");
1409 locations.push(loc);
1410 }
1411 oversized.sync(1).await.expect("Failed to sync");
1412
1413 for i in 10..15u8 {
1415 let value: TestValue = [i; 16];
1416 let entry = TestEntry::new(i as u64, 0, 0);
1417 oversized
1418 .append(1, entry, &value)
1419 .await
1420 .expect("Failed to append");
1421 }
1422 drop(oversized);
1424
1425 let (blob, _) = context
1428 .open(&cfg.value_partition, &1u64.to_be_bytes())
1429 .await
1430 .expect("Failed to open blob");
1431 let synced_size = byte_end(locations[2].1, locations[2].2);
1432 blob.resize(synced_size).await.expect("Failed to truncate");
1433 blob.sync().await.expect("Failed to sync");
1434 drop(blob);
1435
1436 let oversized: Oversized<_, TestEntry, TestValue> =
1438 Oversized::init(context.with_label("second"), cfg)
1439 .await
1440 .expect("Failed to reinit");
1441
1442 for i in 0..3u8 {
1444 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1445 assert_eq!(entry.id, i as u64);
1446 }
1447
1448 assert!(oversized.get(1, 3).await.is_err());
1450
1451 oversized.destroy().await.expect("Failed to destroy");
1452 });
1453 }
1454
1455 #[test_traced]
1456 fn test_recovery_glob_synced_but_index_not() {
1457 let executor = deterministic::Runner::default();
1458 executor.start(|context| async move {
1459 let cfg = Config {
1463 index_partition: "test-index".into(),
1464 value_partition: "test-values".into(),
1465 index_page_cache: CacheRef::from_pooler(
1466 &context,
1467 NZU16!(TestEntry::SIZE as u16),
1468 NZUsize!(8),
1469 ),
1470 index_write_buffer: NZUsize!(1024),
1471 value_write_buffer: NZUsize!(1024),
1472 compression: None,
1473 codec_config: (),
1474 };
1475
1476 let mut oversized: Oversized<_, TestEntry, TestValue> =
1478 Oversized::init(context.with_label("first"), cfg.clone())
1479 .await
1480 .expect("Failed to init");
1481
1482 let mut locations = Vec::new();
1484 for i in 0..3u8 {
1485 let value: TestValue = [i; 16];
1486 let entry = TestEntry::new(i as u64, 0, 0);
1487 let loc = oversized
1488 .append(1, entry, &value)
1489 .await
1490 .expect("Failed to append");
1491 locations.push(loc);
1492 }
1493 oversized.sync(1).await.expect("Failed to sync");
1494 drop(oversized);
1495
1496 let (blob, _size) = context
1499 .open(&cfg.index_partition, &1u64.to_be_bytes())
1500 .await
1501 .expect("Failed to open blob");
1502
1503 let physical_page_size = (TestEntry::SIZE + 12) as u64;
1506 blob.resize(2 * physical_page_size)
1507 .await
1508 .expect("Failed to truncate");
1509 blob.sync().await.expect("Failed to sync");
1510 drop(blob);
1511
1512 let mut oversized: Oversized<_, TestEntry, TestValue> =
1514 Oversized::init(context.with_label("second"), cfg.clone())
1515 .await
1516 .expect("Failed to reinit");
1517
1518 for i in 0..2u8 {
1520 let (position, offset, size) = locations[i as usize];
1521 let entry = oversized.get(1, position).await.expect("Failed to get");
1522 assert_eq!(entry.id, i as u64);
1523
1524 let value = oversized
1525 .get_value(1, offset, size)
1526 .await
1527 .expect("Failed to get value");
1528 assert_eq!(value, [i; 16]);
1529 }
1530
1531 assert!(oversized.get(1, 2).await.is_err());
1533
1534 let mut new_locations = Vec::new();
1536 for i in 10..13u8 {
1537 let value: TestValue = [i; 16];
1538 let entry = TestEntry::new(i as u64, 0, 0);
1539 let (position, offset, size) = oversized
1540 .append(1, entry, &value)
1541 .await
1542 .expect("Failed to append after recovery");
1543
1544 assert_eq!(position, (i - 10 + 2) as u64);
1546 new_locations.push((position, offset, size, i));
1547
1548 let retrieved = oversized.get(1, position).await.expect("Failed to get");
1550 assert_eq!(retrieved.id, i as u64);
1551
1552 let retrieved_value = oversized
1553 .get_value(1, offset, size)
1554 .await
1555 .expect("Failed to get value");
1556 assert_eq!(retrieved_value, value);
1557 }
1558
1559 oversized.sync(1).await.expect("Failed to sync");
1561 drop(oversized);
1562
1563 let oversized: Oversized<_, TestEntry, TestValue> =
1565 Oversized::init(context.with_label("third"), cfg)
1566 .await
1567 .expect("Failed to reinit after append");
1568
1569 for i in 0..2u8 {
1572 let (position, offset, size) = locations[i as usize];
1573 let entry = oversized.get(1, position).await.expect("Failed to get");
1574 assert_eq!(entry.id, i as u64);
1575
1576 let value = oversized
1577 .get_value(1, offset, size)
1578 .await
1579 .expect("Failed to get value");
1580 assert_eq!(value, [i; 16]);
1581 }
1582
1583 for (position, offset, size, expected_id) in &new_locations {
1585 let entry = oversized
1586 .get(1, *position)
1587 .await
1588 .expect("Failed to get new entry after restart");
1589 assert_eq!(entry.id, *expected_id as u64);
1590
1591 let value = oversized
1592 .get_value(1, *offset, *size)
1593 .await
1594 .expect("Failed to get new value after restart");
1595 assert_eq!(value, [*expected_id; 16]);
1596 }
1597
1598 assert!(oversized.get(1, 4).await.is_ok());
1600 assert!(oversized.get(1, 5).await.is_err());
1601
1602 oversized.destroy().await.expect("Failed to destroy");
1603 });
1604 }
1605
1606 #[test_traced]
1607 fn test_recovery_partial_index_entry() {
1608 let executor = deterministic::Runner::default();
1609 executor.start(|context| async move {
1610 let cfg = test_cfg(&context);
1611
1612 let mut oversized: Oversized<_, TestEntry, TestValue> =
1614 Oversized::init(context.with_label("first"), cfg.clone())
1615 .await
1616 .expect("Failed to init");
1617
1618 for i in 0..3u8 {
1620 let value: TestValue = [i; 16];
1621 let entry = TestEntry::new(i as u64, 0, 0);
1622 oversized
1623 .append(1, entry, &value)
1624 .await
1625 .expect("Failed to append");
1626 }
1627 oversized.sync(1).await.expect("Failed to sync");
1628 drop(oversized);
1629
1630 let (blob, _) = context
1634 .open(&cfg.index_partition, &1u64.to_be_bytes())
1635 .await
1636 .expect("Failed to open blob");
1637 let partial_size = 3 * 24 + 10; blob.resize(partial_size).await.expect("Failed to resize");
1639 blob.sync().await.expect("Failed to sync");
1640 drop(blob);
1641
1642 let mut oversized: Oversized<_, TestEntry, TestValue> =
1644 Oversized::init(context.with_label("second"), cfg.clone())
1645 .await
1646 .expect("Failed to reinit");
1647
1648 for i in 0..3u8 {
1650 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1651 assert_eq!(entry.id, i as u64);
1652 }
1653
1654 assert!(oversized.get(1, 3).await.is_err());
1656
1657 let value: TestValue = [42; 16];
1659 let entry = TestEntry::new(100, 0, 0);
1660 let (pos, offset, size) = oversized
1661 .append(1, entry, &value)
1662 .await
1663 .expect("Failed to append after recovery");
1664 assert_eq!(pos, 3);
1665
1666 let retrieved = oversized.get(1, 3).await.expect("Failed to get new entry");
1668 assert_eq!(retrieved.id, 100);
1669 let retrieved_value = oversized
1670 .get_value(1, offset, size)
1671 .await
1672 .expect("Failed to get new value");
1673 assert_eq!(retrieved_value, value);
1674
1675 oversized.destroy().await.expect("Failed to destroy");
1676 });
1677 }
1678
1679 #[test_traced]
1680 fn test_recovery_only_partial_entry() {
1681 let executor = deterministic::Runner::default();
1682 executor.start(|context| async move {
1683 let cfg = test_cfg(&context);
1684
1685 let mut oversized: Oversized<_, TestEntry, TestValue> =
1687 Oversized::init(context.with_label("first"), cfg.clone())
1688 .await
1689 .expect("Failed to init");
1690
1691 let value: TestValue = [42; 16];
1692 let entry = TestEntry::new(1, 0, 0);
1693 oversized
1694 .append(1, entry, &value)
1695 .await
1696 .expect("Failed to append");
1697 oversized.sync(1).await.expect("Failed to sync");
1698 drop(oversized);
1699
1700 let (blob, _) = context
1702 .open(&cfg.index_partition, &1u64.to_be_bytes())
1703 .await
1704 .expect("Failed to open blob");
1705 blob.resize(10).await.expect("Failed to resize"); blob.sync().await.expect("Failed to sync");
1707 drop(blob);
1708
1709 let mut oversized: Oversized<_, TestEntry, TestValue> =
1711 Oversized::init(context.with_label("second"), cfg.clone())
1712 .await
1713 .expect("Failed to reinit");
1714
1715 assert!(oversized.get(1, 0).await.is_err());
1717
1718 let value: TestValue = [99; 16];
1720 let entry = TestEntry::new(100, 0, 0);
1721 let (pos, offset, size) = oversized
1722 .append(1, entry, &value)
1723 .await
1724 .expect("Failed to append after recovery");
1725 assert_eq!(pos, 0);
1726
1727 let retrieved = oversized.get(1, 0).await.expect("Failed to get");
1728 assert_eq!(retrieved.id, 100);
1729 let retrieved_value = oversized
1730 .get_value(1, offset, size)
1731 .await
1732 .expect("Failed to get value");
1733 assert_eq!(retrieved_value, value);
1734
1735 oversized.destroy().await.expect("Failed to destroy");
1736 });
1737 }
1738
1739 #[test_traced]
1740 fn test_recovery_crash_during_rewind_index_ahead() {
1741 let executor = deterministic::Runner::default();
1743 executor.start(|context| async move {
1744 let cfg = Config {
1748 index_partition: "test-index".into(),
1749 value_partition: "test-values".into(),
1750 index_page_cache: CacheRef::from_pooler(
1751 &context,
1752 NZU16!(TestEntry::SIZE as u16),
1753 NZUsize!(8),
1754 ),
1755 index_write_buffer: NZUsize!(1024),
1756 value_write_buffer: NZUsize!(1024),
1757 compression: None,
1758 codec_config: (),
1759 };
1760
1761 let mut oversized: Oversized<_, TestEntry, TestValue> =
1763 Oversized::init(context.with_label("first"), cfg.clone())
1764 .await
1765 .expect("Failed to init");
1766
1767 let mut locations = Vec::new();
1768 for i in 0..5u8 {
1769 let value: TestValue = [i; 16];
1770 let entry = TestEntry::new(i as u64, 0, 0);
1771 let loc = oversized
1772 .append(1, entry, &value)
1773 .await
1774 .expect("Failed to append");
1775 locations.push(loc);
1776 }
1777 oversized.sync(1).await.expect("Failed to sync");
1778 drop(oversized);
1779
1780 let (blob, _) = context
1783 .open(&cfg.index_partition, &1u64.to_be_bytes())
1784 .await
1785 .expect("Failed to open blob");
1786 let physical_page_size = (TestEntry::SIZE + 12) as u64;
1788 blob.resize(2 * physical_page_size)
1789 .await
1790 .expect("Failed to truncate");
1791 blob.sync().await.expect("Failed to sync");
1792 drop(blob);
1793
1794 let mut oversized: Oversized<_, TestEntry, TestValue> =
1796 Oversized::init(context.with_label("second"), cfg.clone())
1797 .await
1798 .expect("Failed to reinit");
1799
1800 for i in 0..2u8 {
1802 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1803 assert_eq!(entry.id, i as u64);
1804 }
1805
1806 assert!(oversized.get(1, 2).await.is_err());
1808
1809 let (pos, _, _) = oversized
1811 .append(1, TestEntry::new(100, 0, 0), &[100u8; 16])
1812 .await
1813 .expect("Failed to append");
1814 assert_eq!(pos, 2);
1815
1816 oversized.destroy().await.expect("Failed to destroy");
1817 });
1818 }
1819
1820 #[test_traced]
1821 fn test_recovery_crash_during_rewind_glob_ahead() {
1822 let executor = deterministic::Runner::default();
1824 executor.start(|context| async move {
1825 let cfg = test_cfg(&context);
1826
1827 let mut oversized: Oversized<_, TestEntry, TestValue> =
1829 Oversized::init(context.with_label("first"), cfg.clone())
1830 .await
1831 .expect("Failed to init");
1832
1833 let mut locations = Vec::new();
1834 for i in 0..5u8 {
1835 let value: TestValue = [i; 16];
1836 let entry = TestEntry::new(i as u64, 0, 0);
1837 let loc = oversized
1838 .append(1, entry, &value)
1839 .await
1840 .expect("Failed to append");
1841 locations.push(loc);
1842 }
1843 oversized.sync(1).await.expect("Failed to sync");
1844 drop(oversized);
1845
1846 let (blob, _) = context
1849 .open(&cfg.value_partition, &1u64.to_be_bytes())
1850 .await
1851 .expect("Failed to open blob");
1852 let keep_size = byte_end(locations[1].1, locations[1].2);
1853 blob.resize(keep_size).await.expect("Failed to truncate");
1854 blob.sync().await.expect("Failed to sync");
1855 drop(blob);
1856
1857 let mut oversized: Oversized<_, TestEntry, TestValue> =
1859 Oversized::init(context.with_label("second"), cfg.clone())
1860 .await
1861 .expect("Failed to reinit");
1862
1863 for i in 0..2u8 {
1865 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1866 assert_eq!(entry.id, i as u64);
1867 }
1868
1869 assert!(oversized.get(1, 2).await.is_err());
1871
1872 let value: TestValue = [99; 16];
1874 let entry = TestEntry::new(100, 0, 0);
1875 let (pos, offset, size) = oversized
1876 .append(1, entry, &value)
1877 .await
1878 .expect("Failed to append after recovery");
1879 assert_eq!(pos, 2);
1880
1881 let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1882 assert_eq!(retrieved.id, 100);
1883 let retrieved_value = oversized
1884 .get_value(1, offset, size)
1885 .await
1886 .expect("Failed to get value");
1887 assert_eq!(retrieved_value, value);
1888
1889 oversized.destroy().await.expect("Failed to destroy");
1890 });
1891 }
1892
1893 #[test_traced]
1894 fn test_oversized_get_value_invalid_size() {
1895 let executor = deterministic::Runner::default();
1896 executor.start(|context| async move {
1897 let cfg = test_cfg(&context);
1898 let mut oversized: Oversized<_, TestEntry, TestValue> =
1899 Oversized::init(context, cfg).await.expect("Failed to init");
1900
1901 let value: TestValue = [42; 16];
1902 let entry = TestEntry::new(1, 0, 0);
1903 let (_, offset, _size) = oversized
1904 .append(1, entry, &value)
1905 .await
1906 .expect("Failed to append");
1907 oversized.sync(1).await.expect("Failed to sync");
1908
1909 assert!(oversized.get_value(1, offset, 0).await.is_err());
1911
1912 for size in 1..4u32 {
1915 let result = oversized.get_value(1, offset, size).await;
1916 assert!(
1917 matches!(
1918 result,
1919 Err(Error::Codec(_))
1920 | Err(Error::ChecksumMismatch(_, _))
1921 | Err(Error::Runtime(_))
1922 ),
1923 "expected error, got: {:?}",
1924 result
1925 );
1926 }
1927
1928 oversized.destroy().await.expect("Failed to destroy");
1929 });
1930 }
1931
1932 #[test_traced]
1933 fn test_oversized_get_value_wrong_size() {
1934 let executor = deterministic::Runner::default();
1935 executor.start(|context| async move {
1936 let cfg = test_cfg(&context);
1937 let mut oversized: Oversized<_, TestEntry, TestValue> =
1938 Oversized::init(context, cfg).await.expect("Failed to init");
1939
1940 let value: TestValue = [42; 16];
1941 let entry = TestEntry::new(1, 0, 0);
1942 let (_, offset, correct_size) = oversized
1943 .append(1, entry, &value)
1944 .await
1945 .expect("Failed to append");
1946 oversized.sync(1).await.expect("Failed to sync");
1947
1948 let result = oversized.get_value(1, offset, correct_size - 1).await;
1951 assert!(
1952 matches!(
1953 result,
1954 Err(Error::Codec(_)) | Err(Error::ChecksumMismatch(_, _))
1955 ),
1956 "expected Codec or ChecksumMismatch error, got: {:?}",
1957 result
1958 );
1959
1960 oversized.destroy().await.expect("Failed to destroy");
1961 });
1962 }
1963
1964 #[test_traced]
1965 fn test_recovery_values_has_orphan_section() {
1966 let executor = deterministic::Runner::default();
1967 executor.start(|context| async move {
1968 let cfg = test_cfg(&context);
1969
1970 let mut oversized: Oversized<_, TestEntry, TestValue> =
1972 Oversized::init(context.with_label("first"), cfg.clone())
1973 .await
1974 .expect("Failed to init");
1975
1976 for section in 1u64..=2 {
1977 let value: TestValue = [section as u8; 16];
1978 let entry = TestEntry::new(section, 0, 0);
1979 oversized
1980 .append(section, entry, &value)
1981 .await
1982 .expect("Failed to append");
1983 oversized.sync(section).await.expect("Failed to sync");
1984 }
1985 drop(oversized);
1986
1987 let glob_cfg = GlobConfig {
1989 partition: cfg.value_partition.clone(),
1990 compression: cfg.compression,
1991 codec_config: (),
1992 write_buffer: cfg.value_write_buffer,
1993 };
1994 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1995 .await
1996 .expect("Failed to init glob");
1997 let orphan_value: TestValue = [99; 16];
1998 glob.append(3, &orphan_value)
1999 .await
2000 .expect("Failed to append orphan");
2001 glob.sync(3).await.expect("Failed to sync glob");
2002 drop(glob);
2003
2004 let oversized: Oversized<_, TestEntry, TestValue> =
2006 Oversized::init(context.with_label("second"), cfg.clone())
2007 .await
2008 .expect("Failed to reinit");
2009
2010 assert!(oversized.get(1, 0).await.is_ok());
2012 assert!(oversized.get(2, 0).await.is_ok());
2013
2014 assert_eq!(oversized.newest_section(), Some(2));
2016
2017 oversized.destroy().await.expect("Failed to destroy");
2018 });
2019 }
2020
2021 #[test_traced]
2022 fn test_recovery_values_has_multiple_orphan_sections() {
2023 let executor = deterministic::Runner::default();
2024 executor.start(|context| async move {
2025 let cfg = test_cfg(&context);
2026
2027 let mut oversized: Oversized<_, TestEntry, TestValue> =
2029 Oversized::init(context.with_label("first"), cfg.clone())
2030 .await
2031 .expect("Failed to init");
2032
2033 let value: TestValue = [1; 16];
2034 let entry = TestEntry::new(1, 0, 0);
2035 oversized
2036 .append(1, entry, &value)
2037 .await
2038 .expect("Failed to append");
2039 oversized.sync(1).await.expect("Failed to sync");
2040 drop(oversized);
2041
2042 let glob_cfg = GlobConfig {
2044 partition: cfg.value_partition.clone(),
2045 compression: cfg.compression,
2046 codec_config: (),
2047 write_buffer: cfg.value_write_buffer,
2048 };
2049 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2050 .await
2051 .expect("Failed to init glob");
2052
2053 for section in 2u64..=4 {
2054 let orphan_value: TestValue = [section as u8; 16];
2055 glob.append(section, &orphan_value)
2056 .await
2057 .expect("Failed to append orphan");
2058 glob.sync(section).await.expect("Failed to sync glob");
2059 }
2060 drop(glob);
2061
2062 let oversized: Oversized<_, TestEntry, TestValue> =
2064 Oversized::init(context.with_label("second"), cfg.clone())
2065 .await
2066 .expect("Failed to reinit");
2067
2068 assert!(oversized.get(1, 0).await.is_ok());
2070
2071 assert_eq!(oversized.newest_section(), Some(1));
2073
2074 oversized.destroy().await.expect("Failed to destroy");
2075 });
2076 }
2077
2078 #[test_traced]
2079 fn test_recovery_index_empty_but_values_exist() {
2080 let executor = deterministic::Runner::default();
2081 executor.start(|context| async move {
2082 let cfg = test_cfg(&context);
2083
2084 let glob_cfg = GlobConfig {
2086 partition: cfg.value_partition.clone(),
2087 compression: cfg.compression,
2088 codec_config: (),
2089 write_buffer: cfg.value_write_buffer,
2090 };
2091 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2092 .await
2093 .expect("Failed to init glob");
2094
2095 for section in 1u64..=3 {
2096 let orphan_value: TestValue = [section as u8; 16];
2097 glob.append(section, &orphan_value)
2098 .await
2099 .expect("Failed to append orphan");
2100 glob.sync(section).await.expect("Failed to sync glob");
2101 }
2102 drop(glob);
2103
2104 let oversized: Oversized<_, TestEntry, TestValue> =
2106 Oversized::init(context.with_label("first"), cfg.clone())
2107 .await
2108 .expect("Failed to init");
2109
2110 assert_eq!(oversized.newest_section(), None);
2112 assert_eq!(oversized.oldest_section(), None);
2113
2114 oversized.destroy().await.expect("Failed to destroy");
2115 });
2116 }
2117
2118 #[test_traced]
2119 fn test_recovery_orphan_section_append_after() {
2120 let executor = deterministic::Runner::default();
2121 executor.start(|context| async move {
2122 let cfg = test_cfg(&context);
2123
2124 let mut oversized: Oversized<_, TestEntry, TestValue> =
2126 Oversized::init(context.with_label("first"), cfg.clone())
2127 .await
2128 .expect("Failed to init");
2129
2130 let value: TestValue = [1; 16];
2131 let entry = TestEntry::new(1, 0, 0);
2132 let (_, offset1, size1) = oversized
2133 .append(1, entry, &value)
2134 .await
2135 .expect("Failed to append");
2136 oversized.sync(1).await.expect("Failed to sync");
2137 drop(oversized);
2138
2139 let glob_cfg = GlobConfig {
2141 partition: cfg.value_partition.clone(),
2142 compression: cfg.compression,
2143 codec_config: (),
2144 write_buffer: cfg.value_write_buffer,
2145 };
2146 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2147 .await
2148 .expect("Failed to init glob");
2149
2150 for section in 2u64..=3 {
2151 let orphan_value: TestValue = [section as u8; 16];
2152 glob.append(section, &orphan_value)
2153 .await
2154 .expect("Failed to append orphan");
2155 glob.sync(section).await.expect("Failed to sync glob");
2156 }
2157 drop(glob);
2158
2159 let mut oversized: Oversized<_, TestEntry, TestValue> =
2161 Oversized::init(context.with_label("second"), cfg.clone())
2162 .await
2163 .expect("Failed to reinit");
2164
2165 let entry = oversized.get(1, 0).await.expect("Failed to get");
2167 assert_eq!(entry.id, 1);
2168 let value = oversized
2169 .get_value(1, offset1, size1)
2170 .await
2171 .expect("Failed to get value");
2172 assert_eq!(value, [1; 16]);
2173
2174 let new_value: TestValue = [42; 16];
2176 let new_entry = TestEntry::new(42, 0, 0);
2177 let (pos, offset, size) = oversized
2178 .append(2, new_entry, &new_value)
2179 .await
2180 .expect("Failed to append after recovery");
2181 assert_eq!(pos, 0);
2182
2183 let retrieved = oversized.get(2, 0).await.expect("Failed to get");
2185 assert_eq!(retrieved.id, 42);
2186 let retrieved_value = oversized
2187 .get_value(2, offset, size)
2188 .await
2189 .expect("Failed to get value");
2190 assert_eq!(retrieved_value, new_value);
2191
2192 oversized.sync(2).await.expect("Failed to sync");
2194 drop(oversized);
2195
2196 let oversized: Oversized<_, TestEntry, TestValue> =
2197 Oversized::init(context.with_label("third"), cfg)
2198 .await
2199 .expect("Failed to reinit after append");
2200
2201 assert!(oversized.get(1, 0).await.is_ok());
2203 assert!(oversized.get(2, 0).await.is_ok());
2204 assert_eq!(oversized.newest_section(), Some(2));
2205
2206 oversized.destroy().await.expect("Failed to destroy");
2207 });
2208 }
2209
2210 #[test_traced]
2211 fn test_recovery_no_orphan_sections() {
2212 let executor = deterministic::Runner::default();
2213 executor.start(|context| async move {
2214 let cfg = test_cfg(&context);
2215
2216 let mut oversized: Oversized<_, TestEntry, TestValue> =
2218 Oversized::init(context.with_label("first"), cfg.clone())
2219 .await
2220 .expect("Failed to init");
2221
2222 for section in 1u64..=3 {
2223 let value: TestValue = [section as u8; 16];
2224 let entry = TestEntry::new(section, 0, 0);
2225 oversized
2226 .append(section, entry, &value)
2227 .await
2228 .expect("Failed to append");
2229 oversized.sync(section).await.expect("Failed to sync");
2230 }
2231 drop(oversized);
2232
2233 let oversized: Oversized<_, TestEntry, TestValue> =
2235 Oversized::init(context.with_label("second"), cfg)
2236 .await
2237 .expect("Failed to reinit");
2238
2239 for section in 1u64..=3 {
2241 let entry = oversized.get(section, 0).await.expect("Failed to get");
2242 assert_eq!(entry.id, section);
2243 }
2244 assert_eq!(oversized.newest_section(), Some(3));
2245
2246 oversized.destroy().await.expect("Failed to destroy");
2247 });
2248 }
2249
2250 #[test_traced]
2251 fn test_recovery_orphan_with_empty_index_section() {
2252 let executor = deterministic::Runner::default();
2253 executor.start(|context| async move {
2254 let cfg = test_cfg(&context);
2255
2256 let mut oversized: Oversized<_, TestEntry, TestValue> =
2258 Oversized::init(context.with_label("first"), cfg.clone())
2259 .await
2260 .expect("Failed to init");
2261
2262 let value: TestValue = [1; 16];
2263 let entry = TestEntry::new(1, 0, 0);
2264 oversized
2265 .append(1, entry, &value)
2266 .await
2267 .expect("Failed to append");
2268 oversized.sync(1).await.expect("Failed to sync");
2269 drop(oversized);
2270
2271 let glob_cfg = GlobConfig {
2273 partition: cfg.value_partition.clone(),
2274 compression: cfg.compression,
2275 codec_config: (),
2276 write_buffer: cfg.value_write_buffer,
2277 };
2278 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2279 .await
2280 .expect("Failed to init glob");
2281 let orphan_value: TestValue = [2; 16];
2282 glob.append(2, &orphan_value)
2283 .await
2284 .expect("Failed to append orphan");
2285 glob.sync(2).await.expect("Failed to sync glob");
2286 drop(glob);
2287
2288 let (blob, _) = context
2290 .open(&cfg.index_partition, &1u64.to_be_bytes())
2291 .await
2292 .expect("Failed to open blob");
2293 blob.resize(0).await.expect("Failed to truncate");
2294 blob.sync().await.expect("Failed to sync");
2295 drop(blob);
2296
2297 let oversized: Oversized<_, TestEntry, TestValue> =
2299 Oversized::init(context.with_label("second"), cfg)
2300 .await
2301 .expect("Failed to reinit");
2302
2303 assert!(oversized.get(1, 0).await.is_err());
2305
2306 assert_eq!(oversized.newest_section(), Some(1));
2308
2309 oversized.destroy().await.expect("Failed to destroy");
2310 });
2311 }
2312
2313 #[test_traced]
2314 fn test_recovery_orphan_sections_with_gaps() {
2315 let executor = deterministic::Runner::default();
2318 executor.start(|context| async move {
2319 let cfg = test_cfg(&context);
2320
2321 let mut oversized: Oversized<_, TestEntry, TestValue> =
2323 Oversized::init(context.with_label("first"), cfg.clone())
2324 .await
2325 .expect("Failed to init");
2326
2327 for section in [1u64, 3, 5] {
2328 let value: TestValue = [section as u8; 16];
2329 let entry = TestEntry::new(section, 0, 0);
2330 oversized
2331 .append(section, entry, &value)
2332 .await
2333 .expect("Failed to append");
2334 oversized.sync(section).await.expect("Failed to sync");
2335 }
2336 drop(oversized);
2337
2338 let glob_cfg = GlobConfig {
2340 partition: cfg.value_partition.clone(),
2341 compression: cfg.compression,
2342 codec_config: (),
2343 write_buffer: cfg.value_write_buffer,
2344 };
2345 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2346 .await
2347 .expect("Failed to init glob");
2348
2349 for section in [2u64, 4, 6] {
2350 let orphan_value: TestValue = [section as u8; 16];
2351 glob.append(section, &orphan_value)
2352 .await
2353 .expect("Failed to append orphan");
2354 glob.sync(section).await.expect("Failed to sync glob");
2355 }
2356 drop(glob);
2357
2358 let oversized: Oversized<_, TestEntry, TestValue> =
2360 Oversized::init(context.with_label("second"), cfg)
2361 .await
2362 .expect("Failed to reinit");
2363
2364 for section in [1u64, 3, 5] {
2366 let entry = oversized.get(section, 0).await.expect("Failed to get");
2367 assert_eq!(entry.id, section);
2368 }
2369
2370 assert_eq!(oversized.oldest_section(), Some(1));
2372 assert_eq!(oversized.newest_section(), Some(5));
2373
2374 oversized.destroy().await.expect("Failed to destroy");
2375 });
2376 }
2377
2378 #[test_traced]
2379 fn test_recovery_glob_trailing_garbage_truncated() {
2380 let executor = deterministic::Runner::default();
2384 executor.start(|context| async move {
2385 let cfg = test_cfg(&context);
2386
2387 let mut oversized: Oversized<_, TestEntry, TestValue> =
2389 Oversized::init(context.with_label("first"), cfg.clone())
2390 .await
2391 .expect("Failed to init");
2392
2393 let mut locations = Vec::new();
2395 for i in 0..2u8 {
2396 let value: TestValue = [i; 16];
2397 let entry = TestEntry::new(i as u64, 0, 0);
2398 let loc = oversized
2399 .append(1, entry, &value)
2400 .await
2401 .expect("Failed to append");
2402 locations.push(loc);
2403 }
2404 oversized.sync(1).await.expect("Failed to sync");
2405
2406 let expected_next_offset = byte_end(locations[1].1, locations[1].2);
2408 drop(oversized);
2409
2410 let (blob, size) = context
2412 .open(&cfg.value_partition, &1u64.to_be_bytes())
2413 .await
2414 .expect("Failed to open blob");
2415 assert_eq!(size, expected_next_offset);
2416
2417 let garbage = vec![0xDE; 100];
2419 blob.write_at(size, garbage)
2420 .await
2421 .expect("Failed to write garbage");
2422 blob.sync().await.expect("Failed to sync");
2423 drop(blob);
2424
2425 let (blob, new_size) = context
2427 .open(&cfg.value_partition, &1u64.to_be_bytes())
2428 .await
2429 .expect("Failed to open blob");
2430 assert_eq!(new_size, expected_next_offset + 100);
2431 drop(blob);
2432
2433 let mut oversized: Oversized<_, TestEntry, TestValue> =
2435 Oversized::init(context.with_label("second"), cfg.clone())
2436 .await
2437 .expect("Failed to reinit");
2438
2439 for i in 0..2u8 {
2441 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2442 assert_eq!(entry.id, i as u64);
2443 }
2444
2445 let new_value: TestValue = [99; 16];
2447 let new_entry = TestEntry::new(99, 0, 0);
2448 let (pos, offset, _size) = oversized
2449 .append(1, new_entry, &new_value)
2450 .await
2451 .expect("Failed to append after recovery");
2452
2453 assert_eq!(pos, 2);
2455
2456 assert_eq!(offset, expected_next_offset);
2458
2459 let retrieved = oversized.get(1, 2).await.expect("Failed to get new entry");
2461 assert_eq!(retrieved.id, 99);
2462
2463 oversized.destroy().await.expect("Failed to destroy");
2464 });
2465 }
2466
2467 #[test_traced]
2468 fn test_recovery_entry_with_overflow_offset() {
2469 let executor = deterministic::Runner::default();
2472 executor.start(|context| async move {
2473 let cfg = Config {
2475 index_partition: "test-index".into(),
2476 value_partition: "test-values".into(),
2477 index_page_cache: CacheRef::from_pooler(
2478 &context,
2479 NZU16!(TestEntry::SIZE as u16),
2480 NZUsize!(8),
2481 ),
2482 index_write_buffer: NZUsize!(1024),
2483 value_write_buffer: NZUsize!(1024),
2484 compression: None,
2485 codec_config: (),
2486 };
2487
2488 let mut oversized: Oversized<_, TestEntry, TestValue> =
2490 Oversized::init(context.with_label("first"), cfg.clone())
2491 .await
2492 .expect("Failed to init");
2493
2494 let value: TestValue = [1; 16];
2495 let entry = TestEntry::new(1, 0, 0);
2496 oversized
2497 .append(1, entry, &value)
2498 .await
2499 .expect("Failed to append");
2500 oversized.sync(1).await.expect("Failed to sync");
2501 drop(oversized);
2502
2503 let (blob, _) = context
2507 .open(&cfg.index_partition, &1u64.to_be_bytes())
2508 .await
2509 .expect("Failed to open blob");
2510
2511 let mut entry_data = Vec::new();
2513 1u64.write(&mut entry_data); (u64::MAX - 10).write(&mut entry_data); 100u32.write(&mut entry_data); assert_eq!(entry_data.len(), TestEntry::SIZE);
2517
2518 let crc = Crc32::checksum(&entry_data);
2521 let len1 = TestEntry::SIZE as u16;
2522 let mut crc_record = Vec::new();
2523 crc_record.extend_from_slice(&len1.to_be_bytes()); crc_record.extend_from_slice(&crc.to_be_bytes()); crc_record.extend_from_slice(&0u16.to_be_bytes()); crc_record.extend_from_slice(&0u32.to_be_bytes()); assert_eq!(crc_record.len(), 12);
2528
2529 let mut page = entry_data;
2531 page.extend_from_slice(&crc_record);
2532 blob.write_at(0, page)
2533 .await
2534 .expect("Failed to write corrupted page");
2535 blob.sync().await.expect("Failed to sync");
2536 drop(blob);
2537
2538 let mut oversized: Oversized<_, TestEntry, TestValue> =
2541 Oversized::init(context.with_label("second"), cfg.clone())
2542 .await
2543 .expect("Failed to reinit");
2544
2545 assert!(oversized.get(1, 0).await.is_err());
2547
2548 let new_value: TestValue = [99; 16];
2550 let new_entry = TestEntry::new(99, 0, 0);
2551 let (pos, new_offset, _) = oversized
2552 .append(1, new_entry, &new_value)
2553 .await
2554 .expect("Failed to append after recovery");
2555
2556 assert_eq!(pos, 0);
2558 assert_eq!(new_offset, 0);
2560
2561 oversized.destroy().await.expect("Failed to destroy");
2562 });
2563 }
2564
2565 #[test_traced]
2566 fn test_empty_section_persistence() {
2567 let executor = deterministic::Runner::default();
2570 executor.start(|context| async move {
2571 let cfg = test_cfg(&context);
2572
2573 let mut oversized: Oversized<_, TestEntry, TestValue> =
2575 Oversized::init(context.with_label("first"), cfg.clone())
2576 .await
2577 .expect("Failed to init");
2578
2579 for i in 0..3u8 {
2580 let value: TestValue = [i; 16];
2581 let entry = TestEntry::new(i as u64, 0, 0);
2582 oversized
2583 .append(1, entry, &value)
2584 .await
2585 .expect("Failed to append");
2586 }
2587 oversized.sync(1).await.expect("Failed to sync");
2588
2589 let value2: TestValue = [10; 16];
2591 let entry2 = TestEntry::new(10, 0, 0);
2592 oversized
2593 .append(2, entry2, &value2)
2594 .await
2595 .expect("Failed to append to section 2");
2596 oversized.sync(2).await.expect("Failed to sync section 2");
2597 drop(oversized);
2598
2599 let (blob, _) = context
2601 .open(&cfg.index_partition, &1u64.to_be_bytes())
2602 .await
2603 .expect("Failed to open blob");
2604 blob.resize(0).await.expect("Failed to truncate");
2605 blob.sync().await.expect("Failed to sync");
2606 drop(blob);
2607
2608 let mut oversized: Oversized<_, TestEntry, TestValue> =
2610 Oversized::init(context.with_label("second"), cfg.clone())
2611 .await
2612 .expect("Failed to reinit");
2613
2614 assert!(oversized.get(1, 0).await.is_err());
2616
2617 let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2619 assert_eq!(entry.id, 10);
2620
2621 assert_eq!(oversized.oldest_section(), Some(1));
2623
2624 let new_value: TestValue = [99; 16];
2630 let new_entry = TestEntry::new(99, 0, 0);
2631 let (pos, offset, size) = oversized
2632 .append(1, new_entry, &new_value)
2633 .await
2634 .expect("Failed to append to empty section");
2635 assert_eq!(pos, 0);
2636 assert!(offset > 0);
2638 oversized.sync(1).await.expect("Failed to sync");
2639
2640 let entry = oversized.get(1, 0).await.expect("Failed to get");
2642 assert_eq!(entry.id, 99);
2643 let value = oversized
2644 .get_value(1, offset, size)
2645 .await
2646 .expect("Failed to get value");
2647 assert_eq!(value, new_value);
2648
2649 drop(oversized);
2650
2651 let oversized: Oversized<_, TestEntry, TestValue> =
2653 Oversized::init(context.with_label("third"), cfg.clone())
2654 .await
2655 .expect("Failed to reinit again");
2656
2657 let entry = oversized.get(1, 0).await.expect("Failed to get");
2659 assert_eq!(entry.id, 99);
2660
2661 let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2663 assert_eq!(entry.id, 10);
2664
2665 oversized.destroy().await.expect("Failed to destroy");
2666 });
2667 }
2668
2669 #[test_traced]
2670 fn test_get_value_size_equals_crc_size() {
2671 let executor = deterministic::Runner::default();
2674 executor.start(|context| async move {
2675 let cfg = test_cfg(&context);
2676 let mut oversized: Oversized<_, TestEntry, TestValue> =
2677 Oversized::init(context, cfg).await.expect("Failed to init");
2678
2679 let value: TestValue = [42; 16];
2680 let entry = TestEntry::new(1, 0, 0);
2681 let (_, offset, _) = oversized
2682 .append(1, entry, &value)
2683 .await
2684 .expect("Failed to append");
2685 oversized.sync(1).await.expect("Failed to sync");
2686
2687 let result = oversized.get_value(1, offset, 4).await;
2690 assert!(result.is_err());
2691
2692 oversized.destroy().await.expect("Failed to destroy");
2693 });
2694 }
2695
2696 #[test_traced]
2697 fn test_get_value_size_just_over_crc() {
2698 let executor = deterministic::Runner::default();
2701 executor.start(|context| async move {
2702 let cfg = test_cfg(&context);
2703 let mut oversized: Oversized<_, TestEntry, TestValue> =
2704 Oversized::init(context, cfg).await.expect("Failed to init");
2705
2706 let value: TestValue = [42; 16];
2707 let entry = TestEntry::new(1, 0, 0);
2708 let (_, offset, _) = oversized
2709 .append(1, entry, &value)
2710 .await
2711 .expect("Failed to append");
2712 oversized.sync(1).await.expect("Failed to sync");
2713
2714 let result = oversized.get_value(1, offset, 5).await;
2717 assert!(result.is_err());
2718
2719 oversized.destroy().await.expect("Failed to destroy");
2720 });
2721 }
2722
2723 #[test_traced]
2724 fn test_recovery_maximum_section_numbers() {
2725 let executor = deterministic::Runner::default();
2728 executor.start(|context| async move {
2729 let cfg = test_cfg(&context);
2730
2731 let large_sections = [u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
2733
2734 let mut oversized: Oversized<_, TestEntry, TestValue> =
2736 Oversized::init(context.with_label("first"), cfg.clone())
2737 .await
2738 .expect("Failed to init");
2739
2740 let mut locations = Vec::new();
2741 for §ion in &large_sections {
2742 let value: TestValue = [(section & 0xFF) as u8; 16];
2743 let entry = TestEntry::new(section, 0, 0);
2744 let loc = oversized
2745 .append(section, entry, &value)
2746 .await
2747 .expect("Failed to append");
2748 locations.push((section, loc));
2749 oversized.sync(section).await.expect("Failed to sync");
2750 }
2751 drop(oversized);
2752
2753 let middle_section = large_sections[1];
2755 let (blob, size) = context
2756 .open(&cfg.value_partition, &middle_section.to_be_bytes())
2757 .await
2758 .expect("Failed to open blob");
2759 blob.resize(size / 2).await.expect("Failed to truncate");
2760 blob.sync().await.expect("Failed to sync");
2761 drop(blob);
2762
2763 let mut oversized: Oversized<_, TestEntry, TestValue> =
2765 Oversized::init(context.with_label("second"), cfg.clone())
2766 .await
2767 .expect("Failed to reinit");
2768
2769 let entry = oversized
2771 .get(large_sections[0], 0)
2772 .await
2773 .expect("Failed to get first section");
2774 assert_eq!(entry.id, large_sections[0]);
2775
2776 let entry = oversized
2777 .get(large_sections[2], 0)
2778 .await
2779 .expect("Failed to get last section");
2780 assert_eq!(entry.id, large_sections[2]);
2781
2782 assert!(oversized.get(middle_section, 0).await.is_err());
2784
2785 let new_value: TestValue = [0xAB; 16];
2787 let new_entry = TestEntry::new(999, 0, 0);
2788 oversized
2789 .append(middle_section, new_entry, &new_value)
2790 .await
2791 .expect("Failed to append after recovery");
2792
2793 oversized.destroy().await.expect("Failed to destroy");
2794 });
2795 }
2796
2797 #[test_traced]
2798 fn test_recovery_crash_during_recovery_rewind() {
2799 let executor = deterministic::Runner::default();
2803 executor.start(|context| async move {
2804 let cfg = test_cfg(&context);
2805
2806 let mut oversized: Oversized<_, TestEntry, TestValue> =
2808 Oversized::init(context.with_label("first"), cfg.clone())
2809 .await
2810 .expect("Failed to init");
2811
2812 let mut locations = Vec::new();
2813 for i in 0..5u8 {
2814 let value: TestValue = [i; 16];
2815 let entry = TestEntry::new(i as u64, 0, 0);
2816 let loc = oversized
2817 .append(1, entry, &value)
2818 .await
2819 .expect("Failed to append");
2820 locations.push(loc);
2821 }
2822 oversized.sync(1).await.expect("Failed to sync");
2823 drop(oversized);
2824
2825 let (blob, _) = context
2827 .open(&cfg.value_partition, &1u64.to_be_bytes())
2828 .await
2829 .expect("Failed to open blob");
2830 let keep_size = byte_end(locations[2].1, locations[2].2);
2831 blob.resize(keep_size).await.expect("Failed to truncate");
2832 blob.sync().await.expect("Failed to sync");
2833 drop(blob);
2834
2835 let chunk_size = FixedJournal::<deterministic::Context, TestEntry>::CHUNK_SIZE as u64;
2840 let (index_blob, _) = context
2841 .open(&cfg.index_partition, &1u64.to_be_bytes())
2842 .await
2843 .expect("Failed to open index blob");
2844 let partial_rewind_size = 4 * chunk_size; index_blob
2846 .resize(partial_rewind_size)
2847 .await
2848 .expect("Failed to resize");
2849 index_blob.sync().await.expect("Failed to sync");
2850 drop(index_blob);
2851
2852 let mut oversized: Oversized<_, TestEntry, TestValue> =
2855 Oversized::init(context.with_label("second"), cfg.clone())
2856 .await
2857 .expect("Failed to reinit after nested crash");
2858
2859 for i in 0..3u8 {
2861 let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2862 assert_eq!(entry.id, i as u64);
2863
2864 let (_, offset, size) = locations[i as usize];
2865 let value = oversized
2866 .get_value(1, offset, size)
2867 .await
2868 .expect("Failed to get value");
2869 assert_eq!(value, [i; 16]);
2870 }
2871
2872 assert!(oversized.get(1, 3).await.is_err());
2874
2875 let new_value: TestValue = [0xFF; 16];
2877 let new_entry = TestEntry::new(100, 0, 0);
2878 let (pos, offset, _size) = oversized
2879 .append(1, new_entry, &new_value)
2880 .await
2881 .expect("Failed to append");
2882 assert_eq!(pos, 3); assert_eq!(offset, byte_end(locations[2].1, locations[2].2));
2886
2887 oversized.destroy().await.expect("Failed to destroy");
2888 });
2889 }
2890
2891 #[test_traced]
2892 fn test_recovery_crash_during_orphan_cleanup() {
2893 let executor = deterministic::Runner::default();
2896 executor.start(|context| async move {
2897 let cfg = test_cfg(&context);
2898
2899 let mut oversized: Oversized<_, TestEntry, TestValue> =
2901 Oversized::init(context.with_label("first"), cfg.clone())
2902 .await
2903 .expect("Failed to init");
2904
2905 let value: TestValue = [1; 16];
2906 let entry = TestEntry::new(1, 0, 0);
2907 let (_, offset1, size1) = oversized
2908 .append(1, entry, &value)
2909 .await
2910 .expect("Failed to append");
2911 oversized.sync(1).await.expect("Failed to sync");
2912 drop(oversized);
2913
2914 let glob_cfg = GlobConfig {
2916 partition: cfg.value_partition.clone(),
2917 compression: cfg.compression,
2918 codec_config: (),
2919 write_buffer: cfg.value_write_buffer,
2920 };
2921 let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2922 .await
2923 .expect("Failed to init glob");
2924
2925 for section in 2u64..=4 {
2926 let orphan_value: TestValue = [section as u8; 16];
2927 glob.append(section, &orphan_value)
2928 .await
2929 .expect("Failed to append orphan");
2930 glob.sync(section).await.expect("Failed to sync glob");
2931 }
2932 drop(glob);
2933
2934 context
2937 .remove(&cfg.value_partition, Some(&2u64.to_be_bytes()))
2938 .await
2939 .expect("Failed to remove section 2");
2940
2941 let mut oversized: Oversized<_, TestEntry, TestValue> =
2943 Oversized::init(context.with_label("second"), cfg.clone())
2944 .await
2945 .expect("Failed to reinit");
2946
2947 let entry = oversized.get(1, 0).await.expect("Failed to get");
2949 assert_eq!(entry.id, 1);
2950 let value = oversized
2951 .get_value(1, offset1, size1)
2952 .await
2953 .expect("Failed to get value");
2954 assert_eq!(value, [1; 16]);
2955
2956 assert_eq!(oversized.oldest_section(), Some(1));
2958 assert_eq!(oversized.newest_section(), Some(1));
2959
2960 let new_value: TestValue = [42; 16];
2962 let new_entry = TestEntry::new(42, 0, 0);
2963 let (pos, _, _) = oversized
2964 .append(2, new_entry, &new_value)
2965 .await
2966 .expect("Failed to append to section 2");
2967 assert_eq!(pos, 0); oversized.destroy().await.expect("Failed to destroy");
2970 });
2971 }
2972
2973 #[test_traced]
2974 fn test_rewind_to_zero_index_size() {
2975 let executor = deterministic::Runner::default();
2976 executor.start(|context| async move {
2977 let cfg = test_cfg(&context);
2978 let mut oversized: Oversized<_, TestEntry, TestValue> =
2979 Oversized::init(context, cfg).await.expect("Failed to init");
2980
2981 let value: TestValue = [1; 16];
2982 let entry = TestEntry::new(1, 0, 0);
2983 oversized
2984 .append(0, entry, &value)
2985 .await
2986 .expect("Failed to append");
2987 oversized.sync(0).await.expect("Failed to sync");
2988
2989 oversized
2990 .rewind(0, 0)
2991 .await
2992 .expect("rewind to zero index_size must not fail");
2993
2994 assert_eq!(oversized.last(0).await.unwrap(), None);
2995 assert_eq!(oversized.size(0).await.unwrap(), 0);
2996 assert_eq!(oversized.value_size(0).await.unwrap(), 0);
2997
2998 oversized.destroy().await.expect("Failed to destroy");
2999 });
3000 }
3001
3002 #[test_traced]
3003 fn test_rewind_to_zero_on_missing_section() {
3004 let executor = deterministic::Runner::default();
3005 executor.start(|context| async move {
3006 let cfg = test_cfg(&context);
3007 let mut oversized: Oversized<_, TestEntry, TestValue> =
3008 Oversized::init(context, cfg).await.expect("Failed to init");
3009
3010 oversized
3011 .rewind(0, 0)
3012 .await
3013 .expect("rewind on missing section must not fail");
3014
3015 assert!(matches!(
3016 oversized.last(0).await,
3017 Err(Error::SectionOutOfRange(0))
3018 ));
3019 assert_eq!(oversized.value_size(0).await.unwrap(), 0);
3020
3021 oversized.destroy().await.expect("Failed to destroy");
3022 });
3023 }
3024
3025 #[test_traced]
3026 fn test_rewind_nonzero_on_missing_section_errors() {
3027 let executor = deterministic::Runner::default();
3028 executor.start(|context| async move {
3029 let cfg = test_cfg(&context);
3030 let mut oversized: Oversized<_, TestEntry, TestValue> =
3031 Oversized::init(context, cfg).await.expect("Failed to init");
3032
3033 let result = oversized.rewind(0, 1).await;
3034 assert!(
3035 matches!(result, Err(Error::SectionOutOfRange(0))),
3036 "nonzero index_size on missing section must fail, got: {result:?}"
3037 );
3038
3039 oversized.destroy().await.expect("Failed to destroy");
3040 });
3041 }
3042
3043 #[test_traced]
3044 fn test_rewind_section_nonzero_on_missing_section_errors() {
3045 let executor = deterministic::Runner::default();
3046 executor.start(|context| async move {
3047 let cfg = test_cfg(&context);
3048 let mut oversized: Oversized<_, TestEntry, TestValue> =
3049 Oversized::init(context, cfg).await.expect("Failed to init");
3050
3051 let result = oversized.rewind_section(0, 1).await;
3052 assert!(
3053 matches!(result, Err(Error::SectionOutOfRange(0))),
3054 "nonzero index_size on missing section must fail, got: {result:?}"
3055 );
3056
3057 oversized.destroy().await.expect("Failed to destroy");
3058 });
3059 }
3060
3061 #[test_traced]
3062 fn test_last_pruned_section_returns_error() {
3063 let executor = deterministic::Runner::default();
3064 executor.start(|context| async move {
3065 let cfg = test_cfg(&context);
3066 let mut oversized: Oversized<_, TestEntry, TestValue> =
3067 Oversized::init(context, cfg).await.expect("Failed to init");
3068
3069 let value: TestValue = [1; 16];
3070 oversized
3071 .append(0, TestEntry::new(1, 0, 0), &value)
3072 .await
3073 .expect("Failed to append");
3074 oversized
3075 .append(1, TestEntry::new(2, 0, 0), &value)
3076 .await
3077 .expect("Failed to append");
3078 oversized.sync_all().await.expect("Failed to sync");
3079
3080 oversized.prune(1).await.expect("Failed to prune");
3081
3082 assert!(matches!(
3083 oversized.last(0).await,
3084 Err(Error::AlreadyPrunedToSection(1))
3085 ));
3086 assert!(oversized.last(1).await.unwrap().is_some());
3087
3088 oversized.destroy().await.expect("Failed to destroy");
3089 });
3090 }
3091}