commonware_storage/journal/segmented/fixed.rs

//! Segmented journal for fixed-size items.
//!
//! # Format
//!
//! Data is stored in one blob per section. Items are stored sequentially:
//!
//! ```text
//! +--------+--------+--------+----------+
//! | item_0 | item_1 |   ...  | item_n-1 |
//! +--------+--------+--------+----------+
//! ```
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. Use the
//! `sync` method to force pending data to be written.
//!
//! # Pruning
//!
//! All data must be assigned to a `section`. This allows pruning entire sections
//! (and their corresponding blobs) independently.
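//!
//! # Example
//!
//! A minimal usage sketch (the runtime `context`, `cfg`, and `item` are assumed to
//! be in scope; see the tests in this file for complete, runnable setups):
//!
//! ```ignore
//! let mut journal = Journal::init(context, cfg).await?;
//! let position = journal.append(1, item).await?; // append to section 1
//! journal.sync(1).await?;                        // persist section 1
//! let read = journal.get(1, position).await?;    // random access by position
//! journal.prune(1).await?;                       // drop all sections < 1 (none here)
//! ```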

use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
use commonware_runtime::{
    buffer::paged::{CacheRef, Replay},
    Blob, Buf, IoBufMut, Metrics, Storage,
};
use futures::{
    stream::{self, Stream},
    StreamExt,
};
use std::{marker::PhantomData, num::NonZeroUsize};
use tracing::{trace, warn};

/// State for replaying a single section's blob.
struct ReplayState<B: Blob> {
    section: u64,
    replay: Replay<B>,
    position: u64,
    done: bool,
}

/// Configuration for the fixed segmented journal.
#[derive(Clone)]
pub struct Config {
    /// The partition to use for storing blobs.
    pub partition: String,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// A segmented journal with fixed-size entries.
///
/// Each section is stored in a separate blob. Within each blob, items are fixed-size.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// init by checking each blob's size.
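///
/// For example, with 32-byte items, a 95-byte blob found during init is rewound to
/// 64 bytes, discarding the trailing 31 bytes of a partially written item.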
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    manager: Manager<E, AppendFactory>,
    _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry.
    pub const CHUNK_SIZE: usize = A::SIZE;
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. Use `replay`
    /// to iterate over all items.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                page_cache_ref: cfg.page_cache,
            },
        };
        let mut manager = Manager::init(context, manager_cfg).await?;

        // Repair any blobs with trailing bytes (incomplete items left by a crash)
        let sections: Vec<_> = manager.sections().collect();
        for section in sections {
            let size = manager.size(section).await?;
            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
                warn!(
                    section,
                    invalid_size = size,
                    new_size = valid_size,
                    "trailing bytes detected: truncating"
                );
                manager.rewind_section(section, valid_size).await?;
            }
        }

        Ok(Self {
            manager,
            _array: PhantomData,
        })
    }

    /// Append a new item to the journal in the given section.
    ///
    /// Returns the position of the item within the section (0-indexed).
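    ///
    /// Appends within a section are strictly sequential: the first append returns
    /// position 0, the next returns 1, and so on.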
    pub async fn append(&mut self, section: u64, item: A) -> Result<u64, Error> {
        let blob = self.manager.get_or_create(section).await?;

        let size = blob.size().await;
        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            return Err(Error::InvalidBlobSize(section, size));
        }
        let position = size / Self::CHUNK_SIZE_U64;

        // Encode the item
        let buf = item.encode_mut();
        blob.append(&buf).await?;
        trace!(section, position, "appended item");

        Ok(position)
    }

    /// Read the item at the given section and position.
    ///
    /// # Errors
    ///
    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
    /// - [Error::SectionOutOfRange] if the section doesn't exist.
    /// - [Error::ItemOutOfRange] if the position is beyond the blob size.
    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let offset = position
            .checked_mul(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        let end = offset
            .checked_add(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        if end > blob.size().await {
            return Err(Error::ItemOutOfRange(position));
        }

        let buf = blob
            .read_at(offset, IoBufMut::zeroed(Self::CHUNK_SIZE))
            .await?;
        A::decode(buf.coalesce()).map_err(Error::Codec)
    }

    /// Read the last item in a section, if any.
    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let size = blob.size().await;
        if size < Self::CHUNK_SIZE_U64 {
            return Ok(None);
        }

        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
        let offset = last_position * Self::CHUNK_SIZE_U64;
        let buf = blob
            .read_at(offset, IoBufMut::zeroed(Self::CHUNK_SIZE))
            .await?;
        A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
    }

    /// Returns a stream of all items starting from the given section.
    ///
    /// Each item is returned as `(section, position, item)`.
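    ///
    /// # Example
    ///
    /// A consumption sketch (assumes an initialized `journal`; mirrors the tests in
    /// this file):
    ///
    /// ```ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// futures::pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, position, item) = result?;
    ///     // ... process item ...
    /// }
    /// ```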
    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
        // Pre-create readers from blobs (async operation)
        let mut blob_info = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            let blob_size = blob.size().await;
            let mut replay = blob.replay(buffer).await?;
            // For the first section, seek to the start position
            let initial_position = if section == start_section {
                let start = start_position * Self::CHUNK_SIZE_U64;
                if start > blob_size {
                    return Err(Error::ItemOutOfRange(start_position));
                }
                replay.seek_to(start).await?;
                start_position
            } else {
                0
            };
            blob_info.push((section, replay, initial_position));
        }

        // Stream items as they are read to avoid occupying too much memory.
        // Each blob is processed sequentially, yielding batches of items that are then
        // flattened into individual stream elements.
        Ok(
            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
                stream::unfold(
                    ReplayState {
                        section,
                        replay,
                        position: initial_position,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for one item
                            match state.replay.ensure(Self::CHUNK_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - we're done with this blob
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(Error::Runtime(err)));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode items from buffer
                            while state.replay.remaining() >= Self::CHUNK_SIZE {
                                match A::read(&mut state.replay) {
                                    Ok(item) => {
                                        batch.push(Ok((state.section, state.position, item)));
                                        state.position += 1;
                                    }
                                    Err(err) => {
                                        batch.push(Err(Error::Codec(err)));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                }
                            }

                            // Return batch if we have items
                            if !batch.is_empty() {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            }),
        )
    }

    /// Sync the given section to storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Sync all sections to storage.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prune all sections less than `min`. Returns `true` if any were pruned.
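    ///
    /// Reads from pruned sections subsequently fail with
    /// [Error::AlreadyPrunedToSection].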
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the oldest section number, if any blobs exist.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the newest section number, if any blobs exist.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns an iterator over all section numbers.
    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
        self.manager.sections_from(0).map(|(section, _)| *section)
    }

    /// Returns the number of items in the given section.
    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
        let size = self.manager.size(section).await?;
        Ok(size / Self::CHUNK_SIZE_U64)
    }

    /// Returns the byte size of the given section.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewind the journal to a specific section and byte offset.
    ///
    /// This truncates `section` to `offset` bytes. All sections
    /// after `section` are removed.
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewind only the given section to a specific byte size.
    ///
    /// Unlike `rewind`, this does not affect other sections.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Remove all underlying blobs.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Clear all data, resetting the journal to an empty state.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }

    /// Ensure a section exists, creating an empty blob if needed.
    ///
    /// This is used to maintain the invariant that at least one blob always exists
    /// (the "tail" blob), which allows reconstructing journal size on reopen.
    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        self.manager.get_or_create(section).await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
    use commonware_utils::{NZUsize, NZU16};
    use core::num::NonZeroU16;
    use futures::{pin_mut, StreamExt};

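    // 44 is not a multiple of the 32-byte digest size, so items straddle page
    // boundaries (see the truncation-recovery test below).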
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg() -> Config {
        Config {
            partition: "test_partition".into(),
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            let pos0 = journal
                .append(1, test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            let pos2 = journal
                .append(2, test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        // Test that replay with a non-zero start_position correctly skips items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append 10 items to section 1
            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            // Append 5 items to section 2
            for i in 10u64..15 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay from section 1, position 5 - should get items 5-9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                // Check section 1 items (positions 5-9)
                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                // Check section 2 items (positions 0-4)
                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Replay from section 1, position 9 - should get only item 9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Replay from section 2, position 3 - should get only items 3-4 from section 2
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // Replay from position past the end should return ItemOutOfRange error
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1, 2, 3
            for section in 1u64..=3 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all sections exist
            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to section 1 (should remove sections 2, 3)
            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            // Verify section 1 still has data
            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            // Verify sections 2, 3 are removed
            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data in section 1 is still readable
            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1-10
            for section in 1u64..=10 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 5 (should remove sections 6-10)
            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            // Verify sections 1-5 still have data
            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            // Verify sections 6-10 are removed
            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data integrity via replay
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create sections 1-5
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 2
            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-init and verify only sections 1-2 exist
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify sections 1-2 have data
            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            // Verify sections 3-5 are gone
            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            // Verify data integrity
            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_corruption_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let count = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut count = 0;
                while let Some(result) = stream.next().await {
                    result.expect("should be ok");
                    count += 1;
                }
                count
            };
            assert_eq!(count, 4);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate journal
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Reopen and verify data persisted
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
                .await
                .expect("failed to re-init");

            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_section_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            assert_eq!(journal.section_len(1).await.unwrap(), 0);

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            assert_eq!(journal.section_len(1).await.unwrap(), 5);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_non_contiguous_sections() {
        // Test that sections with gaps in numbering work correctly.
        // Sections 1, 5, 10 should all be independent and accessible.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections with gaps: 1, 5, 10
            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");
            journal
                .append(5, test_digest(500))
                .await
                .expect("failed to append");
            journal
                .append(10, test_digest(1000))
                .await
                .expect("failed to append");
            journal.sync_all().await.expect("failed to sync");

            // Verify random access to each section
            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));

            // Verify non-existent sections return appropriate errors
            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            // Drop and reopen to test replay
            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay and verify all items in order
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (5, test_digest(500)));
                assert_eq!(items[2], (10, test_digest(1000)));
            }

            // Test replay starting from middle section (5)
            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, test_digest(500)));
                assert_eq!(items[1], (10, test_digest(1000)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_empty_section_in_middle() {
        // Test that replay correctly handles an empty section between sections with data.
        // Section 1 has data, section 2 is empty, section 3 has data.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append to section 1
            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");

            // Create section 2 but make it empty via rewind
            journal
                .append(2, test_digest(200))
                .await
                .expect("failed to append");
            journal.sync(2).await.expect("failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("failed to rewind");

            // Append to section 3
            journal
                .append(3, test_digest(300))
                .await
                .expect("failed to append");

            journal.sync_all().await.expect("failed to sync");

            // Verify section lengths
            assert_eq!(journal.section_len(1).await.unwrap(), 1);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);
            assert_eq!(journal.section_len(3).await.unwrap(), 1);

            // Drop and reopen to test replay
            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay all - should get items from sections 1 and 3, skipping empty section 2
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (3, test_digest(300)));
            }

            // Replay starting from empty section 2 - should get only section 3
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, test_digest(300)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        // Test that truncating a single byte from a blob that has items straddling a page boundary
        // correctly recovers by removing the incomplete item.
        //
        // With PAGE_SIZE=44 and ITEM_SIZE=32:
        // - Item 0: bytes 0-31
        // - Item 1: bytes 32-63 (straddles page boundary at 44)
        // - Item 2: bytes 64-95 (straddles page boundary at 88)
        //
        // After 3 items we have 96 bytes = 2 full pages + 8 bytes. Truncating 1 byte leaves 95
        // bytes, which is not a multiple of 32. Recovery should truncate to 64 bytes (2 complete
        // items).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append 3 items (just over 2 pages worth)
            for i in 0u64..3 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all 3 items are readable
            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            // Truncate the blob by exactly 1 byte to simulate partial write
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            drop(blob);

            // Reopen journal - should recover by truncating last page due to failed checksum, and
            // end up with a correct blob size due to partial-item trimming.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify section now has only 2 items
            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            // Verify size is the expected multiple of ITEM_SIZE (this would fail if we didn't trim
            // items and just relied on page-level checksum recovery).
            assert_eq!(journal.size(1).await.unwrap(), 64);

            // Items 0 and 1 should still be readable
            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            // Item 2 should return ItemOutOfRange
            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_journal_clear() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "clear_test".into(),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal: Journal<_, Digest> =
                Journal::init(context.with_label("journal"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            // Append items across multiple sections
            for section in 0..5u64 {
                for i in 0..10u64 {
                    journal
                        .append(section, test_digest(section * 1000 + i))
                        .await
                        .expect("Failed to append");
                }
                journal.sync(section).await.expect("Failed to sync");
            }

            // Verify we have data
            assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
            assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));

            // Clear the journal
            journal.clear().await.expect("Failed to clear");

            // After clear, all reads should fail
            for section in 0..5u64 {
                assert!(matches!(
                    journal.get(section, 0).await,
                    Err(Error::SectionOutOfRange(s)) if s == section
                ));
            }

            // Append new data after clear
            for i in 0..5u64 {
                journal
                    .append(10, test_digest(i * 100))
                    .await
                    .expect("Failed to append after clear");
            }
            journal.sync(10).await.expect("Failed to sync after clear");

            // New data should be readable
            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));

            // Old sections should still be missing
            assert!(matches!(
                journal.get(0, 0).await,
                Err(Error::SectionOutOfRange(0))
            ));

            journal.destroy().await.unwrap();
        });
    }
}