milli_core/documents/
mod.rs1mod builder;
2mod enriched;
3mod primary_key;
4mod reader;
5mod serde_impl;
6
7use std::fmt::Debug;
8use std::io;
9use std::str::Utf8Error;
10
11use bimap::BiHashMap;
12pub use builder::DocumentsBatchBuilder;
13pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
14use obkv::KvReader;
15pub use primary_key::{
16 validate_document_id_str, validate_document_id_value, DocumentIdExtractionError, FieldIdMapper,
17 PrimaryKey, DEFAULT_PRIMARY_KEY,
18};
19pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
20use serde::{Deserialize, Serialize};
21
22use crate::error::{FieldIdMapMissingEntry, InternalError};
23use crate::{FieldId, Object, Result};
24
/// Eight `0xFF` bytes: the big-endian encoding of `u64::MAX`.
///
/// Compared byte-wise, no other 8-byte key can sort after this one.
// NOTE(review): presumably the reserved key under which the `DocumentsBatchIndex` is
// stored alongside the documents — it is not used in this file, confirm in the
// builder/reader modules.
const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = u64::MAX.to_be_bytes();
28
29pub fn obkv_to_object(obkv: &KvReader<FieldId>, index: &DocumentsBatchIndex) -> Result<Object> {
31 obkv.iter()
32 .map(|(field_id, value)| {
33 let field_name = index
34 .name(field_id)
35 .ok_or(FieldIdMapMissingEntry::FieldId { field_id, process: "obkv_to_object" })?;
36 let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
37 Ok((field_name.to_string(), value))
38 })
39 .collect()
40}
41
/// A bidirectional mapping between field ids and field names.
///
/// The wrapped [`BiHashMap`] allows looking up a name from its id and an id from its
/// name. The inner map is public, so it can also be manipulated directly.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
45
46impl DocumentsBatchIndex {
47 pub fn insert(&mut self, field: &str) -> FieldId {
49 match self.0.get_by_right(field) {
50 Some(field_id) => *field_id,
51 None => {
52 let field_id = self.0.len() as FieldId;
53 self.0.insert(field_id, field.to_string());
54 field_id
55 }
56 }
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.0.is_empty()
61 }
62
63 pub fn len(&self) -> usize {
64 self.0.len()
65 }
66
67 pub fn iter(&self) -> bimap::hash::Iter<'_, FieldId, String> {
68 self.0.iter()
69 }
70
71 pub fn name(&self, id: FieldId) -> Option<&str> {
72 self.0.get_by_left(&id).map(AsRef::as_ref)
73 }
74
75 pub fn id(&self, name: &str) -> Option<FieldId> {
76 self.0.get_by_right(name).cloned()
77 }
78
79 pub fn recreate_json(&self, document: &obkv::KvReaderU16) -> Result<Object> {
80 let mut map = Object::new();
81
82 for (k, v) in document.iter() {
83 let key = self
84 .0
85 .get_by_left(&k)
86 .ok_or(crate::error::InternalError::FieldIdMapMissingEntry(
87 FieldIdMapMissingEntry::FieldId { field_id: k, process: "recreate_json" },
88 ))?
89 .clone();
90 let value = serde_json::from_slice::<serde_json::Value>(v)
91 .map_err(crate::error::InternalError::SerdeJson)?;
92 map.insert(key, value);
93 }
94
95 Ok(map)
96 }
97}
98
99impl FieldIdMapper for DocumentsBatchIndex {
100 fn id(&self, name: &str) -> Option<FieldId> {
101 self.id(name)
102 }
103
104 fn name(&self, id: FieldId) -> Option<&str> {
105 self.name(id)
106 }
107}
108
/// Errors defined by the documents module.
///
/// Variants marked `#[from]` can be produced from their source error with `?`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// `value`, found at line `line`, could not be parsed as a number.
    #[error("Error parsing number {value:?} at line {line}: {error}")]
    ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
    /// `value`, found at line `line`, could not be parsed as a boolean.
    #[error("Error parsing boolean {value:?} at line {line}: {error}")]
    ParseBool { error: std::str::ParseBoolError, line: usize, value: String },
    /// The documents batch index is missing from the document addition payload.
    #[error("Invalid document addition format, missing the documents batch index.")]
    InvalidDocumentFormat,
    /// The enriched data is invalid.
    #[error("Invalid enriched data.")]
    InvalidEnrichedData,
    /// Some bytes were not valid UTF-8.
    #[error(transparent)]
    InvalidUtf8(#[from] Utf8Error),
    /// An error reported by the `csv` crate.
    #[error(transparent)]
    Csv(#[from] csv::Error),
    /// A `serde_json` error; this is the variant `serde_json::Error` converts into.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// A `serde_json` error that must be wrapped explicitly: the variant has no `#[from]`,
    /// so `?` never produces it.
    // NOTE(review): the name suggests it is reserved for serialization failures — confirm
    // at the construction sites.
    #[error(transparent)]
    Serialize(serde_json::Error),
    /// An error reported by `grenad`.
    #[error(transparent)]
    Grenad(#[from] grenad::Error),
    /// An I/O error.
    #[error(transparent)]
    Io(#[from] io::Error),
}
132
133pub fn objects_from_json_value(json: serde_json::Value) -> Vec<crate::Object> {
134 let documents = match json {
135 object @ serde_json::Value::Object(_) => vec![object],
136 serde_json::Value::Array(objects) => objects,
137 invalid => {
138 panic!("an array of objects must be specified, {:#?} is not an array", invalid)
139 }
140 };
141 let mut objects = vec![];
142 for document in documents {
143 let object = match document {
144 serde_json::Value::Object(object) => object,
145 invalid => panic!("an object must be specified, {:#?} is not an object", invalid),
146 };
147 objects.push(object);
148 }
149 objects
150}
151
#[cfg(test)]
/// Builds a memory map over a temporary file containing the given documents, written with
/// the same syntax as `serde_json::json!`.
///
/// The input must be a JSON object or an array; every document is serialized back-to-back
/// with no separator in between. Any other JSON value hits `unimplemented!`, and every
/// I/O or serialization failure panics.
macro_rules! documents {
    ($data:tt) => {{
        let parsed = serde_json::json!($data);
        let mut file = tempfile::tempfile().unwrap();

        // Normalize both accepted shapes to a list so the writing loop exists only once.
        let to_write = match parsed {
            serde_json::Value::Array(values) => values,
            object @ serde_json::Value::Object(_) => vec![object],
            _ => unimplemented!("The `documents!` macro only support Objects and Array"),
        };
        for document in &to_write {
            serde_json::to_writer(&mut file, document).unwrap();
        }

        file.sync_all().unwrap();
        // SAFETY: the mapping is only sound while the file is not modified or truncated;
        // the file was created just above and its handle is not shared with anything else
        // in this block.
        unsafe { memmap2::Mmap::map(&file).unwrap() }
    }};
}
174
175pub fn mmap_from_objects(objects: impl IntoIterator<Item = Object>) -> memmap2::Mmap {
176 let mut writer = tempfile::tempfile().map(std::io::BufWriter::new).unwrap();
177 for object in objects {
178 serde_json::to_writer(&mut writer, &object).unwrap();
179 }
180 let file = writer.into_inner().unwrap();
181 unsafe { memmap2::Mmap::map(&file).unwrap() }
182}
183
184pub fn documents_batch_reader_from_objects(
185 objects: impl IntoIterator<Item = Object>,
186) -> DocumentsBatchReader<std::io::Cursor<Vec<u8>>> {
187 let mut builder = DocumentsBatchBuilder::new(Vec::new());
188 for object in objects {
189 builder.append_json_object(&object).unwrap();
190 }
191 let vector = builder.into_inner().unwrap();
192 DocumentsBatchReader::from_reader(std::io::Cursor::new(vector)).unwrap()
193}
194
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use serde_json::json;

    use super::*;

    /// A single document with five fields of different JSON types round-trips through the
    /// builder and the reader: five fields in the index, five in the document, and no
    /// second document.
    #[test]
    fn create_documents_no_errors() {
        let document = json!({
            "number": 1,
            "string": "this is a field",
            "array": ["an", "array"],
            "object": {
                "key": "value",
            },
            "bool": true
        });
        let object = document.as_object().unwrap();

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        builder.append_json_object(object).unwrap();
        let bytes = builder.into_inner().unwrap();

        let batch = DocumentsBatchReader::from_reader(Cursor::new(bytes)).unwrap();
        let (mut cursor, fields_index) = batch.into_cursor_and_fields_index();

        assert_eq!(fields_index.iter().count(), 5);
        let first = cursor.next_document().unwrap().unwrap();
        assert_eq!(first.iter().count(), 5);
        assert!(cursor.next_document().unwrap().is_none());
    }

    /// Two documents with disjoint fields end up in the same batch: the index knows both
    /// fields while each document only carries its own.
    #[test]
    fn test_add_multiple_documents() {
        let first_json = json!({
            "bool": true,
        });
        let second_json = json!({
            "toto": false,
        });

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        for document in [&first_json, &second_json] {
            builder.append_json_object(document.as_object().unwrap()).unwrap();
        }
        let bytes = builder.into_inner().unwrap();

        let batch = DocumentsBatchReader::from_reader(Cursor::new(bytes)).unwrap();
        let (mut cursor, fields_index) = batch.into_cursor_and_fields_index();

        assert_eq!(fields_index.iter().count(), 2);
        let first = cursor.next_document().unwrap().unwrap();
        assert_eq!(first.iter().count(), 1);
        assert!(cursor.next_document().unwrap().is_some());
        assert!(cursor.next_document().unwrap().is_none());
    }

    /// Documents whose fields appear in a different order can be written by `documents!`
    /// without panicking.
    #[test]
    fn out_of_order_json_fields() {
        let _documents = documents!([
            {"id": 1,"b": 0},
            {"id": 2,"a": 0,"b": 0},
        ]);
    }
}