use anyhow::Result;
use std::collections::HashMap;
use uni_db::Uni;
use uni_db::api::bulk::EdgeData;
use uni_db::unival;
/// Base test schema: `Person` and `Company` vertex labels plus the
/// `KNOWS` (Person -> Person) and `WORKS_AT` (Person -> Company) edge
/// types. Every property is nullable and there are no constraints or
/// indexes, so bulk inserts in these tests are unconstrained.
const SCHEMA_JSON: &str = r#"{
"schema_version": 1,
"labels": {
"Person": {
"id": 1,
"created_at": "2024-01-01T00:00:00Z",
"state": "Active"
},
"Company": {
"id": 2,
"created_at": "2024-01-01T00:00:00Z",
"state": "Active"
}
},
"edge_types": {
"KNOWS": {
"id": 1,
"src_labels": ["Person"],
"dst_labels": ["Person"],
"state": "Active"
},
"WORKS_AT": {
"id": 2,
"src_labels": ["Person"],
"dst_labels": ["Company"],
"state": "Active"
}
},
"properties": {
"Person": {
"name": { "type": "String", "nullable": true, "added_in": 1, "state": "Active" },
"age": { "type": "Int32", "nullable": true, "added_in": 1, "state": "Active" }
},
"Company": {
"name": { "type": "String", "nullable": true, "added_in": 1, "state": "Active" }
},
"KNOWS": {
"since": { "type": "Int32", "nullable": true, "added_in": 1, "state": "Active" }
},
"WORKS_AT": {
"role": { "type": "String", "nullable": true, "added_in": 1, "state": "Active" }
}
},
"indexes": []
}"#;
/// Creates a temporary on-disk database with [`SCHEMA_JSON`] loaded.
///
/// Returns the open [`Uni`] handle together with the [`tempfile::TempDir`]
/// guard; the directory (and the database inside it) is removed when the
/// guard is dropped, so callers must keep it alive for the test's duration.
async fn setup_db() -> Result<(Uni, tempfile::TempDir)> {
    let dir = tempfile::tempdir()?;
    let schema_file = dir.path().join("schema.json");
    tokio::fs::write(&schema_file, SCHEMA_JSON).await?;
    let db = Uni::open(dir.path().to_str().unwrap()).build().await?;
    db.load_schema(&schema_file).await?;
    Ok((db, dir))
}
#[tokio::test]
async fn test_bulk_insert_vertices() -> Result<()> {
    let (db, _temp) = setup_db().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().batch_size(100).build()?;

    // 250 rows with batch size 100 forces multiple internal flushes.
    let rows: Vec<HashMap<String, uni_db::Value>> = (0..250)
        .map(|i| {
            let mut row = HashMap::new();
            row.insert("name".to_string(), unival!(format!("Person_{}", i)));
            row.insert("age".to_string(), unival!(i % 100));
            row
        })
        .collect();

    let ids = writer.insert_vertices("Person", rows).await?;
    assert_eq!(ids.len(), 250);

    let stats = writer.commit().await?;
    assert_eq!(stats.vertices_inserted, 250);
    drop(tx);

    // Committed vertices must be visible from a fresh session.
    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    assert_eq!(result.rows()[0].get::<i64>("c")?, 250);
    Ok(())
}
#[tokio::test]
async fn test_bulk_insert_edges() -> Result<()> {
let (db, _temp) = setup_db().await?;
let s = db.session();
let tx = s.tx().await?;
let mut bulk = tx.bulk_writer().batch_size(100).build()?;
let mut person_props = Vec::new();
for i in 0..100 {
let mut p: HashMap<String, uni_db::Value> = HashMap::new();
p.insert("name".to_string(), unival!(format!("Person_{}", i)));
p.insert("age".to_string(), unival!(20 + i % 50));
person_props.push(p);
}
let person_vids = bulk.insert_vertices("Person", person_props).await?;
let mut company_props = Vec::new();
for i in 0..10 {
let mut p: HashMap<String, uni_db::Value> = HashMap::new();
p.insert("name".to_string(), unival!(format!("Company_{}", i)));
company_props.push(p);
}
let company_vids = bulk.insert_vertices("Company", company_props).await?;
let mut knows_edges = Vec::new();
for i in 0..50 {
let mut props = HashMap::new();
props.insert("since".to_string(), unival!(2020 + (i % 5)));
knows_edges.push(EdgeData::new(
person_vids[i],
person_vids[(i + 1) % 100],
props,
));
}
let knows_eids = bulk.insert_edges("KNOWS", knows_edges).await?;
assert_eq!(knows_eids.len(), 50);
let mut works_edges = Vec::new();
for i in 0..100 {
let mut props = HashMap::new();
props.insert("role".to_string(), unival!(format!("Role_{}", i % 5)));
works_edges.push(EdgeData::new(person_vids[i], company_vids[i % 10], props));
}
let works_eids = bulk.insert_edges("WORKS_AT", works_edges).await?;
assert_eq!(works_eids.len(), 100);
let stats = bulk.commit().await?;
assert_eq!(stats.vertices_inserted, 110); assert_eq!(stats.edges_inserted, 150);
Ok(())
}
#[tokio::test]
async fn test_bulk_abort_clears_buffers() -> Result<()> {
    let (db, _temp) = setup_db().await?;
    let s = db.session();
    let tx = s.tx().await?;
    // Batch size larger than the insert count, so nothing is flushed before
    // the abort: the rows only ever live in the writer's in-memory buffers.
    let mut bulk = tx.bulk_writer().batch_size(1000).build()?;
    let mut props = Vec::new();
    for i in 0..50 {
        let mut p: HashMap<String, uni_db::Value> = HashMap::new();
        p.insert("name".to_string(), unival!(format!("Person_{}", i)));
        props.push(p);
    }
    let _vids = bulk.insert_vertices("Person", props).await?;
    bulk.abort().await?;
    drop(tx);
    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    // The aggregate may come back as either no rows or a single zero-count
    // row; only assert on the count when a row is actually present.
    if !result.is_empty() {
        assert_eq!(result.rows()[0].get::<i64>("c")?, 0);
    }
    Ok(())
}
#[tokio::test]
async fn test_bulk_progress_callback() -> Result<()> {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    let (db, _temp) = setup_db().await?;

    // Shared counter bumped once per progress notification.
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_in_cb = Arc::clone(&calls);

    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx
        .bulk_writer()
        .batch_size(50)
        .on_progress(move |_progress| {
            calls_in_cb.fetch_add(1, Ordering::SeqCst);
        })
        .build()?;

    let rows: Vec<HashMap<String, uni_db::Value>> = (0..200)
        .map(|i| {
            let mut row = HashMap::new();
            row.insert("name".to_string(), unival!(format!("Person_{}", i)));
            row
        })
        .collect();

    writer.insert_vertices("Person", rows).await?;
    writer.commit().await?;

    // 200 rows at batch size 50 must have fired the callback at least once.
    assert!(calls.load(Ordering::SeqCst) > 0);
    Ok(())
}
#[tokio::test]
async fn test_bulk_edge_with_properties() -> Result<()> {
    let (db, _temp) = setup_db().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().build()?;

    // Two people, inserted as separate single-row batches.
    let alice = HashMap::from([
        ("name".to_string(), unival!("Alice")),
        ("age".to_string(), unival!(30)),
    ]);
    let bob = HashMap::from([
        ("name".to_string(), unival!("Bob")),
        ("age".to_string(), unival!(25)),
    ]);
    let alice_vids = writer.insert_vertices("Person", vec![alice]).await?;
    let bob_vids = writer.insert_vertices("Person", vec![bob]).await?;

    // One KNOWS edge carrying a `since` property.
    let edge_props = HashMap::from([("since".to_string(), unival!(2020))]);
    let eids = writer
        .insert_edges(
            "KNOWS",
            vec![EdgeData::new(alice_vids[0], bob_vids[0], edge_props)],
        )
        .await?;
    assert_eq!(eids.len(), 1);

    writer.commit().await?;
    drop(tx);

    // Exactly one KNOWS relationship must be visible after commit.
    let result = db
        .session()
        .query("MATCH (a:Person)-[r:KNOWS]->(b:Person) RETURN a.name, b.name")
        .await?;
    assert_eq!(result.len(), 1);
    Ok(())
}
#[tokio::test]
async fn test_bulk_async_indexes_returns_immediately() -> Result<()> {
    use uni_store::storage::IndexRebuildStatus;

    let (db, _temp) = setup_db().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx
        .bulk_writer()
        .async_indexes(true)
        .batch_size(100)
        .build()?;

    let rows: Vec<HashMap<String, uni_db::Value>> = (0..100)
        .map(|i| {
            let mut row = HashMap::new();
            row.insert("name".to_string(), unival!(format!("Person_{}", i)));
            row.insert("age".to_string(), unival!(i % 100));
            row
        })
        .collect();
    writer.insert_vertices("Person", rows).await?;

    let stats = writer.commit().await?;
    drop(tx);

    // With async indexing, commit returns before index maintenance finishes.
    assert!(stats.indexes_pending);

    // The data itself must already be queryable.
    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    assert_eq!(result.rows()[0].get::<i64>("c")?, 100);

    // Any reported rebuild tasks must be labelled and in a valid lifecycle
    // state (the list may legitimately be empty).
    let status = db.indexes().rebuild_status().await?;
    for task in &status {
        assert!(!task.label.is_empty());
        assert!(matches!(
            task.status,
            IndexRebuildStatus::Pending
                | IndexRebuildStatus::InProgress
                | IndexRebuildStatus::Completed
        ));
    }
    Ok(())
}
#[tokio::test]
async fn test_bulk_sync_indexes_blocks() -> Result<()> {
    let (db, _temp) = setup_db().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx
        .bulk_writer()
        .async_indexes(false)
        .batch_size(100)
        .build()?;

    let rows: Vec<HashMap<String, uni_db::Value>> = (0..50)
        .map(|i| {
            let mut row = HashMap::new();
            row.insert("name".to_string(), unival!(format!("Person_{}", i)));
            row
        })
        .collect();
    writer.insert_vertices("Person", rows).await?;

    let stats = writer.commit().await?;
    drop(tx);

    // Synchronous indexing: commit only returns once indexes are up to date,
    // so no pending work or task ids may be reported.
    assert!(!stats.indexes_pending);
    assert!(stats.index_task_ids.is_empty());

    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    assert_eq!(result.rows()[0].get::<i64>("c")?, 50);
    Ok(())
}
#[tokio::test]
async fn test_index_rebuild_status_tracking() -> Result<()> {
    let (db, _temp) = setup_db().await?;

    // Baseline snapshot (count currently unused, kept for future assertions).
    let baseline = db.indexes().rebuild_status().await?;
    let _initial_count = baseline.len();

    // Trigger an async rebuild; when the API hands back a task id, that id
    // must appear in a subsequent status listing.
    if let Some(tid) = db.indexes().rebuild("Person", true).await? {
        let status = db.indexes().rebuild_status().await?;
        assert!(
            status.iter().any(|t| t.id == tid),
            "Task {} should be in status list",
            tid
        );
    }
    Ok(())
}
#[tokio::test]
async fn test_is_index_building() -> Result<()> {
    use uni_store::storage::IndexRebuildStatus;

    let (db, _temp) = setup_db().await?;

    // A freshly opened database must report no active rebuild work.
    let status = db.indexes().rebuild_status().await?;
    let busy = status.iter().any(|t| {
        matches!(
            t.status,
            IndexRebuildStatus::Pending | IndexRebuildStatus::InProgress
        )
    });
    assert!(!busy);

    // Kicking off a rebuild must not error, even with nothing to index.
    let _task_id = db.indexes().rebuild("Person", true).await?;
    Ok(())
}
/// Schema variant used by the constraint tests: a single `Person` label
/// whose `name` property is non-nullable (NOT NULL) and whose `email`
/// property carries an enabled UNIQUE constraint named `unique_email`.
const SCHEMA_WITH_CONSTRAINTS: &str = r#"{
"schema_version": 1,
"labels": {
"Person": {
"id": 1,
"created_at": "2024-01-01T00:00:00Z",
"state": "Active"
}
},
"edge_types": {},
"properties": {
"Person": {
"name": { "type": "String", "nullable": false, "added_in": 1, "state": "Active" },
"email": { "type": "String", "nullable": true, "added_in": 1, "state": "Active" }
}
},
"constraints": [
{
"name": "unique_email",
"target": { "Label": "Person" },
"constraint_type": { "Unique": { "properties": ["email"] } },
"enabled": true
}
],
"indexes": []
}"#;
/// Like [`setup_db`], but loads [`SCHEMA_WITH_CONSTRAINTS`] so inserts
/// are subject to the NOT NULL and UNIQUE rules on `Person`.
async fn setup_db_with_constraints() -> Result<(Uni, tempfile::TempDir)> {
    let dir = tempfile::tempdir()?;
    let schema_file = dir.path().join("schema.json");
    tokio::fs::write(&schema_file, SCHEMA_WITH_CONSTRAINTS).await?;
    let db = Uni::open(dir.path().to_str().unwrap()).build().await?;
    db.load_schema(&schema_file).await?;
    Ok((db, dir))
}
#[tokio::test]
async fn test_bulk_not_null_constraint() -> Result<()> {
    let (db, _temp) = setup_db_with_constraints().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().validate_constraints(true).build()?;

    // `name` is non-nullable in the constrained schema; omit it entirely.
    let missing_name = HashMap::from([("email".to_string(), unival!("test@example.com"))]);
    let result = writer.insert_vertices("Person", vec![missing_name]).await;

    assert!(result.is_err(), "Expected NOT NULL constraint violation");
    let err_msg = result.unwrap_err().to_string();
    assert!(
        err_msg.contains("NOT NULL") || err_msg.contains("cannot be null"),
        "Error should mention NOT NULL: {}",
        err_msg
    );
    Ok(())
}
#[tokio::test]
async fn test_bulk_not_null_constraint_with_explicit_null() -> Result<()> {
    let (db, _temp) = setup_db_with_constraints().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().validate_constraints(true).build()?;

    // An explicit Null for the non-nullable `name` must be rejected just
    // like omitting the property altogether.
    let explicit_null = HashMap::from([
        ("name".to_string(), uni_db::Value::Null),
        ("email".to_string(), unival!("test@example.com")),
    ]);
    let result = writer.insert_vertices("Person", vec![explicit_null]).await;
    assert!(result.is_err(), "Expected NOT NULL constraint violation");
    Ok(())
}
#[tokio::test]
async fn test_bulk_unique_constraint_in_batch() -> Result<()> {
    let (db, _temp) = setup_db_with_constraints().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().validate_constraints(true).build()?;

    // Two rows in the SAME batch sharing an email must trip the UNIQUE check.
    let alice = HashMap::from([
        ("name".to_string(), unival!("Alice")),
        ("email".to_string(), unival!("same@example.com")),
    ]);
    let bob = HashMap::from([
        ("name".to_string(), unival!("Bob")),
        ("email".to_string(), unival!("same@example.com")),
    ]);
    let result = writer.insert_vertices("Person", vec![alice, bob]).await;

    assert!(result.is_err(), "Expected UNIQUE constraint violation");
    let err_msg = result.unwrap_err().to_string();
    assert!(
        err_msg.contains("UNIQUE") || err_msg.contains("duplicate"),
        "Error should mention UNIQUE: {}",
        err_msg
    );
    Ok(())
}
#[tokio::test]
async fn test_bulk_unique_constraint_across_batches() -> Result<()> {
    let (db, _temp) = setup_db_with_constraints().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().validate_constraints(true).build()?;

    // First batch: a single valid row.
    let alice = HashMap::from([
        ("name".to_string(), unival!("Alice")),
        ("email".to_string(), unival!("alice@example.com")),
    ]);
    writer.insert_vertices("Person", vec![alice]).await?;

    // Second batch reuses the same email: the UNIQUE check must see the
    // still-buffered first batch and reject it.
    let bob = HashMap::from([
        ("name".to_string(), unival!("Bob")),
        ("email".to_string(), unival!("alice@example.com")),
    ]);
    let result = writer.insert_vertices("Person", vec![bob]).await;
    assert!(
        result.is_err(),
        "Expected UNIQUE violation against buffered data"
    );
    Ok(())
}
#[tokio::test]
async fn test_bulk_abort_after_flush_rollback() -> Result<()> {
    let (db, _temp) = setup_db().await?;
    let s = db.session();
    let tx = s.tx().await?;
    // Batch size 10 with 25 rows forces at least two flushes before the
    // abort, so the abort must roll back data that already left the buffers.
    let mut bulk = tx.bulk_writer().batch_size(10).build()?;
    let mut props = Vec::new();
    for i in 0..25 {
        let mut p: HashMap<String, uni_db::Value> = HashMap::new();
        p.insert("name".to_string(), unival!(format!("Person_{}", i)));
        p.insert("age".to_string(), unival!(i));
        props.push(p);
    }
    let _vids = bulk.insert_vertices("Person", props).await?;
    bulk.abort().await?;
    drop(tx);
    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    // The aggregate may come back as either no rows or a single zero-count
    // row; only assert on the count when a row is actually present.
    if !result.is_empty() {
        assert_eq!(
            result.rows()[0].get::<i64>("c")?,
            0,
            "Abort should rollback all flushed data"
        );
    }
    Ok(())
}
#[tokio::test]
async fn test_bulk_buffer_limit_checkpoint() -> Result<()> {
    let (db, _temp) = setup_db().await?;
    let session = db.session();
    let tx = session.tx().await?;
    // Huge batch size but a tiny byte budget: flushing must be driven by
    // the buffer-size limit rather than by row count.
    let mut writer = tx
        .bulk_writer()
        .batch_size(100_000)
        .max_buffer_size_bytes(10 * 1024)
        .build()?;

    let rows: Vec<HashMap<String, uni_db::Value>> = (0..100)
        .map(|i| {
            let mut row = HashMap::new();
            let long_name = format!("Person_{}_with_a_very_long_name_{}", i, "x".repeat(500));
            row.insert("name".to_string(), unival!(long_name));
            row.insert("age".to_string(), unival!(i));
            row
        })
        .collect();

    let vids = writer.insert_vertices("Person", rows).await?;
    assert_eq!(vids.len(), 100);

    let stats = writer.commit().await?;
    assert_eq!(stats.vertices_inserted, 100);
    drop(tx);

    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    assert_eq!(result.rows()[0].get::<i64>("c")?, 100);
    Ok(())
}
#[tokio::test]
async fn test_bulk_constraint_validation_disabled() -> Result<()> {
    let (db, _temp) = setup_db_with_constraints().await?;
    let session = db.session();
    let tx = session.tx().await?;
    let mut writer = tx.bulk_writer().validate_constraints(false).build()?;

    // Duplicate emails that WOULD violate UNIQUE are accepted when
    // validation is switched off.
    let alice = HashMap::from([
        ("name".to_string(), unival!("Alice")),
        ("email".to_string(), unival!("same@example.com")),
    ]);
    let bob = HashMap::from([
        ("name".to_string(), unival!("Bob")),
        ("email".to_string(), unival!("same@example.com")),
    ]);
    let result = writer.insert_vertices("Person", vec![alice, bob]).await;
    assert!(
        result.is_ok(),
        "Should succeed with validation disabled: {:?}",
        result.err()
    );

    writer.commit().await?;
    drop(tx);

    // Both rows must have been persisted despite the duplicate email.
    let result = db
        .session()
        .query("MATCH (p:Person) RETURN count(p) AS c")
        .await?;
    assert_eq!(result.rows()[0].get::<i64>("c")?, 2);
    Ok(())
}