Skip to main content

journal_core/file/
writer.rs

1use super::mmap::MmapMut;
2use crate::error::{JournalError, Result};
3use crate::file::{
4    Compression, DEFAULT_COMPRESS_THRESHOLD, DataObject, DataPayloadType, EntryObjectHeader,
5    FieldObjectHeader, HashableObjectMut, HeaderIncompatibleFlags, JournalFile, JournalHeader,
6    ObjectFlags, ObjectType, PayloadParts, hash::jenkins_hash64_parts,
7    normalize_compress_threshold,
8};
9use rustc_hash::FxHashMap;
10use std::io::Cursor;
11use std::num::NonZeroU64;
12
/// Alignment unit, in bytes, for journal objects.
// NOTE(review): consumers of this constant are outside this section — confirm usage.
pub(super) const OBJECT_ALIGNMENT: u64 = 8;
/// Largest value representable in 32 bits, widened to `u64`.
// NOTE(review): presumably the size/offset ceiling for compact-mode files — confirm.
pub(super) const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
/// Granularity (8 MiB) that `round_up_to_file_size_increment` rounds up to.
const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
/// Entry count at which `FieldCache::insert` clears the cache before adding a new key.
const FIELD_CACHE_MAX_ENTRIES: usize = 1024;
/// Payloads longer than this many bytes are never stored by `FieldCache::insert`.
const FIELD_CACHE_MAX_PAYLOAD_LEN: usize = 128;
18pub(super) fn round_up_to_file_size_increment(value: u64) -> Result<u64> {
19    value
20        .checked_add(FILE_SIZE_INCREASE - 1)
21        .map(|v| v & !(FILE_SIZE_INCREASE - 1))
22        .ok_or(JournalError::ObjectExceedsFileBounds)
23}
24
/// Reference from an ENTRY object to one DATA object.
#[derive(Debug, Clone, Copy)]
pub(super) struct EntryItem {
    /// Offset of the referenced DATA object.
    pub(super) offset: NonZeroU64,
    /// Hash kept alongside the offset; the writer only passes it to the entry
    /// item slot when the file is not in compact mode.
    pub(super) hash: u64,
}
30
/// A journal field supplied as separate name and value byte slices.
#[derive(Debug, Clone, Copy)]
pub struct StructuredField<'a> {
    /// Name of the field; the `=` separator is not part of it.
    pub name: &'a [u8],
    /// Raw value bytes. NUL bytes and `=` bytes are permitted here.
    pub value: &'a [u8],
}

