use pgrx::prelude::*;
use pgrx::spi::OwnedPreparedStatement;
use pgrx::{pg_shmem_init, PgAtomic};
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
// Cumulative plan-cache counters, registered by `init_in_postmaster` via
// `pg_shmem_init!` and read back by `snapshot()`.
//
// SAFETY (all three): NOTE(review) — `PgAtomic::new` is an unsafe constructor;
// presumably its contract is that each name is unique among shared-memory
// objects. The three names used here are distinct from one another; confirm
// against the pgrx documentation and the rest of the crate.

/// Incremented by `record_hit`.
pub(crate) static HITS: PgAtomic<AtomicU64> = unsafe { PgAtomic::new(c"pgrdf_plan_cache_hits") };
/// Incremented by `record_miss`.
pub(crate) static MISSES: PgAtomic<AtomicU64> =
unsafe { PgAtomic::new(c"pgrdf_plan_cache_misses") };
/// Incremented by `insert` each time a plan is stored in the local cache.
pub(crate) static INSERTS: PgAtomic<AtomicU64> =
unsafe { PgAtomic::new(c"pgrdf_plan_cache_inserts") };
/// Registers the three plan-cache counters (`HITS`, `MISSES`, `INSERTS`)
/// with `pg_shmem_init!`.
///
/// NOTE(review): judging by the name, this is meant to be called while the
/// postmaster is loading the library (e.g. from `_PG_init`); confirm against
/// the call site.
pub fn init_in_postmaster() {
pg_shmem_init!(HITS);
pg_shmem_init!(MISSES);
pg_shmem_init!(INSERTS);
}
thread_local! {
    /// Per-thread cache of prepared statements, keyed by the SQL text they
    /// were prepared from. Starts out empty.
    static PLANS: RefCell<HashMap<String, OwnedPreparedStatement>> = RefCell::default();
}
/// Reports whether the local plan cache already holds an entry for `sql`.
///
/// The lookup is an exact match on the SQL text used as the key in `insert`.
pub fn contains(sql: &str) -> bool {
    PLANS.with_borrow(|plans| plans.contains_key(sql))
}
pub fn insert(sql: String, plan: OwnedPreparedStatement) {
PLANS.with(|c| {
c.borrow_mut().insert(sql, plan);
});
INSERTS.get().fetch_add(1, Ordering::Relaxed);
}
pub fn local_size() -> usize {
PLANS.with(|c| c.borrow().len())
}
pub fn record_hit() {
HITS.get().fetch_add(1, Ordering::Relaxed);
}
pub fn record_miss() {
MISSES.get().fetch_add(1, Ordering::Relaxed);
}
/// Looks up the cached plan for `sql` and hands it to `f`, returning whatever
/// `f` returns.
///
/// `f` receives `Some(&plan)` when the cache has an entry for `sql` and
/// `None` otherwise.
///
/// # Panics
///
/// `f` runs while the cache is immutably borrowed. Calling anything that
/// mutably borrows the cache from inside `f` (such as `insert` or
/// `plan_cache_clear`) panics with a `RefCell` borrow error.
pub fn with_plan<R>(sql: &str, f: impl FnOnce(Option<&OwnedPreparedStatement>) -> R) -> R {
    PLANS.with_borrow(|plans| {
        let cached = plans.get(sql);
        f(cached)
    })
}
/// Empties the local plan cache and returns how many plans were dropped.
///
/// The shared hit/miss/insert counters are left untouched.
#[pg_extern]
fn plan_cache_clear() -> i64 {
    // Draining removes and drops every entry; counting the drained items
    // yields the number of plans that were cached.
    let evicted: usize = PLANS.with_borrow_mut(|plans| plans.drain().count());
    evicted as i64
}
/// Point-in-time view of the plan-cache statistics, as produced by
/// `snapshot()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Snapshot {
    /// Value of the shared hit counter (0 when shared memory is not ready).
    pub hits: u64,
    /// Value of the shared miss counter (0 when shared memory is not ready).
    pub misses: u64,
    /// Value of the shared insert counter (0 when shared memory is not ready).
    pub inserts: u64,
    /// Number of plans held in the local cache when the snapshot was taken.
    pub local_size: usize,
}
pub fn snapshot() -> Snapshot {
let ready = crate::storage::shmem_cache::is_ready();
Snapshot {
hits: if ready {
HITS.get().load(Ordering::Relaxed)
} else {
0
},
misses: if ready {
MISSES.get().load(Ordering::Relaxed)
} else {
0
},
inserts: if ready {
INSERTS.get().load(Ordering::Relaxed)
} else {
0
},
local_size: local_size(),
}
}
#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
    use pgrx::prelude::*;

    /// Reads one integer field of `pgrdf.stats()` through SPI.
    ///
    /// Panics if the query fails or the field is NULL.
    fn stat(key: &str) -> i64 {
        Spi::get_one(&format!("SELECT (pgrdf.stats()->>'{key}')::bigint"))
            .unwrap()
            .unwrap()
    }

    /// Running the same SPARQL query twice must add exactly one cache slot
    /// (and one insert) on the first call and nothing on the second.
    #[pg_test]
    fn plan_cache_repeats_hit() {
        Spi::run("SELECT pgrdf.plan_cache_clear()").unwrap();
        Spi::run("SELECT pgrdf.add_graph(8100)").unwrap();
        Spi::run(
            "SELECT pgrdf.parse_turtle(
'@prefix ex: <http://example.com/> . ex:a ex:p ex:b .', 8100)",
        )
        .unwrap();

        let size_before = stat("plan_cache_local_size");
        let inserts_before = stat("plan_cache_inserts");

        // The query under test must actually succeed; a discarded error here
        // would let the "nothing changed" assertions below pass vacuously.
        Spi::run("SELECT count(*) FROM pgrdf.sparql('SELECT ?s WHERE { ?s ?p ?o }')")
            .expect("first SPARQL call failed");

        let size_after_first = stat("plan_cache_local_size");
        let inserts_after_first = stat("plan_cache_inserts");
        assert_eq!(
            size_after_first - size_before,
            1,
            "first SPARQL call must populate one new cache slot"
        );
        assert_eq!(
            inserts_after_first - inserts_before,
            1,
            "first call must bump the cumulative insert counter by 1"
        );

        Spi::run("SELECT count(*) FROM pgrdf.sparql('SELECT ?s WHERE { ?s ?p ?o }')")
            .expect("second SPARQL call failed");

        let size_after_second = stat("plan_cache_local_size");
        let inserts_after_second = stat("plan_cache_inserts");
        assert_eq!(
            size_after_second, size_after_first,
            "second identical call must NOT add a slot"
        );
        assert_eq!(
            inserts_after_second, inserts_after_first,
            "second call must not bump the cumulative insert counter"
        );
    }

    /// `plan_cache_clear()` must report how many plans it dropped and leave
    /// the local cache empty.
    #[pg_test]
    fn plan_cache_clear_returns_count() {
        Spi::run("SELECT pgrdf.plan_cache_clear()").unwrap();
        Spi::run("SELECT pgrdf.add_graph(8101)").unwrap();
        Spi::run(
            "SELECT pgrdf.parse_turtle(
'@prefix ex: <http://example.com/> . ex:a ex:p ex:b .', 8101)",
        )
        .unwrap();

        // Two differently shaped queries, so two distinct plans get cached.
        Spi::run("SELECT count(*) FROM pgrdf.sparql('SELECT ?s WHERE { ?s ?p ?o }')")
            .expect("first SPARQL call failed");
        Spi::run(
            "SELECT count(*) FROM pgrdf.sparql(
'SELECT ?s ?o WHERE { ?s ?p ?o }')",
        )
        .expect("second SPARQL call failed");

        let dropped: i64 = Spi::get_one("SELECT pgrdf.plan_cache_clear()")
            .unwrap()
            .unwrap();
        assert!(dropped >= 2, "should have at least two cached plans");

        let after = stat("plan_cache_local_size");
        assert_eq!(after, 0);
    }
}