use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use mnem_core::id::{Cid, NodeId};
use mnem_core::index::hybrid::{AdjEdge, AdjacencyIndex, EdgeProvenance};
use mnem_core::index::{BruteForceVectorIndex, SparseInvertedIndex, VectorIndex};
use mnem_core::repo::ReadonlyRepo;
use crate::metrics::{LeidenModeLabels, Metrics};
/// K used when deriving fallback KNN edges from a vector index (E0 wire).
const KNN_FALLBACK_K: u32 = 32;
/// One-shot latch so KNN-fallback activation is logged at most once per process.
static KNN_FALLBACK_LOGGED: AtomicBool = AtomicBool::new(false);
/// Max commit arrivals per rolling minute before Leiden selection falls back to stale results.
pub const COMMIT_STORM_CAP_PER_MIN: u32 = 60;
/// Delta-ratio threshold for forcing a full recompute; in this file it is only
/// exported to metrics (as parts-per-10k) — enforcement presumably lives elsewhere (TODO confirm).
pub const DELTA_RATIO_FORCE_FULL: f32 = 0.5;
/// Node-count gate: graphs at or above this size always take the stale-fallback path.
pub const GRAPH_SIZE_GATE_V: usize = 250_000;
/// Lower bound, in milliseconds, for the Leiden recompute debounce window.
pub const DEBOUNCE_FLOOR_MS: u64 = 1_000;
/// Number of recent commit latencies retained for the rolling p75 estimate.
pub const COMMIT_LATENCY_WINDOW: usize = 100;

/// Derives the effective debounce window from a rolling p75 commit latency.
///
/// Returns the p75 clamped up to [`DEBOUNCE_FLOOR_MS`]; when no samples
/// exist (`None`), the floor itself is used.
#[must_use]
pub fn derive_debounce_ms(rolling_p75_commit_ms: Option<u64>) -> u64 {
    match rolling_p75_commit_ms {
        Some(p75) if p75 > DEBOUNCE_FLOOR_MS => p75,
        _ => DEBOUNCE_FLOOR_MS,
    }
}
/// Mode selected for a Leiden community recompute at the current head.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum LeidenMode {
    /// Unconditional full recompute (env override).
    Full,
    /// Full recompute gated by debounce / storm / size checks.
    FullDebounced,
    /// Serve a previous (possibly stale) assignment instead of recomputing.
    FallbackStale,
}

impl LeidenMode {
    /// Stable metric-label string for this mode.
    #[must_use]
    pub fn label(&self) -> &'static str {
        match self {
            Self::Full => "full",
            Self::FullDebounced => "full_debounced",
            Self::FallbackStale => "fallback_stale",
        }
    }

    /// Numeric gauge encoding of this mode (0 = full, 1 = debounced, 2 = stale).
    #[must_use]
    pub fn gauge_value(&self) -> i64 {
        match self {
            Self::Full => 0,
            Self::FullDebounced => 1,
            Self::FallbackStale => 2,
        }
    }

    /// Resolves the default mode from `MNEM_LEIDEN_FULL_RECOMPUTE`: an unset,
    /// empty, or case-insensitive falsy ("0"/"false"/"no"/"off") value picks
    /// `FullDebounced`; any other value picks `Full`.
    #[must_use]
    pub fn resolve_default_from_env() -> Self {
        let Ok(raw) = std::env::var("MNEM_LEIDEN_FULL_RECOMPUTE") else {
            return Self::FullDebounced;
        };
        let flag = raw.trim().to_ascii_lowercase();
        let falsy = flag.is_empty() || matches!(flag.as_str(), "0" | "false" | "no" | "off");
        if falsy {
            Self::FullDebounced
        } else {
            Self::Full
        }
    }
}
/// Rolling state that drives Leiden recompute mode selection.
#[derive(Debug)]
pub struct LeidenCache {
    /// Recent commit latencies in ms; capped at `COMMIT_LATENCY_WINDOW` entries.
    pub commit_latency_ms: VecDeque<u64>,
    /// When the last (debounced) recompute happened, if any.
    pub last_recompute_at: Option<Instant>,
    /// Recent commit arrival timestamps; pruned to ~1 minute on each arrival.
    pub commit_arrivals: VecDeque<Instant>,
    /// Mode used when no gate forces a different choice.
    pub default_mode: LeidenMode,
    /// Per-minute arrival cap before falling back to stale results.
    pub storm_cap_per_min: u32,
}
impl Default for LeidenCache {
    /// Starts with no history, debounced mode, and the default storm cap.
    fn default() -> Self {
        let commit_latency_ms = VecDeque::with_capacity(COMMIT_LATENCY_WINDOW);
        Self {
            commit_latency_ms,
            last_recompute_at: None,
            commit_arrivals: VecDeque::new(),
            default_mode: LeidenMode::FullDebounced,
            storm_cap_per_min: COMMIT_STORM_CAP_PER_MIN,
        }
    }
}
impl LeidenCache {
    /// Records one commit latency sample, evicting the oldest once the
    /// rolling window of `COMMIT_LATENCY_WINDOW` samples is full.
    pub fn observe_commit_latency(&mut self, latency: Duration) {
        // Saturate rather than panic if a pathological latency overflows u64 ms.
        let ms = u64::try_from(latency.as_millis()).unwrap_or(u64::MAX);
        if self.commit_latency_ms.len() == COMMIT_LATENCY_WINDOW {
            self.commit_latency_ms.pop_front();
        }
        self.commit_latency_ms.push_back(ms);
    }

