1use super::mmap::MmapMut;
2use crate::error::{JournalError, Result};
3use crate::file::{
4 Compression, DEFAULT_COMPRESS_THRESHOLD, DataObject, DataPayloadType, EntryObjectHeader,
5 FieldObjectHeader, HashableObjectMut, HeaderIncompatibleFlags, JournalFile, JournalHeader,
6 ObjectFlags, ObjectType, PayloadParts, hash::jenkins_hash64_parts,
7 normalize_compress_threshold,
8};
9use rustc_hash::FxHashMap;
10use std::io::Cursor;
11use std::num::NonZeroU64;
12
/// Alignment, in bytes, associated with journal objects.
///
/// NOTE(review): not referenced in this part of the file; presumably every
/// object offset is a multiple of this value. Confirm against its users.
pub(super) const OBJECT_ALIGNMENT: u64 = 8;
/// `u32::MAX` widened to `u64`.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// upper bound for offsets in compact-mode files. Confirm against its users.
pub(super) const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
/// Granularity (8 MiB) that `round_up_to_file_size_increment` rounds up to.
const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
/// Number of cached field names at which `FieldCache::insert` empties the
/// cache before adding a key it does not already hold.
const FIELD_CACHE_MAX_ENTRIES: usize = 1024;
/// Field names longer than this many bytes are never cached.
const FIELD_CACHE_MAX_PAYLOAD_LEN: usize = 128;
18pub(super) fn round_up_to_file_size_increment(value: u64) -> Result<u64> {
19 value
20 .checked_add(FILE_SIZE_INCREASE - 1)
21 .map(|v| v & !(FILE_SIZE_INCREASE - 1))
22 .ok_or(JournalError::ObjectExceedsFileBounds)
23}
24
/// Reference from an entry to one of its data objects.
#[derive(Debug, Clone, Copy)]
pub(super) struct EntryItem {
    /// File offset of the referenced data object.
    pub(super) offset: NonZeroU64,
    /// Hash computed by the journal file for the data payload; it is only
    /// written into the entry when the file is not in compact mode.
    pub(super) hash: u64,
}
30
/// A journal field supplied as separate name and value byte slices.
#[derive(Debug, Clone, Copy)]
pub struct StructuredField<'a> {
    /// Bytes of the field name.
    pub name: &'a [u8],
    /// Bytes of the field value.
    pub value: &'a [u8],
}

