Skip to main content

journal_log_writer/log/
mod.rs

1mod chain;
2use chain::OwnedChain;
3
4mod config;
5pub use config::{Config, LogIdentityMode, LogOpenMode, RetentionPolicy, RotationPolicy};
6
7mod helpers;
8mod startup;
9use helpers::*;
10use startup::{ActiveFile, RotationState, build_startup_state};
11
12use crate::{Result, WriterError};
13use itoa::Buffer as ItoaBuffer;
14use journal_common::{Microseconds, RealtimeClock, monotonic_now};
15use journal_core::error::JournalError;
16use journal_core::file::mmap::MmapMut;
17use journal_core::file::{
18    Compression, EntryField, EntryWriteOptions, FieldNamePolicy, JournalFile, JournalFileOptions,
19    JournalWriter, StructuredField,
20};
21use journal_registry::repository;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24
/// Largest total field count (caller fields plus the injected ones) that is
/// staged in a stack array; bigger entries fall back to a heap `Vec`.
const STACK_ENTRY_REF_LIMIT: usize = 128;
/// `KEY=` prefix of the injected source-timestamp field.
const SOURCE_REALTIME_PREFIX: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP=";
// The constants below are not referenced in this part of the file.
// NOTE(review): presumably consumed by the `helpers` / `startup` submodules — confirm.
const DERIVED_ROTATION_FRACTION: u64 = 20;
const JOURNAL_FILE_SIZE_MIN: u64 = 512 * 1024;
const PAGE_SIZE: u64 = 4096;
const JOURNAL_COMPACT_SIZE_MAX: u64 = u32::MAX as u64;
31
/// Journal log writer.
///
/// Owns the chain of journal files, the currently active file, and the
/// bookkeeping needed to append entries, rotate files, and apply retention.
pub struct Log {
    /// Directory path exactly as passed to the constructor.
    configured_dir: PathBuf,
    /// Tracked journal files; used for archiving, size tracking and retention.
    chain: OwnedChain,
    /// Configuration as returned by startup.
    config: Config,
    /// File currently being written, if one has been opened or created.
    active_file: Option<ActiveFile>,
    /// Reset after every rotation and updated after every successful append.
    rotation_state: RotationState,
    /// Boot id; also rendered into `boot_id_field`.
    boot_id: uuid::Uuid,
    /// Sequence-number id handed to newly created files.
    seqnum_id: uuid::Uuid,
    /// Incremented after every successful append.
    current_seqnum: u64,
    /// Source of entry realtime timestamps.
    clock: RealtimeClock,
    /// Last monotonic timestamp handed out; the next one is strictly greater
    /// (saturating at `u64::MAX`).
    last_monotonic_usec: u64,
    /// Optional receiver of lifecycle events.
    lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
    /// Optional hook used to refresh retained file sizes before retention.
    artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
    /// Whether the one-time retention pass tied to the first active file has run.
    retention_on_open_applied: bool,
    /// Pre-rendered `_BOOT_ID=<id>` bytes prepended to every entry.
    boot_id_field: Vec<u8>,
    /// Reusable buffer for the `_SOURCE_REALTIME_TIMESTAMP=<usec>` field.
    source_realtime_field: Vec<u8>,
}
50
/// Why a lifecycle action happened; carried by [`LogLifecycleEvent::Created`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogLifecycleReason {
    /// An append found no active file and had to create one.
    Append,
    /// The log was constructed with `LogOpenMode::Eager` and had no active file.
    EagerOpen,
    /// An append rotated away from an existing active file.
    Rotation,
    /// Not produced in this part of the file.
    /// NOTE(review): presumably reserved for retention-driven actions — confirm.
    Retention,
}
58
/// Notification delivered to a [`LogLifecycleObserver`].
#[derive(Debug, Clone)]
pub enum LogLifecycleEvent {
    /// A new active file was created while no previous active file existed.
    Created {
        /// The newly created active file.
        active: repository::File,
        /// What triggered the creation.
        reason: LogLifecycleReason,
    },
    /// The previous active file was archived and replaced by a new one.
    Rotated {
        /// The file that was just archived.
        archived: repository::File,
        /// The new active file.
        active: repository::File,
    },
    /// Retention removed one or more files; only emitted for a non-empty list.
    RetainedDeleted {
        /// The deleted files.
        files: Vec<repository::File>,
    },
}
73
/// Receiver of [`LogLifecycleEvent`]s emitted by [`Log`].
///
/// Implementations are shared behind an `Arc`, hence the `Send + Sync` bound.
pub trait LogLifecycleObserver: Send + Sync {
    /// Called synchronously by the log each time a lifecycle event occurs.
    fn on_event(&self, event: &LogLifecycleEvent);
}
77
/// Hook that reports the size to account for a journal file during retention.
///
/// When installed, it is consulted for the chain's retained files right
/// before each retention pass.
pub trait LogArtifactSizer: Send + Sync {
    /// Returns the size attributed to the journal at `journal_path`.
    /// An error aborts the retention pass that requested it.
    /// NOTE(review): units are presumably bytes — confirm against the chain.
    fn journal_artifact_size(&self, journal_path: &Path) -> Result<u64>;
}
81
/// Optional per-entry timestamp overrides; every field defaults to `None`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EntryTimestamps {
    /// Optional source timestamp for `_SOURCE_REALTIME_TIMESTAMP` field injection.
    pub source_realtime_usec: Option<u64>,
    /// Optional journal entry realtime timestamp override.
    pub entry_realtime_usec: Option<u64>,
    /// Optional journal entry monotonic timestamp override.
    pub entry_monotonic_usec: Option<u64>,
}

