Skip to main content

mnem_http/
handlers.rs

1//! Axum handlers for `mnem http`'s v1 surface.
2//!
3//! Keep all handlers synchronous inside the lock. We deliberately hold
4//! a `std::sync::Mutex` across blocking mnem-core calls rather than a
5//! `tokio::Mutex` because those calls don't await; the server is
6//! multi-threaded so concurrent readers serialise on the mutex but
7//! never on the async runtime.
8//
9// Remote-v0 insertion point: future remote-transport endpoints land
10// here as a parallel `/remote/v1/*` surface, NOT on `/v1/*`. Four
11// verbs:
12// GET /remote/v1/refs -> list refs + capabilities
13// POST /remote/v1/fetch-blocks -> stream a CAR of wanted blocks
14// POST /remote/v1/push-blocks -> accept a CAR, verify signatures
15// POST /remote/v1/advance-head -> CAS a ref to a new CID
16// The protocol is source-agnostic: a hosted Uranid plane is one implementation,
17// self-hosted mnem http is another, `file://` is a third. See
18// `docs/ROADMAP.md#remote-v0-work-items-tracked-inline-in-src`
// item 1.
20
21use axum::Json;
22use axum::extract::{Path, Query, State};
23use ipld_core::ipld::Ipld;
24use mnem_core::codec::json_to_ipld;
25use mnem_core::id::NodeId;
26use mnem_core::index::PropPredicate;
27use mnem_core::objects::Node;
28use mnem_core::retrieve::Lane;
29// BENCH-1 (C4): trait import is required so `MockEmbedder::embed`
30// and `::model` resolve on the concrete struct in the cold-start
31// fallback paths inside `retrieve` / `retrieve_full` below.
32use mnem_embed_providers::Embedder as _;
33use serde::{Deserialize, Serialize};
34use serde_json::{Map, Value, json};
35
36use crate::error::Error;
37use crate::state::AppState;
38
39// ---------- GET /v1/healthz ----------
40
41/// Canonical wire-name for a retrieval lane. Keep in sync with the
42/// `Lane` enum's variants; downstream docs / dashboards depend on
43/// these exact strings.
/// Canonical wire-name for a retrieval lane. Keep in sync with the
/// `Lane` enum's variants; downstream docs / dashboards depend on
/// these exact strings.
///
/// `const fn` so the mapping can be used in const contexts and costs
/// nothing at runtime.
const fn lane_name(lane: Lane) -> &'static str {
    match lane {
        Lane::Vector => "vector",
        Lane::Sparse => "sparse",
        Lane::GraphExpand => "graph_expand",
        Lane::Rerank => "rerank",
        // `Lane` is #[non_exhaustive]; new variants added upstream
        // surface here as "unknown" rather than breaking the wire
        // format. Downstream clients that see an unknown key should
        // still be able to parse the response.
        _ => "unknown",
    }
}
57
58// ---------- input clamps ----------
59//
60// The retrieve path takes three caller-controlled usize knobs:
61// `limit` (final result count), `vector_cap` (per-lane candidate
62// pool), and `rerank_top_k` (fanout into an external reranker).
63// None had a ceiling before R2-A. A caller could send
64// `limit=18446744073709551615` and trigger whatever the downstream
65// BruteForce vector search allocates - even behind the 64 MiB
66// body-size limit, `Option<usize>` is cheap to send and expensive
67// to honour. Reject at the boundary.
68//
69// The ceilings are deliberately generous: they exist to prevent
70// accidental or adversarial OOM, not to impose product shape.
71// Legitimate callers will never see them. If you have a real use
72// case that exceeds a cap, raise the cap here (not locally per
73// handler) and extend the 400 message with the new ceiling.
74
/// Maximum `limit` accepted on any `/v1/retrieve` variant. Caps the
/// final item count returned to the caller. 1,000 is ~20x the
/// practical top-k a UI or LLM context window can consume.
pub(crate) const MAX_RETRIEVE_LIMIT: usize = 1_000;

/// Maximum `vector_cap` accepted on `POST /v1/retrieve`. Caps the
/// per-lane candidate pool the vector index walks. 100,000 covers
/// the entire legitimate dense-corpus fan-out for the current
/// `BruteForce` index; HNSW will want its own tuning.
pub(crate) const MAX_VECTOR_CAP: usize = 100_000;

