use crate::attribution::{
CreatorInfo, CreatorSource, HTTP_DEFAULT_CLIENT, X_TRUSTY_CLIENT_CWD, X_TRUSTY_CLIENT_NAME,
};
use crate::hook_emit::HookEventPayload;
use crate::{ActivityFilter, ActivitySource, AppState, DaemonEvent};
use axum::{
body::Body,
extract::{Path as AxumPath, Query, State},
http::{header, HeaderMap, HeaderValue, Request, StatusCode},
response::{IntoResponse, Response},
routing::{delete, get, post},
Json, Router,
};
use rust_embed::RustEmbed;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use trusty_common::memory_core::community::KnowledgeGap;
use trusty_common::memory_core::palace::{Palace, PalaceId, RoomType};
use trusty_common::memory_core::retrieval::recall_with_default_embedder;
use trusty_common::memory_core::store::kg::Triple;
use uuid::Uuid;
/// Identifier of the internal palace used by the `/health` round-trip probe.
/// The same string is used as the palace name and as the directory name under
/// the data root.
pub(crate) const HEALTH_PROBE_PALACE: &str = "__health_probe__";
/// Web UI assets embedded from `ui/dist/` under the crate manifest directory.
/// Looked up by path in `serve_embedded`.
#[derive(RustEmbed)]
#[folder = "$CARGO_MANIFEST_DIR/ui/dist/"]
struct WebAssets;
/// Builds the HTTP router for the daemon.
///
/// Registers the `/api/v1/...` REST endpoints, `/health`, the `/rpc` JSON-RPC
/// endpoint, and a fallback that serves the embedded web UI. The finished
/// router is passed through `trusty_common::server::with_standard_middleware`
/// before being returned.
pub fn router() -> Router<AppState> {
let router = Router::new()
// Daemon status and configuration.
.route("/api/v1/status", get(status))
.route("/api/v1/config", get(config))
// Palace CRUD.
.route("/api/v1/palaces", get(list_palaces).post(create_palace))
.route(
"/api/v1/palaces/{id}",
get(get_palace_handler)
.delete(delete_palace_handler)
.patch(update_palace_handler),
)
// Drawers within a palace.
.route(
"/api/v1/palaces/{id}/drawers",
get(list_drawers).post(create_drawer),
)
.route(
"/api/v1/palaces/{id}/drawers/{drawer_id}",
delete(delete_drawer),
)
// `/memories` paths are wired to the same handlers as `/drawers`.
.route(
"/api/v1/palaces/{id}/memories",
get(list_drawers).post(create_drawer),
)
.route(
"/api/v1/palaces/{id}/memories/{drawer_id}",
delete(delete_drawer),
)
// Recall: per palace and across palaces.
.route("/api/v1/palaces/{id}/recall", get(recall_handler))
.route("/api/v1/recall", get(recall_all_handler))
// Per-palace knowledge graph.
.route("/api/v1/palaces/{id}/kg", get(kg_query).post(kg_assert))
.route("/api/v1/palaces/{id}/kg/subjects", get(kg_list_subjects))
.route(
"/api/v1/palaces/{id}/kg/subjects_with_counts",
get(kg_list_subjects_with_counts),
)
.route("/api/v1/palaces/{id}/kg/all", get(kg_list_all))
.route("/api/v1/palaces/{id}/kg/graph", get(kg_graph))
.route("/api/v1/palaces/{id}/kg/count", get(kg_count))
.route(
"/api/v1/palaces/{id}/kg/triples/{triple_id}",
delete(kg_delete_triple),
)
// Dream status and trigger.
.route(
"/api/v1/palaces/{id}/dream/status",
get(palace_dream_status),
)
.route("/api/v1/dream/status", get(dream_status))
.route("/api/v1/dream/run", post(dream_run))
// Knowledge gaps, prompt context, aliases and prompt facts.
.route("/api/v1/kg/gaps", get(kg_gaps_handler))
.route("/api/v1/kg/prompt-context", get(prompt_context_handler))
.route("/api/v1/kg/aliases", post(add_alias_handler))
.route(
"/api/v1/kg/prompt-facts",
get(list_prompt_facts_handler).delete(remove_prompt_fact_handler),
)
// Chat and messaging handlers live in `crate::chat`.
.route("/api/v1/chat", post(crate::chat::chat_handler))
.route("/api/v1/chat/providers", get(crate::chat::list_providers))
.route(
"/api/v1/palaces/{id}/chat/sessions",
get(crate::chat::list_chat_sessions).post(crate::chat::create_chat_session),
)
.route(
"/api/v1/palaces/{id}/chat/sessions/{session_id}",
get(crate::chat::get_chat_session).delete(crate::chat::delete_chat_session),
)
.route(
"/api/v1/messages",
get(crate::chat::list_messages_handler).post(crate::chat::send_message_handler),
)
.route(
"/api/v1/messages/mark_read",
post(crate::chat::mark_message_read_handler),
)
// Operational endpoints.
.route("/health", get(health))
.route("/api/v1/logs/tail", get(logs_tail))
.route("/api/v1/activity", get(activity_handler))
.route("/api/v1/activity/hook", post(hook_activity_handler))
.route("/api/v1/admin/stop", post(admin_stop))
.route("/api/v1/remember", post(remember_async))
.route("/rpc", post(rpc_handler))
// Anything unmatched goes to the embedded-UI handler.
.fallback(static_handler);
trusty_common::server::with_standard_middleware(router)
}
/// JSON body returned by `GET /health`.
#[derive(serde::Serialize)]
struct HealthResponse {
// "ok" when the round-trip probe succeeded, "degraded" otherwise.
status: String,
// Probe error text; omitted from the JSON when the probe succeeded.
#[serde(skip_serializing_if = "Option::is_none")]
detail: Option<String>,
// Crate version baked in at compile time (`CARGO_PKG_VERSION`).
version: &'static str,
// First value of the `sys_metrics.sample()` pair.
rss_mb: u64,
// Value loaded from the `disk_bytes` atomic on the app state.
disk_bytes: u64,
// Second value of the `sys_metrics.sample()` pair.
cpu_pct: f32,
// Whole seconds elapsed since `started_at`.
uptime_secs: u64,
// Bound listen address, when one has been recorded; omitted otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
addr: Option<String>,
}
/// `GET /health` — reports resource metrics plus the outcome of a
/// store/recall/delete round trip against the health-probe palace.
///
/// The handler always returns a JSON body. A failed round trip is reported as
/// `status: "degraded"` with the error text in `detail`, not as an HTTP error.
async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
    // The lock guard is a temporary of this statement, so it is released
    // before the (potentially slow) round-trip probe is awaited.
    let (rss_mb, cpu_pct) = state.sys_metrics.lock().await.sample();
    let disk_bytes = state.disk_bytes.load(std::sync::atomic::Ordering::Relaxed);
    let uptime_secs = state.started_at.elapsed().as_secs();
    let addr = state.bound_addr.get().map(|a| a.to_string());

    let probe_outcome = run_health_round_trip(&state).await;
    let (status, detail) = if let Err(err) = probe_outcome {
        tracing::warn!("/health round-trip degraded: {err}");
        ("degraded".to_string(), Some(err.to_string()))
    } else {
        ("ok".to_string(), None)
    };

    Json(HealthResponse {
        status,
        detail,
        version: env!("CARGO_PKG_VERSION"),
        rss_mb,
        disk_bytes,
        cpu_pct,
        uptime_secs,
        addr,
    })
}
/// Failure modes of the `/health` round-trip probe. Each string payload holds
/// the formatted underlying error.
#[derive(Debug, thiserror::Error)]
enum HealthProbeError {
/// Opening the probe palace through the registry failed.
#[error("open palace failed: {0}")]
OpenPalace(String),
/// Creating the probe palace failed.
#[error("provision health probe palace failed: {0}")]
EnsureProbePalace(String),
/// Storing the probe drawer failed.
#[error("store failed: {0}")]
Store(String),
/// The recall step itself returned an error.
#[error("recall failed: {0}")]
Recall(String),
/// Recall succeeded but none of the hits was the probe drawer.
#[error("recall did not return the probe drawer (id={0})")]
ProbeMissing(Uuid),
/// Removing the probe drawer after the round trip failed.
#[error("delete probe drawer failed: {0}")]
Delete(String),
}
/// Makes sure the dedicated health-probe palace exists.
///
/// Returns immediately when the registry already holds the palace or can open
/// it from the data root; otherwise creates it.
///
/// # Errors
/// `HealthProbeError::EnsureProbePalace` when creation fails.
fn ensure_health_probe_palace(state: &AppState) -> Result<(), HealthProbeError> {
    let probe_id = PalaceId::new(HEALTH_PROBE_PALACE);

    // `||` short-circuits, so the open attempt only happens on a registry miss.
    let already_available = state.registry.get(&probe_id).is_some()
        || state
            .registry
            .open_palace(&state.data_root, &probe_id)
            .is_ok();
    if already_available {
        return Ok(());
    }

    let description = "Internal health-probe palace (issue #185). Hidden from listings; \
                       holds short-lived round-trip drawers cleaned up on every probe."
        .to_string();
    let probe_palace = Palace {
        id: probe_id,
        name: HEALTH_PROBE_PALACE.to_string(),
        description: Some(description),
        created_at: chrono::Utc::now(),
        data_dir: state.data_root.join(HEALTH_PROBE_PALACE),
    };

    state
        .registry
        .create_palace(&state.data_root, probe_palace)
        .map_err(|e| HealthProbeError::EnsureProbePalace(format!("{e:#}")))?;
    Ok(())
}
/// Runs the health round trip against the probe palace, using the default
/// embedder for the recall step (top 5 hits).
///
/// # Errors
/// Propagates provisioning failures, `HealthProbeError::OpenPalace` when the
/// probe palace cannot be opened, and any error from the inner round trip.
async fn run_health_round_trip(state: &AppState) -> Result<(), HealthProbeError> {
    ensure_health_probe_palace(state)?;

    let palace_id = PalaceId::new(HEALTH_PROBE_PALACE);
    let probe_handle = match state.registry.open_palace(&state.data_root, &palace_id) {
        Ok(opened) => opened,
        Err(e) => return Err(HealthProbeError::OpenPalace(format!("{e:#}"))),
    };

    run_health_round_trip_inner(probe_handle, |palace, needle| async move {
        let hits = recall_with_default_embedder(&palace, &needle, 5).await;
        hits.map_err(|e| HealthProbeError::Recall(format!("{e:#}")))
    })
    .await
}
/// Core of the health probe: store a uniquely tagged drawer, recall it, then
/// delete it.
///
/// `recall` is injected so callers can choose the recall implementation. It
/// receives a clone of `handle` and the probe text.
///
/// The delete is attempted whether or not recall succeeded. Recall problems
/// take precedence in the returned error; a delete failure is only reported
/// when recall found the probe drawer.
async fn run_health_round_trip_inner<F, Fut>(
    handle: std::sync::Arc<trusty_common::memory_core::PalaceHandle>,
    recall: F,
) -> Result<(), HealthProbeError>
where
    F: FnOnce(std::sync::Arc<trusty_common::memory_core::PalaceHandle>, String) -> Fut,
    Fut: std::future::Future<
        Output = Result<Vec<trusty_common::memory_core::retrieval::RecallResult>, HealthProbeError>,
    >,
{
    // A fresh UUID makes each probe's content unique.
    let probe_content = format!(
        "__trusty_memory_healthcheck__ probe {}",
        Uuid::new_v4()
    );

    let stored = handle
        .remember(
            probe_content.clone(),
            RoomType::General,
            vec!["healthcheck".to_string()],
            0.0,
        )
        .await;
    let drawer_id = stored.map_err(|e| HealthProbeError::Store(format!("{e:#}")))?;

    // Run both steps before inspecting either result so cleanup always happens.
    let recall_outcome = recall(handle.clone(), probe_content).await;
    let cleanup_outcome = handle.forget(drawer_id).await;

    let hits = recall_outcome?;
    if hits.iter().all(|hit| hit.drawer.id != drawer_id) {
        return Err(HealthProbeError::ProbeMissing(drawer_id));
    }

    cleanup_outcome.map_err(|e| HealthProbeError::Delete(format!("{e:#}")))?;
    Ok(())
}
/// Number of log lines returned by `/api/v1/logs/tail` when `n` is omitted.
const DEFAULT_LOGS_TAIL_N: usize = 100;
/// Upper bound applied to the requested `n`; set to the log buffer's default capacity.
const MAX_LOGS_TAIL_N: usize = trusty_common::log_buffer::DEFAULT_LOG_CAPACITY;
/// Serde default provider for `LogsTailParams::n`.
fn default_logs_tail_n() -> usize {
DEFAULT_LOGS_TAIL_N
}
/// Query parameters for `GET /api/v1/logs/tail`.
#[derive(serde::Deserialize)]
struct LogsTailParams {
// Requested line count; falls back to `default_logs_tail_n()` when absent.
#[serde(default = "default_logs_tail_n")]
n: usize,
}
async fn logs_tail(
State(state): State<AppState>,
Query(params): Query<LogsTailParams>,
) -> Json<Value> {
let n = params.n.clamp(1, MAX_LOGS_TAIL_N);
let lines = state.log_buffer.tail(n);
Json(serde_json::json!({
"lines": lines,
"total": state.log_buffer.len(),
}))
}
async fn admin_stop(State(_state): State<AppState>) -> Json<Value> {
tracing::warn!("admin_stop: shutdown requested via POST /api/v1/admin/stop");
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
std::process::exit(0);
});
Json(serde_json::json!({ "ok": true, "message": "shutting down" }))
}
/// JSON body accepted by `POST /api/v1/remember`.
#[derive(Debug, Deserialize)]
struct RememberAsyncBody {
// Text to remember; the handler rejects it when empty after trimming.
content: String,
// Optional palace, forwarded to the tool as the "palace" argument.
#[serde(default)]
palace: Option<String>,
// Optional tags, forwarded to the tool as the "tags" argument.
#[serde(default)]
tags: Option<Vec<String>>,
}
/// `POST /api/v1/remember` — fire-and-forget remember.
///
/// Validates the body, then dispatches the `memory_remember` tool in a spawned
/// task and answers `202 Accepted` with `{"status": "queued"}` without waiting
/// for it. Tool failures are only logged.
///
/// # Errors
/// `400 Bad Request` when `content` is empty after trimming.
async fn remember_async(
    State(state): State<AppState>,
    Json(body): Json<RememberAsyncBody>,
) -> Result<(StatusCode, Json<Value>), ApiError> {
    let text = body.content.trim();
    if text.is_empty() {
        return Err(ApiError::bad_request(
            "remember: 'content' must be a non-empty string",
        ));
    }

    // Assemble the tool arguments; optional fields are only set when supplied.
    let mut tool_args = serde_json::Map::new();
    tool_args.insert("text".to_string(), Value::String(text.to_string()));
    if let Some(palace) = body.palace {
        tool_args.insert("palace".to_string(), Value::String(palace));
    }
    if let Some(tags) = body.tags {
        let tag_values: Vec<Value> = tags.into_iter().map(Value::String).collect();
        tool_args.insert("tags".to_string(), Value::Array(tag_values));
    }
    let tool_args = Value::Object(tool_args);

    let task_state = state.clone();
    tokio::spawn(async move {
        let outcome =
            crate::tools::dispatch_tool(&task_state, "memory_remember", tool_args).await;
        match outcome {
            Err(e) => {
                tracing::warn!(
                    target: "trusty_memory::remember_async",
                    error = %format!("{e:#}"),
                    "queued remember failed (caller already returned 202)",
                );
            }
            Ok(v) => {
                tracing::debug!(target: "trusty_memory::remember_async", result = %v, "queued remember succeeded");
            }
        }
    });

    let queued = json!({ "status": "queued" });
    Ok((StatusCode::ACCEPTED, Json(queued)))
}
/// Page size used by `/api/v1/activity` when `limit` is omitted.
const ACTIVITY_DEFAULT_LIMIT: usize = 50;
/// Largest page size `/api/v1/activity` will honour.
const ACTIVITY_MAX_LIMIT: usize = 500;
/// Query parameters for `GET /api/v1/activity`. Every field is optional.
#[derive(Deserialize, Debug, Default)]
struct ActivityQuery {
// Page size; the handler defaults and clamps it.
#[serde(default)]
limit: Option<usize>,
// Number of entries to skip; the handler defaults it to 0.
#[serde(default)]
offset: Option<usize>,
// Palace id filter; an empty string is treated as "no filter".
#[serde(default)]
palace: Option<String>,
// Source filter, parsed with `ActivitySource::parse`.
#[serde(default)]
source: Option<String>,
// Lower time bound, RFC 3339.
#[serde(default)]
since: Option<String>,
// Upper time bound, RFC 3339.
#[serde(default)]
until: Option<String>,
}
/// One entry in the `/api/v1/activity` response.
#[derive(Serialize, Debug)]
struct ActivityRow {
id: u64,
timestamp: chrono::DateTime<chrono::Utc>,
// String form of the entry's `ActivitySource`.
source: &'static str,
// Omitted from the JSON when the entry has no palace.
#[serde(skip_serializing_if = "Option::is_none")]
palace_id: Option<String>,
event_type: String,
// Stored payload parsed as JSON, or the raw text as a JSON string if parsing fails.
payload: Value,
}
/// `GET /api/v1/activity` — pages through the activity log.
///
/// `limit` defaults to `ACTIVITY_DEFAULT_LIMIT` and is clamped to
/// `1..=ACTIVITY_MAX_LIMIT`; `offset` defaults to 0. Optional filters: palace
/// id, source, and an RFC 3339 `since`/`until` window.
///
/// # Errors
/// `400` for an unknown `source` or an unparsable timestamp; `500` when the
/// activity log cannot be listed or counted.
async fn activity_handler(
    State(state): State<AppState>,
    Query(q): Query<ActivityQuery>,
) -> Result<Json<Value>, ApiError> {
    let limit = q
        .limit
        .unwrap_or(ACTIVITY_DEFAULT_LIMIT)
        .clamp(1, ACTIVITY_MAX_LIMIT);
    let offset = q.offset.unwrap_or(0);

    // An absent `source` means "no filter"; a present but unknown one is rejected.
    let source = q
        .source
        .as_deref()
        .map(|raw| {
            ActivitySource::parse(raw).ok_or_else(|| {
                ApiError::bad_request(format!(
                    "unknown source '{raw}'; expected one of http, mcp, hook",
                ))
            })
        })
        .transpose()?;
    let since = parse_iso_or_bad_request(q.since.as_deref(), "since")?;
    let until = parse_iso_or_bad_request(q.until.as_deref(), "until")?;

    let filter = ActivityFilter {
        palace_id: q.palace.filter(|p| !p.is_empty()),
        source,
        since,
        until,
    };

    let entries = match state.activity_log.list(&filter, limit, offset) {
        Ok(found) => found,
        Err(e) => return Err(ApiError::internal(format!("activity list: {e:#}"))),
    };
    // NOTE(review): `count()` takes no filter, so `total` presumably covers the
    // whole log rather than the filtered set — confirm this is intended.
    let total = match state.activity_log.count() {
        Ok(n) => n,
        Err(e) => return Err(ApiError::internal(format!("activity count: {e:#}"))),
    };

    let mut rows: Vec<ActivityRow> = Vec::new();
    for entry in entries {
        // Payloads are stored as text; fall back to a JSON string when the
        // text is not valid JSON.
        let payload = match serde_json::from_str::<Value>(&entry.payload) {
            Ok(parsed) => parsed,
            Err(_) => Value::String(entry.payload.clone()),
        };
        rows.push(ActivityRow {
            id: entry.id,
            timestamp: entry.timestamp,
            source: entry.source.as_str(),
            palace_id: entry.palace_id,
            event_type: entry.event_type,
            payload,
        });
    }

    Ok(Json(json!({
        "entries": rows,
        "total": total,
        "limit": limit,
        "offset": offset,
    })))
}
/// `POST /api/v1/activity/hook` — records a hook invocation reported by a client.
///
/// Emits a `DaemonEvent::HookFired` built from the payload and answers
/// `204 No Content`. The timestamp is taken on the server and the source is
/// always `ActivitySource::Hook`.
async fn hook_activity_handler(
    State(state): State<AppState>,
    Json(payload): Json<HookEventPayload>,
) -> Result<StatusCode, ApiError> {
    let event = DaemonEvent::HookFired {
        palace_id: payload.palace_id,
        palace_name: payload.palace_name,
        hook_type: payload.hook_type,
        injection_kind: payload.injection_kind,
        injection_length: payload.injection_length,
        trigger_prompt_excerpt: payload.trigger_prompt_excerpt,
        timestamp: chrono::Utc::now(),
        duration_ms: payload.duration_ms,
        source: ActivitySource::Hook,
    };
    state.emit(event);
    Ok(StatusCode::NO_CONTENT)
}
/// `POST /rpc` — forwards a JSON-RPC request to `crate::transport::rpc::dispatch`
/// and returns its response as JSON.
async fn rpc_handler(
    State(state): State<AppState>,
    Json(req): Json<crate::transport::rpc::JsonRpcRequest>,
) -> Json<crate::transport::rpc::JsonRpcResponse> {
    Json(crate::transport::rpc::dispatch(&state, req).await)
}
/// Derives drawer attribution from the request headers.
///
/// * `X_TRUSTY_CLIENT_NAME` gives the client name; it falls back to
///   `HTTP_DEFAULT_CLIENT` when missing, not valid text, or blank.
/// * `X_TRUSTY_CLIENT_CWD` gives the optional working directory; it is `None`
///   when missing, not valid text, or blank.
///
/// Both values are trimmed before the blank check, so a whitespace-only
/// header is treated the same as an absent one. The version is this crate's
/// version and the source is always `CreatorSource::Http`.
pub(crate) fn creator_info_from_http(headers: &HeaderMap) -> CreatorInfo {
    let client = headers
        .get(X_TRUSTY_CLIENT_NAME)
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .unwrap_or(HTTP_DEFAULT_CLIENT)
        .to_string();

    // Trim and filter on the borrowed text first, so a whitespace-only header
    // is dropped and a string is only allocated for a value that is kept.
    let cwd = headers
        .get(X_TRUSTY_CLIENT_CWD)
        .and_then(|v| v.to_str().ok())
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_owned);

    CreatorInfo {
        client,
        version: env!("CARGO_PKG_VERSION").to_string(),
        source: CreatorSource::Http,
        cwd,
    }
}
/// Parses an optional RFC 3339 timestamp query parameter into UTC.
///
/// A missing or empty value yields `Ok(None)`. `field` is only used to name
/// the parameter in the error message.
///
/// # Errors
/// `400 Bad Request` when the value is present but not valid RFC 3339.
fn parse_iso_or_bad_request(
    s: Option<&str>,
    field: &str,
) -> Result<Option<chrono::DateTime<chrono::Utc>>, ApiError> {
    let Some(raw) = s.filter(|text| !text.is_empty()) else {
        return Ok(None);
    };
    match chrono::DateTime::parse_from_rfc3339(raw) {
        Ok(parsed) => Ok(Some(parsed.with_timezone(&chrono::Utc))),
        Err(e) => Err(ApiError::bad_request(format!(
            "invalid {field} (RFC 3339): {e}"
        ))),
    }
}
/// Router fallback: serves the embedded web UI.
///
/// Unmatched paths under `api/` get a plain `404`. Any other path is looked up
/// in the embedded assets, falling back to `index.html` when there is no such
/// asset. If `index.html` is missing as well, the response is a `404` saying
/// the UI assets are missing.
async fn static_handler(req: Request<Body>) -> Response {
    let requested = req.uri().path().trim_start_matches('/');
    if requested.starts_with("api/") {
        return (StatusCode::NOT_FOUND, "not found").into_response();
    }

    if let Some(asset_response) = serve_embedded(requested) {
        return asset_response;
    }
    match serve_embedded("index.html") {
        Some(index_response) => index_response,
        None => (StatusCode::NOT_FOUND, "ui assets missing").into_response(),
    }
}
/// Builds a response for one embedded asset, or `None` if it is not embedded.
///
/// An empty `path` means `index.html`. The `Content-Type` is guessed from the
/// path, with `application/octet-stream` as the fallback.
fn serve_embedded(path: &str) -> Option<Response> {
    let asset_path = match path {
        "" => "index.html",
        other => other,
    };
    let asset = WebAssets::get(asset_path)?;

    let mime = mime_guess::from_path(asset_path).first_or_octet_stream();
    let content_type = HeaderValue::from_str(mime.as_ref())
        .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream"));

    let mut response = Response::new(Body::from(asset.data.into_owned()));
    response
        .headers_mut()
        .insert(header::CONTENT_TYPE, content_type);
    Some(response)
}
pub(crate) use crate::service::StatusPayload;
/// `GET /api/v1/status` — returns the status payload produced by `MemoryService`.
async fn status(State(state): State<AppState>) -> Json<StatusPayload> {
    let service = crate::service::MemoryService::new(state);
    Json(service.status().await)
}
/// JSON body returned by `GET /api/v1/config`.
#[derive(Serialize)]
struct ConfigPayload {
// True when the loaded user config has a non-empty OpenRouter API key.
openrouter_configured: bool,
// OpenRouter model name from the user config.
model: String,
// Display form of the daemon's data root path.
data_root: String,
}
/// `GET /api/v1/config` — reports whether an OpenRouter key is configured,
/// which model is selected, and where the data root lives.
///
/// If the user config cannot be loaded, the default config is used instead.
async fn config(State(state): State<AppState>) -> Json<ConfigPayload> {
    let user_config = load_user_config().unwrap_or_default();
    let openrouter_configured = !user_config.openrouter_api_key.is_empty();
    let data_root = state.data_root.display().to_string();
    Json(ConfigPayload {
        openrouter_configured,
        model: user_config.openrouter_model,
        data_root,
    })
}
pub(crate) use crate::service::load_user_config;
#[allow(unused_imports)]
pub(crate) use crate::service::LoadedUserConfig;
pub(crate) use crate::service::{palace_info_from, CreatePalaceBody, PalaceInfo};
/// `GET /api/v1/palaces` — lists palaces via `MemoryService::list_palaces`.
async fn list_palaces(State(state): State<AppState>) -> Result<Json<Vec<PalaceInfo>>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let palaces = service.list_palaces().await?;
    Ok(Json(palaces))
}
/// `POST /api/v1/palaces` — creates a palace and returns `{"id": ...}`.
/// The creation is attributed to `ActivitySource::Http`.
async fn create_palace(
    State(state): State<AppState>,
    Json(body): Json<CreatePalaceBody>,
) -> Result<Json<Value>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let new_id = service.create_palace(body, ActivitySource::Http).await?;
    let response = json!({ "id": new_id });
    Ok(Json(response))
}
/// `GET /api/v1/palaces/{id}` — returns the palace's info.
async fn get_palace_handler(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<PalaceInfo>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let info = service.get_palace(&id).await?;
    Ok(Json(info))
}
/// Query parameters for `DELETE /api/v1/palaces/{id}`.
#[derive(Deserialize, Default)]
struct DeletePalaceQuery {
// Passed to `MemoryService::delete_palace`; treated as `false` when absent.
#[serde(default)]
force: Option<bool>,
}
/// `DELETE /api/v1/palaces/{id}` — deletes a palace and answers `204 No Content`.
/// `force` defaults to `false`.
async fn delete_palace_handler(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<DeletePalaceQuery>,
) -> Result<StatusCode, ApiError> {
    let force = q.force.unwrap_or(false);
    let service = crate::service::MemoryService::new(state);
    service.delete_palace(&id, force).await?;
    Ok(StatusCode::NO_CONTENT)
}
/// JSON body for `PATCH /api/v1/palaces/{id}`.
#[derive(Deserialize)]
struct UpdatePalaceBody {
// New palace name.
name: String,
}
/// `PATCH /api/v1/palaces/{id}` — renames a palace and returns the JSON value
/// produced by the service.
async fn update_palace_handler(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Json(body): Json<UpdatePalaceBody>,
) -> Result<Json<Value>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let updated = service.update_palace_name_typed(&id, &body.name).await?;
    Ok(Json(updated))
}
pub(crate) use crate::service::{CreateDrawerBody, ListDrawersQuery};
/// `GET /api/v1/palaces/{id}/drawers` (also routed as `/memories`) — lists a
/// palace's drawers using the supplied query options.
async fn list_drawers(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<ListDrawersQuery>,
) -> Result<Json<Value>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let listing = service.list_drawers(&id, q).await?;
    Ok(Json(listing))
}
/// `POST /api/v1/palaces/{id}/drawers` (also routed as `/memories`) — stores a
/// drawer and returns `{"id": ...}`.
///
/// Attribution is derived from the request headers via `creator_info_from_http`.
async fn create_drawer(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    headers: HeaderMap,
    Json(body): Json<CreateDrawerBody>,
) -> Result<Json<Value>, ApiError> {
    let creator = creator_info_from_http(&headers);
    let service = crate::service::MemoryService::new(state);
    let new_drawer_id = service
        .create_drawer(&id, body, creator, ActivitySource::Http)
        .await?;
    let response = json!({ "id": new_drawer_id });
    Ok(Json(response))
}
/// `DELETE /api/v1/palaces/{id}/drawers/{drawer_id}` (also routed as
/// `/memories`) — deletes one drawer and answers `204 No Content`.
async fn delete_drawer(
    State(state): State<AppState>,
    AxumPath((id, drawer_id)): AxumPath<(String, String)>,
) -> Result<StatusCode, ApiError> {
    let service = crate::service::MemoryService::new(state);
    service
        .delete_drawer(&id, &drawer_id, ActivitySource::Http)
        .await?;
    Ok(StatusCode::NO_CONTENT)
}
/// Query parameters shared by the per-palace and cross-palace recall endpoints.
#[derive(Deserialize)]
struct RecallQuery {
// Query text (required).
q: String,
// Number of results requested; the handlers use 10 when absent.
#[serde(default)]
top_k: Option<usize>,
// Forwarded as the service's `deep` flag; the handlers use `false` when absent.
#[serde(default)]
deep: Option<bool>,
}
/// `GET /api/v1/palaces/{id}/recall` — recalls from a single palace.
/// `top_k` defaults to 10 and `deep` to `false`.
async fn recall_handler(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<RecallQuery>,
) -> Result<Json<Value>, ApiError> {
    let top_k = q.top_k.unwrap_or(10);
    let deep = q.deep.unwrap_or(false);
    let service = crate::service::MemoryService::new(state);
    let results = service.recall(&id, &q.q, top_k, deep).await?;
    Ok(Json(results))
}
#[allow(unused_imports)]
pub(crate) use crate::service::recall_entry_json;
/// `GET /api/v1/recall` — recalls across palaces.
/// `top_k` defaults to 10 and `deep` to `false`.
///
/// The service reports failure in-band: when the returned JSON has a string
/// `"error"` member, it is turned into a `500` carrying that message.
async fn recall_all_handler(
    State(state): State<AppState>,
    Query(q): Query<RecallQuery>,
) -> Result<Json<Value>, ApiError> {
    let top_k = q.top_k.unwrap_or(10);
    let deep = q.deep.unwrap_or(false);
    let service = crate::service::MemoryService::new(state);
    let value = service.recall_all(&q.q, top_k, deep).await;

    let failure = value
        .get("error")
        .and_then(|v| v.as_str())
        .map(|message| message.to_string());
    match failure {
        Some(message) => Err(ApiError::internal(message)),
        None => Ok(Json(value)),
    }
}
/// Query parameters for `GET /api/v1/palaces/{id}/kg`.
#[derive(Deserialize)]
struct KgQueryParams {
// Subject whose triples are requested (required).
subject: String,
}
/// `GET /api/v1/palaces/{id}/kg?subject=...` — returns the triples the service
/// finds for the given subject.
async fn kg_query(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<KgQueryParams>,
) -> Result<Json<Vec<Triple>>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let triples = service.kg_query(&id, &q.subject).await?;
    Ok(Json(triples))
}
pub(crate) use crate::service::KgAssertBody;
/// `POST /api/v1/palaces/{id}/kg` — asserts a triple described by the body and
/// answers `204 No Content`.
async fn kg_assert(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Json(body): Json<KgAssertBody>,
) -> Result<StatusCode, ApiError> {
    let service = crate::service::MemoryService::new(state);
    service.kg_assert(&id, body).await?;
    Ok(StatusCode::NO_CONTENT)
}
/// Page size used by the knowledge-graph list endpoints when `limit` is omitted.
const DEFAULT_KG_LIST_LIMIT: usize = 50;
/// Largest page size the knowledge-graph list endpoints will honour.
const MAX_KG_LIST_LIMIT: usize = 200;
/// Serde default provider for the `limit` fields of the KG list parameter structs.
fn default_kg_list_limit() -> usize {
DEFAULT_KG_LIST_LIMIT
}
/// Query parameters for the KG subject-listing endpoints.
#[derive(Deserialize)]
struct KgListSubjectsParams {
// Maximum rows requested; defaults via `default_kg_list_limit()`.
#[serde(default = "default_kg_list_limit")]
limit: usize,
}
/// `GET /api/v1/palaces/{id}/kg/subjects` — lists subjects, with `limit`
/// clamped to `1..=MAX_KG_LIST_LIMIT`.
async fn kg_list_subjects(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<KgListSubjectsParams>,
) -> Result<Json<Vec<String>>, ApiError> {
    let capped_limit = q.limit.clamp(1, MAX_KG_LIST_LIMIT);
    let service = crate::service::MemoryService::new(state);
    let subjects = service.kg_list_subjects(&id, capped_limit).await?;
    Ok(Json(subjects))
}
/// `GET /api/v1/palaces/{id}/kg/subjects_with_counts` — lists subjects with
/// their counts as `{"subject": ..., "count": ...}` objects.
/// `limit` is clamped to `1..=MAX_KG_LIST_LIMIT`.
async fn kg_list_subjects_with_counts(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<KgListSubjectsParams>,
) -> Result<Json<Vec<Value>>, ApiError> {
    let capped_limit = q.limit.clamp(1, MAX_KG_LIST_LIMIT);
    let service = crate::service::MemoryService::new(state);
    let pairs = service
        .kg_list_subjects_with_counts(&id, capped_limit)
        .await?;

    let mut objects: Vec<Value> = Vec::new();
    for (subject, count) in pairs {
        objects.push(json!({ "subject": subject, "count": count }));
    }
    Ok(Json(objects))
}
/// Query parameters for `GET /api/v1/palaces/{id}/kg/all`.
#[derive(Deserialize)]
struct KgListAllParams {
// Maximum rows requested; defaults via `default_kg_list_limit()`.
#[serde(default = "default_kg_list_limit")]
limit: usize,
// Rows to skip; 0 when absent.
#[serde(default)]
offset: usize,
}
/// `GET /api/v1/palaces/{id}/kg/all` — pages through a palace's triples.
/// `limit` is clamped to `1..=MAX_KG_LIST_LIMIT`; `offset` is passed through.
async fn kg_list_all(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<KgListAllParams>,
) -> Result<Json<Vec<Triple>>, ApiError> {
    let capped_limit = q.limit.clamp(1, MAX_KG_LIST_LIMIT);
    let service = crate::service::MemoryService::new(state);
    let triples = service.kg_list_all(&id, capped_limit, q.offset).await?;
    Ok(Json(triples))
}
/// `GET /api/v1/palaces/{id}/kg/count` — returns `{"active": <count>}` with the
/// value reported by `MemoryService::kg_count`.
async fn kg_count(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<Value>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let active_count = service.kg_count(&id).await?;
    Ok(Json(json!({ "active": active_count })))
}
/// Byte placed between subject and predicate inside an encoded triple id.
const TRIPLE_ID_SEPARATOR: u8 = 0x00;

