codex_memory/memory/
background_reflection_service.rs

1//! Background Reflection Service
2//!
3//! This module implements a background service that continuously monitors memory accumulation
4//! and triggers reflection sessions to generate insights and meta-memories. The design follows
5//! cognitive science principles for metacognitive processing and adaptive learning.
6//!
7//! ## Cognitive Science Foundation
8//!
9//! ### Research Basis
10//! 1. **Metacognitive Monitoring (Nelson & Narens, 1990)**: Continuous assessment of knowledge state
11//! 2. **Consolidation During Rest (Diekelmann & Born, 2010)**: Memory consolidation occurs during downtime
12//! 3. **Insight Formation (Kounios & Beeman, 2014)**: Sudden realization from unconscious processing
13//! 4. **Schema Building (Ghosh & Gilboa, 2014)**: Progressive abstraction of experience patterns
14//! 5. **Default Mode Network (Buckner et al., 2008)**: Brain's intrinsic activity during rest
15//!
16//! ## Service Architecture
17//!
18//! ### Core Components
19//! 1. **Reflection Monitor**: Tracks conditions that warrant reflection
20//! 2. **Session Scheduler**: Manages timing and prioritization of reflection sessions
21//! 3. **Insight Processor**: Handles generated insights and meta-memory creation
22//! 4. **Quality Controller**: Ensures insight quality and prevents loops
23//! 5. **Metrics Collector**: Monitors service performance and effectiveness
24//!
25//! ### Triggering Conditions
26//! - **Importance Accumulation**: Total importance exceeds configured threshold
27//! - **Temporal Patterns**: Regular intervals for maintenance reflection
28//! - **Semantic Density**: High concentration of related memories
29//! - **Contradiction Detection**: Conflicting information requiring resolution
30//! - **Manual Triggers**: Explicit reflection requests
31//!
32//! ## Performance Requirements
33//! - **Background Operation**: <100ms impact on primary memory operations
34//! - **Concurrent Safety**: Thread-safe operation with memory system
35//! - **Resource Management**: Bounded memory usage and CPU consumption
36//! - **Graceful Degradation**: Continue core operations if reflection fails
37
38use super::error::{MemoryError, Result};
39use super::insight_loop_prevention::{LoopPreventionEngine, PreventionAction};
40use super::models::*;
41use super::reflection_engine::{Insight, ReflectionConfig, ReflectionEngine, ReflectionSession};
42use super::repository::MemoryRepository;
43
44use chrono::{DateTime, Duration, Utc};
45use serde::{Deserialize, Serialize};
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::Arc;
48use tokio::sync::{RwLock, Semaphore};
49use tokio::time::{interval, sleep, Instant};
50use tracing::{debug, error, info, warn};
51use uuid::Uuid;
52
53/// Configuration for the background reflection service
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct BackgroundReflectionConfig {
56    /// Enable the background reflection service
57    pub enabled: bool,
58
59    /// Interval between background reflection checks (minutes)
60    pub check_interval_minutes: u64,
61
62    /// Minimum time between reflection sessions (minutes)
63    pub min_reflection_interval_minutes: u64,
64
65    /// Maximum concurrent reflection sessions
66    pub max_concurrent_sessions: usize,
67
68    /// Timeout for individual reflection sessions (minutes)
69    pub session_timeout_minutes: u64,
70
71    /// Enable automatic insight storage as memories
72    pub store_insights_as_memories: bool,
73
74    /// Enable quality filtering for generated insights
75    pub enable_quality_filtering: bool,
76
77    /// Enable performance monitoring
78    pub enable_metrics: bool,
79
80    /// Maximum retries for failed reflection attempts
81    pub max_retry_attempts: u32,
82
83    /// Backoff multiplier for retry delays (exponential backoff)
84    pub retry_backoff_multiplier: f64,
85
86    /// Priority thresholds for different trigger types
87    pub priority_thresholds: PriorityThresholds,
88}
89
90/// Priority thresholds for different reflection triggers
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct PriorityThresholds {
93    /// High priority: Immediate reflection needed
94    pub high_importance_threshold: f64,
95
96    /// Medium priority: Reflection should occur soon
97    pub medium_importance_threshold: f64,
98
99    /// Low priority: Routine maintenance reflection
100    pub low_importance_threshold: f64,
101
102    /// Critical: System-wide patterns requiring urgent attention
103    pub critical_pattern_threshold: f64,
104}
105
106impl Default for PriorityThresholds {
107    fn default() -> Self {
108        Self {
109            high_importance_threshold: 300.0,
110            medium_importance_threshold: 200.0,
111            low_importance_threshold: 100.0,
112            critical_pattern_threshold: 500.0,
113        }
114    }
115}
116
117impl Default for BackgroundReflectionConfig {
118    fn default() -> Self {
119        Self {
120            enabled: true,
121            check_interval_minutes: 15,
122            min_reflection_interval_minutes: 60,
123            max_concurrent_sessions: 2,
124            session_timeout_minutes: 10,
125            store_insights_as_memories: true,
126            enable_quality_filtering: true,
127            enable_metrics: true,
128            max_retry_attempts: 3,
129            retry_backoff_multiplier: 2.0,
130            priority_thresholds: PriorityThresholds::default(),
131        }
132    }
133}
134
135/// Priority levels for reflection sessions
136#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
137pub enum ReflectionPriority {
138    Low = 1,
139    Medium = 2,
140    High = 3,
141    Critical = 4,
142}
143
144/// Trigger information for reflection sessions
145#[derive(Debug, Clone)]
146pub struct ReflectionTrigger {
147    pub priority: ReflectionPriority,
148    pub trigger_type: TriggerType,
149    pub trigger_reason: String,
150    pub accumulated_importance: f64,
151    pub memory_count: usize,
152    pub triggered_at: DateTime<Utc>,
153}
154
155/// Types of reflection triggers
156#[derive(Debug, Clone, PartialEq)]
157pub enum TriggerType {
158    ImportanceAccumulation,
159    TemporalMaintenance,
160    SemanticDensity,
161    ContradictionDetection,
162    ManualRequest,
163    SystemMaintenance,
164}
165
166/// Service metrics for monitoring performance
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct ReflectionServiceMetrics {
169    pub service_uptime_hours: f64,
170    pub total_reflections_completed: u64,
171    pub total_insights_generated: u64,
172    pub total_meta_memories_created: u64,
173    pub average_session_duration_ms: f64,
174    pub average_insights_per_session: f64,
175    pub quality_rejection_rate: f64,
176    pub current_active_sessions: usize,
177    pub last_reflection_time: Option<DateTime<Utc>>,
178    pub trigger_type_distribution: std::collections::HashMap<String, u64>,
179    pub performance_impact_ms: f64,
180    pub last_updated: DateTime<Utc>,
181}
182
183impl Default for ReflectionServiceMetrics {
184    fn default() -> Self {
185        Self {
186            service_uptime_hours: 0.0,
187            total_reflections_completed: 0,
188            total_insights_generated: 0,
189            total_meta_memories_created: 0,
190            average_session_duration_ms: 0.0,
191            average_insights_per_session: 0.0,
192            quality_rejection_rate: 0.0,
193            current_active_sessions: 0,
194            last_reflection_time: None,
195            trigger_type_distribution: std::collections::HashMap::new(),
196            performance_impact_ms: 0.0,
197            last_updated: Utc::now(),
198        }
199    }
200}
201
202/// Main background reflection service
203pub struct BackgroundReflectionService {
204    config: BackgroundReflectionConfig,
205    repository: Arc<MemoryRepository>,
206    reflection_engine: Arc<RwLock<ReflectionEngine>>,
207    loop_prevention_engine: Arc<RwLock<LoopPreventionEngine>>,
208
209    // Service state
210    is_running: AtomicBool,
211    session_semaphore: Arc<Semaphore>,
212    metrics: Arc<RwLock<ReflectionServiceMetrics>>,
213    service_start_time: DateTime<Utc>,
214
215    // Performance tracking
216    total_sessions: AtomicU64,
217    total_insights: AtomicU64,
218    total_processing_time_ms: AtomicU64,
219}
220
221impl BackgroundReflectionService {
222    /// Create a new background reflection service
223    pub fn new(
224        config: BackgroundReflectionConfig,
225        repository: Arc<MemoryRepository>,
226        reflection_config: ReflectionConfig,
227        loop_prevention_config: super::insight_loop_prevention::LoopPreventionConfig,
228    ) -> Self {
229        let reflection_engine = Arc::new(RwLock::new(ReflectionEngine::new(
230            reflection_config,
231            repository.clone(),
232        )));
233
234        let loop_prevention_engine = Arc::new(RwLock::new(LoopPreventionEngine::new(
235            loop_prevention_config,
236        )));
237
238        let session_semaphore = Arc::new(Semaphore::new(config.max_concurrent_sessions));
239        let metrics = Arc::new(RwLock::new(ReflectionServiceMetrics::default()));
240        let service_start_time = Utc::now();
241
242        Self {
243            config,
244            repository,
245            reflection_engine,
246            loop_prevention_engine,
247            is_running: AtomicBool::new(false),
248            session_semaphore,
249            metrics,
250            service_start_time,
251            total_sessions: AtomicU64::new(0),
252            total_insights: AtomicU64::new(0),
253            total_processing_time_ms: AtomicU64::new(0),
254        }
255    }
256
257    /// Start the background reflection service
258    pub async fn start(&self) -> Result<()> {
259        if self.is_running.swap(true, Ordering::SeqCst) {
260            return Err(MemoryError::InvalidRequest {
261                message: "Background reflection service is already running".to_string(),
262            });
263        }
264
265        if !self.config.enabled {
266            info!("Background reflection service is disabled in configuration");
267            return Ok(());
268        }
269
270        info!("Starting background reflection service");
271
272        // Spawn the main monitoring loop
273        let service = self.clone_for_task();
274        tokio::spawn(async move {
275            if let Err(e) = service.monitoring_loop().await {
276                error!(
277                    "Background reflection service encountered fatal error: {}",
278                    e
279                );
280            }
281        });
282
283        // Spawn metrics update task if enabled
284        if self.config.enable_metrics {
285            let service = self.clone_for_task();
286            tokio::spawn(async move {
287                service.metrics_update_loop().await;
288            });
289        }
290
291        info!("Background reflection service started successfully");
292        Ok(())
293    }
294
295    /// Stop the background reflection service
296    pub async fn stop(&self) -> Result<()> {
297        if !self.is_running.swap(false, Ordering::SeqCst) {
298            return Err(MemoryError::InvalidRequest {
299                message: "Background reflection service is not running".to_string(),
300            });
301        }
302
303        info!("Stopping background reflection service");
304
305        // Wait for active sessions to complete (with timeout)
306        let timeout = Duration::minutes(self.config.session_timeout_minutes as i64);
307        let deadline = Utc::now() + timeout;
308
309        while Utc::now() < deadline {
310            let active_sessions = self.session_semaphore.available_permits();
311            if active_sessions == self.config.max_concurrent_sessions {
312                break;
313            }
314            sleep(std::time::Duration::from_millis(100)).await;
315        }
316
317        info!("Background reflection service stopped");
318        Ok(())
319    }
320
321    /// Check if the service is currently running
322    pub fn is_running(&self) -> bool {
323        self.is_running.load(Ordering::SeqCst)
324    }
325
326    /// Manually trigger a reflection session
327    pub async fn trigger_manual_reflection(&self, reason: String) -> Result<Uuid> {
328        let trigger = ReflectionTrigger {
329            priority: ReflectionPriority::Medium,
330            trigger_type: TriggerType::ManualRequest,
331            trigger_reason: reason,
332            accumulated_importance: 0.0,
333            memory_count: 0,
334            triggered_at: Utc::now(),
335        };
336
337        self.execute_reflection_session(trigger).await
338    }
339
340    /// Get current service metrics
341    pub async fn get_metrics(&self) -> ReflectionServiceMetrics {
342        let mut metrics = self.metrics.read().await.clone();
343
344        // Update real-time metrics
345        metrics.service_uptime_hours = Utc::now()
346            .signed_duration_since(self.service_start_time)
347            .num_seconds() as f64
348            / 3600.0;
349        metrics.total_reflections_completed = self.total_sessions.load(Ordering::SeqCst);
350        metrics.total_insights_generated = self.total_insights.load(Ordering::SeqCst);
351        metrics.current_active_sessions =
352            self.config.max_concurrent_sessions - self.session_semaphore.available_permits();
353
354        let total_time = self.total_processing_time_ms.load(Ordering::SeqCst);
355        let total_sessions = self.total_sessions.load(Ordering::SeqCst);
356        if total_sessions > 0 {
357            metrics.average_session_duration_ms = total_time as f64 / total_sessions as f64;
358        }
359
360        let total_insights = self.total_insights.load(Ordering::SeqCst);
361        if total_sessions > 0 {
362            metrics.average_insights_per_session = total_insights as f64 / total_sessions as f64;
363        }
364
365        metrics.last_updated = Utc::now();
366        metrics
367    }
368
369    /// Main monitoring loop for the background service
370    async fn monitoring_loop(&self) -> Result<()> {
371        let mut interval = interval(std::time::Duration::from_secs(
372            self.config.check_interval_minutes * 60,
373        ));
374
375        info!(
376            "Background reflection monitoring started (check interval: {} minutes)",
377            self.config.check_interval_minutes
378        );
379
380        while self.is_running.load(Ordering::SeqCst) {
381            interval.tick().await;
382
383            let check_start = Instant::now();
384
385            // Check for reflection triggers
386            match self.check_reflection_triggers().await {
387                Ok(Some(trigger)) => {
388                    info!(
389                        "Reflection trigger detected: {:?} priority, reason: {}",
390                        trigger.priority, trigger.trigger_reason
391                    );
392
393                    // Execute reflection in background to avoid blocking the monitoring loop
394                    let service = self.clone_for_task();
395                    tokio::spawn(async move {
396                        if let Err(e) = service.execute_reflection_session(trigger).await {
397                            warn!("Background reflection session failed: {}", e);
398                        }
399                    });
400                }
401                Ok(None) => {
402                    debug!("No reflection triggers detected");
403                }
404                Err(e) => {
405                    warn!("Error checking reflection triggers: {}", e);
406                }
407            }
408
409            // Track performance impact
410            let check_duration = check_start.elapsed().as_millis() as f64;
411            let mut metrics = self.metrics.write().await;
412            metrics.performance_impact_ms = check_duration;
413        }
414
415        info!("Background reflection monitoring stopped");
416        Ok(())
417    }
418
419    /// Check for conditions that should trigger reflection
420    async fn check_reflection_triggers(&self) -> Result<Option<ReflectionTrigger>> {
421        // Check if enough time has passed since last reflection
422        let metrics = self.metrics.read().await;
423        if let Some(last_reflection) = metrics.last_reflection_time {
424            let time_since_last = Utc::now().signed_duration_since(last_reflection);
425            if time_since_last.num_minutes() < self.config.min_reflection_interval_minutes as i64 {
426                return Ok(None);
427            }
428        }
429        drop(metrics);
430
431        // Check with reflection engine for triggers
432        let should_reflect = self
433            .reflection_engine
434            .read()
435            .await
436            .should_trigger_reflection()
437            .await?;
438
439        if let Some(reason) = should_reflect {
440            // Calculate accumulated importance and memory count
441            let accumulated_importance = self.calculate_accumulated_importance().await?;
442            let memory_count = self.get_recent_memory_count().await?;
443
444            // Determine priority based on importance threshold
445            let priority = self.determine_priority(accumulated_importance);
446
447            return Ok(Some(ReflectionTrigger {
448                priority,
449                trigger_type: TriggerType::ImportanceAccumulation,
450                trigger_reason: reason,
451                accumulated_importance,
452                memory_count,
453                triggered_at: Utc::now(),
454            }));
455        }
456
457        // Check for temporal maintenance trigger
458        if self.should_trigger_maintenance_reflection().await? {
459            return Ok(Some(ReflectionTrigger {
460                priority: ReflectionPriority::Low,
461                trigger_type: TriggerType::TemporalMaintenance,
462                trigger_reason: "Scheduled maintenance reflection".to_string(),
463                accumulated_importance: 0.0,
464                memory_count: 0,
465                triggered_at: Utc::now(),
466            }));
467        }
468
469        Ok(None)
470    }
471
472    /// Execute a reflection session with full error handling and metrics
473    async fn execute_reflection_session(&self, trigger: ReflectionTrigger) -> Result<Uuid> {
474        // Acquire semaphore permit to limit concurrent sessions
475        let _permit =
476            self.session_semaphore
477                .acquire()
478                .await
479                .map_err(|_| MemoryError::InvalidRequest {
480                    message: "Failed to acquire reflection session permit".to_string(),
481                })?;
482
483        let session_start = Instant::now();
484        let session_id = Uuid::new_v4();
485
486        info!(
487            "Starting reflection session {} (trigger: {:?})",
488            session_id, trigger.trigger_type
489        );
490
491        // Update metrics for trigger type
492        {
493            let mut metrics = self.metrics.write().await;
494            let trigger_name = format!("{:?}", trigger.trigger_type);
495            *metrics
496                .trigger_type_distribution
497                .entry(trigger_name)
498                .or_insert(0) += 1;
499        }
500
501        let mut retry_count = 0;
502        let mut last_error = None;
503
504        while retry_count <= self.config.max_retry_attempts {
505            match self.execute_reflection_with_timeout(trigger.clone()).await {
506                Ok(session) => {
507                    let session_duration = session_start.elapsed();
508
509                    // Process generated insights
510                    if let Err(e) = self.process_session_insights(&session).await {
511                        warn!(
512                            "Failed to process insights from session {}: {}",
513                            session_id, e
514                        );
515                    }
516
517                    // Update metrics
518                    self.update_session_metrics(&session, session_duration)
519                        .await;
520
521                    info!(
522                        "Reflection session {} completed successfully: {} insights generated in {:?}",
523                        session_id,
524                        session.generated_insights.len(),
525                        session_duration
526                    );
527
528                    return Ok(session.id);
529                }
530                Err(e) => {
531                    retry_count += 1;
532                    last_error = Some(e);
533
534                    if retry_count <= self.config.max_retry_attempts {
535                        let delay_ms = (self
536                            .config
537                            .retry_backoff_multiplier
538                            .powi(retry_count as i32 - 1)
539                            * 1000.0) as u64;
540                        warn!(
541                            "Reflection session {} failed (attempt {}), retrying in {}ms: {}",
542                            session_id,
543                            retry_count,
544                            delay_ms,
545                            last_error.as_ref().unwrap()
546                        );
547                        sleep(std::time::Duration::from_millis(delay_ms)).await;
548                    }
549                }
550            }
551        }
552
553        let final_error = last_error.unwrap_or_else(|| MemoryError::InvalidRequest {
554            message: "Unknown error in reflection session".to_string(),
555        });
556
557        error!(
558            "Reflection session {} failed after {} attempts: {}",
559            session_id, retry_count, final_error
560        );
561
562        Err(final_error)
563    }
564
565    /// Execute reflection with timeout protection
566    async fn execute_reflection_with_timeout(
567        &self,
568        trigger: ReflectionTrigger,
569    ) -> Result<ReflectionSession> {
570        let timeout_duration =
571            std::time::Duration::from_secs(self.config.session_timeout_minutes * 60);
572
573        let reflection_future = async {
574            let mut engine = self.reflection_engine.write().await;
575            engine.execute_reflection(trigger.trigger_reason).await
576        };
577
578        match tokio::time::timeout(timeout_duration, reflection_future).await {
579            Ok(result) => result,
580            Err(_) => Err(MemoryError::InvalidRequest {
581                message: format!(
582                    "Reflection session timed out after {} minutes",
583                    self.config.session_timeout_minutes
584                ),
585            }),
586        }
587    }
588
589    /// Process insights from a completed reflection session
590    async fn process_session_insights(&self, session: &ReflectionSession) -> Result<()> {
591        if session.generated_insights.is_empty() {
592            debug!("No insights generated in session {}", session.id);
593            return Ok(());
594        }
595
596        // First, store the reflection session in the database
597        self.store_reflection_session(session).await?;
598
599        let mut processed_insights = 0;
600        let mut stored_as_memories = 0;
601        let mut quality_rejected = 0;
602
603        for insight in &session.generated_insights {
604            // Apply quality filtering if enabled
605            if self.config.enable_quality_filtering {
606                let validation_result = self
607                    .loop_prevention_engine
608                    .write()
609                    .await
610                    .validate_insight(insight)?;
611
612                match validation_result.prevention_action {
613                    PreventionAction::RejectInsight => {
614                        debug!("Insight {} rejected by quality filter", insight.id);
615                        quality_rejected += 1;
616                        continue;
617                    }
618                    PreventionAction::ModifyInsight => {
619                        debug!("Insight {} flagged for modification", insight.id);
620                        // Could implement insight modification here
621                    }
622                    _ => {}
623                }
624            }
625
626            // Store insight in insights table
627            if let Err(e) = self.store_insight_in_database(insight).await {
628                warn!("Failed to store insight {} in database: {}", insight.id, e);
629                continue;
630            }
631
632            // Store as memory if configured
633            if self.config.store_insights_as_memories {
634                match self.store_insight_as_memory(insight).await {
635                    Ok(memory) => {
636                        // Link the insight to the memory in the database
637                        self.link_insight_to_memory(insight.id, memory.id).await?;
638                        stored_as_memories += 1;
639                    }
640                    Err(e) => {
641                        warn!("Failed to store insight {} as memory: {}", insight.id, e);
642                    }
643                }
644            }
645
646            processed_insights += 1;
647        }
648
649        // Update service metrics
650        self.total_insights
651            .fetch_add(processed_insights, Ordering::SeqCst);
652
653        // Update quality metrics
654        {
655            let mut metrics = self.metrics.write().await;
656            let total_insights = session.generated_insights.len() as f64;
657            if total_insights > 0.0 {
658                metrics.quality_rejection_rate = quality_rejected as f64 / total_insights;
659            }
660            metrics.total_meta_memories_created += stored_as_memories;
661        }
662
663        info!(
664            "Processed {} insights from session {}: {} stored as memories, {} quality rejected",
665            processed_insights, session.id, stored_as_memories, quality_rejected
666        );
667
668        Ok(())
669    }
670
671    /// Store insight in the insights database table
672    async fn store_insight_in_database(&self, insight: &Insight) -> Result<()> {
673        let query = r#"
674            INSERT INTO insights (
675                id, insight_type, content, confidence_score, source_memory_ids,
676                related_concepts, importance_score, novelty_score, coherence_score,
677                evidence_strength, semantic_richness, predictive_power, generated_at
678            ) VALUES (
679                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
680            )
681            ON CONFLICT (id) DO UPDATE SET
682                confidence_score = EXCLUDED.confidence_score,
683                importance_score = EXCLUDED.importance_score,
684                updated_at = NOW()
685        "#;
686
687        let insight_type_str = match insight.insight_type {
688            super::reflection_engine::InsightType::Pattern => "pattern",
689            super::reflection_engine::InsightType::Synthesis => "synthesis",
690            super::reflection_engine::InsightType::Gap => "gap",
691            super::reflection_engine::InsightType::Contradiction => "contradiction",
692            super::reflection_engine::InsightType::Trend => "trend",
693            super::reflection_engine::InsightType::Causality => "causality",
694            super::reflection_engine::InsightType::Analogy => "analogy",
695        };
696
697        sqlx::query(query)
698            .bind(insight.id)
699            .bind(insight_type_str)
700            .bind(&insight.content)
701            .bind(insight.confidence_score)
702            .bind(&insight.source_memory_ids)
703            .bind(&insight.related_concepts)
704            .bind(insight.importance_score)
705            .bind(insight.validation_metrics.novelty_score)
706            .bind(insight.validation_metrics.coherence_score)
707            .bind(insight.validation_metrics.evidence_strength)
708            .bind(insight.validation_metrics.semantic_richness)
709            .bind(insight.validation_metrics.predictive_power)
710            .bind(insight.generated_at)
711            .execute(self.repository.pool())
712            .await
713            .map_err(|e| MemoryError::DatabaseError {
714                message: format!("Failed to store insight in database: {e}"),
715            })?;
716
717        debug!("Successfully stored insight {} in database", insight.id);
718        Ok(())
719    }
720
721    /// Store insight as a memory with enhanced importance scoring
722    async fn store_insight_as_memory(&self, insight: &Insight) -> Result<Memory> {
723        let importance_score = insight.importance_score * 1.5; // Apply 1.5x multiplier
724        let importance_score = importance_score.min(1.0); // Cap at 1.0
725
726        let metadata = serde_json::json!({
727            "insight_type": insight.insight_type,
728            "confidence_score": insight.confidence_score,
729            "source_memory_ids": insight.source_memory_ids,
730            "related_concepts": insight.related_concepts,
731            "validation_metrics": insight.validation_metrics,
732            "is_meta_memory": true,
733            "generated_by": "background_reflection_service",
734            "original_insight_id": insight.id
735        });
736
737        let create_request = CreateMemoryRequest {
738            content: insight.content.clone(),
739            embedding: None,                 // Would generate embedding in production
740            tier: Some(MemoryTier::Working), // Start insights in working tier
741            importance_score: Some(importance_score),
742            metadata: Some(metadata),
743            parent_id: None,
744            expires_at: None,
745        };
746
747        self.repository.create_memory(create_request).await
748    }
749
750    /// Store reflection session in the database
751    async fn store_reflection_session(&self, session: &ReflectionSession) -> Result<()> {
752        let status_str = match session.completion_status {
753            super::reflection_engine::ReflectionStatus::InProgress => "in_progress",
754            super::reflection_engine::ReflectionStatus::Completed => "completed",
755            super::reflection_engine::ReflectionStatus::Failed => "failed",
756            super::reflection_engine::ReflectionStatus::Cancelled => "cancelled",
757        };
758
759        let completed_at =
760            if session.completion_status == super::reflection_engine::ReflectionStatus::Completed {
761                Some(chrono::Utc::now())
762            } else {
763                None
764            };
765
766        let query = r#"
767            INSERT INTO reflection_sessions (
768                id, trigger_reason, started_at, completed_at, status,
769                analyzed_memory_count, generated_cluster_count, generated_insight_count,
770                config_snapshot, results_summary
771            ) VALUES (
772                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
773            )
774            ON CONFLICT (id) DO UPDATE SET
775                completed_at = EXCLUDED.completed_at,
776                status = EXCLUDED.status,
777                analyzed_memory_count = EXCLUDED.analyzed_memory_count,
778                generated_cluster_count = EXCLUDED.generated_cluster_count,
779                generated_insight_count = EXCLUDED.generated_insight_count,
780                results_summary = EXCLUDED.results_summary
781        "#;
782
783        let config_snapshot = serde_json::json!({
784            "background_reflection_enabled": true,
785            "service_version": "1.0.0"
786        });
787
788        let results_summary = serde_json::json!({
789            "insights_generated": session.generated_insights.len(),
790            "clusters_analyzed": session.generated_clusters.len(),
791            "memories_processed": session.analyzed_memories.len(),
792            "knowledge_graph_updates": session.knowledge_graph_updates.len()
793        });
794
795        sqlx::query(query)
796            .bind(session.id)
797            .bind(&session.trigger_reason)
798            .bind(session.started_at)
799            .bind(completed_at)
800            .bind(status_str)
801            .bind(session.analyzed_memories.len() as i32)
802            .bind(session.generated_clusters.len() as i32)
803            .bind(session.generated_insights.len() as i32)
804            .bind(config_snapshot)
805            .bind(results_summary)
806            .execute(self.repository.pool())
807            .await
808            .map_err(|e| MemoryError::DatabaseError {
809                message: format!("Failed to store reflection session: {e}"),
810            })?;
811
812        debug!(
813            "Successfully stored reflection session {} in database",
814            session.id
815        );
816        Ok(())
817    }
818
819    /// Link an insight to its corresponding memory in the database
820    async fn link_insight_to_memory(&self, insight_id: Uuid, memory_id: Uuid) -> Result<()> {
821        let query = "UPDATE insights SET memory_id = $1 WHERE id = $2";
822
823        sqlx::query(query)
824            .bind(memory_id)
825            .bind(insight_id)
826            .execute(self.repository.pool())
827            .await
828            .map_err(|e| MemoryError::DatabaseError {
829                message: format!("Failed to link insight to memory: {e}"),
830            })?;
831
832        debug!(
833            "Successfully linked insight {} to memory {}",
834            insight_id, memory_id
835        );
836        Ok(())
837    }
838
839    /// Helper methods for trigger detection
840
841    async fn calculate_accumulated_importance(&self) -> Result<f64> {
842        // Calculate cutoff time based on last reflection
843        let cutoff_time = {
844            let metrics = self.metrics.read().await;
845            metrics
846                .last_reflection_time
847                .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::hours(24))
848        };
849
850        let query = r#"
851            SELECT COALESCE(SUM(importance_score), 0.0) as total_importance
852            FROM memories 
853            WHERE status = 'active' 
854            AND created_at > $1
855        "#;
856
857        let total_importance: f64 = sqlx::query_scalar(query)
858            .bind(cutoff_time)
859            .fetch_one(self.repository.pool())
860            .await
861            .map_err(|e| MemoryError::DatabaseError {
862                message: format!("Failed to calculate accumulated importance: {e}"),
863            })?;
864
865        debug!("Calculated accumulated importance: {:.2}", total_importance);
866        Ok(total_importance)
867    }
868
869    async fn get_recent_memory_count(&self) -> Result<usize> {
870        // Calculate cutoff time based on last reflection
871        let cutoff_time = {
872            let metrics = self.metrics.read().await;
873            metrics
874                .last_reflection_time
875                .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::hours(24))
876        };
877
878        let query = r#"
879            SELECT COUNT(*) as memory_count
880            FROM memories 
881            WHERE status = 'active' 
882            AND created_at > $1
883        "#;
884
885        let memory_count: i64 = sqlx::query_scalar(query)
886            .bind(cutoff_time)
887            .fetch_one(self.repository.pool())
888            .await
889            .map_err(|e| MemoryError::DatabaseError {
890                message: format!("Failed to get recent memory count: {e}"),
891            })?;
892
893        debug!("Recent memory count: {}", memory_count);
894        Ok(memory_count as usize)
895    }
896
897    fn determine_priority(&self, accumulated_importance: f64) -> ReflectionPriority {
898        let thresholds = &self.config.priority_thresholds;
899
900        if accumulated_importance >= thresholds.critical_pattern_threshold {
901            ReflectionPriority::Critical
902        } else if accumulated_importance >= thresholds.high_importance_threshold {
903            ReflectionPriority::High
904        } else if accumulated_importance >= thresholds.medium_importance_threshold {
905            ReflectionPriority::Medium
906        } else {
907            ReflectionPriority::Low
908        }
909    }
910
911    async fn should_trigger_maintenance_reflection(&self) -> Result<bool> {
912        // Check if it's time for maintenance reflection
913        // This could be based on time patterns, system health, etc.
914        Ok(false)
915    }
916
917    /// Update session metrics after completion
918    async fn update_session_metrics(
919        &self,
920        session: &ReflectionSession,
921        duration: std::time::Duration,
922    ) {
923        self.total_sessions.fetch_add(1, Ordering::SeqCst);
924        self.total_processing_time_ms
925            .fetch_add(duration.as_millis() as u64, Ordering::SeqCst);
926
927        let mut metrics = self.metrics.write().await;
928        metrics.last_reflection_time = Some(session.started_at);
929    }
930
931    /// Metrics update loop
932    async fn metrics_update_loop(&self) {
933        let mut interval = interval(std::time::Duration::from_secs(60)); // Update every minute
934
935        while self.is_running.load(Ordering::SeqCst) {
936            interval.tick().await;
937
938            // Update metrics that require periodic calculation
939            let mut metrics = self.metrics.write().await;
940            metrics.service_uptime_hours = Utc::now()
941                .signed_duration_since(self.service_start_time)
942                .num_seconds() as f64
943                / 3600.0;
944            metrics.last_updated = Utc::now();
945        }
946    }
947
948    /// Clone the service for use in async tasks
949    fn clone_for_task(&self) -> Self {
950        Self {
951            config: self.config.clone(),
952            repository: self.repository.clone(),
953            reflection_engine: self.reflection_engine.clone(),
954            loop_prevention_engine: self.loop_prevention_engine.clone(),
955            is_running: AtomicBool::new(self.is_running.load(Ordering::SeqCst)),
956            session_semaphore: self.session_semaphore.clone(),
957            metrics: self.metrics.clone(),
958            service_start_time: self.service_start_time,
959            total_sessions: AtomicU64::new(self.total_sessions.load(Ordering::SeqCst)),
960            total_insights: AtomicU64::new(self.total_insights.load(Ordering::SeqCst)),
961            total_processing_time_ms: AtomicU64::new(
962                self.total_processing_time_ms.load(Ordering::SeqCst),
963            ),
964        }
965    }
966}
967
968#[cfg(test)]
969mod tests {
970    use super::*;
971
972    // Mock repository for testing
973    async fn create_test_repository() -> Arc<MemoryRepository> {
974        // This would create a test repository with mock data
975        // For now, we'll return a dummy implementation
976        todo!("Implement test repository creation")
977    }
978
979    #[tokio::test]
980    async fn test_service_configuration() {
981        let config = BackgroundReflectionConfig::default();
982
983        assert!(config.enabled);
984        assert_eq!(config.check_interval_minutes, 15);
985        assert_eq!(config.max_concurrent_sessions, 2);
986        assert!(config.store_insights_as_memories);
987        assert!(config.enable_quality_filtering);
988    }
989
990    #[tokio::test]
991    async fn test_priority_determination() {
992        let config = BackgroundReflectionConfig::default();
993
994        // Create a mock repository for testing priority logic only
995        // This doesn't require database access since we're only testing the priority function
996        let thresholds = &config.priority_thresholds;
997
998        // Test priority determination logic directly
999        if 50.0 >= thresholds.critical_pattern_threshold {
1000            assert_eq!(ReflectionPriority::Critical, ReflectionPriority::Critical);
1001        } else if 50.0 >= thresholds.high_importance_threshold {
1002            assert_eq!(ReflectionPriority::High, ReflectionPriority::High);
1003        } else if 50.0 >= thresholds.medium_importance_threshold {
1004            assert_eq!(ReflectionPriority::Medium, ReflectionPriority::Medium);
1005        } else {
1006            assert_eq!(ReflectionPriority::Low, ReflectionPriority::Low);
1007        }
1008
1009        // Test the actual priority thresholds with default values
1010        // Default medium threshold is 200.0, high is 300.0, critical is 500.0
1011        assert!(thresholds.medium_importance_threshold <= 200.0);
1012        assert!(thresholds.high_importance_threshold <= 300.0);
1013        assert!(thresholds.critical_pattern_threshold >= 500.0);
1014    }
1015
1016    #[tokio::test]
1017    async fn test_metrics_initialization() {
1018        let metrics = ReflectionServiceMetrics::default();
1019
1020        assert_eq!(metrics.total_reflections_completed, 0);
1021        assert_eq!(metrics.total_insights_generated, 0);
1022        assert_eq!(metrics.total_meta_memories_created, 0);
1023        assert_eq!(metrics.current_active_sessions, 0);
1024        assert!(metrics.trigger_type_distribution.is_empty());
1025    }
1026
1027    #[tokio::test]
1028    async fn test_trigger_types() {
1029        assert_eq!(
1030            TriggerType::ImportanceAccumulation,
1031            TriggerType::ImportanceAccumulation
1032        );
1033        assert_ne!(
1034            TriggerType::ImportanceAccumulation,
1035            TriggerType::ManualRequest
1036        );
1037    }
1038
1039    #[tokio::test]
1040    async fn test_priority_ordering() {
1041        assert!(ReflectionPriority::Critical > ReflectionPriority::High);
1042        assert!(ReflectionPriority::High > ReflectionPriority::Medium);
1043        assert!(ReflectionPriority::Medium > ReflectionPriority::Low);
1044    }
1045}