Skip to main content

journal/
explorer.rs

1use super::*;
2use journal_core::file::{DataObject, offset_array::InlinedCursor};
3use std::collections::{HashMap, HashSet};
4use std::time::{Duration, Instant};
5
/// Histogram bucket count used when the query does not request one.
const DEFAULT_HISTOGRAM_TARGET_BUCKETS: usize = 150;
/// Default `realtime_slack_usec`: two minutes, in microseconds.
const DEFAULT_TIME_SLACK_USEC: u64 = 120 * 1_000_000;
/// Rows scanned between two deadline / cancellation / progress checks.
const EXPLORER_CONTROL_CHECK_EVERY_ROWS: u64 = 8 * 1024;
/// Default value of `ExplorerQuery::stop_when_rows_full_check_every`.
const DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS: u64 = 1;
/// Upper bound on the number of sampling time slots.
const EXPLORER_SAMPLING_SLOTS_MAX: usize = 1_000;
/// Unsampled rows after which the per-file sampling rate is recalibrated.
const EXPLORER_SAMPLING_RECALIBRATE_ROWS: u64 = 10 * 1_000;
/// Fraction of a file's time window (1%) that must be covered before the
/// sampler may stop and estimate the remainder.
const EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS: f64 = 0.01;
/// Journal field carrying the source realtime timestamp.
const SOURCE_REALTIME_FIELD: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP";
/// Placeholder label, presumably used for fields a row does not set.
const UNSET_VALUE: &[u8] = b"-";
/// Placeholder label, presumably for rows that were counted but not sampled.
const EXPLORER_UNSAMPLED_VALUE: &[u8] = b"[unsampled]";
/// Placeholder label, presumably for rows whose counts were estimated.
const EXPLORER_ESTIMATED_VALUE: &[u8] = b"[estimated]";
17
/// Position a query is anchored at.
///
/// The default is [`ExplorerAnchor::Auto`]. How each variant is applied is
/// decided by the query runner, which is not part of this type.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerAnchor {
    /// Let the runner choose the anchor.
    #[default]
    Auto,
    /// Anchor at the head.
    Head,
    /// Anchor at the tail.
    Tail,
    /// Anchor at a realtime timestamp (presumably microseconds, like the other
    /// `*_realtime_usec` values in this module).
    Realtime(u64),
}
31
/// How a field that appears several times in one row is treated.
///
/// The default is [`ExplorerFieldMode::FirstValue`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerFieldMode {
    /// Consider every value of the field.
    AllValues,
    /// Consider only the first value of the field.
    #[default]
    FirstValue,
}
43
/// Execution strategy for an explorer query.
///
/// The default is [`ExplorerStrategy::Traversal`]. The enum is
/// `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExplorerStrategy {
    /// Walk the rows directly.
    #[default]
    Traversal,
    /// Answer from indexes.
    Index,
    /// Run both and report an `ExplorerComparison`.
    Compare,
}
57
/// A journal field name paired with the list of values to filter it by.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFilter {
    pub field: Vec<u8>,
    pub values: Vec<Vec<u8>>,
}

impl ExplorerFilter {
    /// Builds a filter, converting the field name and each value into owned
    /// bytes. An empty `values` iterator yields a filter with no values.
    pub fn new(
        field: impl Into<Vec<u8>>,
        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
    ) -> Self {
        let field: Vec<u8> = field.into();
        let mut owned_values = Vec::new();
        for value in values {
            owned_values.push(value.into());
        }
        Self {
            field,
            values: owned_values,
        }
    }
}
75
/// Parameters of one explorer query.
///
/// Build it from `ExplorerQuery::default()` and the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ExplorerQuery {
    /// Lower realtime bound, in microseconds. Sampling needs both bounds set.
    pub after_realtime_usec: Option<u64>,
    /// Upper realtime bound, in microseconds. Sampling is disabled unless it is
    /// strictly greater than `after_realtime_usec`.
    pub before_realtime_usec: Option<u64>,
    /// Starting position; defaults to `ExplorerAnchor::Auto`.
    pub anchor: ExplorerAnchor,
    /// Scan direction (default `Direction::Forward`). Sampling estimates
    /// measure progress from the file end the scan starts at.
    pub direction: Direction,
    /// Row limit (default 200). Sampling also uses it as the minimum
    /// per-file and per-slot sample count.
    pub limit: usize,
    /// Field/value filters, appended by `with_filter`.
    pub filters: Vec<ExplorerFilter>,
    /// Fields to compute facet value counts for, appended by `with_facet`.
    pub facets: Vec<Vec<u8>>,
    /// Field to build a time histogram for, if any.
    pub histogram: Option<Vec<u8>>,
    /// Requested histogram bucket count (default 150). Sampling falls back to
    /// it for its slot count when no histogram bucket count is supplied.
    pub histogram_target_buckets: usize,
    /// Parsed full-text patterns, positive and negative, in insertion order.
    pub fts_terms: Vec<ExplorerFtsPattern>,
    /// Raw positive full-text patterns, as given to `with_fts_pattern`.
    pub fts_patterns: Vec<Vec<u8>>,
    /// Raw negative full-text patterns, as given to `with_fts_negative_pattern`.
    pub fts_negative_patterns: Vec<Vec<u8>>,
    /// Handling of repeated fields (default `ExplorerFieldMode::FirstValue`).
    pub field_mode: ExplorerFieldMode,
    /// Default `true`. NOTE(review): presumably a facet's own filter is ignored
    /// when counting that facet's values — confirm in the facet code.
    pub exclude_facet_field_filters: bool,
    /// Default `true`. NOTE(review): presumably prefers
    /// `_SOURCE_REALTIME_TIMESTAMP` over the entry realtime — confirm.
    pub use_source_realtime: bool,
    /// Realtime slack in microseconds (default 120 seconds).
    pub realtime_slack_usec: u64,
    /// Default `false`; enables stopping once the row limit is satisfied.
    pub stop_when_rows_full: bool,
    /// Row interval for the "rows full" check (default 1).
    pub stop_when_rows_full_check_every: u64,
    /// Sampling parameters; `None` disables sampling.
    pub sampling: Option<ExplorerSampling>,
    /// Debug-only discrepancy tool. Production explorer callers must never set
    /// this; column catalogs belong to FIELD indexes, not row traversal.
    #[doc(hidden)]
    pub debug_collect_column_fields_by_row_traversal: bool,
}
102
103impl Default for ExplorerQuery {
104    fn default() -> Self {
105        Self {
106            after_realtime_usec: None,
107            before_realtime_usec: None,
108            anchor: ExplorerAnchor::Auto,
109            direction: Direction::Forward,
110            limit: 200,
111            filters: Vec::new(),
112            facets: Vec::new(),
113            histogram: None,
114            histogram_target_buckets: DEFAULT_HISTOGRAM_TARGET_BUCKETS,
115            fts_terms: Vec::new(),
116            fts_patterns: Vec::new(),
117            fts_negative_patterns: Vec::new(),
118            field_mode: ExplorerFieldMode::FirstValue,
119            exclude_facet_field_filters: true,
120            use_source_realtime: true,
121            realtime_slack_usec: DEFAULT_TIME_SLACK_USEC,
122            stop_when_rows_full: false,
123            stop_when_rows_full_check_every: DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS,
124            sampling: None,
125            debug_collect_column_fields_by_row_traversal: false,
126        }
127    }
128}
129
/// Sampling budget plus the bounds of the file being scanned.
///
/// The values are copied into `ExplorerSamplingState` by `for_query` and
/// `begin_file`. The file bound fields treat `0` as "unknown".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExplorerSampling {
    /// Total sample budget; `0` disables sampling.
    pub budget: u64,
    /// Number of files matched by the query; `0` disables sampling.
    pub matched_files: u64,
    /// Realtime of the file's first entry (`0` = unknown).
    pub file_head_realtime_usec: u64,
    /// Realtime of the file's last entry (`0` = unknown).
    pub file_tail_realtime_usec: u64,
    /// Seqnum of the file's first entry (`0` = unknown).
    pub file_head_seqnum: u64,
    /// Seqnum of the file's last entry (`0` = unknown).
    pub file_tail_seqnum: u64,
    /// Number of entries in the file (`0` = unknown); caps row estimates.
    pub file_entries: u64,
}
140
/// A full-text pattern split on `*` wildcards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFtsPattern {
    /// Non-empty literal fragments that must appear in this order.
    pub parts: Vec<Vec<u8>>,
    /// `true` for a negative (excluding) pattern.
    pub negative: bool,
}
146
147impl ExplorerFtsPattern {
148    pub fn substring(pattern: impl Into<Vec<u8>>, negative: bool) -> Self {
149        let pattern = pattern.into();
150        let parts = pattern
151            .split(|byte| *byte == b'*')
152            .filter(|part| !part.is_empty())
153            .map(|part| part.to_vec())
154            .collect();
155        Self { parts, negative }
156    }
157
158    fn matches(&self, value: &[u8]) -> bool {
159        if value.is_empty() {
160            return false;
161        }
162        if self.parts.is_empty() {
163            return true;
164        }
165
166        let mut haystack = value;
167        for part in &self.parts {
168            let Some(index) = find_ascii_case_insensitive(haystack, part) else {
169                return false;
170            };
171            haystack = &haystack[index.saturating_add(part.len())..];
172        }
173        true
174    }
175}
176
177impl ExplorerQuery {
178    pub fn with_filter(
179        mut self,
180        field: impl Into<Vec<u8>>,
181        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
182    ) -> Self {
183        self.filters.push(ExplorerFilter::new(field, values));
184        self
185    }
186
187    pub fn with_facet(mut self, field: impl Into<Vec<u8>>) -> Self {
188        self.facets.push(field.into());
189        self
190    }
191
192    pub fn with_histogram(mut self, field: impl Into<Vec<u8>>) -> Self {
193        self.histogram = Some(field.into());
194        self
195    }
196
197    pub fn with_fts_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
198        let pattern = pattern.into();
199        self.fts_terms
200            .push(ExplorerFtsPattern::substring(pattern.clone(), false));
201        self.fts_patterns.push(pattern);
202        self
203    }
204
205    pub fn with_fts_negative_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
206        let pattern = pattern.into();
207        self.fts_terms
208            .push(ExplorerFtsPattern::substring(pattern.clone(), true));
209        self.fts_negative_patterns.push(pattern);
210        self
211    }
212}
213
/// Counters describing the work done by one explorer query.
///
/// A clone is sent with every `ExplorerProgress` report, and the type derives
/// `serde::Serialize` for export. The counters are updated by scan code
/// outside this section. NOTE(review): the per-counter meaning is only implied
/// by the field names here — confirm against the scan loop before relying on it.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub struct ExplorerStats {
    pub rows_examined: u64,
    pub rows_matched: u64,
    pub facet_rows_matched: u64,
    pub rows_returned: u64,
    pub rows_unsampled: u64,
    pub rows_estimated: u64,
    pub sampling_sampled: u64,
    pub sampling_unsampled: u64,
    pub sampling_estimated: u64,
    pub last_realtime_usec: u64,
    pub max_source_realtime_delta_usec: u64,
    pub data_refs_seen: u64,
    pub data_refs_skipped: u64,
    pub data_payloads_loaded: u64,
    pub data_objects_classified: u64,
    pub data_cache_hits: u64,
    pub data_cache_misses: u64,
    pub payloads_decompressed: u64,
    pub fts_scans: u64,
    pub facet_updates: u64,
    pub histogram_updates: u64,
    pub returned_row_expansions: u64,
    pub early_stop_opportunities: u64,
    pub early_stops: u64,
}
241
/// One row returned by an explorer query.
#[derive(Debug, Clone)]
pub struct ExplorerRow {
    /// Row timestamp in realtime microseconds.
    pub realtime_usec: u64,
    /// Cursor string identifying the row (presumably a journal cursor — confirm).
    pub cursor: String,
    /// Raw field payloads of the row. NOTE(review): likely empty when rows are
    /// produced in `ExplorerRowPayloadMode::CursorOnly` — confirm.
    pub payloads: Vec<Vec<u8>>,
}
248
/// Whether returned rows carry their payloads or only their cursor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExplorerRowPayloadMode {
    /// Expand rows with their payloads.
    Expand,
    /// Return only the cursor (presumably no payload expansion — confirm).
    CursorOnly,
}
254
/// One time bucket of an explorer histogram.
#[derive(Debug, Clone)]
pub struct ExplorerHistogramBucket {
    /// Bucket start, in realtime microseconds.
    pub start_realtime_usec: u64,
    /// Bucket end, in realtime microseconds. NOTE(review): whether this end is
    /// inclusive or exclusive is not visible here.
    pub end_realtime_usec: u64,
    /// Count per value of the histogram field within this bucket.
    pub values: HashMap<Vec<u8>, u64>,
}

/// Per-value time histogram of one field.
#[derive(Debug, Clone)]
pub struct ExplorerHistogram {
    /// The field being histogrammed.
    pub field: Vec<u8>,
    /// Time buckets. `histogram_bucket_count_for_query` reports their count.
    pub buckets: Vec<ExplorerHistogramBucket>,
}
267
/// Timings and statistics from running the traversal and index strategies
/// side by side (presumably filled by `ExplorerStrategy::Compare`).
#[derive(Debug, Clone, Default)]
pub struct ExplorerComparison {
    pub traversal_duration: Duration,
    pub index_duration: Duration,
    pub traversal_stats: ExplorerStats,
    pub index_stats: ExplorerStats,
}

/// Output of an explorer query.
#[derive(Debug, Clone, Default)]
pub struct ExplorerResult {
    /// Returned rows.
    pub rows: Vec<ExplorerRow>,
    /// Facet counts: field -> value -> count.
    pub facets: HashMap<Vec<u8>, HashMap<Vec<u8>, u64>>,
    /// Histogram, when the query requested one.
    pub histogram: Option<ExplorerHistogram>,
    /// Column field names. NOTE(review): production callers are expected to get
    /// these from FIELD indexes rather than row traversal (see
    /// `debug_collect_column_fields_by_row_traversal`).
    pub column_fields: HashSet<Vec<u8>>,
    /// Work counters for the query.
    pub stats: ExplorerStats,
    /// Strategy comparison, if one was run.
    pub comparison: Option<ExplorerComparison>,
}
285
/// Why `ExplorerControl` stopped a query early.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum ExplorerStopReason {
    /// The deadline passed.
    TimedOut,
    /// The cancellation callback returned `true`.
    Cancelled,
}

