use super::*;
use journal_core::file::{DataObject, offset_array::InlinedCursor};
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
/// Default for `ExplorerQuery::histogram_target_buckets`.
const DEFAULT_HISTOGRAM_TARGET_BUCKETS: usize = 150;
/// Default for `ExplorerQuery::realtime_slack_usec` (120_000_000 usec = 120 seconds).
const DEFAULT_TIME_SLACK_USEC: u64 = 120_000_000;
/// Row interval between full control checks in `ExplorerControl::should_stop_after_rows`.
const EXPLORER_CONTROL_CHECK_EVERY_ROWS: u64 = 8192;
/// Default for `ExplorerQuery::stop_when_rows_full_check_every`.
const DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS: u64 = 1;
/// Upper clamp for the number of sampling slots in `ExplorerSamplingState::for_query`.
const EXPLORER_SAMPLING_SLOTS_MAX: usize = 1000;
/// Number of unsampled rows after which `ExplorerSamplingState::decide` recalibrates
/// the per-file sampling stride.
const EXPLORER_SAMPLING_RECALIBRATE_ROWS: u64 = 10_000;
/// Time-progress fraction that must be exceeded before `ExplorerSamplingState::decide`
/// may return `StopAndEstimate`.
const EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS: f64 = 0.01;
/// Field name registered with `FACET_SOURCE_REALTIME` in `ExplorerAccumulator`.
const SOURCE_REALTIME_FIELD: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP";
/// Label under which rows lacking a facet/histogram field are counted.
const UNSET_VALUE: &[u8] = b"-";
/// NOTE(review): not referenced in the visible part of the file; presumably the label
/// for rows skipped by sampling — confirm against the use site.
const EXPLORER_UNSAMPLED_VALUE: &[u8] = b"[unsampled]";
/// NOTE(review): not referenced in the visible part of the file; presumably the label
/// for rows estimated after sampling stopped — confirm against the use site.
const EXPLORER_ESTIMATED_VALUE: &[u8] = b"[estimated]";
/// Anchor selection for an explorer query.
///
/// `Auto` is the default variant. `Realtime` carries a `u64` payload
/// (presumably a realtime timestamp in microseconds — confirm with the consumer).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerAnchor {
    #[default]
    Auto,
    Head,
    Tail,
    Realtime(u64),
}
/// Field handling mode for an explorer query; `FirstValue` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerFieldMode {
    AllValues,
    #[default]
    FirstValue,
}
/// Execution strategy for an explorer query; `Traversal` is the default.
///
/// Marked `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExplorerStrategy {
    #[default]
    Traversal,
    Index,
    Compare,
}
/// A field name paired with a list of values, both stored as raw bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFilter {
    pub field: Vec<u8>,
    pub values: Vec<Vec<u8>>,
}

impl ExplorerFilter {
    /// Builds a filter, converting the field name and every value into owned bytes.
    pub fn new(
        field: impl Into<Vec<u8>>,
        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
    ) -> Self {
        let field: Vec<u8> = field.into();
        let mut owned_values: Vec<Vec<u8>> = Vec::new();
        for value in values {
            owned_values.push(value.into());
        }
        ExplorerFilter {
            field,
            values: owned_values,
        }
    }
}
/// Parameters of one explorer query.
///
/// Defaults are defined by the `Default` impl; the `with_*` builder methods append
/// filters, facets, the histogram field and full-text patterns.
#[derive(Debug, Clone)]
pub struct ExplorerQuery {
/// Start of the query window; `ExplorerSamplingState::for_query` requires it to be set.
pub after_realtime_usec: Option<u64>,
/// End of the query window; `ExplorerSamplingState::for_query` requires it to be set.
pub before_realtime_usec: Option<u64>,
pub anchor: ExplorerAnchor,
pub direction: Direction,
/// Row limit; also used as a lower bound for the per-file and per-slot sampling thresholds.
pub limit: usize,
pub filters: Vec<ExplorerFilter>,
/// Field names registered as public facets in `ExplorerAccumulator`.
pub facets: Vec<Vec<u8>>,
/// Field name whose values are counted per histogram bucket, if any.
pub histogram: Option<Vec<u8>>,
pub histogram_after_realtime_usec: Option<u64>,
pub histogram_before_realtime_usec: Option<u64>,
/// Slot count used for sampling when no explicit histogram bucket count is supplied.
pub histogram_target_buckets: usize,
/// Parsed full-text patterns; `with_fts_pattern` / `with_fts_negative_pattern` push here.
pub fts_terms: Vec<ExplorerFtsPattern>,
/// Raw bytes of the positive patterns added through `with_fts_pattern`.
pub fts_patterns: Vec<Vec<u8>>,
/// Raw bytes of the negative patterns added through `with_fts_negative_pattern`.
pub fts_negative_patterns: Vec<Vec<u8>>,
pub field_mode: ExplorerFieldMode,
pub exclude_facet_field_filters: bool,
pub use_source_realtime: bool,
pub realtime_slack_usec: u64,
pub stop_when_rows_full: bool,
pub stop_when_rows_full_check_every: u64,
/// Sampling parameters; `None` disables sampling (`for_query` returns `None`).
pub sampling: Option<ExplorerSampling>,
// NOTE(review): the public entry points call `validate_no_debug_column_collection`
// before running; presumably that rejects queries with this flag set — confirm.
#[doc(hidden)]
pub debug_collect_column_fields_by_row_traversal: bool,
}
impl Default for ExplorerQuery {
fn default() -> Self {
Self {
after_realtime_usec: None,
before_realtime_usec: None,
anchor: ExplorerAnchor::Auto,
direction: Direction::Forward,
limit: 200,
filters: Vec::new(),
facets: Vec::new(),
histogram: None,
histogram_after_realtime_usec: None,
histogram_before_realtime_usec: None,
histogram_target_buckets: DEFAULT_HISTOGRAM_TARGET_BUCKETS,
fts_terms: Vec::new(),
fts_patterns: Vec::new(),
fts_negative_patterns: Vec::new(),
field_mode: ExplorerFieldMode::FirstValue,
exclude_facet_field_filters: true,
use_source_realtime: true,
realtime_slack_usec: DEFAULT_TIME_SLACK_USEC,
stop_when_rows_full: false,
stop_when_rows_full_check_every: DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS,
sampling: None,
debug_collect_column_fields_by_row_traversal: false,
}
}
}
/// Sampling parameters plus metadata about the file being read.
///
/// `ExplorerSamplingState::for_query` returns `None` when `budget` or `matched_files`
/// is zero. The `file_*` fields are copied into the sampling state by `for_query`
/// and `begin_file`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExplorerSampling {
/// Sample budget; half of it is the global sampling threshold, a quarter is split
/// per file and per slot.
pub budget: u64,
pub matched_files: u64,
/// Zero is treated as "unknown" by `ExplorerSamplingState::overlapping_timeframe`.
pub file_head_realtime_usec: u64,
/// Zero is treated as "unknown" by `ExplorerSamplingState::overlapping_timeframe`.
pub file_tail_realtime_usec: u64,
/// Zero disables the sequence-number based row estimate.
pub file_head_seqnum: u64,
/// Zero disables the sequence-number based row estimate.
pub file_tail_seqnum: u64,
/// Zero disables the sequence-number based row estimate and the cap on the
/// time-based estimate.
pub file_entries: u64,
}
/// A full-text pattern split into literal parts that must occur in order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFtsPattern {
    pub parts: Vec<Vec<u8>>,
    pub negative: bool,
}

impl ExplorerFtsPattern {
    /// Splits `pattern` on `*` and keeps the non-empty pieces as ordered parts.
    pub fn substring(pattern: impl Into<Vec<u8>>, negative: bool) -> Self {
        let raw: Vec<u8> = pattern.into();
        let mut parts: Vec<Vec<u8>> = Vec::new();
        for piece in raw.split(|&byte| byte == b'*') {
            if !piece.is_empty() {
                parts.push(piece.to_vec());
            }
        }
        ExplorerFtsPattern { parts, negative }
    }

    /// Returns whether every part is found in `value`, each one searched for after
    /// the end of the previous match, using `find_ascii_case_insensitive`.
    ///
    /// An empty `value` never matches; a pattern without parts matches any
    /// non-empty `value`.
    fn matches(&self, value: &[u8]) -> bool {
        if value.is_empty() {
            return false;
        }
        let mut rest = value;
        for needle in self.parts.iter() {
            match find_ascii_case_insensitive(rest, needle) {
                // Continue searching right after this occurrence.
                Some(at) => rest = &rest[at.saturating_add(needle.len())..],
                None => return false,
            }
        }
        true
    }
}
impl ExplorerQuery {
    /// Appends a filter built from `field` and `values`.
    pub fn with_filter(
        mut self,
        field: impl Into<Vec<u8>>,
        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
    ) -> Self {
        let filter = ExplorerFilter::new(field, values);
        self.filters.push(filter);
        self
    }

    /// Appends `field` to the list of facets.
    pub fn with_facet(mut self, field: impl Into<Vec<u8>>) -> Self {
        let name: Vec<u8> = field.into();
        self.facets.push(name);
        self
    }

    /// Sets the histogram field, replacing any previous one.
    pub fn with_histogram(mut self, field: impl Into<Vec<u8>>) -> Self {
        let name: Vec<u8> = field.into();
        self.histogram = Some(name);
        self
    }

    /// Adds a positive full-text pattern: the parsed term goes to `fts_terms`,
    /// the raw bytes to `fts_patterns`.
    pub fn with_fts_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
        let raw: Vec<u8> = pattern.into();
        let term = ExplorerFtsPattern::substring(raw.clone(), false);
        self.fts_terms.push(term);
        self.fts_patterns.push(raw);
        self
    }

    /// Adds a negative full-text pattern: the parsed term goes to `fts_terms`,
    /// the raw bytes to `fts_negative_patterns`.
    pub fn with_fts_negative_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
        let raw: Vec<u8> = pattern.into();
        let term = ExplorerFtsPattern::substring(raw.clone(), true);
        self.fts_terms.push(term);
        self.fts_negative_patterns.push(raw);
        self
    }
}
/// Counters collected while running an explorer query.
///
/// A snapshot of this struct is cloned into every `ExplorerProgress` event.
/// Most counters are updated outside the part of the file visible here; the ones
/// documented below are those whose update sites are visible.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub struct ExplorerStats {
pub rows_examined: u64,
pub rows_matched: u64,
pub facet_rows_matched: u64,
pub rows_returned: u64,
pub rows_unsampled: u64,
pub rows_estimated: u64,
pub sampling_sampled: u64,
pub sampling_unsampled: u64,
pub sampling_estimated: u64,
pub last_realtime_usec: u64,
pub max_source_realtime_delta_usec: u64,
pub data_refs_seen: u64,
pub data_refs_skipped: u64,
pub data_payloads_loaded: u64,
pub data_objects_classified: u64,
pub data_cache_hits: u64,
pub data_cache_misses: u64,
pub payloads_decompressed: u64,
pub fts_scans: u64,
/// Incremented by `ExplorerAccumulator` for every facet value count and every
/// "unset" facet count it records.
pub facet_updates: u64,
/// Incremented by `ExplorerAccumulator` for every histogram bucket increment.
pub histogram_updates: u64,
pub returned_row_expansions: u64,
pub early_stop_opportunities: u64,
pub early_stops: u64,
}
/// One row returned by the explorer.
#[derive(Debug, Clone)]
pub struct ExplorerRow {
pub realtime_usec: u64,
pub cursor: String,
// NOTE(review): presumably left empty in `ExplorerRowPayloadMode::CursorOnly` — confirm.
pub payloads: Vec<Vec<u8>>,
}
/// Row payload mode passed to the crate-internal explore entry points.
///
/// The public `explore*` methods use `Expand`; the `*_cursor_rows*` variants use
/// `CursorOnly`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExplorerRowPayloadMode {
Expand,
CursorOnly,
}
/// One time bucket of a histogram.
///
/// `ExplorerAccumulator::new` derives the bucket width from
/// `end_realtime_usec - start_realtime_usec` of the first bucket.
#[derive(Debug, Clone)]
pub struct ExplorerHistogramBucket {
pub start_realtime_usec: u64,
pub end_realtime_usec: u64,
/// Count per value label; rows without the field are counted under `UNSET_VALUE`.
pub values: HashMap<Vec<u8>, u64>,
}
/// Histogram of one field's values over a sequence of time buckets.
#[derive(Debug, Clone)]
pub struct ExplorerHistogram {
/// Name of the field the histogram is computed for.
pub field: Vec<u8>,
pub buckets: Vec<ExplorerHistogramBucket>,
}
/// Durations and statistics for a traversal run and an index run of the same query.
// NOTE(review): presumably filled in when `ExplorerStrategy::Compare` is used — the
// producing code is outside the visible part of the file.
#[derive(Debug, Clone, Default)]
pub struct ExplorerComparison {
pub traversal_duration: Duration,
pub index_duration: Duration,
pub traversal_stats: ExplorerStats,
pub index_stats: ExplorerStats,
}
/// Result of an explorer query.
#[derive(Debug, Clone, Default)]
pub struct ExplorerResult {
pub rows: Vec<ExplorerRow>,
/// Per facet field: value label -> count. Filled by `ExplorerAccumulator::finish_facets`.
pub facets: HashMap<Vec<u8>, HashMap<Vec<u8>, u64>>,
pub histogram: Option<ExplorerHistogram>,
pub column_fields: HashSet<Vec<u8>>,
pub stats: ExplorerStats,
pub comparison: Option<ExplorerComparison>,
}
/// Why `ExplorerControl` asked the query to stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum ExplorerStopReason {
/// The configured deadline was reached.
TimedOut,
/// The cancellation callback returned `true`.
Cancelled,
}
/// Progress event passed to the progress callback of `ExplorerControl`.
#[derive(Debug, Clone)]
pub struct ExplorerProgress {
/// Snapshot (clone) of the statistics at emission time.
pub stats: ExplorerStats,
/// Time elapsed since the `ExplorerControl` was created.
pub elapsed: Duration,
}
/// Cooperative control handle for a running explorer query: deadline, cancellation,
/// progress reporting and per-row callbacks.
pub struct ExplorerControl<'a> {
// Instant at or after which `check` reports `TimedOut`.
deadline: Option<Instant>,
// Polled by `check`; returning `true` stops the query with `Cancelled`.
cancellation: Option<&'a dyn Fn() -> bool>,
// Receives `ExplorerProgress` events from `emit_progress`.
progress: Option<&'a mut dyn FnMut(ExplorerProgress)>,
candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
// Maps a realtime timestamp to an adjusted one (see `adjust_realtime`).
adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
// Called by `emit_matched_row` with (realtime_usec, rows_matched).
matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
sampling: Option<&'a mut ExplorerSamplingState>,
// Minimum time between periodic progress events.
progress_interval: Duration,
// Creation time; base for `ExplorerProgress::elapsed`.
started: Instant,
last_progress: Instant,
// Row count at which `should_stop_after_rows` performs the next full check.
next_check_rows: u64,
// Set once a stop condition has been observed; sticky afterwards.
stop_reason: Option<ExplorerStopReason>,
}
impl<'a> ExplorerControl<'a> {
pub fn new() -> Self {
let now = Instant::now();
Self {
deadline: None,
cancellation: None,
progress: None,
candidate_row: None,
adjust_realtime: None,
matched_row: None,
sampling: None,
progress_interval: Duration::from_millis(250),
started: now,
last_progress: now,
next_check_rows: EXPLORER_CONTROL_CHECK_EVERY_ROWS,
stop_reason: None,
}
}
pub fn set_deadline(&mut self, deadline: Option<Instant>) {
self.deadline = deadline;
}
pub fn set_cancellation_callback(&mut self, cancellation: Option<&'a dyn Fn() -> bool>) {
self.cancellation = cancellation;
}
pub fn set_progress_callback(&mut self, progress: Option<&'a mut dyn FnMut(ExplorerProgress)>) {
self.progress = progress;
}
pub(crate) fn set_candidate_row_callback(
&mut self,
candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
) {
self.candidate_row = candidate_row;
}
pub(crate) fn set_realtime_adjust_callback(
&mut self,
adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
) {
self.adjust_realtime = adjust_realtime;
}
pub fn set_matched_row_callback(
&mut self,
matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
) {
self.matched_row = matched_row;
}
pub(crate) fn set_sampling_state(&mut self, sampling: Option<&'a mut ExplorerSamplingState>) {
self.sampling = sampling;
}
pub fn set_progress_interval(&mut self, interval: Duration) {
self.progress_interval = interval;
}
pub fn stop_reason(&self) -> Option<ExplorerStopReason> {
self.stop_reason
}
fn should_stop_after_rows(&mut self, rows_seen: u64, stats: &ExplorerStats) -> bool {
if self.stop_reason.is_some() {
return true;
}
if rows_seen < self.next_check_rows {
return false;
}
self.next_check_rows = rows_seen.saturating_add(EXPLORER_CONTROL_CHECK_EVERY_ROWS);
self.check(stats)
}
fn check(&mut self, stats: &ExplorerStats) -> bool {
let now = Instant::now();
if now.duration_since(self.last_progress) >= self.progress_interval {
self.emit_progress(stats, now);
}
if self.cancellation.is_some_and(|is_cancelled| is_cancelled()) {
self.stop_reason = Some(ExplorerStopReason::Cancelled);
self.emit_progress(stats, now);
return true;
}
if self.deadline.is_some_and(|deadline| now >= deadline) {
self.stop_reason = Some(ExplorerStopReason::TimedOut);
self.emit_progress(stats, now);
return true;
}
false
}
fn emit_progress(&mut self, stats: &ExplorerStats, now: Instant) {
self.last_progress = now;
if let Some(progress) = self.progress.as_deref_mut() {
progress(ExplorerProgress {
stats: stats.clone(),
elapsed: now.duration_since(self.started),
});
}
}
fn emit_matched_row(&mut self, realtime_usec: u64, rows_matched: u64) -> bool {
if let Some(matched_row) = self.matched_row.as_deref_mut() {
return matched_row(realtime_usec, rows_matched);
}
false
}
fn adjust_realtime(&mut self, realtime_usec: u64) -> u64 {
self.adjust_realtime
.as_deref_mut()
.map_or(realtime_usec, |adjust_realtime| {
adjust_realtime(realtime_usec)
})
}
}
impl Default for ExplorerControl<'_> {
fn default() -> Self {
Self::new()
}
}
/// Outcome of `ExplorerSamplingState::decide` for one row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExplorerSamplingDecision {
/// Process the row fully. `sampled` is `false` when the row was kept because the
/// caller passed `candidate_to_keep`, and `true` when it was counted as a sample.
Full {
sampled: bool,
},
/// The row was not selected for sampling.
SkipFields,
/// Returned once unsampled rows outnumber sampled rows in the current file and
/// time progress exceeds `EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS`.
StopAndEstimate {
/// Estimated number of rows left in the file.
remaining_rows: u64,
/// Start of the remaining time range.
from_realtime_usec: u64,
/// End of the remaining time range.
to_realtime_usec: u64,
},
}
/// Mutable sampling state shared across the rows (and files) of one query.
#[derive(Debug)]
pub(crate) struct ExplorerSamplingState {
// Query window, taken from the query's after/before bounds.
start_realtime_usec: u64,
end_realtime_usec: u64,
// Metadata of the current file, refreshed by `begin_file`.
file_head_realtime_usec: u64,
file_tail_realtime_usec: u64,
file_head_seqnum: u64,
file_tail_seqnum: u64,
file_entries: u64,
// Timestamp of the first row seen by `decide` in the current file.
first_realtime_usec: Option<u64>,
// Width of one sampling slot; always at least 1.
step_realtime_usec: u64,
// Thresholds below which every row is sampled (global / per file / per slot).
enable_after_samples: u64,
per_file_enable_after_samples: u64,
per_slot_enable_after_samples: u64,
// Total number of sampled rows across all files.
sampled: u64,
// Per-file counters, reset by `begin_file`.
per_file_sampled: u64,
per_file_unsampled: u64,
// Number of rows skipped between two samples; 0 forces a recalibration.
per_file_every: u64,
per_file_skipped: u64,
// Unsampled rows since the last recalibration.
per_file_recalibrate: u64,
// Per-slot counters, indexed by `slot_for_realtime`.
per_slot_sampled: Vec<u64>,
per_slot_unsampled: Vec<u64>,
// Always at least 1.
matched_files: u64,
direction: Direction,
}
impl ExplorerSamplingState {
pub(crate) fn for_query(
query: &ExplorerQuery,
histogram_bucket_count: Option<usize>,
) -> Option<Self> {
let sampling = query.sampling?;
let start_realtime_usec = query.after_realtime_usec?;
let end_realtime_usec = query.before_realtime_usec?;
if sampling.budget == 0
|| sampling.matched_files == 0
|| start_realtime_usec >= end_realtime_usec
{
return None;
}
let slots = histogram_bucket_count
.unwrap_or(query.histogram_target_buckets)
.clamp(2, EXPLORER_SAMPLING_SLOTS_MAX);
let delta = end_realtime_usec.saturating_sub(start_realtime_usec);
let step_realtime_usec = (delta / slots as u64).saturating_sub(1).max(1);
let per_file_enable_after_samples =
((sampling.budget / 4) / sampling.matched_files.max(1)).max(query.limit as u64);
let per_slot_enable_after_samples =
((sampling.budget / 4) / slots as u64).max(query.limit as u64);
Some(Self {
start_realtime_usec,
end_realtime_usec,
file_head_realtime_usec: sampling.file_head_realtime_usec,
file_tail_realtime_usec: sampling.file_tail_realtime_usec,
file_head_seqnum: sampling.file_head_seqnum,
file_tail_seqnum: sampling.file_tail_seqnum,
file_entries: sampling.file_entries,
first_realtime_usec: None,
step_realtime_usec,
enable_after_samples: sampling.budget / 2,
per_file_enable_after_samples,
per_slot_enable_after_samples,
sampled: 0,
per_file_sampled: 0,
per_file_unsampled: 0,
per_file_every: 0,
per_file_skipped: 0,
per_file_recalibrate: 0,
per_slot_sampled: vec![0; slots],
per_slot_unsampled: vec![0; slots],
matched_files: sampling.matched_files.max(1),
direction: query.direction,
})
}
fn begin_file(&mut self, sampling: ExplorerSampling) {
self.file_head_realtime_usec = sampling.file_head_realtime_usec;
self.file_tail_realtime_usec = sampling.file_tail_realtime_usec;
self.file_head_seqnum = sampling.file_head_seqnum;
self.file_tail_seqnum = sampling.file_tail_seqnum;
self.file_entries = sampling.file_entries;
self.first_realtime_usec = None;
self.per_file_sampled = 0;
self.per_file_unsampled = 0;
self.per_file_every = 0;
self.per_file_skipped = 0;
self.per_file_recalibrate = 0;
}
fn decide(
&mut self,
realtime_usec: u64,
seqnum: u64,
candidate_to_keep: bool,
) -> ExplorerSamplingDecision {
if self.first_realtime_usec.is_none() {
self.first_realtime_usec = Some(realtime_usec);
}
if candidate_to_keep {
return ExplorerSamplingDecision::Full { sampled: false };
}
let slot = self.slot_for_realtime(realtime_usec);
let should_sample = if self.sampled < self.enable_after_samples
|| self.per_file_sampled < self.per_file_enable_after_samples
|| self.per_slot_sampled[slot] < self.per_slot_enable_after_samples
{
true
} else if self.per_file_recalibrate >= EXPLORER_SAMPLING_RECALIBRATE_ROWS
|| self.per_file_every == 0
{
self.recalibrate(realtime_usec, seqnum);
true
} else if self.per_file_skipped >= self.per_file_every {
self.per_file_skipped = 0;
true
} else {
self.per_file_skipped = self.per_file_skipped.saturating_add(1);
false
};
if should_sample {
self.sampled = self.sampled.saturating_add(1);
self.per_file_sampled = self.per_file_sampled.saturating_add(1);
self.per_slot_sampled[slot] = self.per_slot_sampled[slot].saturating_add(1);
return ExplorerSamplingDecision::Full { sampled: true };
}
self.per_file_recalibrate = self.per_file_recalibrate.saturating_add(1);
self.per_file_unsampled = self.per_file_unsampled.saturating_add(1);
self.per_slot_unsampled[slot] = self.per_slot_unsampled[slot].saturating_add(1);
if self.per_file_unsampled > self.per_file_sampled
&& self.progress_by_time(realtime_usec) > EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS
{
let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
let (from_realtime_usec, to_realtime_usec) = self.remaining_range(realtime_usec);
return ExplorerSamplingDecision::StopAndEstimate {
remaining_rows,
from_realtime_usec,
to_realtime_usec,
};
}
ExplorerSamplingDecision::SkipFields
}
fn slot_for_realtime(&self, realtime_usec: u64) -> usize {
let clamped = realtime_usec.clamp(self.start_realtime_usec, self.end_realtime_usec);
let slot =
(clamped.saturating_sub(self.start_realtime_usec) / self.step_realtime_usec) as usize;
slot.min(self.per_slot_sampled.len().saturating_sub(1))
}
fn recalibrate(&mut self, realtime_usec: u64, seqnum: u64) {
let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
let wanted_samples = (self.enable_after_samples / self.matched_files).max(1);
self.per_file_every = (remaining_rows / wanted_samples).max(1);
self.per_file_recalibrate = 0;
}
fn estimate_remaining_rows(&self, realtime_usec: u64, seqnum: u64) -> u64 {
if let Some(remaining) = self.estimate_remaining_rows_by_seqnum(seqnum) {
return remaining;
}
self.estimate_remaining_rows_by_time(realtime_usec)
}
fn estimate_remaining_rows_by_seqnum(&self, seqnum: u64) -> Option<u64> {
self.validate_seqnum_estimate_inputs(seqnum)?;
let scanned_rows = self.scanned_file_rows();
let seqnum_span = self.seqnum_span_so_far(seqnum)?;
if seqnum_span == 0 {
return None;
}
let proportion_of_all_lines_so_far =
bounded_positive_proportion(scanned_rows as f64 / seqnum_span as f64)?;
let expected_matching_logs =
(proportion_of_all_lines_so_far * self.file_entries as f64) as u64;
if expected_matching_logs == 0 {
return None;
}
Some(expected_matching_logs.saturating_sub(scanned_rows).max(1))
}
fn validate_seqnum_estimate_inputs(&self, seqnum: u64) -> Option<()> {
(self.file_entries != 0
&& self.file_head_seqnum != 0
&& self.file_tail_seqnum != 0
&& seqnum != 0)
.then_some(())
}
fn scanned_file_rows(&self) -> u64 {
self.per_file_sampled
.saturating_add(self.per_file_unsampled)
.max(1)
}
fn seqnum_span_so_far(&self, seqnum: u64) -> Option<u64> {
match self.direction {
Direction::Forward => seqnum.checked_sub(self.file_head_seqnum),
Direction::Backward => self.file_tail_seqnum.checked_sub(seqnum),
}
}
fn estimate_remaining_rows_by_time(&self, realtime_usec: u64) -> u64 {
let scanned_rows = self.scanned_file_rows();
let (after, before) = self.overlapping_timeframe(realtime_usec);
let total_time = self
.remaining_time_bounds(realtime_usec, after, before)
.0
.max(1);
let remaining_time = self.remaining_time_bounds(realtime_usec, after, before).1;
let elapsed = total_time.saturating_sub(remaining_time).max(1);
let mut proportion_by_time = elapsed as f64 / total_time as f64;
if proportion_by_time == 0.0 || proportion_by_time > 1.0 || !proportion_by_time.is_finite()
{
proportion_by_time = 1.0;
}
let mut expected_total = (scanned_rows as f64 / proportion_by_time) as u64;
if self.file_entries != 0 && expected_total > self.file_entries {
expected_total = self.file_entries;
}
expected_total.saturating_sub(scanned_rows).max(1)
}
fn progress_by_time(&self, realtime_usec: u64) -> f64 {
let (after, before) = self.overlapping_timeframe(realtime_usec);
let total_time = before.saturating_sub(after).max(1);
let elapsed = match self.direction {
Direction::Forward => realtime_usec.saturating_sub(after),
Direction::Backward => before.saturating_sub(realtime_usec),
}
.min(total_time);
elapsed as f64 / total_time as f64
}
fn overlapping_timeframe(&self, realtime_usec: u64) -> (u64, u64) {
match self.direction {
Direction::Forward => {
let mut oldest = self
.first_realtime_usec
.or((self.file_head_realtime_usec != 0).then_some(self.file_head_realtime_usec))
.unwrap_or(self.start_realtime_usec);
let mut newest = if self.file_tail_realtime_usec != 0 {
self.end_realtime_usec.min(self.file_tail_realtime_usec)
} else {
self.end_realtime_usec
};
if newest <= oldest {
newest = oldest.saturating_add(1);
}
if realtime_usec < oldest {
oldest = realtime_usec.saturating_sub(1);
}
(oldest, newest)
}
Direction::Backward => {
let mut newest = self
.first_realtime_usec
.or((self.file_tail_realtime_usec != 0).then_some(self.file_tail_realtime_usec))
.unwrap_or(self.end_realtime_usec);
let oldest = if self.file_head_realtime_usec != 0 {
self.start_realtime_usec.max(self.file_head_realtime_usec)
} else {
self.start_realtime_usec
};
if newest <= oldest {
newest = oldest.saturating_add(1);
}
if newest < realtime_usec {
newest = realtime_usec.saturating_add(1);
}
(oldest, newest)
}
}
}
fn remaining_range(&self, realtime_usec: u64) -> (u64, u64) {
let (after, before) = self.overlapping_timeframe(realtime_usec);
let (_, _, remaining_start, remaining_end) =
self.remaining_time_details(realtime_usec, after, before);
(remaining_start, remaining_end)
}
fn remaining_time_bounds(&self, realtime_usec: u64, after: u64, before: u64) -> (u64, u64) {
let (total, remaining, _, _) = self.remaining_time_details(realtime_usec, after, before);
(total, remaining)
}
fn remaining_time_details(
&self,
realtime_usec: u64,
mut after: u64,
mut before: u64,
) -> (u64, u64, u64, u64) {
if realtime_usec <= after {
after = realtime_usec.saturating_sub(1);
}
if realtime_usec >= before {
before = realtime_usec.saturating_add(1);
}
if before <= after {
before = after.saturating_add(1);
}
let (remaining_start, remaining_end) = match self.direction {
Direction::Forward => (realtime_usec, before),
Direction::Backward => (after, realtime_usec),
};
(
before.saturating_sub(after).max(1),
remaining_end.saturating_sub(remaining_start),
remaining_start,
remaining_end,
)
}
}
/// Returns the number of buckets `new_histogram` creates for the query's histogram
/// field, or `None` when the query has no histogram field.
pub(crate) fn histogram_bucket_count_for_query(query: &ExplorerQuery) -> Option<usize> {
    let field = query.histogram.as_deref()?;
    Some(new_histogram(field, query).buckets.len())
}
/// Per-row scan result; all fields default to empty/false/`None`.
// NOTE(review): the code that fills and reads this struct is outside the visible
// part of the file.
#[derive(Default)]
struct RowScan {
timestamp: Option<u64>,
fts_matches: bool,
fts_negative_match: bool,
column_fields: Vec<Vec<u8>>,
}
// Bit flags stored per field in `ExplorerAccumulator::flags`.
/// The field's values are counted and exported by `finish_facets`.
const FACET_PUBLIC: u8 = 0x01;
/// The field's values are counted per histogram bucket.
const FACET_HISTOGRAM: u8 = 0x02;
/// The field's values are parsed with `parse_source_realtime` when added.
const FACET_SOURCE_REALTIME: u8 = 0x04;
/// Classification cached per data-object offset in `OffsetClassCache`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OffsetClass {
    Irrelevant,
    FtsMatch,
    FtsNegativeMatch,
    /// Carries an index; encoded as `VALUE_BASE + index`.
    Value(usize),
}

