Skip to main content

mnemo_core/query/
mod.rs

1pub mod branch;
2pub mod causality;
3pub mod checkpoint;
4pub mod conflict;
5pub mod event_builder;
6pub mod forget;
7pub mod lifecycle;
8pub mod merge;
9pub mod poisoning;
10pub mod recall;
11pub mod reflection;
12pub mod remember;
13pub mod replay;
14pub mod retrieval;
15pub mod share;
16
17use std::sync::Arc;
18
19use crate::cache::MemoryCache;
20use crate::embedding::EmbeddingProvider;
21use crate::encryption::ContentEncryption;
22use crate::error::{Error, Result};
23use crate::index::VectorIndex;
24use crate::search::FullTextIndex;
25use crate::storage::StorageBackend;
26use crate::storage::cold::ColdStorage;
27
/// Longest agent id, measured in UTF-8 bytes, that `validate_agent_id` accepts.
const MAX_AGENT_ID_LEN: usize = 256;

/// Hard cap on how many records one batch query may return.
///
/// Keeps a single call from growing memory without bound while still
/// leaving room for reasonable workloads.
pub const MAX_BATCH_QUERY_LIMIT: usize = 10_000;
33
34/// Validate that an agent_id contains only safe characters and is within length limits.
35pub fn validate_agent_id(agent_id: &str) -> Result<()> {
36    if agent_id.is_empty() {
37        return Err(Error::Validation("agent_id cannot be empty".into()));
38    }
39    if agent_id.len() > MAX_AGENT_ID_LEN {
40        return Err(Error::Validation(format!(
41            "agent_id exceeds max length of {MAX_AGENT_ID_LEN}"
42        )));
43    }
44    if !agent_id
45        .chars()
46        .all(|c| c.is_alphanumeric() || c == '-' || c == '_' || c == '.')
47    {
48        return Err(Error::Validation(
49            "agent_id must contain only alphanumeric characters, hyphens, underscores, or dots"
50                .into(),
51        ));
52    }
53    Ok(())
54}
55
56pub struct MnemoEngine {
57    pub storage: Arc<dyn StorageBackend>,
58    pub index: Arc<dyn VectorIndex>,
59    pub embedding: Arc<dyn EmbeddingProvider>,
60    pub full_text: Option<Arc<dyn FullTextIndex>>,
61    pub default_agent_id: String,
62    pub default_org_id: Option<String>,
63    pub encryption: Option<Arc<ContentEncryption>>,
64    pub cold_storage: Option<Arc<dyn ColdStorage>>,
65    pub cache: Option<Arc<MemoryCache>>,
66    pub embed_events: bool,
67    /// Default TTL applied to `Working`-tier memories whose `remember`
68    /// request does not supply an explicit `ttl_seconds`. Defaults to 1 hour.
69    pub ttl_working_seconds: u64,
70    /// Importance floor enforced on write for `Procedural`-tier memories.
71    /// Defaults to 0.8.
72    pub procedural_importance_floor: f32,
73    /// Poisoning policy read by `check_for_anomaly`. Defaults to the v0.3.2
74    /// behaviour (no z-score outlier gate). Override with
75    /// [`MnemoEngine::with_poisoning_policy`].
76    pub poisoning_policy: poisoning::PoisoningPolicy,
77}
78
/// Lifetime, in seconds, that Working-tier memories receive by default
/// (one hour).
pub const DEFAULT_TTL_WORKING_SECONDS: u64 = 60 * 60;

