Skip to main content

engine/
decay.rs

1//! Importance Decay Engine for Dakera AI Agent Memory Platform.
2//!
3//! Background task that periodically decays memory importance scores
4//! based on configurable strategies: Exponential, Linear, or StepFunction.
5//!
6//! On every recall hit, importance gets an access boost.
7//! Memories below `min_importance` are auto-deleted.
8
9use std::sync::Arc;
10
11use common::{DecayConfig, DecayStrategy, Memory, MemoryType, Vector};
12use serde::{Deserialize, Serialize};
13use storage::VectorStorage;
14use tracing;
15
/// Importance Decay Engine that runs as a background task.
///
/// Holds the decay parameters used by [`DecayEngine::calculate_decay`] and
/// applied to every agent namespace by [`DecayEngine::apply_decay`].
pub struct DecayEngine {
    /// Decay strategy, half-life, and minimum-importance deletion threshold.
    pub config: DecayConfig,
}
20
/// Configuration loaded from environment variables.
///
/// See [`DecayEngineConfig::from_env`] for the `DAKERA_DECAY_*` variable
/// names and their fallbacks.
pub struct DecayEngineConfig {
    /// Decay configuration (strategy, half_life, min_importance)
    pub decay_config: DecayConfig,
    /// How often to run decay (in seconds)
    pub interval_secs: u64,
}
28
29impl Default for DecayEngineConfig {
30    fn default() -> Self {
31        Self {
32            decay_config: DecayConfig {
33                strategy: DecayStrategy::Exponential,
34                half_life_hours: 168.0, // 1 week
35                min_importance: 0.01,
36            },
37            interval_secs: 3600, // 1 hour
38        }
39    }
40}
41
42impl DecayEngineConfig {
43    /// Load configuration from environment variables.
44    pub fn from_env() -> Self {
45        let half_life_hours: f64 = std::env::var("DAKERA_DECAY_HALF_LIFE_HOURS")
46            .ok()
47            .and_then(|v| v.parse().ok())
48            .unwrap_or(168.0);
49
50        let min_importance: f32 = std::env::var("DAKERA_DECAY_MIN_IMPORTANCE")
51            .ok()
52            .and_then(|v| v.parse().ok())
53            .unwrap_or(0.01);
54
55        let interval_secs: u64 = std::env::var("DAKERA_DECAY_INTERVAL_SECS")
56            .ok()
57            .and_then(|v| v.parse().ok())
58            .unwrap_or(3600);
59
60        let strategy_str =
61            std::env::var("DAKERA_DECAY_STRATEGY").unwrap_or_else(|_| "exponential".to_string());
62
63        let strategy = match strategy_str.to_lowercase().as_str() {
64            "linear" => DecayStrategy::Linear,
65            "step" | "stepfunction" | "step_function" => DecayStrategy::StepFunction,
66            _ => DecayStrategy::Exponential,
67        };
68
69        Self {
70            decay_config: DecayConfig {
71                strategy,
72                half_life_hours,
73                min_importance,
74            },
75            interval_secs,
76        }
77    }
78}
79
80impl DecayEngine {
81    /// Create a new DecayEngine with the given configuration.
82    pub fn new(config: DecayConfig) -> Self {
83        Self { config }
84    }
85
86    /// Calculate decayed importance for a single memory.
87    ///
88    /// Memory type determines base decay speed:
89    /// - Working:    3× faster (temporary by design)
90    /// - Episodic:   1× normal (events fade naturally)
91    /// - Semantic:   0.5× slower (knowledge persists)
92    /// - Procedural: 0.3× slower (skills are durable)
93    ///
94    /// Usage pattern shields from decay (diminishing returns).
95    /// Never-accessed memories fade 50% faster.
96    pub fn calculate_decay(
97        &self,
98        current_importance: f32,
99        hours_elapsed: f64,
100        memory_type: &MemoryType,
101        access_count: u32,
102    ) -> f32 {
103        if hours_elapsed <= 0.0 {
104            return current_importance;
105        }
106
107        // Memory type determines base decay speed
108        let type_multiplier = match memory_type {
109            MemoryType::Working => 3.0,
110            MemoryType::Episodic => 1.0,
111            MemoryType::Semantic => 0.5,
112            MemoryType::Procedural => 0.3,
113        };
114
115        // Usage pattern shields from decay (diminishing returns)
116        let usage_shield = if access_count > 0 {
117            1.0 / (1.0 + (access_count as f64 * 0.1))
118        } else {
119            1.5 // never accessed = 50% faster decay
120        };
121
122        let effective_half_life = self.config.half_life_hours / (type_multiplier * usage_shield);
123
124        let decayed = match self.config.strategy {
125            DecayStrategy::Exponential => {
126                let decay_factor = (0.5_f64).powf(hours_elapsed / effective_half_life);
127                current_importance * decay_factor as f32
128            }
129            DecayStrategy::Linear => {
130                let decay_amount = (hours_elapsed / effective_half_life) as f32 * 0.5;
131                (current_importance - decay_amount).max(0.0)
132            }
133            DecayStrategy::StepFunction => {
134                let steps = (hours_elapsed / effective_half_life).floor() as u32;
135                let decay_factor = (0.5_f32).powi(steps as i32);
136                current_importance * decay_factor
137            }
138        };
139
140        decayed.clamp(0.0, 1.0)
141    }
142
143    /// Calculate access boost for a memory that was just recalled.
144    /// Scales with current importance — valuable memories get bigger boosts.
145    pub fn access_boost(current_importance: f32) -> f32 {
146        let boost = 0.05 + 0.05 * current_importance; // 0.05–0.10 range
147        (current_importance + boost).min(1.0)
148    }
149
150    /// Apply decay to all memories across all agent namespaces.
151    ///
152    /// Iterates all namespaces prefixed with `_dakera_agent_`, loads memories,
153    /// applies decay based on time since last access, and removes memories
154    /// below the minimum importance threshold.
155    pub async fn apply_decay(&self, storage: &Arc<dyn VectorStorage>) -> DecayResult {
156        let mut result = DecayResult::default();
157
158        // List all namespaces
159        let namespaces = match storage.list_namespaces().await {
160            Ok(ns) => ns,
161            Err(e) => {
162                tracing::error!(error = %e, "Failed to list namespaces for decay");
163                return result;
164            }
165        };
166
167        let now = std::time::SystemTime::now()
168            .duration_since(std::time::UNIX_EPOCH)
169            .unwrap_or_default()
170            .as_secs();
171
172        // Only process Dakera agent namespaces
173        for namespace in namespaces {
174            if !namespace.starts_with("_dakera_agent_") {
175                continue;
176            }
177
178            result.namespaces_processed += 1;
179
180            let vectors = match storage.get_all(&namespace).await {
181                Ok(v) => v,
182                Err(e) => {
183                    tracing::warn!(
184                        namespace = %namespace,
185                        error = %e,
186                        "Failed to get vectors for decay"
187                    );
188                    continue;
189                }
190            };
191
192            let mut updated_vectors: Vec<Vector> = Vec::new();
193            let mut ids_to_delete: Vec<String> = Vec::new();
194
195            for vector in &vectors {
196                let memory = match Memory::from_vector(vector) {
197                    Some(m) => m,
198                    None => continue, // Skip non-memory vectors
199                };
200
201                result.memories_processed += 1;
202
203                // Calculate hours since last access
204                let hours_elapsed = if now > memory.last_accessed_at {
205                    (now - memory.last_accessed_at) as f64 / 3600.0
206                } else {
207                    0.0
208                };
209
210                let new_importance = self.calculate_decay(
211                    memory.importance,
212                    hours_elapsed,
213                    &memory.memory_type,
214                    memory.access_count,
215                );
216
217                // Check if below minimum importance
218                if new_importance < self.config.min_importance {
219                    ids_to_delete.push(memory.id.clone());
220                    result.memories_deleted += 1;
221                    continue;
222                }
223
224                // Only update if importance actually changed
225                if (new_importance - memory.importance).abs() > 0.001 {
226                    let mut updated_memory = memory;
227                    updated_memory.importance = new_importance;
228
229                    // Rebuild vector with updated metadata but same embedding
230                    let mut updated_vector = vector.clone();
231                    updated_vector.metadata = Some(updated_memory.to_vector_metadata());
232                    updated_vectors.push(updated_vector);
233                    result.memories_decayed += 1;
234                }
235            }
236
237            // Delete memories below threshold
238            if !ids_to_delete.is_empty() {
239                if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
240                    tracing::warn!(
241                        namespace = %namespace,
242                        count = ids_to_delete.len(),
243                        error = %e,
244                        "Failed to delete expired memories"
245                    );
246                }
247            }
248
249            // Upsert updated memories
250            if !updated_vectors.is_empty() {
251                if let Err(e) = storage.upsert(&namespace, updated_vectors).await {
252                    tracing::warn!(
253                        namespace = %namespace,
254                        error = %e,
255                        "Failed to upsert decayed memories"
256                    );
257                }
258            }
259        }
260
261        tracing::info!(
262            namespaces_processed = result.namespaces_processed,
263            memories_processed = result.memories_processed,
264            memories_decayed = result.memories_decayed,
265            memories_deleted = result.memories_deleted,
266            "Decay cycle completed"
267        );
268
269        result
270    }
271
272    /// Spawn the decay engine as a background tokio task.
273    pub fn spawn(
274        config: DecayEngineConfig,
275        storage: Arc<dyn VectorStorage>,
276        metrics: Arc<BackgroundMetrics>,
277    ) -> tokio::task::JoinHandle<()> {
278        let engine = DecayEngine::new(config.decay_config);
279        let interval = std::time::Duration::from_secs(config.interval_secs);
280
281        tokio::spawn(async move {
282            tracing::info!(
283                strategy = ?engine.config.strategy,
284                half_life_hours = engine.config.half_life_hours,
285                min_importance = engine.config.min_importance,
286                interval_secs = interval.as_secs(),
287                "Decay engine started"
288            );
289
290            loop {
291                tokio::time::sleep(interval).await;
292                let result = engine.apply_decay(&storage).await;
293                metrics.record_decay(&result);
294            }
295        })
296    }
297}
298
/// Result of a decay cycle.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DecayResult {
    // Number of `_dakera_agent_*` namespaces visited this cycle.
    pub namespaces_processed: usize,
    // Vectors that parsed as memories and were evaluated for decay.
    pub memories_processed: usize,
    // Memories whose importance was lowered and written back.
    pub memories_decayed: usize,
    // Memories removed for falling below `min_importance`.
    pub memories_deleted: usize,
}
307
/// Shared metrics for background activity tracking.
///
/// Updated by decay/autopilot spawn loops so the API can expose
/// what background jobs are doing to user memories.
/// Persisted to storage so metrics survive restarts.
///
/// Thread-safe: state lives behind a `Mutex`, and the dirty flag is an
/// atomic so persistence checks never need the lock.
#[derive(Debug, Default)]
pub struct BackgroundMetrics {
    inner: std::sync::Mutex<BackgroundMetricsInner>,
    /// Flag: dirty since last persist
    dirty: std::sync::atomic::AtomicBool,
}
319
/// Max history data points kept (7 days at 1-hour decay interval).
/// Oldest points are dropped first; see `push_history`.
const MAX_HISTORY_POINTS: usize = 168;
322
/// Mutable metrics state guarded by the mutex in [`BackgroundMetrics`].
///
/// Serializable so the whole state can be persisted and restored across
/// restarts; every field defaults so older persisted snapshots still load.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct BackgroundMetricsInner {
    /// Last decay cycle result
    #[serde(default)]
    pub last_decay: Option<DecayResult>,
    /// Timestamp of last decay run (unix secs)
    #[serde(default)]
    pub last_decay_at: Option<u64>,
    /// Cumulative memories deleted by decay
    #[serde(default)]
    pub total_decay_deleted: u64,
    /// Cumulative memories decayed (importance lowered)
    #[serde(default)]
    pub total_decay_adjusted: u64,

    /// Last dedup cycle result
    #[serde(default)]
    pub last_dedup: Option<DedupResultSnapshot>,
    /// Timestamp of last dedup run (unix secs)
    #[serde(default)]
    pub last_dedup_at: Option<u64>,
    /// Cumulative duplicates removed
    #[serde(default)]
    pub total_dedup_removed: u64,

    /// Last consolidation cycle result
    #[serde(default)]
    pub last_consolidation: Option<ConsolidationResultSnapshot>,
    /// Timestamp of last consolidation run (unix secs)
    #[serde(default)]
    pub last_consolidation_at: Option<u64>,
    /// Cumulative memories consolidated
    #[serde(default)]
    pub total_consolidated: u64,

    /// Historical data points for graphing (ring buffer, newest last)
    #[serde(default)]
    pub history: Vec<ActivityHistoryPoint>,
}
362
/// A single historical data point for the activity timeline graph.
///
/// One point is appended per background cycle (decay, dedup, or
/// consolidation); the buffer is capped at `MAX_HISTORY_POINTS`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityHistoryPoint {
    /// Unix timestamp (seconds)
    pub timestamp: u64,
    /// Memories deleted by decay in this cycle
    pub decay_deleted: u64,
    /// Memories adjusted by decay in this cycle
    pub decay_adjusted: u64,
    /// Duplicates removed in this cycle
    pub dedup_removed: u64,
    /// Memories consolidated in this cycle
    pub consolidated: u64,
}
377
/// Serializable snapshot of a dedup result (avoids coupling to autopilot module).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DedupResultSnapshot {
    // Namespaces visited during the dedup cycle.
    pub namespaces_processed: usize,
    // Memories examined for duplicates.
    pub memories_scanned: usize,
    // Duplicate memories removed.
    pub duplicates_removed: usize,
}
385
/// Serializable snapshot of a consolidation result.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ConsolidationResultSnapshot {
    // Namespaces visited during the consolidation cycle.
    pub namespaces_processed: usize,
    // Memories examined for clustering.
    pub memories_scanned: usize,
    // Clusters that were merged.
    pub clusters_merged: usize,
    // Individual memories folded into consolidated ones.
    pub memories_consolidated: usize,
}
394
395impl BackgroundMetrics {
396    pub fn new() -> Self {
397        Self::default()
398    }
399
400    /// Restore metrics from a previously persisted snapshot.
401    pub fn restore(inner: BackgroundMetricsInner) -> Self {
402        Self {
403            inner: std::sync::Mutex::new(inner),
404            dirty: std::sync::atomic::AtomicBool::new(false),
405        }
406    }
407
408    /// Whether metrics changed since last persist.
409    pub fn is_dirty(&self) -> bool {
410        self.dirty.load(std::sync::atomic::Ordering::Relaxed)
411    }
412
413    /// Clear the dirty flag after a successful persist.
414    pub fn clear_dirty(&self) {
415        self.dirty
416            .store(false, std::sync::atomic::Ordering::Relaxed);
417    }
418
419    /// Record a decay cycle result.
420    pub fn record_decay(&self, result: &DecayResult) {
421        let now = std::time::SystemTime::now()
422            .duration_since(std::time::UNIX_EPOCH)
423            .unwrap_or_default()
424            .as_secs();
425        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
426        inner.total_decay_deleted += result.memories_deleted as u64;
427        inner.total_decay_adjusted += result.memories_decayed as u64;
428        inner.last_decay = Some(result.clone());
429        inner.last_decay_at = Some(now);
430        // Append history point
431        push_history(
432            &mut inner.history,
433            ActivityHistoryPoint {
434                timestamp: now,
435                decay_deleted: result.memories_deleted as u64,
436                decay_adjusted: result.memories_decayed as u64,
437                dedup_removed: 0,
438                consolidated: 0,
439            },
440        );
441        self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
442    }
443
444    /// Record a dedup cycle result.
445    pub fn record_dedup(
446        &self,
447        namespaces_processed: usize,
448        memories_scanned: usize,
449        duplicates_removed: usize,
450    ) {
451        let now = std::time::SystemTime::now()
452            .duration_since(std::time::UNIX_EPOCH)
453            .unwrap_or_default()
454            .as_secs();
455        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
456        inner.total_dedup_removed += duplicates_removed as u64;
457        inner.last_dedup = Some(DedupResultSnapshot {
458            namespaces_processed,
459            memories_scanned,
460            duplicates_removed,
461        });
462        inner.last_dedup_at = Some(now);
463        push_history(
464            &mut inner.history,
465            ActivityHistoryPoint {
466                timestamp: now,
467                decay_deleted: 0,
468                decay_adjusted: 0,
469                dedup_removed: duplicates_removed as u64,
470                consolidated: 0,
471            },
472        );
473        self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
474    }
475
476    /// Record a consolidation cycle result.
477    pub fn record_consolidation(
478        &self,
479        namespaces_processed: usize,
480        memories_scanned: usize,
481        clusters_merged: usize,
482        memories_consolidated: usize,
483    ) {
484        let now = std::time::SystemTime::now()
485            .duration_since(std::time::UNIX_EPOCH)
486            .unwrap_or_default()
487            .as_secs();
488        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
489        inner.total_consolidated += memories_consolidated as u64;
490        inner.last_consolidation = Some(ConsolidationResultSnapshot {
491            namespaces_processed,
492            memories_scanned,
493            clusters_merged,
494            memories_consolidated,
495        });
496        inner.last_consolidation_at = Some(now);
497        push_history(
498            &mut inner.history,
499            ActivityHistoryPoint {
500                timestamp: now,
501                decay_deleted: 0,
502                decay_adjusted: 0,
503                dedup_removed: 0,
504                consolidated: memories_consolidated as u64,
505            },
506        );
507        self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
508    }
509
510    /// Replace the inner state with a restored snapshot (used on startup).
511    pub fn restore_into(&self, restored: BackgroundMetricsInner) {
512        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
513        *inner = restored;
514        // Don't set dirty — this was just loaded from storage
515    }
516
517    /// Get a snapshot of all metrics for API response.
518    pub fn snapshot(&self) -> BackgroundMetricsInner {
519        self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone()
520    }
521}
522
523/// Append a history point, capping at MAX_HISTORY_POINTS.
524fn push_history(history: &mut Vec<ActivityHistoryPoint>, point: ActivityHistoryPoint) {
525    history.push(point);
526    if history.len() > MAX_HISTORY_POINTS {
527        let excess = history.len() - MAX_HISTORY_POINTS;
528        history.drain(..excess);
529    }
530}
531
#[cfg(test)]
mod tests {
    use super::*;

