milli_core/update/new/
document.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use bumparaw_collections::RawMap;
4use heed::RoTxn;
5use rustc_hash::FxBuildHasher;
6use serde_json::value::RawValue;
7
8use super::vector_document::VectorDocument;
9use super::{KvReaderFieldId, KvWriterFieldId};
10use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
11use crate::documents::FieldIdMapper;
12use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
13
/// A view into a document that can represent either the current version from the DB,
/// the update data from payload or other means, or the merged updated version.
///
/// The 'doc lifetime is meant to live sufficiently for the document to be handled by the extractors.
pub trait Document<'doc> {
    /// Iterate over all **top-level** fields of the document, returning their name and raw JSON value.
    ///
    /// - The returned values *may* contain nested fields.
    /// - The `_vectors` and `_geo` fields are **ignored** by this method, meaning they are **not returned** by this method.
    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'doc str, &'doc RawValue)>>;

    /// Number of top-level fields, **excluding** `_vectors` and `_geo`.
    fn top_level_fields_count(&self) -> usize;

    /// Get the **top-level** field with the specified name, if it exists.
    ///
    /// - The `_vectors` and `_geo` fields are **ignored** by this method, meaning e.g. `top_level_field("_vectors")` will return `Ok(None)`
    fn top_level_field(&self, k: &str) -> Result<Option<&'doc RawValue>>;

    /// Returns the unparsed value of the `_vectors` field from the document data.
    ///
    /// This field alone is insufficient to retrieve vectors, as they may be stored in a dedicated location in the database.
    /// Use a [`super::vector_document::VectorDocument`] to access the vectors.
    ///
    /// This method is meant as a convenience for implementors of [`super::vector_document::VectorDocument`].
    fn vectors_field(&self) -> Result<Option<&'doc RawValue>>;

    /// Returns the unparsed value of the `_geo` field from the document data.
    ///
    /// This field alone is insufficient to retrieve geo data, as they may be stored in a dedicated location in the database.
    /// Use a [`super::geo_document::GeoDocument`] to access the geo data.
    ///
    /// This method is meant as a convenience for implementors of [`super::geo_document::GeoDocument`].
    fn geo_field(&self) -> Result<Option<&'doc RawValue>>;
}
49
50#[derive(Debug)]
51pub struct DocumentFromDb<'t, Mapper: FieldIdMapper>
52where
53    Mapper: FieldIdMapper,
54{
55    fields_ids_map: &'t Mapper,
56    content: &'t KvReaderFieldId,
57}
58
59impl<Mapper: FieldIdMapper> Clone for DocumentFromDb<'_, Mapper> {
60    #[inline]
61    fn clone(&self) -> Self {
62        *self
63    }
64}
65impl<Mapper: FieldIdMapper> Copy for DocumentFromDb<'_, Mapper> {}
66
67impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
68    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
69        let mut it = self.content.iter();
70
71        std::iter::from_fn(move || loop {
72            let (fid, value) = it.next()?;
73            let name = match self.fields_ids_map.name(fid).ok_or(
74                InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
75                    field_id: fid,
76                    process: "getting current document",
77                }),
78            ) {
79                Ok(name) => name,
80                Err(error) => return Some(Err(error.into())),
81            };
82
83            if name == RESERVED_VECTORS_FIELD_NAME || name == RESERVED_GEO_FIELD_NAME {
84                continue;
85            }
86
87            let res = (|| {
88                let value =
89                    serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
90
91                Ok((name, value))
92            })();
93
94            return Some(res);
95        })
96    }
97
98    fn vectors_field(&self) -> Result<Option<&'t RawValue>> {
99        self.field(RESERVED_VECTORS_FIELD_NAME)
100    }
101
102    fn geo_field(&self) -> Result<Option<&'t RawValue>> {
103        self.field(RESERVED_GEO_FIELD_NAME)
104    }
105
106    fn top_level_fields_count(&self) -> usize {
107        let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
108        let has_geo_field = self.geo_field().unwrap_or(None).is_some();
109        let count = self.content.iter().count();
110        match (has_vectors_field, has_geo_field) {
111            (true, true) => count - 2,
112            (true, false) | (false, true) => count - 1,
113            (false, false) => count,
114        }
115    }
116
117    fn top_level_field(&self, k: &str) -> Result<Option<&'t RawValue>> {
118        if k == RESERVED_VECTORS_FIELD_NAME || k == RESERVED_GEO_FIELD_NAME {
119            return Ok(None);
120        }
121        self.field(k)
122    }
123}
124
125impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
126    pub fn new(
127        docid: DocumentId,
128        rtxn: &'t RoTxn,
129        index: &'t Index,
130        db_fields_ids_map: &'t Mapper,
131    ) -> Result<Option<Self>> {
132        index.documents.get(rtxn, &docid).map_err(crate::Error::from).map(|reader| {
133            reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader })
134        })
135    }
136
137    pub fn field(&self, name: &str) -> Result<Option<&'t RawValue>> {
138        let Some(fid) = self.fields_ids_map.id(name) else {
139            return Ok(None);
140        };
141        let Some(value) = self.content.get(fid) else { return Ok(None) };
142        Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
143    }
144}
145
146#[derive(Debug)]
147pub struct DocumentFromVersions<'a, 'doc> {
148    versions: &'a Versions<'doc>,
149}
150
151impl<'a, 'doc> DocumentFromVersions<'a, 'doc> {
152    pub fn new(versions: &'a Versions<'doc>) -> Self {
153        Self { versions }
154    }
155}
156
157impl<'doc> Document<'doc> for DocumentFromVersions<'_, 'doc> {
158    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'doc str, &'doc RawValue)>> {
159        self.versions.iter_top_level_fields().map(Ok)
160    }
161
162    fn vectors_field(&self) -> Result<Option<&'doc RawValue>> {
163        Ok(self.versions.vectors_field())
164    }
165
166    fn geo_field(&self) -> Result<Option<&'doc RawValue>> {
167        Ok(self.versions.geo_field())
168    }
169
170    fn top_level_fields_count(&self) -> usize {
171        let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
172        let has_geo_field = self.geo_field().unwrap_or(None).is_some();
173        let count = self.versions.len();
174        match (has_vectors_field, has_geo_field) {
175            (true, true) => count - 2,
176            (true, false) | (false, true) => count - 1,
177            (false, false) => count,
178        }
179    }
180
181    fn top_level_field(&self, k: &str) -> Result<Option<&'doc RawValue>> {
182        Ok(self.versions.top_level_field(k))
183    }
184}
185
186#[derive(Debug)]
187pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
188    new_doc: DocumentFromVersions<'a, 'doc>,
189    db: Option<DocumentFromDb<'t, Mapper>>,
190}
191
192impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
193    pub fn with_db(
194        docid: DocumentId,
195        rtxn: &'t RoTxn,
196        index: &'t Index,
197        db_fields_ids_map: &'t Mapper,
198        new_doc: DocumentFromVersions<'a, 'doc>,
199    ) -> Result<Self> {
200        let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
201        Ok(Self { new_doc, db })
202    }
203
204    pub fn without_db(new_doc: DocumentFromVersions<'a, 'doc>) -> Self {
205        Self { new_doc, db: None }
206    }
207}
208
209impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
210    for MergedDocument<'d, 'doc, 't, Mapper>
211{
212    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'d str, &'d RawValue)>> {
213        let mut new_doc_it = self.new_doc.iter_top_level_fields();
214        let mut db_it = self.db.iter().flat_map(|db| db.iter_top_level_fields());
215        let mut seen_fields = BTreeSet::new();
216
217        std::iter::from_fn(move || {
218            if let Some(next) = new_doc_it.next() {
219                if let Ok((name, _)) = next {
220                    seen_fields.insert(name);
221                }
222                return Some(next);
223            }
224            loop {
225                match db_it.next()? {
226                    Ok((name, value)) => {
227                        if seen_fields.contains(name) {
228                            continue;
229                        }
230                        return Some(Ok((name, value)));
231                    }
232                    Err(err) => return Some(Err(err)),
233                }
234            }
235        })
236    }
237
238    fn vectors_field(&self) -> Result<Option<&'d RawValue>> {
239        if let Some(vectors) = self.new_doc.vectors_field()? {
240            return Ok(Some(vectors));
241        }
242
243        let Some(db) = self.db else { return Ok(None) };
244
245        db.vectors_field()
246    }
247
248    fn geo_field(&self) -> Result<Option<&'d RawValue>> {
249        if let Some(geo) = self.new_doc.geo_field()? {
250            return Ok(Some(geo));
251        }
252
253        let Some(db) = self.db else { return Ok(None) };
254
255        db.geo_field()
256    }
257
258    fn top_level_fields_count(&self) -> usize {
259        self.iter_top_level_fields().count()
260    }
261
262    fn top_level_field(&self, k: &str) -> Result<Option<&'d RawValue>> {
263        if let Some(f) = self.new_doc.top_level_field(k)? {
264            return Ok(Some(f));
265        }
266        if let Some(db) = self.db {
267            return db.field(k);
268        }
269        Ok(None)
270    }
271}
272
273impl<'doc, D> Document<'doc> for &D
274where
275    D: Document<'doc>,
276{
277    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'doc str, &'doc RawValue)>> {
278        D::iter_top_level_fields(self)
279    }
280
281    fn vectors_field(&self) -> Result<Option<&'doc RawValue>> {
282        D::vectors_field(self)
283    }
284
285    fn geo_field(&self) -> Result<Option<&'doc RawValue>> {
286        D::geo_field(self)
287    }
288
289    fn top_level_fields_count(&self) -> usize {
290        D::top_level_fields_count(self)
291    }
292
293    fn top_level_field(&self, k: &str) -> Result<Option<&'doc RawValue>> {
294        D::top_level_field(self, k)
295    }
296}
297
298/// Turn this document into an obkv, whose fields are indexed by the provided `FieldIdMapper`.
299///
300/// The produced obkv is suitable for storing into the documents DB, meaning:
301///
302/// - It contains the contains of `_vectors` that are not configured as an embedder
303/// - It contains all the top-level fields of the document, with their raw JSON value as value.
304///
305/// # Panics
306///
307/// - If the document contains a top-level field that is not present in `fields_ids_map`.
308///
309pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
310    document: &'s impl Document<'s>,
311    vector_document: Option<&'s impl VectorDocument<'s>>,
312    fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
313    mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
314) -> Result<&'a KvReaderFieldId>
315where
316    's: 'a,
317{
318    // will be used in 'inject_vectors
319    let vectors_value: Box<RawValue>;
320
321    document_buffer.clear();
322    let mut unordered_field_buffer = Vec::new();
323    unordered_field_buffer.clear();
324
325    let mut writer = KvWriterFieldId::new(&mut document_buffer);
326
327    for res in document.iter_top_level_fields() {
328        let (field_name, value) = res?;
329        let field_id =
330            fields_ids_map.id_or_insert(field_name).ok_or(UserError::AttributeLimitReached)?;
331        unordered_field_buffer.push((field_id, value));
332    }
333
334    'inject_vectors: {
335        let Some(vector_document) = vector_document else { break 'inject_vectors };
336
337        let mut vectors = BTreeMap::new();
338        for res in vector_document.iter_vectors() {
339            let (name, entry) = res?;
340            if entry.has_configured_embedder {
341                continue; // we don't write vectors with configured embedder in documents
342            }
343            vectors.insert(
344                name,
345                if entry.implicit {
346                    serde_json::json!(entry.embeddings)
347                } else {
348                    serde_json::json!({
349                        "regenerate": entry.regenerate,
350                        // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
351                        "embeddings": entry.embeddings,
352                    })
353                },
354            );
355        }
356
357        if vectors.is_empty() {
358            break 'inject_vectors;
359        }
360
361        let vectors_fid = fields_ids_map
362            .id_or_insert(RESERVED_VECTORS_FIELD_NAME)
363            .ok_or(UserError::AttributeLimitReached)?;
364
365        vectors_value = serde_json::value::to_raw_value(&vectors).unwrap();
366        unordered_field_buffer.push((vectors_fid, &vectors_value));
367    }
368
369    if let Some(geo_value) = document.geo_field()? {
370        let fid = fields_ids_map
371            .id_or_insert(RESERVED_GEO_FIELD_NAME)
372            .ok_or(UserError::AttributeLimitReached)?;
373        fields_ids_map.id_or_insert("_geo.lat").ok_or(UserError::AttributeLimitReached)?;
374        fields_ids_map.id_or_insert("_geo.lng").ok_or(UserError::AttributeLimitReached)?;
375        unordered_field_buffer.push((fid, geo_value));
376    }
377
378    unordered_field_buffer.sort_by_key(|(fid, _)| *fid);
379    for (fid, value) in unordered_field_buffer.iter() {
380        writer.insert(*fid, value.get().as_bytes()).unwrap();
381    }
382
383    writer.finish().unwrap();
384    Ok(KvReaderFieldId::from_slice(document_buffer))
385}
386
387pub type Entry<'doc> = (&'doc str, &'doc RawValue);
388
389#[derive(Debug)]
390pub struct Versions<'doc> {
391    data: RawMap<'doc, FxBuildHasher>,
392}
393
394impl<'doc> Versions<'doc> {
395    pub fn multiple(
396        mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
397    ) -> Result<Option<Self>> {
398        let Some(data) = versions.next() else { return Ok(None) };
399        let mut data = data?;
400        for future_version in versions {
401            let future_version = future_version?;
402            for (field, value) in future_version {
403                data.insert(field, value);
404            }
405        }
406        Ok(Some(Self::single(data)))
407    }
408
409    pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
410        Self { data: version }
411    }
412
413    pub fn iter_top_level_fields(&self) -> impl Iterator<Item = (&'doc str, &'doc RawValue)> + '_ {
414        self.data
415            .iter()
416            .filter(|(k, _)| *k != RESERVED_VECTORS_FIELD_NAME && *k != RESERVED_GEO_FIELD_NAME)
417    }
418
419    pub fn vectors_field(&self) -> Option<&'doc RawValue> {
420        self.data.get(RESERVED_VECTORS_FIELD_NAME)
421    }
422
423    pub fn geo_field(&self) -> Option<&'doc RawValue> {
424        self.data.get(RESERVED_GEO_FIELD_NAME)
425    }
426
427    pub fn len(&self) -> usize {
428        self.data.len()
429    }
430
431    pub fn is_empty(&self) -> bool {
432        self.data.is_empty()
433    }
434
435    pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> {
436        if k == RESERVED_VECTORS_FIELD_NAME || k == RESERVED_GEO_FIELD_NAME {
437            return None;
438        }
439        self.data.get(k)
440    }
441}