use crate::error::Result;
use journal_core::file::{CurrentRowView, JournalFile, Mmap};
use journal_index::{
Anchor, Direction, FieldName, FieldValuePair, FileIndex, Filter, LogEntryId, LogQueryParams,
LogQueryParamsBuilder, Microseconds,
};
use journal_registry::File;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_util::sync::CancellationToken;
use tracing::warn;
/// Cursor for continuing a paged [`LogQuery`].
///
/// Returned by [`LogQuery::execute_page`] and passed back in to fetch the next page.
#[derive(Debug, Clone, Default)]
pub struct PaginationState {
    /// Per-file resume position: for every file that has contributed entries so far,
    /// the furthest entry position reached in query direction (largest position for
    /// forward queries, smallest for backward queries).
    pub file_positions: HashMap<File, usize>,
}
/// Builder-style query that reads log entries from a set of journal file indexes.
///
/// Configure it with the `with_*` methods, then run it with [`LogQuery::execute`] or
/// [`LogQuery::execute_page`].
pub struct LogQuery<'a> {
    /// Indexes of the journal files to search.
    file_indexes: &'a [FileIndex],
    /// Accumulated query parameters; validated by `build()` when the query runs.
    builder: LogQueryParamsBuilder,
    /// Optional token checked before each file; once cancelled, partial results are returned.
    cancellation: Option<CancellationToken>,
    /// Optional counter incremented as files are accounted for.
    progress: Option<Arc<AtomicUsize>>,
    /// Raw field names to keep in the output; `None` keeps every field.
    output_fields: Option<HashSet<String>>,
}
impl<'a> LogQuery<'a> {
pub fn new(file_indexes: &'a [FileIndex], anchor: Anchor, direction: Direction) -> Self {
Self {
file_indexes,
builder: LogQueryParamsBuilder::new(anchor, direction).with_source_timestamp_field(
Some(FieldName::new_unchecked("_SOURCE_REALTIME_TIMESTAMP")),
),
cancellation: None,
progress: None,
output_fields: None,
}
}
pub fn with_limit(mut self, limit: usize) -> Self {
self.builder = self.builder.with_limit(limit);
self
}
pub fn with_source_timestamp_field(mut self, field: Option<FieldName>) -> Self {
self.builder = self.builder.with_source_timestamp_field(field);
self
}
pub fn with_filter(mut self, filter: Filter) -> Self {
self.builder = self.builder.with_filter(filter);
self
}
pub fn with_after_usec(mut self, after: u64) -> Self {
self.builder = self.builder.with_after(Microseconds(after));
self
}
pub fn with_before_usec(mut self, before: u64) -> Self {
self.builder = self.builder.with_before(Microseconds(before));
self
}
pub fn with_regex(mut self, pattern: impl Into<String>) -> Self {
self.builder = self.builder.with_regex(pattern);
self
}
pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
self.cancellation = Some(token);
self
}
pub fn with_progress(mut self, counter: Arc<AtomicUsize>) -> Self {
self.progress = Some(counter);
self
}
pub fn with_output_fields<I, S>(mut self, fields: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.output_fields = Some(fields.into_iter().map(Into::into).collect());
self
}
pub fn execute(self) -> Result<Vec<LogEntryData>> {
let params = self.builder.build()?;
let output_fields = self.output_fields;
let (log_entry_ids, _state) = retrieve_log_entries(
self.file_indexes.to_vec(),
params,
None,
self.cancellation.as_ref(),
self.progress.as_ref(),
);
extract_entry_data(&log_entry_ids, output_fields.as_ref())
}
pub fn execute_page(
self,
state: Option<&PaginationState>,
) -> Result<(Vec<LogEntryData>, PaginationState)> {
let params = self.builder.build()?;
let output_fields = self.output_fields;
let (log_entry_ids, new_state) = retrieve_log_entries(
self.file_indexes.to_vec(),
params,
state,
self.cancellation.as_ref(),
self.progress.as_ref(),
);
let data = extract_entry_data(&log_entry_ids, output_fields.as_ref())?;
Ok((data, new_state))
}
}
fn retrieve_log_entries(
file_indexes: Vec<FileIndex>,
params: LogQueryParams,
state: Option<&PaginationState>,
cancellation: Option<&CancellationToken>,
progress: Option<&Arc<AtomicUsize>>,
) -> (Vec<LogEntryId>, PaginationState) {
if params.limit() == Some(0) || file_indexes.is_empty() {
return (Vec::new(), PaginationState::default());
}
let anchor_usec = multi_file_anchor_usec(&file_indexes, params.anchor());
let mut relevant_indexes =
relevant_file_indexes(&file_indexes, params.direction(), anchor_usec);
if let Some(counter) = progress {
let filtered = file_indexes.len() - relevant_indexes.len();
counter.fetch_add(filtered, Ordering::Relaxed);
}
if relevant_indexes.is_empty() {
return (Vec::new(), PaginationState::default());
}
sort_relevant_indexes(&mut relevant_indexes, params.direction());
let (limit, mut collected_entries) = collection_limit_and_buffer(params.limit());
let mut new_state = state.cloned().unwrap_or_default();
for file_index in relevant_indexes {
if query_cancelled(cancellation, &new_state) {
break;
}
mark_file_processed(progress);
if should_prune_file(file_index, &collected_entries, limit, params.direction()) {
break;
}
if let Some(new_entries) = query_file_entries(file_index, ¶ms, state) {
collected_entries =
merge_log_entries(collected_entries, new_entries, limit, params.direction());
}
}
update_pagination_state(&mut new_state, &collected_entries, params.direction());
(collected_entries, new_state)
}
/// Resolves `anchor` to a concrete timestamp in microseconds across all `file_indexes`.
///
/// An explicit timestamp is used as-is. `Head` is the earliest file start and `Tail` is the
/// latest file end; both fall back to `0` when there are no files.
fn multi_file_anchor_usec(file_indexes: &[FileIndex], anchor: Anchor) -> u64 {
    let start_usec = |index: &FileIndex| index.start_time().to_microseconds().get();
    let end_usec = |index: &FileIndex| index.end_time().to_microseconds().get();
    match anchor {
        Anchor::Timestamp(ts) => ts.get(),
        Anchor::Head => file_indexes.iter().map(start_usec).min().unwrap_or_default(),
        Anchor::Tail => file_indexes.iter().map(end_usec).max().unwrap_or_default(),
    }
}
/// Keeps only the indexes whose time range can reach `anchor_usec` in `direction`,
/// preserving their input order.
fn relevant_file_indexes(
    file_indexes: &[FileIndex],
    direction: Direction,
    anchor_usec: u64,
) -> Vec<&FileIndex> {
    let mut kept = Vec::new();
    for candidate in file_indexes {
        if file_can_contain_anchor(candidate, direction, anchor_usec) {
            kept.push(candidate);
        }
    }
    kept
}
/// Reports whether a file could hold entries on the requested side of the anchor.
///
/// A forward query needs the file to end at or after the anchor. A backward query needs
/// it to start at or before the anchor.
fn file_can_contain_anchor(file_index: &FileIndex, direction: Direction, anchor_usec: u64) -> bool {
    match direction {
        Direction::Forward => {
            let file_end_usec = file_index.end_time().to_microseconds().get();
            anchor_usec <= file_end_usec
        }
        Direction::Backward => {
            let file_start_usec = file_index.start_time().to_microseconds().get();
            anchor_usec >= file_start_usec
        }
    }
}
/// Orders files in the sequence the query visits them (stable sort).
///
/// Forward visits files by ascending start time. Backward visits them by descending end
/// time.
fn sort_relevant_indexes(file_indexes: &mut [&FileIndex], direction: Direction) {
    use std::cmp::Reverse;
    match direction {
        Direction::Forward => file_indexes.sort_by_key(|index| index.start_time()),
        Direction::Backward => file_indexes.sort_by_key(|index| Reverse(index.end_time())),
    }
}
/// Returns the effective entry limit and an empty buffer for collecting entry ids.
///
/// `None` (no limit) maps to `usize::MAX`. The buffer's up-front capacity is capped, so a
/// very large caller-supplied limit cannot trigger a huge allocation, or a
/// capacity-overflow panic, before any entry has been found. The vector still grows on
/// demand past the cap.
fn collection_limit_and_buffer(limit: Option<usize>) -> (usize, Vec<LogEntryId>) {
    // Up-front capacity when the query has no limit.
    const UNLIMITED_CAPACITY: usize = 200;
    // Upper bound on the up-front capacity for limited queries.
    const MAX_PREALLOCATED_ENTRIES: usize = 4096;
    match limit {
        Some(limit) => (limit, Vec::with_capacity(limit.min(MAX_PREALLOCATED_ENTRIES))),
        None => (usize::MAX, Vec::with_capacity(UNLIMITED_CAPACITY)),
    }
}
/// Returns `true`, after logging a warning, when a cancellation token is present and has
/// been cancelled.
///
/// NOTE(review): the logged count is `state.file_positions.len()`. `retrieve_log_entries`
/// only updates that state after its file loop, so the count reflects the incoming
/// pagination cursor rather than the files processed in this call. Confirm whether that
/// is intended.
fn query_cancelled(cancellation: Option<&CancellationToken>, state: &PaginationState) -> bool {
    let cancelled = matches!(cancellation, Some(token) if token.is_cancelled());
    if cancelled {
        warn!(
            "log query cancelled after processing {} files, returning partial results",
            state.file_positions.len()
        );
    }
    cancelled
}
/// Bumps the progress counter by one file, if a counter was supplied.
fn mark_file_processed(progress: Option<&Arc<AtomicUsize>>) {
    let Some(counter) = progress else {
        return;
    };
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Decides whether the scan can stop at `file_index`.
///
/// This is only possible once `limit` entries have been collected, and only when
/// `can_prune_file` positively confirms it. An undecidable (`None`) answer never prunes.
fn should_prune_file(
    file_index: &FileIndex,
    collected_entries: &[LogEntryId],
    limit: usize,
    direction: Direction,
) -> bool {
    if collected_entries.len() < limit {
        // Still room for more entries, so any file may contribute.
        return false;
    }
    can_prune_file(file_index, collected_entries, direction) == Some(true)
}
/// Queries a single file's index, resuming from its pagination position if one exists.
///
/// Returns `None` when the file yields no entries. Index errors are logged as warnings
/// and also yield `None`, so one bad file does not fail the whole query.
fn query_file_entries(
    file_index: &FileIndex,
    params: &LogQueryParams,
    state: Option<&PaginationState>,
) -> Option<Vec<LogEntryId>> {
    let file = file_index.file();
    let resumed_params = params_for_file(file_index, params, state);
    let entries = match file_index.find_log_entries(file, &resumed_params) {
        Ok(entries) => entries,
        Err(e) => {
            warn!(file = file.path(), "failed to retrieve log entries: {e}");
            return None;
        }
    };
    (!entries.is_empty()).then_some(entries)
}
/// Returns the query parameters to use for a single file.
///
/// When `state` holds a resume position for this file, the parameters are rebuilt setting
/// by setting from `params`, with that position attached. Otherwise `params` is cloned
/// unchanged.
///
/// NOTE(review): every optional setting is copied by hand. Any setting added to
/// `LogQueryParams` later must also be copied here, or it is silently dropped for resumed
/// files. A `None` source timestamp field is not forwarded explicitly either; this relies
/// on `LogQueryParamsBuilder::new` defaulting to `None` — confirm.
///
/// # Panics
///
/// Panics if rebuilding fails. That would mean the builder rejected settings copied from
/// parameters that were already validated.
fn params_for_file(
    file_index: &FileIndex,
    params: &LogQueryParams,
    state: Option<&PaginationState>,
) -> LogQueryParams {
    // No stored position for this file: run the query exactly as given.
    let Some(pos) = state.and_then(|s| s.file_positions.get(file_index.file()).copied()) else {
        return params.clone();
    };
    // Copy each setting that is present onto a fresh builder.
    let mut builder = LogQueryParamsBuilder::new(params.anchor(), params.direction());
    if let Some(limit) = params.limit() {
        builder = builder.with_limit(limit);
    }
    if let Some(field) = params.source_timestamp_field() {
        builder = builder.with_source_timestamp_field(Some(field.clone()));
    }
    if let Some(filter) = params.filter() {
        builder = builder.with_filter(filter.clone());
    }
    if let Some(after) = params.after() {
        builder = builder.with_after(after);
    }
    if let Some(before) = params.before() {
        builder = builder.with_before(before);
    }
    if let Some(regex) = params.regex() {
        builder = builder.with_regex(regex.as_str());
    }
    builder
        .with_resume_position(pos)
        .build()
        .expect("resume params copied from validated query params")
}
/// Advances each file's resume position past the entries returned in this page.
///
/// A file seen for the first time starts at the position of its first returned entry.
/// Existing positions only move further in query direction (see `next_resume_position`).
fn update_pagination_state(
    state: &mut PaginationState,
    entries: &[LogEntryId],
    direction: Direction,
) {
    for entry in entries {
        if let Some(position) = state.file_positions.get_mut(&entry.file) {
            *position = next_resume_position(*position, entry.position, direction);
        } else {
            state
                .file_positions
                .insert(entry.file.clone(), entry.position);
        }
    }
}
/// Picks the position that lies further along in query direction.
///
/// That is the larger position for forward queries and the smaller for backward queries.
fn next_resume_position(current: usize, candidate: usize, direction: Direction) -> usize {
    match direction {
        Direction::Forward if candidate > current => candidate,
        Direction::Backward if candidate < current => candidate,
        Direction::Forward | Direction::Backward => current,
    }
}
/// Returns whether `file_index` can be skipped because none of its entries could displace
/// an entry already in `result`.
///
/// `result` is ordered in query direction, as produced by `merge_log_entries`: ascending
/// for forward queries and descending for backward queries. Its last element is therefore
/// always the entry furthest from the anchor, and that entry is the one a new entry would
/// displace. The previous backward branch read `first()`, which is the newest entry. That
/// wrongly pruned files that could still contribute entries newer than the oldest one
/// collected.
///
/// Returns `None` when `result` is empty.
fn can_prune_file(
    file_index: &FileIndex,
    result: &[LogEntryId],
    direction: Direction,
) -> Option<bool> {
    let furthest_timestamp = result.last()?.timestamp.get();
    let prunable = match direction {
        // Every entry in the file starts after the newest collected entry.
        Direction::Forward => file_index.start_time().to_microseconds().get() > furthest_timestamp,
        // Every entry in the file ends before the oldest collected entry.
        Direction::Backward => file_index.end_time().to_microseconds().get() < furthest_timestamp,
    };
    Some(prunable)
}
/// Merges two entry lists, each already ordered in query direction, into one list in that
/// order, keeping at most `limit` entries.
///
/// Forward order is ascending by timestamp and backward order is descending. On equal
/// timestamps, entries from `a` come first. Both inputs are consumed, so entries are moved
/// into the result rather than cloned.
fn merge_log_entries(
    a: Vec<LogEntryId>,
    b: Vec<LogEntryId>,
    limit: usize,
    direction: Direction,
) -> Vec<LogEntryId> {
    if a.is_empty() {
        return b.into_iter().take(limit).collect();
    }
    if b.is_empty() {
        return a.into_iter().take(limit).collect();
    }
    let mut result = Vec::with_capacity(a.len().saturating_add(b.len()).min(limit));
    let mut a = a.into_iter().peekable();
    let mut b = b.into_iter().peekable();
    while result.len() < limit {
        let take_from_a = match (a.peek(), b.peek()) {
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
            (Some(left), Some(right)) => match direction {
                Direction::Forward => left.timestamp <= right.timestamp,
                Direction::Backward => left.timestamp >= right.timestamp,
            },
        };
        let next = if take_from_a { a.next() } else { b.next() };
        if let Some(entry) = next {
            result.push(entry);
        }
    }
    result
}
/// Reports whether `raw_field_name` should appear in the output.
///
/// With no projection set (`None`), every field is kept.
fn is_projected(raw_field_name: &str, output_fields: Option<&HashSet<String>>) -> bool {
    match output_fields {
        Some(projected) => projected.contains(raw_field_name),
        None => true,
    }
}
/// A log entry read back from its journal file.
#[derive(Debug, Clone)]
pub struct LogEntryData {
    /// The entry's timestamp, taken from its `LogEntryId`. Pruning compares it against file
    /// start/end times in microseconds.
    pub timestamp: u64,
    /// The entry's `FIELD=value` pairs in the order they were read, limited to the
    /// projected fields when output fields were requested.
    pub fields: Vec<FieldValuePair>,
}
/// Reads the fields of every entry in `log_entries`, preserving the input order.
///
/// Entries are grouped by file so each journal file is opened once. Any open or read
/// error aborts the whole extraction.
fn extract_entry_data(
    log_entries: &[LogEntryId],
    output_fields: Option<&HashSet<String>>,
) -> Result<Vec<LogEntryData>> {
    // One slot per input entry, so results land back in their original order.
    let mut slots: Vec<Option<LogEntryData>> = std::iter::repeat_with(|| None)
        .take(log_entries.len())
        .collect();
    for (file, indexed_entries) in entries_grouped_by_file(log_entries) {
        let window_size = 8 * 1024 * 1024;
        let journal_file = JournalFile::<Mmap>::open(file, window_size)?;
        // A single row view is reused across all entries of this file.
        let mut row = CurrentRowView::default();
        for (slot_index, entry) in indexed_entries {
            let fields = read_entry_fields(&journal_file, entry, output_fields, &mut row)?;
            slots[slot_index] = Some(LogEntryData {
                timestamp: entry.timestamp.get(),
                fields,
            });
        }
    }
    Ok(slots.into_iter().flatten().collect())
}
/// Groups entries by their journal file.
///
/// Each entry is tagged with its index in `log_entries` so callers can restore the
/// original order.
fn entries_grouped_by_file(
    log_entries: &[LogEntryId],
) -> HashMap<&File, Vec<(usize, &LogEntryId)>> {
    log_entries.iter().enumerate().fold(
        HashMap::<&File, Vec<(usize, &LogEntryId)>>::new(),
        |mut groups, (position, entry)| {
            groups.entry(&entry.file).or_default().push((position, entry));
            groups
        },
    )
}
/// Loads one entry into `row` and returns its projected field/value pairs.
///
/// The row's data cursor is always reset afterwards, even when reading a payload fails.
/// A failed reset takes precedence over the read result.
///
/// # Errors
///
/// Fails on a zero entry offset, or on any error from loading, iterating or resetting
/// the row.
fn read_entry_fields(
    journal_file: &JournalFile<Mmap>,
    entry: &LogEntryId,
    output_fields: Option<&HashSet<String>>,
    row: &mut CurrentRowView,
) -> Result<Vec<FieldValuePair>> {
    let Some(entry_offset) = NonZeroU64::new(entry.offset) else {
        return Err(journal_core::JournalError::InvalidOffset.into());
    };
    row.load_entry(journal_file, entry_offset)?;
    let capacity = row.data_offset_count();
    row.restart_data()?;
    let collected = collect_projected_fields(journal_file, row, output_fields, capacity);
    // Rewind unconditionally so the shared row is usable for the next entry.
    row.reset_data_state(journal_file)?;
    collected
}

/// Drains the loaded row's payloads, keeping the pairs that pass the projection.
fn collect_projected_fields(
    journal_file: &JournalFile<Mmap>,
    row: &mut CurrentRowView,
    output_fields: Option<&HashSet<String>>,
    capacity: usize,
) -> Result<Vec<FieldValuePair>> {
    let mut fields = Vec::with_capacity(capacity);
    while let Some((_, payload)) = row.read_next_payload_with_offset(journal_file)? {
        if let Some(pair) = read_projected_pair(row.payload_slice(payload), output_fields) {
            fields.push(pair);
        }
    }
    Ok(fields)
}
/// Parses a raw `FIELD=value` payload, returning it only if its field is projected.
///
/// A cheap byte-level prefilter runs first. Parsing then tries the raw bytes, and falls
/// back to a lossy UTF-8 conversion for payloads the byte parser rejects.
fn read_projected_pair(
    payload_bytes: &[u8],
    output_fields: Option<&HashSet<String>>,
) -> Option<FieldValuePair> {
    if !payload_may_match_projection(payload_bytes, output_fields) {
        return None;
    }
    let pair = match FieldValuePair::parse_bytes(payload_bytes) {
        Some(pair) => pair,
        None => FieldValuePair::parse(&String::from_utf8_lossy(payload_bytes))?,
    };
    is_projected(pair.field(), output_fields).then_some(pair)
}
/// Cheap prefilter: returns `false` only when the payload's field name is definitely not
/// projected.
///
/// Payloads without `=` or with a non-UTF-8 field name are passed through so the full
/// parser can decide. Without a projection, everything passes.
fn payload_may_match_projection(
    payload_bytes: &[u8],
    output_fields: Option<&HashSet<String>>,
) -> bool {
    match output_fields {
        None => true,
        Some(projected) => payload_bytes
            .iter()
            .position(|&byte| byte == b'=')
            .and_then(|eq| std::str::from_utf8(&payload_bytes[..eq]).ok())
            .map_or(true, |field| projected.contains(field)),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a projection set from string literals.
    fn projected_fields(fields: &[&str]) -> HashSet<String> {
        fields.iter().map(|field| (*field).to_string()).collect()
    }

    #[test]
    fn projection_accepts_raw_systemd_field_name() {
        let projected = projected_fields(&["_SYSTEMD_UNIT"]);
        assert!(is_projected("_SYSTEMD_UNIT", Some(&projected)));
    }

    #[test]
    fn projection_rejects_unmatched_field_names() {
        let projected = projected_fields(&["service.name"]);
        assert!(!is_projected("_SYSTEMD_UNIT", Some(&projected)));
    }

    #[test]
    fn projection_accepts_all_fields_without_projection_filter() {
        assert!(is_projected("_SYSTEMD_UNIT", None));
    }

    #[test]
    fn projected_pair_prefilter_rejects_unmatched_utf8_field_without_parsing_value() {
        let projected = projected_fields(&["MESSAGE"]);
        assert!(read_projected_pair(b"PRIORITY=3", Some(&projected)).is_none());
        assert_eq!(
            read_projected_pair(b"MESSAGE=hello", Some(&projected))
                .expect("projected pair")
                .as_str(),
            "MESSAGE=hello"
        );
    }

    #[test]
    fn projected_pair_preserves_lossy_legacy_path_for_non_utf8_payloads() {
        let projected = projected_fields(&["FIELD"]);
        let pair = read_projected_pair(b"FIELD=\xff", Some(&projected)).expect("lossy pair");
        assert_eq!(pair.field(), "FIELD");
    }

    #[test]
    fn projection_prefilter_defers_when_field_name_cannot_be_isolated() {
        let projected = projected_fields(&["MESSAGE"]);
        // No '=' separator: the prefilter cannot decide, so the payload passes through.
        assert!(payload_may_match_projection(b"MESSAGE", Some(&projected)));
        // A non-UTF-8 field name is also left to the full parser.
        assert!(payload_may_match_projection(b"\xffX=1", Some(&projected)));
        // Without a projection, everything passes.
        assert!(payload_may_match_projection(b"PRIORITY=3", None));
    }

    #[test]
    fn mark_file_processed_counts_only_with_counter() {
        let counter = Arc::new(AtomicUsize::new(0));
        mark_file_processed(Some(&counter));
        mark_file_processed(None);
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn collection_limit_maps_none_to_unlimited() {
        let (limit, buffer) = collection_limit_and_buffer(None);
        assert_eq!(limit, usize::MAX);
        assert!(buffer.is_empty());
        let (limit, buffer) = collection_limit_and_buffer(Some(7));
        assert_eq!(limit, 7);
        assert!(buffer.is_empty());
    }
}