codex_memory/memory/
forgetting_job.rs

//! Forgetting and Memory Cleanup Job
//!
//! This module implements an automatic cleanup job for processing memory decay
//! and forgetting based on the Ebbinghaus forgetting curve. It extends the existing
//! consolidation job infrastructure with forgetting-specific logic.
//!
//! Key features:
//! - Ebbinghaus forgetting curve-based decay calculation
//! - Tier-specific decay rate multipliers
//! - Reinforcement learning for dynamic importance scoring
//! - Configurable deletion of forgotten memories (currently performed as a soft delete
//!   through the repository)
//! - Performance metrics and monitoring
13
14use super::error::{MemoryError, Result};
15use super::math_engine::{MathEngine, MemoryParameters};
16use super::models::*;
17use super::repository::MemoryRepository;
18use crate::config::ForgettingConfig;
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use tokio::time;
24use tracing::{debug, error, info, warn};
25use uuid::Uuid;
26
27/// Configuration for the forgetting background job
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ForgettingJobConfig {
30    /// Forgetting configuration
31    pub forgetting_config: ForgettingConfig,
32
33    /// Maximum number of memories to process per batch
34    pub batch_size: usize,
35
36    /// Maximum number of batches to process per run
37    pub max_batches_per_run: usize,
38
39    /// Math engine configuration for decay calculations
40    pub math_engine_config: super::math_engine::MathEngineConfig,
41}
42
43impl Default for ForgettingJobConfig {
44    fn default() -> Self {
45        Self {
46            forgetting_config: ForgettingConfig::default(),
47            batch_size: 1000,
48            max_batches_per_run: 10,
49            math_engine_config: super::math_engine::MathEngineConfig::default(),
50        }
51    }
52}
53
54/// Result of a forgetting job run
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ForgettingJobResult {
57    pub run_id: Uuid,
58    pub started_at: DateTime<Utc>,
59    pub completed_at: DateTime<Utc>,
60    pub total_memories_processed: usize,
61    pub decay_rates_updated: usize,
62    pub importance_scores_updated: usize,
63    pub memories_hard_deleted: usize,
64    pub batches_processed: usize,
65    pub errors_encountered: usize,
66    pub performance_metrics: ForgettingPerformanceMetrics,
67}
68
69/// Performance metrics for forgetting processing
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct ForgettingPerformanceMetrics {
72    pub memories_per_second: f64,
73    pub total_processing_time_ms: u64,
74    pub decay_calculation_time_ms: u64,
75    pub database_update_time_ms: u64,
76    pub hard_deletion_time_ms: u64,
77    pub reinforcement_learning_time_ms: u64,
78}
79
80/// Batch processing result for forgetting operations
81#[derive(Debug, Clone)]
82pub struct ForgettingBatchResult {
83    pub processed_count: usize,
84    pub decay_updates: Vec<(Uuid, f64)>, // (memory_id, new_decay_rate)
85    pub importance_updates: Vec<(Uuid, f64)>, // (memory_id, new_importance_score)
86    pub hard_deletion_candidates: Vec<Uuid>,
87    pub processing_time_ms: u64,
88}
89
90/// Background forgetting job runner
91pub struct ForgettingJob {
92    config: ForgettingJobConfig,
93    repository: Arc<MemoryRepository>,
94    math_engine: MathEngine,
95    is_running: std::sync::atomic::AtomicBool,
96}
97
98impl ForgettingJob {
99    pub fn new(config: ForgettingJobConfig, repository: Arc<MemoryRepository>) -> Self {
100        let math_engine = MathEngine::with_config(config.math_engine_config.clone());
101
102        Self {
103            config,
104            repository,
105            math_engine,
106            is_running: std::sync::atomic::AtomicBool::new(false),
107        }
108    }
109
110    /// Start the background forgetting job
111    pub async fn start(&self) -> Result<()> {
112        if self.is_running.load(std::sync::atomic::Ordering::Relaxed) {
113            return Err(MemoryError::InvalidRequest {
114                message: "Forgetting job is already running".to_string(),
115            });
116        }
117
118        if !self.config.forgetting_config.enabled {
119            info!("Forgetting job is disabled in configuration");
120            return Ok(());
121        }
122
123        self.is_running
124            .store(true, std::sync::atomic::Ordering::Relaxed);
125        info!("Starting forgetting background job");
126
127        let mut interval = time::interval(Duration::from_secs(
128            self.config.forgetting_config.cleanup_interval_seconds,
129        ));
130
131        while self.is_running.load(std::sync::atomic::Ordering::Relaxed) {
132            interval.tick().await;
133
134            match self.run_forgetting_cycle().await {
135                Ok(result) => {
136                    info!(
137                        "Forgetting cycle completed: {} memories processed, {} decay updates, {} importance updates, {} hard deletions, {:.1} mem/sec",
138                        result.total_memories_processed,
139                        result.decay_rates_updated,
140                        result.importance_scores_updated,
141                        result.memories_hard_deleted,
142                        result.performance_metrics.memories_per_second
143                    );
144                }
145                Err(e) => {
146                    error!("Forgetting cycle failed: {}", e);
147                    // Continue running despite errors
148                }
149            }
150        }
151
152        Ok(())
153    }
154
155    /// Stop the background forgetting job
156    pub fn stop(&self) {
157        info!("Stopping forgetting background job");
158        self.is_running
159            .store(false, std::sync::atomic::Ordering::Relaxed);
160    }
161
162    /// Run a single forgetting cycle
163    pub async fn run_forgetting_cycle(&self) -> Result<ForgettingJobResult> {
164        let run_id = Uuid::new_v4();
165        let started_at = Utc::now();
166        let start_time = Instant::now();
167
168        debug!("Starting forgetting cycle {}", run_id);
169
170        let mut total_processed = 0;
171        let mut decay_updates = 0;
172        let mut importance_updates = 0;
173        let mut hard_deletions = 0;
174        let mut batches_processed = 0;
175        let mut errors_encountered = 0;
176        let mut decay_calc_time = 0u64;
177        let mut db_update_time = 0u64;
178        let mut hard_deletion_time = 0u64;
179        let mut rl_time = 0u64;
180
181        // Process batches across all tiers
182        for tier in [
183            MemoryTier::Working,
184            MemoryTier::Warm,
185            MemoryTier::Cold,
186            MemoryTier::Frozen,
187        ] {
188            for _ in 0..self.config.max_batches_per_run {
189                let batch_start = Instant::now();
190
191                match self.process_forgetting_batch(tier).await {
192                    Ok(batch_result) => {
193                        if batch_result.processed_count == 0 {
194                            break; // No more memories to process in this tier
195                        }
196
197                        // Apply decay rate updates
198                        if !batch_result.decay_updates.is_empty() {
199                            let update_start = Instant::now();
200                            match self.apply_decay_updates(&batch_result.decay_updates).await {
201                                Ok(_) => {
202                                    db_update_time += update_start.elapsed().as_millis() as u64;
203                                    decay_updates += batch_result.decay_updates.len();
204                                }
205                                Err(e) => {
206                                    warn!("Failed to apply decay updates: {}", e);
207                                    errors_encountered += 1;
208                                }
209                            }
210                        }
211
212                        // Apply importance updates (reinforcement learning)
213                        if !batch_result.importance_updates.is_empty() {
214                            let rl_start = Instant::now();
215                            match self
216                                .apply_importance_updates(&batch_result.importance_updates)
217                                .await
218                            {
219                                Ok(_) => {
220                                    rl_time += rl_start.elapsed().as_millis() as u64;
221                                    importance_updates += batch_result.importance_updates.len();
222                                }
223                                Err(e) => {
224                                    warn!("Failed to apply importance updates: {}", e);
225                                    errors_encountered += 1;
226                                }
227                            }
228                        }
229
230                        // Process hard deletions if enabled
231                        if self.config.forgetting_config.enable_hard_deletion
232                            && !batch_result.hard_deletion_candidates.is_empty()
233                        {
234                            let deletion_start = Instant::now();
235                            match self
236                                .process_hard_deletions(&batch_result.hard_deletion_candidates)
237                                .await
238                            {
239                                Ok(deleted_count) => {
240                                    hard_deletion_time +=
241                                        deletion_start.elapsed().as_millis() as u64;
242                                    hard_deletions += deleted_count;
243                                }
244                                Err(e) => {
245                                    warn!("Failed to process hard deletions: {}", e);
246                                    errors_encountered += 1;
247                                }
248                            }
249                        }
250
251                        total_processed += batch_result.processed_count;
252                        batches_processed += 1;
253                        decay_calc_time += batch_result.processing_time_ms;
254
255                        debug!(
256                            "Processed forgetting batch for {:?}: {} memories, {} decay updates, {} importance updates, {} deletion candidates",
257                            tier,
258                            batch_result.processed_count,
259                            batch_result.decay_updates.len(),
260                            batch_result.importance_updates.len(),
261                            batch_result.hard_deletion_candidates.len()
262                        );
263                    }
264                    Err(e) => {
265                        warn!("Failed to process forgetting batch for {:?}: {}", tier, e);
266                        errors_encountered += 1;
267                        break;
268                    }
269                }
270            }
271        }
272
273        let completed_at = Utc::now();
274        let total_time = start_time.elapsed();
275
276        let performance_metrics = ForgettingPerformanceMetrics {
277            memories_per_second: if total_time.as_secs_f64() > 0.0 {
278                total_processed as f64 / total_time.as_secs_f64()
279            } else {
280                0.0
281            },
282            total_processing_time_ms: total_time.as_millis() as u64,
283            decay_calculation_time_ms: decay_calc_time,
284            database_update_time_ms: db_update_time,
285            hard_deletion_time_ms: hard_deletion_time,
286            reinforcement_learning_time_ms: rl_time,
287        };
288
289        Ok(ForgettingJobResult {
290            run_id,
291            started_at,
292            completed_at,
293            total_memories_processed: total_processed,
294            decay_rates_updated: decay_updates,
295            importance_scores_updated: importance_updates,
296            memories_hard_deleted: hard_deletions,
297            batches_processed,
298            errors_encountered,
299            performance_metrics,
300        })
301    }
302
303    /// Process a batch of memories for forgetting operations
304    async fn process_forgetting_batch(&self, tier: MemoryTier) -> Result<ForgettingBatchResult> {
305        let start_time = Instant::now();
306
307        // Get memories that haven't been updated recently
308        let memories = self.get_memories_for_forgetting(tier).await?;
309
310        if memories.is_empty() {
311            return Ok(ForgettingBatchResult {
312                processed_count: 0,
313                decay_updates: Vec::new(),
314                importance_updates: Vec::new(),
315                hard_deletion_candidates: Vec::new(),
316                processing_time_ms: start_time.elapsed().as_millis() as u64,
317            });
318        }
319
320        let mut decay_updates = Vec::new();
321        let mut importance_updates = Vec::new();
322        let mut hard_deletion_candidates = Vec::new();
323
324        for memory in &memories {
325            // Calculate new decay rate based on tier and access patterns
326            let new_decay_rate = self.calculate_adaptive_decay_rate(memory, tier)?;
327            if (new_decay_rate - memory.decay_rate).abs() > 0.01 {
328                decay_updates.push((memory.id, new_decay_rate));
329            }
330
331            // Update importance score using reinforcement learning if enabled
332            if self.config.forgetting_config.enable_reinforcement_learning {
333                let new_importance = self.calculate_adaptive_importance(memory)?;
334                if (new_importance - memory.importance_score).abs() > 0.01 {
335                    importance_updates.push((memory.id, new_importance));
336                }
337            }
338
339            // Check if memory should be hard deleted
340            if self.config.forgetting_config.enable_hard_deletion {
341                let params = self.create_memory_parameters(memory)?;
342                let recall_result = self.math_engine.calculate_recall_probability(&params)?;
343
344                if recall_result.recall_probability
345                    < self.config.forgetting_config.hard_deletion_threshold
346                {
347                    // Check if memory is old enough for deletion
348                    let age_days = (Utc::now() - memory.created_at).num_days();
349                    if age_days >= self.config.forgetting_config.hard_deletion_retention_days as i64
350                    {
351                        hard_deletion_candidates.push(memory.id);
352                    }
353                }
354            }
355        }
356
357        Ok(ForgettingBatchResult {
358            processed_count: memories.len(),
359            decay_updates,
360            importance_updates,
361            hard_deletion_candidates,
362            processing_time_ms: start_time.elapsed().as_millis() as u64,
363        })
364    }
365
366    /// Calculate adaptive decay rate based on tier, access patterns, and importance
367    fn calculate_adaptive_decay_rate(&self, memory: &Memory, tier: MemoryTier) -> Result<f64> {
368        let base_rate = self.config.forgetting_config.base_decay_rate;
369
370        // Apply tier-specific multiplier
371        let tier_multiplier = match tier {
372            MemoryTier::Working => self.config.forgetting_config.working_decay_multiplier,
373            MemoryTier::Warm => self.config.forgetting_config.warm_decay_multiplier,
374            MemoryTier::Cold => self.config.forgetting_config.cold_decay_multiplier,
375            MemoryTier::Frozen => self.config.forgetting_config.cold_decay_multiplier * 1.5,
376        };
377
378        // Apply importance factor (higher importance = lower decay)
379        let importance_factor =
380            1.0 - (memory.importance_score * self.config.forgetting_config.importance_decay_factor);
381
382        // Apply age-based scaling
383        let age_days = (Utc::now() - memory.created_at).num_days() as f64;
384        let age_factor = if age_days > 0.0 {
385            1.0 + (age_days / 30.0)
386                .min(self.config.forgetting_config.max_age_decay_multiplier - 1.0)
387        } else {
388            1.0
389        };
390
391        // Apply access frequency factor (more access = slower decay)
392        let access_factor = if memory.access_count > 0 {
393            1.0 / (1.0 + (memory.access_count as f64).ln())
394        } else {
395            1.0
396        };
397
398        let new_decay_rate =
399            base_rate * tier_multiplier * importance_factor * age_factor * access_factor;
400
401        // Apply bounds
402        Ok(new_decay_rate
403            .max(self.config.forgetting_config.min_decay_rate)
404            .min(self.config.forgetting_config.max_decay_rate))
405    }
406
407    /// Calculate adaptive importance using reinforcement learning
408    fn calculate_adaptive_importance(&self, memory: &Memory) -> Result<f64> {
409        let learning_rate = self.config.forgetting_config.learning_rate;
410        let current_importance = memory.importance_score;
411
412        // Simple reinforcement learning based on access patterns
413        let access_frequency = memory.access_count as f64;
414        let recency_hours = memory
415            .last_accessed_at
416            .map(|last| (Utc::now() - last).num_seconds() as f64 / 3600.0)
417            .unwrap_or(f64::MAX);
418
419        // Reward frequent recent access, penalize infrequent old access
420        let access_reward = if recency_hours < 24.0 {
421            (access_frequency / (1.0 + recency_hours)).min(1.0)
422        } else {
423            0.0
424        };
425
426        // Apply reinforcement learning update
427        let importance_delta = learning_rate * (access_reward - 0.5); // Center around 0.5
428        let new_importance = (current_importance + importance_delta).max(0.0).min(1.0);
429
430        Ok(new_importance)
431    }
432
433    /// Create memory parameters for math engine calculations
434    fn create_memory_parameters(&self, memory: &Memory) -> Result<MemoryParameters> {
435        Ok(MemoryParameters {
436            consolidation_strength: memory.consolidation_strength,
437            decay_rate: memory.decay_rate,
438            last_accessed_at: memory.last_accessed_at,
439            created_at: memory.created_at,
440            access_count: memory.access_count,
441            importance_score: memory.importance_score,
442        })
443    }
444
445    /// Get memories that need forgetting processing
446    async fn get_memories_for_forgetting(&self, tier: MemoryTier) -> Result<Vec<Memory>> {
447        // Use repository method instead of direct SQL
448        self.repository
449            .get_memories_for_forgetting(tier, self.config.batch_size)
450            .await
451    }
452
453    /// Apply decay rate updates to the database
454    async fn apply_decay_updates(&self, updates: &[(Uuid, f64)]) -> Result<()> {
455        // Use repository method instead of direct SQL
456        self.repository.batch_update_decay_rates(updates).await?;
457        Ok(())
458    }
459
460    /// Apply importance score updates to the database
461    async fn apply_importance_updates(&self, updates: &[(Uuid, f64)]) -> Result<()> {
462        // Use repository method instead of direct SQL
463        self.repository
464            .batch_update_importance_scores(updates)
465            .await?;
466        Ok(())
467    }
468
469    /// Process hard deletion of completely forgotten memories
470    async fn process_hard_deletions(&self, candidates: &[Uuid]) -> Result<usize> {
471        if candidates.is_empty() {
472            return Ok(0);
473        }
474
475        // Use repository method instead of direct SQL
476        let deleted_count = self
477            .repository
478            .batch_soft_delete_memories(candidates)
479            .await?;
480
481        info!(
482            "Marked {} memories as deleted due to forgetting",
483            deleted_count
484        );
485        Ok(deleted_count)
486    }
487
488    /// Check if the job is currently running
489    pub fn is_running(&self) -> bool {
490        self.is_running.load(std::sync::atomic::Ordering::Relaxed)
491    }
492}
493
494/// Async function to run forgetting job in the background
495pub async fn spawn_forgetting_job(
496    config: ForgettingJobConfig,
497    repository: Arc<MemoryRepository>,
498) -> tokio::task::JoinHandle<Result<()>> {
499    tokio::spawn(async move {
500        let job = ForgettingJob::new(config, repository);
501        job.start().await
502    })
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the defaults that decide whether the job runs, how often, and
    /// whether it deletes anything.
    #[test]
    fn test_forgetting_job_config_defaults() {
        let config = ForgettingJobConfig::default();
        let forgetting = &config.forgetting_config;

        assert_eq!(config.batch_size, 1000);
        assert!(forgetting.enabled);
        assert_eq!(forgetting.cleanup_interval_seconds, 3600);
        assert!(forgetting.enable_reinforcement_learning);
        // Deletion stays off unless explicitly enabled (conservative default).
        assert!(!forgetting.enable_hard_deletion);
    }

    /// Pins the default tier multipliers and decay-rate bounds.
    ///
    /// Note: despite its name, this test only inspects configuration defaults.
    /// It does not call the decay-rate calculation itself.
    #[test]
    fn test_adaptive_decay_rate_calculation() {
        let forgetting = ForgettingJobConfig::default().forgetting_config;

        // Tier-specific multipliers.
        assert_eq!(forgetting.working_decay_multiplier, 0.5);
        assert_eq!(forgetting.warm_decay_multiplier, 1.0);
        assert_eq!(forgetting.cold_decay_multiplier, 1.5);

        // Bounds applied to every computed decay rate.
        assert_eq!(forgetting.min_decay_rate, 0.1);
        assert_eq!(forgetting.max_decay_rate, 5.0);
    }

    /// Pins the defaults that drive importance-score updates.
    #[test]
    fn test_reinforcement_learning_parameters() {
        let forgetting = ForgettingJobConfig::default().forgetting_config;

        assert!(forgetting.enable_reinforcement_learning);
        assert_eq!(forgetting.learning_rate, 0.1);
        assert_eq!(forgetting.importance_decay_factor, 0.5);
    }
}