commonware_storage/journal/contiguous/
mod.rs

1//! Contiguous journals with position-based access.
2//!
3//! This module provides position-based journal implementations where items are stored
4//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
5//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::num::NonZeroUsize;
10use tracing::warn;
11
12pub mod fixed;
13pub mod variable;
14
15#[cfg(test)]
16mod tests;
17
18/// Core trait for contiguous journals supporting sequential append operations.
19///
20/// A contiguous journal maintains a consecutively increasing position counter where each
21/// appended item receives a unique position starting from 0.
22pub trait Contiguous {
23    /// The type of items stored in the journal.
24    type Item;
25
26    /// Return the total number of items that have been appended to the journal.
27    ///
28    /// This count is NOT affected by pruning. The next appended item will receive this
29    /// position as its value.
30    fn size(&self) -> u64;
31
32    /// Return the position of the oldest item still retained in the journal.
33    ///
34    /// Returns `None` if the journal is empty or if all items have been pruned.
35    ///
36    /// After pruning, this returns the position of the first item that remains.
37    /// Note that due to section/blob alignment, this may be less than the `min_position`
38    /// passed to `prune()`.
39    fn oldest_retained_pos(&self) -> Option<u64>;
40
41    /// Return the location before which all items have been pruned.
42    ///
43    /// If this is the same as `size()`, then all items have been pruned.
44    fn pruning_boundary(&self) -> u64;
45
46    /// Return a stream of all items in the journal starting from `start_pos`.
47    ///
48    /// Each item is yielded as a tuple `(position, item)` where position is the item's
49    /// stable position in the journal.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
54    /// errors occur during replay.
55    fn replay(
56        &self,
57        start_pos: u64,
58        buffer: NonZeroUsize,
59    ) -> impl std::future::Future<
60        Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error>,
61    >;
62
63    /// Read the item at the given position.
64    ///
65    /// # Errors
66    ///
67    /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
68    /// - Returns [Error::ItemOutOfRange] if the item at `position` does not exist.
69    fn read(&self, position: u64) -> impl std::future::Future<Output = Result<Self::Item, Error>>;
70}
71
72/// A [Contiguous] journal that supports appending, rewinding, and pruning.
73pub trait MutableContiguous: Contiguous {
74    /// Append a new item to the journal, returning its position.
75    ///
76    /// Positions are consecutively increasing starting from 0. The position of each item
77    /// is stable across pruning (i.e., if item X has position 5, it will always have
78    /// position 5 even if earlier items are pruned).
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the underlying storage operation fails or if the item cannot
83    /// be encoded.
84    fn append(&mut self, item: Self::Item)
85        -> impl std::future::Future<Output = Result<u64, Error>>;
86
87    /// Prune items at positions strictly less than `min_position`.
88    ///
89    /// Returns `true` if any data was pruned, `false` otherwise.
90    ///
91    /// # Behavior
92    ///
93    /// - If `min_position > size()`, the prune is capped to `size()` (no error is returned)
94    /// - Some items with positions less than `min_position` may be retained due to
95    ///   section/blob alignment
96    /// - This operation is not atomic, but implementations guarantee the journal is left in a
97    ///   recoverable state if a crash occurs during pruning
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the underlying storage operation fails.
102    fn prune(
103        &mut self,
104        min_position: u64,
105    ) -> impl std::future::Future<Output = Result<bool, Error>>;
106
107    /// Rewind the journal to the given size, discarding items from the end.
108    ///
109    /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
110    /// and the next append will receive position N.
111    ///
112    /// # Behavior
113    ///
114    /// - If `size > current_size()`, returns [Error::InvalidRewind]
115    /// - If `size == current_size()`, this is a no-op
116    /// - If `size < oldest_retained_pos()`, returns [Error::InvalidRewind] (can't rewind to pruned
117    ///   data)
118    /// - This operation is not atomic, but implementations guarantee the journal is left in a
119    ///   recoverable state if a crash occurs during rewinding
120    ///
121    /// # Warnings
122    ///
123    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
124    ///
125    /// # Errors
126    ///
127    /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
128    /// Returns an error if the underlying storage operation fails.
129    fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>>;
130
131    /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
132    /// is rewound to the pruning boundary, discarding all unpruned items.
133    ///
134    /// # Warnings
135    ///
136    /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
137    fn rewind_to<'a, P>(
138        &'a mut self,
139        mut predicate: P,
140    ) -> impl std::future::Future<Output = Result<u64, Error>> + 'a
141    where
142        P: FnMut(&Self::Item) -> bool + 'a,
143    {
144        async move {
145            let journal_size = self.size();
146            let pruning_boundary = self.pruning_boundary();
147            let mut rewind_size = journal_size;
148
149            while rewind_size > pruning_boundary {
150                let item = self.read(rewind_size - 1).await?;
151                if predicate(&item) {
152                    break;
153                }
154                rewind_size -= 1;
155            }
156
157            if rewind_size != journal_size {
158                let rewound_items = journal_size - rewind_size;
159                warn!(journal_size, rewound_items, "rewinding journal items");
160                self.rewind(rewind_size).await?;
161            }
162
163            Ok(rewind_size)
164        }
165    }
166}
167
168pub trait PersistableContiguous: MutableContiguous {
169    /// Durably persist the journal but does not write all data, potentially leaving recovery
170    /// required on startup.
171    ///
172    /// For a stronger guarantee that eliminates potential recovery, use [Self::sync] instead.
173    fn commit(&mut self) -> impl std::future::Future<Output = Result<(), Error>>;
174
175    /// Durably persist the journal and write all data, guaranteeing no recovery will be required
176    /// on startup.
177    ///
178    /// This provides a stronger guarantee than [Self::commit] but may be slower.
179    fn sync(&mut self) -> impl std::future::Future<Output = Result<(), Error>>;
180
181    /// Close the journal, syncing all pending writes and releasing resources.
182    fn close(self) -> impl std::future::Future<Output = Result<(), Error>>;
183
184    /// Destroy the journal, removing all associated storage.
185    ///
186    /// This method consumes the journal and deletes all persisted data including blobs,
187    /// metadata, and any other storage artifacts. Use this for cleanup in tests or when
188    /// permanently removing a journal.
189    fn destroy(self) -> impl std::future::Future<Output = Result<(), Error>>;
190}