1use std::collections::{BTreeMap, BTreeSet};
2
3use bumparaw_collections::RawMap;
4use heed::RoTxn;
5use rustc_hash::FxBuildHasher;
6use serde_json::value::RawValue;
7
8use super::vector_document::VectorDocument;
9use super::{KvReaderFieldId, KvWriterFieldId};
10use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
11use crate::documents::FieldIdMapper;
12use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
13
/// Read access to the fields of a document whose raw JSON values borrow from `'doc`.
///
/// The reserved vectors field ([`RESERVED_VECTORS_FIELD_NAME`]) and geo field
/// ([`RESERVED_GEO_FIELD_NAME`]) have dedicated accessors; the implementations in this module
/// leave them out of the "top-level" iteration and count.
pub trait Document<'doc> {
    /// Iterates over the top-level fields as `(field name, raw JSON value)` pairs.
    ///
    /// Implementations in this module skip the reserved vectors and geo fields.
    fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'doc str, &'doc RawValue)>>;

    /// Number of top-level fields, excluding the reserved vectors and geo fields.
    fn top_level_fields_count(&self) -> usize;

    /// Returns the raw value of the top-level field named `k`, if present.
    ///
    /// `DocumentFromDb` and `DocumentFromVersions` return `None` when `k` is one of the reserved
    /// vectors/geo field names.
    fn top_level_field(&self, k: &str) -> Result<Option<&'doc RawValue>>;

    /// Returns the raw value of the reserved vectors field, if present.
    fn vectors_field(&self) -> Result<Option<&'doc RawValue>>;

    /// Returns the raw value of the reserved geo field, if present.
    fn geo_field(&self) -> Result<Option<&'doc RawValue>>;
}
49
50#[derive(Debug)]
51pub struct DocumentFromDb<'t, Mapper: FieldIdMapper>
52where
53 Mapper: FieldIdMapper,
54{
55 fields_ids_map: &'t Mapper,
56 content: &'t KvReaderFieldId,
57}
58
59impl<Mapper: FieldIdMapper> Clone for DocumentFromDb<'_, Mapper> {
60 #[inline]
61 fn clone(&self) -> Self {
62 *self
63 }
64}
65impl<Mapper: FieldIdMapper> Copy for DocumentFromDb<'_, Mapper> {}
66
67impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
68 fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
69 let mut it = self.content.iter();
70
71 std::iter::from_fn(move || loop {
72 let (fid, value) = it.next()?;
73 let name = match self.fields_ids_map.name(fid).ok_or(
74 InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
75 field_id: fid,
76 process: "getting current document",
77 }),
78 ) {
79 Ok(name) => name,
80 Err(error) => return Some(Err(error.into())),
81 };
82
83 if name == RESERVED_VECTORS_FIELD_NAME || name == RESERVED_GEO_FIELD_NAME {
84 continue;
85 }
86
87 let res = (|| {
88 let value =
89 serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
90
91 Ok((name, value))
92 })();
93
94 return Some(res);
95 })
96 }
97
98 fn vectors_field(&self) -> Result<Option<&'t RawValue>> {
99 self.field(RESERVED_VECTORS_FIELD_NAME)
100 }
101
102 fn geo_field(&self) -> Result<Option<&'t RawValue>> {
103 self.field(RESERVED_GEO_FIELD_NAME)
104 }
105
106 fn top_level_fields_count(&self) -> usize {
107 let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
108 let has_geo_field = self.geo_field().unwrap_or(None).is_some();
109 let count = self.content.iter().count();
110 match (has_vectors_field, has_geo_field) {
111 (true, true) => count - 2,
112 (true, false) | (false, true) => count - 1,
113 (false, false) => count,
114 }
115 }
116
117 fn top_level_field(&self, k: &str) -> Result<Option<&'t RawValue>> {
118 if k == RESERVED_VECTORS_FIELD_NAME || k == RESERVED_GEO_FIELD_NAME {
119 return Ok(None);
120 }
121 self.field(k)
122 }
123}
124
125impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
126 pub fn new(
127 docid: DocumentId,
128 rtxn: &'t RoTxn,
129 index: &'t Index,
130 db_fields_ids_map: &'t Mapper,
131 ) -> Result<Option<Self>> {
132 index.documents.get(rtxn, &docid).map_err(crate::Error::from).map(|reader| {
133 reader.map(|reader| Self { fields_ids_map: db_fields_ids_map, content: reader })
134 })
135 }
136
137 pub fn field(&self, name: &str) -> Result<Option<&'t RawValue>> {
138 let Some(fid) = self.fields_ids_map.id(name) else {
139 return Ok(None);
140 };
141 let Some(value) = self.content.get(fid) else { return Ok(None) };
142 Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
143 }
144}
145
146#[derive(Debug)]
147pub struct DocumentFromVersions<'a, 'doc> {
148 versions: &'a Versions<'doc>,
149}
150
151impl<'a, 'doc> DocumentFromVersions<'a, 'doc> {
152 pub fn new(versions: &'a Versions<'doc>) -> Self {
153 Self { versions }
154 }
155}
156
157impl<'doc> Document<'doc> for DocumentFromVersions<'_, 'doc> {
158 fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'doc str, &'doc RawValue)>> {
159 self.versions.iter_top_level_fields().map(Ok)
160 }
161
162 fn vectors_field(&self) -> Result<Option<&'doc RawValue>> {
163 Ok(self.versions.vectors_field())
164 }
165
166 fn geo_field(&self) -> Result<Option<&'doc RawValue>> {
167 Ok(self.versions.geo_field())
168 }
169
170 fn top_level_fields_count(&self) -> usize {
171 let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
172 let has_geo_field = self.geo_field().unwrap_or(None).is_some();
173 let count = self.versions.len();
174 match (has_vectors_field, has_geo_field) {
175 (true, true) => count - 2,
176 (true, false) | (false, true) => count - 1,
177 (false, false) => count,
178 }
179 }
180
181 fn top_level_field(&self, k: &str) -> Result<Option<&'doc RawValue>> {
182 Ok(self.versions.top_level_field(k))
183 }
184}
185
186#[derive(Debug)]
187pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
188 new_doc: DocumentFromVersions<'a, 'doc>,
189 db: Option<DocumentFromDb<'t, Mapper>>,
190}
191
192impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
193 pub fn with_db(
194 docid: DocumentId,
195 rtxn: &'t RoTxn,
196 index: &'t Index,
197 db_fields_ids_map: &'t Mapper,
198 new_doc: DocumentFromVersions<'a, 'doc>,
199 ) -> Result<Self> {
200 let db = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
201 Ok(Self { new_doc, db })
202 }
203
204 pub fn without_db(new_doc: DocumentFromVersions<'a, 'doc>) -> Self {
205 Self { new_doc, db: None }
206 }
207}
208
209impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
210 for MergedDocument<'d, 'doc, 't, Mapper>
211{
212 fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'d str, &'d RawValue)>> {
213 let mut new_doc_it = self.new_doc.iter_top_level_fields();
214 let mut db_it = self.db.iter().flat_map(|db| db.iter_top_level_fields());
215 let mut seen_fields = BTreeSet::new();
216
217 std::iter::from_fn(move || {
218 if let Some(next) = new_doc_it.next() {
219 if let Ok((name, _)) = next {
220 seen_fields.insert(name);
221 }
222 return Some(next);
223 }
224 loop {
225 match db_it.next()? {
226 Ok((name, value)) => {
227 if seen_fields.contains(name) {
228 continue;
229 }
230 return Some(Ok((name, value)));
231 }
232 Err(err) => return Some(Err(err)),
233 }
234 }
235 })
236 }
237
238 fn vectors_field(&self) -> Result<Option<&'d RawValue>> {
239 if let Some(vectors) = self.new_doc.vectors_field()? {
240 return Ok(Some(vectors));
241 }
242
243 let Some(db) = self.db else { return Ok(None) };
244
245 db.vectors_field()
246 }
247
248 fn geo_field(&self) -> Result<Option<&'d RawValue>> {
249 if let Some(geo) = self.new_doc.geo_field()? {
250 return Ok(Some(geo));
251 }
252
253 let Some(db) = self.db else { return Ok(None) };
254
255 db.geo_field()
256 }
257
258 fn top_level_fields_count(&self) -> usize {
259 self.iter_top_level_fields().count()
260 }
261
262 fn top_level_field(&self, k: &str) -> Result<Option<&'d RawValue>> {
263 if let Some(f) = self.new_doc.top_level_field(k)? {
264 return Ok(Some(f));
265 }
266 if let Some(db) = self.db {
267 return db.field(k);
268 }
269 Ok(None)
270 }
271}
272
273impl<'doc, D> Document<'doc> for &D
274where
275 D: Document<'doc>,
276{
277 fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'doc str, &'doc RawValue)>> {
278 D::iter_top_level_fields(self)
279 }
280
281 fn vectors_field(&self) -> Result<Option<&'doc RawValue>> {
282 D::vectors_field(self)
283 }
284
285 fn geo_field(&self) -> Result<Option<&'doc RawValue>> {
286 D::geo_field(self)
287 }
288
289 fn top_level_fields_count(&self) -> usize {
290 D::top_level_fields_count(self)
291 }
292
293 fn top_level_field(&self, k: &str) -> Result<Option<&'doc RawValue>> {
294 D::top_level_field(self, k)
295 }
296}
297
298pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
310 document: &'s impl Document<'s>,
311 vector_document: Option<&'s impl VectorDocument<'s>>,
312 fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
313 mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
314) -> Result<&'a KvReaderFieldId>
315where
316 's: 'a,
317{
318 let vectors_value: Box<RawValue>;
320
321 document_buffer.clear();
322 let mut unordered_field_buffer = Vec::new();
323 unordered_field_buffer.clear();
324
325 let mut writer = KvWriterFieldId::new(&mut document_buffer);
326
327 for res in document.iter_top_level_fields() {
328 let (field_name, value) = res?;
329 let field_id =
330 fields_ids_map.id_or_insert(field_name).ok_or(UserError::AttributeLimitReached)?;
331 unordered_field_buffer.push((field_id, value));
332 }
333
334 'inject_vectors: {
335 let Some(vector_document) = vector_document else { break 'inject_vectors };
336
337 let mut vectors = BTreeMap::new();
338 for res in vector_document.iter_vectors() {
339 let (name, entry) = res?;
340 if entry.has_configured_embedder {
341 continue; }
343 vectors.insert(
344 name,
345 if entry.implicit {
346 serde_json::json!(entry.embeddings)
347 } else {
348 serde_json::json!({
349 "regenerate": entry.regenerate,
350 "embeddings": entry.embeddings,
352 })
353 },
354 );
355 }
356
357 if vectors.is_empty() {
358 break 'inject_vectors;
359 }
360
361 let vectors_fid = fields_ids_map
362 .id_or_insert(RESERVED_VECTORS_FIELD_NAME)
363 .ok_or(UserError::AttributeLimitReached)?;
364
365 vectors_value = serde_json::value::to_raw_value(&vectors).unwrap();
366 unordered_field_buffer.push((vectors_fid, &vectors_value));
367 }
368
369 if let Some(geo_value) = document.geo_field()? {
370 let fid = fields_ids_map
371 .id_or_insert(RESERVED_GEO_FIELD_NAME)
372 .ok_or(UserError::AttributeLimitReached)?;
373 fields_ids_map.id_or_insert("_geo.lat").ok_or(UserError::AttributeLimitReached)?;
374 fields_ids_map.id_or_insert("_geo.lng").ok_or(UserError::AttributeLimitReached)?;
375 unordered_field_buffer.push((fid, geo_value));
376 }
377
378 unordered_field_buffer.sort_by_key(|(fid, _)| *fid);
379 for (fid, value) in unordered_field_buffer.iter() {
380 writer.insert(*fid, value.get().as_bytes()).unwrap();
381 }
382
383 writer.finish().unwrap();
384 Ok(KvReaderFieldId::from_slice(document_buffer))
385}
386
/// A top-level field of a document: its name paired with its raw JSON value.
pub type Entry<'doc> = (&'doc str, &'doc RawValue);
388
389#[derive(Debug)]
390pub struct Versions<'doc> {
391 data: RawMap<'doc, FxBuildHasher>,
392}
393
394impl<'doc> Versions<'doc> {
395 pub fn multiple(
396 mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
397 ) -> Result<Option<Self>> {
398 let Some(data) = versions.next() else { return Ok(None) };
399 let mut data = data?;
400 for future_version in versions {
401 let future_version = future_version?;
402 for (field, value) in future_version {
403 data.insert(field, value);
404 }
405 }
406 Ok(Some(Self::single(data)))
407 }
408
409 pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
410 Self { data: version }
411 }
412
413 pub fn iter_top_level_fields(&self) -> impl Iterator<Item = (&'doc str, &'doc RawValue)> + '_ {
414 self.data
415 .iter()
416 .filter(|(k, _)| *k != RESERVED_VECTORS_FIELD_NAME && *k != RESERVED_GEO_FIELD_NAME)
417 }
418
419 pub fn vectors_field(&self) -> Option<&'doc RawValue> {
420 self.data.get(RESERVED_VECTORS_FIELD_NAME)
421 }
422
423 pub fn geo_field(&self) -> Option<&'doc RawValue> {
424 self.data.get(RESERVED_GEO_FIELD_NAME)
425 }
426
427 pub fn len(&self) -> usize {
428 self.data.len()
429 }
430
431 pub fn is_empty(&self) -> bool {
432 self.data.is_empty()
433 }
434
435 pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> {
436 if k == RESERVED_VECTORS_FIELD_NAME || k == RESERVED_GEO_FIELD_NAME {
437 return None;
438 }
439 self.data.get(k)
440 }
441}