use anyhow::{Result, anyhow};
use chrono::Utc;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use uni_common::UniConfig;
use uni_common::Value;
use uni_common::core::id::{Eid, Vid};
use uni_common::core::schema::SchemaManager;
use uni_common::core::snapshot::{EdgeSnapshot, LabelSnapshot, SnapshotManifest};
use uni_common::{Properties, UniError};
use uni_plugin_host::shutdown::ShutdownHandle;
use uni_store::runtime::writer::Writer;
use uni_store::storage::delta::{L1Entry, Op};
use uni_store::storage::main_edge::MainEdgeDataset;
use uni_store::storage::main_vertex::MainVertexDataset;
use uni_store::storage::manager::StorageManager;
use uni_store::storage::{IndexManager, IndexRebuildManager};
use uuid::Uuid;
/// Handles to the engine components a bulk load operates on.
///
/// Cloning copies the `Arc` handles and the configuration value.
#[derive(Clone)]
pub struct BulkBackend {
/// Storage manager used to reach datasets, the table backend and the snapshot manager.
pub storage: Arc<StorageManager>,
/// Writer used to allocate vertex and edge ids; `BulkWriterBuilder::build` fails when this is `None`.
pub writer: Option<Arc<Writer>>,
/// Schema manager consulted for labels, edge types, constraints and index metadata.
pub schema: Arc<SchemaManager>,
/// Shutdown handle; the background index-rebuild worker subscribes to it and is tracked by it.
pub shutdown: Arc<ShutdownHandle>,
/// Engine configuration; its `index_rebuild` section is handed to the index rebuild manager.
pub config: UniConfig,
}
/// Conversion of bulk-insert input into one property map per row.
///
/// Implemented in this file for `Vec<HashMap<String, Value>>` and for
/// `arrow_array::RecordBatch`.
pub trait IntoArrow {
/// Consumes the input and returns its rows as property maps.
fn into_property_maps(self) -> Vec<HashMap<String, Value>>;
}
impl IntoArrow for Vec<HashMap<String, Value>> {
    /// Property maps are already in the target representation, so the vector
    /// is handed back untouched.
    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
        let rows = self;
        rows
    }
}
impl IntoArrow for arrow_array::RecordBatch {
    /// Converts every row of the batch via [`record_batch_to_property_maps`].
    fn into_property_maps(self) -> Vec<HashMap<String, Value>> {
        let batch = self;
        record_batch_to_property_maps(&batch)
    }
}
/// Converts an Arrow record batch into one property map per row.
///
/// Each map is keyed by column name. Cells whose converted value reports
/// `is_null()` are left out of the row's map instead of being stored.
pub fn record_batch_to_property_maps(
    batch: &arrow_array::RecordBatch,
) -> Vec<HashMap<String, Value>> {
    let schema = batch.schema();
    let fields = schema.fields();
    (0..batch.num_rows())
        .map(|row_idx| {
            let mut row = HashMap::with_capacity(fields.len());
            for (col_idx, field) in fields.iter().enumerate() {
                let cell = uni_store::storage::arrow_convert::arrow_to_value(
                    batch.column(col_idx).as_ref(),
                    row_idx,
                    None,
                );
                if cell.is_null() {
                    continue;
                }
                row.insert(field.name().clone(), cell);
            }
            row
        })
        .collect()
}
/// Builder that collects bulk-load options before creating a `BulkWriter`.
pub struct BulkWriterBuilder {
/// Engine handles the resulting writer will operate on.
backend: BulkBackend,
/// Options accumulated by the builder's setter methods.
config: BulkConfig,
/// Optional callback that receives `BulkProgress` updates.
progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
}
impl BulkWriterBuilder {
    /// Starts a builder over `backend` with default options and no progress callback.
    pub fn new_unguarded(backend: BulkBackend) -> Self {
        let config = BulkConfig::default();
        Self {
            backend,
            config,
            progress_callback: None,
        }
    }

    /// Sets the `defer_vector_indexes` option.
    ///
    /// `commit` runs its index-rebuild phase when this or
    /// `defer_scalar_indexes` is enabled.
    pub fn defer_vector_indexes(mut self, defer: bool) -> Self {
        self.config.defer_vector_indexes = defer;
        self
    }

    /// Sets the `defer_scalar_indexes` option.
    ///
    /// `commit` runs its index-rebuild phase when this or
    /// `defer_vector_indexes` is enabled.
    pub fn defer_scalar_indexes(mut self, defer: bool) -> Self {
        self.config.defer_scalar_indexes = defer;
        self
    }

    /// Sets how many buffered rows of one label or edge type trigger a flush.
    pub fn batch_size(mut self, size: usize) -> Self {
        self.config.batch_size = size;
        self
    }

