milli_core/update/index_documents/
enrich.rs1use std::fmt;
2use std::io::{BufWriter, Read, Seek};
3use std::result::Result as StdResult;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::constants::RESERVED_GEO_FIELD_NAME;
9use crate::documents::{
10 DocumentIdExtractionError, DocumentsBatchIndex, DocumentsBatchReader,
11 EnrichedDocumentsBatchReader, PrimaryKey, DEFAULT_PRIMARY_KEY,
12};
13use crate::error::{GeoError, InternalError, UserError};
14use crate::update::index_documents::{obkv_to_object, writer_into_reader};
15use crate::{FieldId, Index, Result};
16
17#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
27pub fn enrich_documents_batch<R: Read + Seek>(
28 rtxn: &heed::RoTxn<'_>,
29 index: &Index,
30 autogenerate_docids: bool,
31 reader: DocumentsBatchReader<R>,
32) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
33 let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
34
35 let mut external_ids = tempfile::tempfile().map(BufWriter::new).map(grenad::Writer::new)?;
36 let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];
37
38 let primary_key = match index.primary_key(rtxn)? {
41 Some(primary_key) => match PrimaryKey::new(primary_key, &documents_batch_index) {
42 Some(primary_key) => primary_key,
43 None if autogenerate_docids => PrimaryKey::Flat {
44 name: primary_key,
45 field_id: documents_batch_index.insert(primary_key),
46 },
47 None => {
48 return match cursor.next_document()? {
49 Some(first_document) => Ok(Err(UserError::MissingDocumentId {
50 primary_key: primary_key.to_string(),
51 document: obkv_to_object(first_document, &documents_batch_index)?,
52 })),
53 None => unreachable!("Called with reader.is_empty()"),
54 };
55 }
56 },
57 None => {
58 let mut guesses: Vec<(u16, &str)> = documents_batch_index
59 .iter()
60 .filter(|(_, name)| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
61 .map(|(field_id, name)| (*field_id, name.as_str()))
62 .collect();
63
64 guesses.sort_by(|(_, left_name), (_, right_name)| {
66 left_name.len().cmp(&right_name.len()).then_with(
68 || left_name.cmp(right_name),
70 )
71 });
72
73 match guesses.as_slice() {
74 [] if autogenerate_docids => PrimaryKey::Flat {
75 name: DEFAULT_PRIMARY_KEY,
76 field_id: documents_batch_index.insert(DEFAULT_PRIMARY_KEY),
77 },
78 [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
79 [(field_id, name)] => {
80 tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
81 PrimaryKey::Flat { name, field_id: *field_id }
82 }
83 multiple => {
84 return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
85 candidates: multiple
86 .iter()
87 .map(|(_, candidate)| candidate.to_string())
88 .collect(),
89 }));
90 }
91 }
92 }
93 };
94
95 let geo_field_id = match documents_batch_index.id(RESERVED_GEO_FIELD_NAME) {
98 Some(geo_field_id) if index.is_geo_enabled(rtxn)? => Some(geo_field_id),
99 _otherwise => None,
100 };
101
102 let mut count = 0;
103 while let Some(document) = cursor.next_document()? {
104 let document_id = match fetch_or_generate_document_id(
105 document,
106 &documents_batch_index,
107 primary_key,
108 autogenerate_docids,
109 &mut uuid_buffer,
110 count,
111 )? {
112 Ok(document_id) => document_id,
113 Err(user_error) => return Ok(Err(user_error)),
114 };
115
116 if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
117 if let Err(user_error) = validate_geo_from_json(&document_id, geo_value)? {
118 return Ok(Err(UserError::from(Box::new(user_error))));
119 }
120 }
121
122 let document_id = serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson)?;
123 external_ids.insert(count.to_be_bytes(), document_id)?;
124
125 count += 1;
126 }
127
128 let external_ids = writer_into_reader(external_ids)?;
129 let primary_key_name = primary_key.name().to_string();
130 let reader = EnrichedDocumentsBatchReader::new(
131 DocumentsBatchReader::new(cursor, documents_batch_index),
132 primary_key_name,
133 external_ids,
134 )?;
135
136 Ok(Ok(reader))
137}
138
139#[tracing::instrument(level = "trace", skip(uuid_buffer, documents_batch_index, document)
142target = "indexing::documents")]
143fn fetch_or_generate_document_id(
144 document: &obkv::KvReader<FieldId>,
145 documents_batch_index: &DocumentsBatchIndex,
146 primary_key: PrimaryKey<'_>,
147 autogenerate_docids: bool,
148 uuid_buffer: &mut [u8; uuid::fmt::Hyphenated::LENGTH],
149 count: u32,
150) -> Result<StdResult<DocumentId, UserError>> {
151 Ok(match primary_key.document_id(document, documents_batch_index)? {
152 Ok(document_id) => Ok(DocumentId::Retrieved { value: document_id }),
153 Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error),
154 Err(DocumentIdExtractionError::MissingDocumentId) if autogenerate_docids => {
155 let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer);
156 Ok(DocumentId::Generated { value: uuid.to_string(), document_nth: count })
157 }
158 Err(DocumentIdExtractionError::MissingDocumentId) => Err(UserError::MissingDocumentId {
159 primary_key: primary_key.name().to_string(),
160 document: obkv_to_object(document, documents_batch_index)?,
161 }),
162 Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
163 Err(UserError::TooManyDocumentIds {
164 primary_key: primary_key.name().to_string(),
165 document: obkv_to_object(document, documents_batch_index)?,
166 })
167 }
168 })
169}
170
171#[derive(Serialize, Deserialize, Clone)]
176pub enum DocumentId {
177 Retrieved { value: String },
178 Generated { value: String, document_nth: u32 },
179}
180
181impl DocumentId {
182 fn debug(&self) -> String {
183 format!("{:?}", self)
184 }
185
186 pub fn is_generated(&self) -> bool {
187 matches!(self, DocumentId::Generated { .. })
188 }
189
190 pub fn value(&self) -> &str {
191 match self {
192 DocumentId::Retrieved { value } => value,
193 DocumentId::Generated { value, .. } => value,
194 }
195 }
196}
197
198impl fmt::Debug for DocumentId {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 match self {
201 DocumentId::Retrieved { value } => write!(f, "{:?}", value),
202 DocumentId::Generated { value, document_nth } => {
203 write!(f, "{{{:?}}} of the {}nth document", value, document_nth)
204 }
205 }
206 }
207}
208
209pub fn extract_finite_float_from_value(value: Value) -> StdResult<f64, Value> {
212 let number = match value {
213 Value::Number(ref n) => match n.as_f64() {
214 Some(number) => number,
215 None => return Err(value),
216 },
217 Value::String(ref s) => match s.parse::<f64>() {
218 Ok(number) => number,
219 Err(_) => return Err(value),
220 },
221 value => return Err(value),
222 };
223
224 if number.is_finite() {
225 Ok(number)
226 } else {
227 Err(value)
228 }
229}
230
231pub fn validate_geo_from_json(id: &DocumentId, bytes: &[u8]) -> Result<StdResult<(), GeoError>> {
232 use GeoError::*;
233 let debug_id = || {
234 serde_json::from_slice(id.value().as_bytes()).unwrap_or_else(|_| Value::from(id.debug()))
235 };
236 match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
237 Value::Object(mut object) => match (object.remove("lat"), object.remove("lng")) {
238 (Some(lat), Some(lng)) => {
239 match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
240 (Ok(_), Ok(_)) if !object.is_empty() => Ok(Err(UnexpectedExtraFields {
241 document_id: debug_id(),
242 value: object.into(),
243 })),
244 (Ok(_), Ok(_)) => Ok(Ok(())),
245 (Err(value), Ok(_)) => Ok(Err(BadLatitude { document_id: debug_id(), value })),
246 (Ok(_), Err(value)) => Ok(Err(BadLongitude { document_id: debug_id(), value })),
247 (Err(lat), Err(lng)) => {
248 Ok(Err(BadLatitudeAndLongitude { document_id: debug_id(), lat, lng }))
249 }
250 }
251 }
252 (None, Some(_)) => Ok(Err(MissingLatitude { document_id: debug_id() })),
253 (Some(_), None) => Ok(Err(MissingLongitude { document_id: debug_id() })),
254 (None, None) => Ok(Err(MissingLatitudeAndLongitude { document_id: debug_id() })),
255 },
256 Value::Null => Ok(Ok(())),
257 value => Ok(Err(NotAnObject { document_id: debug_id(), value })),
258 }
259}