1pub mod bucket;
6pub mod hll;
7pub mod metric;
8pub mod parser;
9pub mod tdigest;
10
11use crate::core::{DocId, LuciError, Result, ScoreMode};
12use crate::query::Query;
13use crate::query::ast::ScoringExpression;
14use crate::search::searcher::Searcher;
15use crate::segment::reader::SegmentReader;
16use std::collections::HashMap;
17
/// Outcome of evaluating a single aggregation.
///
/// Every variant wraps a result struct that knows how to render itself via
/// its own `to_json` method.
#[derive(Clone, Debug)]
pub enum AggregationResult {
    /// Result carried as a [`MetricResult`].
    Metric(MetricResult),
    /// Result carried as a [`BucketResult`] (a list of buckets).
    Bucket(BucketResult),
    /// Result carried as a [`HitsResult`] (a list of JSON hits).
    Hits(HitsResult),
}
27
/// Result of a metric aggregation.
#[derive(Clone, Debug)]
pub struct MetricResult {
    /// Primary value; rendered under the `"value"` key by `to_json`.
    pub value: Option<f64>,
    /// Additional named values; each entry is rendered as its own key next to
    /// `"value"` by `to_json`.
    pub extra: HashMap<String, f64>,
    /// Opaque bytes that are not rendered by `to_json`.
    /// NOTE(review): presumably serialized intermediate state used when
    /// merging partial results — confirm against the factories.
    pub merge_state: Option<Vec<u8>>,
}
38
39impl MetricResult {
40 pub fn single(value: Option<f64>) -> Self {
41 Self {
42 value,
43 extra: HashMap::new(),
44 merge_state: None,
45 }
46 }
47
48 pub fn stats(count: u64, min: f64, max: f64, avg: f64, sum: f64) -> Self {
49 let mut extra = HashMap::new();
50 extra.insert("count".into(), count as f64);
51 extra.insert("min".into(), min);
52 extra.insert("max".into(), max);
53 extra.insert("sum".into(), sum);
54 Self {
55 value: Some(avg),
56 extra,
57 merge_state: None,
58 }
59 }
60}
61
/// Result of a bucket aggregation: a list of buckets.
#[derive(Clone, Debug)]
pub struct BucketResult {
    /// Buckets, rendered by `to_json` in this order under the `"buckets"` key.
    pub buckets: Vec<Bucket>,
}
67
/// One bucket of a [`BucketResult`].
#[derive(Clone, Debug)]
pub struct Bucket {
    /// Identifies the bucket; see [`BucketKey`] for how each kind is rendered.
    pub key: BucketKey,
    /// Rendered under the `"doc_count"` key by `to_json`.
    pub doc_count: u64,
    /// Sub-aggregation results keyed by name; `to_json` writes each one
    /// directly into the bucket object under its name.
    pub sub_aggs: HashMap<String, AggregationResult>,
}
75
/// Key identifying a [`Bucket`].
#[derive(Clone, Debug)]
pub enum BucketKey {
    /// Text key; rendered as `"key": "<text>"` by `Bucket::to_json`.
    String(String),
    /// Numeric key; rendered as `"key": <number>` by `Bucket::to_json`.
    Number(f64),
    /// Range key; `Bucket::to_json` renders `"from"` / `"to"` and leaves out
    /// whichever bound is `None`. No `"key"` entry is written for this variant.
    /// NOTE(review): whether the bounds are inclusive or exclusive is not
    /// visible here — confirm against the range collector.
    Range { from: Option<f64>, to: Option<f64> },
}
83
/// Result holding a list of hits as raw JSON values.
#[derive(Clone, Debug)]
pub struct HitsResult {
    /// The hits; rendered by `to_json` as `{"hits": {"hits": [...]}}`.
    pub hits: Vec<serde_json::Value>,
}
89
90impl AggregationResult {
93 pub fn to_json(&self) -> serde_json::Value {
94 match self {
95 AggregationResult::Metric(m) => m.to_json(),
96 AggregationResult::Bucket(b) => b.to_json(),
97 AggregationResult::Hits(h) => h.to_json(),
98 }
99 }
100}
101
102impl HitsResult {
103 pub fn to_json(&self) -> serde_json::Value {
104 serde_json::json!({"hits": {"hits": self.hits}})
105 }
106}
107
108impl MetricResult {
109 pub fn to_json(&self) -> serde_json::Value {
110 if self.extra.is_empty() {
111 serde_json::json!({"value": self.value})
112 } else {
113 let mut obj = serde_json::Map::new();
114 if let Some(v) = self.value {
115 obj.insert("value".into(), serde_json::json!(v));
116 }
117 for (k, v) in &self.extra {
118 obj.insert(k.clone(), serde_json::json!(v));
119 }
120 serde_json::Value::Object(obj)
121 }
122 }
123}
124
125impl BucketResult {
126 pub fn to_json(&self) -> serde_json::Value {
127 let buckets: Vec<serde_json::Value> = self.buckets.iter().map(|b| b.to_json()).collect();
128 serde_json::json!({"buckets": buckets})
129 }
130}
131
132impl Bucket {
133 pub fn to_json(&self) -> serde_json::Value {
134 let mut obj = serde_json::Map::new();
135 match &self.key {
136 BucketKey::String(s) => {
137 obj.insert("key".into(), serde_json::json!(s));
138 }
139 BucketKey::Number(n) => {
140 obj.insert("key".into(), serde_json::json!(n));
141 }
142 BucketKey::Range { from, to } => {
143 if let Some(f) = from {
144 obj.insert("from".into(), serde_json::json!(f));
145 }
146 if let Some(t) = to {
147 obj.insert("to".into(), serde_json::json!(t));
148 }
149 }
150 }
151 obj.insert("doc_count".into(), serde_json::json!(self.doc_count));
152 for (name, result) in &self.sub_aggs {
153 obj.insert(name.clone(), result.to_json());
154 }
155 serde_json::Value::Object(obj)
156 }
157}
158
/// Produces [`Aggregator`] collectors and combines the results they yield.
///
/// Implementations must be `Send + Sync`.
pub trait AggregatorFactory: Send + Sync {
    /// Creates a fresh collector for the given segment reader.
    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator>;

