use std::collections::HashMap;
use uni_common::{Result, UniError, Value};
use crate::bulk::{BulkBackend, BulkStats, BulkWriter, BulkWriterBuilder};
/// Builder that collects the settings for a [`StreamingAppender`].
///
/// Created with [`AppenderBuilder::new_from_tx`], adjusted through the chained
/// setter methods, and turned into an appender by [`AppenderBuilder::build`].
pub struct AppenderBuilder {
/// Backend handed to the underlying `BulkWriterBuilder` when building.
backend: BulkBackend,
/// Label passed along with every batch of rows the appender inserts.
label: String,
/// Row count at which the appender flushes its buffer; also forwarded to
/// the underlying `BulkWriterBuilder`.
batch_size: usize,
/// Flag forwarded unchanged to `BulkWriterBuilder::defer_vector_indexes`.
defer_vector_indexes: bool,
/// Optional byte limit; forwarded to `BulkWriterBuilder` only when `Some`.
max_buffer_size_bytes: Option<usize>,
}
impl AppenderBuilder {
    /// Starts a builder for appending rows under `label` through `backend`.
    ///
    /// Defaults: a batch size of 5000 rows, vector-index deferral enabled,
    /// and no explicit buffer byte limit.
    pub fn new_from_tx(backend: BulkBackend, label: &str) -> Self {
        let label = label.to_owned();
        Self {
            backend,
            label,
            batch_size: 5000,
            defer_vector_indexes: true,
            max_buffer_size_bytes: None,
        }
    }

    /// Sets how many rows are buffered before the appender flushes them.
    ///
    /// The same value is forwarded to the underlying `BulkWriterBuilder`.
    pub fn batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }

    /// Sets the flag forwarded to `BulkWriterBuilder::defer_vector_indexes`.
    pub fn defer_vector_indexes(self, defer: bool) -> Self {
        Self {
            defer_vector_indexes: defer,
            ..self
        }
    }

    /// Sets the byte limit forwarded to
    /// `BulkWriterBuilder::max_buffer_size_bytes`.
    ///
    /// When this setter is never called, no limit is forwarded at all.
    pub fn max_buffer_size_bytes(self, size: usize) -> Self {
        Self {
            max_buffer_size_bytes: Some(size),
            ..self
        }
    }

    /// Consumes the builder and creates the [`StreamingAppender`].
    ///
    /// The writer is configured via `BulkWriterBuilder::new_unguarded`, and
    /// the appender's row buffer is pre-allocated for `batch_size` rows.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `BulkWriterBuilder::build`.
    pub fn build(self) -> Result<StreamingAppender> {
        let Self {
            backend,
            label,
            batch_size,
            defer_vector_indexes,
            max_buffer_size_bytes,
        } = self;

        let configured = BulkWriterBuilder::new_unguarded(backend)
            .batch_size(batch_size)
            .defer_vector_indexes(defer_vector_indexes);
        // The byte cap is optional: only forward it when the caller set one.
        let configured = match max_buffer_size_bytes {
            Some(limit) => configured.max_buffer_size_bytes(limit),
            None => configured,
        };

        let writer = configured.build()?;
        let buffer = Vec::with_capacity(batch_size);
        Ok(StreamingAppender {
            writer: Some(writer),
            label,
            batch_size,
            buffer,
        })
    }
}
/// Row-at-a-time appender that buffers property maps and hands them to a
/// `BulkWriter` in batches.
///
/// Built by [`AppenderBuilder::build`]; ended with `finish` (commit) or
/// `abort`, both of which consume the appender.
pub struct StreamingAppender {
/// Underlying writer; `Some` from construction until `finish` or `abort`
/// takes it out.
writer: Option<BulkWriter>,
/// Label passed to `insert_vertices` for every flushed batch.
label: String,
/// Buffered-row count that triggers a flush; also the capacity the buffer
/// is allocated with.
batch_size: usize,
/// Rows accepted so far that have not yet been handed to the writer.
buffer: Vec<HashMap<String, Value>>,
}
impl StreamingAppender {
    /// Buffers one row of properties, flushing to the writer once the buffer
    /// holds at least `batch_size` rows.
    ///
    /// # Errors
    ///
    /// Returns an error if a flush is triggered and inserting the batch fails.
    pub async fn append(&mut self, properties: impl Into<HashMap<String, Value>>) -> Result<()> {
        self.push_row(properties.into()).await
    }

    /// Buffers every row of an Arrow record batch, flushing each time the
    /// buffer reaches `batch_size` rows.
    ///
    /// Rows are produced by `crate::bulk::record_batch_to_property_maps`.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first flush error; rows of `batch` after
    /// that point are not buffered.
    pub async fn write_batch(&mut self, batch: &arrow_array::RecordBatch) -> Result<()> {
        for props in crate::bulk::record_batch_to_property_maps(batch) {
            self.push_row(props).await?;
        }
        Ok(())
    }

    /// Flushes any remaining buffered rows, commits the writer, and returns
    /// the statistics reported by the commit.
    ///
    /// # Errors
    ///
    /// Returns an error if the final flush fails, if the writer has already
    /// been taken, or if the commit itself fails.
    pub async fn finish(mut self) -> Result<BulkStats> {
        self.flush_buffer().await?;
        let writer = self
            .writer
            .take()
            .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
        let stats = writer.commit().await.map_err(UniError::Internal)?;
        Ok(stats)
    }

    /// Discards all buffered rows and aborts the writer, if one is still held.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer's abort fails.
    pub async fn abort(mut self) -> Result<()> {
        self.buffer.clear();
        if let Some(writer) = self.writer.take() {
            writer.abort().await.map_err(UniError::Internal)?;
        }
        Ok(())
    }

    /// Number of rows currently buffered and not yet handed to the writer.
    pub fn buffered_count(&self) -> usize {
        self.buffer.len()
    }

    /// Adds one row to the buffer and flushes when the threshold is reached.
    ///
    /// Shared by `append` and `write_batch` so both apply the same rule.
    async fn push_row(&mut self, row: HashMap<String, Value>) -> Result<()> {
        self.buffer.push(row);
        if self.buffer.len() >= self.batch_size {
            self.flush_buffer().await?;
        }
        Ok(())
    }

    /// Hands all buffered rows to the writer as one `insert_vertices` call.
    ///
    /// Does nothing when the buffer is empty.
    ///
    /// # Errors
    ///
    /// Returns an error if no writer is held or if the insert fails. Rows
    /// already passed to a failing insert are not restored to the buffer.
    async fn flush_buffer(&mut self) -> Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        // Resolve the writer before touching the buffer, so a missing writer
        // reports the error without discarding the rows still waiting.
        let writer = self
            .writer
            .as_mut()
            .ok_or_else(|| UniError::Internal(anyhow::anyhow!("Appender already finished")))?;
        // Swap in a fresh pre-sized buffer; the filled one moves to the writer.
        let rows = std::mem::replace(&mut self.buffer, Vec::with_capacity(self.batch_size));
        writer
            .insert_vertices(&self.label, rows)
            .await
            .map_err(UniError::Internal)?;
        Ok(())
    }
}