// commonware_storage/journal/segmented/fixed.rs
//! Segmented journal for fixed-size items.
//!
//! # Format
//!
//! Data is stored in one blob per section. Items are stored sequentially:
//!
//! ```text
//! +--------+--------+--------+----------+
//! | item_0 | item_1 |   ...  | item_n-1 |
//! +--------+--------+--------+----------+
//! ```
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. Use the
//! `sync` method to force pending data to be written.
//!
//! # Pruning
//!
//! All data must be assigned to a `section`. This allows pruning entire sections
//! (and their corresponding blobs) independently.
use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
use commonware_runtime::{
    buffer::paged::{CacheRef, Replay},
    Blob, Buf, Metrics, Storage,
};
use futures::{
    stream::{self, Stream},
    StreamExt,
};
use std::{marker::PhantomData, num::NonZeroUsize};
use tracing::{trace, warn};
/// State for replaying a single section's blob.
struct ReplayState<B: Blob> {
    /// Section number this blob belongs to.
    section: u64,
    /// Paged reader positioned within the blob's contents.
    replay: Replay<B>,
    /// Item index (not byte offset) of the next item to yield.
    position: u64,
    /// Set once the blob is exhausted or an error has been emitted.
    done: bool,
}
44
/// Configuration for the fixed segmented journal.
#[derive(Clone)]
pub struct Config {
    /// The partition to use for storing blobs.
    pub partition: String,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
57
/// A segmented journal with fixed-size entries.
///
/// Each section is stored in a separate blob. Within each blob, items are fixed-size.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// init by checking each blob's size.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    /// Owns the per-section blobs and handles create/prune/rewind.
    manager: Manager<E, AppendFactory>,
    /// Marker tying this journal to the fixed-size item type `A`.
    _array: PhantomData<A>,
}
75
76impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
77    /// Size of each entry.
78    pub const CHUNK_SIZE: usize = A::SIZE;
79    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
80
81    /// Initialize a new `Journal` instance.
82    ///
83    /// All backing blobs are opened but not read during initialization. Use `replay`
84    /// to iterate over all items.
85    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
86        let manager_cfg = ManagerConfig {
87            partition: cfg.partition,
88            factory: AppendFactory {
89                write_buffer: cfg.write_buffer,
90                page_cache_ref: cfg.page_cache,
91            },
92        };
93        let mut manager = Manager::init(context, manager_cfg).await?;
94
95        // Repair any blobs with trailing bytes (incomplete items from crash)
96        let sections: Vec<_> = manager.sections().collect();
97        for section in sections {
98            let size = manager.size(section).await?;
99            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
100                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
101                warn!(
102                    section,
103                    invalid_size = size,
104                    new_size = valid_size,
105                    "trailing bytes detected: truncating"
106                );
107                manager.rewind_section(section, valid_size).await?;
108            }
109        }
110
111        Ok(Self {
112            manager,
113            _array: PhantomData,
114        })
115    }
116
117    /// Append a new item to the journal in the given section.
118    ///
119    /// Returns the position of the item within the section (0-indexed).
120    pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
121        let blob = self.manager.get_or_create(section).await?;
122
123        let size = blob.size().await;
124        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
125            return Err(Error::InvalidBlobSize(section, size));
126        }
127        let position = size / Self::CHUNK_SIZE_U64;
128
129        // Encode the item
130        let buf = item.encode_mut();
131        blob.append(&buf).await?;
132        trace!(section, position, "appended item");
133
134        Ok(position)
135    }
136
137    /// Append pre-encoded bytes to the given section.
138    ///
139    /// The buffer must contain one or more encoded items with size [Self::CHUNK_SIZE] each.
140    ///
141    /// # Panics
142    ///
143    /// Panics if `buf` is empty or not a multiple of [Self::CHUNK_SIZE].
144    pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<(), Error> {
145        assert!(!buf.is_empty());
146        assert!(buf.len().is_multiple_of(Self::CHUNK_SIZE));
147        let blob = self.manager.get_or_create(section).await?;
148        blob.append(buf).await?;
149        trace!(
150            section,
151            count = buf.len() / Self::CHUNK_SIZE,
152            "appended items"
153        );
154        Ok(())
155    }
156
157    /// Read the item at the given section and position.
158    ///
159    /// # Errors
160    ///
161    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
162    /// - [Error::SectionOutOfRange] if the section doesn't exist.
163    /// - [Error::ItemOutOfRange] if the position is beyond the blob size.
164    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
165        let blob = self
166            .manager
167            .get(section)?
168            .ok_or(Error::SectionOutOfRange(section))?;
169
170        let offset = position
171            .checked_mul(Self::CHUNK_SIZE_U64)
172            .ok_or(Error::ItemOutOfRange(position))?;
173        let end = offset
174            .checked_add(Self::CHUNK_SIZE_U64)
175            .ok_or(Error::ItemOutOfRange(position))?;
176        if end > blob.size().await {
177            return Err(Error::ItemOutOfRange(position));
178        }
179
180        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
181        A::decode(buf.coalesce()).map_err(Error::Codec)
182    }
183
184    /// Get an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
185    pub fn try_get_sync(&self, section: u64, position: u64) -> Option<A> {
186        let blob = self.manager.get(section).ok()??;
187        let offset = position.checked_mul(Self::CHUNK_SIZE_U64)?;
188        let remaining = blob.try_size()?.checked_sub(offset)?;
189        if remaining < Self::CHUNK_SIZE_U64 {
190            return None;
191        }
192        let mut buf = vec![0u8; Self::CHUNK_SIZE];
193        if !blob.try_read_sync(offset, &mut buf) {
194            return None;
195        }
196        A::decode(&buf[..]).ok()
197    }
198
199    /// Read the last item in a section, if any.
200    ///
201    /// Returns `Ok(None)` if the section is empty.
202    ///
203    /// # Errors
204    ///
205    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
206    /// - [Error::SectionOutOfRange] if the section doesn't exist.
207    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
208        let blob = self
209            .manager
210            .get(section)?
211            .ok_or(Error::SectionOutOfRange(section))?;
212
213        let size = blob.size().await;
214        if size < Self::CHUNK_SIZE_U64 {
215            return Ok(None);
216        }
217
218        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
219        let offset = last_position * Self::CHUNK_SIZE_U64;
220        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
221        A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
222    }
223
    /// Returns a stream of all items starting from the given section.
    ///
    /// Each item is returned as (section, position, item).
    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
        // Pre-create readers from blobs (async operation)
        let mut blob_info = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            let blob_size = blob.size().await;
            let mut replay = blob.replay(buffer).await?;
            // For the first section, seek to the start position
            let initial_position = if section == start_section {
                let start = start_position * Self::CHUNK_SIZE_U64;
                // Reject a start position beyond the end of the first blob.
                if start > blob_size {
                    return Err(Error::ItemOutOfRange(start_position));
                }
                replay.seek_to(start)?;
                start_position
            } else {
                // All subsequent sections are replayed from their beginning.
                0
            };
            blob_info.push((section, replay, initial_position));
        }

        // Stream items as they are read to avoid occupying too much memory.
        // Each blob is processed sequentially, yielding batches of items that are then
        // flattened into individual stream elements.
        Ok(
            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
                stream::unfold(
                    ReplayState {
                        section,
                        replay,
                        position: initial_position,
                        done: false,
                    },
                    move |mut state| async move {
                        // A previous iteration finished this blob (exhaustion or error).
                        if state.done {
                            return None;
                        }

                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
                        loop {
                            // Ensure we have enough data for one item
                            match state.replay.ensure(Self::CHUNK_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Reader exhausted - we're done with this blob
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    // Surface the runtime error as the final element for this blob.
                                    batch.push(Err(Error::Runtime(err)));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode items from buffer
                            while state.replay.remaining() >= Self::CHUNK_SIZE {
                                match A::read(&mut state.replay) {
                                    Ok(item) => {
                                        batch.push(Ok((state.section, state.position, item)));
                                        state.position += 1;
                                    }
                                    Err(err) => {
                                        // A decode failure also terminates this blob's replay.
                                        batch.push(Err(Error::Codec(err)));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                }
                            }

                            // Return batch if we have items
                            if !batch.is_empty() {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                // Flatten each batch into individual (section, position, item) results.
                .flat_map(stream::iter)
            }),
        )
    }
316
    /// Sync the given section to storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Sync all sections to storage.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prune all sections less than `min`. Returns true if any were pruned.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the oldest section number, if any blobs exist.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the newest section number, if any blobs exist.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns an iterator over all section numbers.
    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
        // `sections_from(0)` covers every section since section numbers are u64.
        self.manager.sections_from(0).map(|(section, _)| *section)
    }

    /// Returns the number of items in the given section.
    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
        // Integer division drops any trailing partial item (repaired at init anyway).
        let size = self.manager.size(section).await?;
        Ok(size / Self::CHUNK_SIZE_U64)
    }

    /// Returns the byte size of the given section.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewind the journal to a specific section and byte offset.
    ///
    /// This truncates the section to the given size. All sections
    /// after `section` are removed.
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewind only the given section to a specific byte offset.
    ///
    /// Unlike `rewind`, this does not affect other sections.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Remove all underlying blobs.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Clear all data, resetting the journal to an empty state.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }

    /// Ensure a section exists, creating an empty blob if needed.
    ///
    /// This is used to maintain the invariant that at least one blob always exists
    /// (the "tail" blob), which allows reconstructing journal size on reopen.
    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        self.manager.get_or_create(section).await?;
        Ok(())
    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner,
    };
    use commonware_utils::{NZUsize, NZU16};
    use core::num::NonZeroU16;
    use futures::{pin_mut, StreamExt};
