commonware_storage/journal/contiguous/mod.rs
1//! Contiguous journals with position-based access.
2//!
3//! This module provides position-based journal implementations where items are stored
4//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
5//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::{future::Future, num::NonZeroUsize};
10use tracing::warn;
11
12pub mod fixed;
13pub mod variable;
14
15#[cfg(test)]
16mod tests;
17
18/// Core trait for contiguous journals supporting sequential append operations.
19///
20/// A contiguous journal maintains a consecutively increasing position counter where each
21/// appended item receives a unique position starting from 0.
22pub trait Contiguous: Send + Sync {
23 /// The type of items stored in the journal.
24 type Item;
25
26 /// Return the total number of items that have been appended to the journal.
27 ///
28 /// This count is NOT affected by pruning. The next appended item will receive this
29 /// position as its value.
30 fn size(&self) -> u64;
31
32 /// Return the position of the oldest item still retained in the journal.
33 ///
34 /// Returns `None` if the journal is empty or if all items have been pruned.
35 ///
36 /// After pruning, this returns the position of the first item that remains.
37 /// Note that due to section/blob alignment, this may be less than the `min_position`
38 /// passed to `prune()`.
39 fn oldest_retained_pos(&self) -> Option<u64>;
40
41 /// Return the location before which all items have been pruned.
42 ///
43 /// If this is the same as `size()`, then all items have been pruned.
44 fn pruning_boundary(&self) -> u64;
45
46 /// Return a stream of all items in the journal starting from `start_pos`.
47 ///
48 /// Each item is yielded as a tuple `(position, item)` where position is the item's
49 /// stable position in the journal.
50 ///
51 /// # Errors
52 ///
53 /// Returns an error if `start_pos` exceeds the journal size or if any storage/decoding
54 /// errors occur during replay.
55 fn replay(
56 &self,
57 start_pos: u64,
58 buffer: NonZeroUsize,
59 ) -> impl std::future::Future<
60 Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send + '_, Error>,
61 > + Send;
62
63 /// Read the item at the given position.
64 ///
65 /// # Errors
66 ///
67 /// - Returns [Error::ItemPruned] if the item at `position` has been pruned.
68 /// - Returns [Error::ItemOutOfRange] if the item at `position` does not exist.
69 fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;
70}
71
72/// A [Contiguous] journal that supports appending, rewinding, and pruning.
73pub trait MutableContiguous: Contiguous + Send + Sync {
74 /// Append a new item to the journal, returning its position.
75 ///
76 /// Positions are consecutively increasing starting from 0. The position of each item
77 /// is stable across pruning (i.e., if item X has position 5, it will always have
78 /// position 5 even if earlier items are pruned).
79 ///
80 /// # Errors
81 ///
82 /// Returns an error if the underlying storage operation fails or if the item cannot
83 /// be encoded.
84 fn append(
85 &mut self,
86 item: Self::Item,
87 ) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
88
89 /// Prune items at positions strictly less than `min_position`.
90 ///
91 /// Returns `true` if any data was pruned, `false` otherwise.
92 ///
93 /// # Behavior
94 ///
95 /// - If `min_position > size()`, the prune is capped to `size()` (no error is returned)
96 /// - Some items with positions less than `min_position` may be retained due to
97 /// section/blob alignment
98 /// - This operation is not atomic, but implementations guarantee the journal is left in a
99 /// recoverable state if a crash occurs during pruning
100 ///
101 /// # Errors
102 ///
103 /// Returns an error if the underlying storage operation fails.
104 fn prune(
105 &mut self,
106 min_position: u64,
107 ) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
108
109 /// Rewind the journal to the given size, discarding items from the end.
110 ///
111 /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
112 /// and the next append will receive position N.
113 ///
114 /// # Behavior
115 ///
116 /// - If `size > current_size()`, returns [Error::InvalidRewind]
117 /// - If `size == current_size()`, this is a no-op
118 /// - If `size < oldest_retained_pos()`, returns [Error::InvalidRewind] (can't rewind to pruned
119 /// data)
120 /// - This operation is not atomic, but implementations guarantee the journal is left in a
121 /// recoverable state if a crash occurs during rewinding
122 ///
123 /// # Warnings
124 ///
125 /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
126 ///
127 /// # Errors
128 ///
129 /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
130 /// Returns an error if the underlying storage operation fails.
131 fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
132
133 /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
134 /// is rewound to the pruning boundary, discarding all unpruned items.
135 ///
136 /// # Warnings
137 ///
138 /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
139 fn rewind_to<'a, P>(
140 &'a mut self,
141 mut predicate: P,
142 ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
143 where
144 P: FnMut(&Self::Item) -> bool + Send + 'a,
145 {
146 async move {
147 let journal_size = self.size();
148 let pruning_boundary = self.pruning_boundary();
149 let mut rewind_size = journal_size;
150
151 while rewind_size > pruning_boundary {
152 let item = self.read(rewind_size - 1).await?;
153 if predicate(&item) {
154 break;
155 }
156 rewind_size -= 1;
157 }
158
159 if rewind_size != journal_size {
160 let rewound_items = journal_size - rewind_size;
161 warn!(journal_size, rewound_items, "rewinding journal items");
162 self.rewind(rewind_size).await?;
163 }
164
165 Ok(rewind_size)
166 }
167 }
168}