Skip to main content

mnemo_core/query/
forget.rs

1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4use crate::error::{Error, Result};
5use crate::hash::compute_content_hash;
6use crate::model::acl::Permission;
7use crate::model::event::{AgentEvent, EventType};
8use crate::model::memory::MemoryType;
9use crate::query::MnemoEngine;
10use crate::storage::MemoryFilter;
11
/// How a memory is "forgotten" by the forget pipeline.
///
/// When a request does not name a strategy, `execute` falls back to
/// [`ForgetStrategy::SoftDelete`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ForgetStrategy {
    /// Soft-delete the memory in storage and drop it from the vector and
    /// full-text indexes.
    SoftDelete,
    /// Hard-delete the memory in storage and drop it from the vector and
    /// full-text indexes.
    HardDelete,
    /// Keep the memory but lower its importance by the record's decay rate
    /// (0.1 when the record has none), never going below zero.
    Decay,
    /// Keep the memory and mark its consolidation state as `Consolidated`.
    Consolidate,
    /// Keep the memory, mark its consolidation state as `Archived`, and hand
    /// it to cold storage when the engine has one configured.
    Archive,
    /// GDPR-aligned content erasure: replace the content with a redaction
    /// marker while keeping the existing `content_hash` and `prev_hash`
    /// intact so the audit trail (who wrote when) remains verifiable.
    Redact,
}
25
/// Sentinel content written in place of redacted memories.
///
/// The `Redact` strategy overwrites a record's `content` with this marker.
pub const REDACTED_CONTENT: &str = "[REDACTED]";
28
/// Selection criteria used to pick memories to forget when a request does not
/// list explicit memory ids.
///
/// Every field is optional; unset fields do not restrict the selection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetCriteria {
    /// Minimum age in hours, measured from the record's `created_at`.
    /// Memories younger than this are not selected.
    pub max_age_hours: Option<f64>,
    /// Exclusive upper bound on importance: memories whose importance is
    /// greater than or equal to this value are not selected.
    pub min_importance_below: Option<f32>,
    /// Restrict the selection to one memory type (forwarded to the storage
    /// filter).
    pub memory_type: Option<MemoryType>,
    /// Tags forwarded to the storage filter.
    // NOTE(review): any-vs-all tag matching is decided by the storage
    // backend and is not visible from this file; confirm before relying on it.
    pub tags: Option<Vec<String>>,
}
36
/// Input to the forget pipeline (`execute`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetRequest {
    /// Explicit memories to forget. When empty, `criteria` is used to select
    /// the targets instead.
    pub memory_ids: Vec<Uuid>,
    /// Agent on whose behalf the operation runs; defaults to the engine's
    /// default agent id when unset.
    pub agent_id: Option<String>,
    /// Strategy to apply; defaults to `ForgetStrategy::SoftDelete` when unset.
    pub strategy: Option<ForgetStrategy>,
    /// Selection criteria, consulted only when `memory_ids` is empty.
    pub criteria: Option<ForgetCriteria>,
}
44
45impl ForgetRequest {
46    pub fn new(memory_ids: Vec<Uuid>) -> Self {
47        Self {
48            memory_ids,
49            agent_id: None,
50            strategy: None,
51            criteria: None,
52        }
53    }
54}
55
/// Outcome of a forget operation.
///
/// A memory id appears in `forgotten` when the strategy was applied to it
/// successfully, or in `errors` when it was not.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetResponse {
    /// Ids of the memories the strategy was successfully applied to.
    pub forgotten: Vec<Uuid>,
    /// Per-memory failures (permission denied, not found, storage errors).
    pub errors: Vec<ForgetError>,
}
62
63impl ForgetResponse {
64    pub fn new(forgotten: Vec<Uuid>, errors: Vec<ForgetError>) -> Self {
65        Self { forgotten, errors }
66    }
67}
68
/// A failure to forget one specific memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetError {
    /// Id of the memory that could not be forgotten.
    pub id: Uuid,
    /// Human-readable reason, e.g. `"permission denied"`, `"not found"`, or
    /// the text of the underlying storage error.
    pub error: String,
}
74
75pub async fn execute(engine: &MnemoEngine, request: ForgetRequest) -> Result<ForgetResponse> {
76    let agent_id = request
77        .agent_id
78        .unwrap_or_else(|| engine.default_agent_id.clone());
79    let strategy = request.strategy.unwrap_or(ForgetStrategy::SoftDelete);
80
81    // If criteria is specified and memory_ids is empty, find matching memories
82    let memory_ids = if request.memory_ids.is_empty() {
83        if let Some(ref criteria) = request.criteria {
84            let filter = MemoryFilter {
85                agent_id: Some(agent_id.clone()),
86                memory_type: criteria.memory_type,
87                min_importance: None, // We'll filter below
88                tags: criteria.tags.clone(),
89                include_deleted: false,
90                ..Default::default()
91            };
92            let memories = engine.storage.list_memories(&filter, 1000, 0).await?;
93            let now = chrono::Utc::now();
94            memories
95                .into_iter()
96                .filter(|m| {
97                    if let Some(max_age) = criteria.max_age_hours
98                        && let Ok(created) = chrono::DateTime::parse_from_rfc3339(&m.created_at)
99                    {
100                        let age_hours = (now - created.with_timezone(&chrono::Utc)).num_seconds()
101                            as f64
102                            / 3600.0;
103                        if age_hours < max_age {
104                            return false;
105                        }
106                    }
107                    if let Some(min_below) = criteria.min_importance_below
108                        && m.importance >= min_below
109                    {
110                        return false;
111                    }
112                    true
113                })
114                .map(|m| m.id)
115                .collect()
116        } else {
117            return Err(Error::Validation(
118                "memory_ids or criteria must be provided".to_string(),
119            ));
120        }
121    } else {
122        request.memory_ids.clone()
123    };
124
125    if memory_ids.is_empty() {
126        return Ok(ForgetResponse {
127            forgotten: vec![],
128            errors: vec![],
129        });
130    }
131
132    let mut forgotten = Vec::new();
133    let mut errors = Vec::new();
134
135    for id in &memory_ids {
136        // Check permission
137        match engine
138            .storage
139            .check_permission(*id, &agent_id, Permission::Write)
140            .await
141        {
142            Ok(true) => {}
143            Ok(false) => {
144                errors.push(ForgetError {
145                    id: *id,
146                    error: "permission denied".to_string(),
147                });
148                continue;
149            }
150            Err(e) => {
151                errors.push(ForgetError {
152                    id: *id,
153                    error: e.to_string(),
154                });
155                continue;
156            }
157        }
158
159        // Execute strategy
160        match strategy {
161            ForgetStrategy::SoftDelete => match engine.storage.soft_delete_memory(*id).await {
162                Ok(()) => {
163                    if let Err(e) = engine.index.remove(*id) {
164                        tracing::error!(memory_id = %id, error = %e, "failed to remove from vector index during soft delete");
165                    }
166                    if let Some(ref ft) = engine.full_text {
167                        if let Err(e) = ft.remove(*id) {
168                            tracing::error!(memory_id = %id, error = %e, "failed to remove from full-text index");
169                        }
170                        if let Err(e) = ft.commit() {
171                            tracing::error!(memory_id = %id, error = %e, "failed to commit full-text index");
172                        }
173                    }
174                    forgotten.push(*id);
175                }
176                Err(e) => {
177                    errors.push(ForgetError {
178                        id: *id,
179                        error: e.to_string(),
180                    });
181                }
182            },
183            ForgetStrategy::HardDelete => match engine.storage.hard_delete_memory(*id).await {
184                Ok(()) => {
185                    if let Err(e) = engine.index.remove(*id) {
186                        tracing::error!(memory_id = %id, error = %e, "failed to remove from vector index during hard delete");
187                    }
188                    if let Some(ref ft) = engine.full_text {
189                        if let Err(e) = ft.remove(*id) {
190                            tracing::error!(memory_id = %id, error = %e, "failed to remove from full-text index");
191                        }
192                        if let Err(e) = ft.commit() {
193                            tracing::error!(memory_id = %id, error = %e, "failed to commit full-text index");
194                        }
195                    }
196                    forgotten.push(*id);
197                }
198                Err(e) => {
199                    errors.push(ForgetError {
200                        id: *id,
201                        error: e.to_string(),
202                    });
203                }
204            },
205            ForgetStrategy::Decay => match engine.storage.get_memory(*id).await {
206                Ok(Some(mut record)) => {
207                    let decay_rate = record.decay_rate.unwrap_or(0.1);
208                    record.importance = (record.importance - decay_rate).max(0.0);
209                    record.updated_at = chrono::Utc::now().to_rfc3339();
210                    match engine.storage.update_memory(&record).await {
211                        Ok(()) => forgotten.push(*id),
212                        Err(e) => errors.push(ForgetError {
213                            id: *id,
214                            error: e.to_string(),
215                        }),
216                    }
217                }
218                Ok(None) => errors.push(ForgetError {
219                    id: *id,
220                    error: "not found".to_string(),
221                }),
222                Err(e) => errors.push(ForgetError {
223                    id: *id,
224                    error: e.to_string(),
225                }),
226            },
227            ForgetStrategy::Archive => {
228                match engine.storage.get_memory(*id).await {
229                    Ok(Some(mut record)) => {
230                        record.consolidation_state =
231                            crate::model::memory::ConsolidationState::Archived;
232                        record.updated_at = chrono::Utc::now().to_rfc3339();
233                        match engine.storage.update_memory(&record).await {
234                            Ok(()) => {
235                                // Archive to cold storage if configured
236                                if let Some(ref cs) = engine.cold_storage
237                                    && let Err(e) = cs.archive(&record).await
238                                {
239                                    tracing::warn!("cold storage archive failed for {}: {e}", id);
240                                }
241                                forgotten.push(*id);
242                            }
243                            Err(e) => errors.push(ForgetError {
244                                id: *id,
245                                error: e.to_string(),
246                            }),
247                        }
248                    }
249                    Ok(None) => errors.push(ForgetError {
250                        id: *id,
251                        error: "not found".to_string(),
252                    }),
253                    Err(e) => errors.push(ForgetError {
254                        id: *id,
255                        error: e.to_string(),
256                    }),
257                }
258            }
259            ForgetStrategy::Consolidate => match engine.storage.get_memory(*id).await {
260                Ok(Some(mut record)) => {
261                    record.consolidation_state =
262                        crate::model::memory::ConsolidationState::Consolidated;
263                    record.updated_at = chrono::Utc::now().to_rfc3339();
264                    match engine.storage.update_memory(&record).await {
265                        Ok(()) => forgotten.push(*id),
266                        Err(e) => errors.push(ForgetError {
267                            id: *id,
268                            error: e.to_string(),
269                        }),
270                    }
271                }
272                Ok(None) => errors.push(ForgetError {
273                    id: *id,
274                    error: "not found".to_string(),
275                }),
276                Err(e) => errors.push(ForgetError {
277                    id: *id,
278                    error: e.to_string(),
279                }),
280            },
281            ForgetStrategy::Redact => {
282                // GDPR erasure: replace content, leave content_hash + prev_hash
283                // untouched so downstream chain verification still works.
284                match engine.storage.get_memory(*id).await {
285                    Ok(Some(mut record)) => {
286                        record.content = REDACTED_CONTENT.to_string();
287                        record.tags.retain(|t| !t.starts_with("subject:"));
288                        record.metadata = serde_json::json!({"redacted": true});
289                        record.updated_at = chrono::Utc::now().to_rfc3339();
290                        match engine.storage.update_memory(&record).await {
291                            Ok(()) => {
292                                if let Err(e) = engine.index.remove(*id) {
293                                    tracing::error!(memory_id = %id, error = %e, "failed to remove from vector index during redact");
294                                }
295                                if let Some(ref ft) = engine.full_text {
296                                    if let Err(e) = ft.remove(*id) {
297                                        tracing::error!(memory_id = %id, error = %e, "failed to remove from full-text index during redact");
298                                    }
299                                    if let Err(e) = ft.commit() {
300                                        tracing::error!(memory_id = %id, error = %e, "failed to commit full-text index during redact");
301                                    }
302                                }
303                                if let Some(ref cache) = engine.cache {
304                                    cache.invalidate(*id);
305                                }
306                                forgotten.push(*id);
307                            }
308                            Err(e) => errors.push(ForgetError {
309                                id: *id,
310                                error: e.to_string(),
311                            }),
312                        }
313                    }
314                    Ok(None) => errors.push(ForgetError {
315                        id: *id,
316                        error: "not found".to_string(),
317                    }),
318                    Err(e) => errors.push(ForgetError {
319                        id: *id,
320                        error: e.to_string(),
321                    }),
322                }
323            }
324        }
325    }
326
327    // Emit MemoryDelete event for each forgotten memory with hash chaining (fire-and-forget)
328    let now = chrono::Utc::now().to_rfc3339();
329    for id in &forgotten {
330        let event_content_hash = compute_content_hash(&id.to_string(), &agent_id, &now);
331        let prev_event_hash = match engine.storage.get_latest_event_hash(&agent_id, None).await {
332            Ok(hash) => hash,
333            Err(e) => {
334                tracing::warn!(error = %e, "failed to get latest event hash, starting new chain segment");
335                None
336            }
337        };
338        let event_prev_hash = Some(crate::hash::compute_chain_hash(
339            &event_content_hash,
340            prev_event_hash.as_deref(),
341        ));
342        let event = AgentEvent {
343            id: Uuid::now_v7(),
344            agent_id: agent_id.clone(),
345            thread_id: None,
346            run_id: None,
347            parent_event_id: None,
348            event_type: EventType::MemoryDelete,
349            payload: serde_json::json!({"memory_id": id.to_string()}),
350            trace_id: None,
351            span_id: None,
352            model: None,
353            tokens_input: None,
354            tokens_output: None,
355            latency_ms: None,
356            cost_usd: None,
357            timestamp: now.clone(),
358            logical_clock: 0,
359            content_hash: event_content_hash,
360            prev_hash: event_prev_hash,
361            embedding: None,
362        };
363        if let Err(e) = engine.storage.insert_event(&event).await {
364            tracing::error!(event_id = %event.id, error = %e, "failed to insert audit event");
365        }
366
367        // Invalidate cache on forget
368        if let Some(ref cache) = engine.cache {
369            cache.invalidate(*id);
370        }
371    }
372
373    Ok(ForgetResponse { forgotten, errors })
374}
375
/// Tag convention used by [`forget_subject`] to locate memories owned by
/// a given subject (e.g. end-user or data principal under GDPR/DPDPA).
///
/// A memory belongs to subject `S` when it carries the tag `subject:S`.
pub const SUBJECT_TAG_PREFIX: &str = "subject:";
379
/// Request to erase every memory that belongs to one subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetSubjectRequest {
    /// Opaque identifier for the subject whose memories should be erased.
    /// Memories are matched via tag `subject:<subject_id>`.
    /// Must not be empty.
    pub subject_id: String,
    /// Optional agent scope; defaults to the engine default.
    pub agent_id: Option<String>,
    /// Erasure strategy. Only `Redact` and `HardDelete` are meaningful for
    /// a subject-scoped operation; other strategies are accepted and passed
    /// through to the standard forget pipeline.
    pub strategy: ForgetStrategy,
}
392
/// Outcome of a subject-scoped erasure.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetSubjectResponse {
    /// Subject the erasure was requested for (echoed from the request).
    pub subject_id: String,
    /// Strategy that was applied (echoed from the request).
    pub strategy: ForgetStrategy,
    /// Number of memories found carrying the subject's tag.
    pub matched: usize,
    /// Ids of the memories the strategy was successfully applied to.
    pub forgotten: Vec<Uuid>,
    /// Number of audit events removed as a cascade. Currently always 0:
    /// audit events are intentionally preserved.
    pub cascaded_events: usize,
    /// Per-memory failures reported by the forget pipeline.
    pub errors: Vec<ForgetError>,
}
403
404/// Erase every memory tagged with `subject:<subject_id>`, using the
405/// specified strategy.
406///
407/// * `Redact` preserves the existing `content_hash` and `prev_hash`
408///   so downstream chain verification still succeeds; only the content
409///   itself becomes the REDACTED marker.
410/// * `HardDelete` removes the memory row and cascades to emit a
411///   `MemoryDelete` audit event per record; no further cascade is
412///   performed against events referencing the memory id, those remain
413///   available for audit.
414pub async fn forget_subject(
415    engine: &MnemoEngine,
416    request: ForgetSubjectRequest,
417) -> Result<ForgetSubjectResponse> {
418    if request.subject_id.is_empty() {
419        return Err(Error::Validation("subject_id cannot be empty".to_string()));
420    }
421    let agent_id = request
422        .agent_id
423        .clone()
424        .unwrap_or_else(|| engine.default_agent_id.clone());
425    super::validate_agent_id(&agent_id)?;
426
427    let subject_tag = format!("{SUBJECT_TAG_PREFIX}{}", request.subject_id);
428    // The shared storage backends don't push the tag predicate into SQL yet,
429    // so we filter in Rust. This is O(n) over the agent's non-deleted memories,
430    // which is acceptable for GDPR-erasure workloads and keeps the contract
431    // consistent across DuckDB / PostgreSQL.
432    let filter = MemoryFilter {
433        agent_id: Some(agent_id.clone()),
434        include_deleted: false,
435        ..Default::default()
436    };
437    let all_records = engine
438        .storage
439        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
440        .await?;
441    let matched_records: Vec<_> = all_records
442        .into_iter()
443        .filter(|r| r.tags.iter().any(|t| t == &subject_tag))
444        .collect();
445    let matched = matched_records.len();
446    let ids: Vec<Uuid> = matched_records.iter().map(|r| r.id).collect();
447
448    if ids.is_empty() {
449        return Ok(ForgetSubjectResponse {
450            subject_id: request.subject_id,
451            strategy: request.strategy,
452            matched,
453            forgotten: Vec::new(),
454            cascaded_events: 0,
455            errors: Vec::new(),
456        });
457    }
458
459    // Audit events that reference these memories are intentionally preserved:
460    // removing them would break the per-agent hash chain and destroy the
461    // GDPR-aligned audit trail. Callers who need to erase event payloads
462    // containing PII should issue a targeted Redact against the relevant
463    // event rows (not yet exposed as a public API).
464    let cascaded_events: usize = 0;
465
466    // Re-use the standard forget pipeline for the actual deletion/redaction
467    // so audit-event semantics stay consistent with mnemo.forget.
468    let standard_req = ForgetRequest {
469        memory_ids: ids,
470        agent_id: Some(agent_id.clone()),
471        strategy: Some(request.strategy),
472        criteria: None,
473    };
474    let resp = execute(engine, standard_req).await?;
475
476    // For Redact, emit a MemoryRedact event per affected memory so auditors
477    // can distinguish redactions from ordinary deletes.
478    if request.strategy == ForgetStrategy::Redact {
479        let now = chrono::Utc::now().to_rfc3339();
480        for id in &resp.forgotten {
481            let content_hash = compute_content_hash(
482                &format!("redact:{id}:{}", request.subject_id),
483                &agent_id,
484                &now,
485            );
486            let prev_hash_raw = engine
487                .storage
488                .get_latest_event_hash(&agent_id, None)
489                .await
490                .ok()
491                .flatten();
492            let event_prev_hash = Some(crate::hash::compute_chain_hash(
493                &content_hash,
494                prev_hash_raw.as_deref(),
495            ));
496            let event = AgentEvent {
497                id: Uuid::now_v7(),
498                agent_id: agent_id.clone(),
499                thread_id: None,
500                run_id: None,
501                parent_event_id: None,
502                event_type: EventType::MemoryRedact,
503                payload: serde_json::json!({
504                    "memory_id": id.to_string(),
505                    "subject_id": request.subject_id,
506                }),
507                trace_id: None,
508                span_id: None,
509                model: None,
510                tokens_input: None,
511                tokens_output: None,
512                latency_ms: None,
513                cost_usd: None,
514                timestamp: now.clone(),
515                logical_clock: 0,
516                content_hash,
517                prev_hash: event_prev_hash,
518                embedding: None,
519            };
520            if let Err(e) = engine.storage.insert_event(&event).await {
521                tracing::error!(
522                    event_id = %event.id,
523                    error = %e,
524                    "failed to insert MemoryRedact event"
525                );
526            }
527        }
528    }
529
530    Ok(ForgetSubjectResponse {
531        subject_id: request.subject_id,
532        strategy: request.strategy,
533        matched,
534        forgotten: resp.forgotten,
535        cascaded_events,
536        errors: resp.errors,
537    })
538}