use std::time::Duration;
use crate::monitor::dashboard::{MemoryData, PalaceRow};
use super::parsers::{
parse_drawers, parse_dream_stats, parse_memory_details, parse_memory_event, parse_palaces,
parse_recall_hits,
};
use super::types::{DrawerInfo, DreamStats, MemoryDetail, MemoryEvent, REQUEST_TIMEOUT, RecallHit};
/// HTTP client for the service reachable at `base`.
///
/// Endpoint paths such as `/api/v1/status` or `/health` are appended to `base`
/// by the methods of this type.
#[derive(Debug, Clone)]
pub struct MemoryClient {
/// Base URL that every endpoint path is appended to; accessible from the parent module.
pub(super) base: String,
/// `reqwest` client used by the request/response methods.
http: reqwest::Client,
}
impl MemoryClient {
pub fn new(base: impl Into<String>) -> Self {
let http = reqwest::Client::builder()
.timeout(REQUEST_TIMEOUT)
.build()
.unwrap_or_default();
Self {
base: base.into(),
http,
}
}
/// Returns the base URL currently used to build endpoint addresses.
pub fn base_url(&self) -> &str {
    self.base.as_str()
}
/// Replaces the base URL used for all subsequent requests.
pub fn set_base_url(&mut self, base: impl Into<String>) {
    let replacement: String = base.into();
    self.base = replacement;
}
/// Collects the data for one `MemoryData` snapshot.
///
/// Requests `{base}/api/v1/status` and deserializes it as `StatusWire`, then
/// asks for the palace list. A failing palace request is not fatal: it is
/// logged as a warning and the snapshot carries an empty palace list.
///
/// # Errors
///
/// Returns an error when the status request cannot be sent, answers with an
/// error status, or its body cannot be deserialized.
pub async fn fetch_all(&self) -> anyhow::Result<MemoryData> {
    use super::types::StatusWire;

    let url = format!("{}/api/v1/status", self.base);
    let response = self.http.get(url).send().await?.error_for_status()?;
    let StatusWire {
        version,
        palace_count,
        total_drawers,
        total_vectors,
        total_kg_triples,
        ..
    } = response.json().await?;

    // The palace list is optional; degrade to an empty list on failure.
    let palaces = self.palaces().await.unwrap_or_else(|e| {
        tracing::warn!("palace list probe failed: {e}");
        Vec::new()
    });

    Ok(MemoryData {
        version,
        palace_count,
        total_drawers,
        total_vectors,
        total_kg_triples,
        palaces,
    })
}
/// Reports whether `GET {base}/health` answers with a success status.
///
/// Any transport error counts as unhealthy.
pub async fn is_healthy(&self) -> bool {
    let url = format!("{}/health", self.base);
    match self.http.get(url).send().await {
        Ok(response) => response.status().is_success(),
        Err(_) => false,
    }
}
/// Fetches `{base}/api/v1/palaces` and converts the JSON body with `parse_palaces`.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, answers with an error
/// status, or the body is not valid JSON.
async fn palaces(&self) -> anyhow::Result<Vec<PalaceRow>> {
    let url = format!("{}/api/v1/palaces", self.base);
    let response = self.http.get(url).send().await?.error_for_status()?;
    let body: serde_json::Value = response.json().await?;
    Ok(parse_palaces(&body))
}
/// Calls `{base}/api/v1/recall` with `q = query` and `top_k = top_k`, and
/// converts the JSON body with `parse_recall_hits`.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, answers with an error
/// status, or the body is not valid JSON.
pub async fn recall(&self, query: &str, top_k: usize) -> anyhow::Result<Vec<RecallHit>> {
    let url = format!("{}/api/v1/recall", self.base);
    let top_k_text = top_k.to_string();
    let params = [("q", query), ("top_k", top_k_text.as_str())];

    let response = self
        .http
        .get(url)
        .query(&params)
        .send()
        .await?
        .error_for_status()?;
    let body: serde_json::Value = response.json().await?;
    Ok(parse_recall_hits(&body))
}
pub async fn list_drawers(
&self,
palace_id: &str,
limit: usize,
offset: usize,
) -> anyhow::Result<Vec<DrawerInfo>> {
let raw: serde_json::Value = self
.http
.get(format!(
"{}/api/v1/palaces/{}/drawers",
self.base, palace_id,
))
.query(&[
("limit", limit.to_string()),
("offset", offset.to_string()),
("sort", "created_desc".to_string()),
])
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(parse_drawers(&raw))
}
pub async fn fetch_drawer_detail(
&self,
palace_id: &str,
limit: usize,
) -> anyhow::Result<Vec<MemoryDetail>> {
let raw: serde_json::Value = self
.http
.get(format!(
"{}/api/v1/palaces/{}/drawers",
self.base, palace_id,
))
.query(&[
("limit", limit.to_string()),
("sort", "created_desc".to_string()),
])
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(parse_memory_details(&raw))
}
/// Sends a body-less `POST {base}/api/v1/dream/run` and converts the JSON
/// response with `parse_dream_stats`.
///
/// # Errors
///
/// Returns an error when the request cannot be sent, answers with an error
/// status, or the body is not valid JSON.
pub async fn dream_run(&self) -> anyhow::Result<DreamStats> {
    let url = format!("{}/api/v1/dream/run", self.base);
    let response = self.http.post(url).send().await?.error_for_status()?;
    let body: serde_json::Value = response.json().await?;
    Ok(parse_dream_stats(&body))
}
pub async fn sse_stream(&self, tx: tokio::sync::mpsc::Sender<MemoryEvent>) {
let _ = self.sse_stream_inner(&tx).await;
}
/// Reads the `{base}/sse` stream line by line and forwards parsed events to `tx`.
///
/// A dedicated client with only a 5-second connect timeout is built for this
/// request instead of reusing `self.http`, which is built with an overall
/// request timeout (presumably so the long-lived stream is not cut off by
/// that timeout; confirm).
///
/// Incoming bytes are buffered and split on `\n`; a trailing `\r` is removed
/// from each line. Only lines starting with `data:` and carrying a non-empty
/// payload are considered. Payloads that are not valid JSON, or that
/// `parse_memory_event` turns into `None`, are skipped silently. Bytes after
/// the last newline are dropped when the stream ends.
///
/// Returns `Ok(())` when the server closes the stream or when sending on `tx`
/// fails because the receiver is gone.
///
/// # Errors
///
/// Returns an error when the client cannot be built, the request cannot be
/// sent, the server answers with an error status, or reading a chunk fails.
async fn sse_stream_inner(
    &self,
    tx: &tokio::sync::mpsc::Sender<MemoryEvent>,
) -> anyhow::Result<()> {
    use futures_util::StreamExt;

    let sse = reqwest::Client::builder()
        .connect_timeout(Duration::from_secs(5))
        .build()?;
    let resp = sse
        .get(format!("{}/sse", self.base))
        .send()
        .await?
        .error_for_status()?;
    let mut bytes = resp.bytes_stream();

    // Buffer raw bytes rather than text: a multi-byte UTF-8 character may be
    // split across two network chunks, and decoding each chunk on its own
    // would replace both halves with U+FFFD. Decoding happens per complete
    // line instead; `\n` never occurs inside a multi-byte sequence.
    let mut buf: Vec<u8> = Vec::new();
    while let Some(chunk) = bytes.next().await {
        let chunk = chunk?;
        buf.extend_from_slice(&chunk);

        // Offset of the first byte not yet handed out as a complete line.
        let mut consumed = 0;
        while let Some(offset) = buf[consumed..].iter().position(|&b| b == b'\n') {
            let line_end = consumed + offset;
            let line = String::from_utf8_lossy(&buf[consumed..line_end]);
            consumed = line_end + 1;

            let line = line.trim_end_matches('\r');
            let Some(payload) = line.strip_prefix("data:") else {
                continue;
            };
            let payload = payload.trim();
            if payload.is_empty() {
                continue;
            }
            if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload)
                && let Some(event) = parse_memory_event(&value)
                && tx.send(event).await.is_err()
            {
                // Receiver dropped: nobody is listening any more.
                return Ok(());
            }
        }
        // Remove all processed lines in one pass, keeping any partial line.
        buf.drain(..consumed);
    }
    Ok(())
}
}