406
    /// Page size for the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    /// Number of pages held by the test page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
409
    /// Deterministically derive a digest from a u64 for use as test data.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }
413
    /// Build a journal `Config` backed by the given buffer pooler.
    fn test_cfg(pooler: &impl BufferPooler) -> Config {
        Config {
            partition: "test-partition".into(),
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }
421
    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        // Appends into two sections, verifies per-section 0-indexed positions,
        // read-back, and out-of-range errors for both position and section.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            let pos0 = journal
                .append(1, &test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, &test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            // Positions are per-section: section 2 starts back at 0.
            let pos2 = journal
                .append(2, &test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }
467
    #[test_traced]
    fn test_segmented_fixed_replay() {
        // Writes to two sections, reopens the journal, and verifies replay returns
        // every item in section order with correct positions.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }

            // Persist, drop, and re-init to exercise replay against stored data.
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            // First 10 items come from section 1 (positions 0..10).
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            // Remaining 10 items come from section 2 (positions restart at 0).
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
529
    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        // Test that replay with a non-zero start_position correctly skips items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append 10 items to section 1
            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            // Append 5 items to section 2
            for i in 10u64..15 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay from section 1, position 5 - should get items 5-9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                // Check section 1 items (positions 5-9)
                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                // Check section 2 items (positions 0-4)
                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Replay from section 1, position 9 - should get only item 9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Replay from section 2, position 3 - should get only items 3-4 from section 2
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // Replay from position past the end should return ItemOutOfRange error
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }
650
    #[test_traced]
    fn test_segmented_fixed_prune() {
        // Verifies prune(3) removes sections 1-2 and that reads of pruned sections
        // surface AlreadyPrunedToSection while retained sections remain readable.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            // Section 3 is the prune boundary and must survive.
            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }
682
    #[test_traced]
    fn test_segmented_fixed_rewind() {
        // Verifies rewind(1, size) keeps section 1 intact and removes all later sections.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1, 2, 3
            for section in 1u64..=3 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all sections exist
            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to section 1 (should remove sections 2, 3)
            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            // Verify section 1 still has data
            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            // Verify sections 2, 3 are removed
            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data in section 1 is still readable
            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }
728
    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        // Verifies rewinding to a middle section removes every later section and that
        // a subsequent replay only yields the surviving items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1-10
            for section in 1u64..=10 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 5 (should remove sections 6-10)
            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            // Verify sections 1-5 still have data
            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            // Verify sections 6-10 are removed
            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data integrity via replay
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
785
    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        // Verifies a synced rewind survives a restart: removed sections stay removed
        // and retained sections remain readable after re-init.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);

            // Create sections 1-5
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 2
            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-init and verify only sections 1-2 exist
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify sections 1-2 have data
            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            // Verify sections 3-5 are gone
            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            // Verify data integrity
            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }
836
837    #[test_traced]
838    fn test_segmented_fixed_corruption_recovery() {
839        let executor = deterministic::Runner::default();
840        executor.start(|context| async move {
841            let cfg = test_cfg(&context);
842            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
843                .await
844                .expect("failed to init");
845
846            for i in 0u64..5 {
847                journal
848                    .append(1, &test_digest(i))
849                    .await
850                    .expect("failed to append");
851            }
852            journal.sync_all().await.expect("failed to sync");
853            drop(journal);
854
855            let (blob, size) = context
856                .open(&cfg.partition, &1u64.to_be_bytes())
857                .await
858                .expect("failed to open blob");
859            blob.resize(size - 1).await.expect("failed to truncate");
860            blob.sync().await.expect("failed to sync");
861
862            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
863                .await
864                .expect("failed to re-init");
865
866            let count = {
867                let stream = journal
868                    .replay(0, 0, NZUsize!(1024))
869                    .await
870                    .expect("failed to replay");
871                pin_mut!(stream);
872
873                let mut count = 0;
874                while let Some(result) = stream.next().await {
875                    result.expect("should be ok");
876                    count += 1;
877                }
878                count
879            };
880            assert_eq!(count, 4);
881
882            journal.destroy().await.expect("failed to destroy");
883        });
884    }
885
886    #[test_traced]
887    fn test_segmented_fixed_persistence() {
888        let executor = deterministic::Runner::default();
889        executor.start(|context| async move {
890            let cfg = test_cfg(&context);
891
892            // Create and populate journal
893            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
894                .await
895                .expect("failed to init");
896
897            for i in 0u64..5 {
898                journal
899                    .append(1, &test_digest(i))
900                    .await
901                    .expect("failed to append");
902            }
903            journal.sync_all().await.expect("failed to sync");
904            drop(journal);
905
906            // Reopen and verify data persisted
907            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
908                .await
909                .expect("failed to re-init");
910
911            for i in 0u64..5 {
912                let item = journal.get(1, i).await.expect("failed to get");
913                assert_eq!(item, test_digest(i));
914            }
915
916            journal.destroy().await.expect("failed to destroy");
917        });
918    }
919
920    #[test_traced]
921    fn test_segmented_fixed_section_len() {
922        let executor = deterministic::Runner::default();
923        executor.start(|context| async move {
924            let cfg = test_cfg(&context);
925            let mut journal = Journal::init(context.clone(), cfg.clone())
926                .await
927                .expect("failed to init");
928
929            assert_eq!(journal.section_len(1).await.unwrap(), 0);
930
931            for i in 0u64..5 {
932                journal
933                    .append(1, &test_digest(i))
934                    .await
935                    .expect("failed to append");
936            }
937
938            assert_eq!(journal.section_len(1).await.unwrap(), 5);
939            assert_eq!(journal.section_len(2).await.unwrap(), 0);
940
941            journal.destroy().await.expect("failed to destroy");
942        });
943    }
944
945    #[test_traced]
946    fn test_segmented_fixed_non_contiguous_sections() {
947        // Test that sections with gaps in numbering work correctly.
948        // Sections 1, 5, 10 should all be independent and accessible.
949        let executor = deterministic::Runner::default();
950        executor.start(|context| async move {
951            let cfg = test_cfg(&context);
952            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
953                .await
954                .expect("failed to init");
955
956            // Create sections with gaps: 1, 5, 10
957            journal
958                .append(1, &test_digest(100))
959                .await
960                .expect("failed to append");
961            journal
962                .append(5, &test_digest(500))
963                .await
964                .expect("failed to append");
965            journal
966                .append(10, &test_digest(1000))
967                .await
968                .expect("failed to append");
969            journal.sync_all().await.expect("failed to sync");
970
971            // Verify random access to each section
972            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
973            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
974            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));
975
976            // Verify non-existent sections return appropriate errors
977            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
978                let result = journal.get(missing_section, 0).await;
979                assert!(
980                    matches!(result, Err(Error::SectionOutOfRange(_))),
981                    "Expected SectionOutOfRange for section {}, got {:?}",
982                    missing_section,
983                    result
984                );
985            }
986
987            // Drop and reopen to test replay
988            drop(journal);
989            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
990                .await
991                .expect("failed to re-init");
992
993            // Replay and verify all items in order
994            {
995                let stream = journal
996                    .replay(0, 0, NZUsize!(1024))
997                    .await
998                    .expect("failed to replay");
999                pin_mut!(stream);
1000
1001                let mut items = Vec::new();
1002                while let Some(result) = stream.next().await {
1003                    let (section, _, item) = result.expect("replay error");
1004                    items.push((section, item));
1005                }
1006
1007                assert_eq!(items.len(), 3, "Should have 3 items");
1008                assert_eq!(items[0], (1, test_digest(100)));
1009                assert_eq!(items[1], (5, test_digest(500)));
1010                assert_eq!(items[2], (10, test_digest(1000)));
1011            }
1012
1013            // Test replay starting from middle section (5)
1014            {
1015                let stream = journal
1016                    .replay(5, 0, NZUsize!(1024))
1017                    .await
1018                    .expect("failed to replay from section 5");
1019                pin_mut!(stream);
1020
1021                let mut items = Vec::new();
1022                while let Some(result) = stream.next().await {
1023                    let (section, _, item) = result.expect("replay error");
1024                    items.push((section, item));
1025                }
1026
1027                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
1028                assert_eq!(items[0], (5, test_digest(500)));
1029                assert_eq!(items[1], (10, test_digest(1000)));
1030            }
1031
1032            journal.destroy().await.expect("failed to destroy");
1033        });
1034    }
1035
    #[test_traced]
    fn test_segmented_fixed_empty_section_in_middle() {
        // Test that replay correctly handles an empty section between sections with data.
        // Section 1 has data, section 2 is empty, section 3 has data.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append to section 1
            journal
                .append(1, &test_digest(100))
                .await
                .expect("failed to append");

            // Create section 2 but make it empty via rewind
            // NOTE(review): the sync before rewind_section presumably ensures the
            // section's blob exists on disk before being emptied — confirm.
            journal
                .append(2, &test_digest(200))
                .await
                .expect("failed to append");
            journal.sync(2).await.expect("failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("failed to rewind");

            // Append to section 3
            journal
                .append(3, &test_digest(300))
                .await
                .expect("failed to append");

            journal.sync_all().await.expect("failed to sync");

            // Verify section lengths: section 2 exists but holds zero items
            assert_eq!(journal.section_len(1).await.unwrap(), 1);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);
            assert_eq!(journal.section_len(3).await.unwrap(), 1);

            // Drop and reopen to test replay from persisted state
            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Replay all - should get items from sections 1 and 3, skipping empty section 2
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (3, test_digest(300)));
            }

            // Replay starting from empty section 2 - should get only section 3
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, test_digest(300)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
1127
    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        // Test that truncating a single byte from a blob that has items straddling a page boundary
        // correctly recovers by removing the incomplete item.
        //
        // With PAGE_SIZE=44 and ITEM_SIZE=32:
        // - Item 0: bytes 0-31
        // - Item 1: bytes 32-63 (straddles page boundary at 44)
        // - Item 2: bytes 64-95 (straddles page boundary at 88)
        //
        // After 3 items we have 96 bytes = 2 full pages + 8 bytes. Truncating 1 byte leaves 95
        // bytes, which is not a multiple of 32. Recovery should truncate to 64 bytes (2 complete
        // items).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append 3 items (just over 2 pages worth)
            for i in 0u64..3 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Verify all 3 items are readable before corruption
            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            // Truncate the blob by exactly 1 byte to simulate partial write
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            // Release the blob handle before the journal re-opens it
            drop(blob);

            // Reopen journal - should recover by truncating last page due to failed checksum, and
            // end up with a correct blob size due to partial-item trimming.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Verify section now has only 2 items
            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            // Verify size is the expected multiple of ITEM_SIZE (this would fail if we didn't trim
            // items and just relied on page-level checksum recovery).
            assert_eq!(journal.size(1).await.unwrap(), 64);

            // Items 0 and 1 should still be readable
            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            // Item 2 should return ItemOutOfRange
            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }
