trusty_memory/service/
core_kg.rs1use crate::{ActivityFilter, ActivitySource, DaemonEvent};
11use std::sync::Arc;
12use trusty_common::memory_core::dream::{DreamConfig, Dreamer, PersistedDreamStats};
13use trusty_common::memory_core::palace::PalaceId;
14use trusty_common::memory_core::store::kg::Triple;
15use trusty_common::memory_core::{PalaceHandle, PalaceRegistry};
16
17use super::core::KG_GRAPH_MAX_TRIPLES;
18use super::helpers::refresh_gaps_cache;
19use super::types::{DreamStatusPayload, KgAssertBody, KgGraphPayload, ServiceError, ServiceResult};
20use super::MemoryService;
21
22impl MemoryService {
23 pub async fn kg_query(&self, id: &str, subject: &str) -> ServiceResult<Vec<Triple>> {
29 let handle = self.open_handle(id)?;
30 handle
31 .kg
32 .query_active(subject)
33 .await
34 .map_err(|e| ServiceError::internal(format!("kg query: {e:#}")))
35 }
36
37 pub async fn kg_assert(&self, id: &str, body: KgAssertBody) -> ServiceResult<()> {
39 let handle = self.open_handle(id)?;
40 let triple = Triple {
41 subject: body.subject,
42 predicate: body.predicate,
43 object: body.object,
44 valid_from: chrono::Utc::now(),
45 valid_to: None,
46 confidence: body.confidence.unwrap_or(1.0),
47 provenance: body.provenance,
48 };
49 handle
50 .kg
51 .assert(triple)
52 .await
53 .map_err(|e| ServiceError::internal(format!("kg assert: {e:#}")))
54 }
55
56 pub async fn kg_retract_triple(
66 &self,
67 id: &str,
68 subject: &str,
69 predicate: &str,
70 ) -> ServiceResult<bool> {
71 let handle = self.open_handle(id)?;
72 let closed = handle
73 .kg
74 .retract(subject, predicate)
75 .await
76 .map_err(|e| ServiceError::internal(format!("kg retract: {e:#}")))?;
77 Ok(closed > 0)
78 }
79
80 pub async fn kg_list_subjects(&self, id: &str, limit: usize) -> ServiceResult<Vec<String>> {
82 let handle = self.open_handle(id)?;
83 handle
84 .kg
85 .list_subjects(limit)
86 .map_err(|e| ServiceError::internal(format!("kg list_subjects: {e:#}")))
87 }
88
89 pub async fn kg_list_subjects_with_counts(
91 &self,
92 id: &str,
93 limit: usize,
94 ) -> ServiceResult<Vec<(String, u64)>> {
95 let handle = self.open_handle(id)?;
96 handle
97 .kg
98 .list_subjects_with_counts(limit)
99 .map_err(|e| ServiceError::internal(format!("kg list_subjects_with_counts: {e:#}")))
100 }
101
102 pub async fn kg_list_all(
104 &self,
105 id: &str,
106 limit: usize,
107 offset: usize,
108 ) -> ServiceResult<Vec<Triple>> {
109 let handle = self.open_handle(id)?;
110 handle
111 .kg
112 .list_active(limit, offset)
113 .await
114 .map_err(|e| ServiceError::internal(format!("kg list_active: {e:#}")))
115 }
116
117 pub async fn kg_count(&self, id: &str) -> ServiceResult<usize> {
119 let handle = self.open_handle(id)?;
120 Ok(handle.kg.count_active_triples())
121 }
122
123 pub async fn kg_graph(&self, id: &str) -> ServiceResult<KgGraphPayload> {
125 let handle = self.open_handle(id)?;
126 let triples = handle
127 .kg
128 .list_active(KG_GRAPH_MAX_TRIPLES, 0)
129 .await
130 .map_err(|e| ServiceError::internal(format!("kg list_active: {e:#}")))?;
131 Ok(KgGraphPayload {
132 triples,
133 node_count: handle.kg.node_count() as u64,
134 edge_count: handle.kg.edge_count() as u64,
135 community_count: handle.kg.community_count() as u64,
136 })
137 }
138
139 pub async fn dream_status_aggregate(&self) -> DreamStatusPayload {
145 let palaces = PalaceRegistry::list_palaces(&self.state.data_root).unwrap_or_default();
146 let mut out = DreamStatusPayload::default();
147 let mut latest: Option<chrono::DateTime<chrono::Utc>> = None;
148 for p in palaces {
149 let data_dir = self.state.data_root.join(p.id.as_str());
150 let snap = match PersistedDreamStats::load(&data_dir) {
151 Ok(Some(s)) => s,
152 _ => continue,
153 };
154 out.merged = out.merged.saturating_add(snap.stats.merged);
155 out.pruned = out.pruned.saturating_add(snap.stats.pruned);
156 out.compacted = out.compacted.saturating_add(snap.stats.compacted);
157 out.closets_updated = out
158 .closets_updated
159 .saturating_add(snap.stats.closets_updated);
160 out.duration_ms = out.duration_ms.saturating_add(snap.stats.duration_ms);
161 latest = match latest {
162 Some(t) if t >= snap.last_run_at => Some(t),
163 _ => Some(snap.last_run_at),
164 };
165 }
166 out.last_run_at = latest;
167 out
168 }
169
170 pub async fn dream_status_for_palace(&self, id: &str) -> ServiceResult<DreamStatusPayload> {
172 let data_dir = self.state.data_root.join(id);
173 if !data_dir.exists() {
174 return Err(ServiceError::not_found(format!("palace not found: {id}")));
175 }
176 match PersistedDreamStats::load(&data_dir) {
177 Ok(Some(s)) => Ok(s.into()),
178 Ok(None) => Ok(DreamStatusPayload::default()),
179 Err(e) => Err(ServiceError::internal(format!("read dream stats: {e:#}"))),
180 }
181 }
182
183 pub async fn dream_run(&self) -> ServiceResult<DreamStatusPayload> {
185 let palaces = PalaceRegistry::list_palaces(&self.state.data_root)
186 .map_err(|e| ServiceError::internal(format!("list palaces: {e:#}")))?;
187 let dreamer = Dreamer::new(DreamConfig::default());
188 let mut out = DreamStatusPayload::default();
189 for p in palaces {
190 let handle = match self
191 .state
192 .registry
193 .open_palace(&self.state.data_root, &p.id)
194 {
195 Ok(h) => h,
196 Err(e) => {
197 tracing::warn!(palace = %p.id, "dream_run: open failed: {e:#}");
198 continue;
199 }
200 };
201 match dreamer.dream_cycle(&handle).await {
202 Ok(stats) => {
203 out.merged = out.merged.saturating_add(stats.merged);
204 out.pruned = out.pruned.saturating_add(stats.pruned);
205 out.compacted = out.compacted.saturating_add(stats.compacted);
206 out.closets_updated = out.closets_updated.saturating_add(stats.closets_updated);
207 out.duration_ms = out.duration_ms.saturating_add(stats.duration_ms);
208 }
209 Err(e) => tracing::warn!(palace = %p.id, "dream_run: cycle failed: {e:#}"),
210 }
211 refresh_gaps_cache(&self.state, &handle).await;
212 }
213 out.last_run_at = Some(chrono::Utc::now());
214 self.state.emit(DaemonEvent::DreamCompleted {
215 palace_id: None,
216 merged: out.merged,
217 pruned: out.pruned,
218 compacted: out.compacted,
219 closets_updated: out.closets_updated,
220 duration_ms: out.duration_ms,
221 source: ActivitySource::Http,
222 });
223 self.state.emit(self.aggregate_status_event());
224 Ok(out)
225 }
226
227 pub async fn list_activity(
233 &self,
234 filter: ActivityFilter,
235 limit: usize,
236 offset: usize,
237 ) -> ServiceResult<(Vec<crate::ActivityEntry>, u64)> {
238 let entries = self
239 .state
240 .activity_log
241 .list(&filter, limit, offset)
242 .map_err(|e| ServiceError::internal(format!("activity list: {e:#}")))?;
243 let total = self
244 .state
245 .activity_log
246 .count()
247 .map_err(|e| ServiceError::internal(format!("activity count: {e:#}")))?;
248 Ok((entries, total))
249 }
250
251 pub fn open_handle(&self, id: &str) -> ServiceResult<Arc<PalaceHandle>> {
257 self.state
258 .registry
259 .open_palace(&self.state.data_root, &PalaceId::new(id))
260 .map_err(|e| ServiceError::not_found(format!("palace not found: {id} ({e:#})")))
261 }
262}