    /// Records a commit arrival and prunes arrivals older than one minute
    /// relative to `at`.
    pub fn observe_commit_arrival(&mut self, at: Instant) {
        // FIX: `Duration::from_mins` requires the unstable/recent
        // `duration_constructors` feature; `from_secs(60)` is equivalent and
        // compiles on every stable toolchain.
        let cutoff = at.checked_sub(Duration::from_secs(60)).unwrap_or(at);
        while let Some(front) = self.commit_arrivals.front() {
            if *front < cutoff {
                self.commit_arrivals.pop_front();
            } else {
                break;
            }
        }
        self.commit_arrivals.push_back(at);
    }

    /// Rolling p75 (nearest-rank) of observed commit latencies, if any samples exist.
    #[must_use]
    pub fn rolling_p75_commit_ms(&self) -> Option<u64> {
        if self.commit_latency_ms.is_empty() {
            return None;
        }
        let mut sorted: Vec<u64> = self.commit_latency_ms.iter().copied().collect();
        sorted.sort_unstable();
        let n = sorted.len();
        // Nearest-rank p75: ceil(0.75 * n), converted to a 0-based index.
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let idx = ((n as f64 * 0.75).ceil() as usize)
            .saturating_sub(1)
            .min(n - 1);
        Some(sorted[idx])
    }

    /// Effective debounce window (ms): rolling p75 clamped to the floor.
    #[must_use]
    pub fn effective_debounce_ms(&self) -> u64 {
        derive_debounce_ms(self.rolling_p75_commit_ms())
    }

    /// True when the retained arrival count has hit the per-minute cap.
    ///
    /// NOTE(review): this counts every retained entry; the deque is only
    /// pruned lazily inside `observe_commit_arrival`, so entries older than a
    /// minute may still be included. `select_mode` uses the time-aware count
    /// below instead. Kept as-is for callers relying on the old semantics.
    #[must_use]
    pub fn storm_cap_reached(&self) -> bool {
        u32::try_from(self.commit_arrivals.len()).unwrap_or(u32::MAX) >= self.storm_cap_per_min
    }

    /// Counts arrivals within the last minute relative to `now` (saturating).
    fn recent_arrival_count(&self, now: Instant) -> u32 {
        let window = Duration::from_secs(60);
        let recent = self
            .commit_arrivals
            .iter()
            .filter(|t| now.saturating_duration_since(**t) <= window)
            .count();
        u32::try_from(recent).unwrap_or(u32::MAX)
    }

