use std::ops::Range;
use arrow_array::{RecordBatch, UInt32Array};
use uuid::Uuid;
use super::fragment::FragmentReader;
use super::Dataset;
use crate::dataset::FileFragment;
use crate::datatypes::Schema;
use crate::format::Fragment;
use crate::io::deletion::DeletionVector;
use crate::{io::FileWriter, Error, Result};
use snafu::{location, Location};
/// Streams the batches of one fragment and writes new column data for it
/// into an additional data file.
///
/// Usage, as implemented by the methods on this type: call `next` to obtain
/// an input batch, pass a batch with the same number of rows to `update`,
/// repeat until `next` yields `None`, then call `finish`.
pub struct Updater {
/// The fragment being updated; the new data file is registered in its metadata.
fragment: FileFragment,
/// Reader that supplies the fragment's existing batches, one per `next` call.
reader: FragmentReader,
/// The batch most recently returned by `next`; `None` until `next` has succeeded once.
last_input: Option<RecordBatch>,
/// Writer for the new data file; created lazily by the first `update` call.
writer: Option<FileWriter>,
/// The fragment schema merged with the schema of the new columns; set by the first `update` call.
output_schema: Option<Schema>,
/// Index of the next batch to read from `reader`.
batch_id: usize,
/// First row id covered by the next batch to be written.
start_row_id: u32,
/// Consulted by `add_blanks` to find deleted row ids that need placeholder rows.
deletion_vector: DeletionVector,
}
impl Updater {
pub(super) fn new(
fragment: FileFragment,
reader: FragmentReader,
deletion_vector: DeletionVector,
) -> Self {
Self {
fragment,
reader,
last_input: None,
writer: None,
output_schema: None,
batch_id: 0,
start_row_id: 0,
deletion_vector,
}
}
pub fn fragment(&self) -> &FileFragment {
&self.fragment
}
pub fn dataset(&self) -> &Dataset {
self.fragment.dataset()
}
pub async fn next(&mut self) -> Result<Option<&RecordBatch>> {
if self.batch_id >= self.reader.num_batches() {
return Ok(None);
}
let batch = self.reader.read_batch(self.batch_id, ..).await?;
self.batch_id += 1;
self.last_input = Some(batch);
Ok(self.last_input.as_ref())
}
async fn new_writer(&mut self, schema: Schema) -> Result<FileWriter> {
let existing_schema = self.fragment.dataset().schema();
for field in schema.fields.iter() {
if existing_schema.field(&field.name).is_some() {
return Err(Error::IO {
message: format!(
"Append column: duplicated column {} already exists",
field.name
),
location: location!(),
});
}
}
let file_name = format!("{}.lance", Uuid::new_v4());
self.fragment.metadata.add_file(&file_name, &schema);
let full_path = self.fragment.dataset().data_dir().child(file_name.as_str());
FileWriter::try_new(
self.fragment.dataset().object_store.as_ref(),
&full_path,
schema,
&Default::default(),
)
.await
}
pub async fn update(&mut self, batch: RecordBatch) -> Result<()> {
let Some(last) = self.last_input.as_ref() else {
return Err(Error::IO {
message: "Fragment Updater: no input data is available before update".to_string(),
location: location!(),
});
};
if last.num_rows() != batch.num_rows() {
return Err(Error::IO {
message: format!(
"Fragment Updater: new batch has different size with the source batch: {} != {}",
last.num_rows(),
batch.num_rows()
),
location: location!(),
});
};
if self.writer.is_none() {
let output_schema = batch.schema();
let merged = self.fragment.schema().merge(output_schema.as_ref())?;
let schema = merged.project_by_schema(output_schema.as_ref())?;
self.output_schema = Some(merged);
self.writer = Some(self.new_writer(schema).await?);
}
let writer = self.writer.as_mut().unwrap();
let row_id_stride = self.reader.num_rows_in_batch(self.batch_id - 1) as u32; let batch = add_blanks(
batch,
self.start_row_id..(self.start_row_id + row_id_stride),
&self.deletion_vector,
)?;
if batch.num_rows() != row_id_stride as usize {
return Err(Error::Internal {
message: format!(
"Fragment Updater: batch size mismatch: {} != {}",
batch.num_rows(),
row_id_stride
),
});
}
writer.write(&[batch]).await?;
self.start_row_id += row_id_stride;
Ok(())
}
pub async fn finish(&mut self) -> Result<Fragment> {
if let Some(writer) = self.writer.as_mut() {
writer.finish().await?;
}
Ok(self.fragment.metadata().clone())
}
pub fn schema(&self) -> Option<&Schema> {
self.output_schema.as_ref()
}
}
pub(crate) fn add_blanks(
batch: RecordBatch,
row_id_range: Range<u32>,
deletion_vector: &DeletionVector,
) -> Result<RecordBatch> {
if !row_id_range
.clone()
.any(|row_id| deletion_vector.contains(row_id))
{
return Ok(batch);
}
if batch.num_rows() == 0 {
return Err(Error::NotSupported {
source: "Missing many rows in merge".into(),
});
}
let mut array_i = 0;
let selection_vector: Vec<u32> = row_id_range
.map(move |row_id| {
if deletion_vector.contains(row_id) {
0
} else {
array_i += 1;
array_i - 1
}
})
.collect();
let selection_vector = UInt32Array::from(selection_vector);
let arrays = batch
.columns()
.iter()
.map(|array| {
arrow::compute::take(array.as_ref(), &selection_vector, None).map_err(|e| {
Error::Arrow {
message: format!("Failed to add blanks: {}", e),
}
})
})
.collect::<Result<Vec<_>>>()?;
let batch = RecordBatch::try_new(batch.schema(), arrays)?;
Ok(batch)
}