1mod chain;
2use chain::OwnedChain;
3
4mod config;
5pub use config::{Config, LogIdentityMode, LogOpenMode, RetentionPolicy, RotationPolicy};
6
7mod helpers;
8mod startup;
9use helpers::*;
10use startup::{ActiveFile, RotationState, build_startup_state};
11
12use crate::{Result, WriterError};
13use itoa::Buffer as ItoaBuffer;
14use journal_common::{Microseconds, RealtimeClock, monotonic_now};
15use journal_core::error::JournalError;
16use journal_core::file::mmap::MmapMut;
17use journal_core::file::{
18 Compression, EntryField, EntryWriteOptions, FieldNamePolicy, JournalFile, JournalFileOptions,
19 JournalWriter, StructuredField,
20};
21use journal_registry::repository;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24
/// Largest number of field references (boot-id field, optional source
/// timestamp field, plus the caller's payload) assembled in a fixed-size
/// stack array; larger entries fall back to a heap-allocated `Vec`.
const STACK_ENTRY_REF_LIMIT: usize = 128;

/// Byte prefix of the field that carries a caller-supplied source timestamp.
const SOURCE_REALTIME_PREFIX: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP=";

// NOTE(review): the constants below are not referenced in the visible part of
// this file; presumably they are consumed by sibling modules — confirm before
// changing or removing them.
const DERIVED_ROTATION_FRACTION: u64 = 20;
const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
const PAGE_SIZE: u64 = 4096;
const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
31
32pub struct Log {
34 configured_dir: PathBuf,
35 chain: OwnedChain,
36 config: Config,
37 active_file: Option<ActiveFile>,
38 rotation_state: RotationState,
39 boot_id: uuid::Uuid,
40 seqnum_id: uuid::Uuid,
41 current_seqnum: u64,
42 clock: RealtimeClock,
43 last_monotonic_usec: u64,
44 lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
45 artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
46 retention_on_open_applied: bool,
47 boot_id_field: Vec<u8>,
48 source_realtime_field: Vec<u8>,
49}
50
/// Why a journal file is being opened or created.
///
/// The reason is forwarded to observers inside `LogLifecycleEvent::Created`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogLifecycleReason {
    /// An append found no active file and had to create one.
    Append,
    /// The constructor created the file because the open mode is eager.
    EagerOpen,
    /// An append decided the existing active file must be rotated.
    Rotation,
    // NOTE(review): not produced anywhere in the visible code — confirm usage.
    Retention,
}
58
59#[derive(Debug, Clone)]
60pub enum LogLifecycleEvent {
61 Created {
62 active: repository::File,
63 reason: LogLifecycleReason,
64 },
65 Rotated {
66 archived: repository::File,
67 active: repository::File,
68 },
69 RetainedDeleted {
70 files: Vec<repository::File>,
71 },
72}
73
74pub trait LogLifecycleObserver: Send + Sync {
75 fn on_event(&self, event: &LogLifecycleEvent);
76}
77
78pub trait LogArtifactSizer: Send + Sync {
79 fn journal_artifact_size(&self, journal_path: &Path) -> Result<u64>;
80}
81
/// Optional per-entry timestamp overrides, all expressed in microseconds.
///
/// Every field defaults to `None`, meaning "let the writer decide".
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct EntryTimestamps {
    /// When set, written with the entry as a `_SOURCE_REALTIME_TIMESTAMP=` field.
    pub source_realtime_usec: Option<u64>,
    /// When set, used instead of the current time as the entry's realtime
    /// timestamp (still subject to the writer's monotonicity adjustment).
    pub entry_realtime_usec: Option<u64>,
    /// When set, used instead of the monotonic clock reading (still subject to
    /// the writer's monotonicity adjustment).
    pub entry_monotonic_usec: Option<u64>,
}

impl EntryTimestamps {
    /// Returns a copy with the source realtime timestamp set to `ts`.
    pub fn with_source_realtime_usec(self, ts: u64) -> Self {
        Self {
            source_realtime_usec: Some(ts),
            ..self
        }
    }

