//! Tests that exercise `ggen_core::graph::Graph` from one or more threads:
//! shared reads, concurrent inserts, and `query_cached` results after writes.

// File-wide Clippy allowances. The tests below report failures through
// `expect` and assertion panics, which is why `expect_used` and `panic` are
// permitted here; the remaining entries are blanket allowances that are not
// all exercised by the code visible in this file.
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::needless_raw_string_hashes,
clippy::duration_suboptimal_units,
clippy::branches_sharing_code,
clippy::used_underscore_binding,
clippy::single_char_pattern,
clippy::ignore_without_reason,
clippy::cloned_ref_to_slice_refs,
clippy::doc_overindented_list_items,
clippy::match_wildcard_for_single_variants,
clippy::ignored_unit_patterns,
clippy::needless_collect,
clippy::unnecessary_map_or,
clippy::manual_flatten,
clippy::manual_strip,
clippy::future_not_send,
clippy::unnested_or_patterns,
clippy::no_effect_underscore_binding,
clippy::literal_string_with_formatting_args
)]
use ggen_core::graph::{CachedResult, Graph};
use std::sync::{Arc, Barrier};
use std::thread;
/// Builds a fresh `Graph` pre-populated with `count` triples.
///
/// Triple `i` (for `i` in `0..count`) is `ex:item{i} ex:value "{i}"`, where
/// `ex:` is bound to `<http://example.org/>`.
///
/// # Panics
///
/// Panics if the graph cannot be created or if `insert_turtle` rejects the
/// generated document.
fn make_graph_with_triples(count: usize) -> Graph {
    // Assemble the whole Turtle document first: prefix line, then one
    // statement per item.
    let mut document = String::from("@prefix ex: <http://example.org/> .\n");
    document.extend((0..count).map(|i| format!("ex:item{i} ex:value \"{i}\" .\n")));

    let store = Graph::new().expect("Graph::new");
    store
        .insert_turtle(&document)
        .expect("insert_turtle in make_graph_with_triples");
    store
}
/// SPARQL query that counts every `?s ?p ?o` match and binds the total to `?n`.
const COUNT_QUERY: &str = "SELECT (COUNT(*) AS ?n) WHERE { ?s ?p ?o }";
/// SPARQL query selecting each subject that has an `<http://example.org/value>` triple.
const ALL_ITEMS_QUERY: &str = "SELECT ?item WHERE { ?item <http://example.org/value> ?v }";
/// SPARQL query selecting each subject that has an `<http://example.org/name>` triple.
const ALL_NAMED_QUERY: &str = "SELECT ?item WHERE { ?item <http://example.org/name> ?v }";
/// Returns the number of solution rows held by `result`.
///
/// Any variant other than `CachedResult::Solutions` is reported as 0 rows.
fn row_count(result: &CachedResult) -> usize {
    if let CachedResult::Solutions(rows) = result {
        rows.len()
    } else {
        0
    }
}
/// Reads the `n` binding of the first solution row as a `usize`.
///
/// The binding is a string. Any leading `"` characters are skipped and the run
/// of ASCII digits that follows is parsed, so both a bare number (`52`) and a
/// quoted literal form such as `"52"^^<datatype>` yield `52`. Without skipping
/// the quote, the quoted form would have an empty digit prefix and silently
/// produce 0.
///
/// NOTE(review): the exact string form stored in `CachedResult::Solutions` is
/// not visible from this file — confirm which of the two forms is produced.
///
/// Returns 0 when `result` is not `Solutions`, when there are no rows, when the
/// first row has no `n` binding, or when no digits can be parsed (including a
/// digit run too large for `usize`).
fn extract_count(result: &CachedResult) -> usize {
    let CachedResult::Solutions(rows) = result else {
        return 0;
    };

    rows.first()
        .and_then(|row| row.get("n"))
        .and_then(|value| {
            let unquoted = value.trim_start_matches('"');
            // Slice off the leading digit run instead of collecting it into a
            // temporary `String`.
            let digits_end = unquoted
                .find(|c: char| !c.is_ascii_digit())
                .unwrap_or(unquoted.len());
            unquoted[..digits_end].parse::<usize>().ok()
        })
        .unwrap_or(0)
}
/// A `Graph` clone must observe the same data as the handle it was cloned from.
///
/// Starts from three triples, inserts a fourth through the clone only, and
/// checks that both handles report four.
#[test]
fn test_clone_shares_underlying_store() {
    let original = make_graph_with_triples(3);
    let twin = original.clone();

    // Before any write, both handles agree and are non-empty.
    assert_eq!(original.len(), twin.len());
    assert!(!original.is_empty());

    // Write through the clone only.
    let extra_triple = r#"@prefix ex: <http://example.org/> .
ex:extra ex:value "extra" ."#;
    twin.insert_turtle(extra_triple)
        .expect("insert_turtle on clone");

    // The write is visible through the original handle as well.
    assert_eq!(original.len(), twin.len());
    assert_eq!(original.len(), 4);
}
/// Four threads query the same five-triple graph at the same moment; each one
/// must get all five rows back without panicking.
#[test]
fn test_concurrent_reads_do_not_panic() {
    const READERS: usize = 4;

    let shared = Arc::new(make_graph_with_triples(5));
    let start_gate = Arc::new(Barrier::new(READERS));

    // Collect eagerly: every reader has to be spawned before any is joined,
    // otherwise the barrier could never be released.
    let readers: Vec<_> = (0..READERS)
        .map(|i| {
            let graph = Arc::clone(&shared);
            let gate = Arc::clone(&start_gate);
            thread::spawn(move || {
                // Line all readers up so the queries overlap.
                gate.wait();
                let result = graph
                    .query_cached(ALL_ITEMS_QUERY)
                    .expect("query_cached in thread");
                let rows = row_count(&result);
                assert_eq!(rows, 5, "thread {i}: expected 5 rows, got {}", rows);
            })
        })
        .collect();

    for reader in readers {
        reader.join().expect("thread should not panic");
    }
}
/// A triple inserted on one thread must be visible to a query issued from
/// another thread that shares the same `Graph` through `Arc::clone`.
///
/// Both threads start together at a barrier. The reader then blocks on a
/// channel that the writer signals only after `insert_turtle` has returned, so
/// the read is ordered after the write without relying on a fixed sleep.
#[test]
fn test_concurrent_writes_visible_across_clones() {
    let graph = Arc::new(Graph::new().expect("Graph::new"));
    let barrier = Arc::new(Barrier::new(2));
    // One-shot hand-off: writer -> reader, "the insert has completed".
    let (written_tx, written_rx) = std::sync::mpsc::channel::<()>();

    let writer = {
        let g = Arc::clone(&graph);
        let b = Arc::clone(&barrier);
        thread::spawn(move || {
            b.wait();
            g.insert_turtle(
                r#"@prefix ex: <http://example.org/> .
ex:alice ex:name "Alice" ."#,
            )
            .expect("insert_turtle in writer");
            // A send error only means the reader is already gone; the joins
            // below report whatever happened to it.
            let _ = written_tx.send(());
        })
    };

    let reader = {
        let g = Arc::clone(&graph);
        let b = Arc::clone(&barrier);
        thread::spawn(move || {
            b.wait();
            // If the writer panics before signalling, its sender is dropped and
            // `recv` returns an error instead of blocking forever.
            written_rx
                .recv()
                .expect("writer should signal that its insert completed");
            let result = g
                .query_cached(ALL_NAMED_QUERY)
                .expect("query_cached in reader");
            assert_eq!(
                row_count(&result),
                1,
                "reader should see the triple inserted by writer"
            );
        })
    };

    writer.join().expect("writer should not panic");
    reader.join().expect("reader should not panic");
}
/// Runs a writer loop and a reader loop against the same graph at once and
/// fails if either does not finish.
///
/// `JoinHandle::join` blocks forever on a thread that never returns, so a real
/// deadlock would hang the test run instead of failing it. Each worker
/// therefore reports completion over a channel, and the test waits for those
/// reports with a timeout before joining. After both workers finish, the graph
/// must contain the initial triples plus one per writer iteration.
#[test]
fn test_concurrent_read_write_does_not_deadlock() {
    const INITIAL_TRIPLES: usize = 2;
    const ITERATIONS: usize = 50;
    // Generous upper bound; only reached when a worker is genuinely stuck.
    const DEADLOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    let graph = Arc::new(make_graph_with_triples(INITIAL_TRIPLES));
    let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();

    let writer = {
        let g = Arc::clone(&graph);
        let done = done_tx.clone();
        thread::spawn(move || {
            for i in 0..ITERATIONS {
                let turtle = format!(
                    r#"@prefix ex: <http://example.org/> .
ex:w{i} ex:value "{i}" ."#
                );
                g.insert_turtle(&turtle)
                    .expect("insert_turtle in writer loop");
            }
            let _ = done.send(());
        })
    };

    let reader = {
        let g = Arc::clone(&graph);
        let done = done_tx.clone();
        thread::spawn(move || {
            for _ in 0..ITERATIONS {
                let result = g
                    .query_cached(COUNT_QUERY)
                    .expect("query_cached in reader loop");
                // The count changes while the writer runs, so its value is
                // deliberately not asserted; only completing the call matters.
                let _n = extract_count(&result);
            }
            let _ = done.send(());
        })
    };

    // Drop the original sender so the channel disconnects once both workers
    // have exited, whether they finished normally or panicked.
    drop(done_tx);

    for _ in 0..2 {
        match done_rx.recv_timeout(DEADLOCK_TIMEOUT) {
            Ok(()) => {}
            // A worker exited without reporting, i.e. it panicked. The joins
            // below surface that panic.
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => panic!(
                "reader/writer threads did not finish within {DEADLOCK_TIMEOUT:?}; likely deadlock"
            ),
        }
    }

    writer.join().expect("writer thread panicked or deadlocked");
    reader.join().expect("reader thread panicked or deadlocked");

    assert_eq!(
        graph.len(),
        INITIAL_TRIPLES + ITERATIONS,
        "every writer insert should be present once both threads have finished"
    );
}
/// Eight threads each insert one distinct triple at the same moment; once all
/// have finished, `len()` must equal the number of threads.
#[test]
fn test_len_consistent_after_concurrent_inserts() {
    let thread_count = 8;
    let shared = Arc::new(Graph::new().expect("Graph::new"));
    let start_gate = Arc::new(Barrier::new(thread_count));

    // Collect eagerly: every writer has to exist before the barrier can open.
    let writers: Vec<_> = (0..thread_count)
        .map(|i| {
            let graph = Arc::clone(&shared);
            let gate = Arc::clone(&start_gate);
            thread::spawn(move || {
                gate.wait();
                // Each thread writes its own subject, so no two inserts collide.
                let turtle = format!(
                    r#"@prefix ex: <http://example.org/> .
ex:t{i} ex:value "{i}" ."#
                );
                graph
                    .insert_turtle(&turtle)
                    .expect("insert_turtle in concurrent len test");
            })
        })
        .collect();

    for writer in writers {
        writer.join().expect("thread should not panic");
    }

    let total = shared.len();
    assert_eq!(
        total, thread_count,
        "expected {thread_count} triples after concurrent inserts, got {}",
        total
    );
}
/// A `query_cached` result must reflect triples inserted after an earlier run
/// of the same query.
///
/// Despite the name, both inserts and both queries happen sequentially on the
/// test thread.
#[test]
fn test_cache_invalidation_after_concurrent_insert() {
    let store = Graph::new().expect("Graph::new");

    let alice = r#"@prefix ex: <http://example.org/> .
ex:alice ex:value "Alice" ."#;
    store.insert_turtle(alice).expect("initial insert");

    // First run of the query, with one matching triple.
    let before = store.query_cached(ALL_ITEMS_QUERY).expect("first query");
    assert_eq!(row_count(&before), 1, "initial query should return 1 row");

    let bob = r#"@prefix ex: <http://example.org/> .
ex:bob ex:value "Bob" ."#;
    store.insert_turtle(bob).expect("second insert");

    // Same query text again: the new triple has to show up.
    let after = store.query_cached(ALL_ITEMS_QUERY).expect("second query");
    assert_eq!(
        row_count(&after),
        2,
        "after insert, query should return 2 rows (cache invalidated)"
    );
}
/// Two separately constructed graphs must not see each other's triples.
///
/// Each graph receives one triple with a distinct literal; each must then
/// report a length of 1 and return exactly one row for its own literal.
#[test]
fn test_independent_graphs_do_not_interfere() {
    let graph_a = Graph::new().expect("Graph::new A");
    let graph_b = Graph::new().expect("Graph::new B");

    let alpha = r#"@prefix ex: <http://example.org/> .
ex:alpha ex:value "1" ."#;
    let beta = r#"@prefix ex: <http://example.org/> .
ex:beta ex:value "2" ."#;
    graph_a.insert_turtle(alpha).expect("insert into A");
    graph_b.insert_turtle(beta).expect("insert into B");

    // Neither graph picked up the other's triple.
    assert_eq!(graph_a.len(), 1, "graph_a should have exactly 1 triple");
    assert_eq!(graph_b.len(), 1, "graph_b should have exactly 1 triple");

    let rows_a = row_count(
        &graph_a
            .query_cached("SELECT ?item WHERE { ?item <http://example.org/value> \"1\" }")
            .expect("query A"),
    );
    assert_eq!(rows_a, 1);

    let rows_b = row_count(
        &graph_b
            .query_cached("SELECT ?item WHERE { ?item <http://example.org/value> \"2\" }")
            .expect("query B"),
    );
    assert_eq!(rows_b, 1);
}
/// Four threads issue the identical `query_cached` call at the same moment
/// against a three-triple graph; every thread must see three rows.
#[test]
fn test_concurrent_identical_query_cached_returns_same_result() {
    const WORKERS: usize = 4;

    let shared = Arc::new(make_graph_with_triples(3));
    let start_gate = Arc::new(Barrier::new(WORKERS));

    let mut pending = Vec::with_capacity(WORKERS);
    for i in 0..WORKERS {
        let (graph, gate) = (Arc::clone(&shared), Arc::clone(&start_gate));
        pending.push(thread::spawn(move || {
            // Release all workers together so the same query races itself.
            gate.wait();
            let count = row_count(
                &graph
                    .query_cached(ALL_ITEMS_QUERY)
                    .expect("query_cached in identical query test"),
            );
            assert_eq!(count, 3, "thread {i}: expected 3 triples, got {count}");
        }));
    }

    pending
        .into_iter()
        .for_each(|worker| worker.join().expect("thread should not panic"));
}