Skip to main content

luci/agg/
bucket.rs

1//! Bucket aggregation implementations: terms, range, histogram, filter.
2//!
3//! See [[aggregations]] and [[feature-aggregations-v010#Step 6]].
4
5use crate::core::DocId;
6use std::collections::HashMap;
7
8use super::*;
9use crate::segment::reader::SegmentReader;
10
11/// Re-export so existing `super::bucket::OwnedColumn` paths in sibling agg
12/// modules (`hll.rs`, `metric.rs`, `tdigest.rs`) keep compiling. Promoted to
13/// `crate::columnar::owned` in May 2026 — see
14/// [[optimize-hit-id-column-reader-cache]] §"Lifetime resolution — extend the
15/// existing `OwnedColumn`".
16pub(super) use crate::columnar::owned::OwnedColumn;
17
18// --- Terms aggregation ---
19
20pub struct TermsAggFactory {
21    pub field_name: String,
22    pub size: usize,
23    pub sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
24}
25
26impl AggregatorFactory for TermsAggFactory {
27    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
28        let field_id = reader
29            .header()
30            .fields
31            .iter()
32            .find(|f| f.field_name == self.field_name)
33            .map(|f| f.field_id);
34
35        let col = OwnedColumn::new(field_id, reader);
36        let dict_size = col.as_ref().map_or(0, |c| c.dict_size());
37
38        Box::new(TermsCollector {
39            col,
40            segment_data: reader as *const SegmentReader,
41            ordinal_buckets: Vec::with_capacity(dict_size),
42            sub_agg_factories: self.sub_agg_factories.as_slice()
43                as *const [(String, Box<dyn AggregatorFactory>)],
44        })
45    }
46
47    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
48        // Merge: combine bucket counts and sub-agg results across segments
49        let mut merged: HashMap<String, (u64, HashMap<String, Vec<AggregationResult>>)> =
50            HashMap::new();
51
52        for r in results {
53            if let AggregationResult::Bucket(br) = r {
54                for bucket in br.buckets {
55                    if let BucketKey::String(key) = bucket.key {
56                        let entry = merged.entry(key).or_insert_with(|| (0, HashMap::new()));
57                        entry.0 += bucket.doc_count;
58                        for (name, result) in bucket.sub_aggs {
59                            entry.1.entry(name).or_default().push(result);
60                        }
61                    }
62                }
63            }
64        }
65
66        // Sort by count descending, take top N
67        let mut bucket_entries: Vec<_> = merged.into_iter().collect();
68        bucket_entries.sort_by(|a, b| b.1.0.cmp(&a.1.0));
69        bucket_entries.truncate(self.size);
70
71        let result_buckets = bucket_entries
72            .into_iter()
73            .map(|(key, (doc_count, sub_partials))| {
74                // Merge sub-agg results
75                let mut sub_aggs = HashMap::new();
76                for (_i, (name, factory)) in self.sub_agg_factories.iter().enumerate() {
77                    if let Some(partials) = sub_partials.get(name) {
78                        sub_aggs.insert(name.clone(), factory.merge_results(partials.clone()));
79                    }
80                }
81                Bucket {
82                    key: BucketKey::String(key),
83                    doc_count,
84                    sub_aggs,
85                }
86            })
87            .collect();
88
89        AggregationResult::Bucket(BucketResult {
90            buckets: result_buckets,
91        })
92    }
93}
94
95/// Per-bucket state: count + sub-agg collectors.
96struct BucketState {
97    count: u64,
98    sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
99}
100
101/// Ordinal-indexed terms collector with lazy column access.
102/// See [[strategy-lazy-materialization]].
103struct TermsCollector {
104    col: Option<OwnedColumn>,
105    segment_data: *const SegmentReader,
106    /// Vec indexed by ordinal. None = ordinal not yet seen.
107    ordinal_buckets: Vec<Option<BucketState>>,
108    sub_agg_factories: *const [(String, Box<dyn AggregatorFactory>)],
109}
110
111unsafe impl Send for TermsCollector {}
112
113impl Aggregator for TermsCollector {
114    fn collect(&mut self, doc_id: DocId) {
115        let Some(ord) = self
116            .col
117            .as_ref()
118            .and_then(|c| c.keyword_ordinal(doc_id.as_u32()))
119        else {
120            return;
121        };
122        let ordinal = ord as usize;
123
124        // Ensure Vec is large enough
125        if ordinal >= self.ordinal_buckets.len() {
126            self.ordinal_buckets.resize_with(ordinal + 1, || None);
127        }
128
129        let reader = unsafe { &*self.segment_data };
130        let sub_factories = unsafe { &*self.sub_agg_factories };
131        let state = self.ordinal_buckets[ordinal].get_or_insert_with(|| {
132            let subs = sub_factories
133                .iter()
134                .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
135                .collect();
136            BucketState {
137                count: 0,
138                sub_collectors: subs,
139            }
140        });
141        state.count += 1;
142        for (_, collector) in &mut state.sub_collectors {
143            collector.collect(doc_id);
144        }
145    }
146
147    fn finish(self: Box<Self>) -> AggregationResult {
148        // Bulk consumer: materialize the keyword dict once so the per-ordinal
149        // resolve below indexes O(1) instead of block-seeking O(DICT_BLOCK_SIZE)
150        // each (no-op unless this is a blocked keyword column).
151        if let Some(col) = self.col.as_ref() {
152            col.ensure_dict();
153        }
154        let mut buckets: Vec<Bucket> = self
155            .ordinal_buckets
156            .into_iter()
157            .enumerate()
158            .filter_map(|(ordinal, state)| {
159                let state = state?;
160                let key = self
161                    .col
162                    .as_ref()
163                    .and_then(|c| c.ordinal_to_string(ordinal as u32))
164                    .unwrap_or("?")
165                    .to_string();
166                let sub_aggs: HashMap<String, AggregationResult> = state
167                    .sub_collectors
168                    .into_iter()
169                    .map(|(name, collector)| (name, collector.finish()))
170                    .collect();
171                Some(Bucket {
172                    key: BucketKey::String(key),
173                    doc_count: state.count,
174                    sub_aggs,
175                })
176            })
177            .collect();
178
179        buckets.sort_by(|a, b| b.doc_count.cmp(&a.doc_count));
180        AggregationResult::Bucket(BucketResult { buckets })
181    }
182}
183
184// --- Range aggregation ---
185
186pub struct RangeAggFactory {
187    pub field_name: String,
188    pub ranges: Vec<RangeDef>,
189}
190
191impl AggregatorFactory for RangeAggFactory {
192    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
193        let field_id = reader
194            .header()
195            .fields
196            .iter()
197            .find(|f| f.field_name == self.field_name)
198            .map(|f| f.field_id);
199
200        let col = OwnedColumn::new(field_id, reader);
201
202        Box::new(RangeCollector {
203            col,
204            ranges: self.ranges.clone(),
205            counts: vec![0u64; self.ranges.len()],
206        })
207    }
208
209    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
210        let mut merged_counts = vec![0u64; self.ranges.len()];
211
212        for r in results {
213            if let AggregationResult::Bucket(br) = r {
214                for (i, bucket) in br.buckets.iter().enumerate() {
215                    if i < merged_counts.len() {
216                        merged_counts[i] += bucket.doc_count;
217                    }
218                }
219            }
220        }
221
222        let buckets = self
223            .ranges
224            .iter()
225            .zip(merged_counts.iter())
226            .map(|(range, &count)| Bucket {
227                key: BucketKey::Range {
228                    from: range.from,
229                    to: range.to,
230                },
231                doc_count: count,
232                sub_aggs: HashMap::new(),
233            })
234            .collect();
235
236        AggregationResult::Bucket(BucketResult { buckets })
237    }
238}
239
240struct RangeCollector {
241    col: Option<OwnedColumn>,
242    ranges: Vec<RangeDef>,
243    counts: Vec<u64>,
244}
245
246unsafe impl Send for RangeCollector {}
247
248impl Aggregator for RangeCollector {
249    fn collect(&mut self, doc_id: DocId) {
250        let Some(v) = self
251            .col
252            .as_ref()
253            .and_then(|c| c.numeric_value(doc_id.as_u32()))
254        else {
255            return;
256        };
257
258        for (i, range) in self.ranges.iter().enumerate() {
259            let above_from = range.from.map_or(true, |f| v >= f);
260            let below_to = range.to.map_or(true, |t| v < t);
261            if above_from && below_to {
262                self.counts[i] += 1;
263            }
264        }
265    }
266
267    fn finish(self: Box<Self>) -> AggregationResult {
268        let buckets = self
269            .ranges
270            .iter()
271            .zip(self.counts.iter())
272            .map(|(range, &count)| Bucket {
273                key: BucketKey::Range {
274                    from: range.from,
275                    to: range.to,
276                },
277                doc_count: count,
278                sub_aggs: HashMap::new(),
279            })
280            .collect();
281        AggregationResult::Bucket(BucketResult { buckets })
282    }
283}
284
285// --- Histogram aggregation ---
286
287pub struct HistogramAggFactory {
288    pub field_name: String,
289    pub interval: f64,
290}
291
292impl AggregatorFactory for HistogramAggFactory {
293    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
294        let field_id = reader
295            .header()
296            .fields
297            .iter()
298            .find(|f| f.field_name == self.field_name)
299            .map(|f| f.field_id);
300
301        let col = OwnedColumn::new(field_id, reader);
302
303        Box::new(HistogramCollector {
304            col,
305            interval: self.interval,
306            buckets: HashMap::new(),
307        })
308    }
309
310    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
311        let mut merged: HashMap<i64, u64> = HashMap::new();
312
313        for r in results {
314            if let AggregationResult::Bucket(br) = r {
315                for bucket in br.buckets {
316                    if let BucketKey::Number(key) = bucket.key {
317                        *merged.entry(key as i64).or_insert(0) += bucket.doc_count;
318                    }
319                }
320            }
321        }
322
323        let mut buckets: Vec<Bucket> = merged
324            .into_iter()
325            .map(|(key, count)| Bucket {
326                key: BucketKey::Number(key as f64),
327                doc_count: count,
328                sub_aggs: HashMap::new(),
329            })
330            .collect();
331
332        buckets.sort_by(|a, b| {
333            let ka = if let BucketKey::Number(n) = a.key {
334                n
335            } else {
336                0.0
337            };
338            let kb = if let BucketKey::Number(n) = b.key {
339                n
340            } else {
341                0.0
342            };
343            ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
344        });
345
346        AggregationResult::Bucket(BucketResult { buckets })
347    }
348}
349
350struct HistogramCollector {
351    col: Option<OwnedColumn>,
352    interval: f64,
353    buckets: HashMap<i64, u64>,
354}
355
356unsafe impl Send for HistogramCollector {}
357
358impl Aggregator for HistogramCollector {
359    fn collect(&mut self, doc_id: DocId) {
360        let Some(v) = self
361            .col
362            .as_ref()
363            .and_then(|c| c.numeric_value(doc_id.as_u32()))
364        else {
365            return;
366        };
367
368        let bucket_key = (v / self.interval).floor() as i64 * self.interval as i64;
369        *self.buckets.entry(bucket_key).or_insert(0) += 1;
370    }
371
372    fn finish(self: Box<Self>) -> AggregationResult {
373        let mut buckets: Vec<Bucket> = self
374            .buckets
375            .into_iter()
376            .map(|(key, count)| Bucket {
377                key: BucketKey::Number(key as f64),
378                doc_count: count,
379                sub_aggs: HashMap::new(),
380            })
381            .collect();
382
383        buckets.sort_by(|a, b| {
384            let ka = if let BucketKey::Number(n) = a.key {
385                n
386            } else {
387                0.0
388            };
389            let kb = if let BucketKey::Number(n) = b.key {
390                n
391            } else {
392                0.0
393            };
394            ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
395        });
396
397        AggregationResult::Bucket(BucketResult { buckets })
398    }
399}
400
401// --- Date histogram aggregation ---
402
403pub struct DateHistogramAggFactory {
404    pub field_name: String,
405    pub interval: DateInterval,
406}
407
408impl AggregatorFactory for DateHistogramAggFactory {
409    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
410        let field_id = reader
411            .header()
412            .fields
413            .iter()
414            .find(|f| f.field_name == self.field_name)
415            .map(|f| f.field_id);
416        let col = OwnedColumn::new(field_id, reader);
417
418        Box::new(DateHistogramCollector {
419            col,
420            interval: self.interval.clone(),
421            buckets: HashMap::new(),
422        })
423    }
424
425    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
426        let mut merged: HashMap<i64, u64> = HashMap::new();
427        for r in results {
428            if let AggregationResult::Bucket(br) = r {
429                for bucket in br.buckets {
430                    if let BucketKey::Number(key) = bucket.key {
431                        *merged.entry(key as i64).or_insert(0) += bucket.doc_count;
432                    }
433                }
434            }
435        }
436        let mut buckets: Vec<Bucket> = merged
437            .into_iter()
438            .map(|(key, count)| Bucket {
439                key: BucketKey::Number(key as f64),
440                doc_count: count,
441                sub_aggs: HashMap::new(),
442            })
443            .collect();
444        buckets.sort_by(|a, b| {
445            let ka = if let BucketKey::Number(n) = a.key {
446                n
447            } else {
448                0.0
449            };
450            let kb = if let BucketKey::Number(n) = b.key {
451                n
452            } else {
453                0.0
454            };
455            ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
456        });
457        AggregationResult::Bucket(BucketResult { buckets })
458    }
459}
460
461struct DateHistogramCollector {
462    col: Option<OwnedColumn>,
463    interval: DateInterval,
464    buckets: HashMap<i64, u64>,
465}
466
467unsafe impl Send for DateHistogramCollector {}
468
469impl DateHistogramCollector {
470    /// Compute the bucket key (epoch millis of the interval start) for a value.
471    fn bucket_key(&self, epoch_millis: f64) -> i64 {
472        match &self.interval {
473            DateInterval::Fixed(ms) => ((epoch_millis / ms).floor() as i64) * (*ms as i64),
474            DateInterval::Calendar(cal) => calendar_floor(epoch_millis as i64, cal),
475        }
476    }
477}
478
479/// Floor an epoch millis timestamp to the start of a calendar interval.
480/// Simple UTC-only implementation (no timezone support).
481fn calendar_floor(epoch_ms: i64, interval: &CalendarInterval) -> i64 {
482    const MS_PER_SEC: i64 = 1_000;
483    const MS_PER_MIN: i64 = 60 * MS_PER_SEC;
484    const MS_PER_HOUR: i64 = 60 * MS_PER_MIN;
485    const MS_PER_DAY: i64 = 24 * MS_PER_HOUR;
486
487    match interval {
488        CalendarInterval::Minute => (epoch_ms / MS_PER_MIN) * MS_PER_MIN,
489        CalendarInterval::Hour => (epoch_ms / MS_PER_HOUR) * MS_PER_HOUR,
490        CalendarInterval::Day => (epoch_ms / MS_PER_DAY) * MS_PER_DAY,
491        CalendarInterval::Week => {
492            // ISO week: Monday is day 0. Epoch (1970-01-01) was Thursday (day 3).
493            let days = epoch_ms / MS_PER_DAY;
494            let week_start = days - ((days + 3) % 7); // align to Monday
495            week_start * MS_PER_DAY
496        }
497        CalendarInterval::Month | CalendarInterval::Quarter | CalendarInterval::Year => {
498            // Convert epoch_ms to (year, month, day) then floor
499            let days_since_epoch = epoch_ms / MS_PER_DAY;
500            let (y, m, _d) = days_to_ymd(days_since_epoch);
501            let floored_month = match interval {
502                CalendarInterval::Month => m,
503                CalendarInterval::Quarter => ((m - 1) / 3) * 3 + 1,
504                CalendarInterval::Year => 1,
505                _ => unreachable!(),
506            };
507            let floored_year = if matches!(interval, CalendarInterval::Year) {
508                y
509            } else {
510                y
511            };
512            ymd_to_epoch_ms(floored_year, floored_month, 1)
513        }
514    }
515}
516
/// Convert a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple, with `month` in `1..=12` and `day` in `1..=31`.
///
/// Follows Howard Hinnant's `civil_from_days`: the calendar is shifted so
/// that years start on March 1st, which puts the leap day at the end of the
/// shifted year.
fn days_to_ymd(days: i64) -> (i32, u32, u32) {
    // Days in one 400-year Gregorian cycle ("era").
    const DAYS_PER_ERA: i64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: i64 = 719_468;

    let shifted = days + EPOCH_SHIFT;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u32; // 0..=146_096

    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;

    // January and February belong to the next civil year.
    let (month, year_bump) = if month_index < 10 {
        (month_index + 3, 0)
    } else {
        (month_index - 9, 1)
    };
    let year = i64::from(year_of_era) + era * 400 + year_bump;

    (year as i32, month, day)
}
532
/// Convert a proleptic Gregorian `(year, month, day)` into epoch milliseconds
/// at 00:00 of that day.
///
/// Follows Howard Hinnant's `days_from_civil`; `m` is expected in `1..=12`
/// and `d` to be at least 1.
fn ymd_to_epoch_ms(y: i32, m: u32, d: u32) -> i64 {
    const MS_PER_DAY: i64 = 86_400_000;

    // Shift the calendar so the year starts in March (0 = March, 11 = February).
    let (shifted_year, month_index) = if m > 2 {
        (i64::from(y), m - 3)
    } else {
        (i64::from(y) - 1, m + 9)
    };

    let era = shifted_year.div_euclid(400);
    let year_of_era = shifted_year.rem_euclid(400) as u32;
    let day_of_year = (153 * month_index + 2) / 5 + d - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    let days = era * 146_097 + i64::from(day_of_era) - 719_468;
    days * MS_PER_DAY
}
543
544impl Aggregator for DateHistogramCollector {
545    fn collect(&mut self, doc_id: DocId) {
546        let Some(v) = self
547            .col
548            .as_ref()
549            .and_then(|c| c.numeric_value(doc_id.as_u32()))
550        else {
551            return;
552        };
553        let key = self.bucket_key(v);
554        *self.buckets.entry(key).or_insert(0) += 1;
555    }
556
557    fn finish(self: Box<Self>) -> AggregationResult {
558        let mut buckets: Vec<Bucket> = self
559            .buckets
560            .into_iter()
561            .map(|(key, count)| Bucket {
562                key: BucketKey::Number(key as f64),
563                doc_count: count,
564                sub_aggs: HashMap::new(),
565            })
566            .collect();
567        buckets.sort_by(|a, b| {
568            let ka = if let BucketKey::Number(n) = a.key {
569                n
570            } else {
571                0.0
572            };
573            let kb = if let BucketKey::Number(n) = b.key {
574                n
575            } else {
576                0.0
577            };
578            ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
579        });
580        AggregationResult::Bucket(BucketResult { buckets })
581    }
582}
583
584// --- Nested aggregation ---
585
586pub struct NestedAggFactory {
587    pub path: String,
588    pub sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
589}
590
591impl AggregatorFactory for NestedAggFactory {
592    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
593        let parent_bitset = reader.parent_bitset().map(|b| b.to_vec());
594        let sub_collectors: Vec<(String, Box<dyn Aggregator>)> = self
595            .sub_agg_factories
596            .iter()
597            .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
598            .collect();
599        Box::new(NestedAggregator {
600            parent_bitset,
601            sub_collectors,
602        })
603    }
604
605    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
606        // Nested agg is a single-bucket agg: merge doc_counts and sub-aggs
607        let mut total_count = 0u64;
608        let mut sub_results: HashMap<String, Vec<AggregationResult>> = HashMap::new();
609
610        for r in results {
611            if let AggregationResult::Bucket(br) = r {
612                for b in br.buckets {
613                    total_count += b.doc_count;
614                    for (name, sub_r) in b.sub_aggs {
615                        sub_results.entry(name).or_default().push(sub_r);
616                    }
617                }
618            }
619        }
620
621        let mut merged_sub_aggs = HashMap::new();
622        for ((name, factory), (_, results)) in self.sub_agg_factories.iter().zip(sub_results.iter())
623        {
624            merged_sub_aggs.insert(name.clone(), factory.merge_results(results.clone()));
625        }
626
627        AggregationResult::Bucket(BucketResult {
628            buckets: vec![Bucket {
629                key: BucketKey::String("nested".into()),
630                doc_count: total_count,
631                sub_aggs: merged_sub_aggs,
632            }],
633        })
634    }
635}
636
637struct NestedAggregator {
638    parent_bitset: Option<Vec<bool>>,
639    sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
640}
641
642unsafe impl Send for NestedAggregator {}
643
644impl NestedAggregator {
645    /// Given a parent doc_id, find all nested child doc_ids.
646    fn children_of(&self, parent_doc: u32) -> Vec<u32> {
647        let Some(bitset) = &self.parent_bitset else {
648            return vec![];
649        };
650        let mut children = Vec::new();
651        let start = parent_doc as usize + 1;
652        for i in start..bitset.len() {
653            if bitset[i] {
654                break; // next parent
655            }
656            children.push(i as u32);
657        }
658        children
659    }
660}
661
662impl Aggregator for NestedAggregator {
663    fn collect(&mut self, doc_id: DocId) {
664        // Expand parent doc to its nested children and collect each
665        let children = self.children_of(doc_id.as_u32());
666        for child_id in children {
667            for (_, collector) in &mut self.sub_collectors {
668                collector.collect(DocId::new(child_id));
669            }
670        }
671    }
672
673    fn finish(self: Box<Self>) -> AggregationResult {
674        let mut sub_aggs = HashMap::new();
675        let mut total_children = 0u64;
676        for (name, collector) in self.sub_collectors {
677            let result = collector.finish();
678            // Count total nested docs collected
679            if let AggregationResult::Bucket(ref br) = result {
680                for b in &br.buckets {
681                    total_children += b.doc_count;
682                }
683            }
684            sub_aggs.insert(name, result);
685        }
686        AggregationResult::Bucket(BucketResult {
687            buckets: vec![Bucket {
688                key: BucketKey::String("nested".into()),
689                doc_count: total_children,
690                sub_aggs,
691            }],
692        })
693    }
694}
695
696// --- Reverse nested aggregation ---
697
698pub struct ReverseNestedAggFactory {
699    pub sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
700}
701
702impl AggregatorFactory for ReverseNestedAggFactory {
703    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
704        let parent_bitset = reader.parent_bitset().map(|b| b.to_vec());
705        let sub_collectors: Vec<(String, Box<dyn Aggregator>)> = self
706            .sub_agg_factories
707            .iter()
708            .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
709            .collect();
710        Box::new(ReverseNestedAggregator {
711            parent_bitset,
712            sub_collectors,
713            seen_parents: std::collections::HashSet::new(),
714        })
715    }
716
717    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
718        let mut total_count = 0u64;
719        let mut sub_results: HashMap<String, Vec<AggregationResult>> = HashMap::new();
720
721        for r in results {
722            if let AggregationResult::Bucket(br) = r {
723                for b in br.buckets {
724                    total_count += b.doc_count;
725                    for (name, sub_r) in b.sub_aggs {
726                        sub_results.entry(name).or_default().push(sub_r);
727                    }
728                }
729            }
730        }
731
732        let mut merged_sub_aggs = HashMap::new();
733        for ((name, factory), (_, results)) in self.sub_agg_factories.iter().zip(sub_results.iter())
734        {
735            merged_sub_aggs.insert(name.clone(), factory.merge_results(results.clone()));
736        }
737
738        AggregationResult::Bucket(BucketResult {
739            buckets: vec![Bucket {
740                key: BucketKey::String("reverse_nested".into()),
741                doc_count: total_count,
742                sub_aggs: merged_sub_aggs,
743            }],
744        })
745    }
746}
747
748struct ReverseNestedAggregator {
749    parent_bitset: Option<Vec<bool>>,
750    sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
751    /// Track which parents we've already collected to avoid duplicates
752    /// (multiple nested children may map to the same parent).
753    seen_parents: std::collections::HashSet<u32>,
754}
755
756unsafe impl Send for ReverseNestedAggregator {}
757
758impl ReverseNestedAggregator {
759    /// Find the parent doc for a nested child doc.
760    fn find_parent(&self, nested_doc: u32) -> Option<u32> {
761        let bitset = self.parent_bitset.as_ref()?;
762        let mut i = nested_doc as usize;
763        while i > 0 {
764            if i < bitset.len() && bitset[i] {
765                return Some(i as u32);
766            }
767            i -= 1;
768        }
769        if !bitset.is_empty() && bitset[0] {
770            Some(0)
771        } else {
772            None
773        }
774    }
775}
776
777impl Aggregator for ReverseNestedAggregator {
778    fn collect(&mut self, doc_id: DocId) {
779        // Map nested child doc back to its parent
780        if let Some(parent_id) = self.find_parent(doc_id.as_u32()) {
781            if self.seen_parents.insert(parent_id) {
782                for (_, collector) in &mut self.sub_collectors {
783                    collector.collect(DocId::new(parent_id));
784                }
785            }
786        }
787    }
788
789    fn finish(self: Box<Self>) -> AggregationResult {
790        let parent_count = self.seen_parents.len() as u64;
791        let mut sub_aggs = HashMap::new();
792        for (name, collector) in self.sub_collectors {
793            sub_aggs.insert(name, collector.finish());
794        }
795        AggregationResult::Bucket(BucketResult {
796            buckets: vec![Bucket {
797                key: BucketKey::String("reverse_nested".into()),
798                doc_count: parent_count,
799                sub_aggs,
800            }],
801        })
802    }
803}
804
805// --- Geohash grid aggregation ---
806
807pub struct GeohashGridAggFactory {
808    pub field_name: String,
809    pub precision: usize,
810    pub size: usize,
811}
812
813impl AggregatorFactory for GeohashGridAggFactory {
814    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
815        let field_id = reader
816            .header()
817            .fields
818            .iter()
819            .find(|f| f.field_name == self.field_name)
820            .map(|f| f.field_id);
821        let store = field_id.and_then(|fid| reader.geo_points(fid));
822        Box::new(GeohashGridCollector {
823            store,
824            precision: self.precision,
825            buckets: HashMap::new(),
826        })
827    }
828
829    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
830        let mut merged: HashMap<String, u64> = HashMap::new();
831        for r in results {
832            if let AggregationResult::Bucket(br) = r {
833                for b in br.buckets {
834                    if let BucketKey::String(key) = b.key {
835                        *merged.entry(key).or_insert(0) += b.doc_count;
836                    }
837                }
838            }
839        }
840        let mut buckets: Vec<Bucket> = merged
841            .into_iter()
842            .map(|(key, count)| Bucket {
843                key: BucketKey::String(key),
844                doc_count: count,
845                sub_aggs: HashMap::new(),
846            })
847            .collect();
848        // Sort by doc_count descending, truncate to size
849        buckets.sort_by(|a, b| b.doc_count.cmp(&a.doc_count));
850        buckets.truncate(self.size);
851        AggregationResult::Bucket(BucketResult { buckets })
852    }
853}
854
855struct GeohashGridCollector {
856    store: Option<crate::spatial::geo::GeoPointStore>,
857    precision: usize,
858    buckets: HashMap<String, u64>,
859}
860
861unsafe impl Send for GeohashGridCollector {}
862
863impl Aggregator for GeohashGridCollector {
864    fn collect(&mut self, doc_id: DocId) {
865        if let Some(store) = &self.store {
866            if let Some(point) = store.get(doc_id.as_u32()) {
867                if let Ok(hash) = geohash::encode(
868                    geohash::Coord {
869                        x: point.lon,
870                        y: point.lat,
871                    },
872                    self.precision,
873                ) {
874                    *self.buckets.entry(hash).or_insert(0) += 1;
875                }
876            }
877        }
878    }
879
880    fn finish(self: Box<Self>) -> AggregationResult {
881        let mut buckets: Vec<Bucket> = self
882            .buckets
883            .into_iter()
884            .map(|(key, count)| Bucket {
885                key: BucketKey::String(key),
886                doc_count: count,
887                sub_aggs: HashMap::new(),
888            })
889            .collect();
890        buckets.sort_by(|a, b| b.doc_count.cmp(&a.doc_count));
891        AggregationResult::Bucket(BucketResult { buckets })
892    }
893}
894
895// --- Top hits aggregation ---
896
897pub struct TopHitsAggFactory {
898    pub size: usize,
899}
900
901impl AggregatorFactory for TopHitsAggFactory {
902    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
903        Box::new(TopHitsCollector {
904            segment: reader as *const SegmentReader,
905            doc_ids: Vec::new(),
906            size: self.size,
907        })
908    }
909
910    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
911        let mut all_hits: Vec<serde_json::Value> = Vec::new();
912        for r in results {
913            if let AggregationResult::Hits(h) = r {
914                all_hits.extend(h.hits);
915            }
916        }
917        all_hits.truncate(self.size);
918        AggregationResult::Hits(HitsResult { hits: all_hits })
919    }
920}
921
/// Per-segment collector for the top-hits aggregation.
struct TopHitsCollector {
    /// Raw pointer to the segment reader this collector was created for;
    /// dereferenced in `finish` to reach the document store.
    segment: *const SegmentReader,
    /// Ids of the collected documents, in collection order (at most `size`).
    doc_ids: Vec<u32>,
    /// Upper bound on the number of documents retained.
    size: usize,
}
927
// NOTE(review): needed because the `*const SegmentReader` field is not `Send`
// on its own. Soundness presumably relies on the pointed-to reader outliving
// the collector and being usable from the receiving thread; confirm against
// the code that drives collectors.
unsafe impl Send for TopHitsCollector {}
929
930impl Aggregator for TopHitsCollector {
931    fn collect(&mut self, doc_id: DocId) {
932        if self.doc_ids.len() < self.size {
933            self.doc_ids.push(doc_id.as_u32());
934        }
935    }
936
937    fn finish(self: Box<Self>) -> AggregationResult {
938        let reader = unsafe { &*self.segment };
939        let doc_store = reader.doc_store();
940        let mut hits = Vec::new();
941        for &doc_id in &self.doc_ids {
942            if let Some(source_bytes) = doc_store.get(doc_id) {
943                if let Ok(source) = serde_json::from_slice::<serde_json::Value>(&source_bytes) {
944                    hits.push(serde_json::json!({
945                        "_doc_id": doc_id,
946                        "_source": source,
947                    }));
948                }
949            }
950        }
951        AggregationResult::Hits(HitsResult { hits })
952    }
953}
954
955// --- Filter aggregation ---
956
/// Factory for the single-filter aggregation: counts the collected documents
/// that match one query and forwards them to optional sub-aggregations.
pub struct FilterAggFactory {
    /// Pre-bound filter query, evaluated once per segment when a collector
    /// is created.
    pub(crate) bound_query: Box<dyn crate::query::BoundQuery>,
    /// `(name, factory)` pairs for sub-aggregations; their collectors only
    /// see documents that pass the filter.
    pub(crate) sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
}
961
962impl AggregatorFactory for FilterAggFactory {
963    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
964        // Run the pre-bound filter query against this segment to build a
965        // bitset. The query was bound once at AggregationExpression::bind
966        // time against the real searcher, so a bad-field filter already
967        // surfaced as `Err` there. See [[feature-knn-query-type]] §4b.
968        let doc_count = reader.doc_count() as usize;
969        let mut bitset = vec![false; doc_count];
970
971        if let Ok(Some(supplier)) = self.bound_query.scorer_supplier(reader) {
972            if let Ok(mut scorer) = supplier.scorer() {
973                while scorer.doc_id() != crate::core::NO_MORE_DOCS {
974                    let id = scorer.doc_id().as_u32() as usize;
975                    if id < doc_count {
976                        bitset[id] = true;
977                    }
978                    scorer.next();
979                }
980            }
981        }
982
983        // Create sub-collectors for sub-aggregations
984        let sub_collectors: Vec<(String, Box<dyn Aggregator>)> = self
985            .sub_agg_factories
986            .iter()
987            .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
988            .collect();
989
990        Box::new(FilterCollector {
991            bitset,
992            count: 0,
993            sub_collectors,
994        })
995    }
996
997    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
998        let mut total = 0u64;
999        let mut sub_partials: HashMap<String, Vec<AggregationResult>> = HashMap::new();
1000
1001        for r in results {
1002            if let AggregationResult::Bucket(br) = r {
1003                for b in br.buckets {
1004                    total += b.doc_count;
1005                    for (name, result) in b.sub_aggs {
1006                        sub_partials.entry(name).or_default().push(result);
1007                    }
1008                }
1009            }
1010        }
1011
1012        // Merge sub-aggregation results
1013        let mut sub_aggs = HashMap::new();
1014        for (name, factory) in &self.sub_agg_factories {
1015            if let Some(partials) = sub_partials.remove(name) {
1016                sub_aggs.insert(name.clone(), factory.merge_results(partials));
1017            }
1018        }
1019
1020        AggregationResult::Bucket(BucketResult {
1021            buckets: vec![Bucket {
1022                key: BucketKey::String("filter".into()),
1023                doc_count: total,
1024                sub_aggs,
1025            }],
1026        })
1027    }
1028}
1029
/// Per-segment collector for the single-filter aggregation.
struct FilterCollector {
    /// One flag per document in the segment; `true` where the filter query
    /// matched.
    bitset: Vec<bool>,
    /// Number of collected documents whose flag was set.
    count: u64,
    /// Named sub-aggregation collectors, fed only the documents counted
    /// in `count`.
    sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
}
1035
1036// --- Filters (multi-filter) aggregation ---
1037
/// Factory for the multi-filter aggregation: produces one bucket per named
/// filter, each counting the collected documents that match that filter.
pub struct FiltersAggFactory {
    /// Named filters: (bucket_name, pre-bound query). Bucket order in the
    /// results follows the order of this vector.
    pub(crate) filters: Vec<(String, Box<dyn crate::query::BoundQuery>)>,
}
1042
1043impl AggregatorFactory for FiltersAggFactory {
1044    fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
1045        // Run each pre-bound filter query against this segment to build a
1046        // per-filter bitset. The queries were bound once at
1047        // AggregationExpression::bind time against the real searcher, so a
1048        // bad-field filter already surfaced as `Err` there. See
1049        // [[feature-knn-query-type]] §4b.
1050        let mut filter_bitsets: Vec<(String, Vec<bool>)> = Vec::new();
1051        let doc_count = reader.doc_count() as usize;
1052
1053        for (name, bound_query) in &self.filters {
1054            let mut bitset = vec![false; doc_count];
1055            if let Ok(Some(supplier)) = bound_query.scorer_supplier(reader) {
1056                if let Ok(mut scorer) = supplier.scorer() {
1057                    while scorer.doc_id() != crate::core::NO_MORE_DOCS {
1058                        let id = scorer.doc_id().as_u32() as usize;
1059                        if id < doc_count {
1060                            bitset[id] = true;
1061                        }
1062                        scorer.next();
1063                    }
1064                }
1065            }
1066            filter_bitsets.push((name.clone(), bitset));
1067        }
1068
1069        Box::new(FiltersCollector {
1070            filter_bitsets,
1071            counts: vec![0u64; self.filters.len()],
1072        })
1073    }
1074
1075    fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
1076        let num_filters = self.filters.len();
1077        let mut totals = vec![0u64; num_filters];
1078        let names: Vec<String> = self.filters.iter().map(|(n, _)| n.clone()).collect();
1079
1080        for r in results {
1081            if let AggregationResult::Bucket(br) = r {
1082                for (i, b) in br.buckets.iter().enumerate() {
1083                    if i < num_filters {
1084                        totals[i] += b.doc_count;
1085                    }
1086                }
1087            }
1088        }
1089
1090        AggregationResult::Bucket(BucketResult {
1091            buckets: names
1092                .iter()
1093                .zip(totals.iter())
1094                .map(|(name, &count)| Bucket {
1095                    key: BucketKey::String(name.clone()),
1096                    doc_count: count,
1097                    sub_aggs: HashMap::new(),
1098                })
1099                .collect(),
1100        })
1101    }
1102}
1103
/// Per-segment collector for the multi-filter aggregation.
///
/// Both fields are plain owned `Vec`s of `Send` data, so the type is `Send`
/// automatically; no manual `unsafe impl Send` is needed.
struct FiltersCollector {
    /// One `(bucket_name, match flags)` entry per filter; the flag vector
    /// holds `true` at every document index the filter matched.
    filter_bitsets: Vec<(String, Vec<bool>)>,
    /// Running match count per filter, parallel to `filter_bitsets`.
    counts: Vec<u64>,
}
1110
1111impl Aggregator for FiltersCollector {
1112    fn collect(&mut self, doc_id: DocId) {
1113        let id = doc_id.as_u32() as usize;
1114        for (i, (_, bitset)) in self.filter_bitsets.iter().enumerate() {
1115            if id < bitset.len() && bitset[id] {
1116                self.counts[i] += 1;
1117            }
1118        }
1119    }
1120
1121    fn finish(self: Box<Self>) -> AggregationResult {
1122        AggregationResult::Bucket(BucketResult {
1123            buckets: self
1124                .filter_bitsets
1125                .iter()
1126                .zip(self.counts.iter())
1127                .map(|((name, _), &count)| Bucket {
1128                    key: BucketKey::String(name.clone()),
1129                    doc_count: count,
1130                    sub_aggs: HashMap::new(),
1131                })
1132                .collect(),
1133        })
1134    }
1135}
1136
1137impl Aggregator for FilterCollector {
1138    fn collect(&mut self, doc_id: DocId) {
1139        let id = doc_id.as_u32() as usize;
1140        if id < self.bitset.len() && self.bitset[id] {
1141            self.count += 1;
1142            for (_, collector) in &mut self.sub_collectors {
1143                collector.collect(doc_id);
1144            }
1145        }
1146    }
1147
1148    fn finish(self: Box<Self>) -> AggregationResult {
1149        let mut sub_aggs = HashMap::new();
1150        for (name, collector) in self.sub_collectors {
1151            sub_aggs.insert(name, collector.finish());
1152        }
1153        AggregationResult::Bucket(BucketResult {
1154            buckets: vec![Bucket {
1155                key: BucketKey::String("filter".into()),
1156                doc_count: self.count,
1157                sub_aggs,
1158            }],
1159        })
1160    }
1161}