impl EntryTimestamps {
    /// Returns a copy with the source realtime timestamp set to `ts`.
    pub fn with_source_realtime_usec(self, ts: u64) -> Self {
        Self {
            source_realtime_usec: Some(ts),
            ..self
        }
    }

    /// Returns a copy with the entry realtime override set to `ts`.
    pub fn with_entry_realtime_usec(self, ts: u64) -> Self {
        Self {
            entry_realtime_usec: Some(ts),
            ..self
        }
    }

    /// Returns a copy with the entry monotonic override set to `ts`.
    pub fn with_entry_monotonic_usec(self, ts: u64) -> Self {
        Self {
            entry_monotonic_usec: Some(ts),
            ..self
        }
    }
}
108
109impl Log {
110    fn duration_to_micros(duration: std::time::Duration) -> u64 {
111        duration.as_micros().try_into().unwrap_or(u64::MAX)
112    }
113
114    fn peek_entry_realtime(&self, timestamps: &EntryTimestamps) -> u64 {
115        let candidate = timestamps
116            .entry_realtime_usec
117            .unwrap_or_else(|| Microseconds::now().get());
118        let last_seen = self.clock.last_seen().get();
119        if candidate > last_seen {
120            candidate
121        } else {
122            last_seen.saturating_add(1)
123        }
124    }
125
126    fn should_rotate_for_realtime(&self, realtime: u64) -> bool {
127        let Some(active_file) = &self.active_file else {
128            return true;
129        };
130        if self.rotation_state.should_rotate() {
131            return true;
132        }
133        let Some(max_duration) = self.config.rotation_policy.duration_of_journal_file else {
134            return false;
135        };
136        let header = active_file.journal_file.journal_header_ref();
137        header.n_entries > 0
138            && header.head_entry_realtime > 0
139            && realtime.saturating_sub(header.head_entry_realtime)
140                >= Self::duration_to_micros(max_duration)
141    }
142
143    fn append_rotation_reason(&self) -> LogLifecycleReason {
144        if self.active_file.is_none() {
145            LogLifecycleReason::Append
146        } else {
147            LogLifecycleReason::Rotation
148        }
149    }
150
151    fn prepare_append_for_realtime(&mut self, entry_realtime: u64) -> Result<()> {
152        self.apply_retention_on_open()?;
153        let opened_first_active = self.active_file.is_none();
154        if self.should_rotate_for_realtime(entry_realtime) {
155            self.rotate(entry_realtime, self.append_rotation_reason())?;
156            if opened_first_active {
157                self.retention_on_open_applied = true;
158            }
159        }
160        self.apply_retention_on_open()
161    }
162
163    fn raw_items_for_policy<'a>(&self, items: &'a [&'a [u8]]) -> Result<Option<Vec<&'a [u8]>>> {
164        if self.config.field_name_policy != FieldNamePolicy::JournalApp {
165            return Ok(None);
166        }
167        let filtered_items = filter_raw_items_for_journal_app(items)?;
168        if filtered_items.is_empty() {
169            return Err(WriterError::EmptyEntry);
170        }
171        Ok(Some(filtered_items))
172    }
173
174    fn structured_fields_for_policy<'a>(
175        &self,
176        fields: &'a [StructuredField<'a>],
177    ) -> Result<Option<Vec<StructuredField<'a>>>> {
178        if self.config.field_name_policy != FieldNamePolicy::JournalApp {
179            return Ok(None);
180        }
181        let filtered_fields = filter_structured_fields_for_journal_app(fields);
182        if filtered_fields.is_empty() {
183            return Err(WriterError::EmptyEntry);
184        }
185        Ok(Some(filtered_fields))
186    }
187
188    fn apply_retention(&mut self, protected_file: Option<&repository::File>) -> Result<()> {
189        if let Some(sizer) = &self.artifact_sizer {
190            self.chain.refresh_retained_sizes(|file| {
191                sizer.journal_artifact_size(Path::new(file.path()))
192            })?;
193        }
194        let retention = self
195            .chain
196            .retain(&self.config.retention_policy, protected_file);
197        let deleted_files = retention.deleted_files;
198        if !deleted_files.is_empty()
199            && let Some(observer) = &self.lifecycle_observer
200        {
201            observer.on_event(&LogLifecycleEvent::RetainedDeleted {
202                files: deleted_files,
203            });
204        }
205        if let Some(error) = retention.error {
206            return Err(error);
207        }
208
209        Ok(())
210    }
211
212    fn apply_retention_on_open(&mut self) -> Result<()> {
213        if self.retention_on_open_applied || self.active_file.is_none() {
214            return Ok(());
215        }
216        self.enforce_retention()?;
217        self.retention_on_open_applied = true;
218        Ok(())
219    }
220
221    /// Captures both realtime and monotonic timestamps, similar to systemd's dual_timestamp_now().
222    ///
223    /// Returns (realtime_usec, monotonic_usec) where:
224    /// - realtime: microseconds since Unix epoch (CLOCK_REALTIME), monotonically increasing
225    /// - monotonic: microseconds since boot (CLOCK_MONOTONIC)
226    fn capture_dual_timestamp(
227        &mut self,
228        timestamp_override: Option<&EntryTimestamps>,
229    ) -> Result<(u64, u64)> {
230        let realtime = match timestamp_override.and_then(|ts| ts.entry_realtime_usec) {
231            Some(ts) => self.clock.observe(Microseconds::new(ts)).get(),
232            None => self.clock.now().get(),
233        };
234
235        let desired_monotonic = match timestamp_override.and_then(|ts| ts.entry_monotonic_usec) {
236            Some(ts) => ts,
237            None => monotonic_now().map_err(WriterError::Io)?.get(),
238        };
239
240        let monotonic = if desired_monotonic > self.last_monotonic_usec {
241            desired_monotonic
242        } else {
243            self.last_monotonic_usec.saturating_add(1)
244        };
245        self.last_monotonic_usec = monotonic;
246
247        Ok((realtime, monotonic))
248    }
249
    /// Creates a new journal log rooted at `path`, with no lifecycle observer
    /// and no artifact sizer.
    ///
    /// # Errors
    ///
    /// Propagates any error from startup, eager file creation, or the initial
    /// retention pass.
    pub fn new(path: &Path, config: Config) -> Result<Self> {
        Self::new_inner(path, config, None, None)
    }
