commonware_storage/journal/
variable.rs

//! An append-only log for storing arbitrary variable-length items.
//!
//! `variable::Journal` is an append-only log for storing arbitrary variable-length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
//! +---+---+---+---+---+---+---+---+---+---+---+
//! |   Size (u32)  |   Data    |    C(u32)     |
//! +---+---+---+---+---+---+---+---+---+---+---+
//!
//! C = CRC32(Data)
//! ```
//!
//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
//! each item. If the checksum of the read data does not match the stored checksum, an error is
//! returned. This checksum is only verified when data is accessed and not at startup (which would
//! require reading all data in `Journal`)._
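//!
//! For example, the 4-byte item `"data"` is encoded as the big-endian size `0x00000004`, followed
//! by the 4 item bytes, followed by the 4-byte CRC32 of the item bytes (12 bytes total before
//! alignment padding):
//!
//! ```text
//! 00 00 00 04 | 64 61 74 61 | CRC32("data")
//!    Size     |   "data"    |   Checksum
//! ```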
//!
//! # Open Blobs
//!
//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a
//! given `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound
//! the number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Offset Alignment
//!
//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
//! any `section` is `u32::MAX * 17 = ~70GB` bytes (the item at the last offset can store up to
//! `u32::MAX` bytes). If more data is written to a `section` past this max, an `OffsetOverflow`
//! error is returned.
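//!
//! For example, appending a 5-byte item writes `4 + 5 + 4 = 13` bytes starting at a 16-byte
//! boundary, so the next item begins 16 bytes later. An item at aligned offset `3` starts at byte
//! `3 * 16 = 48` of the `Blob`:
//!
//! ```text
//! byte position = offset * 16
//! next offset   = ceil((position + 4 + len + 4) / 16)
//! ```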
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
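//!
//! A minimal sketch (assuming a `journal` initialized as in the example below):
//!
//! ```ignore
//! journal.append(1, "data".into()).await?;
//! // Force the pending append in section 1 to durable storage.
//! journal.sync(1).await?;
//! ```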
//!
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by some blockchain application to prune old blocks.
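//!
//! A minimal sketch (assuming a `journal` as in the example below, with sections keyed by block
//! height):
//!
//! ```ignore
//! // Retain only sections >= 100; blobs for sections 0..100 are closed and removed.
//! journal.prune(100).await?;
//!
//! // Any later access to a pruned section fails with `Error::AlreadyPrunedToSection`.
//! assert!(journal.get(99, 0, None).await.is_err());
//! ```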
//!
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method that iterates over multiple `sections` concurrently in a single stream.
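//!
//! A minimal sketch of rebuilding in-memory state (assuming a `journal` as in the example below):
//!
//! ```ignore
//! use futures::{pin_mut, StreamExt};
//!
//! let mut state = Vec::new();
//! {
//!     // Replay up to 4 sections concurrently, reading full items (no prefix).
//!     let stream = journal.replay(4, None).await?;
//!     pin_mut!(stream);
//!     while let Some(result) = stream.next().await {
//!         let (section, offset, size, item) = result?;
//!         state.push((section, offset, size, item));
//!     }
//! }
//! ```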
//!
//! ## Skip Reads
//!
//! Some applications may only want to read the first `n` bytes of each item during `replay`. This
//! can be done by providing a `prefix` parameter to the `replay` method. If `prefix` is provided,
//! `Journal` will only return the first `prefix` bytes of each item and "skip ahead" to the next
//! item (computing the offset using the read `size` value).
//!
//! _Reading only the `prefix` bytes of an item makes it impossible to compute the checksum of an
//! item. It is up to the caller to ensure these reads are safe._
//!
//! # Exact Reads
//!
//! To allow for items to be fetched in a single disk operation, `Journal` allows callers to specify
//! an `exact` parameter to the `get` method. This `exact` parameter must be cached by the caller
//! (it is provided during `replay`). Because item integrity is still verified, an incorrect `exact`
//! value will result in an error rather than silently corrupt data.
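//!
//! A minimal sketch (assuming the item sizes observed during `replay` were cached in a
//! caller-maintained map `sizes: HashMap<(u64, u32), u32>`, a hypothetical index):
//!
//! ```ignore
//! // Fetch an item in a single disk operation using its cached size.
//! let exact = sizes[&(section, offset)];
//! let item = journal.get(section, offset, Some(exact)).await?.expect("item missing");
//! ```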
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic::Executor};
//! use commonware_storage::journal::variable::{Journal, Config};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string()
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, "data".into()).await.unwrap();
//!
//!     // Close the journal
//!     journal.close().await.unwrap();
//! });
//! ```

use super::Error;
use bytes::{BufMut, Bytes};
use commonware_runtime::{Blob, Error as RError, Metrics, Storage};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{btree_map::Entry, BTreeMap};
use tracing::{debug, trace, warn};

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config {
    /// The `commonware-runtime::Storage` partition to use
    /// for storing journal blobs.
    pub partition: String,
}

const ITEM_ALIGNMENT: u64 = 16;

/// Computes the next offset for an item using the underlying `u64`
/// offset of `Blob`.
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}
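
// A quick example of `compute_next_offset` (a sketch of the arithmetic above): a raw
// cursor of 12 bytes rounds up to 16 and yields aligned offset 1, a cursor of 16 is
// already aligned and also yields 1, and a cursor of 17 rounds up to 32 and yields 2.
// A cursor past `u32::MAX * 16` cannot fit in a `u32` offset and returns
// `Error::OffsetOverflow`.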

/// Implementation of `Journal` storage.
pub struct Journal<B: Blob, E: Storage<B> + Metrics> {
    context: E,
    cfg: Config,

    oldest_allowed: Option<u64>,

    blobs: BTreeMap<u64, B>,

    tracked: Gauge,
    synced: Counter,
    pruned: Counter,
}

impl<B: Blob, E: Storage<B> + Metrics> Journal<B, E> {
    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during
    /// initialization. The `replay` method can be used
    /// to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Iterate over blobs in partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let blob = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, "loaded section");
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // Create journal instance
        Ok(Self {
            context,
            cfg,

            oldest_allowed: None,

            blobs,
            tracked,
            synced,
            pruned,
        })
    }

    /// Ensures that a pruned section is not accessed.
    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
        if let Some(oldest_allowed) = self.oldest_allowed {
            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
            }
        }
        Ok(())
    }

    /// Reads an item from the blob at the given offset.
    async fn read(blob: &B, offset: u32) -> Result<(u32, u32, Bytes), Error> {
        // Read item size
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let mut size = [0u8; 4];
        blob.read_at(&mut size, offset).await?;
        let size = u32::from_be_bytes(size);
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read item
        let mut item = vec![0u8; size as usize];
        blob.read_at(&mut item, offset).await?;
        let offset = offset
            .checked_add(size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Read checksum
        let mut stored_checksum = [0u8; 4];
        blob.read_at(&mut stored_checksum, offset).await?;
        let stored_checksum = u32::from_be_bytes(stored_checksum);
        let checksum = crc32fast::hash(&item);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Compute next offset
        let aligned_offset = compute_next_offset(offset)?;

        // Return item
        Ok((aligned_offset, size, Bytes::from(item)))
    }

    /// Read `prefix` bytes from the blob at the given offset.
    ///
    /// # Warning
    ///
    /// This method bypasses the checksum verification and the caller is responsible for ensuring
    /// the integrity of any data read. If `prefix` exceeds the size of an item (and runs over the
    /// blob length), replay will interpret the failed read as trailing bytes and truncate data.
    async fn read_prefix(blob: &B, offset: u32, prefix: u32) -> Result<(u32, u32, Bytes), Error> {
        // Read item size and first `prefix` bytes
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let mut buf = vec![0u8; 4 + prefix as usize];
        blob.read_at(&mut buf, offset).await?;

        // Get item size to compute next offset
        let size = u32::from_be_bytes(buf[..4].try_into().unwrap());

        // Get item prefix
        //
        // We don't compute the checksum here nor do we verify that the number of bytes
        // requested is less than the item size.
        let item_prefix = Bytes::from(buf[4..].to_vec());

        // Compute next offset
        let offset = offset
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(size as u64)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?;
        let aligned_offset = compute_next_offset(offset)?;

        // Return item
        Ok((aligned_offset, size, item_prefix))
    }

    /// Read an item from the blob assuming it is of `exact` length. This method verifies the
    /// checksum of the item.
    ///
    /// # Warning
    ///
    /// This method assumes the caller knows the exact size of the item (either because
    /// they store fixed-size items or they previously indexed the size). If an incorrect
    /// `exact` is provided, the method will likely return an error (as integrity is verified).
    async fn read_exact(blob: &B, offset: u32, exact: u32) -> Result<(u32, Bytes), Error> {
        // Read all of the item into one buffer
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let mut buf = vec![0u8; 4 + exact as usize + 4];
        blob.read_at(&mut buf, offset).await?;

        // Check size
        let size = u32::from_be_bytes(buf[..4].try_into().unwrap());
        if size != exact {
            return Err(Error::UnexpectedSize(size, exact));
        }

        // Get item
        let item = Bytes::from(buf[4..4 + exact as usize].to_vec());

        // Verify integrity
        let stored_checksum = u32::from_be_bytes(buf[4 + exact as usize..].try_into().unwrap());
        let checksum = crc32fast::hash(&item);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute next offset
        let offset = offset
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(exact as u64)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?;
        let aligned_offset = compute_next_offset(offset)?;

        // Return item
        Ok((aligned_offset, item))
    }

    /// Returns an unordered stream of all items in the journal.
    ///
    /// # Repair
    ///
    /// If any corrupted data is found, the stream will return an error.
    ///
    /// If any trailing data is found (i.e. misaligned entries), the journal will be truncated
    /// to the last valid item. For this reason, it is recommended to call `replay` before
    /// calling `append` (as data appended after trailing bytes will fail checksum verification
    /// after restart).
    ///
    /// # Concurrency
    ///
    /// The `concurrency` parameter controls how many blobs are replayed concurrently. This can
    /// dramatically speed up the replay process if the underlying storage supports concurrent
    /// reads across different blobs.
    ///
    /// # Prefix
    ///
    /// If `prefix` is provided, the stream will only read up to `prefix` bytes of each item.
    /// Consequently, item checksums will not be computed or verified, and it is up to the caller
    /// to deal with the consequences of this.
    ///
    /// Reading `prefix` bytes and skipping ahead to a future location in a blob is the
    /// theoretically optimal way to read only what is required from storage; however, different
    /// storage implementations may take the opportunity to read ahead past what is required
    /// (needlessly). If the underlying storage can be tuned for random access prior to invoking
    /// replay, it may lead to less IO.
    pub async fn replay(
        &mut self,
        concurrency: usize,
        prefix: Option<u32>,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, Bytes), Error>> + '_, Error> {
        // Collect all blobs to replay
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.iter() {
            let len = blob.len().await?;
            let aligned_len = compute_next_offset(len)?;
            blobs.push((*section, blob, aligned_len));
        }

        // Replay all blobs concurrently and stream items as they are read (to avoid
        // occupying too much memory with buffered data)
        Ok(stream::iter(blobs)
            .map(move |(section, blob, len)| async move {
                stream::unfold(
                    (section, blob, 0u32),
                    move |(section, blob, offset)| async move {
                        // Check if we are at the end of the blob
                        if offset == len {
                            return None;
                        }

                        // Get next item
                        let mut read = match prefix {
                            Some(prefix) => Self::read_prefix(blob, offset, prefix).await,
                            None => Self::read(blob, offset).await,
                        };

                        // Ensure a full read wouldn't put us past the end of the blob
                        if let Ok((next_offset, _, _)) = read {
                            if next_offset > len {
                                read = Err(Error::Runtime(RError::BlobInsufficientLength));
                            }
                        }

                        // Handle read result
                        match read {
                            Ok((next_offset, item_size, item)) => {
                                trace!(blob = section, cursor = offset, len, "replayed item");
                                Some((
                                    Ok((section, offset, item_size, item)),
                                    (section, blob, next_offset),
                                ))
                            }
                            Err(Error::ChecksumMismatch(expected, found)) => {
                                // If we encounter corruption, we don't try to fix it.
                                warn!(
                                    blob = section,
                                    cursor = offset,
                                    expected,
                                    found,
                                    "corruption detected"
                                );
                                Some((
                                    Err(Error::ChecksumMismatch(expected, found)),
                                    (section, blob, len),
                                ))
                            }
                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                // If we encounter trailing bytes, we prune to the last
                                // valid item. This can happen during an unclean file close (where
                                // pending data is not fully synced to disk).
                                warn!(
                                    blob = section,
                                    new_size = offset,
                                    old_size = len,
                                    "trailing bytes detected: truncating"
                                );
                                blob.truncate(offset as u64 * ITEM_ALIGNMENT).await.ok()?;
                                blob.sync().await.ok()?;
                                None
                            }
                            Err(err) => Some((Err(err), (section, blob, len))),
                        }
                    },
                )
            })
            .buffer_unordered(concurrency)
            .flatten())
    }

    /// Appends an item to `Journal` in a given `section`.
    ///
    /// # Warning
    ///
    /// If there exist trailing bytes in the `Blob` of a particular `section` and
    /// `replay` is not called first, it is likely that data subsequently appended
    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
    /// the checksum verification). It is recommended to call `replay` before calling
    /// `append` to prevent this.
    pub async fn append(&mut self, section: u64, item: Bytes) -> Result<u32, Error> {
        // Check last pruned
        self.prune_guard(section, false)?;

        // Ensure item is not too large
        let item_len = item.len();
        let len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get existing blob or create new one
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let blob = self.context.open(&self.cfg.partition, &name).await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Populate buffer
        let mut buf = Vec::with_capacity(len);
        buf.put_u32(item_len);
        let checksum = crc32fast::hash(&item);
        buf.put(item);
        buf.put_u32(checksum);

        // Append item to blob
        let cursor = blob.len().await?;
        let offset = compute_next_offset(cursor)?;
        blob.write_at(&buf, offset as u64 * ITEM_ALIGNMENT).await?;
        trace!(blob = section, previous_len = cursor, offset, "appended item");
        Ok(offset)
    }

    /// Retrieves the first `prefix` bytes of an item from `Journal` at a given `section` and
    /// `offset`.
    ///
    /// This method bypasses the checksum verification and the caller is responsible for ensuring
    /// the integrity of any data read.
    pub async fn get_prefix(
        &self,
        section: u64,
        offset: u32,
        prefix: u32,
    ) -> Result<Option<Bytes>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };
        let (_, _, item) = Self::read_prefix(blob, offset, prefix).await?;
        Ok(Some(item))
    }

    /// Retrieves an item from `Journal` at a given `section` and `offset`.
    ///
    /// If `exact` is provided, it is assumed the item is of size `exact` (which allows
    /// the item to be read in a single disk operation). Even when `exact` is provided,
    /// the checksum of the data is still verified.
    pub async fn get(
        &self,
        section: u64,
        offset: u32,
        exact: Option<u32>,
    ) -> Result<Option<Bytes>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        // If we have an exact size, we can read the item in one go.
        if let Some(exact) = exact {
            let (_, item) = Self::read_exact(blob, offset, exact).await?;
            return Ok(Some(item));
        }

        // Perform a multi-op read.
        let (_, _, item) = Self::read(blob, offset).await?;
        Ok(Some(item))
    }

    /// Ensures that all data in a given `section` is synced to the underlying store.
    ///
    /// If the `section` does not exist, no error will be returned.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

    /// Prunes all `sections` less than `min`.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Check if we already ran this prune
        self.prune_guard(min, true)?;

        // Prune any blobs with a section less than the minimum
        while let Some((&section, _)) = self.blobs.first_key_value() {
            // Stop pruning if we reach the minimum
            if section >= min {
                break;
            }

            // Remove and close blob
            let blob = self.blobs.remove(&section).unwrap();
            blob.close().await?;

            // Remove blob from storage
            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            debug!(blob = section, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        // Update oldest allowed
        self.oldest_allowed = Some(min);
        Ok(())
    }

    /// Closes all open sections.
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            blob.close().await?;
            debug!(blob = section, "closed blob");
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, Bytes};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic::Executor, Blob, Error as RError, Runner, Storage};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
            };
            let index = 1u64;
            let data = Bytes::from("Test data");
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data.clone())
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = Config {
                partition: "test_partition".into(),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, full_len, item)) => {
                        assert_eq!(full_len as usize, item.len());
                        items.push((blob_index, item))
                    }
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }

            // Verify that the item was replayed correctly
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append multiple items to different blobs
            let data_items = vec![
                (1u64, Bytes::from("Data for blob 1")),
                (1u64, Bytes::from("Data for blob 1, second item")),
                (2u64, Bytes::from("Data for blob 2")),
                (3u64, Bytes::from("Data for blob 3")),
            ];
            for (index, data) in &data_items {
                journal
                    .append(*index, data.clone())
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            {
                let stream = journal
                    .replay(2, None)
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, full_len, item)) => {
                            assert_eq!(full_len as usize, item.len());
                            items.push((blob_index, item))
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }

            // Verify that all items were replayed correctly
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Replay just first bytes
            {
                let stream = journal
                    .replay(2, Some(4))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((_, _, full_len, item)) => {
                            assert_eq!(item, Bytes::from("Data"));
                            assert!(full_len as usize > item.len());
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple blobs
            for index in 1u64..=5u64 {
                let data = Bytes::from(format!("Data for blob {}", index));
                journal
                    .append(index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one item out-of-order
            let data = Bytes::from("Data for blob 2, second item");
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune blobs with indices less than 3
            journal.prune(3).await.expect("Failed to prune blobs");

            // Prune again with a section less than the previous one
            let result = journal.prune(2).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            // Prune again with the same section
            let result = journal.prune(3).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            {
                let stream = journal
                    .replay(1, None)
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }

            // Verify that items from blobs 1 and 2 are not present
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune all blobs
            journal.prune(6).await.expect("Failed to prune blobs");

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Ensure no remaining blobs exist
            //
            // Note: We don't remove the partition, so this does not error
            // and instead returns an empty list of blobs.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with an invalid name (not 8 bytes)
            let invalid_blob_name = b"invalid"; // Less than 8 bytes
            let blob = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.close().await.expect("Failed to close blob");

            // Attempt to initialize the journal
            let result = Journal::init(context, cfg).await;

            // Expect an error
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    fn journal_read_size_missing(exact: Option<u32>) {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with incomplete size data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write incomplete size data (less than 4 bytes)
            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
            blob.write_at(&incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.close().await.expect("Failed to close blob");

            // Initialize the journal
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(1, exact)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing_no_exact() {
        journal_read_size_missing(None);
    }

    #[test_traced]
    fn test_journal_read_size_missing_with_exact() {
        journal_read_size_missing(Some(1));
    }

    fn journal_read_item_missing(exact: Option<u32>) {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with missing item data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write size but no item data
            let item_size: u32 = 10; // Size of the item
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            buf.put_slice(&data);
            blob.write_at(&buf, 0)
                .await
                .expect("Failed to write item size");
            blob.close().await.expect("Failed to close blob");

            // Initialize the journal
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(1, exact)
                .await
                .expect("unable to setup replay");

            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing_no_exact() {
        journal_read_item_missing(None);
    }

    #[test_traced]
    fn test_journal_read_item_missing_with_exact() {
        journal_read_item_missing(Some(1));
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with missing checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prepare item data
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write size
            let mut offset = 0;
            blob.write_at(&item_size.to_be_bytes(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write item data
            blob.write_at(item_data, offset)
                .await
                .expect("Failed to write item data");
            // Do not write checksum (omit it)

            blob.close().await.expect("Failed to close blob");

            // Initialize the journal
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            //
            // This will truncate the leftover bytes from our manual write.
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with incorrect checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prepare item data
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Write size
            let mut offset = 0;
            blob.write_at(&item_size.to_be_bytes(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write item data
            blob.write_at(item_data, offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            // Write incorrect checksum
            blob.write_at(&incorrect_checksum.to_be_bytes(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.close().await.expect("Failed to close blob");

            // Initialize the journal
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            let mut got_checksum_error = false;
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => {
                        assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        got_checksum_error = true;
                        // We explicitly don't return or break here to test that we won't end up in
                        // an infinite loop if the replay caller doesn't abort on error.
                    }
                }
            }
            assert!(got_checksum_error, "expected checksum mismatch error");
        });
    }

    #[test_traced]
    fn test_journal_handling_truncated_data() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();

        // Start the test within the executor
        executor.start(async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 1 item to the first index
            journal
                .append(1, Bytes::from("Valid data"))
                .await
                .expect("Failed to append data");

            // Append multiple items to the second index
            let data_items = vec![
                (2u64, Bytes::from("Valid data")),
                (2u64, Bytes::from("Valid data, second item")),
                (2u64, Bytes::from("Valid data, third item")),
            ];
            for (index, data) in &data_items {
                journal
                    .append(*index, data.clone())
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Manually corrupt the end of the second blob
            let blob = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let blob_len = blob.len().await.expect("Failed to get blob length");
            blob.truncate(blob_len - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Attempt to replay the journal
            let mut items = Vec::new();
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }

            // Verify that only non-corrupted items were replayed
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, Bytes::from("Valid data"));
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
        });
    }

    // Define `MockBlob` that returns an offset length that should overflow
    #[derive(Clone)]
    struct MockBlob {
        len: u64,
    }

    impl Blob for MockBlob {
        async fn len(&self) -> Result<u64, commonware_runtime::Error> {
            // Return a length that will cause offset overflow
            Ok(self.len)
        }

        async fn read_at(&self, _buf: &mut [u8], _offset: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn write_at(&self, _buf: &[u8], _offset: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn truncate(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }

        async fn close(self) -> Result<(), RError> {
            Ok(())
        }
    }

    // Define `MockStorage` that returns `MockBlob`
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage<MockBlob> for MockStorage {
        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<MockBlob, RError> {
            Ok(MockBlob { len: self.len })
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    // Define `ITEM_ALIGNMENT` again explicitly (as `INDEX_ALIGNMENT`) to ensure we catch
    // any accidental changes to the value
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        // Initialize the deterministic context
        let (executor, _, _) = Executor::default();
        executor.start(async move {
            // Create journal
            let cfg = Config {
                partition: "partition".to_string(),
            };
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT, // can store up to u32::MAX at the last offset
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Append data
            let data = Bytes::from("Test data");
            let result = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        // Initialize the deterministic context
        let (executor, _, _) = Executor::default();
        executor.start(async move {
            // Create journal
            let cfg = Config {
                partition: "partition".to_string(),
            };
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Append data
            let data = Bytes::from("Test data");
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }
}