Skip to main content

commonware_storage/journal/segmented/
oversized.rs

1//! Segmented journal for oversized values.
2//!
3//! This module combines [super::fixed::Journal] with [super::glob::Glob] to handle
4//! entries that reference variable-length "oversized" values. It provides coordinated
5//! operations and built-in crash recovery.
6//!
7//! # Architecture
8//!
9//! ```text
10//! +-------------------+     +-------------------+
11//! | Fixed Journal     |     | Glob (Values)     |
12//! | (Index Entries)   |     |                   |
13//! +-------------------+     +-------------------+
14//! | entry_0           | --> | value_0           |
15//! | entry_1           | --> | value_1           |
16//! | ...               |     | ...               |
17//! +-------------------+     +-------------------+
18//! ```
19//!
20//! Each index entry contains `(value_offset, value_size)` pointing to its value in glob.
21//!
22//! # Crash Recovery
23//!
24//! On unclean shutdown, the index journal and glob may have different lengths:
25//! - Index entry pointing to non-existent glob data (dangerous)
26//! - Glob value without index entry (orphan - acceptable but cleaned up)
27//! - Glob sections without corresponding index sections (orphan sections - removed)
28//!
29//! During initialization, crash recovery is performed:
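30//! 1. Trailing index entries of each section are validated against the glob (`value_offset + value_size <= glob_size`), scanning backwards from the last entry
31//! 2. Entries after the last valid one are dropped by rewinding the index journal, and glob bytes past that entry's value are truncated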
32//! 3. Orphan value sections (sections in glob but not in index) are removed
33//!
34//! This allows async writes (glob first, then index) while ensuring consistency
35//! after recovery.
36//!
37//! _Recovery only validates that index entries point to valid byte ranges
38//! within the glob. It does **not** verify value checksums during recovery (this would
39//! require reading all values). Value checksums are verified lazily when values are
40//! read via `get_value()`. If the underlying storage is corrupted, `get_value()` will
41//! return a checksum error even though the index entry exists._
42
43use super::{
44    fixed::{Config as FixedConfig, Journal as FixedJournal},
45    glob::{Config as GlobConfig, Glob},
46};
47use crate::journal::Error;
48use commonware_codec::{Codec, CodecFixed, CodecShared};
49use commonware_runtime::{Metrics, Storage};
50use futures::{future::try_join, stream::Stream};
51use std::{collections::HashSet, num::NonZeroUsize};
52use tracing::{debug, warn};
53
54/// Trait for index entries that reference oversized values in glob storage.
55///
56/// Implementations must provide access to the value location for crash recovery validation,
57/// and a way to set the location when appending.
58pub trait Record: CodecFixed<Cfg = ()> + Clone {
59    /// Returns `(value_offset, value_size)` for crash recovery validation.
60    fn value_location(&self) -> (u64, u32);
61
62    /// Returns a new entry with the value location set.
63    ///
64    /// Called during `append` after the value is written to glob storage.
65    fn with_location(self, offset: u64, size: u32) -> Self;
66}
67
68/// Configuration for oversized journal.
69#[derive(Clone)]
70pub struct Config<C> {
71    /// Partition for the fixed index journal.
72    pub index_partition: String,
73
74    /// Partition for the glob value storage.
75    pub value_partition: String,
76
77    /// Page cache for index journal caching.
78    pub index_page_cache: commonware_runtime::buffer::paged::CacheRef,
79
80    /// Write buffer size for the index journal.
81    pub index_write_buffer: NonZeroUsize,
82
83    /// Write buffer size for the value journal.
84    pub value_write_buffer: NonZeroUsize,
85
86    /// Optional compression level for values (using zstd).
87    pub compression: Option<u8>,
88
89    /// Codec configuration for values.
90    pub codec_config: C,
91}
92
93/// Segmented journal for entries with oversized values.
94///
95/// Combines a fixed-size index journal with glob storage for variable-length values.
96/// Provides coordinated operations and crash recovery.
97pub struct Oversized<E: Storage + Metrics, I: Record, V: Codec> {
98    index: FixedJournal<E, I>,
99    values: Glob<E, V>,
100}
101
102impl<E: Storage + Metrics, I: Record + Send + Sync, V: CodecShared> Oversized<E, I, V> {
103    /// Initialize with crash recovery validation.
104    ///
105    /// Validates each index entry's glob reference during replay. Invalid entries
106    /// (pointing beyond glob size) are skipped, and the index journal is rewound
107    /// to exclude trailing invalid entries.
108    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109        // Initialize both journals
110        let index_cfg = FixedConfig {
111            partition: cfg.index_partition,
112            page_cache: cfg.index_page_cache,
113            write_buffer: cfg.index_write_buffer,
114        };
115        let index = FixedJournal::init(context.with_label("index"), index_cfg).await?;
116
117        let value_cfg = GlobConfig {
118            partition: cfg.value_partition,
119            compression: cfg.compression,
120            codec_config: cfg.codec_config,
121            write_buffer: cfg.value_write_buffer,
122        };
123        let values = Glob::init(context.with_label("values"), value_cfg).await?;
124
125        let mut oversized = Self { index, values };
126
127        // Perform crash recovery validation
128        oversized.recover().await?;
129
130        Ok(oversized)
131    }
132
133    /// Perform crash recovery by validating index entries against glob sizes.
134    ///
135    /// Only checks the last entry in each section. Since entries are appended sequentially
136    /// and value offsets are monotonically increasing within a section, if the last entry
137    /// is valid then all earlier entries must be valid too.
138    async fn recover(&mut self) -> Result<(), Error> {
139        let chunk_size = FixedJournal::<E, I>::CHUNK_SIZE as u64;
140        let sections: Vec<u64> = self.index.sections().collect();
141
142        for section in sections {
143            let index_size = self.index.size(section).await?;
144            if index_size == 0 {
145                continue;
146            }
147
148            let glob_size = match self.values.size(section).await {
149                Ok(size) => size,
150                Err(Error::AlreadyPrunedToSection(oldest)) => {
151                    // This shouldn't happen in normal operation: prune() prunes the index
152                    // first, then the glob. A crash between these would leave the glob
153                    // NOT pruned (opposite of this case). We handle this defensively in
154                    // case of external manipulation or future changes.
155                    warn!(
156                        section,
157                        oldest, "index has section that glob already pruned"
158                    );
159                    0
160                }
161                Err(e) => return Err(e),
162            };
163
164            // Truncate any trailing partial entry
165            let entry_count = index_size / chunk_size;
166            let aligned_size = entry_count * chunk_size;
167            if aligned_size < index_size {
168                warn!(
169                    section,
170                    index_size, aligned_size, "trailing bytes detected: truncating"
171                );
172                self.index.rewind_section(section, aligned_size).await?;
173            }
174
175            // If there is nothing, we can exit early and rewind values to 0
176            if entry_count == 0 {
177                warn!(
178                    section,
179                    index_size, "trailing bytes detected: truncating to 0"
180                );
181                self.values.rewind_section(section, 0).await?;
182                continue;
183            }
184
185            // Find last valid entry and target glob size
186            let (valid_count, glob_target) = self
187                .find_last_valid_entry(section, entry_count, glob_size)
188                .await;
189
190            // Rewind index if any entries are invalid
191            if valid_count < entry_count {
192                let valid_size = valid_count * chunk_size;
193                debug!(section, entry_count, valid_count, "rewinding index");
194                self.index.rewind_section(section, valid_size).await?;
195            }
196
197            // Truncate glob trailing garbage (can occur when value was written but
198            // index entry wasn't, or when index was truncated but glob wasn't)
199            if glob_size > glob_target {
200                debug!(
201                    section,
202                    glob_size, glob_target, "truncating glob trailing garbage"
203                );
204                self.values.rewind_section(section, glob_target).await?;
205            }
206        }
207
208        // Clean up orphan value sections that don't exist in index
209        self.cleanup_orphan_value_sections().await?;
210
211        Ok(())
212    }
213
214    /// Remove any value sections that don't have corresponding index sections.
215    ///
216    /// This can happen if a crash occurs after writing to values but before
217    /// writing to index for a new section. Since sections don't have to be
218    /// contiguous, we compare the actual sets of sections rather than just
219    /// comparing the newest section numbers.
220    async fn cleanup_orphan_value_sections(&mut self) -> Result<(), Error> {
221        // Collect index sections into a set for O(1) lookup
222        let index_sections: HashSet<u64> = self.index.sections().collect();
223
224        // Find value sections that don't exist in index
225        let orphan_sections: Vec<u64> = self
226            .values
227            .sections()
228            .filter(|s| !index_sections.contains(s))
229            .collect();
230
231        // Remove each orphan section
232        for section in orphan_sections {
233            warn!(section, "removing orphan value section");
234            self.values.remove_section(section).await?;
235        }
236
237        Ok(())
238    }
239
240    /// Find the number of valid entries and the corresponding glob target size.
241    ///
242    /// Scans backwards from the last entry until a valid one is found.
243    /// Returns `(valid_count, glob_target)` where `glob_target` is the end offset
244    /// of the last valid entry's value.
245    async fn find_last_valid_entry(
246        &self,
247        section: u64,
248        entry_count: u64,
249        glob_size: u64,
250    ) -> (u64, u64) {
251        for pos in (0..entry_count).rev() {
252            match self.index.get(section, pos).await {
253                Ok(entry) => {
254                    let (offset, size) = entry.value_location();
255                    let entry_end = offset.saturating_add(u64::from(size));
256                    if entry_end <= glob_size {
257                        return (pos + 1, entry_end);
258                    }
259                    if pos == entry_count - 1 {
260                        warn!(
261                            section,
262                            pos, glob_size, entry_end, "invalid entry: glob truncated"
263                        );
264                    }
265                }
266                Err(_) => {
267                    if pos == entry_count - 1 {
268                        warn!(section, pos, "corrupted last entry, scanning backwards");
269                    }
270                }
271            }
272        }
273        (0, 0)
274    }
275
276    /// Append entry + value.
277    ///
278    /// Writes value to glob first, then writes index entry with the value location.
279    ///
280    /// Returns `(position, offset, size)` where:
281    /// - `position`: Position in the index journal
282    /// - `offset`: Byte offset in glob
283    /// - `size`: Size of value in glob (including checksum)
284    pub async fn append(
285        &mut self,
286        section: u64,
287        entry: I,
288        value: &V,
289    ) -> Result<(u64, u64, u32), Error> {
290        // Write value first (glob). This will typically write to an in-memory
291        // buffer and return quickly (only blocks when the buffer is full).
292        let (offset, size) = self.values.append(section, value).await?;
293
294        // Update entry with actual location and write to index
295        let entry_with_location = entry.with_location(offset, size);
296        let position = self.index.append(section, entry_with_location).await?;
297
298        Ok((position, offset, size))
299    }
300
301    /// Get entry at position (index entry only, not value).
302    pub async fn get(&self, section: u64, position: u64) -> Result<I, Error> {
303        self.index.get(section, position).await
304    }
305
306    /// Get the last entry for a section, if any.
307    pub async fn last(&self, section: u64) -> Result<Option<I>, Error> {
308        self.index.last(section).await
309    }
310
311    /// Get value using offset/size from entry.
312    ///
313    /// The offset should be the byte offset from `append()` or from the entry's `value_location()`.
314    pub async fn get_value(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
315        self.values.get(section, offset, size).await
316    }
317
318    /// Replay index entries starting from given section.
319    ///
320    /// Returns a stream of `(section, position, entry)` tuples.
321    pub async fn replay(
322        &self,
323        start_section: u64,
324        start_position: u64,
325        buffer: NonZeroUsize,
326    ) -> Result<impl Stream<Item = Result<(u64, u64, I), Error>> + Send + '_, Error> {
327        self.index
328            .replay(start_section, start_position, buffer)
329            .await
330    }
331
332    /// Sync both journals for given section.
333    pub async fn sync(&self, section: u64) -> Result<(), Error> {
334        try_join(self.index.sync(section), self.values.sync(section))
335            .await
336            .map(|_| ())
337    }
338
339    /// Sync all sections.
340    pub async fn sync_all(&self) -> Result<(), Error> {
341        try_join(self.index.sync_all(), self.values.sync_all())
342            .await
343            .map(|_| ())
344    }
345
346    /// Prune both journals. Returns true if any sections were pruned.
347    ///
348    /// Prunes index first, then glob. This order ensures crash safety:
349    /// - If crash after index prune but before glob: orphan data in glob (acceptable)
350    /// - If crash before index prune: no change, retry works
351    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
352        let index_pruned = self.index.prune(min).await?;
353        let value_pruned = self.values.prune(min).await?;
354        Ok(index_pruned || value_pruned)
355    }
356
357    /// Rewind both journals to a specific section and index size.
358    ///
359    /// This rewinds the section to the given index size and removes all sections
360    /// after the given section. The value size is derived from the last entry.
361    pub async fn rewind(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
362        // Rewind index first (this also removes sections after `section`)
363        self.index.rewind(section, index_size).await?;
364
365        // Derive value size from last entry
366        let value_size = match self.index.last(section).await? {
367            Some(entry) => {
368                let (offset, size) = entry.value_location();
369                offset
370                    .checked_add(u64::from(size))
371                    .ok_or(Error::OffsetOverflow)?
372            }
373            None => 0,
374        };
375
376        // Rewind values (this also removes sections after `section`)
377        self.values.rewind(section, value_size).await
378    }
379
380    /// Rewind only the given section to a specific index size.
381    ///
382    /// Unlike `rewind`, this does not affect other sections.
383    /// The value size is derived from the last entry after rewinding the index.
384    pub async fn rewind_section(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
385        // Rewind index first
386        self.index.rewind_section(section, index_size).await?;
387
388        // Derive value size from last entry
389        let value_size = match self.index.last(section).await? {
390            Some(entry) => {
391                let (offset, size) = entry.value_location();
392                offset
393                    .checked_add(u64::from(size))
394                    .ok_or(Error::OffsetOverflow)?
395            }
396            None => 0,
397        };
398
399        // Rewind values
400        self.values.rewind_section(section, value_size).await
401    }
402
403    /// Get index size for checkpoint.
404    ///
405    /// The value size can be derived from the last entry's location when needed.
406    pub async fn size(&self, section: u64) -> Result<u64, Error> {
407        self.index.size(section).await
408    }
409
410    /// Get the value size for a section, derived from the last entry's location.
411    pub async fn value_size(&self, section: u64) -> Result<u64, Error> {
412        match self.index.last(section).await {
413            Ok(Some(entry)) => {
414                let (offset, size) = entry.value_location();
415                offset
416                    .checked_add(u64::from(size))
417                    .ok_or(Error::OffsetOverflow)
418            }
419            Ok(None) => Ok(0),
420            Err(Error::SectionOutOfRange(_)) => Ok(0),
421            Err(e) => Err(e),
422        }
423    }
424
425    /// Returns the oldest section number, if any exist.
426    pub fn oldest_section(&self) -> Option<u64> {
427        self.index.oldest_section()
428    }
429
430    /// Returns the newest section number, if any exist.
431    pub fn newest_section(&self) -> Option<u64> {
432        self.index.newest_section()
433    }
434
435    /// Destroy all underlying storage.
436    pub async fn destroy(self) -> Result<(), Error> {
437        try_join(self.index.destroy(), self.values.destroy())
438            .await
439            .map(|_| ())
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446    use commonware_codec::{FixedSize, Read, ReadExt, Write};
447    use commonware_cryptography::Crc32;
448    use commonware_macros::test_traced;
449    use commonware_runtime::{
450        buffer::paged::CacheRef, deterministic, Blob as _, Buf, BufMut, Metrics, Runner,
451    };
452    use commonware_utils::{NZUsize, NZU16};
453
/// Returns the byte position just past a value that starts at `offset` and is
/// `size` bytes long (used by the truncation tests).
fn byte_end(offset: u64, size: u32) -> u64 {
    let len = u64::from(size);
    offset + len
}
458
/// Index entry used by the tests: a `u64` id plus the location of its value.
#[derive(Clone, Debug, PartialEq)]
struct TestEntry {
    /// Caller-chosen identifier, used to recognise the entry when read back.
    id: u64,
    /// Byte offset of the referenced value.
    value_offset: u64,
    /// Size of the referenced value.
    value_size: u32,
}
466
467    impl TestEntry {
468        fn new(id: u64, value_offset: u64, value_size: u32) -> Self {
469            Self {
470                id,
471                value_offset,
472                value_size,
473            }
474        }
475    }
476
477    impl Write for TestEntry {
478        fn write(&self, buf: &mut impl BufMut) {
479            self.id.write(buf);
480            self.value_offset.write(buf);
481            self.value_size.write(buf);
482        }
483    }
484
485    impl Read for TestEntry {
486        type Cfg = ();
487
488        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
489            let id = u64::read(buf)?;
490            let value_offset = u64::read(buf)?;
491            let value_size = u32::read(buf)?;
492            Ok(Self {
493                id,
494                value_offset,
495                value_size,
496            })
497        }
498    }
499
500    impl FixedSize for TestEntry {
501        const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
502    }
503
504    impl Record for TestEntry {
505        fn value_location(&self) -> (u64, u32) {
506            (self.value_offset, self.value_size)
507        }
508
509        fn with_location(mut self, offset: u64, size: u32) -> Self {
510            self.value_offset = offset;
511            self.value_size = size;
512            self
513        }
514    }
515
516    fn test_cfg() -> Config<()> {
517        Config {
518            index_partition: "test_index".to_string(),
519            value_partition: "test_values".to_string(),
520            index_page_cache: CacheRef::new(NZU16!(64), NZUsize!(8)),
521            index_write_buffer: NZUsize!(1024),
522            value_write_buffer: NZUsize!(1024),
523            compression: None,
524            codec_config: (),
525        }
526    }
527
/// Value type stored by the tests: a 16-byte array, used with a unit codec config.
type TestValue = [u8; 16];
530
531    #[test_traced]
532    fn test_oversized_append_and_get() {
533        let executor = deterministic::Runner::default();
534        executor.start(|context| async move {
535            let mut oversized: Oversized<_, TestEntry, TestValue> =
536                Oversized::init(context.clone(), test_cfg())
537                    .await
538                    .expect("Failed to init");
539
540            // Append entry with value
541            let value: TestValue = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
542            let entry = TestEntry::new(42, 0, 0);
543            let (position, offset, size) = oversized
544                .append(1, entry, &value)
545                .await
546                .expect("Failed to append");
547
548            assert_eq!(position, 0);
549
550            // Get entry
551            let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
552            assert_eq!(retrieved_entry.id, 42);
553
554            // Get value
555            let retrieved_value = oversized
556                .get_value(1, offset, size)
557                .await
558                .expect("Failed to get value");
559            assert_eq!(retrieved_value, value);
560
561            oversized.destroy().await.expect("Failed to destroy");
562        });
563    }
564
565    #[test_traced]
566    fn test_oversized_crash_recovery() {
567        let executor = deterministic::Runner::default();
568        executor.start(|context| async move {
569            let cfg = test_cfg();
570
571            // Create and populate oversized journal
572            let mut oversized: Oversized<_, TestEntry, TestValue> =
573                Oversized::init(context.with_label("first"), cfg.clone())
574                    .await
575                    .expect("Failed to init");
576
577            // Append multiple entries
578            let mut locations = Vec::new();
579            for i in 0..5u8 {
580                let value: TestValue = [i; 16];
581                let entry = TestEntry::new(i as u64, 0, 0);
582                let (position, offset, size) = oversized
583                    .append(1, entry, &value)
584                    .await
585                    .expect("Failed to append");
586                locations.push((position, offset, size));
587            }
588            oversized.sync(1).await.expect("Failed to sync");
589            drop(oversized);
590
591            // Simulate crash: truncate glob to lose last 2 values
592            let (blob, _) = context
593                .open(&cfg.value_partition, &1u64.to_be_bytes())
594                .await
595                .expect("Failed to open blob");
596
597            // Calculate size to keep first 3 entries
598            let keep_size = byte_end(locations[2].1, locations[2].2);
599            blob.resize(keep_size).await.expect("Failed to truncate");
600            blob.sync().await.expect("Failed to sync");
601            drop(blob);
602
603            // Reinitialize - should recover and rewind index
604            let oversized: Oversized<_, TestEntry, TestValue> =
605                Oversized::init(context.with_label("second"), cfg.clone())
606                    .await
607                    .expect("Failed to reinit");
608
609            // First 3 entries should still be valid
610            for i in 0..3u8 {
611                let (position, offset, size) = locations[i as usize];
612                let entry = oversized.get(1, position).await.expect("Failed to get");
613                assert_eq!(entry.id, i as u64);
614
615                let value = oversized
616                    .get_value(1, offset, size)
617                    .await
618                    .expect("Failed to get value");
619                assert_eq!(value, [i; 16]);
620            }
621
622            // Entry at position 3 should fail (index was rewound)
623            let result = oversized.get(1, 3).await;
624            assert!(result.is_err());
625
626            oversized.destroy().await.expect("Failed to destroy");
627        });
628    }
629
630    #[test_traced]
631    fn test_oversized_persistence() {
632        let executor = deterministic::Runner::default();
633        executor.start(|context| async move {
634            let cfg = test_cfg();
635
636            // Create and populate
637            let mut oversized: Oversized<_, TestEntry, TestValue> =
638                Oversized::init(context.with_label("first"), cfg.clone())
639                    .await
640                    .expect("Failed to init");
641
642            let value: TestValue = [42; 16];
643            let entry = TestEntry::new(123, 0, 0);
644            let (position, offset, size) = oversized
645                .append(1, entry, &value)
646                .await
647                .expect("Failed to append");
648            oversized.sync(1).await.expect("Failed to sync");
649            drop(oversized);
650
651            // Reopen and verify
652            let oversized: Oversized<_, TestEntry, TestValue> =
653                Oversized::init(context.with_label("second"), cfg)
654                    .await
655                    .expect("Failed to reinit");
656
657            let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
658            assert_eq!(retrieved_entry.id, 123);
659
660            let retrieved_value = oversized
661                .get_value(1, offset, size)
662                .await
663                .expect("Failed to get value");
664            assert_eq!(retrieved_value, value);
665
666            oversized.destroy().await.expect("Failed to destroy");
667        });
668    }
669
670    #[test_traced]
671    fn test_oversized_prune() {
672        let executor = deterministic::Runner::default();
673        executor.start(|context| async move {
674            let mut oversized: Oversized<_, TestEntry, TestValue> =
675                Oversized::init(context.clone(), test_cfg())
676                    .await
677                    .expect("Failed to init");
678
679            // Append to multiple sections
680            for section in 1u64..=5 {
681                let value: TestValue = [section as u8; 16];
682                let entry = TestEntry::new(section, 0, 0);
683                oversized
684                    .append(section, entry, &value)
685                    .await
686                    .expect("Failed to append");
687                oversized.sync(section).await.expect("Failed to sync");
688            }
689
690            // Prune sections < 3
691            oversized.prune(3).await.expect("Failed to prune");
692
693            // Sections 1, 2 should be gone
694            assert!(oversized.get(1, 0).await.is_err());
695            assert!(oversized.get(2, 0).await.is_err());
696
697            // Sections 3, 4, 5 should exist
698            assert!(oversized.get(3, 0).await.is_ok());
699            assert!(oversized.get(4, 0).await.is_ok());
700            assert!(oversized.get(5, 0).await.is_ok());
701
702            oversized.destroy().await.expect("Failed to destroy");
703        });
704    }
705
706    #[test_traced]
707    fn test_recovery_empty_section() {
708        let executor = deterministic::Runner::default();
709        executor.start(|context| async move {
710            let cfg = test_cfg();
711
712            // Create oversized journal
713            let mut oversized: Oversized<_, TestEntry, TestValue> =
714                Oversized::init(context.with_label("first"), cfg.clone())
715                    .await
716                    .expect("Failed to init");
717
718            // Append to section 2 only (section 1 remains empty after being opened)
719            let value: TestValue = [42; 16];
720            let entry = TestEntry::new(1, 0, 0);
721            oversized
722                .append(2, entry, &value)
723                .await
724                .expect("Failed to append");
725            oversized.sync(2).await.expect("Failed to sync");
726            drop(oversized);
727
728            // Reinitialize - recovery should handle the empty/non-existent section 1
729            let oversized: Oversized<_, TestEntry, TestValue> =
730                Oversized::init(context.with_label("second"), cfg)
731                    .await
732                    .expect("Failed to reinit");
733
734            // Section 2 entry should be valid
735            let entry = oversized.get(2, 0).await.expect("Failed to get");
736            assert_eq!(entry.id, 1);
737
738            oversized.destroy().await.expect("Failed to destroy");
739        });
740    }
741
742    #[test_traced]
743    fn test_recovery_all_entries_invalid() {
744        let executor = deterministic::Runner::default();
745        executor.start(|context| async move {
746            let cfg = test_cfg();
747
748            // Create and populate
749            let mut oversized: Oversized<_, TestEntry, TestValue> =
750                Oversized::init(context.with_label("first"), cfg.clone())
751                    .await
752                    .expect("Failed to init");
753
754            // Append 5 entries
755            for i in 0..5u8 {
756                let value: TestValue = [i; 16];
757                let entry = TestEntry::new(i as u64, 0, 0);
758                oversized
759                    .append(1, entry, &value)
760                    .await
761                    .expect("Failed to append");
762            }
763            oversized.sync(1).await.expect("Failed to sync");
764            drop(oversized);
765
766            // Truncate glob to 0 bytes - ALL entries become invalid
767            let (blob, _) = context
768                .open(&cfg.value_partition, &1u64.to_be_bytes())
769                .await
770                .expect("Failed to open blob");
771            blob.resize(0).await.expect("Failed to truncate");
772            blob.sync().await.expect("Failed to sync");
773            drop(blob);
774
775            // Reinitialize - should recover and rewind index to 0
776            let mut oversized: Oversized<_, TestEntry, TestValue> =
777                Oversized::init(context.with_label("second"), cfg)
778                    .await
779                    .expect("Failed to reinit");
780
781            // No entries should be accessible
782            let result = oversized.get(1, 0).await;
783            assert!(result.is_err());
784
785            // Should be able to append after recovery
786            let value: TestValue = [99; 16];
787            let entry = TestEntry::new(100, 0, 0);
788            let (pos, offset, size) = oversized
789                .append(1, entry, &value)
790                .await
791                .expect("Failed to append after recovery");
792            assert_eq!(pos, 0);
793
794            let retrieved = oversized.get(1, 0).await.expect("Failed to get");
795            assert_eq!(retrieved.id, 100);
796            let retrieved_value = oversized
797                .get_value(1, offset, size)
798                .await
799                .expect("Failed to get value");
800            assert_eq!(retrieved_value, value);
801
802            oversized.destroy().await.expect("Failed to destroy");
803        });
804    }
805
806    #[test_traced]
807    fn test_recovery_multiple_sections_mixed_validity() {
808        let executor = deterministic::Runner::default();
809        executor.start(|context| async move {
810            let cfg = test_cfg();
811
812            // Create and populate multiple sections
813            let mut oversized: Oversized<_, TestEntry, TestValue> =
814                Oversized::init(context.with_label("first"), cfg.clone())
815                    .await
816                    .expect("Failed to init");
817
818            // Section 1: 3 entries
819            let mut section1_locations = Vec::new();
820            for i in 0..3u8 {
821                let value: TestValue = [i; 16];
822                let entry = TestEntry::new(i as u64, 0, 0);
823                let loc = oversized
824                    .append(1, entry, &value)
825                    .await
826                    .expect("Failed to append");
827                section1_locations.push(loc);
828            }
829            oversized.sync(1).await.expect("Failed to sync");
830
831            // Section 2: 5 entries
832            let mut section2_locations = Vec::new();
833            for i in 0..5u8 {
834                let value: TestValue = [10 + i; 16];
835                let entry = TestEntry::new(10 + i as u64, 0, 0);
836                let loc = oversized
837                    .append(2, entry, &value)
838                    .await
839                    .expect("Failed to append");
840                section2_locations.push(loc);
841            }
842            oversized.sync(2).await.expect("Failed to sync");
843
844            // Section 3: 2 entries
845            for i in 0..2u8 {
846                let value: TestValue = [20 + i; 16];
847                let entry = TestEntry::new(20 + i as u64, 0, 0);
848                oversized
849                    .append(3, entry, &value)
850                    .await
851                    .expect("Failed to append");
852            }
853            oversized.sync(3).await.expect("Failed to sync");
854            drop(oversized);
855
856            // Truncate section 1 glob to keep only first entry
857            let (blob, _) = context
858                .open(&cfg.value_partition, &1u64.to_be_bytes())
859                .await
860                .expect("Failed to open blob");
861            let keep_size = byte_end(section1_locations[0].1, section1_locations[0].2);
862            blob.resize(keep_size).await.expect("Failed to truncate");
863            blob.sync().await.expect("Failed to sync");
864            drop(blob);
865
866            // Truncate section 2 glob to keep first 3 entries
867            let (blob, _) = context
868                .open(&cfg.value_partition, &2u64.to_be_bytes())
869                .await
870                .expect("Failed to open blob");
871            let keep_size = byte_end(section2_locations[2].1, section2_locations[2].2);
872            blob.resize(keep_size).await.expect("Failed to truncate");
873            blob.sync().await.expect("Failed to sync");
874            drop(blob);
875
876            // Section 3 remains intact
877
878            // Reinitialize
879            let oversized: Oversized<_, TestEntry, TestValue> =
880                Oversized::init(context.with_label("second"), cfg)
881                    .await
882                    .expect("Failed to reinit");
883
884            // Section 1: only position 0 valid
885            assert!(oversized.get(1, 0).await.is_ok());
886            assert!(oversized.get(1, 1).await.is_err());
887            assert!(oversized.get(1, 2).await.is_err());
888
889            // Section 2: positions 0,1,2 valid
890            assert!(oversized.get(2, 0).await.is_ok());
891            assert!(oversized.get(2, 1).await.is_ok());
892            assert!(oversized.get(2, 2).await.is_ok());
893            assert!(oversized.get(2, 3).await.is_err());
894            assert!(oversized.get(2, 4).await.is_err());
895
896            // Section 3: both positions valid
897            assert!(oversized.get(3, 0).await.is_ok());
898            assert!(oversized.get(3, 1).await.is_ok());
899
900            oversized.destroy().await.expect("Failed to destroy");
901        });
902    }
903
904    #[test_traced]
905    fn test_recovery_corrupted_last_index_entry() {
906        let executor = deterministic::Runner::default();
907        executor.start(|context| async move {
908            // Use page size = entry size so each entry is on its own page.
909            // This allows corrupting just the last entry's page without affecting others.
910            // Physical page size = TestEntry::SIZE (20) + 12 (CRC record) = 32 bytes.
911            let cfg = Config {
912                index_partition: "test_index".to_string(),
913                value_partition: "test_values".to_string(),
914                index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
915                index_write_buffer: NZUsize!(1024),
916                value_write_buffer: NZUsize!(1024),
917                compression: None,
918                codec_config: (),
919            };
920
921            // Create and populate
922            let mut oversized: Oversized<_, TestEntry, TestValue> =
923                Oversized::init(context.with_label("first"), cfg.clone())
924                    .await
925                    .expect("Failed to init");
926
927            // Append 5 entries (each on its own page)
928            for i in 0..5u8 {
929                let value: TestValue = [i; 16];
930                let entry = TestEntry::new(i as u64, 0, 0);
931                oversized
932                    .append(1, entry, &value)
933                    .await
934                    .expect("Failed to append");
935            }
936            oversized.sync(1).await.expect("Failed to sync");
937            drop(oversized);
938
939            // Corrupt the last page's CRC to trigger page-level integrity failure
940            let (blob, size) = context
941                .open(&cfg.index_partition, &1u64.to_be_bytes())
942                .await
943                .expect("Failed to open blob");
944
945            // Physical page size = 20 + 12 = 32 bytes
946            // 5 entries = 5 pages = 160 bytes total
947            // Last page CRC starts at offset 160 - 12 = 148
948            assert_eq!(size, 160);
949            let last_page_crc_offset = size - 12;
950            blob.write_at(last_page_crc_offset, vec![0xFF; 12])
951                .await
952                .expect("Failed to corrupt");
953            blob.sync().await.expect("Failed to sync");
954            drop(blob);
955
956            // Reinitialize - should detect page corruption and truncate
957            let mut oversized: Oversized<_, TestEntry, TestValue> =
958                Oversized::init(context.with_label("second"), cfg)
959                    .await
960                    .expect("Failed to reinit");
961
962            // First 4 entries should be valid (on pages 0-3)
963            for i in 0..4u8 {
964                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
965                assert_eq!(entry.id, i as u64);
966            }
967
968            // Entry 4 should be gone (its page was corrupted)
969            assert!(oversized.get(1, 4).await.is_err());
970
971            // Should be able to append after recovery
972            let value: TestValue = [99; 16];
973            let entry = TestEntry::new(100, 0, 0);
974            let (pos, offset, size) = oversized
975                .append(1, entry, &value)
976                .await
977                .expect("Failed to append after recovery");
978            assert_eq!(pos, 4);
979
980            let retrieved = oversized.get(1, 4).await.expect("Failed to get");
981            assert_eq!(retrieved.id, 100);
982            let retrieved_value = oversized
983                .get_value(1, offset, size)
984                .await
985                .expect("Failed to get value");
986            assert_eq!(retrieved_value, value);
987
988            oversized.destroy().await.expect("Failed to destroy");
989        });
990    }
991
992    #[test_traced]
993    fn test_recovery_all_entries_valid() {
994        let executor = deterministic::Runner::default();
995        executor.start(|context| async move {
996            let cfg = test_cfg();
997
998            // Create and populate
999            let mut oversized: Oversized<_, TestEntry, TestValue> =
1000                Oversized::init(context.with_label("first"), cfg.clone())
1001                    .await
1002                    .expect("Failed to init");
1003
1004            // Append entries to multiple sections
1005            for section in 1u64..=3 {
1006                for i in 0..10u8 {
1007                    let value: TestValue = [(section as u8) * 10 + i; 16];
1008                    let entry = TestEntry::new(section * 100 + i as u64, 0, 0);
1009                    oversized
1010                        .append(section, entry, &value)
1011                        .await
1012                        .expect("Failed to append");
1013                }
1014                oversized.sync(section).await.expect("Failed to sync");
1015            }
1016            drop(oversized);
1017
1018            // Reinitialize with no corruption - should be fast
1019            let oversized: Oversized<_, TestEntry, TestValue> =
1020                Oversized::init(context.with_label("second"), cfg)
1021                    .await
1022                    .expect("Failed to reinit");
1023
1024            // All entries should be valid
1025            for section in 1u64..=3 {
1026                for i in 0..10u8 {
1027                    let entry = oversized
1028                        .get(section, i as u64)
1029                        .await
1030                        .expect("Failed to get");
1031                    assert_eq!(entry.id, section * 100 + i as u64);
1032                }
1033            }
1034
1035            oversized.destroy().await.expect("Failed to destroy");
1036        });
1037    }
1038
1039    #[test_traced]
1040    fn test_recovery_single_entry_invalid() {
1041        let executor = deterministic::Runner::default();
1042        executor.start(|context| async move {
1043            let cfg = test_cfg();
1044
1045            // Create and populate with single entry
1046            let mut oversized: Oversized<_, TestEntry, TestValue> =
1047                Oversized::init(context.with_label("first"), cfg.clone())
1048                    .await
1049                    .expect("Failed to init");
1050
1051            let value: TestValue = [42; 16];
1052            let entry = TestEntry::new(1, 0, 0);
1053            oversized
1054                .append(1, entry, &value)
1055                .await
1056                .expect("Failed to append");
1057            oversized.sync(1).await.expect("Failed to sync");
1058            drop(oversized);
1059
1060            // Truncate glob to 0 - single entry becomes invalid
1061            let (blob, _) = context
1062                .open(&cfg.value_partition, &1u64.to_be_bytes())
1063                .await
1064                .expect("Failed to open blob");
1065            blob.resize(0).await.expect("Failed to truncate");
1066            blob.sync().await.expect("Failed to sync");
1067            drop(blob);
1068
1069            // Reinitialize
1070            let oversized: Oversized<_, TestEntry, TestValue> =
1071                Oversized::init(context.with_label("second"), cfg)
1072                    .await
1073                    .expect("Failed to reinit");
1074
1075            // Entry should be gone
1076            assert!(oversized.get(1, 0).await.is_err());
1077
1078            oversized.destroy().await.expect("Failed to destroy");
1079        });
1080    }
1081
1082    #[test_traced]
1083    fn test_recovery_last_entry_off_by_one() {
1084        let executor = deterministic::Runner::default();
1085        executor.start(|context| async move {
1086            let cfg = test_cfg();
1087
1088            // Create and populate
1089            let mut oversized: Oversized<_, TestEntry, TestValue> =
1090                Oversized::init(context.with_label("first"), cfg.clone())
1091                    .await
1092                    .expect("Failed to init");
1093
1094            let mut locations = Vec::new();
1095            for i in 0..3u8 {
1096                let value: TestValue = [i; 16];
1097                let entry = TestEntry::new(i as u64, 0, 0);
1098                let loc = oversized
1099                    .append(1, entry, &value)
1100                    .await
1101                    .expect("Failed to append");
1102                locations.push(loc);
1103            }
1104            oversized.sync(1).await.expect("Failed to sync");
1105            drop(oversized);
1106
1107            // Truncate glob to be off by 1 byte from last entry
1108            let (blob, _) = context
1109                .open(&cfg.value_partition, &1u64.to_be_bytes())
1110                .await
1111                .expect("Failed to open blob");
1112
1113            // Last entry needs: offset + size bytes
1114            // Truncate to offset + size - 1 (missing 1 byte)
1115            let last = &locations[2];
1116            let truncate_to = byte_end(last.1, last.2) - 1;
1117            blob.resize(truncate_to).await.expect("Failed to truncate");
1118            blob.sync().await.expect("Failed to sync");
1119            drop(blob);
1120
1121            // Reinitialize
1122            let mut oversized: Oversized<_, TestEntry, TestValue> =
1123                Oversized::init(context.with_label("second"), cfg)
1124                    .await
1125                    .expect("Failed to reinit");
1126
1127            // First 2 entries should be valid
1128            assert!(oversized.get(1, 0).await.is_ok());
1129            assert!(oversized.get(1, 1).await.is_ok());
1130
1131            // Entry 2 should be gone (truncated)
1132            assert!(oversized.get(1, 2).await.is_err());
1133
1134            // Should be able to append after recovery
1135            let value: TestValue = [99; 16];
1136            let entry = TestEntry::new(100, 0, 0);
1137            let (pos, offset, size) = oversized
1138                .append(1, entry, &value)
1139                .await
1140                .expect("Failed to append after recovery");
1141            assert_eq!(pos, 2);
1142
1143            let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1144            assert_eq!(retrieved.id, 100);
1145            let retrieved_value = oversized
1146                .get_value(1, offset, size)
1147                .await
1148                .expect("Failed to get value");
1149            assert_eq!(retrieved_value, value);
1150
1151            oversized.destroy().await.expect("Failed to destroy");
1152        });
1153    }
1154
1155    #[test_traced]
1156    fn test_recovery_glob_missing_entirely() {
1157        let executor = deterministic::Runner::default();
1158        executor.start(|context| async move {
1159            let cfg = test_cfg();
1160
1161            // Create and populate
1162            let mut oversized: Oversized<_, TestEntry, TestValue> =
1163                Oversized::init(context.with_label("first"), cfg.clone())
1164                    .await
1165                    .expect("Failed to init");
1166
1167            for i in 0..3u8 {
1168                let value: TestValue = [i; 16];
1169                let entry = TestEntry::new(i as u64, 0, 0);
1170                oversized
1171                    .append(1, entry, &value)
1172                    .await
1173                    .expect("Failed to append");
1174            }
1175            oversized.sync(1).await.expect("Failed to sync");
1176            drop(oversized);
1177
1178            // Delete the glob file entirely
1179            context
1180                .remove(&cfg.value_partition, Some(&1u64.to_be_bytes()))
1181                .await
1182                .expect("Failed to remove");
1183
1184            // Reinitialize - glob size will be 0, all entries invalid
1185            let oversized: Oversized<_, TestEntry, TestValue> =
1186                Oversized::init(context.with_label("second"), cfg)
1187                    .await
1188                    .expect("Failed to reinit");
1189
1190            // All entries should be gone
1191            assert!(oversized.get(1, 0).await.is_err());
1192            assert!(oversized.get(1, 1).await.is_err());
1193            assert!(oversized.get(1, 2).await.is_err());
1194
1195            oversized.destroy().await.expect("Failed to destroy");
1196        });
1197    }
1198
1199    #[test_traced]
1200    fn test_recovery_can_append_after_recovery() {
1201        let executor = deterministic::Runner::default();
1202        executor.start(|context| async move {
1203            let cfg = test_cfg();
1204
1205            // Create and populate
1206            let mut oversized: Oversized<_, TestEntry, TestValue> =
1207                Oversized::init(context.with_label("first"), cfg.clone())
1208                    .await
1209                    .expect("Failed to init");
1210
1211            let mut locations = Vec::new();
1212            for i in 0..5u8 {
1213                let value: TestValue = [i; 16];
1214                let entry = TestEntry::new(i as u64, 0, 0);
1215                let loc = oversized
1216                    .append(1, entry, &value)
1217                    .await
1218                    .expect("Failed to append");
1219                locations.push(loc);
1220            }
1221            oversized.sync(1).await.expect("Failed to sync");
1222            drop(oversized);
1223
1224            // Truncate glob to keep only first 2 entries
1225            let (blob, _) = context
1226                .open(&cfg.value_partition, &1u64.to_be_bytes())
1227                .await
1228                .expect("Failed to open blob");
1229            let keep_size = byte_end(locations[1].1, locations[1].2);
1230            blob.resize(keep_size).await.expect("Failed to truncate");
1231            blob.sync().await.expect("Failed to sync");
1232            drop(blob);
1233
1234            // Reinitialize
1235            let mut oversized: Oversized<_, TestEntry, TestValue> =
1236                Oversized::init(context.with_label("second"), cfg.clone())
1237                    .await
1238                    .expect("Failed to reinit");
1239
1240            // Verify first 2 entries exist
1241            assert!(oversized.get(1, 0).await.is_ok());
1242            assert!(oversized.get(1, 1).await.is_ok());
1243            assert!(oversized.get(1, 2).await.is_err());
1244
1245            // Append new entries after recovery
1246            for i in 10..15u8 {
1247                let value: TestValue = [i; 16];
1248                let entry = TestEntry::new(i as u64, 0, 0);
1249                oversized
1250                    .append(1, entry, &value)
1251                    .await
1252                    .expect("Failed to append after recovery");
1253            }
1254            oversized.sync(1).await.expect("Failed to sync");
1255
1256            // Verify new entries at positions 2, 3, 4, 5, 6
1257            for i in 0..5u8 {
1258                let entry = oversized
1259                    .get(1, 2 + i as u64)
1260                    .await
1261                    .expect("Failed to get new entry");
1262                assert_eq!(entry.id, (10 + i) as u64);
1263            }
1264
1265            oversized.destroy().await.expect("Failed to destroy");
1266        });
1267    }
1268
1269    #[test_traced]
1270    fn test_recovery_glob_pruned_but_index_not() {
1271        let executor = deterministic::Runner::default();
1272        executor.start(|context| async move {
1273            let cfg = test_cfg();
1274
1275            // Create and populate multiple sections
1276            let mut oversized: Oversized<_, TestEntry, TestValue> =
1277                Oversized::init(context.with_label("first"), cfg.clone())
1278                    .await
1279                    .expect("Failed to init");
1280
1281            for section in 1u64..=3 {
1282                let value: TestValue = [section as u8; 16];
1283                let entry = TestEntry::new(section, 0, 0);
1284                oversized
1285                    .append(section, entry, &value)
1286                    .await
1287                    .expect("Failed to append");
1288                oversized.sync(section).await.expect("Failed to sync");
1289            }
1290            drop(oversized);
1291
1292            // Simulate crash during prune: prune ONLY the glob, not the index
1293            // This creates the "glob pruned but index not" scenario
1294            use crate::journal::segmented::glob::{Config as GlobConfig, Glob};
1295            let glob_cfg = GlobConfig {
1296                partition: cfg.value_partition.clone(),
1297                compression: cfg.compression,
1298                codec_config: (),
1299                write_buffer: cfg.value_write_buffer,
1300            };
1301            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1302                .await
1303                .expect("Failed to init glob");
1304            glob.prune(2).await.expect("Failed to prune glob");
1305            glob.sync_all().await.expect("Failed to sync glob");
1306            drop(glob);
1307
1308            // Reinitialize - should recover gracefully with warning
1309            // Index section 1 will be rewound to 0 entries
1310            let oversized: Oversized<_, TestEntry, TestValue> =
1311                Oversized::init(context.with_label("second"), cfg.clone())
1312                    .await
1313                    .expect("Failed to reinit");
1314
1315            // Section 1 entries should be gone (index rewound due to glob pruned)
1316            assert!(oversized.get(1, 0).await.is_err());
1317
1318            // Sections 2 and 3 should still be valid
1319            assert!(oversized.get(2, 0).await.is_ok());
1320            assert!(oversized.get(3, 0).await.is_ok());
1321
1322            oversized.destroy().await.expect("Failed to destroy");
1323        });
1324    }
1325
1326    #[test_traced]
1327    fn test_recovery_index_partition_deleted() {
1328        let executor = deterministic::Runner::default();
1329        executor.start(|context| async move {
1330            let cfg = test_cfg();
1331
1332            // Create and populate multiple sections
1333            let mut oversized: Oversized<_, TestEntry, TestValue> =
1334                Oversized::init(context.with_label("first"), cfg.clone())
1335                    .await
1336                    .expect("Failed to init");
1337
1338            for section in 1u64..=3 {
1339                let value: TestValue = [section as u8; 16];
1340                let entry = TestEntry::new(section, 0, 0);
1341                oversized
1342                    .append(section, entry, &value)
1343                    .await
1344                    .expect("Failed to append");
1345                oversized.sync(section).await.expect("Failed to sync");
1346            }
1347            drop(oversized);
1348
1349            // Delete index blob for section 2 (simulate corruption/loss)
1350            context
1351                .remove(&cfg.index_partition, Some(&2u64.to_be_bytes()))
1352                .await
1353                .expect("Failed to remove index");
1354
1355            // Reinitialize - should handle gracefully
1356            // Section 2 is gone from index, orphan data in glob is acceptable
1357            let oversized: Oversized<_, TestEntry, TestValue> =
1358                Oversized::init(context.with_label("second"), cfg.clone())
1359                    .await
1360                    .expect("Failed to reinit");
1361
1362            // Section 1 and 3 should still be valid
1363            assert!(oversized.get(1, 0).await.is_ok());
1364            assert!(oversized.get(3, 0).await.is_ok());
1365
1366            // Section 2 should be gone (index file deleted)
1367            assert!(oversized.get(2, 0).await.is_err());
1368
1369            oversized.destroy().await.expect("Failed to destroy");
1370        });
1371    }
1372
1373    #[test_traced]
1374    fn test_recovery_index_synced_but_glob_not() {
1375        let executor = deterministic::Runner::default();
1376        executor.start(|context| async move {
1377            let cfg = test_cfg();
1378
1379            // Create and populate
1380            let mut oversized: Oversized<_, TestEntry, TestValue> =
1381                Oversized::init(context.with_label("first"), cfg.clone())
1382                    .await
1383                    .expect("Failed to init");
1384
1385            // Append entries and sync
1386            let mut locations = Vec::new();
1387            for i in 0..3u8 {
1388                let value: TestValue = [i; 16];
1389                let entry = TestEntry::new(i as u64, 0, 0);
1390                let loc = oversized
1391                    .append(1, entry, &value)
1392                    .await
1393                    .expect("Failed to append");
1394                locations.push(loc);
1395            }
1396            oversized.sync(1).await.expect("Failed to sync");
1397
1398            // Add more entries WITHOUT syncing (simulates unsynced writes)
1399            for i in 10..15u8 {
1400                let value: TestValue = [i; 16];
1401                let entry = TestEntry::new(i as u64, 0, 0);
1402                oversized
1403                    .append(1, entry, &value)
1404                    .await
1405                    .expect("Failed to append");
1406            }
1407            // Note: NOT calling sync() here
1408            drop(oversized);
1409
1410            // Simulate crash where index was synced but glob wasn't:
1411            // Truncate glob back to the synced size (3 entries)
1412            let (blob, _) = context
1413                .open(&cfg.value_partition, &1u64.to_be_bytes())
1414                .await
1415                .expect("Failed to open blob");
1416            let synced_size = byte_end(locations[2].1, locations[2].2);
1417            blob.resize(synced_size).await.expect("Failed to truncate");
1418            blob.sync().await.expect("Failed to sync");
1419            drop(blob);
1420
1421            // Reinitialize - should rewind index to match glob
1422            let oversized: Oversized<_, TestEntry, TestValue> =
1423                Oversized::init(context.with_label("second"), cfg)
1424                    .await
1425                    .expect("Failed to reinit");
1426
1427            // First 3 entries should be valid
1428            for i in 0..3u8 {
1429                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1430                assert_eq!(entry.id, i as u64);
1431            }
1432
1433            // Entries 3-7 should be gone (unsynced, index rewound)
1434            assert!(oversized.get(1, 3).await.is_err());
1435
1436            oversized.destroy().await.expect("Failed to destroy");
1437        });
1438    }
1439
1440    #[test_traced]
1441    fn test_recovery_glob_synced_but_index_not() {
1442        let executor = deterministic::Runner::default();
1443        executor.start(|context| async move {
1444            // Use page size = entry size so each entry is exactly one page.
1445            // This allows truncating by entry count to equal truncating by full pages,
1446            // maintaining page-level integrity.
1447            let cfg = Config {
1448                index_partition: "test_index".to_string(),
1449                value_partition: "test_values".to_string(),
1450                index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1451                index_write_buffer: NZUsize!(1024),
1452                value_write_buffer: NZUsize!(1024),
1453                compression: None,
1454                codec_config: (),
1455            };
1456
1457            // Create and populate
1458            let mut oversized: Oversized<_, TestEntry, TestValue> =
1459                Oversized::init(context.with_label("first"), cfg.clone())
1460                    .await
1461                    .expect("Failed to init");
1462
1463            // Append entries and sync
1464            let mut locations = Vec::new();
1465            for i in 0..3u8 {
1466                let value: TestValue = [i; 16];
1467                let entry = TestEntry::new(i as u64, 0, 0);
1468                let loc = oversized
1469                    .append(1, entry, &value)
1470                    .await
1471                    .expect("Failed to append");
1472                locations.push(loc);
1473            }
1474            oversized.sync(1).await.expect("Failed to sync");
1475            drop(oversized);
1476
1477            // Simulate crash: truncate INDEX but leave GLOB intact
1478            // This creates orphan data in glob (glob ahead of index)
1479            let (blob, _size) = context
1480                .open(&cfg.index_partition, &1u64.to_be_bytes())
1481                .await
1482                .expect("Failed to open blob");
1483
1484            // Keep only first 2 index entries (2 full pages)
1485            // Physical page size = logical (20) + CRC record (12) = 32 bytes
1486            let physical_page_size = (TestEntry::SIZE + 12) as u64;
1487            blob.resize(2 * physical_page_size)
1488                .await
1489                .expect("Failed to truncate");
1490            blob.sync().await.expect("Failed to sync");
1491            drop(blob);
1492
1493            // Reinitialize - glob has orphan data from entry 3
1494            let mut oversized: Oversized<_, TestEntry, TestValue> =
1495                Oversized::init(context.with_label("second"), cfg.clone())
1496                    .await
1497                    .expect("Failed to reinit");
1498
1499            // First 2 entries should be valid
1500            for i in 0..2u8 {
1501                let (position, offset, size) = locations[i as usize];
1502                let entry = oversized.get(1, position).await.expect("Failed to get");
1503                assert_eq!(entry.id, i as u64);
1504
1505                let value = oversized
1506                    .get_value(1, offset, size)
1507                    .await
1508                    .expect("Failed to get value");
1509                assert_eq!(value, [i; 16]);
1510            }
1511
1512            // Entry at position 2 should fail (index was truncated)
1513            assert!(oversized.get(1, 2).await.is_err());
1514
1515            // Append new entries - should work despite orphan data in glob
1516            let mut new_locations = Vec::new();
1517            for i in 10..13u8 {
1518                let value: TestValue = [i; 16];
1519                let entry = TestEntry::new(i as u64, 0, 0);
1520                let (position, offset, size) = oversized
1521                    .append(1, entry, &value)
1522                    .await
1523                    .expect("Failed to append after recovery");
1524
1525                // New entries start at position 2 (after the 2 valid entries)
1526                assert_eq!(position, (i - 10 + 2) as u64);
1527                new_locations.push((position, offset, size, i));
1528
1529                // Verify we can read the new entry
1530                let retrieved = oversized.get(1, position).await.expect("Failed to get");
1531                assert_eq!(retrieved.id, i as u64);
1532
1533                let retrieved_value = oversized
1534                    .get_value(1, offset, size)
1535                    .await
1536                    .expect("Failed to get value");
1537                assert_eq!(retrieved_value, value);
1538            }
1539
1540            // Sync and restart again to verify persistence with orphan data
1541            oversized.sync(1).await.expect("Failed to sync");
1542            drop(oversized);
1543
1544            // Reinitialize after adding data on top of orphan glob data
1545            let oversized: Oversized<_, TestEntry, TestValue> =
1546                Oversized::init(context.with_label("third"), cfg)
1547                    .await
1548                    .expect("Failed to reinit after append");
1549
1550            // Read all valid entries in the index
1551            // First 2 entries from original data
1552            for i in 0..2u8 {
1553                let (position, offset, size) = locations[i as usize];
1554                let entry = oversized.get(1, position).await.expect("Failed to get");
1555                assert_eq!(entry.id, i as u64);
1556
1557                let value = oversized
1558                    .get_value(1, offset, size)
1559                    .await
1560                    .expect("Failed to get value");
1561                assert_eq!(value, [i; 16]);
1562            }
1563
1564            // New entries added after recovery
1565            for (position, offset, size, expected_id) in &new_locations {
1566                let entry = oversized
1567                    .get(1, *position)
1568                    .await
1569                    .expect("Failed to get new entry after restart");
1570                assert_eq!(entry.id, *expected_id as u64);
1571
1572                let value = oversized
1573                    .get_value(1, *offset, *size)
1574                    .await
1575                    .expect("Failed to get new value after restart");
1576                assert_eq!(value, [*expected_id; 16]);
1577            }
1578
1579            // Verify total entry count: 2 original + 3 new = 5
1580            assert!(oversized.get(1, 4).await.is_ok());
1581            assert!(oversized.get(1, 5).await.is_err());
1582
1583            oversized.destroy().await.expect("Failed to destroy");
1584        });
1585    }
1586
1587    #[test_traced]
1588    fn test_recovery_partial_index_entry() {
1589        let executor = deterministic::Runner::default();
1590        executor.start(|context| async move {
1591            let cfg = test_cfg();
1592
1593            // Create and populate
1594            let mut oversized: Oversized<_, TestEntry, TestValue> =
1595                Oversized::init(context.with_label("first"), cfg.clone())
1596                    .await
1597                    .expect("Failed to init");
1598
1599            // Append 3 entries
1600            for i in 0..3u8 {
1601                let value: TestValue = [i; 16];
1602                let entry = TestEntry::new(i as u64, 0, 0);
1603                oversized
1604                    .append(1, entry, &value)
1605                    .await
1606                    .expect("Failed to append");
1607            }
1608            oversized.sync(1).await.expect("Failed to sync");
1609            drop(oversized);
1610
1611            // Simulate crash during write: truncate index to partial entry
1612            // Each entry is TestEntry::SIZE (20) + 4 (CRC32) = 24 bytes
1613            // Truncate to 3 full entries + 10 bytes of partial entry
1614            let (blob, _) = context
1615                .open(&cfg.index_partition, &1u64.to_be_bytes())
1616                .await
1617                .expect("Failed to open blob");
1618            let partial_size = 3 * 24 + 10; // 3 full entries + partial
1619            blob.resize(partial_size).await.expect("Failed to resize");
1620            blob.sync().await.expect("Failed to sync");
1621            drop(blob);
1622
1623            // Reinitialize - should handle partial entry gracefully
1624            let mut oversized: Oversized<_, TestEntry, TestValue> =
1625                Oversized::init(context.with_label("second"), cfg.clone())
1626                    .await
1627                    .expect("Failed to reinit");
1628
1629            // First 3 entries should still be valid
1630            for i in 0..3u8 {
1631                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1632                assert_eq!(entry.id, i as u64);
1633            }
1634
1635            // Entry 3 should not exist (partial entry was removed)
1636            assert!(oversized.get(1, 3).await.is_err());
1637
1638            // Append new entry after recovery
1639            let value: TestValue = [42; 16];
1640            let entry = TestEntry::new(100, 0, 0);
1641            let (pos, offset, size) = oversized
1642                .append(1, entry, &value)
1643                .await
1644                .expect("Failed to append after recovery");
1645            assert_eq!(pos, 3);
1646
1647            // Verify we can read the new entry
1648            let retrieved = oversized.get(1, 3).await.expect("Failed to get new entry");
1649            assert_eq!(retrieved.id, 100);
1650            let retrieved_value = oversized
1651                .get_value(1, offset, size)
1652                .await
1653                .expect("Failed to get new value");
1654            assert_eq!(retrieved_value, value);
1655
1656            oversized.destroy().await.expect("Failed to destroy");
1657        });
1658    }
1659
1660    #[test_traced]
1661    fn test_recovery_only_partial_entry() {
1662        let executor = deterministic::Runner::default();
1663        executor.start(|context| async move {
1664            let cfg = test_cfg();
1665
1666            // Create and populate with single entry
1667            let mut oversized: Oversized<_, TestEntry, TestValue> =
1668                Oversized::init(context.with_label("first"), cfg.clone())
1669                    .await
1670                    .expect("Failed to init");
1671
1672            let value: TestValue = [42; 16];
1673            let entry = TestEntry::new(1, 0, 0);
1674            oversized
1675                .append(1, entry, &value)
1676                .await
1677                .expect("Failed to append");
1678            oversized.sync(1).await.expect("Failed to sync");
1679            drop(oversized);
1680
1681            // Truncate index to only partial data (less than one full entry)
1682            let (blob, _) = context
1683                .open(&cfg.index_partition, &1u64.to_be_bytes())
1684                .await
1685                .expect("Failed to open blob");
1686            blob.resize(10).await.expect("Failed to resize"); // Less than chunk size
1687            blob.sync().await.expect("Failed to sync");
1688            drop(blob);
1689
1690            // Reinitialize - should handle gracefully (rewind to 0)
1691            let mut oversized: Oversized<_, TestEntry, TestValue> =
1692                Oversized::init(context.with_label("second"), cfg.clone())
1693                    .await
1694                    .expect("Failed to reinit");
1695
1696            // No entries should exist
1697            assert!(oversized.get(1, 0).await.is_err());
1698
1699            // Should be able to append after recovery
1700            let value: TestValue = [99; 16];
1701            let entry = TestEntry::new(100, 0, 0);
1702            let (pos, offset, size) = oversized
1703                .append(1, entry, &value)
1704                .await
1705                .expect("Failed to append after recovery");
1706            assert_eq!(pos, 0);
1707
1708            let retrieved = oversized.get(1, 0).await.expect("Failed to get");
1709            assert_eq!(retrieved.id, 100);
1710            let retrieved_value = oversized
1711                .get_value(1, offset, size)
1712                .await
1713                .expect("Failed to get value");
1714            assert_eq!(retrieved_value, value);
1715
1716            oversized.destroy().await.expect("Failed to destroy");
1717        });
1718    }
1719
1720    #[test_traced]
1721    fn test_recovery_crash_during_rewind_index_ahead() {
1722        // Simulates crash where index was rewound but glob wasn't
1723        let executor = deterministic::Runner::default();
1724        executor.start(|context| async move {
1725            // Use page size = entry size so each entry is exactly one page.
1726            // This allows truncating by entry count to equal truncating by full pages,
1727            // maintaining page-level integrity.
1728            let cfg = Config {
1729                index_partition: "test_index".to_string(),
1730                value_partition: "test_values".to_string(),
1731                index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1732                index_write_buffer: NZUsize!(1024),
1733                value_write_buffer: NZUsize!(1024),
1734                compression: None,
1735                codec_config: (),
1736            };
1737
1738            // Create and populate
1739            let mut oversized: Oversized<_, TestEntry, TestValue> =
1740                Oversized::init(context.with_label("first"), cfg.clone())
1741                    .await
1742                    .expect("Failed to init");
1743
1744            let mut locations = Vec::new();
1745            for i in 0..5u8 {
1746                let value: TestValue = [i; 16];
1747                let entry = TestEntry::new(i as u64, 0, 0);
1748                let loc = oversized
1749                    .append(1, entry, &value)
1750                    .await
1751                    .expect("Failed to append");
1752                locations.push(loc);
1753            }
1754            oversized.sync(1).await.expect("Failed to sync");
1755            drop(oversized);
1756
1757            // Simulate crash during rewind: truncate index to 2 entries but leave glob intact
1758            // This simulates: rewind(index) succeeded, crash before rewind(glob)
1759            let (blob, _) = context
1760                .open(&cfg.index_partition, &1u64.to_be_bytes())
1761                .await
1762                .expect("Failed to open blob");
1763            // Physical page size = logical (20) + CRC record (12) = 32 bytes
1764            let physical_page_size = (TestEntry::SIZE + 12) as u64;
1765            blob.resize(2 * physical_page_size)
1766                .await
1767                .expect("Failed to truncate");
1768            blob.sync().await.expect("Failed to sync");
1769            drop(blob);
1770
1771            // Reinitialize - recovery should succeed (glob has orphan data)
1772            let mut oversized: Oversized<_, TestEntry, TestValue> =
1773                Oversized::init(context.with_label("second"), cfg.clone())
1774                    .await
1775                    .expect("Failed to reinit");
1776
1777            // First 2 entries should be valid
1778            for i in 0..2u8 {
1779                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1780                assert_eq!(entry.id, i as u64);
1781            }
1782
1783            // Entries 2-4 should be gone (index was truncated)
1784            assert!(oversized.get(1, 2).await.is_err());
1785
1786            // Should be able to append new entries
1787            let (pos, _, _) = oversized
1788                .append(1, TestEntry::new(100, 0, 0), &[100u8; 16])
1789                .await
1790                .expect("Failed to append");
1791            assert_eq!(pos, 2);
1792
1793            oversized.destroy().await.expect("Failed to destroy");
1794        });
1795    }
1796
1797    #[test_traced]
1798    fn test_recovery_crash_during_rewind_glob_ahead() {
1799        // Simulates crash where glob was rewound but index wasn't
1800        let executor = deterministic::Runner::default();
1801        executor.start(|context| async move {
1802            let cfg = test_cfg();
1803
1804            // Create and populate
1805            let mut oversized: Oversized<_, TestEntry, TestValue> =
1806                Oversized::init(context.with_label("first"), cfg.clone())
1807                    .await
1808                    .expect("Failed to init");
1809
1810            let mut locations = Vec::new();
1811            for i in 0..5u8 {
1812                let value: TestValue = [i; 16];
1813                let entry = TestEntry::new(i as u64, 0, 0);
1814                let loc = oversized
1815                    .append(1, entry, &value)
1816                    .await
1817                    .expect("Failed to append");
1818                locations.push(loc);
1819            }
1820            oversized.sync(1).await.expect("Failed to sync");
1821            drop(oversized);
1822
1823            // Simulate crash during rewind: truncate glob to 2 entries but leave index intact
1824            // This simulates: rewind(glob) succeeded, crash before rewind(index)
1825            let (blob, _) = context
1826                .open(&cfg.value_partition, &1u64.to_be_bytes())
1827                .await
1828                .expect("Failed to open blob");
1829            let keep_size = byte_end(locations[1].1, locations[1].2);
1830            blob.resize(keep_size).await.expect("Failed to truncate");
1831            blob.sync().await.expect("Failed to sync");
1832            drop(blob);
1833
1834            // Reinitialize - recovery should detect index entries pointing beyond glob
1835            let mut oversized: Oversized<_, TestEntry, TestValue> =
1836                Oversized::init(context.with_label("second"), cfg.clone())
1837                    .await
1838                    .expect("Failed to reinit");
1839
1840            // First 2 entries should be valid (index rewound to match glob)
1841            for i in 0..2u8 {
1842                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1843                assert_eq!(entry.id, i as u64);
1844            }
1845
1846            // Entries 2-4 should be gone (index rewound during recovery)
1847            assert!(oversized.get(1, 2).await.is_err());
1848
1849            // Should be able to append after recovery
1850            let value: TestValue = [99; 16];
1851            let entry = TestEntry::new(100, 0, 0);
1852            let (pos, offset, size) = oversized
1853                .append(1, entry, &value)
1854                .await
1855                .expect("Failed to append after recovery");
1856            assert_eq!(pos, 2);
1857
1858            let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1859            assert_eq!(retrieved.id, 100);
1860            let retrieved_value = oversized
1861                .get_value(1, offset, size)
1862                .await
1863                .expect("Failed to get value");
1864            assert_eq!(retrieved_value, value);
1865
1866            oversized.destroy().await.expect("Failed to destroy");
1867        });
1868    }
1869
1870    #[test_traced]
1871    fn test_oversized_get_value_invalid_size() {
1872        let executor = deterministic::Runner::default();
1873        executor.start(|context| async move {
1874            let mut oversized: Oversized<_, TestEntry, TestValue> =
1875                Oversized::init(context.clone(), test_cfg())
1876                    .await
1877                    .expect("Failed to init");
1878
1879            let value: TestValue = [42; 16];
1880            let entry = TestEntry::new(1, 0, 0);
1881            let (_, offset, _size) = oversized
1882                .append(1, entry, &value)
1883                .await
1884                .expect("Failed to append");
1885            oversized.sync(1).await.expect("Failed to sync");
1886
1887            // Size 0 - should fail
1888            assert!(oversized.get_value(1, offset, 0).await.is_err());
1889
1890            // Size < value size - should fail with codec error, checksum mismatch, or
1891            // insufficient length (if size < 4 bytes for checksum)
1892            for size in 1..4u32 {
1893                let result = oversized.get_value(1, offset, size).await;
1894                assert!(
1895                    matches!(
1896                        result,
1897                        Err(Error::Codec(_))
1898                            | Err(Error::ChecksumMismatch(_, _))
1899                            | Err(Error::Runtime(_))
1900                    ),
1901                    "expected error, got: {:?}",
1902                    result
1903                );
1904            }
1905
1906            oversized.destroy().await.expect("Failed to destroy");
1907        });
1908    }
1909
1910    #[test_traced]
1911    fn test_oversized_get_value_wrong_size() {
1912        let executor = deterministic::Runner::default();
1913        executor.start(|context| async move {
1914            let mut oversized: Oversized<_, TestEntry, TestValue> =
1915                Oversized::init(context.clone(), test_cfg())
1916                    .await
1917                    .expect("Failed to init");
1918
1919            let value: TestValue = [42; 16];
1920            let entry = TestEntry::new(1, 0, 0);
1921            let (_, offset, correct_size) = oversized
1922                .append(1, entry, &value)
1923                .await
1924                .expect("Failed to append");
1925            oversized.sync(1).await.expect("Failed to sync");
1926
1927            // Size too small - will fail to decode or checksum mismatch
1928            // (checksum mismatch can occur because we read wrong bytes as the checksum)
1929            let result = oversized.get_value(1, offset, correct_size - 1).await;
1930            assert!(
1931                matches!(
1932                    result,
1933                    Err(Error::Codec(_)) | Err(Error::ChecksumMismatch(_, _))
1934                ),
1935                "expected Codec or ChecksumMismatch error, got: {:?}",
1936                result
1937            );
1938
1939            oversized.destroy().await.expect("Failed to destroy");
1940        });
1941    }
1942
1943    #[test_traced]
1944    fn test_recovery_values_has_orphan_section() {
1945        let executor = deterministic::Runner::default();
1946        executor.start(|context| async move {
1947            let cfg = test_cfg();
1948
1949            // Create and populate with sections 1 and 2
1950            let mut oversized: Oversized<_, TestEntry, TestValue> =
1951                Oversized::init(context.with_label("first"), cfg.clone())
1952                    .await
1953                    .expect("Failed to init");
1954
1955            for section in 1u64..=2 {
1956                let value: TestValue = [section as u8; 16];
1957                let entry = TestEntry::new(section, 0, 0);
1958                oversized
1959                    .append(section, entry, &value)
1960                    .await
1961                    .expect("Failed to append");
1962                oversized.sync(section).await.expect("Failed to sync");
1963            }
1964            drop(oversized);
1965
1966            // Manually create an orphan value section (section 3) without corresponding index
1967            let glob_cfg = GlobConfig {
1968                partition: cfg.value_partition.clone(),
1969                compression: cfg.compression,
1970                codec_config: (),
1971                write_buffer: cfg.value_write_buffer,
1972            };
1973            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1974                .await
1975                .expect("Failed to init glob");
1976            let orphan_value: TestValue = [99; 16];
1977            glob.append(3, &orphan_value)
1978                .await
1979                .expect("Failed to append orphan");
1980            glob.sync(3).await.expect("Failed to sync glob");
1981            drop(glob);
1982
1983            // Reinitialize - should detect and remove the orphan section
1984            let oversized: Oversized<_, TestEntry, TestValue> =
1985                Oversized::init(context.with_label("second"), cfg.clone())
1986                    .await
1987                    .expect("Failed to reinit");
1988
1989            // Sections 1 and 2 should still be valid
1990            assert!(oversized.get(1, 0).await.is_ok());
1991            assert!(oversized.get(2, 0).await.is_ok());
1992
1993            // Newest section should be 2 (orphan was removed)
1994            assert_eq!(oversized.newest_section(), Some(2));
1995
1996            oversized.destroy().await.expect("Failed to destroy");
1997        });
1998    }
1999
2000    #[test_traced]
2001    fn test_recovery_values_has_multiple_orphan_sections() {
2002        let executor = deterministic::Runner::default();
2003        executor.start(|context| async move {
2004            let cfg = test_cfg();
2005
2006            // Create and populate with only section 1
2007            let mut oversized: Oversized<_, TestEntry, TestValue> =
2008                Oversized::init(context.with_label("first"), cfg.clone())
2009                    .await
2010                    .expect("Failed to init");
2011
2012            let value: TestValue = [1; 16];
2013            let entry = TestEntry::new(1, 0, 0);
2014            oversized
2015                .append(1, entry, &value)
2016                .await
2017                .expect("Failed to append");
2018            oversized.sync(1).await.expect("Failed to sync");
2019            drop(oversized);
2020
2021            // Manually create multiple orphan value sections (2, 3, 4)
2022            let glob_cfg = GlobConfig {
2023                partition: cfg.value_partition.clone(),
2024                compression: cfg.compression,
2025                codec_config: (),
2026                write_buffer: cfg.value_write_buffer,
2027            };
2028            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2029                .await
2030                .expect("Failed to init glob");
2031
2032            for section in 2u64..=4 {
2033                let orphan_value: TestValue = [section as u8; 16];
2034                glob.append(section, &orphan_value)
2035                    .await
2036                    .expect("Failed to append orphan");
2037                glob.sync(section).await.expect("Failed to sync glob");
2038            }
2039            drop(glob);
2040
2041            // Reinitialize - should detect and remove all orphan sections
2042            let oversized: Oversized<_, TestEntry, TestValue> =
2043                Oversized::init(context.with_label("second"), cfg.clone())
2044                    .await
2045                    .expect("Failed to reinit");
2046
2047            // Section 1 should still be valid
2048            assert!(oversized.get(1, 0).await.is_ok());
2049
2050            // Newest section should be 1 (orphans removed)
2051            assert_eq!(oversized.newest_section(), Some(1));
2052
2053            oversized.destroy().await.expect("Failed to destroy");
2054        });
2055    }
2056
2057    #[test_traced]
2058    fn test_recovery_index_empty_but_values_exist() {
2059        let executor = deterministic::Runner::default();
2060        executor.start(|context| async move {
2061            let cfg = test_cfg();
2062
2063            // Manually create value sections without any index entries
2064            let glob_cfg = GlobConfig {
2065                partition: cfg.value_partition.clone(),
2066                compression: cfg.compression,
2067                codec_config: (),
2068                write_buffer: cfg.value_write_buffer,
2069            };
2070            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2071                .await
2072                .expect("Failed to init glob");
2073
2074            for section in 1u64..=3 {
2075                let orphan_value: TestValue = [section as u8; 16];
2076                glob.append(section, &orphan_value)
2077                    .await
2078                    .expect("Failed to append orphan");
2079                glob.sync(section).await.expect("Failed to sync glob");
2080            }
2081            drop(glob);
2082
2083            // Initialize oversized - should remove all orphan value sections
2084            let oversized: Oversized<_, TestEntry, TestValue> =
2085                Oversized::init(context.with_label("first"), cfg.clone())
2086                    .await
2087                    .expect("Failed to init");
2088
2089            // No sections should exist
2090            assert_eq!(oversized.newest_section(), None);
2091            assert_eq!(oversized.oldest_section(), None);
2092
2093            oversized.destroy().await.expect("Failed to destroy");
2094        });
2095    }
2096
2097    #[test_traced]
2098    fn test_recovery_orphan_section_append_after() {
2099        let executor = deterministic::Runner::default();
2100        executor.start(|context| async move {
2101            let cfg = test_cfg();
2102
2103            // Create and populate with section 1
2104            let mut oversized: Oversized<_, TestEntry, TestValue> =
2105                Oversized::init(context.with_label("first"), cfg.clone())
2106                    .await
2107                    .expect("Failed to init");
2108
2109            let value: TestValue = [1; 16];
2110            let entry = TestEntry::new(1, 0, 0);
2111            let (_, offset1, size1) = oversized
2112                .append(1, entry, &value)
2113                .await
2114                .expect("Failed to append");
2115            oversized.sync(1).await.expect("Failed to sync");
2116            drop(oversized);
2117
2118            // Manually create orphan value sections (2, 3)
2119            let glob_cfg = GlobConfig {
2120                partition: cfg.value_partition.clone(),
2121                compression: cfg.compression,
2122                codec_config: (),
2123                write_buffer: cfg.value_write_buffer,
2124            };
2125            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2126                .await
2127                .expect("Failed to init glob");
2128
2129            for section in 2u64..=3 {
2130                let orphan_value: TestValue = [section as u8; 16];
2131                glob.append(section, &orphan_value)
2132                    .await
2133                    .expect("Failed to append orphan");
2134                glob.sync(section).await.expect("Failed to sync glob");
2135            }
2136            drop(glob);
2137
2138            // Reinitialize - should remove orphan sections
2139            let mut oversized: Oversized<_, TestEntry, TestValue> =
2140                Oversized::init(context.with_label("second"), cfg.clone())
2141                    .await
2142                    .expect("Failed to reinit");
2143
2144            // Section 1 should still be valid
2145            let entry = oversized.get(1, 0).await.expect("Failed to get");
2146            assert_eq!(entry.id, 1);
2147            let value = oversized
2148                .get_value(1, offset1, size1)
2149                .await
2150                .expect("Failed to get value");
2151            assert_eq!(value, [1; 16]);
2152
2153            // Should be able to append to section 2 after recovery
2154            let new_value: TestValue = [42; 16];
2155            let new_entry = TestEntry::new(42, 0, 0);
2156            let (pos, offset, size) = oversized
2157                .append(2, new_entry, &new_value)
2158                .await
2159                .expect("Failed to append after recovery");
2160            assert_eq!(pos, 0);
2161
2162            // Verify the new entry
2163            let retrieved = oversized.get(2, 0).await.expect("Failed to get");
2164            assert_eq!(retrieved.id, 42);
2165            let retrieved_value = oversized
2166                .get_value(2, offset, size)
2167                .await
2168                .expect("Failed to get value");
2169            assert_eq!(retrieved_value, new_value);
2170
2171            // Sync and restart to verify persistence
2172            oversized.sync(2).await.expect("Failed to sync");
2173            drop(oversized);
2174
2175            let oversized: Oversized<_, TestEntry, TestValue> =
2176                Oversized::init(context.with_label("third"), cfg)
2177                    .await
2178                    .expect("Failed to reinit after append");
2179
2180            // Both sections should be valid
2181            assert!(oversized.get(1, 0).await.is_ok());
2182            assert!(oversized.get(2, 0).await.is_ok());
2183            assert_eq!(oversized.newest_section(), Some(2));
2184
2185            oversized.destroy().await.expect("Failed to destroy");
2186        });
2187    }
2188
2189    #[test_traced]
2190    fn test_recovery_no_orphan_sections() {
2191        let executor = deterministic::Runner::default();
2192        executor.start(|context| async move {
2193            let cfg = test_cfg();
2194
2195            // Create and populate with sections 1, 2, 3 (no orphans)
2196            let mut oversized: Oversized<_, TestEntry, TestValue> =
2197                Oversized::init(context.with_label("first"), cfg.clone())
2198                    .await
2199                    .expect("Failed to init");
2200
2201            for section in 1u64..=3 {
2202                let value: TestValue = [section as u8; 16];
2203                let entry = TestEntry::new(section, 0, 0);
2204                oversized
2205                    .append(section, entry, &value)
2206                    .await
2207                    .expect("Failed to append");
2208                oversized.sync(section).await.expect("Failed to sync");
2209            }
2210            drop(oversized);
2211
2212            // Reinitialize - no orphan cleanup needed
2213            let oversized: Oversized<_, TestEntry, TestValue> =
2214                Oversized::init(context.with_label("second"), cfg)
2215                    .await
2216                    .expect("Failed to reinit");
2217
2218            // All sections should be valid
2219            for section in 1u64..=3 {
2220                let entry = oversized.get(section, 0).await.expect("Failed to get");
2221                assert_eq!(entry.id, section);
2222            }
2223            assert_eq!(oversized.newest_section(), Some(3));
2224
2225            oversized.destroy().await.expect("Failed to destroy");
2226        });
2227    }
2228
2229    #[test_traced]
2230    fn test_recovery_orphan_with_empty_index_section() {
2231        let executor = deterministic::Runner::default();
2232        executor.start(|context| async move {
2233            let cfg = test_cfg();
2234
2235            // Create and populate section 1 with entries
2236            let mut oversized: Oversized<_, TestEntry, TestValue> =
2237                Oversized::init(context.with_label("first"), cfg.clone())
2238                    .await
2239                    .expect("Failed to init");
2240
2241            let value: TestValue = [1; 16];
2242            let entry = TestEntry::new(1, 0, 0);
2243            oversized
2244                .append(1, entry, &value)
2245                .await
2246                .expect("Failed to append");
2247            oversized.sync(1).await.expect("Failed to sync");
2248            drop(oversized);
2249
2250            // Manually create orphan value section 2
2251            let glob_cfg = GlobConfig {
2252                partition: cfg.value_partition.clone(),
2253                compression: cfg.compression,
2254                codec_config: (),
2255                write_buffer: cfg.value_write_buffer,
2256            };
2257            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2258                .await
2259                .expect("Failed to init glob");
2260            let orphan_value: TestValue = [2; 16];
2261            glob.append(2, &orphan_value)
2262                .await
2263                .expect("Failed to append orphan");
2264            glob.sync(2).await.expect("Failed to sync glob");
2265            drop(glob);
2266
2267            // Now truncate index section 1 to 0 (making it empty but still tracked)
2268            let (blob, _) = context
2269                .open(&cfg.index_partition, &1u64.to_be_bytes())
2270                .await
2271                .expect("Failed to open blob");
2272            blob.resize(0).await.expect("Failed to truncate");
2273            blob.sync().await.expect("Failed to sync");
2274            drop(blob);
2275
2276            // Reinitialize - should handle empty index section and remove orphan value section
2277            let oversized: Oversized<_, TestEntry, TestValue> =
2278                Oversized::init(context.with_label("second"), cfg)
2279                    .await
2280                    .expect("Failed to reinit");
2281
2282            // Section 1 should exist but have no entries (empty after truncation)
2283            assert!(oversized.get(1, 0).await.is_err());
2284
2285            // Orphan section 2 should be removed
2286            assert_eq!(oversized.newest_section(), Some(1));
2287
2288            oversized.destroy().await.expect("Failed to destroy");
2289        });
2290    }
2291
2292    #[test_traced]
2293    fn test_recovery_orphan_sections_with_gaps() {
2294        // Test non-contiguous sections: index has [1, 3, 5], values has [1, 2, 3, 4, 5, 6]
2295        // Orphan sections 2, 4, 6 should be removed
2296        let executor = deterministic::Runner::default();
2297        executor.start(|context| async move {
2298            let cfg = test_cfg();
2299
2300            // Create index with sections 1, 3, 5 (gaps)
2301            let mut oversized: Oversized<_, TestEntry, TestValue> =
2302                Oversized::init(context.with_label("first"), cfg.clone())
2303                    .await
2304                    .expect("Failed to init");
2305
2306            for section in [1u64, 3, 5] {
2307                let value: TestValue = [section as u8; 16];
2308                let entry = TestEntry::new(section, 0, 0);
2309                oversized
2310                    .append(section, entry, &value)
2311                    .await
2312                    .expect("Failed to append");
2313                oversized.sync(section).await.expect("Failed to sync");
2314            }
2315            drop(oversized);
2316
2317            // Manually create orphan value sections 2, 4, 6 (filling gaps and beyond)
2318            let glob_cfg = GlobConfig {
2319                partition: cfg.value_partition.clone(),
2320                compression: cfg.compression,
2321                codec_config: (),
2322                write_buffer: cfg.value_write_buffer,
2323            };
2324            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2325                .await
2326                .expect("Failed to init glob");
2327
2328            for section in [2u64, 4, 6] {
2329                let orphan_value: TestValue = [section as u8; 16];
2330                glob.append(section, &orphan_value)
2331                    .await
2332                    .expect("Failed to append orphan");
2333                glob.sync(section).await.expect("Failed to sync glob");
2334            }
2335            drop(glob);
2336
2337            // Reinitialize - should remove orphan sections 2, 4, 6
2338            let oversized: Oversized<_, TestEntry, TestValue> =
2339                Oversized::init(context.with_label("second"), cfg)
2340                    .await
2341                    .expect("Failed to reinit");
2342
2343            // Sections 1, 3, 5 should still be valid
2344            for section in [1u64, 3, 5] {
2345                let entry = oversized.get(section, 0).await.expect("Failed to get");
2346                assert_eq!(entry.id, section);
2347            }
2348
2349            // Verify only sections 1, 3, 5 exist (orphans removed)
2350            assert_eq!(oversized.oldest_section(), Some(1));
2351            assert_eq!(oversized.newest_section(), Some(5));
2352
2353            oversized.destroy().await.expect("Failed to destroy");
2354        });
2355    }
2356
2357    #[test_traced]
2358    fn test_recovery_glob_trailing_garbage_truncated() {
2359        // Tests the bug fix: when value is written to glob but index entry isn't
2360        // (crash after value write, before index write), recovery should truncate
2361        // the glob trailing garbage so subsequent appends start at correct offset.
2362        let executor = deterministic::Runner::default();
2363        executor.start(|context| async move {
2364            let cfg = test_cfg();
2365
2366            // Create and populate
2367            let mut oversized: Oversized<_, TestEntry, TestValue> =
2368                Oversized::init(context.with_label("first"), cfg.clone())
2369                    .await
2370                    .expect("Failed to init");
2371
2372            // Append 2 entries
2373            let mut locations = Vec::new();
2374            for i in 0..2u8 {
2375                let value: TestValue = [i; 16];
2376                let entry = TestEntry::new(i as u64, 0, 0);
2377                let loc = oversized
2378                    .append(1, entry, &value)
2379                    .await
2380                    .expect("Failed to append");
2381                locations.push(loc);
2382            }
2383            oversized.sync(1).await.expect("Failed to sync");
2384
2385            // Record where next entry SHOULD start (end of entry 1)
2386            let expected_next_offset = byte_end(locations[1].1, locations[1].2);
2387            drop(oversized);
2388
2389            // Simulate crash: write garbage to glob (simulating partial value write)
2390            let (blob, size) = context
2391                .open(&cfg.value_partition, &1u64.to_be_bytes())
2392                .await
2393                .expect("Failed to open blob");
2394            assert_eq!(size, expected_next_offset);
2395
2396            // Write 100 bytes of garbage (simulating partial/failed value write)
2397            let garbage = vec![0xDE; 100];
2398            blob.write_at(size, garbage)
2399                .await
2400                .expect("Failed to write garbage");
2401            blob.sync().await.expect("Failed to sync");
2402            drop(blob);
2403
2404            // Verify glob now has trailing garbage
2405            let (blob, new_size) = context
2406                .open(&cfg.value_partition, &1u64.to_be_bytes())
2407                .await
2408                .expect("Failed to open blob");
2409            assert_eq!(new_size, expected_next_offset + 100);
2410            drop(blob);
2411
2412            // Reinitialize - should truncate the trailing garbage
2413            let mut oversized: Oversized<_, TestEntry, TestValue> =
2414                Oversized::init(context.with_label("second"), cfg.clone())
2415                    .await
2416                    .expect("Failed to reinit");
2417
2418            // First 2 entries should still be valid
2419            for i in 0..2u8 {
2420                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2421                assert_eq!(entry.id, i as u64);
2422            }
2423
2424            // Append new entry - should start at expected_next_offset, NOT at garbage end
2425            let new_value: TestValue = [99; 16];
2426            let new_entry = TestEntry::new(99, 0, 0);
2427            let (pos, offset, _size) = oversized
2428                .append(1, new_entry, &new_value)
2429                .await
2430                .expect("Failed to append after recovery");
2431
2432            // Verify position is 2 (after the 2 existing entries)
2433            assert_eq!(pos, 2);
2434
2435            // Verify offset is at expected_next_offset (garbage was truncated)
2436            assert_eq!(offset, expected_next_offset);
2437
2438            // Verify we can read the new entry
2439            let retrieved = oversized.get(1, 2).await.expect("Failed to get new entry");
2440            assert_eq!(retrieved.id, 99);
2441
2442            oversized.destroy().await.expect("Failed to destroy");
2443        });
2444    }
2445
2446    #[test_traced]
2447    fn test_recovery_entry_with_overflow_offset() {
2448        // Tests that an entry with offset near u64::MAX that would overflow
2449        // when added to size is detected as invalid during recovery.
2450        let executor = deterministic::Runner::default();
2451        executor.start(|context| async move {
2452            // Use page size = entry size so one entry per page
2453            let cfg = Config {
2454                index_partition: "test_index".to_string(),
2455                value_partition: "test_values".to_string(),
2456                index_page_cache: CacheRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
2457                index_write_buffer: NZUsize!(1024),
2458                value_write_buffer: NZUsize!(1024),
2459                compression: None,
2460                codec_config: (),
2461            };
2462
2463            // Create and populate with valid entry
2464            let mut oversized: Oversized<_, TestEntry, TestValue> =
2465                Oversized::init(context.with_label("first"), cfg.clone())
2466                    .await
2467                    .expect("Failed to init");
2468
2469            let value: TestValue = [1; 16];
2470            let entry = TestEntry::new(1, 0, 0);
2471            oversized
2472                .append(1, entry, &value)
2473                .await
2474                .expect("Failed to append");
2475            oversized.sync(1).await.expect("Failed to sync");
2476            drop(oversized);
2477
2478            // Build a corrupted entry with offset near u64::MAX that would overflow.
2479            // We need to write a valid page (with correct page-level CRC) containing
2480            // the semantically-invalid entry data.
2481            let (blob, _) = context
2482                .open(&cfg.index_partition, &1u64.to_be_bytes())
2483                .await
2484                .expect("Failed to open blob");
2485
2486            // Build entry data: id (8) + value_offset (8) + value_size (4) = 20 bytes
2487            let mut entry_data = Vec::new();
2488            1u64.write(&mut entry_data); // id
2489            (u64::MAX - 10).write(&mut entry_data); // value_offset (near max)
2490            100u32.write(&mut entry_data); // value_size (offset + size overflows)
2491            assert_eq!(entry_data.len(), TestEntry::SIZE);
2492
2493            // Build page-level CRC record (12 bytes):
2494            // len1 (2) + crc1 (4) + len2 (2) + crc2 (4)
2495            let crc = Crc32::checksum(&entry_data);
2496            let len1 = TestEntry::SIZE as u16;
2497            let mut crc_record = Vec::new();
2498            crc_record.extend_from_slice(&len1.to_be_bytes()); // len1
2499            crc_record.extend_from_slice(&crc.to_be_bytes()); // crc1
2500            crc_record.extend_from_slice(&0u16.to_be_bytes()); // len2 (unused)
2501            crc_record.extend_from_slice(&0u32.to_be_bytes()); // crc2 (unused)
2502            assert_eq!(crc_record.len(), 12);
2503
2504            // Write the complete physical page: entry_data + crc_record
2505            let mut page = entry_data;
2506            page.extend_from_slice(&crc_record);
2507            blob.write_at(0, page)
2508                .await
2509                .expect("Failed to write corrupted page");
2510            blob.sync().await.expect("Failed to sync");
2511            drop(blob);
2512
2513            // Reinitialize - recovery should detect the invalid entry
2514            // (offset + size would overflow, and even with saturating_add it exceeds glob_size)
2515            let mut oversized: Oversized<_, TestEntry, TestValue> =
2516                Oversized::init(context.with_label("second"), cfg.clone())
2517                    .await
2518                    .expect("Failed to reinit");
2519
2520            // The corrupted entry should have been rewound (invalid)
2521            assert!(oversized.get(1, 0).await.is_err());
2522
2523            // Should be able to append after recovery
2524            let new_value: TestValue = [99; 16];
2525            let new_entry = TestEntry::new(99, 0, 0);
2526            let (pos, new_offset, _) = oversized
2527                .append(1, new_entry, &new_value)
2528                .await
2529                .expect("Failed to append after recovery");
2530
2531            // Position should be 0 (corrupted entry was removed)
2532            assert_eq!(pos, 0);
2533            // Offset should be 0 (glob was truncated to 0)
2534            assert_eq!(new_offset, 0);
2535
2536            oversized.destroy().await.expect("Failed to destroy");
2537        });
2538    }
2539
2540    #[test_traced]
2541    fn test_empty_section_persistence() {
2542        // Tests that sections that become empty (all entries removed/rewound)
2543        // are handled correctly across restart cycles.
2544        let executor = deterministic::Runner::default();
2545        executor.start(|context| async move {
2546            let cfg = test_cfg();
2547
2548            // Create and populate section 1 with entries
2549            let mut oversized: Oversized<_, TestEntry, TestValue> =
2550                Oversized::init(context.with_label("first"), cfg.clone())
2551                    .await
2552                    .expect("Failed to init");
2553
2554            for i in 0..3u8 {
2555                let value: TestValue = [i; 16];
2556                let entry = TestEntry::new(i as u64, 0, 0);
2557                oversized
2558                    .append(1, entry, &value)
2559                    .await
2560                    .expect("Failed to append");
2561            }
2562            oversized.sync(1).await.expect("Failed to sync");
2563
2564            // Also create section 2 to ensure it survives
2565            let value2: TestValue = [10; 16];
2566            let entry2 = TestEntry::new(10, 0, 0);
2567            oversized
2568                .append(2, entry2, &value2)
2569                .await
2570                .expect("Failed to append to section 2");
2571            oversized.sync(2).await.expect("Failed to sync section 2");
2572            drop(oversized);
2573
2574            // Truncate section 1's index to 0 (making it empty)
2575            let (blob, _) = context
2576                .open(&cfg.index_partition, &1u64.to_be_bytes())
2577                .await
2578                .expect("Failed to open blob");
2579            blob.resize(0).await.expect("Failed to truncate");
2580            blob.sync().await.expect("Failed to sync");
2581            drop(blob);
2582
2583            // First restart - recovery should handle empty section 1
2584            let mut oversized: Oversized<_, TestEntry, TestValue> =
2585                Oversized::init(context.with_label("second"), cfg.clone())
2586                    .await
2587                    .expect("Failed to reinit");
2588
2589            // Section 1 should exist but have no entries
2590            assert!(oversized.get(1, 0).await.is_err());
2591
2592            // Section 2 should still be valid
2593            let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2594            assert_eq!(entry.id, 10);
2595
2596            // Section 1 should still be tracked (blob exists but is empty)
2597            assert_eq!(oversized.oldest_section(), Some(1));
2598
2599            // Append to empty section 1
2600            // Note: When index is truncated to 0 but the index blob still exists,
2601            // the glob is NOT truncated (the section isn't considered an orphan).
2602            // The glob still has orphan DATA from the old entries, but this doesn't
2603            // affect correctness - new entries simply append after the orphan data.
2604            let new_value: TestValue = [99; 16];
2605            let new_entry = TestEntry::new(99, 0, 0);
2606            let (pos, offset, size) = oversized
2607                .append(1, new_entry, &new_value)
2608                .await
2609                .expect("Failed to append to empty section");
2610            assert_eq!(pos, 0);
2611            // Glob offset is non-zero because orphan data wasn't truncated
2612            assert!(offset > 0);
2613            oversized.sync(1).await.expect("Failed to sync");
2614
2615            // Verify the new entry is readable despite orphan data before it
2616            let entry = oversized.get(1, 0).await.expect("Failed to get");
2617            assert_eq!(entry.id, 99);
2618            let value = oversized
2619                .get_value(1, offset, size)
2620                .await
2621                .expect("Failed to get value");
2622            assert_eq!(value, new_value);
2623
2624            drop(oversized);
2625
2626            // Second restart - verify persistence
2627            let oversized: Oversized<_, TestEntry, TestValue> =
2628                Oversized::init(context.with_label("third"), cfg.clone())
2629                    .await
2630                    .expect("Failed to reinit again");
2631
2632            // Section 1's new entry should be valid
2633            let entry = oversized.get(1, 0).await.expect("Failed to get");
2634            assert_eq!(entry.id, 99);
2635
2636            // Section 2 should still be valid
2637            let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
2638            assert_eq!(entry.id, 10);
2639
2640            oversized.destroy().await.expect("Failed to destroy");
2641        });
2642    }
2643
2644    #[test_traced]
2645    fn test_get_value_size_equals_crc_size() {
2646        // Tests the boundary condition where size = 4 (just CRC, no data).
2647        // This should fail because there's no actual data to decode.
2648        let executor = deterministic::Runner::default();
2649        executor.start(|context| async move {
2650            let mut oversized: Oversized<_, TestEntry, TestValue> =
2651                Oversized::init(context.clone(), test_cfg())
2652                    .await
2653                    .expect("Failed to init");
2654
2655            let value: TestValue = [42; 16];
2656            let entry = TestEntry::new(1, 0, 0);
2657            let (_, offset, _) = oversized
2658                .append(1, entry, &value)
2659                .await
2660                .expect("Failed to append");
2661            oversized.sync(1).await.expect("Failed to sync");
2662
2663            // Size = 4 (exactly CRC_SIZE) means 0 bytes of actual data
2664            // This should fail with ChecksumMismatch or decode error
2665            let result = oversized.get_value(1, offset, 4).await;
2666            assert!(result.is_err());
2667
2668            oversized.destroy().await.expect("Failed to destroy");
2669        });
2670    }
2671
2672    #[test_traced]
2673    fn test_get_value_size_just_over_crc() {
2674        // Tests size = 5 (CRC + 1 byte of data).
2675        // This should fail because the data is too short to decode.
2676        let executor = deterministic::Runner::default();
2677        executor.start(|context| async move {
2678            let mut oversized: Oversized<_, TestEntry, TestValue> =
2679                Oversized::init(context.clone(), test_cfg())
2680                    .await
2681                    .expect("Failed to init");
2682
2683            let value: TestValue = [42; 16];
2684            let entry = TestEntry::new(1, 0, 0);
2685            let (_, offset, _) = oversized
2686                .append(1, entry, &value)
2687                .await
2688                .expect("Failed to append");
2689            oversized.sync(1).await.expect("Failed to sync");
2690
2691            // Size = 5 means 1 byte of actual data (after stripping CRC)
2692            // This should fail with checksum mismatch since we're reading wrong bytes
2693            let result = oversized.get_value(1, offset, 5).await;
2694            assert!(result.is_err());
2695
2696            oversized.destroy().await.expect("Failed to destroy");
2697        });
2698    }
2699
2700    #[test_traced]
2701    fn test_recovery_maximum_section_numbers() {
2702        // Test recovery with very large section numbers near u64::MAX to check
2703        // for overflow edge cases in section arithmetic.
2704        let executor = deterministic::Runner::default();
2705        executor.start(|context| async move {
2706            let cfg = test_cfg();
2707
2708            // Use section numbers near u64::MAX
2709            let large_sections = [u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
2710
2711            // Create and populate with large section numbers
2712            let mut oversized: Oversized<_, TestEntry, TestValue> =
2713                Oversized::init(context.with_label("first"), cfg.clone())
2714                    .await
2715                    .expect("Failed to init");
2716
2717            let mut locations = Vec::new();
2718            for &section in &large_sections {
2719                let value: TestValue = [(section & 0xFF) as u8; 16];
2720                let entry = TestEntry::new(section, 0, 0);
2721                let loc = oversized
2722                    .append(section, entry, &value)
2723                    .await
2724                    .expect("Failed to append");
2725                locations.push((section, loc));
2726                oversized.sync(section).await.expect("Failed to sync");
2727            }
2728            drop(oversized);
2729
2730            // Simulate crash: truncate glob for middle section
2731            let middle_section = large_sections[1];
2732            let (blob, size) = context
2733                .open(&cfg.value_partition, &middle_section.to_be_bytes())
2734                .await
2735                .expect("Failed to open blob");
2736            blob.resize(size / 2).await.expect("Failed to truncate");
2737            blob.sync().await.expect("Failed to sync");
2738            drop(blob);
2739
2740            // Reinitialize - should recover without overflow panics
2741            let oversized: Oversized<_, TestEntry, TestValue> =
2742                Oversized::init(context.with_label("second"), cfg.clone())
2743                    .await
2744                    .expect("Failed to reinit");
2745
2746            // First and last sections should still be valid
2747            let entry = oversized
2748                .get(large_sections[0], 0)
2749                .await
2750                .expect("Failed to get first section");
2751            assert_eq!(entry.id, large_sections[0]);
2752
2753            let entry = oversized
2754                .get(large_sections[2], 0)
2755                .await
2756                .expect("Failed to get last section");
2757            assert_eq!(entry.id, large_sections[2]);
2758
2759            // Middle section should have been rewound (no entries)
2760            assert!(oversized.get(middle_section, 0).await.is_err());
2761
2762            // Verify we can still append to these large sections
2763            let new_value: TestValue = [0xAB; 16];
2764            let new_entry = TestEntry::new(999, 0, 0);
2765            let mut oversized = oversized;
2766            oversized
2767                .append(middle_section, new_entry, &new_value)
2768                .await
2769                .expect("Failed to append after recovery");
2770
2771            oversized.destroy().await.expect("Failed to destroy");
2772        });
2773    }
2774
2775    #[test_traced]
2776    fn test_recovery_crash_during_recovery_rewind() {
2777        // Tests a nested crash scenario: initial crash leaves inconsistent state,
2778        // then a second crash occurs during recovery's rewind operation.
2779        // This simulates the worst-case where recovery itself is interrupted.
2780        let executor = deterministic::Runner::default();
2781        executor.start(|context| async move {
2782            let cfg = test_cfg();
2783
2784            // Phase 1: Create valid data with 5 entries
2785            let mut oversized: Oversized<_, TestEntry, TestValue> =
2786                Oversized::init(context.with_label("first"), cfg.clone())
2787                    .await
2788                    .expect("Failed to init");
2789
2790            let mut locations = Vec::new();
2791            for i in 0..5u8 {
2792                let value: TestValue = [i; 16];
2793                let entry = TestEntry::new(i as u64, 0, 0);
2794                let loc = oversized
2795                    .append(1, entry, &value)
2796                    .await
2797                    .expect("Failed to append");
2798                locations.push(loc);
2799            }
2800            oversized.sync(1).await.expect("Failed to sync");
2801            drop(oversized);
2802
2803            // Phase 2: Simulate first crash - truncate glob to lose last 2 entries
2804            let (blob, _) = context
2805                .open(&cfg.value_partition, &1u64.to_be_bytes())
2806                .await
2807                .expect("Failed to open blob");
2808            let keep_size = byte_end(locations[2].1, locations[2].2);
2809            blob.resize(keep_size).await.expect("Failed to truncate");
2810            blob.sync().await.expect("Failed to sync");
2811            drop(blob);
2812
2813            // Phase 3: Simulate crash during recovery's rewind
2814            // Recovery would try to rewind index from 5 entries to 3 entries.
2815            // Simulate partial rewind by manually truncating index to 4 entries
2816            // (as if crash occurred mid-rewind).
2817            let chunk_size = FixedJournal::<deterministic::Context, TestEntry>::CHUNK_SIZE as u64;
2818            let (index_blob, _) = context
2819                .open(&cfg.index_partition, &1u64.to_be_bytes())
2820                .await
2821                .expect("Failed to open index blob");
2822            let partial_rewind_size = 4 * chunk_size; // 4 entries instead of 3
2823            index_blob
2824                .resize(partial_rewind_size)
2825                .await
2826                .expect("Failed to resize");
2827            index_blob.sync().await.expect("Failed to sync");
2828            drop(index_blob);
2829
2830            // Phase 4: Second recovery attempt should handle the inconsistent state
2831            // Index has 4 entries, but glob only supports 3.
2832            let oversized: Oversized<_, TestEntry, TestValue> =
2833                Oversized::init(context.with_label("second"), cfg.clone())
2834                    .await
2835                    .expect("Failed to reinit after nested crash");
2836
2837            // Only first 3 entries should be valid (recovery should rewind again)
2838            for i in 0..3u8 {
2839                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
2840                assert_eq!(entry.id, i as u64);
2841
2842                let (_, offset, size) = locations[i as usize];
2843                let value = oversized
2844                    .get_value(1, offset, size)
2845                    .await
2846                    .expect("Failed to get value");
2847                assert_eq!(value, [i; 16]);
2848            }
2849
2850            // Entry 3 should not exist (index was rewound to match glob)
2851            assert!(oversized.get(1, 3).await.is_err());
2852
2853            // Verify append works after nested crash recovery
2854            let new_value: TestValue = [0xFF; 16];
2855            let new_entry = TestEntry::new(100, 0, 0);
2856            let mut oversized = oversized;
2857            let (pos, offset, _size) = oversized
2858                .append(1, new_entry, &new_value)
2859                .await
2860                .expect("Failed to append");
2861            assert_eq!(pos, 3); // Should be position 3 (after the 3 valid entries)
2862
2863            // Verify the offset starts where entry 2 ended (no gaps)
2864            assert_eq!(offset, byte_end(locations[2].1, locations[2].2));
2865
2866            oversized.destroy().await.expect("Failed to destroy");
2867        });
2868    }
2869
2870    #[test_traced]
2871    fn test_recovery_crash_during_orphan_cleanup() {
2872        // Tests crash during orphan section cleanup: recovery starts removing
2873        // orphan value sections, but crashes mid-cleanup.
2874        let executor = deterministic::Runner::default();
2875        executor.start(|context| async move {
2876            let cfg = test_cfg();
2877
2878            // Phase 1: Create valid data in section 1
2879            let mut oversized: Oversized<_, TestEntry, TestValue> =
2880                Oversized::init(context.with_label("first"), cfg.clone())
2881                    .await
2882                    .expect("Failed to init");
2883
2884            let value: TestValue = [1; 16];
2885            let entry = TestEntry::new(1, 0, 0);
2886            let (_, offset1, size1) = oversized
2887                .append(1, entry, &value)
2888                .await
2889                .expect("Failed to append");
2890            oversized.sync(1).await.expect("Failed to sync");
2891            drop(oversized);
2892
2893            // Phase 2: Create orphan value sections 2, 3, 4 (no index entries)
2894            let glob_cfg = GlobConfig {
2895                partition: cfg.value_partition.clone(),
2896                compression: cfg.compression,
2897                codec_config: (),
2898                write_buffer: cfg.value_write_buffer,
2899            };
2900            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2901                .await
2902                .expect("Failed to init glob");
2903
2904            for section in 2u64..=4 {
2905                let orphan_value: TestValue = [section as u8; 16];
2906                glob.append(section, &orphan_value)
2907                    .await
2908                    .expect("Failed to append orphan");
2909                glob.sync(section).await.expect("Failed to sync glob");
2910            }
2911            drop(glob);
2912
2913            // Phase 3: Simulate partial orphan cleanup (section 2 removed, 3 and 4 remain)
2914            // This simulates a crash during cleanup_orphan_value_sections()
2915            context
2916                .remove(&cfg.value_partition, Some(&2u64.to_be_bytes()))
2917                .await
2918                .expect("Failed to remove section 2");
2919
2920            // Phase 4: Recovery should complete the cleanup
2921            let mut oversized: Oversized<_, TestEntry, TestValue> =
2922                Oversized::init(context.with_label("second"), cfg.clone())
2923                    .await
2924                    .expect("Failed to reinit");
2925
2926            // Section 1 should still be valid
2927            let entry = oversized.get(1, 0).await.expect("Failed to get");
2928            assert_eq!(entry.id, 1);
2929            let value = oversized
2930                .get_value(1, offset1, size1)
2931                .await
2932                .expect("Failed to get value");
2933            assert_eq!(value, [1; 16]);
2934
2935            // No orphan sections should remain
2936            assert_eq!(oversized.oldest_section(), Some(1));
2937            assert_eq!(oversized.newest_section(), Some(1));
2938
2939            // Should be able to append to section 2 (now clean)
2940            let new_value: TestValue = [42; 16];
2941            let new_entry = TestEntry::new(42, 0, 0);
2942            let (pos, _, _) = oversized
2943                .append(2, new_entry, &new_value)
2944                .await
2945                .expect("Failed to append to section 2");
2946            assert_eq!(pos, 0); // First entry in new section
2947
2948            oversized.destroy().await.expect("Failed to destroy");
2949        });
2950    }
2951}