    /// Picks the recompute mode for the current head.
    ///
    /// Gate order: explicit `Full` override, graph-size gate, commit-storm
    /// cap, then the debounce window since the last recompute.
    #[must_use]
    pub fn select_mode(&self, node_count: usize, now: Instant) -> LeidenMode {
        if self.default_mode == LeidenMode::Full {
            return LeidenMode::Full;
        }
        if node_count >= GRAPH_SIZE_GATE_V {
            return LeidenMode::FallbackStale;
        }
        // FIX: count only arrivals inside the last minute relative to `now`.
        // The raw deque is pruned lazily (only on arrival), so after a quiet
        // period `storm_cap_reached()` could spuriously report a storm and
        // wedge selection in `FallbackStale` until the next arrival.
        if self.recent_arrival_count(now) >= self.storm_cap_per_min {
            return LeidenMode::FallbackStale;
        }
        if let Some(last) = self.last_recompute_at {
            let elapsed_ms =
                u64::try_from(now.saturating_duration_since(last).as_millis()).unwrap_or(u64::MAX);
            if elapsed_ms < self.effective_debounce_ms() {
                return LeidenMode::FallbackStale;
            }
        }
        LeidenMode::FullDebounced
    }
}
/// Shared, clonable application state handed to every HTTP route handler.
#[derive(Clone)]
pub struct AppState {
    /// Read view of the repository, guarded for concurrent route access.
    pub repo: Arc<Mutex<ReadonlyRepo>>,
    /// Dense-embedding provider configuration, if one is configured.
    pub embed_cfg: Option<mnem_embed_providers::ProviderConfig>,
    /// Sparse-embedding provider configuration, if one is configured.
    pub sparse_cfg: Option<mnem_sparse_providers::ProviderConfig>,
    /// Per-head cache of dense and sparse search indexes.
    pub indexes: Arc<Mutex<IndexCache>>,
    /// Whether bench-only labels are allowed (parsed from `MNEM_BENCH`).
    pub allow_labels: bool,
    /// Metric registry/handles (gauges and labeled counters).
    pub metrics: Metrics,
    /// Token expected on push endpoints (from `MNEM_HTTP_PUSH_TOKEN`), if set.
    pub push_token: Option<String>,
    /// Per-head cache of graph artifacts (adjacency, communities, PPR).
    pub graph_cache: Arc<Mutex<GraphCache>>,
    /// Traverse/answer route configuration.
    pub traverse_cfg: Arc<crate::routes::traverse::TraverseAnswerCfg>,
    /// NER configuration for ingest, if enabled.
    pub ner_cfg: Option<mnem_ingest::NerConfig>,
}
impl AppState {
    /// Reads `MNEM_BENCH` and parses it as the allow-labels flag.
    #[must_use]
    pub fn resolve_allow_labels_from_env() -> bool {
        Self::parse_allow_labels(std::env::var("MNEM_BENCH").ok().as_deref())
    }

    /// Reads `MNEM_HTTP_PUSH_TOKEN`; a missing, empty, or whitespace-only
    /// value yields `None`, otherwise the trimmed token.
    #[must_use]
    pub fn resolve_push_token_from_env() -> Option<String> {
        let raw = std::env::var("MNEM_HTTP_PUSH_TOKEN").ok()?;
        let token = raw.trim();
        (!token.is_empty()).then(|| token.to_string())
    }

    /// Parses an allow-labels value: unset, blank, or a case-insensitive
    /// "0"/"false"/"no"/"off" is `false`; any other non-empty value is `true`.
    #[must_use]
    pub fn parse_allow_labels(val: Option<&str>) -> bool {
        let Some(trimmed) = val.map(str::trim).filter(|t| !t.is_empty()) else {
            return false;
        };
        !matches!(
            trimmed.to_ascii_lowercase().as_str(),
            "0" | "false" | "no" | "off"
        )
    }
}
/// Per-head cache of search indexes, keyed by the repo's current op id.
#[derive(Default)]
pub struct IndexCache {
    /// Op id the cached indexes were built against; a mismatch flushes both maps.
    pub cache_key_op_id: Option<Cid>,
    /// Dense vector indexes keyed by embedding model name.
    pub vectors: HashMap<String, Arc<BruteForceVectorIndex>>,
    /// Sparse inverted indexes keyed by vocabulary id.
    pub sparse: HashMap<String, Arc<SparseInvertedIndex>>,
}
#[cfg(test)]
mod mnem_bench_parse_tests {
    //! Truthiness rules for `AppState::parse_allow_labels` (`MNEM_BENCH`).
    use super::*;

    // An absent value must parse as "labels not allowed".
    #[test]
    fn unset_parses_false() {
        assert!(!AppState::parse_allow_labels(None));
    }

    // Empty/whitespace and the canonical falsy spellings (any case,
    // surrounding whitespace tolerated) parse false.
    #[test]
    fn falsy_strings_parse_false() {
        for v in [
            "", "0", "false", "FALSE", "False", "no", "No", "NO", "off", "Off", "OFF", " ", " 0 ",
        ] {
            assert!(
                !AppState::parse_allow_labels(Some(v)),
                "expected `{v:?}` to parse false"
            );
        }
    }