impl<'a> StructuredField<'a> {
    /// Builds a structured field that borrows `name` and the binary-safe `value`.
    pub fn new(name: &'a [u8], value: &'a [u8]) -> Self {
        StructuredField { name, value }
    }
}
45
/// One field of an entry, either as a single `KEY=value` payload or as a
/// pre-split name/value pair.
#[derive(Debug, Clone, Copy)]
pub enum EntryField<'a> {
    /// Full `KEY=value` payload, matching systemd's low-level writer shape.
    /// The field name is everything before the first `=` byte.
    Raw(&'a [u8]),
    /// Split field name and value, avoiding `KEY=value` reconstruction for
    /// already-structured producers.
    Structured(StructuredField<'a>),
}
54
55impl<'a> EntryField<'a> {
56    /// Creates a raw full-field entry item from a `KEY=value` byte payload.
57    pub fn raw(payload: &'a [u8]) -> Self {
58        Self::Raw(payload)
59    }
60
61    /// Creates a structured entry item from a name and binary-safe value.
62    pub fn structured(name: &'a [u8], value: &'a [u8]) -> Self {
63        Self::Structured(StructuredField::new(name, value))
64    }
65
66    fn payload_parts(self) -> PayloadParts<'a> {
67        match self {
68            Self::Raw(payload) => PayloadParts::raw(payload),
69            Self::Structured(field) => PayloadParts::structured(field.name, field.value),
70        }
71    }
72
73    fn field_name(self) -> Option<&'a [u8]> {
74        match self {
75            Self::Raw(payload) => payload
76                .iter()
77                .position(|&b| b == b'=')
78                .map(|pos| &payload[..pos]),
79            Self::Structured(field) => Some(field.name),
80        }
81    }
82}
83
/// Selects how caller-provided field names are validated before an entry is
/// written. The default is [`FieldNamePolicy::Journald`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FieldNamePolicy {
    /// Trusted journald-compatible field names. Protected `_...` names are
    /// allowed. A field with an invalid name makes the write fail.
    #[default]
    Journald,
    /// Journal DATA structure capability only. Stock systemd tooling
    /// compatibility is not guaranteed for names outside JOURNALD.
    /// The name only has to be non-empty and free of `=`.
    Raw,
    /// Untrusted application input accepted by journald. Invalid or protected
    /// caller fields are dropped.
    JournalApp,
}
97
/// Per-entry options for the `*_with_options` entry writers.
///
/// The `Default` value disables the trusted fast path, uses the default
/// field-name policy and leaves both low-level overrides unset.
#[derive(Debug, Clone, Copy, Default)]
pub struct EntryWriteOptions {
    /// Skips duplicate DATA reference elimination for this ENTRY.
    ///
    /// Set this only when the caller guarantees that the entry contains no
    /// duplicate full `KEY=value` payloads after field-name policy filtering.
    /// Offset sorting by DATA object offset is always performed regardless of
    /// this flag.
    /// Misuse can write duplicate DATA offsets into one ENTRY object. Keep the
    /// default `false` unless the producer owns and enforces that invariant.
    pub trusted_unique_payloads: bool,
    /// Field-name validation policy for caller-provided fields.
    pub field_name_policy: FieldNamePolicy,
    /// Optional low-level ENTRY seqnum override.
    ///
    /// This is for exact journal regeneration and must be monotonically
    /// increasing relative to previously written entries. Leave unset for the
    /// normal systemd-style auto-incrementing sequence.
    ///
    /// The writer rejects `0`, `u64::MAX`, and any value below its next
    /// sequence number.
    pub seqnum: Option<u64>,
    /// Optional low-level ENTRY boot ID override.
    ///
    /// This is for exact journal regeneration of multi-boot files. Leave unset
    /// for the normal writer-wide boot ID.
    pub boot_id: Option<uuid::Uuid>,
}
123
124impl EntryWriteOptions {
125    /// Enables or disables the trusted unique-payload fast path.
126    ///
127    /// See [`EntryWriteOptions::trusted_unique_payloads`] for the caller
128    /// invariant required before enabling this option.
129    pub fn trusted_unique_payloads(mut self, enabled: bool) -> Self {
130        self.trusted_unique_payloads = enabled;
131        self
132    }
133
134    /// Selects the field-name validation policy for caller-provided fields.
135    pub fn field_name_policy(mut self, policy: FieldNamePolicy) -> Self {
136        self.field_name_policy = policy;
137        self
138    }
139
140    /// Uses a caller-provided ENTRY seqnum for this entry.
141    pub fn seqnum(mut self, seqnum: u64) -> Self {
142        self.seqnum = Some(seqnum);
143        self
144    }
145
146    /// Uses a caller-provided ENTRY boot ID for this entry.
147    pub fn boot_id(mut self, boot_id: uuid::Uuid) -> Self {
148        self.boot_id = Some(boot_id);
149        self
150    }
151}
152
/// Checks a field name against the journald naming rules.
///
/// A valid name is 1 to 64 bytes long, does not start with a digit, and
/// consists only of `A`-`Z`, `0`-`9` and `_`. Names starting with `_` are
/// accepted only when `allow_protected` is true.
fn is_journal_field_name_valid(field_name: &[u8], allow_protected: bool) -> bool {
    const MAX_NAME_LEN: usize = 64;

    // An empty name has no first byte and is rejected outright.
    let Some(&first) = field_name.first() else {
        return false;
    };
    if field_name.len() > MAX_NAME_LEN {
        return false;
    }
    if first.is_ascii_digit() || (first == b'_' && !allow_protected) {
        return false;
    }
    field_name
        .iter()
        .all(|&byte| matches!(byte, b'A'..=b'Z' | b'0'..=b'9' | b'_'))
}
167
/// Checks a field name against the minimal raw policy: it must be non-empty
/// and must not contain the `=` separator byte.
fn is_raw_field_name_valid(field_name: &[u8]) -> bool {
    if field_name.is_empty() {
        return false;
    }
    field_name.iter().all(|&byte| byte != b'=')
}
171
172fn accept_entry_field(field: EntryField<'_>, policy: FieldNamePolicy) -> Result<bool> {
173    let Some(field_name) = field.field_name() else {
174        return Err(JournalError::InvalidField);
175    };
176    let valid = match policy {
177        FieldNamePolicy::Raw => is_raw_field_name_valid(field_name),
178        FieldNamePolicy::Journald => is_journal_field_name_valid(field_name, true),
179        FieldNamePolicy::JournalApp => is_journal_field_name_valid(field_name, false),
180    };
181    if valid {
182        return Ok(true);
183    }
184    if matches!(policy, FieldNamePolicy::JournalApp) {
185        return Ok(false);
186    }
187    Err(JournalError::InvalidField)
188}
189
/// Bounded in-memory map from a field payload to the offset recorded for it.
///
/// Size limits are enforced by `FieldCache::insert`.
#[derive(Debug)]
struct FieldCache {
    /// Cached payload bytes mapped to their recorded offset.
    entries: FxHashMap<Box<[u8]>, NonZeroU64>,
}
194
195impl FieldCache {
196    fn new() -> Self {
197        Self {
198            entries: FxHashMap::default(),
199        }
200    }
201
202    fn get(&self, payload: &[u8]) -> Option<NonZeroU64> {
203        self.entries.get(payload).copied()
204    }
205
206    fn insert(&mut self, payload: &[u8], offset: NonZeroU64) {
207        if payload.len() > FIELD_CACHE_MAX_PAYLOAD_LEN {
208            return;
209        }
210
211        if self.entries.len() >= FIELD_CACHE_MAX_ENTRIES && self.entries.get(payload).is_none() {
212            self.entries.clear();
213        }
214
215        self.entries
216            .insert(payload.to_vec().into_boxed_slice(), offset);
217    }
218
219    #[cfg(test)]
220    fn len(&self) -> usize {
221        self.entries.len()
222    }
223}
224
/// Payload bytes in the form they will be stored in a DATA object.
enum StoredDataPayload<'a> {
    /// Payload stored as-is, still borrowed from the caller.
    Uncompressed(PayloadParts<'a>),
    /// Compressed payload bytes plus the object flag byte naming the codec.
    Compressed(Vec<u8>, u8),
}
229
230impl StoredDataPayload<'_> {
231    fn len(&self) -> usize {
232        match self {
233            Self::Uncompressed(payload) => payload.len(),
234            Self::Compressed(payload, _) => payload.len(),
235        }
236    }
237
238    fn object_flags(&self) -> u8 {
239        match self {
240            Self::Uncompressed(_) => 0,
241            Self::Compressed(_, flags) => *flags,
242        }
243    }
244
245    fn copy_to_data_object(&self, data: &mut DataObject<&mut [u8]>) {
246        match self {
247            Self::Uncompressed(payload) => match &mut data.payload {
248                DataPayloadType::Regular(dst) => payload.copy_to_slice(dst),
249                DataPayloadType::Compact { payload: dst, .. } => payload.copy_to_slice(dst),
250            },
251            Self::Compressed(payload, _) => data.set_payload(payload),
252        }
253    }
254}
255
/// Append-side state for one journal file.
///
/// The writer does not own the file; every operation takes the
/// `JournalFile<MmapMut>` it should act on.
pub struct JournalWriter {
    /// Offset of the most recently written object.
    pub(super) tail_object_offset: NonZeroU64,
    /// Offset at which the next object will be written.
    pub(super) append_offset: NonZeroU64,
    /// Sequence number the next auto-numbered entry will receive.
    next_seqnum: u64,
    /// Objects written since the header's object count was last updated.
    num_written_objects: u64,
    // NOTE(review): presumably tracks whether the first seal TAG object has been
    // emitted; it is only initialised to `false` in this section — confirm.
    pub(super) first_tag_written: bool,
    /// Scratch list of the items of the entry currently being assembled; it is
    /// cleared at the start of every entry.
    pub(super) entry_items: Vec<EntryItem>,
    /// Cache of field-name payloads to FIELD object offsets.
    field_cache: FieldCache,
    /// Monotonic timestamp of the first entry written through this writer.
    first_entry_monotonic: Option<u64>,
    /// Boot ID used for entries that do not override it.
    boot_id: uuid::Uuid,
    /// Compression codec applied to large DATA payloads.
    compression: Compression,
    /// Minimum payload length at which compression is attempted.
    compress_threshold: usize,
    /// Live-publication cadence; see `set_live_publish_every_entries`.
    live_publish_every_entries: u64,
    /// Entries appended since the last live publication (used for cadences > 1).
    entries_since_live_publication: u64,
    /// Sealing state, present when the journal file was opened with seal options.
    pub(super) seal: Option<crate::seal::SealState>,
}
272
273impl JournalWriter {
    /// Get current file size in bytes.
    ///
    /// This is the writer's append offset, i.e. the end of the last object it
    /// has written, not any rounded-up arena size recorded in the header.
    pub fn current_file_size(&self) -> u64 {
        self.append_offset.get()
    }
278
    /// Get the monotonic timestamp of the first entry written through this
    /// writer, or `None` if it has not written an entry yet.
    pub fn first_entry_monotonic(&self) -> Option<u64> {
        self.first_entry_monotonic
    }
283
    /// Get the next sequence number that will be written when an entry does
    /// not override its seqnum.
    pub fn next_seqnum(&self) -> u64 {
        self.next_seqnum
    }
288
    /// Get the writer-wide boot ID, used for every entry that does not
    /// override it through `EntryWriteOptions`.
    pub fn boot_id(&self) -> uuid::Uuid {
        self.boot_id
    }
293
    /// Sets how often the writer explicitly publishes live-reader visibility.
    ///
    /// `1` is the default and matches systemd-style publication after every
    /// appended entry. `0` disables this explicit publication; closed-file
    /// verification and reads after sync/close are unchanged, but stock
    /// follow-reader visibility while the writer is active is not guaranteed.
    /// Values greater than `1` publish after every N appended entries.
    ///
    /// Calling this also restarts the count of entries since the last
    /// publication.
    pub fn set_live_publish_every_entries(&mut self, entries: u64) {
        self.live_publish_every_entries = entries;
        self.entries_since_live_publication = 0;
    }
305
    /// Returns the configured live-reader publication cadence, in entries.
    ///
    /// See [`JournalWriter::set_live_publish_every_entries`] for the meaning of
    /// `0`, `1`, and larger values.
    pub fn live_publish_every_entries(&self) -> u64 {
        self.live_publish_every_entries
    }
313
314    pub fn new(
315        journal_file: &mut JournalFile<MmapMut>,
316        next_seqnum: u64,
317        boot_id: uuid::Uuid,
318    ) -> Result<Self> {
319        let compression = match journal_file.journal_header_ref() {
320            header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedZstd) => {
321                Compression::Zstd
322            }
323            header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedXz) => {
324                Compression::Xz
325            }
326            header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedLz4) => {
327                Compression::Lz4
328            }
329            _ => Compression::None,
330        };
331
332        Self::new_with_compression(
333            journal_file,
334            next_seqnum,
335            boot_id,
336            compression,
337            DEFAULT_COMPRESS_THRESHOLD,
338        )
339    }
340
341    pub fn new_with_compression(
342        journal_file: &mut JournalFile<MmapMut>,
343        next_seqnum: u64,
344        boot_id: uuid::Uuid,
345        compression: Compression,
346        compress_threshold: usize,
347    ) -> Result<Self> {
348        let current_header_size = std::mem::size_of::<JournalHeader>() as u64;
349        let header = journal_file.journal_header_ref();
350        if !header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash)
351            || header.header_size < current_header_size
352        {
353            return Err(JournalError::UnsupportedJournalFile);
354        }
355
356        let append_offset = {
357            let header = journal_file.journal_header_ref();
358
359            let Some(tail_object_offset) = header.tail_object_offset else {
360                return Err(JournalError::InvalidMagicNumber);
361            };
362
363            let tail_object = journal_file.object_header_ref(tail_object_offset)?;
364
365            tail_object_offset.saturating_add(tail_object.size)
366        };
367
368        let seal = journal_file
369            .seal_options
370            .as_ref()
371            .map(|opts| crate::seal::SealState::new(opts))
372            .transpose()?;
373
374        let mut writer = Self {
375            tail_object_offset: journal_file
376                .journal_header_ref()
377                .tail_object_offset
378                .unwrap(),
379            append_offset,
380            next_seqnum,
381            num_written_objects: 0,
382            first_tag_written: false,
383            entry_items: Vec::with_capacity(128),
384            field_cache: FieldCache::new(),
385            first_entry_monotonic: None,
386            boot_id,
387            compression,
388            compress_threshold: normalize_compress_threshold(compress_threshold),
389            live_publish_every_entries: 1,
390            entries_since_live_publication: 0,
391            seal,
392        };
393
394        if writer.seal.is_some() && journal_file.journal_header_ref().n_tags == 0 {
395            writer.ensure_first_tag(journal_file)?;
396            {
397                let header = journal_file.journal_header_mut();
398                header.n_objects += writer.num_written_objects;
399                header.tail_object_offset = Some(writer.tail_object_offset);
400            }
401            writer.num_written_objects = 0;
402        }
403
404        Ok(writer)
405    }
406
    /// Creates a successor writer for a new journal file.
    ///
    /// The successor continues with this writer's next sequence number and
    /// reuses its boot ID, compression codec and compression threshold. The
    /// live-publication cadence is not carried over; the new writer starts with
    /// the default.
    ///
    /// # Errors
    /// Propagates every error of [`JournalWriter::new_with_compression`].
    pub fn create_successor(&self, journal_file: &mut JournalFile<MmapMut>) -> Result<Self> {
        Self::new_with_compression(
            journal_file,
            self.next_seqnum,
            self.boot_id,
            self.compression,
            self.compress_threshold,
        )
    }
