commonware_storage/journal/segmented/fixed.rs

//! Segmented journal for fixed-size items.
//!
//! # Format
//!
//! Data is stored in one blob per section. Items are stored sequentially:
//!
//! ```text
//! +--------+--------+--------+----------+
//! | item_0 | item_1 |   ...  | item_n-1 |
//! +--------+--------+--------+----------+
//! ```
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. Use the
//! `sync` method to force pending data to be written.
//!
//! # Pruning
//!
//! All data must be assigned to a `section`. This allows pruning entire sections
//! (and their corresponding blobs) independently.
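//!
//! # Example
//!
//! A minimal usage sketch (not compiled here; it assumes a `commonware_runtime`
//! context plus a fixed-size item type and buffer-pool constants like those used
//! in the tests below):
//!
//! ```ignore
//! let cfg = Config {
//!     partition: "journal".into(),
//!     buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
//!     write_buffer: NZUsize!(2048),
//! };
//! let mut journal = Journal::<_, Digest>::init(context, cfg).await?;
//!
//! // Items are addressed by (section, position); positions are 0-indexed
//! // within each section.
//! let pos = journal.append(1, digest).await?;
//! assert_eq!(pos, 0);
//!
//! // Force pending data to storage, then read the item back.
//! journal.sync(1).await?;
//! assert_eq!(journal.get(1, 0).await?, digest);
//! ```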

use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use bytes::Buf;
use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
use commonware_runtime::{
    buffer::pool::{PoolRef, Replay},
    Blob, Metrics, Storage,
};
use futures::{
    stream::{self, Stream},
    StreamExt,
};
use std::{marker::PhantomData, num::NonZeroUsize};
use tracing::{trace, warn};

/// State for replaying a single section's blob.
struct ReplayState<B: Blob> {
    section: u64,
    replay: Replay<B>,
    position: u64,
    done: bool,
}

/// Configuration for the fixed segmented journal.
#[derive(Clone)]
pub struct Config {
    /// The partition to use for storing blobs.
    pub partition: String,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// A segmented journal with fixed-size entries.
///
/// Each section is stored in a separate blob. Within each blob, items are fixed-size.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data encountered is treated as the new end of the journal (and the
/// underlying [Blob] is truncated to the last complete item). Repair occurs during
/// init by checking each blob's size: for example, with 32-byte items, a blob found
/// at 95 bytes is truncated to 64 bytes, discarding the trailing partial item.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    manager: Manager<E, AppendFactory>,
    _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry.
    pub const CHUNK_SIZE: usize = A::SIZE;
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. Use `replay`
    /// to iterate over all items.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                pool_ref: cfg.buffer_pool,
            },
        };
        let mut manager = Manager::init(context, manager_cfg).await?;

        // Repair any blobs with trailing bytes (incomplete items from a crash)
        let sections: Vec<_> = manager.sections().collect();
        for section in sections {
            let size = manager.size(section).await?;
            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
                warn!(
                    section,
                    invalid_size = size,
                    new_size = valid_size,
                    "trailing bytes detected: truncating"
                );
                manager.rewind_section(section, valid_size).await?;
            }
        }

        Ok(Self {
            manager,
            _array: PhantomData,
        })
    }

    /// Append a new item to the journal in the given section.
    ///
    /// Returns the position of the item within the section (0-indexed).
    pub async fn append(&mut self, section: u64, item: A) -> Result<u64, Error> {
        let blob = self.manager.get_or_create(section).await?;

        let size = blob.size().await;
        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            return Err(Error::InvalidBlobSize(section, size));
        }
        let position = size / Self::CHUNK_SIZE_U64;

        // Encode the item
        let buf = item.encode_mut();
        blob.append(&buf).await?;
        trace!(section, position, "appended item");

        Ok(position)
    }

    /// Read the item at the given section and position.
    ///
    /// # Errors
    ///
    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
    /// - [Error::SectionOutOfRange] if the section doesn't exist.
    /// - [Error::ItemOutOfRange] if the position is beyond the blob size.
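    ///
    /// # Example
    ///
    /// A sketch of error matching (assuming a journal with two items in section 1
    /// and no section 9):
    ///
    /// ```ignore
    /// assert!(journal.get(1, 1).await.is_ok());
    /// assert!(matches!(
    ///     journal.get(1, 2).await,
    ///     Err(Error::ItemOutOfRange(2))
    /// ));
    /// assert!(matches!(
    ///     journal.get(9, 0).await,
    ///     Err(Error::SectionOutOfRange(9))
    /// ));
    /// ```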
    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let offset = position
            .checked_mul(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        let end = offset
            .checked_add(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        if end > blob.size().await {
            return Err(Error::ItemOutOfRange(position));
        }

        let buf = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        A::decode(buf.as_ref()).map_err(Error::Codec)
    }

    /// Read the last item in a section, if any.
    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let size = blob.size().await;
        if size < Self::CHUNK_SIZE_U64 {
            return Ok(None);
        }

        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
        let offset = last_position * Self::CHUNK_SIZE_U64;
        let buf = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        A::decode(buf.as_ref()).map_err(Error::Codec).map(Some)
    }

    /// Returns a stream of all items starting from the given section.
    ///
    /// Each item is returned as `(section, position, item)`.
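    ///
    /// # Example
    ///
    /// A consumption sketch (assuming `futures::pin_mut` and `StreamExt`, as in
    /// the tests below):
    ///
    /// ```ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, position, item) = result?;
    ///     // Items arrive ordered by section, then by position.
    /// }
    /// ```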
    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
        // Pre-create readers from blobs (async operation)
        let mut blob_info = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            let blob_size = blob.size().await;
            let mut replay = blob.replay(buffer).await?;
            // For the first section, seek to the start position
            let initial_position = if section == start_section {
                let start = start_position * Self::CHUNK_SIZE_U64;
                if start > blob_size {
                    return Err(Error::ItemOutOfRange(start_position));
                }
                replay.seek_to(start).await?;
                start_position
            } else {
                0
            };
            blob_info.push((section, replay, initial_position));
        }

        // Stream items as they are read to avoid occupying too much memory.
        // Each blob is processed sequentially, yielding batches of items that are then
        // flattened into individual stream elements.
        Ok(
            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
                stream::unfold(
                    ReplayState {
                        section,
                        replay,
                        position: initial_position,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for one item
                            match state.replay.ensure(Self::CHUNK_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - we're done with this blob
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(Error::Runtime(err)));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode items from buffer
                            while state.replay.remaining() >= Self::CHUNK_SIZE {
                                match A::read(&mut state.replay) {
                                    Ok(item) => {
                                        batch.push(Ok((state.section, state.position, item)));
                                        state.position += 1;
                                    }
                                    Err(err) => {
                                        batch.push(Err(Error::Codec(err)));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                }
                            }

                            // Return batch if we have items
                            if !batch.is_empty() {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            }),
        )
    }

    /// Sync the given section to storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Sync all sections to storage.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prune all sections less than `min`. Returns true if any were pruned.
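    ///
    /// # Example
    ///
    /// A sketch of prune semantics (assuming sections 1 through 5 exist):
    ///
    /// ```ignore
    /// journal.prune(3).await?; // removes sections 1 and 2
    /// assert!(matches!(
    ///     journal.get(1, 0).await,
    ///     Err(Error::AlreadyPrunedToSection(3))
    /// ));
    /// assert!(journal.get(3, 0).await.is_ok());
    /// ```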
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the oldest section number, if any blobs exist.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the newest section number, if any blobs exist.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns an iterator over all section numbers.
    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
        self.manager.sections_from(0).map(|(section, _)| *section)
    }

    /// Returns the number of items in the given section.
    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
        let size = self.manager.size(section).await?;
        Ok(size / Self::CHUNK_SIZE_U64)
    }

    /// Returns the byte size of the given section.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewind the journal to a specific section and byte offset.
    ///
    /// This truncates `section` to `offset` bytes. All sections after
    /// `section` are removed.
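    ///
    /// # Example
    ///
    /// A sketch (assuming sections 1 through 3 exist and `size` is section 1's
    /// current byte size):
    ///
    /// ```ignore
    /// journal.rewind(1, size).await?; // sections 2 and 3 are removed
    /// assert_eq!(journal.size(2).await?, 0);
    /// ```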
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewind only the given section to a specific byte size.
    ///
    /// Unlike `rewind`, this does not affect other sections.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Remove all underlying blobs.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Initialize a section with a specific number of zero-filled items.
    ///
    /// This creates the section's blob and fills it with `item_count` items worth of zeros.
    /// The data is written through the Append wrapper which handles checksums properly.
    ///
    /// # Arguments
    /// * `section` - The section number to initialize
    /// * `item_count` - Number of zero-filled items to write
    pub(crate) async fn init_section_at_size(
        &mut self,
        section: u64,
        item_count: u64,
    ) -> Result<(), Error> {
        // Get or create the blob for this section
        let blob = self.manager.get_or_create(section).await?;

        // Calculate the target byte size
        let target_size = item_count * Self::CHUNK_SIZE_U64;

        // Resize grows the blob by appending zeros, which handles checksums properly
        blob.resize(target_size).await?;

        Ok(())
    }

    /// Ensure a section exists, creating an empty blob if needed.
    ///
    /// This is used to maintain the invariant that at least one blob always exists
    /// (the "tail" blob), which allows reconstructing journal size on reopen.
    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        self.manager.get_or_create(section).await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
    use commonware_utils::{NZUsize, NZU16};
    use core::num::NonZeroU16;
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg() -> Config {
        Config {
            partition: "test_partition".into(),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            let pos0 = journal
                .append(1, test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            let pos2 = journal
                .append(2, test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        // Test that replay with a non-zero start_position correctly skips items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Append 10 items to section 1
            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            // Append 5 items to section 2
            for i in 10u64..15 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay from section 1, position 5 - should get items 5-9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                // Check section 1 items (positions 5-9)
                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                // Check section 2 items (positions 0-4)
                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Replay from section 1, position 9 - should get only item 9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Replay from section 2, position 3 - should get only items 3-4 from section 2
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // Replay from position past the end should return ItemOutOfRange error
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1, 2, 3
            for section in 1u64..=3 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all sections exist
            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to section 1 (should remove sections 2, 3)
            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            // Verify section 1 still has data
            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            // Verify sections 2, 3 are removed
            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data in section 1 is still readable
            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1-10
            for section in 1u64..=10 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 5 (should remove sections 6-10)
            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            // Verify sections 1-5 still have data
            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            // Verify sections 6-10 are removed
            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data integrity via replay
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create sections 1-5
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 2
            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-init and verify only sections 1-2 exist
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify sections 1-2 have data
            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            // Verify sections 3-5 are gone
            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            // Verify data integrity
            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_corruption_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            let count = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut count = 0;
                while let Some(result) = stream.next().await {
                    result.expect("should be ok");
                    count += 1;
                }
                count
            };
            assert_eq!(count, 4);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Reopen and verify data persisted
            let journal = Journal::<_, Digest>::init(context.clone(), cfg)
                .await
                .expect("failed to re-init");

            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_section_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            assert_eq!(journal.section_len(1).await.unwrap(), 0);

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            assert_eq!(journal.section_len(1).await.unwrap(), 5);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_non_contiguous_sections() {
        // Test that sections with gaps in numbering work correctly.
        // Sections 1, 5, 10 should all be independent and accessible.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections with gaps: 1, 5, 10
            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");
            journal
                .append(5, test_digest(500))
                .await
                .expect("failed to append");
            journal
                .append(10, test_digest(1000))
                .await
                .expect("failed to append");
            journal.sync_all().await.expect("failed to sync");

            // Verify random access to each section
            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));

            // Verify non-existent sections return appropriate errors
            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            // Drop and reopen to test replay
            drop(journal);
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay and verify all items in order
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (5, test_digest(500)));
                assert_eq!(items[2], (10, test_digest(1000)));
            }

            // Test replay starting from middle section (5)
            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, test_digest(500)));
                assert_eq!(items[1], (10, test_digest(1000)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_empty_section_in_middle() {
        // Test that replay correctly handles an empty section between sections with data.
        // Section 1 has data, section 2 is empty, section 3 has data.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Append to section 1
            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");

            // Create section 2 but make it empty via rewind
            journal
                .append(2, test_digest(200))
                .await
                .expect("failed to append");
            journal.sync(2).await.expect("failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("failed to rewind");

            // Append to section 3
            journal
                .append(3, test_digest(300))
                .await
                .expect("failed to append");

            journal.sync_all().await.expect("failed to sync");

            // Verify section lengths
            assert_eq!(journal.section_len(1).await.unwrap(), 1);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);
            assert_eq!(journal.section_len(3).await.unwrap(), 1);

            // Drop and reopen to test replay
            drop(journal);
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay all - should get items from sections 1 and 3, skipping empty section 2
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (3, test_digest(300)));
            }

            // Replay starting from empty section 2 - should get only section 3
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, test_digest(300)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        // Test that truncating a single byte from a blob that has items straddling a page boundary
        // correctly recovers by removing the incomplete item.
        //
        // With PAGE_SIZE=44 and ITEM_SIZE=32:
        // - Item 0: bytes 0-31
        // - Item 1: bytes 32-63 (straddles page boundary at 44)
        // - Item 2: bytes 64-95 (straddles page boundary at 88)
        //
        // After 3 items we have 96 bytes = 2 full pages + 8 bytes. Truncating 1 byte leaves 95
        // bytes, which is not a multiple of 32. Recovery should truncate to 64 bytes (2 complete
        // items).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Append 3 items (just over 2 pages worth)
            for i in 0u64..3 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all 3 items are readable
            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            // Truncate the blob by exactly 1 byte to simulate a partial write
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            drop(blob);

            // Reopen journal - should recover by truncating the last page due to a failed
            // checksum, and end up with a correct blob size due to partial-item trimming.
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify section now has only 2 items
            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            // Verify size is the expected multiple of ITEM_SIZE (this would fail if we didn't trim
            // items and just relied on page-level checksum recovery).
            assert_eq!(journal.size(1).await.unwrap(), 64);

            // Items 0 and 1 should still be readable
            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            // Item 2 should return ItemOutOfRange
            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_init_section_at_size() {
        // Test that init_section_at_size correctly initializes a section with zero-filled items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Initialize section 1 with 5 zero-filled items
            journal
                .init_section_at_size(1, 5)
                .await
                .expect("failed to init section at size");

            // Verify section has correct length
            assert_eq!(journal.section_len(1).await.unwrap(), 5);

            // Verify size is correct (5 items * 32 bytes per Digest)
            assert_eq!(journal.size(1).await.unwrap(), 5 * 32);

            // Verify we can read the zero-filled items
            let zero_digest = Sha256::fill(0);
            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, zero_digest, "item {i} should be zero-filled");
            }

            // Verify position past the initialized range returns error
            let err = journal.get(1, 5).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(5))));

            // Verify we can append after the initialized items
            let pos = journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");
            assert_eq!(pos, 5, "append should return position 5");

            // Verify section now has 6 items
            assert_eq!(journal.section_len(1).await.unwrap(), 6);

            // Verify the appended item is readable
            let item = journal.get(1, 5).await.expect("failed to get");
            assert_eq!(item, test_digest(100));

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Test persistence - reopen and verify
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            assert_eq!(journal.section_len(1).await.unwrap(), 6);

            // Verify zero-filled items persisted
            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(
                    item, zero_digest,
                    "item {i} should still be zero-filled after restart"
                );
            }

            // Verify appended item persisted
            let item = journal.get(1, 5).await.expect("failed to get");
            assert_eq!(item, test_digest(100));

            // Test replay includes zero-filled items
            {
                let stream = journal
                    .replay(1, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 6);
                for (i, item) in items.iter().enumerate().take(5) {
                    assert_eq!(*item, (1, i as u64, zero_digest));
                }
                assert_eq!(items[5], (1, 5, test_digest(100)));
            }

            // Test replay with non-zero start offset skips zero-filled items
            {
                let stream = journal
                    .replay(1, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 3);
                assert_eq!(items[0], (1, 3, zero_digest));
                assert_eq!(items[1], (1, 4, zero_digest));
                assert_eq!(items[2], (1, 5, test_digest(100)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
}