1use std::sync::Arc;
10
11use common::{DecayConfig, DecayStrategy, Memory, MemoryType, Vector};
12use serde::{Deserialize, Serialize};
13use storage::VectorStorage;
14use tracing;
15
16pub struct DecayEngine {
18 pub config: DecayConfig,
19}
20
21pub struct DecayEngineConfig {
23 pub decay_config: DecayConfig,
25 pub interval_secs: u64,
27}
28
29impl Default for DecayEngineConfig {
30 fn default() -> Self {
31 Self {
32 decay_config: DecayConfig {
33 strategy: DecayStrategy::Exponential,
34 half_life_hours: 168.0, min_importance: 0.01,
36 },
37 interval_secs: 3600, }
39 }
40}
41
42impl DecayEngineConfig {
43 pub fn from_env() -> Self {
45 let half_life_hours: f64 = std::env::var("DAKERA_DECAY_HALF_LIFE_HOURS")
46 .ok()
47 .and_then(|v| v.parse().ok())
48 .unwrap_or(168.0);
49
50 let min_importance: f32 = std::env::var("DAKERA_DECAY_MIN_IMPORTANCE")
51 .ok()
52 .and_then(|v| v.parse().ok())
53 .unwrap_or(0.01);
54
55 let interval_secs: u64 = std::env::var("DAKERA_DECAY_INTERVAL_SECS")
56 .ok()
57 .and_then(|v| v.parse().ok())
58 .unwrap_or(3600);
59
60 let strategy_str =
61 std::env::var("DAKERA_DECAY_STRATEGY").unwrap_or_else(|_| "exponential".to_string());
62
63 let strategy = match strategy_str.to_lowercase().as_str() {
64 "linear" => DecayStrategy::Linear,
65 "step" | "stepfunction" | "step_function" => DecayStrategy::StepFunction,
66 _ => DecayStrategy::Exponential,
67 };
68
69 Self {
70 decay_config: DecayConfig {
71 strategy,
72 half_life_hours,
73 min_importance,
74 },
75 interval_secs,
76 }
77 }
78}
79
80impl DecayEngine {
81 pub fn new(config: DecayConfig) -> Self {
83 Self { config }
84 }
85
86 pub fn calculate_decay(
97 &self,
98 current_importance: f32,
99 hours_elapsed: f64,
100 memory_type: &MemoryType,
101 access_count: u32,
102 ) -> f32 {
103 if hours_elapsed <= 0.0 {
104 return current_importance;
105 }
106
107 let type_multiplier = match memory_type {
109 MemoryType::Working => 3.0,
110 MemoryType::Episodic => 1.0,
111 MemoryType::Semantic => 0.5,
112 MemoryType::Procedural => 0.3,
113 };
114
115 let usage_shield = if access_count > 0 {
117 1.0 / (1.0 + (access_count as f64 * 0.1))
118 } else {
119 1.5 };
121
122 let effective_half_life = self.config.half_life_hours / (type_multiplier * usage_shield);
123
124 let decayed = match self.config.strategy {
125 DecayStrategy::Exponential => {
126 let decay_factor = (0.5_f64).powf(hours_elapsed / effective_half_life);
127 current_importance * decay_factor as f32
128 }
129 DecayStrategy::Linear => {
130 let decay_amount = (hours_elapsed / effective_half_life) as f32 * 0.5;
131 (current_importance - decay_amount).max(0.0)
132 }
133 DecayStrategy::StepFunction => {
134 let steps = (hours_elapsed / effective_half_life).floor() as u32;
135 let decay_factor = (0.5_f32).powi(steps as i32);
136 current_importance * decay_factor
137 }
138 };
139
140 decayed.clamp(0.0, 1.0)
141 }
142
143 pub fn access_boost(current_importance: f32) -> f32 {
146 let boost = 0.05 + 0.05 * current_importance; (current_importance + boost).min(1.0)
148 }
149
150 pub async fn apply_decay(&self, storage: &Arc<dyn VectorStorage>) -> DecayResult {
156 let mut result = DecayResult::default();
157
158 let namespaces = match storage.list_namespaces().await {
160 Ok(ns) => ns,
161 Err(e) => {
162 tracing::error!(error = %e, "Failed to list namespaces for decay");
163 return result;
164 }
165 };
166
167 let now = std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_secs();
171
172 for namespace in namespaces {
174 if !namespace.starts_with("_dakera_agent_") {
175 continue;
176 }
177
178 result.namespaces_processed += 1;
179
180 let vectors = match storage.get_all(&namespace).await {
181 Ok(v) => v,
182 Err(e) => {
183 tracing::warn!(
184 namespace = %namespace,
185 error = %e,
186 "Failed to get vectors for decay"
187 );
188 continue;
189 }
190 };
191
192 let mut updated_vectors: Vec<Vector> = Vec::new();
193 let mut ids_to_delete: Vec<String> = Vec::new();
194
195 for vector in &vectors {
196 let memory = match Memory::from_vector(vector) {
197 Some(m) => m,
198 None => continue, };
200
201 result.memories_processed += 1;
202
203 let hours_elapsed = if now > memory.last_accessed_at {
205 (now - memory.last_accessed_at) as f64 / 3600.0
206 } else {
207 0.0
208 };
209
210 let new_importance = self.calculate_decay(
211 memory.importance,
212 hours_elapsed,
213 &memory.memory_type,
214 memory.access_count,
215 );
216
217 if new_importance < self.config.min_importance {
219 ids_to_delete.push(memory.id.clone());
220 result.memories_deleted += 1;
221 continue;
222 }
223
224 if (new_importance - memory.importance).abs() > 0.001 {
226 let mut updated_memory = memory;
227 updated_memory.importance = new_importance;
228
229 let mut updated_vector = vector.clone();
231 updated_vector.metadata = Some(updated_memory.to_vector_metadata());
232 updated_vectors.push(updated_vector);
233 result.memories_decayed += 1;
234 }
235 }
236
237 if !ids_to_delete.is_empty() {
239 if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
240 tracing::warn!(
241 namespace = %namespace,
242 count = ids_to_delete.len(),
243 error = %e,
244 "Failed to delete expired memories"
245 );
246 }
247 }
248
249 if !updated_vectors.is_empty() {
251 if let Err(e) = storage.upsert(&namespace, updated_vectors).await {
252 tracing::warn!(
253 namespace = %namespace,
254 error = %e,
255 "Failed to upsert decayed memories"
256 );
257 }
258 }
259 }
260
261 tracing::info!(
262 namespaces_processed = result.namespaces_processed,
263 memories_processed = result.memories_processed,
264 memories_decayed = result.memories_decayed,
265 memories_deleted = result.memories_deleted,
266 "Decay cycle completed"
267 );
268
269 result
270 }
271
272 pub fn spawn(
274 config: DecayEngineConfig,
275 storage: Arc<dyn VectorStorage>,
276 metrics: Arc<BackgroundMetrics>,
277 ) -> tokio::task::JoinHandle<()> {
278 let engine = DecayEngine::new(config.decay_config);
279 let interval = std::time::Duration::from_secs(config.interval_secs);
280
281 tokio::spawn(async move {
282 tracing::info!(
283 strategy = ?engine.config.strategy,
284 half_life_hours = engine.config.half_life_hours,
285 min_importance = engine.config.min_importance,
286 interval_secs = interval.as_secs(),
287 "Decay engine started"
288 );
289
290 loop {
291 tokio::time::sleep(interval).await;
292 let result = engine.apply_decay(&storage).await;
293 metrics.record_decay(&result);
294 }
295 })
296 }
297}
298
299#[derive(Debug, Default, Clone, Serialize, Deserialize)]
301pub struct DecayResult {
302 pub namespaces_processed: usize,
303 pub memories_processed: usize,
304 pub memories_decayed: usize,
305 pub memories_deleted: usize,
306}
307
308#[derive(Debug, Default)]
314pub struct BackgroundMetrics {
315 inner: std::sync::Mutex<BackgroundMetricsInner>,
316 dirty: std::sync::atomic::AtomicBool,
318}
319
320const MAX_HISTORY_POINTS: usize = 168;
322
323#[derive(Debug, Default, Clone, Serialize, Deserialize)]
324pub struct BackgroundMetricsInner {
325 #[serde(default)]
327 pub last_decay: Option<DecayResult>,
328 #[serde(default)]
330 pub last_decay_at: Option<u64>,
331 #[serde(default)]
333 pub total_decay_deleted: u64,
334 #[serde(default)]
336 pub total_decay_adjusted: u64,
337
338 #[serde(default)]
340 pub last_dedup: Option<DedupResultSnapshot>,
341 #[serde(default)]
343 pub last_dedup_at: Option<u64>,
344 #[serde(default)]
346 pub total_dedup_removed: u64,
347
348 #[serde(default)]
350 pub last_consolidation: Option<ConsolidationResultSnapshot>,
351 #[serde(default)]
353 pub last_consolidation_at: Option<u64>,
354 #[serde(default)]
356 pub total_consolidated: u64,
357
358 #[serde(default)]
360 pub history: Vec<ActivityHistoryPoint>,
361}
362
363#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct ActivityHistoryPoint {
366 pub timestamp: u64,
368 pub decay_deleted: u64,
370 pub decay_adjusted: u64,
372 pub dedup_removed: u64,
374 pub consolidated: u64,
376}
377
378#[derive(Debug, Default, Clone, Serialize, Deserialize)]
380pub struct DedupResultSnapshot {
381 pub namespaces_processed: usize,
382 pub memories_scanned: usize,
383 pub duplicates_removed: usize,
384}
385
386#[derive(Debug, Default, Clone, Serialize, Deserialize)]
388pub struct ConsolidationResultSnapshot {
389 pub namespaces_processed: usize,
390 pub memories_scanned: usize,
391 pub clusters_merged: usize,
392 pub memories_consolidated: usize,
393}
394
395impl BackgroundMetrics {
396 pub fn new() -> Self {
397 Self::default()
398 }
399
400 pub fn restore(inner: BackgroundMetricsInner) -> Self {
402 Self {
403 inner: std::sync::Mutex::new(inner),
404 dirty: std::sync::atomic::AtomicBool::new(false),
405 }
406 }
407
408 pub fn is_dirty(&self) -> bool {
410 self.dirty.load(std::sync::atomic::Ordering::Relaxed)
411 }
412
413 pub fn clear_dirty(&self) {
415 self.dirty
416 .store(false, std::sync::atomic::Ordering::Relaxed);
417 }
418
419 pub fn record_decay(&self, result: &DecayResult) {
421 let now = std::time::SystemTime::now()
422 .duration_since(std::time::UNIX_EPOCH)
423 .unwrap_or_default()
424 .as_secs();
425 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
426 inner.total_decay_deleted += result.memories_deleted as u64;
427 inner.total_decay_adjusted += result.memories_decayed as u64;
428 inner.last_decay = Some(result.clone());
429 inner.last_decay_at = Some(now);
430 push_history(
432 &mut inner.history,
433 ActivityHistoryPoint {
434 timestamp: now,
435 decay_deleted: result.memories_deleted as u64,
436 decay_adjusted: result.memories_decayed as u64,
437 dedup_removed: 0,
438 consolidated: 0,
439 },
440 );
441 self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
442 }
443
444 pub fn record_dedup(
446 &self,
447 namespaces_processed: usize,
448 memories_scanned: usize,
449 duplicates_removed: usize,
450 ) {
451 let now = std::time::SystemTime::now()
452 .duration_since(std::time::UNIX_EPOCH)
453 .unwrap_or_default()
454 .as_secs();
455 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
456 inner.total_dedup_removed += duplicates_removed as u64;
457 inner.last_dedup = Some(DedupResultSnapshot {
458 namespaces_processed,
459 memories_scanned,
460 duplicates_removed,
461 });
462 inner.last_dedup_at = Some(now);
463 push_history(
464 &mut inner.history,
465 ActivityHistoryPoint {
466 timestamp: now,
467 decay_deleted: 0,
468 decay_adjusted: 0,
469 dedup_removed: duplicates_removed as u64,
470 consolidated: 0,
471 },
472 );
473 self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
474 }
475
476 pub fn record_consolidation(
478 &self,
479 namespaces_processed: usize,
480 memories_scanned: usize,
481 clusters_merged: usize,
482 memories_consolidated: usize,
483 ) {
484 let now = std::time::SystemTime::now()
485 .duration_since(std::time::UNIX_EPOCH)
486 .unwrap_or_default()
487 .as_secs();
488 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
489 inner.total_consolidated += memories_consolidated as u64;
490 inner.last_consolidation = Some(ConsolidationResultSnapshot {
491 namespaces_processed,
492 memories_scanned,
493 clusters_merged,
494 memories_consolidated,
495 });
496 inner.last_consolidation_at = Some(now);
497 push_history(
498 &mut inner.history,
499 ActivityHistoryPoint {
500 timestamp: now,
501 decay_deleted: 0,
502 decay_adjusted: 0,
503 dedup_removed: 0,
504 consolidated: memories_consolidated as u64,
505 },
506 );
507 self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
508 }
509
510 pub fn restore_into(&self, restored: BackgroundMetricsInner) {
512 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
513 *inner = restored;
514 }
516
517 pub fn snapshot(&self) -> BackgroundMetricsInner {
519 self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone()
520 }
521}
522
523fn push_history(history: &mut Vec<ActivityHistoryPoint>, point: ActivityHistoryPoint) {
525 history.push(point);
526 if history.len() > MAX_HISTORY_POINTS {
527 let excess = history.len() - MAX_HISTORY_POINTS;
528 history.drain(..excess);
529 }
530}
531
532#[cfg(test)]
533mod tests {
534 use super::*;
535
536 fn make_engine(strategy: DecayStrategy, half_life: f64) -> DecayEngine {
537 DecayEngine::new(DecayConfig {
538 strategy,
539 half_life_hours: half_life,
540 min_importance: 0.01,
541 })
542 }
543
544 const EPISODIC: MemoryType = MemoryType::Episodic;
546
547 #[test]
548 fn test_exponential_decay_at_half_life_episodic_no_access() {
549 let engine = make_engine(DecayStrategy::Exponential, 168.0);
550 let result = engine.calculate_decay(1.0, 112.0, &EPISODIC, 0);
552 assert!((result - 0.5).abs() < 0.01, "Expected ~0.5, got {}", result);
553 }
554
555 #[test]
556 fn test_exponential_decay_zero_time() {
557 let engine = make_engine(DecayStrategy::Exponential, 168.0);
558 let result = engine.calculate_decay(0.8, 0.0, &EPISODIC, 0);
559 assert!((result - 0.8).abs() < 0.001);
560 }
561
562 #[test]
563 fn test_linear_decay_floors_at_zero() {
564 let engine = make_engine(DecayStrategy::Linear, 168.0);
565 let result = engine.calculate_decay(0.3, 168.0, &EPISODIC, 0);
566 assert!(result >= 0.0, "Should not go below 0, got {}", result);
567 }
568
569 #[test]
570 fn test_procedural_decays_slower_than_working() {
571 let engine = make_engine(DecayStrategy::Exponential, 168.0);
572 let working = engine.calculate_decay(1.0, 168.0, &MemoryType::Working, 0);
573 let procedural = engine.calculate_decay(1.0, 168.0, &MemoryType::Procedural, 0);
574 assert!(
575 procedural > working,
576 "Procedural ({}) should decay slower than Working ({})",
577 procedural,
578 working
579 );
580 }
581
582 #[test]
583 fn test_high_access_count_decays_slower() {
584 let engine = make_engine(DecayStrategy::Exponential, 168.0);
585 let no_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 0);
586 let high_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 10);
587 assert!(
588 high_access > no_access,
589 "High access ({}) should decay slower than no access ({})",
590 high_access,
591 no_access
592 );
593 }
594
595 #[test]
596 fn test_semantic_decays_slower_than_episodic() {
597 let engine = make_engine(DecayStrategy::Exponential, 168.0);
598 let episodic = engine.calculate_decay(1.0, 168.0, &EPISODIC, 5);
599 let semantic = engine.calculate_decay(1.0, 168.0, &MemoryType::Semantic, 5);
600 assert!(
601 semantic > episodic,
602 "Semantic ({}) should decay slower than Episodic ({})",
603 semantic,
604 episodic
605 );
606 }
607
608 #[test]
609 fn test_access_boost_scales_with_importance() {
610 let low = DecayEngine::access_boost(0.2);
611 let high = DecayEngine::access_boost(0.8);
612 let boost_low = low - 0.2;
613 let boost_high = high - 0.8;
614 assert!(
615 boost_high > boost_low,
616 "High-importance boost ({}) should be larger than low-importance boost ({})",
617 boost_high,
618 boost_low
619 );
620 assert!((boost_low - (0.05 + 0.05 * 0.2)).abs() < 0.001);
622 assert!((boost_high - (0.05 + 0.05 * 0.8)).abs() < 0.001);
623 }
624
625 #[test]
626 fn test_access_boost_caps_at_one() {
627 assert!((DecayEngine::access_boost(1.0) - 1.0).abs() < 0.001);
628 assert!((DecayEngine::access_boost(0.96) - 1.0).abs() < 0.001);
629 }
630
631 #[test]
632 fn test_decay_clamps_to_range() {
633 let engine = make_engine(DecayStrategy::Exponential, 1.0);
634 let result = engine.calculate_decay(0.001, 100.0, &EPISODIC, 0);
635 assert!(result >= 0.0 && result <= 1.0);
636 }
637
638 #[test]
639 fn test_step_function_decay() {
640 let engine = make_engine(DecayStrategy::StepFunction, 168.0);
641 let eff_hl = 168.0 / 1.5;
643
644 let result = engine.calculate_decay(1.0, eff_hl * 0.5, &EPISODIC, 0);
646 assert!((result - 1.0).abs() < 0.001);
647
648 let result = engine.calculate_decay(1.0, eff_hl, &EPISODIC, 0);
650 assert!((result - 0.5).abs() < 0.001);
651 }
652}