/// Progress snapshot passed to the `ExplorerControl` progress callback.
#[derive(Debug, Clone)]
pub struct ExplorerProgress {
    /// Copy of the query's counters at the time of the report.
    pub stats: ExplorerStats,
    /// Time elapsed since the `ExplorerControl` was created.
    pub elapsed: Duration,
}
297
/// Runtime controls for a running explorer query: deadline, cancellation,
/// progress reporting and optional per-row hooks.
///
/// Deadline and cancellation are polled every
/// `EXPLORER_CONTROL_CHECK_EVERY_ROWS` rows. Once a stop reason is recorded,
/// every later `should_stop_after_rows` call reports "stop".
pub struct ExplorerControl<'a> {
    /// Instant after which a check stops the query with `TimedOut`.
    deadline: Option<Instant>,
    /// Polled at each check; returning `true` stops the query with `Cancelled`.
    cancellation: Option<&'a dyn Fn() -> bool>,
    /// Receives snapshots when at least `progress_interval` has elapsed at a
    /// check, plus a final one when a stop reason is recorded.
    progress: Option<&'a mut dyn FnMut(ExplorerProgress)>,
    // NOTE(review): consumed by scan code outside this section.
    candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
    /// Rewrites a row's realtime timestamp; identity when unset.
    adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
    /// Called with `(realtime_usec, rows_matched)`. Its return value is passed
    /// back by `emit_matched_row` (`false` when unset).
    matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
    // NOTE(review): sampling state borrowed for the scan; used outside this section.
    sampling: Option<&'a mut ExplorerSamplingState>,
    /// Minimum time between periodic progress reports (default 250 ms).
    progress_interval: Duration,
    /// Creation time; `ExplorerProgress::elapsed` is measured from here.
    started: Instant,
    /// Time of the last progress report.
    last_progress: Instant,
    /// Row count at which the next deadline/cancellation check runs.
    next_check_rows: u64,
    /// Set once the query has been stopped.
    stop_reason: Option<ExplorerStopReason>,
}
312
313impl<'a> ExplorerControl<'a> {
314    pub fn new() -> Self {
315        let now = Instant::now();
316        Self {
317            deadline: None,
318            cancellation: None,
319            progress: None,
320            candidate_row: None,
321            adjust_realtime: None,
322            matched_row: None,
323            sampling: None,
324            progress_interval: Duration::from_millis(250),
325            started: now,
326            last_progress: now,
327            next_check_rows: EXPLORER_CONTROL_CHECK_EVERY_ROWS,
328            stop_reason: None,
329        }
330    }
331
332    pub fn set_deadline(&mut self, deadline: Option<Instant>) {
333        self.deadline = deadline;
334    }
335
336    pub fn set_cancellation_callback(&mut self, cancellation: Option<&'a dyn Fn() -> bool>) {
337        self.cancellation = cancellation;
338    }
339
340    pub fn set_progress_callback(&mut self, progress: Option<&'a mut dyn FnMut(ExplorerProgress)>) {
341        self.progress = progress;
342    }
343
344    pub(crate) fn set_candidate_row_callback(
345        &mut self,
346        candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
347    ) {
348        self.candidate_row = candidate_row;
349    }
350
351    pub(crate) fn set_realtime_adjust_callback(
352        &mut self,
353        adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
354    ) {
355        self.adjust_realtime = adjust_realtime;
356    }
357
358    pub fn set_matched_row_callback(
359        &mut self,
360        matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
361    ) {
362        self.matched_row = matched_row;
363    }
364
365    pub(crate) fn set_sampling_state(&mut self, sampling: Option<&'a mut ExplorerSamplingState>) {
366        self.sampling = sampling;
367    }
368
369    pub fn set_progress_interval(&mut self, interval: Duration) {
370        self.progress_interval = interval;
371    }
372
373    pub fn stop_reason(&self) -> Option<ExplorerStopReason> {
374        self.stop_reason
375    }
376
377    fn should_stop_after_rows(&mut self, rows_seen: u64, stats: &ExplorerStats) -> bool {
378        if self.stop_reason.is_some() {
379            return true;
380        }
381        if rows_seen < self.next_check_rows {
382            return false;
383        }
384        self.next_check_rows = rows_seen.saturating_add(EXPLORER_CONTROL_CHECK_EVERY_ROWS);
385        self.check(stats)
386    }
387
388    fn check(&mut self, stats: &ExplorerStats) -> bool {
389        let now = Instant::now();
390        if now.duration_since(self.last_progress) >= self.progress_interval {
391            self.emit_progress(stats, now);
392        }
393        if self.cancellation.is_some_and(|is_cancelled| is_cancelled()) {
394            self.stop_reason = Some(ExplorerStopReason::Cancelled);
395            self.emit_progress(stats, now);
396            return true;
397        }
398        if self.deadline.is_some_and(|deadline| now >= deadline) {
399            self.stop_reason = Some(ExplorerStopReason::TimedOut);
400            self.emit_progress(stats, now);
401            return true;
402        }
403        false
404    }
405
406    fn emit_progress(&mut self, stats: &ExplorerStats, now: Instant) {
407        self.last_progress = now;
408        if let Some(progress) = self.progress.as_deref_mut() {
409            progress(ExplorerProgress {
410                stats: stats.clone(),
411                elapsed: now.duration_since(self.started),
412            });
413        }
414    }
415
416    fn emit_matched_row(&mut self, realtime_usec: u64, rows_matched: u64) -> bool {
417        if let Some(matched_row) = self.matched_row.as_deref_mut() {
418            return matched_row(realtime_usec, rows_matched);
419        }
420        false
421    }
422
423    fn adjust_realtime(&mut self, realtime_usec: u64) -> u64 {
424        self.adjust_realtime
425            .as_deref_mut()
426            .map_or(realtime_usec, |adjust_realtime| {
427                adjust_realtime(realtime_usec)
428            })
429    }
430}
431
432impl Default for ExplorerControl<'_> {
433    fn default() -> Self {
434        Self::new()
435    }
436}
437
/// What `ExplorerSamplingState::decide` tells the scanner to do with a row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExplorerSamplingDecision {
    /// Process the row fully.
    Full {
        /// `false` for rows flagged as candidates to keep, which are not
        /// counted as samples; `true` when the row was counted as a sample.
        sampled: bool,
    },
    /// The row was counted as unsampled; its fields are not processed.
    SkipFields,
    /// Stop scanning this file and estimate the rest. Returned once the file
    /// has more unsampled than sampled rows and its time progress exceeds
    /// `EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS`.
    StopAndEstimate {
        /// Estimated rows left in the file.
        remaining_rows: u64,
        /// Start of the time range not yet scanned.
        from_realtime_usec: u64,
        /// End of the time range not yet scanned.
        to_realtime_usec: u64,
    },
}
450
/// Row-by-row sampling controller for a sampled explorer query.
///
/// Created by `ExplorerSamplingState::for_query`. `begin_file` reloads the
/// per-file bounds and counters, presumably once per journal file before its
/// rows go through `decide`.
#[derive(Debug)]
pub(crate) struct ExplorerSamplingState {
    // Query time window, in realtime microseconds.
    start_realtime_usec: u64,
    end_realtime_usec: u64,
    // Bounds of the current file. The estimators treat 0 as "unknown".
    file_head_realtime_usec: u64,
    file_tail_realtime_usec: u64,
    file_head_seqnum: u64,
    file_tail_seqnum: u64,
    file_entries: u64,
    // Timestamp of the first row `decide` saw in the current file.
    first_realtime_usec: Option<u64>,
    // Width of one sampling slot in microseconds (at least 1).
    step_realtime_usec: u64,
    // Minimum sample counts before rate-limited sampling starts: global,
    // per file, per slot.
    enable_after_samples: u64,
    per_file_enable_after_samples: u64,
    per_slot_enable_after_samples: u64,
    // Samples taken across all files.
    sampled: u64,
    // Per-file counters, reset by `begin_file`.
    per_file_sampled: u64,
    per_file_unsampled: u64,
    // Once rate-limited, one row is sampled after this many skipped rows.
    per_file_every: u64,
    // Rows skipped since the last rate-limited sample.
    per_file_skipped: u64,
    // Unsampled rows since `per_file_every` was last recalibrated.
    per_file_recalibrate: u64,
    // Per-slot sampled / unsampled counts, one entry per time slot.
    per_slot_sampled: Vec<u64>,
    per_slot_unsampled: Vec<u64>,
    // Files matched by the query (at least 1).
    matched_files: u64,
    // Scan direction; progress is measured from the file end the scan starts at.
    direction: Direction,
}
476
477impl ExplorerSamplingState {
478    pub(crate) fn for_query(
479        query: &ExplorerQuery,
480        histogram_bucket_count: Option<usize>,
481    ) -> Option<Self> {
482        let sampling = query.sampling?;
483        let start_realtime_usec = query.after_realtime_usec?;
484        let end_realtime_usec = query.before_realtime_usec?;
485        if sampling.budget == 0
486            || sampling.matched_files == 0
487            || start_realtime_usec >= end_realtime_usec
488        {
489            return None;
490        }
491
492        let slots = histogram_bucket_count
493            .unwrap_or(query.histogram_target_buckets)
494            .clamp(2, EXPLORER_SAMPLING_SLOTS_MAX);
495        let delta = end_realtime_usec.saturating_sub(start_realtime_usec);
496        let step_realtime_usec = (delta / slots as u64).saturating_sub(1).max(1);
497        let per_file_enable_after_samples =
498            ((sampling.budget / 4) / sampling.matched_files.max(1)).max(query.limit as u64);
499        let per_slot_enable_after_samples =
500            ((sampling.budget / 4) / slots as u64).max(query.limit as u64);
501
502        Some(Self {
503            start_realtime_usec,
504            end_realtime_usec,
505            file_head_realtime_usec: sampling.file_head_realtime_usec,
506            file_tail_realtime_usec: sampling.file_tail_realtime_usec,
507            file_head_seqnum: sampling.file_head_seqnum,
508            file_tail_seqnum: sampling.file_tail_seqnum,
509            file_entries: sampling.file_entries,
510            first_realtime_usec: None,
511            step_realtime_usec,
512            enable_after_samples: sampling.budget / 2,
513            per_file_enable_after_samples,
514            per_slot_enable_after_samples,
515            sampled: 0,
516            per_file_sampled: 0,
517            per_file_unsampled: 0,
518            per_file_every: 0,
519            per_file_skipped: 0,
520            per_file_recalibrate: 0,
521            per_slot_sampled: vec![0; slots],
522            per_slot_unsampled: vec![0; slots],
523            matched_files: sampling.matched_files.max(1),
524            direction: query.direction,
525        })
526    }
527
528    fn begin_file(&mut self, sampling: ExplorerSampling) {
529        self.file_head_realtime_usec = sampling.file_head_realtime_usec;
530        self.file_tail_realtime_usec = sampling.file_tail_realtime_usec;
531        self.file_head_seqnum = sampling.file_head_seqnum;
532        self.file_tail_seqnum = sampling.file_tail_seqnum;
533        self.file_entries = sampling.file_entries;
534        self.first_realtime_usec = None;
535        self.per_file_sampled = 0;
536        self.per_file_unsampled = 0;
537        self.per_file_every = 0;
538        self.per_file_skipped = 0;
539        self.per_file_recalibrate = 0;
540    }
541
542    fn decide(
543        &mut self,
544        realtime_usec: u64,
545        seqnum: u64,
546        candidate_to_keep: bool,
547    ) -> ExplorerSamplingDecision {
548        if self.first_realtime_usec.is_none() {
549            self.first_realtime_usec = Some(realtime_usec);
550        }
551        if candidate_to_keep {
552            return ExplorerSamplingDecision::Full { sampled: false };
553        }
554
555        let slot = self.slot_for_realtime(realtime_usec);
556        let should_sample = if self.sampled < self.enable_after_samples
557            || self.per_file_sampled < self.per_file_enable_after_samples
558            || self.per_slot_sampled[slot] < self.per_slot_enable_after_samples
559        {
560            true
561        } else if self.per_file_recalibrate >= EXPLORER_SAMPLING_RECALIBRATE_ROWS
562            || self.per_file_every == 0
563        {
564            self.recalibrate(realtime_usec, seqnum);
565            true
566        } else if self.per_file_skipped >= self.per_file_every {
567            self.per_file_skipped = 0;
568            true
569        } else {
570            self.per_file_skipped = self.per_file_skipped.saturating_add(1);
571            false
572        };
573
574        if should_sample {
575            self.sampled = self.sampled.saturating_add(1);
576            self.per_file_sampled = self.per_file_sampled.saturating_add(1);
577            self.per_slot_sampled[slot] = self.per_slot_sampled[slot].saturating_add(1);
578            return ExplorerSamplingDecision::Full { sampled: true };
579        }
580
581        self.per_file_recalibrate = self.per_file_recalibrate.saturating_add(1);
582        self.per_file_unsampled = self.per_file_unsampled.saturating_add(1);
583        self.per_slot_unsampled[slot] = self.per_slot_unsampled[slot].saturating_add(1);
584
585        if self.per_file_unsampled > self.per_file_sampled
586            && self.progress_by_time(realtime_usec) > EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS
587        {
588            let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
589            let (from_realtime_usec, to_realtime_usec) = self.remaining_range(realtime_usec);
590            return ExplorerSamplingDecision::StopAndEstimate {
591                remaining_rows,
592                from_realtime_usec,
593                to_realtime_usec,
594            };
595        }
596
597        ExplorerSamplingDecision::SkipFields
598    }
599
600    fn slot_for_realtime(&self, realtime_usec: u64) -> usize {
601        let clamped = realtime_usec.clamp(self.start_realtime_usec, self.end_realtime_usec);
602        let slot =
603            (clamped.saturating_sub(self.start_realtime_usec) / self.step_realtime_usec) as usize;
604        slot.min(self.per_slot_sampled.len().saturating_sub(1))
605    }
606
607    fn recalibrate(&mut self, realtime_usec: u64, seqnum: u64) {
608        let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
609        let wanted_samples = (self.enable_after_samples / self.matched_files).max(1);
610        self.per_file_every = (remaining_rows / wanted_samples).max(1);
611        self.per_file_recalibrate = 0;
612    }
613
614    fn estimate_remaining_rows(&self, realtime_usec: u64, seqnum: u64) -> u64 {
615        if let Some(remaining) = self.estimate_remaining_rows_by_seqnum(seqnum) {
616            return remaining;
617        }
618        self.estimate_remaining_rows_by_time(realtime_usec)
619    }
620
621    fn estimate_remaining_rows_by_seqnum(&self, seqnum: u64) -> Option<u64> {
622        self.validate_seqnum_estimate_inputs(seqnum)?;
623        let scanned_rows = self.scanned_file_rows();
624        let seqnum_span = self.seqnum_span_so_far(seqnum)?;
625        if seqnum_span == 0 {
626            return None;
627        }
628        let proportion_of_all_lines_so_far =
629            bounded_positive_proportion(scanned_rows as f64 / seqnum_span as f64)?;
630        let expected_matching_logs =
631            (proportion_of_all_lines_so_far * self.file_entries as f64) as u64;
632        if expected_matching_logs == 0 {
633            return None;
634        }
635        // Match Netdata systemd-journal.plugin sampling_running_file_query_estimate_remaining_lines():
636        // remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines,
637        // clamped to one when the subtraction reaches zero.
638        Some(expected_matching_logs.saturating_sub(scanned_rows).max(1))
639    }
640
641    fn validate_seqnum_estimate_inputs(&self, seqnum: u64) -> Option<()> {
642        (self.file_entries != 0
643            && self.file_head_seqnum != 0
644            && self.file_tail_seqnum != 0
645            && seqnum != 0)
646            .then_some(())
647    }
648
649    fn scanned_file_rows(&self) -> u64 {
650        self.per_file_sampled
651            .saturating_add(self.per_file_unsampled)
652            .max(1)
653    }
654
655    fn seqnum_span_so_far(&self, seqnum: u64) -> Option<u64> {
656        match self.direction {
657            Direction::Forward => seqnum.checked_sub(self.file_head_seqnum),
658            Direction::Backward => self.file_tail_seqnum.checked_sub(seqnum),
659        }
660    }
661
662    fn estimate_remaining_rows_by_time(&self, realtime_usec: u64) -> u64 {
663        let scanned_rows = self.scanned_file_rows();
664        let (after, before) = self.overlapping_timeframe(realtime_usec);
665        let total_time = self
666            .remaining_time_bounds(realtime_usec, after, before)
667            .0
668            .max(1);
669        let remaining_time = self.remaining_time_bounds(realtime_usec, after, before).1;
670        let elapsed = total_time.saturating_sub(remaining_time).max(1);
671        let mut proportion_by_time = elapsed as f64 / total_time as f64;
672        if proportion_by_time == 0.0 || proportion_by_time > 1.0 || !proportion_by_time.is_finite()
673        {
674            proportion_by_time = 1.0;
675        }
676        let mut expected_total = (scanned_rows as f64 / proportion_by_time) as u64;
677        if self.file_entries != 0 && expected_total > self.file_entries {
678            expected_total = self.file_entries;
679        }
680        expected_total.saturating_sub(scanned_rows).max(1)
681    }
682
683    fn progress_by_time(&self, realtime_usec: u64) -> f64 {
684        let (after, before) = self.overlapping_timeframe(realtime_usec);
685        let total_time = before.saturating_sub(after).max(1);
686        let elapsed = match self.direction {
687            Direction::Forward => realtime_usec.saturating_sub(after),
688            Direction::Backward => before.saturating_sub(realtime_usec),
689        }
690        .min(total_time);
691        elapsed as f64 / total_time as f64
692    }
693
694    fn overlapping_timeframe(&self, realtime_usec: u64) -> (u64, u64) {
695        match self.direction {
696            Direction::Forward => {
697                let mut oldest = self
698                    .first_realtime_usec
699                    .or((self.file_head_realtime_usec != 0).then_some(self.file_head_realtime_usec))
700                    .unwrap_or(self.start_realtime_usec);
701                let mut newest = if self.file_tail_realtime_usec != 0 {
702                    self.end_realtime_usec.min(self.file_tail_realtime_usec)
703                } else {
704                    self.end_realtime_usec
705                };
706                if newest <= oldest {
707                    newest = oldest.saturating_add(1);
708                }
709                if realtime_usec < oldest {
710                    oldest = realtime_usec.saturating_sub(1);
711                }
712                (oldest, newest)
713            }
714            Direction::Backward => {
715                let mut newest = self
716                    .first_realtime_usec
717                    .or((self.file_tail_realtime_usec != 0).then_some(self.file_tail_realtime_usec))
718                    .unwrap_or(self.end_realtime_usec);
719                let oldest = if self.file_head_realtime_usec != 0 {
720                    self.start_realtime_usec.max(self.file_head_realtime_usec)
721                } else {
722                    self.start_realtime_usec
723                };
724                if newest <= oldest {
725                    newest = oldest.saturating_add(1);
726                }
727                if newest < realtime_usec {
728                    newest = realtime_usec.saturating_add(1);
729                }
730                (oldest, newest)
731            }
732        }
733    }
734
735    fn remaining_range(&self, realtime_usec: u64) -> (u64, u64) {
736        let (after, before) = self.overlapping_timeframe(realtime_usec);
737        let (_, _, remaining_start, remaining_end) =
738            self.remaining_time_details(realtime_usec, after, before);
739        (remaining_start, remaining_end)
740    }
741
742    fn remaining_time_bounds(&self, realtime_usec: u64, after: u64, before: u64) -> (u64, u64) {
743        let (total, remaining, _, _) = self.remaining_time_details(realtime_usec, after, before);
744        (total, remaining)
745    }
746
747    fn remaining_time_details(
748        &self,
749        realtime_usec: u64,
750        mut after: u64,
751        mut before: u64,
752    ) -> (u64, u64, u64, u64) {
753        if realtime_usec <= after {
754            after = realtime_usec.saturating_sub(1);
755        }
756        if realtime_usec >= before {
757            before = realtime_usec.saturating_add(1);
758        }
759        if before <= after {
760            before = after.saturating_add(1);
761        }
762        let (remaining_start, remaining_end) = match self.direction {
763            Direction::Forward => (realtime_usec, before),
764            Direction::Backward => (after, realtime_usec),
765        };
766        (
767            before.saturating_sub(after).max(1),
768            remaining_end.saturating_sub(remaining_start),
769            remaining_start,
770            remaining_end,
771        )
772    }
773}
774
775pub(crate) fn histogram_bucket_count_for_query(query: &ExplorerQuery) -> Option<usize> {
776    query
777        .histogram
778        .as_deref()
779        .map(|field| new_histogram(field, query).buckets.len())
780}
781
/// Per-row scratch results. NOTE(review): field meanings below are inferred
/// from names; the code that fills this struct is outside this section.
#[derive(Default)]
struct RowScan {
    /// Row timestamp, if one was found.
    timestamp: Option<u64>,
    /// Presumably `true` when a positive FTS pattern matched.
    fts_matches: bool,
    /// Presumably `true` when a negative FTS pattern matched.
    fts_negative_match: bool,
    /// Field names collected from the row.
    column_fields: Vec<Vec<u8>>,
}
789
/// Role bit flags passed to `ExplorerAccumulator::add_field`. A field may
/// presumably carry several roles at once (they are distinct bits).
/// Field is a user-requested facet.
const FACET_PUBLIC: u8 = 1 << 0;
/// Field is the histogram field.
const FACET_HISTOGRAM: u8 = 1 << 1;
/// Field is `_SOURCE_REALTIME_TIMESTAMP`.
const FACET_SOURCE_REALTIME: u8 = 1 << 2;
793
/// Cached classification of a data object offset (see `OffsetClassCache`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OffsetClass {
    Irrelevant,
    FtsMatch,
    FtsNegativeMatch,
    Value(usize),
}

impl OffsetClass {
    // Packed encodings stored in cache slots. Encodings start at 1, so
    // `to_raw` never produces 0.
    const IRRELEVANT_RAW: usize = 1;
    const FTS_MATCH_RAW: usize = 2;
    const FTS_NEGATIVE_MATCH_RAW: usize = 3;
    const VALUE_BASE: usize = 4;

    /// Packs the class into one `usize`. `Value(i)` maps to `4 + i`, saturating
    /// at `usize::MAX`.
    fn to_raw(self) -> usize {
        match self {
            Self::Value(index) => index.saturating_add(Self::VALUE_BASE),
            Self::FtsNegativeMatch => Self::FTS_NEGATIVE_MATCH_RAW,
            Self::FtsMatch => Self::FTS_MATCH_RAW,
            Self::Irrelevant => Self::IRRELEVANT_RAW,
        }
    }

