Skip to main content

commonware_storage/journal/segmented/
oversized.rs

1//! Segmented journal for oversized values.
2//!
3//! This module combines [super::fixed::Journal] with [super::glob::Glob] to handle
4//! entries that reference variable-length "oversized" values. It provides coordinated
5//! operations and built-in crash recovery.
6//!
7//! # Architecture
8//!
9//! ```text
10//! +-------------------+     +-------------------+
11//! | Fixed Journal     |     | Glob (Values)     |
12//! | (Index Entries)   |     |                   |
13//! +-------------------+     +-------------------+
14//! | entry_0           | --> | value_0           |
15//! | entry_1           | --> | value_1           |
16//! | ...               |     | ...               |
17//! +-------------------+     +-------------------+
18//! ```
19//!
20//! Each index entry contains `(value_offset, value_size)` pointing to its value in glob.
21//!
22//! # Crash Recovery
23//!
24//! On unclean shutdown, the index journal and glob may have different lengths:
25//! - Index entry pointing to non-existent glob data (dangerous)
26//! - Glob value without index entry (orphan - acceptable but cleaned up)
27//! - Glob sections without corresponding index sections (orphan sections - removed)
28//!
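//! During initialization, crash recovery is performed for each index section:
//! 1. A trailing partial index entry (size not a multiple of the entry size) is truncated
//! 2. Index entries are checked from the last one backwards until one is found whose glob
//!    reference is valid (`value_offset + value_size <= glob_size`)
//! 3. The index journal is rewound to exclude trailing invalid entries, and glob bytes past
//!    the end of the last valid value are truncated
//! 4. Orphan value sections (sections in glob but not in index) are removed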
33//!
34//! This allows async writes (glob first, then index) while ensuring consistency
35//! after recovery.
36//!
37//! _Recovery only validates that index entries point to valid byte ranges
38//! within the glob. It does **not** verify value checksums during recovery (this would
39//! require reading all values). Value checksums are verified lazily when values are
40//! read via `get_value()`. If the underlying storage is corrupted, `get_value()` will
41//! return a checksum error even though the index entry exists._
42
43use super::{
44    fixed::{Config as FixedConfig, Journal as FixedJournal},
45    glob::{Config as GlobConfig, Glob},
46};
47use crate::journal::Error;
48use commonware_codec::{Codec, CodecFixed, CodecShared};
49use commonware_runtime::{BufferPooler, Metrics, Storage};
50use futures::{future::try_join, stream::Stream};
51use std::{collections::HashSet, num::NonZeroUsize};
52use tracing::{debug, warn};
53
54/// Trait for index entries that reference oversized values in glob storage.
55///
56/// Implementations must provide access to the value location for crash recovery validation,
57/// and a way to set the location when appending.
58pub trait Record: CodecFixed<Cfg = ()> + Clone {
59    /// Returns `(value_offset, value_size)` for crash recovery validation.
60    fn value_location(&self) -> (u64, u32);
61
62    /// Returns a new entry with the value location set.
63    ///
64    /// Called during `append` after the value is written to glob storage.
65    fn with_location(self, offset: u64, size: u32) -> Self;
66}
67
68/// Configuration for oversized journal.
69#[derive(Clone)]
70pub struct Config<C> {
71    /// Partition for the fixed index journal.
72    pub index_partition: String,
73
74    /// Partition for the glob value storage.
75    pub value_partition: String,
76
77    /// Page cache for index journal caching.
78    pub index_page_cache: commonware_runtime::buffer::paged::CacheRef,
79
80    /// Write buffer size for the index journal.
81    pub index_write_buffer: NonZeroUsize,
82
83    /// Write buffer size for the value journal.
84    pub value_write_buffer: NonZeroUsize,
85
86    /// Optional compression level for values (using zstd).
87    pub compression: Option<u8>,
88
89    /// Codec configuration for values.
90    pub codec_config: C,
91}
92
93/// Segmented journal for entries with oversized values.
94///
95/// Combines a fixed-size index journal with glob storage for variable-length values.
96/// Provides coordinated operations and crash recovery.
97pub struct Oversized<E: BufferPooler + Storage + Metrics, I: Record, V: Codec> {
98    index: FixedJournal<E, I>,
99    values: Glob<E, V>,
100}
101
102impl<E: BufferPooler + Storage + Metrics, I: Record + Send + Sync, V: CodecShared>
103    Oversized<E, I, V>
104{
105    /// Initialize with crash recovery validation.
106    ///
107    /// Validates each index entry's glob reference during replay. Invalid entries
108    /// (pointing beyond glob size) are skipped, and the index journal is rewound
109    /// to exclude trailing invalid entries.
110    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
111        // Initialize both journals
112        let index_cfg = FixedConfig {
113            partition: cfg.index_partition,
114            page_cache: cfg.index_page_cache,
115            write_buffer: cfg.index_write_buffer,
116        };
117        let index = FixedJournal::init(context.with_label("index"), index_cfg).await?;
118
119        let value_cfg = GlobConfig {
120            partition: cfg.value_partition,
121            compression: cfg.compression,
122            codec_config: cfg.codec_config,
123            write_buffer: cfg.value_write_buffer,
124        };
125        let values = Glob::init(context.with_label("values"), value_cfg).await?;
126
127        let mut oversized = Self { index, values };
128
129        // Perform crash recovery validation
130        oversized.recover().await?;
131
132        Ok(oversized)
133    }
134
135    /// Perform crash recovery by validating index entries against glob sizes.
136    ///
137    /// Only checks the last entry in each section. Since entries are appended sequentially
138    /// and value offsets are monotonically increasing within a section, if the last entry
139    /// is valid then all earlier entries must be valid too.
140    async fn recover(&mut self) -> Result<(), Error> {
141        let chunk_size = FixedJournal::<E, I>::CHUNK_SIZE as u64;
142        let sections: Vec<u64> = self.index.sections().collect();
143
144        for section in sections {
145            let index_size = self.index.size(section).await?;
146            if index_size == 0 {
147                continue;
148            }
149
150            let glob_size = match self.values.size(section).await {
151                Ok(size) => size,
152                Err(Error::AlreadyPrunedToSection(oldest)) => {
153                    // This shouldn't happen in normal operation: prune() prunes the index
154                    // first, then the glob. A crash between these would leave the glob
155                    // NOT pruned (opposite of this case). We handle this defensively in
156                    // case of external manipulation or future changes.
157                    warn!(
158                        section,
159                        oldest, "index has section that glob already pruned"
160                    );
161                    0
162                }
163                Err(e) => return Err(e),
164            };
165
166            // Truncate any trailing partial entry
167            let entry_count = index_size / chunk_size;
168            let aligned_size = entry_count * chunk_size;
169            if aligned_size < index_size {
170                warn!(
171                    section,
172                    index_size, aligned_size, "trailing bytes detected: truncating"
173                );
174                self.index.rewind_section(section, aligned_size).await?;
175            }
176
177            // If there is nothing, we can exit early and rewind values to 0
178            if entry_count == 0 {
179                warn!(
180                    section,
181                    index_size, "trailing bytes detected: truncating to 0"
182                );
183                self.values.rewind_section(section, 0).await?;
184                continue;
185            }
186
187            // Find last valid entry and target glob size
188            let (valid_count, glob_target) = self
189                .find_last_valid_entry(section, entry_count, glob_size)
190                .await;
191
192            // Rewind index if any entries are invalid
193            if valid_count < entry_count {
194                let valid_size = valid_count * chunk_size;
195                debug!(section, entry_count, valid_count, "rewinding index");
196                self.index.rewind_section(section, valid_size).await?;
197            }
198
199            // Truncate glob trailing garbage (can occur when value was written but
200            // index entry wasn't, or when index was truncated but glob wasn't)
201            if glob_size > glob_target {
202                debug!(
203                    section,
204                    glob_size, glob_target, "truncating glob trailing garbage"
205                );
206                self.values.rewind_section(section, glob_target).await?;
207            }
208        }
209
210        // Clean up orphan value sections that don't exist in index
211        self.cleanup_orphan_value_sections().await?;
212
213        Ok(())
214    }
215
216    /// Remove any value sections that don't have corresponding index sections.
217    ///
218    /// This can happen if a crash occurs after writing to values but before
219    /// writing to index for a new section. Since sections don't have to be
220    /// contiguous, we compare the actual sets of sections rather than just
221    /// comparing the newest section numbers.
222    async fn cleanup_orphan_value_sections(&mut self) -> Result<(), Error> {
223        // Collect index sections into a set for O(1) lookup
224        let index_sections: HashSet<u64> = self.index.sections().collect();
225
226        // Find value sections that don't exist in index
227        let orphan_sections: Vec<u64> = self
228            .values
229            .sections()
230            .filter(|s| !index_sections.contains(s))
231            .collect();
232
233        // Remove each orphan section
234        for section in orphan_sections {
235            warn!(section, "removing orphan value section");
236            self.values.remove_section(section).await?;
237        }
238
239        Ok(())
240    }
241
242    /// Find the number of valid entries and the corresponding glob target size.
243    ///
244    /// Scans backwards from the last entry until a valid one is found.
245    /// Returns `(valid_count, glob_target)` where `glob_target` is the end offset
246    /// of the last valid entry's value.
247    async fn find_last_valid_entry(
248        &self,
249        section: u64,
250        entry_count: u64,
251        glob_size: u64,
252    ) -> (u64, u64) {
253        for pos in (0..entry_count).rev() {
254            match self.index.get(section, pos).await {
255                Ok(entry) => {
256                    let (offset, size) = entry.value_location();
257                    let entry_end = offset.saturating_add(u64::from(size));
258                    if entry_end <= glob_size {
259                        return (pos + 1, entry_end);
260                    }
261                    if pos == entry_count - 1 {
262                        warn!(
263                            section,
264                            pos, glob_size, entry_end, "invalid entry: glob truncated"
265                        );
266                    }
267                }
268                Err(_) => {
269                    if pos == entry_count - 1 {
270                        warn!(section, pos, "corrupted last entry, scanning backwards");
271                    }
272                }
273            }
274        }
275        (0, 0)
276    }
277
278    /// Append entry + value.
279    ///
280    /// Writes value to glob first, then writes index entry with the value location.
281    ///
282    /// Returns `(position, offset, size)` where:
283    /// - `position`: Position in the index journal
284    /// - `offset`: Byte offset in glob
285    /// - `size`: Size of value in glob (including checksum)
286    pub async fn append(
287        &mut self,
288        section: u64,
289        entry: I,
290        value: &V,
291    ) -> Result<(u64, u64, u32), Error> {
292        // Write value first (glob). This will typically write to an in-memory
293        // buffer and return quickly (only blocks when the buffer is full).
294        let (offset, size) = self.values.append(section, value).await?;
295
296        // Update entry with actual location and write to index
297        let entry_with_location = entry.with_location(offset, size);
298        let position = self.index.append(section, &entry_with_location).await?;
299
300        Ok((position, offset, size))
301    }
302
303    /// Get entry at position (index entry only, not value).
304    pub async fn get(&self, section: u64, position: u64) -> Result<I, Error> {
305        self.index.get(section, position).await
306    }
307
308    /// Get the last entry for a section, if any.
309    ///
310    /// Returns `Ok(None)` if the section is empty.
311    ///
312    /// # Errors
313    ///
314    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
315    /// - [Error::SectionOutOfRange] if the section doesn't exist.
316    pub async fn last(&self, section: u64) -> Result<Option<I>, Error> {
317        self.index.last(section).await
318    }
319
320    /// Get value using offset/size from entry.
321    ///
322    /// The offset should be the byte offset from `append()` or from the entry's `value_location()`.
323    pub async fn get_value(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
324        self.values.get(section, offset, size).await
325    }
326
327    /// Replay index entries starting from given section.
328    ///
329    /// Returns a stream of `(section, position, entry)` tuples.
330    pub async fn replay(
331        &self,
332        start_section: u64,
333        start_position: u64,
334        buffer: NonZeroUsize,
335    ) -> Result<impl Stream<Item = Result<(u64, u64, I), Error>> + Send + '_, Error> {
336        self.index
337            .replay(start_section, start_position, buffer)
338            .await
339    }
340
341    /// Sync both journals for given section.
342    pub async fn sync(&self, section: u64) -> Result<(), Error> {
343        try_join(self.index.sync(section), self.values.sync(section))
344            .await
345            .map(|_| ())
346    }
347
348    /// Sync all sections.
349    pub async fn sync_all(&self) -> Result<(), Error> {
350        try_join(self.index.sync_all(), self.values.sync_all())
351            .await
352            .map(|_| ())
353    }
354
355    /// Prune both journals. Returns true if any sections were pruned.
356    ///
357    /// Prunes index first, then glob. This order ensures crash safety:
358    /// - If crash after index prune but before glob: orphan data in glob (acceptable)
359    /// - If crash before index prune: no change, retry works
360    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
361        let index_pruned = self.index.prune(min).await?;
362        let value_pruned = self.values.prune(min).await?;
363        Ok(index_pruned || value_pruned)
364    }
365
366    /// Rewind both journals to a specific section and index size.
367    ///
368    /// This rewinds the section to the given index size and removes all sections
369    /// after the given section. The value size is derived from the last entry.
370    pub async fn rewind(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
371        // Rewind index first (this also removes sections after `section`)
372        self.index.rewind(section, index_size).await?;
373
374        // Derive value size from last entry (section may not exist if empty)
375        let value_size = match self.index.last(section).await {
376            Ok(Some(entry)) => {
377                let (offset, size) = entry.value_location();
378                offset
379                    .checked_add(u64::from(size))
380                    .ok_or(Error::OffsetOverflow)?
381            }
382            Ok(None) => 0,
383            Err(Error::SectionOutOfRange(_)) if index_size == 0 => 0,
384            Err(e) => return Err(e),
385        };
386
387        // Rewind values (this also removes sections after `section`)
388        self.values.rewind(section, value_size).await
389    }
390
391    /// Rewind only the given section to a specific index size.
392    ///
393    /// Unlike `rewind`, this does not affect other sections.
394    /// The value size is derived from the last entry after rewinding the index.
395    pub async fn rewind_section(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
396        // Rewind index first
397        self.index.rewind_section(section, index_size).await?;
398
399        // Derive value size from last entry (section may not exist if empty)
400        let value_size = match self.index.last(section).await {
401            Ok(Some(entry)) => {
402                let (offset, size) = entry.value_location();
403                offset
404                    .checked_add(u64::from(size))
405                    .ok_or(Error::OffsetOverflow)?
406            }
407            Ok(None) => 0,
408            Err(Error::SectionOutOfRange(_)) if index_size == 0 => 0,
409            Err(e) => return Err(e),
410        };
411
412        // Rewind values
413        self.values.rewind_section(section, value_size).await
414    }
415
416    /// Get index size for checkpoint.
417    ///
418    /// The value size can be derived from the last entry's location when needed.
419    pub async fn size(&self, section: u64) -> Result<u64, Error> {
420        self.index.size(section).await
421    }
422
423    /// Get the value size for a section, derived from the last entry's location.
424    pub async fn value_size(&self, section: u64) -> Result<u64, Error> {
425        match self.index.last(section).await {
426            Ok(Some(entry)) => {
427                let (offset, size) = entry.value_location();
428                offset
429                    .checked_add(u64::from(size))
430                    .ok_or(Error::OffsetOverflow)
431            }
432            Ok(None) | Err(Error::SectionOutOfRange(_)) => Ok(0),
433            Err(e) => Err(e),
434        }
435    }
436
437    /// Returns the oldest section number, if any exist.
438    pub fn oldest_section(&self) -> Option<u64> {
439        self.index.oldest_section()
440    }
441
442    /// Returns the newest section number, if any exist.
443    pub fn newest_section(&self) -> Option<u64> {
444        self.index.newest_section()
445    }
446
447    /// Destroy all underlying storage.
448    pub async fn destroy(self) -> Result<(), Error> {
449        try_join(self.index.destroy(), self.values.destroy())
450            .await
451            .map(|_| ())
452    }
453}
454
455#[cfg(test)]
456mod tests {
457    use super::*;
458    use commonware_codec::{FixedSize, Read, ReadExt, Write};
459    use commonware_cryptography::Crc32;
460    use commonware_macros::test_traced;
461    use commonware_runtime::{
462        buffer::paged::CacheRef, deterministic, Blob as _, Buf, BufMut, BufferPooler, Metrics,
463        Runner,
464    };
465    use commonware_utils::{NZUsize, NZU16};
466
467    /// Convert offset + size to byte end position (for truncation tests).
468    fn byte_end(offset: u64, size: u32) -> u64 {
469        offset + u64::from(size)
470    }
471
472    /// Test index entry that stores a u64 id and references a value.
473    #[derive(Debug, Clone, PartialEq)]
474    struct TestEntry {
475        id: u64,
476        value_offset: u64,
477        value_size: u32,
478    }
479
480    impl TestEntry {
481        fn new(id: u64, value_offset: u64, value_size: u32) -> Self {
482            Self {
483                id,
484                value_offset,
485                value_size,
486            }
487        }
488    }
489
490    impl Write for TestEntry {
491        fn write(&self, buf: &mut impl BufMut) {
492            self.id.write(buf);
493            self.value_offset.write(buf);
494            self.value_size.write(buf);
495        }
496    }
497
498    impl Read for TestEntry {
499        type Cfg = ();
500
501        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
502            let id = u64::read(buf)?;
503            let value_offset = u64::read(buf)?;
504            let value_size = u32::read(buf)?;
505            Ok(Self {
506                id,
507                value_offset,
508                value_size,
509            })
510        }
511    }
512
513    impl FixedSize for TestEntry {
514        const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
515    }
516
517    impl Record for TestEntry {
518        fn value_location(&self) -> (u64, u32) {
519            (self.value_offset, self.value_size)
520        }
521
522        fn with_location(mut self, offset: u64, size: u32) -> Self {
523            self.value_offset = offset;
524            self.value_size = size;
525            self
526        }
527    }
528
529    fn test_cfg(pooler: &impl BufferPooler) -> Config<()> {
530        Config {
531            index_partition: "test-index".into(),
532            value_partition: "test-values".into(),
533            index_page_cache: CacheRef::from_pooler(pooler, NZU16!(64), NZUsize!(8)),
534            index_write_buffer: NZUsize!(1024),
535            value_write_buffer: NZUsize!(1024),
536            compression: None,
537            codec_config: (),
538        }
539    }
540
541    /// Simple test value type with unit config.
542    type TestValue = [u8; 16];
543
544    #[test_traced]
545    fn test_oversized_append_and_get() {
546        let executor = deterministic::Runner::default();
547        executor.start(|context| async move {
548            let cfg = test_cfg(&context);
549            let mut oversized: Oversized<_, TestEntry, TestValue> =
550                Oversized::init(context, cfg).await.expect("Failed to init");
551
552            // Append entry with value
553            let value: TestValue = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
554            let entry = TestEntry::new(42, 0, 0);
555            let (position, offset, size) = oversized
556                .append(1, entry, &value)
557                .await
558                .expect("Failed to append");
559
560            assert_eq!(position, 0);
561
562            // Get entry
563            let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
564            assert_eq!(retrieved_entry.id, 42);
565
566            // Get value
567            let retrieved_value = oversized
568                .get_value(1, offset, size)
569                .await
570                .expect("Failed to get value");
571            assert_eq!(retrieved_value, value);
572
573            oversized.destroy().await.expect("Failed to destroy");
574        });
575    }
576
577    #[test_traced]
578    fn test_oversized_crash_recovery() {
579        let executor = deterministic::Runner::default();
580        executor.start(|context| async move {
581            let cfg = test_cfg(&context);
582
583            // Create and populate oversized journal
584            let mut oversized: Oversized<_, TestEntry, TestValue> =
585                Oversized::init(context.with_label("first"), cfg.clone())
586                    .await
587                    .expect("Failed to init");
588
589            // Append multiple entries
590            let mut locations = Vec::new();
591            for i in 0..5u8 {
592                let value: TestValue = [i; 16];
593                let entry = TestEntry::new(i as u64, 0, 0);
594                let (position, offset, size) = oversized
595                    .append(1, entry, &value)
596                    .await
597                    .expect("Failed to append");
598                locations.push((position, offset, size));
599            }
600            oversized.sync(1).await.expect("Failed to sync");
601            drop(oversized);
602
603            // Simulate crash: truncate glob to lose last 2 values
604            let (blob, _) = context
605                .open(&cfg.value_partition, &1u64.to_be_bytes())
606                .await
607                .expect("Failed to open blob");
608
609            // Calculate size to keep first 3 entries
610            let keep_size = byte_end(locations[2].1, locations[2].2);
611            blob.resize(keep_size).await.expect("Failed to truncate");
612            blob.sync().await.expect("Failed to sync");
613            drop(blob);
614
615            // Reinitialize - should recover and rewind index
616            let oversized: Oversized<_, TestEntry, TestValue> =
617                Oversized::init(context.with_label("second"), cfg.clone())
618                    .await
619                    .expect("Failed to reinit");
620
621            // First 3 entries should still be valid
622            for i in 0..3u8 {
623                let (position, offset, size) = locations[i as usize];
624                let entry = oversized.get(1, position).await.expect("Failed to get");
625                assert_eq!(entry.id, i as u64);
626
627                let value = oversized
628                    .get_value(1, offset, size)
629                    .await
630                    .expect("Failed to get value");
631                assert_eq!(value, [i; 16]);
632            }
633
634            // Entry at position 3 should fail (index was rewound)
635            let result = oversized.get(1, 3).await;
636            assert!(result.is_err());
637
638            oversized.destroy().await.expect("Failed to destroy");
639        });
640    }
641
642    #[test_traced]
643    fn test_oversized_persistence() {
644        let executor = deterministic::Runner::default();
645        executor.start(|context| async move {
646            let cfg = test_cfg(&context);
647
648            // Create and populate
649            let mut oversized: Oversized<_, TestEntry, TestValue> =
650                Oversized::init(context.with_label("first"), cfg.clone())
651                    .await
652                    .expect("Failed to init");
653
654            let value: TestValue = [42; 16];
655            let entry = TestEntry::new(123, 0, 0);
656            let (position, offset, size) = oversized
657                .append(1, entry, &value)
658                .await
659                .expect("Failed to append");
660            oversized.sync(1).await.expect("Failed to sync");
661            drop(oversized);
662
663            // Reopen and verify
664            let oversized: Oversized<_, TestEntry, TestValue> =
665                Oversized::init(context.with_label("second"), cfg)
666                    .await
667                    .expect("Failed to reinit");
668
669            let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
670            assert_eq!(retrieved_entry.id, 123);
671
672            let retrieved_value = oversized
673                .get_value(1, offset, size)
674                .await
675                .expect("Failed to get value");
676            assert_eq!(retrieved_value, value);
677
678            oversized.destroy().await.expect("Failed to destroy");
679        });
680    }
681
682    #[test_traced]
683    fn test_oversized_prune() {
684        let executor = deterministic::Runner::default();
685        executor.start(|context| async move {
686            let cfg = test_cfg(&context);
687            let mut oversized: Oversized<_, TestEntry, TestValue> =
688                Oversized::init(context, cfg).await.expect("Failed to init");
689
690            // Append to multiple sections
691            for section in 1u64..=5 {
692                let value: TestValue = [section as u8; 16];
693                let entry = TestEntry::new(section, 0, 0);
694                oversized
695                    .append(section, entry, &value)
696                    .await
697                    .expect("Failed to append");
698                oversized.sync(section).await.expect("Failed to sync");
699            }
700
701            // Prune sections < 3
702            oversized.prune(3).await.expect("Failed to prune");
703
704            // Sections 1, 2 should be gone
705            assert!(oversized.get(1, 0).await.is_err());
706            assert!(oversized.get(2, 0).await.is_err());
707
708            // Sections 3, 4, 5 should exist
709            assert!(oversized.get(3, 0).await.is_ok());
710            assert!(oversized.get(4, 0).await.is_ok());
711            assert!(oversized.get(5, 0).await.is_ok());
712
713            oversized.destroy().await.expect("Failed to destroy");
714        });
715    }
716
717    #[test_traced]
718    fn test_recovery_empty_section() {
719        let executor = deterministic::Runner::default();
720        executor.start(|context| async move {
721            let cfg = test_cfg(&context);
722
723            // Create oversized journal
724            let mut oversized: Oversized<_, TestEntry, TestValue> =
725                Oversized::init(context.with_label("first"), cfg.clone())
726                    .await
727                    .expect("Failed to init");
728
729            // Append to section 2 only (section 1 remains empty after being opened)
730            let value: TestValue = [42; 16];
731            let entry = TestEntry::new(1, 0, 0);
732            oversized
733                .append(2, entry, &value)
734                .await
735                .expect("Failed to append");
736            oversized.sync(2).await.expect("Failed to sync");
737            drop(oversized);
738
739            // Reinitialize - recovery should handle the empty/non-existent section 1
740            let oversized: Oversized<_, TestEntry, TestValue> =
741                Oversized::init(context.with_label("second"), cfg)
742                    .await
743                    .expect("Failed to reinit");
744
745            // Section 2 entry should be valid
746            let entry = oversized.get(2, 0).await.expect("Failed to get");
747            assert_eq!(entry.id, 1);
748
749            oversized.destroy().await.expect("Failed to destroy");
750        });
751    }
752
753    #[test_traced]
754    fn test_recovery_all_entries_invalid() {
755        let executor = deterministic::Runner::default();
756        executor.start(|context| async move {
757            let cfg = test_cfg(&context);
758
759            // Create and populate
760            let mut oversized: Oversized<_, TestEntry, TestValue> =
761                Oversized::init(context.with_label("first"), cfg.clone())
762                    .await
763                    .expect("Failed to init");
764
765            // Append 5 entries
766            for i in 0..5u8 {
767                let value: TestValue = [i; 16];
768                let entry = TestEntry::new(i as u64, 0, 0);
769                oversized
770                    .append(1, entry, &value)
771                    .await
772                    .expect("Failed to append");
773            }
774            oversized.sync(1).await.expect("Failed to sync");
775            drop(oversized);
776
777            // Truncate glob to 0 bytes - ALL entries become invalid
778            let (blob, _) = context
779                .open(&cfg.value_partition, &1u64.to_be_bytes())
780                .await
781                .expect("Failed to open blob");
782            blob.resize(0).await.expect("Failed to truncate");
783            blob.sync().await.expect("Failed to sync");
784            drop(blob);
785
786            // Reinitialize - should recover and rewind index to 0
787            let mut oversized: Oversized<_, TestEntry, TestValue> =
788                Oversized::init(context.with_label("second"), cfg)
789                    .await
790                    .expect("Failed to reinit");
791
792            // No entries should be accessible
793            let result = oversized.get(1, 0).await;
794            assert!(result.is_err());
795
796            // Should be able to append after recovery
797            let value: TestValue = [99; 16];
798            let entry = TestEntry::new(100, 0, 0);
799            let (pos, offset, size) = oversized
800                .append(1, entry, &value)
801                .await
802                .expect("Failed to append after recovery");
803            assert_eq!(pos, 0);
804
805            let retrieved = oversized.get(1, 0).await.expect("Failed to get");
806            assert_eq!(retrieved.id, 100);
807            let retrieved_value = oversized
808                .get_value(1, offset, size)
809                .await
810                .expect("Failed to get value");
811            assert_eq!(retrieved_value, value);
812
813            oversized.destroy().await.expect("Failed to destroy");
814        });
815    }
816
817    #[test_traced]
818    fn test_recovery_multiple_sections_mixed_validity() {
819        let executor = deterministic::Runner::default();
820        executor.start(|context| async move {
821            let cfg = test_cfg(&context);
822
823            // Create and populate multiple sections
824            let mut oversized: Oversized<_, TestEntry, TestValue> =
825                Oversized::init(context.with_label("first"), cfg.clone())
826                    .await
827                    .expect("Failed to init");
828
829            // Section 1: 3 entries
830            let mut section1_locations = Vec::new();
831            for i in 0..3u8 {
832                let value: TestValue = [i; 16];
833                let entry = TestEntry::new(i as u64, 0, 0);
834                let loc = oversized
835                    .append(1, entry, &value)
836                    .await
837                    .expect("Failed to append");
838                section1_locations.push(loc);
839            }
840            oversized.sync(1).await.expect("Failed to sync");
841
842            // Section 2: 5 entries
843            let mut section2_locations = Vec::new();
844            for i in 0..5u8 {
845                let value: TestValue = [10 + i; 16];
846                let entry = TestEntry::new(10 + i as u64, 0, 0);
847                let loc = oversized
848                    .append(2, entry, &value)
849                    .await
850                    .expect("Failed to append");
851                section2_locations.push(loc);
852            }
853            oversized.sync(2).await.expect("Failed to sync");
854
855            // Section 3: 2 entries
856            for i in 0..2u8 {
857                let value: TestValue = [20 + i; 16];
858                let entry = TestEntry::new(20 + i as u64, 0, 0);
859                oversized
860                    .append(3, entry, &value)
861                    .await
862                    .expect("Failed to append");
863            }
864            oversized.sync(3).await.expect("Failed to sync");
865            drop(oversized);
866
867            // Truncate section 1 glob to keep only first entry
868            let (blob, _) = context
869                .open(&cfg.value_partition, &1u64.to_be_bytes())
870                .await
871                .expect("Failed to open blob");
872            let keep_size = byte_end(section1_locations[0].1, section1_locations[0].2);
873            blob.resize(keep_size).await.expect("Failed to truncate");
874            blob.sync().await.expect("Failed to sync");
875            drop(blob);
876
877            // Truncate section 2 glob to keep first 3 entries
878            let (blob, _) = context
879                .open(&cfg.value_partition, &2u64.to_be_bytes())
880                .await
881                .expect("Failed to open blob");
882            let keep_size = byte_end(section2_locations[2].1, section2_locations[2].2);
883            blob.resize(keep_size).await.expect("Failed to truncate");
884            blob.sync().await.expect("Failed to sync");
885            drop(blob);
886
887            // Section 3 remains intact
888
889            // Reinitialize
890            let oversized: Oversized<_, TestEntry, TestValue> =
891                Oversized::init(context.with_label("second"), cfg)
892                    .await
893                    .expect("Failed to reinit");
894
895            // Section 1: only position 0 valid
896            assert!(oversized.get(1, 0).await.is_ok());
897            assert!(oversized.get(1, 1).await.is_err());
898            assert!(oversized.get(1, 2).await.is_err());
899
900            // Section 2: positions 0,1,2 valid
901            assert!(oversized.get(2, 0).await.is_ok());
902            assert!(oversized.get(2, 1).await.is_ok());
903            assert!(oversized.get(2, 2).await.is_ok());
904            assert!(oversized.get(2, 3).await.is_err());
905            assert!(oversized.get(2, 4).await.is_err());
906
907            // Section 3: both positions valid
908            assert!(oversized.get(3, 0).await.is_ok());
909            assert!(oversized.get(3, 1).await.is_ok());
910
911            oversized.destroy().await.expect("Failed to destroy");
912        });
913    }
914
915    #[test_traced]
916    fn test_recovery_corrupted_last_index_entry() {
917        let executor = deterministic::Runner::default();
918        executor.start(|context| async move {
919            // Use page size = entry size so each entry is on its own page.
920            // This allows corrupting just the last entry's page without affecting others.
921            // Physical page size = TestEntry::SIZE (20) + 12 (CRC record) = 32 bytes.
922            let cfg = Config {
923                index_partition: "test-index".into(),
924                value_partition: "test-values".into(),
925                index_page_cache: CacheRef::from_pooler(
926                    &context,
927                    NZU16!(TestEntry::SIZE as u16),
928                    NZUsize!(8),
929                ),
930                index_write_buffer: NZUsize!(1024),
931                value_write_buffer: NZUsize!(1024),
932                compression: None,
933                codec_config: (),
934            };
935
936            // Create and populate
937            let mut oversized: Oversized<_, TestEntry, TestValue> =
938                Oversized::init(context.with_label("first"), cfg.clone())
939                    .await
940                    .expect("Failed to init");
941
942            // Append 5 entries (each on its own page)
943            for i in 0..5u8 {
944                let value: TestValue = [i; 16];
945                let entry = TestEntry::new(i as u64, 0, 0);
946                oversized
947                    .append(1, entry, &value)
948                    .await
949                    .expect("Failed to append");
950            }
951            oversized.sync(1).await.expect("Failed to sync");
952            drop(oversized);
953
954            // Corrupt the last page's CRC to trigger page-level integrity failure
955            let (blob, size) = context
956                .open(&cfg.index_partition, &1u64.to_be_bytes())
957                .await
958                .expect("Failed to open blob");
959
960            // Physical page size = 20 + 12 = 32 bytes
961            // 5 entries = 5 pages = 160 bytes total
962            // Last page CRC starts at offset 160 - 12 = 148
963            assert_eq!(size, 160);
964            let last_page_crc_offset = size - 12;
965            blob.write_at(last_page_crc_offset, vec![0xFF; 12])
966                .await
967                .expect("Failed to corrupt");
968            blob.sync().await.expect("Failed to sync");
969            drop(blob);
970
971            // Reinitialize - should detect page corruption and truncate
972            let mut oversized: Oversized<_, TestEntry, TestValue> =
973                Oversized::init(context.with_label("second"), cfg)
974                    .await
975                    .expect("Failed to reinit");
976
977            // First 4 entries should be valid (on pages 0-3)
978            for i in 0..4u8 {
979                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
980                assert_eq!(entry.id, i as u64);
981            }
982
983            // Entry 4 should be gone (its page was corrupted)
984            assert!(oversized.get(1, 4).await.is_err());
985
986            // Should be able to append after recovery
987            let value: TestValue = [99; 16];
988            let entry = TestEntry::new(100, 0, 0);
989            let (pos, offset, size) = oversized
990                .append(1, entry, &value)
991                .await
992                .expect("Failed to append after recovery");
993            assert_eq!(pos, 4);
994
995            let retrieved = oversized.get(1, 4).await.expect("Failed to get");
996            assert_eq!(retrieved.id, 100);
997            let retrieved_value = oversized
998                .get_value(1, offset, size)
999                .await
1000                .expect("Failed to get value");
1001            assert_eq!(retrieved_value, value);
1002
1003            oversized.destroy().await.expect("Failed to destroy");
1004        });
1005    }
1006
1007    #[test_traced]
1008    fn test_recovery_all_entries_valid() {
1009        let executor = deterministic::Runner::default();
1010        executor.start(|context| async move {
1011            let cfg = test_cfg(&context);
1012
1013            // Create and populate
1014            let mut oversized: Oversized<_, TestEntry, TestValue> =
1015                Oversized::init(context.with_label("first"), cfg.clone())
1016                    .await
1017                    .expect("Failed to init");
1018
1019            // Append entries to multiple sections
1020            for section in 1u64..=3 {
1021                for i in 0..10u8 {
1022                    let value: TestValue = [(section as u8) * 10 + i; 16];
1023                    let entry = TestEntry::new(section * 100 + i as u64, 0, 0);
1024                    oversized
1025                        .append(section, entry, &value)
1026                        .await
1027                        .expect("Failed to append");
1028                }
1029                oversized.sync(section).await.expect("Failed to sync");
1030            }
1031            drop(oversized);
1032
1033            // Reinitialize with no corruption - should be fast
1034            let oversized: Oversized<_, TestEntry, TestValue> =
1035                Oversized::init(context.with_label("second"), cfg)
1036                    .await
1037                    .expect("Failed to reinit");
1038
1039            // All entries should be valid
1040            for section in 1u64..=3 {
1041                for i in 0..10u8 {
1042                    let entry = oversized
1043                        .get(section, i as u64)
1044                        .await
1045                        .expect("Failed to get");
1046                    assert_eq!(entry.id, section * 100 + i as u64);
1047                }
1048            }
1049
1050            oversized.destroy().await.expect("Failed to destroy");
1051        });
1052    }
1053
1054    #[test_traced]
1055    fn test_recovery_single_entry_invalid() {
1056        let executor = deterministic::Runner::default();
1057        executor.start(|context| async move {
1058            let cfg = test_cfg(&context);
1059
1060            // Create and populate with single entry
1061            let mut oversized: Oversized<_, TestEntry, TestValue> =
1062                Oversized::init(context.with_label("first"), cfg.clone())
1063                    .await
1064                    .expect("Failed to init");
1065
1066            let value: TestValue = [42; 16];
1067            let entry = TestEntry::new(1, 0, 0);
1068            oversized
1069                .append(1, entry, &value)
1070                .await
1071                .expect("Failed to append");
1072            oversized.sync(1).await.expect("Failed to sync");
1073            drop(oversized);
1074
1075            // Truncate glob to 0 - single entry becomes invalid
1076            let (blob, _) = context
1077                .open(&cfg.value_partition, &1u64.to_be_bytes())
1078                .await
1079                .expect("Failed to open blob");
1080            blob.resize(0).await.expect("Failed to truncate");
1081            blob.sync().await.expect("Failed to sync");
1082            drop(blob);
1083
1084            // Reinitialize
1085            let oversized: Oversized<_, TestEntry, TestValue> =
1086                Oversized::init(context.with_label("second"), cfg)
1087                    .await
1088                    .expect("Failed to reinit");
1089
1090            // Entry should be gone
1091            assert!(oversized.get(1, 0).await.is_err());
1092
1093            oversized.destroy().await.expect("Failed to destroy");
1094        });
1095    }
1096
1097    #[test_traced]
1098    fn test_recovery_last_entry_off_by_one() {
1099        let executor = deterministic::Runner::default();
1100        executor.start(|context| async move {
1101            let cfg = test_cfg(&context);
1102
1103            // Create and populate
1104            let mut oversized: Oversized<_, TestEntry, TestValue> =
1105                Oversized::init(context.with_label("first"), cfg.clone())
1106                    .await
1107                    .expect("Failed to init");
1108
1109            let mut locations = Vec::new();
1110            for i in 0..3u8 {
1111                let value: TestValue = [i; 16];
1112                let entry = TestEntry::new(i as u64, 0, 0);
1113                let loc = oversized
1114                    .append(1, entry, &value)
1115                    .await
1116                    .expect("Failed to append");
1117                locations.push(loc);
1118            }
1119            oversized.sync(1).await.expect("Failed to sync");
1120            drop(oversized);
1121
1122            // Truncate glob to be off by 1 byte from last entry
1123            let (blob, _) = context
1124                .open(&cfg.value_partition, &1u64.to_be_bytes())
1125                .await
1126                .expect("Failed to open blob");
1127
1128            // Last entry needs: offset + size bytes
1129            // Truncate to offset + size - 1 (missing 1 byte)
1130            let last = &locations[2];
1131            let truncate_to = byte_end(last.1, last.2) - 1;
1132            blob.resize(truncate_to).await.expect("Failed to truncate");
1133            blob.sync().await.expect("Failed to sync");
1134            drop(blob);
1135
1136            // Reinitialize
1137            let mut oversized: Oversized<_, TestEntry, TestValue> =
1138                Oversized::init(context.with_label("second"), cfg)
1139                    .await
1140                    .expect("Failed to reinit");
1141
1142            // First 2 entries should be valid
1143            assert!(oversized.get(1, 0).await.is_ok());
1144            assert!(oversized.get(1, 1).await.is_ok());
1145
1146            // Entry 2 should be gone (truncated)
1147            assert!(oversized.get(1, 2).await.is_err());
1148
1149            // Should be able to append after recovery
1150            let value: TestValue = [99; 16];
1151            let entry = TestEntry::new(100, 0, 0);
1152            let (pos, offset, size) = oversized
1153                .append(1, entry, &value)
1154                .await
1155                .expect("Failed to append after recovery");
1156            assert_eq!(pos, 2);
1157
1158            let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1159            assert_eq!(retrieved.id, 100);
1160            let retrieved_value = oversized
1161                .get_value(1, offset, size)
1162                .await
1163                .expect("Failed to get value");
1164            assert_eq!(retrieved_value, value);
1165
1166            oversized.destroy().await.expect("Failed to destroy");
1167        });
1168    }
1169
1170    #[test_traced]
1171    fn test_recovery_glob_missing_entirely() {
1172        let executor = deterministic::Runner::default();
1173        executor.start(|context| async move {
1174            let cfg = test_cfg(&context);
1175
1176            // Create and populate
1177            let mut oversized: Oversized<_, TestEntry, TestValue> =
1178                Oversized::init(context.with_label("first"), cfg.clone())
1179                    .await
1180                    .expect("Failed to init");
1181
1182            for i in 0..3u8 {
1183                let value: TestValue = [i; 16];
1184                let entry = TestEntry::new(i as u64, 0, 0);
1185                oversized
1186                    .append(1, entry, &value)
1187                    .await
1188                    .expect("Failed to append");
1189            }
1190            oversized.sync(1).await.expect("Failed to sync");
1191            drop(oversized);
1192
1193            // Delete the glob file entirely
1194            context
1195                .remove(&cfg.value_partition, Some(&1u64.to_be_bytes()))
1196                .await
1197                .expect("Failed to remove");
1198
1199            // Reinitialize - glob size will be 0, all entries invalid
1200            let oversized: Oversized<_, TestEntry, TestValue> =
1201                Oversized::init(context.with_label("second"), cfg)
1202                    .await
1203                    .expect("Failed to reinit");
1204
1205            // All entries should be gone
1206            assert!(oversized.get(1, 0).await.is_err());
1207            assert!(oversized.get(1, 1).await.is_err());
1208            assert!(oversized.get(1, 2).await.is_err());
1209
1210            oversized.destroy().await.expect("Failed to destroy");
1211        });
1212    }
1213
1214    #[test_traced]
1215    fn test_recovery_can_append_after_recovery() {
1216        let executor = deterministic::Runner::default();
1217        executor.start(|context| async move {
1218            let cfg = test_cfg(&context);
1219
1220            // Create and populate
1221            let mut oversized: Oversized<_, TestEntry, TestValue> =
1222                Oversized::init(context.with_label("first"), cfg.clone())
1223                    .await
1224                    .expect("Failed to init");
1225
1226            let mut locations = Vec::new();
1227            for i in 0..5u8 {
1228                let value: TestValue = [i; 16];
1229                let entry = TestEntry::new(i as u64, 0, 0);
1230                let loc = oversized
1231                    .append(1, entry, &value)
1232                    .await
1233                    .expect("Failed to append");
1234                locations.push(loc);
1235            }
1236            oversized.sync(1).await.expect("Failed to sync");
1237            drop(oversized);
1238
1239            // Truncate glob to keep only first 2 entries
1240            let (blob, _) = context
1241                .open(&cfg.value_partition, &1u64.to_be_bytes())
1242                .await
1243                .expect("Failed to open blob");
1244            let keep_size = byte_end(locations[1].1, locations[1].2);
1245            blob.resize(keep_size).await.expect("Failed to truncate");
1246            blob.sync().await.expect("Failed to sync");
1247            drop(blob);
1248
1249            // Reinitialize
1250            let mut oversized: Oversized<_, TestEntry, TestValue> =
1251                Oversized::init(context.with_label("second"), cfg.clone())
1252                    .await
1253                    .expect("Failed to reinit");
1254
1255            // Verify first 2 entries exist
1256            assert!(oversized.get(1, 0).await.is_ok());
1257            assert!(oversized.get(1, 1).await.is_ok());
1258            assert!(oversized.get(1, 2).await.is_err());
1259
1260            // Append new entries after recovery
1261            for i in 10..15u8 {
1262                let value: TestValue = [i; 16];
1263                let entry = TestEntry::new(i as u64, 0, 0);
1264                oversized
1265                    .append(1, entry, &value)
1266                    .await
1267                    .expect("Failed to append after recovery");
1268            }
1269            oversized.sync(1).await.expect("Failed to sync");
1270
1271            // Verify new entries at positions 2, 3, 4, 5, 6
1272            for i in 0..5u8 {
1273                let entry = oversized
1274                    .get(1, 2 + i as u64)
1275                    .await
1276                    .expect("Failed to get new entry");
1277                assert_eq!(entry.id, (10 + i) as u64);
1278            }
1279
1280            oversized.destroy().await.expect("Failed to destroy");
1281        });
1282    }
1283
1284    #[test_traced]
1285    fn test_recovery_glob_pruned_but_index_not() {
1286        let executor = deterministic::Runner::default();
1287        executor.start(|context| async move {
1288            let cfg = test_cfg(&context);
1289
1290            // Create and populate multiple sections
1291            let mut oversized: Oversized<_, TestEntry, TestValue> =
1292                Oversized::init(context.with_label("first"), cfg.clone())
1293                    .await
1294                    .expect("Failed to init");
1295
1296            for section in 1u64..=3 {
1297                let value: TestValue = [section as u8; 16];
1298                let entry = TestEntry::new(section, 0, 0);
1299                oversized
1300                    .append(section, entry, &value)
1301                    .await
1302                    .expect("Failed to append");
1303                oversized.sync(section).await.expect("Failed to sync");
1304            }
1305            drop(oversized);
1306
1307            // Simulate crash during prune: prune ONLY the glob, not the index
1308            // This creates the "glob pruned but index not" scenario
1309            use crate::journal::segmented::glob::{Config as GlobConfig, Glob};
1310            let glob_cfg = GlobConfig {
1311                partition: cfg.value_partition.clone(),
1312                compression: cfg.compression,
1313                codec_config: (),
1314                write_buffer: cfg.value_write_buffer,
1315            };
1316            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1317                .await
1318                .expect("Failed to init glob");
1319            glob.prune(2).await.expect("Failed to prune glob");
1320            glob.sync_all().await.expect("Failed to sync glob");
1321            drop(glob);
1322
1323            // Reinitialize - should recover gracefully with warning
1324            // Index section 1 will be rewound to 0 entries
1325            let oversized: Oversized<_, TestEntry, TestValue> =
1326                Oversized::init(context.with_label("second"), cfg.clone())
1327                    .await
1328                    .expect("Failed to reinit");
1329
1330            // Section 1 entries should be gone (index rewound due to glob pruned)
1331            assert!(oversized.get(1, 0).await.is_err());
1332
1333            // Sections 2 and 3 should still be valid
1334            assert!(oversized.get(2, 0).await.is_ok());
1335            assert!(oversized.get(3, 0).await.is_ok());
1336
1337            oversized.destroy().await.expect("Failed to destroy");
1338        });
1339    }
1340
1341    #[test_traced]
1342    fn test_recovery_index_partition_deleted() {
1343        let executor = deterministic::Runner::default();
1344        executor.start(|context| async move {
1345            let cfg = test_cfg(&context);
1346
1347            // Create and populate multiple sections
1348            let mut oversized: Oversized<_, TestEntry, TestValue> =
1349                Oversized::init(context.with_label("first"), cfg.clone())
1350                    .await
1351                    .expect("Failed to init");
1352
1353            for section in 1u64..=3 {
1354                let value: TestValue = [section as u8; 16];
1355                let entry = TestEntry::new(section, 0, 0);
1356                oversized
1357                    .append(section, entry, &value)
1358                    .await
1359                    .expect("Failed to append");
1360                oversized.sync(section).await.expect("Failed to sync");
1361            }
1362            drop(oversized);
1363
1364            // Delete index blob for section 2 (simulate corruption/loss)
1365            context
1366                .remove(&cfg.index_partition, Some(&2u64.to_be_bytes()))
1367                .await
1368                .expect("Failed to remove index");
1369
1370            // Reinitialize - should handle gracefully
1371            // Section 2 is gone from index, orphan data in glob is acceptable
1372            let oversized: Oversized<_, TestEntry, TestValue> =
1373                Oversized::init(context.with_label("second"), cfg.clone())
1374                    .await
1375                    .expect("Failed to reinit");
1376
1377            // Section 1 and 3 should still be valid
1378            assert!(oversized.get(1, 0).await.is_ok());
1379            assert!(oversized.get(3, 0).await.is_ok());
1380
1381            // Section 2 should be gone (index file deleted)
1382            assert!(oversized.get(2, 0).await.is_err());
1383
1384            oversized.destroy().await.expect("Failed to destroy");
1385        });
1386    }
1387
1388    #[test_traced]
1389    fn test_recovery_index_synced_but_glob_not() {
1390        let executor = deterministic::Runner::default();
1391        executor.start(|context| async move {
1392            let cfg = test_cfg(&context);
1393
1394            // Create and populate
1395            let mut oversized: Oversized<_, TestEntry, TestValue> =
1396                Oversized::init(context.with_label("first"), cfg.clone())
1397                    .await
1398                    .expect("Failed to init");
1399
1400            // Append entries and sync
1401            let mut locations = Vec::new();
1402            for i in 0..3u8 {
1403                let value: TestValue = [i; 16];
1404                let entry = TestEntry::new(i as u64, 0, 0);
1405                let loc = oversized
1406                    .append(1, entry, &value)
1407                    .await
1408                    .expect("Failed to append");
1409                locations.push(loc);
1410            }
1411            oversized.sync(1).await.expect("Failed to sync");
1412
1413            // Add more entries WITHOUT syncing (simulates unsynced writes)
1414            for i in 10..15u8 {
1415                let value: TestValue = [i; 16];
1416                let entry = TestEntry::new(i as u64, 0, 0);
1417                oversized
1418                    .append(1, entry, &value)
1419                    .await
1420                    .expect("Failed to append");
1421            }
1422            // Note: NOT calling sync() here
1423            drop(oversized);
1424
1425            // Simulate crash where index was synced but glob wasn't:
1426            // Truncate glob back to the synced size (3 entries)
1427            let (blob, _) = context
1428                .open(&cfg.value_partition, &1u64.to_be_bytes())
1429                .await
1430                .expect("Failed to open blob");
1431            let synced_size = byte_end(locations[2].1, locations[2].2);
1432            blob.resize(synced_size).await.expect("Failed to truncate");
1433            blob.sync().await.expect("Failed to sync");
1434            drop(blob);
1435
1436            // Reinitialize - should rewind index to match glob
1437            let oversized: Oversized<_, TestEntry, TestValue> =
1438                Oversized::init(context.with_label("second"), cfg)
1439                    .await
1440                    .expect("Failed to reinit");
1441
1442            // First 3 entries should be valid
1443            for i in 0..3u8 {
1444                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1445                assert_eq!(entry.id, i as u64);
1446            }
1447
1448            // Entries 3-7 should be gone (unsynced, index rewound)
1449            assert!(oversized.get(1, 3).await.is_err());
1450
1451            oversized.destroy().await.expect("Failed to destroy");
1452        });
1453    }
1454
1455    #[test_traced]
1456    fn test_recovery_glob_synced_but_index_not() {
1457        let executor = deterministic::Runner::default();
1458        executor.start(|context| async move {
1459            // Use page size = entry size so each entry is exactly one page.
1460            // This allows truncating by entry count to equal truncating by full pages,
1461            // maintaining page-level integrity.
1462            let cfg = Config {
1463                index_partition: "test-index".into(),
1464                value_partition: "test-values".into(),
1465                index_page_cache: CacheRef::from_pooler(
1466                    &context,
1467                    NZU16!(TestEntry::SIZE as u16),
1468                    NZUsize!(8),
1469                ),
1470                index_write_buffer: NZUsize!(1024),
1471                value_write_buffer: NZUsize!(1024),
1472                compression: None,
1473                codec_config: (),
1474            };
1475
1476            // Create and populate
1477            let mut oversized: Oversized<_, TestEntry, TestValue> =
1478                Oversized::init(context.with_label("first"), cfg.clone())
1479                    .await
1480                    .expect("Failed to init");
1481
1482            // Append entries and sync
1483            let mut locations = Vec::new();
1484            for i in 0..3u8 {
1485                let value: TestValue = [i; 16];
1486                let entry = TestEntry::new(i as u64, 0, 0);
1487                let loc = oversized
1488                    .append(1, entry, &value)
1489                    .await
1490                    .expect("Failed to append");
1491                locations.push(loc);
1492            }
1493            oversized.sync(1).await.expect("Failed to sync");
1494            drop(oversized);
1495
1496            // Simulate crash: truncate INDEX but leave GLOB intact
1497            // This creates orphan data in glob (glob ahead of index)
1498            let (blob, _size) = context
1499                .open(&cfg.index_partition, &1u64.to_be_bytes())
1500                .await
1501                .expect("Failed to open blob");
1502
1503            // Keep only first 2 index entries (2 full pages)
1504            // Physical page size = logical (20) + CRC record (12) = 32 bytes
1505            let physical_page_size = (TestEntry::SIZE + 12) as u64;
1506            blob.resize(2 * physical_page_size)
1507                .await
1508                .expect("Failed to truncate");
1509            blob.sync().await.expect("Failed to sync");
1510            drop(blob);
1511
1512            // Reinitialize - glob has orphan data from entry 3
1513            let mut oversized: Oversized<_, TestEntry, TestValue> =
1514                Oversized::init(context.with_label("second"), cfg.clone())
1515                    .await
1516                    .expect("Failed to reinit");
1517
1518            // First 2 entries should be valid
1519            for i in 0..2u8 {
1520                let (position, offset, size) = locations[i as usize];
1521                let entry = oversized.get(1, position).await.expect("Failed to get");
1522                assert_eq!(entry.id, i as u64);
1523
1524                let value = oversized
1525                    .get_value(1, offset, size)
1526                    .await
1527                    .expect("Failed to get value");
1528                assert_eq!(value, [i; 16]);
1529            }
1530
1531            // Entry at position 2 should fail (index was truncated)
1532            assert!(oversized.get(1, 2).await.is_err());
1533
1534            // Append new entries - should work despite orphan data in glob
1535            let mut new_locations = Vec::new();
1536            for i in 10..13u8 {
1537                let value: TestValue = [i; 16];
1538                let entry = TestEntry::new(i as u64, 0, 0);
1539                let (position, offset, size) = oversized
1540                    .append(1, entry, &value)
1541                    .await
1542                    .expect("Failed to append after recovery");
1543
1544                // New entries start at position 2 (after the 2 valid entries)
1545                assert_eq!(position, (i - 10 + 2) as u64);
1546                new_locations.push((position, offset, size, i));
1547
1548                // Verify we can read the new entry
1549                let retrieved = oversized.get(1, position).await.expect("Failed to get");
1550                assert_eq!(retrieved.id, i as u64);
1551
1552                let retrieved_value = oversized
1553                    .get_value(1, offset, size)
1554                    .await
1555                    .expect("Failed to get value");
1556                assert_eq!(retrieved_value, value);
1557            }
1558
1559            // Sync and restart again to verify persistence with orphan data
1560            oversized.sync(1).await.expect("Failed to sync");
1561            drop(oversized);
1562
1563            // Reinitialize after adding data on top of orphan glob data
1564            let oversized: Oversized<_, TestEntry, TestValue> =
1565                Oversized::init(context.with_label("third"), cfg)
1566                    .await
1567                    .expect("Failed to reinit after append");
1568
1569            // Read all valid entries in the index
1570            // First 2 entries from original data
1571            for i in 0..2u8 {
1572                let (position, offset, size) = locations[i as usize];
1573                let entry = oversized.get(1, position).await.expect("Failed to get");
1574                assert_eq!(entry.id, i as u64);
1575
1576                let value = oversized
1577                    .get_value(1, offset, size)
1578                    .await
1579                    .expect("Failed to get value");
1580                assert_eq!(value, [i; 16]);
1581            }
1582
1583            // New entries added after recovery
1584            for (position, offset, size, expected_id) in &new_locations {
1585                let entry = oversized
1586                    .get(1, *position)
1587                    .await
1588                    .expect("Failed to get new entry after restart");
1589                assert_eq!(entry.id, *expected_id as u64);
1590
1591                let value = oversized
1592                    .get_value(1, *offset, *size)
1593                    .await
1594                    .expect("Failed to get new value after restart");
1595                assert_eq!(value, [*expected_id; 16]);
1596            }
1597
1598            // Verify total entry count: 2 original + 3 new = 5
1599            assert!(oversized.get(1, 4).await.is_ok());
1600            assert!(oversized.get(1, 5).await.is_err());
1601
1602            oversized.destroy().await.expect("Failed to destroy");
1603        });
1604    }
1605
1606    #[test_traced]
1607    fn test_recovery_partial_index_entry() {
1608        let executor = deterministic::Runner::default();
1609        executor.start(|context| async move {
1610            let cfg = test_cfg(&context);
1611
1612            // Create and populate
1613            let mut oversized: Oversized<_, TestEntry, TestValue> =
1614                Oversized::init(context.with_label("first"), cfg.clone())
1615                    .await
1616                    .expect("Failed to init");
1617
1618            // Append 3 entries
1619            for i in 0..3u8 {
1620                let value: TestValue = [i; 16];
1621                let entry = TestEntry::new(i as u64, 0, 0);
1622                oversized
1623                    .append(1, entry, &value)
1624                    .await
1625                    .expect("Failed to append");
1626            }
1627            oversized.sync(1).await.expect("Failed to sync");
1628            drop(oversized);
1629
1630            // Simulate crash during write: truncate index to partial entry
1631            // Each entry is TestEntry::SIZE (20) + 4 (CRC32) = 24 bytes
1632            // Truncate to 3 full entries + 10 bytes of partial entry
1633            let (blob, _) = context
1634                .open(&cfg.index_partition, &1u64.to_be_bytes())
1635                .await
1636                .expect("Failed to open blob");
1637            let partial_size = 3 * 24 + 10; // 3 full entries + partial
1638            blob.resize(partial_size).await.expect("Failed to resize");
1639            blob.sync().await.expect("Failed to sync");
1640            drop(blob);
1641
1642            // Reinitialize - should handle partial entry gracefully
1643            let mut oversized: Oversized<_, TestEntry, TestValue> =
1644                Oversized::init(context.with_label("second"), cfg.clone())
1645                    .await
1646                    .expect("Failed to reinit");
1647
1648            // First 3 entries should still be valid
1649            for i in 0..3u8 {
1650                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1651                assert_eq!(entry.id, i as u64);
1652            }
1653
1654            // Entry 3 should not exist (partial entry was removed)
1655            assert!(oversized.get(1, 3).await.is_err());
1656
1657            // Append new entry after recovery
1658            let value: TestValue = [42; 16];
1659            let entry = TestEntry::new(100, 0, 0);
1660            let (pos, offset, size) = oversized
1661                .append(1, entry, &value)
1662                .await
1663                .expect("Failed to append after recovery");
1664            assert_eq!(pos, 3);
1665
1666            // Verify we can read the new entry
1667            let retrieved = oversized.get(1, 3).await.expect("Failed to get new entry");
1668            assert_eq!(retrieved.id, 100);
1669            let retrieved_value = oversized
1670                .get_value(1, offset, size)
1671                .await
1672                .expect("Failed to get new value");
1673            assert_eq!(retrieved_value, value);
1674
1675            oversized.destroy().await.expect("Failed to destroy");
1676        });
1677    }
1678
1679    #[test_traced]
1680    fn test_recovery_only_partial_entry() {
1681        let executor = deterministic::Runner::default();
1682        executor.start(|context| async move {
1683            let cfg = test_cfg(&context);
1684
1685            // Create and populate with single entry
1686            let mut oversized: Oversized<_, TestEntry, TestValue> =
1687                Oversized::init(context.with_label("first"), cfg.clone())
1688                    .await
1689                    .expect("Failed to init");
1690
1691            let value: TestValue = [42; 16];
1692            let entry = TestEntry::new(1, 0, 0);
1693            oversized
1694                .append(1, entry, &value)
1695                .await
1696                .expect("Failed to append");
1697            oversized.sync(1).await.expect("Failed to sync");
1698            drop(oversized);
1699
1700            // Truncate index to only partial data (less than one full entry)
1701            let (blob, _) = context
1702                .open(&cfg.index_partition, &1u64.to_be_bytes())
1703                .await
1704                .expect("Failed to open blob");
1705            blob.resize(10).await.expect("Failed to resize"); // Less than chunk size
1706            blob.sync().await.expect("Failed to sync");
1707            drop(blob);
1708
1709            // Reinitialize - should handle gracefully (rewind to 0)
1710            let mut oversized: Oversized<_, TestEntry, TestValue> =
1711                Oversized::init(context.with_label("second"), cfg.clone())
1712                    .await
1713                    .expect("Failed to reinit");
1714
1715            // No entries should exist
1716            assert!(oversized.get(1, 0).await.is_err());
1717
1718            // Should be able to append after recovery
1719            let value: TestValue = [99; 16];
1720            let entry = TestEntry::new(100, 0, 0);
1721            let (pos, offset, size) = oversized
1722                .append(1, entry, &value)
1723                .await
1724                .expect("Failed to append after recovery");
1725            assert_eq!(pos, 0);
1726
1727            let retrieved = oversized.get(1, 0).await.expect("Failed to get");
1728            assert_eq!(retrieved.id, 100);
1729            let retrieved_value = oversized
1730                .get_value(1, offset, size)
1731                .await
1732                .expect("Failed to get value");
1733            assert_eq!(retrieved_value, value);
1734
1735            oversized.destroy().await.expect("Failed to destroy");
1736        });
1737    }
1738
1739    #[test_traced]
1740    fn test_recovery_crash_during_rewind_index_ahead() {
1741        // Simulates crash where index was rewound but glob wasn't
1742        let executor = deterministic::Runner::default();
1743        executor.start(|context| async move {
1744            // Use page size = entry size so each entry is exactly one page.
1745            // This allows truncating by entry count to equal truncating by full pages,
1746            // maintaining page-level integrity.
1747            let cfg = Config {
1748                index_partition: "test-index".into(),
1749                value_partition: "test-values".into(),
1750                index_page_cache: CacheRef::from_pooler(
1751                    &context,
1752                    NZU16!(TestEntry::SIZE as u16),
1753                    NZUsize!(8),
1754                ),
1755                index_write_buffer: NZUsize!(1024),
1756                value_write_buffer: NZUsize!(1024),
1757                compression: None,
1758                codec_config: (),
1759            };
1760
1761            // Create and populate
1762            let mut oversized: Oversized<_, TestEntry, TestValue> =
1763                Oversized::init(context.with_label("first"), cfg.clone())
1764                    .await
1765                    .expect("Failed to init");
1766
1767            let mut locations = Vec::new();
1768            for i in 0..5u8 {
1769                let value: TestValue = [i; 16];
1770                let entry = TestEntry::new(i as u64, 0, 0);
1771                let loc = oversized
1772                    .append(1, entry, &value)
1773                    .await
1774                    .expect("Failed to append");
1775                locations.push(loc);
1776            }
1777            oversized.sync(1).await.expect("Failed to sync");
1778            drop(oversized);
1779
1780            // Simulate crash during rewind: truncate index to 2 entries but leave glob intact
1781            // This simulates: rewind(index) succeeded, crash before rewind(glob)
1782            let (blob, _) = context
1783                .open(&cfg.index_partition, &1u64.to_be_bytes())
1784                .await
1785                .expect("Failed to open blob");
1786            // Physical page size = logical (20) + CRC record (12) = 32 bytes
1787            let physical_page_size = (TestEntry::SIZE + 12) as u64;
1788            blob.resize(2 * physical_page_size)
1789                .await
1790                .expect("Failed to truncate");
1791            blob.sync().await.expect("Failed to sync");
1792            drop(blob);
1793
1794            // Reinitialize - recovery should succeed (glob has orphan data)
1795            let mut oversized: Oversized<_, TestEntry, TestValue> =
1796                Oversized::init(context.with_label("second"), cfg.clone())
1797                    .await
1798                    .expect("Failed to reinit");
1799
1800            // First 2 entries should be valid
1801            for i in 0..2u8 {
1802                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1803                assert_eq!(entry.id, i as u64);
1804            }
1805
1806            // Entries 2-4 should be gone (index was truncated)
1807            assert!(oversized.get(1, 2).await.is_err());
1808
1809            // Should be able to append new entries
1810            let (pos, _, _) = oversized
1811                .append(1, TestEntry::new(100, 0, 0), &[100u8; 16])
1812                .await
1813                .expect("Failed to append");
1814            assert_eq!(pos, 2);
1815
1816            oversized.destroy().await.expect("Failed to destroy");
1817        });
1818    }
1819
1820    #[test_traced]
1821    fn test_recovery_crash_during_rewind_glob_ahead() {
1822        // Simulates crash where glob was rewound but index wasn't
1823        let executor = deterministic::Runner::default();
1824        executor.start(|context| async move {
1825            let cfg = test_cfg(&context);
1826
1827            // Create and populate
1828            let mut oversized: Oversized<_, TestEntry, TestValue> =
1829                Oversized::init(context.with_label("first"), cfg.clone())
1830                    .await
1831                    .expect("Failed to init");
1832
1833            let mut locations = Vec::new();
1834            for i in 0..5u8 {
1835                let value: TestValue = [i; 16];
1836                let entry = TestEntry::new(i as u64, 0, 0);
1837                let loc = oversized
1838                    .append(1, entry, &value)
1839                    .await
1840                    .expect("Failed to append");
1841                locations.push(loc);
1842            }
1843            oversized.sync(1).await.expect("Failed to sync");
1844            drop(oversized);
1845
1846            // Simulate crash during rewind: truncate glob to 2 entries but leave index intact
1847            // This simulates: rewind(glob) succeeded, crash before rewind(index)
1848            let (blob, _) = context
1849                .open(&cfg.value_partition, &1u64.to_be_bytes())
1850                .await
1851                .expect("Failed to open blob");
1852            let keep_size = byte_end(locations[1].1, locations[1].2);
1853            blob.resize(keep_size).await.expect("Failed to truncate");
1854            blob.sync().await.expect("Failed to sync");
1855            drop(blob);
1856
1857            // Reinitialize - recovery should detect index entries pointing beyond glob
1858            let mut oversized: Oversized<_, TestEntry, TestValue> =
1859                Oversized::init(context.with_label("second"), cfg.clone())
1860                    .await
1861                    .expect("Failed to reinit");
1862
1863            // First 2 entries should be valid (index rewound to match glob)
1864            for i in 0..2u8 {
1865                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1866                assert_eq!(entry.id, i as u64);
1867            }
1868
1869            // Entries 2-4 should be gone (index rewound during recovery)
1870            assert!(oversized.get(1, 2).await.is_err());
1871
1872            // Should be able to append after recovery
1873            let value: TestValue = [99; 16];
1874            let entry = TestEntry::new(100, 0, 0);
1875            let (pos, offset, size) = oversized
1876                .append(1, entry, &value)
1877                .await
1878                .expect("Failed to append after recovery");
1879            assert_eq!(pos, 2);
1880
1881            let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1882            assert_eq!(retrieved.id, 100);
1883            let retrieved_value = oversized
1884                .get_value(1, offset, size)
1885                .await
1886                .expect("Failed to get value");
1887            assert_eq!(retrieved_value, value);
1888
1889            oversized.destroy().await.expect("Failed to destroy");
1890        });
1891    }
1892
1893    #[test_traced]
1894    fn test_oversized_get_value_invalid_size() {
1895        let executor = deterministic::Runner::default();
1896        executor.start(|context| async move {
1897            let cfg = test_cfg(&context);
1898            let mut oversized: Oversized<_, TestEntry, TestValue> =
1899                Oversized::init(context, cfg).await.expect("Failed to init");
1900
1901            let value: TestValue = [42; 16];
1902            let entry = TestEntry::new(1, 0, 0);
1903            let (_, offset, _size) = oversized
1904                .append(1, entry, &value)
1905                .await
1906                .expect("Failed to append");
1907            oversized.sync(1).await.expect("Failed to sync");
1908
1909            // Size 0 - should fail
1910            assert!(oversized.get_value(1, offset, 0).await.is_err());
1911
1912            // Size < value size - should fail with codec error, checksum mismatch, or
1913            // insufficient length (if size < 4 bytes for checksum)
1914            for size in 1..4u32 {
1915                let result = oversized.get_value(1, offset, size).await;
1916                assert!(
1917                    matches!(
1918                        result,
1919                        Err(Error::Codec(_))
1920                            | Err(Error::ChecksumMismatch(_, _))
1921                            | Err(Error::Runtime(_))
1922                    ),
1923                    "expected error, got: {:?}",
1924                    result
1925                );
1926            }
1927
1928            oversized.destroy().await.expect("Failed to destroy");
1929        });
1930    }
1931
1932    #[test_traced]
1933    fn test_oversized_get_value_wrong_size() {
1934        let executor = deterministic::Runner::default();
1935        executor.start(|context| async move {
1936            let cfg = test_cfg(&context);
1937            let mut oversized: Oversized<_, TestEntry, TestValue> =
1938                Oversized::init(context, cfg).await.expect("Failed to init");
1939
1940            let value: TestValue = [42; 16];
1941            let entry = TestEntry::new(1, 0, 0);
1942            let (_, offset, correct_size) = oversized
1943                .append(1, entry, &value)
1944                .await
1945                .expect("Failed to append");
1946            oversized.sync(1).await.expect("Failed to sync");
1947
1948            // Size too small - will fail to decode or checksum mismatch
1949            // (checksum mismatch can occur because we read wrong bytes as the checksum)
1950            let result = oversized.get_value(1, offset, correct_size - 1).await;
1951            assert!(
1952                matches!(
1953                    result,
1954                    Err(Error::Codec(_)) | Err(Error::ChecksumMismatch(_, _))
1955                ),
1956                "expected Codec or ChecksumMismatch error, got: {:?}",
1957                result
1958            );
1959
1960            oversized.destroy().await.expect("Failed to destroy");
1961        });
1962    }
1963
1964    #[test_traced]
1965    fn test_recovery_values_has_orphan_section() {
1966        let executor = deterministic::Runner::default();
1967        executor.start(|context| async move {
1968            let cfg = test_cfg(&context);
1969
1970            // Create and populate with sections 1 and 2
1971            let mut oversized: Oversized<_, TestEntry, TestValue> =
1972                Oversized::init(context.with_label("first"), cfg.clone())
1973                    .await
1974                    .expect("Failed to init");
1975
1976            for section in 1u64..=2 {
1977                let value: TestValue = [section as u8; 16];
1978                let entry = TestEntry::new(section, 0, 0);
1979                oversized
1980                    .append(section, entry, &value)
1981                    .await
1982                    .expect("Failed to append");
1983                oversized.sync(section).await.expect("Failed to sync");
1984            }
1985            drop(oversized);
1986
1987            // Manually create an orphan value section (section 3) without corresponding index
1988            let glob_cfg = GlobConfig {
1989                partition: cfg.value_partition.clone(),
1990                compression: cfg.compression,
1991                codec_config: (),
1992                write_buffer: cfg.value_write_buffer,
1993            };
1994            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1995                .await
1996                .expect("Failed to init glob");
1997            let orphan_value: TestValue = [99; 16];
1998            glob.append(3, &orphan_value)
1999                .await
2000                .expect("Failed to append orphan");
2001            glob.sync(3).await.expect("Failed to sync glob");
2002            drop(glob);
2003
2004            // Reinitialize - should detect and remove the orphan section
2005            let oversized: Oversized<_, TestEntry, TestValue> =
2006                Oversized::init(context.with_label("second"), cfg.clone())
2007                    .await
2008                    .expect("Failed to reinit");
2009
2010            // Sections 1 and 2 should still be valid
2011            assert!(oversized.get(1, 0).await.is_ok());
2012            assert!(oversized.get(2, 0).await.is_ok());
2013
2014            // Newest section should be 2 (orphan was removed)
2015            assert_eq!(oversized.newest_section(), Some(2));
2016
2017            oversized.destroy().await.expect("Failed to destroy");
2018        });
2019    }
2020
2021    #[test_traced]
2022    fn test_recovery_values_has_multiple_orphan_sections() {
2023        let executor = deterministic::Runner::default();
2024        executor.start(|context| async move {
2025            let cfg = test_cfg(&context);
2026
2027            // Create and populate with only section 1
2028            let mut oversized: Oversized<_, TestEntry, TestValue> =
2029                Oversized::init(context.with_label("first"), cfg.clone())
2030                    .await
2031                    .expect("Failed to init");
2032
2033            let value: TestValue = [1; 16];
2034            let entry = TestEntry::new(1, 0, 0);
2035            oversized
2036                .append(1, entry, &value)
2037                .await
2038                .expect("Failed to append");
2039            oversized.sync(1).await.expect("Failed to sync");
2040            drop(oversized);
2041
2042            // Manually create multiple orphan value sections (2, 3, 4)
2043            let glob_cfg = GlobConfig {
2044                partition: cfg.value_partition.clone(),
2045                compression: cfg.compression,
2046                codec_config: (),
2047                write_buffer: cfg.value_write_buffer,
2048            };
2049            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2050                .await
2051                .expect("Failed to init glob");
2052
2053            for section in 2u64..=4 {
2054                let orphan_value: TestValue = [section as u8; 16];
2055                glob.append(section, &orphan_value)
2056                    .await
2057                    .expect("Failed to append orphan");
2058                glob.sync(section).await.expect("Failed to sync glob");
2059            }
2060            drop(glob);
2061
2062            // Reinitialize - should detect and remove all orphan sections
2063            let oversized: Oversized<_, TestEntry, TestValue> =
2064                Oversized::init(context.with_label("second"), cfg.clone())
2065                    .await
2066                    .expect("Failed to reinit");
2067
2068            // Section 1 should still be valid
2069            assert!(oversized.get(1, 0).await.is_ok());
2070
2071            // Newest section should be 1 (orphans removed)
2072            assert_eq!(oversized.newest_section(), Some(1));
2073
2074            oversized.destroy().await.expect("Failed to destroy");
2075        });
2076    }
2077
2078    #[test_traced]
2079    fn test_recovery_index_empty_but_values_exist() {
2080        let executor = deterministic::Runner::default();
2081        executor.start(|context| async move {
2082            let cfg = test_cfg(&context);
2083
2084            // Manually create value sections without any index entries
2085            let glob_cfg = GlobConfig {
2086                partition: cfg.value_partition.clone(),
2087                compression: cfg.compression,
2088                codec_config: (),
2089                write_buffer: cfg.value_write_buffer,
2090            };
2091            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2092                .await
2093                .expect("Failed to init glob");
2094
2095            for section in 1u64..=3 {
2096                let orphan_value: TestValue = [section as u8; 16];
2097                glob.append(section, &orphan_value)
2098                    .await
2099                    .expect("Failed to append orphan");
2100                glob.sync(section).await.expect("Failed to sync glob");
2101            }
2102            drop(glob);
2103
2104            // Initialize oversized - should remove all orphan value sections
2105            let oversized: Oversized<_, TestEntry, TestValue> =
2106                Oversized::init(context.with_label("first"), cfg.clone())
2107                    .await
2108                    .expect("Failed to init");
2109
2110            // No sections should exist
2111            assert_eq!(oversized.newest_section(), None);
2112            assert_eq!(oversized.oldest_section(), None);
2113
2114            oversized.destroy().await.expect("Failed to destroy");
2115        });
2116    }
2117
2118    #[test_traced]
2119    fn test_recovery_orphan_section_append_after() {
2120        let executor = deterministic::Runner::default();
2121        executor.start(|context| async move {
2122            let cfg = test_cfg(&context);
2123
2124            // Create and populate with section 1
2125            let mut oversized: Oversized<_, TestEntry, TestValue> =
2126                Oversized::init(context.with_label("first"), cfg.clone())
2127                    .await
2128                    .expect("Failed to init");
2129
2130            let value: TestValue = [1; 16];
2131            let entry = TestEntry::new(1, 0, 0);
2132            let (_, offset1, size1) = oversized
2133                .append(1, entry, &value)
2134                .await
2135                .expect("Failed to append");
2136            oversized.sync(1).await.expect("Failed to sync");
2137            drop(oversized);
2138
2139            // Manually create orphan value sections (2, 3)
2140            let glob_cfg = GlobConfig {
2141                partition: cfg.value_partition.clone(),
2142                compression: cfg.compression,
2143                codec_config: (),
2144                write_buffer: cfg.value_write_buffer,
2145            };
2146            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2147                .await
2148                .expect("Failed to init glob");
2149
2150            for section in 2u64..=3 {
2151                let orphan_value: TestValue = [section as u8; 16];
2152                glob.append(section, &orphan_value)
2153                    .await
2154                    .expect("Failed to append orphan");
2155                glob.sync(section).await.expect("Failed to sync glob");
2156            }
2157            drop(glob);
2158
2159            // Reinitialize - should remove orphan sections
2160            let mut oversized: Oversized<_, TestEntry, TestValue> =
2161                Oversized::init(context.with_label("second"), cfg.clone())
2162                    .await
2163                    .expect("Failed to reinit");
2164
2165            // Section 1 should still be valid
2166            let entry = oversized.get(1, 0).await.expect("Failed to get");
2167            assert_eq!(entry.id, 1);
2168            let value = oversized
2169                .get_value(1, offset1, size1)
2170                .await
2171                .expect("Failed to get value");
2172            assert_eq!(value, [1; 16]);
2173
2174            // Should be able to append to section 2 after recovery
2175            let new_value: TestValue = [42; 16];
2176            let new_entry = TestEntry::new(42, 0, 0);
2177            let (pos, offset, size) = oversized
2178                .append(2, new_entry, &new_value)
2179                .await
2180                .expect("Failed to append after recovery");
2181            assert_eq!(pos, 0);
2182
2183            // Verify the new entry
2184            let retrieved = oversized.get(2, 0).await.expect("Failed to get");
2185            assert_eq!(retrieved.id, 42);
2186            let retrieved_value = oversized
2187                .get_value(2, offset, size)
2188                .await
2189                .expect("Failed to get value");
2190            assert_eq!(retrieved_value, new_value);
2191
2192            // Sync and restart to verify persistence
2193            oversized.sync(2).await.expect("Failed to sync");
2194            drop(oversized);
2195
2196            let oversized: Oversized<_, TestEntry, TestValue> =
2197                Oversized::init(context.with_label("third"), cfg)
2198                    .await
2199                    .expect("Failed to reinit after append");
2200
2201            // Both sections should be valid
2202            assert!(oversized.get(1, 0).await.is_ok());
2203            assert!(oversized.get(2, 0).await.is_ok());
2204            assert_eq!(oversized.newest_section(), Some(2));
2205
2206            oversized.destroy().await.expect("Failed to destroy");
2207        });
2208    }
2209
2210    #[test_traced]
2211    fn test_recovery_no_orphan_sections() {
2212        let executor = deterministic::Runner::default();
2213        executor.start(|context| async move {
2214            let cfg = test_cfg(&context);
2215
2216            // Create and populate with sections 1, 2, 3 (no orphans)
2217            let mut oversized: Oversized<_, TestEntry, TestValue> =
2218                Oversized::init(context.with_label("first"), cfg.clone())
2219                    .await
2220                    .expect("Failed to init");
2221
2222            for section in 1u64..=3 {
2223                let value: TestValue = [section as u8; 16];
2224                let entry = TestEntry::new(section, 0, 0);
2225                oversized
2226                    .append(section, entry, &value)
2227                    .await
2228                    .expect("Failed to append");
2229                oversized.sync(section).await.expect("Failed to sync");
2230            }
2231            drop(oversized);
2232
2233            // Reinitialize - no orphan cleanup needed
2234            let oversized: Oversized<_, TestEntry, TestValue> =
2235                Oversized::init(context.with_label("second"), cfg)
2236                    .await
2237                    .expect("Failed to reinit");
2238
2239            // All sections should be valid
2240            for section in 1u64..=3 {
2241                let entry = oversized.get(section, 0).await.expect("Failed to get");
2242                assert_eq!(entry.id, section);
2243            }
2244            assert_eq!(oversized.newest_section(), Some(3));
2245
2246            oversized.destroy().await.expect("Failed to destroy");
2247        });
2248    }
2249
2250    #[test_traced]
2251    fn test_recovery_orphan_with_empty_index_section() {
2252        let executor = deterministic::Runner::default();
2253        executor.start(|context| async move {
2254            let cfg = test_cfg(&context);
2255
2256            // Create and populate section 1 with entries
2257            let mut oversized: Oversized<_, TestEntry, TestValue> =
2258                Oversized::init(context.with_label("first"), cfg.clone())
2259                    .await
2260                    .expect("Failed to init");
2261
2262            let value: TestValue = [1; 16];
2263            let entry = TestEntry::new(1, 0, 0);
2264            oversized
2265                .append(1, entry, &value)
2266                .await
2267                .expect("Failed to append");
2268            oversized.sync(1).await.expect("Failed to sync");
2269            drop(oversized);
2270
2271            // Manually create orphan value section 2
2272            let glob_cfg = GlobConfig {
2273                partition: cfg.value_partition.clone(),
2274                compression: cfg.compression,
2275                codec_config: (),
2276                write_buffer: cfg.value_write_buffer,
2277            };
2278            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2279                .await
2280                .expect("Failed to init glob");
2281            let orphan_value: TestValue = [2; 16];
2282            glob.append(2, &orphan_value)
2283                .await
2284                .expect("Failed to append orphan");
2285            glob.sync(2).await.expect("Failed to sync glob");
2286            drop(glob);
2287
2288            // Now truncate index section 1 to 0 (making it empty but still tracked)
2289            let (blob, _) = context
2290                .open(&cfg.index_partition, &1u64.to_be_bytes())
2291                .await
2292                .expect("Failed to open blob");
2293            blob.resize(0).await.expect("Failed to truncate");
2294            blob.sync().await.expect("Failed to sync");
2295            drop(blob);
2296
2297            // Reinitialize - should handle empty index section and remove orphan value section
2298            let oversized: Oversized<_, TestEntry, TestValue> =
2299                Oversized::init(context.with_label("second"), cfg)
2300                    .await
2301                    .expect("Failed to reinit");
2302
2303            // Section 1 should exist but have no entries (empty after truncation)
2304            assert!(oversized.get(1, 0).await.is_err());
2305
2306            // Orphan section 2 should be removed
2307            assert_eq!(oversized.newest_section(), Some(1));
2308
2309            oversized.destroy().await.expect("Failed to destroy");
2310        });
2311    }
2312
2313    #[test_traced]
2314    fn test_recovery_orphan_sections_with_gaps() {
2315        // Test non-contiguous sections: index has [1, 3, 5], values has [1, 2, 3, 4, 5, 6]
2316        // Orphan sections 2, 4, 6 should be removed
2317        let executor = deterministic::Runner::default();
2318        executor.start(|context| async move {
2319            let cfg = test_cfg(&context);
2320
2321            // Create index with sections 1, 3, 5 (gaps)
2322            let mut oversized: Oversized<_, TestEntry, TestValue> =
2323                Oversized::init(context.with_label("first"), cfg.clone())
2324                    .await
2325                    .expect("Failed to init");
2326
2327            for section in [1u64, 3, 5] {
2328                let value: TestValue = [section as u8; 16];
2329                let entry = TestEntry::new(section, 0, 0);
2330                oversized
2331                    .append(section, entry, &value)
2332                    .await
2333                    .expect("Failed to append");
2334                oversized.sync(section).await.expect("Failed to sync");
2335            }
2336            drop(oversized);
2337
2338            // Manually create orphan value sections 2, 4, 6 (filling gaps and beyond)
2339            let glob_cfg = GlobConfig {
2340                partition: cfg.value_partition.clone(),
2341                compression: cfg.compression,
2342                codec_config: (),
2343                write_buffer: cfg.value_write_buffer,
2344            };
2345            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2346                .await
2347                .expect("Failed to init glob");
2348
2349            for section in [2u64, 4, 6] {
2350                let orphan_value: TestValue = [section as u8; 16];
2351                glob.append(section, &orphan_value)
2352                    .await
2353                    .expect("Failed to append orphan");
2354                glob.sync(section).await.expect("Failed to sync glob");
2355            }
2356            drop(glob);
2357
2358            // Reinitialize - should remove orphan sections 2, 4, 6
2359            let oversized: Oversized<_, TestEntry, TestValue> =
2360                Oversized::init(context.with_label("second"), cfg)
2361                    .await
2362                    .expect("Failed to reinit");
2363
2364            // Sections 1, 3, 5 should still be valid
2365            for section in [1u64, 3, 5] {
2366                let entry = oversized.get(section, 0).await.expect("Failed to get");
2367                assert_eq!(entry.id, section);
2368            }
2369
2370            // Verify only sections 1, 3, 5 exist (orphans removed)
2371            assert_eq!(oversized.oldest_section(), Some(1));
2372            assert_eq!(oversized.newest_section(), Some(5));
2373
2374            oversized.destroy().await.expect("Failed to destroy");
2375        });
2376    }
2377
2378    #[test_traced]
2379    fn test_recovery_glob_trailing_garbage_truncated() {
2380        // Tests the bug fix: when value is written to glob but index entry isn't
2381        // (crash after value write, before index write), recovery should truncate
2382        // the glob trailing garbage so subsequent appends start at correct offset.
2383        let executor = deterministic::Runner::default();
2384        executor.start(|context| async move {
2385            let cfg = test_cfg(&context);
2386
2387            // Create and populate
2388            let mut oversized: Oversized<_, TestEntry, TestValue> =
2389                Oversized::init(context.with_label("first"), cfg.clone())
2390                    .await
2391                    .expect("Failed to init");
2392
2393            // Append 2 entries
2394            let mut locations = Vec::new();
2395            for i in 0..2u8 {
2396                let value: TestValue = [i; 16];
2397                let entry = TestEntry::new(i as u64, 0, 0);
2398                let loc = oversized
2399                    .append(1, entry, &value)
2400                    .await
2401                    .expect("Failed to append");
2402                locations.push(loc);
2403            }
2404            oversized.sync(1).await.expect("Failed to sync");
2405
2406            // Record where next entry SHOULD start (end of entry 1)
2407            let expected_next_offset = byte_end(locations[1].1, locations[1].2);
2408            drop(oversized);
2409
2410            // Simulate crash: write garbage to glob (simulating partial value write)
2411            let (blob, size) = context
2412                .open(&cfg.value_partition, &1u64.to_be_bytes())
2413                .await
2414                .expect("Failed to open blob");
2415            assert_eq!(size, expected_next_offset);
2416
2417            // Write 100 bytes of garbage (simulating partial/failed value write)
2418            let garbage = vec![0xDE; 100];
2419            blob.write_at(size, garbage)
2420                .await
2421                .expect("Failed to write garbage");
2422            blob.sync().await.expect("Failed to sync");
2423            drop(blob);
2424
2425            // Verify glob now has trailing garbage
2426            let (blob, new_size) = context
2427                .open(&cfg.value_partition, &1u64.to_be_bytes())
2428                .await
2429                .expect("Failed to open blob");
2430            assert_eq!(new_size, expected_next_offset + 100);
2431            drop(blob);
2432
2433            // Reinitialize - should truncate the trailing garbage
2434            let mut oversized: Oversized<_, TestEntry, TestValue> =
2435                Oversized::init(context.with_label("second"), cfg.clone())
2436                    .await
2437                    .expect("Failed to reinit");
2438
2439            // First 2 entries should still be valid
2440            for i in 0..2u8 {
2441                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2442                assert_eq!(entry.id, i as u64);
2443            }
2444
2445            // Append new entry - should start at expected_next_offset, NOT at garbage end
2446            let new_value: TestValue = [99; 16];
2447            let new_entry = TestEntry::new(99, 0, 0);
2448            let (pos, offset, _size) = oversized
2449                .append(1, new_entry, &new_value)
2450                .await
2451                .expect("Failed to append after recovery");
2452
2453            // Verify position is 2 (after the 2 existing entries)
2454            assert_eq!(pos, 2);
2455
2456            // Verify offset is at expected_next_offset (garbage was truncated)
2457            assert_eq!(offset, expected_next_offset);
2458
2459            // Verify we can read the new entry
2460            let retrieved = oversized.get(1, 2).await.expect("Failed to get new entry");
2461            assert_eq!(retrieved.id, 99);
2462
2463            oversized.destroy().await.expect("Failed to destroy");
2464        });
2465    }
2466
2467    #[test_traced]
2468    fn test_recovery_entry_with_overflow_offset() {
2469        // Tests that an entry with offset near u64::MAX that would overflow
2470        // when added to size is detected as invalid during recovery.
2471        let executor = deterministic::Runner::default();
2472        executor.start(|context| async move {
2473            // Use page size = entry size so one entry per page
2474            let cfg = Config {
2475                index_partition: "test-index".into(),
2476                value_partition: "test-values".into(),
2477                index_page_cache: CacheRef::from_pooler(
2478                    &context,
2479                    NZU16!(TestEntry::SIZE as u16),
2480                    NZUsize!(8),
2481                ),
2482                index_write_buffer: NZUsize!(1024),
2483                value_write_buffer: NZUsize!(1024),
2484                compression: None,
2485                codec_config: (),
2486            };
2487
2488            // Create and populate with valid entry
2489            let mut oversized: Oversized<_, TestEntry, TestValue> =
2490                Oversized::init(context.with_label("first"), cfg.clone())
2491                    .await
2492                    .expect("Failed to init");
2493
2494            let value: TestValue = [1; 16];
2495            let entry = TestEntry::new(1, 0, 0);
2496            oversized
2497                .append(1, entry, &value)
2498                .await
2499                .expect("Failed to append");
2500            oversized.sync(1).await.expect("Failed to sync");
2501            drop(oversized);
2502
2503            // Build a corrupted entry with offset near u64::MAX that would overflow.
2504            // We need to write a valid page (with correct page-level CRC) containing
2505            // the semantically-invalid entry data.
2506            let (blob, _) = context
2507                .open(&cfg.index_partition, &1u64.to_be_bytes())
2508                .await
2509                .expect("Failed to open blob");
2510
2511            // Build entry data: id (8) + value_offset (8) + value_size (4) = 20 bytes
2512            let mut entry_data = Vec::new();
2513            1u64.write(&mut entry_data); // id
2514            (u64::MAX - 10).write(&mut entry_data); // value_offset (near max)
2515            100u32.write(&mut entry_data); // value_size (offset + size overflows)
2516            assert_eq!(entry_data.len(), TestEntry::SIZE);
2517
2518            // Build page-level CRC record (12 bytes):
2519            // len1 (2) + crc1 (4) + len2 (2) + crc2 (4)
2520            let crc = Crc32::checksum(&entry_data);
2521            let len1 = TestEntry::SIZE as u16;
2522            let mut crc_record = Vec::new();
2523            crc_record.extend_from_slice(&len1.to_be_bytes()); // len1
2524            crc_record.extend_from_slice(&crc.to_be_bytes()); // crc1
2525            crc_record.extend_from_slice(&0u16.to_be_bytes()); // len2 (unused)
2526            crc_record.extend_from_slice(&0u32.to_be_bytes()); // crc2 (unused)
2527            assert_eq!(crc_record.len(), 12);
2528
2529            // Write the complete physical page: entry_data + crc_record
2530            let mut page = entry_data;
2531            page.extend_from_slice(&crc_record);
2532            blob.write_at(0, page)
2533                .await
2534                .expect("Failed to write corrupted page");
2535            blob.sync().await.expect("Failed to sync");
2536            drop(blob);
2537
2538            // Reinitialize - recovery should detect the invalid entry
2539            // (offset + size would overflow, and even with saturating_add it exceeds glob_size)
2540            let mut oversized: Oversized<_, TestEntry, TestValue> =
2541                Oversized::init(context.with_label("second"), cfg.clone())
2542                    .await
2543                    .expect("Failed to reinit");
2544
2545            // The corrupted entry should have been rewound (invalid)
2546            assert!(oversized.get(1, 0).await.is_err());
2547
2548            // Should be able to append after recovery
2549            let new_value: TestValue = [99; 16];
2550            let new_entry = TestEntry::new(99, 0, 0);
2551            let (pos, new_offset, _) = oversized
2552                .append(1, new_entry, &new_value)
2553                .await
2554                .expect("Failed to append after recovery");
2555
2556            // Position should be 0 (corrupted entry was removed)
2557            assert_eq!(pos, 0);
2558            // Offset should be 0 (glob was truncated to 0)
2559            assert_eq!(new_offset, 0);
2560
2561            oversized.destroy().await.expect("Failed to destroy");
2562        });
2563    }
2564
2565    #[test_traced]
2566    fn test_empty_section_persistence() {
2567        // Tests that sections that become empty (all entries removed/rewound)
2568        // are handled correctly across restart cycles.
2569        let executor = deterministic::Runner::default();
2570        executor.start(|context| async move {
2571            let cfg = test_cfg(&context);
2572
2573            // Create and populate section 1 with entries
2574            let mut oversized: Oversized<_, TestEntry, TestValue> =
2575                Oversized::init(context.with_label("first"), cfg.clone())
2576                    .await
2577                    .expect("Failed to init");
2578
2579            for i in 0..3u8 {
2580                let value: TestValue = [i; 16];
2581                let entry = TestEntry::new(i as u64, 0, 0);
2582                oversized
2583                    .append(1, entry, &value)
2584                    .await
2585                    .expect("Failed to append");
2586            }
2587            oversized.sync(1).await.expect("Failed to sync");
2588
2589            // Also create section 2 to ensure it survives
2590            let value2: TestValue = [10; 16];
2591            let entry2 = TestEntry::new(10, 0, 0);
2592            oversized
2593                .append(2, entry2, &value2)
2594                .await
2595                .expect("Failed to append to section 2");
2596            oversized.sync(2).await.expect("Failed to sync section 2");
2597            drop(oversized);
2598
2599            // Truncate section 1's index to 0 (making it empty)
2600            let (blob, _) = context
2601                .open(&cfg.index_partition, &1u64.to_be_bytes())
2602                .await
2603                .expect("Failed to open blob");
2604            blob.resize(0).await.expect("Failed to truncate");
2605            blob.sync().await.expect("Failed to sync");
2606            drop(blob);
2607
2608            // First restart - recovery should handle empty section 1
2609            let mut oversized: Oversized<_, TestEntry, TestValue> =
2610                Oversized::init(context.with_label("second"), cfg.clone())
2611                    .await
2612                    .expect("Failed to reinit");
2613
2614            // Section 1 should exist but have no entries
2615            assert!(oversized.get(1, 0).await.is_err());
2616
2617            // Section 2 should still be valid
2618            let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2619            assert_eq!(entry.id, 10);
2620
2621            // Section 1 should still be tracked (blob exists but is empty)
2622            assert_eq!(oversized.oldest_section(), Some(1));
2623
2624            // Append to empty section 1
2625            // Note: When index is truncated to 0 but the index blob still exists,
2626            // the glob is NOT truncated (the section isn't considered an orphan).
2627            // The glob still has orphan DATA from the old entries, but this doesn't
2628            // affect correctness - new entries simply append after the orphan data.
2629            let new_value: TestValue = [99; 16];
2630            let new_entry = TestEntry::new(99, 0, 0);
2631            let (pos, offset, size) = oversized
2632                .append(1, new_entry, &new_value)
2633                .await
2634                .expect("Failed to append to empty section");
2635            assert_eq!(pos, 0);
2636            // Glob offset is non-zero because orphan data wasn't truncated
2637            assert!(offset > 0);
2638            oversized.sync(1).await.expect("Failed to sync");
2639
2640            // Verify the new entry is readable despite orphan data before it
2641            let entry = oversized.get(1, 0).await.expect("Failed to get");
2642            assert_eq!(entry.id, 99);
2643            let value = oversized
2644                .get_value(1, offset, size)
2645                .await
2646                .expect("Failed to get value");
2647            assert_eq!(value, new_value);
2648
2649            drop(oversized);
2650
2651            // Second restart - verify persistence
2652            let oversized: Oversized<_, TestEntry, TestValue> =
2653                Oversized::init(context.with_label("third"), cfg.clone())
2654                    .await
2655                    .expect("Failed to reinit again");
2656
2657            // Section 1's new entry should be valid
2658            let entry = oversized.get(1, 0).await.expect("Failed to get");
2659            assert_eq!(entry.id, 99);
2660
2661            // Section 2 should still be valid
2662            let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2663            assert_eq!(entry.id, 10);
2664
2665            oversized.destroy().await.expect("Failed to destroy");
2666        });
2667    }
2668
2669    #[test_traced]
2670    fn test_get_value_size_equals_crc_size() {
2671        // Tests the boundary condition where size = 4 (just CRC, no data).
2672        // This should fail because there's no actual data to decode.
2673        let executor = deterministic::Runner::default();
2674        executor.start(|context| async move {
2675            let cfg = test_cfg(&context);
2676            let mut oversized: Oversized<_, TestEntry, TestValue> =
2677                Oversized::init(context, cfg).await.expect("Failed to init");
2678
2679            let value: TestValue = [42; 16];
2680            let entry = TestEntry::new(1, 0, 0);
2681            let (_, offset, _) = oversized
2682                .append(1, entry, &value)
2683                .await
2684                .expect("Failed to append");
2685            oversized.sync(1).await.expect("Failed to sync");
2686
2687            // Size = 4 (exactly CRC_SIZE) means 0 bytes of actual data
2688            // This should fail with ChecksumMismatch or decode error
2689            let result = oversized.get_value(1, offset, 4).await;
2690            assert!(result.is_err());
2691
2692            oversized.destroy().await.expect("Failed to destroy");
2693        });
2694    }
2695
2696    #[test_traced]
2697    fn test_get_value_size_just_over_crc() {
2698        // Tests size = 5 (CRC + 1 byte of data).
2699        // This should fail because the data is too short to decode.
2700        let executor = deterministic::Runner::default();
2701        executor.start(|context| async move {
2702            let cfg = test_cfg(&context);
2703            let mut oversized: Oversized<_, TestEntry, TestValue> =
2704                Oversized::init(context, cfg).await.expect("Failed to init");
2705
2706            let value: TestValue = [42; 16];
2707            let entry = TestEntry::new(1, 0, 0);
2708            let (_, offset, _) = oversized
2709                .append(1, entry, &value)
2710                .await
2711                .expect("Failed to append");
2712            oversized.sync(1).await.expect("Failed to sync");
2713
2714            // Size = 5 means 1 byte of actual data (after stripping CRC)
2715            // This should fail with checksum mismatch since we're reading wrong bytes
2716            let result = oversized.get_value(1, offset, 5).await;
2717            assert!(result.is_err());
2718
2719            oversized.destroy().await.expect("Failed to destroy");
2720        });
2721    }
2722
2723    #[test_traced]
2724    fn test_recovery_maximum_section_numbers() {
2725        // Test recovery with very large section numbers near u64::MAX to check
2726        // for overflow edge cases in section arithmetic.
2727        let executor = deterministic::Runner::default();
2728        executor.start(|context| async move {
2729            let cfg = test_cfg(&context);
2730
2731            // Use section numbers near u64::MAX
2732            let large_sections = [u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
2733
2734            // Create and populate with large section numbers
2735            let mut oversized: Oversized<_, TestEntry, TestValue> =
2736                Oversized::init(context.with_label("first"), cfg.clone())
2737                    .await
2738                    .expect("Failed to init");
2739
2740            let mut locations = Vec::new();
2741            for &section in &large_sections {
2742                let value: TestValue = [(section & 0xFF) as u8; 16];
2743                let entry = TestEntry::new(section, 0, 0);
2744                let loc = oversized
2745                    .append(section, entry, &value)
2746                    .await
2747                    .expect("Failed to append");
2748                locations.push((section, loc));
2749                oversized.sync(section).await.expect("Failed to sync");
2750            }
2751            drop(oversized);
2752
2753            // Simulate crash: truncate glob for middle section
2754            let middle_section = large_sections[1];
2755            let (blob, size) = context
2756                .open(&cfg.value_partition, &middle_section.to_be_bytes())
2757                .await
2758                .expect("Failed to open blob");
2759            blob.resize(size / 2).await.expect("Failed to truncate");
2760            blob.sync().await.expect("Failed to sync");
2761            drop(blob);
2762
2763            // Reinitialize - should recover without overflow panics
2764            let mut oversized: Oversized<_, TestEntry, TestValue> =
2765                Oversized::init(context.with_label("second"), cfg.clone())
2766                    .await
2767                    .expect("Failed to reinit");
2768
2769            // First and last sections should still be valid
2770            let entry = oversized
2771                .get(large_sections[0], 0)
2772                .await
2773                .expect("Failed to get first section");
2774            assert_eq!(entry.id, large_sections[0]);
2775
2776            let entry = oversized
2777                .get(large_sections[2], 0)
2778                .await
2779                .expect("Failed to get last section");
2780            assert_eq!(entry.id, large_sections[2]);
2781
2782            // Middle section should have been rewound (no entries)
2783            assert!(oversized.get(middle_section, 0).await.is_err());
2784
2785            // Verify we can still append to these large sections
2786            let new_value: TestValue = [0xAB; 16];
2787            let new_entry = TestEntry::new(999, 0, 0);
2788            oversized
2789                .append(middle_section, new_entry, &new_value)
2790                .await
2791                .expect("Failed to append after recovery");
2792
2793            oversized.destroy().await.expect("Failed to destroy");
2794        });
2795    }
2796
2797    #[test_traced]
2798    fn test_recovery_crash_during_recovery_rewind() {
2799        // Tests a nested crash scenario: initial crash leaves inconsistent state,
2800        // then a second crash occurs during recovery's rewind operation.
2801        // This simulates the worst-case where recovery itself is interrupted.
2802        let executor = deterministic::Runner::default();
2803        executor.start(|context| async move {
2804            let cfg = test_cfg(&context);
2805
2806            // Phase 1: Create valid data with 5 entries
2807            let mut oversized: Oversized<_, TestEntry, TestValue> =
2808                Oversized::init(context.with_label("first"), cfg.clone())
2809                    .await
2810                    .expect("Failed to init");
2811
2812            let mut locations = Vec::new();
2813            for i in 0..5u8 {
2814                let value: TestValue = [i; 16];
2815                let entry = TestEntry::new(i as u64, 0, 0);
2816                let loc = oversized
2817                    .append(1, entry, &value)
2818                    .await
2819                    .expect("Failed to append");
2820                locations.push(loc);
2821            }
2822            oversized.sync(1).await.expect("Failed to sync");
2823            drop(oversized);
2824
2825            // Phase 2: Simulate first crash - truncate glob to lose last 2 entries
2826            let (blob, _) = context
2827                .open(&cfg.value_partition, &1u64.to_be_bytes())
2828                .await
2829                .expect("Failed to open blob");
2830            let keep_size = byte_end(locations[2].1, locations[2].2);
2831            blob.resize(keep_size).await.expect("Failed to truncate");
2832            blob.sync().await.expect("Failed to sync");
2833            drop(blob);
2834
2835            // Phase 3: Simulate crash during recovery's rewind
2836            // Recovery would try to rewind index from 5 entries to 3 entries.
2837            // Simulate partial rewind by manually truncating index to 4 entries
2838            // (as if crash occurred mid-rewind).
2839            let chunk_size = FixedJournal::<deterministic::Context, TestEntry>::CHUNK_SIZE as u64;
2840            let (index_blob, _) = context
2841                .open(&cfg.index_partition, &1u64.to_be_bytes())
2842                .await
2843                .expect("Failed to open index blob");
2844            let partial_rewind_size = 4 * chunk_size; // 4 entries instead of 3
2845            index_blob
2846                .resize(partial_rewind_size)
2847                .await
2848                .expect("Failed to resize");
2849            index_blob.sync().await.expect("Failed to sync");
2850            drop(index_blob);
2851
2852            // Phase 4: Second recovery attempt should handle the inconsistent state
2853            // Index has 4 entries, but glob only supports 3.
2854            let mut oversized: Oversized<_, TestEntry, TestValue> =
2855                Oversized::init(context.with_label("second"), cfg.clone())
2856                    .await
2857                    .expect("Failed to reinit after nested crash");
2858
2859            // Only first 3 entries should be valid (recovery should rewind again)
2860            for i in 0..3u8 {
2861                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2862                assert_eq!(entry.id, i as u64);
2863
2864                let (_, offset, size) = locations[i as usize];
2865                let value = oversized
2866                    .get_value(1, offset, size)
2867                    .await
2868                    .expect("Failed to get value");
2869                assert_eq!(value, [i; 16]);
2870            }
2871
2872            // Entry 3 should not exist (index was rewound to match glob)
2873            assert!(oversized.get(1, 3).await.is_err());
2874
2875            // Verify append works after nested crash recovery
2876            let new_value: TestValue = [0xFF; 16];
2877            let new_entry = TestEntry::new(100, 0, 0);
2878            let (pos, offset, _size) = oversized
2879                .append(1, new_entry, &new_value)
2880                .await
2881                .expect("Failed to append");
2882            assert_eq!(pos, 3); // Should be position 3 (after the 3 valid entries)
2883
2884            // Verify the offset starts where entry 2 ended (no gaps)
2885            assert_eq!(offset, byte_end(locations[2].1, locations[2].2));
2886
2887            oversized.destroy().await.expect("Failed to destroy");
2888        });
2889    }
2890
2891    #[test_traced]
2892    fn test_recovery_crash_during_orphan_cleanup() {
2893        // Tests crash during orphan section cleanup: recovery starts removing
2894        // orphan value sections, but crashes mid-cleanup.
2895        let executor = deterministic::Runner::default();
2896        executor.start(|context| async move {
2897            let cfg = test_cfg(&context);
2898
2899            // Phase 1: Create valid data in section 1
2900            let mut oversized: Oversized<_, TestEntry, TestValue> =
2901                Oversized::init(context.with_label("first"), cfg.clone())
2902                    .await
2903                    .expect("Failed to init");
2904
2905            let value: TestValue = [1; 16];
2906            let entry = TestEntry::new(1, 0, 0);
2907            let (_, offset1, size1) = oversized
2908                .append(1, entry, &value)
2909                .await
2910                .expect("Failed to append");
2911            oversized.sync(1).await.expect("Failed to sync");
2912            drop(oversized);
2913
2914            // Phase 2: Create orphan value sections 2, 3, 4 (no index entries)
2915            let glob_cfg = GlobConfig {
2916                partition: cfg.value_partition.clone(),
2917                compression: cfg.compression,
2918                codec_config: (),
2919                write_buffer: cfg.value_write_buffer,
2920            };
2921            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2922                .await
2923                .expect("Failed to init glob");
2924
2925            for section in 2u64..=4 {
2926                let orphan_value: TestValue = [section as u8; 16];
2927                glob.append(section, &orphan_value)
2928                    .await
2929                    .expect("Failed to append orphan");
2930                glob.sync(section).await.expect("Failed to sync glob");
2931            }
2932            drop(glob);
2933
2934            // Phase 3: Simulate partial orphan cleanup (section 2 removed, 3 and 4 remain)
2935            // This simulates a crash during cleanup_orphan_value_sections()
2936            context
2937                .remove(&cfg.value_partition, Some(&2u64.to_be_bytes()))
2938                .await
2939                .expect("Failed to remove section 2");
2940
2941            // Phase 4: Recovery should complete the cleanup
2942            let mut oversized: Oversized<_, TestEntry, TestValue> =
2943                Oversized::init(context.with_label("second"), cfg.clone())
2944                    .await
2945                    .expect("Failed to reinit");
2946
2947            // Section 1 should still be valid
2948            let entry = oversized.get(1, 0).await.expect("Failed to get");
2949            assert_eq!(entry.id, 1);
2950            let value = oversized
2951                .get_value(1, offset1, size1)
2952                .await
2953                .expect("Failed to get value");
2954            assert_eq!(value, [1; 16]);
2955
2956            // No orphan sections should remain
2957            assert_eq!(oversized.oldest_section(), Some(1));
2958            assert_eq!(oversized.newest_section(), Some(1));
2959
2960            // Should be able to append to section 2 (now clean)
2961            let new_value: TestValue = [42; 16];
2962            let new_entry = TestEntry::new(42, 0, 0);
2963            let (pos, _, _) = oversized
2964                .append(2, new_entry, &new_value)
2965                .await
2966                .expect("Failed to append to section 2");
2967            assert_eq!(pos, 0); // First entry in new section
2968
2969            oversized.destroy().await.expect("Failed to destroy");
2970        });
2971    }
2972
2973    #[test_traced]
2974    fn test_rewind_to_zero_index_size() {
2975        let executor = deterministic::Runner::default();
2976        executor.start(|context| async move {
2977            let cfg = test_cfg(&context);
2978            let mut oversized: Oversized<_, TestEntry, TestValue> =
2979                Oversized::init(context, cfg).await.expect("Failed to init");
2980
2981            let value: TestValue = [1; 16];
2982            let entry = TestEntry::new(1, 0, 0);
2983            oversized
2984                .append(0, entry, &value)
2985                .await
2986                .expect("Failed to append");
2987            oversized.sync(0).await.expect("Failed to sync");
2988
2989            oversized
2990                .rewind(0, 0)
2991                .await
2992                .expect("rewind to zero index_size must not fail");
2993
2994            assert_eq!(oversized.last(0).await.unwrap(), None);
2995            assert_eq!(oversized.size(0).await.unwrap(), 0);
2996            assert_eq!(oversized.value_size(0).await.unwrap(), 0);
2997
2998            oversized.destroy().await.expect("Failed to destroy");
2999        });
3000    }
3001
3002    #[test_traced]
3003    fn test_rewind_to_zero_on_missing_section() {
3004        let executor = deterministic::Runner::default();
3005        executor.start(|context| async move {
3006            let cfg = test_cfg(&context);
3007            let mut oversized: Oversized<_, TestEntry, TestValue> =
3008                Oversized::init(context, cfg).await.expect("Failed to init");
3009
3010            oversized
3011                .rewind(0, 0)
3012                .await
3013                .expect("rewind on missing section must not fail");
3014
3015            assert!(matches!(
3016                oversized.last(0).await,
3017                Err(Error::SectionOutOfRange(0))
3018            ));
3019            assert_eq!(oversized.value_size(0).await.unwrap(), 0);
3020
3021            oversized.destroy().await.expect("Failed to destroy");
3022        });
3023    }
3024
3025    #[test_traced]
3026    fn test_rewind_nonzero_on_missing_section_errors() {
3027        let executor = deterministic::Runner::default();
3028        executor.start(|context| async move {
3029            let cfg = test_cfg(&context);
3030            let mut oversized: Oversized<_, TestEntry, TestValue> =
3031                Oversized::init(context, cfg).await.expect("Failed to init");
3032
3033            let result = oversized.rewind(0, 1).await;
3034            assert!(
3035                matches!(result, Err(Error::SectionOutOfRange(0))),
3036                "nonzero index_size on missing section must fail, got: {result:?}"
3037            );
3038
3039            oversized.destroy().await.expect("Failed to destroy");
3040        });
3041    }
3042
3043    #[test_traced]
3044    fn test_rewind_section_nonzero_on_missing_section_errors() {
3045        let executor = deterministic::Runner::default();
3046        executor.start(|context| async move {
3047            let cfg = test_cfg(&context);
3048            let mut oversized: Oversized<_, TestEntry, TestValue> =
3049                Oversized::init(context, cfg).await.expect("Failed to init");
3050
3051            let result = oversized.rewind_section(0, 1).await;
3052            assert!(
3053                matches!(result, Err(Error::SectionOutOfRange(0))),
3054                "nonzero index_size on missing section must fail, got: {result:?}"
3055            );
3056
3057            oversized.destroy().await.expect("Failed to destroy");
3058        });
3059    }
3060
3061    #[test_traced]
3062    fn test_last_pruned_section_returns_error() {
3063        let executor = deterministic::Runner::default();
3064        executor.start(|context| async move {
3065            let cfg = test_cfg(&context);
3066            let mut oversized: Oversized<_, TestEntry, TestValue> =
3067                Oversized::init(context, cfg).await.expect("Failed to init");
3068
3069            let value: TestValue = [1; 16];
3070            oversized
3071                .append(0, TestEntry::new(1, 0, 0), &value)
3072                .await
3073                .expect("Failed to append");
3074            oversized
3075                .append(1, TestEntry::new(2, 0, 0), &value)
3076                .await
3077                .expect("Failed to append");
3078            oversized.sync_all().await.expect("Failed to sync");
3079
3080            oversized.prune(1).await.expect("Failed to prune");
3081
3082            assert!(matches!(
3083                oversized.last(0).await,
3084                Err(Error::AlreadyPrunedToSection(1))
3085            ));
3086            assert!(oversized.last(1).await.unwrap().is_some());
3087
3088            oversized.destroy().await.expect("Failed to destroy");
3089        });
3090    }
3091}