Skip to main content

journal_core/file/
file_mut.rs

1use super::file::{
2    Compression, JOURNAL_COMPACT_SIZE_MAX, JournalFile, JournalFileOptions, OBJECT_ALIGNMENT,
3    map_hash_table, round_up_to_file_size_increment, validate_offset_alignment,
4};
5use super::mmap::{MemoryMap, MemoryMapMut, WindowManager};
6use super::object::*;
7use crate::error::{JournalError, Result};
8use crate::file::guarded_cell::GuardedCell;
9use crate::file::value_guard::ValueGuard;
10use std::fs::{File, OpenOptions};
11use std::num::NonZeroU64;
12#[cfg(unix)]
13use std::os::unix::fs::OpenOptionsExt;
14use zerocopy::FromBytes;
15
/// Byte layout of a journal file that is about to be created.
///
/// Produced by `create_layout` and consumed when the header is built and the
/// file is sized.
#[derive(Clone, Copy, Debug)]
struct CreateLayout {
    /// Size in bytes of the data hash table items (bucket count times item size).
    data_hash_table_size: usize,
    /// Size in bytes of the field hash table items (bucket count times item size).
    field_hash_table_size: usize,
    /// File offset of the first data hash table item, i.e. just past the
    /// table's object header.
    data_hash_table_offset: u64,
    /// File offset of the first field hash table item, i.e. just past the
    /// journal header and the table's object header.
    field_hash_table_offset: u64,
    /// File offset of the object header that precedes the data hash table items.
    data_hash_table_object_offset: u64,
    /// Total size the new file is extended to.
    file_size: u64,
}
25
26#[derive(Debug, Clone, Copy)]
27struct MutableObjectContext {
28    object_type: ObjectType,
29    is_compact: bool,
30    arena_end: u64,
31}
32
33impl JournalFile<super::mmap::MmapMut> {
34    pub fn open_for_append(file: &crate::repository::File, window_size: u64) -> Result<Self> {
35        debug_assert_eq!(window_size % OBJECT_ALIGNMENT, 0);
36
37        let fd = OpenOptions::new()
38            .read(true)
39            .write(true)
40            .open(file.path())?;
41
42        let header_size = std::mem::size_of::<JournalHeader>() as u64;
43        let header_map = super::mmap::MmapMut::create(&fd, 0, header_size)?;
44        let header = JournalHeader::ref_from_prefix(&header_map).unwrap().0;
45        if header.signature != *b"LPKSHHRH" {
46            return Err(JournalError::InvalidMagicNumber);
47        }
48        if header.header_size < header_size {
49            return Err(JournalError::UnsupportedJournalFile);
50        }
51        if !header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash) {
52            return Err(JournalError::UnsupportedJournalFile);
53        }
54
55        let data_hash_table_map = map_hash_table(
56            &fd,
57            header.header_size,
58            header.data_hash_table_offset,
59            header.data_hash_table_size,
60        )?;
61        let field_hash_table_map = map_hash_table(
62            &fd,
63            header.header_size,
64            header.field_hash_table_offset,
65            header.field_hash_table_size,
66        )?;
67
68        let window_manager =
69            GuardedCell::new(WindowManager::new_writer_owned(fd, window_size, 32)?);
70
71        Ok(JournalFile {
72            file: file.clone(),
73            header_map,
74            sanitized_header: None,
75            data_hash_table_map,
76            field_hash_table_map,
77            window_manager,
78            seal_options: None,
79        })
80    }
81}
82
83impl<M: MemoryMapMut> JournalFile<M> {
84    /// Syncs all file data to disk, ensuring all changes are persisted
85    ///
86    /// This performs a two-step sync process:
87    /// 1. Flushes memory-mapped regions to the file page cache (msync)
88    /// 2. Syncs the file page cache to physical disk (fdatasync)
89    pub fn sync(&mut self) -> Result<()> {
90        // Flush memory-mapped header to file page cache
91        self.header_map.flush()?;
92
93        // Sync file page cache to disk
94        let (logical_size, header_size) = {
95            let header = self.journal_header_ref();
96            (header.header_size + header.arena_size, header.header_size)
97        };
98        let header_bytes = self.header_map[..header_size as usize].to_vec();
99        let window_manager = self.window_manager.get_mut();
100        window_manager.sync(logical_size, &header_bytes)?;
101
102        Ok(())
103    }
104
105    /// Trigger a stock-reader-visible post-change notification after mmap append.
106    pub fn post_change(&mut self) -> Result<()> {
107        let logical_size = {
108            let header = self.journal_header_ref();
109            header.header_size + header.arena_size
110        };
111        self.window_manager.get_mut().post_change(logical_size)
112    }
113
114    /// Creates a successor journal file with optimized bucket sizes based on this file's utilization
115    pub fn create_successor(
116        &self,
117        file: &crate::repository::File,
118        max_file_size: Option<u64>,
119    ) -> Result<Self> {
120        self.create_successor_with_file_mode(file, max_file_size, self.current_file_mode())
121    }
122
123    pub fn create_successor_with_file_mode(
124        &self,
125        file: &crate::repository::File,
126        max_file_size: Option<u64>,
127        file_mode: u32,
128    ) -> Result<Self> {
129        let header = self.journal_header_ref();
130        let bucket_utilization = self.bucket_utilization();
131
132        let options = JournalFileOptions::new(
133            uuid::Uuid::from_bytes(header.machine_id),
134            uuid::Uuid::from_bytes(header.tail_entry_boot_id),
135            uuid::Uuid::from_bytes(header.seqnum_id),
136        )
137        .with_window_size(8 * 1024 * 1024)
138        .with_keyed_hash(header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash))
139        .with_compact(header.has_incompatible_flag(HeaderIncompatibleFlags::Compact))
140        .with_file_mode(file_mode)
141        .with_optimized_buckets(bucket_utilization, max_file_size);
142
143        let options = if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedZstd) {
144            options.with_compression(Compression::Zstd)
145        } else if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedXz) {
146            options.with_compression(Compression::Xz)
147        } else if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedLz4) {
148            options.with_compression(Compression::Lz4)
149        } else {
150            options
151        };
152
153        Self::create(file, options)
154    }
155
156    pub fn create(file: &crate::repository::File, options: JournalFileOptions) -> Result<Self> {
157        let fd = Self::open_new_file(file, options.file_mode)?;
158        let layout = Self::create_layout(&options)?;
159        if options.compact && layout.file_size > JOURNAL_COMPACT_SIZE_MAX {
160            return Err(JournalError::ObjectExceedsFileBounds);
161        }
162        fd.set_len(layout.file_size)?;
163        let mut header = Self::create_header(&options, layout);
164        let data_hash_table_map = map_hash_table(
165            &fd,
166            header.header_size,
167            header.data_hash_table_offset,
168            header.data_hash_table_size,
169        )?;
170        let field_hash_table_map = map_hash_table(
171            &fd,
172            header.header_size,
173            header.field_hash_table_offset,
174            header.field_hash_table_size,
175        )?;
176        let header_map = Self::create_header_map(&fd, &mut header)?;
177        let window_manager = GuardedCell::new(WindowManager::new_writer_owned_with_strategy(
178            fd,
179            options.window_size,
180            32,
181            options.experimental_mmap_strategy,
182        )?);
183
184        let mut jf = JournalFile {
185            file: file.clone(),
186            header_map,
187            sanitized_header: None,
188            data_hash_table_map,
189            field_hash_table_map,
190            window_manager,
191            seal_options: options.seal.clone(),
192        };
193
194        jf.write_initial_hash_table_headers(header)?;
195        jf.sync()?;
196        Ok(jf)
197    }
198
199    fn current_file_mode(&self) -> u32 {
200        #[cfg(unix)]
201        {
202            use std::os::unix::fs::PermissionsExt;
203            if let Ok(metadata) = std::fs::metadata(self.file.path()) {
204                return metadata.permissions().mode() & 0o777;
205            }
206        }
207        super::file::DEFAULT_JOURNAL_FILE_MODE
208    }
209
210    fn open_new_file(file: &crate::repository::File, mode: u32) -> Result<File> {
211        let mut open_options = OpenOptions::new();
212        open_options
213            .create(true)
214            .truncate(true)
215            .read(true)
216            .write(true);
217        #[cfg(unix)]
218        open_options.mode(mode);
219        Ok(open_options.open(file.path())?)
220    }
221
222    fn create_layout(options: &JournalFileOptions) -> Result<CreateLayout> {
223        let data_hash_table_size =
224            options.data_hash_table_buckets * std::mem::size_of::<HashItem>();
225        let field_hash_table_size =
226            options.field_hash_table_buckets * std::mem::size_of::<HashItem>();
227        let field_hash_table_offset = std::mem::size_of::<JournalHeader>() as u64
228            + std::mem::size_of::<ObjectHeader>() as u64;
229        let data_hash_table_offset = field_hash_table_offset
230            + field_hash_table_size as u64
231            + std::mem::size_of::<ObjectHeader>() as u64;
232        let data_hash_table_object_offset =
233            data_hash_table_offset - std::mem::size_of::<ObjectHeader>() as u64;
234        let append_offset = data_hash_table_offset + data_hash_table_size as u64;
235        let file_size = round_up_to_file_size_increment(append_offset)?;
236        Ok(CreateLayout {
237            data_hash_table_size,
238            field_hash_table_size,
239            data_hash_table_offset,
240            field_hash_table_offset,
241            data_hash_table_object_offset,
242            file_size,
243        })
244    }
245
246    fn create_header(options: &JournalFileOptions, layout: CreateLayout) -> JournalHeader {
247        let mut header = JournalHeader::default();
248        header.signature = *b"LPKSHHRH";
249        header.compatible_flags = HeaderCompatibleFlags::TailEntryBootId as u32;
250        if options.enable_keyed_hash {
251            header.incompatible_flags |= HeaderIncompatibleFlags::KeyedHash as u32;
252        }
253        header.incompatible_flags |= options.compression.as_incompatible_flag();
254        if options.compact {
255            header.incompatible_flags |= HeaderIncompatibleFlags::Compact as u32;
256        }
257        if options.seal.is_some() {
258            header.compatible_flags |= HeaderCompatibleFlags::Sealed as u32;
259            header.compatible_flags |= HeaderCompatibleFlags::SealedContinuous as u32;
260        }
261        header.data_hash_table_offset = NonZeroU64::new(layout.data_hash_table_offset);
262        header.data_hash_table_size = NonZeroU64::new(layout.data_hash_table_size as u64);
263        header.field_hash_table_offset = NonZeroU64::new(layout.field_hash_table_offset);
264        header.field_hash_table_size = NonZeroU64::new(layout.field_hash_table_size as u64);
265        header.tail_object_offset = NonZeroU64::new(layout.data_hash_table_object_offset);
266        header.header_size = std::mem::size_of::<JournalHeader>() as u64;
267        header.n_objects = 2;
268        header.arena_size = layout.file_size - header.header_size;
269        header.machine_id = *options.machine_id.as_bytes();
270        header.file_id = *options.file_id.as_bytes();
271        header.seqnum_id = *options.seqnum_id.as_bytes();
272        header
273    }
274
275    fn create_header_map(fd: &File, header: &mut JournalHeader) -> Result<M> {
276        let header_size = std::mem::size_of::<JournalHeader>() as u64;
277        let mut header_map = M::create(fd, 0, header_size)?;
278        {
279            let header_mut = JournalHeader::mut_from_prefix(&mut header_map).unwrap().0;
280            *header_mut = *header;
281            header_mut.state = JournalState::Online as u8;
282            header.state = JournalState::Online as u8;
283        }
284        Ok(header_map)
285    }
286
287    fn write_initial_hash_table_headers(&mut self, header: JournalHeader) -> Result<()> {
288        self.write_hash_table_object_header(
289            header.data_hash_table_offset.unwrap(),
290            header.data_hash_table_size.unwrap(),
291            ObjectType::DataHashTable,
292        )?;
293        self.write_hash_table_object_header(
294            header.field_hash_table_offset.unwrap(),
295            header.field_hash_table_size.unwrap(),
296            ObjectType::FieldHashTable,
297        )
298    }
299
300    fn write_hash_table_object_header(
301        &self,
302        table_offset: NonZeroU64,
303        table_size: NonZeroU64,
304        object_type: ObjectType,
305    ) -> Result<()> {
306        let object_offset =
307            NonZeroU64::new(table_offset.get() - std::mem::size_of::<ObjectHeader>() as u64)
308                .unwrap();
309        let object_header = self.object_header_mut(object_offset)?;
310        object_header.type_ = object_type as u8;
311        object_header.size = table_size.get() + std::mem::size_of::<ObjectHeader>() as u64;
312        Ok(())
313    }
314
315    pub fn journal_header_mut(&mut self) -> &mut JournalHeader {
316        JournalHeader::mut_from_prefix(&mut self.header_map)
317            .unwrap()
318            .0
319    }
320
321    pub fn data_hash_table_mut(&mut self) -> Option<DataHashTable<&mut [u8]>> {
322        self.data_hash_table_map
323            .as_mut()
324            .and_then(|m| DataHashTable::<&mut [u8]>::from_data_mut(m, false))
325    }
326
327    pub fn field_hash_table_mut(&mut self) -> Option<FieldHashTable<&mut [u8]>> {
328        self.field_hash_table_map
329            .as_mut()
330            .and_then(|m| FieldHashTable::<&mut [u8]>::from_data_mut(m, false))
331    }
332
333    #[allow(clippy::mut_from_ref)]
334    fn object_header_mut(&self, offset: NonZeroU64) -> Result<&mut ObjectHeader> {
335        validate_offset_alignment(offset)?;
336        let size_needed = std::mem::size_of::<ObjectHeader>() as u64;
337        let window_manager = self.window_manager.borrow_mut_checked()?;
338        let header_slice = window_manager.get_slice_mut(offset.get(), size_needed)?;
339        ObjectHeader::mut_from_bytes(header_slice).map_err(|_| JournalError::ZerocopyFailure)
340    }
341
342    fn journal_object_mut<'a, T>(
343        &'a self,
344        type_: ObjectType,
345        offset: NonZeroU64,
346        size: Option<u64>,
347    ) -> Result<ValueGuard<'a, T>>
348    where
349        T: JournalObjectMut<&'a mut [u8]>,
350    {
351        let context = self.mutable_object_context(type_, offset)?;
352        self.window_manager.with_guarded(offset, |wm| {
353            let size_needed = Self::mutable_object_size(wm, context, offset, size)?;
354            let data = wm.get_slice_mut(offset.get(), size_needed)?;
355            let value =
356                T::from_data_mut(data, context.is_compact).ok_or(JournalError::ZerocopyFailure)?;
357            Ok(value)
358        })
359    }
360
361    fn mutable_object_context(
362        &self,
363        object_type: ObjectType,
364        offset: NonZeroU64,
365    ) -> Result<MutableObjectContext> {
366        validate_offset_alignment(offset)?;
367        let journal_header = self.journal_header_ref();
368        let header_size = journal_header.header_size;
369        if offset.get() < header_size {
370            return Err(JournalError::ObjectExceedsFileBounds);
371        }
372        Ok(MutableObjectContext {
373            object_type,
374            is_compact: journal_header.has_incompatible_flag(HeaderIncompatibleFlags::Compact),
375            arena_end: header_size + journal_header.arena_size,
376        })
377    }
378
379    fn mutable_object_size(
380        wm: &mut WindowManager<M>,
381        context: MutableObjectContext,
382        offset: NonZeroU64,
383        size: Option<u64>,
384    ) -> Result<u64> {
385        match size {
386            Some(size) => Self::initialize_mutable_object_header(wm, context, offset, size),
387            None => Self::existing_mutable_object_size(wm, context, offset),
388        }
389    }
390
391    fn initialize_mutable_object_header(
392        wm: &mut WindowManager<M>,
393        context: MutableObjectContext,
394        offset: NonZeroU64,
395        size: u64,
396    ) -> Result<u64> {
397        let header_slice =
398            wm.get_slice_mut(offset.get(), std::mem::size_of::<ObjectHeader>() as u64)?;
399        let header = ObjectHeader::mut_from_bytes(header_slice)
400            .map_err(|_| JournalError::ZerocopyFailure)?;
401        header.type_ = context.object_type as u8;
402        header.size = size;
403        Ok(size)
404    }
405
406    fn existing_mutable_object_size(
407        wm: &mut WindowManager<M>,
408        context: MutableObjectContext,
409        offset: NonZeroU64,
410    ) -> Result<u64> {
411        let header_slice =
412            wm.get_slice(offset.get(), std::mem::size_of::<ObjectHeader>() as u64)?;
413        let header = ObjectHeader::ref_from_bytes(header_slice)
414            .map_err(|_| JournalError::ZerocopyFailure)?;
415        if header.type_ != context.object_type as u8 {
416            return Err(JournalError::InvalidObjectType);
417        }
418        let size_needed = header.validated_size()?;
419        Self::validate_mutable_object_bounds(context, offset, size_needed)?;
420        Ok(size_needed)
421    }
422
423    fn validate_mutable_object_bounds(
424        context: MutableObjectContext,
425        offset: NonZeroU64,
426        size_needed: u64,
427    ) -> Result<()> {
428        let end_offset = offset
429            .get()
430            .checked_add(size_needed)
431            .ok_or(JournalError::ObjectExceedsFileBounds)?;
432        if end_offset > context.arena_end {
433            return Err(JournalError::ObjectExceedsFileBounds);
434        }
435        Ok(())
436    }
437
438    pub fn offset_array_mut(
439        &self,
440        offset: NonZeroU64,
441        capacity: Option<NonZeroU64>,
442    ) -> Result<ValueGuard<'_, OffsetArrayObject<&mut [u8]>>> {
443        let size = capacity.map(|c| {
444            let mut size = std::mem::size_of::<OffsetArrayObjectHeader>() as u64;
445
446            let is_compact = self
447                .journal_header_ref()
448                .has_incompatible_flag(HeaderIncompatibleFlags::Compact);
449            if is_compact {
450                size += c.get() * std::mem::size_of::<u32>() as u64;
451            } else {
452                size += c.get() * std::mem::size_of::<u64>() as u64;
453            }
454
455            size
456        });
457
458        self.journal_object_mut(ObjectType::EntryArray, offset, size)
459    }
460
461    pub fn field_mut(
462        &self,
463        offset: NonZeroU64,
464        size: Option<u64>,
465    ) -> Result<ValueGuard<'_, FieldObject<&mut [u8]>>> {
466        let size = size.map(|n| std::mem::size_of::<FieldObjectHeader>() as u64 + n);
467        self.journal_object_mut(ObjectType::Field, offset, size)
468    }
469
470    pub fn entry_mut(
471        &self,
472        offset: NonZeroU64,
473        size: Option<u64>,
474    ) -> Result<ValueGuard<'_, EntryObject<&mut [u8]>>> {
475        let size = size.map(|n| std::mem::size_of::<EntryObjectHeader>() as u64 + n);
476        self.journal_object_mut(ObjectType::Entry, offset, size)
477    }
478
479    pub fn data_mut(
480        &self,
481        offset: NonZeroU64,
482        size: Option<u64>,
483    ) -> Result<ValueGuard<'_, DataObject<&mut [u8]>>> {
484        let size = size.map(|n| {
485            let mut size = std::mem::size_of::<DataObjectHeader>() as u64 + n;
486            if self
487                .journal_header_ref()
488                .has_incompatible_flag(HeaderIncompatibleFlags::Compact)
489            {
490                size += std::mem::size_of::<CompactDataFields>() as u64;
491            }
492            size
493        });
494        self.journal_object_mut(ObjectType::Data, offset, size)
495    }
496
497    pub fn tag_mut(
498        &self,
499        offset: NonZeroU64,
500        new: bool,
501    ) -> Result<ValueGuard<'_, TagObject<&mut [u8]>>> {
502        let size = if new {
503            Some(std::mem::size_of::<TagObjectHeader>() as u64)
504        } else {
505            None
506        };
507        self.journal_object_mut(ObjectType::Tag, offset, size)
508    }
509}
510
// Generates a method that appends `object_offset` to the hash chain of the
// bucket selected by `hash`.
//
// Parameters:
// * `$method_name`    - name of the generated method.
// * `$hash_table_ref` - accessor returning the read-only hash table view.
// * `$hash_table_mut` - accessor returning the mutable hash table view.
// * `$object_mut`     - accessor used to open the current tail object.
//
// The generated method first links the previous tail object (if any) to the
// new object, then updates the bucket: the head is set only when the bucket
// was empty, the tail is always set.
macro_rules! impl_hash_table_set_tail_offset {
    (
        $method_name:ident,
        $hash_table_ref:ident,
        $hash_table_mut:ident,
        $object_mut:ident
    ) => {
        pub fn $method_name(&mut self, hash: u64, object_offset: NonZeroU64) -> Result<()> {
            // Copy the bucket so the read-only table borrow ends before any
            // object is borrowed mutably.
            let bucket_snapshot = *self
                .$hash_table_ref()
                .ok_or(JournalError::MissingHashTable)?
                .hash_item_ref(hash);

            if let Some(previous_tail_offset) = bucket_snapshot.tail_hash_offset {
                let mut previous_tail = self.$object_mut(previous_tail_offset, None)?;
                previous_tail.set_next_hash_offset(object_offset);
            }

            let mut table = self
                .$hash_table_mut()
                .ok_or(JournalError::MissingHashTable)?;
            let bucket = table.hash_item_mut(hash);

            // The head is only filled for a previously empty bucket.
            let _ = bucket.head_hash_offset.get_or_insert(object_offset);
            bucket.tail_hash_offset = Some(object_offset);

            Ok(())
        }
    };
}
545
// Hash-chain tail setters generated by `impl_hash_table_set_tail_offset!`.
impl<M: MemoryMapMut> JournalFile<M> {
    // `data_hash_table_set_tail_offset(hash, object_offset)`: operates on the
    // data hash table and opens the previous tail through `data_mut`.
    impl_hash_table_set_tail_offset!(
        data_hash_table_set_tail_offset,
        data_hash_table_ref,
        data_hash_table_mut,
        data_mut
    );

    // `field_hash_table_set_tail_offset(hash, object_offset)`: operates on the
    // field hash table and opens the previous tail through `field_mut`.
    impl_hash_table_set_tail_offset!(
        field_hash_table_set_tail_offset,
        field_hash_table_ref,
        field_hash_table_mut,
        field_mut
    );
}