impl OffsetClass {
    const IRRELEVANT_RAW: usize = 1;
    const FTS_MATCH_RAW: usize = 2;
    const FTS_NEGATIVE_MATCH_RAW: usize = 3;
    const VALUE_BASE: usize = 4;

    /// Encodes the class as a `usize`; `Value` indices saturate at `usize::MAX`.
    fn to_raw(self) -> usize {
        match self {
            Self::Value(index) => Self::VALUE_BASE.saturating_add(index),
            Self::FtsNegativeMatch => Self::FTS_NEGATIVE_MATCH_RAW,
            Self::FtsMatch => Self::FTS_MATCH_RAW,
            Self::Irrelevant => Self::IRRELEVANT_RAW,
        }
    }

    /// Decodes a raw value. Anything other than the three reserved codes becomes
    /// `Value(raw - VALUE_BASE)` with saturating subtraction, so raw `0` decodes
    /// to `Value(0)`.
    fn from_raw(raw: usize) -> Self {
        if raw == Self::IRRELEVANT_RAW {
            Self::Irrelevant
        } else if raw == Self::FTS_MATCH_RAW {
            Self::FtsMatch
        } else if raw == Self::FTS_NEGATIVE_MATCH_RAW {
            Self::FtsNegativeMatch
        } else {
            Self::Value(raw.saturating_sub(Self::VALUE_BASE))
        }
    }
}
/// One slot of the open-addressing table in `OffsetClassCache`.
#[derive(Clone, Copy, Debug, Default)]
struct OffsetClassSlot {
// Key; 0 marks an empty slot.
offset: u64,
// Raw class as produced by `OffsetClass::to_raw`.
class: usize,
}
/// Open-addressing hash table (linear probing) from a non-zero offset to its
/// `OffsetClass`.
///
/// The probe mask is `slots.len() - 1`; the table starts at 256 slots and doubles
/// when grown.
#[derive(Debug)]
struct OffsetClassCache {
    slots: Vec<OffsetClassSlot>,
    len: usize,
}

impl Default for OffsetClassCache {
    /// Creates an empty cache with 256 slots.
    fn default() -> Self {
        OffsetClassCache {
            slots: vec![OffsetClassSlot::default(); 256],
            len: 0,
        }
    }
}

impl OffsetClassCache {
    /// Returns the cached class for `offset`, or `None` when it is not present.
    fn lookup(&self, offset: NonZeroU64) -> Option<OffsetClass> {
        let wanted = offset.get();
        let capacity = self.slots.len();
        let mask = capacity.saturating_sub(1);
        let mut index = offset_slot(wanted) & mask;
        let mut probes = 0;
        while probes < capacity {
            let slot = self.slots[index];
            match slot.offset {
                // An empty slot ends the probe sequence.
                0 => return None,
                found if found == wanted => return Some(OffsetClass::from_raw(slot.class)),
                _ => {}
            }
            index = (index + 1) & mask;
            probes += 1;
        }
        None
    }

    /// Inserts or overwrites the class for `offset`, growing the table first when
    /// the insertion would reach a 3/4 load factor.
    fn insert(&mut self, offset: NonZeroU64, class: OffsetClass) {
        let needed = (self.len + 1).saturating_mul(4);
        let threshold = self.slots.len().saturating_mul(3);
        if needed >= threshold {
            self.grow();
        }
        self.insert_raw(offset.get(), class.to_raw());
    }

    /// Doubles the table (minimum 256 slots) and re-inserts every occupied slot.
    fn grow(&mut self) {
        let new_len = self.slots.len().saturating_mul(2).max(256);
        let previous = std::mem::replace(&mut self.slots, vec![OffsetClassSlot::default(); new_len]);
        self.len = 0;
        for occupied in previous.into_iter().filter(|slot| slot.offset != 0) {
            self.insert_raw(occupied.offset, occupied.class);
        }
    }

