Skip to main content

engine/
decay.rs

1//! Importance Decay Engine for Dakera AI Agent Memory Platform.
2//!
3//! Background task that periodically decays memory importance scores
4//! based on configurable strategies: Exponential, Linear, or StepFunction.
5//!
6//! On every recall hit, importance gets an access boost.
7//! Memories below `min_importance` are auto-deleted.
8
9use std::sync::Arc;
10
11use common::{DecayConfig, DecayStrategy, Memory, MemoryType, Vector};
12use serde::{Deserialize, Serialize};
13use storage::VectorStorage;
14use tokio::sync::RwLock;
15use tracing;
16
/// Importance Decay Engine that runs as a background task.
///
/// Wraps a single [`DecayConfig`]; `calculate_decay` and `apply_decay` read the
/// strategy, half-life, and minimum importance from it.
pub struct DecayEngine {
    /// Decay settings used for every calculation made through this engine.
    pub config: DecayConfig,
}
21
/// Configuration loaded from environment variables.
///
/// Pairs the decay parameters with the scheduling interval of the background
/// loop. Obtain one via `DecayEngineConfig::from_env()` or `Default`.
pub struct DecayEngineConfig {
    /// Decay configuration (strategy, half_life, min_importance)
    pub decay_config: DecayConfig,
    /// How often to run decay (in seconds)
    pub interval_secs: u64,
}
29
30impl Default for DecayEngineConfig {
31    fn default() -> Self {
32        Self {
33            decay_config: DecayConfig {
34                strategy: DecayStrategy::Exponential,
35                half_life_hours: 168.0, // 1 week
36                min_importance: 0.01,
37            },
38            interval_secs: 3600, // 1 hour
39        }
40    }
41}
42
43impl DecayEngineConfig {
44    /// Load configuration from environment variables.
45    pub fn from_env() -> Self {
46        let half_life_hours: f64 = std::env::var("DAKERA_DECAY_HALF_LIFE_HOURS")
47            .ok()
48            .and_then(|v| v.parse().ok())
49            .unwrap_or(168.0);
50
51        let min_importance: f32 = std::env::var("DAKERA_DECAY_MIN_IMPORTANCE")
52            .ok()
53            .and_then(|v| v.parse().ok())
54            .unwrap_or(0.01);
55
56        let interval_secs: u64 = std::env::var("DAKERA_DECAY_INTERVAL_SECS")
57            .ok()
58            .and_then(|v| v.parse().ok())
59            .unwrap_or(3600);
60
61        let strategy_str =
62            std::env::var("DAKERA_DECAY_STRATEGY").unwrap_or_else(|_| "exponential".to_string());
63
64        let strategy = match strategy_str.to_lowercase().as_str() {
65            "linear" => DecayStrategy::Linear,
66            "step" | "stepfunction" | "step_function" => DecayStrategy::StepFunction,
67            _ => DecayStrategy::Exponential,
68        };
69
70        Self {
71            decay_config: DecayConfig {
72                strategy,
73                half_life_hours,
74                min_importance,
75            },
76            interval_secs,
77        }
78    }
79}
80
81impl DecayEngine {
82    /// Create a new DecayEngine with the given configuration.
83    pub fn new(config: DecayConfig) -> Self {
84        Self { config }
85    }
86
87    /// Calculate decayed importance for a single memory.
88    ///
89    /// Memory type determines base decay speed:
90    /// - Working:    3× faster (temporary by design)
91    /// - Episodic:   1× normal (events fade naturally)
92    /// - Semantic:   0.5× slower (knowledge persists)
93    /// - Procedural: 0.3× slower (skills are durable)
94    ///
95    /// Usage pattern shields from decay (diminishing returns).
96    /// Never-accessed memories fade 50% faster.
97    pub fn calculate_decay(
98        &self,
99        current_importance: f32,
100        hours_elapsed: f64,
101        memory_type: &MemoryType,
102        access_count: u32,
103    ) -> f32 {
104        if hours_elapsed <= 0.0 {
105            return current_importance;
106        }
107
108        // Memory type determines base decay speed
109        let type_multiplier = match memory_type {
110            MemoryType::Working => 3.0,
111            MemoryType::Episodic => 1.0,
112            MemoryType::Semantic => 0.5,
113            MemoryType::Procedural => 0.3,
114        };
115
116        // Usage pattern shields from decay (diminishing returns)
117        let usage_shield = if access_count > 0 {
118            1.0 / (1.0 + (access_count as f64 * 0.1))
119        } else {
120            1.5 // never accessed = 50% faster decay
121        };
122
123        let effective_half_life = self.config.half_life_hours / (type_multiplier * usage_shield);
124
125        let decayed = match self.config.strategy {
126            DecayStrategy::Exponential => {
127                let decay_factor = (0.5_f64).powf(hours_elapsed / effective_half_life);
128                current_importance * decay_factor as f32
129            }
130            DecayStrategy::Linear => {
131                let decay_amount = (hours_elapsed / effective_half_life) as f32 * 0.5;
132                (current_importance - decay_amount).max(0.0)
133            }
134            DecayStrategy::StepFunction => {
135                let steps = (hours_elapsed / effective_half_life).floor() as u32;
136                let decay_factor = (0.5_f32).powi(steps as i32);
137                current_importance * decay_factor
138            }
139        };
140
141        decayed.clamp(0.0, 1.0)
142    }
143
144    /// Calculate access boost for a memory that was just recalled.
145    /// Scales with current importance — valuable memories get bigger boosts.
146    pub fn access_boost(current_importance: f32) -> f32 {
147        let boost = 0.05 + 0.05 * current_importance; // 0.05–0.10 range
148        (current_importance + boost).min(1.0)
149    }
150
151    /// Apply decay to all memories across all agent namespaces.
152    ///
153    /// Iterates all namespaces prefixed with `_dakera_agent_`, loads memories,
154    /// applies decay based on time since last access, and removes memories
155    /// below the minimum importance threshold.
156    pub async fn apply_decay(&self, storage: &Arc<dyn VectorStorage>) -> DecayResult {
157        let mut result = DecayResult::default();
158
159        // List all namespaces
160        let namespaces = match storage.list_namespaces().await {
161            Ok(ns) => ns,
162            Err(e) => {
163                tracing::error!(error = %e, "Failed to list namespaces for decay");
164                return result;
165            }
166        };
167
168        let now = std::time::SystemTime::now()
169            .duration_since(std::time::UNIX_EPOCH)
170            .unwrap_or_default()
171            .as_secs();
172
173        // Only process Dakera agent namespaces
174        for namespace in namespaces {
175            if !namespace.starts_with("_dakera_agent_") {
176                continue;
177            }
178
179            result.namespaces_processed += 1;
180
181            let vectors = match storage.get_all(&namespace).await {
182                Ok(v) => v,
183                Err(e) => {
184                    tracing::warn!(
185                        namespace = %namespace,
186                        error = %e,
187                        "Failed to get vectors for decay"
188                    );
189                    continue;
190                }
191            };
192
193            let mut updated_vectors: Vec<Vector> = Vec::new();
194            let mut ids_to_delete: Vec<String> = Vec::new();
195
196            for vector in &vectors {
197                let memory = match Memory::from_vector(vector) {
198                    Some(m) => m,
199                    None => continue, // Skip non-memory vectors
200                };
201
202                result.memories_processed += 1;
203
204                // DECAY-3: Hard-delete memories whose explicit TTL has expired.
205                // expires_at bypasses decay scoring — immediate removal.
206                if let Some(exp) = memory.expires_at {
207                    if exp <= now {
208                        ids_to_delete.push(memory.id.clone());
209                        result.memories_deleted += 1;
210                        continue;
211                    }
212                }
213
214                // Calculate hours since last access
215                let hours_elapsed = if now > memory.last_accessed_at {
216                    (now - memory.last_accessed_at) as f64 / 3600.0
217                } else {
218                    0.0
219                };
220
221                let new_importance = self.calculate_decay(
222                    memory.importance,
223                    hours_elapsed,
224                    &memory.memory_type,
225                    memory.access_count,
226                );
227
228                // Check if below minimum importance
229                if new_importance < self.config.min_importance {
230                    ids_to_delete.push(memory.id.clone());
231                    result.memories_deleted += 1;
232                    continue;
233                }
234
235                // Only update if importance actually changed
236                if (new_importance - memory.importance).abs() > 0.001 {
237                    let mut updated_memory = memory;
238                    updated_memory.importance = new_importance;
239
240                    // Rebuild vector with updated metadata but same embedding
241                    let mut updated_vector = vector.clone();
242                    updated_vector.metadata = Some(updated_memory.to_vector_metadata());
243                    updated_vectors.push(updated_vector);
244                    result.memories_decayed += 1;
245                }
246            }
247
248            // Delete memories below threshold
249            if !ids_to_delete.is_empty() {
250                if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
251                    tracing::warn!(
252                        namespace = %namespace,
253                        count = ids_to_delete.len(),
254                        error = %e,
255                        "Failed to delete expired memories"
256                    );
257                }
258            }
259
260            // Upsert updated memories
261            if !updated_vectors.is_empty() {
262                if let Err(e) = storage.upsert(&namespace, updated_vectors).await {
263                    tracing::warn!(
264                        namespace = %namespace,
265                        error = %e,
266                        "Failed to upsert decayed memories"
267                    );
268                }
269            }
270        }
271
272        tracing::info!(
273            namespaces_processed = result.namespaces_processed,
274            memories_processed = result.memories_processed,
275            memories_decayed = result.memories_decayed,
276            memories_deleted = result.memories_deleted,
277            "Decay cycle completed"
278        );
279
280        result
281    }
282
283    /// Spawn the decay engine as a background tokio task.
284    ///
285    /// Takes a shared `Arc<RwLock<DecayConfig>>` so that config changes made at
286    /// runtime via `PUT /admin/decay/config` take effect without a server restart.
287    /// Each loop iteration re-reads the config before running, so strategy/half-life
288    /// changes apply on the next cycle.
289    pub fn spawn(
290        config: Arc<RwLock<DecayConfig>>,
291        interval_secs: u64,
292        storage: Arc<dyn VectorStorage>,
293        metrics: Arc<BackgroundMetrics>,
294    ) -> tokio::task::JoinHandle<()> {
295        let interval = std::time::Duration::from_secs(interval_secs);
296
297        tokio::spawn(async move {
298            tracing::info!(
299                interval_secs,
300                "Decay engine started (hot-reload config via PUT /admin/decay/config)"
301            );
302
303            loop {
304                tokio::time::sleep(interval).await;
305                // Re-read config before each cycle — picks up hot-reload changes
306                let current_config = config.read().await.clone();
307                let engine = DecayEngine::new(current_config);
308                let result = engine.apply_decay(&storage).await;
309                metrics.record_decay(&result);
310            }
311        })
312    }
313}
314
/// Result of a decay cycle.
///
/// Returned by `DecayEngine::apply_decay` and fed into
/// `BackgroundMetrics::record_decay`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DecayResult {
    /// Namespaces with the `_dakera_agent_` prefix visited in this cycle.
    pub namespaces_processed: usize,
    /// Vectors that parsed as memories and were examined.
    pub memories_processed: usize,
    /// Memories whose importance was lowered by this cycle.
    pub memories_decayed: usize,
    /// Memories removed, either for an expired TTL or for dropping below
    /// the minimum importance.
    pub memories_deleted: usize,
}
323
/// Shared metrics for background activity tracking.
///
/// Updated by decay/autopilot spawn loops so the API can expose
/// what background jobs are doing to user memories.
/// Persisted to storage so metrics survive restarts.
///
/// Every `record_*` method sets the dirty flag; `clear_dirty` resets it.
#[derive(Debug, Default)]
pub struct BackgroundMetrics {
    /// Counters, last-run results, and history, guarded by a standard-library mutex.
    inner: std::sync::Mutex<BackgroundMetricsInner>,
    /// Flag: dirty since last persist
    dirty: std::sync::atomic::AtomicBool,
}
335
/// Upper bound on stored history points: 7 days × 24 hourly decay cycles.
const MAX_HISTORY_POINTS: usize = 7 * 24;
338
339#[derive(Debug, Default, Clone, Serialize, Deserialize)]
340pub struct BackgroundMetricsInner {
341    /// Last decay cycle result
342    #[serde(default)]
343    pub last_decay: Option<DecayResult>,
344    /// Timestamp of last decay run (unix secs)
345    #[serde(default)]
346    pub last_decay_at: Option<u64>,
347    /// Cumulative memories deleted by decay
348    #[serde(default)]
349    pub total_decay_deleted: u64,
350    /// Cumulative memories decayed (importance lowered)
351    #[serde(default)]
352    pub total_decay_adjusted: u64,
353    /// Total number of decay cycles completed
354    #[serde(default)]
355    pub decay_cycles_run: u64,
356
357    /// Last dedup cycle result
358    #[serde(default)]
359    pub last_dedup: Option<DedupResultSnapshot>,
360    /// Timestamp of last dedup run (unix secs)
361    #[serde(default)]
362    pub last_dedup_at: Option<u64>,
363    /// Cumulative duplicates removed
364    #[serde(default)]
365    pub total_dedup_removed: u64,
366
367    /// Last consolidation cycle result
368    #[serde(default)]
369    pub last_consolidation: Option<ConsolidationResultSnapshot>,
370    /// Timestamp of last consolidation run (unix secs)
371    #[serde(default)]
372    pub last_consolidation_at: Option<u64>,
373    /// Cumulative memories consolidated
374    #[serde(default)]
375    pub total_consolidated: u64,
376
377    /// Historical data points for graphing (ring buffer, newest last)
378    #[serde(default)]
379    pub history: Vec<ActivityHistoryPoint>,
380}
381
/// A single historical data point for the activity timeline graph.
///
/// Each `BackgroundMetrics::record_*` call appends one point in which only the
/// counters belonging to that job are non-zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActivityHistoryPoint {
    /// Unix timestamp (seconds)
    pub timestamp: u64,
    /// Memories deleted by decay in this cycle
    pub decay_deleted: u64,
    /// Memories adjusted by decay in this cycle
    pub decay_adjusted: u64,
    /// Duplicates removed in this cycle
    pub dedup_removed: u64,
    /// Memories consolidated in this cycle
    pub consolidated: u64,
}
396
/// Serializable snapshot of a dedup result (avoids coupling to autopilot module).
///
/// Filled verbatim from the arguments of `BackgroundMetrics::record_dedup`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DedupResultSnapshot {
    /// Namespace count reported for the dedup cycle.
    pub namespaces_processed: usize,
    /// Memory count reported as scanned by the dedup cycle.
    pub memories_scanned: usize,
    /// Duplicate count reported as removed by the dedup cycle.
    pub duplicates_removed: usize,
}
404
/// Serializable snapshot of a consolidation result.
///
/// Filled verbatim from the arguments of
/// `BackgroundMetrics::record_consolidation`.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ConsolidationResultSnapshot {
    /// Namespace count reported for the consolidation cycle.
    pub namespaces_processed: usize,
    /// Memory count reported as scanned by the consolidation cycle.
    pub memories_scanned: usize,
    /// Cluster count reported as merged by the consolidation cycle.
    pub clusters_merged: usize,
    /// Memory count reported as consolidated by the cycle.
    pub memories_consolidated: usize,
}
413
414impl BackgroundMetrics {
415    pub fn new() -> Self {
416        Self::default()
417    }
418
419    /// Restore metrics from a previously persisted snapshot.
420    pub fn restore(inner: BackgroundMetricsInner) -> Self {
421        Self {
422            inner: std::sync::Mutex::new(inner),
423            dirty: std::sync::atomic::AtomicBool::new(false),
424        }
425    }
426
427    /// Whether metrics changed since last persist.
428    pub fn is_dirty(&self) -> bool {
429        self.dirty.load(std::sync::atomic::Ordering::Relaxed)
430    }
431
432    /// Clear the dirty flag after a successful persist.
433    pub fn clear_dirty(&self) {
434        self.dirty
435            .store(false, std::sync::atomic::Ordering::Relaxed);
436    }
437
438    /// Record a decay cycle result.
439    pub fn record_decay(&self, result: &DecayResult) {
440        let now = std::time::SystemTime::now()
441            .duration_since(std::time::UNIX_EPOCH)
442            .unwrap_or_default()
443            .as_secs();
444        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
445        inner.total_decay_deleted += result.memories_deleted as u64;
446        inner.total_decay_adjusted += result.memories_decayed as u64;
447        inner.decay_cycles_run += 1;
448        inner.last_decay = Some(result.clone());
449        inner.last_decay_at = Some(now);
450        // Append history point
451        push_history(
452            &mut inner.history,
453            ActivityHistoryPoint {
454                timestamp: now,
455                decay_deleted: result.memories_deleted as u64,
456                decay_adjusted: result.memories_decayed as u64,
457                dedup_removed: 0,
458                consolidated: 0,
459            },
460        );
461        self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
462    }
463
464    /// Record a dedup cycle result.
465    pub fn record_dedup(
466        &self,
467        namespaces_processed: usize,
468        memories_scanned: usize,
469        duplicates_removed: usize,
470    ) {
471        let now = std::time::SystemTime::now()
472            .duration_since(std::time::UNIX_EPOCH)
473            .unwrap_or_default()
474            .as_secs();
475        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
476        inner.total_dedup_removed += duplicates_removed as u64;
477        inner.last_dedup = Some(DedupResultSnapshot {
478            namespaces_processed,
479            memories_scanned,
480            duplicates_removed,
481        });
482        inner.last_dedup_at = Some(now);
483        push_history(
484            &mut inner.history,
485            ActivityHistoryPoint {
486                timestamp: now,
487                decay_deleted: 0,
488                decay_adjusted: 0,
489                dedup_removed: duplicates_removed as u64,
490                consolidated: 0,
491            },
492        );
493        self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
494    }
495
496    /// Record a consolidation cycle result.
497    pub fn record_consolidation(
498        &self,
499        namespaces_processed: usize,
500        memories_scanned: usize,
501        clusters_merged: usize,
502        memories_consolidated: usize,
503    ) {
504        let now = std::time::SystemTime::now()
505            .duration_since(std::time::UNIX_EPOCH)
506            .unwrap_or_default()
507            .as_secs();
508        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
509        inner.total_consolidated += memories_consolidated as u64;
510        inner.last_consolidation = Some(ConsolidationResultSnapshot {
511            namespaces_processed,
512            memories_scanned,
513            clusters_merged,
514            memories_consolidated,
515        });
516        inner.last_consolidation_at = Some(now);
517        push_history(
518            &mut inner.history,
519            ActivityHistoryPoint {
520                timestamp: now,
521                decay_deleted: 0,
522                decay_adjusted: 0,
523                dedup_removed: 0,
524                consolidated: memories_consolidated as u64,
525            },
526        );
527        self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
528    }
529
530    /// Replace the inner state with a restored snapshot (used on startup).
531    pub fn restore_into(&self, restored: BackgroundMetricsInner) {
532        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
533        *inner = restored;
534        // Don't set dirty — this was just loaded from storage
535    }
536
537    /// Get a snapshot of all metrics for API response.
538    pub fn snapshot(&self) -> BackgroundMetricsInner {
539        self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone()
540    }
541}
542
543/// Append a history point, capping at MAX_HISTORY_POINTS.
544fn push_history(history: &mut Vec<ActivityHistoryPoint>, point: ActivityHistoryPoint) {
545    history.push(point);
546    if history.len() > MAX_HISTORY_POINTS {
547        let excess = history.len() - MAX_HISTORY_POINTS;
548        history.drain(..excess);
549    }
550}
551
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an engine with the given strategy and half-life (hours) and a
    /// fixed 0.01 minimum importance.
    fn make_engine(strategy: DecayStrategy, half_life: f64) -> DecayEngine {
        DecayEngine::new(DecayConfig {
            strategy,
            half_life_hours: half_life,
            min_importance: 0.01,
        })
    }

    // Default args: Episodic type, 0 access count (multiplier = 1.0, shield = 1.5),
    // so a 168h half-life becomes an effective 112h half-life.
    const EPISODIC: MemoryType = MemoryType::Episodic;

    #[test]
    fn test_exponential_decay_at_half_life_episodic_no_access() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        // Effective half-life = 168 / (1.0 * 1.5) = 112h (faster for never-accessed)
        let result = engine.calculate_decay(1.0, 112.0, &EPISODIC, 0);
        assert!((result - 0.5).abs() < 0.01, "Expected ~0.5, got {}", result);
    }

    #[test]
    fn test_exponential_decay_zero_time() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let result = engine.calculate_decay(0.8, 0.0, &EPISODIC, 0);
        assert!((result - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_linear_decay_floors_at_zero() {
        let engine = make_engine(DecayStrategy::Linear, 168.0);
        // 168h / 112h = 1.5 half-lives => decay amount 0.75, more than the 0.3 available.
        let result = engine.calculate_decay(0.3, 168.0, &EPISODIC, 0);
        assert!(
            result.abs() < f32::EPSILON,
            "Should floor at exactly 0, got {}",
            result
        );
    }

    #[test]
    fn test_linear_decay_partial() {
        let engine = make_engine(DecayStrategy::Linear, 168.0);
        // 56h / 112h = 0.5 half-lives => decay amount 0.25.
        let result = engine.calculate_decay(1.0, 56.0, &EPISODIC, 0);
        assert!(
            (result - 0.75).abs() < 0.001,
            "Expected ~0.75, got {}",
            result
        );
    }

    #[test]
    fn test_procedural_decays_slower_than_working() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let working = engine.calculate_decay(1.0, 168.0, &MemoryType::Working, 0);
        let procedural = engine.calculate_decay(1.0, 168.0, &MemoryType::Procedural, 0);
        assert!(
            procedural > working,
            "Procedural ({}) should decay slower than Working ({})",
            procedural,
            working
        );
    }

    #[test]
    fn test_high_access_count_decays_slower() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let no_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 0);
        let high_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 10);
        assert!(
            high_access > no_access,
            "High access ({}) should decay slower than no access ({})",
            high_access,
            no_access
        );
    }

    #[test]
    fn test_semantic_decays_slower_than_episodic() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let episodic = engine.calculate_decay(1.0, 168.0, &EPISODIC, 5);
        let semantic = engine.calculate_decay(1.0, 168.0, &MemoryType::Semantic, 5);
        assert!(
            semantic > episodic,
            "Semantic ({}) should decay slower than Episodic ({})",
            semantic,
            episodic
        );
    }

    #[test]
    fn test_access_boost_scales_with_importance() {
        let low = DecayEngine::access_boost(0.2);
        let high = DecayEngine::access_boost(0.8);
        let boost_low = low - 0.2;
        let boost_high = high - 0.8;
        assert!(
            boost_high > boost_low,
            "High-importance boost ({}) should be larger than low-importance boost ({})",
            boost_high,
            boost_low
        );
        // Verify range: 0.05 + 0.05*importance
        assert!((boost_low - (0.05 + 0.05 * 0.2)).abs() < 0.001);
        assert!((boost_high - (0.05 + 0.05 * 0.8)).abs() < 0.001);
    }

    #[test]
    fn test_access_boost_caps_at_one() {
        assert!((DecayEngine::access_boost(1.0) - 1.0).abs() < 0.001);
        assert!((DecayEngine::access_boost(0.96) - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_decay_clamps_to_range() {
        let engine = make_engine(DecayStrategy::Exponential, 1.0);
        let result = engine.calculate_decay(0.001, 100.0, &EPISODIC, 0);
        assert!(
            (0.0..=1.0).contains(&result),
            "Expected a value in 0..=1, got {}",
            result
        );
    }

    #[test]
    fn test_step_function_decay() {
        let engine = make_engine(DecayStrategy::StepFunction, 168.0);
        // Effective half-life for Episodic+0 access = 168/1.5 = 112h
        let eff_hl = 168.0 / 1.5;

        // Before first step - no decay
        let result = engine.calculate_decay(1.0, eff_hl * 0.5, &EPISODIC, 0);
        assert!((result - 1.0).abs() < 0.001);

        // At first step
        let result = engine.calculate_decay(1.0, eff_hl, &EPISODIC, 0);
        assert!((result - 0.5).abs() < 0.001);
    }
}