1#![allow(clippy::field_reassign_with_default)]
2
3pub use super::file_iterators::{
4 EntryDataIterator, FieldDataIterator, FieldDataOffsetIterator, FieldIterator,
5};
6pub use super::file_payload::{DataPayloadObjectInfo, DataPayloadReadContext};
7use super::mmap::{
8 ExperimentalMmapStrategy, MemoryMap, MemoryMapMut, WindowManager, WindowManagerStats,
9};
10use crate::error::{JournalError, Result};
11use crate::file::guarded_cell::GuardedCell;
12use crate::file::hash;
13use crate::file::object::*;
14use crate::file::offset_array;
15use std::fs::{File, OpenOptions};
16use std::marker::PhantomData;
17use std::num::NonZeroU64;
18use std::path::Path;
19use std::time::Duration;
20use zerocopy::{ByteSlice, FromBytes};
21
22use crate::file::value_guard::ValueGuard;
23
24pub(super) const OBJECT_ALIGNMENT: u64 = 8;
26const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
27pub(super) const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
28const DEFAULT_MAX_FILE_SIZE: u64 = 128 * 1024 * 1024;
29const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
30const PAGE_SIZE: u64 = 4096;
31const DEFAULT_DATA_HASH_TABLE_SIZE: usize = 2047;
32const DEFAULT_FIELD_HASH_TABLE_SIZE: usize = 1023;
33pub const DEFAULT_COMPRESS_THRESHOLD: usize = 512;
34pub const MIN_COMPRESS_THRESHOLD: usize = 8;
35pub const DEFAULT_JOURNAL_FILE_MODE: u32 = 0o640;
36
37pub fn normalize_compress_threshold(threshold: usize) -> usize {
38 threshold.max(MIN_COMPRESS_THRESHOLD)
39}
40
/// Rounds `value` up to the next multiple of `alignment`.
///
/// `alignment` is expected to be a power of two; `0` and `1` leave `value`
/// unchanged. The addition saturates, so for values within `alignment - 1` of
/// `u64::MAX` the result is the largest aligned value, which is below `value`,
/// rather than an overflow.
fn align_to(value: u64, alignment: u64) -> u64 {
    let mask = alignment.saturating_sub(1);
    value.saturating_add(mask) & !mask
}
44
45fn normalize_journal_max_file_size(max_file_size: Option<u64>, compact: bool) -> u64 {
46 let mut size = match max_file_size {
47 Some(0) | None => DEFAULT_MAX_FILE_SIZE,
48 Some(size) => align_to(size, PAGE_SIZE),
49 };
50 if compact && size > JOURNAL_COMPACT_SIZE_MAX {
51 size = JOURNAL_COMPACT_SIZE_MAX;
52 }
53 size.max(JOURNAL_FILE_SIZE_MIN)
54}
55
56fn data_hash_buckets_for_max_file_size(max_file_size: u64) -> usize {
57 let buckets = (max_file_size / 576).max(DEFAULT_DATA_HASH_TABLE_SIZE as u64);
58 buckets.min(usize::MAX as u64) as usize
59}
60
61pub(super) fn validate_offset_alignment(offset: NonZeroU64) -> Result<()> {
64 if offset.get() % OBJECT_ALIGNMENT != 0 {
65 return Err(JournalError::MisalignedOffset(offset.get()));
66 }
67 Ok(())
68}
69
70pub(super) fn round_up_to_file_size_increment(value: u64) -> Result<u64> {
71 value
72 .checked_add(FILE_SIZE_INCREASE - 1)
73 .map(|v| v & !(FILE_SIZE_INCREASE - 1))
74 .ok_or(JournalError::ObjectExceedsFileBounds)
75}
76
77pub trait BucketVisitor<'a> {
78 type Object: JournalObject<&'a [u8]> + HashableObject;
79 type Output;
80
81 fn visit(&mut self, object: &ValueGuard<'a, Self::Object>) -> Result<Option<Self::Output>>;
84}
85
/// A payload made of up to three borrowed byte slices that are logically
/// concatenated.
///
/// This lets a `NAME=value` pair be hashed and compared without first copying
/// it into one buffer.
#[derive(Debug, Clone, Copy)]
pub struct PayloadParts<'a> {
    /// Backing slices; only the first `count` entries are meaningful.
    parts: [&'a [u8]; 3],
    /// Total length, in bytes, of the active slices.
    len: usize,
    /// Number of active entries in `parts` (1 for raw, 3 for structured).
    count: usize,
}