/// Maximum `rerank_top_k` accepted on `POST /v1/retrieve`. Caps
/// the number of candidates sent to an external reranker. 500 is
/// 10x what any cross-encoder today handles in <1s; callers
/// usually pick 50-100.
///
/// All three caps are enforced at the handler boundary via
/// `clamp_or_reject`; raise them here (not per handler) if a real
/// use case ever exceeds one.
pub(crate) const MAX_RERANK_TOP_K: usize = 500;
91
92/// Reject an oversized `limit` / `vector_cap` / `rerank_top_k` with
93/// a 400 and a specific message that tells the caller which knob
94/// and which cap.
95fn clamp_or_reject(name: &'static str, value: Option<usize>, cap: usize) -> Result<(), Error> {
96    if let Some(n) = value
97        && n > cap
98    {
99        return Err(Error::bad_request(format!(
100            "{name}={n} exceeds max of {cap}; lower the value or split the request"
101        )));
102    }
103    Ok(())
104}
105
106pub(crate) async fn healthz() -> Json<Value> {
107    Json(json!({
108    "schema": "mnem.v1.healthz",
109    "ok": true,
110    "service": "mnem http",
111    "version": env!("CARGO_PKG_VERSION"),
112    }))
113}
114
115// ---------- GET /v1/stats ----------
116
/// `GET /v1/stats` — cheap repo-level counters (op id, head commit,
/// ref count) for dashboards and smoke checks.
pub(crate) async fn stats(State(s): State<AppState>) -> Result<Json<Value>, Error> {
    // Poisoned-mutex failures map to `Error::locked()` rather than
    // panicking the handler.
    let repo = s.repo.lock().map_err(|_| Error::locked())?;
    let op_id = repo.op_id().to_string();
    // Only the first head is reported; `None` when the repo has no
    // heads yet.
    let head = repo.view().heads.first().map(ToString::to_string);
    let refs = repo.view().refs.len();
    Ok(Json(json!({
    "schema": "mnem.v1.stats",
    "op_id": op_id,
    "head_commit": head,
    "refs": refs,
    })))
}
129
130// ---------- POST /v1/nodes ----------
131
/// Request body for `POST /v1/nodes`, reused per-element by the bulk
/// wrapper (`POST /v1/nodes/bulk`).
#[derive(Deserialize)]
pub(crate) struct PostNodeBody {
    /// Scoping tag. Maps to `Node.ntype` on the wire. Optional on the
    /// HTTP boundary: if omitted or empty, the server substitutes
    /// [`Node::DEFAULT_NTYPE`] (`"Node"`). Callers that want
    /// per-tenant / per-conversation isolation pass a non-empty value.
    #[serde(default)]
    pub label: String,
    /// Optional human-readable summary; also the text that gets
    /// auto-embedded when an embed provider is configured.
    pub summary: Option<String>,
    /// Optional free-form properties; values are converted JSON -> IPLD
    /// at ingest.
    pub props: Option<Map<String, Value>>,
    /// Optional raw content payload, stored as bytes.
    pub content: Option<String>,
    /// Required for the single-node `POST /v1/nodes` path; optional
    /// inside the bulk wrapper (audit-2026-04-25 P2-8): when absent,
    /// the wrapper-level `author` is used. The single-node handler
    /// still validates non-empty before commit, so the contract is
    /// preserved.
    #[serde(default)]
    pub author: Option<String>,
    /// Optional commit message; the handler supplies a default.
    #[serde(default)]
    pub message: Option<String>,
    /// Optional caller-supplied UUID. When present, mnem uses it as
    /// the node's `NodeId` instead of generating a fresh v7. Lets
    /// distributed agents + replay pipelines pin node identity so
    /// two machines ingesting the same logical event produce the
    /// same `Node` CID. Must be a UUID-8x20 / UUID-v7 / UUID-v4
    /// string parseable by `NodeId::parse_uuid`.
    #[serde(default)]
    pub id: Option<String>,
}
161
/// Wire shape of a successful `POST /v1/nodes` response.
#[derive(Serialize)]
pub(crate) struct PostNodeResp {
    /// Always `"mnem.v1.post-node"`.
    schema: &'static str,
    /// The created node's UUID string.
    id: String,
    /// Label echoed back to the caller.
    label: String,
    /// Op id of the commit that persisted the node.
    op_id: String,
}
169
170pub(crate) async fn post_node(
171    State(s): State<AppState>,
172    Json(body): Json<PostNodeBody>,
173) -> Result<Json<PostNodeResp>, Error> {
174    // Two-step label resolution:
175    // 1. If the server was not launched with `MNEM_BENCH=1`
176    // (`s.allow_labels == false`), we *ignore* any caller-supplied
177    // `label` silently and always use `Node::DEFAULT_NTYPE`. This
178    // is the casual / single-tenant path: no label surface.
179    // 2. If `s.allow_labels == true`, we honour the caller's value;
180    // an empty/omitted value still falls back to the default.
181    let label = if s.allow_labels && !body.label.trim().is_empty() {
182        body.label.clone()
183    } else {
184        Node::DEFAULT_NTYPE.to_string()
185    };
186    let author = body
187        .author
188        .as_deref()
189        .map(str::trim)
190        .filter(|a| !a.is_empty())
191        .map(str::to_string);
192    let author = match author {
193        Some(a) => a,
194        None => return Err(Error::bad_request("author is required")),
195    };
196
197    let node_id = match body.id.as_deref() {
198        Some(s) => NodeId::parse_uuid(s)
199            .map_err(|e| Error::bad_request(format!("invalid caller-supplied id: {e}")))?,
200        None => NodeId::new_v7(),
201    };
202    let mut node = Node::new(node_id, &label);
203    if let Some(sum) = &body.summary {
204        node = node.with_summary(sum);
205    }
206    if let Some(props) = body.props {
207        for (k, v) in props {
208            node = node.with_prop(
209                k,
210                json_to_ipld(&v).map_err(|e| Error::bad_request(e.to_string()))?,
211            );
212        }
213    }
214    if let Some(c) = body.content {
215        node = node.with_content(bytes::Bytes::from(c.into_bytes()));
216    }
217
218    // Auto-embed the node's summary (dense + sparse, if configured).
219    // Failures warn but do not block the commit; a later `mnem embed`
220    // pass can backfill. Clone `text` up front so the borrow of
221    // `node.summary` ends before we mutate `node` via
222    // `with_sparse_embed`. The dense vector is staged via the
223    // sidecar `Transaction::set_embedding` rather than `with_embed`
224    // so the resulting `NodeCid` does not bake in last-bit ORT
225    // drift and can be deduped across federated peers.
226    let text_for_embed: Option<String> = node
227        .summary
228        .as_ref()
229        .filter(|t| !t.trim().is_empty())
230        .cloned();
231    let mut pending_dense: Option<(String, mnem_core::objects::Embedding)> = None;
232    if let Some(text) = text_for_embed {
233        if let Some(pc) = &s.embed_cfg
234            && let Ok(embedder) = mnem_embed_providers::open(pc)
235            && let Ok(v) = embedder.embed(&text)
236        {
237            let emb = mnem_embed_providers::to_embedding(embedder.model(), &v);
238            pending_dense = Some((embedder.model().to_string(), emb));
239        }
240        if let Some(sc) = &s.sparse_cfg
241            && let Ok(sparser) = mnem_sparse_providers::open(sc)
242            && let Ok(se) = sparser.encode(&text)
243        {
244            node = node.with_sparse_embed(se);
245        }
246        // Silent on failure; the POST path returns an `id` either way.
247    }
248
249    let id = node.id;
250
251    let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
252    let mut tx = guard.start_transaction();
253    let cid = tx.add_node(&node)?;
254    if let Some((model, emb)) = pending_dense {
255        tx.set_embedding(cid, model, emb)?;
256    }
257    let commit_start = std::time::Instant::now();
258    let new_repo = tx.commit(
259        &author,
260        body.message.as_deref().unwrap_or("mnem http add node"),
261    )?;
262    s.metrics
263        .commit_duration
264        .observe(commit_start.elapsed().as_secs_f64());
265    let op_id = new_repo.op_id().to_string();
266    *guard = new_repo;
267
268    Ok(Json(PostNodeResp {
269        schema: "mnem.v1.post-node",
270        id: id.to_uuid_string(),
271        label: body.label,
272        op_id,
273    }))
274}
275
276// ---------- GET /v1/nodes/{id} ----------
277
/// `GET /v1/nodes/{id}` — fetch a node by UUID.
///
/// 400 on an unparseable UUID, 404 when no node exists under the id.
/// Props are converted IPLD -> JSON for the wire.
pub(crate) async fn get_node(
    State(s): State<AppState>,
    Path(id_str): Path<String>,
) -> Result<Json<Value>, Error> {
    let id = NodeId::parse_uuid(&id_str)
        .map_err(|e| Error::bad_request(format!("invalid UUID: {e}")))?;
    let repo = s.repo.lock().map_err(|_| Error::locked())?;
    let node = repo
        .lookup_node(&id)?
        .ok_or_else(|| Error::not_found(format!("no node with id={id_str}")))?;

    let mut props_map = Map::new();
    for (k, v) in &node.props {
        props_map.insert(k.clone(), ipld_to_json(v));
    }

    // Embeddings are sidecar-attached, not Node-inline. Probe under
    // the configured embedder's `model_fq` (the same string used at
    // write time). When no embed-provider is configured, we report
    // `has_embedding = false` rather than enumerate every model -
    // the sidecar API is keyed by model and a multi-model probe is
    // out of scope for this single-flag wire field.
    let has_embedding = match s.embed_cfg.as_ref() {
        Some(pc) => {
            let model = model_fq_of(pc);
            // Sidecar lookups are keyed by NodeCid, so re-hash the
            // node to recover its CID.
            let (_, node_cid) = mnem_core::codec::hash_to_cid(&node)
                .map_err(|e| Error::internal(format!("hash node: {e}")))?;
            repo.embedding_for(&node_cid, &model)?.is_some()
        }
        None => false,
    };

    Ok(Json(json!({
    "schema": "mnem.v1.node",
    "id": node.id.to_uuid_string(),
    "label": node.ntype,
    "summary": node.summary,
    "props": Value::Object(props_map),
    "content_bytes": node.content.as_ref().map_or(0, bytes::Bytes::len),
    "has_embedding": has_embedding,
    })))
}
320
321/// Format the `provider:model` string the embedder adapters expose
322/// via `Embedder::model()`. Mirrored here so handlers can derive it
323/// from a `ProviderConfig` without opening the adapter.
324fn model_fq_of(pc: &mnem_embed_providers::ProviderConfig) -> String {
325    use mnem_embed_providers::ProviderConfig as PC;
326    match pc {
327        PC::Openai(c) => format!("openai:{}", c.model),
328        PC::Ollama(c) => format!("ollama:{}", c.model),
329        PC::Onnx(c) => format!("onnx:{}", c.model),
330    }
331}
332
333// ---------- DELETE /v1/nodes/{id} ----------
334
/// Query-string parameters for `DELETE /v1/nodes/{id}`.
#[derive(Deserialize)]
pub(crate) struct DeleteQuery {
    /// Commit author. Required; query-string rather than body so `curl
    /// -X DELETE` stays one-line-trivial.
    pub author: String,
    /// Optional commit message; the handler supplies a default.
    #[serde(default)]
    pub message: Option<String>,
}
343
/// `DELETE /v1/nodes/{id}` — remove a node and commit.
///
/// Deliberately idempotent at the HTTP layer: deleting a missing id
/// still commits and returns 200 with `"existed": false` rather
/// than a 404.
pub(crate) async fn delete_node(
    State(s): State<AppState>,
    Path(id_str): Path<String>,
    Query(q): Query<DeleteQuery>,
) -> Result<Json<Value>, Error> {
    let id = NodeId::parse_uuid(&id_str)
        .map_err(|e| Error::bad_request(format!("invalid UUID: {e}")))?;
    if q.author.trim().is_empty() {
        return Err(Error::bad_request("author is required"));
    }

    let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
    // Capture whether the node existed before the removal so the
    // response can report it.
    let existed = guard.lookup_node(&id)?.is_some();
    let mut tx = guard.start_transaction();
    tx.remove_node(id);
    let commit_start = std::time::Instant::now();
    let new_repo = tx.commit(
        &q.author,
        q.message.as_deref().unwrap_or("mnem http delete node"),
    )?;
    s.metrics
        .commit_duration
        .observe(commit_start.elapsed().as_secs_f64());
    let op_id = new_repo.op_id().to_string();
    *guard = new_repo;

    Ok(Json(json!({
    "schema": "mnem.v1.delete-node",
    "id": id_str,
    "existed": existed,
    "op_id": op_id,
    })))
}
377
378// ---------- POST /v1/nodes/{id}/tombstone ----------
379
/// Request body for `POST /v1/nodes/{id}/tombstone`.
#[derive(Deserialize)]
pub(crate) struct TombstoneBody {
    /// Free-form reason string recorded on the tombstone.
    #[serde(default)]
    pub reason: String,
    /// Commit author. Required (validated non-empty by the handler).
    pub author: String,
}
388
/// `POST /v1/nodes/{id}/tombstone` — mark a node tombstoned and
/// commit.
///
/// Unlike `delete_node`, this path is NOT idempotent over HTTP:
/// 404 on a missing node, 409 on an already-tombstoned one.
pub(crate) async fn tombstone_node(
    State(s): State<AppState>,
    Path(id_str): Path<String>,
    Json(body): Json<TombstoneBody>,
) -> Result<Json<Value>, Error> {
    let id = NodeId::parse_uuid(&id_str)
        .map_err(|e| Error::bad_request(format!("invalid UUID: {e}")))?;
    if body.author.trim().is_empty() {
        return Err(Error::bad_request("author is required"));
    }
    let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
    // 404: the underlying node must exist in the current head. We
    // check before starting a transaction so the error surface is
    // clean (no "stale" commit on a missing id).
    if guard.lookup_node(&id)?.is_none() {
        return Err(Error::not_found(format!("no node with id={id_str}")));
    }
    // 409: already tombstoned. Matches the item-3 contract: callers
    // shouldn't be able to re-tombstone silently via the HTTP API
    // (the in-process `tombstone_node` remains idempotent for agents
    // that want that behaviour).
    if guard.is_tombstoned(&id) {
        return Err(Error::conflict(format!(
            "node {id_str} is already tombstoned"
        )));
    }
    let mut tx = guard.start_transaction();
    tx.tombstone_node(id, body.reason.clone())?;
    let commit_start = std::time::Instant::now();
    let new_repo = tx.commit(&body.author, "mnem http tombstone node")?;
    s.metrics
        .commit_duration
        .observe(commit_start.elapsed().as_secs_f64());
    let op_id = new_repo.op_id().to_string();
    *guard = new_repo;

    Ok(Json(json!({
    "schema": "mnem.v1.tombstone",
    "op_id": op_id,
    "node_id": id_str,
    })))
}
431
432// ---------- POST /v1/nodes/bulk ----------
433//
434// One-commit bulk ingest. The per-node POST /v1/nodes path commits
435// after every write (Prolly-tree + IndexSet rebuild each time), which
436// is ~2 seconds per node on a laptop with ollama. At 3633 docs that
437// is two hours. The bulk endpoint accepts N nodes in one request and
438// does ONE commit at the end, dropping the same ingest to minutes.
439//
440// Response includes the per-node IDs in the order sent so callers
441// can build their external_id <-> mnem_node_id map.
442
/// Request body for `POST /v1/nodes/bulk` — N nodes, one commit.
#[derive(Deserialize)]
pub(crate) struct BulkNodeBody {
    /// Nodes to ingest, committed in order under a single commit.
    pub nodes: Vec<PostNodeBody>,
    /// Commit author for the whole batch.
    pub author: String,
    /// Optional commit message; the handler supplies a default.
    #[serde(default)]
    pub message: Option<String>,
    /// When true (default) AND an embed provider is configured on the
    /// server, each node's summary is auto-embedded before commit.
    #[serde(default = "default_true")]
    pub auto_embed: bool,
}
454
/// Serde default for `BulkNodeBody::auto_embed` (embedding on by
/// default).
const fn default_true() -> bool {
    true
}
458
/// Wire shape of a successful `POST /v1/nodes/bulk` response.
#[derive(Serialize)]
pub(crate) struct BulkNodeResp {
    /// Always `"mnem.v1.post-nodes-bulk"`.
    schema: &'static str,
    /// Op id of the single commit covering the whole batch.
    op_id: String,
    /// One entry per input node, same order.
    results: Vec<BulkNodeEntry>,
    /// How many nodes embedded successfully vs skipped.
    embedded: u32,
    skipped_embed: u32,
}
469
/// One per-node row in `BulkNodeResp::results`; callers use it to
/// build their external_id <-> mnem_node_id map.
#[derive(Serialize)]
pub(crate) struct BulkNodeEntry {
    /// Node UUID as a string.
    id: String,
    /// Label reported for this node.
    label: String,
}
475
476pub(crate) async fn post_nodes_bulk(
477    State(s): State<AppState>,
478    Json(body): Json<BulkNodeBody>,
479) -> Result<Json<BulkNodeResp>, Error> {
480    if body.author.trim().is_empty() {
481        return Err(Error::bad_request("author is required"));
482    }
483    if body.nodes.is_empty() {
484        return Err(Error::bad_request("nodes must not be empty"));
485    }
486
487    // Resolve the dense embedder + the sparse encoder once so we don't
488    // reopen per node. If a provider is configured but opening fails
489    // (bad API key, sidecar unreachable), fail the whole bulk call
490    // instead of committing every node without embeddings and
491    // silently reporting success.
492    let embedder = if body.auto_embed {
493        match s.embed_cfg.as_ref() {
494 Some(pc) => Some(mnem_embed_providers::open(pc).map_err(|e| {
495 Error::internal(format!(
496 "embed provider configured but open failed: {e}; bulk aborted to avoid silent no-embed commit"
497 ))
498 })?),
499 None => None,
500 }
501    } else {
502        None
503    };
504    let sparser = if body.auto_embed {
505        match s.sparse_cfg.as_ref() {
506 Some(sc) => Some(mnem_sparse_providers::open(sc).map_err(|e| {
507 Error::internal(format!(
508 "sparse provider configured but open failed: {e}; bulk aborted to avoid silent no-sparse commit"
509 ))
510 })?),
511 None => None,
512 }
513    } else {
514        None
515    };
516
517    // Pre-build every Node, doing the embed calls before taking the
518    // repo mutex. Ollama / OpenAI calls can be slow; holding the
519    // mutex across them would block every other HTTP request.
520    // Each entry pairs the Node with an optional dense (model, vec)
521    // staged for the sidecar-side `Transaction::set_embedding` call
522    // that runs after `add_node` returns the NodeCid.
523    let mut built: Vec<(Node, Option<(String, mnem_core::objects::Embedding)>)> =
524        Vec::with_capacity(body.nodes.len());
525    let mut results: Vec<BulkNodeEntry> = Vec::with_capacity(body.nodes.len());
526    let mut embedded = 0u32;
527    let mut skipped_embed = 0u32;
528
529    for nb in body.nodes {
530        // Same gating as the single-node path: caller-supplied `label`
531        // is ignored unless the server was launched with
532        // `MNEM_BENCH=1`. See the doc-comment on `post_node` for the
533        // full rationale.
534        let label = if s.allow_labels && !nb.label.trim().is_empty() {
535            nb.label.clone()
536        } else {
537            Node::DEFAULT_NTYPE.to_string()
538        };
539        let node_id = match nb.id.as_deref() {
540            Some(s) => NodeId::parse_uuid(s)
541                .map_err(|e| Error::bad_request(format!("invalid caller-supplied id: {e}")))?,
542            None => NodeId::new_v7(),
543        };
544        let mut node = Node::new(node_id, &label);
545        if let Some(sum) = &nb.summary {
546            node = node.with_summary(sum);
547        }
548        if let Some(props) = nb.props {
549            for (k, v) in props {
550                node = node.with_prop(
551                    k,
552                    json_to_ipld(&v).map_err(|e| Error::bad_request(e.to_string()))?,
553                );
554            }
555        }
556        if let Some(c) = nb.content {
557            node = node.with_content(bytes::Bytes::from(c.into_bytes()));
558        }
559        // Clone the summary up front so the borrow ends before the
560        // `with_sparse_embed` mutation. Dense vectors stage to the
561        // sidecar via `Transaction::set_embedding` after the commit
562        // loop knows the NodeCid; we collect them here keyed by
563        // position in `built`.
564        let text_for_embed: Option<String> = node
565            .summary
566            .as_ref()
567            .filter(|t| !t.trim().is_empty())
568            .cloned();
569        let mut pending_dense: Option<(String, mnem_core::objects::Embedding)> = None;
570        if let Some(text) = text_for_embed {
571            if let Some(embedder) = embedder.as_ref() {
572                match embedder.embed(&text) {
573                    Ok(v) => {
574                        let emb = mnem_embed_providers::to_embedding(embedder.model(), &v);
575                        pending_dense = Some((embedder.model().to_string(), emb));
576                        embedded += 1;
577                    }
578                    Err(_) => {
579                        skipped_embed += 1;
580                    }
581                }
582            }
583            if let Some(sparser) = sparser.as_ref()
584                && let Ok(se) = sparser.encode(&text)
585            {
586                node = node.with_sparse_embed(se);
587            }
588        }
589        results.push(BulkNodeEntry {
590            id: node.id.to_uuid_string(),
591            label: nb.label,
592        });
593        built.push((node, pending_dense));
594    }
595
596    // Single commit over all nodes. Index rebuild happens once.
597    let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
598    let mut tx = guard.start_transaction();
599    for (node, pending_dense) in &built {
600        let cid = tx.add_node(node)?;
601        if let Some((model, emb)) = pending_dense {
602            tx.set_embedding(cid, model.clone(), emb.clone())?;
603        }
604    }
605    let commit_start = std::time::Instant::now();
606    let new_repo = tx.commit(
607        &body.author,
608        body.message.as_deref().unwrap_or("mnem http bulk add"),
609    )?;
610    s.metrics
611        .commit_duration
612        .observe(commit_start.elapsed().as_secs_f64());
613    let op_id = new_repo.op_id().to_string();
614    *guard = new_repo;
615
616    Ok(Json(BulkNodeResp {
617        schema: "mnem.v1.post-nodes-bulk",
618        op_id,
619        results,
620        embedded,
621        skipped_embed,
622    }))
623}
624
625// ---------- GET /v1/retrieve ----------
626
/// Query-string parameters for `GET /v1/retrieve` (the simple
/// retrieval surface; `POST /v1/retrieve` exposes the full pipeline).
#[derive(Deserialize)]
pub(crate) struct RetrieveQuery {
    /// Free-text query, encoded through every configured lane.
    pub text: Option<String>,
    /// Label filter; honoured only when the server allows labels.
    pub label: Option<String>,
    /// Token budget for the rendered result set.
    #[serde(default)]
    pub budget: Option<u32>,
    /// Final item count; clamped against `MAX_RETRIEVE_LIMIT`.
    #[serde(default)]
    pub limit: Option<usize>,
    /// `KEY=VALUE`; VALUE tried as JSON first, falls back to string.
    pub where_eq: Option<String>,
}
638
/// `GET /v1/retrieve` — hybrid retrieval over the configured lanes.
///
/// Clamps caller knobs, encodes the query per lane (dense + sparse),
/// wires cached indexes, executes the fusion, and returns the ranked
/// items plus a score-distribution block for agent-side calibration.
pub(crate) async fn retrieve(
    State(s): State<AppState>,
    Query(q): Query<RetrieveQuery>,
) -> Result<Json<Value>, Error> {
    // Clamp untrusted numeric knobs before we touch the retriever.
    // See the `MAX_RETRIEVE_LIMIT` / `MAX_VECTOR_CAP` / `MAX_RERANK_TOP_K`
    // constants at the top of this file for rationale.
    clamp_or_reject("limit", q.limit, MAX_RETRIEVE_LIMIT)?;

    let repo = s.repo.lock().map_err(|_| Error::locked())?;
    let mut ret = repo.retrieve();
    // Honour the caller's label filter only when the server was
    // launched with `MNEM_BENCH=1`. Otherwise the label field is
    // simply ignored; the retrieve runs unscoped. See the
    // `post_node` doc-comment for the full rationale.
    if s.allow_labels
        && let Some(l) = &q.label
    {
        ret = ret.label(l.clone());
    }
    if let Some(w) = &q.where_eq {
        let (k, v) = parse_kv(w).map_err(Error::bad_request)?;
        ret = ret.where_prop(k, PropPredicate::Eq(v));
    }
    if let Some(b) = q.budget {
        ret = ret.token_budget(b);
    }
    if let Some(n) = q.limit {
        ret = ret.limit(n);
    }
    // Auto-encode the text query through every configured lane
    // (dense + sparse). There is no in-process lexical ranker left;
    // a text query with neither an embedder nor a sparse provider
    // configured is NOT rejected - it falls through to the
    // deterministic MockEmbedder cold-start path below (BENCH-1).
    let mut vector_model: Option<String> = None;
    let mut sparse_vocab: Option<String> = None;
    if let Some(text) = q.text.as_deref()
        && !text.trim().is_empty()
    {
        ret = ret.query_text(text.to_string());
        // Dense lane.
        if let Some(pc) = &s.embed_cfg {
            let embedder = mnem_embed_providers::open(pc)
                .map_err(|e| Error::internal(format!("embed provider open failed: {e}")))?;
            let qvec = embedder
                .embed(text)
                .map_err(|e| Error::internal(format!("embed call failed: {e}")))?;
            vector_model = Some(embedder.model().to_string());
            ret = ret.vector(embedder.model().to_string(), qvec);
        }
        // Sparse lane.
        // NOTE(review): sparse-lane failures map to 400 (bad_request)
        // while dense-lane failures above map to 500 (internal) -
        // both are server-config-side faults; confirm the asymmetry
        // is intended.
        if let Some(sc) = &s.sparse_cfg {
            let sparser = mnem_sparse_providers::open(sc)
                .map_err(|e| Error::bad_request(format!("sparse open failed: {e}")))?;
            let sq = sparser
                .encode_query(text)
                .map_err(|e| Error::bad_request(format!("sparse encode failed: {e}")))?;
            sparse_vocab = Some(sq.vocab_id.clone());
            ret = ret.sparse_query(sq);
        }
        // BENCH-1 (C4 audit): cold-start fallback. Cells launched on
        // a fresh `/data` volume have no `[embed]` / `[sparse]`
        // section in `config.toml`, so AppState resolves both to
        // `None`. Rather than 400 the caller (which breaks bench
        // harnesses that exercise retrieve before configuring a
        // provider), fall back to the deterministic, network-free
        // `MockEmbedder` (blake3-derived, dim=384). Real providers
        // still take priority when configured; this branch only
        // fires when both `embed_cfg` AND `sparse_cfg` are absent.
        if vector_model.is_none() && sparse_vocab.is_none() {
            let mock = mnem_embed_providers::MockEmbedder::new("mock:cold-start-384", 384);
            let qvec = mock
                .embed(text)
                .map_err(|e| Error::internal(format!("mock embed failed: {e}")))?;
            vector_model = Some(mock.model().to_string());
            ret = ret.vector(mock.model().to_string(), qvec);
            tracing::warn!(
                "retrieve: no [embed]/[sparse] configured; using deterministic \
 MockEmbedder fallback (cold-start). Configure a real provider \
 in config.toml for production retrieval quality."
            );
        }
    }
    // Scope the index-cache lock tightly: we only need it long
    // enough to hand the retriever its per-model indexes.
    {
        let mut cache = s.indexes.lock().map_err(|_| Error::locked())?;
        if let Some(model) = &vector_model {
            let idx = cache.vector_index(&repo, model)?;
            ret = ret.with_vector_index(idx);
        }
        if let Some(vocab) = &sparse_vocab {
            let idx = cache.sparse_index(&repo, vocab)?;
            ret = ret.with_sparse_index(idx);
        }
    }
    // Record retrieve-latency histogram around the actual fusion call.
    // This keeps the sample narrow (excludes JSON serialisation cost)
    // so operators see the cost of the retrieval pipeline itself.
    let retrieve_start = std::time::Instant::now();
    let result = ret.execute()?;
    s.metrics
        .retrieve_latency
        .observe(retrieve_start.elapsed().as_secs_f64());

    let items: Vec<Value> = result
        .items
        .iter()
        .map(|item| {
            // Per-lane observability: expose as a JSON object keyed by
            // lane name so API consumers can diagnose "why did this
            // node rank" without re-running the pipeline locally.
            let mut lane_obj = Map::new();
            for (lane, score) in &item.lane_scores {
                lane_obj.insert(lane_name(*lane).to_string(), json!(score));
            }
            json!({
            "id": item.node.id.to_uuid_string(),
            "label": item.node.ntype,
            "score": item.score,
            "tokens": item.tokens,
            "summary": item.node.summary,
            "rendered": item.rendered,
            "lane_scores": Value::Object(lane_obj),
            })
        })
        .collect();

    // Gap 16: score calibration - scale-free per-query interpretability.
    // `score_distribution` is a response-level block carrying
    // min / max / median / iqr + a categorical `shape` label
    // (long-tail / uniform / bimodal / insufficient-samples). The
    // shape is promoted to a top-level agent hint per the R2 spec:
    // agents consume it to decide whether top-1 is a confident match
    // or whether the dense ranking is inconclusive. Scale-free: works
    // identically for K=8 or K=1000.
    let score_dist = {
        let scores: Vec<f32> = result.items.iter().map(|it| it.score).collect();
        mnem_graphrag::distribution_shape(&scores, mnem_graphrag::K_MIN)
    };

    Ok(Json(json!({
    "schema": "mnem.v1.retrieve",
    "items": items,
    "tokens_used": result.tokens_used,
    // `u32::MAX` is the in-process sentinel for "no budget"; it
    // serialises as JSON null on the wire.
    "tokens_budget": if result.tokens_budget == u32::MAX {
    Value::Null
    } else {
    Value::from(result.tokens_budget)
    },
    "dropped": result.dropped,
    "candidates_seen": result.candidates_seen,
    "score_distribution": score_dist,
    })))
}
792
793// ---------- POST /v1/retrieve (full retrieval pipeline) ----------
794//
795// Accepts a JSON body with every knob the CLI exposes: label, where_eq,
796// text, budget, limit, vector_cap, graph_expand, rerank,
797// and hints that trigger the embedder / LLM at the edges.
798//
799// HyDE and multi-query require a configured LLM provider and are
800// gated behind explicit fields; the handler replies with `llm_skipped`
801// metadata when the caller asks for either without supplying a
802// provider config inline.
803//
804// Same adapter-failure policy as the CLI: every optional tier that
805// errors out is logged and skipped; the base hybrid retrieval always
806// runs.
807
#[derive(Deserialize, Default)]
pub(crate) struct RetrieveRequest {
    // Basic filters
    /// Free-text query. Forwarded to the retriever's `query_text` and,
    /// when the server has an embed / sparse provider configured,
    /// auto-encoded for the dense / sparse lanes (see `retrieve_full`).
    #[serde(default)]
    pub text: Option<String>,
    /// Node-label filter. Only honoured when the server allows label
    /// filtering (`s.allow_labels`, gated by `MNEM_BENCH`); silently
    /// ignored otherwise.
    #[serde(default)]
    pub label: Option<String>,
    /// `KEY=VALUE` property-equality filter. The VALUE side is parsed
    /// as JSON when possible, else kept as a raw string (see `parse_kv`).
    #[serde(default)]
    pub where_eq: Option<String>,
    /// Token budget forwarded to the retriever. Absent = unbounded
    /// (serialised as `tokens_budget: null` in the response).
    #[serde(default)]
    pub budget: Option<u32>,
    /// Maximum number of ranked items to return. Rejected server-side
    /// when it exceeds `MAX_RETRIEVE_LIMIT`.
    #[serde(default)]
    pub limit: Option<usize>,

