use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::types::facts::FactRecord;
use anyhow::{Context, Result};
use redb::{Database, ReadableTable, TableDefinition};
use xxhash_rust::xxh3::Xxh3;
const FACTS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("facts");
pub fn fact_hash(subject: &str, predicate: &str, object: &str) -> u64 {
let mut h = Xxh3::new();
for part in [subject, predicate, object] {
h.update(&(part.len() as u64).to_le_bytes());
h.update(part.as_bytes());
}
h.digest()
}
pub fn new_fact(
subject: impl Into<String>,
predicate: impl Into<String>,
object: impl Into<String>,
index_id: impl Into<String>,
) -> FactRecord {
let subject = subject.into();
let predicate = predicate.into();
let object = object.into();
FactRecord {
id: fact_hash(&subject, &predicate, &object),
subject,
predicate,
object,
confidence: 1.0,
provenance: Vec::new(),
index_id: index_id.into(),
created_at: now_secs(),
}
}
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
#[derive(Clone)]
pub struct FactStore {
db: Arc<Database>,
}
impl FactStore {
pub fn open(path: &Path) -> Result<Self> {
let db = Database::create(path).context("open facts redb")?;
let txn = db.begin_write().context("begin facts init txn")?;
{
let _t = txn
.open_table(FACTS_TABLE)
.context("open facts table for init")?;
}
txn.commit().context("commit facts init txn")?;
Ok(Self { db: Arc::new(db) })
}
pub fn upsert(&self, mut fact: FactRecord) -> Result<()> {
fact.id = fact_hash(&fact.subject, &fact.predicate, &fact.object);
fact.confidence = fact.confidence.clamp(0.0, 1.0);
let txn = self.db.begin_write().context("begin upsert txn")?;
{
let mut table = txn
.open_table(FACTS_TABLE)
.context("open facts table for upsert")?;
let merged =
if let Some(existing_bytes) = table.get(fact.id).context("read existing fact")? {
let existing: FactRecord = serde_json::from_slice(existing_bytes.value())
.context("decode existing fact for merge")?;
let mut prov_set: HashSet<String> = existing.provenance.into_iter().collect();
for p in &fact.provenance {
prov_set.insert(p.clone());
}
let mut provenance: Vec<String> = prov_set.into_iter().collect();
provenance.sort();
FactRecord {
id: fact.id,
subject: fact.subject,
predicate: fact.predicate,
object: fact.object,
confidence: fact.confidence,
provenance,
index_id: fact.index_id,
created_at: existing.created_at,
}
} else {
fact
};
let bytes = serde_json::to_vec(&merged).context("encode fact")?;
table
.insert(merged.id, bytes.as_slice())
.context("insert fact")?;
}
txn.commit().context("commit upsert txn")?;
Ok(())
}
pub fn query(
&self,
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Result<Vec<FactRecord>> {
let txn = self.db.begin_read().context("begin query txn")?;
let table = txn
.open_table(FACTS_TABLE)
.context("open facts table for query")?;
let mut out = Vec::new();
for row in table.iter().context("iter facts")? {
let (_, v) = row.context("read fact row")?;
let fact: FactRecord =
serde_json::from_slice(v.value()).context("decode fact during query")?;
if let Some(s) = subject {
if fact.subject != s {
continue;
}
}
if let Some(p) = predicate {
if fact.predicate != p {
continue;
}
}
if let Some(o) = object {
if fact.object != o {
continue;
}
}
out.push(fact);
}
Ok(out)
}
pub fn all(&self) -> Result<Vec<FactRecord>> {
self.query(None, None, None)
}
pub fn delete(&self, id: u64) -> Result<bool> {
let txn = self.db.begin_write().context("begin delete txn")?;
let removed = {
let mut table = txn
.open_table(FACTS_TABLE)
.context("open facts table for delete")?;
let was_present = table.remove(id).context("delete fact")?.is_some();
was_present
};
txn.commit().context("commit delete txn")?;
Ok(removed)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn make_store() -> (FactStore, TempDir) {
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("facts.redb");
let store = FactStore::open(&path).expect("open facts store");
(store, tmp)
}
#[test]
fn upsert_and_query_by_subject() {
let (store, _tmp) = make_store();
let mut f = new_fact("fn search", "implements", "trait Searcher", "test");
f.confidence = 0.9;
f.provenance = vec!["src/indexer.rs:1:10".into()];
store.upsert(f).unwrap();
let hits = store.query(Some("fn search"), None, None).unwrap();
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].object, "trait Searcher");
}
#[test]
fn upsert_dedupes_and_merges_provenance() {
let (store, _tmp) = make_store();
let mut a = new_fact("X", "implements", "Y", "i1");
a.provenance = vec!["c1".into()];
store.upsert(a).unwrap();
let mut b = new_fact("X", "implements", "Y", "i1");
b.provenance = vec!["c2".into()];
store.upsert(b).unwrap();
let all = store.all().unwrap();
assert_eq!(all.len(), 1);
let mut prov = all[0].provenance.clone();
prov.sort();
assert_eq!(prov, vec!["c1".to_string(), "c2".to_string()]);
}
#[test]
fn confidence_clamps_on_upsert() {
let (store, _tmp) = make_store();
let mut f = new_fact("a", "b", "c", "i");
f.confidence = 2.5;
store.upsert(f).unwrap();
assert_eq!(store.all().unwrap()[0].confidence, 1.0);
}
#[test]
fn fact_hash_is_stable_and_unambiguous() {
let h1 = fact_hash("a", "b", "c");
let h2 = fact_hash("a", "b", "c");
assert_eq!(h1, h2);
assert_ne!(fact_hash("ab", "", "c"), fact_hash("a", "b", "c"));
assert_ne!(fact_hash("a", "b", "c"), fact_hash("c", "b", "a"));
}
#[test]
fn delete_returns_true_then_false() {
let (store, _tmp) = make_store();
let f = new_fact("X", "y", "Z", "i");
let id = f.id;
store.upsert(f).unwrap();
assert!(store.delete(id).unwrap());
assert!(!store.delete(id).unwrap());
}
}