Skip to main content

journal_log_writer/log/
mod.rs

1mod chain;
2use chain::OwnedChain;
3
4mod config;
5pub use config::{Config, LogIdentityMode, LogOpenMode, RetentionPolicy, RotationPolicy};
6
7mod helpers;
8mod startup;
9use helpers::*;
10use startup::{ActiveFile, RotationState, build_startup_state};
11
12use crate::{Result, WriterError};
13use itoa::Buffer as ItoaBuffer;
14pub use journal_common::EntryTimestamps;
15use journal_common::{Microseconds, RealtimeClock};
16use journal_core::error::JournalError;
17use journal_core::file::mmap::MmapMut;
18use journal_core::file::{
19    Compression, EntryField, EntryWriteOptions, FieldNamePolicy, JournalFile, JournalFileOptions,
20    JournalWriter, StructuredField,
21};
22use journal_registry::repository;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25
/// Largest number of entry fields assembled in a fixed-size stack array; the
/// write paths fall back to a heap-allocated `Vec` above this count.
const STACK_ENTRY_REF_LIMIT: usize = 128;
/// Byte prefix of the source realtime field; the decimal microsecond value is
/// appended directly after the `=`.
const SOURCE_REALTIME_PREFIX: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP=";
// NOTE(review): the four constants below are not referenced in the visible
// part of this file; presumably the `helpers`/`startup` submodules use them
// for rotation-size derivation — confirm before changing or removing.
const DERIVED_ROTATION_FRACTION: u64 = 20;
const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
const PAGE_SIZE: u64 = 4096;
const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
32
/// Journal log writer for one configured directory.
///
/// Owns the file chain, the currently active journal file (if any), and the
/// bookkeeping needed for rotation, retention, sequence numbers, and entry
/// timestamps.
pub struct Log {
    // Directory path handed to the constructor (see `configured_directory`).
    configured_dir: PathBuf,
    // Tracked journal files; used for archiving, size accounting, retention.
    chain: OwnedChain,
    // Effective configuration as returned by startup.
    config: Config,
    // Journal file currently receiving entries; `None` until one is opened.
    active_file: Option<ActiveFile>,
    // Consulted before each append, updated after each write, reset on rotation.
    rotation_state: RotationState,
    boot_id: uuid::Uuid,
    // Passed to `ActiveFile::create` when the first active file is created.
    seqnum_id: uuid::Uuid,
    // Incremented after every written entry; a new file starts at `+ 1`.
    current_seqnum: u64,
    // Realtime clock used to produce and clamp entry realtime timestamps.
    clock: RealtimeClock,
    // Last monotonic timestamp handed out; the next one is strictly greater.
    last_monotonic_usec: u64,
    // Optional receiver of `LogLifecycleEvent`s.
    lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
    // Optional hook that reports per-file sizes before retention runs.
    artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
    // Set once the retention pass tied to the first available active file ran.
    retention_on_open_applied: bool,
    // Pre-rendered `_BOOT_ID=<uuid>` bytes prepended to every entry.
    boot_id_field: Vec<u8>,
    // Reusable buffer for the `_SOURCE_REALTIME_TIMESTAMP=` field.
    source_realtime_field: Vec<u8>,
}
51
/// Why a lifecycle action on the log was triggered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLifecycleReason {
    /// An append found no active file and had to open one.
    Append,
    /// The constructor opened the active file because of `LogOpenMode::Eager`.
    EagerOpen,
    /// An append found an active file that had to be rotated.
    Rotation,
    /// NOTE(review): not produced anywhere in the visible part of this file.
    Retention,
}
59
/// Event delivered to a [`LogLifecycleObserver`].
#[derive(Debug, Clone)]
pub enum LogLifecycleEvent {
    /// A first active file was created (no previous active file existed).
    Created {
        active: repository::File,
        reason: LogLifecycleReason,
    },
    /// The previous active file was archived and replaced by a new one.
    Rotated {
        archived: repository::File,
        active: repository::File,
    },
    /// Retention deleted the listed files; only emitted when the list is
    /// non-empty.
    RetainedDeleted {
        files: Vec<repository::File>,
    },
}
74
/// Receiver of log lifecycle notifications (file creation, rotation, and
/// retention deletions).
pub trait LogLifecycleObserver: Send + Sync {
    /// Called synchronously from the writer for each lifecycle event.
    fn on_event(&self, event: &LogLifecycleEvent);
}
78
/// Hook that reports the size to account for a journal file when retention
/// refreshes its tracked sizes.
pub trait LogArtifactSizer: Send + Sync {
    /// Returns the size attributed to the journal at `journal_path`.
    /// An error aborts the retention pass that requested it.
    fn journal_artifact_size(&self, journal_path: &Path) -> Result<u64>;
}
82
83impl Log {
/// Converts `duration` to whole microseconds, saturating at `u64::MAX` when
/// the value does not fit.
fn duration_to_micros(duration: std::time::Duration) -> u64 {
    u64::try_from(duration.as_micros()).unwrap_or(u64::MAX)
}
87
88    fn peek_entry_realtime(&self, timestamps: &EntryTimestamps) -> u64 {
89        let candidate = timestamps
90            .entry_realtime_usec
91            .unwrap_or_else(|| Microseconds::now().get());
92        let last_seen = self.clock.last_seen().get();
93        if candidate > last_seen {
94            candidate
95        } else {
96            last_seen.saturating_add(1)
97        }
98    }
99
100    fn should_rotate_for_realtime(&self, realtime: u64) -> bool {
101        let Some(active_file) = &self.active_file else {
102            return true;
103        };
104        if self.rotation_state.should_rotate() {
105            return true;
106        }
107        let Some(max_duration) = self.config.rotation_policy.duration_of_journal_file else {
108            return false;
109        };
110        let header = active_file.journal_file.journal_header_ref();
111        header.n_entries > 0
112            && header.head_entry_realtime > 0
113            && realtime.saturating_sub(header.head_entry_realtime)
114                >= Self::duration_to_micros(max_duration)
115    }
116
117    fn append_rotation_reason(&self) -> LogLifecycleReason {
118        if self.active_file.is_none() {
119            LogLifecycleReason::Append
120        } else {
121            LogLifecycleReason::Rotation
122        }
123    }
124
125    fn prepare_append_for_realtime(&mut self, entry_realtime: u64) -> Result<()> {
126        self.apply_retention_on_open()?;
127        let opened_first_active = self.active_file.is_none();
128        if self.should_rotate_for_realtime(entry_realtime) {
129            self.rotate(entry_realtime, self.append_rotation_reason())?;
130            if opened_first_active {
131                self.retention_on_open_applied = true;
132            }
133        }
134        self.apply_retention_on_open()
135    }
136
137    fn raw_items_for_policy<'a>(&self, items: &'a [&'a [u8]]) -> Result<Option<Vec<&'a [u8]>>> {
138        if self.config.field_name_policy != FieldNamePolicy::JournalApp {
139            return Ok(None);
140        }
141        let filtered_items = filter_raw_items_for_journal_app(items)?;
142        if filtered_items.is_empty() {
143            return Err(WriterError::EmptyEntry);
144        }
145        Ok(Some(filtered_items))
146    }
147
148    fn structured_fields_for_policy<'a>(
149        &self,
150        fields: &'a [StructuredField<'a>],
151    ) -> Result<Option<Vec<StructuredField<'a>>>> {
152        if self.config.field_name_policy != FieldNamePolicy::JournalApp {
153            return Ok(None);
154        }
155        let filtered_fields = filter_structured_fields_for_journal_app(fields);
156        if filtered_fields.is_empty() {
157            return Err(WriterError::EmptyEntry);
158        }
159        Ok(Some(filtered_fields))
160    }
161
162    fn apply_retention(&mut self, protected_file: Option<&repository::File>) -> Result<()> {
163        if let Some(sizer) = &self.artifact_sizer {
164            self.chain.refresh_retained_sizes(|file| {
165                sizer.journal_artifact_size(Path::new(file.path()))
166            })?;
167        }
168        let retention = self
169            .chain
170            .retain(&self.config.retention_policy, protected_file);
171        let deleted_files = retention.deleted_files;
172        if !deleted_files.is_empty()
173            && let Some(observer) = &self.lifecycle_observer
174        {
175            observer.on_event(&LogLifecycleEvent::RetainedDeleted {
176                files: deleted_files,
177            });
178        }
179        if let Some(error) = retention.error {
180            return Err(error);
181        }
182
183        Ok(())
184    }
185
186    fn apply_retention_on_open(&mut self) -> Result<()> {
187        if self.retention_on_open_applied || self.active_file.is_none() {
188            return Ok(());
189        }
190        self.enforce_retention()?;
191        self.retention_on_open_applied = true;
192        Ok(())
193    }
194
195    /// Captures both realtime and monotonic timestamps, similar to systemd's dual_timestamp_now().
196    ///
197    /// Returns (realtime_usec, monotonic_usec) where:
198    /// - realtime: microseconds since Unix epoch (CLOCK_REALTIME), monotonically increasing
199    /// - monotonic: microseconds since boot (CLOCK_MONOTONIC)
200    fn capture_dual_timestamp(
201        &mut self,
202        timestamp_override: Option<&EntryTimestamps>,
203    ) -> Result<(u64, u64)> {
204        let realtime = match timestamp_override.and_then(|ts| ts.entry_realtime_usec) {
205            Some(ts) => self.clock.observe(Microseconds::new(ts)).get(),
206            None => self.clock.now().get(),
207        };
208
209        let desired_monotonic = timestamp_override
210            .and_then(|ts| ts.entry_monotonic_usec)
211            .ok_or_else(|| {
212                WriterError::InvalidConfig("entry monotonic timestamp is required".to_string())
213            })?;
214
215        let monotonic = if desired_monotonic > self.last_monotonic_usec {
216            desired_monotonic
217        } else {
218            self.last_monotonic_usec.saturating_add(1)
219        };
220        self.last_monotonic_usec = monotonic;
221
222        Ok((realtime, monotonic))
223    }
224
225    fn require_entry_monotonic(timestamps: &EntryTimestamps) -> Result<()> {
226        if timestamps.entry_monotonic_usec.is_none() {
227            return Err(WriterError::InvalidConfig(
228                "entry monotonic timestamp is required".to_string(),
229            ));
230        }
231        Ok(())
232    }
233
/// Creates a new journal log rooted at `path` with no lifecycle observer and
/// no artifact sizer.
///
/// # Errors
///
/// Propagates any error from startup, from eagerly opening the active file,
/// or from the initial retention pass.
pub fn new(path: &Path, config: Config) -> Result<Self> {
    Self::new_inner(path, config, None, None)
}
238
/// Creates a new journal log with `observer` installed from the start, so
/// events emitted during construction (eager open, initial retention) are
/// delivered to it.
pub fn new_with_lifecycle_observer(
    path: &Path,
    config: Config,
    observer: Arc<dyn LogLifecycleObserver>,
) -> Result<Self> {
    Self::new_inner(path, config, Some(observer), None)
}
246
/// Creates a new journal log with an optional lifecycle observer and an
/// optional artifact sizer, both active during construction.
pub fn new_with_hooks(
    path: &Path,
    config: Config,
    observer: Option<Arc<dyn LogLifecycleObserver>>,
    artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
) -> Result<Self> {
    Self::new_inner(path, config, observer, artifact_sizer)
}
255
256    fn new_inner(
257        path: &Path,
258        config: Config,
259        lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
260        artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
261    ) -> Result<Self> {
262        let startup = build_startup_state(path, config)?;
263
264        let mut log = Log {
265            configured_dir: path.to_path_buf(),
266            chain: startup.chain,
267            config: startup.config,
268            active_file: startup.active_file,
269            rotation_state: startup.rotation_state,
270            boot_id: startup.boot_id,
271            seqnum_id: startup.seqnum_id,
272            current_seqnum: startup.current_seqnum,
273            clock: startup.clock,
274            last_monotonic_usec: startup.last_monotonic_usec,
275            lifecycle_observer,
276            artifact_sizer,
277            retention_on_open_applied: false,
278            boot_id_field: format!("_BOOT_ID={}", startup.boot_id.as_simple()).into_bytes(),
279            source_realtime_field: Vec::with_capacity(SOURCE_REALTIME_PREFIX.len() + 20),
280        };
281        if log.config.open_mode == LogOpenMode::Eager && log.active_file.is_none() {
282            let realtime = log.peek_entry_realtime(&EntryTimestamps::default());
283            log.rotate(realtime, LogLifecycleReason::EagerOpen)?;
284            log.retention_on_open_applied = true;
285        }
286        log.apply_retention_on_open()?;
287        Ok(log)
288    }
289
/// Installs (or replaces) the lifecycle observer on an already constructed
/// log. Events emitted during construction are not replayed to it.
pub fn with_lifecycle_observer(mut self, observer: Arc<dyn LogLifecycleObserver>) -> Self {
    self.lifecycle_observer = Some(observer);
    self
}
294
/// Installs (or replaces) the artifact sizer consulted by later retention
/// passes.
pub fn with_artifact_sizer(mut self, sizer: Arc<dyn LogArtifactSizer>) -> Self {
    self.artifact_sizer = Some(sizer);
    self
}
299
300    /// Writes a journal entry.
301    ///
302    /// This compatibility method always returns an error under the strict
303    /// writer contract. Use [`Log::write_entry_with_timestamps`] and provide
304    /// an explicit entry monotonic timestamp.
305    #[deprecated(
306        since = "0.7.2",
307        note = "use write_entry_with_timestamps and provide an explicit entry monotonic timestamp"
308    )]
309    pub fn write_entry(
310        &mut self,
311        items: &[&[u8]],
312        source_realtime_usec: Option<u64>,
313    ) -> Result<()> {
314        self.write_entry_with_timestamps(
315            items,
316            EntryTimestamps {
317                source_realtime_usec,
318                ..EntryTimestamps::default()
319            },
320        )
321    }
322
323    /// Writes a journal entry with optional source and entry timestamp overrides.
324    ///
325    /// Overrides are safe by construction:
326    /// - entry realtime is clamped to strict monotonic progression (`last + 1us` floor)
327    /// - entry monotonic is also clamped to strict monotonic progression (`last + 1us` floor)
328    pub fn write_entry_with_timestamps(
329        &mut self,
330        items: &[&[u8]],
331        timestamps: EntryTimestamps,
332    ) -> Result<()> {
333        if items.is_empty() {
334            return Err(WriterError::EmptyEntry);
335        }
336        Self::require_entry_monotonic(&timestamps)?;
337
338        let entry_realtime = self.peek_entry_realtime(&timestamps);
339        self.prepare_append_for_realtime(entry_realtime)?;
340        let filtered_items = self.raw_items_for_policy(items)?;
341        let write_items = filtered_items.as_deref().unwrap_or(items);
342
343        let (realtime, monotonic) = self.capture_dual_timestamp(Some(&timestamps))?;
344        self.write_raw_entry_fields(
345            write_items,
346            timestamps.source_realtime_usec,
347            realtime,
348            monotonic,
349            self.low_level_entry_options(EntryWriteOptions::default()),
350        )?;
351
352        let active_file = self.active_file.as_ref().unwrap();
353        self.rotation_state.update(&active_file.writer);
354        self.current_seqnum += 1;
355
356        Ok(())
357    }
358
359    /// Writes a journal entry from structured field names and binary-safe values.
360    ///
361    /// This is the preferred path when the producer already has split field
362    /// names and values. If `source_realtime_usec` is provided, a
363    /// `_SOURCE_REALTIME_TIMESTAMP` field is added.
364    ///
365    /// This compatibility method always returns an error under the strict
366    /// writer contract. Use [`Log::write_fields_with_timestamps`] and provide
367    /// an explicit entry monotonic timestamp.
368    #[deprecated(
369        since = "0.7.2",
370        note = "use write_fields_with_timestamps and provide an explicit entry monotonic timestamp"
371    )]
372    pub fn write_fields(
373        &mut self,
374        fields: &[StructuredField<'_>],
375        source_realtime_usec: Option<u64>,
376    ) -> Result<()> {
377        self.write_fields_with_timestamps(
378            fields,
379            EntryTimestamps {
380                source_realtime_usec,
381                ..EntryTimestamps::default()
382            },
383        )
384    }
385
/// Writes structured fields with optional source and entry timestamp overrides.
///
/// Entry monotonic timestamp is required. Entry realtime and monotonic
/// overrides use the same clamping rules as
/// [`Log::write_entry_with_timestamps`].
///
/// Equivalent to [`Log::write_fields_with_options`] with
/// `EntryWriteOptions::default()`.
pub fn write_fields_with_timestamps(
    &mut self,
    fields: &[StructuredField<'_>],
    timestamps: EntryTimestamps,
) -> Result<()> {
    self.write_fields_with_options(fields, timestamps, EntryWriteOptions::default())
}
398
399    /// Writes structured fields with explicit low-level entry write options.
400    ///
401    /// Use this only when the caller can satisfy any invariants required by the
402    /// selected [`EntryWriteOptions`], especially no duplicate full `KEY=value`
403    /// payloads when `trusted_unique_payloads` is enabled.
404    pub fn write_fields_with_options(
405        &mut self,
406        fields: &[StructuredField<'_>],
407        timestamps: EntryTimestamps,
408        options: EntryWriteOptions,
409    ) -> Result<()> {
410        if fields.is_empty() {
411            return Err(WriterError::EmptyEntry);
412        }
413        Self::require_entry_monotonic(&timestamps)?;
414
415        let entry_realtime = self.peek_entry_realtime(&timestamps);
416        self.prepare_append_for_realtime(entry_realtime)?;
417        let filtered_fields = self.structured_fields_for_policy(fields)?;
418        let write_fields = filtered_fields.as_deref().unwrap_or(fields);
419
420        let (realtime, monotonic) = self.capture_dual_timestamp(Some(&timestamps))?;
421        self.write_structured_entry_fields(
422            write_fields,
423            timestamps.source_realtime_usec,
424            realtime,
425            monotonic,
426            self.low_level_entry_options(options),
427        )?;
428
429        let active_file = self.active_file.as_ref().unwrap();
430        self.rotation_state.update(&active_file.writer);
431        self.current_seqnum += 1;
432
433        Ok(())
434    }
435
436    fn write_raw_entry_fields(
437        &mut self,
438        items: &[&[u8]],
439        source_realtime_usec: Option<u64>,
440        realtime: u64,
441        monotonic: u64,
442        options: EntryWriteOptions,
443    ) -> Result<()> {
444        let source_field = if let Some(timestamp_usec) = source_realtime_usec {
445            self.prepare_source_realtime_field(timestamp_usec);
446            Some(self.source_realtime_field.as_slice())
447        } else {
448            None
449        };
450
451        let total_items = items.len() + 1 + usize::from(source_field.is_some());
452        if total_items <= STACK_ENTRY_REF_LIMIT {
453            let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
454            let mut len = 0usize;
455            refs[len] = EntryField::raw(self.boot_id_field.as_slice());
456            len += 1;
457            if let Some(source_field) = source_field {
458                refs[len] = EntryField::raw(source_field);
459                len += 1;
460            }
461            for item in items {
462                refs[len] = EntryField::raw(item);
463                len += 1;
464            }
465            self.active_file.as_mut().unwrap().write_entry_fields(
466                refs[..len].iter().copied(),
467                realtime,
468                monotonic,
469                options,
470            )?;
471        } else {
472            let mut refs = Vec::with_capacity(total_items);
473            refs.push(EntryField::raw(self.boot_id_field.as_slice()));
474            if let Some(source_field) = source_field {
475                refs.push(EntryField::raw(source_field));
476            }
477            refs.extend(items.iter().copied().map(EntryField::raw));
478            self.active_file.as_mut().unwrap().write_entry_fields(
479                refs.iter().copied(),
480                realtime,
481                monotonic,
482                options,
483            )?;
484        }
485
486        Ok(())
487    }
488
489    fn write_structured_entry_fields(
490        &mut self,
491        fields: &[StructuredField<'_>],
492        source_realtime_usec: Option<u64>,
493        realtime: u64,
494        monotonic: u64,
495        options: EntryWriteOptions,
496    ) -> Result<()> {
497        let source_field = if let Some(timestamp_usec) = source_realtime_usec {
498            self.prepare_source_realtime_field(timestamp_usec);
499            Some(self.source_realtime_field.as_slice())
500        } else {
501            None
502        };
503
504        let total_items = fields.len() + 1 + usize::from(source_field.is_some());
505        if total_items <= STACK_ENTRY_REF_LIMIT {
506            let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
507            let mut len = 0usize;
508            refs[len] = EntryField::raw(self.boot_id_field.as_slice());
509            len += 1;
510            if let Some(source_field) = source_field {
511                refs[len] = EntryField::raw(source_field);
512                len += 1;
513            }
514            for field in fields {
515                refs[len] = EntryField::Structured(*field);
516                len += 1;
517            }
518            self.active_file.as_mut().unwrap().write_entry_fields(
519                refs[..len].iter().copied(),
520                realtime,
521                monotonic,
522                options,
523            )?;
524        } else {
525            let mut refs = Vec::with_capacity(total_items);
526            refs.push(EntryField::raw(self.boot_id_field.as_slice()));
527            if let Some(source_field) = source_field {
528                refs.push(EntryField::raw(source_field));
529            }
530            refs.extend(fields.iter().copied().map(EntryField::Structured));
531            self.active_file.as_mut().unwrap().write_entry_fields(
532                refs.iter().copied(),
533                realtime,
534                monotonic,
535                options,
536            )?;
537        }
538
539        Ok(())
540    }
541
/// Returns `options` with its field-name policy set from the log's configured
/// policy (translated by `log_writer_field_name_policy`), overriding whatever
/// policy the caller's options carried.
fn low_level_entry_options(&self, options: EntryWriteOptions) -> EntryWriteOptions {
    options.field_name_policy(log_writer_field_name_policy(self.config.field_name_policy))
}
545
546    fn prepare_source_realtime_field(&mut self, timestamp_usec: u64) {
547        self.source_realtime_field.clear();
548        self.source_realtime_field
549            .extend_from_slice(SOURCE_REALTIME_PREFIX);
550        let mut buffer = ItoaBuffer::new();
551        self.source_realtime_field
552            .extend_from_slice(buffer.format(timestamp_usec).as_bytes());
553    }
554
555    /// Syncs all written data to disk, ensuring durability.
556    ///
557    /// This should be called after writing a batch of log entries to ensure
558    /// they are persisted to disk before acknowledging the request.
559    pub fn sync(&mut self) -> Result<()> {
560        if let Some(active_file) = &mut self.active_file {
561            active_file.journal_file.sync()?;
562        }
563        Ok(())
564    }
565
566    /// Archives and closes the active file.
567    ///
568    /// In strict systemd naming mode this renames `<source>.journal` to the
569    /// chain filename before retention, matching the explicit close behavior of
570    /// the other SDK implementations. `Drop` remains best-effort for callers
571    /// that do not explicitly close.
572    pub fn close(mut self) -> Result<()> {
573        use journal_core::file::JournalState;
574
575        let Some(mut active_file) = self.active_file.take() else {
576            return Ok(());
577        };
578
579        let n_entries = active_file.journal_file.journal_header_ref().n_entries;
580        if self.config.strict_systemd_naming && n_entries == 0 {
581            match std::fs::remove_file(active_file.repository_file.path()) {
582                Ok(()) => {}
583                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
584                Err(err) => return Err(err.into()),
585            }
586            self.chain.remove_tracked_file(&active_file.repository_file);
587            return Ok(());
588        }
589
590        self.chain.update_file_size(
591            &active_file.repository_file,
592            active_file.current_file_size(),
593        );
594        active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
595        active_file.journal_file.sync()?;
596
597        let protected_file = if self.config.strict_systemd_naming {
598            let header = active_file.journal_file.journal_header_ref();
599            self.chain.archive_file(
600                &active_file.repository_file,
601                uuid::Uuid::from_bytes(header.seqnum_id),
602                header.head_entry_seqnum,
603                header.head_entry_realtime,
604            )?
605        } else {
606            active_file.repository_file.clone()
607        };
608
609        self.apply_retention(Some(&protected_file))?;
610
611        Ok(())
612    }
613
/// Returns the repository entry of the currently open journal file, or
/// `None` when no file is open.
pub fn active_file(&self) -> Option<&repository::File> {
    self.active_file
        .as_ref()
        .map(|active_file| &active_file.repository_file)
}
619
/// Returns the filesystem path of the currently open journal file, or `None`
/// when no file is open.
pub fn active_path(&self) -> Option<&Path> {
    self.active_file
        .as_ref()
        .map(|active_file| Path::new(active_file.repository_file.path()))
}
625
/// Returns the directory path that was passed to the constructor.
pub fn configured_directory(&self) -> &Path {
    &self.configured_dir
}
629
/// Returns the directory tracked by the file chain.
// NOTE(review): presumably the resolved directory where journal files live,
// which may differ from `configured_directory` — confirm against `startup`.
pub fn journal_directory(&self) -> &Path {
    &self.chain.path
}
633
/// Returns the machine ID recorded on the file chain.
pub fn machine_id(&self) -> uuid::Uuid {
    self.chain.machine_id
}
637
/// Returns the boot ID this writer stamps on entries via the `_BOOT_ID`
/// field.
pub fn boot_id(&self) -> uuid::Uuid {
    self.boot_id
}
641
/// Returns the journal source recorded on the file chain.
pub fn source(&self) -> &journal_registry::Source {
    &self.chain.source
}
645
646    /// Applies the configured retention policy without requiring a rotation or
647    /// close. The current active file is counted in retention envelopes and is
648    /// protected from deletion.
649    pub fn enforce_retention(&mut self) -> Result<()> {
650        let protected_file = if let Some(active_file) = &self.active_file {
651            self.chain.update_file_size(
652                &active_file.repository_file,
653                active_file.current_file_size(),
654            );
655            Some(active_file.repository_file.clone())
656        } else {
657            None
658        };
659        self.apply_retention(protected_file.as_ref())
660    }
661
662    fn update_active_file_size(&mut self) {
663        if let Some(active_file) = &self.active_file {
664            self.chain.update_file_size(
665                &active_file.repository_file,
666                active_file.current_file_size(),
667            );
668        }
669    }
670
671    fn prepare_initial_rotation(&mut self) -> Result<()> {
672        self.update_active_file_size();
673        if self.active_file.is_none() && self.config.strict_systemd_naming {
674            self.chain.archive_existing_active_file()?;
675        }
676        Ok(())
677    }
678
679    fn archive_rotated_file(&mut self, old_file: &ActiveFile) -> Result<repository::File> {
680        if !self.config.strict_systemd_naming {
681            return Ok(old_file.repository_file.clone());
682        }
683        let old_header = old_file.journal_file.journal_header_ref();
684        self.chain.archive_file(
685            &old_file.repository_file,
686            uuid::Uuid::from_bytes(old_header.seqnum_id),
687            old_header.head_entry_seqnum,
688            old_header.head_entry_realtime,
689        )
690    }
691
692    fn rotate_existing_active_file(
693        &mut self,
694        mut old_file: ActiveFile,
695        max_file_size: Option<u64>,
696        head_realtime: u64,
697    ) -> Result<(ActiveFile, LogLifecycleEvent)> {
698        use journal_core::file::JournalState;
699
700        old_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
701        old_file.journal_file.sync()?;
702        let archived = self.archive_rotated_file(&old_file)?;
703        let new_file = old_file.rotate(
704            &mut self.chain,
705            max_file_size,
706            head_realtime,
707            self.config.compression,
708            self.config.compression_threshold,
709            self.config.strict_systemd_naming,
710            self.config.live_publish_every_entries,
711            self.config.file_mode,
712        )?;
713        let active = new_file.repository_file.clone();
714        Ok((new_file, LogLifecycleEvent::Rotated { archived, active }))
715    }
716
717    fn create_initial_active_file(
718        &mut self,
719        max_file_size: Option<u64>,
720        head_realtime: u64,
721        reason: LogLifecycleReason,
722    ) -> Result<(ActiveFile, LogLifecycleEvent)> {
723        let new_file = ActiveFile::create(
724            &mut self.chain,
725            self.seqnum_id,
726            self.boot_id,
727            self.current_seqnum + 1,
728            max_file_size,
729            head_realtime,
730            self.config.compression,
731            self.config.compression_threshold,
732            self.config.compact,
733            self.config.strict_systemd_naming,
734            self.config.live_publish_every_entries,
735            self.config.file_mode,
736        )?;
737        let active = new_file.repository_file.clone();
738        Ok((new_file, LogLifecycleEvent::Created { active, reason }))
739    }
740
741    fn emit_lifecycle_event(&self, event: &LogLifecycleEvent) {
742        if let Some(observer) = &self.lifecycle_observer {
743            observer.on_event(event);
744        }
745    }
746
/// Returns an owned copy of the active file's repository entry, used to
/// shield it from deletion during retention; `None` when no file is open.
fn protected_active_file(&self) -> Option<repository::File> {
    self.active_file
        .as_ref()
        .map(|active_file| active_file.repository_file.clone())
}
752
753    #[tracing::instrument(skip_all, fields(active_file))]
754    fn rotate(&mut self, head_realtime: u64, reason: LogLifecycleReason) -> Result<()> {
755        self.prepare_initial_rotation()?;
756        let max_file_size = self.config.rotation_policy.size_of_journal_file;
757        let (new_file, lifecycle_event) = if let Some(old_file) = self.active_file.take() {
758            self.rotate_existing_active_file(old_file, max_file_size, head_realtime)?
759        } else {
760            self.create_initial_active_file(max_file_size, head_realtime, reason)?
761        };
762
763        tracing::Span::current().record("new_file", new_file.repository_file.path());
764
765        self.active_file = Some(new_file);
766        self.rotation_state.reset();
767        self.update_active_file_size();
768        self.emit_lifecycle_event(&lifecycle_event);
769
770        // Retention runs after the post-rotation current file is known, so the
771        // tracked current file counts in the envelope and is never deleted.
772        let protected_file = self.protected_active_file();
773        self.apply_retention(protected_file.as_ref())?;
774
775        Ok(())
776    }
777
/// Deprecated compatibility wrapper for writing a serializable value.
///
/// This method forwards to [`Log::write_structured_with_timestamps`] with
/// `EntryTimestamps::default()`. As with the other deprecated wrappers, no
/// entry monotonic timestamp is supplied that way, so the call is expected to
/// return an error under the strict writer contract; call
/// [`Log::write_structured_with_timestamps`] directly instead.
///
/// The target method serializes the value to JSON, flattens it, and writes it to the journal.
/// The flattened structure converts nested JSON into KEY=VALUE pairs suitable for journal entries.
///
/// # Example
///
/// The example uses the non-deprecated method with explicit timestamps.
///
/// ```no_run
/// use serde::Serialize;
/// use journal_log_writer::{Config, EntryTimestamps, Log, RotationPolicy, RetentionPolicy};
/// use journal_registry::Origin;
/// use std::path::Path;
///
/// #[derive(Serialize)]
/// struct LogEntry {
///     message: String,
///     level: String,
///     user: User,
/// }
///
/// #[derive(Serialize)]
/// struct User {
///     id: u64,
///     name: String,
/// }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let origin = Origin {
///     machine_id: Some("00112233445566778899aabbccddeeff".parse()?),
///     namespace: None,
///     source: journal_registry::Source::System,
/// };
/// let config = Config::new(origin, RotationPolicy::default(), RetentionPolicy::default())
///     .with_boot_id("ffeeddccbbaa99887766554433221100".parse()?);
/// let mut log = Log::new(Path::new("/tmp/test-journal"), config)?;
///
/// let entry = LogEntry {
///     message: "User logged in".to_string(),
///     level: "INFO".to_string(),
///     user: User {
///         id: 42,
///         name: "alice".to_string(),
///     },
/// };
///
/// // This will write fields like:
/// // MESSAGE=User logged in
/// // LEVEL=INFO
/// // USER_ID=42
/// // USER_NAME=alice
/// let timestamps = EntryTimestamps::default()
///     .with_entry_realtime_usec(1_700_000_000_000_000)
///     .with_entry_monotonic_usec(1);
/// log.write_structured_with_timestamps(&entry, timestamps)?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "serde-api")]
#[deprecated(
    since = "0.7.2",
    note = "use write_structured_with_timestamps and provide an explicit entry monotonic timestamp"
)]
pub fn write_structured<T: serde::Serialize>(&mut self, value: &T) -> Result<()> {
    self.write_structured_with_timestamps(value, EntryTimestamps::default())
}
843
/// Serializes `value` to JSON, flattens it, and writes the result as one
/// journal entry with the given timestamps.
///
/// Each flattened key is upper-cased and has `.` replaced by `_`; strings are
/// written verbatim, numbers and booleans in their textual form, and `null`
/// as an empty value.
///
/// # Errors
///
/// `WriterError::Serialization` when serialization fails or the value is not
/// a JSON object, plus every error of [`Log::write_entry_with_timestamps`].
#[cfg(feature = "serde-api")]
pub fn write_structured_with_timestamps<T: serde::Serialize>(
    &mut self,
    value: &T,
    timestamps: EntryTimestamps,
) -> Result<()> {
    let json_value = serde_json::to_value(value).map_err(|e| {
        WriterError::Serialization(format!("failed to serialize to JSON: {}", e))
    })?;

    // Only objects can be flattened into named fields.
    let serde_json::Value::Object(map) = json_value else {
        return Err(WriterError::Serialization(
            "value must be a JSON object, not a primitive or array".to_string(),
        ));
    };
    let flattened = flatten_json_map(&map);

    // Render every flattened pair as an owned `KEY=VALUE` byte string.
    let mut fields: Vec<Vec<u8>> = Vec::with_capacity(flattened.len());
    fields.extend(flattened.iter().map(|(key, value)| {
        // Journal convention: upper-case names with underscores.
        let journal_key = key.to_uppercase().replace('.', "_");
        let rendered = match value {
            serde_json::Value::String(s) => format!("{}={}", journal_key, s),
            serde_json::Value::Number(n) => format!("{}={}", journal_key, n),
            serde_json::Value::Bool(b) => format!("{}={}", journal_key, b),
            serde_json::Value::Null => format!("{}=", journal_key),
            // Arrays and objects should be flattened already, but just in case.
            other => format!("{}={}", journal_key, other),
        };
        rendered.into_bytes()
    }));

    let field_refs: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
    self.write_entry_with_timestamps(&field_refs, timestamps)
}
902}
903
904#[cfg(all(test, feature = "serde-api"))]
905mod serde_api_tests;
906
907impl Drop for Log {
908    fn drop(&mut self) {
909        use journal_core::file::JournalState;
910
911        if let Some(ref mut active_file) = self.active_file {
912            // Keep the active path stable on close so file-backed readers that
913            // already follow system.journal can finish. The next writer startup
914            // archives this stale active file before creating a fresh one.
915            active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
916
917            // Best/Last-effort sync just to be on the cautious side.
918            let _ = active_file.journal_file.sync();
919        }
920    }
921}