1use crate::core::DocId;
6use std::collections::HashMap;
7
8use super::*;
9use crate::segment::reader::SegmentReader;
10
11pub(super) use crate::columnar::owned::OwnedColumn;
17
18pub struct TermsAggFactory {
21 pub field_name: String,
22 pub size: usize,
23 pub sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
24}
25
26impl AggregatorFactory for TermsAggFactory {
27 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
28 let field_id = reader
29 .header()
30 .fields
31 .iter()
32 .find(|f| f.field_name == self.field_name)
33 .map(|f| f.field_id);
34
35 let col = OwnedColumn::new(field_id, reader);
36 let dict_size = col.as_ref().map_or(0, |c| c.dict_size());
37
38 Box::new(TermsCollector {
39 col,
40 segment_data: reader as *const SegmentReader,
41 ordinal_buckets: Vec::with_capacity(dict_size),
42 sub_agg_factories: self.sub_agg_factories.as_slice()
43 as *const [(String, Box<dyn AggregatorFactory>)],
44 })
45 }
46
47 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
48 let mut merged: HashMap<String, (u64, HashMap<String, Vec<AggregationResult>>)> =
50 HashMap::new();
51
52 for r in results {
53 if let AggregationResult::Bucket(br) = r {
54 for bucket in br.buckets {
55 if let BucketKey::String(key) = bucket.key {
56 let entry = merged.entry(key).or_insert_with(|| (0, HashMap::new()));
57 entry.0 += bucket.doc_count;
58 for (name, result) in bucket.sub_aggs {
59 entry.1.entry(name).or_default().push(result);
60 }
61 }
62 }
63 }
64 }
65
66 let mut bucket_entries: Vec<_> = merged.into_iter().collect();
68 bucket_entries.sort_by(|a, b| b.1.0.cmp(&a.1.0));
69 bucket_entries.truncate(self.size);
70
71 let result_buckets = bucket_entries
72 .into_iter()
73 .map(|(key, (doc_count, sub_partials))| {
74 let mut sub_aggs = HashMap::new();
76 for (_i, (name, factory)) in self.sub_agg_factories.iter().enumerate() {
77 if let Some(partials) = sub_partials.get(name) {
78 sub_aggs.insert(name.clone(), factory.merge_results(partials.clone()));
79 }
80 }
81 Bucket {
82 key: BucketKey::String(key),
83 doc_count,
84 sub_aggs,
85 }
86 })
87 .collect();
88
89 AggregationResult::Bucket(BucketResult {
90 buckets: result_buckets,
91 })
92 }
93}
94
95struct BucketState {
97 count: u64,
98 sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
99}
100
101struct TermsCollector {
104 col: Option<OwnedColumn>,
105 segment_data: *const SegmentReader,
106 ordinal_buckets: Vec<Option<BucketState>>,
108 sub_agg_factories: *const [(String, Box<dyn AggregatorFactory>)],
109}
110
111unsafe impl Send for TermsCollector {}
112
113impl Aggregator for TermsCollector {
114 fn collect(&mut self, doc_id: DocId) {
115 let Some(ord) = self
116 .col
117 .as_ref()
118 .and_then(|c| c.keyword_ordinal(doc_id.as_u32()))
119 else {
120 return;
121 };
122 let ordinal = ord as usize;
123
124 if ordinal >= self.ordinal_buckets.len() {
126 self.ordinal_buckets.resize_with(ordinal + 1, || None);
127 }
128
129 let reader = unsafe { &*self.segment_data };
130 let sub_factories = unsafe { &*self.sub_agg_factories };
131 let state = self.ordinal_buckets[ordinal].get_or_insert_with(|| {
132 let subs = sub_factories
133 .iter()
134 .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
135 .collect();
136 BucketState {
137 count: 0,
138 sub_collectors: subs,
139 }
140 });
141 state.count += 1;
142 for (_, collector) in &mut state.sub_collectors {
143 collector.collect(doc_id);
144 }
145 }
146
147 fn finish(self: Box<Self>) -> AggregationResult {
148 if let Some(col) = self.col.as_ref() {
152 col.ensure_dict();
153 }
154 let mut buckets: Vec<Bucket> = self
155 .ordinal_buckets
156 .into_iter()
157 .enumerate()
158 .filter_map(|(ordinal, state)| {
159 let state = state?;
160 let key = self
161 .col
162 .as_ref()
163 .and_then(|c| c.ordinal_to_string(ordinal as u32))
164 .unwrap_or("?")
165 .to_string();
166 let sub_aggs: HashMap<String, AggregationResult> = state
167 .sub_collectors
168 .into_iter()
169 .map(|(name, collector)| (name, collector.finish()))
170 .collect();
171 Some(Bucket {
172 key: BucketKey::String(key),
173 doc_count: state.count,
174 sub_aggs,
175 })
176 })
177 .collect();
178
179 buckets.sort_by(|a, b| b.doc_count.cmp(&a.doc_count));
180 AggregationResult::Bucket(BucketResult { buckets })
181 }
182}
183
184pub struct RangeAggFactory {
187 pub field_name: String,
188 pub ranges: Vec<RangeDef>,
189}
190
191impl AggregatorFactory for RangeAggFactory {
192 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
193 let field_id = reader
194 .header()
195 .fields
196 .iter()
197 .find(|f| f.field_name == self.field_name)
198 .map(|f| f.field_id);
199
200 let col = OwnedColumn::new(field_id, reader);
201
202 Box::new(RangeCollector {
203 col,
204 ranges: self.ranges.clone(),
205 counts: vec![0u64; self.ranges.len()],
206 })
207 }
208
209 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
210 let mut merged_counts = vec![0u64; self.ranges.len()];
211
212 for r in results {
213 if let AggregationResult::Bucket(br) = r {
214 for (i, bucket) in br.buckets.iter().enumerate() {
215 if i < merged_counts.len() {
216 merged_counts[i] += bucket.doc_count;
217 }
218 }
219 }
220 }
221
222 let buckets = self
223 .ranges
224 .iter()
225 .zip(merged_counts.iter())
226 .map(|(range, &count)| Bucket {
227 key: BucketKey::Range {
228 from: range.from,
229 to: range.to,
230 },
231 doc_count: count,
232 sub_aggs: HashMap::new(),
233 })
234 .collect();
235
236 AggregationResult::Bucket(BucketResult { buckets })
237 }
238}
239
240struct RangeCollector {
241 col: Option<OwnedColumn>,
242 ranges: Vec<RangeDef>,
243 counts: Vec<u64>,
244}
245
246unsafe impl Send for RangeCollector {}
247
248impl Aggregator for RangeCollector {
249 fn collect(&mut self, doc_id: DocId) {
250 let Some(v) = self
251 .col
252 .as_ref()
253 .and_then(|c| c.numeric_value(doc_id.as_u32()))
254 else {
255 return;
256 };
257
258 for (i, range) in self.ranges.iter().enumerate() {
259 let above_from = range.from.map_or(true, |f| v >= f);
260 let below_to = range.to.map_or(true, |t| v < t);
261 if above_from && below_to {
262 self.counts[i] += 1;
263 }
264 }
265 }
266
267 fn finish(self: Box<Self>) -> AggregationResult {
268 let buckets = self
269 .ranges
270 .iter()
271 .zip(self.counts.iter())
272 .map(|(range, &count)| Bucket {
273 key: BucketKey::Range {
274 from: range.from,
275 to: range.to,
276 },
277 doc_count: count,
278 sub_aggs: HashMap::new(),
279 })
280 .collect();
281 AggregationResult::Bucket(BucketResult { buckets })
282 }
283}
284
285pub struct HistogramAggFactory {
288 pub field_name: String,
289 pub interval: f64,
290}
291
292impl AggregatorFactory for HistogramAggFactory {
293 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
294 let field_id = reader
295 .header()
296 .fields
297 .iter()
298 .find(|f| f.field_name == self.field_name)
299 .map(|f| f.field_id);
300
301 let col = OwnedColumn::new(field_id, reader);
302
303 Box::new(HistogramCollector {
304 col,
305 interval: self.interval,
306 buckets: HashMap::new(),
307 })
308 }
309
310 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
311 let mut merged: HashMap<i64, u64> = HashMap::new();
312
313 for r in results {
314 if let AggregationResult::Bucket(br) = r {
315 for bucket in br.buckets {
316 if let BucketKey::Number(key) = bucket.key {
317 *merged.entry(key as i64).or_insert(0) += bucket.doc_count;
318 }
319 }
320 }
321 }
322
323 let mut buckets: Vec<Bucket> = merged
324 .into_iter()
325 .map(|(key, count)| Bucket {
326 key: BucketKey::Number(key as f64),
327 doc_count: count,
328 sub_aggs: HashMap::new(),
329 })
330 .collect();
331
332 buckets.sort_by(|a, b| {
333 let ka = if let BucketKey::Number(n) = a.key {
334 n
335 } else {
336 0.0
337 };
338 let kb = if let BucketKey::Number(n) = b.key {
339 n
340 } else {
341 0.0
342 };
343 ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
344 });
345
346 AggregationResult::Bucket(BucketResult { buckets })
347 }
348}
349
350struct HistogramCollector {
351 col: Option<OwnedColumn>,
352 interval: f64,
353 buckets: HashMap<i64, u64>,
354}
355
356unsafe impl Send for HistogramCollector {}
357
358impl Aggregator for HistogramCollector {
359 fn collect(&mut self, doc_id: DocId) {
360 let Some(v) = self
361 .col
362 .as_ref()
363 .and_then(|c| c.numeric_value(doc_id.as_u32()))
364 else {
365 return;
366 };
367
368 let bucket_key = (v / self.interval).floor() as i64 * self.interval as i64;
369 *self.buckets.entry(bucket_key).or_insert(0) += 1;
370 }
371
372 fn finish(self: Box<Self>) -> AggregationResult {
373 let mut buckets: Vec<Bucket> = self
374 .buckets
375 .into_iter()
376 .map(|(key, count)| Bucket {
377 key: BucketKey::Number(key as f64),
378 doc_count: count,
379 sub_aggs: HashMap::new(),
380 })
381 .collect();
382
383 buckets.sort_by(|a, b| {
384 let ka = if let BucketKey::Number(n) = a.key {
385 n
386 } else {
387 0.0
388 };
389 let kb = if let BucketKey::Number(n) = b.key {
390 n
391 } else {
392 0.0
393 };
394 ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
395 });
396
397 AggregationResult::Bucket(BucketResult { buckets })
398 }
399}
400
401pub struct DateHistogramAggFactory {
404 pub field_name: String,
405 pub interval: DateInterval,
406}
407
408impl AggregatorFactory for DateHistogramAggFactory {
409 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
410 let field_id = reader
411 .header()
412 .fields
413 .iter()
414 .find(|f| f.field_name == self.field_name)
415 .map(|f| f.field_id);
416 let col = OwnedColumn::new(field_id, reader);
417
418 Box::new(DateHistogramCollector {
419 col,
420 interval: self.interval.clone(),
421 buckets: HashMap::new(),
422 })
423 }
424
425 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
426 let mut merged: HashMap<i64, u64> = HashMap::new();
427 for r in results {
428 if let AggregationResult::Bucket(br) = r {
429 for bucket in br.buckets {
430 if let BucketKey::Number(key) = bucket.key {
431 *merged.entry(key as i64).or_insert(0) += bucket.doc_count;
432 }
433 }
434 }
435 }
436 let mut buckets: Vec<Bucket> = merged
437 .into_iter()
438 .map(|(key, count)| Bucket {
439 key: BucketKey::Number(key as f64),
440 doc_count: count,
441 sub_aggs: HashMap::new(),
442 })
443 .collect();
444 buckets.sort_by(|a, b| {
445 let ka = if let BucketKey::Number(n) = a.key {
446 n
447 } else {
448 0.0
449 };
450 let kb = if let BucketKey::Number(n) = b.key {
451 n
452 } else {
453 0.0
454 };
455 ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
456 });
457 AggregationResult::Bucket(BucketResult { buckets })
458 }
459}
460
461struct DateHistogramCollector {
462 col: Option<OwnedColumn>,
463 interval: DateInterval,
464 buckets: HashMap<i64, u64>,
465}
466
467unsafe impl Send for DateHistogramCollector {}
468
469impl DateHistogramCollector {
470 fn bucket_key(&self, epoch_millis: f64) -> i64 {
472 match &self.interval {
473 DateInterval::Fixed(ms) => ((epoch_millis / ms).floor() as i64) * (*ms as i64),
474 DateInterval::Calendar(cal) => calendar_floor(epoch_millis as i64, cal),
475 }
476 }
477}
478
479fn calendar_floor(epoch_ms: i64, interval: &CalendarInterval) -> i64 {
482 const MS_PER_SEC: i64 = 1_000;
483 const MS_PER_MIN: i64 = 60 * MS_PER_SEC;
484 const MS_PER_HOUR: i64 = 60 * MS_PER_MIN;
485 const MS_PER_DAY: i64 = 24 * MS_PER_HOUR;
486
487 match interval {
488 CalendarInterval::Minute => (epoch_ms / MS_PER_MIN) * MS_PER_MIN,
489 CalendarInterval::Hour => (epoch_ms / MS_PER_HOUR) * MS_PER_HOUR,
490 CalendarInterval::Day => (epoch_ms / MS_PER_DAY) * MS_PER_DAY,
491 CalendarInterval::Week => {
492 let days = epoch_ms / MS_PER_DAY;
494 let week_start = days - ((days + 3) % 7); week_start * MS_PER_DAY
496 }
497 CalendarInterval::Month | CalendarInterval::Quarter | CalendarInterval::Year => {
498 let days_since_epoch = epoch_ms / MS_PER_DAY;
500 let (y, m, _d) = days_to_ymd(days_since_epoch);
501 let floored_month = match interval {
502 CalendarInterval::Month => m,
503 CalendarInterval::Quarter => ((m - 1) / 3) * 3 + 1,
504 CalendarInterval::Year => 1,
505 _ => unreachable!(),
506 };
507 let floored_year = if matches!(interval, CalendarInterval::Year) {
508 y
509 } else {
510 y
511 };
512 ymd_to_epoch_ms(floored_year, floored_month, 1)
513 }
514 }
515}
516
/// Converts a day count relative to 1970-01-01 into a proleptic-Gregorian
/// `(year, month, day)` triple, with 1-based month and day.
///
/// This is Howard Hinnant's `civil_from_days`. It shifts the origin to 0000-03-01
/// so each leap day falls at the end of a computational year, then splits the
/// count into 400-year eras of 146 097 days.
fn days_to_ymd(days: i64) -> (i32, u32, u32) {
    // Days since 0000-03-01.
    let shifted = days + 719_468;
    // Floor division by the length of a 400-year era.
    let era = shifted.div_euclid(146_097);
    let day_of_era = (shifted - era * 146_097) as u32; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // March-based
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March ... 11 = February
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    let march_year = year_of_era as i64 + era * 400;
    // January and February belong to the next civil year.
    let year = if month <= 2 { march_year + 1 } else { march_year };
    (year as i32, month, day)
}
532
/// Converts a proleptic-Gregorian date (1-based month and day) into milliseconds
/// since 1970-01-01T00:00Z, the date's midnight.
///
/// This is Howard Hinnant's `days_from_civil`, the inverse of `days_to_ymd`.
fn ymd_to_epoch_ms(y: i32, m: u32, d: u32) -> i64 {
    // Count years from March so January/February are months 10/11 of the prior year.
    let (year, month_index) = if m <= 2 {
        (y as i64 - 1, m + 9)
    } else {
        (y as i64, m - 3)
    };
    let era = year.div_euclid(400);
    let year_of_era = (year - era * 400) as u32; // [0, 399]
    let day_of_year = (153 * month_index + 2) / 5 + d - 1; // [0, 365]
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    let days_since_epoch = era * 146_097 + day_of_era as i64 - 719_468;
    days_since_epoch * 86_400_000
}
543
544impl Aggregator for DateHistogramCollector {
545 fn collect(&mut self, doc_id: DocId) {
546 let Some(v) = self
547 .col
548 .as_ref()
549 .and_then(|c| c.numeric_value(doc_id.as_u32()))
550 else {
551 return;
552 };
553 let key = self.bucket_key(v);
554 *self.buckets.entry(key).or_insert(0) += 1;
555 }
556
557 fn finish(self: Box<Self>) -> AggregationResult {
558 let mut buckets: Vec<Bucket> = self
559 .buckets
560 .into_iter()
561 .map(|(key, count)| Bucket {
562 key: BucketKey::Number(key as f64),
563 doc_count: count,
564 sub_aggs: HashMap::new(),
565 })
566 .collect();
567 buckets.sort_by(|a, b| {
568 let ka = if let BucketKey::Number(n) = a.key {
569 n
570 } else {
571 0.0
572 };
573 let kb = if let BucketKey::Number(n) = b.key {
574 n
575 } else {
576 0.0
577 };
578 ka.partial_cmp(&kb).unwrap_or(std::cmp::Ordering::Equal)
579 });
580 AggregationResult::Bucket(BucketResult { buckets })
581 }
582}
583
584pub struct NestedAggFactory {
587 pub path: String,
588 pub sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
589}
590
591impl AggregatorFactory for NestedAggFactory {
592 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
593 let parent_bitset = reader.parent_bitset().map(|b| b.to_vec());
594 let sub_collectors: Vec<(String, Box<dyn Aggregator>)> = self
595 .sub_agg_factories
596 .iter()
597 .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
598 .collect();
599 Box::new(NestedAggregator {
600 parent_bitset,
601 sub_collectors,
602 })
603 }
604
605 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
606 let mut total_count = 0u64;
608 let mut sub_results: HashMap<String, Vec<AggregationResult>> = HashMap::new();
609
610 for r in results {
611 if let AggregationResult::Bucket(br) = r {
612 for b in br.buckets {
613 total_count += b.doc_count;
614 for (name, sub_r) in b.sub_aggs {
615 sub_results.entry(name).or_default().push(sub_r);
616 }
617 }
618 }
619 }
620
621 let mut merged_sub_aggs = HashMap::new();
622 for ((name, factory), (_, results)) in self.sub_agg_factories.iter().zip(sub_results.iter())
623 {
624 merged_sub_aggs.insert(name.clone(), factory.merge_results(results.clone()));
625 }
626
627 AggregationResult::Bucket(BucketResult {
628 buckets: vec![Bucket {
629 key: BucketKey::String("nested".into()),
630 doc_count: total_count,
631 sub_aggs: merged_sub_aggs,
632 }],
633 })
634 }
635}
636
637struct NestedAggregator {
638 parent_bitset: Option<Vec<bool>>,
639 sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
640}
641
642unsafe impl Send for NestedAggregator {}
643
644impl NestedAggregator {
645 fn children_of(&self, parent_doc: u32) -> Vec<u32> {
647 let Some(bitset) = &self.parent_bitset else {
648 return vec![];
649 };
650 let mut children = Vec::new();
651 let start = parent_doc as usize + 1;
652 for i in start..bitset.len() {
653 if bitset[i] {
654 break; }
656 children.push(i as u32);
657 }
658 children
659 }
660}
661
662impl Aggregator for NestedAggregator {
663 fn collect(&mut self, doc_id: DocId) {
664 let children = self.children_of(doc_id.as_u32());
666 for child_id in children {
667 for (_, collector) in &mut self.sub_collectors {
668 collector.collect(DocId::new(child_id));
669 }
670 }
671 }
672
673 fn finish(self: Box<Self>) -> AggregationResult {
674 let mut sub_aggs = HashMap::new();
675 let mut total_children = 0u64;
676 for (name, collector) in self.sub_collectors {
677 let result = collector.finish();
678 if let AggregationResult::Bucket(ref br) = result {
680 for b in &br.buckets {
681 total_children += b.doc_count;
682 }
683 }
684 sub_aggs.insert(name, result);
685 }
686 AggregationResult::Bucket(BucketResult {
687 buckets: vec![Bucket {
688 key: BucketKey::String("nested".into()),
689 doc_count: total_children,
690 sub_aggs,
691 }],
692 })
693 }
694}
695
696pub struct ReverseNestedAggFactory {
699 pub sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
700}
701
702impl AggregatorFactory for ReverseNestedAggFactory {
703 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
704 let parent_bitset = reader.parent_bitset().map(|b| b.to_vec());
705 let sub_collectors: Vec<(String, Box<dyn Aggregator>)> = self
706 .sub_agg_factories
707 .iter()
708 .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
709 .collect();
710 Box::new(ReverseNestedAggregator {
711 parent_bitset,
712 sub_collectors,
713 seen_parents: std::collections::HashSet::new(),
714 })
715 }
716
717 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
718 let mut total_count = 0u64;
719 let mut sub_results: HashMap<String, Vec<AggregationResult>> = HashMap::new();
720
721 for r in results {
722 if let AggregationResult::Bucket(br) = r {
723 for b in br.buckets {
724 total_count += b.doc_count;
725 for (name, sub_r) in b.sub_aggs {
726 sub_results.entry(name).or_default().push(sub_r);
727 }
728 }
729 }
730 }
731
732 let mut merged_sub_aggs = HashMap::new();
733 for ((name, factory), (_, results)) in self.sub_agg_factories.iter().zip(sub_results.iter())
734 {
735 merged_sub_aggs.insert(name.clone(), factory.merge_results(results.clone()));
736 }
737
738 AggregationResult::Bucket(BucketResult {
739 buckets: vec![Bucket {
740 key: BucketKey::String("reverse_nested".into()),
741 doc_count: total_count,
742 sub_aggs: merged_sub_aggs,
743 }],
744 })
745 }
746}
747
748struct ReverseNestedAggregator {
749 parent_bitset: Option<Vec<bool>>,
750 sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
751 seen_parents: std::collections::HashSet<u32>,
754}
755
756unsafe impl Send for ReverseNestedAggregator {}
757
758impl ReverseNestedAggregator {
759 fn find_parent(&self, nested_doc: u32) -> Option<u32> {
761 let bitset = self.parent_bitset.as_ref()?;
762 let mut i = nested_doc as usize;
763 while i > 0 {
764 if i < bitset.len() && bitset[i] {
765 return Some(i as u32);
766 }
767 i -= 1;
768 }
769 if !bitset.is_empty() && bitset[0] {
770 Some(0)
771 } else {
772 None
773 }
774 }
775}
776
777impl Aggregator for ReverseNestedAggregator {
778 fn collect(&mut self, doc_id: DocId) {
779 if let Some(parent_id) = self.find_parent(doc_id.as_u32()) {
781 if self.seen_parents.insert(parent_id) {
782 for (_, collector) in &mut self.sub_collectors {
783 collector.collect(DocId::new(parent_id));
784 }
785 }
786 }
787 }
788
789 fn finish(self: Box<Self>) -> AggregationResult {
790 let parent_count = self.seen_parents.len() as u64;
791 let mut sub_aggs = HashMap::new();
792 for (name, collector) in self.sub_collectors {
793 sub_aggs.insert(name, collector.finish());
794 }
795 AggregationResult::Bucket(BucketResult {
796 buckets: vec![Bucket {
797 key: BucketKey::String("reverse_nested".into()),
798 doc_count: parent_count,
799 sub_aggs,
800 }],
801 })
802 }
803}
804
805pub struct GeohashGridAggFactory {
808 pub field_name: String,
809 pub precision: usize,
810 pub size: usize,
811}
812
813impl AggregatorFactory for GeohashGridAggFactory {
814 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
815 let field_id = reader
816 .header()
817 .fields
818 .iter()
819 .find(|f| f.field_name == self.field_name)
820 .map(|f| f.field_id);
821 let store = field_id.and_then(|fid| reader.geo_points(fid));
822 Box::new(GeohashGridCollector {
823 store,
824 precision: self.precision,
825 buckets: HashMap::new(),
826 })
827 }
828
829 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
830 let mut merged: HashMap<String, u64> = HashMap::new();
831 for r in results {
832 if let AggregationResult::Bucket(br) = r {
833 for b in br.buckets {
834 if let BucketKey::String(key) = b.key {
835 *merged.entry(key).or_insert(0) += b.doc_count;
836 }
837 }
838 }
839 }
840 let mut buckets: Vec<Bucket> = merged
841 .into_iter()
842 .map(|(key, count)| Bucket {
843 key: BucketKey::String(key),
844 doc_count: count,
845 sub_aggs: HashMap::new(),
846 })
847 .collect();
848 buckets.sort_by(|a, b| b.doc_count.cmp(&a.doc_count));
850 buckets.truncate(self.size);
851 AggregationResult::Bucket(BucketResult { buckets })
852 }
853}
854
855struct GeohashGridCollector {
856 store: Option<crate::spatial::geo::GeoPointStore>,
857 precision: usize,
858 buckets: HashMap<String, u64>,
859}
860
861unsafe impl Send for GeohashGridCollector {}
862
863impl Aggregator for GeohashGridCollector {
864 fn collect(&mut self, doc_id: DocId) {
865 if let Some(store) = &self.store {
866 if let Some(point) = store.get(doc_id.as_u32()) {
867 if let Ok(hash) = geohash::encode(
868 geohash::Coord {
869 x: point.lon,
870 y: point.lat,
871 },
872 self.precision,
873 ) {
874 *self.buckets.entry(hash).or_insert(0) += 1;
875 }
876 }
877 }
878 }
879
880 fn finish(self: Box<Self>) -> AggregationResult {
881 let mut buckets: Vec<Bucket> = self
882 .buckets
883 .into_iter()
884 .map(|(key, count)| Bucket {
885 key: BucketKey::String(key),
886 doc_count: count,
887 sub_aggs: HashMap::new(),
888 })
889 .collect();
890 buckets.sort_by(|a, b| b.doc_count.cmp(&a.doc_count));
891 AggregationResult::Bucket(BucketResult { buckets })
892 }
893}
894
895pub struct TopHitsAggFactory {
898 pub size: usize,
899}
900
901impl AggregatorFactory for TopHitsAggFactory {
902 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
903 Box::new(TopHitsCollector {
904 segment: reader as *const SegmentReader,
905 doc_ids: Vec::new(),
906 size: self.size,
907 })
908 }
909
910 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
911 let mut all_hits: Vec<serde_json::Value> = Vec::new();
912 for r in results {
913 if let AggregationResult::Hits(h) = r {
914 all_hits.extend(h.hits);
915 }
916 }
917 all_hits.truncate(self.size);
918 AggregationResult::Hits(HitsResult { hits: all_hits })
919 }
920}
921
922struct TopHitsCollector {
923 segment: *const SegmentReader,
924 doc_ids: Vec<u32>,
925 size: usize,
926}
927
928unsafe impl Send for TopHitsCollector {}
929
930impl Aggregator for TopHitsCollector {
931 fn collect(&mut self, doc_id: DocId) {
932 if self.doc_ids.len() < self.size {
933 self.doc_ids.push(doc_id.as_u32());
934 }
935 }
936
937 fn finish(self: Box<Self>) -> AggregationResult {
938 let reader = unsafe { &*self.segment };
939 let doc_store = reader.doc_store();
940 let mut hits = Vec::new();
941 for &doc_id in &self.doc_ids {
942 if let Some(source_bytes) = doc_store.get(doc_id) {
943 if let Ok(source) = serde_json::from_slice::<serde_json::Value>(&source_bytes) {
944 hits.push(serde_json::json!({
945 "_doc_id": doc_id,
946 "_source": source,
947 }));
948 }
949 }
950 }
951 AggregationResult::Hits(HitsResult { hits })
952 }
953}
954
955pub struct FilterAggFactory {
958 pub(crate) bound_query: Box<dyn crate::query::BoundQuery>,
959 pub(crate) sub_agg_factories: Vec<(String, Box<dyn AggregatorFactory>)>,
960}
961
962impl AggregatorFactory for FilterAggFactory {
963 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
964 let doc_count = reader.doc_count() as usize;
969 let mut bitset = vec![false; doc_count];
970
971 if let Ok(Some(supplier)) = self.bound_query.scorer_supplier(reader) {
972 if let Ok(mut scorer) = supplier.scorer() {
973 while scorer.doc_id() != crate::core::NO_MORE_DOCS {
974 let id = scorer.doc_id().as_u32() as usize;
975 if id < doc_count {
976 bitset[id] = true;
977 }
978 scorer.next();
979 }
980 }
981 }
982
983 let sub_collectors: Vec<(String, Box<dyn Aggregator>)> = self
985 .sub_agg_factories
986 .iter()
987 .map(|(name, factory)| (name.clone(), factory.create_collector(reader)))
988 .collect();
989
990 Box::new(FilterCollector {
991 bitset,
992 count: 0,
993 sub_collectors,
994 })
995 }
996
997 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
998 let mut total = 0u64;
999 let mut sub_partials: HashMap<String, Vec<AggregationResult>> = HashMap::new();
1000
1001 for r in results {
1002 if let AggregationResult::Bucket(br) = r {
1003 for b in br.buckets {
1004 total += b.doc_count;
1005 for (name, result) in b.sub_aggs {
1006 sub_partials.entry(name).or_default().push(result);
1007 }
1008 }
1009 }
1010 }
1011
1012 let mut sub_aggs = HashMap::new();
1014 for (name, factory) in &self.sub_agg_factories {
1015 if let Some(partials) = sub_partials.remove(name) {
1016 sub_aggs.insert(name.clone(), factory.merge_results(partials));
1017 }
1018 }
1019
1020 AggregationResult::Bucket(BucketResult {
1021 buckets: vec![Bucket {
1022 key: BucketKey::String("filter".into()),
1023 doc_count: total,
1024 sub_aggs,
1025 }],
1026 })
1027 }
1028}
1029
/// Segment-level collector for `FilterAggFactory`. It counts documents whose flag
/// is set in the precomputed `bitset` and forwards only those documents to the
/// sub-collectors.
///
/// NOTE(review): unlike the sibling collectors, this type has no `unsafe impl Send`.
/// Confirm whether callers require `Send` here.
struct FilterCollector {
    /// Per-doc match flags for the filter query, indexed by doc id.
    bitset: Vec<bool>,
    /// Number of collected documents that matched the filter.
    count: u64,
    sub_collectors: Vec<(String, Box<dyn Aggregator>)>,
}
1035
1036pub struct FiltersAggFactory {
1039 pub(crate) filters: Vec<(String, Box<dyn crate::query::BoundQuery>)>,
1041}
1042
1043impl AggregatorFactory for FiltersAggFactory {
1044 fn create_collector(&self, reader: &SegmentReader) -> Box<dyn Aggregator> {
1045 let mut filter_bitsets: Vec<(String, Vec<bool>)> = Vec::new();
1051 let doc_count = reader.doc_count() as usize;
1052
1053 for (name, bound_query) in &self.filters {
1054 let mut bitset = vec![false; doc_count];
1055 if let Ok(Some(supplier)) = bound_query.scorer_supplier(reader) {
1056 if let Ok(mut scorer) = supplier.scorer() {
1057 while scorer.doc_id() != crate::core::NO_MORE_DOCS {
1058 let id = scorer.doc_id().as_u32() as usize;
1059 if id < doc_count {
1060 bitset[id] = true;
1061 }
1062 scorer.next();
1063 }
1064 }
1065 }
1066 filter_bitsets.push((name.clone(), bitset));
1067 }
1068
1069 Box::new(FiltersCollector {
1070 filter_bitsets,
1071 counts: vec![0u64; self.filters.len()],
1072 })
1073 }
1074
1075 fn merge_results(&self, results: Vec<AggregationResult>) -> AggregationResult {
1076 let num_filters = self.filters.len();
1077 let mut totals = vec![0u64; num_filters];
1078 let names: Vec<String> = self.filters.iter().map(|(n, _)| n.clone()).collect();
1079
1080 for r in results {
1081 if let AggregationResult::Bucket(br) = r {
1082 for (i, b) in br.buckets.iter().enumerate() {
1083 if i < num_filters {
1084 totals[i] += b.doc_count;
1085 }
1086 }
1087 }
1088 }
1089
1090 AggregationResult::Bucket(BucketResult {
1091 buckets: names
1092 .iter()
1093 .zip(totals.iter())
1094 .map(|(name, &count)| Bucket {
1095 key: BucketKey::String(name.clone()),
1096 doc_count: count,
1097 sub_aggs: HashMap::new(),
1098 })
1099 .collect(),
1100 })
1101 }
1102}
1103
1104struct FiltersCollector {
1105 filter_bitsets: Vec<(String, Vec<bool>)>,
1106 counts: Vec<u64>,
1107}
1108
1109unsafe impl Send for FiltersCollector {}
1110
1111impl Aggregator for FiltersCollector {
1112 fn collect(&mut self, doc_id: DocId) {
1113 let id = doc_id.as_u32() as usize;
1114 for (i, (_, bitset)) in self.filter_bitsets.iter().enumerate() {
1115 if id < bitset.len() && bitset[id] {
1116 self.counts[i] += 1;
1117 }
1118 }
1119 }
1120
1121 fn finish(self: Box<Self>) -> AggregationResult {
1122 AggregationResult::Bucket(BucketResult {
1123 buckets: self
1124 .filter_bitsets
1125 .iter()
1126 .zip(self.counts.iter())
1127 .map(|((name, _), &count)| Bucket {
1128 key: BucketKey::String(name.clone()),
1129 doc_count: count,
1130 sub_aggs: HashMap::new(),
1131 })
1132 .collect(),
1133 })
1134 }
1135}
1136
1137impl Aggregator for FilterCollector {
1138 fn collect(&mut self, doc_id: DocId) {
1139 let id = doc_id.as_u32() as usize;
1140 if id < self.bitset.len() && self.bitset[id] {
1141 self.count += 1;
1142 for (_, collector) in &mut self.sub_collectors {
1143 collector.collect(doc_id);
1144 }
1145 }
1146 }
1147
1148 fn finish(self: Box<Self>) -> AggregationResult {
1149 let mut sub_aggs = HashMap::new();
1150 for (name, collector) in self.sub_collectors {
1151 sub_aggs.insert(name, collector.finish());
1152 }
1153 AggregationResult::Bucket(BucketResult {
1154 buckets: vec![Bucket {
1155 key: BucketKey::String("filter".into()),
1156 doc_count: self.count,
1157 sub_aggs,
1158 }],
1159 })
1160 }
1161}