Skip to main content

commonware_storage/journal/contiguous/
mod.rs

1//! Contiguous journals with position-based access.
2//!
3//! This module provides position-based journal implementations where items are stored
4//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
5//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::{future::Future, num::NonZeroUsize, ops::Range};
10use tracing::warn;
11
12pub mod fixed;
13pub mod variable;
14
15#[cfg(test)]
16mod tests;
17
/// A reader guard that holds a consistent view of the journal.
///
/// While this guard exists, operations that may modify the bounds (such as `append`, `prune`, and
/// `rewind`) will block until the guard is dropped. This keeps bounds stable, so any position
/// within `bounds()` is guaranteed readable.
///
/// Because writers are blocked for the guard's whole lifetime, keep guards short-lived and drop
/// them before calling any mutating operation on the same journal.
//
// TODO(<https://github.com/commonwarexyz/monorepo/issues/3084>): Relax locking to allow `append`
// since it doesn't invalidate reads within the cached bounds.
pub trait Reader: Send + Sync {
    /// The type of items stored in the journal.
    type Item;

    /// Returns the half-open position range `[start, end)` covered by this guard, with a
    /// guaranteed stable pruning boundary.
    ///
    /// `start` is the pruning boundary (the oldest position still readable) and `end` is one past
    /// the newest position. An empty range (`start == end`) means there is nothing to read.
    fn bounds(&self) -> Range<u64>;

    /// Read the item at the given position.
    ///
    /// # Errors
    ///
    /// Guaranteed not to return [Error::ItemPruned] for positions within `bounds()`. Other
    /// errors may still be returned.
    fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;

    /// Return a stream of all items starting from `start_pos`.
    ///
    /// The outer `Result` reports a failure to validate the request or set up the stream. Each
    /// element of the stream is itself a `Result`, so a failure can also surface part-way
    /// through the replay. Successful elements are `(u64, item)` pairs.
    ///
    /// NOTE(review): the `u64` in each pair is presumably the item's position, and `buffer` is
    /// presumably the size of the read buffer used while streaming. Neither is established by
    /// this trait, so confirm against the implementations.
    ///
    /// Because the reader holds the lock, validation and stream setup happen
    /// atomically with respect to `prune()`.
    fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> impl Future<
        Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send, Error>,
    > + Send;
}
50
/// Journals that support sequential append operations.
///
/// Maintains a monotonically increasing position counter where each appended item receives a unique
/// position starting from 0.
///
/// This trait only exposes the read-side view (a reader guard and the current size). The
/// operations that change the journal (`append`, `prune`, `rewind`) are declared on [Mutable].
pub trait Contiguous: Send + Sync {
    /// The type of items stored in the journal.
    type Item;

    /// Acquire a reader guard that holds a consistent view of the journal.
    ///
    /// While the returned guard exists, operations that need the journal's
    /// internal write lock (such as `append`, `prune`, and `rewind`) may block
    /// until the guard is dropped. This ensures any position within
    /// `reader.bounds()` remains readable.
    ///
    /// The guard borrows from `self` (note the `'_` bound on the return type), so it cannot
    /// outlive the journal it was obtained from.
    fn reader(&self) -> impl Future<Output = impl Reader<Item = Self::Item> + '_> + Send;

    /// Return the total number of items that have been appended to the journal.
    ///
    /// This count is NOT affected by pruning. The next appended item will receive this
    /// position as its value. Equivalent to [`Reader::bounds`]`.end`.
    fn size(&self) -> impl Future<Output = u64> + Send;
}
73
74/// A [Contiguous] journal that supports appending, rewinding, and pruning.
75pub trait Mutable: Contiguous + Send + Sync {
76    /// Append a new item to the journal, returning its position.
77    ///
78    /// Positions are consecutively increasing starting from 0. The position of each item
79    /// is stable across pruning (i.e., if item X has position 5, it will always have
80    /// position 5 even if earlier items are pruned).
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the underlying storage operation fails or if the item cannot
85    /// be encoded.
86    fn append(
87        &mut self,
88        item: &Self::Item,
89    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
90
91    /// Prune items at positions strictly less than `min_position`.
92    ///
93    /// Returns `true` if any data was pruned, `false` otherwise.
94    ///
95    /// # Behavior
96    ///
97    /// - If `min_position > bounds.end`, the prune is capped to `bounds.end` (no error is returned)
98    /// - Some items with positions less than `min_position` may be retained due to
99    ///   section/blob alignment
100    /// - This operation is not atomic, but implementations guarantee the journal is left in a
101    ///   recoverable state if a crash occurs during pruning
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the underlying storage operation fails.
106    fn prune(
107        &mut self,
108        min_position: u64,
109    ) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
110
111    /// Rewind the journal to the given size, discarding items from the end.
112    ///
113    /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
114    /// and the next append will receive position N.
115    ///
116    /// # Behavior
117    ///
118    /// - If `size > bounds.end`, returns [Error::InvalidRewind]
119    /// - If `size == bounds.end`, this is a no-op
120    /// - If `size < bounds.start`, returns [Error::ItemPruned] (can't rewind to pruned data)
121    /// - This operation is not atomic, but implementations guarantee the journal is left in a
122    ///   recoverable state if a crash occurs during rewinding
123    ///
124    /// # Warnings
125    ///
126    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
127    ///
128    /// # Errors
129    ///
130    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
131    /// Returns an error if the underlying storage operation fails.
132    fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
133
134    /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
135    /// is rewound to the pruning boundary, discarding all unpruned items.
136    ///
137    /// # Warnings
138    ///
139    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
140    fn rewind_to<'a, P>(
141        &'a mut self,
142        mut predicate: P,
143    ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
144    where
145        P: FnMut(&Self::Item) -> bool + Send + 'a,
146    {
147        async move {
148            let (bounds, rewind_size) = {
149                let reader = self.reader().await;
150                let bounds = reader.bounds();
151                let mut rewind_size = bounds.end;
152
153                while rewind_size > bounds.start {
154                    let item = reader.read(rewind_size - 1).await?;
155                    if predicate(&item) {
156                        break;
157                    }
158                    rewind_size -= 1;
159                }
160
161                (bounds, rewind_size)
162            };
163
164            if rewind_size != bounds.end {
165                let rewound_items = bounds.end - rewind_size;
166                warn!(
167                    journal_size = bounds.end,
168                    rewound_items, "rewinding journal items"
169                );
170                self.rewind(rewind_size).await?;
171            }
172
173            Ok(rewind_size)
174        }
175    }
176}