pgrdf 0.3.0

Rust-native PostgreSQL extension for RDF, SPARQL, SHACL and OWL reasoning
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
//! Turtle ingestion.
//!
//! Phase 2.2: per-call in-process dict cache + batched quad INSERTs.
//!   * HashMap<(value, type, datatype_id, language) -> id> keyed
//!     dictionary cache across one ingest call. Common terms
//!     (predicates, repeated subjects, common datatype IRIs) resolve
//!     to a cached id after the first lookup instead of a fresh
//!     scalar-subquery SELECT.
//!   * Quad inserts buffer until BATCH_SIZE then flush as a single
//!     `INSERT ... SELECT FROM unnest($1::bigint[], $2::bigint[],
//!     $3::bigint[])` — one SPI call per BATCH_SIZE tuples instead
//!     of one per triple.
//!
//! Phase 3 step 1: every fall-through to `put_term_full` first checks
//! the cross-backend shmem cache from `super::shmem_cache`. The
//! loader observes the global HITS counter around the call to
//! attribute hits to the current ingest in its verbose stats.
//!
//! The COPY ... FROM STDIN (FORMAT BINARY) fast path from
//! SPEC.pgRDF.LLD.v0.2 §4.3 needs lower-level Postgres integration
//! than pgrx 0.16 exposes cleanly. Tracked for Phase 3 step 3.

use crate::query::plan_cache;
use crate::storage::dict::{put_term_full, term_type};
use crate::storage::shmem_cache;
use oxrdf::{NamedOrBlankNode, Term};
use oxttl::TurtleParser;
use pgrx::datum::DatumWithOid;
use pgrx::pg_sys::{Oid, PgBuiltInOids};
use pgrx::prelude::*;
use serde_json::json;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Read};
use std::mem;
use std::sync::atomic::Ordering;
use std::time::Instant;

/// Quads buffered before each `INSERT ... unnest` flush. 1000 keeps
/// the array parameters comfortably below Postgres' 1 GB datum
/// ceiling while amortising the SPI round-trip cost.
/// (Three i64 arrays of 1000 elements ≈ 24 kB of ids per flush.)
const BATCH_SIZE: usize = 1000;

/// Static SQL for the batched quad flush. Phase 3 step 3 (LLD §4.3,
/// phase A): the string is constant, so a single prepared statement
/// — stashed in the per-backend `plan_cache` from Phase 3 step 2 —
/// is reused for every flush, in every load call, for the rest of
/// the backend's lifetime. Saves one parse+plan per batch.
///
/// `$1..$3` are the parallel subject/predicate/object id arrays;
/// `$4` is the scalar target graph id (see `flush_batch`).
const QUAD_INSERT_SQL: &str = "INSERT INTO pgrdf._pgrdf_quads \
    (subject_id, predicate_id, object_id, graph_id) \
    SELECT s, p, o, $4 \
      FROM unnest($1::bigint[], $2::bigint[], $3::bigint[]) AS t(s, p, o)";

type DictKey = (i16, String, Option<i64>, Option<String>);

#[derive(Default)]
struct LoaderStats {
    /// Triples parsed and buffered for insert during this ingest.
    triples: i64,
    /// Term references that resolved out of the per-call HashMap cache.
    dict_cache_hits: i64,
    /// Term references that fell through the per-call HashMap and
    /// were satisfied by the cross-backend shmem cache without
    /// touching `_pgrdf_dictionary` (LLD §4.1).
    shmem_cache_hits: i64,
    /// Term references that fell through to `put_term_full` and hit
    /// the underlying _pgrdf_dictionary (either a select-hit or an
    /// insert).
    dict_db_calls: i64,
    /// `INSERT ... unnest` flushes executed (one per non-empty batch).
    quad_batches: i64,
    /// Wall-clock time for the whole ingest loop, in milliseconds.
    elapsed_ms: f64,
}

