Skip to main content

mnem_http/
state.rs

1//! Shared app state. One `ReadonlyRepo` per server, behind an
2//! `Arc<Mutex>` because redb holds an exclusive file lock so we can
3//! only have one open per process. Writes take the lock, run a
4//! transaction, and replace the repo with the post-commit value.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10
11use mnem_core::id::{Cid, NodeId};
12use mnem_core::index::hybrid::{AdjEdge, AdjacencyIndex, EdgeProvenance};
13use mnem_core::index::{BruteForceVectorIndex, SparseInvertedIndex, VectorIndex};
14use mnem_core::repo::ReadonlyRepo;
15
16use crate::metrics::{LeidenModeLabels, Metrics};
17
18/// Default per-source neighbour count for the KNN-edge fallback. 32
19/// matches the `k=32, metric=cosine` determinism contract the
20/// `KnnEdgeIndex::compute_cid` key derivation bakes in.
21const KNN_FALLBACK_K: u32 = 32;
22
23/// One-shot guard so the "KNN-edge fallback activated" info-level log
24/// fires once per process lifetime (not once per retrieve). Keeps the
25/// prod log from flooding while still emitting the "yes, E0 wire is
26/// live" breadcrumb.
27static KNN_FALLBACK_LOGGED: AtomicBool = AtomicBool::new(false);
28
29// ---------------------------------------------------------------
30// Gap 10 Phase-1: community-cache invalidation tunables
31// ---------------------------------------------------------------
32
33/// Gap 10 R6 floor-c tunable: commit-storm DoS cap per minute. Default 60.
34///
35/// `#tunable: default=60; rationale="attacker cannot amplify beyond human commit rate"`
36pub const COMMIT_STORM_CAP_PER_MIN: u32 = 60;
37
38/// Gap 10 R6 floor-c tunable: fraction of the graph that must change
39/// in one commit before an incremental recompute path force-flips to
40/// full. Exported today as a gauge-visible constant; consulted by the
41/// debounced-full recompute loop when deciding whether to bypass the
42/// incremental shortcut.
43///
44/// `#tunable: default=0.5; rationale="half-graph change = incremental not cheaper than full"`
45pub const DELTA_RATIO_FORCE_FULL: f32 = 0.5;
46
47/// Gap 10 Phase-1 graph-size gate: above this node count the hot path
48/// refuses to run full-Leiden inline and serves `FallbackStale`.
49///
50/// `#tunable: default=250_000; rationale="HNSW memory derivation; see benchmarks/leiden-wallclock-vs-V.md"`
51pub const GRAPH_SIZE_GATE_V: usize = 250_000;
52
53/// Gap 10 R3 debounce floor.
54pub const DEBOUNCE_FLOOR_MS: u64 = 1_000;
55
56/// Size of the rolling ring buffer used for p75 commit-latency.
57pub const COMMIT_LATENCY_WINDOW: usize = 100;
58
59/// Gap 10 R6 code-sketch API: runtime-derived debounce window.
60#[must_use]
61pub fn derive_debounce_ms(rolling_p75_commit_ms: Option<u64>) -> u64 {
62    rolling_p75_commit_ms
63        .map(|p| p.max(DEBOUNCE_FLOOR_MS))
64        .unwrap_or(DEBOUNCE_FLOOR_MS)
65}
66
67/// Gap 10 Phase-1 recompute-mode enum. Closed vocabulary.
68#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
69pub enum LeidenMode {
70    /// Immediate full recompute on every commit. Env-forced.
71    Full,
72    /// Debounced full recompute (a future version default).
73    FullDebounced,
74    /// Served prior commit's assignment; refresh suppressed.
75    FallbackStale,
76}
77
78impl LeidenMode {
79    /// Prometheus label string.
80    #[must_use]
81    pub fn label(&self) -> &'static str {
82        match self {
83            Self::Full => "full",
84            Self::FullDebounced => "full_debounced",
85            Self::FallbackStale => "fallback_stale",
86        }
87    }
88
89    /// Gauge encoding for `mnem_leiden_mode_current`.
90    #[must_use]
91    pub fn gauge_value(&self) -> i64 {
92        match self {
93            Self::Full => 0,
94            Self::FullDebounced => 1,
95            Self::FallbackStale => 2,
96        }
97    }
98
99    /// Resolve default mode from env at startup.
100    #[must_use]
101    pub fn resolve_default_from_env() -> Self {
102        match std::env::var("MNEM_LEIDEN_FULL_RECOMPUTE").ok() {
103            Some(v) => {
104                let t = v.trim().to_ascii_lowercase();
105                if t.is_empty() || matches!(t.as_str(), "0" | "false" | "no" | "off") {
106                    Self::FullDebounced
107                } else {
108                    Self::Full
109                }
110            }
111            None => Self::FullDebounced,
112        }
113    }
114}
115
116/// Gap 10 Phase-1 debounce + storm-cap state.
117#[derive(Debug)]
118pub struct LeidenCache {
119    /// Rolling ring of commit wall-clock latencies (milliseconds).
120    pub commit_latency_ms: VecDeque<u64>,
121    /// Instant of the most recent successful full recompute.
122    pub last_recompute_at: Option<Instant>,
123    /// Ring of commit arrivals inside the trailing 60s.
124    pub commit_arrivals: VecDeque<Instant>,
125    /// Default mode at process start.
126    pub default_mode: LeidenMode,
127    /// Effective storm cap (operator-overridable).
128    pub storm_cap_per_min: u32,
129}
130
131impl Default for LeidenCache {
132    fn default() -> Self {
133        Self {
134            commit_latency_ms: VecDeque::with_capacity(COMMIT_LATENCY_WINDOW),
135            last_recompute_at: None,
136            commit_arrivals: VecDeque::new(),
137            default_mode: LeidenMode::FullDebounced,
138            storm_cap_per_min: COMMIT_STORM_CAP_PER_MIN,
139        }
140    }
141}
142
143impl LeidenCache {
144    /// Record a completed-commit wall-time sample.
145    pub fn observe_commit_latency(&mut self, latency: Duration) {
146        let ms = u64::try_from(latency.as_millis()).unwrap_or(u64::MAX);
147        if self.commit_latency_ms.len() == COMMIT_LATENCY_WINDOW {
148            self.commit_latency_ms.pop_front();
149        }
150        self.commit_latency_ms.push_back(ms);
151    }
152
153    /// Record a commit arrival; evicts entries older than 60s.
154    pub fn observe_commit_arrival(&mut self, at: Instant) {
155        let cutoff = at.checked_sub(Duration::from_mins(1)).unwrap_or(at);
156        while let Some(front) = self.commit_arrivals.front() {
157            if *front < cutoff {
158                self.commit_arrivals.pop_front();
159            } else {
160                break;
161            }
162        }
163        self.commit_arrivals.push_back(at);
164    }
165
166    /// Nearest-rank p75 of the rolling commit-latency ring.
167    #[must_use]
168    pub fn rolling_p75_commit_ms(&self) -> Option<u64> {
169        if self.commit_latency_ms.is_empty() {
170            return None;
171        }
172        let mut sorted: Vec<u64> = self.commit_latency_ms.iter().copied().collect();
173        sorted.sort_unstable();
174        let n = sorted.len();
175        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
176        let idx = ((n as f64 * 0.75).ceil() as usize)
177            .saturating_sub(1)
178            .min(n - 1);
179        Some(sorted[idx])
180    }
181
182    /// Effective debounce window via [`derive_debounce_ms`].
183    #[must_use]
184    pub fn effective_debounce_ms(&self) -> u64 {
185        derive_debounce_ms(self.rolling_p75_commit_ms())
186    }
187
188    /// Is the 60s commit-arrival ring at or above the storm cap?
189    #[must_use]
190    pub fn storm_cap_reached(&self) -> bool {
191        u32::try_from(self.commit_arrivals.len()).unwrap_or(u32::MAX) >= self.storm_cap_per_min
192    }
193
194    /// Pure policy function. Returns the mode to serve this call.
195    #[must_use]
196    pub fn select_mode(&self, node_count: usize, now: Instant) -> LeidenMode {
197        if self.default_mode == LeidenMode::Full {
198            return LeidenMode::Full;
199        }
200        if node_count >= GRAPH_SIZE_GATE_V {
201            return LeidenMode::FallbackStale;
202        }
203        if self.storm_cap_reached() {
204            return LeidenMode::FallbackStale;
205        }
206        if let Some(last) = self.last_recompute_at {
207            let elapsed_ms =
208                u64::try_from(now.saturating_duration_since(last).as_millis()).unwrap_or(u64::MAX);
209            if elapsed_ms < self.effective_debounce_ms() {
210                return LeidenMode::FallbackStale;
211            }
212        }
213        LeidenMode::FullDebounced
214    }
215}
216
217/// Shared application state passed to every axum handler via
218/// `State<AppState>`. Clones are cheap (shared `Arc`).
219#[derive(Clone)]
220pub struct AppState {
221    /// The open repo, held behind a mutex because redb keeps an
222    /// exclusive file lock per-process. Writes replace the value
223    /// inside the mutex with the post-commit `ReadonlyRepo`.
224    pub repo: Arc<Mutex<ReadonlyRepo>>,
225    /// Optional embed-provider config resolved from the repo's
226    /// `config.toml` at startup.
227    pub embed_cfg: Option<mnem_embed_providers::ProviderConfig>,
228    /// Optional sparse-provider config resolved from the repo's
229    /// `config.toml` at startup. When present, POST `/v1/nodes` and
230    /// POST `/v1/nodes/bulk` auto-populate `Node.sparse_embed` on
231    /// ingest, and `/v1/retrieve` auto-encodes the query via
232    /// `SparseEncoder::encode_query` so the neural-sparse lane fires
233    /// end-to-end.
234    pub sparse_cfg: Option<mnem_sparse_providers::ProviderConfig>,
235    /// Index cache keyed by commit CID. Fixes audit gap G1: without
236    /// this, every `/v1/retrieve` call rebuilt the vector + sparse
237    /// indexes from scratch (O(N) per query). With this, the first
238    /// retrieve after a commit pays the build cost; every subsequent
239    /// retrieve returns in microseconds.
240    ///
241    /// Invalidation is automatic: any write path that commits also
242    /// produces a new head commit CID (via
243    /// `Transaction::commit -> ReadonlyRepo`). The cache sees the
244    /// mismatch next time and evicts.
245    pub indexes: Arc<Mutex<IndexCache>>,
246    /// Whether the server accepts caller-supplied `label` values on
247    /// ingest and `label` filters on retrieve. Read from the
248    /// `MNEM_BENCH` environment variable at startup.
249    ///
250    /// Defaults to `false`. Casual / single-tenant / personal-graph
251    /// installations keep label hidden: every ingested node gets
252    /// `ntype = Node::DEFAULT_NTYPE` ("Node") regardless of what the
253    /// caller sent, and retrieve ignores any label filter. No way to
254    /// flip this via a CLI flag or request body - operators opt in by
255    /// setting `MNEM_BENCH=1` at server launch, which is how the
256    /// reference benchmark harnesses in this repo pin per-item
257    /// isolation. Zero surface area for a regular user to stumble
258    /// into label-scoped state.
259    pub allow_labels: bool,
260    /// Prometheus metrics registry shared with the `/metrics` route
261    /// and the `track_metrics` middleware. Clones are cheap (`Arc`
262    /// inside); no per-request allocation.
263    pub metrics: Metrics,
264    /// Bearer token that authorises `/remote/v1/push-blocks` and
265    /// `/remote/v1/advance-head`. Read from `MNEM_HTTP_PUSH_TOKEN`
266    /// at startup. `None` means those routes are administratively
267    /// disabled (fail-closed): they return 503 regardless of
268    /// whatever the caller presented.
269    ///
270    /// The token never touches disk and is never emitted to tracing.
271    /// See [`crate::auth`] for the extractor that enforces the
272    /// check.
273    pub push_token: Option<String>,
274    /// C3 FIX-1 + FIX-2: authored-edges adjacency + Leiden community
275    /// assignment cache, keyed on the repo's op-id. Populated lazily
276    /// on the first retrieve that asks for `community_filter=true` or
277    /// `graph_mode="ppr"`. Invalidated whenever the op-id changes
278    /// (any write path). Single-slot cache: one authored-adjacency
279    /// snapshot is shared across requests until the next commit.
280    pub graph_cache: Arc<Mutex<GraphCache>>,
281    /// Gap 09 - `/v1/traverse_answer` runtime config. Default OFF per
282    /// architect Decision 4; opt in via
283    /// `[experimental] single_call_multihop = true` in the repo's
284    /// `config.toml`. Wrapped in `Arc` for cheap clones across handler
285    /// dispatch.
286    pub traverse_cfg: Arc<crate::routes::traverse::TraverseAnswerCfg>,
287    /// NER provider config resolved from the repo's `config.toml` at
288    /// startup. When `None`, ingest paths default to `NerConfig::Rule`.
289    pub ner_cfg: Option<mnem_ingest::NerConfig>,
290}
291
292impl AppState {
293    /// Read `MNEM_BENCH` at startup. See [`Self::parse_allow_labels`]
294    /// for the truthy / falsy string rules.
295    #[must_use]
296    pub fn resolve_allow_labels_from_env() -> bool {
297        Self::parse_allow_labels(std::env::var("MNEM_BENCH").ok().as_deref())
298    }
299
300    /// Read `MNEM_HTTP_PUSH_TOKEN` at startup. Empty / unset -> `None`
301    /// (writes fail-closed). See [`crate::auth::RequireBearer`] for the
302    /// extractor that enforces the check.
303    #[must_use]
304    pub fn resolve_push_token_from_env() -> Option<String> {
305        let raw = std::env::var("MNEM_HTTP_PUSH_TOKEN").ok()?;
306        let trimmed = raw.trim();
307        if trimmed.is_empty() {
308            None
309        } else {
310            Some(trimmed.to_string())
311        }
312    }
313
314    /// Pure parser for the `MNEM_BENCH` value. `None` (unset) is
315    /// false. Falsy strings (`"0"`, `"false"`, `"no"`, `"off"`, empty,
316    /// all case-insensitive) are false. Anything else is true.
317    #[must_use]
318    pub fn parse_allow_labels(val: Option<&str>) -> bool {
319        match val {
320            None => false,
321            Some(s) => {
322                let t = s.trim();
323                if t.is_empty() {
324                    return false;
325                }
326                let l = t.to_ascii_lowercase();
327                !matches!(l.as_str(), "0" | "false" | "no" | "off")
328            }
329        }
330    }
331}
332
333/// Server-side cache of built retrieval indexes. Keyed on the repo's
334/// current head-commit CID; when that changes, the whole cache is
335/// treated as stale and rebuilt on demand.
336#[derive(Default)]
337pub struct IndexCache {
338    /// Repo op-id the current cache was built against. `None` on
339    /// startup or after a reset. When this differs from the repo's
340    /// current op-id, every entry below is considered stale.
341    ///
342    /// Named `cache_key_op_id` (not `commit_cid`) because op-id is
343    /// what we key on: op-id exists on a freshly-initialised repo
344    /// with no commits yet, which the commit CID does not.
345    pub cache_key_op_id: Option<Cid>,
346    /// Dense vector indexes keyed by the embedder's `model` string
347    /// (e.g. `"ollama:nomic-embed-text"`). One repo can carry multiple
348    /// model families if the caller runs several retrieves with
349    /// different `vector_model`s.
350    pub vectors: HashMap<String, Arc<BruteForceVectorIndex>>,
351    /// Sparse indexes keyed by `SparseEmbed::vocab_id`.
352    pub sparse: HashMap<String, Arc<SparseInvertedIndex>>,
353}
354
355#[cfg(test)]
356mod mnem_bench_parse_tests {
357    use super::*;
358
359    #[test]
360    fn unset_parses_false() {
361        assert!(!AppState::parse_allow_labels(None));
362    }
363
364    #[test]
365    fn falsy_strings_parse_false() {
366        for v in [
367            "", "0", "false", "FALSE", "False", "no", "No", "NO", "off", "Off", "OFF", "  ", "  0 ",
368        ] {
369            assert!(
370                !AppState::parse_allow_labels(Some(v)),
371                "expected `{v:?}` to parse false"
372            );
373        }
374    }
375
376    #[test]
377    fn truthy_strings_parse_true() {
378        for v in ["1", "true", "yes", "on", "YES", "benchmark", "anything"] {
379            assert!(
380                AppState::parse_allow_labels(Some(v)),
381                "expected `{v:?}` to parse true"
382            );
383        }
384    }
385}
386
387impl IndexCache {
388    /// Reconcile the cache against the repo's current op-id. Called
389    /// on every cache access. When the op has changed (any write
390    /// flips it), all built indexes are dropped and the next getter
391    /// will rebuild.
392    ///
393    /// We key on `repo.op_id()` rather than `head_commit().cid`
394    /// because the op-id exists on freshly-initialised repos that
395    /// have no commits yet, and it changes on every write. Commit
396    /// CID would only change after the first real commit, leaving
397    /// an init-time cache entry with no way to invalidate.
398    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
399        let current = Some(repo.op_id().clone());
400        if self.cache_key_op_id != current {
401            self.cache_key_op_id = current;
402            self.vectors.clear();
403            self.sparse.clear();
404        }
405    }
406
407    /// Fetch (or build) a dense vector index for `model`.
408    pub fn vector_index(
409        &mut self,
410        repo: &ReadonlyRepo,
411        model: &str,
412    ) -> Result<Arc<BruteForceVectorIndex>, mnem_core::Error> {
413        self.reconcile(repo);
414        if let Some(idx) = self.vectors.get(model) {
415            return Ok(idx.clone());
416        }
417        let idx = Arc::new(repo.build_vector_index(model)?);
418        self.vectors.insert(model.to_string(), idx.clone());
419        Ok(idx)
420    }
421
422    /// Fetch (or build) a sparse inverted index for `vocab_id`.
423    pub fn sparse_index(
424        &mut self,
425        repo: &ReadonlyRepo,
426        vocab_id: &str,
427    ) -> Result<Arc<SparseInvertedIndex>, mnem_core::Error> {
428        self.reconcile(repo);
429        if let Some(idx) = self.sparse.get(vocab_id) {
430            return Ok(idx.clone());
431        }
432        let idx = Arc::new(SparseInvertedIndex::build_from_repo(repo, vocab_id)?);
433        self.sparse.insert(vocab_id.to_string(), idx.clone());
434        Ok(idx)
435    }
436}
437
438// ---------------------------------------------------------------
439// C3 FIX-1 + FIX-2: authored-edges + community-assignment cache.
440// ---------------------------------------------------------------
441
442/// Owned authored-edge list over which `AdjacencyIndex` is served.
443/// Collected once per op-id from the repo's commit.edges Prolly tree;
444/// Leiden community detection and PPR power iteration both read
445/// through the same snapshot.
446#[derive(Default, Debug)]
447pub struct AuthoredEdges {
448    /// Flat (src, dst) list in Prolly traversal order. Weighted `1.0`
449    /// by `iter_edges()` consumers; provenance stamped `Authored`.
450    pub edges: Vec<(NodeId, NodeId)>,
451}
452
453impl AdjacencyIndex for AuthoredEdges {
454    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
455        Box::new(self.edges.iter().map(|(s, d)| AdjEdge {
456            src: *s,
457            dst: *d,
458            weight: 1.0,
459            provenance: EdgeProvenance::Authored,
460        }))
461    }
462    fn edge_count(&self) -> usize {
463        self.edges.len()
464    }
465}
466
467/// C3 Patch-B: combined authored + KNN-derived adjacency, owned. Used
468/// by `GraphCache` to back both the Leiden community detector and the
469/// PPR adjacency wire in a single `AdjacencyIndex` impl without the
470/// lifetime acrobatics `mnem_core::HybridAdjacency<A,K>` imposes (it
471/// borrows both sources). Structurally equivalent to a
472/// `HybridAdjacency<AuthoredEdges, KnnSlice>` sent through an
473/// `Arc<dyn AdjacencyIndex + Send + Sync>` consumer.
474///
475/// # Flag-off / empty-KNN contract
476///
477/// When `knn` is empty, `iter_edges` yields exactly the authored
478/// stream in the authored source's native order, preserving the
479/// `mnem_core::tests::hybrid_adjacency_union` byte-identity contract.
480#[derive(Debug)]
481pub struct DerivedHybridAdjacency {
482    /// Authored-edge snapshot shared with `GraphCache::adjacency`.
483    pub authored: Arc<AuthoredEdges>,
484    /// KNN-derived edges in canonical `(src, dst)` ASC order. `weight`
485    /// is the similarity score, `provenance` is `Knn`.
486    pub knn: Vec<(NodeId, NodeId, f32)>,
487}
488
489impl AdjacencyIndex for DerivedHybridAdjacency {
490    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
491        let authored = self.authored.edges.iter().map(|(s, d)| AdjEdge {
492            src: *s,
493            dst: *d,
494            weight: 1.0,
495            provenance: EdgeProvenance::Authored,
496        });
497        let knn = self.knn.iter().map(|(s, d, w)| AdjEdge {
498            src: *s,
499            dst: *d,
500            weight: *w,
501            provenance: EdgeProvenance::Knn,
502        });
503        Box::new(authored.chain(knn))
504    }
505    fn edge_count(&self) -> usize {
506        self.authored.edges.len() + self.knn.len()
507    }
508}
509
510/// C3 FIX + Patch-B: authored adjacency, derived-KNN edges, and the
511/// community assignment keyed on `op_id`. Single-slot: one snapshot
512/// is shared across requests until the next commit.
513///
514/// Two-key design:
515/// - `key` = repo `op_id`; any write invalidates everything.
516/// - `knn_key` = content address of the KNN-edge index
517///   (`hash(root_cid, k, metric)`) used to avoid rebuilding the
518///   KNN-derived edges when the same vector index is re-used across
519///   retrieves at the same `op_id`.
520#[derive(Default)]
521pub struct GraphCache {
522    /// Op-id the cache was built against. `None` means empty.
523    pub key: Option<Cid>,
524    /// Shared authored-adjacency snapshot.
525    pub adjacency: Option<Arc<AuthoredEdges>>,
526    /// C3 Patch-B: KNN-derived edges produced by
527    /// `mnem_ann::derive_knn_edges_from_vectors` over the current
528    /// vector index. `None` means either (a) the authored adjacency
529    /// was already non-empty so KNN fallback never fired, or (b) the
530    /// vector index was empty (nothing to derive from).
531    pub knn_edges: Option<Arc<Vec<(NodeId, NodeId, f32)>>>,
532    /// Cache key for `knn_edges`:
533    /// `(KnnEdgeIndex::compute_cid, k, metric_tag)` projected down to
534    /// the content-address CID. Stable across restarts given the same
535    /// vector-index contents + k + metric.
536    pub knn_key: Option<Cid>,
537    /// Hybrid (authored + KNN) adjacency ready to hand out as
538    /// `Arc<dyn AdjacencyIndex + Send + Sync>`. Rebuilt whenever
539    /// `adjacency` or `knn_edges` changes.
540    pub hybrid: Option<Arc<DerivedHybridAdjacency>>,
541    /// Shared Leiden community assignment over `hybrid` (falls back
542    /// to `adjacency` when KNN fallback is inactive).
543    pub community: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
544    /// C3 FIX-1: cached row-stochastic CSR matrix used by PPR.
545    /// Invalidated alongside `adjacency` / `knn_edges` so the matrix
546    /// always reflects the current op-id's adjacency. Shared across
547    /// retrieves at the same op-id; `mnem_core::ppr::ppr_with_matrix`
548    /// is byte-identical to the from-scratch `ppr()` path (pinned by
549    /// the `ppr_with_matrix_matches_ppr_on_small_graph` integration
550    /// test), so turning this cache on is a pure-speed change.
551    pub ppr_matrix: Option<Arc<mnem_core::ppr::SparseTransition>>,
552    /// Gap 10 Phase-1: prior-commit assignment surviving op-id churn.
553    pub community_stale: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
554    /// Gap 10 Phase-1: debounce + storm-cap policy state.
555    pub leiden_cache: LeidenCache,
556}
557
558impl GraphCache {
559    /// Invalidate if the repo's op-id has moved. Called on every
560    /// cache access, mirrors [`IndexCache::reconcile`].
561    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
562        let current = Some(repo.op_id().clone());
563        if self.key != current {
564            self.key = current;
565            self.adjacency = None;
566            self.knn_edges = None;
567            self.knn_key = None;
568            self.hybrid = None;
569            // Gap 10 Phase-1: demote current assignment to `community_stale`
570            // so retrieves inside the debounce window can serve it.
571            if let Some(prev) = self.community.take() {
572                self.community_stale = Some(prev);
573            }
574            self.ppr_matrix = None;
575        }
576    }
577
578    /// Gap 10 Phase-1 entry point. Resolves the current mode via
579    /// [`LeidenCache::select_mode`], increments
580    /// `mnem_leiden_mode_total{mode=...}` + mirrors gauge triad, then
581    /// returns the assignment and the served mode.
582    pub fn community_for_head(
583        &mut self,
584        repo: &ReadonlyRepo,
585        vector: Option<&BruteForceVectorIndex>,
586        metrics: &Metrics,
587    ) -> Result<
588        (
589            Arc<mnem_graphrag::community::CommunityAssignment>,
590            LeidenMode,
591        ),
592        crate::error::Error,
593    > {
594        self.reconcile(repo);
595        let adj = self.hybrid_adjacency_for(repo, vector)?;
596        let node_count = authored_node_count(adj.as_ref());
597        let now = Instant::now();
598        let mode = self.leiden_cache.select_mode(node_count, now);
599
600        metrics
601            .leiden_debounce_effective
602            .set(i64::try_from(self.leiden_cache.effective_debounce_ms()).unwrap_or(i64::MAX));
603        metrics
604            .leiden_storm_cap_effective
605            .set(i64::from(self.leiden_cache.storm_cap_per_min));
606        #[allow(clippy::cast_possible_truncation)]
607        let delta_pp10k = (DELTA_RATIO_FORCE_FULL * 10_000.0) as i64;
608        metrics.leiden_delta_ratio_effective.set(delta_pp10k);
609        metrics.leiden_mode_current.set(mode.gauge_value());
610        metrics
611            .leiden_mode
612            .get_or_create(&LeidenModeLabels {
613                mode: mode.label().to_string(),
614            })
615            .inc();
616
617        match mode {
618            LeidenMode::Full | LeidenMode::FullDebounced => {
619                if let Some(c) = &self.community {
620                    return Ok((c.clone(), mode));
621                }
622                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
623                let arc = Arc::new(assignment);
624                self.community = Some(arc.clone());
625                if matches!(mode, LeidenMode::FullDebounced) {
626                    self.leiden_cache.last_recompute_at = Some(now);
627                }
628                self.leiden_cache.observe_commit_arrival(now);
629                Ok((arc, mode))
630            }
631            LeidenMode::FallbackStale => {
632                if let Some(c) = &self.community {
633                    return Ok((c.clone(), mode));
634                }
635                if let Some(c) = &self.community_stale {
636                    return Ok((c.clone(), mode));
637                }
638                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
639                let arc = Arc::new(assignment);
640                self.community_stale = Some(arc.clone());
641                Ok((arc, mode))
642            }
643        }
644    }
645
646    /// C3 FIX-1: fetch (or build) the row-stochastic CSR matrix used
647    /// by PPR, over the hybrid adjacency. The matrix is a pure
648    /// function of the adjacency; caching it lets repeated retrieves
649    /// at the same op-id skip the 3-pass CSR build (dominates PPR
650    /// cost at ~15 iters on small graphs).
651    ///
652    /// Byte-identity with the uncached `ppr()` path is pinned by the
653    /// `ppr_with_matrix_matches_ppr_on_small_graph` integration test
654    /// in mnem-core.
655    pub fn ppr_matrix_for(
656        &mut self,
657        repo: &ReadonlyRepo,
658        vector: Option<&BruteForceVectorIndex>,
659    ) -> Result<Arc<mnem_core::ppr::SparseTransition>, crate::error::Error> {
660        self.reconcile(repo);
661        if let Some(m) = &self.ppr_matrix {
662            return Ok(m.clone());
663        }
664        let adj = self.hybrid_adjacency_for(repo, vector)?;
665        let m = Arc::new(mnem_core::ppr::sparse_transition_matrix(adj.as_ref()));
666        self.ppr_matrix = Some(m.clone());
667        Ok(m)
668    }
669
670    /// Fetch (or build) the authored-adjacency snapshot. Back-compat
671    /// wrapper preserved for callers that do NOT want KNN fallback
672    /// (e.g. tests asserting the pure-authored path).
673    pub fn adjacency_for(
674        &mut self,
675        repo: &ReadonlyRepo,
676    ) -> Result<Arc<AuthoredEdges>, crate::error::Error> {
677        self.reconcile(repo);
678        if let Some(a) = &self.adjacency {
679            return Ok(a.clone());
680        }
681        let a = Arc::new(collect_authored_edges(repo)?);
682        self.adjacency = Some(a.clone());
683        Ok(a)
684    }
685
686    /// Fetch (or build) the Leiden community assignment over the
687    /// authored-only adjacency. Back-compat entry point.
688    pub fn community_for(
689        &mut self,
690        repo: &ReadonlyRepo,
691    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
692        self.reconcile(repo);
693        if let Some(c) = &self.community {
694            return Ok(c.clone());
695        }
696        let adj = self.adjacency_for(repo)?;
697        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
698        let arc = Arc::new(assignment);
699        self.community = Some(arc.clone());
700        Ok(arc)
701    }
702
703    /// C3 Patch-B entry point: fetch (or build) the authored adjacency;
704    /// when it is empty (and `vector` is non-empty), derive a KNN-edge
705    /// substrate via `mnem_ann::derive_knn_edges_from_vectors` and
706    /// cache a combined [`DerivedHybridAdjacency`].
707    ///
708    /// Returns a trait-object `Arc<dyn AdjacencyIndex + Send + Sync>`
709    /// so the retriever's `with_adjacency_index` consumer can accept
710    /// it without knowing whether the KNN lane fired.
711    ///
712    /// # Zero-impact contracts
713    ///
714    /// - Authored non-empty -> returns the authored snapshot; KNN lane
715    ///   never runs; `knn_edges` stays `None`.
716    /// - `vector` is `None` or empty -> returns the authored snapshot
717    ///   (possibly empty); no KNN fallback; preserves the "no graph,
718    ///   no filter" legacy behaviour.
719    /// - Same `op_id` + same vector-index content -> served from the
720    ///   single-slot cache; `derive_knn_edges_from_vectors` is not
721    ///   re-run.
722    pub fn hybrid_adjacency_for(
723        &mut self,
724        repo: &ReadonlyRepo,
725        vector: Option<&BruteForceVectorIndex>,
726    ) -> Result<Arc<dyn AdjacencyIndex + Send + Sync>, crate::error::Error> {
727        self.reconcile(repo);
728        let authored = self.adjacency_for(repo)?;
729
730        // Zero-impact short-circuits.
731        if !authored.edges.is_empty() {
732            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
733        }
734        let Some(vec_idx) = vector else {
735            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
736        };
737        if vec_idx.is_empty() {
738            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
739        }
740
741        // Fallback active: derive (or fetch cached) KNN edges. The
742        // `KnnEdgeIndex` CID folds in (root_cid, k, metric) so the
743        // cache key is stable across restarts given the same vector
744        // content.
745        self.ensure_knn_edges(vec_idx)?;
746        let knn = self
747            .knn_edges
748            .clone()
749            .expect("ensure_knn_edges populated the slot");
750        // One-shot info log the first time the fallback path wins.
751        if !KNN_FALLBACK_LOGGED.swap(true, Ordering::Relaxed) {
752            tracing::info!(
753                target: "mnem_http::graph_cache",
754                k = KNN_FALLBACK_K,
755                metric = "cosine",
756                knn_edges = knn.len(),
757                vector_model = %vec_idx.model(),
758                "authored adjacency empty; KNN-edge fallback activated (E0 wire)",
759            );
760        }
761        if self.hybrid.is_none() {
762            self.hybrid = Some(Arc::new(DerivedHybridAdjacency {
763                authored: authored.clone(),
764                knn: (*knn).clone(),
765            }));
766        }
767        Ok(self.hybrid.clone().expect("hybrid slot populated above")
768            as Arc<dyn AdjacencyIndex + Send + Sync>)
769    }
770
771    /// C3 Patch-B: community assignment over the hybrid adjacency.
772    /// Falls through to the authored-only path when `vector` is absent
773    /// or empty, or when authored edges are already non-empty.
774    pub fn hybrid_community_for(
775        &mut self,
776        repo: &ReadonlyRepo,
777        vector: Option<&BruteForceVectorIndex>,
778    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
779        self.reconcile(repo);
780        if let Some(c) = &self.community {
781            return Ok(c.clone());
782        }
783        let adj = self.hybrid_adjacency_for(repo, vector)?;
784        // compute_communities takes `&dyn AdjacencyIndex`; the trait
785        // object's deref coerces.
786        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
787        let arc = Arc::new(assignment);
788        self.community = Some(arc.clone());
789        Ok(arc)
790    }
791
792    /// Derive KNN edges from `vector` and cache them along with the
793    /// `KnnEdgeIndex` CID as the cache key. Idempotent: returns early
794    /// when the key already matches.
795    fn ensure_knn_edges(
796        &mut self,
797        vector: &BruteForceVectorIndex,
798    ) -> Result<(), crate::error::Error> {
799        // Collect (ids, vecs) from the brute-force index. The slices
800        // are borrowed from the index's flat buffer (zero copy up to
801        // the `.to_vec()` the KNN builder does internally when it
802        // clones per-source rows).
803        let mut ids: Vec<NodeId> = Vec::with_capacity(vector.len());
804        let mut vecs: Vec<Vec<f32>> = Vec::with_capacity(vector.len());
805        for (id, row) in vector.points_iter() {
806            ids.push(id);
807            vecs.push(row.to_vec());
808        }
809        let edges = mnem_ann::derive_knn_edges_from_vectors(
810            &ids,
811            &vecs,
812            KNN_FALLBACK_K,
813            mnem_ann::DistanceMetric::Cosine,
814        );
815        // Assemble a KnnEdgeIndex purely to compute the cache-key CID.
816        // The `root_cid` is the content hash of (model, dim, ids,
817        // flat_vecs); we lean on the HybridAdjacency E0 substrate's
818        // existing `compute_cid` so the key is stable across restarts
819        // given identical vector content.
820        let root_cid = vector_index_content_cid(vector, &ids)?;
821        let idx = mnem_ann::KnnEdgeIndex {
822            root_cid,
823            k: KNN_FALLBACK_K,
824            metric: mnem_ann::DistanceMetric::Cosine,
825            edges,
826        };
827        let cid = idx
828            .compute_cid()
829            .map_err(|e| crate::error::Error::internal(format!("knn edge cid: {e}")))?;
830        if self.knn_key.as_ref() == Some(&cid) && self.knn_edges.is_some() {
831            return Ok(());
832        }
833        let triples: Vec<(NodeId, NodeId, f32)> = idx
834            .edges
835            .into_iter()
836            .map(|e| (e.src, e.dst, e.weight))
837            .collect();
838        self.knn_edges = Some(Arc::new(triples));
839        self.knn_key = Some(cid);
840        self.hybrid = None; // invalidate derived combined view
841        Ok(())
842    }
843}
844
845/// Derive a stable content-address for a [`BruteForceVectorIndex`] by
846/// hashing `(model, dim, canonical_ids)`. Two vector indexes with the
847/// same model, dimensionality and ID set share a CID, which is the
848/// coarser grain we want for the KNN-edge cache key (the vectors
849/// themselves live in the Prolly tree keyed by NodeId, so equal IDs
850/// at the same op_id imply equal vector contents).
851fn vector_index_content_cid(
852    vector: &BruteForceVectorIndex,
853    ids: &[NodeId],
854) -> Result<Cid, crate::error::Error> {
855    use mnem_core::codec::to_canonical_bytes;
856    use mnem_core::id::{CODEC_RAW, Multihash};
857    #[derive(serde::Serialize)]
858    struct Preimage<'a> {
859        tag: &'a str,
860        model: &'a str,
861        dim: u32,
862        ids: &'a [NodeId],
863    }
864    let pre = Preimage {
865        tag: "mnem-http/knn-fallback/v1",
866        model: vector.model(),
867        dim: vector.dim(),
868        ids,
869    };
870    let body = to_canonical_bytes(&pre)
871        .map_err(|e| crate::error::Error::internal(format!("canonical encode: {e}")))?;
872    let hash = Multihash::sha2_256(&body);
873    Ok(Cid::new(CODEC_RAW, hash))
874}
875
876/// Gap 10 Phase-1: distinct node count walking adjacency edges.
877fn authored_node_count(adj: &(dyn AdjacencyIndex + Send + Sync)) -> usize {
878    use std::collections::BTreeSet;
879    let mut seen: BTreeSet<NodeId> = BTreeSet::new();
880    for e in adj.iter_edges() {
881        seen.insert(e.src);
882        seen.insert(e.dst);
883    }
884    seen.len()
885}
886
887/// Walk `commit.edges` Prolly tree once and collect every authored
888/// edge as a `(src, dst)` pair in canonical traversal order.
889fn collect_authored_edges(repo: &ReadonlyRepo) -> Result<AuthoredEdges, crate::error::Error> {
890    let Some(commit) = repo.head_commit() else {
891        return Ok(AuthoredEdges::default());
892    };
893    let bs = repo.blockstore().clone();
894    let cursor = mnem_core::prolly::Cursor::new(&*bs, &commit.edges)
895        .map_err(|e| crate::error::Error::internal(format!("opening edge cursor: {e}")))?;
896    let mut edges: Vec<(NodeId, NodeId)> = Vec::new();
897    for entry in cursor {
898        let (_key, edge_cid) =
899            entry.map_err(|e| crate::error::Error::internal(format!("walking edge tree: {e}")))?;
900        let bytes = bs
901            .get(&edge_cid)
902            .map_err(|e| crate::error::Error::internal(format!("fetching edge block: {e}")))?
903            .ok_or_else(|| {
904                crate::error::Error::internal(format!("edge block {edge_cid} missing"))
905            })?;
906        let edge: mnem_core::objects::Edge = mnem_core::codec::from_canonical_bytes(&bytes)
907            .map_err(|e| crate::error::Error::internal(format!("decoding edge: {e}")))?;
908        edges.push((edge.src, edge.dst));
909    }
910    Ok(AuthoredEdges { edges })
911}
912
913#[cfg(test)]
914pub(crate) mod test_support {
915    //! Test-only builders for [`AppState`]. Scoped `pub(crate)` so
916    //! sibling modules (`auth`, `routes::remote`) can build a minimal
917    //! in-memory state without paying the full `app_with_options`
918    //! path (redb open + config load) for every unit test.
919
920    use super::*;
921    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};
922
923    /// Build an `AppState` backed by in-memory stores, with an
924    /// optional push token. Used by extractor and route unit tests.
925    pub(crate) fn state_with_token(token: Option<String>) -> AppState {
926        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
927        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
928        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
929        AppState {
930            repo: Arc::new(Mutex::new(repo)),
931            embed_cfg: None,
932            sparse_cfg: None,
933            indexes: Arc::new(Mutex::new(IndexCache::default())),
934            allow_labels: false,
935            metrics: Metrics::new(),
936            push_token: token,
937            graph_cache: Arc::new(Mutex::new(GraphCache::default())),
938            traverse_cfg: Arc::new(crate::routes::traverse::TraverseAnswerCfg::default()),
939            ner_cfg: None,
940        }
941    }
942}
943
944#[cfg(test)]
945mod knn_fallback_tests {
946    //! C3 Patch-B unit tests: `GraphCache` KNN-edge fallback activation
947    //! and zero-impact short-circuits.
948
949    use super::*;
950    use bytes::Bytes;
951    use mnem_core::objects::node::{Dtype, Embedding};
952    use mnem_core::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
953
954    fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
955        (
956            Arc::new(MemoryBlockstore::new()),
957            Arc::new(MemoryOpHeadsStore::new()),
958        )
959    }
960
961    fn f32_embed(model: &str, v: &[f32]) -> Embedding {
962        let mut bytes = Vec::with_capacity(v.len() * 4);
963        for x in v {
964            bytes.extend_from_slice(&x.to_le_bytes());
965        }
966        Embedding {
967            model: model.to_string(),
968            dtype: Dtype::F32,
969            dim: u32::try_from(v.len()).expect("test vec fits in u32"),
970            vector: Bytes::from(bytes),
971        }
972    }
973
974    fn build_vector_index(rows: &[(NodeId, Vec<f32>)]) -> BruteForceVectorIndex {
975        let mut idx = BruteForceVectorIndex::empty("m", 3);
976        for (id, v) in rows {
977            let inserted = idx.try_insert(*id, &f32_embed("m", v));
978            assert!(inserted, "embedding insert");
979        }
980        idx
981    }
982
983    #[test]
984    fn empty_authored_plus_empty_vector_is_no_op() {
985        let (bs, ohs) = stores();
986        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
987        let mut gc = GraphCache::default();
988        let adj = gc.hybrid_adjacency_for(&repo, None).ok().expect("no-op");
989        assert_eq!(adj.edge_count(), 0, "no vectors -> no KNN fallback");
990        assert!(gc.knn_edges.is_none());
991    }
992
993    #[test]
994    fn empty_authored_plus_populated_vector_activates_fallback() {
995        let (bs, ohs) = stores();
996        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
997        // 4 distinct L2-direction vectors; KNN k=32 (capped at n-1=3)
998        // yields at least one directed edge per source.
999        let rows: Vec<(NodeId, Vec<f32>)> = vec![
1000            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
1001            (NodeId::new_v7(), vec![0.9, 0.1, 0.0]),
1002            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
1003            (NodeId::new_v7(), vec![0.0, 0.0, 1.0]),
1004        ];
1005        let vec_idx = build_vector_index(&rows);
1006
1007        let mut gc = GraphCache::default();
1008        let adj = gc
1009            .hybrid_adjacency_for(&repo, Some(&vec_idx))
1010            .ok()
1011            .expect("knn fallback ok");
1012        assert!(
1013            adj.edge_count() > 0,
1014            "KNN fallback must produce at least one edge (got 0)",
1015        );
1016        assert!(gc.knn_edges.is_some(), "knn_edges slot populated");
1017        assert!(gc.knn_key.is_some(), "knn cache key populated");
1018
1019        // Community assignment must see the derived graph: at least one
1020        // community membership lookup must be Some (non-trivial graph).
1021        let assignment = gc
1022            .hybrid_community_for(&repo, Some(&vec_idx))
1023            .ok()
1024            .expect("community ok");
1025        let any_assigned = rows
1026            .iter()
1027            .any(|(id, _)| assignment.community_of(*id).is_some());
1028        assert!(
1029            any_assigned,
1030            "at least one node must have a community under a non-empty adjacency",
1031        );
1032    }
1033
1034    #[test]
1035    fn knn_fallback_is_idempotent_on_same_vector() {
1036        let (bs, ohs) = stores();
1037        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
1038        let rows: Vec<(NodeId, Vec<f32>)> = vec![
1039            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
1040            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
1041        ];
1042        let vec_idx = build_vector_index(&rows);
1043        let mut gc = GraphCache::default();
1044        let _ = gc
1045            .hybrid_adjacency_for(&repo, Some(&vec_idx))
1046            .ok()
1047            .expect("first build");
1048        let first_key = gc.knn_key.clone().expect("first build populates key");
1049        // Second call must re-use the cached slot (same op_id + same
1050        // vector content -> same KnnEdgeIndex CID).
1051        let _ = gc
1052            .hybrid_adjacency_for(&repo, Some(&vec_idx))
1053            .ok()
1054            .expect("second build");
1055        let second_key = gc.knn_key.clone().expect("second build populates key");
1056        assert_eq!(first_key, second_key, "KNN cache key stable across calls");
1057    }
1058}