sdjournal 0.1.20

Pure Rust systemd journal reader and query engine
Documentation
use crate::config::JournalConfig;
use crate::cursor::SdJournalEntryKey;
use crate::error::{LimitKind, Result, SdJournalError};
use crate::file::{JournalFile, LiveFileState};
use crate::journal::{Journal, journal_from_file_paths};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use super::{compare_keys, is_skippable_live_file_error, warn_skipped_live_file};

/// One journal file being followed by the live reader.
pub(super) struct TrackedFile {
    /// Path the file was tracked under (set by `track_file` from `JournalFile::path`).
    pub(super) path: PathBuf,
    /// The file's id as reported by `JournalFile::file_id`.
    pub(super) file_id: [u8; 16],
    /// Initially set by `track_file` from `JournalFile::live_state`.
    pub(super) live_state: LiveFileState,
    /// Cursor over the file's entry-array chain used to pick up newly appended entries.
    pub(super) tail: FileTailCursor,
}

/// The set of journal files a live reader follows.
pub(super) struct TrackedFiles {
    /// Tracked files; when built by `TrackedFileBuilder`, at most one per file id.
    pub(super) files: Vec<TrackedFile>,
    /// Maps each tracked file's path to its index in `files`.
    pub(super) path_index: HashMap<PathBuf, usize>,
    /// Newest (by `compare_keys`) last-entry key among the tracked files at build
    /// time; `None` if no tracked file yielded an entry key.
    pub(super) last_seen: Option<SdJournalEntryKey>,
}

/// Tracked files together with a `Journal` opened over those same file paths
/// (see `build_live_snapshot`).
pub(super) struct LiveSnapshot {
    pub(super) tracked: TrackedFiles,
    pub(super) journal: Journal,
}

/// Result of one `FileTailCursor::drain_new_offsets` call.
pub(super) struct OffsetBatch {
    /// Newly found non-zero entry offsets, in entry-array chain order.
    pub(super) offsets: Vec<u64>,
    /// `true` when the end of the entry-array chain was reached; `false` when the
    /// batch stopped early because it reached the requested limit.
    pub(super) exhausted: bool,
}

/// A directory together with the modification time observed when it was collected
/// by `collect_fallback_dirs`.
// NOTE(review): presumably polled for changes when file watching is unavailable — confirm against caller.
pub(super) struct FallbackDirState {
    pub(super) path: PathBuf,
    /// `None` when the directory's metadata or modification time could not be read.
    pub(super) modified: Option<SystemTime>,
}

/// Read position within a journal file's entry-array chain.
pub(super) struct FileTailCursor {
    /// Offset of the entry array the cursor is positioned in; `None` when the chain
    /// has not yielded any array yet.
    current_array_offset: Option<u64>,
    /// Index of the next item to read in the current array.
    next_item_idx: usize,
    /// Offset of the last entry the cursor has moved past, if any.
    last_entry_offset: Option<u64>,
}

impl FileTailCursor {
    /// Builds a cursor positioned after everything currently linked into `file`'s
    /// entry-array chain, so later drains only yield newer entries.
    ///
    /// Walks the chain from `file.live_state().entry_array_offset`. It remembers
    /// the final array, that array's item count, and the last non-zero entry
    /// offset found anywhere along the chain.
    ///
    /// # Errors
    ///
    /// Propagates read errors from `file`. Returns `SdJournalError::LimitExceeded`
    /// (`LimitKind::ObjectChainSteps`) when the chain is longer than
    /// `file.max_object_chain_steps()`.
    fn at_end(file: &JournalFile) -> Result<Self> {
        let mut next = file.live_state().entry_array_offset;
        let mut current_array_offset = None;
        let mut next_item_idx = 0usize;
        let mut last_entry_offset = None;
        let mut steps = 0usize;

        while next != 0 {
            current_array_offset = Some(next);
            let items = file.read_entry_array_items(next)?;
            next_item_idx = items.len();
            // Match `drain_new_offsets`: zero slots are never treated as entries.
            // An array without any non-zero item must not erase the last entry
            // found in an earlier array.
            if let Some(&offset) = items.iter().rev().find(|&&offset| offset != 0) {
                last_entry_offset = Some(offset);
            }
            next = file.read_entry_array_next_offset(next)?;

            steps = steps.saturating_add(1);
            if steps > file.max_object_chain_steps() {
                return Err(SdJournalError::LimitExceeded {
                    kind: LimitKind::ObjectChainSteps,
                    limit: u64::try_from(file.max_object_chain_steps()).unwrap_or(u64::MAX),
                });
            }
        }

        Ok(Self {
            current_array_offset,
            next_item_idx,
            last_entry_offset,
        })
    }