/// Map a term to its dictionary id, memoising the result in the
/// per-call `cache` so repeated references within one ingest pay
/// the dictionary cost only once.
fn intern_term(
    cache: &mut HashMap<DictKey, i64>,
    stats: &mut LoaderStats,
    value: &str,
    term_type: i16,
    datatype_id: Option<i64>,
    language: Option<&str>,
) -> i64 {
    let cache_key: DictKey = (
        term_type,
        value.to_owned(),
        datatype_id,
        language.map(String::from),
    );
    if let Some(id) = cache.get(&cache_key).copied() {
        stats.dict_cache_hits += 1;
        return id;
    }
    // Sample the global shmem-hit counter on either side of the
    // dictionary call so a hit can be credited to this ingest's
    // verbose stats. Atomics are cheap; this stays well under the
    // per-lookup µs budget. NOTE(review): the counter looks shared
    // across backends, so a concurrent backend's hit inside this
    // window would be misattributed — confirm that's acceptable for
    // stats-only output.
    let sample_hits = || {
        if shmem_cache::is_ready() {
            shmem_cache::HITS.get().load(Ordering::Relaxed)
        } else {
            0
        }
    };
    let before = sample_hits();
    let id = put_term_full(value, term_type, datatype_id, language);
    if sample_hits() > before {
        stats.shmem_cache_hits += 1;
    } else {
        stats.dict_db_calls += 1;
    }
    cache.insert(cache_key, id);
    id
}

/// Resolve a triple subject (IRI or blank node) to its dictionary id.
fn subject_to_id(
    s: &NamedOrBlankNode,
    cache: &mut HashMap<DictKey, i64>,
    stats: &mut LoaderStats,
) -> i64 {
    // Pull out the lexical form and term type, then intern once.
    let (lexical, tt) = match s {
        NamedOrBlankNode::NamedNode(n) => (n.as_str(), term_type::URI),
        NamedOrBlankNode::BlankNode(b) => (b.as_str(), term_type::BLANK_NODE),
    };
    intern_term(cache, stats, lexical, tt, None, None)
}

/// Resolve a triple object (IRI, blank node, or literal) to its
/// dictionary id. Typed literals intern their datatype IRI first so
/// the literal's dictionary row can reference it by id.
fn object_to_id(t: &Term, cache: &mut HashMap<DictKey, i64>, stats: &mut LoaderStats) -> i64 {
    match t {
        Term::Literal(lit) => {
            let lang = lit.language();
            // Language-tagged literals skip the datatype intern;
            // plain/typed literals store their datatype IRI's id.
            let datatype_id = if lang.is_none() {
                let dt = lit.datatype();
                Some(intern_term(cache, stats, dt.as_str(), term_type::URI, None, None))
            } else {
                None
            };
            intern_term(cache, stats, lit.value(), term_type::LITERAL, datatype_id, lang)
        }
        Term::NamedNode(n) => intern_term(cache, stats, n.as_str(), term_type::URI, None, None),
        Term::BlankNode(b) => {
            intern_term(cache, stats, b.as_str(), term_type::BLANK_NODE, None, None)
        }
        // Reachable only if oxrdf grows new Term variants (RDF-star).
        #[allow(unreachable_patterns)]
        _ => panic!("load_turtle: unsupported object term (RDF-star not in v0.2 scope)"),
    }
}

