Skip to main content

journal_core/file/
file_payload.rs

1use super::file::{JournalFile, PayloadParts, validate_offset_alignment};
2use super::mmap::{MemoryMap, WindowManager};
3use super::object::*;
4use crate::error::{JournalError, Result};
5use crate::file::value_guard::ValueGuard;
6use std::num::NonZeroU64;
7use zerocopy::FromBytes;
8
/// Journal-header values needed to locate and bound DATA payloads.
///
/// Produced by `JournalFile::data_payload_read_context()` and accepted by the
/// `*_with_context` / `*_with_info` payload readers.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct DataPayloadReadContext {
    /// Whether the file has the `Compact` incompatible header flag set.
    is_compact: bool,
    /// Size of the journal header; object offsets below this are rejected.
    header_size: u64,
    /// `header_size + arena_size`; objects may not extend past this offset.
    arena_end: u64,
    /// Number of bytes that precede the payload inside a DATA object.
    payload_prefix_size: u64,
}
17
/// Size and compression state of a single DATA object.
///
/// Obtained from `JournalFile::data_payload_object_info_at()` and fed back
/// into the `raw_data_payload_*_with_info` accessors.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct DataPayloadObjectInfo {
    /// Total object size in bytes, as recorded in the object header.
    size_needed: u64,
    /// Whether a compression flag is set on the object.
    is_compressed: bool,
}
24
/// A DATA payload handed to row-pinned payload visitors.
///
/// Both variants are plain pointer/slice data, so the type is `Copy` and can
/// be logged with `{:?}`.
#[doc(hidden)]
#[derive(Debug, Clone, Copy)]
pub enum RowPinnedPayload<'a> {
    /// Uncompressed payload bytes inside a row-pinned mmap window.
    ///
    /// `ptr`/`len` are not tied to a Rust lifetime. NOTE(review): they are
    /// presumably valid only until `JournalFile::clear_row_payload_pins()` is
    /// called — confirm against `WindowManager::get_row_pinned_slice`.
    Borrowed { ptr: *const u8, len: usize },
    /// Decompressed payload bytes borrowed from the caller's scratch buffer,
    /// which is cleared and reused for every compressed payload.
    Decompressed(&'a [u8]),
}
30
/// Outcome of inspecting one entry of a DATA hash chain during lookup.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
struct DataLookupResult {
    /// The inspected object's `next_hash_offset`, i.e. where the chain continues.
    next_hash_offset: Option<NonZeroU64>,
    /// `true` when both the stored hash and the payload bytes matched.
    matches: bool,
}
37
/// The DATA object header fields needed to walk a hash chain.
#[derive(Clone, Copy, Debug)]
struct DataLookupHeader {
    /// Raw object flags byte (header byte 1).
    flags: u8,
    /// Object size, from header bytes 8..16.
    size_needed: u64,
    /// Payload hash, from header bytes 16..24.
    stored_hash: u64,
    /// Next object in the same hash chain (bytes 24..32); `None` ends the chain.
    next_hash_offset: Option<NonZeroU64>,
}
45
46impl DataLookupHeader {
47    fn is_compressed(self) -> bool {
48        (self.flags
49            & (ObjectFlags::CompressedZstd as u8
50                | ObjectFlags::CompressedLz4 as u8
51                | ObjectFlags::CompressedXz as u8))
52            != 0
53    }
54}
55
56impl DataPayloadObjectInfo {
57    pub fn is_compressed(self) -> bool {
58        self.is_compressed
59    }
60}
61
62fn parse_data_payload_object_header(header_slice: &[u8]) -> Result<DataPayloadObjectInfo> {
63    let object_header =
64        ObjectHeader::ref_from_bytes(header_slice).map_err(|_| JournalError::ZerocopyFailure)?;
65
66    if object_header.type_ != ObjectType::Data as u8 {
67        return Err(JournalError::InvalidObjectType);
68    }
69
70    Ok(DataPayloadObjectInfo {
71        size_needed: object_header.validated_size()?,
72        is_compressed: object_header.is_compressed(),
73    })
74}
75
76impl<M: MemoryMap> JournalFile<M> {
77    #[doc(hidden)]
78    pub fn data_payload_read_context(&self) -> DataPayloadReadContext {
79        let journal_header = self.journal_header_ref();
80        let is_compact = journal_header.has_incompatible_flag(HeaderIncompatibleFlags::Compact);
81        let payload_prefix_size = std::mem::size_of::<DataObjectHeader>() as u64
82            + if is_compact {
83                std::mem::size_of::<CompactDataFields>() as u64
84            } else {
85                0
86            };
87        DataPayloadReadContext {
88            is_compact,
89            header_size: journal_header.header_size,
90            arena_end: journal_header.header_size + journal_header.arena_size,
91            payload_prefix_size,
92        }
93    }
94
95    #[doc(hidden)]
96    pub fn visit_data_payload_at<F>(
97        &self,
98        offset: NonZeroU64,
99        decompressed: &mut Vec<u8>,
100        visitor: F,
101    ) -> Result<()>
102    where
103        F: FnOnce(&[u8]) -> Result<()>,
104    {
105        let context = self.data_payload_read_context();
106        self.visit_data_payload_at_with_context(context, offset, decompressed, visitor)
107    }
108
109    #[doc(hidden)]
110    pub fn visit_data_payload_at_with_context<F>(
111        &self,
112        context: DataPayloadReadContext,
113        offset: NonZeroU64,
114        decompressed: &mut Vec<u8>,
115        visitor: F,
116    ) -> Result<()>
117    where
118        F: FnOnce(&[u8]) -> Result<()>,
119    {
120        Self::validate_data_payload_offset(context, offset)?;
121        self.window_manager.with_mut(|wm| {
122            let info = Self::data_payload_info_from_window(wm, context, offset)?;
123            let data = Self::data_slice_from_window(wm, offset, info.size_needed)?;
124            if !info.is_compressed {
125                return visitor(&data[context.payload_prefix_size as usize..]);
126            }
127            let object = DataObject::from_data(data, context.is_compact)
128                .ok_or(JournalError::ZerocopyFailure)?;
129            decompressed.clear();
130            let len = object.decompress(decompressed)?;
131            visitor(&decompressed[..len])
132        })
133    }
134
135    #[doc(hidden)]
136    pub fn data_payload_object_info_at(
137        &self,
138        context: DataPayloadReadContext,
139        offset: NonZeroU64,
140    ) -> Result<DataPayloadObjectInfo> {
141        validate_offset_alignment(offset)?;
142        if offset.get() < context.header_size {
143            return Err(JournalError::ObjectExceedsFileBounds);
144        }
145
146        self.window_manager
147            .with_mut(|wm| Self::data_payload_info_from_window(wm, context, offset))
148    }
149
150    fn validate_data_payload_offset(
151        context: DataPayloadReadContext,
152        offset: NonZeroU64,
153    ) -> Result<()> {
154        validate_offset_alignment(offset)?;
155        if offset.get() < context.header_size {
156            return Err(JournalError::ObjectExceedsFileBounds);
157        }
158        Ok(())
159    }
160
161    fn data_payload_info_from_window(
162        wm: &mut WindowManager<M>,
163        context: DataPayloadReadContext,
164        offset: NonZeroU64,
165    ) -> Result<DataPayloadObjectInfo> {
166        let object_header_size = std::mem::size_of::<ObjectHeader>() as u64;
167        let header_slice = wm.get_slice(offset.get(), object_header_size)?;
168        let info = parse_data_payload_object_header(header_slice)?;
169        Self::validate_data_payload_info(context, offset, info)?;
170        Ok(info)
171    }
172
173    fn validate_data_payload_info(
174        context: DataPayloadReadContext,
175        offset: NonZeroU64,
176        info: DataPayloadObjectInfo,
177    ) -> Result<()> {
178        let end_offset = offset
179            .get()
180            .checked_add(info.size_needed)
181            .ok_or(JournalError::ObjectExceedsFileBounds)?;
182        if end_offset > context.arena_end {
183            return Err(JournalError::ObjectExceedsFileBounds);
184        }
185        if info.size_needed < context.payload_prefix_size {
186            return Err(JournalError::InvalidObjectSize(info.size_needed));
187        }
188        Ok(())
189    }
190
191    fn data_slice_from_window<'w>(
192        wm: &'w mut WindowManager<M>,
193        offset: NonZeroU64,
194        size_needed: u64,
195    ) -> Result<&'w [u8]> {
196        if wm.active_window_contains(offset.get(), size_needed) {
197            return Ok(wm.active_slice(offset.get(), size_needed));
198        }
199        wm.get_slice(offset.get(), size_needed)
200    }
201
202    #[doc(hidden)]
203    pub fn raw_data_payload_ref_with_info(
204        &self,
205        context: DataPayloadReadContext,
206        offset: NonZeroU64,
207        info: DataPayloadObjectInfo,
208    ) -> Result<ValueGuard<'_, &[u8]>> {
209        validate_offset_alignment(offset)?;
210        if offset.get() < context.header_size {
211            return Err(JournalError::ObjectExceedsFileBounds);
212        }
213        if info.is_compressed {
214            return Err(JournalError::InvalidObjectType);
215        }
216        if info.size_needed < context.payload_prefix_size {
217            return Err(JournalError::InvalidObjectSize(info.size_needed));
218        }
219
220        self.window_manager.with_guarded(offset, |wm| {
221            if wm.active_window_contains(offset.get(), info.size_needed) {
222                let data = wm.active_slice(offset.get(), info.size_needed);
223                return Ok(&data[context.payload_prefix_size as usize..]);
224            }
225            let data = wm.get_slice(offset.get(), info.size_needed)?;
226            Ok(&data[context.payload_prefix_size as usize..])
227        })
228    }
229
230    #[doc(hidden)]
231    /// Returns an unguarded pointer to an uncompressed DATA payload.
232    ///
233    /// The caller must only expose the pointer while it can prove the backing
234    /// mmap window will not be remapped or evicted. This is intended for
235    /// whole-file mmap row-scoped facade enumeration. Do not call this for
236    /// windowed mmap; use `raw_data_payload_ref_with_info()` or copy the
237    /// payload instead.
238    pub fn raw_data_payload_ptr_with_info_unguarded(
239        &self,
240        context: DataPayloadReadContext,
241        offset: NonZeroU64,
242        info: DataPayloadObjectInfo,
243    ) -> Result<(*const u8, usize)> {
244        validate_offset_alignment(offset)?;
245        if offset.get() < context.header_size {
246            return Err(JournalError::ObjectExceedsFileBounds);
247        }
248        if info.is_compressed {
249            return Err(JournalError::InvalidObjectType);
250        }
251        if info.size_needed < context.payload_prefix_size {
252            return Err(JournalError::InvalidObjectSize(info.size_needed));
253        }
254
255        self.window_manager.with_mut(|wm| {
256            let data =
257                if let Some(data) = wm.active_slice_if_contains(offset.get(), info.size_needed) {
258                    data
259                } else {
260                    wm.get_slice(offset.get(), info.size_needed)?
261                };
262            let payload = &data[context.payload_prefix_size as usize..];
263            Ok((payload.as_ptr(), payload.len()))
264        })
265    }
266
267    #[doc(hidden)]
268    /// Returns a pointer to an uncompressed DATA payload and pins the backing
269    /// mmap window until row pins are explicitly cleared.
270    pub fn raw_data_payload_ptr_with_info_row_pinned(
271        &self,
272        context: DataPayloadReadContext,
273        offset: NonZeroU64,
274        info: DataPayloadObjectInfo,
275    ) -> Result<(*const u8, usize)> {
276        validate_offset_alignment(offset)?;
277        if offset.get() < context.header_size {
278            return Err(JournalError::ObjectExceedsFileBounds);
279        }
280        if info.is_compressed {
281            return Err(JournalError::InvalidObjectType);
282        }
283        if info.size_needed < context.payload_prefix_size {
284            return Err(JournalError::InvalidObjectSize(info.size_needed));
285        }
286
287        self.window_manager.with_mut(|wm| {
288            let data = wm.get_row_pinned_slice(offset.get(), info.size_needed)?;
289            let payload = &data[context.payload_prefix_size as usize..];
290            Ok((payload.as_ptr(), payload.len()))
291        })
292    }
293
294    #[doc(hidden)]
295    /// Returns a row-pinned pointer when the DATA object is uncompressed.
296    /// Compressed DATA returns `Ok(None)` so the caller can take the
297    /// decompression path.
298    pub fn raw_data_payload_ptr_row_pinned_if_uncompressed(
299        &self,
300        context: DataPayloadReadContext,
301        offset: NonZeroU64,
302    ) -> Result<Option<(*const u8, usize)>> {
303        Self::validate_data_payload_offset(context, offset)?;
304
305        self.window_manager.with_mut(|wm| {
306            let info = Self::data_payload_info_from_window(wm, context, offset)?;
307            if info.is_compressed {
308                return Ok(None);
309            }
310            let data = wm.get_row_pinned_slice(offset.get(), info.size_needed)?;
311            let payload = &data[context.payload_prefix_size as usize..];
312            Ok(Some((payload.as_ptr(), payload.len())))
313        })
314    }
315
316    #[doc(hidden)]
317    pub fn clear_row_payload_pins(&self) -> Result<()> {
318        self.window_manager.with_mut(|wm| {
319            wm.clear_row_pins();
320            Ok(())
321        })
322    }
323
324    #[doc(hidden)]
325    pub fn visit_data_payloads_row_pinned_with_context<F>(
326        &self,
327        context: DataPayloadReadContext,
328        offsets: &[NonZeroU64],
329        decompressed: &mut Vec<u8>,
330        mut visitor: F,
331    ) -> Result<()>
332    where
333        F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
334    {
335        self.window_manager.with_mut(|wm| {
336            for offset in offsets.iter().copied() {
337                Self::visit_data_payload_row_pinned_from_window(
338                    wm,
339                    context,
340                    offset,
341                    decompressed,
342                    &mut visitor,
343                )?;
344            }
345            Ok(())
346        })
347    }
348
349    fn visit_data_payload_row_pinned_from_window<F>(
350        wm: &mut WindowManager<M>,
351        context: DataPayloadReadContext,
352        offset: NonZeroU64,
353        decompressed: &mut Vec<u8>,
354        visitor: &mut F,
355    ) -> Result<()>
356    where
357        F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
358    {
359        Self::validate_data_payload_offset(context, offset)?;
360        let info = Self::data_payload_info_from_window(wm, context, offset)?;
361        if info.is_compressed {
362            return Self::visit_compressed_row_payload(
363                wm,
364                context,
365                offset,
366                info,
367                decompressed,
368                visitor,
369            );
370        }
371        Self::visit_borrowed_row_payload(wm, context, offset, info, visitor)
372    }
373
374    fn visit_borrowed_row_payload<F>(
375        wm: &mut WindowManager<M>,
376        context: DataPayloadReadContext,
377        offset: NonZeroU64,
378        info: DataPayloadObjectInfo,
379        visitor: &mut F,
380    ) -> Result<()>
381    where
382        F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
383    {
384        let data = wm.get_row_pinned_slice(offset.get(), info.size_needed)?;
385        let payload = &data[context.payload_prefix_size as usize..];
386        visitor(RowPinnedPayload::Borrowed {
387            ptr: payload.as_ptr(),
388            len: payload.len(),
389        })
390    }
391
392    fn visit_compressed_row_payload<F>(
393        wm: &mut WindowManager<M>,
394        context: DataPayloadReadContext,
395        offset: NonZeroU64,
396        info: DataPayloadObjectInfo,
397        decompressed: &mut Vec<u8>,
398        visitor: &mut F,
399    ) -> Result<()>
400    where
401        F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
402    {
403        let data = Self::data_slice_from_window(wm, offset, info.size_needed)?;
404        let object =
405            DataObject::from_data(data, context.is_compact).ok_or(JournalError::ZerocopyFailure)?;
406        decompressed.clear();
407        let len = object.decompress(decompressed)?;
408        visitor(RowPinnedPayload::Decompressed(&decompressed[..len]))
409    }
410
411    pub fn find_data_offset(&self, hash: u64, payload: &[u8]) -> Result<Option<NonZeroU64>> {
412        self.find_data_offset_parts(hash, PayloadParts::raw(payload))
413    }
414
415    pub fn find_data_offset_parts(
416        &self,
417        hash: u64,
418        payload: PayloadParts<'_>,
419    ) -> Result<Option<NonZeroU64>> {
420        let hash_table = self
421            .data_hash_table_ref()
422            .ok_or(JournalError::MissingHashTable)?;
423        let context = self.data_payload_read_context();
424        let mut decompression_buffer = Vec::new();
425        let mut object_offset = hash_table.hash_item_ref(hash).head_hash_offset;
426
427        while let Some(offset) = object_offset {
428            let result = self.data_lookup_result_at(
429                context,
430                offset,
431                hash,
432                payload,
433                &mut decompression_buffer,
434            )?;
435            if result.matches {
436                return Ok(Some(offset));
437            }
438            object_offset = result.next_hash_offset;
439        }
440
441        Ok(None)
442    }
443
444    fn data_lookup_result_at(
445        &self,
446        context: DataPayloadReadContext,
447        offset: NonZeroU64,
448        hash: u64,
449        payload: PayloadParts<'_>,
450        decompression_buffer: &mut Vec<u8>,
451    ) -> Result<DataLookupResult> {
452        Self::validate_data_payload_offset(context, offset)?;
453        self.window_manager.with_mut(|wm| {
454            let lookup = Self::data_lookup_header_from_window(wm, context, offset)?;
455            if lookup.stored_hash != hash {
456                return Ok(DataLookupResult {
457                    next_hash_offset: lookup.next_hash_offset,
458                    matches: false,
459                });
460            }
461
462            let data = Self::data_slice_from_window(wm, offset, lookup.size_needed)?;
463            let matches = Self::data_lookup_payload_matches(
464                context,
465                lookup,
466                data,
467                payload,
468                decompression_buffer,
469            )?;
470            Ok(DataLookupResult {
471                next_hash_offset: lookup.next_hash_offset,
472                matches,
473            })
474        })
475    }
476
477    fn data_lookup_header_from_window(
478        wm: &mut WindowManager<M>,
479        context: DataPayloadReadContext,
480        offset: NonZeroU64,
481    ) -> Result<DataLookupHeader> {
482        let header_slice =
483            wm.get_slice(offset.get(), std::mem::size_of::<DataObjectHeader>() as u64)?;
484        Self::parse_data_lookup_header(context, offset, header_slice)
485    }
486
487    fn parse_data_lookup_header(
488        context: DataPayloadReadContext,
489        offset: NonZeroU64,
490        header_slice: &[u8],
491    ) -> Result<DataLookupHeader> {
492        if header_slice[0] != ObjectType::Data as u8 {
493            return Err(JournalError::InvalidObjectType);
494        }
495        let size_needed = u64::from_le_bytes(header_slice[8..16].try_into().unwrap());
496        if size_needed < std::mem::size_of::<DataObjectHeader>() as u64 {
497            return Err(JournalError::InvalidObjectSize(size_needed));
498        }
499        let info = DataPayloadObjectInfo {
500            size_needed,
501            is_compressed: false,
502        };
503        Self::validate_data_payload_info(context, offset, info)?;
504        Ok(DataLookupHeader {
505            flags: header_slice[1],
506            size_needed,
507            stored_hash: u64::from_le_bytes(header_slice[16..24].try_into().unwrap()),
508            next_hash_offset: NonZeroU64::new(u64::from_le_bytes(
509                header_slice[24..32].try_into().unwrap(),
510            )),
511        })
512    }
513
514    fn data_lookup_payload_matches(
515        context: DataPayloadReadContext,
516        lookup: DataLookupHeader,
517        data: &[u8],
518        payload: PayloadParts<'_>,
519        decompression_buffer: &mut Vec<u8>,
520    ) -> Result<bool> {
521        if lookup.is_compressed() {
522            let object = DataObject::from_data(data, context.is_compact)
523                .ok_or(JournalError::ZerocopyFailure)?;
524            decompression_buffer.clear();
525            let len = object.decompress(decompression_buffer)?;
526            return Ok(payload.equals_slice(&decompression_buffer[..len]));
527        }
528        let payload_start = context.payload_prefix_size as usize;
529        Ok(payload.equals_slice(&data[payload_start..]))
530    }
531}