use crate::query::plan_cache;
use crate::storage::dict::{put_term_full, term_type};
use crate::storage::shmem_cache;
use oxrdf::{NamedOrBlankNode, Term};
use oxttl::TurtleParser;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys::{Oid, PgBuiltInOids};
use pgrx::prelude::*;
use serde_json::json;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Read};
use std::mem;
use std::sync::atomic::Ordering;
use std::time::Instant;
/// Number of triples buffered in memory before one bulk INSERT is issued.
const BATCH_SIZE: usize = 1000;
/// Bulk-insert statement: unnests three parallel bigint[] arrays of
/// (subject, predicate, object) dictionary ids into quad rows, all tagged
/// with graph id $4. Prepared once and reused via the plan cache.
const QUAD_INSERT_SQL: &str = "INSERT INTO pgrdf._pgrdf_quads \
    (subject_id, predicate_id, object_id, graph_id) \
    SELECT s, p, o, $4 \
    FROM unnest($1::bigint[], $2::bigint[], $3::bigint[]) AS t(s, p, o)";
/// Key for the per-load dictionary cache:
/// (term_type, lexical value, datatype id, language tag).
type DictKey = (i16, String, Option<i64>, Option<String>);
/// Counters accumulated over one Turtle ingest run; surfaced as JSONB by
/// the `*_verbose` entry points.
#[derive(Default)]
struct LoaderStats {
    // Total triples parsed and queued for insertion.
    triples: i64,
    // Terms answered from the per-load in-process HashMap cache.
    dict_cache_hits: i64,
    // Terms answered from the shared-memory cache (inferred from the
    // shmem HITS counter delta around put_term_full).
    shmem_cache_hits: i64,
    // Terms that required a dictionary-table round trip.
    dict_db_calls: i64,
    // Number of bulk INSERT batches flushed.
    quad_batches: i64,
    // Wall-clock duration of the whole ingest, in milliseconds.
    elapsed_ms: f64,
}
fn intern_term(
cache: &mut HashMap<DictKey, i64>,
stats: &mut LoaderStats,
value: &str,
term_type: i16,
datatype_id: Option<i64>,
language: Option<&str>,
) -> i64 {
let key = (
term_type,
value.to_string(),
datatype_id,
language.map(str::to_string),
);
if let Some(&id) = cache.get(&key) {
stats.dict_cache_hits += 1;
return id;
}
let hits_before = if shmem_cache::is_ready() {
shmem_cache::HITS.get().load(Ordering::Relaxed)
} else {
0
};
let id = put_term_full(value, term_type, datatype_id, language);
let hits_after = if shmem_cache::is_ready() {
shmem_cache::HITS.get().load(Ordering::Relaxed)
} else {
0
};
if hits_after > hits_before {
stats.shmem_cache_hits += 1;
} else {
stats.dict_db_calls += 1;
}
cache.insert(key, id);
id
}
/// Intern a triple subject (named node or blank node) and return its
/// dictionary id.
fn subject_to_id(
    s: &NamedOrBlankNode,
    cache: &mut HashMap<DictKey, i64>,
    stats: &mut LoaderStats,
) -> i64 {
    // Both variants expose a plain string; only the term type differs.
    let (lexical, kind) = match s {
        NamedOrBlankNode::NamedNode(node) => (node.as_str(), term_type::URI),
        NamedOrBlankNode::BlankNode(node) => (node.as_str(), term_type::BLANK_NODE),
    };
    intern_term(cache, stats, lexical, kind, None, None)
}
/// Intern a triple object and return its dictionary id.
///
/// Named and blank nodes are interned directly. Literals first intern
/// their datatype IRI (unless language-tagged, which carries no explicit
/// datatype id here), then the lexical value itself.
fn object_to_id(t: &Term, cache: &mut HashMap<DictKey, i64>, stats: &mut LoaderStats) -> i64 {
    match t {
        Term::NamedNode(node) => {
            intern_term(cache, stats, node.as_str(), term_type::URI, None, None)
        }
        Term::BlankNode(node) => {
            intern_term(cache, stats, node.as_str(), term_type::BLANK_NODE, None, None)
        }
        Term::Literal(lit) => {
            let lang = lit.language();
            // Language-tagged literal => no datatype id; otherwise intern
            // the datatype IRI before the value so both end up cached.
            let datatype_id = match lang {
                Some(_) => None,
                None => Some(intern_term(
                    cache,
                    stats,
                    lit.datatype().as_str(),
                    term_type::URI,
                    None,
                    None,
                )),
            };
            intern_term(cache, stats, lit.value(), term_type::LITERAL, datatype_id, lang)
        }
        // Unreachable unless oxrdf gains extra Term variants (RDF-star).
        #[allow(unreachable_patterns)]
        _ => panic!("load_turtle: unsupported object term (RDF-star not in v0.2 scope)"),
    }
}
/// Bulk-insert the buffered (subject, predicate, object) id batches as
/// quads of `graph_id` using a cached prepared statement, then reset the
/// batch vectors for reuse. A call with an empty batch is a no-op.
fn flush_batch(
    batch_s: &mut Vec<i64>,
    batch_p: &mut Vec<i64>,
    batch_o: &mut Vec<i64>,
    graph_id: i64,
    stats: &mut LoaderStats,
) {
    if batch_s.is_empty() {
        return;
    }
    // Move the buffers out; mem::take leaves fresh empty Vecs behind.
    let s_arr = mem::take(batch_s);
    let p_arr = mem::take(batch_p);
    let o_arr = mem::take(batch_o);
    let int8_oid: Oid = PgBuiltInOids::INT8OID.into();
    let int8_array_oid: Oid = PgBuiltInOids::INT8ARRAYOID.into();
    Spi::connect_mut(|client| {
        // Prepare the insert once and keep() it so it survives the SPI
        // connection; later flushes reuse the plan from plan_cache
        // (presumably backend-local — verify plan_cache scope).
        if !plan_cache::contains(QUAD_INSERT_SQL) {
            let arg_oids = vec![
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID),
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID),
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID),
                PgOid::BuiltIn(PgBuiltInOids::INT8OID),
            ];
            let prepared = client
                .prepare_mut(QUAD_INSERT_SQL, &arg_oids)
                .expect("flush_batch: prepare failed")
                .keep();
            plan_cache::insert(QUAD_INSERT_SQL.to_string(), prepared);
            plan_cache::record_miss();
        } else {
            plan_cache::record_hit();
        }
        // SAFETY: DatumWithOid::new is unsafe; soundness relies on each oid
        // matching the Rust value converted (int8[] for Vec<i64>, int8 for
        // i64) — TODO confirm against pgrx's DatumWithOid contract.
        let datums: Vec<DatumWithOid<'_>> = unsafe {
            vec![
                DatumWithOid::new(s_arr, int8_array_oid),
                DatumWithOid::new(p_arr, int8_array_oid),
                DatumWithOid::new(o_arr, int8_array_oid),
                DatumWithOid::new(graph_id, int8_oid),
            ]
        };
        // Execute through the cached plan; it must exist because the branch
        // above inserted it on miss.
        plan_cache::with_plan(QUAD_INSERT_SQL, |maybe_owned| {
            let owned = maybe_owned.expect("load_turtle: plan must be in cache after insert");
            client
                .update(owned, None, &datums)
                .expect("flush_batch: prepared insert failed");
        });
    });
    stats.quad_batches += 1;
}
/// Parse Turtle from `reader`, intern every term, and bulk-insert the
/// resulting quads into `graph_id` in batches of `BATCH_SIZE`.
///
/// Returns the per-run [`LoaderStats`] (triple count, cache counters,
/// batch count, elapsed wall-clock time).
fn ingest_turtle_with_stats<R: Read>(
    reader: R,
    graph_id: i64,
    base_iri: Option<&str>,
) -> LoaderStats {
    // Configure the streaming parser, applying an optional base IRI.
    let mut builder = TurtleParser::new();
    if let Some(base) = base_iri {
        builder = builder
            .with_base_iri(base)
            .unwrap_or_else(|e| panic!("load_turtle: invalid base IRI {base:?}: {e}"));
    }
    let triples = builder.for_reader(reader);
    let start = Instant::now();
    let mut dict_cache: HashMap<DictKey, i64> = HashMap::new();
    let mut stats = LoaderStats::default();
    // Three parallel id columns, flushed together as one bulk insert.
    let mut subjects: Vec<i64> = Vec::with_capacity(BATCH_SIZE);
    let mut predicates: Vec<i64> = Vec::with_capacity(BATCH_SIZE);
    let mut objects: Vec<i64> = Vec::with_capacity(BATCH_SIZE);
    for parsed in triples {
        let triple = parsed.expect("load_turtle: turtle parse error");
        subjects.push(subject_to_id(&triple.subject, &mut dict_cache, &mut stats));
        predicates.push(intern_term(
            &mut dict_cache,
            &mut stats,
            triple.predicate.as_str(),
            term_type::URI,
            None,
            None,
        ));
        objects.push(object_to_id(&triple.object, &mut dict_cache, &mut stats));
        stats.triples += 1;
        if subjects.len() >= BATCH_SIZE {
            flush_batch(&mut subjects, &mut predicates, &mut objects, graph_id, &mut stats);
        }
    }
    // Flush the final, possibly partial, batch (no-op when empty).
    flush_batch(&mut subjects, &mut predicates, &mut objects, graph_id, &mut stats);
    stats.elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
    stats
}
/// Render the loader counters as a JSONB object for the `*_verbose` SQL
/// entry points.
fn stats_to_jsonb(stats: &LoaderStats) -> pgrx::JsonB {
    let payload = json!({
        "triples": stats.triples,
        "dict_cache_hits": stats.dict_cache_hits,
        "shmem_cache_hits": stats.shmem_cache_hits,
        "dict_db_calls": stats.dict_db_calls,
        "quad_batches": stats.quad_batches,
        "elapsed_ms": stats.elapsed_ms,
    });
    pgrx::JsonB(payload)
}
/// SQL: load a Turtle file from `path` into `graph_id`; returns the number
/// of triples inserted. An empty `base_iri` is treated as NULL.
#[pg_extern]
fn load_turtle(path: &str, graph_id: i64, base_iri: default!(Option<&str>, "NULL")) -> i64 {
    let base = base_iri.filter(|s| !s.is_empty());
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) => panic!("load_turtle: failed to open {path:?}: {e}"),
    };
    ingest_turtle_with_stats(BufReader::new(file), graph_id, base).triples
}
/// SQL: like `load_turtle`, but returns the full loader statistics as
/// JSONB instead of just the triple count.
#[pg_extern]
fn load_turtle_verbose(
    path: &str,
    graph_id: i64,
    base_iri: default!(Option<&str>, "NULL"),
) -> pgrx::JsonB {
    let base = base_iri.filter(|s| !s.is_empty());
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) => panic!("load_turtle_verbose: failed to open {path:?}: {e}"),
    };
    stats_to_jsonb(&ingest_turtle_with_stats(BufReader::new(file), graph_id, base))
}
/// SQL: parse Turtle supplied as a string into `graph_id`; returns the
/// number of triples inserted. An empty `base_iri` is treated as NULL.
#[pg_extern]
fn parse_turtle(content: &str, graph_id: i64, base_iri: default!(Option<&str>, "NULL")) -> i64 {
    let base = base_iri.filter(|s| !s.is_empty());
    let stats = ingest_turtle_with_stats(content.as_bytes(), graph_id, base);
    stats.triples
}
/// SQL: like `parse_turtle`, but returns the full loader statistics as
/// JSONB instead of just the triple count.
#[pg_extern]
fn parse_turtle_verbose(
    content: &str,
    graph_id: i64,
    base_iri: default!(Option<&str>, "NULL"),
) -> pgrx::JsonB {
    let base = base_iri.filter(|s| !s.is_empty());
    stats_to_jsonb(&ingest_turtle_with_stats(content.as_bytes(), graph_id, base))
}
// In-database tests (run under `cargo pgrx test`); each uses a distinct
// graph id (7_001..) to stay isolated from the other tests' data.
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
    use pgrx::prelude::*;
    // Smoke test: 5 triples parse, land in the right graph, and the Person
    // class IRI is interned into the dictionary.
    #[pg_test]
    fn parse_turtle_basic() {
        let ttl = r#"
@prefix ex: <http://example.com/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
ex:alice a foaf:Person ;
foaf:name "Alice" ;
foaf:mbox <mailto:alice@example.com> ;
foaf:knows ex:bob .
ex:bob a foaf:Person .
"#;
        let n: i64 = Spi::get_one_with_args(
            "SELECT pgrdf.parse_turtle($1, $2)",
            &[ttl.into(), 7_001i64.into()],
        )
        .unwrap()
        .unwrap();
        assert_eq!(n, 5);
        // Return value and actual stored quad count must agree.
        let by_graph: i64 =
            Spi::get_one_with_args("SELECT pgrdf.count_quads($1)", &[7_001i64.into()])
                .unwrap()
                .unwrap();
        assert_eq!(by_graph, 5);
        // term_type = 1 is the URI type here — TODO confirm against
        // storage::dict::term_type constants.
        let person: Option<i64> = Spi::get_one(
            "SELECT (SELECT id FROM pgrdf._pgrdf_dictionary
WHERE term_type = 1
AND lexical_value = 'http://xmlns.com/foaf/0.1/Person')",
        )
        .unwrap();
        assert!(person.is_some());
    }
    // A typed literal must intern its datatype IRI as a separate URI term.
    #[pg_test]
    fn parse_turtle_typed_literal() {
        let ttl = r#"
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex: <http://example.com/> .
ex:n ex:age "42"^^xsd:integer .
"#;
        let n: i64 = Spi::get_one_with_args(
            "SELECT pgrdf.parse_turtle($1, $2)",
            &[ttl.into(), 7_002i64.into()],
        )
        .unwrap()
        .unwrap();
        assert_eq!(n, 1);
        let dt: Option<i64> = Spi::get_one(
            "SELECT (SELECT id FROM pgrdf._pgrdf_dictionary
WHERE term_type = 1
AND lexical_value = 'http://www.w3.org/2001/XMLSchema#integer')",
        )
        .unwrap();
        assert!(dt.is_some());
    }
    // Checks the verbose counters: 3 triples share ex:s and ex:p, so the
    // local cache answers 4 lookups (2 repeats each of s and p); the other
    // 5 unique terms come from shmem or the DB; one batch total.
    #[pg_test]
    fn parse_turtle_verbose_cache_fires() {
        let ttl = r#"
@prefix ex: <http://example.com/> .
ex:s ex:p ex:o1 .
ex:s ex:p ex:o2 .
ex:s ex:p ex:o3 .
"#;
        let j: pgrx::JsonB = Spi::get_one_with_args(
            "SELECT pgrdf.parse_turtle_verbose($1, $2)",
            &[ttl.into(), 7_003i64.into()],
        )
        .unwrap()
        .unwrap();
        let v = &j.0;
        assert_eq!(v["triples"], 3);
        assert_eq!(v["dict_cache_hits"], 4);
        // Split between shmem hits and DB calls depends on prior cluster
        // state, but they must sum to the 5 unique terms.
        let shmem_hits = v["shmem_cache_hits"].as_i64().unwrap();
        let db_calls = v["dict_db_calls"].as_i64().unwrap();
        assert_eq!(shmem_hits + db_calls, 5);
        assert_eq!(v["quad_batches"], 1);
    }
}