1use super::error::{MemoryError, Result};
15use super::math_engine::{MathEngine, MemoryParameters};
16use super::models::*;
17use super::repository::MemoryRepository;
18use crate::config::ForgettingConfig;
19use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use tokio::time;
24use tracing::{debug, error, info, warn};
25use uuid::Uuid;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ForgettingJobConfig {
30 pub forgetting_config: ForgettingConfig,
32
33 pub batch_size: usize,
35
36 pub max_batches_per_run: usize,
38
39 pub math_engine_config: super::math_engine::MathEngineConfig,
41}
42
43impl Default for ForgettingJobConfig {
44 fn default() -> Self {
45 Self {
46 forgetting_config: ForgettingConfig::default(),
47 batch_size: 1000,
48 max_batches_per_run: 10,
49 math_engine_config: super::math_engine::MathEngineConfig::default(),
50 }
51 }
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ForgettingJobResult {
57 pub run_id: Uuid,
58 pub started_at: DateTime<Utc>,
59 pub completed_at: DateTime<Utc>,
60 pub total_memories_processed: usize,
61 pub decay_rates_updated: usize,
62 pub importance_scores_updated: usize,
63 pub memories_hard_deleted: usize,
64 pub batches_processed: usize,
65 pub errors_encountered: usize,
66 pub performance_metrics: ForgettingPerformanceMetrics,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct ForgettingPerformanceMetrics {
72 pub memories_per_second: f64,
73 pub total_processing_time_ms: u64,
74 pub decay_calculation_time_ms: u64,
75 pub database_update_time_ms: u64,
76 pub hard_deletion_time_ms: u64,
77 pub reinforcement_learning_time_ms: u64,
78}
79
80#[derive(Debug, Clone)]
82pub struct ForgettingBatchResult {
83 pub processed_count: usize,
84 pub decay_updates: Vec<(Uuid, f64)>, pub importance_updates: Vec<(Uuid, f64)>, pub hard_deletion_candidates: Vec<Uuid>,
87 pub processing_time_ms: u64,
88}
89
90pub struct ForgettingJob {
92 config: ForgettingJobConfig,
93 repository: Arc<MemoryRepository>,
94 math_engine: MathEngine,
95 is_running: std::sync::atomic::AtomicBool,
96}
97
98impl ForgettingJob {
99 pub fn new(config: ForgettingJobConfig, repository: Arc<MemoryRepository>) -> Self {
100 let math_engine = MathEngine::with_config(config.math_engine_config.clone());
101
102 Self {
103 config,
104 repository,
105 math_engine,
106 is_running: std::sync::atomic::AtomicBool::new(false),
107 }
108 }
109
110 pub async fn start(&self) -> Result<()> {
112 if self.is_running.load(std::sync::atomic::Ordering::Relaxed) {
113 return Err(MemoryError::InvalidRequest {
114 message: "Forgetting job is already running".to_string(),
115 });
116 }
117
118 if !self.config.forgetting_config.enabled {
119 info!("Forgetting job is disabled in configuration");
120 return Ok(());
121 }
122
123 self.is_running
124 .store(true, std::sync::atomic::Ordering::Relaxed);
125 info!("Starting forgetting background job");
126
127 let mut interval = time::interval(Duration::from_secs(
128 self.config.forgetting_config.cleanup_interval_seconds,
129 ));
130
131 while self.is_running.load(std::sync::atomic::Ordering::Relaxed) {
132 interval.tick().await;
133
134 match self.run_forgetting_cycle().await {
135 Ok(result) => {
136 info!(
137 "Forgetting cycle completed: {} memories processed, {} decay updates, {} importance updates, {} hard deletions, {:.1} mem/sec",
138 result.total_memories_processed,
139 result.decay_rates_updated,
140 result.importance_scores_updated,
141 result.memories_hard_deleted,
142 result.performance_metrics.memories_per_second
143 );
144 }
145 Err(e) => {
146 error!("Forgetting cycle failed: {}", e);
147 }
149 }
150 }
151
152 Ok(())
153 }
154
155 pub fn stop(&self) {
157 info!("Stopping forgetting background job");
158 self.is_running
159 .store(false, std::sync::atomic::Ordering::Relaxed);
160 }
161
162 pub async fn run_forgetting_cycle(&self) -> Result<ForgettingJobResult> {
164 let run_id = Uuid::new_v4();
165 let started_at = Utc::now();
166 let start_time = Instant::now();
167
168 debug!("Starting forgetting cycle {}", run_id);
169
170 let mut total_processed = 0;
171 let mut decay_updates = 0;
172 let mut importance_updates = 0;
173 let mut hard_deletions = 0;
174 let mut batches_processed = 0;
175 let mut errors_encountered = 0;
176 let mut decay_calc_time = 0u64;
177 let mut db_update_time = 0u64;
178 let mut hard_deletion_time = 0u64;
179 let mut rl_time = 0u64;
180
181 for tier in [
183 MemoryTier::Working,
184 MemoryTier::Warm,
185 MemoryTier::Cold,
186 MemoryTier::Frozen,
187 ] {
188 for _ in 0..self.config.max_batches_per_run {
189 let batch_start = Instant::now();
190
191 match self.process_forgetting_batch(tier).await {
192 Ok(batch_result) => {
193 if batch_result.processed_count == 0 {
194 break; }
196
197 if !batch_result.decay_updates.is_empty() {
199 let update_start = Instant::now();
200 match self.apply_decay_updates(&batch_result.decay_updates).await {
201 Ok(_) => {
202 db_update_time += update_start.elapsed().as_millis() as u64;
203 decay_updates += batch_result.decay_updates.len();
204 }
205 Err(e) => {
206 warn!("Failed to apply decay updates: {}", e);
207 errors_encountered += 1;
208 }
209 }
210 }
211
212 if !batch_result.importance_updates.is_empty() {
214 let rl_start = Instant::now();
215 match self
216 .apply_importance_updates(&batch_result.importance_updates)
217 .await
218 {
219 Ok(_) => {
220 rl_time += rl_start.elapsed().as_millis() as u64;
221 importance_updates += batch_result.importance_updates.len();
222 }
223 Err(e) => {
224 warn!("Failed to apply importance updates: {}", e);
225 errors_encountered += 1;
226 }
227 }
228 }
229
230 if self.config.forgetting_config.enable_hard_deletion
232 && !batch_result.hard_deletion_candidates.is_empty()
233 {
234 let deletion_start = Instant::now();
235 match self
236 .process_hard_deletions(&batch_result.hard_deletion_candidates)
237 .await
238 {
239 Ok(deleted_count) => {
240 hard_deletion_time +=
241 deletion_start.elapsed().as_millis() as u64;
242 hard_deletions += deleted_count;
243 }
244 Err(e) => {
245 warn!("Failed to process hard deletions: {}", e);
246 errors_encountered += 1;
247 }
248 }
249 }
250
251 total_processed += batch_result.processed_count;
252 batches_processed += 1;
253 decay_calc_time += batch_result.processing_time_ms;
254
255 debug!(
256 "Processed forgetting batch for {:?}: {} memories, {} decay updates, {} importance updates, {} deletion candidates",
257 tier,
258 batch_result.processed_count,
259 batch_result.decay_updates.len(),
260 batch_result.importance_updates.len(),
261 batch_result.hard_deletion_candidates.len()
262 );
263 }
264 Err(e) => {
265 warn!("Failed to process forgetting batch for {:?}: {}", tier, e);
266 errors_encountered += 1;
267 break;
268 }
269 }
270 }
271 }
272
273 let completed_at = Utc::now();
274 let total_time = start_time.elapsed();
275
276 let performance_metrics = ForgettingPerformanceMetrics {
277 memories_per_second: if total_time.as_secs_f64() > 0.0 {
278 total_processed as f64 / total_time.as_secs_f64()
279 } else {
280 0.0
281 },
282 total_processing_time_ms: total_time.as_millis() as u64,
283 decay_calculation_time_ms: decay_calc_time,
284 database_update_time_ms: db_update_time,
285 hard_deletion_time_ms: hard_deletion_time,
286 reinforcement_learning_time_ms: rl_time,
287 };
288
289 Ok(ForgettingJobResult {
290 run_id,
291 started_at,
292 completed_at,
293 total_memories_processed: total_processed,
294 decay_rates_updated: decay_updates,
295 importance_scores_updated: importance_updates,
296 memories_hard_deleted: hard_deletions,
297 batches_processed,
298 errors_encountered,
299 performance_metrics,
300 })
301 }
302
303 async fn process_forgetting_batch(&self, tier: MemoryTier) -> Result<ForgettingBatchResult> {
305 let start_time = Instant::now();
306
307 let memories = self.get_memories_for_forgetting(tier).await?;
309
310 if memories.is_empty() {
311 return Ok(ForgettingBatchResult {
312 processed_count: 0,
313 decay_updates: Vec::new(),
314 importance_updates: Vec::new(),
315 hard_deletion_candidates: Vec::new(),
316 processing_time_ms: start_time.elapsed().as_millis() as u64,
317 });
318 }
319
320 let mut decay_updates = Vec::new();
321 let mut importance_updates = Vec::new();
322 let mut hard_deletion_candidates = Vec::new();
323
324 for memory in &memories {
325 let new_decay_rate = self.calculate_adaptive_decay_rate(memory, tier)?;
327 if (new_decay_rate - memory.decay_rate).abs() > 0.01 {
328 decay_updates.push((memory.id, new_decay_rate));
329 }
330
331 if self.config.forgetting_config.enable_reinforcement_learning {
333 let new_importance = self.calculate_adaptive_importance(memory)?;
334 if (new_importance - memory.importance_score).abs() > 0.01 {
335 importance_updates.push((memory.id, new_importance));
336 }
337 }
338
339 if self.config.forgetting_config.enable_hard_deletion {
341 let params = self.create_memory_parameters(memory)?;
342 let recall_result = self.math_engine.calculate_recall_probability(¶ms)?;
343
344 if recall_result.recall_probability
345 < self.config.forgetting_config.hard_deletion_threshold
346 {
347 let age_days = (Utc::now() - memory.created_at).num_days();
349 if age_days >= self.config.forgetting_config.hard_deletion_retention_days as i64
350 {
351 hard_deletion_candidates.push(memory.id);
352 }
353 }
354 }
355 }
356
357 Ok(ForgettingBatchResult {
358 processed_count: memories.len(),
359 decay_updates,
360 importance_updates,
361 hard_deletion_candidates,
362 processing_time_ms: start_time.elapsed().as_millis() as u64,
363 })
364 }
365
366 fn calculate_adaptive_decay_rate(&self, memory: &Memory, tier: MemoryTier) -> Result<f64> {
368 let base_rate = self.config.forgetting_config.base_decay_rate;
369
370 let tier_multiplier = match tier {
372 MemoryTier::Working => self.config.forgetting_config.working_decay_multiplier,
373 MemoryTier::Warm => self.config.forgetting_config.warm_decay_multiplier,
374 MemoryTier::Cold => self.config.forgetting_config.cold_decay_multiplier,
375 MemoryTier::Frozen => self.config.forgetting_config.cold_decay_multiplier * 1.5,
376 };
377
378 let importance_factor =
380 1.0 - (memory.importance_score * self.config.forgetting_config.importance_decay_factor);
381
382 let age_days = (Utc::now() - memory.created_at).num_days() as f64;
384 let age_factor = if age_days > 0.0 {
385 1.0 + (age_days / 30.0)
386 .min(self.config.forgetting_config.max_age_decay_multiplier - 1.0)
387 } else {
388 1.0
389 };
390
391 let access_factor = if memory.access_count > 0 {
393 1.0 / (1.0 + (memory.access_count as f64).ln())
394 } else {
395 1.0
396 };
397
398 let new_decay_rate =
399 base_rate * tier_multiplier * importance_factor * age_factor * access_factor;
400
401 Ok(new_decay_rate
403 .max(self.config.forgetting_config.min_decay_rate)
404 .min(self.config.forgetting_config.max_decay_rate))
405 }
406
407 fn calculate_adaptive_importance(&self, memory: &Memory) -> Result<f64> {
409 let learning_rate = self.config.forgetting_config.learning_rate;
410 let current_importance = memory.importance_score;
411
412 let access_frequency = memory.access_count as f64;
414 let recency_hours = memory
415 .last_accessed_at
416 .map(|last| (Utc::now() - last).num_seconds() as f64 / 3600.0)
417 .unwrap_or(f64::MAX);
418
419 let access_reward = if recency_hours < 24.0 {
421 (access_frequency / (1.0 + recency_hours)).min(1.0)
422 } else {
423 0.0
424 };
425
426 let importance_delta = learning_rate * (access_reward - 0.5); let new_importance = (current_importance + importance_delta).max(0.0).min(1.0);
429
430 Ok(new_importance)
431 }
432
433 fn create_memory_parameters(&self, memory: &Memory) -> Result<MemoryParameters> {
435 Ok(MemoryParameters {
436 consolidation_strength: memory.consolidation_strength,
437 decay_rate: memory.decay_rate,
438 last_accessed_at: memory.last_accessed_at,
439 created_at: memory.created_at,
440 access_count: memory.access_count,
441 importance_score: memory.importance_score,
442 })
443 }
444
445 async fn get_memories_for_forgetting(&self, tier: MemoryTier) -> Result<Vec<Memory>> {
447 self.repository
449 .get_memories_for_forgetting(tier, self.config.batch_size)
450 .await
451 }
452
453 async fn apply_decay_updates(&self, updates: &[(Uuid, f64)]) -> Result<()> {
455 self.repository.batch_update_decay_rates(updates).await?;
457 Ok(())
458 }
459
460 async fn apply_importance_updates(&self, updates: &[(Uuid, f64)]) -> Result<()> {
462 self.repository
464 .batch_update_importance_scores(updates)
465 .await?;
466 Ok(())
467 }
468
469 async fn process_hard_deletions(&self, candidates: &[Uuid]) -> Result<usize> {
471 if candidates.is_empty() {
472 return Ok(0);
473 }
474
475 let deleted_count = self
477 .repository
478 .batch_soft_delete_memories(candidates)
479 .await?;
480
481 info!(
482 "Marked {} memories as deleted due to forgetting",
483 deleted_count
484 );
485 Ok(deleted_count)
486 }
487
488 pub fn is_running(&self) -> bool {
490 self.is_running.load(std::sync::atomic::Ordering::Relaxed)
491 }
492}
493
494pub async fn spawn_forgetting_job(
496 config: ForgettingJobConfig,
497 repository: Arc<MemoryRepository>,
498) -> tokio::task::JoinHandle<Result<()>> {
499 tokio::spawn(async move {
500 let job = ForgettingJob::new(config, repository);
501 job.start().await
502 })
503}
504
505#[cfg(test)]
506mod tests {
507 use super::*;
508
509 #[test]
510 fn test_forgetting_job_config_defaults() {
511 let config = ForgettingJobConfig::default();
512
513 assert!(config.forgetting_config.enabled);
514 assert_eq!(config.batch_size, 1000);
515 assert_eq!(config.forgetting_config.cleanup_interval_seconds, 3600);
516 assert!(config.forgetting_config.enable_reinforcement_learning);
517 assert!(!config.forgetting_config.enable_hard_deletion); }
519
520 #[test]
521 fn test_adaptive_decay_rate_calculation() {
522 let config = ForgettingJobConfig::default();
523
524 assert_eq!(config.forgetting_config.working_decay_multiplier, 0.5);
526 assert_eq!(config.forgetting_config.warm_decay_multiplier, 1.0);
527 assert_eq!(config.forgetting_config.cold_decay_multiplier, 1.5);
528
529 assert_eq!(config.forgetting_config.min_decay_rate, 0.1);
531 assert_eq!(config.forgetting_config.max_decay_rate, 5.0);
532 }
533
534 #[test]
535 fn test_reinforcement_learning_parameters() {
536 let config = ForgettingJobConfig::default();
537
538 assert!(config.forgetting_config.enable_reinforcement_learning);
539 assert_eq!(config.forgetting_config.learning_rate, 0.1);
540 assert_eq!(config.forgetting_config.importance_decay_factor, 0.5);
541 }
542}