417
418    pub fn add_entry(
419        &mut self,
420        journal_file: &mut JournalFile<MmapMut>,
421        items: &[&[u8]],
422        realtime: u64,
423        monotonic: u64,
424    ) -> Result<()> {
425        self.add_entry_fields_with_options(
426            journal_file,
427            items.iter().copied().map(EntryField::raw),
428            realtime,
429            monotonic,
430            EntryWriteOptions::default(),
431        )
432    }
433
434    pub fn add_entry_structured(
435        &mut self,
436        journal_file: &mut JournalFile<MmapMut>,
437        fields: &[StructuredField<'_>],
438        realtime: u64,
439        monotonic: u64,
440    ) -> Result<()> {
441        self.add_entry_fields_with_options(
442            journal_file,
443            fields.iter().copied().map(EntryField::Structured),
444            realtime,
445            monotonic,
446            EntryWriteOptions::default(),
447        )
448    }
449
450    pub fn add_entry_structured_with_options(
451        &mut self,
452        journal_file: &mut JournalFile<MmapMut>,
453        fields: &[StructuredField<'_>],
454        realtime: u64,
455        monotonic: u64,
456        options: EntryWriteOptions,
457    ) -> Result<()> {
458        self.add_entry_fields_with_options(
459            journal_file,
460            fields.iter().copied().map(EntryField::Structured),
461            realtime,
462            monotonic,
463            options,
464        )
465    }
466
467    pub fn add_entry_fields<'a>(
468        &mut self,
469        journal_file: &mut JournalFile<MmapMut>,
470        fields: impl IntoIterator<Item = EntryField<'a>>,
471        realtime: u64,
472        monotonic: u64,
473    ) -> Result<()> {
474        self.add_entry_fields_with_options(
475            journal_file,
476            fields,
477            realtime,
478            monotonic,
479            EntryWriteOptions::default(),
480        )
481    }
482
    /// Appends one entry built from `fields`, applying `options`.
    ///
    /// The steps run in this order: check the file is keyed-hash, resolve the
    /// entry's seqnum and boot ID, look up or write the DATA object of every
    /// accepted field, write the ENTRY object, link it into the entry arrays,
    /// update the header bookkeeping, and finally publish to live readers
    /// according to the configured cadence.
    ///
    /// # Errors
    /// * `JournalError::UnsupportedJournalFile` if the file is not keyed-hash.
    /// * `JournalError::InvalidField` if the seqnum is unacceptable, a field is
    ///   rejected by the field-name policy, or no field remains after filtering.
    /// * Any error propagated from writing or linking objects.
    ///
    /// DATA objects written before a later step fails are not rolled back.
    pub fn add_entry_fields_with_options<'a>(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        fields: impl IntoIterator<Item = EntryField<'a>>,
        realtime: u64,
        monotonic: u64,
        options: EntryWriteOptions,
    ) -> Result<()> {
        self.ensure_keyed_append(journal_file)?;
        // Validate the seqnum before anything is written to the file.
        let entry_seqnum = self.entry_seqnum_for_options(options)?;
        let entry_boot_id = options.boot_id.unwrap_or(self.boot_id);
        let xor_hash = self.prepare_entry_items(journal_file, fields, realtime, options)?;
        let entry_offset = self.write_entry_object(
            journal_file,
            entry_seqnum,
            entry_boot_id,
            realtime,
            monotonic,
            xor_hash,
        )?;
        self.publish_entry_links(journal_file, entry_offset)?;
        // Header counters and tail fields are only updated once the entry is linked.
        self.entry_added(
            journal_file.journal_header_mut(),
            entry_offset,
            entry_seqnum,
            entry_boot_id,
            realtime,
            monotonic,
        );
        self.publish_after_entry(journal_file)
    }
