use std::fs::File;
use std::io::BufReader;
use std::{io, str};
use obkv::KvReader;
use super::{
DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
Error,
};
use crate::update::DocumentId;
use crate::FieldId;
pub struct EnrichedDocumentsBatchReader<R> {
documents: DocumentsBatchReader<R>,
primary_key: String,
external_ids: grenad::ReaderCursor<BufReader<File>>,
}
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
pub fn new(
documents: DocumentsBatchReader<R>,
primary_key: String,
external_ids: grenad::Reader<BufReader<File>>,
) -> Result<Self, Error> {
if documents.documents_count() as u64 == external_ids.len() {
Ok(EnrichedDocumentsBatchReader {
documents,
primary_key,
external_ids: external_ids.into_cursor()?,
})
} else {
Err(Error::InvalidEnrichedData)
}
}
pub fn documents_count(&self) -> u32 {
self.documents.documents_count()
}
pub fn primary_key(&self) -> &str {
&self.primary_key
}
pub fn is_empty(&self) -> bool {
self.documents.is_empty()
}
pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
self.documents.documents_batch_index()
}
pub fn into_cursor_and_fields_index(
self,
) -> (EnrichedDocumentsBatchCursor<R>, DocumentsBatchIndex) {
let EnrichedDocumentsBatchReader { documents, primary_key, mut external_ids } = self;
let (documents, fields_index) = documents.into_cursor_and_fields_index();
external_ids.reset();
(EnrichedDocumentsBatchCursor { documents, primary_key, external_ids }, fields_index)
}
}
#[derive(Debug, Clone)]
pub struct EnrichedDocument<'a> {
pub document: &'a KvReader<FieldId>,
pub document_id: DocumentId,
}
pub struct EnrichedDocumentsBatchCursor<R> {
documents: DocumentsBatchCursor<R>,
primary_key: String,
external_ids: grenad::ReaderCursor<BufReader<File>>,
}
impl<R> EnrichedDocumentsBatchCursor<R> {
pub fn primary_key(&self) -> &str {
&self.primary_key
}
pub fn reset(&mut self) {
self.documents.reset();
self.external_ids.reset();
}
}
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
pub fn next_enriched_document(
&mut self,
) -> Result<Option<EnrichedDocument<'_>>, DocumentsBatchCursorError> {
let document = self.documents.next_document()?;
let document_id = match self.external_ids.move_on_next()? {
Some((_, bytes)) => serde_json::from_slice(bytes).map(Some)?,
None => None,
};
match document.zip(document_id) {
Some((document, document_id)) => Ok(Some(EnrichedDocument { document, document_id })),
None => Ok(None),
}
}
}