mnem_http/state.rs
//! Shared app state. One `ReadonlyRepo` per server, behind an
//! `Arc<Mutex>`: redb holds an exclusive file lock, so only one handle
//! can be open per process. Writes take the lock, run a transaction,
//! and replace the repo with the post-commit value.
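//!
//! Write-path shape, as a sketch. `run_write_transaction` is a
//! hypothetical stand-in for the real handler logic (the actual write
//! routes live under `crate::routes`):
//!
//! ```text
//! let mut repo = state.repo.lock().expect("repo mutex poisoned");
//! let next = run_write_transaction(&repo)?; // yields the post-commit ReadonlyRepo
//! *repo = next; // replace the value inside the mutex
//! ```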

use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use mnem_core::id::{Cid, NodeId};
use mnem_core::index::hybrid::{AdjEdge, AdjacencyIndex, EdgeProvenance};
use mnem_core::index::{BruteForceVectorIndex, SparseInvertedIndex, VectorIndex};
use mnem_core::repo::ReadonlyRepo;

use crate::metrics::{LeidenModeLabels, Metrics};

/// Default per-source neighbour count for the KNN-edge fallback. 32
/// matches the `k=32, metric=cosine` determinism contract the
/// `KnnEdgeIndex::compute_cid` key derivation bakes in.
const KNN_FALLBACK_K: u32 = 32;

/// One-shot guard so the "KNN-edge fallback activated" info-level log
/// fires once per process lifetime (not once per retrieve). Keeps the
/// prod log from flooding while still emitting the "yes, E0 wire is
/// live" breadcrumb.
static KNN_FALLBACK_LOGGED: AtomicBool = AtomicBool::new(false);

// ---------------------------------------------------------------
// Gap 10 Phase-1: community-cache invalidation tunables
// ---------------------------------------------------------------

/// Gap 10 R6 floor-c tunable: commit-storm DoS cap per minute. Default 60.
///
/// `#tunable: default=60; rationale="attacker cannot amplify beyond human commit rate"`
pub const COMMIT_STORM_CAP_PER_MIN: u32 = 60;

/// Gap 10 R6 floor-c tunable: fraction of the graph that must change
/// in one commit before an incremental recompute path force-flips to
/// full. Exported today as a gauge-visible constant; consulted by the
/// debounced-full recompute loop when deciding whether to bypass the
/// incremental shortcut.
///
/// `#tunable: default=0.5; rationale="half-graph change = incremental not cheaper than full"`
pub const DELTA_RATIO_FORCE_FULL: f32 = 0.5;

/// Gap 10 Phase-1 graph-size gate: above this node count the hot path
/// refuses to run full-Leiden inline and serves `FallbackStale`.
///
/// `#tunable: default=250_000; rationale="HNSW memory derivation; see benchmarks/leiden-wallclock-vs-V.md"`
pub const GRAPH_SIZE_GATE_V: usize = 250_000;

/// Gap 10 R3 debounce floor.
pub const DEBOUNCE_FLOOR_MS: u64 = 1_000;

/// Size of the rolling ring buffer used for p75 commit-latency.
pub const COMMIT_LATENCY_WINDOW: usize = 100;

/// Gap 10 R6 code-sketch API: runtime-derived debounce window.
#[must_use]
pub fn derive_debounce_ms(rolling_p75_commit_ms: Option<u64>) -> u64 {
    rolling_p75_commit_ms
        .map(|p| p.max(DEBOUNCE_FLOOR_MS))
        .unwrap_or(DEBOUNCE_FLOOR_MS)
}
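
#[cfg(test)]
mod debounce_derivation_tests {
    //! Sketch pinning the floor semantics of [`derive_debounce_ms`]:
    //! `None` and sub-floor samples clamp to `DEBOUNCE_FLOOR_MS`.
    use super::*;

    #[test]
    fn floor_and_passthrough() {
        assert_eq!(derive_debounce_ms(None), DEBOUNCE_FLOOR_MS);
        assert_eq!(derive_debounce_ms(Some(250)), DEBOUNCE_FLOOR_MS);
        assert_eq!(derive_debounce_ms(Some(2_500)), 2_500);
    }
}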

/// Gap 10 Phase-1 recompute-mode enum. Closed vocabulary.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum LeidenMode {
    /// Immediate full recompute on every commit. Env-forced.
    Full,
    /// Debounced full recompute (the startup default unless env-forced).
    FullDebounced,
    /// Serve the prior commit's assignment; refresh suppressed.
    FallbackStale,
}

impl LeidenMode {
    /// Prometheus label string.
    #[must_use]
    pub fn label(&self) -> &'static str {
        match self {
            Self::Full => "full",
            Self::FullDebounced => "full_debounced",
            Self::FallbackStale => "fallback_stale",
        }
    }

    /// Gauge encoding for `mnem_leiden_mode_current`.
    #[must_use]
    pub fn gauge_value(&self) -> i64 {
        match self {
            Self::Full => 0,
            Self::FullDebounced => 1,
            Self::FallbackStale => 2,
        }
    }

    /// Resolve the default mode from the environment at startup.
    #[must_use]
    pub fn resolve_default_from_env() -> Self {
        match std::env::var("MNEM_LEIDEN_FULL_RECOMPUTE").ok() {
            Some(v) => {
                let t = v.trim().to_ascii_lowercase();
                if t.is_empty() || matches!(t.as_str(), "0" | "false" | "no" | "off") {
                    Self::FullDebounced
                } else {
                    Self::Full
                }
            }
            None => Self::FullDebounced,
        }
    }
}

/// Gap 10 Phase-1 debounce + storm-cap state.
#[derive(Debug)]
pub struct LeidenCache {
    /// Rolling ring of commit wall-clock latencies (milliseconds).
    pub commit_latency_ms: VecDeque<u64>,
    /// Instant of the most recent successful full recompute.
    pub last_recompute_at: Option<Instant>,
    /// Ring of commit arrivals inside the trailing 60s.
    pub commit_arrivals: VecDeque<Instant>,
    /// Default mode at process start.
    pub default_mode: LeidenMode,
    /// Effective storm cap (operator-overridable).
    pub storm_cap_per_min: u32,
}

impl Default for LeidenCache {
    fn default() -> Self {
        Self {
            commit_latency_ms: VecDeque::with_capacity(COMMIT_LATENCY_WINDOW),
            last_recompute_at: None,
            commit_arrivals: VecDeque::new(),
            default_mode: LeidenMode::FullDebounced,
            storm_cap_per_min: COMMIT_STORM_CAP_PER_MIN,
        }
    }
}

impl LeidenCache {
    /// Record a completed-commit wall-time sample.
    pub fn observe_commit_latency(&mut self, latency: Duration) {
        let ms = u64::try_from(latency.as_millis()).unwrap_or(u64::MAX);
        if self.commit_latency_ms.len() == COMMIT_LATENCY_WINDOW {
            self.commit_latency_ms.pop_front();
        }
        self.commit_latency_ms.push_back(ms);
    }

    /// Record a commit arrival; evicts entries older than 60s.
    pub fn observe_commit_arrival(&mut self, at: Instant) {
        // `Duration::from_mins` is nightly-only; spell out 60 seconds.
        let cutoff = at.checked_sub(Duration::from_secs(60)).unwrap_or(at);
        while let Some(front) = self.commit_arrivals.front() {
            if *front < cutoff {
                self.commit_arrivals.pop_front();
            } else {
                break;
            }
        }
        self.commit_arrivals.push_back(at);
    }

    /// Nearest-rank p75 of the rolling commit-latency ring.
    #[must_use]
    pub fn rolling_p75_commit_ms(&self) -> Option<u64> {
        if self.commit_latency_ms.is_empty() {
            return None;
        }
        let mut sorted: Vec<u64> = self.commit_latency_ms.iter().copied().collect();
        sorted.sort_unstable();
        let n = sorted.len();
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let idx = ((n as f64 * 0.75).ceil() as usize)
            .saturating_sub(1)
            .min(n - 1);
        Some(sorted[idx])
    }

    /// Effective debounce window via [`derive_debounce_ms`].
    #[must_use]
    pub fn effective_debounce_ms(&self) -> u64 {
        derive_debounce_ms(self.rolling_p75_commit_ms())
    }

    /// Is the 60s commit-arrival ring at or above the storm cap?
    #[must_use]
    pub fn storm_cap_reached(&self) -> bool {
        u32::try_from(self.commit_arrivals.len()).unwrap_or(u32::MAX) >= self.storm_cap_per_min
    }

    /// Pure policy function. Returns the mode to serve this call.
    #[must_use]
    pub fn select_mode(&self, node_count: usize, now: Instant) -> LeidenMode {
        if self.default_mode == LeidenMode::Full {
            return LeidenMode::Full;
        }
        if node_count >= GRAPH_SIZE_GATE_V {
            return LeidenMode::FallbackStale;
        }
        if self.storm_cap_reached() {
            return LeidenMode::FallbackStale;
        }
        if let Some(last) = self.last_recompute_at {
            let elapsed_ms =
                u64::try_from(now.saturating_duration_since(last).as_millis()).unwrap_or(u64::MAX);
            if elapsed_ms < self.effective_debounce_ms() {
                return LeidenMode::FallbackStale;
            }
        }
        LeidenMode::FullDebounced
    }
}
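
#[cfg(test)]
mod leiden_cache_policy_tests {
    //! Sketches pinning `LeidenCache` behaviour that is fully
    //! determined by this file: nearest-rank p75, latency-ring
    //! capacity, and the storm-cap branch of `select_mode`.
    use super::*;

    #[test]
    fn nearest_rank_p75_over_four_samples() {
        let mut cache = LeidenCache::default();
        for ms in [40, 10, 30, 20] {
            cache.observe_commit_latency(Duration::from_millis(ms));
        }
        // ceil(4 * 0.75) = 3rd smallest of [10, 20, 30, 40] = 30.
        assert_eq!(cache.rolling_p75_commit_ms(), Some(30));
        // 30ms is below the floor, so the window clamps to it.
        assert_eq!(cache.effective_debounce_ms(), DEBOUNCE_FLOOR_MS);
    }

    #[test]
    fn latency_ring_is_bounded() {
        let mut cache = LeidenCache::default();
        for _ in 0..(COMMIT_LATENCY_WINDOW + 10) {
            cache.observe_commit_latency(Duration::from_millis(1));
        }
        assert_eq!(cache.commit_latency_ms.len(), COMMIT_LATENCY_WINDOW);
    }

    #[test]
    fn storm_cap_forces_fallback_stale() {
        let mut cache = LeidenCache::default();
        let now = Instant::now();
        for _ in 0..COMMIT_STORM_CAP_PER_MIN {
            cache.observe_commit_arrival(now);
        }
        assert!(cache.storm_cap_reached());
        assert_eq!(cache.select_mode(1, now), LeidenMode::FallbackStale);
    }
}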

/// Shared application state passed to every axum handler via
/// `State<AppState>`. Clones are cheap (shared `Arc`).
#[derive(Clone)]
pub struct AppState {
    /// The open repo, held behind a mutex because redb keeps an
    /// exclusive file lock per-process. Writes replace the value
    /// inside the mutex with the post-commit `ReadonlyRepo`.
    pub repo: Arc<Mutex<ReadonlyRepo>>,
    /// Optional embed-provider config resolved from the repo's
    /// `config.toml` at startup.
    pub embed_cfg: Option<mnem_embed_providers::ProviderConfig>,
    /// Optional sparse-provider config resolved from the repo's
    /// `config.toml` at startup. When present, POST `/v1/nodes` and
    /// POST `/v1/nodes/bulk` auto-populate `Node.sparse_embed` on
    /// ingest, and `/v1/retrieve` auto-encodes the query via
    /// `SparseEncoder::encode_query` so the neural-sparse lane fires
    /// end-to-end.
    pub sparse_cfg: Option<mnem_sparse_providers::ProviderConfig>,
    /// Index cache keyed by the repo's op-id (see [`IndexCache`]).
    /// Fixes audit gap G1: without this, every `/v1/retrieve` call
    /// rebuilt the vector + sparse indexes from scratch (O(N) per
    /// query). With this, the first retrieve after a commit pays the
    /// build cost; every subsequent retrieve returns in microseconds.
    ///
    /// Invalidation is automatic: any write path that commits also
    /// produces a new op-id (via
    /// `Transaction::commit -> ReadonlyRepo`). The cache sees the
    /// mismatch next time and evicts.
    pub indexes: Arc<Mutex<IndexCache>>,
    /// Whether the server accepts caller-supplied `label` values on
    /// ingest and `label` filters on retrieve. Read from the
    /// `MNEM_BENCH` environment variable at startup.
    ///
    /// Defaults to `false`. Casual / single-tenant / personal-graph
    /// installations keep label hidden: every ingested node gets
    /// `ntype = Node::DEFAULT_NTYPE` ("Node") regardless of what the
    /// caller sent, and retrieve ignores any label filter. There is
    /// no way to flip this via a CLI flag or request body; operators
    /// opt in by setting `MNEM_BENCH=1` at server launch, which is
    /// how the reference benchmark harnesses in this repo pin
    /// per-item isolation. Zero surface area for a regular user to
    /// stumble into label-scoped state.
    pub allow_labels: bool,
    /// Prometheus metrics registry shared with the `/metrics` route
    /// and the `track_metrics` middleware. Clones are cheap (`Arc`
    /// inside); no per-request allocation.
    pub metrics: Metrics,
    /// Bearer token that authorises `/remote/v1/push-blocks` and
    /// `/remote/v1/advance-head`. Read from `MNEM_HTTP_PUSH_TOKEN`
    /// at startup. `None` means those routes are administratively
    /// disabled (fail-closed): they return 503 regardless of
    /// whatever the caller presented.
    ///
    /// The token never touches disk and is never emitted to tracing.
    /// See [`crate::auth`] for the extractor that enforces the
    /// check.
    pub push_token: Option<String>,
    /// C3 FIX-1 + FIX-2: authored-edges adjacency + Leiden community
    /// assignment cache, keyed on the repo's op-id. Populated lazily
    /// on the first retrieve that asks for `community_filter=true` or
    /// `graph_mode="ppr"`. Invalidated whenever the op-id changes
    /// (any write path). Single-slot cache: one authored-adjacency
    /// snapshot is shared across requests until the next commit.
    pub graph_cache: Arc<Mutex<GraphCache>>,
    /// Gap 09 - `/v1/traverse_answer` runtime config. Default OFF per
    /// architect Decision 4; opt in via
    /// `[experimental] single_call_multihop = true` in the repo's
    /// `config.toml`. Wrapped in `Arc` for cheap clones across handler
    /// dispatch.
    pub traverse_cfg: Arc<crate::routes::traverse::TraverseAnswerCfg>,
    /// NER provider config resolved from the repo's `config.toml` at
    /// startup. When `None`, ingest paths default to `NerConfig::Rule`.
    pub ner_cfg: Option<mnem_ingest::NerConfig>,
}

impl AppState {
    /// Read `MNEM_BENCH` at startup. See [`Self::parse_allow_labels`]
    /// for the truthy / falsy string rules.
    #[must_use]
    pub fn resolve_allow_labels_from_env() -> bool {
        Self::parse_allow_labels(std::env::var("MNEM_BENCH").ok().as_deref())
    }

    /// Read `MNEM_HTTP_PUSH_TOKEN` at startup. Empty / unset -> `None`
    /// (writes fail-closed). See [`crate::auth::RequireBearer`] for the
    /// extractor that enforces the check.
    #[must_use]
    pub fn resolve_push_token_from_env() -> Option<String> {
        let raw = std::env::var("MNEM_HTTP_PUSH_TOKEN").ok()?;
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    /// Pure parser for the `MNEM_BENCH` value. `None` (unset) is
    /// false. Falsy strings (`"0"`, `"false"`, `"no"`, `"off"`, empty,
    /// all case-insensitive) are false. Anything else is true.
    #[must_use]
    pub fn parse_allow_labels(val: Option<&str>) -> bool {
        match val {
            None => false,
            Some(s) => {
                let t = s.trim();
                if t.is_empty() {
                    return false;
                }
                let l = t.to_ascii_lowercase();
                !matches!(l.as_str(), "0" | "false" | "no" | "off")
            }
        }
    }
}

/// Server-side cache of built retrieval indexes. Keyed on the repo's
/// current op-id; when that changes, the whole cache is treated as
/// stale and rebuilt on demand.
#[derive(Default)]
pub struct IndexCache {
    /// Repo op-id the current cache was built against. `None` on
    /// startup or after a reset. When this differs from the repo's
    /// current op-id, every entry below is considered stale.
    ///
    /// Named `cache_key_op_id` (not `commit_cid`) because op-id is
    /// what we key on: op-id exists on a freshly-initialised repo
    /// with no commits yet, which the commit CID does not.
    pub cache_key_op_id: Option<Cid>,
    /// Dense vector indexes keyed by the embedder's `model` string
    /// (e.g. `"ollama:nomic-embed-text"`). One repo can carry multiple
    /// model families if the caller runs several retrieves with
    /// different `vector_model`s.
    pub vectors: HashMap<String, Arc<BruteForceVectorIndex>>,
    /// Sparse indexes keyed by `SparseEmbed::vocab_id`.
    pub sparse: HashMap<String, Arc<SparseInvertedIndex>>,
}

#[cfg(test)]
mod mnem_bench_parse_tests {
    use super::*;

    #[test]
    fn unset_parses_false() {
        assert!(!AppState::parse_allow_labels(None));
    }

    #[test]
    fn falsy_strings_parse_false() {
        for v in [
            "", "0", "false", "FALSE", "False", "no", "No", "NO", "off", "Off", "OFF", " ", " 0 ",
        ] {
            assert!(
                !AppState::parse_allow_labels(Some(v)),
                "expected `{v:?}` to parse false"
            );
        }
    }

    #[test]
    fn truthy_strings_parse_true() {
        for v in ["1", "true", "yes", "on", "YES", "benchmark", "anything"] {
            assert!(
                AppState::parse_allow_labels(Some(v)),
                "expected `{v:?}` to parse true"
            );
        }
    }
}

impl IndexCache {
    /// Reconcile the cache against the repo's current op-id. Called
    /// on every cache access. When the op has changed (any write
    /// flips it), all built indexes are dropped and the next getter
    /// will rebuild.
    ///
    /// We key on `repo.op_id()` rather than `head_commit().cid`
    /// because the op-id exists on freshly-initialised repos that
    /// have no commits yet, and it changes on every write. Commit
    /// CID would only change after the first real commit, leaving
    /// an init-time cache entry with no way to invalidate.
    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
        let current = Some(repo.op_id().clone());
        if self.cache_key_op_id != current {
            self.cache_key_op_id = current;
            self.vectors.clear();
            self.sparse.clear();
        }
    }

    /// Fetch (or build) a dense vector index for `model`.
    pub fn vector_index(
        &mut self,
        repo: &ReadonlyRepo,
        model: &str,
    ) -> Result<Arc<BruteForceVectorIndex>, mnem_core::Error> {
        self.reconcile(repo);
        if let Some(idx) = self.vectors.get(model) {
            return Ok(idx.clone());
        }
        let idx = Arc::new(repo.build_vector_index(model)?);
        self.vectors.insert(model.to_string(), idx.clone());
        Ok(idx)
    }

    /// Fetch (or build) a sparse inverted index for `vocab_id`.
    pub fn sparse_index(
        &mut self,
        repo: &ReadonlyRepo,
        vocab_id: &str,
    ) -> Result<Arc<SparseInvertedIndex>, mnem_core::Error> {
        self.reconcile(repo);
        if let Some(idx) = self.sparse.get(vocab_id) {
            return Ok(idx.clone());
        }
        let idx = Arc::new(SparseInvertedIndex::build_from_repo(repo, vocab_id)?);
        self.sparse.insert(vocab_id.to_string(), idx.clone());
        Ok(idx)
    }
}
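
#[cfg(test)]
mod index_cache_reconcile_tests {
    //! Sketch pinning the op-id adoption behaviour of
    //! [`IndexCache::reconcile`] against a fresh in-memory repo
    //! (mirrors the builders in `test_support`).
    use super::*;
    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};

    #[test]
    fn reconcile_adopts_current_op_id() {
        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
        let mut cache = IndexCache::default();
        assert!(cache.cache_key_op_id.is_none(), "empty on startup");
        cache.reconcile(&repo);
        assert!(
            cache.cache_key_op_id == Some(repo.op_id().clone()),
            "cache key adopts the repo op-id"
        );
        // Reconciling again at the same op-id must be a no-op.
        cache.reconcile(&repo);
        assert!(cache.cache_key_op_id == Some(repo.op_id().clone()));
    }
}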

// ---------------------------------------------------------------
// C3 FIX-1 + FIX-2: authored-edges + community-assignment cache.
// ---------------------------------------------------------------

/// Owned authored-edge list over which `AdjacencyIndex` is served.
/// Collected once per op-id from the repo's commit.edges Prolly tree;
/// Leiden community detection and PPR power iteration both read
/// through the same snapshot.
#[derive(Default, Debug)]
pub struct AuthoredEdges {
    /// Flat (src, dst) list in Prolly traversal order. `iter_edges()`
    /// weights each pair `1.0` and stamps provenance `Authored`.
    pub edges: Vec<(NodeId, NodeId)>,
}

impl AdjacencyIndex for AuthoredEdges {
    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
        Box::new(self.edges.iter().map(|(s, d)| AdjEdge {
            src: *s,
            dst: *d,
            weight: 1.0,
            provenance: EdgeProvenance::Authored,
        }))
    }

    fn edge_count(&self) -> usize {
        self.edges.len()
    }
}

/// C3 Patch-B: combined authored + KNN-derived adjacency, owned. Used
/// by `GraphCache` to back both the Leiden community detector and the
/// PPR adjacency wire in a single `AdjacencyIndex` impl without the
/// lifetime acrobatics `mnem_core::HybridAdjacency<A,K>` imposes (it
/// borrows both sources). Structurally equivalent to a
/// `HybridAdjacency<AuthoredEdges, KnnSlice>` sent through an
/// `Arc<dyn AdjacencyIndex + Send + Sync>` consumer.
///
/// # Flag-off / empty-KNN contract
///
/// When `knn` is empty, `iter_edges` yields exactly the authored
/// stream in the authored source's native order, preserving the
/// `mnem_core::tests::hybrid_adjacency_union` byte-identity contract.
#[derive(Debug)]
pub struct DerivedHybridAdjacency {
    /// Authored-edge snapshot shared with `GraphCache::adjacency`.
    pub authored: Arc<AuthoredEdges>,
    /// KNN-derived edges in canonical `(src, dst)` ASC order. `weight`
    /// is the similarity score, `provenance` is `Knn`.
    pub knn: Vec<(NodeId, NodeId, f32)>,
}

impl AdjacencyIndex for DerivedHybridAdjacency {
    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
        let authored = self.authored.edges.iter().map(|(s, d)| AdjEdge {
            src: *s,
            dst: *d,
            weight: 1.0,
            provenance: EdgeProvenance::Authored,
        });
        let knn = self.knn.iter().map(|(s, d, w)| AdjEdge {
            src: *s,
            dst: *d,
            weight: *w,
            provenance: EdgeProvenance::Knn,
        });
        Box::new(authored.chain(knn))
    }

    fn edge_count(&self) -> usize {
        self.authored.edges.len() + self.knn.len()
    }
}
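
#[cfg(test)]
mod derived_hybrid_adjacency_tests {
    //! Sketch pinning the flag-off / empty-KNN contract documented on
    //! [`DerivedHybridAdjacency`]: with no KNN edges, the hybrid view
    //! is exactly the authored stream, in authored order.
    use super::*;

    #[test]
    fn empty_knn_yields_authored_stream_in_order() {
        let (a, b) = (NodeId::new_v7(), NodeId::new_v7());
        let authored = Arc::new(AuthoredEdges {
            edges: vec![(a, b), (b, a)],
        });
        let hybrid = DerivedHybridAdjacency {
            authored: authored.clone(),
            knn: Vec::new(),
        };
        assert_eq!(hybrid.edge_count(), authored.edge_count());
        let pairs: Vec<(NodeId, NodeId)> =
            hybrid.iter_edges().map(|e| (e.src, e.dst)).collect();
        assert_eq!(pairs, vec![(a, b), (b, a)], "authored order preserved");
        assert!(
            hybrid
                .iter_edges()
                .all(|e| matches!(e.provenance, EdgeProvenance::Authored)),
            "no Knn-provenance edges when the knn slot is empty"
        );
    }
}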

/// C3 FIX + Patch-B: authored adjacency, derived-KNN edges, and the
/// community assignment keyed on `op_id`. Single-slot: one snapshot
/// is shared across requests until the next commit.
///
/// Two-key design:
/// - `key` = repo `op_id`; any write invalidates everything.
/// - `knn_key` = content address of the KNN-edge index
///   (`hash(root_cid, k, metric)`) used to avoid rebuilding the
///   KNN-derived edges when the same vector index is re-used across
///   retrieves at the same `op_id`.
#[derive(Default)]
pub struct GraphCache {
    /// Op-id the cache was built against. `None` means empty.
    pub key: Option<Cid>,
    /// Shared authored-adjacency snapshot.
    pub adjacency: Option<Arc<AuthoredEdges>>,
    /// C3 Patch-B: KNN-derived edges produced by
    /// `mnem_ann::derive_knn_edges_from_vectors` over the current
    /// vector index. `None` means either (a) the authored adjacency
    /// was already non-empty so KNN fallback never fired, or (b) the
    /// vector index was empty (nothing to derive from).
    pub knn_edges: Option<Arc<Vec<(NodeId, NodeId, f32)>>>,
    /// Cache key for `knn_edges`:
    /// `(KnnEdgeIndex::compute_cid, k, metric_tag)` projected down to
    /// the content-address CID. Stable across restarts given the same
    /// vector-index contents + k + metric.
    pub knn_key: Option<Cid>,
    /// Hybrid (authored + KNN) adjacency ready to hand out as
    /// `Arc<dyn AdjacencyIndex + Send + Sync>`. Rebuilt whenever
    /// `adjacency` or `knn_edges` changes.
    pub hybrid: Option<Arc<DerivedHybridAdjacency>>,
    /// Shared Leiden community assignment over `hybrid` (falls back
    /// to `adjacency` when KNN fallback is inactive).
    pub community: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
    /// C3 FIX-1: cached row-stochastic CSR matrix used by PPR.
    /// Invalidated alongside `adjacency` / `knn_edges` so the matrix
    /// always reflects the current op-id's adjacency. Shared across
    /// retrieves at the same op-id; `mnem_core::ppr::ppr_with_matrix`
    /// is byte-identical to the from-scratch `ppr()` path (pinned by
    /// the `ppr_with_matrix_matches_ppr_on_small_graph` integration
    /// test), so turning this cache on is a pure-speed change.
    pub ppr_matrix: Option<Arc<mnem_core::ppr::SparseTransition>>,
    /// Gap 10 Phase-1: prior-commit assignment surviving op-id churn.
    pub community_stale: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
    /// Gap 10 Phase-1: debounce + storm-cap policy state.
    pub leiden_cache: LeidenCache,
}

impl GraphCache {
    /// Invalidate if the repo's op-id has moved. Called on every
    /// cache access; mirrors [`IndexCache::reconcile`].
    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
        let current = Some(repo.op_id().clone());
        if self.key != current {
            self.key = current;
            self.adjacency = None;
            self.knn_edges = None;
            self.knn_key = None;
            self.hybrid = None;
            // Gap 10 Phase-1: demote the current assignment to
            // `community_stale` so retrieves inside the debounce
            // window can serve it.
            if let Some(prev) = self.community.take() {
                self.community_stale = Some(prev);
            }
            self.ppr_matrix = None;
        }
    }

    /// Gap 10 Phase-1 entry point. Resolves the current mode via
    /// [`LeidenCache::select_mode`], increments
    /// `mnem_leiden_mode_total{mode=...}` and mirrors the gauge triad,
    /// then returns the assignment and the served mode.
    pub fn community_for_head(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
        metrics: &Metrics,
    ) -> Result<
        (
            Arc<mnem_graphrag::community::CommunityAssignment>,
            LeidenMode,
        ),
        crate::error::Error,
    > {
        self.reconcile(repo);
        let adj = self.hybrid_adjacency_for(repo, vector)?;
        let node_count = authored_node_count(adj.as_ref());
        let now = Instant::now();
        let mode = self.leiden_cache.select_mode(node_count, now);

        metrics
            .leiden_debounce_effective
            .set(i64::try_from(self.leiden_cache.effective_debounce_ms()).unwrap_or(i64::MAX));
        metrics
            .leiden_storm_cap_effective
            .set(i64::from(self.leiden_cache.storm_cap_per_min));
        #[allow(clippy::cast_possible_truncation)]
        let delta_pp10k = (DELTA_RATIO_FORCE_FULL * 10_000.0) as i64;
        metrics.leiden_delta_ratio_effective.set(delta_pp10k);
        metrics.leiden_mode_current.set(mode.gauge_value());
        metrics
            .leiden_mode
            .get_or_create(&LeidenModeLabels {
                mode: mode.label().to_string(),
            })
            .inc();

        match mode {
            LeidenMode::Full | LeidenMode::FullDebounced => {
                if let Some(c) = &self.community {
                    return Ok((c.clone(), mode));
                }
                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
                let arc = Arc::new(assignment);
                self.community = Some(arc.clone());
                if matches!(mode, LeidenMode::FullDebounced) {
                    self.leiden_cache.last_recompute_at = Some(now);
                }
                self.leiden_cache.observe_commit_arrival(now);
                Ok((arc, mode))
            }
            LeidenMode::FallbackStale => {
                if let Some(c) = &self.community {
                    return Ok((c.clone(), mode));
                }
                if let Some(c) = &self.community_stale {
                    return Ok((c.clone(), mode));
                }
                // Bootstrap: no assignment of any age exists yet, so
                // compute one inline and park it in the stale slot.
                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
                let arc = Arc::new(assignment);
                self.community_stale = Some(arc.clone());
                Ok((arc, mode))
            }
        }
    }

    /// C3 FIX-1: fetch (or build) the row-stochastic CSR matrix used
    /// by PPR, over the hybrid adjacency. The matrix is a pure
    /// function of the adjacency; caching it lets repeated retrieves
    /// at the same op-id skip the 3-pass CSR build (dominates PPR
    /// cost at ~15 iters on small graphs).
    ///
    /// Byte-identity with the uncached `ppr()` path is pinned by the
    /// `ppr_with_matrix_matches_ppr_on_small_graph` integration test
    /// in mnem-core.
    pub fn ppr_matrix_for(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
    ) -> Result<Arc<mnem_core::ppr::SparseTransition>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(m) = &self.ppr_matrix {
            return Ok(m.clone());
        }
        let adj = self.hybrid_adjacency_for(repo, vector)?;
        let m = Arc::new(mnem_core::ppr::sparse_transition_matrix(adj.as_ref()));
        self.ppr_matrix = Some(m.clone());
        Ok(m)
    }

    /// Fetch (or build) the authored-adjacency snapshot. Back-compat
    /// wrapper preserved for callers that do NOT want KNN fallback
    /// (e.g. tests asserting the pure-authored path).
    pub fn adjacency_for(
        &mut self,
        repo: &ReadonlyRepo,
    ) -> Result<Arc<AuthoredEdges>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(a) = &self.adjacency {
            return Ok(a.clone());
        }
        let a = Arc::new(collect_authored_edges(repo)?);
        self.adjacency = Some(a.clone());
        Ok(a)
    }

    /// Fetch (or build) the Leiden community assignment over the
    /// authored-only adjacency. Back-compat entry point.
    pub fn community_for(
        &mut self,
        repo: &ReadonlyRepo,
    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(c) = &self.community {
            return Ok(c.clone());
        }
        let adj = self.adjacency_for(repo)?;
        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
        let arc = Arc::new(assignment);
        self.community = Some(arc.clone());
        Ok(arc)
    }

    /// C3 Patch-B entry point: fetch (or build) the authored adjacency;
    /// when it is empty (and `vector` is non-empty), derive a KNN-edge
    /// substrate via `mnem_ann::derive_knn_edges_from_vectors` and
    /// cache a combined [`DerivedHybridAdjacency`].
    ///
    /// Returns a trait-object `Arc<dyn AdjacencyIndex + Send + Sync>`
    /// so the retriever's `with_adjacency_index` consumer can accept
    /// it without knowing whether the KNN lane fired.
    ///
    /// # Zero-impact contracts
    ///
    /// - Authored non-empty -> returns the authored snapshot; KNN lane
    ///   never runs; `knn_edges` stays `None`.
    /// - `vector` is `None` or empty -> returns the authored snapshot
    ///   (possibly empty); no KNN fallback; preserves the "no graph,
    ///   no filter" legacy behaviour.
    /// - Same `op_id` + same vector-index content -> served from the
    ///   single-slot cache; `derive_knn_edges_from_vectors` is not
    ///   re-run.
    pub fn hybrid_adjacency_for(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
    ) -> Result<Arc<dyn AdjacencyIndex + Send + Sync>, crate::error::Error> {
        self.reconcile(repo);
        let authored = self.adjacency_for(repo)?;

        // Zero-impact short-circuits.
        if !authored.edges.is_empty() {
            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
        }
        let Some(vec_idx) = vector else {
            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
        };
        if vec_idx.is_empty() {
            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
        }

        // Fallback active: derive (or fetch cached) KNN edges. The
        // `KnnEdgeIndex` CID folds in (root_cid, k, metric) so the
        // cache key is stable across restarts given the same vector
        // content.
        self.ensure_knn_edges(vec_idx)?;
        let knn = self
            .knn_edges
            .clone()
            .expect("ensure_knn_edges populated the slot");
        // One-shot info log the first time the fallback path wins.
        if !KNN_FALLBACK_LOGGED.swap(true, Ordering::Relaxed) {
            tracing::info!(
                target: "mnem_http::graph_cache",
                k = KNN_FALLBACK_K,
                metric = "cosine",
                knn_edges = knn.len(),
                vector_model = %vec_idx.model(),
                "authored adjacency empty; KNN-edge fallback activated (E0 wire)",
            );
        }
        if self.hybrid.is_none() {
            self.hybrid = Some(Arc::new(DerivedHybridAdjacency {
                authored: authored.clone(),
                knn: (*knn).clone(),
            }));
        }
        Ok(self.hybrid.clone().expect("hybrid slot populated above")
            as Arc<dyn AdjacencyIndex + Send + Sync>)
    }

    /// C3 Patch-B: community assignment over the hybrid adjacency.
    /// Falls through to the authored-only path when `vector` is absent
    /// or empty, or when authored edges are already non-empty.
    pub fn hybrid_community_for(
        &mut self,
        repo: &ReadonlyRepo,
        vector: Option<&BruteForceVectorIndex>,
    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
        self.reconcile(repo);
        if let Some(c) = &self.community {
            return Ok(c.clone());
        }
        let adj = self.hybrid_adjacency_for(repo, vector)?;
        // compute_communities takes `&dyn AdjacencyIndex`; `as_ref()`
        // hands it the trait object directly.
        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
        let arc = Arc::new(assignment);
        self.community = Some(arc.clone());
        Ok(arc)
    }

    /// Derive KNN edges from `vector` and cache them along with the
    /// `KnnEdgeIndex` CID as the cache key. Idempotent: returns early
    /// when the key already matches.
    fn ensure_knn_edges(
        &mut self,
        vector: &BruteForceVectorIndex,
    ) -> Result<(), crate::error::Error> {
        // Collect (ids, vecs) from the brute-force index. The rows
        // are borrowed from the index's flat buffer; the `.to_vec()`
        // below is where each per-source row actually gets copied.
        let mut ids: Vec<NodeId> = Vec::with_capacity(vector.len());
        let mut vecs: Vec<Vec<f32>> = Vec::with_capacity(vector.len());
        for (id, row) in vector.points_iter() {
            ids.push(id);
            vecs.push(row.to_vec());
        }
        let edges = mnem_ann::derive_knn_edges_from_vectors(
            &ids,
            &vecs,
            KNN_FALLBACK_K,
            mnem_ann::DistanceMetric::Cosine,
        );
        // Assemble a KnnEdgeIndex purely to compute the cache-key CID.
        // The `root_cid` is the content hash of (model, dim, ids); we
        // lean on the HybridAdjacency E0 substrate's existing
        // `compute_cid` so the key is stable across restarts given
        // identical vector content.
        let root_cid = vector_index_content_cid(vector, &ids)?;
        let idx = mnem_ann::KnnEdgeIndex {
            root_cid,
            k: KNN_FALLBACK_K,
            metric: mnem_ann::DistanceMetric::Cosine,
            edges,
        };
        let cid = idx
            .compute_cid()
            .map_err(|e| crate::error::Error::internal(format!("knn edge cid: {e}")))?;
        if self.knn_key.as_ref() == Some(&cid) && self.knn_edges.is_some() {
            return Ok(());
        }
        let triples: Vec<(NodeId, NodeId, f32)> = idx
            .edges
            .into_iter()
            .map(|e| (e.src, e.dst, e.weight))
            .collect();
        self.knn_edges = Some(Arc::new(triples));
        self.knn_key = Some(cid);
        self.hybrid = None; // invalidate derived combined view
        Ok(())
    }
}
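
#[cfg(test)]
mod graph_cache_reconcile_tests {
    //! Sketch pinning the Gap 10 demotion step in
    //! [`GraphCache::reconcile`]: on an op-id change, the current
    //! assignment moves to `community_stale` rather than being
    //! dropped. The stale key is faked with a dummy CID to simulate a
    //! prior op-id.
    use super::*;
    use mnem_core::id::{CODEC_RAW, Multihash};
    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};

    #[test]
    fn op_change_demotes_community_to_stale() {
        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");

        let mut gc = GraphCache::default();
        // Pretend the cache was built at some other op-id, with a
        // computed (here: empty-graph) assignment in the hot slot.
        gc.key = Some(Cid::new(CODEC_RAW, Multihash::sha2_256(b"stale op")));
        gc.community = Some(Arc::new(mnem_graphrag::community::compute_communities(
            &AuthoredEdges::default(),
            0,
        )));

        gc.reconcile(&repo);
        assert!(gc.key == Some(repo.op_id().clone()), "key adopts new op-id");
        assert!(gc.community.is_none(), "hot slot cleared");
        assert!(
            gc.community_stale.is_some(),
            "prior assignment demoted, not dropped"
        );
    }
}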

/// Derive a stable content-address for a [`BruteForceVectorIndex`] by
/// hashing `(model, dim, canonical_ids)`. Two vector indexes with the
/// same model, dimensionality and ID set share a CID, which is the
/// coarser grain we want for the KNN-edge cache key (the vectors
/// themselves live in the Prolly tree keyed by NodeId, so equal IDs
/// at the same op_id imply equal vector contents).
fn vector_index_content_cid(
    vector: &BruteForceVectorIndex,
    ids: &[NodeId],
) -> Result<Cid, crate::error::Error> {
    use mnem_core::codec::to_canonical_bytes;
    use mnem_core::id::{CODEC_RAW, Multihash};

    #[derive(serde::Serialize)]
    struct Preimage<'a> {
        tag: &'a str,
        model: &'a str,
        dim: u32,
        ids: &'a [NodeId],
    }

    let pre = Preimage {
        tag: "mnem-http/knn-fallback/v1",
        model: vector.model(),
        dim: vector.dim(),
        ids,
    };
    let body = to_canonical_bytes(&pre)
        .map_err(|e| crate::error::Error::internal(format!("canonical encode: {e}")))?;
    let hash = Multihash::sha2_256(&body);
    Ok(Cid::new(CODEC_RAW, hash))
}
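
#[cfg(test)]
mod content_cid_tests {
    //! Sketch pinning the grain of [`vector_index_content_cid`]: the
    //! CID is a pure function of (model, dim, ids), so two empty
    //! indexes with the same shape share one, and a model change
    //! produces a different one.
    use super::*;

    #[test]
    fn cid_is_stable_for_same_model_dim_ids() {
        let a = BruteForceVectorIndex::empty("m", 3);
        let b = BruteForceVectorIndex::empty("m", 3);
        let cid_a = vector_index_content_cid(&a, &[]).ok().expect("cid a");
        let cid_b = vector_index_content_cid(&b, &[]).ok().expect("cid b");
        assert_eq!(cid_a, cid_b, "same preimage, same CID");

        let c = BruteForceVectorIndex::empty("other-model", 3);
        let cid_c = vector_index_content_cid(&c, &[]).ok().expect("cid c");
        assert_ne!(cid_a, cid_c, "model is folded into the preimage");
    }
}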

/// Gap 10 Phase-1: distinct node count walking adjacency edges.
fn authored_node_count(adj: &(dyn AdjacencyIndex + Send + Sync)) -> usize {
    use std::collections::BTreeSet;
    let mut seen: BTreeSet<NodeId> = BTreeSet::new();
    for e in adj.iter_edges() {
        seen.insert(e.src);
        seen.insert(e.dst);
    }
    seen.len()
}
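
#[cfg(test)]
mod node_count_tests {
    //! Sketch: `authored_node_count` counts distinct endpoints, so a
    //! two-edge path over three nodes reports 3, not 4.
    use super::*;

    #[test]
    fn counts_distinct_endpoints() {
        let (a, b, c) = (NodeId::new_v7(), NodeId::new_v7(), NodeId::new_v7());
        let adj = AuthoredEdges {
            edges: vec![(a, b), (b, c)],
        };
        assert_eq!(authored_node_count(&adj), 3);
    }
}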

/// Walk the `commit.edges` Prolly tree once and collect every authored
/// edge as a `(src, dst)` pair in canonical traversal order.
fn collect_authored_edges(repo: &ReadonlyRepo) -> Result<AuthoredEdges, crate::error::Error> {
    let Some(commit) = repo.head_commit() else {
        return Ok(AuthoredEdges::default());
    };
    let bs = repo.blockstore().clone();
    let cursor = mnem_core::prolly::Cursor::new(&*bs, &commit.edges)
        .map_err(|e| crate::error::Error::internal(format!("opening edge cursor: {e}")))?;
    let mut edges: Vec<(NodeId, NodeId)> = Vec::new();
    for entry in cursor {
        let (_key, edge_cid) =
            entry.map_err(|e| crate::error::Error::internal(format!("walking edge tree: {e}")))?;
        let bytes = bs
            .get(&edge_cid)
            .map_err(|e| crate::error::Error::internal(format!("fetching edge block: {e}")))?
            .ok_or_else(|| {
                crate::error::Error::internal(format!("edge block {edge_cid} missing"))
            })?;
        let edge: mnem_core::objects::Edge = mnem_core::codec::from_canonical_bytes(&bytes)
            .map_err(|e| crate::error::Error::internal(format!("decoding edge: {e}")))?;
        edges.push((edge.src, edge.dst));
    }
    Ok(AuthoredEdges { edges })
}
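
#[cfg(test)]
mod collect_authored_edges_tests {
    //! Sketch pinning the no-commit base case of
    //! [`collect_authored_edges`]: a freshly-initialised repo has no
    //! head commit, so the walk yields an empty edge list.
    use super::*;
    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};

    #[test]
    fn fresh_repo_yields_no_edges() {
        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
        let authored = collect_authored_edges(&repo).ok().expect("walk ok");
        assert!(authored.edges.is_empty());
    }
}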

#[cfg(test)]
pub(crate) mod test_support {
    //! Test-only builders for [`AppState`]. Scoped `pub(crate)` so
    //! sibling modules (`auth`, `routes::remote`) can build a minimal
    //! in-memory state without paying the full `app_with_options`
    //! path (redb open + config load) for every unit test.

    use super::*;
    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};

    /// Build an `AppState` backed by in-memory stores, with an
    /// optional push token. Used by extractor and route unit tests.
    pub(crate) fn state_with_token(token: Option<String>) -> AppState {
        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
        AppState {
            repo: Arc::new(Mutex::new(repo)),
            embed_cfg: None,
            sparse_cfg: None,
            indexes: Arc::new(Mutex::new(IndexCache::default())),
            allow_labels: false,
            metrics: Metrics::new(),
            push_token: token,
            graph_cache: Arc::new(Mutex::new(GraphCache::default())),
            traverse_cfg: Arc::new(crate::routes::traverse::TraverseAnswerCfg::default()),
            ner_cfg: None,
        }
    }
}

#[cfg(test)]
mod knn_fallback_tests {
    //! C3 Patch-B unit tests: `GraphCache` KNN-edge fallback activation
    //! and zero-impact short-circuits.

    use super::*;
    use bytes::Bytes;
    use mnem_core::objects::node::{Dtype, Embedding};
    use mnem_core::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};

    fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
        (
            Arc::new(MemoryBlockstore::new()),
            Arc::new(MemoryOpHeadsStore::new()),
        )
    }

    fn f32_embed(model: &str, v: &[f32]) -> Embedding {
        let mut bytes = Vec::with_capacity(v.len() * 4);
        for x in v {
            bytes.extend_from_slice(&x.to_le_bytes());
        }
        Embedding {
            model: model.to_string(),
            dtype: Dtype::F32,
            dim: u32::try_from(v.len()).expect("test vec fits in u32"),
            vector: Bytes::from(bytes),
        }
    }

    fn build_vector_index(rows: &[(NodeId, Vec<f32>)]) -> BruteForceVectorIndex {
        let mut idx = BruteForceVectorIndex::empty("m", 3);
        for (id, v) in rows {
            let inserted = idx.try_insert(*id, &f32_embed("m", v));
            assert!(inserted, "embedding insert");
        }
        idx
    }

    #[test]
    fn empty_authored_plus_empty_vector_is_no_op() {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
        let mut gc = GraphCache::default();
        let adj = gc.hybrid_adjacency_for(&repo, None).ok().expect("no-op");
        assert_eq!(adj.edge_count(), 0, "no vectors -> no KNN fallback");
        assert!(gc.knn_edges.is_none());
    }

    #[test]
    fn empty_authored_plus_populated_vector_activates_fallback() {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
        // 4 distinct L2-direction vectors; KNN k=32 (capped at n-1=3)
        // yields at least one directed edge per source.
        let rows: Vec<(NodeId, Vec<f32>)> = vec![
            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
            (NodeId::new_v7(), vec![0.9, 0.1, 0.0]),
            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
            (NodeId::new_v7(), vec![0.0, 0.0, 1.0]),
        ];
        let vec_idx = build_vector_index(&rows);

        let mut gc = GraphCache::default();
        let adj = gc
            .hybrid_adjacency_for(&repo, Some(&vec_idx))
            .ok()
            .expect("knn fallback ok");
        assert!(
            adj.edge_count() > 0,
            "KNN fallback must produce at least one edge (got 0)",
        );
        assert!(gc.knn_edges.is_some(), "knn_edges slot populated");
        assert!(gc.knn_key.is_some(), "knn cache key populated");

        // Community assignment must see the derived graph: at least one
        // community membership lookup must be Some (non-trivial graph).
        let assignment = gc
            .hybrid_community_for(&repo, Some(&vec_idx))
            .ok()
            .expect("community ok");
        let any_assigned = rows
            .iter()
            .any(|(id, _)| assignment.community_of(*id).is_some());
        assert!(
            any_assigned,
            "at least one node must have a community under a non-empty adjacency",
        );
    }

    #[test]
    fn knn_fallback_is_idempotent_on_same_vector() {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
        let rows: Vec<(NodeId, Vec<f32>)> = vec![
            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
        ];
        let vec_idx = build_vector_index(&rows);
        let mut gc = GraphCache::default();
        let _ = gc
            .hybrid_adjacency_for(&repo, Some(&vec_idx))
            .ok()
            .expect("first build");
        let first_key = gc.knn_key.clone().expect("first build populates key");
        // Second call must re-use the cached slot (same op_id + same
        // vector content -> same KnnEdgeIndex CID).
        let _ = gc
            .hybrid_adjacency_for(&repo, Some(&vec_idx))
            .ok()
            .expect("second build");
        let second_key = gc.knn_key.clone().expect("second build populates key");
        assert_eq!(first_key, second_key, "KNN cache key stable across calls");
    }
}
1058}