514
515    fn ensure_keyed_append(&self, journal_file: &JournalFile<MmapMut>) -> Result<()> {
516        let header = journal_file.journal_header_ref();
517        if header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash) {
518            return Ok(());
519        }
520        Err(JournalError::UnsupportedJournalFile)
521    }
522
523    fn entry_seqnum_for_options(&self, options: EntryWriteOptions) -> Result<u64> {
524        let entry_seqnum = options.seqnum.unwrap_or(self.next_seqnum);
525        if entry_seqnum == 0 || entry_seqnum == u64::MAX || entry_seqnum < self.next_seqnum {
526            return Err(JournalError::InvalidField);
527        }
528        Ok(entry_seqnum)
529    }
530
531    fn prepare_entry_items<'a>(
532        &mut self,
533        journal_file: &mut JournalFile<MmapMut>,
534        fields: impl IntoIterator<Item = EntryField<'a>>,
535        realtime: u64,
536        options: EntryWriteOptions,
537    ) -> Result<u64> {
538        let mut xor_hash = 0;
539        self.entry_items.clear();
540        let mut publication_ready = false;
541        for field in fields {
542            if !accept_entry_field(field, options.field_name_policy)? {
543                continue;
544            }
545            self.ensure_entry_publication_ready(journal_file, realtime, &mut publication_ready)?;
546            xor_hash ^= self.add_entry_field_item(journal_file, field)?;
547        }
548        self.finish_entry_items(options.trusted_unique_payloads)?;
549        Ok(xor_hash)
550    }
551
552    fn ensure_entry_publication_ready(
553        &mut self,
554        journal_file: &mut JournalFile<MmapMut>,
555        realtime: u64,
556        publication_ready: &mut bool,
557    ) -> Result<()> {
558        if *publication_ready {
559            return Ok(());
560        }
561        self.ensure_first_tag(journal_file)?;
562        self.maybe_append_tag(journal_file, realtime)?;
563        *publication_ready = true;
564        Ok(())
565    }
566
567    fn add_entry_field_item(
568        &mut self,
569        journal_file: &mut JournalFile<MmapMut>,
570        field: EntryField<'_>,
571    ) -> Result<u64> {
572        let entry_item = self.add_data(journal_file, field)?;
573        self.entry_items.push(entry_item);
574        Ok(jenkins_hash64_parts(field.payload_parts().iter()))
575    }
576
577    fn finish_entry_items(&mut self, trusted_unique_payloads: bool) -> Result<()> {
578        if self.entry_items.is_empty() {
579            return Err(JournalError::InvalidField);
580        }
581        if !self.entry_items_are_sorted() {
582            self.entry_items
583                .sort_unstable_by(|a, b| a.offset.cmp(&b.offset));
584        }
585        if !trusted_unique_payloads {
586            self.entry_items.dedup_by(|a, b| a.offset == b.offset);
587        }
588        Ok(())
589    }
590
591    fn entry_items_are_sorted(&self) -> bool {
592        self.entry_items
593            .windows(2)
594            .all(|items| items[0].offset <= items[1].offset)
595    }
596
    /// Writes the ENTRY object for the items currently held in `self.entry_items`.
    ///
    /// The object is placed at the current append offset. Its header receives
    /// the given seqnum, XOR hash, boot ID and timestamps, and one item slot is
    /// filled per collected entry item. In compact mode no per-item hash is
    /// passed to the item setter.
    ///
    /// Returns the offset of the new ENTRY object.
    ///
    /// # Errors
    /// Propagates failures from the compact-mode size and offset checks, from
    /// mapping the entry, from the HMAC update, and from `object_added`.
    fn write_entry_object(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        entry_seqnum: u64,
        entry_boot_id: uuid::Uuid,
        realtime: u64,
        monotonic: u64,
        xor_hash: u64,
    ) -> Result<NonZeroU64> {
        let entry_offset = self.append_offset;
        let is_compact = Self::is_compact(journal_file);
        let entry_payload_size = self.entry_items.len() as u64 * Self::entry_item_size(is_compact);
        // Check the whole object (header plus items) before touching the file.
        Self::ensure_compact_object_fits(
            is_compact,
            entry_offset,
            std::mem::size_of::<EntryObjectHeader>() as u64 + entry_payload_size,
        )?;
        // The guard is confined to this block so it is released before the
        // file is used again below.
        let entry_size = {
            let size = Some(entry_payload_size);
            let mut entry_guard = journal_file.entry_mut(entry_offset, size)?;

            entry_guard.header.seqnum = entry_seqnum;
            entry_guard.header.xor_hash = xor_hash;
            entry_guard.header.boot_id = *entry_boot_id.as_bytes();
            entry_guard.header.monotonic = monotonic;
            entry_guard.header.realtime = realtime;

            // Fill one item slot per collected DATA reference.
            for (index, entry_item) in self.entry_items.iter().enumerate() {
                Self::ensure_compact_offset(is_compact, entry_item.offset)?;
                let item_hash = (!is_compact).then_some(entry_item.hash);
                entry_guard.items.set(index, entry_item.offset, item_hash);
            }

            entry_guard.header.object_header.aligned_size()
        };
        self.hmac_put_object(journal_file, entry_offset.get(), ObjectType::Entry)?;
        // Advance the append position by the aligned size of the new object.
        self.object_added(journal_file, entry_offset, entry_size)?;
        Ok(entry_offset)
    }