    // Ranker caps (fixes the hardcoded 256 silent truncation)
    /// Vector-lane candidate cap. Rejected when above `MAX_VECTOR_CAP`.
    #[serde(default)]
    pub vector_cap: Option<usize>,

    // Semantic vector (caller may supply an embedding directly OR
    // name an embedder configured on the server)
    /// Model name paired with `vector`; both must be present for the
    /// caller-supplied embedding to be used.
    #[serde(default)]
    pub vector_model: Option<String>,
    /// Caller-supplied query embedding. Takes priority over server-side
    /// auto-embed when `vector_model` is also set.
    #[serde(default)]
    pub vector: Option<Vec<f32>>,

    // Tier 3: cross-encoder reranker, PROVIDER:MODEL
    /// Rerank spec, `PROVIDER:MODEL` (cohere|voyage|jina). Parse or
    /// open failures are reported via `skipped[]`, never a hard error.
    #[serde(default)]
    pub rerank: Option<String>,
    /// How many top candidates to feed the reranker. Rejected when
    /// above `MAX_RERANK_TOP_K`.
    #[serde(default)]
    pub rerank_top_k: Option<usize>,

    // Experiment E1 (C3 FIX-1 v2): community **expander**. Despite
    // the legacy field name `community_filter`, this knob now wires
    // the ADDITIVE expander - it never drops candidates, only pulls
    // in community-cohesive siblings of the top seeds. Matrix v4
    // pinned a -29pp R@10 regression on the old drop-filter
    // semantic, which is why the semantic is inverted here. Flag
    // absent or `false` preserves the byte-exact pass-through
    // contract.
    #[serde(default)]
    pub community_filter: Option<bool>,
    /// Legacy knob retained for wire-compat with v0.1.0 clients.
    /// **Ignored** under the expander semantic: the expander has no
    /// coverage threshold because it never drops candidates.
    #[serde(default)]
    pub community_min_coverage: Option<f32>,
    /// Expander: number of top candidates treated as seeds whose
    /// communities get expanded. Default 3.
    #[serde(default)]
    pub community_expand_seeds: Option<usize>,
    /// Expander: per-community cap on how many additional members
    /// are pulled in. Default 10.
    #[serde(default)]
    pub community_max_per: Option<usize>,
    /// Expander: score decay applied to expanded members relative
    /// to the seed score. Default 0.85.
    #[serde(default)]
    pub community_decay: Option<f32>,

