use crate::{ActivityFilter, ActivitySource, DaemonEvent};
use std::sync::Arc;
use trusty_common::memory_core::dream::{DreamConfig, Dreamer, PersistedDreamStats};
use trusty_common::memory_core::palace::PalaceId;
use trusty_common::memory_core::store::kg::Triple;
use trusty_common::memory_core::{PalaceHandle, PalaceRegistry};
use super::core::KG_GRAPH_MAX_TRIPLES;
use super::helpers::refresh_gaps_cache;
use super::types::{DreamStatusPayload, KgAssertBody, KgGraphPayload, ServiceError, ServiceResult};
use super::MemoryService;
impl MemoryService {
pub async fn kg_query(&self, id: &str, subject: &str) -> ServiceResult<Vec<Triple>> {
let handle = self.open_handle(id)?;
handle
.kg
.query_active(subject)
.await
.map_err(|e| ServiceError::internal(format!("kg query: {e:#}")))
}
pub async fn kg_assert(&self, id: &str, body: KgAssertBody) -> ServiceResult<()> {
let handle = self.open_handle(id)?;
let triple = Triple {
subject: body.subject,
predicate: body.predicate,
object: body.object,
valid_from: chrono::Utc::now(),
valid_to: None,
confidence: body.confidence.unwrap_or(1.0),
provenance: body.provenance,
};
handle
.kg
.assert(triple)
.await
.map_err(|e| ServiceError::internal(format!("kg assert: {e:#}")))
}
pub async fn kg_retract_triple(
&self,
id: &str,
subject: &str,
predicate: &str,
) -> ServiceResult<bool> {
let handle = self.open_handle(id)?;
let closed = handle
.kg
.retract(subject, predicate)
.await
.map_err(|e| ServiceError::internal(format!("kg retract: {e:#}")))?;
Ok(closed > 0)
}
pub async fn kg_list_subjects(&self, id: &str, limit: usize) -> ServiceResult<Vec<String>> {
let handle = self.open_handle(id)?;
handle
.kg
.list_subjects(limit)
.map_err(|e| ServiceError::internal(format!("kg list_subjects: {e:#}")))
}
pub async fn kg_list_subjects_with_counts(
&self,
id: &str,
limit: usize,
) -> ServiceResult<Vec<(String, u64)>> {
let handle = self.open_handle(id)?;
handle
.kg
.list_subjects_with_counts(limit)
.map_err(|e| ServiceError::internal(format!("kg list_subjects_with_counts: {e:#}")))
}
pub async fn kg_list_all(
&self,
id: &str,
limit: usize,
offset: usize,
) -> ServiceResult<Vec<Triple>> {
let handle = self.open_handle(id)?;
handle
.kg
.list_active(limit, offset)
.await
.map_err(|e| ServiceError::internal(format!("kg list_active: {e:#}")))
}
pub async fn kg_count(&self, id: &str) -> ServiceResult<usize> {
let handle = self.open_handle(id)?;
Ok(handle.kg.count_active_triples())
}
pub async fn kg_graph(&self, id: &str) -> ServiceResult<KgGraphPayload> {
let handle = self.open_handle(id)?;
let triples = handle
.kg
.list_active(KG_GRAPH_MAX_TRIPLES, 0)
.await
.map_err(|e| ServiceError::internal(format!("kg list_active: {e:#}")))?;
Ok(KgGraphPayload {
triples,
node_count: handle.kg.node_count() as u64,
edge_count: handle.kg.edge_count() as u64,
community_count: handle.kg.community_count() as u64,
})
}
pub async fn dream_status_aggregate(&self) -> DreamStatusPayload {
let palaces = PalaceRegistry::list_palaces(&self.state.data_root).unwrap_or_default();
let mut out = DreamStatusPayload::default();
let mut latest: Option<chrono::DateTime<chrono::Utc>> = None;
for p in palaces {
let data_dir = self.state.data_root.join(p.id.as_str());
let snap = match PersistedDreamStats::load(&data_dir) {
Ok(Some(s)) => s,
_ => continue,
};
out.merged = out.merged.saturating_add(snap.stats.merged);
out.pruned = out.pruned.saturating_add(snap.stats.pruned);
out.compacted = out.compacted.saturating_add(snap.stats.compacted);
out.closets_updated = out
.closets_updated
.saturating_add(snap.stats.closets_updated);
out.duration_ms = out.duration_ms.saturating_add(snap.stats.duration_ms);
latest = match latest {
Some(t) if t >= snap.last_run_at => Some(t),
_ => Some(snap.last_run_at),
};
}
out.last_run_at = latest;
out
}
pub async fn dream_status_for_palace(&self, id: &str) -> ServiceResult<DreamStatusPayload> {
let data_dir = self.state.data_root.join(id);
if !data_dir.exists() {
return Err(ServiceError::not_found(format!("palace not found: {id}")));
}
match PersistedDreamStats::load(&data_dir) {
Ok(Some(s)) => Ok(s.into()),
Ok(None) => Ok(DreamStatusPayload::default()),
Err(e) => Err(ServiceError::internal(format!("read dream stats: {e:#}"))),
}
}
pub async fn dream_run(&self) -> ServiceResult<DreamStatusPayload> {
let palaces = PalaceRegistry::list_palaces(&self.state.data_root)
.map_err(|e| ServiceError::internal(format!("list palaces: {e:#}")))?;
let dreamer = Dreamer::new(DreamConfig::default());
let mut out = DreamStatusPayload::default();
for p in palaces {
let handle = match self
.state
.registry
.open_palace(&self.state.data_root, &p.id)
{
Ok(h) => h,
Err(e) => {
tracing::warn!(palace = %p.id, "dream_run: open failed: {e:#}");
continue;
}
};
match dreamer.dream_cycle(&handle).await {
Ok(stats) => {
out.merged = out.merged.saturating_add(stats.merged);
out.pruned = out.pruned.saturating_add(stats.pruned);
out.compacted = out.compacted.saturating_add(stats.compacted);
out.closets_updated = out.closets_updated.saturating_add(stats.closets_updated);
out.duration_ms = out.duration_ms.saturating_add(stats.duration_ms);
}
Err(e) => tracing::warn!(palace = %p.id, "dream_run: cycle failed: {e:#}"),
}
refresh_gaps_cache(&self.state, &handle).await;
}
out.last_run_at = Some(chrono::Utc::now());
self.state.emit(DaemonEvent::DreamCompleted {
palace_id: None,
merged: out.merged,
pruned: out.pruned,
compacted: out.compacted,
closets_updated: out.closets_updated,
duration_ms: out.duration_ms,
source: ActivitySource::Http,
});
self.state.emit(self.aggregate_status_event());
Ok(out)
}
pub async fn list_activity(
&self,
filter: ActivityFilter,
limit: usize,
offset: usize,
) -> ServiceResult<(Vec<crate::ActivityEntry>, u64)> {
let entries = self
.state
.activity_log
.list(&filter, limit, offset)
.map_err(|e| ServiceError::internal(format!("activity list: {e:#}")))?;
let total = self
.state
.activity_log
.count()
.map_err(|e| ServiceError::internal(format!("activity count: {e:#}")))?;
Ok((entries, total))
}
pub fn open_handle(&self, id: &str) -> ServiceResult<Arc<PalaceHandle>> {
self.state
.registry
.open_palace(&self.state.data_root, &PalaceId::new(id))
.map_err(|e| ServiceError::not_found(format!("palace not found: {id} ({e:#})")))
}
}