    /// Probes linearly from the hashed position until it finds an empty slot
    /// (insert, `len` incremented) or the same offset (class overwritten).
    fn insert_raw(&mut self, offset: u64, class: usize) {
        let mask = self.slots.len().saturating_sub(1);
        let mut index = offset_slot(offset) & mask;
        loop {
            let slot = &mut self.slots[index];
            if slot.offset == 0 {
                *slot = OffsetClassSlot { offset, class };
                self.len += 1;
                return;
            }
            if slot.offset == offset {
                slot.class = class;
                return;
            }
            index = (index + 1) & mask;
        }
    }
}
/// Hashes an offset into a table position: drops the low three bits, then applies
/// an xor-shift / multiply / xor-shift mix. The caller masks the result.
fn offset_slot(offset: u64) -> usize {
    let shifted = offset >> 3;
    let folded = shifted ^ (shifted >> 33);
    let mixed = folded.wrapping_mul(0xff51afd7ed558ccd);
    (mixed ^ (mixed >> 33)) as usize
}
/// Accumulates facet and histogram counts for a set of registered fields.
///
/// Fields are registered with `add_field` and values with `add_value`; both return
/// or use dense indices. Per-field vectors are indexed by field index, `value_*`
/// vectors by value index.
struct ExplorerAccumulator {
// Field name -> field index.
field_lookup: HashMap<Vec<u8>, usize>,
fields: Vec<Vec<u8>>,
// Combination of `FACET_*` bits per field.
flags: Vec<u8>,
// Row id that last touched each field (see `mark_field_seen`); starts at 0.
last_seen_row_ids: Vec<u64>,
// Rows in which a public facet field was absent.
unset_counts: Vec<u64>,
// Value indices belonging to each field.
values_by_field: Vec<Vec<usize>>,
value_counts: Vec<u64>,
// Owning field index of each value.
value_field_indices: Vec<usize>,
value_labels: Vec<Vec<u8>>,
value_fts_matches: Vec<bool>,
// Parsed timestamp for values of `FACET_SOURCE_REALTIME` fields, else `None`.
value_source_realtime: Vec<Option<u64>>,
// Per-bucket counts for values of `FACET_HISTOGRAM` fields, else `None`.
value_histogram_buckets: Vec<Option<Vec<u64>>>,
// Per-bucket counts of rows lacking a `FACET_HISTOGRAM` field.
field_histogram_unset_buckets: Vec<Option<Vec<u64>>>,
offset_cache: OffsetClassCache,
// Bucket geometry taken from the first bucket of the histogram passed to `new`.
histogram_start_realtime_usec: u64,
histogram_bucket_width_usec: u64,
histogram_bucket_count: usize,
// Number of registered fields whose flags are non-zero.
required_identity_count: usize,
}
impl ExplorerAccumulator {
fn for_main(query: &ExplorerQuery, histogram: Option<&ExplorerHistogram>) -> Self {
Self::for_combined(query, &[], histogram)
}
fn for_facets(
query: &ExplorerQuery,
facet_indices: &[usize],
include_source_realtime: bool,
) -> Self {
let mut out = Self::new(None);
for facet_index in facet_indices {
if let Some(field) = query.facets.get(*facet_index) {
out.add_field(field, FACET_PUBLIC);
}
}
if include_source_realtime {
out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
}
out
}
fn for_combined(
query: &ExplorerQuery,
facet_indices: &[usize],
histogram: Option<&ExplorerHistogram>,
) -> Self {
let mut out = Self::new(histogram);
if let Some(field) = &query.histogram {
out.add_field(field, FACET_HISTOGRAM);
}
for facet_index in facet_indices {
if let Some(field) = query.facets.get(*facet_index) {
out.add_field(field, FACET_PUBLIC);
}
}
if query_needs_source_realtime_main(query) || facet_pass_needs_source_realtime(query) {
out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
}
out
}
fn new(histogram: Option<&ExplorerHistogram>) -> Self {
Self {
field_lookup: HashMap::new(),
fields: Vec::new(),
flags: Vec::new(),
last_seen_row_ids: Vec::new(),
unset_counts: Vec::new(),
values_by_field: Vec::new(),
value_counts: Vec::new(),
value_field_indices: Vec::new(),
value_labels: Vec::new(),
value_fts_matches: Vec::new(),
value_source_realtime: Vec::new(),
value_histogram_buckets: Vec::new(),
field_histogram_unset_buckets: Vec::new(),
offset_cache: OffsetClassCache::default(),
histogram_start_realtime_usec: histogram
.and_then(|histogram| histogram.buckets.first())
.map(|bucket| bucket.start_realtime_usec)
.unwrap_or_default(),
histogram_bucket_width_usec: histogram
.and_then(|histogram| histogram.buckets.first())
.map(|bucket| {
bucket
.end_realtime_usec
.saturating_sub(bucket.start_realtime_usec)
.max(1)
})
.unwrap_or(1),
histogram_bucket_count: histogram
.map(|histogram| histogram.buckets.len())
.unwrap_or_default(),
required_identity_count: 0,
}
}
fn add_field(&mut self, field: &[u8], flags: u8) {
if let Some(index) = self.field_lookup.get(field).copied() {
let had_required = self.flags[index] != 0;
self.flags[index] |= flags;
if flags & FACET_HISTOGRAM != 0 && self.field_histogram_unset_buckets[index].is_none() {
self.field_histogram_unset_buckets[index] =
Some(vec![0; self.histogram_bucket_count]);
}
if !had_required && self.flags[index] != 0 {
self.required_identity_count += 1;
}
return;
}
let index = self.fields.len();
self.field_lookup.insert(field.to_vec(), index);
self.fields.push(field.to_vec());
self.flags.push(flags);
self.last_seen_row_ids.push(0);
self.unset_counts.push(0);
self.values_by_field.push(Vec::new());
self.field_histogram_unset_buckets
.push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
if flags != 0 {
self.required_identity_count += 1;
}
}
fn add_value(
&mut self,
field_index: usize,
_data_offset: NonZeroU64,
value: &[u8],
fts_matches: bool,
) -> usize {
let value_index = self.value_counts.len();
let flags = self.flags[field_index];
self.value_counts.push(0);
self.value_field_indices.push(field_index);
self.value_labels.push(value.to_vec());
self.value_fts_matches.push(fts_matches);
self.value_source_realtime
.push(if flags & FACET_SOURCE_REALTIME != 0 {
parse_source_realtime(value)
} else {
None
});
self.value_histogram_buckets
.push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
self.values_by_field[field_index].push(value_index);
value_index
}
fn mark_field_seen(&mut self, field_index: usize, row_id: u64) -> bool {
if self.last_seen_row_ids[field_index] == row_id {
return false;
}
self.last_seen_row_ids[field_index] = row_id;
true
}
fn apply_value(
&mut self,
value_index: usize,
realtime_usec: Option<u64>,
stats: &mut ExplorerStats,
) {
let field_index = self.value_field_indices[value_index];
let flags = self.flags[field_index];
if flags & FACET_PUBLIC != 0 {
self.value_counts[value_index] = self.value_counts[value_index].saturating_add(1);
stats.facet_updates = stats.facet_updates.saturating_add(1);
}
if flags & FACET_HISTOGRAM != 0 {
if let (Some(realtime_usec), Some(buckets)) = (
realtime_usec,
self.value_histogram_buckets[value_index].as_mut(),
) {
if let Some(bucket_index) = histogram_bucket_index_from_bounds(
realtime_usec,
self.histogram_start_realtime_usec,
self.histogram_bucket_width_usec,
buckets.len(),
) {
buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
stats.histogram_updates = stats.histogram_updates.saturating_add(1);
}
}
}
}
fn finish_facet_row(&mut self, row_id: u64, stats: &mut ExplorerStats) {
for field_index in 0..self.fields.len() {
if self.flags[field_index] & FACET_PUBLIC == 0 {
continue;
}
if self.last_seen_row_ids[field_index] != row_id {
self.unset_counts[field_index] = self.unset_counts[field_index].saturating_add(1);
stats.facet_updates = stats.facet_updates.saturating_add(1);
}
}
}
fn finish_histogram_row(&mut self, row_id: u64, realtime_usec: u64, stats: &mut ExplorerStats) {
for field_index in 0..self.fields.len() {
if self.flags[field_index] & FACET_HISTOGRAM == 0 {
continue;
}
if self.last_seen_row_ids[field_index] == row_id {
continue;
}
let Some(buckets) = self.field_histogram_unset_buckets[field_index].as_mut() else {
continue;
};
if let Some(bucket_index) = histogram_bucket_index_from_bounds(
realtime_usec,
self.histogram_start_realtime_usec,
self.histogram_bucket_width_usec,
buckets.len(),
) {
buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
stats.histogram_updates = stats.histogram_updates.saturating_add(1);
}
}
}
fn finish_facets(&self, result: &mut ExplorerResult) {
for field_index in 0..self.fields.len() {
if self.flags[field_index] & FACET_PUBLIC == 0 {
continue;
}
let mut values = HashMap::new();
for value_index in &self.values_by_field[field_index] {
let count = self.value_counts[*value_index];
if count != 0 {
increment_counter_by(&mut values, &self.value_labels[*value_index], count);
}
}
if self.unset_counts[field_index] != 0 {
increment_counter_by(&mut values, UNSET_VALUE, self.unset_counts[field_index]);
}
result
.facets
.insert(self.fields[field_index].clone(), values);
}
}
fn finish_histogram(&self, histogram: Option<&mut ExplorerHistogram>) {
let Some(histogram) = histogram else {
return;
};
for buckets in self.field_histogram_unset_buckets.iter().flatten() {
for (bucket_index, count) in buckets.iter().enumerate() {
if *count == 0 {
continue;
}
if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
increment_counter_by(&mut bucket.values, UNSET_VALUE, *count);
}
}
}
for value_index in 0..self.value_histogram_buckets.len() {
let Some(buckets) = &self.value_histogram_buckets[value_index] else {
continue;
};
for (bucket_index, count) in buckets.iter().enumerate() {
if *count == 0 {
continue;
}
if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
increment_counter_by(
&mut bucket.values,
&self.value_labels[value_index],
*count,
);
}
}
}
}
}
/// Returns `value` capped at 1.0 when it is finite and strictly positive;
/// `None` for zero, negative, NaN or infinite input.
fn bounded_positive_proportion(value: f64) -> Option<f64> {
    let usable = value.is_finite() && value > 0.0;
    usable.then(|| value.min(1.0))
}
impl FileReader {
/// Runs `query` with `ExplorerStrategy::Traversal`.
///
/// Convenience wrapper over `explore_with_strategy`.
pub fn explore(&mut self, query: &ExplorerQuery) -> Result<ExplorerResult> {
self.explore_with_strategy(query, ExplorerStrategy::Traversal)
}
/// Runs `query` with the given `strategy`, requesting
/// `ExplorerRowPayloadMode::Expand` for the returned rows.
pub fn explore_with_strategy(
&mut self,
query: &ExplorerQuery,
strategy: ExplorerStrategy,
) -> Result<ExplorerResult> {
self.explore_with_strategy_and_payload_mode(query, strategy, ExplorerRowPayloadMode::Expand)
}
/// Runs `query` with the given `strategy` and `ExplorerRowPayloadMode::Expand`,
/// passing `control` on to the strategy dispatcher.
///
/// `validate_no_debug_column_collection` runs first; its error is returned
/// before any exploration starts.
pub fn explore_with_strategy_and_control(
&mut self,
query: &ExplorerQuery,
strategy: ExplorerStrategy,
control: &mut ExplorerControl<'_>,
) -> Result<ExplorerResult> {
validate_no_debug_column_collection(query)?;
self.explore_with_strategy_and_payload_mode_unchecked(
query,
strategy,
ExplorerRowPayloadMode::Expand,
Some(control),
)
}
/// Test-only variant of `explore_with_strategy` that requests
/// `ExplorerRowPayloadMode::CursorOnly` for the returned rows.
#[cfg(test)]
pub(crate) fn explore_with_strategy_cursor_rows(
&mut self,
query: &ExplorerQuery,
strategy: ExplorerStrategy,
) -> Result<ExplorerResult> {
self.explore_with_strategy_and_payload_mode(
query,
strategy,
ExplorerRowPayloadMode::CursorOnly,
)
}
/// Runs `query` with the given `strategy` and
/// `ExplorerRowPayloadMode::CursorOnly`, passing `control` on to the strategy
/// dispatcher.
///
/// `validate_no_debug_column_collection` runs first; its error is returned
/// before any exploration starts.
pub(crate) fn explore_with_strategy_cursor_rows_controlled(
&mut self,
query: &ExplorerQuery,
strategy: ExplorerStrategy,
control: &mut ExplorerControl<'_>,
) -> Result<ExplorerResult> {
validate_no_debug_column_collection(query)?;
self.explore_with_strategy_and_payload_mode_unchecked(
query,
strategy,
ExplorerRowPayloadMode::CursorOnly,
Some(control),
)
}
/// Validated, control-free entry point shared by the public wrappers.
///
/// Calls `validate_no_debug_column_collection` and then dispatches on
/// `strategy` with no `ExplorerControl` attached.
fn explore_with_strategy_and_payload_mode(
&mut self,
query: &ExplorerQuery,
strategy: ExplorerStrategy,
row_payload_mode: ExplorerRowPayloadMode,
) -> Result<ExplorerResult> {
validate_no_debug_column_collection(query)?;
self.explore_with_strategy_and_payload_mode_unchecked(
query,
strategy,
row_payload_mode,
None,
)
}
/// Dispatches `query` to the implementation selected by `strategy`.
///
/// "Unchecked" refers to the debug-column validation, which callers perform
/// before reaching this point. `control` is forwarded to the traversal and
/// indexed implementations; the compare strategy runs without it.
fn explore_with_strategy_and_payload_mode_unchecked(
    &mut self,
    query: &ExplorerQuery,
    strategy: ExplorerStrategy,
    row_payload_mode: ExplorerRowPayloadMode,
    control: Option<&mut ExplorerControl<'_>>,
) -> Result<ExplorerResult> {
    match strategy {
        ExplorerStrategy::Compare => self.explore_compare(query, row_payload_mode),
        ExplorerStrategy::Index => self.explore_indexed(query, row_payload_mode, control),
        ExplorerStrategy::Traversal => {
            self.explore_traversal(query, row_payload_mode, control)
        }
    }
}
/// Answers `query` by walking rows.
///
/// After `validate_query`, the facet pass groups decide whether a single
/// combined pass is possible or whether separate passes are needed. Once
/// scanning succeeds, the query's complete filter set is installed on the
/// reader again before the result is returned.
fn explore_traversal(
    &mut self,
    query: &ExplorerQuery,
    row_payload_mode: ExplorerRowPayloadMode,
    control: Option<&mut ExplorerControl<'_>>,
) -> Result<ExplorerResult> {
    validate_query(query)?;
    let mut result = explorer_result_for_query(query);
    let facet_groups = facet_pass_groups(query);
    let single_pass = can_run_combined_explorer_pass(&facet_groups);
    let scanned = if single_pass {
        self.explore_traversal_combined(
            query,
            row_payload_mode,
            &mut result,
            &facet_groups,
            control,
        )
    } else {
        self.explore_traversal_split(
            query,
            row_payload_mode,
            &mut result,
            facet_groups,
            control,
        )
    };
    scanned?;
    self.configure_explorer_filters(query, None)?;
    Ok(result)
}
/// Serves rows, histogram and facets from one scan over the matching rows.
///
/// Returns immediately when the query needs no main pass and there are no
/// facet indices to count. Otherwise the full filter set is installed, a
/// combined accumulator is built and scanned, and its facet and histogram
/// counters are written into `result`.
fn explore_traversal_combined(
    &mut self,
    query: &ExplorerQuery,
    row_payload_mode: ExplorerRowPayloadMode,
    result: &mut ExplorerResult,
    facet_groups: &[FacetPassGroup],
    control: Option<&mut ExplorerControl<'_>>,
) -> Result<()> {
    let facet_indices = combined_facet_indices(facet_groups);
    if !query_needs_main_pass(query) && facet_indices.is_empty() {
        return Ok(());
    }
    let include_facets = !facet_indices.is_empty();
    self.configure_explorer_filters(query, None)?;
    let mut accumulator =
        ExplorerAccumulator::for_combined(query, &facet_indices, result.histogram.as_ref());
    self.scan_explorer_combined(
        query,
        &mut accumulator,
        result,
        include_facets,
        row_payload_mode,
        control,
    )?;
    accumulator.finish_facets(result);
    accumulator.finish_histogram(result.histogram.as_mut());
    Ok(())
}
/// Answers `query` with separate passes: an optional main pass followed by one
/// facet pass per `FacetPassGroup`.
///
/// The main pass (rows and histogram) runs only when `query_needs_main_pass`
/// says so, with the full filter set installed. Each facet pass reinstalls the
/// filters while skipping the group's `excluded_field`, scans, and publishes
/// that group's facet counters into `result`.
fn explore_traversal_split(
&mut self,
query: &ExplorerQuery,
row_payload_mode: ExplorerRowPayloadMode,
result: &mut ExplorerResult,
facet_groups: Vec<FacetPassGroup>,
mut control: Option<&mut ExplorerControl<'_>>,
) -> Result<()> {
if query_needs_main_pass(query) {
self.configure_explorer_filters(query, None)?;
let mut accumulator = ExplorerAccumulator::for_main(query, result.histogram.as_ref());
self.scan_explorer_main(
query,
&mut accumulator,
result,
row_payload_mode,
control.as_deref_mut(),
)?;
accumulator.finish_histogram(result.histogram.as_mut());
}
for group in facet_groups {
// Once the control reports a stop, the remaining facet groups are not scanned.
if explorer_control_stopped(control.as_deref()) {
break;
}
// Filters on the group's excluded field are left out for this pass.
self.configure_explorer_filters(query, group.excluded_field.as_deref())?;
let mut accumulator = ExplorerAccumulator::for_facets(
query,
&group.facet_indices,
facet_pass_needs_source_realtime(query),
);
self.scan_explorer_facet(
query,
&mut accumulator,
&mut result.stats,
control.as_deref_mut(),
)?;
accumulator.finish_facets(result);
}
Ok(())
}
/// Runs the traversal and indexed strategies back to back and cross-checks
/// their output.
///
/// Both runs are timed and neither receives an `ExplorerControl`. On
/// agreement, the indexed result is returned with `comparison` filled in
/// (durations and stats of both runs).
///
/// # Errors
///
/// Returns `SdkError::VerificationError` when `explorer_outputs_match` reports
/// a difference, and propagates any error from either run.
fn explore_compare(
    &mut self,
    query: &ExplorerQuery,
    row_payload_mode: ExplorerRowPayloadMode,
) -> Result<ExplorerResult> {
    let clock = Instant::now();
    let baseline = self.explore_traversal(query, row_payload_mode, None)?;
    let traversal_duration = clock.elapsed();

    let clock = Instant::now();
    let mut candidate = self.explore_indexed(query, row_payload_mode, None)?;
    let index_duration = clock.elapsed();

    if explorer_outputs_match(&baseline, &candidate) {
        let index_stats = candidate.stats.clone();
        candidate.comparison = Some(ExplorerComparison {
            traversal_duration,
            index_duration,
            traversal_stats: baseline.stats,
            index_stats,
        });
        Ok(candidate)
    } else {
        Err(SdkError::VerificationError(
            "indexed explorer output differs from traversal explorer output".to_string(),
        ))
    }
}
/// Answers `query` with the indexed strategy.
///
/// After `validate_query` and `validate_indexed_query`, rows, facets and the
/// histogram are collected in that order into one result. The query's complete
/// filter set is installed on the reader again before returning.
fn explore_indexed(
&mut self,
query: &ExplorerQuery,
row_payload_mode: ExplorerRowPayloadMode,
mut control: Option<&mut ExplorerControl<'_>>,
) -> Result<ExplorerResult> {
validate_query(query)?;
validate_indexed_query(query)?;
let mut result = explorer_result_for_query(query);
// Only the row pass gets mutable access to the control; the facet and
// histogram steps only read it.
self.indexed_collect_rows(query, row_payload_mode, &mut result, control.as_deref_mut())?;
self.indexed_collect_facets(query, &mut result, control.as_deref())?;
self.indexed_collect_histogram(query, &mut result, control.as_deref())?;
self.configure_explorer_filters(query, None)?;
Ok(result)
}
fn indexed_collect_rows(
&mut self,
query: &ExplorerQuery,
row_payload_mode: ExplorerRowPayloadMode,
result: &mut ExplorerResult,
control: Option<&mut ExplorerControl<'_>>,
) -> Result<()> {
if query.limit == 0 {
return Ok(());
}
let mut row_query = query.clone();
row_query.facets.clear();
row_query.histogram = None;
self.configure_explorer_filters(&row_query, None)?;
let mut accumulator = ExplorerAccumulator::for_main(&row_query, None);
self.scan_explorer_main(
&row_query,
&mut accumulator,
result,
row_payload_mode,
control,
)
}
/// Counts facets for the indexed strategy, one facet pass group at a time.
///
/// Returns without counting when `explorer_control_stopped` reports a stop.
/// For each group, the candidate rows are computed with the group's
/// `excluded_field` left out of the filters, and `indexed_count_facet_group`
/// writes the counts into `result`.
fn indexed_collect_facets(
&mut self,
query: &ExplorerQuery,
result: &mut ExplorerResult,
control: Option<&ExplorerControl<'_>>,
) -> Result<()> {
// The stop state is checked once, before the first group.
if explorer_control_stopped(control) {
return Ok(());
}
for group in facet_pass_groups(query) {
let candidates = self.indexed_candidate_set(query, group.excluded_field.as_deref())?;
self.inner.with_file(|file| {
indexed_count_facet_group(file, query, &group, &candidates, result)
})?;
}
Ok(())
}
/// Fills the histogram for the indexed strategy.
///
/// Nothing happens when the query requests no histogram or when the control
/// reports a stop. Otherwise the candidate rows for the full filter set are
/// computed and handed to `indexed_count_histogram`.
fn indexed_collect_histogram(
    &mut self,
    query: &ExplorerQuery,
    result: &mut ExplorerResult,
    control: Option<&ExplorerControl<'_>>,
) -> Result<()> {
    if query.histogram.is_none() {
        return Ok(());
    }
    if explorer_control_stopped(control) {
        return Ok(());
    }
    let candidates = self.indexed_candidate_set(query, None)?;
    self.inner
        .with_file(|file| indexed_count_histogram(file, query, &candidates, result))
}
/// Builds the set of entry offsets that the indexed counters may count.
///
/// With no filters and no realtime bounds, every entry qualifies and only the
/// header's entry count is recorded. Otherwise the filters (minus
/// `excluded_field`) are installed and the matching rows are walked in the
/// query direction; offsets of rows whose commit realtime is in range are
/// collected until `stop_by_commit_time` ends the walk.
fn indexed_candidate_set(
    &mut self,
    query: &ExplorerQuery,
    excluded_field: Option<&[u8]>,
) -> Result<IndexedCandidateSet> {
    let unrestricted = query.filters.is_empty()
        && query.after_realtime_usec.is_none()
        && query.before_realtime_usec.is_none();
    if unrestricted {
        let count = self
            .inner
            .with_file(|file| file.journal_header_ref().n_entries);
        return Ok(IndexedCandidateSet::All { count });
    }
    self.configure_explorer_filters(query, excluded_field)?;
    self.seek_for_explorer(query);
    let mut offsets = HashSet::new();
    loop {
        if !self.step_for_explorer(query.direction)? {
            break;
        }
        // Rows without metadata are skipped.
        let commit_realtime = match self.row.metadata() {
            Some(metadata) => metadata.realtime,
            None => continue,
        };
        if stop_by_commit_time(query, commit_realtime) {
            break;
        }
        if timestamp_in_range(query, commit_realtime) {
            if let Some(entry_offset) = self.row.entry_offset() {
                offsets.insert(entry_offset);
            }
        }
    }
    let count = offsets.len() as u64;
    Ok(IndexedCandidateSet::Set { count, offsets })
}
/// Replaces the reader's matches with the query's filters.
///
/// Existing matches are flushed first. Every value of every filter is then
/// added as a `FIELD=value` style payload built by `payload_from_parts`,
/// except for filters on `excluded_field`, which are left out. A filter with
/// no values contributes nothing.
fn configure_explorer_filters(
    &mut self,
    query: &ExplorerQuery,
    excluded_field: Option<&[u8]>,
) -> Result<()> {
    self.flush_matches();
    let active_filters = query
        .filters
        .iter()
        .filter(|filter| excluded_field != Some(filter.field.as_slice()));
    for filter in active_filters {
        for value in &filter.values {
            let payload = payload_from_parts(&filter.field, value);
            self.add_match(&payload);
        }
    }
    Ok(())
}
/// Advances the reader by one row and decides what the scan loop should do.
///
/// Returns `Stop` when the reader is exhausted, when the control asks to stop
/// after the updated `rows_seen` count, or when `stop_by_commit_time` fires.
/// Returns `Skip` for rows without metadata and for rows rejected by
/// `skip_by_commit_time`. Otherwise returns `Row` with the commit realtime and
/// sequence number taken from the row metadata.
fn next_explorer_row_frame(
    &mut self,
    query: &ExplorerQuery,
    rows_seen: &mut u64,
    stats: &ExplorerStats,
    control: Option<&mut ExplorerControl<'_>>,
) -> Result<ExplorerLoopStep> {
    let advanced = self.step_for_explorer(query.direction)?;
    if !advanced {
        return Ok(ExplorerLoopStep::Stop);
    }
    *rows_seen = rows_seen.saturating_add(1);
    if let Some(control) = control {
        if control.should_stop_after_rows(*rows_seen, stats) {
            return Ok(ExplorerLoopStep::Stop);
        }
    }
    let (commit_realtime, seqnum) = match self.row.metadata() {
        Some(metadata) => (metadata.realtime, metadata.seqnum),
        None => return Ok(ExplorerLoopStep::Skip),
    };
    if stop_by_commit_time(query, commit_realtime) {
        return Ok(ExplorerLoopStep::Stop);
    }
    if skip_by_commit_time(query, commit_realtime) {
        return Ok(ExplorerLoopStep::Skip);
    }
    Ok(ExplorerLoopStep::Row(ExplorerRowFrame {
        commit_realtime,
        seqnum,
    }))
}
/// Scans the current row's data only when something actually needs it.
///
/// When the accumulator requires no field identities and the query has no
/// full-text search, the row is counted as examined and an empty `RowScan` is
/// returned without touching `row_id` or `deferred_values`. Otherwise `row_id`
/// is advanced, `deferred_values` is cleared, and the row is scanned with
/// value application deferred into `deferred_values`.
fn scan_row_data_or_default(
    &mut self,
    query: &ExplorerQuery,
    accumulator: &mut ExplorerAccumulator,
    row_id: &mut u64,
    deferred_values: &mut Vec<usize>,
    stats: &mut ExplorerStats,
) -> Result<RowScan> {
    let needs_payload_scan =
        accumulator.required_identity_count != 0 || query_has_fts(query);
    if !needs_payload_scan {
        stats.rows_examined = stats.rows_examined.saturating_add(1);
        return Ok(RowScan::default());
    }
    *row_id = row_id.saturating_add(1);
    deferred_values.clear();
    let apply = ScanApply::Deferred(deferred_values);
    self.scan_current_row(query, accumulator, *row_id, apply, stats)
}
/// Computes the row's effective realtime and decides whether the row counts.
///
/// The effective realtime comes from `effective_realtime_from_scan`, optionally
/// passed through the control's `adjust_realtime`. The source/commit delta is
/// recorded in `stats` for every row, accepted or not. Returns the effective
/// realtime when it is within the query's range and the row is not rejected by
/// full-text search; returns `None` otherwise.
fn accepted_effective_realtime(
    query: &ExplorerQuery,
    scan: &RowScan,
    commit_realtime: u64,
    stats: &mut ExplorerStats,
    control: Option<&mut ExplorerControl<'_>>,
) -> Option<u64> {
    let unadjusted = effective_realtime_from_scan(scan.timestamp, commit_realtime);
    record_source_realtime_delta(stats, scan.timestamp, commit_realtime);
    let effective_realtime = match control {
        Some(control) => control.adjust_realtime(unadjusted),
        None => unadjusted,
    };
    if timestamp_in_range(query, effective_realtime) && !row_rejected_by_fts(query, scan) {
        Some(effective_realtime)
    } else {
        None
    }
}
/// Appends the current row to `result.rows` when it belongs in the output.
///
/// A row is kept only if `row_within_anchor` accepts its effective realtime and
/// fewer than `query.limit` rows have been collected so far.
fn push_explorer_row_if_wanted(
    &mut self,
    query: &ExplorerQuery,
    result: &mut ExplorerResult,
    row_payload_mode: ExplorerRowPayloadMode,
    effective_realtime: u64,
) -> Result<()> {
    let wanted =
        row_within_anchor(query, effective_realtime) && result.rows.len() < query.limit;
    if !wanted {
        return Ok(());
    }
    let row =
        self.current_explorer_row(effective_realtime, &mut result.stats, row_payload_mode)?;
    result.rows.push(row);
    Ok(())
}
/// Applies one accepted row to the main-pass result.
///
/// Updates the matched-row stats, notifies the control, applies the row's
/// deferred values and histogram bookkeeping to `accumulator`, and pushes the
/// row into `result.rows` when wanted.
///
/// Returns `Ok(true)` when the scan loop should stop: either the control's
/// `emit_matched_row` returned `true`, or `should_stop_when_rows_full` says so.
fn apply_main_scanned_row(
&mut self,
query: &ExplorerQuery,
accumulator: &mut ExplorerAccumulator,
result: &mut ExplorerResult,
row_payload_mode: ExplorerRowPayloadMode,
scanned: MainScannedRow<'_>,
control: Option<&mut ExplorerControl<'_>>,
) -> Result<bool> {
if query.debug_collect_column_fields_by_row_traversal {
result.column_fields.extend(scanned.scan.column_fields);
}
record_last_realtime(&mut result.stats, scanned.commit_realtime);
result.stats.rows_matched = result.stats.rows_matched.saturating_add(1);
// The control sees the already-incremented matched-row count.
let stop_after_matched_row = control.is_some_and(|control| {
control.emit_matched_row(scanned.effective_realtime, result.stats.rows_matched)
});
// Even when the control asked to stop, this row is still fully applied below.
for value_index in scanned.deferred_values {
accumulator.apply_value(
*value_index,
Some(scanned.effective_realtime),
&mut result.stats,
);
}
accumulator.finish_histogram_row(
scanned.row_id,
scanned.effective_realtime,
&mut result.stats,
);
self.push_explorer_row_if_wanted(
query,
result,
row_payload_mode,
scanned.effective_realtime,
)?;
Ok(stop_after_matched_row
|| should_stop_when_rows_full(
query,
&result.rows,
scanned.effective_realtime,
result.stats.rows_matched,
))
}
/// Prepares sampling for a combined scan of one file.
///
/// Builds the per-scan sampling state from the query and the histogram's
/// bucket count (if a histogram exists). When the control carries shared
/// sampling state and the query has sampling settings, the shared state is
/// told that a new file begins.
fn sampling_state_for_combined(
    query: &ExplorerQuery,
    result: &ExplorerResult,
    control: Option<&mut ExplorerControl<'_>>,
) -> Option<ExplorerSamplingState> {
    let bucket_count = result
        .histogram
        .as_ref()
        .map(|histogram| histogram.buckets.len());
    let sampling = ExplorerSamplingState::for_query(query, bucket_count);
    if let (Some(control), Some(file_sampling)) = (control, query.sampling) {
        if let Some(shared_sampling) = control.sampling.as_deref_mut() {
            shared_sampling.begin_file(file_sampling);
        }
    }
    sampling
}
/// Asks the sampling state what to do with the row described by `frame`.
///
/// Returns `None` when neither the control's shared sampling state nor the
/// per-scan `sampling` state exists.
fn combined_sampling_decision(
query: &ExplorerQuery,
rows: &[ExplorerRow],
frame: ExplorerRowFrame,
sampling: &mut Option<ExplorerSamplingState>,
mut control: Option<&mut ExplorerControl<'_>>,
) -> Option<ExplorerSamplingDecision> {
// The control's `candidate_row` callback, when present, replaces the local
// `row_candidate_to_keep` check.
let candidate_to_keep = if let Some(control) = control.as_deref_mut() {
control.candidate_row.as_deref_mut().map_or_else(
|| row_candidate_to_keep(query, rows, frame.commit_realtime),
|candidate_row| candidate_row(frame.commit_realtime),
)
} else {
row_candidate_to_keep(query, rows, frame.commit_realtime)
};
// Shared sampling state on the control takes precedence over the per-scan state.
if let Some(control) = control {
if let Some(shared_sampling) = control.sampling.as_deref_mut() {
return Some(shared_sampling.decide(
frame.commit_realtime,
frame.seqnum,
candidate_to_keep,
));
}
}
sampling
.as_mut()
.map(|sampling| sampling.decide(frame.commit_realtime, frame.seqnum, candidate_to_keep))
}
/// Records the bookkeeping for a sampling decision and tells the scan loop how
/// to proceed with the current row.
///
/// * `Full` → `Scan`; the sampled counter is bumped when `sampled` is set.
/// * `SkipFields` → `Skip`; the row is recorded as unsampled and added to the
///   histogram under `EXPLORER_UNSAMPLED_VALUE`.
/// * `StopAndEstimate` → `Stop`; the remaining rows are recorded as estimated
///   and spread over the histogram range by `add_estimated_histogram_range`.
fn apply_combined_sampling_decision(
decision: ExplorerSamplingDecision,
mode: CombinedScanMode,
result: &mut ExplorerResult,
frame: ExplorerRowFrame,
) -> SamplingRowAction {
match decision {
ExplorerSamplingDecision::Full { sampled } => {
if sampled {
result.stats.sampling_sampled = result.stats.sampling_sampled.saturating_add(1);
}
SamplingRowAction::Scan
}
ExplorerSamplingDecision::SkipFields => {
record_combined_unsampled_row(
&mut result.stats,
mode,
frame.commit_realtime,
1,
true,
);
add_special_histogram_value(
result.histogram.as_mut(),
frame.commit_realtime,
EXPLORER_UNSAMPLED_VALUE,
1,
&mut result.stats,
);
SamplingRowAction::Skip
}
ExplorerSamplingDecision::StopAndEstimate {
remaining_rows,
from_realtime_usec,
to_realtime_usec,
} => {
record_combined_unsampled_row(
&mut result.stats,
mode,
frame.commit_realtime,
remaining_rows,
false,
);
// Both estimate counters advance by the same number of rows.
result.stats.rows_estimated =
result.stats.rows_estimated.saturating_add(remaining_rows);
result.stats.sampling_estimated = result
.stats
.sampling_estimated
.saturating_add(remaining_rows);
add_estimated_histogram_range(
result.histogram.as_mut(),
from_realtime_usec,
to_realtime_usec,
remaining_rows,
&mut result.stats,
);
SamplingRowAction::Stop
}
}
}
/// Applies one accepted row to the combined-pass result.
///
/// Updates the matched stats through `update_combined_matched_stats`, applies
/// the row's deferred values, finishes histogram and facet bookkeeping for the
/// row as enabled, and pushes the row into `result.rows` when wanted.
///
/// Returns `Ok(true)` when the scan loop should stop: either
/// `update_combined_matched_stats` asked for it, or `should_stop_when_rows_full`
/// says so.
fn apply_combined_scanned_row(
&mut self,
query: &ExplorerQuery,
accumulator: &mut ExplorerAccumulator,
result: &mut ExplorerResult,
row_payload_mode: ExplorerRowPayloadMode,
mode: CombinedScanMode,
scanned: MainScannedRow<'_>,
control: Option<&mut ExplorerControl<'_>>,
) -> Result<bool> {
if query.debug_collect_column_fields_by_row_traversal {
result.column_fields.extend(scanned.scan.column_fields);
}
record_last_realtime(&mut result.stats, scanned.commit_realtime);
let stop_after_matched_row = update_combined_matched_stats(
&mut result.stats,
mode,
scanned.effective_realtime,
control,
);
// Values receive a realtime only when the query has a histogram.
let value_realtime = query
.histogram
.is_some()
.then_some(scanned.effective_realtime);
for value_index in scanned.deferred_values {
accumulator.apply_value(*value_index, value_realtime, &mut result.stats);
}
if query.histogram.is_some() {
accumulator.finish_histogram_row(
scanned.row_id,
scanned.effective_realtime,
&mut result.stats,
);
}
if mode.include_facets {
accumulator.finish_facet_row(scanned.row_id, &mut result.stats);
}
self.push_explorer_row_if_wanted(
query,
result,
row_payload_mode,
scanned.effective_realtime,
)?;
Ok(stop_after_matched_row
|| should_stop_when_rows_full(
query,
&result.rows,
scanned.effective_realtime,
result.stats.rows_matched,
))
}
/// Main-pass scan loop: walks the matching rows and feeds accepted ones to
/// `apply_main_scanned_row`.
///
/// The reader is positioned with `seek_for_explorer` first. The loop ends when
/// the row source says `Stop` or when applying a row asks to stop. On normal
/// completion `stats.rows_returned` is set to the number of collected rows.
fn scan_explorer_main(
&mut self,
query: &ExplorerQuery,
accumulator: &mut ExplorerAccumulator,
result: &mut ExplorerResult,
row_payload_mode: ExplorerRowPayloadMode,
mut control: Option<&mut ExplorerControl<'_>>,
) -> Result<()> {
self.seek_for_explorer(query);
let mut row_id = 0u64;
let mut rows_seen = 0u64;
// Reused across rows; holds the value indices whose application is deferred
// until the row is accepted.
let mut deferred_values = Vec::new();
loop {
let frame = match self.next_explorer_row_frame(
query,
&mut rows_seen,
&result.stats,
control.as_deref_mut(),
)? {
ExplorerLoopStep::Stop => break,
ExplorerLoopStep::Skip => continue,
ExplorerLoopStep::Row(frame) => frame,
};
let scan = self.scan_row_data_or_default(
query,
accumulator,
&mut row_id,
&mut deferred_values,
&mut result.stats,
)?;
// Rows outside the time range or rejected by full-text search are dropped here.
let Some(effective_realtime) = Self::accepted_effective_realtime(
query,
&scan,
frame.commit_realtime,
&mut result.stats,
control.as_deref_mut(),
) else {
continue;
};
let scanned = MainScannedRow {
row_id,
commit_realtime: frame.commit_realtime,
effective_realtime,
scan,
deferred_values: &deferred_values,
};
if self.apply_main_scanned_row(
query,
accumulator,
result,
row_payload_mode,
scanned,
control.as_deref_mut(),
)? {
break;
}
}
result.stats.rows_returned = result.rows.len() as u64;
Ok(())
}
/// Combined-pass scan loop: one walk over the matching rows that serves rows,
/// histogram and (when `include_facets` is set) facets.
///
/// Unlike the main-pass loop, each row first goes through the sampling
/// decision, which may skip the row's fields or stop the scan with an
/// estimate. On normal completion `stats.rows_returned` is set to the number
/// of collected rows.
fn scan_explorer_combined(
&mut self,
query: &ExplorerQuery,
accumulator: &mut ExplorerAccumulator,
result: &mut ExplorerResult,
include_facets: bool,
row_payload_mode: ExplorerRowPayloadMode,
mut control: Option<&mut ExplorerControl<'_>>,
) -> Result<()> {
self.seek_for_explorer(query);
let mode = CombinedScanMode {
include_main: query_needs_main_pass(query),
include_facets,
};
let mut row_id = 0u64;
let mut rows_seen = 0u64;
// Reused across rows; holds the value indices whose application is deferred
// until the row is accepted.
let mut deferred_values = Vec::new();
let mut sampling = Self::sampling_state_for_combined(query, result, control.as_deref_mut());
loop {
let frame = match self.next_explorer_row_frame(
query,
&mut rows_seen,
&result.stats,
control.as_deref_mut(),
)? {
ExplorerLoopStep::Stop => break,
ExplorerLoopStep::Skip => continue,
ExplorerLoopStep::Row(frame) => frame,
};
// No decision (`None`) means sampling is inactive and the row is scanned normally.
if let Some(decision) = Self::combined_sampling_decision(
query,
&result.rows,
frame,
&mut sampling,
control.as_deref_mut(),
) {
match Self::apply_combined_sampling_decision(decision, mode, result, frame) {
SamplingRowAction::Scan => {}
SamplingRowAction::Skip => continue,
SamplingRowAction::Stop => break,
}
}
let scan = self.scan_row_data_or_default(
query,
accumulator,
&mut row_id,
&mut deferred_values,
&mut result.stats,
)?;
// Rows outside the time range or rejected by full-text search are dropped here.
let Some(effective_realtime) = Self::accepted_effective_realtime(
query,
&scan,
frame.commit_realtime,
&mut result.stats,
control.as_deref_mut(),
) else {
continue;
};
let scanned = MainScannedRow {
row_id,
commit_realtime: frame.commit_realtime,
effective_realtime,
scan,
deferred_values: &deferred_values,
};
if self.apply_combined_scanned_row(
query,
accumulator,
result,
row_payload_mode,
mode,
scanned,
control.as_deref_mut(),
)? {
break;
}
}
result.stats.rows_returned = result.rows.len() as u64;
Ok(())
}
/// Facet-pass scan loop: walks the matching rows and counts facet values only.
///
/// Every row delivered by `next_explorer_row_frame` is scanned. Accepted rows
/// bump `stats.facet_rows_matched` and finish their facet bookkeeping; rejected
/// rows are skipped.
fn scan_explorer_facet(
&mut self,
query: &ExplorerQuery,
accumulator: &mut ExplorerAccumulator,
stats: &mut ExplorerStats,
mut control: Option<&mut ExplorerControl<'_>>,
) -> Result<()> {
self.seek_for_explorer(query);
// With time bounds or full-text search, a row can still be rejected after
// its data was scanned, so its values are collected first and applied only
// once the row is accepted. Otherwise values are applied while scanning.
let defer_apply = query.after_realtime_usec.is_some()
|| query.before_realtime_usec.is_some()
|| query_has_fts(query);
let mut row_id = 0u64;
let mut rows_seen = 0u64;
let mut deferred_values = Vec::new();
loop {
let frame = match self.next_explorer_row_frame(
query,
&mut rows_seen,
stats,
control.as_deref_mut(),
)? {
ExplorerLoopStep::Stop => break,
ExplorerLoopStep::Skip => continue,
ExplorerLoopStep::Row(frame) => frame,
};
row_id = row_id.saturating_add(1);
deferred_values.clear();
let scan = if defer_apply {
self.scan_current_row(
query,
accumulator,
row_id,
ScanApply::Deferred(&mut deferred_values),
stats,
)?
} else {
self.scan_current_row(query, accumulator, row_id, ScanApply::Immediate, stats)?
};
// No control is passed here, so the effective realtime is not adjusted by it.
if Self::accepted_effective_realtime(query, &scan, frame.commit_realtime, stats, None)
.is_none()
{
continue;
}
record_last_realtime(stats, frame.commit_realtime);
stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
if defer_apply {
for value_index in &deferred_values {
accumulator.apply_value(*value_index, None, stats);
}
}
accumulator.finish_facet_row(row_id, stats);
}
Ok(())
}
/// Classifies the data objects referenced by the current row and feeds the
/// relevant ones to `accumulator`.
///
/// Counts the row as examined, walks the row's data offsets, classifies each
/// one and dispatches it through `handle_row_offset_class` (applying values
/// immediately or deferring them, depending on `apply`). The walk ends early
/// once `RowScanState::should_stop_row_scan` reports that nothing more is
/// needed from this row.
///
/// Returns the per-row scan summary (`RowScan`).
fn scan_current_row(
&mut self,
query: &ExplorerQuery,
accumulator: &mut ExplorerAccumulator,
row_id: u64,
mut apply: ScanApply<'_>,
stats: &mut ExplorerStats,
) -> Result<RowScan> {
stats.rows_examined = stats.rows_examined.saturating_add(1);
let mut out = RowScan::default();
let mut state = RowScanState::new(query, accumulator);
// Borrow the two fields separately so the closure below can use `row`
// while `inner` is mutably borrowed by `with_mut`.
let inner = &mut self.inner;
let row = &mut self.row;
inner.with_mut(|fields| {
fields.reader.release_object_guards();
row.restart_data()?;
// The walk runs in an immediately-invoked closure so that
// `reset_data_state` below still runs when classification fails.
let result = (|| {
for index in 0..row.data_offset_count() {
let Some(data_offset) = row.data_offset_at(index) else {
break;
};
stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
let class = classify_data_for_accumulator(
fields.file,
row,
data_offset,
accumulator,
state.needs_fts,
query,
query
.debug_collect_column_fields_by_row_traversal
.then_some(&mut out.column_fields),
stats,
)?;
handle_row_offset_class(
class,
accumulator,
row_id,
&mut state,
&mut out,
&mut apply,
stats,
);
if state.should_stop_row_scan() {
record_row_scan_early_stop(stats);
break;
}
}
Ok::<_, SdkError>(())
})();
// A reset failure takes precedence over an error from the walk.
row.reset_data_state(fields.file)?;
result
})?;
Ok(out)
}
/// Positions the reader where a scan for `query` should start.
///
/// The query's anchor is honored only when `stop_when_rows_full` is set;
/// otherwise it is treated as `Auto`. For each direction, `Auto` and the
/// direction's natural end (`Head` when going forward, `Tail` when going
/// backward) start from the relevant realtime bound widened by
/// `realtime_slack_usec`, or from that end of the journal when the bound is
/// absent. The opposite end and `Realtime` anchors seek exactly where they say.
fn seek_for_explorer(&mut self, query: &ExplorerQuery) {
    let anchor = if query.stop_when_rows_full {
        query.anchor
    } else {
        ExplorerAnchor::Auto
    };
    match (query.direction, anchor) {
        (Direction::Forward | Direction::Backward, ExplorerAnchor::Realtime(usec)) => {
            self.seek_realtime(usec)
        }
        (Direction::Forward, ExplorerAnchor::Tail) => self.seek_tail(),
        (Direction::Backward, ExplorerAnchor::Head) => self.seek_head(),
        (Direction::Forward, ExplorerAnchor::Auto | ExplorerAnchor::Head) => {
            match query.after_realtime_usec {
                Some(after) => {
                    self.seek_realtime(after.saturating_sub(query.realtime_slack_usec))
                }
                None => self.seek_head(),
            }
        }
        (Direction::Backward, ExplorerAnchor::Auto | ExplorerAnchor::Tail) => {
            match query.before_realtime_usec {
                Some(before) => {
                    self.seek_realtime(before.saturating_add(query.realtime_slack_usec))
                }
                None => self.seek_tail(),
            }
        }
    }
}
/// Moves the reader one row in `direction`: `next` for `Forward`, `previous`
/// for `Backward`. Returns whatever that call returns.
fn step_for_explorer(&mut self, direction: Direction) -> Result<bool> {
match direction {
Direction::Forward => self.next(),
Direction::Backward => self.previous(),
}
}
/// Builds the `ExplorerRow` for the row the reader currently points at.
///
/// The cursor is always captured. Entry payloads are collected, and the
/// expansion counted in `stats`, only for `ExplorerRowPayloadMode::Expand`;
/// otherwise the row carries an empty payload list.
fn current_explorer_row(
    &mut self,
    realtime_usec: u64,
    stats: &mut ExplorerStats,
    row_payload_mode: ExplorerRowPayloadMode,
) -> Result<ExplorerRow> {
    let cursor = self.get_cursor()?;
    let payloads = if row_payload_mode == ExplorerRowPayloadMode::Expand {
        let mut expanded = Vec::new();
        self.collect_entry_payloads(&mut expanded)?;
        stats.returned_row_expansions = stats.returned_row_expansions.saturating_add(1);
        expanded
    } else {
        Vec::new()
    };
    Ok(ExplorerRow {
        realtime_usec,
        cursor,
        payloads,
    })
}
}
/// How a row scan hands matched value indices to the accumulator.
enum ScanApply<'a> {
/// Apply each value to the accumulator as soon as it is seen.
Immediate,
/// Push each value index into the given buffer; the caller applies them later.
Deferred(&'a mut Vec<usize>),
}
/// Per-row values copied from the row metadata when the reader advances.
#[derive(Debug, Clone, Copy)]
struct ExplorerRowFrame {
/// The metadata's `realtime` value for the row.
commit_realtime: u64,
/// The metadata's `seqnum` value for the row.
seqnum: u64,
}
/// What a scan loop should do after trying to advance to the next row.
enum ExplorerLoopStep {
/// End the scan.
Stop,
/// Ignore this row and advance again.
Skip,
/// Process the row described by the frame.
Row(ExplorerRowFrame),
}
/// Which outputs a combined scan is producing.
#[derive(Debug, Clone, Copy)]
struct CombinedScanMode {
/// Set from `query_needs_main_pass` by the combined scan loop.
include_main: bool,
/// Whether facet bookkeeping is finished for each accepted row.
include_facets: bool,
}
/// Everything the row-apply helpers need about one accepted row.
struct MainScannedRow<'a> {
/// Scan-local row counter value at the time the row was scanned.
row_id: u64,
/// Realtime taken from the row metadata.
commit_realtime: u64,
/// Realtime the row was accepted with (see `accepted_effective_realtime`).
effective_realtime: u64,
/// Summary produced by scanning the row's data.
scan: RowScan,
/// Value indices collected during the scan, to be applied now.
deferred_values: &'a [usize],
}
/// Mutable bookkeeping for the scan of a single row.
struct RowScanState {
/// True when the query's field mode is `ExplorerFieldMode::FirstValue`.
use_first_value: bool,
/// True when the query has a full-text search.
needs_fts: bool,
/// Mirrors `query.debug_collect_column_fields_by_row_traversal`.
collect_column_fields: bool,
/// Number of required fields not yet seen in this row; stays 0 outside
/// first-value mode.
fields_missing_from_row: usize,
}
impl RowScanState {
    /// Initial state for scanning one row under `query`.
    ///
    /// In first-value mode the missing-field counter starts at the
    /// accumulator's `required_identity_count`; otherwise it starts at zero.
    fn new(query: &ExplorerQuery, accumulator: &ExplorerAccumulator) -> Self {
        let use_first_value = matches!(query.field_mode, ExplorerFieldMode::FirstValue);
        let fields_missing_from_row = match use_first_value {
            true => accumulator.required_identity_count,
            false => 0,
        };
        Self {
            use_first_value,
            needs_fts: query_has_fts(query),
            collect_column_fields: query.debug_collect_column_fields_by_row_traversal,
            fields_missing_from_row,
        }
    }