impl<'a> StructuredField<'a> {
    /// Pairs `name` with `value`; both slices are borrowed, not copied.
    pub fn new(name: &'a [u8], value: &'a [u8]) -> Self {
        StructuredField { name, value }
    }
}
45
46#[derive(Debug, Clone, Copy)]
47pub enum EntryField<'a> {
48 Raw(&'a [u8]),
50 Structured(StructuredField<'a>),
53}
54
55impl<'a> EntryField<'a> {
56 pub fn raw(payload: &'a [u8]) -> Self {
58 Self::Raw(payload)
59 }
60
61 pub fn structured(name: &'a [u8], value: &'a [u8]) -> Self {
63 Self::Structured(StructuredField::new(name, value))
64 }
65
66 fn payload_parts(self) -> PayloadParts<'a> {
67 match self {
68 Self::Raw(payload) => PayloadParts::raw(payload),
69 Self::Structured(field) => PayloadParts::structured(field.name, field.value),
70 }
71 }
72
73 fn field_name(self) -> Option<&'a [u8]> {
74 match self {
75 Self::Raw(payload) => payload
76 .iter()
77 .position(|&b| b == b'=')
78 .map(|pos| &payload[..pos]),
79 Self::Structured(field) => Some(field.name),
80 }
81 }
82}
83
/// Rules applied to field names when an entry is written.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FieldNamePolicy {
    /// Names must be 1 to 64 bytes of `A-Z`, `0-9` and `_`, and must not start
    /// with a digit; a leading `_` is allowed. Invalid names are an error.
    #[default]
    Journald,
    /// Names only need to be non-empty and free of `=`. Invalid names are an error.
    Raw,
    /// Same character rules as `Journald`, but names starting with `_` are
    /// rejected too; fields with an invalid name are skipped instead of
    /// failing the write.
    JournalApp,
}
97
98#[derive(Debug, Clone, Copy, Default)]
99pub struct EntryWriteOptions {
100 pub trusted_unique_payloads: bool,
109 pub field_name_policy: FieldNamePolicy,
111 pub seqnum: Option<u64>,
117 pub boot_id: Option<uuid::Uuid>,
122}
123
124impl EntryWriteOptions {
125 pub fn trusted_unique_payloads(mut self, enabled: bool) -> Self {
130 self.trusted_unique_payloads = enabled;
131 self
132 }
133
134 pub fn field_name_policy(mut self, policy: FieldNamePolicy) -> Self {
136 self.field_name_policy = policy;
137 self
138 }
139
140 pub fn seqnum(mut self, seqnum: u64) -> Self {
142 self.seqnum = Some(seqnum);
143 self
144 }
145
146 pub fn boot_id(mut self, boot_id: uuid::Uuid) -> Self {
148 self.boot_id = Some(boot_id);
149 self
150 }
151}
152
/// Checks `field_name` against the journal field-name rules.
///
/// A valid name is 1 to 64 bytes long, consists only of `A-Z`, `0-9` and `_`,
/// and does not start with a digit. Names starting with `_` are accepted only
/// when `allow_protected` is `true`.
fn is_journal_field_name_valid(field_name: &[u8], allow_protected: bool) -> bool {
    const MAX_NAME_LEN: usize = 64;

    let Some(&first) = field_name.first() else {
        return false;
    };
    if field_name.len() > MAX_NAME_LEN {
        return false;
    }
    if first.is_ascii_digit() || (first == b'_' && !allow_protected) {
        return false;
    }
    field_name
        .iter()
        .all(|byte| matches!(byte, b'A'..=b'Z' | b'0'..=b'9' | b'_'))
}
167
/// Returns `true` when `field_name` is non-empty and contains no `=` byte.
fn is_raw_field_name_valid(field_name: &[u8]) -> bool {
    match field_name {
        [] => false,
        name => name.iter().all(|&byte| byte != b'='),
    }
}
171
172fn accept_entry_field(field: EntryField<'_>, policy: FieldNamePolicy) -> Result<bool> {
173 let Some(field_name) = field.field_name() else {
174 return Err(JournalError::InvalidField);
175 };
176 let valid = match policy {
177 FieldNamePolicy::Raw => is_raw_field_name_valid(field_name),
178 FieldNamePolicy::Journald => is_journal_field_name_valid(field_name, true),
179 FieldNamePolicy::JournalApp => is_journal_field_name_valid(field_name, false),
180 };
181 if valid {
182 return Ok(true);
183 }
184 if matches!(policy, FieldNamePolicy::JournalApp) {
185 return Ok(false);
186 }
187 Err(JournalError::InvalidField)
188}
189
190#[derive(Debug)]
191struct FieldCache {
192 entries: FxHashMap<Box<[u8]>, NonZeroU64>,
193}
194
195impl FieldCache {
196 fn new() -> Self {
197 Self {
198 entries: FxHashMap::default(),
199 }
200 }
201
202 fn get(&self, payload: &[u8]) -> Option<NonZeroU64> {
203 self.entries.get(payload).copied()
204 }
205
206 fn insert(&mut self, payload: &[u8], offset: NonZeroU64) {
207 if payload.len() > FIELD_CACHE_MAX_PAYLOAD_LEN {
208 return;
209 }
210
211 if self.entries.len() >= FIELD_CACHE_MAX_ENTRIES && self.entries.get(payload).is_none() {
212 self.entries.clear();
213 }
214
215 self.entries
216 .insert(payload.to_vec().into_boxed_slice(), offset);
217 }
218
219 #[cfg(test)]
220 fn len(&self) -> usize {
221 self.entries.len()
222 }
223}
224
225enum StoredDataPayload<'a> {
226 Uncompressed(PayloadParts<'a>),
227 Compressed(Vec<u8>, u8),
228}
229
230impl StoredDataPayload<'_> {
231 fn len(&self) -> usize {
232 match self {
233 Self::Uncompressed(payload) => payload.len(),
234 Self::Compressed(payload, _) => payload.len(),
235 }
236 }
237
238 fn object_flags(&self) -> u8 {
239 match self {
240 Self::Uncompressed(_) => 0,
241 Self::Compressed(_, flags) => *flags,
242 }
243 }
244
245 fn copy_to_data_object(&self, data: &mut DataObject<&mut [u8]>) {
246 match self {
247 Self::Uncompressed(payload) => match &mut data.payload {
248 DataPayloadType::Regular(dst) => payload.copy_to_slice(dst),
249 DataPayloadType::Compact { payload: dst, .. } => payload.copy_to_slice(dst),
250 },
251 Self::Compressed(payload, _) => data.set_payload(payload),
252 }
253 }
254}
255
/// Append-side state for writing entries into a journal file.
///
/// The journal file itself is passed to every method; this struct only tracks
/// where the next object goes and the bookkeeping between header updates.
pub struct JournalWriter {
    /// Offset of the most recently added object.
    pub(super) tail_object_offset: NonZeroU64,
    /// Offset at which the next object will be written.
    pub(super) append_offset: NonZeroU64,
    /// Sequence number used for the next entry unless the caller overrides it.
    next_seqnum: u64,
    /// Objects added since `n_objects` in the file header was last updated.
    num_written_objects: u64,
    /// NOTE(review): not read in this part of the file; presumably tracks
    /// whether the first seal tag was written. Confirm in the tag code.
    pub(super) first_tag_written: bool,
    /// Scratch list of data references for the entry being written; cleared
    /// at the start of every entry.
    pub(super) entry_items: Vec<EntryItem>,
    /// Cache of field-name to field-object offsets.
    field_cache: FieldCache,
    /// Monotonic timestamp of the first entry written through this writer.
    first_entry_monotonic: Option<u64>,
    /// Boot ID stamped on entries unless the caller overrides it.
    boot_id: uuid::Uuid,
    /// Compression algorithm tried for data payloads.
    compression: Compression,
    /// Minimum payload length, in bytes, for which compression is attempted.
    compress_threshold: usize,
    /// `0`: never call `post_change`; `1`: after every entry; `n`: after
    /// every `n` entries.
    live_publish_every_entries: u64,
    /// Entries written since the last interval-driven `post_change` call.
    entries_since_live_publication: u64,
    /// Sealing state, present when the journal file carries seal options.
    pub(super) seal: Option<crate::seal::SealState>,
}
272
273impl JournalWriter {
    /// Returns the offset at which the next object will be appended.
    ///
    /// NOTE(review): the name suggests callers treat this as the number of
    /// bytes in use. Confirm with callers.
    pub fn current_file_size(&self) -> u64 {
        self.append_offset.get()
    }
278
    /// Returns the monotonic timestamp of the first entry written through
    /// this writer, or `None` when it has not written any entry yet.
    pub fn first_entry_monotonic(&self) -> Option<u64> {
        self.first_entry_monotonic
    }
283
    /// Returns the sequence number the next entry receives unless the caller
    /// supplies one through [`EntryWriteOptions::seqnum`].
    pub fn next_seqnum(&self) -> u64 {
        self.next_seqnum
    }
288
    /// Returns the boot ID stamped on entries unless the caller supplies one
    /// through [`EntryWriteOptions::boot_id`].
    pub fn boot_id(&self) -> uuid::Uuid {
        self.boot_id
    }
293
    /// Sets how often `post_change` is called on the journal file after an
    /// entry is written.
    ///
    /// `0` disables the call, `1` makes it after every entry, and any larger
    /// value makes it once every `entries` entries. The count of entries
    /// since the last call is reset.
    pub fn set_live_publish_every_entries(&mut self, entries: u64) {
        self.live_publish_every_entries = entries;
        // Start a fresh interval so the new setting applies from this point on.
        self.entries_since_live_publication = 0;
    }
305
    /// Returns the current publication interval, as configured by
    /// [`Self::set_live_publish_every_entries`].
    pub fn live_publish_every_entries(&self) -> u64 {
        self.live_publish_every_entries
    }
313
314 pub fn new(
315 journal_file: &mut JournalFile<MmapMut>,
316 next_seqnum: u64,
317 boot_id: uuid::Uuid,
318 ) -> Result<Self> {
319 let compression = match journal_file.journal_header_ref() {
320 header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedZstd) => {
321 Compression::Zstd
322 }
323 header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedXz) => {
324 Compression::Xz
325 }
326 header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedLz4) => {
327 Compression::Lz4
328 }
329 _ => Compression::None,
330 };
331
332 Self::new_with_compression(
333 journal_file,
334 next_seqnum,
335 boot_id,
336 compression,
337 DEFAULT_COMPRESS_THRESHOLD,
338 )
339 }
340
341 pub fn new_with_compression(
342 journal_file: &mut JournalFile<MmapMut>,
343 next_seqnum: u64,
344 boot_id: uuid::Uuid,
345 compression: Compression,
346 compress_threshold: usize,
347 ) -> Result<Self> {
348 let current_header_size = std::mem::size_of::<JournalHeader>() as u64;
349 let header = journal_file.journal_header_ref();
350 if !header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash)
351 || header.header_size < current_header_size
352 {
353 return Err(JournalError::UnsupportedJournalFile);
354 }
355
356 let append_offset = {
357 let header = journal_file.journal_header_ref();
358
359 let Some(tail_object_offset) = header.tail_object_offset else {
360 return Err(JournalError::InvalidMagicNumber);
361 };
362
363 let tail_object = journal_file.object_header_ref(tail_object_offset)?;
364
365 tail_object_offset.saturating_add(tail_object.size)
366 };
367
368 let seal = journal_file
369 .seal_options
370 .as_ref()
371 .map(|opts| crate::seal::SealState::new(opts))
372 .transpose()?;
373
374 let mut writer = Self {
375 tail_object_offset: journal_file
376 .journal_header_ref()
377 .tail_object_offset
378 .unwrap(),
379 append_offset,
380 next_seqnum,
381 num_written_objects: 0,
382 first_tag_written: false,
383 entry_items: Vec::with_capacity(128),
384 field_cache: FieldCache::new(),
385 first_entry_monotonic: None,
386 boot_id,
387 compression,
388 compress_threshold: normalize_compress_threshold(compress_threshold),
389 live_publish_every_entries: 1,
390 entries_since_live_publication: 0,
391 seal,
392 };
393
394 if writer.seal.is_some() && journal_file.journal_header_ref().n_tags == 0 {
395 writer.ensure_first_tag(journal_file)?;
396 {
397 let header = journal_file.journal_header_mut();
398 header.n_objects += writer.num_written_objects;
399 header.tail_object_offset = Some(writer.tail_object_offset);
400 }
401 writer.num_written_objects = 0;
402 }
403
404 Ok(writer)
405 }
406
407 pub fn create_successor(&self, journal_file: &mut JournalFile<MmapMut>) -> Result<Self> {
409 Self::new_with_compression(
410 journal_file,
411 self.next_seqnum,
412 self.boot_id,
413 self.compression,
414 self.compress_threshold,
415 )
416 }
417
418 pub fn add_entry(
419 &mut self,
420 journal_file: &mut JournalFile<MmapMut>,
421 items: &[&[u8]],
422 realtime: u64,
423 monotonic: u64,
424 ) -> Result<()> {
425 self.add_entry_fields_with_options(
426 journal_file,
427 items.iter().copied().map(EntryField::raw),
428 realtime,
429 monotonic,
430 EntryWriteOptions::default(),
431 )
432 }
433
434 pub fn add_entry_structured(
435 &mut self,
436 journal_file: &mut JournalFile<MmapMut>,
437 fields: &[StructuredField<'_>],
438 realtime: u64,
439 monotonic: u64,
440 ) -> Result<()> {
441 self.add_entry_fields_with_options(
442 journal_file,
443 fields.iter().copied().map(EntryField::Structured),
444 realtime,
445 monotonic,
446 EntryWriteOptions::default(),
447 )
448 }
449
450 pub fn add_entry_structured_with_options(
451 &mut self,
452 journal_file: &mut JournalFile<MmapMut>,
453 fields: &[StructuredField<'_>],
454 realtime: u64,
455 monotonic: u64,
456 options: EntryWriteOptions,
457 ) -> Result<()> {
458 self.add_entry_fields_with_options(
459 journal_file,
460 fields.iter().copied().map(EntryField::Structured),
461 realtime,
462 monotonic,
463 options,
464 )
465 }
466
467 pub fn add_entry_fields<'a>(
468 &mut self,
469 journal_file: &mut JournalFile<MmapMut>,
470 fields: impl IntoIterator<Item = EntryField<'a>>,
471 realtime: u64,
472 monotonic: u64,
473 ) -> Result<()> {
474 self.add_entry_fields_with_options(
475 journal_file,
476 fields,
477 realtime,
478 monotonic,
479 EntryWriteOptions::default(),
480 )
481 }
482
483 pub fn add_entry_fields_with_options<'a>(
484 &mut self,
485 journal_file: &mut JournalFile<MmapMut>,
486 fields: impl IntoIterator<Item = EntryField<'a>>,
487 realtime: u64,
488 monotonic: u64,
489 options: EntryWriteOptions,
490 ) -> Result<()> {
491 self.ensure_keyed_append(journal_file)?;
492 let entry_seqnum = self.entry_seqnum_for_options(options)?;
493 let entry_boot_id = options.boot_id.unwrap_or(self.boot_id);
494 let monotonic = self.clamp_same_boot_monotonic(journal_file, entry_boot_id, monotonic)?;
495 let xor_hash = self.prepare_entry_items(journal_file, fields, realtime, options)?;
496 let entry_offset = self.write_entry_object(
497 journal_file,
498 entry_seqnum,
499 entry_boot_id,
500 realtime,
501 monotonic,
502 xor_hash,
503 )?;
504 self.publish_entry_links(journal_file, entry_offset)?;
505 self.entry_added(
506 journal_file.journal_header_mut(),
507 entry_offset,
508 entry_seqnum,
509 entry_boot_id,
510 realtime,
511 monotonic,
512 );
513 self.publish_after_entry(journal_file)
514 }
515
516 fn ensure_keyed_append(&self, journal_file: &JournalFile<MmapMut>) -> Result<()> {
517 let header = journal_file.journal_header_ref();
518 if header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash) {
519 return Ok(());
520 }
521 Err(JournalError::UnsupportedJournalFile)
522 }
523
524 fn entry_seqnum_for_options(&self, options: EntryWriteOptions) -> Result<u64> {
525 let entry_seqnum = options.seqnum.unwrap_or(self.next_seqnum);
526 if entry_seqnum == 0 || entry_seqnum == u64::MAX || entry_seqnum < self.next_seqnum {
527 return Err(JournalError::InvalidField);
528 }
529 Ok(entry_seqnum)
530 }
531
532 fn clamp_same_boot_monotonic(
533 &self,
534 journal_file: &JournalFile<MmapMut>,
535 entry_boot_id: uuid::Uuid,
536 monotonic: u64,
537 ) -> Result<u64> {
538 let header = journal_file.journal_header_ref();
539 if header.n_entries == 0
540 || header.tail_entry_boot_id != *entry_boot_id.as_bytes()
541 || monotonic > header.tail_entry_monotonic
542 {
543 return Ok(monotonic);
544 }
545 header
546 .tail_entry_monotonic
547 .checked_add(1)
548 .ok_or(JournalError::InvalidField)
549 }
550
551 fn prepare_entry_items<'a>(
552 &mut self,
553 journal_file: &mut JournalFile<MmapMut>,
554 fields: impl IntoIterator<Item = EntryField<'a>>,
555 realtime: u64,
556 options: EntryWriteOptions,
557 ) -> Result<u64> {
558 let mut xor_hash = 0;
559 self.entry_items.clear();
560 let mut publication_ready = false;
561 for field in fields {
562 if !accept_entry_field(field, options.field_name_policy)? {
563 continue;
564 }
565 self.ensure_entry_publication_ready(journal_file, realtime, &mut publication_ready)?;
566 xor_hash ^= self.add_entry_field_item(journal_file, field)?;
567 }
568 self.finish_entry_items(options.trusted_unique_payloads)?;
569 Ok(xor_hash)
570 }
571
572 fn ensure_entry_publication_ready(
573 &mut self,
574 journal_file: &mut JournalFile<MmapMut>,
575 realtime: u64,
576 publication_ready: &mut bool,
577 ) -> Result<()> {
578 if *publication_ready {
579 return Ok(());
580 }
581 self.ensure_first_tag(journal_file)?;
582 self.maybe_append_tag(journal_file, realtime)?;
583 *publication_ready = true;
584 Ok(())
585 }
586
587 fn add_entry_field_item(
588 &mut self,
589 journal_file: &mut JournalFile<MmapMut>,
590 field: EntryField<'_>,
591 ) -> Result<u64> {
592 let entry_item = self.add_data(journal_file, field)?;
593 self.entry_items.push(entry_item);
594 Ok(jenkins_hash64_parts(field.payload_parts().iter()))
595 }
596
597 fn finish_entry_items(&mut self, trusted_unique_payloads: bool) -> Result<()> {
598 if self.entry_items.is_empty() {
599 return Err(JournalError::InvalidField);
600 }
601 if !self.entry_items_are_sorted() {
602 self.entry_items
603 .sort_unstable_by(|a, b| a.offset.cmp(&b.offset));
604 }
605 if !trusted_unique_payloads {
606 self.entry_items.dedup_by(|a, b| a.offset == b.offset);
607 }
608 Ok(())
609 }
610
611 fn entry_items_are_sorted(&self) -> bool {
612 self.entry_items
613 .windows(2)
614 .all(|items| items[0].offset <= items[1].offset)
615 }
616
617 fn write_entry_object(
618 &mut self,
619 journal_file: &mut JournalFile<MmapMut>,
620 entry_seqnum: u64,
621 entry_boot_id: uuid::Uuid,
622 realtime: u64,
623 monotonic: u64,
624 xor_hash: u64,
625 ) -> Result<NonZeroU64> {
626 let entry_offset = self.append_offset;
627 let is_compact = Self::is_compact(journal_file);
628 let entry_payload_size = self.entry_items.len() as u64 * Self::entry_item_size(is_compact);
629 Self::ensure_compact_object_fits(
630 is_compact,
631 entry_offset,
632 std::mem::size_of::<EntryObjectHeader>() as u64 + entry_payload_size,
633 )?;
634 let entry_size = {
635 let size = Some(entry_payload_size);
636 let mut entry_guard = journal_file.entry_mut(entry_offset, size)?;
637
638 entry_guard.header.seqnum = entry_seqnum;
639 entry_guard.header.xor_hash = xor_hash;
640 entry_guard.header.boot_id = *entry_boot_id.as_bytes();
641 entry_guard.header.monotonic = monotonic;
642 entry_guard.header.realtime = realtime;
643
644 for (index, entry_item) in self.entry_items.iter().enumerate() {
646 Self::ensure_compact_offset(is_compact, entry_item.offset)?;
647 let item_hash = (!is_compact).then_some(entry_item.hash);
648 entry_guard.items.set(index, entry_item.offset, item_hash);
649 }
650
651 entry_guard.header.object_header.aligned_size()
652 };
653 self.hmac_put_object(journal_file, entry_offset.get(), ObjectType::Entry)?;
654 self.object_added(journal_file, entry_offset, entry_size)?;
655 Ok(entry_offset)
656 }
657
658 fn publish_entry_links(
659 &mut self,
660 journal_file: &mut JournalFile<MmapMut>,
661 entry_offset: NonZeroU64,
662 ) -> Result<()> {
663 self.append_to_entry_array(journal_file, entry_offset)?;
664 for entry_item_index in 0..self.entry_items.len() {
665 self.link_data_to_entry(journal_file, entry_offset, entry_item_index)?;
666 }
667 Ok(())
668 }
669
670 fn publish_after_entry(&mut self, journal_file: &mut JournalFile<MmapMut>) -> Result<()> {
671 match self.live_publish_every_entries {
672 0 => Ok(()),
673 1 => journal_file.post_change(),
674 interval => {
675 self.entries_since_live_publication += 1;
676 if self.entries_since_live_publication >= interval {
677 self.entries_since_live_publication = 0;
678 journal_file.post_change()
679 } else {
680 Ok(())
681 }
682 }
683 }
684 }
685
686 pub(super) fn object_added(
687 &mut self,
688 journal_file: &mut JournalFile<MmapMut>,
689 object_offset: NonZeroU64,
690 object_size: u64,
691 ) -> Result<()> {
692 self.tail_object_offset = object_offset;
693 self.append_offset = object_offset
694 .checked_add(object_size)
695 .ok_or(JournalError::ObjectExceedsFileBounds)?;
696 self.num_written_objects += 1;
697
698 let header = journal_file.journal_header_mut();
699 let old_size = header
700 .header_size
701 .checked_add(header.arena_size)
702 .ok_or(JournalError::ObjectExceedsFileBounds)?;
703 if self.append_offset.get() > old_size {
704 let new_size = round_up_to_file_size_increment(self.append_offset.get())?;
705 header.arena_size = new_size
706 .checked_sub(header.header_size)
707 .ok_or(JournalError::ObjectExceedsFileBounds)?;
708 }
709
710 Ok(())
711 }
712
713 fn entry_added(
714 &mut self,
715 header: &mut JournalHeader,
716 entry_offset: NonZeroU64,
717 entry_seqnum: u64,
718 entry_boot_id: uuid::Uuid,
719 realtime: u64,
720 monotonic: u64,
721 ) {
722 header.n_objects += self.num_written_objects;
723 header.tail_object_offset = Some(self.tail_object_offset);
724
725 if header.head_entry_seqnum == 0 {
726 header.head_entry_seqnum = entry_seqnum;
727 }
728 if header.head_entry_realtime == 0 {
729 header.head_entry_realtime = realtime;
730 }
731 if self.first_entry_monotonic.is_none() {
732 self.first_entry_monotonic = Some(monotonic);
733 }
734
735 header.tail_entry_seqnum = entry_seqnum;
736 header.tail_entry_realtime = realtime;
737 header.tail_entry_monotonic = monotonic;
738 header.tail_entry_boot_id = *entry_boot_id.as_bytes();
739 header.tail_entry_offset = entry_offset.get();
740 header.n_entries += 1;
741
742 self.next_seqnum = entry_seqnum + 1;
743 self.num_written_objects = 0;
744 }
745
746 fn add_data(
747 &mut self,
748 journal_file: &mut JournalFile<MmapMut>,
749 field: EntryField<'_>,
750 ) -> Result<EntryItem> {
751 let payload = field.payload_parts();
752 let field_name = field.field_name().ok_or(JournalError::InvalidField)?;
753 let hash = journal_file.hash_parts(payload);
754 if let Some(data_offset) = journal_file.find_data_offset_parts(hash, payload)? {
755 return Ok(Self::entry_item(data_offset, hash));
756 }
757 self.add_new_data(journal_file, payload, field_name, hash)
758 }
759
    /// Builds the [`EntryItem`] for a data object at `offset` with payload
    /// hash `hash`.
    fn entry_item(offset: NonZeroU64, hash: u64) -> EntryItem {
        EntryItem { offset, hash }
    }
