Skip to main content

journal_core/file/
writer.rs

1use super::mmap::MmapMut;
2use crate::error::{JournalError, Result};
3use crate::file::{
4    Compression, DEFAULT_COMPRESS_THRESHOLD, DataObject, DataPayloadType, EntryObjectHeader,
5    FieldObjectHeader, HashableObjectMut, HeaderIncompatibleFlags, JournalFile, JournalHeader,
6    ObjectFlags, ObjectType, PayloadParts, hash::jenkins_hash64_parts,
7    normalize_compress_threshold,
8};
9use rustc_hash::FxHashMap;
10use std::io::Cursor;
11use std::num::NonZeroU64;
12
13pub(super) const OBJECT_ALIGNMENT: u64 = 8;
14pub(super) const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
15const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
16const FIELD_CACHE_MAX_ENTRIES: usize = 1024;
17const FIELD_CACHE_MAX_PAYLOAD_LEN: usize = 128;
18pub(super) fn round_up_to_file_size_increment(value: u64) -> Result<u64> {
19    value
20        .checked_add(FILE_SIZE_INCREASE - 1)
21        .map(|v| v & !(FILE_SIZE_INCREASE - 1))
22        .ok_or(JournalError::ObjectExceedsFileBounds)
23}
24
/// One reference from the ENTRY being assembled to one of its DATA objects.
#[derive(Debug, Clone, Copy)]
pub(super) struct EntryItem {
    /// File offset of the referenced DATA object.
    pub(super) offset: NonZeroU64,
    /// DATA payload hash as computed by `JournalFile::hash_parts`; only written
    /// into the ENTRY item for non-compact files.
    pub(super) hash: u64,
}
30
/// A journal field supplied as a separate name and value.
///
/// Used through [`EntryField::Structured`]; the writer hashes and stores it
/// from these two parts (via `PayloadParts::structured`) instead of requiring
/// the caller to build a contiguous `NAME=value` buffer.
#[derive(Debug, Clone, Copy)]
pub struct StructuredField<'a> {
    /// Field name without the `=` separator.
    pub name: &'a [u8],
    /// Field value bytes. Values may contain NUL bytes and `=` bytes.
    pub value: &'a [u8],
}
38
39impl<'a> StructuredField<'a> {
40    /// Creates a structured journal field from a name and binary-safe value.
41    pub fn new(name: &'a [u8], value: &'a [u8]) -> Self {
42        Self { name, value }
43    }
44}
45
/// One field of an entry, given either as a full `KEY=value` payload or as a
/// separate name and value.
#[derive(Debug, Clone, Copy)]
pub enum EntryField<'a> {
    /// Full `KEY=value` payload, matching systemd's low-level writer shape.
    /// The field name is everything before the first `=`; a payload without
    /// any `=` is rejected as an invalid field.
    Raw(&'a [u8]),
    /// Split field name and value, avoiding `KEY=value` reconstruction for
    /// already-structured producers.
    Structured(StructuredField<'a>),
}
54
55impl<'a> EntryField<'a> {
56    /// Creates a raw full-field entry item from a `KEY=value` byte payload.
57    pub fn raw(payload: &'a [u8]) -> Self {
58        Self::Raw(payload)
59    }
60
61    /// Creates a structured entry item from a name and binary-safe value.
62    pub fn structured(name: &'a [u8], value: &'a [u8]) -> Self {
63        Self::Structured(StructuredField::new(name, value))
64    }
65
66    fn payload_parts(self) -> PayloadParts<'a> {
67        match self {
68            Self::Raw(payload) => PayloadParts::raw(payload),
69            Self::Structured(field) => PayloadParts::structured(field.name, field.value),
70        }
71    }
72
73    fn field_name(self) -> Option<&'a [u8]> {
74        match self {
75            Self::Raw(payload) => payload
76                .iter()
77                .position(|&b| b == b'=')
78                .map(|pos| &payload[..pos]),
79            Self::Structured(field) => Some(field.name),
80        }
81    }
82}
83
/// How caller-provided field names are validated before an entry is written.
///
/// The check applies to each field's name: the part before the first `=` for
/// raw `KEY=value` payloads, or the explicit name for structured fields.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FieldNamePolicy {
    /// Trusted journald-compatible field names: 1 to 64 bytes of `A-Z`, `0-9`
    /// and `_`, not starting with a digit. Protected `_...` names are allowed.
    /// An invalid name fails the write with `JournalError::InvalidField`.
    #[default]
    Journald,
    /// Journal DATA structure capability only: any non-empty name without `=`
    /// is accepted. Stock systemd tooling compatibility is not guaranteed for
    /// names outside the `Journald` rules.
    Raw,
    /// Untrusted application input accepted by journald. Invalid or protected
    /// caller fields are dropped instead of failing the write. A raw payload
    /// with no `=` at all still fails with `JournalError::InvalidField`.
    JournalApp,
}
97
/// Per-entry options for `JournalWriter::add_entry_fields_with_options` and
/// `JournalWriter::add_entry_structured_with_options`.
///
/// `Default` yields: duplicate elimination enabled, [`FieldNamePolicy::Journald`],
/// automatic seqnum, and the writer-wide boot ID.
#[derive(Debug, Clone, Copy, Default)]
pub struct EntryWriteOptions {
    /// Skips duplicate DATA reference elimination for this ENTRY.
    ///
    /// Set this only when the caller guarantees that the entry contains no
    /// duplicate full `KEY=value` payloads after field-name policy filtering.
    /// Offset sorting by DATA object offset is always performed regardless of
    /// this flag.
    /// Misuse can write duplicate DATA offsets into one ENTRY object. Keep the
    /// default `false` unless the producer owns and enforces that invariant.
    pub trusted_unique_payloads: bool,
    /// Field-name validation policy for caller-provided fields.
    pub field_name_policy: FieldNamePolicy,
    /// Optional low-level ENTRY seqnum override.
    ///
    /// This is for exact journal regeneration and must be monotonically
    /// increasing relative to previously written entries. The override must be
    /// non-zero, below `u64::MAX`, and not lower than the writer's next seqnum;
    /// otherwise the write fails with `JournalError::InvalidField`. Leave unset
    /// for the normal systemd-style auto-incrementing sequence.
    pub seqnum: Option<u64>,
    /// Optional low-level ENTRY boot ID override.
    ///
    /// This is for exact journal regeneration of multi-boot files. Leave unset
    /// for the normal writer-wide boot ID.
    pub boot_id: Option<uuid::Uuid>,
}
123
124impl EntryWriteOptions {
125    /// Enables or disables the trusted unique-payload fast path.
126    ///
127    /// See [`EntryWriteOptions::trusted_unique_payloads`] for the caller
128    /// invariant required before enabling this option.
129    pub fn trusted_unique_payloads(mut self, enabled: bool) -> Self {
130        self.trusted_unique_payloads = enabled;
131        self
132    }
133
134    /// Selects the field-name validation policy for caller-provided fields.
135    pub fn field_name_policy(mut self, policy: FieldNamePolicy) -> Self {
136        self.field_name_policy = policy;
137        self
138    }
139
140    /// Uses a caller-provided ENTRY seqnum for this entry.
141    pub fn seqnum(mut self, seqnum: u64) -> Self {
142        self.seqnum = Some(seqnum);
143        self
144    }
145
146    /// Uses a caller-provided ENTRY boot ID for this entry.
147    pub fn boot_id(mut self, boot_id: uuid::Uuid) -> Self {
148        self.boot_id = Some(boot_id);
149        self
150    }
151}
152
/// Checks a field name against the journald naming rules.
///
/// A valid name:
/// * is 1 to 64 bytes long;
/// * contains only `A-Z`, `0-9` and `_`;
/// * does not start with a digit;
/// * does not start with `_` unless `allow_protected` is set.
fn is_journal_field_name_valid(field_name: &[u8], allow_protected: bool) -> bool {
    let Some(&first) = field_name.first() else {
        return false;
    };
    let length_ok = field_name.len() <= 64;
    let first_ok = !first.is_ascii_digit() && (allow_protected || first != b'_');
    let charset_ok = field_name
        .iter()
        .all(|byte| matches!(byte, b'A'..=b'Z' | b'0'..=b'9' | b'_'));
    length_ok && first_ok && charset_ok
}
167
/// Accepts any non-empty field name that does not contain `=`.
fn is_raw_field_name_valid(field_name: &[u8]) -> bool {
    match field_name {
        [] => false,
        name => name.iter().all(|&byte| byte != b'='),
    }
}
171
172fn accept_entry_field(field: EntryField<'_>, policy: FieldNamePolicy) -> Result<bool> {
173    let Some(field_name) = field.field_name() else {
174        return Err(JournalError::InvalidField);
175    };
176    let valid = match policy {
177        FieldNamePolicy::Raw => is_raw_field_name_valid(field_name),
178        FieldNamePolicy::Journald => is_journal_field_name_valid(field_name, true),
179        FieldNamePolicy::JournalApp => is_journal_field_name_valid(field_name, false),
180    };
181    if valid {
182        return Ok(true);
183    }
184    if matches!(policy, FieldNamePolicy::JournalApp) {
185        return Ok(false);
186    }
187    Err(JournalError::InvalidField)
188}
189
/// Bounded in-memory map from FIELD names to FIELD object offsets.
///
/// `add_field` consults it before searching the file, so repeated field names
/// skip the hash-table lookup.
#[derive(Debug)]
struct FieldCache {
    /// Field-name bytes mapped to the offset of their FIELD object.
    entries: FxHashMap<Box<[u8]>, NonZeroU64>,
}
194
195impl FieldCache {
196    fn new() -> Self {
197        Self {
198            entries: FxHashMap::default(),
199        }
200    }
201
202    fn get(&self, payload: &[u8]) -> Option<NonZeroU64> {
203        self.entries.get(payload).copied()
204    }
205
206    fn insert(&mut self, payload: &[u8], offset: NonZeroU64) {
207        if payload.len() > FIELD_CACHE_MAX_PAYLOAD_LEN {
208            return;
209        }
210
211        if self.entries.len() >= FIELD_CACHE_MAX_ENTRIES && self.entries.get(payload).is_none() {
212            self.entries.clear();
213        }
214
215        self.entries
216            .insert(payload.to_vec().into_boxed_slice(), offset);
217    }
218
219    #[cfg(test)]
220    fn len(&self) -> usize {
221        self.entries.len()
222    }
223}
224
/// A DATA payload in the form it will be written to the file.
enum StoredDataPayload<'a> {
    /// The original payload parts, stored without compression.
    Uncompressed(PayloadParts<'a>),
    /// Compressed bytes plus the DATA object flag naming the algorithm (an
    /// `ObjectFlags::Compressed*` value).
    Compressed(Vec<u8>, u8),
}
229
230impl StoredDataPayload<'_> {
231    fn len(&self) -> usize {
232        match self {
233            Self::Uncompressed(payload) => payload.len(),
234            Self::Compressed(payload, _) => payload.len(),
235        }
236    }
237
238    fn object_flags(&self) -> u8 {
239        match self {
240            Self::Uncompressed(_) => 0,
241            Self::Compressed(_, flags) => *flags,
242        }
243    }
244
245    fn copy_to_data_object(&self, data: &mut DataObject<&mut [u8]>) {
246        match self {
247            Self::Uncompressed(payload) => match &mut data.payload {
248                DataPayloadType::Regular(dst) => payload.copy_to_slice(dst),
249                DataPayloadType::Compact { payload: dst, .. } => payload.copy_to_slice(dst),
250            },
251            Self::Compressed(payload, _) => data.set_payload(payload),
252        }
253    }
254}
255
/// Appends entries, and the DATA, FIELD, ENTRY and related objects they need,
/// to a [`JournalFile`] backed by a writable mmap.
///
/// The append position and object counters are tracked in memory. They are
/// folded into the journal header as each entry completes.
pub struct JournalWriter {
    /// Offset of the most recently appended object.
    pub(super) tail_object_offset: NonZeroU64,
    /// Offset at which the next object will be written.
    pub(super) append_offset: NonZeroU64,
    /// Seqnum for the next entry that does not override it.
    next_seqnum: u64,
    /// Objects appended since the header's `n_objects` was last updated.
    num_written_objects: u64,
    /// NOTE(review): presumably records whether the initial seal TAG object has
    /// been written; maintained outside this view, so confirm.
    pub(super) first_tag_written: bool,
    /// Scratch list of DATA references for the entry being assembled, reused
    /// across entries.
    pub(super) entry_items: Vec<EntryItem>,
    /// Cache of FIELD object offsets keyed by field name.
    field_cache: FieldCache,
    /// Monotonic timestamp of the first entry appended by this writer.
    first_entry_monotonic: Option<u64>,
    /// Boot ID for entries that do not override it.
    boot_id: uuid::Uuid,
    /// Algorithm used for large DATA payloads.
    compression: Compression,
    /// Minimum payload length, in bytes, before compression is attempted.
    compress_threshold: usize,
    /// Live-reader publication cadence (0 = off, 1 = every entry, N = every N entries).
    live_publish_every_entries: u64,
    /// Entries appended since the last cadence-driven publication.
    entries_since_live_publication: u64,
    /// Sealing state, present when the file has seal options configured.
    pub(super) seal: Option<crate::seal::SealState>,
}
272
273impl JournalWriter {
274    /// Get current file size in bytes
275    pub fn current_file_size(&self) -> u64 {
276        self.append_offset.get()
277    }
278
279    /// Get the monotonic timestamp of the first entry written to this file
280    pub fn first_entry_monotonic(&self) -> Option<u64> {
281        self.first_entry_monotonic
282    }
283
284    /// Get the next sequence number that will be written
285    pub fn next_seqnum(&self) -> u64 {
286        self.next_seqnum
287    }
288
289    /// Get the boot ID for this writer
290    pub fn boot_id(&self) -> uuid::Uuid {
291        self.boot_id
292    }
293
294    /// Sets how often the writer explicitly publishes live-reader visibility.
295    ///
296    /// `1` is the default and matches systemd-style publication after every
297    /// appended entry. `0` disables this explicit publication; closed-file
298    /// verification and reads after sync/close are unchanged, but stock
299    /// follow-reader visibility while the writer is active is not guaranteed.
300    /// Values greater than `1` publish after every N appended entries.
301    pub fn set_live_publish_every_entries(&mut self, entries: u64) {
302        self.live_publish_every_entries = entries;
303        self.entries_since_live_publication = 0;
304    }
305
306    /// Returns the configured live-reader publication cadence.
307    ///
308    /// See [`JournalWriter::set_live_publish_every_entries`] for the meaning of
309    /// `0`, `1`, and larger values.
310    pub fn live_publish_every_entries(&self) -> u64 {
311        self.live_publish_every_entries
312    }
313
314    pub fn new(
315        journal_file: &mut JournalFile<MmapMut>,
316        next_seqnum: u64,
317        boot_id: uuid::Uuid,
318    ) -> Result<Self> {
319        let compression = match journal_file.journal_header_ref() {
320            header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedZstd) => {
321                Compression::Zstd
322            }
323            header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedXz) => {
324                Compression::Xz
325            }
326            header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedLz4) => {
327                Compression::Lz4
328            }
329            _ => Compression::None,
330        };
331
332        Self::new_with_compression(
333            journal_file,
334            next_seqnum,
335            boot_id,
336            compression,
337            DEFAULT_COMPRESS_THRESHOLD,
338        )
339    }
340
341    pub fn new_with_compression(
342        journal_file: &mut JournalFile<MmapMut>,
343        next_seqnum: u64,
344        boot_id: uuid::Uuid,
345        compression: Compression,
346        compress_threshold: usize,
347    ) -> Result<Self> {
348        let current_header_size = std::mem::size_of::<JournalHeader>() as u64;
349        let header = journal_file.journal_header_ref();
350        if !header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash)
351            || header.header_size < current_header_size
352        {
353            return Err(JournalError::UnsupportedJournalFile);
354        }
355
356        let append_offset = {
357            let header = journal_file.journal_header_ref();
358
359            let Some(tail_object_offset) = header.tail_object_offset else {
360                return Err(JournalError::InvalidMagicNumber);
361            };
362
363            let tail_object = journal_file.object_header_ref(tail_object_offset)?;
364
365            tail_object_offset.saturating_add(tail_object.size)
366        };
367
368        let seal = journal_file
369            .seal_options
370            .as_ref()
371            .map(|opts| crate::seal::SealState::new(opts))
372            .transpose()?;
373
374        let mut writer = Self {
375            tail_object_offset: journal_file
376                .journal_header_ref()
377                .tail_object_offset
378                .unwrap(),
379            append_offset,
380            next_seqnum,
381            num_written_objects: 0,
382            first_tag_written: false,
383            entry_items: Vec::with_capacity(128),
384            field_cache: FieldCache::new(),
385            first_entry_monotonic: None,
386            boot_id,
387            compression,
388            compress_threshold: normalize_compress_threshold(compress_threshold),
389            live_publish_every_entries: 1,
390            entries_since_live_publication: 0,
391            seal,
392        };
393
394        if writer.seal.is_some() && journal_file.journal_header_ref().n_tags == 0 {
395            writer.ensure_first_tag(journal_file)?;
396            {
397                let header = journal_file.journal_header_mut();
398                header.n_objects += writer.num_written_objects;
399                header.tail_object_offset = Some(writer.tail_object_offset);
400            }
401            writer.num_written_objects = 0;
402        }
403
404        Ok(writer)
405    }
406
407    /// Creates a successor writer for a new journal file
408    pub fn create_successor(&self, journal_file: &mut JournalFile<MmapMut>) -> Result<Self> {
409        Self::new_with_compression(
410            journal_file,
411            self.next_seqnum,
412            self.boot_id,
413            self.compression,
414            self.compress_threshold,
415        )
416    }
417
418    pub fn add_entry(
419        &mut self,
420        journal_file: &mut JournalFile<MmapMut>,
421        items: &[&[u8]],
422        realtime: u64,
423        monotonic: u64,
424    ) -> Result<()> {
425        self.add_entry_fields_with_options(
426            journal_file,
427            items.iter().copied().map(EntryField::raw),
428            realtime,
429            monotonic,
430            EntryWriteOptions::default(),
431        )
432    }
433
434    pub fn add_entry_structured(
435        &mut self,
436        journal_file: &mut JournalFile<MmapMut>,
437        fields: &[StructuredField<'_>],
438        realtime: u64,
439        monotonic: u64,
440    ) -> Result<()> {
441        self.add_entry_fields_with_options(
442            journal_file,
443            fields.iter().copied().map(EntryField::Structured),
444            realtime,
445            monotonic,
446            EntryWriteOptions::default(),
447        )
448    }
449
450    pub fn add_entry_structured_with_options(
451        &mut self,
452        journal_file: &mut JournalFile<MmapMut>,
453        fields: &[StructuredField<'_>],
454        realtime: u64,
455        monotonic: u64,
456        options: EntryWriteOptions,
457    ) -> Result<()> {
458        self.add_entry_fields_with_options(
459            journal_file,
460            fields.iter().copied().map(EntryField::Structured),
461            realtime,
462            monotonic,
463            options,
464        )
465    }
466
467    pub fn add_entry_fields<'a>(
468        &mut self,
469        journal_file: &mut JournalFile<MmapMut>,
470        fields: impl IntoIterator<Item = EntryField<'a>>,
471        realtime: u64,
472        monotonic: u64,
473    ) -> Result<()> {
474        self.add_entry_fields_with_options(
475            journal_file,
476            fields,
477            realtime,
478            monotonic,
479            EntryWriteOptions::default(),
480        )
481    }
482
    /// Appends one entry built from `fields`, applying the per-entry `options`.
    ///
    /// This is the common implementation behind the other `add_entry*` methods.
    ///
    /// # Errors
    ///
    /// * [`JournalError::UnsupportedJournalFile`] if the file is not keyed-hash.
    /// * [`JournalError::InvalidField`] in any of these cases:
    ///   * the seqnum override is invalid;
    ///   * clamping the monotonic timestamp overflows;
    ///   * a field has no name, or its name is rejected by the field-name policy
    ///     (except under `JournalApp`, which drops such fields);
    ///   * no field is left to write.
    /// * Any error from writing objects or publishing the entry.
    ///
    /// NOTE(review): DATA/FIELD objects already written for earlier fields are
    /// not rolled back when a later field fails validation.
    pub fn add_entry_fields_with_options<'a>(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        fields: impl IntoIterator<Item = EntryField<'a>>,
        realtime: u64,
        monotonic: u64,
        options: EntryWriteOptions,
    ) -> Result<()> {
        self.ensure_keyed_append(journal_file)?;
        // Resolve per-entry overrides before any object is written.
        let entry_seqnum = self.entry_seqnum_for_options(options)?;
        let entry_boot_id = options.boot_id.unwrap_or(self.boot_id);
        // Within the tail entry's boot, keep monotonic timestamps strictly increasing.
        let monotonic = self.clamp_same_boot_monotonic(journal_file, entry_boot_id, monotonic)?;
        // Looks up or appends DATA objects and returns the ENTRY xor_hash.
        let xor_hash = self.prepare_entry_items(journal_file, fields, realtime, options)?;
        let entry_offset = self.write_entry_object(
            journal_file,
            entry_seqnum,
            entry_boot_id,
            realtime,
            monotonic,
            xor_hash,
        )?;
        // Link the new ENTRY into the entry array and into each referenced DATA object.
        self.publish_entry_links(journal_file, entry_offset)?;
        // Header counters and head/tail fields are updated only after linking.
        self.entry_added(
            journal_file.journal_header_mut(),
            entry_offset,
            entry_seqnum,
            entry_boot_id,
            realtime,
            monotonic,
        );
        self.publish_after_entry(journal_file)
    }
