commonware_storage/journal/contiguous/mod.rs
1//! Contiguous journals with position-based access.
2//!
3//! This module provides position-based journal implementations where items are stored
4//! contiguously and can be accessed by their position (0-indexed). Both [fixed]-size and
5//! [variable]-size item journals are supported.
6
7use super::Error;
8use futures::Stream;
9use std::{future::Future, num::NonZeroUsize, ops::Range};
10use tracing::warn;
11
12pub mod fixed;
13pub mod variable;
14
15#[cfg(test)]
16mod tests;
17
18/// A reader guard that holds a consistent view of the journal.
19///
20/// While this guard exists, operations that may modify the bounds (such as `append`, `prune`, and
21/// `rewind`) will block until the guard is dropped. This keeps bounds stable, so any position
22/// within `bounds()` is guaranteed readable.
23//
24// TODO(<https://github.com/commonwarexyz/monorepo/issues/3084>): Relax locking to allow `append`
25// since it doesn't invalidate reads within the cached bounds.
26pub trait Reader: Send + Sync {
27 /// The type of items stored in the journal.
28 type Item;
29
30 /// Returns [start, end) with a guaranteed stable pruning boundary.
31 fn bounds(&self) -> Range<u64>;
32
33 /// Read the item at the given position.
34 ///
35 /// Guaranteed not to return [Error::ItemPruned] for positions within `bounds()`.
36 fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;
37
38 /// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
39 ///
40 /// Default implementation always returns `None`.
41 fn try_read_sync(&self, _position: u64) -> Option<Self::Item> {
42 None
43 }
44
45 /// Return a stream of all items starting from `start_pos`.
46 ///
47 /// Because the reader holds the lock, validation and stream setup happen
48 /// atomically with respect to `prune()`.
49 fn replay(
50 &self,
51 buffer: NonZeroUsize,
52 start_pos: u64,
53 ) -> impl Future<
54 Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send, Error>,
55 > + Send;
56}
57
58/// Journals that support sequential append operations.
59///
60/// Maintains a monotonically increasing position counter where each appended item receives a unique
61/// position starting from 0.
62pub trait Contiguous: Send + Sync {
63 /// The type of items stored in the journal.
64 type Item;
65
66 /// Acquire a reader guard that holds a consistent view of the journal.
67 ///
68 /// While the returned guard exists, operations that need the journal's
69 /// internal write lock (such as `append`, `prune`, and `rewind`) may block
70 /// until the guard is dropped. This ensures any position within
71 /// `reader.bounds()` remains readable.
72 fn reader(&self) -> impl Future<Output = impl Reader<Item = Self::Item> + '_> + Send;
73
74 /// Return the total number of items that have been appended to the journal.
75 ///
76 /// This count is NOT affected by pruning. The next appended item will receive this
77 /// position as its value. Equivalent to [`Reader::bounds`]`.end`.
78 fn size(&self) -> impl Future<Output = u64> + Send;
79}
80
/// A batch of items handed to [`Mutable::append_many`].
///
/// `Flat` carries one contiguous slice, whereas `Nested` carries several slices whose items
/// are appended in order under a single lock acquisition.
pub enum Many<'a, T> {
    /// One contiguous slice of items.
    Flat(&'a [T]),
    /// Several slices of items, appended in the order given.
    Nested(&'a [&'a [T]]),
}

