milli_core/documents/builder.rs

1use std::io::{self, Write};
2
3use grenad::{CompressionType, WriterBuilder};
4use serde::de::Deserializer;
5use serde_json::to_writer;
6
7use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
8use crate::documents::serde_impl::DocumentVisitor;
9use crate::Object;
10
11/// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
12/// format used by milli.
13///
14/// The writer used by the `DocumentsBatchBuilder` can be read using a `DocumentsBatchReader`
15/// to iterate over the documents.
16///
17/// ## example:
18/// ```
19/// use serde_json::json;
20/// use milli_core::documents::DocumentsBatchBuilder;
21///
22/// let json = json!({ "id": 1, "name": "foo" });
23///
24/// let mut builder = DocumentsBatchBuilder::new(Vec::new());
25/// builder.append_json_object(json.as_object().unwrap()).unwrap();
26/// let _vector = builder.into_inner().unwrap();
27/// ```
28pub struct DocumentsBatchBuilder<W> {
29    /// The inner grenad writer, the last value must always be the `DocumentsBatchIndex`.
30    writer: grenad::Writer<W>,
31    /// A map that creates the relation between field ids and field names.
32    fields_index: DocumentsBatchIndex,
33    /// The number of documents that were added to this builder,
34    /// it doesn't take the primary key of the documents into account at this point.
35    documents_count: u32,
36
37    /// A buffer to store a temporary obkv buffer and avoid reallocating.
38    obkv_buffer: Vec<u8>,
39    /// A buffer to serialize the values and avoid reallocating,
40    /// serialized values are stored in an obkv.
41    value_buffer: Vec<u8>,
42}
43
44impl<W: Write> DocumentsBatchBuilder<W> {
45    pub fn new(writer: W) -> DocumentsBatchBuilder<W> {
46        DocumentsBatchBuilder {
47            writer: WriterBuilder::new().compression_type(CompressionType::None).build(writer),
48            fields_index: DocumentsBatchIndex::default(),
49            documents_count: 0,
50            obkv_buffer: Vec::new(),
51            value_buffer: Vec::new(),
52        }
53    }
54
55    /// Returns the number of documents inserted into this builder.
56    pub fn documents_count(&self) -> u32 {
57        self.documents_count
58    }
59
60    /// Appends a new JSON object into the batch and updates the `DocumentsBatchIndex` accordingly.
61    pub fn append_json_object(&mut self, object: &Object) -> io::Result<()> {
62        // Make sure that we insert the fields ids in order as the obkv writer has this requirement.
63        let mut fields_ids: Vec<_> = object.keys().map(|k| self.fields_index.insert(k)).collect();
64        fields_ids.sort_unstable();
65
66        self.obkv_buffer.clear();
67        let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
68        for field_id in fields_ids {
69            let key = self.fields_index.name(field_id).unwrap();
70            self.value_buffer.clear();
71            to_writer(&mut self.value_buffer, &object[key])?;
72            writer.insert(field_id, &self.value_buffer)?;
73        }
74
75        let internal_id = self.documents_count.to_be_bytes();
76        let document_bytes = writer.into_inner()?;
77        self.writer.insert(internal_id, &document_bytes)?;
78        self.documents_count += 1;
79
80        Ok(())
81    }
82
83    /// Appends a new JSON array of objects into the batch and updates the `DocumentsBatchIndex` accordingly.
84    pub fn append_json_array<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
85        let mut de = serde_json::Deserializer::from_reader(reader);
86        let mut visitor = DocumentVisitor::new(self);
87        de.deserialize_any(&mut visitor)?
88    }
89
90    /// Flushes the content on disk and stores the final version of the `DocumentsBatchIndex`.
91    pub fn into_inner(mut self) -> io::Result<W> {
92        let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
93
94        // We serialize and insert the `DocumentsBatchIndex` as the last key of the grenad writer.
95        self.value_buffer.clear();
96        to_writer(&mut self.value_buffer, &fields_index)?;
97        writer.insert(DOCUMENTS_BATCH_INDEX_KEY, &self.value_buffer)?;
98
99        writer.into_inner()
100    }
101}
102
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use super::*;
    use crate::documents::DocumentsBatchReader;

    /// Two objects with overlapping keys must share field ids in the index, and each must
    /// keep its own number of fields once read back.
    #[test]
    fn add_single_documents_json() {
        let first = serde_json::json!({
            "id": 1,
            "field": "hello!",
        });
        let second = serde_json::json!({
            "blabla": false,
            "field": "hello!",
            "id": 1,
        });

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        for doc in [&first, &second] {
            builder.append_json_object(doc.as_object().unwrap()).unwrap();
        }
        assert_eq!(builder.documents_count(), 2);

        let bytes = builder.into_inner().unwrap();
        let reader = DocumentsBatchReader::from_reader(Cursor::new(bytes)).unwrap();
        let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
        assert_eq!(fields_index.len(), 3);

        for expected_fields in [2, 3] {
            let document = cursor.next_document().unwrap().unwrap();
            assert_eq!(document.iter().count(), expected_fields);
        }

        assert!(cursor.next_document().unwrap().is_none());
    }
}