Skip to main content

journal/
explorer.rs

1use super::*;
2use journal_core::file::{DataObject, offset_array::InlinedCursor};
3use std::collections::{HashMap, HashSet};
4use std::time::{Duration, Instant};
5
/// Histogram bucket count used when a query does not pick its own.
const DEFAULT_HISTOGRAM_TARGET_BUCKETS: usize = 150;
/// Default `ExplorerQuery::realtime_slack_usec`: 120 seconds in microseconds.
const DEFAULT_TIME_SLACK_USEC: u64 = 120 * 1_000_000;
/// Rows between two full deadline/cancellation/progress checks in `ExplorerControl`.
const EXPLORER_CONTROL_CHECK_EVERY_ROWS: u64 = 8 * 1024;
/// Default `ExplorerQuery::stop_when_rows_full_check_every`.
const DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS: u64 = 1;
/// Upper clamp on the number of sampling time slots.
const EXPLORER_SAMPLING_SLOTS_MAX: usize = 1_000;
/// Unsampled rows in one file after which the per-file sampling interval is recomputed.
const EXPLORER_SAMPLING_RECALIBRATE_ROWS: u64 = 10_000;
/// Fraction (0.01 = 1%) of a file's time span that must be covered before
/// sampling may stop and estimate the rest of that file.
const EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS: f64 = 0.01;
/// Field name the accumulator registers under `FACET_SOURCE_REALTIME`.
const SOURCE_REALTIME_FIELD: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP";
/// Placeholder value label, presumably for rows where a field is absent.
const UNSET_VALUE: &[u8] = b"-";
/// Placeholder value label, presumably for rows skipped by sampling.
const EXPLORER_UNSAMPLED_VALUE: &[u8] = b"[unsampled]";
/// Placeholder value label, presumably for counts produced by estimation.
const EXPLORER_ESTIMATED_VALUE: &[u8] = b"[estimated]";
17
/// Position from which an explorer query starts reading rows.
///
/// `Auto` is the default. NOTE(review): how `Auto`, `Head` and `Tail` are
/// resolved is implemented outside this section — confirm there.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerAnchor {
    #[default]
    Auto,
    Head,
    Tail,
    /// Explicit realtime position (presumably microseconds, like the other
    /// `*_usec` values in this module).
    Realtime(u64),
}
31
/// How fields that occur more than once in a row are treated.
///
/// Defaults to `FirstValue`. NOTE(review): the exact per-mode behavior is
/// implemented outside this section — confirm there.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerFieldMode {
    AllValues,
    #[default]
    FirstValue,
}
43
/// Execution strategy for an explorer run; defaults to `Traversal`.
///
/// NOTE(review): `Compare` presumably runs both strategies and fills
/// `ExplorerComparison` — confirm against the caller.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExplorerStrategy {
    #[default]
    Traversal,
    Index,
    Compare,
}
57
/// A filter on one field: the field name plus the accepted values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFilter {
    pub field: Vec<u8>,
    pub values: Vec<Vec<u8>>,
}

impl ExplorerFilter {
    /// Builds a filter from anything convertible into byte vectors.
    pub fn new(
        field: impl Into<Vec<u8>>,
        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
    ) -> Self {
        let field: Vec<u8> = field.into();
        let values: Vec<Vec<u8>> = values.into_iter().map(|value| value.into()).collect();
        Self { field, values }
    }
}
75
76#[derive(Debug, Clone)]
77pub struct ExplorerQuery {
78    pub after_realtime_usec: Option<u64>,
79    pub before_realtime_usec: Option<u64>,
80    pub anchor: ExplorerAnchor,
81    pub direction: Direction,
82    pub limit: usize,
83    pub filters: Vec<ExplorerFilter>,
84    pub facets: Vec<Vec<u8>>,
85    pub histogram: Option<Vec<u8>>,
86    pub histogram_after_realtime_usec: Option<u64>,
87    pub histogram_before_realtime_usec: Option<u64>,
88    pub histogram_target_buckets: usize,
89    pub fts_terms: Vec<ExplorerFtsPattern>,
90    pub fts_patterns: Vec<Vec<u8>>,
91    pub fts_negative_patterns: Vec<Vec<u8>>,
92    pub field_mode: ExplorerFieldMode,
93    pub exclude_facet_field_filters: bool,
94    pub use_source_realtime: bool,
95    pub realtime_slack_usec: u64,
96    pub stop_when_rows_full: bool,
97    pub stop_when_rows_full_check_every: u64,
98    pub sampling: Option<ExplorerSampling>,
99    /// Debug-only discrepancy tool. Production explorer callers must never set
100    /// this; column catalogs belong to FIELD indexes, not row traversal.
101    #[doc(hidden)]
102    pub debug_collect_column_fields_by_row_traversal: bool,
103}
104
105impl Default for ExplorerQuery {
106    fn default() -> Self {
107        Self {
108            after_realtime_usec: None,
109            before_realtime_usec: None,
110            anchor: ExplorerAnchor::Auto,
111            direction: Direction::Forward,
112            limit: 200,
113            filters: Vec::new(),
114            facets: Vec::new(),
115            histogram: None,
116            histogram_after_realtime_usec: None,
117            histogram_before_realtime_usec: None,
118            histogram_target_buckets: DEFAULT_HISTOGRAM_TARGET_BUCKETS,
119            fts_terms: Vec::new(),
120            fts_patterns: Vec::new(),
121            fts_negative_patterns: Vec::new(),
122            field_mode: ExplorerFieldMode::FirstValue,
123            exclude_facet_field_filters: true,
124            use_source_realtime: true,
125            realtime_slack_usec: DEFAULT_TIME_SLACK_USEC,
126            stop_when_rows_full: false,
127            stop_when_rows_full_check_every: DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS,
128            sampling: None,
129            debug_collect_column_fields_by_row_traversal: false,
130        }
131    }
132}
133
/// Sampling configuration for a query, plus metadata about the current file.
///
/// `budget` and `matched_files` drive the sample quotas in
/// `ExplorerSamplingState::for_query`. The `file_*` fields describe the file
/// being scanned; `ExplorerSamplingState` treats a value of 0 as unknown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExplorerSampling {
    pub budget: u64,
    pub matched_files: u64,
    pub file_head_realtime_usec: u64,
    pub file_tail_realtime_usec: u64,
    pub file_head_seqnum: u64,
    pub file_tail_seqnum: u64,
    pub file_entries: u64,
}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct ExplorerFtsPattern {
147    pub parts: Vec<Vec<u8>>,
148    pub negative: bool,
149}
150
151impl ExplorerFtsPattern {
152    pub fn substring(pattern: impl Into<Vec<u8>>, negative: bool) -> Self {
153        let pattern = pattern.into();
154        let parts = pattern
155            .split(|byte| *byte == b'*')
156            .filter(|part| !part.is_empty())
157            .map(|part| part.to_vec())
158            .collect();
159        Self { parts, negative }
160    }
161
162    fn matches(&self, value: &[u8]) -> bool {
163        if value.is_empty() {
164            return false;
165        }
166        if self.parts.is_empty() {
167            return true;
168        }
169
170        let mut haystack = value;
171        for part in &self.parts {
172            let Some(index) = find_ascii_case_insensitive(haystack, part) else {
173                return false;
174            };
175            haystack = &haystack[index.saturating_add(part.len())..];
176        }
177        true
178    }
179}
180
181impl ExplorerQuery {
182    pub fn with_filter(
183        mut self,
184        field: impl Into<Vec<u8>>,
185        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
186    ) -> Self {
187        self.filters.push(ExplorerFilter::new(field, values));
188        self
189    }
190
191    pub fn with_facet(mut self, field: impl Into<Vec<u8>>) -> Self {
192        self.facets.push(field.into());
193        self
194    }
195
196    pub fn with_histogram(mut self, field: impl Into<Vec<u8>>) -> Self {
197        self.histogram = Some(field.into());
198        self
199    }
200
201    pub fn with_fts_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
202        let pattern = pattern.into();
203        self.fts_terms
204            .push(ExplorerFtsPattern::substring(pattern.clone(), false));
205        self.fts_patterns.push(pattern);
206        self
207    }
208
209    pub fn with_fts_negative_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
210        let pattern = pattern.into();
211        self.fts_terms
212            .push(ExplorerFtsPattern::substring(pattern.clone(), true));
213        self.fts_negative_patterns.push(pattern);
214        self
215    }
216}
217
218#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
219pub struct ExplorerStats {
220    pub rows_examined: u64,
221    pub rows_matched: u64,
222    pub facet_rows_matched: u64,
223    pub rows_returned: u64,
224    pub rows_unsampled: u64,
225    pub rows_estimated: u64,
226    pub sampling_sampled: u64,
227    pub sampling_unsampled: u64,
228    pub sampling_estimated: u64,
229    pub last_realtime_usec: u64,
230    pub max_source_realtime_delta_usec: u64,
231    pub data_refs_seen: u64,
232    pub data_refs_skipped: u64,
233    pub data_payloads_loaded: u64,
234    pub data_objects_classified: u64,
235    pub data_cache_hits: u64,
236    pub data_cache_misses: u64,
237    pub payloads_decompressed: u64,
238    pub fts_scans: u64,
239    pub facet_updates: u64,
240    pub histogram_updates: u64,
241    pub returned_row_expansions: u64,
242    pub early_stop_opportunities: u64,
243    pub early_stops: u64,
244}
245
/// One row returned by an explorer query.
#[derive(Debug, Clone)]
pub struct ExplorerRow {
    /// Row timestamp used for ordering and windowing.
    pub realtime_usec: u64,
    /// Cursor identifying the row (presumably a journal cursor string).
    pub cursor: String,
    /// Raw payloads of the row (presumably `FIELD=value`; may be left empty
    /// in `ExplorerRowPayloadMode::CursorOnly` — TODO confirm).
    pub payloads: Vec<Vec<u8>>,
}
252
/// Whether returned rows carry their payloads (`Expand`) or only a cursor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExplorerRowPayloadMode {
    Expand,
    CursorOnly,
}
258
/// One histogram time bucket with per-value counts.
///
/// NOTE(review): whether the end bound is inclusive is decided where buckets
/// are filled — confirm there.
#[derive(Debug, Clone)]
pub struct ExplorerHistogramBucket {
    pub start_realtime_usec: u64,
    pub end_realtime_usec: u64,
    pub values: HashMap<Vec<u8>, u64>,
}

/// Histogram of `field` values over consecutive time buckets.
#[derive(Debug, Clone)]
pub struct ExplorerHistogram {
    pub field: Vec<u8>,
    pub buckets: Vec<ExplorerHistogramBucket>,
}
271
272#[derive(Debug, Clone, Default)]
273pub struct ExplorerComparison {
274    pub traversal_duration: Duration,
275    pub index_duration: Duration,
276    pub traversal_stats: ExplorerStats,
277    pub index_stats: ExplorerStats,
278}
279
280#[derive(Debug, Clone, Default)]
281pub struct ExplorerResult {
282    pub rows: Vec<ExplorerRow>,
283    pub facets: HashMap<Vec<u8>, HashMap<Vec<u8>, u64>>,
284    pub histogram: Option<ExplorerHistogram>,
285    pub column_fields: HashSet<Vec<u8>>,
286    pub stats: ExplorerStats,
287    pub comparison: Option<ExplorerComparison>,
288}
289
290#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
291pub enum ExplorerStopReason {
292    TimedOut,
293    Cancelled,
294}
295
296#[derive(Debug, Clone)]
297pub struct ExplorerProgress {
298    pub stats: ExplorerStats,
299    pub elapsed: Duration,
300}
301
302pub struct ExplorerControl<'a> {
303    deadline: Option<Instant>,
304    cancellation: Option<&'a dyn Fn() -> bool>,
305    progress: Option<&'a mut dyn FnMut(ExplorerProgress)>,
306    candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
307    adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
308    matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
309    sampling: Option<&'a mut ExplorerSamplingState>,
310    progress_interval: Duration,
311    started: Instant,
312    last_progress: Instant,
313    next_check_rows: u64,
314    stop_reason: Option<ExplorerStopReason>,
315}
316
317impl<'a> ExplorerControl<'a> {
318    pub fn new() -> Self {
319        let now = Instant::now();
320        Self {
321            deadline: None,
322            cancellation: None,
323            progress: None,
324            candidate_row: None,
325            adjust_realtime: None,
326            matched_row: None,
327            sampling: None,
328            progress_interval: Duration::from_millis(250),
329            started: now,
330            last_progress: now,
331            next_check_rows: EXPLORER_CONTROL_CHECK_EVERY_ROWS,
332            stop_reason: None,
333        }
334    }
335
336    pub fn set_deadline(&mut self, deadline: Option<Instant>) {
337        self.deadline = deadline;
338    }
339
340    pub fn set_cancellation_callback(&mut self, cancellation: Option<&'a dyn Fn() -> bool>) {
341        self.cancellation = cancellation;
342    }
343
344    pub fn set_progress_callback(&mut self, progress: Option<&'a mut dyn FnMut(ExplorerProgress)>) {
345        self.progress = progress;
346    }
347
348    pub(crate) fn set_candidate_row_callback(
349        &mut self,
350        candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
351    ) {
352        self.candidate_row = candidate_row;
353    }
354
355    pub(crate) fn set_realtime_adjust_callback(
356        &mut self,
357        adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
358    ) {
359        self.adjust_realtime = adjust_realtime;
360    }
361
362    pub fn set_matched_row_callback(
363        &mut self,
364        matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
365    ) {
366        self.matched_row = matched_row;
367    }
368
369    pub(crate) fn set_sampling_state(&mut self, sampling: Option<&'a mut ExplorerSamplingState>) {
370        self.sampling = sampling;
371    }
372
373    pub fn set_progress_interval(&mut self, interval: Duration) {
374        self.progress_interval = interval;
375    }
376
377    pub fn stop_reason(&self) -> Option<ExplorerStopReason> {
378        self.stop_reason
379    }
380
381    fn should_stop_after_rows(&mut self, rows_seen: u64, stats: &ExplorerStats) -> bool {
382        if self.stop_reason.is_some() {
383            return true;
384        }
385        if rows_seen < self.next_check_rows {
386            return false;
387        }
388        self.next_check_rows = rows_seen.saturating_add(EXPLORER_CONTROL_CHECK_EVERY_ROWS);
389        self.check(stats)
390    }
391
392    fn check(&mut self, stats: &ExplorerStats) -> bool {
393        let now = Instant::now();
394        if now.duration_since(self.last_progress) >= self.progress_interval {
395            self.emit_progress(stats, now);
396        }
397        if self.cancellation.is_some_and(|is_cancelled| is_cancelled()) {
398            self.stop_reason = Some(ExplorerStopReason::Cancelled);
399            self.emit_progress(stats, now);
400            return true;
401        }
402        if self.deadline.is_some_and(|deadline| now >= deadline) {
403            self.stop_reason = Some(ExplorerStopReason::TimedOut);
404            self.emit_progress(stats, now);
405            return true;
406        }
407        false
408    }
409
410    fn emit_progress(&mut self, stats: &ExplorerStats, now: Instant) {
411        self.last_progress = now;
412        if let Some(progress) = self.progress.as_deref_mut() {
413            progress(ExplorerProgress {
414                stats: stats.clone(),
415                elapsed: now.duration_since(self.started),
416            });
417        }
418    }
419
420    fn emit_matched_row(&mut self, realtime_usec: u64, rows_matched: u64) -> bool {
421        if let Some(matched_row) = self.matched_row.as_deref_mut() {
422            return matched_row(realtime_usec, rows_matched);
423        }
424        false
425    }
426
427    fn adjust_realtime(&mut self, realtime_usec: u64) -> u64 {
428        self.adjust_realtime
429            .as_deref_mut()
430            .map_or(realtime_usec, |adjust_realtime| {
431                adjust_realtime(realtime_usec)
432            })
433    }
434}
435
436impl Default for ExplorerControl<'_> {
437    fn default() -> Self {
438        Self::new()
439    }
440}
441
/// Outcome of `ExplorerSamplingState::decide` for one row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExplorerSamplingDecision {
    /// Process the row fully. `sampled` is `true` when it counted toward the
    /// sample quotas and `false` for rows kept unconditionally.
    Full { sampled: bool },
    /// Count the row but skip per-field work.
    SkipFields,
    /// Stop scanning this file and extrapolate `remaining_rows` over the given range.
    StopAndEstimate {
        remaining_rows: u64,
        from_realtime_usec: u64,
        to_realtime_usec: u64,
    },
}
454
455#[derive(Debug)]
456pub(crate) struct ExplorerSamplingState {
457    start_realtime_usec: u64,
458    end_realtime_usec: u64,
459    file_head_realtime_usec: u64,
460    file_tail_realtime_usec: u64,
461    file_head_seqnum: u64,
462    file_tail_seqnum: u64,
463    file_entries: u64,
464    first_realtime_usec: Option<u64>,
465    step_realtime_usec: u64,
466    enable_after_samples: u64,
467    per_file_enable_after_samples: u64,
468    per_slot_enable_after_samples: u64,
469    sampled: u64,
470    per_file_sampled: u64,
471    per_file_unsampled: u64,
472    per_file_every: u64,
473    per_file_skipped: u64,
474    per_file_recalibrate: u64,
475    per_slot_sampled: Vec<u64>,
476    per_slot_unsampled: Vec<u64>,
477    matched_files: u64,
478    direction: Direction,
479}
480
481impl ExplorerSamplingState {
482    pub(crate) fn for_query(
483        query: &ExplorerQuery,
484        histogram_bucket_count: Option<usize>,
485    ) -> Option<Self> {
486        let sampling = query.sampling?;
487        let start_realtime_usec = query.after_realtime_usec?;
488        let end_realtime_usec = query.before_realtime_usec?;
489        if sampling.budget == 0
490            || sampling.matched_files == 0
491            || start_realtime_usec >= end_realtime_usec
492        {
493            return None;
494        }
495
496        let slots = histogram_bucket_count
497            .unwrap_or(query.histogram_target_buckets)
498            .clamp(2, EXPLORER_SAMPLING_SLOTS_MAX);
499        let delta = end_realtime_usec.saturating_sub(start_realtime_usec);
500        let step_realtime_usec = (delta / slots as u64).saturating_sub(1).max(1);
501        let per_file_enable_after_samples =
502            ((sampling.budget / 4) / sampling.matched_files.max(1)).max(query.limit as u64);
503        let per_slot_enable_after_samples =
504            ((sampling.budget / 4) / slots as u64).max(query.limit as u64);
505
506        Some(Self {
507            start_realtime_usec,
508            end_realtime_usec,
509            file_head_realtime_usec: sampling.file_head_realtime_usec,
510            file_tail_realtime_usec: sampling.file_tail_realtime_usec,
511            file_head_seqnum: sampling.file_head_seqnum,
512            file_tail_seqnum: sampling.file_tail_seqnum,
513            file_entries: sampling.file_entries,
514            first_realtime_usec: None,
515            step_realtime_usec,
516            enable_after_samples: sampling.budget / 2,
517            per_file_enable_after_samples,
518            per_slot_enable_after_samples,
519            sampled: 0,
520            per_file_sampled: 0,
521            per_file_unsampled: 0,
522            per_file_every: 0,
523            per_file_skipped: 0,
524            per_file_recalibrate: 0,
525            per_slot_sampled: vec![0; slots],
526            per_slot_unsampled: vec![0; slots],
527            matched_files: sampling.matched_files.max(1),
528            direction: query.direction,
529        })
530    }
531
532    fn begin_file(&mut self, sampling: ExplorerSampling) {
533        self.file_head_realtime_usec = sampling.file_head_realtime_usec;
534        self.file_tail_realtime_usec = sampling.file_tail_realtime_usec;
535        self.file_head_seqnum = sampling.file_head_seqnum;
536        self.file_tail_seqnum = sampling.file_tail_seqnum;
537        self.file_entries = sampling.file_entries;
538        self.first_realtime_usec = None;
539        self.per_file_sampled = 0;
540        self.per_file_unsampled = 0;
541        self.per_file_every = 0;
542        self.per_file_skipped = 0;
543        self.per_file_recalibrate = 0;
544    }
545
546    fn decide(
547        &mut self,
548        realtime_usec: u64,
549        seqnum: u64,
550        candidate_to_keep: bool,
551    ) -> ExplorerSamplingDecision {
552        if self.first_realtime_usec.is_none() {
553            self.first_realtime_usec = Some(realtime_usec);
554        }
555        if candidate_to_keep {
556            return ExplorerSamplingDecision::Full { sampled: false };
557        }
558
559        let slot = self.slot_for_realtime(realtime_usec);
560        let should_sample = if self.sampled < self.enable_after_samples
561            || self.per_file_sampled < self.per_file_enable_after_samples
562            || self.per_slot_sampled[slot] < self.per_slot_enable_after_samples
563        {
564            true
565        } else if self.per_file_recalibrate >= EXPLORER_SAMPLING_RECALIBRATE_ROWS
566            || self.per_file_every == 0
567        {
568            self.recalibrate(realtime_usec, seqnum);
569            true
570        } else if self.per_file_skipped >= self.per_file_every {
571            self.per_file_skipped = 0;
572            true
573        } else {
574            self.per_file_skipped = self.per_file_skipped.saturating_add(1);
575            false
576        };
577
578        if should_sample {
579            self.sampled = self.sampled.saturating_add(1);
580            self.per_file_sampled = self.per_file_sampled.saturating_add(1);
581            self.per_slot_sampled[slot] = self.per_slot_sampled[slot].saturating_add(1);
582            return ExplorerSamplingDecision::Full { sampled: true };
583        }
584
585        self.per_file_recalibrate = self.per_file_recalibrate.saturating_add(1);
586        self.per_file_unsampled = self.per_file_unsampled.saturating_add(1);
587        self.per_slot_unsampled[slot] = self.per_slot_unsampled[slot].saturating_add(1);
588
589        if self.per_file_unsampled > self.per_file_sampled
590            && self.progress_by_time(realtime_usec) > EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS
591        {
592            let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
593            let (from_realtime_usec, to_realtime_usec) = self.remaining_range(realtime_usec);
594            return ExplorerSamplingDecision::StopAndEstimate {
595                remaining_rows,
596                from_realtime_usec,
597                to_realtime_usec,
598            };
599        }
600
601        ExplorerSamplingDecision::SkipFields
602    }
603
604    fn slot_for_realtime(&self, realtime_usec: u64) -> usize {
605        let clamped = realtime_usec.clamp(self.start_realtime_usec, self.end_realtime_usec);
606        let slot =
607            (clamped.saturating_sub(self.start_realtime_usec) / self.step_realtime_usec) as usize;
608        slot.min(self.per_slot_sampled.len().saturating_sub(1))
609    }
610
611    fn recalibrate(&mut self, realtime_usec: u64, seqnum: u64) {
612        let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
613        let wanted_samples = (self.enable_after_samples / self.matched_files).max(1);
614        self.per_file_every = (remaining_rows / wanted_samples).max(1);
615        self.per_file_recalibrate = 0;
616    }
617
618    fn estimate_remaining_rows(&self, realtime_usec: u64, seqnum: u64) -> u64 {
619        if let Some(remaining) = self.estimate_remaining_rows_by_seqnum(seqnum) {
620            return remaining;
621        }
622        self.estimate_remaining_rows_by_time(realtime_usec)
623    }
624
625    fn estimate_remaining_rows_by_seqnum(&self, seqnum: u64) -> Option<u64> {
626        self.validate_seqnum_estimate_inputs(seqnum)?;
627        let scanned_rows = self.scanned_file_rows();
628        let seqnum_span = self.seqnum_span_so_far(seqnum)?;
629        if seqnum_span == 0 {
630            return None;
631        }
632        let proportion_of_all_lines_so_far =
633            bounded_positive_proportion(scanned_rows as f64 / seqnum_span as f64)?;
634        let expected_matching_logs =
635            (proportion_of_all_lines_so_far * self.file_entries as f64) as u64;
636        if expected_matching_logs == 0 {
637            return None;
638        }
639        // Match Netdata systemd-journal.plugin sampling_running_file_query_estimate_remaining_lines():
640        // remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines,
641        // clamped to one when the subtraction reaches zero.
642        Some(expected_matching_logs.saturating_sub(scanned_rows).max(1))
643    }
644
645    fn validate_seqnum_estimate_inputs(&self, seqnum: u64) -> Option<()> {
646        (self.file_entries != 0
647            && self.file_head_seqnum != 0
648            && self.file_tail_seqnum != 0
649            && seqnum != 0)
650            .then_some(())
651    }
652
653    fn scanned_file_rows(&self) -> u64 {
654        self.per_file_sampled
655            .saturating_add(self.per_file_unsampled)
656            .max(1)
657    }
658
659    fn seqnum_span_so_far(&self, seqnum: u64) -> Option<u64> {
660        match self.direction {
661            Direction::Forward => seqnum.checked_sub(self.file_head_seqnum),
662            Direction::Backward => self.file_tail_seqnum.checked_sub(seqnum),
663        }
664    }
665
666    fn estimate_remaining_rows_by_time(&self, realtime_usec: u64) -> u64 {
667        let scanned_rows = self.scanned_file_rows();
668        let (after, before) = self.overlapping_timeframe(realtime_usec);
669        let total_time = self
670            .remaining_time_bounds(realtime_usec, after, before)
671            .0
672            .max(1);
673        let remaining_time = self.remaining_time_bounds(realtime_usec, after, before).1;
674        let elapsed = total_time.saturating_sub(remaining_time).max(1);
675        let mut proportion_by_time = elapsed as f64 / total_time as f64;
676        if proportion_by_time == 0.0 || proportion_by_time > 1.0 || !proportion_by_time.is_finite()
677        {
678            proportion_by_time = 1.0;
679        }
680        let mut expected_total = (scanned_rows as f64 / proportion_by_time) as u64;
681        if self.file_entries != 0 && expected_total > self.file_entries {
682            expected_total = self.file_entries;
683        }
684        expected_total.saturating_sub(scanned_rows).max(1)
685    }
686
687    fn progress_by_time(&self, realtime_usec: u64) -> f64 {
688        let (after, before) = self.overlapping_timeframe(realtime_usec);
689        let total_time = before.saturating_sub(after).max(1);
690        let elapsed = match self.direction {
691            Direction::Forward => realtime_usec.saturating_sub(after),
692            Direction::Backward => before.saturating_sub(realtime_usec),
693        }
694        .min(total_time);
695        elapsed as f64 / total_time as f64
696    }
697
698    fn overlapping_timeframe(&self, realtime_usec: u64) -> (u64, u64) {
699        match self.direction {
700            Direction::Forward => {
701                let mut oldest = self
702                    .first_realtime_usec
703                    .or((self.file_head_realtime_usec != 0).then_some(self.file_head_realtime_usec))
704                    .unwrap_or(self.start_realtime_usec);
705                let mut newest = if self.file_tail_realtime_usec != 0 {
706                    self.end_realtime_usec.min(self.file_tail_realtime_usec)
707                } else {
708                    self.end_realtime_usec
709                };
710                if newest <= oldest {
711                    newest = oldest.saturating_add(1);
712                }
713                if realtime_usec < oldest {
714                    oldest = realtime_usec.saturating_sub(1);
715                }
716                (oldest, newest)
717            }
718            Direction::Backward => {
719                let mut newest = self
720                    .first_realtime_usec
721                    .or((self.file_tail_realtime_usec != 0).then_some(self.file_tail_realtime_usec))
722                    .unwrap_or(self.end_realtime_usec);
723                let oldest = if self.file_head_realtime_usec != 0 {
724                    self.start_realtime_usec.max(self.file_head_realtime_usec)
725                } else {
726                    self.start_realtime_usec
727                };
728                if newest <= oldest {
729                    newest = oldest.saturating_add(1);
730                }
731                if newest < realtime_usec {
732                    newest = realtime_usec.saturating_add(1);
733                }
734                (oldest, newest)
735            }
736        }
737    }
738
739    fn remaining_range(&self, realtime_usec: u64) -> (u64, u64) {
740        let (after, before) = self.overlapping_timeframe(realtime_usec);
741        let (_, _, remaining_start, remaining_end) =
742            self.remaining_time_details(realtime_usec, after, before);
743        (remaining_start, remaining_end)
744    }
745
746    fn remaining_time_bounds(&self, realtime_usec: u64, after: u64, before: u64) -> (u64, u64) {
747        let (total, remaining, _, _) = self.remaining_time_details(realtime_usec, after, before);
748        (total, remaining)
749    }
750
751    fn remaining_time_details(
752        &self,
753        realtime_usec: u64,
754        mut after: u64,
755        mut before: u64,
756    ) -> (u64, u64, u64, u64) {
757        if realtime_usec <= after {
758            after = realtime_usec.saturating_sub(1);
759        }
760        if realtime_usec >= before {
761            before = realtime_usec.saturating_add(1);
762        }
763        if before <= after {
764            before = after.saturating_add(1);
765        }
766        let (remaining_start, remaining_end) = match self.direction {
767            Direction::Forward => (realtime_usec, before),
768            Direction::Backward => (after, realtime_usec),
769        };
770        (
771            before.saturating_sub(after).max(1),
772            remaining_end.saturating_sub(remaining_start),
773            remaining_start,
774            remaining_end,
775        )
776    }
777}
778
779pub(crate) fn histogram_bucket_count_for_query(query: &ExplorerQuery) -> Option<usize> {
780    query
781        .histogram
782        .as_deref()
783        .map(|field| new_histogram(field, query).buckets.len())
784}
785
/// Per-row scratch data filled while walking a row's data objects.
///
/// `Default` yields an empty scan with no timestamp and no FTS hits.
#[derive(Default)]
struct RowScan {
    timestamp: Option<u64>,
    fts_matches: bool,
    fts_negative_match: bool,
    column_fields: Vec<Vec<u8>>,
}
793
// Bit flags describing why the accumulator tracks a field; they can be combined.
const FACET_PUBLIC: u8 = 0b0000_0001;
const FACET_HISTOGRAM: u8 = 0b0000_0010;
const FACET_SOURCE_REALTIME: u8 = 0b0000_0100;
797
/// Cached classification of a data-object offset, stored in `OffsetClassCache`.
///
/// `Value` carries an index (presumably into the accumulator's value tables).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OffsetClass {
    Irrelevant,
    FtsMatch,
    FtsNegativeMatch,
    Value(usize),
}

