1mod chain;
2use chain::OwnedChain;
3
4mod config;
5pub use config::{Config, LogIdentityMode, LogOpenMode, RetentionPolicy, RotationPolicy};
6
7mod helpers;
8mod startup;
9use helpers::*;
10use startup::{ActiveFile, RotationState, build_startup_state};
11
12use crate::{Result, WriterError};
13use itoa::Buffer as ItoaBuffer;
14pub use journal_common::EntryTimestamps;
15use journal_common::{Microseconds, RealtimeClock};
16use journal_core::error::JournalError;
17use journal_core::file::mmap::MmapMut;
18use journal_core::file::{
19 Compression, EntryField, EntryWriteOptions, FieldNamePolicy, JournalFile, JournalFileOptions,
20 JournalWriter, StructuredField,
21};
22use journal_registry::repository;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
26const STACK_ENTRY_REF_LIMIT: usize = 128;
27const SOURCE_REALTIME_PREFIX: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP=";
28const DERIVED_ROTATION_FRACTION: u64 = 20;
29const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
30const PAGE_SIZE: u64 = 4096;
31const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
32
33pub struct Log {
35 configured_dir: PathBuf,
36 chain: OwnedChain,
37 config: Config,
38 active_file: Option<ActiveFile>,
39 rotation_state: RotationState,
40 boot_id: uuid::Uuid,
41 seqnum_id: uuid::Uuid,
42 current_seqnum: u64,
43 clock: RealtimeClock,
44 last_monotonic_usec: u64,
45 lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
46 artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
47 retention_on_open_applied: bool,
48 boot_id_field: Vec<u8>,
49 source_realtime_field: Vec<u8>,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum LogLifecycleReason {
54 Append,
55 EagerOpen,
56 Rotation,
57 Retention,
58}
59
60#[derive(Debug, Clone)]
61pub enum LogLifecycleEvent {
62 Created {
63 active: repository::File,
64 reason: LogLifecycleReason,
65 },
66 Rotated {
67 archived: repository::File,
68 active: repository::File,
69 },
70 RetainedDeleted {
71 files: Vec<repository::File>,
72 },
73}
74
75pub trait LogLifecycleObserver: Send + Sync {
76 fn on_event(&self, event: &LogLifecycleEvent);
77}
78
79pub trait LogArtifactSizer: Send + Sync {
80 fn journal_artifact_size(&self, journal_path: &Path) -> Result<u64>;
81}
82
83impl Log {
84 fn duration_to_micros(duration: std::time::Duration) -> u64 {
85 duration.as_micros().try_into().unwrap_or(u64::MAX)
86 }
87
88 fn peek_entry_realtime(&self, timestamps: &EntryTimestamps) -> u64 {
89 let candidate = timestamps
90 .entry_realtime_usec
91 .unwrap_or_else(|| Microseconds::now().get());
92 let last_seen = self.clock.last_seen().get();
93 if candidate > last_seen {
94 candidate
95 } else {
96 last_seen.saturating_add(1)
97 }
98 }
99
100 fn should_rotate_for_realtime(&self, realtime: u64) -> bool {
101 let Some(active_file) = &self.active_file else {
102 return true;
103 };
104 if self.rotation_state.should_rotate() {
105 return true;
106 }
107 let Some(max_duration) = self.config.rotation_policy.duration_of_journal_file else {
108 return false;
109 };
110 let header = active_file.journal_file.journal_header_ref();
111 header.n_entries > 0
112 && header.head_entry_realtime > 0
113 && realtime.saturating_sub(header.head_entry_realtime)
114 >= Self::duration_to_micros(max_duration)
115 }
116
117 fn append_rotation_reason(&self) -> LogLifecycleReason {
118 if self.active_file.is_none() {
119 LogLifecycleReason::Append
120 } else {
121 LogLifecycleReason::Rotation
122 }
123 }
124
125 fn prepare_append_for_realtime(&mut self, entry_realtime: u64) -> Result<()> {
126 self.apply_retention_on_open()?;
127 let opened_first_active = self.active_file.is_none();
128 if self.should_rotate_for_realtime(entry_realtime) {
129 self.rotate(entry_realtime, self.append_rotation_reason())?;
130 if opened_first_active {
131 self.retention_on_open_applied = true;
132 }
133 }
134 self.apply_retention_on_open()
135 }
136
137 fn raw_items_for_policy<'a>(&self, items: &'a [&'a [u8]]) -> Result<Option<Vec<&'a [u8]>>> {
138 if self.config.field_name_policy != FieldNamePolicy::JournalApp {
139 return Ok(None);
140 }
141 let filtered_items = filter_raw_items_for_journal_app(items)?;
142 if filtered_items.is_empty() {
143 return Err(WriterError::EmptyEntry);
144 }
145 Ok(Some(filtered_items))
146 }
147
148 fn structured_fields_for_policy<'a>(
149 &self,
150 fields: &'a [StructuredField<'a>],
151 ) -> Result<Option<Vec<StructuredField<'a>>>> {
152 if self.config.field_name_policy != FieldNamePolicy::JournalApp {
153 return Ok(None);
154 }
155 let filtered_fields = filter_structured_fields_for_journal_app(fields);
156 if filtered_fields.is_empty() {
157 return Err(WriterError::EmptyEntry);
158 }
159 Ok(Some(filtered_fields))
160 }
161
162 fn apply_retention(&mut self, protected_file: Option<&repository::File>) -> Result<()> {
163 if let Some(sizer) = &self.artifact_sizer {
164 self.chain.refresh_retained_sizes(|file| {
165 sizer.journal_artifact_size(Path::new(file.path()))
166 })?;
167 }
168 let retention = self
169 .chain
170 .retain(&self.config.retention_policy, protected_file);
171 let deleted_files = retention.deleted_files;
172 if !deleted_files.is_empty()
173 && let Some(observer) = &self.lifecycle_observer
174 {
175 observer.on_event(&LogLifecycleEvent::RetainedDeleted {
176 files: deleted_files,
177 });
178 }
179 if let Some(error) = retention.error {
180 return Err(error);
181 }
182
183 Ok(())
184 }
185
186 fn apply_retention_on_open(&mut self) -> Result<()> {
187 if self.retention_on_open_applied || self.active_file.is_none() {
188 return Ok(());
189 }
190 self.enforce_retention()?;
191 self.retention_on_open_applied = true;
192 Ok(())
193 }
194
195 fn capture_dual_timestamp(
201 &mut self,
202 timestamp_override: Option<&EntryTimestamps>,
203 ) -> Result<(u64, u64)> {
204 let realtime = match timestamp_override.and_then(|ts| ts.entry_realtime_usec) {
205 Some(ts) => self.clock.observe(Microseconds::new(ts)).get(),
206 None => self.clock.now().get(),
207 };
208
209 let desired_monotonic = timestamp_override
210 .and_then(|ts| ts.entry_monotonic_usec)
211 .ok_or_else(|| {
212 WriterError::InvalidConfig("entry monotonic timestamp is required".to_string())
213 })?;
214
215 let monotonic = if desired_monotonic > self.last_monotonic_usec {
216 desired_monotonic
217 } else {
218 self.last_monotonic_usec.saturating_add(1)
219 };
220 self.last_monotonic_usec = monotonic;
221
222 Ok((realtime, monotonic))
223 }
224
225 fn require_entry_monotonic(timestamps: &EntryTimestamps) -> Result<()> {
226 if timestamps.entry_monotonic_usec.is_none() {
227 return Err(WriterError::InvalidConfig(
228 "entry monotonic timestamp is required".to_string(),
229 ));
230 }
231 Ok(())
232 }
233
234 pub fn new(path: &Path, config: Config) -> Result<Self> {
236 Self::new_inner(path, config, None, None)
237 }
238
239 pub fn new_with_lifecycle_observer(
240 path: &Path,
241 config: Config,
242 observer: Arc<dyn LogLifecycleObserver>,
243 ) -> Result<Self> {
244 Self::new_inner(path, config, Some(observer), None)
245 }
246
247 pub fn new_with_hooks(
248 path: &Path,
249 config: Config,
250 observer: Option<Arc<dyn LogLifecycleObserver>>,
251 artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
252 ) -> Result<Self> {
253 Self::new_inner(path, config, observer, artifact_sizer)
254 }
255
256 fn new_inner(
257 path: &Path,
258 config: Config,
259 lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
260 artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
261 ) -> Result<Self> {
262 let startup = build_startup_state(path, config)?;
263
264 let mut log = Log {
265 configured_dir: path.to_path_buf(),
266 chain: startup.chain,
267 config: startup.config,
268 active_file: startup.active_file,
269 rotation_state: startup.rotation_state,
270 boot_id: startup.boot_id,
271 seqnum_id: startup.seqnum_id,
272 current_seqnum: startup.current_seqnum,
273 clock: startup.clock,
274 last_monotonic_usec: startup.last_monotonic_usec,
275 lifecycle_observer,
276 artifact_sizer,
277 retention_on_open_applied: false,
278 boot_id_field: format!("_BOOT_ID={}", startup.boot_id.as_simple()).into_bytes(),
279 source_realtime_field: Vec::with_capacity(SOURCE_REALTIME_PREFIX.len() + 20),
280 };
281 if log.config.open_mode == LogOpenMode::Eager && log.active_file.is_none() {
282 let realtime = log.peek_entry_realtime(&EntryTimestamps::default());
283 log.rotate(realtime, LogLifecycleReason::EagerOpen)?;
284 log.retention_on_open_applied = true;
285 }
286 log.apply_retention_on_open()?;
287 Ok(log)
288 }
289
290 pub fn with_lifecycle_observer(mut self, observer: Arc<dyn LogLifecycleObserver>) -> Self {
291 self.lifecycle_observer = Some(observer);
292 self
293 }
294
295 pub fn with_artifact_sizer(mut self, sizer: Arc<dyn LogArtifactSizer>) -> Self {
296 self.artifact_sizer = Some(sizer);
297 self
298 }
299
300 #[deprecated(
306 since = "0.7.2",
307 note = "use write_entry_with_timestamps and provide an explicit entry monotonic timestamp"
308 )]
309 pub fn write_entry(
310 &mut self,
311 items: &[&[u8]],
312 source_realtime_usec: Option<u64>,
313 ) -> Result<()> {
314 self.write_entry_with_timestamps(
315 items,
316 EntryTimestamps {
317 source_realtime_usec,
318 ..EntryTimestamps::default()
319 },
320 )
321 }
322
323 pub fn write_entry_with_timestamps(
329 &mut self,
330 items: &[&[u8]],
331 timestamps: EntryTimestamps,
332 ) -> Result<()> {
333 if items.is_empty() {
334 return Err(WriterError::EmptyEntry);
335 }
336 Self::require_entry_monotonic(×tamps)?;
337
338 let entry_realtime = self.peek_entry_realtime(×tamps);
339 self.prepare_append_for_realtime(entry_realtime)?;
340 let filtered_items = self.raw_items_for_policy(items)?;
341 let write_items = filtered_items.as_deref().unwrap_or(items);
342
343 let (realtime, monotonic) = self.capture_dual_timestamp(Some(×tamps))?;
344 self.write_raw_entry_fields(
345 write_items,
346 timestamps.source_realtime_usec,
347 realtime,
348 monotonic,
349 self.low_level_entry_options(EntryWriteOptions::default()),
350 )?;
351
352 let active_file = self.active_file.as_ref().unwrap();
353 self.rotation_state.update(&active_file.writer);
354 self.current_seqnum += 1;
355
356 Ok(())
357 }
358
359 #[deprecated(
369 since = "0.7.2",
370 note = "use write_fields_with_timestamps and provide an explicit entry monotonic timestamp"
371 )]
372 pub fn write_fields(
373 &mut self,
374 fields: &[StructuredField<'_>],
375 source_realtime_usec: Option<u64>,
376 ) -> Result<()> {
377 self.write_fields_with_timestamps(
378 fields,
379 EntryTimestamps {
380 source_realtime_usec,
381 ..EntryTimestamps::default()
382 },
383 )
384 }
385
386 pub fn write_fields_with_timestamps(
392 &mut self,
393 fields: &[StructuredField<'_>],
394 timestamps: EntryTimestamps,
395 ) -> Result<()> {
396 self.write_fields_with_options(fields, timestamps, EntryWriteOptions::default())
397 }
398
399 pub fn write_fields_with_options(
405 &mut self,
406 fields: &[StructuredField<'_>],
407 timestamps: EntryTimestamps,
408 options: EntryWriteOptions,
409 ) -> Result<()> {
410 if fields.is_empty() {
411 return Err(WriterError::EmptyEntry);
412 }
413 Self::require_entry_monotonic(×tamps)?;
414
415 let entry_realtime = self.peek_entry_realtime(×tamps);
416 self.prepare_append_for_realtime(entry_realtime)?;
417 let filtered_fields = self.structured_fields_for_policy(fields)?;
418 let write_fields = filtered_fields.as_deref().unwrap_or(fields);
419
420 let (realtime, monotonic) = self.capture_dual_timestamp(Some(×tamps))?;
421 self.write_structured_entry_fields(
422 write_fields,
423 timestamps.source_realtime_usec,
424 realtime,
425 monotonic,
426 self.low_level_entry_options(options),
427 )?;
428
429 let active_file = self.active_file.as_ref().unwrap();
430 self.rotation_state.update(&active_file.writer);
431 self.current_seqnum += 1;
432
433 Ok(())
434 }
435
436 fn write_raw_entry_fields(
437 &mut self,
438 items: &[&[u8]],
439 source_realtime_usec: Option<u64>,
440 realtime: u64,
441 monotonic: u64,
442 options: EntryWriteOptions,
443 ) -> Result<()> {
444 let source_field = if let Some(timestamp_usec) = source_realtime_usec {
445 self.prepare_source_realtime_field(timestamp_usec);
446 Some(self.source_realtime_field.as_slice())
447 } else {
448 None
449 };
450
451 let total_items = items.len() + 1 + usize::from(source_field.is_some());
452 if total_items <= STACK_ENTRY_REF_LIMIT {
453 let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
454 let mut len = 0usize;
455 refs[len] = EntryField::raw(self.boot_id_field.as_slice());
456 len += 1;
457 if let Some(source_field) = source_field {
458 refs[len] = EntryField::raw(source_field);
459 len += 1;
460 }
461 for item in items {
462 refs[len] = EntryField::raw(item);
463 len += 1;
464 }
465 self.active_file.as_mut().unwrap().write_entry_fields(
466 refs[..len].iter().copied(),
467 realtime,
468 monotonic,
469 options,
470 )?;
471 } else {
472 let mut refs = Vec::with_capacity(total_items);
473 refs.push(EntryField::raw(self.boot_id_field.as_slice()));
474 if let Some(source_field) = source_field {
475 refs.push(EntryField::raw(source_field));
476 }
477 refs.extend(items.iter().copied().map(EntryField::raw));
478 self.active_file.as_mut().unwrap().write_entry_fields(
479 refs.iter().copied(),
480 realtime,
481 monotonic,
482 options,
483 )?;
484 }
485
486 Ok(())
487 }
488
489 fn write_structured_entry_fields(
490 &mut self,
491 fields: &[StructuredField<'_>],
492 source_realtime_usec: Option<u64>,
493 realtime: u64,
494 monotonic: u64,
495 options: EntryWriteOptions,
496 ) -> Result<()> {
497 let source_field = if let Some(timestamp_usec) = source_realtime_usec {
498 self.prepare_source_realtime_field(timestamp_usec);
499 Some(self.source_realtime_field.as_slice())
500 } else {
501 None
502 };
503
504 let total_items = fields.len() + 1 + usize::from(source_field.is_some());
505 if total_items <= STACK_ENTRY_REF_LIMIT {
506 let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
507 let mut len = 0usize;
508 refs[len] = EntryField::raw(self.boot_id_field.as_slice());
509 len += 1;
510 if let Some(source_field) = source_field {
511 refs[len] = EntryField::raw(source_field);
512 len += 1;
513 }
514 for field in fields {
515 refs[len] = EntryField::Structured(*field);
516 len += 1;
517 }
518 self.active_file.as_mut().unwrap().write_entry_fields(
519 refs[..len].iter().copied(),
520 realtime,
521 monotonic,
522 options,
523 )?;
524 } else {
525 let mut refs = Vec::with_capacity(total_items);
526 refs.push(EntryField::raw(self.boot_id_field.as_slice()));
527 if let Some(source_field) = source_field {
528 refs.push(EntryField::raw(source_field));
529 }
530 refs.extend(fields.iter().copied().map(EntryField::Structured));
531 self.active_file.as_mut().unwrap().write_entry_fields(
532 refs.iter().copied(),
533 realtime,
534 monotonic,
535 options,
536 )?;
537 }
538
539 Ok(())
540 }
541
542 fn low_level_entry_options(&self, options: EntryWriteOptions) -> EntryWriteOptions {
543 options.field_name_policy(log_writer_field_name_policy(self.config.field_name_policy))
544 }
545
546 fn prepare_source_realtime_field(&mut self, timestamp_usec: u64) {
547 self.source_realtime_field.clear();
548 self.source_realtime_field
549 .extend_from_slice(SOURCE_REALTIME_PREFIX);
550 let mut buffer = ItoaBuffer::new();
551 self.source_realtime_field
552 .extend_from_slice(buffer.format(timestamp_usec).as_bytes());
553 }
554
555 pub fn sync(&mut self) -> Result<()> {
560 if let Some(active_file) = &mut self.active_file {
561 active_file.journal_file.sync()?;
562 }
563 Ok(())
564 }
565
566 pub fn close(mut self) -> Result<()> {
573 use journal_core::file::JournalState;
574
575 let Some(mut active_file) = self.active_file.take() else {
576 return Ok(());
577 };
578
579 let n_entries = active_file.journal_file.journal_header_ref().n_entries;
580 if self.config.strict_systemd_naming && n_entries == 0 {
581 match std::fs::remove_file(active_file.repository_file.path()) {
582 Ok(()) => {}
583 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
584 Err(err) => return Err(err.into()),
585 }
586 self.chain.remove_tracked_file(&active_file.repository_file);
587 return Ok(());
588 }
589
590 self.chain.update_file_size(
591 &active_file.repository_file,
592 active_file.current_file_size(),
593 );
594 active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
595 active_file.journal_file.sync()?;
596
597 let protected_file = if self.config.strict_systemd_naming {
598 let header = active_file.journal_file.journal_header_ref();
599 self.chain.archive_file(
600 &active_file.repository_file,
601 uuid::Uuid::from_bytes(header.seqnum_id),
602 header.head_entry_seqnum,
603 header.head_entry_realtime,
604 )?
605 } else {
606 active_file.repository_file.clone()
607 };
608
609 self.apply_retention(Some(&protected_file))?;
610
611 Ok(())
612 }
613
614 pub fn active_file(&self) -> Option<&repository::File> {
615 self.active_file
616 .as_ref()
617 .map(|active_file| &active_file.repository_file)
618 }
619
620 pub fn active_path(&self) -> Option<&Path> {
621 self.active_file
622 .as_ref()
623 .map(|active_file| Path::new(active_file.repository_file.path()))
624 }
625
626 pub fn configured_directory(&self) -> &Path {
627 &self.configured_dir
628 }
629
630 pub fn journal_directory(&self) -> &Path {
631 &self.chain.path
632 }
633
634 pub fn machine_id(&self) -> uuid::Uuid {
635 self.chain.machine_id
636 }
637
638 pub fn boot_id(&self) -> uuid::Uuid {
639 self.boot_id
640 }
641
642 pub fn source(&self) -> &journal_registry::Source {
643 &self.chain.source
644 }
645
646 pub fn enforce_retention(&mut self) -> Result<()> {
650 let protected_file = if let Some(active_file) = &self.active_file {
651 self.chain.update_file_size(
652 &active_file.repository_file,
653 active_file.current_file_size(),
654 );
655 Some(active_file.repository_file.clone())
656 } else {
657 None
658 };
659 self.apply_retention(protected_file.as_ref())
660 }
661
662 fn update_active_file_size(&mut self) {
663 if let Some(active_file) = &self.active_file {
664 self.chain.update_file_size(
665 &active_file.repository_file,
666 active_file.current_file_size(),
667 );
668 }
669 }
670
671 fn prepare_initial_rotation(&mut self) -> Result<()> {
672 self.update_active_file_size();
673 if self.active_file.is_none() && self.config.strict_systemd_naming {
674 self.chain.archive_existing_active_file()?;
675 }
676 Ok(())
677 }
678
679 fn archive_rotated_file(&mut self, old_file: &ActiveFile) -> Result<repository::File> {
680 if !self.config.strict_systemd_naming {
681 return Ok(old_file.repository_file.clone());
682 }
683 let old_header = old_file.journal_file.journal_header_ref();
684 self.chain.archive_file(
685 &old_file.repository_file,
686 uuid::Uuid::from_bytes(old_header.seqnum_id),
687 old_header.head_entry_seqnum,
688 old_header.head_entry_realtime,
689 )
690 }
691
692 fn rotate_existing_active_file(
693 &mut self,
694 mut old_file: ActiveFile,
695 max_file_size: Option<u64>,
696 head_realtime: u64,
697 ) -> Result<(ActiveFile, LogLifecycleEvent)> {
698 use journal_core::file::JournalState;
699
700 old_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
701 old_file.journal_file.sync()?;
702 let archived = self.archive_rotated_file(&old_file)?;
703 let new_file = old_file.rotate(
704 &mut self.chain,
705 max_file_size,
706 head_realtime,
707 self.config.compression,
708 self.config.compression_threshold,
709 self.config.strict_systemd_naming,
710 self.config.live_publish_every_entries,
711 self.config.file_mode,
712 )?;
713 let active = new_file.repository_file.clone();
714 Ok((new_file, LogLifecycleEvent::Rotated { archived, active }))
715 }
716
717 fn create_initial_active_file(
718 &mut self,
719 max_file_size: Option<u64>,
720 head_realtime: u64,
721 reason: LogLifecycleReason,
722 ) -> Result<(ActiveFile, LogLifecycleEvent)> {
723 let new_file = ActiveFile::create(
724 &mut self.chain,
725 self.seqnum_id,
726 self.boot_id,
727 self.current_seqnum + 1,
728 max_file_size,
729 head_realtime,
730 self.config.compression,
731 self.config.compression_threshold,
732 self.config.compact,
733 self.config.strict_systemd_naming,
734 self.config.live_publish_every_entries,
735 self.config.file_mode,
736 )?;
737 let active = new_file.repository_file.clone();
738 Ok((new_file, LogLifecycleEvent::Created { active, reason }))
739 }
740
741 fn emit_lifecycle_event(&self, event: &LogLifecycleEvent) {
742 if let Some(observer) = &self.lifecycle_observer {
743 observer.on_event(event);
744 }
745 }
746
747 fn protected_active_file(&self) -> Option<repository::File> {
748 self.active_file
749 .as_ref()
750 .map(|active_file| active_file.repository_file.clone())
751 }
752
753 #[tracing::instrument(skip_all, fields(active_file))]
754 fn rotate(&mut self, head_realtime: u64, reason: LogLifecycleReason) -> Result<()> {
755 self.prepare_initial_rotation()?;
756 let max_file_size = self.config.rotation_policy.size_of_journal_file;
757 let (new_file, lifecycle_event) = if let Some(old_file) = self.active_file.take() {
758 self.rotate_existing_active_file(old_file, max_file_size, head_realtime)?
759 } else {
760 self.create_initial_active_file(max_file_size, head_realtime, reason)?
761 };
762
763 tracing::Span::current().record("new_file", new_file.repository_file.path());
764
765 self.active_file = Some(new_file);
766 self.rotation_state.reset();
767 self.update_active_file_size();
768 self.emit_lifecycle_event(&lifecycle_event);
769
770 let protected_file = self.protected_active_file();
773 self.apply_retention(protected_file.as_ref())?;
774
775 Ok(())
776 }
777
778 #[cfg(feature = "serde-api")]
836 #[deprecated(
837 since = "0.7.2",
838 note = "use write_structured_with_timestamps and provide an explicit entry monotonic timestamp"
839 )]
840 pub fn write_structured<T: serde::Serialize>(&mut self, value: &T) -> Result<()> {
841 self.write_structured_with_timestamps(value, EntryTimestamps::default())
842 }
843
844 #[cfg(feature = "serde-api")]
846 pub fn write_structured_with_timestamps<T: serde::Serialize>(
847 &mut self,
848 value: &T,
849 timestamps: EntryTimestamps,
850 ) -> Result<()> {
851 let json_value = serde_json::to_value(value).map_err(|e| {
853 WriterError::Serialization(format!("failed to serialize to JSON: {}", e))
854 })?;
855
856 let flattened = if let serde_json::Value::Object(map) = json_value {
858 flatten_json_map(&map)
859 } else {
860 return Err(WriterError::Serialization(
862 "value must be a JSON object, not a primitive or array".to_string(),
863 ));
864 };
865
866 let mut fields: Vec<Vec<u8>> = Vec::with_capacity(flattened.len());
868
869 for (key, value) in flattened.iter() {
870 let journal_key = key.to_uppercase().replace('.', "_");
873
874 let field = match value {
876 serde_json::Value::String(s) => {
877 format!("{}={}", journal_key, s)
878 }
879 serde_json::Value::Number(n) => {
880 format!("{}={}", journal_key, n)
881 }
882 serde_json::Value::Bool(b) => {
883 format!("{}={}", journal_key, if *b { "true" } else { "false" })
884 }
885 serde_json::Value::Null => {
886 format!("{}=", journal_key)
887 }
888 _ => {
890 format!("{}={}", journal_key, value)
891 }
892 };
893
894 fields.push(field.into_bytes());
895 }
896
897 let field_refs: Vec<&[u8]> = fields.iter().map(|f| f.as_slice()).collect();
899
900 self.write_entry_with_timestamps(&field_refs, timestamps)
901 }
902}
903
904#[cfg(all(test, feature = "serde-api"))]
905mod serde_api_tests;
906
907impl Drop for Log {
908 fn drop(&mut self) {
909 use journal_core::file::JournalState;
910
911 if let Some(ref mut active_file) = self.active_file {
912 active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
916
917 let _ = active_file.journal_file.sync();
919 }
920 }
921}