use crate::memory_core::palace::Drawer;
use crate::memory_core::store::kg_redb::KgStoreRedb;
use crate::memory_core::store::kg_writer::KgWriter;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use petgraph::algo::{astar, dijkstra};
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
use petgraph::visit::EdgeRef;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashSet, VecDeque};
use std::path::Path;
use std::sync::{Arc, RwLock};
use uuid::Uuid;
/// Edge payload stored in the in-memory adjacency graph.
///
/// Built from a [`Triple`] by `Adjacency::edge_from_triple`: it carries every
/// triple field except the subject and object, which are represented by the
/// graph nodes the edge connects.
#[derive(Debug, Clone)]
pub struct KgEdge {
/// Relation name; `Adjacency` matches on it when replacing or removing edges.
pub predicate: String,
/// Confidence value copied from the source triple (no range is enforced here).
pub confidence: f32,
/// Provenance string copied verbatim from the source triple, if present.
pub provenance: Option<String>,
/// Start of the validity interval, copied from the source triple.
pub valid_from: DateTime<Utc>,
/// End of the validity interval, copied from the source triple, if present.
pub valid_to: Option<DateTime<Utc>>,
}
/// In-memory directed graph of entities plus a name-to-node lookup table.
///
/// `Default` yields an empty graph and an empty index.
#[derive(Default)]
struct Adjacency {
/// Node weights are entity names; edge weights are [`KgEdge`] payloads.
graph: StableGraph<String, KgEdge>,
/// Maps an entity name to its node in `graph`; filled by `ensure_node`.
node_index: HashMap<String, NodeIndex<u32>>,
}
impl Adjacency {
fn ensure_node(&mut self, entity: &str) -> NodeIndex<u32> {
if let Some(idx) = self.node_index.get(entity) {
return *idx;
}
let idx = self.graph.add_node(entity.to_string());
self.node_index.insert(entity.to_string(), idx);
idx
}
fn edge_from_triple(t: &Triple) -> KgEdge {
KgEdge {
predicate: t.predicate.clone(),
confidence: t.confidence,
provenance: t.provenance.clone(),
valid_from: t.valid_from,
valid_to: t.valid_to,
}
}
fn upsert_edge(&mut self, triple: &Triple) {
let s_idx = self.ensure_node(&triple.subject);
let o_idx = self.ensure_node(&triple.object);
let to_remove: Vec<_> = self
.graph
.edges(s_idx)
.filter(|e| e.weight().predicate == triple.predicate)
.map(|e| e.id())
.collect();
for eid in to_remove {
self.graph.remove_edge(eid);
}
self.graph
.add_edge(s_idx, o_idx, Self::edge_from_triple(triple));
}
fn remove_edges(&mut self, subject: &str, predicate: &str) -> usize {
let Some(&s_idx) = self.node_index.get(subject) else {
return 0;
};
let to_remove: Vec<_> = self
.graph
.edges(s_idx)
.filter(|e| e.weight().predicate == predicate)
.map(|e| e.id())
.collect();
let n = to_remove.len();
for eid in to_remove {
self.graph.remove_edge(eid);
}
n
}
}
/// A subject–predicate–object fact with a validity interval.
///
/// Serializable via serde; this is the value passed to
/// `KnowledgeGraph::assert` and returned by the query methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Triple {
/// Entity the fact is about; becomes the source node in the adjacency graph.
pub subject: String,
/// Relation name.
pub predicate: String,
/// Entity the fact points at; becomes the target node in the adjacency graph.
pub object: String,
/// Start of the validity interval.
pub valid_from: DateTime<Utc>,
/// End of the validity interval. `KnowledgeGraph::assert` keeps an edge in
/// the adjacency graph only when this is `None`.
pub valid_to: Option<DateTime<Utc>>,
/// Confidence value (no range is enforced in this file).
pub confidence: f32,
/// Optional provenance string.
pub provenance: Option<String>,
}
/// Handle to the knowledge graph: a redb-backed store, a writer in front of
/// it, and an in-memory adjacency graph used for traversal queries.
///
/// Cloning shares the adjacency graph, since it lives behind an `Arc`.
#[derive(Clone)]
pub struct KnowledgeGraph {
/// Persistent store; read paths in this file call it directly.
store: KgStoreRedb,
/// Write path used by `assert`, `retract` and the async drawer mutations.
writer: KgWriter,
/// Adjacency graph, updated after each successful `assert` / `retract`.
adj: Arc<RwLock<Adjacency>>,
}
/// Maps a caller-supplied path to the redb file path.
///
/// A path whose extension is exactly `redb` is returned unchanged; any other
/// path has its extension replaced by (or gains) `.redb`.
fn redb_path_for(input: &Path) -> std::path::PathBuf {
    let already_redb = input.extension().and_then(|ext| ext.to_str()) == Some("redb");
    if already_redb {
        input.to_path_buf()
    } else {
        input.with_extension("redb")
    }
}
/// One-shot import of a legacy SQLite knowledge graph into the redb store.
///
/// Looks for `kg.db` inside `data_dir`. Nothing happens when that file is
/// missing or when `kg.db.migrated` already exists. Otherwise all triples and
/// drawers are read from the SQLite file, imported into `redb_store`, and the
/// SQLite file is renamed to `kg.db.migrated` so the import is not repeated.
///
/// # Errors
/// Fails if the legacy database cannot be opened or read, if the import into
/// redb fails, or if the final rename fails.
#[cfg(feature = "sqlite-kg")]
fn migrate_from_sqlite_if_needed(data_dir: &Path, redb_store: &KgStoreRedb) -> Result<()> {
    use crate::memory_core::store::kg_sqlite::KnowledgeGraphSqlite;

    let legacy_db = data_dir.join("kg.db");
    let done_marker = data_dir.join("kg.db.migrated");
    // The marker is only probed when the legacy file is actually present.
    if !legacy_db.exists() || done_marker.exists() {
        return Ok(());
    }

    let legacy_kg = KnowledgeGraphSqlite::open_readonly(&legacy_db)
        .with_context(|| format!("open legacy sqlite kg at {}", legacy_db.display()))?;
    let legacy_triples = legacy_kg
        .dump_all_triples()
        .context("dump triples from legacy sqlite kg")?;
    let legacy_drawers = legacy_kg
        .load_drawers()
        .context("load drawers from legacy sqlite kg")?;

    // Both vectors are moved into the import, so capture their sizes first.
    let (triple_total, drawer_total) = (legacy_triples.len(), legacy_drawers.len());
    redb_store
        .import_all(legacy_triples, legacy_drawers)
        .context("import legacy sqlite data into redb")?;

    // Release the SQLite handle before renaming the file it has open.
    drop(legacy_kg);
    std::fs::rename(&legacy_db, &done_marker).with_context(|| {
        format!(
            "rename {} to {}",
            legacy_db.display(),
            done_marker.display()
        )
    })?;

    tracing::info!(
        "Migrated {} triples and {} drawers from SQLite to redb at {}",
        triple_total,
        drawer_total,
        data_dir.display()
    );
    Ok(())
}
/// No-op stand-in used when the `sqlite-kg` feature is disabled: both
/// arguments are ignored and the call always succeeds.
#[cfg(not(feature = "sqlite-kg"))]
fn migrate_from_sqlite_if_needed(_data_dir: &Path, _redb_store: &KgStoreRedb) -> Result<()> {
Ok(())
}
/// Builds a fresh adjacency graph from every active triple in `store`.
///
/// # Errors
/// Fails if the store cannot list its active triples.
fn hydrate_adjacency(store: &KgStoreRedb) -> Result<Adjacency> {
    // `usize::MAX` with offset 0 asks the store for the full active set.
    let active = store
        .list_active(usize::MAX, 0)
        .context("list active triples for adjacency hydration")?;
    let mut graph = Adjacency::default();
    active.iter().for_each(|triple| graph.upsert_edge(triple));
    Ok(graph)
}
impl KnowledgeGraph {
/// Opens the knowledge graph for `path`.
///
/// The path is first normalised by `redb_path_for`. After the store is
/// opened, a legacy SQLite migration is attempted in the store file's parent
/// directory (when it has one), and the in-memory adjacency graph is rebuilt
/// from the store's active triples.
///
/// The writer is spawned only when the store is writable and a Tokio runtime
/// is current on this thread; otherwise the bypass writer is used.
///
/// # Errors
/// Fails if the store cannot be opened, the migration fails, or adjacency
/// hydration fails.
pub fn open(path: &Path) -> Result<Self> {
    let db_file = redb_path_for(path);
    let store = KgStoreRedb::open(&db_file)
        .with_context(|| format!("open KG redb at {}", db_file.display()))?;

    if let Some(parent_dir) = db_file.parent() {
        migrate_from_sqlite_if_needed(parent_dir, &store).with_context(|| {
            format!("migrate legacy SQLite KG at {}", parent_dir.display())
        })?;
    }

    let adjacency = hydrate_adjacency(&store)
        .with_context(|| format!("hydrate KG adjacency from {}", db_file.display()))?;

    let shared_store = Arc::new(store.clone());
    // The runtime probe is skipped entirely for a read-only store.
    let can_spawn = !store.is_read_only() && tokio::runtime::Handle::try_current().is_ok();
    let writer = if can_spawn {
        KgWriter::spawn(shared_store)
    } else {
        KgWriter::bypass(shared_store)
    };

    Ok(Self {
        store,
        writer,
        adj: Arc::new(RwLock::new(adjacency)),
    })
}
/// Persists `triple` through the writer, then mirrors it into the adjacency
/// graph.
///
/// A triple with `valid_to` set removes the subject's edges for that
/// predicate; a triple without it replaces them with the new edge.
///
/// # Errors
/// Fails if the writer rejects the triple or the adjacency lock is poisoned.
/// The adjacency graph is untouched when the write fails.
pub async fn assert(&self, triple: Triple) -> Result<()> {
    self.writer.assert(triple.clone()).await?;
    {
        // Scoped so the guard is released before the function returns and is
        // never alive across an await point.
        let mut graph = self
            .adj
            .write()
            .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
        match triple.valid_to {
            Some(_) => {
                graph.remove_edges(&triple.subject, &triple.predicate);
            }
            None => graph.upsert_edge(&triple),
        }
    }
    Ok(())
}
/// Retracts the facts for (`subject`, `predicate`) through the writer and
/// returns the count the writer reports.
///
/// The adjacency graph is only touched when that count is non-zero, in which
/// case the subject's edges with this predicate are removed.
///
/// # Errors
/// Fails if the writer fails or the adjacency lock is poisoned.
pub async fn retract(&self, subject: &str, predicate: &str) -> Result<usize> {
    let closed = self
        .writer
        .retract(subject.to_string(), predicate.to_string())
        .await?;
    if closed == 0 {
        return Ok(closed);
    }
    // The write guard is a temporary and is released at the end of this statement.
    self.adj
        .write()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?
        .remove_edges(subject, predicate);
    Ok(closed)
}
pub fn neighbors(&self, entity: &str) -> Result<Vec<(String, KgEdge)>> {
let adj = self
.adj
.read()
.map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
let Some(&idx) = adj.node_index.get(entity) else {
return Ok(Vec::new());
};
let mut out = Vec::new();
for e in adj.graph.edges(idx) {
let other = adj
.graph
.node_weight(e.target())
.cloned()
.unwrap_or_default();
out.push((other, e.weight().clone()));
}
for e in adj.graph.edges_directed(idx, petgraph::Direction::Incoming) {
let other = adj
.graph
.node_weight(e.source())
.cloned()
.unwrap_or_default();
out.push((other, e.weight().clone()));
}
Ok(out)
}
/// Finds a fewest-hops directed path from `from` to `to`.
///
/// Returns `Ok(None)` when either entity is unknown or no path exists, and a
/// single-element path when both names resolve to the same node. Every edge
/// costs one hop; edge confidence is not taken into account.
///
/// # Errors
/// Fails if the adjacency lock is poisoned.
pub fn shortest_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
    let adj = self
        .adj
        .read()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
    let (Some(&origin), Some(&goal)) = (adj.node_index.get(from), adj.node_index.get(to))
    else {
        return Ok(None);
    };
    if origin == goal {
        return Ok(Some(vec![from.to_string()]));
    }

