1use std::sync::Arc;
10
11use common::{DecayConfig, DecayStrategy, Memory, MemoryType, Vector};
12use serde::{Deserialize, Serialize};
13use storage::VectorStorage;
14use tokio::sync::RwLock;
15use tracing;
16
17pub struct DecayEngine {
19 pub config: DecayConfig,
20}
21
22pub struct DecayEngineConfig {
24 pub decay_config: DecayConfig,
26 pub interval_secs: u64,
28}
29
30impl Default for DecayEngineConfig {
31 fn default() -> Self {
32 Self {
33 decay_config: DecayConfig {
34 strategy: DecayStrategy::Exponential,
35 half_life_hours: 168.0, min_importance: 0.01,
37 },
38 interval_secs: 3600, }
40 }
41}
42
43impl DecayEngineConfig {
44 pub fn from_env() -> Self {
46 let half_life_hours: f64 = std::env::var("DAKERA_DECAY_HALF_LIFE_HOURS")
47 .ok()
48 .and_then(|v| v.parse().ok())
49 .unwrap_or(168.0);
50
51 let min_importance: f32 = std::env::var("DAKERA_DECAY_MIN_IMPORTANCE")
52 .ok()
53 .and_then(|v| v.parse().ok())
54 .unwrap_or(0.01);
55
56 let interval_secs: u64 = std::env::var("DAKERA_DECAY_INTERVAL_SECS")
57 .ok()
58 .and_then(|v| v.parse().ok())
59 .unwrap_or(3600);
60
61 let strategy_str =
62 std::env::var("DAKERA_DECAY_STRATEGY").unwrap_or_else(|_| "exponential".to_string());
63
64 let strategy = match strategy_str.to_lowercase().as_str() {
65 "linear" => DecayStrategy::Linear,
66 "step" | "stepfunction" | "step_function" => DecayStrategy::StepFunction,
67 _ => DecayStrategy::Exponential,
68 };
69
70 Self {
71 decay_config: DecayConfig {
72 strategy,
73 half_life_hours,
74 min_importance,
75 },
76 interval_secs,
77 }
78 }
79}
80
81impl DecayEngine {
82 pub fn new(config: DecayConfig) -> Self {
84 Self { config }
85 }
86
87 pub fn calculate_decay(
98 &self,
99 current_importance: f32,
100 hours_elapsed: f64,
101 memory_type: &MemoryType,
102 access_count: u32,
103 ) -> f32 {
104 if hours_elapsed <= 0.0 {
105 return current_importance;
106 }
107
108 let type_multiplier = match memory_type {
110 MemoryType::Working => 3.0,
111 MemoryType::Episodic => 1.0,
112 MemoryType::Semantic => 0.5,
113 MemoryType::Procedural => 0.3,
114 };
115
116 let usage_shield = if access_count > 0 {
118 1.0 / (1.0 + (access_count as f64 * 0.1))
119 } else {
120 1.5 };
122
123 let effective_half_life = self.config.half_life_hours / (type_multiplier * usage_shield);
124
125 let decayed = match self.config.strategy {
126 DecayStrategy::Exponential => {
127 let decay_factor = (0.5_f64).powf(hours_elapsed / effective_half_life);
128 current_importance * decay_factor as f32
129 }
130 DecayStrategy::Linear => {
131 let decay_amount = (hours_elapsed / effective_half_life) as f32 * 0.5;
132 (current_importance - decay_amount).max(0.0)
133 }
134 DecayStrategy::StepFunction => {
135 let steps = (hours_elapsed / effective_half_life).floor() as u32;
136 let decay_factor = (0.5_f32).powi(steps as i32);
137 current_importance * decay_factor
138 }
139 };
140
141 decayed.clamp(0.0, 1.0)
142 }
143
144 pub fn access_boost(current_importance: f32) -> f32 {
147 let boost = 0.05 + 0.05 * current_importance; (current_importance + boost).min(1.0)
149 }
150
151 pub async fn apply_decay(&self, storage: &Arc<dyn VectorStorage>) -> DecayResult {
157 let mut result = DecayResult::default();
158
159 let namespaces = match storage.list_namespaces().await {
161 Ok(ns) => ns,
162 Err(e) => {
163 tracing::error!(error = %e, "Failed to list namespaces for decay");
164 return result;
165 }
166 };
167
168 let now = std::time::SystemTime::now()
169 .duration_since(std::time::UNIX_EPOCH)
170 .unwrap_or_default()
171 .as_secs();
172
173 for namespace in namespaces {
175 if !namespace.starts_with("_dakera_agent_") {
176 continue;
177 }
178
179 result.namespaces_processed += 1;
180
181 let vectors = match storage.get_all(&namespace).await {
182 Ok(v) => v,
183 Err(e) => {
184 tracing::warn!(
185 namespace = %namespace,
186 error = %e,
187 "Failed to get vectors for decay"
188 );
189 continue;
190 }
191 };
192
193 let mut updated_vectors: Vec<Vector> = Vec::new();
194 let mut ids_to_delete: Vec<String> = Vec::new();
195
196 for vector in &vectors {
197 let memory = match Memory::from_vector(vector) {
198 Some(m) => m,
199 None => continue, };
201
202 result.memories_processed += 1;
203
204 if let Some(exp) = memory.expires_at {
207 if exp <= now {
208 ids_to_delete.push(memory.id.clone());
209 result.memories_deleted += 1;
210 continue;
211 }
212 }
213
214 let hours_elapsed = if now > memory.last_accessed_at {
216 (now - memory.last_accessed_at) as f64 / 3600.0
217 } else {
218 0.0
219 };
220
221 let new_importance = self.calculate_decay(
222 memory.importance,
223 hours_elapsed,
224 &memory.memory_type,
225 memory.access_count,
226 );
227
228 if new_importance < self.config.min_importance {
230 ids_to_delete.push(memory.id.clone());
231 result.memories_deleted += 1;
232 continue;
233 }
234
235 if (new_importance - memory.importance).abs() > 0.001 {
237 let mut updated_memory = memory;
238 updated_memory.importance = new_importance;
239
240 let mut updated_vector = vector.clone();
242 updated_vector.metadata = Some(updated_memory.to_vector_metadata());
243 updated_vectors.push(updated_vector);
244 result.memories_decayed += 1;
245 }
246 }
247
248 if !ids_to_delete.is_empty() {
250 if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
251 tracing::warn!(
252 namespace = %namespace,
253 count = ids_to_delete.len(),
254 error = %e,
255 "Failed to delete expired memories"
256 );
257 }
258 }
259
260 if !updated_vectors.is_empty() {
262 if let Err(e) = storage.upsert(&namespace, updated_vectors).await {
263 tracing::warn!(
264 namespace = %namespace,
265 error = %e,
266 "Failed to upsert decayed memories"
267 );
268 }
269 }
270 }
271
272 tracing::info!(
273 namespaces_processed = result.namespaces_processed,
274 memories_processed = result.memories_processed,
275 memories_decayed = result.memories_decayed,
276 memories_deleted = result.memories_deleted,
277 "Decay cycle completed"
278 );
279
280 result
281 }
282
283 pub fn spawn(
290 config: Arc<RwLock<DecayConfig>>,
291 interval_secs: u64,
292 storage: Arc<dyn VectorStorage>,
293 metrics: Arc<BackgroundMetrics>,
294 ) -> tokio::task::JoinHandle<()> {
295 let interval = std::time::Duration::from_secs(interval_secs);
296
297 tokio::spawn(async move {
298 tracing::info!(
299 interval_secs,
300 "Decay engine started (hot-reload config via PUT /admin/decay/config)"
301 );
302
303 loop {
304 tokio::time::sleep(interval).await;
305 let current_config = config.read().await.clone();
307 let engine = DecayEngine::new(current_config);
308 let result = engine.apply_decay(&storage).await;
309 metrics.record_decay(&result);
310 }
311 })
312 }
313}
314
315#[derive(Debug, Default, Clone, Serialize, Deserialize)]
317pub struct DecayResult {
318 pub namespaces_processed: usize,
319 pub memories_processed: usize,
320 pub memories_decayed: usize,
321 pub memories_deleted: usize,
322}
323
324#[derive(Debug, Default)]
330pub struct BackgroundMetrics {
331 inner: std::sync::Mutex<BackgroundMetricsInner>,
332 dirty: std::sync::atomic::AtomicBool,
334}
335
/// Upper bound on the number of entries kept in the activity history.
// NOTE(review): 168 = 7 * 24, which looks like one week of hourly cycles at
// the default 3600 s interval — confirm the intent.
const MAX_HISTORY_POINTS: usize = 168;
338
339#[derive(Debug, Default, Clone, Serialize, Deserialize)]
340pub struct BackgroundMetricsInner {
341 #[serde(default)]
343 pub last_decay: Option<DecayResult>,
344 #[serde(default)]
346 pub last_decay_at: Option<u64>,
347 #[serde(default)]
349 pub total_decay_deleted: u64,
350 #[serde(default)]
352 pub total_decay_adjusted: u64,
353 #[serde(default)]
355 pub decay_cycles_run: u64,
356
357 #[serde(default)]
359 pub last_dedup: Option<DedupResultSnapshot>,
360 #[serde(default)]
362 pub last_dedup_at: Option<u64>,
363 #[serde(default)]
365 pub total_dedup_removed: u64,
366
367 #[serde(default)]
369 pub last_consolidation: Option<ConsolidationResultSnapshot>,
370 #[serde(default)]
372 pub last_consolidation_at: Option<u64>,
373 #[serde(default)]
375 pub total_consolidated: u64,
376
377 #[serde(default)]
379 pub history: Vec<ActivityHistoryPoint>,
380}
381
382#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct ActivityHistoryPoint {
385 pub timestamp: u64,
387 pub decay_deleted: u64,
389 pub decay_adjusted: u64,
391 pub dedup_removed: u64,
393 pub consolidated: u64,
395}
396
397#[derive(Debug, Default, Clone, Serialize, Deserialize)]
399pub struct DedupResultSnapshot {
400 pub namespaces_processed: usize,
401 pub memories_scanned: usize,
402 pub duplicates_removed: usize,
403}
404
405#[derive(Debug, Default, Clone, Serialize, Deserialize)]
407pub struct ConsolidationResultSnapshot {
408 pub namespaces_processed: usize,
409 pub memories_scanned: usize,
410 pub clusters_merged: usize,
411 pub memories_consolidated: usize,
412}
413
414impl BackgroundMetrics {
415 pub fn new() -> Self {
416 Self::default()
417 }
418
419 pub fn restore(inner: BackgroundMetricsInner) -> Self {
421 Self {
422 inner: std::sync::Mutex::new(inner),
423 dirty: std::sync::atomic::AtomicBool::new(false),
424 }
425 }
426
427 pub fn is_dirty(&self) -> bool {
429 self.dirty.load(std::sync::atomic::Ordering::Relaxed)
430 }
431
432 pub fn clear_dirty(&self) {
434 self.dirty
435 .store(false, std::sync::atomic::Ordering::Relaxed);
436 }
437
438 pub fn record_decay(&self, result: &DecayResult) {
440 let now = std::time::SystemTime::now()
441 .duration_since(std::time::UNIX_EPOCH)
442 .unwrap_or_default()
443 .as_secs();
444 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
445 inner.total_decay_deleted += result.memories_deleted as u64;
446 inner.total_decay_adjusted += result.memories_decayed as u64;
447 inner.decay_cycles_run += 1;
448 inner.last_decay = Some(result.clone());
449 inner.last_decay_at = Some(now);
450 push_history(
452 &mut inner.history,
453 ActivityHistoryPoint {
454 timestamp: now,
455 decay_deleted: result.memories_deleted as u64,
456 decay_adjusted: result.memories_decayed as u64,
457 dedup_removed: 0,
458 consolidated: 0,
459 },
460 );
461 self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
462 }
463
464 pub fn record_dedup(
466 &self,
467 namespaces_processed: usize,
468 memories_scanned: usize,
469 duplicates_removed: usize,
470 ) {
471 let now = std::time::SystemTime::now()
472 .duration_since(std::time::UNIX_EPOCH)
473 .unwrap_or_default()
474 .as_secs();
475 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
476 inner.total_dedup_removed += duplicates_removed as u64;
477 inner.last_dedup = Some(DedupResultSnapshot {
478 namespaces_processed,
479 memories_scanned,
480 duplicates_removed,
481 });
482 inner.last_dedup_at = Some(now);
483 push_history(
484 &mut inner.history,
485 ActivityHistoryPoint {
486 timestamp: now,
487 decay_deleted: 0,
488 decay_adjusted: 0,
489 dedup_removed: duplicates_removed as u64,
490 consolidated: 0,
491 },
492 );
493 self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
494 }
495
496 pub fn record_consolidation(
498 &self,
499 namespaces_processed: usize,
500 memories_scanned: usize,
501 clusters_merged: usize,
502 memories_consolidated: usize,
503 ) {
504 let now = std::time::SystemTime::now()
505 .duration_since(std::time::UNIX_EPOCH)
506 .unwrap_or_default()
507 .as_secs();
508 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
509 inner.total_consolidated += memories_consolidated as u64;
510 inner.last_consolidation = Some(ConsolidationResultSnapshot {
511 namespaces_processed,
512 memories_scanned,
513 clusters_merged,
514 memories_consolidated,
515 });
516 inner.last_consolidation_at = Some(now);
517 push_history(
518 &mut inner.history,
519 ActivityHistoryPoint {
520 timestamp: now,
521 decay_deleted: 0,
522 decay_adjusted: 0,
523 dedup_removed: 0,
524 consolidated: memories_consolidated as u64,
525 },
526 );
527 self.dirty.store(true, std::sync::atomic::Ordering::Relaxed);
528 }
529
530 pub fn restore_into(&self, restored: BackgroundMetricsInner) {
532 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
533 *inner = restored;
534 }
536
537 pub fn snapshot(&self) -> BackgroundMetricsInner {
539 self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone()
540 }
541}
542
543fn push_history(history: &mut Vec<ActivityHistoryPoint>, point: ActivityHistoryPoint) {
545 history.push(point);
546 if history.len() > MAX_HISTORY_POINTS {
547 let excess = history.len() - MAX_HISTORY_POINTS;
548 history.drain(..excess);
549 }
550}
551
#[cfg(test)]
mod tests {
    //! Unit tests for the pure parts of the decay engine
    //! (`calculate_decay` and `access_boost`).

