Skip to main content

luci/agg/
mod.rs

1//! Aggregation framework: collectors, result types, and aggregation definitions.
2//!
3//! See [[aggregations]] and [[feature-aggregations-v010|milestone-3]].
4
5pub mod bucket;
6pub mod hll;
7pub mod metric;
8pub mod parser;
9pub mod tdigest;
10
11use crate::core::{DocId, LuciError, Result, ScoreMode};
12use crate::query::Query;
13use crate::query::ast::ScoringExpression;
14use crate::search::searcher::Searcher;
15use crate::segment::reader::SegmentReader;
16use std::collections::HashMap;
17
18// --- Result types ---
19
/// Top-level aggregation result.
///
/// One variant per result family; `AggregationResult::to_json` delegates to
/// the wrapped result's own `to_json`.
#[derive(Clone, Debug)]
pub enum AggregationResult {
    /// Output of a metric aggregation (see `MetricResult`).
    Metric(MetricResult),
    /// Output of a bucket aggregation (see `BucketResult`).
    Bucket(BucketResult),
    /// Output of a `top_hits` aggregation (see `HitsResult`).
    Hits(HitsResult),
}
27
/// Result of a metric aggregation.
///
/// Most metrics report one number in `value`. Multi-field metrics such as
/// `stats` additionally place named numbers in `extra`.
#[derive(Clone, Debug)]
pub struct MetricResult {
    /// Headline value; `None` when no value is available.
    pub value: Option<f64>,
    /// Additional named fields (used by the stats aggregation).
    pub extra: HashMap<String, f64>,
    /// Opaque per-segment merge state (e.g. HLL registers, t-digest
    /// centroids). Never serialized to JSON; only consumed when results are
    /// merged across segments.
    pub merge_state: Option<Vec<u8>>,
}

impl MetricResult {
    /// Build a result holding just `value`, with no extra fields and no
    /// merge state.
    pub fn single(value: Option<f64>) -> Self {
        Self {
            merge_state: None,
            extra: HashMap::new(),
            value,
        }
    }

