commonware_storage/journal/segmented/
oversized.rs

1//! Segmented journal for oversized values.
2//!
3//! This module combines [super::fixed::Journal] with [super::glob::Glob] to handle
4//! entries that reference variable-length "oversized" values. It provides coordinated
5//! operations and built-in crash recovery.
6//!
7//! # Architecture
8//!
9//! ```text
10//! +-------------------+     +-------------------+
11//! | Fixed Journal     |     | Glob (Values)     |
12//! | (Index Entries)   |     |                   |
13//! +-------------------+     +-------------------+
14//! | entry_0           | --> | value_0           |
15//! | entry_1           | --> | value_1           |
16//! | ...               |     | ...               |
17//! +-------------------+     +-------------------+
18//! ```
19//!
20//! Each index entry contains `(value_offset, value_size)` pointing to its value in glob.
21//!
22//! # Crash Recovery
23//!
24//! On unclean shutdown, the index journal and glob may have different lengths:
25//! - Index entry pointing to non-existent glob data (dangerous)
26//! - Glob value without index entry (orphan - acceptable but cleaned up)
27//! - Glob sections without corresponding index sections (orphan sections - removed)
28//!
29//! During initialization, crash recovery is performed:
30//! 1. Each index entry's glob reference is validated (`value_offset + value_size <= glob_size`)
31//! 2. Invalid entries are skipped and the index journal is rewound
32//! 3. Orphan value sections (sections in glob but not in index) are removed
33//!
34//! This allows async writes (glob first, then index) while ensuring consistency
35//! after recovery.
36//!
37//! _Recovery only validates that index entries point to valid byte ranges
38//! within the glob. It does **not** verify value checksums during recovery (this would
39//! require reading all values). Value checksums are verified lazily when values are
40//! read via `get_value()`. If the underlying storage is corrupted, `get_value()` will
41//! return a checksum error even though the index entry exists._
42
43use super::{
44    fixed::{Config as FixedConfig, Journal as FixedJournal},
45    glob::{Config as GlobConfig, Glob},
46};
47use crate::journal::Error;
48use commonware_codec::{Codec, CodecFixed, CodecShared};
49use commonware_runtime::{Metrics, Storage};
50use futures::{future::try_join, stream::Stream};
51use std::{collections::HashSet, num::NonZeroUsize};
52use tracing::{debug, warn};
53
/// Trait for index entries that reference oversized values in glob storage.
///
/// Implementations must expose the referenced value's location (for crash
/// recovery validation during [Oversized::init]) and accept a new location
/// (stamped by [Oversized::append] after the value is written to glob storage).
pub trait Record: CodecFixed<Cfg = ()> + Clone {
    /// Returns `(value_offset, value_size)` for crash recovery validation.
    ///
    /// `value_offset` is the byte offset of the value within its glob section;
    /// `value_size` is the stored size in bytes (as reported by glob append).
    fn value_location(&self) -> (u64, u32);

    /// Returns a new entry with the value location set.
    ///
    /// Called during `append` after the value is written to glob storage.
    fn with_location(self, offset: u64, size: u32) -> Self;
}
67
/// Configuration for oversized journal.
///
/// `C` is the codec configuration type for the stored values.
#[derive(Clone)]
pub struct Config<C> {
    /// Partition for the fixed index journal.
    pub index_partition: String,

    /// Partition for the glob value storage.
    ///
    /// Must differ from `index_partition` so the two journals do not collide.
    pub value_partition: String,

    /// Buffer pool for index journal caching.
    pub index_buffer_pool: commonware_runtime::buffer::PoolRef,

    /// Write buffer size for the index journal.
    pub index_write_buffer: NonZeroUsize,

    /// Write buffer size for the value journal.
    pub value_write_buffer: NonZeroUsize,

    /// Optional compression level for values (using zstd).
    pub compression: Option<u8>,

    /// Codec configuration for values.
    pub codec_config: C,
}
92
/// Segmented journal for entries with oversized values.
///
/// Combines a fixed-size index journal with glob storage for variable-length values.
/// Provides coordinated operations and crash recovery.
pub struct Oversized<E: Storage + Metrics, I: Record, V: Codec> {
    /// Fixed-size index journal; each entry references a value in `values`.
    index: FixedJournal<E, I>,
    /// Glob storage holding the variable-length values.
    values: Glob<E, V>,
}
101
102impl<E: Storage + Metrics, I: Record + Send + Sync, V: CodecShared> Oversized<E, I, V> {
103    /// Initialize with crash recovery validation.
104    ///
105    /// Validates each index entry's glob reference during replay. Invalid entries
106    /// (pointing beyond glob size) are skipped, and the index journal is rewound
107    /// to exclude trailing invalid entries.
108    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109        // Initialize both journals
110        let index_cfg = FixedConfig {
111            partition: cfg.index_partition,
112            buffer_pool: cfg.index_buffer_pool,
113            write_buffer: cfg.index_write_buffer,
114        };
115        let index = FixedJournal::init(context.with_label("index"), index_cfg).await?;
116
117        let value_cfg = GlobConfig {
118            partition: cfg.value_partition,
119            compression: cfg.compression,
120            codec_config: cfg.codec_config,
121            write_buffer: cfg.value_write_buffer,
122        };
123        let values = Glob::init(context.with_label("values"), value_cfg).await?;
124
125        let mut oversized = Self { index, values };
126
127        // Perform crash recovery validation
128        oversized.recover().await?;
129
130        Ok(oversized)
131    }
132
    /// Perform crash recovery by validating index entries against glob sizes.
    ///
    /// For each index section: truncates any trailing partial index entry,
    /// then scans backwards from the last entry until a valid one is found
    /// (see [Self::find_last_valid_entry]). Because entries are appended
    /// sequentially and value offsets are monotonically increasing within a
    /// section, all entries before the last valid one must be valid too. The
    /// index is rewound past invalid entries, glob trailing garbage is
    /// truncated, and orphan glob sections are removed.
    async fn recover(&mut self) -> Result<(), Error> {
        let chunk_size = FixedJournal::<E, I>::CHUNK_SIZE as u64;
        // Snapshot sections first: we mutate the journals while iterating.
        let sections: Vec<u64> = self.index.sections().collect();

        for section in sections {
            let index_size = self.index.size(section).await?;
            if index_size == 0 {
                continue;
            }

            let glob_size = match self.values.size(section).await {
                Ok(size) => size,
                Err(Error::AlreadyPrunedToSection(oldest)) => {
                    // This shouldn't happen in normal operation: prune() prunes the index
                    // first, then the glob. A crash between these would leave the glob
                    // NOT pruned (opposite of this case). We handle this defensively in
                    // case of external manipulation or future changes.
                    warn!(
                        section,
                        oldest, "index has section that glob already pruned"
                    );
                    0
                }
                Err(e) => return Err(e),
            };

            // Truncate any trailing partial entry (crash mid-write of an index entry)
            let entry_count = index_size / chunk_size;
            let aligned_size = entry_count * chunk_size;
            if aligned_size < index_size {
                warn!(
                    section,
                    index_size, aligned_size, "trailing bytes detected: truncating"
                );
                self.index.rewind_section(section, aligned_size).await?;
            }

            // If there is nothing left (index held only a partial entry), we can
            // exit early and rewind values to 0
            if entry_count == 0 {
                warn!(
                    section,
                    index_size, "trailing bytes detected: truncating to 0"
                );
                self.values.rewind_section(section, 0).await?;
                continue;
            }

            // Find last valid entry and target glob size
            let (valid_count, glob_target) = self
                .find_last_valid_entry(section, entry_count, glob_size)
                .await;

            // Rewind index if any entries are invalid
            if valid_count < entry_count {
                let valid_size = valid_count * chunk_size;
                debug!(section, entry_count, valid_count, "rewinding index");
                self.index.rewind_section(section, valid_size).await?;
            }

            // Truncate glob trailing garbage (can occur when value was written but
            // index entry wasn't, or when index was truncated but glob wasn't)
            if glob_size > glob_target {
                debug!(
                    section,
                    glob_size, glob_target, "truncating glob trailing garbage"
                );
                self.values.rewind_section(section, glob_target).await?;
            }
        }

        // Clean up orphan value sections that don't exist in index
        self.cleanup_orphan_value_sections().await?;

        Ok(())
    }
213
214    /// Remove any value sections that don't have corresponding index sections.
215    ///
216    /// This can happen if a crash occurs after writing to values but before
217    /// writing to index for a new section. Since sections don't have to be
218    /// contiguous, we compare the actual sets of sections rather than just
219    /// comparing the newest section numbers.
220    async fn cleanup_orphan_value_sections(&mut self) -> Result<(), Error> {
221        // Collect index sections into a set for O(1) lookup
222        let index_sections: HashSet<u64> = self.index.sections().collect();
223
224        // Find value sections that don't exist in index
225        let orphan_sections: Vec<u64> = self
226            .values
227            .sections()
228            .filter(|s| !index_sections.contains(s))
229            .collect();
230
231        // Remove each orphan section
232        for section in orphan_sections {
233            warn!(section, "removing orphan value section");
234            self.values.remove_section(section).await?;
235        }
236
237        Ok(())
238    }
239
240    /// Find the number of valid entries and the corresponding glob target size.
241    ///
242    /// Scans backwards from the last entry until a valid one is found.
243    /// Returns `(valid_count, glob_target)` where `glob_target` is the end offset
244    /// of the last valid entry's value.
245    async fn find_last_valid_entry(
246        &self,
247        section: u64,
248        entry_count: u64,
249        glob_size: u64,
250    ) -> (u64, u64) {
251        for pos in (0..entry_count).rev() {
252            match self.index.get(section, pos).await {
253                Ok(entry) => {
254                    let (offset, size) = entry.value_location();
255                    let entry_end = offset.saturating_add(u64::from(size));
256                    if entry_end <= glob_size {
257                        return (pos + 1, entry_end);
258                    }
259                    if pos == entry_count - 1 {
260                        warn!(
261                            section,
262                            pos, glob_size, entry_end, "invalid entry: glob truncated"
263                        );
264                    }
265                }
266                Err(_) => {
267                    if pos == entry_count - 1 {
268                        warn!(section, pos, "corrupted last entry, scanning backwards");
269                    }
270                }
271            }
272        }
273        (0, 0)
274    }
275
276    /// Append entry + value.
277    ///
278    /// Writes value to glob first, then writes index entry with the value location.
279    ///
280    /// Returns `(position, offset, size)` where:
281    /// - `position`: Position in the index journal
282    /// - `offset`: Byte offset in glob
283    /// - `size`: Size of value in glob (including checksum)
284    pub async fn append(
285        &mut self,
286        section: u64,
287        entry: I,
288        value: &V,
289    ) -> Result<(u64, u64, u32), Error> {
290        // Write value first (glob). This will typically write to an in-memory
291        // buffer and return quickly (only blocks when the buffer is full).
292        let (offset, size) = self.values.append(section, value).await?;
293
294        // Update entry with actual location and write to index
295        let entry_with_location = entry.with_location(offset, size);
296        let position = self.index.append(section, entry_with_location).await?;
297
298        Ok((position, offset, size))
299    }
300
    /// Get entry at position (index entry only, not value).
    ///
    /// Use [Self::get_value] with the entry's `value_location()` to fetch the value.
    pub async fn get(&self, section: u64, position: u64) -> Result<I, Error> {
        self.index.get(section, position).await
    }
305
    /// Get the last entry for a section, if any.
    ///
    /// Returns `Ok(None)` when the section contains no entries.
    pub async fn last(&self, section: u64) -> Result<Option<I>, Error> {
        self.index.last(section).await
    }
310
    /// Get value using offset/size from entry.
    ///
    /// The offset should be the byte offset from `append()` or from the entry's `value_location()`.
    /// The value's checksum is verified on read; corrupted storage surfaces here
    /// as a checksum error even if the index entry itself is intact.
    pub async fn get_value(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
        self.values.get(section, offset, size).await
    }
317
    /// Replay index entries starting from given section.
    ///
    /// Returns a stream of `(section, position, entry)` tuples, beginning at
    /// `start_position` within `start_section`. `buffer` controls the read
    /// buffer size used during replay. Only index entries are streamed;
    /// values must be fetched separately via [Self::get_value].
    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, I), Error>> + Send + '_, Error> {
        self.index
            .replay(start_section, start_position, buffer)
            .await
    }