/// Flush a buffered batch of quads to the partitioned hexastore via
/// the cached prepared `INSERT ... unnest` statement.
///
/// On first call in a backend the SQL is prepared and `keep()`-ed
/// into `plan_cache`; every subsequent call (in this load and in
/// future loads) is a pure execute. Moves the buffer Vecs into
/// Postgres-side arrays so we don't pay a clone; callers see empty
/// Vecs after this returns.
fn flush_batch(
    batch_s: &mut Vec<i64>,
    batch_p: &mut Vec<i64>,
    batch_o: &mut Vec<i64>,
    graph_id: i64,
    stats: &mut LoaderStats,
) {
    // Nothing buffered — e.g. the final flush after the triple count
    // landed on an exact multiple of BATCH_SIZE. Not counted as a
    // batch in stats.
    if batch_s.is_empty() {
        return;
    }
    // mem::take leaves the callers' Vecs empty (capacity dropped),
    // handing ownership of the id arrays to the Datum construction
    // below without a clone.
    let s_arr = mem::take(batch_s);
    let p_arr = mem::take(batch_p);
    let o_arr = mem::take(batch_o);
    let int8_oid: Oid = PgBuiltInOids::INT8OID.into();
    let int8_array_oid: Oid = PgBuiltInOids::INT8ARRAYOID.into();

    Spi::connect_mut(|client| {
        // Prepare-once / reuse-many via the per-backend plan cache.
        // Same mechanism as the SPARQL executor (Phase 3 step 2);
        // keyed on the SQL string which is `QUAD_INSERT_SQL`
        // verbatim.
        if !plan_cache::contains(QUAD_INSERT_SQL) {
            // Argument oids must match QUAD_INSERT_SQL's $1..$4:
            // three bigint[] id arrays plus the scalar graph id.
            let arg_oids = vec![
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID),
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID),
                PgOid::BuiltIn(PgBuiltInOids::INT8ARRAYOID),
                PgOid::BuiltIn(PgBuiltInOids::INT8OID),
            ];
            let prepared = client
                .prepare_mut(QUAD_INSERT_SQL, &arg_oids)
                .expect("flush_batch: prepare failed")
                .keep();
            plan_cache::insert(QUAD_INSERT_SQL.to_string(), prepared);
            plan_cache::record_miss();
        } else {
            plan_cache::record_hit();
        }

        // Build Datums for the cached plan. SAFETY: the (value, oid)
        // pairs match by construction (Vec<i64>/INT8ARRAYOID,
        // i64/INT8OID).
        let datums: Vec<DatumWithOid<'_>> = unsafe {
            vec![
                DatumWithOid::new(s_arr, int8_array_oid),
                DatumWithOid::new(p_arr, int8_array_oid),
                DatumWithOid::new(o_arr, int8_array_oid),
                DatumWithOid::new(graph_id, int8_oid),
            ]
        };

        plan_cache::with_plan(QUAD_INSERT_SQL, |maybe_owned| {
            // Must be Some: the branch above inserted the plan if it
            // was missing, and nothing evicts it in between.
            let owned = maybe_owned.expect("load_turtle: plan must be in cache after insert");
            client
                .update(owned, None, &datums)
                .expect("flush_batch: prepared insert failed");
        });
    });
    // Counted only after the SPI call returns without panicking.
    stats.quad_batches += 1;
}

/// Core ingest loop. Shared by load_turtle / parse_turtle and their
/// _verbose variants: parse, intern terms, buffer ids, flush in
/// BATCH_SIZE chunks, and time the whole thing.
fn ingest_turtle_with_stats<R: Read>(
    reader: R,
    graph_id: i64,
    base_iri: Option<&str>,
) -> LoaderStats {
    let mut parser = TurtleParser::new();
    if let Some(base) = base_iri {
        parser = parser
            .with_base_iri(base)
            .unwrap_or_else(|e| panic!("load_turtle: invalid base IRI {base:?}: {e}"));
    }

    let started = Instant::now();
    let mut stats = LoaderStats::default();
    let mut dict_cache: HashMap<DictKey, i64> = HashMap::new();
    // Parallel id buffers: index i across the three Vecs is one quad.
    let mut subjects: Vec<i64> = Vec::with_capacity(BATCH_SIZE);
    let mut predicates: Vec<i64> = Vec::with_capacity(BATCH_SIZE);
    let mut objects: Vec<i64> = Vec::with_capacity(BATCH_SIZE);

    for parsed in parser.for_reader(reader) {
        let triple = parsed.expect("load_turtle: turtle parse error");
        subjects.push(subject_to_id(&triple.subject, &mut dict_cache, &mut stats));
        predicates.push(intern_term(
            &mut dict_cache,
            &mut stats,
            triple.predicate.as_str(),
            term_type::URI,
            None,
            None,
        ));
        objects.push(object_to_id(&triple.object, &mut dict_cache, &mut stats));
        stats.triples += 1;
        if subjects.len() >= BATCH_SIZE {
            flush_batch(
                &mut subjects,
                &mut predicates,
                &mut objects,
                graph_id,
                &mut stats,
            );
        }
    }
    // Final partial batch; a no-op when the buffers are empty.
    flush_batch(
        &mut subjects,
        &mut predicates,
        &mut objects,
        graph_id,
        &mut stats,
    );
    stats.elapsed_ms = started.elapsed().as_secs_f64() * 1000.0;
    stats
}

