use crate::storage::dict::{put_term_full, term_type};
use oxrdf::{BlankNode, Literal, NamedNode, NamedOrBlankNode, Term, Triple};
use pgrx::prelude::*;
use reasonable::reasoner::Reasoner;
use serde_json::json;
use std::collections::HashSet;
use std::time::Instant;
#[pg_extern]
fn materialize(graph_id: i64) -> pgrx::JsonB {
let start = Instant::now();
let dropped: i64 = Spi::connect_mut(|client| {
let table = client
.update(
"WITH del AS (DELETE FROM pgrdf._pgrdf_quads
WHERE graph_id = $1 AND is_inferred = TRUE
RETURNING 1)
SELECT count(*)::bigint FROM del",
None,
&[unsafe {
pgrx::datum::DatumWithOid::new(
graph_id,
pgrx::pg_sys::PgBuiltInOids::INT8OID.into(),
)
}],
)
.expect("materialize: delete-prior failed");
table.first().get_one::<i64>().ok().flatten().unwrap_or(0)
});
let base = load_base_triples(graph_id);
let base_count = base.len() as i64;
let base_set: HashSet<Triple> = base.iter().cloned().collect();
let mut reasoner = Reasoner::new();
reasoner.load_triples(base);
reasoner.reason();
let errors: Vec<String> = reasoner.errors().iter().map(|e| format!("{e}")).collect();
let derived = reasoner.get_triples();
let inferred: Vec<&Triple> = derived.iter().filter(|t| !base_set.contains(t)).collect();
let mut written = 0i64;
for t in &inferred {
let s_id = subject_id(&t.subject);
let p_id = put_term_full(t.predicate.as_str(), term_type::URI, None, None);
let o_id = term_id(&t.object);
Spi::run_with_args(
"INSERT INTO pgrdf._pgrdf_quads
(subject_id, predicate_id, object_id, graph_id, is_inferred)
VALUES ($1, $2, $3, $4, TRUE)",
&[s_id.into(), p_id.into(), o_id.into(), graph_id.into()],
)
.expect("materialize: insert inferred failed");
written += 1;
}
pgrx::JsonB(json!({
"base_triples": base_count,
"inferred_triples_written": written,
"previous_inferred_dropped": dropped,
"reasoner_errors": errors,
"elapsed_ms": start.elapsed().as_secs_f64() * 1000.0,
}))
}
fn load_base_triples(graph_id: i64) -> Vec<Triple> {
let mut out = Vec::new();
Spi::connect(|client| {
let table = client
.select(
"SELECT
s.term_type, s.lexical_value,
p.lexical_value AS p_iri,
o.term_type, o.lexical_value,
dt.lexical_value AS o_dt,
o.language_tag AS o_lang
FROM pgrdf._pgrdf_quads q
JOIN pgrdf._pgrdf_dictionary s ON s.id = q.subject_id
JOIN pgrdf._pgrdf_dictionary p ON p.id = q.predicate_id
JOIN pgrdf._pgrdf_dictionary o ON o.id = q.object_id
LEFT JOIN pgrdf._pgrdf_dictionary dt ON dt.id = o.datatype_iri_id
WHERE q.graph_id = $1 AND q.is_inferred = FALSE",
None,
&[unsafe {
pgrx::datum::DatumWithOid::new(
graph_id,
pgrx::pg_sys::PgBuiltInOids::INT8OID.into(),
)
}],
)
.expect("materialize: base select failed");
for row in table {
let s_type: i16 = row
.get(1)
.ok()
.flatten()
.expect("materialize: subject term_type");
let s_val: String = row
.get(2)
.ok()
.flatten()
.expect("materialize: subject value");
let p_iri: String = row
.get(3)
.ok()
.flatten()
.expect("materialize: predicate iri");
let o_type: i16 = row
.get(4)
.ok()
.flatten()
.expect("materialize: object term_type");
let o_val: String = row
.get(5)
.ok()
.flatten()
.expect("materialize: object value");
let o_dt: Option<String> = row.get(6).ok().flatten();
let o_lang: Option<String> = row.get(7).ok().flatten();
let subject = build_subject(s_type, &s_val);
let predicate = match NamedNode::new(&p_iri) {
Ok(n) => n,
Err(_) => continue, };
let object = build_object(o_type, &o_val, o_dt.as_deref(), o_lang.as_deref());
out.push(Triple::new(subject, predicate, object));
}
});
out
}
fn build_subject(t_type: i16, value: &str) -> NamedOrBlankNode {
match t_type {
term_type::URI => NamedOrBlankNode::NamedNode(NamedNode::new(value).unwrap_or_else(|_| {
NamedNode::new("urn:pgrdf:invalid-iri")
.expect("materialize: urn:pgrdf:invalid-iri sentinel is well-formed")
})),
term_type::BLANK_NODE => NamedOrBlankNode::BlankNode(
BlankNode::new(value).unwrap_or_else(|_| BlankNode::default()),
),
_ => NamedOrBlankNode::BlankNode(BlankNode::default()),
}
}
fn build_object(
t_type: i16,
value: &str,
datatype_iri: Option<&str>,
language: Option<&str>,
) -> Term {
match t_type {
term_type::URI => Term::NamedNode(NamedNode::new(value).unwrap_or_else(|_| {
NamedNode::new("urn:pgrdf:invalid-iri")
.expect("materialize: urn:pgrdf:invalid-iri sentinel is well-formed")
})),
term_type::BLANK_NODE => {
Term::BlankNode(BlankNode::new(value).unwrap_or_else(|_| BlankNode::default()))
}
_ => {
if let Some(lang) = language {
match Literal::new_language_tagged_literal(value, lang) {
Ok(l) => Term::Literal(l),
Err(_) => Term::Literal(Literal::new_simple_literal(value)),
}
} else if let Some(dt) = datatype_iri {
match NamedNode::new(dt) {
Ok(dt_node) => Term::Literal(Literal::new_typed_literal(value, dt_node)),
Err(_) => Term::Literal(Literal::new_simple_literal(value)),
}
} else {
Term::Literal(Literal::new_simple_literal(value))
}
}
}
}
fn subject_id(s: &NamedOrBlankNode) -> i64 {
match s {
NamedOrBlankNode::NamedNode(n) => put_term_full(n.as_str(), term_type::URI, None, None),
NamedOrBlankNode::BlankNode(b) => {
put_term_full(b.as_str(), term_type::BLANK_NODE, None, None)
}
}
}
fn term_id(t: &Term) -> i64 {
match t {
Term::NamedNode(n) => put_term_full(n.as_str(), term_type::URI, None, None),
Term::BlankNode(b) => put_term_full(b.as_str(), term_type::BLANK_NODE, None, None),
Term::Literal(lit) => {
let lang = lit.language();
let datatype_id = if lang.is_some() {
None
} else {
Some(put_term_full(
lit.datatype().as_str(),
term_type::URI,
None,
None,
))
};
put_term_full(lit.value(), term_type::LITERAL, datatype_id, lang)
}
#[allow(unreachable_patterns)]
_ => panic!("materialize: unsupported object term (RDF-star out of scope)"),
}
}
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
use pgrx::prelude::*;
#[pg_test]
fn materialize_subclass_chain() {
let ttl = r#"
@prefix ex: <http://example.com/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
ex:Engineer rdfs:subClassOf ex:Person .
ex:alice rdf:type ex:Engineer .
"#;
let g: i64 = 8400;
Spi::run_with_args("SELECT pgrdf.add_graph($1)", &[g.into()]).unwrap();
Spi::run_with_args("SELECT pgrdf.parse_turtle($1, $2)", &[ttl.into(), g.into()]).unwrap();
let j: pgrx::JsonB = Spi::get_one_with_args("SELECT pgrdf.materialize($1)", &[g.into()])
.unwrap()
.unwrap();
let v = &j.0;
assert_eq!(v["base_triples"], 2);
assert!(
v["inferred_triples_written"].as_i64().unwrap() >= 1,
"expected at least one inferred triple, got {}",
v["inferred_triples_written"]
);
let person_count: i64 = Spi::get_one_with_args(
"SELECT count(*)::bigint FROM pgrdf._pgrdf_quads q
JOIN pgrdf._pgrdf_dictionary s ON s.id = q.subject_id
JOIN pgrdf._pgrdf_dictionary p ON p.id = q.predicate_id
JOIN pgrdf._pgrdf_dictionary o ON o.id = q.object_id
WHERE q.graph_id = $1
AND q.is_inferred = TRUE
AND s.lexical_value = 'http://example.com/alice'
AND p.lexical_value = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'
AND o.lexical_value = 'http://example.com/Person'",
&[g.into()],
)
.unwrap()
.unwrap();
assert_eq!(person_count, 1, "ex:alice a ex:Person must be inferred");
}
#[pg_test]
fn materialize_is_idempotent() {
let ttl = r#"
@prefix ex: <http://example.com/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
ex:B rdfs:subClassOf ex:A .
ex:x rdf:type ex:B .
"#;
let g: i64 = 8401;
Spi::run_with_args("SELECT pgrdf.add_graph($1)", &[g.into()]).unwrap();
Spi::run_with_args("SELECT pgrdf.parse_turtle($1, $2)", &[ttl.into(), g.into()]).unwrap();
let first: pgrx::JsonB =
Spi::get_one_with_args("SELECT pgrdf.materialize($1)", &[g.into()])
.unwrap()
.unwrap();
let second: pgrx::JsonB =
Spi::get_one_with_args("SELECT pgrdf.materialize($1)", &[g.into()])
.unwrap()
.unwrap();
let n1 = first.0["inferred_triples_written"].as_i64().unwrap();
let n2 = second.0["inferred_triples_written"].as_i64().unwrap();
let dropped_2 = second.0["previous_inferred_dropped"].as_i64().unwrap();
assert_eq!(n1, n2, "two materialize runs must produce same row count");
assert_eq!(
dropped_2, n1,
"second call must drop the first call's output"
);
}
#[pg_test]
fn materialize_pure_data_preserves_input() {
let ttl = r#"
@prefix ex: <http://example.com/> .
ex:a ex:p ex:b .
ex:c ex:q ex:d .
"#;
let g: i64 = 8402;
Spi::run_with_args("SELECT pgrdf.add_graph($1)", &[g.into()]).unwrap();
Spi::run_with_args("SELECT pgrdf.parse_turtle($1, $2)", &[ttl.into(), g.into()]).unwrap();
let j: pgrx::JsonB = Spi::get_one_with_args("SELECT pgrdf.materialize($1)", &[g.into()])
.unwrap()
.unwrap();
assert_eq!(j.0["base_triples"], 2);
let written = j.0["inferred_triples_written"].as_i64().unwrap();
assert!(written >= 0);
let base_still_there: i64 = Spi::get_one_with_args(
"SELECT count(*)::bigint FROM pgrdf._pgrdf_quads
WHERE graph_id = $1 AND is_inferred = FALSE",
&[g.into()],
)
.unwrap()
.unwrap();
assert_eq!(
base_still_there, 2,
"base triples must remain after materialize"
);
}
}