515
516    fn ensure_keyed_append(&self, journal_file: &JournalFile<MmapMut>) -> Result<()> {
517        let header = journal_file.journal_header_ref();
518        if header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash) {
519            return Ok(());
520        }
521        Err(JournalError::UnsupportedJournalFile)
522    }
523
524    fn entry_seqnum_for_options(&self, options: EntryWriteOptions) -> Result<u64> {
525        let entry_seqnum = options.seqnum.unwrap_or(self.next_seqnum);
526        if entry_seqnum == 0 || entry_seqnum == u64::MAX || entry_seqnum < self.next_seqnum {
527            return Err(JournalError::InvalidField);
528        }
529        Ok(entry_seqnum)
530    }
531
532    fn clamp_same_boot_monotonic(
533        &self,
534        journal_file: &JournalFile<MmapMut>,
535        entry_boot_id: uuid::Uuid,
536        monotonic: u64,
537    ) -> Result<u64> {
538        let header = journal_file.journal_header_ref();
539        if header.n_entries == 0
540            || header.tail_entry_boot_id != *entry_boot_id.as_bytes()
541            || monotonic > header.tail_entry_monotonic
542        {
543            return Ok(monotonic);
544        }
545        header
546            .tail_entry_monotonic
547            .checked_add(1)
548            .ok_or(JournalError::InvalidField)
549    }
550
551    fn prepare_entry_items<'a>(
552        &mut self,
553        journal_file: &mut JournalFile<MmapMut>,
554        fields: impl IntoIterator<Item = EntryField<'a>>,
555        realtime: u64,
556        options: EntryWriteOptions,
557    ) -> Result<u64> {
558        let mut xor_hash = 0;
559        self.entry_items.clear();
560        let mut publication_ready = false;
561        for field in fields {
562            if !accept_entry_field(field, options.field_name_policy)? {
563                continue;
564            }
565            self.ensure_entry_publication_ready(journal_file, realtime, &mut publication_ready)?;
566            xor_hash ^= self.add_entry_field_item(journal_file, field)?;
567        }
568        self.finish_entry_items(options.trusted_unique_payloads)?;
569        Ok(xor_hash)
570    }
571
572    fn ensure_entry_publication_ready(
573        &mut self,
574        journal_file: &mut JournalFile<MmapMut>,
575        realtime: u64,
576        publication_ready: &mut bool,
577    ) -> Result<()> {
578        if *publication_ready {
579            return Ok(());
580        }
581        self.ensure_first_tag(journal_file)?;
582        self.maybe_append_tag(journal_file, realtime)?;
583        *publication_ready = true;
584        Ok(())
585    }
586
587    fn add_entry_field_item(
588        &mut self,
589        journal_file: &mut JournalFile<MmapMut>,
590        field: EntryField<'_>,
591    ) -> Result<u64> {
592        let entry_item = self.add_data(journal_file, field)?;
593        self.entry_items.push(entry_item);
594        Ok(jenkins_hash64_parts(field.payload_parts().iter()))
595    }
596
597    fn finish_entry_items(&mut self, trusted_unique_payloads: bool) -> Result<()> {
598        if self.entry_items.is_empty() {
599            return Err(JournalError::InvalidField);
600        }
601        if !self.entry_items_are_sorted() {
602            self.entry_items
603                .sort_unstable_by(|a, b| a.offset.cmp(&b.offset));
604        }
605        if !trusted_unique_payloads {
606            self.entry_items.dedup_by(|a, b| a.offset == b.offset);
607        }
608        Ok(())
609    }
610
611    fn entry_items_are_sorted(&self) -> bool {
612        self.entry_items
613            .windows(2)
614            .all(|items| items[0].offset <= items[1].offset)
615    }
616
    /// Writes the ENTRY object for the collected `entry_items` at the current
    /// append offset and returns that offset.
    ///
    /// In compact files the per-item hash is omitted. The object size and every
    /// item offset are first checked with `ensure_compact_object_fits` /
    /// `ensure_compact_offset`. Those helpers are not shown here; presumably
    /// they enforce the compact-file limits, so confirm.
    fn write_entry_object(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        entry_seqnum: u64,
        entry_boot_id: uuid::Uuid,
        realtime: u64,
        monotonic: u64,
        xor_hash: u64,
    ) -> Result<NonZeroU64> {
        let entry_offset = self.append_offset;
        let is_compact = Self::is_compact(journal_file);
        let entry_payload_size = self.entry_items.len() as u64 * Self::entry_item_size(is_compact);
        // Reject up front an ENTRY that would not fit a compact file.
        Self::ensure_compact_object_fits(
            is_compact,
            entry_offset,
            std::mem::size_of::<EntryObjectHeader>() as u64 + entry_payload_size,
        )?;
        // The mutable ENTRY view borrows `journal_file`; scope it so the borrow
        // ends before HMAC and bookkeeping below.
        let entry_size = {
            let size = Some(entry_payload_size);
            let mut entry_guard = journal_file.entry_mut(entry_offset, size)?;

            entry_guard.header.seqnum = entry_seqnum;
            entry_guard.header.xor_hash = xor_hash;
            entry_guard.header.boot_id = *entry_boot_id.as_bytes();
            entry_guard.header.monotonic = monotonic;
            entry_guard.header.realtime = realtime;

            // set each entry item
            for (index, entry_item) in self.entry_items.iter().enumerate() {
                Self::ensure_compact_offset(is_compact, entry_item.offset)?;
                // Compact items carry no hash.
                let item_hash = (!is_compact).then_some(entry_item.hash);
                entry_guard.items.set(index, entry_item.offset, item_hash);
            }

            // The aligned size is what `object_added` advances the append offset by.
            entry_guard.header.object_header.aligned_size()
        };
        self.hmac_put_object(journal_file, entry_offset.get(), ObjectType::Entry)?;
        self.object_added(journal_file, entry_offset, entry_size)?;
        Ok(entry_offset)
    }
