//! An append-only log for storing arbitrary variable length items.
//!
//! `variable::Journal` is an append-only log for storing arbitrary variable length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
//! +---+---+---+---+---+---+---+---+---+---+---+
//! |   Size (u32)  |   Data    |    C(u32)     |
//! +---+---+---+---+---+---+---+---+---+---+---+
//!
//! C = CRC32(Size | Data)
//! ```
//!
//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
//! each item. If the checksum of the read data does not match the stored checksum, an error is
//! returned. This checksum is only verified when data is accessed and not at startup (which would
//! require reading all data in `Journal`)._
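//!
//! The sketch below (not the crate's internal encoder) illustrates how an item's on-disk bytes
//! could be assembled, assuming the `crc32fast` crate and a hypothetical `frame_item` helper:
//!
//! ```rust
//! /// Hypothetical helper: frame `data` as `Size (u32 BE) | Data | CRC32(Size | Data)`.
//! fn frame_item(data: &[u8]) -> Vec<u8> {
//!     let mut buf = Vec::with_capacity(4 + data.len() + 4);
//!     buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
//!     buf.extend_from_slice(data);
//!     let checksum = crc32fast::hash(&buf); // CRC computed over Size | Data
//!     buf.extend_from_slice(&checksum.to_be_bytes());
//!     buf
//! }
//!
//! let framed = frame_item(b"hello");
//! assert_eq!(framed.len(), 4 + 5 + 4);
//! ```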
//!
//! # Open Blobs
//!
//! `Journal` uses 1 `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Offset Alignment
//!
//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
//! any `section` is `u32::MAX * 17 = ~70GB` bytes (the item at the final offset can itself store
//! up to `u32::MAX` bytes). If more data is written to a `section` past this max, an
//! `OffsetOverflow` error is returned.
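//!
//! A minimal sketch of the alignment math, using a hypothetical `next_item_offset` helper that
//! mirrors the crate-internal `compute_next_offset` below:
//!
//! ```rust
//! const ITEM_ALIGNMENT: u64 = 16;
//!
//! /// Convert a raw byte offset into the next aligned `u32` item offset.
//! fn next_item_offset(raw: u64) -> Option<u32> {
//!     let aligned = raw.div_ceil(ITEM_ALIGNMENT); // round up to a 16-byte boundary
//!     u32::try_from(aligned).ok() // `None` models an `OffsetOverflow`
//! }
//!
//! assert_eq!(next_item_offset(0), Some(0));
//! assert_eq!(next_item_offset(17), Some(2)); // byte 17 rounds up to item offset 2 (byte 32)
//! assert_eq!(next_item_offset(u64::MAX), None);
//! ```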
//!
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are dropped.
//!
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by some blockchain application to prune old blocks.
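//!
//! A hedged sketch of pruning (the `"pruning_example"` partition and values are illustrative
//! only):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "pruning_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append to two sections, then prune everything below section 2.
//!     journal.append(1, 10).await.unwrap();
//!     journal.append(2, 20).await.unwrap();
//!     journal.prune(2).await.unwrap();
//!
//!     // Accessing a pruned section now returns an error.
//!     assert!(journal.append(1, 30).await.is_err());
//!
//!     journal.close().await.unwrap();
//! });
//! ```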
//!
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
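//!
//! A hedged sketch of the replay pattern, assuming integer items with the unit codec config (as
//! in the example at the bottom of this page):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//! use futures::{pin_mut, StreamExt};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "replay_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!     journal.append(1, 7).await.unwrap();
//!
//!     // Rebuild in-memory state by streaming every (section, offset, size, item).
//!     {
//!         let stream = journal.replay(NZUsize!(1024)).await.unwrap();
//!         pin_mut!(stream);
//!         while let Some(next) = stream.next().await {
//!             let (section, _offset, _size, item) = next.unwrap();
//!             assert_eq!((section, item), (1, 7));
//!         }
//!     }
//!
//!     journal.close().await.unwrap();
//! });
//! ```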
//!
//! # Exact Reads
//!
//! To allow for items to be fetched in a single disk operation, `Journal` allows callers to
//! specify the exact size of an item via the `get_exact` method. This size must be cached by the
//! caller (it is provided during `replay` and returned by `append`), and usage of an incorrect
//! size will result in undefined behavior.
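//!
//! A hedged sketch of the exact-read flow (the `"exact_read_example"` partition and values are
//! illustrative only):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "exact_read_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // `append` returns the item's offset and stored size; retain both to use `get_exact`.
//!     let (offset, size) = journal.append(1, 42).await.unwrap();
//!     journal.sync(1).await.unwrap();
//!     assert_eq!(journal.get_exact(1, offset, size).await.unwrap(), Some(42));
//!
//!     journal.close().await.unwrap();
//! });
//! ```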
//!
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of `Journal`; however, it must remain set if any data
//! was written with compression enabled.
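//!
//! A minimal sketch of a compressed configuration (field values are illustrative only):
//!
//! ```rust
//! use commonware_runtime::buffer::PoolRef;
//! use commonware_storage::journal::variable::Config;
//! use commonware_utils::NZUsize;
//!
//! let cfg: Config<()> = Config {
//!     partition: "compressed_partition".to_string(),
//!     compression: Some(3), // zstd level 3; must stay set once compressed data exists
//!     codec_config: (),
//!     buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!     write_buffer: NZUsize!(1024 * 1024),
//! };
//! assert!(cfg.compression.is_some());
//! ```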
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use commonware_utils::NZUsize;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         write_buffer: NZUsize!(1024 * 1024),
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, 128).await.unwrap();
//!
//!     // Close the journal
//!     journal.close().await.unwrap();
//! });
//! ```

use super::Error;
use bytes::BufMut;
use commonware_codec::Codec;
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

/// Configuration for `Journal` storage.
#[derive(Clone)]
pub struct Config<C> {
    /// The `commonware-runtime::Storage` partition to use
    /// for storing journal blobs.
    pub partition: String,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

const ITEM_ALIGNMENT: u64 = 16;

/// Computes the next offset for an item using the underlying `u64`
/// offset of `Blob`.
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

/// Implementation of `Journal` storage.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    context: E,
    cfg: Config<V::Cfg>,

    oldest_allowed: Option<u64>,

    blobs: BTreeMap<u64, Append<E::Blob>>,

    tracked: Gauge,
    synced: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
    /// Initialize a new `Journal` instance.
    ///
    /// All backing blobs are opened but not read during
    /// initialization. The `replay` method can be used
    /// to iterate over all items in the `Journal`.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Iterate over blobs in partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // Create journal instance
        Ok(Self {
            context,
            cfg,

            oldest_allowed: None,

            blobs,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

    /// Ensures that a pruned section is not accessed.
    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
        if let Some(oldest_allowed) = self.oldest_allowed {
            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
            }
        }
        Ok(())
    }

    /// Reads an item from the blob at the given offset.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read item size
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let size = blob.read_at(vec![0; 4], offset).await?;
        hasher.update(size.as_ref());
        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read remaining
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Read item
        let item = &buf[..size];
        hasher.update(item);

        // Verify integrity
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute next offset
        let aligned_offset = compute_next_offset(offset)?;

        // If compression is enabled, decompress the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Return item
        Ok((aligned_offset, size as u32, item))
    }

    /// Helper function to read an item from a [Read].
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        // Calculate absolute file offset from the item offset
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // If we're not at the right position, seek to it
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read item size (4 bytes)
        let mut hasher = crc32fast::Hasher::new();
        let mut size_buf = [0u8; 4];
        reader
            .read_exact(&mut size_buf, 4)
            .await
            .map_err(Error::Runtime)?;
        hasher.update(&size_buf);

        // Read remaining
        let size = u32::from_be_bytes(size_buf) as usize;
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let mut buf = vec![0u8; buf_size];
        reader
            .read_exact(&mut buf, buf_size)
            .await
            .map_err(Error::Runtime)?;

        // Read item
        let item = &buf[..size];
        hasher.update(item);

        // Verify integrity
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // If compression is enabled, decompress the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Calculate next offset
        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

    /// Reads an item from the blob at the given offset and of a given size.
    async fn read_exact(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
        len: u32,
    ) -> Result<V, Error> {
        // Read buffer
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let entry_size = 4 + len as usize + 4;
        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;

        // Check size
        let mut hasher = crc32fast::Hasher::new();
        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
        hasher.update(&buf.as_ref()[..4]);
        if disk_size != len {
            return Err(Error::UnexpectedSize(disk_size, len));
        }

        // Verify integrity
        let item = &buf.as_ref()[4..4 + len as usize];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress item
        let item = if compressed {
            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
        } else {
            item.to_vec()
        };

        // Return item
        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
        Ok(item)
    }

    /// Returns an ordered stream of all items in the journal, each as a tuple of (section, offset,
    /// size, item).
    ///
    /// # Repair
    ///
    /// Like [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
    /// and [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
    /// the first invalid data read will be considered the new end of the journal (and the underlying [Blob] will be
    /// truncated to the last valid item).
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect all blobs to replay
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.iter() {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
        // memory with buffered data)
        Ok(
            stream::iter(blobs).flat_map(
                move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                    // Create a buffered reader
                    let reader = Read::new(blob, blob_size, buffer);

                    // Read over the blob
                    stream::unfold(
                        (section, reader, 0u32, 0u64, codec_config, compressed),
                        move |(
                            section,
                            mut reader,
                            offset,
                            valid_size,
                            codec_config,
                            compressed,
                        )| async move {
                            // Check if we are at the end of the blob
                            if offset >= max_offset {
                                return None;
                            }

                            // Read an item from the buffer
                            match Self::read_buffered(
                                &mut reader,
                                offset,
                                &codec_config,
                                compressed,
                            )
                            .await
                            {
                                Ok((next_offset, next_valid_size, size, item)) => {
                                    trace!(blob = section, cursor = offset, "replayed item");
                                    Some((
                                        Ok((section, offset, size, item)),
                                        (
                                            section,
                                            reader,
                                            next_offset,
                                            next_valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                                Err(Error::ChecksumMismatch(expected, found)) => {
                                    // If we encounter corruption, we prune to the last valid item. This
                                    // can happen during an unclean file close (where pending data is not
                                    // fully synced to disk).
                                    warn!(
                                        blob = section,
                                        new_offset = offset,
                                        new_size = valid_size,
                                        expected,
                                        found,
                                        "corruption detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                    // If we encounter trailing bytes, we prune to the last
                                    // valid item. This can happen during an unclean file close (where
                                    // pending data is not fully synced to disk).
                                    warn!(
                                        blob = section,
                                        new_offset = offset,
                                        new_size = valid_size,
                                        "trailing bytes detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(err) => {
                                    // If we encounter an unexpected error, return it without attempting
                                    // to fix anything.
                                    warn!(
                                        blob = section,
                                        cursor = offset,
                                        ?err,
                                        "unexpected error"
                                    );
                                    Some((
                                        Err(err),
                                        (
                                            section,
                                            reader,
                                            offset,
                                            valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                            }
                        },
                    )
                },
            ),
        )
    }

    /// Appends an item to `Journal` in a given `section`, returning the offset
    /// where the item was written and the size of the stored item (which may be smaller
    /// than the encoded size from the codec if compression is enabled).
    ///
    /// # Warning
    ///
    /// If there exist trailing bytes in the `Blob` of a particular `section` and
    /// `replay` is not called before this, it is likely that subsequent data added
    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
    /// the checksum verification). It is recommended to call `replay` before calling
    /// `append` to prevent this.
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        // Check last pruned
        self.prune_guard(section, false)?;

        // Create item
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure item is not too large
        let item_len = encoded.len();
        let entry_len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get existing blob or create new one
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Calculate alignment
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        // Populate buffer
        let mut buf = Vec::with_capacity(padding + entry_len);

        // Add padding bytes if necessary
        if padding > 0 {
            buf.resize(padding, 0);
        }

        // Add entry data
        let entry_start = buf.len();
        buf.put_u32(item_len);
        buf.put_slice(&encoded);

        // Calculate checksum only for the entry data (without padding)
        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        // Append item to blob
        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

    /// Retrieves an item from `Journal` at a given `section` and `offset`.
    pub async fn get(&self, section: u64, offset: u32) -> Result<Option<V>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        // Perform a multi-op read.
        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(Some(item))
    }

    /// Retrieves an item from `Journal` at a given `section` and `offset` with a given size.
    pub async fn get_exact(
        &self,
        section: u64,
        offset: u32,
        size: u32,
    ) -> Result<Option<V>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        // Perform a multi-op read.
        let item = Self::read_exact(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
            size,
        )
        .await?;
        Ok(Some(item))
    }

    /// Gets the size of the journal for a specific section.
    ///
    /// Returns 0 if the section does not exist.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section, false)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

    /// Rewinds the journal to the given `section` and `size`.
    ///
    /// This removes any data beyond the specified `section` and `size`.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;

        // Remove any sections beyond the given section
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            // Remove the underlying blob from storage.
            let blob = self.blobs.remove(index).unwrap();

            // Destroy the blob
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // If the section exists, truncate it to the given offset
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(()); // Already smaller than or equal to target size
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

    /// Rewinds the `section` to the given `size`.
    ///
    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;

        // Get the blob at the given section
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        // Truncate the blob to the given size
        let current = blob.size().await;
        if size >= current {
            return Ok(()); // Already smaller than or equal to target size
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

    /// Ensures that all data in a given `section` is synced to the underlying store.
    ///
    /// If the `section` does not exist, no error will be returned.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

    /// Prunes all `sections` less than `min`.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Prune any blobs that are smaller than the minimum
        while let Some((&section, _)) = self.blobs.first_key_value() {
            // Stop pruning if we reach the minimum
            if section >= min {
                break;
            }

            // Remove blob
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);

            // Remove blob from storage
            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        // Update oldest allowed
        self.oldest_allowed = Some(min);
        Ok(())
    }

    /// Syncs and closes all open sections.
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

    /// Remove any underlying blobs created by the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::hash;
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::PoolRef, deterministic, Blob, Error as RError, Runner, Storage,
    };
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify that the item was replayed correctly
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append multiple items to different blobs
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that all items were replayed correctly
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Cleanup
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple blobs
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one item out-of-order
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune blobs with indices less than 3
            journal.prune(3).await.expect("Failed to prune blobs");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Prune again with a section less than the previous one, should be a no-op
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that items from blobs 1 and 2 are not present
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune all blobs
            journal.prune(6).await.expect("Failed to prune blobs");

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Ensure no remaining blobs exist
            //
            // Note: We don't remove the partition, so this does not error
            // and instead returns an empty list of blobs.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an invalid name (not 8 bytes)
            let invalid_blob_name = b"invalid"; // Less than 8 bytes
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Attempt to initialize the journal
            let result = Journal::<_, u64>::init(context, cfg).await;

            // Expect an error
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with incomplete size data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write incomplete size data (less than 4 bytes)
            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with missing item data
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write size but no item data
            let item_size: u32 = 10; // Size of the item
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item size");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with missing checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prepare item data
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write size
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write item data
            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            // Do not write checksum (omit it)

            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Attempt to replay the journal
            //
            // This will truncate the leftover bytes from our manual write.
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

1270    #[test_traced]
1271    fn test_journal_read_checksum_mismatch() {
1272        // Initialize the deterministic context
1273        let executor = deterministic::Runner::default();
1274
1275        // Start the test within the executor
1276        executor.start(|context| async move {
1277            // Create a journal configuration
1278            let cfg = Config {
1279                partition: "test_partition".into(),
1280                compression: None,
1281                codec_config: (),
1282                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1283                write_buffer: NZUsize!(1024),
1284            };
1285
1286            // Manually create a blob with incorrect checksum
1287            let section = 1u64;
1288            let blob_name = section.to_be_bytes();
1289            let (blob, _) = context
1290                .open(&cfg.partition, &blob_name)
1291                .await
1292                .expect("Failed to create blob");
1293
1294            // Prepare item data
1295            let item_data = b"Test data";
1296            let item_size = item_data.len() as u32;
1297            let incorrect_checksum: u32 = 0xDEADBEEF;
1298
1299            // Write size
1300            let mut offset = 0;
1301            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1302                .await
1303                .expect("Failed to write item size");
1304            offset += 4;
1305
1306            // Write item data
1307            blob.write_at(item_data.to_vec(), offset)
1308                .await
1309                .expect("Failed to write item data");
1310            offset += item_data.len() as u64;
1311
1312            // Write incorrect checksum
1313            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
1314                .await
1315                .expect("Failed to write incorrect checksum");
1316
1317            blob.sync().await.expect("Failed to sync blob");
1318
1319            // Initialize the journal
1320            let journal = Journal::init(context.clone(), cfg.clone())
1321                .await
1322                .expect("Failed to initialize journal");
1323
1324            // Attempt to replay the journal
1325            {
1326                let stream = journal
1327                    .replay(NZUsize!(1024))
1328                    .await
1329                    .expect("unable to setup replay");
1330                pin_mut!(stream);
1331                let mut items = Vec::<(u64, u64)>::new();
1332                while let Some(result) = stream.next().await {
1333                    match result {
1334                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1335                        Err(err) => panic!("Failed to read item: {err}"),
1336                    }
1337                }
1338                assert!(items.is_empty());
1339            }
1340            journal.close().await.expect("Failed to close journal");
1341
1342            // Confirm blob is expected length
1343            let (_, blob_size) = context
1344                .open(&cfg.partition, &section.to_be_bytes())
1345                .await
1346                .expect("Failed to open blob");
1347            assert_eq!(blob_size, 0);
1348        });
1349    }
1350
1351    #[test_traced]
1352    fn test_journal_handling_unaligned_truncated_data() {
1353        // Initialize the deterministic context
1354        let executor = deterministic::Runner::default();
1355
1356        // Start the test within the executor
1357        executor.start(|context| async move {
1358            // Create a journal configuration
1359            let cfg = Config {
1360                partition: "test_partition".into(),
1361                compression: None,
1362                codec_config: (),
1363                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1364                write_buffer: NZUsize!(1024),
1365            };
1366
1367            // Initialize the journal
1368            let mut journal = Journal::init(context.clone(), cfg.clone())
1369                .await
1370                .expect("Failed to initialize journal");
1371
1372            // Append 1 item to the first index
1373            journal.append(1, 1).await.expect("Failed to append data");
1374
1375            // Append multiple items to the second index (with unaligned values)
1376            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1377            for (index, data) in &data_items {
1378                journal
1379                    .append(*index, *data)
1380                    .await
1381                    .expect("Failed to append data");
1382                journal.sync(*index).await.expect("Failed to sync blob");
1383            }
1384
1385            // Close the journal
1386            journal.close().await.expect("Failed to close journal");
1387
1388            // Manually corrupt the end of the second blob
1389            let (blob, blob_size) = context
1390                .open(&cfg.partition, &2u64.to_be_bytes())
1391                .await
1392                .expect("Failed to open blob");
1393            blob.resize(blob_size - 4)
1394                .await
1395                .expect("Failed to corrupt blob");
1396            blob.sync().await.expect("Failed to sync blob");
1397
1398            // Re-initialize the journal to simulate a restart
1399            let journal = Journal::init(context.clone(), cfg.clone())
1400                .await
1401                .expect("Failed to re-initialize journal");
1402
1403            // Attempt to replay the journal
1404            let mut items = Vec::<(u64, u32)>::new();
1405            {
1406                let stream = journal
1407                    .replay(NZUsize!(1024))
1408                    .await
1409                    .expect("unable to setup replay");
1410                pin_mut!(stream);
1411                while let Some(result) = stream.next().await {
1412                    match result {
1413                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1414                        Err(err) => panic!("Failed to read item: {err}"),
1415                    }
1416                }
1417            }
1418            journal.close().await.expect("Failed to close journal");
1419
1420            // Verify that only non-corrupted items were replayed
1421            assert_eq!(items.len(), 3);
1422            assert_eq!(items[0].0, 1);
1423            assert_eq!(items[0].1, 1);
1424            assert_eq!(items[1].0, data_items[0].0);
1425            assert_eq!(items[1].1, data_items[0].1);
1426            assert_eq!(items[2].0, data_items[1].0);
1427            assert_eq!(items[2].1, data_items[1].1);
1428
1429            // Confirm blob is expected length
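            // Section 2 keeps two 12-byte items at aligned offsets 0 and 16; the truncated third
            // item was trimmed away, leaving the blob at 16 + 12 = 28 bytes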
1430            let (_, blob_size) = context
1431                .open(&cfg.partition, &2u64.to_be_bytes())
1432                .await
1433                .expect("Failed to open blob");
1434            assert_eq!(blob_size, 28);
1435
1436            // Re-initialize the journal after it truncated the corrupted data
1437            let mut journal = Journal::init(context.clone(), cfg.clone())
1438                .await
1439                .expect("Failed to re-initialize journal");
1440
1441            // Attempt to replay the journal
1442            let mut items = Vec::<(u64, u32)>::new();
1443            {
1444                let stream = journal
1445                    .replay(NZUsize!(1024))
1446                    .await
1447                    .expect("unable to setup replay");
1448                pin_mut!(stream);
1449                while let Some(result) = stream.next().await {
1450                    match result {
1451                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1452                        Err(err) => panic!("Failed to read item: {err}"),
1453                    }
1454                }
1455            }
1456
1457            // Verify that only non-corrupted items were replayed
1458            assert_eq!(items.len(), 3);
1459            assert_eq!(items[0].0, 1);
1460            assert_eq!(items[0].1, 1);
1461            assert_eq!(items[1].0, data_items[0].0);
1462            assert_eq!(items[1].1, data_items[0].1);
1463            assert_eq!(items[2].0, data_items[1].0);
1464            assert_eq!(items[2].1, data_items[1].1);
1465
1466            // Append a new item to the truncated section
1467            journal.append(2, 5).await.expect("Failed to append data");
1468            journal.sync(2).await.expect("Failed to sync blob");
1469
1470            // Get the new item
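            // The new item was written at aligned offset 2 (byte 32), just past the two surviving items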
1471            let item = journal
1472                .get(2, 2)
1473                .await
1474                .expect("Failed to get item")
1475                .expect("Failed to get item");
1476            assert_eq!(item, 5);
1477
1478            // Close the journal
1479            journal.close().await.expect("Failed to close journal");
1480
1481            // Confirm blob is expected length
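            // The new item spans bytes 32..44 (4-byte size + 4-byte value + 4-byte CRC), so the blob is 44 bytes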
1482            let (_, blob_size) = context
1483                .open(&cfg.partition, &2u64.to_be_bytes())
1484                .await
1485                .expect("Failed to open blob");
1486            assert_eq!(blob_size, 44);
1487
1488            // Re-initialize the journal to simulate a restart
1489            let journal = Journal::init(context.clone(), cfg.clone())
1490                .await
1491                .expect("Failed to re-initialize journal");
1492
1493            // Attempt to replay the journal
1494            let mut items = Vec::<(u64, u32)>::new();
1495            {
1496                let stream = journal
1497                    .replay(NZUsize!(1024))
1498                    .await
1499                    .expect("unable to setup replay");
1500                pin_mut!(stream);
1501                while let Some(result) = stream.next().await {
1502                    match result {
1503                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1504                        Err(err) => panic!("Failed to read item: {err}"),
1505                    }
1506                }
1507            }
1508
1509            // Verify that the surviving items and the newly appended item were replayed
1510            assert_eq!(items.len(), 4);
1511            assert_eq!(items[0].0, 1);
1512            assert_eq!(items[0].1, 1);
1513            assert_eq!(items[1].0, data_items[0].0);
1514            assert_eq!(items[1].1, data_items[0].1);
1515            assert_eq!(items[2].0, data_items[1].0);
1516            assert_eq!(items[2].1, data_items[1].1);
1517            assert_eq!(items[3].0, 2);
1518            assert_eq!(items[3].1, 5);
1519        });
1520    }
1521
1522    #[test_traced]
1523    fn test_journal_handling_aligned_truncated_data() {
1524        // Initialize the deterministic context
1525        let executor = deterministic::Runner::default();
1526
1527        // Start the test within the executor
1528        executor.start(|context| async move {
1529            // Create a journal configuration
1530            let cfg = Config {
1531                partition: "test_partition".into(),
1532                compression: None,
1533                codec_config: (),
1534                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1535                write_buffer: NZUsize!(1024),
1536            };
1537
1538            // Initialize the journal
1539            let mut journal = Journal::init(context.clone(), cfg.clone())
1540                .await
1541                .expect("Failed to initialize journal");
1542
1543            // Append 1 item to the first index
1544            journal.append(1, 1).await.expect("Failed to append data");
1545
1546            // Append multiple items to the second index (with aligned values)
1547            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
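            // Each u64 item encodes to exactly 16 bytes on disk (4-byte size + 8-byte value + 4-byte CRC),
            // matching the 16-byte offset alignment (hence "aligned")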
1548            for (index, data) in &data_items {
1549                journal
1550                    .append(*index, *data)
1551                    .await
1552                    .expect("Failed to append data");
1553                journal.sync(*index).await.expect("Failed to sync blob");
1554            }
1555
1556            // Close the journal
1557            journal.close().await.expect("Failed to close journal");
1558
1559            // Manually corrupt the end of the second blob
1560            let (blob, blob_size) = context
1561                .open(&cfg.partition, &2u64.to_be_bytes())
1562                .await
1563                .expect("Failed to open blob");
1564            blob.resize(blob_size - 4)
1565                .await
1566                .expect("Failed to corrupt blob");
1567            blob.sync().await.expect("Failed to sync blob");
1568
1569            // Re-initialize the journal to simulate a restart
1570            let mut journal = Journal::init(context.clone(), cfg.clone())
1571                .await
1572                .expect("Failed to re-initialize journal");
1573
1574            // Attempt to replay the journal
1575            let mut items = Vec::<(u64, u64)>::new();
1576            {
1577                let stream = journal
1578                    .replay(NZUsize!(1024))
1579                    .await
1580                    .expect("unable to setup replay");
1581                pin_mut!(stream);
1582                while let Some(result) = stream.next().await {
1583                    match result {
1584                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1585                        Err(err) => panic!("Failed to read item: {err}"),
1586                    }
1587                }
1588            }
1589
1590            // Verify that only non-corrupted items were replayed
1591            assert_eq!(items.len(), 3);
1592            assert_eq!(items[0].0, 1);
1593            assert_eq!(items[0].1, 1);
1594            assert_eq!(items[1].0, data_items[0].0);
1595            assert_eq!(items[1].1, data_items[0].1);
1596            assert_eq!(items[2].0, data_items[1].0);
1597            assert_eq!(items[2].1, data_items[1].1);
1598
1599            // Append a new item to the truncated section
1600            journal.append(2, 5).await.expect("Failed to append data");
1601            journal.sync(2).await.expect("Failed to sync blob");
1602
1603            // Get the new item
1604            let item = journal
1605                .get(2, 2)
1606                .await
1607                .expect("Failed to get item")
1608                .expect("Failed to get item");
1609            assert_eq!(item, 5);
1610
1611            // Close the journal
1612            journal.close().await.expect("Failed to close journal");
1613
1614            // Confirm blob is expected length
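            // Two surviving 16-byte items (bytes 0..32) plus the new item at bytes 32..48 give 48 bytes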
1615            let (_, blob_size) = context
1616                .open(&cfg.partition, &2u64.to_be_bytes())
1617                .await
1618                .expect("Failed to open blob");
1619            assert_eq!(blob_size, 48);
1620
1621            // Re-initialize the journal to replay after the truncation and the new append
1622            let journal = Journal::init(context, cfg)
1623                .await
1624                .expect("Failed to re-initialize journal");
1625
1626            // Attempt to replay the journal
1627            let mut items = Vec::<(u64, u64)>::new();
1628            {
1629                let stream = journal
1630                    .replay(NZUsize!(1024))
1631                    .await
1632                    .expect("unable to setup replay");
1633                pin_mut!(stream);
1634                while let Some(result) = stream.next().await {
1635                    match result {
1636                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1637                        Err(err) => panic!("Failed to read item: {err}"),
1638                    }
1639                }
1640            }
1641            journal.close().await.expect("Failed to close journal");
1642
1643            // Verify that the surviving items and the newly appended item were replayed
1644            assert_eq!(items.len(), 4);
1645            assert_eq!(items[0].0, 1);
1646            assert_eq!(items[0].1, 1);
1647            assert_eq!(items[1].0, data_items[0].0);
1648            assert_eq!(items[1].1, data_items[0].1);
1649            assert_eq!(items[2].0, data_items[1].0);
1650            assert_eq!(items[2].1, data_items[1].1);
1651            assert_eq!(items[3].0, 2);
1652            assert_eq!(items[3].1, 5);
1653        });
1654    }
1655
1656    #[test_traced]
1657    fn test_journal_handling_extra_data() {
1658        // Initialize the deterministic context
1659        let executor = deterministic::Runner::default();
1660
1661        // Start the test within the executor
1662        executor.start(|context| async move {
1663            // Create a journal configuration
1664            let cfg = Config {
1665                partition: "test_partition".into(),
1666                compression: None,
1667                codec_config: (),
1668                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1669                write_buffer: NZUsize!(1024),
1670            };
1671
1672            // Initialize the journal
1673            let mut journal = Journal::init(context.clone(), cfg.clone())
1674                .await
1675                .expect("Failed to initialize journal");
1676
1677            // Append 1 item to the first index
1678            journal.append(1, 1).await.expect("Failed to append data");
1679
1680            // Append multiple items to the second index
1681            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1682            for (index, data) in &data_items {
1683                journal
1684                    .append(*index, *data)
1685                    .await
1686                    .expect("Failed to append data");
1687                journal.sync(*index).await.expect("Failed to sync blob");
1688            }
1689
1690            // Close the journal
1691            journal.close().await.expect("Failed to close journal");
1692
1693            // Manually add extra data to the end of the second blob
1694            let (blob, blob_size) = context
1695                .open(&cfg.partition, &2u64.to_be_bytes())
1696                .await
1697                .expect("Failed to open blob");
1698            blob.write_at(vec![0u8; 16], blob_size)
1699                .await
1700                .expect("Failed to add extra data");
1701            blob.sync().await.expect("Failed to sync blob");
1702
1703            // Re-initialize the journal to simulate a restart
1704            let journal = Journal::init(context, cfg)
1705                .await
1706                .expect("Failed to re-initialize journal");
1707
1708            // Attempt to replay the journal
1709            let mut items = Vec::<(u64, i32)>::new();
1710            let stream = journal
1711                .replay(NZUsize!(1024))
1712                .await
1713                .expect("unable to setup replay");
1714            pin_mut!(stream);
1715            while let Some(result) = stream.next().await {
1716                match result {
1717                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1718                    Err(err) => panic!("Failed to read item: {err}"),
1719                }
1720            }
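            // Completing the stream without an error shows the 16 trailing zero bytes did not break replay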
1721        });
1722    }
1723
1724    // Define a `MockBlob` whose operations are all no-ops, used to simulate blobs too large to create on disk
1725    #[derive(Clone)]
1726    struct MockBlob {}
1727
1728    impl Blob for MockBlob {
1729        async fn read_at(
1730            &self,
1731            buf: impl Into<StableBuf> + Send,
1732            _offset: u64,
1733        ) -> Result<StableBuf, RError> {
1734            Ok(buf.into())
1735        }
1736
1737        async fn write_at(
1738            &self,
1739            _buf: impl Into<StableBuf> + Send,
1740            _offset: u64,
1741        ) -> Result<(), RError> {
1742            Ok(())
1743        }
1744
1745        async fn resize(&self, _len: u64) -> Result<(), RError> {
1746            Ok(())
1747        }
1748
1749        async fn sync(&self) -> Result<(), RError> {
1750            Ok(())
1751        }
1752    }
1753
1754    // Define a `MockStorage` that returns a `MockBlob` with a configurable length
1755    #[derive(Clone)]
1756    struct MockStorage {
1757        len: u64,
1758    }
1759
1760    impl Storage for MockStorage {
1761        type Blob = MockBlob;
1762
1763        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
1764            Ok((MockBlob {}, self.len))
1765        }
1766
1767        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1768            Ok(())
1769        }
1770
1771        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1772            Ok(vec![])
1773        }
1774    }
1775
1776    impl Metrics for MockStorage {
1777        fn with_label(&self, _: &str) -> Self {
1778            self.clone()
1779        }
1780
1781        fn label(&self) -> String {
1782            String::new()
1783        }
1784
1785        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}
1786
1787        fn encode(&self) -> String {
1788            String::new()
1789        }
1790    }
1791
1792    // Redefine `INDEX_ALIGNMENT` explicitly so these tests catch any accidental change
1793    // to the value used by the journal
1794    const INDEX_ALIGNMENT: u64 = 16;
1795
1796    #[test_traced]
1797    fn test_journal_large_offset() {
1798        // Initialize the deterministic context
1799        let executor = deterministic::Runner::default();
1800        executor.start(|_| async move {
1801            // Create journal
1802            let cfg = Config {
1803                partition: "partition".to_string(),
1804                compression: None,
1805                codec_config: (),
1806                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1807                write_buffer: NZUsize!(1024),
1808            };
1809            let context = MockStorage {
1810                len: u32::MAX as u64 * INDEX_ALIGNMENT, // can store up to u32::MAX at the last offset
1811            };
1812            let mut journal = Journal::init(context, cfg).await.unwrap();
1813
1814            // Append data
1815            let data = 1;
1816            let (result, _) = journal
1817                .append(1, data)
1818                .await
1819                .expect("Failed to append data");
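            // The item lands at byte u32::MAX * INDEX_ALIGNMENT, i.e. the largest aligned offset representable in a u32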
1820            assert_eq!(result, u32::MAX);
1821        });
1822    }
1823
1824    #[test_traced]
1825    fn test_journal_offset_overflow() {
1826        // Initialize the deterministic context
1827        let executor = deterministic::Runner::default();
1828        executor.start(|_| async move {
1829            // Create journal
1830            let cfg = Config {
1831                partition: "partition".to_string(),
1832                compression: None,
1833                codec_config: (),
1834                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1835                write_buffer: NZUsize!(1024),
1836            };
1837            let context = MockStorage {
1838                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
1839            };
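            // One byte past u32::MAX * INDEX_ALIGNMENT, so the next aligned offset no longer fits in a u32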
1840            let mut journal = Journal::init(context, cfg).await.unwrap();
1841
1842            // Append data
1843            let data = 1;
1844            let result = journal.append(1, data).await;
1845            assert!(matches!(result, Err(Error::OffsetOverflow)));
1846        });
1847    }
1848
1849    #[test_traced]
1850    fn test_journal_rewind() {
1851        // Initialize the deterministic context
1852        let executor = deterministic::Runner::default();
1853        executor.start(|context| async move {
1854            // Create journal
1855            let cfg = Config {
1856                partition: "test_partition".to_string(),
1857                compression: None,
1858                codec_config: (),
1859                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1860                write_buffer: NZUsize!(1024),
1861            };
1862            let mut journal = Journal::init(context, cfg).await.unwrap();
1863
1864            // Check size of non-existent section
1865            let size = journal.size(1).await.unwrap();
1866            assert_eq!(size, 0);
1867
1868            // Append data to section 1
1869            journal.append(1, 42i32).await.unwrap();
1870
1871            // Check size of section 1 - should be greater than 0
1872            let size = journal.size(1).await.unwrap();
1873            assert!(size > 0);
1874
1875            // Append more data and verify size increases
1876            journal.append(1, 43i32).await.unwrap();
1877            let new_size = journal.size(1).await.unwrap();
1878            assert!(new_size > size);
1879
1880            // Check size of different section - should still be 0
1881            let size = journal.size(2).await.unwrap();
1882            assert_eq!(size, 0);
1883
1884            // Append data to section 2
1885            journal.append(2, 44i32).await.unwrap();
1886
1887            // Check size of section 2 - should be greater than 0
1888            let size = journal.size(2).await.unwrap();
1889            assert!(size > 0);
1890
1891            // Rewind to (section 1, offset 0), which removes everything in sections 1 and 2
1892            journal.rewind(1, 0).await.unwrap();
1893
1894            // Check size of section 1 - should be 0
1895            let size = journal.size(1).await.unwrap();
1896            assert_eq!(size, 0);
1897
1898            // Check size of section 2 - should be 0
1899            let size = journal.size(2).await.unwrap();
1900            assert_eq!(size, 0);
1901        });
1902    }
1903
1904    #[test_traced]
1905    fn test_journal_rewind_section() {
1906        // Initialize the deterministic context
1907        let executor = deterministic::Runner::default();
1908        executor.start(|context| async move {
1909            // Create journal
1910            let cfg = Config {
1911                partition: "test_partition".to_string(),
1912                compression: None,
1913                codec_config: (),
1914                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1915                write_buffer: NZUsize!(1024),
1916            };
1917            let mut journal = Journal::init(context, cfg).await.unwrap();
1918
1919            // Check size of non-existent section
1920            let size = journal.size(1).await.unwrap();
1921            assert_eq!(size, 0);
1922
1923            // Append data to section 1
1924            journal.append(1, 42i32).await.unwrap();
1925
1926            // Check size of section 1 - should be greater than 0
1927            let size = journal.size(1).await.unwrap();
1928            assert!(size > 0);
1929
1930            // Append more data and verify size increases
1931            journal.append(1, 43i32).await.unwrap();
1932            let new_size = journal.size(1).await.unwrap();
1933            assert!(new_size > size);
1934
1935            // Check size of different section - should still be 0
1936            let size = journal.size(2).await.unwrap();
1937            assert_eq!(size, 0);
1938
1939            // Append data to section 2
1940            journal.append(2, 44i32).await.unwrap();
1941
1942            // Check size of section 2 - should be greater than 0
1943            let size = journal.size(2).await.unwrap();
1944            assert!(size > 0);
1945
1946            // Rollback everything in section 1
1947            journal.rewind_section(1, 0).await.unwrap();
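            // Unlike `rewind` above, `rewind_section` only affects the given section, so section 2 is untouched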
1948
1949            // Check size of section 1 - should be 0
1950            let size = journal.size(1).await.unwrap();
1951            assert_eq!(size, 0);
1952
1953            // Check size of section 2 - should be greater than 0
1954            let size = journal.size(2).await.unwrap();
1955            assert!(size > 0);
1956        });
1957    }
1958
1959    /// Protect against accidental changes to the journal disk format.
1960    #[test_traced]
1961    fn test_journal_conformance() {
1962        // Initialize the deterministic context
1963        let executor = deterministic::Runner::default();
1964
1965        // Start the test within the executor
1966        executor.start(|context| async move {
1967            // Create a journal configuration
1968            let cfg = Config {
1969                partition: "test_partition".into(),
1970                compression: None,
1971                codec_config: (),
1972                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1973                write_buffer: NZUsize!(1024),
1974            };
1975
1976            // Initialize the journal
1977            let mut journal = Journal::init(context.clone(), cfg.clone())
1978                .await
1979                .expect("Failed to initialize journal");
1980
1981            // Append 100 items to the journal
1982            for i in 0..100 {
1983                journal.append(1, i).await.expect("Failed to append data");
1984            }
1985            journal.sync(1).await.expect("Failed to sync blob");
1986
1987            // Close the journal
1988            journal.close().await.expect("Failed to close journal");
1989
1990            // Hash blob contents
1991            let (blob, size) = context
1992                .open(&cfg.partition, &1u64.to_be_bytes())
1993                .await
1994                .expect("Failed to open blob");
1995            assert!(size > 0);
1996            let buf = blob
1997                .read_at(vec![0u8; size as usize], 0)
1998                .await
1999                .expect("Failed to read blob");
2000            let digest = hash(buf.as_ref());
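            // The digest pins the exact on-disk byte layout; any change to item encoding, alignment, or
            // checksumming will change it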
2001            assert_eq!(
2002                hex(&digest),
2003                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
2004            );
2005        });
2006    }
2007}