// mnemo_core/query/mod.rs

1pub mod branch;
2pub mod causality;
3pub mod checkpoint;
4pub mod conflict;
5pub mod event_builder;
6pub mod forget;
7pub mod lifecycle;
8pub mod merge;
9pub mod poisoning;
10pub mod recall;
11pub mod reflection;
12pub mod remember;
13pub mod replay;
14pub mod retrieval;
15pub mod share;
16
17use std::sync::Arc;
18
19use crate::cache::MemoryCache;
20use crate::embedding::EmbeddingProvider;
21use crate::encryption::ContentEncryption;
22use crate::error::{Error, Result};
23use crate::index::VectorIndex;
24use crate::search::FullTextIndex;
25use crate::storage::StorageBackend;
26use crate::storage::cold::ColdStorage;
27
/// Longest accepted agent id, measured in bytes (`str::len`), not characters.
const MAX_AGENT_ID_LEN: usize = 256;

/// Upper bound on how many records one batch query may return.
///
/// Caps memory use per query while still leaving room for typical workloads.
pub const MAX_BATCH_QUERY_LIMIT: usize = 10_000;
33
34/// Validate that an agent_id contains only safe characters and is within length limits.
35pub fn validate_agent_id(agent_id: &str) -> Result<()> {
36    if agent_id.is_empty() {
37        return Err(Error::Validation("agent_id cannot be empty".into()));
38    }
39    if agent_id.len() > MAX_AGENT_ID_LEN {
40        return Err(Error::Validation(format!(
41            "agent_id exceeds max length of {MAX_AGENT_ID_LEN}"
42        )));
43    }
44    if !agent_id
45        .chars()
46        .all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.')
47    {
48        return Err(Error::Validation(
49            "agent_id must contain only alphanumeric characters, hyphens, underscores, or dots"
50                .into(),
51        ));
52    }
53    Ok(())
54}
55
56pub struct MnemoEngine {
57    pub storage: Arc<dyn StorageBackend>,
58    pub index: Arc<dyn VectorIndex>,
59    pub embedding: Arc<dyn EmbeddingProvider>,
60    pub full_text: Option<Arc<dyn FullTextIndex>>,
61    pub default_agent_id: String,
62    pub default_org_id: Option<String>,
63    pub encryption: Option<Arc<ContentEncryption>>,
64    pub cold_storage: Option<Arc<dyn ColdStorage>>,
65    pub cache: Option<Arc<MemoryCache>>,
66    pub embed_events: bool,
67    /// Default TTL applied to `Working`-tier memories whose `remember`
68    /// request does not supply an explicit `ttl_seconds`. Defaults to 1 hour.
69    pub ttl_working_seconds: u64,
70    /// Importance floor enforced on write for `Procedural`-tier memories.
71    /// Defaults to 0.8.
72    pub procedural_importance_floor: f32,
73    /// Poisoning policy read by `check_for_anomaly`. Defaults to the v0.3.2
74    /// behaviour (no z-score outlier gate). Override with
75    /// [`MnemoEngine::with_poisoning_policy`].
76    pub poisoning_policy: poisoning::PoisoningPolicy,
77    /// v0.4.0-rc3 (Task B1) — when set, every
78    /// `recall(req)` with `req.with_provenance == Some(true)` returns
79    /// an HMAC-signed [`ReadProvenance`](crate::provenance::ReadProvenance)
80    /// receipt. `None` keeps the recall hot-path overhead at zero.
81    /// Attach via [`MnemoEngine::with_provenance_signer`].
82    pub provenance_signer: Option<Arc<crate::provenance::ProvenanceSigner>>,
83}
84
/// Default time-to-live for `Working`-tier memories, in seconds (one hour).
pub const DEFAULT_TTL_WORKING_SECONDS: u64 = 60 * 60;