657
658    fn publish_entry_links(
659        &mut self,
660        journal_file: &mut JournalFile<MmapMut>,
661        entry_offset: NonZeroU64,
662    ) -> Result<()> {
663        self.append_to_entry_array(journal_file, entry_offset)?;
664        for entry_item_index in 0..self.entry_items.len() {
665            self.link_data_to_entry(journal_file, entry_offset, entry_item_index)?;
666        }
667        Ok(())
668    }
669
670    fn publish_after_entry(&mut self, journal_file: &mut JournalFile<MmapMut>) -> Result<()> {
671        match self.live_publish_every_entries {
672            0 => Ok(()),
673            1 => journal_file.post_change(),
674            interval => {
675                self.entries_since_live_publication += 1;
676                if self.entries_since_live_publication >= interval {
677                    self.entries_since_live_publication = 0;
678                    journal_file.post_change()
679                } else {
680                    Ok(())
681                }
682            }
683        }
684    }
685
686    pub(super) fn object_added(
687        &mut self,
688        journal_file: &mut JournalFile<MmapMut>,
689        object_offset: NonZeroU64,
690        object_size: u64,
691    ) -> Result<()> {
692        self.tail_object_offset = object_offset;
693        self.append_offset = object_offset
694            .checked_add(object_size)
695            .ok_or(JournalError::ObjectExceedsFileBounds)?;
696        self.num_written_objects += 1;
697
698        let header = journal_file.journal_header_mut();
699        let old_size = header
700            .header_size
701            .checked_add(header.arena_size)
702            .ok_or(JournalError::ObjectExceedsFileBounds)?;
703        if self.append_offset.get() > old_size {
704            let new_size = round_up_to_file_size_increment(self.append_offset.get())?;
705            header.arena_size = new_size
706                .checked_sub(header.header_size)
707                .ok_or(JournalError::ObjectExceedsFileBounds)?;
708        }
709
710        Ok(())
711    }
712
713    fn entry_added(
714        &mut self,
715        header: &mut JournalHeader,
716        entry_offset: NonZeroU64,
717        entry_seqnum: u64,
718        entry_boot_id: uuid::Uuid,
719        realtime: u64,
720        monotonic: u64,
721    ) {
722        header.n_objects += self.num_written_objects;
723        header.tail_object_offset = Some(self.tail_object_offset);
724
725        if header.head_entry_seqnum == 0 {
726            header.head_entry_seqnum = entry_seqnum;
727        }
728        if header.head_entry_realtime == 0 {
729            header.head_entry_realtime = realtime;
730        }
731        if self.first_entry_monotonic.is_none() {
732            self.first_entry_monotonic = Some(monotonic);
733        }
734
735        header.tail_entry_seqnum = entry_seqnum;
736        header.tail_entry_realtime = realtime;
737        header.tail_entry_monotonic = monotonic;
738        header.tail_entry_boot_id = *entry_boot_id.as_bytes();
739        header.tail_entry_offset = entry_offset.get();
740        header.n_entries += 1;
741
742        self.next_seqnum = entry_seqnum + 1;
743        self.num_written_objects = 0;
744    }
745
746    fn add_data(
747        &mut self,
748        journal_file: &mut JournalFile<MmapMut>,
749        field: EntryField<'_>,
750    ) -> Result<EntryItem> {
751        let payload = field.payload_parts();
752        let field_name = field.field_name().ok_or(JournalError::InvalidField)?;
753        let hash = journal_file.hash_parts(payload);
754        if let Some(data_offset) = journal_file.find_data_offset_parts(hash, payload)? {
755            return Ok(Self::entry_item(data_offset, hash));
756        }
757        self.add_new_data(journal_file, payload, field_name, hash)
758    }
759
760    fn entry_item(offset: NonZeroU64, hash: u64) -> EntryItem {
761        EntryItem { offset, hash }
762    }
763
764    fn add_new_data<'a>(
765        &mut self,
766        journal_file: &mut JournalFile<MmapMut>,
767        payload: PayloadParts<'a>,
768        field_name: &'a [u8],
769        hash: u64,
770    ) -> Result<EntryItem> {
771        let data_offset = self.write_new_data_object(journal_file, payload, hash)?;
772        self.publish_new_data_object(journal_file, data_offset, hash)?;
773        self.link_data_to_field(journal_file, data_offset, field_name)?;
774        Ok(Self::entry_item(data_offset, hash))
775    }
776
    /// Writes a DATA object for `payload` at the current append offset and
    /// returns that offset.
    ///
    /// The payload may be stored compressed (see `stored_data_payload`); the
    /// object flags record which form was used. The hash table is not updated
    /// here.
    fn write_new_data_object<'a>(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        payload: PayloadParts<'a>,
        hash: u64,
    ) -> Result<NonZeroU64> {
        let data_offset = self.append_offset;
        let stored_payload = self.stored_data_payload(payload);
        // Size checks use the stored (possibly compressed) length.
        self.ensure_data_object_fits(journal_file, data_offset, stored_payload.len() as u64)?;
        // Scope the mutable DATA view so its borrow of `journal_file` ends
        // before the HMAC and bookkeeping calls below.
        let data_size = {
            let mut data_guard =
                journal_file.data_mut(data_offset, Some(stored_payload.len() as u64))?;
            data_guard.header.hash = hash;
            stored_payload.copy_to_data_object(&mut data_guard);
            data_guard.header.object_header.flags = stored_payload.object_flags();
            data_guard.header.object_header.aligned_size()
        };
        self.hmac_put_object(journal_file, data_offset.get(), ObjectType::Data)?;
        self.object_added(journal_file, data_offset, data_size)?;
        Ok(data_offset)
    }