    /// Build a stats result.
    ///
    /// `avg` becomes the headline `value`; `count`, `min`, `max` and `sum`
    /// are stored in `extra` under exactly those names. `count` is converted
    /// with `as f64`, so counts above 2^53 lose precision.
    pub fn stats(count: u64, min: f64, max: f64, avg: f64, sum: f64) -> Self {
        let named = [
            ("count", count as f64),
            ("min", min),
            ("max", max),
            ("sum", sum),
        ];
        let mut result = Self::single(Some(avg));
        result.extra = named
            .iter()
            .map(|&(name, number)| (name.to_string(), number))
            .collect();
        result
    }
}
61
/// Result from a bucket aggregation.
///
/// Serialized by `BucketResult::to_json` as `{"buckets": [...]}`.
#[derive(Clone, Debug)]
pub struct BucketResult {
    /// The buckets, serialized in vector order.
    pub buckets: Vec<Bucket>,
}
67
/// A single bucket in a bucket aggregation result.
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Identifies the bucket; decides whether the JSON carries `key` or
    /// `from`/`to`.
    pub key: BucketKey,
    /// Document count reported for this bucket (emitted as `doc_count`).
    pub doc_count: u64,
    /// Child aggregation results keyed by sub-aggregation name. Each is
    /// emitted as a sibling field of `doc_count` in the bucket's JSON.
    pub sub_aggs: HashMap<String, AggregationResult>,
}
75
/// The key identifying a bucket.
#[derive(Clone, Debug)]
pub enum BucketKey {
    /// String key, emitted as a JSON string under `key`.
    String(String),
    /// Numeric key, emitted as a JSON number under `key`.
    Number(f64),
    /// Range bucket. Emitted as `from`/`to` fields (no `key`); a `None`
    /// bound is left out of the JSON entirely.
    Range { from: Option<f64>, to: Option<f64> },
}
83
/// Result from a top_hits aggregation.
#[derive(Clone, Debug)]
pub struct HitsResult {
    /// Hit documents already rendered as JSON; serialized verbatim under
    /// `hits.hits`.
    pub hits: Vec<serde_json::Value>,
}
89
90// --- JSON serialization ---
91
92impl AggregationResult {
93    pub fn to_json(&self) -> serde_json::Value {
94        match self {
95            AggregationResult::Metric(m) => m.to_json(),
96            AggregationResult::Bucket(b) => b.to_json(),
97            AggregationResult::Hits(h) => h.to_json(),
98        }
99    }
100}
101
102impl HitsResult {
103    pub fn to_json(&self) -> serde_json::Value {
104        serde_json::json!({"hits": {"hits": self.hits}})
105    }
106}
107
108impl MetricResult {
109    pub fn to_json(&self) -> serde_json::Value {
110        if self.extra.is_empty() {
111            serde_json::json!({"value": self.value})
112        } else {
113            let mut obj = serde_json::Map::new();
114            if let Some(v) = self.value {
115                obj.insert("value".into(), serde_json::json!(v));
116            }
117            for (k, v) in &self.extra {
118                obj.insert(k.clone(), serde_json::json!(v));
119            }
120            serde_json::Value::Object(obj)
121        }
122    }
123}
124
125impl BucketResult {
126    pub fn to_json(&self) -> serde_json::Value {
127        let buckets: Vec<serde_json::Value> = self.buckets.iter().map(|b| b.to_json()).collect();
128        serde_json::json!({"buckets": buckets})
129    }
130}
131
132impl Bucket {
133    pub fn to_json(&self) -> serde_json::Value {
134        let mut obj = serde_json::Map::new();
135        match &self.key {
136            BucketKey::String(s) => {
137                obj.insert("key".into(), serde_json::json!(s));
138            }
139            BucketKey::Number(n) => {
140                obj.insert("key".into(), serde_json::json!(n));
141            }
142            BucketKey::Range { from, to } => {
143                if let Some(f) = from {
144                    obj.insert("from".into(), serde_json::json!(f));
145                }
146                if let Some(t) = to {
147                    obj.insert("to".into(), serde_json::json!(t));
148                }
149            }
150        }
151        obj.insert("doc_count".into(), serde_json::json!(self.doc_count));
152        for (name, result) in &self.sub_aggs {
153            obj.insert(name.clone(), result.to_json());
154        }
155        serde_json::Value::Object(obj)
156    }
157}
158
159// --- Collector traits ---
160
/// Creates per-segment aggregation collectors and merges their results.
///
/// Produced by `AggregationExpression::bind`. The `Send + Sync` bound lets
/// one factory be shared across threads.
pub trait AggregatorFactory: Send + Sync {
    /// Create a collector for a specific segment.
    ///
    /// `reader` is the segment whose documents the returned collector will
    /// see (presumably used to resolve per-segment field data — confirm in
    /// the implementations).
    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator>;

