milli_core/update/new/
vector_document.rs

1use std::collections::BTreeSet;
2
3use bumpalo::Bump;
4use bumparaw_collections::RawMap;
5use deserr::{Deserr, IntoValue};
6use heed::RoTxn;
7use rustc_hash::FxBuildHasher;
8use serde::Serialize;
9use serde_json::value::RawValue;
10
11use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions};
12use super::indexer::de::DeserrRawValue;
13use crate::constants::RESERVED_VECTORS_FIELD_NAME;
14use crate::documents::FieldIdMapper;
15use crate::index::IndexEmbeddingConfig;
16use crate::vector::parsed_vectors::{RawVectors, RawVectorsError, VectorOrArrayOfVectors};
17use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfigs};
18use crate::{DocumentId, Index, InternalError, Result, UserError};
19
20#[derive(Serialize)]
21#[serde(untagged)]
22pub enum Embeddings<'doc> {
23    FromJsonExplicit(&'doc RawValue),
24    FromJsonImplicityUserProvided(&'doc RawValue),
25    FromDb(Vec<Embedding>),
26}
27impl<'doc> Embeddings<'doc> {
28    pub fn into_vec(
29        self,
30        doc_alloc: &'doc Bump,
31        embedder_name: &str,
32    ) -> std::result::Result<Vec<Embedding>, deserr::errors::JsonError> {
33        match self {
34            Embeddings::FromJsonExplicit(value) => {
35                let vectors_ref = deserr::ValuePointerRef::Key {
36                    key: RESERVED_VECTORS_FIELD_NAME,
37                    prev: &deserr::ValuePointerRef::Origin,
38                };
39                let embedders_ref =
40                    deserr::ValuePointerRef::Key { key: embedder_name, prev: &vectors_ref };
41
42                let embeddings_ref =
43                    deserr::ValuePointerRef::Key { key: "embeddings", prev: &embedders_ref };
44
45                let v: VectorOrArrayOfVectors = VectorOrArrayOfVectors::deserialize_from_value(
46                    DeserrRawValue::new_in(value, doc_alloc).into_value(),
47                    embeddings_ref,
48                )?;
49                Ok(v.into_array_of_vectors().unwrap_or_default())
50            }
51            Embeddings::FromJsonImplicityUserProvided(value) => {
52                let vectors_ref = deserr::ValuePointerRef::Key {
53                    key: RESERVED_VECTORS_FIELD_NAME,
54                    prev: &deserr::ValuePointerRef::Origin,
55                };
56                let embedders_ref =
57                    deserr::ValuePointerRef::Key { key: embedder_name, prev: &vectors_ref };
58
59                let v: VectorOrArrayOfVectors = VectorOrArrayOfVectors::deserialize_from_value(
60                    DeserrRawValue::new_in(value, doc_alloc).into_value(),
61                    embedders_ref,
62                )?;
63                Ok(v.into_array_of_vectors().unwrap_or_default())
64            }
65            Embeddings::FromDb(vec) => Ok(vec),
66        }
67    }
68}
69
/// Vector information held for one embedder name of one document.
70pub struct VectorEntry<'doc> {
    /// Set to `true` for entries read through a configured embedder from the database;
    /// for entries parsed from JSON it is whatever the caller passed in.
71    pub has_configured_embedder: bool,
    /// The embeddings themselves, if any were found; `None` when an explicit vectors
    /// object carried no `embeddings` value.
72    pub embeddings: Option<Embeddings<'doc>>,
    /// Regeneration flag: taken from the explicit vectors object, `false` for implicitly
    /// user-provided vectors, and `true` for database entries whose document id is not
    /// in the embedder's `user_provided` set.
73    pub regenerate: bool,
    /// `true` only when the entry was parsed from the implicitly-user-provided JSON form.
74    pub implicit: bool,
75}
76
/// Read access to the per-embedder vector entries of a document.
77pub trait VectorDocument<'doc> {
    /// Iterates over `(name, entry)` pairs; every item is a `Result`, so each one can
    /// fail on its own.
78    fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>>;
79
    /// Looks up the entry stored under `key`, returning `Ok(None)` when there is none.
