//! mnemo_core/query/lifecycle.rs — memory lifecycle: importance decay,
//! episodic-to-semantic consolidation, and TTL-based expiry sweeps.
1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4use crate::error::Result;
5use crate::hash::compute_content_hash;
6use crate::model::event::{AgentEvent, EventType};
7use crate::model::memory::{ConsolidationState, MemoryRecord, MemoryType, SourceType};
8use crate::model::relation::Relation;
9use crate::query::MnemoEngine;
10use crate::storage::MemoryFilter;
11
/// Custom decay function types.
///
/// Serde serializes variant names in snake_case (e.g. `step_function`,
/// `power_law`). Note that the *string spec* form accepted by
/// [`DecayFunction::from_str_opt`] is different: it uses prefix syntax
/// (`"step:<hours>"`, `"power_law:<alpha>"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum DecayFunction {
    /// Exponential: base * e^(-rate * hours)  (default, Ebbinghaus-inspired)
    Exponential,
    /// Linear: base * max(0, 1 - rate * hours)
    Linear,
    /// Step function: base importance until threshold hours, then 0.
    /// Payload is the threshold in hours.
    StepFunction(f32),
    /// Power law: base / (1 + rate * hours)^alpha. Payload is alpha.
    PowerLaw(f32),
}
25
26impl DecayFunction {
27    pub fn from_str_opt(s: &str) -> Option<Self> {
28        match s {
29            "exponential" => Some(DecayFunction::Exponential),
30            "linear" => Some(DecayFunction::Linear),
31            s if s.starts_with("step:") => {
32                s[5..].parse::<f32>().ok().map(DecayFunction::StepFunction)
33            }
34            s if s.starts_with("power_law:") => {
35                s[10..].parse::<f32>().ok().map(DecayFunction::PowerLaw)
36            }
37            _ => None,
38        }
39    }
40}
41
42/// Compute effective importance using the specified or default decay curve.
43/// Default (Exponential): `base_importance * e^(-decay_rate * hours) + 0.05 * ln(1 + access_count)`
44pub fn effective_importance(record: &MemoryRecord) -> f32 {
45    let decay_fn = record
46        .decay_function
47        .as_deref()
48        .and_then(DecayFunction::from_str_opt)
49        .unwrap_or(DecayFunction::Exponential);
50    effective_importance_with(record, &decay_fn)
51}
52
53pub fn effective_importance_with(record: &MemoryRecord, decay_fn: &DecayFunction) -> f32 {
54    let decay_rate = record.decay_rate.unwrap_or(0.01);
55    let hours = hours_since_creation(&record.created_at);
56    let access_boost = 0.05 * (1.0 + record.access_count as f32).ln();
57
58    let base = match decay_fn {
59        DecayFunction::Exponential => record.importance * (-decay_rate * hours).exp(),
60        DecayFunction::Linear => record.importance * (1.0 - decay_rate * hours).max(0.0),
61        DecayFunction::StepFunction(threshold_hours) => {
62            if hours < *threshold_hours {
63                record.importance
64            } else {
65                0.0
66            }
67        }
68        DecayFunction::PowerLaw(alpha) => {
69            record.importance / (1.0 + decay_rate * hours).powf(*alpha)
70        }
71    };
72
73    (base + access_boost).min(1.0)
74}
75
76fn hours_since_creation(created_at: &str) -> f32 {
77    let now = chrono::Utc::now();
78    match chrono::DateTime::parse_from_rfc3339(created_at) {
79        Ok(dt) => {
80            let age = now - dt.with_timezone(&chrono::Utc);
81            (age.num_seconds() as f32 / 3600.0).max(0.0)
82        }
83        Err(_) => 0.0,
84    }
85}
86
/// Summary of a single decay pass over an agent's memories
/// (produced by [`run_decay_pass`]).
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DecayPassResult {
    /// Memories transitioned to `Archived` during this pass.
    pub archived: usize,
    /// Memories transitioned to `Forgotten` during this pass.
    pub forgotten: usize,
    /// Total memories fetched and examined, including those left unchanged.
    pub total_processed: usize,
}
94
95impl DecayPassResult {
96    pub fn new(archived: usize, forgotten: usize, total_processed: usize) -> Self {
97        Self {
98            archived,
99            forgotten,
100            total_processed,
101        }
102    }
103}
104
105/// Run a decay pass over all active memories for the given agent.
106/// Memories below `forget_threshold` are marked Forgotten.
107/// Memories below `archive_threshold` (but above forget) are marked Archived.
108pub async fn run_decay_pass(
109    engine: &MnemoEngine,
110    agent_id: &str,
111    archive_threshold: f32,
112    forget_threshold: f32,
113) -> Result<DecayPassResult> {
114    let filter = MemoryFilter {
115        agent_id: Some(agent_id.to_string()),
116        include_deleted: false,
117        ..Default::default()
118    };
119    let memories = engine
120        .storage
121        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
122        .await?;
123
124    let mut archived = 0;
125    let mut forgotten = 0;
126    let total_processed = memories.len();
127
128    for mut record in memories {
129        if record.consolidation_state == ConsolidationState::Forgotten
130            || record.consolidation_state == ConsolidationState::Archived
131        {
132            continue;
133        }
134
135        let eff = effective_importance(&record);
136
137        if eff < forget_threshold {
138            record.consolidation_state = ConsolidationState::Forgotten;
139            record.updated_at = chrono::Utc::now().to_rfc3339();
140            engine.storage.update_memory(&record).await?;
141            forgotten += 1;
142        } else if eff < archive_threshold {
143            record.consolidation_state = ConsolidationState::Archived;
144            record.updated_at = chrono::Utc::now().to_rfc3339();
145            engine.storage.update_memory(&record).await?;
146            archived += 1;
147        }
148    }
149
150    Ok(DecayPassResult {
151        archived,
152        forgotten,
153        total_processed,
154    })
155}
156
/// Summary of a consolidation run (produced by [`run_consolidation`]).
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidationResult {
    /// Tag-overlap clusters that met the minimum size and were processed.
    pub clusters_found: usize,
    /// New consolidated semantic memories inserted.
    pub new_memories_created: usize,
    /// Original episodic memories marked `Consolidated`.
    pub originals_consolidated: usize,
}
164
165impl ConsolidationResult {
166    pub fn new(
167        clusters_found: usize,
168        new_memories_created: usize,
169        originals_consolidated: usize,
170    ) -> Self {
171        Self {
172            clusters_found,
173            new_memories_created,
174            originals_consolidated,
175        }
176    }
177}
178
/// Consolidate episodic memories into semantic summaries.
///
/// Fetches up to `MAX_BATCH_QUERY_LIMIT` episodic memories for `agent_id`,
/// keeps those in the `Raw`/`Active` states, and greedily clusters them by
/// tag overlap. Each cluster of at least `min_cluster_size` records yields
/// one new `Semantic` memory (content = joined originals, importance =
/// cluster average, tags = union), linked to each original via a
/// `consolidated_from` relation; the originals are marked `Consolidated`.
///
/// Clustering is order-dependent: a record joins the *first* existing
/// cluster sharing any tag, so tag-transitive records can end up in
/// different clusters depending on fetch order.
///
/// Errors from embedding, storage insert, or index updates abort the run;
/// relation-insert and original-update failures are logged and skipped.
pub async fn run_consolidation(
    engine: &MnemoEngine,
    agent_id: &str,
    min_cluster_size: usize,
) -> Result<ConsolidationResult> {
    let filter = MemoryFilter {
        agent_id: Some(agent_id.to_string()),
        memory_type: Some(MemoryType::Episodic),
        include_deleted: false,
        ..Default::default()
    };
    let memories = engine
        .storage
        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
        .await?;

    // Only consider memories that are Raw or Active
    let active: Vec<MemoryRecord> = memories
        .into_iter()
        .filter(|m| {
            m.consolidation_state == ConsolidationState::Raw
                || m.consolidation_state == ConsolidationState::Active
        })
        .collect();

    // Cluster by tag overlap: group memories sharing at least one tag.
    // Greedy single pass — each record lands in the first matching cluster.
    let mut clusters: Vec<Vec<&MemoryRecord>> = Vec::new();

    for record in &active {
        let mut found_cluster = false;
        for cluster in &mut clusters {
            // Check if this record shares any tag with any record in cluster
            if cluster
                .iter()
                .any(|c| c.tags.iter().any(|t| record.tags.contains(t)))
            {
                cluster.push(record);
                found_cluster = true;
                break;
            }
        }
        if !found_cluster {
            // No overlap anywhere: start a new singleton cluster.
            clusters.push(vec![record]);
        }
    }

    let mut clusters_found = 0;
    let mut new_memories_created = 0;
    let mut originals_consolidated = 0;

    for cluster in &clusters {
        // Small clusters (including singletons) are left unconsolidated.
        if cluster.len() < min_cluster_size {
            continue;
        }
        clusters_found += 1;

        // Create a consolidated semantic memory: concatenated content,
        // averaged importance, deduplicated union of tags.
        let combined_content: Vec<String> = cluster.iter().map(|m| m.content.clone()).collect();
        let content = format!(
            "[Consolidated from {} memories] {}",
            cluster.len(),
            combined_content.join(" | ")
        );
        let avg_importance =
            cluster.iter().map(|m| m.importance).sum::<f32>() / cluster.len() as f32;
        let all_tags: Vec<String> = cluster
            .iter()
            .flat_map(|m| m.tags.iter().cloned())
            .collect::<std::collections::HashSet<String>>()
            .into_iter()
            .collect();

        let now = chrono::Utc::now().to_rfc3339();
        let new_id = Uuid::now_v7();
        let content_hash = crate::hash::compute_content_hash(&content, agent_id, &now);

        let embedding = engine.embedding.embed(&content).await?;

        // Chain the new memory's hash onto the agent's latest memory hash.
        // A failed read starts a new chain segment (prev = None).
        let prev_hash_raw = engine
            .storage
            .get_latest_memory_hash(agent_id, None)
            .await
            .ok()
            .flatten();
        let prev_hash = Some(crate::hash::compute_chain_hash(
            &content_hash,
            prev_hash_raw.as_deref(),
        ));

        let new_record = MemoryRecord {
            id: new_id,
            agent_id: agent_id.to_string(),
            content,
            memory_type: MemoryType::Semantic,
            // Scope/org are inherited from the first cluster member —
            // NOTE(review): assumes clusters are scope-homogeneous; confirm.
            scope: cluster[0].scope,
            importance: avg_importance,
            tags: all_tags,
            metadata: serde_json::json!({"consolidated_from": cluster.iter().map(|m| m.id.to_string()).collect::<Vec<_>>()}),
            embedding: Some(embedding.clone()),
            content_hash: content_hash.clone(),
            prev_hash,
            source_type: SourceType::Consolidation,
            source_id: None,
            consolidation_state: ConsolidationState::Active,
            access_count: 0,
            org_id: cluster[0].org_id.clone(),
            thread_id: None,
            created_at: now.clone(),
            updated_at: now,
            last_accessed_at: None,
            expires_at: None,
            deleted_at: None,
            decay_rate: None,
            created_by: Some("consolidation_engine".to_string()),
            version: 1,
            prev_version_id: None,
            quarantined: false,
            quarantine_reason: None,
            decay_function: None,
        };

        engine.storage.insert_memory(&new_record).await?;
        engine.index.add(new_id, &embedding)?;
        if let Some(ref ft) = engine.full_text {
            ft.add(new_id, &new_record.content)?;
            ft.commit()?;
        }
        new_memories_created += 1;

        // Create relations and mark originals as consolidated.
        // Both steps are best-effort per original: failures are logged,
        // not propagated, so one bad row doesn't abort the whole run.
        for original in cluster {
            let relation = Relation {
                id: Uuid::now_v7(),
                source_id: new_id,
                target_id: original.id,
                relation_type: "consolidated_from".to_string(),
                weight: 1.0,
                metadata: serde_json::Value::Object(serde_json::Map::new()),
                created_at: new_record.created_at.clone(),
            };
            if let Err(e) = engine.storage.insert_relation(&relation).await {
                tracing::error!(relation_id = %relation.id, error = %e, "failed to insert consolidation relation");
            }

            let mut updated = (*original).clone();
            updated.consolidation_state = ConsolidationState::Consolidated;
            updated.updated_at = chrono::Utc::now().to_rfc3339();
            if let Err(e) = engine.storage.update_memory(&updated).await {
                tracing::error!(memory_id = %updated.id, error = %e, "failed to update consolidation state");
            }
            // Counted even if the update above failed — reflects attempts.
            originals_consolidated += 1;
        }
    }

    Ok(ConsolidationResult {
        clusters_found,
        new_memories_created,
        originals_consolidated,
    })
}
341
/// Report from a single TTL sweep pass (produced by [`run_ttl_sweep`]).
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlReport {
    /// Memories successfully hard-deleted during this sweep.
    pub swept_count: usize,
    /// Deletions that failed, with the memory id and error message.
    pub errors: Vec<TtlError>,
}
349
350impl TtlReport {
351    pub fn new(swept_count: usize, errors: Vec<TtlError>) -> Self {
352        Self {
353            swept_count,
354            errors,
355        }
356    }
357}
358
/// A single failed deletion recorded during a TTL sweep.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlError {
    /// Id of the memory whose hard delete failed.
    pub memory_id: Uuid,
    /// Stringified storage error.
    pub error: String,
}
364
365/// Hard-delete every memory whose `expires_at` is in the past.
366///
367/// Each deletion emits an `EventType::MemoryExpired` audit event so the chain
368/// records which memories were purged and when. The function is idempotent
369/// under concurrent callers (storage deletes of an already-absent row surface
370/// as a storage error and are reported, not retried).
371pub async fn run_ttl_sweep(engine: &MnemoEngine) -> Result<TtlReport> {
372    let filter = MemoryFilter {
373        include_deleted: false,
374        ..Default::default()
375    };
376    let memories = engine
377        .storage
378        .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
379        .await?;
380
381    let now = chrono::Utc::now();
382    let now_str = now.to_rfc3339();
383    let mut swept_count = 0;
384    let mut errors = Vec::new();
385
386    for record in memories {
387        let Some(ref expires_at) = record.expires_at else {
388            continue;
389        };
390        let Ok(exp) = chrono::DateTime::parse_from_rfc3339(expires_at) else {
391            continue;
392        };
393        if exp > now {
394            continue;
395        }
396
397        match engine.storage.hard_delete_memory(record.id).await {
398            Ok(()) => {
399                if let Err(e) = engine.index.remove(record.id) {
400                    tracing::warn!(memory_id = %record.id, error = %e, "ttl sweep: vector index remove failed");
401                }
402                if let Some(ref ft) = engine.full_text {
403                    if let Err(e) = ft.remove(record.id) {
404                        tracing::warn!(memory_id = %record.id, error = %e, "ttl sweep: full-text remove failed");
405                    }
406                    let _ = ft.commit();
407                }
408                if let Some(ref cache) = engine.cache {
409                    cache.invalidate(record.id);
410                }
411                emit_expiry_event(engine, &record, &now_str).await;
412                swept_count += 1;
413            }
414            Err(e) => errors.push(TtlError {
415                memory_id: record.id,
416                error: e.to_string(),
417            }),
418        }
419    }
420
421    Ok(TtlReport {
422        swept_count,
423        errors,
424    })
425}
426
/// Insert a `MemoryExpired` audit event for a memory that was just
/// hard-deleted by the TTL sweep.
///
/// Best-effort: failures (reading the previous event hash, inserting the
/// event) are logged and never propagated, so audit bookkeeping cannot
/// abort the sweep that triggered it.
async fn emit_expiry_event(engine: &MnemoEngine, record: &MemoryRecord, now_str: &str) {
    // Hash the memory *id*, not its content — the row is already deleted.
    let event_content_hash =
        compute_content_hash(&record.id.to_string(), &record.agent_id, now_str);
    // Chain onto the agent's latest event hash; on read failure, fall back
    // to None and start a new chain segment rather than failing the sweep.
    let prev_event_hash = match engine
        .storage
        .get_latest_event_hash(&record.agent_id, None)
        .await
    {
        Ok(hash) => hash,
        Err(e) => {
            tracing::warn!(error = %e, "ttl sweep: failed to read prev event hash, starting new chain segment");
            None
        }
    };
    let event_prev_hash = Some(crate::hash::compute_chain_hash(
        &event_content_hash,
        prev_event_hash.as_deref(),
    ));

    let event = AgentEvent {
        id: Uuid::now_v7(),
        agent_id: record.agent_id.clone(),
        thread_id: None,
        run_id: None,
        parent_event_id: None,
        event_type: EventType::MemoryExpired,
        payload: serde_json::json!({
            "memory_id": record.id.to_string(),
            "expired_at": record.expires_at.clone(),
        }),
        trace_id: None,
        span_id: None,
        model: None,
        tokens_input: None,
        tokens_output: None,
        latency_ms: None,
        cost_usd: None,
        timestamp: now_str.to_string(),
        // NOTE(review): logical_clock is hard-coded to 0 here — confirm
        // downstream consumers of the event chain tolerate non-monotonic
        // clocks for sweep-generated events.
        logical_clock: 0,
        content_hash: event_content_hash,
        prev_hash: event_prev_hash,
        embedding: None,
    };
    if let Err(e) = engine.storage.insert_event(&event).await {
        tracing::error!(event_id = %event.id, error = %e, "ttl sweep: failed to insert MemoryExpired event");
    }
}
474
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::memory::*;

    /// Exercises the default (exponential) decay curve end to end:
    /// a fresh memory stays near its base importance, an old memory decays
    /// below it, and a high access count recovers some importance.
    #[test]
    fn test_effective_importance_decay() {
        // Fresh memory with high importance
        let now = chrono::Utc::now().to_rfc3339();
        let record = MemoryRecord {
            id: Uuid::now_v7(),
            agent_id: "agent-1".to_string(),
            content: "test".to_string(),
            memory_type: MemoryType::Episodic,
            scope: Scope::Private,
            importance: 0.8,
            tags: vec![],
            metadata: serde_json::json!({}),
            embedding: None,
            content_hash: vec![],
            prev_hash: None,
            source_type: SourceType::Agent,
            source_id: None,
            consolidation_state: ConsolidationState::Raw,
            access_count: 0,
            org_id: None,
            thread_id: None,
            // created_at is "now", so hours_since_creation ≈ 0.
            created_at: now,
            updated_at: "2025-01-01T00:00:00Z".to_string(),
            last_accessed_at: None,
            expires_at: None,
            deleted_at: None,
            decay_rate: Some(0.01),
            created_by: None,
            version: 1,
            prev_version_id: None,
            quarantined: false,
            quarantine_reason: None,
            // No decay_function set → Exponential default is used.
            decay_function: None,
        };

        let eff = effective_importance(&record);
        // Fresh memory should be close to base importance
        assert!(
            eff > 0.7,
            "effective importance {eff} should be > 0.7 for fresh memory"
        );

        // Old memory with high decay rate
        // 1000 hours old at rate 0.01 → e^-10 decay of the base term.
        let old_date = (chrono::Utc::now() - chrono::Duration::hours(1000)).to_rfc3339();
        let old_record = MemoryRecord {
            created_at: old_date,
            decay_rate: Some(0.01),
            access_count: 0,
            ..record.clone()
        };
        let old_eff = effective_importance(&old_record);
        assert!(
            old_eff < eff,
            "old memory {old_eff} should have lower importance than fresh {eff}"
        );

        // Access count boosts importance
        let accessed_record = MemoryRecord {
            access_count: 100,
            ..old_record.clone()
        };
        let accessed_eff = effective_importance(&accessed_record);
        assert!(
            accessed_eff > old_eff,
            "accessed memory {accessed_eff} should be higher than unaccessed {old_eff}"
        );
    }
}