use ruvector_graph::{Edge, GraphDB, Label, Node, Properties, PropertyValue};
use std::sync::Arc;
use std::thread;
#[test]
fn test_concurrent_node_creation() {
    // Ten threads each insert 100 uniquely-named, labelled nodes carrying
    // their origin thread/index as properties; every insert must succeed.
    let db = Arc::new(GraphDB::new());
    let mut workers = Vec::new();
    for tid in 0..10 {
        let graph = Arc::clone(&db);
        workers.push(thread::spawn(move || {
            for idx in 0..100 {
                let mut attrs = Properties::new();
                attrs.insert("thread".to_string(), PropertyValue::Integer(tid));
                attrs.insert("index".to_string(), PropertyValue::Integer(idx));
                let label = Label {
                    name: "Concurrent".to_string(),
                };
                let node = Node::new(format!("node_{}_{}", tid, idx), vec![label], attrs);
                graph.create_node(node).unwrap();
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }
}
#[test]
fn test_concurrent_reads() {
    // Seed 100 nodes, then let 20 threads each perform 1000 lookups. Every
    // lookup id is reduced mod 100, so it always targets a seeded node and
    // must resolve to Some.
    let db = Arc::new(GraphDB::new());
    for idx in 0..100 {
        db.create_node(Node::new(format!("node_{}", idx), vec![], Properties::new()))
            .unwrap();
    }
    let mut readers = Vec::with_capacity(20);
    for thread_id in 0..20 {
        let graph = Arc::clone(&db);
        readers.push(thread::spawn(move || {
            for i in 0..1000 {
                let id = format!("node_{}", (thread_id * 10 + i) % 100);
                assert!(graph.get_node(&id).is_some());
            }
        }));
    }
    for reader in readers {
        reader.join().unwrap();
    }
}
#[test]
fn test_concurrent_writes_no_collision() {
    // Each thread writes ids in its own "t{tid}_n{i}" namespace, so no two
    // inserts ever collide and every create_node must succeed.
    let db = Arc::new(GraphDB::new());
    let mut workers = Vec::new();
    for tid in 0..10 {
        let graph = Arc::clone(&db);
        workers.push(thread::spawn(move || {
            for n in 0..50 {
                let node = Node::new(format!("t{}_n{}", tid, n), vec![], Properties::new());
                graph.create_node(node).unwrap();
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }
}
#[test]
fn test_concurrent_edge_creation() {
    // Seed 100 nodes, then have 10 threads concurrently link adjacent nodes
    // (indices taken mod 100); edge ids are unique per thread/iteration.
    let db = Arc::new(GraphDB::new());
    for idx in 0..100 {
        db.create_node(Node::new(format!("n{}", idx), vec![], Properties::new()))
            .unwrap();
    }
    let mut workers = Vec::new();
    for tid in 0..10 {
        let graph = Arc::clone(&db);
        workers.push(thread::spawn(move || {
            for i in 0..50 {
                let src = (tid * 10 + i) % 100;
                let edge = Edge::new(
                    format!("e_{}_{}", tid, i),
                    format!("n{}", src),
                    format!("n{}", (src + 1) % 100),
                    "LINK".to_string(),
                    Properties::new(),
                );
                graph.create_edge(edge).unwrap();
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }
}
#[test]
fn test_concurrent_read_while_writing() {
    // Interleave 5 reader threads (hitting the 50 pre-seeded nodes) with
    // 3 writer threads (inserting fresh "new_*" ids) to exercise mixed
    // read/write access; all writes must succeed and all threads must join.
    let db = Arc::new(GraphDB::new());
    for i in 0..50 {
        db.create_node(Node::new(format!("initial_{}", i), vec![], Properties::new()))
            .unwrap();
    }
    let readers = (0..5).map(|reader_id| {
        let graph = Arc::clone(&db);
        thread::spawn(move || {
            for i in 0..100 {
                let id = format!("initial_{}", (reader_id * 10 + i) % 50);
                let _ = graph.get_node(&id);
            }
        })
    });
    let writers = (0..3).map(|writer_id| {
        let graph = Arc::clone(&db);
        thread::spawn(move || {
            for i in 0..100 {
                graph
                    .create_node(Node::new(
                        format!("new_{}_{}", writer_id, i),
                        vec![],
                        Properties::new(),
                    ))
                    .unwrap();
            }
        })
    });
    // collect() spawns readers then writers before any join, so both groups
    // actually run concurrently.
    let handles: Vec<_> = readers.chain(writers).collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
#[test]
fn test_concurrent_property_updates() {
    // NOTE(review): despite the name, this test only performs concurrent
    // *reads* of the "counter" node — no property-mutation API is exercised
    // here. Consider extending it once an update method is available.
    let db = Arc::new(GraphDB::new());
    let mut props = Properties::new();
    props.insert("counter".to_string(), PropertyValue::Integer(0));
    db.create_node(Node::new("counter".to_string(), vec![], props))
        .unwrap();
    let mut handles = Vec::new();
    for _ in 0..10 {
        let graph = Arc::clone(&db);
        handles.push(thread::spawn(move || {
            for _ in 0..100 {
                let _node = graph.get_node("counter");
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
#[test]
fn test_lock_free_reads() {
    // Populate 1000 labelled nodes, then hammer them with 50 concurrent
    // reader threads and report how long the whole read phase took.
    let db = Arc::new(GraphDB::new());
    for i in 0..1000 {
        db.create_node(Node::new(
            format!("node_{}", i),
            vec![Label {
                name: "Test".to_string(),
            }],
            Properties::new(),
        ))
        .unwrap();
    }
    let num_readers = 50;
    // Start the clock BEFORE spawning: the original started timing after
    // collect() had already spawned (and partially run) the threads, so the
    // printed duration measured only the join tail, not the read workload.
    let start = std::time::Instant::now();
    let handles: Vec<_> = (0..num_readers)
        .map(|reader_id| {
            let db_clone = Arc::clone(&db);
            thread::spawn(move || {
                for i in 0..100 {
                    // Reduced mod 1000, so every lookup targets a seeded node.
                    let node_id = format!("node_{}", (reader_id * 20 + i) % 1000);
                    let result = db_clone.get_node(&node_id);
                    assert!(result.is_some());
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let duration = start.elapsed();
    println!("Concurrent reads took: {:?}", duration);
}
#[test]
fn test_writer_starvation_prevention() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    // 20 reader threads hammer the pre-seeded nodes while 5 writer threads
    // insert fresh ones; both groups must run to completion, i.e. neither
    // side is starved by the other.
    let db = Arc::new(GraphDB::new());
    for i in 0..100 {
        db.create_node(Node::new(
            format!("initial_{}", i),
            vec![],
            Properties::new(),
        ))
        .unwrap();
    }
    // Count completions per group. The original used one shared AtomicBool
    // per group, which the FIRST finisher flipped to true — the asserts could
    // not tell whether the remaining threads ever completed. Counters let us
    // assert that every single thread finished.
    let readers_finished = Arc::new(AtomicUsize::new(0));
    let writers_finished = Arc::new(AtomicUsize::new(0));
    let num_readers = 20i64;
    let num_writers = 5;
    let mut handles = vec![];
    for reader_id in 0..num_readers {
        let db_clone = Arc::clone(&db);
        let done = Arc::clone(&readers_finished);
        handles.push(thread::spawn(move || {
            for i in 0..1000i64 {
                let node_id = format!("initial_{}", (reader_id + i) % 100);
                let _ = db_clone.get_node(&node_id);
            }
            done.fetch_add(1, Ordering::Relaxed);
        }));
    }
    for writer_id in 0..num_writers {
        let db_clone = Arc::clone(&db);
        let done = Arc::clone(&writers_finished);
        handles.push(thread::spawn(move || {
            for i in 0..50 {
                let node = Node::new(
                    format!("writer_{}_{}", writer_id, i),
                    vec![],
                    Properties::new(),
                );
                db_clone.create_node(node).unwrap();
            }
            done.fetch_add(1, Ordering::Relaxed);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(readers_finished.load(Ordering::Relaxed), num_readers as usize);
    assert_eq!(writers_finished.load(Ordering::Relaxed), num_writers as usize);
}
#[test]
fn test_high_concurrency_stress() {
    // 50 threads interleave creates (every third iteration) with reads of the
    // node id from the previous iteration; reads may legitimately miss, so
    // only create_node results are unwrapped.
    let db = Arc::new(GraphDB::new());
    let mut workers = Vec::with_capacity(50);
    for tid in 0..50 {
        let graph = Arc::clone(&db);
        workers.push(thread::spawn(move || {
            for i in 0i32..100 {
                match i % 3 {
                    0 => {
                        graph
                            .create_node(Node::new(
                                format!("stress_{}_{}", tid, i),
                                vec![],
                                Properties::new(),
                            ))
                            .unwrap();
                    }
                    _ => {
                        let id = format!("stress_{}_{}", tid, i.saturating_sub(1));
                        let _ = graph.get_node(&id);
                    }
                }
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }
}
#[test]
fn test_concurrent_batch_operations() {
    // Ten threads each insert a batch of 100 nodes whose ids embed the
    // thread id, so batches never collide and every insert must succeed.
    let db = Arc::new(GraphDB::new());
    let batch_size = 100;
    let mut handles = Vec::new();
    for tid in 0..10 {
        let graph = Arc::clone(&db);
        handles.push(thread::spawn(move || {
            for i in 0..batch_size {
                let node = Node::new(format!("batch_{}_{}", tid, i), vec![], Properties::new());
                graph.create_node(node).unwrap();
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}