mnem_http/state.rs
1//! Shared app state. One `ReadonlyRepo` per server, behind an
2//! `Arc<Mutex>` because redb holds an exclusive file lock so we can
3//! only have one open per process. Writes take the lock, run a
4//! transaction, and replace the repo with the post-commit value.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10
11use mnem_core::id::{Cid, NodeId};
12use mnem_core::index::hybrid::{AdjEdge, AdjacencyIndex, EdgeProvenance};
13use mnem_core::index::{BruteForceVectorIndex, SparseInvertedIndex, VectorIndex};
14use mnem_core::repo::ReadonlyRepo;
15
16use crate::metrics::{LeidenModeLabels, Metrics};
17
/// Default per-source neighbour count for the KNN-edge fallback. 32
/// matches the `k=32, metric=cosine` determinism contract the
/// `KnnEdgeIndex::compute_cid` key derivation bakes in.
///
/// The same value is passed as `k` to
/// `mnem_ann::derive_knn_edges_from_vectors` and recorded as `k` in the
/// `KnnEdgeIndex` whose CID keys the KNN-edge cache, so changing it
/// changes the derived edges (and, presumably, their cache key).
const KNN_FALLBACK_K: u32 = 32;

/// One-shot guard so the "KNN-edge fallback activated" info-level log
/// fires once per process lifetime (not once per retrieve). Keeps the
/// prod log from flooding while still emitting the "yes, E0 wire is
/// live" breadcrumb.
///
/// Flipped with `swap(true, Ordering::Relaxed)`: only the caller that
/// observes the previous `false` emits the log. The flag guards no other
/// data, so relaxed ordering is sufficient.
static KNN_FALLBACK_LOGGED: AtomicBool = AtomicBool::new(false);
28
29// ---------------------------------------------------------------
30// Gap 10 Phase-1: community-cache invalidation tunables
31// ---------------------------------------------------------------
32
/// Gap 10 R6 floor-c tunable: commit-storm DoS cap per minute. Default 60.
///
/// Seeds `LeidenCache::storm_cap_per_min` (via `LeidenCache::default`).
/// Once the arrivals recorded in `LeidenCache::commit_arrivals` reach
/// this cap, `LeidenCache::select_mode` serves `LeidenMode::FallbackStale`.
///
/// `#tunable: default=60; rationale="attacker cannot amplify beyond human commit rate"`
pub const COMMIT_STORM_CAP_PER_MIN: u32 = 60;

/// Gap 10 R6 floor-c tunable: fraction of the graph that must change
/// in one commit before an incremental recompute path force-flips to
/// full. Exported today as a gauge-visible constant; consulted by the
/// debounced-full recompute loop when deciding whether to bypass the
/// incremental shortcut.
///
/// NOTE(review): in this file the value is only published, scaled by
/// 10_000, to the `leiden_delta_ratio_effective` gauge from
/// `GraphCache::community_for_head`; the consuming recompute loop is not
/// visible here. Confirm it exists before relying on this description.
///
/// `#tunable: default=0.5; rationale="half-graph change = incremental not cheaper than full"`
pub const DELTA_RATIO_FORCE_FULL: f32 = 0.5;

/// Gap 10 Phase-1 graph-size gate: above this node count the hot path
/// refuses to run full-Leiden inline and serves `FallbackStale`.
///
/// `LeidenCache::select_mode` returns `FallbackStale` when
/// `node_count >= GRAPH_SIZE_GATE_V`. On a cold cache (no current or stale
/// assignment), `GraphCache::community_for_head` still computes one
/// assignment inline so it has something to serve.
///
/// `#tunable: default=250_000; rationale="HNSW memory derivation; see benchmarks/leiden-wallclock-vs-V.md"`
pub const GRAPH_SIZE_GATE_V: usize = 250_000;

/// Gap 10 R3 debounce floor, in milliseconds. `derive_debounce_ms` never
/// returns less than this, even when no latency samples exist yet.
pub const DEBOUNCE_FLOOR_MS: u64 = 1_000;

/// Size of the rolling ring buffer used for p75 commit-latency.
/// `LeidenCache::observe_commit_latency` evicts the oldest sample once
/// this many are held.
pub const COMMIT_LATENCY_WINDOW: usize = 100;
58
59/// Gap 10 R6 code-sketch API: runtime-derived debounce window.
60#[must_use]
61pub fn derive_debounce_ms(rolling_p75_commit_ms: Option<u64>) -> u64 {
62 rolling_p75_commit_ms
63 .map(|p| p.max(DEBOUNCE_FLOOR_MS))
64 .unwrap_or(DEBOUNCE_FLOOR_MS)
65}
66
/// Gap 10 Phase-1 recompute-mode enum. Closed vocabulary: each variant
/// maps to exactly one Prometheus label and one gauge value.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum LeidenMode {
    /// Immediate full recompute on every commit. Env-forced.
    Full,
    /// Debounced full recompute (a future version default).
    FullDebounced,
    /// Served prior commit's assignment; refresh suppressed.
    FallbackStale,
}

impl LeidenMode {
    /// Single source of truth for the `(label, gauge)` encoding of each
    /// variant, so the two public encoders cannot drift apart.
    fn encoding(self) -> (&'static str, i64) {
        match self {
            LeidenMode::Full => ("full", 0),
            LeidenMode::FullDebounced => ("full_debounced", 1),
            LeidenMode::FallbackStale => ("fallback_stale", 2),
        }
    }

