1use std::collections::BTreeSet;
2
3use bumpalo::Bump;
4use bumparaw_collections::RawMap;
5use deserr::{Deserr, IntoValue};
6use heed::RoTxn;
7use rustc_hash::FxBuildHasher;
8use serde::Serialize;
9use serde_json::value::RawValue;
10
11use super::document::{Document, DocumentFromDb, DocumentFromVersions, Versions};
12use super::indexer::de::DeserrRawValue;
13use crate::constants::RESERVED_VECTORS_FIELD_NAME;
14use crate::documents::FieldIdMapper;
15use crate::index::IndexEmbeddingConfig;
16use crate::vector::parsed_vectors::{RawVectors, RawVectorsError, VectorOrArrayOfVectors};
17use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfigs};
18use crate::{DocumentId, Index, InternalError, Result, UserError};
19
/// Embeddings of a document for a single embedder, in one of the forms they can be obtained in.
///
/// Serialization is untagged: only the payload of the active variant is emitted.
#[derive(Serialize)]
#[serde(untagged)]
pub enum Embeddings<'doc> {
    /// Raw JSON taken from the `embeddings` field of an explicit vectors object.
    FromJsonExplicit(&'doc RawValue),
    /// Raw JSON given directly as the embedder's value (implicit, user-provided form).
    FromJsonImplicityUserProvided(&'doc RawValue),
    /// Already decoded embeddings, e.g. the vectors read back through `ArroyWrapper`.
    FromDb(Vec<Embedding>),
}
27impl<'doc> Embeddings<'doc> {
28 pub fn into_vec(
29 self,
30 doc_alloc: &'doc Bump,
31 embedder_name: &str,
32 ) -> std::result::Result<Vec<Embedding>, deserr::errors::JsonError> {
33 match self {
34 Embeddings::FromJsonExplicit(value) => {
35 let vectors_ref = deserr::ValuePointerRef::Key {
36 key: RESERVED_VECTORS_FIELD_NAME,
37 prev: &deserr::ValuePointerRef::Origin,
38 };
39 let embedders_ref =
40 deserr::ValuePointerRef::Key { key: embedder_name, prev: &vectors_ref };
41
42 let embeddings_ref =
43 deserr::ValuePointerRef::Key { key: "embeddings", prev: &embedders_ref };
44
45 let v: VectorOrArrayOfVectors = VectorOrArrayOfVectors::deserialize_from_value(
46 DeserrRawValue::new_in(value, doc_alloc).into_value(),
47 embeddings_ref,
48 )?;
49 Ok(v.into_array_of_vectors().unwrap_or_default())
50 }
51 Embeddings::FromJsonImplicityUserProvided(value) => {
52 let vectors_ref = deserr::ValuePointerRef::Key {
53 key: RESERVED_VECTORS_FIELD_NAME,
54 prev: &deserr::ValuePointerRef::Origin,
55 };
56 let embedders_ref =
57 deserr::ValuePointerRef::Key { key: embedder_name, prev: &vectors_ref };
58
59 let v: VectorOrArrayOfVectors = VectorOrArrayOfVectors::deserialize_from_value(
60 DeserrRawValue::new_in(value, doc_alloc).into_value(),
61 embedders_ref,
62 )?;
63 Ok(v.into_array_of_vectors().unwrap_or_default())
64 }
65 Embeddings::FromDb(vec) => Ok(vec),
66 }
67 }
68}
69
/// What a document holds for one embedder name: the embeddings plus the flags describing them.
pub struct VectorEntry<'doc> {
    /// Whether an embedder with this name is configured. Always `true` for entries built from
    /// the index's embedder configs; supplied by the caller for entries parsed from JSON.
    pub has_configured_embedder: bool,
    /// The embeddings themselves; `None` when an explicit vectors object has no `embeddings`.
    pub embeddings: Option<Embeddings<'doc>>,
    /// Regeneration flag: copied from the explicit object's `regenerate` field, `false` for the
    /// implicit form, and for database entries `true` unless the document id is listed in the
    /// embedder config's `user_provided` set.
    pub regenerate: bool,
    /// `true` only when the entry was given in the implicit (bare vectors) form.
    pub implicit: bool,
}
76
/// Access to the per-embedder vector entries of a document.
pub trait VectorDocument<'doc> {
    /// Iterates over `(embedder name, entry)` pairs; each item may individually be an error.
    fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>>;

    /// Returns the entry stored under `key`, or `Ok(None)` when the document has none.
    fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>>;
}
82
/// Vector view of a document as currently stored in the index.
pub struct VectorDocumentFromDb<'t> {
    /// Internal id of the document.
    docid: DocumentId,
    /// Embedder configurations read from the index when the view was created.
    embedding_config: Vec<IndexEmbeddingConfig>,
    /// Index the vectors are read from.
    index: &'t Index,
    /// Parsed vectors field of the stored document, if the document has one.
    vectors_field: Option<RawMap<'t, FxBuildHasher>>,
    /// Read transaction used for every database access.
    rtxn: &'t RoTxn<'t>,
    /// Allocator backing the parsed vectors field and the embedder names handed out.
    doc_alloc: &'t Bump,
}
91
92impl<'t> VectorDocumentFromDb<'t> {
93 pub fn new<Mapper: FieldIdMapper>(
94 docid: DocumentId,
95 index: &'t Index,
96 rtxn: &'t RoTxn,
97 db_fields_ids_map: &'t Mapper,
98 doc_alloc: &'t Bump,
99 ) -> Result<Option<Self>> {
100 let Some(document) = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)? else {
101 return Ok(None);
102 };
103 let vectors = document.vectors_field()?;
104 let vectors_field = match vectors {
105 Some(vectors) => Some(
106 RawMap::from_raw_value_and_hasher(vectors, FxBuildHasher, doc_alloc)
107 .map_err(InternalError::SerdeJson)?,
108 ),
109 None => None,
110 };
111
112 let embedding_config = index.embedding_configs(rtxn)?;
113
114 Ok(Some(Self { docid, embedding_config, index, vectors_field, rtxn, doc_alloc }))
115 }
116
117 fn entry_from_db(
118 &self,
119 embedder_id: u8,
120 config: &IndexEmbeddingConfig,
121 ) -> Result<VectorEntry<'t>> {
122 let reader =
123 ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized());
124 let vectors = reader.item_vectors(self.rtxn, self.docid)?;
125
126 Ok(VectorEntry {
127 has_configured_embedder: true,
128 embeddings: Some(Embeddings::FromDb(vectors)),
129 regenerate: !config.user_provided.contains(self.docid),
130 implicit: false,
131 })
132 }
133}
134
135impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
136 fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'t str, VectorEntry<'t>)>> {
137 self.embedding_config
138 .iter()
139 .map(|config| {
140 let embedder_id =
141 self.index.embedder_category_id.get(self.rtxn, &config.name)?.unwrap();
142 let entry = self.entry_from_db(embedder_id, config)?;
143 let config_name = self.doc_alloc.alloc_str(config.name.as_str());
144 Ok((&*config_name, entry))
145 })
146 .chain(self.vectors_field.iter().flat_map(|map| map.iter()).map(|(name, value)| {
147 Ok((
148 name,
149 entry_from_raw_value(value, false).map_err(|_| {
150 InternalError::Serialization(crate::SerializationError::Decoding {
151 db_name: Some(crate::index::db_name::VECTOR_ARROY),
152 })
153 })?,
154 ))
155 }))
156 }
157
158 fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'t>>> {
159 Ok(match self.index.embedder_category_id.get(self.rtxn, key)? {
160 Some(embedder_id) => {
161 let config =
162 self.embedding_config.iter().find(|config| config.name == key).unwrap();
163 Some(self.entry_from_db(embedder_id, config)?)
164 }
165 None => match self.vectors_field.as_ref().and_then(|obkv| obkv.get(key)) {
166 Some(embedding_from_doc) => {
167 Some(entry_from_raw_value(embedding_from_doc, false).map_err(|_| {
168 InternalError::Serialization(crate::SerializationError::Decoding {
169 db_name: Some(crate::index::db_name::VECTOR_ARROY),
170 })
171 })?)
172 }
173 None => None,
174 },
175 })
176 }
177}
178
179fn entry_from_raw_value_user<'doc>(
180 external_docid: &str,
181 embedder_name: &str,
182 value: &'doc RawValue,
183 has_configured_embedder: bool,
184) -> Result<VectorEntry<'doc>> {
185 entry_from_raw_value(value, has_configured_embedder).map_err(|error| {
186 UserError::InvalidVectorsEmbedderConf {
187 document_id: external_docid.to_string(),
188 error: error.msg(embedder_name),
189 }
190 .into()
191 })
192}
193
194fn entry_from_raw_value(
195 value: &RawValue,
196 has_configured_embedder: bool,
197) -> std::result::Result<VectorEntry<'_>, RawVectorsError> {
198 let value: RawVectors = RawVectors::from_raw_value(value)?;
199
200 Ok(match value {
201 RawVectors::Explicit(raw_explicit_vectors) => VectorEntry {
202 has_configured_embedder,
203 embeddings: raw_explicit_vectors.embeddings.map(Embeddings::FromJsonExplicit),
204 regenerate: raw_explicit_vectors.regenerate,
205 implicit: false,
206 },
207 RawVectors::ImplicitlyUserProvided(value) => VectorEntry {
208 has_configured_embedder,
209 embeddings: Some(
212 value
213 .map(Embeddings::FromJsonImplicityUserProvided)
214 .unwrap_or(Embeddings::FromDb(Default::default())),
215 ),
216 regenerate: false,
217 implicit: true,
218 },
219 })
220}
221
/// Vector view of a document built from its incoming versions.
pub struct VectorDocumentFromVersions<'doc> {
    /// External id of the document, reported in user-facing errors.
    external_document_id: &'doc str,
    /// Parsed vectors field of the document, keyed by embedder name.
    vectors: RawMap<'doc, FxBuildHasher>,
    /// Embedder configs used to tell whether a name refers to a configured embedder.
    embedders: &'doc EmbeddingConfigs,
}
227
228impl<'doc> VectorDocumentFromVersions<'doc> {
229 pub fn new(
230 external_document_id: &'doc str,
231 versions: &Versions<'doc>,
232 bump: &'doc Bump,
233 embedders: &'doc EmbeddingConfigs,
234 ) -> Result<Option<Self>> {
235 let document = DocumentFromVersions::new(versions);
236 if let Some(vectors_field) = document.vectors_field()? {
237 let vectors = RawMap::from_raw_value_and_hasher(vectors_field, FxBuildHasher, bump)
238 .map_err(UserError::SerdeJson)?;
239 Ok(Some(Self { external_document_id, vectors, embedders }))
240 } else {
241 Ok(None)
242 }
243 }
244}
245
246impl<'doc> VectorDocument<'doc> for VectorDocumentFromVersions<'doc> {
247 fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>> {
248 self.vectors.iter().map(|(embedder, vectors)| {
249 let vectors = entry_from_raw_value_user(
250 self.external_document_id,
251 embedder,
252 vectors,
253 self.embedders.contains(embedder),
254 )?;
255 Ok((embedder, vectors))
256 })
257 }
258
259 fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>> {
260 let Some(vectors) = self.vectors.get(key) else { return Ok(None) };
261 let vectors = entry_from_raw_value_user(
262 self.external_document_id,
263 key,
264 vectors,
265 self.embedders.contains(key),
266 )?;
267 Ok(Some(vectors))
268 }
269}
270
/// Vector view combining the incoming versions of a document with its stored state.
///
/// Entries of the incoming document take precedence over stored entries of the same name.
pub struct MergedVectorDocument<'doc> {
    /// Vectors coming from the incoming versions, if the document provides any.
    new_doc: Option<VectorDocumentFromVersions<'doc>>,
    /// Vectors of the document as stored in the index, if it exists there.
    db: Option<VectorDocumentFromDb<'doc>>,
}
275
276impl<'doc> MergedVectorDocument<'doc> {
277 #[allow(clippy::too_many_arguments)]
278 pub fn with_db<Mapper: FieldIdMapper>(
279 docid: DocumentId,
280 external_document_id: &'doc str,
281 index: &'doc Index,
282 rtxn: &'doc RoTxn,
283 db_fields_ids_map: &'doc Mapper,
284 versions: &Versions<'doc>,
285 doc_alloc: &'doc Bump,
286 embedders: &'doc EmbeddingConfigs,
287 ) -> Result<Option<Self>> {
288 let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
289 let new_doc =
290 VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
291 Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
292 }
293
294 pub fn without_db(
295 external_document_id: &'doc str,
296 versions: &Versions<'doc>,
297 doc_alloc: &'doc Bump,
298 embedders: &'doc EmbeddingConfigs,
299 ) -> Result<Option<Self>> {
300 let Some(new_doc) =
301 VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?
302 else {
303 return Ok(None);
304 };
305 Ok(Some(Self { new_doc: Some(new_doc), db: None }))
306 }
307}
308
309impl<'doc> VectorDocument<'doc> for MergedVectorDocument<'doc> {
310 fn iter_vectors(&self) -> impl Iterator<Item = Result<(&'doc str, VectorEntry<'doc>)>> {
311 let mut new_doc_it = self.new_doc.iter().flat_map(|new_doc| new_doc.iter_vectors());
312 let mut db_it = self.db.iter().flat_map(|db| db.iter_vectors());
313 let mut seen_fields = BTreeSet::new();
314
315 std::iter::from_fn(move || {
316 if let Some(next) = new_doc_it.next() {
317 if let Ok((name, _)) = next {
318 seen_fields.insert(name);
319 }
320 return Some(next);
321 }
322 loop {
323 match db_it.next()? {
324 Ok((name, value)) => {
325 if seen_fields.contains(name) {
326 continue;
327 }
328 return Some(Ok((name, value)));
329 }
330 Err(err) => return Some(Err(err)),
331 }
332 }
333 })
334 }
335
336 fn vectors_for_key(&self, key: &str) -> Result<Option<VectorEntry<'doc>>> {
337 if let Some(new_doc) = &self.new_doc {
338 if let Some(entry) = new_doc.vectors_for_key(key)? {
339 return Ok(Some(entry));
340 }
341 }
342
343 let Some(db) = self.db.as_ref() else { return Ok(None) };
344 db.vectors_for_key(key)
345 }
346}