use crate::backend::StorageBackend;
use crate::backend::table_names;
use crate::backend::types::{ScalarIndexType, ScanRequest, WriteMode};
use crate::storage::arrow_convert::build_timestamp_column;
use crate::storage::property_builder::PropertyColumnBuilder;
use crate::storage::value_codec::CrdtDecodeMode;
use anyhow::{Result, anyhow};
use arrow_array::types::TimestampNanosecondType;
use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
#[cfg(feature = "lance-backend")]
use futures::TryStreamExt;
#[cfg(feature = "lance-backend")]
use lance::dataset::Dataset;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;
use uni_common::DataType;
use uni_common::Properties;
use uni_common::core::id::{Eid, Vid};
use uni_common::core::schema::Schema;
/// Upper bound on rows a single in-memory compaction pass may load.
pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;
/// Rough per-entry heap footprint in bytes, used for memory estimates.
pub const ENTRY_SIZE_ESTIMATE: usize = 145;

/// Guards against out-of-memory conditions before a full-table scan.
///
/// Returns `Ok(())` when `row_count` is within `max_rows`; otherwise returns
/// an error naming the `{entity_name}_{qualifier}` table and the estimated
/// in-memory size so the operator can act on it.
pub fn check_oom_guard(
    row_count: usize,
    max_rows: usize,
    entity_name: &str,
    qualifier: &str,
) -> Result<()> {
    if row_count <= max_rows {
        return Ok(());
    }
    // Only compute the estimate on the failure path; it is purely diagnostic.
    let approx_bytes = row_count * ENTRY_SIZE_ESTIMATE;
    Err(anyhow!(
        "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
        Use chunked compaction or increase the limit. See issue #143.",
        entity_name,
        qualifier,
        row_count,
        approx_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
        max_rows
    ))
}
/// Kind of mutation a delta entry records.
///
/// The discriminant values are persisted verbatim in the `op` column
/// (see `build_record_batch` / `parse_batch`), so they must stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    Insert = 0,
    Delete = 1,
}
/// One row of an edge delta (L1) table: an insert or delete of a single
/// edge at a given version, with its properties and audit timestamps.
#[derive(Clone, Debug)]
pub struct L1Entry {
    /// Source vertex id of the edge.
    pub src_vid: Vid,
    /// Destination vertex id of the edge.
    pub dst_vid: Vid,
    /// Edge id.
    pub eid: Eid,
    /// Whether this entry inserts or deletes the edge.
    pub op: Op,
    /// Monotonic version used as the tiebreaker when replaying deltas.
    pub version: u64,
    /// Declared + overflow properties attached to the edge.
    pub properties: Properties,
    /// Creation timestamp (nanoseconds, UTC) — None when the column is null.
    pub created_at: Option<i64>,
    /// Last-update timestamp (nanoseconds, UTC) — None when the column is null.
    pub updated_at: Option<i64>,
}
/// Handle to the delta table for one (edge type, direction) pair.
///
/// `direction` is compared against the literal "fwd" elsewhere in this file;
/// anything else is treated as the reverse direction.
#[derive(Debug)]
pub struct DeltaDataset {
    // Filesystem/object-store URI; only dereferenced by the direct Lance
    // open path, hence dead code when the feature is disabled.
    #[cfg_attr(not(feature = "lance-backend"), allow(dead_code))]
    uri: String,
    edge_type: String,
    direction: String,
}
impl DeltaDataset {
pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
Self {
uri,
edge_type: edge_type.to_string(),
direction: direction.to_string(),
}
}
#[cfg(feature = "lance-backend")]
pub async fn open(&self) -> Result<Arc<Dataset>> {
self.open_at(None).await
}
#[cfg(feature = "lance-backend")]
pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
let mut ds = Dataset::open(&self.uri).await?;
if let Some(v) = version {
ds = ds.checkout_version(v).await?;
}
Ok(Arc::new(ds))
}
pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
let mut fields = vec![
Field::new("src_vid", arrow_schema::DataType::UInt64, false),
Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
Field::new("eid", arrow_schema::DataType::UInt64, false),
Field::new("op", arrow_schema::DataType::UInt8, false), Field::new("_version", arrow_schema::DataType::UInt64, false),
Field::new(
"_created_at",
arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
true,
),
Field::new(
"_updated_at",
arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
true,
),
];
if let Some(type_props) = schema.properties.get(&self.edge_type) {
let mut sorted_props: Vec<_> = type_props.iter().collect();
sorted_props.sort_by_key(|(name, _)| *name);
for (name, meta) in sorted_props {
fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
}
}
fields.push(Field::new(
"overflow_json",
arrow_schema::DataType::LargeBinary,
true,
));
Ok(Arc::new(ArrowSchema::new(fields)))
}
pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
let arrow_schema = self.get_arrow_schema(schema)?;
let mut src_vids = Vec::with_capacity(entries.len());
let mut dst_vids = Vec::with_capacity(entries.len());
let mut eids = Vec::with_capacity(entries.len());
let mut ops = Vec::with_capacity(entries.len());
let mut versions = Vec::with_capacity(entries.len());
for entry in entries {
src_vids.push(entry.src_vid.as_u64());
dst_vids.push(entry.dst_vid.as_u64());
eids.push(entry.eid.as_u64());
ops.push(entry.op as u8);
versions.push(entry.version);
}
let mut columns: Vec<ArrayRef> = vec![
Arc::new(UInt64Array::from(src_vids)),
Arc::new(UInt64Array::from(dst_vids)),
Arc::new(UInt64Array::from(eids)),
Arc::new(UInt8Array::from(ops)),
Arc::new(UInt64Array::from(versions)),
];
columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));
let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();
let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
.with_deleted(&deleted_flags)
.build(|i| &entries[i].properties)?;
columns.extend(prop_columns);
let overflow_column = self.build_overflow_json_column(entries, schema)?;
columns.push(overflow_column);
RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
}
fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
crate::storage::property_builder::build_overflow_json_column(
entries.len(),
&self.edge_type,
schema,
|i| &entries[i].properties,
&[],
)
}
#[cfg(feature = "lance-backend")]
pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
.await
}
#[cfg(feature = "lance-backend")]
pub async fn scan_all_with_limit(
&self,
schema: &Schema,
max_rows: usize,
) -> Result<Vec<L1Entry>> {
let ds = match self.open().await {
Ok(ds) => ds,
Err(_) => return Ok(vec![]),
};
let row_count = ds.count_rows(None).await?;
check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
info!(
edge_type = %self.edge_type,
direction = %self.direction,
row_count,
estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
"Starting delta scan for compaction"
);
let mut stream = ds.scan().try_into_stream().await?;
let mut entries = Vec::new();
while let Some(batch) = stream.try_next().await? {
let mut batch_entries = self.parse_batch(&batch, schema)?;
entries.append(&mut batch_entries);
}
self.sort_entries(&mut entries);
Ok(entries)
}
fn sort_entries(&self, entries: &mut [L1Entry]) {
let is_fwd = self.direction == "fwd";
entries.sort_by(|a, b| {
let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
key_a.cmp(&key_b).then(a.version.cmp(&b.version))
});
}
fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
let src_vids = batch
.column_by_name("src_vid")
.ok_or(anyhow!("Missing src_vid"))?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or(anyhow!("Invalid src_vid type"))?;
let dst_vids = batch
.column_by_name("dst_vid")
.ok_or(anyhow!("Missing dst_vid"))?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or(anyhow!("Invalid dst_vid type"))?;
let eids = batch
.column_by_name("eid")
.ok_or(anyhow!("Missing eid"))?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or(anyhow!("Invalid eid type"))?;
let ops = batch
.column_by_name("op")
.ok_or(anyhow!("Missing op"))?
.as_any()
.downcast_ref::<UInt8Array>()
.ok_or(anyhow!("Invalid op type"))?;
let versions = batch
.column_by_name("_version")
.ok_or(anyhow!("Missing _version"))?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or(anyhow!("Invalid _version type"))?;
let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
c.as_any()
.downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
});
let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
c.as_any()
.downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
});
let mut prop_cols = Vec::new();
if let Some(type_props) = schema.properties.get(&self.edge_type) {
for (name, meta) in type_props {
if let Some(col) = batch.column_by_name(name) {
prop_cols.push((name, meta.r#type.clone(), col));
}
}
}
let mut entries = Vec::with_capacity(batch.num_rows());
for i in 0..batch.num_rows() {
let op = match ops.value(i) {
0 => Op::Insert,
1 => Op::Delete,
_ => continue, };
let properties = self.extract_properties(&prop_cols, i)?;
let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
};
let created_at = read_ts(created_at_col);
let updated_at = read_ts(updated_at_col);
entries.push(L1Entry {
src_vid: Vid::from(src_vids.value(i)),
dst_vid: Vid::from(dst_vids.value(i)),
eid: Eid::from(eids.value(i)),
op,
version: versions.value(i),
properties,
created_at,
updated_at,
});
}
Ok(entries)
}
fn extract_properties(
&self,
prop_cols: &[(&String, DataType, &ArrayRef)],
row: usize,
) -> Result<Properties> {
let mut properties = Properties::new();
for (name, dtype, col) in prop_cols {
if col.is_null(row) {
continue;
}
let val = Self::value_from_column(col.as_ref(), dtype, row)?;
properties.insert(name.to_string(), uni_common::Value::from(val));
}
Ok(properties)
}
fn value_from_column(
col: &dyn arrow_array::Array,
dtype: &uni_common::DataType,
row: usize,
) -> Result<serde_json::Value> {
crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
}
fn filter_column(&self) -> &'static str {
if self.direction == "fwd" {
"src_vid"
} else {
"dst_vid"
}
}
pub async fn open_or_create(
&self,
backend: &dyn StorageBackend,
schema: &Schema,
) -> Result<()> {
let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
let arrow_schema = self.get_arrow_schema(schema)?;
backend
.open_or_create_table(&table_name, arrow_schema)
.await
}
pub async fn write_run(&self, backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
if backend.table_exists(&table_name).await? {
backend
.write(&table_name, vec![batch], WriteMode::Append)
.await
} else {
backend.create_table(&table_name, vec![batch]).await
}
}
pub async fn ensure_eid_index(&self, backend: &dyn StorageBackend) -> Result<()> {
let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
let indices = backend.list_indexes(&table_name).await?;
if !indices
.iter()
.any(|idx| idx.columns.contains(&"eid".to_string()))
{
log::info!(
"Creating eid BTree index for edge type '{}'",
self.edge_type
);
if let Err(e) = backend
.create_scalar_index(&table_name, "eid", ScalarIndexType::BTree)
.await
{
log::warn!("Failed to create eid index for '{}': {}", self.edge_type, e);
}
}
Ok(())
}
pub fn table_name(&self) -> String {
table_names::delta_table_name(&self.edge_type, &self.direction)
}
pub async fn scan_all_backend(
&self,
backend: &dyn StorageBackend,
schema: &Schema,
) -> Result<Vec<L1Entry>> {
self.scan_all_backend_with_limit(backend, schema, DEFAULT_MAX_COMPACTION_ROWS)
.await
}
pub async fn scan_all_backend_with_limit(
&self,
backend: &dyn StorageBackend,
schema: &Schema,
max_rows: usize,
) -> Result<Vec<L1Entry>> {
let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
if !backend.table_exists(&table_name).await? {
return Ok(vec![]);
}
let row_count = backend.count_rows(&table_name, None).await?;
check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
info!(
edge_type = %self.edge_type,
direction = %self.direction,
row_count,
estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
"Starting delta scan for compaction (backend)"
);
let batches = backend.scan(ScanRequest::all(&table_name)).await?;
let mut entries = Vec::new();
for batch in batches {
let mut batch_entries = self.parse_batch(&batch, schema)?;
entries.append(&mut batch_entries);
}
self.sort_entries(&mut entries);
Ok(entries)
}
pub async fn replace(&self, backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
let table_name = self.table_name();
let arrow_schema = batch.schema();
backend
.replace_table_atomic(&table_name, vec![batch], arrow_schema)
.await
}
pub async fn read_deltas(
&self,
backend: &dyn StorageBackend,
vid: Vid,
schema: &Schema,
version_hwm: Option<u64>,
) -> Result<Vec<L1Entry>> {
let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
if !backend.table_exists(&table_name).await? {
return Ok(vec![]);
}
let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());
let final_filter = if let Some(hwm) = version_hwm {
format!("({}) AND (_version <= {})", base_filter, hwm)
} else {
base_filter
};
let batches = backend
.scan(ScanRequest::all(&table_name).with_filter(final_filter))
.await?;
let mut entries = Vec::new();
for batch in batches {
let mut batch_entries = self.parse_batch(&batch, schema)?;
entries.append(&mut batch_entries);
}
Ok(entries)
}
pub async fn read_deltas_batch(
&self,
backend: &dyn StorageBackend,
vids: &[Vid],
schema: &Schema,
version_hwm: Option<u64>,
) -> Result<HashMap<Vid, Vec<L1Entry>>> {
if vids.is_empty() {
return Ok(HashMap::new());
}
let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
if !backend.table_exists(&table_name).await? {
return Ok(HashMap::new());
}
let vid_list = vids
.iter()
.map(|v| v.as_u64().to_string())
.collect::<Vec<_>>()
.join(", ");
let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);
if let Some(hwm) = version_hwm {
filter = format!("({}) AND (_version <= {})", filter, hwm);
}
let batches = backend
.scan(ScanRequest::all(&table_name).with_filter(filter))
.await?;
let is_fwd = self.direction == "fwd";
let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
for batch in batches {
let entries = self.parse_batch(&batch, schema)?;
for entry in entries {
let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
result.entry(vid).or_default().push(entry);
}
}
Ok(result)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Pins the configured limits and sanity-checks the per-entry size
    // estimate against a plausible range.
    #[test]
    #[expect(
        clippy::assertions_on_constants,
        reason = "Validating configuration constants intentionally"
    )]
    fn test_constants_are_reasonable() {
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);
        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");
        // 5M rows * 145 bytes ≈ 0.68 GB — the default limit must stay
        // comfortably under 1 GB of estimated memory.
        let estimated_gb =
            (DEFAULT_MAX_COMPACTION_ROWS * ENTRY_SIZE_ESTIMATE) as f64 / (1024.0 * 1024.0 * 1024.0);
        assert!(
            estimated_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    // Sanity-checks the GB estimate used in the OOM-guard error message.
    #[test]
    fn test_memory_estimate_formatting() {
        let row_count = 10_000_000;
        let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
        let estimated_gb = estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
        assert!(
            estimated_gb > 1.0 && estimated_gb < 2.0,
            "10M rows should be 1-2 GB"
        );
    }

    #[test]
    fn test_check_oom_guard_below_limit() {
        let result = check_oom_guard(1_000_000, 5_000_000, "KNOWS", "fwd");
        assert!(result.is_ok());
    }

    // The guard is inclusive: exactly max_rows is allowed.
    #[test]
    fn test_check_oom_guard_at_limit() {
        let result = check_oom_guard(5_000_000, 5_000_000, "KNOWS", "fwd");
        assert!(result.is_ok());
    }

    // One row over the limit must fail, and the error message must carry
    // enough context (table, count, size, issue link) to act on.
    #[test]
    fn test_check_oom_guard_above_limit() {
        let result = check_oom_guard(5_000_001, 5_000_000, "KNOWS", "fwd");
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("KNOWS_fwd"), "Error should name the entity");
        assert!(msg.contains("5000001"), "Error should state the row count");
        assert!(msg.contains("GB"), "Error should show GB estimate");
        assert!(msg.contains("issue #143"), "Error should reference issue");
    }

    // The Op discriminants are persisted on disk; this pins them.
    #[test]
    fn test_op_values() {
        assert_eq!(Op::Insert as u8, 0);
        assert_eq!(Op::Delete as u8, 1);
    }
}