798
799    fn ensure_data_object_fits(
800        &self,
801        journal_file: &JournalFile<MmapMut>,
802        data_offset: NonZeroU64,
803        payload_size: u64,
804    ) -> Result<()> {
805        let is_compact = Self::is_compact(journal_file);
806        Self::ensure_compact_object_fits(
807            is_compact,
808            data_offset,
809            Self::data_object_size(is_compact, payload_size),
810        )
811    }
812
813    fn publish_new_data_object(
814        &mut self,
815        journal_file: &mut JournalFile<MmapMut>,
816        data_offset: NonZeroU64,
817        hash: u64,
818    ) -> Result<()> {
819        journal_file.data_hash_table_set_tail_offset(hash, data_offset)?;
820        Self::update_data_hash_chain_depth(journal_file, hash)?;
821        journal_file.journal_header_mut().n_data += 1;
822        Ok(())
823    }
824
    /// Prepends the DATA object at `data_offset` to the data chain of
    /// `field_name`'s FIELD object, creating or looking up the FIELD via
    /// `add_field`.
    fn link_data_to_field(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        data_offset: NonZeroU64,
        field_name: &[u8],
    ) -> Result<()> {
        let field_offset = self.add_field(journal_file, field_name)?;
        // Read the current chain head; the FIELD view is dropped at block end.
        let head_data_offset = {
            let field_guard = journal_file.field_ref(field_offset)?;
            field_guard.header.head_data_offset
        };
        // New DATA points at the old head...
        {
            let mut data_guard = journal_file.data_mut(data_offset, None)?;
            data_guard.header.next_field_offset = head_data_offset;
        }
        // ...and becomes the FIELD's new head.
        let mut field_guard = journal_file.field_mut(field_offset, None)?;
        field_guard.header.head_data_offset = Some(data_offset);
        Ok(())
    }
