1use crate::error::Result;
8use journal_core::file::{CurrentRowView, JournalFile, Mmap};
9use journal_index::{
10 Anchor, Direction, FieldName, FieldValuePair, FileIndex, Filter, LogEntryId, LogQueryParams,
11 LogQueryParamsBuilder, Microseconds,
12};
13use journal_registry::File;
14use std::collections::{HashMap, HashSet};
15use std::num::NonZeroU64;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use tokio_util::sync::CancellationToken;
19use tracing::warn;
20
/// Resume bookkeeping carried between pages of a paginated log query.
///
/// `LogQuery::execute_page` takes a reference to the state returned by the previous page and
/// returns an updated copy.
#[derive(Debug, Clone, Default)]
pub struct PaginationState {
    /// Resume position per journal file.
    ///
    /// Each value is the `position` of an entry that was already returned from that file: the
    /// largest one seen for forward queries and the smallest one for backward queries.
    pub file_positions: HashMap<File, usize>,
}
34
/// Configurable log query over a set of journal file indexes.
///
/// Configure it with the `with_*` methods, then consume it with `execute` or `execute_page`.
pub struct LogQuery<'a> {
    /// File indexes the query runs over.
    file_indexes: &'a [FileIndex],
    /// Accumulates the query parameters; built when the query is executed.
    builder: LogQueryParamsBuilder,
    /// Optional token polled between files to stop the query early.
    cancellation: Option<CancellationToken>,
    /// Optional counter advanced as files are skipped or visited.
    progress: Option<Arc<AtomicUsize>>,
    /// Optional set of raw field names to keep in extracted entries; `None` keeps every field.
    output_fields: Option<HashSet<String>>,
}
61
62impl<'a> LogQuery<'a> {
63 pub fn new(file_indexes: &'a [FileIndex], anchor: Anchor, direction: Direction) -> Self {
78 Self {
79 file_indexes,
80 builder: LogQueryParamsBuilder::new(anchor, direction).with_source_timestamp_field(
81 Some(FieldName::new_unchecked("_SOURCE_REALTIME_TIMESTAMP")),
82 ),
83 cancellation: None,
84 progress: None,
85 output_fields: None,
86 }
87 }
88
89 pub fn with_limit(mut self, limit: usize) -> Self {
93 self.builder = self.builder.with_limit(limit);
94 self
95 }
96
97 pub fn with_source_timestamp_field(mut self, field: Option<FieldName>) -> Self {
102 self.builder = self.builder.with_source_timestamp_field(field);
103 self
104 }
105
106 pub fn with_filter(mut self, filter: Filter) -> Self {
110 self.builder = self.builder.with_filter(filter);
111 self
112 }
113
114 pub fn with_after_usec(mut self, after: u64) -> Self {
119 self.builder = self.builder.with_after(Microseconds(after));
120 self
121 }
122
123 pub fn with_before_usec(mut self, before: u64) -> Self {
128 self.builder = self.builder.with_before(Microseconds(before));
129 self
130 }
131
132 pub fn with_regex(mut self, pattern: impl Into<String>) -> Self {
140 self.builder = self.builder.with_regex(pattern);
141 self
142 }
143
144 pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
149 self.cancellation = Some(token);
150 self
151 }
152
153 pub fn with_progress(mut self, counter: Arc<AtomicUsize>) -> Self {
158 self.progress = Some(counter);
159 self
160 }
161
162 pub fn with_output_fields<I, S>(mut self, fields: I) -> Self
164 where
165 I: IntoIterator<Item = S>,
166 S: Into<String>,
167 {
168 self.output_fields = Some(fields.into_iter().map(Into::into).collect());
169 self
170 }
171
172 pub fn execute(self) -> Result<Vec<LogEntryData>> {
181 let params = self.builder.build()?;
182 let output_fields = self.output_fields;
183 let (log_entry_ids, _state) = retrieve_log_entries(
184 self.file_indexes.to_vec(),
185 params,
186 None,
187 self.cancellation.as_ref(),
188 self.progress.as_ref(),
189 );
190
191 extract_entry_data(&log_entry_ids, output_fields.as_ref())
192 }
193
194 pub fn execute_page(
212 self,
213 state: Option<&PaginationState>,
214 ) -> Result<(Vec<LogEntryData>, PaginationState)> {
215 let params = self.builder.build()?;
216 let output_fields = self.output_fields;
217 let (log_entry_ids, new_state) = retrieve_log_entries(
218 self.file_indexes.to_vec(),
219 params,
220 state,
221 self.cancellation.as_ref(),
222 self.progress.as_ref(),
223 );
224
225 let data = extract_entry_data(&log_entry_ids, output_fields.as_ref())?;
226 Ok((data, new_state))
227 }
228}
229
230fn retrieve_log_entries(
246 file_indexes: Vec<FileIndex>,
247 params: LogQueryParams,
248 state: Option<&PaginationState>,
249 cancellation: Option<&CancellationToken>,
250 progress: Option<&Arc<AtomicUsize>>,
251) -> (Vec<LogEntryId>, PaginationState) {
252 if params.limit() == Some(0) || file_indexes.is_empty() {
254 return (Vec::new(), PaginationState::default());
255 }
256
257 let anchor_usec = multi_file_anchor_usec(&file_indexes, params.anchor());
258 let mut relevant_indexes =
259 relevant_file_indexes(&file_indexes, params.direction(), anchor_usec);
260
261 if let Some(counter) = progress {
262 let filtered = file_indexes.len() - relevant_indexes.len();
263 counter.fetch_add(filtered, Ordering::Relaxed);
264 }
265
266 if relevant_indexes.is_empty() {
267 return (Vec::new(), PaginationState::default());
268 }
269
270 sort_relevant_indexes(&mut relevant_indexes, params.direction());
271
272 let (limit, mut collected_entries) = collection_limit_and_buffer(params.limit());
273 let mut new_state = state.cloned().unwrap_or_default();
274
275 for file_index in relevant_indexes {
276 if query_cancelled(cancellation, &new_state) {
277 break;
278 }
279
280 mark_file_processed(progress);
281
282 if should_prune_file(file_index, &collected_entries, limit, params.direction()) {
283 break;
284 }
285
286 if let Some(new_entries) = query_file_entries(file_index, ¶ms, state) {
287 collected_entries =
288 merge_log_entries(collected_entries, new_entries, limit, params.direction());
289 }
290 }
291
292 update_pagination_state(&mut new_state, &collected_entries, params.direction());
293
294 (collected_entries, new_state)
295}
296
297fn multi_file_anchor_usec(file_indexes: &[FileIndex], anchor: Anchor) -> u64 {
298 match anchor {
299 Anchor::Timestamp(ts) => ts.get(),
300 Anchor::Head => file_indexes
301 .iter()
302 .map(|fi| fi.start_time().to_microseconds().get())
303 .min()
304 .unwrap_or(0),
305 Anchor::Tail => file_indexes
306 .iter()
307 .map(|fi| fi.end_time().to_microseconds().get())
308 .max()
309 .unwrap_or(0),
310 }
311}
312
313fn relevant_file_indexes(
314 file_indexes: &[FileIndex],
315 direction: Direction,
316 anchor_usec: u64,
317) -> Vec<&FileIndex> {
318 file_indexes
319 .iter()
320 .filter(|fi| file_can_contain_anchor(fi, direction, anchor_usec))
321 .collect()
322}
323
324fn file_can_contain_anchor(file_index: &FileIndex, direction: Direction, anchor_usec: u64) -> bool {
325 match direction {
326 Direction::Forward => file_index.end_time().to_microseconds().get() >= anchor_usec,
327 Direction::Backward => file_index.start_time().to_microseconds().get() <= anchor_usec,
328 }
329}
330
331fn sort_relevant_indexes(file_indexes: &mut [&FileIndex], direction: Direction) {
332 match direction {
333 Direction::Forward => file_indexes.sort_by_key(|fi| fi.start_time()),
334 Direction::Backward => file_indexes.sort_by_key(|fi| std::cmp::Reverse(fi.end_time())),
335 }
336}
337
338fn collection_limit_and_buffer(limit: Option<usize>) -> (usize, Vec<LogEntryId>) {
339 match limit {
340 Some(limit) => (limit, Vec::with_capacity(limit)),
341 None => (usize::MAX, Vec::with_capacity(200)),
342 }
343}
344
345fn query_cancelled(cancellation: Option<&CancellationToken>, state: &PaginationState) -> bool {
346 let Some(token) = cancellation else {
347 return false;
348 };
349 if !token.is_cancelled() {
350 return false;
351 }
352 warn!(
353 "log query cancelled after processing {} files, returning partial results",
354 state.file_positions.len()
355 );
356 true
357}
358
/// Advances the progress counter by one file; does nothing when no counter is attached.
fn mark_file_processed(progress: Option<&Arc<AtomicUsize>>) {
    let Some(files_done) = progress else {
        return;
    };
    files_done.fetch_add(1, Ordering::Relaxed);
}
364
365fn should_prune_file(
366 file_index: &FileIndex,
367 collected_entries: &[LogEntryId],
368 limit: usize,
369 direction: Direction,
370) -> bool {
371 collected_entries.len() >= limit
372 && can_prune_file(file_index, collected_entries, direction).unwrap_or(false)
373}
374
375fn query_file_entries(
376 file_index: &FileIndex,
377 params: &LogQueryParams,
378 state: Option<&PaginationState>,
379) -> Option<Vec<LogEntryId>> {
380 let file = file_index.file();
381 let file_params = params_for_file(file_index, params, state);
382 match file_index.find_log_entries(file, &file_params) {
383 Ok(entries) if entries.is_empty() => None,
384 Ok(entries) => Some(entries),
385 Err(e) => {
386 warn!(file = file.path(), "failed to retrieve log entries: {e}");
387 None
388 }
389 }
390}
391
392fn params_for_file(
393 file_index: &FileIndex,
394 params: &LogQueryParams,
395 state: Option<&PaginationState>,
396) -> LogQueryParams {
397 let Some(pos) = state.and_then(|s| s.file_positions.get(file_index.file()).copied()) else {
398 return params.clone();
399 };
400
401 let mut builder = LogQueryParamsBuilder::new(params.anchor(), params.direction());
402 if let Some(limit) = params.limit() {
403 builder = builder.with_limit(limit);
404 }
405 if let Some(field) = params.source_timestamp_field() {
406 builder = builder.with_source_timestamp_field(Some(field.clone()));
407 }
408 if let Some(filter) = params.filter() {
409 builder = builder.with_filter(filter.clone());
410 }
411 if let Some(after) = params.after() {
412 builder = builder.with_after(after);
413 }
414 if let Some(before) = params.before() {
415 builder = builder.with_before(before);
416 }
417 if let Some(regex) = params.regex() {
418 builder = builder.with_regex(regex.as_str());
419 }
420
421 builder
422 .with_resume_position(pos)
423 .build()
424 .expect("resume params copied from validated query params")
425}
426
427fn update_pagination_state(
428 state: &mut PaginationState,
429 entries: &[LogEntryId],
430 direction: Direction,
431) {
432 for entry in entries {
433 state
434 .file_positions
435 .entry(entry.file.clone())
436 .and_modify(|pos| {
437 *pos = next_resume_position(*pos, entry.position, direction);
438 })
439 .or_insert(entry.position);
440 }
441}
442
443fn next_resume_position(current: usize, candidate: usize, direction: Direction) -> usize {
444 match direction {
445 Direction::Forward => current.max(candidate),
446 Direction::Backward => current.min(candidate),
447 }
448}
449
450fn can_prune_file(
455 file_index: &FileIndex,
456 result: &[LogEntryId],
457 direction: Direction,
458) -> Option<bool> {
459 match direction {
460 Direction::Forward => {
461 let max_timestamp = result.last()?.timestamp.get();
463 Some(file_index.start_time().to_microseconds().get() > max_timestamp)
464 }
465 Direction::Backward => {
466 let min_timestamp = result.first()?.timestamp.get();
468 Some(file_index.end_time().to_microseconds().get() < min_timestamp)
469 }
470 }
471}
472
473fn merge_log_entries(
490 a: Vec<LogEntryId>,
491 b: Vec<LogEntryId>,
492 limit: usize,
493 direction: Direction,
494) -> Vec<LogEntryId> {
495 if a.is_empty() {
497 return b.into_iter().take(limit).collect();
498 }
499 if b.is_empty() {
500 return a.into_iter().take(limit).collect();
501 }
502
503 let mut result = Vec::with_capacity(a.len().saturating_add(b.len()).min(limit));
506 let mut i = 0;
507 let mut j = 0;
508
509 while result.len() < limit {
511 let take_from_a = match (i < a.len(), j < b.len()) {
512 (true, false) => true,
513 (false, true) => false,
514 (false, false) => break,
515 (true, true) => match direction {
516 Direction::Forward => a[i].timestamp <= b[j].timestamp,
517 Direction::Backward => a[i].timestamp >= b[j].timestamp,
518 },
519 };
520
521 if take_from_a {
522 result.push(a[i].clone());
523 i += 1;
524 } else {
525 result.push(b[j].clone());
526 j += 1;
527 }
528 }
529
530 result
531}
532
/// Tells whether a field should be included in the output.
///
/// Without a projection every field is included; with one, the raw field name must be a member
/// of the set exactly as written.
fn is_projected(raw_field_name: &str, output_fields: Option<&HashSet<String>>) -> bool {
    match output_fields {
        None => true,
        Some(allowed) => allowed.contains(raw_field_name),
    }
}
536
/// Data extracted for a single log entry.
#[derive(Debug, Clone)]
pub struct LogEntryData {
    /// Raw timestamp of the entry, copied from the entry id's `timestamp`.
    /// NOTE(review): elsewhere in this file the same value is compared against file bounds
    /// converted to microseconds, so this is presumably microseconds — confirm.
    pub timestamp: u64,
    /// Field/value pairs read from the entry; restricted to the projected fields when the query
    /// requested a projection.
    pub fields: Vec<FieldValuePair>,
}
552
553fn extract_entry_data(
567 log_entries: &[LogEntryId],
568 output_fields: Option<&HashSet<String>>,
569) -> Result<Vec<LogEntryData>> {
570 let entries_by_file = entries_grouped_by_file(log_entries);
571 let mut result = vec![None; log_entries.len()];
572
573 for (file, file_entries) in entries_by_file {
574 let journal_file = JournalFile::<Mmap>::open(file, 8 * 1024 * 1024)?;
575 let mut row = CurrentRowView::default();
576
577 for (original_idx, entry) in file_entries {
578 let fields = read_entry_fields(&journal_file, entry, output_fields, &mut row)?;
579 result[original_idx] = Some(LogEntryData {
580 timestamp: entry.timestamp.get(),
581 fields,
582 });
583 }
584 }
585
586 Ok(result.into_iter().flatten().collect())
587}
588
589fn entries_grouped_by_file(
590 log_entries: &[LogEntryId],
591) -> HashMap<&File, Vec<(usize, &LogEntryId)>> {
592 let mut entries_by_file: HashMap<&File, Vec<(usize, &LogEntryId)>> = HashMap::new();
593 for (idx, entry) in log_entries.iter().enumerate() {
594 entries_by_file
595 .entry(&entry.file)
596 .or_default()
597 .push((idx, entry));
598 }
599 entries_by_file
600}
601
602fn read_entry_fields(
603 journal_file: &JournalFile<Mmap>,
604 entry: &LogEntryId,
605 output_fields: Option<&HashSet<String>>,
606 row: &mut CurrentRowView,
607) -> Result<Vec<FieldValuePair>> {
608 let entry_offset =
609 NonZeroU64::new(entry.offset).ok_or(journal_core::JournalError::InvalidOffset)?;
610 row.load_entry(journal_file, entry_offset)?;
611 let mut fields = Vec::with_capacity(row.data_offset_count());
612 row.restart_data()?;
613
614 let result = (|| {
615 while let Some((_, payload)) = row.read_next_payload_with_offset(journal_file)? {
616 let payload = row.payload_slice(payload);
617 if let Some(pair) = read_projected_pair(payload, output_fields) {
618 fields.push(pair);
619 }
620 }
621 Ok(fields)
622 })();
623 row.reset_data_state(journal_file)?;
624 result
625}
626
627fn read_projected_pair(
628 payload_bytes: &[u8],
629 output_fields: Option<&HashSet<String>>,
630) -> Option<FieldValuePair> {
631 if !payload_may_match_projection(payload_bytes, output_fields) {
632 return None;
633 }
634 if let Some(pair) = FieldValuePair::parse_bytes(payload_bytes) {
635 return is_projected(pair.field(), output_fields).then_some(pair);
636 }
637 let payload_str = String::from_utf8_lossy(payload_bytes);
638 let Some(pair) = FieldValuePair::parse(&payload_str) else {
639 return None;
640 };
641 is_projected(pair.field(), output_fields).then_some(pair)
642}
643
/// Cheap pre-check on a raw payload: can its field possibly be part of the projection?
///
/// The field name is taken as the bytes before the first `=`. The answer is `true` whenever the
/// check cannot decide: no projection is set, the payload has no `=`, or the name is not valid
/// UTF-8. Only a readable name that is missing from the projection yields `false`.
fn payload_may_match_projection(
    payload_bytes: &[u8],
    output_fields: Option<&HashSet<String>>,
) -> bool {
    match output_fields {
        None => true,
        Some(allowed) => {
            let separator = payload_bytes.iter().position(|&byte| byte == b'=');
            match separator.map(|eq| std::str::from_utf8(&payload_bytes[..eq])) {
                Some(Ok(name)) => allowed.contains(name),
                // Undecidable here; leave the decision to the full parser.
                Some(Err(_)) | None => true,
            }
        }
    }
}
659
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned projection set from string literals.
    fn projected_fields(fields: &[&str]) -> HashSet<String> {
        fields.iter().copied().map(String::from).collect()
    }

    /// A raw journal field name is accepted when it is listed verbatim.
    #[test]
    fn projection_accepts_raw_systemd_field_name() {
        let projection = projected_fields(&["_SYSTEMD_UNIT"]);
        let accepted = is_projected("_SYSTEMD_UNIT", Some(&projection));

        assert!(accepted);
    }

    /// A field that is not listed in the projection is rejected.
    #[test]
    fn projection_rejects_unmatched_field_names() {
        let projection = projected_fields(&["service.name"]);
        let accepted = is_projected("_SYSTEMD_UNIT", Some(&projection));

        assert!(!accepted);
    }

    /// Without a projection every field passes.
    #[test]
    fn projection_accepts_all_fields_without_projection_filter() {
        assert!(is_projected("_SYSTEMD_UNIT", None));
    }

    /// Unprojected payloads are dropped and projected ones are returned unchanged.
    #[test]
    fn projected_pair_prefilter_rejects_unmatched_utf8_field_without_parsing_value() {
        let projection = projected_fields(&["MESSAGE"]);

        let rejected = read_projected_pair(b"PRIORITY=3", Some(&projection));
        assert!(rejected.is_none());

        let kept = read_projected_pair(b"MESSAGE=hello", Some(&projection)).expect("projected pair");
        assert_eq!(kept.as_str(), "MESSAGE=hello");
    }

    /// A payload with an invalid UTF-8 value still yields a pair with the right field name.
    #[test]
    fn projected_pair_preserves_lossy_legacy_path_for_non_utf8_payloads() {
        let projection = projected_fields(&["FIELD"]);
        let pair = read_projected_pair(b"FIELD=\xff", Some(&projection)).expect("lossy pair");

        assert_eq!(pair.field(), "FIELD");
    }
}
707}