1use super::*;
2use journal_core::file::{DataObject, offset_array::InlinedCursor};
3use std::collections::{HashMap, HashSet};
4use std::time::{Duration, Instant};
5
/// Histogram bucket count requested when the caller does not choose one.
const DEFAULT_HISTOGRAM_TARGET_BUCKETS: usize = 150;
/// Default `ExplorerQuery::realtime_slack_usec`: two minutes in microseconds.
const DEFAULT_TIME_SLACK_USEC: u64 = 2 * 60 * 1_000_000;
/// Rows processed between two polls of deadline / cancellation / progress.
const EXPLORER_CONTROL_CHECK_EVERY_ROWS: u64 = 8 * 1024;
/// Default `ExplorerQuery::stop_when_rows_full_check_every`.
const DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS: u64 = 1;
/// Upper bound for the number of time slots used by the sampler.
const EXPLORER_SAMPLING_SLOTS_MAX: usize = 1_000;
/// Unsampled rows in a file after which the per-file sampling rate is recalibrated.
const EXPLORER_SAMPLING_RECALIBRATE_ROWS: u64 = 10_000;
/// Fraction of the file's time range (1%) that must be covered before the sampler
/// may stop and estimate the remaining rows.
const EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS: f64 = 0.01;
/// Journal field carrying the source realtime timestamp.
const SOURCE_REALTIME_FIELD: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP";
/// NOTE(review): presumably the label used for rows lacking a field; confirm at use site.
const UNSET_VALUE: &[u8] = b"-";
/// NOTE(review): presumably the label reported for rows skipped by sampling.
const EXPLORER_UNSAMPLED_VALUE: &[u8] = b"[unsampled]";
/// NOTE(review): presumably the label reported for rows whose count was estimated.
const EXPLORER_ESTIMATED_VALUE: &[u8] = b"[estimated]";
17
/// Where a query starts reading rows from.
///
/// Defaults to [`ExplorerAnchor::Auto`]. The `Default` impl is derived rather
/// than hand-written.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerAnchor {
    /// Let the explorer choose the starting point.
    #[default]
    Auto,
    /// NOTE(review): presumably start at the oldest entry; confirm in the scan code.
    Head,
    /// NOTE(review): presumably start at the newest entry; confirm in the scan code.
    Tail,
    /// Start at the given realtime timestamp (microseconds).
    Realtime(u64),
}
31
/// How many values of a multi-valued field are taken into account.
///
/// Defaults to [`ExplorerFieldMode::FirstValue`]. The `Default` impl is derived.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ExplorerFieldMode {
    /// Use every value of the field.
    AllValues,
    /// Use only the first value of the field.
    #[default]
    FirstValue,
}
43
/// Execution strategy for a query.
///
/// Defaults to [`ExplorerStrategy::Traversal`]. The `Default` impl is derived;
/// `#[default]` is compatible with `#[non_exhaustive]`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExplorerStrategy {
    /// NOTE(review): presumably walks rows directly; confirm.
    #[default]
    Traversal,
    /// NOTE(review): presumably answers via field/data indexes; confirm.
    Index,
    /// NOTE(review): presumably runs both and fills `ExplorerResult::comparison`.
    Compare,
}
57
/// A filter on one field, carrying the list of accepted values.
///
/// NOTE(review): presumably a row passes when the field holds any of `values`;
/// the matching logic lives elsewhere.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFilter {
    /// Field name (raw bytes).
    pub field: Vec<u8>,
    /// Accepted values (raw bytes).
    pub values: Vec<Vec<u8>>,
}

impl ExplorerFilter {
    /// Builds a filter from anything convertible into byte vectors.
    pub fn new(
        field: impl Into<Vec<u8>>,
        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
    ) -> Self {
        let field: Vec<u8> = field.into();
        let values: Vec<Vec<u8>> = values.into_iter().map(|value| value.into()).collect();
        ExplorerFilter { field, values }
    }
}
75
/// Parameters of one explorer run.
///
/// Start from `ExplorerQuery::default()` and either set fields directly or use
/// the `with_*` builder helpers.
#[derive(Debug, Clone)]
pub struct ExplorerQuery {
    /// Lower realtime bound in microseconds (`None` = unbounded). Sampling is
    /// only enabled when both bounds are set.
    pub after_realtime_usec: Option<u64>,
    /// Upper realtime bound in microseconds (`None` = unbounded).
    pub before_realtime_usec: Option<u64>,
    /// Where the scan starts (default `ExplorerAnchor::Auto`).
    pub anchor: ExplorerAnchor,
    /// Scan direction (default `Direction::Forward`); the sampler also uses it.
    pub direction: Direction,
    /// Row limit (default 200). Also the floor for the sampler's per-file and
    /// per-slot always-sample thresholds.
    pub limit: usize,
    /// Field/value filters.
    pub filters: Vec<ExplorerFilter>,
    /// Fields to facet on.
    pub facets: Vec<Vec<u8>>,
    /// Field whose values are broken down in the histogram, if any.
    pub histogram: Option<Vec<u8>>,
    /// NOTE(review): presumably overrides `after_realtime_usec` for the histogram only.
    pub histogram_after_realtime_usec: Option<u64>,
    /// NOTE(review): presumably overrides `before_realtime_usec` for the histogram only.
    pub histogram_before_realtime_usec: Option<u64>,
    /// Requested histogram bucket count (default 150). Also the fallback slot
    /// count for sampling when no histogram bucket count is supplied.
    pub histogram_target_buckets: usize,
    /// Parsed FTS patterns, positive and negative, in the order they were added.
    pub fts_terms: Vec<ExplorerFtsPattern>,
    /// Raw positive FTS patterns, as passed to `with_fts_pattern`.
    pub fts_patterns: Vec<Vec<u8>>,
    /// Raw negative FTS patterns, as passed to `with_fts_negative_pattern`.
    pub fts_negative_patterns: Vec<Vec<u8>>,
    /// Which values of multi-valued fields are used (default `FirstValue`).
    pub field_mode: ExplorerFieldMode,
    /// NOTE(review): presumably a facet's own filter is ignored when counting
    /// that facet (default true); confirm.
    pub exclude_facet_field_filters: bool,
    /// NOTE(review): presumably `_SOURCE_REALTIME_TIMESTAMP` is preferred over the
    /// entry realtime (default true); confirm.
    pub use_source_realtime: bool,
    /// Realtime slack in microseconds (default 120 s).
    pub realtime_slack_usec: u64,
    /// NOTE(review): presumably stop once `limit` rows are collected (default false).
    pub stop_when_rows_full: bool,
    /// Row interval for the rows-full check (default 1).
    pub stop_when_rows_full_check_every: u64,
    /// Sampling configuration; `None` disables sampling.
    pub sampling: Option<ExplorerSampling>,
    /// Debug-only switch (default false); hidden from rustdoc.
    #[doc(hidden)]
    pub debug_collect_column_fields_by_row_traversal: bool,
}
104
105impl Default for ExplorerQuery {
106 fn default() -> Self {
107 Self {
108 after_realtime_usec: None,
109 before_realtime_usec: None,
110 anchor: ExplorerAnchor::Auto,
111 direction: Direction::Forward,
112 limit: 200,
113 filters: Vec::new(),
114 facets: Vec::new(),
115 histogram: None,
116 histogram_after_realtime_usec: None,
117 histogram_before_realtime_usec: None,
118 histogram_target_buckets: DEFAULT_HISTOGRAM_TARGET_BUCKETS,
119 fts_terms: Vec::new(),
120 fts_patterns: Vec::new(),
121 fts_negative_patterns: Vec::new(),
122 field_mode: ExplorerFieldMode::FirstValue,
123 exclude_facet_field_filters: true,
124 use_source_realtime: true,
125 realtime_slack_usec: DEFAULT_TIME_SLACK_USEC,
126 stop_when_rows_full: false,
127 stop_when_rows_full_check_every: DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS,
128 sampling: None,
129 debug_collect_column_fields_by_row_traversal: false,
130 }
131 }
132}
133
/// Sampling configuration attached to an `ExplorerQuery`.
///
/// The `file_*` fields describe the file currently being scanned. The sampler's
/// estimators treat a value of 0 as "unknown".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExplorerSampling {
    /// Total sample budget. Half of it is the global always-sample threshold; a
    /// quarter is split across files and another quarter across time slots.
    /// Sampling is disabled when 0.
    pub budget: u64,
    /// Number of files taking part in the query; sampling is disabled when 0.
    pub matched_files: u64,
    /// Realtime (µs) of the file's first entry, 0 if unknown.
    pub file_head_realtime_usec: u64,
    /// Realtime (µs) of the file's last entry, 0 if unknown.
    pub file_tail_realtime_usec: u64,
    /// Sequence number of the file's first entry, 0 if unknown.
    pub file_head_seqnum: u64,
    /// Sequence number of the file's last entry, 0 if unknown.
    pub file_tail_seqnum: u64,
    /// Number of entries in the file, 0 if unknown.
    pub file_entries: u64,
}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct ExplorerFtsPattern {
147 pub parts: Vec<Vec<u8>>,
148 pub negative: bool,
149}
150
151impl ExplorerFtsPattern {
152 pub fn substring(pattern: impl Into<Vec<u8>>, negative: bool) -> Self {
153 let pattern = pattern.into();
154 let parts = pattern
155 .split(|byte| *byte == b'*')
156 .filter(|part| !part.is_empty())
157 .map(|part| part.to_vec())
158 .collect();
159 Self { parts, negative }
160 }
161
162 fn matches(&self, value: &[u8]) -> bool {
163 if value.is_empty() {
164 return false;
165 }
166 if self.parts.is_empty() {
167 return true;
168 }
169
170 let mut haystack = value;
171 for part in &self.parts {
172 let Some(index) = find_ascii_case_insensitive(haystack, part) else {
173 return false;
174 };
175 haystack = &haystack[index.saturating_add(part.len())..];
176 }
177 true
178 }
179}
180
181impl ExplorerQuery {
182 pub fn with_filter(
183 mut self,
184 field: impl Into<Vec<u8>>,
185 values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
186 ) -> Self {
187 self.filters.push(ExplorerFilter::new(field, values));
188 self
189 }
190
191 pub fn with_facet(mut self, field: impl Into<Vec<u8>>) -> Self {
192 self.facets.push(field.into());
193 self
194 }
195
196 pub fn with_histogram(mut self, field: impl Into<Vec<u8>>) -> Self {
197 self.histogram = Some(field.into());
198 self
199 }
200
201 pub fn with_fts_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
202 let pattern = pattern.into();
203 self.fts_terms
204 .push(ExplorerFtsPattern::substring(pattern.clone(), false));
205 self.fts_patterns.push(pattern);
206 self
207 }
208
209 pub fn with_fts_negative_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
210 let pattern = pattern.into();
211 self.fts_terms
212 .push(ExplorerFtsPattern::substring(pattern.clone(), true));
213 self.fts_negative_patterns.push(pattern);
214 self
215 }
216}
217
/// Counters describing the work done by a query.
///
/// A copy is carried by every `ExplorerProgress` snapshot, and the type
/// serializes with serde. Fields without a comment are named for what they
/// count; NOTE(review): verify their exact meaning against the scan loop.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
pub struct ExplorerStats {
    pub rows_examined: u64,
    pub rows_matched: u64,
    pub facet_rows_matched: u64,
    pub rows_returned: u64,
    pub rows_unsampled: u64,
    pub rows_estimated: u64,
    pub sampling_sampled: u64,
    pub sampling_unsampled: u64,
    pub sampling_estimated: u64,
    pub last_realtime_usec: u64,
    pub max_source_realtime_delta_usec: u64,
    pub data_refs_seen: u64,
    pub data_refs_skipped: u64,
    pub data_payloads_loaded: u64,
    pub data_objects_classified: u64,
    pub data_cache_hits: u64,
    pub data_cache_misses: u64,
    pub payloads_decompressed: u64,
    pub fts_scans: u64,
    /// Incremented for every facet value count and every "unset" facet count
    /// recorded by the accumulator.
    pub facet_updates: u64,
    /// Incremented for every histogram bucket increment recorded by the accumulator.
    pub histogram_updates: u64,
    pub returned_row_expansions: u64,
    pub early_stop_opportunities: u64,
    pub early_stops: u64,
}
245
/// One row returned by a query.
#[derive(Debug, Clone)]
pub struct ExplorerRow {
    /// Row timestamp in microseconds.
    pub realtime_usec: u64,
    /// NOTE(review): presumably a journal cursor string identifying the entry.
    pub cursor: String,
    /// Raw payloads of the row; may be empty when payloads are not expanded
    /// (see `ExplorerRowPayloadMode`) — confirm at the producer.
    pub payloads: Vec<Vec<u8>>,
}
252
/// Crate-internal switch controlling how returned rows are materialized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExplorerRowPayloadMode {
    /// NOTE(review): presumably loads every payload of the row.
    Expand,
    /// NOTE(review): presumably records only the cursor and leaves payloads empty.
    CursorOnly,
}
258
/// One time bucket of the histogram.
///
/// `ExplorerAccumulator::new` derives the bucket width from the first bucket
/// (`end - start`), so buckets are expected to share one width.
#[derive(Debug, Clone)]
pub struct ExplorerHistogramBucket {
    /// Bucket start, microseconds.
    pub start_realtime_usec: u64,
    /// Bucket end, microseconds.
    pub end_realtime_usec: u64,
    /// NOTE(review): presumably per-value row counts within the bucket.
    pub values: HashMap<Vec<u8>, u64>,
}
265
/// Histogram of one field over time.
#[derive(Debug, Clone)]
pub struct ExplorerHistogram {
    /// Field the histogram is broken down by.
    pub field: Vec<u8>,
    /// Time buckets; the accumulator reads the start and width from the first bucket.
    pub buckets: Vec<ExplorerHistogramBucket>,
}
271
/// Timing and stats of two executions of the same query.
///
/// NOTE(review): presumably produced by `ExplorerStrategy::Compare`; confirm.
#[derive(Debug, Clone, Default)]
pub struct ExplorerComparison {
    /// Wall time of the traversal run.
    pub traversal_duration: Duration,
    /// Wall time of the index run.
    pub index_duration: Duration,
    /// Stats of the traversal run.
    pub traversal_stats: ExplorerStats,
    /// Stats of the index run.
    pub index_stats: ExplorerStats,
}
279
/// Everything a query produces.
#[derive(Debug, Clone, Default)]
pub struct ExplorerResult {
    /// Returned rows.
    pub rows: Vec<ExplorerRow>,
    /// NOTE(review): presumably facet field -> value -> count.
    pub facets: HashMap<Vec<u8>, HashMap<Vec<u8>, u64>>,
    /// Histogram, when one was requested.
    pub histogram: Option<ExplorerHistogram>,
    /// Field names observed for columns.
    pub column_fields: HashSet<Vec<u8>>,
    /// Work counters.
    pub stats: ExplorerStats,
    /// Present only for comparison runs.
    pub comparison: Option<ExplorerComparison>,
}
289
/// Why `ExplorerControl` stopped a query early.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum ExplorerStopReason {
    /// The configured deadline passed.
    TimedOut,
    /// The cancellation callback returned `true`.
    Cancelled,
}
295
/// Progress snapshot passed to the progress callback of `ExplorerControl`.
#[derive(Debug, Clone)]
pub struct ExplorerProgress {
    /// Copy of the counters at the time of the snapshot.
    pub stats: ExplorerStats,
    /// Time since the `ExplorerControl` was created.
    pub elapsed: Duration,
}
301
/// Run-time controls for a query: deadline, cancellation, progress reporting
/// and optional per-row hooks. All callbacks are borrowed for `'a`.
pub struct ExplorerControl<'a> {
    /// Stop with `TimedOut` once this instant is reached.
    deadline: Option<Instant>,
    /// Polled for cancellation; returning `true` stops with `Cancelled`.
    cancellation: Option<&'a dyn Fn() -> bool>,
    /// Receives progress snapshots.
    progress: Option<&'a mut dyn FnMut(ExplorerProgress)>,
    /// Crate-internal row hook (u64 -> bool); its contract is defined by the caller.
    candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
    /// Maps a realtime to an adjusted realtime; identity when unset.
    adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
    /// Called with (realtime_usec, rows_matched); its result is returned by `emit_matched_row`.
    matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
    /// Sampler state lent by the caller, when sampling is enabled.
    sampling: Option<&'a mut ExplorerSamplingState>,
    /// Minimum time between interval-driven progress snapshots (default 250 ms).
    progress_interval: Duration,
    /// Creation time; progress `elapsed` is measured from here.
    started: Instant,
    /// Time of the last emitted progress snapshot.
    last_progress: Instant,
    /// Row count at which stop conditions are polled next.
    next_check_rows: u64,
    /// Set once the query has been stopped.
    stop_reason: Option<ExplorerStopReason>,
}
316
317impl<'a> ExplorerControl<'a> {
318 pub fn new() -> Self {
319 let now = Instant::now();
320 Self {
321 deadline: None,
322 cancellation: None,
323 progress: None,
324 candidate_row: None,
325 adjust_realtime: None,
326 matched_row: None,
327 sampling: None,
328 progress_interval: Duration::from_millis(250),
329 started: now,
330 last_progress: now,
331 next_check_rows: EXPLORER_CONTROL_CHECK_EVERY_ROWS,
332 stop_reason: None,
333 }
334 }
335
336 pub fn set_deadline(&mut self, deadline: Option<Instant>) {
337 self.deadline = deadline;
338 }
339
340 pub fn set_cancellation_callback(&mut self, cancellation: Option<&'a dyn Fn() -> bool>) {
341 self.cancellation = cancellation;
342 }
343
344 pub fn set_progress_callback(&mut self, progress: Option<&'a mut dyn FnMut(ExplorerProgress)>) {
345 self.progress = progress;
346 }
347
348 pub(crate) fn set_candidate_row_callback(
349 &mut self,
350 candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
351 ) {
352 self.candidate_row = candidate_row;
353 }
354
355 pub(crate) fn set_realtime_adjust_callback(
356 &mut self,
357 adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
358 ) {
359 self.adjust_realtime = adjust_realtime;
360 }
361
362 pub fn set_matched_row_callback(
363 &mut self,
364 matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
365 ) {
366 self.matched_row = matched_row;
367 }
368
369 pub(crate) fn set_sampling_state(&mut self, sampling: Option<&'a mut ExplorerSamplingState>) {
370 self.sampling = sampling;
371 }
372
373 pub fn set_progress_interval(&mut self, interval: Duration) {
374 self.progress_interval = interval;
375 }
376
377 pub fn stop_reason(&self) -> Option<ExplorerStopReason> {
378 self.stop_reason
379 }
380
381 fn should_stop_after_rows(&mut self, rows_seen: u64, stats: &ExplorerStats) -> bool {
382 if self.stop_reason.is_some() {
383 return true;
384 }
385 if rows_seen < self.next_check_rows {
386 return false;
387 }
388 self.next_check_rows = rows_seen.saturating_add(EXPLORER_CONTROL_CHECK_EVERY_ROWS);
389 self.check(stats)
390 }
391
392 fn check(&mut self, stats: &ExplorerStats) -> bool {
393 let now = Instant::now();
394 if now.duration_since(self.last_progress) >= self.progress_interval {
395 self.emit_progress(stats, now);
396 }
397 if self.cancellation.is_some_and(|is_cancelled| is_cancelled()) {
398 self.stop_reason = Some(ExplorerStopReason::Cancelled);
399 self.emit_progress(stats, now);
400 return true;
401 }
402 if self.deadline.is_some_and(|deadline| now >= deadline) {
403 self.stop_reason = Some(ExplorerStopReason::TimedOut);
404 self.emit_progress(stats, now);
405 return true;
406 }
407 false
408 }
409
410 fn emit_progress(&mut self, stats: &ExplorerStats, now: Instant) {
411 self.last_progress = now;
412 if let Some(progress) = self.progress.as_deref_mut() {
413 progress(ExplorerProgress {
414 stats: stats.clone(),
415 elapsed: now.duration_since(self.started),
416 });
417 }
418 }
419
420 fn emit_matched_row(&mut self, realtime_usec: u64, rows_matched: u64) -> bool {
421 if let Some(matched_row) = self.matched_row.as_deref_mut() {
422 return matched_row(realtime_usec, rows_matched);
423 }
424 false
425 }
426
427 fn adjust_realtime(&mut self, realtime_usec: u64) -> u64 {
428 self.adjust_realtime
429 .as_deref_mut()
430 .map_or(realtime_usec, |adjust_realtime| {
431 adjust_realtime(realtime_usec)
432 })
433 }
434}
435
436impl Default for ExplorerControl<'_> {
437 fn default() -> Self {
438 Self::new()
439 }
440}
441
/// Outcome of `ExplorerSamplingState::decide` for one row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExplorerSamplingDecision {
    /// Process the row fully. `sampled` is false for rows kept unconditionally
    /// (`candidate_to_keep`) and true for rows counted against the sample budget.
    Full {
        sampled: bool,
    },
    /// The row was not sampled; it has been counted as unsampled.
    SkipFields,
    /// Stop scanning this file and account for the rest by estimation.
    StopAndEstimate {
        /// Estimated rows left in the file.
        remaining_rows: u64,
        /// Start of the remaining time range (µs).
        from_realtime_usec: u64,
        /// End of the remaining time range (µs).
        to_realtime_usec: u64,
    },
}
454
/// Mutable state of the row sampler for one query.
///
/// Created by `for_query`, reset per file by `begin_file` and consulted per row
/// by `decide`.
#[derive(Debug)]
pub(crate) struct ExplorerSamplingState {
    /// Query lower bound (µs), from `after_realtime_usec`.
    start_realtime_usec: u64,
    /// Query upper bound (µs), from `before_realtime_usec`.
    end_realtime_usec: u64,
    /// Current file's first-entry realtime, 0 if unknown.
    file_head_realtime_usec: u64,
    /// Current file's last-entry realtime, 0 if unknown.
    file_tail_realtime_usec: u64,
    /// Current file's first seqnum, 0 if unknown.
    file_head_seqnum: u64,
    /// Current file's last seqnum, 0 if unknown.
    file_tail_seqnum: u64,
    /// Current file's entry count, 0 if unknown.
    file_entries: u64,
    /// Realtime of the first row decided in the current file.
    first_realtime_usec: Option<u64>,
    /// Width of one time slot in µs (at least 1).
    step_realtime_usec: u64,
    /// Global sample count below which every row is sampled (budget / 2).
    enable_after_samples: u64,
    /// Per-file sample count below which every row is sampled.
    per_file_enable_after_samples: u64,
    /// Per-slot sample count below which every row is sampled.
    per_slot_enable_after_samples: u64,
    /// Rows sampled across all files.
    sampled: u64,
    /// Rows sampled in the current file.
    per_file_sampled: u64,
    /// Rows not sampled in the current file.
    per_file_unsampled: u64,
    /// Skipped rows between samples in the current file (0 = not yet calibrated).
    per_file_every: u64,
    /// Rows skipped since the last sample in the current file.
    per_file_skipped: u64,
    /// Unsampled rows since the last recalibration.
    per_file_recalibrate: u64,
    /// Sampled rows per time slot.
    per_slot_sampled: Vec<u64>,
    /// Unsampled rows per time slot.
    per_slot_unsampled: Vec<u64>,
    /// Number of files in the query (at least 1).
    matched_files: u64,
    /// Scan direction; determines which end of a range counts as "done".
    direction: Direction,
}
480
481impl ExplorerSamplingState {
482 pub(crate) fn for_query(
483 query: &ExplorerQuery,
484 histogram_bucket_count: Option<usize>,
485 ) -> Option<Self> {
486 let sampling = query.sampling?;
487 let start_realtime_usec = query.after_realtime_usec?;
488 let end_realtime_usec = query.before_realtime_usec?;
489 if sampling.budget == 0
490 || sampling.matched_files == 0
491 || start_realtime_usec >= end_realtime_usec
492 {
493 return None;
494 }
495
496 let slots = histogram_bucket_count
497 .unwrap_or(query.histogram_target_buckets)
498 .clamp(2, EXPLORER_SAMPLING_SLOTS_MAX);
499 let delta = end_realtime_usec.saturating_sub(start_realtime_usec);
500 let step_realtime_usec = (delta / slots as u64).saturating_sub(1).max(1);
501 let per_file_enable_after_samples =
502 ((sampling.budget / 4) / sampling.matched_files.max(1)).max(query.limit as u64);
503 let per_slot_enable_after_samples =
504 ((sampling.budget / 4) / slots as u64).max(query.limit as u64);
505
506 Some(Self {
507 start_realtime_usec,
508 end_realtime_usec,
509 file_head_realtime_usec: sampling.file_head_realtime_usec,
510 file_tail_realtime_usec: sampling.file_tail_realtime_usec,
511 file_head_seqnum: sampling.file_head_seqnum,
512 file_tail_seqnum: sampling.file_tail_seqnum,
513 file_entries: sampling.file_entries,
514 first_realtime_usec: None,
515 step_realtime_usec,
516 enable_after_samples: sampling.budget / 2,
517 per_file_enable_after_samples,
518 per_slot_enable_after_samples,
519 sampled: 0,
520 per_file_sampled: 0,
521 per_file_unsampled: 0,
522 per_file_every: 0,
523 per_file_skipped: 0,
524 per_file_recalibrate: 0,
525 per_slot_sampled: vec![0; slots],
526 per_slot_unsampled: vec![0; slots],
527 matched_files: sampling.matched_files.max(1),
528 direction: query.direction,
529 })
530 }
531
532 fn begin_file(&mut self, sampling: ExplorerSampling) {
533 self.file_head_realtime_usec = sampling.file_head_realtime_usec;
534 self.file_tail_realtime_usec = sampling.file_tail_realtime_usec;
535 self.file_head_seqnum = sampling.file_head_seqnum;
536 self.file_tail_seqnum = sampling.file_tail_seqnum;
537 self.file_entries = sampling.file_entries;
538 self.first_realtime_usec = None;
539 self.per_file_sampled = 0;
540 self.per_file_unsampled = 0;
541 self.per_file_every = 0;
542 self.per_file_skipped = 0;
543 self.per_file_recalibrate = 0;
544 }
545
546 fn decide(
547 &mut self,
548 realtime_usec: u64,
549 seqnum: u64,
550 candidate_to_keep: bool,
551 ) -> ExplorerSamplingDecision {
552 if self.first_realtime_usec.is_none() {
553 self.first_realtime_usec = Some(realtime_usec);
554 }
555 if candidate_to_keep {
556 return ExplorerSamplingDecision::Full { sampled: false };
557 }
558
559 let slot = self.slot_for_realtime(realtime_usec);
560 let should_sample = if self.sampled < self.enable_after_samples
561 || self.per_file_sampled < self.per_file_enable_after_samples
562 || self.per_slot_sampled[slot] < self.per_slot_enable_after_samples
563 {
564 true
565 } else if self.per_file_recalibrate >= EXPLORER_SAMPLING_RECALIBRATE_ROWS
566 || self.per_file_every == 0
567 {
568 self.recalibrate(realtime_usec, seqnum);
569 true
570 } else if self.per_file_skipped >= self.per_file_every {
571 self.per_file_skipped = 0;
572 true
573 } else {
574 self.per_file_skipped = self.per_file_skipped.saturating_add(1);
575 false
576 };
577
578 if should_sample {
579 self.sampled = self.sampled.saturating_add(1);
580 self.per_file_sampled = self.per_file_sampled.saturating_add(1);
581 self.per_slot_sampled[slot] = self.per_slot_sampled[slot].saturating_add(1);
582 return ExplorerSamplingDecision::Full { sampled: true };
583 }
584
585 self.per_file_recalibrate = self.per_file_recalibrate.saturating_add(1);
586 self.per_file_unsampled = self.per_file_unsampled.saturating_add(1);
587 self.per_slot_unsampled[slot] = self.per_slot_unsampled[slot].saturating_add(1);
588
589 if self.per_file_unsampled > self.per_file_sampled
590 && self.progress_by_time(realtime_usec) > EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS
591 {
592 let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
593 let (from_realtime_usec, to_realtime_usec) = self.remaining_range(realtime_usec);
594 return ExplorerSamplingDecision::StopAndEstimate {
595 remaining_rows,
596 from_realtime_usec,
597 to_realtime_usec,
598 };
599 }
600
601 ExplorerSamplingDecision::SkipFields
602 }
603
604 fn slot_for_realtime(&self, realtime_usec: u64) -> usize {
605 let clamped = realtime_usec.clamp(self.start_realtime_usec, self.end_realtime_usec);
606 let slot =
607 (clamped.saturating_sub(self.start_realtime_usec) / self.step_realtime_usec) as usize;
608 slot.min(self.per_slot_sampled.len().saturating_sub(1))
609 }
610
611 fn recalibrate(&mut self, realtime_usec: u64, seqnum: u64) {
612 let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
613 let wanted_samples = (self.enable_after_samples / self.matched_files).max(1);
614 self.per_file_every = (remaining_rows / wanted_samples).max(1);
615 self.per_file_recalibrate = 0;
616 }
617
618 fn estimate_remaining_rows(&self, realtime_usec: u64, seqnum: u64) -> u64 {
619 if let Some(remaining) = self.estimate_remaining_rows_by_seqnum(seqnum) {
620 return remaining;
621 }
622 self.estimate_remaining_rows_by_time(realtime_usec)
623 }
624
625 fn estimate_remaining_rows_by_seqnum(&self, seqnum: u64) -> Option<u64> {
626 self.validate_seqnum_estimate_inputs(seqnum)?;
627 let scanned_rows = self.scanned_file_rows();
628 let seqnum_span = self.seqnum_span_so_far(seqnum)?;
629 if seqnum_span == 0 {
630 return None;
631 }
632 let proportion_of_all_lines_so_far =
633 bounded_positive_proportion(scanned_rows as f64 / seqnum_span as f64)?;
634 let expected_matching_logs =
635 (proportion_of_all_lines_so_far * self.file_entries as f64) as u64;
636 if expected_matching_logs == 0 {
637 return None;
638 }
639 Some(expected_matching_logs.saturating_sub(scanned_rows).max(1))
643 }
644
645 fn validate_seqnum_estimate_inputs(&self, seqnum: u64) -> Option<()> {
646 (self.file_entries != 0
647 && self.file_head_seqnum != 0
648 && self.file_tail_seqnum != 0
649 && seqnum != 0)
650 .then_some(())
651 }
652
653 fn scanned_file_rows(&self) -> u64 {
654 self.per_file_sampled
655 .saturating_add(self.per_file_unsampled)
656 .max(1)
657 }
658
659 fn seqnum_span_so_far(&self, seqnum: u64) -> Option<u64> {
660 match self.direction {
661 Direction::Forward => seqnum.checked_sub(self.file_head_seqnum),
662 Direction::Backward => self.file_tail_seqnum.checked_sub(seqnum),
663 }
664 }
665
666 fn estimate_remaining_rows_by_time(&self, realtime_usec: u64) -> u64 {
667 let scanned_rows = self.scanned_file_rows();
668 let (after, before) = self.overlapping_timeframe(realtime_usec);
669 let total_time = self
670 .remaining_time_bounds(realtime_usec, after, before)
671 .0
672 .max(1);
673 let remaining_time = self.remaining_time_bounds(realtime_usec, after, before).1;
674 let elapsed = total_time.saturating_sub(remaining_time).max(1);
675 let mut proportion_by_time = elapsed as f64 / total_time as f64;
676 if proportion_by_time == 0.0 || proportion_by_time > 1.0 || !proportion_by_time.is_finite()
677 {
678 proportion_by_time = 1.0;
679 }
680 let mut expected_total = (scanned_rows as f64 / proportion_by_time) as u64;
681 if self.file_entries != 0 && expected_total > self.file_entries {
682 expected_total = self.file_entries;
683 }
684 expected_total.saturating_sub(scanned_rows).max(1)
685 }
686
687 fn progress_by_time(&self, realtime_usec: u64) -> f64 {
688 let (after, before) = self.overlapping_timeframe(realtime_usec);
689 let total_time = before.saturating_sub(after).max(1);
690 let elapsed = match self.direction {
691 Direction::Forward => realtime_usec.saturating_sub(after),
692 Direction::Backward => before.saturating_sub(realtime_usec),
693 }
694 .min(total_time);
695 elapsed as f64 / total_time as f64
696 }
697
698 fn overlapping_timeframe(&self, realtime_usec: u64) -> (u64, u64) {
699 match self.direction {
700 Direction::Forward => {
701 let mut oldest = self
702 .first_realtime_usec
703 .or((self.file_head_realtime_usec != 0).then_some(self.file_head_realtime_usec))
704 .unwrap_or(self.start_realtime_usec);
705 let mut newest = if self.file_tail_realtime_usec != 0 {
706 self.end_realtime_usec.min(self.file_tail_realtime_usec)
707 } else {
708 self.end_realtime_usec
709 };
710 if newest <= oldest {
711 newest = oldest.saturating_add(1);
712 }
713 if realtime_usec < oldest {
714 oldest = realtime_usec.saturating_sub(1);
715 }
716 (oldest, newest)
717 }
718 Direction::Backward => {
719 let mut newest = self
720 .first_realtime_usec
721 .or((self.file_tail_realtime_usec != 0).then_some(self.file_tail_realtime_usec))
722 .unwrap_or(self.end_realtime_usec);
723 let oldest = if self.file_head_realtime_usec != 0 {
724 self.start_realtime_usec.max(self.file_head_realtime_usec)
725 } else {
726 self.start_realtime_usec
727 };
728 if newest <= oldest {
729 newest = oldest.saturating_add(1);
730 }
731 if newest < realtime_usec {
732 newest = realtime_usec.saturating_add(1);
733 }
734 (oldest, newest)
735 }
736 }
737 }
738
739 fn remaining_range(&self, realtime_usec: u64) -> (u64, u64) {
740 let (after, before) = self.overlapping_timeframe(realtime_usec);
741 let (_, _, remaining_start, remaining_end) =
742 self.remaining_time_details(realtime_usec, after, before);
743 (remaining_start, remaining_end)
744 }
745
746 fn remaining_time_bounds(&self, realtime_usec: u64, after: u64, before: u64) -> (u64, u64) {
747 let (total, remaining, _, _) = self.remaining_time_details(realtime_usec, after, before);
748 (total, remaining)
749 }
750
751 fn remaining_time_details(
752 &self,
753 realtime_usec: u64,
754 mut after: u64,
755 mut before: u64,
756 ) -> (u64, u64, u64, u64) {
757 if realtime_usec <= after {
758 after = realtime_usec.saturating_sub(1);
759 }
760 if realtime_usec >= before {
761 before = realtime_usec.saturating_add(1);
762 }
763 if before <= after {
764 before = after.saturating_add(1);
765 }
766 let (remaining_start, remaining_end) = match self.direction {
767 Direction::Forward => (realtime_usec, before),
768 Direction::Backward => (after, realtime_usec),
769 };
770 (
771 before.saturating_sub(after).max(1),
772 remaining_end.saturating_sub(remaining_start),
773 remaining_start,
774 remaining_end,
775 )
776 }
777}
778
779pub(crate) fn histogram_bucket_count_for_query(query: &ExplorerQuery) -> Option<usize> {
780 query
781 .histogram
782 .as_deref()
783 .map(|field| new_histogram(field, query).buckets.len())
784}
785
/// Per-row scratch data collected while scanning one row.
/// NOTE(review): field meanings inferred from names; confirm with the scan loop.
#[derive(Default)]
struct RowScan {
    /// Row timestamp (µs), once determined.
    timestamp: Option<u64>,
    /// Whether a positive FTS pattern matched some payload.
    fts_matches: bool,
    /// Whether a negative FTS pattern matched some payload.
    fts_negative_match: bool,
    /// Field names seen in the row.
    column_fields: Vec<Vec<u8>>,
}
793
/// Accumulator flag: the field is a requested facet, so its value counts are reported.
const FACET_PUBLIC: u8 = 1 << 0;
/// Accumulator flag: the field's values are bucketed into the histogram.
const FACET_HISTOGRAM: u8 = 1 << 1;
/// Accumulator flag: the field's values are parsed as the source realtime timestamp.
const FACET_SOURCE_REALTIME: u8 = 1 << 2;
797
/// Cached classification of a data-object offset.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OffsetClass {
    /// The object is of no interest to the query.
    Irrelevant,
    /// The object matched a positive FTS pattern.
    FtsMatch,
    /// The object matched a negative FTS pattern.
    FtsNegativeMatch,
    /// The object is the accumulator value with this index.
    Value(usize),
}