    /// Returns a copy with the entry realtime timestamp set to `ts`.
    pub fn with_entry_realtime_usec(self, ts: u64) -> Self {
        Self {
            entry_realtime_usec: Some(ts),
            ..self
        }
    }

    /// Returns a copy with the entry monotonic timestamp set to `ts`.
    pub fn with_entry_monotonic_usec(self, ts: u64) -> Self {
        Self {
            entry_monotonic_usec: Some(ts),
            ..self
        }
    }
}
108
109impl Log {
/// Converts `duration` to whole microseconds, clamping to `u64::MAX` when the
/// value does not fit in 64 bits.
fn duration_to_micros(duration: std::time::Duration) -> u64 {
    u64::try_from(duration.as_micros()).unwrap_or(u64::MAX)
}
113
114 fn peek_entry_realtime(&self, timestamps: &EntryTimestamps) -> u64 {
115 let candidate = timestamps
116 .entry_realtime_usec
117 .unwrap_or_else(|| Microseconds::now().get());
118 let last_seen = self.clock.last_seen().get();
119 if candidate > last_seen {
120 candidate
121 } else {
122 last_seen.saturating_add(1)
123 }
124 }
125
126 fn should_rotate_for_realtime(&self, realtime: u64) -> bool {
127 let Some(active_file) = &self.active_file else {
128 return true;
129 };
130 if self.rotation_state.should_rotate() {
131 return true;
132 }
133 let Some(max_duration) = self.config.rotation_policy.duration_of_journal_file else {
134 return false;
135 };
136 let header = active_file.journal_file.journal_header_ref();
137 header.n_entries > 0
138 && header.head_entry_realtime > 0
139 && realtime.saturating_sub(header.head_entry_realtime)
140 >= Self::duration_to_micros(max_duration)
141 }
142
143 fn append_rotation_reason(&self) -> LogLifecycleReason {
144 if self.active_file.is_none() {
145 LogLifecycleReason::Append
146 } else {
147 LogLifecycleReason::Rotation
148 }
149 }
150
151 fn prepare_append_for_realtime(&mut self, entry_realtime: u64) -> Result<()> {
152 self.apply_retention_on_open()?;
153 let opened_first_active = self.active_file.is_none();
154 if self.should_rotate_for_realtime(entry_realtime) {
155 self.rotate(entry_realtime, self.append_rotation_reason())?;
156 if opened_first_active {
157 self.retention_on_open_applied = true;
158 }
159 }
160 self.apply_retention_on_open()
161 }
162
163 fn raw_items_for_policy<'a>(&self, items: &'a [&'a [u8]]) -> Result<Option<Vec<&'a [u8]>>> {
164 if self.config.field_name_policy != FieldNamePolicy::JournalApp {
165 return Ok(None);
166 }
167 let filtered_items = filter_raw_items_for_journal_app(items)?;
168 if filtered_items.is_empty() {
169 return Err(WriterError::EmptyEntry);
170 }
171 Ok(Some(filtered_items))
172 }
173
174 fn structured_fields_for_policy<'a>(
175 &self,
176 fields: &'a [StructuredField<'a>],
177 ) -> Result<Option<Vec<StructuredField<'a>>>> {
178 if self.config.field_name_policy != FieldNamePolicy::JournalApp {
179 return Ok(None);
180 }
181 let filtered_fields = filter_structured_fields_for_journal_app(fields);
182 if filtered_fields.is_empty() {
183 return Err(WriterError::EmptyEntry);
184 }
185 Ok(Some(filtered_fields))
186 }
187
188 fn apply_retention(&mut self, protected_file: Option<&repository::File>) -> Result<()> {
189 if let Some(sizer) = &self.artifact_sizer {
190 self.chain.refresh_retained_sizes(|file| {
191 sizer.journal_artifact_size(Path::new(file.path()))
192 })?;
193 }
194 let retention = self
195 .chain
196 .retain(&self.config.retention_policy, protected_file);
197 let deleted_files = retention.deleted_files;
198 if !deleted_files.is_empty()
199 && let Some(observer) = &self.lifecycle_observer
200 {
201 observer.on_event(&LogLifecycleEvent::RetainedDeleted {
202 files: deleted_files,
203 });
204 }
205 if let Some(error) = retention.error {
206 return Err(error);
207 }
208
209 Ok(())
210 }
211
212 fn apply_retention_on_open(&mut self) -> Result<()> {
213 if self.retention_on_open_applied || self.active_file.is_none() {
214 return Ok(());
215 }
216 self.enforce_retention()?;
217 self.retention_on_open_applied = true;
218 Ok(())
219 }
220
221 fn capture_dual_timestamp(
227 &mut self,
228 timestamp_override: Option<&EntryTimestamps>,
229 ) -> Result<(u64, u64)> {
230 let realtime = match timestamp_override.and_then(|ts| ts.entry_realtime_usec) {
231 Some(ts) => self.clock.observe(Microseconds::new(ts)).get(),
232 None => self.clock.now().get(),
233 };
234
235 let desired_monotonic = match timestamp_override.and_then(|ts| ts.entry_monotonic_usec) {
236 Some(ts) => ts,
237 None => monotonic_now().map_err(WriterError::Io)?.get(),
238 };
239
240 let monotonic = if desired_monotonic > self.last_monotonic_usec {
241 desired_monotonic
242 } else {
243 self.last_monotonic_usec.saturating_add(1)
244 };
245 self.last_monotonic_usec = monotonic;
246
247 Ok((realtime, monotonic))
248 }
249
250 pub fn new(path: &Path, config: Config) -> Result<Self> {
252 Self::new_inner(path, config, None, None)
253 }
254
255 pub fn new_with_lifecycle_observer(
256 path: &Path,
257 config: Config,
258 observer: Arc<dyn LogLifecycleObserver>,
259 ) -> Result<Self> {
260 Self::new_inner(path, config, Some(observer), None)
261 }
262
263 pub fn new_with_hooks(
264 path: &Path,
265 config: Config,
266 observer: Option<Arc<dyn LogLifecycleObserver>>,
267 artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
268 ) -> Result<Self> {
269 Self::new_inner(path, config, observer, artifact_sizer)
270 }
271
272 fn new_inner(
273 path: &Path,
274 config: Config,
275 lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
276 artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
277 ) -> Result<Self> {
278 let startup = build_startup_state(path, config)?;
279
280 let mut log = Log {
281 configured_dir: path.to_path_buf(),
282 chain: startup.chain,
283 config: startup.config,
284 active_file: startup.active_file,
285 rotation_state: startup.rotation_state,
286 boot_id: startup.boot_id,
287 seqnum_id: startup.seqnum_id,
288 current_seqnum: startup.current_seqnum,
289 clock: startup.clock,
290 last_monotonic_usec: startup.last_monotonic_usec,
291 lifecycle_observer,
292 artifact_sizer,
293 retention_on_open_applied: false,
294 boot_id_field: format!("_BOOT_ID={}", startup.boot_id.as_simple()).into_bytes(),
295 source_realtime_field: Vec::with_capacity(SOURCE_REALTIME_PREFIX.len() + 20),
296 };
297 if log.config.open_mode == LogOpenMode::Eager && log.active_file.is_none() {
298 let realtime = log.peek_entry_realtime(&EntryTimestamps::default());
299 log.rotate(realtime, LogLifecycleReason::EagerOpen)?;
300 log.retention_on_open_applied = true;
301 }
302 log.apply_retention_on_open()?;
303 Ok(log)
304 }
305
306 pub fn with_lifecycle_observer(mut self, observer: Arc<dyn LogLifecycleObserver>) -> Self {
307 self.lifecycle_observer = Some(observer);
308 self
309 }
310
311 pub fn with_artifact_sizer(mut self, sizer: Arc<dyn LogArtifactSizer>) -> Self {
312 self.artifact_sizer = Some(sizer);
313 self
314 }
315
316 pub fn write_entry(
322 &mut self,
323 items: &[&[u8]],
324 source_realtime_usec: Option<u64>,
325 ) -> Result<()> {
326 self.write_entry_with_timestamps(
327 items,
328 EntryTimestamps {
329 source_realtime_usec,
330 ..EntryTimestamps::default()
331 },
332 )
333 }
334
335 pub fn write_entry_with_timestamps(
341 &mut self,
342 items: &[&[u8]],
343 timestamps: EntryTimestamps,
344 ) -> Result<()> {
345 if items.is_empty() {
346 return Err(WriterError::EmptyEntry);
347 }
348
349 let entry_realtime = self.peek_entry_realtime(×tamps);
350 self.prepare_append_for_realtime(entry_realtime)?;
351 let filtered_items = self.raw_items_for_policy(items)?;
352 let write_items = filtered_items.as_deref().unwrap_or(items);
353
354 let (realtime, monotonic) = self.capture_dual_timestamp(Some(×tamps))?;
355 self.write_raw_entry_fields(
356 write_items,
357 timestamps.source_realtime_usec,
358 realtime,
359 monotonic,
360 self.low_level_entry_options(EntryWriteOptions::default()),
361 )?;
362
363 let active_file = self.active_file.as_ref().unwrap();
364 self.rotation_state.update(&active_file.writer);
365 self.current_seqnum += 1;
366
367 Ok(())
368 }
369
370 pub fn write_fields(
376 &mut self,
377 fields: &[StructuredField<'_>],
378 source_realtime_usec: Option<u64>,
379 ) -> Result<()> {
380 self.write_fields_with_timestamps(
381 fields,
382 EntryTimestamps {
383 source_realtime_usec,
384 ..EntryTimestamps::default()
385 },
386 )
387 }
388
389 pub fn write_fields_with_timestamps(
394 &mut self,
395 fields: &[StructuredField<'_>],
396 timestamps: EntryTimestamps,
397 ) -> Result<()> {
398 self.write_fields_with_options(fields, timestamps, EntryWriteOptions::default())
399 }
400
401 pub fn write_fields_with_options(
407 &mut self,
408 fields: &[StructuredField<'_>],
409 timestamps: EntryTimestamps,
410 options: EntryWriteOptions,
411 ) -> Result<()> {
412 if fields.is_empty() {
413 return Err(WriterError::EmptyEntry);
414 }
415
416 let entry_realtime = self.peek_entry_realtime(×tamps);
417 self.prepare_append_for_realtime(entry_realtime)?;
418 let filtered_fields = self.structured_fields_for_policy(fields)?;
419 let write_fields = filtered_fields.as_deref().unwrap_or(fields);
420
421 let (realtime, monotonic) = self.capture_dual_timestamp(Some(×tamps))?;
422 self.write_structured_entry_fields(
423 write_fields,
424 timestamps.source_realtime_usec,
425 realtime,
426 monotonic,
427 self.low_level_entry_options(options),
428 )?;
429
430 let active_file = self.active_file.as_ref().unwrap();
431 self.rotation_state.update(&active_file.writer);
432 self.current_seqnum += 1;
433
434 Ok(())
435 }
436
437 fn write_raw_entry_fields(
438 &mut self,
439 items: &[&[u8]],
440 source_realtime_usec: Option<u64>,
441 realtime: u64,
442 monotonic: u64,
443 options: EntryWriteOptions,
444 ) -> Result<()> {
445 let source_field = if let Some(timestamp_usec) = source_realtime_usec {
446 self.prepare_source_realtime_field(timestamp_usec);
447 Some(self.source_realtime_field.as_slice())
448 } else {
449 None
450 };
451
452 let total_items = items.len() + 1 + usize::from(source_field.is_some());
453 if total_items <= STACK_ENTRY_REF_LIMIT {
454 let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
455 let mut len = 0usize;
456 refs[len] = EntryField::raw(self.boot_id_field.as_slice());
457 len += 1;
458 if let Some(source_field) = source_field {
459 refs[len] = EntryField::raw(source_field);
460 len += 1;
461 }
462 for item in items {
463 refs[len] = EntryField::raw(item);
464 len += 1;
465 }
466 self.active_file.as_mut().unwrap().write_entry_fields(
467 refs[..len].iter().copied(),
468 realtime,
469 monotonic,
470 options,
471 )?;
472 } else {
473 let mut refs = Vec::with_capacity(total_items);
474 refs.push(EntryField::raw(self.boot_id_field.as_slice()));
475 if let Some(source_field) = source_field {
476 refs.push(EntryField::raw(source_field));
477 }
478 refs.extend(items.iter().copied().map(EntryField::raw));
479 self.active_file.as_mut().unwrap().write_entry_fields(
480 refs.iter().copied(),
481 realtime,
482 monotonic,
483 options,
484 )?;
485 }
486
487 Ok(())
488 }
489
490 fn write_structured_entry_fields(
491 &mut self,
492 fields: &[StructuredField<'_>],
493 source_realtime_usec: Option<u64>,
494 realtime: u64,
495 monotonic: u64,
496 options: EntryWriteOptions,
497 ) -> Result<()> {
498 let source_field = if let Some(timestamp_usec) = source_realtime_usec {
499 self.prepare_source_realtime_field(timestamp_usec);
500 Some(self.source_realtime_field.as_slice())
501 } else {
502 None
503 };
504
505 let total_items = fields.len() + 1 + usize::from(source_field.is_some());
506 if total_items <= STACK_ENTRY_REF_LIMIT {
507 let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
508 let mut len = 0usize;
509 refs[len] = EntryField::raw(self.boot_id_field.as_slice());
510 len += 1;
511 if let Some(source_field) = source_field {
512 refs[len] = EntryField::raw(source_field);
513 len += 1;
514 }
515 for field in fields {
516 refs[len] = EntryField::Structured(*field);
517 len += 1;
518 }
519 self.active_file.as_mut().unwrap().write_entry_fields(
520 refs[..len].iter().copied(),
521 realtime,
522 monotonic,
523 options,
524 )?;
525 } else {
526 let mut refs = Vec::with_capacity(total_items);
527 refs.push(EntryField::raw(self.boot_id_field.as_slice()));
528 if let Some(source_field) = source_field {
529 refs.push(EntryField::raw(source_field));
530 }
531 refs.extend(fields.iter().copied().map(EntryField::Structured));
532 self.active_file.as_mut().unwrap().write_entry_fields(
533 refs.iter().copied(),
534 realtime,
535 monotonic,
536 options,
537 )?;
538 }
539
540 Ok(())
541 }
542
543 fn low_level_entry_options(&self, options: EntryWriteOptions) -> EntryWriteOptions {
544 options.field_name_policy(log_writer_field_name_policy(self.config.field_name_policy))
545 }
546
547 fn prepare_source_realtime_field(&mut self, timestamp_usec: u64) {
548 self.source_realtime_field.clear();
549 self.source_realtime_field
550 .extend_from_slice(SOURCE_REALTIME_PREFIX);
551 let mut buffer = ItoaBuffer::new();
552 self.source_realtime_field
553 .extend_from_slice(buffer.format(timestamp_usec).as_bytes());
554 }
555
556 pub fn sync(&mut self) -> Result<()> {
561 if let Some(active_file) = &mut self.active_file {
562 active_file.journal_file.sync()?;
563 }
564 Ok(())
565 }
566
567 pub fn close(mut self) -> Result<()> {
574 use journal_core::file::JournalState;
575
576 let Some(mut active_file) = self.active_file.take() else {
577 return Ok(());
578 };
579
580 let n_entries = active_file.journal_file.journal_header_ref().n_entries;
581 if self.config.strict_systemd_naming && n_entries == 0 {
582 match std::fs::remove_file(active_file.repository_file.path()) {
583 Ok(()) => {}
584 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
585 Err(err) => return Err(err.into()),
586 }
587 self.chain.remove_tracked_file(&active_file.repository_file);
588 return Ok(());
589 }
590
591 self.chain.update_file_size(
592 &active_file.repository_file,
593 active_file.current_file_size(),
594 );
595 active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
596 active_file.journal_file.sync()?;
597
598 let protected_file = if self.config.strict_systemd_naming {
599 let header = active_file.journal_file.journal_header_ref();
600 self.chain.archive_file(
601 &active_file.repository_file,
602 uuid::Uuid::from_bytes(header.seqnum_id),
603 header.head_entry_seqnum,
604 header.head_entry_realtime,
605 )?
606 } else {
607 active_file.repository_file.clone()
608 };
609
610 self.apply_retention(Some(&protected_file))?;
611
612 Ok(())
613 }
614
615 pub fn active_file(&self) -> Option<&repository::File> {
616 self.active_file
617 .as_ref()
618 .map(|active_file| &active_file.repository_file)
619 }
620
621 pub fn active_path(&self) -> Option<&Path> {
622 self.active_file
623 .as_ref()
624 .map(|active_file| Path::new(active_file.repository_file.path()))
625 }
626
627 pub fn configured_directory(&self) -> &Path {
628 &self.configured_dir
629 }
630
631 pub fn journal_directory(&self) -> &Path {
632 &self.chain.path
633 }
634
635 pub fn machine_id(&self) -> uuid::Uuid {
636 self.chain.machine_id
637 }
638
639 pub fn boot_id(&self) -> uuid::Uuid {
640 self.boot_id
641 }
642
643 pub fn source(&self) -> &journal_registry::Source {
644 &self.chain.source
645 }
646
647 pub fn enforce_retention(&mut self) -> Result<()> {
651 let protected_file = if let Some(active_file) = &self.active_file {
652 self.chain.update_file_size(
653 &active_file.repository_file,
654 active_file.current_file_size(),
655 );
656 Some(active_file.repository_file.clone())
657 } else {
658 None
659 };
660 self.apply_retention(protected_file.as_ref())
661 }
662
663 fn update_active_file_size(&mut self) {
664 if let Some(active_file) = &self.active_file {
665 self.chain.update_file_size(
666 &active_file.repository_file,
667 active_file.current_file_size(),
668 );
669 }
670 }
671
672 fn prepare_initial_rotation(&mut self) -> Result<()> {
673 self.update_active_file_size();
674 if self.active_file.is_none() && self.config.strict_systemd_naming {
675 self.chain.archive_existing_active_file()?;
676 }
677 Ok(())
678 }
679
680 fn archive_rotated_file(&mut self, old_file: &ActiveFile) -> Result<repository::File> {
681 if !self.config.strict_systemd_naming {
682 return Ok(old_file.repository_file.clone());
683 }
684 let old_header = old_file.journal_file.journal_header_ref();
685 self.chain.archive_file(
686 &old_file.repository_file,
687 uuid::Uuid::from_bytes(old_header.seqnum_id),
688 old_header.head_entry_seqnum,
689 old_header.head_entry_realtime,
690 )
691 }
692
693 fn rotate_existing_active_file(
694 &mut self,
695 mut old_file: ActiveFile,
696 max_file_size: Option<u64>,
697 head_realtime: u64,
698 ) -> Result<(ActiveFile, LogLifecycleEvent)> {
699 use journal_core::file::JournalState;
700
701 old_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
702 old_file.journal_file.sync()?;
703 let archived = self.archive_rotated_file(&old_file)?;
704 let new_file = old_file.rotate(
705 &mut self.chain,
706 max_file_size,
707 head_realtime,
708 self.config.compression,
709 self.config.compression_threshold,
710 self.config.strict_systemd_naming,
711 self.config.live_publish_every_entries,
712 self.config.file_mode,
713 )?;
714 let active = new_file.repository_file.clone();
715 Ok((new_file, LogLifecycleEvent::Rotated { archived, active }))
716 }
717
718 fn create_initial_active_file(
719 &mut self,
720 max_file_size: Option<u64>,
721 head_realtime: u64,
722 reason: LogLifecycleReason,
723 ) -> Result<(ActiveFile, LogLifecycleEvent)> {
724 let new_file = ActiveFile::create(
725 &mut self.chain,
726 self.seqnum_id,
727 self.boot_id,
728 self.current_seqnum + 1,
729 max_file_size,
730 head_realtime,
731 self.config.compression,
732 self.config.compression_threshold,
733 self.config.compact,
734 self.config.strict_systemd_naming,
735 self.config.live_publish_every_entries,
736 self.config.file_mode,
737 )?;
738 let active = new_file.repository_file.clone();
739 Ok((new_file, LogLifecycleEvent::Created { active, reason }))
740 }
741
742 fn emit_lifecycle_event(&self, event: &LogLifecycleEvent) {
743 if let Some(observer) = &self.lifecycle_observer {
744 observer.on_event(event);
745 }
746 }
747
748 fn protected_active_file(&self) -> Option<repository::File> {
749 self.active_file
750 .as_ref()
751 .map(|active_file| active_file.repository_file.clone())
752 }
753
754 #[tracing::instrument(skip_all, fields(active_file))]
755 fn rotate(&mut self, head_realtime: u64, reason: LogLifecycleReason) -> Result<()> {
756 self.prepare_initial_rotation()?;
757 let max_file_size = self.config.rotation_policy.size_of_journal_file;
758 let (new_file, lifecycle_event) = if let Some(old_file) = self.active_file.take() {
759 self.rotate_existing_active_file(old_file, max_file_size, head_realtime)?
760 } else {
761 self.create_initial_active_file(max_file_size, head_realtime, reason)?
762 };
763
764 tracing::Span::current().record("new_file", new_file.repository_file.path());
765
766 self.active_file = Some(new_file);
767 self.rotation_state.reset();
768 self.update_active_file_size();
769 self.emit_lifecycle_event(&lifecycle_event);
770
771 let protected_file = self.protected_active_file();
774 self.apply_retention(protected_file.as_ref())?;
775
776 Ok(())
777 }
778
/// Serialises `value` to JSON and appends it as one journal entry.
///
/// The value must serialise to a JSON object. The object is flattened by
/// `flatten_json_map`, and each resulting key is upper-cased with `.`
/// replaced by `_` to form the field name. Strings are written verbatim,
/// numbers and booleans in their textual form, `null` as an empty value, and
/// any other JSON value as its JSON text.
///
/// # Errors
/// Returns `WriterError::Serialization` when serialisation fails or the
/// value is not a JSON object; write failures are propagated.
#[cfg(feature = "serde-api")]
pub fn write_structured<T: serde::Serialize>(&mut self, value: &T) -> Result<()> {
    let json_value = serde_json::to_value(value).map_err(|e| {
        WriterError::Serialization(format!("failed to serialize to JSON: {}", e))
    })?;

    let serde_json::Value::Object(map) = json_value else {
        return Err(WriterError::Serialization(
            "value must be a JSON object, not a primitive or array".to_string(),
        ));
    };
    let flattened = flatten_json_map(&map);

    let encoded: Vec<Vec<u8>> = flattened
        .iter()
        .map(|(key, field_value)| {
            let journal_key = key.to_uppercase().replace('.', "_");
            let rendered = match field_value {
                serde_json::Value::String(text) => format!("{}={}", journal_key, text),
                serde_json::Value::Number(number) => format!("{}={}", journal_key, number),
                serde_json::Value::Bool(flag) => format!("{}={}", journal_key, flag),
                serde_json::Value::Null => format!("{}=", journal_key),
                // Anything the flattening left nested is written as JSON text.
                other => format!("{}={}", journal_key, other),
            };
            rendered.into_bytes()
        })
        .collect();

    let field_refs: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();

    self.write_entry(&field_refs, None)
}
885}
886
887#[cfg(all(test, feature = "serde-api"))]
888mod serde_api_tests;
889
890impl Drop for Log {
891 fn drop(&mut self) {
892 use journal_core::file::JournalState;
893
894 if let Some(ref mut active_file) = self.active_file {
895 active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
899
900 let _ = active_file.journal_file.sync();
902 }
903 }
904}