1203
1204    #[test_traced]
1205    fn test_journal_clear() {
1206        let executor = deterministic::Runner::default();
1207        executor.start(|context| async move {
1208            let cfg = Config {
1209                partition: "clear-test".into(),
1210                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1211                write_buffer: NZUsize!(1024),
1212            };
1213
1214            let mut journal: Journal<_, Digest> =
1215                Journal::init(context.with_label("journal"), cfg.clone())
1216                    .await
1217                    .expect("Failed to initialize journal");
1218
1219            // Append items across multiple sections
1220            for section in 0..5u64 {
1221                for i in 0..10u64 {
1222                    journal
1223                        .append(section, &test_digest(section * 1000 + i))
1224                        .await
1225                        .expect("Failed to append");
1226                }
1227                journal.sync(section).await.expect("Failed to sync");
1228            }
1229
1230            // Verify we have data
1231            assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
1232            assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));
1233
1234            // Clear the journal
1235            journal.clear().await.expect("Failed to clear");
1236
1237            // After clear, all reads should fail
1238            for section in 0..5u64 {
1239                assert!(matches!(
1240                    journal.get(section, 0).await,
1241                    Err(Error::SectionOutOfRange(s)) if s == section
1242                ));
1243            }
1244
1245            // Append new data after clear
1246            for i in 0..5u64 {
1247                journal
1248                    .append(10, &test_digest(i * 100))
1249                    .await
1250                    .expect("Failed to append after clear");
1251            }
1252            journal.sync(10).await.expect("Failed to sync after clear");
1253
1254            // New data should be readable
1255            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));
1256
1257            // Old sections should still be missing
1258            assert!(matches!(
1259                journal.get(0, 0).await,
1260                Err(Error::SectionOutOfRange(0))
1261            ));
1262
1263            journal.destroy().await.unwrap();
1264        });
1265    }
1266
1267    #[test_traced]
1268    fn test_last_missing_section_returns_error() {
1269        let executor = deterministic::Runner::default();
1270        executor.start(|context| async move {
1271            let cfg = test_cfg(&context);
1272            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1273                .await
1274                .expect("failed to init");
1275
1276            assert!(matches!(
1277                journal.last(0).await,
1278                Err(Error::SectionOutOfRange(0))
1279            ));
1280            assert!(matches!(
1281                journal.last(99).await,
1282                Err(Error::SectionOutOfRange(99))
1283            ));
1284
1285            journal.destroy().await.unwrap();
1286        });
1287    }
1288
1289    #[test_traced]
1290    fn test_last_after_rewind_to_zero() {
1291        let executor = deterministic::Runner::default();
1292        executor.start(|context| async move {
1293            let cfg = test_cfg(&context);
1294            let mut journal = Journal::init(context.clone(), cfg.clone())
1295                .await
1296                .expect("failed to init");
1297
1298            journal.append(0, &test_digest(0)).await.unwrap();
1299            journal.append(0, &test_digest(1)).await.unwrap();
1300            journal.sync(0).await.unwrap();
1301
1302            assert!(journal.last(0).await.unwrap().is_some());
1303
1304            journal.rewind(0, 0).await.unwrap();
1305            assert_eq!(journal.last(0).await.unwrap(), None);
1306
1307            journal.destroy().await.unwrap();
1308        });
1309    }
1310
1311    #[test_traced]
1312    fn test_last_pruned_section_returns_error() {
1313        let executor = deterministic::Runner::default();
1314        executor.start(|context| async move {
1315            let cfg = test_cfg(&context);
1316            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1317                .await
1318                .expect("failed to init");
1319
1320            journal.append(0, &test_digest(0)).await.unwrap();
1321            journal.append(1, &test_digest(1)).await.unwrap();
1322            journal.sync_all().await.unwrap();
1323
1324            journal.prune(1).await.unwrap();
1325
1326            assert!(matches!(
1327                journal.last(0).await,
1328                Err(Error::AlreadyPrunedToSection(1))
1329            ));
1330            assert!(journal.last(1).await.unwrap().is_some());
1331
1332            journal.destroy().await.unwrap();
1333        });
1334    }
1335}