impl OffsetClass {
    // Raw encoding kept in `OffsetClassSlot::class`. `to_raw` never produces 0.
    const IRRELEVANT_RAW: usize = 1;
    const FTS_MATCH_RAW: usize = 2;
    const FTS_NEGATIVE_MATCH_RAW: usize = 3;
    const VALUE_BASE: usize = 4;

    /// Encodes the class; `Value(i)` becomes `VALUE_BASE + i`, saturating at `usize::MAX`.
    fn to_raw(self) -> usize {
        match self {
            Self::Value(index) => index.saturating_add(Self::VALUE_BASE),
            Self::FtsNegativeMatch => Self::FTS_NEGATIVE_MATCH_RAW,
            Self::FtsMatch => Self::FTS_MATCH_RAW,
            Self::Irrelevant => Self::IRRELEVANT_RAW,
        }
    }

    /// Decodes a raw class.
    ///
    /// Any non-special code is a `Value`. Codes below `VALUE_BASE` that are
    /// not special (such as 0) decode to `Value(0)`.
    fn from_raw(raw: usize) -> Self {
        if raw == Self::IRRELEVANT_RAW {
            Self::Irrelevant
        } else if raw == Self::FTS_MATCH_RAW {
            Self::FtsMatch
        } else if raw == Self::FTS_NEGATIVE_MATCH_RAW {
            Self::FtsNegativeMatch
        } else {
            Self::Value(raw.saturating_sub(Self::VALUE_BASE))
        }
    }
}
830
/// One bucket of `OffsetClassCache`.
///
/// `offset == 0` marks an empty bucket. `class` holds `OffsetClass::to_raw` output.
#[derive(Clone, Copy, Debug, Default)]
struct OffsetClassSlot {
    offset: u64,
    class: usize,
}
836
837#[derive(Debug)]
838struct OffsetClassCache {
839    slots: Vec<OffsetClassSlot>,
840    len: usize,
841}
842
843impl Default for OffsetClassCache {
844    fn default() -> Self {
845        Self {
846            slots: vec![OffsetClassSlot::default(); 256],
847            len: 0,
848        }
849    }
850}
851
852impl OffsetClassCache {
853    fn lookup(&self, offset: NonZeroU64) -> Option<OffsetClass> {
854        let mask = self.slots.len().saturating_sub(1);
855        let mut index = offset_slot(offset.get()) & mask;
856        for _ in 0..self.slots.len() {
857            let slot = self.slots[index];
858            if slot.offset == 0 {
859                return None;
860            }
861            if slot.offset == offset.get() {
862                return Some(OffsetClass::from_raw(slot.class));
863            }
864            index = (index + 1) & mask;
865        }
866        None
867    }
868
869    fn insert(&mut self, offset: NonZeroU64, class: OffsetClass) {
870        if (self.len + 1).saturating_mul(4) >= self.slots.len().saturating_mul(3) {
871            self.grow();
872        }
873        self.insert_raw(offset.get(), class.to_raw());
874    }
875
876    fn grow(&mut self) {
877        let new_len = self.slots.len().saturating_mul(2).max(256);
878        let old = std::mem::replace(&mut self.slots, vec![OffsetClassSlot::default(); new_len]);
879        self.len = 0;
880        for slot in old {
881            if slot.offset != 0 {
882                self.insert_raw(slot.offset, slot.class);
883            }
884        }
885    }
886
887    fn insert_raw(&mut self, offset: u64, class: usize) {
888        let mask = self.slots.len().saturating_sub(1);
889        let mut index = offset_slot(offset) & mask;
890        loop {
891            if self.slots[index].offset == 0 {
892                self.slots[index] = OffsetClassSlot { offset, class };
893                self.len += 1;
894                return;
895            }
896            if self.slots[index].offset == offset {
897                self.slots[index].class = class;
898                return;
899            }
900            index = (index + 1) & mask;
901        }
902    }
903}
904
/// Hashes an object offset into a probe-start value for `OffsetClassCache`.
///
/// The low three bits are discarded first, presumably because journal object
/// offsets are 8-byte aligned (TODO confirm). The remaining bits are mixed with
/// the first xorshift-multiply-xorshift round of MurmurHash3's `fmix64`
/// finalizer. The result is truncated to `usize`; callers mask it to the table
/// size.
fn offset_slot(offset: u64) -> usize {
    const FMIX_MULTIPLIER: u64 = 0xff51afd7ed558ccd;
    let key = offset >> 3;
    let mixed = (key ^ (key >> 33)).wrapping_mul(FMIX_MULTIPLIER);
    (mixed ^ (mixed >> 33)) as usize
}
912
913struct ExplorerAccumulator {
914    field_lookup: HashMap<Vec<u8>, usize>,
915    fields: Vec<Vec<u8>>,
916    flags: Vec<u8>,
917    last_seen_row_ids: Vec<u64>,
918    unset_counts: Vec<u64>,
919    values_by_field: Vec<Vec<usize>>,
920    value_counts: Vec<u64>,
921    value_field_indices: Vec<usize>,
922    value_labels: Vec<Vec<u8>>,
923    value_fts_matches: Vec<bool>,
924    value_source_realtime: Vec<Option<u64>>,
925    value_histogram_buckets: Vec<Option<Vec<u64>>>,
926    field_histogram_unset_buckets: Vec<Option<Vec<u64>>>,
927    offset_cache: OffsetClassCache,
928    histogram_start_realtime_usec: u64,
929    histogram_bucket_width_usec: u64,
930    histogram_bucket_count: usize,
931    required_identity_count: usize,
932}
933
934impl ExplorerAccumulator {
935    fn for_main(query: &ExplorerQuery, histogram: Option<&ExplorerHistogram>) -> Self {
936        Self::for_combined(query, &[], histogram)
937    }
938
939    fn for_facets(
940        query: &ExplorerQuery,
941        facet_indices: &[usize],
942        include_source_realtime: bool,
943    ) -> Self {
944        let mut out = Self::new(None);
945        for facet_index in facet_indices {
946            if let Some(field) = query.facets.get(*facet_index) {
947                out.add_field(field, FACET_PUBLIC);
948            }
949        }
950        if include_source_realtime {
951            out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
952        }
953        out
954    }
955
956    fn for_combined(
957        query: &ExplorerQuery,
958        facet_indices: &[usize],
959        histogram: Option<&ExplorerHistogram>,
960    ) -> Self {
961        let mut out = Self::new(histogram);
962        if let Some(field) = &query.histogram {
963            out.add_field(field, FACET_HISTOGRAM);
964        }
965        for facet_index in facet_indices {
966            if let Some(field) = query.facets.get(*facet_index) {
967                out.add_field(field, FACET_PUBLIC);
968            }
969        }
970        if query_needs_source_realtime_main(query) || facet_pass_needs_source_realtime(query) {
971            out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
972        }
973        out
974    }
975
976    fn new(histogram: Option<&ExplorerHistogram>) -> Self {
977        Self {
978            field_lookup: HashMap::new(),
979            fields: Vec::new(),
980            flags: Vec::new(),
981            last_seen_row_ids: Vec::new(),
982            unset_counts: Vec::new(),
983            values_by_field: Vec::new(),
984            value_counts: Vec::new(),
985            value_field_indices: Vec::new(),
986            value_labels: Vec::new(),
987            value_fts_matches: Vec::new(),
988            value_source_realtime: Vec::new(),
989            value_histogram_buckets: Vec::new(),
990            field_histogram_unset_buckets: Vec::new(),
991            offset_cache: OffsetClassCache::default(),
992            histogram_start_realtime_usec: histogram
993                .and_then(|histogram| histogram.buckets.first())
994                .map(|bucket| bucket.start_realtime_usec)
995                .unwrap_or_default(),
996            histogram_bucket_width_usec: histogram
997                .and_then(|histogram| histogram.buckets.first())
998                .map(|bucket| {
999                    bucket
1000                        .end_realtime_usec
1001                        .saturating_sub(bucket.start_realtime_usec)
1002                        .max(1)
1003                })
1004                .unwrap_or(1),
1005            histogram_bucket_count: histogram
1006                .map(|histogram| histogram.buckets.len())
1007                .unwrap_or_default(),
1008            required_identity_count: 0,
1009        }
1010    }
1011
1012    fn add_field(&mut self, field: &[u8], flags: u8) {
1013        if let Some(index) = self.field_lookup.get(field).copied() {
1014            let had_required = self.flags[index] != 0;
1015            self.flags[index] |= flags;
1016            if flags & FACET_HISTOGRAM != 0 && self.field_histogram_unset_buckets[index].is_none() {
1017                self.field_histogram_unset_buckets[index] =
1018                    Some(vec![0; self.histogram_bucket_count]);
1019            }
1020            if !had_required && self.flags[index] != 0 {
1021                self.required_identity_count += 1;
1022            }
1023            return;
1024        }
1025
1026        let index = self.fields.len();
1027        self.field_lookup.insert(field.to_vec(), index);
1028        self.fields.push(field.to_vec());
1029        self.flags.push(flags);
1030        self.last_seen_row_ids.push(0);
1031        self.unset_counts.push(0);
1032        self.values_by_field.push(Vec::new());
1033        self.field_histogram_unset_buckets
1034            .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1035        if flags != 0 {
1036            self.required_identity_count += 1;
1037        }
1038    }
1039
1040    fn add_value(
1041        &mut self,
1042        field_index: usize,
1043        _data_offset: NonZeroU64,
1044        value: &[u8],
1045        fts_matches: bool,
1046    ) -> usize {
1047        let value_index = self.value_counts.len();
1048        let flags = self.flags[field_index];
1049        self.value_counts.push(0);
1050        self.value_field_indices.push(field_index);
1051        self.value_labels.push(value.to_vec());
1052        self.value_fts_matches.push(fts_matches);
1053        self.value_source_realtime
1054            .push(if flags & FACET_SOURCE_REALTIME != 0 {
1055                parse_source_realtime(value)
1056            } else {
1057                None
1058            });
1059        self.value_histogram_buckets
1060            .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1061        self.values_by_field[field_index].push(value_index);
1062        value_index
1063    }
1064
1065    fn mark_field_seen(&mut self, field_index: usize, row_id: u64) -> bool {
1066        // Duplicate values for one field must not satisfy another required
1067        // field identity in first-value mode.
1068        if self.last_seen_row_ids[field_index] == row_id {
1069            return false;
1070        }
1071        self.last_seen_row_ids[field_index] = row_id;
1072        true
1073    }
1074
1075    fn apply_value(
1076        &mut self,
1077        value_index: usize,
1078        realtime_usec: Option<u64>,
1079        stats: &mut ExplorerStats,
1080    ) {
1081        let field_index = self.value_field_indices[value_index];
1082        let flags = self.flags[field_index];
1083        if flags & FACET_PUBLIC != 0 {
1084            self.value_counts[value_index] = self.value_counts[value_index].saturating_add(1);
1085            stats.facet_updates = stats.facet_updates.saturating_add(1);
1086        }
1087        if flags & FACET_HISTOGRAM != 0 {
1088            if let (Some(realtime_usec), Some(buckets)) = (
1089                realtime_usec,
1090                self.value_histogram_buckets[value_index].as_mut(),
1091            ) {
1092                if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1093                    realtime_usec,
1094                    self.histogram_start_realtime_usec,
1095                    self.histogram_bucket_width_usec,
1096                    buckets.len(),
1097                ) {
1098                    buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1099                    stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1100                }
1101            }
1102        }
1103    }
1104
1105    fn finish_facet_row(&mut self, row_id: u64, stats: &mut ExplorerStats) {
1106        for field_index in 0..self.fields.len() {
1107            if self.flags[field_index] & FACET_PUBLIC == 0 {
1108                continue;
1109            }
1110            if self.last_seen_row_ids[field_index] != row_id {
1111                self.unset_counts[field_index] = self.unset_counts[field_index].saturating_add(1);
1112                stats.facet_updates = stats.facet_updates.saturating_add(1);
1113            }
1114        }
1115    }
1116
1117    fn finish_histogram_row(&mut self, row_id: u64, realtime_usec: u64, stats: &mut ExplorerStats) {
1118        for field_index in 0..self.fields.len() {
1119            if self.flags[field_index] & FACET_HISTOGRAM == 0 {
1120                continue;
1121            }
1122            if self.last_seen_row_ids[field_index] == row_id {
1123                continue;
1124            }
1125            let Some(buckets) = self.field_histogram_unset_buckets[field_index].as_mut() else {
1126                continue;
1127            };
1128            if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1129                realtime_usec,
1130                self.histogram_start_realtime_usec,
1131                self.histogram_bucket_width_usec,
1132                buckets.len(),
1133            ) {
1134                buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1135                stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1136            }
1137        }
1138    }
1139
1140    fn finish_facets(&self, result: &mut ExplorerResult) {
1141        for field_index in 0..self.fields.len() {
1142            if self.flags[field_index] & FACET_PUBLIC == 0 {
1143                continue;
1144            }
1145            let mut values = HashMap::new();
1146            for value_index in &self.values_by_field[field_index] {
1147                let count = self.value_counts[*value_index];
1148                if count != 0 {
1149                    increment_counter_by(&mut values, &self.value_labels[*value_index], count);
1150                }
1151            }
1152            if self.unset_counts[field_index] != 0 {
1153                increment_counter_by(&mut values, UNSET_VALUE, self.unset_counts[field_index]);
1154            }
1155            result
1156                .facets
1157                .insert(self.fields[field_index].clone(), values);
1158        }
1159    }
1160
1161    fn finish_histogram(&self, histogram: Option<&mut ExplorerHistogram>) {
1162        let Some(histogram) = histogram else {
1163            return;
1164        };
1165        for buckets in self.field_histogram_unset_buckets.iter().flatten() {
1166            for (bucket_index, count) in buckets.iter().enumerate() {
1167                if *count == 0 {
1168                    continue;
1169                }
1170                if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1171                    increment_counter_by(&mut bucket.values, UNSET_VALUE, *count);
1172                }
1173            }
1174        }
1175        for value_index in 0..self.value_histogram_buckets.len() {
1176            let Some(buckets) = &self.value_histogram_buckets[value_index] else {
1177                continue;
1178            };
1179            for (bucket_index, count) in buckets.iter().enumerate() {
1180                if *count == 0 {
1181                    continue;
1182                }
1183                if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1184                    increment_counter_by(
1185                        &mut bucket.values,
1186                        &self.value_labels[value_index],
1187                        *count,
1188                    );
1189                }
1190            }
1191        }
1192    }
1193}
1194
/// Clamps a proportion into `(0.0, 1.0]`.
///
/// Returns `None` for zero, negative, NaN or infinite input. Finite values
/// above one saturate to `1.0`.
fn bounded_positive_proportion(value: f64) -> Option<f64> {
    let usable = value.is_finite() && value > 0.0;
    usable.then(|| value.min(1.0))
}
1201
1202impl FileReader {
1203    pub fn explore(&mut self, query: &ExplorerQuery) -> Result<ExplorerResult> {
1204        self.explore_with_strategy(query, ExplorerStrategy::Traversal)
1205    }
1206
1207    pub fn explore_with_strategy(
1208        &mut self,
1209        query: &ExplorerQuery,
1210        strategy: ExplorerStrategy,
1211    ) -> Result<ExplorerResult> {
1212        self.explore_with_strategy_and_payload_mode(query, strategy, ExplorerRowPayloadMode::Expand)
1213    }
1214
1215    pub fn explore_with_strategy_and_control(
1216        &mut self,
1217        query: &ExplorerQuery,
1218        strategy: ExplorerStrategy,
1219        control: &mut ExplorerControl<'_>,
1220    ) -> Result<ExplorerResult> {
1221        validate_no_debug_column_collection(query)?;
1222        self.explore_with_strategy_and_payload_mode_unchecked(
1223            query,
1224            strategy,
1225            ExplorerRowPayloadMode::Expand,
1226            Some(control),
1227        )
1228    }
1229
1230    #[cfg(test)]
1231    pub(crate) fn explore_with_strategy_cursor_rows(
1232        &mut self,
1233        query: &ExplorerQuery,
1234        strategy: ExplorerStrategy,
1235    ) -> Result<ExplorerResult> {
1236        self.explore_with_strategy_and_payload_mode(
1237            query,
1238            strategy,
1239            ExplorerRowPayloadMode::CursorOnly,
1240        )
1241    }
1242
1243    pub(crate) fn explore_with_strategy_cursor_rows_controlled(
1244        &mut self,
1245        query: &ExplorerQuery,
1246        strategy: ExplorerStrategy,
1247        control: &mut ExplorerControl<'_>,
1248    ) -> Result<ExplorerResult> {
1249        validate_no_debug_column_collection(query)?;
1250        self.explore_with_strategy_and_payload_mode_unchecked(
1251            query,
1252            strategy,
1253            ExplorerRowPayloadMode::CursorOnly,
1254            Some(control),
1255        )
1256    }
1257
1258    fn explore_with_strategy_and_payload_mode(
1259        &mut self,
1260        query: &ExplorerQuery,
1261        strategy: ExplorerStrategy,
1262        row_payload_mode: ExplorerRowPayloadMode,
1263    ) -> Result<ExplorerResult> {
1264        validate_no_debug_column_collection(query)?;
1265        self.explore_with_strategy_and_payload_mode_unchecked(
1266            query,
1267            strategy,
1268            row_payload_mode,
1269            None,
1270        )
1271    }
1272
1273    fn explore_with_strategy_and_payload_mode_unchecked(
1274        &mut self,
1275        query: &ExplorerQuery,
1276        strategy: ExplorerStrategy,
1277        row_payload_mode: ExplorerRowPayloadMode,
1278        mut control: Option<&mut ExplorerControl<'_>>,
1279    ) -> Result<ExplorerResult> {
1280        match strategy {
1281            ExplorerStrategy::Traversal => {
1282                self.explore_traversal(query, row_payload_mode, control.as_deref_mut())
1283            }
1284            ExplorerStrategy::Index => {
1285                self.explore_indexed(query, row_payload_mode, control.as_deref_mut())
1286            }
1287            ExplorerStrategy::Compare => self.explore_compare(query, row_payload_mode),
1288        }
1289    }
1290
1291    fn explore_traversal(
1292        &mut self,
1293        query: &ExplorerQuery,
1294        row_payload_mode: ExplorerRowPayloadMode,
1295        mut control: Option<&mut ExplorerControl<'_>>,
1296    ) -> Result<ExplorerResult> {
1297        validate_query(query)?;
1298        let mut result = explorer_result_for_query(query);
1299        let facet_groups = facet_pass_groups(query);
1300        if can_run_combined_explorer_pass(&facet_groups) {
1301            self.explore_traversal_combined(
1302                query,
1303                row_payload_mode,
1304                &mut result,
1305                &facet_groups,
1306                control.as_deref_mut(),
1307            )?;
1308        } else {
1309            self.explore_traversal_split(
1310                query,
1311                row_payload_mode,
1312                &mut result,
1313                facet_groups,
1314                control.as_deref_mut(),
1315            )?;
1316        }
1317        self.configure_explorer_filters(query, None)?;
1318        Ok(result)
1319    }
1320
1321    fn explore_traversal_combined(
1322        &mut self,
1323        query: &ExplorerQuery,
1324        row_payload_mode: ExplorerRowPayloadMode,
1325        result: &mut ExplorerResult,
1326        facet_groups: &[FacetPassGroup],
1327        control: Option<&mut ExplorerControl<'_>>,
1328    ) -> Result<()> {
1329        let facet_indices = combined_facet_indices(facet_groups);
1330        if query_needs_main_pass(query) || !facet_indices.is_empty() {
1331            self.configure_explorer_filters(query, None)?;
1332            let mut accumulator =
1333                ExplorerAccumulator::for_combined(query, &facet_indices, result.histogram.as_ref());
1334            self.scan_explorer_combined(
1335                query,
1336                &mut accumulator,
1337                result,
1338                !facet_indices.is_empty(),
1339                row_payload_mode,
1340                control,
1341            )?;
1342            accumulator.finish_facets(result);
1343            accumulator.finish_histogram(result.histogram.as_mut());
1344        }
1345        Ok(())
1346    }
1347
1348    fn explore_traversal_split(
1349        &mut self,
1350        query: &ExplorerQuery,
1351        row_payload_mode: ExplorerRowPayloadMode,
1352        result: &mut ExplorerResult,
1353        facet_groups: Vec<FacetPassGroup>,
1354        mut control: Option<&mut ExplorerControl<'_>>,
1355    ) -> Result<()> {
1356        if query_needs_main_pass(query) {
1357            self.configure_explorer_filters(query, None)?;
1358            let mut accumulator = ExplorerAccumulator::for_main(query, result.histogram.as_ref());
1359            self.scan_explorer_main(
1360                query,
1361                &mut accumulator,
1362                result,
1363                row_payload_mode,
1364                control.as_deref_mut(),
1365            )?;
1366            accumulator.finish_histogram(result.histogram.as_mut());
1367        }
1368
1369        for group in facet_groups {
1370            if explorer_control_stopped(control.as_deref()) {
1371                break;
1372            }
1373            self.configure_explorer_filters(query, group.excluded_field.as_deref())?;
1374            let mut accumulator = ExplorerAccumulator::for_facets(
1375                query,
1376                &group.facet_indices,
1377                facet_pass_needs_source_realtime(query),
1378            );
1379            self.scan_explorer_facet(
1380                query,
1381                &mut accumulator,
1382                &mut result.stats,
1383                control.as_deref_mut(),
1384            )?;
1385            accumulator.finish_facets(result);
1386        }
1387        Ok(())
1388    }
1389
1390    fn explore_compare(
1391        &mut self,
1392        query: &ExplorerQuery,
1393        row_payload_mode: ExplorerRowPayloadMode,
1394    ) -> Result<ExplorerResult> {
1395        let traversal_started = Instant::now();
1396        let traversal = self.explore_traversal(query, row_payload_mode, None)?;
1397        let traversal_duration = traversal_started.elapsed();
1398
1399        let index_started = Instant::now();
1400        let mut indexed = self.explore_indexed(query, row_payload_mode, None)?;
1401        let index_duration = index_started.elapsed();
1402
1403        if !explorer_outputs_match(&traversal, &indexed) {
1404            return Err(SdkError::VerificationError(
1405                "indexed explorer output differs from traversal explorer output".to_string(),
1406            ));
1407        }
1408        indexed.comparison = Some(ExplorerComparison {
1409            traversal_duration,
1410            index_duration,
1411            traversal_stats: traversal.stats,
1412            index_stats: indexed.stats.clone(),
1413        });
1414        Ok(indexed)
1415    }
1416
1417    fn explore_indexed(
1418        &mut self,
1419        query: &ExplorerQuery,
1420        row_payload_mode: ExplorerRowPayloadMode,
1421        mut control: Option<&mut ExplorerControl<'_>>,
1422    ) -> Result<ExplorerResult> {
1423        validate_query(query)?;
1424        validate_indexed_query(query)?;
1425        let mut result = explorer_result_for_query(query);
1426        self.indexed_collect_rows(query, row_payload_mode, &mut result, control.as_deref_mut())?;
1427        self.indexed_collect_facets(query, &mut result, control.as_deref())?;
1428        self.indexed_collect_histogram(query, &mut result, control.as_deref())?;
1429        self.configure_explorer_filters(query, None)?;
1430        Ok(result)
1431    }
1432
1433    fn indexed_collect_rows(
1434        &mut self,
1435        query: &ExplorerQuery,
1436        row_payload_mode: ExplorerRowPayloadMode,
1437        result: &mut ExplorerResult,
1438        control: Option<&mut ExplorerControl<'_>>,
1439    ) -> Result<()> {
1440        if query.limit == 0 {
1441            return Ok(());
1442        }
1443        let mut row_query = query.clone();
1444        row_query.facets.clear();
1445        row_query.histogram = None;
1446        self.configure_explorer_filters(&row_query, None)?;
1447        let mut accumulator = ExplorerAccumulator::for_main(&row_query, None);
1448        self.scan_explorer_main(
1449            &row_query,
1450            &mut accumulator,
1451            result,
1452            row_payload_mode,
1453            control,
1454        )
1455    }
1456
1457    fn indexed_collect_facets(
1458        &mut self,
1459        query: &ExplorerQuery,
1460        result: &mut ExplorerResult,
1461        control: Option<&ExplorerControl<'_>>,
1462    ) -> Result<()> {
1463        if explorer_control_stopped(control) {
1464            return Ok(());
1465        }
1466        for group in facet_pass_groups(query) {
1467            let candidates = self.indexed_candidate_set(query, group.excluded_field.as_deref())?;
1468            self.inner.with_file(|file| {
1469                indexed_count_facet_group(file, query, &group, &candidates, result)
1470            })?;
1471        }
1472        Ok(())
1473    }
1474
1475    fn indexed_collect_histogram(
1476        &mut self,
1477        query: &ExplorerQuery,
1478        result: &mut ExplorerResult,
1479        control: Option<&ExplorerControl<'_>>,
1480    ) -> Result<()> {
1481        if query.histogram.is_none() || explorer_control_stopped(control) {
1482            return Ok(());
1483        }
1484        let candidates = self.indexed_candidate_set(query, None)?;
1485        self.inner
1486            .with_file(|file| indexed_count_histogram(file, query, &candidates, result))
1487    }
1488
1489    fn indexed_candidate_set(
1490        &mut self,
1491        query: &ExplorerQuery,
1492        excluded_field: Option<&[u8]>,
1493    ) -> Result<IndexedCandidateSet> {
1494        if query.filters.is_empty()
1495            && query.after_realtime_usec.is_none()
1496            && query.before_realtime_usec.is_none()
1497        {
1498            let count = self
1499                .inner
1500                .with_file(|file| file.journal_header_ref().n_entries);
1501            return Ok(IndexedCandidateSet::All { count });
1502        }
1503
1504        self.configure_explorer_filters(query, excluded_field)?;
1505        self.seek_for_explorer(query);
1506        let mut offsets = HashSet::new();
1507        while self.step_for_explorer(query.direction)? {
1508            let Some(metadata) = self.row.metadata() else {
1509                continue;
1510            };
1511            let commit_realtime = metadata.realtime;
1512            if stop_by_commit_time(query, commit_realtime) {
1513                break;
1514            }
1515            if !timestamp_in_range(query, commit_realtime) {
1516                continue;
1517            }
1518            if let Some(entry_offset) = self.row.entry_offset() {
1519                offsets.insert(entry_offset);
1520            }
1521        }
1522        Ok(IndexedCandidateSet::Set {
1523            count: offsets.len() as u64,
1524            offsets,
1525        })
1526    }
1527
1528    fn configure_explorer_filters(
1529        &mut self,
1530        query: &ExplorerQuery,
1531        excluded_field: Option<&[u8]>,
1532    ) -> Result<()> {
1533        self.flush_matches();
1534        for filter in &query.filters {
1535            if excluded_field.is_some_and(|field| field == filter.field.as_slice()) {
1536                continue;
1537            }
1538            if filter.values.is_empty() {
1539                continue;
1540            }
1541            for value in &filter.values {
1542                let payload = payload_from_parts(&filter.field, value);
1543                self.add_match(&payload);
1544            }
1545        }
1546        Ok(())
1547    }
1548
1549    fn next_explorer_row_frame(
1550        &mut self,
1551        query: &ExplorerQuery,
1552        rows_seen: &mut u64,
1553        stats: &ExplorerStats,
1554        control: Option<&mut ExplorerControl<'_>>,
1555    ) -> Result<ExplorerLoopStep> {
1556        if !self.step_for_explorer(query.direction)? {
1557            return Ok(ExplorerLoopStep::Stop);
1558        }
1559        *rows_seen = rows_seen.saturating_add(1);
1560        if control.is_some_and(|control| control.should_stop_after_rows(*rows_seen, stats)) {
1561            return Ok(ExplorerLoopStep::Stop);
1562        }
1563        let Some(metadata) = self.row.metadata() else {
1564            return Ok(ExplorerLoopStep::Skip);
1565        };
1566        if stop_by_commit_time(query, metadata.realtime) {
1567            return Ok(ExplorerLoopStep::Stop);
1568        }
1569        if skip_by_commit_time(query, metadata.realtime) {
1570            return Ok(ExplorerLoopStep::Skip);
1571        }
1572        Ok(ExplorerLoopStep::Row(ExplorerRowFrame {
1573            commit_realtime: metadata.realtime,
1574            seqnum: metadata.seqnum,
1575        }))
1576    }
1577
1578    fn scan_row_data_or_default(
1579        &mut self,
1580        query: &ExplorerQuery,
1581        accumulator: &mut ExplorerAccumulator,
1582        row_id: &mut u64,
1583        deferred_values: &mut Vec<usize>,
1584        stats: &mut ExplorerStats,
1585    ) -> Result<RowScan> {
1586        if accumulator.required_identity_count == 0 && !query_has_fts(query) {
1587            stats.rows_examined = stats.rows_examined.saturating_add(1);
1588            return Ok(RowScan::default());
1589        }
1590        *row_id = row_id.saturating_add(1);
1591        deferred_values.clear();
1592        self.scan_current_row(
1593            query,
1594            accumulator,
1595            *row_id,
1596            ScanApply::Deferred(deferred_values),
1597            stats,
1598        )
1599    }
1600
1601    fn accepted_effective_realtime(
1602        query: &ExplorerQuery,
1603        scan: &RowScan,
1604        commit_realtime: u64,
1605        stats: &mut ExplorerStats,
1606        control: Option<&mut ExplorerControl<'_>>,
1607    ) -> Option<u64> {
1608        let mut effective_realtime = effective_realtime_from_scan(scan.timestamp, commit_realtime);
1609        record_source_realtime_delta(stats, scan.timestamp, commit_realtime);
1610        if let Some(control) = control {
1611            effective_realtime = control.adjust_realtime(effective_realtime);
1612        }
1613        (timestamp_in_range(query, effective_realtime) && !row_rejected_by_fts(query, scan))
1614            .then_some(effective_realtime)
1615    }
1616
1617    fn push_explorer_row_if_wanted(
1618        &mut self,
1619        query: &ExplorerQuery,
1620        result: &mut ExplorerResult,
1621        row_payload_mode: ExplorerRowPayloadMode,
1622        effective_realtime: u64,
1623    ) -> Result<()> {
1624        if row_within_anchor(query, effective_realtime) && result.rows.len() < query.limit {
1625            result.rows.push(self.current_explorer_row(
1626                effective_realtime,
1627                &mut result.stats,
1628                row_payload_mode,
1629            )?);
1630        }
1631        Ok(())
1632    }
1633
1634    fn apply_main_scanned_row(
1635        &mut self,
1636        query: &ExplorerQuery,
1637        accumulator: &mut ExplorerAccumulator,
1638        result: &mut ExplorerResult,
1639        row_payload_mode: ExplorerRowPayloadMode,
1640        scanned: MainScannedRow<'_>,
1641        control: Option<&mut ExplorerControl<'_>>,
1642    ) -> Result<bool> {
1643        if query.debug_collect_column_fields_by_row_traversal {
1644            result.column_fields.extend(scanned.scan.column_fields);
1645        }
1646        record_last_realtime(&mut result.stats, scanned.commit_realtime);
1647        result.stats.rows_matched = result.stats.rows_matched.saturating_add(1);
1648        let stop_after_matched_row = control.is_some_and(|control| {
1649            control.emit_matched_row(scanned.effective_realtime, result.stats.rows_matched)
1650        });
1651        for value_index in scanned.deferred_values {
1652            accumulator.apply_value(
1653                *value_index,
1654                Some(scanned.effective_realtime),
1655                &mut result.stats,
1656            );
1657        }
1658        accumulator.finish_histogram_row(
1659            scanned.row_id,
1660            scanned.effective_realtime,
1661            &mut result.stats,
1662        );
1663        self.push_explorer_row_if_wanted(
1664            query,
1665            result,
1666            row_payload_mode,
1667            scanned.effective_realtime,
1668        )?;
1669        Ok(stop_after_matched_row
1670            || should_stop_when_rows_full(
1671                query,
1672                &result.rows,
1673                scanned.effective_realtime,
1674                result.stats.rows_matched,
1675            ))
1676    }
1677
1678    fn sampling_state_for_combined(
1679        query: &ExplorerQuery,
1680        result: &ExplorerResult,
1681        control: Option<&mut ExplorerControl<'_>>,
1682    ) -> Option<ExplorerSamplingState> {
1683        let sampling = ExplorerSamplingState::for_query(
1684            query,
1685            result
1686                .histogram
1687                .as_ref()
1688                .map(|histogram| histogram.buckets.len()),
1689        );
1690        if let Some(control) = control {
1691            if let (Some(shared_sampling), Some(file_sampling)) =
1692                (control.sampling.as_deref_mut(), query.sampling)
1693            {
1694                shared_sampling.begin_file(file_sampling);
1695            }
1696        }
1697        sampling
1698    }
1699
1700    fn combined_sampling_decision(
1701        query: &ExplorerQuery,
1702        rows: &[ExplorerRow],
1703        frame: ExplorerRowFrame,
1704        sampling: &mut Option<ExplorerSamplingState>,
1705        mut control: Option<&mut ExplorerControl<'_>>,
1706    ) -> Option<ExplorerSamplingDecision> {
1707        let candidate_to_keep = if let Some(control) = control.as_deref_mut() {
1708            control.candidate_row.as_deref_mut().map_or_else(
1709                || row_candidate_to_keep(query, rows, frame.commit_realtime),
1710                |candidate_row| candidate_row(frame.commit_realtime),
1711            )
1712        } else {
1713            row_candidate_to_keep(query, rows, frame.commit_realtime)
1714        };
1715        if let Some(control) = control {
1716            if let Some(shared_sampling) = control.sampling.as_deref_mut() {
1717                return Some(shared_sampling.decide(
1718                    frame.commit_realtime,
1719                    frame.seqnum,
1720                    candidate_to_keep,
1721                ));
1722            }
1723        }
1724        sampling
1725            .as_mut()
1726            .map(|sampling| sampling.decide(frame.commit_realtime, frame.seqnum, candidate_to_keep))
1727    }
1728
1729    fn apply_combined_sampling_decision(
1730        decision: ExplorerSamplingDecision,
1731        mode: CombinedScanMode,
1732        result: &mut ExplorerResult,
1733        frame: ExplorerRowFrame,
1734    ) -> SamplingRowAction {
1735        match decision {
1736            ExplorerSamplingDecision::Full { sampled } => {
1737                if sampled {
1738                    result.stats.sampling_sampled = result.stats.sampling_sampled.saturating_add(1);
1739                }
1740                SamplingRowAction::Scan
1741            }
1742            ExplorerSamplingDecision::SkipFields => {
1743                record_combined_unsampled_row(
1744                    &mut result.stats,
1745                    mode,
1746                    frame.commit_realtime,
1747                    1,
1748                    true,
1749                );
1750                add_special_histogram_value(
1751                    result.histogram.as_mut(),
1752                    frame.commit_realtime,
1753                    EXPLORER_UNSAMPLED_VALUE,
1754                    1,
1755                    &mut result.stats,
1756                );
1757                SamplingRowAction::Skip
1758            }
1759            ExplorerSamplingDecision::StopAndEstimate {
1760                remaining_rows,
1761                from_realtime_usec,
1762                to_realtime_usec,
1763            } => {
1764                record_combined_unsampled_row(
1765                    &mut result.stats,
1766                    mode,
1767                    frame.commit_realtime,
1768                    remaining_rows,
1769                    false,
1770                );
1771                result.stats.rows_estimated =
1772                    result.stats.rows_estimated.saturating_add(remaining_rows);
1773                result.stats.sampling_estimated = result
1774                    .stats
1775                    .sampling_estimated
1776                    .saturating_add(remaining_rows);
1777                add_estimated_histogram_range(
1778                    result.histogram.as_mut(),
1779                    from_realtime_usec,
1780                    to_realtime_usec,
1781                    remaining_rows,
1782                    &mut result.stats,
1783                );
1784                SamplingRowAction::Stop
1785            }
1786        }
1787    }
1788
1789    fn apply_combined_scanned_row(
1790        &mut self,
1791        query: &ExplorerQuery,
1792        accumulator: &mut ExplorerAccumulator,
1793        result: &mut ExplorerResult,
1794        row_payload_mode: ExplorerRowPayloadMode,
1795        mode: CombinedScanMode,
1796        scanned: MainScannedRow<'_>,
1797        control: Option<&mut ExplorerControl<'_>>,
1798    ) -> Result<bool> {
1799        if query.debug_collect_column_fields_by_row_traversal {
1800            result.column_fields.extend(scanned.scan.column_fields);
1801        }
1802        record_last_realtime(&mut result.stats, scanned.commit_realtime);
1803        let stop_after_matched_row = update_combined_matched_stats(
1804            &mut result.stats,
1805            mode,
1806            scanned.effective_realtime,
1807            control,
1808        );
1809        let value_realtime = query
1810            .histogram
1811            .is_some()
1812            .then_some(scanned.effective_realtime);
1813        for value_index in scanned.deferred_values {
1814            accumulator.apply_value(*value_index, value_realtime, &mut result.stats);
1815        }
1816        if query.histogram.is_some() {
1817            accumulator.finish_histogram_row(
1818                scanned.row_id,
1819                scanned.effective_realtime,
1820                &mut result.stats,
1821            );
1822        }
1823        if mode.include_facets {
1824            accumulator.finish_facet_row(scanned.row_id, &mut result.stats);
1825        }
1826        self.push_explorer_row_if_wanted(
1827            query,
1828            result,
1829            row_payload_mode,
1830            scanned.effective_realtime,
1831        )?;
1832        Ok(stop_after_matched_row
1833            || should_stop_when_rows_full(
1834                query,
1835                &result.rows,
1836                scanned.effective_realtime,
1837                result.stats.rows_matched,
1838            ))
1839    }
1840
    /// Runs the main (row-returning) scan over this file.
    ///
    /// Seeks to the query's start position. Then, for each row frame produced
    /// by `next_explorer_row_frame`:
    /// - scans the row's data;
    /// - skips rows that `accepted_effective_realtime` rejects;
    /// - hands accepted rows to `apply_main_scanned_row`.
    ///
    /// The loop ends when the frame source reports `Stop` or
    /// `apply_main_scanned_row` returns `true`. Afterwards `rows_returned` is set
    /// to the number of rows collected in `result.rows`.
    fn scan_explorer_main(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        result: &mut ExplorerResult,
        row_payload_mode: ExplorerRowPayloadMode,
        mut control: Option<&mut ExplorerControl<'_>>,
    ) -> Result<()> {
        self.seek_for_explorer(query);
        // NOTE(review): `row_id` and `deferred_values` are passed by `&mut` to
        // `scan_row_data_or_default`, which presumably advances the id and
        // refills the buffer for every row. Confirm there.
        let mut row_id = 0u64;
        let mut rows_seen = 0u64;
        let mut deferred_values = Vec::new();
        loop {
            // `Stop` ends the scan; `Skip` moves on without scanning this row.
            let frame = match self.next_explorer_row_frame(
                query,
                &mut rows_seen,
                &result.stats,
                control.as_deref_mut(),
            )? {
                ExplorerLoopStep::Stop => break,
                ExplorerLoopStep::Skip => continue,
                ExplorerLoopStep::Row(frame) => frame,
            };
            let scan = self.scan_row_data_or_default(
                query,
                accumulator,
                &mut row_id,
                &mut deferred_values,
                &mut result.stats,
            )?;
            // `None` means the row was rejected; it is not applied to the result.
            let Some(effective_realtime) = Self::accepted_effective_realtime(
                query,
                &scan,
                frame.commit_realtime,
                &mut result.stats,
                control.as_deref_mut(),
            ) else {
                continue;
            };
            let scanned = MainScannedRow {
                row_id,
                commit_realtime: frame.commit_realtime,
                effective_realtime,
                scan,
                deferred_values: &deferred_values,
            };
            // A `true` return means the scan should stop after this row.
            if self.apply_main_scanned_row(
                query,
                accumulator,
                result,
                row_payload_mode,
                scanned,
                control.as_deref_mut(),
            )? {
                break;
            }
        }
        result.stats.rows_returned = result.rows.len() as u64;
        Ok(())
    }
1901
    /// Runs a combined scan that feeds the main pass and, when
    /// `include_facets` is set, the facet counters in a single traversal.
    ///
    /// Sampling state is prepared once per file. For every row frame, the
    /// sampling decision (when a sampler exists) can let the row be scanned,
    /// skip its fields after counting it as unsampled, or stop the scan after
    /// estimating the remaining rows. Scanned rows that pass
    /// `accepted_effective_realtime` go to `apply_combined_scanned_row`, which
    /// can also end the scan. `rows_returned` is set to the number of rows
    /// collected in `result.rows`.
    fn scan_explorer_combined(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        result: &mut ExplorerResult,
        include_facets: bool,
        row_payload_mode: ExplorerRowPayloadMode,
        mut control: Option<&mut ExplorerControl<'_>>,
    ) -> Result<()> {
        self.seek_for_explorer(query);
        let mode = CombinedScanMode {
            include_main: query_needs_main_pass(query),
            include_facets,
        };
        // NOTE(review): `row_id` and `deferred_values` are presumably advanced
        // and refilled by `scan_row_data_or_default` for each row. Confirm there.
        let mut row_id = 0u64;
        let mut rows_seen = 0u64;
        let mut deferred_values = Vec::new();
        let mut sampling = Self::sampling_state_for_combined(query, result, control.as_deref_mut());
        loop {
            // `Stop` ends the scan; `Skip` moves on without scanning this row.
            let frame = match self.next_explorer_row_frame(
                query,
                &mut rows_seen,
                &result.stats,
                control.as_deref_mut(),
            )? {
                ExplorerLoopStep::Stop => break,
                ExplorerLoopStep::Skip => continue,
                ExplorerLoopStep::Row(frame) => frame,
            };
            // Sampling runs before the row's data is read, so skipped rows never
            // pay for a data scan.
            if let Some(decision) = Self::combined_sampling_decision(
                query,
                &result.rows,
                frame,
                &mut sampling,
                control.as_deref_mut(),
            ) {
                match Self::apply_combined_sampling_decision(decision, mode, result, frame) {
                    SamplingRowAction::Scan => {}
                    SamplingRowAction::Skip => continue,
                    SamplingRowAction::Stop => break,
                }
            }
            let scan = self.scan_row_data_or_default(
                query,
                accumulator,
                &mut row_id,
                &mut deferred_values,
                &mut result.stats,
            )?;
            // `None` means the row was rejected; it is not applied to the result.
            let Some(effective_realtime) = Self::accepted_effective_realtime(
                query,
                &scan,
                frame.commit_realtime,
                &mut result.stats,
                control.as_deref_mut(),
            ) else {
                continue;
            };
            let scanned = MainScannedRow {
                row_id,
                commit_realtime: frame.commit_realtime,
                effective_realtime,
                scan,
                deferred_values: &deferred_values,
            };
            // A `true` return means the scan should stop after this row.
            if self.apply_combined_scanned_row(
                query,
                accumulator,
                result,
                row_payload_mode,
                mode,
                scanned,
                control.as_deref_mut(),
            )? {
                break;
            }
        }
        result.stats.rows_returned = result.rows.len() as u64;
        Ok(())
    }
1982
1983    fn scan_explorer_facet(
1984        &mut self,
1985        query: &ExplorerQuery,
1986        accumulator: &mut ExplorerAccumulator,
1987        stats: &mut ExplorerStats,
1988        mut control: Option<&mut ExplorerControl<'_>>,
1989    ) -> Result<()> {
1990        self.seek_for_explorer(query);
1991        let defer_apply = query.after_realtime_usec.is_some()
1992            || query.before_realtime_usec.is_some()
1993            || query_has_fts(query);
1994        let mut row_id = 0u64;
1995        let mut rows_seen = 0u64;
1996        let mut deferred_values = Vec::new();
1997        loop {
1998            let frame = match self.next_explorer_row_frame(
1999                query,
2000                &mut rows_seen,
2001                stats,
2002                control.as_deref_mut(),
2003            )? {
2004                ExplorerLoopStep::Stop => break,
2005                ExplorerLoopStep::Skip => continue,
2006                ExplorerLoopStep::Row(frame) => frame,
2007            };
2008            row_id = row_id.saturating_add(1);
2009            deferred_values.clear();
2010            let scan = if defer_apply {
2011                self.scan_current_row(
2012                    query,
2013                    accumulator,
2014                    row_id,
2015                    ScanApply::Deferred(&mut deferred_values),
2016                    stats,
2017                )?
2018            } else {
2019                self.scan_current_row(query, accumulator, row_id, ScanApply::Immediate, stats)?
2020            };
2021            if Self::accepted_effective_realtime(query, &scan, frame.commit_realtime, stats, None)
2022                .is_none()
2023            {
2024                continue;
2025            }
2026            record_last_realtime(stats, frame.commit_realtime);
2027            stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
2028            if defer_apply {
2029                for value_index in &deferred_values {
2030                    accumulator.apply_value(*value_index, None, stats);
2031                }
2032            }
2033            accumulator.finish_facet_row(row_id, stats);
2034        }
2035        Ok(())
2036    }
2037
2038    fn scan_current_row(
2039        &mut self,
2040        query: &ExplorerQuery,
2041        accumulator: &mut ExplorerAccumulator,
2042        row_id: u64,
2043        mut apply: ScanApply<'_>,
2044        stats: &mut ExplorerStats,
2045    ) -> Result<RowScan> {
2046        stats.rows_examined = stats.rows_examined.saturating_add(1);
2047        let mut out = RowScan::default();
2048        let mut state = RowScanState::new(query, accumulator);
2049
2050        let inner = &mut self.inner;
2051        let row = &mut self.row;
2052        inner.with_mut(|fields| {
2053            fields.reader.release_object_guards();
2054            row.restart_data()?;
2055            let result = (|| {
2056                for index in 0..row.data_offset_count() {
2057                    let Some(data_offset) = row.data_offset_at(index) else {
2058                        break;
2059                    };
2060                    stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2061                    let class = classify_data_for_accumulator(
2062                        fields.file,
2063                        row,
2064                        data_offset,
2065                        accumulator,
2066                        state.needs_fts,
2067                        query,
2068                        query
2069                            .debug_collect_column_fields_by_row_traversal
2070                            .then_some(&mut out.column_fields),
2071                        stats,
2072                    )?;
2073
2074                    handle_row_offset_class(
2075                        class,
2076                        accumulator,
2077                        row_id,
2078                        &mut state,
2079                        &mut out,
2080                        &mut apply,
2081                        stats,
2082                    );
2083                    if state.should_stop_row_scan() {
2084                        record_row_scan_early_stop(stats);
2085                        break;
2086                    }
2087                }
2088                Ok::<_, SdkError>(())
2089            })();
2090            row.reset_data_state(fields.file)?;
2091            result
2092        })?;
2093        Ok(out)
2094    }
2095
2096    fn seek_for_explorer(&mut self, query: &ExplorerQuery) {
2097        let anchor = if query.stop_when_rows_full {
2098            query.anchor
2099        } else {
2100            ExplorerAnchor::Auto
2101        };
2102        match query.direction {
2103            Direction::Forward => match anchor {
2104                ExplorerAnchor::Auto => {
2105                    if let Some(after) = query.after_realtime_usec {
2106                        self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2107                    } else {
2108                        self.seek_head();
2109                    }
2110                }
2111                ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2112                ExplorerAnchor::Tail => self.seek_tail(),
2113                ExplorerAnchor::Head => {
2114                    if let Some(after) = query.after_realtime_usec {
2115                        self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2116                    } else {
2117                        self.seek_head();
2118                    }
2119                }
2120            },
2121            Direction::Backward => match anchor {
2122                ExplorerAnchor::Auto => {
2123                    if let Some(before) = query.before_realtime_usec {
2124                        self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2125                    } else {
2126                        self.seek_tail();
2127                    }
2128                }
2129                ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2130                ExplorerAnchor::Head => self.seek_head(),
2131                ExplorerAnchor::Tail => {
2132                    if let Some(before) = query.before_realtime_usec {
2133                        self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2134                    } else {
2135                        self.seek_tail();
2136                    }
2137                }
2138            },
2139        }
2140    }
2141
2142    fn step_for_explorer(&mut self, direction: Direction) -> Result<bool> {
2143        match direction {
2144            Direction::Forward => self.next(),
2145            Direction::Backward => self.previous(),
2146        }
2147    }
2148
2149    fn current_explorer_row(
2150        &mut self,
2151        realtime_usec: u64,
2152        stats: &mut ExplorerStats,
2153        row_payload_mode: ExplorerRowPayloadMode,
2154    ) -> Result<ExplorerRow> {
2155        let cursor = self.get_cursor()?;
2156        let mut payloads = Vec::new();
2157        if row_payload_mode == ExplorerRowPayloadMode::Expand {
2158            self.collect_entry_payloads(&mut payloads)?;
2159            stats.returned_row_expansions = stats.returned_row_expansions.saturating_add(1);
2160        }
2161        Ok(ExplorerRow {
2162            realtime_usec,
2163            cursor,
2164            payloads,
2165        })
2166    }
2167}
2168
/// How `scan_current_row` delivers the value indices it finds in a row.
#[derive(Debug)]
enum ScanApply<'a> {
    /// Apply each value to the accumulator as soon as it is classified.
    Immediate,
    /// Push each value index into this buffer, so the caller can apply the
    /// values later, once it has decided to accept the row.
    Deferred(&'a mut Vec<usize>),
}
2173
/// Per-row data read before a row's payload is scanned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ExplorerRowFrame {
    /// Realtime timestamp of the entry. Presumably microseconds, like the
    /// other `*_realtime` values in this module; confirm against the producer.
    commit_realtime: u64,
    /// Entry sequence number; passed to the sampler's `decide`.
    seqnum: u64,
}
2179
2180enum ExplorerLoopStep {
2181    Stop,
2182    Skip,
2183    Row(ExplorerRowFrame),
2184}
2185
/// Which passes a combined scan feeds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CombinedScanMode {
    /// Set from `query_needs_main_pass(query)` when the scan starts.
    include_main: bool,
    /// Whether facet rows are finished for each accepted row.
    include_facets: bool,
}
2191
/// An accepted row, handed from a scan loop to the code that applies it.
struct MainScannedRow<'a> {
    /// Row id produced while scanning the row's data.
    row_id: u64,
    /// Realtime taken from the row frame.
    commit_realtime: u64,
    /// Timestamp returned by `accepted_effective_realtime` for this row.
    effective_realtime: u64,
    /// Result of scanning the row's data references.
    scan: RowScan,
    /// Value indices buffered during the scan, to be applied now that the row
    /// was accepted.
    deferred_values: &'a [usize],
}
2199
2200struct RowScanState {
2201    use_first_value: bool,
2202    needs_fts: bool,
2203    collect_column_fields: bool,
2204    fields_missing_from_row: usize,
2205}
2206
2207impl RowScanState {
2208    fn new(query: &ExplorerQuery, accumulator: &ExplorerAccumulator) -> Self {
2209        let use_first_value = query.field_mode == ExplorerFieldMode::FirstValue;
2210        Self {
2211            use_first_value,
2212            needs_fts: query_has_fts(query),
2213            collect_column_fields: query.debug_collect_column_fields_by_row_traversal,
2214            fields_missing_from_row: if use_first_value {
2215                accumulator.required_identity_count
2216            } else {
2217                0
2218            },
2219        }
2220    }
2221
2222    fn should_stop_row_scan(&self) -> bool {
2223        self.use_first_value
2224            && !self.needs_fts
2225            && !self.collect_column_fields
2226            && self.fields_missing_from_row == 0
2227    }
2228}
2229
/// What the combined scan does with a row after a sampling decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SamplingRowAction {
    /// Scan the row's fields normally.
    Scan,
    /// Do not scan the row's fields; it was already counted as unsampled.
    Skip,
    /// Stop the scan; the remaining rows were estimated.
    Stop,
}
2235
/// Entries eligible for an indexed counting pass.
#[derive(Debug)]
enum IndexedCandidateSet {
    /// Every entry of the file is a candidate.
    All {
        /// Number of candidate entries.
        count: u64,
    },
    /// Only the listed entry offsets are candidates.
    Set {
        /// Number of candidate entries.
        count: u64,
        /// Offsets of the candidate entries.
        offsets: HashSet<NonZeroU64>,
    },
}
2245
2246impl IndexedCandidateSet {
2247    fn count(&self) -> u64 {
2248        match self {
2249            Self::All { count } | Self::Set { count, .. } => *count,
2250        }
2251    }
2252
2253    fn contains(&self, entry_offset: NonZeroU64) -> bool {
2254        match self {
2255            Self::All { .. } => true,
2256            Self::Set { offsets, .. } => offsets.contains(&entry_offset),
2257        }
2258    }
2259}
2260
/// Facets that can be counted in the same pass because they exclude the same
/// filter field, or none at all.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FacetPassGroup {
    /// Filter field excluded during this pass, if any.
    excluded_field: Option<Vec<u8>>,
    /// Indices into `query.facets` of the facets in this group.
    facet_indices: Vec<usize>,
}
2265
2266fn facet_pass_groups(query: &ExplorerQuery) -> Vec<FacetPassGroup> {
2267    let filter_fields: HashSet<&[u8]> = query
2268        .filters
2269        .iter()
2270        .map(|filter| filter.field.as_slice())
2271        .collect();
2272    let mut groups: Vec<FacetPassGroup> = Vec::new();
2273
2274    for (index, facet) in query.facets.iter().enumerate() {
2275        let excluded_field = (query.exclude_facet_field_filters
2276            && filter_fields.contains(facet.as_slice()))
2277        .then(|| facet.clone());
2278        if let Some(existing) = groups
2279            .iter_mut()
2280            .find(|group| group.excluded_field.as_deref() == excluded_field.as_deref())
2281        {
2282            existing.facet_indices.push(index);
2283        } else {
2284            groups.push(FacetPassGroup {
2285                excluded_field,
2286                facet_indices: vec![index],
2287            });
2288        }
2289    }
2290
2291    groups
2292}
2293
2294fn indexed_count_facet_group(
2295    file: &JournalFile<Mmap>,
2296    query: &ExplorerQuery,
2297    group: &FacetPassGroup,
2298    candidates: &IndexedCandidateSet,
2299    result: &mut ExplorerResult,
2300) -> Result<()> {
2301    result.stats.facet_rows_matched = result
2302        .stats
2303        .facet_rows_matched
2304        .saturating_add(candidates.count());
2305
2306    for facet_index in &group.facet_indices {
2307        let Some(field) = query.facets.get(*facet_index) else {
2308            continue;
2309        };
2310        let mut values = HashMap::new();
2311        let mut rows_with_field = HashSet::new();
2312        let mut decompressed = Vec::new();
2313
2314        for item in file.field_data_objects_with_offsets(field)? {
2315            let (_, data) = item?;
2316            let Some((value, cursor)) =
2317                indexed_value_and_cursor(&data, field, &mut decompressed, &mut result.stats)?
2318            else {
2319                continue;
2320            };
2321            drop(data);
2322
2323            let count = indexed_count_facet_entries(
2324                file,
2325                cursor,
2326                candidates,
2327                &mut rows_with_field,
2328                &mut result.stats,
2329            )?;
2330            if count == 0 {
2331                continue;
2332            }
2333            increment_counter_by(&mut values, &value, count);
2334            result.stats.facet_updates = result.stats.facet_updates.saturating_add(count);
2335        }
2336
2337        let unset = candidates
2338            .count()
2339            .saturating_sub(rows_with_field.len() as u64);
2340        if unset != 0 {
2341            increment_counter_by(&mut values, UNSET_VALUE, unset);
2342            result.stats.facet_updates = result.stats.facet_updates.saturating_add(unset);
2343        }
2344        result.facets.insert(field.clone(), values);
2345    }
2346
2347    Ok(())
2348}
2349
2350fn indexed_count_histogram(
2351    file: &JournalFile<Mmap>,
2352    query: &ExplorerQuery,
2353    candidates: &IndexedCandidateSet,
2354    result: &mut ExplorerResult,
2355) -> Result<()> {
2356    let Some(histogram) = result.histogram.as_mut() else {
2357        return Ok(());
2358    };
2359    let field = histogram.field.clone();
2360    let mut decompressed = Vec::new();
2361    let mut rows_with_field = HashSet::new();
2362
2363    for item in file.field_data_objects_with_offsets(&field)? {
2364        let (_, data) = item?;
2365        let Some((value, cursor)) =
2366            indexed_value_and_cursor(&data, &field, &mut decompressed, &mut result.stats)?
2367        else {
2368            continue;
2369        };
2370        drop(data);
2371
2372        indexed_count_histogram_entries(
2373            file,
2374            cursor,
2375            candidates,
2376            &value,
2377            histogram,
2378            query,
2379            &mut rows_with_field,
2380            &mut result.stats,
2381        )?;
2382    }
2383
2384    indexed_count_histogram_unset_entries(
2385        file,
2386        candidates,
2387        &rows_with_field,
2388        histogram,
2389        query,
2390        &mut result.stats,
2391    )?;
2392
2393    Ok(())
2394}
2395
2396fn indexed_value_and_cursor(
2397    data: &DataObject<&[u8]>,
2398    field: &[u8],
2399    decompressed: &mut Vec<u8>,
2400    stats: &mut ExplorerStats,
2401) -> Result<Option<(Vec<u8>, Option<InlinedCursor>)>> {
2402    stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2403    stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2404    let payload = if data.is_compressed() {
2405        decompressed.clear();
2406        let len = data.decompress(decompressed)?;
2407        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2408        &decompressed[..len]
2409    } else {
2410        data.raw_payload()
2411    };
2412
2413    let Some((payload_field, value)) = split_payload_bytes(payload) else {
2414        return Ok(None);
2415    };
2416    if payload_field != field {
2417        return Ok(None);
2418    }
2419    Ok(Some((value.to_vec(), data.inlined_cursor())))
2420}
2421
2422fn indexed_count_facet_entries(
2423    file: &JournalFile<Mmap>,
2424    cursor: Option<InlinedCursor>,
2425    candidates: &IndexedCandidateSet,
2426    rows_with_field: &mut HashSet<NonZeroU64>,
2427    stats: &mut ExplorerStats,
2428) -> Result<u64> {
2429    let mut count = 0u64;
2430    indexed_visit_entries(file, cursor, |entry_offset| {
2431        stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2432        if candidates.contains(entry_offset) {
2433            count = count.saturating_add(1);
2434            rows_with_field.insert(entry_offset);
2435        }
2436        Ok(())
2437    })?;
2438    Ok(count)
2439}
2440
2441fn indexed_count_histogram_entries(
2442    file: &JournalFile<Mmap>,
2443    cursor: Option<InlinedCursor>,
2444    candidates: &IndexedCandidateSet,
2445    value: &[u8],
2446    histogram: &mut ExplorerHistogram,
2447    query: &ExplorerQuery,
2448    rows_with_field: &mut HashSet<NonZeroU64>,
2449    stats: &mut ExplorerStats,
2450) -> Result<()> {
2451    let histogram_start = histogram
2452        .buckets
2453        .first()
2454        .map(|bucket| bucket.start_realtime_usec)
2455        .unwrap_or_default();
2456    let histogram_bucket_width = histogram
2457        .buckets
2458        .first()
2459        .map(|bucket| {
2460            bucket
2461                .end_realtime_usec
2462                .saturating_sub(bucket.start_realtime_usec)
2463                .max(1)
2464        })
2465        .unwrap_or(1);
2466    let histogram_bucket_count = histogram.buckets.len();
2467
2468    indexed_visit_entries(file, cursor, |entry_offset| {
2469        stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2470        if !candidates.contains(entry_offset) {
2471            return Ok(());
2472        }
2473        let entry = file.entry_ref(entry_offset)?;
2474        let realtime = entry.header.realtime;
2475        drop(entry);
2476        rows_with_field.insert(entry_offset);
2477        if !timestamp_in_range(query, realtime) {
2478            return Ok(());
2479        }
2480        let Some(bucket_index) = histogram_bucket_index_from_bounds(
2481            realtime,
2482            histogram_start,
2483            histogram_bucket_width,
2484            histogram_bucket_count,
2485        ) else {
2486            return Ok(());
2487        };
2488        if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2489            increment_counter_by(&mut bucket.values, value, 1);
2490            stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2491        }
2492        Ok(())
2493    })
2494}
2495
2496fn indexed_count_histogram_unset_entries(
2497    file: &JournalFile<Mmap>,
2498    candidates: &IndexedCandidateSet,
2499    rows_with_field: &HashSet<NonZeroU64>,
2500    histogram: &mut ExplorerHistogram,
2501    query: &ExplorerQuery,
2502    stats: &mut ExplorerStats,
2503) -> Result<()> {
2504    let histogram_start = histogram
2505        .buckets
2506        .first()
2507        .map(|bucket| bucket.start_realtime_usec)
2508        .unwrap_or_default();
2509    let histogram_bucket_width = histogram
2510        .buckets
2511        .first()
2512        .map(|bucket| {
2513            bucket
2514                .end_realtime_usec
2515                .saturating_sub(bucket.start_realtime_usec)
2516                .max(1)
2517        })
2518        .unwrap_or(1);
2519    let histogram_bucket_count = histogram.buckets.len();
2520
2521    let mut visit = |entry_offset: NonZeroU64| -> Result<()> {
2522        if rows_with_field.contains(&entry_offset) {
2523            return Ok(());
2524        }
2525        let entry = file.entry_ref(entry_offset)?;
2526        let realtime = entry.header.realtime;
2527        drop(entry);
2528        if !timestamp_in_range(query, realtime) {
2529            return Ok(());
2530        }
2531        let Some(bucket_index) = histogram_bucket_index_from_bounds(
2532            realtime,
2533            histogram_start,
2534            histogram_bucket_width,
2535            histogram_bucket_count,
2536        ) else {
2537            return Ok(());
2538        };
2539        if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2540            increment_counter_by(&mut bucket.values, UNSET_VALUE, 1);
2541            stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2542        }
2543        Ok(())
2544    };
2545
2546    match candidates {
2547        IndexedCandidateSet::All { .. } => {
2548            let mut entry_offsets = Vec::new();
2549            file.entry_offsets(&mut entry_offsets)?;
2550            for entry_offset in entry_offsets {
2551                visit(entry_offset)?;
2552            }
2553        }
2554        IndexedCandidateSet::Set { offsets, .. } => {
2555            for entry_offset in offsets {
2556                visit(*entry_offset)?;
2557            }
2558        }
2559    }
2560
2561    Ok(())
2562}
2563
2564fn indexed_visit_entries<F>(
2565    file: &JournalFile<Mmap>,
2566    cursor: Option<InlinedCursor>,
2567    mut visitor: F,
2568) -> Result<()>
2569where
2570    F: FnMut(NonZeroU64) -> Result<()>,
2571{
2572    let Some(mut cursor) = cursor.map(|cursor| cursor.head()) else {
2573        return Ok(());
2574    };
2575    let mut needle = NonZeroU64::MIN;
2576    while let Some(entry_offset) = cursor.next_until(file, needle)? {
2577        visitor(entry_offset)?;
2578        let Some(next) = entry_offset.get().checked_add(1).and_then(NonZeroU64::new) else {
2579            break;
2580        };
2581        needle = next;
2582    }
2583    Ok(())
2584}
2585
2586fn handle_row_offset_class(
2587    class: OffsetClass,
2588    accumulator: &mut ExplorerAccumulator,
2589    row_id: u64,
2590    state: &mut RowScanState,
2591    out: &mut RowScan,
2592    apply: &mut ScanApply<'_>,
2593    stats: &mut ExplorerStats,
2594) {
2595    match class {
2596        OffsetClass::Irrelevant => {
2597            stats.data_refs_skipped = stats.data_refs_skipped.saturating_add(1);
2598        }
2599        OffsetClass::FtsNegativeMatch => {
2600            out.fts_negative_match = true;
2601        }
2602        OffsetClass::FtsMatch => {
2603            out.fts_matches = true;
2604        }
2605        OffsetClass::Value(value_index) => {
2606            handle_row_value_class(value_index, accumulator, row_id, state, out, apply, stats)
2607        }
2608    }
2609}
2610
/// Applies a DATA object that was classified as a tracked field value
/// (`value_index`) to the row currently being scanned.
///
/// - Sets `out.fts_matches` when this value was recorded as an FTS match.
/// - In first-value mode, decrements `state.fields_missing_from_row` the first
///   time the value's field is seen for this row.
/// - Unless first-value mode is on and this is a repeat of the field, copies
///   the value's source realtime (if any) into `out.timestamp` and applies the
///   value. It is applied immediately through the accumulator or deferred by
///   pushing its index into `apply`.
fn handle_row_value_class(
    value_index: usize,
    accumulator: &mut ExplorerAccumulator,
    row_id: u64,
    state: &mut RowScanState,
    out: &mut RowScan,
    apply: &mut ScanApply<'_>,
    stats: &mut ExplorerStats,
) {
    if accumulator.value_fts_matches[value_index] {
        out.fts_matches = true;
    }
    let field_index = accumulator.value_field_indices[value_index];
    // `mark_field_seen` is consulted (and therefore updated) in first-value mode
    // and, in every mode, for public-facet/histogram fields. Any other field is
    // treated as a first occurrence without touching the seen-tracking state.
    // NOTE(review): presumably `mark_field_seen` returns true only on the first
    // sighting of `field_index` for `row_id` — confirm against its definition.
    let first_for_field = if state.use_first_value
        || accumulator.flags[field_index] & (FACET_PUBLIC | FACET_HISTOGRAM) != 0
    {
        accumulator.mark_field_seen(field_index, row_id)
    } else {
        true
    };
    if state.use_first_value && first_for_field {
        state.fields_missing_from_row = state.fields_missing_from_row.saturating_sub(1);
    }
    // In first-value mode only the first value of a field is applied; in
    // all-values mode every value is applied.
    if !state.use_first_value || first_for_field {
        if let Some(timestamp) = accumulator.value_source_realtime[value_index] {
            out.timestamp = Some(timestamp);
        }
        match apply {
            ScanApply::Immediate => accumulator.apply_value(value_index, None, stats),
            ScanApply::Deferred(values) => values.push(value_index),
        }
    }
}
2644
2645fn record_row_scan_early_stop(stats: &mut ExplorerStats) {
2646    stats.early_stop_opportunities = stats.early_stop_opportunities.saturating_add(1);
2647    stats.early_stops = stats.early_stops.saturating_add(1);
2648}
2649
2650fn cached_offset_class_for_accumulator(
2651    file: &JournalFile<Mmap>,
2652    row: &mut CurrentRowView,
2653    data_offset: NonZeroU64,
2654    accumulator: &ExplorerAccumulator,
2655    column_fields: Option<&mut Vec<Vec<u8>>>,
2656    stats: &mut ExplorerStats,
2657) -> Result<Option<OffsetClass>> {
2658    let Some(class) = accumulator.offset_cache.lookup(data_offset) else {
2659        return Ok(None);
2660    };
2661    if let Some(column_fields) = column_fields {
2662        if let Some((field, _)) = read_payload_field(file, row, data_offset, stats)? {
2663            column_fields.push(field);
2664        }
2665    }
2666    stats.data_cache_hits = stats.data_cache_hits.saturating_add(1);
2667    Ok(Some(class))
2668}
2669
2670fn payload_for_classification<'a>(
2671    file: &JournalFile<Mmap>,
2672    row: &'a mut CurrentRowView,
2673    data_offset: NonZeroU64,
2674    stats: &mut ExplorerStats,
2675) -> Result<&'a [u8]> {
2676    stats.data_cache_misses = stats.data_cache_misses.saturating_add(1);
2677    stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2678    let was_compressed = file.data_ref(data_offset)?.is_compressed();
2679    let payload = row.read_payload_at(file, data_offset)?;
2680    if was_compressed {
2681        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2682    }
2683    Ok(row.payload_slice(payload))
2684}
2685
2686fn fts_flags_for_value(
2687    value: &[u8],
2688    needs_fts: bool,
2689    query: &ExplorerQuery,
2690    stats: &mut ExplorerStats,
2691) -> (bool, bool) {
2692    if !needs_fts {
2693        return (false, false);
2694    }
2695    stats.fts_scans = stats.fts_scans.saturating_add(1);
2696    match match_fts_query(value, query) {
2697        FtsTermMatch::Positive => (true, false),
2698        FtsTermMatch::Negative => (false, true),
2699        FtsTermMatch::None => (false, false),
2700    }
2701}
2702
2703fn structured_payload_class(
2704    field: &[u8],
2705    value: &[u8],
2706    data_offset: NonZeroU64,
2707    accumulator: &mut ExplorerAccumulator,
2708    fts_matches: bool,
2709    fts_negative_match: bool,
2710) -> OffsetClass {
2711    if fts_negative_match {
2712        OffsetClass::FtsNegativeMatch
2713    } else if let Some(field_index) = accumulator.field_lookup.get(field).copied() {
2714        OffsetClass::Value(accumulator.add_value(field_index, data_offset, value, fts_matches))
2715    } else if fts_matches {
2716        OffsetClass::FtsMatch
2717    } else {
2718        OffsetClass::Irrelevant
2719    }
2720}
2721
2722fn classify_data_for_accumulator(
2723    file: &JournalFile<Mmap>,
2724    row: &mut CurrentRowView,
2725    data_offset: NonZeroU64,
2726    accumulator: &mut ExplorerAccumulator,
2727    needs_fts: bool,
2728    query: &ExplorerQuery,
2729    mut column_fields: Option<&mut Vec<Vec<u8>>>,
2730    stats: &mut ExplorerStats,
2731) -> Result<OffsetClass> {
2732    if let Some(class) = cached_offset_class_for_accumulator(
2733        file,
2734        row,
2735        data_offset,
2736        accumulator,
2737        column_fields.as_mut().map(|fields| &mut **fields),
2738        stats,
2739    )? {
2740        return Ok(class);
2741    }
2742
2743    let payload = payload_for_classification(file, row, data_offset, stats)?;
2744    let Some((field, value)) = split_payload_bytes(payload) else {
2745        let class = classify_unstructured_payload(payload, needs_fts, query, stats);
2746        accumulator.offset_cache.insert(data_offset, class);
2747        stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2748        return Ok(class);
2749    };
2750    if let Some(column_fields) = column_fields {
2751        column_fields.push(field.to_vec());
2752    }
2753
2754    let (fts_matches, fts_negative_match) = fts_flags_for_value(value, needs_fts, query, stats);
2755    let class = structured_payload_class(
2756        field,
2757        value,
2758        data_offset,
2759        accumulator,
2760        fts_matches,
2761        fts_negative_match,
2762    );
2763    accumulator.offset_cache.insert(data_offset, class);
2764    stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2765    Ok(class)
2766}
2767
2768fn read_payload_field(
2769    file: &JournalFile<Mmap>,
2770    row: &mut CurrentRowView,
2771    data_offset: NonZeroU64,
2772    stats: &mut ExplorerStats,
2773) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
2774    let was_compressed = file.data_ref(data_offset)?.is_compressed();
2775    let payload = row.read_payload_at(file, data_offset)?;
2776    if was_compressed {
2777        stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2778    }
2779    let payload = row.payload_slice(payload);
2780    Ok(split_payload_bytes(payload).map(|(field, value)| (field.to_vec(), value.to_vec())))
2781}
2782
2783fn classify_unstructured_payload(
2784    payload: &[u8],
2785    needs_fts: bool,
2786    query: &ExplorerQuery,
2787    stats: &mut ExplorerStats,
2788) -> OffsetClass {
2789    if !needs_fts {
2790        return OffsetClass::Irrelevant;
2791    }
2792    stats.fts_scans = stats.fts_scans.saturating_add(1);
2793    match match_fts_query(payload, query) {
2794        FtsTermMatch::Positive => OffsetClass::FtsMatch,
2795        FtsTermMatch::Negative => OffsetClass::FtsNegativeMatch,
2796        FtsTermMatch::None => OffsetClass::Irrelevant,
2797    }
2798}
2799
/// Maps `realtime_usec` to a bucket index for `bucket_count` buckets of
/// `bucket_width_usec` starting at `start_realtime_usec`.
///
/// Times before the start fall into bucket 0 and times past the end fall into
/// the last bucket. A zero width is treated as 1. Returns `None` only when
/// there are no buckets.
fn histogram_bucket_index_from_bounds(
    realtime_usec: u64,
    start_realtime_usec: u64,
    bucket_width_usec: u64,
    bucket_count: usize,
) -> Option<usize> {
    let last = bucket_count.checked_sub(1)?;
    let index = realtime_usec.saturating_sub(start_realtime_usec) / bucket_width_usec.max(1);
    // Clamp before narrowing: an `as usize` cast would wrap large indices on
    // 32-bit targets and land them in an arbitrary in-range bucket.
    Some(usize::try_from(index).map_or(last, |index| index.min(last)))
}
2814
2815fn validate_query(query: &ExplorerQuery) -> Result<()> {
2816    if query
2817        .after_realtime_usec
2818        .zip(query.before_realtime_usec)
2819        .is_some_and(|(after, before)| after > before)
2820    {
2821        return Err(SdkError::InvalidPath(
2822            "after_realtime_usec must be <= before_realtime_usec".to_string(),
2823        ));
2824    }
2825    for filter in &query.filters {
2826        if filter.field.is_empty() || filter.field.contains(&b'=') {
2827            return Err(SdkError::InvalidPath(
2828                "filter field must be non-empty and must not contain '='".to_string(),
2829            ));
2830        }
2831    }
2832    for field in query.facets.iter().chain(query.histogram.iter()) {
2833        if field.is_empty() || field.contains(&b'=') {
2834            return Err(SdkError::InvalidPath(
2835                "facet and histogram fields must be non-empty and must not contain '='".to_string(),
2836            ));
2837        }
2838    }
2839    let mut seen_facets: HashSet<&[u8]> = HashSet::new();
2840    for facet in &query.facets {
2841        if !seen_facets.insert(facet) {
2842            return Err(SdkError::InvalidPath(
2843                "facet fields must not be duplicated".to_string(),
2844            ));
2845        }
2846    }
2847    Ok(())
2848}
2849
2850fn validate_no_debug_column_collection(query: &ExplorerQuery) -> Result<()> {
2851    if query.debug_collect_column_fields_by_row_traversal {
2852        return Err(SdkError::Unsupported(
2853            "debug_collect_column_fields_by_row_traversal is a debug-only discrepancy tool; production explorer queries must use FIELD-index column catalogs instead",
2854        ));
2855    }
2856    Ok(())
2857}
2858
2859fn validate_indexed_query(query: &ExplorerQuery) -> Result<()> {
2860    if query.field_mode != ExplorerFieldMode::AllValues {
2861        return Err(SdkError::Unsupported(
2862            "indexed explorer strategy requires ExplorerFieldMode::AllValues",
2863        ));
2864    }
2865    if query_has_fts(query) {
2866        return Err(SdkError::Unsupported(
2867            "indexed explorer strategy does not support FTS",
2868        ));
2869    }
2870    if query.use_source_realtime
2871        && (query.after_realtime_usec.is_some()
2872            || query.before_realtime_usec.is_some()
2873            || query.histogram.is_some())
2874    {
2875        return Err(SdkError::Unsupported(
2876            "indexed explorer strategy requires commit realtime for time-bounded facets and histograms",
2877        ));
2878    }
2879    Ok(())
2880}
2881
2882fn explorer_outputs_match(left: &ExplorerResult, right: &ExplorerResult) -> bool {
2883    if left.rows.len() != right.rows.len() {
2884        return false;
2885    }
2886    if left.rows.iter().zip(&right.rows).any(|(left, right)| {
2887        left.realtime_usec != right.realtime_usec
2888            || left.cursor != right.cursor
2889            || left.payloads != right.payloads
2890    }) {
2891        return false;
2892    }
2893    if left.facets != right.facets {
2894        return false;
2895    }
2896    explorer_histograms_match(left.histogram.as_ref(), right.histogram.as_ref())
2897}
2898
2899fn explorer_histograms_match(
2900    left: Option<&ExplorerHistogram>,
2901    right: Option<&ExplorerHistogram>,
2902) -> bool {
2903    match (left, right) {
2904        (None, None) => true,
2905        (Some(left), Some(right)) => {
2906            left.field == right.field
2907                && left.buckets.len() == right.buckets.len()
2908                && left
2909                    .buckets
2910                    .iter()
2911                    .zip(&right.buckets)
2912                    .all(|(left, right)| {
2913                        left.start_realtime_usec == right.start_realtime_usec
2914                            && left.end_realtime_usec == right.end_realtime_usec
2915                            && left.values == right.values
2916                    })
2917        }
2918        _ => false,
2919    }
2920}
2921
2922fn query_needs_source_realtime_main(query: &ExplorerQuery) -> bool {
2923    query.use_source_realtime
2924        && (query.after_realtime_usec.is_some()
2925            || query.before_realtime_usec.is_some()
2926            || query.histogram.is_some()
2927            || query.limit > 0)
2928}
2929
2930fn facet_pass_needs_source_realtime(query: &ExplorerQuery) -> bool {
2931    query.use_source_realtime
2932        && (query.after_realtime_usec.is_some() || query.before_realtime_usec.is_some())
2933}
2934
2935fn query_needs_main_pass(query: &ExplorerQuery) -> bool {
2936    query.limit > 0 || query.histogram.is_some()
2937}
2938
2939fn explorer_result_for_query(query: &ExplorerQuery) -> ExplorerResult {
2940    ExplorerResult {
2941        histogram: query
2942            .histogram
2943            .as_ref()
2944            .map(|field| new_histogram(field, query)),
2945        ..ExplorerResult::default()
2946    }
2947}
2948
2949fn explorer_control_stopped(control: Option<&ExplorerControl<'_>>) -> bool {
2950    control.and_then(ExplorerControl::stop_reason).is_some()
2951}
2952
2953fn can_run_combined_explorer_pass(facet_groups: &[FacetPassGroup]) -> bool {
2954    facet_groups
2955        .iter()
2956        .all(|group| group.excluded_field.is_none())
2957}
2958
2959fn combined_facet_indices(facet_groups: &[FacetPassGroup]) -> Vec<usize> {
2960    facet_groups
2961        .iter()
2962        .flat_map(|group| group.facet_indices.iter().copied())
2963        .collect()
2964}
2965
2966fn record_combined_unsampled_row(
2967    stats: &mut ExplorerStats,
2968    mode: CombinedScanMode,
2969    commit_realtime: u64,
2970    row_count: u64,
2971    count_rows_unsampled: bool,
2972) {
2973    record_last_realtime(stats, commit_realtime);
2974    if mode.include_main {
2975        stats.rows_matched = stats.rows_matched.saturating_add(row_count);
2976    }
2977    if mode.include_facets {
2978        stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(row_count);
2979    }
2980    if count_rows_unsampled {
2981        stats.rows_unsampled = stats.rows_unsampled.saturating_add(row_count);
2982    }
2983    stats.sampling_unsampled = stats.sampling_unsampled.saturating_add(1);
2984}
2985
2986fn update_combined_matched_stats(
2987    stats: &mut ExplorerStats,
2988    mode: CombinedScanMode,
2989    effective_realtime: u64,
2990    control: Option<&mut ExplorerControl<'_>>,
2991) -> bool {
2992    let mut stop_after_matched_row = false;
2993    if mode.include_main {
2994        stats.rows_matched = stats.rows_matched.saturating_add(1);
2995        stop_after_matched_row = control
2996            .map(|control| control.emit_matched_row(effective_realtime, stats.rows_matched))
2997            .unwrap_or(false);
2998    }
2999    if mode.include_facets {
3000        stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
3001    }
3002    stop_after_matched_row
3003}
3004
3005fn should_stop_when_rows_full(
3006    query: &ExplorerQuery,
3007    rows: &[ExplorerRow],
3008    effective_realtime: u64,
3009    rows_matched: u64,
3010) -> bool {
3011    if !query.stop_when_rows_full || query.limit == 0 || rows.len() < query.limit {
3012        return false;
3013    }
3014    let every = query.stop_when_rows_full_check_every.max(1);
3015    if rows_matched == 0 || rows_matched % every != 0 {
3016        return false;
3017    }
3018    match query.direction {
3019        Direction::Backward => {
3020            rows.iter()
3021                .map(|row| row.realtime_usec)
3022                .min()
3023                .is_some_and(|oldest| {
3024                    effective_realtime < oldest.saturating_sub(query.realtime_slack_usec)
3025                })
3026        }
3027        Direction::Forward => {
3028            rows.iter()
3029                .map(|row| row.realtime_usec)
3030                .max()
3031                .is_some_and(|newest| {
3032                    effective_realtime > newest.saturating_add(query.realtime_slack_usec)
3033                })
3034        }
3035    }
3036}
3037
3038fn row_candidate_to_keep(query: &ExplorerQuery, rows: &[ExplorerRow], realtime_usec: u64) -> bool {
3039    if query.limit == 0 {
3040        return false;
3041    }
3042    if !row_within_anchor(query, realtime_usec) {
3043        return false;
3044    }
3045    if rows.len() < query.limit {
3046        return true;
3047    }
3048    match query.direction {
3049        Direction::Backward => rows
3050            .iter()
3051            .map(|row| row.realtime_usec)
3052            .min()
3053            .is_some_and(|oldest| realtime_usec >= oldest),
3054        Direction::Forward => rows
3055            .iter()
3056            .map(|row| row.realtime_usec)
3057            .max()
3058            .is_some_and(|newest| realtime_usec <= newest),
3059    }
3060}
3061
3062fn row_within_anchor(query: &ExplorerQuery, realtime_usec: u64) -> bool {
3063    match (query.direction, query.anchor) {
3064        (Direction::Forward, ExplorerAnchor::Realtime(anchor)) => realtime_usec > anchor,
3065        (Direction::Backward, ExplorerAnchor::Realtime(anchor)) => realtime_usec <= anchor,
3066        _ => true,
3067    }
3068}
3069
3070fn add_special_histogram_value(
3071    histogram: Option<&mut ExplorerHistogram>,
3072    realtime_usec: u64,
3073    value: &[u8],
3074    count: u64,
3075    stats: &mut ExplorerStats,
3076) {
3077    let Some(histogram) = histogram else {
3078        return;
3079    };
3080    let Some(bucket_index) = histogram_bucket_index(histogram, realtime_usec) else {
3081        return;
3082    };
3083    if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
3084        increment_counter_by(&mut bucket.values, value, count);
3085        stats.histogram_updates = stats.histogram_updates.saturating_add(1);
3086    }
3087}
3088
3089fn add_estimated_histogram_range(
3090    histogram: Option<&mut ExplorerHistogram>,
3091    from_realtime_usec: u64,
3092    to_realtime_usec: u64,
3093    entries: u64,
3094    stats: &mut ExplorerStats,
3095) {
3096    let Some(histogram) = histogram else {
3097        return;
3098    };
3099    if entries == 0 || from_realtime_usec >= to_realtime_usec {
3100        return;
3101    }
3102
3103    let Some(first) = histogram.buckets.first() else {
3104        return;
3105    };
3106    let Some(last) = histogram.buckets.last() else {
3107        return;
3108    };
3109    let from_realtime_usec = from_realtime_usec.max(first.start_realtime_usec);
3110    let to_realtime_usec = to_realtime_usec.min(last.end_realtime_usec);
3111    if from_realtime_usec >= to_realtime_usec {
3112        return;
3113    }
3114
3115    let total = to_realtime_usec.saturating_sub(from_realtime_usec).max(1);
3116    let mut touched = 0u64;
3117    for bucket in &mut histogram.buckets {
3118        if bucket.start_realtime_usec > to_realtime_usec {
3119            break;
3120        }
3121        let overlap_start = bucket.start_realtime_usec.max(from_realtime_usec);
3122        let overlap_end = bucket.end_realtime_usec.min(to_realtime_usec);
3123        if overlap_start >= overlap_end {
3124            continue;
3125        }
3126        let bucket_entries = ((overlap_end.saturating_sub(overlap_start) as u128 * entries as u128)
3127            / total as u128) as u64;
3128        if bucket_entries != 0 {
3129            increment_counter_by(&mut bucket.values, EXPLORER_ESTIMATED_VALUE, bucket_entries);
3130        }
3131        touched = touched.saturating_add(1);
3132    }
3133    stats.histogram_updates = stats.histogram_updates.saturating_add(touched);
3134}
3135
3136fn histogram_bucket_index(histogram: &ExplorerHistogram, realtime_usec: u64) -> Option<usize> {
3137    let first = histogram.buckets.first()?;
3138    let width = first
3139        .end_realtime_usec
3140        .saturating_sub(first.start_realtime_usec)
3141        .max(1);
3142    histogram_bucket_index_from_bounds(
3143        realtime_usec,
3144        first.start_realtime_usec,
3145        width,
3146        histogram.buckets.len(),
3147    )
3148}
3149
/// Rebuilds a journal payload `FIELD=value` from its two halves.
fn payload_from_parts(field: &[u8], value: &[u8]) -> Vec<u8> {
    [field, b"=".as_slice(), value].concat()
}
3157
/// Splits a payload at its first `=` into `(field, value)`.
///
/// Returns `None` when there is no `=`. Later `=` bytes stay in the value.
fn split_payload_bytes(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let separator = payload.iter().position(|&byte| byte == b'=')?;
    let (field, rest) = payload.split_at(separator);
    Some((field, &rest[1..]))
}
3162
/// Parses a `_SOURCE_REALTIME_TIMESTAMP` value as a decimal `u64`.
///
/// Returns `None` for non-UTF-8 input or anything `u64::from_str` rejects.
fn parse_source_realtime(value: &[u8]) -> Option<u64> {
    let text = std::str::from_utf8(value).ok()?;
    text.parse::<u64>().ok()
}
3166
/// Picks the realtime a row is attributed to.
///
/// This is the source realtime when it is present, non-zero and strictly
/// earlier than the commit realtime; otherwise it is the commit realtime.
fn effective_realtime_from_scan(source_realtime: Option<u64>, commit_realtime: u64) -> u64 {
    source_realtime
        .filter(|&source| source != 0 && source < commit_realtime)
        .unwrap_or(commit_realtime)
}
3175
3176fn record_last_realtime(stats: &mut ExplorerStats, commit_realtime: u64) {
3177    if commit_realtime > stats.last_realtime_usec {
3178        stats.last_realtime_usec = commit_realtime;
3179    }
3180}
3181
3182fn record_source_realtime_delta(
3183    stats: &mut ExplorerStats,
3184    source_realtime: Option<u64>,
3185    commit_realtime: u64,
3186) {
3187    let Some(source_realtime) = source_realtime else {
3188        return;
3189    };
3190    if source_realtime == 0 || source_realtime >= commit_realtime {
3191        return;
3192    }
3193    let delta = commit_realtime.saturating_sub(source_realtime);
3194    if delta > stats.max_source_realtime_delta_usec {
3195        stats.max_source_realtime_delta_usec = delta;
3196    }
3197}
3198
3199fn query_has_fts(query: &ExplorerQuery) -> bool {
3200    !query.fts_terms.is_empty()
3201        || !query.fts_patterns.is_empty()
3202        || !query.fts_negative_patterns.is_empty()
3203}
3204
3205fn query_has_positive_fts(query: &ExplorerQuery) -> bool {
3206    if !query.fts_terms.is_empty() {
3207        query.fts_terms.iter().any(|term| !term.negative)
3208    } else {
3209        !query.fts_patterns.is_empty()
3210    }
3211}
3212
3213fn row_rejected_by_fts(query: &ExplorerQuery, scan: &RowScan) -> bool {
3214    query_has_fts(query)
3215        && (scan.fts_negative_match || query_has_positive_fts(query) && !scan.fts_matches)
3216}
3217
/// Outcome of matching one value against the query's FTS terms or patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FtsTermMatch {
    /// Nothing in the query matched the value.
    None,
    /// A non-negated term (or positive pattern) matched.
    Positive,
    /// A negated term (or negative pattern) matched.
    Negative,
}
3224
3225fn match_fts_query(value: &[u8], query: &ExplorerQuery) -> FtsTermMatch {
3226    if !query.fts_terms.is_empty() {
3227        for term in &query.fts_terms {
3228            if term.matches(value) {
3229                return if term.negative {
3230                    FtsTermMatch::Negative
3231                } else {
3232                    FtsTermMatch::Positive
3233                };
3234            }
3235        }
3236        return FtsTermMatch::None;
3237    }
3238
3239    if matches_fts(value, &query.fts_negative_patterns) {
3240        FtsTermMatch::Negative
3241    } else if matches_fts(value, &query.fts_patterns) {
3242        FtsTermMatch::Positive
3243    } else {
3244        FtsTermMatch::None
3245    }
3246}
3247
3248fn matches_fts(value: &[u8], patterns: &[Vec<u8>]) -> bool {
3249    patterns
3250        .iter()
3251        .filter(|pattern| !pattern.is_empty())
3252        .any(|pattern| contains_ascii_case_insensitive(value, pattern))
3253}
3254
/// Substring test that ignores ASCII case. An empty needle always matches.
fn contains_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        0 => true,
        width if width > haystack.len() => false,
        width => haystack
            .windows(width)
            .any(|window| window.eq_ignore_ascii_case(needle)),
    }
}
3269
/// Returns the byte offset of the first ASCII-case-insensitive occurrence of
/// `needle` in `haystack`. An empty needle is found at offset 0.
fn find_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    match needle.len() {
        0 => Some(0),
        width if width > haystack.len() => None,
        width => haystack
            .windows(width)
            .position(|window| window.eq_ignore_ascii_case(needle)),
    }
}
3284
3285fn timestamp_in_range(query: &ExplorerQuery, timestamp: u64) -> bool {
3286    if query
3287        .after_realtime_usec
3288        .is_some_and(|after| timestamp < after)
3289    {
3290        return false;
3291    }
3292    if query
3293        .before_realtime_usec
3294        .is_some_and(|before| timestamp > before)
3295    {
3296        return false;
3297    }
3298    true
3299}
3300
3301fn stop_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3302    // Netdata expands the seek/stop side by the learned journal-vs-source
3303    // realtime delta, then still uses commit realtime for the fast boundary
3304    // checks before row DATA is scanned for _SOURCE_REALTIME_TIMESTAMP.
3305    match query.direction {
3306        Direction::Forward => query.before_realtime_usec.is_some_and(|before| {
3307            commit_realtime > before.saturating_add(query.realtime_slack_usec)
3308        }),
3309        Direction::Backward => query
3310            .after_realtime_usec
3311            .is_some_and(|after| commit_realtime < after),
3312    }
3313}
3314
3315fn skip_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3316    match query.direction {
3317        Direction::Forward => query
3318            .after_realtime_usec
3319            .is_some_and(|after| commit_realtime < after),
3320        Direction::Backward => query.before_realtime_usec.is_some_and(|before| {
3321            commit_realtime > before.saturating_add(query.realtime_slack_usec)
3322        }),
3323    }
3324}
3325
3326fn new_histogram(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
3327    let (start, end) = histogram_bounds(query);
3328    let target_buckets = query.histogram_target_buckets.max(1);
3329    let mut width = histogram_bar_width_usec(start, end, target_buckets);
3330    let start = histogram_slot_baseline_usec(start, width);
3331    let mut end = histogram_slot_baseline_usec(end, width).saturating_add(width);
3332    let mut bucket_count = end
3333        .saturating_sub(start)
3334        .checked_div(width)
3335        .unwrap_or(0)
3336        .saturating_add(1) as usize;
3337    if bucket_count > 1001 {
3338        bucket_count = 1001;
3339        width = end
3340            .saturating_sub(start)
3341            .checked_div(1000)
3342            .unwrap_or(0)
3343            .max(1);
3344        end = start.saturating_add(width.saturating_mul(1000));
3345    }
3346    let mut buckets = Vec::with_capacity(bucket_count);
3347    for index in 0..bucket_count {
3348        let bucket_start = start.saturating_add(width.saturating_mul(index as u64));
3349        let bucket_end = if index + 1 == bucket_count {
3350            end.saturating_add(1)
3351        } else {
3352            bucket_start.saturating_add(width)
3353        };
3354        buckets.push(ExplorerHistogramBucket {
3355            start_realtime_usec: bucket_start,
3356            end_realtime_usec: bucket_end,
3357            values: HashMap::new(),
3358        });
3359    }
3360    ExplorerHistogram {
3361        field: field.to_vec(),
3362        buckets,
3363    }
3364}
3365
/// Crate-visible entry point for building an empty histogram for `field`
/// shaped by `query`'s histogram window; delegates to `new_histogram`.
pub(crate) fn empty_histogram_for_query(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
    new_histogram(field, query)
}
3369
/// Chooses the widest "nice" bucket width (1s up to 30 days) that still yields
/// at least `target_buckets` buckets over `[after, before]`.
///
/// Falls back to one second when even that is too wide.
fn histogram_bar_width_usec(after: u64, before: u64, target_buckets: usize) -> u64 {
    const USEC_PER_SEC: u64 = 1_000_000;
    const VALID_DURATIONS_SECONDS: &[u64] = &[
        1, 2, 5, 10, 15, 30, 60, 120, 180, 300, 600, 900, 1800, 3600, 7200, 21600, 28800, 43200,
        86400, 172800, 259200, 432000, 604800, 1209600, 2592000,
    ];
    let span = before.saturating_sub(after);
    let wanted = target_buckets as u64;
    VALID_DURATIONS_SECONDS
        .iter()
        .rev()
        .map(|seconds| seconds.saturating_mul(USEC_PER_SEC))
        .find(|&width| width != 0 && span / width >= wanted)
        .unwrap_or(USEC_PER_SEC)
}
3385
/// Rounds `value` down to a multiple of `width`; a zero width is treated as 1.
fn histogram_slot_baseline_usec(value: u64, width: u64) -> u64 {
    let width = width.max(1);
    value / width * width
}
3389
3390fn histogram_bounds(query: &ExplorerQuery) -> (u64, u64) {
3391    let start = query
3392        .histogram_after_realtime_usec
3393        .or(query.after_realtime_usec)
3394        .unwrap_or(0);
3395    let end = query
3396        .histogram_before_realtime_usec
3397        .or(query.before_realtime_usec)
3398        .unwrap_or_else(|| start.saturating_add(3_600_000_000));
3399    if end <= start {
3400        (start, start.saturating_add(1))
3401    } else {
3402        (start, end)
3403    }
3404}
3405
/// Adds `delta` (saturating) to the counter for `value`, inserting it when
/// absent. The key is only copied into an owned `Vec` on first insertion.
fn increment_counter_by(map: &mut HashMap<Vec<u8>, u64>, value: &[u8], delta: u64) {
    match map.get_mut(value) {
        Some(count) => *count = count.saturating_add(delta),
        None => {
            map.insert(value.to_vec(), delta);
        }
    }
}
3413
3414#[cfg(test)]
3415mod tests {
3416    use super::*;
3417    use journal_core::file::{JournalFileOptions, JournalWriter, MmapMut};
3418    use journal_core::repository::File as RepoFile;
3419    use tempfile::TempDir;
3420
3421    fn test_uuid(seed: u8) -> uuid::Uuid {
3422        uuid::Uuid::from_bytes([seed; 16])
3423    }
3424
3425    fn create_writer(
3426        path: &std::path::Path,
3427        compression: Option<(Compression, usize)>,
3428    ) -> (JournalFile<MmapMut>, JournalWriter) {
3429        if let Some(parent) = path.parent() {
3430            std::fs::create_dir_all(parent).expect("create journal parent");
3431        }
3432        let repo_file = RepoFile::from_path(path).expect("repo file");
3433        let mut options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
3434        if let Some((compression, threshold)) = compression {
3435            options = options
3436                .with_compression(compression)
3437                .with_compress_threshold(threshold);
3438        }
3439        let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
3440        let writer = if let Some((compression, threshold)) = compression {
3441            JournalWriter::new_with_compression(&mut file, 1, test_uuid(4), compression, threshold)
3442                .expect("writer")
3443        } else {
3444            JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer")
3445        };
3446        (file, writer)
3447    }
3448
3449    fn write_entries(
3450        path: &std::path::Path,
3451        compression: Option<(Compression, usize)>,
3452        entries: &[(&[&[u8]], u64)],
3453    ) {
3454        let (mut file, mut writer) = create_writer(path, compression);
3455        for (payloads, realtime) in entries {
3456            writer
3457                .add_entry(&mut file, payloads, *realtime, *realtime)
3458                .expect("write entry");
3459        }
3460        file.sync().expect("sync journal");
3461    }
3462
3463    fn write_many_entries(path: &std::path::Path, count: usize) {
3464        let (mut file, mut writer) = create_writer(path, None);
3465        for index in 0..count {
3466            let message = format!("MESSAGE=row-{index}");
3467            let service = if index % 2 == 0 {
3468                b"SERVICE=even".as_slice()
3469            } else {
3470                b"SERVICE=odd".as_slice()
3471            };
3472            let payloads: [&[u8]; 2] = [message.as_bytes(), service];
3473            let realtime = 1_700_000_000_000_000u64.saturating_add(index as u64);
3474            writer
3475                .add_entry(&mut file, &payloads, realtime, realtime)
3476                .expect("write entry");
3477        }
3478        file.sync().expect("sync journal");
3479    }
3480
3481    #[test]
3482    fn explorer_control_reports_progress_during_large_scan() {
3483        let dir = TempDir::new().expect("tempdir");
3484        let path = dir.path().join("progress.journal");
3485        write_many_entries(&path, 9_000);
3486
3487        let mut reports = Vec::new();
3488        let mut progress = |progress: ExplorerProgress| {
3489            reports.push(progress.stats.rows_examined);
3490        };
3491        let mut control = ExplorerControl::new();
3492        control.set_progress_interval(Duration::ZERO);
3493        control.set_progress_callback(Some(&mut progress));
3494        let mut reader = FileReader::open(&path).expect("open reader");
3495        let query = ExplorerQuery {
3496            facets: vec![b"SERVICE".to_vec()],
3497            limit: 0,
3498            ..ExplorerQuery::default()
3499        };
3500
3501        let result = reader
3502            .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3503            .expect("explore");
3504
3505        assert_eq!(control.stop_reason(), None);
3506        assert_eq!(result.stats.rows_examined, 9_000);
3507        assert!(!reports.is_empty());
3508        assert!(reports.iter().any(|rows| *rows >= 8_191));
3509    }
3510
3511    #[test]
3512    fn explorer_control_cancels_inside_large_scan() {
3513        let dir = TempDir::new().expect("tempdir");
3514        let path = dir.path().join("cancel.journal");
3515        write_many_entries(&path, 9_000);
3516
3517        let is_cancelled = || true;
3518        let mut control = ExplorerControl::new();
3519        control.set_cancellation_callback(Some(&is_cancelled));
3520        let mut reader = FileReader::open(&path).expect("open reader");
3521        let query = ExplorerQuery {
3522            facets: vec![b"SERVICE".to_vec()],
3523            limit: 0,
3524            ..ExplorerQuery::default()
3525        };
3526
3527        let result = reader
3528            .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3529            .expect("explore");
3530
3531        assert_eq!(control.stop_reason(), Some(ExplorerStopReason::Cancelled));
3532        assert!(result.stats.rows_examined < 9_000);
3533    }
3534
3535    #[test]
3536    fn explorer_filters_with_or_values_and_and_fields() {
3537        let dir = TempDir::new().expect("tempdir");
3538        let path = dir.path().join("filter.journal");
3539        write_entries(
3540            &path,
3541            None,
3542            &[
3543                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3544                (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3545                (&[b"SERVICE=b", b"PRIORITY=4"], 3_000),
3546            ],
3547        );
3548
3549        let mut reader = FileReader::open(&path).expect("open reader");
3550        let query = ExplorerQuery {
3551            filters: vec![
3552                ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec(), b"b".to_vec()]),
3553                ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3554            ],
3555            facets: vec![b"SERVICE".to_vec()],
3556            limit: 10,
3557            ..ExplorerQuery::default()
3558        };
3559
3560        let result = reader.explore(&query).expect("explore");
3561        assert_eq!(result.rows.len(), 2);
3562        let service = result
3563            .facets
3564            .get(b"SERVICE".as_slice())
3565            .expect("service facet");
3566        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3567        assert_eq!(service.get(b"b".as_slice()), Some(&1));
3568        assert!(result.stats.data_cache_misses > 0);
3569    }
3570
3571    #[test]
3572    fn explorer_rejects_debug_row_traversal_column_collection() {
3573        let dir = TempDir::new().expect("tempdir");
3574        let path = dir.path().join("debug-column-collection.journal");
3575        write_entries(&path, None, &[(&[b"PRIORITY=3", b"MESSAGE=hello"], 1_000)]);
3576
3577        let query = ExplorerQuery {
3578            facets: vec![b"PRIORITY".to_vec()],
3579            debug_collect_column_fields_by_row_traversal: true,
3580            ..ExplorerQuery::default()
3581        };
3582
3583        let mut reader = FileReader::open(&path).expect("open reader");
3584        let err = reader
3585            .explore(&query)
3586            .expect_err("debug-only column collection is rejected");
3587        assert!(matches!(err, SdkError::Unsupported(_)));
3588        assert!(
3589            err.to_string()
3590                .contains("debug_collect_column_fields_by_row_traversal")
3591        );
3592
3593        let mut reader = FileReader::open(&path).expect("reopen reader");
3594        let err = reader
3595            .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3596            .expect_err("cursor-row explorer also rejects debug-only column collection");
3597        assert!(matches!(err, SdkError::Unsupported(_)));
3598    }
3599
3600    #[test]
3601    fn explorer_skips_irrelevant_compressed_data_for_facets() {
3602        let dir = TempDir::new().expect("tempdir");
3603        let path = dir.path().join("compressed.journal");
3604        let large_message = b"MESSAGE=abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
3605        write_entries(
3606            &path,
3607            Some((Compression::Zstd, 32)),
3608            &[(&[b"PRIORITY=3", large_message], 1_000)],
3609        );
3610
3611        let mut reader = FileReader::open(&path).expect("open reader");
3612        let query = ExplorerQuery {
3613            facets: vec![b"PRIORITY".to_vec()],
3614            limit: 0,
3615            ..ExplorerQuery::default()
3616        };
3617
3618        let result = reader.explore(&query).expect("explore");
3619        let priority = result
3620            .facets
3621            .get(b"PRIORITY".as_slice())
3622            .expect("priority facet");
3623        assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3624        assert_eq!(result.stats.payloads_decompressed, 0);
3625        assert_eq!(result.stats.data_refs_seen, 1);
3626        assert_eq!(result.stats.early_stops, 1);
3627    }
3628
3629    #[test]
3630    fn explorer_reuses_classified_data_objects() {
3631        let dir = TempDir::new().expect("tempdir");
3632        let path = dir.path().join("reuse.journal");
3633        write_entries(
3634            &path,
3635            None,
3636            &[
3637                (&[b"PRIORITY=3"], 1_000),
3638                (&[b"PRIORITY=3"], 2_000),
3639                (&[b"PRIORITY=3"], 3_000),
3640            ],
3641        );
3642
3643        let mut reader = FileReader::open(&path).expect("open reader");
3644        let query = ExplorerQuery {
3645            facets: vec![b"PRIORITY".to_vec()],
3646            limit: 0,
3647            ..ExplorerQuery::default()
3648        };
3649
3650        let result = reader.explore(&query).expect("explore");
3651        let priority = result
3652            .facets
3653            .get(b"PRIORITY".as_slice())
3654            .expect("priority facet");
3655        assert_eq!(priority.get(b"3".as_slice()), Some(&3));
3656        assert!(result.stats.data_cache_hits >= 2);
3657    }
3658
3659    #[test]
3660    fn explorer_groups_facets_with_same_filter_set() {
3661        let dir = TempDir::new().expect("tempdir");
3662        let path = dir.path().join("grouped-facets.journal");
3663        write_entries(
3664            &path,
3665            None,
3666            &[
3667                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3668                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3669            ],
3670        );
3671
3672        let mut reader = FileReader::open(&path).expect("open reader");
3673        let query = ExplorerQuery {
3674            facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3675            limit: 0,
3676            ..ExplorerQuery::default()
3677        };
3678
3679        let result = reader.explore(&query).expect("explore");
3680        assert_eq!(result.stats.rows_examined, 2);
3681        assert_eq!(result.stats.facet_rows_matched, 2);
3682        assert_eq!(
3683            result
3684                .facets
3685                .get(b"SERVICE".as_slice())
3686                .and_then(|values| values.get(b"a".as_slice())),
3687            Some(&1)
3688        );
3689        assert_eq!(
3690            result
3691                .facets
3692                .get(b"PRIORITY".as_slice())
3693                .and_then(|values| values.get(b"4".as_slice())),
3694            Some(&1)
3695        );
3696    }
3697
3698    #[test]
3699    fn explorer_combines_rows_histogram_and_facets_in_one_pass() {
3700        let dir = TempDir::new().expect("tempdir");
3701        let path = dir.path().join("combined-pass.journal");
3702        write_entries(
3703            &path,
3704            None,
3705            &[
3706                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3707                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3708            ],
3709        );
3710
3711        let mut reader = FileReader::open(&path).expect("open reader");
3712        let query = ExplorerQuery {
3713            facets: vec![b"SERVICE".to_vec()],
3714            histogram: Some(b"PRIORITY".to_vec()),
3715            histogram_target_buckets: 2,
3716            limit: 2,
3717            ..ExplorerQuery::default()
3718        };
3719
3720        let result = reader.explore(&query).expect("explore");
3721        assert_eq!(result.rows.len(), 2);
3722        assert_eq!(result.stats.rows_examined, 2);
3723        assert_eq!(result.stats.rows_matched, 2);
3724        assert_eq!(result.stats.facet_rows_matched, 2);
3725        assert_eq!(
3726            result
3727                .facets
3728                .get(b"SERVICE".as_slice())
3729                .and_then(|values| values.get(b"a".as_slice())),
3730            Some(&1)
3731        );
3732        let histogram_total = result
3733            .histogram
3734            .as_ref()
3735            .expect("histogram")
3736            .buckets
3737            .iter()
3738            .flat_map(|bucket| bucket.values.values())
3739            .sum::<u64>();
3740        assert_eq!(histogram_total, 2);
3741    }
3742
3743    #[test]
3744    fn explorer_sampling_uses_actual_histogram_bucket_count() {
3745        let query = ExplorerQuery {
3746            after_realtime_usec: Some(1_733_494_460_000_000),
3747            before_realtime_usec: Some(1_735_656_412_000_000),
3748            histogram: Some(b"PRIORITY".to_vec()),
3749            histogram_target_buckets: 300,
3750            sampling: Some(ExplorerSampling {
3751                budget: 20_000,
3752                matched_files: 200,
3753                file_head_realtime_usec: 1_733_494_460_000_000,
3754                file_tail_realtime_usec: 1_735_656_412_000_000,
3755                file_head_seqnum: 1,
3756                file_tail_seqnum: 2,
3757                file_entries: 2,
3758            }),
3759            ..ExplorerQuery::default()
3760        };
3761
3762        let bucket_count = histogram_bucket_count_for_query(&query).expect("bucket count");
3763        let sampling =
3764            ExplorerSamplingState::for_query(&query, Some(bucket_count)).expect("sampling");
3765
3766        assert_eq!(bucket_count, 302);
3767        assert_eq!(sampling.per_slot_sampled.len(), bucket_count);
3768    }
3769
3770    #[test]
3771    fn explorer_sampling_seqnum_estimate_clamps_over_scanned_to_one() {
3772        let query = ExplorerQuery {
3773            after_realtime_usec: Some(1),
3774            before_realtime_usec: Some(100),
3775            direction: Direction::Forward,
3776            sampling: Some(ExplorerSampling {
3777                budget: 20,
3778                matched_files: 1,
3779                file_head_realtime_usec: 1,
3780                file_tail_realtime_usec: 100,
3781                file_head_seqnum: 1,
3782                file_tail_seqnum: 100,
3783                file_entries: 3,
3784            }),
3785            ..ExplorerQuery::default()
3786        };
3787        let mut sampling = ExplorerSamplingState::for_query(&query, None).expect("sampling");
3788        sampling.per_file_sampled = 10;
3789
3790        assert_eq!(sampling.estimate_remaining_rows_by_seqnum(5), Some(1));
3791    }
3792
3793    #[test]
3794    fn explorer_estimated_histogram_distribution_matches_netdata_integer_math() {
3795        let mut histogram = ExplorerHistogram {
3796            field: b"PRIORITY".to_vec(),
3797            buckets: vec![
3798                ExplorerHistogramBucket {
3799                    start_realtime_usec: 0,
3800                    end_realtime_usec: 10,
3801                    values: HashMap::new(),
3802                },
3803                ExplorerHistogramBucket {
3804                    start_realtime_usec: 10,
3805                    end_realtime_usec: 20,
3806                    values: HashMap::new(),
3807                },
3808                ExplorerHistogramBucket {
3809                    start_realtime_usec: 20,
3810                    end_realtime_usec: 30,
3811                    values: HashMap::new(),
3812                },
3813            ],
3814        };
3815        let mut stats = ExplorerStats::default();
3816
3817        add_estimated_histogram_range(Some(&mut histogram), 0, 30, 10, &mut stats);
3818
3819        let counts = histogram
3820            .buckets
3821            .iter()
3822            .map(|bucket| {
3823                bucket
3824                    .values
3825                    .get(EXPLORER_ESTIMATED_VALUE)
3826                    .copied()
3827                    .unwrap_or_default()
3828            })
3829            .collect::<Vec<_>>();
3830        assert_eq!(counts, vec![3, 3, 3]);
3831        assert_eq!(counts.iter().sum::<u64>(), 9);
3832    }
3833
3834    #[test]
3835    fn explorer_filters_then_combines_outputs_in_one_candidate_pass() {
3836        let dir = TempDir::new().expect("tempdir");
3837        let path = dir.path().join("filtered-combined-pass.journal");
3838        write_entries(
3839            &path,
3840            None,
3841            &[
3842                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3843                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3844                (&[b"SERVICE=c", b"PRIORITY=3"], 3_000),
3845            ],
3846        );
3847
3848        let mut reader = FileReader::open(&path).expect("open reader");
3849        let query = ExplorerQuery {
3850            filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3851            facets: vec![b"SERVICE".to_vec()],
3852            histogram: Some(b"SERVICE".to_vec()),
3853            histogram_target_buckets: 2,
3854            limit: 10,
3855            ..ExplorerQuery::default()
3856        };
3857
3858        let result = reader.explore(&query).expect("explore");
3859        assert_eq!(result.rows.len(), 2);
3860        assert_eq!(result.stats.rows_examined, 2);
3861        assert_eq!(result.stats.rows_matched, 2);
3862        assert_eq!(result.stats.facet_rows_matched, 2);
3863        let service = result
3864            .facets
3865            .get(b"SERVICE".as_slice())
3866            .expect("service facet");
3867        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3868        assert_eq!(service.get(b"c".as_slice()), Some(&1));
3869        assert_eq!(service.get(b"b".as_slice()), None);
3870    }
3871
3872    #[test]
3873    fn explorer_cursor_rows_defer_payload_expansion() {
3874        let dir = TempDir::new().expect("tempdir");
3875        let path = dir.path().join("cursor-only-row.journal");
3876        write_entries(
3877            &path,
3878            None,
3879            &[(&[b"SERVICE=a", b"PRIORITY=3", b"MESSAGE=hello"], 1_000)],
3880        );
3881
3882        let query = ExplorerQuery {
3883            limit: 1,
3884            ..ExplorerQuery::default()
3885        };
3886        let mut reader = FileReader::open(&path).expect("open reader");
3887        let result = reader
3888            .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3889            .expect("explore cursor rows");
3890
3891        assert_eq!(result.rows.len(), 1);
3892        assert!(result.rows[0].payloads.is_empty());
3893        assert_eq!(result.stats.returned_row_expansions, 0);
3894
3895        let cursor = result.rows[0].cursor.clone();
3896        let mut reader = FileReader::open(&path).expect("reopen reader");
3897        reader.seek_cursor(&cursor).expect("seek cursor");
3898        assert!(reader.test_cursor(&cursor).expect("test cursor"));
3899
3900        let mut payloads = Vec::new();
3901        reader
3902            .collect_entry_payloads(&mut payloads)
3903            .expect("collect payloads");
3904        assert!(payloads.iter().any(|payload| payload == b"MESSAGE=hello"));
3905    }
3906
3907    #[test]
3908    fn explorer_same_field_filter_exclusion_counts_filtered_out_facet_values() {
3909        let dir = TempDir::new().expect("tempdir");
3910        let path = dir.path().join("same-field-filter-facet.journal");
3911        write_entries(
3912            &path,
3913            None,
3914            &[
3915                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3916                (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3917                (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
3918            ],
3919        );
3920
3921        let mut reader = FileReader::open(&path).expect("open reader");
3922        let query = ExplorerQuery {
3923            filters: vec![
3924                ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
3925                ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3926            ],
3927            facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3928            limit: 0,
3929            ..ExplorerQuery::default()
3930        };
3931
3932        let result = reader.explore(&query).expect("explore");
3933        let service = result
3934            .facets
3935            .get(b"SERVICE".as_slice())
3936            .expect("service facet");
3937        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3938        assert_eq!(service.get(b"b".as_slice()), Some(&1));
3939
3940        let priority = result
3941            .facets
3942            .get(b"PRIORITY".as_slice())
3943            .expect("priority facet");
3944        assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3945        assert_eq!(priority.get(b"4".as_slice()), Some(&1));
3946    }
3947
3948    #[test]
3949    fn explorer_index_strategy_matches_traversal_for_all_values() {
3950        let dir = TempDir::new().expect("tempdir");
3951        let path = dir.path().join("indexed-all-values.journal");
3952        write_entries(
3953            &path,
3954            None,
3955            &[
3956                (&[b"SERVICE=a", b"PRIORITY=3", b"TAG=x"], 1_000),
3957                (&[b"SERVICE=b", b"PRIORITY=3", b"TAG=x"], 2_000),
3958                (&[b"SERVICE=a", b"PRIORITY=4", b"TAG=y", b"TAG=z"], 3_000),
3959                (&[b"PRIORITY=3"], 4_000),
3960            ],
3961        );
3962
3963        let query = ExplorerQuery {
3964            after_realtime_usec: Some(0),
3965            before_realtime_usec: Some(5_000),
3966            filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3967            facets: vec![b"SERVICE".to_vec(), b"TAG".to_vec()],
3968            histogram: Some(b"SERVICE".to_vec()),
3969            histogram_target_buckets: 2,
3970            limit: 2,
3971            field_mode: ExplorerFieldMode::AllValues,
3972            use_source_realtime: false,
3973            ..ExplorerQuery::default()
3974        };
3975
3976        let mut reader = FileReader::open(&path).expect("open reader");
3977        let result = reader
3978            .explore_with_strategy(&query, ExplorerStrategy::Compare)
3979            .expect("compare");
3980
3981        let comparison = result.comparison.as_ref().expect("comparison diagnostics");
3982        assert_eq!(comparison.index_stats, result.stats);
3983        assert_eq!(comparison.traversal_stats.rows_returned, 2);
3984        assert_eq!(comparison.index_stats.rows_returned, 2);
3985
3986        assert_eq!(result.rows.len(), 2);
3987        let service = result
3988            .facets
3989            .get(b"SERVICE".as_slice())
3990            .expect("service facet");
3991        assert_eq!(service.get(b"a".as_slice()), Some(&1));
3992        assert_eq!(service.get(b"b".as_slice()), Some(&1));
3993        assert_eq!(service.get(UNSET_VALUE), Some(&1));
3994        let histogram = result.histogram.as_ref().expect("histogram");
3995        assert_eq!(histogram.buckets.len(), 2);
3996        assert_eq!(histogram.buckets[0].values.get(b"a".as_slice()), Some(&1));
3997        assert_eq!(histogram.buckets[0].values.get(b"b".as_slice()), Some(&1));
3998        assert_eq!(histogram.buckets[0].values.get(UNSET_VALUE), Some(&1));
3999    }
4000
4001    #[test]
4002    fn explorer_index_strategy_preserves_same_field_filter_exclusion() {
4003        let dir = TempDir::new().expect("tempdir");
4004        let path = dir.path().join("indexed-same-field-filter.journal");
4005        write_entries(
4006            &path,
4007            None,
4008            &[
4009                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
4010                (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
4011                (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
4012            ],
4013        );
4014
4015        let query = ExplorerQuery {
4016            filters: vec![
4017                ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
4018                ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
4019            ],
4020            facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
4021            field_mode: ExplorerFieldMode::AllValues,
4022            use_source_realtime: false,
4023            ..ExplorerQuery::default()
4024        };
4025
4026        let mut reader = FileReader::open(&path).expect("open reader");
4027        let result = reader
4028            .explore_with_strategy(&query, ExplorerStrategy::Compare)
4029            .expect("compare");
4030        let service = result
4031            .facets
4032            .get(b"SERVICE".as_slice())
4033            .expect("service facet");
4034        assert_eq!(service.get(b"a".as_slice()), Some(&1));
4035        assert_eq!(service.get(b"b".as_slice()), Some(&1));
4036    }
4037
4038    #[test]
4039    fn explorer_index_strategy_rejects_first_value_semantics() {
4040        let dir = TempDir::new().expect("tempdir");
4041        let path = dir.path().join("indexed-first-value.journal");
4042        write_entries(&path, None, &[(&[b"TAG=one", b"TAG=two"], 1_000)]);
4043
4044        let mut reader = FileReader::open(&path).expect("open reader");
4045        let err = reader
4046            .explore_with_strategy(
4047                &ExplorerQuery {
4048                    facets: vec![b"TAG".to_vec()],
4049                    field_mode: ExplorerFieldMode::FirstValue,
4050                    ..ExplorerQuery::default()
4051                },
4052                ExplorerStrategy::Index,
4053            )
4054            .expect_err("first-value index strategy should be rejected");
4055
4056        assert!(matches!(err, SdkError::Unsupported(_)));
4057    }
4058
4059    #[test]
4060    fn explorer_first_value_counts_one_value_per_selected_field() {
4061        let dir = TempDir::new().expect("tempdir");
4062        let path = dir.path().join("first-value.journal");
4063        write_entries(
4064            &path,
4065            None,
4066            &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4067        );
4068
4069        let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4070        let all_values = all_values_reader
4071            .explore(&ExplorerQuery {
4072                facets: vec![b"TAG".to_vec()],
4073                limit: 0,
4074                field_mode: ExplorerFieldMode::AllValues,
4075                ..ExplorerQuery::default()
4076            })
4077            .expect("all-values explore");
4078        let all_tag = all_values
4079            .facets
4080            .get(b"TAG".as_slice())
4081            .expect("all-values tag facet");
4082        assert_eq!(all_tag.values().sum::<u64>(), 2);
4083        assert_eq!(all_tag.len(), 2);
4084
4085        let mut first_value_reader = FileReader::open(&path).expect("open first-value reader");
4086        let first_value = first_value_reader
4087            .explore(&ExplorerQuery {
4088                facets: vec![b"TAG".to_vec()],
4089                limit: 0,
4090                field_mode: ExplorerFieldMode::FirstValue,
4091                ..ExplorerQuery::default()
4092            })
4093            .expect("first-value explore");
4094        let first_tag = first_value
4095            .facets
4096            .get(b"TAG".as_slice())
4097            .expect("first-value tag facet");
4098        assert_eq!(first_tag.values().sum::<u64>(), 1);
4099        assert_eq!(first_tag.len(), 1);
4100        assert_eq!(first_value.stats.early_stops, 1);
4101    }
4102
4103    #[test]
4104    fn explorer_first_value_does_not_double_count_duplicate_facets_or_histogram() {
4105        let dir = TempDir::new().expect("tempdir");
4106        let path = dir.path().join("first-value-no-double-count.journal");
4107        write_entries(
4108            &path,
4109            None,
4110            &[(
4111                &[
4112                    b"_SOURCE_REALTIME_TIMESTAMP=1000",
4113                    b"TAG=one",
4114                    b"TAG=two",
4115                    b"MESSAGE=after-tag",
4116                ],
4117                1_000,
4118            )],
4119        );
4120
4121        let mut reader = FileReader::open(&path).expect("open reader");
4122        let result = reader
4123            .explore(&ExplorerQuery {
4124                facets: vec![b"TAG".to_vec()],
4125                histogram: Some(b"TAG".to_vec()),
4126                histogram_target_buckets: 1,
4127                limit: 0,
4128                ..ExplorerQuery::default()
4129            })
4130            .expect("explore");
4131
4132        assert_eq!(
4133            result
4134                .facets
4135                .get(b"TAG".as_slice())
4136                .expect("tag facet")
4137                .values()
4138                .sum::<u64>(),
4139            1
4140        );
4141        assert_eq!(
4142            result
4143                .histogram
4144                .as_ref()
4145                .expect("histogram")
4146                .buckets
4147                .iter()
4148                .flat_map(|bucket| bucket.values.values())
4149                .sum::<u64>(),
4150            1
4151        );
4152
4153        let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4154        let all_values = all_values_reader
4155            .explore(&ExplorerQuery {
4156                facets: vec![b"TAG".to_vec()],
4157                histogram: Some(b"TAG".to_vec()),
4158                histogram_target_buckets: 1,
4159                limit: 0,
4160                field_mode: ExplorerFieldMode::AllValues,
4161                ..ExplorerQuery::default()
4162            })
4163            .expect("all-values explore");
4164
4165        assert_eq!(
4166            all_values
4167                .facets
4168                .get(b"TAG".as_slice())
4169                .expect("tag facet")
4170                .values()
4171                .sum::<u64>(),
4172            2
4173        );
4174        assert_eq!(
4175            all_values
4176                .histogram
4177                .as_ref()
4178                .expect("histogram")
4179                .buckets
4180                .iter()
4181                .flat_map(|bucket| bucket.values.values())
4182                .sum::<u64>(),
4183            2
4184        );
4185    }
4186
4187    #[test]
4188    fn explorer_first_value_tracks_required_field_identities() {
4189        let dir = TempDir::new().expect("tempdir");
4190        let path = dir.path().join("first-value-identities.journal");
4191        write_entries(
4192            &path,
4193            None,
4194            &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4195        );
4196
4197        let mut reader = FileReader::open(&path).expect("open reader");
4198        let result = reader
4199            .explore(&ExplorerQuery {
4200                facets: vec![b"TAG".to_vec(), b"SERVICE".to_vec()],
4201                limit: 0,
4202                field_mode: ExplorerFieldMode::FirstValue,
4203                ..ExplorerQuery::default()
4204            })
4205            .expect("explore");
4206
4207        assert_eq!(
4208            result
4209                .facets
4210                .get(b"TAG".as_slice())
4211                .expect("tag facet")
4212                .values()
4213                .sum::<u64>(),
4214            1
4215        );
4216        assert_eq!(
4217            result
4218                .facets
4219                .get(b"SERVICE".as_slice())
4220                .and_then(|values| values.get(b"a".as_slice())),
4221            Some(&1)
4222        );
4223        assert_eq!(result.stats.early_stops, 1);
4224    }
4225
4226    #[test]
4227    fn explorer_rejects_duplicate_facet_fields() {
4228        let dir = TempDir::new().expect("tempdir");
4229        let path = dir.path().join("duplicate-facets.journal");
4230        write_entries(&path, None, &[(&[b"SERVICE=a"], 1_000)]);
4231
4232        let mut reader = FileReader::open(&path).expect("open reader");
4233        let err = reader
4234            .explore(&ExplorerQuery {
4235                facets: vec![b"SERVICE".to_vec(), b"SERVICE".to_vec()],
4236                limit: 0,
4237                ..ExplorerQuery::default()
4238            })
4239            .expect_err("duplicate facets rejected");
4240
4241        assert!(err.to_string().contains("must not be duplicated"));
4242    }
4243
4244    #[test]
4245    fn explorer_empty_result_keeps_requested_facet_with_no_values() {
4246        let dir = TempDir::new().expect("tempdir");
4247        let path = dir.path().join("empty-result.journal");
4248        write_entries(&path, None, &[(&[b"SERVICE=a", b"PRIORITY=3"], 1_000)]);
4249
4250        let mut reader = FileReader::open(&path).expect("open reader");
4251        let result = reader
4252            .explore(&ExplorerQuery {
4253                after_realtime_usec: Some(10_000),
4254                before_realtime_usec: Some(20_000),
4255                facets: vec![b"SERVICE".to_vec()],
4256                limit: 10,
4257                realtime_slack_usec: 0,
4258                ..ExplorerQuery::default()
4259            })
4260            .expect("explore");
4261
4262        assert!(result.rows.is_empty());
4263        assert_eq!(result.stats.rows_matched, 0);
4264        assert!(
4265            result
4266                .facets
4267                .get(b"SERVICE".as_slice())
4268                .expect("service facet")
4269                .is_empty()
4270        );
4271    }
4272
4273    #[test]
4274    fn explorer_facet_time_bounds_do_not_count_slack_rows_without_source_realtime() {
4275        let dir = TempDir::new().expect("tempdir");
4276        let path = dir.path().join("facet-time-bound.journal");
4277        write_entries(
4278            &path,
4279            None,
4280            &[
4281                (&[b"SERVICE=before"], 340_000_000),
4282                (&[b"SERVICE=inside"], 360_000_000),
4283                (&[b"SERVICE=after"], 400_000_000),
4284            ],
4285        );
4286
4287        let mut reader = FileReader::open(&path).expect("open reader");
4288        let result = reader
4289            .explore(&ExplorerQuery {
4290                after_realtime_usec: Some(350_000_000),
4291                before_realtime_usec: Some(370_000_000),
4292                facets: vec![b"SERVICE".to_vec()],
4293                limit: 0,
4294                realtime_slack_usec: 20_000_000,
4295                use_source_realtime: false,
4296                ..ExplorerQuery::default()
4297            })
4298            .expect("explore");
4299
4300        let service = result
4301            .facets
4302            .get(b"SERVICE".as_slice())
4303            .expect("service facet");
4304        assert_eq!(service.get(b"inside".as_slice()), Some(&1));
4305        assert_eq!(service.get(b"before".as_slice()), None);
4306        assert_eq!(service.get(b"after".as_slice()), None);
4307        assert_eq!(result.stats.facet_rows_matched, 1);
4308    }
4309
4310    #[test]
4311    fn explorer_fts_disables_first_value_early_stop() {
4312        let dir = TempDir::new().expect("tempdir");
4313        let path = dir.path().join("fts-no-early-stop.journal");
4314        write_entries(&path, None, &[(&[b"TAG=one", b"MESSAGE=needle"], 1_000)]);
4315
4316        let mut reader = FileReader::open(&path).expect("open reader");
4317        let result = reader
4318            .explore(&ExplorerQuery {
4319                facets: vec![b"TAG".to_vec()],
4320                fts_patterns: vec![b"needle".to_vec()],
4321                limit: 0,
4322                ..ExplorerQuery::default()
4323            })
4324            .expect("explore");
4325
4326        assert_eq!(result.stats.early_stops, 0);
4327        assert_eq!(result.stats.data_refs_seen, 2);
4328        assert_eq!(
4329            result
4330                .facets
4331                .get(b"TAG".as_slice())
4332                .and_then(|values| values.get(b"one".as_slice())),
4333            Some(&1)
4334        );
4335    }
4336
4337    #[test]
4338    fn explorer_fts_or_terms_and_negative_terms_filter_rows() {
4339        let dir = TempDir::new().expect("tempdir");
4340        let path = dir.path().join("fts-negative.journal");
4341        write_entries(
4342            &path,
4343            None,
4344            &[
4345                (&[b"TAG=alpha", b"MESSAGE=alpha keep"], 1_000),
4346                (&[b"TAG=beta", b"MESSAGE=beta keep"], 2_000),
4347                (&[b"TAG=debug", b"MESSAGE=alpha debug"], 3_000),
4348                (&[b"TAG=other", b"MESSAGE=other"], 4_000),
4349                (&[b"TAG=wild", b"MESSAGE=start middle end"], 5_000),
4350            ],
4351        );
4352
4353        let mut reader = FileReader::open(&path).expect("open reader");
4354        let result = reader
4355            .explore(&ExplorerQuery {
4356                facets: vec![b"TAG".to_vec()],
4357                fts_terms: vec![
4358                    ExplorerFtsPattern::substring(b"alpha".to_vec(), false),
4359                    ExplorerFtsPattern::substring(b"beta".to_vec(), false),
4360                    ExplorerFtsPattern::substring(b"debug".to_vec(), true),
4361                    ExplorerFtsPattern::substring(b"start*end".to_vec(), false),
4362                ],
4363                limit: 10,
4364                ..ExplorerQuery::default()
4365            })
4366            .expect("explore");
4367
4368        let tag = result.facets.get(b"TAG".as_slice()).expect("TAG facet");
4369        assert_eq!(result.rows.len(), 3);
4370        assert_eq!(tag.get(b"alpha".as_slice()), Some(&1));
4371        assert_eq!(tag.get(b"beta".as_slice()), Some(&1));
4372        assert_eq!(tag.get(b"wild".as_slice()), Some(&1));
4373        assert_eq!(tag.get(b"debug".as_slice()), None);
4374        assert_eq!(tag.get(b"other".as_slice()), None);
4375    }
4376
4377    #[test]
4378    fn explorer_auto_anchor_scans_backward_from_tail() {
4379        let dir = TempDir::new().expect("tempdir");
4380        let path = dir.path().join("backward.journal");
4381        write_entries(
4382            &path,
4383            None,
4384            &[
4385                (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
4386                (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
4387            ],
4388        );
4389
4390        let mut reader = FileReader::open(&path).expect("open reader");
4391        let query = ExplorerQuery {
4392            direction: Direction::Backward,
4393            limit: 2,
4394            ..ExplorerQuery::default()
4395        };
4396
4397        let result = reader.explore(&query).expect("explore");
4398        assert_eq!(result.rows.len(), 2);
4399        assert_eq!(result.rows[0].realtime_usec, 2_000);
4400        assert_eq!(result.rows[1].realtime_usec, 1_000);
4401    }
4402
4403    #[test]
4404    fn explorer_backward_time_bound_stops_after_slack_window() {
4405        let dir = TempDir::new().expect("tempdir");
4406        let path = dir.path().join("backward-time-bound.journal");
4407        write_entries(
4408            &path,
4409            None,
4410            &[
4411                (&[b"SERVICE=a"], 100_000_000),
4412                (&[b"SERVICE=b"], 200_000_000),
4413                (&[b"SERVICE=c"], 300_000_000),
4414                (&[b"SERVICE=d"], 400_000_000),
4415                (&[b"SERVICE=e"], 500_000_000),
4416            ],
4417        );
4418
4419        let mut reader = FileReader::open(&path).expect("open reader");
4420        let query = ExplorerQuery {
4421            after_realtime_usec: Some(350_000_000),
4422            direction: Direction::Backward,
4423            limit: 10,
4424            realtime_slack_usec: 10_000_000,
4425            ..ExplorerQuery::default()
4426        };
4427
4428        let result = reader.explore(&query).expect("explore");
4429        assert_eq!(result.rows.len(), 2);
4430        assert_eq!(result.rows[0].realtime_usec, 500_000_000);
4431        assert_eq!(result.rows[1].realtime_usec, 400_000_000);
4432        assert_eq!(result.stats.rows_examined, 2);
4433    }
4434
4435    #[test]
4436    fn explorer_histogram_and_fts_are_opt_in() {
4437        let dir = TempDir::new().expect("tempdir");
4438        let path = dir.path().join("histogram.journal");
4439        write_entries(
4440            &path,
4441            None,
4442            &[
4443                (&[b"MESSAGE=alpha", b"PRIORITY=3"], 1_000),
4444                (&[b"MESSAGE=beta", b"PRIORITY=4"], 2_000),
4445            ],
4446        );
4447
4448        let mut reader = FileReader::open(&path).expect("open reader");
4449        let query = ExplorerQuery {
4450            after_realtime_usec: Some(0),
4451            before_realtime_usec: Some(3_000),
4452            histogram: Some(b"PRIORITY".to_vec()),
4453            histogram_target_buckets: 2,
4454            fts_patterns: vec![b"alp".to_vec()],
4455            limit: 10,
4456            ..ExplorerQuery::default()
4457        };
4458
4459        let result = reader.explore(&query).expect("explore");
4460        assert_eq!(result.rows.len(), 1);
4461        assert!(result.stats.fts_scans > 0);
4462        assert_eq!(
4463            result
4464                .histogram
4465                .as_ref()
4466                .expect("histogram")
4467                .buckets
4468                .iter()
4469                .flat_map(|bucket| bucket.values.values())
4470                .sum::<u64>(),
4471            1
4472        );
4473    }
4474
4475    #[test]
4476    fn explorer_first_value_stops_after_same_data_satisfies_multiple_roles() {
4477        let dir = TempDir::new().expect("tempdir");
4478        let path = dir.path().join("same-data-multiple-roles.journal");
4479        write_entries(
4480            &path,
4481            None,
4482            &[(
4483                &[b"_SOURCE_REALTIME_TIMESTAMP=1000", b"MESSAGE=after-source"],
4484                1_000,
4485            )],
4486        );
4487
4488        let mut reader = FileReader::open(&path).expect("open reader");
4489        let result = reader
4490            .explore(&ExplorerQuery {
4491                histogram: Some(SOURCE_REALTIME_FIELD.to_vec()),
4492                histogram_target_buckets: 1,
4493                limit: 0,
4494                field_mode: ExplorerFieldMode::FirstValue,
4495                ..ExplorerQuery::default()
4496            })
4497            .expect("explore");
4498
4499        assert_eq!(result.stats.histogram_updates, 1);
4500        assert_eq!(result.stats.early_stops, 1);
4501        assert_eq!(
4502            result
4503                .histogram
4504                .as_ref()
4505                .expect("histogram")
4506                .buckets
4507                .iter()
4508                .flat_map(|bucket| bucket.values.values())
4509                .sum::<u64>(),
4510            1
4511        );
4512    }
4513}