    /// Inverse of `to_raw`. Any value that is not one of the three fixed codes
    /// decodes as `Value(raw - 4)`, saturating at 0.
    fn from_raw(raw: usize) -> Self {
        if raw == Self::IRRELEVANT_RAW {
            Self::Irrelevant
        } else if raw == Self::FTS_MATCH_RAW {
            Self::FtsMatch
        } else if raw == Self::FTS_NEGATIVE_MATCH_RAW {
            Self::FtsNegativeMatch
        } else {
            Self::Value(raw.saturating_sub(Self::VALUE_BASE))
        }
    }
}
826
827#[derive(Clone, Copy, Debug, Default)]
828struct OffsetClassSlot {
829    offset: u64,
830    class: usize,
831}
832
833#[derive(Debug)]
834struct OffsetClassCache {
835    slots: Vec<OffsetClassSlot>,
836    len: usize,
837}
838
839impl Default for OffsetClassCache {
840    fn default() -> Self {
841        Self {
842            slots: vec![OffsetClassSlot::default(); 256],
843            len: 0,
844        }
845    }
846}
847
848impl OffsetClassCache {
849    fn lookup(&self, offset: NonZeroU64) -> Option<OffsetClass> {
850        let mask = self.slots.len().saturating_sub(1);
851        let mut index = offset_slot(offset.get()) & mask;
852        for _ in 0..self.slots.len() {
853            let slot = self.slots[index];
854            if slot.offset == 0 {
855                return None;
856            }
857            if slot.offset == offset.get() {
858                return Some(OffsetClass::from_raw(slot.class));
859            }
860            index = (index + 1) & mask;
861        }
862        None
863    }
864
865    fn insert(&mut self, offset: NonZeroU64, class: OffsetClass) {
866        if (self.len + 1).saturating_mul(4) >= self.slots.len().saturating_mul(3) {
867            self.grow();
868        }
869        self.insert_raw(offset.get(), class.to_raw());
870    }
871
872    fn grow(&mut self) {
873        let new_len = self.slots.len().saturating_mul(2).max(256);
874        let old = std::mem::replace(&mut self.slots, vec![OffsetClassSlot::default(); new_len]);
875        self.len = 0;
876        for slot in old {
877            if slot.offset != 0 {
878                self.insert_raw(slot.offset, slot.class);
879            }
880        }
881    }
882
883    fn insert_raw(&mut self, offset: u64, class: usize) {
884        let mask = self.slots.len().saturating_sub(1);
885        let mut index = offset_slot(offset) & mask;
886        loop {
887            if self.slots[index].offset == 0 {
888                self.slots[index] = OffsetClassSlot { offset, class };
889                self.len += 1;
890                return;
891            }
892            if self.slots[index].offset == offset {
893                self.slots[index].class = class;
894                return;
895            }
896            index = (index + 1) & mask;
897        }
898    }
899}
900
/// Hashes a data-object offset to a probe start position for `OffsetClassCache`.
///
/// The low three bits are discarded first (NOTE(review): presumably because
/// offsets are 8-byte aligned — confirm). The rest is mixed with one
/// xor-shift / multiply / xor-shift round using the first constant of
/// MurmurHash3's 64-bit finalizer. The 64-bit result is truncated to `usize`
/// on narrower targets.
fn offset_slot(offset: u64) -> usize {
    let key = offset >> 3;
    let mixed = (key ^ (key >> 33)).wrapping_mul(0xff51afd7ed558ccd);
    (mixed ^ (mixed >> 33)) as usize
}
908
909struct ExplorerAccumulator {
910    field_lookup: HashMap<Vec<u8>, usize>,
911    fields: Vec<Vec<u8>>,
912    flags: Vec<u8>,
913    last_seen_row_ids: Vec<u64>,
914    unset_counts: Vec<u64>,
915    values_by_field: Vec<Vec<usize>>,
916    value_counts: Vec<u64>,
917    value_field_indices: Vec<usize>,
918    value_labels: Vec<Vec<u8>>,
919    value_fts_matches: Vec<bool>,
920    value_source_realtime: Vec<Option<u64>>,
921    value_histogram_buckets: Vec<Option<Vec<u64>>>,
922    field_histogram_unset_buckets: Vec<Option<Vec<u64>>>,
923    offset_cache: OffsetClassCache,
924    histogram_start_realtime_usec: u64,
925    histogram_bucket_width_usec: u64,
926    histogram_bucket_count: usize,
927    required_identity_count: usize,
928}
929
930impl ExplorerAccumulator {
931    fn for_main(query: &ExplorerQuery, histogram: Option<&ExplorerHistogram>) -> Self {
932        Self::for_combined(query, &[], histogram)
933    }
934
935    fn for_facets(
936        query: &ExplorerQuery,
937        facet_indices: &[usize],
938        include_source_realtime: bool,
939    ) -> Self {
940        let mut out = Self::new(None);
941        for facet_index in facet_indices {
942            if let Some(field) = query.facets.get(*facet_index) {
943                out.add_field(field, FACET_PUBLIC);
944            }
945        }
946        if include_source_realtime {
947            out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
948        }
949        out
950    }
951
952    fn for_combined(
953        query: &ExplorerQuery,
954        facet_indices: &[usize],
955        histogram: Option<&ExplorerHistogram>,
956    ) -> Self {
957        let mut out = Self::new(histogram);
958        if let Some(field) = &query.histogram {
959            out.add_field(field, FACET_HISTOGRAM);
960        }
961        for facet_index in facet_indices {
962            if let Some(field) = query.facets.get(*facet_index) {
963                out.add_field(field, FACET_PUBLIC);
964            }
965        }
966        if query_needs_source_realtime_main(query) || facet_pass_needs_source_realtime(query) {
967            out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
968        }
969        out
970    }
971
972    fn new(histogram: Option<&ExplorerHistogram>) -> Self {
973        Self {
974            field_lookup: HashMap::new(),
975            fields: Vec::new(),
976            flags: Vec::new(),
977            last_seen_row_ids: Vec::new(),
978            unset_counts: Vec::new(),
979            values_by_field: Vec::new(),
980            value_counts: Vec::new(),
981            value_field_indices: Vec::new(),
982            value_labels: Vec::new(),
983            value_fts_matches: Vec::new(),
984            value_source_realtime: Vec::new(),
985            value_histogram_buckets: Vec::new(),
986            field_histogram_unset_buckets: Vec::new(),
987            offset_cache: OffsetClassCache::default(),
988            histogram_start_realtime_usec: histogram
989                .and_then(|histogram| histogram.buckets.first())
990                .map(|bucket| bucket.start_realtime_usec)
991                .unwrap_or_default(),
992            histogram_bucket_width_usec: histogram
993                .and_then(|histogram| histogram.buckets.first())
994                .map(|bucket| {
995                    bucket
996                        .end_realtime_usec
997                        .saturating_sub(bucket.start_realtime_usec)
998                        .max(1)
999                })
1000                .unwrap_or(1),
1001            histogram_bucket_count: histogram
1002                .map(|histogram| histogram.buckets.len())
1003                .unwrap_or_default(),
1004            required_identity_count: 0,
1005        }
1006    }
1007
1008    fn add_field(&mut self, field: &[u8], flags: u8) {
1009        if let Some(index) = self.field_lookup.get(field).copied() {
1010            let had_required = self.flags[index] != 0;
1011            self.flags[index] |= flags;
1012            if flags & FACET_HISTOGRAM != 0 && self.field_histogram_unset_buckets[index].is_none() {
1013                self.field_histogram_unset_buckets[index] =
1014                    Some(vec![0; self.histogram_bucket_count]);
1015            }
1016            if !had_required && self.flags[index] != 0 {
1017                self.required_identity_count += 1;
1018            }
1019            return;
1020        }
1021
1022        let index = self.fields.len();
1023        self.field_lookup.insert(field.to_vec(), index);
1024        self.fields.push(field.to_vec());
1025        self.flags.push(flags);
1026        self.last_seen_row_ids.push(0);
1027        self.unset_counts.push(0);
1028        self.values_by_field.push(Vec::new());
1029        self.field_histogram_unset_buckets
1030            .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1031        if flags != 0 {
1032            self.required_identity_count += 1;
1033        }
1034    }
1035
1036    fn add_value(
1037        &mut self,
1038        field_index: usize,
1039        _data_offset: NonZeroU64,
1040        value: &[u8],
1041        fts_matches: bool,
1042    ) -> usize {
1043        let value_index = self.value_counts.len();
1044        let flags = self.flags[field_index];
1045        self.value_counts.push(0);
1046        self.value_field_indices.push(field_index);
1047        self.value_labels.push(value.to_vec());
1048        self.value_fts_matches.push(fts_matches);
1049        self.value_source_realtime
1050            .push(if flags & FACET_SOURCE_REALTIME != 0 {
1051                parse_source_realtime(value)
1052            } else {
1053                None
1054            });
1055        self.value_histogram_buckets
1056            .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1057        self.values_by_field[field_index].push(value_index);
1058        value_index
1059    }
1060
1061    fn mark_field_seen(&mut self, field_index: usize, row_id: u64) -> bool {
1062        // Duplicate values for one field must not satisfy another required
1063        // field identity in first-value mode.
1064        if self.last_seen_row_ids[field_index] == row_id {
1065            return false;
1066        }
1067        self.last_seen_row_ids[field_index] = row_id;
1068        true
1069    }
1070
1071    fn apply_value(
1072        &mut self,
1073        value_index: usize,
1074        realtime_usec: Option<u64>,
1075        stats: &mut ExplorerStats,
1076    ) {
1077        let field_index = self.value_field_indices[value_index];
1078        let flags = self.flags[field_index];
1079        if flags & FACET_PUBLIC != 0 {
1080            self.value_counts[value_index] = self.value_counts[value_index].saturating_add(1);
1081            stats.facet_updates = stats.facet_updates.saturating_add(1);
1082        }
1083        if flags & FACET_HISTOGRAM != 0 {
1084            if let (Some(realtime_usec), Some(buckets)) = (
1085                realtime_usec,
1086                self.value_histogram_buckets[value_index].as_mut(),
1087            ) {
1088                if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1089                    realtime_usec,
1090                    self.histogram_start_realtime_usec,
1091                    self.histogram_bucket_width_usec,
1092                    buckets.len(),
1093                ) {
1094                    buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1095                    stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1096                }
1097            }
1098        }
1099    }
1100
1101    fn finish_facet_row(&mut self, row_id: u64, stats: &mut ExplorerStats) {
1102        for field_index in 0..self.fields.len() {
1103            if self.flags[field_index] & FACET_PUBLIC == 0 {
1104                continue;
1105            }
1106            if self.last_seen_row_ids[field_index] != row_id {
1107                self.unset_counts[field_index] = self.unset_counts[field_index].saturating_add(1);
1108                stats.facet_updates = stats.facet_updates.saturating_add(1);
1109            }
1110        }
1111    }
1112
1113    fn finish_histogram_row(&mut self, row_id: u64, realtime_usec: u64, stats: &mut ExplorerStats) {
1114        for field_index in 0..self.fields.len() {
1115            if self.flags[field_index] & FACET_HISTOGRAM == 0 {
1116                continue;
1117            }
1118            if self.last_seen_row_ids[field_index] == row_id {
1119                continue;
1120            }
1121            let Some(buckets) = self.field_histogram_unset_buckets[field_index].as_mut() else {
1122                continue;
1123            };
1124            if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1125                realtime_usec,
1126                self.histogram_start_realtime_usec,
1127                self.histogram_bucket_width_usec,
1128                buckets.len(),
1129            ) {
1130                buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1131                stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1132            }
1133        }
1134    }
1135
1136    fn finish_facets(&self, result: &mut ExplorerResult) {
1137        for field_index in 0..self.fields.len() {
1138            if self.flags[field_index] & FACET_PUBLIC == 0 {
1139                continue;
1140            }
1141            let mut values = HashMap::new();
1142            for value_index in &self.values_by_field[field_index] {
1143                let count = self.value_counts[*value_index];
1144                if count != 0 {
1145                    increment_counter_by(&mut values, &self.value_labels[*value_index], count);
1146                }
1147            }
1148            if self.unset_counts[field_index] != 0 {
1149                increment_counter_by(&mut values, UNSET_VALUE, self.unset_counts[field_index]);
1150            }
1151            result
1152                .facets
1153                .insert(self.fields[field_index].clone(), values);
1154        }
1155    }
1156
1157    fn finish_histogram(&self, histogram: Option<&mut ExplorerHistogram>) {
1158        let Some(histogram) = histogram else {
1159            return;
1160        };
1161        for buckets in self.field_histogram_unset_buckets.iter().flatten() {
1162            for (bucket_index, count) in buckets.iter().enumerate() {
1163                if *count == 0 {
1164                    continue;
1165                }
1166                if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1167                    increment_counter_by(&mut bucket.values, UNSET_VALUE, *count);
1168                }
1169            }
1170        }
1171        for value_index in 0..self.value_histogram_buckets.len() {
1172            let Some(buckets) = &self.value_histogram_buckets[value_index] else {
1173                continue;
1174            };
1175            for (bucket_index, count) in buckets.iter().enumerate() {
1176                if *count == 0 {
1177                    continue;
1178                }
1179                if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1180                    increment_counter_by(
1181                        &mut bucket.values,
1182                        &self.value_labels[value_index],
1183                        *count,
1184                    );
1185                }
1186            }
1187        }
1188    }
1189}
1190
/// Interprets `value` as a proportion, clamped to at most `1.0`.
///
/// Returns `None` for zero, negative, NaN or infinite inputs.
fn bounded_positive_proportion(value: f64) -> Option<f64> {
    (value.is_finite() && value > 0.0).then(|| value.min(1.0))
}
1197
1198impl FileReader {
1199    pub fn explore(&mut self, query: &ExplorerQuery) -> Result<ExplorerResult> {
1200        self.explore_with_strategy(query, ExplorerStrategy::Traversal)
1201    }
1202
1203    pub fn explore_with_strategy(
1204        &mut self,
1205        query: &ExplorerQuery,
1206        strategy: ExplorerStrategy,
1207    ) -> Result<ExplorerResult> {
1208        self.explore_with_strategy_and_payload_mode(query, strategy, ExplorerRowPayloadMode::Expand)
1209    }
1210
1211    pub fn explore_with_strategy_and_control(
1212        &mut self,
1213        query: &ExplorerQuery,
1214        strategy: ExplorerStrategy,
1215        control: &mut ExplorerControl<'_>,
1216    ) -> Result<ExplorerResult> {
1217        validate_no_debug_column_collection(query)?;
1218        self.explore_with_strategy_and_payload_mode_unchecked(
1219            query,
1220            strategy,
1221            ExplorerRowPayloadMode::Expand,
1222            Some(control),
1223        )
1224    }
1225
1226    #[cfg(test)]
1227    pub(crate) fn explore_with_strategy_cursor_rows(
1228        &mut self,
1229        query: &ExplorerQuery,
1230        strategy: ExplorerStrategy,
1231    ) -> Result<ExplorerResult> {
1232        self.explore_with_strategy_and_payload_mode(
1233            query,
1234            strategy,
1235            ExplorerRowPayloadMode::CursorOnly,
1236        )
1237    }
1238
1239    pub(crate) fn explore_with_strategy_cursor_rows_controlled(
1240        &mut self,
1241        query: &ExplorerQuery,
1242        strategy: ExplorerStrategy,
1243        control: &mut ExplorerControl<'_>,
1244    ) -> Result<ExplorerResult> {
1245        validate_no_debug_column_collection(query)?;
1246        self.explore_with_strategy_and_payload_mode_unchecked(
1247            query,
1248            strategy,
1249            ExplorerRowPayloadMode::CursorOnly,
1250            Some(control),
1251        )
1252    }
1253
1254    fn explore_with_strategy_and_payload_mode(
1255        &mut self,
1256        query: &ExplorerQuery,
1257        strategy: ExplorerStrategy,
1258        row_payload_mode: ExplorerRowPayloadMode,
1259    ) -> Result<ExplorerResult> {
1260        validate_no_debug_column_collection(query)?;
1261        self.explore_with_strategy_and_payload_mode_unchecked(
1262            query,
1263            strategy,
1264            row_payload_mode,
1265            None,
1266        )
1267    }
1268
1269    fn explore_with_strategy_and_payload_mode_unchecked(
1270        &mut self,
1271        query: &ExplorerQuery,
1272        strategy: ExplorerStrategy,
1273        row_payload_mode: ExplorerRowPayloadMode,
1274        mut control: Option<&mut ExplorerControl<'_>>,
1275    ) -> Result<ExplorerResult> {
1276        match strategy {
1277            ExplorerStrategy::Traversal => {
1278                self.explore_traversal(query, row_payload_mode, control.as_deref_mut())
1279            }
1280            ExplorerStrategy::Index => {
1281                self.explore_indexed(query, row_payload_mode, control.as_deref_mut())
1282            }
1283            ExplorerStrategy::Compare => self.explore_compare(query, row_payload_mode),
1284        }
1285    }
1286
1287    fn explore_traversal(
1288        &mut self,
1289        query: &ExplorerQuery,
1290        row_payload_mode: ExplorerRowPayloadMode,
1291        mut control: Option<&mut ExplorerControl<'_>>,
1292    ) -> Result<ExplorerResult> {
1293        validate_query(query)?;
1294        let mut result = explorer_result_for_query(query);
1295        let facet_groups = facet_pass_groups(query);
1296        if can_run_combined_explorer_pass(&facet_groups) {
1297            self.explore_traversal_combined(
1298                query,
1299                row_payload_mode,
1300                &mut result,
1301                &facet_groups,
1302                control.as_deref_mut(),
1303            )?;
1304        } else {
1305            self.explore_traversal_split(
1306                query,
1307                row_payload_mode,
1308                &mut result,
1309                facet_groups,
1310                control.as_deref_mut(),
1311            )?;
1312        }
1313        self.configure_explorer_filters(query, None)?;
1314        Ok(result)
1315    }
1316
1317    fn explore_traversal_combined(
1318        &mut self,
1319        query: &ExplorerQuery,
1320        row_payload_mode: ExplorerRowPayloadMode,
1321        result: &mut ExplorerResult,
1322        facet_groups: &[FacetPassGroup],
1323        control: Option<&mut ExplorerControl<'_>>,
1324    ) -> Result<()> {
1325        let facet_indices = combined_facet_indices(facet_groups);
1326        if query_needs_main_pass(query) || !facet_indices.is_empty() {
1327            self.configure_explorer_filters(query, None)?;
1328            let mut accumulator =
1329                ExplorerAccumulator::for_combined(query, &facet_indices, result.histogram.as_ref());
1330            self.scan_explorer_combined(
1331                query,
1332                &mut accumulator,
1333                result,
1334                !facet_indices.is_empty(),
1335                row_payload_mode,
1336                control,
1337            )?;
1338            accumulator.finish_facets(result);
1339            accumulator.finish_histogram(result.histogram.as_mut());
1340        }
1341        Ok(())
1342    }
1343
1344    fn explore_traversal_split(
1345        &mut self,
1346        query: &ExplorerQuery,
1347        row_payload_mode: ExplorerRowPayloadMode,
1348        result: &mut ExplorerResult,
1349        facet_groups: Vec<FacetPassGroup>,
1350        mut control: Option<&mut ExplorerControl<'_>>,
1351    ) -> Result<()> {
1352        if query_needs_main_pass(query) {
1353            self.configure_explorer_filters(query, None)?;
1354            let mut accumulator = ExplorerAccumulator::for_main(query, result.histogram.as_ref());
1355            self.scan_explorer_main(
1356                query,
1357                &mut accumulator,
1358                result,
1359                row_payload_mode,
1360                control.as_deref_mut(),
1361            )?;
1362            accumulator.finish_histogram(result.histogram.as_mut());
1363        }
1364
1365        for group in facet_groups {
1366            if explorer_control_stopped(control.as_deref()) {
1367                break;
1368            }
1369            self.configure_explorer_filters(query, group.excluded_field.as_deref())?;
1370            let mut accumulator = ExplorerAccumulator::for_facets(
1371                query,
1372                &group.facet_indices,
1373                facet_pass_needs_source_realtime(query),
1374            );
1375            self.scan_explorer_facet(
1376                query,
1377                &mut accumulator,
1378                &mut result.stats,
1379                control.as_deref_mut(),
1380            )?;
1381            accumulator.finish_facets(result);
1382        }
1383        Ok(())
1384    }
1385
1386    fn explore_compare(
1387        &mut self,
1388        query: &ExplorerQuery,
1389        row_payload_mode: ExplorerRowPayloadMode,
1390    ) -> Result<ExplorerResult> {
1391        let traversal_started = Instant::now();
1392        let traversal = self.explore_traversal(query, row_payload_mode, None)?;
1393        let traversal_duration = traversal_started.elapsed();
1394
1395        let index_started = Instant::now();
1396        let mut indexed = self.explore_indexed(query, row_payload_mode, None)?;
1397        let index_duration = index_started.elapsed();
1398
1399        if !explorer_outputs_match(&traversal, &indexed) {
1400            return Err(SdkError::VerificationError(
1401                "indexed explorer output differs from traversal explorer output".to_string(),
1402            ));
1403        }
1404        indexed.comparison = Some(ExplorerComparison {
1405            traversal_duration,
1406            index_duration,
1407            traversal_stats: traversal.stats,
1408            index_stats: indexed.stats.clone(),
1409        });
1410        Ok(indexed)
1411    }
1412
1413    fn explore_indexed(
1414        &mut self,
1415        query: &ExplorerQuery,
1416        row_payload_mode: ExplorerRowPayloadMode,
1417        mut control: Option<&mut ExplorerControl<'_>>,
1418    ) -> Result<ExplorerResult> {
1419        validate_query(query)?;
1420        validate_indexed_query(query)?;
1421        let mut result = explorer_result_for_query(query);
1422        self.indexed_collect_rows(query, row_payload_mode, &mut result, control.as_deref_mut())?;
1423        self.indexed_collect_facets(query, &mut result, control.as_deref())?;
1424        self.indexed_collect_histogram(query, &mut result, control.as_deref())?;
1425        self.configure_explorer_filters(query, None)?;
1426        Ok(result)
1427    }
1428
1429    fn indexed_collect_rows(
1430        &mut self,
1431        query: &ExplorerQuery,
1432        row_payload_mode: ExplorerRowPayloadMode,
1433        result: &mut ExplorerResult,
1434        control: Option<&mut ExplorerControl<'_>>,
1435    ) -> Result<()> {
1436        if query.limit == 0 {
1437            return Ok(());
1438        }
1439        let mut row_query = query.clone();
1440        row_query.facets.clear();
1441        row_query.histogram = None;
1442        self.configure_explorer_filters(&row_query, None)?;
1443        let mut accumulator = ExplorerAccumulator::for_main(&row_query, None);
1444        self.scan_explorer_main(
1445            &row_query,
1446            &mut accumulator,
1447            result,
1448            row_payload_mode,
1449            control,
1450        )
1451    }
1452
1453    fn indexed_collect_facets(
1454        &mut self,
1455        query: &ExplorerQuery,
1456        result: &mut ExplorerResult,
1457        control: Option<&ExplorerControl<'_>>,
1458    ) -> Result<()> {
1459        if explorer_control_stopped(control) {
1460            return Ok(());
1461        }
1462        for group in facet_pass_groups(query) {
1463            let candidates = self.indexed_candidate_set(query, group.excluded_field.as_deref())?;
1464            self.inner.with_file(|file| {
1465                indexed_count_facet_group(file, query, &group, &candidates, result)
1466            })?;
1467        }
1468        Ok(())
1469    }
1470
1471    fn indexed_collect_histogram(
1472        &mut self,
1473        query: &ExplorerQuery,
1474        result: &mut ExplorerResult,
1475        control: Option<&ExplorerControl<'_>>,
1476    ) -> Result<()> {
1477        if query.histogram.is_none() || explorer_control_stopped(control) {
1478            return Ok(());
1479        }
1480        let candidates = self.indexed_candidate_set(query, None)?;
1481        self.inner
1482            .with_file(|file| indexed_count_histogram(file, query, &candidates, result))
1483    }
1484
1485    fn indexed_candidate_set(
1486        &mut self,
1487        query: &ExplorerQuery,
1488        excluded_field: Option<&[u8]>,
1489    ) -> Result<IndexedCandidateSet> {
1490        if query.filters.is_empty()
1491            && query.after_realtime_usec.is_none()
1492            && query.before_realtime_usec.is_none()
1493        {
1494            let count = self
1495                .inner
1496                .with_file(|file| file.journal_header_ref().n_entries);
1497            return Ok(IndexedCandidateSet::All { count });
1498        }
1499
1500        self.configure_explorer_filters(query, excluded_field)?;
1501        self.seek_for_explorer(query);
1502        let mut offsets = HashSet::new();
1503        while self.step_for_explorer(query.direction)? {
1504            let Some(metadata) = self.row.metadata() else {
1505                continue;
1506            };
1507            let commit_realtime = metadata.realtime;
1508            if stop_by_commit_time(query, commit_realtime) {
1509                break;
1510            }
1511            if !timestamp_in_range(query, commit_realtime) {
1512                continue;
1513            }
1514            if let Some(entry_offset) = self.row.entry_offset() {
1515                offsets.insert(entry_offset);
1516            }
1517        }
1518        Ok(IndexedCandidateSet::Set {
1519            count: offsets.len() as u64,
1520            offsets,
1521        })
1522    }
1523
1524    fn configure_explorer_filters(
1525        &mut self,
1526        query: &ExplorerQuery,
1527        excluded_field: Option<&[u8]>,
1528    ) -> Result<()> {
1529        self.flush_matches();
1530        for filter in &query.filters {
1531            if excluded_field.is_some_and(|field| field == filter.field.as_slice()) {
1532                continue;
1533            }
1534            if filter.values.is_empty() {
1535                continue;
1536            }
1537            for value in &filter.values {
1538                let payload = payload_from_parts(&filter.field, value);
1539                self.add_match(&payload);
1540            }
1541        }
1542        Ok(())
1543    }
1544
1545    fn next_explorer_row_frame(
1546        &mut self,
1547        query: &ExplorerQuery,
1548        rows_seen: &mut u64,
1549        stats: &ExplorerStats,
1550        control: Option<&mut ExplorerControl<'_>>,
1551    ) -> Result<ExplorerLoopStep> {
1552        if !self.step_for_explorer(query.direction)? {
1553            return Ok(ExplorerLoopStep::Stop);
1554        }
1555        *rows_seen = rows_seen.saturating_add(1);
1556        if control.is_some_and(|control| control.should_stop_after_rows(*rows_seen, stats)) {
1557            return Ok(ExplorerLoopStep::Stop);
1558        }
1559        let Some(metadata) = self.row.metadata() else {
1560            return Ok(ExplorerLoopStep::Skip);
1561        };
1562        if stop_by_commit_time(query, metadata.realtime) {
1563            return Ok(ExplorerLoopStep::Stop);
1564        }
1565        if skip_by_commit_time(query, metadata.realtime) {
1566            return Ok(ExplorerLoopStep::Skip);
1567        }
1568        Ok(ExplorerLoopStep::Row(ExplorerRowFrame {
1569            commit_realtime: metadata.realtime,
1570            seqnum: metadata.seqnum,
1571        }))
1572    }
1573
1574    fn scan_row_data_or_default(
1575        &mut self,
1576        query: &ExplorerQuery,
1577        accumulator: &mut ExplorerAccumulator,
1578        row_id: &mut u64,
1579        deferred_values: &mut Vec<usize>,
1580        stats: &mut ExplorerStats,
1581    ) -> Result<RowScan> {
1582        if accumulator.required_identity_count == 0 && !query_has_fts(query) {
1583            stats.rows_examined = stats.rows_examined.saturating_add(1);
1584            return Ok(RowScan::default());
1585        }
1586        *row_id = row_id.saturating_add(1);
1587        deferred_values.clear();
1588        self.scan_current_row(
1589            query,
1590            accumulator,
1591            *row_id,
1592            ScanApply::Deferred(deferred_values),
1593            stats,
1594        )
1595    }
1596
1597    fn accepted_effective_realtime(
1598        query: &ExplorerQuery,
1599        scan: &RowScan,
1600        commit_realtime: u64,
1601        stats: &mut ExplorerStats,
1602        control: Option<&mut ExplorerControl<'_>>,
1603    ) -> Option<u64> {
1604        let mut effective_realtime = effective_realtime_from_scan(scan.timestamp, commit_realtime);
1605        record_source_realtime_delta(stats, scan.timestamp, commit_realtime);
1606        if let Some(control) = control {
1607            effective_realtime = control.adjust_realtime(effective_realtime);
1608        }
1609        (timestamp_in_range(query, effective_realtime) && !row_rejected_by_fts(query, scan))
1610            .then_some(effective_realtime)
1611    }
1612
1613    fn push_explorer_row_if_wanted(
1614        &mut self,
1615        query: &ExplorerQuery,
1616        result: &mut ExplorerResult,
1617        row_payload_mode: ExplorerRowPayloadMode,
1618        effective_realtime: u64,
1619    ) -> Result<()> {
1620        if row_within_anchor(query, effective_realtime) && result.rows.len() < query.limit {
1621            result.rows.push(self.current_explorer_row(
1622                effective_realtime,
1623                &mut result.stats,
1624                row_payload_mode,
1625            )?);
1626        }
1627        Ok(())
1628    }
1629
1630    fn apply_main_scanned_row(
1631        &mut self,
1632        query: &ExplorerQuery,
1633        accumulator: &mut ExplorerAccumulator,
1634        result: &mut ExplorerResult,
1635        row_payload_mode: ExplorerRowPayloadMode,
1636        scanned: MainScannedRow<'_>,
1637        control: Option<&mut ExplorerControl<'_>>,
1638    ) -> Result<bool> {
1639        if query.debug_collect_column_fields_by_row_traversal {
1640            result.column_fields.extend(scanned.scan.column_fields);
1641        }
1642        record_last_realtime(&mut result.stats, scanned.commit_realtime);
1643        result.stats.rows_matched = result.stats.rows_matched.saturating_add(1);
1644        let stop_after_matched_row = control.is_some_and(|control| {
1645            control.emit_matched_row(scanned.effective_realtime, result.stats.rows_matched)
1646        });
1647        for value_index in scanned.deferred_values {
1648            accumulator.apply_value(
1649                *value_index,
1650                Some(scanned.effective_realtime),
1651                &mut result.stats,
1652            );
1653        }
1654        accumulator.finish_histogram_row(
1655            scanned.row_id,
1656            scanned.effective_realtime,
1657            &mut result.stats,
1658        );
1659        self.push_explorer_row_if_wanted(
1660            query,
1661            result,
1662            row_payload_mode,
1663            scanned.effective_realtime,
1664        )?;
1665        Ok(stop_after_matched_row
1666            || should_stop_when_rows_full(
1667                query,
1668                &result.rows,
1669                scanned.effective_realtime,
1670                result.stats.rows_matched,
1671            ))
1672    }
1673
1674    fn sampling_state_for_combined(
1675        query: &ExplorerQuery,
1676        result: &ExplorerResult,
1677        control: Option<&mut ExplorerControl<'_>>,
1678    ) -> Option<ExplorerSamplingState> {
1679        let sampling = ExplorerSamplingState::for_query(
1680            query,
1681            result
1682                .histogram
1683                .as_ref()
1684                .map(|histogram| histogram.buckets.len()),
1685        );
1686        if let Some(control) = control {
1687            if let (Some(shared_sampling), Some(file_sampling)) =
1688                (control.sampling.as_deref_mut(), query.sampling)
1689            {
1690                shared_sampling.begin_file(file_sampling);
1691            }
1692        }
1693        sampling
1694    }
1695
1696    fn combined_sampling_decision(
1697        query: &ExplorerQuery,
1698        rows: &[ExplorerRow],
1699        frame: ExplorerRowFrame,
1700        sampling: &mut Option<ExplorerSamplingState>,
1701        mut control: Option<&mut ExplorerControl<'_>>,
1702    ) -> Option<ExplorerSamplingDecision> {
1703        let candidate_to_keep = if let Some(control) = control.as_deref_mut() {
1704            control.candidate_row.as_deref_mut().map_or_else(
1705                || row_candidate_to_keep(query, rows, frame.commit_realtime),
1706                |candidate_row| candidate_row(frame.commit_realtime),
1707            )
1708        } else {
1709            row_candidate_to_keep(query, rows, frame.commit_realtime)
1710        };
1711        if let Some(control) = control {
1712            if let Some(shared_sampling) = control.sampling.as_deref_mut() {
1713                return Some(shared_sampling.decide(
1714                    frame.commit_realtime,
1715                    frame.seqnum,
1716                    candidate_to_keep,
1717                ));
1718            }
1719        }
1720        sampling
1721            .as_mut()
1722            .map(|sampling| sampling.decide(frame.commit_realtime, frame.seqnum, candidate_to_keep))
1723    }
1724
1725    fn apply_combined_sampling_decision(
1726        decision: ExplorerSamplingDecision,
1727        mode: CombinedScanMode,
1728        result: &mut ExplorerResult,
1729        frame: ExplorerRowFrame,
1730    ) -> SamplingRowAction {
1731        match decision {
1732            ExplorerSamplingDecision::Full { sampled } => {
1733                if sampled {
1734                    result.stats.sampling_sampled = result.stats.sampling_sampled.saturating_add(1);
1735                }
1736                SamplingRowAction::Scan
1737            }
1738            ExplorerSamplingDecision::SkipFields => {
1739                record_combined_unsampled_row(
1740                    &mut result.stats,
1741                    mode,
1742                    frame.commit_realtime,
1743                    1,
1744                    true,
1745                );
1746                add_special_histogram_value(
1747                    result.histogram.as_mut(),
1748                    frame.commit_realtime,
1749                    EXPLORER_UNSAMPLED_VALUE,
1750                    1,
1751                    &mut result.stats,
1752                );
1753                SamplingRowAction::Skip
1754            }
1755            ExplorerSamplingDecision::StopAndEstimate {
1756                remaining_rows,
1757                from_realtime_usec,
1758                to_realtime_usec,
1759            } => {
1760                record_combined_unsampled_row(
1761                    &mut result.stats,
1762                    mode,
1763                    frame.commit_realtime,
1764                    remaining_rows,
1765                    false,
1766                );
1767                result.stats.rows_estimated =
1768                    result.stats.rows_estimated.saturating_add(remaining_rows);
1769                result.stats.sampling_estimated = result
1770                    .stats
1771                    .sampling_estimated
1772                    .saturating_add(remaining_rows);
1773                add_estimated_histogram_range(
1774                    result.histogram.as_mut(),
1775                    from_realtime_usec,
1776                    to_realtime_usec,
1777                    remaining_rows,
1778                    &mut result.stats,
1779                );
1780                SamplingRowAction::Stop
1781            }
1782        }
1783    }
1784
1785    fn apply_combined_scanned_row(
1786        &mut self,
1787        query: &ExplorerQuery,
1788        accumulator: &mut ExplorerAccumulator,
1789        result: &mut ExplorerResult,
1790        row_payload_mode: ExplorerRowPayloadMode,
1791        mode: CombinedScanMode,
1792        scanned: MainScannedRow<'_>,
1793        control: Option<&mut ExplorerControl<'_>>,
1794    ) -> Result<bool> {
1795        if query.debug_collect_column_fields_by_row_traversal {
1796            result.column_fields.extend(scanned.scan.column_fields);
1797        }
1798        record_last_realtime(&mut result.stats, scanned.commit_realtime);
1799        let stop_after_matched_row = update_combined_matched_stats(
1800            &mut result.stats,
1801            mode,
1802            scanned.effective_realtime,
1803            control,
1804        );
1805        let value_realtime = query
1806            .histogram
1807            .is_some()
1808            .then_some(scanned.effective_realtime);
1809        for value_index in scanned.deferred_values {
1810            accumulator.apply_value(*value_index, value_realtime, &mut result.stats);
1811        }
1812        if query.histogram.is_some() {
1813            accumulator.finish_histogram_row(
1814                scanned.row_id,
1815                scanned.effective_realtime,
1816                &mut result.stats,
1817            );
1818        }
1819        if mode.include_facets {
1820            accumulator.finish_facet_row(scanned.row_id, &mut result.stats);
1821        }
1822        self.push_explorer_row_if_wanted(
1823            query,
1824            result,
1825            row_payload_mode,
1826            scanned.effective_realtime,
1827        )?;
1828        Ok(stop_after_matched_row
1829            || should_stop_when_rows_full(
1830                query,
1831                &result.rows,
1832                scanned.effective_realtime,
1833                result.stats.rows_matched,
1834            ))
1835    }
1836
1837    fn scan_explorer_main(
1838        &mut self,
1839        query: &ExplorerQuery,
1840        accumulator: &mut ExplorerAccumulator,
1841        result: &mut ExplorerResult,
1842        row_payload_mode: ExplorerRowPayloadMode,
1843        mut control: Option<&mut ExplorerControl<'_>>,
1844    ) -> Result<()> {
1845        self.seek_for_explorer(query);
1846        let mut row_id = 0u64;
1847        let mut rows_seen = 0u64;
1848        let mut deferred_values = Vec::new();
1849        loop {
1850            let frame = match self.next_explorer_row_frame(
1851                query,
1852                &mut rows_seen,
1853                &result.stats,
1854                control.as_deref_mut(),
1855            )? {
1856                ExplorerLoopStep::Stop => break,
1857                ExplorerLoopStep::Skip => continue,
1858                ExplorerLoopStep::Row(frame) => frame,
1859            };
1860            let scan = self.scan_row_data_or_default(
1861                query,
1862                accumulator,
1863                &mut row_id,
1864                &mut deferred_values,
1865                &mut result.stats,
1866            )?;
1867            let Some(effective_realtime) = Self::accepted_effective_realtime(
1868                query,
1869                &scan,
1870                frame.commit_realtime,
1871                &mut result.stats,
1872                control.as_deref_mut(),
1873            ) else {
1874                continue;
1875            };
1876            let scanned = MainScannedRow {
1877                row_id,
1878                commit_realtime: frame.commit_realtime,
1879                effective_realtime,
1880                scan,
1881                deferred_values: &deferred_values,
1882            };
1883            if self.apply_main_scanned_row(
1884                query,
1885                accumulator,
1886                result,
1887                row_payload_mode,
1888                scanned,
1889                control.as_deref_mut(),
1890            )? {
1891                break;
1892            }
1893        }
1894        result.stats.rows_returned = result.rows.len() as u64;
1895        Ok(())
1896    }
1897
1898    fn scan_explorer_combined(
1899        &mut self,
1900        query: &ExplorerQuery,
1901        accumulator: &mut ExplorerAccumulator,
1902        result: &mut ExplorerResult,
1903        include_facets: bool,
1904        row_payload_mode: ExplorerRowPayloadMode,
1905        mut control: Option<&mut ExplorerControl<'_>>,
1906    ) -> Result<()> {
1907        self.seek_for_explorer(query);
1908        let mode = CombinedScanMode {
1909            include_main: query_needs_main_pass(query),
1910            include_facets,
1911        };
1912        let mut row_id = 0u64;
1913        let mut rows_seen = 0u64;
1914        let mut deferred_values = Vec::new();
1915        let mut sampling = Self::sampling_state_for_combined(query, result, control.as_deref_mut());
1916        loop {
1917            let frame = match self.next_explorer_row_frame(
1918                query,
1919                &mut rows_seen,
1920                &result.stats,
1921                control.as_deref_mut(),
1922            )? {
1923                ExplorerLoopStep::Stop => break,
1924                ExplorerLoopStep::Skip => continue,
1925                ExplorerLoopStep::Row(frame) => frame,
1926            };
1927            if let Some(decision) = Self::combined_sampling_decision(
1928                query,
1929                &result.rows,
1930                frame,
1931                &mut sampling,
1932                control.as_deref_mut(),
1933            ) {
1934                match Self::apply_combined_sampling_decision(decision, mode, result, frame) {
1935                    SamplingRowAction::Scan => {}
1936                    SamplingRowAction::Skip => continue,
1937                    SamplingRowAction::Stop => break,
1938                }
1939            }
1940            let scan = self.scan_row_data_or_default(
1941                query,
1942                accumulator,
1943                &mut row_id,
1944                &mut deferred_values,
1945                &mut result.stats,
1946            )?;
1947            let Some(effective_realtime) = Self::accepted_effective_realtime(
1948                query,
1949                &scan,
1950                frame.commit_realtime,
1951                &mut result.stats,
1952                control.as_deref_mut(),
1953            ) else {
1954                continue;
1955            };
1956            let scanned = MainScannedRow {
1957                row_id,
1958                commit_realtime: frame.commit_realtime,
1959                effective_realtime,
1960                scan,
1961                deferred_values: &deferred_values,
1962            };
1963            if self.apply_combined_scanned_row(
1964                query,
1965                accumulator,
1966                result,
1967                row_payload_mode,
1968                mode,
1969                scanned,
1970                control.as_deref_mut(),
1971            )? {
1972                break;
1973            }
1974        }
1975        result.stats.rows_returned = result.rows.len() as u64;
1976        Ok(())
1977    }
1978
1979    fn scan_explorer_facet(
1980        &mut self,
1981        query: &ExplorerQuery,
1982        accumulator: &mut ExplorerAccumulator,
1983        stats: &mut ExplorerStats,
1984        mut control: Option<&mut ExplorerControl<'_>>,
1985    ) -> Result<()> {
1986        self.seek_for_explorer(query);
1987        let defer_apply = query.after_realtime_usec.is_some()
1988            || query.before_realtime_usec.is_some()
1989            || query_has_fts(query);
1990        let mut row_id = 0u64;
1991        let mut rows_seen = 0u64;
1992        let mut deferred_values = Vec::new();
1993        loop {
1994            let frame = match self.next_explorer_row_frame(
1995                query,
1996                &mut rows_seen,
1997                stats,
1998                control.as_deref_mut(),
1999            )? {
2000                ExplorerLoopStep::Stop => break,
2001                ExplorerLoopStep::Skip => continue,
2002                ExplorerLoopStep::Row(frame) => frame,
2003            };
2004            row_id = row_id.saturating_add(1);
2005            deferred_values.clear();
2006            let scan = if defer_apply {
2007                self.scan_current_row(
2008                    query,
2009                    accumulator,
2010                    row_id,
2011                    ScanApply::Deferred(&mut deferred_values),
2012                    stats,
2013                )?
2014            } else {
2015                self.scan_current_row(query, accumulator, row_id, ScanApply::Immediate, stats)?
2016            };
2017            if Self::accepted_effective_realtime(query, &scan, frame.commit_realtime, stats, None)
2018                .is_none()
2019            {
2020                continue;
2021            }
2022            record_last_realtime(stats, frame.commit_realtime);
2023            stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
2024            if defer_apply {
2025                for value_index in &deferred_values {
2026                    accumulator.apply_value(*value_index, None, stats);
2027                }
2028            }
2029            accumulator.finish_facet_row(row_id, stats);
2030        }
2031        Ok(())
2032    }
2033
2034    fn scan_current_row(
2035        &mut self,
2036        query: &ExplorerQuery,
2037        accumulator: &mut ExplorerAccumulator,
2038        row_id: u64,
2039        mut apply: ScanApply<'_>,
2040        stats: &mut ExplorerStats,
2041    ) -> Result<RowScan> {
2042        stats.rows_examined = stats.rows_examined.saturating_add(1);
2043        let mut out = RowScan::default();
2044        let mut state = RowScanState::new(query, accumulator);
2045
2046        let inner = &mut self.inner;
2047        let row = &mut self.row;
2048        inner.with_mut(|fields| {
2049            fields.reader.release_object_guards();
2050            row.restart_data()?;
2051            let result = (|| {
2052                for index in 0..row.data_offset_count() {
2053                    let Some(data_offset) = row.data_offset_at(index) else {
2054                        break;
2055                    };
2056                    stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2057                    let class = classify_data_for_accumulator(
2058                        fields.file,
2059                        row,
2060                        data_offset,
2061                        accumulator,
2062                        state.needs_fts,
2063                        query,
2064                        query
2065                            .debug_collect_column_fields_by_row_traversal
2066                            .then_some(&mut out.column_fields),
2067                        stats,
2068                    )?;
2069
2070                    handle_row_offset_class(
2071                        class,
2072                        accumulator,
2073                        row_id,
2074                        &mut state,
2075                        &mut out,
2076                        &mut apply,
2077                        stats,
2078                    );
2079                    if state.should_stop_row_scan() {
2080                        record_row_scan_early_stop(stats);
2081                        break;
2082                    }
2083                }
2084                Ok::<_, SdkError>(())
2085            })();
2086            row.reset_data_state(fields.file)?;
2087            result
2088        })?;
2089        Ok(out)
2090    }
2091
2092    fn seek_for_explorer(&mut self, query: &ExplorerQuery) {
2093        let anchor = if query.stop_when_rows_full {
2094            query.anchor
2095        } else {
2096            ExplorerAnchor::Auto
2097        };
2098        match query.direction {
2099            Direction::Forward => match anchor {
2100                ExplorerAnchor::Auto => {
2101                    if let Some(after) = query.after_realtime_usec {
2102                        self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2103                    } else {
2104                        self.seek_head();
2105                    }
2106                }
2107                ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2108                ExplorerAnchor::Tail => self.seek_tail(),
2109                ExplorerAnchor::Head => {
2110                    if let Some(after) = query.after_realtime_usec {
2111                        self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2112                    } else {
2113                        self.seek_head();
2114                    }
2115                }
2116            },
2117            Direction::Backward => match anchor {
2118                ExplorerAnchor::Auto => {
2119                    if let Some(before) = query.before_realtime_usec {
2120                        self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2121                    } else {
2122                        self.seek_tail();
2123                    }
2124                }
2125                ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2126                ExplorerAnchor::Head => self.seek_head(),
2127                ExplorerAnchor::Tail => {
2128                    if let Some(before) = query.before_realtime_usec {
2129                        self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2130                    } else {
2131                        self.seek_tail();
2132                    }
2133                }
2134            },
2135        }
2136    }
2137
2138    fn step_for_explorer(&mut self, direction: Direction) -> Result<bool> {
2139        match direction {
2140            Direction::Forward => self.next(),
2141            Direction::Backward => self.previous(),
2142        }
2143    }
2144
2145    fn current_explorer_row(
2146        &mut self,
2147        realtime_usec: u64,
2148        stats: &mut ExplorerStats,
2149        row_payload_mode: ExplorerRowPayloadMode,
2150    ) -> Result<ExplorerRow> {
2151        let cursor = self.get_cursor()?;
2152        let mut payloads = Vec::new();
2153        if row_payload_mode == ExplorerRowPayloadMode::Expand {
2154            self.collect_entry_payloads(&mut payloads)?;
2155            stats.returned_row_expansions = stats.returned_row_expansions.saturating_add(1);
2156        }
2157        Ok(ExplorerRow {
2158            realtime_usec,
2159            cursor,
2160            payloads,
2161        })
2162    }
2163}
2164
2165enum ScanApply<'a> {
2166    Immediate,
2167    Deferred(&'a mut Vec<usize>),
2168}
2169
2170#[derive(Debug, Clone, Copy)]
2171struct ExplorerRowFrame {
2172    commit_realtime: u64,
2173    seqnum: u64,
2174}
2175
2176enum ExplorerLoopStep {
2177    Stop,
2178    Skip,
2179    Row(ExplorerRowFrame),
2180}
2181
2182#[derive(Debug, Clone, Copy)]
2183struct CombinedScanMode {
2184    include_main: bool,
2185    include_facets: bool,
2186}
2187
2188struct MainScannedRow<'a> {
2189    row_id: u64,
2190    commit_realtime: u64,
2191    effective_realtime: u64,
2192    scan: RowScan,
2193    deferred_values: &'a [usize],
2194}
2195
2196struct RowScanState {
2197    use_first_value: bool,
2198    needs_fts: bool,
2199    collect_column_fields: bool,
2200    fields_missing_from_row: usize,
2201}
2202
2203impl RowScanState {
2204    fn new(query: &ExplorerQuery, accumulator: &ExplorerAccumulator) -> Self {
2205        let use_first_value = query.field_mode == ExplorerFieldMode::FirstValue;
2206        Self {
2207            use_first_value,
2208            needs_fts: query_has_fts(query),
2209            collect_column_fields: query.debug_collect_column_fields_by_row_traversal,
2210            fields_missing_from_row: if use_first_value {
2211                accumulator.required_identity_count
2212            } else {
2213                0
2214            },
2215        }
2216    }
2217
2218    fn should_stop_row_scan(&self) -> bool {
2219        self.use_first_value
2220            && !self.needs_fts
2221            && !self.collect_column_fields
2222            && self.fields_missing_from_row == 0
2223    }
2224}
2225
2226enum SamplingRowAction {
2227    Scan,
2228    Skip,
2229    Stop,
2230}
2231
2232enum IndexedCandidateSet {
2233    All {
2234        count: u64,
2235    },
2236    Set {
2237        count: u64,
2238        offsets: HashSet<NonZeroU64>,
2239    },
2240}
2241
2242impl IndexedCandidateSet {
2243    fn count(&self) -> u64 {
2244        match self {
2245            Self::All { count } | Self::Set { count, .. } => *count,
2246        }
2247    }
2248
2249    fn contains(&self, entry_offset: NonZeroU64) -> bool {
2250        match self {
2251            Self::All { .. } => true,
2252            Self::Set { offsets, .. } => offsets.contains(&entry_offset),
2253        }
2254    }
2255}
2256
2257struct FacetPassGroup {
2258    excluded_field: Option<Vec<u8>>,
2259    facet_indices: Vec<usize>,
2260}
2261
2262fn facet_pass_groups(query: &ExplorerQuery) -> Vec<FacetPassGroup> {
2263    let filter_fields: HashSet<&[u8]> = query
2264        .filters
2265        .iter()
2266        .map(|filter| filter.field.as_slice())
2267        .collect();
2268    let mut groups: Vec<FacetPassGroup> = Vec::new();
2269
2270    for (index, facet) in query.facets.iter().enumerate() {
2271        let excluded_field = (query.exclude_facet_field_filters
2272            && filter_fields.contains(facet.as_slice()))
2273        .then(|| facet.clone());
2274        if let Some(existing) = groups
2275            .iter_mut()
2276            .find(|group| group.excluded_field.as_deref() == excluded_field.as_deref())
2277        {
2278            existing.facet_indices.push(index);
2279        } else {
2280            groups.push(FacetPassGroup {
2281                excluded_field,
2282                facet_indices: vec![index],
2283            });
2284        }
2285    }
2286
2287    groups
2288}
2289
2290fn indexed_count_facet_group(
2291    file: &JournalFile<Mmap>,
2292    query: &ExplorerQuery,
2293    group: &FacetPassGroup,
2294    candidates: &IndexedCandidateSet,
2295    result: &mut ExplorerResult,
2296) -> Result<()> {
2297    result.stats.facet_rows_matched = result
2298        .stats
2299        .facet_rows_matched
2300        .saturating_add(candidates.count());
2301
2302    for facet_index in &group.facet_indices {
2303        let Some(field) = query.facets.get(*facet_index) else {
2304            continue;
2305        };
2306        let mut values = HashMap::new();
2307        let mut rows_with_field = HashSet::new();
2308        let mut decompressed = Vec::new();
2309
2310        for item in file.field_data_objects_with_offsets(field)? {
2311            let (_, data) = item?;
2312            let Some((value, cursor)) =
2313                indexed_value_and_cursor(&data, field, &mut decompressed, &mut result.stats)?
2314            else {
2315                continue;
2316            };
2317            drop(data);
2318
2319            let count = indexed_count_facet_entries(
2320                file,
2321                cursor,
2322                candidates,
2323                &mut rows_with_field,
2324                &mut result.stats,
2325            )?;
2326            if count == 0 {
2327                continue;
2328            }
2329            increment_counter_by(&mut values, &value, count);
2330            result.stats.facet_updates = result.stats.facet_updates.saturating_add(count);
2331        }
2332
2333        let unset = candidates
2334            .count()
2335            .saturating_sub(rows_with_field.len() as u64);
2336        if unset != 0 {
2337            increment_counter_by(&mut values, UNSET_VALUE, unset);
2338            result.stats.facet_updates = result.stats.facet_updates.saturating_add(unset);
2339        }
2340        result.facets.insert(field.clone(), values);
2341    }
2342
2343    Ok(())
2344}
2345
2346fn indexed_count_histogram(
2347    file: &JournalFile<Mmap>,
2348    query: &ExplorerQuery,
2349    candidates: &IndexedCandidateSet,
2350    result: &mut ExplorerResult,
2351) -> Result<()> {
2352    let Some(histogram) = result.histogram.as_mut() else {
2353        return Ok(());
2354    };
2355    let field = histogram.field.clone();
2356    let mut decompressed = Vec::new();
2357    let mut rows_with_field = HashSet::new();
2358
2359    for item in file.field_data_objects_with_offsets(&field)? {
2360        let (_, data) = item?;
2361        let Some((value, cursor)) =
2362            indexed_value_and_cursor(&data, &field, &mut decompressed, &mut result.stats)?
2363        else {
2364            continue;
2365        };
2366        drop(data);
2367
2368        indexed_count_histogram_entries(
2369            file,
2370            cursor,
2371            candidates,
2372            &value,
2373            histogram,
2374            query,
2375            &mut rows_with_field,
2376            &mut result.stats,
2377        )?;
2378    }
2379
2380    indexed_count_histogram_unset_entries(
2381        file,
2382        candidates,
2383        &rows_with_field,
2384        histogram,
2385        query,
2386        &mut result.stats,
2387    )?;
2388
2389    Ok(())
2390}
2391
2392fn indexed_value_and_cursor(
2393    data: &DataObject<&[u8]>,
2394    field: &[u8],
2395    decompressed: &mut Vec<u8>,
2396    stats: &mut ExplorerStats,
2397) -> Result<Option<(Vec<u8>, Option<InlinedCursor>)>> {
2398    stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2399    stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2400    let payload = if data.is_compressed() {
2401        decompressed.clear();
2402        let len = data.decompress(decompressed)?;
2403        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2404        &decompressed[..len]
2405    } else {
2406        data.raw_payload()
2407    };
2408
2409    let Some((payload_field, value)) = split_payload_bytes(payload) else {
2410        return Ok(None);
2411    };
2412    if payload_field != field {
2413        return Ok(None);
2414    }
2415    Ok(Some((value.to_vec(), data.inlined_cursor())))
2416}
2417
2418fn indexed_count_facet_entries(
2419    file: &JournalFile<Mmap>,
2420    cursor: Option<InlinedCursor>,
2421    candidates: &IndexedCandidateSet,
2422    rows_with_field: &mut HashSet<NonZeroU64>,
2423    stats: &mut ExplorerStats,
2424) -> Result<u64> {
2425    let mut count = 0u64;
2426    indexed_visit_entries(file, cursor, |entry_offset| {
2427        stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2428        if candidates.contains(entry_offset) {
2429            count = count.saturating_add(1);
2430            rows_with_field.insert(entry_offset);
2431        }
2432        Ok(())
2433    })?;
2434    Ok(count)
2435}
2436
2437fn indexed_count_histogram_entries(
2438    file: &JournalFile<Mmap>,
2439    cursor: Option<InlinedCursor>,
2440    candidates: &IndexedCandidateSet,
2441    value: &[u8],
2442    histogram: &mut ExplorerHistogram,
2443    query: &ExplorerQuery,
2444    rows_with_field: &mut HashSet<NonZeroU64>,
2445    stats: &mut ExplorerStats,
2446) -> Result<()> {
2447    let histogram_start = histogram
2448        .buckets
2449        .first()
2450        .map(|bucket| bucket.start_realtime_usec)
2451        .unwrap_or_default();
2452    let histogram_bucket_width = histogram
2453        .buckets
2454        .first()
2455        .map(|bucket| {
2456            bucket
2457                .end_realtime_usec
2458                .saturating_sub(bucket.start_realtime_usec)
2459                .max(1)
2460        })
2461        .unwrap_or(1);
2462    let histogram_bucket_count = histogram.buckets.len();
2463
2464    indexed_visit_entries(file, cursor, |entry_offset| {
2465        stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2466        if !candidates.contains(entry_offset) {
2467            return Ok(());
2468        }
2469        let entry = file.entry_ref(entry_offset)?;
2470        let realtime = entry.header.realtime;
2471        drop(entry);
2472        rows_with_field.insert(entry_offset);
2473        if !timestamp_in_range(query, realtime) {
2474            return Ok(());
2475        }
2476        let Some(bucket_index) = histogram_bucket_index_from_bounds(
2477            realtime,
2478            histogram_start,
2479            histogram_bucket_width,
2480            histogram_bucket_count,
2481        ) else {
2482            return Ok(());
2483        };
2484        if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2485            increment_counter_by(&mut bucket.values, value, 1);
2486            stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2487        }
2488        Ok(())
2489    })
2490}
2491
2492fn indexed_count_histogram_unset_entries(
2493    file: &JournalFile<Mmap>,
2494    candidates: &IndexedCandidateSet,
2495    rows_with_field: &HashSet<NonZeroU64>,
2496    histogram: &mut ExplorerHistogram,
2497    query: &ExplorerQuery,
2498    stats: &mut ExplorerStats,
2499) -> Result<()> {
2500    let histogram_start = histogram
2501        .buckets
2502        .first()
2503        .map(|bucket| bucket.start_realtime_usec)
2504        .unwrap_or_default();
2505    let histogram_bucket_width = histogram
2506        .buckets
2507        .first()
2508        .map(|bucket| {
2509            bucket
2510                .end_realtime_usec
2511                .saturating_sub(bucket.start_realtime_usec)
2512                .max(1)
2513        })
2514        .unwrap_or(1);
2515    let histogram_bucket_count = histogram.buckets.len();
2516
2517    let mut visit = |entry_offset: NonZeroU64| -> Result<()> {
2518        if rows_with_field.contains(&entry_offset) {
2519            return Ok(());
2520        }
2521        let entry = file.entry_ref(entry_offset)?;
2522        let realtime = entry.header.realtime;
2523        drop(entry);
2524        if !timestamp_in_range(query, realtime) {
2525            return Ok(());
2526        }
2527        let Some(bucket_index) = histogram_bucket_index_from_bounds(
2528            realtime,
2529            histogram_start,
2530            histogram_bucket_width,
2531            histogram_bucket_count,
2532        ) else {
2533            return Ok(());
2534        };
2535        if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2536            increment_counter_by(&mut bucket.values, UNSET_VALUE, 1);
2537            stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2538        }
2539        Ok(())
2540    };
2541
2542    match candidates {
2543        IndexedCandidateSet::All { .. } => {
2544            let mut entry_offsets = Vec::new();
2545            file.entry_offsets(&mut entry_offsets)?;
2546            for entry_offset in entry_offsets {
2547                visit(entry_offset)?;
2548            }
2549        }
2550        IndexedCandidateSet::Set { offsets, .. } => {
2551            for entry_offset in offsets {
2552                visit(*entry_offset)?;
2553            }
2554        }
2555    }
2556
2557    Ok(())
2558}
2559
2560fn indexed_visit_entries<F>(
2561    file: &JournalFile<Mmap>,
2562    cursor: Option<InlinedCursor>,
2563    mut visitor: F,
2564) -> Result<()>
2565where
2566    F: FnMut(NonZeroU64) -> Result<()>,
2567{
2568    let Some(mut cursor) = cursor.map(|cursor| cursor.head()) else {
2569        return Ok(());
2570    };
2571    let mut needle = NonZeroU64::MIN;
2572    while let Some(entry_offset) = cursor.next_until(file, needle)? {
2573        visitor(entry_offset)?;
2574        let Some(next) = entry_offset.get().checked_add(1).and_then(NonZeroU64::new) else {
2575            break;
2576        };
2577        needle = next;
2578    }
2579    Ok(())
2580}
2581
2582fn handle_row_offset_class(
2583    class: OffsetClass,
2584    accumulator: &mut ExplorerAccumulator,
2585    row_id: u64,
2586    state: &mut RowScanState,
2587    out: &mut RowScan,
2588    apply: &mut ScanApply<'_>,
2589    stats: &mut ExplorerStats,
2590) {
2591    match class {
2592        OffsetClass::Irrelevant => {
2593            stats.data_refs_skipped = stats.data_refs_skipped.saturating_add(1);
2594        }
2595        OffsetClass::FtsNegativeMatch => {
2596            out.fts_negative_match = true;
2597        }
2598        OffsetClass::FtsMatch => {
2599            out.fts_matches = true;
2600        }
2601        OffsetClass::Value(value_index) => {
2602            handle_row_value_class(value_index, accumulator, row_id, state, out, apply, stats)
2603        }
2604    }
2605}
2606
/// Handles a DATA reference of the current row that was classified as a
/// tracked field value.
///
/// `value_index` indexes the accumulator's per-value tables
/// (`value_fts_matches`, `value_field_indices`, `value_source_realtime`).
/// The value is applied in one of two ways:
/// - immediately, via `accumulator.apply_value`;
/// - deferred, by pushing `value_index` onto the `ScanApply::Deferred` list.
fn handle_row_value_class(
    value_index: usize,
    accumulator: &mut ExplorerAccumulator,
    row_id: u64,
    state: &mut RowScanState,
    out: &mut RowScan,
    apply: &mut ScanApply<'_>,
    stats: &mut ExplorerStats,
) {
    // A value flagged as a positive FTS match marks the whole row as matching.
    if accumulator.value_fts_matches[value_index] {
        out.fts_matches = true;
    }
    let field_index = accumulator.value_field_indices[value_index];
    // Per-row "field seen" bookkeeping is only done in first-value mode or for
    // public-facet / histogram fields. Every other occurrence counts as first.
    // NOTE(review): `mark_field_seen` presumably returns true on the first
    // sighting of `field_index` for `row_id` — confirm against its definition.
    let first_for_field = if state.use_first_value
        || accumulator.flags[field_index] & (FACET_PUBLIC | FACET_HISTOGRAM) != 0
    {
        accumulator.mark_field_seen(field_index, row_id)
    } else {
        true
    };
    // In first-value mode each newly seen field shrinks the missing-field count.
    if state.use_first_value && first_for_field {
        state.fields_missing_from_row = state.fields_missing_from_row.saturating_sub(1);
    }
    // All-values mode applies every occurrence. First-value mode applies only the
    // first occurrence of each field.
    if !state.use_first_value || first_for_field {
        // A value carrying a parsed source realtime overrides the row timestamp.
        if let Some(timestamp) = accumulator.value_source_realtime[value_index] {
            out.timestamp = Some(timestamp);
        }
        match apply {
            ScanApply::Immediate => accumulator.apply_value(value_index, None, stats),
            ScanApply::Deferred(values) => values.push(value_index),
        }
    }
}
2640
2641fn record_row_scan_early_stop(stats: &mut ExplorerStats) {
2642    stats.early_stop_opportunities = stats.early_stop_opportunities.saturating_add(1);
2643    stats.early_stops = stats.early_stops.saturating_add(1);
2644}
2645
2646fn cached_offset_class_for_accumulator(
2647    file: &JournalFile<Mmap>,
2648    row: &mut CurrentRowView,
2649    data_offset: NonZeroU64,
2650    accumulator: &ExplorerAccumulator,
2651    column_fields: Option<&mut Vec<Vec<u8>>>,
2652    stats: &mut ExplorerStats,
2653) -> Result<Option<OffsetClass>> {
2654    let Some(class) = accumulator.offset_cache.lookup(data_offset) else {
2655        return Ok(None);
2656    };
2657    if let Some(column_fields) = column_fields {
2658        if let Some((field, _)) = read_payload_field(file, row, data_offset, stats)? {
2659            column_fields.push(field);
2660        }
2661    }
2662    stats.data_cache_hits = stats.data_cache_hits.saturating_add(1);
2663    Ok(Some(class))
2664}
2665
2666fn payload_for_classification<'a>(
2667    file: &JournalFile<Mmap>,
2668    row: &'a mut CurrentRowView,
2669    data_offset: NonZeroU64,
2670    stats: &mut ExplorerStats,
2671) -> Result<&'a [u8]> {
2672    stats.data_cache_misses = stats.data_cache_misses.saturating_add(1);
2673    stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2674    let was_compressed = file.data_ref(data_offset)?.is_compressed();
2675    let payload = row.read_payload_at(file, data_offset)?;
2676    if was_compressed {
2677        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2678    }
2679    Ok(row.payload_slice(payload))
2680}
2681
2682fn fts_flags_for_value(
2683    value: &[u8],
2684    needs_fts: bool,
2685    query: &ExplorerQuery,
2686    stats: &mut ExplorerStats,
2687) -> (bool, bool) {
2688    if !needs_fts {
2689        return (false, false);
2690    }
2691    stats.fts_scans = stats.fts_scans.saturating_add(1);
2692    match match_fts_query(value, query) {
2693        FtsTermMatch::Positive => (true, false),
2694        FtsTermMatch::Negative => (false, true),
2695        FtsTermMatch::None => (false, false),
2696    }
2697}
2698
2699fn structured_payload_class(
2700    field: &[u8],
2701    value: &[u8],
2702    data_offset: NonZeroU64,
2703    accumulator: &mut ExplorerAccumulator,
2704    fts_matches: bool,
2705    fts_negative_match: bool,
2706) -> OffsetClass {
2707    if fts_negative_match {
2708        OffsetClass::FtsNegativeMatch
2709    } else if let Some(field_index) = accumulator.field_lookup.get(field).copied() {
2710        OffsetClass::Value(accumulator.add_value(field_index, data_offset, value, fts_matches))
2711    } else if fts_matches {
2712        OffsetClass::FtsMatch
2713    } else {
2714        OffsetClass::Irrelevant
2715    }
2716}
2717
2718fn classify_data_for_accumulator(
2719    file: &JournalFile<Mmap>,
2720    row: &mut CurrentRowView,
2721    data_offset: NonZeroU64,
2722    accumulator: &mut ExplorerAccumulator,
2723    needs_fts: bool,
2724    query: &ExplorerQuery,
2725    mut column_fields: Option<&mut Vec<Vec<u8>>>,
2726    stats: &mut ExplorerStats,
2727) -> Result<OffsetClass> {
2728    if let Some(class) = cached_offset_class_for_accumulator(
2729        file,
2730        row,
2731        data_offset,
2732        accumulator,
2733        column_fields.as_mut().map(|fields| &mut **fields),
2734        stats,
2735    )? {
2736        return Ok(class);
2737    }
2738
2739    let payload = payload_for_classification(file, row, data_offset, stats)?;
2740    let Some((field, value)) = split_payload_bytes(payload) else {
2741        let class = classify_unstructured_payload(payload, needs_fts, query, stats);
2742        accumulator.offset_cache.insert(data_offset, class);
2743        stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2744        return Ok(class);
2745    };
2746    if let Some(column_fields) = column_fields {
2747        column_fields.push(field.to_vec());
2748    }
2749
2750    let (fts_matches, fts_negative_match) = fts_flags_for_value(value, needs_fts, query, stats);
2751    let class = structured_payload_class(
2752        field,
2753        value,
2754        data_offset,
2755        accumulator,
2756        fts_matches,
2757        fts_negative_match,
2758    );
2759    accumulator.offset_cache.insert(data_offset, class);
2760    stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2761    Ok(class)
2762}
2763
2764fn read_payload_field(
2765    file: &JournalFile<Mmap>,
2766    row: &mut CurrentRowView,
2767    data_offset: NonZeroU64,
2768    stats: &mut ExplorerStats,
2769) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
2770    let was_compressed = file.data_ref(data_offset)?.is_compressed();
2771    let payload = row.read_payload_at(file, data_offset)?;
2772    if was_compressed {
2773        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2774    }
2775    let payload = row.payload_slice(payload);
2776    Ok(split_payload_bytes(payload).map(|(field, value)| (field.to_vec(), value.to_vec())))
2777}
2778
2779fn classify_unstructured_payload(
2780    payload: &[u8],
2781    needs_fts: bool,
2782    query: &ExplorerQuery,
2783    stats: &mut ExplorerStats,
2784) -> OffsetClass {
2785    if !needs_fts {
2786        return OffsetClass::Irrelevant;
2787    }
2788    stats.fts_scans = stats.fts_scans.saturating_add(1);
2789    match match_fts_query(payload, query) {
2790        FtsTermMatch::Positive => OffsetClass::FtsMatch,
2791        FtsTermMatch::Negative => OffsetClass::FtsNegativeMatch,
2792        FtsTermMatch::None => OffsetClass::Irrelevant,
2793    }
2794}
2795
/// Maps `realtime_usec` to the index of a fixed-width histogram bucket.
///
/// Buckets start at `start_realtime_usec` and are `bucket_width_usec` wide; a
/// width of zero is treated as one. Out-of-range timestamps are clamped:
/// - timestamps before the first bucket map to bucket `0`;
/// - timestamps past the end map to the last bucket, `bucket_count - 1`.
///
/// Returns `None` only when `bucket_count` is zero.
fn histogram_bucket_index_from_bounds(
    realtime_usec: u64,
    start_realtime_usec: u64,
    bucket_width_usec: u64,
    bucket_count: usize,
) -> Option<usize> {
    let last = bucket_count.checked_sub(1)?;
    let index = realtime_usec.saturating_sub(start_realtime_usec) / bucket_width_usec.max(1);
    // `index as usize` truncates on 32-bit targets and could wrap a far-future
    // timestamp into an arbitrary bucket. Saturate so it clamps to the last one.
    Some(usize::try_from(index).map_or(last, |index| index.min(last)))
}
2810
2811fn validate_query(query: &ExplorerQuery) -> Result<()> {
2812    if query
2813        .after_realtime_usec
2814        .zip(query.before_realtime_usec)
2815        .is_some_and(|(after, before)| after > before)
2816    {
2817        return Err(SdkError::InvalidPath(
2818            "after_realtime_usec must be <= before_realtime_usec".to_string(),
2819        ));
2820    }
2821    for filter in &query.filters {
2822        if filter.field.is_empty() || filter.field.contains(&b'=') {
2823            return Err(SdkError::InvalidPath(
2824                "filter field must be non-empty and must not contain '='".to_string(),
2825            ));
2826        }
2827    }
2828    for field in query.facets.iter().chain(query.histogram.iter()) {
2829        if field.is_empty() || field.contains(&b'=') {
2830            return Err(SdkError::InvalidPath(
2831                "facet and histogram fields must be non-empty and must not contain '='".to_string(),
2832            ));
2833        }
2834    }
2835    let mut seen_facets: HashSet<&[u8]> = HashSet::new();
2836    for facet in &query.facets {
2837        if !seen_facets.insert(facet) {
2838            return Err(SdkError::InvalidPath(
2839                "facet fields must not be duplicated".to_string(),
2840            ));
2841        }
2842    }
2843    Ok(())
2844}
2845
2846fn validate_no_debug_column_collection(query: &ExplorerQuery) -> Result<()> {
2847    if query.debug_collect_column_fields_by_row_traversal {
2848        return Err(SdkError::Unsupported(
2849            "debug_collect_column_fields_by_row_traversal is a debug-only discrepancy tool; production explorer queries must use FIELD-index column catalogs instead",
2850        ));
2851    }
2852    Ok(())
2853}
2854
2855fn validate_indexed_query(query: &ExplorerQuery) -> Result<()> {
2856    if query.field_mode != ExplorerFieldMode::AllValues {
2857        return Err(SdkError::Unsupported(
2858            "indexed explorer strategy requires ExplorerFieldMode::AllValues",
2859        ));
2860    }
2861    if query_has_fts(query) {
2862        return Err(SdkError::Unsupported(
2863            "indexed explorer strategy does not support FTS",
2864        ));
2865    }
2866    if query.use_source_realtime
2867        && (query.after_realtime_usec.is_some()
2868            || query.before_realtime_usec.is_some()
2869            || query.histogram.is_some())
2870    {
2871        return Err(SdkError::Unsupported(
2872            "indexed explorer strategy requires commit realtime for time-bounded facets and histograms",
2873        ));
2874    }
2875    Ok(())
2876}
2877
2878fn explorer_outputs_match(left: &ExplorerResult, right: &ExplorerResult) -> bool {
2879    if left.rows.len() != right.rows.len() {
2880        return false;
2881    }
2882    if left.rows.iter().zip(&right.rows).any(|(left, right)| {
2883        left.realtime_usec != right.realtime_usec
2884            || left.cursor != right.cursor
2885            || left.payloads != right.payloads
2886    }) {
2887        return false;
2888    }
2889    if left.facets != right.facets {
2890        return false;
2891    }
2892    explorer_histograms_match(left.histogram.as_ref(), right.histogram.as_ref())
2893}
2894
2895fn explorer_histograms_match(
2896    left: Option<&ExplorerHistogram>,
2897    right: Option<&ExplorerHistogram>,
2898) -> bool {
2899    match (left, right) {
2900        (None, None) => true,
2901        (Some(left), Some(right)) => {
2902            left.field == right.field
2903                && left.buckets.len() == right.buckets.len()
2904                && left
2905                    .buckets
2906                    .iter()
2907                    .zip(&right.buckets)
2908                    .all(|(left, right)| {
2909                        left.start_realtime_usec == right.start_realtime_usec
2910                            && left.end_realtime_usec == right.end_realtime_usec
2911                            && left.values == right.values
2912                    })
2913        }
2914        _ => false,
2915    }
2916}
2917
2918fn query_needs_source_realtime_main(query: &ExplorerQuery) -> bool {
2919    query.use_source_realtime
2920        && (query.after_realtime_usec.is_some()
2921            || query.before_realtime_usec.is_some()
2922            || query.histogram.is_some()
2923            || query.limit > 0)
2924}
2925
2926fn facet_pass_needs_source_realtime(query: &ExplorerQuery) -> bool {
2927    query.use_source_realtime
2928        && (query.after_realtime_usec.is_some() || query.before_realtime_usec.is_some())
2929}
2930
2931fn query_needs_main_pass(query: &ExplorerQuery) -> bool {
2932    query.limit > 0 || query.histogram.is_some()
2933}
2934
2935fn explorer_result_for_query(query: &ExplorerQuery) -> ExplorerResult {
2936    ExplorerResult {
2937        histogram: query
2938            .histogram
2939            .as_ref()
2940            .map(|field| new_histogram(field, query)),
2941        ..ExplorerResult::default()
2942    }
2943}
2944
2945fn explorer_control_stopped(control: Option<&ExplorerControl<'_>>) -> bool {
2946    control.and_then(ExplorerControl::stop_reason).is_some()
2947}
2948
2949fn can_run_combined_explorer_pass(facet_groups: &[FacetPassGroup]) -> bool {
2950    facet_groups
2951        .iter()
2952        .all(|group| group.excluded_field.is_none())
2953}
2954
2955fn combined_facet_indices(facet_groups: &[FacetPassGroup]) -> Vec<usize> {
2956    facet_groups
2957        .iter()
2958        .flat_map(|group| group.facet_indices.iter().copied())
2959        .collect()
2960}
2961
2962fn record_combined_unsampled_row(
2963    stats: &mut ExplorerStats,
2964    mode: CombinedScanMode,
2965    commit_realtime: u64,
2966    row_count: u64,
2967    count_rows_unsampled: bool,
2968) {
2969    record_last_realtime(stats, commit_realtime);
2970    if mode.include_main {
2971        stats.rows_matched = stats.rows_matched.saturating_add(row_count);
2972    }
2973    if mode.include_facets {
2974        stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(row_count);
2975    }
2976    if count_rows_unsampled {
2977        stats.rows_unsampled = stats.rows_unsampled.saturating_add(row_count);
2978    }
2979    stats.sampling_unsampled = stats.sampling_unsampled.saturating_add(1);
2980}
2981
2982fn update_combined_matched_stats(
2983    stats: &mut ExplorerStats,
2984    mode: CombinedScanMode,
2985    effective_realtime: u64,
2986    control: Option<&mut ExplorerControl<'_>>,
2987) -> bool {
2988    let mut stop_after_matched_row = false;
2989    if mode.include_main {
2990        stats.rows_matched = stats.rows_matched.saturating_add(1);
2991        stop_after_matched_row = control
2992            .map(|control| control.emit_matched_row(effective_realtime, stats.rows_matched))
2993            .unwrap_or(false);
2994    }
2995    if mode.include_facets {
2996        stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
2997    }
2998    stop_after_matched_row
2999}
3000
3001fn should_stop_when_rows_full(
3002    query: &ExplorerQuery,
3003    rows: &[ExplorerRow],
3004    effective_realtime: u64,
3005    rows_matched: u64,
3006) -> bool {
3007    if !query.stop_when_rows_full || query.limit == 0 || rows.len() < query.limit {
3008        return false;
3009    }
3010    let every = query.stop_when_rows_full_check_every.max(1);
3011    if rows_matched == 0 || rows_matched % every != 0 {
3012        return false;
3013    }
3014    match query.direction {
3015        Direction::Backward => {
3016            rows.iter()
3017                .map(|row| row.realtime_usec)
3018                .min()
3019                .is_some_and(|oldest| {
3020                    effective_realtime < oldest.saturating_sub(query.realtime_slack_usec)
3021                })
3022        }
3023        Direction::Forward => {
3024            rows.iter()
3025                .map(|row| row.realtime_usec)
3026                .max()
3027                .is_some_and(|newest| {
3028                    effective_realtime > newest.saturating_add(query.realtime_slack_usec)
3029                })
3030        }
3031    }
3032}
3033
3034fn row_candidate_to_keep(query: &ExplorerQuery, rows: &[ExplorerRow], realtime_usec: u64) -> bool {
3035    if query.limit == 0 {
3036        return false;
3037    }
3038    if !row_within_anchor(query, realtime_usec) {
3039        return false;
3040    }
3041    if rows.len() < query.limit {
3042        return true;
3043    }
3044    match query.direction {
3045        Direction::Backward => rows
3046            .iter()
3047            .map(|row| row.realtime_usec)
3048            .min()
3049            .is_some_and(|oldest| realtime_usec >= oldest),
3050        Direction::Forward => rows
3051            .iter()
3052            .map(|row| row.realtime_usec)
3053            .max()
3054            .is_some_and(|newest| realtime_usec <= newest),
3055    }
3056}
3057
3058fn row_within_anchor(query: &ExplorerQuery, realtime_usec: u64) -> bool {
3059    match (query.direction, query.anchor) {
3060        (Direction::Forward, ExplorerAnchor::Realtime(anchor)) => realtime_usec > anchor,
3061        (Direction::Backward, ExplorerAnchor::Realtime(anchor)) => realtime_usec <= anchor,
3062        _ => true,
3063    }
3064}
3065
3066fn add_special_histogram_value(
3067    histogram: Option<&mut ExplorerHistogram>,
3068    realtime_usec: u64,
3069    value: &[u8],
3070    count: u64,
3071    stats: &mut ExplorerStats,
3072) {
3073    let Some(histogram) = histogram else {
3074        return;
3075    };
3076    let Some(bucket_index) = histogram_bucket_index(histogram, realtime_usec) else {
3077        return;
3078    };
3079    if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
3080        increment_counter_by(&mut bucket.values, value, count);
3081        stats.histogram_updates = stats.histogram_updates.saturating_add(1);
3082    }
3083}
3084
/// Spreads an estimated count of `entries` rows over the histogram buckets that
/// overlap `[from_realtime_usec, to_realtime_usec)`.
///
/// Each bucket receives a share proportional to its overlap with the range.
/// The share is recorded under `EXPLORER_ESTIMATED_VALUE`. The range is first
/// clamped to the histogram's extent. Nothing happens in these cases:
/// - there is no histogram;
/// - `entries` is zero;
/// - the (clamped) range is empty.
fn add_estimated_histogram_range(
    histogram: Option<&mut ExplorerHistogram>,
    from_realtime_usec: u64,
    to_realtime_usec: u64,
    entries: u64,
    stats: &mut ExplorerStats,
) {
    let Some(histogram) = histogram else {
        return;
    };
    if entries == 0 || from_realtime_usec >= to_realtime_usec {
        return;
    }

    // Clamp the requested range to [first bucket start, last bucket end).
    let Some(first) = histogram.buckets.first() else {
        return;
    };
    let Some(last) = histogram.buckets.last() else {
        return;
    };
    let from_realtime_usec = from_realtime_usec.max(first.start_realtime_usec);
    let to_realtime_usec = to_realtime_usec.min(last.end_realtime_usec);
    if from_realtime_usec >= to_realtime_usec {
        return;
    }

    let total = to_realtime_usec.saturating_sub(from_realtime_usec).max(1);
    let mut touched = 0u64;
    for bucket in &mut histogram.buckets {
        // NOTE(review): early exit assumes buckets are ordered by start time.
        if bucket.start_realtime_usec > to_realtime_usec {
            break;
        }
        let overlap_start = bucket.start_realtime_usec.max(from_realtime_usec);
        let overlap_end = bucket.end_realtime_usec.min(to_realtime_usec);
        if overlap_start >= overlap_end {
            continue;
        }
        // u128 keeps `overlap * entries` from overflowing. Each share is floored,
        // so the sum over buckets can be less than `entries` (small counts spread
        // over many buckets may record nothing).
        let bucket_entries = ((overlap_end.saturating_sub(overlap_start) as u128 * entries as u128)
            / total as u128) as u64;
        if bucket_entries != 0 {
            increment_counter_by(&mut bucket.values, EXPLORER_ESTIMATED_VALUE, bucket_entries);
        }
        // Every overlapping bucket is counted, even when its floored share is 0.
        touched = touched.saturating_add(1);
    }
    stats.histogram_updates = stats.histogram_updates.saturating_add(touched);
}
3131
3132fn histogram_bucket_index(histogram: &ExplorerHistogram, realtime_usec: u64) -> Option<usize> {
3133    let first = histogram.buckets.first()?;
3134    let width = first
3135        .end_realtime_usec
3136        .saturating_sub(first.start_realtime_usec)
3137        .max(1);
3138    histogram_bucket_index_from_bounds(
3139        realtime_usec,
3140        first.start_realtime_usec,
3141        width,
3142        histogram.buckets.len(),
3143    )
3144}
3145
/// Joins a field name and value into a journal `FIELD=value` payload.
fn payload_from_parts(field: &[u8], value: &[u8]) -> Vec<u8> {
    [field, b"=".as_slice(), value].concat()
}
3153
/// Splits a journal payload at its first `=` into `(field, value)`.
///
/// Returns `None` when the payload has no `=`.
fn split_payload_bytes(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let separator = payload.iter().position(|&byte| byte == b'=')?;
    let (field, rest) = payload.split_at(separator);
    Some((field, &rest[1..]))
}
3158
/// Parses a `_SOURCE_REALTIME_TIMESTAMP` value as a decimal `u64`.
///
/// Returns `None` for non-UTF-8 bytes or any text that `u64::from_str` rejects.
fn parse_source_realtime(value: &[u8]) -> Option<u64> {
    std::str::from_utf8(value)
        .ok()
        .and_then(|text| text.parse::<u64>().ok())
}
3162
/// Chooses the timestamp used for a row.
///
/// The source realtime is used when it is present, non-zero and strictly
/// earlier than the commit realtime. Otherwise the commit realtime is used.
fn effective_realtime_from_scan(source_realtime: Option<u64>, commit_realtime: u64) -> u64 {
    source_realtime
        .filter(|&source| source != 0 && source < commit_realtime)
        .unwrap_or(commit_realtime)
}
3171
3172fn record_last_realtime(stats: &mut ExplorerStats, commit_realtime: u64) {
3173    if commit_realtime > stats.last_realtime_usec {
3174        stats.last_realtime_usec = commit_realtime;
3175    }
3176}
3177
3178fn record_source_realtime_delta(
3179    stats: &mut ExplorerStats,
3180    source_realtime: Option<u64>,
3181    commit_realtime: u64,
3182) {
3183    let Some(source_realtime) = source_realtime else {
3184        return;
3185    };
3186    if source_realtime == 0 || source_realtime >= commit_realtime {
3187        return;
3188    }
3189    let delta = commit_realtime.saturating_sub(source_realtime);
3190    if delta > stats.max_source_realtime_delta_usec {
3191        stats.max_source_realtime_delta_usec = delta;
3192    }
3193}
3194
3195fn query_has_fts(query: &ExplorerQuery) -> bool {
3196    !query.fts_terms.is_empty()
3197        || !query.fts_patterns.is_empty()
3198        || !query.fts_negative_patterns.is_empty()
3199}
3200
3201fn query_has_positive_fts(query: &ExplorerQuery) -> bool {
3202    if !query.fts_terms.is_empty() {
3203        query.fts_terms.iter().any(|term| !term.negative)
3204    } else {
3205        !query.fts_patterns.is_empty()
3206    }
3207}
3208
3209fn row_rejected_by_fts(query: &ExplorerQuery, scan: &RowScan) -> bool {
3210    query_has_fts(query)
3211        && (scan.fts_negative_match || query_has_positive_fts(query) && !scan.fts_matches)
3212}
3213
/// Outcome of matching one payload against the query's full-text conditions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FtsTermMatch {
    /// No term or pattern matched.
    None,
    /// A positive term or pattern matched.
    Positive,
    /// A negative (excluding) term or pattern matched.
    Negative,
}
3220
3221fn match_fts_query(value: &[u8], query: &ExplorerQuery) -> FtsTermMatch {
3222    if !query.fts_terms.is_empty() {
3223        for term in &query.fts_terms {
3224            if term.matches(value) {
3225                return if term.negative {
3226                    FtsTermMatch::Negative
3227                } else {
3228                    FtsTermMatch::Positive
3229                };
3230            }
3231        }
3232        return FtsTermMatch::None;
3233    }
3234
3235    if matches_fts(value, &query.fts_negative_patterns) {
3236        FtsTermMatch::Negative
3237    } else if matches_fts(value, &query.fts_patterns) {
3238        FtsTermMatch::Positive
3239    } else {
3240        FtsTermMatch::None
3241    }
3242}
3243
3244fn matches_fts(value: &[u8], patterns: &[Vec<u8>]) -> bool {
3245    patterns
3246        .iter()
3247        .filter(|pattern| !pattern.is_empty())
3248        .any(|pattern| contains_ascii_case_insensitive(value, pattern))
3249}
3250
/// ASCII-case-insensitive substring test.
///
/// An empty needle is always found. A needle longer than the haystack never is.
fn contains_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        0 => true,
        width if width > haystack.len() => false,
        width => haystack
            .windows(width)
            .any(|window| window.eq_ignore_ascii_case(needle)),
    }
}
3265
/// Returns the byte offset of the first ASCII-case-insensitive occurrence of
/// `needle` in `haystack`.
///
/// An empty needle is found at offset `0`.
fn find_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    let width = needle.len();
    if width == 0 {
        Some(0)
    } else if width > haystack.len() {
        None
    } else {
        haystack
            .windows(width)
            .position(|window| window.eq_ignore_ascii_case(needle))
    }
}
3280
3281fn timestamp_in_range(query: &ExplorerQuery, timestamp: u64) -> bool {
3282    if query
3283        .after_realtime_usec
3284        .is_some_and(|after| timestamp < after)
3285    {
3286        return false;
3287    }
3288    if query
3289        .before_realtime_usec
3290        .is_some_and(|before| timestamp > before)
3291    {
3292        return false;
3293    }
3294    true
3295}
3296
3297fn stop_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3298    // Netdata expands the seek/stop side by the learned journal-vs-source
3299    // realtime delta, then still uses commit realtime for the fast boundary
3300    // checks before row DATA is scanned for _SOURCE_REALTIME_TIMESTAMP.
3301    match query.direction {
3302        Direction::Forward => query.before_realtime_usec.is_some_and(|before| {
3303            commit_realtime > before.saturating_add(query.realtime_slack_usec)
3304        }),
3305        Direction::Backward => query
3306            .after_realtime_usec
3307            .is_some_and(|after| commit_realtime < after),
3308    }
3309}
3310
3311fn skip_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3312    match query.direction {
3313        Direction::Forward => query
3314            .after_realtime_usec
3315            .is_some_and(|after| commit_realtime < after),
3316        Direction::Backward => query.before_realtime_usec.is_some_and(|before| {
3317            commit_realtime > before.saturating_add(query.realtime_slack_usec)
3318        }),
3319    }
3320}
3321
/// Builds an empty histogram for `field` covering the query's time window.
///
/// Construction proceeds in steps:
/// 1. The bar width comes from `histogram_bar_width_usec`, aiming for
///    `histogram_target_buckets` (at least 1) bars.
/// 2. The start is aligned down to a multiple of the width. The end is aligned
///    down and then extended by one width.
/// 3. The bucket count is capped at 1001. When the cap applies, the width is
///    recomputed so 1000 widths span the aligned range.
fn new_histogram(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
    let (start, end) = histogram_bounds(query);
    let target_buckets = query.histogram_target_buckets.max(1);
    let mut width = histogram_bar_width_usec(start, end, target_buckets);
    let start = histogram_slot_baseline_usec(start, width);
    let mut end = histogram_slot_baseline_usec(end, width).saturating_add(width);
    // One bucket per width in [start, end), plus one more bucket starting at
    // `end`. NOTE(review): `as usize` truncates on 32-bit targets.
    let mut bucket_count = end
        .saturating_sub(start)
        .checked_div(width)
        .unwrap_or(0)
        .saturating_add(1) as usize;
    if bucket_count > 1001 {
        bucket_count = 1001;
        width = end
            .saturating_sub(start)
            .checked_div(1000)
            .unwrap_or(0)
            .max(1);
        end = start.saturating_add(width.saturating_mul(1000));
    }
    let mut buckets = Vec::with_capacity(bucket_count);
    for index in 0..bucket_count {
        let bucket_start = start.saturating_add(width.saturating_mul(index as u64));
        // The final bucket ends at `end + 1` rather than `bucket_start + width`.
        let bucket_end = if index + 1 == bucket_count {
            end.saturating_add(1)
        } else {
            bucket_start.saturating_add(width)
        };
        buckets.push(ExplorerHistogramBucket {
            start_realtime_usec: bucket_start,
            end_realtime_usec: bucket_end,
            values: HashMap::new(),
        });
    }
    ExplorerHistogram {
        field: field.to_vec(),
        buckets,
    }
}
3361
/// Picks the largest "nice" bar width (in microseconds) that still yields at
/// least `target_buckets` bars over `[after, before]`.
///
/// Falls back to one second when no candidate is small enough.
fn histogram_bar_width_usec(after: u64, before: u64, target_buckets: usize) -> u64 {
    const USEC_PER_SEC: u64 = 1_000_000;
    const VALID_DURATIONS_SECONDS: &[u64] = &[
        1, 2, 5, 10, 15, 30, 60, 120, 180, 300, 600, 900, 1800, 3600, 7200, 21600, 28800, 43200,
        86400, 172800, 259200, 432000, 604800, 1209600, 2592000,
    ];
    let span = before.saturating_sub(after);
    let wanted = target_buckets as u64;
    VALID_DURATIONS_SECONDS
        .iter()
        .rev()
        .map(|seconds| seconds.saturating_mul(USEC_PER_SEC))
        .find(|&width| width != 0 && span / width >= wanted)
        .unwrap_or(USEC_PER_SEC)
}
3377
/// Rounds `value` down to a multiple of `width`; a zero width acts as 1.
fn histogram_slot_baseline_usec(value: u64, width: u64) -> u64 {
    let width = width.max(1);
    value - value % width
}
3381
3382fn histogram_bounds(query: &ExplorerQuery) -> (u64, u64) {
3383    let start = query.after_realtime_usec.unwrap_or(0);
3384    let end = query
3385        .before_realtime_usec
3386        .unwrap_or_else(|| start.saturating_add(3_600_000_000));
3387    if end <= start {
3388        (start, start.saturating_add(1))
3389    } else {
3390        (start, end)
3391    }
3392}
3393
/// Adds `delta` (saturating) to the counter for `value`, creating it if absent.
///
/// The lookup is done by borrowed slice first, so the key is only allocated
/// when a new entry is inserted.
fn increment_counter_by(map: &mut HashMap<Vec<u8>, u64>, value: &[u8], delta: u64) {
    match map.get_mut(value) {
        Some(count) => *count = count.saturating_add(delta),
        None => {
            map.insert(value.to_vec(), delta);
        }
    }
}
3401
3402#[cfg(test)]
3403mod tests {
3404    use super::*;
3405    use journal_core::file::{JournalFileOptions, JournalWriter, MmapMut};
3406    use journal_core::repository::File as RepoFile;
3407    use tempfile::TempDir;
3408
3409    fn test_uuid(seed: u8) -> uuid::Uuid {
3410        uuid::Uuid::from_bytes([seed; 16])
3411    }
3412
    /// Creates a fresh journal file at `path` (creating parent directories) and
    /// a writer for it.
    ///
    /// When `compression` is `Some((algorithm, threshold))`, both the file
    /// options and the writer are configured with it. Panics on any setup
    /// failure.
    fn create_writer(
        path: &std::path::Path,
        compression: Option<(Compression, usize)>,
    ) -> (JournalFile<MmapMut>, JournalWriter) {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).expect("create journal parent");
        }
        let repo_file = RepoFile::from_path(path).expect("repo file");
        // Fixed UUIDs keep the generated journals deterministic across runs.
        let mut options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
        if let Some((compression, threshold)) = compression {
            options = options
                .with_compression(compression)
                .with_compress_threshold(threshold);
        }
        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
        let writer = if let Some((compression, threshold)) = compression {
            JournalWriter::new_with_compression(&mut file, 1, test_uuid(4), compression, threshold)
                .expect("writer")
        } else {
            JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer")
        };
        (file, writer)
    }