    use super::*;

    const EPISODIC: MemoryType = MemoryType::Episodic;

    /// Builds an engine with the given strategy and base half-life and a
    /// 0.01 deletion threshold.
    fn make_engine(strategy: DecayStrategy, half_life: f64) -> DecayEngine {
        DecayEngine::new(DecayConfig {
            strategy,
            half_life_hours: half_life,
            min_importance: 0.01,
        })
    }

    #[test]
    fn test_exponential_decay_at_half_life_episodic_no_access() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        // Never-accessed episodic memory: effective half-life = 168 / 1.5 = 112 h.
        let result = engine.calculate_decay(1.0, 112.0, &EPISODIC, 0);
        assert!((result - 0.5).abs() < 0.01, "Expected ~0.5, got {}", result);
    }

    #[test]
    fn test_exponential_decay_zero_time() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let result = engine.calculate_decay(0.8, 0.0, &EPISODIC, 0);
        assert!((result - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_negative_elapsed_time_leaves_importance_unchanged() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let result = engine.calculate_decay(0.8, -5.0, &EPISODIC, 0);
        assert!((result - 0.8).abs() < 0.001, "Expected 0.8, got {}", result);
    }

    #[test]
    fn test_linear_decay_floors_at_zero() {
        let engine = make_engine(DecayStrategy::Linear, 168.0);
        // 168 h = 1.5 effective half-lives => decay amount 0.75 > 0.3.
        let result = engine.calculate_decay(0.3, 168.0, &EPISODIC, 0);
        assert!(result.abs() < 1e-6, "Expected exactly 0, got {}", result);
    }

    #[test]
    fn test_linear_decay_partial() {
        let engine = make_engine(DecayStrategy::Linear, 168.0);
        // 56 h = 0.5 effective half-lives => decay amount 0.25.
        let result = engine.calculate_decay(1.0, 56.0, &EPISODIC, 0);
        assert!((result - 0.75).abs() < 0.001, "Expected ~0.75, got {}", result);
    }

    #[test]
    fn test_procedural_decays_slower_than_working() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let working = engine.calculate_decay(1.0, 168.0, &MemoryType::Working, 0);
        let procedural = engine.calculate_decay(1.0, 168.0, &MemoryType::Procedural, 0);
        assert!(
            procedural > working,
            "Procedural ({}) should decay slower than Working ({})",
            procedural,
            working
        );
    }

    #[test]
    fn test_high_access_count_decays_slower() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let no_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 0);
        let high_access = engine.calculate_decay(1.0, 168.0, &EPISODIC, 10);
        assert!(
            high_access > no_access,
            "High access ({}) should decay slower than no access ({})",
            high_access,
            no_access
        );
    }

    #[test]
    fn test_semantic_decays_slower_than_episodic() {
        let engine = make_engine(DecayStrategy::Exponential, 168.0);
        let episodic = engine.calculate_decay(1.0, 168.0, &EPISODIC, 5);
        let semantic = engine.calculate_decay(1.0, 168.0, &MemoryType::Semantic, 5);
        assert!(
            semantic > episodic,
            "Semantic ({}) should decay slower than Episodic ({})",
            semantic,
            episodic
        );
    }

    #[test]
    fn test_access_boost_scales_with_importance() {
        let low = DecayEngine::access_boost(0.2);
        let high = DecayEngine::access_boost(0.8);
        let boost_low = low - 0.2;
        let boost_high = high - 0.8;
        assert!(
            boost_high > boost_low,
            "High-importance boost ({}) should be larger than low-importance boost ({})",
            boost_high,
            boost_low
        );
        assert!((boost_low - (0.05 + 0.05 * 0.2)).abs() < 0.001);
        assert!((boost_high - (0.05 + 0.05 * 0.8)).abs() < 0.001);
    }

    #[test]
    fn test_access_boost_caps_at_one() {
        assert!((DecayEngine::access_boost(1.0) - 1.0).abs() < 0.001);
        assert!((DecayEngine::access_boost(0.96) - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_decay_clamps_to_range() {
        let engine = make_engine(DecayStrategy::Exponential, 1.0);
        let result = engine.calculate_decay(0.001, 100.0, &EPISODIC, 0);
        assert!(
            (0.0..=1.0).contains(&result),
            "Expected a value in [0, 1], got {}",
            result
        );
    }

    #[test]
    fn test_step_function_decay() {
        let engine = make_engine(DecayStrategy::StepFunction, 168.0);
        // Effective half-life of a never-accessed episodic memory.
        let eff_hl = 168.0 / 1.5;

        // Before the first full half-life nothing changes.
        let result = engine.calculate_decay(1.0, eff_hl * 0.5, &EPISODIC, 0);
        assert!((result - 1.0).abs() < 0.001);

        // Exactly one half-life halves the importance.
        let result = engine.calculate_decay(1.0, eff_hl, &EPISODIC, 0);
        assert!((result - 0.5).abs() < 0.001);
    }
}