Skip to main content

journal_index/
file_indexer.rs

//! Journal file indexing functionality.
//!
//! This module provides the [`FileIndexer`] type, which creates searchable
//! indexes from journal files. The indexing process extracts:
//!
//! - Time-based histograms for efficient range queries
//! - Bitmap indexes for fast field=value lookups
//! - Metadata about available and indexed fields

10use crate::{
11    Bitmap, FieldName, FieldValuePair, FileIndex, Histogram, IndexError, Microseconds, Result,
12    Seconds,
13};
14use journal_core::collections::{HashMap, HashSet};
15use journal_core::file::{HashableObject, JournalFile, Mmap, offset_array::InlinedCursor};
16use journal_registry::File;
17use std::num::NonZeroU64;
18use tracing::{error, trace, warn};
19
/// Out-of-the-box cap on how many distinct values of a single field get a
/// bitmap (see [`IndexingLimits::max_unique_values_per_field`]).
pub const DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD: usize = 500;

/// Out-of-the-box payload-size cap, in bytes, for indexed field values
/// (see [`IndexingLimits::max_field_payload_size`]).
pub const DEFAULT_MAX_FIELD_PAYLOAD_SIZE: usize = 100;
25
/// Knobs that bound how much work and memory indexing may consume.
///
/// Journal files can contain fields with a huge number of distinct values or
/// very large payloads; these limits keep the resulting index bounded.
#[derive(Debug, Clone, Copy)]
pub struct IndexingLimits {
    /// Cap on the number of distinct values that get a bitmap, per field.
    ///
    /// Once a field reaches this many indexed values, the remaining values of
    /// that field are not indexed and the field is reported as truncated.
    /// This keeps high-cardinality fields (for example `MESSAGE`) from
    /// exhausting memory.
    pub max_unique_values_per_field: usize,