impl<T> Many<'_, T> {
    /// Returns `true` when no segment holds any item.
    ///
    /// A `Nested` batch with no segments at all is also considered empty.
    pub fn is_empty(&self) -> bool {
        match *self {
            Self::Flat(slice) => slice.is_empty(),
            // Empty overall exactly when no segment is non-empty.
            Self::Nested(segments) => !segments.iter().any(|segment| !segment.is_empty()),
        }
    }
}
101
102/// A [Contiguous] journal that supports appending, rewinding, and pruning.
103pub trait Mutable: Contiguous + Send + Sync {
104 /// Append a new item to the journal, returning its position.
105 ///
106 /// Positions are consecutively increasing starting from 0. The position of each item
107 /// is stable across pruning (i.e., if item X has position 5, it will always have
108 /// position 5 even if earlier items are pruned).
109 ///
110 /// # Errors
111 ///
112 /// Returns an error if the underlying storage operation fails or if the item cannot
113 /// be encoded.
114 fn append(
115 &mut self,
116 item: &Self::Item,
117 ) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
118
119 /// Append items to the journal, returning the position of the last item appended.
120 ///
121 /// The default implementation calls [Self::append] in a loop. Concrete implementations
122 /// may override this to acquire the write lock once for all items.
123 ///
124 /// Returns [Error::EmptyAppend] if items is empty.
125 fn append_many<'a>(
126 &'a mut self,
127 items: Many<'a, Self::Item>,
128 ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
129 where
130 Self::Item: Sync,
131 {
132 async move {
133 if items.is_empty() {
134 return Err(Error::EmptyAppend);
135 }
136 let mut last_pos = self.size().await;
137 match items {
138 Many::Flat(items) => {
139 for item in items {
140 last_pos = self.append(item).await?;
141 }
142 }
143 Many::Nested(nested_items) => {
144 for items in nested_items {
145 for item in *items {
146 last_pos = self.append(item).await?;
147 }
148 }
149 }
150 }
151 Ok(last_pos)
152 }
153 }
154
155 /// Prune items at positions strictly less than `min_position`.
156 ///
157 /// Returns `true` if any data was pruned, `false` otherwise.
158 ///
159 /// # Behavior
160 ///
161 /// - If `min_position > bounds.end`, the prune is capped to `bounds.end` (no error is returned)
162 /// - Some items with positions less than `min_position` may be retained due to
163 /// section/blob alignment
164 /// - This operation is not atomic, but implementations guarantee the journal is left in a
165 /// recoverable state if a crash occurs during pruning
166 ///
167 /// # Errors
168 ///
169 /// Returns an error if the underlying storage operation fails.
170 fn prune(
171 &mut self,
172 min_position: u64,
173 ) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
174
175 /// Rewind the journal to the given size, discarding items from the end.
176 ///
177 /// After rewinding to size N, the journal will contain exactly N items (positions 0 to N-1),
178 /// and the next append will receive position N.
179 ///
180 /// # Behavior
181 ///
182 /// - If `size > bounds.end`, returns [Error::InvalidRewind]
183 /// - If `size == bounds.end`, this is a no-op
184 /// - If `size < bounds.start`, returns [Error::ItemPruned] (can't rewind to pruned data)
185 /// - This operation is not atomic, but implementations guarantee the journal is left in a
186 /// recoverable state if a crash occurs during rewinding
187 ///
188 /// # Warnings
189 ///
190 /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
191 ///
192 /// # Errors
193 ///
194 /// Returns [Error::InvalidRewind] if size is invalid (too large or points to pruned data).
195 /// Returns an error if the underlying storage operation fails.
196 fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
197
198 /// Rewinds the journal to the last item matching `predicate`. If no item matches, the journal
199 /// is rewound to the pruning boundary, discarding all unpruned items.
200 ///
201 /// # Warnings
202 ///
203 /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called.
204 fn rewind_to<'a, P>(
205 &'a mut self,
206 mut predicate: P,
207 ) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
208 where
209 P: FnMut(&Self::Item) -> bool + Send + 'a,
210 {
211 async move {
212 let (bounds, rewind_size) = {
213 let reader = self.reader().await;
214 let bounds = reader.bounds();
215 let mut rewind_size = bounds.end;
216
217 while rewind_size > bounds.start {
218 let item = reader.read(rewind_size - 1).await?;
219 if predicate(&item) {
220 break;
221 }
222 rewind_size -= 1;
223 }
224
225 (bounds, rewind_size)
226 };
227
228 if rewind_size != bounds.end {
229 let rewound_items = bounds.end - rewind_size;
230 warn!(
231 journal_size = bounds.end,
232 rewound_items, "rewinding journal items"
233 );
234 self.rewind(rewind_size).await?;
235 }
236
237 Ok(rewind_size)
238 }
239 }
240}