#[cfg(feature = "sal")]
use crate::models::field_names;
use axum::{
Json,
extract::State,
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde_json::{Value, json};
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
fn err_response(e: String) -> axum::response::Response {
tracing::warn!(error = %e, "HTTP route #1111 substrate refusal");
(StatusCode::BAD_REQUEST, Json(json!({"error": e}))).into_response()
}
async fn reflect_fanout(
fed: Option<&crate::federation::FederationConfig>,
mem: &crate::models::Memory,
links: &[crate::models::MemoryLink],
) -> Option<axum::response::Response> {
let fed = fed?;
match crate::federation::broadcast_store_quorum(fed, mem).await {
Ok(tracker) => {
if let Err(err) = crate::federation::finalise_quorum(&tracker) {
let payload = crate::federation::QuorumNotMetPayload::from_err(&err);
return Some(super::quorum_not_met_response(&payload));
}
}
Err(e) => {
tracing::warn!("reflect memory fanout error (local committed): {e:?}");
}
}
for link in links.iter().filter(|l| {
l.relation == crate::models::MemoryLinkRelation::ReflectsOn && l.source_id == mem.id
}) {
if let Err(e) = crate::federation::broadcast_link_quorum(fed, link).await {
tracing::warn!("reflect edge fanout error (local committed): {e:?}");
}
}
None
}
pub async fn handle_smart_load_http(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let caller =
crate::handlers::parity::resolve_caller_agent_id(None, &headers, None).unwrap_or_default();
let lock = app.db.lock().await;
let embedder = app
.embedder
.as_ref()
.as_ref()
.map(|e| e as &dyn crate::embeddings::Embed);
let result = crate::mcp::handle_smart_load(&lock.0, &body, embedder, Some(caller.as_str()));
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_reflect_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let (input, caller_depth) = match crate::mcp::parse_reflect_input(&body, None) {
Ok(parsed) => parsed,
Err(e) => return err_response(e),
};
let caller = crate::store::CallerContext::for_agent(&input.agent_id);
if let Some(caller_d) = caller_depth {
let mut max_src_depth: i32 = 0;
for sid in &input.source_ids {
if let Ok(m) = app.store.get(&caller, sid).await {
max_src_depth = max_src_depth.max(m.reflection_depth);
}
}
let computed = i64::from(max_src_depth.max(0).saturating_add(1));
if caller_d != computed {
return err_response(format!(
"CALLER_DEPTH_MISMATCH: caller asserted depth={caller_d} but \
substrate computed reflection_depth={computed} from sources \
(max(source_depths)+1). Omit the `depth` field to defer to the \
substrate, or pass the matching value."
));
}
}
let active_keypair = app.active_keypair.as_ref().as_ref();
let outcome = match app.store.reflect(&caller, &input, active_keypair).await {
Ok(outcome) => outcome,
Err(e) => return err_response(crate::mcp::map_reflect_error_to_wire_string(e)),
};
if app.federation.is_some() {
if let Ok(mem) = app.store.get(&caller, &outcome.id).await {
let links = app
.store
.get_links_for_anchor(&outcome.id)
.await
.unwrap_or_default();
if let Some(resp) =
reflect_fanout(app.federation.as_ref().as_ref(), &mem, &links).await
{
return resp;
}
}
}
return (
StatusCode::OK,
Json(json!({
"id": outcome.id,
(field_names::REFLECTION_DEPTH): outcome.reflection_depth,
(crate::models::link::REL_REFLECTS_ON): outcome.reflects_on,
"namespace": outcome.namespace,
})),
)
.into_response();
}
let lock = app.db.lock().await;
let db_path = lock.1.clone();
let embedder = app
.embedder
.as_ref()
.as_ref()
.map(|e| e as &dyn crate::embeddings::Embed);
let vec_lock = app.vector_index.lock().await;
let vector_index = vec_lock.as_ref();
let active_keypair = app.active_keypair.as_ref().as_ref();
let result = crate::mcp::handle_reflect(
&lock.0,
&db_path,
&body,
embedder,
vector_index,
None,
active_keypair,
);
drop(vec_lock);
let fanout = match &result {
Ok(v) => v.get("id").and_then(|x| x.as_str()).and_then(|id| {
let mem = crate::db::get(&lock.0, id).ok().flatten();
let links = crate::db::get_links(&lock.0, id).unwrap_or_default();
mem.map(|m| (m, links))
}),
Err(_) => None,
};
drop(lock);
if let Some((mem, links)) = fanout.as_ref() {
if let Some(resp) = reflect_fanout(app.federation.as_ref().as_ref(), mem, links).await {
return resp;
}
}
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_recall_observations_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let recall_id = body
.get("recall_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|s| !s.is_empty());
let consumed = body.get("consumed").and_then(Value::as_bool);
let since = body
.get("since")
.and_then(Value::as_str)
.map(str::trim)
.filter(|s| !s.is_empty());
let until = body
.get("until")
.and_then(Value::as_str)
.map(str::trim)
.filter(|s| !s.is_empty());
let limit = body
.get("limit")
.and_then(Value::as_u64)
.and_then(|n| usize::try_from(n).ok())
.map_or(crate::mcp::RECALL_OBS_DEFAULT_LIMIT, |n| {
n.min(crate::mcp::RECALL_OBS_MAX_LIMIT)
});
return match app
.store
.list_recall_observations(recall_id, consumed, since, until, limit)
.await
{
Ok(rows) => {
let count = rows.len();
(
StatusCode::OK,
Json(json!({ (field_names::OBSERVATIONS): rows, "count": count })),
)
.into_response()
}
Err(e) => err_response(e.to_string()),
};
}
let lock = app.db.lock().await;
let result = crate::mcp::handle_recall_observations(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_reflection_origin_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let memory_id = match body["memory_id"].as_str() {
Some(s) if !s.is_empty() => s,
Some(_) => return err_response(crate::errors::msg::MEMORY_ID_EMPTY.to_string()),
None => return err_response(crate::errors::msg::MEMORY_ID_REQUIRED.to_string()),
};
return match app.store.get_reflection_origin(memory_id).await {
Ok(Some(record)) => (
StatusCode::OK,
Json(json!({
"memory_id": record.memory_id,
(field_names::PEER_ORIGIN): record.peer_origin,
(field_names::SIGNING_AGENT): record.signing_agent,
(field_names::ORIGINAL_DEPTH): record.original_depth,
(field_names::LOCAL_DEPTH_AT_ARRIVAL): record.local_depth_at_arrival,
(field_names::IS_REFLECTION): record.is_reflection,
})),
)
.into_response(),
Ok(None) => err_response(crate::errors::msg::memory_not_found(memory_id)),
Err(e) => err_response(e.to_string()),
};
}
let lock = app.db.lock().await;
let result = crate::mcp::handle_reflection_origin(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_dependents_of_invalidated_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let result = crate::mcp::handle_dependents_of_invalidated(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_export_reflection_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let result = crate::mcp::handle_export_reflection(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_atomise_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let tier = app.tier_config.tier;
let result = crate::mcp::tools::handle_atomise(&lock.0, &body, None, tier, None);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_calibrate_confidence_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let result = crate::mcp::handle_calibrate_confidence(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_verify_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let result = crate::mcp::handle_verify(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_replay_http(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let body_agent = body.get("agent_id").and_then(Value::as_str);
let caller = match crate::handlers::parity::resolve_caller_agent_id(body_agent, &headers, None)
{
Ok(id) => id,
Err(e) => return err_response(e),
};
let mut owned = body.clone();
if let Some(obj) = owned.as_object_mut() {
obj.insert("agent_id".to_string(), Value::String(caller.clone()));
}
let lock = app.db.lock().await;
let result = crate::mcp::handle_replay(&lock.0, &owned, None, Some(&caller));
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_subscription_replay_http(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let body_agent = body.get("agent_id").and_then(Value::as_str);
let caller = match crate::handlers::parity::resolve_caller_agent_id(body_agent, &headers, None)
{
Ok(id) => id,
Err(e) => return err_response(e),
};
let lock = app.db.lock().await;
let result = crate::mcp::handle_subscription_replay(&lock.0, &body, Some(&caller));
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_subscription_dlq_list_http(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let body_agent = body.get("agent_id").and_then(Value::as_str);
let caller = match crate::handlers::parity::resolve_caller_agent_id(body_agent, &headers, None)
{
Ok(id) => id,
Err(e) => return err_response(e),
};
let lock = app.db.lock().await;
let result = crate::mcp::handle_subscription_dlq_list(&lock.0, &body, Some(&caller));
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_rule_list_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let result = crate::mcp::handle_rule_list(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}
pub async fn handle_check_agent_action_http(
State(app): State<AppState>,
_headers: HeaderMap,
Json(body): Json<Value>,
) -> impl IntoResponse {
let lock = app.db.lock().await;
let result = crate::mcp::handle_check_agent_action(&lock.0, &body);
drop(lock);
match result {
Ok(v) => (StatusCode::OK, Json(v)).into_response(),
Err(e) => err_response(e),
}
}