use std::time::Duration;
use serde::Deserialize;
use crate::monitor::dashboard::{MemoryData, PalaceRow};
/// Base URL used by [`resolve_memory_url`] when no daemon address can be read.
pub const DEFAULT_MEMORY_URL: &str = "http://127.0.0.1:7070";
/// Overall per-request timeout applied to the shared HTTP client built in
/// [`MemoryClient::new`].
const REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
/// Picks the base URL to use for the memory daemon.
///
/// Asks `crate::read_daemon_addr("trusty-memory")` for an address and runs it
/// through [`normalize_url`]. If the lookup returns an error or `None`, the
/// function falls back to [`DEFAULT_MEMORY_URL`]; lookup errors are not
/// surfaced to the caller.
pub fn resolve_memory_url() -> String {
    crate::read_daemon_addr("trusty-memory")
        .ok()
        .flatten()
        .map(|addr| normalize_url(&addr))
        .unwrap_or_else(|| DEFAULT_MEMORY_URL.to_string())
}
/// Turns a daemon address into a base URL that endpoint paths can be appended to.
///
/// * Leading and trailing whitespace (including a trailing newline) is removed.
/// * An existing `http://` / `https://` scheme is recognised case-insensitively
///   and emitted in lowercase, so `HTTP://host` is not double-prefixed.
/// * Inputs without a scheme get `http://` prepended.
/// * Trailing `/` characters are dropped so `format!("{base}/api/...")` never
///   produces a `//` in the path.
///
/// # Examples
///
/// `"127.0.0.1:7070"` becomes `"http://127.0.0.1:7070"`, and
/// `"http://127.0.0.1:7070"` is returned unchanged.
pub fn normalize_url(raw: &str) -> String {
    const SCHEMES: [&str; 2] = ["http://", "https://"];

    let trimmed = raw.trim();
    // `get` returns `None` when the cut would fall outside the string or in the
    // middle of a multi-byte character, so odd inputs simply count as "no scheme".
    let with_scheme = SCHEMES.iter().find_map(|scheme| {
        let head = trimmed.get(..scheme.len())?;
        let rest = trimmed.get(scheme.len()..)?;
        head.eq_ignore_ascii_case(scheme).then_some((*scheme, rest))
    });
    let (scheme, rest) = with_scheme.unwrap_or(("http://", trimmed));
    format!("{scheme}{}", rest.trim_end_matches('/'))
}
/// Wire shape of the JSON body returned by `GET /api/v1/status`.
///
/// The container-level `#[serde(default)]` means any field missing from the
/// response is filled from `Default` (empty string / zero) rather than
/// failing deserialization.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct StatusWire {
    /// Version string reported by the daemon.
    version: String,
    /// Number of palaces reported by the daemon.
    palace_count: u64,
    /// Aggregate drawer count.
    total_drawers: u64,
    /// Aggregate vector count.
    total_vectors: u64,
    /// Aggregate knowledge-graph triple count.
    total_kg_triples: u64,
}
/// Wire shape of one entry in the palace listing.
///
/// Every field is optional on the wire: the container-level
/// `#[serde(default)]` substitutes the `Default` value for anything missing.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct PalaceWire {
    /// Palace identifier.
    id: String,
    /// Palace display name.
    name: String,
    /// Vector count; also accepted under the keys `vectors` and `total_vectors`.
    #[serde(alias = "vectors", alias = "total_vectors")]
    vector_count: u64,
}
/// HTTP client for the memory daemon's REST endpoints and its `/sse` event stream.
#[derive(Debug, Clone)]
pub struct MemoryClient {
/// Base URL that endpoint paths are appended to, e.g. `{base}/api/v1/status`.
base: String,
/// `reqwest` client used for the request/response endpoints.
http: reqwest::Client,
}
impl MemoryClient {
pub fn new(base: impl Into<String>) -> Self {
let http = reqwest::Client::builder()
.timeout(REQUEST_TIMEOUT)
.build()
.unwrap_or_default();
Self {
base: base.into(),
http,
}
}
pub fn base_url(&self) -> &str {
&self.base
}
pub fn set_base_url(&mut self, base: impl Into<String>) {
self.base = base.into();
}
pub async fn fetch_all(&self) -> anyhow::Result<MemoryData> {
let status: StatusWire = self
.http
.get(format!("{}/api/v1/status", self.base))
.send()
.await?
.error_for_status()?
.json()
.await?;
let palaces = match self.palaces().await {
Ok(rows) => rows,
Err(e) => {
tracing::warn!("palace list probe failed: {e}");
Vec::new()
}
};
Ok(MemoryData {
version: status.version,
palace_count: status.palace_count,
total_drawers: status.total_drawers,
total_vectors: status.total_vectors,
total_kg_triples: status.total_kg_triples,
palaces,
})
}
pub async fn is_healthy(&self) -> bool {
matches!(
self.http.get(format!("{}/health", self.base)).send().await,
Ok(r) if r.status().is_success()
)
}
async fn palaces(&self) -> anyhow::Result<Vec<PalaceRow>> {
let raw: serde_json::Value = self
.http
.get(format!("{}/api/v1/palaces", self.base))
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(parse_palaces(&raw))
}
pub async fn recall(&self, query: &str, top_k: usize) -> anyhow::Result<Vec<RecallHit>> {
let raw: serde_json::Value = self
.http
.get(format!("{}/api/v1/recall", self.base))
.query(&[("q", query), ("top_k", &top_k.to_string())])
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(parse_recall_hits(&raw))
}
pub async fn dream_run(&self) -> anyhow::Result<DreamStats> {
let raw: serde_json::Value = self
.http
.post(format!("{}/api/v1/dream/run", self.base))
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(parse_dream_stats(&raw))
}
pub async fn sse_stream(&self, tx: tokio::sync::mpsc::Sender<MemoryEvent>) {
let _ = self.sse_stream_inner(&tx).await;
}
async fn sse_stream_inner(
&self,
tx: &tokio::sync::mpsc::Sender<MemoryEvent>,
) -> anyhow::Result<()> {
use futures_util::StreamExt;
let sse = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.build()?;
let resp = sse
.get(format!("{}/sse", self.base))
.send()
.await?
.error_for_status()?;
let mut bytes = resp.bytes_stream();
let mut buf = String::new();
while let Some(chunk) = bytes.next().await {
let chunk = chunk?;
buf.push_str(&String::from_utf8_lossy(&chunk));
while let Some(nl) = buf.find('\n') {
let line = buf[..nl].trim_end_matches('\r').to_string();
buf.drain(..=nl);
let Some(payload) = line.strip_prefix("data:") else {
continue;
};
let payload = payload.trim();
if payload.is_empty() {
continue;
}
if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload)
&& let Some(event) = parse_memory_event(&value)
&& tx.send(event).await.is_err()
{
return Ok(()); }
}
}
Ok(())
}
}
/// One recall search result, as produced by [`parse_recall_hits`].
#[derive(Debug, Clone, Default, PartialEq)]
pub struct RecallHit {
/// Value of the hit's `palace_id` field (empty when absent).
pub palace_id: String,
/// First line of the hit's `content` field, trimmed.
pub snippet: String,
/// Value of the hit's `score` field, narrowed to `f32` (0.0 when absent).
pub score: f32,
}
/// Converts a recall response body into [`RecallHit`]s.
///
/// `raw` must be a JSON array; any other JSON value yields an empty vector.
/// For each element:
/// * `palace_id` is the string at key `palace_id` (empty if missing or not a string),
/// * `snippet` is the first line of the string at key `content`, trimmed,
/// * `score` is the number at key `score` narrowed to `f32` (0.0 if missing).
pub fn parse_recall_hits(raw: &serde_json::Value) -> Vec<RecallHit> {
    /// String stored under `key`, or `""` when absent or not a string.
    fn text_field<'a>(item: &'a serde_json::Value, key: &str) -> &'a str {
        item.get(key)
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default()
    }

    let Some(items) = raw.as_array() else {
        return Vec::new();
    };

    let mut hits = Vec::with_capacity(items.len());
    for item in items {
        let first_line = text_field(item, "content")
            .lines()
            .next()
            .unwrap_or_default();
        let score = item
            .get("score")
            .and_then(serde_json::Value::as_f64)
            .unwrap_or(0.0);
        hits.push(RecallHit {
            palace_id: text_field(item, "palace_id").to_string(),
            snippet: first_line.trim().to_string(),
            score: score as f32,
        });
    }
    hits
}
/// Counters extracted from a dream-run response by [`parse_dream_stats`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DreamStats {
/// Value of the response's `merged` field (0 when absent).
pub merged: u64,
/// Value of the response's `pruned` field (0 when absent).
pub pruned: u64,
/// Value of the response's `compacted` field (0 when absent).
pub compacted: u64,
}
/// Extracts the `merged`, `pruned` and `compacted` counters from a dream-run
/// response body.
///
/// A counter that is missing or not an unsigned integer is reported as 0;
/// other keys in `raw` are ignored.
pub fn parse_dream_stats(raw: &serde_json::Value) -> DreamStats {
    /// Unsigned integer stored under `key`, or 0 when absent or of another type.
    fn counter(raw: &serde_json::Value, key: &str) -> u64 {
        raw.get(key)
            .and_then(serde_json::Value::as_u64)
            .unwrap_or_default()
    }

    let [merged, pruned, compacted] =
        ["merged", "pruned", "compacted"].map(|key| counter(raw, key));
    DreamStats {
        merged,
        pruned,
        compacted,
    }
}
/// Events decoded from the daemon's SSE stream by [`parse_memory_event`].
///
/// Each variant corresponds to one value of the JSON `type` tag.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MemoryEvent {
/// `type: "palace_created"`.
PalaceCreated {
/// Value of the event's `name` field.
name: String,
},
/// `type: "drawer_added"`.
DrawerAdded {
/// Value of the event's `palace_id` field.
palace_id: String,
/// Value of the event's `drawer_count` field.
drawer_count: u64,
},
/// `type: "drawer_deleted"`.
DrawerDeleted {
/// Value of the event's `palace_id` field.
palace_id: String,
/// Value of the event's `drawer_count` field.
drawer_count: u64,
},
/// `type: "dream_completed"`.
DreamCompleted {
/// Value of the event's `merged` field.
merged: u64,
/// Value of the event's `pruned` field.
pruned: u64,
/// Value of the event's `compacted` field.
compacted: u64,
},
}
/// Maps one SSE JSON payload onto a [`MemoryEvent`].
///
/// The event kind is taken from the string at key `type`. Returns `None` when
/// that key is missing, is not a string, or names a kind this client does not
/// model. Missing or mistyped payload fields fall back to `""` / `0`.
pub fn parse_memory_event(value: &serde_json::Value) -> Option<MemoryEvent> {
    /// Owned copy of the string under `key`, or an empty string.
    fn text(value: &serde_json::Value, key: &str) -> String {
        match value.get(key).and_then(serde_json::Value::as_str) {
            Some(s) => s.to_owned(),
            None => String::new(),
        }
    }

    /// Unsigned integer under `key`, or 0.
    fn count(value: &serde_json::Value, key: &str) -> u64 {
        value
            .get(key)
            .and_then(serde_json::Value::as_u64)
            .unwrap_or_default()
    }

    let kind = value.get("type")?.as_str()?;
    let event = match kind {
        "palace_created" => MemoryEvent::PalaceCreated {
            name: text(value, "name"),
        },
        "drawer_added" => MemoryEvent::DrawerAdded {
            palace_id: text(value, "palace_id"),
            drawer_count: count(value, "drawer_count"),
        },
        "drawer_deleted" => MemoryEvent::DrawerDeleted {
            palace_id: text(value, "palace_id"),
            drawer_count: count(value, "drawer_count"),
        },
        "dream_completed" => MemoryEvent::DreamCompleted {
            merged: count(value, "merged"),
            pruned: count(value, "pruned"),
            compacted: count(value, "compacted"),
        },
        _ => return None,
    };
    Some(event)
}
pub fn parse_palaces(raw: &serde_json::Value) -> Vec<PalaceRow> {
let array = match raw {
serde_json::Value::Array(items) => items.clone(),
serde_json::Value::Object(obj) => match obj.get("palaces") {
Some(serde_json::Value::Array(items)) => items.clone(),
_ => Vec::new(),
},
_ => Vec::new(),
};
array
.into_iter()
.filter_map(|v| serde_json::from_value::<PalaceWire>(v).ok())
.map(|p| PalaceRow {
id: p.id,
name: p.name,
vector_count: p.vector_count,
})
.collect()
}
// Unit tests for the URL helpers, the client's base-URL accessors, and the
// pure JSON parsing functions. No test performs network I/O.
#[cfg(test)]
mod tests {
use super::*;
// The built-in fallback points at the loopback interface.
#[test]
fn default_memory_url_is_local() {
assert!(DEFAULT_MEMORY_URL.starts_with("http://127.0.0.1"));
}
// A bare host:port gains a scheme; an already-prefixed URL is left alone.
#[test]
fn normalize_url_adds_scheme() {
assert_eq!(normalize_url("127.0.0.1:7070"), "http://127.0.0.1:7070");
assert_eq!(
normalize_url("http://127.0.0.1:7070"),
"http://127.0.0.1:7070"
);
}
// `new` stores the base URL verbatim.
#[test]
fn memory_client_stores_base_url() {
let client = MemoryClient::new("http://127.0.0.1:7070");
assert_eq!(client.base_url(), "http://127.0.0.1:7070");
}
// `set_base_url` replaces the stored base URL.
#[test]
fn memory_client_repoints() {
let mut client = MemoryClient::new("http://127.0.0.1:7070");
client.set_base_url("http://127.0.0.1:8080");
assert_eq!(client.base_url(), "http://127.0.0.1:8080");
}
// Whatever the daemon-address lookup returns, the result carries a scheme.
#[test]
fn resolve_memory_url_returns_http_url() {
let url = resolve_memory_url();
assert!(url.starts_with("http://") || url.starts_with("https://"));
}
// Both the bare-array and the `{"palaces": [...]}` shapes are accepted, as are
// the `vectors` / `total_vectors` aliases; other JSON values yield no rows.
#[test]
fn palace_list_accepts_array_and_object_shapes() {
let arr = serde_json::json!([
{"id": "p1", "name": "default", "vector_count": 8400},
{"id": "p2", "name": "work", "vectors": 0},
]);
let rows = parse_palaces(&arr);
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].id, "p1");
assert_eq!(rows[0].vector_count, 8400);
assert_eq!(rows[1].name, "work");
let obj = serde_json::json!({
"palaces": [{"id": "p3", "name": "notes", "total_vectors": 12}],
});
let rows = parse_palaces(&obj);
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].vector_count, 12);
assert!(parse_palaces(&serde_json::json!("nonsense")).is_empty());
}
// The snippet is the trimmed first line of `content`; a non-array body yields
// no hits.
#[test]
fn parse_recall_hits_projects_fields() {
let raw = serde_json::json!([
{
"palace_id": "default",
"content": "JWT middleware added to auth flow\nmore detail",
"score": 0.83,
},
{
"palace_id": "work",
"content": "  single line  ",
"score": 0.5,
},
]);
let hits = parse_recall_hits(&raw);
assert_eq!(hits.len(), 2);
assert_eq!(hits[0].palace_id, "default");
assert_eq!(hits[0].snippet, "JWT middleware added to auth flow");
assert!((hits[0].score - 0.83).abs() < 1e-6);
assert_eq!(hits[1].snippet, "single line");
assert!(parse_recall_hits(&serde_json::json!({})).is_empty());
}
// Unknown keys are ignored and missing counters default to zero.
#[test]
fn parse_dream_stats_reads_counts() {
let raw = serde_json::json!({
"merged": 3, "pruned": 1, "compacted": 0,
"closets_updated": 5, "duration_ms": 42,
});
assert_eq!(
parse_dream_stats(&raw),
DreamStats {
merged: 3,
pruned: 1,
compacted: 0,
}
);
assert_eq!(
parse_dream_stats(&serde_json::json!({})),
DreamStats::default()
);
}
// Known `type` tags map to their variants; unknown tags and payloads without
// a `type` key yield `None`.
#[test]
fn parse_memory_event_maps_type_tag() {
assert_eq!(
parse_memory_event(&serde_json::json!({
"type": "palace_created", "id": "p1", "name": "notes",
})),
Some(MemoryEvent::PalaceCreated {
name: "notes".into(),
})
);
assert_eq!(
parse_memory_event(&serde_json::json!({
"type": "drawer_added", "palace_id": "default", "drawer_count": 14,
})),
Some(MemoryEvent::DrawerAdded {
palace_id: "default".into(),
drawer_count: 14,
})
);
assert_eq!(
parse_memory_event(&serde_json::json!({
"type": "dream_completed", "merged": 3, "pruned": 1, "compacted": 0,
})),
Some(MemoryEvent::DreamCompleted {
merged: 3,
pruned: 1,
compacted: 0,
})
);
assert!(parse_memory_event(&serde_json::json!({"type": "connected"})).is_none());
assert!(parse_memory_event(&serde_json::json!({"type": "lag", "skipped": 2})).is_none());
assert!(parse_memory_event(&serde_json::json!({"no": "type"})).is_none());
}
}