1use super::mmap::MmapMut;
2use crate::error::{JournalError, Result};
3use crate::file::{
4 Compression, DEFAULT_COMPRESS_THRESHOLD, DataObject, DataPayloadType, EntryObjectHeader,
5 FieldObjectHeader, HashableObjectMut, HeaderIncompatibleFlags, JournalFile, JournalHeader,
6 ObjectFlags, ObjectType, PayloadParts, hash::jenkins_hash64_parts,
7 normalize_compress_threshold,
8};
9use rustc_hash::FxHashMap;
10use std::io::Cursor;
11use std::num::NonZeroU64;
12
/// Object alignment in bytes.
/// NOTE(review): not referenced in the visible part of this file; presumably object
/// sizes/offsets are rounded up to a multiple of this value — confirm at the use sites.
pub(super) const OBJECT_ALIGNMENT: u64 = 8;
/// Equal to `u32::MAX`.
/// NOTE(review): presumably the upper bound for offsets/sizes in compact-mode files —
/// confirm against the compact-mode helpers that consume it.
pub(super) const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
/// Granularity (8 MiB) that `round_up_to_file_size_increment` rounds sizes up to.
const FILE_SIZE_INCREASE: u64 = 8 * 1024 * 1024;
/// Once the field cache holds this many entries, inserting a new key clears it first.
const FIELD_CACHE_MAX_ENTRIES: usize = 1024;
/// Field payloads longer than this many bytes are never stored in the field cache.
const FIELD_CACHE_MAX_PAYLOAD_LEN: usize = 128;
18pub(super) fn round_up_to_file_size_increment(value: u64) -> Result<u64> {
19 value
20 .checked_add(FILE_SIZE_INCREASE - 1)
21 .map(|v| v & !(FILE_SIZE_INCREASE - 1))
22 .ok_or(JournalError::ObjectExceedsFileBounds)
23}
24
/// One item of an entry: a reference to a data object plus that object's hash.
#[derive(Debug, Clone, Copy)]
pub(super) struct EntryItem {
    /// Offset of the referenced data object in the journal file.
    pub(super) offset: NonZeroU64,
    /// Hash of the data payload as computed by the journal file when the item was created.
    pub(super) hash: u64,
}
30
/// A field supplied as separate name and value byte slices.
#[derive(Debug, Clone, Copy)]
pub struct StructuredField<'a> {
    /// Field name bytes.
    pub name: &'a [u8],
    /// Field value bytes.
    pub value: &'a [u8],
}

