use minigraf::{Minigraf, QueryResult};
use std::sync::Arc;
use std::thread;
use uuid::Uuid;
/// Registers five differently named rules from five threads at once, then
/// issues one query per rule from the main thread.
///
/// Every registration is `unwrap`ped inside its thread, so a failed `execute`
/// becomes a panic that is re-raised when the thread is joined. The follow-up
/// queries only exercise the call path: their results are discarded.
#[test]
fn test_concurrent_rule_registration() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    // Start every worker before joining any of them so the registrations overlap.
    let mut workers = Vec::new();
    for idx in 0..5 {
        let shared = Arc::clone(&db);
        workers.push(thread::spawn(move || {
            let name = format!("rule{}", idx);
            let command = format!(r#"(rule [({} ?x ?y) [?x :connected{} ?y]])"#, name, idx);
            shared.execute(&command).unwrap();
        }));
    }
    workers.into_iter().for_each(|worker| worker.join().unwrap());

    // NOTE(review): the outcome of these queries is intentionally ignored, so
    // this loop does not assert that the rules are actually usable.
    for idx in 0..5 {
        let lookup = format!(r#"(query [:find ?y :where (rule{} :test ?y)])"#, idx);
        let _ = db.execute(&lookup);
    }
}
/// Runs the recursive `reach` rule from ten threads at once, one per node of a
/// ten-node `:connected` chain.
///
/// The node at position `p` has `9 - p` successors in the chain, and each
/// thread asserts that its query returns exactly that many rows.
#[test]
fn test_concurrent_rule_queries() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    // Linear chain: chain[0] -> chain[1] -> ... -> chain[9].
    let chain: Vec<Uuid> = (0..10).map(|_| Uuid::new_v4()).collect();

    // One transaction containing the nine edges between neighbouring nodes.
    let mut transact_cmd = String::from("(transact [");
    for edge in chain.windows(2) {
        transact_cmd.push_str(&format!(
            r#"[#uuid "{}" :connected #uuid "{}"]"#,
            edge[0], edge[1]
        ));
    }
    transact_cmd.push(']');
    transact_cmd.push(')');
    db.execute(&transact_cmd).unwrap();

    // Base case and recursive case of the reachability rule.
    let base_rule = r#"(rule [(reach ?x ?y) [?x :connected ?y]])"#;
    let recursive_rule = r#"(rule [(reach ?x ?y) [?x :connected ?z] (reach ?z ?y)])"#;
    db.execute(base_rule).unwrap();
    db.execute(recursive_rule).unwrap();

    let mut workers = Vec::with_capacity(chain.len());
    for (position, start) in chain.iter().copied().enumerate() {
        let shared = Arc::clone(&db);
        workers.push(thread::spawn(move || {
            let query =
                format!(r#"(query [:find ?to :where (reach #uuid "{}" ?to)])"#, start);
            let outcome = shared.execute(&query).unwrap();
            if let QueryResult::QueryResults { results, .. } = outcome {
                // Everything after `position` in the chain is reachable.
                assert_eq!(results.len(), 9 - position);
            } else {
                panic!("Expected QueryResults");
            }
        }));
    }

    for worker in workers {
        worker.join().unwrap();
    }
}
/// Interleaves fact transactions and rule registrations across ten threads.
///
/// Even-numbered workers transact a single `:attrN` fact between two fresh
/// UUIDs; odd-numbered workers register a `predN` rule over `:attrN`. Every
/// command is `unwrap`ped. Afterwards a plain `:attr0` query must come back as
/// `QueryResult::QueryResults`; its row count is not checked.
#[test]
fn test_concurrent_transact_and_rules() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    let mut workers = Vec::with_capacity(10);
    for idx in 0..10 {
        let shared = Arc::clone(&db);
        let worker = thread::spawn(move || {
            // Build the command for this worker, then run it through one call site.
            let command = if idx % 2 == 0 {
                let a = Uuid::new_v4();
                let b = Uuid::new_v4();
                format!(r#"(transact [[#uuid "{}" :attr{} #uuid "{}"]])"#, a, idx, b)
            } else {
                format!(r#"(rule [(pred{} ?x ?y) [?x :attr{} ?y]])"#, idx, idx)
            };
            shared.execute(&command).unwrap();
        });
        workers.push(worker);
    }
    workers.into_iter().for_each(|worker| worker.join().unwrap());

    let outcome = db
        .execute(r#"(query [:find ?x :where [?x :attr0 ?y]])"#)
        .unwrap();
    if !matches!(outcome, QueryResult::QueryResults { .. }) {
        panic!("Expected QueryResults");
    }
}
/// Issues the same recursive `reach` query from fifty threads at once.
///
/// The graph is `a -> b -> c`, so every thread expects exactly two rows when
/// asking what is reachable from `a`.
#[test]
fn test_concurrent_read_heavy() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    let a = Uuid::new_v4();
    let b = Uuid::new_v4();
    let c = Uuid::new_v4();

    // The second line of this raw string is part of the literal, so it stays
    // flush left to keep the command text unchanged.
    let seed = format!(
        r#"(transact [[#uuid "{}" :connected #uuid "{}"]
[#uuid "{}" :connected #uuid "{}"]])"#,
        a, b, b, c
    );
    db.execute(&seed).unwrap();

    let base_rule = r#"(rule [(reach ?x ?y) [?x :connected ?y]])"#;
    let recursive_rule = r#"(rule [(reach ?x ?y) [?x :connected ?z] (reach ?z ?y)])"#;
    db.execute(base_rule).unwrap();
    db.execute(recursive_rule).unwrap();

    let mut readers = Vec::with_capacity(50);
    for _ in 0..50 {
        let shared = Arc::clone(&db);
        // `a` is copied into each closure; every reader starts from the same node.
        readers.push(thread::spawn(move || {
            let query = format!(r#"(query [:find ?to :where (reach #uuid "{}" ?to)])"#, a);
            let outcome = shared.execute(&query).unwrap();
            if let QueryResult::QueryResults { results, .. } = outcome {
                assert_eq!(results.len(), 2);
            } else {
                panic!("Expected QueryResults");
            }
        }));
    }

    for reader in readers {
        reader.join().unwrap();
    }
}
/// Evaluates the recursive `reach` rule concurrently over five separate chains.
///
/// Each chain has five nodes joined by four `:connected` edges. One thread per
/// chain queries from that chain's first node and expects four reachable nodes.
#[test]
fn test_concurrent_recursive_evaluation() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    // Five chains of five freshly generated nodes each.
    let chains: Vec<Vec<Uuid>> = (0..5)
        .map(|_| (0..5).map(|_| Uuid::new_v4()).collect())
        .collect();

    // All edges of all chains go into a single transaction.
    let mut transact_cmd = String::from("(transact [");
    for chain in &chains {
        for edge in chain.windows(2) {
            transact_cmd.push_str(&format!(
                r#"[#uuid "{}" :connected #uuid "{}"]"#,
                edge[0], edge[1]
            ));
        }
    }
    transact_cmd.push(']');
    transact_cmd.push(')');
    db.execute(&transact_cmd).unwrap();

    let base_rule = r#"(rule [(reach ?x ?y) [?x :connected ?y]])"#;
    let recursive_rule = r#"(rule [(reach ?x ?y) [?x :connected ?z] (reach ?z ?y)])"#;
    db.execute(base_rule).unwrap();
    db.execute(recursive_rule).unwrap();

    let mut workers = Vec::with_capacity(chains.len());
    for chain in &chains {
        let shared = Arc::clone(&db);
        let head = chain[0];
        workers.push(thread::spawn(move || {
            let query = format!(
                r#"(query [:find ?to :where (reach #uuid "{}" ?to)])"#,
                head
            );
            let outcome = shared.execute(&query).unwrap();
            if let QueryResult::QueryResults { results, .. } = outcome {
                // The head of a five-node chain reaches the other four nodes.
                assert_eq!(results.len(), 4);
            } else {
                panic!("Expected QueryResults");
            }
        }));
    }

    for worker in workers {
        worker.join().unwrap();
    }
}
/// Runs twenty threads that mix transactions, rule registrations, rule
/// queries and plain pattern queries against one shared database.
///
/// Worker `n` picks its operation from `n % 4`. Transactions and rule
/// registrations are `unwrap`ped; query results are discarded. The test passes
/// once every thread has been joined without panicking. There is no timeout,
/// so a deadlock would presumably show up as a hang rather than a failure.
#[test]
fn test_no_deadlocks_mixed_operations() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    // Seed one edge and a non-recursive `reach` rule for the query workers.
    let a = Uuid::new_v4();
    let b = Uuid::new_v4();
    let seed = format!(r#"(transact [[#uuid "{}" :connected #uuid "{}"]])"#, a, b);
    db.execute(&seed).unwrap();
    db.execute(r#"(rule [(reach ?x ?y) [?x :connected ?y]])"#)
        .unwrap();

    let mut workers = Vec::with_capacity(20);
    for idx in 0..20 {
        let shared = Arc::clone(&db);
        workers.push(thread::spawn(move || match idx % 4 {
            // Writer: add a fresh `:attr` fact.
            0 => {
                let x = Uuid::new_v4();
                let y = Uuid::new_v4();
                let command = format!(r#"(transact [[#uuid "{}" :attr #uuid "{}"]])"#, x, y);
                shared.execute(&command).unwrap();
            }
            // Writer: register a rule named after this worker.
            1 => {
                let command = format!(r#"(rule [(rule{} ?x ?y) [?x :attr ?y]])"#, idx);
                shared.execute(&command).unwrap();
            }
            // Reader: query through the pre-registered `reach` rule.
            2 => {
                let query = format!(
                    r#"(query [:find ?to :where (reach #uuid "{}" ?to)])"#,
                    a
                );
                let _ = shared.execute(&query);
            }
            // Reader: plain pattern query over `:attr`.
            3 => {
                let _ = shared.execute(r#"(query [:find ?to :where [?from :attr ?to]])"#);
            }
            _ => unreachable!(),
        }));
    }

    workers.into_iter().for_each(|worker| worker.join().unwrap());
}
/// Runs ten rule-registering threads alongside ten querying threads.
///
/// Each writer registers five rules (`predI-J` over `:attrI`) and unwraps
/// every registration. Each reader runs the same `:attr0` query ten times and
/// ignores the outcome. After all threads are joined, one query per registered
/// rule is issued from the main thread, again with the result discarded.
#[test]
fn test_rwlock_consistency() {
    let db = Arc::new(Minigraf::in_memory().unwrap());

    // Writers are all spawned before the first reader.
    let mut writers = Vec::with_capacity(10);
    for writer_id in 0..10 {
        let shared = Arc::clone(&db);
        writers.push(thread::spawn(move || {
            for rule_id in 0..5 {
                let command = format!(
                    r#"(rule [(pred{}-{} ?x ?y) [?x :attr{} ?y]])"#,
                    writer_id, rule_id, writer_id
                );
                shared.execute(&command).unwrap();
            }
        }));
    }

    let mut readers = Vec::with_capacity(10);
    for _ in 0..10 {
        let shared = Arc::clone(&db);
        readers.push(thread::spawn(move || {
            for _ in 0..10 {
                let _ = shared.execute(r#"(query [:find ?x :where [?x :attr0 ?y]])"#);
            }
        }));
    }

    // Join writers first, then readers.
    for handle in writers.into_iter().chain(readers) {
        handle.join().unwrap();
    }

    // NOTE(review): these results are dropped, so this only exercises the
    // query path for each rule name; it does not assert the rules exist.
    for writer_id in 0..10 {
        for rule_id in 0..5 {
            let lookup = format!(
                r#"(query [:find ?y :where (pred{}-{} :test ?y)])"#,
                writer_id, rule_id
            );
            let _ = db.execute(&lookup);
        }
    }
}