milli_core/documents/
enriched.rs

1use std::fs::File;
2use std::io::BufReader;
3use std::{io, str};
4
5use obkv::KvReader;
6
7use super::{
8    DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
9    Error,
10};
11use crate::update::DocumentId;
12use crate::FieldId;
13
/// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that are
/// read through a `DocumentsBatchReader` and, for the enriched data (the external
/// document ids), through a `grenad::Reader<BufReader<File>>`.
///
/// The documents are returned in the form of `obkv::KvReader` where each field is identified
/// with a `FieldId`. The mapping between the field ids and the field names is done thanks to
/// the index.
pub struct EnrichedDocumentsBatchReader<R> {
    /// The underlying batch of documents.
    documents: DocumentsBatchReader<R>,
    /// The primary key associated with this batch, as given to `new`.
    primary_key: String,
    /// Cursor over the enriched data: one entry per document, whose value is the
    /// JSON-serialized `DocumentId` of that document.
    external_ids: grenad::ReaderCursor<BufReader<File>>,
}
25
26impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
27    pub fn new(
28        documents: DocumentsBatchReader<R>,
29        primary_key: String,
30        external_ids: grenad::Reader<BufReader<File>>,
31    ) -> Result<Self, Error> {
32        if documents.documents_count() as u64 == external_ids.len() {
33            Ok(EnrichedDocumentsBatchReader {
34                documents,
35                primary_key,
36                external_ids: external_ids.into_cursor()?,
37            })
38        } else {
39            Err(Error::InvalidEnrichedData)
40        }
41    }
42
43    pub fn documents_count(&self) -> u32 {
44        self.documents.documents_count()
45    }
46
47    pub fn primary_key(&self) -> &str {
48        &self.primary_key
49    }
50
51    pub fn is_empty(&self) -> bool {
52        self.documents.is_empty()
53    }
54
55    pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
56        self.documents.documents_batch_index()
57    }
58
59    /// This method returns a forward cursor over the enriched documents.
60    pub fn into_cursor_and_fields_index(
61        self,
62    ) -> (EnrichedDocumentsBatchCursor<R>, DocumentsBatchIndex) {
63        let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self;
64        let (documents, fields_index) = documents.into_cursor_and_fields_index();
65        external_ids.reset();
66        (EnrichedDocumentsBatchCursor { documents, primary_key, external_ids }, fields_index)
67    }
68}
69
/// A document borrowed from the batch, paired with its document id.
#[derive(Debug, Clone)]
pub struct EnrichedDocument<'a> {
    /// The document itself, with each field identified by a `FieldId`.
    pub document: &'a KvReader<FieldId>,
    /// The id of the document, deserialized from the enriched data.
    pub document_id: DocumentId,
}
75
/// A cursor that walks a documents batch and its external ids side by side,
/// yielding one `EnrichedDocument` per step.
pub struct EnrichedDocumentsBatchCursor<R> {
    /// Cursor over the documents of the batch.
    documents: DocumentsBatchCursor<R>,
    /// The primary key associated with the batch.
    primary_key: String,
    /// Cursor over the enriched data; each value is a JSON-serialized `DocumentId`.
    external_ids: grenad::ReaderCursor<BufReader<File>>,
}
81
82impl<R> EnrichedDocumentsBatchCursor<R> {
83    pub fn primary_key(&self) -> &str {
84        &self.primary_key
85    }
86    /// Resets the cursor to be able to read from the start again.
87    pub fn reset(&mut self) {
88        self.documents.reset();
89        self.external_ids.reset();
90    }
91}
92
93impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
94    /// Returns the next document, starting from the first one. Subsequent calls to
95    /// `next_document` advance the document reader until all the documents have been read.
96    pub fn next_enriched_document(
97        &mut self,
98    ) -> Result<Option<EnrichedDocument<'_>>, DocumentsBatchCursorError> {
99        let document = self.documents.next_document()?;
100        let document_id = match self.external_ids.move_on_next()? {
101            Some((_, bytes)) => serde_json::from_slice(bytes).map(Some)?,
102            None => None,
103        };
104
105        match document.zip(document_id) {
106            Some((document, document_id)) => Ok(Some(EnrichedDocument { document, document_id })),
107            None => Ok(None),
108        }
109    }
110}