use std::collections::HashMap;
use std::sync::Arc;
use arcstr::ArcStr;
use grafeo_common::types::{
EdgeId, EpochId, HlcTimestamp, NodeId, PropertyKey, TransactionId, Value,
};
use grafeo_common::utils::hash::FxHashMap;
use grafeo_core::graph::lpg::{CompareOp, Edge, Node};
use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut, GraphStoreSearch};
use grafeo_core::statistics::Statistics;
use parking_lot::Mutex;
use crate::cdc::{CdcLog, ChangeEvent, ChangeKind, EntityId};
pub(crate) struct CdcGraphStore {
inner: Arc<dyn GraphStoreMut>,
cdc_log: Arc<CdcLog>,
pending_events: Arc<Mutex<Vec<ChangeEvent>>>,
}
impl CdcGraphStore {
pub fn new(inner: Arc<dyn GraphStoreMut>, cdc_log: Arc<CdcLog>) -> Self {
Self {
inner,
cdc_log,
pending_events: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn wrap(
inner: Arc<dyn GraphStoreMut>,
cdc_log: Arc<CdcLog>,
pending_events: Arc<Mutex<Vec<ChangeEvent>>>,
) -> Self {
Self {
inner,
cdc_log,
pending_events,
}
}
pub fn pending_events(&self) -> Arc<Mutex<Vec<ChangeEvent>>> {
Arc::clone(&self.pending_events)
}
fn buffer_event(&self, mut event: ChangeEvent) {
event.epoch = EpochId::PENDING;
self.pending_events.lock().push(event);
}
fn record_directly(&self, event: ChangeEvent) {
self.cdc_log.record(event);
}
fn collect_node_properties(&self, id: NodeId) -> Option<HashMap<String, Value>> {
let node = self.inner.get_node(id)?;
let map: HashMap<String, Value> = node
.properties
.iter()
.map(|(k, v)| (k.as_str().to_string(), v.clone()))
.collect();
if map.is_empty() { None } else { Some(map) }
}
fn collect_edge_properties(&self, id: EdgeId) -> Option<HashMap<String, Value>> {
let edge = self.inner.get_edge(id)?;
let map: HashMap<String, Value> = edge
.properties
.iter()
.map(|(k, v)| (k.as_str().to_string(), v.clone()))
.collect();
if map.is_empty() { None } else { Some(map) }
}
fn collect_node_labels(&self, id: NodeId) -> Option<Vec<String>> {
let node = self.inner.get_node(id)?;
Some(node.labels.iter().map(|l| l.to_string()).collect())
}
fn next_ts(&self) -> HlcTimestamp {
self.cdc_log.next_timestamp()
}
}
fn make_event(
entity_id: EntityId,
kind: ChangeKind,
epoch: EpochId,
timestamp: HlcTimestamp,
) -> ChangeEvent {
ChangeEvent {
entity_id,
kind,
epoch,
timestamp,
before: None,
after: None,
labels: None,
edge_type: None,
src_id: None,
dst_id: None,
triple_subject: None,
triple_predicate: None,
triple_object: None,
triple_graph: None,
}
}
impl GraphStore for CdcGraphStore {
fn get_node(&self, id: NodeId) -> Option<Node> {
self.inner.get_node(id)
}
fn get_edge(&self, id: EdgeId) -> Option<Edge> {
self.inner.get_edge(id)
}
fn get_node_versioned(
&self,
id: NodeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> Option<Node> {
self.inner.get_node_versioned(id, epoch, transaction_id)
}
fn get_edge_versioned(
&self,
id: EdgeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> Option<Edge> {
self.inner.get_edge_versioned(id, epoch, transaction_id)
}
fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
self.inner.get_node_at_epoch(id, epoch)
}
fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
self.inner.get_edge_at_epoch(id, epoch)
}
fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
self.inner.get_node_property(id, key)
}
fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
self.inner.get_edge_property(id, key)
}
fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
self.inner.get_node_property_batch(ids, key)
}
fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
self.inner.get_nodes_properties_batch(ids)
}
fn get_nodes_properties_selective_batch(
&self,
ids: &[NodeId],
keys: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
self.inner.get_nodes_properties_selective_batch(ids, keys)
}
fn get_edges_properties_selective_batch(
&self,
ids: &[EdgeId],
keys: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
self.inner.get_edges_properties_selective_batch(ids, keys)
}
fn neighbors(&self, node: NodeId, direction: Direction) -> Vec<NodeId> {
self.inner.neighbors(node, direction)
}
fn edges_from(&self, node: NodeId, direction: Direction) -> Vec<(NodeId, EdgeId)> {
self.inner.edges_from(node, direction)
}
fn out_degree(&self, node: NodeId) -> usize {
self.inner.out_degree(node)
}
fn in_degree(&self, node: NodeId) -> usize {
self.inner.in_degree(node)
}
fn has_backward_adjacency(&self) -> bool {
self.inner.has_backward_adjacency()
}
fn node_ids(&self) -> Vec<NodeId> {
self.inner.node_ids()
}
fn all_node_ids(&self) -> Vec<NodeId> {
self.inner.all_node_ids()
}
fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
self.inner.nodes_by_label(label)
}
fn node_count(&self) -> usize {
self.inner.node_count()
}
fn edge_count(&self) -> usize {
self.inner.edge_count()
}
fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
self.inner.edge_type(id)
}
fn has_property_index(&self, property: &str) -> bool {
self.inner.has_property_index(property)
}
fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
self.inner.find_nodes_by_property(property, value)
}
fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
self.inner.find_nodes_by_properties(conditions)
}
fn find_nodes_in_range(
&self,
property: &str,
min: Option<&Value>,
max: Option<&Value>,
min_inclusive: bool,
max_inclusive: bool,
) -> Vec<NodeId> {
self.inner
.find_nodes_in_range(property, min, max, min_inclusive, max_inclusive)
}
fn node_property_might_match(
&self,
property: &PropertyKey,
op: CompareOp,
value: &Value,
) -> bool {
self.inner.node_property_might_match(property, op, value)
}
fn edge_property_might_match(
&self,
property: &PropertyKey,
op: CompareOp,
value: &Value,
) -> bool {
self.inner.edge_property_might_match(property, op, value)
}
fn statistics(&self) -> Arc<Statistics> {
self.inner.statistics()
}
fn estimate_label_cardinality(&self, label: &str) -> f64 {
self.inner.estimate_label_cardinality(label)
}
fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
self.inner.estimate_avg_degree(edge_type, outgoing)
}
fn current_epoch(&self) -> EpochId {
self.inner.current_epoch()
}
fn all_labels(&self) -> Vec<String> {
self.inner.all_labels()
}
fn all_edge_types(&self) -> Vec<String> {
self.inner.all_edge_types()
}
fn all_property_keys(&self) -> Vec<String> {
self.inner.all_property_keys()
}
fn is_node_visible_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
self.inner.is_node_visible_at_epoch(id, epoch)
}
fn is_node_visible_versioned(
&self,
id: NodeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
self.inner
.is_node_visible_versioned(id, epoch, transaction_id)
}
fn is_edge_visible_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
self.inner.is_edge_visible_at_epoch(id, epoch)
}
fn is_edge_visible_versioned(
&self,
id: EdgeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
self.inner
.is_edge_visible_versioned(id, epoch, transaction_id)
}
fn filter_visible_node_ids(&self, ids: &[NodeId], epoch: EpochId) -> Vec<NodeId> {
self.inner.filter_visible_node_ids(ids, epoch)
}
fn filter_visible_node_ids_versioned(
&self,
ids: &[NodeId],
epoch: EpochId,
transaction_id: TransactionId,
) -> Vec<NodeId> {
self.inner
.filter_visible_node_ids_versioned(ids, epoch, transaction_id)
}
fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
self.inner.get_node_history(id)
}
fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
self.inner.get_edge_history(id)
}
}
impl GraphStoreSearch for CdcGraphStore {}
impl GraphStoreMut for CdcGraphStore {
fn create_node(&self, labels: &[&str]) -> NodeId {
let id = self.inner.create_node(labels);
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Create,
epoch,
self.next_ts(),
);
event.labels = Some(labels.iter().map(|s| (*s).to_string()).collect());
self.record_directly(event);
id
}
fn create_node_versioned(
&self,
labels: &[&str],
epoch: EpochId,
transaction_id: TransactionId,
) -> NodeId {
let id = self
.inner
.create_node_versioned(labels, epoch, transaction_id);
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Create,
EpochId::PENDING,
self.next_ts(),
);
event.labels = Some(labels.iter().map(|s| (*s).to_string()).collect());
self.buffer_event(event);
id
}
fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
let id = self.inner.create_edge(src, dst, edge_type);
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Create,
epoch,
self.next_ts(),
);
event.edge_type = Some(edge_type.to_string());
event.src_id = Some(src.as_u64());
event.dst_id = Some(dst.as_u64());
self.record_directly(event);
id
}
fn create_edge_versioned(
&self,
src: NodeId,
dst: NodeId,
edge_type: &str,
epoch: EpochId,
transaction_id: TransactionId,
) -> EdgeId {
let id = self
.inner
.create_edge_versioned(src, dst, edge_type, epoch, transaction_id);
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Create,
epoch,
self.next_ts(),
);
event.edge_type = Some(edge_type.to_string());
event.src_id = Some(src.as_u64());
event.dst_id = Some(dst.as_u64());
self.buffer_event(event);
id
}
fn batch_create_edges(&self, edges: &[(NodeId, NodeId, &str)]) -> Vec<EdgeId> {
let ids = self.inner.batch_create_edges(edges);
let epoch = self.inner.current_epoch();
for (id, (src, dst, edge_type)) in ids.iter().zip(edges) {
let mut event = make_event(
EntityId::Edge(*id),
ChangeKind::Create,
epoch,
self.next_ts(),
);
event.edge_type = Some((*edge_type).to_string());
event.src_id = Some(src.as_u64());
event.dst_id = Some(dst.as_u64());
self.record_directly(event);
}
ids
}
fn delete_node(&self, id: NodeId) -> bool {
let before_props = self.collect_node_properties(id);
let deleted = self.inner.delete_node(id);
if deleted {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Delete,
epoch,
self.next_ts(),
);
event.before = before_props;
self.record_directly(event);
}
deleted
}
fn delete_node_versioned(
&self,
id: NodeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
let before_props = self.collect_node_properties(id);
let labels = self.collect_node_labels(id);
let deleted = self.inner.delete_node_versioned(id, epoch, transaction_id);
if deleted {
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Delete,
epoch,
self.next_ts(),
);
event.before = before_props;
event.labels = labels;
self.buffer_event(event);
}
deleted
}
fn delete_node_edges(&self, node_id: NodeId) {
let outgoing: Vec<(NodeId, EdgeId)> = self.inner.edges_from(node_id, Direction::Outgoing);
let incoming: Vec<(NodeId, EdgeId)> = self.inner.edges_from(node_id, Direction::Incoming);
let edge_infos: Vec<(EdgeId, Option<HashMap<String, Value>>)> = outgoing
.iter()
.chain(incoming.iter())
.map(|(_, eid)| (*eid, self.collect_edge_properties(*eid)))
.collect();
self.inner.delete_node_edges(node_id);
let epoch = self.inner.current_epoch();
for (eid, props) in edge_infos {
let mut event = make_event(
EntityId::Edge(eid),
ChangeKind::Delete,
epoch,
self.next_ts(),
);
event.before = props;
self.record_directly(event);
}
}
fn delete_edge(&self, id: EdgeId) -> bool {
let before_props = self.collect_edge_properties(id);
let deleted = self.inner.delete_edge(id);
if deleted {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Delete,
epoch,
self.next_ts(),
);
event.before = before_props;
self.record_directly(event);
}
deleted
}
fn delete_edge_versioned(
&self,
id: EdgeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
let before_props = self.collect_edge_properties(id);
let deleted = self.inner.delete_edge_versioned(id, epoch, transaction_id);
if deleted {
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Delete,
epoch,
self.next_ts(),
);
event.before = before_props;
self.buffer_event(event);
}
deleted
}
fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
let old_value = self.inner.get_node_property(id, &PropertyKey::new(key));
self.inner.set_node_property(id, key, value.clone());
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.before = old_value.map(|v| {
let mut m = HashMap::new();
m.insert(key.to_string(), v);
m
});
let mut after = HashMap::new();
after.insert(key.to_string(), value);
event.after = Some(after);
self.record_directly(event);
}
fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
let old_value = self.inner.get_edge_property(id, &PropertyKey::new(key));
self.inner.set_edge_property(id, key, value.clone());
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.before = old_value.map(|v| {
let mut m = HashMap::new();
m.insert(key.to_string(), v);
m
});
let mut after = HashMap::new();
after.insert(key.to_string(), value);
event.after = Some(after);
self.record_directly(event);
}
fn set_node_property_versioned(
&self,
id: NodeId,
key: &str,
value: Value,
transaction_id: TransactionId,
) {
let old_value = self.inner.get_node_property(id, &PropertyKey::new(key));
self.inner
.set_node_property_versioned(id, key, value.clone(), transaction_id);
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.before = old_value.map(|v| {
let mut m = HashMap::new();
m.insert(key.to_string(), v);
m
});
let mut after = HashMap::new();
after.insert(key.to_string(), value);
event.after = Some(after);
self.buffer_event(event);
}
fn set_edge_property_versioned(
&self,
id: EdgeId,
key: &str,
value: Value,
transaction_id: TransactionId,
) {
let old_value = self.inner.get_edge_property(id, &PropertyKey::new(key));
self.inner
.set_edge_property_versioned(id, key, value.clone(), transaction_id);
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.before = old_value.map(|v| {
let mut m = HashMap::new();
m.insert(key.to_string(), v);
m
});
let mut after = HashMap::new();
after.insert(key.to_string(), value);
event.after = Some(after);
self.buffer_event(event);
}
fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
let removed = self.inner.remove_node_property(id, key);
if let Some(ref old_val) = removed {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
let mut before = HashMap::new();
before.insert(key.to_string(), old_val.clone());
event.before = Some(before);
self.record_directly(event);
}
removed
}
fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
let removed = self.inner.remove_edge_property(id, key);
if let Some(ref old_val) = removed {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
let mut before = HashMap::new();
before.insert(key.to_string(), old_val.clone());
event.before = Some(before);
self.record_directly(event);
}
removed
}
fn remove_node_property_versioned(
&self,
id: NodeId,
key: &str,
transaction_id: TransactionId,
) -> Option<Value> {
let removed = self
.inner
.remove_node_property_versioned(id, key, transaction_id);
if let Some(ref old_val) = removed {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
let mut before = HashMap::new();
before.insert(key.to_string(), old_val.clone());
event.before = Some(before);
self.buffer_event(event);
}
removed
}
fn remove_edge_property_versioned(
&self,
id: EdgeId,
key: &str,
transaction_id: TransactionId,
) -> Option<Value> {
let removed = self
.inner
.remove_edge_property_versioned(id, key, transaction_id);
if let Some(ref old_val) = removed {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Edge(id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
let mut before = HashMap::new();
before.insert(key.to_string(), old_val.clone());
event.before = Some(before);
self.buffer_event(event);
}
removed
}
fn add_label(&self, node_id: NodeId, label: &str) -> bool {
let added = self.inner.add_label(node_id, label);
if added {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(node_id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.labels = self.collect_node_labels(node_id);
self.record_directly(event);
}
added
}
fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
let old_labels = self.collect_node_labels(node_id);
let removed = self.inner.remove_label(node_id, label);
if removed {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(node_id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.labels = old_labels;
self.record_directly(event);
}
removed
}
fn add_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
let added = self
.inner
.add_label_versioned(node_id, label, transaction_id);
if added {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(node_id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.labels = self.collect_node_labels(node_id);
self.buffer_event(event);
}
added
}
fn remove_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
let old_labels = self.collect_node_labels(node_id);
let removed = self
.inner
.remove_label_versioned(node_id, label, transaction_id);
if removed {
let epoch = self.inner.current_epoch();
let mut event = make_event(
EntityId::Node(node_id),
ChangeKind::Update,
epoch,
self.next_ts(),
);
event.labels = old_labels;
self.buffer_event(event);
}
removed
}
}
#[cfg(test)]
mod tests {
use super::*;
use grafeo_core::graph::lpg::LpgStore;
fn setup() -> (CdcGraphStore, Arc<CdcLog>) {
let store = Arc::new(LpgStore::new().unwrap());
let log = Arc::new(CdcLog::new());
let cdc = CdcGraphStore::new(
Arc::clone(&store) as Arc<dyn GraphStoreMut>,
Arc::clone(&log),
);
(cdc, log)
}
#[test]
fn new_creates_empty_pending_buffer() {
let (cdc, _log) = setup();
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn wrap_shares_event_buffer() {
let store = Arc::new(LpgStore::new().unwrap());
let log = Arc::new(CdcLog::new());
let pending = Arc::new(Mutex::new(Vec::<ChangeEvent>::new()));
let cdc = CdcGraphStore::wrap(
Arc::clone(&store) as Arc<dyn GraphStoreMut>,
Arc::clone(&log),
Arc::clone(&pending),
);
let id = cdc.create_node(&["Person"]);
assert!(pending.lock().is_empty());
assert_eq!(log.history(EntityId::Node(id)).len(), 1);
}
#[test]
fn get_node_delegates_to_inner() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person"]);
let node = cdc.get_node(id);
assert!(node.is_some());
assert!(node.unwrap().labels.iter().any(|l| l.as_str() == "Person"));
}
#[test]
fn get_edge_delegates_to_inner() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["A"]);
let b = cdc.create_node(&["B"]);
let eid = cdc.create_edge(a, b, "KNOWS");
assert!(cdc.get_edge(eid).is_some());
}
#[test]
fn node_count_and_edge_count_delegate() {
let (cdc, _log) = setup();
assert_eq!(cdc.node_count(), 0);
assert_eq!(cdc.edge_count(), 0);
let a = cdc.create_node(&["A"]);
let b = cdc.create_node(&["B"]);
cdc.create_edge(a, b, "E");
assert_eq!(cdc.node_count(), 2);
assert_eq!(cdc.edge_count(), 1);
}
#[test]
fn node_ids_and_all_node_ids_delegate() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["X"]);
assert!(cdc.node_ids().contains(&a));
assert!(cdc.all_node_ids().contains(&a));
}
#[test]
fn nodes_by_label_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["City"]);
assert!(cdc.nodes_by_label("City").contains(&a));
assert!(cdc.nodes_by_label("Unknown").is_empty());
}
#[test]
fn edge_type_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let e = cdc.create_edge(a, b, "LIKES");
assert_eq!(&*cdc.edge_type(e).unwrap(), "LIKES");
}
#[test]
fn neighbors_and_edges_from_delegate() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
cdc.create_edge(a, b, "E");
assert!(cdc.neighbors(a, Direction::Outgoing).contains(&b));
assert!(!cdc.edges_from(a, Direction::Outgoing).is_empty());
}
#[test]
fn degree_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
cdc.create_edge(a, b, "E");
assert_eq!(cdc.out_degree(a), 1);
assert_eq!(cdc.in_degree(b), 1);
}
#[test]
fn property_access_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["N"]);
cdc.set_node_property(a, "name", Value::from("Alix"));
assert_eq!(
cdc.get_node_property(a, &PropertyKey::new("name")),
Some(Value::from("Alix"))
);
}
#[test]
fn all_labels_and_edge_types_delegate() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["Person"]);
let b = cdc.create_node(&["City"]);
cdc.create_edge(a, b, "LIVES_IN");
let labels = cdc.all_labels();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"City".to_string()));
let types = cdc.all_edge_types();
assert!(types.contains(&"LIVES_IN".to_string()));
}
#[test]
fn all_property_keys_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
cdc.set_node_property(a, "colour", Value::from("orange"));
let keys = cdc.all_property_keys();
assert!(keys.contains(&"colour".to_string()));
}
#[test]
fn statistics_delegates() {
let (cdc, _log) = setup();
let _stats = cdc.statistics();
}
#[test]
fn current_epoch_delegates() {
let (cdc, _log) = setup();
let _epoch = cdc.current_epoch();
}
#[test]
fn has_backward_adjacency_delegates() {
let (cdc, _log) = setup();
let _ = cdc.has_backward_adjacency();
}
#[test]
fn has_property_index_delegates() {
let (cdc, _log) = setup();
assert!(!cdc.has_property_index("nonexistent"));
}
#[test]
fn find_nodes_by_property_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["N"]);
cdc.set_node_property(a, "x", Value::Int64(42));
let _found = cdc.find_nodes_by_property("x", &Value::Int64(42));
}
#[test]
fn find_nodes_by_properties_delegates() {
let (cdc, _log) = setup();
let _found = cdc.find_nodes_by_properties(&[("x", Value::Int64(1))]);
}
#[test]
fn find_nodes_in_range_delegates() {
let (cdc, _log) = setup();
let _found = cdc.find_nodes_in_range(
"x",
Some(&Value::Int64(0)),
Some(&Value::Int64(100)),
true,
true,
);
}
#[test]
fn estimate_label_cardinality_delegates() {
let (cdc, _log) = setup();
let _est = cdc.estimate_label_cardinality("Person");
}
#[test]
fn estimate_avg_degree_delegates() {
let (cdc, _log) = setup();
let _est = cdc.estimate_avg_degree("KNOWS", true);
}
#[test]
fn property_might_match_delegates() {
let (cdc, _log) = setup();
let pk = PropertyKey::new("x");
let _ = cdc.node_property_might_match(&pk, CompareOp::Eq, &Value::Int64(1));
let _ = cdc.edge_property_might_match(&pk, CompareOp::Eq, &Value::Int64(1));
}
#[test]
fn visibility_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["N"]);
let b = cdc.create_node(&[]);
let e = cdc.create_edge(a, b, "E");
let epoch = cdc.current_epoch();
let _ = cdc.is_node_visible_at_epoch(a, epoch);
let _ = cdc.is_edge_visible_at_epoch(e, epoch);
let _ = cdc.filter_visible_node_ids(&[a], epoch);
}
#[test]
fn history_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["N"]);
let b = cdc.create_node(&[]);
let e = cdc.create_edge(a, b, "E");
let _ = cdc.get_node_history(a);
let _ = cdc.get_edge_history(e);
}
#[test]
fn batch_property_access_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["N"]);
let b = cdc.create_node(&["N"]);
cdc.set_node_property(a, "x", Value::Int64(1));
cdc.set_node_property(b, "x", Value::Int64(2));
let pk = PropertyKey::new("x");
let batch = cdc.get_node_property_batch(&[a, b], &pk);
assert_eq!(batch.len(), 2);
let props = cdc.get_nodes_properties_batch(&[a, b]);
assert_eq!(props.len(), 2);
let selective =
cdc.get_nodes_properties_selective_batch(&[a, b], std::slice::from_ref(&pk));
assert_eq!(selective.len(), 2);
let ea = cdc.create_edge(a, b, "E");
cdc.set_edge_property(ea, "w", Value::Int64(10));
let edge_sel = cdc.get_edges_properties_selective_batch(&[ea], &[PropertyKey::new("w")]);
assert_eq!(edge_sel.len(), 1);
}
#[test]
fn create_node_records_directly() {
let (cdc, log) = setup();
let id = cdc.create_node(&["Person", "Employee"]);
let events = log.history(EntityId::Node(id));
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, ChangeKind::Create);
assert_eq!(events[0].labels.as_ref().unwrap(), &["Person", "Employee"]);
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn create_edge_records_directly() {
let (cdc, log) = setup();
let a = cdc.create_node(&["A"]);
let b = cdc.create_node(&["B"]);
let eid = cdc.create_edge(a, b, "KNOWS");
let events = log.history(EntityId::Edge(eid));
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, ChangeKind::Create);
assert_eq!(events[0].edge_type.as_deref(), Some("KNOWS"));
assert_eq!(events[0].src_id, Some(a.as_u64()));
assert_eq!(events[0].dst_id, Some(b.as_u64()));
}
#[test]
fn batch_create_edges_records_directly() {
let (cdc, log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let c = cdc.create_node(&[]);
let ids = cdc.batch_create_edges(&[(a, b, "X"), (b, c, "Y")]);
assert_eq!(ids.len(), 2);
for id in &ids {
let events = log.history(EntityId::Edge(*id));
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, ChangeKind::Create);
}
}
#[test]
fn delete_node_records_directly_with_before_props() {
let (cdc, log) = setup();
let id = cdc.create_node(&["P"]);
cdc.set_node_property(id, "name", Value::from("Alix"));
let deleted = cdc.delete_node(id);
assert!(deleted);
let events = log.history(EntityId::Node(id));
let del_event = events
.iter()
.find(|e| e.kind == ChangeKind::Delete)
.unwrap();
let before = del_event.before.as_ref().unwrap();
assert_eq!(before.get("name"), Some(&Value::from("Alix")));
}
#[test]
fn delete_node_no_event_when_not_found() {
let (cdc, log) = setup();
let fake_id = NodeId::new(999);
let deleted = cdc.delete_node(fake_id);
assert!(!deleted);
assert!(log.history(EntityId::Node(fake_id)).is_empty());
}
#[test]
fn delete_edge_records_directly_with_before_props() {
let (cdc, log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "weight", Value::Float64(1.5));
let deleted = cdc.delete_edge(eid);
assert!(deleted);
let del_event = log
.history(EntityId::Edge(eid))
.into_iter()
.find(|e| e.kind == ChangeKind::Delete)
.unwrap();
let before = del_event.before.as_ref().unwrap();
assert_eq!(before.get("weight"), Some(&Value::Float64(1.5)));
}
#[test]
fn delete_edge_no_event_when_not_found() {
let (cdc, log) = setup();
let fake = EdgeId::new(999);
assert!(!cdc.delete_edge(fake));
assert!(log.history(EntityId::Edge(fake)).is_empty());
}
#[test]
fn delete_node_edges_records_each_edge() {
let (cdc, log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let c = cdc.create_node(&[]);
let e1 = cdc.create_edge(a, b, "X");
let e2 = cdc.create_edge(c, a, "Y");
cdc.set_edge_property(e1, "p", Value::Int64(1));
cdc.delete_node_edges(a);
let e1_del = log
.history(EntityId::Edge(e1))
.into_iter()
.any(|e| e.kind == ChangeKind::Delete);
let e2_del = log
.history(EntityId::Edge(e2))
.into_iter()
.any(|e| e.kind == ChangeKind::Delete);
assert!(e1_del, "Outgoing edge should have Delete event");
assert!(e2_del, "Incoming edge should have Delete event");
}
#[test]
fn set_node_property_records_old_and_new() {
let (cdc, log) = setup();
let id = cdc.create_node(&["N"]);
cdc.set_node_property(id, "city", Value::from("Amsterdam"));
cdc.set_node_property(id, "city", Value::from("Berlin"));
let events = log.history(EntityId::Node(id));
let updates: Vec<_> = events
.iter()
.filter(|e| e.kind == ChangeKind::Update)
.collect();
assert_eq!(updates.len(), 2);
assert!(updates[0].before.is_none());
assert_eq!(
updates[0].after.as_ref().unwrap().get("city"),
Some(&Value::from("Amsterdam"))
);
assert_eq!(
updates[1].before.as_ref().unwrap().get("city"),
Some(&Value::from("Amsterdam"))
);
assert_eq!(
updates[1].after.as_ref().unwrap().get("city"),
Some(&Value::from("Berlin"))
);
}
#[test]
fn set_edge_property_records_old_and_new() {
let (cdc, log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "w", Value::Int64(1));
cdc.set_edge_property(eid, "w", Value::Int64(2));
let events = log.history(EntityId::Edge(eid));
let updates: Vec<_> = events
.iter()
.filter(|e| e.kind == ChangeKind::Update)
.collect();
assert_eq!(updates.len(), 2);
assert!(updates[0].before.is_none());
assert_eq!(
updates[1].before.as_ref().unwrap().get("w"),
Some(&Value::Int64(1))
);
assert_eq!(
updates[1].after.as_ref().unwrap().get("w"),
Some(&Value::Int64(2))
);
}
#[test]
fn remove_node_property_records_before() {
let (cdc, log) = setup();
let id = cdc.create_node(&[]);
cdc.set_node_property(id, "x", Value::Int64(42));
let removed = cdc.remove_node_property(id, "x");
assert_eq!(removed, Some(Value::Int64(42)));
let events = log.history(EntityId::Node(id));
let last = events.last().unwrap();
assert_eq!(last.kind, ChangeKind::Update);
assert_eq!(
last.before.as_ref().unwrap().get("x"),
Some(&Value::Int64(42))
);
assert!(last.after.is_none());
}
#[test]
fn remove_node_property_no_event_when_missing() {
let (cdc, log) = setup();
let id = cdc.create_node(&[]);
let removed = cdc.remove_node_property(id, "nope");
assert!(removed.is_none());
let events = log.history(EntityId::Node(id));
assert_eq!(events.len(), 1);
}
#[test]
fn remove_edge_property_records_before() {
let (cdc, log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "w", Value::Float64(19.88));
let removed = cdc.remove_edge_property(eid, "w");
assert_eq!(removed, Some(Value::Float64(19.88)));
let events = log.history(EntityId::Edge(eid));
let last = events.last().unwrap();
assert_eq!(last.kind, ChangeKind::Update);
assert_eq!(
last.before.as_ref().unwrap().get("w"),
Some(&Value::Float64(19.88))
);
}
#[test]
fn remove_edge_property_no_event_when_missing() {
let (cdc, log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
let removed = cdc.remove_edge_property(eid, "nope");
assert!(removed.is_none());
let events = log.history(EntityId::Edge(eid));
assert_eq!(events.len(), 1);
}
#[test]
fn add_label_records_update() {
let (cdc, log) = setup();
let id = cdc.create_node(&["Person"]);
let added = cdc.add_label(id, "Employee");
assert!(added);
let events = log.history(EntityId::Node(id));
let update = events
.iter()
.find(|e| e.kind == ChangeKind::Update)
.unwrap();
let labels = update.labels.as_ref().unwrap();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"Employee".to_string()));
}
#[test]
fn add_label_no_event_when_already_present() {
let (cdc, log) = setup();
let id = cdc.create_node(&["Person"]);
let added = cdc.add_label(id, "Person");
assert!(!added);
assert_eq!(log.history(EntityId::Node(id)).len(), 1);
}
#[test]
fn remove_label_records_old_labels() {
let (cdc, log) = setup();
let id = cdc.create_node(&["Person", "Employee"]);
let removed = cdc.remove_label(id, "Employee");
assert!(removed);
let events = log.history(EntityId::Node(id));
let update = events
.iter()
.find(|e| e.kind == ChangeKind::Update)
.unwrap();
let labels = update.labels.as_ref().unwrap();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"Employee".to_string()));
}
#[test]
fn remove_label_no_event_when_missing() {
let (cdc, log) = setup();
let id = cdc.create_node(&["Person"]);
let removed = cdc.remove_label(id, "Nonexistent");
assert!(!removed);
assert_eq!(log.history(EntityId::Node(id)).len(), 1);
}
#[test]
fn create_node_versioned_buffers_event() {
let (cdc, log) = setup();
let epoch = EpochId(1);
let tx = TransactionId::new(1);
let id = cdc.create_node_versioned(&["Person"], epoch, tx);
assert!(log.history(EntityId::Node(id)).is_empty());
let pending = cdc.pending_events().lock().clone();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].kind, ChangeKind::Create);
assert_eq!(pending[0].epoch, EpochId::PENDING);
assert_eq!(pending[0].labels.as_ref().unwrap(), &["Person"]);
}
#[test]
fn create_edge_versioned_buffers_event() {
let (cdc, _log) = setup();
let a = cdc.create_node(&["A"]);
let b = cdc.create_node(&["B"]);
let epoch = EpochId(1);
let tx = TransactionId::new(1);
let eid = cdc.create_edge_versioned(a, b, "KNOWS", epoch, tx);
let pending = cdc.pending_events().lock().clone();
let edge_events: Vec<_> = pending
.iter()
.filter(|e| e.entity_id == EntityId::Edge(eid))
.collect();
assert_eq!(edge_events.len(), 1);
assert_eq!(edge_events[0].kind, ChangeKind::Create);
assert_eq!(edge_events[0].edge_type.as_deref(), Some("KNOWS"));
assert_eq!(edge_events[0].src_id, Some(a.as_u64()));
assert_eq!(edge_events[0].dst_id, Some(b.as_u64()));
}
#[test]
fn delete_node_versioned_buffers_event_with_snapshot() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person"]);
cdc.set_node_property(id, "name", Value::from("Alix"));
let epoch = EpochId(2);
let tx = TransactionId::new(1);
let deleted = cdc.delete_node_versioned(id, epoch, tx);
assert!(deleted);
let pending = cdc.pending_events().lock().clone();
let del_event = pending
.iter()
.find(|e| e.kind == ChangeKind::Delete)
.unwrap();
assert_eq!(del_event.epoch, EpochId::PENDING);
let before = del_event.before.as_ref().unwrap();
assert_eq!(before.get("name"), Some(&Value::from("Alix")));
let labels = del_event.labels.as_ref().unwrap();
assert!(labels.contains(&"Person".to_string()));
}
#[test]
fn delete_node_versioned_no_buffer_when_not_found() {
let (cdc, _log) = setup();
let tx = TransactionId::new(1);
let deleted = cdc.delete_node_versioned(NodeId::new(999), EpochId(1), tx);
assert!(!deleted);
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn delete_edge_versioned_buffers_event() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "w", Value::Int64(5));
let tx = TransactionId::new(1);
let deleted = cdc.delete_edge_versioned(eid, EpochId(2), tx);
assert!(deleted);
let pending = cdc.pending_events().lock().clone();
let del = pending
.iter()
.find(|e| e.kind == ChangeKind::Delete)
.unwrap();
assert_eq!(
del.before.as_ref().unwrap().get("w"),
Some(&Value::Int64(5))
);
}
#[test]
fn delete_edge_versioned_no_buffer_when_not_found() {
let (cdc, _log) = setup();
let tx = TransactionId::new(1);
assert!(!cdc.delete_edge_versioned(EdgeId::new(999), EpochId(1), tx));
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn set_node_property_versioned_buffers_event() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["N"]);
cdc.set_node_property(id, "x", Value::Int64(1));
let tx = TransactionId::new(1);
cdc.set_node_property_versioned(id, "x", Value::Int64(2), tx);
let pending = cdc.pending_events().lock().clone();
assert_eq!(pending.len(), 1);
let event = &pending[0];
assert_eq!(event.kind, ChangeKind::Update);
assert_eq!(
event.before.as_ref().unwrap().get("x"),
Some(&Value::Int64(1))
);
assert_eq!(
event.after.as_ref().unwrap().get("x"),
Some(&Value::Int64(2))
);
}
#[test]
fn set_edge_property_versioned_buffers_event() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "w", Value::Float64(1.0));
let tx = TransactionId::new(1);
cdc.set_edge_property_versioned(eid, "w", Value::Float64(2.0), tx);
let pending = cdc.pending_events().lock().clone();
let edge_events: Vec<_> = pending
.iter()
.filter(|e| e.entity_id == EntityId::Edge(eid))
.collect();
assert_eq!(edge_events.len(), 1);
assert_eq!(
edge_events[0].before.as_ref().unwrap().get("w"),
Some(&Value::Float64(1.0))
);
assert_eq!(
edge_events[0].after.as_ref().unwrap().get("w"),
Some(&Value::Float64(2.0))
);
}
#[test]
fn remove_node_property_versioned_buffers_event() {
let (cdc, _log) = setup();
let id = cdc.create_node(&[]);
cdc.set_node_property(id, "x", Value::Int64(42));
let tx = TransactionId::new(1);
let removed = cdc.remove_node_property_versioned(id, "x", tx);
assert_eq!(removed, Some(Value::Int64(42)));
let pending = cdc.pending_events().lock().clone();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].kind, ChangeKind::Update);
assert_eq!(
pending[0].before.as_ref().unwrap().get("x"),
Some(&Value::Int64(42))
);
assert!(pending[0].after.is_none());
}
#[test]
fn remove_node_property_versioned_no_event_when_missing() {
let (cdc, _log) = setup();
let id = cdc.create_node(&[]);
let tx = TransactionId::new(1);
let removed = cdc.remove_node_property_versioned(id, "nope", tx);
assert!(removed.is_none());
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn remove_edge_property_versioned_buffers_event() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "w", Value::Int64(7));
let tx = TransactionId::new(1);
let removed = cdc.remove_edge_property_versioned(eid, "w", tx);
assert_eq!(removed, Some(Value::Int64(7)));
let pending = cdc.pending_events().lock().clone();
let edge_events: Vec<_> = pending
.iter()
.filter(|e| e.entity_id == EntityId::Edge(eid))
.collect();
assert_eq!(edge_events.len(), 1);
assert_eq!(
edge_events[0].before.as_ref().unwrap().get("w"),
Some(&Value::Int64(7))
);
}
#[test]
fn remove_edge_property_versioned_no_event_when_missing() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
let tx = TransactionId::new(1);
let removed = cdc.remove_edge_property_versioned(eid, "nope", tx);
assert!(removed.is_none());
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn add_label_versioned_buffers_event() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person"]);
let tx = TransactionId::new(1);
let added = cdc.add_label_versioned(id, "Employee", tx);
assert!(added);
let pending = cdc.pending_events().lock().clone();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].kind, ChangeKind::Update);
let labels = pending[0].labels.as_ref().unwrap();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"Employee".to_string()));
}
#[test]
fn add_label_versioned_no_buffer_when_duplicate() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person"]);
let tx = TransactionId::new(1);
let added = cdc.add_label_versioned(id, "Person", tx);
assert!(!added);
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn remove_label_versioned_buffers_event_with_old_labels() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person", "Employee"]);
let tx = TransactionId::new(1);
let removed = cdc.remove_label_versioned(id, "Employee", tx);
assert!(removed);
let pending = cdc.pending_events().lock().clone();
assert_eq!(pending.len(), 1);
let labels = pending[0].labels.as_ref().unwrap();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"Employee".to_string()));
}
#[test]
fn remove_label_versioned_no_buffer_when_missing() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person"]);
let tx = TransactionId::new(1);
let removed = cdc.remove_label_versioned(id, "Nonexistent", tx);
assert!(!removed);
assert!(cdc.pending_events().lock().is_empty());
}
#[test]
fn collect_node_properties_returns_none_for_empty() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["N"]);
assert!(cdc.collect_node_properties(id).is_none());
}
#[test]
fn collect_node_properties_returns_map() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["N"]);
cdc.set_node_property(id, "a", Value::Int64(1));
cdc.set_node_property(id, "b", Value::from("hello"));
let map = cdc.collect_node_properties(id).unwrap();
assert_eq!(map.get("a"), Some(&Value::Int64(1)));
assert_eq!(map.get("b"), Some(&Value::from("hello")));
}
#[test]
fn collect_node_properties_returns_none_for_nonexistent() {
let (cdc, _log) = setup();
assert!(cdc.collect_node_properties(NodeId::new(999)).is_none());
}
#[test]
fn collect_edge_properties_returns_none_for_empty() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
assert!(cdc.collect_edge_properties(eid).is_none());
}
#[test]
fn collect_edge_properties_returns_map() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let eid = cdc.create_edge(a, b, "E");
cdc.set_edge_property(eid, "w", Value::Float64(2.5));
let map = cdc.collect_edge_properties(eid).unwrap();
assert_eq!(map.get("w"), Some(&Value::Float64(2.5)));
}
#[test]
fn collect_node_labels_returns_labels() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["Person", "Employee"]);
let labels = cdc.collect_node_labels(id).unwrap();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"Employee".to_string()));
}
#[test]
fn collect_node_labels_returns_none_for_nonexistent() {
let (cdc, _log) = setup();
assert!(cdc.collect_node_labels(NodeId::new(999)).is_none());
}
#[test]
fn next_ts_returns_increasing_timestamps() {
let (cdc, _log) = setup();
let t1 = cdc.next_ts();
let t2 = cdc.next_ts();
assert!(t2 > t1);
}
#[test]
fn make_event_creates_minimal_event() {
let event = make_event(
EntityId::Node(NodeId::new(1)),
ChangeKind::Create,
EpochId(5),
HlcTimestamp::zero(),
);
assert_eq!(event.entity_id, EntityId::Node(NodeId::new(1)));
assert_eq!(event.kind, ChangeKind::Create);
assert_eq!(event.epoch, EpochId(5));
assert!(event.before.is_none());
assert!(event.after.is_none());
assert!(event.labels.is_none());
assert!(event.edge_type.is_none());
assert!(event.src_id.is_none());
assert!(event.dst_id.is_none());
}
#[test]
fn versioned_read_delegates() {
let (cdc, _log) = setup();
let id = cdc.create_node(&["N"]);
let epoch = cdc.current_epoch();
let tx = TransactionId::new(0);
let _ = cdc.get_node_versioned(id, epoch, tx);
let _ = cdc.get_node_at_epoch(id, epoch);
let _ = cdc.is_node_visible_versioned(id, epoch, tx);
let _ = cdc.filter_visible_node_ids_versioned(&[id], epoch, tx);
}
#[test]
fn versioned_edge_read_delegates() {
let (cdc, _log) = setup();
let a = cdc.create_node(&[]);
let b = cdc.create_node(&[]);
let e = cdc.create_edge(a, b, "E");
let epoch = cdc.current_epoch();
let tx = TransactionId::new(0);
let _ = cdc.get_edge_versioned(e, epoch, tx);
let _ = cdc.get_edge_at_epoch(e, epoch);
let _ = cdc.is_edge_visible_versioned(e, epoch, tx);
}
}