254
    /// Creates a new journal log with `observer` installed from the start, so
    /// it also receives events emitted during construction.
    ///
    /// # Errors
    ///
    /// Same as [`Log::new`].
    pub fn new_with_lifecycle_observer(
        path: &Path,
        config: Config,
        observer: Arc<dyn LogLifecycleObserver>,
    ) -> Result<Self> {
        Self::new_inner(path, config, Some(observer), None)
    }
262
    /// Creates a new journal log with an optional lifecycle observer and an
    /// optional artifact sizer, both active during construction.
    ///
    /// # Errors
    ///
    /// Same as [`Log::new`].
    pub fn new_with_hooks(
        path: &Path,
        config: Config,
        observer: Option<Arc<dyn LogLifecycleObserver>>,
        artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
    ) -> Result<Self> {
        Self::new_inner(path, config, observer, artifact_sizer)
    }
271
    /// Shared constructor behind the public `new*` functions.
    ///
    /// Builds the startup state for `path`, assembles the `Log`, creates the
    /// first file right away in eager mode when none is active, and runs the
    /// on-open retention pass if an active file exists.
    fn new_inner(
        path: &Path,
        config: Config,
        lifecycle_observer: Option<Arc<dyn LogLifecycleObserver>>,
        artifact_sizer: Option<Arc<dyn LogArtifactSizer>>,
    ) -> Result<Self> {
        let startup = build_startup_state(path, config)?;

        let mut log = Log {
            configured_dir: path.to_path_buf(),
            chain: startup.chain,
            config: startup.config,
            active_file: startup.active_file,
            rotation_state: startup.rotation_state,
            boot_id: startup.boot_id,
            seqnum_id: startup.seqnum_id,
            current_seqnum: startup.current_seqnum,
            clock: startup.clock,
            last_monotonic_usec: startup.last_monotonic_usec,
            lifecycle_observer,
            artifact_sizer,
            retention_on_open_applied: false,
            // Rendered once; every entry starts with this field.
            boot_id_field: format!("_BOOT_ID={}", startup.boot_id.as_simple()).into_bytes(),
            // Prefix plus room for the 20 decimal digits of `u64::MAX`.
            source_realtime_field: Vec::with_capacity(SOURCE_REALTIME_PREFIX.len() + 20),
        };
        if log.config.open_mode == LogOpenMode::Eager && log.active_file.is_none() {
            let realtime = log.peek_entry_realtime(&EntryTimestamps::default());
            log.rotate(realtime, LogLifecycleReason::EagerOpen)?;
            // `rotate` already ran retention with the new file protected.
            log.retention_on_open_applied = true;
        }
        // No-op when the pass already ran or there is still no active file.
        log.apply_retention_on_open()?;
        Ok(log)
    }
