use std::io::{self, Write};
use grenad::{CompressionType, WriterBuilder};
use serde::de::Deserializer;
use serde_json::to_writer;
use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
use crate::documents::serde_impl::DocumentVisitor;
use crate::Object;
pub struct DocumentsBatchBuilder<W> {
writer: grenad::Writer<W>,
fields_index: DocumentsBatchIndex,
documents_count: u32,
obkv_buffer: Vec<u8>,
value_buffer: Vec<u8>,
}
impl<W: Write> DocumentsBatchBuilder<W> {
pub fn new(writer: W) -> DocumentsBatchBuilder<W> {
DocumentsBatchBuilder {
writer: WriterBuilder::new().compression_type(CompressionType::None).build(writer),
fields_index: DocumentsBatchIndex::default(),
documents_count: 0,
obkv_buffer: Vec::new(),
value_buffer: Vec::new(),
}
}
pub fn documents_count(&self) -> u32 {
self.documents_count
}
pub fn append_json_object(&mut self, object: &Object) -> io::Result<()> {
let mut fields_ids: Vec<_> = object.keys().map(|k| self.fields_index.insert(k)).collect();
fields_ids.sort_unstable();
self.obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
for field_id in fields_ids {
let key = self.fields_index.name(field_id).unwrap();
self.value_buffer.clear();
to_writer(&mut self.value_buffer, &object[key])?;
writer.insert(field_id, &self.value_buffer)?;
}
let internal_id = self.documents_count.to_be_bytes();
let document_bytes = writer.into_inner()?;
self.writer.insert(internal_id, &document_bytes)?;
self.documents_count += 1;
Ok(())
}
pub fn append_json_array<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
let mut de = serde_json::Deserializer::from_reader(reader);
let mut visitor = DocumentVisitor::new(self);
de.deserialize_any(&mut visitor)?
}
pub fn into_inner(mut self) -> io::Result<W> {
let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
self.value_buffer.clear();
to_writer(&mut self.value_buffer, &fields_index)?;
writer.insert(DOCUMENTS_BATCH_INDEX_KEY, &self.value_buffer)?;
writer.into_inner()
}
}
#[cfg(test)]
mod test {
use std::io::Cursor;
use super::*;
use crate::documents::DocumentsBatchReader;
#[test]
fn add_single_documents_json() {
let json = serde_json::json!({
"id": 1,
"field": "hello!",
});
let mut builder = DocumentsBatchBuilder::new(Vec::new());
builder.append_json_object(json.as_object().unwrap()).unwrap();
let json = serde_json::json!({
"blabla": false,
"field": "hello!",
"id": 1,
});
builder.append_json_object(json.as_object().unwrap()).unwrap();
assert_eq!(builder.documents_count(), 2);
let vector = builder.into_inner().unwrap();
let (mut cursor, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
.unwrap()
.into_cursor_and_fields_index();
assert_eq!(index.len(), 3);
let document = cursor.next_document().unwrap().unwrap();
assert_eq!(document.iter().count(), 2);
let document = cursor.next_document().unwrap().unwrap();
assert_eq!(document.iter().count(), 3);
assert!(cursor.next_document().unwrap().is_none());
}
}