1use super::*;
2use journal_core::file::{DataObject, offset_array::InlinedCursor};
3use std::collections::{HashMap, HashSet};
4use std::time::{Duration, Instant};
5
/// Histogram bucket count requested when a query does not override it.
const DEFAULT_HISTOGRAM_TARGET_BUCKETS: usize = 150;
/// Default `ExplorerQuery::realtime_slack_usec` (120 s expressed in microseconds).
const DEFAULT_TIME_SLACK_USEC: u64 = 120 * 1_000_000;
/// `ExplorerControl` runs its deadline / cancellation / progress check once per this many rows.
const EXPLORER_CONTROL_CHECK_EVERY_ROWS: u64 = 8 * 1024;
/// Default `ExplorerQuery::stop_when_rows_full_check_every`.
const DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS: u64 = 1;
/// Upper bound for the number of time slots used by sampling.
const EXPLORER_SAMPLING_SLOTS_MAX: usize = 1_000;
/// Unsampled rows within one file after which the per-file sampling rate is recomputed.
const EXPLORER_SAMPLING_RECALIBRATE_ROWS: u64 = 10_000;
/// Fraction (1%) of a file's timeframe that must be covered before sampling may stop and estimate.
const EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS: f64 = 0.01;
/// Field whose value is parsed with `parse_source_realtime` when source timestamps are needed.
const SOURCE_REALTIME_FIELD: &[u8] = b"_SOURCE_REALTIME_TIMESTAMP";
/// Placeholder label; presumably used for rows lacking a field (not used in this chunk).
const UNSET_VALUE: &[u8] = b"-";
/// Placeholder label; presumably attached to rows skipped by sampling.
const EXPLORER_UNSAMPLED_VALUE: &[u8] = b"[unsampled]";
/// Placeholder label; presumably attached to rows whose counts were estimated.
const EXPLORER_ESTIMATED_VALUE: &[u8] = b"[estimated]";
17
/// Where a query is anchored when it starts reading.
///
/// `Auto` is the default. `Realtime` carries a timestamp, presumably in microseconds like the
/// other `*_usec` values in this module (TODO confirm).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExplorerAnchor {
    #[default]
    Auto,
    Head,
    Tail,
    Realtime(u64),
}
31
/// Controls how fields are read per row; `FirstValue` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExplorerFieldMode {
    AllValues,
    #[default]
    FirstValue,
}
43
/// Execution strategy for a query; `Traversal` is the default.
///
/// `Compare` presumably runs both strategies and fills `ExplorerResult::comparison`
/// (TODO confirm against the caller).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ExplorerStrategy {
    #[default]
    Traversal,
    Index,
    Compare,
}
57
/// A field filter: the field name plus the list of accepted values (raw bytes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplorerFilter {
    pub field: Vec<u8>,
    pub values: Vec<Vec<u8>>,
}

impl ExplorerFilter {
    /// Builds a filter, converting the field name and every value into owned byte vectors.
    pub fn new(
        field: impl Into<Vec<u8>>,
        values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
    ) -> Self {
        let owned_values: Vec<Vec<u8>> = values.into_iter().map(Into::into).collect();
        ExplorerFilter {
            field: field.into(),
            values: owned_values,
        }
    }
}
75
76#[derive(Debug, Clone)]
77pub struct ExplorerQuery {
78 pub after_realtime_usec: Option<u64>,
79 pub before_realtime_usec: Option<u64>,
80 pub anchor: ExplorerAnchor,
81 pub direction: Direction,
82 pub limit: usize,
83 pub filters: Vec<ExplorerFilter>,
84 pub facets: Vec<Vec<u8>>,
85 pub histogram: Option<Vec<u8>>,
86 pub histogram_target_buckets: usize,
87 pub fts_terms: Vec<ExplorerFtsPattern>,
88 pub fts_patterns: Vec<Vec<u8>>,
89 pub fts_negative_patterns: Vec<Vec<u8>>,
90 pub field_mode: ExplorerFieldMode,
91 pub exclude_facet_field_filters: bool,
92 pub use_source_realtime: bool,
93 pub realtime_slack_usec: u64,
94 pub stop_when_rows_full: bool,
95 pub stop_when_rows_full_check_every: u64,
96 pub sampling: Option<ExplorerSampling>,
97 #[doc(hidden)]
100 pub debug_collect_column_fields_by_row_traversal: bool,
101}
102
103impl Default for ExplorerQuery {
104 fn default() -> Self {
105 Self {
106 after_realtime_usec: None,
107 before_realtime_usec: None,
108 anchor: ExplorerAnchor::Auto,
109 direction: Direction::Forward,
110 limit: 200,
111 filters: Vec::new(),
112 facets: Vec::new(),
113 histogram: None,
114 histogram_target_buckets: DEFAULT_HISTOGRAM_TARGET_BUCKETS,
115 fts_terms: Vec::new(),
116 fts_patterns: Vec::new(),
117 fts_negative_patterns: Vec::new(),
118 field_mode: ExplorerFieldMode::FirstValue,
119 exclude_facet_field_filters: true,
120 use_source_realtime: true,
121 realtime_slack_usec: DEFAULT_TIME_SLACK_USEC,
122 stop_when_rows_full: false,
123 stop_when_rows_full_check_every: DEFAULT_ROWS_FULL_CHECK_EVERY_ROWS,
124 sampling: None,
125 debug_collect_column_fields_by_row_traversal: false,
126 }
127 }
128}
129
/// Sampling configuration and metadata for the file currently being scanned.
///
/// `budget` and `matched_files` size the sampling thresholds (see
/// `ExplorerSamplingState::for_query`); the `file_*` values describe the file and are
/// refreshed per file by `ExplorerSamplingState::begin_file`. A zero `file_*` value is
/// treated as "unknown" by the estimators.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ExplorerSampling {
    pub budget: u64,
    pub matched_files: u64,
    pub file_head_realtime_usec: u64,
    pub file_tail_realtime_usec: u64,
    pub file_head_seqnum: u64,
    pub file_tail_seqnum: u64,
    pub file_entries: u64,
}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct ExplorerFtsPattern {
143 pub parts: Vec<Vec<u8>>,
144 pub negative: bool,
145}
146
147impl ExplorerFtsPattern {
148 pub fn substring(pattern: impl Into<Vec<u8>>, negative: bool) -> Self {
149 let pattern = pattern.into();
150 let parts = pattern
151 .split(|byte| *byte == b'*')
152 .filter(|part| !part.is_empty())
153 .map(|part| part.to_vec())
154 .collect();
155 Self { parts, negative }
156 }
157
158 fn matches(&self, value: &[u8]) -> bool {
159 if value.is_empty() {
160 return false;
161 }
162 if self.parts.is_empty() {
163 return true;
164 }
165
166 let mut haystack = value;
167 for part in &self.parts {
168 let Some(index) = find_ascii_case_insensitive(haystack, part) else {
169 return false;
170 };
171 haystack = &haystack[index.saturating_add(part.len())..];
172 }
173 true
174 }
175}
176
177impl ExplorerQuery {
178 pub fn with_filter(
179 mut self,
180 field: impl Into<Vec<u8>>,
181 values: impl IntoIterator<Item = impl Into<Vec<u8>>>,
182 ) -> Self {
183 self.filters.push(ExplorerFilter::new(field, values));
184 self
185 }
186
187 pub fn with_facet(mut self, field: impl Into<Vec<u8>>) -> Self {
188 self.facets.push(field.into());
189 self
190 }
191
192 pub fn with_histogram(mut self, field: impl Into<Vec<u8>>) -> Self {
193 self.histogram = Some(field.into());
194 self
195 }
196
197 pub fn with_fts_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
198 let pattern = pattern.into();
199 self.fts_terms
200 .push(ExplorerFtsPattern::substring(pattern.clone(), false));
201 self.fts_patterns.push(pattern);
202 self
203 }
204
205 pub fn with_fts_negative_pattern(mut self, pattern: impl Into<Vec<u8>>) -> Self {
206 let pattern = pattern.into();
207 self.fts_terms
208 .push(ExplorerFtsPattern::substring(pattern.clone(), true));
209 self.fts_negative_patterns.push(pattern);
210 self
211 }
212}
213
214#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize)]
215pub struct ExplorerStats {
216 pub rows_examined: u64,
217 pub rows_matched: u64,
218 pub facet_rows_matched: u64,
219 pub rows_returned: u64,
220 pub rows_unsampled: u64,
221 pub rows_estimated: u64,
222 pub sampling_sampled: u64,
223 pub sampling_unsampled: u64,
224 pub sampling_estimated: u64,
225 pub last_realtime_usec: u64,
226 pub max_source_realtime_delta_usec: u64,
227 pub data_refs_seen: u64,
228 pub data_refs_skipped: u64,
229 pub data_payloads_loaded: u64,
230 pub data_objects_classified: u64,
231 pub data_cache_hits: u64,
232 pub data_cache_misses: u64,
233 pub payloads_decompressed: u64,
234 pub fts_scans: u64,
235 pub facet_updates: u64,
236 pub histogram_updates: u64,
237 pub returned_row_expansions: u64,
238 pub early_stop_opportunities: u64,
239 pub early_stops: u64,
240}
241
/// One returned row: its realtime timestamp, cursor string and raw field payloads.
#[derive(Debug, Clone)]
pub struct ExplorerRow {
    pub realtime_usec: u64,
    pub cursor: String,
    pub payloads: Vec<Vec<u8>>,
}
248
/// Whether returned rows carry expanded payloads or only their cursor (crate-internal).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExplorerRowPayloadMode {
    Expand,
    CursorOnly,
}
254
/// One histogram time bucket with per-value counts.
///
/// The accumulator derives the bucket width from `end - start` of the first bucket.
#[derive(Debug, Clone)]
pub struct ExplorerHistogramBucket {
    pub start_realtime_usec: u64,
    pub end_realtime_usec: u64,
    pub values: HashMap<Vec<u8>, u64>,
}

/// Histogram of `field`'s values over time.
#[derive(Debug, Clone)]
pub struct ExplorerHistogram {
    pub field: Vec<u8>,
    pub buckets: Vec<ExplorerHistogramBucket>,
}
267
268#[derive(Debug, Clone, Default)]
269pub struct ExplorerComparison {
270 pub traversal_duration: Duration,
271 pub index_duration: Duration,
272 pub traversal_stats: ExplorerStats,
273 pub index_stats: ExplorerStats,
274}
275
276#[derive(Debug, Clone, Default)]
277pub struct ExplorerResult {
278 pub rows: Vec<ExplorerRow>,
279 pub facets: HashMap<Vec<u8>, HashMap<Vec<u8>, u64>>,
280 pub histogram: Option<ExplorerHistogram>,
281 pub column_fields: HashSet<Vec<u8>>,
282 pub stats: ExplorerStats,
283 pub comparison: Option<ExplorerComparison>,
284}
285
286#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
287pub enum ExplorerStopReason {
288 TimedOut,
289 Cancelled,
290}
291
292#[derive(Debug, Clone)]
293pub struct ExplorerProgress {
294 pub stats: ExplorerStats,
295 pub elapsed: Duration,
296}
297
298pub struct ExplorerControl<'a> {
299 deadline: Option<Instant>,
300 cancellation: Option<&'a dyn Fn() -> bool>,
301 progress: Option<&'a mut dyn FnMut(ExplorerProgress)>,
302 candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
303 adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
304 matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
305 sampling: Option<&'a mut ExplorerSamplingState>,
306 progress_interval: Duration,
307 started: Instant,
308 last_progress: Instant,
309 next_check_rows: u64,
310 stop_reason: Option<ExplorerStopReason>,
311}
312
313impl<'a> ExplorerControl<'a> {
314 pub fn new() -> Self {
315 let now = Instant::now();
316 Self {
317 deadline: None,
318 cancellation: None,
319 progress: None,
320 candidate_row: None,
321 adjust_realtime: None,
322 matched_row: None,
323 sampling: None,
324 progress_interval: Duration::from_millis(250),
325 started: now,
326 last_progress: now,
327 next_check_rows: EXPLORER_CONTROL_CHECK_EVERY_ROWS,
328 stop_reason: None,
329 }
330 }
331
332 pub fn set_deadline(&mut self, deadline: Option<Instant>) {
333 self.deadline = deadline;
334 }
335
336 pub fn set_cancellation_callback(&mut self, cancellation: Option<&'a dyn Fn() -> bool>) {
337 self.cancellation = cancellation;
338 }
339
340 pub fn set_progress_callback(&mut self, progress: Option<&'a mut dyn FnMut(ExplorerProgress)>) {
341 self.progress = progress;
342 }
343
344 pub(crate) fn set_candidate_row_callback(
345 &mut self,
346 candidate_row: Option<&'a mut dyn FnMut(u64) -> bool>,
347 ) {
348 self.candidate_row = candidate_row;
349 }
350
351 pub(crate) fn set_realtime_adjust_callback(
352 &mut self,
353 adjust_realtime: Option<&'a mut dyn FnMut(u64) -> u64>,
354 ) {
355 self.adjust_realtime = adjust_realtime;
356 }
357
358 pub fn set_matched_row_callback(
359 &mut self,
360 matched_row: Option<&'a mut dyn FnMut(u64, u64) -> bool>,
361 ) {
362 self.matched_row = matched_row;
363 }
364
365 pub(crate) fn set_sampling_state(&mut self, sampling: Option<&'a mut ExplorerSamplingState>) {
366 self.sampling = sampling;
367 }
368
369 pub fn set_progress_interval(&mut self, interval: Duration) {
370 self.progress_interval = interval;
371 }
372
373 pub fn stop_reason(&self) -> Option<ExplorerStopReason> {
374 self.stop_reason
375 }
376
377 fn should_stop_after_rows(&mut self, rows_seen: u64, stats: &ExplorerStats) -> bool {
378 if self.stop_reason.is_some() {
379 return true;
380 }
381 if rows_seen < self.next_check_rows {
382 return false;
383 }
384 self.next_check_rows = rows_seen.saturating_add(EXPLORER_CONTROL_CHECK_EVERY_ROWS);
385 self.check(stats)
386 }
387
388 fn check(&mut self, stats: &ExplorerStats) -> bool {
389 let now = Instant::now();
390 if now.duration_since(self.last_progress) >= self.progress_interval {
391 self.emit_progress(stats, now);
392 }
393 if self.cancellation.is_some_and(|is_cancelled| is_cancelled()) {
394 self.stop_reason = Some(ExplorerStopReason::Cancelled);
395 self.emit_progress(stats, now);
396 return true;
397 }
398 if self.deadline.is_some_and(|deadline| now >= deadline) {
399 self.stop_reason = Some(ExplorerStopReason::TimedOut);
400 self.emit_progress(stats, now);
401 return true;
402 }
403 false
404 }
405
406 fn emit_progress(&mut self, stats: &ExplorerStats, now: Instant) {
407 self.last_progress = now;
408 if let Some(progress) = self.progress.as_deref_mut() {
409 progress(ExplorerProgress {
410 stats: stats.clone(),
411 elapsed: now.duration_since(self.started),
412 });
413 }
414 }
415
416 fn emit_matched_row(&mut self, realtime_usec: u64, rows_matched: u64) -> bool {
417 if let Some(matched_row) = self.matched_row.as_deref_mut() {
418 return matched_row(realtime_usec, rows_matched);
419 }
420 false
421 }
422
423 fn adjust_realtime(&mut self, realtime_usec: u64) -> u64 {
424 self.adjust_realtime
425 .as_deref_mut()
426 .map_or(realtime_usec, |adjust_realtime| {
427 adjust_realtime(realtime_usec)
428 })
429 }
430}
431
432impl Default for ExplorerControl<'_> {
433 fn default() -> Self {
434 Self::new()
435 }
436}
437
/// Outcome of `ExplorerSamplingState::decide` for a single row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExplorerSamplingDecision {
    /// Process the row fully; `sampled` is `false` for rows kept as candidates.
    Full { sampled: bool },
    /// Skip field processing for this row.
    SkipFields,
    /// Stop scanning this file and account for the estimated remainder.
    StopAndEstimate {
        remaining_rows: u64,
        from_realtime_usec: u64,
        to_realtime_usec: u64,
    },
}
450
451#[derive(Debug)]
452pub(crate) struct ExplorerSamplingState {
453 start_realtime_usec: u64,
454 end_realtime_usec: u64,
455 file_head_realtime_usec: u64,
456 file_tail_realtime_usec: u64,
457 file_head_seqnum: u64,
458 file_tail_seqnum: u64,
459 file_entries: u64,
460 first_realtime_usec: Option<u64>,
461 step_realtime_usec: u64,
462 enable_after_samples: u64,
463 per_file_enable_after_samples: u64,
464 per_slot_enable_after_samples: u64,
465 sampled: u64,
466 per_file_sampled: u64,
467 per_file_unsampled: u64,
468 per_file_every: u64,
469 per_file_skipped: u64,
470 per_file_recalibrate: u64,
471 per_slot_sampled: Vec<u64>,
472 per_slot_unsampled: Vec<u64>,
473 matched_files: u64,
474 direction: Direction,
475}
476
477impl ExplorerSamplingState {
478 pub(crate) fn for_query(
479 query: &ExplorerQuery,
480 histogram_bucket_count: Option<usize>,
481 ) -> Option<Self> {
482 let sampling = query.sampling?;
483 let start_realtime_usec = query.after_realtime_usec?;
484 let end_realtime_usec = query.before_realtime_usec?;
485 if sampling.budget == 0
486 || sampling.matched_files == 0
487 || start_realtime_usec >= end_realtime_usec
488 {
489 return None;
490 }
491
492 let slots = histogram_bucket_count
493 .unwrap_or(query.histogram_target_buckets)
494 .clamp(2, EXPLORER_SAMPLING_SLOTS_MAX);
495 let delta = end_realtime_usec.saturating_sub(start_realtime_usec);
496 let step_realtime_usec = (delta / slots as u64).saturating_sub(1).max(1);
497 let per_file_enable_after_samples =
498 ((sampling.budget / 4) / sampling.matched_files.max(1)).max(query.limit as u64);
499 let per_slot_enable_after_samples =
500 ((sampling.budget / 4) / slots as u64).max(query.limit as u64);
501
502 Some(Self {
503 start_realtime_usec,
504 end_realtime_usec,
505 file_head_realtime_usec: sampling.file_head_realtime_usec,
506 file_tail_realtime_usec: sampling.file_tail_realtime_usec,
507 file_head_seqnum: sampling.file_head_seqnum,
508 file_tail_seqnum: sampling.file_tail_seqnum,
509 file_entries: sampling.file_entries,
510 first_realtime_usec: None,
511 step_realtime_usec,
512 enable_after_samples: sampling.budget / 2,
513 per_file_enable_after_samples,
514 per_slot_enable_after_samples,
515 sampled: 0,
516 per_file_sampled: 0,
517 per_file_unsampled: 0,
518 per_file_every: 0,
519 per_file_skipped: 0,
520 per_file_recalibrate: 0,
521 per_slot_sampled: vec![0; slots],
522 per_slot_unsampled: vec![0; slots],
523 matched_files: sampling.matched_files.max(1),
524 direction: query.direction,
525 })
526 }
527
528 fn begin_file(&mut self, sampling: ExplorerSampling) {
529 self.file_head_realtime_usec = sampling.file_head_realtime_usec;
530 self.file_tail_realtime_usec = sampling.file_tail_realtime_usec;
531 self.file_head_seqnum = sampling.file_head_seqnum;
532 self.file_tail_seqnum = sampling.file_tail_seqnum;
533 self.file_entries = sampling.file_entries;
534 self.first_realtime_usec = None;
535 self.per_file_sampled = 0;
536 self.per_file_unsampled = 0;
537 self.per_file_every = 0;
538 self.per_file_skipped = 0;
539 self.per_file_recalibrate = 0;
540 }
541
542 fn decide(
543 &mut self,
544 realtime_usec: u64,
545 seqnum: u64,
546 candidate_to_keep: bool,
547 ) -> ExplorerSamplingDecision {
548 if self.first_realtime_usec.is_none() {
549 self.first_realtime_usec = Some(realtime_usec);
550 }
551 if candidate_to_keep {
552 return ExplorerSamplingDecision::Full { sampled: false };
553 }
554
555 let slot = self.slot_for_realtime(realtime_usec);
556 let should_sample = if self.sampled < self.enable_after_samples
557 || self.per_file_sampled < self.per_file_enable_after_samples
558 || self.per_slot_sampled[slot] < self.per_slot_enable_after_samples
559 {
560 true
561 } else if self.per_file_recalibrate >= EXPLORER_SAMPLING_RECALIBRATE_ROWS
562 || self.per_file_every == 0
563 {
564 self.recalibrate(realtime_usec, seqnum);
565 true
566 } else if self.per_file_skipped >= self.per_file_every {
567 self.per_file_skipped = 0;
568 true
569 } else {
570 self.per_file_skipped = self.per_file_skipped.saturating_add(1);
571 false
572 };
573
574 if should_sample {
575 self.sampled = self.sampled.saturating_add(1);
576 self.per_file_sampled = self.per_file_sampled.saturating_add(1);
577 self.per_slot_sampled[slot] = self.per_slot_sampled[slot].saturating_add(1);
578 return ExplorerSamplingDecision::Full { sampled: true };
579 }
580
581 self.per_file_recalibrate = self.per_file_recalibrate.saturating_add(1);
582 self.per_file_unsampled = self.per_file_unsampled.saturating_add(1);
583 self.per_slot_unsampled[slot] = self.per_slot_unsampled[slot].saturating_add(1);
584
585 if self.per_file_unsampled > self.per_file_sampled
586 && self.progress_by_time(realtime_usec) > EXPLORER_SAMPLING_ESTIMATE_AFTER_PROGRESS
587 {
588 let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
589 let (from_realtime_usec, to_realtime_usec) = self.remaining_range(realtime_usec);
590 return ExplorerSamplingDecision::StopAndEstimate {
591 remaining_rows,
592 from_realtime_usec,
593 to_realtime_usec,
594 };
595 }
596
597 ExplorerSamplingDecision::SkipFields
598 }
599
600 fn slot_for_realtime(&self, realtime_usec: u64) -> usize {
601 let clamped = realtime_usec.clamp(self.start_realtime_usec, self.end_realtime_usec);
602 let slot =
603 (clamped.saturating_sub(self.start_realtime_usec) / self.step_realtime_usec) as usize;
604 slot.min(self.per_slot_sampled.len().saturating_sub(1))
605 }
606
607 fn recalibrate(&mut self, realtime_usec: u64, seqnum: u64) {
608 let remaining_rows = self.estimate_remaining_rows(realtime_usec, seqnum);
609 let wanted_samples = (self.enable_after_samples / self.matched_files).max(1);
610 self.per_file_every = (remaining_rows / wanted_samples).max(1);
611 self.per_file_recalibrate = 0;
612 }
613
614 fn estimate_remaining_rows(&self, realtime_usec: u64, seqnum: u64) -> u64 {
615 if let Some(remaining) = self.estimate_remaining_rows_by_seqnum(seqnum) {
616 return remaining;
617 }
618 self.estimate_remaining_rows_by_time(realtime_usec)
619 }
620
621 fn estimate_remaining_rows_by_seqnum(&self, seqnum: u64) -> Option<u64> {
622 self.validate_seqnum_estimate_inputs(seqnum)?;
623 let scanned_rows = self.scanned_file_rows();
624 let seqnum_span = self.seqnum_span_so_far(seqnum)?;
625 if seqnum_span == 0 {
626 return None;
627 }
628 let proportion_of_all_lines_so_far =
629 bounded_positive_proportion(scanned_rows as f64 / seqnum_span as f64)?;
630 let expected_matching_logs =
631 (proportion_of_all_lines_so_far * self.file_entries as f64) as u64;
632 if expected_matching_logs == 0 {
633 return None;
634 }
635 Some(expected_matching_logs.saturating_sub(scanned_rows).max(1))
639 }
640
641 fn validate_seqnum_estimate_inputs(&self, seqnum: u64) -> Option<()> {
642 (self.file_entries != 0
643 && self.file_head_seqnum != 0
644 && self.file_tail_seqnum != 0
645 && seqnum != 0)
646 .then_some(())
647 }
648
649 fn scanned_file_rows(&self) -> u64 {
650 self.per_file_sampled
651 .saturating_add(self.per_file_unsampled)
652 .max(1)
653 }
654
655 fn seqnum_span_so_far(&self, seqnum: u64) -> Option<u64> {
656 match self.direction {
657 Direction::Forward => seqnum.checked_sub(self.file_head_seqnum),
658 Direction::Backward => self.file_tail_seqnum.checked_sub(seqnum),
659 }
660 }
661
662 fn estimate_remaining_rows_by_time(&self, realtime_usec: u64) -> u64 {
663 let scanned_rows = self.scanned_file_rows();
664 let (after, before) = self.overlapping_timeframe(realtime_usec);
665 let total_time = self
666 .remaining_time_bounds(realtime_usec, after, before)
667 .0
668 .max(1);
669 let remaining_time = self.remaining_time_bounds(realtime_usec, after, before).1;
670 let elapsed = total_time.saturating_sub(remaining_time).max(1);
671 let mut proportion_by_time = elapsed as f64 / total_time as f64;
672 if proportion_by_time == 0.0 || proportion_by_time > 1.0 || !proportion_by_time.is_finite()
673 {
674 proportion_by_time = 1.0;
675 }
676 let mut expected_total = (scanned_rows as f64 / proportion_by_time) as u64;
677 if self.file_entries != 0 && expected_total > self.file_entries {
678 expected_total = self.file_entries;
679 }
680 expected_total.saturating_sub(scanned_rows).max(1)
681 }
682
683 fn progress_by_time(&self, realtime_usec: u64) -> f64 {
684 let (after, before) = self.overlapping_timeframe(realtime_usec);
685 let total_time = before.saturating_sub(after).max(1);
686 let elapsed = match self.direction {
687 Direction::Forward => realtime_usec.saturating_sub(after),
688 Direction::Backward => before.saturating_sub(realtime_usec),
689 }
690 .min(total_time);
691 elapsed as f64 / total_time as f64
692 }
693
694 fn overlapping_timeframe(&self, realtime_usec: u64) -> (u64, u64) {
695 match self.direction {
696 Direction::Forward => {
697 let mut oldest = self
698 .first_realtime_usec
699 .or((self.file_head_realtime_usec != 0).then_some(self.file_head_realtime_usec))
700 .unwrap_or(self.start_realtime_usec);
701 let mut newest = if self.file_tail_realtime_usec != 0 {
702 self.end_realtime_usec.min(self.file_tail_realtime_usec)
703 } else {
704 self.end_realtime_usec
705 };
706 if newest <= oldest {
707 newest = oldest.saturating_add(1);
708 }
709 if realtime_usec < oldest {
710 oldest = realtime_usec.saturating_sub(1);
711 }
712 (oldest, newest)
713 }
714 Direction::Backward => {
715 let mut newest = self
716 .first_realtime_usec
717 .or((self.file_tail_realtime_usec != 0).then_some(self.file_tail_realtime_usec))
718 .unwrap_or(self.end_realtime_usec);
719 let oldest = if self.file_head_realtime_usec != 0 {
720 self.start_realtime_usec.max(self.file_head_realtime_usec)
721 } else {
722 self.start_realtime_usec
723 };
724 if newest <= oldest {
725 newest = oldest.saturating_add(1);
726 }
727 if newest < realtime_usec {
728 newest = realtime_usec.saturating_add(1);
729 }
730 (oldest, newest)
731 }
732 }
733 }
734
735 fn remaining_range(&self, realtime_usec: u64) -> (u64, u64) {
736 let (after, before) = self.overlapping_timeframe(realtime_usec);
737 let (_, _, remaining_start, remaining_end) =
738 self.remaining_time_details(realtime_usec, after, before);
739 (remaining_start, remaining_end)
740 }
741
742 fn remaining_time_bounds(&self, realtime_usec: u64, after: u64, before: u64) -> (u64, u64) {
743 let (total, remaining, _, _) = self.remaining_time_details(realtime_usec, after, before);
744 (total, remaining)
745 }
746
747 fn remaining_time_details(
748 &self,
749 realtime_usec: u64,
750 mut after: u64,
751 mut before: u64,
752 ) -> (u64, u64, u64, u64) {
753 if realtime_usec <= after {
754 after = realtime_usec.saturating_sub(1);
755 }
756 if realtime_usec >= before {
757 before = realtime_usec.saturating_add(1);
758 }
759 if before <= after {
760 before = after.saturating_add(1);
761 }
762 let (remaining_start, remaining_end) = match self.direction {
763 Direction::Forward => (realtime_usec, before),
764 Direction::Backward => (after, realtime_usec),
765 };
766 (
767 before.saturating_sub(after).max(1),
768 remaining_end.saturating_sub(remaining_start),
769 remaining_start,
770 remaining_end,
771 )
772 }
773}
774
775pub(crate) fn histogram_bucket_count_for_query(query: &ExplorerQuery) -> Option<usize> {
776 query
777 .histogram
778 .as_deref()
779 .map(|field| new_histogram(field, query).buckets.len())
780}
781
/// Per-row scan scratch: timestamp, FTS match flags and collected column field names.
///
/// `Default` gives no timestamp, no FTS matches and no column fields.
#[derive(Default)]
struct RowScan {
    timestamp: Option<u64>,
    fts_matches: bool,
    fts_negative_match: bool,
    column_fields: Vec<Vec<u8>>,
}
789
// Per-field role bits used by `ExplorerAccumulator` (a field may carry several).
/// Field is a public facet: value counts and unset counts are maintained.
const FACET_PUBLIC: u8 = 0b001;
/// Field is the histogram field: per-bucket value counts are maintained.
const FACET_HISTOGRAM: u8 = 0b010;
/// Field values are parsed with `parse_source_realtime`.
const FACET_SOURCE_REALTIME: u8 = 0b100;
793
/// Cached classification of a data object offset.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OffsetClass {
    Irrelevant,
    FtsMatch,
    FtsNegativeMatch,
    /// Index into the accumulator's per-value tables.
    Value(usize),
}