    /// Combines several results into a single one.
    /// NOTE(review): presumably one input per collector/segment — confirm
    /// against the caller.
    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult;
}
169
170pub trait Aggregator: Send {
172 fn collect(&mut self, doc_id: DocId);
174
175 fn collect_range(&mut self, start: u32, end: u32) {
178 for i in start..end {
179 self.collect(DocId::new(i));
180 }
181 }
182
183 fn finish(self: Box<Self>) -> AggregationResult;
185}
186
/// Unbound description of an aggregation request.
///
/// A variant only carries the request parameters;
/// `AggregationExpression::bind` turns it into an [`AggregatorFactory`].
///
/// Several bucket variants carry `sub_aggs` (name, expression) pairs. At bind
/// time only `Terms`, `Filter`, `Nested` and `ReverseNested` bind them; the
/// other bucket variants reject a non-empty list with an invalid-query error.
#[derive(Clone, Debug)]
pub enum AggregationExpression {
    /// Metric over `field`, bound with `MetricType::Avg`.
    Avg {
        field: String,
    },
    /// Metric over `field`, bound with `MetricType::Sum`.
    Sum {
        field: String,
    },
    /// Metric over `field`, bound with `MetricType::Min`.
    Min {
        field: String,
    },
    /// Metric over `field`, bound with `MetricType::Max`.
    Max {
        field: String,
    },
    /// Metric over `field`, bound with `MetricType::ValueCount`.
    ValueCount {
        field: String,
    },
    /// Metric over `field`, bound with `MetricType::Stats`.
    Stats {
        field: String,
    },
    /// Metric over `field`, bound with `MetricType::ExtendedStats`.
    ExtendedStats {
        field: String,
    },
    /// Cardinality of `field`.
    ///
    /// At bind time `precision_threshold` is converted to a precision of
    /// `ceil(log2(precision_threshold * 1.5))`, clamped to `4..=18`.
    Cardinality {
        field: String,
        precision_threshold: u32,
    },
    /// Percentiles of `field`; `percents` and `compression` are passed
    /// unchanged to the percentiles factory.
    /// NOTE(review): presumably `percents` are in 0–100 and `compression` is
    /// the t-digest compression factor — confirm in the `tdigest` module.
    Percentiles {
        field: String,
        percents: Vec<f64>,
        compression: f64,
    },
    /// Bound to `GeoBoundsAggFactory` over `field`.
    GeoBounds {
        field: String,
    },
    /// Bound to `GeoCentroidAggFactory` over `field`.
    GeoCentroid {
        field: String,
    },
    /// Bound to `TopHitsAggFactory`.
    /// NOTE(review): `size` is presumably the maximum number of hits — confirm.
    TopHits {
        size: usize,
    },