impl<'a> StructuredField<'a> {
    /// Builds a field from its `name` and `value` parts; nothing is validated here.
    pub fn new(name: &'a [u8], value: &'a [u8]) -> Self {
        StructuredField { name, value }
    }
}
45
46#[derive(Debug, Clone, Copy)]
47pub enum EntryField<'a> {
48 Raw(&'a [u8]),
50 Structured(StructuredField<'a>),
53}
54
55impl<'a> EntryField<'a> {
56 pub fn raw(payload: &'a [u8]) -> Self {
58 Self::Raw(payload)
59 }
60
61 pub fn structured(name: &'a [u8], value: &'a [u8]) -> Self {
63 Self::Structured(StructuredField::new(name, value))
64 }
65
66 fn payload_parts(self) -> PayloadParts<'a> {
67 match self {
68 Self::Raw(payload) => PayloadParts::raw(payload),
69 Self::Structured(field) => PayloadParts::structured(field.name, field.value),
70 }
71 }
72
73 fn field_name(self) -> Option<&'a [u8]> {
74 match self {
75 Self::Raw(payload) => payload
76 .iter()
77 .position(|&b| b == b'=')
78 .map(|pos| &payload[..pos]),
79 Self::Structured(field) => Some(field.name),
80 }
81 }
82}
83
/// Selects how field names are validated when an entry is written
/// (applied by `accept_entry_field`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FieldNamePolicy {
    /// Default. Names must be 1–64 bytes of `A-Z`, `0-9` and `_`, must not start with
    /// a digit, and may start with `_`. An invalid name is an error.
    #[default]
    Journald,
    /// Names only have to be non-empty and free of `=`. An invalid name is an error.
    Raw,
    /// Same character rules as `Journald`, but a leading `_` is not allowed.
    /// A field with an invalid name is skipped instead of failing the entry.
    JournalApp,
}
97
98#[derive(Debug, Clone, Copy, Default)]
99pub struct EntryWriteOptions {
100 pub trusted_unique_payloads: bool,
109 pub field_name_policy: FieldNamePolicy,
111 pub seqnum: Option<u64>,
117 pub boot_id: Option<uuid::Uuid>,
122}
123
124impl EntryWriteOptions {
125 pub fn trusted_unique_payloads(mut self, enabled: bool) -> Self {
130 self.trusted_unique_payloads = enabled;
131 self
132 }
133
134 pub fn field_name_policy(mut self, policy: FieldNamePolicy) -> Self {
136 self.field_name_policy = policy;
137 self
138 }
139
140 pub fn seqnum(mut self, seqnum: u64) -> Self {
142 self.seqnum = Some(seqnum);
143 self
144 }
145
146 pub fn boot_id(mut self, boot_id: uuid::Uuid) -> Self {
148 self.boot_id = Some(boot_id);
149 self
150 }
151}
152
/// Checks `field_name` against the journal naming rules: 1–64 bytes, only `A-Z`,
/// `0-9` and `_`, and no leading digit. A leading `_` is accepted only when
/// `allow_protected` is `true`.
fn is_journal_field_name_valid(field_name: &[u8], allow_protected: bool) -> bool {
    const MAX_NAME_LEN: usize = 64;

    // An empty name has no first byte and is rejected right away.
    let Some(&first) = field_name.first() else {
        return false;
    };
    if field_name.len() > MAX_NAME_LEN {
        return false;
    }

    let is_protected = first == b'_';
    if (is_protected && !allow_protected) || first.is_ascii_digit() {
        return false;
    }

    field_name
        .iter()
        .all(|&byte| matches!(byte, b'A'..=b'Z' | b'0'..=b'9' | b'_'))
}
167
/// A raw field name is valid when it is non-empty and contains no `=` byte.
fn is_raw_field_name_valid(field_name: &[u8]) -> bool {
    match field_name {
        [] => false,
        name => name.iter().all(|&byte| byte != b'='),
    }
}
171
172fn accept_entry_field(field: EntryField<'_>, policy: FieldNamePolicy) -> Result<bool> {
173 let Some(field_name) = field.field_name() else {
174 return Err(JournalError::InvalidField);
175 };
176 let valid = match policy {
177 FieldNamePolicy::Raw => is_raw_field_name_valid(field_name),
178 FieldNamePolicy::Journald => is_journal_field_name_valid(field_name, true),
179 FieldNamePolicy::JournalApp => is_journal_field_name_valid(field_name, false),
180 };
181 if valid {
182 return Ok(true);
183 }
184 if matches!(policy, FieldNamePolicy::JournalApp) {
185 return Ok(false);
186 }
187 Err(JournalError::InvalidField)
188}
189
190#[derive(Debug)]
191struct FieldCache {
192 entries: FxHashMap<Box<[u8]>, NonZeroU64>,
193}
194
195impl FieldCache {
196 fn new() -> Self {
197 Self {
198 entries: FxHashMap::default(),
199 }
200 }
201
202 fn get(&self, payload: &[u8]) -> Option<NonZeroU64> {
203 self.entries.get(payload).copied()
204 }
205
206 fn insert(&mut self, payload: &[u8], offset: NonZeroU64) {
207 if payload.len() > FIELD_CACHE_MAX_PAYLOAD_LEN {
208 return;
209 }
210
211 if self.entries.len() >= FIELD_CACHE_MAX_ENTRIES && self.entries.get(payload).is_none() {
212 self.entries.clear();
213 }
214
215 self.entries
216 .insert(payload.to_vec().into_boxed_slice(), offset);
217 }
218
219 #[cfg(test)]
220 fn len(&self) -> usize {
221 self.entries.len()
222 }
223}
224
225enum StoredDataPayload<'a> {
226 Uncompressed(PayloadParts<'a>),
227 Compressed(Vec<u8>, u8),
228}
229
230impl StoredDataPayload<'_> {
231 fn len(&self) -> usize {
232 match self {
233 Self::Uncompressed(payload) => payload.len(),
234 Self::Compressed(payload, _) => payload.len(),
235 }
236 }
237
238 fn object_flags(&self) -> u8 {
239 match self {
240 Self::Uncompressed(_) => 0,
241 Self::Compressed(_, flags) => *flags,
242 }
243 }
244
245 fn copy_to_data_object(&self, data: &mut DataObject<&mut [u8]>) {
246 match self {
247 Self::Uncompressed(payload) => match &mut data.payload {
248 DataPayloadType::Regular(dst) => payload.copy_to_slice(dst),
249 DataPayloadType::Compact { payload: dst, .. } => payload.copy_to_slice(dst),
250 },
251 Self::Compressed(payload, _) => data.set_payload(payload),
252 }
253 }
254}
255
/// Appends entries (and the data, field and entry objects they need) to a journal file.
pub struct JournalWriter {
    /// Offset of the most recently written object.
    pub(super) tail_object_offset: NonZeroU64,
    /// Offset at which the next object will be written.
    pub(super) append_offset: NonZeroU64,
    /// Sequence number given to the next entry unless the caller overrides it.
    next_seqnum: u64,
    /// Objects written since the header's `n_objects` counter was last updated.
    num_written_objects: u64,
    /// NOTE(review): maintained by the sealing code outside this part of the file;
    /// presumably records that the first seal tag has been written — confirm.
    pub(super) first_tag_written: bool,
    /// Scratch list of items for the entry being built; cleared at the start of each entry.
    pub(super) entry_items: Vec<EntryItem>,
    /// Cache of field-object offsets keyed by field name.
    field_cache: FieldCache,
    /// Monotonic timestamp of the first entry added through this writer, if any.
    first_entry_monotonic: Option<u64>,
    /// Boot id recorded on entries that do not override it.
    boot_id: uuid::Uuid,
    /// Compression algorithm tried for new data payloads.
    compression: Compression,
    /// Payloads at least this long are candidates for compression.
    compress_threshold: usize,
    /// `0`: never call `post_change` after an entry; `1`: after every entry;
    /// `n > 1`: after every `n` entries.
    live_publish_every_entries: u64,
    /// Entries added since the last `post_change` (used only when the interval is > 1).
    entries_since_live_publication: u64,
    /// Sealing state; `Some` when the journal file carries seal options.
    pub(super) seal: Option<crate::seal::SealState>,
}
272
273impl JournalWriter {
 /// Returns the writer's append offset, i.e. the offset just past the last object it
 /// has accounted for.
 pub fn current_file_size(&self) -> u64 {
     self.append_offset.get()
 }
278
 /// Returns the monotonic timestamp of the first entry added through this writer,
 /// or `None` if no entry has been added yet.
 pub fn first_entry_monotonic(&self) -> Option<u64> {
     self.first_entry_monotonic
 }
283
 /// Returns the sequence number the next entry will get unless the caller overrides it.
 pub fn next_seqnum(&self) -> u64 {
     self.next_seqnum
 }
288
 /// Returns the boot id this writer records on entries by default.
 pub fn boot_id(&self) -> uuid::Uuid {
     self.boot_id
 }
293
294 pub fn set_live_publish_every_entries(&mut self, entries: u64) {
302 self.live_publish_every_entries = entries;
303 self.entries_since_live_publication = 0;
304 }
305
 /// Returns the current publication interval (see `set_live_publish_every_entries`).
 pub fn live_publish_every_entries(&self) -> u64 {
     self.live_publish_every_entries
 }
313
314 pub fn new(
315 journal_file: &mut JournalFile<MmapMut>,
316 next_seqnum: u64,
317 boot_id: uuid::Uuid,
318 ) -> Result<Self> {
319 let compression = match journal_file.journal_header_ref() {
320 header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedZstd) => {
321 Compression::Zstd
322 }
323 header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedXz) => {
324 Compression::Xz
325 }
326 header if header.has_incompatible_flag(HeaderIncompatibleFlags::CompressedLz4) => {
327 Compression::Lz4
328 }
329 _ => Compression::None,
330 };
331
332 Self::new_with_compression(
333 journal_file,
334 next_seqnum,
335 boot_id,
336 compression,
337 DEFAULT_COMPRESS_THRESHOLD,
338 )
339 }
340
341 pub fn new_with_compression(
342 journal_file: &mut JournalFile<MmapMut>,
343 next_seqnum: u64,
344 boot_id: uuid::Uuid,
345 compression: Compression,
346 compress_threshold: usize,
347 ) -> Result<Self> {
348 let current_header_size = std::mem::size_of::<JournalHeader>() as u64;
349 let header = journal_file.journal_header_ref();
350 if !header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash)
351 || header.header_size < current_header_size
352 {
353 return Err(JournalError::UnsupportedJournalFile);
354 }
355
356 let append_offset = {
357 let header = journal_file.journal_header_ref();
358
359 let Some(tail_object_offset) = header.tail_object_offset else {
360 return Err(JournalError::InvalidMagicNumber);
361 };
362
363 let tail_object = journal_file.object_header_ref(tail_object_offset)?;
364
365 tail_object_offset.saturating_add(tail_object.size)
366 };
367
368 let seal = journal_file
369 .seal_options
370 .as_ref()
371 .map(|opts| crate::seal::SealState::new(opts))
372 .transpose()?;
373
374 let mut writer = Self {
375 tail_object_offset: journal_file
376 .journal_header_ref()
377 .tail_object_offset
378 .unwrap(),
379 append_offset,
380 next_seqnum,
381 num_written_objects: 0,
382 first_tag_written: false,
383 entry_items: Vec::with_capacity(128),
384 field_cache: FieldCache::new(),
385 first_entry_monotonic: None,
386 boot_id,
387 compression,
388 compress_threshold: normalize_compress_threshold(compress_threshold),
389 live_publish_every_entries: 1,
390 entries_since_live_publication: 0,
391 seal,
392 };
393
394 if writer.seal.is_some() && journal_file.journal_header_ref().n_tags == 0 {
395 writer.ensure_first_tag(journal_file)?;
396 {
397 let header = journal_file.journal_header_mut();
398 header.n_objects += writer.num_written_objects;
399 header.tail_object_offset = Some(writer.tail_object_offset);
400 }
401 writer.num_written_objects = 0;
402 }
403
404 Ok(writer)
405 }
406
407 pub fn create_successor(&self, journal_file: &mut JournalFile<MmapMut>) -> Result<Self> {
409 Self::new_with_compression(
410 journal_file,
411 self.next_seqnum,
412 self.boot_id,
413 self.compression,
414 self.compress_threshold,
415 )
416 }
417
418 pub fn add_entry(
419 &mut self,
420 journal_file: &mut JournalFile<MmapMut>,
421 items: &[&[u8]],
422 realtime: u64,
423 monotonic: u64,
424 ) -> Result<()> {
425 self.add_entry_fields_with_options(
426 journal_file,
427 items.iter().copied().map(EntryField::raw),
428 realtime,
429 monotonic,
430 EntryWriteOptions::default(),
431 )
432 }
433
434 pub fn add_entry_structured(
435 &mut self,
436 journal_file: &mut JournalFile<MmapMut>,
437 fields: &[StructuredField<'_>],
438 realtime: u64,
439 monotonic: u64,
440 ) -> Result<()> {
441 self.add_entry_fields_with_options(
442 journal_file,
443 fields.iter().copied().map(EntryField::Structured),
444 realtime,
445 monotonic,
446 EntryWriteOptions::default(),
447 )
448 }
449
450 pub fn add_entry_structured_with_options(
451 &mut self,
452 journal_file: &mut JournalFile<MmapMut>,
453 fields: &[StructuredField<'_>],
454 realtime: u64,
455 monotonic: u64,
456 options: EntryWriteOptions,
457 ) -> Result<()> {
458 self.add_entry_fields_with_options(
459 journal_file,
460 fields.iter().copied().map(EntryField::Structured),
461 realtime,
462 monotonic,
463 options,
464 )
465 }
466
467 pub fn add_entry_fields<'a>(
468 &mut self,
469 journal_file: &mut JournalFile<MmapMut>,
470 fields: impl IntoIterator<Item = EntryField<'a>>,
471 realtime: u64,
472 monotonic: u64,
473 ) -> Result<()> {
474 self.add_entry_fields_with_options(
475 journal_file,
476 fields,
477 realtime,
478 monotonic,
479 EntryWriteOptions::default(),
480 )
481 }
482
483 pub fn add_entry_fields_with_options<'a>(
484 &mut self,
485 journal_file: &mut JournalFile<MmapMut>,
486 fields: impl IntoIterator<Item = EntryField<'a>>,
487 realtime: u64,
488 monotonic: u64,
489 options: EntryWriteOptions,
490 ) -> Result<()> {
491 self.ensure_keyed_append(journal_file)?;
492 let entry_seqnum = self.entry_seqnum_for_options(options)?;
493 let entry_boot_id = options.boot_id.unwrap_or(self.boot_id);
494 let xor_hash = self.prepare_entry_items(journal_file, fields, realtime, options)?;
495 let entry_offset = self.write_entry_object(
496 journal_file,
497 entry_seqnum,
498 entry_boot_id,
499 realtime,
500 monotonic,
501 xor_hash,
502 )?;
503 self.publish_entry_links(journal_file, entry_offset)?;
504 self.entry_added(
505 journal_file.journal_header_mut(),
506 entry_offset,
507 entry_seqnum,
508 entry_boot_id,
509 realtime,
510 monotonic,
511 );
512 self.publish_after_entry(journal_file)
513 }
514
515 fn ensure_keyed_append(&self, journal_file: &JournalFile<MmapMut>) -> Result<()> {
516 let header = journal_file.journal_header_ref();
517 if header.has_incompatible_flag(HeaderIncompatibleFlags::KeyedHash) {
518 return Ok(());
519 }
520 Err(JournalError::UnsupportedJournalFile)
521 }
522
523 fn entry_seqnum_for_options(&self, options: EntryWriteOptions) -> Result<u64> {
524 let entry_seqnum = options.seqnum.unwrap_or(self.next_seqnum);
525 if entry_seqnum == 0 || entry_seqnum == u64::MAX || entry_seqnum < self.next_seqnum {
526 return Err(JournalError::InvalidField);
527 }
528 Ok(entry_seqnum)
529 }
530
531 fn prepare_entry_items<'a>(
532 &mut self,
533 journal_file: &mut JournalFile<MmapMut>,
534 fields: impl IntoIterator<Item = EntryField<'a>>,
535 realtime: u64,
536 options: EntryWriteOptions,
537 ) -> Result<u64> {
538 let mut xor_hash = 0;
539 self.entry_items.clear();
540 let mut publication_ready = false;
541 for field in fields {
542 if !accept_entry_field(field, options.field_name_policy)? {
543 continue;
544 }
545 self.ensure_entry_publication_ready(journal_file, realtime, &mut publication_ready)?;
546 xor_hash ^= self.add_entry_field_item(journal_file, field)?;
547 }
548 self.finish_entry_items(options.trusted_unique_payloads)?;
549 Ok(xor_hash)
550 }
551
552 fn ensure_entry_publication_ready(
553 &mut self,
554 journal_file: &mut JournalFile<MmapMut>,
555 realtime: u64,
556 publication_ready: &mut bool,
557 ) -> Result<()> {
558 if *publication_ready {
559 return Ok(());
560 }
561 self.ensure_first_tag(journal_file)?;
562 self.maybe_append_tag(journal_file, realtime)?;
563 *publication_ready = true;
564 Ok(())
565 }
566
567 fn add_entry_field_item(
568 &mut self,
569 journal_file: &mut JournalFile<MmapMut>,
570 field: EntryField<'_>,
571 ) -> Result<u64> {
572 let entry_item = self.add_data(journal_file, field)?;
573 self.entry_items.push(entry_item);
574 Ok(jenkins_hash64_parts(field.payload_parts().iter()))
575 }
576
577 fn finish_entry_items(&mut self, trusted_unique_payloads: bool) -> Result<()> {
578 if self.entry_items.is_empty() {
579 return Err(JournalError::InvalidField);
580 }
581 if !self.entry_items_are_sorted() {
582 self.entry_items
583 .sort_unstable_by(|a, b| a.offset.cmp(&b.offset));
584 }
585 if !trusted_unique_payloads {
586 self.entry_items.dedup_by(|a, b| a.offset == b.offset);
587 }
588 Ok(())
589 }
590
591 fn entry_items_are_sorted(&self) -> bool {
592 self.entry_items
593 .windows(2)
594 .all(|items| items[0].offset <= items[1].offset)
595 }
596
597 fn write_entry_object(
598 &mut self,
599 journal_file: &mut JournalFile<MmapMut>,
600 entry_seqnum: u64,
601 entry_boot_id: uuid::Uuid,
602 realtime: u64,
603 monotonic: u64,
604 xor_hash: u64,
605 ) -> Result<NonZeroU64> {
606 let entry_offset = self.append_offset;
607 let is_compact = Self::is_compact(journal_file);
608 let entry_payload_size = self.entry_items.len() as u64 * Self::entry_item_size(is_compact);
609 Self::ensure_compact_object_fits(
610 is_compact,
611 entry_offset,
612 std::mem::size_of::<EntryObjectHeader>() as u64 + entry_payload_size,
613 )?;
614 let entry_size = {
615 let size = Some(entry_payload_size);
616 let mut entry_guard = journal_file.entry_mut(entry_offset, size)?;
617
618 entry_guard.header.seqnum = entry_seqnum;
619 entry_guard.header.xor_hash = xor_hash;
620 entry_guard.header.boot_id = *entry_boot_id.as_bytes();
621 entry_guard.header.monotonic = monotonic;
622 entry_guard.header.realtime = realtime;
623
624 for (index, entry_item) in self.entry_items.iter().enumerate() {
626 Self::ensure_compact_offset(is_compact, entry_item.offset)?;
627 let item_hash = (!is_compact).then_some(entry_item.hash);
628 entry_guard.items.set(index, entry_item.offset, item_hash);
629 }
630
631 entry_guard.header.object_header.aligned_size()
632 };
633 self.hmac_put_object(journal_file, entry_offset.get(), ObjectType::Entry)?;
634 self.object_added(journal_file, entry_offset, entry_size)?;
635 Ok(entry_offset)
636 }
637
638 fn publish_entry_links(
639 &mut self,
640 journal_file: &mut JournalFile<MmapMut>,
641 entry_offset: NonZeroU64,
642 ) -> Result<()> {
643 self.append_to_entry_array(journal_file, entry_offset)?;
644 for entry_item_index in 0..self.entry_items.len() {
645 self.link_data_to_entry(journal_file, entry_offset, entry_item_index)?;
646 }
647 Ok(())
648 }
649
650 fn publish_after_entry(&mut self, journal_file: &mut JournalFile<MmapMut>) -> Result<()> {
651 match self.live_publish_every_entries {
652 0 => Ok(()),
653 1 => journal_file.post_change(),
654 interval => {
655 self.entries_since_live_publication += 1;
656 if self.entries_since_live_publication >= interval {
657 self.entries_since_live_publication = 0;
658 journal_file.post_change()
659 } else {
660 Ok(())
661 }
662 }
663 }
664 }
665
666 pub(super) fn object_added(
667 &mut self,
668 journal_file: &mut JournalFile<MmapMut>,
669 object_offset: NonZeroU64,
670 object_size: u64,
671 ) -> Result<()> {
672 self.tail_object_offset = object_offset;
673 self.append_offset = object_offset
674 .checked_add(object_size)
675 .ok_or(JournalError::ObjectExceedsFileBounds)?;
676 self.num_written_objects += 1;
677
678 let header = journal_file.journal_header_mut();
679 let old_size = header
680 .header_size
681 .checked_add(header.arena_size)
682 .ok_or(JournalError::ObjectExceedsFileBounds)?;
683 if self.append_offset.get() > old_size {
684 let new_size = round_up_to_file_size_increment(self.append_offset.get())?;
685 header.arena_size = new_size
686 .checked_sub(header.header_size)
687 .ok_or(JournalError::ObjectExceedsFileBounds)?;
688 }
689
690 Ok(())
691 }
692
693 fn entry_added(
694 &mut self,
695 header: &mut JournalHeader,
696 entry_offset: NonZeroU64,
697 entry_seqnum: u64,
698 entry_boot_id: uuid::Uuid,
699 realtime: u64,
700 monotonic: u64,
701 ) {
702 header.n_objects += self.num_written_objects;
703 header.tail_object_offset = Some(self.tail_object_offset);
704
705 if header.head_entry_seqnum == 0 {
706 header.head_entry_seqnum = entry_seqnum;
707 }
708 if header.head_entry_realtime == 0 {
709 header.head_entry_realtime = realtime;
710 }
711 if self.first_entry_monotonic.is_none() {
712 self.first_entry_monotonic = Some(monotonic);
713 }
714
715 header.tail_entry_seqnum = entry_seqnum;
716 header.tail_entry_realtime = realtime;
717 header.tail_entry_monotonic = monotonic;
718 header.tail_entry_boot_id = *entry_boot_id.as_bytes();
719 header.tail_entry_offset = entry_offset.get();
720 header.n_entries += 1;
721
722 self.next_seqnum = entry_seqnum + 1;
723 self.num_written_objects = 0;
724 }
725
726 fn add_data(
727 &mut self,
728 journal_file: &mut JournalFile<MmapMut>,
729 field: EntryField<'_>,
730 ) -> Result<EntryItem> {
731 let payload = field.payload_parts();
732 let field_name = field.field_name().ok_or(JournalError::InvalidField)?;
733 let hash = journal_file.hash_parts(payload);
734 if let Some(data_offset) = journal_file.find_data_offset_parts(hash, payload)? {
735 return Ok(Self::entry_item(data_offset, hash));
736 }
737 self.add_new_data(journal_file, payload, field_name, hash)
738 }
739
 /// Builds an `EntryItem` from a data object offset and its payload hash.
 fn entry_item(offset: NonZeroU64, hash: u64) -> EntryItem {
     EntryItem { offset, hash }
 }
