milli_core/update/index_documents/
enrich.rs

1use std::fmt;
2use std::io::{BufWriter, Read, Seek};
3use std::result::Result as StdResult;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::constants::RESERVED_GEO_FIELD_NAME;
9use crate::documents::{
10    DocumentIdExtractionError, DocumentsBatchIndex, DocumentsBatchReader,
11    EnrichedDocumentsBatchReader, PrimaryKey, DEFAULT_PRIMARY_KEY,
12};
13use crate::error::{GeoError, InternalError, UserError};
14use crate::update::index_documents::{obkv_to_object, writer_into_reader};
15use crate::{FieldId, Index, Result};
16
/// Validates the documents of a batch and enriches the batch with their external document ids.
///
/// The following checks are performed:
///  - a primary key is already set on the index, or exactly one can be inferred from the
///    field names of the batch,
///  - every document has a valid document id (one is generated for the documents that
///    miss it when `autogenerate_docids` is set),
///  - the geo field (`RESERVED_GEO_FIELD_NAME`, i.e. `_geo`) of every document is valid,
///    when the batch contains that field and `index.is_geo_enabled` returns `true`.
///
/// Every document id is serialized as JSON and written to a temporary file, keyed by the
/// big-endian bytes of the zero-based position of its document in the batch. That file is
/// handed, along with the primary key name, to the returned `EnrichedDocumentsBatchReader`.
///
/// # Errors
///
/// The outer `Result` carries the failures propagated with `?` (reading the index or the
/// batch, writing the temporary file, JSON serialization).
/// The inner `StdResult` carries the `UserError` describing why the batch was rejected.
///
/// # Panics
///
/// - if the index has a primary key for which `PrimaryKey::new` returns `None`,
///   `autogenerate_docids` is `false` and the batch contains no document
///   (`reader.is_empty()`), the `unreachable!` of the primary key resolution is hit.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
pub fn enrich_documents_batch<R: Read + Seek>(
    rtxn: &heed::RoTxn<'_>,
    index: &Index,
    autogenerate_docids: bool,
    reader: DocumentsBatchReader<R>,
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
    let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();

    // The external ids are accumulated in a grenad writer backed by a temporary file.
    let mut external_ids = tempfile::tempfile().map(BufWriter::new).map(grenad::Writer::new)?;
    // Scratch buffer reused for every generated UUID.
    let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];

    // The primary key that has already been set for this index or the one we guess by
    // searching for the field names that, once lowercased, end with `DEFAULT_PRIMARY_KEY`.
    let primary_key = match index.primary_key(rtxn)? {
        Some(primary_key) => match PrimaryKey::new(primary_key, &documents_batch_index) {
            Some(primary_key) => primary_key,
            // The field is registered in the batch index so that the generated ids
            // have a field id to be associated with.
            None if autogenerate_docids => PrimaryKey::Flat {
                name: primary_key,
                field_id: documents_batch_index.insert(primary_key),
            },
            None => {
                // The first document of the batch is used to illustrate the error.
                return match cursor.next_document()? {
                    Some(first_document) => Ok(Err(UserError::MissingDocumentId {
                        primary_key: primary_key.to_string(),
                        document: obkv_to_object(first_document, &documents_batch_index)?,
                    })),
                    None => unreachable!("Called with reader.is_empty()"),
                };
            }
        },
        None => {
            let mut guesses: Vec<(u16, &str)> = documents_batch_index
                .iter()
                .filter(|(_, name)| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
                .map(|(field_id, name)| (*field_id, name.as_str()))
                .collect();

            // sort the keys in a deterministic, obvious way, so that fields are always in the same order.
            guesses.sort_by(|(_, left_name), (_, right_name)| {
                // shortest name first
                left_name.len().cmp(&right_name.len()).then_with(
                    // then alphabetical order
                    || left_name.cmp(right_name),
                )
            });

            match guesses.as_slice() {
                // No candidate: fall back to the default primary key when ids can be generated.
                [] if autogenerate_docids => PrimaryKey::Flat {
                    name: DEFAULT_PRIMARY_KEY,
                    field_id: documents_batch_index.insert(DEFAULT_PRIMARY_KEY),
                },
                [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
                // Exactly one candidate: this is the inferred primary key.
                [(field_id, name)] => {
                    tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
                    PrimaryKey::Flat { name, field_id: *field_id }
                }
                // Several candidates: the choice is ambiguous and left to the user.
                multiple => {
                    return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
                        candidates: multiple
                            .iter()
                            .map(|(_, candidate)| candidate.to_string())
                            .collect(),
                    }));
                }
            }
        }
    };

    // If the batch contains the reserved geo field and geo is enabled on the index, we must
    // check the validity of that field in all the documents of this batch: this is when we
    // get `Some`.
    let geo_field_id = match documents_batch_index.id(RESERVED_GEO_FIELD_NAME) {
        Some(geo_field_id) if index.is_geo_enabled(rtxn)? => Some(geo_field_id),
        _otherwise => None,
    };

    // Zero-based position of the current document in the batch.
    let mut count = 0;
    while let Some(document) = cursor.next_document()? {
        let document_id = match fetch_or_generate_document_id(
            document,
            &documents_batch_index,
            primary_key,
            autogenerate_docids,
            &mut uuid_buffer,
            count,
        )? {
            Ok(document_id) => document_id,
            Err(user_error) => return Ok(Err(user_error)),
        };

        if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
            if let Err(user_error) = validate_geo_from_json(&document_id, geo_value)? {
                return Ok(Err(UserError::from(Box::new(user_error))));
            }
        }

        // The id is stored under the big-endian bytes of the position of its document.
        let document_id = serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson)?;
        external_ids.insert(count.to_be_bytes(), document_id)?;

        count += 1;
    }

    let external_ids = writer_into_reader(external_ids)?;
    let primary_key_name = primary_key.name().to_string();
    let reader = EnrichedDocumentsBatchReader::new(
        DocumentsBatchReader::new(cursor, documents_batch_index),
        primary_key_name,
        external_ids,
    )?;

    Ok(Ok(reader))
}
138
139/// Retrieve the document id after validating it, returning a `UserError`
140/// if the id is invalid or can't be guessed.
141#[tracing::instrument(level = "trace", skip(uuid_buffer, documents_batch_index, document)
142target = "indexing::documents")]
143fn fetch_or_generate_document_id(
144    document: &obkv::KvReader<FieldId>,
145    documents_batch_index: &DocumentsBatchIndex,
146    primary_key: PrimaryKey<'_>,
147    autogenerate_docids: bool,
148    uuid_buffer: &mut [u8; uuid::fmt::Hyphenated::LENGTH],
149    count: u32,
150) -> Result<StdResult<DocumentId, UserError>> {
151    Ok(match primary_key.document_id(document, documents_batch_index)? {
152        Ok(document_id) => Ok(DocumentId::Retrieved { value: document_id }),
153        Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error),
154        Err(DocumentIdExtractionError::MissingDocumentId) if autogenerate_docids => {
155            let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer);
156            Ok(DocumentId::Generated { value: uuid.to_string(), document_nth: count })
157        }
158        Err(DocumentIdExtractionError::MissingDocumentId) => Err(UserError::MissingDocumentId {
159            primary_key: primary_key.name().to_string(),
160            document: obkv_to_object(document, documents_batch_index)?,
161        }),
162        Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
163            Err(UserError::TooManyDocumentIds {
164                primary_key: primary_key.name().to_string(),
165                document: obkv_to_object(document, documents_batch_index)?,
166            })
167        }
168    })
169}
170
/// A type that represents a document id that has been retrieved from a document or auto-generated.
///
/// In case the document id has been auto-generated, the document nth is kept to help
/// users debug if there is an issue with the document itself.
///
/// `fmt::Debug` is implemented by hand for this type instead of being derived.
#[derive(Serialize, Deserialize, Clone)]
pub enum DocumentId {
    /// The id was extracted from the document; `value` is the extracted id.
    Retrieved { value: String },
    /// The id was generated because the document had none; `value` is the generated id and
    /// `document_nth` is the position of the document it was generated for.
    Generated { value: String, document_nth: u32 },
}
180
181impl DocumentId {
182    fn debug(&self) -> String {
183        format!("{:?}", self)
184    }
185
186    pub fn is_generated(&self) -> bool {
187        matches!(self, DocumentId::Generated { .. })
188    }
189
190    pub fn value(&self) -> &str {
191        match self {
192            DocumentId::Retrieved { value } => value,
193            DocumentId::Generated { value, .. } => value,
194        }
195    }
196}
197
198impl fmt::Debug for DocumentId {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        match self {
201            DocumentId::Retrieved { value } => write!(f, "{:?}", value),
202            DocumentId::Generated { value, document_nth } => {
203                write!(f, "{{{:?}}} of the {}nth document", value, document_nth)
204            }
205        }
206    }
207}
208
209/// Try to extract an `f64` from a JSON `Value` and return the `Value`
210/// in the `Err` variant if it failed.
211pub fn extract_finite_float_from_value(value: Value) -> StdResult<f64, Value> {
212    let number = match value {
213        Value::Number(ref n) => match n.as_f64() {
214            Some(number) => number,
215            None => return Err(value),
216        },
217        Value::String(ref s) => match s.parse::<f64>() {
218            Ok(number) => number,
219            Err(_) => return Err(value),
220        },
221        value => return Err(value),
222    };
223
224    if number.is_finite() {
225        Ok(number)
226    } else {
227        Err(value)
228    }
229}
230
231pub fn validate_geo_from_json(id: &DocumentId, bytes: &[u8]) -> Result<StdResult<(), GeoError>> {
232    use GeoError::*;
233    let debug_id = || {
234        serde_json::from_slice(id.value().as_bytes()).unwrap_or_else(|_| Value::from(id.debug()))
235    };
236    match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
237        Value::Object(mut object) => match (object.remove("lat"), object.remove("lng")) {
238            (Some(lat), Some(lng)) => {
239                match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
240                    (Ok(_), Ok(_)) if !object.is_empty() => Ok(Err(UnexpectedExtraFields {
241                        document_id: debug_id(),
242                        value: object.into(),
243                    })),
244                    (Ok(_), Ok(_)) => Ok(Ok(())),
245                    (Err(value), Ok(_)) => Ok(Err(BadLatitude { document_id: debug_id(), value })),
246                    (Ok(_), Err(value)) => Ok(Err(BadLongitude { document_id: debug_id(), value })),
247                    (Err(lat), Err(lng)) => {
248                        Ok(Err(BadLatitudeAndLongitude { document_id: debug_id(), lat, lng }))
249                    }
250                }
251            }
252            (None, Some(_)) => Ok(Err(MissingLatitude { document_id: debug_id() })),
253            (Some(_), None) => Ok(Err(MissingLongitude { document_id: debug_id() })),
254            (None, None) => Ok(Err(MissingLatitudeAndLongitude { document_id: debug_id() })),
255        },
256        Value::Null => Ok(Ok(())),
257        value => Ok(Err(NotAnObject { document_id: debug_id(), value })),
258    }
259}