#![allow(clippy::too_many_lines)]
use crate::models::field_names;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::IntoResponse,
};
use serde_json::json;
use crate::db;
use crate::models::{RecallBody, RecallQuery, RecallRequest};
use crate::validate;
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;
/// Fills unset fields of `req` from the server-configured recall scope.
///
/// Nothing is touched unless the request opted in via `session_default` and
/// `app.recall_scope` holds a scope. Only fields the request left as `None`
/// are filled:
///
/// * `namespace` — the first entry of the scope's namespace list, if any.
/// * `since` — "now minus the scope's duration" as an RFC 3339 timestamp,
///   when the duration string parses.
/// * `limit` — the scope's limit widened to `i64`.
///
/// Returns the scope's tier (cloned) for the caller to apply, or `None` when
/// no scope was spliced.
fn splice_recall_scope_into(req: &mut RecallRequest, app: &AppState) -> Option<String> {
    // Splicing is opt-in per request.
    if !req.session_default.unwrap_or(false) {
        return None;
    }

    // Without a configured scope there is nothing to fill in.
    let configured: Option<&crate::config::RecallScope> = app.recall_scope.as_ref().as_ref();
    let scope = configured?;

    if req.namespace.is_none() {
        req.namespace = scope
            .namespaces
            .as_ref()
            .and_then(|list| list.first())
            .cloned();
    }

    if req.since.is_none() {
        req.since = scope
            .since
            .as_deref()
            .and_then(|raw| crate::config::parse_duration_string(raw))
            .map(|window| (chrono::Utc::now() - window).to_rfc3339());
    }

    if req.limit.is_none() {
        req.limit = scope.limit.map(i64::from);
    }

    scope.tier.clone()
}
pub async fn recall_memories_get(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Query(p): Query<RecallQuery>,
) -> impl IntoResponse {
let mut req = RecallRequest::from_http_query(&p);
if req.context.trim().is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "context (or query) is required"})),
)
.into_response();
}
if let Some(ref a) = req.as_agent
&& let Err(e) = validate::validate_namespace(a)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("as_agent", e)})),
)
.into_response();
}
let format = match crate::toon::WireFormat::parse_http(req.format.as_deref()) {
Ok(f) => f,
Err(e) => return crate::handlers::wire_format::invalid_format_response(&e),
};
let scope_tier = splice_recall_scope_into(&mut req, &app);
let kinds = p.resolved_kinds();
let caller_principal = match crate::handlers::parity::resolve_caller_agent_id(
None,
&headers,
req.as_agent.as_deref(),
) {
Ok(p) => p,
Err(e) => {
return (axum::http::StatusCode::FORBIDDEN, Json(json!({"error": e}))).into_response();
}
};
let provenance_shape = crate::handlers::accept_provenance::resolve_from_headers(&headers);
recall_response(
&app,
&req,
Some(caller_principal.as_str()),
scope_tier.as_deref(),
kinds.as_deref(),
provenance_shape,
format,
)
.await
}
pub async fn recall_memories_post(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<RecallBody>,
) -> impl IntoResponse {
let mut req = RecallRequest::from_http_body(&body);
if req.context.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "context (or query) is required"})),
)
.into_response();
}
if let Some(ref a) = req.as_agent
&& let Err(e) = validate::validate_namespace(a)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("as_agent", e)})),
)
.into_response();
}
let format = match crate::toon::WireFormat::parse_http(req.format.as_deref()) {
Ok(f) => f,
Err(e) => return crate::handlers::wire_format::invalid_format_response(&e),
};
let scope_tier = splice_recall_scope_into(&mut req, &app);
let kinds = body.resolved_kinds();
let caller_principal = match crate::handlers::parity::resolve_caller_agent_id(
None,
&headers,
req.as_agent.as_deref(),
) {
Ok(p) => p,
Err(e) => {
return (axum::http::StatusCode::FORBIDDEN, Json(json!({"error": e}))).into_response();
}
};
let provenance_shape = crate::handlers::accept_provenance::resolve_from_headers(&headers);
recall_response(
&app,
&req,
Some(caller_principal.as_str()),
scope_tier.as_deref(),
kinds.as_deref(),
provenance_shape,
format,
)
.await
}
/// Shared recall pipeline used by both the GET and POST handlers: runs the
/// query against the active backend, post-filters and re-ranks the hits, and
/// renders the result through `wire_format::memories_response`.
///
/// * `caller_principal` — acting agent used when `req.as_agent` is unset.
/// * `recall_scope_tier` — tier string; only consulted on the Postgres path
///   (feature `sal`) and ignored otherwise.
/// * `kinds_filter` — when `Some`, only hits whose `memory_kind` is contained
///   in the slice are kept.
/// * `provenance_shape` — verbose decoration is applied on the default path
///   only; the Postgres path logs the request and returns undecorated rows.
/// * `format` — wire format handed to `memories_response`.
///
/// Store errors on the Postgres path are mapped by `store_err_to_response`;
/// errors on the default path are mapped by `handler_error_500`.
async fn recall_response(
app: &AppState,
req: &RecallRequest,
caller_principal: Option<&str>,
recall_scope_tier: Option<&str>,
kinds_filter: Option<&[crate::models::MemoryKind]>,
provenance_shape: crate::handlers::accept_provenance::ProvenanceShape,
format: crate::toon::WireFormat,
) -> axum::response::Response {
let context = req.context.as_str();
let namespace = req.namespace.as_deref();
// Whatever limit the request resolved to, never return more than 50 rows.
let limit = req.resolved_limit().min(50);
let tags = req.tags.as_deref();
let since = req.since.as_deref();
let until = req.until.as_deref();
let as_agent = req.as_agent.as_deref();
let budget_tokens = req.resolved_budget_tokens();
let has_citations = req.has_citations.unwrap_or(false);
let source_uri_prefix = req.source_uri_prefix.as_deref();
let session_id = req.session_id.as_deref();
let session_tracker = crate::reranker::global_session_recall_tracker();
// Without the `sal` feature nothing below reads the tier; bind it to `_`
// so the parameter still counts as used.
#[cfg(not(feature = "sal"))]
let _ = recall_scope_tier;
// ---- Postgres path (feature `sal`): always returns from inside this block.
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
// Embed the query when an embedder is configured; an embedding failure is
// logged and leaves `query_emb` as `None` (keyword-only).
let query_emb: Option<Vec<f32>> = if let Some(emb) = app.embedder.as_ref().as_ref() {
match emb.embed_query(context) {
Ok(v) => Some(v),
Err(e) => {
tracing::warn!("recall (postgres): embed failed, keyword-only: {e}");
None
}
}
} else {
None
};
// Reported verbatim in the response's "mode" field.
let mode = if query_emb.is_some() {
"hybrid"
} else {
"keyword"
};
// Acting identity precedence: explicit `as_agent`, then the resolved
// caller, then the daemon sentinel.
let ctx_caller = crate::store::CallerContext::for_agent(
as_agent
.or(caller_principal)
.unwrap_or(crate::identity::sentinels::DAEMON_PRINCIPAL)
.to_string(),
);
let mut filter = crate::store::Filter {
namespace: namespace.map(str::to_string),
limit,
..Default::default()
};
// The tier is applied only when the string parses; otherwise it is skipped.
if let Some(t) = recall_scope_tier
&& let Some(parsed) = crate::models::Tier::from_str(t)
{
filter.tier = Some(parsed);
}
// `tags` is a comma-separated list; entries are trimmed and empty ones dropped.
if let Some(t) = tags {
filter.tags_any = t
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
.collect();
}
// `since` / `until` are applied only when they parse as RFC 3339; an
// unparseable value is silently ignored rather than rejected.
if let Some(s) = since
&& let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s)
{
filter.since = Some(dt.into());
}
if let Some(u) = until
&& let Ok(dt) = chrono::DateTime::parse_from_rfc3339(u)
{
filter.until = Some(dt.into());
}
return match app
.store
.recall_hybrid(&ctx_caller, context, query_emb.as_deref(), &filter)
.await
{
Ok(scored_pairs) => {
// Post-filters run after the store query, so fewer than `limit` rows may remain.
let scored_pairs = crate::cli::recall::apply_form4_recall_filters(
scored_pairs,
has_citations,
source_uri_prefix,
);
let scored_pairs: Vec<_> = match kinds_filter {
None => scored_pairs,
Some(allowed) => scored_pairs
.into_iter()
.filter(|(m, _)| allowed.contains(&m.memory_kind))
.collect(),
};
let scored_pairs = crate::reranker::apply_session_recency_boost(
scored_pairs,
session_id,
session_tracker,
);
// Ids are collected before serialisation, so a row that later fails to
// serialise is still included in the touch call.
let touch_ids: Vec<String> =
scored_pairs.iter().map(|(m, _)| m.id.clone()).collect();
if provenance_shape.is_verbose() {
tracing::info!(
"recall (postgres): Accept-Provenance: verbose received; \
postgres-side verbose decoration not yet implemented — \
shipping bare Form 4/5/6 envelope. Sqlite path supports verbose."
);
}
// Serialise each memory and attach its score, rounded through
// `SCORE_DISPLAY_ROUND_FACTOR`; rows that fail to serialise are logged
// and skipped.
let scored: Vec<serde_json::Value> = scored_pairs
.iter()
.filter_map(|(m, s)| match serde_json::to_value(m) {
Ok(mut v) => {
if let Some(obj) = v.as_object_mut() {
obj.insert(
"score".to_string(),
json!(
(*s * crate::SCORE_DISPLAY_ROUND_FACTOR).round()
/ crate::SCORE_DISPLAY_ROUND_FACTOR
),
);
}
Some(v)
}
Err(e) => {
tracing::error!(
memory_id = %m.id,
"recall (postgres): serialise Memory failed, skipping row: {e}"
);
None
}
})
.collect();
// Best-effort: a touch failure is logged and does not fail the request.
if let Err(e) = app.store.touch_after_recall(&touch_ids).await {
tracing::warn!("recall (postgres): touch_after_recall failed: {e}");
}
// This path always reports 0 tokens used and emits no budget "meta" object.
let mut resp = json!({
"memories": scored,
"count": scored.len(),
(field_names::TOKENS_USED): 0,
"mode": mode,
(field_names::STORAGE_BACKEND): "postgres",
});
if let Some(b) = budget_tokens {
resp[field_names::BUDGET_TOKENS] = json!(b);
}
crate::handlers::wire_format::memories_response(format, resp)
}
Err(e) => store_err_to_response(e),
};
}
// ---- Default path (labelled "sqlite" in the log messages below).
// Embed the query when an embedder is configured; failure falls back to
// keyword-only recall.
let query_emb: Option<Vec<f32>> = if let Some(emb) = app.embedder.as_ref().as_ref() {
match emb.embed_query(context) {
Ok(v) => Some(v),
Err(e) => {
tracing::warn!("recall: embedder query failed, falling back to keyword-only: {e}");
None
}
}
} else {
None
};
// Run the vector search while holding only the vector-index lock; the guard
// goes out of scope at the end of this block, before the db lock is taken.
// `Some(vec![])` (embedding present, no index) is distinct from `None`
// (no embedding).
let precomputed_hits: Option<Vec<crate::hnsw::VectorHit>> = if let Some(ref qe) = query_emb {
let vi_guard = app.vector_index.lock().await;
let hits = if let Some(idx) = vi_guard.as_ref() {
// Ask for five times the page size, and at least 50 candidates.
let ann_limit = (limit * 5).max(50);
idx.search(qe, ann_limit)
} else {
Vec::new()
};
Some(hits)
} else {
None
};
// The db lock is confined to this block and released before the results
// are post-processed.
let (result, mode) = {
let lock = app.db.lock().await;
// Extension settings are read from the third element of the locked tuple.
let short_extend = lock.2.short_extend_secs;
let mid_extend = lock.2.mid_extend_secs;
let (result, mode) = if let Some(ref qe) = query_emb {
// Invariant: `precomputed_hits` is `Some` whenever `query_emb` is `Some`
// (both are derived from the same condition above).
let hits = precomputed_hits
.as_deref()
.expect("precomputed_hits set when query_emb is Some");
let r = db::recall_hybrid_precomputed_hnsw(
&lock.0,
context,
qe,
namespace,
limit,
tags,
since,
until,
hits,
short_extend,
mid_extend,
as_agent.or(caller_principal),
budget_tokens,
app.scoring.as_ref(),
false,
source_uri_prefix,
);
(r, "hybrid")
} else {
let r = db::recall(
&lock.0,
context,
namespace,
limit,
tags,
since,
until,
short_extend,
mid_extend,
as_agent.or(caller_principal),
budget_tokens,
false,
source_uri_prefix,
);
(r, "keyword")
};
(result, mode)
};
match result {
Ok((r, outcome)) => {
// Same post-processing order as the Postgres path: form-4 filters, kind
// filter, then the session recency boost.
let r =
crate::cli::recall::apply_form4_recall_filters(r, has_citations, source_uri_prefix);
let r: Vec<_> = match kinds_filter {
None => r,
Some(allowed) => r
.into_iter()
.filter(|(m, _)| allowed.contains(&m.memory_kind))
.collect(),
};
let r = crate::reranker::apply_session_recency_boost(r, session_id, session_tracker);
// Verbose provenance takes the db lock a second time for decoration and
// drops it explicitly once the decorated rows are built.
let scored: Vec<serde_json::Value> = if provenance_shape.is_verbose() {
let lock = app.db.lock().await;
let out = crate::mcp::decorate_memory_many(&r, true, &lock.0);
drop(lock);
out
} else {
// Non-verbose: serialise each memory and attach its rounded score; rows
// that fail to serialise are logged and skipped.
r.iter()
.filter_map(|(m, s)| match serde_json::to_value(m) {
Ok(mut v) => {
if let Some(obj) = v.as_object_mut() {
obj.insert(
"score".to_string(),
json!(
(*s * crate::SCORE_DISPLAY_ROUND_FACTOR).round()
/ crate::SCORE_DISPLAY_ROUND_FACTOR
),
);
}
Some(v)
}
Err(e) => {
tracing::error!(
memory_id = %m.id,
"recall (sqlite): serialise Memory failed, skipping row: {e}"
);
None
}
})
.collect()
};
let mut resp = json!({
"memories": scored,
"count": scored.len(),
(field_names::TOKENS_USED): outcome.tokens_used,
"mode": mode,
});
// Budget accounting is attached only when the request carried a budget;
// a missing `tokens_remaining` is reported as 0.
if let Some(b) = budget_tokens {
resp[field_names::BUDGET_TOKENS] = json!(b);
resp["meta"] = json!({
"budget_tokens_used": outcome.tokens_used,
"budget_tokens_remaining": outcome.tokens_remaining.unwrap_or(0),
(field_names::MEMORIES_DROPPED): outcome.memories_dropped,
"budget_overflow": outcome.budget_overflow,
});
}
crate::handlers::wire_format::memories_response(format, resp)
}
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}