Skip to main content

trusty_memory/service/
core_kg.rs

1//! `MemoryService` knowledge-graph / dream / activity methods.
2//!
3//! Why: the `MemoryService` impl exceeded the 500-SLOC production cap once the
4//! former monolithic `service.rs` was split (issue #607); its KG, dream-cycle,
5//! and activity-listing methods form a cohesive second half hosted here.
6//! What: a continuation `impl MemoryService` block whose methods were moved
7//! verbatim from the original single impl.
8//! Test: covered by the corresponding `web::tests` / `service::tests`.
9
10use crate::{ActivityFilter, ActivitySource, DaemonEvent};
11use std::sync::Arc;
12use trusty_common::memory_core::dream::{DreamConfig, Dreamer, PersistedDreamStats};
13use trusty_common::memory_core::palace::PalaceId;
14use trusty_common::memory_core::store::kg::Triple;
15use trusty_common::memory_core::{PalaceHandle, PalaceRegistry};
16
17use super::core::KG_GRAPH_MAX_TRIPLES;
18use super::helpers::refresh_gaps_cache;
19use super::types::{DreamStatusPayload, KgAssertBody, KgGraphPayload, ServiceError, ServiceResult};
20use super::MemoryService;
21
22impl MemoryService {
23    // -----------------------------------------------------------------
24    // Knowledge graph
25    // -----------------------------------------------------------------
26
27    /// Query the KG for all active triples whose subject matches.
28    pub async fn kg_query(&self, id: &str, subject: &str) -> ServiceResult<Vec<Triple>> {
29        let handle = self.open_handle(id)?;
30        handle
31            .kg
32            .query_active(subject)
33            .await
34            .map_err(|e| ServiceError::internal(format!("kg query: {e:#}")))
35    }
36
37    /// Assert a triple in the KG.
38    pub async fn kg_assert(&self, id: &str, body: KgAssertBody) -> ServiceResult<()> {
39        let handle = self.open_handle(id)?;
40        let triple = Triple {
41            subject: body.subject,
42            predicate: body.predicate,
43            object: body.object,
44            valid_from: chrono::Utc::now(),
45            valid_to: None,
46            confidence: body.confidence.unwrap_or(1.0),
47            provenance: body.provenance,
48        };
49        handle
50            .kg
51            .assert(triple)
52            .await
53            .map_err(|e| ServiceError::internal(format!("kg assert: {e:#}")))
54    }
55
56    /// Retract the single active triple identified by `(subject, predicate)`.
57    ///
58    /// Why: Issue #278 — the `DELETE /kg/triples/<id>` HTTP endpoint needs a
59    /// service-layer method so the HTTP handler stays a thin adapter.
60    /// What: Opens the palace handle, calls `KnowledgeGraph::retract`, and
61    /// maps the closed count to a 204/404 signal: `Ok(true)` when at least
62    /// one interval was closed, `Ok(false)` when no active triple matched.
63    /// Test: Covered by `kg_delete_triple_returns_204_on_success` in
64    /// `web::tests`.
65    pub async fn kg_retract_triple(
66        &self,
67        id: &str,
68        subject: &str,
69        predicate: &str,
70    ) -> ServiceResult<bool> {
71        let handle = self.open_handle(id)?;
72        let closed = handle
73            .kg
74            .retract(subject, predicate)
75            .await
76            .map_err(|e| ServiceError::internal(format!("kg retract: {e:#}")))?;
77        Ok(closed > 0)
78    }
79
80    /// List distinct subjects in the KG.
81    pub async fn kg_list_subjects(&self, id: &str, limit: usize) -> ServiceResult<Vec<String>> {
82        let handle = self.open_handle(id)?;
83        handle
84            .kg
85            .list_subjects(limit)
86            .map_err(|e| ServiceError::internal(format!("kg list_subjects: {e:#}")))
87    }
88
89    /// List distinct subjects in the KG paired with their active-triple count.
90    pub async fn kg_list_subjects_with_counts(
91        &self,
92        id: &str,
93        limit: usize,
94    ) -> ServiceResult<Vec<(String, u64)>> {
95        let handle = self.open_handle(id)?;
96        handle
97            .kg
98            .list_subjects_with_counts(limit)
99            .map_err(|e| ServiceError::internal(format!("kg list_subjects_with_counts: {e:#}")))
100    }
101
102    /// Page through every active triple.
103    pub async fn kg_list_all(
104        &self,
105        id: &str,
106        limit: usize,
107        offset: usize,
108    ) -> ServiceResult<Vec<Triple>> {
109        let handle = self.open_handle(id)?;
110        handle
111            .kg
112            .list_active(limit, offset)
113            .await
114            .map_err(|e| ServiceError::internal(format!("kg list_active: {e:#}")))
115    }
116
117    /// Return the count of currently-active triples.
118    pub async fn kg_count(&self, id: &str) -> ServiceResult<usize> {
119        let handle = self.open_handle(id)?;
120        Ok(handle.kg.count_active_triples())
121    }
122
123    /// Build the per-palace visual graph payload.
124    pub async fn kg_graph(&self, id: &str) -> ServiceResult<KgGraphPayload> {
125        let handle = self.open_handle(id)?;
126        let triples = handle
127            .kg
128            .list_active(KG_GRAPH_MAX_TRIPLES, 0)
129            .await
130            .map_err(|e| ServiceError::internal(format!("kg list_active: {e:#}")))?;
131        Ok(KgGraphPayload {
132            triples,
133            node_count: handle.kg.node_count() as u64,
134            edge_count: handle.kg.edge_count() as u64,
135            community_count: handle.kg.community_count() as u64,
136        })
137    }
138
139    // -----------------------------------------------------------------
140    // Dream cycle
141    // -----------------------------------------------------------------
142
143    /// Aggregate dream stats across every persisted palace.
144    pub async fn dream_status_aggregate(&self) -> DreamStatusPayload {
145        let palaces = PalaceRegistry::list_palaces(&self.state.data_root).unwrap_or_default();
146        let mut out = DreamStatusPayload::default();
147        let mut latest: Option<chrono::DateTime<chrono::Utc>> = None;
148        for p in palaces {
149            let data_dir = self.state.data_root.join(p.id.as_str());
150            let snap = match PersistedDreamStats::load(&data_dir) {
151                Ok(Some(s)) => s,
152                _ => continue,
153            };
154            out.merged = out.merged.saturating_add(snap.stats.merged);
155            out.pruned = out.pruned.saturating_add(snap.stats.pruned);
156            out.compacted = out.compacted.saturating_add(snap.stats.compacted);
157            out.closets_updated = out
158                .closets_updated
159                .saturating_add(snap.stats.closets_updated);
160            out.duration_ms = out.duration_ms.saturating_add(snap.stats.duration_ms);
161            latest = match latest {
162                Some(t) if t >= snap.last_run_at => Some(t),
163                _ => Some(snap.last_run_at),
164            };
165        }
166        out.last_run_at = latest;
167        out
168    }
169
170    /// Per-palace dream stats snapshot.
171    pub async fn dream_status_for_palace(&self, id: &str) -> ServiceResult<DreamStatusPayload> {
172        let data_dir = self.state.data_root.join(id);
173        if !data_dir.exists() {
174            return Err(ServiceError::not_found(format!("palace not found: {id}")));
175        }
176        match PersistedDreamStats::load(&data_dir) {
177            Ok(Some(s)) => Ok(s.into()),
178            Ok(None) => Ok(DreamStatusPayload::default()),
179            Err(e) => Err(ServiceError::internal(format!("read dream stats: {e:#}"))),
180        }
181    }
182
183    /// Run a dream cycle across every palace.
184    pub async fn dream_run(&self) -> ServiceResult<DreamStatusPayload> {
185        let palaces = PalaceRegistry::list_palaces(&self.state.data_root)
186            .map_err(|e| ServiceError::internal(format!("list palaces: {e:#}")))?;
187        let dreamer = Dreamer::new(DreamConfig::default());
188        let mut out = DreamStatusPayload::default();
189        for p in palaces {
190            let handle = match self
191                .state
192                .registry
193                .open_palace(&self.state.data_root, &p.id)
194            {
195                Ok(h) => h,
196                Err(e) => {
197                    tracing::warn!(palace = %p.id, "dream_run: open failed: {e:#}");
198                    continue;
199                }
200            };
201            match dreamer.dream_cycle(&handle).await {
202                Ok(stats) => {
203                    out.merged = out.merged.saturating_add(stats.merged);
204                    out.pruned = out.pruned.saturating_add(stats.pruned);
205                    out.compacted = out.compacted.saturating_add(stats.compacted);
206                    out.closets_updated = out.closets_updated.saturating_add(stats.closets_updated);
207                    out.duration_ms = out.duration_ms.saturating_add(stats.duration_ms);
208                }
209                Err(e) => tracing::warn!(palace = %p.id, "dream_run: cycle failed: {e:#}"),
210            }
211            refresh_gaps_cache(&self.state, &handle).await;
212        }
213        out.last_run_at = Some(chrono::Utc::now());
214        self.state.emit(DaemonEvent::DreamCompleted {
215            palace_id: None,
216            merged: out.merged,
217            pruned: out.pruned,
218            compacted: out.compacted,
219            closets_updated: out.closets_updated,
220            duration_ms: out.duration_ms,
221            source: ActivitySource::Http,
222        });
223        self.state.emit(self.aggregate_status_event());
224        Ok(out)
225    }
226
227    // -----------------------------------------------------------------
228    // Activity log
229    // -----------------------------------------------------------------
230
231    /// Paginated activity-log read.
232    pub async fn list_activity(
233        &self,
234        filter: ActivityFilter,
235        limit: usize,
236        offset: usize,
237    ) -> ServiceResult<(Vec<crate::ActivityEntry>, u64)> {
238        let entries = self
239            .state
240            .activity_log
241            .list(&filter, limit, offset)
242            .map_err(|e| ServiceError::internal(format!("activity list: {e:#}")))?;
243        let total = self
244            .state
245            .activity_log
246            .count()
247            .map_err(|e| ServiceError::internal(format!("activity count: {e:#}")))?;
248        Ok((entries, total))
249    }
250
251    // -----------------------------------------------------------------
252    // Internal helper — open a palace handle or return 404.
253    // -----------------------------------------------------------------
254
255    /// Open the named palace, returning `ServiceError::NotFound` on failure.
256    pub fn open_handle(&self, id: &str) -> ServiceResult<Arc<PalaceHandle>> {
257        self.state
258            .registry
259            .open_palace(&self.state.data_root, &PalaceId::new(id))
260            .map_err(|e| ServiceError::not_found(format!("palace not found: {id} ({e:#})")))
261    }
262}