#![allow(clippy::cast_sign_loss)]
#![cfg(all(feature = "cdc", feature = "gql"))]
use grafeo_engine::cdc::{ChangeKind, EntityId};
use grafeo_engine::{Config, GrafeoDB};
fn db() -> GrafeoDB {
GrafeoDB::with_config(Config::in_memory().with_cdc()).unwrap()
}
#[test]
fn insert_through_session_generates_create_event() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Alix', age: 30})")
.unwrap();
let result = session
.execute("MATCH (n:Person {name: 'Alix'}) RETURN id(n) AS nid")
.unwrap();
assert_eq!(result.row_count(), 1, "Node should exist after INSERT");
let node_id = match &result.rows()[0][0] {
grafeo_common::types::Value::Int64(id) => grafeo_common::types::NodeId::new(*id as u64),
other => panic!("Expected Int64 node ID, got: {other:?}"),
};
let history = db.history(node_id).unwrap();
assert!(
!history.is_empty(),
"CDC should record session INSERT, got 0 events"
);
assert!(
history.iter().any(|e| e.kind == ChangeKind::Create),
"Should contain a Create event for the session INSERT"
);
}
#[test]
fn set_through_session_generates_update_event() {
let db = db();
let session = db.session();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
let result = session
.execute("MATCH (n:Person {name: 'Alix'}) RETURN id(n)")
.unwrap();
let node_id = match &result.rows()[0][0] {
grafeo_common::types::Value::Int64(id) => grafeo_common::types::NodeId::new(*id as u64),
other => panic!("Expected Int64, got: {other:?}"),
};
session
.execute("MATCH (n:Person {name: 'Alix'}) SET n.city = 'Amsterdam'")
.unwrap();
let history = db.history(node_id).unwrap();
let update_count = history
.iter()
.filter(|e| e.kind == ChangeKind::Update)
.count();
assert!(
update_count >= 1,
"Should have at least 1 Update event from SET, got {update_count}"
);
}
#[test]
fn delete_through_session_generates_delete_event() {
let db = db();
let session = db.session();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
let result = session
.execute("MATCH (n:Person {name: 'Alix'}) RETURN id(n)")
.unwrap();
let node_id = match &result.rows()[0][0] {
grafeo_common::types::Value::Int64(id) => grafeo_common::types::NodeId::new(*id as u64),
other => panic!("Expected Int64, got: {other:?}"),
};
session
.execute("MATCH (n:Person {name: 'Alix'}) DELETE n")
.unwrap();
let history = db.history(node_id).unwrap();
assert!(
history.iter().any(|e| e.kind == ChangeKind::Delete),
"Should contain a Delete event from session DELETE"
);
}
#[test]
fn rollback_discards_cdc_events() {
let db = db();
let mut session = db.session();
session.begin_transaction().unwrap();
session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
session.rollback().unwrap();
let result = session
.execute("MATCH (n:Person) RETURN count(n) AS cnt")
.unwrap();
assert_eq!(
result.rows()[0][0],
grafeo_common::types::Value::Int64(0),
"Rolled-back node should not exist"
);
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
assert!(
changes.is_empty(),
"Rolled-back transaction should produce 0 CDC events, got {}",
changes.len()
);
}
#[test]
fn multi_statement_transaction_flushes_on_commit() {
let db = db();
let mut session = db.session();
session.begin_transaction().unwrap();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
let pre_commit_changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
assert!(
pre_commit_changes.is_empty(),
"CDC events should not appear before commit, got {}",
pre_commit_changes.len()
);
session.commit().unwrap();
let post_commit_changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let create_count = post_commit_changes
.iter()
.filter(|e| e.kind == ChangeKind::Create)
.count();
assert!(
create_count >= 2,
"Should have at least 2 Create events after commit, got {create_count}"
);
}
#[test]
fn savepoint_rollback_discards_post_savepoint_events() {
let db = db();
let mut session = db.session();
session.begin_transaction().unwrap();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
session.execute("SAVEPOINT sp1").unwrap();
session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
session.execute("ROLLBACK TO SAVEPOINT sp1").unwrap();
session.commit().unwrap();
let result = session
.execute("MATCH (n:Person) RETURN n.name ORDER BY n.name")
.unwrap();
assert_eq!(
result.row_count(),
1,
"Only Alix should exist after savepoint rollback"
);
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let create_count = changes
.iter()
.filter(|e| e.kind == ChangeKind::Create && matches!(e.entity_id, EntityId::Node(_)))
.count();
assert_eq!(
create_count, 1,
"Should have exactly 1 Create node event (Alix only), got {create_count}"
);
}
#[test]
fn edge_creation_through_session_generates_cdc() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Alix'})-[:KNOWS]->(:Person {name: 'Gus'})")
.unwrap();
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let node_creates = changes
.iter()
.filter(|e| e.kind == ChangeKind::Create && matches!(e.entity_id, EntityId::Node(_)))
.count();
let edge_creates = changes
.iter()
.filter(|e| e.kind == ChangeKind::Create && matches!(e.entity_id, EntityId::Edge(_)))
.count();
assert!(
node_creates >= 2,
"Should have at least 2 node Create events, got {node_creates}"
);
assert!(
edge_creates >= 1,
"Should have at least 1 edge Create event, got {edge_creates}"
);
}
#[test]
fn auto_commit_insert_generates_cdc() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Vincent'})")
.unwrap();
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
assert!(
!changes.is_empty(),
"Auto-commit INSERT should generate CDC events"
);
assert!(
changes.iter().any(|e| e.kind == ChangeKind::Create),
"Should contain a Create event"
);
}
#[test]
fn edge_deletion_through_session_generates_cdc() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Alix'})-[:KNOWS]->(:Person {name: 'Gus'})")
.unwrap();
session
.execute("MATCH (:Person {name: 'Alix'})-[r:KNOWS]->(:Person {name: 'Gus'}) DELETE r")
.unwrap();
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let edge_deletes = changes
.iter()
.filter(|e| e.kind == ChangeKind::Delete && matches!(e.entity_id, EntityId::Edge(_)))
.count();
assert!(
edge_deletes >= 1,
"Should have at least 1 edge Delete event, got {edge_deletes}"
);
}
#[test]
fn remove_property_through_session_generates_cdc() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Alix', city: 'Amsterdam'})")
.unwrap();
session
.execute("MATCH (n:Person {name: 'Alix'}) SET n.city = NULL")
.unwrap();
let result = session
.execute("MATCH (n:Person {name: 'Alix'}) RETURN id(n)")
.unwrap();
let node_id = match &result.rows()[0][0] {
grafeo_common::types::Value::Int64(id) => grafeo_common::types::NodeId::new(*id as u64),
other => panic!("Expected Int64, got: {other:?}"),
};
let history = db.history(node_id).unwrap();
let update_count = history
.iter()
.filter(|e| e.kind == ChangeKind::Update)
.count();
assert!(
update_count >= 1,
"Should have at least 1 Update event for property removal, got {update_count}"
);
}
#[test]
fn set_label_through_session_generates_cdc() {
let db = db();
let session = db.session();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
let result = session
.execute("MATCH (n:Person {name: 'Alix'}) RETURN id(n)")
.unwrap();
let node_id = match &result.rows()[0][0] {
grafeo_common::types::Value::Int64(id) => grafeo_common::types::NodeId::new(*id as u64),
other => panic!("Expected Int64, got: {other:?}"),
};
session
.execute("MATCH (n:Person {name: 'Alix'}) SET n:Employee")
.unwrap();
let history = db.history(node_id).unwrap();
assert!(
history.len() >= 2,
"Should have at least 2 events after label SET, got {}",
history.len()
);
}
#[test]
fn set_edge_property_through_session_generates_cdc() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Alix'})-[:KNOWS {since: 2020}]->(:Person {name: 'Gus'})")
.unwrap();
session
.execute(
"MATCH (:Person {name: 'Alix'})-[r:KNOWS]->(:Person {name: 'Gus'}) SET r.since = 2025",
)
.unwrap();
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let edge_updates = changes
.iter()
.filter(|e| e.kind == ChangeKind::Update && matches!(e.entity_id, EntityId::Edge(_)))
.count();
assert!(
edge_updates >= 1,
"Should have at least 1 edge Update event, got {edge_updates}"
);
}
#[test]
fn detach_delete_through_session_generates_cdc() {
let db = db();
let session = db.session();
session
.execute("INSERT (:Person {name: 'Alix'})-[:KNOWS]->(:Person {name: 'Gus'})")
.unwrap();
session
.execute("MATCH (n:Person {name: 'Alix'}) DETACH DELETE n")
.unwrap();
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let node_deletes = changes
.iter()
.filter(|e| e.kind == ChangeKind::Delete && matches!(e.entity_id, EntityId::Node(_)))
.count();
let edge_deletes = changes
.iter()
.filter(|e| e.kind == ChangeKind::Delete && matches!(e.entity_id, EntityId::Edge(_)))
.count();
assert!(
node_deletes >= 1,
"Should have at least 1 node Delete from DETACH DELETE, got {node_deletes}"
);
assert!(
edge_deletes >= 1,
"Should have at least 1 edge Delete from DETACH DELETE, got {edge_deletes}"
);
}
#[test]
fn multiple_property_updates_in_transaction_generate_cdc() {
let db = db();
let mut session = db.session();
session.begin_transaction().unwrap();
session
.execute("INSERT (:Person {name: 'Alix', age: 30})")
.unwrap();
session
.execute("MATCH (n:Person {name: 'Alix'}) SET n.age = 31, n.city = 'Amsterdam'")
.unwrap();
session.commit().unwrap();
let changes = db
.changes_between(
grafeo_common::types::EpochId::new(0),
grafeo_common::types::EpochId::new(u64::MAX),
)
.unwrap();
let update_count = changes
.iter()
.filter(|e| e.kind == ChangeKind::Update)
.count();
assert!(
update_count >= 1,
"Should have Update events for property changes, got {update_count}"
);
}