#![allow(clippy::too_many_lines)]
use crate::models::ConfidenceSource;
use crate::models::field_names;
use axum::{
Json,
extract::{Query, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use chrono::{Duration, Utc};
use serde_json::json;
use std::sync::Arc;
use uuid::Uuid;
use crate::db;
use crate::models::{CreateMemory, ForgetQuery, ListQuery, Memory, SearchQuery};
use crate::validate;
use super::AppState;
use super::BULK_FANOUT_CONCURRENCY;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;
use crate::visibility::is_visible_to_caller;
pub async fn list_memories(
State(app): State<AppState>,
headers: HeaderMap,
Query(p): Query<ListQuery>,
) -> impl IntoResponse {
if let Some(ref aid) = p.agent_id
&& let Err(e) = validate::validate_agent_id(aid)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid agent_id filter: {e}")})),
)
.into_response();
}
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
Ok(id) => id,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response();
}
};
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
if p.offset.unwrap_or(0) > 0 {
tracing::debug!(
"list_memories on postgres: ?offset is unsupported on the SAL trait; ignored"
);
}
if p.min_priority.is_some() {
tracing::debug!(
"list_memories on postgres: ?min_priority is unsupported on the SAL trait; ignored"
);
}
let limit = p.limit.unwrap_or(20).min(app.max_page_size);
let since = p
.since
.as_deref()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&chrono::Utc));
let until = p
.until
.as_deref()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&chrono::Utc));
let filter = crate::store::Filter {
namespace: p.namespace.clone(),
tier: p.tier.clone(),
tags_any: p
.tags
.as_deref()
.map(|s| s.split(',').map(str::to_string).collect())
.unwrap_or_default(),
agent_id: p.agent_id.clone(),
since,
until,
limit,
};
let ctx = crate::store::CallerContext::for_agent(&caller);
return match app.store.list(&ctx, &filter).await {
Ok(mems) => {
let visible: Vec<Memory> = mems
.into_iter()
.filter(|m| is_visible_to_caller(m, &caller))
.collect();
Json(json!({"memories": &visible, "count": visible.len()})).into_response()
}
Err(e) => store_err_to_response(e),
};
}
let lock = app.db.lock().await;
let limit = p.limit.unwrap_or(20).min(app.max_page_size);
match db::list(
&lock.0,
p.namespace.as_deref(),
p.tier.as_ref(),
limit,
p.offset.unwrap_or(0),
p.min_priority,
p.since.as_deref(),
p.until.as_deref(),
p.tags.as_deref(),
p.agent_id.as_deref(),
) {
Ok(mems) => {
let visible: Vec<Memory> = mems
.into_iter()
.filter(|m| is_visible_to_caller(m, &caller))
.collect();
Json(json!({"memories": &visible, "count": visible.len()})).into_response()
}
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
pub async fn search_memories(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Query(p): Query<SearchQuery>,
) -> impl IntoResponse {
let source_uri_empty = p.source_uri.as_deref().is_none_or(|s| s.trim().is_empty());
if p.q.trim().is_empty() && source_uri_empty {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "query or source_uri is required"})),
)
.into_response();
}
if let Some(ref aid) = p.agent_id
&& let Err(e) = validate::validate_agent_id(aid)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid agent_id filter: {e}")})),
)
.into_response();
}
if let Some(ref a) = p.as_agent
&& let Err(e) = validate::validate_namespace(a)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("as_agent", e)})),
)
.into_response();
}
let format = match crate::toon::WireFormat::parse_http(p.format.as_deref()) {
Ok(f) => f,
Err(e) => return crate::handlers::wire_format::invalid_format_response(&e),
};
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let limit = p.limit.unwrap_or(20).min(app.max_page_size);
let since = p
.since
.as_deref()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&chrono::Utc));
let until = p
.until
.as_deref()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|d| d.with_timezone(&chrono::Utc));
let filter = crate::store::Filter {
namespace: p.namespace.clone(),
tier: p.tier.clone(),
tags_any: p
.tags
.as_deref()
.map(|s| s.split(',').map(str::to_string).collect())
.unwrap_or_default(),
agent_id: p.agent_id.clone(),
since,
until,
limit,
};
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
.unwrap_or_else(|_| crate::identity::anonymous_request_id());
let ctx = crate::store::CallerContext {
agent_id: caller,
as_agent: p.as_agent.clone(),
request_id: None,
bypass_visibility: false,
};
return match app.store.search(&ctx, &p.q, &filter).await {
Ok(r) => crate::handlers::wire_format::search_response(
format,
json!({"results": r, "count": r.len(), "query": p.q}),
),
Err(e) => store_err_to_response(e),
};
}
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let effective_as_agent: Option<String> = p
.as_agent
.clone()
.or_else(|| crate::identity::resolve_http_agent_id(None, header_agent_id).ok());
let lock = app.db.lock().await;
let limit = p.limit.unwrap_or(20).min(app.max_page_size);
let source_uri = p
.source_uri
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
if let Some(uri) = source_uri {
if let Err(e) = validate::validate_source_uri(uri) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid source_uri filter: {e}")})),
)
.into_response();
}
if p.q.trim().is_empty() {
return match db::list_by_source_uri(
&lock.0,
uri,
p.namespace.as_deref(),
Some(limit),
effective_as_agent.as_deref(),
) {
Ok(r) => crate::handlers::wire_format::search_response(
format,
json!({"results": r, "count": r.len(), (field_names::SOURCE_URI): uri}),
),
Err(e) => crate::handlers::errors::handler_error_500(&e),
};
}
}
match db::search_with_source_uri(
&lock.0,
&p.q,
p.namespace.as_deref(),
p.tier.as_ref(),
limit,
p.min_priority,
p.since.as_deref(),
p.until.as_deref(),
p.tags.as_deref(),
p.agent_id.as_deref(),
effective_as_agent.as_deref(),
false,
source_uri,
) {
Ok(r) => crate::handlers::wire_format::search_response(
format,
json!({"results": r, "count": r.len(), "query": p.q}),
),
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
pub async fn forget_memories(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<ForgetQuery>,
) -> impl IntoResponse {
if let Err(resp) = crate::handlers::admin_role::require_admin(&app, &headers, "forget_memories")
{
return resp;
}
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let archive_flag = {
let lock = app.db.lock().await;
lock.3
};
let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
return match app
.store
.forget(
&ctx,
body.namespace.as_deref(),
body.pattern.as_deref(),
body.tier.as_ref(),
archive_flag,
)
.await
{
Ok(n) => Json(json!({"deleted": n})).into_response(),
Err(e) => store_err_to_response(e),
};
}
let lock = app.db.lock().await;
match db::forget(
&lock.0,
body.namespace.as_deref(),
body.pattern.as_deref(),
body.tier.as_ref(),
lock.3, ) {
Ok(n) => Json(json!({"deleted": n})).into_response(),
Err(e) => (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response(),
}
}
pub async fn bulk_create(
State(app): State<AppState>,
headers: HeaderMap,
Json(bodies): Json<Vec<CreateMemory>>,
) -> impl IntoResponse {
if bodies.len() > app.max_page_size {
return (
StatusCode::BAD_REQUEST,
Json(
json!({"error": format!("bulk operations limited to {} items", app.max_page_size)}),
),
)
.into_response();
}
let now = Utc::now();
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let caller = crate::identity::resolve_http_agent_id(None, header_agent_id)
.unwrap_or_else(|_| crate::identity::anonymous_request_id());
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let ctx = crate::store::CallerContext::for_agent(caller.clone());
let mut errors: Vec<String> = Vec::new();
let mut pending: Vec<serde_json::Value> = Vec::new();
let mut allowed: Vec<Memory> = Vec::new();
for body in bodies {
if let Err(e) = validate::RequestValidator::validate_create(&body) {
tracing::warn!("bulk_create(postgres): validate_create failed: {e}");
errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
continue;
}
let expires_at = body.expires_at.clone().or_else(|| {
body.ttl_secs
.map(|s| (now + Duration::seconds(s)).to_rfc3339())
});
let mut metadata_stamped = body.metadata;
if let Some(obj) = metadata_stamped.as_object_mut() {
obj.insert(
"agent_id".to_string(),
serde_json::Value::String(caller.clone()),
);
}
let memory_kind = body
.kind
.as_deref()
.and_then(crate::models::MemoryKind::from_str)
.unwrap_or_default();
let citations = body.citations;
let source_uri = body.source_uri;
let source_span = body.source_span;
let mem = Memory {
id: Uuid::new_v4().to_string(),
tier: body.tier,
namespace: body.namespace,
title: body.title,
content: body.content,
tags: body.tags,
priority: body.priority.clamp(1, 10),
confidence: body
.confidence
.unwrap_or(crate::models::DEFAULT_CONFIDENCE)
.clamp(0.0, 1.0),
source: body.source,
access_count: 0,
created_at: now.to_rfc3339(),
updated_at: now.to_rfc3339(),
last_accessed_at: None,
expires_at,
metadata: metadata_stamped,
reflection_depth: 0,
memory_kind,
entity_id: None,
persona_version: None,
citations,
source_uri,
source_span,
confidence_source: if body.confidence.is_some() {
ConfidenceSource::CallerProvided
} else {
ConfidenceSource::Default
},
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
use crate::models::GovernanceDecision;
let agent_id = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or(crate::identity::sentinels::DAEMON_PRINCIPAL);
let payload_for_pending = serde_json::to_value(&mem).unwrap_or_else(|_| json!({}));
match app
.store
.enforce_governance_action(
crate::store::GovernedAction::Store,
&mem.namespace,
agent_id,
None,
None,
&payload_for_pending,
)
.await
{
Ok(GovernanceDecision::Allow) => {}
Ok(GovernanceDecision::Deny(refusal)) => {
errors.push(format!(
"{}: bulk_create denied by governance: {reason}",
mem.title,
reason = refusal.reason,
));
continue;
}
Ok(GovernanceDecision::Pending(pending_id)) => {
pending.push(json!({
"title": mem.title,
"namespace": mem.namespace,
(field_names::PENDING_ID): pending_id,
}));
continue;
}
Err(e) => {
errors.push(format!("{}: governance error: {e}", mem.title));
continue;
}
}
allowed.push(mem);
}
let created: usize = if allowed.is_empty() {
0
} else {
match app.store.store_batch(&ctx, &allowed).await {
Ok(ids) => ids.len(),
Err(e) => {
tracing::warn!("bulk_create(postgres): store_batch failed: {e}");
errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
0
}
}
};
return Json(json!({
"created": created,
"errors": errors,
"pending": pending,
}))
.into_response();
}
let mut created_mems: Vec<Memory> = Vec::new();
let mut errors: Vec<String> = Vec::new();
{
let lock = app.db.lock().await;
for body in bodies {
if let Err(e) = validate::RequestValidator::validate_create(&body) {
tracing::warn!("bulk_create: validate_create failed: {e}");
errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
continue;
}
let expires_at = body.expires_at.or_else(|| {
body.ttl_secs
.or(lock.2.ttl_for_tier(&body.tier))
.map(|s| (now + Duration::seconds(s)).to_rfc3339())
});
let mut metadata_stamped = body.metadata;
if let Some(obj) = metadata_stamped.as_object_mut() {
obj.insert(
"agent_id".to_string(),
serde_json::Value::String(caller.clone()),
);
}
let memory_kind = body
.kind
.as_deref()
.and_then(crate::models::MemoryKind::from_str)
.unwrap_or_default();
let citations = body.citations;
let source_uri = body.source_uri;
let source_span = body.source_span;
let mem = Memory {
id: Uuid::new_v4().to_string(),
tier: body.tier,
namespace: body.namespace,
title: body.title,
content: body.content,
tags: body.tags,
priority: body.priority.clamp(1, 10),
confidence: body
.confidence
.unwrap_or(crate::models::DEFAULT_CONFIDENCE)
.clamp(0.0, 1.0),
source: body.source,
access_count: 0,
created_at: now.to_rfc3339(),
updated_at: now.to_rfc3339(),
last_accessed_at: None,
expires_at,
metadata: metadata_stamped,
reflection_depth: 0,
memory_kind,
entity_id: None,
persona_version: None,
citations,
source_uri,
source_span,
confidence_source: if body.confidence.is_some() {
ConfidenceSource::CallerProvided
} else {
ConfidenceSource::Default
},
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
match db::insert(&lock.0, &mem) {
Ok(_) => created_mems.push(mem),
Err(e) => {
tracing::warn!("bulk_create: db::insert failed: {e}");
errors.push(super::sanitize_bulk_row_error(&e.to_string()).to_string());
}
}
}
}
if let Some(fed) = app.federation.as_ref() {
let sem = Arc::new(tokio::sync::Semaphore::new(BULK_FANOUT_CONCURRENCY));
let mut joins: tokio::task::JoinSet<(String, Result<(), String>)> =
tokio::task::JoinSet::new();
for mem in &created_mems {
let fed = fed.clone();
let mem = mem.clone();
let sem = sem.clone();
joins.spawn(async move {
let Ok(_permit) = sem.acquire_owned().await else {
return (mem.id.clone(), Err("fanout semaphore closed".to_string()));
};
let id = mem.id.clone();
let outcome = match crate::federation::broadcast_store_quorum(&fed, &mem).await {
Ok(tracker) => match crate::federation::finalise_quorum(&tracker) {
Ok(_) => Ok(()),
Err(err) => Err(err.to_string()),
},
Err(e) => {
tracing::warn!(
"bulk_create: fanout for {id} failed (local committed): {e:?}"
);
Ok(())
}
};
(id, outcome)
});
}
while let Some(res) = joins.join_next().await {
match res {
Ok((id, Err(err))) => errors.push(format!("{id}: {err}")),
Ok((_, Ok(()))) => {}
Err(e) => tracing::warn!("bulk_create: fanout task join error: {e:?}"),
}
}
if !created_mems.is_empty() {
let catchup_errors = crate::federation::bulk_catchup_push(fed, &created_mems).await;
for (peer_id, err) in catchup_errors {
errors.push(format!("catchup to {peer_id}: {err}"));
}
}
}
Json(json!({"created": created_mems.len(), "errors": errors})).into_response()
}