1use super::file::{JournalFile, PayloadParts, validate_offset_alignment};
2use super::mmap::{MemoryMap, WindowManager};
3use super::object::*;
4use crate::error::{JournalError, Result};
5use crate::file::value_guard::ValueGuard;
6use std::num::NonZeroU64;
7use zerocopy::FromBytes;
8
/// Journal-header values needed to locate and bound Data object payloads.
///
/// Produced by `JournalFile::data_payload_read_context` and passed by value to the
/// payload readers, so the journal header does not have to be consulted per object.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct DataPayloadReadContext {
    /// Whether the journal has the `Compact` incompatible flag set.
    is_compact: bool,
    /// Size of the journal header; object offsets below this are rejected.
    header_size: u64,
    /// Exclusive upper bound for object extents, derived from header size plus arena size.
    arena_end: u64,
    /// Bytes that precede the payload inside a Data object: `DataObjectHeader`, plus
    /// `CompactDataFields` in compact journals.
    payload_prefix_size: u64,
}
17
/// Size and compression state of a Data object, as read from its object header.
///
/// Obtained from `JournalFile::data_payload_object_info_at` and passed back to the
/// `raw_data_payload_*_with_info` readers.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
pub struct DataPayloadObjectInfo {
    /// Total object size in bytes, including headers, as reported by the object header.
    size_needed: u64,
    /// Whether any compression flag is set on the object.
    is_compressed: bool,
}
24
/// A Data object payload handed to a row-pinned visitor.
#[doc(hidden)]
pub enum RowPinnedPayload<'a> {
    /// Uncompressed payload, pointing directly into a row-pinned window slice.
    ///
    /// NOTE(review): the pointer is presumably only valid while the row pin is held
    /// (i.e. until the pins are cleared) — confirm against `WindowManager`.
    Borrowed { ptr: *const u8, len: usize },
    /// Payload that was decompressed into a caller-provided scratch buffer.
    Decompressed(&'a [u8]),
}
30
/// Outcome of inspecting a single Data object while walking a hash chain.
#[doc(hidden)]
#[derive(Clone, Copy, Debug)]
struct DataLookupResult {
    /// Offset of the next object in the chain; `None` ends the walk.
    next_hash_offset: Option<NonZeroU64>,
    /// Whether this object's stored hash and payload both matched the lookup key.
    matches: bool,
}
37
38#[derive(Debug, Clone, Copy)]
39struct DataLookupHeader {
40 flags: u8,
41 size_needed: u64,
42 stored_hash: u64,
43 next_hash_offset: Option<NonZeroU64>,
44}
45
46impl DataLookupHeader {
47 fn is_compressed(self) -> bool {
48 (self.flags
49 & (ObjectFlags::CompressedZstd as u8
50 | ObjectFlags::CompressedLz4 as u8
51 | ObjectFlags::CompressedXz as u8))
52 != 0
53 }
54}
55
56impl DataPayloadObjectInfo {
57 pub fn is_compressed(self) -> bool {
58 self.is_compressed
59 }
60}
61
62fn parse_data_payload_object_header(header_slice: &[u8]) -> Result<DataPayloadObjectInfo> {
63 let object_header =
64 ObjectHeader::ref_from_bytes(header_slice).map_err(|_| JournalError::ZerocopyFailure)?;
65
66 if object_header.type_ != ObjectType::Data as u8 {
67 return Err(JournalError::InvalidObjectType);
68 }
69
70 Ok(DataPayloadObjectInfo {
71 size_needed: object_header.validated_size()?,
72 is_compressed: object_header.is_compressed(),
73 })
74}
75
76impl<M: MemoryMap> JournalFile<M> {
77 #[doc(hidden)]
78 pub fn data_payload_read_context(&self) -> DataPayloadReadContext {
79 let journal_header = self.journal_header_ref();
80 let is_compact = journal_header.has_incompatible_flag(HeaderIncompatibleFlags::Compact);
81 let payload_prefix_size = std::mem::size_of::<DataObjectHeader>() as u64
82 + if is_compact {
83 std::mem::size_of::<CompactDataFields>() as u64
84 } else {
85 0
86 };
87 DataPayloadReadContext {
88 is_compact,
89 header_size: journal_header.header_size,
90 arena_end: journal_header.header_size + journal_header.arena_size,
91 payload_prefix_size,
92 }
93 }
94
95 #[doc(hidden)]
96 pub fn visit_data_payload_at<F>(
97 &self,
98 offset: NonZeroU64,
99 decompressed: &mut Vec<u8>,
100 visitor: F,
101 ) -> Result<()>
102 where
103 F: FnOnce(&[u8]) -> Result<()>,
104 {
105 let context = self.data_payload_read_context();
106 self.visit_data_payload_at_with_context(context, offset, decompressed, visitor)
107 }
108
109 #[doc(hidden)]
110 pub fn visit_data_payload_at_with_context<F>(
111 &self,
112 context: DataPayloadReadContext,
113 offset: NonZeroU64,
114 decompressed: &mut Vec<u8>,
115 visitor: F,
116 ) -> Result<()>
117 where
118 F: FnOnce(&[u8]) -> Result<()>,
119 {
120 Self::validate_data_payload_offset(context, offset)?;
121 self.window_manager.with_mut(|wm| {
122 let info = Self::data_payload_info_from_window(wm, context, offset)?;
123 let data = Self::data_slice_from_window(wm, offset, info.size_needed)?;
124 if !info.is_compressed {
125 return visitor(&data[context.payload_prefix_size as usize..]);
126 }
127 let object = DataObject::from_data(data, context.is_compact)
128 .ok_or(JournalError::ZerocopyFailure)?;
129 decompressed.clear();
130 let len = object.decompress(decompressed)?;
131 visitor(&decompressed[..len])
132 })
133 }
134
135 #[doc(hidden)]
136 pub fn data_payload_object_info_at(
137 &self,
138 context: DataPayloadReadContext,
139 offset: NonZeroU64,
140 ) -> Result<DataPayloadObjectInfo> {
141 validate_offset_alignment(offset)?;
142 if offset.get() < context.header_size {
143 return Err(JournalError::ObjectExceedsFileBounds);
144 }
145
146 self.window_manager
147 .with_mut(|wm| Self::data_payload_info_from_window(wm, context, offset))
148 }
149
150 fn validate_data_payload_offset(
151 context: DataPayloadReadContext,
152 offset: NonZeroU64,
153 ) -> Result<()> {
154 validate_offset_alignment(offset)?;
155 if offset.get() < context.header_size {
156 return Err(JournalError::ObjectExceedsFileBounds);
157 }
158 Ok(())
159 }
160
161 fn data_payload_info_from_window(
162 wm: &mut WindowManager<M>,
163 context: DataPayloadReadContext,
164 offset: NonZeroU64,
165 ) -> Result<DataPayloadObjectInfo> {
166 let object_header_size = std::mem::size_of::<ObjectHeader>() as u64;
167 let header_slice = wm.get_slice(offset.get(), object_header_size)?;
168 let info = parse_data_payload_object_header(header_slice)?;
169 Self::validate_data_payload_info(context, offset, info)?;
170 Ok(info)
171 }
172
173 fn validate_data_payload_info(
174 context: DataPayloadReadContext,
175 offset: NonZeroU64,
176 info: DataPayloadObjectInfo,
177 ) -> Result<()> {
178 let end_offset = offset
179 .get()
180 .checked_add(info.size_needed)
181 .ok_or(JournalError::ObjectExceedsFileBounds)?;
182 if end_offset > context.arena_end {
183 return Err(JournalError::ObjectExceedsFileBounds);
184 }
185 if info.size_needed < context.payload_prefix_size {
186 return Err(JournalError::InvalidObjectSize(info.size_needed));
187 }
188 Ok(())
189 }
190
191 fn data_slice_from_window<'w>(
192 wm: &'w mut WindowManager<M>,
193 offset: NonZeroU64,
194 size_needed: u64,
195 ) -> Result<&'w [u8]> {
196 if wm.active_window_contains(offset.get(), size_needed) {
197 return Ok(wm.active_slice(offset.get(), size_needed));
198 }
199 wm.get_slice(offset.get(), size_needed)
200 }
201
202 #[doc(hidden)]
203 pub fn raw_data_payload_ref_with_info(
204 &self,
205 context: DataPayloadReadContext,
206 offset: NonZeroU64,
207 info: DataPayloadObjectInfo,
208 ) -> Result<ValueGuard<'_, &[u8]>> {
209 validate_offset_alignment(offset)?;
210 if offset.get() < context.header_size {
211 return Err(JournalError::ObjectExceedsFileBounds);
212 }
213 if info.is_compressed {
214 return Err(JournalError::InvalidObjectType);
215 }
216 if info.size_needed < context.payload_prefix_size {
217 return Err(JournalError::InvalidObjectSize(info.size_needed));
218 }
219
220 self.window_manager.with_guarded(offset, |wm| {
221 if wm.active_window_contains(offset.get(), info.size_needed) {
222 let data = wm.active_slice(offset.get(), info.size_needed);
223 return Ok(&data[context.payload_prefix_size as usize..]);
224 }
225 let data = wm.get_slice(offset.get(), info.size_needed)?;
226 Ok(&data[context.payload_prefix_size as usize..])
227 })
228 }
229
230 #[doc(hidden)]
231 pub fn raw_data_payload_ptr_with_info_unguarded(
239 &self,
240 context: DataPayloadReadContext,
241 offset: NonZeroU64,
242 info: DataPayloadObjectInfo,
243 ) -> Result<(*const u8, usize)> {
244 validate_offset_alignment(offset)?;
245 if offset.get() < context.header_size {
246 return Err(JournalError::ObjectExceedsFileBounds);
247 }
248 if info.is_compressed {
249 return Err(JournalError::InvalidObjectType);
250 }
251 if info.size_needed < context.payload_prefix_size {
252 return Err(JournalError::InvalidObjectSize(info.size_needed));
253 }
254
255 self.window_manager.with_mut(|wm| {
256 let data =
257 if let Some(data) = wm.active_slice_if_contains(offset.get(), info.size_needed) {
258 data
259 } else {
260 wm.get_slice(offset.get(), info.size_needed)?
261 };
262 let payload = &data[context.payload_prefix_size as usize..];
263 Ok((payload.as_ptr(), payload.len()))
264 })
265 }
266
267 #[doc(hidden)]
268 pub fn raw_data_payload_ptr_with_info_row_pinned(
271 &self,
272 context: DataPayloadReadContext,
273 offset: NonZeroU64,
274 info: DataPayloadObjectInfo,
275 ) -> Result<(*const u8, usize)> {
276 validate_offset_alignment(offset)?;
277 if offset.get() < context.header_size {
278 return Err(JournalError::ObjectExceedsFileBounds);
279 }
280 if info.is_compressed {
281 return Err(JournalError::InvalidObjectType);
282 }
283 if info.size_needed < context.payload_prefix_size {
284 return Err(JournalError::InvalidObjectSize(info.size_needed));
285 }
286
287 self.window_manager.with_mut(|wm| {
288 let data = wm.get_row_pinned_slice(offset.get(), info.size_needed)?;
289 let payload = &data[context.payload_prefix_size as usize..];
290 Ok((payload.as_ptr(), payload.len()))
291 })
292 }
293
294 #[doc(hidden)]
295 pub fn raw_data_payload_ptr_row_pinned_if_uncompressed(
299 &self,
300 context: DataPayloadReadContext,
301 offset: NonZeroU64,
302 ) -> Result<Option<(*const u8, usize)>> {
303 Self::validate_data_payload_offset(context, offset)?;
304
305 self.window_manager.with_mut(|wm| {
306 let info = Self::data_payload_info_from_window(wm, context, offset)?;
307 if info.is_compressed {
308 return Ok(None);
309 }
310 let data = wm.get_row_pinned_slice(offset.get(), info.size_needed)?;
311 let payload = &data[context.payload_prefix_size as usize..];
312 Ok(Some((payload.as_ptr(), payload.len())))
313 })
314 }
315
316 #[doc(hidden)]
317 pub fn clear_row_payload_pins(&self) -> Result<()> {
318 self.window_manager.with_mut(|wm| {
319 wm.clear_row_pins();
320 Ok(())
321 })
322 }
323
324 #[doc(hidden)]
325 pub fn visit_data_payloads_row_pinned_with_context<F>(
326 &self,
327 context: DataPayloadReadContext,
328 offsets: &[NonZeroU64],
329 decompressed: &mut Vec<u8>,
330 mut visitor: F,
331 ) -> Result<()>
332 where
333 F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
334 {
335 self.window_manager.with_mut(|wm| {
336 for offset in offsets.iter().copied() {
337 Self::visit_data_payload_row_pinned_from_window(
338 wm,
339 context,
340 offset,
341 decompressed,
342 &mut visitor,
343 )?;
344 }
345 Ok(())
346 })
347 }
348
349 fn visit_data_payload_row_pinned_from_window<F>(
350 wm: &mut WindowManager<M>,
351 context: DataPayloadReadContext,
352 offset: NonZeroU64,
353 decompressed: &mut Vec<u8>,
354 visitor: &mut F,
355 ) -> Result<()>
356 where
357 F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
358 {
359 Self::validate_data_payload_offset(context, offset)?;
360 let info = Self::data_payload_info_from_window(wm, context, offset)?;
361 if info.is_compressed {
362 return Self::visit_compressed_row_payload(
363 wm,
364 context,
365 offset,
366 info,
367 decompressed,
368 visitor,
369 );
370 }
371 Self::visit_borrowed_row_payload(wm, context, offset, info, visitor)
372 }
373
374 fn visit_borrowed_row_payload<F>(
375 wm: &mut WindowManager<M>,
376 context: DataPayloadReadContext,
377 offset: NonZeroU64,
378 info: DataPayloadObjectInfo,
379 visitor: &mut F,
380 ) -> Result<()>
381 where
382 F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
383 {
384 let data = wm.get_row_pinned_slice(offset.get(), info.size_needed)?;
385 let payload = &data[context.payload_prefix_size as usize..];
386 visitor(RowPinnedPayload::Borrowed {
387 ptr: payload.as_ptr(),
388 len: payload.len(),
389 })
390 }
391
392 fn visit_compressed_row_payload<F>(
393 wm: &mut WindowManager<M>,
394 context: DataPayloadReadContext,
395 offset: NonZeroU64,
396 info: DataPayloadObjectInfo,
397 decompressed: &mut Vec<u8>,
398 visitor: &mut F,
399 ) -> Result<()>
400 where
401 F: FnMut(RowPinnedPayload<'_>) -> Result<()>,
402 {
403 let data = Self::data_slice_from_window(wm, offset, info.size_needed)?;
404 let object =
405 DataObject::from_data(data, context.is_compact).ok_or(JournalError::ZerocopyFailure)?;
406 decompressed.clear();
407 let len = object.decompress(decompressed)?;
408 visitor(RowPinnedPayload::Decompressed(&decompressed[..len]))
409 }
410
411 pub fn find_data_offset(&self, hash: u64, payload: &[u8]) -> Result<Option<NonZeroU64>> {
412 self.find_data_offset_parts(hash, PayloadParts::raw(payload))
413 }
414
415 pub fn find_data_offset_parts(
416 &self,
417 hash: u64,
418 payload: PayloadParts<'_>,
419 ) -> Result<Option<NonZeroU64>> {
420 let hash_table = self
421 .data_hash_table_ref()
422 .ok_or(JournalError::MissingHashTable)?;
423 let context = self.data_payload_read_context();
424 let mut decompression_buffer = Vec::new();
425 let mut object_offset = hash_table.hash_item_ref(hash).head_hash_offset;
426
427 while let Some(offset) = object_offset {
428 let result = self.data_lookup_result_at(
429 context,
430 offset,
431 hash,
432 payload,
433 &mut decompression_buffer,
434 )?;
435 if result.matches {
436 return Ok(Some(offset));
437 }
438 object_offset = result.next_hash_offset;
439 }
440
441 Ok(None)
442 }
443
444 fn data_lookup_result_at(
445 &self,
446 context: DataPayloadReadContext,
447 offset: NonZeroU64,
448 hash: u64,
449 payload: PayloadParts<'_>,
450 decompression_buffer: &mut Vec<u8>,
451 ) -> Result<DataLookupResult> {
452 Self::validate_data_payload_offset(context, offset)?;
453 self.window_manager.with_mut(|wm| {
454 let lookup = Self::data_lookup_header_from_window(wm, context, offset)?;
455 if lookup.stored_hash != hash {
456 return Ok(DataLookupResult {
457 next_hash_offset: lookup.next_hash_offset,
458 matches: false,
459 });
460 }
461
462 let data = Self::data_slice_from_window(wm, offset, lookup.size_needed)?;
463 let matches = Self::data_lookup_payload_matches(
464 context,
465 lookup,
466 data,
467 payload,
468 decompression_buffer,
469 )?;
470 Ok(DataLookupResult {
471 next_hash_offset: lookup.next_hash_offset,
472 matches,
473 })
474 })
475 }
476
477 fn data_lookup_header_from_window(
478 wm: &mut WindowManager<M>,
479 context: DataPayloadReadContext,
480 offset: NonZeroU64,
481 ) -> Result<DataLookupHeader> {
482 let header_slice =
483 wm.get_slice(offset.get(), std::mem::size_of::<DataObjectHeader>() as u64)?;
484 Self::parse_data_lookup_header(context, offset, header_slice)
485 }
486
487 fn parse_data_lookup_header(
488 context: DataPayloadReadContext,
489 offset: NonZeroU64,
490 header_slice: &[u8],
491 ) -> Result<DataLookupHeader> {
492 if header_slice[0] != ObjectType::Data as u8 {
493 return Err(JournalError::InvalidObjectType);
494 }
495 let size_needed = u64::from_le_bytes(header_slice[8..16].try_into().unwrap());
496 if size_needed < std::mem::size_of::<DataObjectHeader>() as u64 {
497 return Err(JournalError::InvalidObjectSize(size_needed));
498 }
499 let info = DataPayloadObjectInfo {
500 size_needed,
501 is_compressed: false,
502 };
503 Self::validate_data_payload_info(context, offset, info)?;
504 Ok(DataLookupHeader {
505 flags: header_slice[1],
506 size_needed,
507 stored_hash: u64::from_le_bytes(header_slice[16..24].try_into().unwrap()),
508 next_hash_offset: NonZeroU64::new(u64::from_le_bytes(
509 header_slice[24..32].try_into().unwrap(),
510 )),
511 })
512 }
513
514 fn data_lookup_payload_matches(
515 context: DataPayloadReadContext,
516 lookup: DataLookupHeader,
517 data: &[u8],
518 payload: PayloadParts<'_>,
519 decompression_buffer: &mut Vec<u8>,
520 ) -> Result<bool> {
521 if lookup.is_compressed() {
522 let object = DataObject::from_data(data, context.is_compact)
523 .ok_or(JournalError::ZerocopyFailure)?;
524 decompression_buffer.clear();
525 let len = object.decompress(decompression_buffer)?;
526 return Ok(payload.equals_slice(&decompression_buffer[..len]));
527 }
528 let payload_start = context.payload_prefix_size as usize;
529 Ok(payload.equals_slice(&data[payload_start..]))
530 }
531}