    /// Returns up to `limit` entry offsets found after the cursor position and
    /// advances the cursor past them.
    ///
    /// Zero item slots are skipped and never returned. `exhausted` is `false`
    /// when the batch stopped because it reached `limit`. It is `true` when the
    /// end of the entry-array chain was reached.
    ///
    /// The cursor only advances when this returns `Ok`. If reading fails part-way,
    /// the offsets gathered so far cannot reach the caller. The cursor is
    /// therefore left untouched, so a retry re-reads those entries instead of
    /// silently skipping them.
    ///
    /// # Errors
    ///
    /// Propagates read errors from `file`. Returns `SdJournalError::Transient`
    /// when one call walks more than `file.max_object_chain_steps()` arrays
    /// beyond the one the cursor resumed in.
    pub(super) fn drain_new_offsets(
        &mut self,
        file: &JournalFile,
        limit: usize,
    ) -> Result<OffsetBatch> {
        let mut scratch = Self {
            current_array_offset: self.current_array_offset,
            next_item_idx: self.next_item_idx,
            last_entry_offset: self.last_entry_offset,
        };
        let batch = scratch.drain_uncommitted(file, limit)?;
        *self = scratch;
        Ok(batch)
    }

    /// Does the work of `drain_new_offsets`, mutating `self` as it walks.
    ///
    /// Only call this on a scratch copy whose state is committed once it returns
    /// `Ok`. Otherwise an error would leave the cursor past offsets that were
    /// never delivered.
    fn drain_uncommitted(&mut self, file: &JournalFile, limit: usize) -> Result<OffsetBatch> {
        let mut out = Vec::new();
        let mut steps = 0usize;
        let mut current = match self.current_array_offset {
            Some(offset) => {
                // Resume inside the array the cursor stopped in; it may have
                // gained items since the last call.
                let items = file.read_entry_array_items(offset)?;
                let mut item_idx = self.next_item_idx.min(items.len());
                while item_idx < items.len() {
                    let entry_offset = items[item_idx];
                    item_idx = item_idx.saturating_add(1);
                    if entry_offset != 0 {
                        out.push(entry_offset);
                        self.last_entry_offset = Some(entry_offset);
                    }
                    if out.len() >= limit {
                        self.next_item_idx = item_idx;
                        return Ok(OffsetBatch {
                            offsets: out,
                            exhausted: false,
                        });
                    }
                }
                self.next_item_idx = item_idx;
                file.read_entry_array_next_offset(offset)?
            }
            None => file.live_state().entry_array_offset,
        };

        while current != 0 {
            let items = file.read_entry_array_items(current)?;
            let mut item_idx = 0usize;
            while item_idx < items.len() {
                let entry_offset = items[item_idx];
                item_idx = item_idx.saturating_add(1);
                if entry_offset != 0 {
                    out.push(entry_offset);
                    self.last_entry_offset = Some(entry_offset);
                }
                if out.len() >= limit {
                    self.current_array_offset = Some(current);
                    self.next_item_idx = item_idx;
                    return Ok(OffsetBatch {
                        offsets: out,
                        exhausted: false,
                    });
                }
            }
            self.current_array_offset = Some(current);
            self.next_item_idx = item_idx;
            current = file.read_entry_array_next_offset(current)?;

            steps = steps.saturating_add(1);
            if steps > file.max_object_chain_steps() {
                return Err(SdJournalError::Transient {
                    path: Some(file.path().to_path_buf()),
                    reason: "entry array chain refresh exceeded expected growth".to_string(),
                });
            }
        }

        if self.current_array_offset.is_none() {
            // The file still has no entry arrays at all.
            self.next_item_idx = 0;
            self.last_entry_offset = None;
            return Ok(OffsetBatch {
                offsets: Vec::new(),
                exhausted: true,
            });
        }

        Ok(OffsetBatch {
            offsets: out,
            exhausted: true,
        })
    }
}