    /// Whether the rest of the row's data can be skipped.
    ///
    /// Only possible in first-value mode, without full-text search or column
    /// collection, once no required field is still missing.
    fn should_stop_row_scan(&self) -> bool {
        if !self.use_first_value || self.needs_fts || self.collect_column_fields {
            return false;
        }
        self.fields_missing_from_row == 0
    }
}
/// What the combined scan loop does with a row after the sampling decision.
enum SamplingRowAction {
/// Scan the row normally.
Scan,
/// Move on to the next row without scanning this one.
Skip,
/// End the scan.
Stop,
}
/// The rows that the indexed counters are allowed to count.
enum IndexedCandidateSet {
/// Every entry qualifies; only the total is stored.
All {
/// Entry count read from the journal header.
count: u64,
},
/// An explicit set of qualifying entry offsets.
Set {
/// Number of offsets in `offsets`.
count: u64,
/// Offsets of the qualifying entries.
offsets: HashSet<NonZeroU64>,
},
}
impl IndexedCandidateSet {
    /// Number of candidate rows recorded for this set.
    fn count(&self) -> u64 {
        match *self {
            Self::All { count } => count,
            Self::Set { count, .. } => count,
        }
    }

    /// Whether the entry at `entry_offset` is a candidate. `All` accepts every
    /// offset.
    fn contains(&self, entry_offset: NonZeroU64) -> bool {
        match self {
            Self::Set { offsets, .. } => offsets.contains(&entry_offset),
            Self::All { .. } => true,
        }
    }
}
/// Facets that can be counted in the same pass because they share the same
/// excluded filter field.
struct FacetPassGroup {
/// Filter field left out while counting this group, if any.
excluded_field: Option<Vec<u8>>,
/// Indices into `query.facets` of the facets in this group.
facet_indices: Vec<usize>,
}
/// Groups the query's facets by the filter field that must be excluded while
/// counting them.
///
/// A facet gets an excluded field (its own name) only when
/// `exclude_facet_field_filters` is set and the query also filters on that
/// field; all other facets share the group with no excluded field. Groups
/// appear in order of first occurrence, and each lists its facet indices in
/// query order.
fn facet_pass_groups(query: &ExplorerQuery) -> Vec<FacetPassGroup> {
    let filtered_fields: HashSet<&[u8]> = query
        .filters
        .iter()
        .map(|filter| filter.field.as_slice())
        .collect();
    let mut groups: Vec<FacetPassGroup> = Vec::new();
    for (index, facet) in query.facets.iter().enumerate() {
        let excluded_field = if query.exclude_facet_field_filters
            && filtered_fields.contains(facet.as_slice())
        {
            Some(facet.clone())
        } else {
            None
        };
        let existing = groups
            .iter()
            .position(|group| group.excluded_field == excluded_field);
        match existing {
            Some(position) => groups[position].facet_indices.push(index),
            None => groups.push(FacetPassGroup {
                excluded_field,
                facet_indices: vec![index],
            }),
        }
    }
    groups
}
/// Counts the facets of one pass group from the field's data objects.
///
/// For every facet in the group, each data object of that field is decoded and
/// the entries it points to are counted when they are in `candidates`.
/// Candidates that never appeared under the field are reported under
/// `UNSET_VALUE`. The resulting map is stored in `result.facets` under the
/// facet's field name.
fn indexed_count_facet_group(
file: &JournalFile<Mmap>,
query: &ExplorerQuery,
group: &FacetPassGroup,
candidates: &IndexedCandidateSet,
result: &mut ExplorerResult,
) -> Result<()> {
// Counted once per group, not once per facet.
result.stats.facet_rows_matched = result
.stats
.facet_rows_matched
.saturating_add(candidates.count());
for facet_index in &group.facet_indices {
let Some(field) = query.facets.get(*facet_index) else {
continue;
};
let mut values = HashMap::new();
// Distinct candidate rows that carry this field with any value.
let mut rows_with_field = HashSet::new();
let mut decompressed = Vec::new();
for item in file.field_data_objects_with_offsets(field)? {
let (_, data) = item?;
let Some((value, cursor)) =
indexed_value_and_cursor(&data, field, &mut decompressed, &mut result.stats)?
else {
continue;
};
// The value and cursor were copied out above; the data object is
// released before the entries are visited.
drop(data);
let count = indexed_count_facet_entries(
file,
cursor,
candidates,
&mut rows_with_field,
&mut result.stats,
)?;
if count == 0 {
continue;
}
increment_counter_by(&mut values, &value, count);
result.stats.facet_updates = result.stats.facet_updates.saturating_add(count);
}
// Whatever is left of the candidates did not have the field at all.
let unset = candidates
.count()
.saturating_sub(rows_with_field.len() as u64);
if unset != 0 {
increment_counter_by(&mut values, UNSET_VALUE, unset);
result.stats.facet_updates = result.stats.facet_updates.saturating_add(unset);
}
result.facets.insert(field.clone(), values);
}
Ok(())
}
/// Fills `result.histogram` from the histogram field's data objects.
///
/// Does nothing when the result has no histogram. Each data object of the
/// histogram field is decoded and its candidate entries are counted into their
/// buckets under the object's value; candidates that never appeared under the
/// field are then counted under `UNSET_VALUE`.
fn indexed_count_histogram(
file: &JournalFile<Mmap>,
query: &ExplorerQuery,
candidates: &IndexedCandidateSet,
result: &mut ExplorerResult,
) -> Result<()> {
let Some(histogram) = result.histogram.as_mut() else {
return Ok(());
};
// Cloned so the field name can be used while `histogram` stays mutably borrowed.
let field = histogram.field.clone();
let mut decompressed = Vec::new();
// Candidate rows that carry the histogram field with any value.
let mut rows_with_field = HashSet::new();
for item in file.field_data_objects_with_offsets(&field)? {
let (_, data) = item?;
let Some((value, cursor)) =
indexed_value_and_cursor(&data, &field, &mut decompressed, &mut result.stats)?
else {
continue;
};
// The value and cursor were copied out above; the data object is
// released before the entries are visited.
drop(data);
indexed_count_histogram_entries(
file,
cursor,
candidates,
&value,
histogram,
query,
&mut rows_with_field,
&mut result.stats,
)?;
}
indexed_count_histogram_unset_entries(
file,
candidates,
&rows_with_field,
histogram,
query,
&mut result.stats,
)?;
Ok(())
}
/// Decodes a data object into its value and the cursor over its entries.
///
/// The payload is decompressed into `decompressed` when needed (the buffer is
/// cleared first and reused between calls). Returns `Ok(None)` when the
/// payload cannot be split into field and value, or when its field differs
/// from `field`. Classification, load and decompression counters in `stats`
/// are updated along the way.
fn indexed_value_and_cursor(
    data: &DataObject<&[u8]>,
    field: &[u8],
    decompressed: &mut Vec<u8>,
    stats: &mut ExplorerStats,
) -> Result<Option<(Vec<u8>, Option<InlinedCursor>)>> {
    stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
    stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
    let payload = if data.is_compressed() {
        decompressed.clear();
        let len = data.decompress(decompressed)?;
        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
        &decompressed[..len]
    } else {
        data.raw_payload()
    };
    match split_payload_bytes(payload) {
        Some((payload_field, value)) if payload_field == field => {
            Ok(Some((value.to_vec(), data.inlined_cursor())))
        }
        _ => Ok(None),
    }
}
/// Counts how many entries reachable from `cursor` are candidates.
///
/// Every visited entry bumps `stats.data_refs_seen`. Each candidate entry is
/// also recorded in `rows_with_field`.
fn indexed_count_facet_entries(
    file: &JournalFile<Mmap>,
    cursor: Option<InlinedCursor>,
    candidates: &IndexedCandidateSet,
    rows_with_field: &mut HashSet<NonZeroU64>,
    stats: &mut ExplorerStats,
) -> Result<u64> {
    let mut matched = 0u64;
    indexed_visit_entries(file, cursor, |entry_offset| {
        stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
        if !candidates.contains(entry_offset) {
            return Ok(());
        }
        matched = matched.saturating_add(1);
        rows_with_field.insert(entry_offset);
        Ok(())
    })?;
    Ok(matched)
}
/// Counts the candidate entries reachable from `cursor` into the histogram
/// buckets under `value`.
///
/// Every visited entry bumps `stats.data_refs_seen`. Candidate entries are
/// recorded in `rows_with_field` even when their realtime falls outside the
/// query range or outside every bucket.
fn indexed_count_histogram_entries(
file: &JournalFile<Mmap>,
cursor: Option<InlinedCursor>,
candidates: &IndexedCandidateSet,
value: &[u8],
histogram: &mut ExplorerHistogram,
query: &ExplorerQuery,
rows_with_field: &mut HashSet<NonZeroU64>,
stats: &mut ExplorerStats,
) -> Result<()> {
// Bucket geometry is derived from the first bucket only.
let histogram_start = histogram
.buckets
.first()
.map(|bucket| bucket.start_realtime_usec)
.unwrap_or_default();
// The width is never allowed to be zero.
let histogram_bucket_width = histogram
.buckets
.first()
.map(|bucket| {
bucket
.end_realtime_usec
.saturating_sub(bucket.start_realtime_usec)
.max(1)
})
.unwrap_or(1);
let histogram_bucket_count = histogram.buckets.len();
indexed_visit_entries(file, cursor, |entry_offset| {
stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
if !candidates.contains(entry_offset) {
return Ok(());
}
let entry = file.entry_ref(entry_offset)?;
let realtime = entry.header.realtime;
drop(entry);
// Recorded before the range check so the unset pass does not count this row.
rows_with_field.insert(entry_offset);
if !timestamp_in_range(query, realtime) {
return Ok(());
}
let Some(bucket_index) = histogram_bucket_index_from_bounds(
realtime,
histogram_start,
histogram_bucket_width,
histogram_bucket_count,
) else {
return Ok(());
};
if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
increment_counter_by(&mut bucket.values, value, 1);
stats.histogram_updates = stats.histogram_updates.saturating_add(1);
}
Ok(())
})
}
/// Counts candidate entries that lack the histogram field into the buckets
/// under `UNSET_VALUE`.
///
/// Entries already present in `rows_with_field` are skipped. For an `All`
/// candidate set every entry offset of the file is visited; for a `Set` only
/// the stored offsets are.
fn indexed_count_histogram_unset_entries(
file: &JournalFile<Mmap>,
candidates: &IndexedCandidateSet,
rows_with_field: &HashSet<NonZeroU64>,
histogram: &mut ExplorerHistogram,
query: &ExplorerQuery,
stats: &mut ExplorerStats,
) -> Result<()> {
// Bucket geometry is derived from the first bucket only.
let histogram_start = histogram
.buckets
.first()
.map(|bucket| bucket.start_realtime_usec)
.unwrap_or_default();
// The width is never allowed to be zero.
let histogram_bucket_width = histogram
.buckets
.first()
.map(|bucket| {
bucket
.end_realtime_usec
.saturating_sub(bucket.start_realtime_usec)
.max(1)
})
.unwrap_or(1);
let histogram_bucket_count = histogram.buckets.len();
// Shared per-entry step for both candidate-set shapes below.
let mut visit = |entry_offset: NonZeroU64| -> Result<()> {
if rows_with_field.contains(&entry_offset) {
return Ok(());
}
let entry = file.entry_ref(entry_offset)?;
let realtime = entry.header.realtime;
drop(entry);
if !timestamp_in_range(query, realtime) {
return Ok(());
}
let Some(bucket_index) = histogram_bucket_index_from_bounds(
realtime,
histogram_start,
histogram_bucket_width,
histogram_bucket_count,
) else {
return Ok(());
};
if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
increment_counter_by(&mut bucket.values, UNSET_VALUE, 1);
stats.histogram_updates = stats.histogram_updates.saturating_add(1);
}
Ok(())
};
match candidates {
IndexedCandidateSet::All { .. } => {
let mut entry_offsets = Vec::new();
file.entry_offsets(&mut entry_offsets)?;
for entry_offset in entry_offsets {
visit(entry_offset)?;
}
}
IndexedCandidateSet::Set { offsets, .. } => {
for entry_offset in offsets {
visit(*entry_offset)?;
}
}
}
Ok(())
}
/// Calls `visitor` for each entry offset produced by `cursor`.
///
/// A missing cursor visits nothing. After each visited entry the search needle
/// is moved to the offset just past it before asking the cursor for the next
/// one; the walk ends when the cursor yields nothing more or when that next
/// offset would overflow. Errors from the cursor or the visitor stop the walk
/// and are returned.
fn indexed_visit_entries<F>(
    file: &JournalFile<Mmap>,
    cursor: Option<InlinedCursor>,
    mut visitor: F,
) -> Result<()>
where
    F: FnMut(NonZeroU64) -> Result<()>,
{
    let mut walker = match cursor {
        Some(inlined) => inlined.head(),
        None => return Ok(()),
    };
    let mut needle = NonZeroU64::MIN;
    loop {
        let Some(entry_offset) = walker.next_until(file, needle)? else {
            break;
        };
        visitor(entry_offset)?;
        match entry_offset.checked_add(1) {
            Some(next) => needle = next,
            None => break,
        }
    }
    Ok(())
}
/// Applies the classification of one data reference to the current row scan.
///
/// * `Value` is forwarded to `handle_row_value_class`.
/// * `FtsMatch` / `FtsNegativeMatch` set the corresponding flag on `out`.
/// * `Irrelevant` only bumps the skipped-data-reference counter.
fn handle_row_offset_class(
    class: OffsetClass,
    accumulator: &mut ExplorerAccumulator,
    row_id: u64,
    state: &mut RowScanState,
    out: &mut RowScan,
    apply: &mut ScanApply<'_>,
    stats: &mut ExplorerStats,
) {
    match class {
        OffsetClass::Value(value_index) => {
            handle_row_value_class(value_index, accumulator, row_id, state, out, apply, stats);
        }
        OffsetClass::FtsMatch => out.fts_matches = true,
        OffsetClass::FtsNegativeMatch => out.fts_negative_match = true,
        OffsetClass::Irrelevant => {
            stats.data_refs_skipped = stats.data_refs_skipped.saturating_add(1);
        }
    }
}
/// Handles a data reference that was classified as a tracked field value.
///
/// Propagates the value's FTS flag to `out`, decides whether this is the first
/// value seen for its field in the row, and then either applies the value to
/// the accumulator immediately or queues its index, depending on `apply`.
///
/// In first-value mode only the first value per field is applied, and each
/// first value decrements `state.fields_missing_from_row`.
fn handle_row_value_class(
    value_index: usize,
    accumulator: &mut ExplorerAccumulator,
    row_id: u64,
    state: &mut RowScanState,
    out: &mut RowScan,
    apply: &mut ScanApply<'_>,
    stats: &mut ExplorerStats,
) {
    out.fts_matches |= accumulator.value_fts_matches[value_index];

    let field_index = accumulator.value_field_indices[value_index];
    let first_value_only = state.use_first_value;
    // The per-row "seen" marker is only maintained in first-value mode or for
    // fields flagged as public facet / histogram.
    let tracks_first_seen = first_value_only
        || (accumulator.flags[field_index] & (FACET_PUBLIC | FACET_HISTOGRAM)) != 0;
    let first_for_field =
        !tracks_first_seen || accumulator.mark_field_seen(field_index, row_id);

    if first_value_only {
        if !first_for_field {
            // A later value of a field that was already handled in this row.
            return;
        }
        state.fields_missing_from_row = state.fields_missing_from_row.saturating_sub(1);
    }

    if let Some(source_timestamp) = accumulator.value_source_realtime[value_index] {
        out.timestamp = Some(source_timestamp);
    }
    match apply {
        ScanApply::Deferred(pending) => pending.push(value_index),
        ScanApply::Immediate => accumulator.apply_value(value_index, None, stats),
    }
}
/// Records an early-terminated row scan: both the early-stop counter and the
/// early-stop-opportunity counter grow by one (saturating).
fn record_row_scan_early_stop(stats: &mut ExplorerStats) {
    stats.early_stops = stats.early_stops.saturating_add(1);
    stats.early_stop_opportunities = stats.early_stop_opportunities.saturating_add(1);
}
/// Looks up `data_offset` in the accumulator's offset cache.
///
/// Returns `Ok(None)` on a cache miss. On a hit the cache-hit counter is
/// bumped and the cached class is returned; when `column_fields` is supplied
/// the payload is additionally read so that its field name can be appended.
///
/// # Errors
/// Propagates payload read failures (only possible when `column_fields` is set).
fn cached_offset_class_for_accumulator(
    file: &JournalFile<Mmap>,
    row: &mut CurrentRowView,
    data_offset: NonZeroU64,
    accumulator: &ExplorerAccumulator,
    column_fields: Option<&mut Vec<Vec<u8>>>,
    stats: &mut ExplorerStats,
) -> Result<Option<OffsetClass>> {
    match accumulator.offset_cache.lookup(data_offset) {
        None => Ok(None),
        Some(class) => {
            if let Some(names) = column_fields {
                // Payloads without a `=` separator contribute no field name.
                let field_name =
                    read_payload_field(file, row, data_offset, stats)?.map(|(field, _)| field);
                names.extend(field_name);
            }
            stats.data_cache_hits = stats.data_cache_hits.saturating_add(1);
            Ok(Some(class))
        }
    }
}
/// Loads the payload at `data_offset` into `row` for classification and
/// returns it as a byte slice borrowed from `row`.
///
/// The cache-miss and payload-load counters are bumped before any read is
/// attempted; the decompression counter is bumped only after a compressed
/// payload was read successfully.
///
/// # Errors
/// Propagates failures from resolving the data object or reading its payload.
fn payload_for_classification<'a>(
    file: &JournalFile<Mmap>,
    row: &'a mut CurrentRowView,
    data_offset: NonZeroU64,
    stats: &mut ExplorerStats,
) -> Result<&'a [u8]> {
    stats.data_cache_misses = stats.data_cache_misses.saturating_add(1);
    stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);

    let compressed = file.data_ref(data_offset)?.is_compressed();
    let handle = row.read_payload_at(file, data_offset)?;
    if compressed {
        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
    }
    Ok(row.payload_slice(handle))
}
/// Runs the query's full-text search against `value` and returns
/// `(positive_match, negative_match)`.
///
/// When `needs_fts` is false nothing is scanned and both flags are false;
/// otherwise the FTS scan counter is bumped once.
fn fts_flags_for_value(
    value: &[u8],
    needs_fts: bool,
    query: &ExplorerQuery,
    stats: &mut ExplorerStats,
) -> (bool, bool) {
    if needs_fts {
        stats.fts_scans = stats.fts_scans.saturating_add(1);
        let outcome = match_fts_query(value, query);
        (
            outcome == FtsTermMatch::Positive,
            outcome == FtsTermMatch::Negative,
        )
    } else {
        (false, false)
    }
}
/// Classifies a `FIELD=value` payload.
///
/// Precedence: a negative FTS match wins; otherwise a field known to the
/// accumulator registers the value and yields `Value`; otherwise a positive FTS
/// match yields `FtsMatch`; anything else is `Irrelevant`.
fn structured_payload_class(
    field: &[u8],
    value: &[u8],
    data_offset: NonZeroU64,
    accumulator: &mut ExplorerAccumulator,
    fts_matches: bool,
    fts_negative_match: bool,
) -> OffsetClass {
    if fts_negative_match {
        return OffsetClass::FtsNegativeMatch;
    }
    let tracked_field = accumulator.field_lookup.get(field).copied();
    match tracked_field {
        Some(field_index) => {
            OffsetClass::Value(accumulator.add_value(field_index, data_offset, value, fts_matches))
        }
        None if fts_matches => OffsetClass::FtsMatch,
        None => OffsetClass::Irrelevant,
    }
}
/// Classifies the data object at `data_offset` for the given accumulator.
///
/// A cached classification is returned directly. Otherwise the payload is
/// loaded, classified (structured `FIELD=value` payloads and payloads without a
/// `=` take different paths), stored in the offset cache and counted as a
/// classified data object. When `column_fields` is supplied, the field name of
/// a structured payload is appended to it.
///
/// # Errors
/// Propagates payload read failures.
fn classify_data_for_accumulator(
    file: &JournalFile<Mmap>,
    row: &mut CurrentRowView,
    data_offset: NonZeroU64,
    accumulator: &mut ExplorerAccumulator,
    needs_fts: bool,
    query: &ExplorerQuery,
    mut column_fields: Option<&mut Vec<Vec<u8>>>,
    stats: &mut ExplorerStats,
) -> Result<OffsetClass> {
    // Reborrow the optional column list so it stays usable after the cache probe.
    let cached = cached_offset_class_for_accumulator(
        file,
        row,
        data_offset,
        accumulator,
        column_fields.as_deref_mut(),
        stats,
    )?;
    if let Some(class) = cached {
        return Ok(class);
    }

    let payload = payload_for_classification(file, row, data_offset, stats)?;
    let class = match split_payload_bytes(payload) {
        None => classify_unstructured_payload(payload, needs_fts, query, stats),
        Some((field, value)) => {
            if let Some(names) = column_fields {
                names.push(field.to_vec());
            }
            let (positive, negative) = fts_flags_for_value(value, needs_fts, query, stats);
            structured_payload_class(field, value, data_offset, accumulator, positive, negative)
        }
    };

    accumulator.offset_cache.insert(data_offset, class);
    stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
    Ok(class)
}
/// Reads the payload at `data_offset` and returns owned `(field, value)` bytes,
/// or `None` when the payload has no `=` separator.
///
/// The decompression counter is bumped after a compressed payload was read.
///
/// # Errors
/// Propagates failures from resolving the data object or reading its payload.
fn read_payload_field(
    file: &JournalFile<Mmap>,
    row: &mut CurrentRowView,
    data_offset: NonZeroU64,
    stats: &mut ExplorerStats,
) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
    let compressed = file.data_ref(data_offset)?.is_compressed();
    let handle = row.read_payload_at(file, data_offset)?;
    if compressed {
        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
    }
    let bytes = row.payload_slice(handle);
    let owned_parts = match split_payload_bytes(bytes) {
        Some((field, value)) => Some((field.to_vec(), value.to_vec())),
        None => None,
    };
    Ok(owned_parts)
}
/// Classifies a payload that has no `=` separator.
///
/// Such a payload can only matter through full-text search: without FTS it is
/// `Irrelevant`; with FTS the whole payload is scanned (bumping the FTS scan
/// counter) and the match result is mapped to the corresponding class.
fn classify_unstructured_payload(
    payload: &[u8],
    needs_fts: bool,
    query: &ExplorerQuery,
    stats: &mut ExplorerStats,
) -> OffsetClass {
    if needs_fts {
        stats.fts_scans = stats.fts_scans.saturating_add(1);
        match match_fts_query(payload, query) {
            FtsTermMatch::None => OffsetClass::Irrelevant,
            FtsTermMatch::Negative => OffsetClass::FtsNegativeMatch,
            FtsTermMatch::Positive => OffsetClass::FtsMatch,
        }
    } else {
        OffsetClass::Irrelevant
    }
}
/// Maps a timestamp to a bucket index for a histogram made of `bucket_count`
/// equal-width buckets starting at `start_realtime_usec`.
///
/// Timestamps before the start land in bucket 0 and timestamps past the end are
/// clamped to the last bucket. A zero `bucket_width_usec` is treated as 1.
/// Returns `None` only when `bucket_count` is 0.
fn histogram_bucket_index_from_bounds(
    realtime_usec: u64,
    start_realtime_usec: u64,
    bucket_width_usec: u64,
    bucket_count: usize,
) -> Option<usize> {
    let last_index = bucket_count.checked_sub(1)?;
    let raw_index = realtime_usec.saturating_sub(start_realtime_usec) / bucket_width_usec.max(1);
    // Convert without truncation: where `usize` is narrower than `u64`, an `as`
    // cast would wrap a large index into an arbitrary bucket instead of
    // clamping it to the last one.
    let index = usize::try_from(raw_index).unwrap_or(usize::MAX);
    Some(index.min(last_index))
}
/// Validates the parts of an explorer query that every strategy relies on.
///
/// Checks, in order: `after_realtime_usec <= before_realtime_usec` when both
/// are set; filter field names; facet and histogram field names; and that no
/// facet field is listed twice. A field name is valid when it is non-empty and
/// contains no `=`.
///
/// # Errors
/// Returns `SdkError::InvalidPath` describing the first violated rule.
fn validate_query(query: &ExplorerQuery) -> Result<()> {
    /// A usable field name is non-empty and free of the `=` separator.
    fn is_valid_field_name(name: &[u8]) -> bool {
        !name.is_empty() && !name.contains(&b'=')
    }

    if let (Some(after), Some(before)) = (query.after_realtime_usec, query.before_realtime_usec) {
        if after > before {
            return Err(SdkError::InvalidPath(
                "after_realtime_usec must be <= before_realtime_usec".to_string(),
            ));
        }
    }

    let filters_ok = query
        .filters
        .iter()
        .all(|filter| is_valid_field_name(&filter.field));
    if !filters_ok {
        return Err(SdkError::InvalidPath(
            "filter field must be non-empty and must not contain '='".to_string(),
        ));
    }

    let outputs_ok = query
        .facets
        .iter()
        .chain(query.histogram.iter())
        .all(|field| is_valid_field_name(field));
    if !outputs_ok {
        return Err(SdkError::InvalidPath(
            "facet and histogram fields must be non-empty and must not contain '='".to_string(),
        ));
    }

    // `insert` reports `false` for a value that is already present.
    let mut distinct_facets: HashSet<&[u8]> = HashSet::new();
    let facets_unique = query
        .facets
        .iter()
        .all(|facet| distinct_facets.insert(facet.as_slice()));
    if !facets_unique {
        return Err(SdkError::InvalidPath(
            "facet fields must not be duplicated".to_string(),
        ));
    }
    Ok(())
}
/// Rejects queries that enable `debug_collect_column_fields_by_row_traversal`.
///
/// # Errors
/// Returns `SdkError::Unsupported` when the flag is set.
fn validate_no_debug_column_collection(query: &ExplorerQuery) -> Result<()> {
    if !query.debug_collect_column_fields_by_row_traversal {
        return Ok(());
    }
    Err(SdkError::Unsupported(
        "debug_collect_column_fields_by_row_traversal is a debug-only discrepancy tool; production explorer queries must use FIELD-index column catalogs instead",
    ))
}
/// Checks that `query` can be served by the indexed explorer strategy.
///
/// The indexed strategy requires `ExplorerFieldMode::AllValues`, does not
/// support full-text search, and cannot combine source-realtime mode with time
/// bounds or a histogram.
///
/// # Errors
/// Returns `SdkError::Unsupported` naming the first unmet requirement.
fn validate_indexed_query(query: &ExplorerQuery) -> Result<()> {
    let time_bounded_or_histogram = query.after_realtime_usec.is_some()
        || query.before_realtime_usec.is_some()
        || query.histogram.is_some();

    let rejection = if !matches!(query.field_mode, ExplorerFieldMode::AllValues) {
        Some("indexed explorer strategy requires ExplorerFieldMode::AllValues")
    } else if query_has_fts(query) {
        Some("indexed explorer strategy does not support FTS")
    } else if query.use_source_realtime && time_bounded_or_histogram {
        Some("indexed explorer strategy requires commit realtime for time-bounded facets and histograms")
    } else {
        None
    };

    match rejection {
        Some(reason) => Err(SdkError::Unsupported(reason)),
        None => Ok(()),
    }
}
/// Reports whether two explorer results carry the same user-visible output.
///
/// Rows are compared pairwise on realtime, cursor and payloads; facets are
/// compared as a whole; histograms are compared via
/// `explorer_histograms_match`. Other result fields are not compared.
fn explorer_outputs_match(left: &ExplorerResult, right: &ExplorerResult) -> bool {
    let rows_match = left.rows.len() == right.rows.len()
        && left.rows.iter().zip(&right.rows).all(|(lhs, rhs)| {
            lhs.realtime_usec == rhs.realtime_usec
                && lhs.cursor == rhs.cursor
                && lhs.payloads == rhs.payloads
        });

    rows_match
        && left.facets == right.facets
        && explorer_histograms_match(left.histogram.as_ref(), right.histogram.as_ref())
}
/// Reports whether two optional histograms are equivalent.
///
/// Two absent histograms match; one absent and one present do not. Present
/// histograms match when field, bucket count, and every bucket's bounds and
/// value counts are equal.
fn explorer_histograms_match(
    left: Option<&ExplorerHistogram>,
    right: Option<&ExplorerHistogram>,
) -> bool {
    let (lhs, rhs) = match (left, right) {
        (Some(lhs), Some(rhs)) => (lhs, rhs),
        (None, None) => return true,
        _ => return false,
    };
    if lhs.field != rhs.field || lhs.buckets.len() != rhs.buckets.len() {
        return false;
    }
    lhs.buckets.iter().zip(&rhs.buckets).all(|(a, b)| {
        a.start_realtime_usec == b.start_realtime_usec
            && a.end_realtime_usec == b.end_realtime_usec
            && a.values == b.values
    })
}
/// Returns true when source-realtime mode is on and the query has at least one
/// of: a time bound, a histogram, or a non-zero row limit.
fn query_needs_source_realtime_main(query: &ExplorerQuery) -> bool {
    if !query.use_source_realtime {
        return false;
    }
    let time_bounded =
        query.after_realtime_usec.is_some() || query.before_realtime_usec.is_some();
    time_bounded || query.histogram.is_some() || query.limit != 0
}
/// Returns true when source-realtime mode is on and the query has a lower or
/// upper time bound.
fn facet_pass_needs_source_realtime(query: &ExplorerQuery) -> bool {
    if !query.use_source_realtime {
        return false;
    }
    query.after_realtime_usec.is_some() || query.before_realtime_usec.is_some()
}
/// Returns true when the query asks for rows (non-zero limit) or a histogram.
fn query_needs_main_pass(query: &ExplorerQuery) -> bool {
    let wants_rows = query.limit != 0;
    let wants_histogram = query.histogram.is_some();
    wants_rows || wants_histogram
}
/// Builds the initial result for `query`: all defaults, plus an empty histogram
/// when the query names a histogram field.
fn explorer_result_for_query(query: &ExplorerQuery) -> ExplorerResult {
    let histogram = query
        .histogram
        .as_ref()
        .map(|field| new_histogram(field, query));
    ExplorerResult {
        histogram,
        ..Default::default()
    }
}
fn explorer_control_stopped(control: Option<&ExplorerControl<'_>>) -> bool {
control.and_then(ExplorerControl::stop_reason).is_some()
}
/// Returns true when no facet group carries an excluded field (vacuously true
/// for an empty slice).
fn can_run_combined_explorer_pass(facet_groups: &[FacetPassGroup]) -> bool {
    !facet_groups
        .iter()
        .any(|group| group.excluded_field.is_some())
}
/// Concatenates the facet indices of all groups, preserving group order and
/// the order within each group.
fn combined_facet_indices(facet_groups: &[FacetPassGroup]) -> Vec<usize> {
    let mut indices = Vec::new();
    for group in facet_groups {
        indices.extend(group.facet_indices.iter().copied());
    }
    indices
}
/// Accounts for `row_count` rows that were skipped by sampling during a
/// combined scan.
///
/// Updates the last-seen realtime, adds `row_count` to the matched counters of
/// whichever passes `mode` includes, optionally adds it to `rows_unsampled`,
/// and bumps `sampling_unsampled` by one.
fn record_combined_unsampled_row(
    stats: &mut ExplorerStats,
    mode: CombinedScanMode,
    commit_realtime: u64,
    row_count: u64,
    count_rows_unsampled: bool,
) {
    record_last_realtime(stats, commit_realtime);
    // One skip event, regardless of how many rows it stands for.
    stats.sampling_unsampled = stats.sampling_unsampled.saturating_add(1);
    if count_rows_unsampled {
        stats.rows_unsampled = stats.rows_unsampled.saturating_add(row_count);
    }
    if mode.include_facets {
        stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(row_count);
    }
    if mode.include_main {
        stats.rows_matched = stats.rows_matched.saturating_add(row_count);
    }
}
/// Counts one matched row for the passes included in `mode`.
///
/// For the main pass the control object (if any) is notified with the row's
/// effective realtime and the updated match count; its return value becomes
/// this function's result. Without a main pass or a control the result is
/// `false`.
fn update_combined_matched_stats(
    stats: &mut ExplorerStats,
    mode: CombinedScanMode,
    effective_realtime: u64,
    control: Option<&mut ExplorerControl<'_>>,
) -> bool {
    let stop_requested = if mode.include_main {
        stats.rows_matched = stats.rows_matched.saturating_add(1);
        match control {
            Some(control) => control.emit_matched_row(effective_realtime, stats.rows_matched),
            None => false,
        }
    } else {
        false
    };
    if mode.include_facets {
        stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
    }
    stop_requested
}
/// Decides whether a scan may stop because the row buffer is full and the scan
/// has moved past the kept rows by more than the realtime slack.
///
/// The check only runs when `stop_when_rows_full` is enabled, the limit is
/// non-zero and reached, and `rows_matched` is a positive multiple of the
/// configured check interval (at least 1). Going backward the scan must be
/// older than the oldest kept row minus the slack; going forward it must be
/// newer than the newest kept row plus the slack.
fn should_stop_when_rows_full(
    query: &ExplorerQuery,
    rows: &[ExplorerRow],
    effective_realtime: u64,
    rows_matched: u64,
) -> bool {
    let buffer_full =
        query.stop_when_rows_full && query.limit != 0 && rows.len() >= query.limit;
    if !buffer_full {
        return false;
    }
    let check_interval = query.stop_when_rows_full_check_every.max(1);
    if rows_matched == 0 || rows_matched % check_interval != 0 {
        return false;
    }

    let slack = query.realtime_slack_usec;
    let kept_realtimes = rows.iter().map(|row| row.realtime_usec);
    match query.direction {
        Direction::Forward => kept_realtimes
            .max()
            .is_some_and(|newest| effective_realtime > newest.saturating_add(slack)),
        Direction::Backward => kept_realtimes
            .min()
            .is_some_and(|oldest| effective_realtime < oldest.saturating_sub(slack)),
    }
}
/// Decides whether a row with the given realtime is worth keeping.
///
/// Nothing is kept with a zero limit or when the row fails `row_within_anchor`.
/// While the buffer is below the limit every remaining row qualifies. Once
/// full, a backward query keeps rows at least as new as the oldest kept row,
/// and a forward query keeps rows at most as new as the newest kept row.
fn row_candidate_to_keep(query: &ExplorerQuery, rows: &[ExplorerRow], realtime_usec: u64) -> bool {
    if query.limit == 0 || !row_within_anchor(query, realtime_usec) {
        return false;
    }
    if rows.len() < query.limit {
        return true;
    }

    let kept_realtimes = rows.iter().map(|row| row.realtime_usec);
    match query.direction {
        Direction::Forward => kept_realtimes
            .max()
            .is_some_and(|newest| realtime_usec <= newest),
        Direction::Backward => kept_realtimes
            .min()
            .is_some_and(|oldest| realtime_usec >= oldest),
    }
}
/// Checks a row's realtime against a realtime anchor.
///
/// With `ExplorerAnchor::Realtime`, forward queries accept rows strictly after
/// the anchor and backward queries accept rows at or before it. Every other
/// anchor accepts all rows.
fn row_within_anchor(query: &ExplorerQuery, realtime_usec: u64) -> bool {
    match query.anchor {
        ExplorerAnchor::Realtime(anchor) => match query.direction {
            Direction::Forward => realtime_usec > anchor,
            Direction::Backward => realtime_usec <= anchor,
        },
        ExplorerAnchor::Auto | ExplorerAnchor::Head | ExplorerAnchor::Tail => true,
    }
}
/// Adds `count` occurrences of `value` to the histogram bucket that covers
/// `realtime_usec`.
///
/// Does nothing when there is no histogram or no bucket can be located. A
/// successful update bumps `histogram_updates` by one, independent of `count`.
fn add_special_histogram_value(
    histogram: Option<&mut ExplorerHistogram>,
    realtime_usec: u64,
    value: &[u8],
    count: u64,
    stats: &mut ExplorerStats,
) {
    let Some(histogram) = histogram else {
        return;
    };
    if let Some(index) = histogram_bucket_index(histogram, realtime_usec) {
        if let Some(bucket) = histogram.buckets.get_mut(index) {
            increment_counter_by(&mut bucket.values, value, count);
            stats.histogram_updates = stats.histogram_updates.saturating_add(1);
        }
    }
}
/// Spreads `entries` estimated rows over the histogram buckets overlapping
/// `[from_realtime_usec, to_realtime_usec)`, proportionally to each bucket's
/// overlap with that range.
///
/// The range is first clipped to the histogram's extent. Shares are computed
/// with truncating integer division, so their sum can be smaller than
/// `entries`. Every overlapping bucket counts as one histogram update, even
/// when its share rounds down to zero and nothing is added to it.
fn add_estimated_histogram_range(
    histogram: Option<&mut ExplorerHistogram>,
    from_realtime_usec: u64,
    to_realtime_usec: u64,
    entries: u64,
    stats: &mut ExplorerStats,
) {
    let Some(histogram) = histogram else {
        return;
    };
    if entries == 0 || from_realtime_usec >= to_realtime_usec {
        return;
    }
    let (Some(first), Some(last)) = (histogram.buckets.first(), histogram.buckets.last()) else {
        return;
    };

    // Clip the requested range to what the histogram actually covers.
    let range_start = from_realtime_usec.max(first.start_realtime_usec);
    let range_end = to_realtime_usec.min(last.end_realtime_usec);
    if range_start >= range_end {
        return;
    }
    let range_len = u128::from(range_end.saturating_sub(range_start).max(1));

    let mut touched = 0u64;
    let candidates = histogram
        .buckets
        .iter_mut()
        .take_while(|bucket| bucket.start_realtime_usec <= range_end);
    for bucket in candidates {
        let overlap_start = bucket.start_realtime_usec.max(range_start);
        let overlap_end = bucket.end_realtime_usec.min(range_end);
        if overlap_start >= overlap_end {
            continue;
        }
        // Widen to u128 so `overlap * entries` cannot overflow.
        let overlap_len = u128::from(overlap_end.saturating_sub(overlap_start));
        let share = (overlap_len * u128::from(entries) / range_len) as u64;
        if share != 0 {
            increment_counter_by(&mut bucket.values, EXPLORER_ESTIMATED_VALUE, share);
        }
        touched = touched.saturating_add(1);
    }
    stats.histogram_updates = stats.histogram_updates.saturating_add(touched);
}
/// Locates the bucket of `histogram` that covers `realtime_usec`.
///
/// The start and width of the first bucket (width at least 1) are taken as the
/// layout of the whole histogram. Returns `None` when there are no buckets.
fn histogram_bucket_index(histogram: &ExplorerHistogram, realtime_usec: u64) -> Option<usize> {
    match histogram.buckets.first() {
        None => None,
        Some(first) => {
            let origin = first.start_realtime_usec;
            let width = first.end_realtime_usec.saturating_sub(origin).max(1);
            histogram_bucket_index_from_bounds(
                realtime_usec,
                origin,
                width,
                histogram.buckets.len(),
            )
        }
    }
}
/// Joins a field name and a value into a `FIELD=value` payload.
fn payload_from_parts(field: &[u8], value: &[u8]) -> Vec<u8> {
    let mut payload = Vec::with_capacity(field.len() + 1 + value.len());
    payload.extend(field.iter().chain(b"=").chain(value));
    payload
}
/// Splits a payload at its first `=` into `(field, value)`.
///
/// Returns `None` when the payload contains no `=`. Later `=` bytes stay part
/// of the value.
fn split_payload_bytes(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let mut halves = payload.splitn(2, |byte| *byte == b'=');
    let field = halves.next()?;
    let value = halves.next()?;
    Some((field, value))
}
/// Parses a decimal `u64` timestamp from raw bytes.
///
/// Returns `None` when the bytes are not valid UTF-8 or do not parse as `u64`.
fn parse_source_realtime(value: &[u8]) -> Option<u64> {
    let text = std::str::from_utf8(value).ok()?;
    text.parse::<u64>().ok()
}
/// Picks the timestamp to use for a row: the source realtime when it is
/// present, non-zero and strictly earlier than the commit realtime; otherwise
/// the commit realtime.
fn effective_realtime_from_scan(source_realtime: Option<u64>, commit_realtime: u64) -> u64 {
    source_realtime
        .filter(|&source| source != 0 && source < commit_realtime)
        .unwrap_or(commit_realtime)
}
/// Raises `stats.last_realtime_usec` to `commit_realtime` when the latter is
/// larger; the stored value never decreases.
fn record_last_realtime(stats: &mut ExplorerStats, commit_realtime: u64) {
    stats.last_realtime_usec = stats.last_realtime_usec.max(commit_realtime);
}
/// Tracks the largest observed gap between commit realtime and source
/// realtime.
///
/// Only source timestamps that are present, non-zero and strictly earlier than
/// the commit realtime are considered.
fn record_source_realtime_delta(
    stats: &mut ExplorerStats,
    source_realtime: Option<u64>,
    commit_realtime: u64,
) {
    match source_realtime {
        Some(source) if source != 0 && source < commit_realtime => {
            let gap = commit_realtime.saturating_sub(source);
            stats.max_source_realtime_delta_usec =
                stats.max_source_realtime_delta_usec.max(gap);
        }
        _ => {}
    }
}
/// Returns true when the query carries any full-text search input: terms,
/// positive patterns or negative patterns.
fn query_has_fts(query: &ExplorerQuery) -> bool {
    let no_fts_input = query.fts_terms.is_empty()
        && query.fts_patterns.is_empty()
        && query.fts_negative_patterns.is_empty();
    !no_fts_input
}
/// Returns true when the query has at least one positive FTS criterion.
///
/// When terms are present only they are consulted (any non-negative term);
/// otherwise the answer is whether any positive pattern exists.
fn query_has_positive_fts(query: &ExplorerQuery) -> bool {
    if query.fts_terms.is_empty() {
        !query.fts_patterns.is_empty()
    } else {
        query.fts_terms.iter().any(|term| !term.negative)
    }
}
/// Decides whether full-text search excludes a scanned row.
///
/// Without FTS input no row is rejected. Otherwise a row is rejected when it
/// hit a negative match, or when the query has positive criteria and the row
/// matched none of them.
fn row_rejected_by_fts(query: &ExplorerQuery, scan: &RowScan) -> bool {
    if !query_has_fts(query) {
        return false;
    }
    if scan.fts_negative_match {
        return true;
    }
    query_has_positive_fts(query) && !scan.fts_matches
}
/// Outcome of testing one value against the query's full-text search criteria.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FtsTermMatch {
/// No term or pattern matched the value.
None,
/// A positive (non-negated) term or pattern matched the value.
Positive,
/// A negative term or pattern matched the value.
Negative,
}
/// Tests `value` against the query's full-text search criteria.
///
/// When the query has terms, the first term that matches (in list order)
/// decides the outcome and the pattern lists are ignored. Without terms,
/// negative patterns are checked before positive patterns.
fn match_fts_query(value: &[u8], query: &ExplorerQuery) -> FtsTermMatch {
    if query.fts_terms.is_empty() {
        return if matches_fts(value, &query.fts_negative_patterns) {
            FtsTermMatch::Negative
        } else if matches_fts(value, &query.fts_patterns) {
            FtsTermMatch::Positive
        } else {
            FtsTermMatch::None
        };
    }

    match query.fts_terms.iter().find(|term| term.matches(value)) {
        Some(term) if term.negative => FtsTermMatch::Negative,
        Some(_) => FtsTermMatch::Positive,
        None => FtsTermMatch::None,
    }
}
/// Returns true when any non-empty pattern occurs in `value`, ignoring ASCII
/// case. Empty patterns are skipped and never match.
fn matches_fts(value: &[u8], patterns: &[Vec<u8>]) -> bool {
    for pattern in patterns {
        if pattern.is_empty() {
            continue;
        }
        if contains_ascii_case_insensitive(value, pattern) {
            return true;
        }
    }
    false
}
/// Returns true when `needle` occurs in `haystack`, comparing bytes without
/// regard to ASCII case. An empty needle is contained in every haystack.
fn contains_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        0 => true,
        // `windows` yields nothing when the haystack is shorter than the needle.
        width => haystack
            .windows(width)
            .any(|candidate| candidate.eq_ignore_ascii_case(needle)),
    }
}
/// Returns the index of the first occurrence of `needle` in `haystack`,
/// comparing bytes without regard to ASCII case. An empty needle is found at
/// index 0.
fn find_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    match needle.len() {
        0 => Some(0),
        // `windows` yields nothing when the haystack is shorter than the needle.
        width => haystack
            .windows(width)
            .position(|candidate| candidate.eq_ignore_ascii_case(needle)),
    }
}
/// Returns true when `timestamp` lies within the query's time bounds.
///
/// Both bounds are inclusive; a missing bound imposes no restriction.
fn timestamp_in_range(query: &ExplorerQuery, timestamp: u64) -> bool {
    let too_early = query
        .after_realtime_usec
        .is_some_and(|after| timestamp < after);
    let too_late = query
        .before_realtime_usec
        .is_some_and(|before| timestamp > before);
    !(too_early || too_late)
}
/// Decides whether a scan can stop at an entry with the given commit realtime.
///
/// Forward scans stop once the commit time exceeds `before` plus the realtime
/// slack; backward scans stop once it drops below `after`. Without the
/// relevant bound the scan never stops here.
// NOTE(review): slack is applied only on the `before` side — presumably because
// a row's effective (source) time can only be earlier than its commit time;
// confirm against the callers.
fn stop_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
    match query.direction {
        Direction::Backward => match query.after_realtime_usec {
            Some(after) => commit_realtime < after,
            None => false,
        },
        Direction::Forward => match query.before_realtime_usec {
            Some(before) => commit_realtime > before.saturating_add(query.realtime_slack_usec),
            None => false,
        },
    }
}
/// Decides whether an entry with the given commit realtime has not yet reached
/// the query window in the scan direction and should be skipped.
///
/// Forward scans skip entries committed before `after`; backward scans skip
/// entries committed later than `before` plus the realtime slack. Without the
/// relevant bound nothing is skipped.
fn skip_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
    match query.direction {
        Direction::Backward => match query.before_realtime_usec {
            Some(before) => commit_realtime > before.saturating_add(query.realtime_slack_usec),
            None => false,
        },
        Direction::Forward => match query.after_realtime_usec {
            Some(after) => commit_realtime < after,
            None => false,
        },
    }
}
/// Builds an empty histogram for `field` covering the query's histogram window.
///
/// The bucket width is chosen by `histogram_bar_width_usec` for the requested
/// bucket target (at least 1); the window start is aligned down to a width
/// multiple and the end is aligned down and then extended by one width. When
/// that yields more than 1001 buckets, the histogram is capped at 1001 buckets
/// with the width recomputed as one thousandth of the span. The last bucket's
/// end is one past the histogram end.
fn new_histogram(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
    const MAX_BUCKETS: usize = 1001;
    const CAPPED_WIDTH_DIVISOR: u64 = 1000;

    let (requested_start, requested_end) = histogram_bounds(query);
    let wanted_buckets = query.histogram_target_buckets.max(1);
    let mut width = histogram_bar_width_usec(requested_start, requested_end, wanted_buckets);
    let start = histogram_slot_baseline_usec(requested_start, width);
    let mut end = histogram_slot_baseline_usec(requested_end, width).saturating_add(width);

    let span = end.saturating_sub(start);
    let mut bucket_count = span.checked_div(width).unwrap_or(0).saturating_add(1) as usize;
    if bucket_count > MAX_BUCKETS {
        bucket_count = MAX_BUCKETS;
        width = (span / CAPPED_WIDTH_DIVISOR).max(1);
        end = start.saturating_add(width.saturating_mul(CAPPED_WIDTH_DIVISOR));
    }

    let buckets = (0..bucket_count)
        .map(|index| {
            let bucket_start = start.saturating_add(width.saturating_mul(index as u64));
            let is_last = index + 1 == bucket_count;
            let bucket_end = if is_last {
                end.saturating_add(1)
            } else {
                bucket_start.saturating_add(width)
            };
            ExplorerHistogramBucket {
                start_realtime_usec: bucket_start,
                end_realtime_usec: bucket_end,
                values: HashMap::new(),
            }
        })
        .collect();

    ExplorerHistogram {
        field: field.to_vec(),
        buckets,
    }
}
/// Crate-visible entry point that builds an empty histogram for `field` and
/// `query` by delegating to `new_histogram`.
pub(crate) fn empty_histogram_for_query(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
new_histogram(field, query)
}
/// Chooses a histogram bucket width in microseconds for the window
/// `after..before`.
///
/// Picks the largest width from a fixed ladder (1 second up to 30 days) that
/// still yields at least `target_buckets` whole buckets; falls back to one
/// second when no ladder entry qualifies.
fn histogram_bar_width_usec(after: u64, before: u64, target_buckets: usize) -> u64 {
    const USEC_PER_SEC: u64 = 1_000_000;
    const VALID_DURATIONS_SECONDS: &[u64] = &[
        1, 2, 5, 10, 15, 30, 60, 120, 180, 300, 600, 900, 1800, 3600, 7200, 21600, 28800, 43200,
        86400, 172800, 259200, 432000, 604800, 1209600, 2592000,
    ];

    let window = before.saturating_sub(after);
    let wanted = target_buckets as u64;
    VALID_DURATIONS_SECONDS
        .iter()
        .rev()
        .map(|seconds| seconds.saturating_mul(USEC_PER_SEC))
        .find(|&candidate| candidate != 0 && window / candidate >= wanted)
        .unwrap_or(USEC_PER_SEC)
}
/// Rounds `value` down to the nearest multiple of `width` (a zero width is
/// treated as 1, leaving `value` unchanged).
fn histogram_slot_baseline_usec(value: u64, width: u64) -> u64 {
    let step = width.max(1);
    // Truncating division followed by multiplication never exceeds `value`.
    (value / step) * step
}
/// Resolves the `(start, end)` window of the histogram for `query`.
///
/// Histogram-specific bounds take precedence over the query's general time
/// bounds. A missing start defaults to 0 and a missing end to
/// `start + 3_600_000_000`. The returned end is always greater than the start
/// (barring saturation at `u64::MAX`): a non-increasing window becomes
/// `(start, start + 1)`.
fn histogram_bounds(query: &ExplorerQuery) -> (u64, u64) {
    const DEFAULT_WINDOW_USEC: u64 = 3_600_000_000;

    let start = query
        .histogram_after_realtime_usec
        .or(query.after_realtime_usec)
        .unwrap_or(0);
    let requested_end = query
        .histogram_before_realtime_usec
        .or(query.before_realtime_usec);
    let end = match requested_end {
        Some(end) => end,
        None => start.saturating_add(DEFAULT_WINDOW_USEC),
    };

    if end > start {
        (start, end)
    } else {
        (start, start.saturating_add(1))
    }
}
/// Adds `delta` (saturating) to the counter stored under `value`, creating the
/// counter with `delta` as its initial value when the key is new.
///
/// The key is only copied into an owned `Vec` when it has to be inserted.
fn increment_counter_by(map: &mut HashMap<Vec<u8>, u64>, value: &[u8], delta: u64) {
    match map.get_mut(value) {
        Some(existing) => *existing = existing.saturating_add(delta),
        None => {
            map.insert(value.to_vec(), delta);
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use journal_core::file::{JournalFileOptions, JournalWriter, MmapMut};
use journal_core::repository::File as RepoFile;
use tempfile::TempDir;
fn test_uuid(seed: u8) -> uuid::Uuid {
uuid::Uuid::from_bytes([seed; 16])
}
fn create_writer(
path: &std::path::Path,
compression: Option<(Compression, usize)>,
) -> (JournalFile<MmapMut>, JournalWriter) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).expect("create journal parent");
}
let repo_file = RepoFile::from_path(path).expect("repo file");
let mut options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
if let Some((compression, threshold)) = compression {
options = options
.with_compression(compression)
.with_compress_threshold(threshold);
}
let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
let writer = if let Some((compression, threshold)) = compression {
JournalWriter::new_with_compression(&mut file, 1, test_uuid(4), compression, threshold)
.expect("writer")
} else {
JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer")
};
(file, writer)
}
fn write_entries(
path: &std::path::Path,
compression: Option<(Compression, usize)>,
entries: &[(&[&[u8]], u64)],
) {
let (mut file, mut writer) = create_writer(path, compression);
for (payloads, realtime) in entries {
writer
.add_entry(&mut file, payloads, *realtime, *realtime)
.expect("write entry");
}
file.sync().expect("sync journal");
}
fn write_many_entries(path: &std::path::Path, count: usize) {
let (mut file, mut writer) = create_writer(path, None);
for index in 0..count {
let message = format!("MESSAGE=row-{index}");
let service = if index % 2 == 0 {
b"SERVICE=even".as_slice()
} else {
b"SERVICE=odd".as_slice()
};
let payloads: [&[u8]; 2] = [message.as_bytes(), service];
let realtime = 1_700_000_000_000_000u64.saturating_add(index as u64);
writer
.add_entry(&mut file, &payloads, realtime, realtime)
.expect("write entry");
}
file.sync().expect("sync journal");
}
#[test]
fn explorer_control_reports_progress_during_large_scan() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("progress.journal");
write_many_entries(&path, 9_000);
let mut reports = Vec::new();
let mut progress = |progress: ExplorerProgress| {
reports.push(progress.stats.rows_examined);
};
let mut control = ExplorerControl::new();
control.set_progress_interval(Duration::ZERO);
control.set_progress_callback(Some(&mut progress));
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
facets: vec![b"SERVICE".to_vec()],
limit: 0,
..ExplorerQuery::default()
};
let result = reader
.explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
.expect("explore");
assert_eq!(control.stop_reason(), None);
assert_eq!(result.stats.rows_examined, 9_000);
assert!(!reports.is_empty());
assert!(reports.iter().any(|rows| *rows >= 8_191));
}
#[test]
fn explorer_control_cancels_inside_large_scan() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("cancel.journal");
write_many_entries(&path, 9_000);
let is_cancelled = || true;
let mut control = ExplorerControl::new();
control.set_cancellation_callback(Some(&is_cancelled));
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
facets: vec![b"SERVICE".to_vec()],
limit: 0,
..ExplorerQuery::default()
};
let result = reader
.explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
.expect("explore");
assert_eq!(control.stop_reason(), Some(ExplorerStopReason::Cancelled));
assert!(result.stats.rows_examined < 9_000);
}
#[test]
fn explorer_filters_with_or_values_and_and_fields() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("filter.journal");
write_entries(
&path,
None,
&[
(&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
(&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
(&[b"SERVICE=b", b"PRIORITY=4"], 3_000),
],
);
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
filters: vec![
ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec(), b"b".to_vec()]),
ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
],
facets: vec![b"SERVICE".to_vec()],
limit: 10,
..ExplorerQuery::default()
};
let result = reader.explore(&query).expect("explore");
assert_eq!(result.rows.len(), 2);
let service = result
.facets
.get(b"SERVICE".as_slice())
.expect("service facet");
assert_eq!(service.get(b"a".as_slice()), Some(&1));
assert_eq!(service.get(b"b".as_slice()), Some(&1));
assert!(result.stats.data_cache_misses > 0);
}
#[test]
fn explorer_rejects_debug_row_traversal_column_collection() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("debug-column-collection.journal");
write_entries(&path, None, &[(&[b"PRIORITY=3", b"MESSAGE=hello"], 1_000)]);
let query = ExplorerQuery {
facets: vec![b"PRIORITY".to_vec()],
debug_collect_column_fields_by_row_traversal: true,
..ExplorerQuery::default()
};
let mut reader = FileReader::open(&path).expect("open reader");
let err = reader
.explore(&query)
.expect_err("debug-only column collection is rejected");
assert!(matches!(err, SdkError::Unsupported(_)));
assert!(
err.to_string()
.contains("debug_collect_column_fields_by_row_traversal")
);
let mut reader = FileReader::open(&path).expect("reopen reader");
let err = reader
.explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
.expect_err("cursor-row explorer also rejects debug-only column collection");
assert!(matches!(err, SdkError::Unsupported(_)));
}
#[test]
fn explorer_skips_irrelevant_compressed_data_for_facets() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("compressed.journal");
let large_message = b"MESSAGE=abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
write_entries(
&path,
Some((Compression::Zstd, 32)),
&[(&[b"PRIORITY=3", large_message], 1_000)],
);
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
facets: vec![b"PRIORITY".to_vec()],
limit: 0,
..ExplorerQuery::default()
};
let result = reader.explore(&query).expect("explore");
let priority = result
.facets
.get(b"PRIORITY".as_slice())
.expect("priority facet");
assert_eq!(priority.get(b"3".as_slice()), Some(&1));
assert_eq!(result.stats.payloads_decompressed, 0);
assert_eq!(result.stats.data_refs_seen, 1);
assert_eq!(result.stats.early_stops, 1);
}
#[test]
fn explorer_reuses_classified_data_objects() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("reuse.journal");
write_entries(
&path,
None,
&[
(&[b"PRIORITY=3"], 1_000),
(&[b"PRIORITY=3"], 2_000),
(&[b"PRIORITY=3"], 3_000),
],
);
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
facets: vec![b"PRIORITY".to_vec()],
limit: 0,
..ExplorerQuery::default()
};
let result = reader.explore(&query).expect("explore");
let priority = result
.facets
.get(b"PRIORITY".as_slice())
.expect("priority facet");
assert_eq!(priority.get(b"3".as_slice()), Some(&3));
assert!(result.stats.data_cache_hits >= 2);
}
#[test]
fn explorer_groups_facets_with_same_filter_set() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("grouped-facets.journal");
write_entries(
&path,
None,
&[
(&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
(&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
],
);
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
limit: 0,
..ExplorerQuery::default()
};
let result = reader.explore(&query).expect("explore");
assert_eq!(result.stats.rows_examined, 2);
assert_eq!(result.stats.facet_rows_matched, 2);
assert_eq!(
result
.facets
.get(b"SERVICE".as_slice())
.and_then(|values| values.get(b"a".as_slice())),
Some(&1)
);
assert_eq!(
result
.facets
.get(b"PRIORITY".as_slice())
.and_then(|values| values.get(b"4".as_slice())),
Some(&1)
);
}
#[test]
fn explorer_combines_rows_histogram_and_facets_in_one_pass() {
let dir = TempDir::new().expect("tempdir");
let path = dir.path().join("combined-pass.journal");
write_entries(
&path,
None,
&[
(&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
(&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
],
);
let mut reader = FileReader::open(&path).expect("open reader");
let query = ExplorerQuery {
facets: vec![b"SERVICE".to_vec()],
histogram: Some(b"PRIORITY".to_vec()),
histogram_target_buckets: 2,
limit: 2,
..ExplorerQuery::default()
};
let result = reader.explore(&query).expect("explore");
assert_eq!(result.rows.len(), 2);
assert_eq!(result.stats.rows_examined, 2);
assert_eq!(result.stats.rows_matched, 2);
assert_eq!(result.stats.facet_rows_matched, 2);
assert_eq!(
result
.facets
.get(b"SERVICE".as_slice())
.and_then(|values| values.get(b"a".as_slice())),
Some(&1)
);
let histogram_total = result
.histogram
.as_ref()
.expect("histogram")
.buckets
.iter()
.flat_map(|bucket| bucket.values.values())
.sum::<u64>();
assert_eq!(histogram_total, 2);
}
#[test]
fn explorer_sampling_uses_actual_histogram_bucket_count() {
let query = ExplorerQuery {
after_realtime_usec: Some(1_733_494_460_000_000),
before_realtime_usec: Some(1_735_656_412_000_000),
histogram: Some(b"PRIORITY".to_vec()),
histogram_target_buckets: 300,
sampling: Some(ExplorerSampling {
budget: 20_000,
matched_files: 200,
file_head_realtime_usec: 1_733_494_460_000_000,
file_tail_realtime_usec: 1_735_656_412_000_000,
file_head_seqnum: 1,
file_tail_seqnum: 2,
file_entries: 2,
}),
..ExplorerQuery::default()
};
let bucket_count = histogram_bucket_count_for_query(&query).expect("bucket count");
let sampling =
ExplorerSamplingState::for_query(&query, Some(bucket_count)).expect("sampling");
assert_eq!(bucket_count, 302);
assert_eq!(sampling.per_slot_sampled.len(), bucket_count);
}
/// With `per_file_sampled` forced to 10 on a file that declares 3 entries,
/// `estimate_remaining_rows_by_seqnum(5)` is expected to return `Some(1)`.
#[test]
fn explorer_sampling_seqnum_estimate_clamps_over_scanned_to_one() {
    let sampling_config = ExplorerSampling {
        budget: 20,
        matched_files: 1,
        file_head_realtime_usec: 1,
        file_tail_realtime_usec: 100,
        file_head_seqnum: 1,
        file_tail_seqnum: 100,
        file_entries: 3,
    };
    let query = ExplorerQuery {
        direction: Direction::Forward,
        after_realtime_usec: Some(1),
        before_realtime_usec: Some(100),
        sampling: Some(sampling_config),
        ..ExplorerQuery::default()
    };

    let mut state = ExplorerSamplingState::for_query(&query, None).expect("sampling");
    // Pretend more rows were sampled than the file claims to contain.
    state.per_file_sampled = 10;

    let estimate = state.estimate_remaining_rows_by_seqnum(5);
    assert_eq!(estimate, Some(1));
}
/// Feeds `add_estimated_histogram_range` the arguments `0, 30, 10` over three
/// equal buckets and expects each bucket to receive 3 under
/// `EXPLORER_ESTIMATED_VALUE`, for a total of 9.
#[test]
fn explorer_estimated_histogram_distribution_matches_netdata_integer_math() {
    // Builds one empty bucket covering the given bounds.
    let empty_bucket = |start, end| ExplorerHistogramBucket {
        start_realtime_usec: start,
        end_realtime_usec: end,
        values: HashMap::new(),
    };
    let mut histogram = ExplorerHistogram {
        field: b"PRIORITY".to_vec(),
        buckets: vec![
            empty_bucket(0, 10),
            empty_bucket(10, 20),
            empty_bucket(20, 30),
        ],
    };
    let mut stats = ExplorerStats::default();

    add_estimated_histogram_range(Some(&mut histogram), 0, 30, 10, &mut stats);

    // Missing entries count as zero so every bucket contributes one number.
    let mut counts = Vec::with_capacity(histogram.buckets.len());
    for bucket in &histogram.buckets {
        let estimated = bucket
            .values
            .get(EXPLORER_ESTIMATED_VALUE)
            .copied()
            .unwrap_or_default();
        counts.push(estimated);
    }

    assert_eq!(counts, vec![3, 3, 3]);
    assert_eq!(counts.iter().sum::<u64>(), 9);
}
/// A `PRIORITY=3` filter over three entries must leave two rows that feed the
/// returned rows, the stats counters and the `SERVICE` facet alike.
#[test]
fn explorer_filters_then_combines_outputs_in_one_candidate_pass() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("filtered-combined-pass.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
            (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
            (&[b"SERVICE=c", b"PRIORITY=3"], 3_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let priority_three = ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]);
    let query = ExplorerQuery {
        limit: 10,
        filters: vec![priority_three],
        facets: vec![b"SERVICE".to_vec()],
        histogram: Some(b"SERVICE".to_vec()),
        histogram_target_buckets: 2,
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&query).expect("explore");

    assert_eq!(result.rows.len(), 2);
    let stats = &result.stats;
    assert_eq!(stats.rows_examined, 2);
    assert_eq!(stats.rows_matched, 2);
    assert_eq!(stats.facet_rows_matched, 2);

    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    let service_count = |value: &[u8]| service.get(value).copied();
    assert_eq!(service_count(b"a"), Some(1));
    assert_eq!(service_count(b"c"), Some(1));
    // The only PRIORITY=4 entry must not leak into the facet.
    assert_eq!(service_count(b"b"), None);
}
/// Cursor-only exploration must return rows without payloads and report zero
/// row expansions; the returned cursor must still locate the entry when used
/// with a freshly opened reader.
#[test]
fn explorer_cursor_rows_defer_payload_expansion() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("cursor-only-row.journal");
    write_entries(
        &journal,
        None,
        &[(&[b"SERVICE=a", b"PRIORITY=3", b"MESSAGE=hello"], 1_000)],
    );
    let single_row_query = ExplorerQuery {
        limit: 1,
        ..ExplorerQuery::default()
    };

    let mut explorer_reader = FileReader::open(&journal).expect("open reader");
    let result = explorer_reader
        .explore_with_strategy_cursor_rows(&single_row_query, ExplorerStrategy::Traversal)
        .expect("explore cursor rows");
    assert_eq!(result.rows.len(), 1);
    let row = &result.rows[0];
    assert!(row.payloads.is_empty());
    assert_eq!(result.stats.returned_row_expansions, 0);

    // Replay the cursor on an independent reader and expand the payloads there.
    let cursor = row.cursor.clone();
    let mut replay_reader = FileReader::open(&journal).expect("reopen reader");
    replay_reader.seek_cursor(&cursor).expect("seek cursor");
    let cursor_matches = replay_reader.test_cursor(&cursor).expect("test cursor");
    assert!(cursor_matches);

    let mut payloads = Vec::new();
    replay_reader
        .collect_entry_payloads(&mut payloads)
        .expect("collect payloads");
    let has_message = payloads.iter().any(|payload| payload == b"MESSAGE=hello");
    assert!(has_message);
}
/// With filters on both `SERVICE` and `PRIORITY`, each facet is expected to
/// still report the values its own filter excludes: `SERVICE` shows `a` and
/// `b`, `PRIORITY` shows `3` and `4`, each counted once.
#[test]
fn explorer_same_field_filter_exclusion_counts_filtered_out_facet_values() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("same-field-filter-facet.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
            (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
            (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let service_filter = ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]);
    let priority_filter = ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]);
    let query = ExplorerQuery {
        limit: 0,
        filters: vec![service_filter, priority_filter],
        facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&query).expect("explore");

    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    let service_count = |value: &[u8]| service.get(value).copied();
    assert_eq!(service_count(b"a"), Some(1));
    assert_eq!(service_count(b"b"), Some(1));

    let priority = result
        .facets
        .get(b"PRIORITY".as_slice())
        .expect("priority facet");
    let priority_count = |value: &[u8]| priority.get(value).copied();
    assert_eq!(priority_count(b"3"), Some(1));
    assert_eq!(priority_count(b"4"), Some(1));
}
/// Runs the `Compare` strategy in all-values mode and checks the comparison
/// diagnostics, the row count, the `SERVICE` facet (including the unset
/// marker) and the contents of the first histogram bucket.
#[test]
fn explorer_index_strategy_matches_traversal_for_all_values() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("indexed-all-values.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=a", b"PRIORITY=3", b"TAG=x"], 1_000),
            (&[b"SERVICE=b", b"PRIORITY=3", b"TAG=x"], 2_000),
            (&[b"SERVICE=a", b"PRIORITY=4", b"TAG=y", b"TAG=z"], 3_000),
            (&[b"PRIORITY=3"], 4_000),
        ],
    );
    let priority_three = ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]);
    let query = ExplorerQuery {
        field_mode: ExplorerFieldMode::AllValues,
        use_source_realtime: false,
        limit: 2,
        after_realtime_usec: Some(0),
        before_realtime_usec: Some(5_000),
        filters: vec![priority_three],
        facets: vec![b"SERVICE".to_vec(), b"TAG".to_vec()],
        histogram: Some(b"SERVICE".to_vec()),
        histogram_target_buckets: 2,
        ..ExplorerQuery::default()
    };

    let mut reader = FileReader::open(&journal).expect("open reader");
    let result = reader
        .explore_with_strategy(&query, ExplorerStrategy::Compare)
        .expect("compare");

    let diagnostics = result.comparison.as_ref().expect("comparison diagnostics");
    assert_eq!(diagnostics.index_stats, result.stats);
    assert_eq!(diagnostics.traversal_stats.rows_returned, 2);
    assert_eq!(diagnostics.index_stats.rows_returned, 2);
    assert_eq!(result.rows.len(), 2);

    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    let service_count = |value: &[u8]| service.get(value).copied();
    assert_eq!(service_count(b"a"), Some(1));
    assert_eq!(service_count(b"b"), Some(1));
    // The entry without a SERVICE field is counted under the unset marker.
    assert_eq!(service_count(UNSET_VALUE), Some(1));

    let histogram = result.histogram.as_ref().expect("histogram");
    assert_eq!(histogram.buckets.len(), 2);
    let first_bucket = &histogram.buckets[0].values;
    assert_eq!(first_bucket.get(b"a".as_slice()), Some(&1));
    assert_eq!(first_bucket.get(b"b".as_slice()), Some(&1));
    assert_eq!(first_bucket.get(UNSET_VALUE), Some(&1));
}
/// Under the `Compare` strategy in all-values mode, the `SERVICE` facet must
/// still list `b` even though a `SERVICE=a` filter is active.
#[test]
fn explorer_index_strategy_preserves_same_field_filter_exclusion() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("indexed-same-field-filter.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
            (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
            (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
        ],
    );
    let service_filter = ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]);
    let priority_filter = ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]);
    let query = ExplorerQuery {
        field_mode: ExplorerFieldMode::AllValues,
        use_source_realtime: false,
        filters: vec![service_filter, priority_filter],
        facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
        ..ExplorerQuery::default()
    };

    let mut reader = FileReader::open(&journal).expect("open reader");
    let result = reader
        .explore_with_strategy(&query, ExplorerStrategy::Compare)
        .expect("compare");

    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    let service_count = |value: &[u8]| service.get(value).copied();
    assert_eq!(service_count(b"a"), Some(1));
    assert_eq!(service_count(b"b"), Some(1));
}
/// Requesting the `Index` strategy together with `FirstValue` field mode must
/// fail with `SdkError::Unsupported`.
#[test]
fn explorer_index_strategy_rejects_first_value_semantics() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("indexed-first-value.journal");
    write_entries(&journal, None, &[(&[b"TAG=one", b"TAG=two"], 1_000)]);

    let mut reader = FileReader::open(&journal).expect("open reader");
    let first_value_query = ExplorerQuery {
        field_mode: ExplorerFieldMode::FirstValue,
        facets: vec![b"TAG".to_vec()],
        ..ExplorerQuery::default()
    };
    let outcome = reader.explore_with_strategy(&first_value_query, ExplorerStrategy::Index);
    let err = outcome.expect_err("first-value index strategy should be rejected");

    assert!(matches!(err, SdkError::Unsupported(_)));
}
/// For an entry carrying two `TAG` values, all-values mode must count both
/// while first-value mode counts exactly one and records one early stop.
#[test]
fn explorer_first_value_counts_one_value_per_selected_field() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("first-value.journal");
    write_entries(
        &journal,
        None,
        &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
    );
    // Same facet-only query for both runs; only the field mode differs.
    let tag_facet_query = |field_mode| ExplorerQuery {
        facets: vec![b"TAG".to_vec()],
        limit: 0,
        field_mode,
        ..ExplorerQuery::default()
    };

    let mut all_values_reader = FileReader::open(&journal).expect("open all-values reader");
    let all_values = all_values_reader
        .explore(&tag_facet_query(ExplorerFieldMode::AllValues))
        .expect("all-values explore");
    let all_tag = all_values
        .facets
        .get(b"TAG".as_slice())
        .expect("all-values tag facet");
    let all_total = all_tag.values().sum::<u64>();
    assert_eq!(all_total, 2);
    assert_eq!(all_tag.len(), 2);

    let mut first_value_reader = FileReader::open(&journal).expect("open first-value reader");
    let first_value = first_value_reader
        .explore(&tag_facet_query(ExplorerFieldMode::FirstValue))
        .expect("first-value explore");
    let first_tag = first_value
        .facets
        .get(b"TAG".as_slice())
        .expect("first-value tag facet");
    let first_total = first_tag.values().sum::<u64>();
    assert_eq!(first_total, 1);
    assert_eq!(first_tag.len(), 1);
    assert_eq!(first_value.stats.early_stops, 1);
}
/// When `TAG` is both the facet and the histogram field, the default field
/// mode must total 1 in each output for an entry with two `TAG` values, while
/// all-values mode totals 2 in each.
#[test]
fn explorer_first_value_does_not_double_count_duplicate_facets_or_histogram() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("first-value-no-double-count.journal");
    write_entries(
        &journal,
        None,
        &[(
            &[
                b"_SOURCE_REALTIME_TIMESTAMP=1000",
                b"TAG=one",
                b"TAG=two",
                b"MESSAGE=after-tag",
            ],
            1_000,
        )],
    );

    // First run keeps whatever field mode `ExplorerQuery::default()` selects.
    let mut reader = FileReader::open(&journal).expect("open reader");
    let default_mode_query = ExplorerQuery {
        limit: 0,
        facets: vec![b"TAG".to_vec()],
        histogram: Some(b"TAG".to_vec()),
        histogram_target_buckets: 1,
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&default_mode_query).expect("explore");

    let facet_total = result
        .facets
        .get(b"TAG".as_slice())
        .expect("tag facet")
        .values()
        .sum::<u64>();
    assert_eq!(facet_total, 1);
    let histogram_total = result
        .histogram
        .as_ref()
        .expect("histogram")
        .buckets
        .iter()
        .map(|bucket| bucket.values.values().sum::<u64>())
        .sum::<u64>();
    assert_eq!(histogram_total, 1);

    // Second run: identical query except for the explicit all-values mode.
    let mut all_values_reader = FileReader::open(&journal).expect("open all-values reader");
    let all_values_query = ExplorerQuery {
        field_mode: ExplorerFieldMode::AllValues,
        ..default_mode_query.clone()
    };
    let all_values = all_values_reader
        .explore(&all_values_query)
        .expect("all-values explore");

    let all_facet_total = all_values
        .facets
        .get(b"TAG".as_slice())
        .expect("tag facet")
        .values()
        .sum::<u64>();
    assert_eq!(all_facet_total, 2);
    let all_histogram_total = all_values
        .histogram
        .as_ref()
        .expect("histogram")
        .buckets
        .iter()
        .map(|bucket| bucket.values.values().sum::<u64>())
        .sum::<u64>();
    assert_eq!(all_histogram_total, 2);
}
/// In first-value mode with two facet fields, a repeated `TAG` must not stand
/// in for `SERVICE`: `TAG` totals 1, `SERVICE=a` is counted once, and one
/// early stop is recorded.
#[test]
fn explorer_first_value_tracks_required_field_identities() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("first-value-identities.journal");
    write_entries(
        &journal,
        None,
        &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let query = ExplorerQuery {
        field_mode: ExplorerFieldMode::FirstValue,
        limit: 0,
        facets: vec![b"TAG".to_vec(), b"SERVICE".to_vec()],
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&query).expect("explore");

    let tag_total = result
        .facets
        .get(b"TAG".as_slice())
        .expect("tag facet")
        .values()
        .sum::<u64>();
    assert_eq!(tag_total, 1);

    let service_a = result
        .facets
        .get(b"SERVICE".as_slice())
        .and_then(|values| values.get(b"a".as_slice()));
    assert_eq!(service_a, Some(&1));

    assert_eq!(result.stats.early_stops, 1);
}
/// Listing the same facet field twice must be rejected with an error whose
/// text mentions "must not be duplicated".
#[test]
fn explorer_rejects_duplicate_facet_fields() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("duplicate-facets.journal");
    write_entries(&journal, None, &[(&[b"SERVICE=a"], 1_000)]);

    let mut reader = FileReader::open(&journal).expect("open reader");
    let duplicated_facets = ExplorerQuery {
        limit: 0,
        facets: vec![b"SERVICE".to_vec(); 2],
        ..ExplorerQuery::default()
    };
    let err = reader
        .explore(&duplicated_facets)
        .expect_err("duplicate facets rejected");

    let message = err.to_string();
    assert!(message.contains("must not be duplicated"));
}
/// A time window that matches no entry must still yield an entry for the
/// requested facet field, with no values in it.
#[test]
fn explorer_empty_result_keeps_requested_facet_with_no_values() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("empty-result.journal");
    write_entries(&journal, None, &[(&[b"SERVICE=a", b"PRIORITY=3"], 1_000)]);

    let mut reader = FileReader::open(&journal).expect("open reader");
    // The only entry is at 1_000, well before this window, and slack is off.
    let out_of_range_query = ExplorerQuery {
        realtime_slack_usec: 0,
        limit: 10,
        after_realtime_usec: Some(10_000),
        before_realtime_usec: Some(20_000),
        facets: vec![b"SERVICE".to_vec()],
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&out_of_range_query).expect("explore");

    assert!(result.rows.is_empty());
    assert_eq!(result.stats.rows_matched, 0);
    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    assert!(service.is_empty());
}
/// With source-realtime disabled and a 20-second slack configured, only the
/// entry strictly inside the requested window may contribute to the facet.
#[test]
fn explorer_facet_time_bounds_do_not_count_slack_rows_without_source_realtime() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("facet-time-bound.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=before"], 340_000_000),
            (&[b"SERVICE=inside"], 360_000_000),
            (&[b"SERVICE=after"], 400_000_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let windowed_query = ExplorerQuery {
        use_source_realtime: false,
        realtime_slack_usec: 20_000_000,
        limit: 0,
        after_realtime_usec: Some(350_000_000),
        before_realtime_usec: Some(370_000_000),
        facets: vec![b"SERVICE".to_vec()],
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&windowed_query).expect("explore");

    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    let service_count = |value: &[u8]| service.get(value).copied();
    assert_eq!(service_count(b"inside"), Some(1));
    assert_eq!(service_count(b"before"), None);
    assert_eq!(service_count(b"after"), None);
    assert_eq!(result.stats.facet_rows_matched, 1);
}
/// When an FTS pattern is present, no early stop may be recorded and both
/// data references of the entry must be seen, while the facet still counts.
#[test]
fn explorer_fts_disables_first_value_early_stop() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("fts-no-early-stop.journal");
    write_entries(&journal, None, &[(&[b"TAG=one", b"MESSAGE=needle"], 1_000)]);

    let mut reader = FileReader::open(&journal).expect("open reader");
    let fts_query = ExplorerQuery {
        limit: 0,
        fts_patterns: vec![b"needle".to_vec()],
        facets: vec![b"TAG".to_vec()],
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&fts_query).expect("explore");

    let stats = &result.stats;
    assert_eq!(stats.early_stops, 0);
    assert_eq!(stats.data_refs_seen, 2);

    let tag_one = result
        .facets
        .get(b"TAG".as_slice())
        .and_then(|values| values.get(b"one".as_slice()));
    assert_eq!(tag_one, Some(&1));
}
/// Combines several FTS terms and expects three of the five entries to be
/// returned: the `alpha`, `beta` and `wild` tags are kept, while `debug` and
/// `other` are absent from the facet.
#[test]
fn explorer_fts_or_terms_and_negative_terms_filter_rows() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("fts-negative.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"TAG=alpha", b"MESSAGE=alpha keep"], 1_000),
            (&[b"TAG=beta", b"MESSAGE=beta keep"], 2_000),
            (&[b"TAG=debug", b"MESSAGE=alpha debug"], 3_000),
            (&[b"TAG=other", b"MESSAGE=other"], 4_000),
            (&[b"TAG=wild", b"MESSAGE=start middle end"], 5_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    // Only the "debug" term is created with the flag set to `true`.
    let terms = vec![
        ExplorerFtsPattern::substring(b"alpha".to_vec(), false),
        ExplorerFtsPattern::substring(b"beta".to_vec(), false),
        ExplorerFtsPattern::substring(b"debug".to_vec(), true),
        ExplorerFtsPattern::substring(b"start*end".to_vec(), false),
    ];
    let query = ExplorerQuery {
        limit: 10,
        fts_terms: terms,
        facets: vec![b"TAG".to_vec()],
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&query).expect("explore");

    let tag = result.facets.get(b"TAG".as_slice()).expect("TAG facet");
    assert_eq!(result.rows.len(), 3);
    let tag_count = |value: &[u8]| tag.get(value).copied();
    assert_eq!(tag_count(b"alpha"), Some(1));
    assert_eq!(tag_count(b"beta"), Some(1));
    assert_eq!(tag_count(b"wild"), Some(1));
    assert_eq!(tag_count(b"debug"), None);
    assert_eq!(tag_count(b"other"), None);
}
/// A backward query with the default anchor must return the newest entry
/// first, followed by the older one.
#[test]
fn explorer_auto_anchor_scans_backward_from_tail() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("backward.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
            (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let backward_query = ExplorerQuery {
        limit: 2,
        direction: Direction::Backward,
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&backward_query).expect("explore");

    assert_eq!(result.rows.len(), 2);
    let newest = &result.rows[0];
    assert_eq!(newest.realtime_usec, 2_000);
    let oldest = &result.rows[1];
    assert_eq!(oldest.realtime_usec, 1_000);
}
/// A backward scan bounded by `after_realtime_usec` must return only the two
/// entries newer than the bound and must examine exactly two rows.
#[test]
fn explorer_backward_time_bound_stops_after_slack_window() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("backward-time-bound.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"SERVICE=a"], 100_000_000),
            (&[b"SERVICE=b"], 200_000_000),
            (&[b"SERVICE=c"], 300_000_000),
            (&[b"SERVICE=d"], 400_000_000),
            (&[b"SERVICE=e"], 500_000_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let bounded_query = ExplorerQuery {
        direction: Direction::Backward,
        realtime_slack_usec: 10_000_000,
        after_realtime_usec: Some(350_000_000),
        limit: 10,
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&bounded_query).expect("explore");

    assert_eq!(result.rows.len(), 2);
    let first = &result.rows[0];
    assert_eq!(first.realtime_usec, 500_000_000);
    let second = &result.rows[1];
    assert_eq!(second.realtime_usec, 400_000_000);
    assert_eq!(result.stats.rows_examined, 2);
}
/// Enabling both a histogram and an FTS pattern must return the single
/// matching row, report at least one FTS scan, and total 1 in the histogram.
#[test]
fn explorer_histogram_and_fts_are_opt_in() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("histogram.journal");
    write_entries(
        &journal,
        None,
        &[
            (&[b"MESSAGE=alpha", b"PRIORITY=3"], 1_000),
            (&[b"MESSAGE=beta", b"PRIORITY=4"], 2_000),
        ],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let query = ExplorerQuery {
        limit: 10,
        fts_patterns: vec![b"alp".to_vec()],
        histogram: Some(b"PRIORITY".to_vec()),
        histogram_target_buckets: 2,
        after_realtime_usec: Some(0),
        before_realtime_usec: Some(3_000),
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&query).expect("explore");

    assert_eq!(result.rows.len(), 1);
    assert!(result.stats.fts_scans > 0);

    let histogram = result.histogram.as_ref().expect("histogram");
    let histogram_total = histogram
        .buckets
        .iter()
        .map(|bucket| bucket.values.values().sum::<u64>())
        .sum::<u64>();
    assert_eq!(histogram_total, 1);
}
/// Uses `SOURCE_REALTIME_FIELD` as the histogram field in first-value mode
/// and expects one histogram update, one early stop, and a histogram total
/// of 1.
#[test]
fn explorer_first_value_stops_after_same_data_satisfies_multiple_roles() {
    let tmp = TempDir::new().expect("tempdir");
    let journal = tmp.path().join("same-data-multiple-roles.journal");
    write_entries(
        &journal,
        None,
        &[(
            &[b"_SOURCE_REALTIME_TIMESTAMP=1000", b"MESSAGE=after-source"],
            1_000,
        )],
    );

    let mut reader = FileReader::open(&journal).expect("open reader");
    let query = ExplorerQuery {
        field_mode: ExplorerFieldMode::FirstValue,
        limit: 0,
        histogram: Some(SOURCE_REALTIME_FIELD.to_vec()),
        histogram_target_buckets: 1,
        ..ExplorerQuery::default()
    };
    let result = reader.explore(&query).expect("explore");

    assert_eq!(result.stats.histogram_updates, 1);
    assert_eq!(result.stats.early_stops, 1);

    let histogram = result.histogram.as_ref().expect("histogram");
    let histogram_total = histogram
        .buckets
        .iter()
        .map(|bucket| bucket.values.values().sum::<u64>())
        .sum::<u64>();
    assert_eq!(histogram_total, 1);
}
}