codex_memory/memory/
importance_assessment.rs

1use crate::embedding::EmbeddingService;
2use crate::memory::MemoryError;
3use anyhow::Result;
4use chrono::{DateTime, Utc};
5use prometheus::{Counter, Histogram, IntCounter, IntGauge, Registry};
6use rand::Rng;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::collections::{HashMap, LinkedList};
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use thiserror::Error;
15use tokio::sync::RwLock;
16use tokio::time::timeout;
17use tracing::{debug, error, info, warn};
18
/// Errors raised by the importance assessment pipeline.
///
/// Every variant except `Memory` carries a free-form message that is
/// interpolated into the `Display` text declared by its `#[error]` attribute.
#[derive(Debug, Error)]
pub enum ImportanceAssessmentError {
    /// Stage 1 (pattern matching) failed; the payload is the message.
    #[error("Stage 1 pattern matching failed: {0}")]
    Stage1Failed(String),

    /// Stage 2 (semantic analysis) failed; the payload is the message.
    #[error("Stage 2 semantic analysis failed: {0}")]
    Stage2Failed(String),

    /// Stage 3 (LLM scoring) failed; the payload is the message.
    #[error("Stage 3 LLM scoring failed: {0}")]
    Stage3Failed(String),

    /// A call was rejected because the circuit breaker is open.
    #[error("Circuit breaker is open: {0}")]
    CircuitBreakerOpen(String),

    /// An operation exceeded its time budget.
    #[error("Timeout exceeded: {0}")]
    Timeout(String),

    /// The supplied configuration is invalid (for example a regex pattern
    /// that fails to compile).
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// Wraps a [`MemoryError`]; `#[from]` generates the `From` conversion so
    /// `?` can lift a `MemoryError` into this type.
    #[error("Memory operation failed: {0}")]
    Memory(#[from] MemoryError),

    /// A cache operation failed; the payload is the message.
    #[error("Cache operation failed: {0}")]
    Cache(String),
}
45
/// Configuration for the importance assessment pipeline.
///
/// Groups one settings struct per stage together with the circuit breaker
/// and performance settings. The type derives `Serialize`/`Deserialize`, so
/// it can round-trip through any serde format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportanceAssessmentConfig {
    /// Stage 1: Pattern matching configuration
    pub stage1: Stage1Config,

    /// Stage 2: Semantic similarity configuration
    pub stage2: Stage2Config,

    /// Stage 3: LLM scoring configuration
    pub stage3: Stage3Config,

    /// Circuit breaker configuration
    pub circuit_breaker: CircuitBreakerConfig,