    // Tier 2: graph expansion
    /// Global budget of nodes the graph walk may add. Absence disables
    /// graph expansion entirely.
    #[serde(default)]
    pub graph_expand: Option<usize>,
    /// Per-hop score decay for the walk. Defaults to
    /// `GraphExpand::DEFAULT_DECAY`.
    #[serde(default)]
    pub graph_decay: Option<f32>,
    /// Optional edge-type whitelist for the walk.
    #[serde(default)]
    pub graph_etype: Option<Vec<String>>,
    /// Multi-hop traversal depth. `1` = single-hop (the classic
    /// graph-expand). `2+` enables MuSiQue-style compositional
    /// queries. Clamped internally to `[1, 4]`.
    #[serde(default)]
    pub graph_depth: Option<usize>,
    /// Per-seed outgoing-edge cap. Prevents a hot-seed node with
    /// hundreds of out-edges from starving sibling seeds in the
    /// global `graph_expand` budget.
    #[serde(default)]
    pub graph_max_per_seed: Option<usize>,
    /// Graph-expand strategy. `"decay"` (default) runs the classic
    /// BFS; `"ppr"` switches to personalised PageRank (E2+). PPR
    /// falls through to the decay walk when the repo has no wired
    /// adjacency index.
    #[serde(default)]
    pub graph_mode: Option<String>,
    /// PPR damping factor (default 0.85). Ignored unless
    /// `graph_mode = "ppr"`.
    #[serde(default)]
    pub ppr_damping: Option<f32>,
    /// PPR power-iteration cap (default 15). Ignored unless
    /// `graph_mode = "ppr"`.
    #[serde(default)]
    pub ppr_iter: Option<u32>,
    /// Gap 02 #17: opt in to running PPR even when the graph
    /// exceeds `PPR_DEFAULT_MAX_NODES` (250000). Default `false`
    /// (size gate active). Ignored unless `graph_mode = "ppr"`.
    #[serde(default)]
    pub ppr_opt_in: Option<bool>,
    /// E4 T2: Centroid + MMR extractive summarization on the top-M
    /// candidates. `summarize=false` (or absent) is a no-op; no
    /// `summary` field is emitted into the response.
    #[serde(default)]
    pub summarize: Option<bool>,
    /// How many summary sentences to emit. Defaults to 3 when
    /// `summarize=true` and this field is absent.
    #[serde(default)]
    pub summarize_k: Option<usize>,
}
913
914pub(crate) async fn retrieve_full(
915    State(s): State<AppState>,
916    Json(body): Json<RetrieveRequest>,
917) -> Result<Json<Value>, Error> {
918    // Clamp untrusted numeric knobs before we touch the retriever.
919    // See the `MAX_RETRIEVE_LIMIT` / `MAX_VECTOR_CAP` / `MAX_RERANK_TOP_K`
920    // constants at the top of this file for rationale.
921    clamp_or_reject("limit", body.limit, MAX_RETRIEVE_LIMIT)?;
922    clamp_or_reject("vector_cap", body.vector_cap, MAX_VECTOR_CAP)?;
923    clamp_or_reject("rerank_top_k", body.rerank_top_k, MAX_RERANK_TOP_K)?;
924
925    let repo = s.repo.lock().map_err(|_| Error::locked())?;
926    let mut ret = repo.retrieve();
927    let mut skipped: Vec<String> = Vec::new();
928    // Gap 14: structural warnings[]. Populated from compile-time
929    // constants only (see `mnem_core::retrieve::warnings`). Every
930    // push below is guarded by a structural precondition (substrate
931    // count == 0, provider open error, etc.) so the array stays
932    // small; `cap_warnings` enforces the hard cap before we
933    // serialise.
934    let mut warnings: Vec<mnem_core::retrieve::Warning> = Vec::new();
935
936    // Label filter gated by `MNEM_BENCH`. See `post_node` doc-comment
937    // for the full rationale.
938    if s.allow_labels
939        && let Some(l) = &body.label
940    {
941        ret = ret.label(l.clone());
942    }
943    if let Some(w) = &body.where_eq {
944        let (k, v) = parse_kv(w).map_err(Error::bad_request)?;
945        ret = ret.where_prop(k, PropPredicate::Eq(v));
946    }
947    if let Some(b) = body.budget {
948        ret = ret.token_budget(b);
949    }
950    if let Some(n) = body.limit {
951        ret = ret.limit(n);
952    }
953    if let Some(n) = body.vector_cap {
954        ret = ret.vector_cap(n);
955    }
956
957    // Vector: caller-supplied embedding takes priority over
958    // server-side auto-fuse. When no vector is supplied AND the
959    // server has an embed provider configured, embed the text
960    // query with it so the retrieve fires the real hybrid path.
961    // This matches the CLI behaviour (commands.rs retrieve).
962    //
963    // Post-there is no text ranker left in mnem-core: a
964    // `text` query without either (a) a caller-supplied vector or
965    // (b) a configured embedder is rejected with 400.
966    let mut vector_model: Option<String> = None;
967    let mut sparse_vocab: Option<String> = None;
968    if let Some(text) = body.text.as_deref()
969        && !text.trim().is_empty()
970    {
971        ret = ret.query_text(text.to_string());
972    }
973    // Caller-supplied vector wins over auto-embed.
974    if let (Some(m), Some(v)) = (&body.vector_model, &body.vector) {
975        vector_model = Some(m.clone());
976        ret = ret.vector(m.clone(), v.clone());
977    } else if let Some(text) = body.text.as_deref()
978        && !text.trim().is_empty()
979        && let Some(pc) = &s.embed_cfg
980    {
981        let embedder = mnem_embed_providers::open(pc)
982            .map_err(|e| Error::bad_request(format!("embed open failed: {e}")))?;
983        let qvec = embedder
984            .embed(text)
985            .map_err(|e| Error::bad_request(format!("embed call failed: {e}")))?;
986        vector_model = Some(embedder.model().to_string());
987        ret = ret.vector(embedder.model().to_string(), qvec);
988    }
989    // Sparse lane: auto-encode via configured provider. Uses the
990    // inference-free query path when the adapter overrides
991    // `encode_query` (OpenSearch v3-distill).
992    if let Some(text) = body.text.as_deref()
993        && !text.trim().is_empty()
994        && let Some(sc) = &s.sparse_cfg
995    {
996        let sparser = mnem_sparse_providers::open(sc)
997            .map_err(|e| Error::internal(format!("sparse provider open failed: {e}")))?;
998        let sq = sparser
999            .encode_query(text)
1000            .map_err(|e| Error::internal(format!("sparse encode failed: {e}")))?;
1001        sparse_vocab = Some(sq.vocab_id.clone());
1002        ret = ret.sparse_query(sq);
1003    }
1004    // BENCH-1 (C4 audit): cold-start fallback. See sibling block in
1005    // `retrieve()` above for full rationale. When the caller passes a
1006    // text query, has supplied no inline `vector`, and the server has
1007    // no `[embed]` / `[sparse]` configured, fall back to the
1008    // deterministic `MockEmbedder` (blake3-derived, dim=384) instead
1009    // of returning 400. Adds a `skipped[]` entry + a structural
1010    // warning so callers see the degradation in the response.
1011    if body.text.as_deref().is_some_and(|t| !t.trim().is_empty())
1012        && vector_model.is_none()
1013        && sparse_vocab.is_none()
1014        && body.vector.is_none()
1015    {
1016        if let Some(text) = body.text.as_deref() {
1017            let mock = mnem_embed_providers::MockEmbedder::new("mock:cold-start-384", 384);
1018            let qvec = mock
1019                .embed(text)
1020                .map_err(|e| Error::internal(format!("mock embed failed: {e}")))?;
1021            vector_model = Some(mock.model().to_string());
1022            ret = ret.vector(mock.model().to_string(), qvec);
1023            skipped.push(
1024                "embed: cold-start MockEmbedder fallback (no [embed]/[sparse] configured)"
1025                    .to_string(),
1026            );
1027            tracing::warn!(
1028                "retrieve_full: no [embed]/[sparse] configured; using deterministic \
1029 MockEmbedder fallback (cold-start). Configure a real provider in \
1030 config.toml for production retrieval quality."
1031            );
1032        }
1033    }
1034
1035    // Attach cached indexes (audit fix G1): skip O(N) rebuild on every
1036    // retrieve by reusing per-commit-CID cached indexes. Commit
1037    // invalidation is automatic via op-id compare inside IndexCache.
1038    //
1039    // C3 Patch-B: also capture the vector-index handle so the
1040    // community_filter + ppr blocks below can feed it to the
1041    // `GraphCache` KNN-edge fallback when the authored adjacency is
1042    // empty (E0 wire activation).
1043    let mut vector_idx_for_graph: Option<std::sync::Arc<mnem_core::index::BruteForceVectorIndex>> =
1044        None;
1045    {
1046        let mut cache = s.indexes.lock().map_err(|_| Error::locked())?;
1047        if let Some(model) = &vector_model {
1048            let idx = cache.vector_index(&repo, model)?;
1049            vector_idx_for_graph = Some(idx.clone());
1050            ret = ret.with_vector_index(idx);
1051        }
1052        if let Some(vocab) = &sparse_vocab {
1053            let idx = cache.sparse_index(&repo, vocab)?;
1054            ret = ret.with_sparse_index(idx);
1055        }
1056    }
1057
1058    // Tier 3: rerank via adapter.
1059    if let Some(spec) = &body.rerank {
1060        match parse_rerank_spec(spec) {
1061            Ok(cfg) => match mnem_rerank_providers::open(&cfg) {
1062                Ok(rr) => {
1063                    ret = ret.with_reranker(rr);
1064                    if let Some(k) = body.rerank_top_k {
1065                        ret = ret.rerank_top_k(k);
1066                    }
1067                }
1068                Err(e) => {
1069                    skipped.push(format!("rerank: {e}"));
1070                    // Gap 14: structural warning. The detailed error
1071                    // goes on `skipped` (runtime string, includes
1072                    // provider diagnostics); the warning is the
1073                    // agent-routable compile-time-constant version.
1074                    warnings.push(mnem_core::retrieve::Warning::for_code(
1075                        mnem_core::retrieve::WarningCode::NoReranker,
1076                    ));
1077                }
1078            },
1079            Err(e) => {
1080                skipped.push(format!("rerank spec: {e}"));
1081                warnings.push(mnem_core::retrieve::Warning::for_code(
1082                    mnem_core::retrieve::WarningCode::NoReranker,
1083                ));
1084            }
1085        }
1086    }
1087
1088    // C3 FIX-1 v2: CommunityExpander runtime. When the caller sets
1089    // `community_filter: true` (legacy field name; the semantic is
1090    // now ADDITIVE expansion, not filter-drop), fetch (or build) a
1091    // Leiden community assignment over the authored-edges adjacency.
1092    // When that authored adjacency is empty (common under
1093    // `/v1/nodes/bulk` which does not author edges), fall back to a
1094    // deterministic KNN-edge substrate derived from the active vector
1095    // index (k=32, cosine). Expander is additive: it never drops
1096    // candidates, so worst case is neutral. Zero-impact when the flag
1097    // is absent or `false`.
1098    if body.community_filter.unwrap_or(false) {
1099        // Gap 14: detect substrate emptiness BEFORE building the
1100        // Leiden assignment. `has_vectors` is derived from the
1101        // already-captured `vector_idx_for_graph` handle;
1102        // `has_authored_edges` is derived from the graph_cache
1103        // adjacency slot which is populated lazily on first use.
1104        // Both checks are O(1) structural predicates.
1105        let has_vectors = vector_idx_for_graph
1106            .as_deref()
1107            .is_some_and(|v| !v.is_empty());
1108        let has_authored_edges = match s.graph_cache.lock() {
1109            Ok(gc) => gc.adjacency.as_ref().is_some_and(|a| !a.edges.is_empty()),
1110            Err(_) => false,
1111        };
1112        if !has_vectors && !has_authored_edges {
1113            warnings.push(mnem_core::retrieve::Warning::for_code(
1114                mnem_core::retrieve::WarningCode::CommunityFilterNoop,
1115            ));
1116        }
1117        let assignment = {
1118            let mut gc = s.graph_cache.lock().map_err(|_| Error::locked())?;
1119            gc.hybrid_community_for(&repo, vector_idx_for_graph.as_deref())?
1120        };
1121        let expand_seeds = body.community_expand_seeds.unwrap_or(3);
1122        let max_per_community = body.community_max_per.unwrap_or(10);
1123        let decay = body.community_decay.unwrap_or(0.85).clamp(0.0, 1.0);
1124        // min_coverage is retained on the DTO but ignored at runtime
1125        // (expander has no coverage threshold). We keep the value in
1126        // the cfg so debug logs reflect what the client sent.
1127        let min_coverage = body.community_min_coverage.unwrap_or(0.5).clamp(0.0, 1.0);
1128        let cfg = mnem_core::retrieve::CommunityFilterCfg {
1129            enabled: true,
1130            expand_seeds,
1131            max_per_community,
1132            decay,
1133            min_coverage,
1134        };
1135        let lookup_handle_fwd = assignment.clone();
1136        let lookup_handle_inv = assignment.clone();
1137        let lookup = std::sync::Arc::new(mnem_core::retrieve::CommunityLookup::new_with_members(
1138            move |nid| lookup_handle_fwd.community_of(*nid),
1139            move |cid| lookup_handle_inv.members_of(cid).to_vec(),
1140        ));
1141        ret = ret.with_community_filter(cfg, lookup);
1142    }
1143
1144    // C3 FIX-2 + Patch-B: HybridAdjacency + PPR wire. When
1145    // `graph_mode="ppr"`, fetch (or build) the adjacency snapshot and
1146    // install it as the retriever's adjacency index. Uses the same
1147    // KNN-edge fallback so PPR becomes a real traversal instead of the
1148    // identity-under-empty-adjacency no-op.
1149    let want_ppr = body
1150        .graph_mode
1151        .as_deref()
1152        .is_some_and(|m| m.eq_ignore_ascii_case("ppr"));
1153    if want_ppr {
1154        // Gap 14: substrate-emptiness warning for PPR. Same
1155        // precondition as community_filter; PPR on an empty
1156        // transition matrix is the identity pass.
1157        let has_vectors = vector_idx_for_graph
1158            .as_deref()
1159            .is_some_and(|v| !v.is_empty());
1160        let has_authored_edges = match s.graph_cache.lock() {
1161            Ok(gc) => gc.adjacency.as_ref().is_some_and(|a| !a.edges.is_empty()),
1162            Err(_) => false,
1163        };
1164        if !has_vectors && !has_authored_edges {
1165            warnings.push(mnem_core::retrieve::Warning::for_code(
1166                mnem_core::retrieve::WarningCode::PprNoSubstrate,
1167            ));
1168        }
1169        let adj = {
1170            let mut gc = s.graph_cache.lock().map_err(|_| Error::locked())?;
1171            gc.hybrid_adjacency_for(&repo, vector_idx_for_graph.as_deref())?
1172        };
1173        ret = ret.with_adjacency_index(adj);
1174    }
1175
1176    // Tier 2: graph expand (authored-graph traversal, mnem's moat).
1177    if let Some(max_expand) = body.graph_expand {
1178        // Gap 14: graph_expand reads authored edges only (not the
1179        // vector-derived KNN substrate). Emit a warning when the
1180        // authored adjacency is empty so the caller knows the walk
1181        // added nothing.
1182        let has_authored_edges = match s.graph_cache.lock() {
1183            Ok(gc) => gc.adjacency.as_ref().is_some_and(|a| !a.edges.is_empty()),
1184            Err(_) => false,
1185        };
1186        if !has_authored_edges {
1187            warnings.push(mnem_core::retrieve::Warning::for_code(
1188                mnem_core::retrieve::WarningCode::AuthoredAdjacencyEmpty,
1189            ));
1190        }
1191        let mut cfg = mnem_core::retrieve::GraphExpand {
1192            max_expand,
1193            decay: body
1194                .graph_decay
1195                .unwrap_or(mnem_core::retrieve::GraphExpand::DEFAULT_DECAY),
1196            etype_filter: body.graph_etype.clone(),
1197            ..Default::default()
1198        };
1199        if let Some(depth) = body.graph_depth {
1200            cfg = cfg.with_depth(depth);
1201        }
1202        if let Some(cap) = body.graph_max_per_seed {
1203            cfg = cfg.with_max_per_seed(cap);
1204        }
1205        // E2: PPR mode dispatch.
1206        if let Some(mode) = body.graph_mode.as_deref()
1207            && mode == "ppr"
1208        {
1209            let damping = body.ppr_damping.unwrap_or(mnem_core::ppr::DEFAULT_DAMPING);
1210            let iter = body.ppr_iter.unwrap_or(mnem_core::ppr::DEFAULT_MAX_ITER);
1211            cfg = cfg.with_ppr(damping, iter, mnem_core::ppr::DEFAULT_EPS);
1212        }
1213        ret = ret.with_graph_expand(cfg);
1214    }
1215
1216    // Gap 02 #17: forward the caller's `ppr_opt_in` knob. When the
1217    // caller pinned `true`, the retriever's PPR dispatcher skips the
1218    // default-on size gate. Default `false` means the gate is active
1219    // and oversized graphs fall back to decay.
1220    ret = ret.with_ppr_opt_in(body.ppr_opt_in.unwrap_or(false));
1221
1222    // Record retrieve-latency histogram around the fusion call itself.
1223    let retrieve_start = std::time::Instant::now();
1224    let result = ret.execute()?;
1225    s.metrics
1226        .retrieve_latency
1227        .observe(retrieve_start.elapsed().as_secs_f64());
1228
1229    // Gap 02 #17: if the retriever's PPR dispatcher tripped its
1230    // size gate, emit the structured warning and bump the labelled
1231    // counter. The gauge is initialized once in Metrics::new; no
1232    // per-request set is needed.
1233    if result.ppr_size_gate_skipped {
1234        warnings.push(mnem_core::retrieve::Warning::for_code(
1235            mnem_core::retrieve::WarningCode::PprSizeGateSkipped,
1236        ));
1237        s.metrics
1238            .ppr_size_gate_skipped
1239            .get_or_create(&crate::metrics::PprSizeGateLabels {
1240                reason: "above_threshold".into(),
1241            })
1242            .inc();
1243    }
1244    let items: Vec<Value> = result
1245        .items
1246        .iter()
1247        .map(|item| {
1248            // Per-lane observability: expose as a JSON object keyed by
1249            // lane name so API consumers can diagnose "why did this
1250            // node rank" without re-running the pipeline locally.
1251            let mut lane_obj = Map::new();
1252            for (lane, score) in &item.lane_scores {
1253                lane_obj.insert(lane_name(*lane).to_string(), json!(score));
1254            }
1255            json!({
1256            "id": item.node.id.to_uuid_string(),
1257            "label": item.node.ntype,
1258            "score": item.score,
1259            "tokens": item.tokens,
1260            "summary": item.node.summary,
1261            "rendered": item.rendered,
1262            "lane_scores": Value::Object(lane_obj),
1263            })
1264        })
1265        .collect();
1266
1267    // Gap 16: score calibration - scale-free per-query interpretability.
1268    // Mirrors the GET /v1/retrieve handler above. The `score_distribution`
1269    // block carries min / max / median / iqr + a categorical `shape`
1270    // label (long-tail / uniform / bimodal / insufficient-samples) so
1271    // agents can interpret the dense ranking without a trained scaler.
1272    let score_dist = {
1273        let scores: Vec<f32> = result.items.iter().map(|it| it.score).collect();
1274        mnem_graphrag::distribution_shape(&scores, mnem_graphrag::K_MIN)
1275    };
1276
1277    // E4 T2: optional Centroid + MMR extractive summarization over
1278    // the retrieved items' node summaries. Activated strictly by
1279    // `summarize: true` in the request body; absent / false = emit
1280    // no `summary` field at all (zero impact when off).
1281    // Gap 14: structural `warnings[]` array. Omitted when empty to
1282    // keep the wire clean; when non-empty, it is first passed through
1283    // `cap_warnings` to enforce the `WARNINGS_CAP` bound, substituting
1284    // the synthetic `warnings_truncated` entry for any overflow.
1285    let warnings = mnem_core::retrieve::cap_warnings(warnings);
1286    let warnings_json: Vec<Value> = warnings
1287        .iter()
1288        .map(|w| {
1289            json!({
1290            "code": w.code.as_str(),
1291            "knob": w.knob,
1292            "message": w.message,
1293            "remediation_ref": w.remediation_ref,
1294            })
1295        })
1296        .collect();
1297    // Gap 01 (agent-hop incentive): derive four response-metadata
1298    // fields so an LLM agent can decide whether to chase a hop
1299    // without re-running retrieve. All four are pure functions of
1300    // `result.items`; zero extra ranker calls, zero wire-breakage
1301    // for callers that ignore the new keys.
1302    //
1303    // * `confidence` = 1 - S(k)/S(1) over the top-K sorted scores
1304    // (rank-agreement). `0.0` on degenerate (len < 2 or top
1305    // score non-positive) inputs. Scale-free.
1306    // * `suggested_neighbors` = up to 3 items beyond the top-3 seeds
1307    // with a clipped preview and `via = "adjacency"`. Always a
1308    // strict subset of the ranked items (see proptest
1309    // `suggested_neighbors_always_subset_of_adjacency`).
1310    // * `community_density` = fraction of top-K items that share
1311    // the modal community of the top item. `0.0` when no
1312    // community assignment is wired; otherwise in `[0, 1]`.
1313    // * `session_reservoir_ttl_s` = live value of
1314    // `session_reservoir::IDLE_TTL` in seconds. Mirrors the
1315    // `mnem_session_reservoir_ttl_effective` gauge.
1316    let gap01_confidence = gap01_compute_confidence(&result.items);
1317    let gap01_neighbors = gap01_suggested_neighbors(&result.items);
1318    let gap01_community_density = 0.0_f32;
1319    let gap01_session_reservoir_ttl_s = mnem_core::retrieve::session_reservoir::IDLE_TTL.as_secs();
1320
1321    let mut response = json!({
1322    "schema": "mnem.v1.retrieve",
1323    "items": items,
1324    "tokens_used": result.tokens_used,
1325    "tokens_budget": if result.tokens_budget == u32::MAX {
1326    Value::Null
1327    } else {
1328    Value::from(result.tokens_budget)
1329    },
1330    "dropped": result.dropped,
1331    "score_distribution": score_dist,
1332    "candidates_seen": result.candidates_seen,
1333    "skipped": skipped,
1334    "confidence": gap01_confidence,
1335    "suggested_neighbors": gap01_neighbors,
1336    "community_density": gap01_community_density,
1337    "session_reservoir_ttl_s": gap01_session_reservoir_ttl_s,
1338    });
1339    if !warnings_json.is_empty() {
1340        response["warnings"] = Value::Array(warnings_json);
1341    }
1342
1343    if body.summarize.unwrap_or(false) {
1344        let k = body.summarize_k.unwrap_or(3).min(MAX_RETRIEVE_LIMIT);
1345        // C3 FIX-4: accumulate sentences AND a per-sentence
1346        // centrality vector in lockstep. When PPR was active
1347        // (graph_mode="ppr") we reuse the retriever's final item
1348        // score as a PPR-aware centrality proxy; else we fall
1349        // back to authored-edge degree from the graph_cache
1350        // adjacency; else a uniform 1.0 (identical to pre-E2).
1351        let mut sentences: Vec<String> = Vec::new();
1352        let mut centrality_weights: Vec<f32> = Vec::new();
1353        // Build an optional NodeId -> degree map once.
1354        let degree_map: Option<std::collections::HashMap<NodeId, u32>> = if want_ppr {
1355            // Degree is derived from the same authored snapshot the
1356            // retriever just saw; if it isn't cached we skip the
1357            // degree map rather than re-walk the repo here.
1358            if let Ok(gc) = s.graph_cache.lock() {
1359                gc.adjacency.as_ref().map(|adj| {
1360                    let mut m: std::collections::HashMap<NodeId, u32> =
1361                        std::collections::HashMap::new();
1362                    for (src, dst) in &adj.edges {
1363                        *m.entry(*src).or_insert(0) += 1;
1364                        *m.entry(*dst).or_insert(0) += 1;
1365                    }
1366                    m
1367                })
1368            } else {
1369                None
1370            }
1371        } else {
1372            None
1373        };
1374        for it in &result.items {
1375            if let Some(summary) = it.node.summary.clone() {
1376                sentences.push(summary);
1377                let w = if want_ppr {
1378                    // PPR-aware: use the final retrieve score
1379                    // (already PPR-propagated through graph_expand).
1380                    it.score.max(0.0)
1381                } else if let Some(m) = &degree_map {
1382                    m.get(&it.node.id).copied().unwrap_or(0) as f32
1383                } else {
1384                    1.0_f32
1385                };
1386                centrality_weights.push(w);
1387            }
1388        }
1389        // If no embedder is configured OR there are no sentences,
1390        // surface an empty summary and a skipped-reason; callers
1391        // can treat the absence of a non-empty summary the same
1392        // way they already handle missing rerank / HyDE.
1393        if sentences.is_empty() {
1394            response["summary"] = json!([]);
1395        } else if let Some(pc) = &s.embed_cfg {
1396            match mnem_embed_providers::open(pc) {
1397                Ok(embedder) => {
1398                    let centrality_vec = centrality_weights.clone();
1399                    let centrality =
1400                        move |i: usize| centrality_vec.get(i).copied().unwrap_or(1.0_f32);
1401                    match mnem_graphrag::summarize_community(
1402                        &sentences,
1403                        embedder.as_ref(),
1404                        None, // query vector optional; omitted at the HTTP edge for now
1405                        &centrality,
1406                        k,
1407                        0.5,
1408                    ) {
1409                        Ok(summary) => {
1410                            let arr: Vec<Value> = summary
1411                                .sentences
1412                                .iter()
1413                                .zip(summary.scores.iter())
1414                                .map(|(s, score)| json!({"sentence": s, "score": score}))
1415                                .collect();
1416                            response["summary"] = Value::Array(arr);
1417                        }
1418                        Err(e) => {
1419                            response["summary"] = json!([]);
1420                            response["summarize_skipped"] = json!(format!("summarize failed: {e}"));
1421                        }
1422                    }
1423                }
1424                Err(e) => {
1425                    response["summary"] = json!([]);
1426                    response["summarize_skipped"] =
1427                        json!(format!("embed provider open failed: {e}"));
1428                }
1429            }
1430        } else {
1431            response["summary"] = json!([]);
1432            response["summarize_skipped"] = json!("no [embed] provider configured on server");
1433        }
1434    }
1435
1436    Ok(Json(response))
1437}
1438
1439/// Parse a PROVIDER:MODEL rerank spec into a live
1440/// `mnem_rerank_providers::ProviderConfig`. Reads API-key env-var
1441/// names from the defaults shipped by mnem-rerank-providers; callers
1442/// who need custom env vars must set them via the `[rerank]` section
1443/// in `config.toml` and rely on the CLI instead.
1444fn parse_rerank_spec(spec: &str) -> Result<mnem_rerank_providers::ProviderConfig, String> {
1445    let (prov, model) = spec
1446        .split_once(':')
1447        .ok_or_else(|| format!("expected PROVIDER:MODEL, got `{spec}`"))?;
1448    if model.is_empty() {
1449        return Err(format!("empty model in `{spec}`"));
1450    }
1451    match prov {
1452        "cohere" => Ok(mnem_rerank_providers::ProviderConfig::Cohere(
1453            mnem_rerank_providers::CohereConfig {
1454                model: model.into(),
1455                ..Default::default()
1456            },
1457        )),
1458        "voyage" => Ok(mnem_rerank_providers::ProviderConfig::Voyage(
1459            mnem_rerank_providers::VoyageConfig {
1460                model: model.into(),
1461                ..Default::default()
1462            },
1463        )),
1464        "jina" => Ok(mnem_rerank_providers::ProviderConfig::Jina(
1465            mnem_rerank_providers::JinaConfig {
1466                model: model.into(),
1467                ..Default::default()
1468            },
1469        )),
1470        other => Err(format!(
1471            "unknown rerank provider `{other}`; want cohere|voyage|jina"
1472        )),
1473    }
1474}
1475
1476// ---------- helpers ----------
1477//
1478// `json_to_ipld` is re-exported from `mnem_core::codec`; keeping one
1479// canonical implementation in the core crate ensures that any future
1480// hardening (depth cap adjustment, additional numeric rejection, ...)
1481// applies uniformly across CLI, HTTP, and MCP inputs. See
1482// `crates/mnem-core/src/codec/json.rs` for the shared logic.
1483
1484fn ipld_to_json(v: &Ipld) -> Value {
1485    match v {
1486        Ipld::Null => Value::Null,
1487        Ipld::Bool(b) => Value::Bool(*b),
1488        Ipld::Integer(i) => serde_json::Number::from_i128(*i).map_or(Value::Null, Value::Number),
1489        Ipld::Float(f) => serde_json::Number::from_f64(*f).map_or(Value::Null, Value::Number),
1490        Ipld::String(s) => Value::String(s.clone()),
1491        Ipld::Bytes(b) => Value::String(format!("<{} bytes>", b.len())),
1492        Ipld::List(xs) => Value::Array(xs.iter().map(ipld_to_json).collect()),
1493        Ipld::Map(m) => {
1494            let mut out = Map::new();
1495            for (k, v) in m {
1496                out.insert(k.clone(), ipld_to_json(v));
1497            }
1498            Value::Object(out)
1499        }
1500        Ipld::Link(cid) => Value::String(cid.to_string()),
1501    }
1502}
1503
1504fn parse_kv(s: &str) -> Result<(String, Ipld), String> {
1505    let (k, v) = s
1506        .split_once('=')
1507        .ok_or_else(|| format!("expected KEY=VALUE, got `{s}`"))?;
1508    let val = match serde_json::from_str::<Value>(v) {
1509        Ok(json) => json_to_ipld(&json).map_err(|e| e.to_string())?,
1510        Err(_) => Ipld::String(v.to_string()),
1511    };
1512    Ok((k.to_string(), val))
1513}
1514
1515// ============================================================
1516// Gap 01 (agent-hop incentive) helpers.
1517//
1518// All three helpers below are pure functions of the ranked items;
1519// they do not touch the repo, do not allocate index structures,
1520// and do not emit metrics on their own (the caller does, in
1521// `retrieve_full`).
1522//
1523// They are `pub(crate)` so the integration / proptest module
1524// (`tests::gap01_neighbors_proptest`) can exercise them without
1525// spinning up a full `AppState`.
1526// ============================================================
1527
/// How many top-ranked items to treat as "seeds" when slicing the
/// neighbour list: `gap01_suggested_neighbors` skips this many
/// leading items before emitting hints. Matches the rest of the
/// Gap 01 spec's `community_expand_seeds` default and the
/// `max_neighbours = 3` floor-c constant pinned in
/// `gap-catalog/01-agent-hop-incentive/solution.md`.
pub(crate) const GAP01_TOP_SEEDS: usize = 3;

