commonware_storage/journal/fixed.rs

//! An append-only log for storing fixed-length items on disk.
//!
//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
//! where position is defined as the item's order of insertion starting from 0, unaffected by
//! pruning.
//!
//! _See the [variable crate](crate::journal::variable) for a journal that supports variable-length
//! items._
//!
//! # Format
//!
//! Data stored in a `fixed::Journal` is persisted in one of many Blobs within a caller-provided
//! `partition`. Each `Blob` contains at most `items_per_blob` items (a configurable maximum), with
//! each item followed by its CRC32 checksum:
//!
//! ```text
//! +--------+-----------+--------+-----------+--------+----------+-------------+
//! | item_0 | C(item_0) | item_1 | C(item_1) |   ...  | item_n-1 | C(item_n-1) |
//! +--------+-----------+--------+-----------+--------+----------+-------------+
//!
//! n = config.items_per_blob, C = CRC32
//! ```
//!
//! The most recent blob may not be full, in which case it contains fewer than the maximum number
//! of items.
//!
//! A fetched or replayed item's checksum is always recomputed and checked against the stored value
//! before the item is returned. If the checksums do not match, an error is returned instead.
//!
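//! Because every item occupies a fixed number of bytes on disk (its serialized length plus a
//! 4-byte checksum), an item's `position` maps directly to its location: blob
//! `position / items_per_blob`, item index `position % items_per_blob` within that blob. For
//! example, with `items_per_blob = 2`, the item at position 5 is the second item of blob 2.
//!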
//! # Open Blobs
//!
//! All `Blob`s in a given `partition` are kept open for the lifetime of the `Journal`. The number
//! of open blobs can be limited by increasing `items_per_blob` or by pruning old items.
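//! For example, a journal holding 1,000,000 items with `items_per_blob = 4096` keeps 245 blobs
//! open (fewer once old items have been pruned).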
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
//!
//! # Pruning
//!
//! Old data can be pruned from `Journal` by calling the `prune` method, which removes all blobs
//! containing only items older than the given position.
//!
//! # Replay
//!
//! The `replay` method iterates over multiple blobs concurrently to support fast reading of all
//! unpruned items into memory.
//!
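//! # Example
//!
//! A minimal usage sketch (not compiled as a doctest). The partition name, `items_per_blob`
//! value, and the appended `digest` are illustrative; any fixed-length `Array` type (such as a
//! SHA-256 digest) can be stored:
//!
//! ```ignore
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     let cfg = Config {
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         partition: "example_partition".into(),
//!         items_per_blob: 1024,
//!     };
//!     let mut journal = Journal::init(context, cfg).await.expect("init failed");
//!
//!     // Append a fixed-length item and read it back by its returned position.
//!     let position = journal.append(digest).await.expect("append failed");
//!     let item = journal.read(position).await.expect("read failed");
//!     assert_eq!(item, digest);
//!
//!     // Persist pending writes, then close all open blobs.
//!     journal.sync().await.expect("sync failed");
//!     journal.close().await.expect("close failed");
//! });
//! ```
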
use super::Error;
use bytes::BufMut;
use commonware_cryptography::Array;
use commonware_runtime::{Blob, Error as RError, Storage};
use commonware_utils::{hex, SizedSerialize};
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use prometheus_client::registry::Registry;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use tracing::{debug, trace, warn};

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// Registry for metrics.
    pub registry: Arc<Mutex<Registry>>,

    /// The `commonware-runtime::Storage` partition to use for storing journal blobs.
    pub partition: String,

    /// The maximum number of journal items to store in each blob.
    ///
    /// Any unpruned historical blobs will contain exactly this number of items.
    /// Only the newest blob may contain fewer items.
    pub items_per_blob: u64,
}

/// Implementation of `Journal` storage.
pub struct Journal<B: Blob, E: Storage<B>, A: Array> {
    runtime: E,
    cfg: Config,

    // Blobs are stored in a BTreeMap to ensure they are always iterated in order of their indices.
    // Indices are consecutive and without gaps.
    blobs: BTreeMap<u64, B>,

    tracked: Gauge,
    synced: Counter,
    pruned: Counter,

    _array: PhantomData<A>,
}

impl<B: Blob, E: Storage<B>, A: Array> Journal<B, E, A> {
    /// Each item is stored as its fixed-size serialization followed by its 4-byte CRC32 checksum,
    /// so every item occupies exactly `CHUNK_SIZE` bytes on disk.
    const CHUNK_SIZE: usize = u32::SERIALIZED_LEN + A::SERIALIZED_LEN;

    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during initialization. The `replay` method can be
    /// used to iterate over all items in the `Journal`.
    pub async fn init(runtime: E, cfg: Config) -> Result<Self, Error> {
        // Iterate over blobs in partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match runtime.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let blob = runtime
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, "loaded blob");
            blobs.insert(index, blob);
        }
        if !blobs.is_empty() {
            // Check that there are no gaps in the blob numbering, which would indicate missing data.
            let mut it = blobs.keys();
            let mut previous_index = *it.next().unwrap();
            for index in it {
                if *index != previous_index + 1 {
                    return Err(Error::MissingBlob(previous_index + 1));
                }
                previous_index = *index;
            }
        } else {
            debug!("no blobs found");
            let blob = runtime.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            blobs.insert(0, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        {
            let mut registry = cfg.registry.lock().unwrap();
            registry.register("tracked", "Number of blobs", tracked.clone());
            registry.register("synced", "Number of syncs", synced.clone());
            registry.register("pruned", "Number of blobs pruned", pruned.clone());
        }
        tracked.set(blobs.len() as i64);

        // Truncate the last blob if it's not the expected length, which might happen after an
        // unclean shutdown.
        let newest_blob = blobs.last_key_value().unwrap().1;
        let blob_len: u64 = newest_blob.len().await?;
        if blob_len % Self::CHUNK_SIZE as u64 != 0 {
            warn!(
                "last blob len ({}) is not a multiple of item size, truncating",
                blob_len
            );
            newest_blob
                .truncate(blob_len - blob_len % Self::CHUNK_SIZE as u64)
                .await?;
            newest_blob.sync().await?;
        }

        Ok(Self {
            runtime,
            cfg,
            blobs,
            tracked,
            synced,
            pruned,

            _array: PhantomData,
        })
    }

    /// Sync any pending updates to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        let (index, blob) = self.newest_blob();
        debug!("syncing blob {}", index);
        blob.sync().await.map_err(Error::Runtime)
    }

    /// Returns the total number of items in the journal, ignoring any pruning. The next value
    /// appended to the journal will be at this position.
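    ///
    /// Equivalently, `size = newest_blob_index * items_per_blob + items_in_newest_blob`, where the
    /// last term is derived from the newest blob's current length on disk.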
    pub async fn size(&self) -> Result<u64, Error> {
        let newest_blob = self.newest_blob();
        let blob_len = newest_blob.1.len().await?;
        assert_eq!(blob_len % Self::CHUNK_SIZE as u64, 0);
        let items_in_blob = blob_len / Self::CHUNK_SIZE as u64;
        Ok(items_in_blob + self.cfg.items_per_blob * newest_blob.0)
    }

    /// Append a new item to the journal. Return the item's position in the journal, or an error if
    /// the operation fails.
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        let mut newest_blob = self.newest_blob();

        let mut blob_len = newest_blob.1.len().await?;
        assert_eq!(blob_len % Self::CHUNK_SIZE as u64, 0);
        let items_in_blob = blob_len / Self::CHUNK_SIZE as u64;

        // If the blob is full, create the next one and use it instead.
        if items_in_blob >= self.cfg.items_per_blob {
            let next_blob_index = newest_blob.0 + 1;
            debug!("creating next blob {}", next_blob_index);
            assert_eq!(items_in_blob, self.cfg.items_per_blob);
            // Always sync the previous blob before creating a new one.
            newest_blob.1.sync().await?;
            let next_blob = self
                .runtime
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert!(self.blobs.insert(next_blob_index, next_blob).is_none());
            newest_blob = self.newest_blob();
            self.tracked.inc();
            blob_len = 0;
        }

        // Serialize the item followed by its CRC32 checksum.
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);

        let item_position = blob_len / Self::CHUNK_SIZE as u64;
        newest_blob.1.write_at(&buf, blob_len).await?;
        trace!(
            blob = newest_blob.0,
            position = item_position,
            "appended item"
        );
        Ok(item_position + self.cfg.items_per_blob * newest_blob.0)
    }

    /// Rewind the journal to the given `journal_size`.
    ///
    /// The journal is not synced after rewinding.
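    ///
    /// For example, `journal.rewind(journal.size().await? - 1)` discards only the most recently
    /// appended item, while rewinding to a size that falls within a pruned blob returns
    /// `Error::ItemPruned`.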
    pub async fn rewind(&mut self, journal_size: u64) -> Result<(), Error> {
        let size = self.size().await?;
        match journal_size.cmp(&size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(journal_size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_size = journal_size;
        let rewind_to_blob_index = rewind_to_size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob().0 {
            return Err(Error::ItemPruned(rewind_to_size));
        }

        let mut current_blob_index = self.newest_blob().0;

        // Remove blobs until we reach the rewind point.
        while current_blob_index > rewind_to_blob_index {
            let blob = match self.blobs.remove(&current_blob_index) {
                Some(blob) => blob,
                None => return Err(Error::MissingBlob(current_blob_index)),
            };
            blob.close().await?;
            self.runtime
                .remove(&self.cfg.partition, Some(&current_blob_index.to_be_bytes()))
                .await?;
            debug!(blob = current_blob_index, "rewound over blob");
            self.tracked.dec();
            current_blob_index -= 1;
        }

        // Truncate the rewind blob to the correct offset.
        let rewind_blob = match self.blobs.get_mut(&rewind_to_blob_index) {
            Some(blob) => blob,
            None => return Err(Error::MissingBlob(rewind_to_blob_index)),
        };
        let rewind_to_offset = (rewind_to_size % self.cfg.items_per_blob) * Self::CHUNK_SIZE as u64;
        rewind_blob.truncate(rewind_to_offset).await?;
        Ok(())
    }

    /// Read the item at the given position in the journal.
    pub async fn read(&self, item_position: u64) -> Result<A, Error> {
        let blob_index = item_position / self.cfg.items_per_blob;

        let blob = match self.blobs.get(&blob_index) {
            Some(blob) => blob,
            None => {
                let newest_blob = self.newest_blob();
                if blob_index > newest_blob.0 {
                    return Err(Error::InvalidItem(item_position));
                }
                assert!(blob_index < self.oldest_blob().0);
                return Err(Error::ItemPruned(item_position));
            }
        };

        let item_index = item_position % self.cfg.items_per_blob;
        let offset = item_index * Self::CHUNK_SIZE as u64;
        let mut buf = vec![0u8; Self::CHUNK_SIZE];
        blob.read_at(&mut buf, offset).await?;

        // Verify integrity
        Self::verify_integrity(&buf)
    }

    /// Verify the chunk's CRC32 checksum and, if it matches, deserialize and return the item.
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SERIALIZED_LEN..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SERIALIZED_LEN]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        Ok(buf[..A::SERIALIZED_LEN].try_into().unwrap())
    }

    /// Returns an unordered stream of all items in the journal.
    ///
    /// # Integrity
    ///
    /// If any corrupted data is found, the stream will return an error.
    ///
    /// # Concurrency
    ///
    /// The `concurrency` parameter controls how many blobs are replayed concurrently. This can
    /// dramatically speed up the replay process if the underlying storage supports concurrent reads
    /// across different blobs.
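    ///
    /// A consumption sketch (not compiled as a doctest), mirroring how the tests below drain the
    /// stream:
    ///
    /// ```ignore
    /// let stream = journal.replay(10).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     // Items may arrive in any order across blobs.
    ///     let (position, item) = result?;
    /// }
    /// ```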
    pub async fn replay(
        &mut self,
        concurrency: usize,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        // Collect all blobs to replay. Only the newest blob may be partially full; all older
        // blobs contain exactly `items_per_blob` items.
        let newest_blob_index = self.newest_blob().0;
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (index, blob) in self.blobs.iter() {
            let blob_len = {
                if *index == newest_blob_index {
                    blob.len().await?
                } else {
                    self.cfg.items_per_blob * Self::CHUNK_SIZE as u64
                }
            };
            blobs.push((index, blob, blob_len));
        }

        // Replay all blobs concurrently and stream items as they are read (to avoid occupying too
        // much memory with buffered data)
        let items_per_blob = self.cfg.items_per_blob;
        Ok(stream::iter(blobs)
            .map(move |(index, blob, blob_len)| async move {
                stream::unfold(
                    (index, blob, 0u64),
                    move |(index, blob, offset)| async move {
                        // Check if we are at the end of the blob
                        if offset == blob_len {
                            return None;
                        }
                        // Get next item
                        let mut buf = vec![0u8; Self::CHUNK_SIZE];
                        let item = blob.read_at(&mut buf, offset).await.map_err(Error::Runtime);
                        let next_offset = offset + Self::CHUNK_SIZE as u64;
                        match item {
                            Ok(_) => match Self::verify_integrity(&buf) {
                                Ok(item) => Some((
                                    Ok((
                                        items_per_blob * *index + offset / Self::CHUNK_SIZE as u64,
                                        item,
                                    )),
                                    (index, blob, next_offset),
                                )),
                                Err(err) => Some((Err(err), (index, blob, next_offset))),
                            },
                            // A read error terminates replay of this blob after yielding the error.
                            Err(err) => Some((Err(err), (index, blob, blob_len))),
                        }
                    },
                )
            })
            .buffer_unordered(concurrency)
            .flatten())
    }

    /// Return the blob containing the most recent items and its index.
    fn newest_blob(&self) -> (u64, &B) {
        if let Some((index, blob)) = self.blobs.last_key_value() {
            return (*index, blob);
        }
        panic!("no blobs found");
    }

    /// Return the blob containing the oldest unpruned items and its index.
    fn oldest_blob(&self) -> (u64, &B) {
        if let Some((index, blob)) = self.blobs.first_key_value() {
            return (*index, blob);
        }
        panic!("no blobs found");
    }

    /// Allow the journal to prune items older than `min_item_position`. The journal may still
    /// retain some of these items, for example if they are part of the most recent blob.
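    ///
    /// For example, with `items_per_blob = 2`, calling `prune(3)` removes only blob 0 (items 0 and
    /// 1); item 2 survives because it shares a blob with item 3.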
    pub async fn prune(&mut self, min_item_position: u64) -> Result<(), Error> {
        let oldest_blob = self.oldest_blob().0;
        let mut new_oldest_blob = min_item_position / self.cfg.items_per_blob;
        if new_oldest_blob <= oldest_blob {
            // Nothing to prune
            return Ok(());
        }
        // Make sure we never prune the most recent blob
        let newest_blob = self.newest_blob();
        if new_oldest_blob >= newest_blob.0 {
            new_oldest_blob = newest_blob.0;
        }

        for index in oldest_blob..new_oldest_blob {
            let blob = self.blobs.remove(&index).unwrap();
            // Close the blob and remove it from storage
            blob.close().await?;
            self.runtime
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(blob = index, "pruned blob");
            self.pruned.inc();
            self.tracked.dec();
        }
        Ok(())
    }

    /// Close the journal.
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.close().await?;
            debug!(blob = i, "closed blob");
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{hash, sha256::Digest};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic::Executor, Blob, Runner, Storage};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::encoding::text::encode;

    /// Generate a SHA-256 digest for the given value.
    fn test_digest(value: u64) -> Digest {
        hash(&value.to_be_bytes())
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));

            // Append an item to the journal
            let mut position = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(position, 0);

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context, cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Append two more items to the journal to trigger a new blob creation
            position = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(position, 1);
            position = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(position, 2);
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));

            // Read the items back
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::Runtime(_)));
            let err = journal.read(400).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::InvalidItem(x) if x == 400));

            // Sync the journal
            journal.sync().await.expect("failed to sync journal");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("synced_total 1"));

            // Prune the journal -- this should be a no-op because no complete blob is covered
            journal.prune(1).await.expect("failed to prune journal 1");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));

            // Prune again, this time making sure 1 blob is pruned
            journal.prune(2).await.expect("failed to prune journal 2");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Reading from the first blob should fail
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Third item should still be readable
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Should be able to continue to append items
            for i in 3..10 {
                let position = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(position, i);
            }

            // Check no-op pruning
            journal
                .prune(0)
                .await
                .expect("failed to no-op prune the journal");
            assert_eq!(journal.oldest_blob().0, 1);
            assert_eq!(journal.newest_blob().0, 4);

            // Prune first 3 blobs
            journal
                .prune(3 * cfg.items_per_blob)
                .await
                .expect("failed to prune journal 2");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert_eq!(journal.oldest_blob().0, 3);
            assert_eq!(journal.newest_blob().0, 4);
            assert!(buffer.contains("tracked 2"));
            assert!(buffer.contains("pruned_total 3"));

            // Try pruning (more than) everything in the journal, which should leave only the most
            // recent blob.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert_eq!(journal.size().await.unwrap(), 10);
            assert_eq!(journal.oldest_blob().0, 4);
            assert_eq!(journal.newest_blob().0, 4);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 4"));

            let stream = journal.replay(1).await.expect("failed to replay journal");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((position, item)) => {
                        assert_eq!(test_digest(position), item);
                        items.push(position);
                    }
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert_eq!(items, vec![8u64, 9u64]);
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: u64 = 7;
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Initialize the journal, allowing a max of 7 items per blob.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: ITEMS_PER_BLOB,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append many items, filling 100 blobs and part of the 101st
            for i in 0u64..(ITEMS_PER_BLOB * 100 + ITEMS_PER_BLOB / 2) {
                let position = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(position, i);
            }

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 101"));

            // Replay should return all items
            {
                let stream = journal.replay(10).await.expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((position, item)) => {
                            assert_eq!(test_digest(position), item);
                            items.push(position);
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }

                // Make sure all items were replayed
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB as usize * 100 + ITEMS_PER_BLOB as usize / 2
                );
                items.sort();
                for (i, position) in items.iter().enumerate() {
                    assert_eq!(i as u64, *position);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt one of the checksums and make sure it's detected.
            let checksum_offset = Digest::SERIALIZED_LEN as u64
                + (ITEMS_PER_BLOB / 2) * (Digest::SERIALIZED_LEN + u32::SERIALIZED_LEN) as u64;
            let blob = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Write incorrect checksum
            let bad_checksum = 123456789u32;
            blob.write_at(&bad_checksum.to_be_bytes(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_position = 40 * ITEMS_PER_BLOB + ITEMS_PER_BLOB / 2;
            blob.close().await.expect("Failed to close blob");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            let err = journal.read(corrupted_item_position).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay all items, making sure the checksum mismatch error is handled correctly
            {
                let stream = journal.replay(10).await.expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((position, item)) => {
                            assert_eq!(test_digest(position), item);
                            items.push(position);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                // Result will be missing only the one corrupted value.
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB as usize * 100 + ITEMS_PER_BLOB as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");

            // Manually truncate one blob to force a partial-read error and make sure it's handled
            // as expected.
            let blob = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the blob at the start of the corrupted checksum
            blob.truncate(checksum_offset)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            let err = journal.read(corrupted_item_position).await.unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            // Replay all items, making sure the partial read error is handled correctly
            {
                let stream = journal.replay(10).await.expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((position, item)) => {
                            assert_eq!(test_digest(position), item);
                            items.push(position);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::Runtime(_)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                // Result will be missing the 4 items following the truncation
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB as usize * 100 + ITEMS_PER_BLOB as usize / 2 - 4
                );
            }

            // Delete a blob and make sure the gap is detected
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            // Re-initialize the journal to simulate a restart
            let result = Journal::<_, _, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..4 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await.unwrap(), 4);
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Manually truncate most recent blob to simulate a partial write.
            let blob = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let blob_len = blob.len().await.expect("Failed to get blob length");
            // Truncate the most recent blob by 1 byte, which corrupts the most recent item
            blob.truncate(blob_len - 1)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::<_, _, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            // The last (corrupted) item should get discarded
            assert_eq!(journal.size().await.unwrap(), 3);
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));
            let mut buffer = String::new();

            // Append an item to the journal
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await.unwrap(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be a no-op
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Append 7 items
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size().await.unwrap(), 7);

            // Rewind back to item #4 and ensure a blob is rewound over
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 4);
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 3"));

            // Rewind back to empty and ensure all blobs are rewound over
            assert!(matches!(journal.rewind(0).await, Ok(())));
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Stress test: append 100 items, rewind by 49, repeat 10 times.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with a different blob size (3 items per blob)
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition_2".into(),
                items_per_blob: 3,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Make sure re-opened journal is as expected
            let mut journal: Journal<_, _, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 10 * (100 - 49));

            // Make sure rewinding works after pruning
            journal.prune(300).await.expect("failed to prune journal");
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::ItemPruned(299))
            ));
            assert!(matches!(journal.rewind(301).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 301);
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::ItemPruned(299))
            ));
        });
    }
}