1use super::file::{
2 Compression, JOURNAL_COMPACT_SIZE_MAX, JournalFile, JournalFileOptions, OBJECT_ALIGNMENT,
3 map_hash_table, round_up_to_file_size_increment, validate_offset_alignment,
4};
5use super::mmap::{MemoryMap, MemoryMapMut, WindowManager};
6use super::object::*;
7use crate::error::{JournalError, Result};
8use crate::file::guarded_cell::GuardedCell;
9use crate::file::value_guard::ValueGuard;
10use std::fs::{File, OpenOptions};
11use std::num::NonZeroU64;
12#[cfg(unix)]
13use std::os::unix::fs::OpenOptionsExt;
14use zerocopy::FromBytes;
15
/// Byte layout computed for a journal file that is about to be created.
///
/// Produced by `create_layout` and consumed by `create_header` and `create`.
#[derive(Clone, Copy, Debug)]
struct CreateLayout {
    /// Payload size of the data hash table in bytes (bucket count times `HashItem` size).
    data_hash_table_size: usize,
    /// Payload size of the field hash table in bytes (bucket count times `HashItem` size).
    field_hash_table_size: usize,
    /// File offset at which the data hash table payload starts.
    data_hash_table_offset: u64,
    /// File offset at which the field hash table payload starts.
    field_hash_table_offset: u64,
    /// File offset of the `ObjectHeader` that precedes the data hash table payload.
    data_hash_table_object_offset: u64,
    /// Total size the new file is extended to.
    file_size: u64,
}
25
26#[derive(Debug, Clone, Copy)]
27struct MutableObjectContext {
28 object_type: ObjectType,
29 is_compact: bool,
30 arena_end: u64,
31}
32
33impl JournalFile<super::mmap::MmapMut> {
34 pub fn open_for_append(file: &crate::repository::File, window_size: u64) -> Result<Self> {
35 debug_assert_eq!(window_size % OBJECT_ALIGNMENT, 0);
36
37 let fd = OpenOptions::new()
38 .read(true)
39 .write(true)
40 .open(file.path())?;
41
42 let header_size = std::mem::size_of::<JournalHeader>() as u64;
43 let header_map = super::mmap::MmapMut::create(&fd, 0, header_size)?;
44 let header = JournalHeader::ref_from_prefix(&header_map).unwrap().0;
45 if header.signature != *b"LPKSHHRH" {
46 return Err(JournalError::InvalidMagicNumber);
47 }
48 if header.header_size < header_size {
49 return Err(JournalError::UnsupportedJournalFile);
50 }
51 if !header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash) {
52 return Err(JournalError::UnsupportedJournalFile);
53 }
54
55 let data_hash_table_map = map_hash_table(
56 &fd,
57 header.header_size,
58 header.data_hash_table_offset,
59 header.data_hash_table_size,
60 )?;
61 let field_hash_table_map = map_hash_table(
62 &fd,
63 header.header_size,
64 header.field_hash_table_offset,
65 header.field_hash_table_size,
66 )?;
67
68 let window_manager =
69 GuardedCell::new(WindowManager::new_writer_owned(fd, window_size, 32)?);
70
71 Ok(JournalFile {
72 file: file.clone(),
73 header_map,
74 sanitized_header: None,
75 data_hash_table_map,
76 field_hash_table_map,
77 window_manager,
78 seal_options: None,
79 })
80 }
81}
82
83impl<M: MemoryMapMut> JournalFile<M> {
84 pub fn sync(&mut self) -> Result<()> {
90 self.header_map.flush()?;
92
93 let (logical_size, header_size) = {
95 let header = self.journal_header_ref();
96 (header.header_size + header.arena_size, header.header_size)
97 };
98 let header_bytes = self.header_map[..header_size as usize].to_vec();
99 let window_manager = self.window_manager.get_mut();
100 window_manager.sync(logical_size, &header_bytes)?;
101
102 Ok(())
103 }
104
105 pub fn post_change(&mut self) -> Result<()> {
107 let logical_size = {
108 let header = self.journal_header_ref();
109 header.header_size + header.arena_size
110 };
111 self.window_manager.get_mut().post_change(logical_size)
112 }
113
114 pub fn create_successor(
116 &self,
117 file: &crate::repository::File,
118 max_file_size: Option<u64>,
119 ) -> Result<Self> {
120 self.create_successor_with_file_mode(file, max_file_size, self.current_file_mode())
121 }
122
123 pub fn create_successor_with_file_mode(
124 &self,
125 file: &crate::repository::File,
126 max_file_size: Option<u64>,
127 file_mode: u32,
128 ) -> Result<Self> {
129 let header = self.journal_header_ref();
130 let bucket_utilization = self.bucket_utilization();
131
132 let options = JournalFileOptions::new(
133 uuid::Uuid::from_bytes(header.machine_id),
134 uuid::Uuid::from_bytes(header.tail_entry_boot_id),
135 uuid::Uuid::from_bytes(header.seqnum_id),
136 )
137 .with_window_size(8 * 1024 * 1024)
138 .with_keyed_hash(header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash))
139 .with_compact(header.has_incompatible_flag(HeaderIncompatibleFlags::Compact))
140 .with_file_mode(file_mode)
141 .with_optimized_buckets(bucket_utilization, max_file_size);
142
143 let options = if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedZstd) {
144 options.with_compression(Compression::Zstd)
145 } else if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedXz) {
146 options.with_compression(Compression::Xz)
147 } else if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedLz4) {
148 options.with_compression(Compression::Lz4)
149 } else {
150 options
151 };
152
153 Self::create(file, options)
154 }
155
156 pub fn create(file: &crate::repository::File, options: JournalFileOptions) -> Result<Self> {
157 let fd = Self::open_new_file(file, options.file_mode)?;
158 let layout = Self::create_layout(&options)?;
159 if options.compact && layout.file_size > JOURNAL_COMPACT_SIZE_MAX {
160 return Err(JournalError::ObjectExceedsFileBounds);
161 }
162 fd.set_len(layout.file_size)?;
163 let mut header = Self::create_header(&options, layout);
164 let data_hash_table_map = map_hash_table(
165 &fd,
166 header.header_size,
167 header.data_hash_table_offset,
168 header.data_hash_table_size,
169 )?;
170 let field_hash_table_map = map_hash_table(
171 &fd,
172 header.header_size,
173 header.field_hash_table_offset,
174 header.field_hash_table_size,
175 )?;
176 let header_map = Self::create_header_map(&fd, &mut header)?;
177 let window_manager = GuardedCell::new(WindowManager::new_writer_owned_with_strategy(
178 fd,
179 options.window_size,
180 32,
181 options.experimental_mmap_strategy,
182 )?);
183
184 let mut jf = JournalFile {
185 file: file.clone(),
186 header_map,
187 sanitized_header: None,
188 data_hash_table_map,
189 field_hash_table_map,
190 window_manager,
191 seal_options: options.seal.clone(),
192 };
193
194 jf.write_initial_hash_table_headers(header)?;
195 jf.sync()?;
196 Ok(jf)
197 }
198
199 fn current_file_mode(&self) -> u32 {
200 #[cfg(unix)]
201 {
202 use std::os::unix::fs::PermissionsExt;
203 if let Ok(metadata) = std::fs::metadata(self.file.path()) {
204 return metadata.permissions().mode() & 0o777;
205 }
206 }
207 super::file::DEFAULT_JOURNAL_FILE_MODE
208 }
209
210 fn open_new_file(file: &crate::repository::File, mode: u32) -> Result<File> {
211 let mut open_options = OpenOptions::new();
212 open_options
213 .create(true)
214 .truncate(true)
215 .read(true)
216 .write(true);
217 #[cfg(unix)]
218 open_options.mode(mode);
219 Ok(open_options.open(file.path())?)
220 }
221
222 fn create_layout(options: &JournalFileOptions) -> Result<CreateLayout> {
223 let data_hash_table_size =
224 options.data_hash_table_buckets * std::mem::size_of::<HashItem>();
225 let field_hash_table_size =
226 options.field_hash_table_buckets * std::mem::size_of::<HashItem>();
227 let field_hash_table_offset = std::mem::size_of::<JournalHeader>() as u64
228 + std::mem::size_of::<ObjectHeader>() as u64;
229 let data_hash_table_offset = field_hash_table_offset
230 + field_hash_table_size as u64
231 + std::mem::size_of::<ObjectHeader>() as u64;
232 let data_hash_table_object_offset =
233 data_hash_table_offset - std::mem::size_of::<ObjectHeader>() as u64;
234 let append_offset = data_hash_table_offset + data_hash_table_size as u64;
235 let file_size = round_up_to_file_size_increment(append_offset)?;
236 Ok(CreateLayout {
237 data_hash_table_size,
238 field_hash_table_size,
239 data_hash_table_offset,
240 field_hash_table_offset,
241 data_hash_table_object_offset,
242 file_size,
243 })
244 }
245
246 fn create_header(options: &JournalFileOptions, layout: CreateLayout) -> JournalHeader {
247 let mut header = JournalHeader::default();
248 header.signature = *b"LPKSHHRH";
249 header.compatible_flags = HeaderCompatibleFlags::TailEntryBootId as u32;
250 if options.enable_keyed_hash {
251 header.incompatible_flags |= HeaderIncompatibleFlags::KeyedHash as u32;
252 }
253 header.incompatible_flags |= options.compression.as_incompatible_flag();
254 if options.compact {
255 header.incompatible_flags |= HeaderIncompatibleFlags::Compact as u32;
256 }
257 if options.seal.is_some() {
258 header.compatible_flags |= HeaderCompatibleFlags::Sealed as u32;
259 header.compatible_flags |= HeaderCompatibleFlags::SealedContinuous as u32;
260 }
261 header.data_hash_table_offset = NonZeroU64::new(layout.data_hash_table_offset);
262 header.data_hash_table_size = NonZeroU64::new(layout.data_hash_table_size as u64);
263 header.field_hash_table_offset = NonZeroU64::new(layout.field_hash_table_offset);
264 header.field_hash_table_size = NonZeroU64::new(layout.field_hash_table_size as u64);
265 header.tail_object_offset = NonZeroU64::new(layout.data_hash_table_object_offset);
266 header.header_size = std::mem::size_of::<JournalHeader>() as u64;
267 header.n_objects = 2;
268 header.arena_size = layout.file_size - header.header_size;
269 header.machine_id = *options.machine_id.as_bytes();
270 header.file_id = *options.file_id.as_bytes();
271 header.seqnum_id = *options.seqnum_id.as_bytes();
272 header
273 }
274
275 fn create_header_map(fd: &File, header: &mut JournalHeader) -> Result<M> {
276 let header_size = std::mem::size_of::<JournalHeader>() as u64;
277 let mut header_map = M::create(fd, 0, header_size)?;
278 {
279 let header_mut = JournalHeader::mut_from_prefix(&mut header_map).unwrap().0;
280 *header_mut = *header;
281 header_mut.state = JournalState::Online as u8;
282 header.state = JournalState::Online as u8;
283 }
284 Ok(header_map)
285 }
286
287 fn write_initial_hash_table_headers(&mut self, header: JournalHeader) -> Result<()> {
288 self.write_hash_table_object_header(
289 header.data_hash_table_offset.unwrap(),
290 header.data_hash_table_size.unwrap(),
291 ObjectType::DataHashTable,
292 )?;
293 self.write_hash_table_object_header(
294 header.field_hash_table_offset.unwrap(),
295 header.field_hash_table_size.unwrap(),
296 ObjectType::FieldHashTable,
297 )
298 }
299
300 fn write_hash_table_object_header(
301 &self,
302 table_offset: NonZeroU64,
303 table_size: NonZeroU64,
304 object_type: ObjectType,
305 ) -> Result<()> {
306 let object_offset =
307 NonZeroU64::new(table_offset.get() - std::mem::size_of::<ObjectHeader>() as u64)
308 .unwrap();
309 let object_header = self.object_header_mut(object_offset)?;
310 object_header.type_ = object_type as u8;
311 object_header.size = table_size.get() + std::mem::size_of::<ObjectHeader>() as u64;
312 Ok(())
313 }
314
315 pub fn journal_header_mut(&mut self) -> &mut JournalHeader {
316 JournalHeader::mut_from_prefix(&mut self.header_map)
317 .unwrap()
318 .0
319 }
320
321 pub fn data_hash_table_mut(&mut self) -> Option<DataHashTable<&mut [u8]>> {
322 self.data_hash_table_map
323 .as_mut()
324 .and_then(|m| DataHashTable::<&mut [u8]>::from_data_mut(m, false))
325 }
326
327 pub fn field_hash_table_mut(&mut self) -> Option<FieldHashTable<&mut [u8]>> {
328 self.field_hash_table_map
329 .as_mut()
330 .and_then(|m| FieldHashTable::<&mut [u8]>::from_data_mut(m, false))
331 }
332
333 #[allow(clippy::mut_from_ref)]
334 fn object_header_mut(&self, offset: NonZeroU64) -> Result<&mut ObjectHeader> {
335 validate_offset_alignment(offset)?;
336 let size_needed = std::mem::size_of::<ObjectHeader>() as u64;
337 let window_manager = self.window_manager.borrow_mut_checked()?;
338 let header_slice = window_manager.get_slice_mut(offset.get(), size_needed)?;
339 ObjectHeader::mut_from_bytes(header_slice).map_err(|_| JournalError::ZerocopyFailure)
340 }
341
342 fn journal_object_mut<'a, T>(
343 &'a self,
344 type_: ObjectType,
345 offset: NonZeroU64,
346 size: Option<u64>,
347 ) -> Result<ValueGuard<'a, T>>
348 where
349 T: JournalObjectMut<&'a mut [u8]>,
350 {
351 let context = self.mutable_object_context(type_, offset)?;
352 self.window_manager.with_guarded(offset, |wm| {
353 let size_needed = Self::mutable_object_size(wm, context, offset, size)?;
354 let data = wm.get_slice_mut(offset.get(), size_needed)?;
355 let value =
356 T::from_data_mut(data, context.is_compact).ok_or(JournalError::ZerocopyFailure)?;
357 Ok(value)
358 })
359 }
360
361 fn mutable_object_context(
362 &self,
363 object_type: ObjectType,
364 offset: NonZeroU64,
365 ) -> Result<MutableObjectContext> {
366 validate_offset_alignment(offset)?;
367 let journal_header = self.journal_header_ref();
368 let header_size = journal_header.header_size;
369 if offset.get() < header_size {
370 return Err(JournalError::ObjectExceedsFileBounds);
371 }
372 Ok(MutableObjectContext {
373 object_type,
374 is_compact: journal_header.has_incompatible_flag(HeaderIncompatibleFlags::Compact),
375 arena_end: header_size + journal_header.arena_size,
376 })
377 }
378
379 fn mutable_object_size(
380 wm: &mut WindowManager<M>,
381 context: MutableObjectContext,
382 offset: NonZeroU64,
383 size: Option<u64>,
384 ) -> Result<u64> {
385 match size {
386 Some(size) => Self::initialize_mutable_object_header(wm, context, offset, size),
387 None => Self::existing_mutable_object_size(wm, context, offset),
388 }
389 }
390
391 fn initialize_mutable_object_header(
392 wm: &mut WindowManager<M>,
393 context: MutableObjectContext,
394 offset: NonZeroU64,
395 size: u64,
396 ) -> Result<u64> {
397 let header_slice =
398 wm.get_slice_mut(offset.get(), std::mem::size_of::<ObjectHeader>() as u64)?;
399 let header = ObjectHeader::mut_from_bytes(header_slice)
400 .map_err(|_| JournalError::ZerocopyFailure)?;
401 header.type_ = context.object_type as u8;
402 header.size = size;
403 Ok(size)
404 }
405
406 fn existing_mutable_object_size(
407 wm: &mut WindowManager<M>,
408 context: MutableObjectContext,
409 offset: NonZeroU64,
410 ) -> Result<u64> {
411 let header_slice =
412 wm.get_slice(offset.get(), std::mem::size_of::<ObjectHeader>() as u64)?;
413 let header = ObjectHeader::ref_from_bytes(header_slice)
414 .map_err(|_| JournalError::ZerocopyFailure)?;
415 if header.type_ != context.object_type as u8 {
416 return Err(JournalError::InvalidObjectType);
417 }
418 let size_needed = header.validated_size()?;
419 Self::validate_mutable_object_bounds(context, offset, size_needed)?;
420 Ok(size_needed)
421 }
422
423 fn validate_mutable_object_bounds(
424 context: MutableObjectContext,
425 offset: NonZeroU64,
426 size_needed: u64,
427 ) -> Result<()> {
428 let end_offset = offset
429 .get()
430 .checked_add(size_needed)
431 .ok_or(JournalError::ObjectExceedsFileBounds)?;
432 if end_offset > context.arena_end {
433 return Err(JournalError::ObjectExceedsFileBounds);
434 }
435 Ok(())
436 }
437
438 pub fn offset_array_mut(
439 &self,
440 offset: NonZeroU64,
441 capacity: Option<NonZeroU64>,
442 ) -> Result<ValueGuard<'_, OffsetArrayObject<&mut [u8]>>> {
443 let size = capacity.map(|c| {
444 let mut size = std::mem::size_of::<OffsetArrayObjectHeader>() as u64;
445
446 let is_compact = self
447 .journal_header_ref()
448 .has_incompatible_flag(HeaderIncompatibleFlags::Compact);
449 if is_compact {
450 size += c.get() * std::mem::size_of::<u32>() as u64;
451 } else {
452 size += c.get() * std::mem::size_of::<u64>() as u64;
453 }
454
455 size
456 });
457
458 self.journal_object_mut(ObjectType::EntryArray, offset, size)
459 }
460
461 pub fn field_mut(
462 &self,
463 offset: NonZeroU64,
464 size: Option<u64>,
465 ) -> Result<ValueGuard<'_, FieldObject<&mut [u8]>>> {
466 let size = size.map(|n| std::mem::size_of::<FieldObjectHeader>() as u64 + n);
467 self.journal_object_mut(ObjectType::Field, offset, size)
468 }
469
470 pub fn entry_mut(
471 &self,
472 offset: NonZeroU64,
473 size: Option<u64>,
474 ) -> Result<ValueGuard<'_, EntryObject<&mut [u8]>>> {
475 let size = size.map(|n| std::mem::size_of::<EntryObjectHeader>() as u64 + n);
476 self.journal_object_mut(ObjectType::Entry, offset, size)
477 }
478
479 pub fn data_mut(
480 &self,
481 offset: NonZeroU64,
482 size: Option<u64>,
483 ) -> Result<ValueGuard<'_, DataObject<&mut [u8]>>> {
484 let size = size.map(|n| {
485 let mut size = std::mem::size_of::<DataObjectHeader>() as u64 + n;
486 if self
487 .journal_header_ref()
488 .has_incompatible_flag(HeaderIncompatibleFlags::Compact)
489 {
490 size += std::mem::size_of::<CompactDataFields>() as u64;
491 }
492 size
493 });
494 self.journal_object_mut(ObjectType::Data, offset, size)
495 }
496
497 pub fn tag_mut(
498 &self,
499 offset: NonZeroU64,
500 new: bool,
501 ) -> Result<ValueGuard<'_, TagObject<&mut [u8]>>> {
502 let size = if new {
503 Some(std::mem::size_of::<TagObjectHeader>() as u64)
504 } else {
505 None
506 };
507 self.journal_object_mut(ObjectType::Tag, offset, size)
508 }
509}
510
/// Generates a method that appends an object to the tail of a hash-table
/// bucket chain.
///
/// Arguments: the name of the generated method, the immutable and mutable
/// hash-table accessors, and the `*_mut` accessor for the object type stored
/// in that table.
///
/// The generated method:
/// 1. copies the bucket (`HashItem`) selected by `hash`;
/// 2. if the bucket already has a tail object, sets that object's
///    next-hash offset to `object_offset`;
/// 3. sets the bucket's tail to `object_offset`, and its head too when the
///    bucket had none.
///
/// It returns `JournalError::MissingHashTable` when the table is unavailable.
macro_rules! impl_hash_table_set_tail_offset {
    (
        $method_name:ident,
        $hash_table_ref:ident,
        $hash_table_mut:ident,
        $object_mut:ident
    ) => {
        pub fn $method_name(&mut self, hash: u64, object_offset: NonZeroU64) -> Result<()> {
            // Work on a copy so the immutable table borrow ends before any mutation.
            let bucket = match self.$hash_table_ref() {
                Some(table) => *table.hash_item_ref(hash),
                None => return Err(JournalError::MissingHashTable),
            };

            // Link the previous tail object, if any, to the new object.
            if let Some(previous_tail) = bucket.tail_hash_offset {
                let mut previous = self.$object_mut(previous_tail, None)?;
                previous.set_next_hash_offset(object_offset);
            }

            let mut table = self
                .$hash_table_mut()
                .ok_or(JournalError::MissingHashTable)?;
            let slot = table.hash_item_mut(hash);
            slot.tail_hash_offset = Some(object_offset);
            if slot.head_hash_offset.is_none() {
                slot.head_hash_offset = Some(object_offset);
            }

            Ok(())
        }
    };
}
545
546impl<M: MemoryMapMut> JournalFile<M> {
547 impl_hash_table_set_tail_offset!(
548 data_hash_table_set_tail_offset,
549 data_hash_table_ref,
550 data_hash_table_mut,
551 data_mut
552 );
553
554 impl_hash_table_set_tail_offset!(
555 field_hash_table_set_tail_offset,
556 field_hash_table_ref,
557 field_hash_table_mut,
558 field_mut
559 );
560}