use anyhow::Context;
use redb::{ReadableDatabase, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use super::CorpusStore;
// redb table "kg_contrib": the key is a graph's producer name and the value is
// that producer's `ContribGraph` serialized as JSON bytes.
const KG_CONTRIB_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("kg_contrib");
/// A single node of a contributed graph.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ContribNode {
/// Node identifier (the tests in this file use values such as `"dbo.usp_x"`).
pub id: String,
/// Node kind label (the tests in this file use `"proc"` and `"table"`).
pub kind: String,
}
/// A single edge of a contributed graph, running from `from` to `to`.
///
/// Field names are camelCase in the serialized form, so `linked_server` is
/// read and written as `linkedServer`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ContribEdge {
/// Start endpoint; presumably a `ContribNode::id` (the tests use node ids) — confirm with producers.
pub from: String,
/// End endpoint; presumably a `ContribNode::id` (the tests use node ids) — confirm with producers.
pub to: String,
/// Optional edge kind; `None` when the field is missing from the input.
#[serde(default)]
pub kind: Option<String>,
/// Optional tag; `None` when the field is missing from the input.
#[serde(default)]
pub tag: Option<String>,
/// Provenance entries; an empty list when the field is missing from the input.
/// NOTE(review): the tests store a file name (`"a.sql"`) here — confirm the intended content.
#[serde(default)]
pub provenance: Vec<String>,
/// Optional linked-server name; `None` when missing from the input and
/// left out of the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub linked_server: Option<String>,
}
/// The complete graph contributed by one producer.
///
/// Field names are camelCase in the serialized form (`producerVersion`,
/// `gitSha`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ContribGraph {
/// Name of the producer; `CorpusStore::save_contrib_graph` uses it as the storage key.
pub producer: String,
/// Optional producer version; left out of the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub producer_version: Option<String>,
/// Optional git SHA; left out of the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git_sha: Option<String>,
/// Nodes of the graph; an empty list when the field is missing from the input.
#[serde(default)]
pub nodes: Vec<ContribNode>,
/// Edges of the graph; an empty list when the field is missing from the input.
#[serde(default)]
pub edges: Vec<ContribEdge>,
}
impl CorpusStore {
    /// Stores `graph` in the `kg_contrib` table under the key `graph.producer`.
    ///
    /// The graph is serialized to JSON before the write transaction is opened.
    /// A row already stored for the same producer is overwritten.
    ///
    /// Returns `Ok(true)` when an existing row was replaced and `Ok(false)`
    /// when the producer had no row yet.
    ///
    /// # Errors
    ///
    /// Fails when serialization, beginning the transaction, opening the
    /// table, the insert, or the commit fails; each error carries a context
    /// message naming the failed step.
    pub fn save_contrib_graph(&self, graph: &ContribGraph) -> anyhow::Result<bool> {
        let payload = serde_json::to_vec(graph).context("serialize contrib graph")?;
        let write_txn = self.db.begin_write().context("begin contrib write txn")?;
        let mut contrib = write_txn
            .open_table(KG_CONTRIB_TABLE)
            .context("open kg_contrib table")?;
        let replaced = contrib
            .insert(graph.producer.as_str(), payload.as_slice())
            .context("insert contrib graph")?
            .is_some();
        // Release the table handle before `commit` consumes the transaction.
        drop(contrib);
        write_txn.commit().context("commit contrib write txn")?;
        Ok(replaced)
    }

    /// Loads every stored contrib graph, sorted by producer name.
    ///
    /// Returns an empty list when the `kg_contrib` table does not exist.
    ///
    /// # Errors
    ///
    /// Fails when the read transaction or table cannot be opened, when a row
    /// cannot be read, or when a stored value is not a valid JSON
    /// `ContribGraph`. The first failing row aborts the whole load.
    pub fn load_contrib_graphs(&self) -> anyhow::Result<Vec<ContribGraph>> {
        let read_txn = self.db.begin_read().context("begin contrib read txn")?;
        let contrib = match read_txn.open_table(KG_CONTRIB_TABLE) {
            // The table has not been created yet, so there is nothing to load.
            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
            opened => opened.context("open kg_contrib table")?,
        };
        let mut graphs = contrib
            .iter()
            .context("iterate kg_contrib")?
            .map(|row| -> anyhow::Result<ContribGraph> {
                let (_producer, raw) = row.context("read kg_contrib row")?;
                let graph =
                    serde_json::from_slice(raw.value()).context("deserialize contrib graph")?;
                Ok(graph)
            })
            .collect::<anyhow::Result<Vec<_>>>()?;
        graphs.sort_by(|lhs, rhs| lhs.producer.cmp(&rhs.producer));
        Ok(graphs)
    }