    /// Performance thresholds
    pub performance: PerformanceConfig,
}
64
/// Settings for Stage 1 (pattern matching) of the assessment pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage1Config {
    /// Confidence threshold to pass to Stage 2 (0.0-1.0)
    pub confidence_threshold: f64,

    /// Pattern library for keyword/phrase matching
    pub pattern_library: Vec<ImportancePattern>,

    /// Maximum processing time in milliseconds
    pub max_processing_time_ms: u64,
}
76
/// Settings for Stage 2 (semantic similarity) of the assessment pipeline,
/// including the sizing and TTL of the embedding cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage2Config {
    /// Confidence threshold to pass to Stage 3 (0.0-1.0)
    pub confidence_threshold: f64,

    /// Maximum processing time in milliseconds
    pub max_processing_time_ms: u64,

    /// Cache TTL for embeddings in seconds
    pub embedding_cache_ttl_seconds: u64,

    /// Maximum cache size (number of entries)
    pub embedding_cache_max_size: usize,

    /// Cache eviction threshold (evict when this percentage full)
    ///
    /// NOTE(review): `EmbeddingCache::new` multiplies its threshold argument
    /// directly by `max_size`, so if this value is what gets passed there it
    /// must be a fraction such as `0.8`, not `80` — confirm against the caller.
    pub cache_eviction_threshold: f64,

    /// Similarity threshold for semantic matching
    pub similarity_threshold: f32,

    /// Reference embeddings for importance patterns
    pub reference_embeddings: Vec<ReferenceEmbedding>,
}
100
/// Settings for Stage 3 (LLM scoring) of the assessment pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage3Config {
    /// Maximum processing time in milliseconds
    pub max_processing_time_ms: u64,

    /// LLM endpoint configuration
    pub llm_endpoint: String,

    /// Maximum concurrent LLM requests
    pub max_concurrent_requests: usize,

    /// Prompt template for LLM scoring
    pub prompt_template: String,

    /// Target percentage of evaluations that should reach Stage 3
    pub target_usage_percentage: f64,
}
118
/// Tuning parameters read by `CircuitBreaker` when deciding whether to open
/// the circuit and when to allow a recovery attempt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Failure threshold before opening the circuit
    pub failure_threshold: usize,

    /// Time window for failure counting in seconds
    ///
    /// A failure whose predecessor is older than this window restarts the
    /// failure count.
    pub failure_window_seconds: u64,

    /// Recovery timeout in seconds
    ///
    /// How long the circuit stays open before calls are allowed through again.
    pub recovery_timeout_seconds: u64,

    /// Minimum requests before evaluating failures
    pub minimum_requests: usize,
}
133
/// Per-stage latency targets, all expressed in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceConfig {
    /// Stage 1 target time in milliseconds
    pub stage1_target_ms: u64,

    /// Stage 2 target time in milliseconds
    pub stage2_target_ms: u64,

    /// Stage 3 target time in milliseconds
    pub stage3_target_ms: u64,
}
145
/// One entry of the Stage 1 pattern library: a named, weighted regular
/// expression with optional context boosters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportancePattern {
    /// Pattern name for metrics and debugging
    pub name: String,

    /// Regular expression pattern
    ///
    /// Stored as source text; `OptimizedPatternMatcher::new` validates it and
    /// compiles it with `Regex::new` unless it is handled as a plain keyword.
    pub pattern: String,

    /// Weight/importance score for this pattern (0.0-1.0)
    pub weight: f64,

    /// Context words that boost the pattern's importance
    pub context_boosters: Vec<String>,

    /// Category of the pattern (e.g., "command", "preference", "memory")
    pub category: String,
}
163
/// A named, weighted reference vector used by Stage 2 semantic matching.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReferenceEmbedding {
    /// Name of the reference pattern
    pub name: String,

    /// Pre-computed embedding vector
    pub embedding: Vec<f32>,

    /// Importance weight for this reference
    pub weight: f64,

    /// Category of the reference
    pub category: String,
}
178
/// Result of the importance assessment pipeline.
///
/// Carries the final score together with the per-stage results that led to
/// it, so callers can see which stage decided the outcome.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportanceAssessmentResult {
    /// Final importance score (0.0-1.0)
    pub importance_score: f64,

    /// Which stage provided the final score
    pub final_stage: AssessmentStage,

    /// Results from each stage
    pub stage_results: Vec<StageResult>,

    /// Total processing time in milliseconds
    pub total_processing_time_ms: u64,

    /// Assessment timestamp
    pub assessed_at: DateTime<Utc>,

    /// Confidence in the assessment (0.0-1.0)
    pub confidence: f64,

    /// Explanation of the assessment
    pub explanation: Option<String>,
}
203
/// Outcome of a single pipeline stage: its score, confidence, timing and
/// stage-specific details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageResult {
    /// Which stage this result is from
    pub stage: AssessmentStage,

    /// Score from this stage (0.0-1.0)
    pub score: f64,

    /// Confidence in this stage's result (0.0-1.0)
    pub confidence: f64,

    /// Processing time for this stage in milliseconds
    pub processing_time_ms: u64,

    /// Whether this stage passed its confidence threshold
    pub passed_threshold: bool,

    /// Stage-specific details
    pub details: StageDetails,
}
224
/// Identifies one of the three stages of the assessment pipeline.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AssessmentStage {
    /// Stage 1: pattern matching.
    Stage1PatternMatching,
    /// Stage 2: semantic similarity.
    Stage2SemanticSimilarity,
    /// Stage 3: LLM scoring.
    Stage3LLMScoring,
}
231
/// Stage-specific diagnostic payload attached to a `StageResult`; one variant
/// per pipeline stage.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StageDetails {
    /// Details recorded by Stage 1 (pattern matching).
    Stage1 {
        // Patterns that matched the input.
        matched_patterns: Vec<MatchedPattern>,
        // Number of patterns that were evaluated.
        total_patterns_checked: usize,
    },
    /// Details recorded by Stage 2 (semantic similarity).
    Stage2 {
        // One score per reference that was compared.
        similarity_scores: Vec<SimilarityScore>,
        // Whether the embedding came from the cache.
        cache_hit: bool,
        // Embedding generation time in milliseconds, when one was measured.
        embedding_generation_time_ms: Option<u64>,
    },
    /// Details recorded by Stage 3 (LLM scoring).
    Stage3 {
        // Response text returned by the LLM.
        llm_response: String,
        // Token counts, when available.
        prompt_tokens: Option<usize>,
        completion_tokens: Option<usize>,
        // Name of the model that produced the response.
        model_used: String,
    },
}
250
/// A single Stage 1 pattern hit, reported inside `StageDetails::Stage1`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MatchedPattern {
    /// Name of the pattern that matched.
    pub pattern_name: String,
    /// Category of the pattern that matched.
    pub pattern_category: String,
    /// Text that matched.
    pub match_text: String,
    /// Position of the match in the input.
    /// NOTE(review): presumably a byte offset — confirm against the matcher.
    pub match_position: usize,
    /// Weight of the matched pattern.
    pub weight: f64,
    /// Boost contributed by context words.
    pub context_boost: f64,
}
260
/// Similarity of the input to one reference embedding, reported inside
/// `StageDetails::Stage2`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SimilarityScore {
    /// Name of the reference that was compared.
    pub reference_name: String,
    /// Category of the reference that was compared.
    pub reference_category: String,
    /// Similarity value for this reference.
    pub similarity: f32,
    /// Weight of the reference.
    pub weight: f64,
    /// Score after weighting.
    /// NOTE(review): presumably `similarity * weight` — confirm at the
    /// computation site.
    pub weighted_score: f64,
}
269
/// Circuit breaker states
#[derive(Debug, Clone, PartialEq)]
enum CircuitBreakerState {
    /// Calls are allowed.
    Closed,
    /// Calls are rejected; the payload is the time at which the circuit was
    /// opened, from which the recovery deadline is computed.
    Open(DateTime<Utc>),
    /// The recovery timeout has elapsed and calls are allowed again while the
    /// breaker waits for enough successes to close.
    HalfOpen,
}
277
/// Thread-safe circuit breaker for LLM calls using atomic operations
///
/// The state and the last-failure timestamp sit behind async `RwLock`s; the
/// counters are plain atomics.
#[derive(Debug)]
struct CircuitBreaker {
    // Current state of the breaker.
    state: RwLock<CircuitBreakerState>,
    // Thresholds and timeouts that drive the state transitions.
    config: CircuitBreakerConfig,
    // Failures counted inside the current failure window.
    failure_count: AtomicUsize,
    // Time of the most recent failure, used to detect an expired window.
    last_failure_time: RwLock<Option<DateTime<Utc>>>,
    // Counter compared against `config.minimum_requests` before the circuit
    // may open.
    request_count: AtomicUsize,
    consecutive_successes: AtomicUsize, // For half-open state management
}
288
289impl CircuitBreaker {
290    fn new(config: CircuitBreakerConfig) -> Self {
291        Self {
292            state: RwLock::new(CircuitBreakerState::Closed),
293            config,
294            failure_count: AtomicUsize::new(0),
295            last_failure_time: RwLock::new(None),
296            request_count: AtomicUsize::new(0),
297            consecutive_successes: AtomicUsize::new(0),
298        }
299    }
300
301    async fn can_execute(&self) -> Result<bool, ImportanceAssessmentError> {
302        let state = self.state.read().await;
303        match *state {
304            CircuitBreakerState::Closed => Ok(true),
305            CircuitBreakerState::Open(opened_at) => {
306                let now = Utc::now();
307                let recovery_time = opened_at
308                    + chrono::Duration::seconds(self.config.recovery_timeout_seconds as i64);
309
310                if now >= recovery_time {
311                    drop(state);
312                    let mut state = self.state.write().await;
313                    *state = CircuitBreakerState::HalfOpen;
314                    Ok(true)
315                } else {
316                    Err(ImportanceAssessmentError::CircuitBreakerOpen(format!(
317                        "Circuit breaker is open until {}",
318                        recovery_time
319                    )))
320                }
321            }
322            CircuitBreakerState::HalfOpen => Ok(true),
323        }
324    }
325
326    async fn record_success(&self) {
327        // Increment consecutive successes atomically
328        let consecutive_successes = self.consecutive_successes.fetch_add(1, Ordering::SeqCst);
329
330        // Reset failure count atomically
331        self.failure_count.store(0, Ordering::SeqCst);
332
333        // In half-open state, require multiple successes before fully closing
334        let current_state = {
335            let state = self.state.read().await;
336            state.clone()
337        };
338
339        match current_state {
340            CircuitBreakerState::HalfOpen => {
341                // Require at least 3 consecutive successes to close
342                if consecutive_successes >= 2 {
343                    let mut state = self.state.write().await;
344                    *state = CircuitBreakerState::Closed;
345
346                    let mut last_failure_time = self.last_failure_time.write().await;
347                    *last_failure_time = None;
348
349                    info!(
350                        "Circuit breaker closed after {} consecutive successes",
351                        consecutive_successes + 1
352                    );
353                }
354            }
355            CircuitBreakerState::Open(_) => {
356                // This shouldn't happen, but handle gracefully
357                warn!(
358                    "Received success while circuit breaker is open - state inconsistency detected"
359                );
360            }
361            CircuitBreakerState::Closed => {
362                // Already closed, just reset failure time
363                let mut last_failure_time = self.last_failure_time.write().await;
364                *last_failure_time = None;
365            }
366        }
367    }
368
369    async fn record_failure(&self) {
370        let now = Utc::now();
371
372        // Reset consecutive successes on any failure
373        self.consecutive_successes.store(0, Ordering::SeqCst);
374
375        // Atomically increment request count
376        let request_count = self.request_count.fetch_add(1, Ordering::SeqCst) + 1;
377
378        // Handle failure window and count atomically where possible
379        let should_open_circuit = {
380            let mut last_failure_time = self.last_failure_time.write().await;
381
382            // Check if we need to reset failure count due to time window
383            if let Some(last_failure) = *last_failure_time {
384                let window_start =
385                    now - chrono::Duration::seconds(self.config.failure_window_seconds as i64);
386                if last_failure < window_start {
387                    // Reset failure count as we're outside the window
388                    self.failure_count.store(0, Ordering::SeqCst);
389                }
390            }
391
392            // Atomically increment failure count
393            let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
394            *last_failure_time = Some(now);
395
396            // Check if we should open the circuit
397            request_count >= self.config.minimum_requests
398                && failure_count >= self.config.failure_threshold
399        };
400
401        if should_open_circuit {
402            // Check current state before opening
403            let current_state = {
404                let state = self.state.read().await;
405                state.clone()
406            };
407
408            match current_state {
409                CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
410                    let mut state = self.state.write().await;
411                    // Double-check state hasn't changed while acquiring write lock
412                    match *state {
413                        CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
414                            *state = CircuitBreakerState::Open(now);
415                            let failure_count = self.failure_count.load(Ordering::SeqCst);
416                            warn!(
417                                "Circuit breaker opened due to {} failures out of {} requests",
418                                failure_count, request_count
419                            );
420                        }
421                        CircuitBreakerState::Open(_) => {
422                            // Already open, nothing to do
423                        }
424                    }
425                }
426                CircuitBreakerState::Open(_) => {
427                    // Already open, just log
428                    debug!("Additional failure recorded while circuit breaker is open");
429                }
430            }
431        }
432    }
433}
434
/// Cached embedding with TTL and LRU tracking
#[derive(Debug, Clone)]
struct CachedEmbedding {
    // The cached embedding vector.
    embedding: Vec<f32>,
    // Creation time; the entry expires `ttl_seconds` after this instant.
    cached_at: DateTime<Utc>,
    // Lifetime of the entry in seconds.
    ttl_seconds: u64,
    // Time of the most recent access, refreshed by `touch`.
    last_accessed: DateTime<Utc>,
}
443
/// LRU cache entry for tracking access order
///
/// Nodes live in `EmbeddingCache::lru_list`; the front of that list is the
/// most recently used key.
#[derive(Debug, Clone)]
#[allow(dead_code)] // timestamp may be used for future LRU optimizations
struct LRUNode {
    // Cache key this node refers to.
    key: String,
    // Access time recorded when the node was pushed onto the list.
    timestamp: DateTime<Utc>,
}
451
/// Thread-safe LRU cache for embeddings with automatic eviction
///
/// Methods that need both locks acquire `cache` before `lru_list`.
#[derive(Debug)]
struct EmbeddingCache {
    // Key -> cached embedding.
    cache: RwLock<HashMap<String, CachedEmbedding>>,
    // Recency order of the keys: front is most recent, back is evicted first.
    lru_list: RwLock<LinkedList<LRUNode>>,
    // Entry count mirrored in an atomic so `len()` needs no lock.
    current_size: AtomicUsize,
    // Configured capacity; eviction shrinks the cache to a fraction of it.
    max_size: usize,
    #[allow(dead_code)] // reserved for future adaptive eviction algorithms
    eviction_threshold: f64,
    // Total number of entries evicted so far.
    eviction_count: AtomicU64,
    // Entry count at or above which `insert` runs an eviction pass; derived
    // from `max_size` and `eviction_threshold` in `new`.
    memory_pressure_threshold: usize,
}
464
465impl CachedEmbedding {
466    fn new(embedding: Vec<f32>, ttl_seconds: u64) -> Self {
467        let now = Utc::now();
468        Self {
469            embedding,
470            cached_at: now,
471            ttl_seconds,
472            last_accessed: now,
473        }
474    }
475
476    fn is_expired(&self) -> bool {
477        let now = Utc::now();
478        let expiry = self.cached_at + chrono::Duration::seconds(self.ttl_seconds as i64);
479        now >= expiry
480    }
481
482    fn touch(&mut self) {
483        self.last_accessed = Utc::now();
484    }
485}
486
487impl EmbeddingCache {
488    fn new(max_size: usize, eviction_threshold: f64) -> Self {
489        Self {
490            cache: RwLock::new(HashMap::new()),
491            lru_list: RwLock::new(LinkedList::new()),
492            current_size: AtomicUsize::new(0),
493            max_size,
494            eviction_threshold,
495            eviction_count: AtomicU64::new(0),
496            memory_pressure_threshold: (max_size as f64 * eviction_threshold) as usize,
497        }
498    }
499
500    async fn get(&self, key: &str) -> Option<CachedEmbedding> {
501        let mut cache = self.cache.write().await;
502        let mut lru_list = self.lru_list.write().await;
503
504        if let Some(cached) = cache.get_mut(key) {
505            if cached.is_expired() {
506                cache.remove(key);
507                self.current_size.fetch_sub(1, Ordering::Relaxed);
508                // Remove from LRU list manually (retain is unstable)
509                *lru_list = lru_list.iter().filter(|node| node.key != key).cloned().collect();
510                return None;
511            }
512
513            cached.touch();
514            // Move to front of LRU list manually (retain is unstable)
515            *lru_list = lru_list.iter().filter(|node| node.key != key).cloned().collect();
516            lru_list.push_front(LRUNode {
517                key: key.to_string(),
518                timestamp: cached.last_accessed,
519            });
520
521            Some(cached.clone())
522        } else {
523            None
524        }
525    }
526
527    async fn insert(
528        &self,
529        key: String,
530        value: CachedEmbedding,
531    ) -> Result<(), ImportanceAssessmentError> {
532        // Check if we need to evict before inserting
533        let current_size = self.current_size.load(Ordering::Relaxed);
534        if current_size >= self.memory_pressure_threshold {
535            self.evict_lru_entries().await?;
536        }
537
538        let mut cache = self.cache.write().await;
539        let mut lru_list = self.lru_list.write().await;
540
541        // If key already exists, update it
542        if cache.contains_key(&key) {
543            *lru_list = lru_list.iter().filter(|node| node.key != key).cloned().collect();
544        } else {
545            self.current_size.fetch_add(1, Ordering::Relaxed);
546        }
547
548        cache.insert(key.clone(), value.clone());
549        lru_list.push_front(LRUNode {
550            key: key.clone(),
551            timestamp: value.last_accessed,
552        });
553
554        Ok(())
555    }
556
557    async fn evict_lru_entries(&self) -> Result<(), ImportanceAssessmentError> {
558        let mut cache = self.cache.write().await;
559        let mut lru_list = self.lru_list.write().await;
560
561        let target_size = (self.max_size as f64 * 0.7) as usize; // Evict to 70% capacity
562        let current_size = cache.len();
563
564        if current_size <= target_size {
565            return Ok(());
566        }
567
568        let entries_to_remove = current_size - target_size;
569        let mut removed_count = 0;
570
571        // Remove oldest entries
572        while removed_count < entries_to_remove && !lru_list.is_empty() {
573            if let Some(node) = lru_list.pop_back() {
574                cache.remove(&node.key);
575                removed_count += 1;
576                self.eviction_count.fetch_add(1, Ordering::Relaxed);
577            }
578        }
579
580        self.current_size.store(cache.len(), Ordering::Relaxed);
581
582        if removed_count > 0 {
583            info!(
584                "Evicted {} entries from embedding cache due to memory pressure",
585                removed_count
586            );
587        }
588
589        Ok(())
590    }
591
592    async fn clear(&self) {
593        let mut cache = self.cache.write().await;
594        let mut lru_list = self.lru_list.write().await;
595
596        cache.clear();
597        lru_list.clear();
598        self.current_size.store(0, Ordering::Relaxed);
599    }
600
601    fn len(&self) -> usize {
602        self.current_size.load(Ordering::Relaxed)
603    }
604
605    fn eviction_count(&self) -> u64 {
606        self.eviction_count.load(Ordering::Relaxed)
607    }
608
609    async fn cleanup_expired(&self) -> Result<usize, ImportanceAssessmentError> {
610        let mut cache = self.cache.write().await;
611        let mut lru_list = self.lru_list.write().await;
612
613        let mut expired_keys = Vec::new();
614
615        for (key, value) in cache.iter() {
616            if value.is_expired() {
617                expired_keys.push(key.clone());
618            }
619        }
620
621        for key in &expired_keys {
622            cache.remove(key);
623            *lru_list = lru_list.iter().filter(|node| &node.key != key).cloned().collect();
624        }
625
626        let removed_count = expired_keys.len();
627        self.current_size.store(cache.len(), Ordering::Relaxed);
628
629        if removed_count > 0 {
630            debug!(
631                "Cleaned up {} expired entries from embedding cache",
632                removed_count
633            );
634        }
635
636        Ok(removed_count)
637    }
638}
639
/// Metrics for the importance assessment pipeline
///
/// Each field is a Prometheus collector handle. `new` creates every collector
/// and registers it with the supplied registry.
#[derive(Debug)]
pub struct ImportanceAssessmentMetrics {
    // Stage progression counters
    pub stage1_executions: IntCounter,
    pub stage2_executions: IntCounter,
    pub stage3_executions: IntCounter,

