Skip to main content

commonware_storage/journal/contiguous/
mod.rs

1//! Contiguous journals with position-based access.
2//!
3//! This module provides position-based journal implementations where items are stored
4//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
5//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::{future::Future, num::NonZeroUsize, ops::Range};
10use tracing::warn;
11
12pub mod fixed;
13pub mod variable;
14
15#[cfg(test)]
16mod tests;
17
18/// Core trait for contiguous journals supporting sequential append operations.
19///
20/// A contiguous journal maintains a consecutively increasing position counter where each
21/// appended item receives a unique position starting from 0.
22pub trait Contiguous: Send + Sync {
23    /// The type of items stored in the journal.
24    type Item;
25
26    /// Returns [start, end) where `start` and `end - 1` are the indices of the oldest and newest
27    /// retained operations respectively.
28    fn bounds(&self) -> Range<u64>;
29
30    /// Return the total number of items that have been appended to the journal.
31    ///
32    /// This count is NOT affected by pruning. The next appended item will receive this
33    /// position as its value.
34    ///
35    /// Equivalent to `bounds().end`.
36    fn size(&self) -> u64 {
37        self.bounds().end
38    }
39
40    /// Return a stream of all items in the journal starting from `start_pos`.
41    ///
42    /// Each item is yielded as a tuple `(position, item)` where position is the item's
43    /// stable position in the journal.
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
48    /// errors occur during replay.
49    fn replay(
50        &self,
51        start_pos: u64,
52        buffer: NonZeroUsize,
53    ) -> impl std::future::Future<
54        Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send + '_, Error>,
55    > + Send;
56
57    /// Read the item at the given position.
58    ///
59    /// # Errors
60    ///
61    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
62    /// - Returns [Error::ItemOutOfRange] if the item at `position` does not exist.
63    fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;
64}
65
66/// A [Contiguous] journal that supports appending, rewinding, and pruning.
67pub trait MutableContiguous: Contiguous + Send + Sync {
68    /// Append a new item to the journal, returning its position.
69    ///
70    /// Positions are consecutively increasing starting from 0. The position of each item
71    /// is stable across pruning (i.e., if item X has position 5, it will always have
72    /// position 5 even if earlier items are pruned).
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the underlying storage operation fails or if the item cannot
77    /// be encoded.
78    fn append(
79        &mut self,
80        item: Self::Item,
81    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
82
83    /// Prune items at positions strictly less than `min_position`.
84    ///
85    /// Returns `true` if any data was pruned, `false` otherwise.
86    ///
87    /// # Behavior
88    ///
89    /// - If `min_position > bounds.end`, the prune is capped to `bounds.end` (no error is returned)
90    /// - Some items with positions less than `min_position` may be retained due to
91    ///   section/blob alignment
92    /// - This operation is not atomic, but implementations guarantee the journal is left in a
93    ///   recoverable state if a crash occurs during pruning
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the underlying storage operation fails.
98    fn prune(
99        &mut self,
100        min_position: u64,
101    ) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
102
103    /// Rewind the journal to the given size, discarding items from the end.
104    ///
105    /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
106    /// and the next append will receive position N.
107    ///
108    /// # Behavior
109    ///
110    /// - If `size > bounds.end`, returns [Error::InvalidRewind]
111    /// - If `size == bounds.end`, this is a no-op
112    /// - If `size < bounds.start`, returns [Error::ItemPruned] (can't rewind to pruned data)
113    /// - This operation is not atomic, but implementations guarantee the journal is left in a
114    ///   recoverable state if a crash occurs during rewinding
115    ///
116    /// # Warnings
117    ///
118    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
119    ///
120    /// # Errors
121    ///
122    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
123    /// Returns an error if the underlying storage operation fails.
124    fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
125
126    /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
127    /// is rewound to the pruning boundary, discarding all unpruned items.
128    ///
129    /// # Warnings
130    ///
131    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
132    fn rewind_to<'a, P>(
133        &'a mut self,
134        mut predicate: P,
135    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
136    where
137        P: FnMut(&Self::Item) -> bool + Send + 'a,
138    {
139        async move {
140            let bounds = self.bounds();
141            let mut rewind_size = bounds.end;
142
143            while rewind_size > bounds.start {
144                let item = self.read(rewind_size - 1).await?;
145                if predicate(&item) {
146                    break;
147                }
148                rewind_size -= 1;
149            }
150
151            if rewind_size != bounds.end {
152                let rewound_items = bounds.end - rewind_size;
153                warn!(
154                    journal_size = bounds.end,
155                    rewound_items, "rewinding journal items"
156                );
157                self.rewind(rewind_size).await?;
158            }
159
160            Ok(rewind_size)
161        }
162    }
163}