    /// Merge results from multiple segments.
    ///
    /// Expected to receive the per-segment results produced by
    /// `Aggregator::finish` and combine them into a single result.
    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult;
}
169
170/// Per-segment aggregation collector. Called once per matching document.
171pub trait Aggregator: Send {
172    /// Collect a matching document.
173    fn collect(&mut self, doc_id: DocId);
174
175    /// Collect a contiguous range of doc IDs. Used by the match_all +
176    /// agg-only fast path. Default delegates to single-doc collect.
177    fn collect_range(&mut self, start: u32, end: u32) {
178        for i in start..end {
179            self.collect(DocId::new(i));
180        }
181    }
182
183    /// Produce the segment-level result.
184    fn finish(self: Box<Self>) -> AggregationResult;
185}
186
187// --- Aggregation expression ---
188
/// A composable aggregation expression. The engine's native representation.
/// JSON is parsed into this at the edge. Analogous to `ScoringExpression`.
///
/// `bind()` produces an `AggregatorFactory` for execution, analogous to
/// `ScoringExpression::bind()` producing a `BoundQuery`.
///
/// Variants with a `sub_aggs` field carry named child aggregations.
/// `bind()` binds them for `Terms`, `Filter`, `Nested` and `ReverseNested`.
/// For every other bucket variant, `bind()` returns an error if `sub_aggs`
/// is non-empty.
///
/// See [[architecture-aggregation-expression]].
#[derive(Clone, Debug)]
pub enum AggregationExpression {
    // --- Metric aggregations ---
    //
    // `Avg` through `ExtendedStats` each bind to a `MetricAggFactory` over
    // `field` with the same-named `MetricType`.
    Avg {
        field: String,
    },
    Sum {
        field: String,
    },
    Min {
        field: String,
    },
    Max {
        field: String,
    },
    ValueCount {
        field: String,
    },
    Stats {
        field: String,
    },
    ExtendedStats {
        field: String,
    },
    /// Binds to `hll::CardinalityAggFactory`.
    Cardinality {
        field: String,
        /// Converted by `bind()` into an HLL precision of
        /// `ceil(log2(precision_threshold * 1.5))`, clamped to `4..=18`.
        precision_threshold: u32,
    },
    /// Binds to `tdigest::PercentilesAggFactory`.
    Percentiles {
        field: String,
        /// Requested percentiles, passed through unchanged.
        percents: Vec<f64>,
        /// t-digest compression parameter, passed through unchanged.
        compression: f64,
    },
    GeoBounds {
        field: String,
    },
    GeoCentroid {
        field: String,
    },
    /// Binds to `TopHitsAggFactory`; takes no field.
    TopHits {
        /// Passed to `TopHitsAggFactory` (presumably the maximum number of
        /// hits returned — confirm there).
        size: usize,
    },