331
332    /// Sync both journals for given section.
333    pub async fn sync(&self, section: u64) -> Result<(), Error> {
334        try_join(self.index.sync(section), self.values.sync(section))
335            .await
336            .map(|_| ())
337    }
338
339    /// Sync all sections.
340    pub async fn sync_all(&self) -> Result<(), Error> {
341        try_join(self.index.sync_all(), self.values.sync_all())
342            .await
343            .map(|_| ())
344    }
345
346    /// Prune both journals. Returns true if any sections were pruned.
347    ///
348    /// Prunes index first, then glob. This order ensures crash safety:
349    /// - If crash after index prune but before glob: orphan data in glob (acceptable)
350    /// - If crash before index prune: no change, retry works
351    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
352        let index_pruned = self.index.prune(min).await?;
353        let value_pruned = self.values.prune(min).await?;
354        Ok(index_pruned || value_pruned)
355    }
356
357    /// Rewind both journals to a specific section and index size.
358    ///
359    /// This rewinds the section to the given index size and removes all sections
360    /// after the given section. The value size is derived from the last entry.
361    pub async fn rewind(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
362        // Rewind index first (this also removes sections after `section`)
363        self.index.rewind(section, index_size).await?;
364
365        // Derive value size from last entry
366        let value_size = match self.index.last(section).await? {
367            Some(entry) => {
368                let (offset, size) = entry.value_location();
369                offset
370                    .checked_add(u64::from(size))
371                    .ok_or(Error::OffsetOverflow)?
372            }
373            None => 0,
374        };
375
376        // Rewind values (this also removes sections after `section`)
377        self.values.rewind(section, value_size).await
378    }
379
380    /// Rewind only the given section to a specific index size.
381    ///
382    /// Unlike `rewind`, this does not affect other sections.
383    /// The value size is derived from the last entry after rewinding the index.
384    pub async fn rewind_section(&mut self, section: u64, index_size: u64) -> Result<(), Error> {
385        // Rewind index first
386        self.index.rewind_section(section, index_size).await?;
387
388        // Derive value size from last entry
389        let value_size = match self.index.last(section).await? {
390            Some(entry) => {
391                let (offset, size) = entry.value_location();
392                offset
393                    .checked_add(u64::from(size))
394                    .ok_or(Error::OffsetOverflow)?
395            }
396            None => 0,
397        };
398
399        // Rewind values
400        self.values.rewind_section(section, value_size).await
401    }
402
    /// Get index size for checkpoint.
    ///
    /// Only the index size is needed for checkpoints; the value size can be
    /// derived from the last entry's location (see [Self::value_size]) when needed.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.index.size(section).await
    }
409
410    /// Get the value size for a section, derived from the last entry's location.
411    pub async fn value_size(&self, section: u64) -> Result<u64, Error> {
412        match self.index.last(section).await {
413            Ok(Some(entry)) => {
414                let (offset, size) = entry.value_location();
415                offset
416                    .checked_add(u64::from(size))
417                    .ok_or(Error::OffsetOverflow)
418            }
419            Ok(None) => Ok(0),
420            Err(Error::SectionOutOfRange(_)) => Ok(0),
421            Err(e) => Err(e),
422        }
423    }
424
    /// Returns the oldest section number, if any exist.
    ///
    /// Derived from the index journal (the authoritative set of sections).
    pub fn oldest_section(&self) -> Option<u64> {
        self.index.oldest_section()
    }
429
    /// Returns the newest section number, if any exist.
    ///
    /// Derived from the index journal (the authoritative set of sections).
    pub fn newest_section(&self) -> Option<u64> {
        self.index.newest_section()
    }
434
435    /// Destroy all underlying storage.
436    pub async fn destroy(self) -> Result<(), Error> {
437        try_join(self.index.destroy(), self.values.destroy())
438            .await
439            .map(|_| ())
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446    use bytes::{Buf, BufMut};
447    use commonware_codec::{FixedSize, Read, ReadExt, Write};
448    use commonware_cryptography::Crc32;
449    use commonware_macros::test_traced;
450    use commonware_runtime::{buffer::PoolRef, deterministic, Blob as _, Runner};
451    use commonware_utils::{NZUsize, NZU16};
452
453    /// Convert offset + size to byte end position (for truncation tests).
454    fn byte_end(offset: u64, size: u32) -> u64 {
455        offset + u64::from(size)
456    }
457
    /// Test index entry that stores a u64 id and references a value.
    #[derive(Debug, Clone, PartialEq)]
    struct TestEntry {
        // Arbitrary payload used to identify entries in assertions.
        id: u64,
        // Byte offset of the referenced value in its glob section.
        value_offset: u64,
        // Stored size of the referenced value in bytes.
        value_size: u32,
    }
465
    impl TestEntry {
        /// Construct an entry; tests pass `(0, 0)` for the location since
        /// `append` overwrites it via `with_location`.
        fn new(id: u64, value_offset: u64, value_size: u32) -> Self {
            Self {
                id,
                value_offset,
                value_size,
            }
        }
    }
475
    impl Write for TestEntry {
        // Field order must match `Read::read_cfg` below: id, offset, size.
        fn write(&self, buf: &mut impl BufMut) {
            self.id.write(buf);
            self.value_offset.write(buf);
            self.value_size.write(buf);
        }
    }
483
    impl Read for TestEntry {
        type Cfg = ();

        // Decodes fields in the same order `Write::write` emits them.
        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
            let id = u64::read(buf)?;
            let value_offset = u64::read(buf)?;
            let value_size = u32::read(buf)?;
            Ok(Self {
                id,
                value_offset,
                value_size,
            })
        }
    }
498
    impl FixedSize for TestEntry {
        // Sum of the encoded field sizes: id (u64) + offset (u64) + size (u32).
        const SIZE: usize = u64::SIZE + u64::SIZE + u32::SIZE;
    }
502
503    impl Record for TestEntry {
504        fn value_location(&self) -> (u64, u32) {
505            (self.value_offset, self.value_size)
506        }
507
508        fn with_location(mut self, offset: u64, size: u32) -> Self {
509            self.value_offset = offset;
510            self.value_size = size;
511            self
512        }
513    }
514
    /// Default test configuration: small buffers, no compression, unit codec config.
    fn test_cfg() -> Config<()> {
        Config {
            index_partition: "test_index".to_string(),
            value_partition: "test_values".to_string(),
            index_buffer_pool: PoolRef::new(NZU16!(64), NZUsize!(8)),
            index_write_buffer: NZUsize!(1024),
            value_write_buffer: NZUsize!(1024),
            compression: None,
            codec_config: (),
        }
    }
526
    /// Simple test value type with unit config (fixed-size 16-byte array).
    type TestValue = [u8; 16];
529
    // Round-trips a single (entry, value) pair: append, then read back the
    // index entry and the value independently.
    #[test_traced]
    fn test_oversized_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), test_cfg())
                    .await
                    .expect("Failed to init");

            // Append entry with value (location fields start at 0; append fills them in)
            let value: TestValue = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
            let entry = TestEntry::new(42, 0, 0);
            let (position, offset, size) = oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append");

            assert_eq!(position, 0);

            // Get entry
            let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
            assert_eq!(retrieved_entry.id, 42);

            // Get value
            let retrieved_value = oversized
                .get_value(1, offset, size)
                .await
                .expect("Failed to get value");
            assert_eq!(retrieved_value, value);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
563
    // Simulates a crash by truncating the glob blob directly, then verifies
    // recovery rewinds the index to the surviving prefix of entries.
    #[test_traced]
    fn test_oversized_crash_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate oversized journal
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            // Append multiple entries
            let mut locations = Vec::new();
            for i in 0..5u8 {
                let value: TestValue = [i; 16];
                let entry = TestEntry::new(i as u64, 0, 0);
                let (position, offset, size) = oversized
                    .append(1, entry, &value)
                    .await
                    .expect("Failed to append");
                locations.push((position, offset, size));
            }
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Simulate crash: truncate glob to lose last 2 values
            // (blob name is the big-endian section number)
            let (blob, _) = context
                .open(&cfg.value_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            // Calculate size to keep first 3 entries
            let keep_size = byte_end(locations[2].1, locations[2].2);
            blob.resize(keep_size).await.expect("Failed to truncate");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Reinitialize - should recover and rewind index
            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit");

            // First 3 entries should still be valid
            for i in 0..3u8 {
                let (position, offset, size) = locations[i as usize];
                let entry = oversized.get(1, position).await.expect("Failed to get");
                assert_eq!(entry.id, i as u64);

                let value = oversized
                    .get_value(1, offset, size)
                    .await
                    .expect("Failed to get value");
                assert_eq!(value, [i; 16]);
            }

            // Entry at position 3 should fail (index was rewound)
            let result = oversized.get(1, 3).await;
            assert!(result.is_err());

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
628
    // Verifies data written and synced before a clean close survives reopen.
    #[test_traced]
    fn test_oversized_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            let value: TestValue = [42; 16];
            let entry = TestEntry::new(123, 0, 0);
            let (position, offset, size) = oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append");
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Reopen and verify
            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg)
                    .await
                    .expect("Failed to reinit");

            let retrieved_entry = oversized.get(1, position).await.expect("Failed to get");
            assert_eq!(retrieved_entry.id, 123);

            let retrieved_value = oversized
                .get_value(1, offset, size)
                .await
                .expect("Failed to get value");
            assert_eq!(retrieved_value, value);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
668
    // Verifies prune(min) removes sections below `min` from both journals
    // while keeping sections >= min readable.
    #[test_traced]
    fn test_oversized_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), test_cfg())
                    .await
                    .expect("Failed to init");

            // Append to multiple sections
            for section in 1u64..=5 {
                let value: TestValue = [section as u8; 16];
                let entry = TestEntry::new(section, 0, 0);
                oversized
                    .append(section, entry, &value)
                    .await
                    .expect("Failed to append");
                oversized.sync(section).await.expect("Failed to sync");
            }

            // Prune sections < 3
            oversized.prune(3).await.expect("Failed to prune");

            // Sections 1, 2 should be gone
            assert!(oversized.get(1, 0).await.is_err());
            assert!(oversized.get(2, 0).await.is_err());

            // Sections 3, 4, 5 should exist
            assert!(oversized.get(3, 0).await.is_ok());
            assert!(oversized.get(4, 0).await.is_ok());
            assert!(oversized.get(5, 0).await.is_ok());

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
704
    // Recovery must tolerate sections that exist in neither journal or were
    // never written to; only section 2 holds data here.
    #[test_traced]
    fn test_recovery_empty_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create oversized journal
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            // Append to section 2 only (section 1 remains empty after being opened)
            let value: TestValue = [42; 16];
            let entry = TestEntry::new(1, 0, 0);
            oversized
                .append(2, entry, &value)
                .await
                .expect("Failed to append");
            oversized.sync(2).await.expect("Failed to sync");
            drop(oversized);

            // Reinitialize - recovery should handle the empty/non-existent section 1
            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg)
                    .await
                    .expect("Failed to reinit");

            // Section 2 entry should be valid
            let entry = oversized.get(2, 0).await.expect("Failed to get");
            assert_eq!(entry.id, 1);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
740
    // Worst-case recovery: the entire glob is wiped, so every index entry is
    // invalid. The index must be rewound to empty and remain appendable.
    #[test_traced]
    fn test_recovery_all_entries_invalid() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            // Append 5 entries
            for i in 0..5u8 {
                let value: TestValue = [i; 16];
                let entry = TestEntry::new(i as u64, 0, 0);
                oversized
                    .append(1, entry, &value)
                    .await
                    .expect("Failed to append");
            }
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Truncate glob to 0 bytes - ALL entries become invalid
            let (blob, _) = context
                .open(&cfg.value_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(0).await.expect("Failed to truncate");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Reinitialize - should recover and rewind index to 0
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg)
                    .await
                    .expect("Failed to reinit");

            // No entries should be accessible
            let result = oversized.get(1, 0).await;
            assert!(result.is_err());

            // Should be able to append after recovery (positions restart at 0)
            let value: TestValue = [99; 16];
            let entry = TestEntry::new(100, 0, 0);
            let (pos, offset, size) = oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append after recovery");
            assert_eq!(pos, 0);

            let retrieved = oversized.get(1, 0).await.expect("Failed to get");
            assert_eq!(retrieved.id, 100);
            let retrieved_value = oversized
                .get_value(1, offset, size)
                .await
                .expect("Failed to get value");
            assert_eq!(retrieved_value, value);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
