Skip to main content

commonware_storage/journal/segmented/
fixed.rs

1//! Segmented journal for fixed-size items.
2//!
3//! # Format
4//!
5//! Data is stored in one blob per section. Items are stored sequentially:
6//!
7//! ```text
8//! +--------+--------+--------+----------+
9//! | item_0 | item_1 |   ...  | item_n-1 |
10//! +--------+--------+--------+----------+
11//! ```
12//!
13//! # Sync
14//!
15//! Data written to `Journal` may not be immediately persisted to `Storage`. Use the
16//! `sync` method to force pending data to be written.
17//!
18//! # Pruning
19//!
20//! All data must be assigned to a `section`. This allows pruning entire sections
21//! (and their corresponding blobs) independently.
22
23use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
24use crate::journal::Error;
25use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
26use commonware_runtime::{
27    buffer::paged::{CacheRef, Replay},
28    Blob, Buf, Metrics, Storage,
29};
30use futures::{
31    stream::{self, Stream},
32    StreamExt,
33};
34use std::{marker::PhantomData, num::NonZeroUsize};
35use tracing::{trace, warn};
36
/// State for replaying a single section's blob.
struct ReplayState<B: Blob> {
    /// Section number the blob belongs to.
    section: u64,
    /// Buffered reader over the section's blob.
    replay: Replay<B>,
    /// Index (in items, not bytes) of the next item to yield.
    position: u64,
    /// Set once the blob is exhausted or an error terminated the replay.
    done: bool,
}
44
/// Configuration for the fixed segmented journal.
#[derive(Clone)]
pub struct Config {
    /// The partition to use for storing blobs.
    ///
    /// One blob is created per section within this partition.
    pub partition: String,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
57
/// A segmented journal with fixed-size entries.
///
/// Each section is stored in a separate blob. Within each blob, items are fixed-size.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// init by checking each blob's size.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    /// Manages the per-section blobs (creation, lookup, pruning, syncing).
    manager: Manager<E, AppendFactory>,
    /// Marker binding the journal to a single fixed-size item type.
    _array: PhantomData<A>,
}
75
76impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry.
    pub const CHUNK_SIZE: usize = A::SIZE;
    /// [Self::CHUNK_SIZE] as a `u64`, for byte-offset arithmetic against blob sizes.
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
80
81    /// Initialize a new `Journal` instance.
82    ///
83    /// All backing blobs are opened but not read during initialization. Use `replay`
84    /// to iterate over all items.
85    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
86        let manager_cfg = ManagerConfig {
87            partition: cfg.partition,
88            factory: AppendFactory {
89                write_buffer: cfg.write_buffer,
90                page_cache_ref: cfg.page_cache,
91            },
92        };
93        let mut manager = Manager::init(context, manager_cfg).await?;
94
95        // Repair any blobs with trailing bytes (incomplete items from crash)
96        let sections: Vec<_> = manager.sections().collect();
97        for section in sections {
98            let size = manager.size(section).await?;
99            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
100                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
101                warn!(
102                    section,
103                    invalid_size = size,
104                    new_size = valid_size,
105                    "trailing bytes detected: truncating"
106                );
107                manager.rewind_section(section, valid_size).await?;
108            }
109        }
110
111        Ok(Self {
112            manager,
113            _array: PhantomData,
114        })
115    }
116
117    /// Append a new item to the journal in the given section.
118    ///
119    /// Returns the position of the item within the section (0-indexed).
120    pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
121        let blob = self.manager.get_or_create(section).await?;
122
123        let size = blob.size().await;
124        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
125            return Err(Error::InvalidBlobSize(section, size));
126        }
127        let position = size / Self::CHUNK_SIZE_U64;
128
129        // Encode the item
130        let buf = item.encode_mut();
131        blob.append(&buf).await?;
132        trace!(section, position, "appended item");
133
134        Ok(position)
135    }
136
137    /// Read the item at the given section and position.
138    ///
139    /// # Errors
140    ///
141    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
142    /// - [Error::SectionOutOfRange] if the section doesn't exist.
143    /// - [Error::ItemOutOfRange] if the position is beyond the blob size.
144    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
145        let blob = self
146            .manager
147            .get(section)?
148            .ok_or(Error::SectionOutOfRange(section))?;
149
150        let offset = position
151            .checked_mul(Self::CHUNK_SIZE_U64)
152            .ok_or(Error::ItemOutOfRange(position))?;
153        let end = offset
154            .checked_add(Self::CHUNK_SIZE_U64)
155            .ok_or(Error::ItemOutOfRange(position))?;
156        if end > blob.size().await {
157            return Err(Error::ItemOutOfRange(position));
158        }
159
160        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
161        A::decode(buf.coalesce()).map_err(Error::Codec)
162    }
163
164    /// Read the last item in a section, if any.
165    ///
166    /// Returns `Ok(None)` if the section is empty.
167    ///
168    /// # Errors
169    ///
170    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
171    /// - [Error::SectionOutOfRange] if the section doesn't exist.
172    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
173        let blob = self
174            .manager
175            .get(section)?
176            .ok_or(Error::SectionOutOfRange(section))?;
177
178        let size = blob.size().await;
179        if size < Self::CHUNK_SIZE_U64 {
180            return Ok(None);
181        }
182
183        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
184        let offset = last_position * Self::CHUNK_SIZE_U64;
185        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
186        A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
187    }
188
    /// Returns a stream of all items starting from the given section.
    ///
    /// Each item is returned as (section, position, item).
    ///
    /// # Errors
    ///
    /// Returns [Error::ItemOutOfRange] immediately if `start_position` is beyond the
    /// end of `start_section`'s blob. Errors encountered mid-stream (runtime or codec)
    /// are yielded as `Err` elements and terminate that section's replay.
    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
        // Pre-create readers from blobs (async operation)
        let mut blob_info = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            let blob_size = blob.size().await;
            let mut replay = blob.replay(buffer).await?;
            // For the first section, seek to the start position; all later
            // sections are replayed from their beginning.
            let initial_position = if section == start_section {
                let start = start_position * Self::CHUNK_SIZE_U64;
                if start > blob_size {
                    return Err(Error::ItemOutOfRange(start_position));
                }
                replay.seek_to(start)?;
                start_position
            } else {
                0
            };
            blob_info.push((section, replay, initial_position));
        }

        // Stream items as they are read to avoid occupying too much memory.
        // Each blob is processed sequentially, yielding batches of items that are then
        // flattened into individual stream elements.
        Ok(
            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
                stream::unfold(
                    ReplayState {
                        section,
                        replay,
                        position: initial_position,
                        done: false,
                    },
                    move |mut state| async move {
                        // A prior iteration exhausted or errored this blob.
                        if state.done {
                            return None;
                        }

                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for one item
                            match state.replay.ensure(Self::CHUNK_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - we're done with this blob
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    // Surface the I/O error as the final element.
                                    batch.push(Err(Error::Runtime(err)));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode items from buffer
                            while state.replay.remaining() >= Self::CHUNK_SIZE {
                                match A::read(&mut state.replay) {
                                    Ok(item) => {
                                        batch.push(Ok((state.section, state.position, item)));
                                        state.position += 1;
                                    }
                                    Err(err) => {
                                        // Surface the decode error as the final element.
                                        batch.push(Err(Error::Codec(err)));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                }
                            }

                            // Return batch if we have items
                            if !batch.is_empty() {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            }),
        )
    }
281
    /// Sync the given section to storage.
    ///
    /// Forces any buffered writes for `section` to be persisted.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }
286
    /// Sync all sections to storage.
    ///
    /// Forces any buffered writes in every open section to be persisted.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }
291
    /// Prune all sections less than `min`. Returns true if any were pruned.
    ///
    /// Pruned sections (and their backing blobs) are removed; subsequent reads of
    /// them fail with [Error::AlreadyPrunedToSection].
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }
296
    /// Returns the oldest section number, if any blobs exist.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }
301
    /// Returns the newest section number, if any blobs exist.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }
306
307    /// Returns an iterator over all section numbers.
308    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
309        self.manager.sections_from(0).map(|(section, _)| *section)
310    }
311
312    /// Returns the number of items in the given section.
313    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
314        let size = self.manager.size(section).await?;
315        Ok(size / Self::CHUNK_SIZE_U64)
316    }
317
    /// Returns the byte size of the given section.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }
322
    /// Rewind the journal to a specific section and byte offset.
    ///
    /// This truncates the section to the given size. All sections
    /// after `section` are removed.
    ///
    /// Note: `offset` is a byte offset, not an item position.
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }
330
    /// Rewind only the given section to a specific byte offset.
    ///
    /// Unlike `rewind`, this does not affect other sections.
    ///
    /// Note: `size` is a byte size, not an item count.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }
337
    /// Remove all underlying blobs.
    ///
    /// Consumes the journal; all persisted data is deleted.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }
342
    /// Clear all data, resetting the journal to an empty state.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
349
350    /// Ensure a section exists, creating an empty blob if needed.
351    ///
352    /// This is used to maintain the invariant that at least one blob always exists
353    /// (the "tail" blob), which allows reconstructing journal size on reopen.
354    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
355        self.manager.get_or_create(section).await?;
356        Ok(())
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
364    use commonware_macros::test_traced;
365    use commonware_runtime::{
366        buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner,
367    };
368    use commonware_utils::{NZUsize, NZU16};
369    use core::num::NonZeroU16;
370    use futures::{pin_mut, StreamExt};
371
    // Page size for the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    // Page cache capacity passed to `CacheRef::from_pooler`.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
374
375    fn test_digest(value: u64) -> Digest {
376        Sha256::hash(&value.to_be_bytes())
377    }
378
379    fn test_cfg(pooler: &impl BufferPooler) -> Config {
380        Config {
381            partition: "test-partition".into(),
382            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
383            write_buffer: NZUsize!(2048),
384        }
385    }
386
    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        // Appends across two sections, then verifies positional reads plus the
        // errors returned for out-of-range positions and missing sections.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Positions are 0-indexed per section.
            let pos0 = journal
                .append(1, &test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, &test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            // A new section restarts positions at 0.
            let pos2 = journal
                .append(2, &test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            // Position past the end of section 1.
            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            // Section 3 was never created.
            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }
432
    #[test_traced]
    fn test_segmented_fixed_replay() {
        // Writes to two sections, restarts the journal, and verifies a full
        // replay returns every item in (section, position) order.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-open from persisted state before replaying.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            // First 10 items come from section 1 (positions 0..10).
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            // Remaining 10 come from section 2 (positions restart at 0).
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
494
    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        // Test that replay with a non-zero start_position correctly skips items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append 10 items to section 1
            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            // Append 5 items to section 2
            for i in 10u64..15 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-open from persisted state before replaying.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay from section 1, position 5 - should get items 5-9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                // Check section 1 items (positions 5-9)
                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                // Check section 2 items (positions 0-4)
                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Replay from section 1, position 9 - should get only item 9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Replay from section 2, position 3 - should get only items 3-4 from section 2
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // Replay from position past the end should return ItemOutOfRange error
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }
615
    #[test_traced]
    fn test_segmented_fixed_prune() {
        // Prunes sections below 3 and verifies pruned reads fail while the
        // retained sections remain readable.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            // Sections 1 and 2 are gone.
            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            // Section 3 (the prune floor) survives.
            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }
647
    #[test_traced]
    fn test_segmented_fixed_rewind() {
        // Rewinding to a section removes all later sections but preserves the
        // target section's contents.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1, 2, 3
            for section in 1u64..=3 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all sections exist
            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to section 1 (should remove sections 2, 3)
            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            // Verify section 1 still has data
            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            // Verify sections 2, 3 are removed
            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data in section 1 is still readable
            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }
693
    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        // Rewinding across many sections removes every later section and a
        // subsequent replay only sees the surviving ones.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1-10
            for section in 1u64..=10 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 5 (should remove sections 6-10)
            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            // Verify sections 1-5 still have data
            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            // Verify sections 6-10 are removed
            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data integrity via replay
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
750
    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        // A rewind followed by sync must survive a restart: removed sections
        // stay removed and retained data stays readable.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);

            // Create sections 1-5
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 2
            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-init and verify only sections 1-2 exist
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify sections 1-2 have data
            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            // Verify sections 3-5 are gone
            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            // Verify data integrity
            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }
801
802    #[test_traced]
803    fn test_segmented_fixed_corruption_recovery() {
804        let executor = deterministic::Runner::default();
805        executor.start(|context| async move {
806            let cfg = test_cfg(&context);
807            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
808                .await
809                .expect("failed to init");
810
811            for i in 0u64..5 {
812                journal
813                    .append(1, &test_digest(i))
814                    .await
815                    .expect("failed to append");
816            }
817            journal.sync_all().await.expect("failed to sync");
818            drop(journal);
819
820            let (blob, size) = context
821                .open(&cfg.partition, &1u64.to_be_bytes())
822                .await
823                .expect("failed to open blob");
824            blob.resize(size - 1).await.expect("failed to truncate");
825            blob.sync().await.expect("failed to sync");
826
827            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
828                .await
829                .expect("failed to re-init");
830
831            let count = {
832                let stream = journal
833                    .replay(0, 0, NZUsize!(1024))
834                    .await
835                    .expect("failed to replay");
836                pin_mut!(stream);
837
838                let mut count = 0;
839                while let Some(result) = stream.next().await {
840                    result.expect("should be ok");
841                    count += 1;
842                }
843                count
844            };
845            assert_eq!(count, 4);
846
847            journal.destroy().await.expect("failed to destroy");
848        });
849    }
850
851    #[test_traced]
852    fn test_segmented_fixed_persistence() {
853        let executor = deterministic::Runner::default();
854        executor.start(|context| async move {
855            let cfg = test_cfg(&context);
856
857            // Create and populate journal
858            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
859                .await
860                .expect("failed to init");
861
862            for i in 0u64..5 {
863                journal
864                    .append(1, &test_digest(i))
865                    .await
866                    .expect("failed to append");
867            }
868            journal.sync_all().await.expect("failed to sync");
869            drop(journal);
870
871            // Reopen and verify data persisted
872            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
873                .await
874                .expect("failed to re-init");
875
876            for i in 0u64..5 {
877                let item = journal.get(1, i).await.expect("failed to get");
878                assert_eq!(item, test_digest(i));
879            }
880
881            journal.destroy().await.expect("failed to destroy");
882        });
883    }
884
885    #[test_traced]
886    fn test_segmented_fixed_section_len() {
887        let executor = deterministic::Runner::default();
888        executor.start(|context| async move {
889            let cfg = test_cfg(&context);
890            let mut journal = Journal::init(context.clone(), cfg.clone())
891                .await
892                .expect("failed to init");
893
894            assert_eq!(journal.section_len(1).await.unwrap(), 0);
895
896            for i in 0u64..5 {
897                journal
898                    .append(1, &test_digest(i))
899                    .await
900                    .expect("failed to append");
901            }
902
903            assert_eq!(journal.section_len(1).await.unwrap(), 5);
904            assert_eq!(journal.section_len(2).await.unwrap(), 0);
905
906            journal.destroy().await.expect("failed to destroy");
907        });
908    }
909
910    #[test_traced]
911    fn test_segmented_fixed_non_contiguous_sections() {
912        // Test that sections with gaps in numbering work correctly.
913        // Sections 1, 5, 10 should all be independent and accessible.
914        let executor = deterministic::Runner::default();
915        executor.start(|context| async move {
916            let cfg = test_cfg(&context);
917            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
918                .await
919                .expect("failed to init");
920
921            // Create sections with gaps: 1, 5, 10
922            journal
923                .append(1, &test_digest(100))
924                .await
925                .expect("failed to append");
926            journal
927                .append(5, &test_digest(500))
928                .await
929                .expect("failed to append");
930            journal
931                .append(10, &test_digest(1000))
932                .await
933                .expect("failed to append");
934            journal.sync_all().await.expect("failed to sync");
935
936            // Verify random access to each section
937            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
938            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
939            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));
940
941            // Verify non-existent sections return appropriate errors
942            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
943                let result = journal.get(missing_section, 0).await;
944                assert!(
945                    matches!(result, Err(Error::SectionOutOfRange(_))),
946                    "Expected SectionOutOfRange for section {}, got {:?}",
947                    missing_section,
948                    result
949                );
950            }
951
952            // Drop and reopen to test replay
953            drop(journal);
954            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
955                .await
956                .expect("failed to re-init");
957
958            // Replay and verify all items in order
959            {
960                let stream = journal
961                    .replay(0, 0, NZUsize!(1024))
962                    .await
963                    .expect("failed to replay");
964                pin_mut!(stream);
965
966                let mut items = Vec::new();
967                while let Some(result) = stream.next().await {
968                    let (section, _, item) = result.expect("replay error");
969                    items.push((section, item));
970                }
971
972                assert_eq!(items.len(), 3, "Should have 3 items");
973                assert_eq!(items[0], (1, test_digest(100)));
974                assert_eq!(items[1], (5, test_digest(500)));
975                assert_eq!(items[2], (10, test_digest(1000)));
976            }
977
978            // Test replay starting from middle section (5)
979            {
980                let stream = journal
981                    .replay(5, 0, NZUsize!(1024))
982                    .await
983                    .expect("failed to replay from section 5");
984                pin_mut!(stream);
985
986                let mut items = Vec::new();
987                while let Some(result) = stream.next().await {
988                    let (section, _, item) = result.expect("replay error");
989                    items.push((section, item));
990                }
991
992                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
993                assert_eq!(items[0], (5, test_digest(500)));
994                assert_eq!(items[1], (10, test_digest(1000)));
995            }
996
997            journal.destroy().await.expect("failed to destroy");
998        });
999    }
1000
1001    #[test_traced]
1002    fn test_segmented_fixed_empty_section_in_middle() {
1003        // Test that replay correctly handles an empty section between sections with data.
1004        // Section 1 has data, section 2 is empty, section 3 has data.
1005        let executor = deterministic::Runner::default();
1006        executor.start(|context| async move {
1007            let cfg = test_cfg(&context);
1008            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1009                .await
1010                .expect("failed to init");
1011
1012            // Append to section 1
1013            journal
1014                .append(1, &test_digest(100))
1015                .await
1016                .expect("failed to append");
1017
1018            // Create section 2 but make it empty via rewind
1019            journal
1020                .append(2, &test_digest(200))
1021                .await
1022                .expect("failed to append");
1023            journal.sync(2).await.expect("failed to sync");
1024            journal
1025                .rewind_section(2, 0)
1026                .await
1027                .expect("failed to rewind");
1028
1029            // Append to section 3
1030            journal
1031                .append(3, &test_digest(300))
1032                .await
1033                .expect("failed to append");
1034
1035            journal.sync_all().await.expect("failed to sync");
1036
1037            // Verify section lengths
1038            assert_eq!(journal.section_len(1).await.unwrap(), 1);
1039            assert_eq!(journal.section_len(2).await.unwrap(), 0);
1040            assert_eq!(journal.section_len(3).await.unwrap(), 1);
1041
1042            // Drop and reopen to test replay
1043            drop(journal);
1044            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1045                .await
1046                .expect("failed to re-init");
1047
1048            // Replay all - should get items from sections 1 and 3, skipping empty section 2
1049            {
1050                let stream = journal
1051                    .replay(0, 0, NZUsize!(1024))
1052                    .await
1053                    .expect("failed to replay");
1054                pin_mut!(stream);
1055
1056                let mut items = Vec::new();
1057                while let Some(result) = stream.next().await {
1058                    let (section, _, item) = result.expect("replay error");
1059                    items.push((section, item));
1060                }
1061
1062                assert_eq!(
1063                    items.len(),
1064                    2,
1065                    "Should have 2 items (skipping empty section)"
1066                );
1067                assert_eq!(items[0], (1, test_digest(100)));
1068                assert_eq!(items[1], (3, test_digest(300)));
1069            }
1070
1071            // Replay starting from empty section 2 - should get only section 3
1072            {
1073                let stream = journal
1074                    .replay(2, 0, NZUsize!(1024))
1075                    .await
1076                    .expect("failed to replay from section 2");
1077                pin_mut!(stream);
1078
1079                let mut items = Vec::new();
1080                while let Some(result) = stream.next().await {
1081                    let (section, _, item) = result.expect("replay error");
1082                    items.push((section, item));
1083                }
1084
1085                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
1086                assert_eq!(items[0], (3, test_digest(300)));
1087            }
1088
1089            journal.destroy().await.expect("failed to destroy");
1090        });
1091    }
1092
1093    #[test_traced]
1094    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
1095        // Test that truncating a single byte from a blob that has items straddling a page boundary
1096        // correctly recovers by removing the incomplete item.
1097        //
1098        // With PAGE_SIZE=44 and ITEM_SIZE=32:
1099        // - Item 0: bytes 0-31
1100        // - Item 1: bytes 32-63 (straddles page boundary at 44)
1101        // - Item 2: bytes 64-95 (straddles page boundary at 88)
1102        //
1103        // After 3 items we have 96 bytes = 2 full pages + 8 bytes. Truncating 1 byte leaves 95
1104        // bytes, which is not a multiple of 32. Recovery should truncate to 64 bytes (2 complete
1105        // items).
1106        let executor = deterministic::Runner::default();
1107        executor.start(|context| async move {
1108            let cfg = test_cfg(&context);
1109            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1110                .await
1111                .expect("failed to init");
1112
1113            // Append 3 items (just over 2 pages worth)
1114            for i in 0u64..3 {
1115                journal
1116                    .append(1, &test_digest(i))
1117                    .await
1118                    .expect("failed to append");
1119            }
1120            journal.sync_all().await.expect("failed to sync");
1121
1122            // Verify all 3 items are readable
1123            for i in 0u64..3 {
1124                let item = journal.get(1, i).await.expect("failed to get");
1125                assert_eq!(item, test_digest(i));
1126            }
1127            drop(journal);
1128
1129            // Truncate the blob by exactly 1 byte to simulate partial write
1130            let (blob, size) = context
1131                .open(&cfg.partition, &1u64.to_be_bytes())
1132                .await
1133                .expect("failed to open blob");
1134            blob.resize(size - 1).await.expect("failed to truncate");
1135            blob.sync().await.expect("failed to sync");
1136            drop(blob);
1137
1138            // Reopen journal - should recover by truncating last page due to failed checksum, and
1139            // end up with a correct blob size due to partial-item trimming.
1140            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1141                .await
1142                .expect("failed to re-init");
1143
1144            // Verify section now has only 2 items
1145            assert_eq!(journal.section_len(1).await.unwrap(), 2);
1146
1147            // Verify size is the expected multiple of ITEM_SIZE (this would fail if we didn't trim
1148            // items and just relied on page-level checksum recovery).
1149            assert_eq!(journal.size(1).await.unwrap(), 64);
1150
1151            // Items 0 and 1 should still be readable
1152            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
1153            assert_eq!(item0, test_digest(0));
1154            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
1155            assert_eq!(item1, test_digest(1));
1156
1157            // Item 2 should return ItemOutOfRange
1158            let err = journal.get(1, 2).await;
1159            assert!(
1160                matches!(err, Err(Error::ItemOutOfRange(2))),
1161                "expected ItemOutOfRange(2), got {:?}",
1162                err
1163            );
1164
1165            journal.destroy().await.expect("failed to destroy");
1166        });
1167    }
1168
1169    #[test_traced]
1170    fn test_journal_clear() {
1171        let executor = deterministic::Runner::default();
1172        executor.start(|context| async move {
1173            let cfg = Config {
1174                partition: "clear-test".into(),
1175                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1176                write_buffer: NZUsize!(1024),
1177            };
1178
1179            let mut journal: Journal<_, Digest> =
1180                Journal::init(context.with_label("journal"), cfg.clone())
1181                    .await
1182                    .expect("Failed to initialize journal");
1183
1184            // Append items across multiple sections
1185            for section in 0..5u64 {
1186                for i in 0..10u64 {
1187                    journal
1188                        .append(section, &test_digest(section * 1000 + i))
1189                        .await
1190                        .expect("Failed to append");
1191                }
1192                journal.sync(section).await.expect("Failed to sync");
1193            }
1194
1195            // Verify we have data
1196            assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
1197            assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));
1198
1199            // Clear the journal
1200            journal.clear().await.expect("Failed to clear");
1201
1202            // After clear, all reads should fail
1203            for section in 0..5u64 {
1204                assert!(matches!(
1205                    journal.get(section, 0).await,
1206                    Err(Error::SectionOutOfRange(s)) if s == section
1207                ));
1208            }
1209
1210            // Append new data after clear
1211            for i in 0..5u64 {
1212                journal
1213                    .append(10, &test_digest(i * 100))
1214                    .await
1215                    .expect("Failed to append after clear");
1216            }
1217            journal.sync(10).await.expect("Failed to sync after clear");
1218
1219            // New data should be readable
1220            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));
1221
1222            // Old sections should still be missing
1223            assert!(matches!(
1224                journal.get(0, 0).await,
1225                Err(Error::SectionOutOfRange(0))
1226            ));
1227
1228            journal.destroy().await.unwrap();
1229        });
1230    }
1231
1232    #[test_traced]
1233    fn test_last_missing_section_returns_error() {
1234        let executor = deterministic::Runner::default();
1235        executor.start(|context| async move {
1236            let cfg = test_cfg(&context);
1237            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1238                .await
1239                .expect("failed to init");
1240
1241            assert!(matches!(
1242                journal.last(0).await,
1243                Err(Error::SectionOutOfRange(0))
1244            ));
1245            assert!(matches!(
1246                journal.last(99).await,
1247                Err(Error::SectionOutOfRange(99))
1248            ));
1249
1250            journal.destroy().await.unwrap();
1251        });
1252    }
1253
1254    #[test_traced]
1255    fn test_last_after_rewind_to_zero() {
1256        let executor = deterministic::Runner::default();
1257        executor.start(|context| async move {
1258            let cfg = test_cfg(&context);
1259            let mut journal = Journal::init(context.clone(), cfg.clone())
1260                .await
1261                .expect("failed to init");
1262
1263            journal.append(0, &test_digest(0)).await.unwrap();
1264            journal.append(0, &test_digest(1)).await.unwrap();
1265            journal.sync(0).await.unwrap();
1266
1267            assert!(journal.last(0).await.unwrap().is_some());
1268
1269            journal.rewind(0, 0).await.unwrap();
1270            assert_eq!(journal.last(0).await.unwrap(), None);
1271
1272            journal.destroy().await.unwrap();
1273        });
1274    }
1275
1276    #[test_traced]
1277    fn test_last_pruned_section_returns_error() {
1278        let executor = deterministic::Runner::default();
1279        executor.start(|context| async move {
1280            let cfg = test_cfg(&context);
1281            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1282                .await
1283                .expect("failed to init");
1284
1285            journal.append(0, &test_digest(0)).await.unwrap();
1286            journal.append(1, &test_digest(1)).await.unwrap();
1287            journal.sync_all().await.unwrap();
1288
1289            journal.prune(1).await.unwrap();
1290
1291            assert!(matches!(
1292                journal.last(0).await,
1293                Err(Error::AlreadyPrunedToSection(1))
1294            ));
1295            assert!(journal.last(1).await.unwrap().is_some());
1296
1297            journal.destroy().await.unwrap();
1298        });
1299    }
1300}