    /// Registers a callback that is invoked with every progress report.
    pub fn on_progress<F: Fn(BulkProgress) + Send + 'static>(mut self, f: F) -> Self {
        let callback: Box<dyn Fn(BulkProgress) + Send> = Box::new(f);
        self.progress_callback = Some(callback);
        self
    }

    /// Chooses between scheduling index rebuilds in the background (`true`)
    /// and rebuilding them inside `commit` (`false`).
    pub fn async_indexes(mut self, async_: bool) -> Self {
        self.config.async_indexes = async_;
        self
    }

    /// Enables or disables constraint validation of inserted vertex batches.
    pub fn validate_constraints(mut self, validate: bool) -> Self {
        self.config.validate_constraints = validate;
        self
    }

    /// Sets the estimated buffer size, in bytes, at which all buffers are flushed.
    pub fn max_buffer_size_bytes(mut self, size: usize) -> Self {
        self.config.max_buffer_size_bytes = size;
        self
    }

    /// Creates the writer.
    ///
    /// # Errors
    ///
    /// Fails when the backend has no writer, because ids could not be allocated.
    pub fn build(self) -> Result<BulkWriter> {
        let Self {
            backend,
            config,
            progress_callback,
        } = self;

        if backend.writer.is_none() {
            return Err(anyhow!("BulkWriter requires a writable database instance"));
        }

        let writer = BulkWriter {
            backend,
            config,
            progress_callback,
            stats: BulkStats::default(),
            start_time: Instant::now(),
            pending_vertices: HashMap::new(),
            pending_edges: HashMap::new(),
            touched_labels: HashSet::new(),
            touched_edge_types: HashSet::new(),
            initial_table_versions: HashMap::new(),
            buffer_size_bytes: 0,
            committed: false,
        };
        Ok(writer)
    }
}
/// Tunable options of a bulk load.
pub struct BulkConfig {
    /// Together with `defer_scalar_indexes`, decides whether commit runs the index-rebuild phase.
    pub defer_vector_indexes: bool,
    /// Together with `defer_vector_indexes`, decides whether commit runs the index-rebuild phase.
    pub defer_scalar_indexes: bool,
    /// Number of buffered rows per label or edge type that triggers a flush.
    pub batch_size: usize,
    /// Schedule index rebuilds in the background instead of rebuilding during commit.
    pub async_indexes: bool,
    /// Validate constraints for every inserted vertex batch.
    pub validate_constraints: bool,
    /// Estimated total buffer size, in bytes, that triggers a flush of all buffers.
    pub max_buffer_size_bytes: usize,
}