637
638    fn publish_entry_links(
639        &mut self,
640        journal_file: &mut JournalFile<MmapMut>,
641        entry_offset: NonZeroU64,
642    ) -> Result<()> {
643        self.append_to_entry_array(journal_file, entry_offset)?;
644        for entry_item_index in 0..self.entry_items.len() {
645            self.link_data_to_entry(journal_file, entry_offset, entry_item_index)?;
646        }
647        Ok(())
648    }
649
650    fn publish_after_entry(&mut self, journal_file: &mut JournalFile<MmapMut>) -> Result<()> {
651        match self.live_publish_every_entries {
652            0 => Ok(()),
653            1 => journal_file.post_change(),
654            interval => {
655                self.entries_since_live_publication += 1;
656                if self.entries_since_live_publication >= interval {
657                    self.entries_since_live_publication = 0;
658                    journal_file.post_change()
659                } else {
660                    Ok(())
661                }
662            }
663        }
664    }
665
666    pub(super) fn object_added(
667        &mut self,
668        journal_file: &mut JournalFile<MmapMut>,
669        object_offset: NonZeroU64,
670        object_size: u64,
671    ) -> Result<()> {
672        self.tail_object_offset = object_offset;
673        self.append_offset = object_offset
674            .checked_add(object_size)
675            .ok_or(JournalError::ObjectExceedsFileBounds)?;
676        self.num_written_objects += 1;
677
678        let header = journal_file.journal_header_mut();
679        let old_size = header
680            .header_size
681            .checked_add(header.arena_size)
682            .ok_or(JournalError::ObjectExceedsFileBounds)?;
683        if self.append_offset.get() > old_size {
684            let new_size = round_up_to_file_size_increment(self.append_offset.get())?;
685            header.arena_size = new_size
686                .checked_sub(header.header_size)
687                .ok_or(JournalError::ObjectExceedsFileBounds)?;
688        }
689
690        Ok(())
691    }
692
693    fn entry_added(
694        &mut self,
695        header: &mut JournalHeader,
696        entry_offset: NonZeroU64,
697        entry_seqnum: u64,
698        entry_boot_id: uuid::Uuid,
699        realtime: u64,
700        monotonic: u64,
701    ) {
702        header.n_objects += self.num_written_objects;
703        header.tail_object_offset = Some(self.tail_object_offset);
704
705        if header.head_entry_seqnum == 0 {
706            header.head_entry_seqnum = entry_seqnum;
707        }
708        if header.head_entry_realtime == 0 {
709            header.head_entry_realtime = realtime;
710        }
711        if self.first_entry_monotonic.is_none() {
712            self.first_entry_monotonic = Some(monotonic);
713        }
714
715        header.tail_entry_seqnum = entry_seqnum;
716        header.tail_entry_realtime = realtime;
717        header.tail_entry_monotonic = monotonic;
718        header.tail_entry_boot_id = *entry_boot_id.as_bytes();
719        header.tail_entry_offset = entry_offset.get();
720        header.n_entries += 1;
721
722        self.next_seqnum = entry_seqnum + 1;
723        self.num_written_objects = 0;
724    }
725
726    fn add_data(
727        &mut self,
728        journal_file: &mut JournalFile<MmapMut>,
729        field: EntryField<'_>,
730    ) -> Result<EntryItem> {
731        let payload = field.payload_parts();
732        let field_name = field.field_name().ok_or(JournalError::InvalidField)?;
733        let hash = journal_file.hash_parts(payload);
734        if let Some(data_offset) = journal_file.find_data_offset_parts(hash, payload)? {
735            return Ok(Self::entry_item(data_offset, hash));
736        }
737        self.add_new_data(journal_file, payload, field_name, hash)
738    }
739
    /// Builds the entry item that refers to the DATA object at `offset` with
    /// the given payload `hash`.
    fn entry_item(offset: NonZeroU64, hash: u64) -> EntryItem {
        EntryItem { offset, hash }
    }