impl<'a> PayloadParts<'a> {
    /// Wraps a payload that is already contiguous.
    pub fn raw(payload: &'a [u8]) -> Self {
        let len = payload.len();
        Self {
            parts: [payload, &[], &[]],
            len,
            count: 1,
        }
    }

    /// Describes the payload `name=value` without concatenating it.
    pub fn structured(name: &'a [u8], value: &'a [u8]) -> Self {
        let parts: [&'a [u8]; 3] = [name, b"=", value];
        let len = parts.iter().map(|part| part.len()).sum();
        Self {
            parts,
            len,
            count: 3,
        }
    }

    /// Total payload length in bytes.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the payload contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Iterates over the active slices in payload order.
    pub fn iter(&self) -> std::iter::Copied<std::slice::Iter<'_, &'a [u8]>> {
        let active = &self.parts[..self.count];
        active.iter().copied()
    }

    /// Returns the payload as one slice when it was built with [`raw`](Self::raw).
    pub fn as_single_slice(&self) -> Option<&'a [u8]> {
        if self.count == 1 {
            Some(self.parts[0])
        } else {
            None
        }
    }

    /// Compares the logical (concatenated) payload with `other` byte for byte.
    pub fn equals_slice(&self, other: &[u8]) -> bool {
        if other.len() != self.len {
            return false;
        }

        // Peel each part off the front of `other`; any mismatch ends the check.
        let mut rest = other;
        for part in self.iter() {
            match rest.strip_prefix(part) {
                Some(tail) => rest = tail,
                None => return false,
            }
        }

        rest.is_empty()
    }

    /// Writes the concatenated payload into `dst`.
    ///
    /// # Panics
    ///
    /// Panics if `dst.len()` differs from [`len`](Self::len).
    pub fn copy_to_slice(&self, dst: &mut [u8]) {
        assert_eq!(dst.len(), self.len);
        let mut remaining = dst;
        for part in self.iter() {
            // `take` moves the mutable borrow out so the tail can be stored back.
            let (head, tail) = std::mem::take(&mut remaining).split_at_mut(part.len());
            head.copy_from_slice(part);
            remaining = tail;
        }
    }