    // --- Bucket aggregations ---
    /// Binds to `TermsAggFactory`. Supports sub-aggregations.
    Terms {
        field: String,
        /// Passed to `TermsAggFactory` (presumably the maximum number of
        /// buckets returned — confirm there).
        size: usize,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to `RangeAggFactory`. Sub-aggregations are not yet supported.
    Range {
        field: String,
        ranges: Vec<RangeDef>,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to `HistogramAggFactory`. Sub-aggregations are not yet
    /// supported.
    Histogram {
        field: String,
        interval: f64,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to `DateHistogramAggFactory`. Sub-aggregations are not yet
    /// supported.
    DateHistogram {
        field: String,
        interval: DateInterval,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to the same `RangeAggFactory` as `Range`. Sub-aggregations are
    /// not yet supported.
    DateRange {
        field: String,
        ranges: Vec<RangeDef>,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Single filter bucket; `query` is bound with
    /// `ScoreMode::CompleteNoScores`. Supports sub-aggregations.
    Filter {
        query: ScoringExpression,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Named filter buckets; each query is bound with
    /// `ScoreMode::CompleteNoScores`. Sub-aggregations are not yet supported.
    Filters {
        filters: Vec<(String, ScoringExpression)>,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to `NestedAggFactory` with `path`. Supports sub-aggregations.
    Nested {
        path: String,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to `ReverseNestedAggFactory`. Supports sub-aggregations.
    ReverseNested {
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Binds to `GeohashGridAggFactory`. Sub-aggregations are not yet
    /// supported.
    GeohashGrid {
        field: String,
        precision: usize,
        size: usize,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
}
287
288impl AggregationExpression {
289    /// Bind this expression to produce an AggregatorFactory for execution.
290    ///
291    /// Fallible and takes the live `Searcher` so the `Filter`/`Filters`
292    /// arms bind their filter query against the real mapping + global
293    /// HNSW: a bad-field knn filter surfaces `Err` instead of a silently
294    /// empty bucket, and a valid one actually runs. See
295    /// [[feature-knn-query-type]] §4b.
296    pub(crate) fn bind(&self, searcher: &Searcher) -> Result<Box<dyn AggregatorFactory>> {
297        use crate::agg::bucket::{
298            DateHistogramAggFactory, FilterAggFactory, FiltersAggFactory, GeohashGridAggFactory,
299            HistogramAggFactory, NestedAggFactory, RangeAggFactory, ReverseNestedAggFactory,
300            TermsAggFactory, TopHitsAggFactory,
301        };
302        use crate::agg::metric::{
303            GeoBoundsAggFactory, GeoCentroidAggFactory, MetricAggFactory, MetricType,
304        };
305
306        let factory: Box<dyn AggregatorFactory> = match self {
307            Self::Avg { field } => Box::new(MetricAggFactory {
308                field_name: field.clone(),
309                metric_type: MetricType::Avg,
310            }),
311            Self::Sum { field } => Box::new(MetricAggFactory {
312                field_name: field.clone(),
313                metric_type: MetricType::Sum,
314            }),
315            Self::Min { field } => Box::new(MetricAggFactory {
316                field_name: field.clone(),
317                metric_type: MetricType::Min,
318            }),
319            Self::Max { field } => Box::new(MetricAggFactory {
320                field_name: field.clone(),
321                metric_type: MetricType::Max,
322            }),
323            Self::ValueCount { field } => Box::new(MetricAggFactory {
324                field_name: field.clone(),
325                metric_type: MetricType::ValueCount,
326            }),
327            Self::Stats { field } => Box::new(MetricAggFactory {
328                field_name: field.clone(),
329                metric_type: MetricType::Stats,
330            }),
331            Self::ExtendedStats { field } => Box::new(MetricAggFactory {
332                field_name: field.clone(),
333                metric_type: MetricType::ExtendedStats,
334            }),
335            Self::Cardinality {
336                field,
337                precision_threshold,
338            } => {
339                let p = ((*precision_threshold as f64 * 1.5).log2().ceil() as u8).clamp(4, 18);
340                Box::new(crate::agg::hll::CardinalityAggFactory {
341                    field_name: field.clone(),
342                    precision: p,
343                })
344            }
345            Self::Percentiles {
346                field,
347                percents,
348                compression,
349            } => Box::new(crate::agg::tdigest::PercentilesAggFactory {
350                field_name: field.clone(),
351                percents: percents.clone(),
352                compression: *compression,
353            }),
354            Self::GeoBounds { field } => Box::new(GeoBoundsAggFactory {
355                field_name: field.clone(),
356            }),
357            Self::GeoCentroid { field } => Box::new(GeoCentroidAggFactory {
358                field_name: field.clone(),
359            }),
360            Self::TopHits { size } => Box::new(TopHitsAggFactory { size: *size }),
361            Self::Terms {
362                field,
363                size,
364                sub_aggs,
365            } => Box::new(TermsAggFactory {
366                field_name: field.clone(),
367                size: *size,
368                sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
369            }),
370            Self::Range {
371                field,
372                ranges,
373                sub_aggs,
374            } => {
375                reject_sub_aggs(sub_aggs, "range")?;
376                Box::new(RangeAggFactory {
377                    field_name: field.clone(),
378                    ranges: ranges.clone(),
379                })
380            }
381            Self::Histogram {
382                field,
383                interval,
384                sub_aggs,
385            } => {
386                reject_sub_aggs(sub_aggs, "histogram")?;
387                Box::new(HistogramAggFactory {
388                    field_name: field.clone(),
389                    interval: *interval,
390                })
391            }
392            Self::DateHistogram {
393                field,
394                interval,
395                sub_aggs,
396            } => {
397                reject_sub_aggs(sub_aggs, "date_histogram")?;
398                Box::new(DateHistogramAggFactory {
399                    field_name: field.clone(),
400                    interval: interval.clone(),
401                })
402            }
403            Self::DateRange {
404                field,
405                ranges,
406                sub_aggs,
407            } => {
408                reject_sub_aggs(sub_aggs, "date_range")?;
409                Box::new(RangeAggFactory {
410                    field_name: field.clone(),
411                    ranges: ranges.clone(),
412                })
413            }
414            Self::Filter { query, sub_aggs } => Box::new(FilterAggFactory {
415                bound_query: query.bind(searcher, ScoreMode::CompleteNoScores)?,
416                sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
417            }),
418            Self::Filters { filters, sub_aggs } => {
419                reject_sub_aggs(sub_aggs, "filters")?;
420                let compiled = filters
421                    .iter()
422                    .map(|(name, ast)| {
423                        Ok((
424                            name.clone(),
425                            ast.bind(searcher, ScoreMode::CompleteNoScores)?,
426                        ))
427                    })
428                    .collect::<Result<Vec<_>>>()?;
429                Box::new(FiltersAggFactory { filters: compiled })
430            }
431            Self::Nested { path, sub_aggs } => Box::new(NestedAggFactory {
432                path: path.clone(),
433                sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
434            }),
435            Self::ReverseNested { sub_aggs } => Box::new(ReverseNestedAggFactory {
436                sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
437            }),
438            Self::GeohashGrid {
439                field,
440                precision,
441                size,
442                sub_aggs,
443            } => {
444                reject_sub_aggs(sub_aggs, "geohash_grid")?;
445                Box::new(GeohashGridAggFactory {
446                    field_name: field.clone(),
447                    precision: *precision,
448                    size: *size,
449                })
450            }
451        };
452        Ok(factory)
453    }
454}
455
456/// Refuse a bucket aggregation that parses sub-aggregations but does not
457/// implement them yet. Returns an explicit error rather than silently
458/// dropping the request (the historical `..`-match behaviour). `filters`
459/// and the five remaining bucket arms (range, date_range, histogram,
460/// date_histogram, geohash_grid) all support sub-aggs in Elasticsearch;
461/// implementing them is deferred. See [[code-must-not-lie]] and
462/// [[bucket-agg-sub-aggs-silent-drop]].
463fn reject_sub_aggs(sub_aggs: &[(String, AggregationExpression)], agg: &str) -> Result<()> {
464    if sub_aggs.is_empty() {
465        Ok(())
466    } else {
467        Err(LuciError::InvalidQuery(format!(
468            "sub-aggregations under a `{agg}` aggregation are not yet supported"
469        )))
470    }
471}
472
473fn bind_sub_aggs(
474    sub_aggs: &[(String, AggregationExpression)],
475    searcher: &Searcher,
476) -> Result<Vec<(String, Box<dyn AggregatorFactory>)>> {
477    sub_aggs
478        .iter()
479        .map(|(name, expr)| Ok((name.clone(), expr.bind(searcher)?)))
480        .collect()
481}
482
/// Date histogram interval.
///
/// Carried by `AggregationExpression::DateHistogram` and passed unchanged
/// to `DateHistogramAggFactory` by `bind()`.
#[derive(Clone, Debug)]
pub enum DateInterval {
    /// Fixed interval in milliseconds (e.g., "1d" = 86400000ms).
    Fixed(f64),
    /// Calendar-aware interval; units such as month, quarter and year have
    /// variable length.
    Calendar(CalendarInterval),
}
491
/// Calendar-aware intervals: the unit carried by `DateInterval::Calendar`.
#[derive(Clone, Debug)]
pub enum CalendarInterval {
    Minute,
    Hour,
    Day,
    Week,
    // The following units have variable length.
    Month,
    Quarter,
    Year,
}
503
/// A range definition for range aggregation.
///
/// Used by both the `range` and `date_range` variants of
/// `AggregationExpression`, which bind to the same `RangeAggFactory`.
#[derive(Clone, Debug)]
pub struct RangeDef {
    /// Optional bucket name (NOTE(review): confirm how `RangeAggFactory`
    /// surfaces it; `BucketKey::Range` has no field for it).
    pub key: Option<String>,
    /// Lower bound; `None` presumably means unbounded below — confirm
    /// inclusivity in `RangeAggFactory`.
    pub from: Option<f64>,
    /// Upper bound; `None` presumably means unbounded above — confirm
    /// inclusivity in `RangeAggFactory`.
    pub to: Option<f64>,
}
511
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn metric_result_json() {
        // Deliberately not 3.14: clippy's deny-by-default `approx_constant`
        // lint rejects literals that approximate PI.
        let r = MetricResult::single(Some(2.5));
        let j = r.to_json();
        assert_eq!(j["value"], 2.5);
    }

    #[test]
    fn metric_result_null() {
        let r = MetricResult::single(None);
        let j = r.to_json();
        assert!(j["value"].is_null());
    }

    #[test]
    fn metric_result_json_omits_merge_state() {
        let r = MetricResult {
            value: Some(1.0),
            extra: HashMap::new(),
            merge_state: Some(vec![1, 2, 3]),
        };
        assert_eq!(r.to_json(), serde_json::json!({"value": 1.0}));
    }

    #[test]
    fn metric_with_extras_and_no_value_omits_value_key() {
        let mut extra = HashMap::new();
        extra.insert("count".to_string(), 0.0);
        let r = MetricResult {
            value: None,
            extra,
            merge_state: None,
        };
        let j = r.to_json();
        assert!(j.get("value").is_none());
        assert_eq!(j["count"], 0.0);
    }

    #[test]
    fn stats_result_json() {
        let r = MetricResult::stats(10, 1.0, 100.0, 50.5, 505.0);
        let j = r.to_json();
        assert_eq!(j["value"], 50.5);
        assert_eq!(j["count"], 10.0);
        assert_eq!(j["min"], 1.0);
        assert_eq!(j["max"], 100.0);
        assert_eq!(j["sum"], 505.0);
    }

    #[test]
    fn bucket_result_json() {
        let r = BucketResult {
            buckets: vec![Bucket {
                key: BucketKey::String("tech".into()),
                doc_count: 42,
                sub_aggs: HashMap::new(),
            }],
        };
        let j = r.to_json();
        assert_eq!(j["buckets"][0]["key"], "tech");
        assert_eq!(j["buckets"][0]["doc_count"], 42);
    }

    #[test]
    fn empty_bucket_result_json() {
        let r = BucketResult {
            buckets: Vec::new(),
        };
        assert_eq!(r.to_json(), serde_json::json!({"buckets": []}));
    }

    #[test]
    fn nested_sub_agg_json() {
        let r = BucketResult {
            buckets: vec![Bucket {
                key: BucketKey::String("a".into()),
                doc_count: 10,
                sub_aggs: {
                    let mut m = HashMap::new();
                    m.insert(
                        "avg_price".into(),
                        AggregationResult::Metric(MetricResult::single(Some(25.0))),
                    );
                    m
                },
            }],
        };
        let j = r.to_json();
        assert_eq!(j["buckets"][0]["avg_price"]["value"], 25.0);
    }

    #[test]
    fn number_key_json() {
        let b = Bucket {
            key: BucketKey::Number(5.0),
            doc_count: 3,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert_eq!(j["key"], 5.0);
        assert_eq!(j["doc_count"], 3);
    }

    #[test]
    fn range_key_json() {
        let b = Bucket {
            key: BucketKey::Range {
                from: Some(0.0),
                to: Some(100.0),
            },
            doc_count: 5,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert_eq!(j["from"], 0.0);
        assert_eq!(j["to"], 100.0);
    }

    #[test]
    fn open_ended_range_omits_missing_bound() {
        let b = Bucket {
            key: BucketKey::Range {
                from: None,
                to: Some(10.0),
            },
            doc_count: 1,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert!(j.get("from").is_none());
        assert!(j.get("key").is_none());
        assert_eq!(j["to"], 10.0);
        assert_eq!(j["doc_count"], 1);
    }

    #[test]
    fn hits_result_json() {
        let r = AggregationResult::Hits(HitsResult {
            hits: vec![serde_json::json!({"_id": "1"})],
        });
        let j = r.to_json();
        assert_eq!(j["hits"]["hits"][0]["_id"], "1");
    }
}