1use super::background_reflection_service::{
48 BackgroundReflectionConfig, BackgroundReflectionService, ReflectionServiceMetrics,
49};
50use super::cognitive_consolidation::{
51 CognitiveConsolidationConfig, CognitiveConsolidationEngine, CognitiveConsolidationResult,
52 RetrievalContext,
53};
54use super::error::Result;
55use super::insight_loop_prevention::{
56 LoopPreventionConfig, LoopPreventionEngine, PreventionAction,
57};
58use super::models::*;
59use super::reflection_engine::{Insight, ReflectionConfig, ReflectionEngine, ReflectionSession};
60use super::repository::MemoryRepository;
61use super::three_component_scoring::{
62 EnhancedSearchService, ScoringContext, ThreeComponentConfig, ThreeComponentEngine,
63};
64
65use chrono::{DateTime, Utc};
66use serde::{Deserialize, Serialize};
67use std::sync::Arc;
68use tokio::sync::RwLock;
69use tracing::{debug, info, warn};
70use uuid::Uuid;
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct CognitiveMemoryConfig {
75 pub scoring_config: ThreeComponentConfig,
77
78 pub consolidation_config: CognitiveConsolidationConfig,
80
81 pub reflection_config: ReflectionConfig,
83
84 pub loop_prevention_config: LoopPreventionConfig,
86
87 pub background_reflection_config: BackgroundReflectionConfig,
89
90 pub enable_auto_processing: bool,
92
93 pub enable_background_reflection: bool,
95
96 pub auto_processing_interval_minutes: u64,
98
99 pub enable_performance_monitoring: bool,
101
102 pub max_concurrent_operations: usize,
104}
105
106impl Default for CognitiveMemoryConfig {
107 fn default() -> Self {
108 Self {
109 scoring_config: ThreeComponentConfig::default(),
110 consolidation_config: CognitiveConsolidationConfig::default(),
111 reflection_config: ReflectionConfig::default(),
112 loop_prevention_config: LoopPreventionConfig::default(),
113 background_reflection_config: BackgroundReflectionConfig::default(),
114 enable_auto_processing: true,
115 enable_background_reflection: true,
116 auto_processing_interval_minutes: 30,
117 enable_performance_monitoring: true,
118 max_concurrent_operations: 10,
119 }
120 }
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct CognitivePerformanceMetrics {
126 pub total_memories_processed: u64,
127 pub total_insights_generated: u64,
128 pub total_reflections_completed: u64,
129 pub average_scoring_time_ms: f64,
130 pub average_consolidation_time_ms: f64,
131 pub average_reflection_time_ms: f64,
132 pub loop_prevention_blocks: u64,
133 pub quality_rejections: u64,
134 pub system_uptime_hours: f64,
135 pub last_updated: DateTime<Utc>,
136}
137
138impl Default for CognitivePerformanceMetrics {
139 fn default() -> Self {
140 Self {
141 total_memories_processed: 0,
142 total_insights_generated: 0,
143 total_reflections_completed: 0,
144 average_scoring_time_ms: 0.0,
145 average_consolidation_time_ms: 0.0,
146 average_reflection_time_ms: 0.0,
147 loop_prevention_blocks: 0,
148 quality_rejections: 0,
149 system_uptime_hours: 0.0,
150 last_updated: Utc::now(),
151 }
152 }
153}
154
155#[derive(Debug, Clone)]
157pub struct CognitiveMemoryRequest {
158 pub content: String,
159 pub embedding: Option<Vec<f32>>,
160 pub importance_score: Option<f64>,
161 pub metadata: Option<serde_json::Value>,
162 pub retrieval_context: RetrievalContext,
163 pub enable_immediate_consolidation: bool,
164 pub enable_quality_assessment: bool,
165}
166
167#[derive(Debug, Clone)]
169pub struct CognitiveMemoryResult {
170 pub memory: Memory,
171 pub consolidation_result: Option<CognitiveConsolidationResult>,
172 pub quality_assessment: Option<super::insight_loop_prevention::QualityAssessment>,
173 pub processing_time_ms: u64,
174 pub cognitive_flags: CognitiveFlags,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize, Default)]
179pub struct CognitiveFlags {
180 pub consolidation_applied: bool,
181 pub reflection_triggered: bool,
182 pub quality_validated: bool,
183 pub loop_prevention_checked: bool,
184 pub three_component_scored: bool,
185}
186
187pub struct CognitiveMemorySystem {
189 repository: Arc<MemoryRepository>,
190 config: CognitiveMemoryConfig,
191
192 scoring_engine: ThreeComponentEngine,
194 consolidation_engine: CognitiveConsolidationEngine,
195 reflection_engine: Arc<RwLock<ReflectionEngine>>,
196 loop_prevention_engine: Arc<RwLock<LoopPreventionEngine>>,
197 search_service: EnhancedSearchService,
198 background_reflection_service: Arc<BackgroundReflectionService>,
199
200 performance_metrics: Arc<RwLock<CognitivePerformanceMetrics>>,
202 last_background_processing: Arc<RwLock<DateTime<Utc>>>,
203 system_start_time: DateTime<Utc>,
204}
205
206impl CognitiveMemorySystem {
207 pub async fn new(
209 repository: Arc<MemoryRepository>,
210 config: CognitiveMemoryConfig,
211 ) -> Result<Self> {
212 info!("Initializing Cognitive Memory System with enhanced features");
213
214 let scoring_engine = ThreeComponentEngine::new(config.scoring_config.clone())?;
216 let consolidation_engine =
217 CognitiveConsolidationEngine::new(config.consolidation_config.clone());
218 let reflection_engine = Arc::new(RwLock::new(ReflectionEngine::new(
219 config.reflection_config.clone(),
220 repository.clone(),
221 )));
222 let loop_prevention_engine = Arc::new(RwLock::new(LoopPreventionEngine::new(
223 config.loop_prevention_config.clone(),
224 )));
225 let search_service = EnhancedSearchService::new(config.scoring_config.clone())?;
226
227 let background_reflection_service = Arc::new(BackgroundReflectionService::new(
229 config.background_reflection_config.clone(),
230 repository.clone(),
231 config.reflection_config.clone(),
232 config.loop_prevention_config.clone(),
233 ));
234
235 let performance_metrics = Arc::new(RwLock::new(CognitivePerformanceMetrics::default()));
236 let last_background_processing = Arc::new(RwLock::new(Utc::now()));
237 let system_start_time = Utc::now();
238
239 info!("Cognitive Memory System initialized successfully");
240
241 let system = Self {
242 repository,
243 config,
244 scoring_engine,
245 consolidation_engine,
246 reflection_engine,
247 loop_prevention_engine,
248 search_service,
249 background_reflection_service,
250 performance_metrics,
251 last_background_processing,
252 system_start_time,
253 };
254
255 if system.config.enable_background_reflection {
257 system.background_reflection_service.start().await?;
258 info!("Background reflection service started");
259 }
260
261 Ok(system)
262 }
263
264 pub async fn store_memory_with_cognitive_processing(
266 &self,
267 request: CognitiveMemoryRequest,
268 ) -> Result<CognitiveMemoryResult> {
269 let start_time = std::time::Instant::now();
270 let mut cognitive_flags = CognitiveFlags::default();
271
272 info!("Processing memory storage with cognitive enhancements");
273
274 let create_request = CreateMemoryRequest {
276 content: request.content.clone(),
277 embedding: request.embedding.clone(),
278 tier: Some(MemoryTier::Working),
279 importance_score: request.importance_score,
280 metadata: request.metadata.clone(),
281 parent_id: None,
282 expires_at: None,
283 };
284
285 let mut memory = self.repository.create_memory(create_request).await?;
286
287 let scoring_context = ScoringContext {
289 query_embedding: request.embedding.clone().map(pgvector::Vector::from),
290 context_factors: request.retrieval_context.environmental_factors.clone(),
291 query_time: Utc::now(),
292 user_preferences: std::collections::HashMap::new(),
293 };
294
295 if let Ok(scoring_result) =
296 self.scoring_engine
297 .calculate_score(&memory, &scoring_context, false)
298 {
299 cognitive_flags.three_component_scored = true;
301 debug!(
302 "Three-component scoring applied: combined score {:.3}",
303 scoring_result.combined_score
304 );
305 }
306
307 let consolidation_result = if request.enable_immediate_consolidation {
309 let similar_memories = self.find_similar_memories(&memory, 10).await?;
310
311 match self
312 .consolidation_engine
313 .calculate_cognitive_consolidation(
314 &memory,
315 &request.retrieval_context,
316 &similar_memories,
317 )
318 .await
319 {
320 Ok(result) => {
321 self.consolidation_engine
322 .apply_consolidation_results(&mut memory, &result, &self.repository)
323 .await?;
324 cognitive_flags.consolidation_applied = true;
325 debug!(
326 "Cognitive consolidation applied: strength {:.3}",
327 result.new_consolidation_strength
328 );
329 Some(result)
330 }
331 Err(e) => {
332 warn!("Consolidation failed: {}", e);
333 None
334 }
335 }
336 } else {
337 None
338 };
339
340 let quality_assessment = if request.enable_quality_assessment {
342 let mock_insight = Insight {
344 id: Uuid::new_v4(),
345 insight_type: super::reflection_engine::InsightType::Pattern,
346 content: request.content.clone(),
347 confidence_score: 0.8,
348 source_memory_ids: vec![memory.id],
349 related_concepts: Vec::new(), knowledge_graph_nodes: Vec::new(),
351 importance_score: memory.importance_score,
352 generated_at: Utc::now(),
353 validation_metrics: super::reflection_engine::ValidationMetrics {
354 novelty_score: 0.8,
355 coherence_score: 0.9,
356 evidence_strength: 0.7,
357 semantic_richness: 0.6,
358 predictive_power: 0.5,
359 },
360 };
361
362 match self
363 .loop_prevention_engine
364 .write()
365 .await
366 .validate_insight(&mock_insight)
367 {
368 Ok(loop_result) => {
369 cognitive_flags.quality_validated = true;
370 cognitive_flags.loop_prevention_checked = true;
371
372 if loop_result.prevention_action == PreventionAction::RejectInsight {
373 warn!("Memory quality assessment failed - would be rejected as insight");
374 }
375
376 Some(super::insight_loop_prevention::QualityAssessment {
377 novelty_score: 0.8,
378 coherence_score: 0.9,
379 evidence_strength: 0.7,
380 semantic_richness: 0.6,
381 predictive_power: 0.5,
382 overall_quality: 0.74,
383 quality_factors: vec!["High coherence".to_string()],
384 deficiency_reasons: Vec::new(),
385 })
386 }
387 Err(e) => {
388 warn!("Quality assessment failed: {}", e);
389 None
390 }
391 }
392 } else {
393 None
394 };
395
396 if self.config.enable_auto_processing {
398 if let Ok(should_reflect) = self
399 .reflection_engine
400 .read()
401 .await
402 .should_trigger_reflection()
403 .await
404 {
405 if should_reflect.is_some() {
406 cognitive_flags.reflection_triggered = true;
407 debug!("Reflection will be triggered in background");
408
409 let reflection_engine = self.reflection_engine.clone();
411 let trigger_reason = should_reflect.unwrap();
412 tokio::spawn(async move {
413 let mut engine = reflection_engine.write().await;
414 match engine.execute_reflection(trigger_reason).await {
415 Ok(session) => {
416 info!(
417 "Background reflection completed: {} insights generated",
418 session.generated_insights.len()
419 );
420 }
421 Err(e) => {
422 warn!("Background reflection failed: {}", e);
423 }
424 }
425 });
426 }
427 }
428 }
429
430 let processing_time = start_time.elapsed().as_millis() as u64;
431
432 self.update_performance_metrics(processing_time, &cognitive_flags)
434 .await;
435
436 info!(
437 "Cognitive memory processing completed in {}ms",
438 processing_time
439 );
440
441 Ok(CognitiveMemoryResult {
442 memory,
443 consolidation_result,
444 quality_assessment,
445 processing_time_ms: processing_time,
446 cognitive_flags,
447 })
448 }
449
450 pub async fn cognitive_search(
452 &self,
453 query: &str,
454 context: ScoringContext,
455 limit: Option<i32>,
456 ) -> Result<Vec<super::three_component_scoring::EnhancedSearchResult>> {
457 let start_time = std::time::Instant::now();
458
459 info!("Performing cognitive search for: {}", query);
460
461 let search_request = SearchRequest {
463 query_text: Some(query.to_string()),
464 query_embedding: context
465 .query_embedding
466 .as_ref()
467 .map(|v| v.as_slice().to_vec()),
468 search_type: Some(SearchType::Hybrid),
469 limit,
470 explain_score: Some(true),
471 ..Default::default()
472 };
473
474 let search_response = self.repository.search_memories(search_request).await?;
476
477 let enhanced_results =
479 self.search_service
480 .rank_search_results(search_response.results, &context, true)?;
481
482 let processing_time = start_time.elapsed().as_millis() as u64;
483
484 debug!(
485 "Cognitive search completed in {}ms, {} results",
486 processing_time,
487 enhanced_results.len()
488 );
489
490 Ok(enhanced_results)
491 }
492
493 pub async fn trigger_reflection(&self, reason: String) -> Result<ReflectionSession> {
495 info!("Manually triggering reflection: {}", reason);
496
497 let mut reflection_engine = self.reflection_engine.write().await;
498 let session = reflection_engine.execute_reflection(reason).await?;
499
500 let mut loop_prevention = self.loop_prevention_engine.write().await;
502 for insight in &session.generated_insights {
503 match loop_prevention.validate_insight(insight) {
504 Ok(validation_result) => {
505 if validation_result.prevention_action == PreventionAction::Allow {
506 let quality = super::insight_loop_prevention::QualityAssessment {
508 novelty_score: insight.validation_metrics.novelty_score,
509 coherence_score: insight.validation_metrics.coherence_score,
510 evidence_strength: insight.validation_metrics.evidence_strength,
511 semantic_richness: insight.validation_metrics.semantic_richness,
512 predictive_power: insight.validation_metrics.predictive_power,
513 overall_quality: (insight.validation_metrics.novelty_score
514 + insight.validation_metrics.coherence_score
515 + insight.validation_metrics.evidence_strength
516 + insight.validation_metrics.semantic_richness
517 + insight.validation_metrics.predictive_power)
518 / 5.0,
519 quality_factors: vec!["Validated through reflection".to_string()],
520 deficiency_reasons: Vec::new(),
521 };
522
523 if let Err(e) = loop_prevention.register_insight(insight, quality) {
524 warn!("Failed to register insight {}: {}", insight.id, e);
525 }
526 } else {
527 warn!(
528 "Insight {} blocked by loop prevention: {:?}",
529 insight.id, validation_result.prevention_action
530 );
531 }
532 }
533 Err(e) => {
534 warn!("Failed to validate insight {}: {}", insight.id, e);
535 }
536 }
537 }
538
539 {
541 let mut metrics = self.performance_metrics.write().await;
542 metrics.total_reflections_completed += 1;
543 metrics.total_insights_generated += session.generated_insights.len() as u64;
544 metrics.last_updated = Utc::now();
545 }
546
547 info!(
548 "Reflection completed: {} insights generated, {} clusters analyzed",
549 session.generated_insights.len(),
550 session.generated_clusters.len()
551 );
552
553 Ok(session)
554 }
555
556 pub async fn get_performance_metrics(&self) -> CognitivePerformanceMetrics {
558 let mut metrics = self.performance_metrics.read().await.clone();
559
560 metrics.system_uptime_hours = Utc::now()
562 .signed_duration_since(self.system_start_time)
563 .num_seconds() as f64
564 / 3600.0;
565
566 metrics
567 }
568
569 pub async fn get_loop_prevention_statistics(
571 &self,
572 ) -> super::insight_loop_prevention::PreventionStatistics {
573 self.loop_prevention_engine
574 .read()
575 .await
576 .get_prevention_statistics()
577 }
578
579 pub async fn get_reflection_service_metrics(&self) -> ReflectionServiceMetrics {
581 self.background_reflection_service.get_metrics().await
582 }
583
584 pub async fn trigger_background_reflection(&self, reason: String) -> Result<uuid::Uuid> {
586 info!("Manually triggering background reflection: {}", reason);
587 self.background_reflection_service
588 .trigger_manual_reflection(reason)
589 .await
590 }
591
592 pub async fn start_background_reflection(&self) -> Result<()> {
594 if !self.background_reflection_service.is_running() {
595 self.background_reflection_service.start().await?;
596 info!("Background reflection service started");
597 }
598 Ok(())
599 }
600
601 pub async fn stop_background_reflection(&self) -> Result<()> {
603 if self.background_reflection_service.is_running() {
604 self.background_reflection_service.stop().await?;
605 info!("Background reflection service stopped");
606 }
607 Ok(())
608 }
609
610 pub async fn update_configuration(&mut self, config: CognitiveMemoryConfig) -> Result<()> {
612 info!("Updating cognitive memory system configuration");
613
614 config.scoring_config.validate()?;
616
617 self.scoring_engine
619 .update_config(config.scoring_config.clone())?;
620
621 self.config = config;
623
624 info!("Configuration updated successfully");
625 Ok(())
626 }
627
628 pub async fn background_maintenance(&self) -> Result<()> {
630 if !self.config.enable_background_reflection {
631 return Ok(());
632 }
633
634 let last_processing = *self.last_background_processing.read().await;
635 let minutes_since_last = Utc::now()
636 .signed_duration_since(last_processing)
637 .num_minutes() as u64;
638
639 if minutes_since_last < self.config.auto_processing_interval_minutes {
640 return Ok(());
641 }
642
643 debug!("Performing background maintenance");
644
645 if let Ok(should_reflect) = self
647 .reflection_engine
648 .read()
649 .await
650 .should_trigger_reflection()
651 .await
652 {
653 if let Some(reason) = should_reflect {
654 match self.trigger_reflection(reason).await {
655 Ok(_) => {
656 info!("Background reflection completed successfully");
657 }
658 Err(e) => {
659 warn!("Background reflection failed: {}", e);
660 }
661 }
662 }
663 }
664
665 *self.last_background_processing.write().await = Utc::now();
667
668 Ok(())
669 }
670
671 async fn find_similar_memories(&self, memory: &Memory, limit: usize) -> Result<Vec<Memory>> {
674 if memory.embedding.is_none() {
675 return Ok(Vec::new());
676 }
677
678 let search_request = SearchRequest {
679 query_embedding: Some(memory.embedding.as_ref().unwrap().as_slice().to_vec()),
680 search_type: Some(SearchType::Semantic),
681 similarity_threshold: Some(0.7),
682 limit: Some(limit as i32),
683 ..Default::default()
684 };
685
686 let search_response = self.repository.search_memories(search_request).await?;
687
688 Ok(search_response
689 .results
690 .into_iter()
691 .filter(|result| result.memory.id != memory.id)
692 .map(|result| result.memory)
693 .collect())
694 }
695
696 async fn update_performance_metrics(&self, processing_time_ms: u64, flags: &CognitiveFlags) {
697 let mut metrics = self.performance_metrics.write().await;
698
699 metrics.total_memories_processed += 1;
700
701 let count = metrics.total_memories_processed as f64;
703 metrics.average_scoring_time_ms =
704 (metrics.average_scoring_time_ms * (count - 1.0) + processing_time_ms as f64) / count;
705
706 if flags.consolidation_applied {
707 metrics.average_consolidation_time_ms =
708 (metrics.average_consolidation_time_ms * (count - 1.0) + processing_time_ms as f64)
709 / count;
710 }
711
712 metrics.last_updated = Utc::now();
713 }
714}
715
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder: building a real repository needs a database, so this
    /// always panics. Only the ignored test below calls it.
    async fn create_test_repository() -> Arc<MemoryRepository> {
        panic!("Test repository creation not implemented - requires database setup");
    }

    /// Construction smoke test; ignored because it needs a database.
    #[tokio::test]
    #[ignore]
    async fn test_cognitive_memory_system_creation() {
        let repository = create_test_repository().await;

        let system =
            CognitiveMemorySystem::new(repository, CognitiveMemoryConfig::default()).await;

        assert!(system.is_ok());
    }

    /// The default configuration enables every feature and uses the
    /// documented interval and concurrency limit.
    #[test]
    fn test_cognitive_memory_config_defaults() {
        let CognitiveMemoryConfig {
            enable_auto_processing,
            enable_background_reflection,
            auto_processing_interval_minutes,
            max_concurrent_operations,
            ..
        } = CognitiveMemoryConfig::default();

        assert!(enable_auto_processing);
        assert!(enable_background_reflection);
        assert_eq!(auto_processing_interval_minutes, 30);
        assert_eq!(max_concurrent_operations, 10);
    }

    /// Every cognitive flag starts out cleared.
    #[test]
    fn test_cognitive_flags_default() {
        let CognitiveFlags {
            consolidation_applied,
            reflection_triggered,
            quality_validated,
            loop_prevention_checked,
            three_component_scored,
        } = CognitiveFlags::default();

        assert!(!consolidation_applied);
        assert!(!reflection_triggered);
        assert!(!quality_validated);
        assert!(!loop_prevention_checked);
        assert!(!three_component_scored);
    }

    /// Fresh metrics start from zero.
    #[test]
    fn test_performance_metrics_default() {
        let fresh = CognitivePerformanceMetrics::default();

        assert_eq!(fresh.total_memories_processed, 0);
        assert_eq!(fresh.total_insights_generated, 0);
        assert_eq!(fresh.total_reflections_completed, 0);
        assert_eq!(fresh.average_scoring_time_ms, 0.0);
    }
}