763
764 fn add_new_data<'a>(
765 &mut self,
766 journal_file: &mut JournalFile<MmapMut>,
767 payload: PayloadParts<'a>,
768 field_name: &'a [u8],
769 hash: u64,
770 ) -> Result<EntryItem> {
771 let data_offset = self.write_new_data_object(journal_file, payload, hash)?;
772 self.publish_new_data_object(journal_file, data_offset, hash)?;
773 self.link_data_to_field(journal_file, data_offset, field_name)?;
774 Ok(Self::entry_item(data_offset, hash))
775 }
776
777 fn write_new_data_object<'a>(
778 &mut self,
779 journal_file: &mut JournalFile<MmapMut>,
780 payload: PayloadParts<'a>,
781 hash: u64,
782 ) -> Result<NonZeroU64> {
783 let data_offset = self.append_offset;
784 let stored_payload = self.stored_data_payload(payload);
785 self.ensure_data_object_fits(journal_file, data_offset, stored_payload.len() as u64)?;
786 let data_size = {
787 let mut data_guard =
788 journal_file.data_mut(data_offset, Some(stored_payload.len() as u64))?;
789 data_guard.header.hash = hash;
790 stored_payload.copy_to_data_object(&mut data_guard);
791 data_guard.header.object_header.flags = stored_payload.object_flags();
792 data_guard.header.object_header.aligned_size()
793 };
794 self.hmac_put_object(journal_file, data_offset.get(), ObjectType::Data)?;
795 self.object_added(journal_file, data_offset, data_size)?;
796 Ok(data_offset)
797 }
798
799 fn ensure_data_object_fits(
800 &self,
801 journal_file: &JournalFile<MmapMut>,
802 data_offset: NonZeroU64,
803 payload_size: u64,
804 ) -> Result<()> {
805 let is_compact = Self::is_compact(journal_file);
806 Self::ensure_compact_object_fits(
807 is_compact,
808 data_offset,
809 Self::data_object_size(is_compact, payload_size),
810 )
811 }
812
    /// Registers a freshly written data object in the file's bookkeeping.
    ///
    /// Sets the object as tail of the data hash table for `hash`, updates
    /// the data hash chain depth, and increments `n_data` in the header.
    ///
    /// # Errors
    ///
    /// Propagates errors of the hash-table update and of the chain-depth
    /// update; `n_data` is only incremented when both succeed.
    fn publish_new_data_object(
        &mut self,
        journal_file: &mut JournalFile<MmapMut>,
        data_offset: NonZeroU64,
        hash: u64,
    ) -> Result<()> {
        journal_file.data_hash_table_set_tail_offset(hash, data_offset)?;
        Self::update_data_hash_chain_depth(journal_file, hash)?;
        journal_file.journal_header_mut().n_data += 1;
        Ok(())
    }