/// Per-request cap on the number of neighbour hints emitted by
/// `gap01_suggested_neighbors`.
/// Floor-c constant: per-item amplification bound from
/// `SPEC §retrieve.response-budget` (aggregate response bytes
/// <= 64 KiB). See `gap-catalog/01-agent-hop-incentive/solution.md`
/// "Floor-c apparatus".
pub(crate) const GAP01_MAX_NEIGHBOURS: usize = 3;

/// Clip length for the neighbour `preview` field, in chars (not
/// bytes — the clip is applied with `chars().take(..)`).
/// Bounds the response-size contribution of the hints block;
/// the value is the HTTP per-line budget used elsewhere in this
/// crate.
pub(crate) const GAP01_PREVIEW_CHARS: usize = 200;
1547
1548/// Compute `confidence` as rank-agreement derived from the score
1549/// distribution of `items`.
1550///
1551/// `confidence = 1 - S(k) / S(1)` where `S(i)` is the i-th
1552/// sorted score (descending). Captures "is the top item clearly
1553/// ahead of the pack?" without a magic threshold. Scale-free
1554/// because both the numerator and denominator are drawn from
1555/// the same score distribution.
1556///
1557/// Returns `0.0` on degenerate input (`< 2` items, non-positive
1558/// top score, NaN top score).
1559pub(crate) fn gap01_compute_confidence(items: &[mnem_core::retrieve::RetrievedItem]) -> f32 {
1560    if items.len() < 2 {
1561        return 0.0;
1562    }
1563    let top = items[0].score;
1564    if !top.is_finite() || top <= 0.0 {
1565        return 0.0;
1566    }
1567    // `items` is already in RRF-rank order (descending score), but
1568    // defend against a degenerate case where ties re-order past
1569    // the caller's expectation by taking the raw last element.
1570    let tail = items[items.len() - 1].score.max(0.0);
1571    (1.0 - (tail / top)).clamp(0.0, 1.0)
1572}
1573
1574/// Compute the `suggested_neighbors` list (up to
1575/// [`GAP01_MAX_NEIGHBOURS`] entries) from the ranked items past
1576/// the top [`GAP01_TOP_SEEDS`] seeds.
1577///
1578/// Each entry is `{id, preview, via}`. `via` is always
1579/// `"adjacency"` because neighbours are drawn from the same
1580/// adjacency-derived ranked list; if a future Gap 15 integration
1581/// sources neighbours from KNN substrate, the `via` label flips
1582/// to `"knn"`.
1583///
1584/// Guaranteed a subset of `items` by construction. The proptest
1585/// `suggested_neighbors_always_subset_of_adjacency` pins this
1586/// invariant across random inputs.
1587pub(crate) fn gap01_suggested_neighbors(
1588    items: &[mnem_core::retrieve::RetrievedItem],
1589) -> Vec<Value> {
1590    items
1591        .iter()
1592        .skip(GAP01_TOP_SEEDS)
1593        .take(GAP01_MAX_NEIGHBOURS)
1594        .map(|it| {
1595            let preview: String = it.rendered.chars().take(GAP01_PREVIEW_CHARS).collect();
1596            json!({
1597            "id": it.node.id.to_uuid_string(),
1598            "preview": preview,
1599            "via": "adjacency",
1600            })
1601        })
1602        .collect()
1603}
1604
// ---------- POST /v1/explain (gap-06) ----------