    // Build an engine with the given strategy/half-life and the default
    // deletion floor of 0.01.
    fn make_engine(strategy: DecayStrategy, half_life: f64) -> DecayEngine {
        DecayEngine::new(DecayConfig {
            strategy,
            half_life_hours: half_life,
            min_importance: 0.01,
        })
    }

    // Default args: Episodic type, 0 access count (same as old behavior with multiplier=1.0, shield=1.5)
    const EPISODIC: MemoryType = MemoryType::Episodic;

    #[test]
    fn test_exponential_decay_at_half_life_episodic_no_access() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        // Effective half-life = 168 / (1.0 * 1.5) = 112h (faster for never-accessed)
        let result = engine.calculate_decay(1.0, 112.0, &EPISODIC, 0);
        assert!((result - 0.5).abs() < 0.01, "Expected ~0.5, got {}", result);
    }

    #[test]
    fn test_exponential_decay_zero_time() {
        // hours_elapsed <= 0 must leave importance untouched.
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let result = engine.calculate_decay(0.8, 0.0, &EPISODIC, 0);
        assert!((result - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_linear_decay_floors_at_zero() {
        let engine = make_engine(DecayStrategy::Linear, 168.0);
        let result = engine.calculate_decay(0.3, 168.0, &EPISODIC, 0);
        assert!(result >= 0.0, "Should not go below 0, got {}", result);
    }

    #[test]
    fn test_procedural_decays_slower_than_working() {
        // Type multipliers: Working 3.0 vs Procedural 0.3.
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let working = engine.calculate_decay(1.0, 168.0, &MemoryType::Working, 0);
        let procedural = engine.calculate_decay(1.0, 168.0, &MemoryType::Procedural, 0);
        assert!(
            procedural > working,
            "Procedural ({}) should decay slower than Working ({})",
            procedural,
            working
        );
    }

    #[test]
    fn test_high_access_count_decays_slower() {
        // Usage shield: 10 accesses -> 1/(1+1.0) = 0.5 vs never-accessed 1.5.
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let no_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 0);
        let high_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 10);
        assert!(
            high_access > no_access,
            "High access ({}) should decay slower than no access ({})",
            high_access,
            no_access
        );
    }

    #[test]
    fn test_semantic_decays_slower_than_episodic() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let episodic = engine.calculate_decay(1.0, 168.0, &EPISODIC, 5);
        let semantic = engine.calculate_decay(1.0, 168.0, &MemoryType::Semantic, 5);
        assert!(
            semantic > episodic,
            "Semantic ({}) should decay slower than Episodic ({})",
            semantic,
            episodic
        );
    }

    #[test]
    fn test_access_boost_scales_with_importance() {
        let low = DecayEngine::access_boost(0.2);
        let high = DecayEngine::access_boost(0.8);
        let boost_low = low - 0.2;
        let boost_high = high - 0.8;
        assert!(
            boost_high > boost_low,
            "High-importance boost ({}) should be larger than low-importance boost ({})",
            boost_high,
            boost_low
        );
        // Verify range: 0.05 + 0.05*importance
        assert!((boost_low - (0.05 + 0.05 * 0.2)).abs() < 0.001);
        assert!((boost_high - (0.05 + 0.05 * 0.8)).abs() < 0.001);
    }

    #[test]
    fn test_access_boost_caps_at_one() {
        assert!((DecayEngine::access_boost(1.0) - 1.0).abs() < 0.001);
        assert!((DecayEngine::access_boost(0.96) - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_decay_clamps_to_range() {
        // Extreme decay (100 half-lives) must stay inside [0, 1].
        let engine = make_engine(DecayStrategy::Exponential, 1.0);
        let result = engine.calculate_decay(0.001, 100.0, &EPISODIC, 0);
        assert!(result >= 0.0 && result <= 1.0);
    }

    #[test]
    fn test_step_function_decay() {
        let engine = make_engine(DecayStrategy::StepFunction, 168.0);
        // Effective half-life for Episodic+0 access = 168/1.5 = 112h
        let eff_hl = 168.0 / 1.5;

        // Before first step - no decay
        let result = engine.calculate_decay(1.0, eff_hl * 0.5, &EPISODIC, 0);
        assert!((result - 1.0).abs() < 0.001);

        // At first step
        let result = engine.calculate_decay(1.0, eff_hl, &EPISODIC, 0);
        assert!((result - 0.5).abs() < 0.001);
    }
}