journal_index/file_indexer.rs
1//! Journal file indexing functionality.
2//!
3//! This module provides the [`FileIndexer`] type which creates searchable
4//! indexes from journal files. The indexing process extracts:
5//!
6//! - Time-based histograms for efficient range queries
7//! - Bitmap indexes for fast field=value lookups
8//! - Metadata about available and indexed fields
9
10use crate::{
11 Bitmap, FieldName, FieldValuePair, FileIndex, Histogram, IndexError, Microseconds, Result,
12 Seconds,
13};
14use journal_core::collections::{HashMap, HashSet};
15use journal_core::file::{HashableObject, JournalFile, Mmap, offset_array::InlinedCursor};
16use journal_registry::File;
17use std::num::NonZeroU64;
18use tracing::{error, trace, warn};
19
/// Default cap on how many distinct values of a single field are indexed
/// (used by `IndexingLimits::default`).
pub const DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD: usize = 500;

/// Default cap, in bytes, on the payload of a field value for it to be
/// indexed (used by `IndexingLimits::default`).
pub const DEFAULT_MAX_FIELD_PAYLOAD_SIZE: usize = 100;
25
/// Bounds applied while building bitmap indexes.
///
/// They keep memory usage in check when a journal file contains
/// high-cardinality fields or unusually large values.
#[derive(Debug, Clone, Copy)]
pub struct IndexingLimits {
    /// Upper bound on the number of distinct values indexed for one field.
    ///
    /// Once a field reaches this many indexed values, the rest of its values
    /// are not indexed and the field is reported as truncated. This guards
    /// against fields such as MESSAGE, whose millions of distinct values
    /// would otherwise exhaust memory.
    pub max_unique_values_per_field: usize,