/// Default serialisation throughput in bytes/ms used to derive
/// `max_path_bytes_total` when the caller omits (or zeroes)
/// `serialization_rate_bytes_per_ms`.
pub(crate) const DEFAULT_SERIALIZATION_RATE_BYTES_PER_MS: u64 = 4_096;

/// Default per-request latency budget in milliseconds, used when
/// the caller omits (or zeroes) `latency_budget_ms`.
pub(crate) const DEFAULT_LATENCY_BUDGET_MS: u32 = 256;

/// Max per-node incoming fan-in walked during BFS. Matches
/// `Query::DEFAULT_ADJACENCY_CAP` and prevents a celebrity dst DoS.
pub(crate) const EXPLAIN_ADJACENCY_CAP: usize = 256;

/// Max BFS depth the `/v1/explain` handler will honour regardless
/// of the request. `u16` for parent-index compactness.
pub(crate) const EXPLAIN_MAX_DEPTH: u16 = 8;
1621
/// `explain_mode` enum (Round 3 of gap-06): controls how much of
/// each path node the `/v1/explain` response carries on the wire.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ExplainMode {
    /// Compact parent-pointer encoding, IDs only (nodes are
    /// serialised as UUID strings). Multi-tenant safe.
    #[default]
    Compact,
    /// Compact + full payloads. Requires ACL; the `explain`
    /// handler falls back to `Compact` with an
    /// `explain.mode_downgraded` warning when requested without ACL.
    CompactFull,
}
1633
/// Request body for `POST /v1/explain`.
#[derive(Deserialize, Debug)]
pub(crate) struct ExplainRequest {
    /// Seed node (UUID string). BFS fans outward along incoming
    /// edges; an unparseable UUID yields a 400.
    pub node_id: String,
    /// Max depth; clamped to [`EXPLAIN_MAX_DEPTH`].
    #[serde(default = "default_explain_depth")]
    pub depth: u16,
    /// Encoding mode. Default [`ExplainMode::Compact`].
    #[serde(default)]
    pub mode: ExplainMode,
    /// Per-request latency budget in ms. `None` or `0` falls back
    /// to [`DEFAULT_LATENCY_BUDGET_MS`] (the handler filters out
    /// zero so it cannot disable the byte cap).
    #[serde(default)]
    pub latency_budget_ms: Option<u32>,
    /// Serialisation throughput override in bytes/ms. `None` or
    /// `0` falls back to [`DEFAULT_SERIALIZATION_RATE_BYTES_PER_MS`].
    #[serde(default)]
    pub serialization_rate_bytes_per_ms: Option<u64>,
}
1652
/// Serde default for [`ExplainRequest::depth`]: three hops.
fn default_explain_depth() -> u16 {
    // Named so the value reads as a deliberate default rather than
    // a stray literal.
    const DEFAULT_DEPTH: u16 = 3;
    DEFAULT_DEPTH
}
1656
/// Runtime derivation: `max_path_bytes_total = remaining_ms *
/// serialization_rate_bytes_per_ms`, saturating at `u64::MAX` on
/// multiplication overflow and at `usize::MAX` when the product
/// does not fit the platform's pointer width.
///
/// Exposed at the crate root (see `lib.rs`) so integration tests
/// can exercise the invariant directly.
#[must_use]
pub fn derive_max_path_bytes(remaining_ms: u32, serialization_rate_bytes_per_ms: u64) -> usize {
    let total = u64::from(remaining_ms).saturating_mul(serialization_rate_bytes_per_ms);
    usize::try_from(total).unwrap_or(usize::MAX)
}
1669
/// `POST /v1/explain`: in-band derivation path via BFS over the
/// incoming-edge adjacency index. Redacts to IDs only by default.
///
/// The response size is bounded by a projected-byte cap derived
/// from `latency_budget_ms * serialization_rate_bytes_per_ms`;
/// when BFS would exceed it, the walk stops and the response
/// carries `path_truncated_reason = "response_budget"`.
pub(crate) async fn explain(
    State(s): State<AppState>,
    Json(body): Json<ExplainRequest>,
) -> Result<Json<Value>, Error> {
    let seed = NodeId::parse_uuid(&body.node_id)
        .map_err(|e| Error::bad_request(format!("invalid node_id UUID: {e}")))?;
    // Clamp the requested depth so a hostile request cannot force
    // an arbitrarily deep walk.
    let depth = body.depth.min(EXPLAIN_MAX_DEPTH);

    // Runtime-derived byte cap. No magic number: caller can override
    // both knobs. `.filter(|&v| v > 0)` keeps zero from silently
    // disabling the cap.
    let rate = body
        .serialization_rate_bytes_per_ms
        .filter(|&r| r > 0)
        .unwrap_or(DEFAULT_SERIALIZATION_RATE_BYTES_PER_MS);
    let budget_ms = body
        .latency_budget_ms
        .filter(|&m| m > 0)
        .unwrap_or(DEFAULT_LATENCY_BUDGET_MS);
    let max_bytes = derive_max_path_bytes(budget_ms, rate);

    // ACL gate: `compact_full` requires a per-tenant ACL and none is
    // configured here, so the request is downgraded to `compact`
    // with a warning rather than rejected.
    let (effective_mode, mode_warning): (ExplainMode, Option<&'static str>) = match body.mode {
        ExplainMode::Compact => (ExplainMode::Compact, None),
        ExplainMode::CompactFull => (
            ExplainMode::Compact,
            Some("compact_full requested but no ACL is configured; falling back to compact"),
        ),
    };

    // Blocking mnem-core calls run under the std mutex; per the
    // module docs, nothing awaits while this guard is held.
    let repo = s.repo.lock().map_err(|_| Error::locked())?;

    // BFS with parent tracking. `nodes[0]` is the seed; every step
    // carries `(parent_idx, to_idx)` into the nodes array.
    let mut nodes: Vec<NodeId> = vec![seed];
    let mut visited: std::collections::HashMap<NodeId, u32> = std::collections::HashMap::new();
    visited.insert(seed, 0);
    let mut steps: Vec<(u16, u32)> = Vec::new();
    let mut truncated_reason: Option<&'static str> = None;

    let mut frontier: Vec<u32> = vec![0];
    'bfs: for _hop in 0..depth {
        let mut next_frontier: Vec<u32> = Vec::new();
        for &parent_idx in &frontier {
            let parent_node = nodes[parent_idx as usize];
            // Per-node fan-in is capped (EXPLAIN_ADJACENCY_CAP) to
            // bound work on celebrity destinations.
            let edges = repo
                .incoming_edges_capped(&parent_node, None, EXPLAIN_ADJACENCY_CAP)
                .map_err(Error::from)?;
            for edge in edges {
                let from = edge.src;
                if visited.contains_key(&from) {
                    continue;
                }
                // Projected wire bytes: ~32/step + ~40/node (JSON).
                let projected =
                    steps.len().saturating_mul(32) + nodes.len().saturating_mul(40) + 32;
                if projected > max_bytes {
                    truncated_reason = Some("response_budget");
                    break 'bfs;
                }
                let new_idx: u32 = nodes.len().try_into().unwrap_or(u32::MAX);
                nodes.push(from);
                visited.insert(from, new_idx);
                steps.push((u16::try_from(parent_idx).unwrap_or(u16::MAX), new_idx));
                next_frontier.push(new_idx);
            }
        }
        if next_frontier.is_empty() {
            break;
        }
        frontier = next_frontier;
    }
    // NOTE(review): when BFS exhausts naturally (`next_frontier`
    // empty), `frontier` keeps its previous, non-empty value, so at
    // `depth == EXPLAIN_MAX_DEPTH` a fully-explored graph may still
    // be labelled "depth"-truncated here — confirm intended.
    if truncated_reason.is_none() && depth == EXPLAIN_MAX_DEPTH && !frontier.is_empty() {
        truncated_reason = Some("depth");
    }
    // Release the repo lock before serialising the response.
    drop(repo);

    let nodes_wire: Vec<Value> = nodes
        .iter()
        .map(|n| Value::String(n.to_uuid_string()))
        .collect();
    let steps_wire: Vec<Value> = steps
        .iter()
        .map(|(p, t)| {
            json!({
            "parent_idx": p,
            "to_idx": t,
            })
        })
        .collect();

    let mut warnings: Vec<Value> = Vec::new();
    if let Some(w) = mode_warning {
        warnings.push(json!({
        "code": "explain.mode_downgraded",
        "message": w,
        }));
    }

    let mode_str = match effective_mode {
        ExplainMode::Compact => "compact",
        ExplainMode::CompactFull => "compact_full",
    };

    Ok(Json(json!({
    "schema": "mnem.v1.explain",
    "seed": seed.to_uuid_string(),
    "mode": mode_str,
    "path_source":
    format!("bfs.v1:graph_depth={depth}:edge_source=adjacency.v1"),
    "max_path_bytes_total": max_bytes,
    "latency_budget_ms": budget_ms,
    "serialization_rate_bytes_per_ms": rate,
    "nodes": nodes_wire,
    "steps": steps_wire,
    "path_truncated": truncated_reason.is_some(),
    "path_truncated_reason": truncated_reason,
    "warnings": warnings,
    })))
}
1792
1793// Proptest for `byte_cap_never_exceeds_budget` lives in
1794// `tests/wire_explain.rs` so it runs under the integration harness
1795// (avoids a dependency on the pre-existing `gap01_tests` module
1796// whose `Node::new` call was broken by an upstream signature
1797// change). Callers verifying the invariant can reuse the
// `pub` `derive_max_path_bytes` function exposed above.
1799
#[cfg(test)]
mod gap01_tests {
    use super::*;
    use mnem_core::id::NodeId;
    use mnem_core::objects::Node;
    use mnem_core::retrieve::RetrievedItem;
    use proptest::prelude::*;

