Skip to main content

commonware_storage/journal/segmented/
fixed.rs

1//! Segmented journal for fixed-size items.
2//!
3//! # Format
4//!
5//! Data is stored in one blob per section. Items are stored sequentially:
6//!
7//! ```text
8//! +--------+--------+--------+----------+
9//! | item_0 | item_1 |   ...  | item_n-1 |
10//! +--------+--------+--------+----------+
11//! ```
12//!
13//! # Sync
14//!
15//! Data written to `Journal` may not be immediately persisted to `Storage`. Use the
16//! `sync` method to force pending data to be written.
17//!
18//! # Pruning
19//!
20//! All data must be assigned to a `section`. This allows pruning entire sections
21//! (and their corresponding blobs) independently.
22
23use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
24use crate::journal::Error;
25use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
26use commonware_runtime::{
27    buffer::paged::{CacheRef, Replay},
28    Blob, Buf, Metrics, Storage,
29};
30use futures::{
31    stream::{self, Stream},
32    StreamExt,
33};
34use std::{marker::PhantomData, num::NonZeroUsize};
35use tracing::{trace, warn};
36
/// State for replaying a single section's blob.
///
/// One instance is created per section by `Journal::replay` and threaded through
/// `stream::unfold` while that section's items are decoded.
struct ReplayState<B: Blob> {
    /// Section number reported alongside every item yielded from this blob.
    section: u64,
    /// Buffered reader over the section's blob.
    replay: Replay<B>,
    /// Position (item index within the section) assigned to the next decoded item.
    position: u64,
    /// Set once the reader is exhausted or an error has been emitted; the next
    /// poll then ends this section's stream.
    done: bool,
}
44
/// Configuration for the fixed segmented journal.
///
/// Consumed by `Journal::init`, which forwards these values to the section manager.
#[derive(Clone)]
pub struct Config {
    /// The partition to use for storing blobs.
    pub partition: String,

