pub mod bucket;
pub mod hll;
pub mod metric;
pub mod parser;
pub mod tdigest;
use crate::core::{DocId, LuciError, Result, ScoreMode};
use crate::query::Query;
use crate::query::ast::ScoringExpression;
use crate::search::searcher::Searcher;
use crate::segment::reader::SegmentReader;
use std::collections::HashMap;
/// Output of one aggregation, as returned by [`Aggregator::finish`] and
/// [`AggregatorFactory::merge_results`]; rendered to JSON by `AggregationResult::to_json`.
#[derive(Clone, Debug)]
pub enum AggregationResult {
/// A metric result: an optional primary value plus optional named extra values.
Metric(MetricResult),
/// A bucketing result: an ordered list of buckets.
Bucket(BucketResult),
/// Hit documents already in JSON form.
Hits(HitsResult),
}
/// Result of a metric aggregation.
///
/// `value` is the primary number (for `stats` it is the average), `extra` holds any
/// additional named numbers, and `merge_state` carries optional opaque bytes.
#[derive(Clone, Debug)]
pub struct MetricResult {
    pub value: Option<f64>,
    pub extra: HashMap<String, f64>,
    pub merge_state: Option<Vec<u8>>,
}
impl MetricResult {
    /// Builds a result that carries only a primary value: no extras, no merge state.
    pub fn single(value: Option<f64>) -> Self {
        let no_extras = HashMap::new();
        Self {
            value,
            extra: no_extras,
            merge_state: None,
        }
    }
    /// Builds a `stats` result.
    ///
    /// `avg` becomes the primary value; `count` (converted to `f64`), `min`, `max`
    /// and `sum` are stored in `extra` under those names.
    pub fn stats(count: u64, min: f64, max: f64, avg: f64, sum: f64) -> Self {
        let named = [
            ("count", count as f64),
            ("min", min),
            ("max", max),
            ("sum", sum),
        ];
        let extra: HashMap<String, f64> = named
            .iter()
            .map(|&(label, number)| (label.to_string(), number))
            .collect();
        Self {
            value: Some(avg),
            extra,
            merge_state: None,
        }
    }
}
/// Result of a bucketing aggregation.
#[derive(Clone, Debug)]
pub struct BucketResult {
/// Buckets in output order; serialized as the `"buckets"` JSON array.
pub buckets: Vec<Bucket>,
}
/// A single bucket produced by a bucketing aggregation.
#[derive(Clone, Debug)]
pub struct Bucket {
/// Identifies the bucket; serialized as `"key"`, or as `"from"`/`"to"` for range keys.
pub key: BucketKey,
/// Document count reported for this bucket (serialized as `"doc_count"`).
pub doc_count: u64,
/// Child aggregation results for this bucket, keyed by the child aggregation's name.
pub sub_aggs: HashMap<String, AggregationResult>,
}
/// Key identifying a bucket.
#[derive(Clone, Debug)]
pub enum BucketKey {
/// String key, serialized as `"key"`.
String(String),
/// Numeric key, serialized as `"key"`.
Number(f64),
/// Range key; each bound is serialized as `"from"` / `"to"` only when present.
/// Bound inclusivity is decided by the producing aggregation (not visible here).
Range { from: Option<f64>, to: Option<f64> },
}
/// Hit documents returned by an aggregation, serialized as `{"hits": {"hits": [...]}}`.
#[derive(Clone, Debug)]
pub struct HitsResult {
/// Hit documents, already in JSON form.
pub hits: Vec<serde_json::Value>,
}
impl AggregationResult {
    /// Renders this result as JSON by dispatching to the wrapped result's `to_json`.
    pub fn to_json(&self) -> serde_json::Value {
        match self {
            Self::Metric(metric) => metric.to_json(),
            Self::Bucket(bucketed) => bucketed.to_json(),
            Self::Hits(hit_docs) => hit_docs.to_json(),
        }
    }
}
impl HitsResult {
    /// Renders the hits as `{"hits": {"hits": [ ... ]}}`, cloning each stored document.
    pub fn to_json(&self) -> serde_json::Value {
        let hit_list = serde_json::Value::Array(self.hits.clone());
        serde_json::json!({ "hits": { "hits": hit_list } })
    }
}
impl MetricResult {
    /// Renders the metric as a JSON object.
    ///
    /// With no extras the output is always `{"value": <number or null>}`. Otherwise
    /// `"value"` is written only when it is `Some`, and then every entry of `extra`
    /// is added. Because extras are added last, an extra named `"value"` replaces the
    /// primary value.
    pub fn to_json(&self) -> serde_json::Value {
        if self.extra.is_empty() {
            return serde_json::json!({ "value": self.value });
        }
        let mut fields = serde_json::Map::new();
        if let Some(primary) = self.value {
            fields.insert("value".to_string(), serde_json::json!(primary));
        }
        fields.extend(
            self.extra
                .iter()
                .map(|(name, number)| (name.clone(), serde_json::json!(number))),
        );
        serde_json::Value::Object(fields)
    }
}
impl BucketResult {
    /// Renders as `{"buckets": [...]}`, with one JSON object per bucket in stored order.
    pub fn to_json(&self) -> serde_json::Value {
        let mut rendered = Vec::with_capacity(self.buckets.len());
        for bucket in &self.buckets {
            rendered.push(bucket.to_json());
        }
        serde_json::json!({ "buckets": serde_json::Value::Array(rendered) })
    }
}
impl Bucket {
    /// Renders the bucket as a flat JSON object.
    ///
    /// String and number keys are written as `"key"`. Range keys are written as
    /// `"from"` / `"to"`, each only when that bound is present. `"doc_count"` is
    /// always written. Each sub-aggregation result is then added under its own name.
    ///
    /// Sub-aggregation names are user-chosen, so one of them may collide with a
    /// field the bucket itself has already written (e.g. a sub-aggregation named
    /// `"doc_count"`). The bucket's own fields win in that case and the colliding
    /// sub-aggregation is not emitted. This prevents the bucket key or document
    /// count from being silently overwritten.
    pub fn to_json(&self) -> serde_json::Value {
        let mut obj = serde_json::Map::new();
        match &self.key {
            BucketKey::String(s) => {
                obj.insert("key".into(), serde_json::json!(s));
            }
            BucketKey::Number(n) => {
                obj.insert("key".into(), serde_json::json!(n));
            }
            BucketKey::Range { from, to } => {
                if let Some(f) = from {
                    obj.insert("from".into(), serde_json::json!(f));
                }
                if let Some(t) = to {
                    obj.insert("to".into(), serde_json::json!(t));
                }
            }
        }
        obj.insert("doc_count".into(), serde_json::json!(self.doc_count));
        for (name, result) in &self.sub_aggs {
            // Only fill free slots: the bucket's own fields written above stay
            // authoritative, and existing insertion order is preserved.
            obj.entry(name.clone())
                .or_insert_with(|| result.to_json());
        }
        serde_json::Value::Object(obj)
    }
}
/// Factory produced by `AggregationExpression::bind`; creates per-segment collectors
/// and merges their results.
///
/// Requires `Send + Sync`, presumably so one factory can be shared by concurrent
/// segment workers — TODO confirm against the searcher.
pub trait AggregatorFactory: Send + Sync {
/// Creates a fresh collector for the segment behind `reader`.
fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator>;
/// Combines partial results (presumably one per collector) into a single result.
fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult;
}
/// Collector that accumulates aggregation state over documents and then yields a result.
pub trait Aggregator: Send {
    /// Records one document.
    fn collect(&mut self, doc_id: DocId);
    /// Records every document id in the half-open range `start..end`.
    ///
    /// The default forwards each id to [`Aggregator::collect`] in ascending order and
    /// does nothing when `start >= end`; implementors can override it.
    fn collect_range(&mut self, start: u32, end: u32) {
        (start..end).for_each(|raw_id| self.collect(DocId::new(raw_id)));
    }
    /// Consumes the collector and returns its result.
    fn finish(self: Box<Self>) -> AggregationResult;
}
/// Parsed, not-yet-bound aggregation request.
///
/// `AggregationExpression::bind` turns each variant into an [`AggregatorFactory`].
/// `sub_aggs` fields hold named child aggregations. Of the variants that have them,
/// only `Terms`, `Filter`, `Nested` and `ReverseNested` bind their children. The
/// others are rejected at bind time when `sub_aggs` is non-empty.
#[derive(Clone, Debug)]
pub enum AggregationExpression {
/// Average of `field` (bound as `MetricType::Avg`).
Avg {
field: String,
},
/// Sum of `field` (bound as `MetricType::Sum`).
Sum {
field: String,
},
/// Minimum of `field` (bound as `MetricType::Min`).
Min {
field: String,
},
/// Maximum of `field` (bound as `MetricType::Max`).
Max {
field: String,
},
/// Value count of `field` (bound as `MetricType::ValueCount`).
ValueCount {
field: String,
},
/// Basic statistics of `field` (bound as `MetricType::Stats`).
Stats {
field: String,
},
/// Extended statistics of `field` (bound as `MetricType::ExtendedStats`).
ExtendedStats {
field: String,
},
/// Distinct-value estimate of `field` via the `hll` module. At bind time
/// `precision_threshold` is mapped to `ceil(log2(1.5 * precision_threshold))`,
/// clamped to `4..=18`.
Cardinality {
field: String,
precision_threshold: u32,
},
/// Percentiles of `field` via the `tdigest` module; `percents` and `compression`
/// are passed through unchanged.
Percentiles {
field: String,
percents: Vec<f64>,
compression: f64,
},
/// Geographic bounds of `field`.
GeoBounds {
field: String,
},
/// Geographic centroid of `field`.
GeoCentroid {
field: String,
},
/// Top `size` hits.
TopHits {
size: usize,
},
/// Term buckets on `field`, limited by `size`; binds its sub-aggregations.
Terms {
field: String,
size: usize,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// One bucket per [`RangeDef`] on `field`; sub-aggregations not yet supported.
Range {
field: String,
ranges: Vec<RangeDef>,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Buckets of width `interval` on `field`; sub-aggregations not yet supported.
Histogram {
field: String,
interval: f64,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Date buckets on `field` with a fixed or calendar `interval`; sub-aggregations
/// not yet supported.
DateHistogram {
field: String,
interval: DateInterval,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Date ranges on `field`; currently bound to the same `RangeAggFactory` as
/// `Range`. Sub-aggregations not yet supported.
DateRange {
field: String,
ranges: Vec<RangeDef>,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Single bucket for documents matching `query` (bound with
/// `ScoreMode::CompleteNoScores`); binds its sub-aggregations.
Filter {
query: ScoringExpression,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// One named bucket per `(name, query)` pair; sub-aggregations not yet supported.
Filters {
filters: Vec<(String, ScoringExpression)>,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Aggregates at nested `path` (presumably nested documents); binds its
/// sub-aggregations.
Nested {
path: String,
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Counterpart of `Nested` (presumably returns to parent documents); binds its
/// sub-aggregations.
ReverseNested {
sub_aggs: Vec<(String, AggregationExpression)>,
},
/// Geohash-cell buckets on `field` at `precision`, with a `size` limit;
/// sub-aggregations not yet supported.
GeohashGrid {
field: String,
precision: usize,
size: usize,
sub_aggs: Vec<(String, AggregationExpression)>,
},
}
impl AggregationExpression {
pub(crate) fn bind(&self, searcher: &Searcher) -> Result<Box<dyn AggregatorFactory>> {
use crate::agg::bucket::{
DateHistogramAggFactory, FilterAggFactory, FiltersAggFactory, GeohashGridAggFactory,
HistogramAggFactory, NestedAggFactory, RangeAggFactory, ReverseNestedAggFactory,
TermsAggFactory, TopHitsAggFactory,
};
use crate::agg::metric::{
GeoBoundsAggFactory, GeoCentroidAggFactory, MetricAggFactory, MetricType,
};
let factory: Box<dyn AggregatorFactory> = match self {
Self::Avg { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::Avg,
}),
Self::Sum { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::Sum,
}),
Self::Min { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::Min,
}),
Self::Max { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::Max,
}),
Self::ValueCount { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::ValueCount,
}),
Self::Stats { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::Stats,
}),
Self::ExtendedStats { field } => Box::new(MetricAggFactory {
field_name: field.clone(),
metric_type: MetricType::ExtendedStats,
}),
Self::Cardinality {
field,
precision_threshold,
} => {
let p = ((*precision_threshold as f64 * 1.5).log2().ceil() as u8).clamp(4, 18);
Box::new(crate::agg::hll::CardinalityAggFactory {
field_name: field.clone(),
precision: p,
})
}
Self::Percentiles {
field,
percents,
compression,
} => Box::new(crate::agg::tdigest::PercentilesAggFactory {
field_name: field.clone(),
percents: percents.clone(),
compression: *compression,
}),
Self::GeoBounds { field } => Box::new(GeoBoundsAggFactory {
field_name: field.clone(),
}),
Self::GeoCentroid { field } => Box::new(GeoCentroidAggFactory {
field_name: field.clone(),
}),
Self::TopHits { size } => Box::new(TopHitsAggFactory { size: *size }),
Self::Terms {
field,
size,
sub_aggs,
} => Box::new(TermsAggFactory {
field_name: field.clone(),
size: *size,
sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
}),
Self::Range {
field,
ranges,
sub_aggs,
} => {
reject_sub_aggs(sub_aggs, "range")?;
Box::new(RangeAggFactory {
field_name: field.clone(),
ranges: ranges.clone(),
})
}
Self::Histogram {
field,
interval,
sub_aggs,
} => {
reject_sub_aggs(sub_aggs, "histogram")?;
Box::new(HistogramAggFactory {
field_name: field.clone(),
interval: *interval,
})
}
Self::DateHistogram {
field,
interval,
sub_aggs,
} => {
reject_sub_aggs(sub_aggs, "date_histogram")?;
Box::new(DateHistogramAggFactory {
field_name: field.clone(),
interval: interval.clone(),
})
}
Self::DateRange {
field,
ranges,
sub_aggs,
} => {
reject_sub_aggs(sub_aggs, "date_range")?;
Box::new(RangeAggFactory {
field_name: field.clone(),
ranges: ranges.clone(),
})
}
Self::Filter { query, sub_aggs } => Box::new(FilterAggFactory {
bound_query: query.bind(searcher, ScoreMode::CompleteNoScores)?,
sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
}),
Self::Filters { filters, sub_aggs } => {
reject_sub_aggs(sub_aggs, "filters")?;
let compiled = filters
.iter()
.map(|(name, ast)| {
Ok((
name.clone(),
ast.bind(searcher, ScoreMode::CompleteNoScores)?,
))
})
.collect::<Result<Vec<_>>>()?;
Box::new(FiltersAggFactory { filters: compiled })
}
Self::Nested { path, sub_aggs } => Box::new(NestedAggFactory {
path: path.clone(),
sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
}),
Self::ReverseNested { sub_aggs } => Box::new(ReverseNestedAggFactory {
sub_agg_factories: bind_sub_aggs(sub_aggs, searcher)?,
}),
Self::GeohashGrid {
field,
precision,
size,
sub_aggs,
} => {
reject_sub_aggs(sub_aggs, "geohash_grid")?;
Box::new(GeohashGridAggFactory {
field_name: field.clone(),
precision: *precision,
size: *size,
})
}
};
Ok(factory)
}
}
/// Returns `LuciError::InvalidQuery` when `sub_aggs` is non-empty, naming the parent
/// aggregation `agg` in the message. Used by aggregations that cannot yet run
/// child aggregations; an empty list is accepted.
fn reject_sub_aggs(sub_aggs: &[(String, AggregationExpression)], agg: &str) -> Result<()> {
    if !sub_aggs.is_empty() {
        let message =
            format!("sub-aggregations under a `{agg}` aggregation are not yet supported");
        return Err(LuciError::InvalidQuery(message));
    }
    Ok(())
}
/// Binds each named child aggregation in order and pairs it with its name.
///
/// Stops at, and returns, the first binding error.
fn bind_sub_aggs(
    sub_aggs: &[(String, AggregationExpression)],
    searcher: &Searcher,
) -> Result<Vec<(String, Box<dyn AggregatorFactory>)>> {
    let mut bound = Vec::with_capacity(sub_aggs.len());
    for (child_name, child_expr) in sub_aggs {
        let child_factory = child_expr.bind(searcher)?;
        bound.push((child_name.clone(), child_factory));
    }
    Ok(bound)
}
/// Bucket width for a `date_histogram` aggregation.
#[derive(Clone, Debug)]
pub enum DateInterval {
/// Fixed width. The unit is defined by the date-histogram implementation, which is
/// not visible here (presumably milliseconds — confirm).
Fixed(f64),
/// A calendar unit; see [`CalendarInterval`].
Calendar(CalendarInterval),
}
/// Calendar units accepted by [`DateInterval::Calendar`].
#[derive(Clone, Debug)]
pub enum CalendarInterval {
/// One minute.
Minute,
/// One hour.
Hour,
/// One day.
Day,
/// One week.
Week,
/// One month.
Month,
/// One quarter.
Quarter,
/// One year.
Year,
}
/// One requested range for a `range` or `date_range` aggregation.
#[derive(Clone, Debug)]
pub struct RangeDef {
/// Optional user-supplied label for this range.
pub key: Option<String>,
/// Lower bound; `None` presumably leaves the range open below (inclusivity is
/// decided by the range aggregator — confirm).
pub from: Option<f64>,
/// Upper bound; `None` presumably leaves the range open above (inclusivity is
/// decided by the range aggregator — confirm).
pub to: Option<f64>,
}
#[cfg(test)]
mod tests {
    use super::*;

    // Literals such as `3.14` must be avoided here: clippy's deny-by-default
    // `approx_constant` lint treats them as a hand-rolled PI and fails the build.
    #[test]
    fn metric_result_json() {
        let r = MetricResult::single(Some(2.5));
        let j = r.to_json();
        assert_eq!(j["value"], 2.5);
    }

    #[test]
    fn metric_result_null() {
        let r = MetricResult::single(None);
        let j = r.to_json();
        assert!(j["value"].is_null());
    }

    #[test]
    fn stats_result_json() {
        let r = MetricResult::stats(10, 1.0, 100.0, 50.5, 505.0);
        let j = r.to_json();
        assert_eq!(j["value"], 50.5);
        assert_eq!(j["count"], 10.0);
        assert_eq!(j["min"], 1.0);
    }

    #[test]
    fn metric_extras_without_value_omit_value_key() {
        let mut extra = HashMap::new();
        extra.insert("50.0".to_string(), 7.0);
        let r = MetricResult {
            value: None,
            extra,
            merge_state: None,
        };
        let j = r.to_json();
        assert!(j.get("value").is_none());
        assert_eq!(j["50.0"], 7.0);
    }

    #[test]
    fn bucket_result_json() {
        let r = BucketResult {
            buckets: vec![Bucket {
                key: BucketKey::String("tech".into()),
                doc_count: 42,
                sub_aggs: HashMap::new(),
            }],
        };
        let j = r.to_json();
        assert_eq!(j["buckets"][0]["key"], "tech");
        assert_eq!(j["buckets"][0]["doc_count"], 42);
    }

    #[test]
    fn nested_sub_agg_json() {
        let mut sub_aggs = HashMap::new();
        sub_aggs.insert(
            "avg_price".into(),
            AggregationResult::Metric(MetricResult::single(Some(25.0))),
        );
        let r = BucketResult {
            buckets: vec![Bucket {
                key: BucketKey::String("a".into()),
                doc_count: 10,
                sub_aggs,
            }],
        };
        let j = r.to_json();
        assert_eq!(j["buckets"][0]["avg_price"]["value"], 25.0);
    }

    #[test]
    fn range_key_json() {
        let b = Bucket {
            key: BucketKey::Range {
                from: Some(0.0),
                to: Some(100.0),
            },
            doc_count: 5,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert_eq!(j["from"], 0.0);
        assert_eq!(j["to"], 100.0);
    }

    #[test]
    fn open_ended_range_key_json() {
        let b = Bucket {
            key: BucketKey::Range {
                from: None,
                to: Some(10.0),
            },
            doc_count: 3,
            sub_aggs: HashMap::new(),
        };
        let j = b.to_json();
        assert!(j.get("from").is_none());
        assert_eq!(j["to"], 10.0);
        assert_eq!(j["doc_count"], 3);
    }

    #[test]
    fn hits_result_json() {
        let r = HitsResult {
            hits: vec![serde_json::json!({"_id": "1"})],
        };
        let j = r.to_json();
        assert_eq!(j["hits"]["hits"][0]["_id"], "1");
    }
}