use crate::attribution::{CreatorInfo, CreatorSource, MCP_CLIENT_NAME};
use crate::{ActivitySource, AppState, DaemonEvent, DaemonReadiness};
use anyhow::{anyhow, Context, Result};
use serde_json::{json, Value};
use trusty_common::memory_core::palace::RoomType;
use trusty_common::memory_core::retrieval::{
recall, recall_across_palaces, recall_deep, RememberOptions,
};
use trusty_common::memory_core::timeouts;
use uuid::Uuid;
use super::bm25::{
bm25_hits_to_recall_results, bm25_search_optional, fuse_bm25_into_recall, serialize_recall,
};
use super::helpers::{
attach_mcp_attribution, blocklist_gate, content_gate, dedup_gate, mcp_remember_opts,
open_palace_handle, parse_room, parse_tags, resolve_palace, room_label, skipped_envelope,
write_drawer, WriteDrawerParams,
};
pub(crate) async fn handle_memory_remember(state: &AppState, args: Value) -> Result<Value> {
let defer_embedding = state.readiness() == DaemonReadiness::Warming;
let palace = resolve_palace(state, &args, "memory_remember")?;
let palace = palace.as_str();
let raw_text = args
.get("text")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_remember: missing 'text'"))?
.to_string();
if let Some(pattern) = blocklist_gate(&raw_text) {
let reason = format!("content gate: skipped (blocked pattern: {pattern:?})");
tracing::debug!(palace = %palace, pattern = %pattern, "{reason}");
return Ok(skipped_envelope(palace, &reason));
}
let ctx = args.get("context").and_then(|v| v.as_str());
let text = match content_gate(&raw_text, ctx) {
Some(t) => t,
None => {
return Ok(skipped_envelope(
palace,
"content gate: skipped (short prompt, no context)",
));
}
};
let room = parse_room(args.get("room").and_then(|v| v.as_str()));
let mut tags = parse_tags(&args);
attach_mcp_attribution(&mut tags);
let force = args.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
let write_lock = state.palace_write_lock(palace);
let _write_guard =
timeouts::lock_with_timeout(&write_lock, timeouts::write_lock_timeout(), palace)
.await
.map_err(|e| anyhow::anyhow!("memory_remember: {e:#}"))?;
if !force {
let handle = open_palace_handle(state, palace)?;
if dedup_gate(&handle, &text) {
tracing::debug!(
palace = %palace,
"content gate: skipped (duplicate within window)",
);
return Ok(skipped_envelope(
palace,
"content gate: skipped (duplicate within window)",
));
}
}
let room_label_for_kg = room_label(&room);
let drawer_id = write_drawer(
state,
WriteDrawerParams {
palace_id: palace,
content: text,
tags,
room,
importance: 0.5,
opts: mcp_remember_opts(force, defer_embedding),
room_label_for_kg,
},
)
.await?;
Ok(json!({
"drawer_id": drawer_id.to_string(),
"palace": palace,
"status": "stored",
}))
}
pub(crate) async fn handle_memory_note(state: &AppState, args: Value) -> Result<Value> {
let defer_embedding = state.readiness() == DaemonReadiness::Warming;
let palace = resolve_palace(state, &args, "memory_note")?;
let palace = palace.as_str();
let raw_content = args
.get("content")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_note: missing 'content'"))?
.to_string();
if let Some(pattern) = blocklist_gate(&raw_content) {
let reason = format!("content gate: skipped (blocked pattern: {pattern:?})");
tracing::debug!(palace = %palace, pattern = %pattern, "{reason}");
return Ok(skipped_envelope(palace, &reason));
}
let ctx = args.get("context").and_then(|v| v.as_str());
let content = match content_gate(&raw_content, ctx) {
Some(c) => c,
None => {
return Ok(skipped_envelope(
palace,
"content gate: skipped (short prompt, no context)",
));
}
};
let mut tags = parse_tags(&args);
attach_mcp_attribution(&mut tags);
let write_lock = state.palace_write_lock(palace);
let _write_guard =
timeouts::lock_with_timeout(&write_lock, timeouts::write_lock_timeout(), palace)
.await
.map_err(|e| anyhow::anyhow!("memory_note: {e:#}"))?;
{
let handle = open_palace_handle(state, palace)?;
if dedup_gate(&handle, &content) {
tracing::debug!(
palace = %palace,
"content gate: skipped (duplicate within window)",
);
return Ok(skipped_envelope(
palace,
"content gate: skipped (duplicate within window)",
));
}
}
let drawer_id = write_drawer(
state,
WriteDrawerParams {
palace_id: palace,
content,
tags,
room: RoomType::General,
importance: 1.0,
opts: RememberOptions {
defer_embedding,
..RememberOptions::note()
},
room_label_for_kg: Some("General".to_string()),
},
)
.await
.context("PalaceHandle::remember_with_options (note)")?;
Ok(json!({
"drawer_id": drawer_id.to_string(),
"palace": palace,
"status": "stored",
"drawer_type": "UserFact",
}))
}
/// Recall path that never touches the embedder.
///
/// Starts from the results of `retrieve_l0_l1(handle)`, appends any BM25 hits (hydrated via
/// `bm25_hits_to_recall_results`) whose drawer id is not already in the list, sorts by
/// descending `score`, and keeps at most `top_k` entries.
///
/// When `bm25_search_optional` yields `None`, only the `retrieve_l0_l1` results are used.
async fn recall_without_embedder(
    state: &AppState,
    handle: &trusty_common::memory_core::retrieval::PalaceHandle,
    palace: &str,
    query: &str,
    top_k: usize,
) -> Vec<trusty_common::memory_core::retrieval::RecallResult> {
    use std::cmp::Ordering;

    let mut merged = trusty_common::memory_core::retrieval::retrieve_l0_l1(handle);

    if let Some(lexical_hits) = bm25_search_optional(state, palace, query, top_k).await {
        for candidate in bm25_hits_to_recall_results(handle, &lexical_hits) {
            // Compare against everything collected so far, including BM25 hits that
            // were appended earlier in this same loop.
            let already_present = merged
                .iter()
                .any(|existing| existing.drawer.id == candidate.drawer.id);
            if already_present {
                continue;
            }
            merged.push(candidate);
        }
    }

    // Highest score first; incomparable scores are treated as equal.
    merged.sort_by(|lhs, rhs| {
        rhs.score
            .partial_cmp(&lhs.score)
            .unwrap_or(Ordering::Equal)
    });
    merged.truncate(top_k);
    merged
}
pub(crate) async fn handle_memory_recall(state: &AppState, args: Value) -> Result<Value> {
let palace = resolve_palace(state, &args, "memory_recall")?;
let query = args
.get("query")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_recall: missing 'query'"))?;
let top_k = args.get("top_k").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
let handle = open_palace_handle(state, &palace)?;
if state.readiness() == DaemonReadiness::Warming {
let results = recall_without_embedder(state, &handle, &palace, query, top_k).await;
return Ok(serialize_recall(&palace, query, results));
}
let embedder = state.embedder().await?;
let vector_fut = recall(&handle, embedder.as_ref(), query, top_k);
let bm25_fut = bm25_search_optional(state, &palace, query, top_k);
let (vector_res, bm25_res) = tokio::join!(vector_fut, bm25_fut);
let mut results = vector_res.context("recall")?;
if let Some(bm25_hits) = bm25_res {
fuse_bm25_into_recall(&mut results, &bm25_hits, top_k);
}
Ok(serialize_recall(&palace, query, results))
}
pub(crate) async fn handle_memory_recall_deep(state: &AppState, args: Value) -> Result<Value> {
let palace = resolve_palace(state, &args, "memory_recall_deep")?;
let query = args
.get("query")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_recall_deep: missing 'query'"))?;
let top_k = args.get("top_k").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
let handle = open_palace_handle(state, &palace)?;
if state.readiness() == DaemonReadiness::Warming {
let results = recall_without_embedder(state, &handle, &palace, query, top_k).await;
return Ok(serialize_recall(&palace, query, results));
}
let embedder = state.embedder().await?;
let results = recall_deep(&handle, embedder.as_ref(), query, top_k)
.await
.context("recall_deep")?;
Ok(serialize_recall(&palace, query, results))
}
pub(crate) async fn handle_memory_list(state: &AppState, args: Value) -> Result<Value> {
let palace = resolve_palace(state, &args, "memory_list")?;
let handle = open_palace_handle(state, &palace)?;
let room = args
.get("room")
.and_then(|v| v.as_str())
.map(|s| parse_room(Some(s)));
let tag = args
.get("tag")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let limit = args.get("limit").and_then(|v| v.as_u64()).unwrap_or(50) as usize;
let drawers = handle.list_drawers(room, tag, limit);
let payload: Vec<Value> = drawers
.iter()
.map(|d| {
json!({
"drawer_id": d.id.to_string(),
"content": d.content,
"importance": d.importance,
"tags": d.tags,
"created_at": d.created_at.to_rfc3339(),
"drawer_type": d.drawer_type.as_str(),
"expires_at": d.expires_at.map(|t| t.to_rfc3339()),
})
})
.collect();
Ok(json!({ "palace": palace, "drawers": payload }))
}
pub(crate) async fn handle_memory_forget(state: &AppState, args: Value) -> Result<Value> {
let palace = resolve_palace(state, &args, "memory_forget")?;
let drawer_id_str = args
.get("drawer_id")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_forget: missing 'drawer_id'"))?;
let drawer_id = Uuid::parse_str(drawer_id_str)
.map_err(|e| anyhow!("memory_forget: invalid drawer_id UUID: {e}"))?;
let handle = open_palace_handle(state, &palace)?;
handle.forget(drawer_id).await.context("forget")?;
let drawer_count = handle.drawers.read().len();
state.emit(DaemonEvent::DrawerDeleted {
palace_id: palace.clone(),
drawer_count,
source: ActivitySource::Mcp,
});
Ok(json!({ "status": "deleted", "drawer_id": drawer_id_str, "palace": palace }))
}
/// Cross-palace variant of `recall_without_embedder`.
///
/// Runs `recall_without_embedder` against every handle in turn (each with the full
/// `top_k`), tags every hit with the id of the palace it came from, then sorts the
/// combined list by descending score and keeps at most `top_k` entries overall.
async fn recall_all_without_embedder(
    state: &AppState,
    handles: &[std::sync::Arc<trusty_common::memory_core::retrieval::PalaceHandle>],
    query: &str,
    top_k: usize,
) -> Vec<trusty_common::memory_core::retrieval::CrossPalaceResult> {
    use std::cmp::Ordering;
    use trusty_common::memory_core::retrieval::CrossPalaceResult;

    let mut combined = Vec::new();
    for handle in handles {
        let palace_id = handle.id.as_str().to_string();
        let palace_hits = recall_without_embedder(state, handle, &palace_id, query, top_k).await;
        for result in palace_hits {
            combined.push(CrossPalaceResult {
                palace_id: palace_id.clone(),
                result,
            });
        }
    }

    // Highest score first; incomparable scores are treated as equal.
    combined.sort_by(|lhs, rhs| {
        rhs.result
            .score
            .partial_cmp(&lhs.result.score)
            .unwrap_or(Ordering::Equal)
    });
    combined.truncate(top_k);
    combined
}
pub(crate) async fn handle_memory_recall_all(state: &AppState, args: Value) -> Result<Value> {
let query = args
.get("q")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_recall_all: missing 'q'"))?;
let top_k = args.get("top_k").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
let deep = args.get("deep").and_then(|v| v.as_bool()).unwrap_or(false);
let root = state.data_root.clone();
let palaces = tokio::task::spawn_blocking(move || {
trusty_common::memory_core::PalaceRegistry::list_palaces(&root)
})
.await
.context("join list_palaces")??;
let mut handles = Vec::with_capacity(palaces.len());
for p in &palaces {
match state.registry.open_palace(&state.data_root, &p.id) {
Ok(h) => handles.push(h),
Err(e) => {
tracing::warn!(palace = %p.id, "memory_recall_all: open failed: {e:#}")
}
}
}
let results = if state.readiness() == DaemonReadiness::Warming {
recall_all_without_embedder(state, &handles, query, top_k).await
} else {
let embedder = state.embedder().await?;
let erased: std::sync::Arc<dyn trusty_common::memory_core::embed::Embedder + Send + Sync> =
embedder;
recall_across_palaces(&handles, &erased, query, top_k, deep)
.await
.context("recall_across_palaces")?
};
let payload: Vec<Value> = results
.iter()
.map(|r| {
json!({
"palace_id": r.palace_id,
"drawer_id": r.result.drawer.id.to_string(),
"content": r.result.drawer.content,
"importance": r.result.drawer.importance,
"tags": r.result.drawer.tags,
"score": r.result.score,
"layer": r.result.layer,
"drawer_type": r.result.drawer.drawer_type.as_str(),
})
})
.collect();
Ok(json!({ "query": query, "results": payload }))
}
pub(crate) async fn handle_memory_send_message(state: &AppState, args: Value) -> Result<Value> {
let to_palace = args
.get("to_palace")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_send_message: missing 'to_palace'"))?
.to_string();
let purpose = args
.get("purpose")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_send_message: missing 'purpose'"))?
.to_string();
let content = args
.get("content")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("memory_send_message: missing 'content'"))?
.to_string();
let from_palace = if let Some(s) = args.get("from_palace").and_then(|v| v.as_str()) {
s.to_string()
} else if let Some(d) = state.default_palace.clone() {
d
} else {
crate::messaging::cwd_palace_slug()
.context("memory_send_message: derive from_palace from cwd")?
};
let drawer_id = crate::messaging::send_message_to_palace(
&state.registry,
&state.data_root,
&from_palace,
&to_palace,
&purpose,
content,
CreatorInfo::new_self(MCP_CLIENT_NAME, CreatorSource::Mcp),
)
.await
.context("memory_send_message")?;
Ok(json!({
"drawer_id": drawer_id.to_string(),
"from_palace": from_palace,
"to_palace": to_palace,
"purpose": purpose,
"status": "sent",
}))
}