    /// Prometheus label string (`mode=...`).
    #[must_use]
    pub fn label(&self) -> &'static str {
        let (text, _) = self.encoding();
        text
    }

    /// Gauge encoding for `mnem_leiden_mode_current`
    /// (`0` = full, `1` = full_debounced, `2` = fallback_stale).
    #[must_use]
    pub fn gauge_value(&self) -> i64 {
        let (_, gauge) = self.encoding();
        gauge
    }

    /// Resolve default mode from env at startup.
    ///
    /// `MNEM_LEIDEN_FULL_RECOMPUTE` unset, non-UTF-8, blank, or one of
    /// `0`/`false`/`no`/`off` (case-insensitive, trimmed) yields
    /// [`LeidenMode::FullDebounced`]; any other value forces
    /// [`LeidenMode::Full`].
    #[must_use]
    pub fn resolve_default_from_env() -> Self {
        const OPT_OUT: [&str; 4] = ["0", "false", "no", "off"];
        let Ok(raw) = std::env::var("MNEM_LEIDEN_FULL_RECOMPUTE") else {
            return Self::FullDebounced;
        };
        let normalized = raw.trim().to_ascii_lowercase();
        if normalized.is_empty() || OPT_OUT.contains(&normalized.as_str()) {
            Self::FullDebounced
        } else {
            Self::Full
        }
    }
}
115
116/// Gap 10 Phase-1 debounce + storm-cap state.
117#[derive(Debug)]
118pub struct LeidenCache {
119 /// Rolling ring of commit wall-clock latencies (milliseconds).
120 pub commit_latency_ms: VecDeque<u64>,
121 /// Instant of the most recent successful full recompute.
122 pub last_recompute_at: Option<Instant>,
123 /// Ring of commit arrivals inside the trailing 60s.
124 pub commit_arrivals: VecDeque<Instant>,
125 /// Default mode at process start.
126 pub default_mode: LeidenMode,
127 /// Effective storm cap (operator-overridable).
128 pub storm_cap_per_min: u32,
129}
130
131impl Default for LeidenCache {
132 fn default() -> Self {
133 Self {
134 commit_latency_ms: VecDeque::with_capacity(COMMIT_LATENCY_WINDOW),
135 last_recompute_at: None,
136 commit_arrivals: VecDeque::new(),
137 default_mode: LeidenMode::FullDebounced,
138 storm_cap_per_min: COMMIT_STORM_CAP_PER_MIN,
139 }
140 }
141}
142
143impl LeidenCache {
144 /// Record a completed-commit wall-time sample.
145 pub fn observe_commit_latency(&mut self, latency: Duration) {
146 let ms = u64::try_from(latency.as_millis()).unwrap_or(u64::MAX);
147 if self.commit_latency_ms.len() == COMMIT_LATENCY_WINDOW {
148 self.commit_latency_ms.pop_front();
149 }
150 self.commit_latency_ms.push_back(ms);
151 }
152
153 /// Record a commit arrival; evicts entries older than 60s.
154 pub fn observe_commit_arrival(&mut self, at: Instant) {
155 let cutoff = at.checked_sub(Duration::from_mins(1)).unwrap_or(at);
156 while let Some(front) = self.commit_arrivals.front() {
157 if *front < cutoff {
158 self.commit_arrivals.pop_front();
159 } else {
160 break;
161 }
162 }
163 self.commit_arrivals.push_back(at);
164 }
165
166 /// Nearest-rank p75 of the rolling commit-latency ring.
167 #[must_use]
168 pub fn rolling_p75_commit_ms(&self) -> Option<u64> {
169 if self.commit_latency_ms.is_empty() {
170 return None;
171 }
172 let mut sorted: Vec<u64> = self.commit_latency_ms.iter().copied().collect();
173 sorted.sort_unstable();
174 let n = sorted.len();
175 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
176 let idx = ((n as f64 * 0.75).ceil() as usize)
177 .saturating_sub(1)
178 .min(n - 1);
179 Some(sorted[idx])
180 }
181
182 /// Effective debounce window via [`derive_debounce_ms`].
183 #[must_use]
184 pub fn effective_debounce_ms(&self) -> u64 {
185 derive_debounce_ms(self.rolling_p75_commit_ms())
186 }
187
188 /// Is the 60s commit-arrival ring at or above the storm cap?
189 #[must_use]
190 pub fn storm_cap_reached(&self) -> bool {
191 u32::try_from(self.commit_arrivals.len()).unwrap_or(u32::MAX) >= self.storm_cap_per_min
192 }
193
194 /// Pure policy function. Returns the mode to serve this call.
195 #[must_use]
196 pub fn select_mode(&self, node_count: usize, now: Instant) -> LeidenMode {
197 if self.default_mode == LeidenMode::Full {
198 return LeidenMode::Full;
199 }
200 if node_count >= GRAPH_SIZE_GATE_V {
201 return LeidenMode::FallbackStale;
202 }
203 if self.storm_cap_reached() {
204 return LeidenMode::FallbackStale;
205 }
206 if let Some(last) = self.last_recompute_at {
207 let elapsed_ms =
208 u64::try_from(now.saturating_duration_since(last).as_millis()).unwrap_or(u64::MAX);
209 if elapsed_ms < self.effective_debounce_ms() {
210 return LeidenMode::FallbackStale;
211 }
212 }
213 LeidenMode::FullDebounced
214 }
215}
216
/// Shared application state passed to every axum handler via
/// `State<AppState>`. Clones are cheap: every heavyweight member is behind
/// an `Arc` (or is itself `Arc`-backed, like `Metrics`).
#[derive(Clone)]
pub struct AppState {
    /// The open repo, held behind a mutex because redb keeps an
    /// exclusive file lock per-process. Writes replace the value
    /// inside the mutex with the post-commit `ReadonlyRepo`.
    pub repo: Arc<Mutex<ReadonlyRepo>>,
    /// Optional embed-provider config resolved from the repo's
    /// `config.toml` at startup.
    pub embed_cfg: Option<mnem_embed_providers::ProviderConfig>,
    /// Optional sparse-provider config resolved from the repo's
    /// `config.toml` at startup. When present, POST `/v1/nodes` and
    /// POST `/v1/nodes/bulk` auto-populate `Node.sparse_embed` on
    /// ingest, and `/v1/retrieve` auto-encodes the query via
    /// `SparseEncoder::encode_query` so the neural-sparse lane fires
    /// end-to-end.
    pub sparse_cfg: Option<mnem_sparse_providers::ProviderConfig>,
    /// Index cache keyed by the repo's op-id (see
    /// [`IndexCache::reconcile`]). Fixes audit gap G1: without this,
    /// every `/v1/retrieve` call rebuilt the vector + sparse indexes
    /// from scratch (O(N) per query). With this, the first retrieve
    /// after a write pays the build cost; later retrieves at the same
    /// op-id reuse the built index.
    ///
    /// Invalidation is automatic: every write path produces a new
    /// op-id (via `Transaction::commit -> ReadonlyRepo`). The cache
    /// sees the mismatch on its next access and evicts.
    pub indexes: Arc<Mutex<IndexCache>>,
    /// Whether the server accepts caller-supplied `label` values on
    /// ingest and `label` filters on retrieve. Read from the
    /// `MNEM_BENCH` environment variable at startup (see
    /// [`AppState::parse_allow_labels`] for the accepted strings).
    ///
    /// Defaults to `false`. Casual / single-tenant / personal-graph
    /// installations keep label hidden: every ingested node gets
    /// `ntype = Node::DEFAULT_NTYPE` ("Node") regardless of what the
    /// caller sent, and retrieve ignores any label filter. No way to
    /// flip this via a CLI flag or request body - operators opt in by
    /// setting `MNEM_BENCH=1` at server launch, which is how the
    /// reference benchmark harnesses in this repo pin per-item
    /// isolation. Zero surface area for a regular user to stumble
    /// into label-scoped state.
    pub allow_labels: bool,
    /// Prometheus metrics registry shared with the `/metrics` route
    /// and the `track_metrics` middleware. Clones are cheap (`Arc`
    /// inside); no per-request allocation.
    pub metrics: Metrics,
    /// Bearer token that authorises `/remote/v1/push-blocks` and
    /// `/remote/v1/advance-head`. Read from `MNEM_HTTP_PUSH_TOKEN`
    /// at startup (trimmed; blank counts as unset). `None` means those
    /// routes are administratively disabled (fail-closed): they return
    /// 503 regardless of whatever the caller presented.
    ///
    /// The token never touches disk and is never emitted to tracing.
    /// See [`crate::auth`] for the extractor that enforces the
    /// check.
    pub push_token: Option<String>,
    /// C3 FIX-1 + FIX-2: authored-edges adjacency + Leiden community
    /// assignment cache, keyed on the repo's op-id. Populated lazily
    /// on the first retrieve that asks for `community_filter=true` or
    /// `graph_mode="ppr"`. Invalidated whenever the op-id changes
    /// (any write path). Single-slot cache: one authored-adjacency
    /// snapshot is shared across requests until the next commit.
    pub graph_cache: Arc<Mutex<GraphCache>>,
    /// Gap 09 - `/v1/traverse_answer` runtime config. Default OFF per
    /// architect Decision 4; opt in via
    /// `[experimental] single_call_multihop = true` in the repo's
    /// `config.toml`. Wrapped in `Arc` for cheap clones across handler
    /// dispatch.
    pub traverse_cfg: Arc<crate::routes::traverse::TraverseAnswerCfg>,
}
288
289impl AppState {
290 /// Read `MNEM_BENCH` at startup. See [`Self::parse_allow_labels`]
291 /// for the truthy / falsy string rules.
292 #[must_use]
293 pub fn resolve_allow_labels_from_env() -> bool {
294 Self::parse_allow_labels(std::env::var("MNEM_BENCH").ok().as_deref())
295 }
296
297 /// Read `MNEM_HTTP_PUSH_TOKEN` at startup. Empty / unset -> `None`
298 /// (writes fail-closed). See [`crate::auth::RequireBearer`] for the
299 /// extractor that enforces the check.
300 #[must_use]
301 pub fn resolve_push_token_from_env() -> Option<String> {
302 let raw = std::env::var("MNEM_HTTP_PUSH_TOKEN").ok()?;
303 let trimmed = raw.trim();
304 if trimmed.is_empty() {
305 None
306 } else {
307 Some(trimmed.to_string())
308 }
309 }
310
311 /// Pure parser for the `MNEM_BENCH` value. `None` (unset) is
312 /// false. Falsy strings (`"0"`, `"false"`, `"no"`, `"off"`, empty,
313 /// all case-insensitive) are false. Anything else is true.
314 #[must_use]
315 pub fn parse_allow_labels(val: Option<&str>) -> bool {
316 match val {
317 None => false,
318 Some(s) => {
319 let t = s.trim();
320 if t.is_empty() {
321 return false;
322 }
323 let l = t.to_ascii_lowercase();
324 !matches!(l.as_str(), "0" | "false" | "no" | "off")
325 }
326 }
327 }
328}
329
/// Server-side cache of built retrieval indexes. Keyed on the repo's
/// current op-id (see [`IndexCache::reconcile`]); when that changes,
/// the whole cache is treated as stale and rebuilt on demand.
#[derive(Default)]
pub struct IndexCache {
    /// Repo op-id the current cache was built against. `None` on
    /// startup or after a reset. When this differs from the repo's
    /// current op-id, every entry below is considered stale.
    ///
    /// Named `cache_key_op_id` (not `commit_cid`) because op-id is
    /// what we key on: op-id exists on a freshly-initialised repo
    /// with no commits yet, which the commit CID does not.
    pub cache_key_op_id: Option<Cid>,
    /// Dense vector indexes keyed by the embedder's `model` string
    /// (e.g. `"ollama:nomic-embed-text"`). One repo can carry multiple
    /// model families if the caller runs several retrieves with
    /// different `vector_model`s. Each entry is built lazily by
    /// [`IndexCache::vector_index`].
    pub vectors: HashMap<String, Arc<BruteForceVectorIndex>>,
    /// Sparse indexes keyed by `SparseEmbed::vocab_id`, built lazily by
    /// [`IndexCache::sparse_index`].
    pub sparse: HashMap<String, Arc<SparseInvertedIndex>>,
}
351
#[cfg(test)]
mod mnem_bench_parse_tests {
    use super::*;