/// Serialise loader statistics as the JSONB object returned by the
/// `_verbose` UDF variants.
fn stats_to_jsonb(stats: &LoaderStats) -> pgrx::JsonB {
    let body = json!({
        "triples":          stats.triples,
        "dict_cache_hits":  stats.dict_cache_hits,
        "shmem_cache_hits": stats.shmem_cache_hits,
        "dict_db_calls":    stats.dict_db_calls,
        "quad_batches":     stats.quad_batches,
        "elapsed_ms":       stats.elapsed_ms,
    });
    pgrx::JsonB(body)
}

// ─────────────────────────────────────────────────────────────────────
// UDF surface
// ─────────────────────────────────────────────────────────────────────

/// Load a Turtle file from a server-side path into the named graph
/// and return how many triples were inserted. `base_iri` resolves
/// relative IRIs; NULL or '' means the file uses absolute IRIs only.
///
/// SQL: `pgrdf.load_turtle(path TEXT, graph_id BIGINT, base_iri TEXT DEFAULT NULL) -> BIGINT`.
#[pg_extern]
fn load_turtle(path: &str, graph_id: i64, base_iri: default!(Option<&str>, "NULL")) -> i64 {
    // Treat an empty string the same as NULL: no base IRI.
    let base = base_iri.filter(|s| !s.is_empty());
    let source =
        File::open(path).unwrap_or_else(|e| panic!("load_turtle: failed to open {path:?}: {e}"));
    ingest_turtle_with_stats(BufReader::new(source), graph_id, base).triples
}

/// `load_turtle` with JSONB stats instead of a bare count: triples,
/// dict_cache_hits, shmem_cache_hits, dict_db_calls, quad_batches,
/// elapsed_ms — handy for checking that the cache + batching paths
/// are actually firing.
///
/// SQL: `pgrdf.load_turtle_verbose(path TEXT, graph_id BIGINT, base_iri TEXT DEFAULT NULL) -> JSONB`.
#[pg_extern]
fn load_turtle_verbose(
    path: &str,
    graph_id: i64,
    base_iri: default!(Option<&str>, "NULL"),
) -> pgrx::JsonB {
    // Treat an empty string the same as NULL: no base IRI.
    let base = base_iri.filter(|s| !s.is_empty());
    let source = File::open(path)
        .unwrap_or_else(|e| panic!("load_turtle_verbose: failed to open {path:?}: {e}"));
    stats_to_jsonb(&ingest_turtle_with_stats(BufReader::new(source), graph_id, base))
}

/// Parse Turtle from an in-memory string. Identical dict-caching and
/// batched-insert semantics to `load_turtle`, minus the file I/O.
///
/// SQL: `pgrdf.parse_turtle(content TEXT, graph_id BIGINT, base_iri TEXT DEFAULT NULL) -> BIGINT`.
#[pg_extern]
fn parse_turtle(content: &str, graph_id: i64, base_iri: default!(Option<&str>, "NULL")) -> i64 {
    // Treat an empty string the same as NULL: no base IRI.
    let stats =
        ingest_turtle_with_stats(content.as_bytes(), graph_id, base_iri.filter(|s| !s.is_empty()));
    stats.triples
}