    /// Cap, in bytes, on the payload of a value that may be indexed.
    ///
    /// The size is measured on the whole `FIELD=value` data payload, so the
    /// field name counts toward it. Values whose payload exceeds this limit
    /// are skipped, as are compressed values. This keeps large binary or
    /// encoded content out of the index.
    pub max_field_payload_size: usize,
}
46
47impl Default for IndexingLimits {
48    fn default() -> Self {
49        Self {
50            max_unique_values_per_field: DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD,
51            max_field_payload_size: DEFAULT_MAX_FIELD_PAYLOAD_SIZE,
52        }
53    }
54}
55
56/// Reusable indexer for creating searchable indexes from journal files.
57///
58/// # Indexing Process
59///
60/// The indexer performs three main tasks:
61///
62/// 1. **Histogram Construction**: Creates time-based buckets for efficient
63///    range queries. Entries are ordered by their source timestamp (if
64///    available) or realtime timestamp.
65///
66/// 2. **Bitmap Index Creation**: For each specified field, creates bitmap
67///    indexes mapping field=value pairs to entry indices, enabling fast
68///    filtered queries.
69///
70/// 3. **Metadata Collection**: Tracks which fields are available in the file
71///    and which were indexed.
72///
73/// # Concurrent Write Handling
74///
75/// The indexer captures the journal file's `tail_object_offset` at the start of indexing
76/// to create a consistent snapshot. Any entries written to the file after indexing begins
77/// are ignored, preventing race conditions with concurrent writers.
78#[derive(Debug)]
79#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
80pub struct FileIndexer {
81    /// Configuration limits for the indexing process.
82    limits: IndexingLimits,
83
84    // Associates a source timestamp value with its inlined cursor
85    source_timestamp_cursor_pairs: Vec<(Microseconds, InlinedCursor)>,
86
87    // Scratch buffer to collect entry offsets from the inlined cursor of a
88    // source timestamp value, or the global entry offset array
89    entry_offsets: Vec<NonZeroU64>,
90
91    // Associates a source timestamp value with its entry offset
92    source_timestamp_entry_offset_pairs: Vec<(Microseconds, NonZeroU64)>,
93
94    // Associates a journal file's entry realtime value with its offset
95    realtime_entry_offset_pairs: Vec<(Microseconds, NonZeroU64)>,
96
97    // Scratch buffer to collect the indices of entries in which a data
98    // object appears
99    entry_indices: Vec<u32>,
100
101    // Maps entry offsets to an index of an implicitly defined time-ordered
102    // array of entries
103    entry_offset_index: HashMap<NonZeroU64, u64>,
104}
105
106impl Default for FileIndexer {
107    fn default() -> Self {
108        Self::new(IndexingLimits::default())
109    }
110}
111
112impl FileIndexer {
113    /// Create a new indexer with the specified configuration limits.
114    pub fn new(limits: IndexingLimits) -> Self {
115        Self {
116            limits,
117            source_timestamp_cursor_pairs: Vec::new(),
118            entry_offsets: Vec::new(),
119            source_timestamp_entry_offset_pairs: Vec::new(),
120            realtime_entry_offset_pairs: Vec::new(),
121            entry_indices: Vec::new(),
122            entry_offset_index: HashMap::default(),
123        }
124    }
125}
126
127impl FileIndexer {
128    /// Create a searchable index from a journal file.
129    ///
130    /// # Arguments
131    /// * `file` - The journal file to index
132    /// * `source_timestamp_field` - Optional field to use for timestamps
133    /// * `field_names` - Fields to create bitmap indexes for
134    /// * `bucket_duration` - Duration of histogram buckets
135    pub fn index(
136        &mut self,
137        file: &File,
138        source_timestamp_field: Option<&FieldName>,
139        field_names: &[FieldName],
140        bucket_duration: Seconds,
141    ) -> Result<FileIndex> {
142        self.source_timestamp_cursor_pairs = Vec::new();
143        self.source_timestamp_entry_offset_pairs = Vec::new();
144        self.realtime_entry_offset_pairs = Vec::new();
145        self.entry_indices = Vec::new();
146        self.entry_offsets = Vec::new();
147        self.entry_offset_index = HashMap::default();
148
149        let window_size = 32 * 1024 * 1024;
150        let journal_file = JournalFile::<Mmap>::open(file, window_size)?;
151
152        // NOTE: Capture the maximum valid entry offset at the start of
153        // indexing.
154        //
155        // This prevents race conditions when the journal file is being
156        // actively written to. The `tail_object_offset` from the header tells
157        // us the offset of the last object in the file at this moment. Any
158        // entry offset beyond this was added after we started indexing and
159        // should be ignored.
160        let Some(tail_object_offset) = journal_file.journal_header_ref().tail_object_offset else {
161            return Err(IndexError::MissingOffset);
162        };
163
164        // Capture indexing timestamp
165        let indexed_at = Seconds::now();
166
167        // Capture whether the file was online when indexed.
168        //
169        // A file is considered online if:
170        // 1. The journal header state is 1 (STATE_ONLINE), OR
171        // 2. The file is an "Active" file by filename (e.g., system.journal
172        //    without the @seqnum_id-head_seqnum-head_realtime suffix)
173        //
174        // We check both conditions because systemd-journal may temporarily set
175        // `state != 1` on active journal files (e.g., during flush operations).
176        // If we only checked the header state, we might incorrectly mark an
177        // active file as offline/archived, causing its cache entry to be
178        // considered "always fresh" and never re-indexed. This would result
179        // in the file being excluded from queries for current time ranges
180        // because its bounded time range (from when it was indexed) doesn't
181        // overlap with the query range.
182        //
183        let was_online = journal_file.journal_header_ref().state == 1 || file.is_active();
184
185        // Build the file histogram
186        let histogram = self.build_histogram(
187            &journal_file,
188            source_timestamp_field,
189            bucket_duration,
190            tail_object_offset,
191        )?;
192
193        // Use the (timestamp, entry-offset) pairs to construct a vector that
194        // will contain entry offsets sorted by time
195        let entry_offsets = self
196            .source_timestamp_entry_offset_pairs
197            .iter()
198            .map(|(_, entry_offset)| entry_offset.get() as u32)
199            .collect();
200
201        // Create the bitmaps for field=value pairs
202        let entries =
203            self.build_entries_index(&journal_file, field_names, tail_object_offset, was_online)?;
204
205        // Convert field_names to HashSet<FieldName> for indexed_fields
206        let indexed_fields: HashSet<FieldName> = field_names.iter().cloned().collect();
207
208        let mut file_fields = HashSet::default();
209        for field in journal_file.fields() {
210            let field = field?;
211            let field_name = String::from_utf8_lossy(field.raw_payload()).into_owned();
212            file_fields.insert(FieldName::new_unchecked(field_name));
213        }
214
215        Ok(FileIndex::new(
216            file.clone(),
217            indexed_at,
218            was_online,
219            histogram,
220            entry_offsets,
221            file_fields,
222            indexed_fields,
223            entries,
224        ))
225    }
226
227    /// Build bitmap indexes for field=value pairs.
228    ///
229    /// For each field in `field_names`, this iterates through all data objects
230    /// for that field and creates a bitmap mapping each unique field=value pair
231    /// to the entry indices where it appears.
232    ///
233    /// Only entries with offsets <= `tail_object_offset` are included in the
234    /// bitmaps, ensuring a consistent snapshot.
235    ///
236    /// Fields with more than `self.limits.max_unique_values_per_field` unique values
237    /// will have their indexing truncated to prevent unbounded memory growth.
238    fn build_entries_index(
239        &mut self,
240        journal_file: &JournalFile<Mmap>,
241        field_names: &[FieldName],
242        tail_object_offset: NonZeroU64,
243        was_online: bool,
244    ) -> Result<HashMap<FieldValuePair, Bitmap>> {
245        let mut entries_index = HashMap::default();
246        let mut issues = FieldIndexIssues::default();
247
248        for field_name in field_names {
249            self.index_field_values(
250                journal_file,
251                field_name,
252                tail_object_offset,
253                &mut entries_index,
254                &mut issues,
255            )?;
256        }
257
258        issues.log(journal_file, self.limits, was_online);
259        Ok(entries_index)
260    }
261
262    fn index_field_values<'a>(
263        &mut self,
264        journal_file: &JournalFile<Mmap>,
265        field_name: &'a FieldName,
266        tail_object_offset: NonZeroU64,
267        entries_index: &mut HashMap<FieldValuePair, Bitmap>,
268        issues: &mut FieldIndexIssues<'a>,
269    ) -> Result<()> {
270        let field_data_iterator = match journal_file.field_data_objects(field_name.as_bytes()) {
271            Ok(field_data_iterator) => field_data_iterator,
272            Err(e) => {
273                warn!(
274                    "failed to iterate field data objects for field '{}' in file {}: {:#?}",
275                    field_name,
276                    journal_file.file().path(),
277                    e
278                );
279                return Ok(());
280            }
281        };
282        let mut state = FieldIndexState::default();
283        for data_object in field_data_iterator {
284            if state.unique_values_count >= self.limits.max_unique_values_per_field {
285                state.was_truncated = true;
286                break;
287            }
288
289            let (data_payload, inlined_cursor) = {
290                let Ok(data_object) = data_object else {
291                    continue;
292                };
293                if data_object.raw_payload().len() >= self.limits.max_field_payload_size
294                    || data_object.is_compressed()
295                {
296                    state.ignored_large_payloads += 1;
297                    continue;
298                }
299
300                let data_payload = String::from_utf8_lossy(data_object.raw_payload()).into_owned();
301                let Some(inlined_cursor) = data_object.inlined_cursor() else {
302                    continue;
303                };
304                (data_payload, inlined_cursor)
305            };
306
307            let Some(pair) = FieldValuePair::parse(&data_payload) else {
308                warn!("Invalid field=value format: {}", data_payload);
309                continue;
310            };
311            if self.collect_data_entry_indices(journal_file, inlined_cursor, tail_object_offset)? {
312                insert_field_bitmap(entries_index, field_name, pair.value(), &self.entry_indices);
313                state.unique_values_count += 1;
314            }
315        }
316
317        issues.record(field_name, state);
318        Ok(())
319    }
320
321    fn collect_data_entry_indices(
322        &mut self,
323        journal_file: &JournalFile<Mmap>,
324        inlined_cursor: InlinedCursor,
325        tail_object_offset: NonZeroU64,
326    ) -> Result<bool> {
327        self.entry_offsets.clear();
328        if let Err(err) = inlined_cursor.collect_offsets(journal_file, &mut self.entry_offsets) {
329            warn!("failed to collect entry offsets from DATA object index: {err:?}");
330            return Ok(false);
331        }
332
333        self.entry_indices.clear();
334        for entry_offset in self
335            .entry_offsets
336            .iter()
337            .copied()
338            .filter(|offset| *offset <= tail_object_offset)
339        {
340            let Some(entry_index) = self.entry_offset_index.get(&entry_offset) else {
341                panic!(
342                    "missing entry offset {} from index (total offsets: {})",
343                    entry_offset,
344                    self.entry_offset_index.len()
345                );
346            };
347            self.entry_indices.push(*entry_index as u32);
348        }
349
350        if self.entry_indices.is_empty() {
351            return Ok(false);
352        }
353        self.entry_indices.sort_unstable();
354        Ok(true)
355    }
356
357    /// Collect timestamp information from a source timestamp field.
358    ///
359    /// This extracts (timestamp, entry_offset) pairs from the specified source
360    /// field (typically `_SOURCE_REALTIME_TIMESTAMP`). The pairs are sorted by
361    /// timestamp and used to build `entry_offset_index`, which maps each entry
362    /// offset to its position in the time-ordered sequence.
363    ///
364    /// If the source field is missing for some entries, those entries will be
365    /// handled later using the journal file's realtime timestamp.
366    fn collect_source_field_info(
367        &mut self,
368        journal_file: &JournalFile<Mmap>,
369        source_field_name: &[u8],
370    ) -> Result<()> {
371        // Create an iterator over all the different values the field can take
372        let field_data_iterator = journal_file.field_data_objects(source_field_name)?;
373
374        // Collect all the inlined cursors of the source timestamp field
375        self.source_timestamp_cursor_pairs.clear();
376        for data_object_result in field_data_iterator {
377            let Ok(data_object) = data_object_result else {
378                warn!("loading data object failed");
379                continue;
380            };
381
382            let Ok(source_timestamp) =
383                crate::field_types::parse_timestamp(source_field_name, &data_object)
384            else {
385                warn!("parsing source timestamp failed");
386                continue;
387            };
388
389            let Some(ic) = data_object.inlined_cursor() else {
390                use journal_core::file::JournalState;
391
392                let file_state = JournalState::try_from(journal_file.journal_header_ref().state)
393                    .map(|s| s.to_string())
394                    .unwrap_or_else(|_| "UNKNOWN".to_string());
395
396                warn!(
397                    "orphaned data object (no entries) for _SOURCE_REALTIME_TIMESTAMP={} in {} (state: {})",
398                    source_timestamp,
399                    journal_file.file().path(),
400                    file_state
401                );
402                continue;
403            };
404
405            self.source_timestamp_cursor_pairs
406                .push((Microseconds(source_timestamp), ic));
407        }
408
409        // Collect all the [source_timestamp, entry-offset] pairs
410        self.source_timestamp_entry_offset_pairs.clear();
411        for (ts, ic) in self.source_timestamp_cursor_pairs.iter() {
412            self.entry_offsets.clear();
413
414            match ic.collect_offsets(journal_file, &mut self.entry_offsets) {
415                Ok(_) => {}
416                Err(e) => {
417                    error!("failed to collect offsets from source timestamp: {}", e);
418                    continue;
419                }
420            }
421
422            for entry_offset in &self.entry_offsets {
423                self.source_timestamp_entry_offset_pairs
424                    .push((*ts, *entry_offset));
425            }
426        }
427        // Sort the [source_timestamp, entry-offset] pairs
428        self.source_timestamp_entry_offset_pairs.sort_unstable();
429
430        // Map each entry offset to its position in the pair vector
431        for (idx, (_, entry_offset)) in self.source_timestamp_entry_offset_pairs.iter().enumerate()
432        {
433            self.entry_offset_index.insert(*entry_offset, idx as _);
434        }
435
436        Ok(())
437    }
438
439    /// Build a time-based histogram for the journal file.
440    ///
441    /// This creates a histogram that maps time ranges to entry counts, enabling
442    /// efficient time-range queries. The histogram uses the source timestamp field
443    /// if available, falling back to the journal file's realtime timestamp for
444    /// entries where the source field is missing.
445    ///
446    /// The method:
447    /// 1. Collects timestamps from the source field (if specified)
448    /// 2. Loads the global entry offset array
449    /// 3. Fills in missing timestamps using realtime values
450    /// 4. Sorts all (timestamp, entry_offset) pairs by time
451    /// 5. Constructs the histogram with the specified bucket duration
452    fn build_histogram(
453        &mut self,
454        journal_file: &JournalFile<Mmap>,
455        source_timestamp_field_name: Option<&FieldName>,
456        bucket_duration: Seconds,
457        tail_object_offset: NonZeroU64,
458    ) -> Result<Histogram> {
459        // Collect information from the source timestamp field
460        if let Some(source_field_name) = source_timestamp_field_name {
461            self.collect_source_field_info(journal_file, source_field_name.as_bytes())?;
462        }
463
464        // At this point:
465        //
466        // - `self.source_timestamp_entry_offset_pairs`: contains a vector of
467        //   (timestamp, entry-offset) pairs sorted by time,
468        // - `self.entry_offset_index`: maps an entry offset to a number
469        //   with the following invariant:
470        //      if (e1.offset < e2.offset) then e1.number < e2.number.
471
472        // Load the global entry offset array from the file
473        self.entry_offsets.clear();
474        journal_file.entry_offsets(&mut self.entry_offsets)?;
475
476        // Iterate through entry offsets and find entries for which we could
477        // not collect a timestamp. In this case, fall-back to using the journal
478        // file's realtime timestamp. Filter out offsets beyond our maximum.
479        self.realtime_entry_offset_pairs.clear();
480        for entry_offset in self
481            .entry_offsets
482            .iter()
483            .copied()
484            .filter(|offset| *offset <= tail_object_offset)
485        {
486            if self.entry_offset_index.contains_key(&entry_offset) {
487                // We have the timestamp of this entry offset
488                continue;
489            }
490
491            // We don't know the timestamp of this entry offset, use
492            // the journal's file realtime timestamp.
493
494            let timestamp = {
495                let entry = journal_file.entry_ref(entry_offset)?;
496                entry.header.realtime
497            };
498
499            // Add the new (timestamp, entry-offset) pair
500            self.realtime_entry_offset_pairs
501                .push((Microseconds(timestamp), entry_offset));
502        }
503
504        // At this point:
505        //
506        // - `self.realtime_entry_offset_pairs`: contains (timestamp, entry-offset)
507        // pairs of all the entries for which we had to use the journal file's
508        // realtime timestamp.
509
510        // Reconstruct our indexes if we have entries whose time does not
511        // come from the source timestamp
512        if !self.realtime_entry_offset_pairs.is_empty() {
513            // Extend the vector holding pairs collected from the source timestamp
514            // with the pairs collected from the realtime timestamp and
515            // sort it by time again.
516            self.source_timestamp_entry_offset_pairs
517                .append(&mut self.realtime_entry_offset_pairs);
518            self.source_timestamp_entry_offset_pairs.sort_unstable();
519
520            // We need to rebuild the `self.entry_offset_index` because
521            // we found entry offsets from the global entry offset array
522            // whose timestamp is assume to be equal to the realtime timestamp
523            // of the journal file
524            self.entry_offset_index.clear();
525            for (idx, (_, entry_offset)) in
526                self.source_timestamp_entry_offset_pairs.iter().enumerate()
527            {
528                self.entry_offset_index.insert(*entry_offset, idx as _);
529            }
530        }
531
532        // At this point, we have information about the order and the time
533        // of all entries in the journal file:
534        //
535        // - `self.source_timestamp_entry_offset_pairs`: contains a vector of
536        //   (timestamp, entry-offset) pairs sorted by time,
537        // - `self.entry_offset_index`: maps an entry offset to a number
538        //   with the following invariant:
539        //      if (e1.offset < e2.offset) then e1.number < e2.number.
540        //
541        // We can proceed with building the histogram
542
543        // Now we can build the file histogram
544        Histogram::from_timestamp_offset_pairs(
545            bucket_duration,
546            self.source_timestamp_entry_offset_pairs.as_slice(),
547        )
548    }
549}
550
/// Per-field counters gathered while indexing the values of one field.
#[derive(Default)]
struct FieldIndexState {
    /// Number of values for which a bitmap was inserted.
    unique_values_count: usize,
    /// Number of values skipped for being too large or compressed.
    ignored_large_payloads: usize,
    /// Set when iteration stopped early at the cardinality limit.
    was_truncated: bool,
}
557
558#[derive(Default)]
559struct FieldIndexIssues<'a> {
560    truncated_fields: Vec<&'a FieldName>,
561    fields_with_large_payloads: Vec<&'a FieldName>,
562}
563
564impl<'a> FieldIndexIssues<'a> {
565    fn record(&mut self, field_name: &'a FieldName, state: FieldIndexState) {
566        if state.was_truncated {
567            self.truncated_fields.push(field_name);
568        }
569        if state.ignored_large_payloads > 0 {
570            self.fields_with_large_payloads.push(field_name);
571        }
572    }
573
574    fn log(&self, journal_file: &JournalFile<Mmap>, limits: IndexingLimits, was_online: bool) {
575        self.log_truncated_fields(journal_file, limits, was_online);
576        self.log_large_payload_fields(journal_file, was_online);
577    }
578
579    fn log_truncated_fields(
580        &self,
581        journal_file: &JournalFile<Mmap>,
582        limits: IndexingLimits,
583        was_online: bool,
584    ) {
585        if self.truncated_fields.is_empty() {
586            return;
587        }
588
589        let field_names: Vec<&str> = self.truncated_fields.iter().map(|f| f.as_str()).collect();
590        let msg = format!(
591            "File '{}': {} field(s) truncated due to cardinality limit ({}): {:?}",
592            journal_file.file().path(),
593            self.truncated_fields.len(),
594            limits.max_unique_values_per_field,
595            field_names
596        );
597        if was_online {
598            trace!("{msg}");
599        } else {
600            warn!("{msg}");
601        }
602    }
603
604    fn log_large_payload_fields(&self, journal_file: &JournalFile<Mmap>, was_online: bool) {
605        if self.fields_with_large_payloads.is_empty() {
606            return;
607        }
608
609        let field_names: Vec<&str> = self
610            .fields_with_large_payloads
611            .iter()
612            .map(|f| f.as_str())
613            .collect();
614        let msg = format!(
615            "File '{}': {} field(s) had values skipped due to large payloads: {:?}",
616            journal_file.file().path(),
617            self.fields_with_large_payloads.len(),
618            field_names
619        );
620        if was_online {
621            trace!("{msg}");
622        } else {
623            tracing::info!("{msg}");
624        }
625    }
626}
627
628fn insert_field_bitmap(
629    entries_index: &mut HashMap<FieldValuePair, Bitmap>,
630    field_name: &FieldName,
631    value: &str,
632    entry_indices: &[u32],
633) {
634    let mut bitmap =
635        Bitmap::from_sorted_iter(entry_indices.iter().copied()).expect("sorted entry indices");
636    bitmap.optimize();
637
638    let field_name = FieldName::new_unchecked(field_name);
639    let key = FieldValuePair::new_unchecked(field_name, value.to_string());
640    entries_index.insert(key, bitmap);
641}