    /// Inputs that must keep labels hidden: blank or an explicit opt-out.
    const FALSY_INPUTS: [&str; 13] = [
        "", "0", "false", "FALSE", "False", "no", "No", "NO", "off", "Off", "OFF", " ", " 0 ",
    ];

    /// Inputs that must enable labels: anything outside the falsy set.
    const TRUTHY_INPUTS: [&str; 7] = ["1", "true", "yes", "on", "YES", "benchmark", "anything"];

    /// An unset `MNEM_BENCH` keeps labels hidden.
    #[test]
    fn unset_parses_false() {
        let parsed = AppState::parse_allow_labels(None);
        assert!(!parsed);
    }

    /// Every falsy spelling, including blank and padded ones, parses false.
    #[test]
    fn falsy_strings_parse_false() {
        for v in FALSY_INPUTS {
            let parsed = AppState::parse_allow_labels(Some(v));
            assert!(!parsed, "expected `{v:?}` to parse false");
        }
    }

    /// Any non-falsy string parses true.
    #[test]
    fn truthy_strings_parse_true() {
        for v in TRUTHY_INPUTS {
            let parsed = AppState::parse_allow_labels(Some(v));
            assert!(parsed, "expected `{v:?}` to parse true");
        }
    }
}
383
384impl IndexCache {
385 /// Reconcile the cache against the repo's current op-id. Called
386 /// on every cache access. When the op has changed (any write
387 /// flips it), all built indexes are dropped and the next getter
388 /// will rebuild.
389 ///
390 /// We key on `repo.op_id()` rather than `head_commit().cid`
391 /// because the op-id exists on freshly-initialised repos that
392 /// have no commits yet, and it changes on every write. Commit
393 /// CID would only change after the first real commit, leaving
394 /// an init-time cache entry with no way to invalidate.
395 pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
396 let current = Some(repo.op_id().clone());
397 if self.cache_key_op_id != current {
398 self.cache_key_op_id = current;
399 self.vectors.clear();
400 self.sparse.clear();
401 }
402 }
403
404 /// Fetch (or build) a dense vector index for `model`.
405 pub fn vector_index(
406 &mut self,
407 repo: &ReadonlyRepo,
408 model: &str,
409 ) -> Result<Arc<BruteForceVectorIndex>, mnem_core::Error> {
410 self.reconcile(repo);
411 if let Some(idx) = self.vectors.get(model) {
412 return Ok(idx.clone());
413 }
414 let idx = Arc::new(repo.build_vector_index(model)?);
415 self.vectors.insert(model.to_string(), idx.clone());
416 Ok(idx)
417 }
418
419 /// Fetch (or build) a sparse inverted index for `vocab_id`.
420 pub fn sparse_index(
421 &mut self,
422 repo: &ReadonlyRepo,
423 vocab_id: &str,
424 ) -> Result<Arc<SparseInvertedIndex>, mnem_core::Error> {
425 self.reconcile(repo);
426 if let Some(idx) = self.sparse.get(vocab_id) {
427 return Ok(idx.clone());
428 }
429 let idx = Arc::new(SparseInvertedIndex::build_from_repo(repo, vocab_id)?);
430 self.sparse.insert(vocab_id.to_string(), idx.clone());
431 Ok(idx)
432 }
433}
434
435// ---------------------------------------------------------------
436// C3 FIX-1 + FIX-2: authored-edges + community-assignment cache.
437// ---------------------------------------------------------------
438
439/// Owned authored-edge list over which `AdjacencyIndex` is served.
440/// Collected once per op-id from the repo's commit.edges Prolly tree;
441/// Leiden community detection and PPR power iteration both read
442/// through the same snapshot.
443#[derive(Default, Debug)]
444pub struct AuthoredEdges {
445 /// Flat (src, dst) list in Prolly traversal order. Weighted `1.0`
446 /// by `iter_edges()` consumers; provenance stamped `Authored`.
447 pub edges: Vec<(NodeId, NodeId)>,
448}
449
450impl AdjacencyIndex for AuthoredEdges {
451 fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
452 Box::new(self.edges.iter().map(|(s, d)| AdjEdge {
453 src: *s,
454 dst: *d,
455 weight: 1.0,
456 provenance: EdgeProvenance::Authored,
457 }))
458 }
459 fn edge_count(&self) -> usize {
460 self.edges.len()
461 }
462}
463
464/// C3 Patch-B: combined authored + KNN-derived adjacency, owned. Used
465/// by `GraphCache` to back both the Leiden community detector and the
466/// PPR adjacency wire in a single `AdjacencyIndex` impl without the
467/// lifetime acrobatics `mnem_core::HybridAdjacency<A,K>` imposes (it
468/// borrows both sources). Structurally equivalent to a
469/// `HybridAdjacency<AuthoredEdges, KnnSlice>` sent through an
470/// `Arc<dyn AdjacencyIndex + Send + Sync>` consumer.
471///
472/// # Flag-off / empty-KNN contract
473///
474/// When `knn` is empty, `iter_edges` yields exactly the authored
475/// stream in the authored source's native order, preserving the
476/// `mnem_core::tests::hybrid_adjacency_union` byte-identity contract.
477#[derive(Debug)]
478pub struct DerivedHybridAdjacency {
479 /// Authored-edge snapshot shared with `GraphCache::adjacency`.
480 pub authored: Arc<AuthoredEdges>,
481 /// KNN-derived edges in canonical `(src, dst)` ASC order. `weight`
482 /// is the similarity score, `provenance` is `Knn`.
483 pub knn: Vec<(NodeId, NodeId, f32)>,
484}
485
486impl AdjacencyIndex for DerivedHybridAdjacency {
487 fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
488 let authored = self.authored.edges.iter().map(|(s, d)| AdjEdge {
489 src: *s,
490 dst: *d,
491 weight: 1.0,
492 provenance: EdgeProvenance::Authored,
493 });
494 let knn = self.knn.iter().map(|(s, d, w)| AdjEdge {
495 src: *s,
496 dst: *d,
497 weight: *w,
498 provenance: EdgeProvenance::Knn,
499 });
500 Box::new(authored.chain(knn))
501 }
502 fn edge_count(&self) -> usize {
503 self.authored.edges.len() + self.knn.len()
504 }
505}
506
507/// C3 FIX + Patch-B: authored adjacency, derived-KNN edges, and the
508/// community assignment keyed on `op_id`. Single-slot: one snapshot
509/// is shared across requests until the next commit.
510///
511/// Two-key design:
512/// - `key` = repo `op_id`; any write invalidates everything.
513/// - `knn_key` = content address of the KNN-edge index
514/// (`hash(root_cid, k, metric)`) used to avoid rebuilding the
515/// KNN-derived edges when the same vector index is re-used across
516/// retrieves at the same `op_id`.
517#[derive(Default)]
518pub struct GraphCache {
519 /// Op-id the cache was built against. `None` means empty.
520 pub key: Option<Cid>,
521 /// Shared authored-adjacency snapshot.
522 pub adjacency: Option<Arc<AuthoredEdges>>,
523 /// C3 Patch-B: KNN-derived edges produced by
524 /// `mnem_ann::derive_knn_edges_from_vectors` over the current
525 /// vector index. `None` means either (a) the authored adjacency
526 /// was already non-empty so KNN fallback never fired, or (b) the
527 /// vector index was empty (nothing to derive from).
528 pub knn_edges: Option<Arc<Vec<(NodeId, NodeId, f32)>>>,
529 /// Cache key for `knn_edges`:
530 /// `(KnnEdgeIndex::compute_cid, k, metric_tag)` projected down to
531 /// the content-address CID. Stable across restarts given the same
532 /// vector-index contents + k + metric.
533 pub knn_key: Option<Cid>,
534 /// Hybrid (authored + KNN) adjacency ready to hand out as
535 /// `Arc<dyn AdjacencyIndex + Send + Sync>`. Rebuilt whenever
536 /// `adjacency` or `knn_edges` changes.
537 pub hybrid: Option<Arc<DerivedHybridAdjacency>>,
538 /// Shared Leiden community assignment over `hybrid` (falls back
539 /// to `adjacency` when KNN fallback is inactive).
540 pub community: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
541 /// C3 FIX-1: cached row-stochastic CSR matrix used by PPR.
542 /// Invalidated alongside `adjacency` / `knn_edges` so the matrix
543 /// always reflects the current op-id's adjacency. Shared across
544 /// retrieves at the same op-id; `mnem_core::ppr::ppr_with_matrix`
545 /// is byte-identical to the from-scratch `ppr()` path (pinned by
546 /// the `ppr_with_matrix_matches_ppr_on_small_graph` integration
547 /// test), so turning this cache on is a pure-speed change.
548 pub ppr_matrix: Option<Arc<mnem_core::ppr::SparseTransition>>,
549 /// Gap 10 Phase-1: prior-commit assignment surviving op-id churn.
550 pub community_stale: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
551 /// Gap 10 Phase-1: debounce + storm-cap policy state.
552 pub leiden_cache: LeidenCache,
553}
554
555impl GraphCache {
556 /// Invalidate if the repo's op-id has moved. Called on every
557 /// cache access, mirrors [`IndexCache::reconcile`].
558 pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
559 let current = Some(repo.op_id().clone());
560 if self.key != current {
561 self.key = current;
562 self.adjacency = None;
563 self.knn_edges = None;
564 self.knn_key = None;
565 self.hybrid = None;
566 // Gap 10 Phase-1: demote current assignment to `community_stale`
567 // so retrieves inside the debounce window can serve it.
568 if let Some(prev) = self.community.take() {
569 self.community_stale = Some(prev);
570 }
571 self.ppr_matrix = None;
572 }
573 }
574
575 /// Gap 10 Phase-1 entry point. Resolves the current mode via
576 /// [`LeidenCache::select_mode`], increments
577 /// `mnem_leiden_mode_total{mode=...}` + mirrors gauge triad, then
578 /// returns the assignment and the served mode.
579 pub fn community_for_head(
580 &mut self,
581 repo: &ReadonlyRepo,
582 vector: Option<&BruteForceVectorIndex>,
583 metrics: &Metrics,
584 ) -> Result<
585 (
586 Arc<mnem_graphrag::community::CommunityAssignment>,
587 LeidenMode,
588 ),
589 crate::error::Error,
590 > {
591 self.reconcile(repo);
592 let adj = self.hybrid_adjacency_for(repo, vector)?;
593 let node_count = authored_node_count(adj.as_ref());
594 let now = Instant::now();
595 let mode = self.leiden_cache.select_mode(node_count, now);
596
597 metrics
598 .leiden_debounce_effective
599 .set(i64::try_from(self.leiden_cache.effective_debounce_ms()).unwrap_or(i64::MAX));
600 metrics
601 .leiden_storm_cap_effective
602 .set(i64::from(self.leiden_cache.storm_cap_per_min));
603 #[allow(clippy::cast_possible_truncation)]
604 let delta_pp10k = (DELTA_RATIO_FORCE_FULL * 10_000.0) as i64;
605 metrics.leiden_delta_ratio_effective.set(delta_pp10k);
606 metrics.leiden_mode_current.set(mode.gauge_value());
607 metrics
608 .leiden_mode
609 .get_or_create(&LeidenModeLabels {
610 mode: mode.label().to_string(),
611 })
612 .inc();
613
614 match mode {
615 LeidenMode::Full | LeidenMode::FullDebounced => {
616 if let Some(c) = &self.community {
617 return Ok((c.clone(), mode));
618 }
619 let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
620 let arc = Arc::new(assignment);
621 self.community = Some(arc.clone());
622 if matches!(mode, LeidenMode::FullDebounced) {
623 self.leiden_cache.last_recompute_at = Some(now);
624 }
625 self.leiden_cache.observe_commit_arrival(now);
626 Ok((arc, mode))
627 }
628 LeidenMode::FallbackStale => {
629 if let Some(c) = &self.community {
630 return Ok((c.clone(), mode));
631 }
632 if let Some(c) = &self.community_stale {
633 return Ok((c.clone(), mode));
634 }
635 let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
636 let arc = Arc::new(assignment);
637 self.community_stale = Some(arc.clone());
638 Ok((arc, mode))
639 }
640 }
641 }
642
643 /// C3 FIX-1: fetch (or build) the row-stochastic CSR matrix used
644 /// by PPR, over the hybrid adjacency. The matrix is a pure
645 /// function of the adjacency; caching it lets repeated retrieves
646 /// at the same op-id skip the 3-pass CSR build (dominates PPR
647 /// cost at ~15 iters on small graphs).
648 ///
649 /// Byte-identity with the uncached `ppr()` path is pinned by the
650 /// `ppr_with_matrix_matches_ppr_on_small_graph` integration test
651 /// in mnem-core.
652 pub fn ppr_matrix_for(
653 &mut self,
654 repo: &ReadonlyRepo,
655 vector: Option<&BruteForceVectorIndex>,
656 ) -> Result<Arc<mnem_core::ppr::SparseTransition>, crate::error::Error> {
657 self.reconcile(repo);
658 if let Some(m) = &self.ppr_matrix {
659 return Ok(m.clone());
660 }
661 let adj = self.hybrid_adjacency_for(repo, vector)?;
662 let m = Arc::new(mnem_core::ppr::sparse_transition_matrix(adj.as_ref()));
663 self.ppr_matrix = Some(m.clone());
664 Ok(m)
665 }
666
667 /// Fetch (or build) the authored-adjacency snapshot. Back-compat
668 /// wrapper preserved for callers that do NOT want KNN fallback
669 /// (e.g. tests asserting the pure-authored path).
670 pub fn adjacency_for(
671 &mut self,
672 repo: &ReadonlyRepo,
673 ) -> Result<Arc<AuthoredEdges>, crate::error::Error> {
674 self.reconcile(repo);
675 if let Some(a) = &self.adjacency {
676 return Ok(a.clone());
677 }
678 let a = Arc::new(collect_authored_edges(repo)?);
679 self.adjacency = Some(a.clone());
680 Ok(a)
681 }
682
683 /// Fetch (or build) the Leiden community assignment over the
684 /// authored-only adjacency. Back-compat entry point.
685 pub fn community_for(
686 &mut self,
687 repo: &ReadonlyRepo,
688 ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
689 self.reconcile(repo);
690 if let Some(c) = &self.community {
691 return Ok(c.clone());
692 }
693 let adj = self.adjacency_for(repo)?;
694 let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
695 let arc = Arc::new(assignment);
696 self.community = Some(arc.clone());
697 Ok(arc)
698 }
699
700 /// C3 Patch-B entry point: fetch (or build) the authored adjacency;
701 /// when it is empty (and `vector` is non-empty), derive a KNN-edge
702 /// substrate via `mnem_ann::derive_knn_edges_from_vectors` and
703 /// cache a combined [`DerivedHybridAdjacency`].
704 ///
705 /// Returns a trait-object `Arc<dyn AdjacencyIndex + Send + Sync>`
706 /// so the retriever's `with_adjacency_index` consumer can accept
707 /// it without knowing whether the KNN lane fired.
708 ///
709 /// # Zero-impact contracts
710 ///
711 /// - Authored non-empty -> returns the authored snapshot; KNN lane
712 /// never runs; `knn_edges` stays `None`.
713 /// - `vector` is `None` or empty -> returns the authored snapshot
714 /// (possibly empty); no KNN fallback; preserves the "no graph,
715 /// no filter" legacy behaviour.
716 /// - Same `op_id` + same vector-index content -> served from the
717 /// single-slot cache; `derive_knn_edges_from_vectors` is not
718 /// re-run.
719 pub fn hybrid_adjacency_for(
720 &mut self,
721 repo: &ReadonlyRepo,
722 vector: Option<&BruteForceVectorIndex>,
723 ) -> Result<Arc<dyn AdjacencyIndex + Send + Sync>, crate::error::Error> {
724 self.reconcile(repo);
725 let authored = self.adjacency_for(repo)?;
726
727 // Zero-impact short-circuits.
728 if !authored.edges.is_empty() {
729 return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
730 }
731 let Some(vec_idx) = vector else {
732 return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
733 };
734 if vec_idx.is_empty() {
735 return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
736 }
737
738 // Fallback active: derive (or fetch cached) KNN edges. The
739 // `KnnEdgeIndex` CID folds in (root_cid, k, metric) so the
740 // cache key is stable across restarts given the same vector
741 // content.
742 self.ensure_knn_edges(vec_idx)?;
743 let knn = self
744 .knn_edges
745 .clone()
746 .expect("ensure_knn_edges populated the slot");
747 // One-shot info log the first time the fallback path wins.
748 if !KNN_FALLBACK_LOGGED.swap(true, Ordering::Relaxed) {
749 tracing::info!(
750 target: "mnem_http::graph_cache",
751 k = KNN_FALLBACK_K,
752 metric = "cosine",
753 knn_edges = knn.len(),
754 vector_model = %vec_idx.model(),
755 "authored adjacency empty; KNN-edge fallback activated (E0 wire)",
756 );
757 }
758 if self.hybrid.is_none() {
759 self.hybrid = Some(Arc::new(DerivedHybridAdjacency {
760 authored: authored.clone(),
761 knn: (*knn).clone(),
762 }));
763 }
764 Ok(self.hybrid.clone().expect("hybrid slot populated above")
765 as Arc<dyn AdjacencyIndex + Send + Sync>)
766 }
767
768 /// C3 Patch-B: community assignment over the hybrid adjacency.
769 /// Falls through to the authored-only path when `vector` is absent
770 /// or empty, or when authored edges are already non-empty.
771 pub fn hybrid_community_for(
772 &mut self,
773 repo: &ReadonlyRepo,
774 vector: Option<&BruteForceVectorIndex>,
775 ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
776 self.reconcile(repo);
777 if let Some(c) = &self.community {
778 return Ok(c.clone());
779 }
780 let adj = self.hybrid_adjacency_for(repo, vector)?;
781 // compute_communities takes `&dyn AdjacencyIndex`; the trait
782 // object's deref coerces.
783 let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
784 let arc = Arc::new(assignment);
785 self.community = Some(arc.clone());
786 Ok(arc)
787 }
788
789 /// Derive KNN edges from `vector` and cache them along with the
790 /// `KnnEdgeIndex` CID as the cache key. Idempotent: returns early
791 /// when the key already matches.
792 fn ensure_knn_edges(
793 &mut self,
794 vector: &BruteForceVectorIndex,
795 ) -> Result<(), crate::error::Error> {
796 // Collect (ids, vecs) from the brute-force index. The slices
797 // are borrowed from the index's flat buffer (zero copy up to
798 // the `.to_vec()` the KNN builder does internally when it
799 // clones per-source rows).
800 let mut ids: Vec<NodeId> = Vec::with_capacity(vector.len());
801 let mut vecs: Vec<Vec<f32>> = Vec::with_capacity(vector.len());
802 for (id, row) in vector.points_iter() {
803 ids.push(id);
804 vecs.push(row.to_vec());
805 }
806 let edges = mnem_ann::derive_knn_edges_from_vectors(
807 &ids,
808 &vecs,
809 KNN_FALLBACK_K,
810 mnem_ann::DistanceMetric::Cosine,
811 );
812 // Assemble a KnnEdgeIndex purely to compute the cache-key CID.
813 // The `root_cid` is the content hash of (model, dim, ids,
814 // flat_vecs); we lean on the HybridAdjacency E0 substrate's
815 // existing `compute_cid` so the key is stable across restarts
816 // given identical vector content.
817 let root_cid = vector_index_content_cid(vector, &ids)?;
818 let idx = mnem_ann::KnnEdgeIndex {
819 root_cid,
820 k: KNN_FALLBACK_K,
821 metric: mnem_ann::DistanceMetric::Cosine,
822 edges,
823 };
824 let cid = idx
825 .compute_cid()
826 .map_err(|e| crate::error::Error::internal(format!("knn edge cid: {e}")))?;
827 if self.knn_key.as_ref() == Some(&cid) && self.knn_edges.is_some() {
828 return Ok(());
829 }
830 let triples: Vec<(NodeId, NodeId, f32)> = idx
831 .edges
832 .into_iter()
833 .map(|e| (e.src, e.dst, e.weight))
834 .collect();
835 self.knn_edges = Some(Arc::new(triples));
836 self.knn_key = Some(cid);
837 self.hybrid = None; // invalidate derived combined view
838 Ok(())
839 }
840}
841
842/// Derive a stable content-address for a [`BruteForceVectorIndex`] by
843/// hashing `(model, dim, canonical_ids)`. Two vector indexes with the
844/// same model, dimensionality and ID set share a CID, which is the
845/// coarser grain we want for the KNN-edge cache key (the vectors
846/// themselves live in the Prolly tree keyed by NodeId, so equal IDs
847/// at the same op_id imply equal vector contents).
848fn vector_index_content_cid(
849 vector: &BruteForceVectorIndex,
850 ids: &[NodeId],
851) -> Result<Cid, crate::error::Error> {
852 use mnem_core::codec::to_canonical_bytes;
853 use mnem_core::id::{CODEC_RAW, Multihash};
854 #[derive(serde::Serialize)]
855 struct Preimage<'a> {
856 tag: &'a str,
857 model: &'a str,
858 dim: u32,
859 ids: &'a [NodeId],
860 }
861 let pre = Preimage {
862 tag: "mnem-http/knn-fallback/v1",
863 model: vector.model(),
864 dim: vector.dim(),
865 ids,
866 };
867 let body = to_canonical_bytes(&pre)
868 .map_err(|e| crate::error::Error::internal(format!("canonical encode: {e}")))?;
869 let hash = Multihash::sha2_256(&body);
870 Ok(Cid::new(CODEC_RAW, hash))
871}
872
873/// Gap 10 Phase-1: distinct node count walking adjacency edges.
874fn authored_node_count(adj: &(dyn AdjacencyIndex + Send + Sync)) -> usize {
875 use std::collections::BTreeSet;
876 let mut seen: BTreeSet<NodeId> = BTreeSet::new();
877 for e in adj.iter_edges() {
878 seen.insert(e.src);
879 seen.insert(e.dst);
880 }
881 seen.len()
882}
883
884/// Walk `commit.edges` Prolly tree once and collect every authored
885/// edge as a `(src, dst)` pair in canonical traversal order.
886fn collect_authored_edges(repo: &ReadonlyRepo) -> Result<AuthoredEdges, crate::error::Error> {
887 let Some(commit) = repo.head_commit() else {
888 return Ok(AuthoredEdges::default());
889 };
890 let bs = repo.blockstore().clone();
891 let cursor = mnem_core::prolly::Cursor::new(&*bs, &commit.edges)
892 .map_err(|e| crate::error::Error::internal(format!("opening edge cursor: {e}")))?;
893 let mut edges: Vec<(NodeId, NodeId)> = Vec::new();
894 for entry in cursor {
895 let (_key, edge_cid) =
896 entry.map_err(|e| crate::error::Error::internal(format!("walking edge tree: {e}")))?;
897 let bytes = bs
898 .get(&edge_cid)
899 .map_err(|e| crate::error::Error::internal(format!("fetching edge block: {e}")))?
900 .ok_or_else(|| {
901 crate::error::Error::internal(format!("edge block {edge_cid} missing"))
902 })?;
903 let edge: mnem_core::objects::Edge = mnem_core::codec::from_canonical_bytes(&bytes)
904 .map_err(|e| crate::error::Error::internal(format!("decoding edge: {e}")))?;
905 edges.push((edge.src, edge.dst));
906 }
907 Ok(AuthoredEdges { edges })
908}
909
910#[cfg(test)]
911pub(crate) mod test_support {
912 //! Test-only builders for [`AppState`]. Scoped `pub(crate)` so
913 //! sibling modules (`auth`, `routes::remote`) can build a minimal
914 //! in-memory state without paying the full `app_with_options`
915 //! path (redb open + config load) for every unit test.
916
917 use super::*;
918 use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};
919
920 /// Build an `AppState` backed by in-memory stores, with an
921 /// optional push token. Used by extractor and route unit tests.
922 pub(crate) fn state_with_token(token: Option<String>) -> AppState {
923 let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
924 let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
925 let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
926 AppState {
927 repo: Arc::new(Mutex::new(repo)),
928 embed_cfg: None,
929 sparse_cfg: None,
930 indexes: Arc::new(Mutex::new(IndexCache::default())),
931 allow_labels: false,
932 metrics: Metrics::new(),
933 push_token: token,
934 graph_cache: Arc::new(Mutex::new(GraphCache::default())),
935 traverse_cfg: Arc::new(crate::routes::traverse::TraverseAnswerCfg::default()),
936 }
937 }
938}
939
940#[cfg(test)]
941mod knn_fallback_tests {
942 //! C3 Patch-B unit tests: `GraphCache` KNN-edge fallback activation
943 //! and zero-impact short-circuits.
944
945 use super::*;
946 use bytes::Bytes;
947 use mnem_core::objects::node::{Dtype, Embedding};
948 use mnem_core::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
949
950 fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
951 (
952 Arc::new(MemoryBlockstore::new()),
953 Arc::new(MemoryOpHeadsStore::new()),
954 )
955 }
956
957 fn f32_embed(model: &str, v: &[f32]) -> Embedding {
958 let mut bytes = Vec::with_capacity(v.len() * 4);
959 for x in v {
960 bytes.extend_from_slice(&x.to_le_bytes());
961 }
962 Embedding {
963 model: model.to_string(),
964 dtype: Dtype::F32,
965 dim: u32::try_from(v.len()).expect("test vec fits in u32"),
966 vector: Bytes::from(bytes),
967 }
968 }
969
970 fn build_vector_index(rows: &[(NodeId, Vec<f32>)]) -> BruteForceVectorIndex {
971 let mut idx = BruteForceVectorIndex::empty("m", 3);
972 for (id, v) in rows {
973 let inserted = idx.try_insert(*id, &f32_embed("m", v));
974 assert!(inserted, "embedding insert");
975 }
976 idx
977 }
978
979 #[test]
980 fn empty_authored_plus_empty_vector_is_no_op() {
981 let (bs, ohs) = stores();
982 let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
983 let mut gc = GraphCache::default();
984 let adj = gc.hybrid_adjacency_for(&repo, None).ok().expect("no-op");
985 assert_eq!(adj.edge_count(), 0, "no vectors -> no KNN fallback");
986 assert!(gc.knn_edges.is_none());
987 }
988
989 #[test]
990 fn empty_authored_plus_populated_vector_activates_fallback() {
991 let (bs, ohs) = stores();
992 let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
993 // 4 distinct L2-direction vectors; KNN k=32 (capped at n-1=3)
994 // yields at least one directed edge per source.
995 let rows: Vec<(NodeId, Vec<f32>)> = vec![
996 (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
997 (NodeId::new_v7(), vec![0.9, 0.1, 0.0]),
998 (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
999 (NodeId::new_v7(), vec![0.0, 0.0, 1.0]),
1000 ];
1001 let vec_idx = build_vector_index(&rows);
1002
1003 let mut gc = GraphCache::default();
1004 let adj = gc
1005 .hybrid_adjacency_for(&repo, Some(&vec_idx))
1006 .ok()
1007 .expect("knn fallback ok");
1008 assert!(
1009 adj.edge_count() > 0,
1010 "KNN fallback must produce at least one edge (got 0)",
1011 );
1012 assert!(gc.knn_edges.is_some(), "knn_edges slot populated");
1013 assert!(gc.knn_key.is_some(), "knn cache key populated");
1014
1015 // Community assignment must see the derived graph: at least one
1016 // community membership lookup must be Some (non-trivial graph).
1017 let assignment = gc
1018 .hybrid_community_for(&repo, Some(&vec_idx))
1019 .ok()
1020 .expect("community ok");
1021 let any_assigned = rows
1022 .iter()
1023 .any(|(id, _)| assignment.community_of(*id).is_some());
1024 assert!(
1025 any_assigned,
1026 "at least one node must have a community under a non-empty adjacency",
1027 );
1028 }
1029
1030 #[test]
1031 fn knn_fallback_is_idempotent_on_same_vector() {
1032 let (bs, ohs) = stores();
1033 let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
1034 let rows: Vec<(NodeId, Vec<f32>)> = vec![
1035 (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
1036 (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
1037 ];
1038 let vec_idx = build_vector_index(&rows);
1039 let mut gc = GraphCache::default();
1040 let _ = gc
1041 .hybrid_adjacency_for(&repo, Some(&vec_idx))
1042 .ok()
1043 .expect("first build");
1044 let first_key = gc.knn_key.clone().expect("first build populates key");
1045 // Second call must re-use the cached slot (same op_id + same
1046 // vector content -> same KnnEdgeIndex CID).
1047 let _ = gc
1048 .hybrid_adjacency_for(&repo, Some(&vec_idx))
1049 .ok()
1050 .expect("second build");
1051 let second_key = gc.knn_key.clone().expect("second build populates key");
1052 assert_eq!(first_key, second_key, "KNN cache key stable across calls");
1053 }
1054}