/// Encodes a `(subject, predicate)` pair as a URL-safe triple id:
/// unpadded base64url of `subject ++ 0x00 ++ predicate`.
///
/// NOTE(review): decoding splits on the first separator byte, so a subject
/// that itself contains a NUL byte would not round-trip — confirm subjects
/// can never contain one.
#[allow(dead_code)]
pub(crate) fn encode_triple_id(subject: &str, predicate: &str) -> String {
    use base64::Engine as _;
    let segments: [&[u8]; 3] = [
        subject.as_bytes(),
        &[TRIPLE_ID_SEPARATOR],
        predicate.as_bytes(),
    ];
    let raw: Vec<u8> = segments.concat();
    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&raw)
}
/// Decodes a triple id produced by `encode_triple_id` back into
/// `(subject, predicate)`.
///
/// Returns `None` when the id is not valid unpadded base64url, contains no
/// separator byte, or either half is not valid UTF-8. The split happens at
/// the first separator byte.
pub(crate) fn decode_triple_id(id: &str) -> Option<(String, String)> {
    use base64::Engine as _;
    let mut subject_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
        .decode(id)
        .ok()?;
    let separator_at = subject_bytes
        .iter()
        .position(|&byte| byte == TRIPLE_ID_SEPARATOR)?;

    // Peel the predicate off the tail, then drop the separator from the head.
    let predicate_bytes = subject_bytes.split_off(separator_at + 1);
    subject_bytes.truncate(separator_at);

    let subject = String::from_utf8(subject_bytes).ok()?;
    let predicate = String::from_utf8(predicate_bytes).ok()?;
    Some((subject, predicate))
}
/// `DELETE /api/v1/palaces/{id}/kg/triples/{triple_id}` — retracts the active
/// triple identified by an encoded `(subject, predicate)` id.
///
/// Answers `204 No Content` when a triple was retracted.
///
/// # Errors
/// `404 Not Found` when the id cannot be decoded or no matching active triple
/// exists; service errors are converted via `ApiError::from`.
async fn kg_delete_triple(
    State(state): State<AppState>,
    AxumPath((id, triple_id)): AxumPath<(String, String)>,
) -> Result<StatusCode, ApiError> {
    let Some((subject, predicate)) = decode_triple_id(&triple_id) else {
        return Err(ApiError::not_found(
            "invalid triple id — expected base64url(subject\\0predicate)",
        ));
    };

    let service = crate::service::MemoryService::new(state);
    let retracted = service.kg_retract_triple(&id, &subject, &predicate).await?;
    if !retracted {
        return Err(ApiError::not_found(format!(
            "no active triple with subject={subject:?} predicate={predicate:?} in palace {id:?}"
        )));
    }
    Ok(StatusCode::NO_CONTENT)
}
pub(crate) use crate::service::KgGraphPayload;
/// `GET /api/v1/palaces/{id}/kg/graph` — returns the graph payload built by
/// `MemoryService::kg_graph`.
async fn kg_graph(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<KgGraphPayload>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let graph = service.kg_graph(&id).await?;
    Ok(Json(graph))
}
pub(crate) use crate::service::DreamStatusPayload;
/// `GET /api/v1/dream/status` — returns the aggregate dream status from
/// `MemoryService::dream_status_aggregate`.
async fn dream_status(State(state): State<AppState>) -> Json<DreamStatusPayload> {
    let service = crate::service::MemoryService::new(state);
    let aggregate = service.dream_status_aggregate().await;
    Json(aggregate)
}
/// `GET /api/v1/palaces/{id}/dream/status` — returns the dream status for one palace.
async fn palace_dream_status(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<DreamStatusPayload>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let palace_status = service.dream_status_for_palace(&id).await?;
    Ok(Json(palace_status))
}
/// `POST /api/v1/dream/run` — calls `MemoryService::dream_run` and returns the
/// status payload it produces.
async fn dream_run(State(state): State<AppState>) -> Result<Json<DreamStatusPayload>, ApiError> {
    let service = crate::service::MemoryService::new(state);
    let run_status = service.dream_run().await?;
    Ok(Json(run_status))
}
/// Serializable form of a `KnowledgeGap`, returned by `GET /api/v1/kg/gaps`.
/// Each field is copied from the same-named `KnowledgeGap` field.
#[derive(Serialize, Debug, Clone)]
pub struct KnowledgeGapResponse {
pub entities: Vec<String>,
pub internal_density: f32,
pub external_bridges: usize,
pub suggested_exploration: String,
}
impl From<KnowledgeGap> for KnowledgeGapResponse {
fn from(g: KnowledgeGap) -> Self {
Self {
entities: g.entities,
internal_density: g.internal_density,
external_bridges: g.external_bridges,
suggested_exploration: g.suggested_exploration,
}
}
}
/// Query parameters for `GET /api/v1/kg/gaps`.
#[derive(Deserialize)]
struct KgGapsQuery {
// Palace to inspect; the handler falls back to the configured default palace.
#[serde(default)]
palace: Option<String>,
}
/// `GET /api/v1/kg/gaps` — returns the knowledge gaps the registry holds for a palace.
///
/// The palace comes from the `palace` query parameter, or from the configured
/// default palace. When the registry has no gaps for it, the result is an
/// empty list.
///
/// # Errors
/// `400` when no palace can be determined; `404` when it cannot be opened.
async fn kg_gaps_handler(
    State(state): State<AppState>,
    Query(q): Query<KgGapsQuery>,
) -> Result<Json<Vec<KnowledgeGapResponse>>, ApiError> {
    let requested = q.palace.or_else(|| state.default_palace.clone());
    let Some(palace_name) = requested else {
        return Err(ApiError::bad_request(
            "missing 'palace' query parameter (no default palace configured)",
        ));
    };

    // Opened only to validate that the palace exists; the handle is unused.
    let _handle = open_handle(&state, &palace_name)?;

    let palace_id = PalaceId::new(&palace_name);
    let gaps = state.registry.get_gaps(&palace_id).unwrap_or_default();
    let mut body: Vec<KnowledgeGapResponse> = Vec::new();
    for gap in gaps {
        body.push(KnowledgeGapResponse::from(gap));
    }
    Ok(Json(body))
}
/// Query parameters accepted by the prompt-context and prompt-facts GET endpoints.
#[derive(Deserialize)]
struct PromptFactsQuery {
// Accepted but never read by the handlers in this file (hence `dead_code`).
#[serde(default)]
#[allow(dead_code)]
palace: Option<String>,
}
/// JSON body for `POST /api/v1/kg/aliases`.
#[derive(Deserialize)]
struct AddAliasRequest {
// Alias text; becomes the triple's subject. Must be non-empty.
short: String,
// Full form the alias stands for; becomes the triple's object. Must be non-empty.
full: String,
// Target palace; the handler falls back to the configured default palace.
#[serde(default)]
palace: Option<String>,
}
/// One `(subject, predicate, object)` row returned by `GET /api/v1/kg/prompt-facts`.
#[derive(Serialize)]
struct PromptFactRow {
subject: String,
predicate: String,
object: String,
}
/// Query parameters for `DELETE /api/v1/kg/prompt-facts`.
#[derive(Deserialize)]
struct RemovePromptFactQuery {
// Subject of the fact to retract (required, must be non-empty).
subject: String,
// Predicate of the fact to retract (required, must be non-empty).
predicate: String,
// Accepted but not read by the handler.
#[serde(default)]
#[allow(dead_code)]
object: Option<String>,
// Accepted but not read by the handler, which iterates over `state.registry.list()`.
#[serde(default)]
#[allow(dead_code)]
palace: Option<String>,
}
/// `GET /api/v1/kg/prompt-context` — returns the cached, pre-formatted prompt
/// context as plain text.
///
/// When the cached text is empty, a fixed placeholder sentence is returned
/// instead. The query parameters are accepted but not used.
async fn prompt_context_handler(
    State(state): State<AppState>,
    Query(_q): Query<PromptFactsQuery>,
) -> Result<Response, ApiError> {
    // Clone under the read lock; the guard is released at the end of this statement.
    let snapshot = state.prompt_context_cache.read().await.clone();

    let text = match snapshot.formatted.is_empty() {
        true => "No prompt facts stored yet.".to_string(),
        false => snapshot.formatted,
    };

    let mut response = text.into_response();
    let content_type = HeaderValue::from_static("text/plain; charset=utf-8");
    response
        .headers_mut()
        .insert(header::CONTENT_TYPE, content_type);
    Ok(response)
}
async fn add_alias_handler(
State(state): State<AppState>,
Json(req): Json<AddAliasRequest>,
) -> Result<Json<Value>, ApiError> {
if req.short.is_empty() || req.full.is_empty() {
return Err(ApiError::bad_request("short and full are required"));
}
let palace_name = req
.palace
.clone()
.or_else(|| state.default_palace.clone())
.ok_or_else(|| ApiError::bad_request("missing 'palace' (no default palace configured)"))?;
let handle = open_handle(&state, &palace_name)?;
let triple = Triple {
subject: req.short.clone(),
predicate: "is_alias_for".to_string(),
object: req.full.clone(),
valid_from: chrono::Utc::now(),
valid_to: None,
confidence: 1.0,
provenance: Some("add_alias_http".to_string()),
};
handle
.kg
.assert(triple)
.await
.map_err(|e| ApiError::internal(format!("kg.assert failed: {e:#}")))?;
if let Err(e) = crate::prompt_facts::rebuild_prompt_cache(&state).await {
tracing::warn!("rebuild_prompt_cache after HTTP add_alias failed: {e:#}");
}
Ok(Json(json!({
"subject": req.short,
"predicate": "is_alias_for",
"object": req.full,
"palace": palace_name,
})))
}
/// `GET /api/v1/kg/prompt-facts` — lists the triples returned by
/// `prompt_facts::gather_hot_triples` as subject/predicate/object rows.
/// The query parameters are accepted but not used.
///
/// # Errors
/// `500` when gathering the triples fails.
async fn list_prompt_facts_handler(
    State(state): State<AppState>,
    Query(_q): Query<PromptFactsQuery>,
) -> Result<Json<Vec<PromptFactRow>>, ApiError> {
    let hot_triples = match crate::prompt_facts::gather_hot_triples(&state).await {
        Ok(found) => found,
        Err(e) => return Err(ApiError::internal(format!("gather_hot_triples: {e:#}"))),
    };

    let mut rows: Vec<PromptFactRow> = Vec::new();
    for (subject, predicate, object) in hot_triples {
        rows.push(PromptFactRow {
            subject,
            predicate,
            object,
        });
    }
    Ok(Json(rows))
}
/// `DELETE /api/v1/kg/prompt-facts` — retracts `(subject, predicate)` in every
/// palace the registry lists and can return a handle for.
///
/// Retract failures for individual palaces are logged and skipped. When at
/// least one triple was closed, the prompt cache is rebuilt (best effort) and
/// the response is `{"removed": true, "closed": n}`; otherwise it is
/// `{"removed": false, "reason": "not found"}`.
///
/// # Errors
/// `400` when `subject` or `predicate` is empty.
async fn remove_prompt_fact_handler(
    State(state): State<AppState>,
    Query(q): Query<RemovePromptFactQuery>,
) -> Result<Json<Value>, ApiError> {
    if q.subject.is_empty() || q.predicate.is_empty() {
        return Err(ApiError::bad_request("subject and predicate are required"));
    }

    let mut closed_total: usize = 0;
    for palace_id in state.registry.list() {
        let Some(handle) = state.registry.get(&palace_id) else {
            continue;
        };
        match handle.kg.retract(&q.subject, &q.predicate).await {
            Ok(closed) => closed_total += closed,
            Err(e) => tracing::warn!(
                palace = %palace_id.as_str(),
                "HTTP retract failed: {e:#}",
            ),
        }
    }

    if closed_total == 0 {
        return Ok(Json(json!({"removed": false, "reason": "not found"})));
    }

    if let Err(e) = crate::prompt_facts::rebuild_prompt_cache(&state).await {
        tracing::warn!("rebuild_prompt_cache after HTTP remove_prompt_fact failed: {e:#}");
    }
    Ok(Json(json!({"removed": true, "closed": closed_total})))
}
#[allow(unused_imports)]
pub(crate) use crate::service::refresh_gaps_cache;
/// Opens the palace `id` through the registry.
///
/// # Errors
/// Any failure to open is reported as `404 Not Found`, with the underlying
/// error appended to the message.
pub(crate) fn open_handle(
    state: &AppState,
    id: &str,
) -> Result<std::sync::Arc<trusty_common::memory_core::PalaceHandle>, ApiError> {
    let palace_id = PalaceId::new(id);
    match state.registry.open_palace(&state.data_root, &palace_id) {
        Ok(handle) => Ok(handle),
        Err(e) => Err(ApiError::not_found(format!(
            "palace not found: {id} ({e:#})"
        ))),
    }
}
/// Error type returned by the HTTP handlers. It is rendered as the given
/// status with a JSON body of the form `{"error": <message>}`.
pub(crate) struct ApiError {
// HTTP status sent to the client.
status: StatusCode,
// Text placed in the JSON body's "error" field.
message: String,
}
impl ApiError {
pub(crate) fn bad_request(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
message: msg.into(),
}
}
pub(crate) fn not_found(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::NOT_FOUND,
message: msg.into(),
}
}
#[allow(dead_code)]
pub(crate) fn conflict(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::CONFLICT,
message: msg.into(),
}
}
pub(crate) fn internal(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: msg.into(),
}
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
(self.status, Json(json!({ "error": self.message }))).into_response()
}
}
impl From<crate::service::ServiceError> for ApiError {
fn from(e: crate::service::ServiceError) -> Self {
match e {
crate::service::ServiceError::BadRequest(m) => ApiError::bad_request(m),
crate::service::ServiceError::NotFound(m) => ApiError::not_found(m),
crate::service::ServiceError::Conflict(m) => ApiError::conflict(m),
crate::service::ServiceError::Internal(m) => ApiError::internal(m),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::service::drawer_content_preview;
use crate::service::DRAWER_PREVIEW_MAX_CHARS;
use axum::body::to_bytes;
use axum::http::Request;
use tower::util::ServiceExt;
use trusty_common::memory_core::palace::Palace;
use trusty_common::memory_core::retrieval::RecallResult;
fn test_state() -> AppState {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
std::mem::forget(tmp);
unsafe {
std::env::set_var("TRUSTY_SKIP_PALACE_ENFORCEMENT", "1");
}
AppState::new(root)
}
#[test]
fn drawer_preview_collapses_whitespace_and_truncates() {
assert_eq!(drawer_content_preview("hello world"), "hello world");
assert_eq!(
drawer_content_preview("first line\n\nsecond\tline third"),
"first line second line third"
);
assert_eq!(drawer_content_preview(" padded "), "padded");
assert_eq!(drawer_content_preview(""), "");
let long = "x".repeat(DRAWER_PREVIEW_MAX_CHARS + 50);
let preview = drawer_content_preview(&long);
assert_eq!(preview.chars().count(), DRAWER_PREVIEW_MAX_CHARS);
assert!(preview.ends_with('…'));
let exact = "y".repeat(DRAWER_PREVIEW_MAX_CHARS);
assert_eq!(drawer_content_preview(&exact), exact);
}
#[tokio::test]
#[ignore = "loads the default ONNX embedder; run with --include-ignored"]
async fn health_endpoint_returns_ok() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 1024).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["status"], "ok");
assert_eq!(v["version"], env!("CARGO_PKG_VERSION"));
}
#[tokio::test]
#[ignore = "loads the default ONNX embedder; run with --include-ignored"]
async fn health_endpoint_includes_resource_fields() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 1024).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let rss_mb = v["rss_mb"].as_u64().expect("rss_mb is u64");
assert!(rss_mb < 1024 * 1024, "rss_mb unit must be MB");
let cpu = v["cpu_pct"].as_f64().expect("cpu_pct is a number");
assert!(cpu >= 0.0, "cpu_pct must be non-negative");
assert_eq!(v["disk_bytes"].as_u64(), Some(0));
assert!(v["uptime_secs"].is_u64(), "uptime_secs must be present");
}
#[tokio::test]
#[ignore = "loads the default ONNX embedder; run with --include-ignored"]
async fn health_endpoint_round_trip_on_fresh_install_is_ok() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 1024).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["status"], "ok");
assert!(
v.get("detail").is_none() || v["detail"].is_null(),
"fresh-install health must not carry a degraded detail (got {v:?})"
);
}
#[tokio::test]
#[ignore = "loads the default ONNX embedder; run with --include-ignored"]
async fn health_endpoint_round_trip_with_palace_is_ok() {
let state = test_state();
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new("health-probe-palace"),
name: "health-probe-palace".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("health-probe-palace"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create_palace");
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 2048).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
v["status"], "ok",
"round-trip should succeed against a fresh palace; got {v:?}"
);
assert!(
v.get("detail").is_none() || v["detail"].is_null(),
"successful round-trip must not carry a detail field (got {v:?})"
);
}
/// The health-probe palace must exist on disk after
/// `ensure_health_probe_palace`, yet never show up in
/// `MemoryService::list_palaces`.
#[tokio::test]
async fn health_probe_palace_is_invisible() {
    let state = test_state();
    ensure_health_probe_palace(&state).expect("ensure_health_probe_palace");

    let probe_dir = state.data_root.join(HEALTH_PROBE_PALACE);
    assert!(
        probe_dir.exists(),
        "probe palace directory should be persisted on disk"
    );

    let service = crate::service::MemoryService::new(state);
    let listed = service.list_palaces().await.expect("list_palaces");
    let ids: Vec<_> = listed.iter().map(|p| &p.id).collect();

    // General rule: nothing with the reserved `__` prefix is listed.
    assert!(
        !listed.iter().any(|p| p.id.starts_with("__")),
        "no `__`-prefixed palace may appear in the user-facing list; got {:?}",
        ids
    );
    // Specific rule: the probe palace itself is not listed.
    assert!(
        listed.iter().all(|p| p.id != HEALTH_PROBE_PALACE),
        "the dedicated `__health_probe__` palace must be invisible; got {:?}",
        ids
    );
}
/// A successful health round trip must return `Ok` and leave the probe
/// palace with zero drawers afterwards.
///
/// The recall step is replaced by a stub closure so the test does not go
/// through the real recall path.
#[tokio::test]
async fn health_probe_cleans_up_on_success() {
use trusty_common::memory_core::Drawer;
let state = test_state();
ensure_health_probe_palace(&state).expect("ensure_health_probe_palace");
let handle = state
.registry
.open_palace(&state.data_root, &PalaceId::new(HEALTH_PROBE_PALACE))
.expect("open probe palace");
// Stub recall: report the last drawer currently in the palace as the single
// hit, or a freshly built "stub" drawer when the palace is empty.
// NOTE(review): the last drawer is presumably the one the round trip just
// wrote — confirm against `run_health_round_trip_inner`.
let result = run_health_round_trip_inner(handle.clone(), move |h, _query| async move {
let drawers = h.drawers.read();
let last = drawers
.last()
.cloned()
.unwrap_or_else(|| Drawer::new(Uuid::new_v4(), "stub"));
// Release the read guard explicitly before the closure returns.
drop(drawers);
Ok(vec![RecallResult {
drawer: last,
score: 1.0,
layer: 1,
}])
})
.await;
assert!(
result.is_ok(),
"successful round-trip should return Ok; got {result:?}"
);
// Whatever the round trip stored must be gone again.
let drawer_count = handle.drawers.read().len();
assert_eq!(
drawer_count, 0,
"probe palace must have zero drawers after a successful round-trip (got {drawer_count})"
);
}
/// When the recall step succeeds but returns no hits, the round trip must
/// surface `HealthProbeError::ProbeMissing` and still leave the probe palace
/// empty.
#[tokio::test]
async fn health_probe_cleans_up_on_recall_miss() {
    let state = test_state();
    ensure_health_probe_palace(&state).expect("ensure_health_probe_palace");

    let probe_id = PalaceId::new(HEALTH_PROBE_PALACE);
    let handle = state
        .registry
        .open_palace(&state.data_root, &probe_id)
        .expect("open probe palace");

    // Stubbed recall: no error, but nothing found.
    let result =
        run_health_round_trip_inner(handle.clone(), |_h, _q| async move { Ok(Vec::new()) }).await;

    let is_probe_missing = matches!(result, Err(HealthProbeError::ProbeMissing(_)));
    assert!(
        is_probe_missing,
        "recall miss must surface as ProbeMissing; got {result:?}"
    );

    let drawer_count = handle.drawers.read().len();
    assert_eq!(
        drawer_count, 0,
        "probe palace must be empty after a recall miss (got {drawer_count})"
    );
}
/// When the recall step itself fails, the round trip must pass the
/// `HealthProbeError::Recall` error through and still leave the probe palace
/// empty.
#[tokio::test]
async fn health_probe_cleans_up_on_recall_error() {
    let state = test_state();
    ensure_health_probe_palace(&state).expect("ensure_health_probe_palace");

    let probe_id = PalaceId::new(HEALTH_PROBE_PALACE);
    let handle = state
        .registry
        .open_palace(&state.data_root, &probe_id)
        .expect("open probe palace");

    // Stubbed recall: always fails.
    let result = run_health_round_trip_inner(handle.clone(), |_h, _q| async move {
        Err(HealthProbeError::Recall("simulated failure".to_string()))
    })
    .await;

    let is_recall_error = matches!(result, Err(HealthProbeError::Recall(_)));
    assert!(
        is_recall_error,
        "recall error must surface as Recall; got {result:?}"
    );

    let drawer_count = handle.drawers.read().len();
    assert_eq!(
        drawer_count, 0,
        "probe palace must be empty after a recall error (got {drawer_count})"
    );
}
/// `recall_entry_json` must flatten the drawer's fields (`content`,
/// `importance`, `tags`) to the top level of the entry next to `score` and
/// `layer`, without a nested `drawer` object.
#[test]
fn recall_entry_json_hoists_drawer_fields() {
    use trusty_common::memory_core::Drawer;

    let mut drawer = Drawer::new(Uuid::new_v4(), "the answer is 42");
    drawer.tags = vec!["source:kuzu".to_string()];
    drawer.importance = 0.7;

    let hit = RecallResult {
        drawer,
        score: 0.699,
        layer: 1,
    };
    let entry = recall_entry_json(hit);

    let content = entry.get("content").and_then(|v| v.as_str());
    assert_eq!(
        content,
        Some("the answer is 42"),
        "content must be at the top level, got {entry:?}"
    );
    assert!(
        entry.get("drawer").is_none(),
        "the legacy `drawer` wrapper must not be present, got {entry:?}"
    );

    // Compare importance at one decimal place to sidestep float widening noise.
    let importance_tenths = entry["importance"].as_f64().map(|f| (f * 10.0).round());
    assert_eq!(importance_tenths, Some(7.0));

    assert_eq!(
        entry["tags"][0].as_str(),
        Some("source:kuzu"),
        "tags must be hoisted, got {entry:?}"
    );
    assert_eq!(entry["layer"].as_u64(), Some(1));

    let score_matches = entry["score"]
        .as_f64()
        .is_some_and(|s| (s - 0.699).abs() < 1e-6);
    assert!(score_matches, "score must be preserved, got {entry:?}");
}
/// `GET /api/v1/logs/tail?n=2` returns the two most recent buffered lines in
/// chronological order, plus the total number of buffered lines.
#[tokio::test]
async fn logs_tail_returns_recent_lines() {
    let buffer = trusty_common::log_buffer::LogBuffer::new(100);
    for line in ["line one", "line two", "line three"] {
        buffer.push(line.to_string());
    }

    let state = test_state().with_log_buffer(buffer);
    let app = router().with_state(state);

    let request = Request::get("/api/v1/logs/tail?n=2")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload = serde_json::from_slice::<Value>(&raw).unwrap();
    let lines = payload["lines"].as_array().expect("lines array");

    assert_eq!(lines.len(), 2, "n=2 must return two lines");
    assert_eq!(lines[0].as_str(), Some("line two"));
    assert_eq!(lines[1].as_str(), Some("line three"));
    assert_eq!(payload["total"].as_u64(), Some(3));
}
#[tokio::test]
async fn logs_tail_clamps_n() {
let buffer = trusty_common::log_buffer::LogBuffer::new(100);
for i in 0..5 {
buffer.push(format!("l{i}"));
}
let state = test_state().with_log_buffer(buffer);
let app = router().with_state(state);
let resp = app
.clone()
.oneshot(
Request::builder()
.uri("/api/v1/logs/tail?n=0")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["lines"].as_array().expect("lines").len(), 1);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/logs/tail?n=999999")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["lines"].as_array().expect("lines").len(), 5);
}
/// Calling the `admin_stop` handler directly yields `{"ok": true}` with the
/// message `"shutting down"`.
#[tokio::test]
async fn admin_stop_returns_ok() {
    let state = test_state();
    let reply = admin_stop(State(state)).await;
    let body = &reply.0;
    assert_eq!(body["ok"], true);
    assert_eq!(body["message"], "shutting down");
}
#[tokio::test]
async fn remember_async_returns_202_and_persists() {
let state = test_state();
let palace = Palace {
id: PalaceId::new("remember-async"),
name: "remember-async".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("remember-async"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create_palace");
let app = router().with_state(state.clone());
let body = json!({
"content": "Trusty-memory note CLI ships a fire-and-forget HTTP endpoint for sub-agents.",
"palace": "remember-async",
"tags": ["docs", "note-cli"],
})
.to_string();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/remember")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(
resp.status(),
StatusCode::ACCEPTED,
"remember endpoint must respond 202 immediately"
);
let bytes = to_bytes(resp.into_body(), 1024).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["status"], "queued");
let handle = state
.registry
.open_palace(&state.data_root, &PalaceId::new("remember-async"))
.expect("open palace");
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
let count = handle.drawers.read().len();
if count >= 1 {
break;
}
if std::time::Instant::now() >= deadline {
panic!("spawned remember task never persisted a drawer (count={count})");
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
#[tokio::test]
async fn remember_async_rejects_empty_content() {
let state = test_state();
let app = router().with_state(state);
for body in [json!({"content": ""}), json!({"content": " \n "})] {
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/remember")
.header("content-type", "application/json")
.body(Body::from(body.to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(
resp.status(),
StatusCode::BAD_REQUEST,
"empty content must be rejected; body={body}"
);
}
}
/// `GET /api/v1/status` on a fresh state returns 200 with a string
/// `version` and a `palace_count` of zero.
#[tokio::test]
async fn status_endpoint_returns_payload() {
    let state = test_state();
    let app = router().with_state(state);

    let request = Request::get("/api/v1/status")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 1024).await.unwrap();
    let status = serde_json::from_slice::<Value>(&raw).unwrap();
    assert!(status["version"].is_string());
    assert_eq!(status["palace_count"], 0);
}
/// A path under `/api/v1/` that matches no route answers `404 Not Found`.
#[tokio::test]
async fn unknown_api_returns_404() {
    let state = test_state();
    let app = router().with_state(state);

    let request = Request::get("/api/v1/does-not-exist")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
/// `GET /api/v1/palaces/{id}/memories` must resolve (200) and return a JSON
/// array, i.e. behave as an alias of the drawers listing.
#[tokio::test]
async fn memories_alias_routes_to_drawers() {
    let state = test_state();
    let palace_name = "alias-test";
    let palace = Palace {
        id: PalaceId::new(palace_name),
        name: palace_name.to_string(),
        description: None,
        created_at: chrono::Utc::now(),
        data_dir: state.data_root.join(palace_name),
    };
    state
        .registry
        .create_palace(&state.data_root, palace)
        .expect("create_palace");

    let app = router().with_state(state);
    let request = Request::get("/api/v1/palaces/alias-test/memories")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(
        response.status(),
        StatusCode::OK,
        "the /memories alias must resolve to list_drawers, not 404"
    );

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let v = serde_json::from_slice::<Value>(&raw).unwrap();
    assert!(
        v.is_array(),
        "the alias must return the list-drawers array shape, got {v:?}"
    );
}
#[tokio::test]
async fn http_create_drawer_runs_auto_kg_extraction() {
let state = test_state();
let palace = Palace {
id: PalaceId::new("kgauto-http"),
name: "kgauto-http".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("kgauto-http"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create_palace");
let app = router().with_state(state.clone());
let body = json!({
"content": "trusty-memory is a Rust crate that ships an MCP server. \
It tracks #mcp and #rust topics with care.",
"room": "Backend",
"tags": ["backend", "kg"],
"importance": 0.5,
})
.to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces/kgauto-http/drawers")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(
resp.status(),
StatusCode::OK,
"create_drawer must return 200 OK"
);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/kgauto-http/kg/graph")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 64 * 1024).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let triples = v["triples"].as_array().expect("triples array");
assert!(
!triples.is_empty(),
"HTTP-origin drawer must populate the KG; got empty graph"
);
let auto: Vec<&Value> = triples
.iter()
.filter(|t| t["provenance"].as_str() == Some(crate::kg_extract::AUTO_PROVENANCE))
.collect();
assert!(
!auto.is_empty(),
"expected at least one auto-extracted triple in HTTP-populated KG; got: {triples:?}"
);
assert!(
auto.iter()
.any(|t| t["subject"].as_str() == Some("tag:backend")),
"expected `tag:backend` auto-extracted edge, got: {auto:?}"
);
assert!(
auto.iter()
.any(|t| t["predicate"].as_str() == Some("mentioned-in")),
"expected at least one #hashtag mention triple, got: {auto:?}"
);
}
#[tokio::test]
async fn create_then_list_palace() {
let state = test_state();
let app = router().with_state(state.clone());
let body = json!({"name": "web-test", "description": "from test"}).to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let arr = v.as_array().expect("array");
assert!(arr.iter().any(|p| p["id"] == "web-test"));
}
#[tokio::test]
async fn delete_palace_removes_dir_when_empty() {
let state = test_state();
let app = router().with_state(state.clone());
let body = json!({"name": "to-delete"}).to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("DELETE")
.uri("/api/v1/palaces/to-delete")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/to-delete")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let palace_dir = state.data_root.join("to-delete");
assert!(
!palace_dir.exists(),
"palace dir should be removed: {}",
palace_dir.display()
);
}
#[tokio::test]
async fn delete_palace_refuses_when_drawers_present() {
let state = test_state();
let app = router().with_state(state.clone());
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "keep-me"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces/keep-me/drawers")
.header("content-type", "application/json")
.body(Body::from(
json!({
"content": "Important fact that should not be deleted accidentally.",
"tags": [],
})
.to_string(),
))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("DELETE")
.uri("/api/v1/palaces/keep-me")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::CONFLICT);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/keep-me")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[tokio::test]
async fn delete_palace_force_removes_populated_palace() {
let state = test_state();
let app = router().with_state(state.clone());
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "force-delete"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces/force-delete/drawers")
.header("content-type", "application/json")
.body(Body::from(
json!({"content": "Sacrificial drawer for the force-delete path.", "tags": []}).to_string(),
))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("DELETE")
.uri("/api/v1/palaces/force-delete?force=true")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/force-delete")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
/// `DELETE` on a palace id that was never created answers `404 Not Found`.
#[tokio::test]
async fn delete_palace_returns_not_found_for_missing_id() {
    let state = test_state();
    let app = router().with_state(state);

    let request = Request::delete("/api/v1/palaces/never-existed")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();

    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn update_palace_name_renames_palace() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "rename-me"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("PATCH")
.uri("/api/v1/palaces/rename-me")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "New Display Name"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["id"].as_str(), Some("rename-me"));
assert_eq!(v["name"].as_str(), Some("New Display Name"));
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/rename-me")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["id"].as_str(), Some("rename-me"));
assert_eq!(v["name"].as_str(), Some("New Display Name"));
}
#[tokio::test]
async fn update_palace_name_rejects_empty_name() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "keep-name"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.oneshot(
Request::builder()
.method("PATCH")
.uri("/api/v1/palaces/keep-name")
.header("content-type", "application/json")
.body(Body::from(json!({"name": " "}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn update_palace_name_returns_not_found_for_missing_id() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.method("PATCH")
.uri("/api/v1/palaces/no-such-palace")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "irrelevant"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn palace_list_includes_graph_counts() {
let state = test_state();
let app = router().with_state(state.clone());
let body = json!({"name": "graph-counts", "description": null}).to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let arr = v.as_array().expect("array");
let row = arr
.iter()
.find(|p| p["id"] == "graph-counts")
.expect("created palace must appear in list");
assert_eq!(row["node_count"].as_u64(), Some(0));
assert_eq!(row["edge_count"].as_u64(), Some(0));
assert_eq!(row["community_count"].as_u64(), Some(0));
assert_eq!(row["is_compacting"].as_bool(), Some(false));
}
#[tokio::test]
async fn status_includes_total_counters() {
let state = test_state();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["total_drawers"], 0);
assert_eq!(v["total_vectors"], 0);
assert_eq!(v["total_kg_triples"], 0);
}
/// With no palaces, `GET /api/v1/dream/status` returns a null `last_run_at`
/// and zero `merged` / `pruned` counters.
#[tokio::test]
async fn dream_status_empty_returns_nulls() {
    let state = test_state();
    let app = router().with_state(state);

    let request = Request::get("/api/v1/dream/status")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let dream = serde_json::from_slice::<Value>(&raw).unwrap();
    assert!(dream["last_run_at"].is_null());
    assert_eq!(dream["merged"], 0);
    assert_eq!(dream["pruned"], 0);
}
/// `GET /api/v1/chat/providers` lists exactly two providers, named
/// `ollama` and `openrouter`, and includes an `active` key.
#[tokio::test]
async fn providers_endpoint_returns_payload() {
    let state = test_state();
    let app = router().with_state(state);

    let request = Request::get("/api/v1/chat/providers")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload = serde_json::from_slice::<Value>(&raw).unwrap();
    let providers = payload["providers"].as_array().expect("providers array");
    assert_eq!(providers.len(), 2);

    let has_provider =
        |wanted: &str| providers.iter().any(|p| p["name"].as_str() == Some(wanted));
    assert!(has_provider("ollama"));
    assert!(has_provider("openrouter"));
    assert!(payload.get("active").is_some());
}
#[tokio::test]
async fn chat_session_crud_round_trip() {
let state = test_state();
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new("sess-test"),
name: "sess-test".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("sess-test"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create_palace");
let app = router().with_state(state);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces/sess-test/chat/sessions")
.header("content-type", "application/json")
.body(Body::from(json!({"title":"first chat"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let sid = v["id"].as_str().expect("session id").to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.uri("/api/v1/palaces/sess-test/chat/sessions")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let arr = v.as_array().expect("array");
assert!(arr.iter().any(|s| s["id"] == sid));
let resp = app
.clone()
.oneshot(
Request::builder()
.uri(format!("/api/v1/palaces/sess-test/chat/sessions/{sid}"))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("DELETE")
.uri(format!("/api/v1/palaces/sess-test/chat/sessions/{sid}"))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
let resp = app
.oneshot(
Request::builder()
.uri(format!("/api/v1/palaces/sess-test/chat/sessions/{sid}"))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn messages_endpoint_round_trip() {
let state = test_state();
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new("msg-test"),
name: "msg-test".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("msg-test"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create_palace");
let app = router().with_state(state);
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/messages")
.header("content-type", "application/json")
.body(Body::from(
json!({
"to_palace": "msg-test",
"from_palace": "sender-palace",
"purpose": "task",
"content": "please refresh schema"
})
.to_string(),
))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let send_resp: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(send_resp["status"], "sent");
let drawer_id = send_resp["drawer_id"]
.as_str()
.expect("drawer_id")
.to_string();
let resp = app
.clone()
.oneshot(
Request::builder()
.uri("/api/v1/messages?palace=msg-test&unread_only=true")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 8192).await.unwrap();
let list: Vec<Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list[0]["id"], drawer_id);
assert_eq!(list[0]["from_palace"], "sender-palace");
assert_eq!(list[0]["to_palace"], "msg-test");
assert_eq!(list[0]["purpose"], "task");
assert_eq!(list[0]["content"], "please refresh schema");
assert_eq!(list[0]["read"], false);
assert!(list[0]["formatted"]
.as_str()
.unwrap()
.contains("sender-palace"));
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/messages/mark_read")
.header("content-type", "application/json")
.body(Body::from(
json!({"palace": "msg-test", "drawer_id": drawer_id}).to_string(),
))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 1024).await.unwrap();
let mark: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(mark["flipped"], true);
let resp = app
.clone()
.oneshot(
Request::builder()
.uri("/api/v1/messages?palace=msg-test&unread_only=true")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let list: Vec<Value> = serde_json::from_slice(&bytes).unwrap();
assert!(list.is_empty(), "inbox cleared after mark_read");
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/messages?palace=msg-test&unread_only=false")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 8192).await.unwrap();
let history: Vec<Value> = serde_json::from_slice(&bytes).unwrap();
assert_eq!(history.len(), 1);
assert_eq!(history[0]["read"], true);
}
/// `crate::chat::all_tools` must expose exactly the expected tool names, in
/// order, and every tool's parameter schema must be an `object` with an
/// array-valued `required` entry.
#[test]
fn all_tools_returns_expected_set() {
    let expected = [
        "list_palaces",
        "get_palace",
        "recall_memories",
        "list_drawers",
        "kg_query",
        "get_config",
        "get_status",
        "get_dream_status",
        "get_palace_dream_status",
        "create_memory",
        "kg_assert",
        "memory_recall_all",
    ];

    let tools = crate::chat::all_tools();
    let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
    assert_eq!(names, expected);

    for tool in tools.iter() {
        let schema = &tool.parameters;
        assert_eq!(schema["type"], "object", "tool {} schema type", tool.name);
        assert!(
            schema["required"].is_array(),
            "tool {} required not array",
            tool.name
        );
    }
}
/// `crate::chat::execute_tool` covers three paths: a known tool
/// (`list_palaces`, empty array on a fresh state), an unknown tool name, and
/// a known tool called without its required `palace_id` argument.
#[tokio::test]
async fn execute_tool_dispatches_known_tools() {
    let state = test_state();

    // Known tool, no palaces yet.
    let result = crate::chat::execute_tool("list_palaces", "{}", &state).await;
    assert!(
        result.is_array(),
        "list_palaces should be array, got {result}"
    );
    assert_eq!(result.as_array().unwrap().len(), 0);

    // Unknown tool name.
    let unknown = crate::chat::execute_tool("not_a_tool", "{}", &state).await;
    let reports_unknown = unknown["error"]
        .as_str()
        .is_some_and(|msg| msg.contains("unknown tool"));
    assert!(
        reports_unknown,
        "expected unknown-tool error, got {unknown}"
    );

    // Known tool, required argument omitted.
    let missing = crate::chat::execute_tool("get_palace", "{}", &state).await;
    let reports_missing_arg = missing["error"]
        .as_str()
        .is_some_and(|msg| msg.contains("palace_id"));
    assert!(
        reports_missing_arg,
        "expected missing-arg error, got {missing}"
    );
}
#[tokio::test]
async fn sse_broadcast_emits_palace_created() {
let state = test_state();
let mut rx = state.events.subscribe();
let app = router().with_state(state.clone());
let body = json!({"name": "sse-test"}).to_string();
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let event = tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv())
.await
.expect("event received within timeout")
.expect("event channel still open");
match event {
DaemonEvent::PalaceCreated { id, name, source } => {
assert_eq!(id, "sse-test");
assert_eq!(name, "sse-test");
assert_eq!(source, ActivitySource::Http);
}
other => panic!("expected PalaceCreated, got {other:?}"),
}
}
/// Mounts `crate::sse_handler` at `/sse` on top of the regular router and
/// checks that the response is a `text/event-stream` whose body contains a
/// `"type":"connected"` frame.
#[tokio::test]
async fn sse_endpoint_emits_connected_frame() {
use axum::routing::get;
let state = test_state();
// The SSE route is added here rather than taken from `router()`.
let app = router()
.route("/sse", get(crate::sse_handler))
.with_state(state);
let resp = app
.oneshot(Request::builder().uri("/sse").body(Body::empty()).unwrap())
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok()),
Some("text/event-stream")
);
let body = resp.into_body();
// Collect the body with a 500 ms cap. If collection times out or errors, the
// buffer falls back to empty, which makes the `contains` assertion below fail.
// NOTE(review): this relies on the stream finishing within the cap — confirm
// that the handler's stream ends once the app/state are dropped.
let bytes =
tokio::time::timeout(std::time::Duration::from_millis(500), to_bytes(body, 4096))
.await
.ok()
.and_then(|r| r.ok())
.unwrap_or_default();
let text = String::from_utf8_lossy(&bytes);
assert!(
text.contains("\"type\":\"connected\""),
"expected connected frame, got: {text}"
);
}
#[tokio::test]
async fn dream_status_aggregates_across_palaces() {
use trusty_common::memory_core::dream::{DreamStats, PersistedDreamStats};
let state = test_state();
for (id, stats, ts) in [
(
"palace-a",
DreamStats {
merged: 1,
pruned: 2,
compacted: 3,
closets_updated: 4,
duration_ms: 100,
..DreamStats::default()
},
chrono::Utc::now() - chrono::Duration::seconds(60),
),
(
"palace-b",
DreamStats {
merged: 10,
pruned: 20,
compacted: 30,
closets_updated: 40,
duration_ms: 200,
..DreamStats::default()
},
chrono::Utc::now(),
),
] {
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new(id),
name: id.to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join(id),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create palace");
let persisted = PersistedDreamStats {
last_run_at: ts,
stats,
};
persisted
.save(&state.data_root.join(id))
.expect("save dream stats");
}
let later = chrono::Utc::now();
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/dream/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["merged"], 11);
assert_eq!(v["pruned"], 22);
assert_eq!(v["compacted"], 33);
assert_eq!(v["closets_updated"], 44);
assert_eq!(v["duration_ms"], 300);
let last = v["last_run_at"].as_str().expect("last_run_at is string");
let parsed: chrono::DateTime<chrono::Utc> = last
.parse()
.expect("last_run_at parses as RFC3339 timestamp");
assert!(
parsed <= later,
"last_run_at ({parsed}) should not exceed wall clock ({later})"
);
let cutoff = chrono::Utc::now() - chrono::Duration::seconds(30);
assert!(
parsed >= cutoff,
"expected the newer (palace-b) timestamp; got {parsed}"
);
}
/// Triggers `POST /api/v1/dream/run` against a single freshly created palace
/// and verifies the shape of the response: every counter is present as an
/// integer and `last_run_at` is a string.
#[tokio::test]
async fn dream_run_aggregates_stats() {
    let state = test_state();
    let palace_id = "dream-run-test";
    state
        .registry
        .create_palace(
            &state.data_root,
            trusty_common::memory_core::Palace {
                id: PalaceId::new(palace_id),
                name: palace_id.to_string(),
                description: None,
                created_at: chrono::Utc::now(),
                data_dir: state.data_root.join(palace_id),
            },
        )
        .expect("create palace");

    let app = router().with_state(state);
    let request = Request::builder()
        .method("POST")
        .uri("/api/v1/dream/run")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let v: Value = serde_json::from_slice(&raw).unwrap();

    // Only the payload shape is checked here, not the counter values.
    let counters = [
        "merged",
        "pruned",
        "compacted",
        "closets_updated",
        "duration_ms",
    ];
    for key in counters {
        assert!(
            v.get(key).is_some(),
            "missing key {key} in dream_run payload: {v}"
        );
        let field = &v[key];
        assert!(
            field.is_u64() || field.is_i64(),
            "{key} should be integer, got {}",
            v[key]
        );
    }

    assert!(
        v["last_run_at"].is_string(),
        "last_run_at must be set by dream_run; got {v}"
    );
}
/// `GET /api/v1/kg/gaps` for a palace that never had gaps stored must answer
/// 200 with an empty JSON array.
#[tokio::test]
async fn kg_gaps_endpoint_returns_empty_when_uncached() {
    let state = test_state();
    let palace_id = "gaps-empty";
    state
        .registry
        .create_palace(
            &state.data_root,
            trusty_common::memory_core::Palace {
                id: PalaceId::new(palace_id),
                name: palace_id.to_string(),
                description: None,
                created_at: chrono::Utc::now(),
                data_dir: state.data_root.join(palace_id),
            },
        )
        .expect("create palace");

    let app = router().with_state(state);
    let request = Request::builder()
        .uri("/api/v1/kg/gaps?palace=gaps-empty")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    let gaps = payload.as_array().expect("array");
    assert_eq!(gaps.len(), 0);
}
/// Stores one `KnowledgeGap` through `registry.set_gaps` and checks that
/// `GET /api/v1/kg/gaps` returns it with its entities, bridge count and
/// exploration hint.
#[tokio::test]
async fn kg_gaps_endpoint_returns_cached_gaps() {
    use trusty_common::memory_core::community::KnowledgeGap;

    let state = test_state();
    let palace_id = "gaps-seed";
    state
        .registry
        .create_palace(
            &state.data_root,
            trusty_common::memory_core::Palace {
                id: PalaceId::new(palace_id),
                name: palace_id.to_string(),
                description: None,
                created_at: chrono::Utc::now(),
                data_dir: state.data_root.join(palace_id),
            },
        )
        .expect("create palace");

    // The single gap the endpoint is expected to echo back.
    let seeded_gap = KnowledgeGap {
        entities: vec!["foo".to_string(), "bar".to_string(), "baz".to_string()],
        internal_density: 0.15,
        external_bridges: 2,
        suggested_exploration: "Explore connections between foo and related concepts"
            .to_string(),
    };
    state
        .registry
        .set_gaps(PalaceId::new(palace_id), vec![seeded_gap]);

    let app = router().with_state(state);
    let request = Request::builder()
        .uri("/api/v1/kg/gaps?palace=gaps-seed")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    let gaps = payload.as_array().expect("array");
    assert_eq!(gaps.len(), 1);

    let gap = &gaps[0];
    assert_eq!(gap["entities"].as_array().unwrap().len(), 3);
    assert_eq!(gap["external_bridges"], 2);
    let hint = gap["suggested_exploration"].as_str().unwrap();
    assert!(hint.contains("foo"));
}
#[tokio::test]
async fn kg_list_subjects_returns_distinct() {
let state = test_state();
let app = router().with_state(state.clone());
let resp = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces")
.header("content-type", "application/json")
.body(Body::from(json!({"name": "kg-list"}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
for subj in ["alpha", "beta"] {
let body = json!({
"subject": subj,
"predicate": "is",
"object": "thing",
})
.to_string();
let r = app
.clone()
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces/kg-list/kg")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap(),
)
.await
.unwrap();
assert_eq!(r.status(), StatusCode::NO_CONTENT);
}
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/kg-list/kg/subjects?limit=10")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 4096).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let arr = v.as_array().expect("subjects must be array");
let subjects: Vec<String> = arr
.iter()
.filter_map(|x| x.as_str().map(String::from))
.collect();
assert_eq!(subjects, vec!["alpha".to_string(), "beta".to_string()]);
}
/// Asserts a single triple over HTTP and checks that
/// `GET .../kg/all?limit=10&offset=0` returns exactly that triple.
#[tokio::test]
async fn kg_list_all_returns_paginated_triples() {
    let state = test_state();
    let app = router().with_state(state.clone());

    let create_palace = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces")
        .header("content-type", "application/json")
        .body(Body::from(json!({"name": "kg-all"}).to_string()))
        .unwrap();
    let created = app.clone().oneshot(create_palace).await.unwrap();
    assert_eq!(created.status(), StatusCode::OK);

    let triple = json!({
        "subject": "alpha",
        "predicate": "is",
        "object": "thing",
    });
    let assert_triple = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces/kg-all/kg")
        .header("content-type", "application/json")
        .body(Body::from(triple.to_string()))
        .unwrap();
    let asserted = app.clone().oneshot(assert_triple).await.unwrap();
    assert_eq!(asserted.status(), StatusCode::NO_CONTENT);

    let list_all = Request::builder()
        .uri("/api/v1/palaces/kg-all/kg/all?limit=10&offset=0")
        .body(Body::empty())
        .unwrap();
    let listed = app.oneshot(list_all).await.unwrap();
    assert_eq!(listed.status(), StatusCode::OK);

    let raw = to_bytes(listed.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    let triples = payload.as_array().expect("triples must be array");
    assert_eq!(triples.len(), 1);

    let only = &triples[0];
    assert_eq!(only["subject"], "alpha");
    assert_eq!(only["predicate"], "is");
    assert_eq!(only["object"], "thing");
}
/// Asserts one triple over HTTP and checks that `GET .../kg/graph` includes
/// it in `triples` and exposes unsigned `node_count`, `edge_count` and
/// `community_count` fields.
#[tokio::test]
async fn kg_graph_returns_active_triples() {
    let state = test_state();
    let app = router().with_state(state.clone());

    let create_palace = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces")
        .header("content-type", "application/json")
        .body(Body::from(json!({"name": "kg-graph"}).to_string()))
        .unwrap();
    let created = app.clone().oneshot(create_palace).await.unwrap();
    assert_eq!(created.status(), StatusCode::OK);

    let triple = json!({
        "subject": "alpha",
        "predicate": "is",
        "object": "thing",
    });
    let assert_triple = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces/kg-graph/kg")
        .header("content-type", "application/json")
        .body(Body::from(triple.to_string()))
        .unwrap();
    let asserted = app.clone().oneshot(assert_triple).await.unwrap();
    assert_eq!(asserted.status(), StatusCode::NO_CONTENT);

    let fetch_graph = Request::builder()
        .uri("/api/v1/palaces/kg-graph/kg/graph")
        .body(Body::empty())
        .unwrap();
    let graph_resp = app.oneshot(fetch_graph).await.unwrap();
    assert_eq!(graph_resp.status(), StatusCode::OK);

    let raw = to_bytes(graph_resp.into_body(), 16_384).await.unwrap();
    let graph: Value = serde_json::from_slice(&raw).unwrap();

    // The asserted triple only has to be present; other triples are allowed.
    let triples = graph["triples"].as_array().expect("triples array");
    let has_asserted_triple = triples.iter().any(|edge| {
        edge["subject"] == "alpha" && edge["predicate"] == "is" && edge["object"] == "thing"
    });
    assert!(has_asserted_triple);

    // Summary counters must be present as unsigned integers.
    for counter in ["node_count", "edge_count", "community_count"] {
        assert!(graph[counter].as_u64().is_some());
    }
}
/// Loads 500 triples (10 subjects x 50 predicate/object pairs) directly into
/// the palace's knowledge graph, then times a single `GET .../kg/graph`
/// request and requires it to finish in under ten seconds with all 500
/// triples in the payload.
#[tokio::test]
async fn kg_graph_meets_perf_budget_for_500_triples() {
    let state = test_state();
    let app = router().with_state(state.clone());

    let create_palace = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces")
        .header("content-type", "application/json")
        .body(Body::from(json!({"name": "kg-perf"}).to_string()))
        .unwrap();
    let created = app.clone().oneshot(create_palace).await.unwrap();
    assert_eq!(created.status(), StatusCode::OK);

    // Seed through the registry handle instead of HTTP so that only the graph
    // request itself falls inside the timed section below.
    let palace_id = trusty_common::memory_core::palace::PalaceId::new("kg-perf");
    let handle = state
        .registry
        .open_palace(&state.data_root, &palace_id)
        .expect("open palace");
    let seeded_at = chrono::Utc::now();
    for subject_idx in 0..10 {
        for pair_idx in 0..50 {
            let triple = Triple {
                subject: format!("s{}", subject_idx),
                predicate: format!("p{}", pair_idx),
                object: format!("o{}", pair_idx),
                valid_from: seeded_at,
                valid_to: None,
                confidence: 1.0,
                provenance: Some("perf-test".to_string()),
            };
            handle.kg.assert(triple).await.expect("kg.assert");
        }
    }

    let fetch_graph = Request::builder()
        .uri("/api/v1/palaces/kg-perf/kg/graph")
        .body(Body::empty())
        .unwrap();

    // Timed section: dispatching the request until the response is returned.
    let started = std::time::Instant::now();
    let graph_resp = app.oneshot(fetch_graph).await.unwrap();
    let elapsed = started.elapsed();

    assert_eq!(graph_resp.status(), StatusCode::OK);
    let raw = to_bytes(graph_resp.into_body(), 1_000_000).await.unwrap();
    let graph: Value = serde_json::from_slice(&raw).unwrap();
    // A missing or non-array `triples` field counts as zero triples.
    let n = graph["triples"].as_array().map_or(0, Vec::len);
    assert_eq!(n, 500, "expected 500 triples in payload");
    assert!(
        elapsed.as_secs_f64() < 10.0,
        "graph endpoint should serve 500 triples in well under 10s; took {elapsed:?}"
    );
    eprintln!(
        "[perf] kg_graph endpoint served 500 triples in {:.3}ms",
        elapsed.as_secs_f64() * 1000.0
    );
}
/// `GET /api/v1/kg/prompt-context` returns a fixed placeholder while the
/// prompt-facts cache is in its initial state, and the formatted alias block
/// once the cache has been populated.
#[tokio::test]
async fn prompt_context_endpoint_returns_formatted_block() {
    let state = test_state();

    // First request: nothing has been written to the cache yet.
    let app = router().with_state(state.clone());
    let first_request = Request::builder()
        .uri("/api/v1/kg/prompt-context")
        .body(Body::empty())
        .unwrap();
    let first = app.oneshot(first_request).await.unwrap();
    assert_eq!(first.status(), StatusCode::OK);
    let raw = to_bytes(first.into_body(), 4096).await.unwrap();
    let text = String::from_utf8(raw.to_vec()).unwrap();
    assert_eq!(text, "No prompt facts stored yet.");

    // Overwrite the cache in place; the write guard is released when this
    // block ends, before the second request is dispatched.
    {
        let mut cache = state.prompt_context_cache.write().await;
        let triples = vec![(
            "tga".to_string(),
            "is_alias_for".to_string(),
            "trusty-git-analytics".to_string(),
        )];
        let formatted = crate::prompt_facts::build_prompt_context(&triples);
        *cache = crate::prompt_facts::PromptFactsCache { triples, formatted };
    }

    // Second request: the alias must now show up in the rendered block.
    let app = router().with_state(state);
    let second_request = Request::builder()
        .uri("/api/v1/kg/prompt-context")
        .body(Body::empty())
        .unwrap();
    let second = app.oneshot(second_request).await.unwrap();
    assert_eq!(second.status(), StatusCode::OK);
    let raw = to_bytes(second.into_body(), 4096).await.unwrap();
    let text = String::from_utf8(raw.to_vec()).unwrap();
    assert!(text.contains("tga → trusty-git-analytics"), "got: {text}");
}
/// Posts an alias to `/api/v1/kg/aliases` with a default palace configured,
/// then checks both the echoed subject/object and that the prompt-context
/// cache now contains the alias line.
#[tokio::test]
async fn add_alias_endpoint_asserts_triple_and_refreshes_cache() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    // Forget the TempDir so its destructor never runs and the directory
    // outlives this binding.
    std::mem::forget(tmp);

    let palace_id = "aliases";
    let state = AppState::new(root).with_default_palace(Some(palace_id.to_string()));
    state
        .registry
        .create_palace(
            &state.data_root,
            trusty_common::memory_core::Palace {
                id: PalaceId::new(palace_id),
                name: palace_id.to_string(),
                description: None,
                created_at: chrono::Utc::now(),
                data_dir: state.data_root.join(palace_id),
            },
        )
        .expect("create palace");

    let alias = json!({"short": "tm", "full": "trusty-memory"});
    let app = router().with_state(state.clone());
    let add_alias = Request::builder()
        .method("POST")
        .uri("/api/v1/kg/aliases")
        .header("content-type", "application/json")
        .body(Body::from(serde_json::to_vec(&alias).unwrap()))
        .unwrap();
    let response = app.oneshot(add_alias).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let echoed: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(echoed["subject"], "tm");
    assert_eq!(echoed["object"], "trusty-memory");

    // The request must also have updated the shared prompt-context cache.
    let guard = state.prompt_context_cache.read().await;
    let cache_has_alias = guard.formatted.contains("tm → trusty-memory");
    assert!(
        cache_has_alias,
        "cache missing alias; got: {}",
        guard.formatted
    );
}
/// Stores an `is_alias_for` triple and a `works_at` triple in the default
/// palace, then checks that `GET /api/v1/kg/prompt-facts` lists the alias
/// and omits the `works_at` fact.
#[tokio::test]
async fn list_prompt_facts_endpoint_returns_hot_triples() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    // Forget the TempDir so its destructor never runs and the directory
    // outlives this binding.
    std::mem::forget(tmp);

    let palace_id = "listfacts";
    let state = AppState::new(root).with_default_palace(Some(palace_id.to_string()));
    let handle = state
        .registry
        .create_palace(
            &state.data_root,
            trusty_common::memory_core::Palace {
                id: PalaceId::new(palace_id),
                name: palace_id.to_string(),
                description: None,
                created_at: chrono::Utc::now(),
                data_dir: state.data_root.join(palace_id),
            },
        )
        .expect("create palace");

    // (subject, predicate, object, message used if the assert fails)
    let seeds = [
        ("ts", "is_alias_for", "trusty-search", "assert alias"),
        ("alice", "works_at", "Acme", "assert works_at"),
    ];
    for (subject, predicate, object, failure) in seeds {
        handle
            .kg
            .assert(Triple {
                subject: subject.to_string(),
                predicate: predicate.to_string(),
                object: object.to_string(),
                valid_from: chrono::Utc::now(),
                valid_to: None,
                confidence: 1.0,
                provenance: None,
            })
            .await
            .expect(failure);
    }

    let app = router().with_state(state);
    let request = Request::builder()
        .uri("/api/v1/kg/prompt-facts")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    let arr = payload.as_array().expect("array");

    let alias_listed = arr.iter().any(|row| {
        row["subject"] == "ts"
            && row["predicate"] == "is_alias_for"
            && row["object"] == "trusty-search"
    });
    assert!(alias_listed, "missing ts alias; got {arr:?}");

    let works_at_listed = arr.iter().any(|row| row["predicate"] == "works_at");
    assert!(
        !works_at_listed,
        "non-hot triple leaked into prompt facts: {arr:?}"
    );
}
/// Seeds an alias, rebuilds the prompt cache, and then exercises
/// `DELETE /api/v1/kg/prompt-facts` twice: once for the existing alias
/// (expects `removed: true`, `closed >= 1`, and the alias gone from the
/// cache) and once for an unknown subject (expects `removed: false`).
#[tokio::test]
async fn remove_prompt_fact_endpoint_soft_deletes_and_refreshes_cache() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    // Forget the TempDir so its destructor never runs and the directory
    // outlives this binding.
    std::mem::forget(tmp);

    let palace_id = "rmfacts";
    let state = AppState::new(root).with_default_palace(Some(palace_id.to_string()));
    let handle = state
        .registry
        .create_palace(
            &state.data_root,
            trusty_common::memory_core::Palace {
                id: PalaceId::new(palace_id),
                name: palace_id.to_string(),
                description: None,
                created_at: chrono::Utc::now(),
                data_dir: state.data_root.join(palace_id),
            },
        )
        .expect("create palace");

    let alias = Triple {
        subject: "ta".to_string(),
        predicate: "is_alias_for".to_string(),
        object: "trusty-analyze".to_string(),
        valid_from: chrono::Utc::now(),
        valid_to: None,
        confidence: 1.0,
        provenance: None,
    };
    handle.kg.assert(alias).await.expect("assert alias");

    // The triple was written directly to the KG, so refresh the cache by hand
    // before exercising the endpoint.
    crate::prompt_facts::rebuild_prompt_cache(&state)
        .await
        .expect("rebuild prompt cache");

    // Delete the existing alias.
    let app = router().with_state(state.clone());
    let delete_existing = Request::builder()
        .method("DELETE")
        .uri("/api/v1/kg/prompt-facts?subject=ta&predicate=is_alias_for")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(delete_existing).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let outcome: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(outcome["removed"], true);
    // A missing or non-integer `closed` is treated as zero and fails here.
    assert!(outcome["closed"].as_u64().unwrap_or(0) >= 1);

    // Scope the read guard so it is released before the next request.
    {
        let guard = state.prompt_context_cache.read().await;
        let still_cached = guard.formatted.contains("ta → trusty-analyze");
        assert!(
            !still_cached,
            "alias still in cache after delete: {}",
            guard.formatted
        );
    }

    // Deleting something that was never stored is a 200 with `removed: false`.
    let app = router().with_state(state);
    let delete_missing = Request::builder()
        .method("DELETE")
        .uri("/api/v1/kg/prompt-facts?subject=nope&predicate=is_alias_for")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(delete_missing).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let outcome: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(outcome["removed"], false);
}
/// Requests `/` and accepts either 200 or 404; any other status fails.
#[tokio::test]
async fn serves_index_html_fallback() {
    let app = router().with_state(test_state());
    let request = Request::builder().uri("/").body(Body::empty()).unwrap();
    let resp = app.oneshot(request).await.unwrap();

    // NOTE(review): presumably 404 covers builds without embedded UI assets — confirm.
    let acceptable = matches!(resp.status(), StatusCode::OK | StatusCode::NOT_FOUND);
    assert!(acceptable, "got {}", resp.status());
}
/// Emits four daemon events, flushes the activity writer, and checks that
/// `GET /api/v1/activity?limit=10` returns all four, newest first, with
/// their sources and an object payload.
#[tokio::test]
async fn activity_endpoint_lists_recent_emits() {
    let state = test_state();

    // Emission order matters: the assertions below expect the reverse of it.
    state.emit(DaemonEvent::PalaceCreated {
        id: "alpha".into(),
        name: "alpha".into(),
        source: ActivitySource::Http,
    });
    state.emit(DaemonEvent::DrawerAdded {
        palace_id: "alpha".into(),
        palace_name: "alpha".into(),
        drawer_count: 1,
        timestamp: chrono::Utc::now(),
        content_preview: "hello".into(),
        source: ActivitySource::Mcp,
    });
    state.emit(DaemonEvent::DrawerAdded {
        palace_id: "beta".into(),
        palace_name: "beta".into(),
        drawer_count: 1,
        timestamp: chrono::Utc::now(),
        content_preview: "hi there".into(),
        source: ActivitySource::Http,
    });
    state.emit(DaemonEvent::DrawerDeleted {
        palace_id: "alpha".into(),
        drawer_count: 0,
        source: ActivitySource::Http,
    });
    state.flush_activity_writes().await;

    let app = router().with_state(state);
    let request = Request::builder()
        .uri("/api/v1/activity?limit=10")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 8192).await.unwrap();
    let page: Value = serde_json::from_slice(&raw).unwrap();

    // Paging metadata: requested limit echoed, default offset, full total.
    assert_eq!(page["limit"], 10);
    assert_eq!(page["offset"], 0);
    assert_eq!(page["total"], 4);

    let entries = page["entries"].as_array().expect("entries array");
    assert_eq!(entries.len(), 4);
    // The last emitted event comes first, the first emitted event last.
    assert_eq!(entries[0]["event_type"], "drawer_deleted");
    assert_eq!(entries[3]["event_type"], "palace_created");

    let sources: Vec<&str> = entries
        .iter()
        .filter_map(|entry| entry["source"].as_str())
        .collect();
    for expected in ["http", "mcp"] {
        assert!(sources.contains(&expected));
    }
    assert!(entries[0]["payload"].is_object());
}
/// A `limit` of 10000 is answered with 200 and a `limit` field equal to
/// `ACTIVITY_MAX_LIMIT`.
#[tokio::test]
async fn activity_endpoint_clamps_limit() {
    let app = router().with_state(test_state());
    let oversized = Request::builder()
        .uri("/api/v1/activity?limit=10000")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(oversized).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let page: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(page["limit"], ACTIVITY_MAX_LIMIT);
}
/// Emits three `DrawerAdded` events that differ in palace and source, then
/// checks that filtering by `palace=alpha&source=mcp` leaves only the one
/// event matching both.
#[tokio::test]
async fn activity_endpoint_filters_by_source_and_palace() {
    let state = test_state();

    // alpha / mcp — the only row expected to survive the filter.
    state.emit(DaemonEvent::DrawerAdded {
        palace_id: "alpha".into(),
        palace_name: "alpha".into(),
        drawer_count: 1,
        timestamp: chrono::Utc::now(),
        content_preview: "".into(),
        source: ActivitySource::Mcp,
    });
    // alpha / http — right palace, wrong source.
    state.emit(DaemonEvent::DrawerAdded {
        palace_id: "alpha".into(),
        palace_name: "alpha".into(),
        drawer_count: 2,
        timestamp: chrono::Utc::now(),
        content_preview: "".into(),
        source: ActivitySource::Http,
    });
    // beta / mcp — right source, wrong palace.
    state.emit(DaemonEvent::DrawerAdded {
        palace_id: "beta".into(),
        palace_name: "beta".into(),
        drawer_count: 1,
        timestamp: chrono::Utc::now(),
        content_preview: "".into(),
        source: ActivitySource::Mcp,
    });
    state.flush_activity_writes().await;

    let app = router().with_state(state);
    let filtered = Request::builder()
        .uri("/api/v1/activity?palace=alpha&source=mcp&limit=50")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(filtered).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let v: Value = serde_json::from_slice(&raw).unwrap();
    let entries = v["entries"].as_array().unwrap();
    assert_eq!(entries.len(), 1, "filter should leave one row, got {v}");

    let survivor = &entries[0];
    assert_eq!(survivor["palace_id"], "alpha");
    assert_eq!(survivor["source"], "mcp");
}
/// An unrecognised `source` query value must be rejected with 400.
#[tokio::test]
async fn activity_endpoint_rejects_unknown_source() {
    let app = router().with_state(test_state());
    let bad_source = Request::builder()
        .uri("/api/v1/activity?source=nope")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(bad_source).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// Drives the `palace_create` and `memory_remember` tools through
/// `dispatch_tool`, checks that the broadcast channel carries the matching
/// events tagged with the MCP source, and finally confirms that both event
/// types are visible through `GET /api/v1/activity?source=mcp`.
#[tokio::test]
async fn mcp_memory_remember_emits_drawer_added_with_mcp_source() {
    use crate::tools::dispatch_tool;

    let state = test_state();
    // Subscribe before dispatching so the events below are not missed.
    let mut events = state.events.subscribe();
    // Each event must arrive within this window or the test fails.
    let wait = std::time::Duration::from_millis(500);

    let _ = dispatch_tool(&state, "palace_create", json!({"name": "p1"}))
        .await
        .expect("palace_create");
    let first = tokio::time::timeout(wait, events.recv())
        .await
        .expect("first event")
        .expect("channel open");
    assert!(matches!(
        &first,
        DaemonEvent::PalaceCreated { source, .. } if *source == ActivitySource::Mcp
    ));

    let remember_args = json!({
        "palace": "p1",
        "text": "the quick brown fox jumps over the lazy dog and more"
    });
    let _ = dispatch_tool(&state, "memory_remember", remember_args)
        .await
        .expect("memory_remember");
    let next = tokio::time::timeout(wait, events.recv())
        .await
        .expect("drawer_added event")
        .expect("channel open");
    match next {
        DaemonEvent::DrawerAdded {
            source, palace_id, ..
        } => {
            assert_eq!(source, ActivitySource::Mcp);
            assert_eq!(palace_id, "p1");
        }
        other => panic!("expected DrawerAdded, got {other:?}"),
    }

    // Flush the activity writer before reading the log back over HTTP.
    state.flush_activity_writes().await;
    let app = router().with_state(state);
    let request = Request::builder()
        .uri("/api/v1/activity?source=mcp&limit=10")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let page: Value = serde_json::from_slice(&raw).unwrap();
    let event_types: std::collections::HashSet<&str> = page["entries"]
        .as_array()
        .unwrap()
        .iter()
        .filter_map(|entry| entry["event_type"].as_str())
        .collect();
    assert!(event_types.contains("drawer_added"));
    assert!(event_types.contains("palace_created"));
}
/// Posts a hook payload to `/api/v1/activity/hook` (expects 204) and then
/// reads it back via `GET /api/v1/activity?source=hook`, checking the row's
/// source, event type, palace id and selected payload fields.
#[tokio::test]
async fn hook_fired_activity_emit_smoke() {
    let state = test_state();

    let hook_event = serde_json::json!({
        "palace_id": "alpha",
        "palace_name": "alpha",
        "hook_type": "UserPromptSubmit",
        "injection_kind": "prompt-context",
        "injection_length": 256,
        "trigger_prompt_excerpt": "test prompt",
        "duration_ms": 12,
    });
    let app = router().with_state(state.clone());
    let post_hook = Request::builder()
        .method("POST")
        .uri("/api/v1/activity/hook")
        .header("content-type", "application/json")
        .body(Body::from(hook_event.to_string()))
        .unwrap();
    let accepted = app.oneshot(post_hook).await.unwrap();
    assert_eq!(accepted.status(), StatusCode::NO_CONTENT);

    // Flush the activity writer before querying the log.
    state.flush_activity_writes().await;

    let app = router().with_state(state);
    let list_hooks = Request::builder()
        .uri("/api/v1/activity?source=hook&limit=10")
        .body(Body::empty())
        .unwrap();
    let listed = app.oneshot(list_hooks).await.unwrap();
    assert_eq!(listed.status(), StatusCode::OK);

    let raw = to_bytes(listed.into_body(), 4096).await.unwrap();
    let page: Value = serde_json::from_slice(&raw).unwrap();
    let entries = page["entries"].as_array().expect("entries array");
    assert!(
        !entries.is_empty(),
        "expected at least one hook activity row, got {entries:?}"
    );

    let row = &entries[0];
    assert_eq!(row["source"], "hook");
    assert_eq!(row["event_type"], "hook_fired");
    assert_eq!(row["palace_id"], "alpha");

    // The stored payload keeps the hook-specific fields from the request.
    let stored = &row["payload"];
    assert_eq!(stored["hook_type"], "UserPromptSubmit");
    assert_eq!(stored["injection_kind"], "prompt-context");
}
#[tokio::test]
async fn drawer_creator_attribution_http_default() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
std::mem::forget(tmp);
let state = AppState::new(root);
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new("cred-default"),
name: "cred-default".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("cred-default"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create palace");
let app = router().with_state(state.clone());
let body = serde_json::json!({
"content": "hello world from anonymous client",
"tags": ["user-tag"],
});
let resp = app
.oneshot(
Request::builder()
.method("POST")
.uri("/api/v1/palaces/cred-default/drawers")
.header("content-type", "application/json")
.body(Body::from(body.to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/cred-default/drawers?limit=10")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = to_bytes(resp.into_body(), 8192).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let drawers = v.as_array().expect("drawers array");
assert_eq!(drawers.len(), 1, "expected one drawer, got {drawers:?}");
let tags: Vec<&str> = drawers[0]["tags"]
.as_array()
.expect("tags array")
.iter()
.filter_map(|t| t.as_str())
.collect();
assert!(
tags.contains(&"user-tag"),
"user-supplied tag must survive; got {tags:?}"
);
assert!(
tags.contains(&"creator:client=unknown-http-client"),
"expected default client tag; got {tags:?}"
);
assert!(
tags.contains(&"creator:source=http"),
"expected http source tag; got {tags:?}"
);
assert!(
tags.iter().any(|t| t.starts_with("creator:version=")),
"expected creator:version tag; got {tags:?}"
);
}
/// Creates a drawer over HTTP with `x-trusty-client-name` and
/// `x-trusty-client-cwd` headers and verifies that both values are recorded
/// as `creator:` tags on the stored drawer.
///
/// The listing response is checked for a 200 status and a non-empty array
/// before the tags are read, so a failed listing is reported as such instead
/// of surfacing as a missing `tags` field.
#[tokio::test]
async fn drawer_creator_attribution_http_header() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let root = tmp.path().to_path_buf();
    // Forget the TempDir so its destructor never runs and the directory
    // outlives this binding.
    std::mem::forget(tmp);
    let state = AppState::new(root);
    let palace = trusty_common::memory_core::Palace {
        id: PalaceId::new("cred-header"),
        name: "cred-header".to_string(),
        description: None,
        created_at: chrono::Utc::now(),
        data_dir: state.data_root.join("cred-header"),
    };
    state
        .registry
        .create_palace(&state.data_root, palace)
        .expect("create palace");

    let app = router().with_state(state.clone());
    let body = serde_json::json!({
        "content": "this is enough content to pass the signal/noise filter applied by remember",
        "tags": [],
    });
    let resp = app
        .oneshot(
            Request::builder()
                .method("POST")
                .uri("/api/v1/palaces/cred-header/drawers")
                .header("content-type", "application/json")
                .header("x-trusty-client-name", "qa-curl")
                .header("x-trusty-client-cwd", "/tmp/qa")
                .body(Body::from(body.to_string()))
                .unwrap(),
        )
        .await
        .unwrap();
    assert_eq!(resp.status(), StatusCode::OK);

    let app = router().with_state(state);
    let resp = app
        .oneshot(
            Request::builder()
                .uri("/api/v1/palaces/cred-header/drawers?limit=10")
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();
    // Fail on the listing itself rather than on a downstream field lookup.
    assert_eq!(resp.status(), StatusCode::OK);
    let bytes = to_bytes(resp.into_body(), 8192).await.unwrap();
    let v: Value = serde_json::from_slice(&bytes).unwrap();
    let drawers = v.as_array().expect("drawers array");
    assert!(!drawers.is_empty(), "expected at least one drawer");
    let tags: Vec<&str> = drawers[0]["tags"]
        .as_array()
        .expect("tags")
        .iter()
        .filter_map(|t| t.as_str())
        .collect();
    assert!(
        tags.contains(&"creator:client=qa-curl"),
        "expected custom client tag; got {tags:?}"
    );
    assert!(
        tags.contains(&"creator:cwd=/tmp/qa"),
        "expected cwd tag from header; got {tags:?}"
    );
}
#[tokio::test]
async fn drawer_creator_attribution_mcp_default() {
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().to_path_buf();
std::mem::forget(tmp);
let state = AppState::new(root);
let palace = trusty_common::memory_core::Palace {
id: PalaceId::new("cred-mcp"),
name: "cred-mcp".to_string(),
description: None,
created_at: chrono::Utc::now(),
data_dir: state.data_root.join("cred-mcp"),
};
state
.registry
.create_palace(&state.data_root, palace)
.expect("create palace");
let _ = crate::tools::dispatch_tool(
&state,
"memory_remember",
json!({
"palace": "cred-mcp",
"text": "remember a sentence with enough tokens to pass filters please",
"room": "General",
"tags": ["from-test"],
}),
)
.await
.expect("memory_remember dispatch");
let app = router().with_state(state);
let resp = app
.oneshot(
Request::builder()
.uri("/api/v1/palaces/cred-mcp/drawers?limit=10")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
let bytes = to_bytes(resp.into_body(), 8192).await.unwrap();
let v: Value = serde_json::from_slice(&bytes).unwrap();
let drawers = v.as_array().expect("drawers array");
assert!(!drawers.is_empty(), "expected at least one drawer");
let tags: Vec<&str> = drawers[0]["tags"]
.as_array()
.expect("tags array")
.iter()
.filter_map(|t| t.as_str())
.collect();
assert!(
tags.contains(&"creator:client=trusty-memory-mcp"),
"expected MCP client tag; got {tags:?}"
);
assert!(
tags.contains(&"creator:source=mcp"),
"expected MCP source tag; got {tags:?}"
);
}
/// Runs the prompt-context hook command with the data directory overridden
/// to an empty temp dir and requires it to return `Ok`.
///
/// No daemon is started by this test.
/// NOTE(review): presumably the hook's emit to the daemon therefore fails and
/// must not fail the hook itself — confirm against `handle_prompt_context`.
///
/// The override is cleared by a drop guard, so it is removed even if the
/// code under test panics and cannot leak into later tests in this process.
#[tokio::test]
async fn hook_emit_failure_isolated() {
    /// Removes the data-dir override variable when dropped.
    struct ClearDataDirOverride;

    impl Drop for ClearDataDirOverride {
        fn drop(&mut self) {
            // SAFETY: NOTE(review): relies on `env_test_lock` serializing
            // every test that touches the process environment; the guard is
            // dropped while that lock is still held — confirm.
            unsafe {
                std::env::remove_var(trusty_common::DATA_DIR_OVERRIDE_ENV);
            }
        }
    }

    let _guard = crate::commands::env_test_lock().lock().await;
    let tmp = tempfile::tempdir().expect("tempdir");
    // SAFETY: NOTE(review): same assumption as above — environment mutation
    // is serialized by `env_test_lock`, which is held here — confirm.
    unsafe {
        std::env::set_var(trusty_common::DATA_DIR_OVERRIDE_ENV, tmp.path());
    }
    // Declared after `tmp` and `_guard`, so on unwind it runs first: the
    // variable is cleared before the directory is deleted and the lock freed.
    let clear_override = ClearDataDirOverride;

    let res = crate::commands::prompt_context::handle_prompt_context().await;

    // Restore the environment before asserting, as the happy path always did.
    drop(clear_override);
    assert!(
        res.is_ok(),
        "hook must complete even when daemon emit fails; got {res:?}"
    );
}
/// Encodes several (subject, predicate) pairs with `encode_triple_id`,
/// checks the encoded form avoids `+`, `/` and `=`, and confirms that
/// `decode_triple_id` returns the original pair.
#[test]
fn decode_triple_id_round_trips() {
    // Includes subjects with ':' and '/', an empty predicate, and a
    // predicate that itself contains ':'.
    let cases = [
        ("drawer:some-uuid", "has_tag"),
        ("entity:alice", "works_at"),
        ("entity:project/foo", "depends_on"),
        ("subject", ""),
        ("path/to/node", "rel:type:sub"),
    ];

    for (subject, predicate) in cases {
        let encoded = encode_triple_id(subject, predicate);

        let has_unsafe_char =
            encoded.contains('+') || encoded.contains('/') || encoded.contains('=');
        assert!(
            !has_unsafe_char,
            "encoded triple id {encoded:?} is not URL-safe"
        );

        let (s, p) = match decode_triple_id(&encoded) {
            Some(pair) => pair,
            None => panic!("decode_triple_id failed for {encoded:?}"),
        };
        assert_eq!(s, subject, "subject mismatch for ({subject}, {predicate})");
        assert_eq!(
            p, predicate,
            "predicate mismatch for ({subject}, {predicate})"
        );
    }
}
/// `decode_triple_id` must return `None` both for a string that is not
/// valid base64 and for valid URL-safe base64 whose decoded bytes are
/// `no-separator`.
#[test]
fn decode_triple_id_returns_none_for_invalid_input() {
    use base64::Engine as _;

    // Not decodable as base64 at all.
    let garbage = "not!!valid%%base64";
    assert!(decode_triple_id(garbage).is_none());

    // Well-formed base64, but the decoded payload is just `no-separator`.
    let no_sep = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(b"no-separator");
    assert!(decode_triple_id(&no_sep).is_none());
}
}