743
744 fn add_new_data<'a>(
745 &mut self,
746 journal_file: &mut JournalFile<MmapMut>,
747 payload: PayloadParts<'a>,
748 field_name: &'a [u8],
749 hash: u64,
750 ) -> Result<EntryItem> {
751 let data_offset = self.write_new_data_object(journal_file, payload, hash)?;
752 self.publish_new_data_object(journal_file, data_offset, hash)?;
753 self.link_data_to_field(journal_file, data_offset, field_name)?;
754 Ok(Self::entry_item(data_offset, hash))
755 }
756
757 fn write_new_data_object<'a>(
758 &mut self,
759 journal_file: &mut JournalFile<MmapMut>,
760 payload: PayloadParts<'a>,
761 hash: u64,
762 ) -> Result<NonZeroU64> {
763 let data_offset = self.append_offset;
764 let stored_payload = self.stored_data_payload(payload);
765 self.ensure_data_object_fits(journal_file, data_offset, stored_payload.len() as u64)?;
766 let data_size = {
767 let mut data_guard =
768 journal_file.data_mut(data_offset, Some(stored_payload.len() as u64))?;
769 data_guard.header.hash = hash;
770 stored_payload.copy_to_data_object(&mut data_guard);
771 data_guard.header.object_header.flags = stored_payload.object_flags();
772 data_guard.header.object_header.aligned_size()
773 };
774 self.hmac_put_object(journal_file, data_offset.get(), ObjectType::Data)?;
775 self.object_added(journal_file, data_offset, data_size)?;
776 Ok(data_offset)
777 }
778
779 fn ensure_data_object_fits(
780 &self,
781 journal_file: &JournalFile<MmapMut>,
782 data_offset: NonZeroU64,
783 payload_size: u64,
784 ) -> Result<()> {
785 let is_compact = Self::is_compact(journal_file);
786 Self::ensure_compact_object_fits(
787 is_compact,
788 data_offset,
789 Self::data_object_size(is_compact, payload_size),
790 )
791 }
792
793 fn publish_new_data_object(
794 &mut self,
795 journal_file: &mut JournalFile<MmapMut>,
796 data_offset: NonZeroU64,
797 hash: u64,
798 ) -> Result<()> {
799 journal_file.data_hash_table_set_tail_offset(hash, data_offset)?;
800 Self::update_data_hash_chain_depth(journal_file, hash)?;
801 journal_file.journal_header_mut().n_data += 1;
802 Ok(())
803 }
804
805 fn link_data_to_field(
806 &mut self,
807 journal_file: &mut JournalFile<MmapMut>,
808 data_offset: NonZeroU64,
809 field_name: &[u8],
810 ) -> Result<()> {
811 let field_offset = self.add_field(journal_file, field_name)?;
812 let head_data_offset = {
813 let field_guard = journal_file.field_ref(field_offset)?;
814 field_guard.header.head_data_offset
815 };
816 {
817 let mut data_guard = journal_file.data_mut(data_offset, None)?;
818 data_guard.header.next_field_offset = head_data_offset;
819 }
820 let mut field_guard = journal_file.field_mut(field_offset, None)?;
821 field_guard.header.head_data_offset = Some(data_offset);
822 Ok(())
823 }
824
825 fn stored_data_payload<'a>(&self, payload: PayloadParts<'a>) -> StoredDataPayload<'a> {
826 if payload.len() >= self.compress_threshold {
827 let full_payload;
828 let payload_bytes = if let Some(raw) = payload.as_single_slice() {
829 raw
830 } else {
831 full_payload = payload.to_vec();
834 full_payload.as_slice()
835 };
836 match self.compression {
837 Compression::Zstd => {
838 let compressed = ruzstd::encoding::compress_to_vec(
839 Cursor::new(payload_bytes),
840 ruzstd::encoding::CompressionLevel::Fastest,
841 );
842 let compressed = zstd_frame_with_content_size(compressed, payload_bytes.len());
843 if compressed.len() < payload_bytes.len() {
844 return StoredDataPayload::Compressed(
845 compressed,
846 ObjectFlags::CompressedZstd as u8,
847 );
848 }
849 }
850 Compression::Xz => {
851 if payload_bytes.len() >= 80 {
852 if let Ok(compressed) = xz_compress(payload_bytes) {
853 if compressed.len() < payload_bytes.len() {
854 return StoredDataPayload::Compressed(
855 compressed,
856 ObjectFlags::CompressedXz as u8,
857 );
858 }
859 }
860 }
861 }
862 Compression::Lz4 => {
863 if payload_bytes.len() >= 9 {
864 let compressed = lz4_compress(payload_bytes);
865 if compressed.len() < payload_bytes.len() {
866 return StoredDataPayload::Compressed(
867 compressed,
868 ObjectFlags::CompressedLz4 as u8,
869 );
870 }
871 }
872 }
873 Compression::None => {}
874 }
875 }
876
877 StoredDataPayload::Uncompressed(payload)
878 }
879
880 fn add_field(
881 &mut self,
882 journal_file: &mut JournalFile<MmapMut>,
883 payload: &[u8],
884 ) -> Result<NonZeroU64> {
885 self.ensure_first_tag(journal_file)?;
886
887 if let Some(field_offset) = self.field_cache.get(payload) {
888 return Ok(field_offset);
889 }
890
891 let hash = journal_file.hash(payload);
892
893 match journal_file.find_field_offset(hash, payload)? {
894 Some(field_offset) => {
895 self.field_cache.insert(payload, field_offset);
896 Ok(field_offset)
897 }
898 None => {
899 let field_offset = self.append_offset;
902 let is_compact = Self::is_compact(journal_file);
903 Self::ensure_compact_object_fits(
904 is_compact,
905 field_offset,
906 std::mem::size_of::<FieldObjectHeader>() as u64 + payload.len() as u64,
907 )?;
908 let field_size = {
909 let mut field_guard =
910 journal_file.field_mut(field_offset, Some(payload.len() as u64))?;
911
912 field_guard.header.hash = hash;
913 field_guard.set_payload(payload);
914 field_guard.header.object_header.aligned_size()
915 };
916 self.hmac_put_object(journal_file, field_offset.get(), ObjectType::Field)?;
917 self.object_added(journal_file, field_offset, field_size)?;
918
919 journal_file.field_hash_table_set_tail_offset(hash, field_offset)?;
921 let depth = Self::current_field_hash_chain_depth(journal_file, hash)?;
922 let max_depth = journal_file
923 .journal_header_ref()
924 .field_hash_chain_depth
925 .max(depth);
926 journal_file.journal_header_mut().field_hash_chain_depth = max_depth;
927 journal_file.journal_header_mut().n_fields += 1;
928
929 self.field_cache.insert(payload, field_offset);
930
931 Ok(field_offset)
933 }
934 }
935 }
936}
937
/// Rewrites the header of a Zstandard frame so that it declares `content_size`.
///
/// Only frames with the minimal 6-byte header this function expects are rewritten:
/// the Zstandard magic, a descriptor with no dictionary id, no frame content size and
/// no single-segment flag, followed by one more header byte. For those, the descriptor
/// is replaced by one with the single-segment flag set (the content checksum flag is
/// preserved), the sixth byte is dropped, and the content size field is inserted.
/// Any other input is returned unchanged.
fn zstd_frame_with_content_size(frame: Vec<u8>, content_size: usize) -> Vec<u8> {
    const ZSTD_MAGIC: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];
    const SINGLE_SEGMENT_FLAG: u8 = 1 << 5;
    const CONTENT_CHECKSUM_FLAG: u8 = 1 << 2;
    const DICTIONARY_ID_MASK: u8 = 0x03;
    const CONTENT_SIZE_FLAG_MASK: u8 = 0x03 << 6;
    const MINIMAL_HEADER_LEN: usize = 6;

    let has_minimal_header = frame.len() >= MINIMAL_HEADER_LEN && frame.starts_with(&ZSTD_MAGIC);
    if !has_minimal_header {
        return frame;
    }

    // Any of these bits means the header has a different layout; leave it alone.
    let descriptor = frame[4];
    let blocking_bits = DICTIONARY_ID_MASK | CONTENT_SIZE_FLAG_MASK | SINGLE_SEGMENT_FLAG;
    if (descriptor & blocking_bits) != 0 {
        return frame;
    }

    // Pick the smallest content size encoding; the 2-byte form stores `size - 256`.
    let (size_flag, size_field): (u8, Vec<u8>) = match content_size {
        0..=255 => (0, vec![content_size as u8]),
        256..=65_791 => (1, ((content_size - 256) as u16).to_le_bytes().to_vec()),
        _ => match u32::try_from(content_size) {
            Ok(size) => (2, size.to_le_bytes().to_vec()),
            Err(_) => (3, (content_size as u64).to_le_bytes().to_vec()),
        },
    };

    let new_descriptor =
        (size_flag << 6) | SINGLE_SEGMENT_FLAG | (descriptor & CONTENT_CHECKSUM_FLAG);

    // One header byte is dropped and `size_field.len()` bytes are added.
    let mut patched = Vec::with_capacity(frame.len() + size_field.len() - 1);
    patched.extend_from_slice(&ZSTD_MAGIC);
    patched.push(new_descriptor);
    patched.extend_from_slice(&size_field);
    patched.extend_from_slice(&frame[MINIMAL_HEADER_LEN..]);
    patched
}
978
979fn xz_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
980 use lzma_rust2::{XzOptions, XzWriter};
981 use std::io::Write;
982
983 let mut options = XzOptions::with_preset(0);
984 options.set_check_sum_type(lzma_rust2::CheckType::None);
985 let mut writer = XzWriter::new(Vec::new(), options)?;
986 writer.write_all(payload)?;
987 writer.finish()
988}
989
990fn lz4_compress(payload: &[u8]) -> Vec<u8> {
991 let compressed = lz4_flex::block::compress(payload);
992 let mut out = Vec::with_capacity(8 + compressed.len());
993 out.extend_from_slice(&(payload.len() as u64).to_le_bytes());
994 out.extend_from_slice(&compressed);
995 out
996}
997
// Unit tests for this module are kept in the sibling file `writer_tests.rs` and are
// compiled only for test builds.
#[cfg(test)]
#[path = "writer_tests.rs"]
mod tests;