impl OffsetClass {
    // Raw encodings start at 1 so that 0 never encodes a class.
    const IRRELEVANT_RAW: usize = 1;
    const FTS_MATCH_RAW: usize = 2;
    const FTS_NEGATIVE_MATCH_RAW: usize = 3;
    const VALUE_BASE: usize = 4;

    /// Packs the class into a `usize` (always >= 1; `Value` saturates at `usize::MAX`).
    fn to_raw(self) -> usize {
        match self {
            OffsetClass::Irrelevant => Self::IRRELEVANT_RAW,
            OffsetClass::FtsMatch => Self::FTS_MATCH_RAW,
            OffsetClass::FtsNegativeMatch => Self::FTS_NEGATIVE_MATCH_RAW,
            OffsetClass::Value(index) => index.saturating_add(Self::VALUE_BASE),
        }
    }

    /// Inverse of `to_raw`. Raw values below `VALUE_BASE` that are not special (i.e. 0)
    /// decode to `Value(0)`.
    fn from_raw(raw: usize) -> Self {
        if raw == Self::IRRELEVANT_RAW {
            OffsetClass::Irrelevant
        } else if raw == Self::FTS_MATCH_RAW {
            OffsetClass::FtsMatch
        } else if raw == Self::FTS_NEGATIVE_MATCH_RAW {
            OffsetClass::FtsNegativeMatch
        } else {
            OffsetClass::Value(raw.saturating_sub(Self::VALUE_BASE))
        }
    }
}
826
/// One open-addressing slot: `offset == 0` marks an empty slot, `class` is an
/// `OffsetClass::to_raw` encoding.
#[derive(Clone, Copy, Debug, Default)]
struct OffsetClassSlot {
    offset: u64,
    class: usize,
}
832
833#[derive(Debug)]
834struct OffsetClassCache {
835 slots: Vec<OffsetClassSlot>,
836 len: usize,
837}
838
839impl Default for OffsetClassCache {
840 fn default() -> Self {
841 Self {
842 slots: vec![OffsetClassSlot::default(); 256],
843 len: 0,
844 }
845 }
846}
847
848impl OffsetClassCache {
849 fn lookup(&self, offset: NonZeroU64) -> Option<OffsetClass> {
850 let mask = self.slots.len().saturating_sub(1);
851 let mut index = offset_slot(offset.get()) & mask;
852 for _ in 0..self.slots.len() {
853 let slot = self.slots[index];
854 if slot.offset == 0 {
855 return None;
856 }
857 if slot.offset == offset.get() {
858 return Some(OffsetClass::from_raw(slot.class));
859 }
860 index = (index + 1) & mask;
861 }
862 None
863 }
864
865 fn insert(&mut self, offset: NonZeroU64, class: OffsetClass) {
866 if (self.len + 1).saturating_mul(4) >= self.slots.len().saturating_mul(3) {
867 self.grow();
868 }
869 self.insert_raw(offset.get(), class.to_raw());
870 }
871
872 fn grow(&mut self) {
873 let new_len = self.slots.len().saturating_mul(2).max(256);
874 let old = std::mem::replace(&mut self.slots, vec![OffsetClassSlot::default(); new_len]);
875 self.len = 0;
876 for slot in old {
877 if slot.offset != 0 {
878 self.insert_raw(slot.offset, slot.class);
879 }
880 }
881 }
882
883 fn insert_raw(&mut self, offset: u64, class: usize) {
884 let mask = self.slots.len().saturating_sub(1);
885 let mut index = offset_slot(offset) & mask;
886 loop {
887 if self.slots[index].offset == 0 {
888 self.slots[index] = OffsetClassSlot { offset, class };
889 self.len += 1;
890 return;
891 }
892 if self.slots[index].offset == offset {
893 self.slots[index].class = class;
894 return;
895 }
896 index = (index + 1) & mask;
897 }
898 }
899}
900
/// Hashes an object offset into a slot index (the caller masks it to the table size).
///
/// The low 3 bits are dropped first (offsets are presumably 8-byte aligned — TODO confirm),
/// then the bits are mixed with an xor-shift / multiply / xor-shift step.
fn offset_slot(offset: u64) -> usize {
    let shifted = offset >> 3;
    let folded = shifted ^ (shifted >> 33);
    let multiplied = folded.wrapping_mul(0xff51_afd7_ed55_8ccd);
    (multiplied ^ (multiplied >> 33)) as usize
}
908
909struct ExplorerAccumulator {
910 field_lookup: HashMap<Vec<u8>, usize>,
911 fields: Vec<Vec<u8>>,
912 flags: Vec<u8>,
913 last_seen_row_ids: Vec<u64>,
914 unset_counts: Vec<u64>,
915 values_by_field: Vec<Vec<usize>>,
916 value_counts: Vec<u64>,
917 value_field_indices: Vec<usize>,
918 value_labels: Vec<Vec<u8>>,
919 value_fts_matches: Vec<bool>,
920 value_source_realtime: Vec<Option<u64>>,
921 value_histogram_buckets: Vec<Option<Vec<u64>>>,
922 field_histogram_unset_buckets: Vec<Option<Vec<u64>>>,
923 offset_cache: OffsetClassCache,
924 histogram_start_realtime_usec: u64,
925 histogram_bucket_width_usec: u64,
926 histogram_bucket_count: usize,
927 required_identity_count: usize,
928}
929
930impl ExplorerAccumulator {
931 fn for_main(query: &ExplorerQuery, histogram: Option<&ExplorerHistogram>) -> Self {
932 Self::for_combined(query, &[], histogram)
933 }
934
935 fn for_facets(
936 query: &ExplorerQuery,
937 facet_indices: &[usize],
938 include_source_realtime: bool,
939 ) -> Self {
940 let mut out = Self::new(None);
941 for facet_index in facet_indices {
942 if let Some(field) = query.facets.get(*facet_index) {
943 out.add_field(field, FACET_PUBLIC);
944 }
945 }
946 if include_source_realtime {
947 out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
948 }
949 out
950 }
951
952 fn for_combined(
953 query: &ExplorerQuery,
954 facet_indices: &[usize],
955 histogram: Option<&ExplorerHistogram>,
956 ) -> Self {
957 let mut out = Self::new(histogram);
958 if let Some(field) = &query.histogram {
959 out.add_field(field, FACET_HISTOGRAM);
960 }
961 for facet_index in facet_indices {
962 if let Some(field) = query.facets.get(*facet_index) {
963 out.add_field(field, FACET_PUBLIC);
964 }
965 }
966 if query_needs_source_realtime_main(query) || facet_pass_needs_source_realtime(query) {
967 out.add_field(SOURCE_REALTIME_FIELD, FACET_SOURCE_REALTIME);
968 }
969 out
970 }
971
972 fn new(histogram: Option<&ExplorerHistogram>) -> Self {
973 Self {
974 field_lookup: HashMap::new(),
975 fields: Vec::new(),
976 flags: Vec::new(),
977 last_seen_row_ids: Vec::new(),
978 unset_counts: Vec::new(),
979 values_by_field: Vec::new(),
980 value_counts: Vec::new(),
981 value_field_indices: Vec::new(),
982 value_labels: Vec::new(),
983 value_fts_matches: Vec::new(),
984 value_source_realtime: Vec::new(),
985 value_histogram_buckets: Vec::new(),
986 field_histogram_unset_buckets: Vec::new(),
987 offset_cache: OffsetClassCache::default(),
988 histogram_start_realtime_usec: histogram
989 .and_then(|histogram| histogram.buckets.first())
990 .map(|bucket| bucket.start_realtime_usec)
991 .unwrap_or_default(),
992 histogram_bucket_width_usec: histogram
993 .and_then(|histogram| histogram.buckets.first())
994 .map(|bucket| {
995 bucket
996 .end_realtime_usec
997 .saturating_sub(bucket.start_realtime_usec)
998 .max(1)
999 })
1000 .unwrap_or(1),
1001 histogram_bucket_count: histogram
1002 .map(|histogram| histogram.buckets.len())
1003 .unwrap_or_default(),
1004 required_identity_count: 0,
1005 }
1006 }
1007
1008 fn add_field(&mut self, field: &[u8], flags: u8) {
1009 if let Some(index) = self.field_lookup.get(field).copied() {
1010 let had_required = self.flags[index] != 0;
1011 self.flags[index] |= flags;
1012 if flags & FACET_HISTOGRAM != 0 && self.field_histogram_unset_buckets[index].is_none() {
1013 self.field_histogram_unset_buckets[index] =
1014 Some(vec![0; self.histogram_bucket_count]);
1015 }
1016 if !had_required && self.flags[index] != 0 {
1017 self.required_identity_count += 1;
1018 }
1019 return;
1020 }
1021
1022 let index = self.fields.len();
1023 self.field_lookup.insert(field.to_vec(), index);
1024 self.fields.push(field.to_vec());
1025 self.flags.push(flags);
1026 self.last_seen_row_ids.push(0);
1027 self.unset_counts.push(0);
1028 self.values_by_field.push(Vec::new());
1029 self.field_histogram_unset_buckets
1030 .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1031 if flags != 0 {
1032 self.required_identity_count += 1;
1033 }
1034 }
1035
1036 fn add_value(
1037 &mut self,
1038 field_index: usize,
1039 _data_offset: NonZeroU64,
1040 value: &[u8],
1041 fts_matches: bool,
1042 ) -> usize {
1043 let value_index = self.value_counts.len();
1044 let flags = self.flags[field_index];
1045 self.value_counts.push(0);
1046 self.value_field_indices.push(field_index);
1047 self.value_labels.push(value.to_vec());
1048 self.value_fts_matches.push(fts_matches);
1049 self.value_source_realtime
1050 .push(if flags & FACET_SOURCE_REALTIME != 0 {
1051 parse_source_realtime(value)
1052 } else {
1053 None
1054 });
1055 self.value_histogram_buckets
1056 .push((flags & FACET_HISTOGRAM != 0).then(|| vec![0; self.histogram_bucket_count]));
1057 self.values_by_field[field_index].push(value_index);
1058 value_index
1059 }
1060
1061 fn mark_field_seen(&mut self, field_index: usize, row_id: u64) -> bool {
1062 if self.last_seen_row_ids[field_index] == row_id {
1065 return false;
1066 }
1067 self.last_seen_row_ids[field_index] = row_id;
1068 true
1069 }
1070
1071 fn apply_value(
1072 &mut self,
1073 value_index: usize,
1074 realtime_usec: Option<u64>,
1075 stats: &mut ExplorerStats,
1076 ) {
1077 let field_index = self.value_field_indices[value_index];
1078 let flags = self.flags[field_index];
1079 if flags & FACET_PUBLIC != 0 {
1080 self.value_counts[value_index] = self.value_counts[value_index].saturating_add(1);
1081 stats.facet_updates = stats.facet_updates.saturating_add(1);
1082 }
1083 if flags & FACET_HISTOGRAM != 0 {
1084 if let (Some(realtime_usec), Some(buckets)) = (
1085 realtime_usec,
1086 self.value_histogram_buckets[value_index].as_mut(),
1087 ) {
1088 if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1089 realtime_usec,
1090 self.histogram_start_realtime_usec,
1091 self.histogram_bucket_width_usec,
1092 buckets.len(),
1093 ) {
1094 buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1095 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1096 }
1097 }
1098 }
1099 }
1100
1101 fn finish_facet_row(&mut self, row_id: u64, stats: &mut ExplorerStats) {
1102 for field_index in 0..self.fields.len() {
1103 if self.flags[field_index] & FACET_PUBLIC == 0 {
1104 continue;
1105 }
1106 if self.last_seen_row_ids[field_index] != row_id {
1107 self.unset_counts[field_index] = self.unset_counts[field_index].saturating_add(1);
1108 stats.facet_updates = stats.facet_updates.saturating_add(1);
1109 }
1110 }
1111 }
1112
1113 fn finish_histogram_row(&mut self, row_id: u64, realtime_usec: u64, stats: &mut ExplorerStats) {
1114 for field_index in 0..self.fields.len() {
1115 if self.flags[field_index] & FACET_HISTOGRAM == 0 {
1116 continue;
1117 }
1118 if self.last_seen_row_ids[field_index] == row_id {
1119 continue;
1120 }
1121 let Some(buckets) = self.field_histogram_unset_buckets[field_index].as_mut() else {
1122 continue;
1123 };
1124 if let Some(bucket_index) = histogram_bucket_index_from_bounds(
1125 realtime_usec,
1126 self.histogram_start_realtime_usec,
1127 self.histogram_bucket_width_usec,
1128 buckets.len(),
1129 ) {
1130 buckets[bucket_index] = buckets[bucket_index].saturating_add(1);
1131 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
1132 }
1133 }
1134 }
1135
1136 fn finish_facets(&self, result: &mut ExplorerResult) {
1137 for field_index in 0..self.fields.len() {
1138 if self.flags[field_index] & FACET_PUBLIC == 0 {
1139 continue;
1140 }
1141 let mut values = HashMap::new();
1142 for value_index in &self.values_by_field[field_index] {
1143 let count = self.value_counts[*value_index];
1144 if count != 0 {
1145 increment_counter_by(&mut values, &self.value_labels[*value_index], count);
1146 }
1147 }
1148 if self.unset_counts[field_index] != 0 {
1149 increment_counter_by(&mut values, UNSET_VALUE, self.unset_counts[field_index]);
1150 }
1151 result
1152 .facets
1153 .insert(self.fields[field_index].clone(), values);
1154 }
1155 }
1156
1157 fn finish_histogram(&self, histogram: Option<&mut ExplorerHistogram>) {
1158 let Some(histogram) = histogram else {
1159 return;
1160 };
1161 for buckets in self.field_histogram_unset_buckets.iter().flatten() {
1162 for (bucket_index, count) in buckets.iter().enumerate() {
1163 if *count == 0 {
1164 continue;
1165 }
1166 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1167 increment_counter_by(&mut bucket.values, UNSET_VALUE, *count);
1168 }
1169 }
1170 }
1171 for value_index in 0..self.value_histogram_buckets.len() {
1172 let Some(buckets) = &self.value_histogram_buckets[value_index] else {
1173 continue;
1174 };
1175 for (bucket_index, count) in buckets.iter().enumerate() {
1176 if *count == 0 {
1177 continue;
1178 }
1179 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
1180 increment_counter_by(
1181 &mut bucket.values,
1182 &self.value_labels[value_index],
1183 *count,
1184 );
1185 }
1186 }
1187 }
1188 }
1189}
1190
/// Normalizes a proportion into the half-open range `(0.0, 1.0]`.
///
/// Returns `None` for zero, negative, NaN, or infinite input. Any other value
/// is returned capped at `1.0`.
fn bounded_positive_proportion(value: f64) -> Option<f64> {
    match value {
        v if v.is_finite() && v > 0.0 => Some(if v > 1.0 { 1.0 } else { v }),
        _ => None,
    }
}
1197
1198impl FileReader {
1199 pub fn explore(&mut self, query: &ExplorerQuery) -> Result<ExplorerResult> {
1200 self.explore_with_strategy(query, ExplorerStrategy::Traversal)
1201 }
1202
1203 pub fn explore_with_strategy(
1204 &mut self,
1205 query: &ExplorerQuery,
1206 strategy: ExplorerStrategy,
1207 ) -> Result<ExplorerResult> {
1208 self.explore_with_strategy_and_payload_mode(query, strategy, ExplorerRowPayloadMode::Expand)
1209 }
1210
1211 pub fn explore_with_strategy_and_control(
1212 &mut self,
1213 query: &ExplorerQuery,
1214 strategy: ExplorerStrategy,
1215 control: &mut ExplorerControl<'_>,
1216 ) -> Result<ExplorerResult> {
1217 validate_no_debug_column_collection(query)?;
1218 self.explore_with_strategy_and_payload_mode_unchecked(
1219 query,
1220 strategy,
1221 ExplorerRowPayloadMode::Expand,
1222 Some(control),
1223 )
1224 }
1225
1226 #[cfg(test)]
1227 pub(crate) fn explore_with_strategy_cursor_rows(
1228 &mut self,
1229 query: &ExplorerQuery,
1230 strategy: ExplorerStrategy,
1231 ) -> Result<ExplorerResult> {
1232 self.explore_with_strategy_and_payload_mode(
1233 query,
1234 strategy,
1235 ExplorerRowPayloadMode::CursorOnly,
1236 )
1237 }
1238
1239 pub(crate) fn explore_with_strategy_cursor_rows_controlled(
1240 &mut self,
1241 query: &ExplorerQuery,
1242 strategy: ExplorerStrategy,
1243 control: &mut ExplorerControl<'_>,
1244 ) -> Result<ExplorerResult> {
1245 validate_no_debug_column_collection(query)?;
1246 self.explore_with_strategy_and_payload_mode_unchecked(
1247 query,
1248 strategy,
1249 ExplorerRowPayloadMode::CursorOnly,
1250 Some(control),
1251 )
1252 }
1253
1254 fn explore_with_strategy_and_payload_mode(
1255 &mut self,
1256 query: &ExplorerQuery,
1257 strategy: ExplorerStrategy,
1258 row_payload_mode: ExplorerRowPayloadMode,
1259 ) -> Result<ExplorerResult> {
1260 validate_no_debug_column_collection(query)?;
1261 self.explore_with_strategy_and_payload_mode_unchecked(
1262 query,
1263 strategy,
1264 row_payload_mode,
1265 None,
1266 )
1267 }
1268
1269 fn explore_with_strategy_and_payload_mode_unchecked(
1270 &mut self,
1271 query: &ExplorerQuery,
1272 strategy: ExplorerStrategy,
1273 row_payload_mode: ExplorerRowPayloadMode,
1274 mut control: Option<&mut ExplorerControl<'_>>,
1275 ) -> Result<ExplorerResult> {
1276 match strategy {
1277 ExplorerStrategy::Traversal => {
1278 self.explore_traversal(query, row_payload_mode, control.as_deref_mut())
1279 }
1280 ExplorerStrategy::Index => {
1281 self.explore_indexed(query, row_payload_mode, control.as_deref_mut())
1282 }
1283 ExplorerStrategy::Compare => self.explore_compare(query, row_payload_mode),
1284 }
1285 }
1286
1287 fn explore_traversal(
1288 &mut self,
1289 query: &ExplorerQuery,
1290 row_payload_mode: ExplorerRowPayloadMode,
1291 mut control: Option<&mut ExplorerControl<'_>>,
1292 ) -> Result<ExplorerResult> {
1293 validate_query(query)?;
1294 let mut result = explorer_result_for_query(query);
1295 let facet_groups = facet_pass_groups(query);
1296 if can_run_combined_explorer_pass(&facet_groups) {
1297 self.explore_traversal_combined(
1298 query,
1299 row_payload_mode,
1300 &mut result,
1301 &facet_groups,
1302 control.as_deref_mut(),
1303 )?;
1304 } else {
1305 self.explore_traversal_split(
1306 query,
1307 row_payload_mode,
1308 &mut result,
1309 facet_groups,
1310 control.as_deref_mut(),
1311 )?;
1312 }
1313 self.configure_explorer_filters(query, None)?;
1314 Ok(result)
1315 }
1316
1317 fn explore_traversal_combined(
1318 &mut self,
1319 query: &ExplorerQuery,
1320 row_payload_mode: ExplorerRowPayloadMode,
1321 result: &mut ExplorerResult,
1322 facet_groups: &[FacetPassGroup],
1323 control: Option<&mut ExplorerControl<'_>>,
1324 ) -> Result<()> {
1325 let facet_indices = combined_facet_indices(facet_groups);
1326 if query_needs_main_pass(query) || !facet_indices.is_empty() {
1327 self.configure_explorer_filters(query, None)?;
1328 let mut accumulator =
1329 ExplorerAccumulator::for_combined(query, &facet_indices, result.histogram.as_ref());
1330 self.scan_explorer_combined(
1331 query,
1332 &mut accumulator,
1333 result,
1334 !facet_indices.is_empty(),
1335 row_payload_mode,
1336 control,
1337 )?;
1338 accumulator.finish_facets(result);
1339 accumulator.finish_histogram(result.histogram.as_mut());
1340 }
1341 Ok(())
1342 }
1343
1344 fn explore_traversal_split(
1345 &mut self,
1346 query: &ExplorerQuery,
1347 row_payload_mode: ExplorerRowPayloadMode,
1348 result: &mut ExplorerResult,
1349 facet_groups: Vec<FacetPassGroup>,
1350 mut control: Option<&mut ExplorerControl<'_>>,
1351 ) -> Result<()> {
1352 if query_needs_main_pass(query) {
1353 self.configure_explorer_filters(query, None)?;
1354 let mut accumulator = ExplorerAccumulator::for_main(query, result.histogram.as_ref());
1355 self.scan_explorer_main(
1356 query,
1357 &mut accumulator,
1358 result,
1359 row_payload_mode,
1360 control.as_deref_mut(),
1361 )?;
1362 accumulator.finish_histogram(result.histogram.as_mut());
1363 }
1364
1365 for group in facet_groups {
1366 if explorer_control_stopped(control.as_deref()) {
1367 break;
1368 }
1369 self.configure_explorer_filters(query, group.excluded_field.as_deref())?;
1370 let mut accumulator = ExplorerAccumulator::for_facets(
1371 query,
1372 &group.facet_indices,
1373 facet_pass_needs_source_realtime(query),
1374 );
1375 self.scan_explorer_facet(
1376 query,
1377 &mut accumulator,
1378 &mut result.stats,
1379 control.as_deref_mut(),
1380 )?;
1381 accumulator.finish_facets(result);
1382 }
1383 Ok(())
1384 }
1385
1386 fn explore_compare(
1387 &mut self,
1388 query: &ExplorerQuery,
1389 row_payload_mode: ExplorerRowPayloadMode,
1390 ) -> Result<ExplorerResult> {
1391 let traversal_started = Instant::now();
1392 let traversal = self.explore_traversal(query, row_payload_mode, None)?;
1393 let traversal_duration = traversal_started.elapsed();
1394
1395 let index_started = Instant::now();
1396 let mut indexed = self.explore_indexed(query, row_payload_mode, None)?;
1397 let index_duration = index_started.elapsed();
1398
1399 if !explorer_outputs_match(&traversal, &indexed) {
1400 return Err(SdkError::VerificationError(
1401 "indexed explorer output differs from traversal explorer output".to_string(),
1402 ));
1403 }
1404 indexed.comparison = Some(ExplorerComparison {
1405 traversal_duration,
1406 index_duration,
1407 traversal_stats: traversal.stats,
1408 index_stats: indexed.stats.clone(),
1409 });
1410 Ok(indexed)
1411 }
1412
1413 fn explore_indexed(
1414 &mut self,
1415 query: &ExplorerQuery,
1416 row_payload_mode: ExplorerRowPayloadMode,
1417 mut control: Option<&mut ExplorerControl<'_>>,
1418 ) -> Result<ExplorerResult> {
1419 validate_query(query)?;
1420 validate_indexed_query(query)?;
1421 let mut result = explorer_result_for_query(query);
1422 self.indexed_collect_rows(query, row_payload_mode, &mut result, control.as_deref_mut())?;
1423 self.indexed_collect_facets(query, &mut result, control.as_deref())?;
1424 self.indexed_collect_histogram(query, &mut result, control.as_deref())?;
1425 self.configure_explorer_filters(query, None)?;
1426 Ok(result)
1427 }
1428
1429 fn indexed_collect_rows(
1430 &mut self,
1431 query: &ExplorerQuery,
1432 row_payload_mode: ExplorerRowPayloadMode,
1433 result: &mut ExplorerResult,
1434 control: Option<&mut ExplorerControl<'_>>,
1435 ) -> Result<()> {
1436 if query.limit == 0 {
1437 return Ok(());
1438 }
1439 let mut row_query = query.clone();
1440 row_query.facets.clear();
1441 row_query.histogram = None;
1442 self.configure_explorer_filters(&row_query, None)?;
1443 let mut accumulator = ExplorerAccumulator::for_main(&row_query, None);
1444 self.scan_explorer_main(
1445 &row_query,
1446 &mut accumulator,
1447 result,
1448 row_payload_mode,
1449 control,
1450 )
1451 }
1452
1453 fn indexed_collect_facets(
1454 &mut self,
1455 query: &ExplorerQuery,
1456 result: &mut ExplorerResult,
1457 control: Option<&ExplorerControl<'_>>,
1458 ) -> Result<()> {
1459 if explorer_control_stopped(control) {
1460 return Ok(());
1461 }
1462 for group in facet_pass_groups(query) {
1463 let candidates = self.indexed_candidate_set(query, group.excluded_field.as_deref())?;
1464 self.inner.with_file(|file| {
1465 indexed_count_facet_group(file, query, &group, &candidates, result)
1466 })?;
1467 }
1468 Ok(())
1469 }
1470
1471 fn indexed_collect_histogram(
1472 &mut self,
1473 query: &ExplorerQuery,
1474 result: &mut ExplorerResult,
1475 control: Option<&ExplorerControl<'_>>,
1476 ) -> Result<()> {
1477 if query.histogram.is_none() || explorer_control_stopped(control) {
1478 return Ok(());
1479 }
1480 let candidates = self.indexed_candidate_set(query, None)?;
1481 self.inner
1482 .with_file(|file| indexed_count_histogram(file, query, &candidates, result))
1483 }
1484
1485 fn indexed_candidate_set(
1486 &mut self,
1487 query: &ExplorerQuery,
1488 excluded_field: Option<&[u8]>,
1489 ) -> Result<IndexedCandidateSet> {
1490 if query.filters.is_empty()
1491 && query.after_realtime_usec.is_none()
1492 && query.before_realtime_usec.is_none()
1493 {
1494 let count = self
1495 .inner
1496 .with_file(|file| file.journal_header_ref().n_entries);
1497 return Ok(IndexedCandidateSet::All { count });
1498 }
1499
1500 self.configure_explorer_filters(query, excluded_field)?;
1501 self.seek_for_explorer(query);
1502 let mut offsets = HashSet::new();
1503 while self.step_for_explorer(query.direction)? {
1504 let Some(metadata) = self.row.metadata() else {
1505 continue;
1506 };
1507 let commit_realtime = metadata.realtime;
1508 if stop_by_commit_time(query, commit_realtime) {
1509 break;
1510 }
1511 if !timestamp_in_range(query, commit_realtime) {
1512 continue;
1513 }
1514 if let Some(entry_offset) = self.row.entry_offset() {
1515 offsets.insert(entry_offset);
1516 }
1517 }
1518 Ok(IndexedCandidateSet::Set {
1519 count: offsets.len() as u64,
1520 offsets,
1521 })
1522 }
1523
1524 fn configure_explorer_filters(
1525 &mut self,
1526 query: &ExplorerQuery,
1527 excluded_field: Option<&[u8]>,
1528 ) -> Result<()> {
1529 self.flush_matches();
1530 for filter in &query.filters {
1531 if excluded_field.is_some_and(|field| field == filter.field.as_slice()) {
1532 continue;
1533 }
1534 if filter.values.is_empty() {
1535 continue;
1536 }
1537 for value in &filter.values {
1538 let payload = payload_from_parts(&filter.field, value);
1539 self.add_match(&payload);
1540 }
1541 }
1542 Ok(())
1543 }
1544
1545 fn next_explorer_row_frame(
1546 &mut self,
1547 query: &ExplorerQuery,
1548 rows_seen: &mut u64,
1549 stats: &ExplorerStats,
1550 control: Option<&mut ExplorerControl<'_>>,
1551 ) -> Result<ExplorerLoopStep> {
1552 if !self.step_for_explorer(query.direction)? {
1553 return Ok(ExplorerLoopStep::Stop);
1554 }
1555 *rows_seen = rows_seen.saturating_add(1);
1556 if control.is_some_and(|control| control.should_stop_after_rows(*rows_seen, stats)) {
1557 return Ok(ExplorerLoopStep::Stop);
1558 }
1559 let Some(metadata) = self.row.metadata() else {
1560 return Ok(ExplorerLoopStep::Skip);
1561 };
1562 if stop_by_commit_time(query, metadata.realtime) {
1563 return Ok(ExplorerLoopStep::Stop);
1564 }
1565 if skip_by_commit_time(query, metadata.realtime) {
1566 return Ok(ExplorerLoopStep::Skip);
1567 }
1568 Ok(ExplorerLoopStep::Row(ExplorerRowFrame {
1569 commit_realtime: metadata.realtime,
1570 seqnum: metadata.seqnum,
1571 }))
1572 }
1573
1574 fn scan_row_data_or_default(
1575 &mut self,
1576 query: &ExplorerQuery,
1577 accumulator: &mut ExplorerAccumulator,
1578 row_id: &mut u64,
1579 deferred_values: &mut Vec<usize>,
1580 stats: &mut ExplorerStats,
1581 ) -> Result<RowScan> {
1582 if accumulator.required_identity_count == 0 && !query_has_fts(query) {
1583 stats.rows_examined = stats.rows_examined.saturating_add(1);
1584 return Ok(RowScan::default());
1585 }
1586 *row_id = row_id.saturating_add(1);
1587 deferred_values.clear();
1588 self.scan_current_row(
1589 query,
1590 accumulator,
1591 *row_id,
1592 ScanApply::Deferred(deferred_values),
1593 stats,
1594 )
1595 }
1596
1597 fn accepted_effective_realtime(
1598 query: &ExplorerQuery,
1599 scan: &RowScan,
1600 commit_realtime: u64,
1601 stats: &mut ExplorerStats,
1602 control: Option<&mut ExplorerControl<'_>>,
1603 ) -> Option<u64> {
1604 let mut effective_realtime = effective_realtime_from_scan(scan.timestamp, commit_realtime);
1605 record_source_realtime_delta(stats, scan.timestamp, commit_realtime);
1606 if let Some(control) = control {
1607 effective_realtime = control.adjust_realtime(effective_realtime);
1608 }
1609 (timestamp_in_range(query, effective_realtime) && !row_rejected_by_fts(query, scan))
1610 .then_some(effective_realtime)
1611 }
1612
1613 fn push_explorer_row_if_wanted(
1614 &mut self,
1615 query: &ExplorerQuery,
1616 result: &mut ExplorerResult,
1617 row_payload_mode: ExplorerRowPayloadMode,
1618 effective_realtime: u64,
1619 ) -> Result<()> {
1620 if row_within_anchor(query, effective_realtime) && result.rows.len() < query.limit {
1621 result.rows.push(self.current_explorer_row(
1622 effective_realtime,
1623 &mut result.stats,
1624 row_payload_mode,
1625 )?);
1626 }
1627 Ok(())
1628 }
1629
1630 fn apply_main_scanned_row(
1631 &mut self,
1632 query: &ExplorerQuery,
1633 accumulator: &mut ExplorerAccumulator,
1634 result: &mut ExplorerResult,
1635 row_payload_mode: ExplorerRowPayloadMode,
1636 scanned: MainScannedRow<'_>,
1637 control: Option<&mut ExplorerControl<'_>>,
1638 ) -> Result<bool> {
1639 if query.debug_collect_column_fields_by_row_traversal {
1640 result.column_fields.extend(scanned.scan.column_fields);
1641 }
1642 record_last_realtime(&mut result.stats, scanned.commit_realtime);
1643 result.stats.rows_matched = result.stats.rows_matched.saturating_add(1);
1644 let stop_after_matched_row = control.is_some_and(|control| {
1645 control.emit_matched_row(scanned.effective_realtime, result.stats.rows_matched)
1646 });
1647 for value_index in scanned.deferred_values {
1648 accumulator.apply_value(
1649 *value_index,
1650 Some(scanned.effective_realtime),
1651 &mut result.stats,
1652 );
1653 }
1654 accumulator.finish_histogram_row(
1655 scanned.row_id,
1656 scanned.effective_realtime,
1657 &mut result.stats,
1658 );
1659 self.push_explorer_row_if_wanted(
1660 query,
1661 result,
1662 row_payload_mode,
1663 scanned.effective_realtime,
1664 )?;
1665 Ok(stop_after_matched_row
1666 || should_stop_when_rows_full(
1667 query,
1668 &result.rows,
1669 scanned.effective_realtime,
1670 result.stats.rows_matched,
1671 ))
1672 }
1673
1674 fn sampling_state_for_combined(
1675 query: &ExplorerQuery,
1676 result: &ExplorerResult,
1677 control: Option<&mut ExplorerControl<'_>>,
1678 ) -> Option<ExplorerSamplingState> {
1679 let sampling = ExplorerSamplingState::for_query(
1680 query,
1681 result
1682 .histogram
1683 .as_ref()
1684 .map(|histogram| histogram.buckets.len()),
1685 );
1686 if let Some(control) = control {
1687 if let (Some(shared_sampling), Some(file_sampling)) =
1688 (control.sampling.as_deref_mut(), query.sampling)
1689 {
1690 shared_sampling.begin_file(file_sampling);
1691 }
1692 }
1693 sampling
1694 }
1695
1696 fn combined_sampling_decision(
1697 query: &ExplorerQuery,
1698 rows: &[ExplorerRow],
1699 frame: ExplorerRowFrame,
1700 sampling: &mut Option<ExplorerSamplingState>,
1701 mut control: Option<&mut ExplorerControl<'_>>,
1702 ) -> Option<ExplorerSamplingDecision> {
1703 let candidate_to_keep = if let Some(control) = control.as_deref_mut() {
1704 control.candidate_row.as_deref_mut().map_or_else(
1705 || row_candidate_to_keep(query, rows, frame.commit_realtime),
1706 |candidate_row| candidate_row(frame.commit_realtime),
1707 )
1708 } else {
1709 row_candidate_to_keep(query, rows, frame.commit_realtime)
1710 };
1711 if let Some(control) = control {
1712 if let Some(shared_sampling) = control.sampling.as_deref_mut() {
1713 return Some(shared_sampling.decide(
1714 frame.commit_realtime,
1715 frame.seqnum,
1716 candidate_to_keep,
1717 ));
1718 }
1719 }
1720 sampling
1721 .as_mut()
1722 .map(|sampling| sampling.decide(frame.commit_realtime, frame.seqnum, candidate_to_keep))
1723 }
1724
1725 fn apply_combined_sampling_decision(
1726 decision: ExplorerSamplingDecision,
1727 mode: CombinedScanMode,
1728 result: &mut ExplorerResult,
1729 frame: ExplorerRowFrame,
1730 ) -> SamplingRowAction {
1731 match decision {
1732 ExplorerSamplingDecision::Full { sampled } => {
1733 if sampled {
1734 result.stats.sampling_sampled = result.stats.sampling_sampled.saturating_add(1);
1735 }
1736 SamplingRowAction::Scan
1737 }
1738 ExplorerSamplingDecision::SkipFields => {
1739 record_combined_unsampled_row(
1740 &mut result.stats,
1741 mode,
1742 frame.commit_realtime,
1743 1,
1744 true,
1745 );
1746 add_special_histogram_value(
1747 result.histogram.as_mut(),
1748 frame.commit_realtime,
1749 EXPLORER_UNSAMPLED_VALUE,
1750 1,
1751 &mut result.stats,
1752 );
1753 SamplingRowAction::Skip
1754 }
1755 ExplorerSamplingDecision::StopAndEstimate {
1756 remaining_rows,
1757 from_realtime_usec,
1758 to_realtime_usec,
1759 } => {
1760 record_combined_unsampled_row(
1761 &mut result.stats,
1762 mode,
1763 frame.commit_realtime,
1764 remaining_rows,
1765 false,
1766 );
1767 result.stats.rows_estimated =
1768 result.stats.rows_estimated.saturating_add(remaining_rows);
1769 result.stats.sampling_estimated = result
1770 .stats
1771 .sampling_estimated
1772 .saturating_add(remaining_rows);
1773 add_estimated_histogram_range(
1774 result.histogram.as_mut(),
1775 from_realtime_usec,
1776 to_realtime_usec,
1777 remaining_rows,
1778 &mut result.stats,
1779 );
1780 SamplingRowAction::Stop
1781 }
1782 }
1783 }
1784
1785 fn apply_combined_scanned_row(
1786 &mut self,
1787 query: &ExplorerQuery,
1788 accumulator: &mut ExplorerAccumulator,
1789 result: &mut ExplorerResult,
1790 row_payload_mode: ExplorerRowPayloadMode,
1791 mode: CombinedScanMode,
1792 scanned: MainScannedRow<'_>,
1793 control: Option<&mut ExplorerControl<'_>>,
1794 ) -> Result<bool> {
1795 if query.debug_collect_column_fields_by_row_traversal {
1796 result.column_fields.extend(scanned.scan.column_fields);
1797 }
1798 record_last_realtime(&mut result.stats, scanned.commit_realtime);
1799 let stop_after_matched_row = update_combined_matched_stats(
1800 &mut result.stats,
1801 mode,
1802 scanned.effective_realtime,
1803 control,
1804 );
1805 let value_realtime = query
1806 .histogram
1807 .is_some()
1808 .then_some(scanned.effective_realtime);
1809 for value_index in scanned.deferred_values {
1810 accumulator.apply_value(*value_index, value_realtime, &mut result.stats);
1811 }
1812 if query.histogram.is_some() {
1813 accumulator.finish_histogram_row(
1814 scanned.row_id,
1815 scanned.effective_realtime,
1816 &mut result.stats,
1817 );
1818 }
1819 if mode.include_facets {
1820 accumulator.finish_facet_row(scanned.row_id, &mut result.stats);
1821 }
1822 self.push_explorer_row_if_wanted(
1823 query,
1824 result,
1825 row_payload_mode,
1826 scanned.effective_realtime,
1827 )?;
1828 Ok(stop_after_matched_row
1829 || should_stop_when_rows_full(
1830 query,
1831 &result.rows,
1832 scanned.effective_realtime,
1833 result.stats.rows_matched,
1834 ))
1835 }
1836
    /// Main (rows + histogram) scan over the entries matched by the reader's
    /// current filters, starting from `seek_for_explorer`'s position.
    ///
    /// For each accepted row, its deferred values are applied to `accumulator`
    /// and the row may be added to `result.rows` (see
    /// `apply_main_scanned_row`). The loop ends when:
    /// - `next_explorer_row_frame` returns `Stop` (reader exhausted,
    ///   commit-time bound reached, or `control` asking to stop), or
    /// - `apply_main_scanned_row` reports that the scan can stop.
    ///
    /// `result.stats.rows_returned` is set from the collected rows on success.
    fn scan_explorer_main(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        result: &mut ExplorerResult,
        row_payload_mode: ExplorerRowPayloadMode,
        mut control: Option<&mut ExplorerControl<'_>>,
    ) -> Result<()> {
        self.seek_for_explorer(query);
        // Advanced only for rows whose data is scanned (`scan_row_data_or_default`);
        // compared against the accumulator's `last_seen_row_ids`.
        let mut row_id = 0u64;
        // Every entry stepped onto, including skipped ones; checked by `control`.
        let mut rows_seen = 0u64;
        // Reused across rows to hold value indices until the row is accepted.
        let mut deferred_values = Vec::new();
        loop {
            let frame = match self.next_explorer_row_frame(
                query,
                &mut rows_seen,
                &result.stats,
                control.as_deref_mut(),
            )? {
                ExplorerLoopStep::Stop => break,
                ExplorerLoopStep::Skip => continue,
                ExplorerLoopStep::Row(frame) => frame,
            };
            let scan = self.scan_row_data_or_default(
                query,
                accumulator,
                &mut row_id,
                &mut deferred_values,
                &mut result.stats,
            )?;
            // Rows outside the time range or rejected by full-text search are
            // dropped here, before any deferred value is applied.
            let Some(effective_realtime) = Self::accepted_effective_realtime(
                query,
                &scan,
                frame.commit_realtime,
                &mut result.stats,
                control.as_deref_mut(),
            ) else {
                continue;
            };
            let scanned = MainScannedRow {
                row_id,
                commit_realtime: frame.commit_realtime,
                effective_realtime,
                scan,
                deferred_values: &deferred_values,
            };
            if self.apply_main_scanned_row(
                query,
                accumulator,
                result,
                row_payload_mode,
                scanned,
                control.as_deref_mut(),
            )? {
                break;
            }
        }
        result.stats.rows_returned = result.rows.len() as u64;
        Ok(())
    }
1897
    /// Single scan that produces rows, histogram and (when `include_facets`)
    /// facet counts together, with optional sampling.
    ///
    /// For each entry, a sampling decision (shared sampler from `control`, or
    /// local sampler) runs before the entry's data is scanned. That decision
    /// can scan the entry, skip it as unsampled, or stop and estimate the rest.
    /// Accepted rows are handled by `apply_combined_scanned_row`.
    /// `result.stats.rows_returned` is set from the collected rows on success.
    fn scan_explorer_combined(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        result: &mut ExplorerResult,
        include_facets: bool,
        row_payload_mode: ExplorerRowPayloadMode,
        mut control: Option<&mut ExplorerControl<'_>>,
    ) -> Result<()> {
        self.seek_for_explorer(query);
        let mode = CombinedScanMode {
            include_main: query_needs_main_pass(query),
            include_facets,
        };
        // Advanced only for rows whose data is scanned; compared against the
        // accumulator's `last_seen_row_ids`.
        let mut row_id = 0u64;
        // Every entry stepped onto, including skipped ones; checked by `control`.
        let mut rows_seen = 0u64;
        // Reused across rows to hold value indices until the row is accepted.
        let mut deferred_values = Vec::new();
        // Also notifies a shared sampler in `control` that a new file begins.
        let mut sampling = Self::sampling_state_for_combined(query, result, control.as_deref_mut());
        loop {
            let frame = match self.next_explorer_row_frame(
                query,
                &mut rows_seen,
                &result.stats,
                control.as_deref_mut(),
            )? {
                ExplorerLoopStep::Stop => break,
                ExplorerLoopStep::Skip => continue,
                ExplorerLoopStep::Row(frame) => frame,
            };
            // Sampling is decided from the commit timestamp alone, before any
            // of the entry's data objects are read.
            if let Some(decision) = Self::combined_sampling_decision(
                query,
                &result.rows,
                frame,
                &mut sampling,
                control.as_deref_mut(),
            ) {
                match Self::apply_combined_sampling_decision(decision, mode, result, frame) {
                    SamplingRowAction::Scan => {}
                    SamplingRowAction::Skip => continue,
                    SamplingRowAction::Stop => break,
                }
            }
            let scan = self.scan_row_data_or_default(
                query,
                accumulator,
                &mut row_id,
                &mut deferred_values,
                &mut result.stats,
            )?;
            // Rows outside the time range or rejected by full-text search are
            // dropped here, before any deferred value is applied.
            let Some(effective_realtime) = Self::accepted_effective_realtime(
                query,
                &scan,
                frame.commit_realtime,
                &mut result.stats,
                control.as_deref_mut(),
            ) else {
                continue;
            };
            let scanned = MainScannedRow {
                row_id,
                commit_realtime: frame.commit_realtime,
                effective_realtime,
                scan,
                deferred_values: &deferred_values,
            };
            if self.apply_combined_scanned_row(
                query,
                accumulator,
                result,
                row_payload_mode,
                mode,
                scanned,
                control.as_deref_mut(),
            )? {
                break;
            }
        }
        result.stats.rows_returned = result.rows.len() as u64;
        Ok(())
    }
1978
    /// Facet-only scan over the entries matched by the reader's current
    /// filters.
    ///
    /// Every accepted row is closed with `finish_facet_row`. Counts go into
    /// `accumulator` and bookkeeping into `stats`. `control` is used only to
    /// decide when to stop stepping. It is not passed to
    /// `accepted_effective_realtime`, so `adjust_realtime` is not applied in
    /// facet passes.
    fn scan_explorer_facet(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        stats: &mut ExplorerStats,
        mut control: Option<&mut ExplorerControl<'_>>,
    ) -> Result<()> {
        self.seek_for_explorer(query);
        // With time bounds or full-text search, a row can still be rejected by
        // `accepted_effective_realtime` after its data is scanned. In that case
        // values are buffered and applied only for accepted rows; otherwise they
        // are handed to the accumulator immediately during the scan.
        let defer_apply = query.after_realtime_usec.is_some()
            || query.before_realtime_usec.is_some()
            || query_has_fts(query);
        // Advanced for every row frame; compared against `last_seen_row_ids`.
        let mut row_id = 0u64;
        // Every entry stepped onto, including skipped ones; checked by `control`.
        let mut rows_seen = 0u64;
        // Reused across rows when `defer_apply` is set.
        let mut deferred_values = Vec::new();
        loop {
            let frame = match self.next_explorer_row_frame(
                query,
                &mut rows_seen,
                stats,
                control.as_deref_mut(),
            )? {
                ExplorerLoopStep::Stop => break,
                ExplorerLoopStep::Skip => continue,
                ExplorerLoopStep::Row(frame) => frame,
            };
            row_id = row_id.saturating_add(1);
            deferred_values.clear();
            let scan = if defer_apply {
                self.scan_current_row(
                    query,
                    accumulator,
                    row_id,
                    ScanApply::Deferred(&mut deferred_values),
                    stats,
                )?
            } else {
                self.scan_current_row(query, accumulator, row_id, ScanApply::Immediate, stats)?
            };
            if Self::accepted_effective_realtime(query, &scan, frame.commit_realtime, stats, None)
                .is_none()
            {
                continue;
            }
            record_last_realtime(stats, frame.commit_realtime);
            stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
            if defer_apply {
                // Facet passes never feed histograms, so no timestamp is passed.
                for value_index in &deferred_values {
                    accumulator.apply_value(*value_index, None, stats);
                }
            }
            accumulator.finish_facet_row(row_id, stats);
        }
        Ok(())
    }
2033
    /// Walks the data objects of the current entry and classifies each one
    /// against `accumulator` and `query`.
    ///
    /// Matched values are applied or deferred according to `apply`. The scan
    /// stops early once `RowScanState::should_stop_row_scan` says nothing else
    /// is needed. Returns the per-row scan summary. Counts rows examined and
    /// data references seen in `stats`.
    fn scan_current_row(
        &mut self,
        query: &ExplorerQuery,
        accumulator: &mut ExplorerAccumulator,
        row_id: u64,
        mut apply: ScanApply<'_>,
        stats: &mut ExplorerStats,
    ) -> Result<RowScan> {
        stats.rows_examined = stats.rows_examined.saturating_add(1);
        let mut out = RowScan::default();
        let mut state = RowScanState::new(query, accumulator);

        // Split the borrow of `self` so the closure below can use both the
        // reader internals and the current row.
        let inner = &mut self.inner;
        let row = &mut self.row;
        inner.with_mut(|fields| {
            // NOTE(review): presumably drops object references held from earlier
            // reads before the row's data is restarted — confirm.
            fields.reader.release_object_guards();
            row.restart_data()?;
            // Invoked immediately so that `reset_data_state` below runs whether
            // or not the loop fails. If the reset itself fails, its error is
            // returned instead of the loop's result.
            let result = (|| {
                for index in 0..row.data_offset_count() {
                    let Some(data_offset) = row.data_offset_at(index) else {
                        break;
                    };
                    stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
                    let class = classify_data_for_accumulator(
                        fields.file,
                        row,
                        data_offset,
                        accumulator,
                        state.needs_fts,
                        query,
                        query
                            .debug_collect_column_fields_by_row_traversal
                            .then_some(&mut out.column_fields),
                        stats,
                    )?;

                    handle_row_offset_class(
                        class,
                        accumulator,
                        row_id,
                        &mut state,
                        &mut out,
                        &mut apply,
                        stats,
                    );
                    if state.should_stop_row_scan() {
                        record_row_scan_early_stop(stats);
                        break;
                    }
                }
                Ok::<_, SdkError>(())
            })();
            row.reset_data_state(fields.file)?;
            result
        })?;
        Ok(out)
    }
2091
2092 fn seek_for_explorer(&mut self, query: &ExplorerQuery) {
2093 let anchor = if query.stop_when_rows_full {
2094 query.anchor
2095 } else {
2096 ExplorerAnchor::Auto
2097 };
2098 match query.direction {
2099 Direction::Forward => match anchor {
2100 ExplorerAnchor::Auto => {
2101 if let Some(after) = query.after_realtime_usec {
2102 self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2103 } else {
2104 self.seek_head();
2105 }
2106 }
2107 ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2108 ExplorerAnchor::Tail => self.seek_tail(),
2109 ExplorerAnchor::Head => {
2110 if let Some(after) = query.after_realtime_usec {
2111 self.seek_realtime(after.saturating_sub(query.realtime_slack_usec));
2112 } else {
2113 self.seek_head();
2114 }
2115 }
2116 },
2117 Direction::Backward => match anchor {
2118 ExplorerAnchor::Auto => {
2119 if let Some(before) = query.before_realtime_usec {
2120 self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2121 } else {
2122 self.seek_tail();
2123 }
2124 }
2125 ExplorerAnchor::Realtime(usec) => self.seek_realtime(usec),
2126 ExplorerAnchor::Head => self.seek_head(),
2127 ExplorerAnchor::Tail => {
2128 if let Some(before) = query.before_realtime_usec {
2129 self.seek_realtime(before.saturating_add(query.realtime_slack_usec));
2130 } else {
2131 self.seek_tail();
2132 }
2133 }
2134 },
2135 }
2136 }
2137
2138 fn step_for_explorer(&mut self, direction: Direction) -> Result<bool> {
2139 match direction {
2140 Direction::Forward => self.next(),
2141 Direction::Backward => self.previous(),
2142 }
2143 }
2144
2145 fn current_explorer_row(
2146 &mut self,
2147 realtime_usec: u64,
2148 stats: &mut ExplorerStats,
2149 row_payload_mode: ExplorerRowPayloadMode,
2150 ) -> Result<ExplorerRow> {
2151 let cursor = self.get_cursor()?;
2152 let mut payloads = Vec::new();
2153 if row_payload_mode == ExplorerRowPayloadMode::Expand {
2154 self.collect_entry_payloads(&mut payloads)?;
2155 stats.returned_row_expansions = stats.returned_row_expansions.saturating_add(1);
2156 }
2157 Ok(ExplorerRow {
2158 realtime_usec,
2159 cursor,
2160 payloads,
2161 })
2162 }
2163}
2164
/// How `scan_current_row` hands matched value indices to the accumulator.
enum ScanApply<'a> {
    /// Values are not buffered; presumably `handle_row_offset_class` applies
    /// them during the scan — confirm.
    Immediate,
    /// Value indices are pushed into this buffer so the caller can apply them
    /// only after deciding whether the row is accepted.
    Deferred(&'a mut Vec<usize>),
}
2169
/// Entry metadata read by `next_explorer_row_frame` before any of the entry's
/// data is scanned.
#[derive(Debug, Clone, Copy)]
struct ExplorerRowFrame {
    /// The entry's realtime (commit) timestamp from the row metadata.
    commit_realtime: u64,
    /// The entry's sequence number from the row metadata.
    seqnum: u64,
}
2175
/// Outcome of advancing the explorer by one entry.
enum ExplorerLoopStep {
    /// End the scan loop.
    Stop,
    /// Ignore this entry and advance to the next one.
    Skip,
    /// Process this entry.
    Row(ExplorerRowFrame),
}
2181
/// Which outputs a combined (single-pass) scan is producing for a row.
#[derive(Debug, Clone, Copy)]
struct CombinedScanMode {
    /// Row counts go to `rows_matched` (the main rows/histogram pass).
    include_main: bool,
    /// Row counts go to `facet_rows_matched` (the facet pass).
    include_facets: bool,
}
2187
/// A row whose data objects have been scanned, together with the timing data needed to
/// decide what to do with it.
struct MainScannedRow<'a> {
    /// Identifier of the row within the current scan (presumably a running counter — TODO confirm).
    row_id: u64,
    /// Commit (entry header) realtime of the entry.
    commit_realtime: u64,
    /// Time used for range checks and bucketing; presumably derived with
    /// `effective_realtime_from_scan` — TODO confirm.
    effective_realtime: u64,
    /// FTS flags and source timestamp gathered while scanning the row.
    scan: RowScan,
    /// Value indices collected via `ScanApply::Deferred` that have not yet been applied.
    deferred_values: &'a [usize],
}
2195
/// Mutable bookkeeping for scanning the data objects of a single row.
struct RowScanState {
    /// `true` when the query uses `ExplorerFieldMode::FirstValue`.
    use_first_value: bool,
    /// `true` when the query has any FTS terms or patterns.
    needs_fts: bool,
    /// Debug mode: record every field name seen while traversing the row.
    collect_column_fields: bool,
    /// In first-value mode, how many required fields have not yet been seen in this row.
    /// The scan may stop early once this reaches zero.
    fields_missing_from_row: usize,
}
2202
2203impl RowScanState {
2204 fn new(query: &ExplorerQuery, accumulator: &ExplorerAccumulator) -> Self {
2205 let use_first_value = query.field_mode == ExplorerFieldMode::FirstValue;
2206 Self {
2207 use_first_value,
2208 needs_fts: query_has_fts(query),
2209 collect_column_fields: query.debug_collect_column_fields_by_row_traversal,
2210 fields_missing_from_row: if use_first_value {
2211 accumulator.required_identity_count
2212 } else {
2213 0
2214 },
2215 }
2216 }
2217
2218 fn should_stop_row_scan(&self) -> bool {
2219 self.use_first_value
2220 && !self.needs_fts
2221 && !self.collect_column_fields
2222 && self.fields_missing_from_row == 0
2223 }
2224}
2225
/// Per-row decision made by the sampling logic
/// (semantics inferred from the variant names — TODO confirm against the caller).
enum SamplingRowAction {
    /// Scan the row's data objects.
    Scan,
    /// Do not scan this row.
    Skip,
    /// Stop processing further rows.
    Stop,
}
2231
/// Entries eligible for counting by the indexed (field-data-object based) facet and
/// histogram counters.
enum IndexedCandidateSet {
    /// Every entry in the file is a candidate; `count` is the number of candidates.
    All {
        count: u64,
    },
    /// Only the entries whose offsets are in `offsets` are candidates. `count` is the number of
    /// candidates (presumably `offsets.len()` — TODO confirm at the construction site).
    Set {
        count: u64,
        offsets: HashSet<NonZeroU64>,
    },
}
2241
2242impl IndexedCandidateSet {
2243 fn count(&self) -> u64 {
2244 match self {
2245 Self::All { count } | Self::Set { count, .. } => *count,
2246 }
2247 }
2248
2249 fn contains(&self, entry_offset: NonZeroU64) -> bool {
2250 match self {
2251 Self::All { .. } => true,
2252 Self::Set { offsets, .. } => offsets.contains(&entry_offset),
2253 }
2254 }
2255}
2256
/// Facets that are counted together in one pass, as built by `facet_pass_groups`.
struct FacetPassGroup {
    /// Filter field tagged for exclusion for this group, or `None` when no filter is excluded
    /// (presumably such facets are counted with all filters applied — TODO confirm).
    excluded_field: Option<Vec<u8>>,
    /// Indices into `ExplorerQuery::facets`, in query order.
    facet_indices: Vec<usize>,
}
2261
2262fn facet_pass_groups(query: &ExplorerQuery) -> Vec<FacetPassGroup> {
2263 let filter_fields: HashSet<&[u8]> = query
2264 .filters
2265 .iter()
2266 .map(|filter| filter.field.as_slice())
2267 .collect();
2268 let mut groups: Vec<FacetPassGroup> = Vec::new();
2269
2270 for (index, facet) in query.facets.iter().enumerate() {
2271 let excluded_field = (query.exclude_facet_field_filters
2272 && filter_fields.contains(facet.as_slice()))
2273 .then(|| facet.clone());
2274 if let Some(existing) = groups
2275 .iter_mut()
2276 .find(|group| group.excluded_field.as_deref() == excluded_field.as_deref())
2277 {
2278 existing.facet_indices.push(index);
2279 } else {
2280 groups.push(FacetPassGroup {
2281 excluded_field,
2282 facet_indices: vec![index],
2283 });
2284 }
2285 }
2286
2287 groups
2288}
2289
2290fn indexed_count_facet_group(
2291 file: &JournalFile<Mmap>,
2292 query: &ExplorerQuery,
2293 group: &FacetPassGroup,
2294 candidates: &IndexedCandidateSet,
2295 result: &mut ExplorerResult,
2296) -> Result<()> {
2297 result.stats.facet_rows_matched = result
2298 .stats
2299 .facet_rows_matched
2300 .saturating_add(candidates.count());
2301
2302 for facet_index in &group.facet_indices {
2303 let Some(field) = query.facets.get(*facet_index) else {
2304 continue;
2305 };
2306 let mut values = HashMap::new();
2307 let mut rows_with_field = HashSet::new();
2308 let mut decompressed = Vec::new();
2309
2310 for item in file.field_data_objects_with_offsets(field)? {
2311 let (_, data) = item?;
2312 let Some((value, cursor)) =
2313 indexed_value_and_cursor(&data, field, &mut decompressed, &mut result.stats)?
2314 else {
2315 continue;
2316 };
2317 drop(data);
2318
2319 let count = indexed_count_facet_entries(
2320 file,
2321 cursor,
2322 candidates,
2323 &mut rows_with_field,
2324 &mut result.stats,
2325 )?;
2326 if count == 0 {
2327 continue;
2328 }
2329 increment_counter_by(&mut values, &value, count);
2330 result.stats.facet_updates = result.stats.facet_updates.saturating_add(count);
2331 }
2332
2333 let unset = candidates
2334 .count()
2335 .saturating_sub(rows_with_field.len() as u64);
2336 if unset != 0 {
2337 increment_counter_by(&mut values, UNSET_VALUE, unset);
2338 result.stats.facet_updates = result.stats.facet_updates.saturating_add(unset);
2339 }
2340 result.facets.insert(field.clone(), values);
2341 }
2342
2343 Ok(())
2344}
2345
2346fn indexed_count_histogram(
2347 file: &JournalFile<Mmap>,
2348 query: &ExplorerQuery,
2349 candidates: &IndexedCandidateSet,
2350 result: &mut ExplorerResult,
2351) -> Result<()> {
2352 let Some(histogram) = result.histogram.as_mut() else {
2353 return Ok(());
2354 };
2355 let field = histogram.field.clone();
2356 let mut decompressed = Vec::new();
2357 let mut rows_with_field = HashSet::new();
2358
2359 for item in file.field_data_objects_with_offsets(&field)? {
2360 let (_, data) = item?;
2361 let Some((value, cursor)) =
2362 indexed_value_and_cursor(&data, &field, &mut decompressed, &mut result.stats)?
2363 else {
2364 continue;
2365 };
2366 drop(data);
2367
2368 indexed_count_histogram_entries(
2369 file,
2370 cursor,
2371 candidates,
2372 &value,
2373 histogram,
2374 query,
2375 &mut rows_with_field,
2376 &mut result.stats,
2377 )?;
2378 }
2379
2380 indexed_count_histogram_unset_entries(
2381 file,
2382 candidates,
2383 &rows_with_field,
2384 histogram,
2385 query,
2386 &mut result.stats,
2387 )?;
2388
2389 Ok(())
2390}
2391
2392fn indexed_value_and_cursor(
2393 data: &DataObject<&[u8]>,
2394 field: &[u8],
2395 decompressed: &mut Vec<u8>,
2396 stats: &mut ExplorerStats,
2397) -> Result<Option<(Vec<u8>, Option<InlinedCursor>)>> {
2398 stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2399 stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2400 let payload = if data.is_compressed() {
2401 decompressed.clear();
2402 let len = data.decompress(decompressed)?;
2403 stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2404 &decompressed[..len]
2405 } else {
2406 data.raw_payload()
2407 };
2408
2409 let Some((payload_field, value)) = split_payload_bytes(payload) else {
2410 return Ok(None);
2411 };
2412 if payload_field != field {
2413 return Ok(None);
2414 }
2415 Ok(Some((value.to_vec(), data.inlined_cursor())))
2416}
2417
2418fn indexed_count_facet_entries(
2419 file: &JournalFile<Mmap>,
2420 cursor: Option<InlinedCursor>,
2421 candidates: &IndexedCandidateSet,
2422 rows_with_field: &mut HashSet<NonZeroU64>,
2423 stats: &mut ExplorerStats,
2424) -> Result<u64> {
2425 let mut count = 0u64;
2426 indexed_visit_entries(file, cursor, |entry_offset| {
2427 stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2428 if candidates.contains(entry_offset) {
2429 count = count.saturating_add(1);
2430 rows_with_field.insert(entry_offset);
2431 }
2432 Ok(())
2433 })?;
2434 Ok(count)
2435}
2436
2437fn indexed_count_histogram_entries(
2438 file: &JournalFile<Mmap>,
2439 cursor: Option<InlinedCursor>,
2440 candidates: &IndexedCandidateSet,
2441 value: &[u8],
2442 histogram: &mut ExplorerHistogram,
2443 query: &ExplorerQuery,
2444 rows_with_field: &mut HashSet<NonZeroU64>,
2445 stats: &mut ExplorerStats,
2446) -> Result<()> {
2447 let histogram_start = histogram
2448 .buckets
2449 .first()
2450 .map(|bucket| bucket.start_realtime_usec)
2451 .unwrap_or_default();
2452 let histogram_bucket_width = histogram
2453 .buckets
2454 .first()
2455 .map(|bucket| {
2456 bucket
2457 .end_realtime_usec
2458 .saturating_sub(bucket.start_realtime_usec)
2459 .max(1)
2460 })
2461 .unwrap_or(1);
2462 let histogram_bucket_count = histogram.buckets.len();
2463
2464 indexed_visit_entries(file, cursor, |entry_offset| {
2465 stats.data_refs_seen = stats.data_refs_seen.saturating_add(1);
2466 if !candidates.contains(entry_offset) {
2467 return Ok(());
2468 }
2469 let entry = file.entry_ref(entry_offset)?;
2470 let realtime = entry.header.realtime;
2471 drop(entry);
2472 rows_with_field.insert(entry_offset);
2473 if !timestamp_in_range(query, realtime) {
2474 return Ok(());
2475 }
2476 let Some(bucket_index) = histogram_bucket_index_from_bounds(
2477 realtime,
2478 histogram_start,
2479 histogram_bucket_width,
2480 histogram_bucket_count,
2481 ) else {
2482 return Ok(());
2483 };
2484 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2485 increment_counter_by(&mut bucket.values, value, 1);
2486 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2487 }
2488 Ok(())
2489 })
2490}
2491
2492fn indexed_count_histogram_unset_entries(
2493 file: &JournalFile<Mmap>,
2494 candidates: &IndexedCandidateSet,
2495 rows_with_field: &HashSet<NonZeroU64>,
2496 histogram: &mut ExplorerHistogram,
2497 query: &ExplorerQuery,
2498 stats: &mut ExplorerStats,
2499) -> Result<()> {
2500 let histogram_start = histogram
2501 .buckets
2502 .first()
2503 .map(|bucket| bucket.start_realtime_usec)
2504 .unwrap_or_default();
2505 let histogram_bucket_width = histogram
2506 .buckets
2507 .first()
2508 .map(|bucket| {
2509 bucket
2510 .end_realtime_usec
2511 .saturating_sub(bucket.start_realtime_usec)
2512 .max(1)
2513 })
2514 .unwrap_or(1);
2515 let histogram_bucket_count = histogram.buckets.len();
2516
2517 let mut visit = |entry_offset: NonZeroU64| -> Result<()> {
2518 if rows_with_field.contains(&entry_offset) {
2519 return Ok(());
2520 }
2521 let entry = file.entry_ref(entry_offset)?;
2522 let realtime = entry.header.realtime;
2523 drop(entry);
2524 if !timestamp_in_range(query, realtime) {
2525 return Ok(());
2526 }
2527 let Some(bucket_index) = histogram_bucket_index_from_bounds(
2528 realtime,
2529 histogram_start,
2530 histogram_bucket_width,
2531 histogram_bucket_count,
2532 ) else {
2533 return Ok(());
2534 };
2535 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
2536 increment_counter_by(&mut bucket.values, UNSET_VALUE, 1);
2537 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
2538 }
2539 Ok(())
2540 };
2541
2542 match candidates {
2543 IndexedCandidateSet::All { .. } => {
2544 let mut entry_offsets = Vec::new();
2545 file.entry_offsets(&mut entry_offsets)?;
2546 for entry_offset in entry_offsets {
2547 visit(entry_offset)?;
2548 }
2549 }
2550 IndexedCandidateSet::Set { offsets, .. } => {
2551 for entry_offset in offsets {
2552 visit(*entry_offset)?;
2553 }
2554 }
2555 }
2556
2557 Ok(())
2558}
2559
2560fn indexed_visit_entries<F>(
2561 file: &JournalFile<Mmap>,
2562 cursor: Option<InlinedCursor>,
2563 mut visitor: F,
2564) -> Result<()>
2565where
2566 F: FnMut(NonZeroU64) -> Result<()>,
2567{
2568 let Some(mut cursor) = cursor.map(|cursor| cursor.head()) else {
2569 return Ok(());
2570 };
2571 let mut needle = NonZeroU64::MIN;
2572 while let Some(entry_offset) = cursor.next_until(file, needle)? {
2573 visitor(entry_offset)?;
2574 let Some(next) = entry_offset.get().checked_add(1).and_then(NonZeroU64::new) else {
2575 break;
2576 };
2577 needle = next;
2578 }
2579 Ok(())
2580}
2581
2582fn handle_row_offset_class(
2583 class: OffsetClass,
2584 accumulator: &mut ExplorerAccumulator,
2585 row_id: u64,
2586 state: &mut RowScanState,
2587 out: &mut RowScan,
2588 apply: &mut ScanApply<'_>,
2589 stats: &mut ExplorerStats,
2590) {
2591 match class {
2592 OffsetClass::Irrelevant => {
2593 stats.data_refs_skipped = stats.data_refs_skipped.saturating_add(1);
2594 }
2595 OffsetClass::FtsNegativeMatch => {
2596 out.fts_negative_match = true;
2597 }
2598 OffsetClass::FtsMatch => {
2599 out.fts_matches = true;
2600 }
2601 OffsetClass::Value(value_index) => {
2602 handle_row_value_class(value_index, accumulator, row_id, state, out, apply, stats)
2603 }
2604 }
2605}
2606
2607fn handle_row_value_class(
2608 value_index: usize,
2609 accumulator: &mut ExplorerAccumulator,
2610 row_id: u64,
2611 state: &mut RowScanState,
2612 out: &mut RowScan,
2613 apply: &mut ScanApply<'_>,
2614 stats: &mut ExplorerStats,
2615) {
2616 if accumulator.value_fts_matches[value_index] {
2617 out.fts_matches = true;
2618 }
2619 let field_index = accumulator.value_field_indices[value_index];
2620 let first_for_field = if state.use_first_value
2621 || accumulator.flags[field_index] & (FACET_PUBLIC | FACET_HISTOGRAM) != 0
2622 {
2623 accumulator.mark_field_seen(field_index, row_id)
2624 } else {
2625 true
2626 };
2627 if state.use_first_value && first_for_field {
2628 state.fields_missing_from_row = state.fields_missing_from_row.saturating_sub(1);
2629 }
2630 if !state.use_first_value || first_for_field {
2631 if let Some(timestamp) = accumulator.value_source_realtime[value_index] {
2632 out.timestamp = Some(timestamp);
2633 }
2634 match apply {
2635 ScanApply::Immediate => accumulator.apply_value(value_index, None, stats),
2636 ScanApply::Deferred(values) => values.push(value_index),
2637 }
2638 }
2639}
2640
2641fn record_row_scan_early_stop(stats: &mut ExplorerStats) {
2642 stats.early_stop_opportunities = stats.early_stop_opportunities.saturating_add(1);
2643 stats.early_stops = stats.early_stops.saturating_add(1);
2644}
2645
2646fn cached_offset_class_for_accumulator(
2647 file: &JournalFile<Mmap>,
2648 row: &mut CurrentRowView,
2649 data_offset: NonZeroU64,
2650 accumulator: &ExplorerAccumulator,
2651 column_fields: Option<&mut Vec<Vec<u8>>>,
2652 stats: &mut ExplorerStats,
2653) -> Result<Option<OffsetClass>> {
2654 let Some(class) = accumulator.offset_cache.lookup(data_offset) else {
2655 return Ok(None);
2656 };
2657 if let Some(column_fields) = column_fields {
2658 if let Some((field, _)) = read_payload_field(file, row, data_offset, stats)? {
2659 column_fields.push(field);
2660 }
2661 }
2662 stats.data_cache_hits = stats.data_cache_hits.saturating_add(1);
2663 Ok(Some(class))
2664}
2665
2666fn payload_for_classification<'a>(
2667 file: &JournalFile<Mmap>,
2668 row: &'a mut CurrentRowView,
2669 data_offset: NonZeroU64,
2670 stats: &mut ExplorerStats,
2671) -> Result<&'a [u8]> {
2672 stats.data_cache_misses = stats.data_cache_misses.saturating_add(1);
2673 stats.data_payloads_loaded = stats.data_payloads_loaded.saturating_add(1);
2674 let was_compressed = file.data_ref(data_offset)?.is_compressed();
2675 let payload = row.read_payload_at(file, data_offset)?;
2676 if was_compressed {
2677 stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2678 }
2679 Ok(row.payload_slice(payload))
2680}
2681
2682fn fts_flags_for_value(
2683 value: &[u8],
2684 needs_fts: bool,
2685 query: &ExplorerQuery,
2686 stats: &mut ExplorerStats,
2687) -> (bool, bool) {
2688 if !needs_fts {
2689 return (false, false);
2690 }
2691 stats.fts_scans = stats.fts_scans.saturating_add(1);
2692 match match_fts_query(value, query) {
2693 FtsTermMatch::Positive => (true, false),
2694 FtsTermMatch::Negative => (false, true),
2695 FtsTermMatch::None => (false, false),
2696 }
2697}
2698
2699fn structured_payload_class(
2700 field: &[u8],
2701 value: &[u8],
2702 data_offset: NonZeroU64,
2703 accumulator: &mut ExplorerAccumulator,
2704 fts_matches: bool,
2705 fts_negative_match: bool,
2706) -> OffsetClass {
2707 if fts_negative_match {
2708 OffsetClass::FtsNegativeMatch
2709 } else if let Some(field_index) = accumulator.field_lookup.get(field).copied() {
2710 OffsetClass::Value(accumulator.add_value(field_index, data_offset, value, fts_matches))
2711 } else if fts_matches {
2712 OffsetClass::FtsMatch
2713 } else {
2714 OffsetClass::Irrelevant
2715 }
2716}
2717
2718fn classify_data_for_accumulator(
2719 file: &JournalFile<Mmap>,
2720 row: &mut CurrentRowView,
2721 data_offset: NonZeroU64,
2722 accumulator: &mut ExplorerAccumulator,
2723 needs_fts: bool,
2724 query: &ExplorerQuery,
2725 mut column_fields: Option<&mut Vec<Vec<u8>>>,
2726 stats: &mut ExplorerStats,
2727) -> Result<OffsetClass> {
2728 if let Some(class) = cached_offset_class_for_accumulator(
2729 file,
2730 row,
2731 data_offset,
2732 accumulator,
2733 column_fields.as_mut().map(|fields| &mut **fields),
2734 stats,
2735 )? {
2736 return Ok(class);
2737 }
2738
2739 let payload = payload_for_classification(file, row, data_offset, stats)?;
2740 let Some((field, value)) = split_payload_bytes(payload) else {
2741 let class = classify_unstructured_payload(payload, needs_fts, query, stats);
2742 accumulator.offset_cache.insert(data_offset, class);
2743 stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2744 return Ok(class);
2745 };
2746 if let Some(column_fields) = column_fields {
2747 column_fields.push(field.to_vec());
2748 }
2749
2750 let (fts_matches, fts_negative_match) = fts_flags_for_value(value, needs_fts, query, stats);
2751 let class = structured_payload_class(
2752 field,
2753 value,
2754 data_offset,
2755 accumulator,
2756 fts_matches,
2757 fts_negative_match,
2758 );
2759 accumulator.offset_cache.insert(data_offset, class);
2760 stats.data_objects_classified = stats.data_objects_classified.saturating_add(1);
2761 Ok(class)
2762}
2763
2764fn read_payload_field(
2765 file: &JournalFile<Mmap>,
2766 row: &mut CurrentRowView,
2767 data_offset: NonZeroU64,
2768 stats: &mut ExplorerStats,
2769) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
2770 let was_compressed = file.data_ref(data_offset)?.is_compressed();
2771 let payload = row.read_payload_at(file, data_offset)?;
2772 if was_compressed {
2773 stats.payloads_decompressed = stats.payloads_decompressed.saturating_add(1);
2774 }
2775 let payload = row.payload_slice(payload);
2776 Ok(split_payload_bytes(payload).map(|(field, value)| (field.to_vec(), value.to_vec())))
2777}
2778
2779fn classify_unstructured_payload(
2780 payload: &[u8],
2781 needs_fts: bool,
2782 query: &ExplorerQuery,
2783 stats: &mut ExplorerStats,
2784) -> OffsetClass {
2785 if !needs_fts {
2786 return OffsetClass::Irrelevant;
2787 }
2788 stats.fts_scans = stats.fts_scans.saturating_add(1);
2789 match match_fts_query(payload, query) {
2790 FtsTermMatch::Positive => OffsetClass::FtsMatch,
2791 FtsTermMatch::Negative => OffsetClass::FtsNegativeMatch,
2792 FtsTermMatch::None => OffsetClass::Irrelevant,
2793 }
2794}
2795
/// Maps `realtime_usec` to a bucket index on a grid that starts at `start_realtime_usec` with
/// buckets `bucket_width_usec` wide.
///
/// The index is clamped rather than rejected:
/// - timestamps before the start land in bucket 0;
/// - timestamps past the last bucket land in bucket `bucket_count - 1`.
///
/// A zero width is treated as 1. Returns `None` only when `bucket_count` is zero.
fn histogram_bucket_index_from_bounds(
    realtime_usec: u64,
    start_realtime_usec: u64,
    bucket_width_usec: u64,
    bucket_count: usize,
) -> Option<usize> {
    let last = bucket_count.checked_sub(1)?;
    // `max(1)` makes the division infallible.
    let index = realtime_usec.saturating_sub(start_realtime_usec) / bucket_width_usec.max(1);
    // A plain `as usize` truncates on 32-bit targets and could wrap a far-away timestamp into an
    // arbitrary bucket; an index that does not fit in `usize` clamps to the last bucket instead.
    Some(usize::try_from(index).map_or(last, |index| index.min(last)))
}
2810
2811fn validate_query(query: &ExplorerQuery) -> Result<()> {
2812 if query
2813 .after_realtime_usec
2814 .zip(query.before_realtime_usec)
2815 .is_some_and(|(after, before)| after > before)
2816 {
2817 return Err(SdkError::InvalidPath(
2818 "after_realtime_usec must be <= before_realtime_usec".to_string(),
2819 ));
2820 }
2821 for filter in &query.filters {
2822 if filter.field.is_empty() || filter.field.contains(&b'=') {
2823 return Err(SdkError::InvalidPath(
2824 "filter field must be non-empty and must not contain '='".to_string(),
2825 ));
2826 }
2827 }
2828 for field in query.facets.iter().chain(query.histogram.iter()) {
2829 if field.is_empty() || field.contains(&b'=') {
2830 return Err(SdkError::InvalidPath(
2831 "facet and histogram fields must be non-empty and must not contain '='".to_string(),
2832 ));
2833 }
2834 }
2835 let mut seen_facets: HashSet<&[u8]> = HashSet::new();
2836 for facet in &query.facets {
2837 if !seen_facets.insert(facet) {
2838 return Err(SdkError::InvalidPath(
2839 "facet fields must not be duplicated".to_string(),
2840 ));
2841 }
2842 }
2843 Ok(())
2844}
2845
2846fn validate_no_debug_column_collection(query: &ExplorerQuery) -> Result<()> {
2847 if query.debug_collect_column_fields_by_row_traversal {
2848 return Err(SdkError::Unsupported(
2849 "debug_collect_column_fields_by_row_traversal is a debug-only discrepancy tool; production explorer queries must use FIELD-index column catalogs instead",
2850 ));
2851 }
2852 Ok(())
2853}
2854
2855fn validate_indexed_query(query: &ExplorerQuery) -> Result<()> {
2856 if query.field_mode != ExplorerFieldMode::AllValues {
2857 return Err(SdkError::Unsupported(
2858 "indexed explorer strategy requires ExplorerFieldMode::AllValues",
2859 ));
2860 }
2861 if query_has_fts(query) {
2862 return Err(SdkError::Unsupported(
2863 "indexed explorer strategy does not support FTS",
2864 ));
2865 }
2866 if query.use_source_realtime
2867 && (query.after_realtime_usec.is_some()
2868 || query.before_realtime_usec.is_some()
2869 || query.histogram.is_some())
2870 {
2871 return Err(SdkError::Unsupported(
2872 "indexed explorer strategy requires commit realtime for time-bounded facets and histograms",
2873 ));
2874 }
2875 Ok(())
2876}
2877
2878fn explorer_outputs_match(left: &ExplorerResult, right: &ExplorerResult) -> bool {
2879 if left.rows.len() != right.rows.len() {
2880 return false;
2881 }
2882 if left.rows.iter().zip(&right.rows).any(|(left, right)| {
2883 left.realtime_usec != right.realtime_usec
2884 || left.cursor != right.cursor
2885 || left.payloads != right.payloads
2886 }) {
2887 return false;
2888 }
2889 if left.facets != right.facets {
2890 return false;
2891 }
2892 explorer_histograms_match(left.histogram.as_ref(), right.histogram.as_ref())
2893}
2894
2895fn explorer_histograms_match(
2896 left: Option<&ExplorerHistogram>,
2897 right: Option<&ExplorerHistogram>,
2898) -> bool {
2899 match (left, right) {
2900 (None, None) => true,
2901 (Some(left), Some(right)) => {
2902 left.field == right.field
2903 && left.buckets.len() == right.buckets.len()
2904 && left
2905 .buckets
2906 .iter()
2907 .zip(&right.buckets)
2908 .all(|(left, right)| {
2909 left.start_realtime_usec == right.start_realtime_usec
2910 && left.end_realtime_usec == right.end_realtime_usec
2911 && left.values == right.values
2912 })
2913 }
2914 _ => false,
2915 }
2916}
2917
2918fn query_needs_source_realtime_main(query: &ExplorerQuery) -> bool {
2919 query.use_source_realtime
2920 && (query.after_realtime_usec.is_some()
2921 || query.before_realtime_usec.is_some()
2922 || query.histogram.is_some()
2923 || query.limit > 0)
2924}
2925
2926fn facet_pass_needs_source_realtime(query: &ExplorerQuery) -> bool {
2927 query.use_source_realtime
2928 && (query.after_realtime_usec.is_some() || query.before_realtime_usec.is_some())
2929}
2930
2931fn query_needs_main_pass(query: &ExplorerQuery) -> bool {
2932 query.limit > 0 || query.histogram.is_some()
2933}
2934
2935fn explorer_result_for_query(query: &ExplorerQuery) -> ExplorerResult {
2936 ExplorerResult {
2937 histogram: query
2938 .histogram
2939 .as_ref()
2940 .map(|field| new_histogram(field, query)),
2941 ..ExplorerResult::default()
2942 }
2943}
2944
2945fn explorer_control_stopped(control: Option<&ExplorerControl<'_>>) -> bool {
2946 control.and_then(ExplorerControl::stop_reason).is_some()
2947}
2948
2949fn can_run_combined_explorer_pass(facet_groups: &[FacetPassGroup]) -> bool {
2950 facet_groups
2951 .iter()
2952 .all(|group| group.excluded_field.is_none())
2953}
2954
2955fn combined_facet_indices(facet_groups: &[FacetPassGroup]) -> Vec<usize> {
2956 facet_groups
2957 .iter()
2958 .flat_map(|group| group.facet_indices.iter().copied())
2959 .collect()
2960}
2961
2962fn record_combined_unsampled_row(
2963 stats: &mut ExplorerStats,
2964 mode: CombinedScanMode,
2965 commit_realtime: u64,
2966 row_count: u64,
2967 count_rows_unsampled: bool,
2968) {
2969 record_last_realtime(stats, commit_realtime);
2970 if mode.include_main {
2971 stats.rows_matched = stats.rows_matched.saturating_add(row_count);
2972 }
2973 if mode.include_facets {
2974 stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(row_count);
2975 }
2976 if count_rows_unsampled {
2977 stats.rows_unsampled = stats.rows_unsampled.saturating_add(row_count);
2978 }
2979 stats.sampling_unsampled = stats.sampling_unsampled.saturating_add(1);
2980}
2981
2982fn update_combined_matched_stats(
2983 stats: &mut ExplorerStats,
2984 mode: CombinedScanMode,
2985 effective_realtime: u64,
2986 control: Option<&mut ExplorerControl<'_>>,
2987) -> bool {
2988 let mut stop_after_matched_row = false;
2989 if mode.include_main {
2990 stats.rows_matched = stats.rows_matched.saturating_add(1);
2991 stop_after_matched_row = control
2992 .map(|control| control.emit_matched_row(effective_realtime, stats.rows_matched))
2993 .unwrap_or(false);
2994 }
2995 if mode.include_facets {
2996 stats.facet_rows_matched = stats.facet_rows_matched.saturating_add(1);
2997 }
2998 stop_after_matched_row
2999}
3000
3001fn should_stop_when_rows_full(
3002 query: &ExplorerQuery,
3003 rows: &[ExplorerRow],
3004 effective_realtime: u64,
3005 rows_matched: u64,
3006) -> bool {
3007 if !query.stop_when_rows_full || query.limit == 0 || rows.len() < query.limit {
3008 return false;
3009 }
3010 let every = query.stop_when_rows_full_check_every.max(1);
3011 if rows_matched == 0 || rows_matched % every != 0 {
3012 return false;
3013 }
3014 match query.direction {
3015 Direction::Backward => {
3016 rows.iter()
3017 .map(|row| row.realtime_usec)
3018 .min()
3019 .is_some_and(|oldest| {
3020 effective_realtime < oldest.saturating_sub(query.realtime_slack_usec)
3021 })
3022 }
3023 Direction::Forward => {
3024 rows.iter()
3025 .map(|row| row.realtime_usec)
3026 .max()
3027 .is_some_and(|newest| {
3028 effective_realtime > newest.saturating_add(query.realtime_slack_usec)
3029 })
3030 }
3031 }
3032}
3033
3034fn row_candidate_to_keep(query: &ExplorerQuery, rows: &[ExplorerRow], realtime_usec: u64) -> bool {
3035 if query.limit == 0 {
3036 return false;
3037 }
3038 if !row_within_anchor(query, realtime_usec) {
3039 return false;
3040 }
3041 if rows.len() < query.limit {
3042 return true;
3043 }
3044 match query.direction {
3045 Direction::Backward => rows
3046 .iter()
3047 .map(|row| row.realtime_usec)
3048 .min()
3049 .is_some_and(|oldest| realtime_usec >= oldest),
3050 Direction::Forward => rows
3051 .iter()
3052 .map(|row| row.realtime_usec)
3053 .max()
3054 .is_some_and(|newest| realtime_usec <= newest),
3055 }
3056}
3057
3058fn row_within_anchor(query: &ExplorerQuery, realtime_usec: u64) -> bool {
3059 match (query.direction, query.anchor) {
3060 (Direction::Forward, ExplorerAnchor::Realtime(anchor)) => realtime_usec > anchor,
3061 (Direction::Backward, ExplorerAnchor::Realtime(anchor)) => realtime_usec <= anchor,
3062 _ => true,
3063 }
3064}
3065
3066fn add_special_histogram_value(
3067 histogram: Option<&mut ExplorerHistogram>,
3068 realtime_usec: u64,
3069 value: &[u8],
3070 count: u64,
3071 stats: &mut ExplorerStats,
3072) {
3073 let Some(histogram) = histogram else {
3074 return;
3075 };
3076 let Some(bucket_index) = histogram_bucket_index(histogram, realtime_usec) else {
3077 return;
3078 };
3079 if let Some(bucket) = histogram.buckets.get_mut(bucket_index) {
3080 increment_counter_by(&mut bucket.values, value, count);
3081 stats.histogram_updates = stats.histogram_updates.saturating_add(1);
3082 }
3083}
3084
3085fn add_estimated_histogram_range(
3086 histogram: Option<&mut ExplorerHistogram>,
3087 from_realtime_usec: u64,
3088 to_realtime_usec: u64,
3089 entries: u64,
3090 stats: &mut ExplorerStats,
3091) {
3092 let Some(histogram) = histogram else {
3093 return;
3094 };
3095 if entries == 0 || from_realtime_usec >= to_realtime_usec {
3096 return;
3097 }
3098
3099 let Some(first) = histogram.buckets.first() else {
3100 return;
3101 };
3102 let Some(last) = histogram.buckets.last() else {
3103 return;
3104 };
3105 let from_realtime_usec = from_realtime_usec.max(first.start_realtime_usec);
3106 let to_realtime_usec = to_realtime_usec.min(last.end_realtime_usec);
3107 if from_realtime_usec >= to_realtime_usec {
3108 return;
3109 }
3110
3111 let total = to_realtime_usec.saturating_sub(from_realtime_usec).max(1);
3112 let mut touched = 0u64;
3113 for bucket in &mut histogram.buckets {
3114 if bucket.start_realtime_usec > to_realtime_usec {
3115 break;
3116 }
3117 let overlap_start = bucket.start_realtime_usec.max(from_realtime_usec);
3118 let overlap_end = bucket.end_realtime_usec.min(to_realtime_usec);
3119 if overlap_start >= overlap_end {
3120 continue;
3121 }
3122 let bucket_entries = ((overlap_end.saturating_sub(overlap_start) as u128 * entries as u128)
3123 / total as u128) as u64;
3124 if bucket_entries != 0 {
3125 increment_counter_by(&mut bucket.values, EXPLORER_ESTIMATED_VALUE, bucket_entries);
3126 }
3127 touched = touched.saturating_add(1);
3128 }
3129 stats.histogram_updates = stats.histogram_updates.saturating_add(touched);
3130}
3131
3132fn histogram_bucket_index(histogram: &ExplorerHistogram, realtime_usec: u64) -> Option<usize> {
3133 let first = histogram.buckets.first()?;
3134 let width = first
3135 .end_realtime_usec
3136 .saturating_sub(first.start_realtime_usec)
3137 .max(1);
3138 histogram_bucket_index_from_bounds(
3139 realtime_usec,
3140 first.start_realtime_usec,
3141 width,
3142 histogram.buckets.len(),
3143 )
3144}
3145
/// Joins a field name and value into a journal-style `FIELD=value` payload.
fn payload_from_parts(field: &[u8], value: &[u8]) -> Vec<u8> {
    [field, &b"="[..], value].concat()
}
3153
/// Splits a `FIELD=value` payload at its first `=`.
/// Returns `None` when there is no `=`.
fn split_payload_bytes(payload: &[u8]) -> Option<(&[u8], &[u8])> {
    let mut halves = payload.splitn(2, |&byte| byte == b'=');
    let field = halves.next()?;
    let value = halves.next()?;
    Some((field, value))
}
3158
/// Parses a `_SOURCE_REALTIME_TIMESTAMP` value as a decimal `u64`.
/// Returns `None` for non-UTF-8 input or anything `u64::from_str` rejects.
fn parse_source_realtime(value: &[u8]) -> Option<u64> {
    let text = std::str::from_utf8(value).ok()?;
    text.parse::<u64>().ok()
}
3162
/// Picks the timestamp used for a row.
///
/// The source realtime wins only when it is non-zero and strictly earlier than the commit
/// realtime; otherwise the commit realtime is used. The result therefore never exceeds
/// `commit_realtime`.
fn effective_realtime_from_scan(source_realtime: Option<u64>, commit_realtime: u64) -> u64 {
    source_realtime
        .filter(|&source| source != 0 && source < commit_realtime)
        .unwrap_or(commit_realtime)
}
3171
3172fn record_last_realtime(stats: &mut ExplorerStats, commit_realtime: u64) {
3173 if commit_realtime > stats.last_realtime_usec {
3174 stats.last_realtime_usec = commit_realtime;
3175 }
3176}
3177
3178fn record_source_realtime_delta(
3179 stats: &mut ExplorerStats,
3180 source_realtime: Option<u64>,
3181 commit_realtime: u64,
3182) {
3183 let Some(source_realtime) = source_realtime else {
3184 return;
3185 };
3186 if source_realtime == 0 || source_realtime >= commit_realtime {
3187 return;
3188 }
3189 let delta = commit_realtime.saturating_sub(source_realtime);
3190 if delta > stats.max_source_realtime_delta_usec {
3191 stats.max_source_realtime_delta_usec = delta;
3192 }
3193}
3194
3195fn query_has_fts(query: &ExplorerQuery) -> bool {
3196 !query.fts_terms.is_empty()
3197 || !query.fts_patterns.is_empty()
3198 || !query.fts_negative_patterns.is_empty()
3199}
3200
3201fn query_has_positive_fts(query: &ExplorerQuery) -> bool {
3202 if !query.fts_terms.is_empty() {
3203 query.fts_terms.iter().any(|term| !term.negative)
3204 } else {
3205 !query.fts_patterns.is_empty()
3206 }
3207}
3208
3209fn row_rejected_by_fts(query: &ExplorerQuery, scan: &RowScan) -> bool {
3210 query_has_fts(query)
3211 && (scan.fts_negative_match || query_has_positive_fts(query) && !scan.fts_matches)
3212}
3213
/// Outcome of matching one value against the query's FTS terms or patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FtsTermMatch {
    /// Nothing matched.
    None,
    /// A positive term or pattern decided the match.
    Positive,
    /// A negative (excluding) term or pattern decided the match.
    Negative,
}
3220
3221fn match_fts_query(value: &[u8], query: &ExplorerQuery) -> FtsTermMatch {
3222 if !query.fts_terms.is_empty() {
3223 for term in &query.fts_terms {
3224 if term.matches(value) {
3225 return if term.negative {
3226 FtsTermMatch::Negative
3227 } else {
3228 FtsTermMatch::Positive
3229 };
3230 }
3231 }
3232 return FtsTermMatch::None;
3233 }
3234
3235 if matches_fts(value, &query.fts_negative_patterns) {
3236 FtsTermMatch::Negative
3237 } else if matches_fts(value, &query.fts_patterns) {
3238 FtsTermMatch::Positive
3239 } else {
3240 FtsTermMatch::None
3241 }
3242}
3243
3244fn matches_fts(value: &[u8], patterns: &[Vec<u8>]) -> bool {
3245 patterns
3246 .iter()
3247 .filter(|pattern| !pattern.is_empty())
3248 .any(|pattern| contains_ascii_case_insensitive(value, pattern))
3249}
3250
/// Substring test that ignores ASCII case. An empty needle is always contained.
fn contains_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> bool {
    match needle.len() {
        0 => true,
        width if width > haystack.len() => false,
        width => haystack
            .windows(width)
            .any(|window| window.eq_ignore_ascii_case(needle)),
    }
}
3265
/// Returns the byte offset of the first ASCII case-insensitive occurrence of
/// `needle` in `haystack`.
///
/// An empty `needle` is found at offset `0`. `None` is returned when there is
/// no occurrence, including when `needle` is longer than `haystack`.
fn find_ascii_case_insensitive(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return Some(0);
    }
    haystack
        .windows(needle.len())
        .position(|candidate| candidate.eq_ignore_ascii_case(needle))
}
3280
3281fn timestamp_in_range(query: &ExplorerQuery, timestamp: u64) -> bool {
3282 if query
3283 .after_realtime_usec
3284 .is_some_and(|after| timestamp < after)
3285 {
3286 return false;
3287 }
3288 if query
3289 .before_realtime_usec
3290 .is_some_and(|before| timestamp > before)
3291 {
3292 return false;
3293 }
3294 true
3295}
3296
3297fn stop_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3298 match query.direction {
3302 Direction::Forward => query.before_realtime_usec.is_some_and(|before| {
3303 commit_realtime > before.saturating_add(query.realtime_slack_usec)
3304 }),
3305 Direction::Backward => query
3306 .after_realtime_usec
3307 .is_some_and(|after| commit_realtime < after),
3308 }
3309}
3310
3311fn skip_by_commit_time(query: &ExplorerQuery, commit_realtime: u64) -> bool {
3312 match query.direction {
3313 Direction::Forward => query
3314 .after_realtime_usec
3315 .is_some_and(|after| commit_realtime < after),
3316 Direction::Backward => query.before_realtime_usec.is_some_and(|before| {
3317 commit_realtime > before.saturating_add(query.realtime_slack_usec)
3318 }),
3319 }
3320}
3321
3322fn new_histogram(field: &[u8], query: &ExplorerQuery) -> ExplorerHistogram {
3323 let (start, end) = histogram_bounds(query);
3324 let target_buckets = query.histogram_target_buckets.max(1);
3325 let mut width = histogram_bar_width_usec(start, end, target_buckets);
3326 let start = histogram_slot_baseline_usec(start, width);
3327 let mut end = histogram_slot_baseline_usec(end, width).saturating_add(width);
3328 let mut bucket_count = end
3329 .saturating_sub(start)
3330 .checked_div(width)
3331 .unwrap_or(0)
3332 .saturating_add(1) as usize;
3333 if bucket_count > 1001 {
3334 bucket_count = 1001;
3335 width = end
3336 .saturating_sub(start)
3337 .checked_div(1000)
3338 .unwrap_or(0)
3339 .max(1);
3340 end = start.saturating_add(width.saturating_mul(1000));
3341 }
3342 let mut buckets = Vec::with_capacity(bucket_count);
3343 for index in 0..bucket_count {
3344 let bucket_start = start.saturating_add(width.saturating_mul(index as u64));
3345 let bucket_end = if index + 1 == bucket_count {
3346 end.saturating_add(1)
3347 } else {
3348 bucket_start.saturating_add(width)
3349 };
3350 buckets.push(ExplorerHistogramBucket {
3351 start_realtime_usec: bucket_start,
3352 end_realtime_usec: bucket_end,
3353 values: HashMap::new(),
3354 });
3355 }
3356 ExplorerHistogram {
3357 field: field.to_vec(),
3358 buckets,
3359 }
3360}
3361
/// Chooses a histogram bar width, in microseconds, for the window `[after, before]`.
///
/// The candidates are a fixed ladder of "nice" durations from 1 second to 30
/// days. The function picks the widest one that still yields at least
/// `target_buckets` whole bars across the window. If even 1 second is too wide,
/// it falls back to 1 second. A reversed window is treated as zero-length.
fn histogram_bar_width_usec(after: u64, before: u64, target_buckets: usize) -> u64 {
    const USEC_PER_SEC: u64 = 1_000_000;
    const VALID_DURATIONS_SECONDS: &[u64] = &[
        1, 2, 5, 10, 15, 30, 60, 120, 180, 300, 600, 900, 1800, 3600, 7200, 21600, 28800, 43200,
        86400, 172800, 259200, 432000, 604800, 1209600, 2592000,
    ];

    let span = before.saturating_sub(after);
    let wanted_bars = target_buckets as u64;
    VALID_DURATIONS_SECONDS
        .iter()
        .rev()
        .map(|seconds| seconds.saturating_mul(USEC_PER_SEC))
        .find(|&candidate| candidate != 0 && span / candidate >= wanted_bars)
        .unwrap_or(USEC_PER_SEC)
}
3377
/// Rounds `value` down to the nearest multiple of `width`. A zero width is
/// treated as 1, which leaves `value` unchanged.
fn histogram_slot_baseline_usec(value: u64, width: u64) -> u64 {
    let step = width.max(1);
    value / step * step
}
3381
3382fn histogram_bounds(query: &ExplorerQuery) -> (u64, u64) {
3383 let start = query.after_realtime_usec.unwrap_or(0);
3384 let end = query
3385 .before_realtime_usec
3386 .unwrap_or_else(|| start.saturating_add(3_600_000_000));
3387 if end <= start {
3388 (start, start.saturating_add(1))
3389 } else {
3390 (start, end)
3391 }
3392}
3393
/// Adds `delta` to the counter stored under `value`, saturating at `u64::MAX`.
/// A missing key is inserted with `delta` as its count.
///
/// The lookup uses the borrowed slice first, so the key is only copied into an
/// owned `Vec` when it is actually new.
fn increment_counter_by(map: &mut HashMap<Vec<u8>, u64>, value: &[u8], delta: u64) {
    match map.get_mut(value) {
        Some(count) => *count = count.saturating_add(delta),
        None => {
            map.insert(value.to_vec(), delta);
        }
    }
}
3401
3402#[cfg(test)]
3403mod tests {
3404 use super::*;
3405 use journal_core::file::{JournalFileOptions, JournalWriter, MmapMut};
3406 use journal_core::repository::File as RepoFile;
3407 use tempfile::TempDir;
3408
3409 fn test_uuid(seed: u8) -> uuid::Uuid {
3410 uuid::Uuid::from_bytes([seed; 16])
3411 }
3412
3413 fn create_writer(
3414 path: &std::path::Path,
3415 compression: Option<(Compression, usize)>,
3416 ) -> (JournalFile<MmapMut>, JournalWriter) {
3417 if let Some(parent) = path.parent() {
3418 std::fs::create_dir_all(parent).expect("create journal parent");
3419 }
3420 let repo_file = RepoFile::from_path(path).expect("repo file");
3421 let mut options = JournalFileOptions::new(test_uuid(1), test_uuid(2), test_uuid(3));
3422 if let Some((compression, threshold)) = compression {
3423 options = options
3424 .with_compression(compression)
3425 .with_compress_threshold(threshold);
3426 }
3427 let mut file = JournalFile::<MmapMut>::create(&repo_file, options).expect("create journal");
3428 let writer = if let Some((compression, threshold)) = compression {
3429 JournalWriter::new_with_compression(&mut file, 1, test_uuid(4), compression, threshold)
3430 .expect("writer")
3431 } else {
3432 JournalWriter::new(&mut file, 1, test_uuid(4)).expect("writer")
3433 };
3434 (file, writer)
3435 }
3436
3437 fn write_entries(
3438 path: &std::path::Path,
3439 compression: Option<(Compression, usize)>,
3440 entries: &[(&[&[u8]], u64)],
3441 ) {
3442 let (mut file, mut writer) = create_writer(path, compression);
3443 for (payloads, realtime) in entries {
3444 writer
3445 .add_entry(&mut file, payloads, *realtime, *realtime)
3446 .expect("write entry");
3447 }
3448 file.sync().expect("sync journal");
3449 }
3450
3451 fn write_many_entries(path: &std::path::Path, count: usize) {
3452 let (mut file, mut writer) = create_writer(path, None);
3453 for index in 0..count {
3454 let message = format!("MESSAGE=row-{index}");
3455 let service = if index % 2 == 0 {
3456 b"SERVICE=even".as_slice()
3457 } else {
3458 b"SERVICE=odd".as_slice()
3459 };
3460 let payloads: [&[u8]; 2] = [message.as_bytes(), service];
3461 let realtime = 1_700_000_000_000_000u64.saturating_add(index as u64);
3462 writer
3463 .add_entry(&mut file, &payloads, realtime, realtime)
3464 .expect("write entry");
3465 }
3466 file.sync().expect("sync journal");
3467 }
3468
3469 #[test]
3470 fn explorer_control_reports_progress_during_large_scan() {
3471 let dir = TempDir::new().expect("tempdir");
3472 let path = dir.path().join("progress.journal");
3473 write_many_entries(&path, 9_000);
3474
3475 let mut reports = Vec::new();
3476 let mut progress = |progress: ExplorerProgress| {
3477 reports.push(progress.stats.rows_examined);
3478 };
3479 let mut control = ExplorerControl::new();
3480 control.set_progress_interval(Duration::ZERO);
3481 control.set_progress_callback(Some(&mut progress));
3482 let mut reader = FileReader::open(&path).expect("open reader");
3483 let query = ExplorerQuery {
3484 facets: vec![b"SERVICE".to_vec()],
3485 limit: 0,
3486 ..ExplorerQuery::default()
3487 };
3488
3489 let result = reader
3490 .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3491 .expect("explore");
3492
3493 assert_eq!(control.stop_reason(), None);
3494 assert_eq!(result.stats.rows_examined, 9_000);
3495 assert!(!reports.is_empty());
3496 assert!(reports.iter().any(|rows| *rows >= 8_191));
3497 }
3498
3499 #[test]
3500 fn explorer_control_cancels_inside_large_scan() {
3501 let dir = TempDir::new().expect("tempdir");
3502 let path = dir.path().join("cancel.journal");
3503 write_many_entries(&path, 9_000);
3504
3505 let is_cancelled = || true;
3506 let mut control = ExplorerControl::new();
3507 control.set_cancellation_callback(Some(&is_cancelled));
3508 let mut reader = FileReader::open(&path).expect("open reader");
3509 let query = ExplorerQuery {
3510 facets: vec![b"SERVICE".to_vec()],
3511 limit: 0,
3512 ..ExplorerQuery::default()
3513 };
3514
3515 let result = reader
3516 .explore_with_strategy_and_control(&query, ExplorerStrategy::Traversal, &mut control)
3517 .expect("explore");
3518
3519 assert_eq!(control.stop_reason(), Some(ExplorerStopReason::Cancelled));
3520 assert!(result.stats.rows_examined < 9_000);
3521 }
3522
3523 #[test]
3524 fn explorer_filters_with_or_values_and_and_fields() {
3525 let dir = TempDir::new().expect("tempdir");
3526 let path = dir.path().join("filter.journal");
3527 write_entries(
3528 &path,
3529 None,
3530 &[
3531 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3532 (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3533 (&[b"SERVICE=b", b"PRIORITY=4"], 3_000),
3534 ],
3535 );
3536
3537 let mut reader = FileReader::open(&path).expect("open reader");
3538 let query = ExplorerQuery {
3539 filters: vec![
3540 ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec(), b"b".to_vec()]),
3541 ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3542 ],
3543 facets: vec![b"SERVICE".to_vec()],
3544 limit: 10,
3545 ..ExplorerQuery::default()
3546 };
3547
3548 let result = reader.explore(&query).expect("explore");
3549 assert_eq!(result.rows.len(), 2);
3550 let service = result
3551 .facets
3552 .get(b"SERVICE".as_slice())
3553 .expect("service facet");
3554 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3555 assert_eq!(service.get(b"b".as_slice()), Some(&1));
3556 assert!(result.stats.data_cache_misses > 0);
3557 }
3558
3559 #[test]
3560 fn explorer_rejects_debug_row_traversal_column_collection() {
3561 let dir = TempDir::new().expect("tempdir");
3562 let path = dir.path().join("debug-column-collection.journal");
3563 write_entries(&path, None, &[(&[b"PRIORITY=3", b"MESSAGE=hello"], 1_000)]);
3564
3565 let query = ExplorerQuery {
3566 facets: vec![b"PRIORITY".to_vec()],
3567 debug_collect_column_fields_by_row_traversal: true,
3568 ..ExplorerQuery::default()
3569 };
3570
3571 let mut reader = FileReader::open(&path).expect("open reader");
3572 let err = reader
3573 .explore(&query)
3574 .expect_err("debug-only column collection is rejected");
3575 assert!(matches!(err, SdkError::Unsupported(_)));
3576 assert!(
3577 err.to_string()
3578 .contains("debug_collect_column_fields_by_row_traversal")
3579 );
3580
3581 let mut reader = FileReader::open(&path).expect("reopen reader");
3582 let err = reader
3583 .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3584 .expect_err("cursor-row explorer also rejects debug-only column collection");
3585 assert!(matches!(err, SdkError::Unsupported(_)));
3586 }
3587
3588 #[test]
3589 fn explorer_skips_irrelevant_compressed_data_for_facets() {
3590 let dir = TempDir::new().expect("tempdir");
3591 let path = dir.path().join("compressed.journal");
3592 let large_message = b"MESSAGE=abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
3593 write_entries(
3594 &path,
3595 Some((Compression::Zstd, 32)),
3596 &[(&[b"PRIORITY=3", large_message], 1_000)],
3597 );
3598
3599 let mut reader = FileReader::open(&path).expect("open reader");
3600 let query = ExplorerQuery {
3601 facets: vec![b"PRIORITY".to_vec()],
3602 limit: 0,
3603 ..ExplorerQuery::default()
3604 };
3605
3606 let result = reader.explore(&query).expect("explore");
3607 let priority = result
3608 .facets
3609 .get(b"PRIORITY".as_slice())
3610 .expect("priority facet");
3611 assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3612 assert_eq!(result.stats.payloads_decompressed, 0);
3613 assert_eq!(result.stats.data_refs_seen, 1);
3614 assert_eq!(result.stats.early_stops, 1);
3615 }
3616
3617 #[test]
3618 fn explorer_reuses_classified_data_objects() {
3619 let dir = TempDir::new().expect("tempdir");
3620 let path = dir.path().join("reuse.journal");
3621 write_entries(
3622 &path,
3623 None,
3624 &[
3625 (&[b"PRIORITY=3"], 1_000),
3626 (&[b"PRIORITY=3"], 2_000),
3627 (&[b"PRIORITY=3"], 3_000),
3628 ],
3629 );
3630
3631 let mut reader = FileReader::open(&path).expect("open reader");
3632 let query = ExplorerQuery {
3633 facets: vec![b"PRIORITY".to_vec()],
3634 limit: 0,
3635 ..ExplorerQuery::default()
3636 };
3637
3638 let result = reader.explore(&query).expect("explore");
3639 let priority = result
3640 .facets
3641 .get(b"PRIORITY".as_slice())
3642 .expect("priority facet");
3643 assert_eq!(priority.get(b"3".as_slice()), Some(&3));
3644 assert!(result.stats.data_cache_hits >= 2);
3645 }
3646
3647 #[test]
3648 fn explorer_groups_facets_with_same_filter_set() {
3649 let dir = TempDir::new().expect("tempdir");
3650 let path = dir.path().join("grouped-facets.journal");
3651 write_entries(
3652 &path,
3653 None,
3654 &[
3655 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3656 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3657 ],
3658 );
3659
3660 let mut reader = FileReader::open(&path).expect("open reader");
3661 let query = ExplorerQuery {
3662 facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3663 limit: 0,
3664 ..ExplorerQuery::default()
3665 };
3666
3667 let result = reader.explore(&query).expect("explore");
3668 assert_eq!(result.stats.rows_examined, 2);
3669 assert_eq!(result.stats.facet_rows_matched, 2);
3670 assert_eq!(
3671 result
3672 .facets
3673 .get(b"SERVICE".as_slice())
3674 .and_then(|values| values.get(b"a".as_slice())),
3675 Some(&1)
3676 );
3677 assert_eq!(
3678 result
3679 .facets
3680 .get(b"PRIORITY".as_slice())
3681 .and_then(|values| values.get(b"4".as_slice())),
3682 Some(&1)
3683 );
3684 }
3685
3686 #[test]
3687 fn explorer_combines_rows_histogram_and_facets_in_one_pass() {
3688 let dir = TempDir::new().expect("tempdir");
3689 let path = dir.path().join("combined-pass.journal");
3690 write_entries(
3691 &path,
3692 None,
3693 &[
3694 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3695 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3696 ],
3697 );
3698
3699 let mut reader = FileReader::open(&path).expect("open reader");
3700 let query = ExplorerQuery {
3701 facets: vec![b"SERVICE".to_vec()],
3702 histogram: Some(b"PRIORITY".to_vec()),
3703 histogram_target_buckets: 2,
3704 limit: 2,
3705 ..ExplorerQuery::default()
3706 };
3707
3708 let result = reader.explore(&query).expect("explore");
3709 assert_eq!(result.rows.len(), 2);
3710 assert_eq!(result.stats.rows_examined, 2);
3711 assert_eq!(result.stats.rows_matched, 2);
3712 assert_eq!(result.stats.facet_rows_matched, 2);
3713 assert_eq!(
3714 result
3715 .facets
3716 .get(b"SERVICE".as_slice())
3717 .and_then(|values| values.get(b"a".as_slice())),
3718 Some(&1)
3719 );
3720 let histogram_total = result
3721 .histogram
3722 .as_ref()
3723 .expect("histogram")
3724 .buckets
3725 .iter()
3726 .flat_map(|bucket| bucket.values.values())
3727 .sum::<u64>();
3728 assert_eq!(histogram_total, 2);
3729 }
3730
3731 #[test]
3732 fn explorer_sampling_uses_actual_histogram_bucket_count() {
3733 let query = ExplorerQuery {
3734 after_realtime_usec: Some(1_733_494_460_000_000),
3735 before_realtime_usec: Some(1_735_656_412_000_000),
3736 histogram: Some(b"PRIORITY".to_vec()),
3737 histogram_target_buckets: 300,
3738 sampling: Some(ExplorerSampling {
3739 budget: 20_000,
3740 matched_files: 200,
3741 file_head_realtime_usec: 1_733_494_460_000_000,
3742 file_tail_realtime_usec: 1_735_656_412_000_000,
3743 file_head_seqnum: 1,
3744 file_tail_seqnum: 2,
3745 file_entries: 2,
3746 }),
3747 ..ExplorerQuery::default()
3748 };
3749
3750 let bucket_count = histogram_bucket_count_for_query(&query).expect("bucket count");
3751 let sampling =
3752 ExplorerSamplingState::for_query(&query, Some(bucket_count)).expect("sampling");
3753
3754 assert_eq!(bucket_count, 302);
3755 assert_eq!(sampling.per_slot_sampled.len(), bucket_count);
3756 }
3757
3758 #[test]
3759 fn explorer_sampling_seqnum_estimate_clamps_over_scanned_to_one() {
3760 let query = ExplorerQuery {
3761 after_realtime_usec: Some(1),
3762 before_realtime_usec: Some(100),
3763 direction: Direction::Forward,
3764 sampling: Some(ExplorerSampling {
3765 budget: 20,
3766 matched_files: 1,
3767 file_head_realtime_usec: 1,
3768 file_tail_realtime_usec: 100,
3769 file_head_seqnum: 1,
3770 file_tail_seqnum: 100,
3771 file_entries: 3,
3772 }),
3773 ..ExplorerQuery::default()
3774 };
3775 let mut sampling = ExplorerSamplingState::for_query(&query, None).expect("sampling");
3776 sampling.per_file_sampled = 10;
3777
3778 assert_eq!(sampling.estimate_remaining_rows_by_seqnum(5), Some(1));
3779 }
3780
3781 #[test]
3782 fn explorer_estimated_histogram_distribution_matches_netdata_integer_math() {
3783 let mut histogram = ExplorerHistogram {
3784 field: b"PRIORITY".to_vec(),
3785 buckets: vec![
3786 ExplorerHistogramBucket {
3787 start_realtime_usec: 0,
3788 end_realtime_usec: 10,
3789 values: HashMap::new(),
3790 },
3791 ExplorerHistogramBucket {
3792 start_realtime_usec: 10,
3793 end_realtime_usec: 20,
3794 values: HashMap::new(),
3795 },
3796 ExplorerHistogramBucket {
3797 start_realtime_usec: 20,
3798 end_realtime_usec: 30,
3799 values: HashMap::new(),
3800 },
3801 ],
3802 };
3803 let mut stats = ExplorerStats::default();
3804
3805 add_estimated_histogram_range(Some(&mut histogram), 0, 30, 10, &mut stats);
3806
3807 let counts = histogram
3808 .buckets
3809 .iter()
3810 .map(|bucket| {
3811 bucket
3812 .values
3813 .get(EXPLORER_ESTIMATED_VALUE)
3814 .copied()
3815 .unwrap_or_default()
3816 })
3817 .collect::<Vec<_>>();
3818 assert_eq!(counts, vec![3, 3, 3]);
3819 assert_eq!(counts.iter().sum::<u64>(), 9);
3820 }
3821
3822 #[test]
3823 fn explorer_filters_then_combines_outputs_in_one_candidate_pass() {
3824 let dir = TempDir::new().expect("tempdir");
3825 let path = dir.path().join("filtered-combined-pass.journal");
3826 write_entries(
3827 &path,
3828 None,
3829 &[
3830 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3831 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
3832 (&[b"SERVICE=c", b"PRIORITY=3"], 3_000),
3833 ],
3834 );
3835
3836 let mut reader = FileReader::open(&path).expect("open reader");
3837 let query = ExplorerQuery {
3838 filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3839 facets: vec![b"SERVICE".to_vec()],
3840 histogram: Some(b"SERVICE".to_vec()),
3841 histogram_target_buckets: 2,
3842 limit: 10,
3843 ..ExplorerQuery::default()
3844 };
3845
3846 let result = reader.explore(&query).expect("explore");
3847 assert_eq!(result.rows.len(), 2);
3848 assert_eq!(result.stats.rows_examined, 2);
3849 assert_eq!(result.stats.rows_matched, 2);
3850 assert_eq!(result.stats.facet_rows_matched, 2);
3851 let service = result
3852 .facets
3853 .get(b"SERVICE".as_slice())
3854 .expect("service facet");
3855 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3856 assert_eq!(service.get(b"c".as_slice()), Some(&1));
3857 assert_eq!(service.get(b"b".as_slice()), None);
3858 }
3859
3860 #[test]
3861 fn explorer_cursor_rows_defer_payload_expansion() {
3862 let dir = TempDir::new().expect("tempdir");
3863 let path = dir.path().join("cursor-only-row.journal");
3864 write_entries(
3865 &path,
3866 None,
3867 &[(&[b"SERVICE=a", b"PRIORITY=3", b"MESSAGE=hello"], 1_000)],
3868 );
3869
3870 let query = ExplorerQuery {
3871 limit: 1,
3872 ..ExplorerQuery::default()
3873 };
3874 let mut reader = FileReader::open(&path).expect("open reader");
3875 let result = reader
3876 .explore_with_strategy_cursor_rows(&query, ExplorerStrategy::Traversal)
3877 .expect("explore cursor rows");
3878
3879 assert_eq!(result.rows.len(), 1);
3880 assert!(result.rows[0].payloads.is_empty());
3881 assert_eq!(result.stats.returned_row_expansions, 0);
3882
3883 let cursor = result.rows[0].cursor.clone();
3884 let mut reader = FileReader::open(&path).expect("reopen reader");
3885 reader.seek_cursor(&cursor).expect("seek cursor");
3886 assert!(reader.test_cursor(&cursor).expect("test cursor"));
3887
3888 let mut payloads = Vec::new();
3889 reader
3890 .collect_entry_payloads(&mut payloads)
3891 .expect("collect payloads");
3892 assert!(payloads.iter().any(|payload| payload == b"MESSAGE=hello"));
3893 }
3894
3895 #[test]
3896 fn explorer_same_field_filter_exclusion_counts_filtered_out_facet_values() {
3897 let dir = TempDir::new().expect("tempdir");
3898 let path = dir.path().join("same-field-filter-facet.journal");
3899 write_entries(
3900 &path,
3901 None,
3902 &[
3903 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3904 (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3905 (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
3906 ],
3907 );
3908
3909 let mut reader = FileReader::open(&path).expect("open reader");
3910 let query = ExplorerQuery {
3911 filters: vec![
3912 ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
3913 ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
3914 ],
3915 facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
3916 limit: 0,
3917 ..ExplorerQuery::default()
3918 };
3919
3920 let result = reader.explore(&query).expect("explore");
3921 let service = result
3922 .facets
3923 .get(b"SERVICE".as_slice())
3924 .expect("service facet");
3925 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3926 assert_eq!(service.get(b"b".as_slice()), Some(&1));
3927
3928 let priority = result
3929 .facets
3930 .get(b"PRIORITY".as_slice())
3931 .expect("priority facet");
3932 assert_eq!(priority.get(b"3".as_slice()), Some(&1));
3933 assert_eq!(priority.get(b"4".as_slice()), Some(&1));
3934 }
3935
3936 #[test]
3937 fn explorer_index_strategy_matches_traversal_for_all_values() {
3938 let dir = TempDir::new().expect("tempdir");
3939 let path = dir.path().join("indexed-all-values.journal");
3940 write_entries(
3941 &path,
3942 None,
3943 &[
3944 (&[b"SERVICE=a", b"PRIORITY=3", b"TAG=x"], 1_000),
3945 (&[b"SERVICE=b", b"PRIORITY=3", b"TAG=x"], 2_000),
3946 (&[b"SERVICE=a", b"PRIORITY=4", b"TAG=y", b"TAG=z"], 3_000),
3947 (&[b"PRIORITY=3"], 4_000),
3948 ],
3949 );
3950
3951 let query = ExplorerQuery {
3952 after_realtime_usec: Some(0),
3953 before_realtime_usec: Some(5_000),
3954 filters: vec![ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()])],
3955 facets: vec![b"SERVICE".to_vec(), b"TAG".to_vec()],
3956 histogram: Some(b"SERVICE".to_vec()),
3957 histogram_target_buckets: 2,
3958 limit: 2,
3959 field_mode: ExplorerFieldMode::AllValues,
3960 use_source_realtime: false,
3961 ..ExplorerQuery::default()
3962 };
3963
3964 let mut reader = FileReader::open(&path).expect("open reader");
3965 let result = reader
3966 .explore_with_strategy(&query, ExplorerStrategy::Compare)
3967 .expect("compare");
3968
3969 let comparison = result.comparison.as_ref().expect("comparison diagnostics");
3970 assert_eq!(comparison.index_stats, result.stats);
3971 assert_eq!(comparison.traversal_stats.rows_returned, 2);
3972 assert_eq!(comparison.index_stats.rows_returned, 2);
3973
3974 assert_eq!(result.rows.len(), 2);
3975 let service = result
3976 .facets
3977 .get(b"SERVICE".as_slice())
3978 .expect("service facet");
3979 assert_eq!(service.get(b"a".as_slice()), Some(&1));
3980 assert_eq!(service.get(b"b".as_slice()), Some(&1));
3981 assert_eq!(service.get(UNSET_VALUE), Some(&1));
3982 let histogram = result.histogram.as_ref().expect("histogram");
3983 assert_eq!(histogram.buckets.len(), 2);
3984 assert_eq!(histogram.buckets[0].values.get(b"a".as_slice()), Some(&1));
3985 assert_eq!(histogram.buckets[0].values.get(b"b".as_slice()), Some(&1));
3986 assert_eq!(histogram.buckets[0].values.get(UNSET_VALUE), Some(&1));
3987 }
3988
3989 #[test]
3990 fn explorer_index_strategy_preserves_same_field_filter_exclusion() {
3991 let dir = TempDir::new().expect("tempdir");
3992 let path = dir.path().join("indexed-same-field-filter.journal");
3993 write_entries(
3994 &path,
3995 None,
3996 &[
3997 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
3998 (&[b"SERVICE=b", b"PRIORITY=3"], 2_000),
3999 (&[b"SERVICE=a", b"PRIORITY=4"], 3_000),
4000 ],
4001 );
4002
4003 let query = ExplorerQuery {
4004 filters: vec![
4005 ExplorerFilter::new(b"SERVICE".to_vec(), [b"a".to_vec()]),
4006 ExplorerFilter::new(b"PRIORITY".to_vec(), [b"3".to_vec()]),
4007 ],
4008 facets: vec![b"SERVICE".to_vec(), b"PRIORITY".to_vec()],
4009 field_mode: ExplorerFieldMode::AllValues,
4010 use_source_realtime: false,
4011 ..ExplorerQuery::default()
4012 };
4013
4014 let mut reader = FileReader::open(&path).expect("open reader");
4015 let result = reader
4016 .explore_with_strategy(&query, ExplorerStrategy::Compare)
4017 .expect("compare");
4018 let service = result
4019 .facets
4020 .get(b"SERVICE".as_slice())
4021 .expect("service facet");
4022 assert_eq!(service.get(b"a".as_slice()), Some(&1));
4023 assert_eq!(service.get(b"b".as_slice()), Some(&1));
4024 }
4025
4026 #[test]
4027 fn explorer_index_strategy_rejects_first_value_semantics() {
4028 let dir = TempDir::new().expect("tempdir");
4029 let path = dir.path().join("indexed-first-value.journal");
4030 write_entries(&path, None, &[(&[b"TAG=one", b"TAG=two"], 1_000)]);
4031
4032 let mut reader = FileReader::open(&path).expect("open reader");
4033 let err = reader
4034 .explore_with_strategy(
4035 &ExplorerQuery {
4036 facets: vec![b"TAG".to_vec()],
4037 field_mode: ExplorerFieldMode::FirstValue,
4038 ..ExplorerQuery::default()
4039 },
4040 ExplorerStrategy::Index,
4041 )
4042 .expect_err("first-value index strategy should be rejected");
4043
4044 assert!(matches!(err, SdkError::Unsupported(_)));
4045 }
4046
4047 #[test]
4048 fn explorer_first_value_counts_one_value_per_selected_field() {
4049 let dir = TempDir::new().expect("tempdir");
4050 let path = dir.path().join("first-value.journal");
4051 write_entries(
4052 &path,
4053 None,
4054 &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4055 );
4056
4057 let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4058 let all_values = all_values_reader
4059 .explore(&ExplorerQuery {
4060 facets: vec![b"TAG".to_vec()],
4061 limit: 0,
4062 field_mode: ExplorerFieldMode::AllValues,
4063 ..ExplorerQuery::default()
4064 })
4065 .expect("all-values explore");
4066 let all_tag = all_values
4067 .facets
4068 .get(b"TAG".as_slice())
4069 .expect("all-values tag facet");
4070 assert_eq!(all_tag.values().sum::<u64>(), 2);
4071 assert_eq!(all_tag.len(), 2);
4072
4073 let mut first_value_reader = FileReader::open(&path).expect("open first-value reader");
4074 let first_value = first_value_reader
4075 .explore(&ExplorerQuery {
4076 facets: vec![b"TAG".to_vec()],
4077 limit: 0,
4078 field_mode: ExplorerFieldMode::FirstValue,
4079 ..ExplorerQuery::default()
4080 })
4081 .expect("first-value explore");
4082 let first_tag = first_value
4083 .facets
4084 .get(b"TAG".as_slice())
4085 .expect("first-value tag facet");
4086 assert_eq!(first_tag.values().sum::<u64>(), 1);
4087 assert_eq!(first_tag.len(), 1);
4088 assert_eq!(first_value.stats.early_stops, 1);
4089 }
4090
4091 #[test]
4092 fn explorer_first_value_does_not_double_count_duplicate_facets_or_histogram() {
4093 let dir = TempDir::new().expect("tempdir");
4094 let path = dir.path().join("first-value-no-double-count.journal");
4095 write_entries(
4096 &path,
4097 None,
4098 &[(
4099 &[
4100 b"_SOURCE_REALTIME_TIMESTAMP=1000",
4101 b"TAG=one",
4102 b"TAG=two",
4103 b"MESSAGE=after-tag",
4104 ],
4105 1_000,
4106 )],
4107 );
4108
4109 let mut reader = FileReader::open(&path).expect("open reader");
4110 let result = reader
4111 .explore(&ExplorerQuery {
4112 facets: vec![b"TAG".to_vec()],
4113 histogram: Some(b"TAG".to_vec()),
4114 histogram_target_buckets: 1,
4115 limit: 0,
4116 ..ExplorerQuery::default()
4117 })
4118 .expect("explore");
4119
4120 assert_eq!(
4121 result
4122 .facets
4123 .get(b"TAG".as_slice())
4124 .expect("tag facet")
4125 .values()
4126 .sum::<u64>(),
4127 1
4128 );
4129 assert_eq!(
4130 result
4131 .histogram
4132 .as_ref()
4133 .expect("histogram")
4134 .buckets
4135 .iter()
4136 .flat_map(|bucket| bucket.values.values())
4137 .sum::<u64>(),
4138 1
4139 );
4140
4141 let mut all_values_reader = FileReader::open(&path).expect("open all-values reader");
4142 let all_values = all_values_reader
4143 .explore(&ExplorerQuery {
4144 facets: vec![b"TAG".to_vec()],
4145 histogram: Some(b"TAG".to_vec()),
4146 histogram_target_buckets: 1,
4147 limit: 0,
4148 field_mode: ExplorerFieldMode::AllValues,
4149 ..ExplorerQuery::default()
4150 })
4151 .expect("all-values explore");
4152
4153 assert_eq!(
4154 all_values
4155 .facets
4156 .get(b"TAG".as_slice())
4157 .expect("tag facet")
4158 .values()
4159 .sum::<u64>(),
4160 2
4161 );
4162 assert_eq!(
4163 all_values
4164 .histogram
4165 .as_ref()
4166 .expect("histogram")
4167 .buckets
4168 .iter()
4169 .flat_map(|bucket| bucket.values.values())
4170 .sum::<u64>(),
4171 2
4172 );
4173 }
4174
4175 #[test]
4176 fn explorer_first_value_tracks_required_field_identities() {
4177 let dir = TempDir::new().expect("tempdir");
4178 let path = dir.path().join("first-value-identities.journal");
4179 write_entries(
4180 &path,
4181 None,
4182 &[(&[b"TAG=one", b"TAG=two", b"SERVICE=a"], 1_000)],
4183 );
4184
4185 let mut reader = FileReader::open(&path).expect("open reader");
4186 let result = reader
4187 .explore(&ExplorerQuery {
4188 facets: vec![b"TAG".to_vec(), b"SERVICE".to_vec()],
4189 limit: 0,
4190 field_mode: ExplorerFieldMode::FirstValue,
4191 ..ExplorerQuery::default()
4192 })
4193 .expect("explore");
4194
4195 assert_eq!(
4196 result
4197 .facets
4198 .get(b"TAG".as_slice())
4199 .expect("tag facet")
4200 .values()
4201 .sum::<u64>(),
4202 1
4203 );
4204 assert_eq!(
4205 result
4206 .facets
4207 .get(b"SERVICE".as_slice())
4208 .and_then(|values| values.get(b"a".as_slice())),
4209 Some(&1)
4210 );
4211 assert_eq!(result.stats.early_stops, 1);
4212 }
4213
4214 #[test]
4215 fn explorer_rejects_duplicate_facet_fields() {
4216 let dir = TempDir::new().expect("tempdir");
4217 let path = dir.path().join("duplicate-facets.journal");
4218 write_entries(&path, None, &[(&[b"SERVICE=a"], 1_000)]);
4219
4220 let mut reader = FileReader::open(&path).expect("open reader");
4221 let err = reader
4222 .explore(&ExplorerQuery {
4223 facets: vec![b"SERVICE".to_vec(), b"SERVICE".to_vec()],
4224 limit: 0,
4225 ..ExplorerQuery::default()
4226 })
4227 .expect_err("duplicate facets rejected");
4228
4229 assert!(err.to_string().contains("must not be duplicated"));
4230 }
4231
4232 #[test]
4233 fn explorer_empty_result_keeps_requested_facet_with_no_values() {
4234 let dir = TempDir::new().expect("tempdir");
4235 let path = dir.path().join("empty-result.journal");
4236 write_entries(&path, None, &[(&[b"SERVICE=a", b"PRIORITY=3"], 1_000)]);
4237
4238 let mut reader = FileReader::open(&path).expect("open reader");
4239 let result = reader
4240 .explore(&ExplorerQuery {
4241 after_realtime_usec: Some(10_000),
4242 before_realtime_usec: Some(20_000),
4243 facets: vec![b"SERVICE".to_vec()],
4244 limit: 10,
4245 realtime_slack_usec: 0,
4246 ..ExplorerQuery::default()
4247 })
4248 .expect("explore");
4249
4250 assert!(result.rows.is_empty());
4251 assert_eq!(result.stats.rows_matched, 0);
4252 assert!(
4253 result
4254 .facets
4255 .get(b"SERVICE".as_slice())
4256 .expect("service facet")
4257 .is_empty()
4258 );
4259 }
4260
4261 #[test]
4262 fn explorer_facet_time_bounds_do_not_count_slack_rows_without_source_realtime() {
4263 let dir = TempDir::new().expect("tempdir");
4264 let path = dir.path().join("facet-time-bound.journal");
4265 write_entries(
4266 &path,
4267 None,
4268 &[
4269 (&[b"SERVICE=before"], 340_000_000),
4270 (&[b"SERVICE=inside"], 360_000_000),
4271 (&[b"SERVICE=after"], 400_000_000),
4272 ],
4273 );
4274
4275 let mut reader = FileReader::open(&path).expect("open reader");
4276 let result = reader
4277 .explore(&ExplorerQuery {
4278 after_realtime_usec: Some(350_000_000),
4279 before_realtime_usec: Some(370_000_000),
4280 facets: vec![b"SERVICE".to_vec()],
4281 limit: 0,
4282 realtime_slack_usec: 20_000_000,
4283 use_source_realtime: false,
4284 ..ExplorerQuery::default()
4285 })
4286 .expect("explore");
4287
4288 let service = result
4289 .facets
4290 .get(b"SERVICE".as_slice())
4291 .expect("service facet");
4292 assert_eq!(service.get(b"inside".as_slice()), Some(&1));
4293 assert_eq!(service.get(b"before".as_slice()), None);
4294 assert_eq!(service.get(b"after".as_slice()), None);
4295 assert_eq!(result.stats.facet_rows_matched, 1);
4296 }
4297
4298 #[test]
4299 fn explorer_fts_disables_first_value_early_stop() {
4300 let dir = TempDir::new().expect("tempdir");
4301 let path = dir.path().join("fts-no-early-stop.journal");
4302 write_entries(&path, None, &[(&[b"TAG=one", b"MESSAGE=needle"], 1_000)]);
4303
4304 let mut reader = FileReader::open(&path).expect("open reader");
4305 let result = reader
4306 .explore(&ExplorerQuery {
4307 facets: vec![b"TAG".to_vec()],
4308 fts_patterns: vec![b"needle".to_vec()],
4309 limit: 0,
4310 ..ExplorerQuery::default()
4311 })
4312 .expect("explore");
4313
4314 assert_eq!(result.stats.early_stops, 0);
4315 assert_eq!(result.stats.data_refs_seen, 2);
4316 assert_eq!(
4317 result
4318 .facets
4319 .get(b"TAG".as_slice())
4320 .and_then(|values| values.get(b"one".as_slice())),
4321 Some(&1)
4322 );
4323 }
4324
4325 #[test]
4326 fn explorer_fts_or_terms_and_negative_terms_filter_rows() {
4327 let dir = TempDir::new().expect("tempdir");
4328 let path = dir.path().join("fts-negative.journal");
4329 write_entries(
4330 &path,
4331 None,
4332 &[
4333 (&[b"TAG=alpha", b"MESSAGE=alpha keep"], 1_000),
4334 (&[b"TAG=beta", b"MESSAGE=beta keep"], 2_000),
4335 (&[b"TAG=debug", b"MESSAGE=alpha debug"], 3_000),
4336 (&[b"TAG=other", b"MESSAGE=other"], 4_000),
4337 (&[b"TAG=wild", b"MESSAGE=start middle end"], 5_000),
4338 ],
4339 );
4340
4341 let mut reader = FileReader::open(&path).expect("open reader");
4342 let result = reader
4343 .explore(&ExplorerQuery {
4344 facets: vec![b"TAG".to_vec()],
4345 fts_terms: vec![
4346 ExplorerFtsPattern::substring(b"alpha".to_vec(), false),
4347 ExplorerFtsPattern::substring(b"beta".to_vec(), false),
4348 ExplorerFtsPattern::substring(b"debug".to_vec(), true),
4349 ExplorerFtsPattern::substring(b"start*end".to_vec(), false),
4350 ],
4351 limit: 10,
4352 ..ExplorerQuery::default()
4353 })
4354 .expect("explore");
4355
4356 let tag = result.facets.get(b"TAG".as_slice()).expect("TAG facet");
4357 assert_eq!(result.rows.len(), 3);
4358 assert_eq!(tag.get(b"alpha".as_slice()), Some(&1));
4359 assert_eq!(tag.get(b"beta".as_slice()), Some(&1));
4360 assert_eq!(tag.get(b"wild".as_slice()), Some(&1));
4361 assert_eq!(tag.get(b"debug".as_slice()), None);
4362 assert_eq!(tag.get(b"other".as_slice()), None);
4363 }
4364
4365 #[test]
4366 fn explorer_auto_anchor_scans_backward_from_tail() {
4367 let dir = TempDir::new().expect("tempdir");
4368 let path = dir.path().join("backward.journal");
4369 write_entries(
4370 &path,
4371 None,
4372 &[
4373 (&[b"SERVICE=a", b"PRIORITY=3"], 1_000),
4374 (&[b"SERVICE=b", b"PRIORITY=4"], 2_000),
4375 ],
4376 );
4377
4378 let mut reader = FileReader::open(&path).expect("open reader");
4379 let query = ExplorerQuery {
4380 direction: Direction::Backward,
4381 limit: 2,
4382 ..ExplorerQuery::default()
4383 };
4384
4385 let result = reader.explore(&query).expect("explore");
4386 assert_eq!(result.rows.len(), 2);
4387 assert_eq!(result.rows[0].realtime_usec, 2_000);
4388 assert_eq!(result.rows[1].realtime_usec, 1_000);
4389 }
4390
4391 #[test]
4392 fn explorer_backward_time_bound_stops_after_slack_window() {
4393 let dir = TempDir::new().expect("tempdir");
4394 let path = dir.path().join("backward-time-bound.journal");
4395 write_entries(
4396 &path,
4397 None,
4398 &[
4399 (&[b"SERVICE=a"], 100_000_000),
4400 (&[b"SERVICE=b"], 200_000_000),
4401 (&[b"SERVICE=c"], 300_000_000),
4402 (&[b"SERVICE=d"], 400_000_000),
4403 (&[b"SERVICE=e"], 500_000_000),
4404 ],
4405 );
4406
4407 let mut reader = FileReader::open(&path).expect("open reader");
4408 let query = ExplorerQuery {
4409 after_realtime_usec: Some(350_000_000),
4410 direction: Direction::Backward,
4411 limit: 10,
4412 realtime_slack_usec: 10_000_000,
4413 ..ExplorerQuery::default()
4414 };
4415
4416 let result = reader.explore(&query).expect("explore");
4417 assert_eq!(result.rows.len(), 2);
4418 assert_eq!(result.rows[0].realtime_usec, 500_000_000);
4419 assert_eq!(result.rows[1].realtime_usec, 400_000_000);
4420 assert_eq!(result.stats.rows_examined, 2);
4421 }
4422
4423 #[test]
4424 fn explorer_histogram_and_fts_are_opt_in() {
4425 let dir = TempDir::new().expect("tempdir");
4426 let path = dir.path().join("histogram.journal");
4427 write_entries(
4428 &path,
4429 None,
4430 &[
4431 (&[b"MESSAGE=alpha", b"PRIORITY=3"], 1_000),
4432 (&[b"MESSAGE=beta", b"PRIORITY=4"], 2_000),
4433 ],
4434 );
4435
4436 let mut reader = FileReader::open(&path).expect("open reader");
4437 let query = ExplorerQuery {
4438 after_realtime_usec: Some(0),
4439 before_realtime_usec: Some(3_000),
4440 histogram: Some(b"PRIORITY".to_vec()),
4441 histogram_target_buckets: 2,
4442 fts_patterns: vec![b"alp".to_vec()],
4443 limit: 10,
4444 ..ExplorerQuery::default()
4445 };
4446
4447 let result = reader.explore(&query).expect("explore");
4448 assert_eq!(result.rows.len(), 1);
4449 assert!(result.stats.fts_scans > 0);
4450 assert_eq!(
4451 result
4452 .histogram
4453 .as_ref()
4454 .expect("histogram")
4455 .buckets
4456 .iter()
4457 .flat_map(|bucket| bucket.values.values())
4458 .sum::<u64>(),
4459 1
4460 );
4461 }
4462
4463 #[test]
4464 fn explorer_first_value_stops_after_same_data_satisfies_multiple_roles() {
4465 let dir = TempDir::new().expect("tempdir");
4466 let path = dir.path().join("same-data-multiple-roles.journal");
4467 write_entries(
4468 &path,
4469 None,
4470 &[(
4471 &[b"_SOURCE_REALTIME_TIMESTAMP=1000", b"MESSAGE=after-source"],
4472 1_000,
4473 )],
4474 );
4475
4476 let mut reader = FileReader::open(&path).expect("open reader");
4477 let result = reader
4478 .explore(&ExplorerQuery {
4479 histogram: Some(SOURCE_REALTIME_FIELD.to_vec()),
4480 histogram_target_buckets: 1,
4481 limit: 0,
4482 field_mode: ExplorerFieldMode::FirstValue,
4483 ..ExplorerQuery::default()
4484 })
4485 .expect("explore");
4486
4487 assert_eq!(result.stats.histogram_updates, 1);
4488 assert_eq!(result.stats.early_stops, 1);
4489 assert_eq!(
4490 result
4491 .histogram
4492 .as_ref()
4493 .expect("histogram")
4494 .buckets
4495 .iter()
4496 .flat_map(|bucket| bucket.values.values())
4497 .sum::<u64>(),
4498 1
4499 );
4500 }
4501}