    // Stage timing histograms (metric names end in `_seconds`)
    pub stage1_duration: Histogram,
    pub stage2_duration: Histogram,
    pub stage3_duration: Histogram,

    // Pipeline completion counters
    pub completed_at_stage1: IntCounter,
    pub completed_at_stage2: IntCounter,
    pub completed_at_stage3: IntCounter,

    // Performance metrics
    pub stage1_threshold_violations: IntCounter,
    pub stage2_threshold_violations: IntCounter,
    pub stage3_threshold_violations: IntCounter,

    // Cache metrics
    pub embedding_cache_hits: IntCounter,
    pub embedding_cache_misses: IntCounter,
    pub embedding_cache_size: IntGauge,

    // Circuit breaker metrics
    pub circuit_breaker_opened: Counter,
    pub circuit_breaker_half_open: Counter,
    pub circuit_breaker_closed: Counter,
    pub llm_call_failures: IntCounter,
    pub llm_call_successes: IntCounter,

    // Quality metrics (histograms bucketed from 0.1 to 1.0 in steps of 0.1)
    pub assessment_confidence: Histogram,
    pub final_importance_scores: Histogram,
}
679
680impl ImportanceAssessmentMetrics {
681    pub fn new(registry: &Registry) -> Result<Self> {
682        let stage1_executions = IntCounter::new(
683            "importance_assessment_stage1_executions_total",
684            "Total number of Stage 1 executions",
685        )?;
686        registry.register(Box::new(stage1_executions.clone()))?;
687
688        let stage2_executions = IntCounter::new(
689            "importance_assessment_stage2_executions_total",
690            "Total number of Stage 2 executions",
691        )?;
692        registry.register(Box::new(stage2_executions.clone()))?;
693
694        let stage3_executions = IntCounter::new(
695            "importance_assessment_stage3_executions_total",
696            "Total number of Stage 3 executions",
697        )?;
698        registry.register(Box::new(stage3_executions.clone()))?;
699
700        let stage1_duration = Histogram::with_opts(
701            prometheus::HistogramOpts::new(
702                "importance_assessment_stage1_duration_seconds",
703                "Duration of Stage 1 processing",
704            )
705            .buckets(vec![0.001, 0.005, 0.01, 0.02, 0.05, 0.1]),
706        )?;
707        registry.register(Box::new(stage1_duration.clone()))?;
708
709        let stage2_duration = Histogram::with_opts(
710            prometheus::HistogramOpts::new(
711                "importance_assessment_stage2_duration_seconds",
712                "Duration of Stage 2 processing",
713            )
714            .buckets(vec![0.01, 0.05, 0.1, 0.2, 0.5, 1.0]),
715        )?;
716        registry.register(Box::new(stage2_duration.clone()))?;
717
718        let stage3_duration = Histogram::with_opts(
719            prometheus::HistogramOpts::new(
720                "importance_assessment_stage3_duration_seconds",
721                "Duration of Stage 3 processing",
722            )
723            .buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0]),
724        )?;
725        registry.register(Box::new(stage3_duration.clone()))?;
726
727        let completed_at_stage1 = IntCounter::new(
728            "importance_assessment_completed_at_stage1_total",
729            "Total assessments completed at Stage 1",
730        )?;
731        registry.register(Box::new(completed_at_stage1.clone()))?;
732
733        let completed_at_stage2 = IntCounter::new(
734            "importance_assessment_completed_at_stage2_total",
735            "Total assessments completed at Stage 2",
736        )?;
737        registry.register(Box::new(completed_at_stage2.clone()))?;
738
739        let completed_at_stage3 = IntCounter::new(
740            "importance_assessment_completed_at_stage3_total",
741            "Total assessments completed at Stage 3",
742        )?;
743        registry.register(Box::new(completed_at_stage3.clone()))?;
744
745        let stage1_threshold_violations = IntCounter::new(
746            "importance_assessment_stage1_threshold_violations_total",
747            "Total Stage 1 performance threshold violations",
748        )?;
749        registry.register(Box::new(stage1_threshold_violations.clone()))?;
750
751        let stage2_threshold_violations = IntCounter::new(
752            "importance_assessment_stage2_threshold_violations_total",
753            "Total Stage 2 performance threshold violations",
754        )?;
755        registry.register(Box::new(stage2_threshold_violations.clone()))?;
756
757        let stage3_threshold_violations = IntCounter::new(
758            "importance_assessment_stage3_threshold_violations_total",
759            "Total Stage 3 performance threshold violations",
760        )?;
761        registry.register(Box::new(stage3_threshold_violations.clone()))?;
762
763        let embedding_cache_hits = IntCounter::new(
764            "importance_assessment_embedding_cache_hits_total",
765            "Total embedding cache hits",
766        )?;
767        registry.register(Box::new(embedding_cache_hits.clone()))?;
768
769        let embedding_cache_misses = IntCounter::new(
770            "importance_assessment_embedding_cache_misses_total",
771            "Total embedding cache misses",
772        )?;
773        registry.register(Box::new(embedding_cache_misses.clone()))?;
774
775        let embedding_cache_size = IntGauge::new(
776            "importance_assessment_embedding_cache_size",
777            "Current size of embedding cache",
778        )?;
779        registry.register(Box::new(embedding_cache_size.clone()))?;
780
781        let circuit_breaker_opened = Counter::new(
782            "importance_assessment_circuit_breaker_opened_total",
783            "Total times circuit breaker opened",
784        )?;
785        registry.register(Box::new(circuit_breaker_opened.clone()))?;
786
787        let circuit_breaker_half_open = Counter::new(
788            "importance_assessment_circuit_breaker_half_open_total",
789            "Total times circuit breaker went half-open",
790        )?;
791        registry.register(Box::new(circuit_breaker_half_open.clone()))?;
792
793        let circuit_breaker_closed = Counter::new(
794            "importance_assessment_circuit_breaker_closed_total",
795            "Total times circuit breaker closed",
796        )?;
797        registry.register(Box::new(circuit_breaker_closed.clone()))?;
798
799        let llm_call_failures = IntCounter::new(
800            "importance_assessment_llm_call_failures_total",
801            "Total LLM call failures",
802        )?;
803        registry.register(Box::new(llm_call_failures.clone()))?;
804
805        let llm_call_successes = IntCounter::new(
806            "importance_assessment_llm_call_successes_total",
807            "Total LLM call successes",
808        )?;
809        registry.register(Box::new(llm_call_successes.clone()))?;
810
811        let assessment_confidence = Histogram::with_opts(
812            prometheus::HistogramOpts::new(
813                "importance_assessment_confidence",
814                "Confidence scores of assessments",
815            )
816            .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
817        )?;
818        registry.register(Box::new(assessment_confidence.clone()))?;
819
820        let final_importance_scores = Histogram::with_opts(
821            prometheus::HistogramOpts::new(
822                "importance_assessment_final_scores",
823                "Final importance scores from assessments",
824            )
825            .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
826        )?;
827        registry.register(Box::new(final_importance_scores.clone()))?;
828
829        Ok(Self {
830            stage1_executions,
831            stage2_executions,
832            stage3_executions,
833            stage1_duration,
834            stage2_duration,
835            stage3_duration,
836            completed_at_stage1,
837            completed_at_stage2,
838            completed_at_stage3,
839            stage1_threshold_violations,
840            stage2_threshold_violations,
841            stage3_threshold_violations,
842            embedding_cache_hits,
843            embedding_cache_misses,
844            embedding_cache_size,
845            circuit_breaker_opened,
846            circuit_breaker_half_open,
847            circuit_breaker_closed,
848            llm_call_failures,
849            llm_call_successes,
850            assessment_confidence,
851            final_importance_scores,
852        })
853    }
854}
855
/// Optimized pattern matcher with pre-compiled regex and fast string matching
///
/// `new` sorts each configured pattern into one of two buckets: plain
/// keywords, stored lowercased, or compiled regular expressions.
#[derive(Debug)]
struct OptimizedPatternMatcher {
    // Pre-compiled regex patterns with their metadata
    regex_patterns: Vec<(Regex, ImportancePattern)>,
    // Fast string patterns for simple keyword matching
    keyword_patterns: Vec<(String, ImportancePattern)>,
    // Combined regex for single-pass matching when possible
    // (`new` currently always sets this to `None`)
    #[allow(dead_code)] // reserved for future optimization
    combined_regex: Option<Regex>,
}
867
868impl OptimizedPatternMatcher {
869    fn new(patterns: &[ImportancePattern]) -> Result<Self, ImportanceAssessmentError> {
870        let mut regex_patterns = Vec::new();
871        let mut keyword_patterns = Vec::new();
872
873        for pattern in patterns {
874            // Validate regex complexity to prevent ReDoS
875            Self::validate_regex_complexity(&pattern.pattern)?;
876
877            // Try to determine if this is a simple keyword pattern
878            if Self::is_simple_keyword_pattern(&pattern.pattern) {
879                // Extract the keyword from simple patterns like (?i)\b(word)\b
880                if let Some(keyword) = Self::extract_keyword(&pattern.pattern) {
881                    keyword_patterns.push((keyword.to_lowercase(), pattern.clone()));
882                    continue;
883                }
884            }
885
886            // Compile complex regex patterns
887            match Regex::new(&pattern.pattern) {
888                Ok(regex) => regex_patterns.push((regex, pattern.clone())),
889                Err(e) => {
890                    error!(
891                        "Failed to compile regex pattern '{}': {}",
892                        pattern.pattern, e
893                    );
894                    return Err(ImportanceAssessmentError::Configuration(format!(
895                        "Invalid regex pattern '{}': {}",
896                        pattern.pattern, e
897                    )));
898                }
899            }
900        }
901
902        Ok(Self {
903            regex_patterns,
904            keyword_patterns,
905            combined_regex: None, // Could be optimized further with a single combined regex
906        })
907    }
908
909    fn validate_regex_complexity(pattern: &str) -> Result<(), ImportanceAssessmentError> {
910        // Check for potentially dangerous regex patterns that could cause ReDoS
911        let dangerous_patterns = [
912            "(.*)*",
913            "(.+)+",
914            "([^x]*)*",
915            "(a|a)*",
916            "(a|a)+",
917            "(a*)*",
918            "(a+)+",
919            "(.{0,10000})*",
920            "(.*){10,}",
921            "(.+){10,}",
922        ];
923
924        for dangerous in &dangerous_patterns {
925            if pattern.contains(dangerous) {
926                return Err(ImportanceAssessmentError::Configuration(format!(
927                    "Regex pattern contains potentially dangerous sequence '{}': {}",
928                    dangerous, pattern
929                )));
930            }
931        }
932
933        // Check pattern length
934        if pattern.len() > 1000 {
935            return Err(ImportanceAssessmentError::Configuration(
936                "Regex pattern too long (max 1000 characters)".to_string(),
937            ));
938        }
939
940        // Check for excessive nested groups
941        let open_parens = pattern.chars().filter(|&c| c == '(').count();
942        if open_parens > 20 {
943            return Err(ImportanceAssessmentError::Configuration(
944                "Regex pattern has too many nested groups (max 20)".to_string(),
945            ));
946        }
947
948        Ok(())
949    }
950
951    fn is_simple_keyword_pattern(pattern: &str) -> bool {
952        // Match patterns like (?i)\b(word|other)\b or (?i)\b(word)\b
953        pattern.starts_with("(?i)")
954            && pattern.contains("\\b")
955            && !pattern.contains(".*")
956            && !pattern.contains(".+")
957            && !pattern.contains("\\d")
958            && !pattern.contains("\\s")
959            && pattern.chars().filter(|&c| c == '(').count() <= 2
960    }
961
962    fn extract_keyword(pattern: &str) -> Option<String> {
963        // Extract keyword from patterns like (?i)\b(word)\b
964        if let Some(start) = pattern.find('(') {
965            if let Some(end) = pattern.rfind(')') {
966                let keywords = &pattern[start + 1..end];
967                // For simple single keyword patterns, return the first keyword
968                if !keywords.contains('|') && keywords.chars().all(|c| c.is_alphabetic()) {
969                    return Some(keywords.to_string());
970                }
971            }
972        }
973        None
974    }
975
976    fn find_matches(&self, content: &str, max_matches: usize) -> Vec<MatchedPattern> {
977        let mut matches = Vec::new();
978        let content_lower = content.to_lowercase();
979
980        // Fast keyword matching first
981        for (keyword, pattern) in &self.keyword_patterns {
982            if matches.len() >= max_matches {
983                break;
984            }
985
986            let mut start = 0;
987            while let Some(pos) = content_lower[start..].find(keyword) {
988                let absolute_pos = start + pos;
989
990                // Check word boundaries manually for fast keyword matching
991                let is_word_start = absolute_pos == 0
992                    || !content_lower
993                        .chars()
994                        .nth(absolute_pos - 1)
995                        .unwrap_or(' ')
996                        .is_alphabetic();
997                let is_word_end = absolute_pos + keyword.len() >= content_lower.len()
998                    || !content_lower
999                        .chars()
1000                        .nth(absolute_pos + keyword.len())
1001                        .unwrap_or(' ')
1002                        .is_alphabetic();
1003
1004                if is_word_start && is_word_end {
1005                    let context_boost = self.calculate_context_boost(
1006                        content,
1007                        absolute_pos,
1008                        &pattern.context_boosters,
1009                    );
1010
1011                    matches.push(MatchedPattern {
1012                        pattern_name: pattern.name.clone(),
1013                        pattern_category: pattern.category.clone(),
1014                        match_text: keyword.clone(),
1015                        match_position: absolute_pos,
1016                        weight: pattern.weight,
1017                        context_boost,
1018                    });
1019                }
1020
1021                start = absolute_pos + 1;
1022            }
1023        }
1024
1025        // Regex matching for complex patterns
1026        for (regex, pattern) in &self.regex_patterns {
1027            if matches.len() >= max_matches {
1028                break;
1029            }
1030
1031            for mat in regex.find_iter(content).take(max_matches - matches.len()) {
1032                let match_text = mat.as_str().to_string();
1033                let match_position = mat.start();
1034
1035                let context_boost = self.calculate_context_boost(
1036                    content,
1037                    match_position,
1038                    &pattern.context_boosters,
1039                );
1040
1041                matches.push(MatchedPattern {
1042                    pattern_name: pattern.name.clone(),
1043                    pattern_category: pattern.category.clone(),
1044                    match_text,
1045                    match_position,
1046                    weight: pattern.weight,
1047                    context_boost,
1048                });
1049            }
1050        }
1051
1052        matches
1053    }
1054
1055    fn calculate_context_boost(
1056        &self,
1057        content: &str,
1058        match_position: usize,
1059        boosters: &[String],
1060    ) -> f64 {
1061        let window_size = 100;
1062        let start = match_position.saturating_sub(window_size);
1063        let end = (match_position + window_size).min(content.len());
1064        let context = &content[start..end].to_lowercase();
1065
1066        let mut boost: f64 = 0.0;
1067        for booster in boosters {
1068            if context.contains(&booster.to_lowercase()) {
1069                boost += 0.1; // 10% boost per context word
1070            }
1071        }
1072
1073        boost.min(0.5_f64) // Maximum 50% boost
1074    }
1075}
1076
/// Main importance assessment pipeline
///
/// Runs up to three stages in order (pattern matching, semantic similarity, LLM scoring),
/// stopping at the first stage whose confidence does not reach its threshold.
pub struct ImportanceAssessmentPipeline {
    // Thresholds, time budgets and endpoints read by every stage
    config: ImportanceAssessmentConfig,
    // Stage 1 matcher, built in `new` from `config.stage1.pattern_library`
    pattern_matcher: OptimizedPatternMatcher,
    // Generates the content embedding for Stage 2 on a cache miss
    embedding_service: Arc<dyn EmbeddingService>,
    // Stage 2 embedding cache, keyed by the hex SHA-256 digest of the content
    embedding_cache: EmbeddingCache,
    // Consulted before Stage 3 runs and updated with its outcome
    circuit_breaker: CircuitBreaker,
    // Metric collectors created in `new` from the supplied registry
    metrics: ImportanceAssessmentMetrics,
    // HTTP client for LLM requests; its timeout is `config.stage3.max_processing_time_ms`
    http_client: reqwest::Client,
}
1087
1088impl ImportanceAssessmentPipeline {
1089    pub fn new(
1090        config: ImportanceAssessmentConfig,
1091        embedding_service: Arc<dyn EmbeddingService>,
1092        metrics_registry: &Registry,
1093    ) -> Result<Self> {
1094        // Initialize optimized pattern matcher
1095        let pattern_matcher = OptimizedPatternMatcher::new(&config.stage1.pattern_library)?;
1096
1097        let metrics = ImportanceAssessmentMetrics::new(metrics_registry)?;
1098
1099        let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone());
1100
1101        let http_client = reqwest::Client::builder()
1102            .timeout(Duration::from_millis(config.stage3.max_processing_time_ms))
1103            .build()?;
1104
1105        let embedding_cache = EmbeddingCache::new(
1106            config.stage2.embedding_cache_max_size,
1107            config.stage2.cache_eviction_threshold,
1108        );
1109
1110        Ok(Self {
1111            config,
1112            pattern_matcher,
1113            embedding_service,
1114            embedding_cache,
1115            circuit_breaker,
1116            metrics,
1117            http_client,
1118        })
1119    }
1120
1121    /// Assess the importance of a memory content string
1122    pub async fn assess_importance(
1123        &self,
1124        content: &str,
1125    ) -> Result<ImportanceAssessmentResult, ImportanceAssessmentError> {
1126        let assessment_start = Instant::now();
1127        let mut stage_results = Vec::new();
1128
1129        info!(
1130            "Starting importance assessment for content length: {}",
1131            content.len()
1132        );
1133
1134        // Stage 1: Pattern matching
1135        let stage1_result = self.execute_stage1(content).await?;
1136        let stage1_passed = stage1_result.passed_threshold;
1137        stage_results.push(stage1_result.clone());
1138
1139        if stage1_passed {
1140            debug!("Stage 1 passed threshold, proceeding to Stage 2");
1141
1142            // Stage 2: Semantic similarity
1143            let stage2_result = self.execute_stage2(content).await?;
1144            let stage2_passed = stage2_result.passed_threshold;
1145            stage_results.push(stage2_result.clone());
1146
1147            if stage2_passed {
1148                debug!("Stage 2 passed threshold, proceeding to Stage 3");
1149
1150                // Stage 3: LLM scoring
1151                let stage3_result = self.execute_stage3(content).await?;
1152                stage_results.push(stage3_result.clone());
1153
1154                self.metrics.completed_at_stage3.inc();
1155
1156                let final_score = stage3_result.score;
1157                let confidence = stage3_result.confidence;
1158
1159                let result = ImportanceAssessmentResult {
1160                    importance_score: final_score,
1161                    final_stage: AssessmentStage::Stage3LLMScoring,
1162                    stage_results,
1163                    total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1164                    assessed_at: Utc::now(),
1165                    confidence,
1166                    explanation: self.extract_explanation_from_stage3(&stage3_result),
1167                };
1168
1169                self.record_final_metrics(&result);
1170                return Ok(result);
1171            } else {
1172                self.metrics.completed_at_stage2.inc();
1173
1174                let final_score = stage2_result.score;
1175                let confidence = stage2_result.confidence;
1176
1177                let result = ImportanceAssessmentResult {
1178                    importance_score: final_score,
1179                    final_stage: AssessmentStage::Stage2SemanticSimilarity,
1180                    stage_results,
1181                    total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1182                    assessed_at: Utc::now(),
1183                    confidence,
1184                    explanation: Some(
1185                        "Assessment completed at Stage 2 based on semantic similarity".to_string(),
1186                    ),
1187                };
1188
1189                self.record_final_metrics(&result);
1190                return Ok(result);
1191            }
1192        } else {
1193            self.metrics.completed_at_stage1.inc();
1194
1195            let final_score = stage1_result.score;
1196            let confidence = stage1_result.confidence;
1197
1198            let result = ImportanceAssessmentResult {
1199                importance_score: final_score,
1200                final_stage: AssessmentStage::Stage1PatternMatching,
1201                stage_results,
1202                total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1203                assessed_at: Utc::now(),
1204                confidence,
1205                explanation: Some(
1206                    "Assessment completed at Stage 1 based on pattern matching".to_string(),
1207                ),
1208            };
1209
1210            self.record_final_metrics(&result);
1211            return Ok(result);
1212        }
1213    }
1214
1215    async fn execute_stage1(
1216        &self,
1217        content: &str,
1218    ) -> Result<StageResult, ImportanceAssessmentError> {
1219        let stage_start = Instant::now();
1220        self.metrics.stage1_executions.inc();
1221
1222        let timeout_duration = Duration::from_millis(self.config.stage1.max_processing_time_ms);
1223
1224        let result = timeout(timeout_duration, async {
1225            // Limit content length for Stage 1 to prevent performance issues
1226            let content_for_analysis = if content.len() > 10000 {
1227                warn!(
1228                    "Content length {} exceeds Stage 1 limit, truncating to 10000 chars",
1229                    content.len()
1230                );
1231                &content[..10000]
1232            } else {
1233                content
1234            };
1235
1236            // Use optimized pattern matching with limits
1237            let max_matches = 50; // Limit total matches to prevent runaway processing
1238            let matched_patterns = self
1239                .pattern_matcher
1240                .find_matches(content_for_analysis, max_matches);
1241
1242            let mut total_score = 0.0;
1243            let mut max_weight: f64 = 0.0;
1244
1245            for pattern in &matched_patterns {
1246                let effective_weight = pattern.weight * (1.0 + pattern.context_boost);
1247                total_score += effective_weight;
1248                max_weight = max_weight.max(effective_weight);
1249            }
1250
1251            // Normalize score to 0.0-1.0 range
1252            let normalized_score = if matched_patterns.is_empty() {
1253                0.0
1254            } else {
1255                (total_score / (matched_patterns.len() as f64)).min(1.0)
1256            };
1257
1258            // Calculate confidence based on pattern diversity and strength
1259            let confidence = if matched_patterns.is_empty() {
1260                0.1 // Low confidence for no matches
1261            } else {
1262                let pattern_diversity = matched_patterns
1263                    .iter()
1264                    .map(|m| m.pattern_category.clone())
1265                    .collect::<std::collections::HashSet<_>>()
1266                    .len() as f64;
1267                let pattern_count = self.config.stage1.pattern_library.len().max(1) as f64; // Avoid division by zero
1268                let base_confidence = (pattern_diversity / pattern_count).min(1.0);
1269                let strength_boost = (max_weight / 1.0_f64).min(0.3); // Max 30% boost from pattern strength
1270                (base_confidence + strength_boost).min(1.0)
1271            };
1272
1273            let passed_threshold = confidence >= self.config.stage1.confidence_threshold;
1274
1275            StageResult {
1276                stage: AssessmentStage::Stage1PatternMatching,
1277                score: normalized_score,
1278                confidence,
1279                processing_time_ms: stage_start.elapsed().as_millis() as u64,
1280                passed_threshold,
1281                details: StageDetails::Stage1 {
1282                    matched_patterns,
1283                    total_patterns_checked: self.config.stage1.pattern_library.len(),
1284                },
1285            }
1286        })
1287        .await;
1288
1289        match result {
1290            Ok(stage_result) => {
1291                let duration_seconds = stage_start.elapsed().as_secs_f64();
1292                self.metrics.stage1_duration.observe(duration_seconds);
1293
1294                // Check performance threshold
1295                if stage_result.processing_time_ms > self.config.performance.stage1_target_ms {
1296                    self.metrics.stage1_threshold_violations.inc();
1297                    warn!(
1298                        "Stage 1 exceeded target time: {}ms > {}ms",
1299                        stage_result.processing_time_ms, self.config.performance.stage1_target_ms
1300                    );
1301                }
1302
1303                debug!(
1304                    "Stage 1 completed in {}ms with score {:.3} and confidence {:.3}",
1305                    stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1306                );
1307
1308                Ok(stage_result)
1309            }
1310            Err(_) => {
1311                self.metrics.stage1_threshold_violations.inc();
1312                Err(ImportanceAssessmentError::Timeout(format!(
1313                    "Stage 1 exceeded maximum processing time of {}ms",
1314                    self.config.stage1.max_processing_time_ms
1315                )))
1316            }
1317        }
1318    }
1319
1320    async fn execute_stage2(
1321        &self,
1322        content: &str,
1323    ) -> Result<StageResult, ImportanceAssessmentError> {
1324        let stage_start = Instant::now();
1325        self.metrics.stage2_executions.inc();
1326
1327        let timeout_duration = Duration::from_millis(self.config.stage2.max_processing_time_ms);
1328
1329        let stage2_result = async {
1330            // Cleanup expired entries periodically (every 100th request)
1331            if self.metrics.stage2_executions.get() % 100 == 0 {
1332                if let Err(e) = self.embedding_cache.cleanup_expired().await {
1333                    warn!("Failed to cleanup expired cache entries: {}", e);
1334                }
1335            }
1336
1337            // Generate secure hash of content using SHA-256
1338            let mut hasher = Sha256::new();
1339            hasher.update(content.as_bytes());
1340            let content_hash = format!("{:x}", hasher.finalize());
1341
1342            let (content_embedding, cache_hit, embedding_time) =
1343                if let Some(cached) = self.embedding_cache.get(&content_hash).await {
1344                    self.metrics.embedding_cache_hits.inc();
1345                    (cached.embedding, true, None)
1346                } else {
1347                    self.metrics.embedding_cache_misses.inc();
1348                    let embed_start = Instant::now();
1349                    let embedding = match self.embedding_service.generate_embedding(content).await {
1350                        Ok(emb) => emb,
1351                        Err(e) => {
1352                            return Err(ImportanceAssessmentError::Stage2Failed(format!(
1353                                "Embedding generation failed: {}",
1354                                e
1355                            )))
1356                        }
1357                    };
1358                    let embed_time = embed_start.elapsed().as_millis() as u64;
1359
1360                    // Cache the new embedding with error handling
1361                    let cached_embedding = CachedEmbedding::new(
1362                        embedding.clone(),
1363                        self.config.stage2.embedding_cache_ttl_seconds,
1364                    );
1365
1366                    if let Err(e) = self
1367                        .embedding_cache
1368                        .insert(content_hash, cached_embedding)
1369                        .await
1370                    {
1371                        warn!("Failed to cache embedding: {}", e);
1372                    }
1373
1374                    self.metrics
1375                        .embedding_cache_size
1376                        .set(self.embedding_cache.len() as i64);
1377                    (embedding, false, Some(embed_time))
1378                };
1379
1380            // Calculate similarity scores with reference embeddings
1381            let mut similarity_scores = Vec::new();
1382            let mut total_weighted_score = 0.0;
1383            let mut total_weight = 0.0;
1384
1385            for reference in &self.config.stage2.reference_embeddings {
1386                let similarity =
1387                    self.calculate_cosine_similarity(&content_embedding, &reference.embedding);
1388
1389                if similarity >= self.config.stage2.similarity_threshold {
1390                    let weighted_score = similarity as f64 * reference.weight;
1391
1392                    similarity_scores.push(SimilarityScore {
1393                        reference_name: reference.name.clone(),
1394                        reference_category: reference.category.clone(),
1395                        similarity,
1396                        weight: reference.weight,
1397                        weighted_score,
1398                    });
1399
1400                    total_weighted_score += weighted_score;
1401                    total_weight += reference.weight;
1402                }
1403            }
1404
1405            // Normalize score to 0.0-1.0 range
1406            let normalized_score = if total_weight > 0.0 {
1407                (total_weighted_score / total_weight).min(1.0)
1408            } else {
1409                0.0
1410            };
1411
1412            // Calculate confidence based on number of matches and their strength
1413            let confidence = if similarity_scores.is_empty() {
1414                0.1 // Low confidence for no semantic matches
1415            } else {
1416                let match_ratio = similarity_scores.len() as f64
1417                    / self.config.stage2.reference_embeddings.len() as f64;
1418                let avg_similarity = similarity_scores
1419                    .iter()
1420                    .map(|s| s.similarity as f64)
1421                    .sum::<f64>()
1422                    / similarity_scores.len() as f64;
1423                (match_ratio * 0.5 + avg_similarity * 0.5).min(1.0)
1424            };
1425
1426            let passed_threshold = confidence >= self.config.stage2.confidence_threshold;
1427
1428            Ok(StageResult {
1429                stage: AssessmentStage::Stage2SemanticSimilarity,
1430                score: normalized_score,
1431                confidence,
1432                processing_time_ms: stage_start.elapsed().as_millis() as u64,
1433                passed_threshold,
1434                details: StageDetails::Stage2 {
1435                    similarity_scores,
1436                    cache_hit,
1437                    embedding_generation_time_ms: embedding_time,
1438                },
1439            })
1440        };
1441
1442        let result = timeout(timeout_duration, stage2_result).await;
1443
1444        match result {
1445            Ok(Ok(stage_result)) => {
1446                let duration_seconds = stage_start.elapsed().as_secs_f64();
1447                self.metrics.stage2_duration.observe(duration_seconds);
1448
1449                // Check performance threshold
1450                if stage_result.processing_time_ms > self.config.performance.stage2_target_ms {
1451                    self.metrics.stage2_threshold_violations.inc();
1452                    warn!(
1453                        "Stage 2 exceeded target time: {}ms > {}ms",
1454                        stage_result.processing_time_ms, self.config.performance.stage2_target_ms
1455                    );
1456                }
1457
1458                debug!(
1459                    "Stage 2 completed in {}ms with score {:.3} and confidence {:.3}",
1460                    stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1461                );
1462
1463                Ok(stage_result)
1464            }
1465            Ok(Err(e)) => Err(e),
1466            Err(_) => {
1467                self.metrics.stage2_threshold_violations.inc();
1468                Err(ImportanceAssessmentError::Timeout(format!(
1469                    "Stage 2 exceeded maximum processing time of {}ms",
1470                    self.config.stage2.max_processing_time_ms
1471                )))
1472            }
1473        }
1474    }
1475
1476    async fn execute_stage3(
1477        &self,
1478        content: &str,
1479    ) -> Result<StageResult, ImportanceAssessmentError> {
1480        let stage_start = Instant::now();
1481        self.metrics.stage3_executions.inc();
1482
1483        // Check circuit breaker
1484        if !self.circuit_breaker.can_execute().await? {
1485            return Err(ImportanceAssessmentError::CircuitBreakerOpen(
1486                "LLM circuit breaker is open".to_string(),
1487            ));
1488        }
1489
1490        let timeout_duration = Duration::from_millis(self.config.stage3.max_processing_time_ms);
1491
1492        let result = timeout(timeout_duration, async {
1493            // Prepare LLM prompt with length limits
1494            let content_preview = if content.len() > 2000 {
1495                format!(
1496                    "{}... [truncated from {} chars]",
1497                    &content[..2000],
1498                    content.len()
1499                )
1500            } else {
1501                content.to_string()
1502            };
1503
1504            let prompt = self
1505                .config
1506                .stage3
1507                .prompt_template
1508                .replace("{content}", &content_preview)
1509                .replace("{timestamp}", &Utc::now().to_rfc3339());
1510
1511            // Make LLM request
1512            let llm_response = self.call_llm(&prompt).await?;
1513
1514            // Parse LLM response to extract importance score and confidence
1515            let (importance_score, confidence) = self.parse_llm_response(&llm_response)?;
1516
1517            let passed_threshold = true; // Stage 3 is the final stage
1518
1519            Ok::<StageResult, ImportanceAssessmentError>(StageResult {
1520                stage: AssessmentStage::Stage3LLMScoring,
1521                score: importance_score,
1522                confidence,
1523                processing_time_ms: stage_start.elapsed().as_millis() as u64,
1524                passed_threshold,
1525                details: StageDetails::Stage3 {
1526                    llm_response,
1527                    prompt_tokens: Some(prompt.len() / 4), // Rough token estimate
1528                    completion_tokens: None,               // Would need to be provided by LLM API
1529                    model_used: "configured-model".to_string(),
1530                },
1531            })
1532        })
1533        .await;
1534
1535        match result {
1536            Ok(Ok(stage_result)) => {
1537                let duration_seconds = stage_start.elapsed().as_secs_f64();
1538                self.metrics.stage3_duration.observe(duration_seconds);
1539                self.metrics.llm_call_successes.inc();
1540                self.circuit_breaker.record_success().await;
1541
1542                // Check performance threshold
1543                if stage_result.processing_time_ms > self.config.performance.stage3_target_ms {
1544                    self.metrics.stage3_threshold_violations.inc();
1545                    warn!(
1546                        "Stage 3 exceeded target time: {}ms > {}ms",
1547                        stage_result.processing_time_ms, self.config.performance.stage3_target_ms
1548                    );
1549                }
1550
1551                debug!(
1552                    "Stage 3 completed in {}ms with score {:.3} and confidence {:.3}",
1553                    stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1554                );
1555
1556                Ok(stage_result)
1557            }
1558            Ok(Err(e)) => {
1559                self.metrics.llm_call_failures.inc();
1560                self.circuit_breaker.record_failure().await;
1561                Err(e)
1562            }
1563            Err(_) => {
1564                self.metrics.stage3_threshold_violations.inc();
1565                self.metrics.llm_call_failures.inc();
1566                self.circuit_breaker.record_failure().await;
1567                Err(ImportanceAssessmentError::Timeout(format!(
1568                    "Stage 3 exceeded maximum processing time of {}ms",
1569                    self.config.stage3.max_processing_time_ms
1570                )))
1571            }
1572        }
1573    }
1574
1575    fn calculate_cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
1576        if a.len() != b.len() {
1577            return 0.0;
1578        }
1579
1580        let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1581        let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1582        let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1583
1584        if norm_a == 0.0 || norm_b == 0.0 {
1585            return 0.0;
1586        }
1587
1588        dot_product / (norm_a * norm_b)
1589    }
1590
1591    async fn call_llm(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1592        let sanitized_prompt = self.sanitize_llm_prompt(prompt)?;
1593
1594        // Implement exponential backoff retry logic
1595        let mut attempts = 0;
1596        let max_retries = 3;
1597        let base_delay = Duration::from_millis(100);
1598
1599        loop {
1600            match self.attempt_llm_call(&sanitized_prompt).await {
1601                Ok(response) => return Ok(response),
1602                Err(e) => {
1603                    attempts += 1;
1604                    if attempts > max_retries {
1605                        return Err(e);
1606                    }
1607
1608                    // Exponential backoff with jitter
1609                    let delay = base_delay * (2_u32.pow(attempts - 1));
1610                    let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..=100));
1611                    tokio::time::sleep(delay + jitter).await;
1612
1613                    warn!(
1614                        "LLM call failed (attempt {}/{}), retrying in {:?}: {}",
1615                        attempts,
1616                        max_retries,
1617                        delay + jitter,
1618                        e
1619                    );
1620                }
1621            }
1622        }
1623    }
1624
1625    async fn attempt_llm_call(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1626        // Validate input length to prevent resource exhaustion
1627        if prompt.len() > 50000 {
1628            // 50KB limit
1629            return Err(ImportanceAssessmentError::Stage3Failed(
1630                "Prompt exceeds maximum length".to_string(),
1631            ));
1632        }
1633
1634        let request_body = serde_json::json!({
1635            "prompt": prompt,
1636            "max_tokens": 150,
1637            "temperature": 0.1,
1638            "stop": ["\n\n", "---"],
1639            "model": "text-davinci-003" // Default model
1640        });
1641
1642        let response = self
1643            .http_client
1644            .post(&self.config.stage3.llm_endpoint)
1645            .header("Content-Type", "application/json")
1646            .header("User-Agent", "codex-memory/0.1.0")
1647            .json(&request_body)
1648            .send()
1649            .await
1650            .map_err(|e| {
1651                if e.is_timeout() {
1652                    ImportanceAssessmentError::Timeout(format!("LLM request timed out: {}", e))
1653                } else if e.is_connect() {
1654                    ImportanceAssessmentError::Stage3Failed(format!(
1655                        "Failed to connect to LLM service: {}",
1656                        e
1657                    ))
1658                } else {
1659                    ImportanceAssessmentError::Stage3Failed(format!("LLM request failed: {}", e))
1660                }
1661            })?;
1662
1663        let status = response.status();
1664        let response_text = response.text().await.map_err(|e| {
1665            ImportanceAssessmentError::Stage3Failed(format!("Failed to read response body: {}", e))
1666        })?;
1667
1668        if !status.is_success() {
1669            // Handle different HTTP error codes
1670            let error_msg = match status.as_u16() {
1671                400 => format!("Bad request to LLM service: {}", response_text),
1672                401 => "Unauthorized: Invalid API key or credentials".to_string(),
1673                403 => "Forbidden: Insufficient permissions".to_string(),
1674                429 => "Rate limit exceeded, will retry".to_string(),
1675                500..=599 => format!("LLM service error ({}): {}", status, response_text),
1676                _ => format!("LLM service returned status {}: {}", status, response_text),
1677            };
1678
1679            return Err(ImportanceAssessmentError::Stage3Failed(error_msg));
1680        }
1681
1682        // Parse response with proper error handling
1683        let response_json: serde_json::Value =
1684            serde_json::from_str(&response_text).map_err(|e| {
1685                ImportanceAssessmentError::Stage3Failed(format!(
1686                    "Failed to parse LLM response as JSON: {}",
1687                    e
1688                ))
1689            })?;
1690
1691        // Extract response text with multiple fallback paths
1692        let response_content = response_json
1693            .get("choices")
1694            .and_then(|c| c.get(0))
1695            .and_then(|choice| {
1696                // Try different response formats (OpenAI, local models, etc.)
1697                choice
1698                    .get("text")
1699                    .or_else(|| choice.get("message").and_then(|m| m.get("content")))
1700                    .or_else(|| choice.get("generated_text"))
1701            })
1702            .and_then(|v| v.as_str())
1703            .ok_or_else(|| {
1704                ImportanceAssessmentError::Stage3Failed(format!(
1705                    "Invalid LLM response format. Expected 'choices[0].text' or similar. Got: {}",
1706                    serde_json::to_string_pretty(&response_json)
1707                        .unwrap_or_else(|_| "invalid JSON".to_string())
1708                ))
1709            })?
1710            .trim()
1711            .to_string();
1712
1713        // Validate response length
1714        if response_content.is_empty() {
1715            return Err(ImportanceAssessmentError::Stage3Failed(
1716                "LLM returned empty response".to_string(),
1717            ));
1718        }
1719
1720        if response_content.len() > 10000 {
1721            // 10KB response limit
1722            warn!("LLM response was truncated due to excessive length");
1723            return Ok(response_content[..10000].to_string());
1724        }
1725
1726        Ok(response_content)
1727    }
1728
1729    fn sanitize_llm_prompt(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1730        // Basic prompt injection protection
1731        let dangerous_patterns = [
1732            "ignore previous instructions",
1733            "ignore all previous",
1734            "disregard previous",
1735            "forget previous",
1736            "new instructions:",
1737            "system:",
1738            "assistant:",
1739            "user:",
1740            "<|endoftext|>",
1741            "###",
1742            "---\n",
1743        ];
1744
1745        let lower_prompt = prompt.to_lowercase();
1746        for pattern in &dangerous_patterns {
1747            if lower_prompt.contains(pattern) {
1748                warn!("Potential prompt injection detected: {}", pattern);
1749                return Err(ImportanceAssessmentError::Stage3Failed(
1750                    "Prompt contains potentially malicious content".to_string(),
1751                ));
1752            }
1753        }
1754
1755        // Remove or escape potentially dangerous characters
1756        let sanitized = prompt
1757            .replace('\0', "") // Remove null bytes
1758            .replace("\x1b", "") // Remove escape sequences
1759            .chars()
1760            .filter(|c| c.is_ascii_graphic() || c.is_ascii_whitespace())
1761            .collect::<String>();
1762
1763        // Validate final length
1764        if sanitized.len() > 10000 {
1765            return Err(ImportanceAssessmentError::Stage3Failed(
1766                "Prompt too long after sanitization".to_string(),
1767            ));
1768        }
1769
1770        Ok(sanitized)
1771    }
1772
1773    fn parse_llm_response(&self, response: &str) -> Result<(f64, f64), ImportanceAssessmentError> {
1774        // Parse LLM response to extract importance score and confidence
1775        // This is a simplified parser - in practice, you'd want more robust parsing
1776
1777        let lines: Vec<&str> = response.lines().collect();
1778        let mut importance_score = 0.5; // Default
1779        let mut confidence = 0.7; // Default
1780
1781        for line in lines {
1782            let line = line.trim().to_lowercase();
1783
1784            // Look for importance score
1785            if line.contains("importance:") || line.contains("score:") {
1786                if let Some(score_str) = line.split(':').nth(1) {
1787                    if let Ok(score) = score_str.trim().parse::<f64>() {
1788                        importance_score = score.clamp(0.0, 1.0);
1789                    }
1790                }
1791            }
1792
1793            // Look for confidence
1794            if line.contains("confidence:") {
1795                if let Some(conf_str) = line.split(':').nth(1) {
1796                    if let Ok(conf) = conf_str.trim().parse::<f64>() {
1797                        confidence = conf.clamp(0.0, 1.0);
1798                    }
1799                }
1800            }
1801        }
1802
1803        Ok((importance_score, confidence))
1804    }
1805
1806    fn extract_explanation_from_stage3(&self, stage_result: &StageResult) -> Option<String> {
1807        if let StageDetails::Stage3 { llm_response, .. } = &stage_result.details {
1808            Some(llm_response.clone())
1809        } else {
1810            None
1811        }
1812    }
1813
1814    fn record_final_metrics(&self, result: &ImportanceAssessmentResult) {
1815        self.metrics
1816            .assessment_confidence
1817            .observe(result.confidence);
1818        self.metrics
1819            .final_importance_scores
1820            .observe(result.importance_score);
1821
1822        info!(
1823            "Importance assessment completed: score={:.3}, confidence={:.3}, stage={:?}, time={}ms",
1824            result.importance_score,
1825            result.confidence,
1826            result.final_stage,
1827            result.total_processing_time_ms
1828        );
1829    }
1830
1831    /// Get current pipeline statistics
1832    pub async fn get_statistics(&self) -> PipelineStatistics {
1833        let cache_size = self.embedding_cache.len();
1834        let eviction_count = self.embedding_cache.eviction_count();
1835
1836        PipelineStatistics {
1837            cache_size,
1838            stage1_executions: self.metrics.stage1_executions.get(),
1839            stage2_executions: self.metrics.stage2_executions.get(),
1840            stage3_executions: self.metrics.stage3_executions.get(),
1841            completed_at_stage1: self.metrics.completed_at_stage1.get(),
1842            completed_at_stage2: self.metrics.completed_at_stage2.get(),
1843            completed_at_stage3: self.metrics.completed_at_stage3.get(),
1844            cache_hits: self.metrics.embedding_cache_hits.get(),
1845            cache_misses: self.metrics.embedding_cache_misses.get(),
1846            cache_evictions: eviction_count,
1847            circuit_breaker_state: format!("{:?}", *self.circuit_breaker.state.read().await),
1848            llm_success_rate: {
1849                let successes = self.metrics.llm_call_successes.get() as f64;
1850                let failures = self.metrics.llm_call_failures.get() as f64;
1851                let total = successes + failures;
1852                if total > 0.0 {
1853                    successes / total
1854                } else {
1855                    1.0
1856                }
1857            },
1858        }
1859    }
1860
1861    /// Clear the embedding cache
1862    pub async fn clear_cache(&self) {
1863        self.embedding_cache.clear().await;
1864        self.metrics.embedding_cache_size.set(0);
1865        info!("Embedding cache cleared");
1866    }
1867
1868    /// Get cache hit ratio
1869    pub fn get_cache_hit_ratio(&self) -> f64 {
1870        let hits = self.metrics.embedding_cache_hits.get() as f64;
1871        let misses = self.metrics.embedding_cache_misses.get() as f64;
1872        let total = hits + misses;
1873        if total > 0.0 {
1874            hits / total
1875        } else {
1876            0.0
1877        }
1878    }
1879}
1880
1881#[derive(Debug, Clone, Serialize, Deserialize)]
1882pub struct PipelineStatistics {
1883    pub cache_size: usize,
1884    pub stage1_executions: u64,
1885    pub stage2_executions: u64,
1886    pub stage3_executions: u64,
1887    pub completed_at_stage1: u64,
1888    pub completed_at_stage2: u64,
1889    pub completed_at_stage3: u64,
1890    pub cache_hits: u64,
1891    pub cache_misses: u64,
1892    pub cache_evictions: u64,
1893    pub circuit_breaker_state: String,
1894    pub llm_success_rate: f64,
1895}
1896
1897impl Default for ImportanceAssessmentConfig {
1898    fn default() -> Self {
1899        Self {
1900            stage1: Stage1Config {
1901                confidence_threshold: 0.6,
1902                pattern_library: vec![
1903                    ImportancePattern {
1904                        name: "remember_command".to_string(),
1905                        pattern: r"(?i)\b(remember|recall|don't forget)\b".to_string(),
1906                        weight: 0.8,
1907                        context_boosters: vec!["important".to_string(), "critical".to_string()],
1908                        category: "memory".to_string(),
1909                    },
1910                    ImportancePattern {
1911                        name: "preference_statement".to_string(),
1912                        pattern: r"(?i)\b(prefer|like|want|choose)\b".to_string(),
1913                        weight: 0.7,
1914                        context_boosters: vec!["always".to_string(), "usually".to_string()],
1915                        category: "preference".to_string(),
1916                    },
1917                    ImportancePattern {
1918                        name: "decision_making".to_string(),
1919                        pattern: r"(?i)\b(decide|decision|choose|select)\b".to_string(),
1920                        weight: 0.75,
1921                        context_boosters: vec!["final".to_string(), "official".to_string()],
1922                        category: "decision".to_string(),
1923                    },
1924                    ImportancePattern {
1925                        name: "correction".to_string(),
1926                        pattern: r"(?i)\b(correct|fix|wrong|mistake|error)\b".to_string(),
1927                        weight: 0.6,
1928                        context_boosters: vec!["actually".to_string(), "should".to_string()],
1929                        category: "correction".to_string(),
1930                    },
1931                    ImportancePattern {
1932                        name: "importance_marker".to_string(),
1933                        pattern: r"(?i)\b(important|critical|crucial|vital|essential)\b".to_string(),
1934                        weight: 0.9,
1935                        context_boosters: vec!["very".to_string(), "extremely".to_string()],
1936                        category: "importance".to_string(),
1937                    },
1938                ],
1939                max_processing_time_ms: 10,
1940            },
1941            stage2: Stage2Config {
1942                confidence_threshold: 0.7,
1943                max_processing_time_ms: 100,
1944                embedding_cache_ttl_seconds: 3600, // 1 hour
1945                embedding_cache_max_size: 10000, // Maximum 10k cached embeddings
1946                cache_eviction_threshold: 0.8, // Start evicting at 80% capacity
1947                similarity_threshold: 0.7,
1948                reference_embeddings: vec![], // Would be populated with pre-computed embeddings
1949            },
1950            stage3: Stage3Config {
1951                max_processing_time_ms: 1000,
1952                llm_endpoint: "http://localhost:8080/generate".to_string(),
1953                max_concurrent_requests: 5,
1954                prompt_template: "Assess the importance of this content on a scale of 0.0 to 1.0. Consider context, user intent, and actionability.\n\nContent: {content}\n\nProvide your assessment as:\nImportance: [score]\nConfidence: [confidence]\nReasoning: [explanation]".to_string(),
1955                target_usage_percentage: 20.0,
1956            },
1957            circuit_breaker: CircuitBreakerConfig {
1958                failure_threshold: 5,
1959                failure_window_seconds: 60,
1960                recovery_timeout_seconds: 30,
1961                minimum_requests: 3,
1962            },
1963            performance: PerformanceConfig {
1964                stage1_target_ms: 10,
1965                stage2_target_ms: 100,
1966                stage3_target_ms: 1000,
1967            },
1968        }
1969    }
1970}