305
    /// Installs (or replaces) the lifecycle observer on an already
    /// constructed log.
    ///
    /// The observer only sees events emitted from this point on; use
    /// [`Log::new_with_lifecycle_observer`] to observe construction as well.
    pub fn with_lifecycle_observer(mut self, observer: Arc<dyn LogLifecycleObserver>) -> Self {
        self.lifecycle_observer = Some(observer);
        self
    }
310
    /// Installs (or replaces) the artifact sizer on an already constructed
    /// log.
    ///
    /// Retention passes that ran during construction did not use it; use
    /// [`Log::new_with_hooks`] to have it active from the start.
    pub fn with_artifact_sizer(mut self, sizer: Arc<dyn LogArtifactSizer>) -> Self {
        self.artifact_sizer = Some(sizer);
        self
    }
315
316    /// Writes a journal entry.
317    ///
318    /// If `source_realtime_usec` is provided, a `_SOURCE_REALTIME_TIMESTAMP` field will be added
319    /// to record the original timestamp from the source (in microseconds since Unix epoch).
320    /// This is useful when ingesting logs from external sources that have their own timestamps.
321    pub fn write_entry(
322        &mut self,
323        items: &[&[u8]],
324        source_realtime_usec: Option<u64>,
325    ) -> Result<()> {
326        self.write_entry_with_timestamps(
327            items,
328            EntryTimestamps {
329                source_realtime_usec,
330                ..EntryTimestamps::default()
331            },
332        )
333    }
334
335    /// Writes a journal entry with optional source and entry timestamp overrides.
336    ///
337    /// Overrides are safe by construction:
338    /// - entry realtime is clamped to strict monotonic progression (`last + 1us` floor)
339    /// - entry monotonic is also clamped to strict monotonic progression (`last + 1us` floor)
340    pub fn write_entry_with_timestamps(
341        &mut self,
342        items: &[&[u8]],
343        timestamps: EntryTimestamps,
344    ) -> Result<()> {
345        if items.is_empty() {
346            return Err(WriterError::EmptyEntry);
347        }
348
349        let entry_realtime = self.peek_entry_realtime(&timestamps);
350        self.prepare_append_for_realtime(entry_realtime)?;
351        let filtered_items = self.raw_items_for_policy(items)?;
352        let write_items = filtered_items.as_deref().unwrap_or(items);
353
354        let (realtime, monotonic) = self.capture_dual_timestamp(Some(&timestamps))?;
355        self.write_raw_entry_fields(
356            write_items,
357            timestamps.source_realtime_usec,
358            realtime,
359            monotonic,
360            self.low_level_entry_options(EntryWriteOptions::default()),
361        )?;
362
363        let active_file = self.active_file.as_ref().unwrap();
364        self.rotation_state.update(&active_file.writer);
365        self.current_seqnum += 1;
366
367        Ok(())
368    }
369
370    /// Writes a journal entry from structured field names and binary-safe values.
371    ///
372    /// This is the preferred path when the producer already has split field
373    /// names and values. If `source_realtime_usec` is provided, a
374    /// `_SOURCE_REALTIME_TIMESTAMP` field is added.
375    pub fn write_fields(
376        &mut self,
377        fields: &[StructuredField<'_>],
378        source_realtime_usec: Option<u64>,
379    ) -> Result<()> {
380        self.write_fields_with_timestamps(
381            fields,
382            EntryTimestamps {
383                source_realtime_usec,
384                ..EntryTimestamps::default()
385            },
386        )
387    }
388
    /// Writes structured fields with optional source and entry timestamp overrides.
    ///
    /// Entry realtime and monotonic overrides use the same monotonic clamping
    /// rules as [`Log::write_entry_with_timestamps`].
    ///
    /// Equivalent to [`Log::write_fields_with_options`] with
    /// `EntryWriteOptions::default()`.
    pub fn write_fields_with_timestamps(
        &mut self,
        fields: &[StructuredField<'_>],
        timestamps: EntryTimestamps,
    ) -> Result<()> {
        self.write_fields_with_options(fields, timestamps, EntryWriteOptions::default())
    }
400
401    /// Writes structured fields with explicit low-level entry write options.
402    ///
403    /// Use this only when the caller can satisfy any invariants required by the
404    /// selected [`EntryWriteOptions`], especially no duplicate full `KEY=value`
405    /// payloads when `trusted_unique_payloads` is enabled.
406    pub fn write_fields_with_options(
407        &mut self,
408        fields: &[StructuredField<'_>],
409        timestamps: EntryTimestamps,
410        options: EntryWriteOptions,
411    ) -> Result<()> {
412        if fields.is_empty() {
413            return Err(WriterError::EmptyEntry);
414        }
415
416        let entry_realtime = self.peek_entry_realtime(&timestamps);
417        self.prepare_append_for_realtime(entry_realtime)?;
418        let filtered_fields = self.structured_fields_for_policy(fields)?;
419        let write_fields = filtered_fields.as_deref().unwrap_or(fields);
420
421        let (realtime, monotonic) = self.capture_dual_timestamp(Some(&timestamps))?;
422        self.write_structured_entry_fields(
423            write_fields,
424            timestamps.source_realtime_usec,
425            realtime,
426            monotonic,
427            self.low_level_entry_options(options),
428        )?;
429
430        let active_file = self.active_file.as_ref().unwrap();
431        self.rotation_state.update(&active_file.writer);
432        self.current_seqnum += 1;
433
434        Ok(())
435    }
436
437    fn write_raw_entry_fields(
438        &mut self,
439        items: &[&[u8]],
440        source_realtime_usec: Option<u64>,
441        realtime: u64,
442        monotonic: u64,
443        options: EntryWriteOptions,
444    ) -> Result<()> {
445        let source_field = if let Some(timestamp_usec) = source_realtime_usec {
446            self.prepare_source_realtime_field(timestamp_usec);
447            Some(self.source_realtime_field.as_slice())
448        } else {
449            None
450        };
451
452        let total_items = items.len() + 1 + usize::from(source_field.is_some());
453        if total_items <= STACK_ENTRY_REF_LIMIT {
454            let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
455            let mut len = 0usize;
456            refs[len] = EntryField::raw(self.boot_id_field.as_slice());
457            len += 1;
458            if let Some(source_field) = source_field {
459                refs[len] = EntryField::raw(source_field);
460                len += 1;
461            }
462            for item in items {
463                refs[len] = EntryField::raw(item);
464                len += 1;
465            }
466            self.active_file.as_mut().unwrap().write_entry_fields(
467                refs[..len].iter().copied(),
468                realtime,
469                monotonic,
470                options,
471            )?;
472        } else {
473            let mut refs = Vec::with_capacity(total_items);
474            refs.push(EntryField::raw(self.boot_id_field.as_slice()));
475            if let Some(source_field) = source_field {
476                refs.push(EntryField::raw(source_field));
477            }
478            refs.extend(items.iter().copied().map(EntryField::raw));
479            self.active_file.as_mut().unwrap().write_entry_fields(
480                refs.iter().copied(),
481                realtime,
482                monotonic,
483                options,
484            )?;
485        }
486
487        Ok(())
488    }
489
490    fn write_structured_entry_fields(
491        &mut self,
492        fields: &[StructuredField<'_>],
493        source_realtime_usec: Option<u64>,
494        realtime: u64,
495        monotonic: u64,
496        options: EntryWriteOptions,
497    ) -> Result<()> {
498        let source_field = if let Some(timestamp_usec) = source_realtime_usec {
499            self.prepare_source_realtime_field(timestamp_usec);
500            Some(self.source_realtime_field.as_slice())
501        } else {
502            None
503        };
504
505        let total_items = fields.len() + 1 + usize::from(source_field.is_some());
506        if total_items <= STACK_ENTRY_REF_LIMIT {
507            let mut refs = [EntryField::raw(&[]); STACK_ENTRY_REF_LIMIT];
508            let mut len = 0usize;
509            refs[len] = EntryField::raw(self.boot_id_field.as_slice());
510            len += 1;
511            if let Some(source_field) = source_field {
512                refs[len] = EntryField::raw(source_field);
513                len += 1;
514            }
515            for field in fields {
516                refs[len] = EntryField::Structured(*field);
517                len += 1;
518            }
519            self.active_file.as_mut().unwrap().write_entry_fields(
520                refs[..len].iter().copied(),
521                realtime,
522                monotonic,
523                options,
524            )?;
525        } else {
526            let mut refs = Vec::with_capacity(total_items);
527            refs.push(EntryField::raw(self.boot_id_field.as_slice()));
528            if let Some(source_field) = source_field {
529                refs.push(EntryField::raw(source_field));
530            }
531            refs.extend(fields.iter().copied().map(EntryField::Structured));
532            self.active_file.as_mut().unwrap().write_entry_fields(
533                refs.iter().copied(),
534                realtime,
535                monotonic,
536                options,
537            )?;
538        }
539
540        Ok(())
541    }
542
543    fn low_level_entry_options(&self, options: EntryWriteOptions) -> EntryWriteOptions {
544        options.field_name_policy(log_writer_field_name_policy(self.config.field_name_policy))
545    }
546
547    fn prepare_source_realtime_field(&mut self, timestamp_usec: u64) {
548        self.source_realtime_field.clear();
549        self.source_realtime_field
550            .extend_from_slice(SOURCE_REALTIME_PREFIX);
551        let mut buffer = ItoaBuffer::new();
552        self.source_realtime_field
553            .extend_from_slice(buffer.format(timestamp_usec).as_bytes());
554    }
555
556    /// Syncs all written data to disk, ensuring durability.
557    ///
558    /// This should be called after writing a batch of log entries to ensure
559    /// they are persisted to disk before acknowledging the request.
560    pub fn sync(&mut self) -> Result<()> {
561        if let Some(active_file) = &mut self.active_file {
562            active_file.journal_file.sync()?;
563        }
564        Ok(())
565    }
566
    /// Archives and closes the active file.
    ///
    /// In strict systemd naming mode this renames `<source>.journal` to the
    /// chain filename before retention, matching the explicit close behavior of
    /// the other SDK implementations. `Drop` remains best-effort for callers
    /// that do not explicitly close.
    ///
    /// Returns `Ok(())` immediately when no file is active. In strict naming
    /// mode an active file without entries is deleted instead of archived.
    ///
    /// # Errors
    ///
    /// Returns an error when deleting the empty file fails (other than
    /// "not found"), or when syncing, archiving, or retention fails.
    pub fn close(mut self) -> Result<()> {
        use journal_core::file::JournalState;

        // Taking the file out also turns the later `Drop` into a no-op.
        let Some(mut active_file) = self.active_file.take() else {
            return Ok(());
        };

        let n_entries = active_file.journal_file.journal_header_ref().n_entries;
        if self.config.strict_systemd_naming && n_entries == 0 {
            // An empty file is removed rather than archived; a file that is
            // already gone is not an error.
            match std::fs::remove_file(active_file.repository_file.path()) {
                Ok(()) => {}
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => return Err(err.into()),
            }
            self.chain.remove_tracked_file(&active_file.repository_file);
            return Ok(());
        }

        // Record the final size before the file is marked archived and synced.
        self.chain.update_file_size(
            &active_file.repository_file,
            active_file.current_file_size(),
        );
        active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
        active_file.journal_file.sync()?;

        // The file just closed is protected from the retention pass below.
        let protected_file = if self.config.strict_systemd_naming {
            let header = active_file.journal_file.journal_header_ref();
            self.chain.archive_file(
                &active_file.repository_file,
                uuid::Uuid::from_bytes(header.seqnum_id),
                header.head_entry_seqnum,
                header.head_entry_realtime,
            )?
        } else {
            active_file.repository_file.clone()
        };

        self.apply_retention(Some(&protected_file))?;

        Ok(())
    }
614
615    pub fn active_file(&self) -> Option<&repository::File> {
616        self.active_file
617            .as_ref()
618            .map(|active_file| &active_file.repository_file)
619    }
620
621    pub fn active_path(&self) -> Option<&Path> {
622        self.active_file
623            .as_ref()
624            .map(|active_file| Path::new(active_file.repository_file.path()))
625    }
626
    /// Returns the directory path exactly as it was passed to the constructor.
    pub fn configured_directory(&self) -> &Path {
        &self.configured_dir
    }
630
    /// Returns the path held by the file chain.
    /// NOTE(review): presumably the directory that actually holds the journal
    /// files, which may differ from [`Log::configured_directory`] — confirm.
    pub fn journal_directory(&self) -> &Path {
        &self.chain.path
    }
634
    /// Returns the machine id recorded on the file chain.
    pub fn machine_id(&self) -> uuid::Uuid {
        self.chain.machine_id
    }
638
    /// Returns the boot id established at startup; the same id is written as
    /// the `_BOOT_ID` field of every entry.
    pub fn boot_id(&self) -> uuid::Uuid {
        self.boot_id
    }
642
    /// Returns the journal source recorded on the file chain.
    pub fn source(&self) -> &journal_registry::Source {
        &self.chain.source
    }
646
647    /// Applies the configured retention policy without requiring a rotation or
648    /// close. The current active file is counted in retention envelopes and is
649    /// protected from deletion.
650    pub fn enforce_retention(&mut self) -> Result<()> {
651        let protected_file = if let Some(active_file) = &self.active_file {
652            self.chain.update_file_size(
653                &active_file.repository_file,
654                active_file.current_file_size(),
655            );
656            Some(active_file.repository_file.clone())
657        } else {
658            None
659        };
660        self.apply_retention(protected_file.as_ref())
661    }
662
663    fn update_active_file_size(&mut self) {
664        if let Some(active_file) = &self.active_file {
665            self.chain.update_file_size(
666                &active_file.repository_file,
667                active_file.current_file_size(),
668            );
669        }
670    }
671
672    fn prepare_initial_rotation(&mut self) -> Result<()> {
673        self.update_active_file_size();
674        if self.active_file.is_none() && self.config.strict_systemd_naming {
675            self.chain.archive_existing_active_file()?;
676        }
677        Ok(())
678    }
679
680    fn archive_rotated_file(&mut self, old_file: &ActiveFile) -> Result<repository::File> {
681        if !self.config.strict_systemd_naming {
682            return Ok(old_file.repository_file.clone());
683        }
684        let old_header = old_file.journal_file.journal_header_ref();
685        self.chain.archive_file(
686            &old_file.repository_file,
687            uuid::Uuid::from_bytes(old_header.seqnum_id),
688            old_header.head_entry_seqnum,
689            old_header.head_entry_realtime,
690        )
691    }
692
    /// Replaces `old_file` with a freshly rotated active file.
    ///
    /// The old file is marked archived and synced, then archived in the chain
    /// (renamed in strict naming mode), and finally rotated into a new file.
    /// Returns the new active file together with the `Rotated` event that
    /// describes the switch.
    fn rotate_existing_active_file(
        &mut self,
        mut old_file: ActiveFile,
        max_file_size: Option<u64>,
        head_realtime: u64,
    ) -> Result<(ActiveFile, LogLifecycleEvent)> {
        use journal_core::file::JournalState;

        // Persist the archived state before the file is archived in the chain.
        old_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
        old_file.journal_file.sync()?;
        let archived = self.archive_rotated_file(&old_file)?;
        let new_file = old_file.rotate(
            &mut self.chain,
            max_file_size,
            head_realtime,
            self.config.compression,
            self.config.compression_threshold,
            self.config.strict_systemd_naming,
            self.config.live_publish_every_entries,
            self.config.file_mode,
        )?;
        let active = new_file.repository_file.clone();
        Ok((new_file, LogLifecycleEvent::Rotated { archived, active }))
    }
717
    /// Creates the active file when none exists yet.
    ///
    /// Returns the new file together with the `Created` event carrying
    /// `reason`.
    fn create_initial_active_file(
        &mut self,
        max_file_size: Option<u64>,
        head_realtime: u64,
        reason: LogLifecycleReason,
    ) -> Result<(ActiveFile, LogLifecycleEvent)> {
        let new_file = ActiveFile::create(
            &mut self.chain,
            self.seqnum_id,
            self.boot_id,
            // NOTE(review): presumably the sequence number of the file's
            // first entry — confirm against `ActiveFile::create`.
            self.current_seqnum + 1,
            max_file_size,
            head_realtime,
            self.config.compression,
            self.config.compression_threshold,
            self.config.compact,
            self.config.strict_systemd_naming,
            self.config.live_publish_every_entries,
            self.config.file_mode,
        )?;
        let active = new_file.repository_file.clone();
        Ok((new_file, LogLifecycleEvent::Created { active, reason }))
    }
741
742    fn emit_lifecycle_event(&self, event: &LogLifecycleEvent) {
743        if let Some(observer) = &self.lifecycle_observer {
744            observer.on_event(event);
745        }
746    }
747
748    fn protected_active_file(&self) -> Option<repository::File> {
749        self.active_file
750            .as_ref()
751            .map(|active_file| active_file.repository_file.clone())
752    }
753
754    #[tracing::instrument(skip_all, fields(active_file))]
755    fn rotate(&mut self, head_realtime: u64, reason: LogLifecycleReason) -> Result<()> {
756        self.prepare_initial_rotation()?;
757        let max_file_size = self.config.rotation_policy.size_of_journal_file;
758        let (new_file, lifecycle_event) = if let Some(old_file) = self.active_file.take() {
759            self.rotate_existing_active_file(old_file, max_file_size, head_realtime)?
760        } else {
761            self.create_initial_active_file(max_file_size, head_realtime, reason)?
762        };
763
764        tracing::Span::current().record("new_file", new_file.repository_file.path());
765
766        self.active_file = Some(new_file);
767        self.rotation_state.reset();
768        self.update_active_file_size();
769        self.emit_lifecycle_event(&lifecycle_event);
770
771        // Retention runs after the post-rotation current file is known, so the
772        // tracked current file counts in the envelope and is never deleted.
773        let protected_file = self.protected_active_file();
774        self.apply_retention(protected_file.as_ref())?;
775
776        Ok(())
777    }
778
779    /// Writes a journal entry from a serializable value.
780    ///
781    /// This method serializes the value to JSON, flattens it, and writes it to the journal.
782    /// The flattened structure converts nested JSON into KEY=VALUE pairs suitable for journal entries.
783    ///
784    /// # Example
785    ///
786    /// ```no_run
787    /// use serde::Serialize;
788    /// use journal_log_writer::{Log, Config, RotationPolicy, RetentionPolicy};
789    /// use journal_registry::Origin;
790    /// use std::path::Path;
791    ///
792    /// #[derive(Serialize)]
793    /// struct LogEntry {
794    ///     message: String,
795    ///     level: String,
796    ///     user: User,
797    /// }
798    ///
799    /// #[derive(Serialize)]
800    /// struct User {
801    ///     id: u64,
802    ///     name: String,
803    /// }
804    ///
805    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
806    /// let origin = Origin {
807    ///     machine_id: None,
808    ///     namespace: None,
809    ///     source: journal_registry::Source::System,
810    /// };
811    /// let config = Config::new(origin, RotationPolicy::default(), RetentionPolicy::default());
812    /// let mut log = Log::new(Path::new("/tmp/test-journal"), config)?;
813    ///
814    /// let entry = LogEntry {
815    ///     message: "User logged in".to_string(),
816    ///     level: "INFO".to_string(),
817    ///     user: User {
818    ///         id: 42,
819    ///         name: "alice".to_string(),
820    ///     },
821    /// };
822    ///
823    /// // This will write fields like:
824    /// // MESSAGE=User logged in
825    /// // LEVEL=INFO
826    /// // USER_ID=42
827    /// // USER_NAME=alice
828    /// log.write_structured(&entry)?;
829    /// # Ok(())
830    /// # }
831    /// ```
832    #[cfg(feature = "serde-api")]
833    pub fn write_structured<T: serde::Serialize>(&mut self, value: &T) -> Result<()> {
834        // Serialize to JSON value
835        let json_value = serde_json::to_value(value).map_err(|e| {
836            WriterError::Serialization(format!("failed to serialize to JSON: {}", e))
837        })?;
838
839        // Flatten the JSON structure - requires a JSON object (Map)
840        let flattened = if let serde_json::Value::Object(map) = json_value {
841            flatten_json_map(&map)
842        } else {
843            // If not an object, return error
844            return Err(WriterError::Serialization(
845                "value must be a JSON object, not a primitive or array".to_string(),
846            ));
847        };
848
849        // Convert to journal field format (KEY=VALUE)
850        let mut fields: Vec<Vec<u8>> = Vec::with_capacity(flattened.len());
851
852        for (key, value) in flattened.iter() {
853            // Convert key to uppercase and replace dots with underscores
854            // (journal convention)
855            let journal_key = key.to_uppercase().replace('.', "_");
856
857            // Format as KEY=VALUE
858            let field = match value {
859                serde_json::Value::String(s) => {
860                    format!("{}={}", journal_key, s)
861                }
862                serde_json::Value::Number(n) => {
863                    format!("{}={}", journal_key, n)
864                }
865                serde_json::Value::Bool(b) => {
866                    format!("{}={}", journal_key, if *b { "true" } else { "false" })
867                }
868                serde_json::Value::Null => {
869                    format!("{}=", journal_key)
870                }
871                // Arrays and objects should be flattened already, but just in case
872                _ => {
873                    format!("{}={}", journal_key, value)
874                }
875            };
876
877            fields.push(field.into_bytes());
878        }
879
880        // Convert Vec<Vec<u8>> to Vec<&[u8]> for write_entry
881        let field_refs: Vec<&[u8]> = fields.iter().map(|f| f.as_slice()).collect();
882
883        self.write_entry(&field_refs, None)
884    }
885}
886
887#[cfg(all(test, feature = "serde-api"))]
888mod serde_api_tests;
889
890impl Drop for Log {
891    fn drop(&mut self) {
892        use journal_core::file::JournalState;
893
894        if let Some(ref mut active_file) = self.active_file {
895            // Keep the active path stable on close so file-backed readers that
896            // already follow system.journal can finish. The next writer startup
897            // archives this stale active file before creating a fresh one.
898            active_file.journal_file.journal_header_mut().state = JournalState::Archived as u8;
899
900            // Best/Last-effort sync just to be on the cautious side.
901            let _ = active_file.journal_file.sync();
902        }
903    }
904}