743
    /// Writes a new DATA object for `payload` and makes it reachable.
    ///
    /// The object is written first, then registered in the data hash table,
    /// and finally linked to the FIELD object for `field_name`.
    ///
    /// Returns the entry item that refers to the new DATA object.
    ///
    /// # Errors
    /// Propagates the error of whichever step fails; earlier steps are not
    /// undone.
    fn add_new_data<'a>(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        payload: PayloadParts<'a>,
        field_name: &'a [u8],
        hash: u64,
    ) -> Result<EntryItem> {
        let data_offset = self.write_new_data_object(journal_file, payload, hash)?;
        self.publish_new_data_object(journal_file, data_offset, hash)?;
        self.link_data_to_field(journal_file, data_offset, field_name)?;
        Ok(Self::entry_item(data_offset, hash))
    }
756
757    fn write_new_data_object<'a>(
758        &mut self,
759        journal_file: &mut JournalFile<MmapMut>,
760        payload: PayloadParts<'a>,
761        hash: u64,
762    ) -> Result<NonZeroU64> {
763        let data_offset = self.append_offset;
764        let stored_payload = self.stored_data_payload(payload);
765        self.ensure_data_object_fits(journal_file, data_offset, stored_payload.len() as u64)?;
766        let data_size = {
767            let mut data_guard =
768                journal_file.data_mut(data_offset, Some(stored_payload.len() as u64))?;
769            data_guard.header.hash = hash;
770            stored_payload.copy_to_data_object(&mut data_guard);
771            data_guard.header.object_header.flags = stored_payload.object_flags();
772            data_guard.header.object_header.aligned_size()
773        };
774        self.hmac_put_object(journal_file, data_offset.get(), ObjectType::Data)?;
775        self.object_added(journal_file, data_offset, data_size)?;
776        Ok(data_offset)
777    }
778
779    fn ensure_data_object_fits(
780        &self,
781        journal_file: &JournalFile<MmapMut>,
782        data_offset: NonZeroU64,
783        payload_size: u64,
784    ) -> Result<()> {
785        let is_compact = Self::is_compact(journal_file);
786        Self::ensure_compact_object_fits(
787            is_compact,
788            data_offset,
789            Self::data_object_size(is_compact, payload_size),
790        )
791    }
792
    /// Registers a newly written DATA object in the file's lookup structures.
    ///
    /// The object is set as the tail of the data hash table chain for `hash`,
    /// the chain-depth bookkeeping for that hash is updated, and the header's
    /// DATA object count is incremented.
    ///
    /// # Errors
    /// Propagates hash-table and chain-depth update errors; the DATA count is
    /// only incremented when both succeed.
    fn publish_new_data_object(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        data_offset: NonZeroU64,
        hash: u64,
    ) -> Result<()> {
        journal_file.data_hash_table_set_tail_offset(hash, data_offset)?;
        Self::update_data_hash_chain_depth(journal_file, hash)?;
        journal_file.journal_header_mut().n_data += 1;
        Ok(())
    }
