Skip to main content

journal_core/file/
file.rs

1#![allow(clippy::field_reassign_with_default)]
2
3pub use super::file_iterators::{
4    EntryDataIterator, FieldDataIterator, FieldDataOffsetIterator, FieldIterator,
5};
6pub use super::file_payload::{DataPayloadObjectInfo, DataPayloadReadContext};
7use super::mmap::{
8    ExperimentalMmapStrategy, MemoryMap, MemoryMapMut, WindowManager, WindowManagerStats,
9};
10use crate::error::{JournalError, Result};
11use crate::file::guarded_cell::GuardedCell;
12use crate::file::hash;
13use crate::file::object::*;
14use crate::file::offset_array;
15use std::fs::{File, OpenOptions};
16use std::marker::PhantomData;
17use std::num::NonZeroU64;
18use std::path::Path;
19use std::time::Duration;
20use zerocopy::{ByteSlice, FromBytes};
21
22use crate::file::value_guard::ValueGuard;
23
24// Size to pad objects to (8 bytes)
25pub(super) const OBJECT_ALIGNMENT: u64 = 8;
26const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
27pub(super) const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
28const DEFAULT_MAX_FILE_SIZE: u64 = 128 * 1024 * 1024;
29const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
30const PAGE_SIZE: u64 = 4096;
31const DEFAULT_DATA_HASH_TABLE_SIZE: usize = 2047;
32const DEFAULT_FIELD_HASH_TABLE_SIZE: usize = 1023;
33pub const DEFAULT_COMPRESS_THRESHOLD: usize = 512;
34pub const MIN_COMPRESS_THRESHOLD: usize = 8;
35pub const DEFAULT_JOURNAL_FILE_MODE: u32 = 0o640;
36
37pub fn normalize_compress_threshold(threshold: usize) -> usize {
38    threshold.max(MIN_COMPRESS_THRESHOLD)
39}
40
41fn align_to(value: u64, alignment: u64) -> u64 {
42    value.saturating_add(alignment.saturating_sub(1)) & !(alignment.saturating_sub(1))
43}
44
45fn normalize_journal_max_file_size(max_file_size: Option<u64>, compact: bool) -> u64 {
46    let mut size = match max_file_size {
47        Some(0) | None => DEFAULT_MAX_FILE_SIZE,
48        Some(size) => align_to(size, PAGE_SIZE),
49    };
50    if compact && size > JOURNAL_COMPACT_SIZE_MAX {
51        size = JOURNAL_COMPACT_SIZE_MAX;
52    }
53    size.max(JOURNAL_FILE_SIZE_MIN)
54}
55
56fn data_hash_buckets_for_max_file_size(max_file_size: u64) -> usize {
57    let buckets = (max_file_size / 576).max(DEFAULT_DATA_HASH_TABLE_SIZE as u64);
58    buckets.min(usize::MAX as u64) as usize
59}
60
61/// Validates that an offset is properly aligned for journal objects.
62/// Journal objects must be 8-byte aligned.
63pub(super) fn validate_offset_alignment(offset: NonZeroU64) -> Result<()> {
64    if offset.get() % OBJECT_ALIGNMENT != 0 {
65        return Err(JournalError::MisalignedOffset(offset.get()));
66    }
67    Ok(())
68}
69
70pub(super) fn round_up_to_file_size_increment(value: u64) -> Result<u64> {
71    value
72        .checked_add(FILE_SIZE_INCREASE - 1)
73        .map(|v| v & !(FILE_SIZE_INCREASE - 1))
74        .ok_or(JournalError::ObjectExceedsFileBounds)
75}
76
77pub trait BucketVisitor<'a> {
78    type Object: JournalObject<&'a [u8]> + HashableObject;
79    type Output;
80
81    /// Called for each object in the bucket. Return Some(output) to stop iteration,
82    /// or None to continue to the next object.
83    fn visit(&mut self, object: &ValueGuard<'a, Self::Object>) -> Result<Option<Self::Output>>;
84}
85
86#[derive(Debug, Clone, Copy)]
87pub struct PayloadParts<'a> {
88    parts: [&'a [u8]; 3],
89    len: usize,
90    count: usize,
91}
92
93impl<'a> PayloadParts<'a> {
94    pub fn raw(payload: &'a [u8]) -> Self {
95        Self {
96            parts: [payload, &[], &[]],
97            len: payload.len(),
98            count: 1,
99        }
100    }
101
102    pub fn structured(name: &'a [u8], value: &'a [u8]) -> Self {
103        Self {
104            parts: [name, b"=", value],
105            len: name.len() + 1 + value.len(),
106            count: 3,
107        }
108    }
109
110    pub fn len(&self) -> usize {
111        self.len
112    }
113
114    pub fn is_empty(&self) -> bool {
115        self.len == 0
116    }
117
118    pub fn iter(&self) -> std::iter::Copied<std::slice::Iter<'_, &'a [u8]>> {
119        self.parts[..self.count].iter().copied()
120    }
121
122    pub fn as_single_slice(&self) -> Option<&'a [u8]> {
123        (self.count == 1).then_some(self.parts[0])
124    }
125
126    pub fn equals_slice(&self, other: &[u8]) -> bool {
127        if other.len() != self.len {
128            return false;
129        }
130
131        let mut remaining = other;
132        for part in self.iter() {
133            let Some((head, tail)) = remaining.split_at_checked(part.len()) else {
134                return false;
135            };
136            if head != part {
137                return false;
138            }
139            remaining = tail;
140        }
141
142        remaining.is_empty()
143    }
144
145    pub fn copy_to_slice(&self, dst: &mut [u8]) {
146        assert_eq!(dst.len(), self.len);
147        let mut offset = 0usize;
148        for part in self.iter() {
149            let end = offset + part.len();
150            dst[offset..end].copy_from_slice(part);
151            offset = end;
152        }
153    }
154
155    pub fn to_vec(&self) -> Vec<u8> {
156        let mut payload = Vec::with_capacity(self.len);
157        for part in self.iter() {
158            payload.extend_from_slice(part);
159        }
160        payload
161    }
162}
163
164struct PayloadMatcher<'data, T> {
165    payload: PayloadParts<'data>,
166    hash: u64,
167    decompression_buffer: Vec<u8>,
168    _phantom: PhantomData<T>,
169}
170
171impl<'data, B: ByteSlice> PayloadMatcher<'data, FieldObject<B>> {
172    fn field_matcher(payload: &'data [u8], hash: u64) -> Self {
173        Self {
174            payload: PayloadParts::raw(payload),
175            hash,
176            decompression_buffer: Vec::new(),
177            _phantom: PhantomData::<FieldObject<B>>,
178        }
179    }
180}
181
182impl<'a, T> BucketVisitor<'a> for PayloadMatcher<'_, T>
183where
184    T: JournalObject<&'a [u8]> + HashableObject,
185{
186    type Object = T;
187    type Output = NonZeroU64;
188
189    fn visit(&mut self, object: &ValueGuard<'a, Self::Object>) -> Result<Option<Self::Output>> {
190        if object.hash() != self.hash {
191            return Ok(None);
192        }
193
194        let matches = if object.is_compressed() {
195            let len = object.decompress(&mut self.decompression_buffer)?;
196            self.payload.equals_slice(&self.decompression_buffer[..len])
197        } else {
198            self.payload.equals_slice(object.raw_payload())
199        };
200
201        if matches {
202            Ok(Some(object.offset()))
203        } else {
204            Ok(None)
205        }
206    }
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
210pub enum Compression {
211    None,
212    Xz,
213    Lz4,
214    Zstd,
215}
216
217impl Compression {
218    pub fn as_incompatible_flag(&self) -> u32 {
219        match self {
220            Compression::None => 0,
221            Compression::Xz => HeaderIncompatibleFlags::CompressedXz as u32,
222            Compression::Lz4 => HeaderIncompatibleFlags::CompressedLz4 as u32,
223            Compression::Zstd => HeaderIncompatibleFlags::CompressedZstd as u32,
224        }
225    }
226}
227
228impl Default for Compression {
229    fn default() -> Self {
230        Compression::None
231    }
232}
233
234#[derive(Debug, Clone)]
235pub struct JournalFileOptions {
236    pub(super) machine_id: uuid::Uuid,
237    pub(super) seqnum_id: uuid::Uuid,
238    pub(super) file_id: uuid::Uuid,
239    pub(super) window_size: u64,
240    pub(super) data_hash_table_buckets: usize,
241    pub(super) field_hash_table_buckets: usize,
242    pub(super) enable_keyed_hash: bool,
243    pub(super) compression: Compression,
244    pub(super) compress_threshold: usize,
245    pub(super) compact: bool,
246    pub(super) file_mode: u32,
247    pub(super) experimental_mmap_strategy: ExperimentalMmapStrategy,
248    pub seal: Option<crate::seal::SealOptions>,
249}
250
251impl JournalFileOptions {
252    pub fn new(machine_id: uuid::Uuid, _boot_id: uuid::Uuid, seqnum_id: uuid::Uuid) -> Self {
253        let file_id = uuid::Uuid::new_v4();
254
255        Self {
256            machine_id,
257            seqnum_id,
258            file_id,
259            window_size: 64 * 1024,
260            data_hash_table_buckets: 233_016,
261            field_hash_table_buckets: DEFAULT_FIELD_HASH_TABLE_SIZE,
262            enable_keyed_hash: true,
263            compression: Compression::None,
264            compress_threshold: DEFAULT_COMPRESS_THRESHOLD,
265            compact: false,
266            file_mode: DEFAULT_JOURNAL_FILE_MODE,
267            experimental_mmap_strategy: ExperimentalMmapStrategy::Windowed,
268            seal: None,
269        }
270    }
271
272    /// Creates options with bucket sizes optimized based on previous utilization
273    pub fn with_optimized_buckets(
274        mut self,
275        previous_utilization: Option<BucketUtilization>,
276        max_file_size: Option<u64>,
277    ) -> Self {
278        let _ = previous_utilization;
279        let max_file_size = normalize_journal_max_file_size(max_file_size, self.compact);
280
281        self.data_hash_table_buckets = data_hash_buckets_for_max_file_size(max_file_size);
282        self.field_hash_table_buckets = DEFAULT_FIELD_HASH_TABLE_SIZE;
283        self
284    }
285
286    pub fn with_window_size(mut self, size: u64) -> Self {
287        assert_eq!(size % OBJECT_ALIGNMENT, 0);
288        assert_eq!(size % 4096, 0, "Window size must be page-aligned");
289        self.window_size = size;
290        self
291    }
292
293    pub fn with_data_hash_table_buckets(mut self, buckets: usize) -> Self {
294        assert!(buckets > 0, "Hash table buckets must be positive");
295        self.data_hash_table_buckets = buckets;
296        self
297    }
298
299    pub fn with_field_hash_table_buckets(mut self, buckets: usize) -> Self {
300        assert!(buckets > 0, "Hash table buckets must be positive");
301        self.field_hash_table_buckets = buckets;
302        self
303    }
304
305    pub fn with_keyed_hash(mut self, enabled: bool) -> Self {
306        self.enable_keyed_hash = enabled;
307        self
308    }
309
310    pub fn with_file_id(mut self, file_id: uuid::Uuid) -> Self {
311        self.file_id = file_id;
312        self
313    }
314
315    pub fn with_compression(mut self, compression: Compression) -> Self {
316        self.compression = compression;
317        self
318    }
319
320    pub fn with_compress_threshold(mut self, threshold: usize) -> Self {
321        self.compress_threshold = normalize_compress_threshold(threshold);
322        self
323    }
324
325    pub fn with_compact(mut self, compact: bool) -> Self {
326        self.compact = compact;
327        self
328    }
329
330    pub fn with_file_mode(mut self, mode: u32) -> Self {
331        assert!(
332            mode <= 0o777,
333            "journal file mode must contain only permission bits"
334        );
335        self.file_mode = mode;
336        self
337    }
338
339    #[doc(hidden)]
340    pub fn with_experimental_mmap_strategy(mut self, strategy: ExperimentalMmapStrategy) -> Self {
341        self.experimental_mmap_strategy = strategy;
342        self
343    }
344
345    pub fn with_seal(mut self, seal: crate::seal::SealOptions) -> Self {
346        self.seal = Some(seal);
347        self
348    }
349
350    pub fn compression(&self) -> Compression {
351        self.compression
352    }
353
354    pub fn compress_threshold(&self) -> usize {
355        self.compress_threshold
356    }
357
358    pub fn compact(&self) -> bool {
359        self.compact
360    }
361
362    pub fn file_mode(&self) -> u32 {
363        self.file_mode
364    }
365
366    pub fn create<M: MemoryMapMut>(self, file: &crate::repository::File) -> Result<JournalFile<M>> {
367        JournalFile::create(file, self)
368    }
369}
370
371/// Hash table bucket utilization statistics
372#[derive(Debug, Clone, Copy)]
373pub struct BucketUtilization {
374    pub data_occupied: usize,
375    pub data_total: usize,
376    pub field_occupied: usize,
377    pub field_total: usize,
378}
379
380impl BucketUtilization {
381    pub fn data_utilization(&self) -> f64 {
382        if self.data_total == 0 {
383            0.0
384        } else {
385            self.data_occupied as f64 / self.data_total as f64
386        }
387    }
388
389    pub fn field_utilization(&self) -> f64 {
390        if self.field_total == 0 {
391            0.0
392        } else {
393            self.field_occupied as f64 / self.field_total as f64
394        }
395    }
396}
397
398///
399/// A reader for systemd journal files that efficiently maps small regions of the file into memory.
400///
401/// # Memory Management
402///
403/// This implementation uses a window-based memory mapping strategy similar to systemd's original
404/// implementation. Instead of mapping the entire file, it maintains a small set of memory-mapped
405/// windows and reuses them as needed.
406///
407/// # Concurrency and Safety
408///
409/// `JournalFile` uses interior mutability to provide a safe API with the following characteristics:
410///
411/// - The window manager is wrapped in a `GuardedCell` which owns both the `WindowManager` and
412///   its guard flag, providing interior mutability with integrated guard-based exclusion.
413/// - The guard flag ensures only one object can be active at a time.
414/// - Methods like `data_object()` return a `ValueGuard<T>` that automatically releases the guard
415///   when dropped.
416///
417/// This design ensures that memory safety is maintained even though references to memory-mapped
418/// regions could be invalidated when new objects are created.
419pub struct JournalFile<M: MemoryMap> {
420    // The validated File this journal represents
421    pub(super) file: crate::repository::File,
422
423    // Persistent memory maps for journal header and data/field hash tables
424    pub(super) header_map: M,
425    pub(super) sanitized_header: Option<JournalHeader>,
426    pub(super) data_hash_table_map: Option<M>,
427    pub(super) field_hash_table_map: Option<M>,
428
429    // Window manager for other objects (owns the guard flag internally)
430    pub(super) window_manager: GuardedCell<WindowManager<M>>,
431
432    // Forward Secure Sealing options (consumed by JournalWriter on first use)
433    pub seal_options: Option<crate::seal::SealOptions>,
434}
435
436pub(super) fn map_hash_table<M: MemoryMap>(
437    file: &File,
438    header_size: u64,
439    offset: Option<NonZeroU64>,
440    size: Option<NonZeroU64>,
441) -> Result<Option<M>> {
442    let (Some(offset), Some(size)) = (offset, size) else {
443        return Ok(None);
444    };
445
446    let object_header_size = std::mem::size_of::<ObjectHeader>() as u64;
447    if offset.get() < header_size + object_header_size {
448        return Err(JournalError::InvalidObjectLocation);
449    }
450    if size.get() <= object_header_size {
451        return Err(JournalError::InvalidObjectLocation);
452    }
453
454    let offset = offset.get() - object_header_size;
455    let size = object_header_size + size.get();
456    M::create(file, offset, size).map(Some)
457}
458
459fn sanitize_header_for_size(mut header: JournalHeader) -> JournalHeader {
460    if header.header_size < 216 {
461        header.n_data = 0;
462    }
463    if header.header_size < 224 {
464        header.n_fields = 0;
465    }
466    if header.header_size < 232 {
467        header.n_tags = 0;
468    }
469    if header.header_size < 240 {
470        header.n_entry_arrays = 0;
471    }
472    if header.header_size < 248 {
473        header.data_hash_chain_depth = 0;
474    }
475    if header.header_size < 256 {
476        header.field_hash_chain_depth = 0;
477    }
478    if header.header_size < 260 {
479        header.tail_entry_array_offset = 0;
480    }
481    if header.header_size < 264 {
482        header.tail_entry_array_n_entries = 0;
483    }
484    if header.header_size < 272 {
485        header.tail_entry_offset = 0;
486    }
487    header
488}
489
490impl<M: MemoryMap> JournalFile<M> {
491    pub fn visit_bucket<'a, H, V>(
492        &'a self,
493        hash_table: Option<H>,
494        hash: u64,
495        mut visitor: V,
496    ) -> Result<Option<V::Output>>
497    where
498        H: HashTable<Object = V::Object>,
499        V: BucketVisitor<'a>,
500    {
501        let hash_table = hash_table.ok_or(JournalError::MissingHashTable)?;
502        let bucket = hash_table.hash_item_ref(hash);
503        let mut object_offset = bucket.head_hash_offset;
504
505        while let Some(offset) = object_offset {
506            let object_guard = self.journal_object_ref::<V::Object>(offset)?;
507
508            if let Some(output) = visitor.visit(&object_guard)? {
509                return Ok(Some(output));
510            }
511
512            object_offset = object_guard.next_hash_offset();
513        }
514
515        Ok(None)
516    }
517
518    pub fn open(file: &crate::repository::File, window_size: u64) -> Result<Self> {
519        Self::open_repository_file(file.clone(), window_size)
520    }
521
522    pub fn open_with_strategy(
523        file: &crate::repository::File,
524        window_size: u64,
525        strategy: ExperimentalMmapStrategy,
526    ) -> Result<Self> {
527        Self::open_repository_file_with_strategy(file.clone(), window_size, strategy)
528    }
529
530    pub fn open_path(path: impl AsRef<Path>, window_size: u64) -> Result<Self> {
531        Self::open_path_with_strategy(path, window_size, ExperimentalMmapStrategy::Windowed)
532    }
533
534    pub fn open_path_with_strategy(
535        path: impl AsRef<Path>,
536        window_size: u64,
537        strategy: ExperimentalMmapStrategy,
538    ) -> Result<Self> {
539        let path = path.as_ref();
540        let absolute_path = if path.is_absolute() {
541            path.to_path_buf()
542        } else {
543            std::env::current_dir()?.join(path)
544        };
545        let file = crate::repository::File::from_raw_path(&absolute_path)
546            .ok_or(JournalError::InvalidFilename)?;
547        Self::open_repository_file_with_strategy(file, window_size, strategy)
548    }
549
550    pub fn open_snapshot(
551        file: &crate::repository::File,
552        window_size: u64,
553        strategy: ExperimentalMmapStrategy,
554    ) -> Result<Self> {
555        Self::open_repository_file_snapshot(file.clone(), window_size, strategy)
556    }
557
558    pub fn open_path_snapshot(
559        path: impl AsRef<Path>,
560        window_size: u64,
561        strategy: ExperimentalMmapStrategy,
562    ) -> Result<Self> {
563        let path = path.as_ref();
564        let absolute_path = if path.is_absolute() {
565            path.to_path_buf()
566        } else {
567            std::env::current_dir()?.join(path)
568        };
569        let file = crate::repository::File::from_raw_path(&absolute_path)
570            .ok_or(JournalError::InvalidFilename)?;
571        Self::open_repository_file_snapshot(file, window_size, strategy)
572    }
573
574    fn open_repository_file(file: crate::repository::File, window_size: u64) -> Result<Self> {
575        Self::open_repository_file_with_strategy(
576            file,
577            window_size,
578            ExperimentalMmapStrategy::Windowed,
579        )
580    }
581
582    fn open_repository_file_with_strategy(
583        file: crate::repository::File,
584        window_size: u64,
585        strategy: ExperimentalMmapStrategy,
586    ) -> Result<Self> {
587        Self::open_repository_file_with_window_manager(file, window_size, |fd| {
588            WindowManager::new_with_strategy(fd, window_size, 16, strategy)
589        })
590    }
591
592    fn open_repository_file_snapshot(
593        file: crate::repository::File,
594        window_size: u64,
595        strategy: ExperimentalMmapStrategy,
596    ) -> Result<Self> {
597        Self::open_repository_file_with_window_manager(file, window_size, |fd| {
598            WindowManager::new_snapshot(fd, window_size, 16, strategy)
599        })
600    }
601
602    fn open_repository_file_with_window_manager<F>(
603        file: crate::repository::File,
604        window_size: u64,
605        window_manager_builder: F,
606    ) -> Result<Self>
607    where
608        F: FnOnce(File) -> Result<WindowManager<M>>,
609    {
610        debug_assert_eq!(window_size % OBJECT_ALIGNMENT, 0);
611
612        // Open file and check its size
613        let fd = OpenOptions::new()
614            .read(true)
615            .write(false)
616            .open(file.path())?;
617
618        // Create a memory map for the header
619        let header_size = std::mem::size_of::<JournalHeader>() as u64;
620        let header_map = M::create(&fd, 0, header_size)?;
621        let header = JournalHeader::ref_from_prefix(&header_map).unwrap().0;
622        if header.signature != *b"LPKSHHRH" {
623            return Err(JournalError::InvalidMagicNumber);
624        }
625        let sanitized_header =
626            (header.header_size < header_size).then(|| sanitize_header_for_size(*header));
627
628        // Initialize the hash table maps if they exist
629        let data_hash_table_map = map_hash_table(
630            &fd,
631            header.header_size,
632            header.data_hash_table_offset,
633            header.data_hash_table_size,
634        )?;
635        let field_hash_table_map = map_hash_table(
636            &fd,
637            header.header_size,
638            header.field_hash_table_offset,
639            header.field_hash_table_size,
640        )?;
641
642        // Create window manager for the rest of the objects
643        let window_manager = GuardedCell::new(window_manager_builder(fd)?);
644
645        Ok(JournalFile {
646            file,
647            header_map,
648            sanitized_header,
649            data_hash_table_map,
650            field_hash_table_map,
651            window_manager,
652            seal_options: None,
653        })
654    }
655
656    pub fn file(&self) -> &crate::repository::File {
657        &self.file
658    }
659
660    #[doc(hidden)]
661    pub fn mmap_stats(&self) -> Result<WindowManagerStats> {
662        Ok(self.window_manager.borrow_mut_checked()?.stats())
663    }
664
665    #[doc(hidden)]
666    pub fn reader_file_size(&self) -> Result<u64> {
667        Ok(self.window_manager.borrow_mut_checked()?.stats().file_size)
668    }
669
670    pub fn hash(&self, data: &[u8]) -> u64 {
671        self.hash_parts(PayloadParts::raw(data))
672    }
673
674    pub fn hash_parts(&self, payload: PayloadParts<'_>) -> u64 {
675        let is_keyed_hash = self
676            .journal_header_ref()
677            .has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash);
678
679        hash::journal_hash_data_parts(
680            payload.iter(),
681            is_keyed_hash,
682            if is_keyed_hash {
683                Some(&self.journal_header_ref().file_id)
684            } else {
685                None
686            },
687        )
688    }
689
690    pub fn entry_list(&self) -> Option<offset_array::List> {
691        let header = self.journal_header_ref();
692
693        header.entry_array_offset.and_then(|head_offset| {
694            std::num::NonZeroUsize::new(header.n_entries as usize)
695                .map(|total_items| offset_array::List::new(head_offset, total_items))
696        })
697    }
698
699    pub fn entry_offsets(&self, offsets: &mut Vec<NonZeroU64>) -> Result<()> {
700        if let Some(entry_list) = self.entry_list() {
701            entry_list.collect_offsets(self, offsets)?;
702        }
703
704        Ok(())
705    }
706
707    // Returns the data object offsets of the entry object at the specified
708    // offset
709    pub fn entry_data_object_offsets(
710        &self,
711        entry_offset: NonZeroU64,
712        offsets: &mut Vec<NonZeroU64>,
713    ) -> Result<()> {
714        let entry_guard = self.entry_ref(entry_offset)?;
715        entry_guard.collect_offsets(offsets)
716    }
717
718    pub fn journal_header_ref(&self) -> &JournalHeader {
719        if let Some(header) = &self.sanitized_header {
720            header
721        } else {
722            JournalHeader::ref_from_prefix(&self.header_map).unwrap().0
723        }
724    }
725
726    pub fn data_hash_table_map(&self) -> Option<&M> {
727        self.data_hash_table_map.as_ref()
728    }
729    pub fn field_hash_table_map(&self) -> Option<&M> {
730        self.field_hash_table_map.as_ref()
731    }
732
733    pub fn data_hash_table_ref(&self) -> Option<DataHashTable<&[u8]>> {
734        self.data_hash_table_map
735            .as_ref()
736            .and_then(|m| DataHashTable::<&[u8]>::from_data(m, false))
737    }
738
739    pub fn field_hash_table_ref(&self) -> Option<FieldHashTable<&[u8]>> {
740        self.field_hash_table_map
741            .as_ref()
742            .and_then(|m| FieldHashTable::<&[u8]>::from_data(m, false))
743    }
744
745    pub fn object_header_ref(&self, position: NonZeroU64) -> Result<&ObjectHeader> {
746        validate_offset_alignment(position)?;
747        let size_needed = std::mem::size_of::<ObjectHeader>() as u64;
748        let window_manager = self.window_manager.borrow_mut_checked()?;
749        let header_slice = window_manager.get_slice(position.get(), size_needed)?;
750        ObjectHeader::ref_from_bytes(header_slice).map_err(|_| JournalError::ZerocopyFailure)
751    }
752
753    /// Reads raw bytes from the file at the given offset.
754    /// Returns a copied Vec so no borrow on the window manager is held.
755    pub fn read_bytes_at(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
756        validate_offset_alignment(NonZeroU64::new(offset).ok_or(JournalError::InvalidOffset)?)?;
757        let window_manager = self.window_manager.borrow_mut_checked()?;
758        let src = window_manager.get_slice(offset, size)?;
759        Ok(src.to_vec())
760    }
761
762    #[doc(hidden)]
763    pub fn read_unaligned_bytes_at(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
764        let window_manager = self.window_manager.borrow_mut_checked()?;
765        let src = window_manager.get_slice(offset, size)?;
766        Ok(src.to_vec())
767    }
768
769    fn journal_object_ref<'a, T>(&'a self, offset: NonZeroU64) -> Result<ValueGuard<'a, T>>
770    where
771        T: JournalObject<&'a [u8]>,
772    {
773        let journal_header = self.journal_header_ref();
774        let is_compact = journal_header.has_incompatible_flag(HeaderIncompatibleFlags::Compact);
775        let header_size = journal_header.header_size;
776        let arena_end = header_size + journal_header.arena_size;
777
778        validate_offset_alignment(offset)?;
779
780        // Objects cannot be located in the file header
781        if offset.get() < header_size {
782            return Err(JournalError::ObjectExceedsFileBounds);
783        }
784
785        self.window_manager.with_guarded(offset, |wm| {
786            // Get the object header to determine size
787            let size_needed = {
788                let header_slice =
789                    wm.get_slice(offset.get(), std::mem::size_of::<ObjectHeader>() as u64)?;
790                let header = ObjectHeader::ref_from_bytes(header_slice)
791                    .map_err(|_| JournalError::ZerocopyFailure)?;
792                header.validated_size()?
793            };
794
795            // Validate that the object doesn't exceed the journal's arena bounds
796            let end_offset = offset
797                .get()
798                .checked_add(size_needed)
799                .ok_or(JournalError::ObjectExceedsFileBounds)?;
800            if end_offset > arena_end {
801                return Err(JournalError::ObjectExceedsFileBounds);
802            }
803
804            // Get the full object data
805            let data = wm.get_slice(offset.get(), size_needed)?;
806
807            // Parse the object
808            let value = T::from_data(data, is_compact).ok_or(JournalError::ZerocopyFailure)?;
809
810            Ok(value)
811        })
812    }
813
814    pub fn offset_array_ref(
815        &self,
816        offset: NonZeroU64,
817    ) -> Result<ValueGuard<'_, OffsetArrayObject<&[u8]>>> {
818        self.journal_object_ref(offset)
819    }
820
821    pub fn field_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, FieldObject<&[u8]>>> {
822        self.journal_object_ref(offset)
823    }
824
825    pub fn entry_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, EntryObject<&[u8]>>> {
826        self.journal_object_ref(offset)
827    }
828
829    pub fn data_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, DataObject<&[u8]>>> {
830        self.journal_object_ref(offset)
831    }
832
833    pub fn tag_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, TagObject<&[u8]>>> {
834        self.journal_object_ref(offset)
835    }
836
837    pub fn find_field_offset(&self, hash: u64, payload: &[u8]) -> Result<Option<NonZeroU64>> {
838        let visitor = PayloadMatcher::field_matcher(payload, hash);
839        self.visit_bucket(self.field_hash_table_ref(), hash, visitor)
840    }
841
842    /// Run a directed partition point query on a data object's entry array
843    ///
844    /// This finds the first/last entry (depending on direction) that satisfies the given predicate
845    /// in the entry array chain of the data object.
846    pub fn data_object_directed_partition_point<F>(
847        &self,
848        data_offset: NonZeroU64,
849        predicate: F,
850        direction: offset_array::Direction,
851    ) -> Result<Option<NonZeroU64>>
852    where
853        F: Fn(NonZeroU64) -> Result<bool>,
854    {
855        let Some(cursor) = self.data_ref(data_offset)?.inlined_cursor() else {
856            return Ok(None);
857        };
858
859        let Some(best_match) = cursor.directed_partition_point(self, predicate, direction)? else {
860            return Ok(None);
861        };
862
863        best_match.value(self)
864    }
865
866    /// Creates an iterator over all field objects in the field hash table
867    pub fn fields(&self) -> FieldIterator<'_, M> {
868        // Get the field hash table
869        let field_hash_table = self.field_hash_table_ref();
870
871        // Initialize with the first bucket
872        let mut iterator = FieldIterator {
873            journal: self,
874            field_hash_table,
875            current_bucket_index: 0,
876            next_field_offset: None,
877        };
878
879        // Find the first non-empty bucket
880        iterator.advance_to_next_nonempty_bucket();
881
882        iterator
883    }
884
885    /// Creates an iterator over all DATA objects for the specified field
886    pub fn field_data_objects<'a>(
887        &'a self,
888        field_name: &'a [u8],
889    ) -> Result<FieldDataIterator<'a, M>> {
890        // Find the field offset by name
891        let field_hash = self.hash(field_name);
892        let Some(field_offset) = self.find_field_offset(field_hash, field_name)? else {
893            return Ok(FieldDataIterator {
894                journal: self,
895                current_data_offset: None,
896            });
897        };
898
899        // Get the field object to access its head_data_offset
900        let field_guard = self.field_ref(field_offset)?;
901        let head_data_offset = field_guard.header.head_data_offset;
902
903        // Create the iterator
904        Ok(FieldDataIterator {
905            journal: self,
906            current_data_offset: head_data_offset,
907        })
908    }
909
910    /// Creates an iterator over all DATA objects for the specified field,
911    /// including the on-disk DATA object offset.
912    pub fn field_data_objects_with_offsets<'a>(
913        &'a self,
914        field_name: &'a [u8],
915    ) -> Result<FieldDataOffsetIterator<'a, M>> {
916        let field_hash = self.hash(field_name);
917        let Some(field_offset) = self.find_field_offset(field_hash, field_name)? else {
918            return Ok(FieldDataOffsetIterator {
919                journal: self,
920                current_data_offset: None,
921            });
922        };
923
924        let field_guard = self.field_ref(field_offset)?;
925        let head_data_offset = field_guard.header.head_data_offset;
926
927        Ok(FieldDataOffsetIterator {
928            journal: self,
929            current_data_offset: head_data_offset,
930        })
931    }
932
933    /// Creates an iterator over all DATA objects for a specific entry
934    pub fn entry_data_objects(&self, entry_offset: NonZeroU64) -> Result<EntryDataIterator<'_, M>> {
935        // Get the entry object to determine how many data items it has
936        let entry_guard = self.entry_ref(entry_offset)?;
937
938        // Get the total number of items
939        let total_items = match &entry_guard.items {
940            EntryItemsType::Regular(items) => items.len(),
941            EntryItemsType::Compact(items) => items.len(),
942        };
943
944        // Create the iterator
945        Ok(EntryDataIterator {
946            journal: self,
947            entry_offset: Some(entry_offset),
948            current_index: 0,
949            total_items,
950        })
951    }
952
953    /// Get hash table bucket utilization statistics
954    pub fn bucket_utilization(&self) -> Option<BucketUtilization> {
955        let data_hash_table = self.data_hash_table_ref()?;
956        let data_total = data_hash_table.items.len();
957        let data_occupied = data_hash_table
958            .items
959            .iter()
960            .filter(|item| item.head_hash_offset.is_some())
961            .count();
962
963        let field_hash_table = self.field_hash_table_ref()?;
964        let field_total = field_hash_table.items.len();
965        let field_occupied = field_hash_table
966            .items
967            .iter()
968            .filter(|item| item.head_hash_offset.is_some())
969            .count();
970
971        Some(BucketUtilization {
972            data_occupied,
973            data_total,
974            field_occupied,
975            field_total,
976        })
977    }
978
979    /// Get the duration covered by all entries in the journal
980    /// Returns None if the journal is empty or contains only one entry
981    pub fn duration(&self) -> Option<Duration> {
982        let header = self.journal_header_ref();
983
984        if header.head_entry_realtime == 0 || header.tail_entry_realtime == 0 {
985            return None;
986        }
987
988        if header.tail_entry_realtime <= header.head_entry_realtime {
989            // Single entry or invalid state
990            return None;
991        }
992
993        let duration_micros = header.tail_entry_realtime - header.head_entry_realtime;
994        Some(Duration::from_micros(duration_micros))
995    }
996}
997
998#[cfg(test)]
999#[path = "file_tests.rs"]
1000mod tests;