    /// The page cache to use for caching data.
    ///
    /// Handed to the manager's `AppendFactory` as `page_cache_ref`.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}
57
/// A segmented journal with fixed-size entries.
///
/// Each section is stored in a separate blob. Within each blob, items are fixed-size,
/// so an item's byte offset is its position multiplied by the item size.
///
/// # Repair
///
/// Like
/// [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
/// and
/// [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
/// the first invalid data read will be considered the new end of the journal (and the
/// underlying [Blob] will be truncated to the last valid item). Repair occurs during
/// init by checking each blob's size.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    /// Owns the per-section blobs; every storage operation is delegated to it.
    manager: Manager<E, AppendFactory>,
    /// Ties the journal to its item type `A` without storing a value of that type.
    _array: PhantomData<A>,
}
75
76impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size of each entry, in bytes (the fixed encoded size of `A`).
    pub const CHUNK_SIZE: usize = A::SIZE;
    // Same value widened to `u64` for blob size and offset arithmetic.
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
80
81    /// Initialize a new `Journal` instance.
82    ///
83    /// All backing blobs are opened but not read during initialization. Use `replay`
84    /// to iterate over all items.
85    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
86        let manager_cfg = ManagerConfig {
87            partition: cfg.partition,
88            factory: AppendFactory {
89                write_buffer: cfg.write_buffer,
90                page_cache_ref: cfg.page_cache,
91            },
92        };
93        let mut manager = Manager::init(context, manager_cfg).await?;
94
95        // Repair any blobs with trailing bytes (incomplete items from crash)
96        let sections: Vec<_> = manager.sections().collect();
97        for section in sections {
98            let size = manager.size(section).await?;
99            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
100                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
101                warn!(
102                    section,
103                    invalid_size = size,
104                    new_size = valid_size,
105                    "trailing bytes detected: truncating"
106                );
107                manager.rewind_section(section, valid_size).await?;
108            }
109        }
110
111        Ok(Self {
112            manager,
113            _array: PhantomData,
114        })
115    }
116
117    /// Append a new item to the journal in the given section.
118    ///
119    /// Returns the position of the item within the section (0-indexed).
120    pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
121        let blob = self.manager.get_or_create(section).await?;
122
123        let size = blob.size().await;
124        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
125            return Err(Error::InvalidBlobSize(section, size));
126        }
127        let position = size / Self::CHUNK_SIZE_U64;
128
129        // Encode the item
130        let buf = item.encode_mut();
131        blob.append(&buf).await?;
132        trace!(section, position, "appended item");
133
134        Ok(position)
135    }
136
137    /// Append pre-encoded bytes to the given section.
138    ///
139    /// The buffer must contain one or more encoded items with size [Self::CHUNK_SIZE] each.
140    ///
141    /// # Panics
142    ///
143    /// Panics if `buf` is empty or not a multiple of [Self::CHUNK_SIZE].
144    pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<(), Error> {
145        assert!(!buf.is_empty());
146        assert!(buf.len().is_multiple_of(Self::CHUNK_SIZE));
147        let blob = self.manager.get_or_create(section).await?;
148        blob.append(buf).await?;
149        trace!(
150            section,
151            count = buf.len() / Self::CHUNK_SIZE,
152            "appended items"
153        );
154        Ok(())
155    }
156
157    /// Read the item at the given section and position.
158    ///
159    /// # Errors
160    ///
161    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
162    /// - [Error::SectionOutOfRange] if the section doesn't exist.
163    /// - [Error::ItemOutOfRange] if the position is beyond the blob size.
164    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
165        let blob = self
166            .manager
167            .get(section)?
168            .ok_or(Error::SectionOutOfRange(section))?;
169
170        let offset = position
171            .checked_mul(Self::CHUNK_SIZE_U64)
172            .ok_or(Error::ItemOutOfRange(position))?;
173        let end = offset
174            .checked_add(Self::CHUNK_SIZE_U64)
175            .ok_or(Error::ItemOutOfRange(position))?;
176        if end > blob.size().await {
177            return Err(Error::ItemOutOfRange(position));
178        }
179
180        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
181        A::decode(buf.coalesce()).map_err(Error::Codec)
182    }
183
184    /// Read multiple items from the same section into a caller buffer.
185    ///
186    /// `buf` must be at least `positions.len() * CHUNK_SIZE` bytes. All positions must be
187    /// strictly increasing and within the section's bounds.
188    pub async fn get_many(
189        &self,
190        section: u64,
191        positions: &[u64],
192        buf: &mut [u8],
193    ) -> Result<Vec<A>, Error> {
194        if positions.is_empty() {
195            return Ok(Vec::new());
196        }
197        let blob = self
198            .manager
199            .get(section)?
200            .ok_or(Error::SectionOutOfRange(section))?;
201
202        let offsets: Vec<u64> = positions
203            .iter()
204            .map(|&p| {
205                p.checked_mul(Self::CHUNK_SIZE_U64)
206                    .ok_or(Error::ItemOutOfRange(p))
207            })
208            .collect::<Result<_, _>>()?;
209
210        blob.read_many_into(buf, &offsets, Self::CHUNK_SIZE).await?;
211
212        let mut items = Vec::with_capacity(positions.len());
213        for i in 0..positions.len() {
214            let slice = &buf[i * Self::CHUNK_SIZE..(i + 1) * Self::CHUNK_SIZE];
215            items.push(A::decode(slice).map_err(Error::Codec)?);
216        }
217        Ok(items)
218    }
219
220    /// Get an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
221    pub fn try_get_sync(&self, section: u64, position: u64) -> Option<A> {
222        let mut buf = vec![0u8; Self::CHUNK_SIZE];
223        self.try_get_sync_into(section, position, &mut buf)
224    }
225
226    /// Get an item synchronously using caller-provided buffer.
227    ///
228    /// `buf` must be at least [Self::CHUNK_SIZE] bytes.
229    ///
230    /// # Panics
231    ///
232    /// Panics if `buf` is smaller than [Self::CHUNK_SIZE].
233    pub fn try_get_sync_into(&self, section: u64, position: u64, buf: &mut [u8]) -> Option<A> {
234        assert!(
235            buf.len() >= Self::CHUNK_SIZE,
236            "try_get_sync_into requires buf.len() >= CHUNK_SIZE"
237        );
238        let blob = self.manager.get(section).ok()??;
239        let offset = position.checked_mul(Self::CHUNK_SIZE_U64)?;
240        let remaining = blob.try_size()?.checked_sub(offset)?;
241        if remaining < Self::CHUNK_SIZE_U64 {
242            return None;
243        }
244        let buf = &mut buf[..Self::CHUNK_SIZE];
245        if !blob.try_read_sync(offset, buf) {
246            return None;
247        }
248        A::decode(&buf[..]).ok()
249    }
250
251    /// Read the last item in a section, if any.
252    ///
253    /// Returns `Ok(None)` if the section is empty.
254    ///
255    /// # Errors
256    ///
257    /// - [Error::AlreadyPrunedToSection] if the section has been pruned.
258    /// - [Error::SectionOutOfRange] if the section doesn't exist.
259    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
260        let blob = self
261            .manager
262            .get(section)?
263            .ok_or(Error::SectionOutOfRange(section))?;
264
265        let size = blob.size().await;
266        if size < Self::CHUNK_SIZE_U64 {
267            return Ok(None);
268        }
269
270        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
271        let offset = last_position * Self::CHUNK_SIZE_U64;
272        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
273        A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
274    }
275
276    /// Returns a stream of all items starting from the given section.
277    ///
278    /// Each item is returned as (section, position, item).
279    pub async fn replay(
280        &self,
281        start_section: u64,
282        start_position: u64,
283        buffer: NonZeroUsize,
284    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
285        // Pre-create readers from blobs (async operation)
286        let mut blob_info = Vec::new();
287        for (&section, blob) in self.manager.sections_from(start_section) {
288            let blob_size = blob.size().await;
289            let mut replay = blob.replay(buffer).await?;
290            // For the first section, seek to the start position
291            let initial_position = if section == start_section {
292                let start = start_position * Self::CHUNK_SIZE_U64;
293                if start > blob_size {
294                    return Err(Error::ItemOutOfRange(start_position));
295                }
296                replay.seek_to(start)?;
297                start_position
298            } else {
299                0
300            };
301            blob_info.push((section, replay, initial_position));
302        }
303
304        // Stream items as they are read to avoid occupying too much memory.
305        // Each blob is processed sequentially, yielding batches of items that are then
306        // flattened into individual stream elements.
307        Ok(
308            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
309                stream::unfold(
310                    ReplayState {
311                        section,
312                        replay,
313                        position: initial_position,
314                        done: false,
315                    },
316                    move |mut state| async move {
317                        if state.done {
318                            return None;
319                        }
320
321                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
322                        loop {
323                            // Ensure we have enough data for one item
324                            match state.replay.ensure(Self::CHUNK_SIZE).await {
325                                Ok(true) => {}
326                                Ok(false) => {
327                                    // Reader exhausted - we're done with this blob
328                                    state.done = true;
329                                    return if batch.is_empty() {
330                                        None
331                                    } else {
332                                        Some((batch, state))
333                                    };
334                                }
335                                Err(err) => {
336                                    batch.push(Err(Error::Runtime(err)));
337                                    state.done = true;
338                                    return Some((batch, state));
339                                }
340                            }
341
342                            // Decode items from buffer
343                            while state.replay.remaining() >= Self::CHUNK_SIZE {
344                                match A::read(&mut state.replay) {
345                                    Ok(item) => {
346                                        batch.push(Ok((state.section, state.position, item)));
347                                        state.position += 1;
348                                    }
349                                    Err(err) => {
350                                        batch.push(Err(Error::Codec(err)));
351                                        state.done = true;
352                                        return Some((batch, state));
353                                    }
354                                }
355                            }
356
357                            // Return batch if we have items
358                            if !batch.is_empty() {
359                                return Some((batch, state));
360                            }
361                        }
362                    },
363                )
364                .flat_map(stream::iter)
365            }),
366        )
367    }
368
    /// Sync the given section to storage.
    ///
    /// Appended data may not be persisted until this (or `sync_all`) is called.
    /// Delegates to the section manager.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }
