use crate::backend::sqlite::SqliteGraphBackend;
use crate::backend::{EdgeSpec, GraphBackend, NodeSpec, PubSubEvent, SubscriptionFilter};
#[test]
fn test_sqlite_pubsub_subscribe() {
let graph = SqliteGraphBackend::in_memory().unwrap();
let filter = SubscriptionFilter::all();
let (sub_id, _receiver) = graph.subscribe(filter).unwrap();
assert!(sub_id > 0);
let removed = graph.unsubscribe(sub_id).unwrap();
assert!(removed);
let removed = graph.unsubscribe(sub_id).unwrap();
assert!(!removed);
}
#[test]
fn test_sqlite_pubsub_node_created_event() {
let graph = SqliteGraphBackend::in_memory().unwrap();
let filter = SubscriptionFilter {
node_changes: true,
edge_changes: false,
kv_changes: false,
snapshot_commits: false,
};
let (sub_id, receiver) = graph.subscribe(filter).unwrap();
let node_id = graph
.insert_node(NodeSpec {
kind: "Test".to_string(),
name: "test_node".to_string(),
file_path: None,
data: serde_json::json!({"key": "value"}),
})
.unwrap();
let event = receiver.recv_timeout(std::time::Duration::from_secs(1));
assert!(event.is_ok(), "Should receive node changed event");
match event.unwrap() {
PubSubEvent::NodeChanged {
node_id: event_node_id,
..
} => {
assert_eq!(event_node_id, node_id);
}
_ => panic!("Expected NodeChanged event"),
}
graph.unsubscribe(sub_id).unwrap();
}
#[test]
fn test_sqlite_pubsub_edge_created_event() {
let graph = SqliteGraphBackend::in_memory().unwrap();
let node1 = graph
.insert_node(NodeSpec {
kind: "Test".to_string(),
name: "node1".to_string(),
file_path: None,
data: serde_json::json!({}),
})
.unwrap();
let node2 = graph
.insert_node(NodeSpec {
kind: "Test".to_string(),
name: "node2".to_string(),
file_path: None,
data: serde_json::json!({}),
})
.unwrap();
let filter = SubscriptionFilter {
node_changes: false,
edge_changes: true,
kv_changes: false,
snapshot_commits: false,
};
let (sub_id, receiver) = graph.subscribe(filter).unwrap();
let edge_id = graph
.insert_edge(EdgeSpec {
from: node1,
to: node2,
edge_type: "connects".to_string(),
data: serde_json::json!({}),
})
.unwrap();
let event = receiver.recv_timeout(std::time::Duration::from_secs(1));
assert!(event.is_ok(), "Should receive edge changed event");
match event.unwrap() {
PubSubEvent::EdgeChanged {
edge_id: event_edge_id,
..
} => {
assert_eq!(event_edge_id, edge_id);
}
_ => panic!("Expected EdgeChanged event"),
}
graph.unsubscribe(sub_id).unwrap();
}
#[test]
fn test_sqlite_pubsub_filtered_events() {
let graph = SqliteGraphBackend::in_memory().unwrap();
let filter = SubscriptionFilter {
node_changes: false,
edge_changes: true,
kv_changes: false,
snapshot_commits: false,
};
let (sub_id, receiver) = graph.subscribe(filter).unwrap();
let _node_id = graph
.insert_node(NodeSpec {
kind: "Test".to_string(),
name: "node1".to_string(),
file_path: None,
data: serde_json::json!({}),
})
.unwrap();
let event = receiver.recv_timeout(std::time::Duration::from_millis(100));
assert!(
event.is_err(),
"Should not receive event for unsubscribed node changes"
);
graph.unsubscribe(sub_id).unwrap();
}
#[test]
fn test_sqlite_pubsub_multiple_subscribers() {
let graph = SqliteGraphBackend::in_memory().unwrap();
let filter = SubscriptionFilter::all();
let (sub1, recv1) = graph.subscribe(filter).unwrap();
let (sub2, recv2) = graph.subscribe(filter).unwrap();
let node_id = graph
.insert_node(NodeSpec {
kind: "Test".to_string(),
name: "shared_node".to_string(),
file_path: None,
data: serde_json::json!({}),
})
.unwrap();
let event1 = recv1.recv_timeout(std::time::Duration::from_secs(1));
let event2 = recv2.recv_timeout(std::time::Duration::from_secs(1));
assert!(event1.is_ok(), "Subscriber 1 should receive event");
assert!(event2.is_ok(), "Subscriber 2 should receive event");
match event1.unwrap() {
PubSubEvent::NodeChanged { node_id: id, .. } => assert_eq!(id, node_id),
_ => panic!("Subscriber 1 expected NodeChanged"),
}
match event2.unwrap() {
PubSubEvent::NodeChanged { node_id: id, .. } => assert_eq!(id, node_id),
_ => panic!("Subscriber 2 expected NodeChanged"),
}
graph.unsubscribe(sub1).unwrap();
graph.unsubscribe(sub2).unwrap();
}
#[test]
fn test_sqlite_pubsub_no_events_after_unsubscribe() {
let graph = SqliteGraphBackend::in_memory().unwrap();
let filter = SubscriptionFilter::all();
let (sub_id, receiver) = graph.subscribe(filter).unwrap();
graph.unsubscribe(sub_id).unwrap();
let _node_id = graph
.insert_node(NodeSpec {
kind: "Test".to_string(),
name: "late_node".to_string(),
file_path: None,
data: serde_json::json!({}),
})
.unwrap();
let event = receiver.recv_timeout(std::time::Duration::from_millis(100));
assert!(
event.is_err(),
"Should not receive event after unsubscribing"
);
}