    /// Minimal item fixture: the helpers under test only read `id`
    /// and `rendered`, so a prop-less `Node::new` is sufficient.
    fn fake_item(score: f32) -> RetrievedItem {
        let node = Node::new(NodeId::new_v7(), "Gap01Probe");
        RetrievedItem::new(node, "rendered preview".to_string(), 4, score)
    }

    #[test]
    fn confidence_zero_on_empty() {
        assert_eq!(gap01_compute_confidence(&[]), 0.0);
    }

    #[test]
    fn confidence_zero_on_singleton() {
        let only = [fake_item(1.0)];
        assert_eq!(gap01_compute_confidence(&only), 0.0);
    }

    #[test]
    fn confidence_high_when_tail_far_below_top() {
        let items: Vec<_> = [1.0, 0.9, 0.01].into_iter().map(fake_item).collect();
        let c = gap01_compute_confidence(&items);
        assert!(c > 0.9, "expected >0.9, got {c}");
    }

    #[test]
    fn confidence_low_when_flat() {
        let items: Vec<_> = [1.0, 0.99, 0.98].into_iter().map(fake_item).collect();
        let c = gap01_compute_confidence(&items);
        assert!(c < 0.1, "expected <0.1, got {c}");
    }

    #[test]
    fn suggested_neighbors_empty_below_top_seeds() {
        // Exactly GAP01_TOP_SEEDS items: everything is a seed, so
        // there is nothing left to suggest.
        let items: Vec<_> = [1.0, 0.9, 0.8].into_iter().map(fake_item).collect();
        assert!(gap01_suggested_neighbors(&items).is_empty());
    }