844
845    fn stored_data_payload<'a>(&self, payload: PayloadParts<'a>) -> StoredDataPayload<'a> {
846        if payload.len() >= self.compress_threshold {
847            let full_payload;
848            let payload_bytes = if let Some(raw) = payload.as_single_slice() {
849                raw
850            } else {
851                // Structured payloads need a contiguous buffer only when compression is
852                // enabled and the payload is large enough to attempt compression.
853                full_payload = payload.to_vec();
854                full_payload.as_slice()
855            };
856            match self.compression {
857                Compression::Zstd => {
858                    let compressed = ruzstd::encoding::compress_to_vec(
859                        Cursor::new(payload_bytes),
860                        ruzstd::encoding::CompressionLevel::Fastest,
861                    );
862                    let compressed = zstd_frame_with_content_size(compressed, payload_bytes.len());
863                    if compressed.len() < payload_bytes.len() {
864                        return StoredDataPayload::Compressed(
865                            compressed,
866                            ObjectFlags::CompressedZstd as u8,
867                        );
868                    }
869                }
870                Compression::Xz => {
871                    if payload_bytes.len() >= 80 {
872                        if let Ok(compressed) = xz_compress(payload_bytes) {
873                            if compressed.len() < payload_bytes.len() {
874                                return StoredDataPayload::Compressed(
875                                    compressed,
876                                    ObjectFlags::CompressedXz as u8,
877                                );
878                            }
879                        }
880                    }
881                }
882                Compression::Lz4 => {
883                    if payload_bytes.len() >= 9 {
884                        let compressed = lz4_compress(payload_bytes);
885                        if compressed.len() < payload_bytes.len() {
886                            return StoredDataPayload::Compressed(
887                                compressed,
888                                ObjectFlags::CompressedLz4 as u8,
889                            );
890                        }
891                    }
892                }
893                Compression::None => {}
894            }
895        }
896
897        StoredDataPayload::Uncompressed(payload)
898    }
899
900    fn add_field(
901        &mut self,
902        journal_file: &mut JournalFile<MmapMut>,
903        payload: &[u8],
904    ) -> Result<NonZeroU64> {
905        self.ensure_first_tag(journal_file)?;
906
907        if let Some(field_offset) = self.field_cache.get(payload) {
908            return Ok(field_offset);
909        }
910
911        let hash = journal_file.hash(payload);
912
913        match journal_file.find_field_offset(hash, payload)? {
914            Some(field_offset) => {
915                self.field_cache.insert(payload, field_offset);
916                Ok(field_offset)
917            }
918            None => {
919                // We will have to write the new field object at the current
920                // tail offset
921                let field_offset = self.append_offset;
922                let is_compact = Self::is_compact(journal_file);
923                Self::ensure_compact_object_fits(
924                    is_compact,
925                    field_offset,
926                    std::mem::size_of::<FieldObjectHeader>() as u64 + payload.len() as u64,
927                )?;
928                let field_size = {
929                    let mut field_guard =
930                        journal_file.field_mut(field_offset, Some(payload.len() as u64))?;
931
932                    field_guard.header.hash = hash;
933                    field_guard.set_payload(payload);
934                    field_guard.header.object_header.aligned_size()
935                };
936                self.hmac_put_object(journal_file, field_offset.get(), ObjectType::Field)?;
937                self.object_added(journal_file, field_offset, field_size)?;
938
939                // Update hash table
940                journal_file.field_hash_table_set_tail_offset(hash, field_offset)?;
941                let depth = Self::current_field_hash_chain_depth(journal_file, hash)?;
942                let max_depth = journal_file
943                    .journal_header_ref()
944                    .field_hash_chain_depth
945                    .max(depth);
946                journal_file.journal_header_mut().field_hash_chain_depth = max_depth;
947                journal_file.journal_header_mut().n_fields += 1;
948
949                self.field_cache.insert(payload, field_offset);
950
951                // Return the offset where we wrote the newly added data object
952                Ok(field_offset)
953            }
954        }
955    }
956}
957
/// Rewrites a zstd frame header so that it records the decompressed size.
///
/// A frame is patched only when all of these hold:
/// - it starts with the zstd magic number;
/// - it is at least 6 bytes long;
/// - its descriptor has no Dictionary_ID, no Frame_Content_Size and Single_Segment cleared.
///
/// In that case the header is `magic (4) | descriptor (1) | window descriptor (1)`, and the
/// blocks start at byte 6.
///
/// The patch makes these changes:
/// - the window descriptor is replaced by a Frame_Content_Size field holding `content_size`,
///   using the narrowest encoding that fits;
/// - Single_Segment is set;
/// - the Content_Checksum flag is carried over.
///
/// Every other frame is returned unchanged.
fn zstd_frame_with_content_size(frame: Vec<u8>, content_size: usize) -> Vec<u8> {
    const ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];
    const SINGLE_SEGMENT_FLAG: u8 = 1 << 5;
    const CONTENT_CHECKSUM_FLAG: u8 = 1 << 2;
    // Frame_Content_Size_flag (bits 7-6), Single_Segment_flag (bit 5) and
    // Dictionary_ID_flag (bits 1-0): if any is set, the header is not the plain
    // `magic | descriptor | window descriptor` shape handled here.
    const UNSUPPORTED_HEADER_BITS: u8 = 0b1110_0011;
    // First block byte when only a window descriptor follows the descriptor.
    const BLOCKS_START: usize = 6;

    if frame.len() < BLOCKS_START || !frame.starts_with(&ZSTD_MAGIC) {
        return frame;
    }
    let descriptor = frame[4];
    if (descriptor & UNSUPPORTED_HEADER_BITS) != 0 {
        return frame;
    }

    // Narrowest Frame_Content_Size encoding for `content_size`; the 2-byte form stores
    // the value minus 256.
    let (size_flag, size_field): (u8, Vec<u8>) = match content_size {
        0..=255 => (0, vec![content_size as u8]),
        256..=65_791 => (1, ((content_size - 256) as u16).to_le_bytes().to_vec()),
        _ => match u32::try_from(content_size) {
            Ok(four_bytes) => (2, four_bytes.to_le_bytes().to_vec()),
            Err(_) => (3, (content_size as u64).to_le_bytes().to_vec()),
        },
    };

    let blocks = &frame[BLOCKS_START..];
    let mut patched =
        Vec::with_capacity(ZSTD_MAGIC.len() + 1 + size_field.len() + blocks.len());
    patched.extend_from_slice(&ZSTD_MAGIC);
    patched.push((size_flag << 6) | SINGLE_SEGMENT_FLAG | (descriptor & CONTENT_CHECKSUM_FLAG));
    patched.extend_from_slice(&size_field);
    patched.extend_from_slice(blocks);
    patched
}
998
999fn xz_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
1000    use lzma_rust2::{XzOptions, XzWriter};
1001    use std::io::Write;
1002
1003    let mut options = XzOptions::with_preset(0);
1004    options.set_check_sum_type(lzma_rust2::CheckType::None);
1005    let mut writer = XzWriter::new(Vec::new(), options)?;
1006    writer.write_all(payload)?;
1007    writer.finish()
1008}
1009
1010fn lz4_compress(payload: &[u8]) -> Vec<u8> {
1011    let compressed = lz4_flex::block::compress(payload);
1012    let mut out = Vec::with_capacity(8 + compressed.len());
1013    out.extend_from_slice(&(payload.len() as u64).to_le_bytes());
1014    out.extend_from_slice(&compressed);
1015    out
1016}
1017
1018#[cfg(test)]
1019#[path = "writer_tests.rs"]
1020mod tests;