    /// Returns the concatenated payload as an owned vector.
    pub fn to_vec(&self) -> Vec<u8> {
        self.parts[..self.count].concat()
    }
}
163
164struct PayloadMatcher<'data, T> {
165 payload: PayloadParts<'data>,
166 hash: u64,
167 decompression_buffer: Vec<u8>,
168 _phantom: PhantomData<T>,
169}
170
171impl<'data, B: ByteSlice> PayloadMatcher<'data, FieldObject<B>> {
172 fn field_matcher(payload: &'data [u8], hash: u64) -> Self {
173 Self {
174 payload: PayloadParts::raw(payload),
175 hash,
176 decompression_buffer: Vec::new(),
177 _phantom: PhantomData::<FieldObject<B>>,
178 }
179 }
180}
181
182impl<'a, T> BucketVisitor<'a> for PayloadMatcher<'_, T>
183where
184 T: JournalObject<&'a [u8]> + HashableObject,
185{
186 type Object = T;
187 type Output = NonZeroU64;
188
189 fn visit(&mut self, object: &ValueGuard<'a, Self::Object>) -> Result<Option<Self::Output>> {
190 if object.hash() != self.hash {
191 return Ok(None);
192 }
193
194 let matches = if object.is_compressed() {
195 let len = object.decompress(&mut self.decompression_buffer)?;
196 self.payload.equals_slice(&self.decompression_buffer[..len])
197 } else {
198 self.payload.equals_slice(object.raw_payload())
199 };
200
201 if matches {
202 Ok(Some(object.offset()))
203 } else {
204 Ok(None)
205 }
206 }
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
210pub enum Compression {
211 None,
212 Xz,
213 Lz4,
214 Zstd,
215}
216
217impl Compression {
218 pub fn as_incompatible_flag(&self) -> u32 {
219 match self {
220 Compression::None => 0,
221 Compression::Xz => HeaderIncompatibleFlags::CompressedXz as u32,
222 Compression::Lz4 => HeaderIncompatibleFlags::CompressedLz4 as u32,
223 Compression::Zstd => HeaderIncompatibleFlags::CompressedZstd as u32,
224 }
225 }
226}
227
228impl Default for Compression {
229 fn default() -> Self {
230 Compression::None
231 }
232}
233
234#[derive(Debug, Clone)]
235pub struct JournalFileOptions {
236 pub(super) machine_id: uuid::Uuid,
237 pub(super) seqnum_id: uuid::Uuid,
238 pub(super) file_id: uuid::Uuid,
239 pub(super) window_size: u64,
240 pub(super) data_hash_table_buckets: usize,
241 pub(super) field_hash_table_buckets: usize,
242 pub(super) enable_keyed_hash: bool,
243 pub(super) compression: Compression,
244 pub(super) compress_threshold: usize,
245 pub(super) compact: bool,
246 pub(super) file_mode: u32,
247 pub(super) experimental_mmap_strategy: ExperimentalMmapStrategy,
248 pub seal: Option<crate::seal::SealOptions>,
249}
250
251impl JournalFileOptions {
252 pub fn new(machine_id: uuid::Uuid, _boot_id: uuid::Uuid, seqnum_id: uuid::Uuid) -> Self {
253 let file_id = uuid::Uuid::new_v4();
254
255 Self {
256 machine_id,
257 seqnum_id,
258 file_id,
259 window_size: 64 * 1024,
260 data_hash_table_buckets: 233_016,
261 field_hash_table_buckets: DEFAULT_FIELD_HASH_TABLE_SIZE,
262 enable_keyed_hash: true,
263 compression: Compression::None,
264 compress_threshold: DEFAULT_COMPRESS_THRESHOLD,
265 compact: false,
266 file_mode: DEFAULT_JOURNAL_FILE_MODE,
267 experimental_mmap_strategy: ExperimentalMmapStrategy::Windowed,
268 seal: None,
269 }
270 }
271
272 pub fn with_optimized_buckets(
274 mut self,
275 previous_utilization: Option<BucketUtilization>,
276 max_file_size: Option<u64>,
277 ) -> Self {
278 let _ = previous_utilization;
279 let max_file_size = normalize_journal_max_file_size(max_file_size, self.compact);
280
281 self.data_hash_table_buckets = data_hash_buckets_for_max_file_size(max_file_size);
282 self.field_hash_table_buckets = DEFAULT_FIELD_HASH_TABLE_SIZE;
283 self
284 }
285
286 pub fn with_window_size(mut self, size: u64) -> Self {
287 assert_eq!(size % OBJECT_ALIGNMENT, 0);
288 assert_eq!(size % 4096, 0, "Window size must be page-aligned");
289 self.window_size = size;
290 self
291 }
292
293 pub fn with_data_hash_table_buckets(mut self, buckets: usize) -> Self {
294 assert!(buckets > 0, "Hash table buckets must be positive");
295 self.data_hash_table_buckets = buckets;
296 self
297 }
298
299 pub fn with_field_hash_table_buckets(mut self, buckets: usize) -> Self {
300 assert!(buckets > 0, "Hash table buckets must be positive");
301 self.field_hash_table_buckets = buckets;
302 self
303 }
304
305 pub fn with_keyed_hash(mut self, enabled: bool) -> Self {
306 self.enable_keyed_hash = enabled;
307 self
308 }
309
310 pub fn with_file_id(mut self, file_id: uuid::Uuid) -> Self {
311 self.file_id = file_id;
312 self
313 }
314
315 pub fn with_compression(mut self, compression: Compression) -> Self {
316 self.compression = compression;
317 self
318 }
319
320 pub fn with_compress_threshold(mut self, threshold: usize) -> Self {
321 self.compress_threshold = normalize_compress_threshold(threshold);
322 self
323 }
324
325 pub fn with_compact(mut self, compact: bool) -> Self {
326 self.compact = compact;
327 self
328 }
329
330 pub fn with_file_mode(mut self, mode: u32) -> Self {
331 assert!(
332 mode <= 0o777,
333 "journal file mode must contain only permission bits"
334 );
335 self.file_mode = mode;
336 self
337 }
338
339 #[doc(hidden)]
340 pub fn with_experimental_mmap_strategy(mut self, strategy: ExperimentalMmapStrategy) -> Self {
341 self.experimental_mmap_strategy = strategy;
342 self
343 }
344
345 pub fn with_seal(mut self, seal: crate::seal::SealOptions) -> Self {
346 self.seal = Some(seal);
347 self
348 }
349
350 pub fn compression(&self) -> Compression {
351 self.compression
352 }
353
354 pub fn compress_threshold(&self) -> usize {
355 self.compress_threshold
356 }
357
358 pub fn compact(&self) -> bool {
359 self.compact
360 }
361
362 pub fn file_mode(&self) -> u32 {
363 self.file_mode
364 }
365
366 pub fn create<M: MemoryMapMut>(self, file: &crate::repository::File) -> Result<JournalFile<M>> {
367 JournalFile::create(file, self)
368 }
369}
370
/// Occupancy snapshot of a journal file's data and field hash tables.
#[derive(Debug, Clone, Copy)]
pub struct BucketUtilization {
    /// Data hash buckets whose chain head is set.
    pub data_occupied: usize,
    /// Total number of data hash buckets.
    pub data_total: usize,
    /// Field hash buckets whose chain head is set.
    pub field_occupied: usize,
    /// Total number of field hash buckets.
    pub field_total: usize,
}

impl BucketUtilization {
    /// Fraction of data buckets in use, in `[0, 1]`; `0.0` when there are no buckets.
    pub fn data_utilization(&self) -> f64 {
        Self::occupancy_ratio(self.data_occupied, self.data_total)
    }

    /// Fraction of field buckets in use, in `[0, 1]`; `0.0` when there are no buckets.
    pub fn field_utilization(&self) -> f64 {
        Self::occupancy_ratio(self.field_occupied, self.field_total)
    }

    /// `occupied / total` as a float, treating an empty table as 0% used.
    fn occupancy_ratio(occupied: usize, total: usize) -> f64 {
        match total {
            0 => 0.0,
            _ => occupied as f64 / total as f64,
        }
    }
}
397
/// A journal file opened for reading through memory maps.
///
/// The header and the two hash tables each get their own mapping. All other
/// objects are read through `window_manager`.
pub struct JournalFile<M: MemoryMap> {
    /// Repository entry this journal was opened from.
    pub(super) file: crate::repository::File,

    /// Mapping of the first `size_of::<JournalHeader>()` bytes of the file.
    pub(super) header_map: M,
    /// Copy of the header with fields past the on-disk `header_size` zeroed.
    /// `Some` only when the file's header is shorter than `JournalHeader`.
    pub(super) sanitized_header: Option<JournalHeader>,
    /// Mapping of the data hash table object, if the header declares one.
    pub(super) data_hash_table_map: Option<M>,
    /// Mapping of the field hash table object, if the header declares one.
    pub(super) field_hash_table_map: Option<M>,

    /// Windowed reader for the rest of the file.
    /// NOTE(review): wrapped in `GuardedCell`, presumably to catch conflicting
    /// borrows at runtime (`borrow_mut_checked` / `with_guarded`); confirm.
    pub(super) window_manager: GuardedCell<WindowManager<M>>,

    /// Sealing configuration; set to `None` by the open paths in this file.
    pub seal_options: Option<crate::seal::SealOptions>,
}
435
436pub(super) fn map_hash_table<M: MemoryMap>(
437 file: &File,
438 header_size: u64,
439 offset: Option<NonZeroU64>,
440 size: Option<NonZeroU64>,
441) -> Result<Option<M>> {
442 let (Some(offset), Some(size)) = (offset, size) else {
443 return Ok(None);
444 };
445
446 let object_header_size = std::mem::size_of::<ObjectHeader>() as u64;
447 if offset.get() < header_size + object_header_size {
448 return Err(JournalError::InvalidObjectLocation);
449 }
450 if size.get() <= object_header_size {
451 return Err(JournalError::InvalidObjectLocation);
452 }
453
454 let offset = offset.get() - object_header_size;
455 let size = object_header_size + size.get();
456 M::create(file, offset, size).map(Some)
457}
458
459fn sanitize_header_for_size(mut header: JournalHeader) -> JournalHeader {
460 if header.header_size < 216 {
461 header.n_data = 0;
462 }
463 if header.header_size < 224 {
464 header.n_fields = 0;
465 }
466 if header.header_size < 232 {
467 header.n_tags = 0;
468 }
469 if header.header_size < 240 {
470 header.n_entry_arrays = 0;
471 }
472 if header.header_size < 248 {
473 header.data_hash_chain_depth = 0;
474 }
475 if header.header_size < 256 {
476 header.field_hash_chain_depth = 0;
477 }
478 if header.header_size < 260 {
479 header.tail_entry_array_offset = 0;
480 }
481 if header.header_size < 264 {
482 header.tail_entry_array_n_entries = 0;
483 }
484 if header.header_size < 272 {
485 header.tail_entry_offset = 0;
486 }
487 header
488}
489
490impl<M: MemoryMap> JournalFile<M> {
491 pub fn visit_bucket<'a, H, V>(
492 &'a self,
493 hash_table: Option<H>,
494 hash: u64,
495 mut visitor: V,
496 ) -> Result<Option<V::Output>>
497 where
498 H: HashTable<Object = V::Object>,
499 V: BucketVisitor<'a>,
500 {
501 let hash_table = hash_table.ok_or(JournalError::MissingHashTable)?;
502 let bucket = hash_table.hash_item_ref(hash);
503 let mut object_offset = bucket.head_hash_offset;
504
505 while let Some(offset) = object_offset {
506 let object_guard = self.journal_object_ref::<V::Object>(offset)?;
507
508 if let Some(output) = visitor.visit(&object_guard)? {
509 return Ok(Some(output));
510 }
511
512 object_offset = object_guard.next_hash_offset();
513 }
514
515 Ok(None)
516 }
517
518 pub fn open(file: &crate::repository::File, window_size: u64) -> Result<Self> {
519 Self::open_repository_file(file.clone(), window_size)
520 }
521
522 pub fn open_with_strategy(
523 file: &crate::repository::File,
524 window_size: u64,
525 strategy: ExperimentalMmapStrategy,
526 ) -> Result<Self> {
527 Self::open_repository_file_with_strategy(file.clone(), window_size, strategy)
528 }
529
530 pub fn open_path(path: impl AsRef<Path>, window_size: u64) -> Result<Self> {
531 Self::open_path_with_strategy(path, window_size, ExperimentalMmapStrategy::Windowed)
532 }
533
534 pub fn open_path_with_strategy(
535 path: impl AsRef<Path>,
536 window_size: u64,
537 strategy: ExperimentalMmapStrategy,
538 ) -> Result<Self> {
539 let path = path.as_ref();
540 let absolute_path = if path.is_absolute() {
541 path.to_path_buf()
542 } else {
543 std::env::current_dir()?.join(path)
544 };
545 let file = crate::repository::File::from_raw_path(&absolute_path)
546 .ok_or(JournalError::InvalidFilename)?;
547 Self::open_repository_file_with_strategy(file, window_size, strategy)
548 }
549
550 pub fn open_snapshot(
551 file: &crate::repository::File,
552 window_size: u64,
553 strategy: ExperimentalMmapStrategy,
554 ) -> Result<Self> {
555 Self::open_repository_file_snapshot(file.clone(), window_size, strategy)
556 }
557
558 pub fn open_path_snapshot(
559 path: impl AsRef<Path>,
560 window_size: u64,
561 strategy: ExperimentalMmapStrategy,
562 ) -> Result<Self> {
563 let path = path.as_ref();
564 let absolute_path = if path.is_absolute() {
565 path.to_path_buf()
566 } else {
567 std::env::current_dir()?.join(path)
568 };
569 let file = crate::repository::File::from_raw_path(&absolute_path)
570 .ok_or(JournalError::InvalidFilename)?;
571 Self::open_repository_file_snapshot(file, window_size, strategy)
572 }
573
574 fn open_repository_file(file: crate::repository::File, window_size: u64) -> Result<Self> {
575 Self::open_repository_file_with_strategy(
576 file,
577 window_size,
578 ExperimentalMmapStrategy::Windowed,
579 )
580 }
581
582 fn open_repository_file_with_strategy(
583 file: crate::repository::File,
584 window_size: u64,
585 strategy: ExperimentalMmapStrategy,
586 ) -> Result<Self> {
587 Self::open_repository_file_with_window_manager(file, window_size, |fd| {
588 WindowManager::new_with_strategy(fd, window_size, 16, strategy)
589 })
590 }
591
592 fn open_repository_file_snapshot(
593 file: crate::repository::File,
594 window_size: u64,
595 strategy: ExperimentalMmapStrategy,
596 ) -> Result<Self> {
597 Self::open_repository_file_with_window_manager(file, window_size, |fd| {
598 WindowManager::new_snapshot(fd, window_size, 16, strategy)
599 })
600 }
601
602 fn open_repository_file_with_window_manager<F>(
603 file: crate::repository::File,
604 window_size: u64,
605 window_manager_builder: F,
606 ) -> Result<Self>
607 where
608 F: FnOnce(File) -> Result<WindowManager<M>>,
609 {
610 debug_assert_eq!(window_size % OBJECT_ALIGNMENT, 0);
611
612 let fd = OpenOptions::new()
614 .read(true)
615 .write(false)
616 .open(file.path())?;
617
618 let header_size = std::mem::size_of::<JournalHeader>() as u64;
620 let header_map = M::create(&fd, 0, header_size)?;
621 let header = JournalHeader::ref_from_prefix(&header_map).unwrap().0;
622 if header.signature != *b"LPKSHHRH" {
623 return Err(JournalError::InvalidMagicNumber);
624 }
625 let sanitized_header =
626 (header.header_size < header_size).then(|| sanitize_header_for_size(*header));
627
628 let data_hash_table_map = map_hash_table(
630 &fd,
631 header.header_size,
632 header.data_hash_table_offset,
633 header.data_hash_table_size,
634 )?;
635 let field_hash_table_map = map_hash_table(
636 &fd,
637 header.header_size,
638 header.field_hash_table_offset,
639 header.field_hash_table_size,
640 )?;
641
642 let window_manager = GuardedCell::new(window_manager_builder(fd)?);
644
645 Ok(JournalFile {
646 file,
647 header_map,
648 sanitized_header,
649 data_hash_table_map,
650 field_hash_table_map,
651 window_manager,
652 seal_options: None,
653 })
654 }
655
656 pub fn file(&self) -> &crate::repository::File {
657 &self.file
658 }
659
660 #[doc(hidden)]
661 pub fn mmap_stats(&self) -> Result<WindowManagerStats> {
662 Ok(self.window_manager.borrow_mut_checked()?.stats())
663 }
664
665 #[doc(hidden)]
666 pub fn reader_file_size(&self) -> Result<u64> {
667 Ok(self.window_manager.borrow_mut_checked()?.stats().file_size)
668 }
669
670 pub fn hash(&self, data: &[u8]) -> u64 {
671 self.hash_parts(PayloadParts::raw(data))
672 }
673
674 pub fn hash_parts(&self, payload: PayloadParts<'_>) -> u64 {
675 let is_keyed_hash = self
676 .journal_header_ref()
677 .has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash);
678
679 hash::journal_hash_data_parts(
680 payload.iter(),
681 is_keyed_hash,
682 if is_keyed_hash {
683 Some(&self.journal_header_ref().file_id)
684 } else {
685 None
686 },
687 )
688 }
689
690 pub fn entry_list(&self) -> Option<offset_array::List> {
691 let header = self.journal_header_ref();
692
693 header.entry_array_offset.and_then(|head_offset| {
694 std::num::NonZeroUsize::new(header.n_entries as usize)
695 .map(|total_items| offset_array::List::new(head_offset, total_items))
696 })
697 }
698
699 pub fn entry_offsets(&self, offsets: &mut Vec<NonZeroU64>) -> Result<()> {
700 if let Some(entry_list) = self.entry_list() {
701 entry_list.collect_offsets(self, offsets)?;
702 }
703
704 Ok(())
705 }
706
707 pub fn entry_data_object_offsets(
710 &self,
711 entry_offset: NonZeroU64,
712 offsets: &mut Vec<NonZeroU64>,
713 ) -> Result<()> {
714 let entry_guard = self.entry_ref(entry_offset)?;
715 entry_guard.collect_offsets(offsets)
716 }
717
718 pub fn journal_header_ref(&self) -> &JournalHeader {
719 if let Some(header) = &self.sanitized_header {
720 header
721 } else {
722 JournalHeader::ref_from_prefix(&self.header_map).unwrap().0
723 }
724 }
725
726 pub fn data_hash_table_map(&self) -> Option<&M> {
727 self.data_hash_table_map.as_ref()
728 }
729 pub fn field_hash_table_map(&self) -> Option<&M> {
730 self.field_hash_table_map.as_ref()
731 }
732
733 pub fn data_hash_table_ref(&self) -> Option<DataHashTable<&[u8]>> {
734 self.data_hash_table_map
735 .as_ref()
736 .and_then(|m| DataHashTable::<&[u8]>::from_data(m, false))
737 }
738
739 pub fn field_hash_table_ref(&self) -> Option<FieldHashTable<&[u8]>> {
740 self.field_hash_table_map
741 .as_ref()
742 .and_then(|m| FieldHashTable::<&[u8]>::from_data(m, false))
743 }
744
745 pub fn object_header_ref(&self, position: NonZeroU64) -> Result<&ObjectHeader> {
746 validate_offset_alignment(position)?;
747 let size_needed = std::mem::size_of::<ObjectHeader>() as u64;
748 let window_manager = self.window_manager.borrow_mut_checked()?;
749 let header_slice = window_manager.get_slice(position.get(), size_needed)?;
750 ObjectHeader::ref_from_bytes(header_slice).map_err(|_| JournalError::ZerocopyFailure)
751 }
752
753 pub fn read_bytes_at(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
756 validate_offset_alignment(NonZeroU64::new(offset).ok_or(JournalError::InvalidOffset)?)?;
757 let window_manager = self.window_manager.borrow_mut_checked()?;
758 let src = window_manager.get_slice(offset, size)?;
759 Ok(src.to_vec())
760 }
761
762 #[doc(hidden)]
763 pub fn read_unaligned_bytes_at(&self, offset: u64, size: u64) -> Result<Vec<u8>> {
764 let window_manager = self.window_manager.borrow_mut_checked()?;
765 let src = window_manager.get_slice(offset, size)?;
766 Ok(src.to_vec())
767 }
768
769 fn journal_object_ref<'a, T>(&'a self, offset: NonZeroU64) -> Result<ValueGuard<'a, T>>
770 where
771 T: JournalObject<&'a [u8]>,
772 {
773 let journal_header = self.journal_header_ref();
774 let is_compact = journal_header.has_incompatible_flag(HeaderIncompatibleFlags::Compact);
775 let header_size = journal_header.header_size;
776 let arena_end = header_size + journal_header.arena_size;
777
778 validate_offset_alignment(offset)?;
779
780 if offset.get() < header_size {
782 return Err(JournalError::ObjectExceedsFileBounds);
783 }
784
785 self.window_manager.with_guarded(offset, |wm| {
786 let size_needed = {
788 let header_slice =
789 wm.get_slice(offset.get(), std::mem::size_of::<ObjectHeader>() as u64)?;
790 let header = ObjectHeader::ref_from_bytes(header_slice)
791 .map_err(|_| JournalError::ZerocopyFailure)?;
792 header.validated_size()?
793 };
794
795 let end_offset = offset
797 .get()
798 .checked_add(size_needed)
799 .ok_or(JournalError::ObjectExceedsFileBounds)?;
800 if end_offset > arena_end {
801 return Err(JournalError::ObjectExceedsFileBounds);
802 }
803
804 let data = wm.get_slice(offset.get(), size_needed)?;
806
807 let value = T::from_data(data, is_compact).ok_or(JournalError::ZerocopyFailure)?;
809
810 Ok(value)
811 })
812 }
813
814 pub fn offset_array_ref(
815 &self,
816 offset: NonZeroU64,
817 ) -> Result<ValueGuard<'_, OffsetArrayObject<&[u8]>>> {
818 self.journal_object_ref(offset)
819 }
820
821 pub fn field_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, FieldObject<&[u8]>>> {
822 self.journal_object_ref(offset)
823 }
824
825 pub fn entry_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, EntryObject<&[u8]>>> {
826 self.journal_object_ref(offset)
827 }
828
829 pub fn data_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, DataObject<&[u8]>>> {
830 self.journal_object_ref(offset)
831 }
832
833 pub fn tag_ref(&self, offset: NonZeroU64) -> Result<ValueGuard<'_, TagObject<&[u8]>>> {
834 self.journal_object_ref(offset)
835 }
836
837 pub fn find_field_offset(&self, hash: u64, payload: &[u8]) -> Result<Option<NonZeroU64>> {
838 let visitor = PayloadMatcher::field_matcher(payload, hash);
839 self.visit_bucket(self.field_hash_table_ref(), hash, visitor)
840 }
841
842 pub fn data_object_directed_partition_point<F>(
847 &self,
848 data_offset: NonZeroU64,
849 predicate: F,
850 direction: offset_array::Direction,
851 ) -> Result<Option<NonZeroU64>>
852 where
853 F: Fn(NonZeroU64) -> Result<bool>,
854 {
855 let Some(cursor) = self.data_ref(data_offset)?.inlined_cursor() else {
856 return Ok(None);
857 };
858
859 let Some(best_match) = cursor.directed_partition_point(self, predicate, direction)? else {
860 return Ok(None);
861 };
862
863 best_match.value(self)
864 }
865
866 pub fn fields(&self) -> FieldIterator<'_, M> {
868 let field_hash_table = self.field_hash_table_ref();
870
871 let mut iterator = FieldIterator {
873 journal: self,
874 field_hash_table,
875 current_bucket_index: 0,
876 next_field_offset: None,
877 };
878
879 iterator.advance_to_next_nonempty_bucket();
881
882 iterator
883 }
884
885 pub fn field_data_objects<'a>(
887 &'a self,
888 field_name: &'a [u8],
889 ) -> Result<FieldDataIterator<'a, M>> {
890 let field_hash = self.hash(field_name);
892 let Some(field_offset) = self.find_field_offset(field_hash, field_name)? else {
893 return Ok(FieldDataIterator {
894 journal: self,
895 current_data_offset: None,
896 });
897 };
898
899 let field_guard = self.field_ref(field_offset)?;
901 let head_data_offset = field_guard.header.head_data_offset;
902
903 Ok(FieldDataIterator {
905 journal: self,
906 current_data_offset: head_data_offset,
907 })
908 }
909
910 pub fn field_data_objects_with_offsets<'a>(
913 &'a self,
914 field_name: &'a [u8],
915 ) -> Result<FieldDataOffsetIterator<'a, M>> {
916 let field_hash = self.hash(field_name);
917 let Some(field_offset) = self.find_field_offset(field_hash, field_name)? else {
918 return Ok(FieldDataOffsetIterator {
919 journal: self,
920 current_data_offset: None,
921 });
922 };
923
924 let field_guard = self.field_ref(field_offset)?;
925 let head_data_offset = field_guard.header.head_data_offset;
926
927 Ok(FieldDataOffsetIterator {
928 journal: self,
929 current_data_offset: head_data_offset,
930 })
931 }
932
933 pub fn entry_data_objects(&self, entry_offset: NonZeroU64) -> Result<EntryDataIterator<'_, M>> {
935 let entry_guard = self.entry_ref(entry_offset)?;
937
938 let total_items = match &entry_guard.items {
940 EntryItemsType::Regular(items) => items.len(),
941 EntryItemsType::Compact(items) => items.len(),
942 };
943
944 Ok(EntryDataIterator {
946 journal: self,
947 entry_offset: Some(entry_offset),
948 current_index: 0,
949 total_items,
950 })
951 }
952
953 pub fn bucket_utilization(&self) -> Option<BucketUtilization> {
955 let data_hash_table = self.data_hash_table_ref()?;
956 let data_total = data_hash_table.items.len();
957 let data_occupied = data_hash_table
958 .items
959 .iter()
960 .filter(|item| item.head_hash_offset.is_some())
961 .count();
962
963 let field_hash_table = self.field_hash_table_ref()?;
964 let field_total = field_hash_table.items.len();
965 let field_occupied = field_hash_table
966 .items
967 .iter()
968 .filter(|item| item.head_hash_offset.is_some())
969 .count();
970
971 Some(BucketUtilization {
972 data_occupied,
973 data_total,
974 field_occupied,
975 field_total,
976 })
977 }
978
979 pub fn duration(&self) -> Option<Duration> {
982 let header = self.journal_header_ref();
983
984 if header.head_entry_realtime == 0 || header.tail_entry_realtime == 0 {
985 return None;
986 }
987
988 if header.tail_entry_realtime <= header.head_entry_realtime {
989 return None;
991 }
992
993 let duration_micros = header.tail_entry_realtime - header.head_entry_realtime;
994 Some(Duration::from_micros(duration_micros))
995 }
996}
997
998#[cfg(test)]
999#[path = "file_tests.rs"]
1000mod tests;