#![allow(clippy::too_many_lines)]
use crate::models::Memory;
use crate::models::field_names;
use axum::{
Json,
extract::{Query, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde::Deserialize;
use serde_json::json;
use crate::db;
use crate::identity::sentinels;
use crate::validate;
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
#[cfg(feature = "sal")]
use super::store_err_to_response;
#[derive(Debug, Deserialize)]
pub struct EntityRegisterBody {
pub canonical_name: String,
pub namespace: String,
#[serde(default)]
pub aliases: Vec<String>,
#[serde(default)]
pub metadata: serde_json::Value,
pub agent_id: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct EntityByAliasQuery {
pub alias: String,
pub namespace: Option<String>,
}
pub async fn entity_register(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<EntityRegisterBody>,
) -> impl IntoResponse {
if let Err(e) = validate::validate_title(&body.canonical_name) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid canonical_name: {e}")})),
)
.into_response();
}
if let Err(e) = validate::validate_namespace(&body.namespace) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("namespace", e)})),
)
.into_response();
}
let agent_id = body
.agent_id
.as_deref()
.or_else(|| {
headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok())
})
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string);
if let Some(aid) = agent_id.as_deref()
&& let Err(e) = validate::validate_agent_id(aid)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response();
}
let extra_metadata = if body.metadata.is_object() {
body.metadata.clone()
} else {
json!({})
};
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let aid = agent_id
.clone()
.unwrap_or_else(|| "anonymous:entity-register".to_string());
let ctx = crate::store::CallerContext::for_agent(aid.clone());
{
use crate::models::GovernanceDecision;
let payload_for_pending = serde_json::json!({
"title": body.canonical_name,
"namespace": body.namespace,
"tier": "long",
"tags": ["entity"],
"metadata": &extra_metadata,
"aliases": &body.aliases,
});
match app
.store
.enforce_governance_action(
crate::store::GovernedAction::Store,
&body.namespace,
&aid,
None,
None,
&payload_for_pending,
)
.await
{
Ok(GovernanceDecision::Allow) => {}
Ok(GovernanceDecision::Deny(refusal)) => {
return (
StatusCode::FORBIDDEN,
Json(json!({
"error": crate::governance::deny_message(
"entity_register",
crate::governance::DenyGate::Governance,
&refusal.reason,
),
})),
)
.into_response();
}
Ok(GovernanceDecision::Pending(pending_id)) => {
return (
StatusCode::ACCEPTED,
Json(json!({
"status": "pending",
(field_names::PENDING_ID): pending_id,
"reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
"action": "store",
"namespace": body.namespace,
(field_names::STORAGE_BACKEND): "postgres",
})),
)
.into_response();
}
Err(e) => return store_err_to_response(e),
}
}
return match app
.store
.entity_register(
&ctx,
&body.canonical_name,
&body.namespace,
&body.aliases,
&extra_metadata,
Some(&aid),
)
.await
{
Ok(reg) => (
if reg.created {
StatusCode::CREATED
} else {
StatusCode::OK
},
Json(json!({
"entity_id": reg.entity_id,
(field_names::CANONICAL_NAME): reg.canonical_name,
"namespace": reg.namespace,
"aliases": reg.aliases,
"created": reg.created,
})),
)
.into_response(),
Err(e) => store_err_to_response(e),
};
}
let lock = app.db.lock().await;
match db::entity_register(
&lock.0,
&body.canonical_name,
&body.namespace,
&body.aliases,
&extra_metadata,
agent_id.as_deref(),
) {
Ok(reg) => {
let status = if reg.created {
StatusCode::CREATED
} else {
StatusCode::OK
};
(
status,
Json(json!({
"entity_id": reg.entity_id,
(field_names::CANONICAL_NAME): reg.canonical_name,
"namespace": reg.namespace,
"aliases": reg.aliases,
"created": reg.created,
})),
)
.into_response()
}
Err(e) => {
let msg = e.to_string();
if msg.contains("non-entity memory") {
return (StatusCode::CONFLICT, Json(json!({"error": msg}))).into_response();
}
crate::handlers::errors::handler_error_500(&e)
}
}
}
pub async fn entity_get_by_alias(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Query(p): Query<EntityByAliasQuery>,
) -> impl IntoResponse {
#[cfg(not(feature = "sal"))]
let _ = &headers;
let alias = p.alias.trim();
if alias.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "alias is required"})),
)
.into_response();
}
let namespace = p
.namespace
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
if let Some(ns) = namespace
&& let Err(e) = validate::validate_namespace(ns)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("namespace", e)})),
)
.into_response();
}
#[cfg(feature = "sal-postgres")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
match app.store.entity_get_by_alias(alias, namespace).await {
Ok(Some(rec)) => {
let caller = {
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
crate::identity::resolve_http_agent_id(None, header_agent_id)
.unwrap_or_else(|_| crate::identity::anonymous_request_id())
};
let caller_is_admin =
crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
let ctx_admin =
crate::store::CallerContext::for_admin_checked(caller.clone(), caller_is_admin);
let visible = caller_is_admin
|| app
.store
.get(&ctx_admin, &rec.entity_id)
.await
.ok()
.as_ref()
.is_none_or(|m| crate::visibility::is_visible_to_caller(m, &caller));
if visible {
return Json(json!({
"found": true,
"entity_id": rec.entity_id,
(field_names::CANONICAL_NAME): rec.canonical_name,
"namespace": rec.namespace,
"aliases": rec.aliases,
}))
.into_response();
}
}
Ok(None) => { }
Err(e) => return store_err_to_response(e),
}
let ctx = crate::handlers::parity::http_caller_ctx(&headers, None);
let filter = crate::store::Filter {
namespace: namespace.map(str::to_string),
limit: 1000,
..Default::default()
};
return match app.store.list(&ctx, &filter).await {
Ok(memories) => {
for m in &memories {
let Some(meta) = m.metadata.as_object() else {
continue;
};
let Some(kind) = meta.get("kind").and_then(|v| v.as_str()) else {
continue;
};
if kind != "entity" {
continue;
}
let aliases: Vec<String> = meta
.get("aliases")
.and_then(|v| v.as_array())
.map(|a| {
a.iter()
.filter_map(|x| x.as_str().map(str::to_string))
.collect()
})
.unwrap_or_default();
if aliases.iter().any(|a| a.eq_ignore_ascii_case(alias))
|| m.title.eq_ignore_ascii_case(alias)
{
return Json(json!({
"found": true,
"entity_id": m.id,
(field_names::CANONICAL_NAME): m.title,
"namespace": m.namespace,
"aliases": aliases,
}))
.into_response();
}
}
Json(json!({
"found": false,
"entity_id": null,
(field_names::CANONICAL_NAME): null,
"namespace": null,
"aliases": [],
}))
.into_response()
}
Err(e) => store_err_to_response(e),
};
}
let caller = {
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
crate::identity::resolve_http_agent_id(None, header_agent_id)
.unwrap_or_else(|_| crate::identity::anonymous_request_id())
};
let caller_is_admin = crate::handlers::admin_role::is_admin_caller_trusted(&app, &caller);
let lock = app.db.lock().await;
match db::entity_get_by_alias(&lock.0, alias, namespace) {
Ok(Some(rec)) => {
let visible = caller_is_admin
|| db::get(&lock.0, &rec.entity_id)
.ok()
.flatten()
.as_ref()
.is_none_or(|m| crate::visibility::is_visible_to_caller(m, &caller));
if !visible {
return Json(json!({
"found": false,
"entity_id": null,
(field_names::CANONICAL_NAME): null,
"namespace": null,
"aliases": [],
}))
.into_response();
}
Json(json!({
"found": true,
"entity_id": rec.entity_id,
(field_names::CANONICAL_NAME): rec.canonical_name,
"namespace": rec.namespace,
"aliases": rec.aliases,
}))
.into_response()
}
Ok(None) => Json(json!({
"found": false,
"entity_id": null,
(field_names::CANONICAL_NAME): null,
"namespace": null,
"aliases": [],
}))
.into_response(),
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
#[derive(Debug, Deserialize)]
pub struct KgTimelineQuery {
pub source_id: String,
pub since: Option<String>,
pub until: Option<String>,
pub limit: Option<usize>,
}
pub async fn kg_timeline(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Query(p): Query<KgTimelineQuery>,
) -> impl IntoResponse {
if let Err(e) = validate::validate_id(&p.source_id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
)
.into_response();
}
let since = p.since.as_deref().map(str::trim).filter(|s| !s.is_empty());
let until = p.until.as_deref().map(str::trim).filter(|s| !s.is_empty());
if let Some(s) = since
&& let Err(e) = validate::validate_expires_at_format(s)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid since: {e}")})),
)
.into_response();
}
if let Some(u) = until
&& let Err(e) = validate::validate_expires_at_format(u)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid until: {e}")})),
)
.into_response();
}
let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
Ok(c) => c,
Err(err) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
}
};
let extract_owner_target = |mem: &Memory| -> (String, String) {
let owner = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let target = mem
.metadata
.get(field_names::TARGET_AGENT_ID)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
(owner, target)
};
let source_owner: Option<(String, String)> = {
#[cfg(feature = "sal")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let ctx = crate::store::CallerContext::for_agent(caller.clone());
match app.store.get(&ctx, &p.source_id).await {
Ok(mem) => Some(extract_owner_target(&mem)),
Err(e) => {
let msg = format!("{e:?}");
if msg.contains("NotFound") || msg.contains("not found") {
None
} else {
tracing::error!("kg_timeline: source lookup failed (postgres): {e:?}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
}
}
} else {
let lock = app.db.lock().await;
match db::get(&lock.0, &p.source_id) {
Ok(Some(mem)) => Some(extract_owner_target(&mem)),
Ok(None) => None,
Err(e) => {
tracing::error!("kg_timeline: source lookup failed: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
}
}
#[cfg(not(feature = "sal"))]
{
let lock = app.db.lock().await;
match db::get(&lock.0, &p.source_id) {
Ok(Some(mem)) => Some(extract_owner_target(&mem)),
Ok(None) => None,
Err(e) => {
tracing::error!("kg_timeline: source lookup failed: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
}
}
};
let Some((owner, target)) = source_owner else {
return (
StatusCode::NOT_FOUND,
Json(json!({
"found": false,
"source_id": p.source_id,
"error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND,
})),
)
.into_response();
};
let is_unowned_legacy = owner.is_empty();
if !is_unowned_legacy
&& owner != caller
&& target != caller
&& caller != sentinels::DAEMON_PRINCIPAL
{
tracing::warn!(
target: super::AUTHZ_TRACE_TARGET,
"GET /api/v1/kg/timeline 403: caller {caller} != owner {owner} (source_id={})",
p.source_id
);
return (
StatusCode::FORBIDDEN,
Json(json!({
"error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
"owner": owner,
"caller": caller,
"source_id": p.source_id,
})),
)
.into_response();
}
#[cfg(feature = "sal-postgres")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
let limit = p.limit;
return match app
.store
.kg_timeline(&p.source_id, since, until, limit)
.await
{
Ok(events) => {
let events_json: Vec<serde_json::Value> = events
.iter()
.map(|e| {
json!({
"target_id": e.target_id,
"relation": e.relation,
(field_names::VALID_FROM): e.valid_from,
(field_names::VALID_UNTIL): e.valid_until,
(field_names::OBSERVED_BY): e.observed_by,
"title": e.title,
(field_names::TARGET_NAMESPACE): e.target_namespace,
})
})
.collect();
Json(json!({
"source_id": p.source_id,
"events": events_json,
"count": events.len(),
}))
.into_response()
}
Err(e) => store_err_to_response(e),
};
}
let lock = app.db.lock().await;
match db::kg_timeline(&lock.0, &p.source_id, since, until, p.limit) {
Ok(events) => {
let events_json: Vec<serde_json::Value> = events
.iter()
.map(|e| {
json!({
"target_id": e.target_id,
"relation": e.relation,
(field_names::VALID_FROM): e.valid_from,
(field_names::VALID_UNTIL): e.valid_until,
(field_names::OBSERVED_BY): e.observed_by,
"title": e.title,
(field_names::TARGET_NAMESPACE): e.target_namespace,
})
})
.collect();
Json(json!({
"source_id": p.source_id,
"events": events_json,
"count": events.len(),
}))
.into_response()
}
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
#[derive(Debug, Deserialize)]
pub struct KgInvalidateBody {
pub source_id: String,
pub target_id: String,
pub relation: String,
pub valid_until: Option<String>,
}
pub async fn kg_invalidate(
State(app): State<AppState>,
headers: axum::http::HeaderMap,
Json(body): Json<KgInvalidateBody>,
) -> impl IntoResponse {
if let Err(e) = validate::RequestValidator::validate_link_triple(
&body.source_id,
&body.target_id,
&body.relation,
) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": e.to_string()})),
)
.into_response();
}
let valid_until = body
.valid_until
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
if let Some(ts) = valid_until
&& let Err(e) = validate::validate_expires_at_format(ts)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid valid_until: {e}")})),
)
.into_response();
}
let caller = match crate::handlers::parity::resolve_caller_agent_id(None, &headers, None) {
Ok(c) => c,
Err(err) => {
return (StatusCode::BAD_REQUEST, Json(json!({"error": err}))).into_response();
}
};
let source_owner: Option<(String, String)> = {
let lock = app.db.lock().await;
match db::get(&lock.0, &body.source_id) {
Ok(Some(mem)) => {
let owner = mem
.metadata
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let target = mem
.metadata
.get(field_names::TARGET_AGENT_ID)
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Some((owner, target))
}
Ok(None) => None,
Err(e) => {
tracing::error!("kg_invalidate: source lookup failed: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": crate::errors::msg::INTERNAL_SERVER_ERROR})),
)
.into_response();
}
}
};
let Some((owner, target)) = source_owner else {
return (
StatusCode::NOT_FOUND,
Json(json!({
"found": false,
"source_id": body.source_id,
"target_id": body.target_id,
"relation": body.relation,
"error": crate::errors::msg::SOURCE_MEMORY_NOT_FOUND,
})),
)
.into_response();
};
let is_unowned_legacy = owner.is_empty();
if !is_unowned_legacy
&& owner != caller
&& target != caller
&& caller != sentinels::DAEMON_PRINCIPAL
{
tracing::warn!(
target: super::AUTHZ_TRACE_TARGET,
"POST /api/v1/kg/invalidate 403: caller {caller} != owner {owner} (source_id={})",
body.source_id
);
return (
StatusCode::FORBIDDEN,
Json(json!({
"error": crate::errors::msg::CALLER_NOT_SOURCE_MEMORY_OWNER,
"owner": owner,
"caller": caller,
"source_id": body.source_id,
})),
)
.into_response();
}
#[cfg(feature = "sal-postgres")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
return match app
.store
.invalidate_link(
&body.source_id,
&body.target_id,
&body.relation,
valid_until,
)
.await
{
Ok(res) if res.found => (
StatusCode::OK,
Json(json!({
"found": true,
"source_id": body.source_id,
"target_id": body.target_id,
"relation": body.relation,
(field_names::VALID_UNTIL): res.valid_until,
(field_names::PREVIOUS_VALID_UNTIL): res.previous_valid_until,
})),
)
.into_response(),
Ok(_) => (
StatusCode::NOT_FOUND,
Json(json!({
"found": false,
"source_id": body.source_id,
"target_id": body.target_id,
"relation": body.relation,
})),
)
.into_response(),
Err(e) => store_err_to_response(e),
};
}
let lock = app.db.lock().await;
match db::invalidate_link(
&lock.0,
&body.source_id,
&body.target_id,
&body.relation,
valid_until,
) {
Ok(Some(res)) => (
StatusCode::OK,
Json(json!({
"found": true,
"source_id": body.source_id,
"target_id": body.target_id,
"relation": body.relation,
(field_names::VALID_UNTIL): res.valid_until,
(field_names::PREVIOUS_VALID_UNTIL): res.previous_valid_until,
})),
)
.into_response(),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({
"found": false,
"source_id": body.source_id,
"target_id": body.target_id,
"relation": body.relation,
})),
)
.into_response(),
Err(e) => crate::handlers::errors::handler_error_500(&e),
}
}
#[derive(Debug, Deserialize)]
pub struct FindPathsBody {
#[serde(alias = "from_id")]
pub source_id: String,
#[serde(alias = "to_id")]
pub target_id: String,
#[serde(default)]
pub max_depth: Option<usize>,
#[serde(default)]
pub max_results: Option<usize>,
}
pub async fn kg_find_paths(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<FindPathsBody>,
) -> impl IntoResponse {
if let Err(e) = validate::validate_id(&body.source_id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
)
.into_response();
}
if let Err(e) = validate::validate_id(&body.target_id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid target_id: {e}")})),
)
.into_response();
}
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
Ok(id) => id,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response();
}
};
#[cfg(feature = "sal")]
{
let ctx = crate::store::CallerContext::for_agent(&caller);
return match app
.store
.find_paths(
&ctx,
&body.source_id,
&body.target_id,
body.max_depth,
body.max_results,
)
.await
{
Ok(paths) => {
if crate::audit::is_enabled() {
crate::audit::emit(crate::audit::EventBuilder::new(
crate::audit::AuditAction::Recall,
crate::audit::actor(sentinels::AI_HTTP, "http_body", None),
crate::audit::target_memory(
body.source_id.clone(),
String::new(),
Some(format!("find_paths -> {}", body.target_id)),
None,
None,
),
));
}
let count = paths.len();
Json(json!({
"paths": paths,
"count": count,
"source_id": body.source_id,
"target_id": body.target_id,
}))
.into_response()
}
Err(e) => {
let msg = e.to_string();
if msg.contains("max_depth") || msg.contains("depth") {
return (
StatusCode::UNPROCESSABLE_ENTITY,
Json(json!({"error": msg})),
)
.into_response();
}
store_err_to_response(e)
}
};
}
#[cfg(not(feature = "sal"))]
{
let _ = app;
let _ = body;
let _ = caller;
(
StatusCode::NOT_IMPLEMENTED,
Json(json!({"error": "find_paths requires --features sal"})),
)
.into_response()
}
}
#[derive(Debug, Deserialize)]
pub struct KgQueryBody {
#[serde(default)]
pub source_id: Option<String>,
#[serde(default)]
pub from: Option<String>,
#[serde(default)]
pub to: Option<String>,
pub max_depth: Option<usize>,
pub valid_at: Option<String>,
pub allowed_agents: Option<Vec<String>>,
pub limit: Option<usize>,
#[serde(default)]
pub include_invalidated: bool,
#[serde(default)]
pub rel_types: Option<Vec<String>>,
}
#[cfg(feature = "sal-postgres")]
async fn kg_query_filter_visible(
app: &AppState,
caller: &str,
target_ids: Vec<String>,
) -> std::collections::HashSet<String> {
use std::collections::HashSet;
let mut visible: HashSet<String> = HashSet::with_capacity(target_ids.len());
let ctx = crate::store::CallerContext::for_agent(caller);
for id in target_ids {
if let Ok(mem) = app.store.get(&ctx, &id).await {
if crate::visibility::is_visible_to_caller(&mem, caller) {
visible.insert(id);
}
}
}
visible
}
pub async fn kg_query(
State(app): State<AppState>,
headers: HeaderMap,
Json(body): Json<KgQueryBody>,
) -> impl IntoResponse {
let header_agent_id = headers
.get(crate::HEADER_AGENT_ID)
.and_then(|v| v.to_str().ok());
let caller = match crate::identity::resolve_http_agent_id(None, header_agent_id) {
Ok(id) => id,
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("agent_id", e)})),
)
.into_response();
}
};
let source_id = body
.source_id
.clone()
.or_else(|| body.from.clone())
.unwrap_or_default();
if let Err(e) = validate::validate_id(&source_id) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": crate::errors::msg::invalid("source_id", e)})),
)
.into_response();
}
let max_depth = body.max_depth.unwrap_or(1);
let valid_at = body
.valid_at
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty());
if let Some(t) = valid_at
&& let Err(e) = validate::validate_expires_at_format(t)
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid valid_at: {e}")})),
)
.into_response();
}
let allowed_agents: Option<Vec<String>> = body.allowed_agents.as_ref().map(|v| {
v.iter()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
});
if let Some(agents) = allowed_agents.as_ref() {
for a in agents {
if let Err(e) = validate::validate_agent_id(a) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": format!("invalid allowed_agents entry: {e}")})),
)
.into_response();
}
}
}
#[cfg(feature = "sal-postgres")]
if matches!(app.storage_backend, StorageBackend::Postgres) {
return match app
.store
.kg_query(&source_id, max_depth, body.include_invalidated)
.await
{
Ok(nodes) => {
let target_ids: Vec<String> = nodes.iter().map(|n| n.target_id.clone()).collect();
let visible = kg_query_filter_visible(&app, &caller, target_ids).await;
let nodes: Vec<_> = nodes
.into_iter()
.filter(|n| visible.contains(&n.target_id))
.collect();
let memories_json: Vec<serde_json::Value> = nodes
.iter()
.map(|n| {
json!({
"target_id": n.target_id,
"relation": n.relation,
"depth": n.depth,
"path": n.path,
})
})
.collect();
let mut paths_json: Vec<serde_json::Value> = Vec::new();
if let Some(target) = body.to.as_deref() {
for n in &nodes {
if n.target_id == target {
let chain: Vec<String> =
n.path.split("->").map(str::to_string).collect();
paths_json.push(serde_json::Value::Array(
chain.into_iter().map(serde_json::Value::String).collect(),
));
break;
}
}
} else {
for n in &nodes {
paths_json.push(serde_json::Value::String(n.path.clone()));
}
}
Json(json!({
"source_id": source_id,
"max_depth": max_depth,
"memories": memories_json,
"paths": paths_json,
"count": nodes.len(),
}))
.into_response()
}
Err(e) => {
let msg = e.to_string();
if msg.contains("max_depth") || msg.contains("depth") {
(
StatusCode::UNPROCESSABLE_ENTITY,
Json(json!({"error": msg})),
)
.into_response()
} else {
store_err_to_response(e)
}
}
};
}
let lock = app.db.lock().await;
let kg_res = db::kg_query(
&lock.0,
&source_id,
max_depth,
valid_at,
allowed_agents.as_deref(),
body.limit,
body.include_invalidated,
);
let nodes_opt = match &kg_res {
Ok(nodes) => {
let mut visible: std::collections::HashSet<String> =
std::collections::HashSet::with_capacity(nodes.len());
for n in nodes {
if let Ok(Some(mem)) = db::get(&lock.0, &n.target_id) {
if crate::visibility::is_visible_to_caller(&mem, &caller) {
visible.insert(n.target_id.clone());
}
}
}
Some(visible)
}
Err(_) => None,
};
drop(lock);
match kg_res {
Ok(nodes) => {
let visible = nodes_opt.unwrap_or_default();
let nodes: Vec<_> = nodes
.into_iter()
.filter(|n| visible.contains(&n.target_id))
.collect();
let memories_json: Vec<serde_json::Value> = nodes
.iter()
.map(|n| {
json!({
"target_id": n.target_id,
"relation": n.relation,
(field_names::VALID_FROM): n.valid_from,
(field_names::VALID_UNTIL): n.valid_until,
(field_names::OBSERVED_BY): n.observed_by,
"title": n.title,
(field_names::TARGET_NAMESPACE): n.target_namespace,
"depth": n.depth,
"path": n.path,
})
})
.collect();
let paths_json: Vec<&str> = nodes.iter().map(|n| n.path.as_str()).collect();
Json(json!({
"source_id": source_id,
"max_depth": max_depth,
"memories": memories_json,
"paths": paths_json,
"count": nodes.len(),
}))
.into_response()
}
Err(e) => {
let msg = e.to_string();
if msg.contains("max_depth") {
return (
StatusCode::UNPROCESSABLE_ENTITY,
Json(json!({"error": msg})),
)
.into_response();
}
crate::handlers::errors::handler_error_500(&e)
}
}
}