373
    /// Sync all sections to storage.
    ///
    /// Delegates to the section manager.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }
378
    /// Prune all sections less than `min`. Returns true if any were pruned.
    ///
    /// Section `min` itself is kept. Reads of a pruned section subsequently fail with
    /// [Error::AlreadyPrunedToSection].
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }
383
    /// Returns the oldest section number, if any blobs exist.
    ///
    /// Returns `None` when the journal has no sections.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }
388
    /// Returns the newest section number, if any blobs exist.
    ///
    /// Returns `None` when the journal has no sections.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }
393
394    /// Returns an iterator over all section numbers.
395    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
396        self.manager.sections_from(0).map(|(section, _)| *section)
397    }
398
399    /// Returns the number of items in the given section.
400    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
401        let size = self.manager.size(section).await?;
402        Ok(size / Self::CHUNK_SIZE_U64)
403    }
404
    /// Returns the byte size of the given section.
    ///
    /// This is a byte count, not an item count; see `section_len` for the latter.
    /// The rewind tests in this file expect `0` for a section removed by `rewind`.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }
409
    /// Rewind the journal to a specific section and byte offset.
    ///
    /// This truncates the section to the given size. All sections
    /// after `section` are removed.
    ///
    /// `offset` is in bytes, not items, and is passed to the manager unchanged.
    /// NOTE(review): nothing here checks that `offset` is a multiple of
    /// [Self::CHUNK_SIZE]; `append` rejects a section whose size is not.
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }
417
    /// Rewind only the given section to a specific byte offset.
    ///
    /// Unlike `rewind`, this does not affect other sections.
    ///
    /// `size` is the section's new size in bytes and is passed to the manager unchanged.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }
424
    /// Remove all underlying blobs.
    ///
    /// Consumes the journal; use `clear` to empty it while keeping it usable.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }
429
    /// Clear all data, resetting the journal to an empty state.
    ///
    /// Unlike `destroy`, this keeps the journal alive so it can be reused.
    /// Delegates to the section manager.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
436
    /// Ensure a section exists, creating an empty blob if needed.
    ///
    /// This is used to maintain the invariant that at least one blob always exists
    /// (the "tail" blob), which allows reconstructing journal size on reopen.
    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        // Only the get-or-create side effect matters; the returned blob handle is discarded.
        self.manager.get_or_create(section).await?;
        Ok(())
    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
451    use commonware_macros::test_traced;
452    use commonware_runtime::{
453        buffer::paged::CacheRef, deterministic, BufferPooler, Runner, Supervisor as _,
454    };
455    use commonware_utils::{NZUsize, NZU16};
456    use core::num::NonZeroU16;
457    use futures::{pin_mut, StreamExt};
458
    // Page size given to the page cache built by `test_cfg`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    // Capacity argument given to the page cache built by `test_cfg`
    // (presumably a page count; confirm against `CacheRef::from_pooler`).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