824
825 fn link_data_to_field(
826 &mut self,
827 journal_file: &mut JournalFile<MmapMut>,
828 data_offset: NonZeroU64,
829 field_name: &[u8],
830 ) -> Result<()> {
831 let field_offset = self.add_field(journal_file, field_name)?;
832 let head_data_offset = {
833 let field_guard = journal_file.field_ref(field_offset)?;
834 field_guard.header.head_data_offset
835 };
836 {
837 let mut data_guard = journal_file.data_mut(data_offset, None)?;
838 data_guard.header.next_field_offset = head_data_offset;
839 }
840 let mut field_guard = journal_file.field_mut(field_offset, None)?;
841 field_guard.header.head_data_offset = Some(data_offset);
842 Ok(())
843 }
844
845 fn stored_data_payload<'a>(&self, payload: PayloadParts<'a>) -> StoredDataPayload<'a> {
846 if payload.len() >= self.compress_threshold {
847 let full_payload;
848 let payload_bytes = if let Some(raw) = payload.as_single_slice() {
849 raw
850 } else {
851 full_payload = payload.to_vec();
854 full_payload.as_slice()
855 };
856 match self.compression {
857 Compression::Zstd => {
858 let compressed = ruzstd::encoding::compress_to_vec(
859 Cursor::new(payload_bytes),
860 ruzstd::encoding::CompressionLevel::Fastest,
861 );
862 let compressed = zstd_frame_with_content_size(compressed, payload_bytes.len());
863 if compressed.len() < payload_bytes.len() {
864 return StoredDataPayload::Compressed(
865 compressed,
866 ObjectFlags::CompressedZstd as u8,
867 );
868 }
869 }
870 Compression::Xz => {
871 if payload_bytes.len() >= 80 {
872 if let Ok(compressed) = xz_compress(payload_bytes) {
873 if compressed.len() < payload_bytes.len() {
874 return StoredDataPayload::Compressed(
875 compressed,
876 ObjectFlags::CompressedXz as u8,
877 );
878 }
879 }
880 }
881 }
882 Compression::Lz4 => {
883 if payload_bytes.len() >= 9 {
884 let compressed = lz4_compress(payload_bytes);
885 if compressed.len() < payload_bytes.len() {
886 return StoredDataPayload::Compressed(
887 compressed,
888 ObjectFlags::CompressedLz4 as u8,
889 );
890 }
891 }
892 }
893 Compression::None => {}
894 }
895 }
896
897 StoredDataPayload::Uncompressed(payload)
898 }
899
900 fn add_field(
901 &mut self,
902 journal_file: &mut JournalFile<MmapMut>,
903 payload: &[u8],
904 ) -> Result<NonZeroU64> {
905 self.ensure_first_tag(journal_file)?;
906
907 if let Some(field_offset) = self.field_cache.get(payload) {
908 return Ok(field_offset);
909 }
910
911 let hash = journal_file.hash(payload);
912
913 match journal_file.find_field_offset(hash, payload)? {
914 Some(field_offset) => {
915 self.field_cache.insert(payload, field_offset);
916 Ok(field_offset)
917 }
918 None => {
919 let field_offset = self.append_offset;
922 let is_compact = Self::is_compact(journal_file);
923 Self::ensure_compact_object_fits(
924 is_compact,
925 field_offset,
926 std::mem::size_of::<FieldObjectHeader>() as u64 + payload.len() as u64,
927 )?;
928 let field_size = {
929 let mut field_guard =
930 journal_file.field_mut(field_offset, Some(payload.len() as u64))?;
931
932 field_guard.header.hash = hash;
933 field_guard.set_payload(payload);
934 field_guard.header.object_header.aligned_size()
935 };
936 self.hmac_put_object(journal_file, field_offset.get(), ObjectType::Field)?;
937 self.object_added(journal_file, field_offset, field_size)?;
938
939 journal_file.field_hash_table_set_tail_offset(hash, field_offset)?;
941 let depth = Self::current_field_hash_chain_depth(journal_file, hash)?;
942 let max_depth = journal_file
943 .journal_header_ref()
944 .field_hash_chain_depth
945 .max(depth);
946 journal_file.journal_header_mut().field_hash_chain_depth = max_depth;
947 journal_file.journal_header_mut().n_fields += 1;
948
949 self.field_cache.insert(payload, field_offset);
950
951 Ok(field_offset)
953 }
954 }
955 }
956}
957
/// Rewrites the header of a Zstandard frame so that it declares
/// `content_size`.
///
/// The frame is patched only when it is at least 6 bytes long, starts with
/// the Zstandard magic, and its descriptor byte has no dictionary ID, no
/// content-size field and no single-segment flag. The patched frame sets the
/// single-segment flag, keeps the content-checksum flag, replaces the byte
/// following the descriptor with the content-size field, and leaves the rest
/// untouched. Any other input is returned unchanged.
fn zstd_frame_with_content_size(frame: Vec<u8>, content_size: usize) -> Vec<u8> {
    const ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];
    const SINGLE_SEGMENT_FLAG: u8 = 1 << 5;
    const CONTENT_CHECKSUM_FLAG: u8 = 1 << 2;
    const DICTIONARY_ID_MASK: u8 = 0x03;
    // Magic, descriptor byte, and the one byte that gets replaced.
    const REPLACED_PREFIX_LEN: usize = 6;

    if frame.len() < REPLACED_PREFIX_LEN || !frame.starts_with(&ZSTD_MAGIC) {
        return frame;
    }

    let descriptor = frame[4];
    let has_dictionary_id = (descriptor & DICTIONARY_ID_MASK) != 0;
    let has_content_size = (descriptor >> 6) != 0;
    let is_single_segment = (descriptor & SINGLE_SEGMENT_FLAG) != 0;
    if has_dictionary_id || has_content_size || is_single_segment {
        return frame;
    }

    // Pick the narrowest content-size encoding; the 2-byte form stores the
    // size minus 256.
    let mut size_field = [0u8; 8];
    let (size_flag, size_len): (u8, usize) = match content_size {
        0..=255 => {
            size_field[0] = content_size as u8;
            (0, 1)
        }
        256..=65_791 => {
            size_field[..2].copy_from_slice(&((content_size - 256) as u16).to_le_bytes());
            (1, 2)
        }
        _ => match u32::try_from(content_size) {
            Ok(narrow) => {
                size_field[..4].copy_from_slice(&narrow.to_le_bytes());
                (2, 4)
            }
            Err(_) => {
                size_field = (content_size as u64).to_le_bytes();
                (3, 8)
            }
        },
    };

    let new_descriptor =
        (size_flag << 6) | SINGLE_SEGMENT_FLAG | (descriptor & CONTENT_CHECKSUM_FLAG);

    let mut patched = Vec::with_capacity(frame.len() + size_len - 1);
    patched.extend_from_slice(&ZSTD_MAGIC);
    patched.push(new_descriptor);
    patched.extend_from_slice(&size_field[..size_len]);
    patched.extend_from_slice(&frame[REPLACED_PREFIX_LEN..]);
    patched
}
998
999fn xz_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
1000 use lzma_rust2::{XzOptions, XzWriter};
1001 use std::io::Write;
1002
1003 let mut options = XzOptions::with_preset(0);
1004 options.set_check_sum_type(lzma_rust2::CheckType::None);
1005 let mut writer = XzWriter::new(Vec::new(), options)?;
1006 writer.write_all(payload)?;
1007 writer.finish()
1008}
1009
1010fn lz4_compress(payload: &[u8]) -> Vec<u8> {
1011 let compressed = lz4_flex::block::compress(payload);
1012 let mut out = Vec::with_capacity(8 + compressed.len());
1013 out.extend_from_slice(&(payload.len() as u64).to_le_bytes());
1014 out.extend_from_slice(&compressed);
1015 out
1016}
1017
// Unit tests for this module live in `writer_tests.rs` and are only compiled
// for test builds.
#[cfg(test)]
#[path = "writer_tests.rs"]
mod tests;