804
805    fn link_data_to_field(
806        &mut self,
807        journal_file: &mut JournalFile<MmapMut>,
808        data_offset: NonZeroU64,
809        field_name: &[u8],
810    ) -> Result<()> {
811        let field_offset = self.add_field(journal_file, field_name)?;
812        let head_data_offset = {
813            let field_guard = journal_file.field_ref(field_offset)?;
814            field_guard.header.head_data_offset
815        };
816        {
817            let mut data_guard = journal_file.data_mut(data_offset, None)?;
818            data_guard.header.next_field_offset = head_data_offset;
819        }
820        let mut field_guard = journal_file.field_mut(field_offset, None)?;
821        field_guard.header.head_data_offset = Some(data_offset);
822        Ok(())
823    }
824
825    fn stored_data_payload<'a>(&self, payload: PayloadParts<'a>) -> StoredDataPayload<'a> {
826        if payload.len() >= self.compress_threshold {
827            let full_payload;
828            let payload_bytes = if let Some(raw) = payload.as_single_slice() {
829                raw
830            } else {
831                // Structured payloads need a contiguous buffer only when compression is
832                // enabled and the payload is large enough to attempt compression.
833                full_payload = payload.to_vec();
834                full_payload.as_slice()
835            };
836            match self.compression {
837                Compression::Zstd => {
838                    let compressed = ruzstd::encoding::compress_to_vec(
839                        Cursor::new(payload_bytes),
840                        ruzstd::encoding::CompressionLevel::Fastest,
841                    );
842                    let compressed = zstd_frame_with_content_size(compressed, payload_bytes.len());
843                    if compressed.len() < payload_bytes.len() {
844                        return StoredDataPayload::Compressed(
845                            compressed,
846                            ObjectFlags::CompressedZstd as u8,
847                        );
848                    }
849                }
850                Compression::Xz => {
851                    if payload_bytes.len() >= 80 {
852                        if let Ok(compressed) = xz_compress(payload_bytes) {
853                            if compressed.len() < payload_bytes.len() {
854                                return StoredDataPayload::Compressed(
855                                    compressed,
856                                    ObjectFlags::CompressedXz as u8,
857                                );
858                            }
859                        }
860                    }
861                }
862                Compression::Lz4 => {
863                    if payload_bytes.len() >= 9 {
864                        let compressed = lz4_compress(payload_bytes);
865                        if compressed.len() < payload_bytes.len() {
866                            return StoredDataPayload::Compressed(
867                                compressed,
868                                ObjectFlags::CompressedLz4 as u8,
869                            );
870                        }
871                    }
872                }
873                Compression::None => {}
874            }
875        }
876
877        StoredDataPayload::Uncompressed(payload)
878    }
879
880    fn add_field(
881        &mut self,
882        journal_file: &mut JournalFile<MmapMut>,
883        payload: &[u8],
884    ) -> Result<NonZeroU64> {
885        self.ensure_first_tag(journal_file)?;
886
887        if let Some(field_offset) = self.field_cache.get(payload) {
888            return Ok(field_offset);
889        }
890
891        let hash = journal_file.hash(payload);
892
893        match journal_file.find_field_offset(hash, payload)? {
894            Some(field_offset) => {
895                self.field_cache.insert(payload, field_offset);
896                Ok(field_offset)
897            }
898            None => {
899                // We will have to write the new field object at the current
900                // tail offset
901                let field_offset = self.append_offset;
902                let is_compact = Self::is_compact(journal_file);
903                Self::ensure_compact_object_fits(
904                    is_compact,
905                    field_offset,
906                    std::mem::size_of::<FieldObjectHeader>() as u64 + payload.len() as u64,
907                )?;
908                let field_size = {
909                    let mut field_guard =
910                        journal_file.field_mut(field_offset, Some(payload.len() as u64))?;
911
912                    field_guard.header.hash = hash;
913                    field_guard.set_payload(payload);
914                    field_guard.header.object_header.aligned_size()
915                };
916                self.hmac_put_object(journal_file, field_offset.get(), ObjectType::Field)?;
917                self.object_added(journal_file, field_offset, field_size)?;
918
919                // Update hash table
920                journal_file.field_hash_table_set_tail_offset(hash, field_offset)?;
921                let depth = Self::current_field_hash_chain_depth(journal_file, hash)?;
922                let max_depth = journal_file
923                    .journal_header_ref()
924                    .field_hash_chain_depth
925                    .max(depth);
926                journal_file.journal_header_mut().field_hash_chain_depth = max_depth;
927                journal_file.journal_header_mut().n_fields += 1;
928
929                self.field_cache.insert(payload, field_offset);
930
931                // Return the offset where we wrote the newly added data object
932                Ok(field_offset)
933            }
934        }
935    }
936}
937
/// Rewrites a Zstandard frame header so that it declares its decompressed size.
///
/// The frame is patched only when it starts with the Zstandard magic, is at
/// least six bytes long, and its descriptor byte has no dictionary id, no frame
/// content size and no single-segment flag. In that layout byte 5 is the window
/// descriptor; it is replaced by a Frame_Content_Size field holding
/// `content_size`, and the single-segment flag is set. Of the original
/// descriptor bits only the content-checksum flag is carried over.
///
/// Any frame that does not match those preconditions is returned unchanged.
fn zstd_frame_with_content_size(frame: Vec<u8>, content_size: usize) -> Vec<u8> {
    const ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];
    const SINGLE_SEGMENT_FLAG: u8 = 1 << 5;
    const CONTENT_CHECKSUM_FLAG: u8 = 1 << 2;
    const DICTIONARY_ID_MASK: u8 = 0b0000_0011;
    const FRAME_CONTENT_SIZE_MASK: u8 = 0b1100_0000;
    // Magic (4 bytes) + descriptor (1 byte) + window descriptor (1 byte).
    const PATCHED_PREFIX_LEN: usize = 6;

    if frame.len() < PATCHED_PREFIX_LEN || !frame.starts_with(&ZSTD_MAGIC) {
        return frame;
    }

    // Any of these bits being set means the header is not in the one layout
    // this function knows how to rewrite.
    let descriptor = frame[4];
    let blocking_bits = DICTIONARY_ID_MASK | FRAME_CONTENT_SIZE_MASK | SINGLE_SEGMENT_FLAG;
    if descriptor & blocking_bits != 0 {
        return frame;
    }

    // Encode the size into the smallest field that can hold it. The two-byte
    // form stores `size - 256`, which is why it reaches up to 65_791.
    let mut size_field = [0u8; 8];
    let (size_flag, size_len): (u8, usize) = match content_size {
        0..=255 => {
            size_field[0] = content_size as u8;
            (0, 1)
        }
        256..=65_791 => {
            size_field[..2].copy_from_slice(&((content_size - 256) as u16).to_le_bytes());
            (1, 2)
        }
        _ => match u32::try_from(content_size) {
            Ok(narrow) => {
                size_field[..4].copy_from_slice(&narrow.to_le_bytes());
                (2, 4)
            }
            Err(_) => {
                size_field = (content_size as u64).to_le_bytes();
                (3, 8)
            }
        },
    };

    let new_descriptor =
        (size_flag << 6) | SINGLE_SEGMENT_FLAG | (descriptor & CONTENT_CHECKSUM_FLAG);

    // One byte (the window descriptor) goes away, `size_len` bytes come in.
    let mut patched = Vec::with_capacity(frame.len() + size_len - 1);
    patched.extend_from_slice(&ZSTD_MAGIC);
    patched.push(new_descriptor);
    patched.extend_from_slice(&size_field[..size_len]);
    patched.extend_from_slice(&frame[PATCHED_PREFIX_LEN..]);
    patched
}
978
979fn xz_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
980    use lzma_rust2::{XzOptions, XzWriter};
981    use std::io::Write;
982
983    let mut options = XzOptions::with_preset(0);
984    options.set_check_sum_type(lzma_rust2::CheckType::None);
985    let mut writer = XzWriter::new(Vec::new(), options)?;
986    writer.write_all(payload)?;
987    writer.finish()
988}
989
990fn lz4_compress(payload: &[u8]) -> Vec<u8> {
991    let compressed = lz4_flex::block::compress(payload);
992    let mut out = Vec::with_capacity(8 + compressed.len());
993    out.extend_from_slice(&(payload.len() as u64).to_le_bytes());
994    out.extend_from_slice(&compressed);
995    out
996}
997
998#[cfg(test)]
999#[path = "writer_tests.rs"]
1000mod tests;