    #[test]
    fn suggested_neighbors_skips_top_seeds() {
        let items: Vec<_> = [1.0, 0.9, 0.8, 0.7, 0.6].into_iter().map(fake_item).collect();
        let hints = gap01_suggested_neighbors(&items);
        // Five items minus three seeds leaves two hints.
        assert_eq!(hints.len(), 2);
        // `via` is always "adjacency".
        for entry in &hints {
            assert_eq!(entry["via"], "adjacency");
        }
    }

    #[test]
    fn suggested_neighbors_bounded_by_max() {
        let items: Vec<_> = (0..100).map(|i| fake_item(1.0 - 0.01 * i as f32)).collect();
        let hints = gap01_suggested_neighbors(&items);
        assert!(hints.len() <= GAP01_MAX_NEIGHBOURS);
    }

    proptest! {
        /// Gap 01 proptest: the `suggested_neighbors` list is
        /// always a strict subset of the adjacency (ranked items)
        /// passed in. The proof is trivial by construction
        /// (skip seeds, take at most the cap) but the property pins
        /// the invariant so that any future refactor which drifts
        /// into pulling IDs from a different source (e.g. a sibling
        /// lookup) has to rewrite this test.
        #[test]
        fn suggested_neighbors_always_subset_of_adjacency(
            scores in proptest::collection::vec(-1.0f32..1.0f32, 0..32),
        ) {
            let items: Vec<_> = scores.iter().map(|&s| fake_item(s)).collect();
            let neighbours = gap01_suggested_neighbors(&items);
            // Every `id` in the neighbour list must appear in the
            // original adjacency (ranked items).
            let ids: Vec<String> = items
                .iter()
                .map(|it| it.node.id.to_uuid_string())
                .collect();
            for entry in &neighbours {
                let nid = entry["id"].as_str().expect("id field");
                prop_assert!(
                    ids.iter().any(|i| i == nid),
                    "neighbour id {nid} not in adjacency"
                );
            }
            // And the cardinality is bounded.
            prop_assert!(neighbours.len() <= GAP01_MAX_NEIGHBOURS);
        }
    }
}