#![cfg(feature = "native-v2")]
use std::thread::{self, JoinHandle};
use std::time::Duration;
use sqlitegraph::{
EdgeSpec, GraphConfig, NodeSpec, backend::PubSubEvent, backend::SubscriptionFilter,
backend::native::v2::pubsub::PubSubEventType, open_graph,
};
/// Builds the on-disk fixture shared by the tests in this file.
///
/// Creates a new temporary directory, opens a native-backend graph at
/// `test.db` inside it, and immediately releases that handle. Returns the
/// directory guard together with the database path; each test reopens the
/// path itself.
///
/// # Panics
///
/// Panics if the temporary directory or the graph cannot be created.
fn create_test_graph() -> (tempfile::TempDir, std::path::PathBuf) {
    let scratch = tempfile::tempdir().expect("Failed to create temp dir");
    let database = scratch.path().join("test.db");
    // Open once and let go of the handle right away; callers open their own.
    drop(open_graph(&database, &GraphConfig::native()).expect("Failed to create graph"));
    (scratch, database)
}
/// Registers ten subscribers with rotating filters (node-only, edge-only,
/// everything), inserts ten nodes joined by nine "chain" edges, and then
/// checks that every subscription can still be removed.
///
/// The receivers are never read from; they are only held until the end of
/// the test.
#[test]
fn test_concurrent_subscribers_no_contention() {
    let (_temp_dir, db_path) = create_test_graph();
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

    // Ten subscriptions, cycling through the three filter flavours.
    let (subscriber_ids, receivers): (Vec<_>, Vec<_>) = (0..10)
        .map(|slot| {
            let filter = if slot % 3 == 0 {
                SubscriptionFilter::event_types(vec![PubSubEventType::Node])
            } else if slot % 3 == 1 {
                SubscriptionFilter::event_types(vec![PubSubEventType::Edge])
            } else {
                SubscriptionFilter::all()
            };
            graph.subscribe(filter).expect("Failed to subscribe")
        })
        .unzip();

    // Ten nodes, inserted in order.
    let node_ids: Vec<_> = (0..10)
        .map(|n| {
            let spec = NodeSpec {
                kind: "Node".to_string(),
                name: format!("node_{}", n),
                file_path: None,
                data: serde_json::json!({"id": n}),
            };
            graph.insert_node(spec).expect("Failed to insert node")
        })
        .collect();

    // Link each node to its successor.
    for (order, pair) in node_ids.windows(2).enumerate() {
        let spec = EdgeSpec {
            from: pair[0],
            to: pair[1],
            edge_type: "chain".to_string(),
            data: serde_json::json!({"order": order}),
        };
        graph.insert_edge(spec).expect("Failed to insert edge");
    }

    for id in subscriber_ids {
        assert!(
            graph.unsubscribe(id).expect("Failed to unsubscribe"),
            "Subscriber should exist"
        );
    }

    // Receivers were held for the whole test; release them only now.
    drop(receivers);
}
/// Runs five rounds of subscribe, insert five nodes, unsubscribe, and
/// asserts that each round's subscription is reported as removed.
#[test]
fn test_subscribe_unsubscribe_during_commits() {
    let (_temp_dir, db_path) = create_test_graph();
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

    for round in 0..5 {
        // The receiver stays bound for the whole round, including the unsubscribe.
        let subscription = graph.subscribe(SubscriptionFilter::all());
        let (sub_id, _receiver) = subscription.expect("Failed to subscribe");

        for i in 0..5 {
            let spec = NodeSpec {
                kind: "Node".to_string(),
                name: format!("node_{}_{}", round, i),
                file_path: None,
                data: serde_json::json!({"round": round, "i": i}),
            };
            let _inserted = graph.insert_node(spec).expect("Failed to insert node");
        }

        assert!(
            graph.unsubscribe(sub_id).expect("Failed to unsubscribe"),
            "Subscriber should exist"
        );
    }
}
/// Creates five subscriptions whose receivers are dropped immediately, then
/// inserts ten nodes and nine "chain" edges and expects every insert to
/// succeed. Unsubscribing afterwards is best-effort: the results are ignored.
#[test]
fn test_dropped_receiver_doesnt_block_commit() {
    let (_temp_dir, db_path) = create_test_graph();
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

    // Keep only the ids; each receiver is dropped as soon as it is created.
    let subscriber_ids: Vec<_> = (0..5)
        .map(|_| {
            let (id, rx) = graph
                .subscribe(SubscriptionFilter::all())
                .expect("Failed to subscribe");
            drop(rx);
            id
        })
        .collect();

    let node_ids: Vec<_> = (0..10)
        .map(|n| {
            let spec = NodeSpec {
                kind: "Node".to_string(),
                name: format!("node_{}", n),
                file_path: None,
                data: serde_json::json!({"id": n}),
            };
            graph.insert_node(spec).expect("Failed to insert node")
        })
        .collect();

    // Pair every node with the one inserted right after it.
    let links = node_ids.iter().zip(node_ids.iter().skip(1));
    for (order, (&from, &to)) in links.enumerate() {
        let spec = EdgeSpec {
            from,
            to,
            edge_type: "chain".to_string(),
            data: serde_json::json!({"order": order}),
        };
        graph.insert_edge(spec).expect("Failed to insert edge");
    }

    // Cleanup only: the outcome of each unsubscribe is deliberately ignored.
    subscriber_ids.into_iter().for_each(|id| {
        let _ = graph.unsubscribe(id);
    });
}
/// Subscribes once per filter flavour (node-only, edge-only, everything),
/// inserts five nodes and four "chain" edges, and asserts that all three
/// subscriptions can be removed afterwards.
///
/// The receivers are held for the duration of the test but never read.
#[test]
fn test_filter_api_works() {
    let (_temp_dir, db_path) = create_test_graph();
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

    // Shared subscribe-or-panic step for the three filters below.
    let subscribe_with =
        |filter: SubscriptionFilter| graph.subscribe(filter).expect("Failed to subscribe");

    let (node_sub_id, _node_rx) =
        subscribe_with(SubscriptionFilter::event_types(vec![PubSubEventType::Node]));
    let (edge_sub_id, _edge_rx) =
        subscribe_with(SubscriptionFilter::event_types(vec![PubSubEventType::Edge]));
    let (all_sub_id, _all_rx) = subscribe_with(SubscriptionFilter::all());

    let node_ids: Vec<_> = (0..5)
        .map(|n| {
            let spec = NodeSpec {
                kind: "Node".to_string(),
                name: format!("node_{}", n),
                file_path: None,
                data: serde_json::json!({"id": n}),
            };
            graph.insert_node(spec).expect("Failed to insert node")
        })
        .collect();

    // Four edges: node 0 -> 1 -> 2 -> 3 -> 4.
    for (order, pair) in node_ids.windows(2).enumerate() {
        let spec = EdgeSpec {
            from: pair[0],
            to: pair[1],
            edge_type: "chain".to_string(),
            data: serde_json::json!({"order": order}),
        };
        graph.insert_edge(spec).expect("Failed to insert edge");
    }

    assert!(graph.unsubscribe(node_sub_id).unwrap());
    assert!(graph.unsubscribe(edge_sub_id).unwrap());
    assert!(graph.unsubscribe(all_sub_id).unwrap());
}
/// Holds three catch-all subscriptions while five nodes are inserted, then
/// asserts that each unsubscribes successfully and that unsubscribing the
/// first one a second time returns `false`.
#[test]
fn test_multiple_subscribers_no_crashes() {
    let (_temp_dir, db_path) = create_test_graph();
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

    // Every subscriber in this test uses the same catch-all filter.
    let subscribe_all = || {
        graph
            .subscribe(SubscriptionFilter::all())
            .expect("Failed to subscribe")
    };
    let (sub1_id, _rx1) = subscribe_all();
    let (sub2_id, _rx2) = subscribe_all();
    let (sub3_id, _rx3) = subscribe_all();

    (0..5).for_each(|n| {
        let spec = NodeSpec {
            kind: "Node".to_string(),
            name: format!("node_{}", n),
            file_path: None,
            data: serde_json::json!({"id": n}),
        };
        let _inserted = graph.insert_node(spec).expect("Failed to insert node");
    });

    assert!(graph.unsubscribe(sub1_id).unwrap());
    assert!(graph.unsubscribe(sub2_id).unwrap());
    assert!(graph.unsubscribe(sub3_id).unwrap());
    // The first id was already removed above, so a repeat must report false.
    assert!(!graph.unsubscribe(sub1_id).unwrap());
}
/// Checks the return value of `unsubscribe`: `true` the first time a
/// subscription is removed and `false` when the same id is removed again.
/// One node is inserted before the first removal and one after it.
#[test]
fn test_unsubscribe_api_works() {
    let (_temp_dir, db_path) = create_test_graph();
    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

    // Inserts a single "Node" with the given name and numeric id payload.
    let add_node = |label: &str, id: i32| {
        let spec = NodeSpec {
            kind: "Node".to_string(),
            name: label.to_string(),
            file_path: None,
            data: serde_json::json!({"id": id}),
        };
        graph.insert_node(spec).expect("Failed to insert node")
    };

    let (sub_id, _receiver) = graph
        .subscribe(SubscriptionFilter::all())
        .expect("Failed to subscribe");

    let _first_node = add_node("node_1", 1);
    assert!(
        graph.unsubscribe(sub_id).expect("Failed to unsubscribe"),
        "Subscriber should exist"
    );

    // Inserting after the subscription is gone must still succeed.
    let _second_node = add_node("node_2", 2);
    assert!(
        !graph.unsubscribe(sub_id).expect("Failed to unsubscribe"),
        "Second unsubscribe should return false"
    );
}