#![allow(clippy::too_many_lines)]
use crate::models::field_names;
use axum::{
Json,
extract::State,
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use chrono::{Duration, Utc};
use serde_json::json;
use uuid::Uuid;
use crate::db;
use crate::embeddings::EmbedStatus;
#[cfg(test)]
use crate::models::Tier;
use crate::models::{CreateMemory, Memory};
use crate::validate;
#[cfg(feature = "sal")]
use super::StorageBackend;
use super::maybe_auto_tag;
#[cfg(feature = "sal")]
use super::store_err_to_response;
use super::{AppState, JsonOrBadRequest};
fn resolve_create_agent_id(
headers: &HeaderMap,
body: &CreateMemory,
) -> Result<(String, serde_json::Value), axum::response::Response> {
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let metadata_agent_id = body
.metadata
.get("agent_id")
.and_then(serde_json::Value::as_str)
.map(str::to_string);
let agent_id = crate::identity::resolve_http_agent_id(None, header_agent_id).map_err(|e| {
(
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response()
})?;
if let Some(claimed) = body.agent_id.as_deref()
&& claimed != agent_id
{
return Err((
StatusCode::FORBIDDEN,
Json(json!({"error": crate::errors::msg::AGENT_ID_BODY_MISMATCH})),
)
.into_response());
}
if let Some(claimed) = metadata_agent_id.as_deref()
&& claimed != agent_id
{
return Err((
StatusCode::FORBIDDEN,
Json(json!({"error": "metadata.agent_id does not match authenticated caller"})),
)
.into_response());
}
let mut metadata = body.metadata.clone();
if let Some(obj) = metadata.as_object_mut() {
obj.insert(
"agent_id".to_string(),
serde_json::Value::String(agent_id.clone()),
);
}
if let Some(ref s) = body.scope {
validate::validate_scope(s).map_err(|e| {
(
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response()
})?;
if let Some(obj) = metadata.as_object_mut() {
obj.insert("scope".to_string(), serde_json::Value::String(s.clone()));
}
}
Ok((agent_id, metadata))
}
/// Embeds the title/content document for a new memory.
///
/// The document text is built with `crate::embeddings::embedding_document`.
/// When no embedder is configured the result is `(None, EmbedStatus::Indexed)`;
/// otherwise it is whatever the embedder's `embed_with_status` returns.
fn embed_create_before_lock(
    app: &AppState,
    title: &str,
    content: &str,
) -> (Option<Vec<f32>>, EmbedStatus) {
    let document = crate::embeddings::embedding_document(title, content);
    if let Some(embedder) = app.embedder.as_ref().as_ref() {
        embedder.embed_with_status(&document)
    } else {
        (None, EmbedStatus::Indexed)
    }
}
fn resolve_create_conflict_title(
conn: &rusqlite::Connection,
body: &CreateMemory,
on_conflict_mode: crate::mcp::tools::OnConflictMode,
) -> Result<String, axum::response::Response> {
use crate::mcp::tools::OnConflictMode;
match on_conflict_mode {
OnConflictMode::Error => {
match db::find_by_title_namespace(conn, &body.title, &body.namespace) {
Ok(Some(existing_id)) => Err((
StatusCode::CONFLICT,
Json(json!({
"code": crate::errors::error_codes::CONFLICT,
"error": format!(
"memory with title '{}' already exists in namespace '{}'",
body.title, body.namespace
),
"existing_id": existing_id,
})),
)
.into_response()),
Ok(None) => Ok(body.title.clone()),
Err(e) => {
tracing::error!("on_conflict lookup failed: {e}");
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "conflict check failed"})),
)
.into_response())
}
}
}
OnConflictMode::Version => db::next_versioned_title(conn, &body.title, &body.namespace)
.map_err(|e| {
tracing::error!("on_conflict=version failed: {e}");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "could not pick a versioned title"})),
)
.into_response()
}),
OnConflictMode::Merge => Ok(body.title.clone()),
}
}
async fn enforce_create_governance<'a>(
app: &AppState,
lock: tokio::sync::MutexGuard<
'a,
(
rusqlite::Connection,
std::path::PathBuf,
crate::config::ResolvedTtl,
bool,
),
>,
mem: &Memory,
) -> Result<
tokio::sync::MutexGuard<
'a,
(
rusqlite::Connection,
std::path::PathBuf,
crate::config::ResolvedTtl,
bool,
),
>,
axum::response::Response,
> {
use crate::models::{GovernanceDecision, GovernedAction};
let agent_for_gov = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let payload = match super::to_value_or_500("create_memory.governance.payload", mem) {
Ok(v) => v,
Err(resp) => return Err(resp),
};
match db::enforce_governance(
&lock.0,
GovernedAction::Store,
&mem.namespace,
&agent_for_gov,
None,
None,
&payload,
) {
Ok(GovernanceDecision::Allow) => Ok(lock),
Ok(GovernanceDecision::Deny(refusal)) => Err((
StatusCode::FORBIDDEN,
Json(json!({"error": crate::governance::deny_message(
"store",
crate::governance::DenyGate::Governance,
&refusal.reason,
)})),
)
.into_response()),
Ok(GovernanceDecision::Pending(pending_id)) => {
let pending_row = db::get_pending_action(&lock.0, &pending_id).ok().flatten();
crate::subscriptions::dispatch_approval_requested(&lock.0, &pending_id, &lock.1);
let namespace = mem.namespace.clone();
drop(lock);
if let (Some(pa), Some(fed)) = (pending_row.as_ref(), app.federation.as_ref()) {
match crate::federation::broadcast_pending_quorum(fed, pa).await {
Ok(tracker) => {
if let Err(err) = crate::federation::finalise_quorum(&tracker) {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return Err(super::quorum_not_met_response(&payload));
}
}
Err(err) => {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return Err(super::quorum_not_met_response(&payload));
}
}
}
Err((
StatusCode::ACCEPTED,
Json(json!({
"status": "pending",
(field_names::PENDING_ID): pending_id,
"reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
"action": "store",
"namespace": namespace,
})),
)
.into_response())
}
Err(e) => Err(crate::handlers::errors::governance_error_500(&e)),
}
}
fn insert_create_with_quota(
lock: &tokio::sync::MutexGuard<
'_,
(
rusqlite::Connection,
std::path::PathBuf,
crate::config::ResolvedTtl,
bool,
),
>,
mem: &Memory,
embedding: &Option<Vec<f32>>,
) -> Result<String, axum::response::Response> {
let quota_agent_id = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let raw_payload_bytes = mem.title.len()
+ mem.content.len()
+ serde_json::to_string(&mem.metadata)
.map(|s| s.len())
.unwrap_or(0);
let payload_bytes = match i64::try_from(raw_payload_bytes) {
Ok(v) => v,
Err(_) => {
tracing::warn!(
agent_id = %quota_agent_id,
raw_bytes = raw_payload_bytes,
"quota byte-count saturated at i64::MAX for agent={}; \
metadata may be excessively large",
if quota_agent_id.is_empty() {
"<anonymous>"
} else {
quota_agent_id.as_str()
}
);
i64::MAX
}
};
let quota_op = crate::quotas::QuotaOp::Memory {
bytes: payload_bytes,
};
if !quota_agent_id.is_empty() {
if let Err(e) =
crate::quotas::check_and_record(&lock.0, "a_agent_id, &mem.namespace, quota_op)
{
return Err(match e {
crate::quotas::QuotaCheckError::Quota(qe) => (
StatusCode::TOO_MANY_REQUESTS,
Json(json!({
"code": crate::errors::error_codes::QUOTA_EXCEEDED,
"error": qe.to_string(),
"limit": qe.limit.as_str(),
"current": qe.current,
"max": qe.max,
"agent_id": qe.agent_id,
})),
)
.into_response(),
crate::quotas::QuotaCheckError::Sql(se) => {
tracing::error!("quota substrate error: {se}");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "quota check failed"})),
)
.into_response()
}
});
}
}
match db::insert(&lock.0, mem) {
Ok(actual_id) => {
if let Some(vec) = embedding.as_ref()
&& let Err(e) = db::set_embedding(&lock.0, &actual_id, vec)
{
tracing::warn!("failed to store embedding for {actual_id}: {e}");
}
Ok(actual_id)
}
Err(e) => {
if !quota_agent_id.is_empty() {
if let Err(re) =
crate::quotas::refund_op(&lock.0, "a_agent_id, &mem.namespace, quota_op)
{
crate::quotas::log_refund_op_failed("a_agent_id, &re);
}
}
if let Some(refusal) = e.downcast_ref::<crate::storage::GovernanceRefusal>() {
tracing::info!(
"create_memory refused by substrate governance: {}",
refusal.reason
);
return Err((
StatusCode::FORBIDDEN,
Json(json!({
"code": crate::errors::error_codes::GOVERNANCE_REFUSED,
"error": refusal.reason,
})),
)
.into_response());
}
Err(crate::handlers::errors::handler_error_500(&e))
}
}
}
async fn fanout_and_assemble_create_response(
app: &AppState,
mem: &Memory,
actual_id: &str,
embedding: Option<Vec<f32>>,
auto_tags: &[String],
contradiction_ids: Vec<String>,
embed_status: EmbedStatus,
) -> axum::response::Response {
let shipped = match (&embedding, app.embedder.as_ref().as_ref()) {
(Some(vec), Some(emb)) => Some(crate::federation::ShippedEmbedding::new(
actual_id.to_string(),
emb.model_description(),
vec.clone(),
)),
_ => None,
};
if let Some(vec) = embedding {
let mut idx_lock = app.vector_index.lock().await;
if let Some(idx) = idx_lock.as_mut() {
idx.insert(actual_id.to_string(), vec);
}
}
let resolved_agent_id = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.map(str::to_string);
crate::audit::emit(crate::audit::EventBuilder::new(
crate::audit::AuditAction::Store,
crate::audit::actor(
resolved_agent_id.clone().unwrap_or_default(),
"http_body",
mem.metadata
.get("scope")
.and_then(|v| v.as_str())
.map(str::to_string),
),
crate::audit::target_memory(
actual_id.to_string(),
mem.namespace.clone(),
Some(mem.title.clone()),
Some(mem.tier.to_string()),
mem.metadata
.get("scope")
.and_then(|v| v.as_str())
.map(str::to_string),
),
));
let mut response = json!({
"id": actual_id,
"tier": mem.tier,
"namespace": mem.namespace,
"title": mem.title,
"agent_id": resolved_agent_id,
});
if !contradiction_ids.is_empty() {
response["potential_contradictions"] = json!(contradiction_ids);
}
if !auto_tags.is_empty() {
response["auto_tags"] = json!(auto_tags);
}
if embed_status.is_degraded() {
response["embed_status"] = json!(embed_status.as_str());
let reason = embed_status.reason();
if !reason.is_empty() {
response["embed_status_reason"] = json!(reason);
}
}
{
let lock = app.db.lock().await;
crate::subscriptions::dispatch_event(
&lock.0,
crate::mcp::registry::tool_names::MEMORY_STORE,
actual_id,
&mem.namespace,
resolved_agent_id.as_deref(),
&lock.1,
);
}
if let Some(fed) = app.federation.as_ref() {
let mut mem_echo = mem.clone();
mem_echo.id = actual_id.to_string();
match crate::federation::broadcast_store_quorum_with_embedding(
fed,
&mem_echo,
shipped.as_ref(),
)
.await
{
Ok(tracker) => match crate::federation::finalise_quorum(&tracker) {
Ok(got) => {
response["quorum_acks"] = json!(got);
return (StatusCode::CREATED, Json(response)).into_response();
}
Err(err) => {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return super::quorum_not_met_response(&payload);
}
},
Err(err) => {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return super::quorum_not_met_response(&payload);
}
}
}
(StatusCode::CREATED, Json(response)).into_response()
}
/// Creates a memory through the `app.store` backend; `create_memory` routes
/// here when `app.storage_backend` is `StorageBackend::Postgres`.
///
/// Visible steps, in order: auto-tagging, building the `Memory`, optional
/// attestation, embedding, governance, the store write (run concurrently with
/// the federation broadcast when `app.federation` is set), audit emission,
/// event dispatch, quorum finalisation, and assembling the `201` payload.
///
/// Early returns: `400` when `prepare_signed_store` rejects the signature
/// input, `403` on attestation failure or governance denial, `202` when
/// governance yields a pending action, and whatever `store_err_to_response` /
/// `super::quorum_not_met_response` produce for store and quorum failures.
///
/// NOTE(review): unlike the non-Postgres path in `create_memory`, this function
/// does not clamp `priority`/`confidence` and does not derive `expires_at` from
/// `ttl_secs` — confirm the store layer or request validation covers that.
#[cfg(feature = "sal")]
async fn create_memory_postgres(
app: &AppState,
body: &CreateMemory,
agent_id: &str,
metadata: serde_json::Value,
) -> axum::response::Response {
let now = Utc::now();
let auto_tags =
maybe_auto_tag(app, &body.title, &body.content, &body.tags, &body.namespace).await;
// Caller-supplied tags come first; auto tags are appended only if not already present.
let mut final_tags = body.tags.clone();
for t in &auto_tags {
if !final_tags.iter().any(|existing| existing == t) {
final_tags.push(t.clone());
}
}
let mut mem = Memory {
id: Uuid::new_v4().to_string(),
tier: body.tier.clone(),
namespace: body.namespace.clone(),
title: body.title.clone(),
content: body.content.clone(),
tags: final_tags,
priority: body.priority,
confidence: body.resolved_confidence(),
source: body.source.clone(),
access_count: 0,
created_at: now.to_rfc3339(),
updated_at: now.to_rfc3339(),
last_accessed_at: None,
expires_at: body.expires_at.clone(),
metadata,
reflection_depth: 0,
// An unrecognised or missing `kind` falls back to the default memory kind.
memory_kind: body
.kind
.as_deref()
.and_then(crate::models::MemoryKind::from_str)
.unwrap_or_default(),
entity_id: None,
persona_version: None,
citations: body.citations.clone(),
source_uri: body.source_uri.clone(),
source_span: body.source_span.clone(),
confidence_source: body.resolved_confidence_source(),
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
// Attestation: a non-blank signature takes the signed-store path; without
// one, attestation is stamped only when `require_agent_attestation_enabled()`
// returns true.
{
let presented_sig = body
.signature
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
if let Some(sig_b64) = presented_sig {
let (sig_bytes, signed_created_at) = match crate::identity::attest::prepare_signed_store(
sig_b64,
body.created_at.as_deref(),
) {
Ok(v) => v,
Err(msg) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": msg}))).into_response();
}
};
// The server timestamp is replaced by the one `prepare_signed_store` returned
// (presumably the timestamp covered by the signature — TODO confirm).
mem.created_at = signed_created_at.to_string();
if let Err(e) = crate::identity::attest::stamp_attestation_async(
app.store.as_ref(),
&mut mem,
agent_id,
Some(&sig_bytes),
)
.await
{
return (
StatusCode::FORBIDDEN,
Json(json!({
"code": crate::errors::error_codes::ATTESTATION_FAILED,
"error": e.to_string(),
})),
)
.into_response();
}
} else if crate::identity::attest::require_agent_attestation_enabled()
&& let Err(e) = crate::identity::attest::stamp_attestation_async(
app.store.as_ref(),
&mut mem,
agent_id,
None,
)
.await
{
return (
StatusCode::FORBIDDEN,
Json(json!({
"code": crate::errors::error_codes::ATTESTATION_FAILED,
"error": e.to_string(),
})),
)
.into_response();
}
}
let ctx = crate::store::CallerContext::for_agent(agent_id.to_string());
let embedding_text = crate::embeddings::embedding_document(&mem.title, &mem.content);
// Embedding errors are discarded via `.ok()`: the memory is then stored
// without a vector and the response carries no embed status.
let embedding: Option<Vec<f32>> = match app.embedder.as_ref().as_ref() {
None => None,
Some(emb) => emb.embed(&embedding_text).ok(),
};
// Falls back to an empty JSON object if the memory cannot be serialised.
let payload_for_pending = serde_json::to_value(&mem).unwrap_or_else(|_| json!({}));
match app
.store
.enforce_governance_action(
crate::store::GovernedAction::Store,
&mem.namespace,
agent_id,
None,
None,
&payload_for_pending,
)
.await
{
Ok(crate::models::GovernanceDecision::Allow) => {}
Ok(crate::models::GovernanceDecision::Deny(refusal)) => {
return (
StatusCode::FORBIDDEN,
Json(json!({"error": format!("denied: {reason}", reason = refusal.reason)})),
)
.into_response();
}
Ok(crate::models::GovernanceDecision::Pending(pending_id)) => {
return (
StatusCode::ACCEPTED,
Json(json!({
"status": "pending",
(field_names::PENDING_ID): pending_id,
"namespace": mem.namespace,
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
Err(e) => return store_err_to_response(e),
}
// Created but not awaited here: the future is driven either inside
// `tokio::join!` (federated) or by the plain `.await` (non-federated) below.
let store_fut = app
.store
.store_with_embedding(&ctx, &mem, embedding.as_deref());
let (id, quorum_outcome) = match app.federation.as_ref() {
Some(fed) => {
tracing::debug!(
target: crate::federation::SYNC_TRACE_TARGET,
memory_id = %mem.id,
namespace = %mem.namespace,
peer_count = fed.peer_count(),
backend = "postgres",
"create_memory_postgres: pipelining broadcast_store_quorum with local write",
);
let mem_echo = mem.clone();
let shipped = match (&embedding, app.embedder.as_ref().as_ref()) {
(Some(vec), Some(emb)) => Some(crate::federation::ShippedEmbedding::new(
mem.id.clone(),
emb.model_description(),
vec.clone(),
)),
_ => None,
};
// The local write and the peer broadcast are polled concurrently.
let (store_res, quorum_res) = tokio::join!(
store_fut,
crate::federation::broadcast_store_quorum_with_embedding(
fed,
&mem_echo,
shipped.as_ref(),
)
);
// A local store failure wins over the broadcast result, which is dropped
// unexamined; otherwise the quorum result is checked after audit and dispatch.
match store_res {
Ok(id) => (id, Some(quorum_res)),
Err(e) => return store_err_to_response(e),
}
}
None => {
tracing::debug!(
target: crate::federation::SYNC_TRACE_TARGET,
memory_id = %mem.id,
namespace = %mem.namespace,
backend = "postgres",
"create_memory_postgres: federation disabled — skipping broadcast",
);
match store_fut.await {
Ok(id) => (id, None),
Err(e) => return store_err_to_response(e),
}
}
};
// The audit event uses the id returned by the store, not `mem.id`.
if crate::audit::is_enabled() {
let scope = mem
.metadata
.get("scope")
.and_then(|v| v.as_str())
.map(str::to_string);
crate::audit::emit(crate::audit::EventBuilder::new(
crate::audit::AuditAction::Store,
crate::audit::actor(agent_id.to_string(), "http_body", scope.clone()),
crate::audit::target_memory(
id.clone(),
mem.namespace.clone(),
Some(mem.title.clone()),
Some(mem.tier.to_string()),
scope,
),
));
}
let id_for_dispatch = id.clone();
let ns_for_dispatch = mem.namespace.clone();
let agent_for_dispatch = agent_id.to_string();
super::dispatch_event_postgres(
app,
crate::mcp::registry::tool_names::MEMORY_STORE,
&id_for_dispatch,
&ns_for_dispatch,
Some(&agent_for_dispatch),
None,
)
.await;
// The local write, audit and dispatch have already happened by now; a
// quorum failure still replaces the 201 with the quorum-not-met response.
if let Some(quorum_res) = quorum_outcome {
match quorum_res {
Ok(tracker) => {
if let Err(err) = crate::federation::finalise_quorum(&tracker) {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return super::quorum_not_met_response(&payload);
}
}
Err(err) => {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return super::quorum_not_met_response(&payload);
}
}
}
// The response body is the serialised memory, with `id` overwritten by the
// id the store returned and `auto_tags` added when any were generated.
let mut payload = match super::to_value_or_500("create_memory.postgres.response", &mem) {
Ok(v) => v,
Err(resp) => return resp,
};
if let Some(obj) = payload.as_object_mut() {
obj.insert("id".to_string(), serde_json::Value::String(id));
if !auto_tags.is_empty() {
obj.insert("auto_tags".to_string(), json!(auto_tags));
}
}
(StatusCode::CREATED, Json(payload)).into_response()
}
/// HTTP handler that creates a memory.
///
/// Flow: validate the body, resolve the caller's agent id, and — when built
/// with the `sal` feature and the backend is Postgres — delegate to
/// `create_memory_postgres`. Otherwise: auto-tag, embed, gather near-duplicate
/// candidates from the vector index, then, under the `app.db` lock, resolve
/// expiry and the title conflict policy, build the `Memory`, run attestation,
/// governance, the contradiction lookup, the proactive conflict check and the
/// quota-metered insert. The lock is released before the post-insert fan-out.
///
/// Responses produced directly here: `400` (validation failure, bad
/// `on_conflict` value, rejected signature input), `403` (attestation
/// failure) and `409` (near-duplicate detected). All other outcomes come from
/// the helper functions this handler calls.
pub async fn create_memory(
State(app): State<AppState>,
headers: HeaderMap,
JsonOrBadRequest(body): JsonOrBadRequest<CreateMemory>,
) -> impl IntoResponse {
if let Err(e) = validate::RequestValidator::validate_create(&body) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let (agent_id, metadata) = match resolve_create_agent_id(&headers, &body) {
Ok(v) => v,
Err(resp) => return resp,
};
// Postgres deployments take a separate code path; everything below uses `app.db`.
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
return create_memory_postgres(&app, &body, &agent_id, metadata).await;
}
let auto_tags = maybe_auto_tag(
&app,
&body.title,
&body.content,
&body.tags,
&body.namespace,
)
.await;
// Embedding is computed before the DB lock is acquired.
let (embedding, embed_status) = embed_create_before_lock(&app, &body.title, &body.content);
// Candidate ids for the proactive conflict check come from the vector index.
// `None` when `force` is set, when there is no embedding, or when the index
// is absent, not fully searchable, or empty.
let conflict_candidate_ids: Option<Vec<String>> = if body.force {
None
} else if let Some(ref qe) = embedding {
let vi = app.vector_index.lock().await;
vi.as_ref()
.filter(|idx| idx.is_fully_searchable() && !idx.is_empty())
.map(|idx| {
idx.search(qe, db::PROACTIVE_CONFLICT_INDEX_K)
.into_iter()
.map(|h| h.id)
.collect()
})
} else {
None
};
// `on_conflict` defaults to "error" when the body omits it.
let on_conflict_str = body.on_conflict.as_deref().unwrap_or("error");
let on_conflict_mode = match crate::mcp::tools::OnConflictMode::parse(on_conflict_str) {
Ok(m) => m,
Err(msg) => {
return (StatusCode::BAD_REQUEST, Json(json!({ "error": msg }))).into_response();
}
};
let state = app.db.clone();
let now = Utc::now();
// The DB guard is held from here until the explicit `drop(lock)` before the
// fan-out, or until an early return drops it.
let lock = state.lock().await;
// An explicit `expires_at` wins; otherwise `ttl_secs`, falling back to the
// per-tier TTL from the resolved config (`lock.2`).
let expires_at = body.expires_at.clone().or_else(|| {
body.ttl_secs
.or(lock.2.ttl_for_tier(&body.tier))
.map(|s| (now + Duration::seconds(s)).to_rfc3339())
});
let resolved_title = match resolve_create_conflict_title(&lock.0, &body, on_conflict_mode) {
Ok(t) => t,
Err(resp) => return resp,
};
// Caller-supplied tags come first; auto tags are appended only if not already present.
let mut merged_tags = body.tags.clone();
for t in &auto_tags {
if !merged_tags.iter().any(|existing| existing == t) {
merged_tags.push(t.clone());
}
}
let mut mem = Memory {
id: Uuid::new_v4().to_string(),
tier: body.tier.clone(),
namespace: body.namespace.clone(),
title: resolved_title,
content: body.content.clone(),
tags: merged_tags,
// Out-of-range values are clamped rather than rejected.
priority: body.priority.clamp(1, 10),
confidence: body.resolved_confidence().clamp(0.0, 1.0),
source: body.source.clone(),
access_count: 0,
created_at: now.to_rfc3339(),
updated_at: now.to_rfc3339(),
last_accessed_at: None,
expires_at,
metadata,
reflection_depth: 0,
// An unrecognised or missing `kind` falls back to the default memory kind.
memory_kind: body
.kind
.as_deref()
.and_then(crate::models::MemoryKind::from_str)
.unwrap_or_default(),
entity_id: None,
persona_version: None,
citations: body.citations.clone(),
source_uri: body.source_uri.clone(),
source_span: body.source_span.clone(),
confidence_source: body.resolved_confidence_source(),
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
// Attestation: a non-blank signature takes the signed-store path; without
// one, attestation is stamped only when `require_agent_attestation_enabled()`
// returns true.
{
let presented_sig = body
.signature
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
if let Some(sig_b64) = presented_sig {
let (sig_bytes, signed_created_at) = match crate::identity::attest::prepare_signed_store(
sig_b64,
body.created_at.as_deref(),
) {
Ok(v) => v,
Err(msg) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": msg}))).into_response();
}
};
// The server timestamp is replaced by the one `prepare_signed_store` returned
// (presumably the timestamp covered by the signature — TODO confirm).
mem.created_at = signed_created_at.to_string();
if let Err(e) = crate::identity::attest::stamp_attestation_sync(
&lock.0,
&mut mem,
&agent_id,
Some(&sig_bytes),
) {
return (
StatusCode::FORBIDDEN,
Json(json!({
"code": crate::errors::error_codes::ATTESTATION_FAILED,
"error": e.to_string(),
})),
)
.into_response();
}
} else if crate::identity::attest::require_agent_attestation_enabled()
&& let Err(e) =
crate::identity::attest::stamp_attestation_sync(&lock.0, &mut mem, &agent_id, None)
{
return (
StatusCode::FORBIDDEN,
Json(json!({
"code": crate::errors::error_codes::ATTESTATION_FAILED,
"error": e.to_string(),
})),
)
.into_response();
}
}
// The guard is moved into the governance helper and handed back only when
// the store is allowed; every other outcome returns its response here.
let lock = match enforce_create_governance(&app, lock, &mem).await {
Ok(lock) => lock,
Err(resp) => return resp,
};
// A failed contradiction lookup is treated as "none found"; the new memory's
// own id is filtered out of the reported list.
let contradictions =
db::find_contradictions(&lock.0, &mem.title, &mem.namespace).unwrap_or_default();
let contradiction_ids: Vec<String> = contradictions
.iter()
.filter(|c| c.id != mem.id)
.map(|c| c.id.clone())
.collect();
// Proactive near-duplicate check: skipped when `force` is set or when there
// is no embedding. Uses the pre-collected index candidates when available.
if !body.force
&& let Some(ref qe) = embedding
{
let check_result = match &conflict_candidate_ids {
Some(ids) => db::proactive_conflict_check_candidates(&lock.0, &mem, qe, ids),
None => db::proactive_conflict_check(&lock.0, &mem, qe),
};
match check_result {
Ok(Some(conflict)) => {
tracing::info!(
target: "create_memory",
namespace = %mem.namespace,
existing_id = %conflict.existing_id,
similarity = conflict.similarity,
reason = conflict.reason,
"create_memory refused by proactive conflict detection (#519); \
pass force=true to override",
);
return (
StatusCode::CONFLICT,
Json(json!({
"error": format!(
"near-duplicate of existing memory in namespace '{}'",
mem.namespace,
),
"code": crate::errors::error_codes::CONFLICT,
"existing_id": conflict.existing_id,
"existing_title": conflict.existing_title,
(field_names::SIMILARITY): conflict.similarity,
"reason": conflict.reason,
"hint": "pass force=true to insert anyway",
})),
)
.into_response();
}
Ok(None) => {}
// A failing check is logged and does not block the create.
Err(e) => {
tracing::warn!("proactive_conflict_check failed (non-fatal, continuing): {e}");
}
}
}
let actual_id = match insert_create_with_quota(&lock, &mem, &embedding) {
Ok(id) => id,
Err(resp) => return resp,
};
// Release the DB guard before fan-out, which re-locks `app.db` and awaits.
drop(lock);
fanout_and_assemble_create_response(
&app,
&mem,
&actual_id,
embedding,
&auto_tags,
contradiction_ids,
embed_status,
)
.await
}
/// Unit tests for the individual stages of the create-memory pipeline.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mcp::tools::OnConflictMode;
    use axum::http::{HeaderMap, HeaderValue};
    use serde_json::json;

    /// Builds a minimal, valid-looking create request with the given title.
    fn make_body(title: &str) -> CreateMemory {
        CreateMemory {
            tier: Tier::Long,
            namespace: "test-ns".to_string(),
            title: title.to_string(),
            content: "content body — long enough to satisfy validators".to_string(),
            tags: Vec::new(),
            priority: 5,
            confidence: Some(0.8),
            source: "test".to_string(),
            expires_at: None,
            ttl_secs: None,
            metadata: json!({}),
            agent_id: None,
            scope: None,
            on_conflict: None,
            detect_conflicts: None,
            force: false,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            kind: None,
            signature: None,
            created_at: None,
        }
    }

    /// Builds a header map holding exactly one header.
    fn header(name: &'static str, value: &str) -> HeaderMap {
        let parsed = HeaderValue::from_str(value).unwrap();
        let mut map = HeaderMap::new();
        map.insert(name, parsed);
        map
    }

    /// Inserts a long-tier seed row with the given title, namespace and content.
    fn seed_memory(conn: &rusqlite::Connection, title: &str, namespace: &str, content: &str) {
        let stamp = Utc::now().to_rfc3339();
        let seed = Memory {
            title: title.to_string(),
            namespace: namespace.to_string(),
            tier: Tier::Long,
            content: content.to_string(),
            source: "test".to_string(),
            created_at: stamp.clone(),
            updated_at: stamp,
            ..Memory::default()
        };
        db::insert(conn, &seed).expect("seed insert ok");
    }

    #[test]
    fn stage1_agent_id_body_disagreeing_with_header_returns_403() {
        let headers = header("x-agent-id", "ai:from-header");
        let mut body = make_body("title-1");
        body.agent_id = Some("ai:from-body".to_string());

        let err = resolve_create_agent_id(&headers, &body)
            .expect_err("body/header disagree must 403 post-#907");
        assert_eq!(err.status(), StatusCode::FORBIDDEN);
    }

    #[test]
    fn stage1_agent_id_body_matching_header_succeeds() {
        let headers = header("x-agent-id", "ai:same");
        let mut body = make_body("title-1-match");
        body.agent_id = Some("ai:same".to_string());

        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
        assert_eq!(aid, "ai:same");
        assert_eq!(metadata["agent_id"], json!("ai:same"));
    }

    #[test]
    fn stage1_agent_id_metadata_disagreeing_with_header_returns_403() {
        let headers = header("x-agent-id", "ai:from-header");
        let mut body = make_body("title-2");
        body.metadata = json!({"agent_id": "ai:from-metadata"});

        let err = resolve_create_agent_id(&headers, &body)
            .expect_err("metadata/header disagree must 403 post-#907");
        assert_eq!(err.status(), StatusCode::FORBIDDEN);
    }

    #[test]
    fn stage1_agent_id_metadata_matching_header_succeeds() {
        let headers = header("x-agent-id", "ai:from-header");
        let mut body = make_body("title-2-match");
        body.metadata = json!({"agent_id": "ai:from-header"});

        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
        assert_eq!(aid, "ai:from-header");
        assert_eq!(metadata["agent_id"], json!("ai:from-header"));
    }

    #[test]
    fn stage1_agent_id_x_agent_id_header_used_when_body_and_metadata_absent() {
        let headers = header("x-agent-id", "ai:from-header");
        let body = make_body("title-3");

        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
        assert_eq!(aid, "ai:from-header");
        assert_eq!(metadata["agent_id"], json!("ai:from-header"));
    }

    #[test]
    fn stage1_agent_id_synthesised_when_no_source_supplied() {
        let headers = HeaderMap::new();
        let body = make_body("title-4");

        let (aid, metadata) = resolve_create_agent_id(&headers, &body).expect("resolve ok");
        assert!(
            aid.starts_with("anonymous:req-"),
            "synthesised agent_id must follow the `anonymous:req-<uuid8>` shape; got {aid}"
        );
        assert_eq!(metadata["agent_id"], json!(aid));
    }

    #[test]
    fn stage2_conflict_error_mode_returns_409_when_title_exists() {
        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
        seed_memory(&conn, "dup-title", "ns-x", "seed content");

        let mut body = make_body("dup-title");
        body.namespace = "ns-x".to_string();

        let err = resolve_create_conflict_title(&conn, &body, OnConflictMode::Error)
            .expect_err("must return CONFLICT");
        assert_eq!(err.status(), StatusCode::CONFLICT);
    }

    #[test]
    fn stage2_conflict_version_mode_picks_a_free_suffix() {
        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
        seed_memory(&conn, "vers-title", "ns-v", "seed");

        let mut body = make_body("vers-title");
        body.namespace = "ns-v".to_string();

        let resolved = resolve_create_conflict_title(&conn, &body, OnConflictMode::Version)
            .expect("version path returns Ok");
        assert_ne!(resolved, "vers-title");
        assert!(
            resolved.starts_with("vers-title"),
            "versioned title must preserve the original base; got {resolved}"
        );
    }

    #[test]
    fn stage2_conflict_merge_mode_passes_title_through_unchanged() {
        let conn = db::open(std::path::Path::new(":memory:")).unwrap();
        let body = make_body("merge-title");

        let resolved = resolve_create_conflict_title(&conn, &body, OnConflictMode::Merge)
            .expect("merge path returns Ok");
        assert_eq!(resolved, "merge-title");
    }

    #[test]
    fn stage3_embed_no_embedder_reports_indexed() {
        // Mirrors the tuple `embed_create_before_lock` returns without an embedder.
        let vec: Option<Vec<f32>> = None;
        let status = EmbedStatus::Indexed;

        assert!(vec.is_none());
        assert!(matches!(status, EmbedStatus::Indexed));
        assert!(
            !status.is_degraded(),
            "Indexed must NOT be classified as degraded by `is_degraded` — the \
             create_memory response branch on `embed_status` keys on this"
        );
    }

    #[test]
    fn validation_empty_title_short_circuits_with_bad_request() {
        let body = make_body("");

        let msg = validate::RequestValidator::validate_create(&body)
            .expect_err("empty title must fail validation")
            .to_string();
        assert!(
            !msg.is_empty(),
            "validator error must carry a message for the 400 envelope"
        );
    }

    #[test]
    fn insert_governance_refusal_downcasts_to_403_envelope() {
        let refusal = crate::storage::GovernanceRefusal {
            reason: "test rule forbids store".to_string(),
        };
        let wrapped: anyhow::Error = anyhow::anyhow!(refusal.clone());

        let downcast: Option<&crate::storage::GovernanceRefusal> = wrapped.downcast_ref();
        assert!(
            downcast.is_some(),
            "GovernanceRefusal must round-trip through anyhow::Error \
             so insert_create_with_quota's downcast can map to 403"
        );
        assert_eq!(downcast.unwrap().reason, refusal.reason);
    }
}