#![allow(clippy::too_many_lines)]
use crate::models::field_names;
use axum::{
Json,
extract::State,
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
};
use serde_json::json;
use crate::db;
use crate::models::{Memory, Tier};
use crate::profile::Family;
use crate::validate;
use super::AppState;
#[cfg(feature = "sal")]
use super::MAX_BULK_SIZE;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;
/// Upper bound on the number of tags `auto_tag_handler` returns; when the LLM
/// produces more, only the first `AUTO_TAG_MAX_TAGS` are kept.
const AUTO_TAG_MAX_TAGS: usize = 8;
/// JSON body accepted by `consolidate_memories`.
#[derive(serde::Deserialize)]
pub struct ConsolidateBody {
    /// Ids of the source memories to merge into the new memory.
    pub ids: Vec<String>,
    /// Title of the new consolidated memory.
    pub title: String,
    /// Caller-supplied summary. When absent or empty, the handler derives one
    /// from the source memories (LLM if configured, else a title concatenation).
    #[serde(default)]
    pub summary: Option<String>,
    /// Target namespace; defaults to `crate::DEFAULT_NAMESPACE` via `default_ns`.
    #[serde(default = "default_ns")]
    pub namespace: String,
    /// Tier for the new memory; the handler uses `Tier::Long` when omitted.
    #[serde(default)]
    pub tier: Option<Tier>,
    /// Optional agent-id claim. If present, it must equal the agent id resolved
    /// from the request headers, otherwise the request is rejected with 403.
    #[serde(default)]
    pub agent_id: Option<String>,
    /// NOTE(review): not read by `consolidate_memories` in this file; LLM use is
    /// decided solely by whether an LLM is configured. Confirm whether this flag
    /// is still meant to do anything.
    #[serde(default)]
    pub use_llm: bool,
}
/// Serde default for `ConsolidateBody::namespace`: an owned copy of the
/// crate-wide `DEFAULT_NAMESPACE`.
fn default_ns() -> String {
    String::from(crate::DEFAULT_NAMESPACE)
}
/// Produces a summary for a consolidation request that did not supply one.
///
/// Every source memory is looked up first via `fetch_consolidate_source_pairs`.
/// A missing or unreadable source becomes an error response.
///
/// - With no LLM configured, or with no sources, the summary is a deterministic
///   concatenation of the source titles.
/// - Otherwise the LLM summarises the `(title, content)` pairs within
///   `app.llm_call_timeout`.
/// - On timeout, LLM error, or a blank reply, a fixed placeholder string is
///   returned so the consolidation can still proceed. Each case is logged.
///
/// # Errors
/// Returns a ready-to-send `Response` when a source lookup fails.
async fn resolve_consolidate_summary(
    app: &AppState,
    ids: &[String],
    caller_principal: &str,
) -> Result<String, Response> {
    let pairs = fetch_consolidate_source_pairs(app, ids, caller_principal).await?;
    let llm_arc = app.llm.clone();
    if llm_arc.is_none() || pairs.is_empty() {
        let titles: Vec<&str> = pairs.iter().map(|(t, _)| t.as_str()).collect();
        return Ok(format!(
            "Consolidated summary of {} memories: {}",
            titles.len(),
            titles.join("; ")
        ));
    }
    let llm_timeout = app.llm_call_timeout;
    let join = tokio::time::timeout(llm_timeout, async move {
        let Some(llm) = llm_arc.as_ref() else {
            return Ok::<String, anyhow::Error>(String::new());
        };
        llm.summarize_memories_async(&pairs).await
    })
    .await;
    match join {
        Ok(Ok(s)) if !s.trim().is_empty() => Ok(s),
        Ok(Ok(_)) => {
            // The LLM answered but with nothing usable; make that visible in logs.
            tracing::warn!(
                "L6: LLM call (summarize_memories) returned an empty summary — using \
                 deterministic fallback"
            );
            Ok("Consolidated summary (LLM unavailable; deterministic fallback)".to_string())
        }
        Ok(Err(e)) => {
            // Previously swallowed without a trace; log it like the other LLM handlers.
            tracing::warn!("L6: LLM call (summarize_memories) failed: {e}");
            Ok("Consolidated summary (LLM unavailable; deterministic fallback)".to_string())
        }
        Err(_) => {
            tracing::warn!(
                "H8: LLM call (summarize_memories) exceeded {}s timeout — falling back to \
                 deterministic placeholder summary",
                llm_timeout.as_secs()
            );
            Ok("Consolidated summary (LLM timeout; deterministic fallback)".to_string())
        }
    }
}
/// Loads the `(title, content)` of every source memory named in `ids`, in order.
///
/// Postgres (with the `sal` feature) reads through `app.store` as
/// `caller_principal`. Otherwise the SQLite handle is used.
///
/// Lookup stops at the first failure:
/// - A missing source yields `400` (unlike `fetch_memory_for_handler`, which
///   uses `404`).
/// - A store error is converted by `store_err_to_response`.
/// - A SQLite error is logged and returned as a generic `500`.
async fn fetch_consolidate_source_pairs(
    app: &AppState,
    ids: &[String],
    caller_principal: &str,
) -> Result<Vec<(String, String)>, Response> {
    #[cfg(not(feature = "sal"))]
    let _ = caller_principal;
    let missing_source = |id: &String| {
        (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": crate::errors::msg::memory_not_found(id)})),
        )
            .into_response()
    };
    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        let caller = crate::store::CallerContext::for_agent(caller_principal.to_string());
        let mut pairs = Vec::with_capacity(ids.len());
        for id in ids {
            let mem = app.store.get(&caller, id).await.map_err(|err| match err {
                crate::store::StoreError::NotFound { .. } => missing_source(id),
                other => store_err_to_response(other),
            })?;
            pairs.push((mem.title, mem.content));
        }
        return Ok(pairs);
    }
    let guard = app.db.lock().await;
    // `collect` into `Result` short-circuits on the first failing id.
    ids.iter()
        .map(|id| match db::get(&guard.0, id) {
            Ok(Some(mem)) => Ok((mem.title, mem.content)),
            Ok(None) => Err(missing_source(id)),
            Err(e) => {
                tracing::error!("consolidate source lookup failed: {e}");
                Err((
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
                )
                    .into_response())
            }
        })
        .collect()
}
/// Broadcasts a just-committed consolidated memory to federation peers.
///
/// Returns `None` in three cases:
/// - federation is not configured;
/// - the quorum is met;
/// - the broadcast itself errors (logged as a warning; the local write is kept).
///
/// Returns `Some(response)` only when the quorum check fails. The caller then
/// sends that response instead of its success body.
async fn consolidate_fanout(
    fed: Option<&crate::federation::FederationConfig>,
    mem: &crate::models::Memory,
    source_ids: &[String],
) -> Option<axum::response::Response> {
    let config = fed?;
    let tracker =
        match crate::federation::broadcast_consolidate_quorum(config, mem, source_ids).await {
            Ok(tracker) => tracker,
            Err(e) => {
                tracing::warn!("consolidate fanout error (local committed): {e:?}");
                return None;
            }
        };
    crate::federation::finalise_quorum(&tracker)
        .err()
        .map(|quorum_err| {
            let payload = crate::federation::QuorumNotMetPayload::from_err(&quorum_err);
            super::quorum_not_met_response(&payload)
        })
}
pub async fn consolidate_memories(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<ConsolidateBody>,
) -> impl IntoResponse {
let consolidate_caller_principal =
crate::handlers::parity::resolve_caller_agent_id(None, &headers, None)
.unwrap_or_else(|_| crate::identity::sentinels::ANONYMOUS_INVALID.to_string());
let summary = match body.summary.clone() {
Some(s) if !s.is_empty() => s,
_ => {
match resolve_consolidate_summary(&app, &body.ids, &consolidate_caller_principal).await
{
Ok(s) => s,
Err(resp) => return resp,
}
}
};
if let Err(e) = validate::RequestValidator::validate_consolidate(
&body.ids,
&body.title,
&summary,
&body.namespace,
) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let consolidator_agent_id = match crate::identity::resolve_http_agent_id(None, header_agent_id)
{
Ok(id) => id,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response();
}
};
if let Some(claimed) = body.agent_id.as_deref()
&& claimed != consolidator_agent_id
{
return (
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
)
.into_response();
}
let tier = body.tier.unwrap_or(Tier::Long);
let source_ids = body.ids.clone();
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let ctx = crate::store::CallerContext::for_agent(&consolidator_agent_id);
let new_id = match app
.store
.consolidate(
&ctx,
&body.ids,
&body.title,
&summary,
&body.namespace,
&tier,
crate::db::CONSOLIDATION_SOURCE,
&consolidator_agent_id,
)
.await
{
Ok(new_id) => new_id,
Err(e) => return store_err_to_response(e),
};
if app.federation.is_some() {
if let Ok(mem) = app.store.get(&ctx, &new_id).await {
if let Some(resp) =
consolidate_fanout(app.federation.as_ref().as_ref(), &mem, &source_ids).await
{
return resp;
}
}
}
return (
StatusCode::CREATED,
Json(json!({
"id": new_id,
(field_names::CONSOLIDATED): body.ids.len(),
"summary": summary,
"content": summary,
"memory": {
"id": new_id,
"title": body.title,
"content": summary,
"namespace": body.namespace,
},
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
let lock = app.db.lock().await;
let consolidate_result = db::consolidate(
&lock.0,
&body.ids,
&body.title,
&summary,
&body.namespace,
&tier,
crate::db::CONSOLIDATION_SOURCE,
&consolidator_agent_id,
);
let new_mem = match &consolidate_result {
Ok(new_id) => db::get(&lock.0, new_id).ok().flatten(),
Err(_) => None,
};
if let Ok(new_id) = &consolidate_result {
let details = serde_json::to_value(crate::subscriptions::ConsolidatedEventDetails {
source_ids: source_ids.clone(),
source_count: source_ids.len(),
})
.ok();
crate::subscriptions::dispatch_event_with_details(
&lock.0,
crate::subscriptions::webhook_events::MEMORY_CONSOLIDATED,
new_id,
&body.namespace,
Some(&consolidator_agent_id),
&lock.1,
details,
);
}
drop(lock);
match consolidate_result {
Ok(new_id) => {
if let Some(mem) = new_mem {
if let Some(resp) =
consolidate_fanout(app.federation.as_ref().as_ref(), &mem, &source_ids).await
{
return resp;
}
}
(
StatusCode::CREATED,
Json(json!({
"id": new_id,
(field_names::CONSOLIDATED): body.ids.len(),
"summary": summary,
"content": summary,
"memory": {
"id": new_id,
"title": body.title,
"content": summary,
"namespace": body.namespace,
},
})),
)
.into_response()
}
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
/// JSON body accepted by `auto_tag_handler`.
///
/// Supply either `memory_id` (preferred) or both `title` and `content`. When
/// `memory_id` is present, `title` and `content` are ignored.
#[derive(serde::Deserialize, Default)]
pub struct AutoTagBody {
    /// Id of a stored memory whose title/content should be tagged.
    #[serde(default)]
    pub memory_id: Option<String>,
    /// NOTE(review): accepted but not read by `auto_tag_handler` in this file.
    #[serde(default)]
    pub namespace: Option<String>,
    /// Inline title, used only when `memory_id` is absent.
    #[serde(default)]
    pub title: Option<String>,
    /// Inline content, used only when `memory_id` is absent.
    #[serde(default)]
    pub content: Option<String>,
}
pub async fn auto_tag_handler(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<AutoTagBody>,
) -> impl IntoResponse {
if app.llm.is_none() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"error": "LLM not configured"})),
)
.into_response();
}
let auto_tag_caller_principal =
crate::handlers::parity::resolve_caller_agent_id(None, &headers, None)
.unwrap_or_else(|_| crate::identity::sentinels::ANONYMOUS_INVALID.to_string());
let (title, content, resolved_id): (String, String, Option<String>) =
if let Some(id) = body.memory_id.as_deref() {
if let Err(e) = validate::validate_id(id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
match fetch_memory_for_handler(&app, id, &auto_tag_caller_principal).await {
Ok(mem) => (mem.title, mem.content, Some(id.to_string())),
Err(resp) => return resp,
}
} else {
match (body.title.clone(), body.content.clone()) {
(Some(t), Some(c)) => (t, c, None),
_ => {
return (
StatusCode::BAD_REQUEST,
Json(json!({
"error": "auto_tag requires memory_id (preferred) or title+content"
})),
)
.into_response();
}
}
};
let llm_arc = app.llm.clone();
let auto_tag_model = app.auto_tag_model.as_ref().clone();
let title_owned = title;
let content_owned = content;
let llm_timeout = app.llm_call_timeout;
let join = tokio::time::timeout(llm_timeout, async move {
let Some(llm) = llm_arc.as_ref() else {
return Ok::<Vec<String>, anyhow::Error>(Vec::new());
};
llm.auto_tag_async(&title_owned, &content_owned, auto_tag_model.as_deref())
.await
})
.await;
let tags = match join {
Ok(Ok(tags)) => tags.into_iter().take(AUTO_TAG_MAX_TAGS).collect::<Vec<_>>(),
Ok(Err(e)) => {
tracing::warn!("L6: auto_tag LLM call failed: {e}");
return (
StatusCode::BAD_GATEWAY,
Json(json!({"error": format!("LLM auto_tag failed: {e}")})),
)
.into_response();
}
Err(_) => {
tracing::warn!(
"H8: LLM call (auto_tag) exceeded {}s timeout — returning empty tag list",
llm_timeout.as_secs()
);
Vec::new()
}
};
(
StatusCode::OK,
Json(json!({
"tags": tags,
"memory_id": resolved_id,
})),
)
.into_response()
}
/// JSON body accepted by `expand_query_handler`.
#[derive(serde::Deserialize, Default)]
pub struct ExpandQueryBody {
    /// Query to expand. Required: it has no `#[serde(default)]`. The handler
    /// trims it and rejects a blank result with `400`.
    pub query: String,
    /// NOTE(review): accepted but not read by `expand_query_handler` in this file.
    #[serde(default)]
    pub namespace: Option<String>,
}
/// Asks the LLM for related search terms for `body.query`.
///
/// Responses:
/// - `503` when no LLM is configured.
/// - `400` when the trimmed query is empty.
/// - `502` when the LLM call fails.
/// - Otherwise `200` with the trimmed query and the expanded terms. The terms
///   list is empty if the call exceeds `app.llm_call_timeout`.
pub async fn expand_query_handler(
    State(app): State<AppState>,
    Json(body): Json<ExpandQueryBody>,
) -> impl IntoResponse {
    if app.llm.is_none() {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(json!({"error": "LLM not configured"})),
        )
            .into_response();
    }
    let query = body.query.trim();
    if query.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": crate::errors::msg::QUERY_REQUIRED})),
        )
            .into_response();
    }

    let client = app.llm.clone();
    // The LLM future is `'static`-style (`async move`), so it gets its own copy.
    let prompt = query.to_owned();
    let llm_timeout = app.llm_call_timeout;
    let outcome = tokio::time::timeout(llm_timeout, async move {
        let Some(llm) = client.as_ref() else {
            return Ok::<Vec<String>, anyhow::Error>(Vec::new());
        };
        llm.expand_query_async(&prompt).await
    })
    .await;

    let expanded_terms: Vec<String> = match outcome {
        Err(_) => {
            tracing::warn!(
                "H8: LLM call (expand_query) exceeded {}s timeout — returning empty expansion list",
                llm_timeout.as_secs()
            );
            Vec::new()
        }
        Ok(Err(e)) => {
            tracing::warn!("L6: expand_query LLM call failed: {e}");
            return (
                StatusCode::BAD_GATEWAY,
                Json(json!({"error": format!("LLM expand_query failed: {e}")})),
            )
                .into_response();
        }
        Ok(Ok(terms)) => terms,
    };

    let payload = json!({
        "original": query,
        (field_names::EXPANDED_TERMS): expanded_terms,
    });
    (StatusCode::OK, Json(payload)).into_response()
}
async fn fetch_memory_for_handler(
app: &AppState,
id: &str,
caller_principal: &str,
) -> Result<Memory, Response> {
#[cfg(not(feature = "sal"))]
let _ = caller_principal;
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let caller = crate::store::CallerContext::for_agent(caller_principal.to_string());
return match app.store.get(&caller, id).await {
Ok(mem) => Ok(mem),
Err(crate::store::StoreError::NotFound { .. }) => Err((
StatusCode::NOT_FOUND,
Json(json!({"error": crate::errors::msg::memory_not_found(id)})),
)
.into_response()),
Err(e) => Err(store_err_to_response(e)),
};
}
let lock = app.db.lock().await;
match db::get(&lock.0, id) {
Ok(Some(mem)) => Ok(mem),
Ok(None) => Err((
StatusCode::NOT_FOUND,
Json(json!({"error": crate::errors::msg::memory_not_found(id)})),
)
.into_response()),
Err(e) => {
tracing::error!("memory lookup failed: {e}");
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response())
}
}
}
/// JSON body accepted by `load_family_handler`.
#[derive(serde::Deserialize)]
pub struct LoadFamilyBody {
    /// Family name, parsed with `Family::from_str`; a parse error yields `400`.
    pub family: String,
    /// Optional namespace filter, checked with `validate::validate_namespace`.
    #[serde(default)]
    pub namespace: Option<String>,
    /// Maximum number of memories to return. The handler defaults it to 20 and
    /// clamps it to `1..=100`.
    #[serde(default)]
    pub k: Option<u64>,
}
pub async fn load_family_handler(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<LoadFamilyBody>,
) -> impl IntoResponse {
use std::str::FromStr;
let family = match Family::from_str(&body.family) {
Ok(f) => f,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
};
if let Some(ref ns) = body.namespace
&& let Err(e) = validate::validate_namespace(ns)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let k_raw = body.k.unwrap_or(20);
let k = usize::try_from(k_raw).unwrap_or(usize::MAX).clamp(1, 100);
let family_name = family.name();
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let filter = crate::store::Filter {
namespace: body.namespace.clone(),
tier: None,
tags_any: Vec::new(),
agent_id: None,
since: None,
until: None,
limit: MAX_BULK_SIZE,
};
let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
return match app.store.list(&ctx, &filter).await {
Ok(all) => {
let mut filtered: Vec<Memory> = all
.into_iter()
.filter(|m| {
m.metadata.get("family").and_then(serde_json::Value::as_str)
== Some(family_name)
})
.collect();
filtered.sort_by(|a, b| {
b.priority
.cmp(&a.priority)
.then_with(|| b.updated_at.cmp(&a.updated_at))
});
filtered.truncate(k);
let count = filtered.len();
Json(json!({
"family": family_name,
"namespace": body.namespace,
"k": k,
"count": count,
"memories": filtered,
}))
.into_response()
}
Err(e) => store_err_to_response(e),
};
}
let caller =
crate::handlers::parity::resolve_caller_agent_id(None, &headers, None).unwrap_or_default();
let lock = app.db.lock().await;
let params = json!({
"family": family_name,
"namespace": body.namespace,
"k": k,
});
match crate::mcp::handle_load_family(&lock.0, ¶ms, Some(caller.as_str())) {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": e}))).into_response(),
}
}