impl Default for BulkConfig {
    /// Deferred indexes, synchronous rebuild, validation on, 10 000-row
    /// batches and a 1 GiB buffer limit.
    fn default() -> Self {
        let one_gib: usize = 1 << 30;
        let rows_per_flush: usize = 10_000;
        BulkConfig {
            batch_size: rows_per_flush,
            max_buffer_size_bytes: one_gib,
            defer_vector_indexes: true,
            defer_scalar_indexes: true,
            validate_constraints: true,
            async_indexes: false,
        }
    }
}
/// Progress report handed to the callback registered on the builder.
#[derive(Debug, Clone)]
pub struct BulkProgress {
/// Stage of the bulk load this report belongs to.
pub phase: BulkPhase,
/// Row count supplied by the reporting call site.
pub rows_processed: usize,
/// Expected total number of rows; the writer in this file always reports `None`.
pub total_rows: Option<usize>,
/// Label or edge type being processed, when the report concerns a single one.
pub current_label: Option<String>,
/// Time elapsed since the writer was built.
pub elapsed: Duration,
}
/// Stage of a bulk load, as carried by `BulkProgress`.
#[derive(Debug, Clone)]
pub enum BulkPhase {
/// Vertices or edges are being inserted.
Inserting,
/// Indexes of `label` are being rebuilt during commit.
RebuildingIndexes { label: String },
/// Commit is past the index phase and is about to write the snapshot.
Finalizing,
}
/// Counters and timings accumulated by a bulk load and returned from commit.
#[derive(Debug, Clone, Default)]
pub struct BulkStats {
/// Number of vertices accepted by `insert_vertices`.
pub vertices_inserted: usize,
/// Number of edges accepted by `insert_edges`.
pub edges_inserted: usize,
/// Number of labels whose indexes were rebuilt synchronously during commit.
pub indexes_rebuilt: usize,
/// Time from building the writer to the end of commit.
pub duration: Duration,
/// Time spent in the index phase of commit.
pub index_build_duration: Duration,
/// Task ids returned when index rebuilds were scheduled in the background.
pub index_task_ids: Vec<String>,
/// Set to `true` when index rebuilds were scheduled in the background.
pub indexes_pending: bool,
}
/// Input description of one edge to bulk-insert.
#[derive(Debug, Clone)]
pub struct EdgeData {
/// Id of the vertex the edge starts at.
pub src_vid: Vid,
/// Id of the vertex the edge ends at.
pub dst_vid: Vid,
/// Properties stored on the edge.
pub properties: Properties,
}
impl EdgeData {
pub fn new(src_vid: Vid, dst_vid: Vid, properties: Properties) -> Self {
Self {
src_vid,
dst_vid,
properties,
}
}
}
/// Buffers vertices and edges and writes them to storage in batches.
pub struct BulkWriter {
/// Engine handles used for id allocation, storage writes and schema lookups.
backend: BulkBackend,
/// Options chosen on the builder.
config: BulkConfig,
/// Optional callback that receives progress reports.
progress_callback: Option<Box<dyn Fn(BulkProgress) + Send>>,
/// Counters and timings accumulated so far.
stats: BulkStats,
/// Moment the writer was built; base for elapsed-time reporting.
start_time: Instant,
/// Vertices not yet written, keyed by label.
pending_vertices: HashMap<String, Vec<(Vid, Properties)>>,
/// Edge entries not yet written, keyed by edge type.
pending_edges: HashMap<String, Vec<L1Entry>>,
/// Labels that received at least one vertex.
touched_labels: HashSet<String>,
/// Edge types that received at least one edge.
touched_edge_types: HashSet<String>,
/// Table name -> table version recorded before this load first wrote to it.
/// `abort` rolls back to a recorded version and drops the table when the value is `None`.
initial_table_versions: HashMap<String, Option<u64>>,
/// Estimated size of the buffered data in bytes.
buffer_size_bytes: usize,
/// Set to `true` at the end of a successful commit.
committed: bool,
}
impl BulkWriter {
pub fn stats(&self) -> &BulkStats {
&self.stats
}
/// Returns a copy of the labels that received vertices, in no particular order.
pub fn touched_labels(&self) -> Vec<String> {
    let mut labels = Vec::with_capacity(self.touched_labels.len());
    labels.extend(self.touched_labels.iter().cloned());
    labels
}
/// Returns a copy of the edge types that received edges, in no particular order.
pub fn touched_edge_types(&self) -> Vec<String> {
    let mut edge_types = Vec::with_capacity(self.touched_edge_types.len());
    edge_types.extend(self.touched_edge_types.iter().cloned());
    edge_types
}
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn get_current_timestamp_micros() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as i64,
        Err(_) => 0,
    }
}
/// Buffers `vertices` under `label` and returns the ids allocated for them.
///
/// The label must exist in the schema. When constraint validation is
/// enabled the batch is validated before any id is allocated. After
/// buffering, either every buffer is flushed (estimated size at or above
/// `max_buffer_size_bytes`) or this label's buffer is flushed once it holds
/// `batch_size` rows.
///
/// # Errors
///
/// Unknown label, constraint violation, id allocation failure or a failed flush.
///
/// # Panics
///
/// Panics if the backend has no writer or if fewer ids than rows are returned.
pub async fn insert_vertices(
    &mut self,
    label: &str,
    vertices: impl IntoArrow,
) -> Result<Vec<Vid>> {
    let rows = vertices.into_property_maps();

    let schema = self.backend.schema.schema();
    if !schema.labels.contains_key(label) {
        return Err(UniError::LabelNotFound {
            label: label.to_string(),
        }
        .into());
    }

    if self.config.validate_constraints {
        self.validate_vertex_batch_constraints(label, &rows).await?;
    }

    let writer = self.backend.writer.as_ref().unwrap();
    let vids = writer
        .allocate_vids(rows.len())
        .await
        .map_err(UniError::Internal)?;

    let label_buffer = self.pending_vertices.entry(label.to_string()).or_default();
    for (row_no, props) in rows.into_iter().enumerate() {
        self.buffer_size_bytes += Self::estimate_properties_size(&props);
        label_buffer.push((vids[row_no], props));
    }
    self.touched_labels.insert(label.to_string());

    let over_budget = self.buffer_size_bytes >= self.config.max_buffer_size_bytes;
    if over_budget {
        self.checkpoint().await?;
    } else {
        self.check_flush_vertices(label).await?;
    }

    self.stats.vertices_inserted += vids.len();
    let total_vertices = self.stats.vertices_inserted;
    self.report_progress(BulkPhase::Inserting, total_vertices, Some(label.to_string()));
    Ok(vids)
}
/// Rough size estimate of a property map: key lengths plus estimated value sizes.
fn estimate_properties_size(props: &Properties) -> usize {
    props
        .iter()
        .map(|(name, val)| name.len() + Self::estimate_value_size(val))
        .sum()
}
/// Rough size estimate of a single value, used only for buffer accounting.
///
/// Containers add a flat 8 to the estimates of their contents, vectors
/// count 4 per element, and variants not listed here count as 16.
fn estimate_value_size(value: &Value) -> usize {
    match value {
        Value::Null | Value::Bool(_) => 1,
        Value::Int(_) | Value::Float(_) => 8,
        Value::String(text) => text.len(),
        Value::Bytes(bytes) => bytes.len(),
        Value::List(items) => {
            let contents: usize = items.iter().map(Self::estimate_value_size).sum();
            contents + 8
        }
        Value::Map(entries) => {
            let contents: usize = entries
                .iter()
                .map(|(k, v)| k.len() + Self::estimate_value_size(v))
                .sum();
            contents + 8
        }
        Value::Vector(elems) => elems.len() * 4,
        _ => 16,
    }
}
/// Validates a batch of vertices for `label` against the schema.
///
/// Checks, in this order:
/// 1. every non-nullable property of the label is present and non-null;
/// 2. every enabled constraint targeting the label:
///    * `Unique` — no duplicate key inside the batch, and no key equal to
///      one of a row still buffered in `pending_vertices` for this label,
///    * `Exists` — the property is present and non-null,
///    * `Check`  — the expression evaluates to true.
///
/// The first violation found is returned as an error naming the row index.
///
/// NOTE(review): uniqueness is only compared against this batch and the
/// rows still buffered in memory; nothing here looks at rows that were
/// already flushed or persisted — confirm that is enforced elsewhere.
async fn validate_vertex_batch_constraints(
    &self,
    label: &str,
    vertices: &[Properties],
) -> Result<()> {
    use uni_common::core::schema::{ConstraintTarget, ConstraintType};

    let schema = self.backend.schema.schema();

    if let Some(props_meta) = schema.properties.get(label) {
        for (idx, props) in vertices.iter().enumerate() {
            for (prop_name, meta) in props_meta {
                if !meta.nullable && props.get(prop_name).is_none_or(|v| v.is_null()) {
                    return Err(anyhow!(
                        "NOT NULL constraint violation at row {}: property '{}' cannot be null for label '{}'",
                        idx,
                        prop_name,
                        label
                    ));
                }
            }
        }
    }

    for constraint in &schema.constraints {
        if !constraint.enabled {
            continue;
        }
        match &constraint.target {
            ConstraintTarget::Label(l) if l == label => {}
            _ => continue,
        }
        match &constraint.constraint_type {
            ConstraintType::Unique {
                properties: unique_props,
            } => {
                // Compute each incoming row's key exactly once; both passes
                // below reuse it. `None` means a key property is missing or
                // null, in which case the row takes no part in the check.
                let batch_keys: Vec<Option<String>> = vertices
                    .iter()
                    .map(|props| self.compute_unique_key(unique_props, props))
                    .collect();

                // Pass 1: duplicates inside the batch itself.
                let mut seen_keys: HashSet<&str> = HashSet::with_capacity(batch_keys.len());
                for (idx, key) in batch_keys.iter().enumerate() {
                    if let Some(k) = key
                        && !seen_keys.insert(k.as_str())
                    {
                        return Err(anyhow!(
                            "UNIQUE constraint violation at row {}: duplicate key '{}' in batch",
                            idx,
                            k
                        ));
                    }
                }

                // Pass 2: conflicts with rows still buffered for this label.
                // The buffered keys are hashed once so each incoming row
                // costs a single lookup instead of a scan of the buffer.
                if let Some(buffered) = self.pending_vertices.get(label) {
                    let buffered_keys: HashSet<String> = buffered
                        .iter()
                        .filter_map(|(_, buffered_props)| {
                            self.compute_unique_key(unique_props, buffered_props)
                        })
                        .collect();
                    for (idx, key) in batch_keys.iter().enumerate() {
                        if let Some(k) = key
                            && buffered_keys.contains(k)
                        {
                            return Err(anyhow!(
                                "UNIQUE constraint violation at row {}: key '{}' conflicts with buffered data",
                                idx,
                                k
                            ));
                        }
                    }
                }
            }
            ConstraintType::Exists { property } => {
                for (idx, props) in vertices.iter().enumerate() {
                    if props.get(property).is_none_or(|v| v.is_null()) {
                        return Err(anyhow!(
                            "EXISTS constraint violation at row {}: property '{}' must exist",
                            idx,
                            property
                        ));
                    }
                }
            }
            ConstraintType::Check { expression } => {
                for (idx, props) in vertices.iter().enumerate() {
                    if !self.evaluate_check_expression(expression, props)? {
                        return Err(anyhow!(
                            "CHECK constraint '{}' violated at row {}: expression '{}' evaluated to false",
                            constraint.name,
                            idx,
                            expression
                        ));
                    }
                }
            }
            // Other constraint types are not validated by the bulk path.
            _ => {}
        }
    }
    Ok(())
}
/// Builds the comparison key of `props` for a UNIQUE constraint over `unique_props`.
///
/// Returns `None` when any of the properties is missing or null; such rows
/// do not take part in uniqueness checks.
///
/// Components are rendered with `to_string()` and joined with `:`. For a
/// constraint over several properties, `\` and `:` inside a component are
/// backslash-escaped first. Without that, the value pairs `("a:b", "c")`
/// and `("a", "b:c")` would both produce `a:b:c` and be reported as
/// duplicates. Single-property keys are left as rendered.
fn compute_unique_key(&self, unique_props: &[String], props: &Properties) -> Option<String> {
    let composite = unique_props.len() > 1;
    let mut parts = Vec::with_capacity(unique_props.len());
    for prop in unique_props {
        let value = props.get(prop).filter(|v| !v.is_null())?;
        let rendered = value.to_string();
        parts.push(if composite {
            rendered.replace('\\', "\\\\").replace(':', "\\:")
        } else {
            rendered
        });
    }
    Some(parts.join(":"))
}
/// Evaluates a simple CHECK expression of the form `<property> <op> <literal>`.
///
/// Only three whitespace-separated tokens are understood. The property
/// token may carry leading `(` and a `qualifier.` prefix, the literal may
/// carry trailing `)`. The literal is read as a quoted string, then `i64`,
/// then `f64`, then `bool`, and otherwise as a bare string.
///
/// The check passes (`Ok(true)`) when the expression does not have exactly
/// three tokens, when the property is absent from `properties`, or when the
/// operator is not one of `=`, `==`, `!=`, `<>`, `>`, `<`, `>=`, `<=`.
/// A quoted literal containing whitespace therefore passes unevaluated.
///
/// # Errors
///
/// Ordering operators fail when the two values have incomparable types.
fn evaluate_check_expression(&self, expression: &str, properties: &Properties) -> Result<bool> {
    let parts: Vec<&str> = expression.split_whitespace().collect();
    let &[lhs, op, rhs] = parts.as_slice() else {
        return Ok(true);
    };

    let prop_part = lhs.trim_start_matches('(');
    // Drop everything up to and including the first '.', if there is one.
    let prop_name = prop_part
        .split_once('.')
        .map_or(prop_part, |(_, name)| name);
    let val_str = rhs.trim_end_matches(')');

    let Some(prop_val) = properties.get(prop_name) else {
        return Ok(true);
    };

    // A lone quote character both starts and ends with a quote, so the
    // length guard is needed: slicing `[1..len - 1]` on it would panic.
    let is_quoted = val_str.len() >= 2
        && ((val_str.starts_with('\'') && val_str.ends_with('\''))
            || (val_str.starts_with('"') && val_str.ends_with('"')));

    let target_val = if is_quoted {
        Value::String(val_str[1..val_str.len() - 1].to_string())
    } else if let Ok(n) = val_str.parse::<i64>() {
        Value::Int(n)
    } else if let Ok(n) = val_str.parse::<f64>() {
        Value::Float(n)
    } else if let Ok(b) = val_str.parse::<bool>() {
        Value::Bool(b)
    } else {
        Value::String(val_str.to_string())
    };

    match op {
        "=" | "==" => Ok(prop_val == &target_val),
        "!=" | "<>" => Ok(prop_val != &target_val),
        ">" => self.compare_values(prop_val, &target_val).map(|c| c > 0),
        "<" => self.compare_values(prop_val, &target_val).map(|c| c < 0),
        ">=" => self.compare_values(prop_val, &target_val).map(|c| c >= 0),
        "<=" => self.compare_values(prop_val, &target_val).map(|c| c <= 0),
        // Unknown operators are not enforced.
        _ => Ok(true),
    }
}
/// Orders two values, returning -1, 0 or 1 for less, equal or greater.
///
/// Supports int/int, float/float, mixed int/float (the int is converted to
/// `f64`) and string/string. Float comparisons that have no ordering are
/// treated as equal.
///
/// # Errors
///
/// Any other combination of value types.
fn compare_values(&self, a: &Value, b: &Value) -> Result<i8> {
    let float_order = |x: f64, y: f64| x.partial_cmp(&y).unwrap_or(Ordering::Equal);

    let ordering = match (a, b) {
        (Value::Int(lhs), Value::Int(rhs)) => lhs.cmp(rhs),
        (Value::Float(lhs), Value::Float(rhs)) => float_order(*lhs, *rhs),
        (Value::Int(lhs), Value::Float(rhs)) => float_order(*lhs as f64, *rhs),
        (Value::Float(lhs), Value::Int(rhs)) => float_order(*lhs, *rhs as f64),
        (Value::String(lhs), Value::String(rhs)) => lhs.cmp(rhs),
        _ => {
            return Err(anyhow!(
                "Cannot compare incompatible types: {:?} vs {:?}",
                a,
                b
            ));
        }
    };

    let sign = match ordering {
        Ordering::Less => -1,
        Ordering::Equal => 0,
        Ordering::Greater => 1,
    };
    Ok(sign)
}
/// Flushes every vertex buffer, then every edge buffer, and resets the
/// buffer size estimate to zero.
///
/// Stops at the first flush that fails; the size estimate is left
/// untouched in that case.
async fn checkpoint(&mut self) -> Result<()> {
    log::debug!(
        "Checkpoint triggered at {} bytes (limit: {})",
        self.buffer_size_bytes,
        self.config.max_buffer_size_bytes
    );

    // Flushing removes entries from the maps, so snapshot the keys first.
    let buffered_labels = self.pending_vertices.keys().cloned().collect::<Vec<String>>();
    for label in &buffered_labels {
        self.flush_vertices_buffer(label).await?;
    }

    let buffered_edge_types = self.pending_edges.keys().cloned().collect::<Vec<String>>();
    for edge_type in &buffered_edge_types {
        self.flush_edges_buffer(edge_type).await?;
    }

    self.buffer_size_bytes = 0;
    Ok(())
}
/// Flushes the vertex buffer of `label` once it holds at least `batch_size` rows.
async fn check_flush_vertices(&mut self, label: &str) -> Result<()> {
    let Some(buffered) = self.pending_vertices.get(label) else {
        return Ok(());
    };
    if buffered.len() < self.config.batch_size {
        return Ok(());
    }
    self.flush_vertices_buffer(label).await
}
/// Writes the buffered vertices of `label` to storage.
///
/// The label's buffer is removed from `pending_vertices`; nothing happens
/// when it is missing or empty. The rows are written to the label's vertex
/// table and to the main vertex table, and default indexes are ensured on
/// both. Every row is written with version 1, `deleted = false`, and the
/// same created/updated timestamp taken at flush time.
///
/// NOTE(review): the buffer is taken out of `pending_vertices` before the
/// writes, so rows are not re-queued if a write fails.
/// NOTE(review): `buffer_size_bytes` is not reduced here; in the visible
/// code only `checkpoint` and `abort` reset it, so batch-size flushes leave
/// the estimate above what is actually buffered — confirm this is intended.
async fn flush_vertices_buffer(&mut self, label: &str) -> Result<()> {
if let Some(vertices) = self.pending_vertices.remove(label) {
if vertices.is_empty() {
return Ok(());
}
// Record each table's version the first time this load touches it;
// `abort` uses these entries to roll back or drop the table.
let table_name = uni_store::backend::table_names::vertex_table_name(label);
if !self.initial_table_versions.contains_key(&table_name) {
let backend = self.backend.storage.backend();
let version = backend
.get_table_version(&table_name)
.await
.map_err(UniError::Internal)?;
self.initial_table_versions.insert(table_name, version);
}
let main_table_name =
uni_store::backend::table_names::main_vertex_table_name().to_string();
if !self.initial_table_versions.contains_key(&main_table_name) {
let backend = self.backend.storage.backend();
let version = backend
.get_table_version(&main_table_name)
.await
.map_err(UniError::Internal)?;
self.initial_table_versions
.insert(main_table_name.clone(), version);
}
let ds = self
.backend
.storage
.vertex_dataset(label)
.map_err(UniError::Internal)?;
let schema = self.backend.schema.schema();
// Per-row metadata: nothing is deleted and every row starts at version 1.
let deleted = vec![false; vertices.len()];
let versions = vec![1; vertices.len()];
let now = Self::get_current_timestamp_micros();
let mut created_at: HashMap<Vid, i64> = HashMap::new();
let mut updated_at: HashMap<Vid, i64> = HashMap::new();
for (vid, _) in &vertices {
created_at.insert(*vid, now);
updated_at.insert(*vid, now);
}
// Each vertex carries exactly the one label it was inserted under.
let labels = vec![label.to_string()];
let vertices_with_labels: Vec<(Vid, Vec<String>, Properties)> = vertices
.iter()
.map(|(vid, props)| (*vid, labels.clone(), props.clone()))
.collect();
let batch = ds
.build_record_batch_with_timestamps(
&vertices_with_labels,
&deleted,
&versions,
&schema,
Some(&created_at),
Some(&updated_at),
)
.map_err(UniError::Internal)?;
// First write: the per-label vertex table.
let backend = self.backend.storage.backend();
ds.write_batch(backend, batch, &schema)
.await
.map_err(UniError::Internal)?;
ds.ensure_default_indexes(backend)
.await
.map_err(UniError::Internal)?;
// Second write: the same rows into the main vertex table.
let main_vertices: Vec<(Vid, Vec<String>, Properties, bool, u64)> =
vertices_with_labels
.into_iter()
.map(|(vid, lbls, props)| (vid, lbls, props, false, 1u64))
.collect();
if !main_vertices.is_empty() {
let main_batch = MainVertexDataset::build_record_batch(
&main_vertices,
Some(&created_at),
Some(&updated_at),
)
.map_err(UniError::Internal)?;
MainVertexDataset::write_batch(backend, main_batch)
.await
.map_err(UniError::Internal)?;
MainVertexDataset::ensure_default_indexes(backend)
.await
.map_err(UniError::Internal)?;
}
}
Ok(())
}
pub async fn insert_edges(
&mut self,
edge_type: &str,
edges: Vec<EdgeData>,
) -> Result<Vec<Eid>> {
let schema = self.backend.schema.schema();
schema
.edge_types
.get(edge_type)
.ok_or_else(|| UniError::EdgeTypeNotFound {
edge_type: edge_type.to_string(),
})?;
let eids = {
let writer = self.backend.writer.as_ref().unwrap();
writer
.allocate_eids(edges.len())
.await
.map_err(UniError::Internal)?
};
let now = Self::get_current_timestamp_micros();
let mut added_size = 0usize;
let entries: Vec<L1Entry> = edges
.into_iter()
.enumerate()
.map(|(i, edge)| {
added_size += 32 + Self::estimate_properties_size(&edge.properties);
L1Entry {
src_vid: edge.src_vid,
dst_vid: edge.dst_vid,
eid: eids[i],
op: Op::Insert,
version: 1,
properties: edge.properties,
created_at: Some(now),
updated_at: Some(now),
}
})
.collect();
self.buffer_size_bytes += added_size;
self.pending_edges
.entry(edge_type.to_string())
.or_default()
.extend(entries);
self.touched_edge_types.insert(edge_type.to_string());
if self.buffer_size_bytes >= self.config.max_buffer_size_bytes {
self.checkpoint().await?;
} else {
self.check_flush_edges(edge_type).await?;
}
self.stats.edges_inserted += eids.len();
self.report_progress(
BulkPhase::Inserting,
self.stats.vertices_inserted + self.stats.edges_inserted,
Some(edge_type.to_string()),
);
Ok(eids)
}
/// Flushes the edge buffer of `edge_type` once it holds at least `batch_size` entries.
async fn check_flush_edges(&mut self, edge_type: &str) -> Result<()> {
    let Some(buffered) = self.pending_edges.get(edge_type) else {
        return Ok(());
    };
    if buffered.len() < self.config.batch_size {
        return Ok(());
    }
    self.flush_edges_buffer(edge_type).await
}
/// Writes the buffered edge entries of `edge_type` to storage.
///
/// The edge type's buffer is removed from `pending_edges`; nothing happens
/// when it is missing or empty. The entries are written three times: to the
/// "fwd" delta table sorted by source vertex id, to the "bwd" delta table
/// sorted by destination vertex id, and to the main edge table.
///
/// NOTE(review): the buffer is taken out of `pending_edges` before the
/// writes, so entries are not re-queued if a write fails.
/// NOTE(review): `buffer_size_bytes` is not reduced here; in the visible
/// code only `checkpoint` and `abort` reset it — confirm this is intended.
#[expect(
clippy::map_entry,
reason = "async code between contains_key and insert"
)]
async fn flush_edges_buffer(&mut self, edge_type: &str) -> Result<()> {
if let Some(entries) = self.pending_edges.remove(edge_type) {
if entries.is_empty() {
return Ok(());
}
let schema = self.backend.schema.schema();
let backend = self.backend.storage.backend();
// Record each table's version the first time this load touches it;
// `abort` uses these entries to roll back or drop the table.
let fwd_table_name =
uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
if !self.initial_table_versions.contains_key(&fwd_table_name) {
let version = backend
.get_table_version(&fwd_table_name)
.await
.map_err(UniError::Internal)?;
self.initial_table_versions.insert(fwd_table_name, version);
}
let bwd_table_name =
uni_store::backend::table_names::delta_table_name(edge_type, "bwd");
if !self.initial_table_versions.contains_key(&bwd_table_name) {
let version = backend
.get_table_version(&bwd_table_name)
.await
.map_err(UniError::Internal)?;
self.initial_table_versions.insert(bwd_table_name, version);
}
let main_edge_table_name =
uni_store::backend::table_names::main_edge_table_name().to_string();
if !self
.initial_table_versions
.contains_key(&main_edge_table_name)
{
let version = backend
.get_table_version(&main_edge_table_name)
.await
.map_err(UniError::Internal)?;
self.initial_table_versions
.insert(main_edge_table_name.clone(), version);
}
// Forward run: a copy of the entries ordered by source vertex id.
let mut fwd_entries = entries.clone();
fwd_entries.sort_by_key(|e| e.src_vid);
let fwd_ds = self
.backend
.storage
.delta_dataset(edge_type, "fwd")
.map_err(UniError::Internal)?;
let fwd_batch = fwd_ds
.build_record_batch(&fwd_entries, &schema)
.map_err(UniError::Internal)?;
let backend = self.backend.storage.backend();
fwd_ds
.write_run(backend, fwd_batch)
.await
.map_err(UniError::Internal)?;
fwd_ds
.ensure_eid_index(backend)
.await
.map_err(UniError::Internal)?;
// Backward run: a copy of the entries ordered by destination vertex id.
let mut bwd_entries = entries.clone();
bwd_entries.sort_by_key(|e| e.dst_vid);
let bwd_ds = self
.backend
.storage
.delta_dataset(edge_type, "bwd")
.map_err(UniError::Internal)?;
let bwd_batch = bwd_ds
.build_record_batch(&bwd_entries, &schema)
.map_err(UniError::Internal)?;
bwd_ds
.write_run(backend, bwd_batch)
.await
.map_err(UniError::Internal)?;
bwd_ds
.ensure_eid_index(backend)
.await
.map_err(UniError::Internal)?;
// Main edge table: rows in buffer order; timestamps are collected per
// edge id while the rows are built.
let mut edge_created_at: HashMap<Eid, i64> = HashMap::new();
let mut edge_updated_at: HashMap<Eid, i64> = HashMap::new();
let main_edges: Vec<(Eid, Vid, Vid, String, Properties, bool, u64)> = entries
.iter()
.map(|e| {
let deleted = matches!(e.op, Op::Delete);
if let Some(ts) = e.created_at {
edge_created_at.insert(e.eid, ts);
}
if let Some(ts) = e.updated_at {
edge_updated_at.insert(e.eid, ts);
}
(
e.eid,
e.src_vid,
e.dst_vid,
edge_type.to_string(),
e.properties.clone(),
deleted,
e.version,
)
})
.collect();
if !main_edges.is_empty() {
let main_batch = MainEdgeDataset::build_record_batch(
&main_edges,
Some(&edge_created_at),
Some(&edge_updated_at),
)
.map_err(UniError::Internal)?;
MainEdgeDataset::write_batch(self.backend.storage.backend(), main_batch)
.await
.map_err(UniError::Internal)?;
MainEdgeDataset::ensure_default_indexes(self.backend.storage.backend())
.await
.map_err(UniError::Internal)?;
}
}
Ok(())
}
/// Finishes the bulk load and returns the final statistics.
///
/// Steps, in order:
/// 1. flush all remaining vertex and edge buffers;
/// 2. if either `defer_*_indexes` option is set, handle the indexes of every
///    touched label: with `async_indexes`, mark them stale and schedule a
///    background rebuild; otherwise rebuild them here and mark them online;
/// 3. derive a new snapshot manifest from the latest one, refresh the row
///    counts of touched labels and edge types, save it and make it latest;
/// 4. save the schema;
/// 5. warm the adjacency data of every touched edge type.
///
/// # Errors
///
/// Returns the first storage, index or snapshot error. Steps completed
/// before the failure are not undone by this method.
pub async fn commit(mut self) -> Result<BulkStats> {
// Step 1: flush whatever is still buffered.
let labels: Vec<String> = self.pending_vertices.keys().cloned().collect();
for label in labels {
self.flush_vertices_buffer(&label).await?;
}
let edge_types: Vec<String> = self.pending_edges.keys().cloned().collect();
for edge_type in edge_types {
self.flush_edges_buffer(&edge_type).await?;
}
// Step 2: index handling for the labels touched by this load.
let index_start = Instant::now();
if self.config.defer_vector_indexes || self.config.defer_scalar_indexes {
let labels_to_rebuild: Vec<String> = self.touched_labels.iter().cloned().collect();
if self.config.async_indexes && !labels_to_rebuild.is_empty() {
// Background path: mark the affected indexes stale, then schedule rebuilds.
// Failures to update index metadata are ignored here.
let schema = self.backend.schema.schema();
for label in &labels_to_rebuild {
for idx in &schema.indexes {
if idx.label() == label.as_str() {
let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
m.status = uni_common::core::schema::IndexStatus::Stale;
});
}
}
}
let rebuild_manager = IndexRebuildManager::new(
self.backend.storage.clone(),
self.backend.schema.clone(),
self.backend.config.index_rebuild.clone(),
)
.await
.map_err(UniError::Internal)?;
let task_ids = rebuild_manager
.schedule(labels_to_rebuild)
.await
.map_err(UniError::Internal)?;
self.stats.index_task_ids = task_ids;
self.stats.indexes_pending = true;
// The worker is subscribed to the shutdown handle and tracked by it.
let manager = Arc::new(rebuild_manager);
let handle = manager.start_background_worker(self.backend.shutdown.subscribe());
self.backend.shutdown.track_task(handle);
} else {
// Synchronous path: rebuild label by label, reporting progress for each.
for label in &labels_to_rebuild {
self.report_progress(
BulkPhase::RebuildingIndexes {
label: label.clone(),
},
self.stats.vertices_inserted + self.stats.edges_inserted,
Some(label.clone()),
);
let idx_mgr = IndexManager::new(
self.backend.storage.base_path(),
self.backend.storage.schema_manager_arc(),
);
idx_mgr
.rebuild_indexes_for_label(label)
.await
.map_err(UniError::Internal)?;
self.stats.indexes_rebuilt += 1;
let now = Utc::now();
let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
// A failed row count is tolerated; `row_count_at_build` is then left unchanged.
let row_count = self
.backend
.storage
.backend()
.count_rows(&vtable_name, None)
.await
.ok()
.map(|c| c as u64);
let schema = self.backend.schema.schema();
for idx in &schema.indexes {
if idx.label() == label.as_str() {
let _ = self.backend.schema.update_index_metadata(idx.name(), |m| {
m.status = uni_common::core::schema::IndexStatus::Online;
m.last_built_at = Some(now);
if let Some(count) = row_count {
m.row_count_at_build = Some(count);
}
});
}
}
}
}
}
self.stats.index_build_duration = index_start.elapsed();
self.report_progress(
BulkPhase::Finalizing,
self.stats.vertices_inserted + self.stats.edges_inserted,
None,
);
// Step 3: derive the new manifest from the latest snapshot, or from a
// fresh manifest when none exists.
let mut manifest = self
.backend
.storage
.snapshot_manager()
.load_latest_snapshot()
.await
.map_err(UniError::Internal)?
.unwrap_or_else(|| {
SnapshotManifest::new(
Uuid::new_v4().to_string(),
self.backend.schema.schema().schema_version,
)
});
// NOTE(review): when no snapshot existed, `parent_id` is the random id of
// the manifest created just above, so `parent_snapshot` names a snapshot
// that was never saved — confirm readers tolerate that.
let parent_id = manifest.snapshot_id.clone();
manifest.parent_snapshot = Some(parent_id);
manifest.snapshot_id = Uuid::new_v4().to_string();
manifest.created_at = Utc::now();
let backend = self.backend.storage.backend();
// Refresh vertex counts; a failed count aborts the commit.
for label in &self.touched_labels {
let vtable_name = uni_store::backend::table_names::vertex_table_name(label);
let count = backend
.count_rows(&vtable_name, None)
.await
.map_err(UniError::Internal)?;
let current_snap =
manifest
.vertices
.entry(label.to_string())
.or_insert(LabelSnapshot {
version: 0,
count: 0,
lance_version: 0,
});
current_snap.count = count as u64;
current_snap.lance_version = 0;
}
// Refresh edge counts from the "fwd" delta table; unlike the vertex loop,
// a failed count skips the edge type instead of failing the commit.
for edge_type in &self.touched_edge_types {
let delta_name = uni_store::backend::table_names::delta_table_name(edge_type, "fwd");
if let Ok(count) = backend.count_rows(&delta_name, None).await {
let current_snap =
manifest
.edges
.entry(edge_type.to_string())
.or_insert(EdgeSnapshot {
version: 0,
count: 0,
lance_version: 0,
});
current_snap.count = count as u64;
current_snap.lance_version = 0;
}
}
self.backend
.storage
.snapshot_manager()
.save_snapshot(&manifest)
.await
.map_err(UniError::Internal)?;
self.backend
.storage
.snapshot_manager()
.set_latest_snapshot(&manifest.snapshot_id)
.await
.map_err(UniError::Internal)?;
// Step 4: save the schema.
self.backend
.schema
.save()
.await
.map_err(UniError::Internal)?;
// Step 5: warm adjacency data for touched edge types; results are ignored.
let schema = self.backend.storage.schema_manager().schema();
for edge_type_name in &self.touched_edge_types {
if let Some(meta) = schema.edge_types.get(edge_type_name.as_str()) {
let type_id = meta.id;
for &dir in uni_store::storage::direction::Direction::Both.expand() {
let _ = self
.backend
.storage
.warm_adjacency(type_id, dir, None)
.await;
}
}
}
self.committed = true;
self.stats.duration = self.start_time.elapsed();
Ok(self.stats.clone())
}
/// Discards buffered data and undoes the table writes of this bulk load.
///
/// Every table recorded in `initial_table_versions` is rolled back to its
/// recorded version, or dropped when no version was recorded for it. All
/// tables are attempted even if some fail, and the backend cache is cleared
/// afterwards.
///
/// # Errors
///
/// Fails when the load is already committed, or with a combined message
/// listing every table whose rollback or drop failed.
pub async fn abort(mut self) -> Result<()> {
    if self.committed {
        return Err(anyhow!("Cannot abort: bulk load already committed"));
    }

    self.pending_vertices.clear();
    self.pending_edges.clear();
    self.buffer_size_bytes = 0;

    let backend = self.backend.storage.backend();
    let mut rollback_errors = Vec::new();
    let mut rolled_back_count = 0;
    let mut dropped_count = 0;

    for (table_name, initial_version) in &self.initial_table_versions {
        let failure: Option<String> = if let Some(version) = initial_version {
            match backend.rollback_table(table_name, *version).await {
                Ok(()) => {
                    log::info!("Rolled back table '{}' to version {}", table_name, version);
                    rolled_back_count += 1;
                    None
                }
                Err(e) => Some(format!("{}: {}", table_name, e)),
            }
        } else {
            match backend.drop_table(table_name).await {
                Ok(()) => {
                    log::info!("Dropped table '{}' (created during bulk load)", table_name);
                    dropped_count += 1;
                    None
                }
                Err(e) => Some(format!("{}: {}", table_name, e)),
            }
        };
        rollback_errors.extend(failure);
    }

    self.backend.storage.backend().clear_cache();

    if !rollback_errors.is_empty() {
        return Err(anyhow!(
            "Bulk load abort had {} rollback errors: {}",
            rollback_errors.len(),
            rollback_errors.join("; ")
        ));
    }

    log::info!(
        "Bulk load aborted successfully. Rolled back {} tables, dropped {} tables.",
        rolled_back_count,
        dropped_count
    );
    Ok(())
}
/// Sends a progress report to the registered callback, if there is one.
///
/// `total_rows` is always reported as `None`; `elapsed` is measured from
/// the writer's start time.
fn report_progress(&self, phase: BulkPhase, rows: usize, label: Option<String>) {
    let Some(callback) = self.progress_callback.as_ref() else {
        return;
    };
    let report = BulkProgress {
        phase,
        rows_processed: rows,
        total_rows: None,
        current_label: label,
        elapsed: self.start_time.elapsed(),
    };
    callback(report);
}
}