/// Default lower bound on importance enforced when a Procedural-tier
/// memory is written.
pub const DEFAULT_PROCEDURAL_IMPORTANCE_FLOOR: f32 = 0.8;
84
85impl MnemoEngine {
86    pub fn new(
87        storage: Arc<dyn StorageBackend>,
88        index: Arc<dyn VectorIndex>,
89        embedding: Arc<dyn EmbeddingProvider>,
90        default_agent_id: String,
91        default_org_id: Option<String>,
92    ) -> Self {
93        Self {
94            storage,
95            index,
96            embedding,
97            full_text: None,
98            default_agent_id,
99            default_org_id,
100            encryption: None,
101            cold_storage: None,
102            cache: None,
103            embed_events: false,
104            ttl_working_seconds: DEFAULT_TTL_WORKING_SECONDS,
105            procedural_importance_floor: DEFAULT_PROCEDURAL_IMPORTANCE_FLOOR,
106            poisoning_policy: poisoning::PoisoningPolicy::default(),
107        }
108    }
109
110    /// Attach a [`poisoning::PoisoningPolicy`] to the engine. See
111    /// [`poisoning::PoisoningPolicy::with_outlier_threshold`] for the
112    /// v0.3.3 z-score outlier gate.
113    pub fn with_poisoning_policy(mut self, policy: poisoning::PoisoningPolicy) -> Self {
114        self.poisoning_policy = policy;
115        self
116    }
117
118    /// Override the default 1-hour TTL applied to `Working`-tier memories
119    /// when a caller does not supply an explicit `ttl_seconds`.
120    pub fn with_ttl_working_seconds(mut self, seconds: u64) -> Self {
121        self.ttl_working_seconds = seconds;
122        self
123    }
124
125    /// Override the default 0.8 importance floor applied to Procedural
126    /// memories on write.
127    pub fn with_procedural_importance_floor(mut self, floor: f32) -> Self {
128        self.procedural_importance_floor = floor.clamp(0.0, 1.0);
129        self
130    }
131
132    pub fn with_full_text(mut self, ft: Arc<dyn FullTextIndex>) -> Self {
133        self.full_text = Some(ft);
134        self
135    }
136
137    pub fn with_encryption(mut self, enc: Arc<ContentEncryption>) -> Self {
138        self.encryption = Some(enc);
139        self
140    }
141
142    pub fn with_cold_storage(mut self, cs: Arc<dyn ColdStorage>) -> Self {
143        self.cold_storage = Some(cs);
144        self
145    }
146
147    pub fn with_cache(mut self, c: Arc<MemoryCache>) -> Self {
148        self.cache = Some(c);
149        self
150    }
151
152    pub fn with_event_embeddings(mut self) -> Self {
153        self.embed_events = true;
154        self
155    }
156
157    pub async fn remember(
158        &self,
159        request: remember::RememberRequest,
160    ) -> Result<remember::RememberResponse> {
161        remember::execute(self, request).await
162    }
163
164    pub async fn recall(&self, request: recall::RecallRequest) -> Result<recall::RecallResponse> {
165        recall::execute(self, request).await
166    }
167
168    pub async fn forget(&self, request: forget::ForgetRequest) -> Result<forget::ForgetResponse> {
169        forget::execute(self, request).await
170    }
171
172    /// Subject-scoped erasure for GDPR / DPDPA compliance.
173    /// See [`forget::forget_subject`] for strategy semantics.
174    pub async fn forget_subject(
175        &self,
176        request: forget::ForgetSubjectRequest,
177    ) -> Result<forget::ForgetSubjectResponse> {
178        forget::forget_subject(self, request).await
179    }
180
181    /// Hard-delete every memory whose `expires_at` is in the past and emit
182    /// one `MemoryExpired` audit event per deletion.
183    pub async fn run_ttl_sweep(&self) -> Result<lifecycle::TtlReport> {
184        lifecycle::run_ttl_sweep(self).await
185    }
186
187    /// Auto-Dream-compatible reflection pass: date absolutization, external
188    /// rewrite acceptance, semantic dedup, low-importance conflict
189    /// resolution, and stale archival. See [`reflection::run_reflection_pass`].
190    pub async fn run_reflection_pass(
191        &self,
192        agent_id: Option<String>,
193    ) -> Result<reflection::ReflectionReport> {
194        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
195        reflection::run_reflection_pass(self, &agent_id).await
196    }
197
198    /// Reflection pass that honours the new `ReflectionMode` gate (v0.3.1).
199    /// Use `Coordinated` to avoid double-work when Auto Dream is also running.
200    pub async fn run_reflection_pass_with_mode(
201        &self,
202        agent_id: Option<String>,
203        mode: reflection::ReflectionMode,
204        force: bool,
205    ) -> Result<reflection::ReflectionReport> {
206        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
207        reflection::run_reflection_pass_with_mode(self, &agent_id, mode, force).await
208    }
209
210    /// List quarantined memories for operator review. See
211    /// [`poisoning::replay_quarantine`].
212    pub async fn replay_quarantine(
213        &self,
214        agent_id: Option<String>,
215        since: Option<&str>,
216    ) -> Result<Vec<poisoning::QuarantineReplayEntry>> {
217        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
218        poisoning::replay_quarantine(self, &agent_id, since).await
219    }
220
221    pub async fn share(&self, request: share::ShareRequest) -> Result<share::ShareResponse> {
222        share::execute(self, request).await
223    }
224
225    pub async fn checkpoint(
226        &self,
227        request: checkpoint::CheckpointRequest,
228    ) -> Result<checkpoint::CheckpointResponse> {
229        checkpoint::execute(self, request).await
230    }
231
232    pub async fn branch(&self, request: branch::BranchRequest) -> Result<branch::BranchResponse> {
233        branch::execute(self, request).await
234    }
235
236    pub async fn merge(&self, request: merge::MergeRequest) -> Result<merge::MergeResponse> {
237        merge::execute(self, request).await
238    }
239
240    pub async fn replay(&self, request: replay::ReplayRequest) -> Result<replay::ReplayResponse> {
241        replay::execute(self, request).await
242    }
243
244    pub async fn run_decay_pass(
245        &self,
246        agent_id: Option<String>,
247        archive_threshold: f32,
248        forget_threshold: f32,
249    ) -> Result<lifecycle::DecayPassResult> {
250        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
251        lifecycle::run_decay_pass(self, &agent_id, archive_threshold, forget_threshold).await
252    }
253
254    pub async fn run_consolidation(
255        &self,
256        agent_id: Option<String>,
257        min_cluster_size: usize,
258    ) -> Result<lifecycle::ConsolidationResult> {
259        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
260        lifecycle::run_consolidation(self, &agent_id, min_cluster_size).await
261    }
262
263    pub async fn verify_integrity(
264        &self,
265        agent_id: Option<String>,
266        thread_id: Option<&str>,
267    ) -> Result<crate::hash::ChainVerificationResult> {
268        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
269        let records = self
270            .storage
271            .list_memories_by_agent_ordered(&agent_id, thread_id, 10000)
272            .await?;
273        Ok(crate::hash::verify_chain(&records))
274    }
275
276    pub async fn trace_causality(
277        &self,
278        event_id: uuid::Uuid,
279        max_depth: usize,
280    ) -> Result<causality::CausalChain> {
281        causality::trace_causality(
282            self,
283            event_id,
284            max_depth,
285            causality::TraceDirection::Down,
286            None,
287        )
288        .await
289    }
290
291    pub async fn trace_causality_with_options(
292        &self,
293        event_id: uuid::Uuid,
294        max_depth: usize,
295        direction: causality::TraceDirection,
296        event_type_filter: Option<crate::model::event::EventType>,
297    ) -> Result<causality::CausalChain> {
298        causality::trace_causality(self, event_id, max_depth, direction, event_type_filter).await
299    }
300
301    pub async fn verify_event_integrity(
302        &self,
303        agent_id: Option<String>,
304        thread_id: Option<&str>,
305    ) -> Result<crate::hash::ChainVerificationResult> {
306        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
307        let events = if let Some(tid) = thread_id {
308            self.storage.get_events_by_thread(tid, 10000).await?
309        } else {
310            // list_events returns DESC order; reverse to chronological for chain verification
311            let mut evts = self.storage.list_events(&agent_id, 10000, 0).await?;
312            evts.reverse();
313            evts
314        };
315        Ok(crate::hash::verify_event_chain(&events))
316    }
317
318    pub async fn detect_conflicts(
319        &self,
320        agent_id: Option<String>,
321        threshold: f32,
322    ) -> Result<conflict::ConflictDetectionResult> {
323        let agent_id = agent_id.unwrap_or_else(|| self.default_agent_id.clone());
324        conflict::detect_conflicts(self, &agent_id, threshold).await
325    }
326
327    pub async fn resolve_conflict(
328        &self,
329        conflict_pair: &conflict::ConflictPair,
330        strategy: conflict::ResolutionStrategy,
331    ) -> Result<()> {
332        conflict::resolve_conflict(self, conflict_pair, strategy).await
333    }
334}