1use super::error::{MemoryError, Result};
39use super::insight_loop_prevention::{LoopPreventionEngine, PreventionAction};
40use super::models::*;
41use super::reflection_engine::{Insight, ReflectionConfig, ReflectionEngine, ReflectionSession};
42use super::repository::MemoryRepository;
43
44use chrono::{DateTime, Duration, Utc};
45use serde::{Deserialize, Serialize};
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::Arc;
48use tokio::sync::{RwLock, Semaphore};
49use tokio::time::{interval, sleep, Instant};
50use tracing::{debug, error, info, warn};
51use uuid::Uuid;
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct BackgroundReflectionConfig {
56 pub enabled: bool,
58
59 pub check_interval_minutes: u64,
61
62 pub min_reflection_interval_minutes: u64,
64
65 pub max_concurrent_sessions: usize,
67
68 pub session_timeout_minutes: u64,
70
71 pub store_insights_as_memories: bool,
73
74 pub enable_quality_filtering: bool,
76
77 pub enable_metrics: bool,
79
80 pub max_retry_attempts: u32,
82
83 pub retry_backoff_multiplier: f64,
85
86 pub priority_thresholds: PriorityThresholds,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct PriorityThresholds {
93 pub high_importance_threshold: f64,
95
96 pub medium_importance_threshold: f64,
98
99 pub low_importance_threshold: f64,
101
102 pub critical_pattern_threshold: f64,
104}
105
106impl Default for PriorityThresholds {
107 fn default() -> Self {
108 Self {
109 high_importance_threshold: 300.0,
110 medium_importance_threshold: 200.0,
111 low_importance_threshold: 100.0,
112 critical_pattern_threshold: 500.0,
113 }
114 }
115}
116
117impl Default for BackgroundReflectionConfig {
118 fn default() -> Self {
119 Self {
120 enabled: true,
121 check_interval_minutes: 15,
122 min_reflection_interval_minutes: 60,
123 max_concurrent_sessions: 2,
124 session_timeout_minutes: 10,
125 store_insights_as_memories: true,
126 enable_quality_filtering: true,
127 enable_metrics: true,
128 max_retry_attempts: 3,
129 retry_backoff_multiplier: 2.0,
130 priority_thresholds: PriorityThresholds::default(),
131 }
132 }
133}
134
/// Urgency of a reflection trigger.
///
/// Variants compare by urgency: `Low < Medium < High < Critical`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ReflectionPriority {
    /// Least urgent.
    Low = 1,
    /// Default urgency for manually requested reflections.
    Medium = 2,
    /// Elevated urgency.
    High = 3,
    /// Most urgent.
    Critical = 4,
}
143
144#[derive(Debug, Clone)]
146pub struct ReflectionTrigger {
147 pub priority: ReflectionPriority,
148 pub trigger_type: TriggerType,
149 pub trigger_reason: String,
150 pub accumulated_importance: f64,
151 pub memory_count: usize,
152 pub triggered_at: DateTime<Utc>,
153}
154
/// Category of condition that can start a reflection session.
///
/// The `Debug` name of each variant is used as the key of the per-trigger
/// counters in the service metrics.
#[derive(Clone, Debug, PartialEq)]
pub enum TriggerType {
    /// The reflection engine reported that a reflection should run.
    ImportanceAccumulation,
    /// Scheduled maintenance reflection.
    TemporalMaintenance,
    /// Not produced by the code visible in this file.
    SemanticDensity,
    /// Not produced by the code visible in this file.
    ContradictionDetection,
    /// Explicitly requested through `trigger_manual_reflection`.
    ManualRequest,
    /// Not produced by the code visible in this file.
    SystemMaintenance,
}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct ReflectionServiceMetrics {
169 pub service_uptime_hours: f64,
170 pub total_reflections_completed: u64,
171 pub total_insights_generated: u64,
172 pub total_meta_memories_created: u64,
173 pub average_session_duration_ms: f64,
174 pub average_insights_per_session: f64,
175 pub quality_rejection_rate: f64,
176 pub current_active_sessions: usize,
177 pub last_reflection_time: Option<DateTime<Utc>>,
178 pub trigger_type_distribution: std::collections::HashMap<String, u64>,
179 pub performance_impact_ms: f64,
180 pub last_updated: DateTime<Utc>,
181}
182
183impl Default for ReflectionServiceMetrics {
184 fn default() -> Self {
185 Self {
186 service_uptime_hours: 0.0,
187 total_reflections_completed: 0,
188 total_insights_generated: 0,
189 total_meta_memories_created: 0,
190 average_session_duration_ms: 0.0,
191 average_insights_per_session: 0.0,
192 quality_rejection_rate: 0.0,
193 current_active_sessions: 0,
194 last_reflection_time: None,
195 trigger_type_distribution: std::collections::HashMap::new(),
196 performance_impact_ms: 0.0,
197 last_updated: Utc::now(),
198 }
199 }
200}
201
202pub struct BackgroundReflectionService {
204 config: BackgroundReflectionConfig,
205 repository: Arc<MemoryRepository>,
206 reflection_engine: Arc<RwLock<ReflectionEngine>>,
207 loop_prevention_engine: Arc<RwLock<LoopPreventionEngine>>,
208
209 is_running: AtomicBool,
211 session_semaphore: Arc<Semaphore>,
212 metrics: Arc<RwLock<ReflectionServiceMetrics>>,
213 service_start_time: DateTime<Utc>,
214
215 total_sessions: AtomicU64,
217 total_insights: AtomicU64,
218 total_processing_time_ms: AtomicU64,
219}
220
221impl BackgroundReflectionService {
222 pub fn new(
224 config: BackgroundReflectionConfig,
225 repository: Arc<MemoryRepository>,
226 reflection_config: ReflectionConfig,
227 loop_prevention_config: super::insight_loop_prevention::LoopPreventionConfig,
228 ) -> Self {
229 let reflection_engine = Arc::new(RwLock::new(ReflectionEngine::new(
230 reflection_config,
231 repository.clone(),
232 )));
233
234 let loop_prevention_engine = Arc::new(RwLock::new(LoopPreventionEngine::new(
235 loop_prevention_config,
236 )));
237
238 let session_semaphore = Arc::new(Semaphore::new(config.max_concurrent_sessions));
239 let metrics = Arc::new(RwLock::new(ReflectionServiceMetrics::default()));
240 let service_start_time = Utc::now();
241
242 Self {
243 config,
244 repository,
245 reflection_engine,
246 loop_prevention_engine,
247 is_running: AtomicBool::new(false),
248 session_semaphore,
249 metrics,
250 service_start_time,
251 total_sessions: AtomicU64::new(0),
252 total_insights: AtomicU64::new(0),
253 total_processing_time_ms: AtomicU64::new(0),
254 }
255 }
256
257 pub async fn start(&self) -> Result<()> {
259 if self.is_running.swap(true, Ordering::SeqCst) {
260 return Err(MemoryError::InvalidRequest {
261 message: "Background reflection service is already running".to_string(),
262 });
263 }
264
265 if !self.config.enabled {
266 info!("Background reflection service is disabled in configuration");
267 return Ok(());
268 }
269
270 info!("Starting background reflection service");
271
272 let service = self.clone_for_task();
274 tokio::spawn(async move {
275 if let Err(e) = service.monitoring_loop().await {
276 error!(
277 "Background reflection service encountered fatal error: {}",
278 e
279 );
280 }
281 });
282
283 if self.config.enable_metrics {
285 let service = self.clone_for_task();
286 tokio::spawn(async move {
287 service.metrics_update_loop().await;
288 });
289 }
290
291 info!("Background reflection service started successfully");
292 Ok(())
293 }
294
295 pub async fn stop(&self) -> Result<()> {
297 if !self.is_running.swap(false, Ordering::SeqCst) {
298 return Err(MemoryError::InvalidRequest {
299 message: "Background reflection service is not running".to_string(),
300 });
301 }
302
303 info!("Stopping background reflection service");
304
305 let timeout = Duration::minutes(self.config.session_timeout_minutes as i64);
307 let deadline = Utc::now() + timeout;
308
309 while Utc::now() < deadline {
310 let active_sessions = self.session_semaphore.available_permits();
311 if active_sessions == self.config.max_concurrent_sessions {
312 break;
313 }
314 sleep(std::time::Duration::from_millis(100)).await;
315 }
316
317 info!("Background reflection service stopped");
318 Ok(())
319 }
320
321 pub fn is_running(&self) -> bool {
323 self.is_running.load(Ordering::SeqCst)
324 }
325
326 pub async fn trigger_manual_reflection(&self, reason: String) -> Result<Uuid> {
328 let trigger = ReflectionTrigger {
329 priority: ReflectionPriority::Medium,
330 trigger_type: TriggerType::ManualRequest,
331 trigger_reason: reason,
332 accumulated_importance: 0.0,
333 memory_count: 0,
334 triggered_at: Utc::now(),
335 };
336
337 self.execute_reflection_session(trigger).await
338 }
339
340 pub async fn get_metrics(&self) -> ReflectionServiceMetrics {
342 let mut metrics = self.metrics.read().await.clone();
343
344 metrics.service_uptime_hours = Utc::now()
346 .signed_duration_since(self.service_start_time)
347 .num_seconds() as f64
348 / 3600.0;
349 metrics.total_reflections_completed = self.total_sessions.load(Ordering::SeqCst);
350 metrics.total_insights_generated = self.total_insights.load(Ordering::SeqCst);
351 metrics.current_active_sessions =
352 self.config.max_concurrent_sessions - self.session_semaphore.available_permits();
353
354 let total_time = self.total_processing_time_ms.load(Ordering::SeqCst);
355 let total_sessions = self.total_sessions.load(Ordering::SeqCst);
356 if total_sessions > 0 {
357 metrics.average_session_duration_ms = total_time as f64 / total_sessions as f64;
358 }
359
360 let total_insights = self.total_insights.load(Ordering::SeqCst);
361 if total_sessions > 0 {
362 metrics.average_insights_per_session = total_insights as f64 / total_sessions as f64;
363 }
364
365 metrics.last_updated = Utc::now();
366 metrics
367 }
368
369 async fn monitoring_loop(&self) -> Result<()> {
371 let mut interval = interval(std::time::Duration::from_secs(
372 self.config.check_interval_minutes * 60,
373 ));
374
375 info!(
376 "Background reflection monitoring started (check interval: {} minutes)",
377 self.config.check_interval_minutes
378 );
379
380 while self.is_running.load(Ordering::SeqCst) {
381 interval.tick().await;
382
383 let check_start = Instant::now();
384
385 match self.check_reflection_triggers().await {
387 Ok(Some(trigger)) => {
388 info!(
389 "Reflection trigger detected: {:?} priority, reason: {}",
390 trigger.priority, trigger.trigger_reason
391 );
392
393 let service = self.clone_for_task();
395 tokio::spawn(async move {
396 if let Err(e) = service.execute_reflection_session(trigger).await {
397 warn!("Background reflection session failed: {}", e);
398 }
399 });
400 }
401 Ok(None) => {
402 debug!("No reflection triggers detected");
403 }
404 Err(e) => {
405 warn!("Error checking reflection triggers: {}", e);
406 }
407 }
408
409 let check_duration = check_start.elapsed().as_millis() as f64;
411 let mut metrics = self.metrics.write().await;
412 metrics.performance_impact_ms = check_duration;
413 }
414
415 info!("Background reflection monitoring stopped");
416 Ok(())
417 }
418
419 async fn check_reflection_triggers(&self) -> Result<Option<ReflectionTrigger>> {
421 let metrics = self.metrics.read().await;
423 if let Some(last_reflection) = metrics.last_reflection_time {
424 let time_since_last = Utc::now().signed_duration_since(last_reflection);
425 if time_since_last.num_minutes() < self.config.min_reflection_interval_minutes as i64 {
426 return Ok(None);
427 }
428 }
429 drop(metrics);
430
431 let should_reflect = self
433 .reflection_engine
434 .read()
435 .await
436 .should_trigger_reflection()
437 .await?;
438
439 if let Some(reason) = should_reflect {
440 let accumulated_importance = self.calculate_accumulated_importance().await?;
442 let memory_count = self.get_recent_memory_count().await?;
443
444 let priority = self.determine_priority(accumulated_importance);
446
447 return Ok(Some(ReflectionTrigger {
448 priority,
449 trigger_type: TriggerType::ImportanceAccumulation,
450 trigger_reason: reason,
451 accumulated_importance,
452 memory_count,
453 triggered_at: Utc::now(),
454 }));
455 }
456
457 if self.should_trigger_maintenance_reflection().await? {
459 return Ok(Some(ReflectionTrigger {
460 priority: ReflectionPriority::Low,
461 trigger_type: TriggerType::TemporalMaintenance,
462 trigger_reason: "Scheduled maintenance reflection".to_string(),
463 accumulated_importance: 0.0,
464 memory_count: 0,
465 triggered_at: Utc::now(),
466 }));
467 }
468
469 Ok(None)
470 }
471
472 async fn execute_reflection_session(&self, trigger: ReflectionTrigger) -> Result<Uuid> {
474 let _permit =
476 self.session_semaphore
477 .acquire()
478 .await
479 .map_err(|_| MemoryError::InvalidRequest {
480 message: "Failed to acquire reflection session permit".to_string(),
481 })?;
482
483 let session_start = Instant::now();
484 let session_id = Uuid::new_v4();
485
486 info!(
487 "Starting reflection session {} (trigger: {:?})",
488 session_id, trigger.trigger_type
489 );
490
491 {
493 let mut metrics = self.metrics.write().await;
494 let trigger_name = format!("{:?}", trigger.trigger_type);
495 *metrics
496 .trigger_type_distribution
497 .entry(trigger_name)
498 .or_insert(0) += 1;
499 }
500
501 let mut retry_count = 0;
502 let mut last_error = None;
503
504 while retry_count <= self.config.max_retry_attempts {
505 match self.execute_reflection_with_timeout(trigger.clone()).await {
506 Ok(session) => {
507 let session_duration = session_start.elapsed();
508
509 if let Err(e) = self.process_session_insights(&session).await {
511 warn!(
512 "Failed to process insights from session {}: {}",
513 session_id, e
514 );
515 }
516
517 self.update_session_metrics(&session, session_duration)
519 .await;
520
521 info!(
522 "Reflection session {} completed successfully: {} insights generated in {:?}",
523 session_id,
524 session.generated_insights.len(),
525 session_duration
526 );
527
528 return Ok(session.id);
529 }
530 Err(e) => {
531 retry_count += 1;
532 last_error = Some(e);
533
534 if retry_count <= self.config.max_retry_attempts {
535 let delay_ms = (self
536 .config
537 .retry_backoff_multiplier
538 .powi(retry_count as i32 - 1)
539 * 1000.0) as u64;
540 warn!(
541 "Reflection session {} failed (attempt {}), retrying in {}ms: {}",
542 session_id,
543 retry_count,
544 delay_ms,
545 last_error.as_ref().unwrap()
546 );
547 sleep(std::time::Duration::from_millis(delay_ms)).await;
548 }
549 }
550 }
551 }
552
553 let final_error = last_error.unwrap_or_else(|| MemoryError::InvalidRequest {
554 message: "Unknown error in reflection session".to_string(),
555 });
556
557 error!(
558 "Reflection session {} failed after {} attempts: {}",
559 session_id, retry_count, final_error
560 );
561
562 Err(final_error)
563 }
564
565 async fn execute_reflection_with_timeout(
567 &self,
568 trigger: ReflectionTrigger,
569 ) -> Result<ReflectionSession> {
570 let timeout_duration =
571 std::time::Duration::from_secs(self.config.session_timeout_minutes * 60);
572
573 let reflection_future = async {
574 let mut engine = self.reflection_engine.write().await;
575 engine.execute_reflection(trigger.trigger_reason).await
576 };
577
578 match tokio::time::timeout(timeout_duration, reflection_future).await {
579 Ok(result) => result,
580 Err(_) => Err(MemoryError::InvalidRequest {
581 message: format!(
582 "Reflection session timed out after {} minutes",
583 self.config.session_timeout_minutes
584 ),
585 }),
586 }
587 }
588
589 async fn process_session_insights(&self, session: &ReflectionSession) -> Result<()> {
591 if session.generated_insights.is_empty() {
592 debug!("No insights generated in session {}", session.id);
593 return Ok(());
594 }
595
596 self.store_reflection_session(session).await?;
598
599 let mut processed_insights = 0;
600 let mut stored_as_memories = 0;
601 let mut quality_rejected = 0;
602
603 for insight in &session.generated_insights {
604 if self.config.enable_quality_filtering {
606 let validation_result = self
607 .loop_prevention_engine
608 .write()
609 .await
610 .validate_insight(insight)?;
611
612 match validation_result.prevention_action {
613 PreventionAction::RejectInsight => {
614 debug!("Insight {} rejected by quality filter", insight.id);
615 quality_rejected += 1;
616 continue;
617 }
618 PreventionAction::ModifyInsight => {
619 debug!("Insight {} flagged for modification", insight.id);
620 }
622 _ => {}
623 }
624 }
625
626 if let Err(e) = self.store_insight_in_database(insight).await {
628 warn!("Failed to store insight {} in database: {}", insight.id, e);
629 continue;
630 }
631
632 if self.config.store_insights_as_memories {
634 match self.store_insight_as_memory(insight).await {
635 Ok(memory) => {
636 self.link_insight_to_memory(insight.id, memory.id).await?;
638 stored_as_memories += 1;
639 }
640 Err(e) => {
641 warn!("Failed to store insight {} as memory: {}", insight.id, e);
642 }
643 }
644 }
645
646 processed_insights += 1;
647 }
648
649 self.total_insights
651 .fetch_add(processed_insights, Ordering::SeqCst);
652
653 {
655 let mut metrics = self.metrics.write().await;
656 let total_insights = session.generated_insights.len() as f64;
657 if total_insights > 0.0 {
658 metrics.quality_rejection_rate = quality_rejected as f64 / total_insights;
659 }
660 metrics.total_meta_memories_created += stored_as_memories;
661 }
662
663 info!(
664 "Processed {} insights from session {}: {} stored as memories, {} quality rejected",
665 processed_insights, session.id, stored_as_memories, quality_rejected
666 );
667
668 Ok(())
669 }
670
671 async fn store_insight_in_database(&self, insight: &Insight) -> Result<()> {
673 let query = r#"
674 INSERT INTO insights (
675 id, insight_type, content, confidence_score, source_memory_ids,
676 related_concepts, importance_score, novelty_score, coherence_score,
677 evidence_strength, semantic_richness, predictive_power, generated_at
678 ) VALUES (
679 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
680 )
681 ON CONFLICT (id) DO UPDATE SET
682 confidence_score = EXCLUDED.confidence_score,
683 importance_score = EXCLUDED.importance_score,
684 updated_at = NOW()
685 "#;
686
687 let insight_type_str = match insight.insight_type {
688 super::reflection_engine::InsightType::Pattern => "pattern",
689 super::reflection_engine::InsightType::Synthesis => "synthesis",
690 super::reflection_engine::InsightType::Gap => "gap",
691 super::reflection_engine::InsightType::Contradiction => "contradiction",
692 super::reflection_engine::InsightType::Trend => "trend",
693 super::reflection_engine::InsightType::Causality => "causality",
694 super::reflection_engine::InsightType::Analogy => "analogy",
695 };
696
697 sqlx::query(query)
698 .bind(insight.id)
699 .bind(insight_type_str)
700 .bind(&insight.content)
701 .bind(insight.confidence_score)
702 .bind(&insight.source_memory_ids)
703 .bind(&insight.related_concepts)
704 .bind(insight.importance_score)
705 .bind(insight.validation_metrics.novelty_score)
706 .bind(insight.validation_metrics.coherence_score)
707 .bind(insight.validation_metrics.evidence_strength)
708 .bind(insight.validation_metrics.semantic_richness)
709 .bind(insight.validation_metrics.predictive_power)
710 .bind(insight.generated_at)
711 .execute(self.repository.pool())
712 .await
713 .map_err(|e| MemoryError::DatabaseError {
714 message: format!("Failed to store insight in database: {e}"),
715 })?;
716
717 debug!("Successfully stored insight {} in database", insight.id);
718 Ok(())
719 }
720
721 async fn store_insight_as_memory(&self, insight: &Insight) -> Result<Memory> {
723 let importance_score = insight.importance_score * 1.5; let importance_score = importance_score.min(1.0); let metadata = serde_json::json!({
727 "insight_type": insight.insight_type,
728 "confidence_score": insight.confidence_score,
729 "source_memory_ids": insight.source_memory_ids,
730 "related_concepts": insight.related_concepts,
731 "validation_metrics": insight.validation_metrics,
732 "is_meta_memory": true,
733 "generated_by": "background_reflection_service",
734 "original_insight_id": insight.id
735 });
736
737 let create_request = CreateMemoryRequest {
738 content: insight.content.clone(),
739 embedding: None, tier: Some(MemoryTier::Working), importance_score: Some(importance_score),
742 metadata: Some(metadata),
743 parent_id: None,
744 expires_at: None,
745 };
746
747 self.repository.create_memory(create_request).await
748 }
749
750 async fn store_reflection_session(&self, session: &ReflectionSession) -> Result<()> {
752 let status_str = match session.completion_status {
753 super::reflection_engine::ReflectionStatus::InProgress => "in_progress",
754 super::reflection_engine::ReflectionStatus::Completed => "completed",
755 super::reflection_engine::ReflectionStatus::Failed => "failed",
756 super::reflection_engine::ReflectionStatus::Cancelled => "cancelled",
757 };
758
759 let completed_at =
760 if session.completion_status == super::reflection_engine::ReflectionStatus::Completed {
761 Some(chrono::Utc::now())
762 } else {
763 None
764 };
765
766 let query = r#"
767 INSERT INTO reflection_sessions (
768 id, trigger_reason, started_at, completed_at, status,
769 analyzed_memory_count, generated_cluster_count, generated_insight_count,
770 config_snapshot, results_summary
771 ) VALUES (
772 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
773 )
774 ON CONFLICT (id) DO UPDATE SET
775 completed_at = EXCLUDED.completed_at,
776 status = EXCLUDED.status,
777 analyzed_memory_count = EXCLUDED.analyzed_memory_count,
778 generated_cluster_count = EXCLUDED.generated_cluster_count,
779 generated_insight_count = EXCLUDED.generated_insight_count,
780 results_summary = EXCLUDED.results_summary
781 "#;
782
783 let config_snapshot = serde_json::json!({
784 "background_reflection_enabled": true,
785 "service_version": "1.0.0"
786 });
787
788 let results_summary = serde_json::json!({
789 "insights_generated": session.generated_insights.len(),
790 "clusters_analyzed": session.generated_clusters.len(),
791 "memories_processed": session.analyzed_memories.len(),
792 "knowledge_graph_updates": session.knowledge_graph_updates.len()
793 });
794
795 sqlx::query(query)
796 .bind(session.id)
797 .bind(&session.trigger_reason)
798 .bind(session.started_at)
799 .bind(completed_at)
800 .bind(status_str)
801 .bind(session.analyzed_memories.len() as i32)
802 .bind(session.generated_clusters.len() as i32)
803 .bind(session.generated_insights.len() as i32)
804 .bind(config_snapshot)
805 .bind(results_summary)
806 .execute(self.repository.pool())
807 .await
808 .map_err(|e| MemoryError::DatabaseError {
809 message: format!("Failed to store reflection session: {e}"),
810 })?;
811
812 debug!(
813 "Successfully stored reflection session {} in database",
814 session.id
815 );
816 Ok(())
817 }
818
819 async fn link_insight_to_memory(&self, insight_id: Uuid, memory_id: Uuid) -> Result<()> {
821 let query = "UPDATE insights SET memory_id = $1 WHERE id = $2";
822
823 sqlx::query(query)
824 .bind(memory_id)
825 .bind(insight_id)
826 .execute(self.repository.pool())
827 .await
828 .map_err(|e| MemoryError::DatabaseError {
829 message: format!("Failed to link insight to memory: {e}"),
830 })?;
831
832 debug!(
833 "Successfully linked insight {} to memory {}",
834 insight_id, memory_id
835 );
836 Ok(())
837 }
838
839 async fn calculate_accumulated_importance(&self) -> Result<f64> {
842 let cutoff_time = {
844 let metrics = self.metrics.read().await;
845 metrics
846 .last_reflection_time
847 .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::hours(24))
848 };
849
850 let query = r#"
851 SELECT COALESCE(SUM(importance_score), 0.0) as total_importance
852 FROM memories
853 WHERE status = 'active'
854 AND created_at > $1
855 "#;
856
857 let total_importance: f64 = sqlx::query_scalar(query)
858 .bind(cutoff_time)
859 .fetch_one(self.repository.pool())
860 .await
861 .map_err(|e| MemoryError::DatabaseError {
862 message: format!("Failed to calculate accumulated importance: {e}"),
863 })?;
864
865 debug!("Calculated accumulated importance: {:.2}", total_importance);
866 Ok(total_importance)
867 }
868
869 async fn get_recent_memory_count(&self) -> Result<usize> {
870 let cutoff_time = {
872 let metrics = self.metrics.read().await;
873 metrics
874 .last_reflection_time
875 .unwrap_or_else(|| chrono::Utc::now() - chrono::Duration::hours(24))
876 };
877
878 let query = r#"
879 SELECT COUNT(*) as memory_count
880 FROM memories
881 WHERE status = 'active'
882 AND created_at > $1
883 "#;
884
885 let memory_count: i64 = sqlx::query_scalar(query)
886 .bind(cutoff_time)
887 .fetch_one(self.repository.pool())
888 .await
889 .map_err(|e| MemoryError::DatabaseError {
890 message: format!("Failed to get recent memory count: {e}"),
891 })?;
892
893 debug!("Recent memory count: {}", memory_count);
894 Ok(memory_count as usize)
895 }
896
897 fn determine_priority(&self, accumulated_importance: f64) -> ReflectionPriority {
898 let thresholds = &self.config.priority_thresholds;
899
900 if accumulated_importance >= thresholds.critical_pattern_threshold {
901 ReflectionPriority::Critical
902 } else if accumulated_importance >= thresholds.high_importance_threshold {
903 ReflectionPriority::High
904 } else if accumulated_importance >= thresholds.medium_importance_threshold {
905 ReflectionPriority::Medium
906 } else {
907 ReflectionPriority::Low
908 }
909 }
910
911 async fn should_trigger_maintenance_reflection(&self) -> Result<bool> {
912 Ok(false)
915 }
916
917 async fn update_session_metrics(
919 &self,
920 session: &ReflectionSession,
921 duration: std::time::Duration,
922 ) {
923 self.total_sessions.fetch_add(1, Ordering::SeqCst);
924 self.total_processing_time_ms
925 .fetch_add(duration.as_millis() as u64, Ordering::SeqCst);
926
927 let mut metrics = self.metrics.write().await;
928 metrics.last_reflection_time = Some(session.started_at);
929 }
930
931 async fn metrics_update_loop(&self) {
933 let mut interval = interval(std::time::Duration::from_secs(60)); while self.is_running.load(Ordering::SeqCst) {
936 interval.tick().await;
937
938 let mut metrics = self.metrics.write().await;
940 metrics.service_uptime_hours = Utc::now()
941 .signed_duration_since(self.service_start_time)
942 .num_seconds() as f64
943 / 3600.0;
944 metrics.last_updated = Utc::now();
945 }
946 }
947
948 fn clone_for_task(&self) -> Self {
950 Self {
951 config: self.config.clone(),
952 repository: self.repository.clone(),
953 reflection_engine: self.reflection_engine.clone(),
954 loop_prevention_engine: self.loop_prevention_engine.clone(),
955 is_running: AtomicBool::new(self.is_running.load(Ordering::SeqCst)),
956 session_semaphore: self.session_semaphore.clone(),
957 metrics: self.metrics.clone(),
958 service_start_time: self.service_start_time,
959 total_sessions: AtomicU64::new(self.total_sessions.load(Ordering::SeqCst)),
960 total_insights: AtomicU64::new(self.total_insights.load(Ordering::SeqCst)),
961 total_processing_time_ms: AtomicU64::new(
962 self.total_processing_time_ms.load(Ordering::SeqCst),
963 ),
964 }
965 }
966}
967
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder for a repository fixture.
    ///
    /// Not implemented yet; no test in this module calls it.
    // Kept for future service-level tests, hence the lint exemption.
    #[allow(dead_code)]
    async fn create_test_repository() -> Arc<MemoryRepository> {
        todo!("Implement test repository creation")
    }

    /// The default configuration enables the service and its optional
    /// features.
    #[test]
    fn test_service_configuration() {
        let config = BackgroundReflectionConfig::default();

        assert!(config.enabled);
        assert_eq!(config.check_interval_minutes, 15);
        assert_eq!(config.max_concurrent_sessions, 2);
        assert!(config.store_insights_as_memories);
        assert!(config.enable_quality_filtering);
    }

    /// Priority selection compares against the critical, high and medium
    /// thresholds in that order, which only yields distinct tiers when the
    /// defaults are strictly increasing.
    #[test]
    fn test_priority_determination() {
        let thresholds = BackgroundReflectionConfig::default().priority_thresholds;

        assert!(thresholds.low_importance_threshold < thresholds.medium_importance_threshold);
        assert!(thresholds.medium_importance_threshold < thresholds.high_importance_threshold);
        assert!(thresholds.high_importance_threshold < thresholds.critical_pattern_threshold);

        assert_eq!(thresholds.low_importance_threshold, 100.0);
        assert_eq!(thresholds.medium_importance_threshold, 200.0);
        assert_eq!(thresholds.high_importance_threshold, 300.0);
        assert_eq!(thresholds.critical_pattern_threshold, 500.0);
    }

    /// Fresh metrics start with zeroed counters and no trigger history.
    #[test]
    fn test_metrics_initialization() {
        let metrics = ReflectionServiceMetrics::default();

        assert_eq!(metrics.total_reflections_completed, 0);
        assert_eq!(metrics.total_insights_generated, 0);
        assert_eq!(metrics.total_meta_memories_created, 0);
        assert_eq!(metrics.current_active_sessions, 0);
        assert!(metrics.last_reflection_time.is_none());
        assert!(metrics.trigger_type_distribution.is_empty());
    }

    /// Trigger types compare by variant.
    #[test]
    fn test_trigger_types() {
        assert_eq!(
            TriggerType::ImportanceAccumulation,
            TriggerType::ImportanceAccumulation
        );
        assert_ne!(
            TriggerType::ImportanceAccumulation,
            TriggerType::ManualRequest
        );
    }

    /// Priorities order from `Low` up to `Critical`.
    #[test]
    fn test_priority_ordering() {
        assert!(ReflectionPriority::Critical > ReflectionPriority::High);
        assert!(ReflectionPriority::High > ReflectionPriority::Medium);
        assert!(ReflectionPriority::Medium > ReflectionPriority::Low);
    }
}