milli_core/documents/
builder.rs1use std::io::{self, Write};
2
3use grenad::{CompressionType, WriterBuilder};
4use serde::de::Deserializer;
5use serde_json::to_writer;
6
7use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
8use crate::documents::serde_impl::DocumentVisitor;
9use crate::Object;
10
11pub struct DocumentsBatchBuilder<W> {
29 writer: grenad::Writer<W>,
31 fields_index: DocumentsBatchIndex,
33 documents_count: u32,
36
37 obkv_buffer: Vec<u8>,
39 value_buffer: Vec<u8>,
42}
43
44impl<W: Write> DocumentsBatchBuilder<W> {
45 pub fn new(writer: W) -> DocumentsBatchBuilder<W> {
46 DocumentsBatchBuilder {
47 writer: WriterBuilder::new().compression_type(CompressionType::None).build(writer),
48 fields_index: DocumentsBatchIndex::default(),
49 documents_count: 0,
50 obkv_buffer: Vec::new(),
51 value_buffer: Vec::new(),
52 }
53 }
54
55 pub fn documents_count(&self) -> u32 {
57 self.documents_count
58 }
59
60 pub fn append_json_object(&mut self, object: &Object) -> io::Result<()> {
62 let mut fields_ids: Vec<_> = object.keys().map(|k| self.fields_index.insert(k)).collect();
64 fields_ids.sort_unstable();
65
66 self.obkv_buffer.clear();
67 let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
68 for field_id in fields_ids {
69 let key = self.fields_index.name(field_id).unwrap();
70 self.value_buffer.clear();
71 to_writer(&mut self.value_buffer, &object[key])?;
72 writer.insert(field_id, &self.value_buffer)?;
73 }
74
75 let internal_id = self.documents_count.to_be_bytes();
76 let document_bytes = writer.into_inner()?;
77 self.writer.insert(internal_id, &document_bytes)?;
78 self.documents_count += 1;
79
80 Ok(())
81 }
82
83 pub fn append_json_array<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
85 let mut de = serde_json::Deserializer::from_reader(reader);
86 let mut visitor = DocumentVisitor::new(self);
87 de.deserialize_any(&mut visitor)?
88 }
89
90 pub fn into_inner(mut self) -> io::Result<W> {
92 let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
93
94 self.value_buffer.clear();
96 to_writer(&mut self.value_buffer, &fields_index)?;
97 writer.insert(DOCUMENTS_BATCH_INDEX_KEY, &self.value_buffer)?;
98
99 writer.into_inner()
100 }
101}
102
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use super::*;
    use crate::documents::DocumentsBatchReader;

    /// Appends two JSON objects sharing two of their keys, then reads the
    /// batch back and checks the fields index size and each document's
    /// number of fields.
    #[test]
    fn add_single_documents_json() {
        let mut builder = DocumentsBatchBuilder::new(Vec::new());

        let first = serde_json::json!({
            "id": 1,
            "field": "hello!",
        });
        builder.append_json_object(first.as_object().unwrap()).unwrap();

        let second = serde_json::json!({
            "blabla": false,
            "field": "hello!",
            "id": 1,
        });
        builder.append_json_object(second.as_object().unwrap()).unwrap();

        assert_eq!(builder.documents_count(), 2);
        let bytes = builder.into_inner().unwrap();

        let reader = DocumentsBatchReader::from_reader(Cursor::new(bytes)).unwrap();
        let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
        // "id", "field" and "blabla" are the only distinct keys.
        assert_eq!(fields_index.len(), 3);

        // Documents come back in insertion order, with 2 then 3 fields.
        for expected_fields in [2, 3] {
            let document = cursor.next_document().unwrap().unwrap();
            assert_eq!(document.iter().count(), expected_fields);
        }

        assert!(cursor.next_document().unwrap().is_none());
    }
}