/// Accumulates `TrackedFile`s, de-duplicating them by file id.
struct TrackedFileBuilder {
    tracked_files: Vec<TrackedFile>,
    /// Path of each tracked file mapped to its index in `tracked_files`.
    path_index: HashMap<PathBuf, usize>,
    /// Newest last-entry key seen so far, ordered by `compare_keys`.
    last_seen: Option<SdJournalEntryKey>,
    /// First skippable error encountered; returned by `finish` if nothing was tracked.
    first_skipped_error: Option<SdJournalError>,
    /// File ids already tracked.
    seen: HashSet<[u8; 16]>,
}

impl TrackedFileBuilder {
    fn new(capacity: usize) -> Self {
        Self {
            tracked_files: Vec::with_capacity(capacity),
            path_index: HashMap::with_capacity(capacity),
            last_seen: None,
            first_skipped_error: None,
            seen: HashSet::new(),
        }
    }

    fn push(&mut self, file: &JournalFile) -> Result<bool> {
        let file_id = file.file_id();
        if self.seen.contains(&file_id) {
            return Ok(false);
        }

        match track_file(file) {
            Ok((tracked, key)) => {
                self.seen.insert(file_id);
                if key.as_ref().is_some_and(|key| {
                    self.last_seen
                        .as_ref()
                        .is_none_or(|last| compare_keys(key, last).is_gt())
                }) {
                    self.last_seen = key;
                }
                self.path_index
                    .insert(tracked.path.clone(), self.tracked_files.len());
                self.tracked_files.push(tracked);
                Ok(true)
            }
            Err(err) if is_skippable_live_file_error(&err) => {
                warn_skipped_live_file(file.path(), &err);
                if self.first_skipped_error.is_none() {
                    self.first_skipped_error = Some(err);
                }
                Ok(false)
            }
            Err(err) => Err(err),
        }
    }

    fn skip_path(&mut self, path: &Path, err: SdJournalError) {
        warn_skipped_live_file(path, &err);
        if self.first_skipped_error.is_none() {
            self.first_skipped_error = Some(err);
        }
    }

    fn finish(self) -> Result<TrackedFiles> {
        finish_tracked_files(
            self.tracked_files,
            self.path_index,
            self.last_seen,
            self.first_skipped_error,
        )
    }
}

#[cfg(target_os = "linux")]
pub(super) fn collect_watch_paths(
    roots: &[PathBuf],
    tracked_files: &[TrackedFile],
) -> Vec<PathBuf> {
    let mut watch_paths: Vec<PathBuf> = roots.to_vec();
    for tracked in tracked_files {
        watch_paths.push(tracked.path.clone());
        if let Some(parent) = tracked.path.parent() {
            watch_paths.push(parent.to_path_buf());
        }
    }
    watch_paths.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
    watch_paths.dedup();
    watch_paths
}

