commonware_storage/journal/
variable.rs

//! An append-only log for storing arbitrary variable-length items.
//!
//! `variable::Journal` is an append-only log for storing arbitrary variable-length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
6//!
7//! # Format
8//!
9//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
10//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
11//! Within a `section`, data is appended as an `item` with the following format:
12//!
13//! ```text
14//! +---+---+---+---+---+---+---+---+---+---+---+
15//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
16//! +---+---+---+---+---+---+---+---+---+---+---+
17//! |   Size (u32)  |   Data    |    C(u32)     |
18//! +---+---+---+---+---+---+---+---+---+---+---+
19//!
20//! C = CRC32(Data)
21//! ```
22//!
23//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
24//! each item. If the checksum of the read data does not match the stored checksum, an error is
25//! returned. This checksum is only verified when data is accessed and not at startup (which would
26//! require reading all data in `Journal`)._
27//!
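//! _A minimal sketch of how a single item is framed on disk, mirroring what `append` writes and
//! assuming the `bytes` and `crc32fast` crates already used by this module (the payload is
//! hypothetical):_
//!
//! ```rust
//! use bytes::{BufMut, Bytes};
//!
//! // Hypothetical item payload
//! let item = Bytes::from("data");
//!
//! // Frame written for each item: size (u32 BE) || data || CRC32(data) (u32 BE)
//! let mut encoded = Vec::with_capacity(4 + item.len() + 4);
//! encoded.put_u32(item.len() as u32);
//! let checksum = crc32fast::hash(&item);
//! encoded.put(item.clone());
//! encoded.put_u32(checksum);
//! assert_eq!(encoded.len(), 4 + item.len() + 4);
//! ```
//!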
28//! # Open Blobs
29//!
//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
34//!
35//! # Offset Alignment
36//!
//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means the maximum size of any
//! `section` is roughly `u32::MAX * 17` bytes (about 73GB): the last addressable item starts at
//! byte `u32::MAX * 16`, and that item can itself store up to `u32::MAX` bytes. If more data is
//! written to a `section` past this max, an `OffsetOverflow` error is returned.
43//!
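//! _A small worked example of the alignment arithmetic (mirroring `compute_next_offset` below; the
//! byte positions are made up for illustration):_
//!
//! ```rust
//! const ITEM_ALIGNMENT: u64 = 16;
//!
//! // An item that ends at byte 37 is padded out to byte 48, so the next item
//! // is addressed by the `u32` offset 48 / 16 = 3.
//! let unaligned_end: u64 = 37;
//! let padded = (unaligned_end + ITEM_ALIGNMENT - 1) / ITEM_ALIGNMENT * ITEM_ALIGNMENT;
//! assert_eq!(padded, 48);
//! assert_eq!(padded / ITEM_ALIGNMENT, 3);
//! ```
//!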
44//! # Sync
45//!
46//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
47//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
48//! calling `close`, all pending data is automatically synced and any open blobs are closed.
49//!
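//! _A minimal sketch of explicitly syncing a `section` (the partition name is arbitrary; the flow
//! mirrors the `Example` below):_
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic::Executor};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use prometheus_client::registry::Registry;
//! use std::sync::{Arc, Mutex};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     let mut journal = Journal::init(context, Config{
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         partition: "sync_example".to_string()
//!     }).await.unwrap();
//!
//!     // Append to section 1, then force pending data in that section to storage
//!     journal.append(1, "data".into()).await.unwrap();
//!     journal.sync(1).await.unwrap();
//!
//!     // `close` also syncs any remaining pending data
//!     journal.close().await.unwrap();
//! });
//! ```
//!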
50//! # Pruning
51//!
52//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
53//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
54//! could be used, for example, by some blockchain application to prune old blocks.
55//!
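//! _A minimal sketch of pruning old `sections` (the section numbers and partition name are
//! hypothetical):_
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic::Executor};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use prometheus_client::registry::Registry;
//! use std::sync::{Arc, Mutex};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     let mut journal = Journal::init(context, Config{
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         partition: "prune_example".to_string()
//!     }).await.unwrap();
//!
//!     // Append to two sections (e.g. two block heights)
//!     journal.append(10, "old".into()).await.unwrap();
//!     journal.append(11, "new".into()).await.unwrap();
//!
//!     // Remove all sections less than 11 (section 10 is deleted from storage)
//!     journal.prune(11).await.unwrap();
//!     journal.close().await.unwrap();
//! });
//! ```
//!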
56//! # Replay
57//!
58//! During application initialization, it is very common to replay data from `Journal` to recover
59//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
60//! method that iterates over multiple `sections` concurrently in a single stream.
61//!
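//! _A minimal sketch of replaying a journal into memory (a concurrency of 2 and full items; pass
//! `Some(n)` instead of `None` to read only prefixes, as described under Skip Reads below):_
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic::Executor};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use futures::{pin_mut, StreamExt};
//! use prometheus_client::registry::Registry;
//! use std::sync::{Arc, Mutex};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     let mut journal = Journal::init(context, Config{
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         partition: "replay_example".to_string()
//!     }).await.unwrap();
//!     journal.append(1, "data".into()).await.unwrap();
//!     journal.sync(1).await.unwrap();
//!
//!     // Stream all items back, in no particular order across sections
//!     {
//!         let stream = journal.replay(2, None).await.unwrap();
//!         pin_mut!(stream);
//!         while let Some(result) = stream.next().await {
//!             let (_section, _offset, _size, _item) = result.unwrap();
//!             // Rebuild in-memory state here
//!         }
//!     }
//!     journal.close().await.unwrap();
//! });
//! ```
//!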
62//! ## Skip Reads
63//!
64//! Some applications may only want to read the first `n` bytes of each item during `replay`. This
65//! can be done by providing a `prefix` parameter to the `replay` method. If `prefix` is provided,
66//! `Journal` will only return the first `prefix` bytes of each item and "skip ahead" to the next
67//! item (computing the offset using the read `size` value).
68//!
//! _Reading only the `prefix` bytes of an item makes it impossible to verify its checksum. It is up
//! to the caller to ensure these reads are safe._
71//!
72//! # Exact Reads
73//!
//! To allow items to be fetched in a single disk operation, `Journal` allows callers to specify
//! an `exact` parameter to the `get` method. This `exact` parameter must be cached by the caller
//! (it is provided during `replay`). Because the size and checksum are verified on read, an
//! incorrect `exact` value will almost certainly result in an error rather than silently
//! returning bad data.
78//!
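//! _A minimal sketch of an exact read, where the item's size is cached at append time (it could
//! equally be cached during `replay`; the partition name is arbitrary):_
//!
//! ```rust
//! use bytes::Bytes;
//! use commonware_runtime::{Runner, deterministic::Executor};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use prometheus_client::registry::Registry;
//! use std::sync::{Arc, Mutex};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     let mut journal = Journal::init(context, Config{
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         partition: "exact_example".to_string()
//!     }).await.unwrap();
//!
//!     // Append an item and remember where (and how big) it is
//!     let data = Bytes::from("data");
//!     let offset = journal.append(1, data.clone()).await.unwrap();
//!     let exact = data.len() as u32;
//!     journal.sync(1).await.unwrap();
//!
//!     // Fetch the item in a single disk operation (the checksum is still verified)
//!     let item = journal.get(1, offset, Some(exact)).await.unwrap();
//!     assert_eq!(item, Some(data));
//!     journal.close().await.unwrap();
//! });
//! ```
//!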
79//! # Example
80//!
81//! ```rust
82//! use commonware_runtime::{Spawner, Runner, deterministic::Executor};
83//! use commonware_storage::journal::variable::{Journal, Config};
84//! use prometheus_client::registry::Registry;
85//! use std::sync::{Arc, Mutex};
86//!
87//! let (executor, context, _) = Executor::default();
88//! executor.start(async move {
89//!     // Create a journal
90//!     let mut journal = Journal::init(context, Config{
91//!         registry: Arc::new(Mutex::new(Registry::default())),
92//!         partition: "partition".to_string()
93//!     }).await.unwrap();
94//!
95//!     // Append data to the journal
96//!     journal.append(1, "data".into()).await.unwrap();
97//!
98//!     // Close the journal
99//!     journal.close().await.unwrap();
100//! });
101//! ```
102
103use super::Error;
104use bytes::{BufMut, Bytes};
105use commonware_runtime::{Blob, Error as RError, Storage};
106use commonware_utils::hex;
107use futures::stream::{self, Stream, StreamExt};
108use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
109use prometheus_client::registry::Registry;
110use std::collections::{btree_map::Entry, BTreeMap};
111use std::sync::{Arc, Mutex};
112use tracing::{debug, trace, warn};
113
114/// Configuration for `Journal` storage.
115#[derive(Clone)]
116pub struct Config {
117    /// Registry for metrics.
118    pub registry: Arc<Mutex<Registry>>,
119
120    /// The `commonware-runtime::Storage` partition to use
121    /// for storing journal blobs.
122    pub partition: String,
123}
124
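/// Items are padded so that each one starts on a 16-byte boundary; stored `u32` offsets count
/// these 16-byte slots rather than raw bytes.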
125const ITEM_ALIGNMENT: u64 = 16;
126
127/// Implementation of `Journal` storage.
128pub struct Journal<B: Blob, E: Storage<B>> {
129    runtime: E,
130    cfg: Config,
131
132    oldest_allowed: Option<u64>,
133
134    blobs: BTreeMap<u64, B>,
135
136    tracked: Gauge,
137    synced: Counter,
138    pruned: Counter,
139}
140
/// Computes the aligned `u32` offset of the next item from the raw `u64`
/// byte offset of `Blob`, rounding up to the next `ITEM_ALIGNMENT` boundary.
143fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
144    let overage = offset % ITEM_ALIGNMENT;
145    if overage != 0 {
146        offset += ITEM_ALIGNMENT - overage;
147    }
148    let offset = offset / ITEM_ALIGNMENT;
149    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
150    Ok(aligned_offset)
151}
152
153impl<B: Blob, E: Storage<B>> Journal<B, E> {
154    /// Initialize a new `Journal` instance.
155    ///
156    /// All backing blobs are opened but not read during
157    /// initialization. The `replay` method can be used
158    /// to iterate over all items in the `Journal`.
159    pub async fn init(runtime: E, cfg: Config) -> Result<Self, Error> {
160        // Iterate over blobs in partition
161        let mut blobs = BTreeMap::new();
162        let stored_blobs = match runtime.scan(&cfg.partition).await {
163            Ok(blobs) => blobs,
164            Err(RError::PartitionMissing(_)) => Vec::new(),
165            Err(err) => return Err(Error::Runtime(err)),
166        };
167        for name in stored_blobs {
168            let blob = runtime.open(&cfg.partition, &name).await?;
169            let hex_name = hex(&name);
170            let section = match name.try_into() {
171                Ok(section) => u64::from_be_bytes(section),
172                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
173            };
174            debug!(section, blob = hex_name, "loaded section");
175            blobs.insert(section, blob);
176        }
177
178        // Initialize metrics
179        let tracked = Gauge::default();
180        let synced = Counter::default();
181        let pruned = Counter::default();
182        {
183            let mut registry = cfg.registry.lock().unwrap();
184            registry.register("tracked", "Number of blobs", tracked.clone());
185            registry.register("synced", "Number of syncs", synced.clone());
186            registry.register("pruned", "Number of blobs pruned", pruned.clone());
187        }
188        tracked.set(blobs.len() as i64);
189
190        // Create journal instance
191        Ok(Self {
192            runtime,
193            cfg,
194
195            oldest_allowed: None,
196
197            blobs,
198            tracked,
199            synced,
200            pruned,
201        })
202    }
203
204    /// Ensures that a pruned section is not accessed.
205    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
206        if let Some(oldest_allowed) = self.oldest_allowed {
207            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
208                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
209            }
210        }
211        Ok(())
212    }
213
214    /// Reads an item from the blob at the given offset.
215    async fn read(blob: &B, offset: u32) -> Result<(u32, u32, Bytes), Error> {
216        // Read item size
217        let offset = offset as u64 * ITEM_ALIGNMENT;
218        let mut size = [0u8; 4];
219        blob.read_at(&mut size, offset).await?;
220        let size = u32::from_be_bytes(size);
221        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
222
223        // Read item
224        let mut item = vec![0u8; size as usize];
225        blob.read_at(&mut item, offset).await?;
226        let offset = offset
227            .checked_add(size as u64)
228            .ok_or(Error::OffsetOverflow)?;
229
230        // Read checksum
231        let mut stored_checksum = [0u8; 4];
232        blob.read_at(&mut stored_checksum, offset).await?;
233        let stored_checksum = u32::from_be_bytes(stored_checksum);
234        let checksum = crc32fast::hash(&item);
235        if checksum != stored_checksum {
236            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
237        }
238        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
239
240        // Compute next offset
241        let aligned_offset = compute_next_offset(offset)?;
242
243        // Return item
244        Ok((aligned_offset, size, Bytes::from(item)))
245    }
246
247    /// Read `prefix` bytes from the blob at the given offset.
248    ///
249    /// # Warning
250    ///
    /// This method bypasses checksum verification and the caller is responsible for ensuring
    /// the integrity of any data read. If `prefix` exceeds the size of an item, the returned bytes
    /// will include data from the following item; if the read runs past the end of the blob, the
    /// resulting error is treated by `replay` as trailing bytes and the blob will be truncated.
254    async fn read_prefix(blob: &B, offset: u32, prefix: u32) -> Result<(u32, u32, Bytes), Error> {
255        // Read item size and first `prefix` bytes
256        let offset = offset as u64 * ITEM_ALIGNMENT;
257        let mut buf = vec![0u8; 4 + prefix as usize];
258        blob.read_at(&mut buf, offset).await?;
259
260        // Get item size to compute next offset
261        let size = u32::from_be_bytes(buf[..4].try_into().unwrap());
262
263        // Get item prefix
264        //
265        // We don't compute the checksum here nor do we verify that the bytes
266        // requested is less than the item size.
267        let item_prefix = Bytes::from(buf[4..].to_vec());
268
269        // Compute next offset
270        let offset = offset
271            .checked_add(4)
272            .ok_or(Error::OffsetOverflow)?
273            .checked_add(size as u64)
274            .ok_or(Error::OffsetOverflow)?
275            .checked_add(4)
276            .ok_or(Error::OffsetOverflow)?;
277        let aligned_offset = compute_next_offset(offset)?;
278
279        // Return item
280        Ok((aligned_offset, size, item_prefix))
281    }
282
283    /// Read an item from the blob assuming it is of `exact` length. This method verifies the
284    /// checksum of the item.
285    ///
286    /// # Warning
287    ///
288    /// This method assumes the caller knows the exact size of the item (either because
289    /// they store fixed-size items or they previously indexed the size). If an incorrect
290    /// `exact` is provided, the method will likely return an error (as integrity is verified).
291    async fn read_exact(blob: &B, offset: u32, exact: u32) -> Result<(u32, Bytes), Error> {
292        // Read all of the item into one buffer
293        let offset = offset as u64 * ITEM_ALIGNMENT;
294        let mut buf = vec![0u8; 4 + exact as usize + 4];
295        blob.read_at(&mut buf, offset).await?;
296
297        // Check size
298        let size = u32::from_be_bytes(buf[..4].try_into().unwrap());
299        if size != exact {
300            return Err(Error::UnexpectedSize(size, exact));
301        }
302
303        // Get item
304        let item = Bytes::from(buf[4..4 + exact as usize].to_vec());
305
306        // Verify integrity
307        let stored_checksum = u32::from_be_bytes(buf[4 + exact as usize..].try_into().unwrap());
308        let checksum = crc32fast::hash(&item);
309        if checksum != stored_checksum {
310            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
311        }
312
313        // Compute next offset
314        let offset = offset
315            .checked_add(4)
316            .ok_or(Error::OffsetOverflow)?
317            .checked_add(exact as u64)
318            .ok_or(Error::OffsetOverflow)?
319            .checked_add(4)
320            .ok_or(Error::OffsetOverflow)?;
321        let aligned_offset = compute_next_offset(offset)?;
322
323        // Return item
324        Ok((aligned_offset, item))
325    }
326
327    /// Returns an unordered stream of all items in the journal.
328    ///
329    /// # Repair
330    ///
331    /// If any corrupted data is found, the stream will return an error.
332    ///
    /// If any trailing data is found (i.e. misaligned entries), the journal will be truncated
    /// to the last valid item. For this reason, it is recommended to call `replay` before
    /// calling `append` (items appended after trailing bytes will fail checksum verification
    /// after a restart).
336    ///
337    /// # Concurrency
338    ///
339    /// The `concurrency` parameter controls how many blobs are replayed concurrently. This can dramatically
340    /// speed up the replay process if the underlying storage supports concurrent reads across different
341    /// blobs.
342    ///
343    /// # Prefix
344    ///
    /// If `prefix` is provided, the stream will only read up to `prefix` bytes of each item. This
    /// means the checksum of each item is not verified, and it is up to the caller to handle the
    /// resulting integrity risk.
    ///
    /// Reading `prefix` bytes and skipping ahead to a future location in a blob is the theoretically
    /// optimal way to read only what is required from storage. However, different storage
    /// implementations may needlessly read ahead past what is required. If the underlying storage
    /// can be tuned for random access prior to invoking replay, it may lead to less IO.
353    pub async fn replay(
354        &mut self,
355        concurrency: usize,
356        prefix: Option<u32>,
357    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, Bytes), Error>> + '_, Error> {
358        // Collect all blobs to replay
359        let mut blobs = Vec::with_capacity(self.blobs.len());
360        for (section, blob) in self.blobs.iter() {
361            let len = blob.len().await?;
362            let aligned_len = compute_next_offset(len)?;
363            blobs.push((*section, blob, aligned_len));
364        }
365
366        // Replay all blobs concurrently and stream items as they are read (to avoid
367        // occupying too much memory with buffered data)
368        Ok(stream::iter(blobs)
369            .map(move |(section, blob, len)| async move {
370                stream::unfold(
371                    (section, blob, 0u32),
372                    move |(section, blob, offset)| async move {
373                        // Check if we are at the end of the blob
374                        if offset == len {
375                            return None;
376                        }
377
378                        // Get next item
379                        let mut read = match prefix {
380                            Some(prefix) => Self::read_prefix(blob, offset, prefix).await,
381                            None => Self::read(blob, offset).await,
382                        };
383
384                        // Ensure a full read wouldn't put us past the end of the blob
385                        if let Ok((next_offset, _, _)) = read {
386                            if next_offset > len {
387                                read = Err(Error::Runtime(RError::BlobInsufficientLength));
388                            }
                        }
390
391                        // Handle read result
392                        match read {
393                            Ok((next_offset, item_size, item)) => {
394                                trace!(blob = section, cursor = offset, len, "replayed item");
395                                Some((
396                                    Ok((section, offset, item_size, item)),
397                                    (section, blob, next_offset),
398                                ))
399                            }
400                            Err(Error::ChecksumMismatch(expected, found)) => {
401                                // If we encounter corruption, we don't try to fix it.
402                                warn!(
403                                    blob = section,
404                                    cursor = offset,
405                                    expected,
406                                    found,
407                                    "corruption detected"
408                                );
409                                Some((
410                                    Err(Error::ChecksumMismatch(expected, found)),
411                                    (section, blob, len),
412                                ))
413                            }
414                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
415                                // If we encounter trailing bytes, we prune to the last
416                                // valid item. This can happen during an unclean file close (where
417                                // pending data is not fully synced to disk).
418                                warn!(
419                                    blob = section,
420                                    new_size = offset,
421                                    old_size = len,
422                                    "trailing bytes detected: truncating"
423                                );
424                                blob.truncate(offset as u64 * ITEM_ALIGNMENT).await.ok()?;
425                                blob.sync().await.ok()?;
426                                None
427                            }
428                            Err(err) => Some((Err(err), (section, blob, len))),
429                        }
430                    },
431                )
432            })
433            .buffer_unordered(concurrency)
434            .flatten())
435    }
436
437    /// Appends an item to `Journal` in a given `section`.
438    ///
439    /// # Warning
440    ///
441    /// If there exist trailing bytes in the `Blob` of a particular `section` and
442    /// `replay` is not called before this, it is likely that subsequent data added
443    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
444    /// the checksum verification). It is recommended to call `replay` before calling
445    /// `append` to prevent this.
446    pub async fn append(&mut self, section: u64, item: Bytes) -> Result<u32, Error> {
447        // Check last pruned
448        self.prune_guard(section, false)?;
449
450        // Ensure item is not too large
451        let item_len = item.len();
452        let len = 4 + item_len + 4;
453        let item_len = match item_len.try_into() {
454            Ok(len) => len,
455            Err(_) => return Err(Error::ItemTooLarge(item_len)),
456        };
457
458        // Get existing blob or create new one
459        let blob = match self.blobs.entry(section) {
460            Entry::Occupied(entry) => entry.into_mut(),
461            Entry::Vacant(entry) => {
462                let name = section.to_be_bytes();
463                let blob = self.runtime.open(&self.cfg.partition, &name).await?;
464                self.tracked.inc();
465                entry.insert(blob)
466            }
467        };
468
469        // Populate buffer
470        let mut buf = Vec::with_capacity(len);
471        buf.put_u32(item_len);
472        let checksum = crc32fast::hash(&item);
473        buf.put(item);
474        buf.put_u32(checksum);
475
476        // Append item to blob
477        let cursor = blob.len().await?;
478        let offset = compute_next_offset(cursor)?;
479        blob.write_at(&buf, offset as u64 * ITEM_ALIGNMENT).await?;
        trace!(blob = section, previous_len = cursor, offset, "appended item");
481        Ok(offset)
482    }
483
484    /// Retrieves the first `prefix` bytes of an item from `Journal` at a given `section` and `offset`.
485    ///
486    /// This method bypasses the checksum verification and the caller is responsible for ensuring
487    /// the integrity of any data read.
488    pub async fn get_prefix(
489        &self,
490        section: u64,
491        offset: u32,
492        prefix: u32,
493    ) -> Result<Option<Bytes>, Error> {
494        self.prune_guard(section, false)?;
495        let blob = match self.blobs.get(&section) {
496            Some(blob) => blob,
497            None => return Ok(None),
498        };
499        let (_, _, item) = Self::read_prefix(blob, offset, prefix).await?;
500        Ok(Some(item))
501    }
502
503    /// Retrieves an item from `Journal` at a given `section` and `offset`.
504    ///
    /// If `exact` is provided, it is assumed the item is of size `exact`, which allows
    /// the item to be read in a single disk operation. The checksum of the data is still
    /// verified even when `exact` is provided.
508    pub async fn get(
509        &self,
510        section: u64,
511        offset: u32,
512        exact: Option<u32>,
513    ) -> Result<Option<Bytes>, Error> {
514        self.prune_guard(section, false)?;
515        let blob = match self.blobs.get(&section) {
516            Some(blob) => blob,
517            None => return Ok(None),
518        };
519
520        // If we have an exact size, we can read the item in one go.
521        if let Some(exact) = exact {
522            let (_, item) = Self::read_exact(blob, offset, exact).await?;
523            return Ok(Some(item));
524        }
525
526        // Perform a multi-op read.
527        let (_, _, item) = Self::read(blob, offset).await?;
528        Ok(Some(item))
529    }
530
531    /// Ensures that all data in a given `section` is synced to the underlying store.
532    ///
533    /// If the `section` does not exist, no error will be returned.
534    pub async fn sync(&self, section: u64) -> Result<(), Error> {
535        self.prune_guard(section, false)?;
536        let blob = match self.blobs.get(&section) {
537            Some(blob) => blob,
538            None => return Ok(()),
539        };
540        self.synced.inc();
541        blob.sync().await.map_err(Error::Runtime)
542    }
543
544    /// Prunes all `sections` less than `min`.
545    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
546        // Check if we already ran this prune
547        self.prune_guard(min, true)?;
548
549        // Prune any blobs that are smaller than the minimum
550        while let Some((&section, _)) = self.blobs.first_key_value() {
551            // Stop pruning if we reach the minimum
552            if section >= min {
553                break;
554            }
555
556            // Remove and close blob
557            let blob = self.blobs.remove(&section).unwrap();
558            blob.close().await?;
559
560            // Remove blob from storage
561            self.runtime
562                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
563                .await?;
564            debug!(blob = section, "pruned blob");
565            self.tracked.dec();
566            self.pruned.inc();
567        }
568
569        // Update oldest allowed
570        self.oldest_allowed = Some(min);
571        Ok(())
572    }
573
574    /// Closes all open sections.
575    pub async fn close(self) -> Result<(), Error> {
576        for (section, blob) in self.blobs.into_iter() {
577            blob.close().await?;
578            debug!(blob = section, "closed blob");
579        }
580        Ok(())
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use bytes::{BufMut, Bytes};
588    use commonware_macros::test_traced;
589    use commonware_runtime::{deterministic::Executor, Blob, Error as RError, Runner, Storage};
590    use futures::{pin_mut, StreamExt};
591    use prometheus_client::encoding::text::encode;
592
593    #[test_traced]
594    fn test_journal_append_and_read() {
595        // Initialize the deterministic runtime
596        let (executor, context, _) = Executor::default();
597
598        // Start the test within the executor
599        executor.start(async move {
600            // Initialize the journal
601            let cfg = Config {
602                registry: Arc::new(Mutex::new(Registry::default())),
603                partition: "test_partition".into(),
604            };
605            let index = 1u64;
606            let data = Bytes::from("Test data");
607            let mut journal = Journal::init(context.clone(), cfg.clone())
608                .await
609                .expect("Failed to initialize journal");
610
611            // Append an item to the journal
612            journal
613                .append(index, data.clone())
614                .await
615                .expect("Failed to append data");
616
617            // Check metrics
618            let mut buffer = String::new();
619            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
620            assert!(buffer.contains("tracked 1"));
621
622            // Close the journal
623            journal.close().await.expect("Failed to close journal");
624
625            // Re-initialize the journal to simulate a restart
626            let cfg = Config {
627                registry: Arc::new(Mutex::new(Registry::default())),
628                partition: "test_partition".into(),
629            };
630            let mut journal = Journal::init(context, cfg.clone())
631                .await
632                .expect("Failed to re-initialize journal");
633
634            // Replay the journal and collect items
635            let mut items = Vec::new();
636            let stream = journal
637                .replay(1, None)
638                .await
639                .expect("unable to setup replay");
640            pin_mut!(stream);
641            while let Some(result) = stream.next().await {
642                match result {
643                    Ok((blob_index, _, full_len, item)) => {
644                        assert_eq!(full_len as usize, item.len());
645                        items.push((blob_index, item))
646                    }
647                    Err(err) => panic!("Failed to read item: {}", err),
648                }
649            }
650
651            // Verify that the item was replayed correctly
652            assert_eq!(items.len(), 1);
653            assert_eq!(items[0].0, index);
654            assert_eq!(items[0].1, data);
655
656            // Check metrics
657            let mut buffer = String::new();
658            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
659            assert!(buffer.contains("tracked 1"));
660        });
661    }
662
663    #[test_traced]
664    fn test_journal_multiple_appends_and_reads() {
665        // Initialize the deterministic runtime
666        let (executor, context, _) = Executor::default();
667
668        // Start the test within the executor
669        executor.start(async move {
670            // Create a journal configuration
671            let cfg = Config {
672                registry: Arc::new(Mutex::new(Registry::default())),
673                partition: "test_partition".into(),
674            };
675
676            // Initialize the journal
677            let mut journal = Journal::init(context.clone(), cfg.clone())
678                .await
679                .expect("Failed to initialize journal");
680
681            // Append multiple items to different blobs
682            let data_items = vec![
683                (1u64, Bytes::from("Data for blob 1")),
684                (1u64, Bytes::from("Data for blob 1, second item")),
685                (2u64, Bytes::from("Data for blob 2")),
686                (3u64, Bytes::from("Data for blob 3")),
687            ];
688            for (index, data) in &data_items {
689                journal
690                    .append(*index, data.clone())
691                    .await
692                    .expect("Failed to append data");
693                journal.sync(*index).await.expect("Failed to sync blob");
694            }
695
696            // Check metrics
697            let mut buffer = String::new();
698            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
699            assert!(buffer.contains("tracked 3"));
700            assert!(buffer.contains("synced_total 4"));
701
702            // Close the journal
703            journal.close().await.expect("Failed to close journal");
704
705            // Re-initialize the journal to simulate a restart
706            let mut journal = Journal::init(context, cfg)
707                .await
708                .expect("Failed to re-initialize journal");
709
710            // Replay the journal and collect items
711            let mut items = Vec::new();
712            {
713                let stream = journal
714                    .replay(2, None)
715                    .await
716                    .expect("unable to setup replay");
717                pin_mut!(stream);
718                while let Some(result) = stream.next().await {
719                    match result {
720                        Ok((blob_index, _, full_len, item)) => {
721                            assert_eq!(full_len as usize, item.len());
722                            items.push((blob_index, item))
723                        }
724                        Err(err) => panic!("Failed to read item: {}", err),
725                    }
726                }
727            }
728
729            // Verify that all items were replayed correctly
730            assert_eq!(items.len(), data_items.len());
731            for ((expected_index, expected_data), (actual_index, actual_data)) in
732                data_items.iter().zip(items.iter())
733            {
734                assert_eq!(actual_index, expected_index);
735                assert_eq!(actual_data, expected_data);
736            }
737
738            // Replay just first bytes
739            {
740                let stream = journal
741                    .replay(2, Some(4))
742                    .await
743                    .expect("unable to setup replay");
744                pin_mut!(stream);
745                while let Some(result) = stream.next().await {
746                    match result {
747                        Ok((_, _, full_len, item)) => {
748                            assert_eq!(item, Bytes::from("Data"));
749                            assert!(full_len as usize > item.len());
750                        }
751                        Err(err) => panic!("Failed to read item: {}", err),
752                    }
753                }
754            }
755        });
756    }
757
758    #[test_traced]
759    fn test_journal_prune_blobs() {
760        // Initialize the deterministic runtime
761        let (executor, context, _) = Executor::default();
762
763        // Start the test within the executor
764        executor.start(async move {
765            // Create a journal configuration
766            let cfg = Config {
767                registry: Arc::new(Mutex::new(Registry::default())),
768                partition: "test_partition".into(),
769            };
770
771            // Initialize the journal
772            let mut journal = Journal::init(context.clone(), cfg.clone())
773                .await
774                .expect("Failed to initialize journal");
775
776            // Append items to multiple blobs
777            for index in 1u64..=5u64 {
778                let data = Bytes::from(format!("Data for blob {}", index));
779                journal
780                    .append(index, data)
781                    .await
782                    .expect("Failed to append data");
783                journal.sync(index).await.expect("Failed to sync blob");
784            }
785
786            // Add one item out-of-order
787            let data = Bytes::from("Data for blob 2, second item");
788            journal
789                .append(2u64, data)
790                .await
791                .expect("Failed to append data");
792            journal.sync(2u64).await.expect("Failed to sync blob");
793
794            // Prune blobs with indices less than 3
795            journal.prune(3).await.expect("Failed to prune blobs");
796
797            // Prune again with a section less than the previous one
798            let result = journal.prune(2).await;
799            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));
800
801            // Prune again with the same section
802            let result = journal.prune(3).await;
803            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));
804
805            // Check metrics
806            let mut buffer = String::new();
807            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
808            assert!(buffer.contains("pruned_total 2"));
809
810            // Close the journal
811            journal.close().await.expect("Failed to close journal");
812
813            // Re-initialize the journal to simulate a restart
814            let mut journal = Journal::init(context.clone(), cfg.clone())
815                .await
816                .expect("Failed to re-initialize journal");
817
818            // Replay the journal and collect items
819            let mut items = Vec::new();
820            {
821                let stream = journal
822                    .replay(1, None)
823                    .await
824                    .expect("unable to setup replay");
825                pin_mut!(stream);
826                while let Some(result) = stream.next().await {
827                    match result {
828                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
829                        Err(err) => panic!("Failed to read item: {}", err),
830                    }
831                }
832            }
833
834            // Verify that items from blobs 1 and 2 are not present
835            assert_eq!(items.len(), 3);
836            let expected_indices = [3u64, 4u64, 5u64];
837            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
838                assert_eq!(item.0, *expected_index);
839            }
840
841            // Prune all blobs
842            journal.prune(6).await.expect("Failed to prune blobs");
843
844            // Close the journal
845            journal.close().await.expect("Failed to close journal");
846
847            // Ensure no remaining blobs exist
848            //
849            // Note: We don't remove the partition, so this does not error
850            // and instead returns an empty list of blobs.
851            assert!(context
852                .scan(&cfg.partition)
853                .await
854                .expect("Failed to list blobs")
855                .is_empty());
856        });
857    }
858
859    #[test_traced]
860    fn test_journal_with_invalid_blob_name() {
861        // Initialize the deterministic runtime
862        let (executor, context, _) = Executor::default();
863
864        // Start the test within the executor
865        executor.start(async move {
866            // Create a journal configuration
867            let cfg = Config {
868                registry: Arc::new(Mutex::new(Registry::default())),
869                partition: "test_partition".into(),
870            };
871
872            // Manually create a blob with an invalid name (not 8 bytes)
873            let invalid_blob_name = b"invalid"; // Less than 8 bytes
874            let blob = context
875                .open(&cfg.partition, invalid_blob_name)
876                .await
877                .expect("Failed to create blob with invalid name");
878            blob.close().await.expect("Failed to close blob");
879
880            // Attempt to initialize the journal
881            let result = Journal::init(context, cfg).await;
882
883            // Expect an error
884            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
885        });
886    }
887
888    fn journal_read_size_missing(exact: Option<u32>) {
889        // Initialize the deterministic runtime
890        let (executor, context, _) = Executor::default();
891
892        // Start the test within the executor
893        executor.start(async move {
894            // Create a journal configuration
895            let cfg = Config {
896                registry: Arc::new(Mutex::new(Registry::default())),
897                partition: "test_partition".into(),
898            };
899
900            // Manually create a blob with incomplete size data
901            let section = 1u64;
902            let blob_name = section.to_be_bytes();
903            let blob = context
904                .open(&cfg.partition, &blob_name)
905                .await
906                .expect("Failed to create blob");
907
908            // Write incomplete size data (less than 4 bytes)
909            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
910            blob.write_at(&incomplete_data, 0)
911                .await
912                .expect("Failed to write incomplete data");
913            blob.close().await.expect("Failed to close blob");
914
915            // Initialize the journal
916            let mut journal = Journal::init(context, cfg)
917                .await
918                .expect("Failed to initialize journal");
919
920            // Attempt to replay the journal
921            let stream = journal
922                .replay(1, exact)
923                .await
924                .expect("unable to setup replay");
925            pin_mut!(stream);
926            let mut items = Vec::new();
927            while let Some(result) = stream.next().await {
928                match result {
929                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
930                    Err(err) => panic!("Failed to read item: {}", err),
931                }
932            }
933            assert!(items.is_empty());
934        });
935    }
936
937    #[test_traced]
938    fn test_journal_read_size_missing_no_exact() {
939        journal_read_size_missing(None);
940    }
941
942    #[test_traced]
943    fn test_journal_read_size_missing_with_exact() {
944        journal_read_size_missing(Some(1));
945    }
946
947    fn journal_read_item_missing(exact: Option<u32>) {
948        // Initialize the deterministic runtime
949        let (executor, context, _) = Executor::default();
950
951        // Start the test within the executor
952        executor.start(async move {
953            // Create a journal configuration
954            let cfg = Config {
955                registry: Arc::new(Mutex::new(Registry::default())),
956                partition: "test_partition".into(),
957            };
958
959            // Manually create a blob with missing item data
960            let section = 1u64;
961            let blob_name = section.to_be_bytes();
962            let blob = context
963                .open(&cfg.partition, &blob_name)
964                .await
965                .expect("Failed to create blob");
966
967            // Write size but no item data
968            let item_size: u32 = 10; // Size of the item
969            let mut buf = Vec::new();
970            buf.put_u32(item_size);
971            let data = [2u8; 5];
972            buf.put_slice(&data);
973            blob.write_at(&buf, 0)
974                .await
975                .expect("Failed to write item size");
976            blob.close().await.expect("Failed to close blob");
977
978            // Initialize the journal
979            let mut journal = Journal::init(context, cfg)
980                .await
981                .expect("Failed to initialize journal");
982
983            // Attempt to replay the journal
984            let stream = journal
985                .replay(1, exact)
986                .await
987                .expect("unable to setup replay");
988
989            pin_mut!(stream);
990            let mut items = Vec::new();
991            while let Some(result) = stream.next().await {
992                match result {
993                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
994                    Err(err) => panic!("Failed to read item: {}", err),
995                }
996            }
997            assert!(items.is_empty());
998        });
999    }
1000
1001    #[test_traced]
1002    fn test_journal_read_item_missing_no_exact() {
1003        journal_read_item_missing(None);
1004    }
1005
1006    #[test_traced]
1007    fn test_journal_read_item_missing_with_exact() {
1008        journal_read_item_missing(Some(1));
1009    }
1010
1011    #[test_traced]
1012    fn test_journal_read_checksum_missing() {
1013        // Initialize the deterministic runtime
1014        let (executor, context, _) = Executor::default();
1015
1016        // Start the test within the executor
1017        executor.start(async move {
1018            // Create a journal configuration
1019            let cfg = Config {
1020                registry: Arc::new(Mutex::new(Registry::default())),
1021                partition: "test_partition".into(),
1022            };
1023
1024            // Manually create a blob with missing checksum
1025            let section = 1u64;
1026            let blob_name = section.to_be_bytes();
1027            let blob = context
1028                .open(&cfg.partition, &blob_name)
1029                .await
1030                .expect("Failed to create blob");
1031
1032            // Prepare item data
1033            let item_data = b"Test data";
1034            let item_size = item_data.len() as u32;
1035
1036            // Write size
1037            let mut offset = 0;
1038            blob.write_at(&item_size.to_be_bytes(), offset)
1039                .await
1040                .expect("Failed to write item size");
1041            offset += 4;
1042
1043            // Write item data
1044            blob.write_at(item_data, offset)
1045                .await
1046                .expect("Failed to write item data");
1047            // Do not write checksum (omit it)
1048
1049            blob.close().await.expect("Failed to close blob");
1050
1051            // Initialize the journal
1052            let mut journal = Journal::init(context, cfg)
1053                .await
1054                .expect("Failed to initialize journal");
1055
1056            // Attempt to replay the journal
1057            //
1058            // This will truncate the leftover bytes from our manual write.
1059            let stream = journal
1060                .replay(1, None)
1061                .await
1062                .expect("unable to setup replay");
1063            pin_mut!(stream);
1064            let mut items = Vec::new();
1065            while let Some(result) = stream.next().await {
1066                match result {
1067                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1068                    Err(err) => panic!("Failed to read item: {}", err),
1069                }
1070            }
1071            assert!(items.is_empty());
1072        });
1073    }
1074
1075    #[test_traced]
1076    fn test_journal_read_checksum_mismatch() {
1077        // Initialize the deterministic runtime
1078        let (executor, context, _) = Executor::default();
1079
1080        // Start the test within the executor
1081        executor.start(async move {
1082            // Create a journal configuration
1083            let cfg = Config {
1084                registry: Arc::new(Mutex::new(Registry::default())),
1085                partition: "test_partition".into(),
1086            };
1087
1088            // Manually create a blob with incorrect checksum
1089            let section = 1u64;
1090            let blob_name = section.to_be_bytes();
1091            let blob = context
1092                .open(&cfg.partition, &blob_name)
1093                .await
1094                .expect("Failed to create blob");
1095
1096            // Prepare item data
1097            let item_data = b"Test data";
1098            let item_size = item_data.len() as u32;
1099            let incorrect_checksum: u32 = 0xDEADBEEF;
1100
1101            // Write size
1102            let mut offset = 0;
1103            blob.write_at(&item_size.to_be_bytes(), offset)
1104                .await
1105                .expect("Failed to write item size");
1106            offset += 4;
1107
1108            // Write item data
1109            blob.write_at(item_data, offset)
1110                .await
1111                .expect("Failed to write item data");
1112            offset += item_data.len() as u64;
1113
1114            // Write incorrect checksum
1115            blob.write_at(&incorrect_checksum.to_be_bytes(), offset)
1116                .await
1117                .expect("Failed to write incorrect checksum");
1118
1119            blob.close().await.expect("Failed to close blob");
1120
1121            // Initialize the journal
1122            let mut journal = Journal::init(context, cfg)
1123                .await
1124                .expect("Failed to initialize journal");
1125
1126            // Attempt to replay the journal
1127            let stream = journal
1128                .replay(1, None)
1129                .await
1130                .expect("unable to setup replay");
1131            pin_mut!(stream);
1132            let mut items = Vec::new();
1133            let mut got_checksum_error = false;
1134            while let Some(result) = stream.next().await {
1135                match result {
1136                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1137                    Err(err) => {
1138                        assert!(matches!(err, Error::ChecksumMismatch(_, _)));
1139                        got_checksum_error = true;
1140                        // We explicitly don't return or break here to test that we won't end up in
1141                        // an infinite loop if the replay caller doesn't abort on error.
1142                    }
1143                }
1144            }
1145            assert!(got_checksum_error, "expected checksum mismatch error");
1146        });
1147    }
1148
1149    #[test_traced]
1150    fn test_journal_handling_truncated_data() {
1151        // Initialize the deterministic runtime
1152        let (executor, context, _) = Executor::default();
1153
1154        // Start the test within the executor
1155        executor.start(async move {
1156            // Create a journal configuration
1157            let cfg = Config {
1158                registry: Arc::new(Mutex::new(Registry::default())),
1159                partition: "test_partition".into(),
1160            };
1161
1162            // Initialize the journal
1163            let mut journal = Journal::init(context.clone(), cfg.clone())
1164                .await
1165                .expect("Failed to initialize journal");
1166
1167            // Append 1 item to the first index
1168            journal
1169                .append(1, Bytes::from("Valid data"))
1170                .await
1171                .expect("Failed to append data");
1172
1173            // Append multiple items to the second index
1174            let data_items = vec![
1175                (2u64, Bytes::from("Valid data")),
1176                (2u64, Bytes::from("Valid data, second item")),
1177                (2u64, Bytes::from("Valid data, third item")),
1178            ];
1179            for (index, data) in &data_items {
1180                journal
1181                    .append(*index, data.clone())
1182                    .await
1183                    .expect("Failed to append data");
1184                journal.sync(*index).await.expect("Failed to sync blob");
1185            }
1186
1187            // Close the journal
1188            journal.close().await.expect("Failed to close journal");
1189
1190            // Manually corrupt the end of the second blob
1191            let blob = context
1192                .open(&cfg.partition, &2u64.to_be_bytes())
1193                .await
1194                .expect("Failed to open blob");
1195            let blob_len = blob.len().await.expect("Failed to get blob length");
1196            blob.truncate(blob_len - 4)
1197                .await
1198                .expect("Failed to corrupt blob");
1199            blob.close().await.expect("Failed to close blob");
1200
1201            // Re-initialize the journal to simulate a restart
1202            let mut journal = Journal::init(context, cfg)
1203                .await
1204                .expect("Failed to re-initialize journal");
1205
1206            // Attempt to replay the journal
1207            let mut items = Vec::new();
1208            let stream = journal
1209                .replay(1, None)
1210                .await
1211                .expect("unable to setup replay");
1212            pin_mut!(stream);
1213            while let Some(result) = stream.next().await {
1214                match result {
1215                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1216                    Err(err) => panic!("Failed to read item: {}", err),
1217                }
1218            }
1219
1220            // Verify that only non-corrupted items were replayed
1221            assert_eq!(items.len(), 3);
1222            assert_eq!(items[0].0, 1);
1223            assert_eq!(items[0].1, Bytes::from("Valid data"));
1224            assert_eq!(items[1].0, data_items[0].0);
1225            assert_eq!(items[1].1, data_items[0].1);
1226            assert_eq!(items[2].0, data_items[1].0);
1227            assert_eq!(items[2].1, data_items[1].1);
1228        });
1229    }
1230
    // Define a `MockBlob` with a configurable length so tests can simulate blobs at (or past)
    // the maximum addressable offset
    #[derive(Clone)]
    struct MockBlob {
        len: u64,
    }
1236
1237    impl Blob for MockBlob {
1238        async fn len(&self) -> Result<u64, commonware_runtime::Error> {
1239            // Return a length that will cause offset overflow
1240            Ok(self.len)
1241        }
1242
1243        async fn read_at(&self, _buf: &mut [u8], _offset: u64) -> Result<(), RError> {
1244            Ok(())
1245        }
1246
1247        async fn write_at(&self, _buf: &[u8], _offset: u64) -> Result<(), RError> {
1248            Ok(())
1249        }
1250
1251        async fn truncate(&self, _len: u64) -> Result<(), RError> {
1252            Ok(())
1253        }
1254
1255        async fn sync(&self) -> Result<(), RError> {
1256            Ok(())
1257        }
1258
1259        async fn close(self) -> Result<(), RError> {
1260            Ok(())
1261        }
1262    }
1263
1264    // Define `MockStorage` that returns `MockBlob`
1265    #[derive(Clone)]
1266    struct MockStorage {
1267        len: u64,
1268    }
1269
1270    impl Storage<MockBlob> for MockStorage {
1271        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<MockBlob, RError> {
1272            Ok(MockBlob { len: self.len })
1273        }
1274
1275        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1276            Ok(())
1277        }
1278
1279        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1280            Ok(vec![])
1281        }
1282    }
1283
    // Redefine the item alignment (`ITEM_ALIGNMENT` in the implementation) explicitly so that
    // any accidental change to its value is caught by these tests
    const INDEX_ALIGNMENT: u64 = 16;
1287
1288    #[test_traced]
1289    fn test_journal_large_offset() {
1290        // Initialize the deterministic runtime
1291        let (executor, _, _) = Executor::default();
1292        executor.start(async move {
1293            // Create journal
1294            let cfg = Config {
1295                registry: Arc::new(Mutex::new(Registry::default())),
1296                partition: "partition".to_string(),
1297            };
1298            let runtime = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT, // up to u32::MAX bytes can be stored at the last offset
1300            };
1301            let mut journal = Journal::init(runtime, cfg).await.unwrap();
1302
1303            // Append data
1304            let data = Bytes::from("Test data");
1305            let result = journal
1306                .append(1, data)
1307                .await
1308                .expect("Failed to append data");
1309            assert_eq!(result, u32::MAX);
1310        });
1311    }
1312
1313    #[test_traced]
1314    fn test_journal_offset_overflow() {
1315        // Initialize the deterministic runtime
1316        let (executor, _, _) = Executor::default();
1317        executor.start(async move {
1318            // Create journal
1319            let cfg = Config {
1320                registry: Arc::new(Mutex::new(Registry::default())),
1321                partition: "partition".to_string(),
1322            };
1323            let runtime = MockStorage {
1324                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
1325            };
1326            let mut journal = Journal::init(runtime, cfg).await.unwrap();
1327
1328            // Append data
1329            let data = Bytes::from("Test data");
1330            let result = journal.append(1, data).await;
1331            assert!(matches!(result, Err(Error::OffsetOverflow)));
1332        });
1333    }
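
    // Illustrative sanity check (added as an example) of `compute_next_offset`'s rounding:
    // raw byte offsets are rounded up to the next 16-byte boundary and expressed in
    // 16-byte slots.
    #[test_traced]
    fn test_compute_next_offset_alignment() {
        // Aligned offsets map directly to their slot index
        assert_eq!(compute_next_offset(0).unwrap(), 0);
        assert_eq!(compute_next_offset(16).unwrap(), 1);
        assert_eq!(compute_next_offset(32).unwrap(), 2);

        // Unaligned offsets round up to the next slot
        assert_eq!(compute_next_offset(1).unwrap(), 1);
        assert_eq!(compute_next_offset(17).unwrap(), 2);

        // Anything past the largest representable slot overflows
        assert!(matches!(
            compute_next_offset(u32::MAX as u64 * ITEM_ALIGNMENT + 1),
            Err(Error::OffsetOverflow)
        ));
    }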
1334}