milli_core/documents/
reader.rs1use std::convert::TryInto;
2use std::{error, fmt, io};
3
4use obkv::KvReader;
5
6use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
7use crate::FieldId;
8
9pub struct DocumentsBatchReader<R> {
15 cursor: grenad::ReaderCursor<R>,
16 fields_index: DocumentsBatchIndex,
17}
18
19impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
20 pub fn new(cursor: DocumentsBatchCursor<R>, fields_index: DocumentsBatchIndex) -> Self {
21 Self { cursor: cursor.cursor, fields_index }
22 }
23
24 #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
29 pub fn from_reader(reader: R) -> Result<Self, Error> {
30 let reader = grenad::Reader::new(reader)?;
31 let mut cursor = reader.into_cursor()?;
32
33 let fields_index = match cursor.move_on_key_equal_to(DOCUMENTS_BATCH_INDEX_KEY)? {
34 Some((_, value)) => serde_json::from_slice(value).map_err(Error::Serialize)?,
35 None => return Err(Error::InvalidDocumentFormat),
36 };
37
38 Ok(DocumentsBatchReader { cursor, fields_index })
39 }
40
41 pub fn documents_count(&self) -> u32 {
42 self.cursor.len().saturating_sub(1).try_into().expect("Invalid number of documents")
43 }
44
45 pub fn is_empty(&self) -> bool {
46 self.cursor.len().saturating_sub(1) == 0
47 }
48
49 pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
50 &self.fields_index
51 }
52
53 pub fn into_cursor_and_fields_index(self) -> (DocumentsBatchCursor<R>, DocumentsBatchIndex) {
55 let DocumentsBatchReader { cursor, fields_index } = self;
56 let mut cursor = DocumentsBatchCursor { cursor };
57 cursor.reset();
58 (cursor, fields_index)
59 }
60}
61
62pub struct DocumentsBatchCursor<R> {
64 cursor: grenad::ReaderCursor<R>,
65}
66
67impl<R> DocumentsBatchCursor<R> {
68 pub fn reset(&mut self) {
70 self.cursor.reset();
71 }
72}
73
74impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
75 pub fn get(
77 &mut self,
78 offset: u32,
79 ) -> Result<Option<&KvReader<FieldId>>, DocumentsBatchCursorError> {
80 match self.cursor.move_on_key_equal_to(offset.to_be_bytes())? {
81 Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())),
82 _otherwise => Ok(None),
83 }
84 }
85
86 pub fn next_document(
89 &mut self,
90 ) -> Result<Option<&KvReader<FieldId>>, DocumentsBatchCursorError> {
91 match self.cursor.move_on_next()? {
92 Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => Ok(Some(value.into())),
93 _otherwise => Ok(None),
94 }
95 }
96}
97
98#[derive(Debug)]
100pub enum DocumentsBatchCursorError {
101 Grenad(grenad::Error),
102 SerdeJson(serde_json::Error),
103}
104
105impl From<grenad::Error> for DocumentsBatchCursorError {
106 fn from(error: grenad::Error) -> DocumentsBatchCursorError {
107 DocumentsBatchCursorError::Grenad(error)
108 }
109}
110
111impl From<serde_json::Error> for DocumentsBatchCursorError {
112 fn from(error: serde_json::Error) -> DocumentsBatchCursorError {
113 DocumentsBatchCursorError::SerdeJson(error)
114 }
115}
116
117impl error::Error for DocumentsBatchCursorError {}
118
119impl fmt::Display for DocumentsBatchCursorError {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 match self {
122 DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
123 DocumentsBatchCursorError::SerdeJson(e) => e.fmt(f),
124 }
125 }
126}