Skip to main content

commonware_storage/journal/contiguous/
mod.rs

//! Contiguous journals with position-based access.
//!
//! This module provides position-based journal implementations where items are stored
//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::{future::Future, num::NonZeroUsize, ops::Range};
10use tracing::warn;
11
12pub mod fixed;
13mod metrics;
14pub mod variable;
15
16#[cfg(test)]
17mod tests;
18
19/// A reader guard that holds a consistent view of the journal.
20///
21/// While this guard exists, operations that may modify the bounds (such as `append`, `prune`, and
22/// `rewind`) will block until the guard is dropped. This keeps bounds stable, so any position
23/// within `bounds()` is guaranteed readable.
24//
25// TODO(<https://github.com/commonwarexyz/monorepo/issues/3084>): Relax locking to allow `append`
26// since it doesn't invalidate reads within the cached bounds.
27pub trait Reader: Send + Sync {
28    /// The type of items stored in the journal.
29    type Item;
30
31    /// Returns [start, end) with a guaranteed stable pruning boundary.
32    fn bounds(&self) -> Range<u64>;
33
34    /// Read the item at the given position.
35    ///
36    /// Guaranteed not to return [Error::ItemPruned] for positions within `bounds()`.
37    fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send + Sync;
38
39    /// Read multiple items at the given positions, which must be strictly increasing.
40    ///
41    /// The default implementation calls [`read`](Self::read) in a loop. Concrete journal
42    /// implementations override this to amortize lock acquisition and batch I/O.
43    fn read_many(
44        &self,
45        positions: &[u64],
46    ) -> impl Future<Output = Result<Vec<Self::Item>, Error>> + Send
47    where
48        Self::Item: Send,
49    {
50        async move {
51            let mut items = Vec::with_capacity(positions.len());
52            for &pos in positions {
53                items.push(self.read(pos).await?);
54            }
55            Ok(items)
56        }
57    }
58
59    /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
60    ///
61    /// Default implementation always returns `None`.
62    fn try_read_sync(&self, _position: u64) -> Option<Self::Item> {
63        None
64    }
65
66    /// Return a stream of all items starting from `start_pos`.
67    ///
68    /// Because the reader holds the lock, validation and stream setup happen
69    /// atomically with respect to `prune()`.
70    fn replay(
71        &self,
72        buffer: NonZeroUsize,
73        start_pos: u64,
74    ) -> impl Future<
75        Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send, Error>,
76    > + Send;
77}
78
79/// Journals that support sequential append operations.
80///
81/// Maintains a monotonically increasing position counter where each appended item receives a unique
82/// position starting from 0.
83pub trait Contiguous: Send + Sync {
84    /// The type of items stored in the journal.
85    type Item;
86
87    /// Acquire a reader guard that holds a consistent view of the journal.
88    ///
89    /// While the returned guard exists, operations that need the journal's
90    /// internal write lock (such as `append`, `prune`, and `rewind`) may block
91    /// until the guard is dropped. This ensures any position within
92    /// `reader.bounds()` remains readable.
93    fn reader(&self) -> impl Future<Output = impl Reader<Item = Self::Item> + '_> + Send;
94
95    /// Return the total number of items that have been appended to the journal.
96    ///
97    /// This count is NOT affected by pruning. The next appended item will receive this
98    /// position as its value. Equivalent to [`Reader::bounds`]`.end`.
99    fn size(&self) -> impl Future<Output = u64> + Send;
100}
101
/// A borrowed batch of items to append via [`Mutable::append_many`].
///
/// `Flat` wraps a single contiguous slice; `Nested` wraps multiple slices that are appended in
/// order (implementations may do so under a single lock acquisition).
///
/// The batch only borrows its items, so it is [`Copy`] regardless of `T` and can be inspected
/// (for example with `len`) and then passed on by value.
#[derive(Debug)]
pub enum Many<'a, T> {
    /// A single contiguous slice of items.
    Flat(&'a [T]),
    /// Multiple slices of items, appended in order.
    Nested(&'a [&'a [T]]),
}

