1use async_trait::async_trait;
85use downcast_rs::impl_downcast;
86
87use crate::schema::Schema;
88use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
89
90mod count_collector;
91pub use self::count_collector::Count;
92
93pub mod sort_key;
95
96mod histogram_collector;
97pub use histogram_collector::HistogramCollector;
98
99mod multi_collector;
100pub use self::multi_collector::{FruitHandle, MultiCollector, MultiFruit};
101
102mod top_collector;
103pub use self::top_collector::ComparableDoc;
104
105mod top_score_collector;
106pub use self::top_score_collector::{TopDocs, TopNComputer};
107
108mod sort_key_top_collector;
109pub use self::sort_key::{SegmentSortKeyComputer, SortKeyComputer};
110mod facet_collector;
111pub use self::facet_collector::{FacetCollector, FacetCounts};
112use crate::query::Weight;
113
114mod docset_collector;
115pub use self::docset_collector::DocSetCollector;
116
117mod filter_collector_wrapper;
118pub use self::filter_collector_wrapper::{BytesFilterCollector, FilterCollector};
119
120pub trait Fruit: Send + downcast_rs::Downcast {}
123
124impl<T> Fruit for T where T: Send + downcast_rs::Downcast {}
125
126#[async_trait]
143pub trait Collector: Sync + Send {
144 type Fruit: Fruit;
147
148 type Child: SegmentCollector;
150
151 fn check_schema(&self, _schema: &Schema) -> crate::Result<()> {
153 Ok(())
154 }
155
156 fn for_segment(
159 &self,
160 segment_local_id: SegmentOrdinal,
161 segment: &SegmentReader,
162 ) -> crate::Result<Self::Child>;
163
164 fn requires_scoring(&self) -> bool;
166
167 fn merge_fruits(
170 &self,
171 segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
172 ) -> crate::Result<Self::Fruit>;
173
174 fn collect_segment(
176 &self,
177 weight: &dyn Weight,
178 segment_ord: u32,
179 reader: &SegmentReader,
180 ) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
181 let with_scoring = self.requires_scoring();
182 let mut segment_collector = self.for_segment(segment_ord, reader)?;
183 default_collect_segment_impl(&mut segment_collector, weight, reader, with_scoring)?;
184 Ok(segment_collector.harvest())
185 }
186
187 #[cfg(feature = "quickwit")]
189 async fn for_segment_async(
190 &self,
191 segment_local_id: SegmentOrdinal,
192 segment: &SegmentReader,
193 ) -> crate::Result<Self::Child> {
194 self.for_segment(segment_local_id, segment)
195 }
196
197 #[cfg(feature = "quickwit")]
199 async fn collect_segment_async(
200 &self,
201 weight: &dyn Weight,
202 segment_ord: u32,
203 reader: &SegmentReader,
204 ) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
205 let mut segment_collector = self.for_segment_async(segment_ord, reader).await?;
206 match (reader.alive_bitset(), self.requires_scoring()) {
207 (Some(alive_bitset), true) => {
208 let cb = &mut |doc, score| {
209 if alive_bitset.is_alive(doc) {
210 segment_collector.collect(doc, score);
211 }
212 };
213 weight.for_each_async(reader, cb).await?;
214 }
215 (Some(alive_bitset), false) => {
216 let cb = &mut |docs: &[DocId]| {
217 for doc in docs.iter().cloned() {
218 if alive_bitset.is_alive(doc) {
219 segment_collector.collect(doc, 0.0);
220 }
221 }
222 };
223 weight.for_each_no_score_async(reader, cb).await?;
224 }
225 (None, true) => {
226 let cb = &mut |doc, score| {
227 segment_collector.collect(doc, score);
228 };
229 weight.for_each_async(reader, cb).await?;
230 }
231 (None, false) => {
232 let cb = &mut |docs: &[DocId]| {
233 for doc in docs.iter().cloned() {
234 segment_collector.collect(doc, 0.0);
235 }
236 };
237 weight.for_each_no_score_async(reader, cb).await?;
238 }
239 }
240 Ok(segment_collector.harvest())
241 }
242}
243
244pub(crate) fn default_collect_segment_impl<TSegmentCollector: SegmentCollector>(
245 segment_collector: &mut TSegmentCollector,
246 weight: &dyn Weight,
247 reader: &SegmentReader,
248 with_scoring: bool,
249) -> crate::Result<()> {
250 match (reader.alive_bitset(), with_scoring) {
251 (Some(alive_bitset), true) => {
252 weight.for_each(reader, &mut |doc, score| {
253 if alive_bitset.is_alive(doc) {
254 segment_collector.collect(doc, score);
255 }
256 })?;
257 }
258 (Some(alive_bitset), false) => {
259 weight.for_each_no_score(reader, &mut |docs| {
260 for doc in docs.iter().cloned() {
261 if alive_bitset.is_alive(doc) {
262 segment_collector.collect(doc, 0.0);
263 }
264 }
265 })?;
266 }
267 (None, true) => {
268 weight.for_each(reader, &mut |doc, score| {
269 segment_collector.collect(doc, score);
270 })?;
271 }
272 (None, false) => {
273 weight.for_each_no_score(reader, &mut |docs| {
274 segment_collector.collect_block(docs);
275 })?;
276 }
277 }
278 Ok(())
279}
280
281impl<TSegmentCollector: SegmentCollector> SegmentCollector for Option<TSegmentCollector> {
282 type Fruit = Option<TSegmentCollector::Fruit>;
283
284 fn collect(&mut self, doc: DocId, score: Score) {
285 if let Some(segment_collector) = self {
286 segment_collector.collect(doc, score);
287 }
288 }
289
290 fn collect_block(&mut self, docs: &[DocId]) {
291 if let Some(segment_collector) = self {
292 segment_collector.collect_block(docs);
293 }
294 }
295
296 fn harvest(self) -> Self::Fruit {
297 self.map(|segment_collector| segment_collector.harvest())
298 }
299}
300
301impl<TCollector: Collector> Collector for Option<TCollector> {
302 type Fruit = Option<TCollector::Fruit>;
303
304 type Child = Option<<TCollector as Collector>::Child>;
305
306 fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
307 if let Some(underlying_collector) = self {
308 underlying_collector.check_schema(schema)?;
309 }
310 Ok(())
311 }
312
313 fn for_segment(
314 &self,
315 segment_local_id: SegmentOrdinal,
316 segment: &SegmentReader,
317 ) -> crate::Result<Self::Child> {
318 Ok(if let Some(inner) = self {
319 let inner_segment_collector = inner.for_segment(segment_local_id, segment)?;
320 Some(inner_segment_collector)
321 } else {
322 None
323 })
324 }
325
326 fn requires_scoring(&self) -> bool {
327 self.as_ref()
328 .map(|inner| inner.requires_scoring())
329 .unwrap_or(false)
330 }
331
332 fn merge_fruits(
333 &self,
334 segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
335 ) -> crate::Result<Self::Fruit> {
336 if let Some(inner) = self.as_ref() {
337 let inner_segment_fruits: Vec<_> = segment_fruits
338 .into_iter()
339 .flat_map(|fruit_opt| fruit_opt.into_iter())
340 .collect();
341 let fruit = inner.merge_fruits(inner_segment_fruits)?;
342 Ok(Some(fruit))
343 } else {
344 Ok(None)
345 }
346 }
347}
348
349pub trait SegmentCollector: 'static {
355 type Fruit: Fruit;
358
359 fn collect(&mut self, doc: DocId, score: Score);
361
362 fn collect_block(&mut self, docs: &[DocId]) {
368 for doc in docs {
369 self.collect(*doc, 0.0);
370 }
371 }
372
373 fn harvest(self) -> Self::Fruit;
375}
376
377impl<Left, Right> Collector for (Left, Right)
381where
382 Left: Collector,
383 Right: Collector,
384{
385 type Fruit = (Left::Fruit, Right::Fruit);
386 type Child = (Left::Child, Right::Child);
387
388 fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
389 self.0.check_schema(schema)?;
390 self.1.check_schema(schema)?;
391 Ok(())
392 }
393
394 fn for_segment(
395 &self,
396 segment_local_id: u32,
397 segment: &SegmentReader,
398 ) -> crate::Result<Self::Child> {
399 let left = self.0.for_segment(segment_local_id, segment)?;
400 let right = self.1.for_segment(segment_local_id, segment)?;
401 Ok((left, right))
402 }
403
404 fn requires_scoring(&self) -> bool {
405 self.0.requires_scoring() || self.1.requires_scoring()
406 }
407
408 fn merge_fruits(
409 &self,
410 segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
411 ) -> crate::Result<(Left::Fruit, Right::Fruit)> {
412 let mut left_fruits = vec![];
413 let mut right_fruits = vec![];
414 for (left_fruit, right_fruit) in segment_fruits {
415 left_fruits.push(left_fruit);
416 right_fruits.push(right_fruit);
417 }
418 Ok((
419 self.0.merge_fruits(left_fruits)?,
420 self.1.merge_fruits(right_fruits)?,
421 ))
422 }
423}
424
425impl<Left, Right> SegmentCollector for (Left, Right)
426where
427 Left: SegmentCollector,
428 Right: SegmentCollector,
429{
430 type Fruit = (Left::Fruit, Right::Fruit);
431
432 fn collect(&mut self, doc: DocId, score: Score) {
433 self.0.collect(doc, score);
434 self.1.collect(doc, score);
435 }
436
437 fn collect_block(&mut self, docs: &[DocId]) {
438 self.0.collect_block(docs);
439 self.1.collect_block(docs);
440 }
441
442 fn harvest(self) -> <Self as SegmentCollector>::Fruit {
443 (self.0.harvest(), self.1.harvest())
444 }
445}
446
447impl<One, Two, Three> Collector for (One, Two, Three)
450where
451 One: Collector,
452 Two: Collector,
453 Three: Collector,
454{
455 type Fruit = (One::Fruit, Two::Fruit, Three::Fruit);
456 type Child = (One::Child, Two::Child, Three::Child);
457
458 fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
459 self.0.check_schema(schema)?;
460 self.1.check_schema(schema)?;
461 self.2.check_schema(schema)?;
462 Ok(())
463 }
464
465 fn for_segment(
466 &self,
467 segment_local_id: u32,
468 segment: &SegmentReader,
469 ) -> crate::Result<Self::Child> {
470 let one = self.0.for_segment(segment_local_id, segment)?;
471 let two = self.1.for_segment(segment_local_id, segment)?;
472 let three = self.2.for_segment(segment_local_id, segment)?;
473 Ok((one, two, three))
474 }
475
476 fn requires_scoring(&self) -> bool {
477 self.0.requires_scoring() || self.1.requires_scoring() || self.2.requires_scoring()
478 }
479
480 fn merge_fruits(
481 &self,
482 children: Vec<<Self::Child as SegmentCollector>::Fruit>,
483 ) -> crate::Result<Self::Fruit> {
484 let mut one_fruits = vec![];
485 let mut two_fruits = vec![];
486 let mut three_fruits = vec![];
487 for (one_fruit, two_fruit, three_fruit) in children {
488 one_fruits.push(one_fruit);
489 two_fruits.push(two_fruit);
490 three_fruits.push(three_fruit);
491 }
492 Ok((
493 self.0.merge_fruits(one_fruits)?,
494 self.1.merge_fruits(two_fruits)?,
495 self.2.merge_fruits(three_fruits)?,
496 ))
497 }
498}
499
500impl<One, Two, Three> SegmentCollector for (One, Two, Three)
501where
502 One: SegmentCollector,
503 Two: SegmentCollector,
504 Three: SegmentCollector,
505{
506 type Fruit = (One::Fruit, Two::Fruit, Three::Fruit);
507
508 fn collect(&mut self, doc: DocId, score: Score) {
509 self.0.collect(doc, score);
510 self.1.collect(doc, score);
511 self.2.collect(doc, score);
512 }
513
514 fn collect_block(&mut self, docs: &[DocId]) {
515 self.0.collect_block(docs);
516 self.1.collect_block(docs);
517 self.2.collect_block(docs);
518 }
519
520 fn harvest(self) -> <Self as SegmentCollector>::Fruit {
521 (self.0.harvest(), self.1.harvest(), self.2.harvest())
522 }
523}
524
525impl<One, Two, Three, Four> Collector for (One, Two, Three, Four)
528where
529 One: Collector,
530 Two: Collector,
531 Three: Collector,
532 Four: Collector,
533{
534 type Fruit = (One::Fruit, Two::Fruit, Three::Fruit, Four::Fruit);
535 type Child = (One::Child, Two::Child, Three::Child, Four::Child);
536
537 fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
538 self.0.check_schema(schema)?;
539 self.1.check_schema(schema)?;
540 self.2.check_schema(schema)?;
541 self.3.check_schema(schema)?;
542 Ok(())
543 }
544
545 fn for_segment(
546 &self,
547 segment_local_id: u32,
548 segment: &SegmentReader,
549 ) -> crate::Result<Self::Child> {
550 let one = self.0.for_segment(segment_local_id, segment)?;
551 let two = self.1.for_segment(segment_local_id, segment)?;
552 let three = self.2.for_segment(segment_local_id, segment)?;
553 let four = self.3.for_segment(segment_local_id, segment)?;
554 Ok((one, two, three, four))
555 }
556
557 fn requires_scoring(&self) -> bool {
558 self.0.requires_scoring()
559 || self.1.requires_scoring()
560 || self.2.requires_scoring()
561 || self.3.requires_scoring()
562 }
563
564 fn merge_fruits(
565 &self,
566 children: Vec<<Self::Child as SegmentCollector>::Fruit>,
567 ) -> crate::Result<Self::Fruit> {
568 let mut one_fruits = vec![];
569 let mut two_fruits = vec![];
570 let mut three_fruits = vec![];
571 let mut four_fruits = vec![];
572 for (one_fruit, two_fruit, three_fruit, four_fruit) in children {
573 one_fruits.push(one_fruit);
574 two_fruits.push(two_fruit);
575 three_fruits.push(three_fruit);
576 four_fruits.push(four_fruit);
577 }
578 Ok((
579 self.0.merge_fruits(one_fruits)?,
580 self.1.merge_fruits(two_fruits)?,
581 self.2.merge_fruits(three_fruits)?,
582 self.3.merge_fruits(four_fruits)?,
583 ))
584 }
585}
586
587impl<One, Two, Three, Four> SegmentCollector for (One, Two, Three, Four)
588where
589 One: SegmentCollector,
590 Two: SegmentCollector,
591 Three: SegmentCollector,
592 Four: SegmentCollector,
593{
594 type Fruit = (One::Fruit, Two::Fruit, Three::Fruit, Four::Fruit);
595
596 fn collect(&mut self, doc: DocId, score: Score) {
597 self.0.collect(doc, score);
598 self.1.collect(doc, score);
599 self.2.collect(doc, score);
600 self.3.collect(doc, score);
601 }
602
603 fn collect_block(&mut self, docs: &[DocId]) {
604 self.0.collect_block(docs);
605 self.1.collect_block(docs);
606 self.2.collect_block(docs);
607 self.3.collect_block(docs);
608 }
609
610 fn harvest(self) -> <Self as SegmentCollector>::Fruit {
611 (
612 self.0.harvest(),
613 self.1.harvest(),
614 self.2.harvest(),
615 self.3.harvest(),
616 )
617 }
618}
619
620impl_downcast!(Fruit);
621
622#[cfg(test)]
623pub(crate) mod tests;