milli_core/documents/
reader.rs

1use std::convert::TryInto;
2use std::{error, fmt, io};
3
4use obkv::KvReader;
5
6use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
7use crate::FieldId;
8
9/// The `DocumentsBatchReader` provides a way to iterate over documents that have been created with
10/// a `DocumentsBatchWriter`.
11///
12/// The documents are returned in the form of `obkv::Reader` where each field is identified with a
13/// `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
14pub struct DocumentsBatchReader<R> {
15    cursor: grenad::ReaderCursor<R>,
16    fields_index: DocumentsBatchIndex,
17}
18
19impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
20    pub fn new(cursor: DocumentsBatchCursor<R>, fields_index: DocumentsBatchIndex) -> Self {
21        Self { cursor: cursor.cursor, fields_index }
22    }
23
24    /// Construct a `DocumentsReader` from a reader.
25    ///
26    /// It first retrieves the index, then moves to the first document. Use the `into_cursor`
27    /// method to iterator over the documents, from the first to the last.
28    #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
29    pub fn from_reader(reader: R) -> Result<Self, Error> {
30        let reader = grenad::Reader::new(reader)?;
31        let mut cursor = reader.into_cursor()?;
32
33        let fields_index = match cursor.move_on_key_equal_to(DOCUMENTS_BATCH_INDEX_KEY)? {
34            Some((_, value)) => serde_json::from_slice(value).map_err(Error::Serialize)?,
35            None => return Err(Error::InvalidDocumentFormat),
36        };
37
38        Ok(DocumentsBatchReader { cursor, fields_index })
39    }
40
41    pub fn documents_count(&self) -> u32 {
42        self.cursor.len().saturating_sub(1).try_into().expect("Invalid number of documents")
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.cursor.len().saturating_sub(1) == 0
47    }
48
49    pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
50        &self.fields_index
51    }
52
53    /// This method returns a forward cursor over the documents.
54    pub fn into_cursor_and_fields_index(self) -> (DocumentsBatchCursor<R>, DocumentsBatchIndex) {
55        let DocumentsBatchReader { cursor, fields_index } = self;
56        let mut cursor = DocumentsBatchCursor { cursor };
57        cursor.reset();
58        (cursor, fields_index)
59    }
60}
61
62/// A forward cursor over the documents in a `DocumentsBatchReader`.
63pub struct DocumentsBatchCursor<R> {
64    cursor: grenad::ReaderCursor<R>,
65}
66
67impl<R> DocumentsBatchCursor<R> {
68    /// Resets the cursor to be able to read from the start again.
69    pub fn reset(&mut self) {
70        self.cursor.reset();
71    }
72}
73
74impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
75    /// Returns a single document from the database.
76    pub fn get(
77        &mut self,
78        offset: u32,
79    ) -> Result<Option<&KvReader<FieldId>>, DocumentsBatchCursorError> {
80        match self.cursor.move_on_key_equal_to(offset.to_be_bytes())? {
81            Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())),
82            _otherwise => Ok(None),
83        }
84    }
85
86    /// Returns the next document, starting from the first one. Subsequent calls to
87    /// `next_document` advance the document reader until all the documents have been read.
88    pub fn next_document(
89        &mut self,
90    ) -> Result<Option<&KvReader<FieldId>>, DocumentsBatchCursorError> {
91        match self.cursor.move_on_next()? {
92            Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())),
93            _otherwise => Ok(None),
94        }
95    }
96}
97
98/// The possible error thrown by the `DocumentsBatchCursor` when iterating on the documents.
99#[derive(Debug)]
100pub enum DocumentsBatchCursorError {
101    Grenad(grenad::Error),
102    SerdeJson(serde_json::Error),
103}
104
105impl From<grenad::Error> for DocumentsBatchCursorError {
106    fn from(error: grenad::Error) -> DocumentsBatchCursorError {
107        DocumentsBatchCursorError::Grenad(error)
108    }
109}
110
111impl From<serde_json::Error> for DocumentsBatchCursorError {
112    fn from(error: serde_json::Error) -> DocumentsBatchCursorError {
113        DocumentsBatchCursorError::SerdeJson(error)
114    }
115}
116
117impl error::Error for DocumentsBatchCursorError {}
118
119impl fmt::Display for DocumentsBatchCursorError {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        match self {
122            DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
123            DocumentsBatchCursorError::SerdeJson(e) => e.fmt(f),
124        }
125    }
126}