1use serde::{Deserialize, Serialize};
2use uuid::Uuid;
3
4use crate::error::Result;
5use crate::hash::compute_content_hash;
6use crate::model::event::{AgentEvent, EventType};
7use crate::model::memory::{ConsolidationState, MemoryRecord, MemoryType, SourceType};
8use crate::model::relation::Relation;
9use crate::query::MnemoEngine;
10use crate::storage::MemoryFilter;
11
/// Strategy for decaying a memory's importance as it ages.
///
/// Parsed from spec strings by [`DecayFunction::from_str_opt`]
/// (e.g. `"exponential"`, `"linear"`, `"step:24"`, `"power_law:1.5"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum DecayFunction {
    /// `importance * exp(-rate * hours)`.
    Exponential,
    /// `importance * max(0, 1 - rate * hours)`; reaches zero in finite time.
    Linear,
    /// Full importance until the given age in hours, then zero.
    StepFunction(f32),
    /// `importance / (1 + rate * hours)^alpha`; slow long-tail decay.
    PowerLaw(f32),
}
25
26impl DecayFunction {
27 pub fn from_str_opt(s: &str) -> Option<Self> {
28 match s {
29 "exponential" => Some(DecayFunction::Exponential),
30 "linear" => Some(DecayFunction::Linear),
31 s if s.starts_with("step:") => {
32 s[5..].parse::<f32>().ok().map(DecayFunction::StepFunction)
33 }
34 s if s.starts_with("power_law:") => {
35 s[10..].parse::<f32>().ok().map(DecayFunction::PowerLaw)
36 }
37 _ => None,
38 }
39 }
40}
41
42pub fn effective_importance(record: &MemoryRecord) -> f32 {
45 let decay_fn = record
46 .decay_function
47 .as_deref()
48 .and_then(DecayFunction::from_str_opt)
49 .unwrap_or(DecayFunction::Exponential);
50 effective_importance_with(record, &decay_fn)
51}
52
53pub fn effective_importance_with(record: &MemoryRecord, decay_fn: &DecayFunction) -> f32 {
54 let decay_rate = record.decay_rate.unwrap_or(0.01);
55 let hours = hours_since_creation(&record.created_at);
56 let access_boost = 0.05 * (1.0 + record.access_count as f32).ln();
57
58 let base = match decay_fn {
59 DecayFunction::Exponential => record.importance * (-decay_rate * hours).exp(),
60 DecayFunction::Linear => record.importance * (1.0 - decay_rate * hours).max(0.0),
61 DecayFunction::StepFunction(threshold_hours) => {
62 if hours < *threshold_hours {
63 record.importance
64 } else {
65 0.0
66 }
67 }
68 DecayFunction::PowerLaw(alpha) => {
69 record.importance / (1.0 + decay_rate * hours).powf(*alpha)
70 }
71 };
72
73 (base + access_boost).min(1.0)
74}
75
76fn hours_since_creation(created_at: &str) -> f32 {
77 let now = chrono::Utc::now();
78 match chrono::DateTime::parse_from_rfc3339(created_at) {
79 Ok(dt) => {
80 let age = now - dt.with_timezone(&chrono::Utc);
81 (age.num_seconds() as f32 / 3600.0).max(0.0)
82 }
83 Err(_) => 0.0,
84 }
85}
86
/// Summary of a single decay maintenance pass over an agent's memories.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DecayPassResult {
    // Memories transitioned to the Archived state this pass.
    pub archived: usize,
    // Memories transitioned to the Forgotten state this pass.
    pub forgotten: usize,
    // Total memories fetched and examined, including unchanged ones.
    pub total_processed: usize,
}
94
impl DecayPassResult {
    /// Creates a result from raw counters.
    ///
    /// Provided because the struct is `#[non_exhaustive]`, which blocks
    /// struct-literal construction outside this crate.
    pub fn new(archived: usize, forgotten: usize, total_processed: usize) -> Self {
        Self {
            archived,
            forgotten,
            total_processed,
        }
    }
}
104
105pub async fn run_decay_pass(
109 engine: &MnemoEngine,
110 agent_id: &str,
111 archive_threshold: f32,
112 forget_threshold: f32,
113) -> Result<DecayPassResult> {
114 let filter = MemoryFilter {
115 agent_id: Some(agent_id.to_string()),
116 include_deleted: false,
117 ..Default::default()
118 };
119 let memories = engine
120 .storage
121 .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
122 .await?;
123
124 let mut archived = 0;
125 let mut forgotten = 0;
126 let total_processed = memories.len();
127
128 for mut record in memories {
129 if record.consolidation_state == ConsolidationState::Forgotten
130 || record.consolidation_state == ConsolidationState::Archived
131 {
132 continue;
133 }
134
135 let eff = effective_importance(&record);
136
137 if eff < forget_threshold {
138 record.consolidation_state = ConsolidationState::Forgotten;
139 record.updated_at = chrono::Utc::now().to_rfc3339();
140 engine.storage.update_memory(&record).await?;
141 forgotten += 1;
142 } else if eff < archive_threshold {
143 record.consolidation_state = ConsolidationState::Archived;
144 record.updated_at = chrono::Utc::now().to_rfc3339();
145 engine.storage.update_memory(&record).await?;
146 archived += 1;
147 }
148 }
149
150 Ok(DecayPassResult {
151 archived,
152 forgotten,
153 total_processed,
154 })
155}
156
/// Summary of a consolidation pass over an agent's episodic memories.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsolidationResult {
    // Tag-overlap clusters that met the minimum size requirement.
    pub clusters_found: usize,
    // New semantic memories created, one per qualifying cluster.
    pub new_memories_created: usize,
    // Original episodic memories marked Consolidated.
    pub originals_consolidated: usize,
}
164
impl ConsolidationResult {
    /// Creates a result from raw counters.
    ///
    /// Provided because the struct is `#[non_exhaustive]`, which blocks
    /// struct-literal construction outside this crate.
    pub fn new(
        clusters_found: usize,
        new_memories_created: usize,
        originals_consolidated: usize,
    ) -> Self {
        Self {
            clusters_found,
            new_memories_created,
            originals_consolidated,
        }
    }
}
178
179pub async fn run_consolidation(
182 engine: &MnemoEngine,
183 agent_id: &str,
184 min_cluster_size: usize,
185) -> Result<ConsolidationResult> {
186 let filter = MemoryFilter {
187 agent_id: Some(agent_id.to_string()),
188 memory_type: Some(MemoryType::Episodic),
189 include_deleted: false,
190 ..Default::default()
191 };
192 let memories = engine
193 .storage
194 .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
195 .await?;
196
197 let active: Vec<MemoryRecord> = memories
199 .into_iter()
200 .filter(|m| {
201 m.consolidation_state == ConsolidationState::Raw
202 || m.consolidation_state == ConsolidationState::Active
203 })
204 .collect();
205
206 let mut clusters: Vec<Vec<&MemoryRecord>> = Vec::new();
208
209 for record in &active {
210 let mut found_cluster = false;
211 for cluster in &mut clusters {
212 if cluster
214 .iter()
215 .any(|c| c.tags.iter().any(|t| record.tags.contains(t)))
216 {
217 cluster.push(record);
218 found_cluster = true;
219 break;
220 }
221 }
222 if !found_cluster {
223 clusters.push(vec![record]);
224 }
225 }
226
227 let mut clusters_found = 0;
228 let mut new_memories_created = 0;
229 let mut originals_consolidated = 0;
230
231 for cluster in &clusters {
232 if cluster.len() < min_cluster_size {
233 continue;
234 }
235 clusters_found += 1;
236
237 let combined_content: Vec<String> = cluster.iter().map(|m| m.content.clone()).collect();
239 let content = format!(
240 "[Consolidated from {} memories] {}",
241 cluster.len(),
242 combined_content.join(" | ")
243 );
244 let avg_importance =
245 cluster.iter().map(|m| m.importance).sum::<f32>() / cluster.len() as f32;
246 let all_tags: Vec<String> = cluster
247 .iter()
248 .flat_map(|m| m.tags.iter().cloned())
249 .collect::<std::collections::HashSet<String>>()
250 .into_iter()
251 .collect();
252
253 let now = chrono::Utc::now().to_rfc3339();
254 let new_id = Uuid::now_v7();
255 let content_hash = crate::hash::compute_content_hash(&content, agent_id, &now);
256
257 let embedding = engine.embedding.embed(&content).await?;
258
259 let prev_hash_raw = engine
260 .storage
261 .get_latest_memory_hash(agent_id, None)
262 .await
263 .ok()
264 .flatten();
265 let prev_hash = Some(crate::hash::compute_chain_hash(
266 &content_hash,
267 prev_hash_raw.as_deref(),
268 ));
269
270 let new_record = MemoryRecord {
271 id: new_id,
272 agent_id: agent_id.to_string(),
273 content,
274 memory_type: MemoryType::Semantic,
275 scope: cluster[0].scope,
276 importance: avg_importance,
277 tags: all_tags,
278 metadata: serde_json::json!({"consolidated_from": cluster.iter().map(|m| m.id.to_string()).collect::<Vec<_>>()}),
279 embedding: Some(embedding.clone()),
280 content_hash: content_hash.clone(),
281 prev_hash,
282 source_type: SourceType::Consolidation,
283 source_id: None,
284 consolidation_state: ConsolidationState::Active,
285 access_count: 0,
286 org_id: cluster[0].org_id.clone(),
287 thread_id: None,
288 created_at: now.clone(),
289 updated_at: now,
290 last_accessed_at: None,
291 expires_at: None,
292 deleted_at: None,
293 decay_rate: None,
294 created_by: Some("consolidation_engine".to_string()),
295 version: 1,
296 prev_version_id: None,
297 quarantined: false,
298 quarantine_reason: None,
299 decay_function: None,
300 };
301
302 engine.storage.insert_memory(&new_record).await?;
303 engine.index.add(new_id, &embedding)?;
304 if let Some(ref ft) = engine.full_text {
305 ft.add(new_id, &new_record.content)?;
306 ft.commit()?;
307 }
308 new_memories_created += 1;
309
310 for original in cluster {
312 let relation = Relation {
313 id: Uuid::now_v7(),
314 source_id: new_id,
315 target_id: original.id,
316 relation_type: "consolidated_from".to_string(),
317 weight: 1.0,
318 metadata: serde_json::Value::Object(serde_json::Map::new()),
319 created_at: new_record.created_at.clone(),
320 };
321 if let Err(e) = engine.storage.insert_relation(&relation).await {
322 tracing::error!(relation_id = %relation.id, error = %e, "failed to insert consolidation relation");
323 }
324
325 let mut updated = (*original).clone();
326 updated.consolidation_state = ConsolidationState::Consolidated;
327 updated.updated_at = chrono::Utc::now().to_rfc3339();
328 if let Err(e) = engine.storage.update_memory(&updated).await {
329 tracing::error!(memory_id = %updated.id, error = %e, "failed to update consolidation state");
330 }
331 originals_consolidated += 1;
332 }
333 }
334
335 Ok(ConsolidationResult {
336 clusters_found,
337 new_memories_created,
338 originals_consolidated,
339 })
340}
341
/// Summary of a TTL sweep over expired memories.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlReport {
    // Expired memories successfully hard-deleted.
    pub swept_count: usize,
    // Per-memory deletion failures encountered during the sweep.
    pub errors: Vec<TtlError>,
}
349
impl TtlReport {
    /// Creates a report from a sweep count and its collected errors.
    ///
    /// Provided because the struct is `#[non_exhaustive]`, which blocks
    /// struct-literal construction outside this crate.
    pub fn new(swept_count: usize, errors: Vec<TtlError>) -> Self {
        Self {
            swept_count,
            errors,
        }
    }
}
358
/// A single memory that could not be hard-deleted during a TTL sweep.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlError {
    // The memory whose deletion failed.
    pub memory_id: Uuid,
    // Stringified storage error for reporting.
    pub error: String,
}
364
365pub async fn run_ttl_sweep(engine: &MnemoEngine) -> Result<TtlReport> {
372 let filter = MemoryFilter {
373 include_deleted: false,
374 ..Default::default()
375 };
376 let memories = engine
377 .storage
378 .list_memories(&filter, super::MAX_BATCH_QUERY_LIMIT, 0)
379 .await?;
380
381 let now = chrono::Utc::now();
382 let now_str = now.to_rfc3339();
383 let mut swept_count = 0;
384 let mut errors = Vec::new();
385
386 for record in memories {
387 let Some(ref expires_at) = record.expires_at else {
388 continue;
389 };
390 let Ok(exp) = chrono::DateTime::parse_from_rfc3339(expires_at) else {
391 continue;
392 };
393 if exp > now {
394 continue;
395 }
396
397 match engine.storage.hard_delete_memory(record.id).await {
398 Ok(()) => {
399 if let Err(e) = engine.index.remove(record.id) {
400 tracing::warn!(memory_id = %record.id, error = %e, "ttl sweep: vector index remove failed");
401 }
402 if let Some(ref ft) = engine.full_text {
403 if let Err(e) = ft.remove(record.id) {
404 tracing::warn!(memory_id = %record.id, error = %e, "ttl sweep: full-text remove failed");
405 }
406 let _ = ft.commit();
407 }
408 if let Some(ref cache) = engine.cache {
409 cache.invalidate(record.id);
410 }
411 emit_expiry_event(engine, &record, &now_str).await;
412 swept_count += 1;
413 }
414 Err(e) => errors.push(TtlError {
415 memory_id: record.id,
416 error: e.to_string(),
417 }),
418 }
419 }
420
421 Ok(TtlReport {
422 swept_count,
423 errors,
424 })
425}
426
/// Records a `MemoryExpired` event on the agent's event chain after a TTL
/// deletion.
///
/// The event's content hash is derived from the deleted memory's id, the
/// agent id and the sweep timestamp; the chain hash links it to the agent's
/// latest event hash. Failures reading the previous hash or inserting the
/// event are logged, never propagated — expiry events are best-effort.
async fn emit_expiry_event(engine: &MnemoEngine, record: &MemoryRecord, now_str: &str) {
    let event_content_hash =
        compute_content_hash(&record.id.to_string(), &record.agent_id, now_str);
    // A failed read of the previous event hash starts a new chain segment
    // (prev = None) rather than aborting the sweep.
    let prev_event_hash = match engine
        .storage
        .get_latest_event_hash(&record.agent_id, None)
        .await
    {
        Ok(hash) => hash,
        Err(e) => {
            tracing::warn!(error = %e, "ttl sweep: failed to read prev event hash, starting new chain segment");
            None
        }
    };
    let event_prev_hash = Some(crate::hash::compute_chain_hash(
        &event_content_hash,
        prev_event_hash.as_deref(),
    ));

    let event = AgentEvent {
        id: Uuid::now_v7(),
        agent_id: record.agent_id.clone(),
        thread_id: None,
        run_id: None,
        parent_event_id: None,
        event_type: EventType::MemoryExpired,
        // Payload carries enough to identify what expired and when.
        payload: serde_json::json!({
            "memory_id": record.id.to_string(),
            "expired_at": record.expires_at.clone(),
        }),
        trace_id: None,
        span_id: None,
        model: None,
        tokens_input: None,
        tokens_output: None,
        latency_ms: None,
        cost_usd: None,
        timestamp: now_str.to_string(),
        logical_clock: 0,
        content_hash: event_content_hash,
        prev_hash: event_prev_hash,
        embedding: None,
    };
    if let Err(e) = engine.storage.insert_event(&event).await {
        tracing::error!(event_id = %event.id, error = %e, "ttl sweep: failed to insert MemoryExpired event");
    }
}
474
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::memory::*;

    // Exercises the three factors of effective_importance: freshness,
    // time decay, and the access-count boost.
    #[test]
    fn test_effective_importance_decay() {
        let now = chrono::Utc::now().to_rfc3339();
        let record = MemoryRecord {
            id: Uuid::now_v7(),
            agent_id: "agent-1".to_string(),
            content: "test".to_string(),
            memory_type: MemoryType::Episodic,
            scope: Scope::Private,
            importance: 0.8,
            tags: vec![],
            metadata: serde_json::json!({}),
            embedding: None,
            content_hash: vec![],
            prev_hash: None,
            source_type: SourceType::Agent,
            source_id: None,
            consolidation_state: ConsolidationState::Raw,
            access_count: 0,
            org_id: None,
            thread_id: None,
            created_at: now,
            updated_at: "2025-01-01T00:00:00Z".to_string(),
            last_accessed_at: None,
            expires_at: None,
            deleted_at: None,
            decay_rate: Some(0.01),
            created_by: None,
            version: 1,
            prev_version_id: None,
            quarantined: false,
            quarantine_reason: None,
            decay_function: None,
        };

        // Freshly created: almost no decay has occurred yet.
        let eff = effective_importance(&record);
        assert!(
            eff > 0.7,
            "effective importance {eff} should be > 0.7 for fresh memory"
        );

        // Same record aged 1000 hours: exponential decay must lower it.
        let old_date = (chrono::Utc::now() - chrono::Duration::hours(1000)).to_rfc3339();
        let old_record = MemoryRecord {
            created_at: old_date,
            decay_rate: Some(0.01),
            access_count: 0,
            ..record.clone()
        };
        let old_eff = effective_importance(&old_record);
        assert!(
            old_eff < eff,
            "old memory {old_eff} should have lower importance than fresh {eff}"
        );

        // Heavy access history partially offsets age-based decay.
        let accessed_record = MemoryRecord {
            access_count: 100,
            ..old_record.clone()
        };
        let accessed_eff = effective_importance(&accessed_record);
        assert!(
            accessed_eff > old_eff,
            "accessed memory {accessed_eff} should be higher than unaccessed {old_eff}"
        );
    }
}
548}