Skip to main content

mnem_http/
state.rs

1//! Shared app state. One `ReadonlyRepo` per server, behind an
2//! `Arc<Mutex>` because redb holds an exclusive file lock so we can
3//! only have one open per process. Writes take the lock, run a
4//! transaction, and replace the repo with the post-commit value.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10
11use mnem_core::id::{Cid, NodeId};
12use mnem_core::index::hybrid::{AdjEdge, AdjacencyIndex, EdgeProvenance};
13use mnem_core::index::{BruteForceVectorIndex, SparseInvertedIndex, VectorIndex};
14use mnem_core::repo::ReadonlyRepo;
15
16use crate::metrics::{LeidenModeLabels, Metrics};
17
18/// Default per-source neighbour count for the KNN-edge fallback. 32
19/// matches the `k=32, metric=cosine` determinism contract the
20/// `KnnEdgeIndex::compute_cid` key derivation bakes in.
21const KNN_FALLBACK_K: u32 = 32;
22
23/// One-shot guard so the "KNN-edge fallback activated" info-level log
24/// fires once per process lifetime (not once per retrieve). Keeps the
25/// prod log from flooding while still emitting the "yes, E0 wire is
26/// live" breadcrumb.
27static KNN_FALLBACK_LOGGED: AtomicBool = AtomicBool::new(false);
28
29// ---------------------------------------------------------------
30// Gap 10 Phase-1: community-cache invalidation tunables
31// ---------------------------------------------------------------
32
33/// Gap 10 R6 floor-c tunable: commit-storm DoS cap per minute. Default 60.
34///
35/// `#tunable: default=60; rationale="attacker cannot amplify beyond human commit rate"`
36pub const COMMIT_STORM_CAP_PER_MIN: u32 = 60;
37
38/// Gap 10 R6 floor-c tunable: fraction of the graph that must change
39/// in one commit before an incremental recompute path force-flips to
40/// full. Exported today as a gauge-visible constant; consulted by the
41/// debounced-full recompute loop when deciding whether to bypass the
42/// incremental shortcut.
43///
44/// `#tunable: default=0.5; rationale="half-graph change = incremental not cheaper than full"`
45pub const DELTA_RATIO_FORCE_FULL: f32 = 0.5;
46
47/// Gap 10 Phase-1 graph-size gate: above this node count the hot path
48/// refuses to run full-Leiden inline and serves `FallbackStale`.
49///
50/// `#tunable: default=250_000; rationale="HNSW memory derivation; see benchmarks/leiden-wallclock-vs-V.md"`
51pub const GRAPH_SIZE_GATE_V: usize = 250_000;
52
53/// Gap 10 R3 debounce floor.
54pub const DEBOUNCE_FLOOR_MS: u64 = 1_000;
55
56/// Size of the rolling ring buffer used for p75 commit-latency.
57pub const COMMIT_LATENCY_WINDOW: usize = 100;
58
59/// Gap 10 R6 code-sketch API: runtime-derived debounce window.
60#[must_use]
61pub fn derive_debounce_ms(rolling_p75_commit_ms: Option<u64>) -> u64 {
62    rolling_p75_commit_ms
63        .map(|p| p.max(DEBOUNCE_FLOOR_MS))
64        .unwrap_or(DEBOUNCE_FLOOR_MS)
65}
66
67/// Gap 10 Phase-1 recompute-mode enum. Closed vocabulary.
68#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
69pub enum LeidenMode {
70    /// Immediate full recompute on every commit. Env-forced.
71    Full,
72    /// Debounced full recompute (a future version default).
73    FullDebounced,
74    /// Served prior commit's assignment; refresh suppressed.
75    FallbackStale,
76}
77
78impl LeidenMode {
79    /// Prometheus label string.
80    #[must_use]
81    pub fn label(&self) -> &'static str {
82        match self {
83            Self::Full => "full",
84            Self::FullDebounced => "full_debounced",
85            Self::FallbackStale => "fallback_stale",
86        }
87    }
88
89    /// Gauge encoding for `mnem_leiden_mode_current`.
90    #[must_use]
91    pub fn gauge_value(&self) -> i64 {
92        match self {
93            Self::Full => 0,
94            Self::FullDebounced => 1,
95            Self::FallbackStale => 2,
96        }
97    }
98
99    /// Resolve default mode from env at startup.
100    #[must_use]
101    pub fn resolve_default_from_env() -> Self {
102        match std::env::var("MNEM_LEIDEN_FULL_RECOMPUTE").ok() {
103            Some(v) => {
104                let t = v.trim().to_ascii_lowercase();
105                if t.is_empty() || matches!(t.as_str(), "0" | "false" | "no" | "off") {
106                    Self::FullDebounced
107                } else {
108                    Self::Full
109                }
110            }
111            None => Self::FullDebounced,
112        }
113    }
114}
115
116/// Gap 10 Phase-1 debounce + storm-cap state.
117#[derive(Debug)]
118pub struct LeidenCache {
119    /// Rolling ring of commit wall-clock latencies (milliseconds).
120    pub commit_latency_ms: VecDeque<u64>,
121    /// Instant of the most recent successful full recompute.
122    pub last_recompute_at: Option<Instant>,
123    /// Ring of commit arrivals inside the trailing 60s.
124    pub commit_arrivals: VecDeque<Instant>,
125    /// Default mode at process start.
126    pub default_mode: LeidenMode,
127    /// Effective storm cap (operator-overridable).
128    pub storm_cap_per_min: u32,
129}
130
131impl Default for LeidenCache {
132    fn default() -> Self {
133        Self {
134            commit_latency_ms: VecDeque::with_capacity(COMMIT_LATENCY_WINDOW),
135            last_recompute_at: None,
136            commit_arrivals: VecDeque::new(),
137            default_mode: LeidenMode::FullDebounced,
138            storm_cap_per_min: COMMIT_STORM_CAP_PER_MIN,
139        }
140    }
141}
142
143impl LeidenCache {
144    /// Record a completed-commit wall-time sample.
145    pub fn observe_commit_latency(&mut self, latency: Duration) {
146        let ms = u64::try_from(latency.as_millis()).unwrap_or(u64::MAX);
147        if self.commit_latency_ms.len() == COMMIT_LATENCY_WINDOW {
148            self.commit_latency_ms.pop_front();
149        }
150        self.commit_latency_ms.push_back(ms);
151    }
152
153    /// Record a commit arrival; evicts entries older than 60s.
154    pub fn observe_commit_arrival(&mut self, at: Instant) {
155        let cutoff = at.checked_sub(Duration::from_mins(1)).unwrap_or(at);
156        while let Some(front) = self.commit_arrivals.front() {
157            if *front < cutoff {
158                self.commit_arrivals.pop_front();
159            } else {
160                break;
161            }
162        }
163        self.commit_arrivals.push_back(at);
164    }
165
166    /// Nearest-rank p75 of the rolling commit-latency ring.
167    #[must_use]
168    pub fn rolling_p75_commit_ms(&self) -> Option<u64> {
169        if self.commit_latency_ms.is_empty() {
170            return None;
171        }
172        let mut sorted: Vec<u64> = self.commit_latency_ms.iter().copied().collect();
173        sorted.sort_unstable();
174        let n = sorted.len();
175        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
176        let idx = ((n as f64 * 0.75).ceil() as usize)
177            .saturating_sub(1)
178            .min(n - 1);
179        Some(sorted[idx])
180    }
181
182    /// Effective debounce window via [`derive_debounce_ms`].
183    #[must_use]
184    pub fn effective_debounce_ms(&self) -> u64 {
185        derive_debounce_ms(self.rolling_p75_commit_ms())
186    }
187
188    /// Is the 60s commit-arrival ring at or above the storm cap?
189    #[must_use]
190    pub fn storm_cap_reached(&self) -> bool {
191        u32::try_from(self.commit_arrivals.len()).unwrap_or(u32::MAX) >= self.storm_cap_per_min
192    }
193
194    /// Pure policy function. Returns the mode to serve this call.
195    #[must_use]
196    pub fn select_mode(&self, node_count: usize, now: Instant) -> LeidenMode {
197        if self.default_mode == LeidenMode::Full {
198            return LeidenMode::Full;
199        }
200        if node_count >= GRAPH_SIZE_GATE_V {
201            return LeidenMode::FallbackStale;
202        }
203        if self.storm_cap_reached() {
204            return LeidenMode::FallbackStale;
205        }
206        if let Some(last) = self.last_recompute_at {
207            let elapsed_ms =
208                u64::try_from(now.saturating_duration_since(last).as_millis()).unwrap_or(u64::MAX);
209            if elapsed_ms < self.effective_debounce_ms() {
210                return LeidenMode::FallbackStale;
211            }
212        }
213        LeidenMode::FullDebounced
214    }
215}
216
217/// Shared application state passed to every axum handler via
218/// `State<AppState>`. Clones are cheap (shared `Arc`).
219#[derive(Clone)]
220pub struct AppState {
221    /// The open repo, held behind a mutex because redb keeps an
222    /// exclusive file lock per-process. Writes replace the value
223    /// inside the mutex with the post-commit `ReadonlyRepo`.
224    pub repo: Arc<Mutex<ReadonlyRepo>>,
225    /// Optional embed-provider config resolved from the repo's
226    /// `config.toml` at startup.
227    pub embed_cfg: Option<mnem_embed_providers::ProviderConfig>,
228    /// Optional sparse-provider config resolved from the repo's
229    /// `config.toml` at startup. When present, POST `/v1/nodes` and
230    /// POST `/v1/nodes/bulk` auto-populate `Node.sparse_embed` on
231    /// ingest, and `/v1/retrieve` auto-encodes the query via
232    /// `SparseEncoder::encode_query` so the neural-sparse lane fires
233    /// end-to-end.
234    pub sparse_cfg: Option<mnem_sparse_providers::ProviderConfig>,
235    /// Index cache keyed by commit CID. Fixes audit gap G1: without
236    /// this, every `/v1/retrieve` call rebuilt the vector + sparse
237    /// indexes from scratch (O(N) per query). With this, the first
238    /// retrieve after a commit pays the build cost; every subsequent
239    /// retrieve returns in microseconds.
240    ///
241    /// Invalidation is automatic: any write path that commits also
242    /// produces a new head commit CID (via
243    /// `Transaction::commit -> ReadonlyRepo`). The cache sees the
244    /// mismatch next time and evicts.
245    pub indexes: Arc<Mutex<IndexCache>>,
246    /// Whether the server accepts caller-supplied `label` values on
247    /// ingest and `label` filters on retrieve. Read from the
248    /// `MNEM_BENCH` environment variable at startup.
249    ///
250    /// Defaults to `false`. Casual / single-tenant / personal-graph
251    /// installations keep label hidden: every ingested node gets
252    /// `ntype = Node::DEFAULT_NTYPE` ("Node") regardless of what the
253    /// caller sent, and retrieve ignores any label filter. No way to
254    /// flip this via a CLI flag or request body - operators opt in by
255    /// setting `MNEM_BENCH=1` at server launch, which is how the
256    /// reference benchmark harnesses in this repo pin per-item
257    /// isolation. Zero surface area for a regular user to stumble
258    /// into label-scoped state.
259    pub allow_labels: bool,
260    /// Prometheus metrics registry shared with the `/metrics` route
261    /// and the `track_metrics` middleware. Clones are cheap (`Arc`
262    /// inside); no per-request allocation.
263    pub metrics: Metrics,
264    /// Bearer token that authorises `/remote/v1/push-blocks` and
265    /// `/remote/v1/advance-head`. Read from `MNEM_HTTP_PUSH_TOKEN`
266    /// at startup. `None` means those routes are administratively
267    /// disabled (fail-closed): they return 503 regardless of
268    /// whatever the caller presented.
269    ///
270    /// The token never touches disk and is never emitted to tracing.
271    /// See [`crate::auth`] for the extractor that enforces the
272    /// check.
273    pub push_token: Option<String>,
274    /// C3 FIX-1 + FIX-2: authored-edges adjacency + Leiden community
275    /// assignment cache, keyed on the repo's op-id. Populated lazily
276    /// on the first retrieve that asks for `community_filter=true` or
277    /// `graph_mode="ppr"`. Invalidated whenever the op-id changes
278    /// (any write path). Single-slot cache: one authored-adjacency
279    /// snapshot is shared across requests until the next commit.
280    pub graph_cache: Arc<Mutex<GraphCache>>,
281    /// Gap 09 - `/v1/traverse_answer` runtime config. Default OFF per
282    /// architect Decision 4; opt in via
283    /// `[experimental] single_call_multihop = true` in the repo's
284    /// `config.toml`. Wrapped in `Arc` for cheap clones across handler
285    /// dispatch.
286    pub traverse_cfg: Arc<crate::routes::traverse::TraverseAnswerCfg>,
287}
288
289impl AppState {
290    /// Read `MNEM_BENCH` at startup. See [`Self::parse_allow_labels`]
291    /// for the truthy / falsy string rules.
292    #[must_use]
293    pub fn resolve_allow_labels_from_env() -> bool {
294        Self::parse_allow_labels(std::env::var("MNEM_BENCH").ok().as_deref())
295    }
296
297    /// Read `MNEM_HTTP_PUSH_TOKEN` at startup. Empty / unset -> `None`
298    /// (writes fail-closed). See [`crate::auth::RequireBearer`] for the
299    /// extractor that enforces the check.
300    #[must_use]
301    pub fn resolve_push_token_from_env() -> Option<String> {
302        let raw = std::env::var("MNEM_HTTP_PUSH_TOKEN").ok()?;
303        let trimmed = raw.trim();
304        if trimmed.is_empty() {
305            None
306        } else {
307            Some(trimmed.to_string())
308        }
309    }
310
311    /// Pure parser for the `MNEM_BENCH` value. `None` (unset) is
312    /// false. Falsy strings (`"0"`, `"false"`, `"no"`, `"off"`, empty,
313    /// all case-insensitive) are false. Anything else is true.
314    #[must_use]
315    pub fn parse_allow_labels(val: Option<&str>) -> bool {
316        match val {
317            None => false,
318            Some(s) => {
319                let t = s.trim();
320                if t.is_empty() {
321                    return false;
322                }
323                let l = t.to_ascii_lowercase();
324                !matches!(l.as_str(), "0" | "false" | "no" | "off")
325            }
326        }
327    }
328}
329
330/// Server-side cache of built retrieval indexes. Keyed on the repo's
331/// current head-commit CID; when that changes, the whole cache is
332/// treated as stale and rebuilt on demand.
333#[derive(Default)]
334pub struct IndexCache {
335    /// Repo op-id the current cache was built against. `None` on
336    /// startup or after a reset. When this differs from the repo's
337    /// current op-id, every entry below is considered stale.
338    ///
339    /// Named `cache_key_op_id` (not `commit_cid`) because op-id is
340    /// what we key on: op-id exists on a freshly-initialised repo
341    /// with no commits yet, which the commit CID does not.
342    pub cache_key_op_id: Option<Cid>,
343    /// Dense vector indexes keyed by the embedder's `model` string
344    /// (e.g. `"ollama:nomic-embed-text"`). One repo can carry multiple
345    /// model families if the caller runs several retrieves with
346    /// different `vector_model`s.
347    pub vectors: HashMap<String, Arc<BruteForceVectorIndex>>,
348    /// Sparse indexes keyed by `SparseEmbed::vocab_id`.
349    pub sparse: HashMap<String, Arc<SparseInvertedIndex>>,
350}
351
352#[cfg(test)]
353mod mnem_bench_parse_tests {
354    use super::*;
355
356    #[test]
357    fn unset_parses_false() {
358        assert!(!AppState::parse_allow_labels(None));
359    }
360
361    #[test]
362    fn falsy_strings_parse_false() {
363        for v in [
364            "", "0", "false", "FALSE", "False", "no", "No", "NO", "off", "Off", "OFF", "  ", "  0 ",
365        ] {
366            assert!(
367                !AppState::parse_allow_labels(Some(v)),
368                "expected `{v:?}` to parse false"
369            );
370        }
371    }
372
373    #[test]
374    fn truthy_strings_parse_true() {
375        for v in ["1", "true", "yes", "on", "YES", "benchmark", "anything"] {
376            assert!(
377                AppState::parse_allow_labels(Some(v)),
378                "expected `{v:?}` to parse true"
379            );
380        }
381    }
382}
383
384impl IndexCache {
385    /// Reconcile the cache against the repo's current op-id. Called
386    /// on every cache access. When the op has changed (any write
387    /// flips it), all built indexes are dropped and the next getter
388    /// will rebuild.
389    ///
390    /// We key on `repo.op_id()` rather than `head_commit().cid`
391    /// because the op-id exists on freshly-initialised repos that
392    /// have no commits yet, and it changes on every write. Commit
393    /// CID would only change after the first real commit, leaving
394    /// an init-time cache entry with no way to invalidate.
395    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
396        let current = Some(repo.op_id().clone());
397        if self.cache_key_op_id != current {
398            self.cache_key_op_id = current;
399            self.vectors.clear();
400            self.sparse.clear();
401        }
402    }
403
404    /// Fetch (or build) a dense vector index for `model`.
405    pub fn vector_index(
406        &mut self,
407        repo: &ReadonlyRepo,
408        model: &str,
409    ) -> Result<Arc<BruteForceVectorIndex>, mnem_core::Error> {
410        self.reconcile(repo);
411        if let Some(idx) = self.vectors.get(model) {
412            return Ok(idx.clone());
413        }
414        let idx = Arc::new(repo.build_vector_index(model)?);
415        self.vectors.insert(model.to_string(), idx.clone());
416        Ok(idx)
417    }
418
419    /// Fetch (or build) a sparse inverted index for `vocab_id`.
420    pub fn sparse_index(
421        &mut self,
422        repo: &ReadonlyRepo,
423        vocab_id: &str,
424    ) -> Result<Arc<SparseInvertedIndex>, mnem_core::Error> {
425        self.reconcile(repo);
426        if let Some(idx) = self.sparse.get(vocab_id) {
427            return Ok(idx.clone());
428        }
429        let idx = Arc::new(SparseInvertedIndex::build_from_repo(repo, vocab_id)?);
430        self.sparse.insert(vocab_id.to_string(), idx.clone());
431        Ok(idx)
432    }
433}
434
435// ---------------------------------------------------------------
436// C3 FIX-1 + FIX-2: authored-edges + community-assignment cache.
437// ---------------------------------------------------------------
438
439/// Owned authored-edge list over which `AdjacencyIndex` is served.
440/// Collected once per op-id from the repo's commit.edges Prolly tree;
441/// Leiden community detection and PPR power iteration both read
442/// through the same snapshot.
443#[derive(Default, Debug)]
444pub struct AuthoredEdges {
445    /// Flat (src, dst) list in Prolly traversal order. Weighted `1.0`
446    /// by `iter_edges()` consumers; provenance stamped `Authored`.
447    pub edges: Vec<(NodeId, NodeId)>,
448}
449
450impl AdjacencyIndex for AuthoredEdges {
451    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
452        Box::new(self.edges.iter().map(|(s, d)| AdjEdge {
453            src: *s,
454            dst: *d,
455            weight: 1.0,
456            provenance: EdgeProvenance::Authored,
457        }))
458    }
459    fn edge_count(&self) -> usize {
460        self.edges.len()
461    }
462}
463
464/// C3 Patch-B: combined authored + KNN-derived adjacency, owned. Used
465/// by `GraphCache` to back both the Leiden community detector and the
466/// PPR adjacency wire in a single `AdjacencyIndex` impl without the
467/// lifetime acrobatics `mnem_core::HybridAdjacency<A,K>` imposes (it
468/// borrows both sources). Structurally equivalent to a
469/// `HybridAdjacency<AuthoredEdges, KnnSlice>` sent through an
470/// `Arc<dyn AdjacencyIndex + Send + Sync>` consumer.
471///
472/// # Flag-off / empty-KNN contract
473///
474/// When `knn` is empty, `iter_edges` yields exactly the authored
475/// stream in the authored source's native order, preserving the
476/// `mnem_core::tests::hybrid_adjacency_union` byte-identity contract.
477#[derive(Debug)]
478pub struct DerivedHybridAdjacency {
479    /// Authored-edge snapshot shared with `GraphCache::adjacency`.
480    pub authored: Arc<AuthoredEdges>,
481    /// KNN-derived edges in canonical `(src, dst)` ASC order. `weight`
482    /// is the similarity score, `provenance` is `Knn`.
483    pub knn: Vec<(NodeId, NodeId, f32)>,
484}
485
486impl AdjacencyIndex for DerivedHybridAdjacency {
487    fn iter_edges(&self) -> Box<dyn Iterator<Item = AdjEdge> + '_> {
488        let authored = self.authored.edges.iter().map(|(s, d)| AdjEdge {
489            src: *s,
490            dst: *d,
491            weight: 1.0,
492            provenance: EdgeProvenance::Authored,
493        });
494        let knn = self.knn.iter().map(|(s, d, w)| AdjEdge {
495            src: *s,
496            dst: *d,
497            weight: *w,
498            provenance: EdgeProvenance::Knn,
499        });
500        Box::new(authored.chain(knn))
501    }
502    fn edge_count(&self) -> usize {
503        self.authored.edges.len() + self.knn.len()
504    }
505}
506
507/// C3 FIX + Patch-B: authored adjacency, derived-KNN edges, and the
508/// community assignment keyed on `op_id`. Single-slot: one snapshot
509/// is shared across requests until the next commit.
510///
511/// Two-key design:
512/// - `key` = repo `op_id`; any write invalidates everything.
513/// - `knn_key` = content address of the KNN-edge index
514///   (`hash(root_cid, k, metric)`) used to avoid rebuilding the
515///   KNN-derived edges when the same vector index is re-used across
516///   retrieves at the same `op_id`.
517#[derive(Default)]
518pub struct GraphCache {
519    /// Op-id the cache was built against. `None` means empty.
520    pub key: Option<Cid>,
521    /// Shared authored-adjacency snapshot.
522    pub adjacency: Option<Arc<AuthoredEdges>>,
523    /// C3 Patch-B: KNN-derived edges produced by
524    /// `mnem_ann::derive_knn_edges_from_vectors` over the current
525    /// vector index. `None` means either (a) the authored adjacency
526    /// was already non-empty so KNN fallback never fired, or (b) the
527    /// vector index was empty (nothing to derive from).
528    pub knn_edges: Option<Arc<Vec<(NodeId, NodeId, f32)>>>,
529    /// Cache key for `knn_edges`:
530    /// `(KnnEdgeIndex::compute_cid, k, metric_tag)` projected down to
531    /// the content-address CID. Stable across restarts given the same
532    /// vector-index contents + k + metric.
533    pub knn_key: Option<Cid>,
534    /// Hybrid (authored + KNN) adjacency ready to hand out as
535    /// `Arc<dyn AdjacencyIndex + Send + Sync>`. Rebuilt whenever
536    /// `adjacency` or `knn_edges` changes.
537    pub hybrid: Option<Arc<DerivedHybridAdjacency>>,
538    /// Shared Leiden community assignment over `hybrid` (falls back
539    /// to `adjacency` when KNN fallback is inactive).
540    pub community: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
541    /// C3 FIX-1: cached row-stochastic CSR matrix used by PPR.
542    /// Invalidated alongside `adjacency` / `knn_edges` so the matrix
543    /// always reflects the current op-id's adjacency. Shared across
544    /// retrieves at the same op-id; `mnem_core::ppr::ppr_with_matrix`
545    /// is byte-identical to the from-scratch `ppr()` path (pinned by
546    /// the `ppr_with_matrix_matches_ppr_on_small_graph` integration
547    /// test), so turning this cache on is a pure-speed change.
548    pub ppr_matrix: Option<Arc<mnem_core::ppr::SparseTransition>>,
549    /// Gap 10 Phase-1: prior-commit assignment surviving op-id churn.
550    pub community_stale: Option<Arc<mnem_graphrag::community::CommunityAssignment>>,
551    /// Gap 10 Phase-1: debounce + storm-cap policy state.
552    pub leiden_cache: LeidenCache,
553}
554
555impl GraphCache {
556    /// Invalidate if the repo's op-id has moved. Called on every
557    /// cache access, mirrors [`IndexCache::reconcile`].
558    pub fn reconcile(&mut self, repo: &ReadonlyRepo) {
559        let current = Some(repo.op_id().clone());
560        if self.key != current {
561            self.key = current;
562            self.adjacency = None;
563            self.knn_edges = None;
564            self.knn_key = None;
565            self.hybrid = None;
566            // Gap 10 Phase-1: demote current assignment to `community_stale`
567            // so retrieves inside the debounce window can serve it.
568            if let Some(prev) = self.community.take() {
569                self.community_stale = Some(prev);
570            }
571            self.ppr_matrix = None;
572        }
573    }
574
575    /// Gap 10 Phase-1 entry point. Resolves the current mode via
576    /// [`LeidenCache::select_mode`], increments
577    /// `mnem_leiden_mode_total{mode=...}` + mirrors gauge triad, then
578    /// returns the assignment and the served mode.
579    pub fn community_for_head(
580        &mut self,
581        repo: &ReadonlyRepo,
582        vector: Option<&BruteForceVectorIndex>,
583        metrics: &Metrics,
584    ) -> Result<
585        (
586            Arc<mnem_graphrag::community::CommunityAssignment>,
587            LeidenMode,
588        ),
589        crate::error::Error,
590    > {
591        self.reconcile(repo);
592        let adj = self.hybrid_adjacency_for(repo, vector)?;
593        let node_count = authored_node_count(adj.as_ref());
594        let now = Instant::now();
595        let mode = self.leiden_cache.select_mode(node_count, now);
596
597        metrics
598            .leiden_debounce_effective
599            .set(i64::try_from(self.leiden_cache.effective_debounce_ms()).unwrap_or(i64::MAX));
600        metrics
601            .leiden_storm_cap_effective
602            .set(i64::from(self.leiden_cache.storm_cap_per_min));
603        #[allow(clippy::cast_possible_truncation)]
604        let delta_pp10k = (DELTA_RATIO_FORCE_FULL * 10_000.0) as i64;
605        metrics.leiden_delta_ratio_effective.set(delta_pp10k);
606        metrics.leiden_mode_current.set(mode.gauge_value());
607        metrics
608            .leiden_mode
609            .get_or_create(&LeidenModeLabels {
610                mode: mode.label().to_string(),
611            })
612            .inc();
613
614        match mode {
615            LeidenMode::Full | LeidenMode::FullDebounced => {
616                if let Some(c) = &self.community {
617                    return Ok((c.clone(), mode));
618                }
619                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
620                let arc = Arc::new(assignment);
621                self.community = Some(arc.clone());
622                if matches!(mode, LeidenMode::FullDebounced) {
623                    self.leiden_cache.last_recompute_at = Some(now);
624                }
625                self.leiden_cache.observe_commit_arrival(now);
626                Ok((arc, mode))
627            }
628            LeidenMode::FallbackStale => {
629                if let Some(c) = &self.community {
630                    return Ok((c.clone(), mode));
631                }
632                if let Some(c) = &self.community_stale {
633                    return Ok((c.clone(), mode));
634                }
635                let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
636                let arc = Arc::new(assignment);
637                self.community_stale = Some(arc.clone());
638                Ok((arc, mode))
639            }
640        }
641    }
642
643    /// C3 FIX-1: fetch (or build) the row-stochastic CSR matrix used
644    /// by PPR, over the hybrid adjacency. The matrix is a pure
645    /// function of the adjacency; caching it lets repeated retrieves
646    /// at the same op-id skip the 3-pass CSR build (dominates PPR
647    /// cost at ~15 iters on small graphs).
648    ///
649    /// Byte-identity with the uncached `ppr()` path is pinned by the
650    /// `ppr_with_matrix_matches_ppr_on_small_graph` integration test
651    /// in mnem-core.
652    pub fn ppr_matrix_for(
653        &mut self,
654        repo: &ReadonlyRepo,
655        vector: Option<&BruteForceVectorIndex>,
656    ) -> Result<Arc<mnem_core::ppr::SparseTransition>, crate::error::Error> {
657        self.reconcile(repo);
658        if let Some(m) = &self.ppr_matrix {
659            return Ok(m.clone());
660        }
661        let adj = self.hybrid_adjacency_for(repo, vector)?;
662        let m = Arc::new(mnem_core::ppr::sparse_transition_matrix(adj.as_ref()));
663        self.ppr_matrix = Some(m.clone());
664        Ok(m)
665    }
666
667    /// Fetch (or build) the authored-adjacency snapshot. Back-compat
668    /// wrapper preserved for callers that do NOT want KNN fallback
669    /// (e.g. tests asserting the pure-authored path).
670    pub fn adjacency_for(
671        &mut self,
672        repo: &ReadonlyRepo,
673    ) -> Result<Arc<AuthoredEdges>, crate::error::Error> {
674        self.reconcile(repo);
675        if let Some(a) = &self.adjacency {
676            return Ok(a.clone());
677        }
678        let a = Arc::new(collect_authored_edges(repo)?);
679        self.adjacency = Some(a.clone());
680        Ok(a)
681    }
682
683    /// Fetch (or build) the Leiden community assignment over the
684    /// authored-only adjacency. Back-compat entry point.
685    pub fn community_for(
686        &mut self,
687        repo: &ReadonlyRepo,
688    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
689        self.reconcile(repo);
690        if let Some(c) = &self.community {
691            return Ok(c.clone());
692        }
693        let adj = self.adjacency_for(repo)?;
694        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
695        let arc = Arc::new(assignment);
696        self.community = Some(arc.clone());
697        Ok(arc)
698    }
699
700    /// C3 Patch-B entry point: fetch (or build) the authored adjacency;
701    /// when it is empty (and `vector` is non-empty), derive a KNN-edge
702    /// substrate via `mnem_ann::derive_knn_edges_from_vectors` and
703    /// cache a combined [`DerivedHybridAdjacency`].
704    ///
705    /// Returns a trait-object `Arc<dyn AdjacencyIndex + Send + Sync>`
706    /// so the retriever's `with_adjacency_index` consumer can accept
707    /// it without knowing whether the KNN lane fired.
708    ///
709    /// # Zero-impact contracts
710    ///
711    /// - Authored non-empty -> returns the authored snapshot; KNN lane
712    ///   never runs; `knn_edges` stays `None`.
713    /// - `vector` is `None` or empty -> returns the authored snapshot
714    ///   (possibly empty); no KNN fallback; preserves the "no graph,
715    ///   no filter" legacy behaviour.
716    /// - Same `op_id` + same vector-index content -> served from the
717    ///   single-slot cache; `derive_knn_edges_from_vectors` is not
718    ///   re-run.
719    pub fn hybrid_adjacency_for(
720        &mut self,
721        repo: &ReadonlyRepo,
722        vector: Option<&BruteForceVectorIndex>,
723    ) -> Result<Arc<dyn AdjacencyIndex + Send + Sync>, crate::error::Error> {
724        self.reconcile(repo);
725        let authored = self.adjacency_for(repo)?;
726
727        // Zero-impact short-circuits.
728        if !authored.edges.is_empty() {
729            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
730        }
731        let Some(vec_idx) = vector else {
732            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
733        };
734        if vec_idx.is_empty() {
735            return Ok(authored as Arc<dyn AdjacencyIndex + Send + Sync>);
736        }
737
738        // Fallback active: derive (or fetch cached) KNN edges. The
739        // `KnnEdgeIndex` CID folds in (root_cid, k, metric) so the
740        // cache key is stable across restarts given the same vector
741        // content.
742        self.ensure_knn_edges(vec_idx)?;
743        let knn = self
744            .knn_edges
745            .clone()
746            .expect("ensure_knn_edges populated the slot");
747        // One-shot info log the first time the fallback path wins.
748        if !KNN_FALLBACK_LOGGED.swap(true, Ordering::Relaxed) {
749            tracing::info!(
750                target: "mnem_http::graph_cache",
751                k = KNN_FALLBACK_K,
752                metric = "cosine",
753                knn_edges = knn.len(),
754                vector_model = %vec_idx.model(),
755                "authored adjacency empty; KNN-edge fallback activated (E0 wire)",
756            );
757        }
758        if self.hybrid.is_none() {
759            self.hybrid = Some(Arc::new(DerivedHybridAdjacency {
760                authored: authored.clone(),
761                knn: (*knn).clone(),
762            }));
763        }
764        Ok(self.hybrid.clone().expect("hybrid slot populated above")
765            as Arc<dyn AdjacencyIndex + Send + Sync>)
766    }
767
768    /// C3 Patch-B: community assignment over the hybrid adjacency.
769    /// Falls through to the authored-only path when `vector` is absent
770    /// or empty, or when authored edges are already non-empty.
771    pub fn hybrid_community_for(
772        &mut self,
773        repo: &ReadonlyRepo,
774        vector: Option<&BruteForceVectorIndex>,
775    ) -> Result<Arc<mnem_graphrag::community::CommunityAssignment>, crate::error::Error> {
776        self.reconcile(repo);
777        if let Some(c) = &self.community {
778            return Ok(c.clone());
779        }
780        let adj = self.hybrid_adjacency_for(repo, vector)?;
781        // compute_communities takes `&dyn AdjacencyIndex`; the trait
782        // object's deref coerces.
783        let assignment = mnem_graphrag::community::compute_communities(adj.as_ref(), 0);
784        let arc = Arc::new(assignment);
785        self.community = Some(arc.clone());
786        Ok(arc)
787    }
788
789    /// Derive KNN edges from `vector` and cache them along with the
790    /// `KnnEdgeIndex` CID as the cache key. Idempotent: returns early
791    /// when the key already matches.
792    fn ensure_knn_edges(
793        &mut self,
794        vector: &BruteForceVectorIndex,
795    ) -> Result<(), crate::error::Error> {
796        // Collect (ids, vecs) from the brute-force index. The slices
797        // are borrowed from the index's flat buffer (zero copy up to
798        // the `.to_vec()` the KNN builder does internally when it
799        // clones per-source rows).
800        let mut ids: Vec<NodeId> = Vec::with_capacity(vector.len());
801        let mut vecs: Vec<Vec<f32>> = Vec::with_capacity(vector.len());
802        for (id, row) in vector.points_iter() {
803            ids.push(id);
804            vecs.push(row.to_vec());
805        }
806        let edges = mnem_ann::derive_knn_edges_from_vectors(
807            &ids,
808            &vecs,
809            KNN_FALLBACK_K,
810            mnem_ann::DistanceMetric::Cosine,
811        );
812        // Assemble a KnnEdgeIndex purely to compute the cache-key CID.
813        // The `root_cid` is the content hash of (model, dim, ids,
814        // flat_vecs); we lean on the HybridAdjacency E0 substrate's
815        // existing `compute_cid` so the key is stable across restarts
816        // given identical vector content.
817        let root_cid = vector_index_content_cid(vector, &ids)?;
818        let idx = mnem_ann::KnnEdgeIndex {
819            root_cid,
820            k: KNN_FALLBACK_K,
821            metric: mnem_ann::DistanceMetric::Cosine,
822            edges,
823        };
824        let cid = idx
825            .compute_cid()
826            .map_err(|e| crate::error::Error::internal(format!("knn edge cid: {e}")))?;
827        if self.knn_key.as_ref() == Some(&cid) && self.knn_edges.is_some() {
828            return Ok(());
829        }
830        let triples: Vec<(NodeId, NodeId, f32)> = idx
831            .edges
832            .into_iter()
833            .map(|e| (e.src, e.dst, e.weight))
834            .collect();
835        self.knn_edges = Some(Arc::new(triples));
836        self.knn_key = Some(cid);
837        self.hybrid = None; // invalidate derived combined view
838        Ok(())
839    }
840}
841
842/// Derive a stable content-address for a [`BruteForceVectorIndex`] by
843/// hashing `(model, dim, canonical_ids)`. Two vector indexes with the
844/// same model, dimensionality and ID set share a CID, which is the
845/// coarser grain we want for the KNN-edge cache key (the vectors
846/// themselves live in the Prolly tree keyed by NodeId, so equal IDs
847/// at the same op_id imply equal vector contents).
848fn vector_index_content_cid(
849    vector: &BruteForceVectorIndex,
850    ids: &[NodeId],
851) -> Result<Cid, crate::error::Error> {
852    use mnem_core::codec::to_canonical_bytes;
853    use mnem_core::id::{CODEC_RAW, Multihash};
854    #[derive(serde::Serialize)]
855    struct Preimage<'a> {
856        tag: &'a str,
857        model: &'a str,
858        dim: u32,
859        ids: &'a [NodeId],
860    }
861    let pre = Preimage {
862        tag: "mnem-http/knn-fallback/v1",
863        model: vector.model(),
864        dim: vector.dim(),
865        ids,
866    };
867    let body = to_canonical_bytes(&pre)
868        .map_err(|e| crate::error::Error::internal(format!("canonical encode: {e}")))?;
869    let hash = Multihash::sha2_256(&body);
870    Ok(Cid::new(CODEC_RAW, hash))
871}
872
873/// Gap 10 Phase-1: distinct node count walking adjacency edges.
874fn authored_node_count(adj: &(dyn AdjacencyIndex + Send + Sync)) -> usize {
875    use std::collections::BTreeSet;
876    let mut seen: BTreeSet<NodeId> = BTreeSet::new();
877    for e in adj.iter_edges() {
878        seen.insert(e.src);
879        seen.insert(e.dst);
880    }
881    seen.len()
882}
883
884/// Walk `commit.edges` Prolly tree once and collect every authored
885/// edge as a `(src, dst)` pair in canonical traversal order.
886fn collect_authored_edges(repo: &ReadonlyRepo) -> Result<AuthoredEdges, crate::error::Error> {
887    let Some(commit) = repo.head_commit() else {
888        return Ok(AuthoredEdges::default());
889    };
890    let bs = repo.blockstore().clone();
891    let cursor = mnem_core::prolly::Cursor::new(&*bs, &commit.edges)
892        .map_err(|e| crate::error::Error::internal(format!("opening edge cursor: {e}")))?;
893    let mut edges: Vec<(NodeId, NodeId)> = Vec::new();
894    for entry in cursor {
895        let (_key, edge_cid) =
896            entry.map_err(|e| crate::error::Error::internal(format!("walking edge tree: {e}")))?;
897        let bytes = bs
898            .get(&edge_cid)
899            .map_err(|e| crate::error::Error::internal(format!("fetching edge block: {e}")))?
900            .ok_or_else(|| {
901                crate::error::Error::internal(format!("edge block {edge_cid} missing"))
902            })?;
903        let edge: mnem_core::objects::Edge = mnem_core::codec::from_canonical_bytes(&bytes)
904            .map_err(|e| crate::error::Error::internal(format!("decoding edge: {e}")))?;
905        edges.push((edge.src, edge.dst));
906    }
907    Ok(AuthoredEdges { edges })
908}
909
910#[cfg(test)]
911pub(crate) mod test_support {
912    //! Test-only builders for [`AppState`]. Scoped `pub(crate)` so
913    //! sibling modules (`auth`, `routes::remote`) can build a minimal
914    //! in-memory state without paying the full `app_with_options`
915    //! path (redb open + config load) for every unit test.
916
917    use super::*;
918    use mnem_core::store::{MemoryBlockstore, MemoryOpHeadsStore};
919
920    /// Build an `AppState` backed by in-memory stores, with an
921    /// optional push token. Used by extractor and route unit tests.
922    pub(crate) fn state_with_token(token: Option<String>) -> AppState {
923        let bs: Arc<dyn mnem_core::store::Blockstore> = Arc::new(MemoryBlockstore::new());
924        let ohs: Arc<dyn mnem_core::store::OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
925        let repo = ReadonlyRepo::init(bs, ohs).expect("init ok");
926        AppState {
927            repo: Arc::new(Mutex::new(repo)),
928            embed_cfg: None,
929            sparse_cfg: None,
930            indexes: Arc::new(Mutex::new(IndexCache::default())),
931            allow_labels: false,
932            metrics: Metrics::new(),
933            push_token: token,
934            graph_cache: Arc::new(Mutex::new(GraphCache::default())),
935            traverse_cfg: Arc::new(crate::routes::traverse::TraverseAnswerCfg::default()),
936        }
937    }
938}
939
940#[cfg(test)]
941mod knn_fallback_tests {
942    //! C3 Patch-B unit tests: `GraphCache` KNN-edge fallback activation
943    //! and zero-impact short-circuits.
944
945    use super::*;
946    use bytes::Bytes;
947    use mnem_core::objects::node::{Dtype, Embedding};
948    use mnem_core::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
949
950    fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
951        (
952            Arc::new(MemoryBlockstore::new()),
953            Arc::new(MemoryOpHeadsStore::new()),
954        )
955    }
956
957    fn f32_embed(model: &str, v: &[f32]) -> Embedding {
958        let mut bytes = Vec::with_capacity(v.len() * 4);
959        for x in v {
960            bytes.extend_from_slice(&x.to_le_bytes());
961        }
962        Embedding {
963            model: model.to_string(),
964            dtype: Dtype::F32,
965            dim: u32::try_from(v.len()).expect("test vec fits in u32"),
966            vector: Bytes::from(bytes),
967        }
968    }
969
970    fn build_vector_index(rows: &[(NodeId, Vec<f32>)]) -> BruteForceVectorIndex {
971        let mut idx = BruteForceVectorIndex::empty("m", 3);
972        for (id, v) in rows {
973            let inserted = idx.try_insert(*id, &f32_embed("m", v));
974            assert!(inserted, "embedding insert");
975        }
976        idx
977    }
978
979    #[test]
980    fn empty_authored_plus_empty_vector_is_no_op() {
981        let (bs, ohs) = stores();
982        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
983        let mut gc = GraphCache::default();
984        let adj = gc.hybrid_adjacency_for(&repo, None).ok().expect("no-op");
985        assert_eq!(adj.edge_count(), 0, "no vectors -> no KNN fallback");
986        assert!(gc.knn_edges.is_none());
987    }
988
989    #[test]
990    fn empty_authored_plus_populated_vector_activates_fallback() {
991        let (bs, ohs) = stores();
992        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
993        // 4 distinct L2-direction vectors; KNN k=32 (capped at n-1=3)
994        // yields at least one directed edge per source.
995        let rows: Vec<(NodeId, Vec<f32>)> = vec![
996            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
997            (NodeId::new_v7(), vec![0.9, 0.1, 0.0]),
998            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
999            (NodeId::new_v7(), vec![0.0, 0.0, 1.0]),
1000        ];
1001        let vec_idx = build_vector_index(&rows);
1002
1003        let mut gc = GraphCache::default();
1004        let adj = gc
1005            .hybrid_adjacency_for(&repo, Some(&vec_idx))
1006            .ok()
1007            .expect("knn fallback ok");
1008        assert!(
1009            adj.edge_count() > 0,
1010            "KNN fallback must produce at least one edge (got 0)",
1011        );
1012        assert!(gc.knn_edges.is_some(), "knn_edges slot populated");
1013        assert!(gc.knn_key.is_some(), "knn cache key populated");
1014
1015        // Community assignment must see the derived graph: at least one
1016        // community membership lookup must be Some (non-trivial graph).
1017        let assignment = gc
1018            .hybrid_community_for(&repo, Some(&vec_idx))
1019            .ok()
1020            .expect("community ok");
1021        let any_assigned = rows
1022            .iter()
1023            .any(|(id, _)| assignment.community_of(*id).is_some());
1024        assert!(
1025            any_assigned,
1026            "at least one node must have a community under a non-empty adjacency",
1027        );
1028    }
1029
1030    #[test]
1031    fn knn_fallback_is_idempotent_on_same_vector() {
1032        let (bs, ohs) = stores();
1033        let repo = ReadonlyRepo::init(bs, ohs).expect("init repo");
1034        let rows: Vec<(NodeId, Vec<f32>)> = vec![
1035            (NodeId::new_v7(), vec![1.0, 0.0, 0.0]),
1036            (NodeId::new_v7(), vec![0.0, 1.0, 0.0]),
1037        ];
1038        let vec_idx = build_vector_index(&rows);
1039        let mut gc = GraphCache::default();
1040        let _ = gc
1041            .hybrid_adjacency_for(&repo, Some(&vec_idx))
1042            .ok()
1043            .expect("first build");
1044        let first_key = gc.knn_key.clone().expect("first build populates key");
1045        // Second call must re-use the cached slot (same op_id + same
1046        // vector content -> same KnnEdgeIndex CID).
1047        let _ = gc
1048            .hybrid_adjacency_for(&repo, Some(&vec_idx))
1049            .ok()
1050            .expect("second build");
1051        let second_key = gc.knn_key.clone().expect("second build populates key");
1052        assert_eq!(first_key, second_key, "KNN cache key stable across calls");
1053    }
1054}