milli_core/documents/
enriched.rs1use std::fs::File;
2use std::io::BufReader;
3use std::{io, str};
4
5use obkv::KvReader;
6
7use super::{
8 DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
9 Error,
10};
11use crate::update::DocumentId;
12use crate::FieldId;
13
14pub struct EnrichedDocumentsBatchReader<R> {
21 documents: DocumentsBatchReader<R>,
22 primary_key: String,
23 external_ids: grenad::ReaderCursor<BufReader<File>>,
24}
25
26impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
27 pub fn new(
28 documents: DocumentsBatchReader<R>,
29 primary_key: String,
30 external_ids: grenad::Reader<BufReader<File>>,
31 ) -> Result<Self, Error> {
32 if documents.documents_count() as u64 == external_ids.len() {
33 Ok(EnrichedDocumentsBatchReader {
34 documents,
35 primary_key,
36 external_ids: external_ids.into_cursor()?,
37 })
38 } else {
39 Err(Error::InvalidEnrichedData)
40 }
41 }
42
43 pub fn documents_count(&self) -> u32 {
44 self.documents.documents_count()
45 }
46
47 pub fn primary_key(&self) -> &str {
48 &self.primary_key
49 }
50
51 pub fn is_empty(&self) -> bool {
52 self.documents.is_empty()
53 }
54
55 pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
56 self.documents.documents_batch_index()
57 }
58
59 pub fn into_cursor_and_fields_index(
61 self,
62 ) -> (EnrichedDocumentsBatchCursor<R>, DocumentsBatchIndex) {
63 let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self;
64 let (documents, fields_index) = documents.into_cursor_and_fields_index();
65 external_ids.reset();
66 (EnrichedDocumentsBatchCursor { documents, primary_key, external_ids }, fields_index)
67 }
68}
69
70#[derive(Debug, Clone)]
71pub struct EnrichedDocument<'a> {
72 pub document: &'a KvReader<FieldId>,
73 pub document_id: DocumentId,
74}
75
76pub struct EnrichedDocumentsBatchCursor<R> {
77 documents: DocumentsBatchCursor<R>,
78 primary_key: String,
79 external_ids: grenad::ReaderCursor<BufReader<File>>,
80}
81
82impl<R> EnrichedDocumentsBatchCursor<R> {
83 pub fn primary_key(&self) -> &str {
84 &self.primary_key
85 }
86 pub fn reset(&mut self) {
88 self.documents.reset();
89 self.external_ids.reset();
90 }
91}
92
93impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
94 pub fn next_enriched_document(
97 &mut self,
98 ) -> Result<Option<EnrichedDocument<'_>>, DocumentsBatchCursorError> {
99 let document = self.documents.next_document()?;
100 let document_id = match self.external_ids.move_on_next()? {
101 Some((_, bytes)) => serde_json::from_slice(bytes).map(Some)?,
102 None => None,
103 };
104
105 match document.zip(document_id) {
106 Some((document, document_id)) => Ok(Some(EnrichedDocument { document, document_id })),
107 None => Ok(None),
108 }
109 }
110}