Skip to main content

commonware_storage/journal/contiguous/
mod.rs

1//! Contiguous journals with position-based access.
2//!
3//! This module provides position-based journal implementations where items are stored
4//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
5//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::{future::Future, num::NonZeroUsize, ops::Range};
10use tracing::warn;
11
12pub mod fixed;
13pub mod variable;
14
15#[cfg(test)]
16mod tests;
17
18/// A reader guard that holds a consistent view of the journal.
19///
20/// While this guard exists, operations that may modify the bounds (such as `append`, `prune`, and
21/// `rewind`) will block until the guard is dropped. This keeps bounds stable, so any position
22/// within `bounds()` is guaranteed readable.
23//
24// TODO(<https://github.com/commonwarexyz/monorepo/issues/3084>): Relax locking to allow `append`
25// since it doesn't invalidate reads within the cached bounds.
26pub trait Reader: Send + Sync {
27    /// The type of items stored in the journal.
28    type Item;
29
30    /// Returns [start, end) with a guaranteed stable pruning boundary.
31    fn bounds(&self) -> Range<u64>;
32
33    /// Read the item at the given position.
34    ///
35    /// Guaranteed not to return [Error::ItemPruned] for positions within `bounds()`.
36    fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;
37
38    /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
39    ///
40    /// Default implementation always returns `None`.
41    fn try_read_sync(&self, _position: u64) -> Option<Self::Item> {
42        None
43    }
44
45    /// Return a stream of all items starting from `start_pos`.
46    ///
47    /// Because the reader holds the lock, validation and stream setup happen
48    /// atomically with respect to `prune()`.
49    fn replay(
50        &self,
51        buffer: NonZeroUsize,
52        start_pos: u64,
53    ) -> impl Future<
54        Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send, Error>,
55    > + Send;
56}
57
58/// Journals that support sequential append operations.
59///
60/// Maintains a monotonically increasing position counter where each appended item receives a unique
61/// position starting from 0.
62pub trait Contiguous: Send + Sync {
63    /// The type of items stored in the journal.
64    type Item;
65
66    /// Acquire a reader guard that holds a consistent view of the journal.
67    ///
68    /// While the returned guard exists, operations that need the journal's
69    /// internal write lock (such as `append`, `prune`, and `rewind`) may block
70    /// until the guard is dropped. This ensures any position within
71    /// `reader.bounds()` remains readable.
72    fn reader(&self) -> impl Future<Output = impl Reader<Item = Self::Item> + '_> + Send;
73
74    /// Return the total number of items that have been appended to the journal.
75    ///
76    /// This count is NOT affected by pruning. The next appended item will receive this
77    /// position as its value. Equivalent to [`Reader::bounds`]`.end`.
78    fn size(&self) -> impl Future<Output = u64> + Send;
79}
80
/// Items to append via [`Mutable::append_many`].
///
/// `Flat` wraps a single contiguous slice; `Nested` wraps multiple slices that are
/// appended in order under a single lock acquisition.
///
/// Both variants only hold shared slice references, so the value is cheap to copy. The derived
/// `Clone`, `Copy`, and `Debug` implementations are available whenever `T` implements the
/// corresponding trait.
#[derive(Clone, Copy, Debug)]
pub enum Many<'a, T> {
    /// A single contiguous slice of items.
    Flat(&'a [T]),
    /// Multiple slices of items, appended in order.
    Nested(&'a [&'a [T]]),
}
91
92impl<T> Many<'_, T> {
93    /// Returns `true` if there are no items across all segments.
94    pub fn is_empty(&self) -> bool {
95        match self {
96            Self::Flat(items) => items.is_empty(),
97            Self::Nested(nested_items) => nested_items.iter().all(|items| items.is_empty()),
98        }
99    }
100}
101
102/// A [Contiguous] journal that supports appending, rewinding, and pruning.
103pub trait Mutable: Contiguous + Send + Sync {
104    /// Append a new item to the journal, returning its position.
105    ///
106    /// Positions are consecutively increasing starting from 0. The position of each item
107    /// is stable across pruning (i.e., if item X has position 5, it will always have
108    /// position 5 even if earlier items are pruned).
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the underlying storage operation fails or if the item cannot
113    /// be encoded.
114    fn append(
115        &mut self,
116        item: &Self::Item,
117    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
118
119    /// Append items to the journal, returning the position of the last item appended.
120    ///
121    /// The default implementation calls [Self::append] in a loop. Concrete implementations
122    /// may override this to acquire the write lock once for all items.
123    ///
124    /// Returns [Error::EmptyAppend] if items is empty.
125    fn append_many<'a>(
126        &'a mut self,
127        items: Many<'a, Self::Item>,
128    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
129    where
130        Self::Item: Sync,
131    {
132        async move {
133            if items.is_empty() {
134                return Err(Error::EmptyAppend);
135            }
136            let mut last_pos = self.size().await;
137            match items {
138                Many::Flat(items) => {
139                    for item in items {
140                        last_pos = self.append(item).await?;
141                    }
142                }
143                Many::Nested(nested_items) => {
144                    for items in nested_items {
145                        for item in *items {
146                            last_pos = self.append(item).await?;
147                        }
148                    }
149                }
150            }
151            Ok(last_pos)
152        }
153    }
154
155    /// Prune items at positions strictly less than `min_position`.
156    ///
157    /// Returns `true` if any data was pruned, `false` otherwise.
158    ///
159    /// # Behavior
160    ///
161    /// - If `min_position > bounds.end`, the prune is capped to `bounds.end` (no error is returned)
162    /// - Some items with positions less than `min_position` may be retained due to
163    ///   section/blob alignment
164    /// - This operation is not atomic, but implementations guarantee the journal is left in a
165    ///   recoverable state if a crash occurs during pruning
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the underlying storage operation fails.
170    fn prune(
171        &mut self,
172        min_position: u64,
173    ) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
174
175    /// Rewind the journal to the given size, discarding items from the end.
176    ///
177    /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
178    /// and the next append will receive position N.
179    ///
180    /// # Behavior
181    ///
182    /// - If `size > bounds.end`, returns [Error::InvalidRewind]
183    /// - If `size == bounds.end`, this is a no-op
184    /// - If `size < bounds.start`, returns [Error::ItemPruned] (can't rewind to pruned data)
185    /// - This operation is not atomic, but implementations guarantee the journal is left in a
186    ///   recoverable state if a crash occurs during rewinding
187    ///
188    /// # Warnings
189    ///
190    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
191    ///
192    /// # Errors
193    ///
194    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
195    /// Returns an error if the underlying storage operation fails.
196    fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
197
198    /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
199    /// is rewound to the pruning boundary, discarding all unpruned items.
200    ///
201    /// # Warnings
202    ///
203    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
204    fn rewind_to<'a, P>(
205        &'a mut self,
206        mut predicate: P,
207    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
208    where
209        P: FnMut(&Self::Item) -> bool + Send + 'a,
210    {
211        async move {
212            let (bounds, rewind_size) = {
213                let reader = self.reader().await;
214                let bounds = reader.bounds();
215                let mut rewind_size = bounds.end;
216
217                while rewind_size > bounds.start {
218                    let item = reader.read(rewind_size - 1).await?;
219                    if predicate(&item) {
220                        break;
221                    }
222                    rewind_size -= 1;
223                }
224
225                (bounds, rewind_size)
226            };
227
228            if rewind_size != bounds.end {
229                let rewound_items = bounds.end - rewind_size;
230                warn!(
231                    journal_size = bounds.end,
232                    rewound_items, "rewinding journal items"
233                );
234                self.rewind(rewind_size).await?;
235            }
236
237            Ok(rewind_size)
238        }
239    }
240}