#[cfg(test)]
use nodedb_graph::CsrIndex;
use nodedb_graph::ShardedCsrIndex;
use nodedb_graph::csr::weights::extract_weight_from_properties;
#[cfg(test)]
use nodedb_types::TenantId;
use crate::engine::graph::edge_store::EdgeStore;
pub fn rebuild_sharded_from_store(store: &EdgeStore) -> crate::Result<ShardedCsrIndex> {
rebuild_sharded_from_store_as_of(store, None)
}
pub fn rebuild_sharded_from_store_as_of(
store: &EdgeStore,
system_as_of: Option<i64>,
) -> crate::Result<ShardedCsrIndex> {
let mut sharded = ShardedCsrIndex::new();
let all_edges = store.scan_all_edges_decoded(system_as_of)?;
for (tid, _collection, src, _label, dst, _props) in &all_edges {
let partition = sharded.get_or_create(*tid);
partition
.add_node(src)
.map_err(|e| crate::Error::Internal {
detail: format!("CSR rebuild (add src node): {e}"),
})?;
partition
.add_node(dst)
.map_err(|e| crate::Error::Internal {
detail: format!("CSR rebuild (add dst node): {e}"),
})?;
}
for (tid, _collection, src, label, dst, props) in &all_edges {
let partition = sharded.get_or_create(*tid);
let weight = extract_weight_from_properties(props);
let res = if weight != 1.0 {
partition.add_edge_weighted(src, label, dst, weight)
} else {
partition.add_edge(src, label, dst)
};
res.map_err(|e| crate::Error::Internal {
detail: format!("CSR rebuild: {e}"),
})?;
}
if let Err(e) = sharded.compact_all() {
tracing::warn!(error = %e, "CSR compaction rejected by memory governor during rebuild; skipping");
}
Ok(sharded)
}
#[cfg(test)]
pub fn rebuild_from_store(store: &EdgeStore) -> crate::Result<CsrIndex> {
use std::collections::hash_map::Entry;
let mut sharded = rebuild_sharded_from_store_as_of(store, None)?;
let tid = sharded
.iter()
.map(|(tid, _)| *tid)
.next()
.unwrap_or_else(|| TenantId::new(0));
match sharded.entry(tid) {
Entry::Occupied(entry) => Ok(entry.remove()),
Entry::Vacant(_) => Ok(CsrIndex::new()),
}
}