3436
    /// Writes `entries` to a new journal at `path` and syncs it.
    ///
    /// Each entry is `(payloads, realtime)`. The same timestamp is passed for
    /// both timestamp arguments of `add_entry` (presumably realtime and
    /// monotonic — TODO confirm).
    fn write_entries(
        path: &std::path::Path,
        compression: Option<(Compression, usize)>,
        entries: &[(&[&[u8]], u64)],
    ) {
        let (mut file, mut writer) = create_writer(path, compression);
        for (payloads, realtime) in entries {
            writer
                .add_entry(&mut file, payloads, *realtime, *realtime)
                .expect("write entry");
        }
        file.sync().expect("sync journal");
    }
3450
    /// Writes `count` uncompressed entries to a new journal at `path`.
    ///
    /// Each entry has a unique `MESSAGE=row-N` and `SERVICE` alternating between
    /// `even` and `odd`. Timestamps increase by 1 µs per entry from a fixed
    /// base.
    fn write_many_entries(path: &std::path::Path, count: usize) {
        let (mut file, mut writer) = create_writer(path, None);
        for index in 0..count {
            let message = format!("MESSAGE=row-{index}");
            let service = if index % 2 == 0 {
                b"SERVICE=even".as_slice()
            } else {
                b"SERVICE=odd".as_slice()
            };
            let payloads: [&[u8]; 2] = [message.as_bytes(), service];
            let realtime = 1_700_000_000_000_000u64.saturating_add(index as u64);
            writer
                .add_entry(&mut file, &payloads, realtime, realtime)
                .expect("write entry");
        }
        file.sync().expect("sync journal");
    }