/// Verbose variant of `parse_turtle`: same ingest, JSONB stats out.
#[pg_extern]
fn parse_turtle_verbose(
    content: &str,
    graph_id: i64,
    base_iri: default!(Option<&str>, "NULL"),
) -> pgrx::JsonB {
    // Treat an empty string the same as NULL: no base IRI.
    let base = base_iri.filter(|s| !s.is_empty());
    stats_to_jsonb(&ingest_turtle_with_stats(content.as_bytes(), graph_id, base))
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
    use pgrx::prelude::*;

    // Each test uses a distinct graph id (7_001, 7_002, ...) so
    // count/lookup assertions see only that test's inserts even
    // though the tests share one database.

    /// parse_turtle on a tiny FOAF graph reports the expected triple
    /// count and the dictionary contains the well-known IRIs.
    #[pg_test]
    fn parse_turtle_basic() {
        // Five triples:
        //   ex:alice rdf:type   foaf:Person
        //   ex:alice foaf:name  "Alice"
        //   ex:alice foaf:mbox  <mailto:alice@example.com>
        //   ex:alice foaf:knows ex:bob
        //   ex:bob   rdf:type   foaf:Person
        let ttl = r#"
            @prefix ex:   <http://example.com/> .
            @prefix foaf: <http://xmlns.com/foaf/0.1/> .
            ex:alice a foaf:Person ;
                     foaf:name "Alice" ;
                     foaf:mbox <mailto:alice@example.com> ;
                     foaf:knows ex:bob .
            ex:bob   a foaf:Person .
        "#;

        let n: i64 = Spi::get_one_with_args(
            "SELECT pgrdf.parse_turtle($1, $2)",
            &[ttl.into(), 7_001i64.into()],
        )
        .unwrap()
        .unwrap();
        assert_eq!(n, 5);

        // Cross-check the return value against the stored quads.
        let by_graph: i64 =
            Spi::get_one_with_args("SELECT pgrdf.count_quads($1)", &[7_001i64.into()])
                .unwrap()
                .unwrap();
        assert_eq!(by_graph, 5);

        // term_type = 1 is the URI type (term_type::URI).
        let person: Option<i64> = Spi::get_one(
            "SELECT (SELECT id FROM pgrdf._pgrdf_dictionary
                      WHERE term_type = 1
                        AND lexical_value = 'http://xmlns.com/foaf/0.1/Person')",
        )
        .unwrap();
        assert!(person.is_some());
    }

    /// Datatypes round-trip into the dictionary.
    #[pg_test]
    fn parse_turtle_typed_literal() {
        let ttl = r#"
            @prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
            @prefix ex:  <http://example.com/> .
            ex:n ex:age "42"^^xsd:integer .
        "#;
        let n: i64 = Spi::get_one_with_args(
            "SELECT pgrdf.parse_turtle($1, $2)",
            &[ttl.into(), 7_002i64.into()],
        )
        .unwrap()
        .unwrap();
        assert_eq!(n, 1);

        // The typed literal's datatype IRI gets its own URI row.
        let dt: Option<i64> = Spi::get_one(
            "SELECT (SELECT id FROM pgrdf._pgrdf_dictionary
                      WHERE term_type = 1
                        AND lexical_value = 'http://www.w3.org/2001/XMLSchema#integer')",
        )
        .unwrap();
        assert!(dt.is_some());
    }

    /// Cache fires on repeated subjects + predicates within a single
    /// ingest call. Three FOAF-shape triples share both subject and
    /// predicate, so after the first triple's three DB calls the
    /// other two should be entirely cached except for distinct objects.
    #[pg_test]
    fn parse_turtle_verbose_cache_fires() {
        let ttl = r#"
            @prefix ex:   <http://example.com/> .
            ex:s ex:p ex:o1 .
            ex:s ex:p ex:o2 .
            ex:s ex:p ex:o3 .
        "#;
        let j: pgrx::JsonB = Spi::get_one_with_args(
            "SELECT pgrdf.parse_turtle_verbose($1, $2)",
            &[ttl.into(), 7_003i64.into()],
        )
        .unwrap()
        .unwrap();
        let v = &j.0;
        assert_eq!(v["triples"], 3);
        // 3 triples × 3 terms each = 9 references; 5 distinct
        // (s, p, o1, o2, o3) -> 5 fall-throughs and 4 hashmap hits.
        // Of the 5 fall-throughs every shmem-vs-db split is allowed
        // (depends on prior tests in this postmaster), so only the
        // sum is invariant.
        assert_eq!(v["dict_cache_hits"], 4);
        let shmem_hits = v["shmem_cache_hits"].as_i64().unwrap();
        let db_calls = v["dict_db_calls"].as_i64().unwrap();
        assert_eq!(shmem_hits + db_calls, 5);
        // 3 triples < BATCH_SIZE, so exactly one flush.
        assert_eq!(v["quad_batches"], 1);
    }
}