// Implemented by hand rather than derived: `#[derive(Clone, Copy)]` would require `T: Clone` /
// `T: Copy`, which is unnecessary because only shared references are stored.
impl<T> Clone for Many<'_, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for Many<'_, T> {}
112
113impl<T> Many<'_, T> {
114    /// Returns the total number of items across all segments.
115    pub fn len(&self) -> usize {
116        match self {
117            Self::Flat(items) => items.len(),
118            Self::Nested(nested_items) => nested_items.iter().map(|items| items.len()).sum(),
119        }
120    }
121
122    /// Returns `true` if there are no items across all segments.
123    pub fn is_empty(&self) -> bool {
124        match self {
125            Self::Flat(items) => items.is_empty(),
126            Self::Nested(nested_items) => nested_items.iter().all(|items| items.is_empty()),
127        }
128    }
129}
130
131/// A [Contiguous] journal that supports appending, rewinding, and pruning.
132pub trait Mutable: Contiguous + Send + Sync {
133    /// Append a new item to the journal, returning its position.
134    ///
135    /// Positions are consecutively increasing starting from 0. The position of each item
136    /// is stable across pruning (i.e., if item X has position 5, it will always have
137    /// position 5 even if earlier items are pruned).
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the underlying storage operation fails or if the item cannot
142    /// be encoded.
143    fn append(
144        &mut self,
145        item: &Self::Item,
146    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
147
148    /// Append items to the journal, returning the position of the last item appended.
149    ///
150    /// The default implementation calls [Self::append] in a loop. Concrete implementations
151    /// may override this to acquire the write lock once for all items.
152    ///
153    /// Returns [Error::EmptyAppend] if items is empty.
154    fn append_many<'a>(
155        &'a mut self,
156        items: Many<'a, Self::Item>,
157    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
158    where
159        Self::Item: Sync,
160    {
161        async move {
162            if items.is_empty() {
163                return Err(Error::EmptyAppend);
164            }
165            let mut last_pos = self.size().await;
166            match items {
167                Many::Flat(items) => {
168                    for item in items {
169                        last_pos = self.append(item).await?;
170                    }
171                }
172                Many::Nested(nested_items) => {
173                    for items in nested_items {
174                        for item in *items {
175                            last_pos = self.append(item).await?;
176                        }
177                    }
178                }
179            }
180            Ok(last_pos)
181        }
182    }
183
184    /// Prune items at positions strictly less than `min_position`.
185    ///
186    /// Returns `true` if any data was pruned, `false` otherwise.
187    ///
188    /// # Behavior
189    ///
190    /// - If `min_position > bounds.end`, the prune is capped to `bounds.end` (no error is returned)
191    /// - Some items with positions less than `min_position` may be retained due to
192    ///   section/blob alignment
193    /// - This operation is not atomic, but implementations guarantee the journal is left in a
194    ///   recoverable state if a crash occurs during pruning
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the underlying storage operation fails.
199    fn prune(
200        &mut self,
201        min_position: u64,
202    ) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
203
204    /// Rewind the journal to the given size, discarding items from the end.
205    ///
206    /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
207    /// and the next append will receive position N.
208    ///
209    /// # Behavior
210    ///
211    /// - If `size > bounds.end`, returns [Error::InvalidRewind]
212    /// - If `size == bounds.end`, this is a no-op
213    /// - If `size < bounds.start`, returns [Error::ItemPruned] (can't rewind to pruned data)
214    /// - This operation is not atomic, but implementations guarantee the journal is left in a
215    ///   recoverable state if a crash occurs during rewinding
216    ///
217    /// # Warnings
218    ///
219    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
220    ///
221    /// # Errors
222    ///
223    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
224    /// Returns an error if the underlying storage operation fails.
225    fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
226
227    /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
228    /// is rewound to the pruning boundary, discarding all unpruned items.
229    ///
230    /// # Warnings
231    ///
232    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
233    fn rewind_to<'a, P>(
234        &'a mut self,
235        mut predicate: P,
236    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
237    where
238        P: FnMut(&Self::Item) -> bool + Send + 'a,
239    {
240        async move {
241            let (bounds, rewind_size) = {
242                let reader = self.reader().await;
243                let bounds = reader.bounds();
244                let mut rewind_size = bounds.end;
245
246                while rewind_size > bounds.start {
247                    let item = reader.read(rewind_size - 1).await?;
248                    if predicate(&item) {
249                        break;
250                    }
251                    rewind_size -= 1;
252                }
253
254                (bounds, rewind_size)
255            };
256
257            if rewind_size != bounds.end {
258                let rewound_items = bounds.end - rewind_size;
259                warn!(
260                    journal_size = bounds.end,
261                    rewound_items, "rewinding journal items"
262                );
263                self.rewind(rewind_size).await?;
264            }
265
266            Ok(rewind_size)
267        }
268    }
269}