commonware_storage/journal/variable.rs

//! An append-only log for storing arbitrary variable length items.
//!
//! `variable::Journal` is an append-only log for storing arbitrary variable length data on disk. In
//! addition to replay, stored items can be directly retrieved given their section number and offset
//! within the section.
//!
//! # Format
//!
//! Data stored in `Journal` is persisted in one of many Blobs within a caller-provided `partition`.
//! The particular `Blob` in which data is stored is identified by a `section` number (`u64`).
//! Within a `section`, data is appended as an `item` with the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 |    ...    | 8 | 9 |10 |11 |
//! +---+---+---+---+---+---+---+---+---+---+---+
//! |   Size (u32)  |   Data    |    C(u32)     |
//! +---+---+---+---+---+---+---+---+---+---+---+
//!
//! C = CRC32(Size | Data)
//! ```
//!
//! _To ensure data returned by `Journal` is correct, a checksum (CRC32) is stored at the end of
//! each item. If the checksum of the read data does not match the stored checksum, an error is
//! returned. This checksum is only verified when data is accessed and not at startup (which would
//! require reading all data in `Journal`)._
//!
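//! For illustration, the following sketch frames an item the same way (big-endian length prefix,
//! payload, then a CRC32 of both). The `crc32` helper here is a plain bitwise implementation kept
//! only to make the example self-contained; the crate itself uses `crc32fast`:
//!
//! ```rust
//! // Bitwise CRC32 (IEEE), equivalent to the checksum used by `Journal`.
//! fn crc32(data: &[u8]) -> u32 {
//!     !data.iter().fold(!0u32, |mut crc, &byte| {
//!         crc ^= byte as u32;
//!         for _ in 0..8 {
//!             crc = if crc & 1 == 1 { (crc >> 1) ^ 0xEDB8_8320 } else { crc >> 1 };
//!         }
//!         crc
//!     })
//! }
//!
//! // Frame an item as it is laid out on disk: Size | Data | CRC32(Size | Data).
//! fn frame(item: &[u8]) -> Vec<u8> {
//!     let mut buf = Vec::with_capacity(4 + item.len() + 4);
//!     buf.extend_from_slice(&(item.len() as u32).to_be_bytes());
//!     buf.extend_from_slice(item);
//!     let checksum = crc32(&buf);
//!     buf.extend_from_slice(&checksum.to_be_bytes());
//!     buf
//! }
//!
//! assert_eq!(frame(b"abc").len(), 4 + 3 + 4);
//! ```
//!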
//! # Open Blobs
//!
//! `Journal` uses one `commonware-runtime::Blob` per `section` to store data. All `Blobs` in a given
//! `partition` are kept open during the lifetime of `Journal`. If the caller wishes to bound the
//! number of open `Blobs`, they can group data into fewer `sections` and/or prune unused
//! `sections`.
//!
//! # Offset Alignment
//!
//! In practice, `Journal` users won't store `u64::MAX` bytes of data in a given `section` (the max
//! `Offset` provided by `Blob`). To reduce the memory usage for tracking offsets within `Journal`,
//! offsets are thus `u32` (4 bytes) and aligned to 16 bytes. This means that the maximum size of
//! any `section` is `u32::MAX * 17` bytes (roughly 73GB): offsets address positions `0..=u32::MAX`,
//! each 16 bytes apart, and the item at the last offset can itself store up to `u32::MAX` bytes. If
//! more data is written to a `section` past this max, an `OffsetOverflow` error is returned.
//!
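//! As an illustration, the offset arithmetic looks like the following sketch (the real
//! implementation returns an `OffsetOverflow` error instead of truncating on overflow):
//!
//! ```rust
//! const ITEM_ALIGNMENT: u64 = 16;
//!
//! // Convert a stored `u32` offset back into a byte position within the `Blob`.
//! fn byte_position(offset: u32) -> u64 {
//!     offset as u64 * ITEM_ALIGNMENT
//! }
//!
//! // Compute the next aligned offset after writing up to `len` bytes.
//! fn next_offset(len: u64) -> u32 {
//!     ((len + ITEM_ALIGNMENT - 1) / ITEM_ALIGNMENT) as u32
//! }
//!
//! assert_eq!(byte_position(2), 32);
//! assert_eq!(next_offset(33), 3);
//! ```
//!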
//! # Sync
//!
//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
//! calling `close`, all pending data is automatically synced and any open blobs are closed.
//!
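//! For example, a minimal sketch of making an append durable before continuing (using the
//! deterministic runtime, as in the example at the end of this documentation):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic};
//! use commonware_storage::journal::variable::{Journal, Config};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "sync_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         write_buffer: 1024 * 1024,
//!     }).await.unwrap();
//!
//!     // Appended data may still be buffered in memory.
//!     journal.append(1, 42).await.unwrap();
//!
//!     // Force pending data in section 1 to durable storage.
//!     journal.sync(1).await.unwrap();
//!
//!     // `close` also syncs any remaining pending data.
//!     journal.close().await.unwrap();
//! });
//! ```
//!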
//! # Pruning
//!
//! All data appended to `Journal` must be assigned to some `section` (`u64`). This assignment
//! allows the caller to prune data from `Journal` by specifying a minimum `section` number. This
//! could be used, for example, by some blockchain application to prune old blocks.
//!
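//! For example, a sketch of pruning everything below a minimum `section` (reads into pruned
//! sections then fail with an error):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic};
//! use commonware_storage::journal::variable::{Journal, Config};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "prune_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         write_buffer: 1024 * 1024,
//!     }).await.unwrap();
//!     let (offset, _) = journal.append(1, 10).await.unwrap();
//!     journal.append(2, 20).await.unwrap();
//!
//!     // Remove all sections less than 2.
//!     journal.prune(2).await.unwrap();
//!     assert!(journal.get(1, offset).await.is_err());
//!     journal.close().await.unwrap();
//! });
//! ```
//!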
//! # Replay
//!
//! During application initialization, it is very common to replay data from `Journal` to recover
//! some in-memory state. `Journal` is heavily optimized for this pattern and provides a `replay`
//! method to produce a stream of all items in the `Journal` in order of their `section` and
//! `offset`.
//!
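//! A minimal sketch of consuming the replay stream (this assumes the `futures` crate for
//! `StreamExt` and `pin_mut`, which `Journal` itself already depends on):
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic};
//! use commonware_storage::journal::variable::{Journal, Config};
//! use futures::{pin_mut, StreamExt};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "replay_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         write_buffer: 1024 * 1024,
//!     }).await.unwrap();
//!     journal.append(1, 42).await.unwrap();
//!     journal.sync(1).await.unwrap();
//!
//!     // Stream all items in order of section and offset.
//!     {
//!         let stream = journal.replay(1024).await.unwrap();
//!         pin_mut!(stream);
//!         while let Some(item) = stream.next().await {
//!             let (section, _offset, _size, value) = item.unwrap();
//!             assert_eq!((section, value), (1, 42));
//!         }
//!     }
//!     journal.close().await.unwrap();
//! });
//! ```
//!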
//! # Exact Reads
//!
//! To allow items to be fetched in a single disk operation, `Journal` exposes a `get_exact`
//! method that takes the item's exact `size` in addition to its `section` and `offset`. This
//! `size` must be cached by the caller (it is returned by `append` and provided during `replay`),
//! and usage of an incorrect `size` will result in undefined behavior.
//!
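//! For example, a sketch of caching the `(offset, size)` pair returned by `append` and using it
//! for a single-read fetch:
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic};
//! use commonware_storage::journal::variable::{Journal, Config};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let mut journal = Journal::init(context, Config{
//!         partition: "exact_example".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         write_buffer: 1024 * 1024,
//!     }).await.unwrap();
//!
//!     // `append` returns the offset and stored size of the item.
//!     let (offset, size) = journal.append(1, 42).await.unwrap();
//!
//!     // Fetch the item in a single disk operation using the cached size.
//!     let item = journal.get_exact(1, offset, size).await.unwrap();
//!     assert_eq!(item, Some(42));
//!     journal.close().await.unwrap();
//! });
//! ```
//!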
//! # Compression
//!
//! `Journal` supports optional compression using `zstd`. This can be enabled by setting the
//! `compression` field in the `Config` struct to a valid `zstd` compression level. The level can
//! be changed between initializations of `Journal`; however, compression must remain enabled if
//! any data was written with compression enabled.
//!
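//! For example, a configuration with compression enabled (level 3 here is purely illustrative):
//!
//! ```rust
//! use commonware_storage::journal::variable::Config;
//!
//! let cfg: Config<()> = Config {
//!     partition: "partition".to_string(),
//!     compression: Some(3),
//!     codec_config: (),
//!     write_buffer: 1024 * 1024,
//! };
//! assert!(cfg.compression.is_some());
//! ```
//!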
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic};
//! use commonware_storage::journal::variable::{Journal, Config};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a journal
//!     let mut journal = Journal::init(context, Config{
//!         partition: "partition".to_string(),
//!         compression: None,
//!         codec_config: (),
//!         write_buffer: 1024 * 1024,
//!     }).await.unwrap();
//!
//!     // Append data to the journal
//!     journal.append(1, 128).await.unwrap();
//!
//!     // Close the journal
//!     journal.close().await.unwrap();
//! });
//! ```

101use super::Error;
102use bytes::BufMut;
103use commonware_codec::Codec;
104use commonware_runtime::{
105    buffer::{Read, Write},
106    Blob, Error as RError, Metrics, Storage,
107};
108use commonware_utils::hex;
109use futures::stream::{self, Stream, StreamExt};
110use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
111use std::{
112    collections::{btree_map::Entry, BTreeMap},
113    io::Cursor,
114    marker::PhantomData,
115};
116use tracing::{debug, trace, warn};
117use zstd::{bulk::compress, decode_all};
118
119/// Configuration for `Journal` storage.
120#[derive(Clone)]
121pub struct Config<C> {
122    /// The `commonware-runtime::Storage` partition to use
123    /// for storing journal blobs.
124    pub partition: String,
125
126    /// Optional compression level (using `zstd`) to apply to data before storing.
127    pub compression: Option<u8>,
128
129    /// The codec configuration to use for encoding and decoding items.
130    pub codec_config: C,
131
132    /// The size of the write buffer to use for each blob.
133    pub write_buffer: usize,
134}
135
136const ITEM_ALIGNMENT: u64 = 16;
137
138/// Computes the next offset for an item using the underlying `u64`
139/// offset of `Blob`.
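/// For example, with 16-byte alignment: `compute_next_offset(0) == Ok(0)`,
/// `compute_next_offset(16) == Ok(1)`, and `compute_next_offset(17) == Ok(2)`.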
140#[inline]
141fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
142    let overage = offset % ITEM_ALIGNMENT;
143    if overage != 0 {
144        offset += ITEM_ALIGNMENT - overage;
145    }
146    let offset = offset / ITEM_ALIGNMENT;
147    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
148    Ok(aligned_offset)
149}
150
151/// Implementation of `Journal` storage.
152pub struct Journal<E: Storage + Metrics, V: Codec> {
153    context: E,
154    cfg: Config<V::Cfg>,
155
156    oldest_allowed: Option<u64>,
157
158    blobs: BTreeMap<u64, Write<E::Blob>>,
159
160    tracked: Gauge,
161    synced: Counter,
162    pruned: Counter,
163
164    _phantom: PhantomData<V>,
165}
166
167impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
168    /// Initialize a new `Journal` instance.
169    ///
170    /// All backing blobs are opened but not read during
171    /// initialization. The `replay` method can be used
172    /// to iterate over all items in the `Journal`.
173    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
174        // Iterate over blobs in partition
175        let mut blobs = BTreeMap::new();
176        let stored_blobs = match context.scan(&cfg.partition).await {
177            Ok(blobs) => blobs,
178            Err(RError::PartitionMissing(_)) => Vec::new(),
179            Err(err) => return Err(Error::Runtime(err)),
180        };
181        for name in stored_blobs {
182            let (blob, size) = context.open(&cfg.partition, &name).await?;
183            let hex_name = hex(&name);
184            let section = match name.try_into() {
185                Ok(section) => u64::from_be_bytes(section),
186                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
187            };
188            debug!(section, blob = hex_name, size, "loaded section");
189            let blob = Write::new(blob, size, cfg.write_buffer);
190            blobs.insert(section, blob);
191        }
192
193        // Initialize metrics
194        let tracked = Gauge::default();
195        let synced = Counter::default();
196        let pruned = Counter::default();
197        context.register("tracked", "Number of blobs", tracked.clone());
198        context.register("synced", "Number of syncs", synced.clone());
199        context.register("pruned", "Number of blobs pruned", pruned.clone());
200        tracked.set(blobs.len() as i64);
201
202        // Create journal instance
203        Ok(Self {
204            context,
205            cfg,
206
207            oldest_allowed: None,
208
209            blobs,
210            tracked,
211            synced,
212            pruned,
213
214            _phantom: PhantomData,
215        })
216    }
217
218    /// Ensures that a pruned section is not accessed.
219    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
220        if let Some(oldest_allowed) = self.oldest_allowed {
221            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
222                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
223            }
224        }
225        Ok(())
226    }
227
228    /// Reads an item from the blob at the given offset.
229    async fn read(
230        compressed: bool,
231        cfg: &V::Cfg,
232        blob: &Write<E::Blob>,
233        offset: u32,
234    ) -> Result<(u32, u32, V), Error> {
235        // Read item size
236        let mut hasher = crc32fast::Hasher::new();
237        let offset = offset as u64 * ITEM_ALIGNMENT;
238        let size = blob.read_at(vec![0; 4], offset).await?;
239        hasher.update(size.as_ref());
240        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
241        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
242
243        // Read remaining
244        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
245        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
246        let buf = buf.as_ref();
247        let offset = offset
248            .checked_add(buf_size as u64)
249            .ok_or(Error::OffsetOverflow)?;
250
251        // Read item
252        let item = &buf[..size];
253        hasher.update(item);
254
255        // Verify integrity
256        let checksum = hasher.finalize();
257        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
258        if checksum != stored_checksum {
259            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
260        }
261
262        // Compute next offset
263        let aligned_offset = compute_next_offset(offset)?;
264
265        // If compression is enabled, decompress the item
266        let item = if compressed {
267            let decompressed =
268                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
269            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
270        } else {
271            V::decode_cfg(item, cfg).map_err(Error::Codec)?
272        };
273
274        // Return item
275        Ok((aligned_offset, size as u32, item))
276    }
277
278    /// Helper function to read an item from a [Read].
279    async fn read_buffered(
280        reader: &mut Read<Write<E::Blob>>,
281        offset: u32,
282        cfg: &V::Cfg,
283        compressed: bool,
284    ) -> Result<(u32, u64, u32, V), Error> {
285        // Calculate absolute file offset from the item offset
286        let file_offset = offset as u64 * ITEM_ALIGNMENT;
287
288        // If we're not at the right position, seek to it
289        if reader.position() != file_offset {
290            reader.seek_to(file_offset).map_err(Error::Runtime)?;
291        }
292
293        // Read item size (4 bytes)
294        let mut hasher = crc32fast::Hasher::new();
295        let mut size_buf = [0u8; 4];
296        reader
297            .read_exact(&mut size_buf, 4)
298            .await
299            .map_err(Error::Runtime)?;
300        hasher.update(&size_buf);
301
302        // Read remaining
303        let size = u32::from_be_bytes(size_buf) as usize;
304        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
305        let mut buf = vec![0u8; buf_size];
306        reader
307            .read_exact(&mut buf, buf_size)
308            .await
309            .map_err(Error::Runtime)?;
310
311        // Read item
312        let item = &buf[..size];
313        hasher.update(item);
314
315        // Verify integrity
316        let checksum = hasher.finalize();
317        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
318        if checksum != stored_checksum {
319            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
320        }
321
322        // If compression is enabled, decompress the item
323        let item = if compressed {
324            let decompressed =
325                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
326            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
327        } else {
328            V::decode_cfg(item, cfg).map_err(Error::Codec)?
329        };
330
331        // Calculate next offset
332        let current_pos = reader.position();
333        let aligned_offset = compute_next_offset(current_pos)?;
334        Ok((aligned_offset, current_pos, size as u32, item))
335    }
336
337    /// Reads an item from the blob at the given offset and of a given size.
338    async fn read_exact(
339        compressed: bool,
340        cfg: &V::Cfg,
341        blob: &Write<E::Blob>,
342        offset: u32,
343        len: u32,
344    ) -> Result<V, Error> {
345        // Read buffer
346        let offset = offset as u64 * ITEM_ALIGNMENT;
347        let entry_size = 4 + len as usize + 4;
348        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;
349
350        // Check size
351        let mut hasher = crc32fast::Hasher::new();
352        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
353        hasher.update(&buf.as_ref()[..4]);
354        if disk_size != len {
355            return Err(Error::UnexpectedSize(disk_size, len));
356        }
357
358        // Verify integrity
359        let item = &buf.as_ref()[4..4 + len as usize];
360        hasher.update(item);
361        let checksum = hasher.finalize();
362        let stored_checksum =
363            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
364        if checksum != stored_checksum {
365            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
366        }
367
368        // Decompress item
369        let item = if compressed {
370            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
371        } else {
372            item.to_vec()
373        };
374
375        // Return item
376        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
377        Ok(item)
378    }
379
380    /// Returns an ordered stream of all items in the journal, each as a tuple of (section, offset,
381    /// size, item).
382    ///
383    /// # Repair
384    ///
385    /// Like [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
386    /// and [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
387    /// the first invalid data read will be considered the new end of the journal (and the underlying [Blob] will be
388    /// truncated to the last valid item).
389    pub async fn replay(
390        &self,
391        buffer: usize,
392    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
393        // Collect all blobs to replay
394        let codec_config = self.cfg.codec_config.clone();
395        let compressed = self.cfg.compression.is_some();
396        let mut blobs = Vec::with_capacity(self.blobs.len());
397        for (section, blob) in self.blobs.iter() {
398            let blob_size = blob.size().await;
399            let max_offset = compute_next_offset(blob_size)?;
400            blobs.push((
401                *section,
402                blob.clone(),
403                max_offset,
404                blob_size,
405                codec_config.clone(),
406                compressed,
407            ));
408        }
409
410        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
411        // memory with buffered data)
412        Ok(
413            stream::iter(blobs).flat_map(
414                move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                    // Create a buffered reader
416                    let reader = Read::new(blob, blob_size, buffer);
417
418                    // Read over the blob
419                    stream::unfold(
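                        // Unfold state: (section, buffered reader, next aligned offset to read,
                        // number of bytes verified as valid so far, codec config, compressed flag).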
420                        (section, reader, 0u32, 0u64, codec_config, compressed),
421                        move |(
422                            section,
423                            mut reader,
424                            offset,
425                            valid_size,
426                            codec_config,
427                            compressed,
428                        )| async move {
429                            // Check if we are at the end of the blob
430                            if offset >= max_offset {
431                                return None;
432                            }
433
434                            // Read an item from the buffer
435                            match Self::read_buffered(
436                                &mut reader,
437                                offset,
438                                &codec_config,
439                                compressed,
440                            )
441                            .await
442                            {
443                                Ok((next_offset, next_valid_size, size, item)) => {
444                                    trace!(blob = section, cursor = offset, "replayed item");
445                                    Some((
446                                        Ok((section, offset, size, item)),
447                                        (
448                                            section,
449                                            reader,
450                                            next_offset,
451                                            next_valid_size,
452                                            codec_config,
453                                            compressed,
454                                        ),
455                                    ))
456                                }
457                                Err(Error::ChecksumMismatch(expected, found)) => {
458                                    // If we encounter corruption, we prune to the last valid item. This
459                                    // can happen during an unclean file close (where pending data is not
460                                    // fully synced to disk).
461                                    warn!(
462                                        blob = section,
463                                        new_offset = offset,
464                                        new_size = valid_size,
465                                        expected,
466                                        found,
467                                        "corruption detected: truncating"
468                                    );
469                                    reader.resize(valid_size).await.ok()?;
470                                    None
471                                }
472                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
473                                    // If we encounter trailing bytes, we prune to the last
474                                    // valid item. This can happen during an unclean file close (where
475                                    // pending data is not fully synced to disk).
476                                    warn!(
477                                        blob = section,
478                                        new_offset = offset,
479                                        new_size = valid_size,
480                                        "trailing bytes detected: truncating"
481                                    );
482                                    reader.resize(valid_size).await.ok()?;
483                                    None
484                                }
485                                Err(err) => {
486                                    // If we encounter an unexpected error, return it without attempting
487                                    // to fix anything.
488                                    warn!(
489                                        blob = section,
490                                        cursor = offset,
491                                        ?err,
492                                        "unexpected error"
493                                    );
494                                    Some((
495                                        Err(err),
496                                        (
497                                            section,
498                                            reader,
499                                            offset,
500                                            valid_size,
501                                            codec_config,
502                                            compressed,
503                                        ),
504                                    ))
505                                }
506                            }
507                        },
508                    )
509                },
510            ),
511        )
512    }
513
    /// Appends an item to `Journal` in a given `section`, returning the offset
    /// where the item was written and the stored size of the item (which may differ
    /// from the codec's encoded size if compression is enabled).
517    ///
518    /// # Warning
519    ///
520    /// If there exist trailing bytes in the `Blob` of a particular `section` and
521    /// `replay` is not called before this, it is likely that subsequent data added
522    /// to the `Blob` will be considered corrupted (as the trailing bytes will fail
523    /// the checksum verification). It is recommended to call `replay` before calling
524    /// `append` to prevent this.
525    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
526        // Check last pruned
527        self.prune_guard(section, false)?;
528
529        // Create item
530        let encoded = item.encode();
531        let encoded = if let Some(compression) = self.cfg.compression {
532            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
533        } else {
534            encoded.into()
535        };
536
537        // Ensure item is not too large
538        let item_len = encoded.len();
539        let entry_len = 4 + item_len + 4;
540        let item_len = match item_len.try_into() {
541            Ok(len) => len,
542            Err(_) => return Err(Error::ItemTooLarge(item_len)),
543        };
544
545        // Get existing blob or create new one
546        let blob = match self.blobs.entry(section) {
547            Entry::Occupied(entry) => entry.into_mut(),
548            Entry::Vacant(entry) => {
549                let name = section.to_be_bytes();
550                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
551                let blob = Write::new(blob, size, self.cfg.write_buffer);
552                self.tracked.inc();
553                entry.insert(blob)
554            }
555        };
556
557        // Populate buffer
558        let mut buf = Vec::with_capacity(entry_len);
559        buf.put_u32(item_len);
560        buf.put_slice(&encoded);
561        let checksum = crc32fast::hash(&buf);
562        buf.put_u32(checksum);
563        assert_eq!(buf.len(), entry_len);
564
565        // Append item to blob
566        let cursor = blob.size().await;
567        let offset = compute_next_offset(cursor)?;
568        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
569        blob.write_at(buf, aligned_cursor).await?;
570        trace!(blob = section, offset, "appended item");
571        Ok((offset, item_len))
572    }
573
574    /// Retrieves an item from `Journal` at a given `section` and `offset`.
575    pub async fn get(&self, section: u64, offset: u32) -> Result<Option<V>, Error> {
576        self.prune_guard(section, false)?;
577        let blob = match self.blobs.get(&section) {
578            Some(blob) => blob,
579            None => return Ok(None),
580        };
581
582        // Perform a multi-op read.
583        let (_, _, item) = Self::read(
584            self.cfg.compression.is_some(),
585            &self.cfg.codec_config,
586            blob,
587            offset,
588        )
589        .await?;
590        Ok(Some(item))
591    }
592
593    /// Retrieves an item from `Journal` at a given `section` and `offset` with a given size.
594    pub async fn get_exact(
595        &self,
596        section: u64,
597        offset: u32,
598        size: u32,
599    ) -> Result<Option<V>, Error> {
600        self.prune_guard(section, false)?;
601        let blob = match self.blobs.get(&section) {
602            Some(blob) => blob,
603            None => return Ok(None),
604        };
605
        // Perform a single read of the exact entry size.
607        let item = Self::read_exact(
608            self.cfg.compression.is_some(),
609            &self.cfg.codec_config,
610            blob,
611            offset,
612            size,
613        )
614        .await?;
615        Ok(Some(item))
616    }
617
618    /// Gets the size of the journal for a specific section.
619    ///
620    /// Returns 0 if the section does not exist.
621    pub async fn size(&self, section: u64) -> Result<u64, Error> {
622        self.prune_guard(section, false)?;
623        match self.blobs.get(&section) {
624            Some(blob) => Ok(blob.size().await),
625            None => Ok(0),
626        }
627    }
628
629    /// Rewinds the journal to the given `section` and `offset`, removing any data beyond it.
630    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
631        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
632    }
633
634    /// Rewinds the journal to the given `section` and `size`.
635    ///
636    /// This removes any data beyond the specified `section` and `size`.
637    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
638        self.prune_guard(section, false)?;
639
640        // Remove any sections beyond the given section
641        let trailing: Vec<u64> = self
642            .blobs
643            .range((
644                std::ops::Bound::Excluded(section),
645                std::ops::Bound::Unbounded,
646            ))
647            .map(|(&section, _)| section)
648            .collect();
649        for index in &trailing {
            // Remove the blob from the in-memory map.
            let blob = self.blobs.remove(index).unwrap();

            // Close the blob and remove it from storage.
            blob.close().await?;
655            self.context
656                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
657                .await?;
658            debug!(section = index, "removed section");
659            self.tracked.dec();
660        }
661
662        // If the section exists, truncate it to the given offset
663        let blob = match self.blobs.get_mut(&section) {
664            Some(blob) => blob,
665            None => return Ok(()),
666        };
667        let current = blob.size().await;
668        if size >= current {
669            return Ok(()); // Already smaller than or equal to target size
670        }
671        blob.resize(size).await?;
672        debug!(
673            section,
674            from = current,
675            to = size,
676            ?trailing,
677            "rewound journal"
678        );
679        Ok(())
680    }
681
682    /// Rewinds the `section` to the given `size`.
683    ///
684    /// Unlike [Self::rewind], this method does not modify anything other than the given `section`.
685    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
686        self.prune_guard(section, false)?;
687
688        // Get the blob at the given section
689        let blob = match self.blobs.get_mut(&section) {
690            Some(blob) => blob,
691            None => return Ok(()),
692        };
693
694        // Truncate the blob to the given size
695        let current = blob.size().await;
696        if size >= current {
697            return Ok(()); // Already smaller than or equal to target size
698        }
699        blob.resize(size).await?;
700        debug!(section, from = current, to = size, "rewound section");
701        Ok(())
702    }
703
704    /// Ensures that all data in a given `section` is synced to the underlying store.
705    ///
706    /// If the `section` does not exist, no error will be returned.
707    pub async fn sync(&self, section: u64) -> Result<(), Error> {
708        self.prune_guard(section, false)?;
709        let blob = match self.blobs.get(&section) {
710            Some(blob) => blob,
711            None => return Ok(()),
712        };
713        self.synced.inc();
714        blob.sync().await.map_err(Error::Runtime)
715    }
716
717    /// Prunes all `sections` less than `min`.
718    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
719        // Check if we already ran this prune
720        self.prune_guard(min, true)?;
721
722        // Prune any blobs that are smaller than the minimum
723        while let Some((&section, _)) = self.blobs.first_key_value() {
724            // Stop pruning if we reach the minimum
725            if section >= min {
726                break;
727            }
728
729            // Remove and close blob
730            let blob = self.blobs.remove(&section).unwrap();
731            let size = blob.size().await;
732            blob.close().await?;
733
734            // Remove blob from storage
735            self.context
736                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
737                .await?;
738            debug!(blob = section, size, "pruned blob");
739            self.tracked.dec();
740            self.pruned.inc();
741        }
742
743        // Update oldest allowed
744        self.oldest_allowed = Some(min);
745        Ok(())
746    }
747
748    /// Closes all open sections.
749    pub async fn close(self) -> Result<(), Error> {
750        for (section, blob) in self.blobs.into_iter() {
751            let size = blob.size().await;
752            blob.close().await?;
753            debug!(blob = section, size, "closed blob");
754        }
755        Ok(())
756    }
757
758    /// Close and remove any underlying blobs created by the journal.
759    pub async fn destroy(self) -> Result<(), Error> {
760        for (i, blob) in self.blobs.into_iter() {
761            let size = blob.size().await;
762            blob.close().await?;
763            debug!(blob = i, size, "destroyed blob");
764            self.context
765                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
766                .await?;
767        }
768        match self.context.remove(&self.cfg.partition, None).await {
769            Ok(()) => {}
770            Err(RError::PartitionMissing(_)) => {
771                // Partition already removed or never existed.
772            }
773            Err(err) => return Err(Error::Runtime(err)),
774        }
775        Ok(())
776    }
777}
778
779#[cfg(test)]
780mod tests {
781    use super::*;
782    use bytes::BufMut;
783    use commonware_cryptography::hash;
784    use commonware_macros::test_traced;
785    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
786    use commonware_utils::StableBuf;
787    use futures::{pin_mut, StreamExt};
788    use prometheus_client::registry::Metric;
789
790    #[test_traced]
791    fn test_journal_append_and_read() {
792        // Initialize the deterministic context
793        let executor = deterministic::Runner::default();
794
795        // Start the test within the executor
796        executor.start(|context| async move {
797            // Initialize the journal
798            let cfg = Config {
799                partition: "test_partition".into(),
800                compression: None,
801                codec_config: (),
802                write_buffer: 1024,
803            };
804            let index = 1u64;
805            let data = 10;
806            let mut journal = Journal::init(context.clone(), cfg.clone())
807                .await
808                .expect("Failed to initialize journal");
809
810            // Append an item to the journal
811            journal
812                .append(index, data)
813                .await
814                .expect("Failed to append data");
815
816            // Check metrics
817            let buffer = context.encode();
818            assert!(buffer.contains("tracked 1"));
819
820            // Close the journal
821            journal.close().await.expect("Failed to close journal");
822
823            // Re-initialize the journal to simulate a restart
824            let cfg = Config {
825                partition: "test_partition".into(),
826                compression: None,
827                codec_config: (),
828                write_buffer: 1024,
829            };
830            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
831                .await
832                .expect("Failed to re-initialize journal");
833
834            // Replay the journal and collect items
835            let mut items = Vec::new();
836            let stream = journal.replay(1024).await.expect("unable to setup replay");
837            pin_mut!(stream);
838            while let Some(result) = stream.next().await {
839                match result {
840                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
841                    Err(err) => panic!("Failed to read item: {err}"),
842                }
843            }
844
845            // Verify that the item was replayed correctly
846            assert_eq!(items.len(), 1);
847            assert_eq!(items[0].0, index);
848            assert_eq!(items[0].1, data);
849
850            // Check metrics
851            let buffer = context.encode();
852            assert!(buffer.contains("tracked 1"));
853        });
854    }
855
856    #[test_traced]
857    fn test_journal_multiple_appends_and_reads() {
858        // Initialize the deterministic context
859        let executor = deterministic::Runner::default();
860
861        // Start the test within the executor
862        executor.start(|context| async move {
863            // Create a journal configuration
864            let cfg = Config {
865                partition: "test_partition".into(),
866                compression: None,
867                codec_config: (),
868                write_buffer: 1024,
869            };
870
871            // Initialize the journal
872            let mut journal = Journal::init(context.clone(), cfg.clone())
873                .await
874                .expect("Failed to initialize journal");
875
876            // Append multiple items to different blobs
877            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
878            for (index, data) in &data_items {
879                journal
880                    .append(*index, *data)
881                    .await
882                    .expect("Failed to append data");
883                journal.sync(*index).await.expect("Failed to sync blob");
884            }
885
886            // Check metrics
887            let buffer = context.encode();
888            assert!(buffer.contains("tracked 3"));
889            assert!(buffer.contains("synced_total 4"));
890
891            // Close the journal
892            journal.close().await.expect("Failed to close journal");
893
894            // Re-initialize the journal to simulate a restart
895            let journal = Journal::init(context, cfg)
896                .await
897                .expect("Failed to re-initialize journal");
898
899            // Replay the journal and collect items
900            let mut items = Vec::<(u64, u32)>::new();
901            {
902                let stream = journal.replay(1024).await.expect("unable to setup replay");
903                pin_mut!(stream);
904                while let Some(result) = stream.next().await {
905                    match result {
906                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
907                        Err(err) => panic!("Failed to read item: {err}"),
908                    }
909                }
910            }
911
912            // Verify that all items were replayed correctly
913            assert_eq!(items.len(), data_items.len());
914            for ((expected_index, expected_data), (actual_index, actual_data)) in
915                data_items.iter().zip(items.iter())
916            {
917                assert_eq!(actual_index, expected_index);
918                assert_eq!(actual_data, expected_data);
919            }
920
921            // Cleanup
922            journal.destroy().await.expect("Failed to destroy journal");
923        });
924    }
925
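    #[test_traced]
    fn test_journal_get_exact_roundtrip() {
        // A minimal sketch of the `append` -> `get`/`get_exact` flow, using the same
        // deterministic setup as the other tests in this module. The exact stored size
        // is taken from the value returned by `append` rather than assumed.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition_exact".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Append an item and capture the (offset, size) pair it returns
            let (offset, size) = journal.append(1, 42).await.expect("Failed to append data");
            journal.sync(1).await.expect("Failed to sync blob");

            // Retrieve the item with and without the cached size
            assert_eq!(
                journal.get(1, offset).await.expect("Failed to get item"),
                Some(42)
            );
            assert_eq!(
                journal
                    .get_exact(1, offset, size)
                    .await
                    .expect("Failed to get item"),
                Some(42)
            );

            // Cleanup
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
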
926    #[test_traced]
927    fn test_journal_prune_blobs() {
928        // Initialize the deterministic context
929        let executor = deterministic::Runner::default();
930
931        // Start the test within the executor
932        executor.start(|context| async move {
933            // Create a journal configuration
934            let cfg = Config {
935                partition: "test_partition".into(),
936                compression: None,
937                codec_config: (),
938                write_buffer: 1024,
939            };
940
941            // Initialize the journal
942            let mut journal = Journal::init(context.clone(), cfg.clone())
943                .await
944                .expect("Failed to initialize journal");
945
946            // Append items to multiple blobs
947            for index in 1u64..=5u64 {
948                journal
949                    .append(index, index)
950                    .await
951                    .expect("Failed to append data");
952                journal.sync(index).await.expect("Failed to sync blob");
953            }
954
955            // Add one item out-of-order
956            let data = 99;
957            journal
958                .append(2u64, data)
959                .await
960                .expect("Failed to append data");
961            journal.sync(2u64).await.expect("Failed to sync blob");
962
963            // Prune blobs with indices less than 3
964            journal.prune(3).await.expect("Failed to prune blobs");
965
966            // Prune again with a section less than the previous one
967            let result = journal.prune(2).await;
968            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));
969
970            // Prune again with the same section
971            let result = journal.prune(3).await;
972            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));
973
974            // Check metrics
975            let buffer = context.encode();
976            assert!(buffer.contains("pruned_total 2"));
977
978            // Close the journal
979            journal.close().await.expect("Failed to close journal");
980
981            // Re-initialize the journal to simulate a restart
982            let mut journal = Journal::init(context.clone(), cfg.clone())
983                .await
984                .expect("Failed to re-initialize journal");
985
986            // Replay the journal and collect items
987            let mut items = Vec::<(u64, u64)>::new();
988            {
989                let stream = journal.replay(1024).await.expect("unable to setup replay");
990                pin_mut!(stream);
991                while let Some(result) = stream.next().await {
992                    match result {
993                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
994                        Err(err) => panic!("Failed to read item: {err}"),
995                    }
996                }
997            }
998
999            // Verify that items from blobs 1 and 2 are not present
1000            assert_eq!(items.len(), 3);
1001            let expected_indices = [3u64, 4u64, 5u64];
1002            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
1003                assert_eq!(item.0, *expected_index);
1004            }
1005
1006            // Prune all blobs
1007            journal.prune(6).await.expect("Failed to prune blobs");
1008
1009            // Close the journal
1010            journal.close().await.expect("Failed to close journal");
1011
1012            // Ensure no remaining blobs exist
1013            //
1014            // Note: We don't remove the partition, so this does not error
1015            // and instead returns an empty list of blobs.
1016            assert!(context
1017                .scan(&cfg.partition)
1018                .await
1019                .expect("Failed to list blobs")
1020                .is_empty());
1021        });
1022    }
1023
1024    #[test_traced]
1025    fn test_journal_with_invalid_blob_name() {
1026        // Initialize the deterministic context
1027        let executor = deterministic::Runner::default();
1028
1029        // Start the test within the executor
1030        executor.start(|context| async move {
1031            // Create a journal configuration
1032            let cfg = Config {
1033                partition: "test_partition".into(),
1034                compression: None,
1035                codec_config: (),
1036                write_buffer: 1024,
1037            };
1038
1039            // Manually create a blob with an invalid name (not 8 bytes)
1040            let invalid_blob_name = b"invalid"; // Less than 8 bytes
1041            let (blob, _) = context
1042                .open(&cfg.partition, invalid_blob_name)
1043                .await
1044                .expect("Failed to create blob with invalid name");
1045            blob.close().await.expect("Failed to close blob");
1046
1047            // Attempt to initialize the journal
1048            let result = Journal::<_, u64>::init(context, cfg).await;
1049
1050            // Expect an error
1051            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1052        });
1053    }
1054
1055    #[test_traced]
1056    fn test_journal_read_size_missing() {
1057        // Initialize the deterministic context
1058        let executor = deterministic::Runner::default();
1059
1060        // Start the test within the executor
1061        executor.start(|context| async move {
1062            // Create a journal configuration
1063            let cfg = Config {
1064                partition: "test_partition".into(),
1065                compression: None,
1066                codec_config: (),
1067                write_buffer: 1024,
1068            };
1069
1070            // Manually create a blob with incomplete size data
1071            let section = 1u64;
1072            let blob_name = section.to_be_bytes();
1073            let (blob, _) = context
1074                .open(&cfg.partition, &blob_name)
1075                .await
1076                .expect("Failed to create blob");
1077
1078            // Write incomplete size data (less than 4 bytes)
1079            let incomplete_data = vec![0x00, 0x01]; // Less than 4 bytes
1080            blob.write_at(incomplete_data, 0)
1081                .await
1082                .expect("Failed to write incomplete data");
1083            blob.close().await.expect("Failed to close blob");
1084
1085            // Initialize the journal
1086            let journal = Journal::init(context, cfg)
1087                .await
1088                .expect("Failed to initialize journal");
1089
1090            // Attempt to replay the journal
1091            let stream = journal.replay(1024).await.expect("unable to setup replay");
1092            pin_mut!(stream);
1093            let mut items = Vec::<(u64, u64)>::new();
1094            while let Some(result) = stream.next().await {
1095                match result {
1096                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1097                    Err(err) => panic!("Failed to read item: {err}"),
1098                }
1099            }
1100            assert!(items.is_empty());
1101        });
1102    }
1103
1104    #[test_traced]
1105    fn test_journal_read_item_missing() {
1106        // Initialize the deterministic context
1107        let executor = deterministic::Runner::default();
1108
1109        // Start the test within the executor
1110        executor.start(|context| async move {
1111            // Create a journal configuration
1112            let cfg = Config {
1113                partition: "test_partition".into(),
1114                compression: None,
1115                codec_config: (),
1116                write_buffer: 1024,
1117            };
1118
1119            // Manually create a blob with missing item data
1120            let section = 1u64;
1121            let blob_name = section.to_be_bytes();
1122            let (blob, _) = context
1123                .open(&cfg.partition, &blob_name)
1124                .await
1125                .expect("Failed to create blob");
1126
1127            // Write size but no item data
1128            let item_size: u32 = 10; // Size of the item
1129            let mut buf = Vec::new();
1130            buf.put_u32(item_size);
1131            let data = [2u8; 5];
1132            BufMut::put_slice(&mut buf, &data);
1133            blob.write_at(buf, 0)
1134                .await
1135                .expect("Failed to write item size");
1136            blob.close().await.expect("Failed to close blob");
1137
1138            // Initialize the journal
1139            let journal = Journal::init(context, cfg)
1140                .await
1141                .expect("Failed to initialize journal");
1142
1143            // Attempt to replay the journal
1144            let stream = journal.replay(1024).await.expect("unable to setup replay");
1145            pin_mut!(stream);
1146            let mut items = Vec::<(u64, u64)>::new();
1147            while let Some(result) = stream.next().await {
1148                match result {
1149                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1150                    Err(err) => panic!("Failed to read item: {err}"),
1151                }
1152            }
1153            assert!(items.is_empty());
1154        });
1155    }
1156
1157    #[test_traced]
1158    fn test_journal_read_checksum_missing() {
1159        // Initialize the deterministic context
1160        let executor = deterministic::Runner::default();
1161
1162        // Start the test within the executor
1163        executor.start(|context| async move {
1164            // Create a journal configuration
1165            let cfg = Config {
1166                partition: "test_partition".into(),
1167                compression: None,
1168                codec_config: (),
1169                write_buffer: 1024,
1170            };
1171
1172            // Manually create a blob with missing checksum
1173            let section = 1u64;
1174            let blob_name = section.to_be_bytes();
1175            let (blob, _) = context
1176                .open(&cfg.partition, &blob_name)
1177                .await
1178                .expect("Failed to create blob");
1179
1180            // Prepare item data
1181            let item_data = b"Test data";
1182            let item_size = item_data.len() as u32;
1183
1184            // Write size
1185            let mut offset = 0;
1186            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1187                .await
1188                .expect("Failed to write item size");
1189            offset += 4;
1190
1191            // Write item data
1192            blob.write_at(item_data.to_vec(), offset)
1193                .await
1194                .expect("Failed to write item data");
1195            // Do not write checksum (omit it)
1196
1197            blob.close().await.expect("Failed to close blob");
1198
1199            // Initialize the journal
1200            let journal = Journal::init(context, cfg)
1201                .await
1202                .expect("Failed to initialize journal");
1203
1204            // Attempt to replay the journal
1205            //
1206            // This will truncate the leftover bytes from our manual write.
1207            let stream = journal.replay(1024).await.expect("unable to setup replay");
1208            pin_mut!(stream);
1209            let mut items = Vec::<(u64, u64)>::new();
1210            while let Some(result) = stream.next().await {
1211                match result {
1212                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1213                    Err(err) => panic!("Failed to read item: {err}"),
1214                }
1215            }
1216            assert!(items.is_empty());
1217        });
1218    }
1219
1220    #[test_traced]
1221    fn test_journal_read_checksum_mismatch() {
1222        // Initialize the deterministic context
1223        let executor = deterministic::Runner::default();
1224
1225        // Start the test within the executor
1226        executor.start(|context| async move {
1227            // Create a journal configuration
1228            let cfg = Config {
1229                partition: "test_partition".into(),
1230                compression: None,
1231                codec_config: (),
1232                write_buffer: 1024,
1233            };
1234
1235            // Manually create a blob with incorrect checksum
1236            let section = 1u64;
1237            let blob_name = section.to_be_bytes();
1238            let (blob, _) = context
1239                .open(&cfg.partition, &blob_name)
1240                .await
1241                .expect("Failed to create blob");
1242
1243            // Prepare item data
1244            let item_data = b"Test data";
1245            let item_size = item_data.len() as u32;
1246            let incorrect_checksum: u32 = 0xDEADBEEF;
1247
1248            // Write size
1249            let mut offset = 0;
1250            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
1251                .await
1252                .expect("Failed to write item size");
1253            offset += 4;
1254
1255            // Write item data
1256            blob.write_at(item_data.to_vec(), offset)
1257                .await
1258                .expect("Failed to write item data");
1259            offset += item_data.len() as u64;
1260
1261            // Write incorrect checksum
1262            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
1263                .await
1264                .expect("Failed to write incorrect checksum");
1265
1266            blob.close().await.expect("Failed to close blob");
1267
1268            // Initialize the journal
1269            let journal = Journal::init(context.clone(), cfg.clone())
1270                .await
1271                .expect("Failed to initialize journal");
1272
1273            // Attempt to replay the journal
1274            {
1275                let stream = journal.replay(1024).await.expect("unable to setup replay");
1276                pin_mut!(stream);
1277                let mut items = Vec::<(u64, u64)>::new();
1278                while let Some(result) = stream.next().await {
1279                    match result {
1280                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1281                        Err(err) => panic!("Failed to read item: {err}"),
1282                    }
1283                }
1284                assert!(items.is_empty());
1285            }
1286            journal.close().await.expect("Failed to close journal");
1287
1288            // Confirm blob is expected length
1289            let (_, blob_size) = context
1290                .open(&cfg.partition, &section.to_be_bytes())
1291                .await
1292                .expect("Failed to open blob");
1293            assert_eq!(blob_size, 0);
1294        });
1295    }
1296
1297    #[test_traced]
1298    fn test_journal_handling_unaligned_truncated_data() {
1299        // Initialize the deterministic context
1300        let executor = deterministic::Runner::default();
1301
1302        // Start the test within the executor
1303        executor.start(|context| async move {
1304            // Create a journal configuration
1305            let cfg = Config {
1306                partition: "test_partition".into(),
1307                compression: None,
1308                codec_config: (),
1309                write_buffer: 1024,
1310            };
1311
1312            // Initialize the journal
1313            let mut journal = Journal::init(context.clone(), cfg.clone())
1314                .await
1315                .expect("Failed to initialize journal");
1316
1317            // Append 1 item to the first index
1318            journal.append(1, 1).await.expect("Failed to append data");
1319
1320            // Append multiple items to the second index (with unaligned values)
1321            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1322            for (index, data) in &data_items {
1323                journal
1324                    .append(*index, *data)
1325                    .await
1326                    .expect("Failed to append data");
1327                journal.sync(*index).await.expect("Failed to sync blob");
1328            }
1329
1330            // Close the journal
1331            journal.close().await.expect("Failed to close journal");
1332
1333            // Manually corrupt the end of the second blob
1334            let (blob, blob_size) = context
1335                .open(&cfg.partition, &2u64.to_be_bytes())
1336                .await
1337                .expect("Failed to open blob");
1338            blob.resize(blob_size - 4)
1339                .await
1340                .expect("Failed to corrupt blob");
1341            blob.close().await.expect("Failed to close blob");
1342
1343            // Re-initialize the journal to simulate a restart
1344            let journal = Journal::init(context.clone(), cfg.clone())
1345                .await
1346                .expect("Failed to re-initialize journal");
1347
1348            // Attempt to replay the journal
1349            let mut items = Vec::<(u64, u32)>::new();
1350            {
1351                let stream = journal.replay(1024).await.expect("unable to setup replay");
1352                pin_mut!(stream);
1353                while let Some(result) = stream.next().await {
1354                    match result {
1355                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1356                        Err(err) => panic!("Failed to read item: {err}"),
1357                    }
1358                }
1359            }
1360            journal.close().await.expect("Failed to close journal");
1361
1362            // Verify that only non-corrupted items were replayed
1363            assert_eq!(items.len(), 3);
1364            assert_eq!(items[0].0, 1);
1365            assert_eq!(items[0].1, 1);
1366            assert_eq!(items[1].0, data_items[0].0);
1367            assert_eq!(items[1].1, data_items[0].1);
1368            assert_eq!(items[2].0, data_items[1].0);
1369            assert_eq!(items[2].1, data_items[1].1);
1370
1371            // Confirm blob is expected length
1372            let (_, blob_size) = context
1373                .open(&cfg.partition, &2u64.to_be_bytes())
1374                .await
1375                .expect("Failed to open blob");
1376            assert_eq!(blob_size, 28);
1377
1378            // Attempt to replay journal after truncation
1379            let mut journal = Journal::init(context.clone(), cfg.clone())
1380                .await
1381                .expect("Failed to re-initialize journal");
1382
1383            // Attempt to replay the journal
1384            let mut items = Vec::<(u64, u32)>::new();
1385            {
1386                let stream = journal.replay(1024).await.expect("unable to setup replay");
1387                pin_mut!(stream);
1388                while let Some(result) = stream.next().await {
1389                    match result {
1390                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1391                        Err(err) => panic!("Failed to read item: {err}"),
1392                    }
1393                }
1394            }
1395
1396            // Verify that only non-corrupted items were replayed
1397            assert_eq!(items.len(), 3);
1398            assert_eq!(items[0].0, 1);
1399            assert_eq!(items[0].1, 1);
1400            assert_eq!(items[1].0, data_items[0].0);
1401            assert_eq!(items[1].1, data_items[0].1);
1402            assert_eq!(items[2].0, data_items[1].0);
1403            assert_eq!(items[2].1, data_items[1].1);
1404
1405            // Append a new item to the truncated partition
1406            journal.append(2, 5).await.expect("Failed to append data");
1407            journal.sync(2).await.expect("Failed to sync blob");
1408
1409            // Get the new item (it should have been written at aligned offset 2 in section 2)
1410            let item = journal
1411                .get(2, 2)
1412                .await
1413                .expect("Failed to get item")
1414                .expect("Failed to get item");
1415            assert_eq!(item, 5);
1416
1417            // Close the journal
1418            journal.close().await.expect("Failed to close journal");
1419
1420            // Confirm blob is expected length
1421            let (_, blob_size) = context
1422                .open(&cfg.partition, &2u64.to_be_bytes())
1423                .await
1424                .expect("Failed to open blob");
1425            assert_eq!(blob_size, 44);
1426
1427            // Re-initialize the journal to simulate a restart
1428            let journal = Journal::init(context.clone(), cfg.clone())
1429                .await
1430                .expect("Failed to re-initialize journal");
1431
1432            // Attempt to replay the journal
1433            let mut items = Vec::<(u64, u32)>::new();
1434            {
1435                let stream = journal.replay(1024).await.expect("unable to setup replay");
1436                pin_mut!(stream);
1437                while let Some(result) = stream.next().await {
1438                    match result {
1439                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1440                        Err(err) => panic!("Failed to read item: {err}"),
1441                    }
1442                }
1443            }
1444
1445            // Verify that the surviving items and the newly appended item were replayed
1446            assert_eq!(items.len(), 4);
1447            assert_eq!(items[0].0, 1);
1448            assert_eq!(items[0].1, 1);
1449            assert_eq!(items[1].0, data_items[0].0);
1450            assert_eq!(items[1].1, data_items[0].1);
1451            assert_eq!(items[2].0, data_items[1].0);
1452            assert_eq!(items[2].1, data_items[1].1);
1453            assert_eq!(items[3].0, 2);
1454            assert_eq!(items[3].1, 5);
1455        });
1456    }
1457
1458    #[test_traced]
1459    fn test_journal_handling_aligned_truncated_data() {
1460        // Initialize the deterministic context
1461        let executor = deterministic::Runner::default();
1462
1463        // Start the test within the executor
1464        executor.start(|context| async move {
1465            // Create a journal configuration
1466            let cfg = Config {
1467                partition: "test_partition".into(),
1468                compression: None,
1469                codec_config: (),
1470                write_buffer: 1024,
1471            };
1472
1473            // Initialize the journal
1474            let mut journal = Journal::init(context.clone(), cfg.clone())
1475                .await
1476                .expect("Failed to initialize journal");
1477
1478            // Append 1 item to the first index
1479            journal.append(1, 1).await.expect("Failed to append data");
1480
1481            // Append multiple items to the second index (with aligned values)
1482            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1483            for (index, data) in &data_items {
1484                journal
1485                    .append(*index, *data)
1486                    .await
1487                    .expect("Failed to append data");
1488                journal.sync(*index).await.expect("Failed to sync blob");
1489            }
1490
1491            // Close the journal
1492            journal.close().await.expect("Failed to close journal");
1493
1494            // Manually corrupt the end of the second blob
1495            let (blob, blob_size) = context
1496                .open(&cfg.partition, &2u64.to_be_bytes())
1497                .await
1498                .expect("Failed to open blob");
1499            blob.resize(blob_size - 4)
1500                .await
1501                .expect("Failed to corrupt blob");
1502            blob.close().await.expect("Failed to close blob");
1503
1504            // Re-initialize the journal to simulate a restart
1505            let mut journal = Journal::init(context.clone(), cfg.clone())
1506                .await
1507                .expect("Failed to re-initialize journal");
1508
1509            // Attempt to replay the journal
1510            let mut items = Vec::<(u64, u64)>::new();
1511            {
1512                let stream = journal.replay(1024).await.expect("unable to setup replay");
1513                pin_mut!(stream);
1514                while let Some(result) = stream.next().await {
1515                    match result {
1516                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1517                        Err(err) => panic!("Failed to read item: {err}"),
1518                    }
1519                }
1520            }
1521
1522            // Verify that only non-corrupted items were replayed
1523            assert_eq!(items.len(), 3);
1524            assert_eq!(items[0].0, 1);
1525            assert_eq!(items[0].1, 1);
1526            assert_eq!(items[1].0, data_items[0].0);
1527            assert_eq!(items[1].1, data_items[0].1);
1528            assert_eq!(items[2].0, data_items[1].0);
1529            assert_eq!(items[2].1, data_items[1].1);
1530
1531            // Append a new item to the truncated partition
1532            journal.append(2, 5).await.expect("Failed to append data");
1533            journal.sync(2).await.expect("Failed to sync blob");
1534
1535            // Get the new item
1536            let item = journal
1537                .get(2, 2)
1538                .await
1539                .expect("Failed to get item")
1540                .expect("Failed to get item");
1541            assert_eq!(item, 5);
1542
1543            // Close the journal
1544            journal.close().await.expect("Failed to close journal");
1545
1546            // Confirm blob is expected length
1547            let (_, blob_size) = context
1548                .open(&cfg.partition, &2u64.to_be_bytes())
1549                .await
1550                .expect("Failed to open blob");
1551            assert_eq!(blob_size, 48);
1552
1553            // Re-initialize the journal to simulate another restart
1554            let journal = Journal::init(context, cfg)
1555                .await
1556                .expect("Failed to re-initialize journal");
1557
1558            // Attempt to replay the journal
1559            let mut items = Vec::<(u64, u64)>::new();
1560            {
1561                let stream = journal.replay(1024).await.expect("unable to setup replay");
1562                pin_mut!(stream);
1563                while let Some(result) = stream.next().await {
1564                    match result {
1565                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1566                        Err(err) => panic!("Failed to read item: {err}"),
1567                    }
1568                }
1569            }
1570            journal.close().await.expect("Failed to close journal");
1571
1572            // Verify that the surviving items and the newly appended item were replayed
1573            assert_eq!(items.len(), 4);
1574            assert_eq!(items[0].0, 1);
1575            assert_eq!(items[0].1, 1);
1576            assert_eq!(items[1].0, data_items[0].0);
1577            assert_eq!(items[1].1, data_items[0].1);
1578            assert_eq!(items[2].0, data_items[1].0);
1579            assert_eq!(items[2].1, data_items[1].1);
1580            assert_eq!(items[3].0, 2);
1581            assert_eq!(items[3].1, 5);
1582        });
1583    }
1584
1585    #[test_traced]
1586    fn test_journal_handling_extra_data() {
1587        // Initialize the deterministic context
1588        let executor = deterministic::Runner::default();
1589
1590        // Start the test within the executor
1591        executor.start(|context| async move {
1592            // Create a journal configuration
1593            let cfg = Config {
1594                partition: "test_partition".into(),
1595                compression: None,
1596                codec_config: (),
1597                write_buffer: 1024,
1598            };
1599
1600            // Initialize the journal
1601            let mut journal = Journal::init(context.clone(), cfg.clone())
1602                .await
1603                .expect("Failed to initialize journal");
1604
1605            // Append 1 item to the first index
1606            journal.append(1, 1).await.expect("Failed to append data");
1607
1608            // Append multiple items to the second index
1609            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1610            for (index, data) in &data_items {
1611                journal
1612                    .append(*index, *data)
1613                    .await
1614                    .expect("Failed to append data");
1615                journal.sync(*index).await.expect("Failed to sync blob");
1616            }
1617
1618            // Close the journal
1619            journal.close().await.expect("Failed to close journal");
1620
1621            // Manually add extra data to the end of the second blob
1622            let (blob, blob_size) = context
1623                .open(&cfg.partition, &2u64.to_be_bytes())
1624                .await
1625                .expect("Failed to open blob");
1626            blob.write_at(vec![0u8; 16], blob_size)
1627                .await
1628                .expect("Failed to add extra data");
1629            blob.close().await.expect("Failed to close blob");
1630
1631            // Re-initialize the journal to simulate a restart
1632            let journal = Journal::init(context, cfg)
1633                .await
1634                .expect("Failed to re-initialize journal");
1635
1636            // Attempt to replay the journal
1637            let mut items = Vec::<(u64, i32)>::new();
1638            let stream = journal.replay(1024).await.expect("unable to setup replay");
1639            pin_mut!(stream);
1640            while let Some(result) = stream.next().await {
1641                match result {
1642                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1643                    Err(err) => panic!("Failed to read item: {err}"),
1644                }
1645            }
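            // Reaching this point without a panic is the real assertion here: replay
            // must tolerate the extra trailing zero bytes, which cannot pass the
            // per-item checksum and so should not surface as a decoded item.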
1646        });
1647    }
1648
1649    // Define a no-op `MockBlob`, used with `MockStorage` below to simulate blobs whose
1650    // reported length can overflow the offset calculation
1650    #[derive(Clone)]
1651    struct MockBlob {}
1652
1653    impl Blob for MockBlob {
1654        async fn read_at(
1655            &self,
1656            buf: impl Into<StableBuf> + Send,
1657            _offset: u64,
1658        ) -> Result<StableBuf, RError> {
1659            Ok(buf.into())
1660        }
1661
1662        async fn write_at(
1663            &self,
1664            _buf: impl Into<StableBuf> + Send,
1665            _offset: u64,
1666        ) -> Result<(), RError> {
1667            Ok(())
1668        }
1669
1670        async fn resize(&self, _len: u64) -> Result<(), RError> {
1671            Ok(())
1672        }
1673
1674        async fn sync(&self) -> Result<(), RError> {
1675            Ok(())
1676        }
1677
1678        async fn close(self) -> Result<(), RError> {
1679            Ok(())
1680        }
1681    }
1682
1683    // Define a `MockStorage` that returns a `MockBlob` with a configurable length
1684    #[derive(Clone)]
1685    struct MockStorage {
1686        len: u64,
1687    }
1688
1689    impl Storage for MockStorage {
1690        type Blob = MockBlob;
1691
1692        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
1693            Ok((MockBlob {}, self.len))
1694        }
1695
1696        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
1697            Ok(())
1698        }
1699
1700        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
1701            Ok(vec![])
1702        }
1703    }
1704
1705    impl Metrics for MockStorage {
1706        fn with_label(&self, _: &str) -> Self {
1707            self.clone()
1708        }
1709
1710        fn label(&self) -> String {
1711            String::new()
1712        }
1713
1714        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}
1715
1716        fn encode(&self) -> String {
1717            String::new()
1718        }
1719    }
1720
1721    // Define the `INDEX_ALIGNMENT` again explicitly to ensure we catch any accidental
1722    // changes to the value
1723    const INDEX_ALIGNMENT: u64 = 16;
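
    // A minimal sketch (not part of the journal API) of the alignment arithmetic the
    // two offset tests below rely on: assuming the journal derives the next item's
    // offset by dividing the blob length by `INDEX_ALIGNMENT` and rounding up, a blob
    // of length `u32::MAX * INDEX_ALIGNMENT` still yields a representable `u32` offset,
    // while any longer blob cannot (hence `OffsetOverflow`).
    #[allow(dead_code)]
    fn next_offset_sketch(blob_len: u64) -> Option<u32> {
        // Round up so a partially filled final slot still advances to the next offset.
        u32::try_from(blob_len.div_ceil(INDEX_ALIGNMENT)).ok()
    }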
1724
1725    #[test_traced]
1726    fn test_journal_large_offset() {
1727        // Initialize the deterministic context
1728        let executor = deterministic::Runner::default();
1729        executor.start(|_| async move {
1730            // Create journal
1731            let cfg = Config {
1732                partition: "partition".to_string(),
1733                compression: None,
1734                codec_config: (),
1735                write_buffer: 1024,
1736            };
1737            let context = MockStorage {
1738                len: u32::MAX as u64 * INDEX_ALIGNMENT, // can store up to u32::MAX bytes at the last offset
1739            };
1740            let mut journal = Journal::init(context, cfg).await.unwrap();
1741
1742            // Append data
1743            let data = 1;
1744            let (result, _) = journal
1745                .append(1, data)
1746                .await
1747                .expect("Failed to append data");
1748            assert_eq!(result, u32::MAX);
1749        });
1750    }
1751
1752    #[test_traced]
1753    fn test_journal_offset_overflow() {
1754        // Initialize the deterministic context
1755        let executor = deterministic::Runner::default();
1756        executor.start(|_| async move {
1757            // Create journal
1758            let cfg = Config {
1759                partition: "partition".to_string(),
1760                compression: None,
1761                codec_config: (),
1762                write_buffer: 1024,
1763            };
1764            let context = MockStorage {
1765                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
1766            };
1767            let mut journal = Journal::init(context, cfg).await.unwrap();
1768
1769            // Append data
1770            let data = 1;
1771            let result = journal.append(1, data).await;
1772            assert!(matches!(result, Err(Error::OffsetOverflow)));
1773        });
1774    }
1775
1776    #[test_traced]
1777    fn test_journal_rewind() {
1778        // Initialize the deterministic context
1779        let executor = deterministic::Runner::default();
1780        executor.start(|context| async move {
1781            // Create journal
1782            let cfg = Config {
1783                partition: "test_partition".to_string(),
1784                compression: None,
1785                codec_config: (),
1786                write_buffer: 1024,
1787            };
1788            let mut journal = Journal::init(context, cfg).await.unwrap();
1789
1790            // Check size of non-existent section
1791            let size = journal.size(1).await.unwrap();
1792            assert_eq!(size, 0);
1793
1794            // Append data to section 1
1795            journal.append(1, 42i32).await.unwrap();
1796
1797            // Check size of section 1 - should be greater than 0
1798            let size = journal.size(1).await.unwrap();
1799            assert!(size > 0);
1800
1801            // Append more data and verify size increases
1802            journal.append(1, 43i32).await.unwrap();
1803            let new_size = journal.size(1).await.unwrap();
1804            assert!(new_size > size);
1805
1806            // Check size of different section - should still be 0
1807            let size = journal.size(2).await.unwrap();
1808            assert_eq!(size, 0);
1809
1810            // Append data to section 2
1811            journal.append(2, 44i32).await.unwrap();
1812
1813            // Check size of section 2 - should be greater than 0
1814            let size = journal.size(2).await.unwrap();
1815            assert!(size > 0);
1816
1817            // Roll back everything in sections 1 and 2
1818            journal.rewind(1, 0).await.unwrap();
1819
1820            // Check size of section 1 - should be 0
1821            let size = journal.size(1).await.unwrap();
1822            assert_eq!(size, 0);
1823
1824            // Check size of section 2 - should be 0
1825            let size = journal.size(2).await.unwrap();
1826            assert_eq!(size, 0);
1827        });
1828    }
1829
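    // The previous test shows `rewind(section, offset)` discarding data in every section
    // at or beyond the target; the next test shows `rewind_section` affecting only the
    // named section.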
1830    #[test_traced]
1831    fn test_journal_rewind_section() {
1832        // Initialize the deterministic context
1833        let executor = deterministic::Runner::default();
1834        executor.start(|context| async move {
1835            // Create journal
1836            let cfg = Config {
1837                partition: "test_partition".to_string(),
1838                compression: None,
1839                codec_config: (),
1840                write_buffer: 1024,
1841            };
1842            let mut journal = Journal::init(context, cfg).await.unwrap();
1843
1844            // Check size of non-existent section
1845            let size = journal.size(1).await.unwrap();
1846            assert_eq!(size, 0);
1847
1848            // Append data to section 1
1849            journal.append(1, 42i32).await.unwrap();
1850
1851            // Check size of section 1 - should be greater than 0
1852            let size = journal.size(1).await.unwrap();
1853            assert!(size > 0);
1854
1855            // Append more data and verify size increases
1856            journal.append(1, 43i32).await.unwrap();
1857            let new_size = journal.size(1).await.unwrap();
1858            assert!(new_size > size);
1859
1860            // Check size of different section - should still be 0
1861            let size = journal.size(2).await.unwrap();
1862            assert_eq!(size, 0);
1863
1864            // Append data to section 2
1865            journal.append(2, 44i32).await.unwrap();
1866
1867            // Check size of section 2 - should be greater than 0
1868            let size = journal.size(2).await.unwrap();
1869            assert!(size > 0);
1870
1871            // Roll back everything in section 1 only
1872            journal.rewind_section(1, 0).await.unwrap();
1873
1874            // Check size of section 1 - should be 0
1875            let size = journal.size(1).await.unwrap();
1876            assert_eq!(size, 0);
1877
1878            // Check size of section 2 - should be greater than 0
1879            let size = journal.size(2).await.unwrap();
1880            assert!(size > 0);
1881        });
1882    }
1883
1884    /// Protect against accidental changes to the journal disk format.
1885    #[test_traced]
1886    fn test_journal_conformance() {
1887        // Initialize the deterministic context
1888        let executor = deterministic::Runner::default();
1889
1890        // Start the test within the executor
1891        executor.start(|context| async move {
1892            // Create a journal configuration
1893            let cfg = Config {
1894                partition: "test_partition".into(),
1895                compression: None,
1896                codec_config: (),
1897                write_buffer: 1024,
1898            };
1899
1900            // Initialize the journal
1901            let mut journal = Journal::init(context.clone(), cfg.clone())
1902                .await
1903                .expect("Failed to initialize journal");
1904
1905            // Append 100 items to the journal
1906            for i in 0..100 {
1907                journal.append(1, i).await.expect("Failed to append data");
1908            }
1909            journal.sync(1).await.expect("Failed to sync blob");
1910
1911            // Close the journal
1912            journal.close().await.expect("Failed to close journal");
1913
1914            // Hash blob contents
1915            let (blob, size) = context
1916                .open(&cfg.partition, &1u64.to_be_bytes())
1917                .await
1918                .expect("Failed to open blob");
1919            assert!(size > 0);
1920            let buf = blob
1921                .read_at(vec![0u8; size as usize], 0)
1922                .await
1923                .expect("Failed to read blob");
1924            let digest = hash(buf.as_ref());
1925            assert_eq!(
1926                hex(&digest),
1927                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
1928            );
1929        });
1930    }
1931}