3468
    /// A facet-only traversal over 9,000 rows must invoke the progress callback
    /// mid-scan without stopping. The scan must still examine every row.
    #[test]
    fn explorer_control_reports_progress_during_large_scan() {
        let dir = TempDir::new().expect("tempdir");
        let path = dir.path().join("progress.journal");
        write_many_entries(&path, 9_000);

        let mut reports = Vec::new();
        let mut progress = |progress: ExplorerProgress| {
            reports.push(progress.stats.rows_examined);
        };
        let mut control = ExplorerControl::new();
        // A zero interval so every control check is allowed to report progress.
        control.set_progress_interval(Duration::ZERO);
        control.set_progress_callback(Some(&mut progress));
        let mut reader = FileReader::open(&path).expect("open reader");
        let query = ExplorerQuery {
            facets: vec![b"SERVICE".to_vec()],
            limit: 0,
            ..ExplorerQuery::default()
        };

        let result = reader
            .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
            .expect("explore");

        assert_eq!(control.stop_reason(), None);
        assert_eq!(result.stats.rows_examined, 9_000);
        assert!(!reports.is_empty());
        // NOTE(review): 8_191 presumably relates to
        // EXPLORER_CONTROL_CHECK_EVERY_ROWS (8192) — confirm.
        assert!(reports.iter().any(|rows| *rows >= 8_191));
    }
