commonware_storage/adb/any/variable/
sync.rs

1use crate::journal::{
2    variable::{Config as VConfig, Journal as VJournal},
3    Error,
4};
5use commonware_codec::Codec;
6use commonware_runtime::{Metrics, Storage};
7use std::{num::NonZeroU64, ops::Bound};
8use tracing::debug;
9
10/// Initialize a Variable journal for use in state sync.
11///
12/// The bounds are item locations (not section numbers). This function prepares the
13/// on-disk journal so that subsequent appends go to the correct physical location for the
14/// requested range.
15///
16/// Behavior by existing on-disk state:
17/// - Fresh (no data): returns an empty journal.
18/// - Stale (all data strictly before `lower_bound`): destroys existing data and returns an
19///   empty journal.
20/// - Overlap within [`lower_bound`, `upper_bound`]:
21///   - Prunes sections strictly below `lower_bound / items_per_section` (section-aligned).
22///   - Removes any sections strictly greater than `upper_bound / items_per_section`.
23///   - Truncates the final retained section so that no item with location greater
24///     than `upper_bound` remains.
25///
26/// Note that lower-bound pruning is section-aligned. This means the first retained section may
27/// still contain items whose locations are < `lower_bound`. Callers should ignore these.
28///
29/// # Arguments
30/// - `context`: storage context
31/// - `cfg`: journal configuration
32/// - `lower_bound`: first item location to retain (inclusive)
33/// - `upper_bound`: last item location to retain (inclusive)
34/// - `items_per_section`: number of items per section
35///
36/// # Returns
37/// A journal whose sections satisfy:
38/// - No section index < `lower_bound / items_per_section` exists.
39/// - No section index > `upper_bound / items_per_section` exists.
40/// - The last retained section is truncated so that its last item’s location is `<= upper_bound`.
41pub(crate) async fn init_journal<E: Storage + Metrics, V: Codec>(
42    context: E,
43    cfg: VConfig<V::Cfg>,
44    lower_bound: u64,
45    upper_bound: u64,
46    items_per_section: NonZeroU64,
47) -> Result<VJournal<E, V>, Error> {
48    if lower_bound > upper_bound {
49        return Err(Error::InvalidSyncRange(lower_bound, upper_bound));
50    }
51
52    // Calculate the section ranges based on item locations
53    let items_per_section = items_per_section.get();
54    let lower_section = lower_bound / items_per_section;
55    let upper_section = upper_bound / items_per_section;
56
57    debug!(
58        lower_bound,
59        upper_bound,
60        lower_section,
61        upper_section,
62        items_per_section = items_per_section,
63        "initializing variable journal"
64    );
65
66    // Initialize the base journal to see what existing data we have
67    let mut journal = VJournal::init(context.clone(), cfg.clone()).await?;
68
69    let last_section = journal.blobs.last_key_value().map(|(&s, _)| s);
70
71    // No existing data
72    let Some(last_section) = last_section else {
73        debug!("no existing journal data, creating fresh journal");
74        return Ok(journal);
75    };
76
77    // If all existing data is before our sync range, destroy and recreate fresh
78    if last_section < lower_section {
79        debug!(
80            last_section,
81            lower_section, "existing journal data is stale, re-initializing"
82        );
83        journal.destroy().await?;
84        return VJournal::init(context, cfg).await;
85    }
86
87    // Prune sections below the lower bound.
88    if lower_section > 0 {
89        journal.prune(lower_section).await?;
90    }
91
92    // Remove any sections beyond the upper bound
93    if last_section > upper_section {
94        debug!(
95            last_section,
96            lower_section,
97            upper_section,
98            "existing journal data exceeds sync range, removing sections beyond upper bound"
99        );
100
101        let sections_to_remove: Vec<u64> = journal
102            .blobs
103            .range((Bound::Excluded(upper_section), Bound::Unbounded))
104            .map(|(&section, _)| section)
105            .collect();
106
107        for section in sections_to_remove {
108            debug!(section, "removing section beyond upper bound");
109            if let Some(blob) = journal.blobs.remove(&section) {
110                drop(blob);
111                let name = section.to_be_bytes();
112                journal
113                    .context
114                    .remove(&journal.cfg.partition, Some(&name))
115                    .await?;
116                journal.tracked.dec();
117            }
118        }
119    }
120
121    // Remove any items beyond upper_bound
122    truncate_upper_section(&mut journal, upper_bound, items_per_section).await?;
123
124    Ok(journal)
125}
126
127/// Remove items beyond the `upper_bound` location (inclusive).
128/// Assumes each section contains `items_per_section` items.
129async fn truncate_upper_section<E: Storage + Metrics, V: Codec>(
130    journal: &mut VJournal<E, V>,
131    upper_bound: u64,
132    items_per_section: u64,
133) -> Result<(), Error> {
134    // Find which section contains the upper_bound item
135    let upper_section = upper_bound / items_per_section;
136    let Some(blob) = journal.blobs.get(&upper_section) else {
137        return Ok(()); // Section doesn't exist, nothing to truncate
138    };
139
140    // Calculate the logical item range for this section
141    let section_start = upper_section * items_per_section;
142    let section_end = section_start + items_per_section - 1;
143
144    // If upper_bound is at the very end of the section, no truncation needed
145    if upper_bound >= section_end {
146        return Ok(());
147    }
148
149    // Calculate how many items to keep (upper_bound is inclusive)
150    let items_to_keep = (upper_bound - section_start + 1) as u32;
151    debug!(
152        upper_section,
153        upper_bound,
154        section_start,
155        section_end,
156        items_to_keep,
157        "truncating section to remove items beyond upper_bound"
158    );
159
160    // Find where to rewind to (after the last item we want to keep)
161    let target_byte_size = compute_offset::<E, V>(
162        blob,
163        &journal.cfg.codec_config,
164        journal.cfg.compression.is_some(),
165        items_to_keep,
166    )
167    .await?;
168
169    // Rewind to the appropriate position to remove items beyond the upper bound
170    journal
171        .rewind_section(upper_section, target_byte_size)
172        .await?;
173
174    debug!(
175        upper_section,
176        items_to_keep, target_byte_size, "section truncated"
177    );
178
179    Ok(())
180}
181
182/// Return the byte offset of the next element after `items_count` elements of `blob`.
183async fn compute_offset<E: Storage + Metrics, V: Codec>(
184    blob: &commonware_runtime::buffer::Append<E::Blob>,
185    codec_config: &V::Cfg,
186    compressed: bool,
187    items_count: u32,
188) -> Result<u64, Error> {
189    use crate::journal::variable::{Journal, ITEM_ALIGNMENT};
190
191    if items_count == 0 {
192        return Ok(0);
193    }
194
195    let mut current_offset = 0u32;
196
197    // Read through items one by one to find where each one ends
198    for _ in 0..items_count {
199        match Journal::<E, V>::read(compressed, codec_config, blob, current_offset).await {
200            Ok((next_slot, _item_len, _item)) => {
201                current_offset = next_slot;
202            }
203            Err(Error::Runtime(commonware_runtime::Error::BlobInsufficientLength)) => {
204                // This section has fewer than `items_count` items.
205                break;
206            }
207            Err(e) => return Err(e),
208        }
209    }
210
211    Ok((current_offset as u64) * ITEM_ALIGNMENT)
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use crate::journal::variable::ITEM_ALIGNMENT;
218    use commonware_macros::test_traced;
219    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _};
220    use commonware_utils::{NZUsize, NZU64};
221
222    // Use some jank sizes to exercise boundary conditions.
223    const PAGE_SIZE: usize = 101;
224    const PAGE_CACHE_SIZE: usize = 2;
225
226    /// Test `init_journal` when there is no existing data on disk.
227    #[test_traced]
228    fn test_init_journal_no_existing_data() {
229        let executor = deterministic::Runner::default();
230        executor.start(|context| async move {
231            let cfg = VConfig {
232                partition: "test_fresh_start".into(),
233                compression: None,
234                codec_config: (),
235                write_buffer: NZUsize!(1024),
236                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
237            };
238
239            // Initialize journal with sync boundaries when no existing data exists
240            let lower_bound = 10;
241            let upper_bound = 25;
242            let items_per_section = NZU64!(5);
243            let mut journal = init_journal(
244                context.clone(),
245                cfg.clone(),
246                lower_bound,
247                upper_bound,
248                items_per_section,
249            )
250            .await
251            .expect("Failed to initialize journal with sync boundaries");
252
253            // Verify the journal is ready for sync items
254            assert!(journal.blobs.is_empty()); // No sections created yet
255            assert_eq!(journal.oldest_section(), None); // No pruning applied
256
257            // Verify that items can be appended starting from the sync position
258            let lower_section = lower_bound / items_per_section; // 10/5 = 2
259
260            // Append an element
261            let (offset, _) = journal.append(lower_section, 42u64).await.unwrap();
262            assert_eq!(offset, 0); // First item in section
263
264            // Verify the item can be retrieved
265            let retrieved = journal.get(lower_section, offset).await.unwrap();
266            assert_eq!(retrieved, 42u64);
267
268            // Append another element
269            let (offset2, _) = journal.append(lower_section, 43u64).await.unwrap();
270            assert_eq!(journal.get(lower_section, offset2).await.unwrap(), 43u64);
271
272            journal.destroy().await.unwrap();
273        });
274    }
275
276    /// Test `init_journal` when there is existing data that overlaps with the sync target range.
277    #[test_traced]
278    fn test_init_journal_existing_data_overlap() {
279        let executor = deterministic::Runner::default();
280        executor.start(|context| async move {
281            let cfg = VConfig {
282                partition: "test_overlap".into(),
283                compression: None,
284                codec_config: (),
285                write_buffer: NZUsize!(1024),
286                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
287            };
288
289            // Create initial journal with data in multiple sections
290            let mut journal =
291                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
292                    .await
293                    .expect("Failed to create initial journal");
294
295            let items_per_section = NZU64!(5);
296
297            // Add data to sections 0, 1, 2, 3 (simulating items 0-19 with items_per_section=5)
298            for section in 0..4 {
299                for item in 0..items_per_section.get() {
300                    journal.append(section, section * 10 + item).await.unwrap();
301                }
302            }
303            journal.close().await.unwrap();
304
305            // Initialize with sync boundaries that overlap with existing data
306            // lower_bound: 8 (section 1), upper_bound: 30 (section 6)
307            let lower_bound = 8;
308            let upper_bound = 30;
309            let mut journal = init_journal(
310                context.clone(),
311                cfg.clone(),
312                lower_bound,
313                upper_bound,
314                items_per_section,
315            )
316            .await
317            .expect("Failed to initialize journal with overlap");
318
319            // Verify pruning: sections before lower_section are pruned
320            let lower_section = lower_bound / items_per_section; // 8/5 = 1
321            assert_eq!(lower_section, 1);
322            assert_eq!(journal.oldest_section(), Some(lower_section));
323
324            // Verify section 0 is pruned (< lower_section), section 1+ are retained (>= lower_section)
325            assert!(!journal.blobs.contains_key(&0)); // Section 0 should be pruned
326            assert!(journal.blobs.contains_key(&1)); // Section 1 should be retained (contains item 8)
327            assert!(journal.blobs.contains_key(&2)); // Section 2 should be retained
328            assert!(journal.blobs.contains_key(&3)); // Section 3 should be retained
329            assert!(!journal.blobs.contains_key(&4)); // Section 4 should not exist
330
331            // Verify data integrity: existing data in retained sections is accessible
332            let item = journal.get(1, 0).await.unwrap();
333            assert_eq!(item, 10u64); // First item in section 1 (1*10+0)
334            let item = journal.get(1, 1).await.unwrap();
335            assert_eq!(item, 11); // Second item in section 1 (1*10+1)
336            let item = journal.get(2, 0).await.unwrap();
337            assert_eq!(item, 20); // First item in section 2 (2*10+0)
338            let last_element_section = 19 / items_per_section;
339            let last_element_offset = (19 % items_per_section.get()) as u32;
340            let item = journal
341                .get(last_element_section, last_element_offset)
342                .await
343                .unwrap();
344            assert_eq!(item, 34); // Last item in section 3 (3*10+4)
345            let next_element_section = 20 / items_per_section;
346            let next_element_offset = (20 % items_per_section.get()) as u32;
347            let result = journal.get(next_element_section, next_element_offset).await;
348            assert!(matches!(result, Err(Error::SectionOutOfRange(4)))); // Next element should not exist
349
350            // Assert journal can accept new items
351            let (offset, _) = journal.append(next_element_section, 999).await.unwrap();
352            assert_eq!(
353                journal.get(next_element_section, offset).await.unwrap(),
354                999
355            );
356
357            journal.destroy().await.unwrap();
358        });
359    }
360
361    /// Test `init_journal` with invalid parameters.
362    #[test_traced]
363    fn test_init_journal_invalid_parameters() {
364        let executor = deterministic::Runner::default();
365        executor.start(|context| async move {
366            let cfg = VConfig {
367                partition: "test_invalid".into(),
368                compression: None,
369                codec_config: (),
370                write_buffer: NZUsize!(1024),
371                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
372            };
373
374            // Test invalid bounds: lower > upper
375            let result = init_journal::<deterministic::Context, u64>(
376                context.clone(),
377                cfg.clone(),
378                10,        // lower_bound
379                5,         // upper_bound (invalid: < lower_bound)
380                NZU64!(5), // items_per_section
381            )
382            .await;
383            assert!(matches!(result, Err(Error::InvalidSyncRange(10, 5))));
384        });
385    }
386
387    /// Test `init_journal` when existing data exactly matches the sync range.
388    #[test_traced]
389    fn test_init_journal_existing_data_exact_match() {
390        let executor = deterministic::Runner::default();
391        executor.start(|context| async move {
392            let cfg = VConfig {
393                partition: "test_exact_match".into(),
394                compression: None,
395                codec_config: (),
396                write_buffer: NZUsize!(1024),
397                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
398            };
399
400            // Create initial journal with data exactly matching sync range
401            let mut journal =
402                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
403                    .await
404                    .expect("Failed to create initial journal");
405
406            // Add data to sections 1, 2, 3 (operations 5-19 with items_per_section=5)
407            let items_per_section = NZU64!(5);
408            for section in 1..4 {
409                for item in 0..items_per_section.get() {
410                    journal.append(section, section * 100 + item).await.unwrap();
411                }
412            }
413            journal.close().await.unwrap();
414
415            // Initialize with sync boundaries that exactly match existing data
416            let lower_bound = 5; // section 1
417            let upper_bound = 19; // section 3
418            let journal = init_journal(
419                context.clone(),
420                cfg.clone(),
421                lower_bound,
422                upper_bound,
423                items_per_section,
424            )
425            .await
426            .expect("Failed to initialize journal with exact match");
427
428            // Verify pruning to lower bound
429            let lower_section = lower_bound / items_per_section; // 5/5 = 1
430            assert_eq!(journal.oldest_section(), Some(lower_section));
431
432            // Verify section 0 is pruned, sections 1-3 are retained
433            assert!(!journal.blobs.contains_key(&0)); // Section 0 should be pruned
434            assert!(journal.blobs.contains_key(&1)); // Section 1 should be retained (contains operation 5)
435            assert!(journal.blobs.contains_key(&2)); // Section 2 should be retained
436            assert!(journal.blobs.contains_key(&3)); // Section 3 should be retained
437
438            // Verify data integrity: existing data in retained sections is accessible
439            let item = journal.get(1, 0).await.unwrap();
440            assert_eq!(item, 100u64); // First item in section 1 (1*100+0)
441            let item = journal.get(1, 1).await.unwrap();
442            assert_eq!(item, 101); // Second item in section 1 (1*100+1)
443            let item = journal.get(2, 0).await.unwrap();
444            assert_eq!(item, 200); // First item in section 2 (2*100+0)
445            let last_element_section = 19 / items_per_section;
446            let last_element_offset = (19 % items_per_section.get()) as u32;
447            let item = journal
448                .get(last_element_section, last_element_offset)
449                .await
450                .unwrap();
451            assert_eq!(item, 304); // Last item in section 3 (3*100+4)
452            let next_element_section = 20 / items_per_section;
453            let next_element_offset = (20 % items_per_section.get()) as u32;
454            let result = journal.get(next_element_section, next_element_offset).await;
455            assert!(matches!(result, Err(Error::SectionOutOfRange(4)))); // Next element should not exist
456
457            // Assert journal can accept new operations
458            let mut journal = journal;
459            let (offset, _) = journal.append(next_element_section, 999).await.unwrap();
460            assert_eq!(
461                journal.get(next_element_section, offset).await.unwrap(),
462                999
463            );
464
465            journal.destroy().await.unwrap();
466        });
467    }
468
469    /// Test `init_journal` when existing data exceeds the sync target range.
470    #[test_traced]
471    fn test_init_journal_existing_data_with_rewind() {
472        let executor = deterministic::Runner::default();
473        executor.start(|context| async move {
474            let cfg = VConfig {
475                partition: "test_rewind".into(),
476                compression: None,
477                codec_config: (),
478                write_buffer: NZUsize!(1024),
479                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
480            };
481
482            // Create initial journal with data beyond sync range
483            let mut journal =
484                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
485                    .await
486                    .expect("Failed to create initial journal");
487
488            // Add data to sections 0-5 (operations 0-29 with items_per_section=5)
489            let items_per_section = NZU64!(5);
490            for section in 0..6 {
491                for item in 0..items_per_section.get() {
492                    journal
493                        .append(section, section * 1000 + item)
494                        .await
495                        .unwrap();
496                }
497            }
498            journal.close().await.unwrap();
499
500            // Initialize with sync boundaries that are exceeded by existing data
501            let lower_bound = 8; // section 1
502            let upper_bound = 17; // section 3
503            let mut journal = init_journal(
504                context.clone(),
505                cfg.clone(),
506                lower_bound,
507                upper_bound,
508                items_per_section,
509            )
510            .await
511            .expect("Failed to initialize journal with rewind");
512
513            // Verify pruning to lower bound and rewinding beyond upper bound
514            let lower_section = lower_bound / items_per_section; // 8/5 = 1
515            assert_eq!(journal.oldest_section(), Some(lower_section));
516
517            // Verify section 0 is pruned (< lower_section)
518            assert!(!journal.blobs.contains_key(&0));
519
520            // Verify sections within sync range exist (lower_section <= section <= upper_section)
521            assert!(journal.blobs.contains_key(&1)); // Section 1 (contains operation 8)
522            assert!(journal.blobs.contains_key(&2)); // Section 2
523            assert!(journal.blobs.contains_key(&3)); // Section 3 (contains operation 17)
524
525            // Verify sections beyond upper bound are removed (> upper_section)
526            assert!(!journal.blobs.contains_key(&4)); // Section 4 should be removed
527            assert!(!journal.blobs.contains_key(&5)); // Section 5 should be removed
528
529            // Verify data integrity in retained sections
530            let item = journal.get(1, 0).await.unwrap();
531            assert_eq!(item, 1000u64); // First item in section 1 (1*1000+0)
532            let item = journal.get(1, 1).await.unwrap();
533            assert_eq!(item, 1001); // Second item in section 1 (1*1000+1)
534            let item = journal.get(3, 0).await.unwrap();
535            assert_eq!(item, 3000); // First item in section 3 (3*1000+0)
536            let last_element_section = 17 / items_per_section;
537            let last_element_offset = (17 % items_per_section.get()) as u32;
538            let item = journal
539                .get(last_element_section, last_element_offset)
540                .await
541                .unwrap();
542            assert_eq!(item, 3002); // Last item in section 3 (3*1000+2)
543
544            // Verify that section 3 was properly truncated
545            let section_3_size = journal.size(3).await.unwrap();
546            assert_eq!(section_3_size, 3 * ITEM_ALIGNMENT);
547
548            // Verify that operations beyond upper_bound (17) are not accessible
549            // Reading beyond the truncated section should return an error
550            let result = journal.get(3, 3).await;
551            assert!(result.is_err()); // Operation 18 should be inaccessible (beyond upper_bound=17)
552
553            // Assert journal can accept new operations
554            let (offset, _) = journal.append(3, 999).await.unwrap();
555            assert_eq!(journal.get(3, offset).await.unwrap(), 999);
556
557            journal.destroy().await.unwrap();
558        });
559    }
560
561    /// Test `init_journal` when all existing data is stale (before lower bound).
562    #[test_traced]
563    fn test_init_journal_existing_data_stale() {
564        let executor = deterministic::Runner::default();
565        executor.start(|context| async move {
566            let cfg = VConfig {
567                partition: "test_stale".into(),
568                compression: None,
569                codec_config: (),
570                write_buffer: NZUsize!(1024),
571                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
572            };
573
574            // Create initial journal with stale data
575            let mut journal =
576                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
577                    .await
578                    .expect("Failed to create initial journal");
579
580            // Add data to sections 0, 1 (operations 0-9 with items_per_section=5)
581            let items_per_section = NZU64!(5);
582            for section in 0..2 {
583                for item in 0..items_per_section.get() {
584                    journal.append(section, section * 100 + item).await.unwrap();
585                }
586            }
587            journal.close().await.unwrap();
588
589            // Initialize with sync boundaries beyond all existing data
590            let lower_bound = 15; // section 3
591            let upper_bound = 25; // section 5
592            let journal = init_journal::<deterministic::Context, u64>(
593                context.clone(),
594                cfg.clone(),
595                lower_bound,
596                upper_bound,
597                items_per_section,
598            )
599            .await
600            .expect("Failed to initialize journal with stale data");
601
602            // Verify fresh journal (all old data destroyed)
603            assert!(journal.blobs.is_empty());
604            assert_eq!(journal.oldest_section(), None);
605
606            // Verify old sections don't exist
607            assert!(!journal.blobs.contains_key(&0));
608            assert!(!journal.blobs.contains_key(&1));
609
610            journal.destroy().await.unwrap();
611        });
612    }
613
614    /// Test `init_journal` with section boundary edge cases.
615    #[test_traced]
616    fn test_init_journal_section_boundaries() {
617        let executor = deterministic::Runner::default();
618        executor.start(|context| async move {
619            let cfg = VConfig {
620                partition: "test_boundaries".into(),
621                compression: None,
622                codec_config: (),
623                write_buffer: NZUsize!(1024),
624                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
625            };
626
627            // Create journal with data at section boundaries
628            let mut journal =
629                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
630                    .await
631                    .expect("Failed to create initial journal");
632
633            let items_per_section = NZU64!(5);
634
635            // Add data to sections 0, 1, 2, 3, 4
636            for section in 0..5 {
637                for item in 0..items_per_section.get() {
638                    journal.append(section, section * 100 + item).await.unwrap();
639                }
640            }
641            journal.close().await.unwrap();
642
643            // Test sync boundaries exactly at section boundaries
644            let lower_bound = 10; // Exactly at section boundary (10/5 = 2)
645            let upper_bound = 19; // Exactly at section boundary (19/5 = 3)
646            let mut journal = init_journal(
647                context.clone(),
648                cfg.clone(),
649                lower_bound,
650                upper_bound,
651                items_per_section,
652            )
653            .await
654            .expect("Failed to initialize journal at boundaries");
655
656            // Verify correct section range
657            let lower_section = lower_bound / items_per_section; // 2
658            assert_eq!(journal.oldest_section(), Some(lower_section));
659
660            // Verify sections 2, 3, 4 exist, others don't
661            assert!(!journal.blobs.contains_key(&0));
662            assert!(!journal.blobs.contains_key(&1));
663            assert!(journal.blobs.contains_key(&2));
664            assert!(journal.blobs.contains_key(&3));
665            assert!(!journal.blobs.contains_key(&4)); // Section 4 should not exist
666
667            // Verify data integrity in retained sections
668            let item = journal.get(2, 0).await.unwrap();
669            assert_eq!(item, 200u64); // First item in section 2
670            let item = journal.get(3, 4).await.unwrap();
671            assert_eq!(item, 304); // Last element
672            let next_element_section = 4;
673            let result = journal.get(next_element_section, 0).await;
674            assert!(matches!(result, Err(Error::SectionOutOfRange(4))));
675
676            // Assert journal can accept new operations
677            let (offset, _) = journal.append(next_element_section, 999).await.unwrap();
678            assert_eq!(
679                journal.get(next_element_section, offset).await.unwrap(),
680                999
681            );
682
683            journal.destroy().await.unwrap();
684        });
685    }
686
687    /// Test `init_journal` when lower_bound and upper_bound are in the same section.
688    #[test_traced]
689    fn test_init_journal_same_section_bounds() {
690        let executor = deterministic::Runner::default();
691        executor.start(|context| async move {
692            let cfg = VConfig {
693                partition: "test_same_section".into(),
694                compression: None,
695                codec_config: (),
696                write_buffer: NZUsize!(1024),
697                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
698            };
699
700            // Create journal with data in multiple sections
701            let mut journal =
702                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
703                    .await
704                    .expect("Failed to create initial journal");
705
706            let items_per_section = NZU64!(5);
707
708            // Add data to sections 0, 1, 2
709            for section in 0..3 {
710                for item in 0..items_per_section.get() {
711                    journal.append(section, section * 100 + item).await.unwrap();
712                }
713            }
714            journal.close().await.unwrap();
715
716            // Test sync boundaries within the same section
717            let lower_bound = 6; // operation 6 (section 1: 6/5 = 1)
718            let upper_bound = 8; // operation 8 (section 1: 8/5 = 1)
719            let journal = init_journal(
720                context.clone(),
721                cfg.clone(),
722                lower_bound,
723                upper_bound,
724                items_per_section,
725            )
726            .await
727            .expect("Failed to initialize journal with same-section bounds");
728
729            // Both operations are in section 1, so section 0 should be pruned, section 1+ retained
730            let target_section = lower_bound / items_per_section; // 6/5 = 1
731            assert_eq!(journal.oldest_section(), Some(target_section));
732
733            // Verify pruning and retention
734            assert!(!journal.blobs.contains_key(&0)); // Section 0 should be pruned
735            assert!(journal.blobs.contains_key(&1)); // Section 1 should be retained
736            assert!(!journal.blobs.contains_key(&2)); // Section 2 should be removed (> upper_section)
737
738            // Verify data integrity
739            let item = journal.get(1, 0).await.unwrap();
740            assert_eq!(item, 100u64); // First item in section 1
741            let item = journal.get(1, 1).await.unwrap();
742            assert_eq!(item, 101); // Second item in section 1 (1*100+1)
743            let item = journal.get(1, 3).await.unwrap();
744            assert_eq!(item, 103); // Item at offset 3 in section 1 (1*100+3)
745
746            // Verify that section 1 was properly truncated
747            let section_1_size = journal.size(1).await.unwrap();
748            assert_eq!(section_1_size, 64); // Should be 4 operations * 16 bytes = 64 bytes
749
750            // Verify that operation beyond upper_bound (8) is not accessible
751            let result = journal.get(1, 4).await;
752            assert!(result.is_err()); // Operation 9 should be inaccessible (beyond upper_bound=8)
753
754            let result = journal.get(2, 0).await;
755            assert!(matches!(result, Err(Error::SectionOutOfRange(2)))); // Section 2 was removed, so no items
756
757            // Assert journal can accept new operations
758            let mut journal = journal;
759            let (offset, _) = journal.append(target_section, 999).await.unwrap();
760            assert_eq!(journal.get(target_section, offset).await.unwrap(), 999);
761
762            journal.destroy().await.unwrap();
763        });
764    }
765
766    /// Test `compute_offset` correctly calculates byte boundaries for variable-sized items.
767    #[test_traced]
768    fn test_compute_offset() {
769        let executor = deterministic::Runner::default();
770        executor.start(|context| async move {
771            let cfg = VConfig {
772                partition: "test_compute_offset".into(),
773                compression: None,
774                codec_config: (),
775                write_buffer: NZUsize!(1024),
776                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
777            };
778
779            // Create a journal and populate a section with 5 operations
780            let mut journal =
781                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
782                    .await
783                    .expect("Failed to create journal");
784
785            let section = 0;
786            for i in 0..5 {
787                journal.append(section, i as u64).await.unwrap();
788            }
789            journal.sync(section).await.unwrap();
790
791            let blob = journal.blobs.get(&section).unwrap();
792
793            // Helper function to compute byte size for N operations
794            let compute_offset = |operations_count: u32| async move {
795                compute_offset::<deterministic::Context, u64>(
796                    blob,
797                    &journal.cfg.codec_config,
798                    journal.cfg.compression.is_some(),
799                    operations_count,
800                )
801                .await
802                .unwrap()
803            };
804
805            // Test various operation counts (each u64 operation takes 16 bytes when aligned)
806            assert_eq!(compute_offset(0).await, 0); // 0 operations = 0 bytes
807            assert_eq!(compute_offset(1).await, 16); // 1 operation = 16 bytes
808            assert_eq!(compute_offset(3).await, 48); // 3 operations = 48 bytes
809            assert_eq!(compute_offset(5).await, 80); // 5 operations = 80 bytes
810
811            // Test requesting more operations than available (should return size of all available)
812            assert_eq!(compute_offset(10).await, 80); // Still 80 bytes (capped at available)
813
814            journal.destroy().await.unwrap();
815        });
816    }
817
818    /// Test `truncate_upper_section` correctly removes items beyond sync boundaries.
819    #[test_traced]
820    fn test_truncate_section_to_upper_bound() {
821        let executor = deterministic::Runner::default();
822        executor.start(|context| async move {
823            let cfg = VConfig {
824                partition: "test_truncate_section".into(),
825                compression: None,
826                codec_config: (),
827                write_buffer: NZUsize!(1024),
828                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
829            };
830            let items_per_section = 5;
831
832            // Helper to create a fresh journal with test data
833            let create_journal = || async {
834                let mut journal =
835                    VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
836                        .await
837                        .expect("Failed to create journal");
838
839                // Add operations to sections 0, 1, 2
840                for section in 0..3 {
841                    for i in 0..items_per_section {
842                        journal.append(section, section * 100 + i).await.unwrap();
843                    }
844                    journal.sync(section).await.unwrap();
845                }
846                journal
847            };
848
849            // Test 1: No truncation needed (upper_bound at section end)
850            {
851                let mut journal = create_journal().await;
852                let upper_bound = 9; // End of section 1 (section 1: ops 5-9)
853                truncate_upper_section(&mut journal, upper_bound, items_per_section)
854                    .await
855                    .unwrap();
856
857                // Section 1 should remain unchanged (5 operations = 80 bytes)
858                let section_1_size = journal.size(1).await.unwrap();
859                assert_eq!(section_1_size, 80);
860                journal.destroy().await.unwrap();
861            }
862
863            // Test 2: Truncation needed (upper_bound mid-section)
864            {
865                let mut journal = create_journal().await;
866                let upper_bound = 7; // Middle of section 1 (keep ops 5, 6, 7)
867                truncate_upper_section(&mut journal, upper_bound, items_per_section)
868                    .await
869                    .unwrap();
870
871                // Section 1 should now have only 3 operations (48 bytes)
872                let section_1_size = journal.size(1).await.unwrap();
873                assert_eq!(section_1_size, 48);
874
875                // Verify the remaining operations are accessible
876                assert_eq!(journal.get(1, 0).await.unwrap(), 100); // section 1, offset 0 = 1*100+0
877                assert_eq!(journal.get(1, 1).await.unwrap(), 101); // section 1, offset 1 = 1*100+1
878                assert_eq!(journal.get(1, 2).await.unwrap(), 102); // section 1, offset 2 = 1*100+2
879
880                // Verify truncated operations are not accessible
881                let result = journal.get(1, 3).await;
882                assert!(result.is_err());
883                journal.destroy().await.unwrap();
884            }
885
886            // Test 3: Non-existent section (should not error)
887            {
888                let mut journal = create_journal().await;
889                truncate_upper_section(
890                    &mut journal,
891                    99, // upper_bound that would be in a non-existent section
892                    items_per_section,
893                )
894                .await
895                .unwrap(); // Should not error
896                journal.destroy().await.unwrap();
897            }
898
899            // Test 4: Upper bound beyond section (no truncation)
900            {
901                let mut journal = create_journal().await;
902                let upper_bound = 15; // Beyond section 2
903                let original_section_2_size = journal.size(2).await.unwrap();
904                truncate_upper_section(&mut journal, upper_bound, items_per_section)
905                    .await
906                    .unwrap();
907
908                // Section 2 should remain unchanged
909                let section_2_size = journal.size(2).await.unwrap();
910                assert_eq!(section_2_size, original_section_2_size);
911                journal.destroy().await.unwrap();
912            }
913        });
914    }
915
916    /// Test intra-section truncation.
917    #[test_traced]
918    fn test_truncate_section_mid_section() {
919        let executor = deterministic::Runner::default();
920        executor.start(|context| async move {
921            let cfg = VConfig {
922                partition: "test_truncation_integration".into(),
923                compression: None,
924                codec_config: (),
925                write_buffer: NZUsize!(1024),
926                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
927            };
928            let items_per_section = 3;
929
930            // Create journal with data across multiple sections
931            let mut journal =
932                VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
933                    .await
934                    .expect("Failed to create journal");
935
936            // Section 0: items 0, 1, 2
937            // Section 1: items 3, 4, 5
938            // Section 2: items 6, 7, 8
939            for section in 0..3 {
940                for i in 0..items_per_section {
941                    let op_value = section * items_per_section + i;
942                    journal.append(section, op_value).await.unwrap();
943                }
944            }
945            journal.close().await.unwrap();
946
947            // Test sync with upper_bound in middle of section 1 (upper_bound = 4)
948            // Should keep: items 2, 3, 4 (sections 0 partially removed, 1 truncated, 2 removed)
949            let lower_bound = 2;
950            let upper_bound = 4;
951            let mut journal = init_journal(
952                context.clone(),
953                cfg.clone(),
954                lower_bound,
955                upper_bound,
956                NZU64!(items_per_section),
957            )
958            .await
959            .expect("Failed to initialize synced journal");
960
961            // Verify section 0 is partially present (only item 2)
962            assert!(journal.blobs.contains_key(&0));
963            assert_eq!(journal.get(0, 2).await.unwrap(), 2u64);
964
965            // Verify section 1 is truncated (items 3, 4 only)
966            assert!(journal.blobs.contains_key(&1));
967            assert_eq!(journal.get(1, 0).await.unwrap(), 3);
968            assert_eq!(journal.get(1, 1).await.unwrap(), 4);
969
970            // item 5 should be inaccessible (truncated)
971            let result = journal.get(1, 2).await;
972            assert!(result.is_err());
973
974            // Verify section 2 is completely removed
975            assert!(!journal.blobs.contains_key(&2));
976
977            // Test that new appends work correctly after truncation
978            let (offset, _) = journal.append(1, 999).await.unwrap();
979            assert_eq!(journal.get(1, offset).await.unwrap(), 999);
980
981            journal.destroy().await.unwrap();
982        });
983    }
984}