milli_core/documents/
mod.rs

1mod builder;
2mod enriched;
3mod primary_key;
4mod reader;
5mod serde_impl;
6
7use std::fmt::Debug;
8use std::io;
9use std::str::Utf8Error;
10
11use bimap::BiHashMap;
12pub use builder::DocumentsBatchBuilder;
13pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
14use obkv::KvReader;
15pub use primary_key::{
16    validate_document_id_str, validate_document_id_value, DocumentIdExtractionError, FieldIdMapper,
17    PrimaryKey, DEFAULT_PRIMARY_KEY,
18};
19pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
20use serde::{Deserialize, Serialize};
21
22use crate::error::{FieldIdMapMissingEntry, InternalError};
23use crate::{FieldId, Object, Result};
24
/// The key under which the `DocumentsBatchIndex` data structure is stored.
///
/// Every byte is `0xFF`, i.e. the big-endian encoding of `u64::MAX`, which makes it
/// the absolute last key of the list.
const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = [u8::MAX; 8];
28
29/// Helper function to convert an obkv reader into a JSON object.
30pub fn obkv_to_object(obkv: &KvReader<FieldId>, index: &DocumentsBatchIndex) -> Result<Object> {
31    obkv.iter()
32        .map(|(field_id, value)| {
33            let field_name = index
34                .name(field_id)
35                .ok_or(FieldIdMapMissingEntry::FieldId { field_id, process: "obkv_to_object" })?;
36            let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
37            Ok((field_name.to_string(), value))
38        })
39        .collect()
40}
41
42/// A bidirectional map that links field ids to their name in a document batch.
43#[derive(Default, Clone, Debug, Serialize, Deserialize)]
44pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
45
46impl DocumentsBatchIndex {
47    /// Insert the field in the map, or return it's field id if it doesn't already exists.
48    pub fn insert(&mut self, field: &str) -> FieldId {
49        match self.0.get_by_right(field) {
50            Some(field_id) => *field_id,
51            None => {
52                let field_id = self.0.len() as FieldId;
53                self.0.insert(field_id, field.to_string());
54                field_id
55            }
56        }
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.0.is_empty()
61    }
62
63    pub fn len(&self) -> usize {
64        self.0.len()
65    }
66
67    pub fn iter(&self) -> bimap::hash::Iter<'_, FieldId, String> {
68        self.0.iter()
69    }
70
71    pub fn name(&self, id: FieldId) -> Option<&str> {
72        self.0.get_by_left(&id).map(AsRef::as_ref)
73    }
74
75    pub fn id(&self, name: &str) -> Option<FieldId> {
76        self.0.get_by_right(name).cloned()
77    }
78
79    pub fn recreate_json(&self, document: &obkv::KvReaderU16) -> Result<Object> {
80        let mut map = Object::new();
81
82        for (k, v) in document.iter() {
83            let key = self
84                .0
85                .get_by_left(&k)
86                .ok_or(crate::error::InternalError::FieldIdMapMissingEntry(
87                    FieldIdMapMissingEntry::FieldId { field_id: k, process: "recreate_json" },
88                ))?
89                .clone();
90            let value = serde_json::from_slice::<serde_json::Value>(v)
91                .map_err(crate::error::InternalError::SerdeJson)?;
92            map.insert(key, value);
93        }
94
95        Ok(map)
96    }
97}
98
99impl FieldIdMapper for DocumentsBatchIndex {
100    fn id(&self, name: &str) -> Option<FieldId> {
101        self.id(name)
102    }
103
104    fn name(&self, id: FieldId) -> Option<&str> {
105        self.name(id)
106    }
107}
108
109#[derive(Debug, thiserror::Error)]
110pub enum Error {
111    #[error("Error parsing number {value:?} at line {line}: {error}")]
112    ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
113    #[error("Error parsing boolean {value:?} at line {line}: {error}")]
114    ParseBool { error: std::str::ParseBoolError, line: usize, value: String },
115    #[error("Invalid document addition format, missing the documents batch index.")]
116    InvalidDocumentFormat,
117    #[error("Invalid enriched data.")]
118    InvalidEnrichedData,
119    #[error(transparent)]
120    InvalidUtf8(#[from] Utf8Error),
121    #[error(transparent)]
122    Csv(#[from] csv::Error),
123    #[error(transparent)]
124    Json(#[from] serde_json::Error),
125    #[error(transparent)]
126    Serialize(serde_json::Error),
127    #[error(transparent)]
128    Grenad(#[from] grenad::Error),
129    #[error(transparent)]
130    Io(#[from] io::Error),
131}
132
133pub fn objects_from_json_value(json: serde_json::Value) -> Vec<crate::Object> {
134    let documents = match json {
135        object @ serde_json::Value::Object(_) => vec![object],
136        serde_json::Value::Array(objects) => objects,
137        invalid => {
138            panic!("an array of objects must be specified, {:#?} is not an array", invalid)
139        }
140    };
141    let mut objects = vec![];
142    for document in documents {
143        let object = match document {
144            serde_json::Value::Object(object) => object,
145            invalid => panic!("an object must be specified, {:#?} is not an object", invalid),
146        };
147        objects.push(object);
148    }
149    objects
150}
151
/// Macro used to generate documents, with the same syntax as `serde_json::json!`.
///
/// The input must be a JSON object or an array. Each document is serialized back to
/// back into a temporary file, and the macro evaluates to a memory map of that file.
#[cfg(test)]
macro_rules! documents {
    ($data:tt) => {{
        let mut file = tempfile::tempfile().unwrap();

        // Normalize the input into a list of values so a single loop writes them all.
        let entries = match serde_json::json!($data) {
            serde_json::Value::Array(entries) => entries,
            object @ serde_json::Value::Object(_) => vec![object],
            _ => unimplemented!("The `documents!` macro only support Objects and Array"),
        };
        for entry in &entries {
            serde_json::to_writer(&mut file, entry).unwrap();
        }

        file.sync_all().unwrap();
        // NOTE(review): the mapping is only sound while the underlying file is not
        // modified or truncated; presumably fine for this private temp file — confirm.
        unsafe { memmap2::Mmap::map(&file).unwrap() }
    }};
}
174
175pub fn mmap_from_objects(objects: impl IntoIterator<Item = Object>) -> memmap2::Mmap {
176    let mut writer = tempfile::tempfile().map(std::io::BufWriter::new).unwrap();
177    for object in objects {
178        serde_json::to_writer(&mut writer, &object).unwrap();
179    }
180    let file = writer.into_inner().unwrap();
181    unsafe { memmap2::Mmap::map(&file).unwrap() }
182}
183
184pub fn documents_batch_reader_from_objects(
185    objects: impl IntoIterator<Item = Object>,
186) -> DocumentsBatchReader<std::io::Cursor<Vec<u8>>> {
187    let mut builder = DocumentsBatchBuilder::new(Vec::new());
188    for object in objects {
189        builder.append_json_object(&object).unwrap();
190    }
191    let vector = builder.into_inner().unwrap();
192    DocumentsBatchReader::from_reader(std::io::Cursor::new(vector)).unwrap()
193}
194
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use serde_json::json;

    use super::*;

    /// Serializes the given JSON objects, in order, into an in-memory documents batch.
    fn batch_bytes(values: &[serde_json::Value]) -> Vec<u8> {
        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        for value in values {
            builder.append_json_object(value.as_object().unwrap()).unwrap();
        }
        builder.into_inner().unwrap()
    }

    /// A single document with five fields round-trips through the batch format.
    #[test]
    fn create_documents_no_errors() {
        let bytes = batch_bytes(&[json!({
            "number": 1,
            "string": "this is a field",
            "array": ["an", "array"],
            "object": {
                "key": "value",
            },
            "bool": true
        })]);

        let batch = DocumentsBatchReader::from_reader(Cursor::new(bytes)).unwrap();
        let (mut cursor, fields) = batch.into_cursor_and_fields_index();

        assert_eq!(fields.iter().count(), 5);
        let document = cursor.next_document().unwrap().unwrap();
        assert_eq!(document.iter().count(), 5);
        assert!(cursor.next_document().unwrap().is_none());
    }

    /// Two documents with distinct fields produce two index entries and two documents.
    #[test]
    fn test_add_multiple_documents() {
        let bytes = batch_bytes(&[
            json!({
                "bool": true,
            }),
            json!({
                "toto": false,
            }),
        ]);

        let batch = DocumentsBatchReader::from_reader(Cursor::new(bytes)).unwrap();
        let (mut cursor, fields) = batch.into_cursor_and_fields_index();

        assert_eq!(fields.iter().count(), 2);
        let first = cursor.next_document().unwrap().unwrap();
        assert_eq!(first.iter().count(), 1);
        assert!(cursor.next_document().unwrap().is_some());
        assert!(cursor.next_document().unwrap().is_none());
    }

    /// The `documents!` macro accepts documents whose fields appear in different orders.
    #[test]
    fn out_of_order_json_fields() {
        let _documents = documents!([
            {"id": 1,"b": 0},
            {"id": 2,"a": 0,"b": 0},
        ]);
    }
}