    /// Upper bound, in bytes, on a field value's payload.
    ///
    /// Values whose payload is larger than this limit, as well as compressed
    /// values, are not indexed, so bulky binary or encoded content cannot
    /// dominate memory.
    pub max_field_payload_size: usize,
}
46
47impl Default for IndexingLimits {
48 fn default() -> Self {
49 Self {
50 max_unique_values_per_field: DEFAULT_MAX_UNIQUE_VALUES_PER_FIELD,
51 max_field_payload_size: DEFAULT_MAX_FIELD_PAYLOAD_SIZE,
52 }
53 }
54}
55
56/// Reusable indexer for creating searchable indexes from journal files.
57///
58/// # Indexing Process
59///
60/// The indexer performs three main tasks:
61///
62/// 1. **Histogram Construction**: Creates time-based buckets for efficient
63/// range queries. Entries are ordered by their source timestamp (if
64/// available) or realtime timestamp.
65///
66/// 2. **Bitmap Index Creation**: For each specified field, creates bitmap
67/// indexes mapping field=value pairs to entry indices, enabling fast
68/// filtered queries.
69///
70/// 3. **Metadata Collection**: Tracks which fields are available in the file
71/// and which were indexed.
72///
73/// # Concurrent Write Handling
74///
75/// The indexer captures the journal file's `tail_object_offset` at the start of indexing
76/// to create a consistent snapshot. Any entries written to the file after indexing begins
77/// are ignored, preventing race conditions with concurrent writers.
78#[derive(Debug)]
79#[cfg_attr(feature = "allocative", derive(allocative::Allocative))]
80pub struct FileIndexer {
81 /// Configuration limits for the indexing process.
82 limits: IndexingLimits,
83
84 // Associates a source timestamp value with its inlined cursor
85 source_timestamp_cursor_pairs: Vec<(Microseconds, InlinedCursor)>,
86
87 // Scratch buffer to collect entry offsets from the inlined cursor of a
88 // source timestamp value, or the global entry offset array
89 entry_offsets: Vec<NonZeroU64>,
90
91 // Associates a source timestamp value with its entry offset
92 source_timestamp_entry_offset_pairs: Vec<(Microseconds, NonZeroU64)>,
93
94 // Associates a journal file's entry realtime value with its offset
95 realtime_entry_offset_pairs: Vec<(Microseconds, NonZeroU64)>,
96
97 // Scratch buffer to collect the indices of entries in which a data
98 // object appears
99 entry_indices: Vec<u32>,
100
101 // Maps entry offsets to an index of an implicitly defined time-ordered
102 // array of entries
103 entry_offset_index: HashMap<NonZeroU64, u64>,
104}
105
106impl Default for FileIndexer {
107 fn default() -> Self {
108 Self::new(IndexingLimits::default())
109 }
110}
111
112impl FileIndexer {
113 /// Create a new indexer with the specified configuration limits.
114 pub fn new(limits: IndexingLimits) -> Self {
115 Self {
116 limits,
117 source_timestamp_cursor_pairs: Vec::new(),
118 entry_offsets: Vec::new(),
119 source_timestamp_entry_offset_pairs: Vec::new(),
120 realtime_entry_offset_pairs: Vec::new(),
121 entry_indices: Vec::new(),
122 entry_offset_index: HashMap::default(),
123 }
124 }
125}
126
127impl FileIndexer {
128 /// Create a searchable index from a journal file.
129 ///
130 /// # Arguments
131 /// * `file` - The journal file to index
132 /// * `source_timestamp_field` - Optional field to use for timestamps
133 /// * `field_names` - Fields to create bitmap indexes for
134 /// * `bucket_duration` - Duration of histogram buckets
135 pub fn index(
136 &mut self,
137 file: &File,
138 source_timestamp_field: Option<&FieldName>,
139 field_names: &[FieldName],
140 bucket_duration: Seconds,
141 ) -> Result<FileIndex> {
142 self.source_timestamp_cursor_pairs = Vec::new();
143 self.source_timestamp_entry_offset_pairs = Vec::new();
144 self.realtime_entry_offset_pairs = Vec::new();
145 self.entry_indices = Vec::new();
146 self.entry_offsets = Vec::new();
147 self.entry_offset_index = HashMap::default();
148
149 let window_size = 32 * 1024 * 1024;
150 let journal_file = JournalFile::<Mmap>::open(file, window_size)?;
151
152 // NOTE: Capture the maximum valid entry offset at the start of
153 // indexing.
154 //
155 // This prevents race conditions when the journal file is being
156 // actively written to. The `tail_object_offset` from the header tells
157 // us the offset of the last object in the file at this moment. Any
158 // entry offset beyond this was added after we started indexing and
159 // should be ignored.
160 let Some(tail_object_offset) = journal_file.journal_header_ref().tail_object_offset else {
161 return Err(IndexError::MissingOffset);
162 };
163
164 // Capture indexing timestamp
165 let indexed_at = Seconds::now();
166
167 // Capture whether the file was online when indexed.
168 //
169 // A file is considered online if:
170 // 1. The journal header state is 1 (STATE_ONLINE), OR
171 // 2. The file is an "Active" file by filename (e.g., system.journal
172 // without the @seqnum_id-head_seqnum-head_realtime suffix)
173 //
174 // We check both conditions because systemd-journal may temporarily set
175 // `state != 1` on active journal files (e.g., during flush operations).
176 // If we only checked the header state, we might incorrectly mark an
177 // active file as offline/archived, causing its cache entry to be
178 // considered "always fresh" and never re-indexed. This would result
179 // in the file being excluded from queries for current time ranges
180 // because its bounded time range (from when it was indexed) doesn't
181 // overlap with the query range.
182 //
183 let was_online = journal_file.journal_header_ref().state == 1 || file.is_active();
184
185 // Build the file histogram
186 let histogram = self.build_histogram(
187 &journal_file,
188 source_timestamp_field,
189 bucket_duration,
190 tail_object_offset,
191 )?;
192
193 // Use the (timestamp, entry-offset) pairs to construct a vector that
194 // will contain entry offsets sorted by time
195 let entry_offsets = self
196 .source_timestamp_entry_offset_pairs
197 .iter()
198 .map(|(_, entry_offset)| entry_offset.get() as u32)
199 .collect();
200
201 // Create the bitmaps for field=value pairs
202 let entries =
203 self.build_entries_index(&journal_file, field_names, tail_object_offset, was_online)?;
204
205 // Convert field_names to HashSet<FieldName> for indexed_fields
206 let indexed_fields: HashSet<FieldName> = field_names.iter().cloned().collect();
207
208 let mut file_fields = HashSet::default();
209 for field in journal_file.fields() {
210 let field = field?;
211 let field_name = String::from_utf8_lossy(field.raw_payload()).into_owned();
212 file_fields.insert(FieldName::new_unchecked(field_name));
213 }
214
215 Ok(FileIndex::new(
216 file.clone(),
217 indexed_at,
218 was_online,
219 histogram,
220 entry_offsets,
221 file_fields,
222 indexed_fields,
223 entries,
224 ))
225 }
226
227 /// Build bitmap indexes for field=value pairs.
228 ///
229 /// For each field in `field_names`, this iterates through all data objects
230 /// for that field and creates a bitmap mapping each unique field=value pair
231 /// to the entry indices where it appears.
232 ///
233 /// Only entries with offsets <= `tail_object_offset` are included in the
234 /// bitmaps, ensuring a consistent snapshot.
235 ///
236 /// Fields with more than `self.limits.max_unique_values_per_field` unique values
237 /// will have their indexing truncated to prevent unbounded memory growth.
238 fn build_entries_index(
239 &mut self,
240 journal_file: &JournalFile<Mmap>,
241 field_names: &[FieldName],
242 tail_object_offset: NonZeroU64,
243 was_online: bool,
244 ) -> Result<HashMap<FieldValuePair, Bitmap>> {
245 let mut entries_index = HashMap::default();
246 let mut issues = FieldIndexIssues::default();
247
248 for field_name in field_names {
249 self.index_field_values(
250 journal_file,
251 field_name,
252 tail_object_offset,
253 &mut entries_index,
254 &mut issues,
255 )?;
256 }
257
258 issues.log(journal_file, self.limits, was_online);
259 Ok(entries_index)
260 }
261
262 fn index_field_values<'a>(
263 &mut self,
264 journal_file: &JournalFile<Mmap>,
265 field_name: &'a FieldName,
266 tail_object_offset: NonZeroU64,
267 entries_index: &mut HashMap<FieldValuePair, Bitmap>,
268 issues: &mut FieldIndexIssues<'a>,
269 ) -> Result<()> {
270 let field_data_iterator = match journal_file.field_data_objects(field_name.as_bytes()) {
271 Ok(field_data_iterator) => field_data_iterator,
272 Err(e) => {
273 warn!(
274 "failed to iterate field data objects for field '{}' in file {}: {:#?}",
275 field_name,
276 journal_file.file().path(),
277 e
278 );
279 return Ok(());
280 }
281 };
282 let mut state = FieldIndexState::default();
283 for data_object in field_data_iterator {
284 if state.unique_values_count >= self.limits.max_unique_values_per_field {
285 state.was_truncated = true;
286 break;
287 }
288
289 let (data_payload, inlined_cursor) = {
290 let Ok(data_object) = data_object else {
291 continue;
292 };
293 if data_object.raw_payload().len() >= self.limits.max_field_payload_size
294 || data_object.is_compressed()
295 {
296 state.ignored_large_payloads += 1;
297 continue;
298 }
299
300 let data_payload = String::from_utf8_lossy(data_object.raw_payload()).into_owned();
301 let Some(inlined_cursor) = data_object.inlined_cursor() else {
302 continue;
303 };
304 (data_payload, inlined_cursor)
305 };
306
307 let Some(pair) = FieldValuePair::parse(&data_payload) else {
308 warn!("Invalid field=value format: {}", data_payload);
309 continue;
310 };
311 if self.collect_data_entry_indices(journal_file, inlined_cursor, tail_object_offset)? {
312 insert_field_bitmap(entries_index, field_name, pair.value(), &self.entry_indices);
313 state.unique_values_count += 1;
314 }
315 }
316
317 issues.record(field_name, state);
318 Ok(())
319 }
320
321 fn collect_data_entry_indices(
322 &mut self,
323 journal_file: &JournalFile<Mmap>,
324 inlined_cursor: InlinedCursor,
325 tail_object_offset: NonZeroU64,
326 ) -> Result<bool> {
327 self.entry_offsets.clear();
328 if let Err(err) = inlined_cursor.collect_offsets(journal_file, &mut self.entry_offsets) {
329 warn!("failed to collect entry offsets from DATA object index: {err:?}");
330 return Ok(false);
331 }
332
333 self.entry_indices.clear();
334 for entry_offset in self
335 .entry_offsets
336 .iter()
337 .copied()
338 .filter(|offset| *offset <= tail_object_offset)
339 {
340 let Some(entry_index) = self.entry_offset_index.get(&entry_offset) else {
341 panic!(
342 "missing entry offset {} from index (total offsets: {})",
343 entry_offset,
344 self.entry_offset_index.len()
345 );
346 };
347 self.entry_indices.push(*entry_index as u32);
348 }
349
350 if self.entry_indices.is_empty() {
351 return Ok(false);
352 }
353 self.entry_indices.sort_unstable();
354 Ok(true)
355 }
356
357 /// Collect timestamp information from a source timestamp field.
358 ///
359 /// This extracts (timestamp, entry_offset) pairs from the specified source
360 /// field (typically `_SOURCE_REALTIME_TIMESTAMP`). The pairs are sorted by
361 /// timestamp and used to build `entry_offset_index`, which maps each entry
362 /// offset to its position in the time-ordered sequence.
363 ///
364 /// If the source field is missing for some entries, those entries will be
365 /// handled later using the journal file's realtime timestamp.
366 fn collect_source_field_info(
367 &mut self,
368 journal_file: &JournalFile<Mmap>,
369 source_field_name: &[u8],
370 ) -> Result<()> {
371 // Create an iterator over all the different values the field can take
372 let field_data_iterator = journal_file.field_data_objects(source_field_name)?;
373
374 // Collect all the inlined cursors of the source timestamp field
375 self.source_timestamp_cursor_pairs.clear();
376 for data_object_result in field_data_iterator {
377 let Ok(data_object) = data_object_result else {
378 warn!("loading data object failed");
379 continue;
380 };
381
382 let Ok(source_timestamp) =
383 crate::field_types::parse_timestamp(source_field_name, &data_object)
384 else {
385 warn!("parsing source timestamp failed");
386 continue;
387 };
388
389 let Some(ic) = data_object.inlined_cursor() else {
390 use journal_core::file::JournalState;
391
392 let file_state = JournalState::try_from(journal_file.journal_header_ref().state)
393 .map(|s| s.to_string())
394 .unwrap_or_else(|_| "UNKNOWN".to_string());
395
396 warn!(
397 "orphaned data object (no entries) for _SOURCE_REALTIME_TIMESTAMP={} in {} (state: {})",
398 source_timestamp,
399 journal_file.file().path(),
400 file_state
401 );
402 continue;
403 };
404
405 self.source_timestamp_cursor_pairs
406 .push((Microseconds(source_timestamp), ic));
407 }
408
409 // Collect all the [source_timestamp, entry-offset] pairs
410 self.source_timestamp_entry_offset_pairs.clear();
411 for (ts, ic) in self.source_timestamp_cursor_pairs.iter() {
412 self.entry_offsets.clear();
413
414 match ic.collect_offsets(journal_file, &mut self.entry_offsets) {
415 Ok(_) => {}
416 Err(e) => {
417 error!("failed to collect offsets from source timestamp: {}", e);
418 continue;
419 }
420 }
421
422 for entry_offset in &self.entry_offsets {
423 self.source_timestamp_entry_offset_pairs
424 .push((*ts, *entry_offset));
425 }
426 }
427 // Sort the [source_timestamp, entry-offset] pairs
428 self.source_timestamp_entry_offset_pairs.sort_unstable();
429
430 // Map each entry offset to its position in the pair vector
431 for (idx, (_, entry_offset)) in self.source_timestamp_entry_offset_pairs.iter().enumerate()
432 {
433 self.entry_offset_index.insert(*entry_offset, idx as _);
434 }
435
436 Ok(())
437 }
438
439 /// Build a time-based histogram for the journal file.
440 ///
441 /// This creates a histogram that maps time ranges to entry counts, enabling
442 /// efficient time-range queries. The histogram uses the source timestamp field
443 /// if available, falling back to the journal file's realtime timestamp for
444 /// entries where the source field is missing.
445 ///
446 /// The method:
447 /// 1. Collects timestamps from the source field (if specified)
448 /// 2. Loads the global entry offset array
449 /// 3. Fills in missing timestamps using realtime values
450 /// 4. Sorts all (timestamp, entry_offset) pairs by time
451 /// 5. Constructs the histogram with the specified bucket duration
452 fn build_histogram(
453 &mut self,
454 journal_file: &JournalFile<Mmap>,
455 source_timestamp_field_name: Option<&FieldName>,
456 bucket_duration: Seconds,
457 tail_object_offset: NonZeroU64,
458 ) -> Result<Histogram> {
459 // Collect information from the source timestamp field
460 if let Some(source_field_name) = source_timestamp_field_name {
461 self.collect_source_field_info(journal_file, source_field_name.as_bytes())?;
462 }
463
464 // At this point:
465 //
466 // - `self.source_timestamp_entry_offset_pairs`: contains a vector of
467 // (timestamp, entry-offset) pairs sorted by time,
468 // - `self.entry_offset_index`: maps an entry offset to a number
469 // with the following invariant:
470 // if (e1.offset < e2.offset) then e1.number < e2.number.
471
472 // Load the global entry offset array from the file
473 self.entry_offsets.clear();
474 journal_file.entry_offsets(&mut self.entry_offsets)?;
475
476 // Iterate through entry offsets and find entries for which we could
477 // not collect a timestamp. In this case, fall-back to using the journal
478 // file's realtime timestamp. Filter out offsets beyond our maximum.
479 self.realtime_entry_offset_pairs.clear();
480 for entry_offset in self
481 .entry_offsets
482 .iter()
483 .copied()
484 .filter(|offset| *offset <= tail_object_offset)
485 {
486 if self.entry_offset_index.contains_key(&entry_offset) {
487 // We have the timestamp of this entry offset
488 continue;
489 }
490
491 // We don't know the timestamp of this entry offset, use
492 // the journal's file realtime timestamp.
493
494 let timestamp = {
495 let entry = journal_file.entry_ref(entry_offset)?;
496 entry.header.realtime
497 };
498
499 // Add the new (timestamp, entry-offset) pair
500 self.realtime_entry_offset_pairs
501 .push((Microseconds(timestamp), entry_offset));
502 }
503
504 // At this point:
505 //
506 // - `self.realtime_entry_offset_pairs`: contains (timestamp, entry-offset)
507 // pairs of all the entries for which we had to use the journal file's
508 // realtime timestamp.
509
510 // Reconstruct our indexes if we have entries whose time does not
511 // come from the source timestamp
512 if !self.realtime_entry_offset_pairs.is_empty() {
513 // Extend the vector holding pairs collected from the source timestamp
514 // with the pairs collected from the realtime timestamp and
515 // sort it by time again.
516 self.source_timestamp_entry_offset_pairs
517 .append(&mut self.realtime_entry_offset_pairs);
518 self.source_timestamp_entry_offset_pairs.sort_unstable();
519
520 // We need to rebuild the `self.entry_offset_index` because
521 // we found entry offsets from the global entry offset array
522 // whose timestamp is assume to be equal to the realtime timestamp
523 // of the journal file
524 self.entry_offset_index.clear();
525 for (idx, (_, entry_offset)) in
526 self.source_timestamp_entry_offset_pairs.iter().enumerate()
527 {
528 self.entry_offset_index.insert(*entry_offset, idx as _);
529 }
530 }
531
532 // At this point, we have information about the order and the time
533 // of all entries in the journal file:
534 //
535 // - `self.source_timestamp_entry_offset_pairs`: contains a vector of
536 // (timestamp, entry-offset) pairs sorted by time,
537 // - `self.entry_offset_index`: maps an entry offset to a number
538 // with the following invariant:
539 // if (e1.offset < e2.offset) then e1.number < e2.number.
540 //
541 // We can proceed with building the histogram
542
543 // Now we can build the file histogram
544 Histogram::from_timestamp_offset_pairs(
545 bucket_duration,
546 self.source_timestamp_entry_offset_pairs.as_slice(),
547 )
548 }
549}
550
/// Outcome of indexing the values of a single field.
#[derive(Default)]
struct FieldIndexState {
    /// Number of field=value bitmaps inserted for the field.
    unique_values_count: usize,
    /// Number of values skipped for being too large or compressed.
    ignored_large_payloads: usize,
    /// Set when iteration stopped because the unique-value cap was reached.
    was_truncated: bool,
}
557
558#[derive(Default)]
559struct FieldIndexIssues<'a> {
560 truncated_fields: Vec<&'a FieldName>,
561 fields_with_large_payloads: Vec<&'a FieldName>,
562}
563
564impl<'a> FieldIndexIssues<'a> {
565 fn record(&mut self, field_name: &'a FieldName, state: FieldIndexState) {
566 if state.was_truncated {
567 self.truncated_fields.push(field_name);
568 }
569 if state.ignored_large_payloads > 0 {
570 self.fields_with_large_payloads.push(field_name);
571 }
572 }
573
574 fn log(&self, journal_file: &JournalFile<Mmap>, limits: IndexingLimits, was_online: bool) {
575 self.log_truncated_fields(journal_file, limits, was_online);
576 self.log_large_payload_fields(journal_file, was_online);
577 }
578
579 fn log_truncated_fields(
580 &self,
581 journal_file: &JournalFile<Mmap>,
582 limits: IndexingLimits,
583 was_online: bool,
584 ) {
585 if self.truncated_fields.is_empty() {
586 return;
587 }
588
589 let field_names: Vec<&str> = self.truncated_fields.iter().map(|f| f.as_str()).collect();
590 let msg = format!(
591 "File '{}': {} field(s) truncated due to cardinality limit ({}): {:?}",
592 journal_file.file().path(),
593 self.truncated_fields.len(),
594 limits.max_unique_values_per_field,
595 field_names
596 );
597 if was_online {
598 trace!("{msg}");
599 } else {
600 warn!("{msg}");
601 }
602 }
603
604 fn log_large_payload_fields(&self, journal_file: &JournalFile<Mmap>, was_online: bool) {
605 if self.fields_with_large_payloads.is_empty() {
606 return;
607 }
608
609 let field_names: Vec<&str> = self
610 .fields_with_large_payloads
611 .iter()
612 .map(|f| f.as_str())
613 .collect();
614 let msg = format!(
615 "File '{}': {} field(s) had values skipped due to large payloads: {:?}",
616 journal_file.file().path(),
617 self.fields_with_large_payloads.len(),
618 field_names
619 );
620 if was_online {
621 trace!("{msg}");
622 } else {
623 tracing::info!("{msg}");
624 }
625 }
626}
627
628fn insert_field_bitmap(
629 entries_index: &mut HashMap<FieldValuePair, Bitmap>,
630 field_name: &FieldName,
631 value: &str,
632 entry_indices: &[u32],
633) {
634 let mut bitmap =
635 Bitmap::from_sorted_iter(entry_indices.iter().copied()).expect("sorted entry indices");
636 bitmap.optimize();
637
638 let field_name = FieldName::new_unchecked(field_name);
639 let key = FieldValuePair::new_unchecked(field_name, value.to_string());
640 entries_index.insert(key, bitmap);
641}