461
462    fn test_digest(value: u64) -> Digest {
463        Sha256::hash(&value.to_be_bytes())
464    }
465
466    fn test_cfg(pooler: &impl BufferPooler) -> Config {
467        Config {
468            partition: "test-partition".into(),
469            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
470            write_buffer: NZUsize!(2048),
471        }
472    }
473
474    #[test_traced]
475    fn test_segmented_fixed_append_and_get() {
476        let executor = deterministic::Runner::default();
477        executor.start(|context| async move {
478            let cfg = test_cfg(&context);
479            let mut journal = Journal::init(context.child("storage"), cfg.clone())
480                .await
481                .expect("failed to init");
482
483            let pos0 = journal
484                .append(1, &test_digest(0))
485                .await
486                .expect("failed to append");
487            assert_eq!(pos0, 0);
488
489            let pos1 = journal
490                .append(1, &test_digest(1))
491                .await
492                .expect("failed to append");
493            assert_eq!(pos1, 1);
494
495            let pos2 = journal
496                .append(2, &test_digest(2))
497                .await
498                .expect("failed to append");
499            assert_eq!(pos2, 0);
500
501            let item0 = journal.get(1, 0).await.expect("failed to get");
502            assert_eq!(item0, test_digest(0));
503
504            let item1 = journal.get(1, 1).await.expect("failed to get");
505            assert_eq!(item1, test_digest(1));
506
507            let item2 = journal.get(2, 0).await.expect("failed to get");
508            assert_eq!(item2, test_digest(2));
509
510            let err = journal.get(1, 2).await;
511            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));
512
513            let err = journal.get(3, 0).await;
514            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));
515
516            journal.destroy().await.expect("failed to destroy");
517        });
518    }
519
    // Writes two sections, reopens the journal, and checks that a full replay yields
    // every item in section order with per-section positions.
    #[test_traced]
    fn test_segmented_fixed_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Items 0-9 go to section 1, items 10-19 to section 2.
            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }

            // Persist, then drop so the data is read back through a fresh instance.
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // The stream borrows `journal`, so it is confined to this block and is gone
            // before `destroy` consumes the journal.
            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            // First ten: section 1, positions 0-9.
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            // Last ten: section 2, positions restart at 0 while digest seeds continue.
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
581
    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        // Test that replay with a non-zero start_position correctly skips items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Append 10 items to section 1
            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            // Append 5 items to section 2
            for i in 10u64..15 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            // Persist and reopen so replay reads through a fresh instance.
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Each replay below lives in its own block: the stream borrows `journal`
            // and must be dropped before the journal is destroyed at the end.

            // Replay from section 1, position 5 - should get items 5-9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                // Check section 1 items (positions 5-9)
                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                // Check section 2 items (positions 0-4)
                // Index i (5..10) maps to position i - 5 and digest seed i + 5 (10..15).
                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Replay from section 1, position 9 - should get only item 9 from section 1 and all of section 2
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                // Index i (1..6) maps to position i - 1 and digest seed i + 9 (10..15).
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Replay from section 2, position 3 - should get only items 3-4 from section 2
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // Replay from position past the end should return ItemOutOfRange error
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            // `result`'s type borrows `journal`; release it before `destroy`.
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }
702
703    #[test_traced]
704    fn test_segmented_fixed_prune() {
705        let executor = deterministic::Runner::default();
706        executor.start(|context| async move {
707            let cfg = test_cfg(&context);
708            let mut journal = Journal::init(context.child("storage"), cfg.clone())
709                .await
710                .expect("failed to init");
711
712            for section in 1u64..=5 {
713                journal
714                    .append(section, &test_digest(section))
715                    .await
716                    .expect("failed to append");
717            }
718            journal.sync_all().await.expect("failed to sync");
719
720            journal.prune(3).await.expect("failed to prune");
721
722            let err = journal.get(1, 0).await;
723            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
724
725            let err = journal.get(2, 0).await;
726            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
727
728            let item = journal.get(3, 0).await.expect("should exist");
729            assert_eq!(item, test_digest(3));
730
731            journal.destroy().await.expect("failed to destroy");
732        });
733    }
734
735    #[test_traced]
736    fn test_segmented_fixed_rewind() {
737        let executor = deterministic::Runner::default();
738        executor.start(|context| async move {
739            let cfg = test_cfg(&context);
740            let mut journal = Journal::init(context.child("storage"), cfg.clone())
741                .await
742                .expect("failed to init");
743
744            // Create sections 1, 2, 3
745            for section in 1u64..=3 {
746                journal
747                    .append(section, &test_digest(section))
748                    .await
749                    .expect("failed to append");
750            }
751            journal.sync_all().await.expect("failed to sync");
752
753            // Verify all sections exist
754            for section in 1u64..=3 {
755                let size = journal.size(section).await.expect("failed to get size");
756                assert!(size > 0, "section {section} should have data");
757            }
758
759            // Rewind to section 1 (should remove sections 2, 3)
760            let size = journal.size(1).await.expect("failed to get size");
761            journal.rewind(1, size).await.expect("failed to rewind");
762
763            // Verify section 1 still has data
764            let size = journal.size(1).await.expect("failed to get size");
765            assert!(size > 0, "section 1 should still have data");
766
767            // Verify sections 2, 3 are removed
768            for section in 2u64..=3 {
769                let size = journal.size(section).await.expect("failed to get size");
770                assert_eq!(size, 0, "section {section} should be removed");
771            }
772
773            // Verify data in section 1 is still readable
774            let item = journal.get(1, 0).await.expect("failed to get");
775            assert_eq!(item, test_digest(1));
776
777            journal.destroy().await.expect("failed to destroy");
778        });
779    }
780
    // Rewinds a ten-section journal to section 5 and checks, via sizes and a full
    // replay, that exactly sections 1-5 remain intact.
    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.child("storage"), cfg.clone())
                .await
                .expect("failed to init");

            // Create sections 1-10
            for section in 1u64..=10 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // Rewind to section 5 (should remove sections 6-10)
            // Passing section 5's current size leaves its contents untouched.
            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            // Verify sections 1-5 still have data
            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            // Verify sections 6-10 are removed
            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Verify data integrity via replay
            // The stream borrows `journal`, so it is scoped to end before `destroy`.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                // One item per surviving section, in section order.
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
837
838    #[test_traced]
839    fn test_segmented_fixed_rewind_persistence() {
840        let executor = deterministic::Runner::default();
841        executor.start(|context| async move {
842            let cfg = test_cfg(&context);
843
844            // Create sections 1-5
845            let mut journal = Journal::init(context.child("first"), cfg.clone())
846                .await
847                .expect("failed to init");
848            for section in 1u64..=5 {
849                journal
850                    .append(section, &test_digest(section))
851                    .await
852                    .expect("failed to append");
853            }
854            journal.sync_all().await.expect("failed to sync");
855
856            // Rewind to section 2
857            let size = journal.size(2).await.expect("failed to get size");
858            journal.rewind(2, size).await.expect("failed to rewind");
859            journal.sync_all().await.expect("failed to sync");
860            drop(journal);
861
862            // Re-init and verify only sections 1-2 exist
863            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
864                .await
865                .expect("failed to re-init");
866
867            // Verify sections 1-2 have data
868            for section in 1u64..=2 {
869                let size = journal.size(section).await.expect("failed to get size");
870                assert!(size > 0, "section {section} should have data after restart");
871            }
872
873            // Verify sections 3-5 are gone
874            for section in 3u64..=5 {
875                let size = journal.size(section).await.expect("failed to get size");
876                assert_eq!(size, 0, "section {section} should be gone after restart");
877            }
878
879            // Verify data integrity
880            let item1 = journal.get(1, 0).await.expect("failed to get");
881            assert_eq!(item1, test_digest(1));
882            let item2 = journal.get(2, 0).await.expect("failed to get");
883            assert_eq!(item2, test_digest(2));
884
885            journal.destroy().await.expect("failed to destroy");
886        });
887    }
888
889    #[test_traced]
890    fn test_segmented_fixed_corruption_recovery() {
891        let executor = deterministic::Runner::default();
892        executor.start(|context| async move {
893            let cfg = test_cfg(&context);
894            let mut journal = Journal::init(context.child("first"), cfg.clone())
895                .await
896                .expect("failed to init");
897
898            for i in 0u64..5 {
899                journal
900                    .append(1, &test_digest(i))
901                    .await
902                    .expect("failed to append");
903            }
904            journal.sync_all().await.expect("failed to sync");
905            drop(journal);
906
907            let (blob, size) = context
908                .open(&cfg.partition, &1u64.to_be_bytes())
909                .await
910                .expect("failed to open blob");
911            blob.resize(size - 1).await.expect("failed to truncate");
912            blob.sync().await.expect("failed to sync");
913
914            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
915                .await
916                .expect("failed to re-init");
917
918            let count = {
919                let stream = journal
920                    .replay(0, 0, NZUsize!(1024))
921                    .await
922                    .expect("failed to replay");
923                pin_mut!(stream);
924
925                let mut count = 0;
926                while let Some(result) = stream.next().await {
927                    result.expect("should be ok");
928                    count += 1;
929                }
930                count
931            };
932            assert_eq!(count, 4);
933
934            journal.destroy().await.expect("failed to destroy");
935        });
936    }
937
938    #[test_traced]
939    fn test_segmented_fixed_persistence() {
940        let executor = deterministic::Runner::default();
941        executor.start(|context| async move {
942            let cfg = test_cfg(&context);
943
944            // Create and populate journal
945            let mut journal = Journal::init(context.child("first"), cfg.clone())
946                .await
947                .expect("failed to init");
948
949            for i in 0u64..5 {
950                journal
951                    .append(1, &test_digest(i))
952                    .await
953                    .expect("failed to append");
954            }
955            journal.sync_all().await.expect("failed to sync");
956            drop(journal);
957
958            // Reopen and verify data persisted
959            let journal = Journal::<_, Digest>::init(context.child("second"), cfg)
960                .await
961                .expect("failed to re-init");
962
963            for i in 0u64..5 {
964                let item = journal.get(1, i).await.expect("failed to get");
965                assert_eq!(item, test_digest(i));
966            }
967
968            journal.destroy().await.expect("failed to destroy");
969        });
970    }
971
972    #[test_traced]
973    fn test_segmented_fixed_section_len() {
974        let executor = deterministic::Runner::default();
975        executor.start(|context| async move {
976            let cfg = test_cfg(&context);
977            let mut journal = Journal::init(context.child("storage"), cfg.clone())
978                .await
979                .expect("failed to init");
980
981            assert_eq!(journal.section_len(1).await.unwrap(), 0);
982
983            for i in 0u64..5 {
984                journal
985                    .append(1, &test_digest(i))
986                    .await
987                    .expect("failed to append");
988            }
989
990            assert_eq!(journal.section_len(1).await.unwrap(), 5);
991            assert_eq!(journal.section_len(2).await.unwrap(), 0);
992
993            journal.destroy().await.expect("failed to destroy");
994        });
995    }
996
997    #[test_traced]
998    fn test_segmented_fixed_non_contiguous_sections() {
999        // Test that sections with gaps in numbering work correctly.
1000        // Sections 1, 5, 10 should all be independent and accessible.
1001        let executor = deterministic::Runner::default();
1002        executor.start(|context| async move {
1003            let cfg = test_cfg(&context);
1004            let mut journal = Journal::init(context.child("first"), cfg.clone())
1005                .await
1006                .expect("failed to init");
1007
1008            // Create sections with gaps: 1, 5, 10
1009            journal
1010                .append(1, &test_digest(100))
1011                .await
1012                .expect("failed to append");
1013            journal
1014                .append(5, &test_digest(500))
1015                .await
1016                .expect("failed to append");
1017            journal
1018                .append(10, &test_digest(1000))
1019                .await
1020                .expect("failed to append");
1021            journal.sync_all().await.expect("failed to sync");
1022
1023            // Verify random access to each section
1024            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
1025            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
1026            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));
1027
1028            // Verify non-existent sections return appropriate errors
1029            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
1030                let result = journal.get(missing_section, 0).await;
1031                assert!(
1032                    matches!(result, Err(Error::SectionOutOfRange(_))),
1033                    "Expected SectionOutOfRange for section {}, got {:?}",
1034                    missing_section,
1035                    result
1036                );
1037            }
1038
1039            // Drop and reopen to test replay
1040            drop(journal);
1041            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1042                .await
1043                .expect("failed to re-init");
1044
1045            // Replay and verify all items in order
1046            {
1047                let stream = journal
1048                    .replay(0, 0, NZUsize!(1024))
1049                    .await
1050                    .expect("failed to replay");
1051                pin_mut!(stream);
1052
1053                let mut items = Vec::new();
1054                while let Some(result) = stream.next().await {
1055                    let (section, _, item) = result.expect("replay error");
1056                    items.push((section, item));
1057                }
1058
1059                assert_eq!(items.len(), 3, "Should have 3 items");
1060                assert_eq!(items[0], (1, test_digest(100)));
1061                assert_eq!(items[1], (5, test_digest(500)));
1062                assert_eq!(items[2], (10, test_digest(1000)));
1063            }
1064
1065            // Test replay starting from middle section (5)
1066            {
1067                let stream = journal
1068                    .replay(5, 0, NZUsize!(1024))
1069                    .await
1070                    .expect("failed to replay from section 5");
1071                pin_mut!(stream);
1072
1073                let mut items = Vec::new();
1074                while let Some(result) = stream.next().await {
1075                    let (section, _, item) = result.expect("replay error");
1076                    items.push((section, item));
1077                }
1078
1079                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
1080                assert_eq!(items[0], (5, test_digest(500)));
1081                assert_eq!(items[1], (10, test_digest(1000)));
1082            }
1083
1084            journal.destroy().await.expect("failed to destroy");
1085        });
1086    }
1087
1088    #[test_traced]
1089    fn test_segmented_fixed_empty_section_in_middle() {
1090        // Test that replay correctly handles an empty section between sections with data.
1091        // Section 1 has data, section 2 is empty, section 3 has data.
1092        let executor = deterministic::Runner::default();
1093        executor.start(|context| async move {
1094            let cfg = test_cfg(&context);
1095            let mut journal = Journal::init(context.child("first"), cfg.clone())
1096                .await
1097                .expect("failed to init");
1098
1099            // Append to section 1
1100            journal
1101                .append(1, &test_digest(100))
1102                .await
1103                .expect("failed to append");
1104
1105            // Create section 2 but make it empty via rewind
1106            journal
1107                .append(2, &test_digest(200))
1108                .await
1109                .expect("failed to append");
1110            journal.sync(2).await.expect("failed to sync");
1111            journal
1112                .rewind_section(2, 0)
1113                .await
1114                .expect("failed to rewind");
1115
1116            // Append to section 3
1117            journal
1118                .append(3, &test_digest(300))
1119                .await
1120                .expect("failed to append");
1121
1122            journal.sync_all().await.expect("failed to sync");
1123
1124            // Verify section lengths
1125            assert_eq!(journal.section_len(1).await.unwrap(), 1);
1126            assert_eq!(journal.section_len(2).await.unwrap(), 0);
1127            assert_eq!(journal.section_len(3).await.unwrap(), 1);
1128
1129            // Drop and reopen to test replay
1130            drop(journal);
1131            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1132                .await
1133                .expect("failed to re-init");
1134
1135            // Replay all - should get items from sections 1 and 3, skipping empty section 2
1136            {
1137                let stream = journal
1138                    .replay(0, 0, NZUsize!(1024))
1139                    .await
1140                    .expect("failed to replay");
1141                pin_mut!(stream);
1142
1143                let mut items = Vec::new();
1144                while let Some(result) = stream.next().await {
1145                    let (section, _, item) = result.expect("replay error");
1146                    items.push((section, item));
1147                }
1148
1149                assert_eq!(
1150                    items.len(),
1151                    2,
1152                    "Should have 2 items (skipping empty section)"
1153                );
1154                assert_eq!(items[0], (1, test_digest(100)));
1155                assert_eq!(items[1], (3, test_digest(300)));
1156            }
1157
1158            // Replay starting from empty section 2 - should get only section 3
1159            {
1160                let stream = journal
1161                    .replay(2, 0, NZUsize!(1024))
1162                    .await
1163                    .expect("failed to replay from section 2");
1164                pin_mut!(stream);
1165
1166                let mut items = Vec::new();
1167                while let Some(result) = stream.next().await {
1168                    let (section, _, item) = result.expect("replay error");
1169                    items.push((section, item));
1170                }
1171
1172                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
1173                assert_eq!(items[0], (3, test_digest(300)));
1174            }
1175
1176            journal.destroy().await.expect("failed to destroy");
1177        });
1178    }
1179
1180    #[test_traced]
1181    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
1182        // Test that truncating a single byte from a blob that has items straddling a page boundary
1183        // correctly recovers by removing the incomplete item.
1184        //
1185        // With PAGE_SIZE=44 and ITEM_SIZE=32:
1186        // - Item 0: bytes 0-31
1187        // - Item 1: bytes 32-63 (straddles page boundary at 44)
1188        // - Item 2: bytes 64-95 (straddles page boundary at 88)
1189        //
1190        // After 3 items we have 96 bytes = 2 full pages + 8 bytes. Truncating 1 byte leaves 95
1191        // bytes, which is not a multiple of 32. Recovery should truncate to 64 bytes (2 complete
1192        // items).
1193        let executor = deterministic::Runner::default();
1194        executor.start(|context| async move {
1195            let cfg = test_cfg(&context);
1196            let mut journal = Journal::init(context.child("first"), cfg.clone())
1197                .await
1198                .expect("failed to init");
1199
1200            // Append 3 items (just over 2 pages worth)
1201            for i in 0u64..3 {
1202                journal
1203                    .append(1, &test_digest(i))
1204                    .await
1205                    .expect("failed to append");
1206            }
1207            journal.sync_all().await.expect("failed to sync");
1208
1209            // Verify all 3 items are readable
1210            for i in 0u64..3 {
1211                let item = journal.get(1, i).await.expect("failed to get");
1212                assert_eq!(item, test_digest(i));
1213            }
1214            drop(journal);
1215
1216            // Truncate the blob by exactly 1 byte to simulate partial write
1217            let (blob, size) = context
1218                .open(&cfg.partition, &1u64.to_be_bytes())
1219                .await
1220                .expect("failed to open blob");
1221            blob.resize(size - 1).await.expect("failed to truncate");
1222            blob.sync().await.expect("failed to sync");
1223            drop(blob);
1224
1225            // Reopen journal - should recover by truncating last page due to failed checksum, and
1226            // end up with a correct blob size due to partial-item trimming.
1227            let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1228                .await
1229                .expect("failed to re-init");
1230
1231            // Verify section now has only 2 items
1232            assert_eq!(journal.section_len(1).await.unwrap(), 2);
1233
1234            // Verify size is the expected multiple of ITEM_SIZE (this would fail if we didn't trim
1235            // items and just relied on page-level checksum recovery).
1236            assert_eq!(journal.size(1).await.unwrap(), 64);
1237
1238            // Items 0 and 1 should still be readable
1239            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
1240            assert_eq!(item0, test_digest(0));
1241            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
1242            assert_eq!(item1, test_digest(1));
1243
1244            // Item 2 should return ItemOutOfRange
1245            let err = journal.get(1, 2).await;
1246            assert!(
1247                matches!(err, Err(Error::ItemOutOfRange(2))),
1248                "expected ItemOutOfRange(2), got {:?}",
1249                err
1250            );
1251
1252            journal.destroy().await.expect("failed to destroy");
1253        });
1254    }
1255
1256    #[test_traced]
1257    fn test_journal_clear() {
1258        let executor = deterministic::Runner::default();
1259        executor.start(|context| async move {
1260            let cfg = Config {
1261                partition: "clear-test".into(),
1262                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1263                write_buffer: NZUsize!(1024),
1264            };
1265
1266            let mut journal: Journal<_, Digest> =
1267                Journal::init(context.child("journal"), cfg.clone())
1268                    .await
1269                    .expect("Failed to initialize journal");
1270
1271            // Append items across multiple sections
1272            for section in 0..5u64 {
1273                for i in 0..10u64 {
1274                    journal
1275                        .append(section, &test_digest(section * 1000 + i))
1276                        .await
1277                        .expect("Failed to append");
1278                }
1279                journal.sync(section).await.expect("Failed to sync");
1280            }
1281
1282            // Verify we have data
1283            assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
1284            assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));
1285
1286            // Clear the journal
1287            journal.clear().await.expect("Failed to clear");
1288
1289            // After clear, all reads should fail
1290            for section in 0..5u64 {
1291                assert!(matches!(
1292                    journal.get(section, 0).await,
1293                    Err(Error::SectionOutOfRange(s)) if s == section
1294                ));
1295            }
1296
1297            // Append new data after clear
1298            for i in 0..5u64 {
1299                journal
1300                    .append(10, &test_digest(i * 100))
1301                    .await
1302                    .expect("Failed to append after clear");
1303            }
1304            journal.sync(10).await.expect("Failed to sync after clear");
1305
1306            // New data should be readable
1307            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));
1308
1309            // Old sections should still be missing
1310            assert!(matches!(
1311                journal.get(0, 0).await,
1312                Err(Error::SectionOutOfRange(0))
1313            ));
1314
1315            journal.destroy().await.unwrap();
1316        });
1317    }
1318
1319    #[test_traced]
1320    fn test_last_missing_section_returns_error() {
1321        let executor = deterministic::Runner::default();
1322        executor.start(|context| async move {
1323            let cfg = test_cfg(&context);
1324            let journal = Journal::<_, Digest>::init(context.child("storage"), cfg.clone())
1325                .await
1326                .expect("failed to init");
1327
1328            assert!(matches!(
1329                journal.last(0).await,
1330                Err(Error::SectionOutOfRange(0))
1331            ));
1332            assert!(matches!(
1333                journal.last(99).await,
1334                Err(Error::SectionOutOfRange(99))
1335            ));
1336
1337            journal.destroy().await.unwrap();
1338        });
1339    }
1340
1341    #[test_traced]
1342    fn test_last_after_rewind_to_zero() {
1343        let executor = deterministic::Runner::default();
1344        executor.start(|context| async move {
1345            let cfg = test_cfg(&context);
1346            let mut journal = Journal::init(context.child("storage"), cfg.clone())
1347                .await
1348                .expect("failed to init");
1349
1350            journal.append(0, &test_digest(0)).await.unwrap();
1351            journal.append(0, &test_digest(1)).await.unwrap();
1352            journal.sync(0).await.unwrap();
1353
1354            assert!(journal.last(0).await.unwrap().is_some());
1355
1356            journal.rewind(0, 0).await.unwrap();
1357            assert_eq!(journal.last(0).await.unwrap(), None);
1358
1359            journal.destroy().await.unwrap();
1360        });
1361    }
1362
1363    #[test_traced]
1364    fn test_last_pruned_section_returns_error() {
1365        let executor = deterministic::Runner::default();
1366        executor.start(|context| async move {
1367            let cfg = test_cfg(&context);
1368            let mut journal = Journal::<_, Digest>::init(context.child("storage"), cfg.clone())
1369                .await
1370                .expect("failed to init");
1371
1372            journal.append(0, &test_digest(0)).await.unwrap();
1373            journal.append(1, &test_digest(1)).await.unwrap();
1374            journal.sync_all().await.unwrap();
1375
1376            journal.prune(1).await.unwrap();
1377
1378            assert!(matches!(
1379                journal.last(0).await,
1380                Err(Error::AlreadyPrunedToSection(1))
1381            ));
1382            assert!(journal.last(1).await.unwrap().is_some());
1383
1384            journal.destroy().await.unwrap();
1385        });
1386    }
1387
1388    #[test_traced]
1389    fn test_get_many_empty() {
1390        let executor = deterministic::Runner::default();
1391        executor.start(|context| async move {
1392            let cfg = test_cfg(&context);
1393            let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1394            journal.append(0, &test_digest(0)).await.unwrap();
1395            assert_eq!(journal.section_len(0).await.unwrap(), 1);
1396
1397            let mut buf = [];
1398            let items = journal.get_many(0, &[], &mut buf).await.unwrap();
1399            assert!(items.is_empty());
1400
1401            journal.destroy().await.unwrap();
1402        });
1403    }
1404
1405    #[test_traced]
1406    fn test_get_many_single_section() {
1407        let executor = deterministic::Runner::default();
1408        executor.start(|context| async move {
1409            let cfg = test_cfg(&context);
1410            let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1411
1412            for i in 0..5 {
1413                journal.append(0, &test_digest(i)).await.unwrap();
1414            }
1415            assert_eq!(journal.section_len(0).await.unwrap(), 5);
1416
1417            // Read all 5 items in one call.
1418            let chunk = Journal::<deterministic::Context, Digest>::CHUNK_SIZE;
1419            let mut buf = vec![0u8; 5 * chunk];
1420            let items = journal
1421                .get_many(0, &[0, 1, 2, 3, 4], &mut buf)
1422                .await
1423                .unwrap();
1424
1425            for (i, item) in items.iter().enumerate() {
1426                assert_eq!(*item, test_digest(i as u64));
1427            }
1428
1429            journal.destroy().await.unwrap();
1430        });
1431    }
1432
1433    #[test_traced]
1434    fn test_get_many_subset() {
1435        // Read a sparse subset of positions.
1436        let executor = deterministic::Runner::default();
1437        executor.start(|context| async move {
1438            let cfg = test_cfg(&context);
1439            let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1440
1441            for i in 0..10 {
1442                journal.append(0, &test_digest(i)).await.unwrap();
1443            }
1444            assert_eq!(journal.section_len(0).await.unwrap(), 10);
1445
1446            let chunk = Journal::<deterministic::Context, Digest>::CHUNK_SIZE;
1447            let positions = [1, 4, 7, 9];
1448            let mut buf = vec![0u8; positions.len() * chunk];
1449            let items = journal.get_many(0, &positions, &mut buf).await.unwrap();
1450
1451            for (i, &pos) in positions.iter().enumerate() {
1452                assert_eq!(items[i], test_digest(pos));
1453            }
1454
1455            journal.destroy().await.unwrap();
1456        });
1457    }
1458
1459    #[test_traced]
1460    fn test_get_many_bad_section() {
1461        let executor = deterministic::Runner::default();
1462        executor.start(|context| async move {
1463            let cfg = test_cfg(&context);
1464            let journal = Journal::<_, Digest>::init(context.child("storage"), cfg)
1465                .await
1466                .unwrap();
1467
1468            let mut buf = vec![0u8; 64];
1469            let err = journal.get_many(99, &[0], &mut buf).await.unwrap_err();
1470            assert!(matches!(err, Error::SectionOutOfRange(99)));
1471
1472            journal.destroy().await.unwrap();
1473        });
1474    }
1475
1476    #[test_traced]
1477    fn test_get_many_matches_get() {
1478        // Verify batch read matches individual reads.
1479        let executor = deterministic::Runner::default();
1480        executor.start(|context| async move {
1481            let cfg = test_cfg(&context);
1482            let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1483
1484            for i in 0..8 {
1485                journal.append(0, &test_digest(i)).await.unwrap();
1486            }
1487            assert_eq!(journal.section_len(0).await.unwrap(), 8);
1488            journal.sync_all().await.unwrap();
1489
1490            let chunk = Journal::<deterministic::Context, Digest>::CHUNK_SIZE;
1491            let positions: Vec<u64> = (0..8).collect();
1492            let mut buf = vec![0u8; positions.len() * chunk];
1493            let batch = journal.get_many(0, &positions, &mut buf).await.unwrap();
1494
1495            for pos in &positions {
1496                let single = journal.get(0, *pos).await.unwrap();
1497                assert_eq!(batch[*pos as usize], single);
1498            }
1499
1500            journal.destroy().await.unwrap();
1501        });
1502    }
1503}