use axum::Json;
use axum::extract::{Path, Query, State};
use ipld_core::ipld::Ipld;
use mnem_core::codec::json_to_ipld;
use mnem_core::id::NodeId;
use mnem_core::index::PropPredicate;
use mnem_core::objects::Node;
use mnem_core::retrieve::Lane;
use mnem_embed_providers::Embedder as _;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value, json};
use crate::error::Error;
use crate::state::AppState;
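/// Wire name for a retrieval lane as reported in the `lane_scores` response map.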
const fn lane_name(lane: Lane) -> &'static str {
match lane {
Lane::Vector => "vector",
Lane::Sparse => "sparse",
Lane::GraphExpand => "graph_expand",
Lane::Rerank => "rerank",
_ => "unknown",
}
}
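// Hard per-request caps; values above these are rejected rather than silently clamped.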
pub(crate) const MAX_RETRIEVE_LIMIT: usize = 1_000;
pub(crate) const MAX_VECTOR_CAP: usize = 100_000;
pub(crate) const MAX_RERANK_TOP_K: usize = 500;
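/// Rejects (rather than clamps) a caller-supplied value above `cap`, returning a
/// bad-request error that tells the caller to lower the value or split the request.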
fn clamp_or_reject(name: &'static str, value: Option<usize>, cap: usize) -> Result<(), Error> {
if let Some(n) = value
&& n > cap
{
return Err(Error::bad_request(format!(
"{name}={n} exceeds max of {cap}; lower the value or split the request"
)));
}
Ok(())
}
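/// Liveness probe. Always reports `ok: true` along with the crate version, e.g.
/// `{"schema":"mnem.v1.healthz","ok":true,"service":"mnem http","version":"..."}`.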
pub(crate) async fn healthz() -> Json<Value> {
Json(json!({
"schema": "mnem.v1.healthz",
"ok": true,
"service": "mnem http",
"version": env!("CARGO_PKG_VERSION"),
}))
}
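/// Repository snapshot: the current op id, the first head commit (if any), and the ref count.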
pub(crate) async fn stats(State(s): State<AppState>) -> Result<Json<Value>, Error> {
let repo = s.repo.lock().map_err(|_| Error::locked())?;
let op_id = repo.op_id().to_string();
let head = repo.view().heads.first().map(ToString::to_string);
let refs = repo.view().refs.len();
Ok(Json(json!({
"schema": "mnem.v1.stats",
"op_id": op_id,
"head_commit": head,
"refs": refs,
})))
}
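/// Request body for creating a single node. `id` may be a caller-supplied UUID;
/// otherwise a v7 id is generated. Illustrative body (values are placeholders):
/// `{"label":"note","summary":"text","author":"alice","props":{"topic":"demo"}}`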
#[derive(Deserialize)]
pub(crate) struct PostNodeBody {
#[serde(default)]
pub label: String,
pub summary: Option<String>,
pub props: Option<Map<String, Value>>,
pub content: Option<String>,
#[serde(default)]
pub author: Option<String>,
#[serde(default)]
pub message: Option<String>,
#[serde(default)]
pub id: Option<String>,
}
#[derive(Serialize)]
pub(crate) struct PostNodeResp {
schema: &'static str,
id: String,
label: String,
op_id: String,
}
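/// Creates one node in a single commit.
///
/// The caller-supplied label is honoured only when the server runs with `allow_labels`;
/// otherwise the default ntype is used. `author` is required. When `[embed]`/`[sparse]`
/// providers are configured, the summary text is embedded best-effort; failures are
/// silently skipped on this single-node path.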
pub(crate) async fn post_node(
State(s): State<AppState>,
Json(body): Json<PostNodeBody>,
) -> Result<Json<PostNodeResp>, Error> {
let label = if s.allow_labels && !body.label.trim().is_empty() {
body.label.clone()
} else {
Node::DEFAULT_NTYPE.to_string()
};
let author = body
.author
.as_deref()
.map(str::trim)
.filter(|a| !a.is_empty())
.map(str::to_string);
let author = match author {
Some(a) => a,
None => return Err(Error::bad_request("author is required")),
};
let node_id = match body.id.as_deref() {
        Some(raw) => NodeId::parse_uuid(raw)
.map_err(|e| Error::bad_request(format!("invalid caller-supplied id: {e}")))?,
None => NodeId::new_v7(),
};
let mut node = Node::new(node_id, &label);
if let Some(sum) = &body.summary {
node = node.with_summary(sum);
}
if let Some(props) = body.props {
for (k, v) in props {
node = node.with_prop(
k,
json_to_ipld(&v).map_err(|e| Error::bad_request(e.to_string()))?,
);
}
}
if let Some(c) = body.content {
node = node.with_content(bytes::Bytes::from(c.into_bytes()));
}
let text_for_embed: Option<String> = node
.summary
.as_ref()
.filter(|t| !t.trim().is_empty())
.cloned();
let mut pending_dense: Option<(String, mnem_core::objects::Embedding)> = None;
if let Some(text) = text_for_embed {
if let Some(pc) = &s.embed_cfg
&& let Ok(embedder) = mnem_embed_providers::open(pc)
&& let Ok(v) = embedder.embed(&text)
{
let emb = mnem_embed_providers::to_embedding(embedder.model(), &v);
pending_dense = Some((embedder.model().to_string(), emb));
}
if let Some(sc) = &s.sparse_cfg
&& let Ok(sparser) = mnem_sparse_providers::open(sc)
&& let Ok(se) = sparser.encode(&text)
{
node = node.with_sparse_embed(se);
}
}
let id = node.id;
let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
let mut tx = guard.start_transaction();
let cid = tx.add_node(&node)?;
if let Some((model, emb)) = pending_dense {
tx.set_embedding(cid, model, emb)?;
}
let commit_start = std::time::Instant::now();
let new_repo = tx.commit(
&author,
body.message.as_deref().unwrap_or("mnem http add node"),
)?;
s.metrics
.commit_duration
.observe(commit_start.elapsed().as_secs_f64());
let op_id = new_repo.op_id().to_string();
*guard = new_repo;
Ok(Json(PostNodeResp {
schema: "mnem.v1.post-node",
id: id.to_uuid_string(),
        label,
op_id,
}))
}
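/// Fetches a node by UUID and reports whether a dense embedding exists for the
/// configured embed model (always `false` when no `[embed]` provider is set).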
pub(crate) async fn get_node(
State(s): State<AppState>,
Path(id_str): Path<String>,
) -> Result<Json<Value>, Error> {
let id = NodeId::parse_uuid(&id_str)
.map_err(|e| Error::bad_request(format!("invalid UUID: {e}")))?;
let repo = s.repo.lock().map_err(|_| Error::locked())?;
let node = repo
.lookup_node(&id)?
.ok_or_else(|| Error::not_found(format!("no node with id={id_str}")))?;
let mut props_map = Map::new();
for (k, v) in &node.props {
props_map.insert(k.clone(), ipld_to_json(v));
}
let has_embedding = match s.embed_cfg.as_ref() {
Some(pc) => {
let model = model_fq_of(pc);
let (_, node_cid) = mnem_core::codec::hash_to_cid(&node)
.map_err(|e| Error::internal(format!("hash node: {e}")))?;
repo.embedding_for(&node_cid, &model)?.is_some()
}
None => false,
};
Ok(Json(json!({
"schema": "mnem.v1.node",
"id": node.id.to_uuid_string(),
"label": node.ntype,
"summary": node.summary,
"props": Value::Object(props_map),
"content_bytes": node.content.as_ref().map_or(0, bytes::Bytes::len),
"has_embedding": has_embedding,
})))
}
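/// Fully-qualified model name (`provider:model`) for the configured embed provider.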
fn model_fq_of(pc: &mnem_embed_providers::ProviderConfig) -> String {
use mnem_embed_providers::ProviderConfig as PC;
match pc {
PC::Openai(c) => format!("openai:{}", c.model),
PC::Ollama(c) => format!("ollama:{}", c.model),
PC::Onnx(c) => format!("onnx:{}", c.model),
}
}
#[derive(Deserialize)]
pub(crate) struct DeleteQuery {
pub author: String,
#[serde(default)]
pub message: Option<String>,
}
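/// Removes a node. A commit is written even when the id was absent; the response
/// reports `existed: false` in that case.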
pub(crate) async fn delete_node(
State(s): State<AppState>,
Path(id_str): Path<String>,
Query(q): Query<DeleteQuery>,
) -> Result<Json<Value>, Error> {
let id = NodeId::parse_uuid(&id_str)
.map_err(|e| Error::bad_request(format!("invalid UUID: {e}")))?;
if q.author.trim().is_empty() {
return Err(Error::bad_request("author is required"));
}
let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
let existed = guard.lookup_node(&id)?.is_some();
let mut tx = guard.start_transaction();
tx.remove_node(id);
let commit_start = std::time::Instant::now();
let new_repo = tx.commit(
&q.author,
q.message.as_deref().unwrap_or("mnem http delete node"),
)?;
s.metrics
.commit_duration
.observe(commit_start.elapsed().as_secs_f64());
let op_id = new_repo.op_id().to_string();
*guard = new_repo;
Ok(Json(json!({
"schema": "mnem.v1.delete-node",
"id": id_str,
"existed": existed,
"op_id": op_id,
})))
}
#[derive(Deserialize)]
pub(crate) struct TombstoneBody {
#[serde(default)]
pub reason: String,
pub author: String,
}
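/// Tombstones an existing node; not-found when the id is missing, conflict when it
/// is already tombstoned.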
pub(crate) async fn tombstone_node(
State(s): State<AppState>,
Path(id_str): Path<String>,
Json(body): Json<TombstoneBody>,
) -> Result<Json<Value>, Error> {
let id = NodeId::parse_uuid(&id_str)
.map_err(|e| Error::bad_request(format!("invalid UUID: {e}")))?;
if body.author.trim().is_empty() {
return Err(Error::bad_request("author is required"));
}
let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
if guard.lookup_node(&id)?.is_none() {
return Err(Error::not_found(format!("no node with id={id_str}")));
}
if guard.is_tombstoned(&id) {
return Err(Error::conflict(format!(
"node {id_str} is already tombstoned"
)));
}
let mut tx = guard.start_transaction();
tx.tombstone_node(id, body.reason.clone())?;
let commit_start = std::time::Instant::now();
let new_repo = tx.commit(&body.author, "mnem http tombstone node")?;
s.metrics
.commit_duration
.observe(commit_start.elapsed().as_secs_f64());
let op_id = new_repo.op_id().to_string();
*guard = new_repo;
Ok(Json(json!({
"schema": "mnem.v1.tombstone",
"op_id": op_id,
"node_id": id_str,
})))
}
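/// Request body for bulk node creation; `auto_embed` defaults to `true`.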
#[derive(Deserialize)]
pub(crate) struct BulkNodeBody {
pub nodes: Vec<PostNodeBody>,
pub author: String,
#[serde(default)]
pub message: Option<String>,
#[serde(default = "default_true")]
pub auto_embed: bool,
}
const fn default_true() -> bool {
true
}
#[derive(Serialize)]
pub(crate) struct BulkNodeResp {
schema: &'static str,
op_id: String,
results: Vec<BulkNodeEntry>,
embedded: u32,
skipped_embed: u32,
}
#[derive(Serialize)]
pub(crate) struct BulkNodeEntry {
id: String,
label: String,
}
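/// Creates many nodes in one commit.
///
/// Providers are opened once up front; a configured-but-unopenable provider aborts the
/// whole request. Per-node dense-embed failures are counted in `skipped_embed` instead
/// of failing the batch.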
pub(crate) async fn post_nodes_bulk(
State(s): State<AppState>,
Json(body): Json<BulkNodeBody>,
) -> Result<Json<BulkNodeResp>, Error> {
if body.author.trim().is_empty() {
return Err(Error::bad_request("author is required"));
}
if body.nodes.is_empty() {
return Err(Error::bad_request("nodes must not be empty"));
}
let embedder = if body.auto_embed {
match s.embed_cfg.as_ref() {
Some(pc) => Some(mnem_embed_providers::open(pc).map_err(|e| {
Error::internal(format!(
"embed provider configured but open failed: {e}; bulk aborted to avoid silent no-embed commit"
))
})?),
None => None,
}
} else {
None
};
let sparser = if body.auto_embed {
match s.sparse_cfg.as_ref() {
Some(sc) => Some(mnem_sparse_providers::open(sc).map_err(|e| {
Error::internal(format!(
"sparse provider configured but open failed: {e}; bulk aborted to avoid silent no-sparse commit"
))
})?),
None => None,
}
} else {
None
};
let mut built: Vec<(Node, Option<(String, mnem_core::objects::Embedding)>)> =
Vec::with_capacity(body.nodes.len());
let mut results: Vec<BulkNodeEntry> = Vec::with_capacity(body.nodes.len());
let mut embedded = 0u32;
let mut skipped_embed = 0u32;
for nb in body.nodes {
let label = if s.allow_labels && !nb.label.trim().is_empty() {
nb.label.clone()
} else {
Node::DEFAULT_NTYPE.to_string()
};
let node_id = match nb.id.as_deref() {
            Some(raw) => NodeId::parse_uuid(raw)
.map_err(|e| Error::bad_request(format!("invalid caller-supplied id: {e}")))?,
None => NodeId::new_v7(),
};
let mut node = Node::new(node_id, &label);
if let Some(sum) = &nb.summary {
node = node.with_summary(sum);
}
if let Some(props) = nb.props {
for (k, v) in props {
node = node.with_prop(
k,
json_to_ipld(&v).map_err(|e| Error::bad_request(e.to_string()))?,
);
}
}
if let Some(c) = nb.content {
node = node.with_content(bytes::Bytes::from(c.into_bytes()));
}
let text_for_embed: Option<String> = node
.summary
.as_ref()
.filter(|t| !t.trim().is_empty())
.cloned();
let mut pending_dense: Option<(String, mnem_core::objects::Embedding)> = None;
if let Some(text) = text_for_embed {
if let Some(embedder) = embedder.as_ref() {
match embedder.embed(&text) {
Ok(v) => {
let emb = mnem_embed_providers::to_embedding(embedder.model(), &v);
pending_dense = Some((embedder.model().to_string(), emb));
embedded += 1;
}
Err(_) => {
skipped_embed += 1;
}
}
}
if let Some(sparser) = sparser.as_ref()
&& let Ok(se) = sparser.encode(&text)
{
node = node.with_sparse_embed(se);
}
}
results.push(BulkNodeEntry {
id: node.id.to_uuid_string(),
            label,
});
built.push((node, pending_dense));
}
let mut guard = s.repo.lock().map_err(|_| Error::locked())?;
let mut tx = guard.start_transaction();
for (node, pending_dense) in &built {
let cid = tx.add_node(node)?;
if let Some((model, emb)) = pending_dense {
tx.set_embedding(cid, model.clone(), emb.clone())?;
}
}
let commit_start = std::time::Instant::now();
let new_repo = tx.commit(
&body.author,
body.message.as_deref().unwrap_or("mnem http bulk add"),
)?;
s.metrics
.commit_duration
.observe(commit_start.elapsed().as_secs_f64());
let op_id = new_repo.op_id().to_string();
*guard = new_repo;
Ok(Json(BulkNodeResp {
schema: "mnem.v1.post-nodes-bulk",
op_id,
results,
embedded,
skipped_embed,
}))
}
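/// Query parameters for the GET retrieval endpoint.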
#[derive(Deserialize)]
pub(crate) struct RetrieveQuery {
pub text: Option<String>,
pub label: Option<String>,
#[serde(default)]
pub budget: Option<u32>,
#[serde(default)]
pub limit: Option<usize>,
pub where_eq: Option<String>,
}
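/// Query-string retrieval. When query text is given but neither `[embed]` nor `[sparse]`
/// is configured, falls back to a deterministic mock embedder so cold-start requests
/// still return ranked results.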
pub(crate) async fn retrieve(
State(s): State<AppState>,
Query(q): Query<RetrieveQuery>,
) -> Result<Json<Value>, Error> {
clamp_or_reject("limit", q.limit, MAX_RETRIEVE_LIMIT)?;
let repo = s.repo.lock().map_err(|_| Error::locked())?;
let mut ret = repo.retrieve();
if s.allow_labels
&& let Some(l) = &q.label
{
ret = ret.label(l.clone());
}
if let Some(w) = &q.where_eq {
let (k, v) = parse_kv(w).map_err(Error::bad_request)?;
ret = ret.where_prop(k, PropPredicate::Eq(v));
}
if let Some(b) = q.budget {
ret = ret.token_budget(b);
}
if let Some(n) = q.limit {
ret = ret.limit(n);
}
let mut vector_model: Option<String> = None;
let mut sparse_vocab: Option<String> = None;
if let Some(text) = q.text.as_deref()
&& !text.trim().is_empty()
{
ret = ret.query_text(text.to_string());
if let Some(pc) = &s.embed_cfg {
let embedder = mnem_embed_providers::open(pc)
.map_err(|e| Error::internal(format!("embed provider open failed: {e}")))?;
let qvec = embedder
.embed(text)
.map_err(|e| Error::internal(format!("embed call failed: {e}")))?;
vector_model = Some(embedder.model().to_string());
ret = ret.vector(embedder.model().to_string(), qvec);
}
if let Some(sc) = &s.sparse_cfg {
let sparser = mnem_sparse_providers::open(sc)
.map_err(|e| Error::bad_request(format!("sparse open failed: {e}")))?;
let sq = sparser
.encode_query(text)
.map_err(|e| Error::bad_request(format!("sparse encode failed: {e}")))?;
sparse_vocab = Some(sq.vocab_id.clone());
ret = ret.sparse_query(sq);
}
if vector_model.is_none() && sparse_vocab.is_none() {
let mock = mnem_embed_providers::MockEmbedder::new("mock:cold-start-384", 384);
let qvec = mock
.embed(text)
.map_err(|e| Error::internal(format!("mock embed failed: {e}")))?;
vector_model = Some(mock.model().to_string());
ret = ret.vector(mock.model().to_string(), qvec);
tracing::warn!(
"retrieve: no [embed]/[sparse] configured; using deterministic \
MockEmbedder fallback (cold-start). Configure a real provider \
in config.toml for production retrieval quality."
);
}
}
{
let mut cache = s.indexes.lock().map_err(|_| Error::locked())?;
if let Some(model) = &vector_model {
let idx = cache.vector_index(&repo, model)?;
ret = ret.with_vector_index(idx);
}
if let Some(vocab) = &sparse_vocab {
let idx = cache.sparse_index(&repo, vocab)?;
ret = ret.with_sparse_index(idx);
}
}
let retrieve_start = std::time::Instant::now();
let result = ret.execute()?;
s.metrics
.retrieve_latency
.observe(retrieve_start.elapsed().as_secs_f64());
let items: Vec<Value> = result
.items
.iter()
.map(|item| {
let mut lane_obj = Map::new();
for (lane, score) in &item.lane_scores {
lane_obj.insert(lane_name(*lane).to_string(), json!(score));
}
json!({
"id": item.node.id.to_uuid_string(),
"label": item.node.ntype,
"score": item.score,
"tokens": item.tokens,
"summary": item.node.summary,
"rendered": item.rendered,
"lane_scores": Value::Object(lane_obj),
})
})
.collect();
let score_dist = {
let scores: Vec<f32> = result.items.iter().map(|it| it.score).collect();
mnem_graphrag::distribution_shape(&scores, mnem_graphrag::K_MIN)
};
Ok(Json(json!({
"schema": "mnem.v1.retrieve",
"items": items,
"tokens_used": result.tokens_used,
"tokens_budget": if result.tokens_budget == u32::MAX {
Value::Null
} else {
Value::from(result.tokens_budget)
},
"dropped": result.dropped,
"candidates_seen": result.candidates_seen,
"score_distribution": score_dist,
})))
}
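/// Full retrieval request body; every knob is optional and falls back to server defaults.
/// Supplying both `vector_model` and `vector` bypasses server-side query embedding.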
#[derive(Deserialize, Default)]
pub(crate) struct RetrieveRequest {
#[serde(default)]
pub text: Option<String>,
#[serde(default)]
pub label: Option<String>,
#[serde(default)]
pub where_eq: Option<String>,
#[serde(default)]
pub budget: Option<u32>,
#[serde(default)]
pub limit: Option<usize>,
#[serde(default)]
pub vector_cap: Option<usize>,
#[serde(default)]
pub vector_model: Option<String>,
#[serde(default)]
pub vector: Option<Vec<f32>>,
#[serde(default)]
pub rerank: Option<String>,
#[serde(default)]
pub rerank_top_k: Option<usize>,
#[serde(default)]
pub community_filter: Option<bool>,
#[serde(default)]
pub community_min_coverage: Option<f32>,
#[serde(default)]
pub community_expand_seeds: Option<usize>,
#[serde(default)]
pub community_max_per: Option<usize>,
#[serde(default)]
pub community_decay: Option<f32>,
#[serde(default)]
pub graph_expand: Option<usize>,
#[serde(default)]
pub graph_decay: Option<f32>,
#[serde(default)]
pub graph_etype: Option<Vec<String>>,
#[serde(default)]
pub graph_depth: Option<usize>,
#[serde(default)]
pub graph_max_per_seed: Option<usize>,
#[serde(default)]
pub graph_mode: Option<String>,
#[serde(default)]
pub ppr_damping: Option<f32>,
#[serde(default)]
pub ppr_iter: Option<u32>,
#[serde(default)]
pub ppr_opt_in: Option<bool>,
#[serde(default)]
pub summarize: Option<bool>,
#[serde(default)]
pub summarize_k: Option<usize>,
}
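/// JSON-body retrieval with the full knob set: explicit query vectors, reranking,
/// community filtering, graph expansion / PPR, and optional extractive summarization.
/// Degraded lanes are reported via `skipped` and `warnings` rather than failing the request.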
pub(crate) async fn retrieve_full(
State(s): State<AppState>,
Json(body): Json<RetrieveRequest>,
) -> Result<Json<Value>, Error> {
clamp_or_reject("limit", body.limit, MAX_RETRIEVE_LIMIT)?;
clamp_or_reject("vector_cap", body.vector_cap, MAX_VECTOR_CAP)?;
clamp_or_reject("rerank_top_k", body.rerank_top_k, MAX_RERANK_TOP_K)?;
let repo = s.repo.lock().map_err(|_| Error::locked())?;
let mut ret = repo.retrieve();
let mut skipped: Vec<String> = Vec::new();
let mut warnings: Vec<mnem_core::retrieve::Warning> = Vec::new();
if s.allow_labels
&& let Some(l) = &body.label
{
ret = ret.label(l.clone());
}
if let Some(w) = &body.where_eq {
let (k, v) = parse_kv(w).map_err(Error::bad_request)?;
ret = ret.where_prop(k, PropPredicate::Eq(v));
}
if let Some(b) = body.budget {
ret = ret.token_budget(b);
}
if let Some(n) = body.limit {
ret = ret.limit(n);
}
if let Some(n) = body.vector_cap {
ret = ret.vector_cap(n);
}
let mut vector_model: Option<String> = None;
let mut sparse_vocab: Option<String> = None;
if let Some(text) = body.text.as_deref()
&& !text.trim().is_empty()
{
ret = ret.query_text(text.to_string());
}
if let (Some(m), Some(v)) = (&body.vector_model, &body.vector) {
vector_model = Some(m.clone());
ret = ret.vector(m.clone(), v.clone());
} else if let Some(text) = body.text.as_deref()
&& !text.trim().is_empty()
&& let Some(pc) = &s.embed_cfg
{
let embedder = mnem_embed_providers::open(pc)
.map_err(|e| Error::bad_request(format!("embed open failed: {e}")))?;
let qvec = embedder
.embed(text)
.map_err(|e| Error::bad_request(format!("embed call failed: {e}")))?;
vector_model = Some(embedder.model().to_string());
ret = ret.vector(embedder.model().to_string(), qvec);
}
if let Some(text) = body.text.as_deref()
&& !text.trim().is_empty()
&& let Some(sc) = &s.sparse_cfg
{
let sparser = mnem_sparse_providers::open(sc)
.map_err(|e| Error::internal(format!("sparse provider open failed: {e}")))?;
let sq = sparser
.encode_query(text)
.map_err(|e| Error::internal(format!("sparse encode failed: {e}")))?;
sparse_vocab = Some(sq.vocab_id.clone());
ret = ret.sparse_query(sq);
}
if body.text.as_deref().is_some_and(|t| !t.trim().is_empty())
&& vector_model.is_none()
&& sparse_vocab.is_none()
&& body.vector.is_none()
{
if let Some(text) = body.text.as_deref() {
let mock = mnem_embed_providers::MockEmbedder::new("mock:cold-start-384", 384);
let qvec = mock
.embed(text)
.map_err(|e| Error::internal(format!("mock embed failed: {e}")))?;
vector_model = Some(mock.model().to_string());
ret = ret.vector(mock.model().to_string(), qvec);
skipped.push(
"embed: cold-start MockEmbedder fallback (no [embed]/[sparse] configured)"
.to_string(),
);
tracing::warn!(
"retrieve_full: no [embed]/[sparse] configured; using deterministic \
MockEmbedder fallback (cold-start). Configure a real provider in \
config.toml for production retrieval quality."
);
}
}
let mut vector_idx_for_graph: Option<std::sync::Arc<mnem_core::index::BruteForceVectorIndex>> =
None;
{
let mut cache = s.indexes.lock().map_err(|_| Error::locked())?;
if let Some(model) = &vector_model {
let idx = cache.vector_index(&repo, model)?;
vector_idx_for_graph = Some(idx.clone());
ret = ret.with_vector_index(idx);
}
if let Some(vocab) = &sparse_vocab {
let idx = cache.sparse_index(&repo, vocab)?;
ret = ret.with_sparse_index(idx);
}
}
if let Some(spec) = &body.rerank {
match parse_rerank_spec(spec) {
Ok(cfg) => match mnem_rerank_providers::open(&cfg) {
Ok(rr) => {
ret = ret.with_reranker(rr);
if let Some(k) = body.rerank_top_k {
ret = ret.rerank_top_k(k);
}
}
Err(e) => {
skipped.push(format!("rerank: {e}"));
warnings.push(mnem_core::retrieve::Warning::for_code(
mnem_core::retrieve::WarningCode::NoReranker,
));
}
},
Err(e) => {
skipped.push(format!("rerank spec: {e}"));
warnings.push(mnem_core::retrieve::Warning::for_code(
mnem_core::retrieve::WarningCode::NoReranker,
));
}
}
}
if body.community_filter.unwrap_or(false) {
let has_vectors = vector_idx_for_graph
.as_deref()
.is_some_and(|v| !v.is_empty());
let has_authored_edges = match s.graph_cache.lock() {
Ok(gc) => gc.adjacency.as_ref().is_some_and(|a| !a.edges.is_empty()),
Err(_) => false,
};
if !has_vectors && !has_authored_edges {
warnings.push(mnem_core::retrieve::Warning::for_code(
mnem_core::retrieve::WarningCode::CommunityFilterNoop,
));
}
let assignment = {
let mut gc = s.graph_cache.lock().map_err(|_| Error::locked())?;
gc.hybrid_community_for(&repo, vector_idx_for_graph.as_deref())?
};
let expand_seeds = body.community_expand_seeds.unwrap_or(3);
let max_per_community = body.community_max_per.unwrap_or(10);
let decay = body.community_decay.unwrap_or(0.85).clamp(0.0, 1.0);
let min_coverage = body.community_min_coverage.unwrap_or(0.5).clamp(0.0, 1.0);
let cfg = mnem_core::retrieve::CommunityFilterCfg {
enabled: true,
expand_seeds,
max_per_community,
decay,
min_coverage,
};
let lookup_handle_fwd = assignment.clone();
let lookup_handle_inv = assignment.clone();
let lookup = std::sync::Arc::new(mnem_core::retrieve::CommunityLookup::new_with_members(
move |nid| lookup_handle_fwd.community_of(*nid),
move |cid| lookup_handle_inv.members_of(cid).to_vec(),
));
ret = ret.with_community_filter(cfg, lookup);
}
let want_ppr = body
.graph_mode
.as_deref()
.is_some_and(|m| m.eq_ignore_ascii_case("ppr"));
if want_ppr {
let has_vectors = vector_idx_for_graph
.as_deref()
.is_some_and(|v| !v.is_empty());
let has_authored_edges = match s.graph_cache.lock() {
Ok(gc) => gc.adjacency.as_ref().is_some_and(|a| !a.edges.is_empty()),
Err(_) => false,
};
if !has_vectors && !has_authored_edges {
warnings.push(mnem_core::retrieve::Warning::for_code(
mnem_core::retrieve::WarningCode::PprNoSubstrate,
));
}
let adj = {
let mut gc = s.graph_cache.lock().map_err(|_| Error::locked())?;
gc.hybrid_adjacency_for(&repo, vector_idx_for_graph.as_deref())?
};
ret = ret.with_adjacency_index(adj);
}
if let Some(max_expand) = body.graph_expand {
let has_authored_edges = match s.graph_cache.lock() {
Ok(gc) => gc.adjacency.as_ref().is_some_and(|a| !a.edges.is_empty()),
Err(_) => false,
};
if !has_authored_edges {
warnings.push(mnem_core::retrieve::Warning::for_code(
mnem_core::retrieve::WarningCode::AuthoredAdjacencyEmpty,
));
}
let mut cfg = mnem_core::retrieve::GraphExpand {
max_expand,
decay: body
.graph_decay
.unwrap_or(mnem_core::retrieve::GraphExpand::DEFAULT_DECAY),
etype_filter: body.graph_etype.clone(),
..Default::default()
};
if let Some(depth) = body.graph_depth {
cfg = cfg.with_depth(depth);
}
if let Some(cap) = body.graph_max_per_seed {
cfg = cfg.with_max_per_seed(cap);
}
        // `want_ppr` already matches the mode string case-insensitively.
        if want_ppr {
let damping = body.ppr_damping.unwrap_or(mnem_core::ppr::DEFAULT_DAMPING);
let iter = body.ppr_iter.unwrap_or(mnem_core::ppr::DEFAULT_MAX_ITER);
cfg = cfg.with_ppr(damping, iter, mnem_core::ppr::DEFAULT_EPS);
}
ret = ret.with_graph_expand(cfg);
}
ret = ret.with_ppr_opt_in(body.ppr_opt_in.unwrap_or(false));
let retrieve_start = std::time::Instant::now();
let result = ret.execute()?;
s.metrics
.retrieve_latency
.observe(retrieve_start.elapsed().as_secs_f64());
if result.ppr_size_gate_skipped {
warnings.push(mnem_core::retrieve::Warning::for_code(
mnem_core::retrieve::WarningCode::PprSizeGateSkipped,
));
s.metrics
.ppr_size_gate_skipped
.get_or_create(&crate::metrics::PprSizeGateLabels {
reason: "above_threshold".into(),
})
.inc();
}
let items: Vec<Value> = result
.items
.iter()
.map(|item| {
let mut lane_obj = Map::new();
for (lane, score) in &item.lane_scores {
lane_obj.insert(lane_name(*lane).to_string(), json!(score));
}
json!({
"id": item.node.id.to_uuid_string(),
"label": item.node.ntype,
"score": item.score,
"tokens": item.tokens,
"summary": item.node.summary,
"rendered": item.rendered,
"lane_scores": Value::Object(lane_obj),
})
})
.collect();
let score_dist = {
let scores: Vec<f32> = result.items.iter().map(|it| it.score).collect();
mnem_graphrag::distribution_shape(&scores, mnem_graphrag::K_MIN)
};
let warnings = mnem_core::retrieve::cap_warnings(warnings);
let warnings_json: Vec<Value> = warnings
.iter()
.map(|w| {
json!({
"code": w.code.as_str(),
"knob": w.knob,
"message": w.message,
"remediation_ref": w.remediation_ref,
})
})
.collect();
let gap01_confidence = gap01_compute_confidence(&result.items);
let gap01_neighbors = gap01_suggested_neighbors(&result.items);
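    // Community density is not computed yet; the field is reported as a constant 0.0.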
let gap01_community_density = 0.0_f32;
let gap01_session_reservoir_ttl_s = mnem_core::retrieve::session_reservoir::IDLE_TTL.as_secs();
let mut response = json!({
"schema": "mnem.v1.retrieve",
"items": items,
"tokens_used": result.tokens_used,
"tokens_budget": if result.tokens_budget == u32::MAX {
Value::Null
} else {
Value::from(result.tokens_budget)
},
"dropped": result.dropped,
"score_distribution": score_dist,
"candidates_seen": result.candidates_seen,
"skipped": skipped,
"confidence": gap01_confidence,
"suggested_neighbors": gap01_neighbors,
"community_density": gap01_community_density,
"session_reservoir_ttl_s": gap01_session_reservoir_ttl_s,
});
if !warnings_json.is_empty() {
response["warnings"] = Value::Array(warnings_json);
}
if body.summarize.unwrap_or(false) {
let k = body.summarize_k.unwrap_or(3).min(MAX_RETRIEVE_LIMIT);
let mut sentences: Vec<String> = Vec::new();
let mut centrality_weights: Vec<f32> = Vec::new();
        // Degree centrality is only consulted as a summarization weight when PPR did not
        // run (PPR-weighted item scores are used directly otherwise), so build the map
        // for the non-PPR path.
        let degree_map: Option<std::collections::HashMap<NodeId, u32>> = if !want_ppr {
if let Ok(gc) = s.graph_cache.lock() {
gc.adjacency.as_ref().map(|adj| {
let mut m: std::collections::HashMap<NodeId, u32> =
std::collections::HashMap::new();
for (src, dst) in &adj.edges {
*m.entry(*src).or_insert(0) += 1;
*m.entry(*dst).or_insert(0) += 1;
}
m
})
} else {
None
}
} else {
None
};
for it in &result.items {
if let Some(summary) = it.node.summary.clone() {
sentences.push(summary);
let w = if want_ppr {
it.score.max(0.0)
                } else if let Some(m) = &degree_map {
m.get(&it.node.id).copied().unwrap_or(0) as f32
} else {
1.0_f32
};
centrality_weights.push(w);
}
}
if sentences.is_empty() {
response["summary"] = json!([]);
} else if let Some(pc) = &s.embed_cfg {
match mnem_embed_providers::open(pc) {
Ok(embedder) => {
let centrality_vec = centrality_weights.clone();
let centrality =
move |i: usize| centrality_vec.get(i).copied().unwrap_or(1.0_f32);
match mnem_graphrag::summarize_community(
&sentences,
embedder.as_ref(),
                        None,
                        &centrality,
k,
0.5,
) {
Ok(summary) => {
let arr: Vec<Value> = summary
.sentences
.iter()
.zip(summary.scores.iter())
.map(|(s, score)| json!({"sentence": s, "score": score}))
.collect();
response["summary"] = Value::Array(arr);
}
Err(e) => {
response["summary"] = json!([]);
response["summarize_skipped"] = json!(format!("summarize failed: {e}"));
}
}
}
Err(e) => {
response["summary"] = json!([]);
response["summarize_skipped"] =
json!(format!("embed provider open failed: {e}"));
}
}
} else {
response["summary"] = json!([]);
response["summarize_skipped"] = json!("no [embed] provider configured on server");
}
}
Ok(Json(response))
}
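/// Parses a `PROVIDER:MODEL` rerank spec; known providers are cohere, voyage, and jina.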
fn parse_rerank_spec(spec: &str) -> Result<mnem_rerank_providers::ProviderConfig, String> {
let (prov, model) = spec
.split_once(':')
.ok_or_else(|| format!("expected PROVIDER:MODEL, got `{spec}`"))?;
if model.is_empty() {
return Err(format!("empty model in `{spec}`"));
}
match prov {
"cohere" => Ok(mnem_rerank_providers::ProviderConfig::Cohere(
mnem_rerank_providers::CohereConfig {
model: model.into(),
..Default::default()
},
)),
"voyage" => Ok(mnem_rerank_providers::ProviderConfig::Voyage(
mnem_rerank_providers::VoyageConfig {
model: model.into(),
..Default::default()
},
)),
"jina" => Ok(mnem_rerank_providers::ProviderConfig::Jina(
mnem_rerank_providers::JinaConfig {
model: model.into(),
..Default::default()
},
)),
other => Err(format!(
"unknown rerank provider `{other}`; want cohere|voyage|jina"
)),
}
}
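/// Lossy IPLD -> JSON projection for responses: bytes become a `<N bytes>` placeholder,
/// links become CID strings, and out-of-range numbers collapse to null.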
fn ipld_to_json(v: &Ipld) -> Value {
match v {
Ipld::Null => Value::Null,
Ipld::Bool(b) => Value::Bool(*b),
Ipld::Integer(i) => serde_json::Number::from_i128(*i).map_or(Value::Null, Value::Number),
Ipld::Float(f) => serde_json::Number::from_f64(*f).map_or(Value::Null, Value::Number),
Ipld::String(s) => Value::String(s.clone()),
Ipld::Bytes(b) => Value::String(format!("<{} bytes>", b.len())),
Ipld::List(xs) => Value::Array(xs.iter().map(ipld_to_json).collect()),
Ipld::Map(m) => {
let mut out = Map::new();
for (k, v) in m {
out.insert(k.clone(), ipld_to_json(v));
}
Value::Object(out)
}
Ipld::Link(cid) => Value::String(cid.to_string()),
}
}
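/// Parses `KEY=VALUE`; the value is decoded as JSON when possible, else kept as a raw string.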
fn parse_kv(s: &str) -> Result<(String, Ipld), String> {
let (k, v) = s
.split_once('=')
.ok_or_else(|| format!("expected KEY=VALUE, got `{s}`"))?;
let val = match serde_json::from_str::<Value>(v) {
Ok(json) => json_to_ipld(&json).map_err(|e| e.to_string())?,
Err(_) => Ipld::String(v.to_string()),
};
Ok((k.to_string(), val))
}
pub(crate) const GAP01_TOP_SEEDS: usize = 3;
pub(crate) const GAP01_MAX_NEIGHBOURS: usize = 3;
pub(crate) const GAP01_PREVIEW_CHARS: usize = 200;
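/// Confidence in `[0, 1]` from the spread between the top and bottom scores of an
/// already score-sorted result list; 0.0 for fewer than two items or a non-positive top score.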
pub(crate) fn gap01_compute_confidence(items: &[mnem_core::retrieve::RetrievedItem]) -> f32 {
if items.len() < 2 {
return 0.0;
}
let top = items[0].score;
if !top.is_finite() || top <= 0.0 {
return 0.0;
}
let tail = items[items.len() - 1].score.max(0.0);
(1.0 - (tail / top)).clamp(0.0, 1.0)
}
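/// Suggests up to `GAP01_MAX_NEIGHBOURS` items beyond the first `GAP01_TOP_SEEDS` results,
/// each with a preview truncated to `GAP01_PREVIEW_CHARS` characters.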
pub(crate) fn gap01_suggested_neighbors(
items: &[mnem_core::retrieve::RetrievedItem],
) -> Vec<Value> {
items
.iter()
.skip(GAP01_TOP_SEEDS)
.take(GAP01_MAX_NEIGHBOURS)
.map(|it| {
let preview: String = it.rendered.chars().take(GAP01_PREVIEW_CHARS).collect();
json!({
"id": it.node.id.to_uuid_string(),
"preview": preview,
"via": "adjacency",
})
})
.collect()
}
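// Defaults and hard limits for the explain endpoint's latency/byte budgeting.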
pub(crate) const DEFAULT_SERIALIZATION_RATE_BYTES_PER_MS: u64 = 4_096;
pub(crate) const DEFAULT_LATENCY_BUDGET_MS: u32 = 256;
pub(crate) const EXPLAIN_ADJACENCY_CAP: usize = 256;
pub(crate) const EXPLAIN_MAX_DEPTH: u16 = 8;
#[derive(Serialize, Deserialize, Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ExplainMode {
#[default]
Compact,
CompactFull,
}
#[derive(Deserialize, Debug)]
pub(crate) struct ExplainRequest {
pub node_id: String,
#[serde(default = "default_explain_depth")]
pub depth: u16,
#[serde(default)]
pub mode: ExplainMode,
#[serde(default)]
pub latency_budget_ms: Option<u32>,
#[serde(default)]
pub serialization_rate_bytes_per_ms: Option<u64>,
}
fn default_explain_depth() -> u16 {
3
}
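/// Converts a remaining latency budget into a byte budget for the explain path,
/// assuming a fixed serialization rate in bytes per millisecond.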
#[must_use]
pub fn derive_max_path_bytes(remaining_ms: u32, serialization_rate_bytes_per_ms: u64) -> usize {
u64::from(remaining_ms)
.saturating_mul(serialization_rate_bytes_per_ms)
.try_into()
.unwrap_or(usize::MAX)
}
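/// Explains how a node is reachable: a capped BFS over incoming edges, bounded by
/// `depth` (at most `EXPLAIN_MAX_DEPTH`) and by a projected response-size budget derived
/// from the latency budget and serialization rate. `compact_full` is downgraded to
/// `compact` because no ACL is configured.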
pub(crate) async fn explain(
State(s): State<AppState>,
Json(body): Json<ExplainRequest>,
) -> Result<Json<Value>, Error> {
let seed = NodeId::parse_uuid(&body.node_id)
.map_err(|e| Error::bad_request(format!("invalid node_id UUID: {e}")))?;
let depth = body.depth.min(EXPLAIN_MAX_DEPTH);
let rate = body
.serialization_rate_bytes_per_ms
.filter(|&r| r > 0)
.unwrap_or(DEFAULT_SERIALIZATION_RATE_BYTES_PER_MS);
let budget_ms = body
.latency_budget_ms
.filter(|&m| m > 0)
.unwrap_or(DEFAULT_LATENCY_BUDGET_MS);
let max_bytes = derive_max_path_bytes(budget_ms, rate);
let (effective_mode, mode_warning): (ExplainMode, Option<&'static str>) = match body.mode {
ExplainMode::Compact => (ExplainMode::Compact, None),
ExplainMode::CompactFull => (
ExplainMode::Compact,
Some("compact_full requested but no ACL is configured; falling back to compact"),
),
};
let repo = s.repo.lock().map_err(|_| Error::locked())?;
let mut nodes: Vec<NodeId> = vec![seed];
let mut visited: std::collections::HashMap<NodeId, u32> = std::collections::HashMap::new();
visited.insert(seed, 0);
let mut steps: Vec<(u16, u32)> = Vec::new();
let mut truncated_reason: Option<&'static str> = None;
let mut frontier: Vec<u32> = vec![0];
'bfs: for _hop in 0..depth {
let mut next_frontier: Vec<u32> = Vec::new();
for &parent_idx in &frontier {
let parent_node = nodes[parent_idx as usize];
let edges = repo
.incoming_edges_capped(&parent_node, None, EXPLAIN_ADJACENCY_CAP)
.map_err(Error::from)?;
for edge in edges {
let from = edge.src;
if visited.contains_key(&from) {
continue;
}
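                // Rough projected wire size: assumed ~32 bytes per step and ~40 bytes per
                // node id, checked against the byte budget derived from the latency budget.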
let projected =
steps.len().saturating_mul(32) + nodes.len().saturating_mul(40) + 32;
if projected > max_bytes {
truncated_reason = Some("response_budget");
break 'bfs;
}
let new_idx: u32 = nodes.len().try_into().unwrap_or(u32::MAX);
nodes.push(from);
visited.insert(from, new_idx);
steps.push((u16::try_from(parent_idx).unwrap_or(u16::MAX), new_idx));
next_frontier.push(new_idx);
}
}
if next_frontier.is_empty() {
break;
}
frontier = next_frontier;
}
if truncated_reason.is_none() && depth == EXPLAIN_MAX_DEPTH && !frontier.is_empty() {
truncated_reason = Some("depth");
}
drop(repo);
let nodes_wire: Vec<Value> = nodes
.iter()
.map(|n| Value::String(n.to_uuid_string()))
.collect();
let steps_wire: Vec<Value> = steps
.iter()
.map(|(p, t)| {
json!({
"parent_idx": p,
"to_idx": t,
})
})
.collect();
let mut warnings: Vec<Value> = Vec::new();
if let Some(w) = mode_warning {
warnings.push(json!({
"code": "explain.mode_downgraded",
"message": w,
}));
}
let mode_str = match effective_mode {
ExplainMode::Compact => "compact",
ExplainMode::CompactFull => "compact_full",
};
Ok(Json(json!({
"schema": "mnem.v1.explain",
"seed": seed.to_uuid_string(),
"mode": mode_str,
"path_source":
format!("bfs.v1:graph_depth={depth}:edge_source=adjacency.v1"),
"max_path_bytes_total": max_bytes,
"latency_budget_ms": budget_ms,
"serialization_rate_bytes_per_ms": rate,
"nodes": nodes_wire,
"steps": steps_wire,
"path_truncated": truncated_reason.is_some(),
"path_truncated_reason": truncated_reason,
"warnings": warnings,
})))
}
#[cfg(test)]
mod gap01_tests {
use super::*;
use mnem_core::id::NodeId;
use mnem_core::objects::Node;
use mnem_core::retrieve::RetrievedItem;
use proptest::prelude::*;
fn fake_item(score: f32) -> RetrievedItem {
let node = Node::new(NodeId::new_v7(), "Gap01Probe");
RetrievedItem::new(node, "rendered preview".to_string(), 4, score)
}
#[test]
fn confidence_zero_on_empty() {
assert_eq!(gap01_compute_confidence(&[]), 0.0);
}
#[test]
fn confidence_zero_on_singleton() {
assert_eq!(gap01_compute_confidence(&[fake_item(1.0)]), 0.0);
}
#[test]
fn confidence_high_when_tail_far_below_top() {
let items = vec![fake_item(1.0), fake_item(0.9), fake_item(0.01)];
let c = gap01_compute_confidence(&items);
assert!(c > 0.9, "expected >0.9, got {c}");
}
#[test]
fn confidence_low_when_flat() {
let items = vec![fake_item(1.0), fake_item(0.99), fake_item(0.98)];
let c = gap01_compute_confidence(&items);
assert!(c < 0.1, "expected <0.1, got {c}");
}
#[test]
fn suggested_neighbors_empty_below_top_seeds() {
let items = vec![fake_item(1.0), fake_item(0.9), fake_item(0.8)];
assert!(gap01_suggested_neighbors(&items).is_empty());
}
#[test]
fn suggested_neighbors_skips_top_seeds() {
let items = vec![
fake_item(1.0),
fake_item(0.9),
fake_item(0.8),
fake_item(0.7),
fake_item(0.6),
];
let n = gap01_suggested_neighbors(&items);
assert_eq!(n.len(), 2);
for entry in &n {
assert_eq!(entry["via"], "adjacency");
}
}
#[test]
fn suggested_neighbors_bounded_by_max() {
let items: Vec<_> = (0..100).map(|i| fake_item(1.0 - i as f32 * 0.01)).collect();
let n = gap01_suggested_neighbors(&items);
assert!(n.len() <= GAP01_MAX_NEIGHBOURS);
}
proptest! {
#[test]
fn suggested_neighbors_always_subset_of_adjacency(
scores in proptest::collection::vec(-1.0f32..1.0f32, 0..32),
) {
let items: Vec<_> = scores.iter().map(|&s| fake_item(s)).collect();
let neighbours = gap01_suggested_neighbors(&items);
let ids: Vec<String> = items
.iter()
.map(|it| it.node.id.to_uuid_string())
.collect();
for entry in &neighbours {
let nid = entry["id"].as_str().expect("id field");
prop_assert!(
ids.iter().any(|i| i == nid),
"neighbour id {nid} not in adjacency"
);
}
prop_assert!(neighbours.len() <= GAP01_MAX_NEIGHBOURS);
}
}
}
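#[cfg(test)]
mod helper_tests {
    //! A minimal sketch of unit tests for the pure helpers in this module; it only
    //! exercises code defined in this file. Model and key names below are illustrative
    //! placeholders, not real provider identifiers.
    use super::*;
    use ipld_core::ipld::Ipld;

    #[test]
    fn clamp_or_reject_allows_at_cap_and_rejects_above() {
        assert!(clamp_or_reject("limit", None, MAX_RETRIEVE_LIMIT).is_ok());
        assert!(clamp_or_reject("limit", Some(MAX_RETRIEVE_LIMIT), MAX_RETRIEVE_LIMIT).is_ok());
        assert!(clamp_or_reject("limit", Some(MAX_RETRIEVE_LIMIT + 1), MAX_RETRIEVE_LIMIT).is_err());
    }

    #[test]
    fn derive_max_path_bytes_is_budget_times_rate() {
        assert_eq!(
            derive_max_path_bytes(
                DEFAULT_LATENCY_BUDGET_MS,
                DEFAULT_SERIALIZATION_RATE_BYTES_PER_MS
            ),
            256 * 4_096
        );
    }

    #[test]
    fn parse_kv_falls_back_to_string_values() {
        // "note" is not valid JSON, so the value is kept as a raw string.
        let (k, v) = parse_kv("kind=note").expect("valid KEY=VALUE");
        assert_eq!(k, "kind");
        assert_eq!(v, Ipld::String("note".to_string()));
        assert!(parse_kv("no-equals-sign").is_err());
    }

    #[test]
    fn parse_rerank_spec_requires_provider_and_model() {
        assert!(parse_rerank_spec("cohere:some-model").is_ok());
        assert!(parse_rerank_spec("cohere:").is_err());
        assert!(parse_rerank_spec("unknown:model").is_err());
    }
}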