    /// Removes the graph stored for `producer`.
    ///
    /// Returns `Ok(true)` when a row was removed and `Ok(false)` when the
    /// producer had no row.
    ///
    /// # Errors
    ///
    /// Fails when beginning the transaction, opening the table, the removal,
    /// or the commit fails; each error carries a context message naming the
    /// failed step.
    pub fn delete_contrib_graph(&self, producer: &str) -> anyhow::Result<bool> {
        let write_txn = self
            .db
            .begin_write()
            .context("begin contrib delete txn")?;
        let mut contrib = write_txn
            .open_table(KG_CONTRIB_TABLE)
            .context("open kg_contrib table")?;
        let existed = contrib
            .remove(producer)
            .context("remove contrib graph")?
            .is_some();
        // Release the table handle before `commit` consumes the transaction.
        drop(contrib);
        write_txn.commit().context("commit contrib delete txn")?;
        Ok(existed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh corpus database inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned as well so callers can keep it alive
    /// for the duration of the test.
    fn store() -> (tempfile::TempDir, CorpusStore) {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = CorpusStore::open(&dir.path().join("corpus.redb")).expect("open corpus");
        (dir, store)
    }

    /// Builds a two-node, one-edge graph for `producer` whose single edge
    /// runs from `dbo.usp_x` to `edge_to`.
    fn sample(producer: &str, edge_to: &str) -> ContribGraph {
        ContribGraph {
            producer: producer.to_string(),
            producer_version: Some("0.1.0".into()),
            git_sha: Some("abc123".into()),
            nodes: vec![
                ContribNode {
                    id: "dbo.usp_x".into(),
                    kind: "proc".into(),
                },
                ContribNode {
                    id: edge_to.into(),
                    kind: "table".into(),
                },
            ],
            edges: vec![ContribEdge {
                from: "dbo.usp_x".into(),
                to: edge_to.into(),
                kind: Some("writes".into()),
                tag: Some("custom:writes_table".into()),
                provenance: vec!["a.sql".into()],
                linked_server: None,
            }],
        }
    }

    /// A saved graph is loaded back unchanged.
    #[test]
    fn contrib_round_trip() {
        let (_dir, store) = store();
        let g = sample("navigatsql", "dbo.orders");
        store.save_contrib_graph(&g).expect("save");
        let loaded = store.load_contrib_graphs().expect("load");
        assert_eq!(loaded, vec![g]);
    }

    /// Loading from a database that never had a graph saved yields no graphs.
    #[test]
    fn contrib_missing_table_is_empty() {
        let (_dir, store) = store();
        assert!(store.load_contrib_graphs().expect("load").is_empty());
    }

    /// Saving twice for one producer replaces the earlier graph entirely.
    #[test]
    fn contrib_replace_per_producer_drops_old_rows() {
        let (_dir, store) = store();
        let replaced = store
            .save_contrib_graph(&sample("navigatsql", "dbo.orders"))
            .expect("save v1");
        assert!(!replaced, "first save must not report a replacement");
        let v2 = sample("navigatsql", "dbo.customers");
        let replaced = store.save_contrib_graph(&v2).expect("save v2");
        assert!(replaced, "second save must report the replacement");
        let loaded = store.load_contrib_graphs().expect("load");
        assert_eq!(loaded, vec![v2]);
        assert!(!loaded[0].edges.iter().any(|e| e.to == "dbo.orders"));
    }

    /// Graphs from different producers come back sorted by producer, each
    /// with exactly the content that was saved for it.
    #[test]
    fn contrib_multi_producer_sorted_and_isolated() {
        let (_dir, store) = store();
        let zeta = sample("zeta", "dbo.t2");
        let alpha = sample("alpha", "dbo.t1");
        store.save_contrib_graph(&zeta).expect("save zeta");
        store.save_contrib_graph(&alpha).expect("save alpha");
        let loaded = store.load_contrib_graphs().expect("load");
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].producer, "alpha");
        assert_eq!(loaded[1].producer, "zeta");
        // Full equality proves isolation: neither graph picked up the other's rows.
        assert_eq!(loaded, vec![alpha, zeta]);
    }

    /// Deleting reports whether a row existed and leaves the store empty.
    #[test]
    fn contrib_delete_removes_producer() {
        let (_dir, store) = store();
        store
            .save_contrib_graph(&sample("navigatsql", "dbo.orders"))
            .expect("save");
        assert!(store.delete_contrib_graph("navigatsql").expect("delete"));
        assert!(!store.delete_contrib_graph("navigatsql").expect("re-delete"));
        assert!(store.load_contrib_graphs().expect("load").is_empty());
    }

    /// A saved graph is still fully intact after the store is closed and reopened.
    #[test]
    fn contrib_survives_reopen() {
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("corpus.redb");
        let saved = sample("navigatsql", "dbo.orders");
        {
            let store = CorpusStore::open(&path).expect("open");
            store.save_contrib_graph(&saved).expect("save");
        }
        let store = CorpusStore::open(&path).expect("reopen");
        let loaded = store.load_contrib_graphs().expect("load");
        // Compare the whole graph, not just the producer, so partial loss is caught.
        assert_eq!(loaded, vec![saved]);
    }
}