Skip to main content

journal_engine/logs/
query.rs

1//! Log querying from indexed journal files.
2//!
3//! This module provides the `LogQuery` builder for efficiently querying and
4//! merging log entries from multiple indexed journal files, as well as
5//! functions for extracting raw field data from journal entries.
6
7use crate::error::Result;
8use journal_core::file::{CurrentRowView, JournalFile, Mmap};
9use journal_index::{
10    Anchor, Direction, FieldName, FieldValuePair, FileIndex, Filter, LogEntryId, LogQueryParams,
11    LogQueryParamsBuilder, Microseconds,
12};
13use journal_registry::File;
14use std::collections::{HashMap, HashSet};
15use std::num::NonZeroU64;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use tokio_util::sync::CancellationToken;
19use tracing::warn;
20
/// Pagination state for multi-file log queries.
///
/// Tracks, per file, the position of the furthest entry returned so far in the
/// query direction (the largest position for `Forward`, the smallest for
/// `Backward`), so a follow-up query can resume from there instead of re-scanning
/// entries.
///
/// The state is cumulative: each page starts from the previous state and only
/// advances the files that contributed entries to that page. A non-empty state
/// therefore does not by itself mean that more results remain.
///
/// The state is tied to a specific query configuration (filter, anchor, direction, etc).
/// Changing the query parameters while using the same pagination state will produce
/// undefined results.
#[derive(Debug, Clone, Default)]
pub struct PaginationState {
    /// Maps each file to its resume position: the position of the furthest entry
    /// from that file returned so far, in the query direction.
    pub file_positions: HashMap<File, usize>,
}
34
/// Builder for configuring and executing log queries from indexed journal files.
///
/// The anchor and direction are required and passed to [`LogQuery::new`]. Everything
/// else is optional:
/// - Limit (maximum entries to retrieve)
/// - Source timestamp field (which field to use for timestamps)
/// - Filter (to match specific entries)
/// - Time boundaries (`with_after_usec` / `with_before_usec`)
/// - Full-text regex search
/// - Cancellation token and progress counter
/// - Output-field projection
///
/// # Example
///
/// ```ignore
/// use journal_index::{Anchor, Direction};
/// use journal_function::logs::LogQuery;
///
/// let entries = LogQuery::new(&file_indexes, Anchor::Head, Direction::Forward)
///     .with_limit(100)
///     .execute()?;
/// ```
pub struct LogQuery<'a> {
    /// Journal file indexes the query runs over.
    file_indexes: &'a [FileIndex],
    /// Accumulated query parameters; validated by `build()` when the query executes.
    builder: LogQueryParamsBuilder,
    /// Optional token used to stop the query early between files.
    cancellation: Option<CancellationToken>,
    /// Optional counter bumped as files are skipped or visited.
    progress: Option<Arc<AtomicUsize>>,
    /// Raw on-disk field names to keep in the output; `None` keeps every field.
    output_fields: Option<HashSet<String>>,
}
61
62impl<'a> LogQuery<'a> {
63    /// Create a new log query builder with required parameters.
64    ///
65    /// # Arguments
66    ///
67    /// * `file_indexes` - Journal file indexes to query
68    /// * `anchor` - Starting point for the query (Head, Tail, or specific timestamp)
69    /// * `direction` - Direction to iterate (Forward or Backward)
70    ///
71    /// # Optional Configuration
72    ///
73    /// Use builder methods to set optional parameters:
74    /// - Limit: None (unlimited)
75    /// - Source timestamp field: _SOURCE_REALTIME_TIMESTAMP
76    /// - Filter: None
77    pub fn new(file_indexes: &'a [FileIndex], anchor: Anchor, direction: Direction) -> Self {
78        Self {
79            file_indexes,
80            builder: LogQueryParamsBuilder::new(anchor, direction).with_source_timestamp_field(
81                Some(FieldName::new_unchecked("_SOURCE_REALTIME_TIMESTAMP")),
82            ),
83            cancellation: None,
84            progress: None,
85            output_fields: None,
86        }
87    }
88
89    /// Set the maximum number of log entries to retrieve (optional).
90    ///
91    /// If not set (None), all matching entries will be retrieved.
92    pub fn with_limit(mut self, limit: usize) -> Self {
93        self.builder = self.builder.with_limit(limit);
94        self
95    }
96
97    /// Set the source timestamp field to use for entry timestamps (optional).
98    ///
99    /// Pass `None` to use the entry's realtime timestamp from the journal header.
100    /// Pass `Some(field_name)` to use a custom timestamp field from the entry data.
101    pub fn with_source_timestamp_field(mut self, field: Option<FieldName>) -> Self {
102        self.builder = self.builder.with_source_timestamp_field(field);
103        self
104    }
105
106    /// Set a filter to apply to log entries (optional).
107    ///
108    /// Only entries matching the filter will be included in the results.
109    pub fn with_filter(mut self, filter: Filter) -> Self {
110        self.builder = self.builder.with_filter(filter);
111        self
112    }
113
114    /// Set the lower time boundary (inclusive) in microseconds (optional).
115    ///
116    /// Only entries with timestamp >= after_usec will be included.
117    /// This enforces a hard boundary regardless of anchor or limit.
118    pub fn with_after_usec(mut self, after: u64) -> Self {
119        self.builder = self.builder.with_after(Microseconds(after));
120        self
121    }
122
123    /// Set the upper time boundary (exclusive) in microseconds (optional).
124    ///
125    /// Only entries with timestamp < before_usec will be included.
126    /// This enforces a hard boundary regardless of anchor or limit.
127    pub fn with_before_usec(mut self, before: u64) -> Self {
128        self.builder = self.builder.with_before(Microseconds(before));
129        self
130    }
131
132    /// Set a regex pattern for full-text search (optional).
133    ///
134    /// Only entries where at least one data object (in "FIELD=value" format)
135    /// matches the regex will be included in the results.
136    ///
137    /// The pattern will be compiled when the query is executed. Invalid patterns
138    /// will cause execute() to return an error.
139    pub fn with_regex(mut self, pattern: impl Into<String>) -> Self {
140        self.builder = self.builder.with_regex(pattern);
141        self
142    }
143
144    /// Set a cancellation token for the query (optional).
145    ///
146    /// When set, the query will check the token before processing each file
147    /// and return early with partial results if cancelled.
148    pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
149        self.cancellation = Some(token);
150        self
151    }
152
153    /// Set a progress counter for the query (optional).
154    ///
155    /// When set, the counter is incremented (via `fetch_add`) after each file
156    /// is processed in `retrieve_log_entries`.
157    pub fn with_progress(mut self, counter: Arc<AtomicUsize>) -> Self {
158        self.progress = Some(counter);
159        self
160    }
161
162    /// Limit returned field-value pairs to the requested on-disk field names.
163    pub fn with_output_fields<I, S>(mut self, fields: I) -> Self
164    where
165        I: IntoIterator<Item = S>,
166        S: Into<String>,
167    {
168        self.output_fields = Some(fields.into_iter().map(Into::into).collect());
169        self
170    }
171
172    /// Execute the query and return log entries.
173    ///
174    /// This consumes the builder and returns a vector of log entries sorted by timestamp
175    /// according to the configured direction.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if anchor or direction were not set, or if time boundaries are invalid.
180    pub fn execute(self) -> Result<Vec<LogEntryData>> {
181        let params = self.builder.build()?;
182        let output_fields = self.output_fields;
183        let (log_entry_ids, _state) = retrieve_log_entries(
184            self.file_indexes.to_vec(),
185            params,
186            None,
187            self.cancellation.as_ref(),
188            self.progress.as_ref(),
189        );
190
191        extract_entry_data(&log_entry_ids, output_fields.as_ref())
192    }
193
194    /// Execute the query with pagination support.
195    ///
196    /// This consumes the builder and returns a page of log entries along with
197    /// pagination state that can be used to retrieve the next page.
198    ///
199    /// # Arguments
200    ///
201    /// * `state` - Optional pagination state from a previous query. Pass `None` for the first page.
202    ///
203    /// # Returns
204    ///
205    /// Returns a tuple of (log entry data, new pagination state). If the pagination state
206    /// is empty (no file positions tracked), there are no more results.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if anchor or direction were not set, or if time boundaries are invalid.
211    pub fn execute_page(
212        self,
213        state: Option<&PaginationState>,
214    ) -> Result<(Vec<LogEntryData>, PaginationState)> {
215        let params = self.builder.build()?;
216        let output_fields = self.output_fields;
217        let (log_entry_ids, new_state) = retrieve_log_entries(
218            self.file_indexes.to_vec(),
219            params,
220            state,
221            self.cancellation.as_ref(),
222            self.progress.as_ref(),
223        );
224
225        let data = extract_entry_data(&log_entry_ids, output_fields.as_ref())?;
226        Ok((data, new_state))
227    }
228}
229
230/// Retrieve and merge log entries from multiple indexed journal files.
231///
232/// This function efficiently retrieves log entries from multiple journal files,
233/// merging them in timestamp order while respecting the limit constraint.
234///
235/// # Arguments
236///
237/// * `file_indexes` - Vector of indexed journal files to retrieve from
238/// * `params` - Query parameters (anchor, direction, limit, filter, boundaries)
239/// * `state` - Optional pagination state to resume from previous query
240///
241/// # Returns
242///
243/// A tuple of (log entries, new pagination state). The entries are sorted by timestamp
244/// and limited to `params.limit`. The new state can be used to resume the query.
245fn retrieve_log_entries(
246    file_indexes: Vec<FileIndex>,
247    params: LogQueryParams,
248    state: Option<&PaginationState>,
249    cancellation: Option<&CancellationToken>,
250    progress: Option<&Arc<AtomicUsize>>,
251) -> (Vec<LogEntryId>, PaginationState) {
252    // Handle edge cases
253    if params.limit() == Some(0) || file_indexes.is_empty() {
254        return (Vec::new(), PaginationState::default());
255    }
256
257    let anchor_usec = multi_file_anchor_usec(&file_indexes, params.anchor());
258    let mut relevant_indexes =
259        relevant_file_indexes(&file_indexes, params.direction(), anchor_usec);
260
261    if let Some(counter) = progress {
262        let filtered = file_indexes.len() - relevant_indexes.len();
263        counter.fetch_add(filtered, Ordering::Relaxed);
264    }
265
266    if relevant_indexes.is_empty() {
267        return (Vec::new(), PaginationState::default());
268    }
269
270    sort_relevant_indexes(&mut relevant_indexes, params.direction());
271
272    let (limit, mut collected_entries) = collection_limit_and_buffer(params.limit());
273    let mut new_state = state.cloned().unwrap_or_default();
274
275    for file_index in relevant_indexes {
276        if query_cancelled(cancellation, &new_state) {
277            break;
278        }
279
280        mark_file_processed(progress);
281
282        if should_prune_file(file_index, &collected_entries, limit, params.direction()) {
283            break;
284        }
285
286        if let Some(new_entries) = query_file_entries(file_index, &params, state) {
287            collected_entries =
288                merge_log_entries(collected_entries, new_entries, limit, params.direction());
289        }
290    }
291
292    update_pagination_state(&mut new_state, &collected_entries, params.direction());
293
294    (collected_entries, new_state)
295}
296
297fn multi_file_anchor_usec(file_indexes: &[FileIndex], anchor: Anchor) -> u64 {
298    match anchor {
299        Anchor::Timestamp(ts) => ts.get(),
300        Anchor::Head => file_indexes
301            .iter()
302            .map(|fi| fi.start_time().to_microseconds().get())
303            .min()
304            .unwrap_or(0),
305        Anchor::Tail => file_indexes
306            .iter()
307            .map(|fi| fi.end_time().to_microseconds().get())
308            .max()
309            .unwrap_or(0),
310    }
311}
312
313fn relevant_file_indexes(
314    file_indexes: &[FileIndex],
315    direction: Direction,
316    anchor_usec: u64,
317) -> Vec<&FileIndex> {
318    file_indexes
319        .iter()
320        .filter(|fi| file_can_contain_anchor(fi, direction, anchor_usec))
321        .collect()
322}
323
324fn file_can_contain_anchor(file_index: &FileIndex, direction: Direction, anchor_usec: u64) -> bool {
325    match direction {
326        Direction::Forward => file_index.end_time().to_microseconds().get() >= anchor_usec,
327        Direction::Backward => file_index.start_time().to_microseconds().get() <= anchor_usec,
328    }
329}
330
331fn sort_relevant_indexes(file_indexes: &mut [&FileIndex], direction: Direction) {
332    match direction {
333        Direction::Forward => file_indexes.sort_by_key(|fi| fi.start_time()),
334        Direction::Backward => file_indexes.sort_by_key(|fi| std::cmp::Reverse(fi.end_time())),
335    }
336}
337
338fn collection_limit_and_buffer(limit: Option<usize>) -> (usize, Vec<LogEntryId>) {
339    match limit {
340        Some(limit) => (limit, Vec::with_capacity(limit)),
341        None => (usize::MAX, Vec::with_capacity(200)),
342    }
343}
344
345fn query_cancelled(cancellation: Option<&CancellationToken>, state: &PaginationState) -> bool {
346    let Some(token) = cancellation else {
347        return false;
348    };
349    if !token.is_cancelled() {
350        return false;
351    }
352    warn!(
353        "log query cancelled after processing {} files, returning partial results",
354        state.file_positions.len()
355    );
356    true
357}
358
/// Bump the optional progress counter by one file.
fn mark_file_processed(progress: Option<&Arc<AtomicUsize>>) {
    let Some(counter) = progress else {
        return;
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
364
365fn should_prune_file(
366    file_index: &FileIndex,
367    collected_entries: &[LogEntryId],
368    limit: usize,
369    direction: Direction,
370) -> bool {
371    collected_entries.len() >= limit
372        && can_prune_file(file_index, collected_entries, direction).unwrap_or(false)
373}
374
375fn query_file_entries(
376    file_index: &FileIndex,
377    params: &LogQueryParams,
378    state: Option<&PaginationState>,
379) -> Option<Vec<LogEntryId>> {
380    let file = file_index.file();
381    let file_params = params_for_file(file_index, params, state);
382    match file_index.find_log_entries(file, &file_params) {
383        Ok(entries) if entries.is_empty() => None,
384        Ok(entries) => Some(entries),
385        Err(e) => {
386            warn!(file = file.path(), "failed to retrieve log entries: {e}");
387            None
388        }
389    }
390}
391
392fn params_for_file(
393    file_index: &FileIndex,
394    params: &LogQueryParams,
395    state: Option<&PaginationState>,
396) -> LogQueryParams {
397    let Some(pos) = state.and_then(|s| s.file_positions.get(file_index.file()).copied()) else {
398        return params.clone();
399    };
400
401    let mut builder = LogQueryParamsBuilder::new(params.anchor(), params.direction());
402    if let Some(limit) = params.limit() {
403        builder = builder.with_limit(limit);
404    }
405    if let Some(field) = params.source_timestamp_field() {
406        builder = builder.with_source_timestamp_field(Some(field.clone()));
407    }
408    if let Some(filter) = params.filter() {
409        builder = builder.with_filter(filter.clone());
410    }
411    if let Some(after) = params.after() {
412        builder = builder.with_after(after);
413    }
414    if let Some(before) = params.before() {
415        builder = builder.with_before(before);
416    }
417    if let Some(regex) = params.regex() {
418        builder = builder.with_regex(regex.as_str());
419    }
420
421    builder
422        .with_resume_position(pos)
423        .build()
424        .expect("resume params copied from validated query params")
425}
426
427fn update_pagination_state(
428    state: &mut PaginationState,
429    entries: &[LogEntryId],
430    direction: Direction,
431) {
432    for entry in entries {
433        state
434            .file_positions
435            .entry(entry.file.clone())
436            .and_modify(|pos| {
437                *pos = next_resume_position(*pos, entry.position, direction);
438            })
439            .or_insert(entry.position);
440    }
441}
442
443fn next_resume_position(current: usize, candidate: usize, direction: Direction) -> usize {
444    match direction {
445        Direction::Forward => current.max(candidate),
446        Direction::Backward => current.min(candidate),
447    }
448}
449
450/// Check if we can prune (skip) a file based on its time range and current results.
451///
452/// Returns Some(true) if we should break early, Some(false) if we should continue,
453/// or None if we can't determine (shouldn't happen with a full result set).
454fn can_prune_file(
455    file_index: &FileIndex,
456    result: &[LogEntryId],
457    direction: Direction,
458) -> Option<bool> {
459    match direction {
460        Direction::Forward => {
461            // For forward: if file starts after our latest entry, skip all remaining files
462            let max_timestamp = result.last()?.timestamp.get();
463            Some(file_index.start_time().to_microseconds().get() > max_timestamp)
464        }
465        Direction::Backward => {
466            // For backward: if file ends before our earliest entry, skip all remaining files
467            let min_timestamp = result.first()?.timestamp.get();
468            Some(file_index.end_time().to_microseconds().get() < min_timestamp)
469        }
470    }
471}
472
473/// Merges two sorted vectors into a single sorted vector with at most `limit` elements.
474///
475/// This function performs a two-pointer merge, which is efficient for combining
476/// sorted sequences. It only retains the smallest/largest `limit` entries by timestamp
477/// depending on the direction.
478///
479/// # Arguments
480///
481/// * `a` - First sorted vector
482/// * `b` - Second sorted vector
483/// * `limit` - Maximum number of elements in the result
484/// * `direction` - Direction determines ascending (Forward) or descending (Backward) order
485///
486/// # Returns
487///
488/// A new vector containing the merged and limited results
489fn merge_log_entries(
490    a: Vec<LogEntryId>,
491    b: Vec<LogEntryId>,
492    limit: usize,
493    direction: Direction,
494) -> Vec<LogEntryId> {
495    // Handle simple cases
496    if a.is_empty() {
497        return b.into_iter().take(limit).collect();
498    }
499    if b.is_empty() {
500        return a.into_iter().take(limit).collect();
501    }
502
503    // Allocate result vector with appropriate capacity — cap at actual data size
504    // to avoid capacity overflow when limit is usize::MAX (no limit set).
505    let mut result = Vec::with_capacity(a.len().saturating_add(b.len()).min(limit));
506    let mut i = 0;
507    let mut j = 0;
508
509    // Two-pointer merge: always take the appropriate element based on direction
510    while result.len() < limit {
511        let take_from_a = match (i < a.len(), j < b.len()) {
512            (true, false) => true,
513            (false, true) => false,
514            (false, false) => break,
515            (true, true) => match direction {
516                Direction::Forward => a[i].timestamp <= b[j].timestamp,
517                Direction::Backward => a[i].timestamp >= b[j].timestamp,
518            },
519        };
520
521        if take_from_a {
522            result.push(a[i].clone());
523            i += 1;
524        } else {
525            result.push(b[j].clone());
526            j += 1;
527        }
528    }
529
530    result
531}
532
/// Whether a raw field name survives the optional output-field projection.
/// With no projection, every field is kept.
fn is_projected(raw_field_name: &str, output_fields: Option<&HashSet<String>>) -> bool {
    match output_fields {
        Some(projected) => projected.contains(raw_field_name),
        None => true,
    }
}
536
/// Raw field data extracted from a journal entry.
///
/// This is an intermediate representation between a `LogEntryId` and
/// format-specific structures such as `Table`, Arrow `RecordBatch`, or columnar data.
/// A `LogEntryId` identifies an entry by file, offset, position and timestamp but
/// carries none of its fields.
///
/// The fields are stored as `FieldValuePair` objects holding each pair's field
/// name and value.
#[derive(Debug, Clone)]
pub struct LogEntryData {
    /// Timestamp of the entry in microseconds, copied from the `LogEntryId` that
    /// selected it. It presumably reflects the query's configured source timestamp
    /// field rather than always the journal's realtime timestamp; confirm against
    /// `journal_index`.
    pub timestamp: u64,
    /// The entry's field=value pairs, in the order its data objects are read. When
    /// the query set an output-field projection, only projected fields are kept.
    pub fields: Vec<FieldValuePair>,
}
552
553/// Extracts raw field data from multiple log entries efficiently.
554///
555/// This function groups entries by file and processes them in batches,
556/// minimizing file open/close overhead. It reads the journal files and
557/// extracts all field=value pairs without applying any transformations.
558///
559/// # Arguments
560///
561/// * `log_entries` - Slice of log entry IDs to extract data from
562///
563/// # Returns
564///
565/// A vector of `LogEntryData` in the same order as the input entries
566fn extract_entry_data(
567    log_entries: &[LogEntryId],
568    output_fields: Option<&HashSet<String>>,
569) -> Result<Vec<LogEntryData>> {
570    let entries_by_file = entries_grouped_by_file(log_entries);
571    let mut result = vec![None; log_entries.len()];
572
573    for (file, file_entries) in entries_by_file {
574        let journal_file = JournalFile::<Mmap>::open(file, 8 * 1024 * 1024)?;
575        let mut row = CurrentRowView::default();
576
577        for (original_idx, entry) in file_entries {
578            let fields = read_entry_fields(&journal_file, entry, output_fields, &mut row)?;
579            result[original_idx] = Some(LogEntryData {
580                timestamp: entry.timestamp.get(),
581                fields,
582            });
583        }
584    }
585
586    Ok(result.into_iter().flatten().collect())
587}
588
589fn entries_grouped_by_file(
590    log_entries: &[LogEntryId],
591) -> HashMap<&File, Vec<(usize, &LogEntryId)>> {
592    let mut entries_by_file: HashMap<&File, Vec<(usize, &LogEntryId)>> = HashMap::new();
593    for (idx, entry) in log_entries.iter().enumerate() {
594        entries_by_file
595            .entry(&entry.file)
596            .or_default()
597            .push((idx, entry));
598    }
599    entries_by_file
600}
601
/// Read the field=value pairs of a single journal entry.
///
/// Loads `entry` into the reusable `row` cursor, walks its data objects, and keeps
/// the pairs accepted by `read_projected_pair`. Once iteration has started, the
/// cursor's data state is reset even if iteration fails, so `row` can be reused
/// for the next entry.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the entry offset is zero;
/// - the entry cannot be loaded or its data objects cannot be read;
/// - resetting the cursor fails (a reset error takes precedence over the
///   iteration result).
fn read_entry_fields(
    journal_file: &JournalFile<Mmap>,
    entry: &LogEntryId,
    output_fields: Option<&HashSet<String>>,
    row: &mut CurrentRowView,
) -> Result<Vec<FieldValuePair>> {
    let entry_offset =
        NonZeroU64::new(entry.offset).ok_or(journal_core::JournalError::InvalidOffset)?;
    row.load_entry(journal_file, entry_offset)?;
    // Pre-size for every data object; a projection may keep fewer.
    let mut fields = Vec::with_capacity(row.data_offset_count());
    row.restart_data()?;

    // Iterate inside an immediately-invoked closure so an early `?` cannot skip
    // the cursor reset below.
    let result = (|| {
        while let Some((_, payload)) = row.read_next_payload_with_offset(journal_file)? {
            let payload = row.payload_slice(payload);
            if let Some(pair) = read_projected_pair(payload, output_fields) {
                fields.push(pair);
            }
        }
        Ok(fields)
    })();
    row.reset_data_state(journal_file)?;
    result
}
626
627fn read_projected_pair(
628    payload_bytes: &[u8],
629    output_fields: Option<&HashSet<String>>,
630) -> Option<FieldValuePair> {
631    if !payload_may_match_projection(payload_bytes, output_fields) {
632        return None;
633    }
634    if let Some(pair) = FieldValuePair::parse_bytes(payload_bytes) {
635        return is_projected(pair.field(), output_fields).then_some(pair);
636    }
637    let payload_str = String::from_utf8_lossy(payload_bytes);
638    let Some(pair) = FieldValuePair::parse(&payload_str) else {
639        return None;
640    };
641    is_projected(pair.field(), output_fields).then_some(pair)
642}
643
/// Cheap prefilter: can this payload possibly belong to a projected field?
///
/// Returns `false` only when the bytes before the first `=` form valid UTF-8 and
/// that name is not projected. Payloads without `=` or with a non-UTF-8 name are
/// let through so the full parser can decide. Everything passes when there is no
/// projection.
fn payload_may_match_projection(
    payload_bytes: &[u8],
    output_fields: Option<&HashSet<String>>,
) -> bool {
    output_fields.map_or(true, |projected| {
        payload_bytes
            .iter()
            .position(|&byte| byte == b'=')
            .and_then(|separator| std::str::from_utf8(&payload_bytes[..separator]).ok())
            .map_or(true, |field| projected.contains(field))
    })
}
659
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a projection set from string literals.
    fn projected_fields(fields: &[&str]) -> HashSet<String> {
        fields.iter().map(|field| (*field).to_string()).collect()
    }

    #[test]
    fn projection_accepts_raw_systemd_field_name() {
        let projected = projected_fields(&["_SYSTEMD_UNIT"]);

        assert!(is_projected("_SYSTEMD_UNIT", Some(&projected)));
    }

    #[test]
    fn projection_rejects_unmatched_field_names() {
        let projected = projected_fields(&["service.name"]);

        assert!(!is_projected("_SYSTEMD_UNIT", Some(&projected)));
    }

    #[test]
    fn projection_accepts_all_fields_without_projection_filter() {
        assert!(is_projected("_SYSTEMD_UNIT", None));
    }

    #[test]
    fn projected_pair_prefilter_rejects_unmatched_utf8_field_without_parsing_value() {
        let projected = projected_fields(&["MESSAGE"]);

        assert!(read_projected_pair(b"PRIORITY=3", Some(&projected)).is_none());
        assert_eq!(
            read_projected_pair(b"MESSAGE=hello", Some(&projected))
                .expect("projected pair")
                .as_str(),
            "MESSAGE=hello"
        );
    }

    #[test]
    fn projected_pair_preserves_lossy_legacy_path_for_non_utf8_payloads() {
        let projected = projected_fields(&["FIELD"]);
        let pair = read_projected_pair(b"FIELD=\xff", Some(&projected)).expect("lossy pair");

        assert_eq!(pair.field(), "FIELD");
    }

    #[test]
    fn payload_prefilter_accepts_everything_without_projection() {
        assert!(payload_may_match_projection(b"PRIORITY=3", None));
    }

    #[test]
    fn payload_prefilter_defers_to_parser_when_separator_missing() {
        let projected = projected_fields(&["MESSAGE"]);

        assert!(payload_may_match_projection(b"NO_SEPARATOR", Some(&projected)));
    }

    #[test]
    fn payload_prefilter_defers_to_parser_for_non_utf8_field_name() {
        let projected = projected_fields(&["MESSAGE"]);

        assert!(payload_may_match_projection(b"\xffFIELD=value", Some(&projected)));
    }

    #[test]
    fn resume_position_only_moves_in_query_direction() {
        assert_eq!(next_resume_position(5, 9, Direction::Forward), 9);
        assert_eq!(next_resume_position(9, 5, Direction::Forward), 9);
        assert_eq!(next_resume_position(5, 9, Direction::Backward), 5);
        assert_eq!(next_resume_position(9, 5, Direction::Backward), 5);
    }
}