sdjournal 0.1.21

Pure Rust systemd journal reader and query engine
Documentation
use crate::config::JournalConfig;
use crate::cursor::{Cursor, SdJournalEntryKey};
use crate::entry::EntryRef;
use crate::error::Result;
use crate::journal::Journal;
use std::cmp::Ordering;

use super::filter::{CompiledFilter, MatchTerm};
use super::{
    compare_keys, cursor_from_key, is_skippable_live_file_error, key_from_entry_ref,
    warn_live_file_error,
};

pub(super) struct JournalSnapshot {
    pub(super) journal: Journal,
    pub(super) last_seen: Option<SdJournalEntryKey>,
}

pub(super) struct ReplayState {
    pub(super) journal: Journal,
    pub(super) cursor: Option<Cursor>,
    pub(super) since_realtime: Option<u64>,
    pub(super) last_key: Option<SdJournalEntryKey>,
    pub(super) upper_key: Option<SdJournalEntryKey>,
    pub(super) remaining: Option<usize>,
}

impl ReplayState {
    pub(super) fn new(
        snapshot: JournalSnapshot,
        after_cursor: Option<Cursor>,
        since_realtime: Option<u64>,
        config: &JournalConfig,
    ) -> Self {
        let (cursor, last_key) = match after_cursor {
            Some(cursor) => match cursor.sdjournal_entry_key() {
                Some(key) => (None, Some(key)),
                None => (Some(cursor), None),
            },
            None => (None, None),
        };

        Self {
            journal: snapshot.journal,
            cursor,
            since_realtime,
            last_key,
            upper_key: snapshot.last_seen,
            remaining: config.max_live_replay_entries,
        }
    }

    pub(super) fn catch_up(
        journal: Journal,
        after_key: SdJournalEntryKey,
        upper_key: SdJournalEntryKey,
        config: &JournalConfig,
    ) -> Self {
        Self {
            journal,
            cursor: None,
            since_realtime: None,
            last_key: Some(after_key),
            upper_key: Some(upper_key),
            remaining: config.max_live_replay_entries,
        }
    }
}

pub(super) struct PendingTopologyCatchup {
    pub(super) journal: Journal,
    pub(super) last_key: Option<SdJournalEntryKey>,
    pub(super) upper_key: Option<SdJournalEntryKey>,
}

pub(super) struct EntryBatch {
    pub(super) entries: Vec<EntryRef>,
    pub(super) last_key: Option<SdJournalEntryKey>,
    pub(super) exhausted: bool,
}

pub(super) fn collect_replay_batch(
    replay: &ReplayState,
    filter: &CompiledFilter,
    limit: usize,
) -> Result<EntryBatch> {
    let Some(upper_key) = replay.upper_key else {
        return Ok(EntryBatch {
            entries: Vec::new(),
            last_key: None,
            exhausted: true,
        });
    };

    let mut q = replay.journal.query();
    apply_compiled_filter(&mut q, filter);
    if let Some(since) = replay.since_realtime {
        q.since_realtime(since);
    }
    if let Some(key) = replay.last_key {
        q.after_cursor(cursor_from_key(key));
    } else if let Some(cursor) = replay.cursor.clone() {
        q.after_cursor(cursor);
    }
    q.limit(limit.saturating_add(1));

    collect_entry_batch(q, limit, Some(upper_key), "skipping live replay entry")
}

pub(super) fn collect_entries_after_key(
    journal: &Journal,
    after_key: Option<SdJournalEntryKey>,
    upper_key: Option<SdJournalEntryKey>,
    limit: usize,
) -> Result<EntryBatch> {
    let mut q = journal.query();
    if let Some(key) = after_key {
        q.after_cursor(cursor_from_key(key));
    }
    q.limit(limit.saturating_add(1));

    collect_entry_batch(q, limit, upper_key, "skipping live refresh entry")
}

fn collect_entry_batch(
    query: crate::JournalQuery,
    limit: usize,
    upper_key: Option<SdJournalEntryKey>,
    skip_message: &'static str,
) -> Result<EntryBatch> {
    let mut entries = Vec::with_capacity(limit.min(64));
    let mut exhausted = true;
    for item in query.iter()? {
        match item {
            Ok(entry) => {
                let key = key_from_entry_ref(&entry);
                if let Some(upper_key) = upper_key
                    && compare_keys(&key, &upper_key) == Ordering::Greater
                {
                    break;
                }
                if entries.len() >= limit {
                    exhausted = false;
                    break;
                }
                entries.push(entry);
            }
            Err(err) if is_skippable_live_file_error(&err) => {
                warn_live_file_error(skip_message, &err);
            }
            Err(err) => return Err(err),
        }
    }

    let last_key = entries.last().map(key_from_entry_ref);
    Ok(EntryBatch {
        entries,
        last_key,
        exhausted,
    })
}

fn apply_compiled_filter(query: &mut crate::JournalQuery, filter: &CompiledFilter) {
    match filter.branches.as_slice() {
        [] => {}
        [branch] => {
            for term in branch {
                apply_term_to_query(query, term);
            }
        }
        branches => {
            for branch in branches {
                query.or_group(|group| {
                    for term in branch {
                        apply_term_to_or_group(group, term);
                    }
                });
            }
        }
    }
}

fn apply_term_to_query(query: &mut crate::JournalQuery, term: &MatchTerm) {
    match term {
        MatchTerm::Exact { field, value } => {
            query.match_exact(field, value);
        }
        MatchTerm::Present { field } => {
            query.match_present(field);
        }
    }
}

fn apply_term_to_or_group(group: &mut crate::query::OrGroupBuilder, term: &MatchTerm) {
    match term {
        MatchTerm::Exact { field, value } => {
            group.match_exact(field, value);
        }
        MatchTerm::Present { field } => {
            group.match_present(field);
        }
    }
}