3498
3499    #[test]
3500    fn explorer_control_cancels_inside_large_scan() {
3501        let dir = TempDir::new().expect("tempdir");
3502        let path = dir.path().join("cancel.journal");
3503        write_many_entries(&path, 9_000);
3504
3505        let is_cancelled = || true;
3506        let mut control = ExplorerControl::new();
3507        control.set_cancellation_callback(Some(&is_cancelled));
3508        let mut reader = FileReader::open(&path).expect("open reader");
3509        let query = ExplorerQuery {
3510            facets: vec![b"SERVICE".to_vec()],
3511            limit: 0,
3512            ..ExplorerQuery::default()
3513        };
3514
3515        let result = reader
3516            .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3517            .expect("explore");
3518
3519        assert_eq!(control.stop_reason(), Some(ExplorerStopReason::Cancelled));
3520        assert!(result.stats.rows_examined < 9_000);
3521    }
3522
3523    #[test]
3524    fn explorer_filters_with_or_values_and_and_fields() {
3525        let dir = TempDir::new().expect("tempdir");
3526        let path = dir.path().join("filter.journal");
3527        write_entries(
3528            &path,
3529            None,
3530            &[
3531                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3532                (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3533                (&[b"SERVICE=b", b"PRIORITY=4"], 3_000),
3534            ],
3535        );
3536
3537        let mut reader = FileReader::open(&path).expect("open reader");
3538        let query = ExplorerQuery {
3539            filters: vec![
3540                ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec(), b"b".to_vec()]),
3541                ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3542            ],
3543            facets: vec![b"SERVICE".to_vec()],
3544            limit: 10,
3545            ..ExplorerQuery::default()
3546        };
3547
3548        let result = reader.explore(&query).expect("explore");
3549        assert_eq!(result.rows.len(), 2);
3550        let service = result
3551            .facets
3552            .get(b"SERVICE".as_slice())
3553            .expect("service facet");
3554        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3555        assert_eq!(service.get(b"b".as_slice()), Some(&1));
3556        assert!(result.stats.data_cache_misses > 0);
3557    }
3558
3559    #[test]
3560    fn explorer_rejects_debug_row_traversal_column_collection() {
3561        let dir = TempDir::new().expect("tempdir");
3562        let path = dir.path().join("debug-column-collection.journal");
3563        write_entries(&path, None, &[(&[b"PRIORITY=3", b"MESSAGE=hello"], 1_000)]);
3564
3565        let query = ExplorerQuery {
3566            facets: vec![b"PRIORITY".to_vec()],
3567            debug_collect_column_fields_by_row_traversal: true,
3568            ..ExplorerQuery::default()
3569        };
3570
3571        let mut reader = FileReader::open(&path).expect("open reader");
3572        let err = reader
3573            .explore(&query)
3574            .expect_err("debug-only column collection is rejected");
3575        assert!(matches!(err, SdkError::Unsupported(_)));
3576        assert!(
3577            err.to_string()
3578                .contains("debug_collect_column_fields_by_row_traversal")
3579        );
3580
3581        let mut reader = FileReader::open(&path).expect("reopen reader");
3582        let err = reader
3583            .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3584            .expect_err("cursor-row explorer also rejects debug-only column collection");
3585        assert!(matches!(err, SdkError::Unsupported(_)));
3586    }
3587
3588    #[test]
3589    fn explorer_skips_irrelevant_compressed_data_for_facets() {
3590        let dir = TempDir::new().expect("tempdir");
3591        let path = dir.path().join("compressed.journal");
3592        let large_message = b"MESSAGE=abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
3593        write_entries(
3594            &path,
3595            Some((Compression::Zstd, 32)),
3596            &[(&[b"PRIORITY=3", large_message], 1_000)],
3597        );
3598
3599        let mut reader = FileReader::open(&path).expect("open reader");
3600        let query = ExplorerQuery {
3601            facets: vec![b"PRIORITY".to_vec()],
3602            limit: 0,
3603            ..ExplorerQuery::default()
3604        };
3605
3606        let result = reader.explore(&query).expect("explore");
3607        let priority = result
3608            .facets
3609            .get(b"PRIORITY".as_slice())
3610            .expect("priority facet");
3611        assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3612        assert_eq!(result.stats.payloads_decompressed, 0);
3613        assert_eq!(result.stats.data_refs_seen, 1);
3614        assert_eq!(result.stats.early_stops, 1);
3615    }
3616
3617    #[test]
3618    fn explorer_reuses_classified_data_objects() {
3619        let dir = TempDir::new().expect("tempdir");
3620        let path = dir.path().join("reuse.journal");
3621        write_entries(
3622            &path,
3623            None,
3624            &[
3625                (&[b"PRIORITY=3"], 1_000),
3626                (&[b"PRIORITY=3"], 2_000),
3627                (&[b"PRIORITY=3"], 3_000),
3628            ],
3629        );
3630
3631        let mut reader = FileReader::open(&path).expect("open reader");
3632        let query = ExplorerQuery {
3633            facets: vec![b"PRIORITY".to_vec()],
3634            limit: 0,
3635            ..ExplorerQuery::default()
3636        };
3637
3638        let result = reader.explore(&query).expect("explore");
3639        let priority = result
3640            .facets
3641            .get(b"PRIORITY".as_slice())
3642            .expect("priority facet");
3643        assert_eq!(priority.get(b"3".as_slice()), Some(&3));
3644        assert!(result.stats.data_cache_hits >= 2);
3645    }
3646
3647    #[test]
3648    fn explorer_groups_facets_with_same_filter_set() {
3649        let dir = TempDir::new().expect("tempdir");
3650        let path = dir.path().join("grouped-facets.journal");
3651        write_entries(
3652            &path,
3653            None,
3654            &[
3655                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3656                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3657            ],
3658        );
3659
3660        let mut reader = FileReader::open(&path).expect("open reader");
3661        let query = ExplorerQuery {
3662            facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3663            limit: 0,
3664            ..ExplorerQuery::default()
3665        };
3666
3667        let result = reader.explore(&query).expect("explore");
3668        assert_eq!(result.stats.rows_examined, 2);
3669        assert_eq!(result.stats.facet_rows_matched, 2);
3670        assert_eq!(
3671            result
3672                .facets
3673                .get(b"SERVICE".as_slice())
3674                .and_then(|values| values.get(b"a".as_slice())),
3675            Some(&1)
3676        );
3677        assert_eq!(
3678            result
3679                .facets
3680                .get(b"PRIORITY".as_slice())
3681                .and_then(|values| values.get(b"4".as_slice())),
3682            Some(&1)
3683        );
3684    }
3685
3686    #[test]
3687    fn explorer_combines_rows_histogram_and_facets_in_one_pass() {
3688        let dir = TempDir::new().expect("tempdir");
3689        let path = dir.path().join("combined-pass.journal");
3690        write_entries(
3691            &path,
3692            None,
3693            &[
3694                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3695                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3696            ],
3697        );
3698
3699        let mut reader = FileReader::open(&path).expect("open reader");
3700        let query = ExplorerQuery {
3701            facets: vec![b"SERVICE".to_vec()],
3702            histogram: Some(b"PRIORITY".to_vec()),
3703            histogram_target_buckets: 2,
3704            limit: 2,
3705            ..ExplorerQuery::default()
3706        };
3707
3708        let result = reader.explore(&query).expect("explore");
3709        assert_eq!(result.rows.len(), 2);
3710        assert_eq!(result.stats.rows_examined, 2);
3711        assert_eq!(result.stats.rows_matched, 2);
3712        assert_eq!(result.stats.facet_rows_matched, 2);
3713        assert_eq!(
3714            result
3715                .facets
3716                .get(b"SERVICE".as_slice())
3717                .and_then(|values| values.get(b"a".as_slice())),
3718            Some(&1)
3719        );
3720        let histogram_total = result
3721            .histogram
3722            .as_ref()
3723            .expect("histogram")
3724            .buckets
3725            .iter()
3726            .flat_map(|bucket| bucket.values.values())
3727            .sum::<u64>();
3728        assert_eq!(histogram_total, 2);
3729    }
3730
3731    #[test]
3732    fn explorer_sampling_uses_actual_histogram_bucket_count() {
3733        let query = ExplorerQuery {
3734            after_realtime_usec: Some(1_733_494_460_000_000),
3735            before_realtime_usec: Some(1_735_656_412_000_000),
3736            histogram: Some(b"PRIORITY".to_vec()),
3737            histogram_target_buckets: 300,
3738            sampling: Some(ExplorerSampling {
3739                budget: 20_000,
3740                matched_files: 200,
3741                file_head_realtime_usec: 1_733_494_460_000_000,
3742                file_tail_realtime_usec: 1_735_656_412_000_000,
3743                file_head_seqnum: 1,
3744                file_tail_seqnum: 2,
3745                file_entries: 2,
3746            }),
3747            ..ExplorerQuery::default()
3748        };
3749
3750        let bucket_count = histogram_bucket_count_for_query(&query).expect("bucket count");
3751        let sampling =
3752            ExplorerSamplingState::for_query(&query, Some(bucket_count)).expect("sampling");
3753
3754        assert_eq!(bucket_count, 302);
3755        assert_eq!(sampling.per_slot_sampled.len(), bucket_count);
3756    }
3757
3758    #[test]
3759    fn explorer_sampling_seqnum_estimate_clamps_over_scanned_to_one() {
3760        let query = ExplorerQuery {
3761            after_realtime_usec: Some(1),
3762            before_realtime_usec: Some(100),
3763            direction: Direction::Forward,
3764            sampling: Some(ExplorerSampling {
3765                budget: 20,
3766                matched_files: 1,
3767                file_head_realtime_usec: 1,
3768                file_tail_realtime_usec: 100,
3769                file_head_seqnum: 1,
3770                file_tail_seqnum: 100,
3771                file_entries: 3,
3772            }),
3773            ..ExplorerQuery::default()
3774        };
3775        let mut sampling = ExplorerSamplingState::for_query(&query, None).expect("sampling");
3776        sampling.per_file_sampled = 10;
3777
3778        assert_eq!(sampling.estimate_remaining_rows_by_seqnum(5), Some(1));
3779    }
3780
3781    #[test]
3782    fn explorer_estimated_histogram_distribution_matches_netdata_integer_math() {
3783        let mut histogram = ExplorerHistogram {
3784            field: b"PRIORITY".to_vec(),
3785            buckets: vec![
3786                ExplorerHistogramBucket {
3787                    start_realtime_usec: 0,
3788                    end_realtime_usec: 10,
3789                    values: HashMap::new(),
3790                },
3791                ExplorerHistogramBucket {
3792                    start_realtime_usec: 10,
3793                    end_realtime_usec: 20,
3794                    values: HashMap::new(),
3795                },
3796                ExplorerHistogramBucket {
3797                    start_realtime_usec: 20,
3798                    end_realtime_usec: 30,
3799                    values: HashMap::new(),
3800                },
3801            ],
3802        };
3803        let mut stats = ExplorerStats::default();
3804
3805        add_estimated_histogram_range(Some(&mut histogram), 0, 30, 10, &mut stats);
3806
3807        let counts = histogram
3808            .buckets
3809            .iter()
3810            .map(|bucket| {
3811                bucket
3812                    .values
3813                    .get(EXPLORER_ESTIMATED_VALUE)
3814                    .copied()
3815                    .unwrap_or_default()
3816            })
3817            .collect::<Vec<_>>();
3818        assert_eq!(counts, vec![3, 3, 3]);
3819        assert_eq!(counts.iter().sum::<u64>(), 9);
3820    }
3821
3822    #[test]
3823    fn explorer_filters_then_combines_outputs_in_one_candidate_pass() {
3824        let dir = TempDir::new().expect("tempdir");
3825        let path = dir.path().join("filtered-combined-pass.journal");
3826        write_entries(
3827            &path,
3828            None,
3829            &[
3830                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3831                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3832                (&[b"SERVICE=c", b"PRIORITY=3"], 3_000),
3833            ],
3834        );
3835
3836        let mut reader = FileReader::open(&path).expect("open reader");
3837        let query = ExplorerQuery {
3838            filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3839            facets: vec![b"SERVICE".to_vec()],
3840            histogram: Some(b"SERVICE".to_vec()),
3841            histogram_target_buckets: 2,
3842            limit: 10,
3843            ..ExplorerQuery::default()
3844        };
3845
3846        let result = reader.explore(&query).expect("explore");
3847        assert_eq!(result.rows.len(), 2);
3848        assert_eq!(result.stats.rows_examined, 2);
3849        assert_eq!(result.stats.rows_matched, 2);
3850        assert_eq!(result.stats.facet_rows_matched, 2);
3851        let service = result
3852            .facets
3853            .get(b"SERVICE".as_slice())
3854            .expect("service facet");
3855        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3856        assert_eq!(service.get(b"c".as_slice()), Some(&1));
3857        assert_eq!(service.get(b"b".as_slice()), None);
3858    }
3859
3860    #[test]
3861    fn explorer_cursor_rows_defer_payload_expansion() {
3862        let dir = TempDir::new().expect("tempdir");
3863        let path = dir.path().join("cursor-only-row.journal");
3864        write_entries(
3865            &path,
3866            None,
3867            &[(&[b"SERVICE=a", b"PRIORITY=3", b"MESSAGE=hello"], 1_000)],
3868        );
3869
3870        let query = ExplorerQuery {
3871            limit: 1,
3872            ..ExplorerQuery::default()
3873        };
3874        let mut reader = FileReader::open(&path).expect("open reader");
3875        let result = reader
3876            .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3877            .expect("explore cursor rows");
3878
3879        assert_eq!(result.rows.len(), 1);
3880        assert!(result.rows[0].payloads.is_empty());
3881        assert_eq!(result.stats.returned_row_expansions, 0);
3882
3883        let cursor = result.rows[0].cursor.clone();
3884        let mut reader = FileReader::open(&path).expect("reopen reader");
3885        reader.seek_cursor(&cursor).expect("seek cursor");
3886        assert!(reader.test_cursor(&cursor).expect("test cursor"));
3887
3888        let mut payloads = Vec::new();
3889        reader
3890            .collect_entry_payloads(&mut payloads)
3891            .expect("collect payloads");
3892        assert!(payloads.iter().any(|payload| payload == b"MESSAGE=hello"));
3893    }
3894
3895    #[test]
3896    fn explorer_same_field_filter_exclusion_counts_filtered_out_facet_values() {
3897        let dir = TempDir::new().expect("tempdir");
3898        let path = dir.path().join("same-field-filter-facet.journal");
3899        write_entries(
3900            &path,
3901            None,
3902            &[
3903                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3904                (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3905                (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
3906            ],
3907        );
3908
3909        let mut reader = FileReader::open(&path).expect("open reader");
3910        let query = ExplorerQuery {
3911            filters: vec![
3912                ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
3913                ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3914            ],
3915            facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3916            limit: 0,
3917            ..ExplorerQuery::default()
3918        };
3919
3920        let result = reader.explore(&query).expect("explore");
3921        let service = result
3922            .facets
3923            .get(b"SERVICE".as_slice())
3924            .expect("service facet");
3925        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3926        assert_eq!(service.get(b"b".as_slice()), Some(&1));
3927
3928        let priority = result
3929            .facets
3930            .get(b"PRIORITY".as_slice())
3931            .expect("priority facet");
3932        assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3933        assert_eq!(priority.get(b"4".as_slice()), Some(&1));
3934    }
3935
3936    #[test]
3937    fn explorer_index_strategy_matches_traversal_for_all_values() {
3938        let dir = TempDir::new().expect("tempdir");
3939        let path = dir.path().join("indexed-all-values.journal");
3940        write_entries(
3941            &path,
3942            None,
3943            &[
3944                (&[b"SERVICE=a", b"PRIORITY=3", b"TAG=x"], 1_000),
3945                (&[b"SERVICE=b", b"PRIORITY=3", b"TAG=x"], 2_000),
3946                (&[b"SERVICE=a", b"PRIORITY=4", b"TAG=y", b"TAG=z"], 3_000),
3947                (&[b"PRIORITY=3"], 4_000),
3948            ],
3949        );
3950
3951        let query = ExplorerQuery {
3952            after_realtime_usec: Some(0),
3953            before_realtime_usec: Some(5_000),
3954            filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3955            facets: vec![b"SERVICE".to_vec(), b"TAG".to_vec()],
3956            histogram: Some(b"SERVICE".to_vec()),
3957            histogram_target_buckets: 2,
3958            limit: 2,
3959            field_mode: ExplorerFieldMode::AllValues,
3960            use_source_realtime: false,
3961            ..ExplorerQuery::default()
3962        };
3963
3964        let mut reader = FileReader::open(&path).expect("open reader");
3965        let result = reader
3966            .explore_with_strategy(&query, ExplorerStrategy::Compare)
3967            .expect("compare");
3968
3969        let comparison = result.comparison.as_ref().expect("comparison diagnostics");
3970        assert_eq!(comparison.index_stats, result.stats);
3971        assert_eq!(comparison.traversal_stats.rows_returned, 2);
3972        assert_eq!(comparison.index_stats.rows_returned, 2);
3973
3974        assert_eq!(result.rows.len(), 2);
3975        let service = result
3976            .facets
3977            .get(b"SERVICE".as_slice())
3978            .expect("service facet");
3979        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3980        assert_eq!(service.get(b"b".as_slice()), Some(&1));
3981        assert_eq!(service.get(UNSET_VALUE), Some(&1));
3982        let histogram = result.histogram.as_ref().expect("histogram");
3983        assert_eq!(histogram.buckets.len(), 2);
3984        assert_eq!(histogram.buckets[0].values.get(b"a".as_slice()), Some(&1));
3985        assert_eq!(histogram.buckets[0].values.get(b"b".as_slice()), Some(&1));
3986        assert_eq!(histogram.buckets[0].values.get(UNSET_VALUE), Some(&1));
3987    }
3988
3989    #[test]
3990    fn explorer_index_strategy_preserves_same_field_filter_exclusion() {
3991        let dir = TempDir::new().expect("tempdir");
3992        let path = dir.path().join("indexed-same-field-filter.journal");
3993        write_entries(
3994            &path,
3995            None,
3996            &[
3997                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3998                (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3999                (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
4000            ],
4001        );
4002
4003        let query = ExplorerQuery {
4004            filters: vec![
4005                ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
4006                ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
4007            ],
4008            facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
4009            field_mode: ExplorerFieldMode::AllValues,
4010            use_source_realtime: false,
4011            ..ExplorerQuery::default()
4012        };
4013
4014        let mut reader = FileReader::open(&path).expect("open reader");
4015        let result = reader
4016            .explore_with_strategy(&query, ExplorerStrategy::Compare)
4017            .expect("compare");
4018        let service = result
4019            .facets
4020            .get(b"SERVICE".as_slice())
4021            .expect("service facet");
4022        assert_eq!(service.get(b"a".as_slice()), Some(&1));
4023        assert_eq!(service.get(b"b".as_slice()), Some(&1));
4024    }
4025
4026    #[test]
4027    fn explorer_index_strategy_rejects_first_value_semantics() {
4028        let dir = TempDir::new().expect("tempdir");
4029        let path = dir.path().join("indexed-first-value.journal");
4030        write_entries(&path, None, &[(&[b"TAG=one", b"TAG=two"], 1_000)]);
4031
4032        let mut reader = FileReader::open(&path).expect("open reader");
4033        let err = reader
4034            .explore_with_strategy(
4035                &ExplorerQuery {
4036                    facets: vec![b"TAG".to_vec()],
4037                    field_mode: ExplorerFieldMode::FirstValue,
4038                    ..ExplorerQuery::default()
4039                },
4040                ExplorerStrategy::Index,
4041            )
4042            .expect_err("first-value index strategy should be rejected");
4043
4044        assert!(matches!(err, SdkError::Unsupported(_)));
4045    }
4046
4047    #[test]
4048    fn explorer_first_value_counts_one_value_per_selected_field() {
4049        let dir = TempDir::new().expect("tempdir");
4050        let path = dir.path().join("first-value.journal");
4051        write_entries(
4052            &path,
4053            None,
4054            &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4055        );
4056
4057        let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4058        let all_values = all_values_reader
4059            .explore(&ExplorerQuery {
4060                facets: vec![b"TAG".to_vec()],
4061                limit: 0,
4062                field_mode: ExplorerFieldMode::AllValues,
4063                ..ExplorerQuery::default()
4064            })
4065            .expect("all-values explore");
4066        let all_tag = all_values
4067            .facets
4068            .get(b"TAG".as_slice())
4069            .expect("all-values tag facet");
4070        assert_eq!(all_tag.values().sum::<u64>(), 2);
4071        assert_eq!(all_tag.len(), 2);
4072
4073        let mut first_value_reader = FileReader::open(&path).expect("open first-value reader");
4074        let first_value = first_value_reader
4075            .explore(&ExplorerQuery {
4076                facets: vec![b"TAG".to_vec()],
4077                limit: 0,
4078                field_mode: ExplorerFieldMode::FirstValue,
4079                ..ExplorerQuery::default()
4080            })
4081            .expect("first-value explore");
4082        let first_tag = first_value
4083            .facets
4084            .get(b"TAG".as_slice())
4085            .expect("first-value tag facet");
4086        assert_eq!(first_tag.values().sum::<u64>(), 1);
4087        assert_eq!(first_tag.len(), 1);
4088        assert_eq!(first_value.stats.early_stops, 1);
4089    }
4090
4091    #[test]
4092    fn explorer_first_value_does_not_double_count_duplicate_facets_or_histogram() {
4093        let dir = TempDir::new().expect("tempdir");
4094        let path = dir.path().join("first-value-no-double-count.journal");
4095        write_entries(
4096            &path,
4097            None,
4098            &[(
4099                &[
4100                    b"_SOURCE_REALTIME_TIMESTAMP=1000",
4101                    b"TAG=one",
4102                    b"TAG=two",
4103                    b"MESSAGE=after-tag",
4104                ],
4105                1_000,
4106            )],
4107        );
4108
4109        let mut reader = FileReader::open(&path).expect("open reader");
4110        let result = reader
4111            .explore(&ExplorerQuery {
4112                facets: vec![b"TAG".to_vec()],
4113                histogram: Some(b"TAG".to_vec()),
4114                histogram_target_buckets: 1,
4115                limit: 0,
4116                ..ExplorerQuery::default()
4117            })
4118            .expect("explore");
4119
4120        assert_eq!(
4121            result
4122                .facets
4123                .get(b"TAG".as_slice())
4124                .expect("tag facet")
4125                .values()
4126                .sum::<u64>(),
4127            1
4128        );
4129        assert_eq!(
4130            result
4131                .histogram
4132                .as_ref()
4133                .expect("histogram")
4134                .buckets
4135                .iter()
4136                .flat_map(|bucket| bucket.values.values())
4137                .sum::<u64>(),
4138            1
4139        );
4140
4141        let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4142        let all_values = all_values_reader
4143            .explore(&ExplorerQuery {
4144                facets: vec![b"TAG".to_vec()],
4145                histogram: Some(b"TAG".to_vec()),
4146                histogram_target_buckets: 1,
4147                limit: 0,
4148                field_mode: ExplorerFieldMode::AllValues,
4149                ..ExplorerQuery::default()
4150            })
4151            .expect("all-values explore");
4152
4153        assert_eq!(
4154            all_values
4155                .facets
4156                .get(b"TAG".as_slice())
4157                .expect("tag facet")
4158                .values()
4159                .sum::<u64>(),
4160            2
4161        );
4162        assert_eq!(
4163            all_values
4164                .histogram
4165                .as_ref()
4166                .expect("histogram")
4167                .buckets
4168                .iter()
4169                .flat_map(|bucket| bucket.values.values())
4170                .sum::<u64>(),
4171            2
4172        );
4173    }
4174
4175    #[test]
4176    fn explorer_first_value_tracks_required_field_identities() {
4177        let dir = TempDir::new().expect("tempdir");
4178        let path = dir.path().join("first-value-identities.journal");
4179        write_entries(
4180            &path,
4181            None,
4182            &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4183        );
4184
4185        let mut reader = FileReader::open(&path).expect("open reader");
4186        let result = reader
4187            .explore(&ExplorerQuery {
4188                facets: vec![b"TAG".to_vec(), b"SERVICE".to_vec()],
4189                limit: 0,
4190                field_mode: ExplorerFieldMode::FirstValue,
4191                ..ExplorerQuery::default()
4192            })
4193            .expect("explore");
4194
4195        assert_eq!(
4196            result
4197                .facets
4198                .get(b"TAG".as_slice())
4199                .expect("tag facet")
4200                .values()
4201                .sum::<u64>(),
4202            1
4203        );
4204        assert_eq!(
4205            result
4206                .facets
4207                .get(b"SERVICE".as_slice())
4208                .and_then(|values| values.get(b"a".as_slice())),
4209            Some(&1)
4210        );
4211        assert_eq!(result.stats.early_stops, 1);
4212    }
4213
4214    #[test]
4215    fn explorer_rejects_duplicate_facet_fields() {
4216        let dir = TempDir::new().expect("tempdir");
4217        let path = dir.path().join("duplicate-facets.journal");
4218        write_entries(&path, None, &[(&[b"SERVICE=a"], 1_000)]);
4219
4220        let mut reader = FileReader::open(&path).expect("open reader");
4221        let err = reader
4222            .explore(&ExplorerQuery {
4223                facets: vec![b"SERVICE".to_vec(), b"SERVICE".to_vec()],
4224                limit: 0,
4225                ..ExplorerQuery::default()
4226            })
4227            .expect_err("duplicate facets rejected");
4228
4229        assert!(err.to_string().contains("must not be duplicated"));
4230    }
4231
4232    #[test]
4233    fn explorer_empty_result_keeps_requested_facet_with_no_values() {
4234        let dir = TempDir::new().expect("tempdir");
4235        let path = dir.path().join("empty-result.journal");
4236        write_entries(&path, None, &[(&[b"SERVICE=a", b"PRIORITY=3"], 1_000)]);
4237
4238        let mut reader = FileReader::open(&path).expect("open reader");
4239        let result = reader
4240            .explore(&ExplorerQuery {
4241                after_realtime_usec: Some(10_000),
4242                before_realtime_usec: Some(20_000),
4243                facets: vec![b"SERVICE".to_vec()],
4244                limit: 10,
4245                realtime_slack_usec: 0,
4246                ..ExplorerQuery::default()
4247            })
4248            .expect("explore");
4249
4250        assert!(result.rows.is_empty());
4251        assert_eq!(result.stats.rows_matched, 0);
4252        assert!(
4253            result
4254                .facets
4255                .get(b"SERVICE".as_slice())
4256                .expect("service facet")
4257                .is_empty()
4258        );
4259    }
4260
4261    #[test]
4262    fn explorer_facet_time_bounds_do_not_count_slack_rows_without_source_realtime() {
4263        let dir = TempDir::new().expect("tempdir");
4264        let path = dir.path().join("facet-time-bound.journal");
4265        write_entries(
4266            &path,
4267            None,
4268            &[
4269                (&[b"SERVICE=before"], 340_000_000),
4270                (&[b"SERVICE=inside"], 360_000_000),
4271                (&[b"SERVICE=after"], 400_000_000),
4272            ],
4273        );
4274
4275        let mut reader = FileReader::open(&path).expect("open reader");
4276        let result = reader
4277            .explore(&ExplorerQuery {
4278                after_realtime_usec: Some(350_000_000),
4279                before_realtime_usec: Some(370_000_000),
4280                facets: vec![b"SERVICE".to_vec()],
4281                limit: 0,
4282                realtime_slack_usec: 20_000_000,
4283                use_source_realtime: false,
4284                ..ExplorerQuery::default()
4285            })
4286            .expect("explore");
4287
4288        let service = result
4289            .facets
4290            .get(b"SERVICE".as_slice())
4291            .expect("service facet");
4292        assert_eq!(service.get(b"inside".as_slice()), Some(&1));
4293        assert_eq!(service.get(b"before".as_slice()), None);
4294        assert_eq!(service.get(b"after".as_slice()), None);
4295        assert_eq!(result.stats.facet_rows_matched, 1);
4296    }
4297
4298    #[test]
4299    fn explorer_fts_disables_first_value_early_stop() {
4300        let dir = TempDir::new().expect("tempdir");
4301        let path = dir.path().join("fts-no-early-stop.journal");
4302        write_entries(&path, None, &[(&[b"TAG=one", b"MESSAGE=needle"], 1_000)]);
4303
4304        let mut reader = FileReader::open(&path).expect("open reader");
4305        let result = reader
4306            .explore(&ExplorerQuery {
4307                facets: vec![b"TAG".to_vec()],
4308                fts_patterns: vec![b"needle".to_vec()],
4309                limit: 0,
4310                ..ExplorerQuery::default()
4311            })
4312            .expect("explore");
4313
4314        assert_eq!(result.stats.early_stops, 0);
4315        assert_eq!(result.stats.data_refs_seen, 2);
4316        assert_eq!(
4317            result
4318                .facets
4319                .get(b"TAG".as_slice())
4320                .and_then(|values| values.get(b"one".as_slice())),
4321            Some(&1)
4322        );
4323    }
4324
4325    #[test]
4326    fn explorer_fts_or_terms_and_negative_terms_filter_rows() {
4327        let dir = TempDir::new().expect("tempdir");
4328        let path = dir.path().join("fts-negative.journal");
4329        write_entries(
4330            &path,
4331            None,
4332            &[
4333                (&[b"TAG=alpha", b"MESSAGE=alpha keep"], 1_000),
4334                (&[b"TAG=beta", b"MESSAGE=beta keep"], 2_000),
4335                (&[b"TAG=debug", b"MESSAGE=alpha debug"], 3_000),
4336                (&[b"TAG=other", b"MESSAGE=other"], 4_000),
4337                (&[b"TAG=wild", b"MESSAGE=start middle end"], 5_000),
4338            ],
4339        );
4340
4341        let mut reader = FileReader::open(&path).expect("open reader");
4342        let result = reader
4343            .explore(&ExplorerQuery {
4344                facets: vec![b"TAG".to_vec()],
4345                fts_terms: vec![
4346                    ExplorerFtsPattern::substring(b"alpha".to_vec(), false),
4347                    ExplorerFtsPattern::substring(b"beta".to_vec(), false),
4348                    ExplorerFtsPattern::substring(b"debug".to_vec(), true),
4349                    ExplorerFtsPattern::substring(b"start*end".to_vec(), false),
4350                ],
4351                limit: 10,
4352                ..ExplorerQuery::default()
4353            })
4354            .expect("explore");
4355
4356        let tag = result.facets.get(b"TAG".as_slice()).expect("TAG facet");
4357        assert_eq!(result.rows.len(), 3);
4358        assert_eq!(tag.get(b"alpha".as_slice()), Some(&1));
4359        assert_eq!(tag.get(b"beta".as_slice()), Some(&1));
4360        assert_eq!(tag.get(b"wild".as_slice()), Some(&1));
4361        assert_eq!(tag.get(b"debug".as_slice()), None);
4362        assert_eq!(tag.get(b"other".as_slice()), None);
4363    }
4364
4365    #[test]
4366    fn explorer_auto_anchor_scans_backward_from_tail() {
4367        let dir = TempDir::new().expect("tempdir");
4368        let path = dir.path().join("backward.journal");
4369        write_entries(
4370            &path,
4371            None,
4372            &[
4373                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
4374                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
4375            ],
4376        );
4377
4378        let mut reader = FileReader::open(&path).expect("open reader");
4379        let query = ExplorerQuery {
4380            direction: Direction::Backward,
4381            limit: 2,
4382            ..ExplorerQuery::default()
4383        };
4384
4385        let result = reader.explore(&query).expect("explore");
4386        assert_eq!(result.rows.len(), 2);
4387        assert_eq!(result.rows[0].realtime_usec, 2_000);
4388        assert_eq!(result.rows[1].realtime_usec, 1_000);
4389    }
4390
4391    #[test]
4392    fn explorer_backward_time_bound_stops_after_slack_window() {
4393        let dir = TempDir::new().expect("tempdir");
4394        let path = dir.path().join("backward-time-bound.journal");
4395        write_entries(
4396            &path,
4397            None,
4398            &[
4399                (&[b"SERVICE=a"], 100_000_000),
4400                (&[b"SERVICE=b"], 200_000_000),
4401                (&[b"SERVICE=c"], 300_000_000),
4402                (&[b"SERVICE=d"], 400_000_000),
4403                (&[b"SERVICE=e"], 500_000_000),
4404            ],
4405        );
4406
4407        let mut reader = FileReader::open(&path).expect("open reader");
4408        let query = ExplorerQuery {
4409            after_realtime_usec: Some(350_000_000),
4410            direction: Direction::Backward,
4411            limit: 10,
4412            realtime_slack_usec: 10_000_000,
4413            ..ExplorerQuery::default()
4414        };
4415
4416        let result = reader.explore(&query).expect("explore");
4417        assert_eq!(result.rows.len(), 2);
4418        assert_eq!(result.rows[0].realtime_usec, 500_000_000);
4419        assert_eq!(result.rows[1].realtime_usec, 400_000_000);
4420        assert_eq!(result.stats.rows_examined, 2);
4421    }
4422
4423    #[test]
4424    fn explorer_histogram_and_fts_are_opt_in() {
4425        let dir = TempDir::new().expect("tempdir");
4426        let path = dir.path().join("histogram.journal");
4427        write_entries(
4428            &path,
4429            None,
4430            &[
4431                (&[b"MESSAGE=alpha", b"PRIORITY=3"], 1_000),
4432                (&[b"MESSAGE=beta", b"PRIORITY=4"], 2_000),
4433            ],
4434        );
4435
4436        let mut reader = FileReader::open(&path).expect("open reader");
4437        let query = ExplorerQuery {
4438            after_realtime_usec: Some(0),
4439            before_realtime_usec: Some(3_000),
4440            histogram: Some(b"PRIORITY".to_vec()),
4441            histogram_target_buckets: 2,
4442            fts_patterns: vec![b"alp".to_vec()],
4443            limit: 10,
4444            ..ExplorerQuery::default()
4445        };
4446
4447        let result = reader.explore(&query).expect("explore");
4448        assert_eq!(result.rows.len(), 1);
4449        assert!(result.stats.fts_scans > 0);
4450        assert_eq!(
4451            result
4452                .histogram
4453                .as_ref()
4454                .expect("histogram")
4455                .buckets
4456                .iter()
4457                .flat_map(|bucket| bucket.values.values())
4458                .sum::<u64>(),
4459            1
4460        );
4461    }
4462
4463    #[test]
4464    fn explorer_first_value_stops_after_same_data_satisfies_multiple_roles() {
4465        let dir = TempDir::new().expect("tempdir");
4466        let path = dir.path().join("same-data-multiple-roles.journal");
4467        write_entries(
4468            &path,
4469            None,
4470            &[(
4471                &[b"_SOURCE_REALTIME_TIMESTAMP=1000", b"MESSAGE=after-source"],
4472                1_000,
4473            )],
4474        );
4475
4476        let mut reader = FileReader::open(&path).expect("open reader");
4477        let result = reader
4478            .explore(&ExplorerQuery {
4479                histogram: Some(SOURCE_REALTIME_FIELD.to_vec()),
4480                histogram_target_buckets: 1,
4481                limit: 0,
4482                field_mode: ExplorerFieldMode::FirstValue,
4483                ..ExplorerQuery::default()
4484            })
4485            .expect("explore");
4486
4487        assert_eq!(result.stats.histogram_updates, 1);
4488        assert_eq!(result.stats.early_stops, 1);
4489        assert_eq!(
4490            result
4491                .histogram
4492                .as_ref()
4493                .expect("histogram")
4494                .buckets
4495                .iter()
4496                .flat_map(|bucket| bucket.values.values())
4497                .sum::<u64>(),
4498            1
4499        );
4500    }
4501}