pub(super) fn collect_fallback_dirs(
    roots: &[PathBuf],
    tracked_files: &[TrackedFile],
) -> Vec<FallbackDirState> {
    let mut dirs: Vec<PathBuf> = roots.to_vec();
    for tracked in tracked_files {
        if let Some(parent) = tracked.path.parent() {
            dirs.push(parent.to_path_buf());
        }
    }
    dirs.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
    dirs.dedup();
    dirs.into_iter()
        .map(|path| FallbackDirState {
            modified: std::fs::metadata(&path)
                .and_then(|meta| meta.modified())
                .ok(),
            path,
        })
        .collect()
}

/// Builds the tracked-file set from journal files that are already open.
///
/// Files sharing a file id are tracked once. Files whose tracking fails with an
/// error accepted by `is_skippable_live_file_error` are logged and left out.
pub(super) fn build_tracked_files_from_open_files(files: &[JournalFile]) -> Result<TrackedFiles> {
    let mut builder = TrackedFileBuilder::new(files.len());
    files
        .iter()
        .try_for_each(|file| builder.push(file).map(|_added| ()))?;
    builder.finish()
}

/// Opens each path in `paths` and builds the tracked-file set from the results.
///
/// A path that fails to open, or whose tracking fails, with an error accepted by
/// `is_skippable_live_file_error` is logged and skipped. Any other error is
/// returned immediately.
pub(super) fn build_tracked_files_from_paths(
    paths: &[PathBuf],
    config: &JournalConfig,
) -> Result<TrackedFiles> {
    let mut builder = TrackedFileBuilder::new(paths.len());

    for path in paths {
        match JournalFile::open(path.clone(), config) {
            Ok(opened) => {
                builder.push(&opened)?;
            }
            Err(err) if is_skippable_live_file_error(&err) => builder.skip_path(path, err),
            Err(err) => return Err(err),
        }
    }

    builder.finish()
}

/// Tracks the usable files among `paths` and opens a `Journal` over exactly the
/// files that ended up tracked.
///
/// # Errors
///
/// Propagates errors from `build_tracked_files_from_paths` and
/// `journal_from_file_paths`.
pub(super) fn build_live_snapshot(
    paths: &[PathBuf],
    config: &JournalConfig,
) -> Result<LiveSnapshot> {
    let tracked = build_tracked_files_from_paths(paths, config)?;
    let journal = journal_from_file_paths(
        Vec::new(),
        tracked.files.iter().map(|entry| entry.path.clone()).collect(),
        config.clone(),
    )?;
    Ok(LiveSnapshot { tracked, journal })
}

/// Creates the tracking record for `file`, with its tail cursor positioned at the
/// file's current end.
///
/// Also returns the key of the last entry the cursor found, or `None` when the
/// cursor found no entry.
fn track_file(file: &JournalFile) -> Result<(TrackedFile, Option<SdJournalEntryKey>)> {
    let tail = FileTailCursor::at_end(file)?;
    let last_key = tail
        .last_entry_offset
        .map(|offset| file.read_entry_meta(offset))
        .transpose()?
        .map(|meta| SdJournalEntryKey {
            file_id: meta.file_id,
            entry_offset: meta.entry_offset,
            seqnum: meta.seqnum,
            realtime_usec: meta.realtime_usec,
        });

    let record = TrackedFile {
        path: file.path().to_path_buf(),
        file_id: file.file_id(),
        live_state: file.live_state(),
        tail,
    };
    Ok((record, last_key))
}

/// Packages accumulated tracking results into a `TrackedFiles`.
///
/// # Errors
///
/// When no file was tracked and at least one was skipped, returns the first skip
/// error instead of an empty set. With no skips, an empty set is returned as `Ok`.
fn finish_tracked_files(
    tracked_files: Vec<TrackedFile>,
    path_index: HashMap<PathBuf, usize>,
    last_seen: Option<SdJournalEntryKey>,
    first_skipped_error: Option<SdJournalError>,
) -> Result<TrackedFiles> {
    match first_skipped_error {
        Some(err) if tracked_files.is_empty() => Err(err),
        _ => Ok(TrackedFiles {
            files: tracked_files,
            path_index,
            last_seen,
        }),
    }
}