    // Any other non-empty value — not just "1"/"true" — parses true.
    #[test]
    fn truthy_strings_parse_true() {
        for v in ["1", "true", "yes", "on", "YES", "benchmark", "anything"] {
            assert!(
                AppState::parse_allow_labels(Some(v)),
                "expected `{v:?}` to parse true"
            );
        }
    }
}
impl IndexCache {
    /// Drops all cached indexes whenever the repo's head op id has moved.
    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
        let head = Some(repo.op_id().clone());
        if self.cache_key_op_id == head {
            return;
        }
        self.cache_key_op_id = head;
        self.vectors.clear();
        self.sparse.clear();
    }

    /// Returns the (possibly cached) dense vector index for `model`,
    /// building it from the repo on a cache miss.
    pub fn vector_index(
        &mut self,
        repo: &ReadonlyRepo,
        model: &str,
    ) -> Result<Arc<BruteForceVectorIndex>, mnem_core::Error> {
        self.reconcile(repo);
        if let Some(cached) = self.vectors.get(model) {
            return Ok(Arc::clone(cached));
        }
        let built = Arc::new(repo.build_vector_index(model)?);
        self.vectors.insert(model.to_string(), Arc::clone(&built));
        Ok(built)
    }

    /// Returns the (possibly cached) sparse inverted index for `vocab_id`,
    /// building it from the repo on a cache miss.
    pub fn sparse_index(
        &mut self,
        repo: &ReadonlyRepo,
        vocab_id: &str,
    ) -> Result<Arc<SparseInvertedIndex>, mnem_core::Error> {
        self.reconcile(repo);
        if let Some(cached) = self.sparse.get(vocab_id) {
            return Ok(Arc::clone(cached));
        }
        let built = Arc::new(SparseInvertedIndex::build_from_repo(repo, vocab_id)?);
        self.sparse.insert(vocab_id.to_string(), Arc::clone(&built));
        Ok(built)
    }
}
/// Edge list of authored (user-committed) src→dst node pairs.
#[derive(Default, Debug)]
pub struct AuthoredEdges {
    /// Directed (src, dst) pairs; the weight is implicitly 1.0.
    pub edges: Vec<(NodeId, NodeId)>,
}
impl AdjacencyIndex for AuthoredEdges {
    /// Yields each authored pair as a unit-weight `Authored` edge.
    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
        let mapped = self.edges.iter().map(|&(src, dst)| AdjEdge {
            src,
            dst,
            weight: 1.0,
            provenance: EdgeProvenance::Authored,
        });
        Box::new(mapped)
    }

    fn edge_count(&self) -> usize {
        self.edges.len()
    }
}
/// Adjacency that layers derived KNN edges on top of authored edges.
#[derive(Debug)]
pub struct DerivedHybridAdjacency {
    /// Authored edges, emitted with unit weight.
    pub authored: Arc<AuthoredEdges>,
    /// Derived (src, dst, weight) KNN edges.
    pub knn: Vec<(NodeId, NodeId, f32)>,
}
impl AdjacencyIndex for DerivedHybridAdjacency {
    /// Emits all authored edges (weight 1.0) followed by the derived KNN edges.
    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
        let authored_iter = self.authored.edges.iter().map(|&(src, dst)| AdjEdge {
            src,
            dst,
            weight: 1.0,
            provenance: EdgeProvenance::Authored,
        });
        let knn_iter = self.knn.iter().map(|&(src, dst, weight)| AdjEdge {
            src,
            dst,
            weight,
            provenance: EdgeProvenance::Knn,
        });
        Box::new(authored_iter.chain(knn_iter))
    }

    fn edge_count(&self) -> usize {
        self.authored.edges.len() + self.knn.len()
    }
}
/// Per-head cache of graph artifacts, keyed by the repo's current op id.
#[derive(Default)]
pub struct GraphCache {
    /// Op id the cached artifacts were built against.
    pub key: Option<Cid>,
    /// Cached authored-edge adjacency.
    pub adjacency: Option<Arc<AuthoredEdges>>,
    /// Cached derived KNN edges (fallback used when no authored edges exist).
    pub knn_edges: Option<Arc<Vec<(NodeId, NodeId, f32)>>>,
    /// Content cid of the vector snapshot the KNN edges were derived from.
    pub knn_key: Option<Cid>,
    /// Cached authored+KNN hybrid adjacency.
    pub hybrid: Option<Arc<DerivedHybridAdjacency>>,
    /// Fresh community assignment for the current head.
    pub community: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
    /// Cached sparse PPR transition matrix.
    pub ppr_matrix: Option<Arc<mnem_core::ppr::SparseTransition>>,
    /// Previous head's assignment, served while in fallback/stale mode.
    pub community_stale: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
    /// Debounce/storm state driving Leiden mode selection.
    pub leiden_cache: LeidenCache,
}
impl GraphCache {
    /// Invalidates cached graph artifacts when the repo's head op id changes.
    /// The previous community assignment is demoted to `community_stale` so
    /// fallback mode can keep serving it while a fresh one is pending.
    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
        let current = Some(repo.op_id().clone());
        if self.key != current {
            self.key = current;
            self.adjacency = None;
            self.knn_edges = None;
            self.knn_key = None;
            self.hybrid = None;
            // Keep the last good assignment for FallbackStale serving.
            if let Some(prev) = self.community.take() {
                self.community_stale = Some(prev);
            }
            self.ppr_matrix = None;
        }
    }

    /// Returns a community assignment for the current head, honoring the
    /// selected Leiden mode (full / debounced / stale fallback) and exporting
    /// the effective knobs and chosen mode to `metrics`.
    pub fn community_for_head(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
        metrics: &Metrics,
    ) -> Result<
        (
            Arc<mnem_graphrag::community::CommunityAssignment>,
            LeidenMode,
        ),
        crate::error::Error,
    > {
        self.reconcile(repo);
        let adj = self.hybrid_adjacency_for(repo, vector)?;
        // Node count over the (possibly hybrid) adjacency feeds the size gate.
        let node_count = authored_node_count(adj.as_ref());
        let now = Instant::now();
        let mode = self.leiden_cache.select_mode(node_count, now);
        metrics
            .leiden_debounce_effective
            .set(i64::try_from(self.leiden_cache.effective_debounce_ms()).unwrap_or(i64::MAX));
        metrics
            .leiden_storm_cap_effective
            .set(i64::from(self.leiden_cache.storm_cap_per_min));
        // Exported as parts-per-10k so the f32 ratio fits an integer gauge.
        #[allow(clippy::cast_possible_truncation)]
        let delta_pp10k = (DELTA_RATIO_FORCE_FULL * 10_000.0) as i64;
        metrics.leiden_delta_ratio_effective.set(delta_pp10k);
        metrics.leiden_mode_current.set(mode.gauge_value());
        metrics
            .leiden_mode
            .get_or_create(&LeidenModeLabels {
                mode: mode.label().to_string(),
            })
            .inc();
        match mode {
            LeidenMode::Full | LeidenMode::FullDebounced => {
                // A cached assignment for this head is always fresh enough.
                if let Some(c) = &self.community {
                    return Ok((c.clone(), mode));
                }
                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
                let arc = Arc::new(assignment);
                self.community = Some(arc.clone());
                // Only debounced recomputes arm the debounce timer; Full mode
                // recomputes regardless of elapsed time.
                if matches!(mode, LeidenMode::FullDebounced) {
                    self.leiden_cache.last_recompute_at = Some(now);
                }
                self.leiden_cache.observe_commit_arrival(now);
                Ok((arc, mode))
            }
            LeidenMode::FallbackStale => {
                if let Some(c) = &self.community {
                    return Ok((c.clone(), mode));
                }
                if let Some(c) = &self.community_stale {
                    return Ok((c.clone(), mode));
                }
                // No fresh or stale assignment exists yet: compute once and
                // stash it in the stale slot so repeated fallbacks reuse it.
                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
                let arc = Arc::new(assignment);
                self.community_stale = Some(arc.clone());
                Ok((arc, mode))
            }
        }
    }

    /// Cached sparse PPR transition matrix for the current head, built from
    /// the hybrid adjacency on a miss.
    pub fn ppr_matrix_for(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
    ) -> Result<Arc<mnem_core::ppr::SparseTransition>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(m) = &self.ppr_matrix {
            return Ok(m.clone());
        }
        let adj = self.hybrid_adjacency_for(repo, vector)?;
        let m = Arc::new(mnem_core::ppr::sparse_transition_matrix(adj.as_ref()));
        self.ppr_matrix = Some(m.clone());
        Ok(m)
    }

    /// Cached authored-edge adjacency for the current head.
    pub fn adjacency_for(
        &mut self,
        repo: &ReadonlyRepo,
    ) -> Result<Arc<AuthoredEdges>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(a) = &self.adjacency {
            return Ok(a.clone());
        }
        let a = Arc::new(collect_authored_edges(repo)?);
        self.adjacency = Some(a.clone());
        Ok(a)
    }

    /// Cached community assignment computed over authored edges only
    /// (no KNN fallback, no mode selection — compare `community_for_head`).
    pub fn community_for(
        &mut self,
        repo: &ReadonlyRepo,
    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(c) = &self.community {
            return Ok(c.clone());
        }
        let adj = self.adjacency_for(repo)?;
        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
        let arc = Arc::new(assignment);
        self.community = Some(arc.clone());
        Ok(arc)
    }

    /// Returns the adjacency to use for graph algorithms: authored edges when
    /// any exist, otherwise a KNN-derived fallback built from `vector`.
    pub fn hybrid_adjacency_for(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
    ) -> Result<Arc<dyn AdjacencyIndex + Send + Sync>, crate::error::Error> {
        self.reconcile(repo);
        let authored = self.adjacency_for(repo)?;
        // Authored edges win outright; KNN fallback is only for empty graphs.
        if !authored.edges.is_empty() {
            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
        }
        // Without a vector index (or with an empty one) there is nothing to
        // derive edges from; return the empty authored adjacency as-is.
        let Some(vec_idx) = vector else {
            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
        };
        if vec_idx.is_empty() {
            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
        }
        self.ensure_knn_edges(vec_idx)?;
        let knn = self
            .knn_edges
            .clone()
            .expect("ensure_knn_edges populated the slot");
        // Log activation at most once per process so the fallback is visible.
        if !KNN_FALLBACK_LOGGED.swap(true, Ordering::Relaxed) {
            tracing::info!(
                target: "mnem_http::graph_cache",
                k = KNN_FALLBACK_K,
                metric = "cosine",
                knn_edges = knn.len(),
                vector_model = %vec_idx.model(),
                "authored adjacency empty; KNN-edge fallback activated (E0 wire)",
            );
        }
        if self.hybrid.is_none() {
            self.hybrid = Some(Arc::new(DerivedHybridAdjacency {
                authored: authored.clone(),
                knn: (*knn).clone(),
            }));
        }
        Ok(self.hybrid.clone().expect("hybrid slot populated above")
            as Arc<dyn AdjacencyIndex + Send + Sync>)
    }

    /// Cached community assignment over the hybrid adjacency (authored edges
    /// with KNN fallback), without mode selection or metrics.
    pub fn hybrid_community_for(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(c) = &self.community {
            return Ok(c.clone());
        }
        let adj = self.hybrid_adjacency_for(repo, vector)?;
        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
        let arc = Arc::new(assignment);
        self.community = Some(arc.clone());
        Ok(arc)
    }

    /// Builds (or reuses) the derived KNN edge set from the vector index.
    /// A content cid over (tag, model, dim, ids) keys the cache; a changed
    /// cid rebuilds the edges and invalidates the hybrid adjacency.
    fn ensure_knn_edges(
        &mut self,
        vector: &BruteForceVectorIndex,
    ) -> Result<(), crate::error::Error> {
        let mut ids: Vec<NodeId> = Vec::with_capacity(vector.len());
        let mut vecs: Vec<Vec<f32>> = Vec::with_capacity(vector.len());
        for (id, row) in vector.points_iter() {
            ids.push(id);
            vecs.push(row.to_vec());
        }
        let edges = mnem_ann::derive_knn_edges_from_vectors(
            &ids,
            &vecs,
            KNN_FALLBACK_K,
            mnem_ann::DistanceMetric::Cosine,
        );
        let root_cid = vector_index_content_cid(vector, &ids)?;
        let idx = mnem_ann::KnnEdgeIndex {
            root_cid,
            k: KNN_FALLBACK_K,
            metric: mnem_ann::DistanceMetric::Cosine,
            edges,
        };
        let cid = idx
            .compute_cid()
            .map_err(|e| crate::error::Error::internal(format!("knn edge cid: {e}")))?;
        // Same content cid and an existing edge set: nothing to rebuild.
        if self.knn_key.as_ref() == Some(&cid) && self.knn_edges.is_some() {
            return Ok(());
        }
        let triples: Vec<(NodeId, NodeId, f32)> = idx
            .edges
            .into_iter()
            .map(|e| (e.src, e.dst, e.weight))
            .collect();
        self.knn_edges = Some(Arc::new(triples));
        self.knn_key = Some(cid);
        // Edge set changed; force the hybrid adjacency to be rebuilt lazily.
        self.hybrid = None;
        Ok(())
    }
}
/// Derives a deterministic content cid for a vector index snapshot, keyed on
/// the fallback tag, model name, dimension, and the ordered point ids.
fn vector_index_content_cid(
    vector: &BruteForceVectorIndex,
    ids: &[NodeId],
) -> Result<Cid, crate::error::Error> {
    use mnem_core::codec::to_canonical_bytes;
    use mnem_core::id::{CODEC_RAW, Multihash};

    // Canonical preimage serialized to produce the cid.
    #[derive(serde::Serialize)]
    struct Preimage<'a> {
        tag: &'a str,
        model: &'a str,
        dim: u32,
        ids: &'a [NodeId],
    }

    let preimage = Preimage {
        tag: "mnem-http/knn-fallback/v1",
        model: vector.model(),
        dim: vector.dim(),
        ids,
    };
    let encoded = to_canonical_bytes(&preimage)
        .map_err(|e| crate::error::Error::internal(format!("canonical encode: {e}")))?;
    Ok(Cid::new(CODEC_RAW, Multihash::sha2_256(&encoded)))
}
/// Counts distinct node ids appearing as an endpoint of any edge in `adj`.
///
/// NOTE(review): despite the name, this counts endpoints of whatever
/// adjacency it is given — including derived KNN edges when the caller
/// passes a hybrid adjacency, not just authored ones.
fn authored_node_count(adj: &(dyn AdjacencyIndex + Send + Sync)) -> usize {
    use std::collections::BTreeSet;
    adj.iter_edges()
        .flat_map(|e| [e.src, e.dst])
        .collect::<BTreeSet<NodeId>>()
        .len()
}
/// Walks the head commit's edge tree and materializes all authored edges.
///
/// Returns an empty edge list when the repo has no head commit; any
/// blockstore, cursor, or decode failure surfaces as an internal error.
fn collect_authored_edges(repo: &ReadonlyRepo) -> Result<AuthoredEdges, crate::error::Error> {
    // A fresh repo with no head commit legitimately has no edges.
    let Some(commit) = repo.head_commit() else {
        return Ok(AuthoredEdges::default());
    };
    let bs = repo.blockstore().clone();
    let cursor = mnem_core::prolly::Cursor::new(&*bs, &commit.edges)
        .map_err(|e| crate::error::Error::internal(format!("opening edge cursor: {e}")))?;
    let mut edges: Vec<(NodeId, NodeId)> = Vec::new();
    for entry in cursor {
        let (_key, edge_cid) =
            entry.map_err(|e| crate::error::Error::internal(format!("walking edge tree: {e}")))?;
        // Each tree entry points at an edge block; a missing block is treated
        // as an internal error rather than silently skipped.
        let bytes = bs
            .get(&edge_cid)
            .map_err(|e| crate::error::Error::internal(format!("fetching edge block: {e}")))?
            .ok_or_else(|| {
                crate::error::Error::internal(format!("edge block {edge_cid} missing"))
            })?;
        let edge: mnem_core::objects::Edge = mnem_core::codec::from_canonical_bytes(&bytes)
            .map_err(|e| crate::error::Error::internal(format!("decoding edge: {e}")))?;
        edges.push((edge.src, edge.dst));
    }
    Ok(AuthoredEdges { edges })
}
#[cfg(test)]
pub(crate) mod test_support {
    //! Helpers for constructing an `AppState` backed by in-memory stores.
    use super::*;
    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};

    // Builds a minimal AppState with the given push token and all optional
    // configuration disabled; indexes/graph caches start empty.
    pub(crate) fn state_with_token(token: Option<String>) -> AppState {
        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
        AppState {
            repo: Arc::new(Mutex::new(repo)),
            embed_cfg: None,
            sparse_cfg: None,
            indexes: Arc::new(Mutex::new(IndexCache::default())),
            allow_labels: false,
            metrics: Metrics::new(),
            push_token: token,
            graph_cache: Arc::new(Mutex::new(GraphCache::default())),
            traverse_cfg: Arc::new(crate::routes::traverse::TraverseAnswerCfg::default()),
            ner_cfg: None,
        }
    }
}
#[cfg(test)]
mod knn_fallback_tests {
    //! Behavior of the KNN-edge fallback in `GraphCache::hybrid_adjacency_for`.
    use super::*;
    use bytes::Bytes;
    use mnem_core::objects::node::{Dtype, Embedding};
    use mnem_core::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};

    // Fresh in-memory block and op-heads stores for an empty repo.
    fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
        (
            Arc::new(MemoryBlockstore::new()),
            Arc::new(MemoryOpHeadsStore::new()),
        )
    }

    // Packs an f32 slice into a little-endian `Embedding` for `model`.
    fn f32_embed(model: &str, v: &[f32]) -> Embedding {
        let mut bytes = Vec::with_capacity(v.len() * 4);
        for x in v {
            bytes.extend_from_slice(&x.to_le_bytes());
        }
        Embedding {
            model: model.to_string(),
            dtype: Dtype::F32,
            dim: u32::try_from(v.len()).expect("test vec fits in u32"),
            vector: Bytes::from(bytes),
        }
    }

    // Builds a 3-dim brute-force index over the given rows under model "m".
    fn build_vector_index(rows: &[(NodeId, Vec<f32>)]) -> BruteForceVectorIndex {
        let mut idx = BruteForceVectorIndex::empty("m", 3);
        for (id, v) in rows {
            let inserted = idx.try_insert(*id, &f32_embed("m", v));
            assert!(inserted, "embedding insert");
        }
        idx
    }

    // No authored edges AND no vector index: the fallback must not activate.
    #[test]
    fn empty_authored_plus_empty_vector_is_no_op() {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
        let mut gc = GraphCache::default();
        let adj = gc.hybrid_adjacency_for(&repo, None).ok().expect("no-op");
        assert_eq!(adj.edge_count(), 0, "no vectors -> no KNN fallback");
        assert!(gc.knn_edges.is_none());
    }

    // No authored edges but a populated vector index: KNN edges are derived,
    // cached, and feed a usable community assignment.
    #[test]
    fn empty_authored_plus_populated_vector_activates_fallback() {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
        let rows: Vec<(NodeId, Vec<f32>)> = vec![
            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
            (NodeId::new_v7(), vec![0.9, 0.1, 0.0]),
            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
            (NodeId::new_v7(), vec![0.0, 0.0, 1.0]),
        ];
        let vec_idx = build_vector_index(&rows);
        let mut gc = GraphCache::default();
        let adj = gc
            .hybrid_adjacency_for(&repo, Some(&vec_idx))
            .ok()
            .expect("knn fallback ok");
        assert!(
            adj.edge_count() > 0,
            "KNN fallback must produce at least one edge (got 0)",
        );
        assert!(gc.knn_edges.is_some(), "knn_edges slot populated");
        assert!(gc.knn_key.is_some(), "knn cache key populated");
        let assignment = gc
            .hybrid_community_for(&repo, Some(&vec_idx))
            .ok()
            .expect("community ok");
        let any_assigned = rows
            .iter()
            .any(|(id, _)| assignment.community_of(*id).is_some());
        assert!(
            any_assigned,
            "at least one node must have a community under a non-empty adjacency",
        );
    }

    // Rebuilding from an unchanged vector index must yield the same cache key
    // (the content-cid keyed cache is deterministic/idempotent).
    #[test]
    fn knn_fallback_is_idempotent_on_same_vector() {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
        let rows: Vec<(NodeId, Vec<f32>)> = vec![
            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
        ];
        let vec_idx = build_vector_index(&rows);
        let mut gc = GraphCache::default();
        let _ = gc
            .hybrid_adjacency_for(&repo, Some(&vec_idx))
            .ok()
            .expect("first build");
        let first_key = gc.knn_key.clone().expect("first build populates key");
        let _ = gc
            .hybrid_adjacency_for(&repo, Some(&vec_idx))
            .ok()
            .expect("second build");
        let second_key = gc.knn_key.clone().expect("second build populates key");
        assert_eq!(first_key, second_key, "KNN cache key stable across calls");
    }
}