impl OffsetClass {
    const IRRELEVANT_RAW: usize = 1;
    const FTS_MATCH_RAW: usize = 2;
    const FTS_NEGATIVE_MATCH_RAW: usize = 3;
    /// Raw codes at or above this value encode `Value(raw - VALUE_BASE)`.
    const VALUE_BASE: usize = 4;

    /// Packs the class into a `usize` code. 0 is never produced.
    /// Very large value indices saturate at `usize::MAX`.
    fn to_raw(self) -> usize {
        match self {
            Self::Value(index) => index.saturating_add(Self::VALUE_BASE),
            Self::FtsNegativeMatch => Self::FTS_NEGATIVE_MATCH_RAW,
            Self::FtsMatch => Self::FTS_MATCH_RAW,
            Self::Irrelevant => Self::IRRELEVANT_RAW,
        }
    }

    /// Inverse of `to_raw`. The unused code 0 decodes to `Value(0)`.
    fn from_raw(raw: usize) -> Self {
        if let Some(index) = raw.checked_sub(Self::VALUE_BASE) {
            return Self::Value(index);
        }
        match raw {
            Self::IRRELEVANT_RAW => Self::Irrelevant,
            Self::FTS_MATCH_RAW => Self::FtsMatch,
            Self::FTS_NEGATIVE_MATCH_RAW => Self::FtsNegativeMatch,
            _ => Self::Value(0),
        }
    }
}
830
/// One entry of `OffsetClassCache`.
#[derive(Clone, Copy, Debug, Default)]
struct OffsetClassSlot {
    /// Data-object offset used as the key; 0 marks an empty slot.
    offset: u64,
    /// `OffsetClass::to_raw` encoding of the cached class.
    class: usize,
}
836
837#[derive(Debug)]
838struct OffsetClassCache {
839 slots: Vec<OffsetClassSlot>,
840 len: usize,
841}
842
843impl Default for OffsetClassCache {
844 fn default() -> Self {
845 Self {
846 slots: vec![OffsetClassSlot::default(); 256],
847 len: 0,
848 }
849 }
850}
851
852impl OffsetClassCache {
853 fn lookup(&self, offset: NonZeroU64) -> Option<OffsetClass> {
854 let mask = self.slots.len().saturating_sub(1);
855 let mut index = offset_slot(offset.get()) & mask;
856 for _ in 0..self.slots.len() {
857 let slot = self.slots[index];
858 if slot.offset == 0 {
859 return None;
860 }
861 if slot.offset == offset.get() {
862 return Some(OffsetClass::from_raw(slot.class));
863 }
864 index = (index + 1) & mask;
865 }
866 None
867 }
868
869 fn insert(&mut self, offset: NonZeroU64, class: OffsetClass) {
870 if (self.len + 1).saturating_mul(4) >= self.slots.len().saturating_mul(3) {
871 self.grow();
872 }
873 self.insert_raw(offset.get(), class.to_raw());
874 }
875
876 fn grow(&mut self) {
877 let new_len = self.slots.len().saturating_mul(2).max(256);
878 let old = std::mem::replace(&mut self.slots, vec![OffsetClassSlot::default(); new_len]);
879 self.len = 0;
880 for slot in old {
881 if slot.offset != 0 {
882 self.insert_raw(slot.offset, slot.class);
883 }
884 }
885 }
886
887 fn insert_raw(&mut self, offset: u64, class: usize) {
888 let mask = self.slots.len().saturating_sub(1);
889 let mut index = offset_slot(offset) & mask;
890 loop {
891 if self.slots[index].offset == 0 {
892 self.slots[index] = OffsetClassSlot { offset, class };
893 self.len += 1;
894 return;
895 }
896 if self.slots[index].offset == offset {
897 self.slots[index].class = class;
898 return;
899 }
900 index = (index + 1) & mask;
901 }
902 }
903}
904
/// Hashes a data-object offset into a table index; callers mask the result.
///
/// The low three bits are dropped first (presumably because offsets are
/// 8-byte aligned — confirm). The rest goes through the first
/// xor-shift/multiply/xor-shift steps of MurmurHash3's `fmix64` finalizer.
fn offset_slot(offset: u64) -> usize {
    const MIX: u64 = 0xff51afd7ed558ccd;
    let shifted = offset >> 3;
    let mixed = (shifted ^ (shifted >> 33)).wrapping_mul(MIX);
    (mixed ^ (mixed >> 33)) as usize
}
912
/// Collects facet value counts and histogram buckets for a set of fields.
///
/// Fields and values are interned into parallel vectors indexed by field index
/// and value index.
struct ExplorerAccumulator {
    /// Field name -> field index.
    field_lookup: HashMap<Vec<u8>, usize>,
    /// Field names, by field index.
    fields: Vec<Vec<u8>>,
    /// `FACET_*` flags, by field index.
    flags: Vec<u8>,
    /// Last row id in which each field was seen (0 initially).
    last_seen_row_ids: Vec<u64>,
    /// Rows in which each public facet field was missing.
    unset_counts: Vec<u64>,
    /// Value indices belonging to each field.
    values_by_field: Vec<Vec<usize>>,
    /// Row count per value (public facets only).
    value_counts: Vec<u64>,
    /// Owning field index, by value index.
    value_field_indices: Vec<usize>,
    /// Value bytes, by value index.
    value_labels: Vec<Vec<u8>>,
    /// FTS match flag recorded when the value was added.
    value_fts_matches: Vec<bool>,
    /// Parsed timestamp for source-realtime values, `None` otherwise.
    value_source_realtime: Vec<Option<u64>>,
    /// Histogram buckets per value (histogram fields only).
    value_histogram_buckets: Vec<Option<Vec<u64>>>,
    /// Histogram buckets for rows where a histogram field was missing.
    field_histogram_unset_buckets: Vec<Option<Vec<u64>>>,
    /// Cache of data-object offset classifications.
    offset_cache: OffsetClassCache,
    /// Histogram origin (µs), from the first bucket.
    histogram_start_realtime_usec: u64,
    /// Histogram bucket width (µs, at least 1), from the first bucket.
    histogram_bucket_width_usec: u64,
    /// Number of histogram buckets.
    histogram_bucket_count: usize,
    /// Number of fields registered with at least one flag.
    required_identity_count: usize,
}
933
934impl ExplorerAccumulator {
935 fn for_main(query: &ExplorerQuery, histogram: Option<&ExplorerHistogram>) -> Self {
936 Self::for_combined(query, &[], histogram)
937 }
938
939 fn for_facets(
940 query: &ExplorerQuery,
941 facet_indices: &[usize],
942 include_source_realtime: bool,
943 ) -> Self {
944 let mut out = Self::new(None);
945 for facet_index in facet_indices {
946 if let Some(field) = query.facets.get(*facet_index) {
947 out.add_field(field, FACET_PUBLIC);
948 }
949 }
950 if include_source_realtime {
951 out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
952 }
953 out
954 }
955
956 fn for_combined(
957 query: &ExplorerQuery,
958 facet_indices: &[usize],
959 histogram: Option<&ExplorerHistogram>,
960 ) -> Self {
961 let mut out = Self::new(histogram);
962 if let Some(field) = &query.histogram {
963 out.add_field(field, FACET_HISTOGRAM);
964 }
965 for facet_index in facet_indices {
966 if let Some(field) = query.facets.get(*facet_index) {
967 out.add_field(field, FACET_PUBLIC);
968 }
969 }
970 if query_needs_source_realtime_main(query) || facet_pass_needs_source_realtime(query) {
971 out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
972 }
973 out
974 }
975
976 fn new(histogram: Option<&ExplorerHistogram>) -> Self {
977 Self {
978 field_lookup: HashMap::new(),
979 fields: Vec::new(),
980 flags: Vec::new(),
981 last_seen_row_ids: Vec::new(),
982 unset_counts: Vec::new(),
983 values_by_field: Vec::new(),
984 value_counts: Vec::new(),
985 value_field_indices: Vec::new(),
986 value_labels: Vec::new(),
987 value_fts_matches: Vec::new(),
988 value_source_realtime: Vec::new(),
989 value_histogram_buckets: Vec::new(),
990 field_histogram_unset_buckets: Vec::new(),
991 offset_cache: OffsetClassCache::default(),
992 histogram_start_realtime_usec: histogram
993 .and_then(|histogram| histogram.buckets.first())
994 .map(|bucket| bucket.start_realtime_usec)
995 .unwrap_or_default(),
996 histogram_bucket_width_usec: histogram
997 .and_then(|histogram| histogram.buckets.first())
998 .map(|bucket| {
999 bucket
1000 .end_realtime_usec
1001 .saturating_sub(bucket.start_realtime_usec)
1002 .max(1)
1003 })
1004 .unwrap_or(1),
1005 histogram_bucket_count: histogram
1006 .map(|histogram| histogram.buckets.len())
1007 .unwrap_or_default(),
1008 required_identity_count: 0,
1009 }
1010 }
1011
1012 fn add_field(&mut self, field: &[u8], flags: u8) {
1013 if let Some(index) = self.field_lookup.get(field).copied() {
1014 let had_required = self.flags[index] != 0;
1015 self.flags[index] |= flags;
1016 if flags & FACET_HISTOGRAM != 0 && self.field_histogram_unset_buckets[index].is_none() {
1017 self.field_histogram_unset_buckets[index] =
1018 Some(vec![0; self.histogram_bucket_count]);
1019 }
1020 if !had_required && self.flags[index] != 0 {
1021 self.required_identity_count += 1;
1022 }
1023 return;
1024 }
1025
1026 let index = self.fields.len();
1027 self.field_lookup.insert(field.to_vec(), index);
1028 self.fields.push(field.to_vec());
1029 self.flags.push(flags);
1030 self.last_seen_row_ids.push(0);
1031 self.unset_counts.push(0);
1032 self.values_by_field.push(Vec::new());
1033 self.field_histogram_unset_buckets
1034 .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1035 if flags != 0 {
1036 self.required_identity_count += 1;
1037 }
1038 }
1039
1040 fn add_value(
1041 &mut self,
1042 field_index: usize,
1043 _data_offset: NonZeroU64,
1044 value: &[u8],
1045 fts_matches: bool,
1046 ) -> usize {
1047 let value_index = self.value_counts.len();
1048 let flags = self.flags[field_index];
1049 self.value_counts.push(0);
1050 self.value_field_indices.push(field_index);
1051 self.value_labels.push(value.to_vec());
1052 self.value_fts_matches.push(fts_matches);
1053 self.value_source_realtime
1054 .push(if flags & FACET_SOURCE_REALTIME != 0 {
1055 parse_source_realtime(value)
1056 } else {
1057 None
1058 });
1059 self.value_histogram_buckets
1060 .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1061 self.values_by_field[field_index].push(value_index);
1062 value_index
1063 }
1064
1065 fn mark_field_seen(&mut self, field_index: usize, row_id: u64) -> bool {
1066 if self.last_seen_row_ids[field_index] == row_id {
1069 return false;
1070 }
1071 self.last_seen_row_ids[field_index] = row_id;
1072 true
1073 }
1074
1075 fn apply_value(
1076 &mut self,
1077 value_index: usize,
1078 realtime_usec: Option<u64>,
1079 stats: &mut ExplorerStats,
1080 ) {
1081 let field_index = self.value_field_indices[value_index];
1082 let flags = self.flags[field_index];
1083 if flags & FACET_PUBLIC != 0 {
1084 self.value_counts[value_index] = self.value_counts[value_index].saturating_add(1);
1085 stats.facet_updates = stats.facet_updates.saturating_add(1);
1086 }
1087 if flags & FACET_HISTOGRAM != 0 {
1088 if let (Some(realtime_usec), Some(buckets)) = (
1089 realtime_usec,
1090 self.value_histogram_buckets[value_index].as_mut(),
1091 ) {
1092 if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1093 realtime_usec,
1094 self.histogram_start_realtime_usec,
1095 self.histogram_bucket_width_usec,
1096 buckets.len(),
1097 ) {
1098 buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1099 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1100 }
1101 }
1102 }
1103 }
1104
1105 fn finish_facet_row(&mut self, row_id: u64, stats: &mut ExplorerStats) {
1106 for field_index in 0..self.fields.len() {
1107 if self.flags[field_index] & FACET_PUBLIC == 0 {
1108 continue;
1109 }
1110 if self.last_seen_row_ids[field_index] != row_id {
1111 self.unset_counts[field_index] = self.unset_counts[field_index].saturating_add(1);
1112 stats.facet_updates = stats.facet_updates.saturating_add(1);
1113 }
1114 }
1115 }
1116
1117 fn finish_histogram_row(&mut self, row_id: u64, realtime_usec: u64, stats: &mut ExplorerStats) {
1118 for field_index in 0..self.fields.len() {
1119 if self.flags[field_index] & FACET_HISTOGRAM == 0 {
1120 continue;
1121 }
1122 if self.last_seen_row_ids[field_index] == row_id {
1123 continue;
1124 }
1125 let Some(buckets) = self.field_histogram_unset_buckets[field_index].as_mut() else {
1126 continue;
1127 };
1128 if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1129 realtime_usec,
1130 self.histogram_start_realtime_usec,
1131 self.histogram_bucket_width_usec,
1132 buckets.len(),
1133 ) {
1134 buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1135 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1136 }
1137 }
1138 }
1139
1140 fn finish_facets(&self, result: &mut ExplorerResult) {
1141 for field_index in 0..self.fields.len() {
1142 if self.flags[field_index] & FACET_PUBLIC == 0 {
1143 continue;
1144 }
1145 let mut values = HashMap::new();
1146 for value_index in &self.values_by_field[field_index] {
1147 let count = self.value_counts[*value_index];
1148 if count != 0 {
1149 increment_counter_by(&mut values, &self.value_labels[*value_index], count);
1150 }
1151 }
1152 if self.unset_counts[field_index] != 0 {
1153 increment_counter_by(&mut values, UNSET_VALUE, self.unset_counts[field_index]);
1154 }
1155 result
1156 .facets
1157 .insert(self.fields[field_index].clone(), values);
1158 }
1159 }
1160
1161 fn finish_histogram(&self, histogram: Option<&mut ExplorerHistogram>) {
1162 let Some(histogram) = histogram else {
1163 return;
1164 };
1165 for buckets in self.field_histogram_unset_buckets.iter().flatten() {
1166 for (bucket_index, count) in buckets.iter().enumerate() {
1167 if *count == 0 {
1168 continue;
1169 }
1170 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1171 increment_counter_by(&mut bucket.values, UNSET_VALUE, *count);
1172 }
1173 }
1174 }
1175 for value_index in 0..self.value_histogram_buckets.len() {
1176 let Some(buckets) = &self.value_histogram_buckets[value_index] else {
1177 continue;
1178 };
1179 for (bucket_index, count) in buckets.iter().enumerate() {
1180 if *count == 0 {
1181 continue;
1182 }
1183 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1184 increment_counter_by(
1185 &mut bucket.values,
1186 &self.value_labels[value_index],
1187 *count,
1188 );
1189 }
1190 }
1191 }
1192 }
1193}
1194
/// Converts `value` into a proportion in `(0.0, 1.0]`.
///
/// Returns `None` for zero, negative, NaN and infinite inputs. Finite positive
/// values above 1.0 are clamped to 1.0.
fn bounded_positive_proportion(value: f64) -> Option<f64> {
    let usable = value.is_finite() && value > 0.0;
    usable.then(|| value.min(1.0))
}
1201
1202impl FileReader {
1203 pub fn explore(&mut self, query: &ExplorerQuery) -> Result<ExplorerResult> {
1204 self.explore_with_strategy(query, ExplorerStrategy::Traversal)
1205 }
1206
1207 pub fn explore_with_strategy(
1208 &mut self,
1209 query: &ExplorerQuery,
1210 strategy: ExplorerStrategy,
1211 ) -> Result<ExplorerResult> {
1212 self.explore_with_strategy_and_payload_mode(query, strategy, ExplorerRowPayloadMode::Expand)
1213 }
1214
1215 pub fn explore_with_strategy_and_control(
1216 &mut self,
1217 query: &ExplorerQuery,
1218 strategy: ExplorerStrategy,
1219 control: &mut ExplorerControl<'_>,
1220 ) -> Result<ExplorerResult> {
1221 validate_no_debug_column_collection(query)?;
1222 self.explore_with_strategy_and_payload_mode_unchecked(
1223 query,
1224 strategy,
1225 ExplorerRowPayloadMode::Expand,
1226 Some(control),
1227 )
1228 }
1229
1230 #[cfg(test)]
1231 pub(crate) fn explore_with_strategy_cursor_rows(
1232 &mut self,
1233 query: &ExplorerQuery,
1234 strategy: ExplorerStrategy,
1235 ) -> Result<ExplorerResult> {
1236 self.explore_with_strategy_and_payload_mode(
1237 query,
1238 strategy,
1239 ExplorerRowPayloadMode::CursorOnly,
1240 )
1241 }
1242
1243 pub(crate) fn explore_with_strategy_cursor_rows_controlled(
1244 &mut self,
1245 query: &ExplorerQuery,
1246 strategy: ExplorerStrategy,
1247 control: &mut ExplorerControl<'_>,
1248 ) -> Result<ExplorerResult> {
1249 validate_no_debug_column_collection(query)?;
1250 self.explore_with_strategy_and_payload_mode_unchecked(
1251 query,
1252 strategy,
1253 ExplorerRowPayloadMode::CursorOnly,
1254 Some(control),
1255 )
1256 }
1257
1258 fn explore_with_strategy_and_payload_mode(
1259 &mut self,
1260 query: &ExplorerQuery,
1261 strategy: ExplorerStrategy,
1262 row_payload_mode: ExplorerRowPayloadMode,
1263 ) -> Result<ExplorerResult> {
1264 validate_no_debug_column_collection(query)?;
1265 self.explore_with_strategy_and_payload_mode_unchecked(
1266 query,
1267 strategy,
1268 row_payload_mode,
1269 None,
1270 )
1271 }
1272
1273 fn explore_with_strategy_and_payload_mode_unchecked(
1274 &mut self,
1275 query: &ExplorerQuery,
1276 strategy: ExplorerStrategy,
1277 row_payload_mode: ExplorerRowPayloadMode,
1278 mut control: Option<&mut ExplorerControl<'_>>,
1279 ) -> Result<ExplorerResult> {
1280 match strategy {
1281 ExplorerStrategy::Traversal => {
1282 self.explore_traversal(query, row_payload_mode, control.as_deref_mut())
1283 }
1284 ExplorerStrategy::Index => {
1285 self.explore_indexed(query, row_payload_mode, control.as_deref_mut())
1286 }
1287 ExplorerStrategy::Compare => self.explore_compare(query, row_payload_mode),
1288 }
1289 }
1290
1291 fn explore_traversal(
1292 &mut self,
1293 query: &ExplorerQuery,
1294 row_payload_mode: ExplorerRowPayloadMode,
1295 mut control: Option<&mut ExplorerControl<'_>>,
1296 ) -> Result<ExplorerResult> {
1297 validate_query(query)?;
1298 let mut result = explorer_result_for_query(query);
1299 let facet_groups = facet_pass_groups(query);
1300 if can_run_combined_explorer_pass(&facet_groups) {
1301 self.explore_traversal_combined(
1302 query,
1303 row_payload_mode,
1304 &mut result,
1305 &facet_groups,
1306 control.as_deref_mut(),
1307 )?;
1308 } else {
1309 self.explore_traversal_split(
1310 query,
1311 row_payload_mode,
1312 &mut result,
1313 facet_groups,
1314 control.as_deref_mut(),
1315 )?;
1316 }
1317 self.configure_explorer_filters(query, None)?;
1318 Ok(result)
1319 }
1320
1321 fn explore_traversal_combined(
1322 &mut self,
1323 query: &ExplorerQuery,
1324 row_payload_mode: ExplorerRowPayloadMode,
1325 result: &mut ExplorerResult,
1326 facet_groups: &[FacetPassGroup],
1327 control: Option<&mut ExplorerControl<'_>>,
1328 ) -> Result<()> {
1329 let facet_indices = combined_facet_indices(facet_groups);
1330 if query_needs_main_pass(query) || !facet_indices.is_empty() {
1331 self.configure_explorer_filters(query, None)?;
1332 let mut accumulator =
1333 ExplorerAccumulator::for_combined(query, &facet_indices, result.histogram.as_ref());
1334 self.scan_explorer_combined(
1335 query,
1336 &mut accumulator,
1337 result,
1338 !facet_indices.is_empty(),
1339 row_payload_mode,
1340 control,
1341 )?;
1342 accumulator.finish_facets(result);
1343 accumulator.finish_histogram(result.histogram.as_mut());
1344 }
1345 Ok(())
1346 }
1347
1348 fn explore_traversal_split(
1349 &mut self,
1350 query: &ExplorerQuery,
1351 row_payload_mode: ExplorerRowPayloadMode,
1352 result: &mut ExplorerResult,
1353 facet_groups: Vec<FacetPassGroup>,
1354 mut control: Option<&mut ExplorerControl<'_>>,
1355 ) -> Result<()> {
1356 if query_needs_main_pass(query) {
1357 self.configure_explorer_filters(query, None)?;
1358 let mut accumulator = ExplorerAccumulator::for_main(query, result.histogram.as_ref());
1359 self.scan_explorer_main(
1360 query,
1361 &mut accumulator,
1362 result,
1363 row_payload_mode,
1364 control.as_deref_mut(),
1365 )?;
1366 accumulator.finish_histogram(result.histogram.as_mut());
1367 }
1368
1369 for group in facet_groups {
1370 if explorer_control_stopped(control.as_deref()) {
1371 break;
1372 }
1373 self.configure_explorer_filters(query, group.excluded_field.as_deref())?;
1374 let mut accumulator = ExplorerAccumulator::for_facets(
1375 query,
1376 &group.facet_indices,
1377 facet_pass_needs_source_realtime(query),
1378 );
1379 self.scan_explorer_facet(
1380 query,
1381 &mut accumulator,
1382 &mut result.stats,
1383 control.as_deref_mut(),
1384 )?;
1385 accumulator.finish_facets(result);
1386 }
1387 Ok(())
1388 }
1389
1390 fn explore_compare(
1391 &mut self,
1392 query: &ExplorerQuery,
1393 row_payload_mode: ExplorerRowPayloadMode,
1394 ) -> Result<ExplorerResult> {
1395 let traversal_started = Instant::now();
1396 let traversal = self.explore_traversal(query, row_payload_mode, None)?;
1397 let traversal_duration = traversal_started.elapsed();
1398
1399 let index_started = Instant::now();
1400 let mut indexed = self.explore_indexed(query, row_payload_mode, None)?;
1401 let index_duration = index_started.elapsed();
1402
1403 if !explorer_outputs_match(&traversal, &indexed) {
1404 return Err(SdkError::VerificationError(
1405 "indexed explorer output differs from traversal explorer output".to_string(),
1406 ));
1407 }
1408 indexed.comparison = Some(ExplorerComparison {
1409 traversal_duration,
1410 index_duration,
1411 traversal_stats: traversal.stats,
1412 index_stats: indexed.stats.clone(),
1413 });
1414 Ok(indexed)
1415 }
1416
1417 fn explore_indexed(
1418 &mut self,
1419 query: &ExplorerQuery,
1420 row_payload_mode: ExplorerRowPayloadMode,
1421 mut control: Option<&mut ExplorerControl<'_>>,
1422 ) -> Result<ExplorerResult> {
1423 validate_query(query)?;
1424 validate_indexed_query(query)?;
1425 let mut result = explorer_result_for_query(query);
1426 self.indexed_collect_rows(query, row_payload_mode, &mut result, control.as_deref_mut())?;
1427 self.indexed_collect_facets(query, &mut result, control.as_deref())?;
1428 self.indexed_collect_histogram(query, &mut result, control.as_deref())?;
1429 self.configure_explorer_filters(query, None)?;
1430 Ok(result)
1431 }
1432
1433 fn indexed_collect_rows(
1434 &mut self,
1435 query: &ExplorerQuery,
1436 row_payload_mode: ExplorerRowPayloadMode,
1437 result: &mut ExplorerResult,
1438 control: Option<&mut ExplorerControl<'_>>,
1439 ) -> Result<()> {
1440 if query.limit == 0 {
1441 return Ok(());
1442 }
1443 let mut row_query = query.clone();
1444 row_query.facets.clear();
1445 row_query.histogram = None;
1446 self.configure_explorer_filters(&row_query, None)?;
1447 let mut accumulator = ExplorerAccumulator::for_main(&row_query, None);
1448 self.scan_explorer_main(
1449 &row_query,
1450 &mut accumulator,
1451 result,
1452 row_payload_mode,
1453 control,
1454 )
1455 }
1456
1457 fn indexed_collect_facets(
1458 &mut self,
1459 query: &ExplorerQuery,
1460 result: &mut ExplorerResult,
1461 control: Option<&ExplorerControl<'_>>,
1462 ) -> Result<()> {
1463 if explorer_control_stopped(control) {
1464 return Ok(());
1465 }
1466 for group in facet_pass_groups(query) {
1467 let candidates = self.indexed_candidate_set(query, group.excluded_field.as_deref())?;
1468 self.inner.with_file(|file| {
1469 indexed_count_facet_group(file, query, &group, &candidates, result)
1470 })?;
1471 }
1472 Ok(())
1473 }
1474
1475 fn indexed_collect_histogram(
1476 &mut self,
1477 query: &ExplorerQuery,
1478 result: &mut ExplorerResult,
1479 control: Option<&ExplorerControl<'_>>,
1480 ) -> Result<()> {
1481 if query.histogram.is_none() || explorer_control_stopped(control) {
1482 return Ok(());
1483 }
1484 let candidates = self.indexed_candidate_set(query, None)?;
1485 self.inner
1486 .with_file(|file| indexed_count_histogram(file, query, &candidates, result))
1487 }
1488
1489 fn indexed_candidate_set(
1490 &mut self,
1491 query: &ExplorerQuery,
1492 excluded_field: Option<&[u8]>,
1493 ) -> Result<IndexedCandidateSet> {
1494 if query.filters.is_empty()
1495 && query.after_realtime_usec.is_none()
1496 && query.before_realtime_usec.is_none()
1497 {
1498 let count = self
1499 .inner
1500 .with_file(|file| file.journal_header_ref().n_entries);
1501 return Ok(IndexedCandidateSet::All { count });
1502 }
1503
1504 self.configure_explorer_filters(query, excluded_field)?;
1505 self.seek_for_explorer(query);
1506 let mut offsets = HashSet::new();
1507 while self.step_for_explorer(query.direction)? {
1508 let Some(metadata) = self.row.metadata() else {
1509 continue;
1510 };
1511 let commit_realtime = metadata.realtime;
1512 if stop_by_commit_time(query, commit_realtime) {
1513 break;
1514 }
1515 if !timestamp_in_range(query, commit_realtime) {
1516 continue;
1517 }
1518 if let Some(entry_offset) = self.row.entry_offset() {
1519 offsets.insert(entry_offset);
1520 }
1521 }
1522 Ok(IndexedCandidateSet::Set {
1523 count: offsets.len() as u64,
1524 offsets,
1525 })
1526 }
1527
1528 fn configure_explorer_filters(
1529 &mut self,
1530 query: &ExplorerQuery,
1531 excluded_field: Option<&[u8]>,
1532 ) -> Result<()> {
1533 self.flush_matches();
1534 for filter in &query.filters {
1535 if excluded_field.is_some_and(|field| field == filter.field.as_slice()) {
1536 continue;
1537 }
1538 if filter.values.is_empty() {
1539 continue;
1540 }
1541 for value in &filter.values {
1542 let payload = payload_from_parts(&filter.field, value);
1543 self.add_match(&payload);
1544 }
1545 }
1546 Ok(())
1547 }
1548
1549 fn next_explorer_row_frame(
1550 &mut self,
1551 query: &ExplorerQuery,
1552 rows_seen: &mut u64,
1553 stats: &ExplorerStats,
1554 control: Option<&mut ExplorerControl<'_>>,
1555 ) -> Result<ExplorerLoopStep> {
1556 if !self.step_for_explorer(query.direction)? {
1557 return Ok(ExplorerLoopStep::Stop);
1558 }
1559 *rows_seen = rows_seen.saturating_add(1);
1560 if control.is_some_and(|control| control.should_stop_after_rows(*rows_seen, stats)) {
1561 return Ok(ExplorerLoopStep::Stop);
1562 }
1563 let Some(metadata) = self.row.metadata() else {
1564 return Ok(ExplorerLoopStep::Skip);
1565 };
1566 if stop_by_commit_time(query, metadata.realtime) {
1567 return Ok(ExplorerLoopStep::Stop);
1568 }
1569 if skip_by_commit_time(query, metadata.realtime) {
1570 return Ok(ExplorerLoopStep::Skip);
1571 }
1572 Ok(ExplorerLoopStep::Row(ExplorerRowFrame {
1573 commit_realtime: metadata.realtime,
1574 seqnum: metadata.seqnum,
1575 }))
1576 }
1577
1578 fn scan_row_data_or_default(
1579 &mut self,
1580 query: &ExplorerQuery,
1581 accumulator: &mut ExplorerAccumulator,
1582 row_id: &mut u64,
1583 deferred_values: &mut Vec<usize>,
1584 stats: &mut ExplorerStats,
1585 ) -> Result<RowScan> {
1586 if accumulator.required_identity_count == 0 && !query_has_fts(query) {
1587 stats.rows_examined = stats.rows_examined.saturating_add(1);
1588 return Ok(RowScan::default());
1589 }
1590 *row_id = row_id.saturating_add(1);
1591 deferred_values.clear();
1592 self.scan_current_row(
1593 query,
1594 accumulator,
1595 *row_id,
1596 ScanApply::Deferred(deferred_values),
1597 stats,
1598 )
1599 }
1600
1601 fn accepted_effective_realtime(
1602 query: &ExplorerQuery,
1603 scan: &RowScan,
1604 commit_realtime: u64,
1605 stats: &mut ExplorerStats,
1606 control: Option<&mut ExplorerControl<'_>>,
1607 ) -> Option<u64> {
1608 let mut effective_realtime = effective_realtime_from_scan(scan.timestamp, commit_realtime);
1609 record_source_realtime_delta(stats, scan.timestamp, commit_realtime);
1610 if let Some(control) = control {
1611 effective_realtime = control.adjust_realtime(effective_realtime);
1612 }
1613 (timestamp_in_range(query, effective_realtime) && !row_rejected_by_fts(query, scan))
1614 .then_some(effective_realtime)
1615 }
1616
1617 fn push_explorer_row_if_wanted(
1618 &mut self,
1619 query: &ExplorerQuery,
1620 result: &mut ExplorerResult,
1621 row_payload_mode: ExplorerRowPayloadMode,
1622 effective_realtime: u64,
1623 ) -> Result<()> {
1624 if row_within_anchor(query, effective_realtime) && result.rows.len() < query.limit {
1625 result.rows.push(self.current_explorer_row(
1626 effective_realtime,
1627 &mut result.stats,
1628 row_payload_mode,
1629 )?);
1630 }
1631 Ok(())
1632 }
1633
1634 fn apply_main_scanned_row(
1635 &mut self,
1636 query: &ExplorerQuery,
1637 accumulator: &mut ExplorerAccumulator,
1638 result: &mut ExplorerResult,
1639 row_payload_mode: ExplorerRowPayloadMode,
1640 scanned: MainScannedRow<'_>,
1641 control: Option<&mut ExplorerControl<'_>>,
1642 ) -> Result<bool> {
1643 if query.debug_collect_column_fields_by_row_traversal {
1644 result.column_fields.extend(scanned.scan.column_fields);
1645 }
1646 record_last_realtime(&mut result.stats, scanned.commit_realtime);
1647 result.stats.rows_matched = result.stats.rows_matched.saturating_add(1);
1648 let stop_after_matched_row = control.is_some_and(|control| {
1649 control.emit_matched_row(scanned.effective_realtime, result.stats.rows_matched)
1650 });
1651 for value_index in scanned.deferred_values {
1652 accumulator.apply_value(
1653 *value_index,
1654 Some(scanned.effective_realtime),
1655 &mut result.stats,
1656 );
1657 }
1658 accumulator.finish_histogram_row(
1659 scanned.row_id,
1660 scanned.effective_realtime,
1661 &mut result.stats,
1662 );
1663 self.push_explorer_row_if_wanted(
1664 query,
1665 result,
1666 row_payload_mode,
1667 scanned.effective_realtime,
1668 )?;
1669 Ok(stop_after_matched_row
1670 || should_stop_when_rows_full(
1671 query,
1672 &result.rows,
1673 scanned.effective_realtime,
1674 result.stats.rows_matched,
1675 ))
1676 }
1677
1678 fn sampling_state_for_combined(
1679 query: &ExplorerQuery,
1680 result: &ExplorerResult,
1681 control: Option<&mut ExplorerControl<'_>>,
1682 ) -> Option<ExplorerSamplingState> {
1683 let sampling = ExplorerSamplingState::for_query(
1684 query,
1685 result
1686 .histogram
1687 .as_ref()
1688 .map(|histogram| histogram.buckets.len()),
1689 );
1690 if let Some(control) = control {
1691 if let (Some(shared_sampling), Some(file_sampling)) =
1692 (control.sampling.as_deref_mut(), query.sampling)
1693 {
1694 shared_sampling.begin_file(file_sampling);
1695 }
1696 }
1697 sampling
1698 }
1699
1700 fn combined_sampling_decision(
1701 query: &ExplorerQuery,
1702 rows: &[ExplorerRow],
1703 frame: ExplorerRowFrame,
1704 sampling: &mut Option<ExplorerSamplingState>,
1705 mut control: Option<&mut ExplorerControl<'_>>,
1706 ) -> Option<ExplorerSamplingDecision> {
1707 let candidate_to_keep = if let Some(control) = control.as_deref_mut() {
1708 control.candidate_row.as_deref_mut().map_or_else(
1709 || row_candidate_to_keep(query, rows, frame.commit_realtime),
1710 |candidate_row| candidate_row(frame.commit_realtime),
1711 )
1712 } else {
1713 row_candidate_to_keep(query, rows, frame.commit_realtime)
1714 };
1715 if let Some(control) = control {
1716 if let Some(shared_sampling) = control.sampling.as_deref_mut() {
1717 return Some(shared_sampling.decide(
1718 frame.commit_realtime,
1719 frame.seqnum,
1720 candidate_to_keep,
1721 ));
1722 }
1723 }
1724 sampling
1725 .as_mut()
1726 .map(|sampling| sampling.decide(frame.commit_realtime, frame.seqnum, candidate_to_keep))
1727 }
1728
1729 fn apply_combined_sampling_decision(
1730 decision: ExplorerSamplingDecision,
1731 mode: CombinedScanMode,
1732 result: &mut ExplorerResult,
1733 frame: ExplorerRowFrame,
1734 ) -> SamplingRowAction {
1735 match decision {
1736 ExplorerSamplingDecision::Full { sampled } => {
1737 if sampled {
1738 result.stats.sampling_sampled = result.stats.sampling_sampled.saturating_add(1);
1739 }
1740 SamplingRowAction::Scan
1741 }
1742 ExplorerSamplingDecision::SkipFields => {
1743 record_combined_unsampled_row(
1744 &mut result.stats,
1745 mode,
1746 frame.commit_realtime,
1747 1,
1748 true,
1749 );
1750 add_special_histogram_value(
1751 result.histogram.as_mut(),
1752 frame.commit_realtime,
1753 EXPLORER_UNSAMPLED_VALUE,
1754 1,
1755 &mut result.stats,
1756 );
1757 SamplingRowAction::Skip
1758 }
1759 ExplorerSamplingDecision::StopAndEstimate {
1760 remaining_rows,
1761 from_realtime_usec,
1762 to_realtime_usec,
1763 } => {
1764 record_combined_unsampled_row(
1765 &mut result.stats,
1766 mode,
1767 frame.commit_realtime,
1768 remaining_rows,
1769 false,
1770 );
1771 result.stats.rows_estimated =
1772 result.stats.rows_estimated.saturating_add(remaining_rows);
1773 result.stats.sampling_estimated = result
1774 .stats
1775 .sampling_estimated
1776 .saturating_add(remaining_rows);
1777 add_estimated_histogram_range(
1778 result.histogram.as_mut(),
1779 from_realtime_usec,
1780 to_realtime_usec,
1781 remaining_rows,
1782 &mut result.stats,
1783 );
1784 SamplingRowAction::Stop
1785 }
1786 }
1787 }
1788
1789 fn apply_combined_scanned_row(
1790 &mut self,
1791 query: &ExplorerQuery,
1792 accumulator: &mut ExplorerAccumulator,
1793 result: &mut ExplorerResult,
1794 row_payload_mode: ExplorerRowPayloadMode,
1795 mode: CombinedScanMode,
1796 scanned: MainScannedRow<'_>,
1797 control: Option<&mut ExplorerControl<'_>>,
1798 ) -> Result<bool> {
1799 if query.debug_collect_column_fields_by_row_traversal {
1800 result.column_fields.extend(scanned.scan.column_fields);
1801 }
1802 record_last_realtime(&mut result.stats, scanned.commit_realtime);
1803 let stop_after_matched_row = update_combined_matched_stats(
1804 &mut result.stats,
1805 mode,
1806 scanned.effective_realtime,
1807 control,
1808 );
1809 let value_realtime = query
1810 .histogram
1811 .is_some()
1812 .then_some(scanned.effective_realtime);
1813 for value_index in scanned.deferred_values {
1814 accumulator.apply_value(*value_index, value_realtime, &mut result.stats);
1815 }
1816 if query.histogram.is_some() {
1817 accumulator.finish_histogram_row(
1818 scanned.row_id,
1819 scanned.effective_realtime,
1820 &mut result.stats,
1821 );
1822 }
1823 if mode.include_facets {
1824 accumulator.finish_facet_row(scanned.row_id, &mut result.stats);
1825 }
1826 self.push_explorer_row_if_wanted(
1827 query,
1828 result,
1829 row_payload_mode,
1830 scanned.effective_realtime,
1831 )?;
1832 Ok(stop_after_matched_row
1833 || should_stop_when_rows_full(
1834 query,
1835 &result.rows,
1836 scanned.effective_realtime,
1837 result.stats.rows_matched,
1838 ))
1839 }
1840
1841 fn scan_explorer_main(
1842 &mut self,
1843 query: &ExplorerQuery,
1844 accumulator: &mut ExplorerAccumulator,
1845 result: &mut ExplorerResult,
1846 row_payload_mode: ExplorerRowPayloadMode,
1847 mut control: Option<&mut ExplorerControl<'_>>,
1848 ) -> Result<()> {
1849 self.seek_for_explorer(query);
1850 let mut row_id = 0u64;
1851 let mut rows_seen = 0u64;
1852 let mut deferred_values = Vec::new();
1853 loop {
1854 let frame = match self.next_explorer_row_frame(
1855 query,
1856 &mut rows_seen,
1857 &result.stats,
1858 control.as_deref_mut(),
1859 )? {
1860 ExplorerLoopStep::Stop => break,
1861 ExplorerLoopStep::Skip => continue,
1862 ExplorerLoopStep::Row(frame) => frame,
1863 };
1864 let scan = self.scan_row_data_or_default(
1865 query,
1866 accumulator,
1867 &mut row_id,
1868 &mut deferred_values,
1869 &mut result.stats,
1870 )?;
1871 let Some(effective_realtime) = Self::accepted_effective_realtime(
1872 query,
1873 &scan,
1874 frame.commit_realtime,
1875 &mut result.stats,
1876 control.as_deref_mut(),
1877 ) else {
1878 continue;
1879 };
1880 let scanned = MainScannedRow {
1881 row_id,
1882 commit_realtime: frame.commit_realtime,
1883 effective_realtime,
1884 scan,
1885 deferred_values: &deferred_values,
1886 };
1887 if self.apply_main_scanned_row(
1888 query,
1889 accumulator,
1890 result,
1891 row_payload_mode,
1892 scanned,
1893 control.as_deref_mut(),
1894 )? {
1895 break;
1896 }
1897 }
1898 result.stats.rows_returned = result.rows.len() as u64;
1899 Ok(())
1900 }
1901
1902 fn scan_explorer_combined(
1903 &mut self,
1904 query: &ExplorerQuery,
1905 accumulator: &mut ExplorerAccumulator,
1906 result: &mut ExplorerResult,
1907 include_facets: bool,
1908 row_payload_mode: ExplorerRowPayloadMode,
1909 mut control: Option<&mut ExplorerControl<'_>>,
1910 ) -> Result<()> {
1911 self.seek_for_explorer(query);
1912 let mode = CombinedScanMode {
1913 include_main: query_needs_main_pass(query),
1914 include_facets,
1915 };
1916 let mut row_id = 0u64;
1917 let mut rows_seen = 0u64;
1918 let mut deferred_values = Vec::new();
1919 let mut sampling = Self::sampling_state_for_combined(query, result, control.as_deref_mut());
1920 loop {
1921 let frame = match self.next_explorer_row_frame(
1922 query,
1923 &mut rows_seen,
1924 &result.stats,
1925 control.as_deref_mut(),
1926 )? {
1927 ExplorerLoopStep::Stop => break,
1928 ExplorerLoopStep::Skip => continue,
1929 ExplorerLoopStep::Row(frame) => frame,
1930 };
1931 if let Some(decision) = Self::combined_sampling_decision(
1932 query,
1933 &result.rows,
1934 frame,
1935 &mut sampling,
1936 control.as_deref_mut(),
1937 ) {
1938 match Self::apply_combined_sampling_decision(decision, mode, result, frame) {
1939 SamplingRowAction::Scan => {}
1940 SamplingRowAction::Skip => continue,
1941 SamplingRowAction::Stop => break,
1942 }
1943 }
1944 let scan = self.scan_row_data_or_default(
1945 query,
1946 accumulator,
1947 &mut row_id,
1948 &mut deferred_values,
1949 &mut result.stats,
1950 )?;
1951 let Some(effective_realtime) = Self::accepted_effective_realtime(
1952 query,
1953 &scan,
1954 frame.commit_realtime,
1955 &mut result.stats,
1956 control.as_deref_mut(),
1957 ) else {
1958 continue;
1959 };
1960 let scanned = MainScannedRow {
1961 row_id,
1962 commit_realtime: frame.commit_realtime,
1963 effective_realtime,
1964 scan,
1965 deferred_values: &deferred_values,
1966 };
1967 if self.apply_combined_scanned_row(
1968 query,
1969 accumulator,
1970 result,
1971 row_payload_mode,
1972 mode,
1973 scanned,
1974 control.as_deref_mut(),
1975 )? {
1976 break;
1977 }
1978 }
1979 result.stats.rows_returned = result.rows.len() as u64;
1980 Ok(())
1981 }
1982
1983 fn scan_explorer_facet(
1984 &mut self,
1985 query: &ExplorerQuery,
1986 accumulator: &mut ExplorerAccumulator,
1987 stats: &mut ExplorerStats,
1988 mut control: Option<&mut ExplorerControl<'_>>,
1989 ) -> Result<()> {
1990 self.seek_for_explorer(query);
1991 let defer_apply = query.after_realtime_usec.is_some()
1992 || query.before_realtime_usec.is_some()
1993 || query_has_fts(query);
1994 let mut row_id = 0u64;
1995 let mut rows_seen = 0u64;
1996 let mut deferred_values = Vec::new();
1997 loop {
1998 let frame = match self.next_explorer_row_frame(
1999 query,
2000 &mut rows_seen,
2001 stats,
2002 control.as_deref_mut(),
2003 )? {
2004 ExplorerLoopStep::Stop => break,
2005 ExplorerLoopStep::Skip => continue,
2006 ExplorerLoopStep::Row(frame) => frame,
2007 };
2008 row_id = row_id.saturating_add(1);
2009 deferred_values.clear();
2010 let scan = if defer_apply {
2011 self.scan_current_row(
2012 query,
2013 accumulator,
2014 row_id,
2015 ScanApply::Deferred(&mut deferred_values),
2016 stats,
2017 )?
2018 } else {
2019 self.scan_current_row(query, accumulator, row_id, ScanApply::Immediate, stats)?
2020 };
2021 if Self::accepted_effective_realtime(query, &scan, frame.commit_realtime, stats, None)
2022 .is_none()
2023 {
2024 continue;
2025 }
2026 record_last_realtime(stats, frame.commit_realtime);
2027 stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
2028 if defer_apply {
2029 for value_index in &deferred_values {
2030 accumulator.apply_value(*value_index, None, stats);
2031 }
2032 }
2033 accumulator.finish_facet_row(row_id, stats);
2034 }
2035 Ok(())
2036 }
2037
    /// Scans the data objects referenced by the current row and classifies each
    /// one against `accumulator`.
    ///
    /// Always bumps `stats.rows_examined`. For each data offset it bumps
    /// `stats.data_refs_seen`, classifies the data, and hands the class to
    /// `handle_row_offset_class` together with `apply`. The loop ends early
    /// when `RowScanState::should_stop_row_scan` says every needed field has
    /// been seen.
    ///
    /// # Errors
    /// Propagates errors from restarting or resetting the row's data state and
    /// from classification. A classification error is returned only after the
    /// data state has been reset.
    fn scan_current_row(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        row_id: u64,
        mut apply: ScanApply<'_>,
        stats: &mut ExplorerStats,
    ) -> Result<RowScan> {
        stats.rows_examined = stats.rows_examined.saturating_add(1);
        let mut out = RowScan::default();
        let mut state = RowScanState::new(query, accumulator);

        // Borrow `inner` and `row` separately so the closure below can use both at once.
        let inner = &mut self.inner;
        let row = &mut self.row;
        inner.with_mut(|fields| {
            // Release held object guards before walking this row's data objects.
            fields.reader.release_object_guards();
            row.restart_data()?;
            // Run the loop in an inner closure so its result can be held while
            // `reset_data_state` runs, whether the loop succeeded or failed.
            let result = (|| {
                for index in 0..row.data_offset_count() {
                    let Some(data_offset) = row.data_offset_at(index) else {
                        break;
                    };
                    stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
                    let class = classify_data_for_accumulator(
                        fields.file,
                        row,
                        data_offset,
                        accumulator,
                        state.needs_fts,
                        query,
                        query
                            .debug_collect_column_fields_by_row_traversal
                            .then_some(&mut out.column_fields),
                        stats,
                    )?;

                    handle_row_offset_class(
                        class,
                        accumulator,
                        row_id,
                        &mut state,
                        &mut out,
                        &mut apply,
                        stats,
                    );
                    // First-value mode can stop once no required field is still missing.
                    if state.should_stop_row_scan() {
                        record_row_scan_early_stop(stats);
                        break;
                    }
                }
                Ok::<_, SdkError>(())
            })();
            row.reset_data_state(fields.file)?;
            result
        })?;
        Ok(out)
    }
2095
2096 fn seek_for_explorer(&mut self, query: &ExplorerQuery) {
2097 let anchor = if query.stop_when_rows_full {
2098 query.anchor
2099 } else {
2100 ExplorerAnchor::Auto
2101 };
2102 match query.direction {
2103 Direction::Forward => match anchor {
2104 ExplorerAnchor::Auto => {
2105 if let Some(after) = query.after_realtime_usec {
2106 self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2107 } else {
2108 self.seek_head();
2109 }
2110 }
2111 ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2112 ExplorerAnchor::Tail => self.seek_tail(),
2113 ExplorerAnchor::Head => {
2114 if let Some(after) = query.after_realtime_usec {
2115 self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2116 } else {
2117 self.seek_head();
2118 }
2119 }
2120 },
2121 Direction::Backward => match anchor {
2122 ExplorerAnchor::Auto => {
2123 if let Some(before) = query.before_realtime_usec {
2124 self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2125 } else {
2126 self.seek_tail();
2127 }
2128 }
2129 ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2130 ExplorerAnchor::Head => self.seek_head(),
2131 ExplorerAnchor::Tail => {
2132 if let Some(before) = query.before_realtime_usec {
2133 self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2134 } else {
2135 self.seek_tail();
2136 }
2137 }
2138 },
2139 }
2140 }
2141
2142 fn step_for_explorer(&mut self, direction: Direction) -> Result<bool> {
2143 match direction {
2144 Direction::Forward => self.next(),
2145 Direction::Backward => self.previous(),
2146 }
2147 }
2148
2149 fn current_explorer_row(
2150 &mut self,
2151 realtime_usec: u64,
2152 stats: &mut ExplorerStats,
2153 row_payload_mode: ExplorerRowPayloadMode,
2154 ) -> Result<ExplorerRow> {
2155 let cursor = self.get_cursor()?;
2156 let mut payloads = Vec::new();
2157 if row_payload_mode == ExplorerRowPayloadMode::Expand {
2158 self.collect_entry_payloads(&mut payloads)?;
2159 stats.returned_row_expansions = stats.returned_row_expansions.saturating_add(1);
2160 }
2161 Ok(ExplorerRow {
2162 realtime_usec,
2163 cursor,
2164 payloads,
2165 })
2166 }
2167}
2168
/// How `scan_current_row` treats matched values while walking a row.
enum ScanApply<'a> {
    /// Apply values as they are found. Used by facet passes when no row can be
    /// rejected after scanning. Presumably applied inside
    /// `handle_row_offset_class` (not shown here).
    Immediate,
    /// Push value indices into the caller's buffer. The caller applies them
    /// only if the row is accepted.
    Deferred(&'a mut Vec<usize>),
}
2173
/// Per-entry data carried out of the explorer loop by `ExplorerLoopStep::Row`.
#[derive(Debug, Clone, Copy)]
struct ExplorerRowFrame {
    /// Commit realtime of the entry (presumably usec, like the query bounds; confirm).
    commit_realtime: u64,
    /// Entry sequence number (presumably the journal seqnum; confirm at the producer).
    seqnum: u64,
}
2179
/// Outcome of one explorer-loop iteration.
/// NOTE(review): the variant meanings below are inferred from their names; confirm at the loop.
enum ExplorerLoopStep {
    /// End the scan.
    Stop,
    /// Move on without processing this entry.
    Skip,
    /// Process this entry.
    Row(ExplorerRowFrame),
}
2185
/// Which passes a combined scan feeds when it counts a row
/// (consumed by `record_combined_unsampled_row` and `update_combined_matched_stats`).
#[derive(Debug, Clone, Copy)]
struct CombinedScanMode {
    /// Count rows toward the main pass (`stats.rows_matched`).
    include_main: bool,
    /// Count rows toward the facet pass (`stats.facet_rows_matched`).
    include_facets: bool,
}
2191
/// A row scanned by the main pass, together with what is needed to finish processing it.
struct MainScannedRow<'a> {
    /// Identifier of the row within the scan.
    row_id: u64,
    /// Commit realtime of the entry.
    commit_realtime: u64,
    /// Realtime used for filtering and ordering.
    /// NOTE(review): presumably derived via `effective_realtime_from_scan`; confirm.
    effective_realtime: u64,
    /// Per-row scan results (FTS flags, source timestamp, ...).
    scan: RowScan,
    /// Value indices collected during the scan.
    /// NOTE(review): presumably filled through `ScanApply::Deferred`; confirm.
    deferred_values: &'a [usize],
}
2199
/// Mutable state shared across the data references of a single row scan.
struct RowScanState {
    /// `true` for `ExplorerFieldMode::FirstValue`: only the first value of each field counts.
    use_first_value: bool,
    /// The query has FTS terms or patterns to evaluate.
    needs_fts: bool,
    /// Debug mode: also collect the field names seen while traversing rows.
    collect_column_fields: bool,
    /// First-value mode only: the number of tracked fields not yet seen in this row.
    /// The scan may stop early once it reaches zero (see `should_stop_row_scan`).
    fields_missing_from_row: usize,
}
2206
2207impl RowScanState {
2208 fn new(query: &ExplorerQuery, accumulator: &ExplorerAccumulator) -> Self {
2209 let use_first_value = query.field_mode == ExplorerFieldMode::FirstValue;
2210 Self {
2211 use_first_value,
2212 needs_fts: query_has_fts(query),
2213 collect_column_fields: query.debug_collect_column_fields_by_row_traversal,
2214 fields_missing_from_row: if use_first_value {
2215 accumulator.required_identity_count
2216 } else {
2217 0
2218 },
2219 }
2220 }
2221
2222 fn should_stop_row_scan(&self) -> bool {
2223 self.use_first_value
2224 && !self.needs_fts
2225 && !self.collect_column_fields
2226 && self.fields_missing_from_row == 0
2227 }
2228}
2229
/// Sampler decision for a row.
/// NOTE(review): the meanings below are inferred from the variant names; confirm at the sampler.
enum SamplingRowAction {
    /// Scan the row.
    Scan,
    /// Do not scan the row.
    Skip,
    /// Stop the traversal.
    Stop,
}
2235
/// Candidate entries used by the indexed strategy's facet and histogram counting.
enum IndexedCandidateSet {
    /// Every entry is a candidate; `count` is the total number of entries.
    All {
        count: u64,
    },
    /// Only the entries at `offsets` are candidates.
    /// NOTE(review): `count` is presumably `offsets.len()`; confirm at construction.
    Set {
        count: u64,
        offsets: HashSet<NonZeroU64>,
    },
}
2245
2246impl IndexedCandidateSet {
2247 fn count(&self) -> u64 {
2248 match self {
2249 Self::All { count } | Self::Set { count, .. } => *count,
2250 }
2251 }
2252
2253 fn contains(&self, entry_offset: NonZeroU64) -> bool {
2254 match self {
2255 Self::All { .. } => true,
2256 Self::Set { offsets, .. } => offsets.contains(&entry_offset),
2257 }
2258 }
2259}
2260
/// Facets that are counted together in one pass.
struct FacetPassGroup {
    /// Field whose filter this group is keyed on. It is set when the facet is also a filter
    /// field and `exclude_facet_field_filters` is enabled; `None` for the ordinary group.
    /// NOTE(review): presumably that filter is not applied during this pass; confirm.
    excluded_field: Option<Vec<u8>>,
    /// Indices into `query.facets` of the facets in this group.
    facet_indices: Vec<usize>,
}
2265
2266fn facet_pass_groups(query: &ExplorerQuery) -> Vec<FacetPassGroup> {
2267 let filter_fields: HashSet<&[u8]> = query
2268 .filters
2269 .iter()
2270 .map(|filter| filter.field.as_slice())
2271 .collect();
2272 let mut groups: Vec<FacetPassGroup> = Vec::new();
2273
2274 for (index, facet) in query.facets.iter().enumerate() {
2275 let excluded_field = (query.exclude_facet_field_filters
2276 && filter_fields.contains(facet.as_slice()))
2277 .then(|| facet.clone());
2278 if let Some(existing) = groups
2279 .iter_mut()
2280 .find(|group| group.excluded_field.as_deref() == excluded_field.as_deref())
2281 {
2282 existing.facet_indices.push(index);
2283 } else {
2284 groups.push(FacetPassGroup {
2285 excluded_field,
2286 facet_indices: vec![index],
2287 });
2288 }
2289 }
2290
2291 groups
2292}
2293
2294fn indexed_count_facet_group(
2295 file: &JournalFile<Mmap>,
2296 query: &ExplorerQuery,
2297 group: &FacetPassGroup,
2298 candidates: &IndexedCandidateSet,
2299 result: &mut ExplorerResult,
2300) -> Result<()> {
2301 result.stats.facet_rows_matched = result
2302 .stats
2303 .facet_rows_matched
2304 .saturating_add(candidates.count());
2305
2306 for facet_index in &group.facet_indices {
2307 let Some(field) = query.facets.get(*facet_index) else {
2308 continue;
2309 };
2310 let mut values = HashMap::new();
2311 let mut rows_with_field = HashSet::new();
2312 let mut decompressed = Vec::new();
2313
2314 for item in file.field_data_objects_with_offsets(field)? {
2315 let (_, data) = item?;
2316 let Some((value, cursor)) =
2317 indexed_value_and_cursor(&data, field, &mut decompressed, &mut result.stats)?
2318 else {
2319 continue;
2320 };
2321 drop(data);
2322
2323 let count = indexed_count_facet_entries(
2324 file,
2325 cursor,
2326 candidates,
2327 &mut rows_with_field,
2328 &mut result.stats,
2329 )?;
2330 if count == 0 {
2331 continue;
2332 }
2333 increment_counter_by(&mut values, &value, count);
2334 result.stats.facet_updates = result.stats.facet_updates.saturating_add(count);
2335 }
2336
2337 let unset = candidates
2338 .count()
2339 .saturating_sub(rows_with_field.len() as u64);
2340 if unset != 0 {
2341 increment_counter_by(&mut values, UNSET_VALUE, unset);
2342 result.stats.facet_updates = result.stats.facet_updates.saturating_add(unset);
2343 }
2344 result.facets.insert(field.clone(), values);
2345 }
2346
2347 Ok(())
2348}
2349
2350fn indexed_count_histogram(
2351 file: &JournalFile<Mmap>,
2352 query: &ExplorerQuery,
2353 candidates: &IndexedCandidateSet,
2354 result: &mut ExplorerResult,
2355) -> Result<()> {
2356 let Some(histogram) = result.histogram.as_mut() else {
2357 return Ok(());
2358 };
2359 let field = histogram.field.clone();
2360 let mut decompressed = Vec::new();
2361 let mut rows_with_field = HashSet::new();
2362
2363 for item in file.field_data_objects_with_offsets(&field)? {
2364 let (_, data) = item?;
2365 let Some((value, cursor)) =
2366 indexed_value_and_cursor(&data, &field, &mut decompressed, &mut result.stats)?
2367 else {
2368 continue;
2369 };
2370 drop(data);
2371
2372 indexed_count_histogram_entries(
2373 file,
2374 cursor,
2375 candidates,
2376 &value,
2377 histogram,
2378 query,
2379 &mut rows_with_field,
2380 &mut result.stats,
2381 )?;
2382 }
2383
2384 indexed_count_histogram_unset_entries(
2385 file,
2386 candidates,
2387 &rows_with_field,
2388 histogram,
2389 query,
2390 &mut result.stats,
2391 )?;
2392
2393 Ok(())
2394}
2395
2396fn indexed_value_and_cursor(
2397 data: &DataObject<&[u8]>,
2398 field: &[u8],
2399 decompressed: &mut Vec<u8>,
2400 stats: &mut ExplorerStats,
2401) -> Result<Option<(Vec<u8>, Option<InlinedCursor>)>> {
2402 stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2403 stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2404 let payload = if data.is_compressed() {
2405 decompressed.clear();
2406 let len = data.decompress(decompressed)?;
2407 stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2408 &decompressed[..len]
2409 } else {
2410 data.raw_payload()
2411 };
2412
2413 let Some((payload_field, value)) = split_payload_bytes(payload) else {
2414 return Ok(None);
2415 };
2416 if payload_field != field {
2417 return Ok(None);
2418 }
2419 Ok(Some((value.to_vec(), data.inlined_cursor())))
2420}
2421
2422fn indexed_count_facet_entries(
2423 file: &JournalFile<Mmap>,
2424 cursor: Option<InlinedCursor>,
2425 candidates: &IndexedCandidateSet,
2426 rows_with_field: &mut HashSet<NonZeroU64>,
2427 stats: &mut ExplorerStats,
2428) -> Result<u64> {
2429 let mut count = 0u64;
2430 indexed_visit_entries(file, cursor, |entry_offset| {
2431 stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2432 if candidates.contains(entry_offset) {
2433 count = count.saturating_add(1);
2434 rows_with_field.insert(entry_offset);
2435 }
2436 Ok(())
2437 })?;
2438 Ok(count)
2439}
2440
2441fn indexed_count_histogram_entries(
2442 file: &JournalFile<Mmap>,
2443 cursor: Option<InlinedCursor>,
2444 candidates: &IndexedCandidateSet,
2445 value: &[u8],
2446 histogram: &mut ExplorerHistogram,
2447 query: &ExplorerQuery,
2448 rows_with_field: &mut HashSet<NonZeroU64>,
2449 stats: &mut ExplorerStats,
2450) -> Result<()> {
2451 let histogram_start = histogram
2452 .buckets
2453 .first()
2454 .map(|bucket| bucket.start_realtime_usec)
2455 .unwrap_or_default();
2456 let histogram_bucket_width = histogram
2457 .buckets
2458 .first()
2459 .map(|bucket| {
2460 bucket
2461 .end_realtime_usec
2462 .saturating_sub(bucket.start_realtime_usec)
2463 .max(1)
2464 })
2465 .unwrap_or(1);
2466 let histogram_bucket_count = histogram.buckets.len();
2467
2468 indexed_visit_entries(file, cursor, |entry_offset| {
2469 stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2470 if !candidates.contains(entry_offset) {
2471 return Ok(());
2472 }
2473 let entry = file.entry_ref(entry_offset)?;
2474 let realtime = entry.header.realtime;
2475 drop(entry);
2476 rows_with_field.insert(entry_offset);
2477 if !timestamp_in_range(query, realtime) {
2478 return Ok(());
2479 }
2480 let Some(bucket_index) = histogram_bucket_index_from_bounds(
2481 realtime,
2482 histogram_start,
2483 histogram_bucket_width,
2484 histogram_bucket_count,
2485 ) else {
2486 return Ok(());
2487 };
2488 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2489 increment_counter_by(&mut bucket.values, value, 1);
2490 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2491 }
2492 Ok(())
2493 })
2494}
2495
2496fn indexed_count_histogram_unset_entries(
2497 file: &JournalFile<Mmap>,
2498 candidates: &IndexedCandidateSet,
2499 rows_with_field: &HashSet<NonZeroU64>,
2500 histogram: &mut ExplorerHistogram,
2501 query: &ExplorerQuery,
2502 stats: &mut ExplorerStats,
2503) -> Result<()> {
2504 let histogram_start = histogram
2505 .buckets
2506 .first()
2507 .map(|bucket| bucket.start_realtime_usec)
2508 .unwrap_or_default();
2509 let histogram_bucket_width = histogram
2510 .buckets
2511 .first()
2512 .map(|bucket| {
2513 bucket
2514 .end_realtime_usec
2515 .saturating_sub(bucket.start_realtime_usec)
2516 .max(1)
2517 })
2518 .unwrap_or(1);
2519 let histogram_bucket_count = histogram.buckets.len();
2520
2521 let mut visit = |entry_offset: NonZeroU64| -> Result<()> {
2522 if rows_with_field.contains(&entry_offset) {
2523 return Ok(());
2524 }
2525 let entry = file.entry_ref(entry_offset)?;
2526 let realtime = entry.header.realtime;
2527 drop(entry);
2528 if !timestamp_in_range(query, realtime) {
2529 return Ok(());
2530 }
2531 let Some(bucket_index) = histogram_bucket_index_from_bounds(
2532 realtime,
2533 histogram_start,
2534 histogram_bucket_width,
2535 histogram_bucket_count,
2536 ) else {
2537 return Ok(());
2538 };
2539 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2540 increment_counter_by(&mut bucket.values, UNSET_VALUE, 1);
2541 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2542 }
2543 Ok(())
2544 };
2545
2546 match candidates {
2547 IndexedCandidateSet::All { .. } => {
2548 let mut entry_offsets = Vec::new();
2549 file.entry_offsets(&mut entry_offsets)?;
2550 for entry_offset in entry_offsets {
2551 visit(entry_offset)?;
2552 }
2553 }
2554 IndexedCandidateSet::Set { offsets, .. } => {
2555 for entry_offset in offsets {
2556 visit(*entry_offset)?;
2557 }
2558 }
2559 }
2560
2561 Ok(())
2562}
2563
2564fn indexed_visit_entries<F>(
2565 file: &JournalFile<Mmap>,
2566 cursor: Option<InlinedCursor>,
2567 mut visitor: F,
2568) -> Result<()>
2569where
2570 F: FnMut(NonZeroU64) -> Result<()>,
2571{
2572 let Some(mut cursor) = cursor.map(|cursor| cursor.head()) else {
2573 return Ok(());
2574 };
2575 let mut needle = NonZeroU64::MIN;
2576 while let Some(entry_offset) = cursor.next_until(file, needle)? {
2577 visitor(entry_offset)?;
2578 let Some(next) = entry_offset.get().checked_add(1).and_then(NonZeroU64::new) else {
2579 break;
2580 };
2581 needle = next;
2582 }
2583 Ok(())
2584}
2585
2586fn handle_row_offset_class(
2587 class: OffsetClass,
2588 accumulator: &mut ExplorerAccumulator,
2589 row_id: u64,
2590 state: &mut RowScanState,
2591 out: &mut RowScan,
2592 apply: &mut ScanApply<'_>,
2593 stats: &mut ExplorerStats,
2594) {
2595 match class {
2596 OffsetClass::Irrelevant => {
2597 stats.data_refs_skipped = stats.data_refs_skipped.saturating_add(1);
2598 }
2599 OffsetClass::FtsNegativeMatch => {
2600 out.fts_negative_match = true;
2601 }
2602 OffsetClass::FtsMatch => {
2603 out.fts_matches = true;
2604 }
2605 OffsetClass::Value(value_index) => {
2606 handle_row_value_class(value_index, accumulator, row_id, state, out, apply, stats)
2607 }
2608 }
2609}
2610
2611fn handle_row_value_class(
2612 value_index: usize,
2613 accumulator: &mut ExplorerAccumulator,
2614 row_id: u64,
2615 state: &mut RowScanState,
2616 out: &mut RowScan,
2617 apply: &mut ScanApply<'_>,
2618 stats: &mut ExplorerStats,
2619) {
2620 if accumulator.value_fts_matches[value_index] {
2621 out.fts_matches = true;
2622 }
2623 let field_index = accumulator.value_field_indices[value_index];
2624 let first_for_field = if state.use_first_value
2625 || accumulator.flags[field_index] & (FACET_PUBLIC | FACET_HISTOGRAM) != 0
2626 {
2627 accumulator.mark_field_seen(field_index, row_id)
2628 } else {
2629 true
2630 };
2631 if state.use_first_value && first_for_field {
2632 state.fields_missing_from_row = state.fields_missing_from_row.saturating_sub(1);
2633 }
2634 if !state.use_first_value || first_for_field {
2635 if let Some(timestamp) = accumulator.value_source_realtime[value_index] {
2636 out.timestamp = Some(timestamp);
2637 }
2638 match apply {
2639 ScanApply::Immediate => accumulator.apply_value(value_index, None, stats),
2640 ScanApply::Deferred(values) => values.push(value_index),
2641 }
2642 }
2643}
2644
2645fn record_row_scan_early_stop(stats: &mut ExplorerStats) {
2646 stats.early_stop_opportunities = stats.early_stop_opportunities.saturating_add(1);
2647 stats.early_stops = stats.early_stops.saturating_add(1);
2648}
2649
2650fn cached_offset_class_for_accumulator(
2651 file: &JournalFile<Mmap>,
2652 row: &mut CurrentRowView,
2653 data_offset: NonZeroU64,
2654 accumulator: &ExplorerAccumulator,
2655 column_fields: Option<&mut Vec<Vec<u8>>>,
2656 stats: &mut ExplorerStats,
2657) -> Result<Option<OffsetClass>> {
2658 let Some(class) = accumulator.offset_cache.lookup(data_offset) else {
2659 return Ok(None);
2660 };
2661 if let Some(column_fields) = column_fields {
2662 if let Some((field, _)) = read_payload_field(file, row, data_offset, stats)? {
2663 column_fields.push(field);
2664 }
2665 }
2666 stats.data_cache_hits = stats.data_cache_hits.saturating_add(1);
2667 Ok(Some(class))
2668}
2669
2670fn payload_for_classification<'a>(
2671 file: &JournalFile<Mmap>,
2672 row: &'a mut CurrentRowView,
2673 data_offset: NonZeroU64,
2674 stats: &mut ExplorerStats,
2675) -> Result<&'a [u8]> {
2676 stats.data_cache_misses = stats.data_cache_misses.saturating_add(1);
2677 stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2678 let was_compressed = file.data_ref(data_offset)?.is_compressed();
2679 let payload = row.read_payload_at(file, data_offset)?;
2680 if was_compressed {
2681 stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2682 }
2683 Ok(row.payload_slice(payload))
2684}
2685
2686fn fts_flags_for_value(
2687 value: &[u8],
2688 needs_fts: bool,
2689 query: &ExplorerQuery,
2690 stats: &mut ExplorerStats,
2691) -> (bool, bool) {
2692 if !needs_fts {
2693 return (false, false);
2694 }
2695 stats.fts_scans = stats.fts_scans.saturating_add(1);
2696 match match_fts_query(value, query) {
2697 FtsTermMatch::Positive => (true, false),
2698 FtsTermMatch::Negative => (false, true),
2699 FtsTermMatch::None => (false, false),
2700 }
2701}
2702
2703fn structured_payload_class(
2704 field: &[u8],
2705 value: &[u8],
2706 data_offset: NonZeroU64,
2707 accumulator: &mut ExplorerAccumulator,
2708 fts_matches: bool,
2709 fts_negative_match: bool,
2710) -> OffsetClass {
2711 if fts_negative_match {
2712 OffsetClass::FtsNegativeMatch
2713 } else if let Some(field_index) = accumulator.field_lookup.get(field).copied() {
2714 OffsetClass::Value(accumulator.add_value(field_index, data_offset, value, fts_matches))
2715 } else if fts_matches {
2716 OffsetClass::FtsMatch
2717 } else {
2718 OffsetClass::Irrelevant
2719 }
2720}
2721
2722fn classify_data_for_accumulator(
2723 file: &JournalFile<Mmap>,
2724 row: &mut CurrentRowView,
2725 data_offset: NonZeroU64,
2726 accumulator: &mut ExplorerAccumulator,
2727 needs_fts: bool,
2728 query: &ExplorerQuery,
2729 mut column_fields: Option<&mut Vec<Vec<u8>>>,
2730 stats: &mut ExplorerStats,
2731) -> Result<OffsetClass> {
2732 if let Some(class) = cached_offset_class_for_accumulator(
2733 file,
2734 row,
2735 data_offset,
2736 accumulator,
2737 column_fields.as_mut().map(|fields| &mut **fields),
2738 stats,
2739 )? {
2740 return Ok(class);
2741 }
2742
2743 let payload = payload_for_classification(file, row, data_offset, stats)?;
2744 let Some((field, value)) = split_payload_bytes(payload) else {
2745 let class = classify_unstructured_payload(payload, needs_fts, query, stats);
2746 accumulator.offset_cache.insert(data_offset, class);
2747 stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2748 return Ok(class);
2749 };
2750 if let Some(column_fields) = column_fields {
2751 column_fields.push(field.to_vec());
2752 }
2753
2754 let (fts_matches, fts_negative_match) = fts_flags_for_value(value, needs_fts, query, stats);
2755 let class = structured_payload_class(
2756 field,
2757 value,
2758 data_offset,
2759 accumulator,
2760 fts_matches,
2761 fts_negative_match,
2762 );
2763 accumulator.offset_cache.insert(data_offset, class);
2764 stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2765 Ok(class)
2766}
2767
2768fn read_payload_field(
2769 file: &JournalFile<Mmap>,
2770 row: &mut CurrentRowView,
2771 data_offset: NonZeroU64,
2772 stats: &mut ExplorerStats,
2773) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
2774 let was_compressed = file.data_ref(data_offset)?.is_compressed();
2775 let payload = row.read_payload_at(file, data_offset)?;
2776 if was_compressed {
2777 stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2778 }
2779 let payload = row.payload_slice(payload);
2780 Ok(split_payload_bytes(payload).map(|(field, value)| (field.to_vec(), value.to_vec())))
2781}
2782
2783fn classify_unstructured_payload(
2784 payload: &[u8],
2785 needs_fts: bool,
2786 query: &ExplorerQuery,
2787 stats: &mut ExplorerStats,
2788) -> OffsetClass {
2789 if !needs_fts {
2790 return OffsetClass::Irrelevant;
2791 }
2792 stats.fts_scans = stats.fts_scans.saturating_add(1);
2793 match match_fts_query(payload, query) {
2794 FtsTermMatch::Positive => OffsetClass::FtsMatch,
2795 FtsTermMatch::Negative => OffsetClass::FtsNegativeMatch,
2796 FtsTermMatch::None => OffsetClass::Irrelevant,
2797 }
2798}
2799
/// Maps a timestamp onto a bucket of an evenly spaced histogram.
///
/// - Timestamps before `start_realtime_usec` land in bucket 0.
/// - Timestamps past the last bucket are clamped into the last one.
/// - A zero `bucket_width_usec` is treated as 1.
///
/// Returns `None` only when `bucket_count` is zero.
fn histogram_bucket_index_from_bounds(
    realtime_usec: u64,
    start_realtime_usec: u64,
    bucket_width_usec: u64,
    bucket_count: usize,
) -> Option<usize> {
    let last_index = bucket_count.checked_sub(1)?;
    let offset = realtime_usec.saturating_sub(start_realtime_usec);
    let index = offset / bucket_width_usec.max(1);
    // Clamp before narrowing. A bare `index as usize` truncates on 32-bit targets,
    // which could wrap a far-away timestamp into an arbitrary bucket.
    Some(usize::try_from(index).map_or(last_index, |index| index.min(last_index)))
}
2814
2815fn validate_query(query: &ExplorerQuery) -> Result<()> {
2816 if query
2817 .after_realtime_usec
2818 .zip(query.before_realtime_usec)
2819 .is_some_and(|(after, before)| after > before)
2820 {
2821 return Err(SdkError::InvalidPath(
2822 "after_realtime_usec must be <= before_realtime_usec".to_string(),
2823 ));
2824 }
2825 for filter in &query.filters {
2826 if filter.field.is_empty() || filter.field.contains(&b'=') {
2827 return Err(SdkError::InvalidPath(
2828 "filter field must be non-empty and must not contain '='".to_string(),
2829 ));
2830 }
2831 }
2832 for field in query.facets.iter().chain(query.histogram.iter()) {
2833 if field.is_empty() || field.contains(&b'=') {
2834 return Err(SdkError::InvalidPath(
2835 "facet and histogram fields must be non-empty and must not contain '='".to_string(),
2836 ));
2837 }
2838 }
2839 let mut seen_facets: HashSet<&[u8]> = HashSet::new();
2840 for facet in &query.facets {
2841 if !seen_facets.insert(facet) {
2842 return Err(SdkError::InvalidPath(
2843 "facet fields must not be duplicated".to_string(),
2844 ));
2845 }
2846 }
2847 Ok(())
2848}
2849
2850fn validate_no_debug_column_collection(query: &ExplorerQuery) -> Result<()> {
2851 if query.debug_collect_column_fields_by_row_traversal {
2852 return Err(SdkError::Unsupported(
2853 "debug_collect_column_fields_by_row_traversal is a debug-only discrepancy tool; production explorer queries must use FIELD-index column catalogs instead",
2854 ));
2855 }
2856 Ok(())
2857}
2858
2859fn validate_indexed_query(query: &ExplorerQuery) -> Result<()> {
2860 if query.field_mode != ExplorerFieldMode::AllValues {
2861 return Err(SdkError::Unsupported(
2862 "indexed explorer strategy requires ExplorerFieldMode::AllValues",
2863 ));
2864 }
2865 if query_has_fts(query) {
2866 return Err(SdkError::Unsupported(
2867 "indexed explorer strategy does not support FTS",
2868 ));
2869 }
2870 if query.use_source_realtime
2871 && (query.after_realtime_usec.is_some()
2872 || query.before_realtime_usec.is_some()
2873 || query.histogram.is_some())
2874 {
2875 return Err(SdkError::Unsupported(
2876 "indexed explorer strategy requires commit realtime for time-bounded facets and histograms",
2877 ));
2878 }
2879 Ok(())
2880}
2881
2882fn explorer_outputs_match(left: &ExplorerResult, right: &ExplorerResult) -> bool {
2883 if left.rows.len() != right.rows.len() {
2884 return false;
2885 }
2886 if left.rows.iter().zip(&right.rows).any(|(left, right)| {
2887 left.realtime_usec != right.realtime_usec
2888 || left.cursor != right.cursor
2889 || left.payloads != right.payloads
2890 }) {
2891 return false;
2892 }
2893 if left.facets != right.facets {
2894 return false;
2895 }
2896 explorer_histograms_match(left.histogram.as_ref(), right.histogram.as_ref())
2897}
2898
2899fn explorer_histograms_match(
2900 left: Option<&ExplorerHistogram>,
2901 right: Option<&ExplorerHistogram>,
2902) -> bool {
2903 match (left, right) {
2904 (None, None) => true,
2905 (Some(left), Some(right)) => {
2906 left.field == right.field
2907 && left.buckets.len() == right.buckets.len()
2908 && left
2909 .buckets
2910 .iter()
2911 .zip(&right.buckets)
2912 .all(|(left, right)| {
2913 left.start_realtime_usec == right.start_realtime_usec
2914 && left.end_realtime_usec == right.end_realtime_usec
2915 && left.values == right.values
2916 })
2917 }
2918 _ => false,
2919 }
2920}
2921
2922fn query_needs_source_realtime_main(query: &ExplorerQuery) -> bool {
2923 query.use_source_realtime
2924 && (query.after_realtime_usec.is_some()
2925 || query.before_realtime_usec.is_some()
2926 || query.histogram.is_some()
2927 || query.limit > 0)
2928}
2929
2930fn facet_pass_needs_source_realtime(query: &ExplorerQuery) -> bool {
2931 query.use_source_realtime
2932 && (query.after_realtime_usec.is_some() || query.before_realtime_usec.is_some())
2933}
2934
2935fn query_needs_main_pass(query: &ExplorerQuery) -> bool {
2936 query.limit > 0 || query.histogram.is_some()
2937}
2938
2939fn explorer_result_for_query(query: &ExplorerQuery) -> ExplorerResult {
2940 ExplorerResult {
2941 histogram: query
2942 .histogram
2943 .as_ref()
2944 .map(|field| new_histogram(field, query)),
2945 ..ExplorerResult::default()
2946 }
2947}
2948
2949fn explorer_control_stopped(control: Option<&ExplorerControl<'_>>) -> bool {
2950 control.and_then(ExplorerControl::stop_reason).is_some()
2951}
2952
2953fn can_run_combined_explorer_pass(facet_groups: &[FacetPassGroup]) -> bool {
2954 facet_groups
2955 .iter()
2956 .all(|group| group.excluded_field.is_none())
2957}
2958
2959fn combined_facet_indices(facet_groups: &[FacetPassGroup]) -> Vec<usize> {
2960 facet_groups
2961 .iter()
2962 .flat_map(|group| group.facet_indices.iter().copied())
2963 .collect()
2964}
2965
2966fn record_combined_unsampled_row(
2967 stats: &mut ExplorerStats,
2968 mode: CombinedScanMode,
2969 commit_realtime: u64,
2970 row_count: u64,
2971 count_rows_unsampled: bool,
2972) {
2973 record_last_realtime(stats, commit_realtime);
2974 if mode.include_main {
2975 stats.rows_matched = stats.rows_matched.saturating_add(row_count);
2976 }
2977 if mode.include_facets {
2978 stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(row_count);
2979 }
2980 if count_rows_unsampled {
2981 stats.rows_unsampled = stats.rows_unsampled.saturating_add(row_count);
2982 }
2983 stats.sampling_unsampled = stats.sampling_unsampled.saturating_add(1);
2984}
2985
2986fn update_combined_matched_stats(
2987 stats: &mut ExplorerStats,
2988 mode: CombinedScanMode,
2989 effective_realtime: u64,
2990 control: Option<&mut ExplorerControl<'_>>,
2991) -> bool {
2992 let mut stop_after_matched_row = false;
2993 if mode.include_main {
2994 stats.rows_matched = stats.rows_matched.saturating_add(1);
2995 stop_after_matched_row = control
2996 .map(|control| control.emit_matched_row(effective_realtime, stats.rows_matched))
2997 .unwrap_or(false);
2998 }
2999 if mode.include_facets {
3000 stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
3001 }
3002 stop_after_matched_row
3003}
3004
3005fn should_stop_when_rows_full(
3006 query: &ExplorerQuery,
3007 rows: &[ExplorerRow],
3008 effective_realtime: u64,
3009 rows_matched: u64,
3010) -> bool {
3011 if !query.stop_when_rows_full || query.limit == 0 || rows.len() < query.limit {
3012 return false;
3013 }
3014 let every = query.stop_when_rows_full_check_every.max(1);
3015 if rows_matched == 0 || rows_matched % every != 0 {
3016 return false;
3017 }
3018 match query.direction {
3019 Direction::Backward => {
3020 rows.iter()
3021 .map(|row| row.realtime_usec)
3022 .min()
3023 .is_some_and(|oldest| {
3024 effective_realtime < oldest.saturating_sub(query.realtime_slack_usec)
3025 })
3026 }
3027 Direction::Forward => {
3028 rows.iter()
3029 .map(|row| row.realtime_usec)
3030 .max()
3031 .is_some_and(|newest| {
3032 effective_realtime > newest.saturating_add(query.realtime_slack_usec)
3033 })
3034 }
3035 }
3036}
3037
3038fn row_candidate_to_keep(query: &ExplorerQuery, rows: &[ExplorerRow], realtime_usec: u64) -> bool {
3039 if query.limit == 0 {
3040 return false;
3041 }
3042 if !row_within_anchor(query, realtime_usec) {
3043 return false;
3044 }
3045 if rows.len() < query.limit {
3046 return true;
3047 }
3048 match query.direction {
3049 Direction::Backward => rows
3050 .iter()
3051 .map(|row| row.realtime_usec)
3052 .min()
3053 .is_some_and(|oldest| realtime_usec >= oldest),
3054 Direction::Forward => rows
3055 .iter()
3056 .map(|row| row.realtime_usec)
3057 .max()
3058 .is_some_and(|newest| realtime_usec <= newest),
3059 }
3060}
3061
3062fn row_within_anchor(query: &ExplorerQuery, realtime_usec: u64) -> bool {
3063 match (query.direction, query.anchor) {
3064 (Direction::Forward, ExplorerAnchor::Realtime(anchor)) => realtime_usec > anchor,
3065 (Direction::Backward, ExplorerAnchor::Realtime(anchor)) => realtime_usec <= anchor,
3066 _ => true,
3067 }
3068}
3069
3070fn add_special_histogram_value(
3071 histogram: Option<&mut ExplorerHistogram>,
3072 realtime_usec: u64,
3073 value: &[u8],
3074 count: u64,
3075 stats: &mut ExplorerStats,
3076) {
3077 let Some(histogram) = histogram else {
3078 return;
3079 };
3080 let Some(bucket_index) = histogram_bucket_index(histogram, realtime_usec) else {
3081 return;
3082 };
3083 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
3084 increment_counter_by(&mut bucket.values, value, count);
3085 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
3086 }
3087}
3088
3089fn add_estimated_histogram_range(
3090 histogram: Option<&mut ExplorerHistogram>,
3091 from_realtime_usec: u64,
3092 to_realtime_usec: u64,
3093 entries: u64,
3094 stats: &mut ExplorerStats,
3095) {
3096 let Some(histogram) = histogram else {
3097 return;
3098 };
3099 if entries == 0 || from_realtime_usec >= to_realtime_usec {
3100 return;
3101 }
3102
3103 let Some(first) = histogram.buckets.first() else {
3104 return;
3105 };
3106 let Some(last) = histogram.buckets.last() else {
3107 return;
3108 };
3109 let from_realtime_usec = from_realtime_usec.max(first.start_realtime_usec);
3110 let to_realtime_usec = to_realtime_usec.min(last.end_realtime_usec);
3111 if from_realtime_usec >= to_realtime_usec {
3112 return;
3113 }
3114
3115 let total = to_realtime_usec.saturating_sub(from_realtime_usec).max(1);
3116 let mut touched = 0u64;
3117 for bucket in &mut histogram.buckets {
3118 if bucket.start_realtime_usec > to_realtime_usec {
3119 break;
3120 }
3121 let overlap_start = bucket.start_realtime_usec.max(from_realtime_usec);
3122 let overlap_end = bucket.end_realtime_usec.min(to_realtime_usec);
3123 if overlap_start >= overlap_end {
3124 continue;
3125 }
3126 let bucket_entries = ((overlap_end.saturating_sub(overlap_start) as u128 * entries as u128)
3127 / total as u128) as u64;
3128 if bucket_entries != 0 {
3129 increment_counter_by(&mut bucket.values, EXPLORER_ESTIMATED_VALUE, bucket_entries);
3130 }
3131 touched = touched.saturating_add(1);
3132 }
3133 stats.histogram_updates = stats.histogram_updates.saturating_add(touched);
3134}
3135
3136fn histogram_bucket_index(histogram: &ExplorerHistogram, realtime_usec: u64) -> Option<usize> {
3137 let first = histogram.buckets.first()?;
3138 let width = first
3139 .end_realtime_usec
3140 .saturating_sub(first.start_realtime_usec)
3141 .max(1);
3142 histogram_bucket_index_from_bounds(
3143 realtime_usec,
3144 first.start_realtime_usec,
3145 width,
3146 histogram.buckets.len(),
3147 )
3148}
3149
/// Joins a field and value into a journal payload of the form `FIELD=value`.
fn payload_from_parts(field: &[u8], value: &[u8]) -> Vec<u8> {
    [field, &b"="[..], value].concat()
}
3157
/// Splits a `FIELD=value` payload at its first `=`.
///
/// The value may itself contain `=`. Returns `None` when the payload has no `=` at all.
fn split_payload_bytes(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let mut parts = payload.splitn(2, |byte| *byte == b'=');
    let field = parts.next()?;
    let value = parts.next()?;
    Some((field, value))
}
3162
/// Parses a `_SOURCE_REALTIME_TIMESTAMP` value as a decimal `u64`.
///
/// Returns `None` when the bytes are not UTF-8 or not a valid number.
fn parse_source_realtime(value: &[u8]) -> Option<u64> {
    let text = std::str::from_utf8(value).ok()?;
    text.parse::<u64>().ok()
}
3166
/// Returns the timestamp a row is judged by.
///
/// A non-zero source timestamp earlier than the commit time is used; anything else
/// falls back to the commit time. The result therefore never exceeds
/// `commit_realtime`.
fn effective_realtime_from_scan(source_realtime: Option<u64>, commit_realtime: u64) -> u64 {
    source_realtime
        .filter(|&source| source != 0 && source < commit_realtime)
        .unwrap_or(commit_realtime)
}
3175
3176fn record_last_realtime(stats: &mut ExplorerStats, commit_realtime: u64) {
3177 if commit_realtime > stats.last_realtime_usec {
3178 stats.last_realtime_usec = commit_realtime;
3179 }
3180}
3181
3182fn record_source_realtime_delta(
3183 stats: &mut ExplorerStats,
3184 source_realtime: Option<u64>,
3185 commit_realtime: u64,
3186) {
3187 let Some(source_realtime) = source_realtime else {
3188 return;
3189 };
3190 if source_realtime == 0 || source_realtime >= commit_realtime {
3191 return;
3192 }
3193 let delta = commit_realtime.saturating_sub(source_realtime);
3194 if delta > stats.max_source_realtime_delta_usec {
3195 stats.max_source_realtime_delta_usec = delta;
3196 }
3197}
3198
3199fn query_has_fts(query: &ExplorerQuery) -> bool {
3200 !query.fts_terms.is_empty()
3201 || !query.fts_patterns.is_empty()
3202 || !query.fts_negative_patterns.is_empty()
3203}
3204
3205fn query_has_positive_fts(query: &ExplorerQuery) -> bool {
3206 if !query.fts_terms.is_empty() {
3207 query.fts_terms.iter().any(|term| !term.negative)
3208 } else {
3209 !query.fts_patterns.is_empty()
3210 }
3211}
3212
3213fn row_rejected_by_fts(query: &ExplorerQuery, scan: &RowScan) -> bool {
3214 query_has_fts(query)
3215 && (scan.fts_negative_match || query_has_positive_fts(query) && !scan.fts_matches)
3216}
3217
/// Result of testing one payload or value against the query's FTS terms and patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FtsTermMatch {
    /// Nothing matched.
    None,
    /// A positive term or pattern matched.
    Positive,
    /// A negative term or pattern matched.
    Negative,
}
3224
3225fn match_fts_query(value: &[u8], query: &ExplorerQuery) -> FtsTermMatch {
3226 if !query.fts_terms.is_empty() {
3227 for term in &query.fts_terms {
3228 if term.matches(value) {
3229 return if term.negative {
3230 FtsTermMatch::Negative
3231 } else {
3232 FtsTermMatch::Positive
3233 };
3234 }
3235 }
3236 return FtsTermMatch::None;
3237 }
3238
3239 if matches_fts(value, &query.fts_negative_patterns) {
3240 FtsTermMatch::Negative
3241 } else if matches_fts(value, &query.fts_patterns) {
3242 FtsTermMatch::Positive
3243 } else {
3244 FtsTermMatch::None
3245 }
3246}
3247
3248fn matches_fts(value: &[u8], patterns: &[Vec<u8>]) -> bool {
3249 patterns
3250 .iter()
3251 .filter(|pattern| !pattern.is_empty())
3252 .any(|pattern| contains_ascii_case_insensitive(value, pattern))
3253}
3254
/// ASCII-case-insensitive substring test. An empty needle is always found.
fn contains_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        0 => true,
        len if len > haystack.len() => false,
        len => haystack
            .windows(len)
            .any(|window| window.eq_ignore_ascii_case(needle)),
    }
}
3269
/// Returns the byte offset of the first occurrence of `needle` in `haystack`,
/// comparing ASCII-case-insensitively. Non-ASCII bytes must match exactly.
///
/// An empty `needle` is found at offset 0. Returns `None` when there is no
/// occurrence.
fn find_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    // `windows(0)` panics, so the empty needle is answered up front.
    if needle.is_empty() {
        return Some(0);
    }
    if haystack.len() < needle.len() {
        return None;
    }
    haystack
        .windows(needle.len())
        .position(|window| window.eq_ignore_ascii_case(needle))
}
3284
3285fn timestamp_in_range(query: &ExplorerQuery, timestamp: u64) -> bool {
3286 if query
3287 .after_realtime_usec
3288 .is_some_and(|after| timestamp < after)
3289 {
3290 return false;
3291 }
3292 if query
3293 .before_realtime_usec
3294 .is_some_and(|before| timestamp > before)
3295 {
3296 return false;
3297 }
3298 true
3299}
3300
3301fn stop_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3302 match query.direction {
3306 Direction::Forward => query.before_realtime_usec.is_some_and(|before| {
3307 commit_realtime > before.saturating_add(query.realtime_slack_usec)
3308 }),
3309 Direction::Backward => query
3310 .after_realtime_usec
3311 .is_some_and(|after| commit_realtime < after),
3312 }
3313}
3314
3315fn skip_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3316 match query.direction {
3317 Direction::Forward => query
3318 .after_realtime_usec
3319 .is_some_and(|after| commit_realtime < after),
3320 Direction::Backward => query.before_realtime_usec.is_some_and(|before| {
3321 commit_realtime > before.saturating_add(query.realtime_slack_usec)
3322 }),
3323 }
3324}
3325
3326fn new_histogram(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
3327 let (start, end) = histogram_bounds(query);
3328 let target_buckets = query.histogram_target_buckets.max(1);
3329 let mut width = histogram_bar_width_usec(start, end, target_buckets);
3330 let start = histogram_slot_baseline_usec(start, width);
3331 let mut end = histogram_slot_baseline_usec(end, width).saturating_add(width);
3332 let mut bucket_count = end
3333 .saturating_sub(start)
3334 .checked_div(width)
3335 .unwrap_or(0)
3336 .saturating_add(1) as usize;
3337 if bucket_count > 1001 {
3338 bucket_count = 1001;
3339 width = end
3340 .saturating_sub(start)
3341 .checked_div(1000)
3342 .unwrap_or(0)
3343 .max(1);
3344 end = start.saturating_add(width.saturating_mul(1000));
3345 }
3346 let mut buckets = Vec::with_capacity(bucket_count);
3347 for index in 0..bucket_count {
3348 let bucket_start = start.saturating_add(width.saturating_mul(index as u64));
3349 let bucket_end = if index + 1 == bucket_count {
3350 end.saturating_add(1)
3351 } else {
3352 bucket_start.saturating_add(width)
3353 };
3354 buckets.push(ExplorerHistogramBucket {
3355 start_realtime_usec: bucket_start,
3356 end_realtime_usec: bucket_end,
3357 values: HashMap::new(),
3358 });
3359 }
3360 ExplorerHistogram {
3361 field: field.to_vec(),
3362 buckets,
3363 }
3364}
3365
/// Returns a histogram for `field` whose buckets are laid out for the query's
/// histogram window but hold no counted values yet.
///
/// Crate-visible entry point that forwards to the module-private `new_histogram`.
pub(crate) fn empty_histogram_for_query(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
    new_histogram(field, query)
}
3369
/// Picks the histogram bar width, in microseconds, for the window `[after, before]`.
///
/// Candidates are a fixed ladder of round durations from 1 second up to 30 days.
/// The widest candidate that still fits at least `target_buckets` whole bars in
/// the window wins. If none does, a 1-second width is used.
fn histogram_bar_width_usec(after: u64, before: u64, target_buckets: usize) -> u64 {
    const USEC_PER_SEC: u64 = 1_000_000;
    const VALID_DURATIONS_SECONDS: &[u64] = &[
        1, 2, 5, 10, 15, 30, 60, 120, 180, 300, 600, 900, 1800, 3600, 7200, 21600, 28800, 43200,
        86400, 172800, 259200, 432000, 604800, 1209600, 2592000,
    ];
    let span = before.saturating_sub(after);
    let wanted_bars = target_buckets as u64;
    VALID_DURATIONS_SECONDS
        .iter()
        .rev()
        .map(|&secs| secs.saturating_mul(USEC_PER_SEC))
        .find(|&width| width != 0 && span / width >= wanted_bars)
        .unwrap_or(USEC_PER_SEC)
}
3385
/// Rounds `value` down to the nearest multiple of `width`.
///
/// A zero `width` is treated as 1, so `value` comes back unchanged.
fn histogram_slot_baseline_usec(value: u64, width: u64) -> u64 {
    let step = width.max(1);
    (value / step) * step
}
3389
3390fn histogram_bounds(query: &ExplorerQuery) -> (u64, u64) {
3391 let start = query
3392 .histogram_after_realtime_usec
3393 .or(query.after_realtime_usec)
3394 .unwrap_or(0);
3395 let end = query
3396 .histogram_before_realtime_usec
3397 .or(query.before_realtime_usec)
3398 .unwrap_or_else(|| start.saturating_add(3_600_000_000));
3399 if end <= start {
3400 (start, start.saturating_add(1))
3401 } else {
3402 (start, end)
3403 }
3404}
3405
/// Adds `delta` to the counter stored under `value`, creating the counter with
/// `delta` if it is absent. Existing counters saturate at `u64::MAX`.
///
/// The lookup uses the borrowed key, so the key is only copied into an owned
/// `Vec` when a new entry has to be inserted.
fn increment_counter_by(map: &mut HashMap<Vec<u8>, u64>, value: &[u8], delta: u64) {
    match map.get_mut(value) {
        Some(count) => *count = count.saturating_add(delta),
        None => {
            map.insert(value.to_vec(), delta);
        }
    }
}
3413
3414#[cfg(test)]
3415mod tests {
3416 use super::*;
3417 use journal_core::file::{JournalFileOptions, JournalWriter, MmapMut};
3418 use journal_core::repository::File as RepoFile;
3419 use tempfile::TempDir;
3420
3421 fn test_uuid(seed: u8) -> uuid::Uuid {
3422 uuid::Uuid::from_bytes([seed; 16])
3423 }
3424
3425 fn create_writer(
3426 path: &std::path::Path,
3427 compression: Option<(Compression, usize)>,
3428 ) -> (JournalFile<MmapMut>, JournalWriter) {
3429 if let Some(parent) = path.parent() {
3430 std::fs::create_dir_all(parent).expect("create journal parent");
3431 }
3432 let repo_file = RepoFile::from_path(path).expect("repo file");
3433 let mut options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
3434 if let Some((compression, threshold)) = compression {
3435 options = options
3436 .with_compression(compression)
3437 .with_compress_threshold(threshold);
3438 }
3439 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
3440 let writer = if let Some((compression, threshold)) = compression {
3441 JournalWriter::new_with_compression(&mut file, 1, test_uuid(4), compression, threshold)
3442 .expect("writer")
3443 } else {
3444 JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer")
3445 };
3446 (file, writer)
3447 }
3448
3449 fn write_entries(
3450 path: &std::path::Path,
3451 compression: Option<(Compression, usize)>,
3452 entries: &[(&[&[u8]], u64)],
3453 ) {
3454 let (mut file, mut writer) = create_writer(path, compression);
3455 for (payloads, realtime) in entries {
3456 writer
3457 .add_entry(&mut file, payloads, *realtime, *realtime)
3458 .expect("write entry");
3459 }
3460 file.sync().expect("sync journal");
3461 }
3462
3463 fn write_many_entries(path: &std::path::Path, count: usize) {
3464 let (mut file, mut writer) = create_writer(path, None);
3465 for index in 0..count {
3466 let message = format!("MESSAGE=row-{index}");
3467 let service = if index % 2 == 0 {
3468 b"SERVICE=even".as_slice()
3469 } else {
3470 b"SERVICE=odd".as_slice()
3471 };
3472 let payloads: [&[u8]; 2] = [message.as_bytes(), service];
3473 let realtime = 1_700_000_000_000_000u64.saturating_add(index as u64);
3474 writer
3475 .add_entry(&mut file, &payloads, realtime, realtime)
3476 .expect("write entry");
3477 }
3478 file.sync().expect("sync journal");
3479 }
3480
3481 #[test]
3482 fn explorer_control_reports_progress_during_large_scan() {
3483 let dir = TempDir::new().expect("tempdir");
3484 let path = dir.path().join("progress.journal");
3485 write_many_entries(&path, 9_000);
3486
3487 let mut reports = Vec::new();
3488 let mut progress = |progress: ExplorerProgress| {
3489 reports.push(progress.stats.rows_examined);
3490 };
3491 let mut control = ExplorerControl::new();
3492 control.set_progress_interval(Duration::ZERO);
3493 control.set_progress_callback(Some(&mut progress));
3494 let mut reader = FileReader::open(&path).expect("open reader");
3495 let query = ExplorerQuery {
3496 facets: vec![b"SERVICE".to_vec()],
3497 limit: 0,
3498 ..ExplorerQuery::default()
3499 };
3500
3501 let result = reader
3502 .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3503 .expect("explore");
3504
3505 assert_eq!(control.stop_reason(), None);
3506 assert_eq!(result.stats.rows_examined, 9_000);
3507 assert!(!reports.is_empty());
3508 assert!(reports.iter().any(|rows| *rows >= 8_191));
3509 }
3510
3511 #[test]
3512 fn explorer_control_cancels_inside_large_scan() {
3513 let dir = TempDir::new().expect("tempdir");
3514 let path = dir.path().join("cancel.journal");
3515 write_many_entries(&path, 9_000);
3516
3517 let is_cancelled = || true;
3518 let mut control = ExplorerControl::new();
3519 control.set_cancellation_callback(Some(&is_cancelled));
3520 let mut reader = FileReader::open(&path).expect("open reader");
3521 let query = ExplorerQuery {
3522 facets: vec![b"SERVICE".to_vec()],
3523 limit: 0,
3524 ..ExplorerQuery::default()
3525 };
3526
3527 let result = reader
3528 .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3529 .expect("explore");
3530
3531 assert_eq!(control.stop_reason(), Some(ExplorerStopReason::Cancelled));
3532 assert!(result.stats.rows_examined < 9_000);
3533 }
3534
3535 #[test]
3536 fn explorer_filters_with_or_values_and_and_fields() {
3537 let dir = TempDir::new().expect("tempdir");
3538 let path = dir.path().join("filter.journal");
3539 write_entries(
3540 &path,
3541 None,
3542 &[
3543 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3544 (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3545 (&[b"SERVICE=b", b"PRIORITY=4"], 3_000),
3546 ],
3547 );
3548
3549 let mut reader = FileReader::open(&path).expect("open reader");
3550 let query = ExplorerQuery {
3551 filters: vec![
3552 ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec(), b"b".to_vec()]),
3553 ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3554 ],
3555 facets: vec![b"SERVICE".to_vec()],
3556 limit: 10,
3557 ..ExplorerQuery::default()
3558 };
3559
3560 let result = reader.explore(&query).expect("explore");
3561 assert_eq!(result.rows.len(), 2);
3562 let service = result
3563 .facets
3564 .get(b"SERVICE".as_slice())
3565 .expect("service facet");
3566 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3567 assert_eq!(service.get(b"b".as_slice()), Some(&1));
3568 assert!(result.stats.data_cache_misses > 0);
3569 }
3570
3571 #[test]
3572 fn explorer_rejects_debug_row_traversal_column_collection() {
3573 let dir = TempDir::new().expect("tempdir");
3574 let path = dir.path().join("debug-column-collection.journal");
3575 write_entries(&path, None, &[(&[b"PRIORITY=3", b"MESSAGE=hello"], 1_000)]);
3576
3577 let query = ExplorerQuery {
3578 facets: vec![b"PRIORITY".to_vec()],
3579 debug_collect_column_fields_by_row_traversal: true,
3580 ..ExplorerQuery::default()
3581 };
3582
3583 let mut reader = FileReader::open(&path).expect("open reader");
3584 let err = reader
3585 .explore(&query)
3586 .expect_err("debug-only column collection is rejected");
3587 assert!(matches!(err, SdkError::Unsupported(_)));
3588 assert!(
3589 err.to_string()
3590 .contains("debug_collect_column_fields_by_row_traversal")
3591 );
3592
3593 let mut reader = FileReader::open(&path).expect("reopen reader");
3594 let err = reader
3595 .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3596 .expect_err("cursor-row explorer also rejects debug-only column collection");
3597 assert!(matches!(err, SdkError::Unsupported(_)));
3598 }
3599
3600 #[test]
3601 fn explorer_skips_irrelevant_compressed_data_for_facets() {
3602 let dir = TempDir::new().expect("tempdir");
3603 let path = dir.path().join("compressed.journal");
3604 let large_message = b"MESSAGE=abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
3605 write_entries(
3606 &path,
3607 Some((Compression::Zstd, 32)),
3608 &[(&[b"PRIORITY=3", large_message], 1_000)],
3609 );
3610
3611 let mut reader = FileReader::open(&path).expect("open reader");
3612 let query = ExplorerQuery {
3613 facets: vec![b"PRIORITY".to_vec()],
3614 limit: 0,
3615 ..ExplorerQuery::default()
3616 };
3617
3618 let result = reader.explore(&query).expect("explore");
3619 let priority = result
3620 .facets
3621 .get(b"PRIORITY".as_slice())
3622 .expect("priority facet");
3623 assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3624 assert_eq!(result.stats.payloads_decompressed, 0);
3625 assert_eq!(result.stats.data_refs_seen, 1);
3626 assert_eq!(result.stats.early_stops, 1);
3627 }
3628
3629 #[test]
3630 fn explorer_reuses_classified_data_objects() {
3631 let dir = TempDir::new().expect("tempdir");
3632 let path = dir.path().join("reuse.journal");
3633 write_entries(
3634 &path,
3635 None,
3636 &[
3637 (&[b"PRIORITY=3"], 1_000),
3638 (&[b"PRIORITY=3"], 2_000),
3639 (&[b"PRIORITY=3"], 3_000),
3640 ],
3641 );
3642
3643 let mut reader = FileReader::open(&path).expect("open reader");
3644 let query = ExplorerQuery {
3645 facets: vec![b"PRIORITY".to_vec()],
3646 limit: 0,
3647 ..ExplorerQuery::default()
3648 };
3649
3650 let result = reader.explore(&query).expect("explore");
3651 let priority = result
3652 .facets
3653 .get(b"PRIORITY".as_slice())
3654 .expect("priority facet");
3655 assert_eq!(priority.get(b"3".as_slice()), Some(&3));
3656 assert!(result.stats.data_cache_hits >= 2);
3657 }
3658
3659 #[test]
3660 fn explorer_groups_facets_with_same_filter_set() {
3661 let dir = TempDir::new().expect("tempdir");
3662 let path = dir.path().join("grouped-facets.journal");
3663 write_entries(
3664 &path,
3665 None,
3666 &[
3667 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3668 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3669 ],
3670 );
3671
3672 let mut reader = FileReader::open(&path).expect("open reader");
3673 let query = ExplorerQuery {
3674 facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3675 limit: 0,
3676 ..ExplorerQuery::default()
3677 };
3678
3679 let result = reader.explore(&query).expect("explore");
3680 assert_eq!(result.stats.rows_examined, 2);
3681 assert_eq!(result.stats.facet_rows_matched, 2);
3682 assert_eq!(
3683 result
3684 .facets
3685 .get(b"SERVICE".as_slice())
3686 .and_then(|values| values.get(b"a".as_slice())),
3687 Some(&1)
3688 );
3689 assert_eq!(
3690 result
3691 .facets
3692 .get(b"PRIORITY".as_slice())
3693 .and_then(|values| values.get(b"4".as_slice())),
3694 Some(&1)
3695 );
3696 }
3697
3698 #[test]
3699 fn explorer_combines_rows_histogram_and_facets_in_one_pass() {
3700 let dir = TempDir::new().expect("tempdir");
3701 let path = dir.path().join("combined-pass.journal");
3702 write_entries(
3703 &path,
3704 None,
3705 &[
3706 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3707 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3708 ],
3709 );
3710
3711 let mut reader = FileReader::open(&path).expect("open reader");
3712 let query = ExplorerQuery {
3713 facets: vec![b"SERVICE".to_vec()],
3714 histogram: Some(b"PRIORITY".to_vec()),
3715 histogram_target_buckets: 2,
3716 limit: 2,
3717 ..ExplorerQuery::default()
3718 };
3719
3720 let result = reader.explore(&query).expect("explore");
3721 assert_eq!(result.rows.len(), 2);
3722 assert_eq!(result.stats.rows_examined, 2);
3723 assert_eq!(result.stats.rows_matched, 2);
3724 assert_eq!(result.stats.facet_rows_matched, 2);
3725 assert_eq!(
3726 result
3727 .facets
3728 .get(b"SERVICE".as_slice())
3729 .and_then(|values| values.get(b"a".as_slice())),
3730 Some(&1)
3731 );
3732 let histogram_total = result
3733 .histogram
3734 .as_ref()
3735 .expect("histogram")
3736 .buckets
3737 .iter()
3738 .flat_map(|bucket| bucket.values.values())
3739 .sum::<u64>();
3740 assert_eq!(histogram_total, 2);
3741 }
3742
3743 #[test]
3744 fn explorer_sampling_uses_actual_histogram_bucket_count() {
3745 let query = ExplorerQuery {
3746 after_realtime_usec: Some(1_733_494_460_000_000),
3747 before_realtime_usec: Some(1_735_656_412_000_000),
3748 histogram: Some(b"PRIORITY".to_vec()),
3749 histogram_target_buckets: 300,
3750 sampling: Some(ExplorerSampling {
3751 budget: 20_000,
3752 matched_files: 200,
3753 file_head_realtime_usec: 1_733_494_460_000_000,
3754 file_tail_realtime_usec: 1_735_656_412_000_000,
3755 file_head_seqnum: 1,
3756 file_tail_seqnum: 2,
3757 file_entries: 2,
3758 }),
3759 ..ExplorerQuery::default()
3760 };
3761
3762 let bucket_count = histogram_bucket_count_for_query(&query).expect("bucket count");
3763 let sampling =
3764 ExplorerSamplingState::for_query(&query, Some(bucket_count)).expect("sampling");
3765
3766 assert_eq!(bucket_count, 302);
3767 assert_eq!(sampling.per_slot_sampled.len(), bucket_count);
3768 }
3769
3770 #[test]
3771 fn explorer_sampling_seqnum_estimate_clamps_over_scanned_to_one() {
3772 let query = ExplorerQuery {
3773 after_realtime_usec: Some(1),
3774 before_realtime_usec: Some(100),
3775 direction: Direction::Forward,
3776 sampling: Some(ExplorerSampling {
3777 budget: 20,
3778 matched_files: 1,
3779 file_head_realtime_usec: 1,
3780 file_tail_realtime_usec: 100,
3781 file_head_seqnum: 1,
3782 file_tail_seqnum: 100,
3783 file_entries: 3,
3784 }),
3785 ..ExplorerQuery::default()
3786 };
3787 let mut sampling = ExplorerSamplingState::for_query(&query, None).expect("sampling");
3788 sampling.per_file_sampled = 10;
3789
3790 assert_eq!(sampling.estimate_remaining_rows_by_seqnum(5), Some(1));
3791 }
3792
3793 #[test]
3794 fn explorer_estimated_histogram_distribution_matches_netdata_integer_math() {
3795 let mut histogram = ExplorerHistogram {
3796 field: b"PRIORITY".to_vec(),
3797 buckets: vec![
3798 ExplorerHistogramBucket {
3799 start_realtime_usec: 0,
3800 end_realtime_usec: 10,
3801 values: HashMap::new(),
3802 },
3803 ExplorerHistogramBucket {
3804 start_realtime_usec: 10,
3805 end_realtime_usec: 20,
3806 values: HashMap::new(),
3807 },
3808 ExplorerHistogramBucket {
3809 start_realtime_usec: 20,
3810 end_realtime_usec: 30,
3811 values: HashMap::new(),
3812 },
3813 ],
3814 };
3815 let mut stats = ExplorerStats::default();
3816
3817 add_estimated_histogram_range(Some(&mut histogram), 0, 30, 10, &mut stats);
3818
3819 let counts = histogram
3820 .buckets
3821 .iter()
3822 .map(|bucket| {
3823 bucket
3824 .values
3825 .get(EXPLORER_ESTIMATED_VALUE)
3826 .copied()
3827 .unwrap_or_default()
3828 })
3829 .collect::<Vec<_>>();
3830 assert_eq!(counts, vec![3, 3, 3]);
3831 assert_eq!(counts.iter().sum::<u64>(), 9);
3832 }
3833
3834 #[test]
3835 fn explorer_filters_then_combines_outputs_in_one_candidate_pass() {
3836 let dir = TempDir::new().expect("tempdir");
3837 let path = dir.path().join("filtered-combined-pass.journal");
3838 write_entries(
3839 &path,
3840 None,
3841 &[
3842 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3843 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3844 (&[b"SERVICE=c", b"PRIORITY=3"], 3_000),
3845 ],
3846 );
3847
3848 let mut reader = FileReader::open(&path).expect("open reader");
3849 let query = ExplorerQuery {
3850 filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3851 facets: vec![b"SERVICE".to_vec()],
3852 histogram: Some(b"SERVICE".to_vec()),
3853 histogram_target_buckets: 2,
3854 limit: 10,
3855 ..ExplorerQuery::default()
3856 };
3857
3858 let result = reader.explore(&query).expect("explore");
3859 assert_eq!(result.rows.len(), 2);
3860 assert_eq!(result.stats.rows_examined, 2);
3861 assert_eq!(result.stats.rows_matched, 2);
3862 assert_eq!(result.stats.facet_rows_matched, 2);
3863 let service = result
3864 .facets
3865 .get(b"SERVICE".as_slice())
3866 .expect("service facet");
3867 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3868 assert_eq!(service.get(b"c".as_slice()), Some(&1));
3869 assert_eq!(service.get(b"b".as_slice()), None);
3870 }
3871
3872 #[test]
3873 fn explorer_cursor_rows_defer_payload_expansion() {
3874 let dir = TempDir::new().expect("tempdir");
3875 let path = dir.path().join("cursor-only-row.journal");
3876 write_entries(
3877 &path,
3878 None,
3879 &[(&[b"SERVICE=a", b"PRIORITY=3", b"MESSAGE=hello"], 1_000)],
3880 );
3881
3882 let query = ExplorerQuery {
3883 limit: 1,
3884 ..ExplorerQuery::default()
3885 };
3886 let mut reader = FileReader::open(&path).expect("open reader");
3887 let result = reader
3888 .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3889 .expect("explore cursor rows");
3890
3891 assert_eq!(result.rows.len(), 1);
3892 assert!(result.rows[0].payloads.is_empty());
3893 assert_eq!(result.stats.returned_row_expansions, 0);
3894
3895 let cursor = result.rows[0].cursor.clone();
3896 let mut reader = FileReader::open(&path).expect("reopen reader");
3897 reader.seek_cursor(&cursor).expect("seek cursor");
3898 assert!(reader.test_cursor(&cursor).expect("test cursor"));
3899
3900 let mut payloads = Vec::new();
3901 reader
3902 .collect_entry_payloads(&mut payloads)
3903 .expect("collect payloads");
3904 assert!(payloads.iter().any(|payload| payload == b"MESSAGE=hello"));
3905 }
3906
3907 #[test]
3908 fn explorer_same_field_filter_exclusion_counts_filtered_out_facet_values() {
3909 let dir = TempDir::new().expect("tempdir");
3910 let path = dir.path().join("same-field-filter-facet.journal");
3911 write_entries(
3912 &path,
3913 None,
3914 &[
3915 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3916 (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3917 (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
3918 ],
3919 );
3920
3921 let mut reader = FileReader::open(&path).expect("open reader");
3922 let query = ExplorerQuery {
3923 filters: vec![
3924 ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
3925 ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3926 ],
3927 facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3928 limit: 0,
3929 ..ExplorerQuery::default()
3930 };
3931
3932 let result = reader.explore(&query).expect("explore");
3933 let service = result
3934 .facets
3935 .get(b"SERVICE".as_slice())
3936 .expect("service facet");
3937 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3938 assert_eq!(service.get(b"b".as_slice()), Some(&1));
3939
3940 let priority = result
3941 .facets
3942 .get(b"PRIORITY".as_slice())
3943 .expect("priority facet");
3944 assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3945 assert_eq!(priority.get(b"4".as_slice()), Some(&1));
3946 }
3947
3948 #[test]
3949 fn explorer_index_strategy_matches_traversal_for_all_values() {
3950 let dir = TempDir::new().expect("tempdir");
3951 let path = dir.path().join("indexed-all-values.journal");
3952 write_entries(
3953 &path,
3954 None,
3955 &[
3956 (&[b"SERVICE=a", b"PRIORITY=3", b"TAG=x"], 1_000),
3957 (&[b"SERVICE=b", b"PRIORITY=3", b"TAG=x"], 2_000),
3958 (&[b"SERVICE=a", b"PRIORITY=4", b"TAG=y", b"TAG=z"], 3_000),
3959 (&[b"PRIORITY=3"], 4_000),
3960 ],
3961 );
3962
3963 let query = ExplorerQuery {
3964 after_realtime_usec: Some(0),
3965 before_realtime_usec: Some(5_000),
3966 filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3967 facets: vec![b"SERVICE".to_vec(), b"TAG".to_vec()],
3968 histogram: Some(b"SERVICE".to_vec()),
3969 histogram_target_buckets: 2,
3970 limit: 2,
3971 field_mode: ExplorerFieldMode::AllValues,
3972 use_source_realtime: false,
3973 ..ExplorerQuery::default()
3974 };
3975
3976 let mut reader = FileReader::open(&path).expect("open reader");
3977 let result = reader
3978 .explore_with_strategy(&query, ExplorerStrategy::Compare)
3979 .expect("compare");
3980
3981 let comparison = result.comparison.as_ref().expect("comparison diagnostics");
3982 assert_eq!(comparison.index_stats, result.stats);
3983 assert_eq!(comparison.traversal_stats.rows_returned, 2);
3984 assert_eq!(comparison.index_stats.rows_returned, 2);
3985
3986 assert_eq!(result.rows.len(), 2);
3987 let service = result
3988 .facets
3989 .get(b"SERVICE".as_slice())
3990 .expect("service facet");
3991 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3992 assert_eq!(service.get(b"b".as_slice()), Some(&1));
3993 assert_eq!(service.get(UNSET_VALUE), Some(&1));
3994 let histogram = result.histogram.as_ref().expect("histogram");
3995 assert_eq!(histogram.buckets.len(), 2);
3996 assert_eq!(histogram.buckets[0].values.get(b"a".as_slice()), Some(&1));
3997 assert_eq!(histogram.buckets[0].values.get(b"b".as_slice()), Some(&1));
3998 assert_eq!(histogram.buckets[0].values.get(UNSET_VALUE), Some(&1));
3999 }
4000
4001 #[test]
4002 fn explorer_index_strategy_preserves_same_field_filter_exclusion() {
4003 let dir = TempDir::new().expect("tempdir");
4004 let path = dir.path().join("indexed-same-field-filter.journal");
4005 write_entries(
4006 &path,
4007 None,
4008 &[
4009 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
4010 (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
4011 (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
4012 ],
4013 );
4014
4015 let query = ExplorerQuery {
4016 filters: vec![
4017 ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
4018 ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
4019 ],
4020 facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
4021 field_mode: ExplorerFieldMode::AllValues,
4022 use_source_realtime: false,
4023 ..ExplorerQuery::default()
4024 };
4025
4026 let mut reader = FileReader::open(&path).expect("open reader");
4027 let result = reader
4028 .explore_with_strategy(&query, ExplorerStrategy::Compare)
4029 .expect("compare");
4030 let service = result
4031 .facets
4032 .get(b"SERVICE".as_slice())
4033 .expect("service facet");
4034 assert_eq!(service.get(b"a".as_slice()), Some(&1));
4035 assert_eq!(service.get(b"b".as_slice()), Some(&1));
4036 }
4037
4038 #[test]
4039 fn explorer_index_strategy_rejects_first_value_semantics() {
4040 let dir = TempDir::new().expect("tempdir");
4041 let path = dir.path().join("indexed-first-value.journal");
4042 write_entries(&path, None, &[(&[b"TAG=one", b"TAG=two"], 1_000)]);
4043
4044 let mut reader = FileReader::open(&path).expect("open reader");
4045 let err = reader
4046 .explore_with_strategy(
4047 &ExplorerQuery {
4048 facets: vec![b"TAG".to_vec()],
4049 field_mode: ExplorerFieldMode::FirstValue,
4050 ..ExplorerQuery::default()
4051 },
4052 ExplorerStrategy::Index,
4053 )
4054 .expect_err("first-value index strategy should be rejected");
4055
4056 assert!(matches!(err, SdkError::Unsupported(_)));
4057 }
4058
4059 #[test]
4060 fn explorer_first_value_counts_one_value_per_selected_field() {
4061 let dir = TempDir::new().expect("tempdir");
4062 let path = dir.path().join("first-value.journal");
4063 write_entries(
4064 &path,
4065 None,
4066 &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4067 );
4068
4069 let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4070 let all_values = all_values_reader
4071 .explore(&ExplorerQuery {
4072 facets: vec![b"TAG".to_vec()],
4073 limit: 0,
4074 field_mode: ExplorerFieldMode::AllValues,
4075 ..ExplorerQuery::default()
4076 })
4077 .expect("all-values explore");
4078 let all_tag = all_values
4079 .facets
4080 .get(b"TAG".as_slice())
4081 .expect("all-values tag facet");
4082 assert_eq!(all_tag.values().sum::<u64>(), 2);
4083 assert_eq!(all_tag.len(), 2);
4084
4085 let mut first_value_reader = FileReader::open(&path).expect("open first-value reader");
4086 let first_value = first_value_reader
4087 .explore(&ExplorerQuery {
4088 facets: vec![b"TAG".to_vec()],
4089 limit: 0,
4090 field_mode: ExplorerFieldMode::FirstValue,
4091 ..ExplorerQuery::default()
4092 })
4093 .expect("first-value explore");
4094 let first_tag = first_value
4095 .facets
4096 .get(b"TAG".as_slice())
4097 .expect("first-value tag facet");
4098 assert_eq!(first_tag.values().sum::<u64>(), 1);
4099 assert_eq!(first_tag.len(), 1);
4100 assert_eq!(first_value.stats.early_stops, 1);
4101 }
4102
4103 #[test]
4104 fn explorer_first_value_does_not_double_count_duplicate_facets_or_histogram() {
4105 let dir = TempDir::new().expect("tempdir");
4106 let path = dir.path().join("first-value-no-double-count.journal");
4107 write_entries(
4108 &path,
4109 None,
4110 &[(
4111 &[
4112 b"_SOURCE_REALTIME_TIMESTAMP=1000",
4113 b"TAG=one",
4114 b"TAG=two",
4115 b"MESSAGE=after-tag",
4116 ],
4117 1_000,
4118 )],
4119 );
4120
4121 let mut reader = FileReader::open(&path).expect("open reader");
4122 let result = reader
4123 .explore(&ExplorerQuery {
4124 facets: vec![b"TAG".to_vec()],
4125 histogram: Some(b"TAG".to_vec()),
4126 histogram_target_buckets: 1,
4127 limit: 0,
4128 ..ExplorerQuery::default()
4129 })
4130 .expect("explore");
4131
4132 assert_eq!(
4133 result
4134 .facets
4135 .get(b"TAG".as_slice())
4136 .expect("tag facet")
4137 .values()
4138 .sum::<u64>(),
4139 1
4140 );
4141 assert_eq!(
4142 result
4143 .histogram
4144 .as_ref()
4145 .expect("histogram")
4146 .buckets
4147 .iter()
4148 .flat_map(|bucket| bucket.values.values())
4149 .sum::<u64>(),
4150 1
4151 );
4152
4153 let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4154 let all_values = all_values_reader
4155 .explore(&ExplorerQuery {
4156 facets: vec![b"TAG".to_vec()],
4157 histogram: Some(b"TAG".to_vec()),
4158 histogram_target_buckets: 1,
4159 limit: 0,
4160 field_mode: ExplorerFieldMode::AllValues,
4161 ..ExplorerQuery::default()
4162 })
4163 .expect("all-values explore");
4164
4165 assert_eq!(
4166 all_values
4167 .facets
4168 .get(b"TAG".as_slice())
4169 .expect("tag facet")
4170 .values()
4171 .sum::<u64>(),
4172 2
4173 );
4174 assert_eq!(
4175 all_values
4176 .histogram
4177 .as_ref()
4178 .expect("histogram")
4179 .buckets
4180 .iter()
4181 .flat_map(|bucket| bucket.values.values())
4182 .sum::<u64>(),
4183 2
4184 );
4185 }
4186
4187 #[test]
4188 fn explorer_first_value_tracks_required_field_identities() {
4189 let dir = TempDir::new().expect("tempdir");
4190 let path = dir.path().join("first-value-identities.journal");
4191 write_entries(
4192 &path,
4193 None,
4194 &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4195 );
4196
4197 let mut reader = FileReader::open(&path).expect("open reader");
4198 let result = reader
4199 .explore(&ExplorerQuery {
4200 facets: vec![b"TAG".to_vec(), b"SERVICE".to_vec()],
4201 limit: 0,
4202 field_mode: ExplorerFieldMode::FirstValue,
4203 ..ExplorerQuery::default()
4204 })
4205 .expect("explore");
4206
4207 assert_eq!(
4208 result
4209 .facets
4210 .get(b"TAG".as_slice())
4211 .expect("tag facet")
4212 .values()
4213 .sum::<u64>(),
4214 1
4215 );
4216 assert_eq!(
4217 result
4218 .facets
4219 .get(b"SERVICE".as_slice())
4220 .and_then(|values| values.get(b"a".as_slice())),
4221 Some(&1)
4222 );
4223 assert_eq!(result.stats.early_stops, 1);
4224 }
4225
4226 #[test]
4227 fn explorer_rejects_duplicate_facet_fields() {
4228 let dir = TempDir::new().expect("tempdir");
4229 let path = dir.path().join("duplicate-facets.journal");
4230 write_entries(&path, None, &[(&[b"SERVICE=a"], 1_000)]);
4231
4232 let mut reader = FileReader::open(&path).expect("open reader");
4233 let err = reader
4234 .explore(&ExplorerQuery {
4235 facets: vec![b"SERVICE".to_vec(), b"SERVICE".to_vec()],
4236 limit: 0,
4237 ..ExplorerQuery::default()
4238 })
4239 .expect_err("duplicate facets rejected");
4240
4241 assert!(err.to_string().contains("must not be duplicated"));
4242 }
4243
4244 #[test]
4245 fn explorer_empty_result_keeps_requested_facet_with_no_values() {
4246 let dir = TempDir::new().expect("tempdir");
4247 let path = dir.path().join("empty-result.journal");
4248 write_entries(&path, None, &[(&[b"SERVICE=a", b"PRIORITY=3"], 1_000)]);
4249
4250 let mut reader = FileReader::open(&path).expect("open reader");
4251 let result = reader
4252 .explore(&ExplorerQuery {
4253 after_realtime_usec: Some(10_000),
4254 before_realtime_usec: Some(20_000),
4255 facets: vec![b"SERVICE".to_vec()],
4256 limit: 10,
4257 realtime_slack_usec: 0,
4258 ..ExplorerQuery::default()
4259 })
4260 .expect("explore");
4261
4262 assert!(result.rows.is_empty());
4263 assert_eq!(result.stats.rows_matched, 0);
4264 assert!(
4265 result
4266 .facets
4267 .get(b"SERVICE".as_slice())
4268 .expect("service facet")
4269 .is_empty()
4270 );
4271 }
4272
#[test]
fn explorer_facet_time_bounds_do_not_count_slack_rows_without_source_realtime() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("facet-time-bound.journal");
    // Only the 360s entry lies inside the [350s, 370s] window. The 340s entry
    // sits within the 20s slack margin below the window and must still be left
    // out of the facet counts. The 400s entry is beyond the window and the slack.
    write_entries(
        &path,
        None,
        &[
            (&[b"SERVICE=before"], 340_000_000),
            (&[b"SERVICE=inside"], 360_000_000),
            (&[b"SERVICE=after"], 400_000_000),
        ],
    );

    let query = ExplorerQuery {
        after_realtime_usec: Some(350_000_000),
        before_realtime_usec: Some(370_000_000),
        facets: vec![b"SERVICE".to_vec()],
        limit: 0,
        realtime_slack_usec: 20_000_000,
        use_source_realtime: false,
        ..ExplorerQuery::default()
    };

    let mut reader = FileReader::open(&path).expect("open reader");
    let result = reader.explore(&query).expect("explore");

    let service = result
        .facets
        .get(b"SERVICE".as_slice())
        .expect("service facet");
    assert_eq!(service.get(b"inside".as_slice()).copied(), Some(1));
    assert!(service.get(b"before".as_slice()).is_none());
    assert!(service.get(b"after".as_slice()).is_none());
    assert_eq!(result.stats.facet_rows_matched, 1);
}
4309
#[test]
fn explorer_fts_disables_first_value_early_stop() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("fts-no-early-stop.journal");
    write_entries(&path, None, &[(&[b"TAG=one", b"MESSAGE=needle"], 1_000)]);

    // With an FTS pattern active, both data objects of the entry must be
    // visited, so the explorer may not stop early after the TAG value.
    let query = ExplorerQuery {
        facets: vec![b"TAG".to_vec()],
        fts_patterns: vec![b"needle".to_vec()],
        limit: 0,
        ..ExplorerQuery::default()
    };

    let mut reader = FileReader::open(&path).expect("open reader");
    let result = reader.explore(&query).expect("explore");

    let tag_one = result
        .facets
        .get(b"TAG".as_slice())
        .and_then(|tag| tag.get(b"one".as_slice()))
        .copied();

    assert_eq!(result.stats.early_stops, 0);
    assert_eq!(result.stats.data_refs_seen, 2);
    assert_eq!(tag_one, Some(1));
}
4336
#[test]
fn explorer_fts_or_terms_and_negative_terms_filter_rows() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("fts-negative.journal");
    write_entries(
        &path,
        None,
        &[
            (&[b"TAG=alpha", b"MESSAGE=alpha keep"], 1_000),
            (&[b"TAG=beta", b"MESSAGE=beta keep"], 2_000),
            (&[b"TAG=debug", b"MESSAGE=alpha debug"], 3_000),
            (&[b"TAG=other", b"MESSAGE=other"], 4_000),
            (&[b"TAG=wild", b"MESSAGE=start middle end"], 5_000),
        ],
    );

    // Positive terms are OR-ed together. The `true` flag makes "debug" a
    // negative term: it excludes the row even though that row also matches
    // "alpha". "start*end" matches "start middle end" through the wildcard.
    let terms: [(&[u8], bool); 4] = [
        (b"alpha", false),
        (b"beta", false),
        (b"debug", true),
        (b"start*end", false),
    ];
    let query = ExplorerQuery {
        facets: vec![b"TAG".to_vec()],
        fts_terms: terms
            .iter()
            .map(|&(pattern, negative)| {
                ExplorerFtsPattern::substring(pattern.to_vec(), negative)
            })
            .collect(),
        limit: 10,
        ..ExplorerQuery::default()
    };

    let mut reader = FileReader::open(&path).expect("open reader");
    let result = reader.explore(&query).expect("explore");

    assert_eq!(result.rows.len(), 3);

    let tag = result.facets.get(b"TAG".as_slice()).expect("TAG facet");
    let kept: [&[u8]; 3] = [b"alpha", b"beta", b"wild"];
    for value in kept.iter() {
        assert_eq!(
            tag.get(*value).copied(),
            Some(1),
            "TAG={}",
            String::from_utf8_lossy(value)
        );
    }
    let dropped: [&[u8]; 2] = [b"debug", b"other"];
    for value in dropped.iter() {
        assert!(
            tag.get(*value).is_none(),
            "TAG={}",
            String::from_utf8_lossy(value)
        );
    }
}
4376
#[test]
fn explorer_auto_anchor_scans_backward_from_tail() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("backward.journal");
    write_entries(
        &path,
        None,
        &[
            (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
            (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
        ],
    );

    // The anchor is not set explicitly. The test relies on the query default
    // being the automatic anchor, which for a backward scan must start at the
    // newest entry.
    let mut reader = FileReader::open(&path).expect("open reader");
    let result = reader
        .explore(&ExplorerQuery {
            direction: Direction::Backward,
            limit: 2,
            ..ExplorerQuery::default()
        })
        .expect("explore");

    let timestamps: Vec<_> = result.rows.iter().map(|row| row.realtime_usec).collect();
    assert_eq!(timestamps, [2_000, 1_000]);
}
4402
#[test]
fn explorer_backward_time_bound_stops_after_slack_window() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("backward-time-bound.journal");
    write_entries(
        &path,
        None,
        &[
            (&[b"SERVICE=a"], 100_000_000),
            (&[b"SERVICE=b"], 200_000_000),
            (&[b"SERVICE=c"], 300_000_000),
            (&[b"SERVICE=d"], 400_000_000),
            (&[b"SERVICE=e"], 500_000_000),
        ],
    );

    // Scanning backward with a 350s lower bound and 10s of slack: once entries
    // fall below 340s, the scan must stop instead of examining older entries.
    let mut reader = FileReader::open(&path).expect("open reader");
    let result = reader
        .explore(&ExplorerQuery {
            after_realtime_usec: Some(350_000_000),
            direction: Direction::Backward,
            limit: 10,
            realtime_slack_usec: 10_000_000,
            ..ExplorerQuery::default()
        })
        .expect("explore");

    let timestamps: Vec<_> = result.rows.iter().map(|row| row.realtime_usec).collect();
    assert_eq!(timestamps, [500_000_000, 400_000_000]);
    assert_eq!(result.stats.rows_examined, 2);
}
4434
#[test]
fn explorer_histogram_and_fts_are_opt_in() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("histogram.journal");
    write_entries(
        &path,
        None,
        &[
            (&[b"MESSAGE=alpha", b"PRIORITY=3"], 1_000),
            (&[b"MESSAGE=beta", b"PRIORITY=4"], 2_000),
        ],
    );

    // Opted out: with no histogram field and no FTS patterns, no histogram is
    // produced, no FTS scanning is recorded, and both entries are returned.
    let mut plain_reader = FileReader::open(&path).expect("open reader");
    let plain = plain_reader
        .explore(&ExplorerQuery {
            after_realtime_usec: Some(0),
            before_realtime_usec: Some(3_000),
            limit: 10,
            ..ExplorerQuery::default()
        })
        .expect("explore without histogram or fts");
    assert_eq!(plain.rows.len(), 2);
    assert!(plain.histogram.is_none());
    assert_eq!(plain.stats.histogram_updates, 0);
    assert_eq!(plain.stats.fts_scans, 0);

    // Opted in: the "alp" pattern keeps only the "alpha" entry, and the
    // PRIORITY histogram counts exactly that one row.
    let mut reader = FileReader::open(&path).expect("open reader");
    let query = ExplorerQuery {
        after_realtime_usec: Some(0),
        before_realtime_usec: Some(3_000),
        histogram: Some(b"PRIORITY".to_vec()),
        histogram_target_buckets: 2,
        fts_patterns: vec![b"alp".to_vec()],
        limit: 10,
        ..ExplorerQuery::default()
    };

    let result = reader.explore(&query).expect("explore");
    assert_eq!(result.rows.len(), 1);
    assert!(result.stats.fts_scans > 0);
    assert_eq!(
        result
            .histogram
            .as_ref()
            .expect("histogram")
            .buckets
            .iter()
            .flat_map(|bucket| bucket.values.values())
            .sum::<u64>(),
        1
    );
}
4474
#[test]
fn explorer_first_value_stops_after_same_data_satisfies_multiple_roles() {
    let dir = TempDir::new().expect("tempdir");
    let path = dir.path().join("same-data-multiple-roles.journal");
    // The `_SOURCE_REALTIME_TIMESTAMP` data object is requested as the histogram
    // field and presumably also serves as the entry's source timestamp. Once it
    // has been consumed, FirstValue mode should stop early without needing
    // MESSAGE.
    write_entries(
        &path,
        None,
        &[(
            &[b"_SOURCE_REALTIME_TIMESTAMP=1000", b"MESSAGE=after-source"],
            1_000,
        )],
    );

    let query = ExplorerQuery {
        histogram: Some(SOURCE_REALTIME_FIELD.to_vec()),
        histogram_target_buckets: 1,
        limit: 0,
        field_mode: ExplorerFieldMode::FirstValue,
        ..ExplorerQuery::default()
    };

    let mut reader = FileReader::open(&path).expect("open reader");
    let result = reader.explore(&query).expect("explore");

    assert_eq!(result.stats.histogram_updates, 1);
    assert_eq!(result.stats.early_stops, 1);

    let histogram = result.histogram.as_ref().expect("histogram");
    let counted: u64 = histogram
        .buckets
        .iter()
        .map(|bucket| bucket.values.values().sum::<u64>())
        .sum();
    assert_eq!(counted, 1);
}
4513}