80    fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>>;
81}
82
83pub struct VectorDocumentFromDb<'t> {
84    docid: DocumentId,
85    embedding_config: Vec<IndexEmbeddingConfig>,
86    index: &'t Index,
87    vectors_field: Option<RawMap<'t, FxBuildHasher>>,
88    rtxn: &'t RoTxn<'t>,
89    doc_alloc: &'t Bump,
90}
91
92impl<'t> VectorDocumentFromDb<'t> {
93    pub fn new<Mapper: FieldIdMapper>(
94        docid: DocumentId,
95        index: &'t Index,
96        rtxn: &'t RoTxn,
97        db_fields_ids_map: &'t Mapper,
98        doc_alloc: &'t Bump,
99    ) -> Result<Option<Self>> {
100        let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else {
101            return Ok(None);
102        };
103        let vectors = document.vectors_field()?;
104        let vectors_field = match vectors {
105            Some(vectors) => Some(
106                RawMap::from_raw_value_and_hasher(vectors, FxBuildHasher, doc_alloc)
107                    .map_err(InternalError::SerdeJson)?,
108            ),
109            None => None,
110        };
111
112        let embedding_config = index.embedding_configs(rtxn)?;
113
114        Ok(Some(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc }))
115    }
116
117    fn entry_from_db(
118        &self,
119        embedder_id: u8,
120        config: &IndexEmbeddingConfig,
121    ) -> Result<VectorEntry<'t>> {
122        let reader =
123            ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized());
124        let vectors = reader.item_vectors(self.rtxn, self.docid)?;
125
126        Ok(VectorEntry {
127            has_configured_embedder: true,
128            embeddings: Some(Embeddings::FromDb(vectors)),
129            regenerate: !config.user_provided.contains(self.docid),
130            implicit: false,
131        })
132    }
133}
134
135impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
136    fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'t str, VectorEntry<'t>)>> {
137        self.embedding_config
138            .iter()
139            .map(|config| {
140                let embedder_id =
141                    self.index.embedder_category_id.get(self.rtxn, &config.name)?.unwrap();
142                let entry = self.entry_from_db(embedder_id, config)?;
143                let config_name = self.doc_alloc.alloc_str(config.name.as_str());
144                Ok((&*config_name, entry))
145            })
146            .chain(self.vectors_field.iter().flat_map(|map| map.iter()).map(|(name, value)| {
147                Ok((
148                    name,
149                    entry_from_raw_value(value, false).map_err(|_| {
150                        InternalError::Serialization(crate::SerializationError::Decoding {
151                            db_name: Some(crate::index::db_name::VECTOR_ARROY),
152                        })
153                    })?,
154                ))
155            }))
156    }
157
158    fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'t>>> {
159        Ok(match self.index.embedder_category_id.get(self.rtxn, key)? {
160            Some(embedder_id) => {
161                let config =
162                    self.embedding_config.iter().find(|config| config.name == key).unwrap();
163                Some(self.entry_from_db(embedder_id, config)?)
164            }
165            None => match self.vectors_field.as_ref().and_then(|obkv| obkv.get(key)) {
166                Some(embedding_from_doc) => {
167                    Some(entry_from_raw_value(embedding_from_doc, false).map_err(|_| {
168                        InternalError::Serialization(crate::SerializationError::Decoding {
169                            db_name: Some(crate::index::db_name::VECTOR_ARROY),
170                        })
171                    })?)
172                }
173                None => None,
174            },
175        })
176    }
177}
178
179fn entry_from_raw_value_user<'doc>(
180    external_docid: &str,
181    embedder_name: &str,
182    value: &'doc RawValue,
183    has_configured_embedder: bool,
184) -> Result<VectorEntry<'doc>> {
185    entry_from_raw_value(value, has_configured_embedder).map_err(|error| {
186        UserError::InvalidVectorsEmbedderConf {
187            document_id: external_docid.to_string(),
188            error: error.msg(embedder_name),
189        }
190        .into()
191    })
192}
193
194fn entry_from_raw_value(
195    value: &RawValue,
196    has_configured_embedder: bool,
197) -> std::result::Result<VectorEntry<'_>, RawVectorsError> {
198    let value: RawVectors = RawVectors::from_raw_value(value)?;
199
200    Ok(match value {
201        RawVectors::Explicit(raw_explicit_vectors) => VectorEntry {
202            has_configured_embedder,
203            embeddings: raw_explicit_vectors.embeddings.map(Embeddings::FromJsonExplicit),
204            regenerate: raw_explicit_vectors.regenerate,
205            implicit: false,
206        },
207        RawVectors::ImplicitlyUserProvided(value) => VectorEntry {
208            has_configured_embedder,
209            // implicitly user provided always provide embeddings
210            // `None` here means that there are no embeddings
211            embeddings: Some(
212                value
213                    .map(Embeddings::FromJsonImplicityUserProvided)
214                    .unwrap_or(Embeddings::FromDb(Default::default())),
215            ),
216            regenerate: false,
217            implicit: true,
218        },
219    })
220}
221
222pub struct VectorDocumentFromVersions<'doc> {
223    external_document_id: &'doc str,
224    vectors: RawMap<'doc, FxBuildHasher>,
225    embedders: &'doc EmbeddingConfigs,
226}
227
228impl<'doc> VectorDocumentFromVersions<'doc> {
229    pub fn new(
230        external_document_id: &'doc str,
231        versions: &Versions<'doc>,
232        bump: &'doc Bump,
233        embedders: &'doc EmbeddingConfigs,
234    ) -> Result<Option<Self>> {
235        let document = DocumentFromVersions::new(versions);
236        if let Some(vectors_field) = document.vectors_field()? {
237            let vectors = RawMap::from_raw_value_and_hasher(vectors_field, FxBuildHasher, bump)
238                .map_err(UserError::SerdeJson)?;
239            Ok(Some(Self { external_document_id, vectors, embedders }))
240        } else {
241            Ok(None)
242        }
243    }
244}
245
246impl<'doc> VectorDocument<'doc> for VectorDocumentFromVersions<'doc> {
247    fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>> {
248        self.vectors.iter().map(|(embedder, vectors)| {
249            let vectors = entry_from_raw_value_user(
250                self.external_document_id,
251                embedder,
252                vectors,
253                self.embedders.contains(embedder),
254            )?;
255            Ok((embedder, vectors))
256        })
257    }
258
259    fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>> {
260        let Some(vectors) = self.vectors.get(key) else { return Ok(None) };
261        let vectors = entry_from_raw_value_user(
262            self.external_document_id,
263            key,
264            vectors,
265            self.embedders.contains(key),
266        )?;
267        Ok(Some(vectors))
268    }
269}
270
271pub struct MergedVectorDocument<'doc> {
272    new_doc: Option<VectorDocumentFromVersions<'doc>>,
273    db: Option<VectorDocumentFromDb<'doc>>,
274}
275
276impl<'doc> MergedVectorDocument<'doc> {
277    #[allow(clippy::too_many_arguments)]
278    pub fn with_db<Mapper: FieldIdMapper>(
279        docid: DocumentId,
280        external_document_id: &'doc str,
281        index: &'doc Index,
282        rtxn: &'doc RoTxn,
283        db_fields_ids_map: &'doc Mapper,
284        versions: &Versions<'doc>,
285        doc_alloc: &'doc Bump,
286        embedders: &'doc EmbeddingConfigs,
287    ) -> Result<Option<Self>> {
288        let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
289        let new_doc =
290            VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
291        Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
292    }
293
294    pub fn without_db(
295        external_document_id: &'doc str,
296        versions: &Versions<'doc>,
297        doc_alloc: &'doc Bump,
298        embedders: &'doc EmbeddingConfigs,
299    ) -> Result<Option<Self>> {
300        let Some(new_doc) =
301            VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?
302        else {
303            return Ok(None);
304        };
305        Ok(Some(Self { new_doc: Some(new_doc), db: None }))
306    }
307}
308
309impl<'doc> VectorDocument<'doc> for MergedVectorDocument<'doc> {
310    fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>> {
311        let mut new_doc_it = self.new_doc.iter().flat_map(|new_doc| new_doc.iter_vectors());
312        let mut db_it = self.db.iter().flat_map(|db| db.iter_vectors());
313        let mut seen_fields = BTreeSet::new();
314
315        std::iter::from_fn(move || {
316            if let Some(next) = new_doc_it.next() {
317                if let Ok((name, _)) = next {
318                    seen_fields.insert(name);
319                }
320                return Some(next);
321            }
322            loop {
323                match db_it.next()? {
324                    Ok((name, value)) => {
325                        if seen_fields.contains(name) {
326                            continue;
327                        }
328                        return Some(Ok((name, value)));
329                    }
330                    Err(err) => return Some(Err(err)),
331                }
332            }
333        })
334    }
335
336    fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>> {
337        if let Some(new_doc) = &self.new_doc {
338            if let Some(entry) = new_doc.vectors_for_key(key)? {
339                return Ok(Some(entry));
340            }
341        }
342
343        let Some(db) = self.db.as_ref() else { return Ok(None) };
344        db.vectors_for_key(key)
345    }
346}