use crate::{AppState, DaemonEvent};
use axum::{
body::Body,
extract::{Path as AxumPath, Query, State},
http::{header, HeaderValue, Request, StatusCode},
response::{IntoResponse, Response},
routing::{delete, get, post},
Json, Router,
};
use rust_embed::RustEmbed;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashSet;
use std::sync::Arc;
use trusty_common::{ChatEvent, ChatMessage, ToolDef};
use trusty_memory_core::dream::{DreamConfig, Dreamer, PersistedDreamStats};
use trusty_memory_core::palace::{Palace, PalaceId, RoomType};
use trusty_memory_core::retrieval::{
recall_across_palaces_with_default_embedder, recall_deep_with_default_embedder,
recall_with_default_embedder,
};
use trusty_memory_core::store::kg::Triple;
use trusty_memory_core::{PalaceHandle, PalaceRegistry};
use uuid::Uuid;
/// Web UI assets from the `ui/dist/` folder (resolved relative to the crate
/// manifest directory), made available through the `RustEmbed` derive.
///
/// Assets are looked up by relative path via `WebAssets::get`.
#[derive(RustEmbed)]
#[folder = "$CARGO_MANIFEST_DIR/../../ui/dist/"]
struct WebAssets;
pub fn router() -> Router<AppState> {
let router = Router::new()
.route("/api/v1/status", get(status))
.route("/api/v1/config", get(config))
.route("/api/v1/palaces", get(list_palaces).post(create_palace))
.route("/api/v1/palaces/{id}", get(get_palace_handler))
.route(
"/api/v1/palaces/{id}/drawers",
get(list_drawers).post(create_drawer),
)
.route(
"/api/v1/palaces/{id}/drawers/{drawer_id}",
delete(delete_drawer),
)
.route("/api/v1/palaces/{id}/recall", get(recall_handler))
.route("/api/v1/recall", get(recall_all_handler))
.route("/api/v1/palaces/{id}/kg", get(kg_query).post(kg_assert))
.route(
"/api/v1/palaces/{id}/dream/status",
get(palace_dream_status),
)
.route("/api/v1/dream/status", get(dream_status))
.route("/api/v1/dream/run", post(dream_run))
.route("/api/v1/chat", post(chat_handler))
.route("/api/v1/chat/providers", get(list_providers))
.route(
"/api/v1/palaces/{id}/chat/sessions",
get(list_chat_sessions).post(create_chat_session),
)
.route(
"/api/v1/palaces/{id}/chat/sessions/{session_id}",
get(get_chat_session).delete(delete_chat_session),
)
.route("/health", get(health))
.fallback(static_handler);
trusty_common::server::with_standard_middleware(router)
}
/// JSON body returned by the `/health` endpoint.
#[derive(serde::Serialize)]
struct HealthResponse {
/// Health indicator; the `health` handler always sets this to `"ok"`.
status: &'static str,
/// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
version: &'static str,
}
/// `GET /health` — reports a fixed `"ok"` status together with the crate
/// version baked in at compile time.
async fn health() -> Json<HealthResponse> {
    let body = HealthResponse {
        status: "ok",
        version: env!("CARGO_PKG_VERSION"),
    };
    Json(body)
}
/// Router fallback: serves a UI asset for the requested path.
///
/// Paths under `api/` that reached the fallback are answered with 404.
/// Any other path that does not match an asset falls back to `index.html`;
/// if that is missing too, a 404 with "ui assets missing" is returned.
async fn static_handler(req: Request<Body>) -> Response {
    let relative = req.uri().path().trim_start_matches('/');
    if relative.starts_with("api/") {
        return (StatusCode::NOT_FOUND, "not found").into_response();
    }
    if let Some(found) = serve_embedded(relative) {
        return found;
    }
    match serve_embedded("index.html") {
        Some(index) => index,
        None => (StatusCode::NOT_FOUND, "ui assets missing").into_response(),
    }
}
/// Looks up `path` in [`WebAssets`] and wraps it in a response whose
/// `Content-Type` is guessed from the file extension.
///
/// An empty `path` is treated as `index.html`. Returns `None` when no asset
/// exists under that path.
fn serve_embedded(path: &str) -> Option<Response> {
    let lookup = match path {
        "" => "index.html",
        other => other,
    };
    let file = WebAssets::get(lookup)?;
    let mime = mime_guess::from_path(lookup).first_or_octet_stream();
    // Fall back to a generic binary type if the guessed MIME string is not a
    // valid header value.
    let content_type = HeaderValue::from_str(mime.as_ref())
        .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream"));
    let mut response = Response::new(Body::from(file.data.into_owned()));
    response
        .headers_mut()
        .insert(header::CONTENT_TYPE, content_type);
    Some(response)
}
/// JSON body returned by `GET /api/v1/status`.
#[derive(Serialize)]
struct StatusPayload {
/// Copied from `AppState::version`.
version: String,
/// Number of palaces listed under the data root.
palace_count: usize,
/// Copied from `AppState::default_palace`.
default_palace: Option<String>,
/// Display form of the daemon's data root path.
data_root: String,
/// Sum of drawer counts over every palace that could be opened.
total_drawers: usize,
/// Sum of vector index sizes over every palace that could be opened.
total_vectors: usize,
/// Sum of active knowledge-graph triple counts over every palace that could be opened.
total_kg_triples: usize,
}
/// `GET /api/v1/status` — daemon version plus totals aggregated over all
/// palaces.
///
/// A failure to list palaces is treated as "no palaces"; palaces that fail
/// to open are counted in `palace_count` but contribute nothing to the totals.
async fn status(State(state): State<AppState>) -> Json<StatusPayload> {
    let palaces = PalaceRegistry::list_palaces(&state.data_root).unwrap_or_default();
    let mut drawers = 0usize;
    let mut vectors = 0usize;
    let mut triples = 0usize;
    let opened = palaces
        .iter()
        .filter_map(|p| state.registry.open_palace(&state.data_root, &p.id).ok());
    for handle in opened {
        drawers = drawers.saturating_add(handle.drawers.read().len());
        vectors = vectors.saturating_add(handle.vector_store.index_size());
        triples = triples.saturating_add(handle.kg.count_active_triples());
    }
    Json(StatusPayload {
        version: state.version.clone(),
        palace_count: palaces.len(),
        default_palace: state.default_palace.clone(),
        data_root: state.data_root.display().to_string(),
        total_drawers: drawers,
        total_vectors: vectors,
        total_kg_triples: triples,
    })
}
/// JSON body returned by `GET /api/v1/config`.
#[derive(Serialize)]
struct ConfigPayload {
/// `true` when the loaded user config has a non-empty OpenRouter API key.
openrouter_configured: bool,
/// OpenRouter model name from the loaded user config.
model: String,
/// Display form of the daemon's data root path.
data_root: String,
}
async fn config(State(state): State<AppState>) -> Json<ConfigPayload> {
let cfg = load_user_config().unwrap_or_default();
Json(ConfigPayload {
openrouter_configured: !cfg.openrouter_api_key.is_empty(),
model: cfg.openrouter_model,
data_root: state.data_root.display().to_string(),
})
}
/// Minimal view of the user's `config.toml`, as deserialized by
/// `load_user_config`. Both sections are optional and fall back to their
/// `Default` values when absent.
#[derive(Deserialize, Default, Clone)]
struct UserConfigMin {
/// `[openrouter]` section.
#[serde(default)]
openrouter: OpenRouterMin,
/// `[local_model]` section.
#[serde(default)]
local_model: LocalModelMin,
}
/// OpenRouter settings read from the user config. Missing keys deserialize
/// to empty strings.
#[derive(Deserialize, Default, Clone)]
struct OpenRouterMin {
/// API key; empty means "not configured".
#[serde(default)]
api_key: String,
/// Model name; when empty, `load_user_config` substitutes its default model.
#[serde(default)]
model: String,
}
/// Local model settings read from the user config. Each missing key falls
/// back to the matching `default_local_*` function.
#[derive(Deserialize, Clone)]
struct LocalModelMin {
/// Whether the local model is enabled; defaults to `true`.
#[serde(default = "default_local_enabled")]
enabled: bool,
/// Base URL of the local model server; defaults to `http://localhost:11434`.
#[serde(default = "default_local_base_url")]
base_url: String,
/// Local model name; defaults to `llama3.2`.
#[serde(default = "default_local_model")]
model: String,
}
/// Serde default for `LocalModelMin::enabled`: the local model is on unless
/// the config says otherwise.
fn default_local_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
/// Serde default for `LocalModelMin::base_url`.
fn default_local_base_url() -> String {
    String::from("http://localhost:11434")
}
/// Serde default for `LocalModelMin::model`.
fn default_local_model() -> String {
    String::from("llama3.2")
}
/// Uses the same per-field defaults serde applies to missing keys, so an
/// absent `[local_model]` section and an empty one produce the same value.
impl Default for LocalModelMin {
    fn default() -> Self {
        let enabled = default_local_enabled();
        let base_url = default_local_base_url();
        let model = default_local_model();
        Self {
            enabled,
            base_url,
            model,
        }
    }
}
/// User configuration in the form the rest of the crate consumes, produced
/// by `load_user_config`.
#[derive(Clone)]
pub(crate) struct LoadedUserConfig {
/// OpenRouter API key; empty when none is configured.
pub(crate) openrouter_api_key: String,
/// OpenRouter model name (never empty when built by `load_user_config` or `Default`).
pub(crate) openrouter_model: String,
/// Local model settings.
pub(crate) local_model: trusty_common::LocalModelConfig,
}
/// Defaults used when no config file is available: no OpenRouter key, the
/// stock OpenRouter model, and the default local model configuration.
impl Default for LoadedUserConfig {
    fn default() -> Self {
        let local_model = trusty_common::LocalModelConfig::default();
        Self {
            openrouter_api_key: String::default(),
            openrouter_model: String::from("anthropic/claude-3-5-sonnet"),
            local_model,
        }
    }
}
pub(crate) fn load_user_config() -> Option<LoadedUserConfig> {
let home = dirs::home_dir()?;
let path = home.join(".trusty-memory").join("config.toml");
if !path.exists() {
return Some(LoadedUserConfig::default());
}
let raw = std::fs::read_to_string(&path).ok()?;
let parsed: UserConfigMin = toml::from_str(&raw).unwrap_or_default();
let model = if parsed.openrouter.model.is_empty() {
"anthropic/claude-3-5-sonnet".to_string()
} else {
parsed.openrouter.model
};
Some(LoadedUserConfig {
openrouter_api_key: parsed.openrouter.api_key,
openrouter_model: model,
local_model: trusty_common::LocalModelConfig {
enabled: parsed.local_model.enabled,
base_url: parsed.local_model.base_url,
model: parsed.local_model.model,
},
})
}
/// Palace summary returned by the palace listing/detail endpoints and the
/// matching chat tools.
#[derive(Serialize)]
struct PalaceInfo {
/// Palace id (the string inside `PalaceId`).
id: String,
/// Palace display name.
name: String,
/// Optional free-text description.
description: Option<String>,
/// Number of drawers; 0 when the palace could not be opened.
drawer_count: usize,
/// Size of the vector index; 0 when the palace could not be opened.
vector_count: usize,
/// Number of active knowledge-graph triples; 0 when the palace could not be opened.
kg_triple_count: usize,
/// Number of distinct `room_id` values among the drawers; 0 when the palace could not be opened.
wing_count: usize,
/// Creation timestamp of the palace.
created_at: chrono::DateTime<chrono::Utc>,
}
/// Builds a [`PalaceInfo`] from palace metadata plus, when available, live
/// counts read from an open handle.
///
/// With `handle == None` every count is reported as zero.
fn palace_info_from(palace: &Palace, handle: Option<&Arc<PalaceHandle>>) -> PalaceInfo {
    let (drawer_count, vector_count, kg_triple_count, wing_count) = match handle {
        Some(open) => {
            // Hold the read lock while counting so the drawer total and the
            // distinct-room total come from the same snapshot.
            let guard = open.drawers.read();
            let rooms: HashSet<Uuid> = guard.iter().map(|drawer| drawer.room_id).collect();
            (
                guard.len(),
                open.vector_store.index_size(),
                open.kg.count_active_triples(),
                rooms.len(),
            )
        }
        None => (0, 0, 0, 0),
    };
    PalaceInfo {
        id: palace.id.0.clone(),
        name: palace.name.clone(),
        description: palace.description.clone(),
        drawer_count,
        vector_count,
        kg_triple_count,
        wing_count,
        created_at: palace.created_at,
    }
}
/// `GET /api/v1/palaces` — summary of every palace under the data root.
///
/// Fails with an internal error if the palace list cannot be read. Palaces
/// that cannot be opened are still listed, with zeroed counts.
async fn list_palaces(State(state): State<AppState>) -> Result<Json<Vec<PalaceInfo>>, ApiError> {
    let palaces = PalaceRegistry::list_palaces(&state.data_root)
        .map_err(|e| ApiError::internal(format!("list palaces: {e:#}")))?;
    let infos: Vec<PalaceInfo> = palaces
        .iter()
        .map(|palace| {
            let opened = state
                .registry
                .open_palace(&state.data_root, &palace.id)
                .ok();
            palace_info_from(palace, opened.as_ref())
        })
        .collect();
    Ok(Json(infos))
}
/// Request body for `POST /api/v1/palaces`.
#[derive(Deserialize)]
struct CreatePalaceBody {
/// Palace name; trimmed by the handler and rejected when empty.
name: String,
/// Optional description; an empty string is treated as absent.
#[serde(default)]
description: Option<String>,
}
/// `POST /api/v1/palaces` — creates a palace and returns `{ "id": ... }`.
///
/// The id is derived from the trimmed name via `PalaceId::new`, and that same
/// id is used for the data directory, the emitted `PalaceCreated` event and
/// the response. The rest of this file locates palace data with
/// `data_root.join(<id>)`, so keying anything on the raw name would diverge
/// as soon as `PalaceId::new` alters its input.
///
/// # Errors
/// - 400 when the name is empty after trimming;
/// - 400 when the derived id is not a single plain directory name (it comes
///   from the request body and is joined onto the data root, so `..`, path
///   separators and absolute paths are refused);
/// - 500 when the registry fails to create the palace.
async fn create_palace(
    State(state): State<AppState>,
    Json(body): Json<CreatePalaceBody>,
) -> Result<Json<Value>, ApiError> {
    let name = body.name.trim().to_string();
    if name.is_empty() {
        return Err(ApiError::bad_request("name is required"));
    }
    let id = PalaceId::new(&name);

    // The id becomes a directory under `data_root`; accept it only if it is
    // exactly one normal path component.
    let mut components = std::path::Path::new(&id.0).components();
    let is_plain_dir_name = matches!(
        (components.next(), components.next()),
        (Some(std::path::Component::Normal(_)), None)
    );
    if !is_plain_dir_name {
        return Err(ApiError::bad_request(
            "name must be a plain directory name (no path separators, '.' or '..')",
        ));
    }

    let id_str = id.0.clone();
    let palace = Palace {
        id,
        name: name.clone(),
        description: body.description.filter(|s| !s.is_empty()),
        created_at: chrono::Utc::now(),
        data_dir: state.data_root.join(&id_str),
    };
    state
        .registry
        .create_palace(&state.data_root, palace)
        .map_err(|e| ApiError::internal(format!("create palace: {e:#}")))?;
    state.emit(DaemonEvent::PalaceCreated {
        id: id_str.clone(),
        name,
    });
    Ok(Json(json!({ "id": id_str })))
}
/// `GET /api/v1/palaces/{id}` — summary of a single palace.
///
/// Returns 404 when no listed palace has the given id, and an internal error
/// when the palace list cannot be read. A palace that cannot be opened is
/// still returned, with zeroed counts.
async fn get_palace_handler(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<PalaceInfo>, ApiError> {
    let listed = PalaceRegistry::list_palaces(&state.data_root)
        .map_err(|e| ApiError::internal(format!("list palaces: {e:#}")))?;
    let Some(palace) = listed.into_iter().find(|candidate| candidate.id.0 == id) else {
        return Err(ApiError::not_found(format!("palace not found: {id}")));
    };
    let opened = state
        .registry
        .open_palace(&state.data_root, &palace.id)
        .ok();
    let info = palace_info_from(&palace, opened.as_ref());
    Ok(Json(info))
}
/// Query string for `GET /api/v1/palaces/{id}/drawers`. All fields are optional.
#[derive(Deserialize)]
struct ListDrawersQuery {
/// Room name filter; parsed with `RoomType::parse` by the handler.
#[serde(default)]
room: Option<String>,
/// Tag filter, passed through to `list_drawers`.
#[serde(default)]
tag: Option<String>,
/// Maximum number of drawers to return; the handler defaults this to 50.
#[serde(default)]
limit: Option<usize>,
}
async fn list_drawers(
State(state): State<AppState>,
AxumPath(id): AxumPath<String>,
Query(q): Query<ListDrawersQuery>,
) -> Result<Json<Value>, ApiError> {
let handle = open_handle(&state, &id)?;
let room = q.room.as_deref().map(RoomType::parse);
let drawers = handle.list_drawers(room, q.tag.clone(), q.limit.unwrap_or(50));
Ok(Json(serde_json::to_value(drawers).unwrap_or(json!([]))))
}
/// Request body for `POST /api/v1/palaces/{id}/drawers`.
#[derive(Deserialize)]
struct CreateDrawerBody {
/// Memory text to store.
content: String,
/// Room name, parsed with `RoomType::parse`; the handler uses `RoomType::General` when absent.
#[serde(default)]
room: Option<String>,
/// Tags attached to the drawer; empty when omitted.
#[serde(default)]
tags: Vec<String>,
/// Importance value; the handler uses 0.5 when absent.
#[serde(default)]
importance: Option<f32>,
}
async fn create_drawer(
State(state): State<AppState>,
AxumPath(id): AxumPath<String>,
Json(body): Json<CreateDrawerBody>,
) -> Result<Json<Value>, ApiError> {
let handle = open_handle(&state, &id)?;
let room = body
.room
.as_deref()
.map(RoomType::parse)
.unwrap_or(RoomType::General);
let importance = body.importance.unwrap_or(0.5);
let drawer_id = handle
.remember(body.content, room, body.tags, importance)
.await
.map_err(|e| ApiError::internal(format!("remember: {e:#}")))?;
let drawer_count = handle.drawers.read().len();
state.emit(DaemonEvent::DrawerAdded {
palace_id: id.clone(),
drawer_count,
});
state.emit(aggregate_status_event(&state));
Ok(Json(json!({ "id": drawer_id })))
}
/// `DELETE /api/v1/palaces/{id}/drawers/{drawer_id}` — forgets a drawer and
/// answers 204.
///
/// The palace is opened first, then `drawer_id` is validated as a UUID (400
/// otherwise). On success it emits `DrawerDeleted` with the remaining drawer
/// count, followed by an aggregate status event.
async fn delete_drawer(
    State(state): State<AppState>,
    AxumPath((id, drawer_id)): AxumPath<(String, String)>,
) -> Result<StatusCode, ApiError> {
    let handle = open_handle(&state, &id)?;
    let Ok(target) = Uuid::parse_str(&drawer_id) else {
        return Err(ApiError::bad_request("drawer_id must be a UUID"));
    };
    if let Err(e) = handle.forget(target).await {
        return Err(ApiError::internal(format!("forget: {e:#}")));
    }
    let count_after = handle.drawers.read().len();
    state.emit(DaemonEvent::DrawerDeleted {
        palace_id: id,
        drawer_count: count_after,
    });
    state.emit(aggregate_status_event(&state));
    Ok(StatusCode::NO_CONTENT)
}
/// Builds a `StatusChanged` event carrying drawer/vector/triple totals summed
/// over every palace that can be opened.
///
/// Listing failures are treated as "no palaces" and unopenable palaces are
/// skipped, so this never fails.
fn aggregate_status_event(state: &AppState) -> DaemonEvent {
    let palaces = PalaceRegistry::list_palaces(&state.data_root).unwrap_or_default();
    let mut drawers = 0usize;
    let mut vectors = 0usize;
    let mut triples = 0usize;
    let opened = palaces
        .iter()
        .filter_map(|p| state.registry.open_palace(&state.data_root, &p.id).ok());
    for handle in opened {
        drawers = drawers.saturating_add(handle.drawers.read().len());
        vectors = vectors.saturating_add(handle.vector_store.index_size());
        triples = triples.saturating_add(handle.kg.count_active_triples());
    }
    DaemonEvent::StatusChanged {
        total_drawers: drawers,
        total_vectors: vectors,
        total_kg_triples: triples,
    }
}
/// Query string shared by the per-palace and cross-palace recall endpoints.
#[derive(Deserialize)]
struct RecallQuery {
/// Free-text query.
q: String,
/// Maximum number of results; the handlers default this to 10.
#[serde(default)]
top_k: Option<usize>,
/// Selects the "deep" recall variant when `true`; the handlers default this to `false`.
#[serde(default)]
deep: Option<bool>,
}
async fn recall_handler(
State(state): State<AppState>,
AxumPath(id): AxumPath<String>,
Query(q): Query<RecallQuery>,
) -> Result<Json<Value>, ApiError> {
let handle = open_handle(&state, &id)?;
let top_k = q.top_k.unwrap_or(10);
let results = if q.deep.unwrap_or(false) {
recall_deep_with_default_embedder(&handle, &q.q, top_k).await
} else {
recall_with_default_embedder(&handle, &q.q, top_k).await
}
.map_err(|e| ApiError::internal(format!("recall: {e:#}")))?;
let payload: Vec<Value> = results
.into_iter()
.map(|r| {
json!({
"drawer": r.drawer,
"score": r.score,
"layer": r.layer,
})
})
.collect();
Ok(Json(json!(payload)))
}
async fn recall_all_handler(
State(state): State<AppState>,
Query(q): Query<RecallQuery>,
) -> Result<Json<Value>, ApiError> {
let top_k = q.top_k.unwrap_or(10);
let deep = q.deep.unwrap_or(false);
let value = execute_recall_all(&state, &q.q, top_k, deep).await;
if let Some(err) = value.get("error").and_then(|v| v.as_str()) {
return Err(ApiError::internal(err.to_string()));
}
Ok(Json(value))
}
/// Query string for `GET /api/v1/palaces/{id}/kg`.
#[derive(Deserialize)]
struct KgQueryParams {
/// Subject whose active triples are requested.
subject: String,
}
/// `GET /api/v1/palaces/{id}/kg?subject=...` — returns the active
/// knowledge-graph triples for the given subject.
async fn kg_query(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
    Query(q): Query<KgQueryParams>,
) -> Result<Json<Vec<Triple>>, ApiError> {
    let handle = open_handle(&state, &id)?;
    match handle.kg.query_active(&q.subject).await {
        Ok(found) => Ok(Json(found)),
        Err(e) => Err(ApiError::internal(format!("kg query: {e:#}"))),
    }
}
/// Request body for `POST /api/v1/palaces/{id}/kg`.
#[derive(Deserialize)]
struct KgAssertBody {
/// Triple subject.
subject: String,
/// Triple predicate.
predicate: String,
/// Triple object.
object: String,
/// Confidence; the handler uses 1.0 when absent.
#[serde(default)]
confidence: Option<f32>,
/// Optional provenance string stored on the triple.
#[serde(default)]
provenance: Option<String>,
}
async fn kg_assert(
State(state): State<AppState>,
AxumPath(id): AxumPath<String>,
Json(body): Json<KgAssertBody>,
) -> Result<StatusCode, ApiError> {
let handle = open_handle(&state, &id)?;
let triple = Triple {
subject: body.subject,
predicate: body.predicate,
object: body.object,
valid_from: chrono::Utc::now(),
valid_to: None,
confidence: body.confidence.unwrap_or(1.0),
provenance: body.provenance,
};
handle
.kg
.assert(triple)
.await
.map_err(|e| ApiError::internal(format!("kg assert: {e:#}")))?;
Ok(StatusCode::NO_CONTENT)
}
/// Dreamer statistics returned by the dream status/run endpoints and tools.
/// `Default` yields all-zero counters and no `last_run_at`.
#[derive(Serialize, Default)]
struct DreamStatusPayload {
/// Time of the most recent run, if any.
last_run_at: Option<chrono::DateTime<chrono::Utc>>,
/// `merged` counter from the dream stats.
merged: usize,
/// `pruned` counter from the dream stats.
pruned: usize,
/// `compacted` counter from the dream stats.
compacted: usize,
/// `closets_updated` counter from the dream stats.
closets_updated: usize,
/// `duration_ms` from the dream stats (summed when aggregating over palaces).
duration_ms: u64,
}
/// Converts one palace's persisted dream stats into the API payload, copying
/// every counter and wrapping the run timestamp in `Some`.
impl From<PersistedDreamStats> for DreamStatusPayload {
    fn from(p: PersistedDreamStats) -> Self {
        let stats = &p.stats;
        let mut payload = Self::default();
        payload.last_run_at = Some(p.last_run_at);
        payload.merged = stats.merged;
        payload.pruned = stats.pruned;
        payload.compacted = stats.compacted;
        payload.closets_updated = stats.closets_updated;
        payload.duration_ms = stats.duration_ms;
        payload
    }
}
/// `GET /api/v1/dream/status` — dream stats aggregated over all palaces.
///
/// Counters are summed (saturating) over every palace that has persisted
/// stats; `last_run_at` is the most recent run among them. Palaces without
/// stats, or whose stats fail to load, are skipped.
async fn dream_status(State(state): State<AppState>) -> Json<DreamStatusPayload> {
    let mut total = DreamStatusPayload::default();
    let mut newest: Option<chrono::DateTime<chrono::Utc>> = None;
    for palace in PalaceRegistry::list_palaces(&state.data_root).unwrap_or_default() {
        let dir = state.data_root.join(palace.id.as_str());
        let Ok(Some(snapshot)) = PersistedDreamStats::load(&dir) else {
            continue;
        };
        let stats = &snapshot.stats;
        total.merged = total.merged.saturating_add(stats.merged);
        total.pruned = total.pruned.saturating_add(stats.pruned);
        total.compacted = total.compacted.saturating_add(stats.compacted);
        total.closets_updated = total.closets_updated.saturating_add(stats.closets_updated);
        total.duration_ms = total.duration_ms.saturating_add(stats.duration_ms);
        // Keep whichever timestamp is later.
        let ran_at = snapshot.last_run_at;
        newest = Some(match newest {
            Some(seen) if seen >= ran_at => seen,
            _ => ran_at,
        });
    }
    total.last_run_at = newest;
    Json(total)
}
/// `GET /api/v1/palaces/{id}/dream/status` — persisted dream stats for one
/// palace, or an all-zero payload when the palace has never dreamed.
///
/// # Errors
/// - 404 when `id` is not a plain directory name or no such directory exists
///   under the data root;
/// - 500 when the stats file exists but cannot be read.
async fn palace_dream_status(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<DreamStatusPayload>, ApiError> {
    // `id` comes from the URL and is joined onto the data root. Only accept a
    // single normal path component so that `..`, embedded separators or an
    // absolute path cannot make this handler probe or read outside the data
    // root.
    let mut components = std::path::Path::new(&id).components();
    let is_plain_dir_name = matches!(
        (components.next(), components.next()),
        (Some(std::path::Component::Normal(_)), None)
    );
    if !is_plain_dir_name {
        return Err(ApiError::not_found(format!("palace not found: {id}")));
    }

    let data_dir = state.data_root.join(&id);
    if !data_dir.exists() {
        return Err(ApiError::not_found(format!("palace not found: {id}")));
    }
    let payload = match PersistedDreamStats::load(&data_dir) {
        Ok(Some(s)) => s.into(),
        Ok(None) => DreamStatusPayload::default(),
        Err(e) => return Err(ApiError::internal(format!("read dream stats: {e:#}"))),
    };
    Ok(Json(payload))
}
/// `POST /api/v1/dream/run` — runs one dream cycle on every palace, one after
/// another, using the default `DreamConfig`.
///
/// Palaces that fail to open or whose cycle fails are logged and skipped; the
/// response sums the stats of the successful cycles and stamps `last_run_at`
/// with the current time. Emits `DreamCompleted` (with `palace_id: None`) and
/// an aggregate status event before returning.
///
/// Fails only when the palace list itself cannot be read.
async fn dream_run(State(state): State<AppState>) -> Result<Json<DreamStatusPayload>, ApiError> {
    let palaces = PalaceRegistry::list_palaces(&state.data_root)
        .map_err(|e| ApiError::internal(format!("list palaces: {e:#}")))?;
    let dreamer = Dreamer::new(DreamConfig::default());
    let mut total = DreamStatusPayload::default();
    for p in palaces {
        let opened = state.registry.open_palace(&state.data_root, &p.id);
        let handle = match opened {
            Ok(h) => h,
            Err(e) => {
                tracing::warn!(palace = %p.id, "dream_run: open failed: {e:#}");
                continue;
            }
        };
        let stats = match dreamer.dream_cycle(&handle).await {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!(palace = %p.id, "dream_run: cycle failed: {e:#}");
                continue;
            }
        };
        total.merged = total.merged.saturating_add(stats.merged);
        total.pruned = total.pruned.saturating_add(stats.pruned);
        total.compacted = total.compacted.saturating_add(stats.compacted);
        total.closets_updated = total.closets_updated.saturating_add(stats.closets_updated);
        total.duration_ms = total.duration_ms.saturating_add(stats.duration_ms);
    }
    total.last_run_at = Some(chrono::Utc::now());
    state.emit(DaemonEvent::DreamCompleted {
        palace_id: None,
        merged: total.merged,
        pruned: total.pruned,
        compacted: total.compacted,
        closets_updated: total.closets_updated,
        duration_ms: total.duration_ms,
    });
    state.emit(aggregate_status_event(&state));
    Ok(Json(total))
}
/// Request body for `POST /api/v1/chat`.
#[derive(Deserialize)]
struct ChatBody {
/// Target palace; `chat_handler` falls back to the daemon's default palace when absent.
#[serde(default)]
palace_id: Option<String>,
/// The user's message.
message: String,
/// Client-supplied conversation history; used when no stored session history is loaded.
#[serde(default)]
history: Vec<ChatMessage>,
/// Existing chat session to continue; when absent (and a palace is selected) a new session is created.
#[serde(default)]
session_id: Option<String>,
}
// NOTE(review): presumably the cap on tool-call rounds in the chat loop; the
// code that reads this constant is outside the visible section — confirm.
const MAX_TOOL_ROUNDS: usize = 10;
/// Returns the definitions (name, description, JSON-schema parameters) of
/// every tool that `execute_tool` knows how to dispatch.
///
/// The `name` of each entry must match an arm in `execute_tool`, and the
/// property names in each schema are the argument keys that arm reads.
fn all_tools() -> Vec<ToolDef> {
vec![
// --- Read-only palace queries ---
ToolDef {
name: "list_palaces".into(),
description: "List all memory palaces on this machine with their metadata (id, name, description, counts).".into(),
parameters: json!({ "type": "object", "properties": {}, "required": [] }),
},
ToolDef {
name: "get_palace".into(),
description: "Get details for a specific palace by id.".into(),
parameters: json!({
"type": "object",
"properties": { "palace_id": { "type": "string", "description": "Palace id (kebab-case)" } },
"required": ["palace_id"],
}),
},
ToolDef {
name: "recall_memories".into(),
description: "Semantic search for memories in a palace. Returns the top-k most relevant drawers ranked by similarity to the query.".into(),
parameters: json!({
"type": "object",
"properties": {
"palace_id": { "type": "string" },
"query": { "type": "string", "description": "Free-text query" },
"top_k": { "type": "integer", "minimum": 1, "maximum": 50, "default": 5 }
},
"required": ["palace_id", "query"],
}),
},
ToolDef {
name: "list_drawers".into(),
description: "List all drawers (memories) in a palace, most recent first.".into(),
parameters: json!({
"type": "object",
"properties": { "palace_id": { "type": "string" } },
"required": ["palace_id"],
}),
},
ToolDef {
name: "kg_query".into(),
description: "Query the temporal knowledge graph for all currently-active triples whose subject matches.".into(),
parameters: json!({
"type": "object",
"properties": {
"palace_id": { "type": "string" },
"subject": { "type": "string" }
},
"required": ["palace_id", "subject"],
}),
},
// --- Daemon-level introspection ---
ToolDef {
name: "get_config".into(),
description: "Get the trusty-memory daemon's configuration (provider, model, data root). API keys are masked.".into(),
parameters: json!({ "type": "object", "properties": {}, "required": [] }),
},
ToolDef {
name: "get_status".into(),
description: "Get daemon health: version, palace count, totals for drawers/vectors/triples.".into(),
parameters: json!({ "type": "object", "properties": {}, "required": [] }),
},
ToolDef {
name: "get_dream_status".into(),
description: "Get aggregated dreamer activity across all palaces (merged/pruned/compacted counts, last run timestamp).".into(),
parameters: json!({ "type": "object", "properties": {}, "required": [] }),
},
ToolDef {
name: "get_palace_dream_status".into(),
description: "Get dreamer activity stats for a specific palace.".into(),
parameters: json!({
"type": "object",
"properties": { "palace_id": { "type": "string" } },
"required": ["palace_id"],
}),
},
// --- Tools that write to a palace ---
ToolDef {
name: "create_memory".into(),
description: "Store a new memory (drawer) in a palace. The content is embedded and inserted into the vector index plus the drawer table.".into(),
parameters: json!({
"type": "object",
"properties": {
"palace_id": { "type": "string" },
"content": { "type": "string", "description": "Verbatim memory text" },
"room": { "type": "string", "description": "Room name (Frontend/Backend/Testing/Planning/Documentation/Research/Configuration/Meetings/General or a custom name); defaults to General." },
"tags": { "type": "array", "items": { "type": "string" } },
"importance": { "type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.5 }
},
"required": ["palace_id", "content"],
}),
},
ToolDef {
name: "kg_assert".into(),
description: "Assert a knowledge-graph triple. Any prior active triple with the same (subject, predicate) is closed out (valid_to set to now) before the new one is inserted.".into(),
parameters: json!({
"type": "object",
"properties": {
"palace_id": { "type": "string" },
"subject": { "type": "string" },
"predicate": { "type": "string" },
"object": { "type": "string" },
"confidence": { "type": "number", "minimum": 0.0, "maximum": 1.0, "default": 1.0 }
},
"required": ["palace_id", "subject", "predicate", "object"],
}),
},
// --- Cross-palace search ---
ToolDef {
name: "memory_recall_all".into(),
description: "Semantic search across ALL palaces simultaneously. Returns the top-k most relevant drawers ranked by similarity, regardless of which palace they belong to. Each result includes a `palace_id` field identifying its source.".into(),
parameters: json!({
"type": "object",
"properties": {
"q": { "type": "string", "description": "Free-text query" },
"top_k": { "type": "integer", "minimum": 1, "maximum": 50, "default": 10 },
"deep": { "type": "boolean", "default": false }
},
"required": ["q"],
}),
},
]
}
async fn execute_tool(name: &str, args: &str, state: &AppState) -> Value {
let parsed: Value = serde_json::from_str(args).unwrap_or(json!({}));
match name {
"list_palaces" => execute_list_palaces(state).await,
"get_palace" => match parsed.get("palace_id").and_then(|v| v.as_str()) {
Some(id) => execute_get_palace(state, id).await,
None => json!({ "error": "missing required argument: palace_id" }),
},
"recall_memories" => {
let pid = parsed.get("palace_id").and_then(|v| v.as_str());
let q = parsed.get("query").and_then(|v| v.as_str());
let top_k = parsed.get("top_k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
match (pid, q) {
(Some(p), Some(q)) => execute_recall(state, p, q, top_k).await,
_ => json!({ "error": "missing required argument(s): palace_id, query" }),
}
}
"list_drawers" => match parsed.get("palace_id").and_then(|v| v.as_str()) {
Some(id) => execute_list_drawers(state, id).await,
None => json!({ "error": "missing required argument: palace_id" }),
},
"kg_query" => {
let pid = parsed.get("palace_id").and_then(|v| v.as_str());
let subj = parsed.get("subject").and_then(|v| v.as_str());
match (pid, subj) {
(Some(p), Some(s)) => execute_kg_query(state, p, s).await,
_ => json!({ "error": "missing required argument(s): palace_id, subject" }),
}
}
"get_config" => execute_get_config(state),
"get_status" => execute_get_status(state).await,
"get_dream_status" => execute_get_dream_status(state).await,
"get_palace_dream_status" => match parsed.get("palace_id").and_then(|v| v.as_str()) {
Some(id) => execute_get_palace_dream_status(state, id).await,
None => json!({ "error": "missing required argument: palace_id" }),
},
"create_memory" => {
let pid = parsed.get("palace_id").and_then(|v| v.as_str());
let content = parsed.get("content").and_then(|v| v.as_str());
let room = parsed.get("room").and_then(|v| v.as_str());
let tags: Vec<String> = parsed
.get("tags")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|t| t.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let importance = parsed
.get("importance")
.and_then(|v| v.as_f64())
.map(|f| f as f32)
.unwrap_or(0.5);
match (pid, content) {
(Some(p), Some(c)) => {
execute_create_memory(state, p, c, room, tags, importance).await
}
_ => json!({ "error": "missing required argument(s): palace_id, content" }),
}
}
"kg_assert" => {
let pid = parsed.get("palace_id").and_then(|v| v.as_str());
let subj = parsed.get("subject").and_then(|v| v.as_str());
let pred = parsed.get("predicate").and_then(|v| v.as_str());
let obj = parsed.get("object").and_then(|v| v.as_str());
let conf = parsed
.get("confidence")
.and_then(|v| v.as_f64())
.map(|f| f as f32)
.unwrap_or(1.0);
match (pid, subj, pred, obj) {
(Some(p), Some(s), Some(pr), Some(o)) => {
execute_kg_assert(state, p, s, pr, o, conf).await
}
_ => json!({
"error": "missing required argument(s): palace_id, subject, predicate, object"
}),
}
}
"memory_recall_all" => {
let q = parsed.get("q").and_then(|v| v.as_str());
let top_k = parsed.get("top_k").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
let deep = parsed
.get("deep")
.and_then(|v| v.as_bool())
.unwrap_or(false);
match q {
Some(q) => execute_recall_all(state, q, top_k, deep).await,
None => json!({ "error": "missing required argument: q" }),
}
}
_ => json!({ "error": format!("unknown tool: {name}") }),
}
}
async fn execute_list_palaces(state: &AppState) -> Value {
let palaces = match PalaceRegistry::list_palaces(&state.data_root) {
Ok(v) => v,
Err(e) => return json!({ "error": format!("list palaces: {e:#}") }),
};
let out: Vec<Value> = palaces
.into_iter()
.map(|p| {
let handle = state.registry.open_palace(&state.data_root, &p.id).ok();
let info = palace_info_from(&p, handle.as_ref());
serde_json::to_value(info).unwrap_or(json!({}))
})
.collect();
json!(out)
}
async fn execute_get_palace(state: &AppState, id: &str) -> Value {
let palaces = match PalaceRegistry::list_palaces(&state.data_root) {
Ok(v) => v,
Err(e) => return json!({ "error": format!("list palaces: {e:#}") }),
};
match palaces.into_iter().find(|p| p.id.0 == id) {
Some(p) => {
let handle = state.registry.open_palace(&state.data_root, &p.id).ok();
serde_json::to_value(palace_info_from(&p, handle.as_ref())).unwrap_or(json!({}))
}
None => json!({ "error": format!("palace not found: {id}") }),
}
}
async fn execute_recall(state: &AppState, palace_id: &str, query: &str, top_k: usize) -> Value {
let handle = match state
.registry
.open_palace(&state.data_root, &PalaceId::new(palace_id))
{
Ok(h) => h,
Err(e) => return json!({ "error": format!("open palace {palace_id}: {e:#}") }),
};
match recall_with_default_embedder(&handle, query, top_k).await {
Ok(hits) => json!(hits
.into_iter()
.map(|r| json!({
"drawer_id": r.drawer.id.to_string(),
"content": r.drawer.content,
"importance": r.drawer.importance,
"tags": r.drawer.tags,
"score": r.score,
"layer": r.layer,
}))
.collect::<Vec<_>>()),
Err(e) => json!({ "error": format!("recall: {e:#}") }),
}
}
async fn execute_recall_all(state: &AppState, query: &str, top_k: usize, deep: bool) -> Value {
let palaces = match PalaceRegistry::list_palaces(&state.data_root) {
Ok(v) => v,
Err(e) => return json!({ "error": format!("list palaces: {e:#}") }),
};
let mut handles = Vec::with_capacity(palaces.len());
for p in &palaces {
match state.registry.open_palace(&state.data_root, &p.id) {
Ok(h) => handles.push(h),
Err(e) => {
tracing::warn!(palace = %p.id, "execute_recall_all: open failed: {e:#}");
}
}
}
if handles.is_empty() {
return json!([]);
}
match recall_across_palaces_with_default_embedder(&handles, query, top_k, deep).await {
Ok(results) => json!(results
.into_iter()
.map(|r| json!({
"palace_id": r.palace_id,
"drawer_id": r.result.drawer.id.to_string(),
"content": r.result.drawer.content,
"importance": r.result.drawer.importance,
"tags": r.result.drawer.tags,
"score": r.result.score,
"layer": r.result.layer,
}))
.collect::<Vec<_>>()),
Err(e) => json!({ "error": format!("recall_across_palaces: {e:#}") }),
}
}
async fn execute_list_drawers(state: &AppState, palace_id: &str) -> Value {
let handle = match state
.registry
.open_palace(&state.data_root, &PalaceId::new(palace_id))
{
Ok(h) => h,
Err(e) => return json!({ "error": format!("open palace {palace_id}: {e:#}") }),
};
let drawers = handle.list_drawers(None, None, 200);
serde_json::to_value(drawers).unwrap_or(json!([]))
}
async fn execute_kg_query(state: &AppState, palace_id: &str, subject: &str) -> Value {
let handle = match state
.registry
.open_palace(&state.data_root, &PalaceId::new(palace_id))
{
Ok(h) => h,
Err(e) => return json!({ "error": format!("open palace {palace_id}: {e:#}") }),
};
match handle.kg.query_active(subject).await {
Ok(triples) => serde_json::to_value(triples).unwrap_or(json!([])),
Err(e) => json!({ "error": format!("kg query: {e:#}") }),
}
}
fn execute_get_config(state: &AppState) -> Value {
let cfg = load_user_config().unwrap_or_default();
json!({
"openrouter_configured": !cfg.openrouter_api_key.is_empty(),
"openrouter_model": cfg.openrouter_model,
"local_model": {
"enabled": cfg.local_model.enabled,
"base_url": cfg.local_model.base_url,
"model": cfg.local_model.model,
},
"data_root": state.data_root.display().to_string(),
})
}
/// Tool `get_status`: daemon version, palace count and totals summed over
/// every palace that can be opened.
///
/// A listing failure is treated as "no palaces"; this tool never reports an
/// error.
async fn execute_get_status(state: &AppState) -> Value {
    let palaces = PalaceRegistry::list_palaces(&state.data_root).unwrap_or_default();
    let mut drawers = 0usize;
    let mut vectors = 0usize;
    let mut triples = 0usize;
    let opened = palaces
        .iter()
        .filter_map(|p| state.registry.open_palace(&state.data_root, &p.id).ok());
    for handle in opened {
        drawers = drawers.saturating_add(handle.drawers.read().len());
        vectors = vectors.saturating_add(handle.vector_store.index_size());
        triples = triples.saturating_add(handle.kg.count_active_triples());
    }
    json!({
        "version": state.version,
        "palace_count": palaces.len(),
        "default_palace": state.default_palace,
        "data_root": state.data_root.display().to_string(),
        "total_drawers": drawers,
        "total_vectors": vectors,
        "total_kg_triples": triples,
    })
}
/// Tool `get_dream_status`: dream stats aggregated over all palaces.
///
/// Counters are summed (saturating) over palaces with persisted stats and
/// `last_run_at` is the most recent run among them. Palaces without stats, or
/// whose stats fail to load, are skipped. A serialization failure yields an
/// empty object.
async fn execute_get_dream_status(state: &AppState) -> Value {
    let mut total = DreamStatusPayload::default();
    let mut newest: Option<chrono::DateTime<chrono::Utc>> = None;
    for palace in PalaceRegistry::list_palaces(&state.data_root).unwrap_or_default() {
        let dir = state.data_root.join(palace.id.as_str());
        let Ok(Some(snapshot)) = PersistedDreamStats::load(&dir) else {
            continue;
        };
        let stats = &snapshot.stats;
        total.merged = total.merged.saturating_add(stats.merged);
        total.pruned = total.pruned.saturating_add(stats.pruned);
        total.compacted = total.compacted.saturating_add(stats.compacted);
        total.closets_updated = total.closets_updated.saturating_add(stats.closets_updated);
        total.duration_ms = total.duration_ms.saturating_add(stats.duration_ms);
        // Keep whichever timestamp is later.
        let ran_at = snapshot.last_run_at;
        newest = Some(match newest {
            Some(seen) if seen >= ran_at => seen,
            _ => ran_at,
        });
    }
    total.last_run_at = newest;
    serde_json::to_value(total).unwrap_or_else(|_| json!({}))
}
async fn execute_get_palace_dream_status(state: &AppState, palace_id: &str) -> Value {
let data_dir = state.data_root.join(palace_id);
if !data_dir.exists() {
return json!({ "error": format!("palace not found: {palace_id}") });
}
match PersistedDreamStats::load(&data_dir) {
Ok(Some(s)) => serde_json::to_value(DreamStatusPayload::from(s)).unwrap_or(json!({})),
Ok(None) => serde_json::to_value(DreamStatusPayload::default()).unwrap_or(json!({})),
Err(e) => json!({ "error": format!("read dream stats: {e:#}") }),
}
}
/// Tool `create_memory`: stores `content` as a new drawer in the palace.
///
/// `room` is parsed with `RoomType::parse` and defaults to
/// `RoomType::General`. Returns `{ "drawer_id": ..., "status": "stored" }`,
/// or `{ "error": ... }` when the palace cannot be opened or the store fails.
async fn execute_create_memory(
    state: &AppState,
    palace_id: &str,
    content: &str,
    room: Option<&str>,
    tags: Vec<String>,
    importance: f32,
) -> Value {
    let opened = state
        .registry
        .open_palace(&state.data_root, &PalaceId::new(palace_id));
    let handle = match opened {
        Ok(h) => h,
        Err(e) => return json!({ "error": format!("open palace {palace_id}: {e:#}") }),
    };
    let room_type = match room {
        Some(label) => RoomType::parse(label),
        None => RoomType::General,
    };
    let stored = handle
        .remember(content.to_string(), room_type, tags, importance)
        .await;
    match stored {
        Ok(id) => json!({ "drawer_id": id.to_string(), "status": "stored" }),
        Err(e) => json!({ "error": format!("remember: {e:#}") }),
    }
}
async fn execute_kg_assert(
state: &AppState,
palace_id: &str,
subject: &str,
predicate: &str,
object: &str,
confidence: f32,
) -> Value {
let handle = match state
.registry
.open_palace(&state.data_root, &PalaceId::new(palace_id))
{
Ok(h) => h,
Err(e) => return json!({ "error": format!("open palace {palace_id}: {e:#}") }),
};
let triple = Triple {
subject: subject.to_string(),
predicate: predicate.to_string(),
object: object.to_string(),
valid_from: chrono::Utc::now(),
valid_to: None,
confidence,
provenance: Some("chat:assistant".to_string()),
};
match handle.kg.assert(triple).await {
Ok(()) => json!({ "status": "asserted" }),
Err(e) => json!({ "error": format!("kg assert: {e:#}") }),
}
}
async fn chat_handler(State(state): State<AppState>, Json(body): Json<ChatBody>) -> Response {
let Some(provider) = state.chat_provider().await else {
return (
StatusCode::PRECONDITION_FAILED,
"No chat provider configured (no local Ollama detected and no OpenRouter key set)",
)
.into_response();
};
let palace_id = body
.palace_id
.clone()
.or_else(|| state.default_palace.clone())
.unwrap_or_default();
let (session_id, mut history): (Option<String>, Vec<ChatMessage>) = if !palace_id.is_empty() {
let store = match state.session_store(&palace_id) {
Ok(s) => s,
Err(e) => {
tracing::warn!(palace = %palace_id, "session_store open failed: {e:#}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("session store: {e:#}"),
)
.into_response();
}
};
match body.session_id.clone() {
Some(sid) => match store.get_session(&sid) {
Ok(Some(s)) => (
Some(sid),
s.history
.into_iter()
.map(|m| ChatMessage {
role: m.role,
content: m.content,
tool_call_id: None,
tool_calls: None,
})
.collect(),
),
_ => (Some(sid), body.history.clone()),
},
None => {
let new_id = store.create_session(None).unwrap_or_else(|e| {
tracing::warn!("create_session failed: {e:#}");
String::new()
});
(
if new_id.is_empty() {
None
} else {
Some(new_id)
},
body.history.clone(),
)
}
}
} else {
(None, body.history.clone())
};
let all_palaces = PalaceRegistry::list_palaces(&state.data_root).unwrap_or_default();
let palace_count = all_palaces.len();
let palace_roster: String = all_palaces
.iter()
.map(|p| format!("- {} (id: {})", p.name, p.id.0))
.collect::<Vec<_>>()
.join("\n");
let cfg = load_user_config().unwrap_or_default();
let active_provider_name = state
.chat_provider()
.await
.map(|p| p.name().to_string())
.unwrap_or_else(|| "none".to_string());
let dream_snapshot = execute_get_dream_status(&state).await;
let selected_palace_meta = if palace_id.is_empty() {
None
} else {
all_palaces.iter().find(|p| p.id.0 == palace_id).cloned()
};
let mut palace_block = String::new();
let mut context = String::new();
let mut palace_display_name = palace_id.clone();
if !palace_id.is_empty() {
if let Ok(handle) = state
.registry
.open_palace(&state.data_root, &PalaceId::new(&palace_id))
{
let drawer_count = handle.drawers.read().len();
let vector_count = handle.vector_store.index_size();
let kg_triple_count = handle.kg.count_active_triples();
let (name, description) = match &selected_palace_meta {
Some(p) => (p.name.clone(), p.description.clone()),
None => (palace_id.clone(), None),
};
palace_display_name = name.clone();
palace_block.push_str(&format!(
"Currently selected palace:\n\
- id: {id}\n\
- name: {name}\n",
id = palace_id,
name = name,
));
if let Some(desc) = description.as_deref().filter(|s| !s.is_empty()) {
palace_block.push_str(&format!("- description: {desc}\n"));
}
palace_block.push_str(&format!(
"- drawers: {drawer_count}\n\
- vectors: {vector_count}\n\
- kg_triples: {kg_triple_count}\n",
));
let identity_trimmed = handle.identity.trim();
if !identity_trimmed.is_empty() {
palace_block.push_str(&format!("- identity:\n{identity_trimmed}\n",));
}
if let Ok(hits) = recall_with_default_embedder(&handle, &body.message, 5).await {
for r in hits.iter().take(5) {
context.push_str(&format!("- (L{}) {}\n", r.layer, r.drawer.content));
}
}
}
}
let mut system = String::new();
system.push_str(&format!(
"You are the assistant for trusty-memory, a machine-wide AI memory \
service running locally on this user's machine. trusty-memory stores \
knowledge in named \"palaces\" — isolated memory namespaces, each with \
its own vector index (usearch HNSW) and temporal knowledge graph \
(SQLite). Memories are organized as Palace -> Wing -> Room -> Closet \
-> Drawer, where a Drawer is an atomic memory unit.\n\
There are currently {palace_count} palace(s) on this machine.\n",
));
if !palace_roster.is_empty() {
system.push_str(&format!("Palaces:\n{palace_roster}\n"));
}
system.push('\n');
system.push_str(&format!(
"System configuration:\n\
- active chat provider: {active_provider_name}\n\
- openrouter model: {or_model}\n\
- local model: {local_model} ({local_url}, enabled={local_enabled})\n\
- data root: {data_root}\n\n",
or_model = cfg.openrouter_model,
local_model = cfg.local_model.model,
local_url = cfg.local_model.base_url,
local_enabled = cfg.local_model.enabled,
data_root = state.data_root.display(),
));
system.push_str(&format!(
"Global dream status (background memory maintenance):\n{}\n\n",
dream_snapshot,
));
if !palace_block.is_empty() {
system.push_str(&palace_block);
system.push('\n');
}
if !context.is_empty() {
system.push_str(&format!(
"Relevant memories from the '{palace_display_name}' palace \
(L0 = identity, L1 = essentials, L2 = topic-filtered, L3 = deep):\n\
{context}\n",
));
}
system.push_str(
"You have a set of tools to introspect and modify this trusty-memory \
daemon. Prefer calling a tool over guessing — e.g. call \
`list_palaces` rather than relying on the roster above if you need \
live counts, and call `recall_memories` to search for facts you \
don't have in context. When the user asks about \"palaces\", they \
mean trusty-memory palaces (memory namespaces on this machine), not \
architectural palaces like Versailles. If a tool returns an error, \
report it honestly and don't fabricate results.",
);
history.push(ChatMessage {
role: "user".to_string(),
content: body.message.clone(),
tool_call_id: None,
tool_calls: None,
});
let mut messages: Vec<ChatMessage> = Vec::with_capacity(history.len() + 1);
messages.push(ChatMessage {
role: "system".to_string(),
content: system,
tool_call_id: None,
tool_calls: None,
});
messages.extend(history.iter().cloned());
let tools = all_tools();
let (sse_tx, sse_rx) =
tokio::sync::mpsc::channel::<Result<axum::body::Bytes, std::io::Error>>(64);
let session_store = if !palace_id.is_empty() && session_id.is_some() {
state.session_store(&palace_id).ok()
} else {
None
};
let persist_session_id = session_id.clone();
let loop_state = state.clone();
tokio::spawn(async move {
if let Some(sid) = persist_session_id.as_deref() {
let frame = format!("data: {}\n\n", json!({ "session_id": sid }));
if sse_tx
.send(Ok(axum::body::Bytes::from(frame)))
.await
.is_err()
{
return;
}
}
let mut final_assistant_text = String::new();
let mut stream_err: Option<String> = None;
for round in 0..MAX_TOOL_ROUNDS {
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<ChatEvent>(256);
let messages_clone = messages.clone();
let tools_clone = tools.clone();
let provider_clone = provider.clone();
let stream_handle = tokio::spawn(async move {
provider_clone
.chat_stream(messages_clone, tools_clone, event_tx)
.await
});
let mut tool_calls_this_round: Vec<trusty_common::ToolCall> = Vec::new();
let mut round_assistant_text = String::new();
while let Some(event) = event_rx.recv().await {
match event {
ChatEvent::Delta(text) => {
round_assistant_text.push_str(&text);
let frame = format!("data: {}\n\n", json!({ "delta": text }));
if sse_tx
.send(Ok(axum::body::Bytes::from(frame)))
.await
.is_err()
{
return;
}
}
ChatEvent::ToolCall(tc) => {
let frame = format!(
"data: {}\n\n",
json!({ "tool_call": {
"id": tc.id,
"name": tc.name,
"arguments": tc.arguments,
}})
);
let _ = sse_tx.send(Ok(axum::body::Bytes::from(frame))).await;
tool_calls_this_round.push(tc);
}
ChatEvent::Done => break,
ChatEvent::Error(e) => {
stream_err = Some(e);
break;
}
}
}
match stream_handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => stream_err = Some(e.to_string()),
Err(e) => stream_err = Some(format!("join: {e}")),
}
if stream_err.is_some() {
break;
}
final_assistant_text.push_str(&round_assistant_text);
if tool_calls_this_round.is_empty() {
break;
}
let assistant_tool_calls_json: Vec<Value> = tool_calls_this_round
.iter()
.map(|tc| {
json!({
"id": tc.id,
"type": "function",
"function": { "name": tc.name, "arguments": tc.arguments },
})
})
.collect();
messages.push(ChatMessage {
role: "assistant".to_string(),
content: round_assistant_text,
tool_call_id: None,
tool_calls: Some(assistant_tool_calls_json),
});
for tc in &tool_calls_this_round {
let result = execute_tool(&tc.name, &tc.arguments, &loop_state).await;
let result_str = result.to_string();
let frame = format!(
"data: {}\n\n",
json!({ "tool_result": {
"id": tc.id,
"name": tc.name,
"content": &result_str,
}})
);
let _ = sse_tx.send(Ok(axum::body::Bytes::from(frame))).await;
messages.push(ChatMessage {
role: "tool".to_string(),
content: result_str,
tool_call_id: Some(tc.id.clone()),
tool_calls: None,
});
}
if round + 1 == MAX_TOOL_ROUNDS {
tracing::warn!(
"chat: hit MAX_TOOL_ROUNDS={} — terminating tool loop",
MAX_TOOL_ROUNDS
);
}
}
if let (Some(store), Some(sid)) = (session_store, persist_session_id.as_deref()) {
if !final_assistant_text.is_empty() {
history.push(ChatMessage {
role: "assistant".into(),
content: final_assistant_text,
tool_call_id: None,
tool_calls: None,
});
}
let core_history: Vec<trusty_memory_core::store::chat_sessions::ChatMessage> = history
.iter()
.map(|m| trusty_memory_core::store::chat_sessions::ChatMessage {
role: m.role.clone(),
content: m.content.clone(),
})
.collect();
if let Err(e) = store.upsert_session(sid, &core_history) {
tracing::warn!("upsert_session failed: {e:#}");
}
}
match stream_err {
None => {
let _ = sse_tx
.send(Ok(axum::body::Bytes::from("data: [DONE]\n\n")))
.await;
}
Some(e) => {
let out = format!("data: {}\n\n", json!({ "error": e }));
let _ = sse_tx.send(Ok(axum::body::Bytes::from(out))).await;
}
}
});
let stream = tokio_stream::wrappers::ReceiverStream::new(sse_rx);
Response::builder()
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.body(Body::from_stream(stream))
.expect("static SSE response builds")
}
/// Handles `GET /api/v1/chat/providers`: lists the two supported chat providers
/// with their configured model and availability, plus the active provider name.
///
/// Ollama is probed only when the local model is enabled in the user config;
/// OpenRouter counts as available whenever an API key is configured.
async fn list_providers(State(state): State<AppState>) -> Json<Value> {
    let cfg = load_user_config().unwrap_or_default();
    // `&&` short-circuits, so the probe is skipped when the local model is disabled.
    let ollama_available = cfg.local_model.enabled
        && trusty_common::auto_detect_local_provider(&cfg.local_model.base_url)
            .await
            .is_some();
    let openrouter_available = !cfg.openrouter_api_key.is_empty();
    let active = state.chat_provider().await.map(|p| p.name().to_string());
    let ollama_entry = json!({
        "name": "ollama",
        "model": cfg.local_model.model,
        "available": ollama_available,
    });
    let openrouter_entry = json!({
        "name": "openrouter",
        "model": cfg.openrouter_model,
        "available": openrouter_available,
    });
    Json(json!({
        "providers": [ollama_entry, openrouter_entry],
        "active": active,
    }))
}
/// Optional JSON body accepted by `create_chat_session`.
#[derive(Deserialize, Default)]
struct CreateSessionBody {
/// Title handed to the session store's `create_session`; `None` when omitted.
#[serde(default)]
title: Option<String>,
}
async fn create_chat_session(
State(state): State<AppState>,
AxumPath(id): AxumPath<String>,
body: Option<Json<CreateSessionBody>>,
) -> Result<Json<Value>, ApiError> {
let store = state
.session_store(&id)
.map_err(|e| ApiError::internal(format!("session store: {e:#}")))?;
let title = body.and_then(|b| b.0.title);
let sid = store
.create_session(title)
.map_err(|e| ApiError::internal(format!("create session: {e:#}")))?;
Ok(Json(json!({ "id": sid })))
}
/// Handles `GET /api/v1/palaces/{id}/chat/sessions`: returns the session list
/// from the palace's session store, serialized as JSON.
///
/// Store failures are reported as `500`; a serialization failure degrades to `[]`.
async fn list_chat_sessions(
    State(state): State<AppState>,
    AxumPath(id): AxumPath<String>,
) -> Result<Json<Value>, ApiError> {
    let sessions = state
        .session_store(&id)
        .map_err(|e| ApiError::internal(format!("session store: {e:#}")))?;
    let summaries = sessions
        .list_sessions()
        .map_err(|e| ApiError::internal(format!("list sessions: {e:#}")))?;
    let payload = serde_json::to_value(summaries).unwrap_or(json!([]));
    Ok(Json(payload))
}
/// Handles `GET /api/v1/palaces/{id}/chat/sessions/{session_id}`: returns one
/// stored session as JSON.
///
/// Responds `404` when the store has no such session and `500` on store errors.
async fn get_chat_session(
    State(state): State<AppState>,
    AxumPath((id, session_id)): AxumPath<(String, String)>,
) -> Result<Json<Value>, ApiError> {
    let sessions = state
        .session_store(&id)
        .map_err(|e| ApiError::internal(format!("session store: {e:#}")))?;
    let lookup = sessions
        .get_session(&session_id)
        .map_err(|e| ApiError::internal(format!("get session: {e:#}")))?;
    let Some(session) = lookup else {
        return Err(ApiError::not_found(format!(
            "session not found: {session_id}"
        )));
    };
    let payload = serde_json::to_value(session).unwrap_or(json!({}));
    Ok(Json(payload))
}
/// Handles `DELETE /api/v1/palaces/{id}/chat/sessions/{session_id}`: removes the
/// session from the palace's store and answers `204 No Content`.
///
/// Store failures are reported as `500` via [`ApiError::internal`].
async fn delete_chat_session(
    State(state): State<AppState>,
    AxumPath((id, session_id)): AxumPath<(String, String)>,
) -> Result<StatusCode, ApiError> {
    let sessions = state
        .session_store(&id)
        .map_err(|e| ApiError::internal(format!("session store: {e:#}")))?;
    match sessions.delete_session(&session_id) {
        Ok(_) => Ok(StatusCode::NO_CONTENT),
        Err(e) => Err(ApiError::internal(format!("delete session: {e:#}"))),
    }
}
/// Opens the palace `id` through the registry, translating any failure into a
/// `404` [`ApiError`] that embeds the underlying error chain.
fn open_handle(
    state: &AppState,
    id: &str,
) -> Result<std::sync::Arc<trusty_memory_core::PalaceHandle>, ApiError> {
    let palace_id = PalaceId::new(id);
    let opened = state.registry.open_palace(&state.data_root, &palace_id);
    opened.map_err(|e| ApiError::not_found(format!("palace not found: {id} ({e:#})")))
}
/// Error type returned by the JSON API handlers: an HTTP status plus a message.
struct ApiError {
/// HTTP status code sent with the response.
status: StatusCode,
/// Text placed under the `"error"` key of the JSON response body.
message: String,
}
impl ApiError {
fn bad_request(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
message: msg.into(),
}
}
fn not_found(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::NOT_FOUND,
message: msg.into(),
}
}
fn internal(msg: impl Into<String>) -> Self {
Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: msg.into(),
}
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
(self.status, Json(json!({ "error": self.message }))).into_response()
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::body::to_bytes;
use axum::http::Request;
use tower::util::ServiceExt;
/// Builds an `AppState` rooted in a fresh temporary directory.
fn test_state() -> AppState {
    let scratch = tempfile::tempdir().expect("tempdir");
    let data_root = scratch.path().to_path_buf();
    // Forget the guard so its destructor never runs; the directory must stay
    // on disk for as long as the returned state is in use.
    std::mem::forget(scratch);
    AppState::new(data_root)
}
/// `GET /health` answers 200 with `status: "ok"` and the crate version.
#[tokio::test]
async fn health_endpoint_returns_ok() {
    let app = router().with_state(test_state());
    let request = Request::builder()
        .uri("/health")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 1024).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(payload["status"], "ok");
    assert_eq!(payload["version"], env!("CARGO_PKG_VERSION"));
}
/// `GET /api/v1/status` on an empty data root reports a version string and zero palaces.
#[tokio::test]
async fn status_endpoint_returns_payload() {
    let app = router().with_state(test_state());
    let request = Request::builder()
        .uri("/api/v1/status")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 1024).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    assert!(payload["version"].is_string());
    assert_eq!(payload["palace_count"], 0);
}
/// An unrouted `/api/v1/...` path yields 404.
#[tokio::test]
async fn unknown_api_returns_404() {
    let app = router().with_state(test_state());
    let request = Request::builder()
        .uri("/api/v1/does-not-exist")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
/// A palace created via `POST /api/v1/palaces` shows up in the subsequent listing.
#[tokio::test]
async fn create_then_list_palace() {
    let state = test_state();
    let app = router().with_state(state.clone());

    let create_request = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces")
        .header("content-type", "application/json")
        .body(Body::from(
            json!({"name": "web-test", "description": "from test"}).to_string(),
        ))
        .unwrap();
    let created = app.clone().oneshot(create_request).await.unwrap();
    assert_eq!(created.status(), StatusCode::OK);

    let list_request = Request::builder()
        .uri("/api/v1/palaces")
        .body(Body::empty())
        .unwrap();
    let listed = app.oneshot(list_request).await.unwrap();
    assert_eq!(listed.status(), StatusCode::OK);
    let raw = to_bytes(listed.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    let palaces = payload.as_array().expect("array");
    assert!(palaces.iter().any(|p| p["id"] == "web-test"));
}
/// The status payload carries the aggregate counters, all zero on an empty data root.
#[tokio::test]
async fn status_includes_total_counters() {
    let app = router().with_state(test_state());
    let request = Request::builder()
        .uri("/api/v1/status")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    for counter in ["total_drawers", "total_vectors", "total_kg_triples"] {
        assert_eq!(payload[counter], 0);
    }
}
/// With no persisted dream stats, the global status has a null timestamp and zero counters.
#[tokio::test]
async fn dream_status_empty_returns_nulls() {
    let app = router().with_state(test_state());
    let request = Request::builder()
        .uri("/api/v1/dream/status")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    assert!(payload["last_run_at"].is_null());
    assert_eq!(payload["merged"], 0);
    assert_eq!(payload["pruned"], 0);
}
/// `GET /api/v1/chat/providers` lists exactly ollama and openrouter plus an `active` key.
#[tokio::test]
async fn providers_endpoint_returns_payload() {
    let app = router().with_state(test_state());
    let request = Request::builder()
        .uri("/api/v1/chat/providers")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    let providers = payload["providers"].as_array().expect("providers array");
    assert_eq!(providers.len(), 2);
    let names: Vec<&str> = providers
        .iter()
        .filter_map(|entry| entry["name"].as_str())
        .collect();
    assert!(names.contains(&"ollama"));
    assert!(names.contains(&"openrouter"));
    assert!(payload.get("active").is_some());
}
/// Exercises the chat-session endpoints end to end: create, list, get, delete,
/// then confirm the deleted session answers 404.
#[tokio::test]
async fn chat_session_crud_round_trip() {
    let state = test_state();
    let palace = trusty_memory_core::Palace {
        id: PalaceId::new("sess-test"),
        name: "sess-test".to_string(),
        description: None,
        created_at: chrono::Utc::now(),
        data_dir: state.data_root.join("sess-test"),
    };
    state
        .registry
        .create_palace(&state.data_root, palace)
        .expect("create_palace");
    let app = router().with_state(state);

    // Create.
    let create_request = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces/sess-test/chat/sessions")
        .header("content-type", "application/json")
        .body(Body::from(json!({"title":"first chat"}).to_string()))
        .unwrap();
    let created = app.clone().oneshot(create_request).await.unwrap();
    assert_eq!(created.status(), StatusCode::OK);
    let created_raw = to_bytes(created.into_body(), 4096).await.unwrap();
    let created_json: Value = serde_json::from_slice(&created_raw).unwrap();
    let sid = created_json["id"].as_str().expect("session id").to_string();

    // List.
    let list_request = Request::builder()
        .uri("/api/v1/palaces/sess-test/chat/sessions")
        .body(Body::empty())
        .unwrap();
    let listed = app.clone().oneshot(list_request).await.unwrap();
    assert_eq!(listed.status(), StatusCode::OK);
    let listed_raw = to_bytes(listed.into_body(), 4096).await.unwrap();
    let listed_json: Value = serde_json::from_slice(&listed_raw).unwrap();
    let sessions = listed_json.as_array().expect("array");
    assert!(sessions.iter().any(|s| s["id"] == sid));

    // Get.
    let get_request = Request::builder()
        .uri(format!("/api/v1/palaces/sess-test/chat/sessions/{sid}"))
        .body(Body::empty())
        .unwrap();
    let fetched = app.clone().oneshot(get_request).await.unwrap();
    assert_eq!(fetched.status(), StatusCode::OK);

    // Delete.
    let delete_request = Request::builder()
        .method("DELETE")
        .uri(format!("/api/v1/palaces/sess-test/chat/sessions/{sid}"))
        .body(Body::empty())
        .unwrap();
    let deleted = app.clone().oneshot(delete_request).await.unwrap();
    assert_eq!(deleted.status(), StatusCode::NO_CONTENT);

    // Get after delete.
    let refetch_request = Request::builder()
        .uri(format!("/api/v1/palaces/sess-test/chat/sessions/{sid}"))
        .body(Body::empty())
        .unwrap();
    let refetched = app.oneshot(refetch_request).await.unwrap();
    assert_eq!(refetched.status(), StatusCode::NOT_FOUND);
}
/// Pins the exact tool names (and their order) returned by `all_tools`, and checks
/// that every schema is an object with an array-valued `required` entry.
#[test]
fn all_tools_returns_expected_set() {
    let tools = all_tools();
    let expected: Vec<&str> = vec![
        "list_palaces",
        "get_palace",
        "recall_memories",
        "list_drawers",
        "kg_query",
        "get_config",
        "get_status",
        "get_dream_status",
        "get_palace_dream_status",
        "create_memory",
        "kg_assert",
        "memory_recall_all",
    ];
    let names: Vec<&str> = tools.iter().map(|tool| tool.name.as_str()).collect();
    assert_eq!(names, expected);
    for tool in &tools {
        let schema = &tool.parameters;
        assert_eq!(schema["type"], "object", "tool {} schema type", tool.name);
        assert!(
            schema["required"].is_array(),
            "tool {} required not array",
            tool.name
        );
    }
}
/// `execute_tool` routes known tools, rejects unknown names, and reports missing arguments.
#[tokio::test]
async fn execute_tool_dispatches_known_tools() {
    let state = test_state();

    // Known tool on an empty data root: an empty array.
    let result = execute_tool("list_palaces", "{}", &state).await;
    assert!(
        result.is_array(),
        "list_palaces should be array, got {result}"
    );
    assert_eq!(result.as_array().unwrap().len(), 0);

    // Unknown tool name.
    let unknown = execute_tool("not_a_tool", "{}", &state).await;
    let unknown_msg = unknown["error"].as_str().unwrap_or("");
    assert!(
        unknown_msg.contains("unknown tool"),
        "expected unknown-tool error, got {unknown}"
    );

    // Known tool called without its required argument.
    let missing = execute_tool("get_palace", "{}", &state).await;
    let missing_msg = missing["error"].as_str().unwrap_or("");
    assert!(
        missing_msg.contains("palace_id"),
        "expected missing-arg error, got {missing}"
    );
}
/// Creating a palace over HTTP publishes a `DaemonEvent::PalaceCreated` on the event bus.
#[tokio::test]
async fn sse_broadcast_emits_palace_created() {
    let state = test_state();
    // Subscribe before issuing the request so the event cannot be missed.
    let mut rx = state.events.subscribe();
    let app = router().with_state(state.clone());
    let request = Request::builder()
        .method("POST")
        .uri("/api/v1/palaces")
        .header("content-type", "application/json")
        .body(Body::from(json!({"name": "sse-test"}).to_string()))
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let wait = std::time::Duration::from_millis(500);
    let received = tokio::time::timeout(wait, rx.recv())
        .await
        .expect("event received within timeout")
        .expect("event channel still open");
    match received {
        DaemonEvent::PalaceCreated { id, name } => {
            assert_eq!(id, "sse-test");
            assert_eq!(name, "sse-test");
        }
        other => panic!("expected PalaceCreated, got {other:?}"),
    }
}
/// `/sse` answers 200 as `text/event-stream` and its body contains a `connected` frame.
#[tokio::test]
async fn sse_endpoint_emits_connected_frame() {
    use axum::routing::get;
    let state = test_state();
    let app = router()
        .route("/sse", get(crate::sse_handler))
        .with_state(state);
    let request = Request::builder().uri("/sse").body(Body::empty()).unwrap();
    let resp = app.oneshot(request).await.unwrap();
    assert_eq!(resp.status(), StatusCode::OK);
    let content_type = resp
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok());
    assert_eq!(content_type, Some("text/event-stream"));

    // A timeout or a body-read failure both collapse to an empty buffer,
    // which then fails the assertion below.
    let body = resp.into_body();
    let wait = std::time::Duration::from_millis(500);
    let bytes = tokio::time::timeout(wait, to_bytes(body, 4096))
        .await
        .ok()
        .and_then(|read| read.ok())
        .unwrap_or_default();
    let text = String::from_utf8_lossy(&bytes);
    assert!(
        text.contains("\"type\":\"connected\""),
        "expected connected frame, got: {text}"
    );
}
/// The global dream status sums per-palace counters and reports the newest `last_run_at`.
#[tokio::test]
async fn dream_status_aggregates_across_palaces() {
    use trusty_memory_core::dream::{DreamStats, PersistedDreamStats};
    let state = test_state();

    // Two palaces: "palace-a" ran a minute ago, "palace-b" ran just now.
    let fixtures = [
        (
            "palace-a",
            DreamStats {
                merged: 1,
                pruned: 2,
                compacted: 3,
                closets_updated: 4,
                duration_ms: 100,
            },
            chrono::Utc::now() - chrono::Duration::seconds(60),
        ),
        (
            "palace-b",
            DreamStats {
                merged: 10,
                pruned: 20,
                compacted: 30,
                closets_updated: 40,
                duration_ms: 200,
            },
            chrono::Utc::now(),
        ),
    ];
    for (palace_name, stats, last_run_at) in fixtures {
        let palace = trusty_memory_core::Palace {
            id: PalaceId::new(palace_name),
            name: palace_name.to_string(),
            description: None,
            created_at: chrono::Utc::now(),
            data_dir: state.data_root.join(palace_name),
        };
        state
            .registry
            .create_palace(&state.data_root, palace)
            .expect("create palace");
        let persisted = PersistedDreamStats { last_run_at, stats };
        persisted
            .save(&state.data_root.join(palace_name))
            .expect("save dream stats");
    }
    let later = chrono::Utc::now();

    let app = router().with_state(state);
    let request = Request::builder()
        .uri("/api/v1/dream/status")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let payload: Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(payload["merged"], 11);
    assert_eq!(payload["pruned"], 22);
    assert_eq!(payload["compacted"], 33);
    assert_eq!(payload["closets_updated"], 44);
    assert_eq!(payload["duration_ms"], 300);

    let last = payload["last_run_at"]
        .as_str()
        .expect("last_run_at is string");
    let parsed: chrono::DateTime<chrono::Utc> = last
        .parse()
        .expect("last_run_at parses as RFC3339 timestamp");
    assert!(
        parsed <= later,
        "last_run_at ({parsed}) should not exceed wall clock ({later})"
    );
    // Anything newer than 30 s ago can only be palace-b's timestamp.
    let cutoff = chrono::Utc::now() - chrono::Duration::seconds(30);
    assert!(
        parsed >= cutoff,
        "expected the newer (palace-b) timestamp; got {parsed}"
    );
}
/// `POST /api/v1/dream/run` returns integer counters and a populated `last_run_at`.
#[tokio::test]
async fn dream_run_aggregates_stats() {
    let state = test_state();
    let palace = trusty_memory_core::Palace {
        id: PalaceId::new("dream-run-test"),
        name: "dream-run-test".to_string(),
        description: None,
        created_at: chrono::Utc::now(),
        data_dir: state.data_root.join("dream-run-test"),
    };
    state
        .registry
        .create_palace(&state.data_root, palace)
        .expect("create palace");

    let app = router().with_state(state);
    let request = Request::builder()
        .method("POST")
        .uri("/api/v1/dream/run")
        .body(Body::empty())
        .unwrap();
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = to_bytes(response.into_body(), 4096).await.unwrap();
    let v: Value = serde_json::from_slice(&raw).unwrap();

    let counters = [
        "merged",
        "pruned",
        "compacted",
        "closets_updated",
        "duration_ms",
    ];
    for key in counters {
        assert!(
            v.get(key).is_some(),
            "missing key {key} in dream_run payload: {v}"
        );
        assert!(
            v[key].is_u64() || v[key].is_i64(),
            "{key} should be integer, got {}",
            v[key]
        );
    }
    assert!(
        v["last_run_at"].is_string(),
        "last_run_at must be set by dream_run; got {v}"
    );
}
/// `GET /` answers either 200 or 404; both outcomes are accepted by this test.
#[tokio::test]
async fn serves_index_html_fallback() {
    let app = router().with_state(test_state());
    let request = Request::builder().uri("/").body(Body::empty()).unwrap();
    let response = app.oneshot(request).await.unwrap();
    let status = response.status();
    let accepted = [StatusCode::OK, StatusCode::NOT_FOUND];
    assert!(accepted.contains(&status), "got {}", status);
}
}