Skip to main content

ld_lucivy/collector/
mod.rs

1//! # Collectors
2//!
3//! Collectors define the information you want to extract from the documents matching the queries.
4//! In lucivy jargon, we call this information your search "fruit".
5//!
6//! Your fruit could for instance be :
7//! - [the count of matching documents](crate::collector::Count)
8//! - [the top 10 documents, by relevancy or by a fast field](crate::collector::TopDocs)
9//! - [facet counts](FacetCollector)
10//!
11//! At some point in your code, you will trigger the actual search operation by calling
12//! [`Searcher::search()`](crate::Searcher::search).
13//! This call will look like this:
14//!
15//! ```verbatim
16//! let fruit = searcher.search(&query, &collector)?;
17//! ```
18//!
19//! Here the type of fruit is actually determined as an associated type of the collector
20//! (`Collector::Fruit`).
21//!
22//!
23//! # Combining several collectors
24//!
//! A rich search experience often requires running several collectors on your search query.
26//! For instance,
27//! - selecting the top-K products matching your query
28//! - counting the matching documents
29//! - computing several facets
30//! - computing statistics about the matching product prices
31//!
32//! A simple and efficient way to do that is to pass your collectors as one tuple.
33//! The resulting `Fruit` will then be a typed tuple with each collector's original fruits
34//! in their respective position.
35//!
36//! ```rust
37//! # use lucivy::schema::*;
38//! # use lucivy::*;
39//! # use lucivy::query::*;
40//! use lucivy::collector::{Count, TopDocs};
41//! #
42//! # fn main() -> lucivy::Result<()> {
43//! # let mut schema_builder = Schema::builder();
44//! #     let title = schema_builder.add_text_field("title", TEXT);
45//! #     let schema = schema_builder.build();
46//! #     let index = Index::create_in_ram(schema);
47//! #     let mut index_writer = index.writer(15_000_000)?;
48//! #       index_writer.add_document(doc!(
49//! #       title => "The Name of the Wind",
50//! #      ))?;
51//! #     index_writer.add_document(doc!(
52//! #        title => "The Diary of Muadib",
53//! #     ))?;
54//! #     index_writer.commit()?;
55//! #     let reader = index.reader()?;
56//! #     let searcher = reader.searcher();
57//! #     let query_parser = QueryParser::for_index(&index, vec![title]);
58//! #     let query = query_parser.parse_query("diary")?;
59//! let (doc_count, top_docs): (usize, Vec<(Score, DocAddress)>) =
60//! searcher.search(&query, &(Count, TopDocs::with_limit(2).order_by_score()))?;
61//! #     Ok(())
62//! # }
63//! ```
64//!
65//! The `Collector` trait is implemented for up to 4 collectors.
66//! If you have more than 4 collectors, you can either group them into
67//! tuples of tuples `(a,(b,(c,d)))`, or rely on [`MultiCollector`].
68//!
69//! # Combining several collectors dynamically
70//!
71//! Combining collectors into a tuple is a zero-cost abstraction: everything
72//! happens as if you had manually implemented a single collector
73//! combining all of our features.
74//!
75//! Unfortunately it requires you to know at compile time your collector types.
76//! If on the other hand, the collectors depend on some query parameter,
//! you can rely on [`MultiCollector`].
78//!
79//!
80//! # Implementing your own collectors.
81//!
82//! See the `custom_collector` example.
83
84use async_trait::async_trait;
85use downcast_rs::impl_downcast;
86
87use crate::schema::Schema;
88use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
89
90mod count_collector;
91pub use self::count_collector::Count;
92
93/// Sort keys
94pub mod sort_key;
95
96mod histogram_collector;
97pub use histogram_collector::HistogramCollector;
98
99mod multi_collector;
100pub use self::multi_collector::{FruitHandle, MultiCollector, MultiFruit};
101
102mod top_collector;
103pub use self::top_collector::ComparableDoc;
104
105mod top_score_collector;
106pub use self::top_score_collector::{TopDocs, TopNComputer};
107
108mod sort_key_top_collector;
109pub use self::sort_key::{SegmentSortKeyComputer, SortKeyComputer};
110mod facet_collector;
111pub use self::facet_collector::{FacetCollector, FacetCounts};
112use crate::query::Weight;
113
114mod docset_collector;
115pub use self::docset_collector::DocSetCollector;
116
117mod filter_collector_wrapper;
118pub use self::filter_collector_wrapper::{BytesFilterCollector, FilterCollector};
119
120/// `Fruit` is the type for the result of our collection.
121/// e.g. `usize` for the `Count` collector.
122pub trait Fruit: Send + downcast_rs::Downcast {}
123
124impl<T> Fruit for T where T: Send + downcast_rs::Downcast {}
125
126/// Collectors are in charge of collecting and retaining relevant
127/// information from the document found and scored by the query.
128///
129/// For instance,
130///
131/// - keeping track of the top 10 best documents
132/// - computing a breakdown over a fast field
133/// - computing the number of documents matching the query
134///
135/// Our search index is in fact a collection of segments, so
136/// a `Collector` trait is actually more of a factory to instance
137/// `SegmentCollector`s for each segments.
138///
139/// The collection logic itself is in the `SegmentCollector`.
140///
141/// Segments are not guaranteed to be visited in any specific order.
142#[async_trait]
143pub trait Collector: Sync + Send {
144    /// `Fruit` is the type for the result of our collection.
145    /// e.g. `usize` for the `Count` collector.
146    type Fruit: Fruit;
147
148    /// Type of the `SegmentCollector` associated with this collector.
149    type Child: SegmentCollector;
150
151    /// Returns an error if the schema is not compatible with the collector.
152    fn check_schema(&self, _schema: &Schema) -> crate::Result<()> {
153        Ok(())
154    }
155
156    /// `set_segment` is called before beginning to enumerate
157    /// on this segment.
158    fn for_segment(
159        &self,
160        segment_local_id: SegmentOrdinal,
161        segment: &SegmentReader,
162    ) -> crate::Result<Self::Child>;
163
164    /// Returns true iff the collector requires to compute scores for documents.
165    fn requires_scoring(&self) -> bool;
166
167    /// Combines the fruit associated with the collection of each segments
168    /// into one fruit.
169    fn merge_fruits(
170        &self,
171        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
172    ) -> crate::Result<Self::Fruit>;
173
174    /// Created a segment collector and
175    fn collect_segment(
176        &self,
177        weight: &dyn Weight,
178        segment_ord: u32,
179        reader: &SegmentReader,
180    ) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
181        let with_scoring = self.requires_scoring();
182        let mut segment_collector = self.for_segment(segment_ord, reader)?;
183        default_collect_segment_impl(&mut segment_collector, weight, reader, with_scoring)?;
184        Ok(segment_collector.harvest())
185    }
186
187    /// Creates a segment collector asynchronously
188    #[cfg(feature = "quickwit")]
189    async fn for_segment_async(
190        &self,
191        segment_local_id: SegmentOrdinal,
192        segment: &SegmentReader,
193    ) -> crate::Result<Self::Child> {
194        self.for_segment(segment_local_id, segment)
195    }
196
197    /// Created a segment collector in async way
198    #[cfg(feature = "quickwit")]
199    async fn collect_segment_async(
200        &self,
201        weight: &dyn Weight,
202        segment_ord: u32,
203        reader: &SegmentReader,
204    ) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
205        let mut segment_collector = self.for_segment_async(segment_ord, reader).await?;
206        match (reader.alive_bitset(), self.requires_scoring()) {
207            (Some(alive_bitset), true) => {
208                let cb = &mut |doc, score| {
209                    if alive_bitset.is_alive(doc) {
210                        segment_collector.collect(doc, score);
211                    }
212                };
213                weight.for_each_async(reader, cb).await?;
214            }
215            (Some(alive_bitset), false) => {
216                let cb = &mut |docs: &[DocId]| {
217                    for doc in docs.iter().cloned() {
218                        if alive_bitset.is_alive(doc) {
219                            segment_collector.collect(doc, 0.0);
220                        }
221                    }
222                };
223                weight.for_each_no_score_async(reader, cb).await?;
224            }
225            (None, true) => {
226                let cb = &mut |doc, score| {
227                    segment_collector.collect(doc, score);
228                };
229                weight.for_each_async(reader, cb).await?;
230            }
231            (None, false) => {
232                let cb = &mut |docs: &[DocId]| {
233                    for doc in docs.iter().cloned() {
234                        segment_collector.collect(doc, 0.0);
235                    }
236                };
237                weight.for_each_no_score_async(reader, cb).await?;
238            }
239        }
240        Ok(segment_collector.harvest())
241    }
242}
243
244pub(crate) fn default_collect_segment_impl<TSegmentCollector: SegmentCollector>(
245    segment_collector: &mut TSegmentCollector,
246    weight: &dyn Weight,
247    reader: &SegmentReader,
248    with_scoring: bool,
249) -> crate::Result<()> {
250    match (reader.alive_bitset(), with_scoring) {
251        (Some(alive_bitset), true) => {
252            weight.for_each(reader, &mut |doc, score| {
253                if alive_bitset.is_alive(doc) {
254                    segment_collector.collect(doc, score);
255                }
256            })?;
257        }
258        (Some(alive_bitset), false) => {
259            weight.for_each_no_score(reader, &mut |docs| {
260                for doc in docs.iter().cloned() {
261                    if alive_bitset.is_alive(doc) {
262                        segment_collector.collect(doc, 0.0);
263                    }
264                }
265            })?;
266        }
267        (None, true) => {
268            weight.for_each(reader, &mut |doc, score| {
269                segment_collector.collect(doc, score);
270            })?;
271        }
272        (None, false) => {
273            weight.for_each_no_score(reader, &mut |docs| {
274                segment_collector.collect_block(docs);
275            })?;
276        }
277    }
278    Ok(())
279}
280
281impl<TSegmentCollector: SegmentCollector> SegmentCollector for Option<TSegmentCollector> {
282    type Fruit = Option<TSegmentCollector::Fruit>;
283
284    fn collect(&mut self, doc: DocId, score: Score) {
285        if let Some(segment_collector) = self {
286            segment_collector.collect(doc, score);
287        }
288    }
289
290    fn collect_block(&mut self, docs: &[DocId]) {
291        if let Some(segment_collector) = self {
292            segment_collector.collect_block(docs);
293        }
294    }
295
296    fn harvest(self) -> Self::Fruit {
297        self.map(|segment_collector| segment_collector.harvest())
298    }
299}
300
301impl<TCollector: Collector> Collector for Option<TCollector> {
302    type Fruit = Option<TCollector::Fruit>;
303
304    type Child = Option<<TCollector as Collector>::Child>;
305
306    fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
307        if let Some(underlying_collector) = self {
308            underlying_collector.check_schema(schema)?;
309        }
310        Ok(())
311    }
312
313    fn for_segment(
314        &self,
315        segment_local_id: SegmentOrdinal,
316        segment: &SegmentReader,
317    ) -> crate::Result<Self::Child> {
318        Ok(if let Some(inner) = self {
319            let inner_segment_collector = inner.for_segment(segment_local_id, segment)?;
320            Some(inner_segment_collector)
321        } else {
322            None
323        })
324    }
325
326    fn requires_scoring(&self) -> bool {
327        self.as_ref()
328            .map(|inner| inner.requires_scoring())
329            .unwrap_or(false)
330    }
331
332    fn merge_fruits(
333        &self,
334        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
335    ) -> crate::Result<Self::Fruit> {
336        if let Some(inner) = self.as_ref() {
337            let inner_segment_fruits: Vec<_> = segment_fruits
338                .into_iter()
339                .flat_map(|fruit_opt| fruit_opt.into_iter())
340                .collect();
341            let fruit = inner.merge_fruits(inner_segment_fruits)?;
342            Ok(Some(fruit))
343        } else {
344            Ok(None)
345        }
346    }
347}
348
349/// The `SegmentCollector` is the trait in charge of defining the
350/// collect operation at the scale of the segment.
351///
352/// `.collect(doc, score)` will be called for every documents
353/// matching the query.
354pub trait SegmentCollector: 'static {
355    /// `Fruit` is the type for the result of our collection.
356    /// e.g. `usize` for the `Count` collector.
357    type Fruit: Fruit;
358
359    /// The query pushes the scored document to the collector via this method.
360    fn collect(&mut self, doc: DocId, score: Score);
361
362    /// The query pushes the scored document to the collector via this method.
363    /// This method is used when the collector does not require scoring.
364    ///
365    /// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
366    /// buffer size passed to the collector.
367    fn collect_block(&mut self, docs: &[DocId]) {
368        for doc in docs {
369            self.collect(*doc, 0.0);
370        }
371    }
372
373    /// Extract the fruit of the collection from the `SegmentCollector`.
374    fn harvest(self) -> Self::Fruit;
375}
376
377// -----------------------------------------------
378// Tuple implementations.
379
380impl<Left, Right> Collector for (Left, Right)
381where
382    Left: Collector,
383    Right: Collector,
384{
385    type Fruit = (Left::Fruit, Right::Fruit);
386    type Child = (Left::Child, Right::Child);
387
388    fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
389        self.0.check_schema(schema)?;
390        self.1.check_schema(schema)?;
391        Ok(())
392    }
393
394    fn for_segment(
395        &self,
396        segment_local_id: u32,
397        segment: &SegmentReader,
398    ) -> crate::Result<Self::Child> {
399        let left = self.0.for_segment(segment_local_id, segment)?;
400        let right = self.1.for_segment(segment_local_id, segment)?;
401        Ok((left, right))
402    }
403
404    fn requires_scoring(&self) -> bool {
405        self.0.requires_scoring() || self.1.requires_scoring()
406    }
407
408    fn merge_fruits(
409        &self,
410        segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
411    ) -> crate::Result<(Left::Fruit, Right::Fruit)> {
412        let mut left_fruits = vec![];
413        let mut right_fruits = vec![];
414        for (left_fruit, right_fruit) in segment_fruits {
415            left_fruits.push(left_fruit);
416            right_fruits.push(right_fruit);
417        }
418        Ok((
419            self.0.merge_fruits(left_fruits)?,
420            self.1.merge_fruits(right_fruits)?,
421        ))
422    }
423}
424
425impl<Left, Right> SegmentCollector for (Left, Right)
426where
427    Left: SegmentCollector,
428    Right: SegmentCollector,
429{
430    type Fruit = (Left::Fruit, Right::Fruit);
431
432    fn collect(&mut self, doc: DocId, score: Score) {
433        self.0.collect(doc, score);
434        self.1.collect(doc, score);
435    }
436
437    fn collect_block(&mut self, docs: &[DocId]) {
438        self.0.collect_block(docs);
439        self.1.collect_block(docs);
440    }
441
442    fn harvest(self) -> <Self as SegmentCollector>::Fruit {
443        (self.0.harvest(), self.1.harvest())
444    }
445}
446
447// 3-Tuple
448
449impl<One, Two, Three> Collector for (One, Two, Three)
450where
451    One: Collector,
452    Two: Collector,
453    Three: Collector,
454{
455    type Fruit = (One::Fruit, Two::Fruit, Three::Fruit);
456    type Child = (One::Child, Two::Child, Three::Child);
457
458    fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
459        self.0.check_schema(schema)?;
460        self.1.check_schema(schema)?;
461        self.2.check_schema(schema)?;
462        Ok(())
463    }
464
465    fn for_segment(
466        &self,
467        segment_local_id: u32,
468        segment: &SegmentReader,
469    ) -> crate::Result<Self::Child> {
470        let one = self.0.for_segment(segment_local_id, segment)?;
471        let two = self.1.for_segment(segment_local_id, segment)?;
472        let three = self.2.for_segment(segment_local_id, segment)?;
473        Ok((one, two, three))
474    }
475
476    fn requires_scoring(&self) -> bool {
477        self.0.requires_scoring() || self.1.requires_scoring() || self.2.requires_scoring()
478    }
479
480    fn merge_fruits(
481        &self,
482        children: Vec<<Self::Child as SegmentCollector>::Fruit>,
483    ) -> crate::Result<Self::Fruit> {
484        let mut one_fruits = vec![];
485        let mut two_fruits = vec![];
486        let mut three_fruits = vec![];
487        for (one_fruit, two_fruit, three_fruit) in children {
488            one_fruits.push(one_fruit);
489            two_fruits.push(two_fruit);
490            three_fruits.push(three_fruit);
491        }
492        Ok((
493            self.0.merge_fruits(one_fruits)?,
494            self.1.merge_fruits(two_fruits)?,
495            self.2.merge_fruits(three_fruits)?,
496        ))
497    }
498}
499
500impl<One, Two, Three> SegmentCollector for (One, Two, Three)
501where
502    One: SegmentCollector,
503    Two: SegmentCollector,
504    Three: SegmentCollector,
505{
506    type Fruit = (One::Fruit, Two::Fruit, Three::Fruit);
507
508    fn collect(&mut self, doc: DocId, score: Score) {
509        self.0.collect(doc, score);
510        self.1.collect(doc, score);
511        self.2.collect(doc, score);
512    }
513
514    fn collect_block(&mut self, docs: &[DocId]) {
515        self.0.collect_block(docs);
516        self.1.collect_block(docs);
517        self.2.collect_block(docs);
518    }
519
520    fn harvest(self) -> <Self as SegmentCollector>::Fruit {
521        (self.0.harvest(), self.1.harvest(), self.2.harvest())
522    }
523}
524
525// 4-Tuple
526
527impl<One, Two, Three, Four> Collector for (One, Two, Three, Four)
528where
529    One: Collector,
530    Two: Collector,
531    Three: Collector,
532    Four: Collector,
533{
534    type Fruit = (One::Fruit, Two::Fruit, Three::Fruit, Four::Fruit);
535    type Child = (One::Child, Two::Child, Three::Child, Four::Child);
536
537    fn check_schema(&self, schema: &Schema) -> crate::Result<()> {
538        self.0.check_schema(schema)?;
539        self.1.check_schema(schema)?;
540        self.2.check_schema(schema)?;
541        self.3.check_schema(schema)?;
542        Ok(())
543    }
544
545    fn for_segment(
546        &self,
547        segment_local_id: u32,
548        segment: &SegmentReader,
549    ) -> crate::Result<Self::Child> {
550        let one = self.0.for_segment(segment_local_id, segment)?;
551        let two = self.1.for_segment(segment_local_id, segment)?;
552        let three = self.2.for_segment(segment_local_id, segment)?;
553        let four = self.3.for_segment(segment_local_id, segment)?;
554        Ok((one, two, three, four))
555    }
556
557    fn requires_scoring(&self) -> bool {
558        self.0.requires_scoring()
559            || self.1.requires_scoring()
560            || self.2.requires_scoring()
561            || self.3.requires_scoring()
562    }
563
564    fn merge_fruits(
565        &self,
566        children: Vec<<Self::Child as SegmentCollector>::Fruit>,
567    ) -> crate::Result<Self::Fruit> {
568        let mut one_fruits = vec![];
569        let mut two_fruits = vec![];
570        let mut three_fruits = vec![];
571        let mut four_fruits = vec![];
572        for (one_fruit, two_fruit, three_fruit, four_fruit) in children {
573            one_fruits.push(one_fruit);
574            two_fruits.push(two_fruit);
575            three_fruits.push(three_fruit);
576            four_fruits.push(four_fruit);
577        }
578        Ok((
579            self.0.merge_fruits(one_fruits)?,
580            self.1.merge_fruits(two_fruits)?,
581            self.2.merge_fruits(three_fruits)?,
582            self.3.merge_fruits(four_fruits)?,
583        ))
584    }
585}
586
587impl<One, Two, Three, Four> SegmentCollector for (One, Two, Three, Four)
588where
589    One: SegmentCollector,
590    Two: SegmentCollector,
591    Three: SegmentCollector,
592    Four: SegmentCollector,
593{
594    type Fruit = (One::Fruit, Two::Fruit, Three::Fruit, Four::Fruit);
595
596    fn collect(&mut self, doc: DocId, score: Score) {
597        self.0.collect(doc, score);
598        self.1.collect(doc, score);
599        self.2.collect(doc, score);
600        self.3.collect(doc, score);
601    }
602
603    fn collect_block(&mut self, docs: &[DocId]) {
604        self.0.collect_block(docs);
605        self.1.collect_block(docs);
606        self.2.collect_block(docs);
607        self.3.collect_block(docs);
608    }
609
610    fn harvest(self) -> <Self as SegmentCollector>::Fruit {
611        (
612            self.0.harvest(),
613            self.1.harvest(),
614            self.2.harvest(),
615            self.3.harvest(),
616        )
617    }
618}
619
// Invokes `downcast_rs::impl_downcast!` for the `Fruit` trait.
// NOTE(review): presumably this is what lets a type-erased `dyn Fruit` be
// downcast back to its concrete type — confirm against the downcast-rs docs.
impl_downcast!(Fruit);
621
622#[cfg(test)]
623pub(crate) mod tests;