/// Default lower bound on the importance of `Procedural`-tier memories at write time.
pub const DEFAULT_PROCEDURAL_IMPORTANCE_FLOOR: f32 = 0.8;
90
91impl MnemoEngine {
92    pub fn new(
93        storage: Arc<dyn StorageBackend>,
94        index: Arc<dyn VectorIndex>,
95        embedding: Arc<dyn EmbeddingProvider>,
96        default_agent_id: String,
97        default_org_id: Option<String>,
98    ) -> Self {
99        Self {
100            storage,
101            index,
102            embedding,
103            full_text: None,
104            default_agent_id,
105            default_org_id,
106            encryption: None,
107            cold_storage: None,
108            cache: None,
109            embed_events: false,
110            ttl_working_seconds: DEFAULT_TTL_WORKING_SECONDS,
111            procedural_importance_floor: DEFAULT_PROCEDURAL_IMPORTANCE_FLOOR,
112            poisoning_policy: poisoning::PoisoningPolicy::default(),
113            provenance_signer: None,
114        }
115    }
116
117    /// Attach a [`provenance::ProvenanceSigner`](crate::provenance::ProvenanceSigner)
118    /// (Task B1) so callers can request signed read-receipts via
119    /// `RecallRequest.with_provenance = Some(true)`.
120    pub fn with_provenance_signer(
121        mut self,
122        signer: Arc<crate::provenance::ProvenanceSigner>,
123    ) -> Self {
124        self.provenance_signer = Some(signer);
125        self
126    }
127
128    /// Attach a [`poisoning::PoisoningPolicy`] to the engine. See
129    /// [`poisoning::PoisoningPolicy::with_outlier_threshold`] for the
130    /// v0.3.3 z-score outlier gate.
131    pub fn with_poisoning_policy(mut self, policy: poisoning::PoisoningPolicy) -> Self {
132        self.poisoning_policy = policy;
133        self
134    }
135
136    /// Override the default 1-hour TTL applied to `Working`-tier memories
137    /// when a caller does not supply an explicit `ttl_seconds`.
138    pub fn with_ttl_working_seconds(mut self, seconds: u64) -> Self {
139        self.ttl_working_seconds = seconds;
140        self
141    }
142
143    /// Override the default 0.8 importance floor applied to Procedural
144    /// memories on write.
145    pub fn with_procedural_importance_floor(mut self, floor: f32) -> Self {
146        self.procedural_importance_floor = floor.clamp(0.0, 1.0);
147        self
148    }
149
150    pub fn with_full_text(mut self, ft: Arc<dyn FullTextIndex>) -> Self {
151        self.full_text = Some(ft);
152        self
153    }
154
155    pub fn with_encryption(mut self, enc: Arc<ContentEncryption>) -> Self {
156        self.encryption = Some(enc);
157        self
158    }
159
160    pub fn with_cold_storage(mut self, cs: Arc<dyn ColdStorage>) -> Self {
161        self.cold_storage = Some(cs);
162        self
163    }
164
165    pub fn with_cache(mut self, c: Arc<MemoryCache>) -> Self {
166        self.cache = Some(c);
167        self
168    }
169
170    pub fn with_event_embeddings(mut self) -> Self {
171        self.embed_events = true;
172        self
173    }
174
175    pub async fn remember(
176        &self,
177        request: remember::RememberRequest,
178    ) -> Result<remember::RememberResponse> {
179        remember::execute(self, request).await
180    }
181
182    pub async fn recall(&self, request: recall::RecallRequest) -> Result<recall::RecallResponse> {
183        recall::execute(self, request).await
184    }
185
186    pub async fn forget(&self, request: forget::ForgetRequest) -> Result<forget::ForgetResponse> {
187        forget::execute(self, request).await
188    }
189
190    /// Subject-scoped erasure for GDPR / DPDPA compliance.
191    /// See [`forget::forget_subject`] for strategy semantics.
192    pub async fn forget_subject(
193        &self,
194        request: forget::ForgetSubjectRequest,
195    ) -> Result<forget::ForgetSubjectResponse> {
196        forget::forget_subject(self, request).await
197    }
198
199    /// Hard-delete every memory whose `expires_at` is in the past and emit
200    /// one `MemoryExpired` audit event per deletion.
201    pub async fn run_ttl_sweep(&self) -> Result<lifecycle::TtlReport> {
202        lifecycle::run_ttl_sweep(self).await
203    }
204
205    /// Auto-Dream-compatible reflection pass: date absolutization, external
206    /// rewrite acceptance, semantic dedup, low-importance conflict
207    /// resolution, and stale archival. See [`reflection::run_reflection_pass`].
208    pub async fn run_reflection_pass(
209        &self,
210        agent_id: Option<String>,
211    ) -> Result<reflection::ReflectionReport> {
212        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
213        reflection::run_reflection_pass(self, &agent_id).await
214    }
215
216    /// Reflection pass that honours the new `ReflectionMode` gate (v0.3.1).
217    /// Use `Coordinated` to avoid double-work when Auto Dream is also running.
218    pub async fn run_reflection_pass_with_mode(
219        &self,
220        agent_id: Option<String>,
221        mode: reflection::ReflectionMode,
222        force: bool,
223    ) -> Result<reflection::ReflectionReport> {
224        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
225        reflection::run_reflection_pass_with_mode(self, &agent_id, mode, force).await
226    }
227
228    /// List quarantined memories for operator review. See
229    /// [`poisoning::replay_quarantine`].
230    pub async fn replay_quarantine(
231        &self,
232        agent_id: Option<String>,
233        since: Option<&str>,
234    ) -> Result<Vec<poisoning::QuarantineReplayEntry>> {
235        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
236        poisoning::replay_quarantine(self, &agent_id, since).await
237    }
238
239    pub async fn share(&self, request: share::ShareRequest) -> Result<share::ShareResponse> {
240        share::execute(self, request).await
241    }
242
243    pub async fn checkpoint(
244        &self,
245        request: checkpoint::CheckpointRequest,
246    ) -> Result<checkpoint::CheckpointResponse> {
247        checkpoint::execute(self, request).await
248    }
249
250    pub async fn branch(&self, request: branch::BranchRequest) -> Result<branch::BranchResponse> {
251        branch::execute(self, request).await
252    }
253
254    pub async fn merge(&self, request: merge::MergeRequest) -> Result<merge::MergeResponse> {
255        merge::execute(self, request).await
256    }
257
258    pub async fn replay(&self, request: replay::ReplayRequest) -> Result<replay::ReplayResponse> {
259        replay::execute(self, request).await
260    }
261
262    pub async fn run_decay_pass(
263        &self,
264        agent_id: Option<String>,
265        archive_threshold: f32,
266        forget_threshold: f32,
267    ) -> Result<lifecycle::DecayPassResult> {
268        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
269        lifecycle::run_decay_pass(self, &agent_id, archive_threshold, forget_threshold).await
270    }
271
272    pub async fn run_consolidation(
273        &self,
274        agent_id: Option<String>,
275        min_cluster_size: usize,
276    ) -> Result<lifecycle::ConsolidationResult> {
277        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
278        lifecycle::run_consolidation(self, &agent_id, min_cluster_size).await
279    }
280
281    pub async fn verify_integrity(
282        &self,
283        agent_id: Option<String>,
284        thread_id: Option<&str>,
285    ) -> Result<crate::hash::ChainVerificationResult> {
286        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
287        let records = self
288            .storage
289            .list_memories_by_agent_ordered(&agent_id, thread_id, 10000)
290            .await?;
291        Ok(crate::hash::verify_chain(&records))
292    }
293
294    pub async fn trace_causality(
295        &self,
296        event_id: uuid::Uuid,
297        max_depth: usize,
298    ) -> Result<causality::CausalChain> {
299        causality::trace_causality(
300            self,
301            event_id,
302            max_depth,
303            causality::TraceDirection::Down,
304            None,
305        )
306        .await
307    }
308
309    pub async fn trace_causality_with_options(
310        &self,
311        event_id: uuid::Uuid,
312        max_depth: usize,
313        direction: causality::TraceDirection,
314        event_type_filter: Option<crate::model::event::EventType>,
315    ) -> Result<causality::CausalChain> {
316        causality::trace_causality(self, event_id, max_depth, direction, event_type_filter).await
317    }
318
319    pub async fn verify_event_integrity(
320        &self,
321        agent_id: Option<String>,
322        thread_id: Option<&str>,
323    ) -> Result<crate::hash::ChainVerificationResult> {
324        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
325        let events = if let Some(tid) = thread_id {
326            self.storage.get_events_by_thread(tid, 10000).await?
327        } else {
328            // list_events returns DESC order; reverse to chronological for chain verification
329            let mut evts = self.storage.list_events(&agent_id, 10000, 0).await?;
330            evts.reverse();
331            evts
332        };
333        Ok(crate::hash::verify_event_chain(&events))
334    }
335
336    pub async fn detect_conflicts(
337        &self,
338        agent_id: Option<String>,
339        threshold: f32,
340    ) -> Result<conflict::ConflictDetectionResult> {
341        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
342        conflict::detect_conflicts(self, &agent_id, threshold).await
343    }
344
345    pub async fn resolve_conflict(
346        &self,
347        conflict_pair: &conflict::ConflictPair,
348        strategy: conflict::ResolutionStrategy,
349    ) -> Result<()> {
350        conflict::resolve_conflict(self, conflict_pair, strategy).await
351    }
352}