    /// Bound to `TermsAggFactory`; sub-aggregations are bound as well.
    /// NOTE(review): `size` is presumably the maximum number of buckets — confirm.
    Terms {
        field: String,
        size: usize,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `RangeAggFactory`; non-empty `sub_aggs` are rejected at bind time.
    Range {
        field: String,
        ranges: Vec<RangeDef>,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `HistogramAggFactory`; non-empty `sub_aggs` are rejected at bind time.
    Histogram {
        field: String,
        interval: f64,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `DateHistogramAggFactory`; non-empty `sub_aggs` are rejected at bind time.
    DateHistogram {
        field: String,
        interval: DateInterval,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to the same `RangeAggFactory` as `Range`; non-empty `sub_aggs`
    /// are rejected at bind time.
    DateRange {
        field: String,
        ranges: Vec<RangeDef>,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `FilterAggFactory`; `query` is bound with
    /// `ScoreMode::CompleteNoScores` and sub-aggregations are bound as well.
    Filter {
        query: ScoringExpression,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `FiltersAggFactory`; each named filter query is bound with
    /// `ScoreMode::CompleteNoScores`. Non-empty `sub_aggs` are rejected at bind time.
    Filters {
        filters: Vec<(String, ScoringExpression)>,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `NestedAggFactory` for `path`; sub-aggregations are bound as well.
    Nested {
        path: String,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `ReverseNestedAggFactory`; sub-aggregations are bound as well.
    ReverseNested {
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
    /// Bound to `GeohashGridAggFactory`; non-empty `sub_aggs` are rejected at bind time.
    GeohashGrid {
        field: String,
        precision: usize,
        size: usize,
        sub_aggs: Vec<(String, AggregationExpression)>,
    },
}
287
288impl AggregationExpression {
289 pub(crate) fn bind(&self, searcher: &Searcher) -> Result<Box<dyn AggregatorFactory>> {
297 use crate::agg::bucket::{
298 DateHistogramAggFactory, FilterAggFactory, FiltersAggFactory, GeohashGridAggFactory,
299 HistogramAggFactory, NestedAggFactory, RangeAggFactory, ReverseNestedAggFactory,
300 TermsAggFactory, TopHitsAggFactory,
301 };
302 use crate::agg::metric::{
303 GeoBoundsAggFactory, GeoCentroidAggFactory, MetricAggFactory, MetricType,
304 };
305
306 let factory: Box<dyn AggregatorFactory> = match self {
307 Self::Avg { field } => Box::new(MetricAggFactory {
308 field_name: field.clone(),
309 metric_type: MetricType::Avg,
310 }),
311 Self::Sum { field } => Box::new(MetricAggFactory {
312 field_name: field.clone(),
313 metric_type: MetricType::Sum,
314 }),
315 Self::Min { field } => Box::new(MetricAggFactory {
316 field_name: field.clone(),
317 metric_type: MetricType::Min,
318 }),
319 Self::Max { field } => Box::new(MetricAggFactory {
320 field_name: field.clone(),
321 metric_type: MetricType::Max,
322 }),
323 Self::ValueCount { field } => Box::new(MetricAggFactory {
324 field_name: field.clone(),
325 metric_type: MetricType::ValueCount,
326 }),
327 Self::Stats { field } => Box::new(MetricAggFactory {
328 field_name: field.clone(),
329 metric_type: MetricType::Stats,
330 }),
331 Self::ExtendedStats { field } => Box::new(MetricAggFactory {
332 field_name: field.clone(),
333 metric_type: MetricType::ExtendedStats,
334 }),
335 Self::Cardinality {
336 field,
337 precision_threshold,
338 } => {
339 let p = ((*precision_threshold as f64 * 1.5).log2().ceil() as u8).clamp(4, 18);
340 Box::new(crate::agg::hll::CardinalityAggFactory {
341 field_name: field.clone(),
342 precision: p,
343 })
344 }
345 Self::Percentiles {
346 field,
347 percents,
348 compression,
349 } => Box::new(crate::agg::tdigest::PercentilesAggFactory {
350 field_name: field.clone(),
351 percents: percents.clone(),
352 compression: *compression,
353 }),
354 Self::GeoBounds { field } => Box::new(GeoBoundsAggFactory {
355 field_name: field.clone(),
356 }),
357 Self::GeoCentroid { field } => Box::new(GeoCentroidAggFactory {
358 field_name: field.clone(),
359 }),
360 Self::TopHits { size } => Box::new(TopHitsAggFactory { size: *size }),
361 Self::Terms {
362 field,
363 size,
364 sub_aggs,
365 } => Box::new(TermsAggFactory {
366 field_name: field.clone(),
367 size: *size,
368 sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
369 }),
370 Self::Range {
371 field,
372 ranges,
373 sub_aggs,
374 } => {
375 reject_sub_aggs(sub_aggs, "range")?;
376 Box::new(RangeAggFactory {
377 field_name: field.clone(),
378 ranges: ranges.clone(),
379 })
380 }
381 Self::Histogram {
382 field,
383 interval,
384 sub_aggs,
385 } => {
386 reject_sub_aggs(sub_aggs, "histogram")?;
387 Box::new(HistogramAggFactory {
388 field_name: field.clone(),
389 interval: *interval,
390 })
391 }
392 Self::DateHistogram {
393 field,
394 interval,
395 sub_aggs,
396 } => {
397 reject_sub_aggs(sub_aggs, "date_histogram")?;
398 Box::new(DateHistogramAggFactory {
399 field_name: field.clone(),
400 interval: interval.clone(),
401 })
402 }
403 Self::DateRange {
404 field,
405 ranges,
406 sub_aggs,
407 } => {
408 reject_sub_aggs(sub_aggs, "date_range")?;
409 Box::new(RangeAggFactory {
410 field_name: field.clone(),
411 ranges: ranges.clone(),
412 })
413 }
414 Self::Filter { query, sub_aggs } => Box::new(FilterAggFactory {
415 bound_query: query.bind(searcher, ScoreMode::CompleteNoScores)?,
416 sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
417 }),
418 Self::Filters { filters, sub_aggs } => {
419 reject_sub_aggs(sub_aggs, "filters")?;
420 let compiled = filters
421 .iter()
422 .map(|(name, ast)| {
423 Ok((
424 name.clone(),
425 ast.bind(searcher, ScoreMode::CompleteNoScores)?,
426 ))
427 })
428 .collect::<Result<Vec<_>>>()?;
429 Box::new(FiltersAggFactory { filters: compiled })
430 }
431 Self::Nested { path, sub_aggs } => Box::new(NestedAggFactory {
432 path: path.clone(),
433 sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
434 }),
435 Self::ReverseNested { sub_aggs } => Box::new(ReverseNestedAggFactory {
436 sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
437 }),
438 Self::GeohashGrid {
439 field,
440 precision,
441 size,
442 sub_aggs,
443 } => {
444 reject_sub_aggs(sub_aggs, "geohash_grid")?;
445 Box::new(GeohashGridAggFactory {
446 field_name: field.clone(),
447 precision: *precision,
448 size: *size,
449 })
450 }
451 };
452 Ok(factory)
453 }
454}
455
456fn reject_sub_aggs(sub_aggs: &[(String, AggregationExpression)], agg: &str) -> Result<()> {
464 if sub_aggs.is_empty() {
465 Ok(())
466 } else {
467 Err(LuciError::InvalidQuery(format!(
468 "sub-aggregations under a `{agg}` aggregation are not yet supported"
469 )))
470 }
471}
472
473fn bind_sub_aggs(
474 sub_aggs: &[(String, AggregationExpression)],
475 searcher: &Searcher,
476) -> Result<Vec<(String, Box<dyn AggregatorFactory>)>> {
477 sub_aggs
478 .iter()
479 .map(|(name, expr)| Ok((name.clone(), expr.bind(searcher)?)))
480 .collect()
481}
482
/// Interval specification for a date histogram.
#[derive(Clone, Debug)]
pub enum DateInterval {
    /// Fixed-length interval.
    /// NOTE(review): the unit of the `f64` (presumably milliseconds) is not
    /// visible here — confirm against the parser.
    Fixed(f64),
    /// Interval expressed as a [`CalendarInterval`] unit.
    Calendar(CalendarInterval),
}
491
/// Calendar units usable in [`DateInterval::Calendar`].
#[derive(Clone, Debug)]
pub enum CalendarInterval {
    Minute,
    Hour,
    Day,
    Week,
    Month,
    Quarter,
    Year,
}
503
/// One range of a `Range` / `DateRange` aggregation request.
#[derive(Clone, Debug)]
pub struct RangeDef {
    /// Optional caller-supplied key for the range.
    pub key: Option<String>,
    /// Lower bound; `None` when absent.
    /// NOTE(review): presumably `None` means unbounded and the bound is
    /// inclusive — confirm against the range collector.
    pub from: Option<f64>,
    /// Upper bound; `None` when absent.
    /// NOTE(review): presumably `None` means unbounded and the bound is
    /// exclusive — confirm against the range collector.
    pub to: Option<f64>,
}
511
#[cfg(test)]
mod tests {
    use super::*;

    /// A metric without extras renders as `{"value": <number>}`.
    #[test]
    fn metric_result_json() {
        // Deliberately not `3.14`: clippy's deny-by-default `approx_constant`
        // lint flags that literal as an approximation of PI.
        let r = MetricResult::single(Some(2.5));
        let j = r.to_json();
        assert_eq!(j["value"], 2.5);
    }

    /// A missing value renders as an explicit JSON `null`.
    #[test]
    fn metric_result_null() {
        let r = MetricResult::single(None);
        let j = r.to_json();
        assert!(j["value"].is_null());
    }

    /// Stats results expose the average as `value` plus every extra entry.
    #[test]
    fn stats_result_json() {
        let r = MetricResult::stats(10, 1.0, 100.0, 50.5, 505.0);
        let j = r.to_json();
        assert_eq!(j["value"], 50.5);
        assert_eq!(j["count"], 10.0);
        assert_eq!(j["min"], 1.0);
        assert_eq!(j["max"], 100.0);
        assert_eq!(j["sum"], 505.0);
    }

    /// Buckets render their key and document count.
    #[test]
    fn bucket_result_json() {
        let r = BucketResult {
            buckets: vec![Bucket {
                key: BucketKey::String("tech".into()),
                doc_count: 42,
                sub_aggs: HashMap::new(),
            }],
        };
        let j = r.to_json();
        assert_eq!(j["buckets"][0]["key"], "tech");
        assert_eq!(j["buckets"][0]["doc_count"], 42);
    }

    /// Sub-aggregation results are written into the bucket under their name.
    #[test]
    fn nested_sub_agg_json() {
        let r = BucketResult {
            buckets: vec![Bucket {
                key: BucketKey::String("a".into()),
                doc_count: 10,
                sub_aggs: {
                    let mut m = HashMap::new();
                    m.insert(
                        "avg_price".into(),
                        AggregationResult::Metric(MetricResult::single(Some(25.0))),
                    );
                    m
                },
            }],
        };
        let j = r.to_json();
        assert_eq!(j["buckets"][0]["avg_price"]["value"], 25.0);
    }

    /// Range keys render `from` / `to` and no `key` entry.
    #[test]
    fn range_key_json() {
        let b = Bucket {
            key: BucketKey::Range {
                from: Some(0.0),
                to: Some(100.0),
            },
            doc_count: 5,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert_eq!(j["from"], 0.0);
        assert_eq!(j["to"], 100.0);
        assert_eq!(j["doc_count"], 5);
        assert!(j.get("key").is_none());
    }

    /// A bound that is `None` is left out of the output entirely.
    #[test]
    fn range_key_json_open_ended() {
        let b = Bucket {
            key: BucketKey::Range {
                from: None,
                to: Some(10.0),
            },
            doc_count: 1,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert!(j.get("from").is_none());
        assert_eq!(j["to"], 10.0);
    }
}