804
805    #[test_traced]
806    fn test_recovery_multiple_sections_mixed_validity() {
807        let executor = deterministic::Runner::default();
808        executor.start(|context| async move {
809            let cfg = test_cfg();
810
811            // Create and populate multiple sections
812            let mut oversized: Oversized<_, TestEntry, TestValue> =
813                Oversized::init(context.clone(), cfg.clone())
814                    .await
815                    .expect("Failed to init");
816
817            // Section 1: 3 entries
818            let mut section1_locations = Vec::new();
819            for i in 0..3u8 {
820                let value: TestValue = [i; 16];
821                let entry = TestEntry::new(i as u64, 0, 0);
822                let loc = oversized
823                    .append(1, entry, &value)
824                    .await
825                    .expect("Failed to append");
826                section1_locations.push(loc);
827            }
828            oversized.sync(1).await.expect("Failed to sync");
829
830            // Section 2: 5 entries
831            let mut section2_locations = Vec::new();
832            for i in 0..5u8 {
833                let value: TestValue = [10 + i; 16];
834                let entry = TestEntry::new(10 + i as u64, 0, 0);
835                let loc = oversized
836                    .append(2, entry, &value)
837                    .await
838                    .expect("Failed to append");
839                section2_locations.push(loc);
840            }
841            oversized.sync(2).await.expect("Failed to sync");
842
843            // Section 3: 2 entries
844            for i in 0..2u8 {
845                let value: TestValue = [20 + i; 16];
846                let entry = TestEntry::new(20 + i as u64, 0, 0);
847                oversized
848                    .append(3, entry, &value)
849                    .await
850                    .expect("Failed to append");
851            }
852            oversized.sync(3).await.expect("Failed to sync");
853            drop(oversized);
854
855            // Truncate section 1 glob to keep only first entry
856            let (blob, _) = context
857                .open(&cfg.value_partition, &1u64.to_be_bytes())
858                .await
859                .expect("Failed to open blob");
860            let keep_size = byte_end(section1_locations[0].1, section1_locations[0].2);
861            blob.resize(keep_size).await.expect("Failed to truncate");
862            blob.sync().await.expect("Failed to sync");
863            drop(blob);
864
865            // Truncate section 2 glob to keep first 3 entries
866            let (blob, _) = context
867                .open(&cfg.value_partition, &2u64.to_be_bytes())
868                .await
869                .expect("Failed to open blob");
870            let keep_size = byte_end(section2_locations[2].1, section2_locations[2].2);
871            blob.resize(keep_size).await.expect("Failed to truncate");
872            blob.sync().await.expect("Failed to sync");
873            drop(blob);
874
875            // Section 3 remains intact
876
877            // Reinitialize
878            let oversized: Oversized<_, TestEntry, TestValue> =
879                Oversized::init(context.clone(), cfg)
880                    .await
881                    .expect("Failed to reinit");
882
883            // Section 1: only position 0 valid
884            assert!(oversized.get(1, 0).await.is_ok());
885            assert!(oversized.get(1, 1).await.is_err());
886            assert!(oversized.get(1, 2).await.is_err());
887
888            // Section 2: positions 0,1,2 valid
889            assert!(oversized.get(2, 0).await.is_ok());
890            assert!(oversized.get(2, 1).await.is_ok());
891            assert!(oversized.get(2, 2).await.is_ok());
892            assert!(oversized.get(2, 3).await.is_err());
893            assert!(oversized.get(2, 4).await.is_err());
894
895            // Section 3: both positions valid
896            assert!(oversized.get(3, 0).await.is_ok());
897            assert!(oversized.get(3, 1).await.is_ok());
898
899            oversized.destroy().await.expect("Failed to destroy");
900        });
901    }
902
903    #[test_traced]
904    fn test_recovery_corrupted_last_index_entry() {
905        let executor = deterministic::Runner::default();
906        executor.start(|context| async move {
907            // Use page size = entry size so each entry is on its own page.
908            // This allows corrupting just the last entry's page without affecting others.
909            // Physical page size = TestEntry::SIZE (20) + 12 (CRC record) = 32 bytes.
910            let cfg = Config {
911                index_partition: "test_index".to_string(),
912                value_partition: "test_values".to_string(),
913                index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
914                index_write_buffer: NZUsize!(1024),
915                value_write_buffer: NZUsize!(1024),
916                compression: None,
917                codec_config: (),
918            };
919
920            // Create and populate
921            let mut oversized: Oversized<_, TestEntry, TestValue> =
922                Oversized::init(context.clone(), cfg.clone())
923                    .await
924                    .expect("Failed to init");
925
926            // Append 5 entries (each on its own page)
927            for i in 0..5u8 {
928                let value: TestValue = [i; 16];
929                let entry = TestEntry::new(i as u64, 0, 0);
930                oversized
931                    .append(1, entry, &value)
932                    .await
933                    .expect("Failed to append");
934            }
935            oversized.sync(1).await.expect("Failed to sync");
936            drop(oversized);
937
938            // Corrupt the last page's CRC to trigger page-level integrity failure
939            let (blob, size) = context
940                .open(&cfg.index_partition, &1u64.to_be_bytes())
941                .await
942                .expect("Failed to open blob");
943
944            // Physical page size = 20 + 12 = 32 bytes
945            // 5 entries = 5 pages = 160 bytes total
946            // Last page CRC starts at offset 160 - 12 = 148
947            assert_eq!(size, 160);
948            let last_page_crc_offset = size - 12;
949            blob.write_at(vec![0xFF; 12], last_page_crc_offset)
950                .await
951                .expect("Failed to corrupt");
952            blob.sync().await.expect("Failed to sync");
953            drop(blob);
954
955            // Reinitialize - should detect page corruption and truncate
956            let mut oversized: Oversized<_, TestEntry, TestValue> =
957                Oversized::init(context.clone(), cfg)
958                    .await
959                    .expect("Failed to reinit");
960
961            // First 4 entries should be valid (on pages 0-3)
962            for i in 0..4u8 {
963                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
964                assert_eq!(entry.id, i as u64);
965            }
966
967            // Entry 4 should be gone (its page was corrupted)
968            assert!(oversized.get(1, 4).await.is_err());
969
970            // Should be able to append after recovery
971            let value: TestValue = [99; 16];
972            let entry = TestEntry::new(100, 0, 0);
973            let (pos, offset, size) = oversized
974                .append(1, entry, &value)
975                .await
976                .expect("Failed to append after recovery");
977            assert_eq!(pos, 4);
978
979            let retrieved = oversized.get(1, 4).await.expect("Failed to get");
980            assert_eq!(retrieved.id, 100);
981            let retrieved_value = oversized
982                .get_value(1, offset, size)
983                .await
984                .expect("Failed to get value");
985            assert_eq!(retrieved_value, value);
986
987            oversized.destroy().await.expect("Failed to destroy");
988        });
989    }
990
991    #[test_traced]
992    fn test_recovery_all_entries_valid() {
993        let executor = deterministic::Runner::default();
994        executor.start(|context| async move {
995            let cfg = test_cfg();
996
997            // Create and populate
998            let mut oversized: Oversized<_, TestEntry, TestValue> =
999                Oversized::init(context.clone(), cfg.clone())
1000                    .await
1001                    .expect("Failed to init");
1002
1003            // Append entries to multiple sections
1004            for section in 1u64..=3 {
1005                for i in 0..10u8 {
1006                    let value: TestValue = [(section as u8) * 10 + i; 16];
1007                    let entry = TestEntry::new(section * 100 + i as u64, 0, 0);
1008                    oversized
1009                        .append(section, entry, &value)
1010                        .await
1011                        .expect("Failed to append");
1012                }
1013                oversized.sync(section).await.expect("Failed to sync");
1014            }
1015            drop(oversized);
1016
1017            // Reinitialize with no corruption - should be fast
1018            let oversized: Oversized<_, TestEntry, TestValue> =
1019                Oversized::init(context.clone(), cfg)
1020                    .await
1021                    .expect("Failed to reinit");
1022
1023            // All entries should be valid
1024            for section in 1u64..=3 {
1025                for i in 0..10u8 {
1026                    let entry = oversized
1027                        .get(section, i as u64)
1028                        .await
1029                        .expect("Failed to get");
1030                    assert_eq!(entry.id, section * 100 + i as u64);
1031                }
1032            }
1033
1034            oversized.destroy().await.expect("Failed to destroy");
1035        });
1036    }
1037
1038    #[test_traced]
1039    fn test_recovery_single_entry_invalid() {
1040        let executor = deterministic::Runner::default();
1041        executor.start(|context| async move {
1042            let cfg = test_cfg();
1043
1044            // Create and populate with single entry
1045            let mut oversized: Oversized<_, TestEntry, TestValue> =
1046                Oversized::init(context.clone(), cfg.clone())
1047                    .await
1048                    .expect("Failed to init");
1049
1050            let value: TestValue = [42; 16];
1051            let entry = TestEntry::new(1, 0, 0);
1052            oversized
1053                .append(1, entry, &value)
1054                .await
1055                .expect("Failed to append");
1056            oversized.sync(1).await.expect("Failed to sync");
1057            drop(oversized);
1058
1059            // Truncate glob to 0 - single entry becomes invalid
1060            let (blob, _) = context
1061                .open(&cfg.value_partition, &1u64.to_be_bytes())
1062                .await
1063                .expect("Failed to open blob");
1064            blob.resize(0).await.expect("Failed to truncate");
1065            blob.sync().await.expect("Failed to sync");
1066            drop(blob);
1067
1068            // Reinitialize
1069            let oversized: Oversized<_, TestEntry, TestValue> =
1070                Oversized::init(context.clone(), cfg)
1071                    .await
1072                    .expect("Failed to reinit");
1073
1074            // Entry should be gone
1075            assert!(oversized.get(1, 0).await.is_err());
1076
1077            oversized.destroy().await.expect("Failed to destroy");
1078        });
1079    }
1080
1081    #[test_traced]
1082    fn test_recovery_last_entry_off_by_one() {
1083        let executor = deterministic::Runner::default();
1084        executor.start(|context| async move {
1085            let cfg = test_cfg();
1086
1087            // Create and populate
1088            let mut oversized: Oversized<_, TestEntry, TestValue> =
1089                Oversized::init(context.clone(), cfg.clone())
1090                    .await
1091                    .expect("Failed to init");
1092
1093            let mut locations = Vec::new();
1094            for i in 0..3u8 {
1095                let value: TestValue = [i; 16];
1096                let entry = TestEntry::new(i as u64, 0, 0);
1097                let loc = oversized
1098                    .append(1, entry, &value)
1099                    .await
1100                    .expect("Failed to append");
1101                locations.push(loc);
1102            }
1103            oversized.sync(1).await.expect("Failed to sync");
1104            drop(oversized);
1105
1106            // Truncate glob to be off by 1 byte from last entry
1107            let (blob, _) = context
1108                .open(&cfg.value_partition, &1u64.to_be_bytes())
1109                .await
1110                .expect("Failed to open blob");
1111
1112            // Last entry needs: offset + size bytes
1113            // Truncate to offset + size - 1 (missing 1 byte)
1114            let last = &locations[2];
1115            let truncate_to = byte_end(last.1, last.2) - 1;
1116            blob.resize(truncate_to).await.expect("Failed to truncate");
1117            blob.sync().await.expect("Failed to sync");
1118            drop(blob);
1119
1120            // Reinitialize
1121            let mut oversized: Oversized<_, TestEntry, TestValue> =
1122                Oversized::init(context.clone(), cfg)
1123                    .await
1124                    .expect("Failed to reinit");
1125
1126            // First 2 entries should be valid
1127            assert!(oversized.get(1, 0).await.is_ok());
1128            assert!(oversized.get(1, 1).await.is_ok());
1129
1130            // Entry 2 should be gone (truncated)
1131            assert!(oversized.get(1, 2).await.is_err());
1132
1133            // Should be able to append after recovery
1134            let value: TestValue = [99; 16];
1135            let entry = TestEntry::new(100, 0, 0);
1136            let (pos, offset, size) = oversized
1137                .append(1, entry, &value)
1138                .await
1139                .expect("Failed to append after recovery");
1140            assert_eq!(pos, 2);
1141
1142            let retrieved = oversized.get(1, 2).await.expect("Failed to get");
1143            assert_eq!(retrieved.id, 100);
1144            let retrieved_value = oversized
1145                .get_value(1, offset, size)
1146                .await
1147                .expect("Failed to get value");
1148            assert_eq!(retrieved_value, value);
1149
1150            oversized.destroy().await.expect("Failed to destroy");
1151        });
1152    }
1153
1154    #[test_traced]
1155    fn test_recovery_glob_missing_entirely() {
1156        let executor = deterministic::Runner::default();
1157        executor.start(|context| async move {
1158            let cfg = test_cfg();
1159
1160            // Create and populate
1161            let mut oversized: Oversized<_, TestEntry, TestValue> =
1162                Oversized::init(context.clone(), cfg.clone())
1163                    .await
1164                    .expect("Failed to init");
1165
1166            for i in 0..3u8 {
1167                let value: TestValue = [i; 16];
1168                let entry = TestEntry::new(i as u64, 0, 0);
1169                oversized
1170                    .append(1, entry, &value)
1171                    .await
1172                    .expect("Failed to append");
1173            }
1174            oversized.sync(1).await.expect("Failed to sync");
1175            drop(oversized);
1176
1177            // Delete the glob file entirely
1178            context
1179                .remove(&cfg.value_partition, Some(&1u64.to_be_bytes()))
1180                .await
1181                .expect("Failed to remove");
1182
1183            // Reinitialize - glob size will be 0, all entries invalid
1184            let oversized: Oversized<_, TestEntry, TestValue> =
1185                Oversized::init(context.clone(), cfg)
1186                    .await
1187                    .expect("Failed to reinit");
1188
1189            // All entries should be gone
1190            assert!(oversized.get(1, 0).await.is_err());
1191            assert!(oversized.get(1, 1).await.is_err());
1192            assert!(oversized.get(1, 2).await.is_err());
1193
1194            oversized.destroy().await.expect("Failed to destroy");
1195        });
1196    }
1197
1198    #[test_traced]
1199    fn test_recovery_can_append_after_recovery() {
1200        let executor = deterministic::Runner::default();
1201        executor.start(|context| async move {
1202            let cfg = test_cfg();
1203
1204            // Create and populate
1205            let mut oversized: Oversized<_, TestEntry, TestValue> =
1206                Oversized::init(context.clone(), cfg.clone())
1207                    .await
1208                    .expect("Failed to init");
1209
1210            let mut locations = Vec::new();
1211            for i in 0..5u8 {
1212                let value: TestValue = [i; 16];
1213                let entry = TestEntry::new(i as u64, 0, 0);
1214                let loc = oversized
1215                    .append(1, entry, &value)
1216                    .await
1217                    .expect("Failed to append");
1218                locations.push(loc);
1219            }
1220            oversized.sync(1).await.expect("Failed to sync");
1221            drop(oversized);
1222
1223            // Truncate glob to keep only first 2 entries
1224            let (blob, _) = context
1225                .open(&cfg.value_partition, &1u64.to_be_bytes())
1226                .await
1227                .expect("Failed to open blob");
1228            let keep_size = byte_end(locations[1].1, locations[1].2);
1229            blob.resize(keep_size).await.expect("Failed to truncate");
1230            blob.sync().await.expect("Failed to sync");
1231            drop(blob);
1232
1233            // Reinitialize
1234            let mut oversized: Oversized<_, TestEntry, TestValue> =
1235                Oversized::init(context.clone(), cfg.clone())
1236                    .await
1237                    .expect("Failed to reinit");
1238
1239            // Verify first 2 entries exist
1240            assert!(oversized.get(1, 0).await.is_ok());
1241            assert!(oversized.get(1, 1).await.is_ok());
1242            assert!(oversized.get(1, 2).await.is_err());
1243
1244            // Append new entries after recovery
1245            for i in 10..15u8 {
1246                let value: TestValue = [i; 16];
1247                let entry = TestEntry::new(i as u64, 0, 0);
1248                oversized
1249                    .append(1, entry, &value)
1250                    .await
1251                    .expect("Failed to append after recovery");
1252            }
1253            oversized.sync(1).await.expect("Failed to sync");
1254
1255            // Verify new entries at positions 2, 3, 4, 5, 6
1256            for i in 0..5u8 {
1257                let entry = oversized
1258                    .get(1, 2 + i as u64)
1259                    .await
1260                    .expect("Failed to get new entry");
1261                assert_eq!(entry.id, (10 + i) as u64);
1262            }
1263
1264            oversized.destroy().await.expect("Failed to destroy");
1265        });
1266    }
1267
1268    #[test_traced]
1269    fn test_recovery_glob_pruned_but_index_not() {
1270        let executor = deterministic::Runner::default();
1271        executor.start(|context| async move {
1272            let cfg = test_cfg();
1273
1274            // Create and populate multiple sections
1275            let mut oversized: Oversized<_, TestEntry, TestValue> =
1276                Oversized::init(context.clone(), cfg.clone())
1277                    .await
1278                    .expect("Failed to init");
1279
1280            for section in 1u64..=3 {
1281                let value: TestValue = [section as u8; 16];
1282                let entry = TestEntry::new(section, 0, 0);
1283                oversized
1284                    .append(section, entry, &value)
1285                    .await
1286                    .expect("Failed to append");
1287                oversized.sync(section).await.expect("Failed to sync");
1288            }
1289            drop(oversized);
1290
1291            // Simulate crash during prune: prune ONLY the glob, not the index
1292            // This creates the "glob pruned but index not" scenario
1293            use crate::journal::segmented::glob::{Config as GlobConfig, Glob};
1294            let glob_cfg = GlobConfig {
1295                partition: cfg.value_partition.clone(),
1296                compression: cfg.compression,
1297                codec_config: (),
1298                write_buffer: cfg.value_write_buffer,
1299            };
1300            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1301                .await
1302                .expect("Failed to init glob");
1303            glob.prune(2).await.expect("Failed to prune glob");
1304            glob.sync_all().await.expect("Failed to sync glob");
1305            drop(glob);
1306
1307            // Reinitialize - should recover gracefully with warning
1308            // Index section 1 will be rewound to 0 entries
1309            let oversized: Oversized<_, TestEntry, TestValue> =
1310                Oversized::init(context.clone(), cfg.clone())
1311                    .await
1312                    .expect("Failed to reinit");
1313
1314            // Section 1 entries should be gone (index rewound due to glob pruned)
1315            assert!(oversized.get(1, 0).await.is_err());
1316
1317            // Sections 2 and 3 should still be valid
1318            assert!(oversized.get(2, 0).await.is_ok());
1319            assert!(oversized.get(3, 0).await.is_ok());
1320
1321            oversized.destroy().await.expect("Failed to destroy");
1322        });
1323    }
1324
1325    #[test_traced]
1326    fn test_recovery_index_partition_deleted() {
1327        let executor = deterministic::Runner::default();
1328        executor.start(|context| async move {
1329            let cfg = test_cfg();
1330
1331            // Create and populate multiple sections
1332            let mut oversized: Oversized<_, TestEntry, TestValue> =
1333                Oversized::init(context.clone(), cfg.clone())
1334                    .await
1335                    .expect("Failed to init");
1336
1337            for section in 1u64..=3 {
1338                let value: TestValue = [section as u8; 16];
1339                let entry = TestEntry::new(section, 0, 0);
1340                oversized
1341                    .append(section, entry, &value)
1342                    .await
1343                    .expect("Failed to append");
1344                oversized.sync(section).await.expect("Failed to sync");
1345            }
1346            drop(oversized);
1347
1348            // Delete index blob for section 2 (simulate corruption/loss)
1349            context
1350                .remove(&cfg.index_partition, Some(&2u64.to_be_bytes()))
1351                .await
1352                .expect("Failed to remove index");
1353
1354            // Reinitialize - should handle gracefully
1355            // Section 2 is gone from index, orphan data in glob is acceptable
1356            let oversized: Oversized<_, TestEntry, TestValue> =
1357                Oversized::init(context.clone(), cfg.clone())
1358                    .await
1359                    .expect("Failed to reinit");
1360
1361            // Section 1 and 3 should still be valid
1362            assert!(oversized.get(1, 0).await.is_ok());
1363            assert!(oversized.get(3, 0).await.is_ok());
1364
1365            // Section 2 should be gone (index file deleted)
1366            assert!(oversized.get(2, 0).await.is_err());
1367
1368            oversized.destroy().await.expect("Failed to destroy");
1369        });
1370    }
1371
1372    #[test_traced]
1373    fn test_recovery_index_synced_but_glob_not() {
1374        let executor = deterministic::Runner::default();
1375        executor.start(|context| async move {
1376            let cfg = test_cfg();
1377
1378            // Create and populate
1379            let mut oversized: Oversized<_, TestEntry, TestValue> =
1380                Oversized::init(context.clone(), cfg.clone())
1381                    .await
1382                    .expect("Failed to init");
1383
1384            // Append entries and sync
1385            let mut locations = Vec::new();
1386            for i in 0..3u8 {
1387                let value: TestValue = [i; 16];
1388                let entry = TestEntry::new(i as u64, 0, 0);
1389                let loc = oversized
1390                    .append(1, entry, &value)
1391                    .await
1392                    .expect("Failed to append");
1393                locations.push(loc);
1394            }
1395            oversized.sync(1).await.expect("Failed to sync");
1396
1397            // Add more entries WITHOUT syncing (simulates unsynced writes)
1398            for i in 10..15u8 {
1399                let value: TestValue = [i; 16];
1400                let entry = TestEntry::new(i as u64, 0, 0);
1401                oversized
1402                    .append(1, entry, &value)
1403                    .await
1404                    .expect("Failed to append");
1405            }
1406            // Note: NOT calling sync() here
1407            drop(oversized);
1408
1409            // Simulate crash where index was synced but glob wasn't:
1410            // Truncate glob back to the synced size (3 entries)
1411            let (blob, _) = context
1412                .open(&cfg.value_partition, &1u64.to_be_bytes())
1413                .await
1414                .expect("Failed to open blob");
1415            let synced_size = byte_end(locations[2].1, locations[2].2);
1416            blob.resize(synced_size).await.expect("Failed to truncate");
1417            blob.sync().await.expect("Failed to sync");
1418            drop(blob);
1419
1420            // Reinitialize - should rewind index to match glob
1421            let oversized: Oversized<_, TestEntry, TestValue> =
1422                Oversized::init(context.clone(), cfg)
1423                    .await
1424                    .expect("Failed to reinit");
1425
1426            // First 3 entries should be valid
1427            for i in 0..3u8 {
1428                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1429                assert_eq!(entry.id, i as u64);
1430            }
1431
1432            // Entries 3-7 should be gone (unsynced, index rewound)
1433            assert!(oversized.get(1, 3).await.is_err());
1434
1435            oversized.destroy().await.expect("Failed to destroy");
1436        });
1437    }
1438
1439    #[test_traced]
1440    fn test_recovery_glob_synced_but_index_not() {
1441        let executor = deterministic::Runner::default();
1442        executor.start(|context| async move {
1443            // Use page size = entry size so each entry is exactly one page.
1444            // This allows truncating by entry count to equal truncating by full pages,
1445            // maintaining page-level integrity.
1446            let cfg = Config {
1447                index_partition: "test_index".to_string(),
1448                value_partition: "test_values".to_string(),
1449                index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1450                index_write_buffer: NZUsize!(1024),
1451                value_write_buffer: NZUsize!(1024),
1452                compression: None,
1453                codec_config: (),
1454            };
1455
1456            // Create and populate
1457            let mut oversized: Oversized<_, TestEntry, TestValue> =
1458                Oversized::init(context.clone(), cfg.clone())
1459                    .await
1460                    .expect("Failed to init");
1461
1462            // Append entries and sync
1463            let mut locations = Vec::new();
1464            for i in 0..3u8 {
1465                let value: TestValue = [i; 16];
1466                let entry = TestEntry::new(i as u64, 0, 0);
1467                let loc = oversized
1468                    .append(1, entry, &value)
1469                    .await
1470                    .expect("Failed to append");
1471                locations.push(loc);
1472            }
1473            oversized.sync(1).await.expect("Failed to sync");
1474            drop(oversized);
1475
1476            // Simulate crash: truncate INDEX but leave GLOB intact
1477            // This creates orphan data in glob (glob ahead of index)
1478            let (blob, _size) = context
1479                .open(&cfg.index_partition, &1u64.to_be_bytes())
1480                .await
1481                .expect("Failed to open blob");
1482
1483            // Keep only first 2 index entries (2 full pages)
1484            // Physical page size = logical (20) + CRC record (12) = 32 bytes
1485            let physical_page_size = (TestEntry::SIZE + 12) as u64;
1486            blob.resize(2 * physical_page_size)
1487                .await
1488                .expect("Failed to truncate");
1489            blob.sync().await.expect("Failed to sync");
1490            drop(blob);
1491
1492            // Reinitialize - glob has orphan data from entry 3
1493            let mut oversized: Oversized<_, TestEntry, TestValue> =
1494                Oversized::init(context.clone(), cfg.clone())
1495                    .await
1496                    .expect("Failed to reinit");
1497
1498            // First 2 entries should be valid
1499            for i in 0..2u8 {
1500                let (position, offset, size) = locations[i as usize];
1501                let entry = oversized.get(1, position).await.expect("Failed to get");
1502                assert_eq!(entry.id, i as u64);
1503
1504                let value = oversized
1505                    .get_value(1, offset, size)
1506                    .await
1507                    .expect("Failed to get value");
1508                assert_eq!(value, [i; 16]);
1509            }
1510
1511            // Entry at position 2 should fail (index was truncated)
1512            assert!(oversized.get(1, 2).await.is_err());
1513
1514            // Append new entries - should work despite orphan data in glob
1515            let mut new_locations = Vec::new();
1516            for i in 10..13u8 {
1517                let value: TestValue = [i; 16];
1518                let entry = TestEntry::new(i as u64, 0, 0);
1519                let (position, offset, size) = oversized
1520                    .append(1, entry, &value)
1521                    .await
1522                    .expect("Failed to append after recovery");
1523
1524                // New entries start at position 2 (after the 2 valid entries)
1525                assert_eq!(position, (i - 10 + 2) as u64);
1526                new_locations.push((position, offset, size, i));
1527
1528                // Verify we can read the new entry
1529                let retrieved = oversized.get(1, position).await.expect("Failed to get");
1530                assert_eq!(retrieved.id, i as u64);
1531
1532                let retrieved_value = oversized
1533                    .get_value(1, offset, size)
1534                    .await
1535                    .expect("Failed to get value");
1536                assert_eq!(retrieved_value, value);
1537            }
1538
1539            // Sync and restart again to verify persistence with orphan data
1540            oversized.sync(1).await.expect("Failed to sync");
1541            drop(oversized);
1542
1543            // Reinitialize after adding data on top of orphan glob data
1544            let oversized: Oversized<_, TestEntry, TestValue> =
1545                Oversized::init(context.clone(), cfg)
1546                    .await
1547                    .expect("Failed to reinit after append");
1548
1549            // Read all valid entries in the index
1550            // First 2 entries from original data
1551            for i in 0..2u8 {
1552                let (position, offset, size) = locations[i as usize];
1553                let entry = oversized.get(1, position).await.expect("Failed to get");
1554                assert_eq!(entry.id, i as u64);
1555
1556                let value = oversized
1557                    .get_value(1, offset, size)
1558                    .await
1559                    .expect("Failed to get value");
1560                assert_eq!(value, [i; 16]);
1561            }
1562
1563            // New entries added after recovery
1564            for (position, offset, size, expected_id) in &new_locations {
1565                let entry = oversized
1566                    .get(1, *position)
1567                    .await
1568                    .expect("Failed to get new entry after restart");
1569                assert_eq!(entry.id, *expected_id as u64);
1570
1571                let value = oversized
1572                    .get_value(1, *offset, *size)
1573                    .await
1574                    .expect("Failed to get new value after restart");
1575                assert_eq!(value, [*expected_id; 16]);
1576            }
1577
1578            // Verify total entry count: 2 original + 3 new = 5
1579            assert!(oversized.get(1, 4).await.is_ok());
1580            assert!(oversized.get(1, 5).await.is_err());
1581
1582            oversized.destroy().await.expect("Failed to destroy");
1583        });
1584    }
1585
1586    #[test_traced]
1587    fn test_recovery_partial_index_entry() {
1588        let executor = deterministic::Runner::default();
1589        executor.start(|context| async move {
1590            let cfg = test_cfg();
1591
1592            // Create and populate
1593            let mut oversized: Oversized<_, TestEntry, TestValue> =
1594                Oversized::init(context.clone(), cfg.clone())
1595                    .await
1596                    .expect("Failed to init");
1597
1598            // Append 3 entries
1599            for i in 0..3u8 {
1600                let value: TestValue = [i; 16];
1601                let entry = TestEntry::new(i as u64, 0, 0);
1602                oversized
1603                    .append(1, entry, &value)
1604                    .await
1605                    .expect("Failed to append");
1606            }
1607            oversized.sync(1).await.expect("Failed to sync");
1608            drop(oversized);
1609
1610            // Simulate crash during write: truncate index to partial entry
1611            // Each entry is TestEntry::SIZE (20) + 4 (CRC32) = 24 bytes
1612            // Truncate to 3 full entries + 10 bytes of partial entry
1613            let (blob, _) = context
1614                .open(&cfg.index_partition, &1u64.to_be_bytes())
1615                .await
1616                .expect("Failed to open blob");
1617            let partial_size = 3 * 24 + 10; // 3 full entries + partial
1618            blob.resize(partial_size).await.expect("Failed to resize");
1619            blob.sync().await.expect("Failed to sync");
1620            drop(blob);
1621
1622            // Reinitialize - should handle partial entry gracefully
1623            let mut oversized: Oversized<_, TestEntry, TestValue> =
1624                Oversized::init(context.clone(), cfg.clone())
1625                    .await
1626                    .expect("Failed to reinit");
1627
1628            // First 3 entries should still be valid
1629            for i in 0..3u8 {
1630                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1631                assert_eq!(entry.id, i as u64);
1632            }
1633
1634            // Entry 3 should not exist (partial entry was removed)
1635            assert!(oversized.get(1, 3).await.is_err());
1636
1637            // Append new entry after recovery
1638            let value: TestValue = [42; 16];
1639            let entry = TestEntry::new(100, 0, 0);
1640            let (pos, offset, size) = oversized
1641                .append(1, entry, &value)
1642                .await
1643                .expect("Failed to append after recovery");
1644            assert_eq!(pos, 3);
1645
1646            // Verify we can read the new entry
1647            let retrieved = oversized.get(1, 3).await.expect("Failed to get new entry");
1648            assert_eq!(retrieved.id, 100);
1649            let retrieved_value = oversized
1650                .get_value(1, offset, size)
1651                .await
1652                .expect("Failed to get new value");
1653            assert_eq!(retrieved_value, value);
1654
1655            oversized.destroy().await.expect("Failed to destroy");
1656        });
1657    }
1658
1659    #[test_traced]
1660    fn test_recovery_only_partial_entry() {
1661        let executor = deterministic::Runner::default();
1662        executor.start(|context| async move {
1663            let cfg = test_cfg();
1664
1665            // Create and populate with single entry
1666            let mut oversized: Oversized<_, TestEntry, TestValue> =
1667                Oversized::init(context.clone(), cfg.clone())
1668                    .await
1669                    .expect("Failed to init");
1670
1671            let value: TestValue = [42; 16];
1672            let entry = TestEntry::new(1, 0, 0);
1673            oversized
1674                .append(1, entry, &value)
1675                .await
1676                .expect("Failed to append");
1677            oversized.sync(1).await.expect("Failed to sync");
1678            drop(oversized);
1679
1680            // Truncate index to only partial data (less than one full entry)
1681            let (blob, _) = context
1682                .open(&cfg.index_partition, &1u64.to_be_bytes())
1683                .await
1684                .expect("Failed to open blob");
1685            blob.resize(10).await.expect("Failed to resize"); // Less than chunk size
1686            blob.sync().await.expect("Failed to sync");
1687            drop(blob);
1688
1689            // Reinitialize - should handle gracefully (rewind to 0)
1690            let mut oversized: Oversized<_, TestEntry, TestValue> =
1691                Oversized::init(context.clone(), cfg.clone())
1692                    .await
1693                    .expect("Failed to reinit");
1694
1695            // No entries should exist
1696            assert!(oversized.get(1, 0).await.is_err());
1697
1698            // Should be able to append after recovery
1699            let value: TestValue = [99; 16];
1700            let entry = TestEntry::new(100, 0, 0);
1701            let (pos, offset, size) = oversized
1702                .append(1, entry, &value)
1703                .await
1704                .expect("Failed to append after recovery");
1705            assert_eq!(pos, 0);
1706
1707            let retrieved = oversized.get(1, 0).await.expect("Failed to get");
1708            assert_eq!(retrieved.id, 100);
1709            let retrieved_value = oversized
1710                .get_value(1, offset, size)
1711                .await
1712                .expect("Failed to get value");
1713            assert_eq!(retrieved_value, value);
1714
1715            oversized.destroy().await.expect("Failed to destroy");
1716        });
1717    }
1718
1719    #[test_traced]
1720    fn test_recovery_crash_during_rewind_index_ahead() {
1721        // Simulates crash where index was rewound but glob wasn't
1722        let executor = deterministic::Runner::default();
1723        executor.start(|context| async move {
1724            // Use page size = entry size so each entry is exactly one page.
1725            // This allows truncating by entry count to equal truncating by full pages,
1726            // maintaining page-level integrity.
1727            let cfg = Config {
1728                index_partition: "test_index".to_string(),
1729                value_partition: "test_values".to_string(),
1730                index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
1731                index_write_buffer: NZUsize!(1024),
1732                value_write_buffer: NZUsize!(1024),
1733                compression: None,
1734                codec_config: (),
1735            };
1736
1737            // Create and populate
1738            let mut oversized: Oversized<_, TestEntry, TestValue> =
1739                Oversized::init(context.clone(), cfg.clone())
1740                    .await
1741                    .expect("Failed to init");
1742
1743            let mut locations = Vec::new();
1744            for i in 0..5u8 {
1745                let value: TestValue = [i; 16];
1746                let entry = TestEntry::new(i as u64, 0, 0);
1747                let loc = oversized
1748                    .append(1, entry, &value)
1749                    .await
1750                    .expect("Failed to append");
1751                locations.push(loc);
1752            }
1753            oversized.sync(1).await.expect("Failed to sync");
1754            drop(oversized);
1755
1756            // Simulate crash during rewind: truncate index to 2 entries but leave glob intact
1757            // This simulates: rewind(index) succeeded, crash before rewind(glob)
1758            let (blob, _) = context
1759                .open(&cfg.index_partition, &1u64.to_be_bytes())
1760                .await
1761                .expect("Failed to open blob");
1762            // Physical page size = logical (20) + CRC record (12) = 32 bytes
1763            let physical_page_size = (TestEntry::SIZE + 12) as u64;
1764            blob.resize(2 * physical_page_size)
1765                .await
1766                .expect("Failed to truncate");
1767            blob.sync().await.expect("Failed to sync");
1768            drop(blob);
1769
1770            // Reinitialize - recovery should succeed (glob has orphan data)
1771            let mut oversized: Oversized<_, TestEntry, TestValue> =
1772                Oversized::init(context.clone(), cfg.clone())
1773                    .await
1774                    .expect("Failed to reinit");
1775
1776            // First 2 entries should be valid
1777            for i in 0..2u8 {
1778                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
1779                assert_eq!(entry.id, i as u64);
1780            }
1781
1782            // Entries 2-4 should be gone (index was truncated)
1783            assert!(oversized.get(1, 2).await.is_err());
1784
1785            // Should be able to append new entries
1786            let (pos, _, _) = oversized
1787                .append(1, TestEntry::new(100, 0, 0), &[100u8; 16])
1788                .await
1789                .expect("Failed to append");
1790            assert_eq!(pos, 2);
1791
1792            oversized.destroy().await.expect("Failed to destroy");
1793        });
1794    }
1795
    #[test_traced]
    fn test_recovery_crash_during_rewind_glob_ahead() {
        // Simulates crash where glob was rewound but index wasn't
        // (rewind(glob) completed, process died before rewind(index)).
        // Recovery must detect index entries pointing past the end of the
        // glob and rewind the index to match.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate: 5 entries in section 1, remembering each
            // (position, offset, size) location returned by append.
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            let mut locations = Vec::new();
            for i in 0..5u8 {
                let value: TestValue = [i; 16];
                let entry = TestEntry::new(i as u64, 0, 0);
                let loc = oversized
                    .append(1, entry, &value)
                    .await
                    .expect("Failed to append");
                locations.push(loc);
            }
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Simulate crash during rewind: truncate glob to 2 entries but leave index intact
            // This simulates: rewind(glob) succeeded, crash before rewind(index)
            let (blob, _) = context
                .open(&cfg.value_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Keep bytes up to the end of the second value; locations[1] is the
            // second entry's (position, offset, size) tuple.
            let keep_size = byte_end(locations[1].1, locations[1].2);
            blob.resize(keep_size).await.expect("Failed to truncate");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Reinitialize - recovery should detect index entries pointing beyond glob
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit");

            // First 2 entries should be valid (index rewound to match glob)
            for i in 0..2u8 {
                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
                assert_eq!(entry.id, i as u64);
            }

            // Entries 2-4 should be gone (index rewound during recovery)
            assert!(oversized.get(1, 2).await.is_err());

            // Should be able to append after recovery; the new entry lands at
            // position 2, immediately after the surviving entries.
            let value: TestValue = [99; 16];
            let entry = TestEntry::new(100, 0, 0);
            let (pos, offset, size) = oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append after recovery");
            assert_eq!(pos, 2);

            // Verify both the index entry and the value round-trip.
            let retrieved = oversized.get(1, 2).await.expect("Failed to get");
            assert_eq!(retrieved.id, 100);
            let retrieved_value = oversized
                .get_value(1, offset, size)
                .await
                .expect("Failed to get value");
            assert_eq!(retrieved_value, value);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
1868
1869    #[test_traced]
1870    fn test_oversized_get_value_invalid_size() {
1871        let executor = deterministic::Runner::default();
1872        executor.start(|context| async move {
1873            let mut oversized: Oversized<_, TestEntry, TestValue> =
1874                Oversized::init(context.clone(), test_cfg())
1875                    .await
1876                    .expect("Failed to init");
1877
1878            let value: TestValue = [42; 16];
1879            let entry = TestEntry::new(1, 0, 0);
1880            let (_, offset, _size) = oversized
1881                .append(1, entry, &value)
1882                .await
1883                .expect("Failed to append");
1884            oversized.sync(1).await.expect("Failed to sync");
1885
1886            // Size 0 - should fail
1887            assert!(oversized.get_value(1, offset, 0).await.is_err());
1888
1889            // Size < value size - should fail with codec error, checksum mismatch, or
1890            // insufficient length (if size < 4 bytes for checksum)
1891            for size in 1..4u32 {
1892                let result = oversized.get_value(1, offset, size).await;
1893                assert!(
1894                    matches!(
1895                        result,
1896                        Err(Error::Codec(_))
1897                            | Err(Error::ChecksumMismatch(_, _))
1898                            | Err(Error::Runtime(_))
1899                    ),
1900                    "expected error, got: {:?}",
1901                    result
1902                );
1903            }
1904
1905            oversized.destroy().await.expect("Failed to destroy");
1906        });
1907    }
1908
1909    #[test_traced]
1910    fn test_oversized_get_value_wrong_size() {
1911        let executor = deterministic::Runner::default();
1912        executor.start(|context| async move {
1913            let mut oversized: Oversized<_, TestEntry, TestValue> =
1914                Oversized::init(context.clone(), test_cfg())
1915                    .await
1916                    .expect("Failed to init");
1917
1918            let value: TestValue = [42; 16];
1919            let entry = TestEntry::new(1, 0, 0);
1920            let (_, offset, correct_size) = oversized
1921                .append(1, entry, &value)
1922                .await
1923                .expect("Failed to append");
1924            oversized.sync(1).await.expect("Failed to sync");
1925
1926            // Size too small - will fail to decode or checksum mismatch
1927            // (checksum mismatch can occur because we read wrong bytes as the checksum)
1928            let result = oversized.get_value(1, offset, correct_size - 1).await;
1929            assert!(
1930                matches!(
1931                    result,
1932                    Err(Error::Codec(_)) | Err(Error::ChecksumMismatch(_, _))
1933                ),
1934                "expected Codec or ChecksumMismatch error, got: {:?}",
1935                result
1936            );
1937
1938            oversized.destroy().await.expect("Failed to destroy");
1939        });
1940    }
1941
1942    #[test_traced]
1943    fn test_recovery_values_has_orphan_section() {
1944        let executor = deterministic::Runner::default();
1945        executor.start(|context| async move {
1946            let cfg = test_cfg();
1947
1948            // Create and populate with sections 1 and 2
1949            let mut oversized: Oversized<_, TestEntry, TestValue> =
1950                Oversized::init(context.clone(), cfg.clone())
1951                    .await
1952                    .expect("Failed to init");
1953
1954            for section in 1u64..=2 {
1955                let value: TestValue = [section as u8; 16];
1956                let entry = TestEntry::new(section, 0, 0);
1957                oversized
1958                    .append(section, entry, &value)
1959                    .await
1960                    .expect("Failed to append");
1961                oversized.sync(section).await.expect("Failed to sync");
1962            }
1963            drop(oversized);
1964
1965            // Manually create an orphan value section (section 3) without corresponding index
1966            let glob_cfg = GlobConfig {
1967                partition: cfg.value_partition.clone(),
1968                compression: cfg.compression,
1969                codec_config: (),
1970                write_buffer: cfg.value_write_buffer,
1971            };
1972            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
1973                .await
1974                .expect("Failed to init glob");
1975            let orphan_value: TestValue = [99; 16];
1976            glob.append(3, &orphan_value)
1977                .await
1978                .expect("Failed to append orphan");
1979            glob.sync(3).await.expect("Failed to sync glob");
1980            drop(glob);
1981
1982            // Reinitialize - should detect and remove the orphan section
1983            let oversized: Oversized<_, TestEntry, TestValue> =
1984                Oversized::init(context.clone(), cfg.clone())
1985                    .await
1986                    .expect("Failed to reinit");
1987
1988            // Sections 1 and 2 should still be valid
1989            assert!(oversized.get(1, 0).await.is_ok());
1990            assert!(oversized.get(2, 0).await.is_ok());
1991
1992            // Newest section should be 2 (orphan was removed)
1993            assert_eq!(oversized.newest_section(), Some(2));
1994
1995            oversized.destroy().await.expect("Failed to destroy");
1996        });
1997    }
1998
1999    #[test_traced]
2000    fn test_recovery_values_has_multiple_orphan_sections() {
2001        let executor = deterministic::Runner::default();
2002        executor.start(|context| async move {
2003            let cfg = test_cfg();
2004
2005            // Create and populate with only section 1
2006            let mut oversized: Oversized<_, TestEntry, TestValue> =
2007                Oversized::init(context.clone(), cfg.clone())
2008                    .await
2009                    .expect("Failed to init");
2010
2011            let value: TestValue = [1; 16];
2012            let entry = TestEntry::new(1, 0, 0);
2013            oversized
2014                .append(1, entry, &value)
2015                .await
2016                .expect("Failed to append");
2017            oversized.sync(1).await.expect("Failed to sync");
2018            drop(oversized);
2019
2020            // Manually create multiple orphan value sections (2, 3, 4)
2021            let glob_cfg = GlobConfig {
2022                partition: cfg.value_partition.clone(),
2023                compression: cfg.compression,
2024                codec_config: (),
2025                write_buffer: cfg.value_write_buffer,
2026            };
2027            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2028                .await
2029                .expect("Failed to init glob");
2030
2031            for section in 2u64..=4 {
2032                let orphan_value: TestValue = [section as u8; 16];
2033                glob.append(section, &orphan_value)
2034                    .await
2035                    .expect("Failed to append orphan");
2036                glob.sync(section).await.expect("Failed to sync glob");
2037            }
2038            drop(glob);
2039
2040            // Reinitialize - should detect and remove all orphan sections
2041            let oversized: Oversized<_, TestEntry, TestValue> =
2042                Oversized::init(context.clone(), cfg.clone())
2043                    .await
2044                    .expect("Failed to reinit");
2045
2046            // Section 1 should still be valid
2047            assert!(oversized.get(1, 0).await.is_ok());
2048
2049            // Newest section should be 1 (orphans removed)
2050            assert_eq!(oversized.newest_section(), Some(1));
2051
2052            oversized.destroy().await.expect("Failed to destroy");
2053        });
2054    }
2055
2056    #[test_traced]
2057    fn test_recovery_index_empty_but_values_exist() {
2058        let executor = deterministic::Runner::default();
2059        executor.start(|context| async move {
2060            let cfg = test_cfg();
2061
2062            // Manually create value sections without any index entries
2063            let glob_cfg = GlobConfig {
2064                partition: cfg.value_partition.clone(),
2065                compression: cfg.compression,
2066                codec_config: (),
2067                write_buffer: cfg.value_write_buffer,
2068            };
2069            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2070                .await
2071                .expect("Failed to init glob");
2072
2073            for section in 1u64..=3 {
2074                let orphan_value: TestValue = [section as u8; 16];
2075                glob.append(section, &orphan_value)
2076                    .await
2077                    .expect("Failed to append orphan");
2078                glob.sync(section).await.expect("Failed to sync glob");
2079            }
2080            drop(glob);
2081
2082            // Initialize oversized - should remove all orphan value sections
2083            let oversized: Oversized<_, TestEntry, TestValue> =
2084                Oversized::init(context.clone(), cfg.clone())
2085                    .await
2086                    .expect("Failed to init");
2087
2088            // No sections should exist
2089            assert_eq!(oversized.newest_section(), None);
2090            assert_eq!(oversized.oldest_section(), None);
2091
2092            oversized.destroy().await.expect("Failed to destroy");
2093        });
2094    }
2095
    #[test_traced]
    fn test_recovery_orphan_section_append_after() {
        // After recovery removes orphan value sections, appending into one of
        // the removed section numbers must work and must survive a restart.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate with section 1, keeping the value's
            // (offset, size) so it can be re-read after recovery.
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            let value: TestValue = [1; 16];
            let entry = TestEntry::new(1, 0, 0);
            let (_, offset1, size1) = oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append");
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Manually create orphan value sections (2, 3) by writing straight
            // to the glob, bypassing the index.
            let glob_cfg = GlobConfig {
                partition: cfg.value_partition.clone(),
                compression: cfg.compression,
                codec_config: (),
                write_buffer: cfg.value_write_buffer,
            };
            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
                .await
                .expect("Failed to init glob");

            for section in 2u64..=3 {
                let orphan_value: TestValue = [section as u8; 16];
                glob.append(section, &orphan_value)
                    .await
                    .expect("Failed to append orphan");
                glob.sync(section).await.expect("Failed to sync glob");
            }
            drop(glob);

            // Reinitialize - should remove orphan sections
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit");

            // Section 1 should still be valid (entry and value both readable)
            let entry = oversized.get(1, 0).await.expect("Failed to get");
            assert_eq!(entry.id, 1);
            let value = oversized
                .get_value(1, offset1, size1)
                .await
                .expect("Failed to get value");
            assert_eq!(value, [1; 16]);

            // Should be able to append to section 2 after recovery; the orphan
            // section was removed, so the section starts empty at position 0
            let new_value: TestValue = [42; 16];
            let new_entry = TestEntry::new(42, 0, 0);
            let (pos, offset, size) = oversized
                .append(2, new_entry, &new_value)
                .await
                .expect("Failed to append after recovery");
            assert_eq!(pos, 0);

            // Verify the new entry
            let retrieved = oversized.get(2, 0).await.expect("Failed to get");
            assert_eq!(retrieved.id, 42);
            let retrieved_value = oversized
                .get_value(2, offset, size)
                .await
                .expect("Failed to get value");
            assert_eq!(retrieved_value, new_value);

            // Sync and restart to verify persistence
            oversized.sync(2).await.expect("Failed to sync");
            drop(oversized);

            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg)
                    .await
                    .expect("Failed to reinit after append");

            // Both sections should be valid and section 2 is now tracked
            assert!(oversized.get(1, 0).await.is_ok());
            assert!(oversized.get(2, 0).await.is_ok());
            assert_eq!(oversized.newest_section(), Some(2));

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
2187
2188    #[test_traced]
2189    fn test_recovery_no_orphan_sections() {
2190        let executor = deterministic::Runner::default();
2191        executor.start(|context| async move {
2192            let cfg = test_cfg();
2193
2194            // Create and populate with sections 1, 2, 3 (no orphans)
2195            let mut oversized: Oversized<_, TestEntry, TestValue> =
2196                Oversized::init(context.clone(), cfg.clone())
2197                    .await
2198                    .expect("Failed to init");
2199
2200            for section in 1u64..=3 {
2201                let value: TestValue = [section as u8; 16];
2202                let entry = TestEntry::new(section, 0, 0);
2203                oversized
2204                    .append(section, entry, &value)
2205                    .await
2206                    .expect("Failed to append");
2207                oversized.sync(section).await.expect("Failed to sync");
2208            }
2209            drop(oversized);
2210
2211            // Reinitialize - no orphan cleanup needed
2212            let oversized: Oversized<_, TestEntry, TestValue> =
2213                Oversized::init(context.clone(), cfg)
2214                    .await
2215                    .expect("Failed to reinit");
2216
2217            // All sections should be valid
2218            for section in 1u64..=3 {
2219                let entry = oversized.get(section, 0).await.expect("Failed to get");
2220                assert_eq!(entry.id, section);
2221            }
2222            assert_eq!(oversized.newest_section(), Some(3));
2223
2224            oversized.destroy().await.expect("Failed to destroy");
2225        });
2226    }
2227
    #[test_traced]
    fn test_recovery_orphan_with_empty_index_section() {
        // An index section that exists but holds zero entries should still
        // count as present, so only value sections with no index section at
        // all are treated as orphans and removed.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate section 1 with entries
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            let value: TestValue = [1; 16];
            let entry = TestEntry::new(1, 0, 0);
            oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append");
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Manually create orphan value section 2 (glob only, no index)
            let glob_cfg = GlobConfig {
                partition: cfg.value_partition.clone(),
                compression: cfg.compression,
                codec_config: (),
                write_buffer: cfg.value_write_buffer,
            };
            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
                .await
                .expect("Failed to init glob");
            let orphan_value: TestValue = [2; 16];
            glob.append(2, &orphan_value)
                .await
                .expect("Failed to append orphan");
            glob.sync(2).await.expect("Failed to sync glob");
            drop(glob);

            // Now truncate index section 1 to 0 (making it empty but still tracked)
            let (blob, _) = context
                .open(&cfg.index_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(0).await.expect("Failed to truncate");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Reinitialize - should handle empty index section and remove orphan value section
            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg)
                    .await
                    .expect("Failed to reinit");

            // Section 1 should exist but have no entries (empty after truncation)
            assert!(oversized.get(1, 0).await.is_err());

            // Orphan section 2 should be removed; section 1 remains the newest
            assert_eq!(oversized.newest_section(), Some(1));

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
2290
2291    #[test_traced]
2292    fn test_recovery_orphan_sections_with_gaps() {
2293        // Test non-contiguous sections: index has [1, 3, 5], values has [1, 2, 3, 4, 5, 6]
2294        // Orphan sections 2, 4, 6 should be removed
2295        let executor = deterministic::Runner::default();
2296        executor.start(|context| async move {
2297            let cfg = test_cfg();
2298
2299            // Create index with sections 1, 3, 5 (gaps)
2300            let mut oversized: Oversized<_, TestEntry, TestValue> =
2301                Oversized::init(context.clone(), cfg.clone())
2302                    .await
2303                    .expect("Failed to init");
2304
2305            for section in [1u64, 3, 5] {
2306                let value: TestValue = [section as u8; 16];
2307                let entry = TestEntry::new(section, 0, 0);
2308                oversized
2309                    .append(section, entry, &value)
2310                    .await
2311                    .expect("Failed to append");
2312                oversized.sync(section).await.expect("Failed to sync");
2313            }
2314            drop(oversized);
2315
2316            // Manually create orphan value sections 2, 4, 6 (filling gaps and beyond)
2317            let glob_cfg = GlobConfig {
2318                partition: cfg.value_partition.clone(),
2319                compression: cfg.compression,
2320                codec_config: (),
2321                write_buffer: cfg.value_write_buffer,
2322            };
2323            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2324                .await
2325                .expect("Failed to init glob");
2326
2327            for section in [2u64, 4, 6] {
2328                let orphan_value: TestValue = [section as u8; 16];
2329                glob.append(section, &orphan_value)
2330                    .await
2331                    .expect("Failed to append orphan");
2332                glob.sync(section).await.expect("Failed to sync glob");
2333            }
2334            drop(glob);
2335
2336            // Reinitialize - should remove orphan sections 2, 4, 6
2337            let oversized: Oversized<_, TestEntry, TestValue> =
2338                Oversized::init(context.clone(), cfg)
2339                    .await
2340                    .expect("Failed to reinit");
2341
2342            // Sections 1, 3, 5 should still be valid
2343            for section in [1u64, 3, 5] {
2344                let entry = oversized.get(section, 0).await.expect("Failed to get");
2345                assert_eq!(entry.id, section);
2346            }
2347
2348            // Verify only sections 1, 3, 5 exist (orphans removed)
2349            assert_eq!(oversized.oldest_section(), Some(1));
2350            assert_eq!(oversized.newest_section(), Some(5));
2351
2352            oversized.destroy().await.expect("Failed to destroy");
2353        });
2354    }
2355
    #[test_traced]
    fn test_recovery_glob_trailing_garbage_truncated() {
        // Tests the bug fix: when value is written to glob but index entry isn't
        // (crash after value write, before index write), recovery should truncate
        // the glob trailing garbage so subsequent appends start at correct offset.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            // Append 2 entries
            let mut locations = Vec::new();
            for i in 0..2u8 {
                let value: TestValue = [i; 16];
                let entry = TestEntry::new(i as u64, 0, 0);
                let loc = oversized
                    .append(1, entry, &value)
                    .await
                    .expect("Failed to append");
                locations.push(loc);
            }
            oversized.sync(1).await.expect("Failed to sync");

            // Record where next entry SHOULD start (end of entry 1)
            // `loc` tuples are (position, value_offset, value_size) — see the
            // (pos, offset, _size) destructure below — so byte_end of entry 1's
            // (offset, size) is the glob's expected logical size.
            let expected_next_offset = byte_end(locations[1].1, locations[1].2);
            drop(oversized);

            // Simulate crash: write garbage to glob (simulating partial value write)
            // Open the raw value blob for section 1 directly, bypassing the journal.
            let (blob, size) = context
                .open(&cfg.value_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Sanity check: the blob currently ends exactly where entry 1's value ends.
            assert_eq!(size, expected_next_offset);

            // Write 100 bytes of garbage (simulating partial/failed value write)
            let garbage = vec![0xDE; 100];
            blob.write_at(garbage, size)
                .await
                .expect("Failed to write garbage");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Verify glob now has trailing garbage
            let (blob, new_size) = context
                .open(&cfg.value_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(new_size, expected_next_offset + 100);
            drop(blob);

            // Reinitialize - should truncate the trailing garbage
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit");

            // First 2 entries should still be valid
            for i in 0..2u8 {
                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
                assert_eq!(entry.id, i as u64);
            }

            // Append new entry - should start at expected_next_offset, NOT at garbage end
            let new_value: TestValue = [99; 16];
            let new_entry = TestEntry::new(99, 0, 0);
            let (pos, offset, _size) = oversized
                .append(1, new_entry, &new_value)
                .await
                .expect("Failed to append after recovery");

            // Verify position is 2 (after the 2 existing entries)
            assert_eq!(pos, 2);

            // Verify offset is at expected_next_offset (garbage was truncated)
            // If recovery had NOT truncated the glob, the new value would land
            // after the 100 garbage bytes and this assertion would fail.
            assert_eq!(offset, expected_next_offset);

            // Verify we can read the new entry
            let retrieved = oversized.get(1, 2).await.expect("Failed to get new entry");
            assert_eq!(retrieved.id, 99);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
2444
    #[test_traced]
    fn test_recovery_entry_with_overflow_offset() {
        // Tests that an entry with offset near u64::MAX that would overflow
        // when added to size is detected as invalid during recovery.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Use page size = entry size so one entry per page
            let cfg = Config {
                index_partition: "test_index".to_string(),
                value_partition: "test_values".to_string(),
                index_buffer_pool: PoolRef::new(NZU16!(TestEntry::SIZE as u16), NZUsize!(8)),
                index_write_buffer: NZUsize!(1024),
                value_write_buffer: NZUsize!(1024),
                compression: None,
                codec_config: (),
            };

            // Create and populate with valid entry
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            let value: TestValue = [1; 16];
            let entry = TestEntry::new(1, 0, 0);
            oversized
                .append(1, entry, &value)
                .await
                .expect("Failed to append");
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Build a corrupted entry with offset near u64::MAX that would overflow.
            // We need to write a valid page (with correct page-level CRC) containing
            // the semantically-invalid entry data.
            // The page CRC must be valid so recovery's *semantic* validation
            // (offset + size <= glob size) is what rejects the entry, not the
            // lower-level page integrity check.
            let (blob, _) = context
                .open(&cfg.index_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            // Build entry data: id (8) + value_offset (8) + value_size (4) = 20 bytes
            let mut entry_data = Vec::new();
            1u64.write(&mut entry_data); // id
            (u64::MAX - 10).write(&mut entry_data); // value_offset (near max)
            100u32.write(&mut entry_data); // value_size (offset + size overflows)
            assert_eq!(entry_data.len(), TestEntry::SIZE);

            // Build page-level CRC record (12 bytes):
            // len1 (2) + crc1 (4) + len2 (2) + crc2 (4)
            // NOTE(review): this hand-built layout must stay in sync with the fixed
            // journal's on-disk page format — confirm against the journal
            // implementation if this test starts failing after a format change.
            let crc = Crc32::checksum(&entry_data);
            let len1 = TestEntry::SIZE as u16;
            let mut crc_record = Vec::new();
            crc_record.extend_from_slice(&len1.to_be_bytes()); // len1
            crc_record.extend_from_slice(&crc.to_be_bytes()); // crc1
            crc_record.extend_from_slice(&0u16.to_be_bytes()); // len2 (unused)
            crc_record.extend_from_slice(&0u32.to_be_bytes()); // crc2 (unused)
            assert_eq!(crc_record.len(), 12);

            // Write the complete physical page: entry_data + crc_record
            // This overwrites the valid entry written above (page 0 at offset 0).
            let mut page = entry_data;
            page.extend_from_slice(&crc_record);
            blob.write_at(page, 0)
                .await
                .expect("Failed to write corrupted page");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Reinitialize - recovery should detect the invalid entry
            // (offset + size would overflow, and even with saturating_add it exceeds glob_size)
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit");

            // The corrupted entry should have been rewound (invalid)
            assert!(oversized.get(1, 0).await.is_err());

            // Should be able to append after recovery
            let new_value: TestValue = [99; 16];
            let new_entry = TestEntry::new(99, 0, 0);
            let (pos, new_offset, _) = oversized
                .append(1, new_entry, &new_value)
                .await
                .expect("Failed to append after recovery");

            // Position should be 0 (corrupted entry was removed)
            assert_eq!(pos, 0);
            // Offset should be 0 (glob was truncated to 0)
            assert_eq!(new_offset, 0);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
2538
    #[test_traced]
    fn test_empty_section_persistence() {
        // Tests that sections that become empty (all entries removed/rewound)
        // are handled correctly across restart cycles.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate section 1 with entries
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            for i in 0..3u8 {
                let value: TestValue = [i; 16];
                let entry = TestEntry::new(i as u64, 0, 0);
                oversized
                    .append(1, entry, &value)
                    .await
                    .expect("Failed to append");
            }
            oversized.sync(1).await.expect("Failed to sync");

            // Also create section 2 to ensure it survives
            let value2: TestValue = [10; 16];
            let entry2 = TestEntry::new(10, 0, 0);
            oversized
                .append(2, entry2, &value2)
                .await
                .expect("Failed to append to section 2");
            oversized.sync(2).await.expect("Failed to sync section 2");
            drop(oversized);

            // Truncate section 1's index to 0 (making it empty)
            // The index blob still exists on disk, so the section remains tracked.
            let (blob, _) = context
                .open(&cfg.index_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(0).await.expect("Failed to truncate");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // First restart - recovery should handle empty section 1
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit");

            // Section 1 should exist but have no entries
            assert!(oversized.get(1, 0).await.is_err());

            // Section 2 should still be valid
            let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
            assert_eq!(entry.id, 10);

            // Section 1 should still be tracked (blob exists but is empty)
            assert_eq!(oversized.oldest_section(), Some(1));

            // Append to empty section 1
            // Note: When index is truncated to 0 but the index blob still exists,
            // the glob is NOT truncated (the section isn't considered an orphan).
            // The glob still has orphan DATA from the old entries, but this doesn't
            // affect correctness - new entries simply append after the orphan data.
            let new_value: TestValue = [99; 16];
            let new_entry = TestEntry::new(99, 0, 0);
            // append returns (position, value_offset, value_size).
            let (pos, offset, size) = oversized
                .append(1, new_entry, &new_value)
                .await
                .expect("Failed to append to empty section");
            assert_eq!(pos, 0);
            // Glob offset is non-zero because orphan data wasn't truncated
            assert!(offset > 0);
            oversized.sync(1).await.expect("Failed to sync");

            // Verify the new entry is readable despite orphan data before it
            let entry = oversized.get(1, 0).await.expect("Failed to get");
            assert_eq!(entry.id, 99);
            let value = oversized
                .get_value(1, offset, size)
                .await
                .expect("Failed to get value");
            assert_eq!(value, new_value);

            drop(oversized);

            // Second restart - verify persistence
            // Confirms the recovered-then-appended state survives another init cycle.
            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit again");

            // Section 1's new entry should be valid
            let entry = oversized.get(1, 0).await.expect("Failed to get");
            assert_eq!(entry.id, 99);

            // Section 2 should still be valid
            let entry = oversized.get(2, 0).await.expect("Failed to get section 2");
            assert_eq!(entry.id, 10);

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
2642
2643    #[test_traced]
2644    fn test_get_value_size_equals_crc_size() {
2645        // Tests the boundary condition where size = 4 (just CRC, no data).
2646        // This should fail because there's no actual data to decode.
2647        let executor = deterministic::Runner::default();
2648        executor.start(|context| async move {
2649            let mut oversized: Oversized<_, TestEntry, TestValue> =
2650                Oversized::init(context.clone(), test_cfg())
2651                    .await
2652                    .expect("Failed to init");
2653
2654            let value: TestValue = [42; 16];
2655            let entry = TestEntry::new(1, 0, 0);
2656            let (_, offset, _) = oversized
2657                .append(1, entry, &value)
2658                .await
2659                .expect("Failed to append");
2660            oversized.sync(1).await.expect("Failed to sync");
2661
2662            // Size = 4 (exactly CRC_SIZE) means 0 bytes of actual data
2663            // This should fail with ChecksumMismatch or decode error
2664            let result = oversized.get_value(1, offset, 4).await;
2665            assert!(result.is_err());
2666
2667            oversized.destroy().await.expect("Failed to destroy");
2668        });
2669    }
2670
2671    #[test_traced]
2672    fn test_get_value_size_just_over_crc() {
2673        // Tests size = 5 (CRC + 1 byte of data).
2674        // This should fail because the data is too short to decode.
2675        let executor = deterministic::Runner::default();
2676        executor.start(|context| async move {
2677            let mut oversized: Oversized<_, TestEntry, TestValue> =
2678                Oversized::init(context.clone(), test_cfg())
2679                    .await
2680                    .expect("Failed to init");
2681
2682            let value: TestValue = [42; 16];
2683            let entry = TestEntry::new(1, 0, 0);
2684            let (_, offset, _) = oversized
2685                .append(1, entry, &value)
2686                .await
2687                .expect("Failed to append");
2688            oversized.sync(1).await.expect("Failed to sync");
2689
2690            // Size = 5 means 1 byte of actual data (after stripping CRC)
2691            // This should fail with checksum mismatch since we're reading wrong bytes
2692            let result = oversized.get_value(1, offset, 5).await;
2693            assert!(result.is_err());
2694
2695            oversized.destroy().await.expect("Failed to destroy");
2696        });
2697    }
2698
2699    #[test_traced]
2700    fn test_recovery_maximum_section_numbers() {
2701        // Test recovery with very large section numbers near u64::MAX to check
2702        // for overflow edge cases in section arithmetic.
2703        let executor = deterministic::Runner::default();
2704        executor.start(|context| async move {
2705            let cfg = test_cfg();
2706
2707            // Use section numbers near u64::MAX
2708            let large_sections = [u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
2709
2710            // Create and populate with large section numbers
2711            let mut oversized: Oversized<_, TestEntry, TestValue> =
2712                Oversized::init(context.clone(), cfg.clone())
2713                    .await
2714                    .expect("Failed to init");
2715
2716            let mut locations = Vec::new();
2717            for &section in &large_sections {
2718                let value: TestValue = [(section & 0xFF) as u8; 16];
2719                let entry = TestEntry::new(section, 0, 0);
2720                let loc = oversized
2721                    .append(section, entry, &value)
2722                    .await
2723                    .expect("Failed to append");
2724                locations.push((section, loc));
2725                oversized.sync(section).await.expect("Failed to sync");
2726            }
2727            drop(oversized);
2728
2729            // Simulate crash: truncate glob for middle section
2730            let middle_section = large_sections[1];
2731            let (blob, size) = context
2732                .open(&cfg.value_partition, &middle_section.to_be_bytes())
2733                .await
2734                .expect("Failed to open blob");
2735            blob.resize(size / 2).await.expect("Failed to truncate");
2736            blob.sync().await.expect("Failed to sync");
2737            drop(blob);
2738
2739            // Reinitialize - should recover without overflow panics
2740            let oversized: Oversized<_, TestEntry, TestValue> =
2741                Oversized::init(context.clone(), cfg.clone())
2742                    .await
2743                    .expect("Failed to reinit");
2744
2745            // First and last sections should still be valid
2746            let entry = oversized
2747                .get(large_sections[0], 0)
2748                .await
2749                .expect("Failed to get first section");
2750            assert_eq!(entry.id, large_sections[0]);
2751
2752            let entry = oversized
2753                .get(large_sections[2], 0)
2754                .await
2755                .expect("Failed to get last section");
2756            assert_eq!(entry.id, large_sections[2]);
2757
2758            // Middle section should have been rewound (no entries)
2759            assert!(oversized.get(middle_section, 0).await.is_err());
2760
2761            // Verify we can still append to these large sections
2762            let new_value: TestValue = [0xAB; 16];
2763            let new_entry = TestEntry::new(999, 0, 0);
2764            let mut oversized = oversized;
2765            oversized
2766                .append(middle_section, new_entry, &new_value)
2767                .await
2768                .expect("Failed to append after recovery");
2769
2770            oversized.destroy().await.expect("Failed to destroy");
2771        });
2772    }
2773
    #[test_traced]
    fn test_recovery_crash_during_recovery_rewind() {
        // Tests a nested crash scenario: initial crash leaves inconsistent state,
        // then a second crash occurs during recovery's rewind operation.
        // This simulates the worst-case where recovery itself is interrupted.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Phase 1: Create valid data with 5 entries
            let mut oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to init");

            // locations[i] = (position, value_offset, value_size) for entry i,
            // as returned by append (see the destructure in the read loop below).
            let mut locations = Vec::new();
            for i in 0..5u8 {
                let value: TestValue = [i; 16];
                let entry = TestEntry::new(i as u64, 0, 0);
                let loc = oversized
                    .append(1, entry, &value)
                    .await
                    .expect("Failed to append");
                locations.push(loc);
            }
            oversized.sync(1).await.expect("Failed to sync");
            drop(oversized);

            // Phase 2: Simulate first crash - truncate glob to lose last 2 entries
            let (blob, _) = context
                .open(&cfg.value_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Keep everything up to and including entry 2's value bytes.
            let keep_size = byte_end(locations[2].1, locations[2].2);
            blob.resize(keep_size).await.expect("Failed to truncate");
            blob.sync().await.expect("Failed to sync");
            drop(blob);

            // Phase 3: Simulate crash during recovery's rewind
            // Recovery would try to rewind index from 5 entries to 3 entries.
            // Simulate partial rewind by manually truncating index to 4 entries
            // (as if crash occurred mid-rewind).
            // CHUNK_SIZE is the fixed journal's per-entry on-disk footprint, so
            // N entries occupy N * CHUNK_SIZE bytes in the index blob.
            let chunk_size = FixedJournal::<deterministic::Context, TestEntry>::CHUNK_SIZE as u64;
            let (index_blob, _) = context
                .open(&cfg.index_partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open index blob");
            let partial_rewind_size = 4 * chunk_size; // 4 entries instead of 3
            index_blob
                .resize(partial_rewind_size)
                .await
                .expect("Failed to resize");
            index_blob.sync().await.expect("Failed to sync");
            drop(index_blob);

            // Phase 4: Second recovery attempt should handle the inconsistent state
            // Index has 4 entries, but glob only supports 3.
            let oversized: Oversized<_, TestEntry, TestValue> =
                Oversized::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to reinit after nested crash");

            // Only first 3 entries should be valid (recovery should rewind again)
            for i in 0..3u8 {
                let entry = oversized.get(1, i as u64).await.expect("Failed to get");
                assert_eq!(entry.id, i as u64);

                let (_, offset, size) = locations[i as usize];
                let value = oversized
                    .get_value(1, offset, size)
                    .await
                    .expect("Failed to get value");
                assert_eq!(value, [i; 16]);
            }

            // Entry 3 should not exist (index was rewound to match glob)
            assert!(oversized.get(1, 3).await.is_err());

            // Verify append works after nested crash recovery
            let new_value: TestValue = [0xFF; 16];
            let new_entry = TestEntry::new(100, 0, 0);
            let mut oversized = oversized;
            let (pos, offset, _size) = oversized
                .append(1, new_entry, &new_value)
                .await
                .expect("Failed to append");
            assert_eq!(pos, 3); // Should be position 3 (after the 3 valid entries)

            // Verify the offset starts where entry 2 ended (no gaps)
            assert_eq!(offset, byte_end(locations[2].1, locations[2].2));

            oversized.destroy().await.expect("Failed to destroy");
        });
    }
2868
2869    #[test_traced]
2870    fn test_recovery_crash_during_orphan_cleanup() {
2871        // Tests crash during orphan section cleanup: recovery starts removing
2872        // orphan value sections, but crashes mid-cleanup.
2873        let executor = deterministic::Runner::default();
2874        executor.start(|context| async move {
2875            let cfg = test_cfg();
2876
2877            // Phase 1: Create valid data in section 1
2878            let mut oversized: Oversized<_, TestEntry, TestValue> =
2879                Oversized::init(context.clone(), cfg.clone())
2880                    .await
2881                    .expect("Failed to init");
2882
2883            let value: TestValue = [1; 16];
2884            let entry = TestEntry::new(1, 0, 0);
2885            let (_, offset1, size1) = oversized
2886                .append(1, entry, &value)
2887                .await
2888                .expect("Failed to append");
2889            oversized.sync(1).await.expect("Failed to sync");
2890            drop(oversized);
2891
2892            // Phase 2: Create orphan value sections 2, 3, 4 (no index entries)
2893            let glob_cfg = GlobConfig {
2894                partition: cfg.value_partition.clone(),
2895                compression: cfg.compression,
2896                codec_config: (),
2897                write_buffer: cfg.value_write_buffer,
2898            };
2899            let mut glob: Glob<_, TestValue> = Glob::init(context.with_label("glob"), glob_cfg)
2900                .await
2901                .expect("Failed to init glob");
2902
2903            for section in 2u64..=4 {
2904                let orphan_value: TestValue = [section as u8; 16];
2905                glob.append(section, &orphan_value)
2906                    .await
2907                    .expect("Failed to append orphan");
2908                glob.sync(section).await.expect("Failed to sync glob");
2909            }
2910            drop(glob);
2911
2912            // Phase 3: Simulate partial orphan cleanup (section 2 removed, 3 and 4 remain)
2913            // This simulates a crash during cleanup_orphan_value_sections()
2914            context
2915                .remove(&cfg.value_partition, Some(&2u64.to_be_bytes()))
2916                .await
2917                .expect("Failed to remove section 2");
2918
2919            // Phase 4: Recovery should complete the cleanup
2920            let mut oversized: Oversized<_, TestEntry, TestValue> =
2921                Oversized::init(context.clone(), cfg.clone())
2922                    .await
2923                    .expect("Failed to reinit");
2924
2925            // Section 1 should still be valid
2926            let entry = oversized.get(1, 0).await.expect("Failed to get");
2927            assert_eq!(entry.id, 1);
2928            let value = oversized
2929                .get_value(1, offset1, size1)
2930                .await
2931                .expect("Failed to get value");
2932            assert_eq!(value, [1; 16]);
2933
2934            // No orphan sections should remain
2935            assert_eq!(oversized.oldest_section(), Some(1));
2936            assert_eq!(oversized.newest_section(), Some(1));
2937
2938            // Should be able to append to section 2 (now clean)
2939            let new_value: TestValue = [42; 16];
2940            let new_entry = TestEntry::new(42, 0, 0);
2941            let (pos, _, _) = oversized
2942                .append(2, new_entry, &new_value)
2943                .await
2944                .expect("Failed to append to section 2");
2945            assert_eq!(pos, 0); // First entry in new section
2946
2947            oversized.destroy().await.expect("Failed to destroy");
2948        });
2949    }
2950}