    let hops = dijkstra(&adj.graph, origin, Some(goal), |_| 1usize);
    let Some(&total) = hops.get(&goal) else {
        return Ok(None);
    };

    // `dijkstra` only yields distances, so walk backwards from the goal: at
    // each step take the first incoming neighbour that sits exactly one hop
    // closer to the origin.
    let mut trail = vec![goal];
    let mut cursor = goal;
    for remaining in (1..=total).rev() {
        let predecessor = adj
            .graph
            .edges_directed(cursor, petgraph::Direction::Incoming)
            .map(|edge| edge.source())
            .find(|candidate| matches!(hops.get(candidate), Some(&d) if d + 1 == remaining));
        let Some(prev) = predecessor else {
            return Ok(None);
        };
        trail.push(prev);
        cursor = prev;
    }
    trail.reverse();

    let names: Vec<String> = trail
        .into_iter()
        .filter_map(|ni| adj.graph.node_weight(ni).cloned())
        .collect();
    Ok(Some(names))
}
/// Lists the entities reachable from `entity` by following at most
/// `max_hops` outgoing edges, in breadth-first discovery order.
///
/// The starting entity itself is never included. `max_hops == 0` or an
/// unknown `entity` yields an empty list.
///
/// # Errors
/// Fails if the adjacency lock is poisoned.
pub fn reachable(&self, entity: &str, max_hops: usize) -> Result<Vec<String>> {
    if max_hops == 0 {
        return Ok(Vec::new());
    }
    let adj = self
        .adj
        .read()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
    let Some(&origin) = adj.node_index.get(entity) else {
        return Ok(Vec::new());
    };

    // Seeding `seen` with the origin keeps cycles from reporting it.
    let mut seen: HashSet<NodeIndex<u32>> = HashSet::from([origin]);
    let mut queue: VecDeque<(NodeIndex<u32>, usize)> = VecDeque::from([(origin, 0)]);
    let mut names: Vec<String> = Vec::new();
    while let Some((node, depth)) = queue.pop_front() {
        // Nodes at the hop limit are reported but not expanded further.
        if depth == max_hops {
            continue;
        }
        for next in adj.graph.edges(node).map(|edge| edge.target()) {
            if !seen.insert(next) {
                continue;
            }
            names.extend(adj.graph.node_weight(next).cloned());
            queue.push_back((next, depth + 1));
        }
    }
    Ok(names)
}
/// Lists the edges pointing at `entity`, each paired with the name of the
/// entity it originates from.
///
/// An unknown `entity` yields an empty list.
///
/// # Errors
/// Fails if the adjacency lock is poisoned.
pub fn incoming(&self, entity: &str) -> Result<Vec<(String, KgEdge)>> {
    let adj = self
        .adj
        .read()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
    let Some(&target) = adj.node_index.get(entity) else {
        return Ok(Vec::new());
    };
    let sources = adj
        .graph
        .edges_directed(target, petgraph::Direction::Incoming)
        .map(|edge| {
            // A missing node weight falls back to an empty name.
            let origin = adj
                .graph
                .node_weight(edge.source())
                .cloned()
                .unwrap_or_default();
            (origin, edge.weight().clone())
        })
        .collect();
    Ok(sources)
}
/// Counts the connected components of the adjacency graph, ignoring edge
/// direction. A node without any edge counts as its own component.
///
/// # Errors
/// Fails if the adjacency lock is poisoned.
pub fn connected_components(&self) -> Result<usize> {
    let adj = self
        .adj
        .read()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
    let mut seen: HashSet<NodeIndex<u32>> = HashSet::new();
    let mut components = 0usize;
    for root in adj.graph.node_indices() {
        // A node already reached from an earlier root belongs to that
        // root's component.
        if !seen.insert(root) {
            continue;
        }
        components += 1;
        let mut queue: VecDeque<NodeIndex<u32>> = VecDeque::from([root]);
        while let Some(node) = queue.pop_front() {
            let forward = adj.graph.edges(node).map(|edge| edge.target());
            let backward = adj
                .graph
                .edges_directed(node, petgraph::Direction::Incoming)
                .map(|edge| edge.source());
            for other in forward.chain(backward) {
                if seen.insert(other) {
                    queue.push_back(other);
                }
            }
        }
    }
    Ok(components)
}
/// Finds a fewest-hops directed path from `from` to `to` using petgraph's
/// `astar` with unit edge costs and a zero heuristic.
///
/// Returns `Ok(None)` when either entity is unknown or no path exists.
///
/// # Errors
/// Fails if the adjacency lock is poisoned.
pub fn astar_path(&self, from: &str, to: &str) -> Result<Option<Vec<String>>> {
    let adj = self
        .adj
        .read()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;
    let (Some(&origin), Some(&goal)) = (adj.node_index.get(from), adj.node_index.get(to))
    else {
        return Ok(None);
    };
    let found = astar(
        &adj.graph,
        origin,
        |node| node == goal,
        |_| 1usize,
        |_| 0usize,
    );
    // Only the node sequence is needed; the total cost is discarded.
    Ok(found.map(|(_cost, nodes)| {
        nodes
            .into_iter()
            .filter_map(|ni| adj.graph.node_weight(ni).cloned())
            .collect::<Vec<String>>()
    }))
}
/// Returns the store's active triples for `subject`.
///
/// The store call runs on Tokio's blocking pool via `spawn_blocking`, so it
/// does not run on the async task itself.
///
/// # Errors
/// Fails if the blocking task cannot be joined or the store query fails.
pub async fn query_active(&self, subject: &str) -> Result<Vec<Triple>> {
// Owned copies are required because the closure is moved to another thread.
let store = self.store.clone();
let subject = subject.to_string();
// First `?` unwraps the join result, second `?` unwraps the store result.
let triples = tokio::task::spawn_blocking(move || store.query_active(&subject))
.await
.context("query_active spawn_blocking join error")??;
Ok(triples)
}
/// Forwards to the store's `list_subjects` with the given `limit` and returns
/// its result unchanged.
pub fn list_subjects(&self, limit: usize) -> Result<Vec<String>> {
self.store.list_subjects(limit)
}
/// Forwards to the store's `list_subjects_with_counts` with the given `limit`
/// and returns its `(subject, count)` pairs unchanged.
pub fn list_subjects_with_counts(&self, limit: usize) -> Result<Vec<(String, u64)>> {
self.store.list_subjects_with_counts(limit)
}
/// Returns a window of the store's active triples, passing `limit` and
/// `offset` straight through to the store.
///
/// The store call runs on Tokio's blocking pool via `spawn_blocking`.
///
/// # Errors
/// Fails if the blocking task cannot be joined or the store query fails.
pub async fn list_active(&self, limit: usize, offset: usize) -> Result<Vec<Triple>> {
// An owned store handle is required because the closure is moved to another thread.
let store = self.store.clone();
// First `?` unwraps the join result, second `?` unwraps the store result.
let triples = tokio::task::spawn_blocking(move || store.list_active(limit, offset))
.await
.context("list_active spawn_blocking join error")??;
Ok(triples)
}
/// Returns the store's active-triple count as a `usize`.
///
/// If the store's count does not fit in `usize`, `usize::MAX` is returned
/// instead of failing.
pub fn count_active_triples(&self) -> usize {
let n = self.store.count_active_triples();
usize::try_from(n).unwrap_or(usize::MAX)
}
/// Number of nodes currently in the in-memory adjacency graph.
///
/// Reports 0 when the adjacency lock is poisoned rather than failing.
pub fn node_count(&self) -> usize {
    self.adj
        .read()
        .map(|adj| adj.graph.node_count())
        .unwrap_or(0)
}
/// Number of edges currently in the in-memory adjacency graph.
///
/// Reports 0 when the adjacency lock is poisoned rather than failing.
pub fn edge_count(&self) -> usize {
    self.adj
        .read()
        .map(|adj| adj.graph.edge_count())
        .unwrap_or(0)
}
/// Counts the non-empty entries produced by
/// `crate::memory_core::community::partition` for this graph.
pub fn community_count(&self) -> usize {
crate::memory_core::community::partition(self)
.iter()
// Empty entries are not counted as communities.
.filter(|c| !c.is_empty())
.count()
}
/// Asks the store to checkpoint and, on success, returns the fixed pair
/// `(0, 0)`.
///
/// NOTE(review): the tuple carries no information from the store; it looks
/// like a shape kept for callers of an earlier backend — confirm.
///
/// # Errors
/// Fails if the store's checkpoint fails.
pub fn checkpoint(&self) -> Result<(i64, i64)> {
self.store.checkpoint()?;
Ok((0, 0))
}
/// Sends a clone of `drawer` to the writer's `upsert_drawer` and returns its
/// result.
pub async fn upsert_drawer(&self, drawer: &Drawer) -> Result<()> {
self.writer.upsert_drawer(drawer.clone()).await
}
/// Deletes the drawer with the given `id` through the writer and returns its
/// result.
pub async fn delete_drawer(&self, id: Uuid) -> Result<()> {
self.writer.delete_drawer(id).await
}
/// Synchronous drawer deletion: calls the store's `delete_drawer` directly
/// instead of going through the writer as `delete_drawer` does.
pub fn delete_drawer_sync(&self, id: Uuid) -> Result<()> {
self.store.delete_drawer(id)
}
/// Forwards to the store's `load_drawer_ids` and returns its set of drawer
/// ids unchanged.
pub fn load_drawer_ids(&self) -> Result<std::collections::HashSet<Uuid>> {
self.store.load_drawer_ids()
}
/// Forwards to the store's `load_drawers` and returns its drawers unchanged.
pub fn load_drawers(&self) -> Result<Vec<Drawer>> {
self.store.load_drawers()
}
/// Takes an undirected, de-duplicated snapshot of the adjacency graph.
///
/// Returns the node names in graph iteration order, plus a list of
/// `(smaller, larger)` pairs of positions into that name list. Self-loops are
/// dropped, and parallel or opposite-direction edges between the same two
/// nodes collapse into one pair.
///
/// # Errors
/// Fails if the adjacency lock is poisoned.
pub(crate) fn snapshot_undirected(&self) -> Result<(Vec<String>, Vec<(usize, usize)>)> {
    let adj = self
        .adj
        .read()
        .map_err(|_| anyhow::anyhow!("kg adjacency lock poisoned"))?;

    // Graph node indices are remapped onto consecutive positions in
    // `node_names`.
    let mut dense_of: HashMap<NodeIndex<u32>, usize> = HashMap::new();
    let mut node_names: Vec<String> = Vec::new();
    for (dense, ni) in adj.graph.node_indices().enumerate() {
        dense_of.insert(ni, dense);
        node_names.push(adj.graph.node_weight(ni).cloned().unwrap_or_default());
    }

    let mut emitted: HashSet<(usize, usize)> = HashSet::new();
    let mut edges: Vec<(usize, usize)> = Vec::new();
    for ni in adj.graph.node_indices() {
        let Some(&u) = dense_of.get(&ni) else {
            continue;
        };
        for edge in adj.graph.edges(ni) {
            let Some(&v) = dense_of.get(&edge.target()) else {
                continue;
            };
            if u == v {
                continue;
            }
            // Normalise the pair so both directions map to the same key.
            let key = (u.min(v), u.max(v));
            if emitted.insert(key) {
                edges.push(key);
            }
        }
    }
    Ok((node_names, edges))
}
/// Returns whatever `crate::memory_core::community::find_communities`
/// produces for this graph.
pub fn knowledge_gaps(&self) -> Vec<crate::memory_core::community::KnowledgeGap> {
crate::memory_core::community::find_communities(self)
}
/// Reports whether the underlying store says it is read-only.
pub fn is_read_only(&self) -> bool {
self.store.is_read_only()
}
/// Forwards to the store's `dump_all_triples` and returns its result
/// unchanged.
pub fn dump_all_triples(&self) -> Result<Vec<Triple>> {
self.store.dump_all_triples()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::{tempdir, TempDir};

    /// Opens a graph at `kg.db` inside a brand-new temporary directory.
    ///
    /// The `TempDir` is handed back so the directory lives as long as the
    /// test keeps the binding around.
    fn fresh_kg() -> (TempDir, KnowledgeGraph) {
        let dir = tempdir().unwrap();
        let kg = KnowledgeGraph::open(&dir.path().join("kg.db")).unwrap();
        (dir, kg)
    }

    /// Builds an open-ended triple stamped with the current time, full
    /// confidence and no provenance.
    fn fact(subject: &str, predicate: &str, object: &str) -> Triple {
        Triple {
            subject: subject.to_string(),
            predicate: predicate.to_string(),
            object: object.to_string(),
            valid_from: Utc::now(),
            valid_to: None,
            confidence: 1.0,
            provenance: None,
        }
    }

    #[tokio::test]
    async fn open_creates_schema() {
        let (_dir, kg) = fresh_kg();
        let result = kg.query_active("nonexistent").await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn assert_then_query_active_returns_fact() {
        let (_dir, kg) = fresh_kg();
        kg.assert(fact("alice", "works_at", "Acme Corp"))
            .await
            .unwrap();
        let active = kg.query_active("alice").await.unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].object, "Acme Corp");
    }

    #[tokio::test]
    async fn retract_closes_active_interval() {
        let (_dir, kg) = fresh_kg();
        kg.assert(fact("tga", "is_alias_for", "trusty-git-analytics"))
            .await
            .unwrap();
        assert_eq!(kg.query_active("tga").await.unwrap().len(), 1);
        let closed = kg.retract("tga", "is_alias_for").await.unwrap();
        assert_eq!(closed, 1, "should close exactly one active row");
        assert!(
            kg.query_active("tga").await.unwrap().is_empty(),
            "retract must drop the active triple"
        );
        // A second retract has nothing left to close.
        let again = kg.retract("tga", "is_alias_for").await.unwrap();
        assert_eq!(again, 0);
    }

    #[tokio::test]
    async fn second_assert_closes_prior_interval() {
        let (_dir, kg) = fresh_kg();
        kg.assert(fact("alice", "works_at", "Acme Corp"))
            .await
            .unwrap();
        kg.assert(fact("alice", "works_at", "Beta Inc"))
            .await
            .unwrap();
        let active = kg.query_active("alice").await.unwrap();
        assert_eq!(active.len(), 1, "should have exactly 1 active triple");
        assert_eq!(active[0].object, "Beta Inc");
    }

    #[tokio::test]
    async fn upsert_drawer_then_load_drawers_round_trips() {
        let (_dir, kg) = fresh_kg();
        let room_id = Uuid::new_v4();
        let mut d = Drawer::new(room_id, "the cold-start drawer");
        d.importance = 0.83;
        d.tags = vec!["alpha".into(), "beta".into()];
        d.source_file = Some(PathBuf::from("/tmp/source.md"));
        kg.upsert_drawer(&d).await.unwrap();
        let loaded = kg.load_drawers().unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, d.id);
        assert_eq!(loaded[0].room_id, room_id);
        assert_eq!(loaded[0].content, "the cold-start drawer");
        assert!((loaded[0].importance - 0.83).abs() < 1e-5);
        assert_eq!(loaded[0].tags, vec!["alpha".to_string(), "beta".into()]);
        assert_eq!(loaded[0].source_file, Some(PathBuf::from("/tmp/source.md")));
    }

    #[tokio::test]
    async fn load_drawer_ids_matches_load_drawers() {
        let (_dir, kg) = fresh_kg();
        let room = Uuid::new_v4();
        let d1 = Drawer::new(room, "one");
        let d2 = Drawer::new(room, "two");
        kg.upsert_drawer(&d1).await.unwrap();
        kg.upsert_drawer(&d2).await.unwrap();
        let ids = kg.load_drawer_ids().unwrap();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&d1.id));
        assert!(ids.contains(&d2.id));
    }

    #[tokio::test]
    async fn delete_drawer_removes_row() {
        let (_dir, kg) = fresh_kg();
        let d = Drawer::new(Uuid::new_v4(), "to be deleted");
        kg.upsert_drawer(&d).await.unwrap();
        kg.delete_drawer(d.id).await.unwrap();
        let loaded = kg.load_drawers().unwrap();
        assert!(loaded.is_empty());
    }

    #[tokio::test]
    async fn upsert_drawer_replaces_existing_row() {
        let (_dir, kg) = fresh_kg();
        let mut d = Drawer::new(Uuid::new_v4(), "original");
        kg.upsert_drawer(&d).await.unwrap();
        d.content = "updated".into();
        d.importance = 0.95;
        kg.upsert_drawer(&d).await.unwrap();
        let loaded = kg.load_drawers().unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].content, "updated");
        assert!((loaded[0].importance - 0.95).abs() < 1e-5);
    }

    #[tokio::test]
    async fn count_active_triples_returns_live_only() {
        let (_dir, kg) = fresh_kg();
        assert_eq!(kg.count_active_triples(), 0);
        kg.assert(fact("alice", "works_at", "Acme")).await.unwrap();
        assert_eq!(kg.count_active_triples(), 1);
        // Re-asserting the same subject/predicate must not grow the count.
        kg.assert(fact("alice", "works_at", "Beta")).await.unwrap();
        assert_eq!(kg.count_active_triples(), 1);
    }

    #[tokio::test]
    async fn wal_checkpoint_returns_pages() {
        let (_dir, kg) = fresh_kg();
        kg.assert(fact("s", "p", "o")).await.unwrap();
        let (wal, done) = kg.checkpoint().expect("checkpoint should succeed");
        assert!(wal >= 0);
        assert!(done >= 0);
    }

    #[tokio::test]
    async fn list_subjects_returns_distinct_active_subjects() {
        let (_dir, kg) = fresh_kg();
        assert!(kg.list_subjects(50).unwrap().is_empty());
        for (subject, object) in [("bob", "alice"), ("alice", "bob"), ("alice", "carol")] {
            kg.assert(fact(subject, "knows", object)).await.unwrap();
        }
        let subjects = kg.list_subjects(50).unwrap();
        assert_eq!(subjects, vec!["alice".to_string(), "bob".to_string()]);
    }

    #[tokio::test]
    async fn list_subjects_with_counts_returns_grouped_counts() {
        let (_dir, kg) = fresh_kg();
        assert!(kg.list_subjects_with_counts(50).unwrap().is_empty());
        for (subj, pred) in [
            ("alice", "knows"),
            ("alice", "likes"),
            ("alice", "owns"),
            ("bob", "knows"),
        ] {
            kg.assert(fact(subj, pred, "thing")).await.unwrap();
        }
        let rows = kg.list_subjects_with_counts(50).unwrap();
        assert_eq!(rows, vec![("alice".to_string(), 3), ("bob".to_string(), 1)]);
    }

    #[tokio::test]
    async fn list_active_returns_ordered_window() {
        let (_dir, kg) = fresh_kg();
        for i in 0..3 {
            // Stagger the start times so the three triples have distinct,
            // increasing `valid_from` values.
            let staggered = Triple {
                valid_from: Utc::now() + chrono::Duration::milliseconds(i * 10),
                ..fact(&format!("subj-{i}"), "rel", &format!("obj-{i}"))
            };
            kg.assert(staggered).await.unwrap();
        }
        let all = kg.list_active(10, 0).await.unwrap();
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].subject, "subj-2");
        assert_eq!(all[2].subject, "subj-0");
        let window = kg.list_active(2, 1).await.unwrap();
        assert_eq!(window.len(), 2);
        assert_eq!(window[0].subject, "subj-1");
        assert_eq!(window[1].subject, "subj-0");
    }

    #[tokio::test]
    async fn node_and_edge_count_match_adjacency() {
        let (_dir, kg) = fresh_kg();
        assert_eq!(kg.node_count(), 0);
        assert_eq!(kg.edge_count(), 0);
        for (s, o) in [("a", "b"), ("b", "c"), ("c", "a")] {
            kg.assert(fact(s, "rel", o)).await.unwrap();
        }
        assert_eq!(kg.node_count(), 3);
        assert_eq!(kg.edge_count(), 3);
    }

    #[tokio::test]
    async fn community_count_returns_partition_size() {
        let (_dir, kg) = fresh_kg();
        assert_eq!(kg.community_count(), 0);
        for (s, o) in [("x", "y"), ("y", "z"), ("z", "x")] {
            kg.assert(fact(s, "rel", o)).await.unwrap();
        }
        assert!(kg.community_count() >= 1);
    }
}