codex_memory/memory/
importance_assessment.rs

1use crate::embedding::EmbeddingService;
2use crate::memory::MemoryError;
3use anyhow::Result;
4use chrono::{DateTime, Utc};
5use prometheus::{Counter, Histogram, IntCounter, IntGauge, Registry};
6use rand::Rng;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::collections::{HashMap, LinkedList};
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use thiserror::Error;
15use tokio::sync::RwLock;
16use tokio::time::timeout;
17use tracing::{debug, error, info, warn};
18
/// Errors reported by the importance assessment pipeline.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute (derived via `thiserror::Error`); `{0}` is replaced with the
/// variant's payload.
#[derive(Debug, Error)]
pub enum ImportanceAssessmentError {
    /// Stage 1 (pattern matching) failed; the payload carries the detail text.
    #[error("Stage 1 pattern matching failed: {0}")]
    Stage1Failed(String),

    /// Stage 2 (semantic analysis) failed; the payload carries the detail text.
    #[error("Stage 2 semantic analysis failed: {0}")]
    Stage2Failed(String),

    /// Stage 3 (LLM scoring) failed; the payload carries the detail text.
    #[error("Stage 3 LLM scoring failed: {0}")]
    Stage3Failed(String),

    /// A call was rejected because the circuit breaker is currently open.
    #[error("Circuit breaker is open: {0}")]
    CircuitBreakerOpen(String),

    /// A timeout was exceeded; the payload carries the detail text.
    #[error("Timeout exceeded: {0}")]
    Timeout(String),

    /// The supplied configuration is invalid (for example, a regex pattern
    /// that fails to compile or is rejected as too complex).
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// Wraps a [`MemoryError`]; `#[from]` generates `From<MemoryError>` so the
    /// `?` operator converts automatically.
    #[error("Memory operation failed: {0}")]
    Memory(#[from] MemoryError),

    /// A cache operation failed; the payload carries the detail text.
    #[error("Cache operation failed: {0}")]
    Cache(String),
}
45
/// Top-level configuration for the importance assessment pipeline.
///
/// Groups one settings struct per pipeline stage plus the circuit breaker and
/// performance-target settings. Serializable and deserializable through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportanceAssessmentConfig {
    /// Stage 1: pattern matching configuration.
    pub stage1: Stage1Config,

    /// Stage 2: semantic similarity configuration.
    pub stage2: Stage2Config,

    /// Stage 3: LLM scoring configuration.
    pub stage3: Stage3Config,

    /// Circuit breaker configuration (the breaker in this file is documented
    /// as guarding LLM calls).
    pub circuit_breaker: CircuitBreakerConfig,

    /// Per-stage performance targets, in milliseconds.
    pub performance: PerformanceConfig,
}
64
/// Settings for Stage 1 (pattern matching).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage1Config {
    /// Confidence threshold to pass to Stage 2 (0.0-1.0).
    pub confidence_threshold: f64,

    /// Pattern library for keyword/phrase matching.
    pub pattern_library: Vec<ImportancePattern>,

    /// Maximum processing time in milliseconds.
    pub max_processing_time_ms: u64,
}
76
/// Settings for Stage 2 (semantic similarity), including the embedding cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage2Config {
    /// Confidence threshold to pass to Stage 3 (0.0-1.0).
    pub confidence_threshold: f64,

    /// Maximum processing time in milliseconds.
    pub max_processing_time_ms: u64,

    /// Cache TTL for embeddings in seconds.
    pub embedding_cache_ttl_seconds: u64,

    /// Maximum cache size (number of entries).
    pub embedding_cache_max_size: usize,

    /// Cache eviction threshold (evict when the cache is this full).
    ///
    /// NOTE(review): `EmbeddingCache::new` multiplies its threshold argument
    /// directly by the maximum size, which implies a fraction such as `0.8`
    /// rather than a percentage such as `80.0` — confirm that this field is
    /// the value passed there.
    pub cache_eviction_threshold: f64,

    /// Similarity threshold for semantic matching.
    pub similarity_threshold: f32,

    /// Reference embeddings for importance patterns.
    pub reference_embeddings: Vec<ReferenceEmbedding>,
}
100
/// Settings for Stage 3 (LLM scoring).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage3Config {
    /// Maximum processing time in milliseconds.
    pub max_processing_time_ms: u64,

    /// LLM endpoint configuration (stored as a plain string).
    pub llm_endpoint: String,

    /// Maximum concurrent LLM requests.
    pub max_concurrent_requests: usize,

    /// Prompt template for LLM scoring.
    pub prompt_template: String,

    /// Target percentage of evaluations that should reach Stage 3.
    ///
    /// NOTE(review): the scale (0-1 vs 0-100) is not visible in this part of
    /// the file — confirm against the code that reads it.
    pub target_usage_percentage: f64,
}
118
/// Settings that control when the circuit breaker opens and recovers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Failure threshold before opening the circuit; compared against the
    /// breaker's running failure count.
    pub failure_threshold: usize,

    /// Time window for failure counting in seconds. The breaker resets its
    /// failure count when the previous failure is older than this window.
    pub failure_window_seconds: u64,

    /// Recovery timeout in seconds: how long the circuit stays open before a
    /// call is allowed through again (half-open).
    pub recovery_timeout_seconds: u64,

    /// Minimum requests before evaluating failures; compared against the
    /// breaker's running request count.
    pub minimum_requests: usize,
}
133
/// Per-stage processing-time targets for the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceConfig {
    /// Stage 1 target time in milliseconds.
    pub stage1_target_ms: u64,

    /// Stage 2 target time in milliseconds.
    pub stage2_target_ms: u64,

    /// Stage 3 target time in milliseconds.
    pub stage3_target_ms: u64,
}
145
/// One entry of the Stage 1 pattern library.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportancePattern {
    /// Pattern name for metrics and debugging.
    pub name: String,

    /// Regular expression pattern (source text; `OptimizedPatternMatcher::new`
    /// validates it and compiles it with `Regex::new` unless it is treated as
    /// a plain keyword).
    pub pattern: String,

    /// Weight/importance score for this pattern (0.0-1.0).
    pub weight: f64,

    /// Context words that boost the pattern's importance.
    pub context_boosters: Vec<String>,

    /// Category of the pattern (e.g., "command", "preference", "memory").
    pub category: String,
}
163
/// A pre-computed reference embedding used by Stage 2.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReferenceEmbedding {
    /// Name of the reference pattern.
    pub name: String,

    /// Pre-computed embedding vector.
    pub embedding: Vec<f32>,

    /// Importance weight for this reference.
    pub weight: f64,

    /// Category of the reference.
    pub category: String,
}
178
/// Result of the importance assessment pipeline.
///
/// Carries the final score together with the per-stage results that led to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportanceAssessmentResult {
    /// Final importance score (0.0-1.0).
    pub importance_score: f64,

    /// Which stage provided the final score.
    pub final_stage: AssessmentStage,

    /// Results from each stage.
    pub stage_results: Vec<StageResult>,

    /// Total processing time in milliseconds.
    pub total_processing_time_ms: u64,

    /// Assessment timestamp (UTC).
    pub assessed_at: DateTime<Utc>,

    /// Confidence in the assessment (0.0-1.0).
    pub confidence: f64,

    /// Explanation of the assessment, if one is available.
    pub explanation: Option<String>,
}
203
/// Outcome of a single pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageResult {
    /// Which stage this result is from.
    pub stage: AssessmentStage,

    /// Score from this stage (0.0-1.0).
    pub score: f64,

    /// Confidence in this stage's result (0.0-1.0).
    pub confidence: f64,

    /// Processing time for this stage in milliseconds.
    pub processing_time_ms: u64,

    /// Whether this stage passed its confidence threshold.
    pub passed_threshold: bool,

    /// Stage-specific details; the variant is expected to correspond to
    /// `stage` (not enforced by the type).
    pub details: StageDetails,
}
224
/// Identifies one of the three pipeline stages.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AssessmentStage {
    /// Stage 1: pattern matching.
    Stage1PatternMatching,
    /// Stage 2: semantic similarity.
    Stage2SemanticSimilarity,
    /// Stage 3: LLM scoring.
    Stage3LLMScoring,
}
231
/// Stage-specific detail payload attached to a [`StageResult`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StageDetails {
    /// Details produced by Stage 1 (pattern matching).
    Stage1 {
        /// Patterns that matched the input.
        matched_patterns: Vec<MatchedPattern>,
        /// Number of patterns that were checked.
        total_patterns_checked: usize,
    },
    /// Details produced by Stage 2 (semantic similarity).
    Stage2 {
        /// Similarity against each reference that was scored.
        similarity_scores: Vec<SimilarityScore>,
        /// Whether the embedding came from the cache.
        cache_hit: bool,
        /// Time spent generating the embedding, in milliseconds, when recorded.
        embedding_generation_time_ms: Option<u64>,
    },
    /// Details produced by Stage 3 (LLM scoring).
    Stage3 {
        /// Response text returned by the LLM.
        llm_response: String,
        /// Prompt token count, when available.
        prompt_tokens: Option<usize>,
        /// Completion token count, when available.
        completion_tokens: Option<usize>,
        /// Identifier of the model that was used.
        model_used: String,
    },
}
250
/// A single Stage 1 pattern match.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MatchedPattern {
    /// Name of the pattern that matched (presumably `ImportancePattern::name`).
    pub pattern_name: String,
    /// Category of the pattern that matched (presumably `ImportancePattern::category`).
    pub pattern_category: String,
    /// The text that matched.
    pub match_text: String,
    /// Position of the match in the input.
    /// NOTE(review): byte offset vs character offset is not visible here — confirm.
    pub match_position: usize,
    /// Weight of the matched pattern.
    pub weight: f64,
    /// Boost applied for surrounding context words; the formula is defined
    /// outside this part of the file.
    pub context_boost: f64,
}
260
/// Similarity of the assessed content to one reference embedding (Stage 2).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SimilarityScore {
    /// Name of the reference (presumably `ReferenceEmbedding::name`).
    pub reference_name: String,
    /// Category of the reference (presumably `ReferenceEmbedding::category`).
    pub reference_category: String,
    /// Similarity value; the metric used is defined outside this part of the file.
    pub similarity: f32,
    /// Weight of the reference.
    pub weight: f64,
    /// Score after weighting.
    /// NOTE(review): presumably `similarity * weight` — confirm against the producer.
    pub weighted_score: f64,
}
269
/// Circuit breaker states.
#[derive(Debug, Clone, PartialEq)]
enum CircuitBreakerState {
    /// Calls are allowed.
    Closed,
    /// Calls are rejected; the payload is the time at which the circuit was
    /// opened and is used to compute when recovery may be attempted.
    Open(DateTime<Utc>),
    /// The recovery timeout has elapsed; calls are allowed again while the
    /// breaker waits for enough consecutive successes to close.
    HalfOpen,
}
277
/// Circuit breaker for LLM calls.
///
/// The state and the last-failure timestamp live behind async `RwLock`s; the
/// counters are atomics so they can be updated without taking a lock.
#[derive(Debug)]
struct CircuitBreaker {
    /// Current state of the breaker.
    state: RwLock<CircuitBreakerState>,
    /// Thresholds and timeouts that drive state transitions.
    config: CircuitBreakerConfig,
    /// Running failure count; reset on success and when the previous failure
    /// falls outside the configured failure window.
    failure_count: AtomicUsize,
    /// Time of the most recent failure, if any.
    last_failure_time: RwLock<Option<DateTime<Utc>>>,
    /// Running request count, compared against `config.minimum_requests`.
    request_count: AtomicUsize,
    /// Successes since the last failure; used to decide when a half-open
    /// circuit may close.
    consecutive_successes: AtomicUsize, // For half-open state management
}
288
289impl CircuitBreaker {
290    fn new(config: CircuitBreakerConfig) -> Self {
291        Self {
292            state: RwLock::new(CircuitBreakerState::Closed),
293            config,
294            failure_count: AtomicUsize::new(0),
295            last_failure_time: RwLock::new(None),
296            request_count: AtomicUsize::new(0),
297            consecutive_successes: AtomicUsize::new(0),
298        }
299    }
300
301    async fn can_execute(&self) -> Result<bool, ImportanceAssessmentError> {
302        let state = self.state.read().await;
303        match *state {
304            CircuitBreakerState::Closed => Ok(true),
305            CircuitBreakerState::Open(opened_at) => {
306                let now = Utc::now();
307                let recovery_time = opened_at
308                    + chrono::Duration::seconds(self.config.recovery_timeout_seconds as i64);
309
310                if now >= recovery_time {
311                    drop(state);
312                    let mut state = self.state.write().await;
313                    *state = CircuitBreakerState::HalfOpen;
314                    Ok(true)
315                } else {
316                    Err(ImportanceAssessmentError::CircuitBreakerOpen(format!(
317                        "Circuit breaker is open until {recovery_time}"
318                    )))
319                }
320            }
321            CircuitBreakerState::HalfOpen => Ok(true),
322        }
323    }
324
325    async fn record_success(&self) {
326        // Increment consecutive successes atomically
327        let consecutive_successes = self.consecutive_successes.fetch_add(1, Ordering::SeqCst);
328
329        // Reset failure count atomically
330        self.failure_count.store(0, Ordering::SeqCst);
331
332        // In half-open state, require multiple successes before fully closing
333        let current_state = {
334            let state = self.state.read().await;
335            state.clone()
336        };
337
338        match current_state {
339            CircuitBreakerState::HalfOpen => {
340                // Require at least 3 consecutive successes to close
341                if consecutive_successes >= 2 {
342                    let mut state = self.state.write().await;
343                    *state = CircuitBreakerState::Closed;
344
345                    let mut last_failure_time = self.last_failure_time.write().await;
346                    *last_failure_time = None;
347
348                    info!(
349                        "Circuit breaker closed after {} consecutive successes",
350                        consecutive_successes + 1
351                    );
352                }
353            }
354            CircuitBreakerState::Open(_) => {
355                // This shouldn't happen, but handle gracefully
356                warn!(
357                    "Received success while circuit breaker is open - state inconsistency detected"
358                );
359            }
360            CircuitBreakerState::Closed => {
361                // Already closed, just reset failure time
362                let mut last_failure_time = self.last_failure_time.write().await;
363                *last_failure_time = None;
364            }
365        }
366    }
367
368    async fn record_failure(&self) {
369        let now = Utc::now();
370
371        // Reset consecutive successes on any failure
372        self.consecutive_successes.store(0, Ordering::SeqCst);
373
374        // Atomically increment request count
375        let request_count = self.request_count.fetch_add(1, Ordering::SeqCst) + 1;
376
377        // Handle failure window and count atomically where possible
378        let should_open_circuit = {
379            let mut last_failure_time = self.last_failure_time.write().await;
380
381            // Check if we need to reset failure count due to time window
382            if let Some(last_failure) = *last_failure_time {
383                let window_start =
384                    now - chrono::Duration::seconds(self.config.failure_window_seconds as i64);
385                if last_failure < window_start {
386                    // Reset failure count as we're outside the window
387                    self.failure_count.store(0, Ordering::SeqCst);
388                }
389            }
390
391            // Atomically increment failure count
392            let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
393            *last_failure_time = Some(now);
394
395            // Check if we should open the circuit
396            request_count >= self.config.minimum_requests
397                && failure_count >= self.config.failure_threshold
398        };
399
400        if should_open_circuit {
401            // Check current state before opening
402            let current_state = {
403                let state = self.state.read().await;
404                state.clone()
405            };
406
407            match current_state {
408                CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
409                    let mut state = self.state.write().await;
410                    // Double-check state hasn't changed while acquiring write lock
411                    match *state {
412                        CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
413                            *state = CircuitBreakerState::Open(now);
414                            let failure_count = self.failure_count.load(Ordering::SeqCst);
415                            warn!(
416                                "Circuit breaker opened due to {} failures out of {} requests",
417                                failure_count, request_count
418                            );
419                        }
420                        CircuitBreakerState::Open(_) => {
421                            // Already open, nothing to do
422                        }
423                    }
424                }
425                CircuitBreakerState::Open(_) => {
426                    // Already open, just log
427                    debug!("Additional failure recorded while circuit breaker is open");
428                }
429            }
430        }
431    }
432}
433
/// Cached embedding with TTL and LRU tracking.
#[derive(Debug, Clone)]
struct CachedEmbedding {
    /// The cached embedding vector.
    embedding: Vec<f32>,
    /// When the entry was created; expiry is measured from this instant.
    cached_at: DateTime<Utc>,
    /// Time-to-live in seconds, counted from `cached_at`.
    ttl_seconds: u64,
    /// When the entry was last read (updated by `touch`).
    last_accessed: DateTime<Utc>,
}
442
/// LRU cache entry for tracking access order.
///
/// Nodes are kept in `EmbeddingCache::lru_list`; entries are pushed to the
/// front on access and evicted from the back.
#[derive(Debug, Clone)]
#[allow(dead_code)] // timestamp may be used for future LRU optimizations
struct LRUNode {
    /// Cache key this node refers to.
    key: String,
    /// Access time recorded when the node was pushed onto the list.
    timestamp: DateTime<Utc>,
}
450
/// LRU cache for embeddings with automatic eviction.
///
/// The map and the recency list are each guarded by an async `RwLock`; the
/// size and eviction counters are atomics.
#[derive(Debug)]
struct EmbeddingCache {
    /// Cached entries keyed by cache key.
    cache: RwLock<HashMap<String, CachedEmbedding>>,
    /// Recency order: most recently used at the front, eviction from the back.
    lru_list: RwLock<LinkedList<LRUNode>>,
    /// Number of entries, mirrored from the map so `len` needs no lock.
    current_size: AtomicUsize,
    /// Configured capacity; eviction shrinks the cache to a fraction of this.
    max_size: usize,
    #[allow(dead_code)] // reserved for future adaptive eviction algorithms
    /// Threshold passed to `new`; only used there to derive
    /// `memory_pressure_threshold`.
    eviction_threshold: f64,
    /// Total number of entries evicted so far.
    eviction_count: AtomicU64,
    /// Entry count at which `insert` triggers an eviction pass
    /// (`max_size * eviction_threshold`, truncated).
    memory_pressure_threshold: usize,
}
463
464impl CachedEmbedding {
465    fn new(embedding: Vec<f32>, ttl_seconds: u64) -> Self {
466        let now = Utc::now();
467        Self {
468            embedding,
469            cached_at: now,
470            ttl_seconds,
471            last_accessed: now,
472        }
473    }
474
475    fn is_expired(&self) -> bool {
476        let now = Utc::now();
477        let expiry = self.cached_at + chrono::Duration::seconds(self.ttl_seconds as i64);
478        now >= expiry
479    }
480
481    fn touch(&mut self) {
482        self.last_accessed = Utc::now();
483    }
484}
485
486impl EmbeddingCache {
487    fn new(max_size: usize, eviction_threshold: f64) -> Self {
488        Self {
489            cache: RwLock::new(HashMap::new()),
490            lru_list: RwLock::new(LinkedList::new()),
491            current_size: AtomicUsize::new(0),
492            max_size,
493            eviction_threshold,
494            eviction_count: AtomicU64::new(0),
495            memory_pressure_threshold: (max_size as f64 * eviction_threshold) as usize,
496        }
497    }
498
499    async fn get(&self, key: &str) -> Option<CachedEmbedding> {
500        let mut cache = self.cache.write().await;
501        let mut lru_list = self.lru_list.write().await;
502
503        if let Some(cached) = cache.get_mut(key) {
504            if cached.is_expired() {
505                cache.remove(key);
506                self.current_size.fetch_sub(1, Ordering::Relaxed);
507                // Remove from LRU list manually (retain is unstable)
508                *lru_list = lru_list
509                    .iter()
510                    .filter(|node| node.key != key)
511                    .cloned()
512                    .collect();
513                return None;
514            }
515
516            cached.touch();
517            // Move to front of LRU list manually (retain is unstable)
518            *lru_list = lru_list
519                .iter()
520                .filter(|node| node.key != key)
521                .cloned()
522                .collect();
523            lru_list.push_front(LRUNode {
524                key: key.to_string(),
525                timestamp: cached.last_accessed,
526            });
527
528            Some(cached.clone())
529        } else {
530            None
531        }
532    }
533
534    async fn insert(
535        &self,
536        key: String,
537        value: CachedEmbedding,
538    ) -> Result<(), ImportanceAssessmentError> {
539        // Check if we need to evict before inserting
540        let current_size = self.current_size.load(Ordering::Relaxed);
541        if current_size >= self.memory_pressure_threshold {
542            self.evict_lru_entries().await?;
543        }
544
545        let mut cache = self.cache.write().await;
546        let mut lru_list = self.lru_list.write().await;
547
548        // If key already exists, update it
549        if cache.contains_key(&key) {
550            *lru_list = lru_list
551                .iter()
552                .filter(|node| node.key != key)
553                .cloned()
554                .collect();
555        } else {
556            self.current_size.fetch_add(1, Ordering::Relaxed);
557        }
558
559        cache.insert(key.clone(), value.clone());
560        lru_list.push_front(LRUNode {
561            key: key.clone(),
562            timestamp: value.last_accessed,
563        });
564
565        Ok(())
566    }
567
568    async fn evict_lru_entries(&self) -> Result<(), ImportanceAssessmentError> {
569        let mut cache = self.cache.write().await;
570        let mut lru_list = self.lru_list.write().await;
571
572        let target_size = (self.max_size as f64 * 0.7) as usize; // Evict to 70% capacity
573        let current_size = cache.len();
574
575        if current_size <= target_size {
576            return Ok(());
577        }
578
579        let entries_to_remove = current_size - target_size;
580        let mut removed_count = 0;
581
582        // Remove oldest entries
583        while removed_count < entries_to_remove && !lru_list.is_empty() {
584            if let Some(node) = lru_list.pop_back() {
585                cache.remove(&node.key);
586                removed_count += 1;
587                self.eviction_count.fetch_add(1, Ordering::Relaxed);
588            }
589        }
590
591        self.current_size.store(cache.len(), Ordering::Relaxed);
592
593        if removed_count > 0 {
594            info!(
595                "Evicted {} entries from embedding cache due to memory pressure",
596                removed_count
597            );
598        }
599
600        Ok(())
601    }
602
603    async fn clear(&self) {
604        let mut cache = self.cache.write().await;
605        let mut lru_list = self.lru_list.write().await;
606
607        cache.clear();
608        lru_list.clear();
609        self.current_size.store(0, Ordering::Relaxed);
610    }
611
612    fn len(&self) -> usize {
613        self.current_size.load(Ordering::Relaxed)
614    }
615
616    fn eviction_count(&self) -> u64 {
617        self.eviction_count.load(Ordering::Relaxed)
618    }
619
620    async fn cleanup_expired(&self) -> Result<usize, ImportanceAssessmentError> {
621        let mut cache = self.cache.write().await;
622        let mut lru_list = self.lru_list.write().await;
623
624        let mut expired_keys = Vec::new();
625
626        for (key, value) in cache.iter() {
627            if value.is_expired() {
628                expired_keys.push(key.clone());
629            }
630        }
631
632        for key in &expired_keys {
633            cache.remove(key);
634            *lru_list = lru_list
635                .iter()
636                .filter(|node| &node.key != key)
637                .cloned()
638                .collect();
639        }
640
641        let removed_count = expired_keys.len();
642        self.current_size.store(cache.len(), Ordering::Relaxed);
643
644        if removed_count > 0 {
645            debug!(
646                "Cleaned up {} expired entries from embedding cache",
647                removed_count
648            );
649        }
650
651        Ok(removed_count)
652    }
653}
654
/// Metrics for the importance assessment pipeline.
///
/// All handles are created and registered with a Prometheus registry by
/// `ImportanceAssessmentMetrics::new`.
#[derive(Debug)]
pub struct ImportanceAssessmentMetrics {
    // Stage progression counters
    /// Total number of Stage 1 executions.
    pub stage1_executions: IntCounter,
    /// Total number of Stage 2 executions.
    pub stage2_executions: IntCounter,
    /// Total number of Stage 3 executions.
    pub stage3_executions: IntCounter,

    // Stage timing histograms
    /// Duration of Stage 1 processing, in seconds.
    pub stage1_duration: Histogram,
    /// Duration of Stage 2 processing, in seconds.
    pub stage2_duration: Histogram,
    /// Duration of Stage 3 processing, in seconds.
    pub stage3_duration: Histogram,

    // Pipeline completion counters
    /// Total assessments completed at Stage 1.
    pub completed_at_stage1: IntCounter,
    /// Total assessments completed at Stage 2.
    pub completed_at_stage2: IntCounter,
    /// Total assessments completed at Stage 3.
    pub completed_at_stage3: IntCounter,

    // Performance metrics
    /// Total Stage 1 performance threshold violations.
    pub stage1_threshold_violations: IntCounter,
    /// Total Stage 2 performance threshold violations.
    pub stage2_threshold_violations: IntCounter,
    /// Total Stage 3 performance threshold violations.
    pub stage3_threshold_violations: IntCounter,

    // Cache metrics
    /// Total embedding cache hits.
    pub embedding_cache_hits: IntCounter,
    /// Total embedding cache misses.
    pub embedding_cache_misses: IntCounter,
    /// Current size of the embedding cache.
    pub embedding_cache_size: IntGauge,

    // Circuit breaker metrics
    // Note: the three state-transition metrics are floating-point `Counter`s,
    // unlike the `IntCounter`s used elsewhere in this struct.
    /// Total times the circuit breaker opened.
    pub circuit_breaker_opened: Counter,
    /// Total times the circuit breaker went half-open.
    pub circuit_breaker_half_open: Counter,
    /// Total times the circuit breaker closed.
    pub circuit_breaker_closed: Counter,
    /// Total LLM call failures.
    pub llm_call_failures: IntCounter,
    /// Total LLM call successes.
    pub llm_call_successes: IntCounter,

    // Quality metrics
    /// Distribution of assessment confidence scores.
    pub assessment_confidence: Histogram,
    /// Distribution of final importance scores.
    pub final_importance_scores: Histogram,
}
694
695impl ImportanceAssessmentMetrics {
696    pub fn new(registry: &Registry) -> Result<Self> {
697        let stage1_executions = IntCounter::new(
698            "importance_assessment_stage1_executions_total",
699            "Total number of Stage 1 executions",
700        )?;
701        registry.register(Box::new(stage1_executions.clone()))?;
702
703        let stage2_executions = IntCounter::new(
704            "importance_assessment_stage2_executions_total",
705            "Total number of Stage 2 executions",
706        )?;
707        registry.register(Box::new(stage2_executions.clone()))?;
708
709        let stage3_executions = IntCounter::new(
710            "importance_assessment_stage3_executions_total",
711            "Total number of Stage 3 executions",
712        )?;
713        registry.register(Box::new(stage3_executions.clone()))?;
714
715        let stage1_duration = Histogram::with_opts(
716            prometheus::HistogramOpts::new(
717                "importance_assessment_stage1_duration_seconds",
718                "Duration of Stage 1 processing",
719            )
720            .buckets(vec![0.001, 0.005, 0.01, 0.02, 0.05, 0.1]),
721        )?;
722        registry.register(Box::new(stage1_duration.clone()))?;
723
724        let stage2_duration = Histogram::with_opts(
725            prometheus::HistogramOpts::new(
726                "importance_assessment_stage2_duration_seconds",
727                "Duration of Stage 2 processing",
728            )
729            .buckets(vec![0.01, 0.05, 0.1, 0.2, 0.5, 1.0]),
730        )?;
731        registry.register(Box::new(stage2_duration.clone()))?;
732
733        let stage3_duration = Histogram::with_opts(
734            prometheus::HistogramOpts::new(
735                "importance_assessment_stage3_duration_seconds",
736                "Duration of Stage 3 processing",
737            )
738            .buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0]),
739        )?;
740        registry.register(Box::new(stage3_duration.clone()))?;
741
742        let completed_at_stage1 = IntCounter::new(
743            "importance_assessment_completed_at_stage1_total",
744            "Total assessments completed at Stage 1",
745        )?;
746        registry.register(Box::new(completed_at_stage1.clone()))?;
747
748        let completed_at_stage2 = IntCounter::new(
749            "importance_assessment_completed_at_stage2_total",
750            "Total assessments completed at Stage 2",
751        )?;
752        registry.register(Box::new(completed_at_stage2.clone()))?;
753
754        let completed_at_stage3 = IntCounter::new(
755            "importance_assessment_completed_at_stage3_total",
756            "Total assessments completed at Stage 3",
757        )?;
758        registry.register(Box::new(completed_at_stage3.clone()))?;
759
760        let stage1_threshold_violations = IntCounter::new(
761            "importance_assessment_stage1_threshold_violations_total",
762            "Total Stage 1 performance threshold violations",
763        )?;
764        registry.register(Box::new(stage1_threshold_violations.clone()))?;
765
766        let stage2_threshold_violations = IntCounter::new(
767            "importance_assessment_stage2_threshold_violations_total",
768            "Total Stage 2 performance threshold violations",
769        )?;
770        registry.register(Box::new(stage2_threshold_violations.clone()))?;
771
772        let stage3_threshold_violations = IntCounter::new(
773            "importance_assessment_stage3_threshold_violations_total",
774            "Total Stage 3 performance threshold violations",
775        )?;
776        registry.register(Box::new(stage3_threshold_violations.clone()))?;
777
778        let embedding_cache_hits = IntCounter::new(
779            "importance_assessment_embedding_cache_hits_total",
780            "Total embedding cache hits",
781        )?;
782        registry.register(Box::new(embedding_cache_hits.clone()))?;
783
784        let embedding_cache_misses = IntCounter::new(
785            "importance_assessment_embedding_cache_misses_total",
786            "Total embedding cache misses",
787        )?;
788        registry.register(Box::new(embedding_cache_misses.clone()))?;
789
790        let embedding_cache_size = IntGauge::new(
791            "importance_assessment_embedding_cache_size",
792            "Current size of embedding cache",
793        )?;
794        registry.register(Box::new(embedding_cache_size.clone()))?;
795
796        let circuit_breaker_opened = Counter::new(
797            "importance_assessment_circuit_breaker_opened_total",
798            "Total times circuit breaker opened",
799        )?;
800        registry.register(Box::new(circuit_breaker_opened.clone()))?;
801
802        let circuit_breaker_half_open = Counter::new(
803            "importance_assessment_circuit_breaker_half_open_total",
804            "Total times circuit breaker went half-open",
805        )?;
806        registry.register(Box::new(circuit_breaker_half_open.clone()))?;
807
808        let circuit_breaker_closed = Counter::new(
809            "importance_assessment_circuit_breaker_closed_total",
810            "Total times circuit breaker closed",
811        )?;
812        registry.register(Box::new(circuit_breaker_closed.clone()))?;
813
814        let llm_call_failures = IntCounter::new(
815            "importance_assessment_llm_call_failures_total",
816            "Total LLM call failures",
817        )?;
818        registry.register(Box::new(llm_call_failures.clone()))?;
819
820        let llm_call_successes = IntCounter::new(
821            "importance_assessment_llm_call_successes_total",
822            "Total LLM call successes",
823        )?;
824        registry.register(Box::new(llm_call_successes.clone()))?;
825
826        let assessment_confidence = Histogram::with_opts(
827            prometheus::HistogramOpts::new(
828                "importance_assessment_confidence",
829                "Confidence scores of assessments",
830            )
831            .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
832        )?;
833        registry.register(Box::new(assessment_confidence.clone()))?;
834
835        let final_importance_scores = Histogram::with_opts(
836            prometheus::HistogramOpts::new(
837                "importance_assessment_final_scores",
838                "Final importance scores from assessments",
839            )
840            .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
841        )?;
842        registry.register(Box::new(final_importance_scores.clone()))?;
843
844        Ok(Self {
845            stage1_executions,
846            stage2_executions,
847            stage3_executions,
848            stage1_duration,
849            stage2_duration,
850            stage3_duration,
851            completed_at_stage1,
852            completed_at_stage2,
853            completed_at_stage3,
854            stage1_threshold_violations,
855            stage2_threshold_violations,
856            stage3_threshold_violations,
857            embedding_cache_hits,
858            embedding_cache_misses,
859            embedding_cache_size,
860            circuit_breaker_opened,
861            circuit_breaker_half_open,
862            circuit_breaker_closed,
863            llm_call_failures,
864            llm_call_successes,
865            assessment_confidence,
866            final_importance_scores,
867        })
868    }
869}
870
/// Optimized pattern matcher with pre-compiled regex and fast string matching.
///
/// Built by `OptimizedPatternMatcher::new`, which sorts each configured
/// pattern into either the keyword list or the compiled-regex list.
#[derive(Debug)]
struct OptimizedPatternMatcher {
    // Pre-compiled regex patterns with their metadata
    regex_patterns: Vec<(Regex, ImportancePattern)>,
    // Fast string patterns for simple keyword matching
    // (keywords are stored lowercased by `new`)
    keyword_patterns: Vec<(String, ImportancePattern)>,
    // Combined regex for single-pass matching when possible
    // (currently always `None`; `new` never builds it)
    #[allow(dead_code)] // reserved for future optimization
    combined_regex: Option<Regex>,
}
882
883impl OptimizedPatternMatcher {
884    fn new(patterns: &[ImportancePattern]) -> Result<Self, ImportanceAssessmentError> {
885        let mut regex_patterns = Vec::new();
886        let mut keyword_patterns = Vec::new();
887
888        for pattern in patterns {
889            // Validate regex complexity to prevent ReDoS
890            Self::validate_regex_complexity(&pattern.pattern)?;
891
892            // Try to determine if this is a simple keyword pattern
893            if Self::is_simple_keyword_pattern(&pattern.pattern) {
894                // Extract the keyword from simple patterns like (?i)\b(word)\b
895                if let Some(keyword) = Self::extract_keyword(&pattern.pattern) {
896                    keyword_patterns.push((keyword.to_lowercase(), pattern.clone()));
897                    continue;
898                }
899            }
900
901            // Compile complex regex patterns
902            match Regex::new(&pattern.pattern) {
903                Ok(regex) => regex_patterns.push((regex, pattern.clone())),
904                Err(e) => {
905                    error!(
906                        "Failed to compile regex pattern '{}': {}",
907                        pattern.pattern, e
908                    );
909                    return Err(ImportanceAssessmentError::Configuration(format!(
910                        "Invalid regex pattern '{}': {}",
911                        pattern.pattern, e
912                    )));
913                }
914            }
915        }
916
917        Ok(Self {
918            regex_patterns,
919            keyword_patterns,
920            combined_regex: None, // Could be optimized further with a single combined regex
921        })
922    }
923
924    fn validate_regex_complexity(pattern: &str) -> Result<(), ImportanceAssessmentError> {
925        // Check for potentially dangerous regex patterns that could cause ReDoS
926        let dangerous_patterns = [
927            "(.*)*",
928            "(.+)+",
929            "([^x]*)*",
930            "(a|a)*",
931            "(a|a)+",
932            "(a*)*",
933            "(a+)+",
934            "(.{0,10000})*",
935            "(.*){10,}",
936            "(.+){10,}",
937        ];
938
939        for dangerous in &dangerous_patterns {
940            if pattern.contains(dangerous) {
941                return Err(ImportanceAssessmentError::Configuration(format!(
942                    "Regex pattern contains potentially dangerous sequence '{dangerous}': {pattern}"
943                )));
944            }
945        }
946
947        // Check pattern length
948        if pattern.len() > 1000 {
949            return Err(ImportanceAssessmentError::Configuration(
950                "Regex pattern too long (max 1000 characters)".to_string(),
951            ));
952        }
953
954        // Check for excessive nested groups
955        let open_parens = pattern.chars().filter(|&c| c == '(').count();
956        if open_parens > 20 {
957            return Err(ImportanceAssessmentError::Configuration(
958                "Regex pattern has too many nested groups (max 20)".to_string(),
959            ));
960        }
961
962        Ok(())
963    }
964
965    fn is_simple_keyword_pattern(pattern: &str) -> bool {
966        // Match patterns like (?i)\b(word|other)\b or (?i)\b(word)\b
967        pattern.starts_with("(?i)")
968            && pattern.contains("\\b")
969            && !pattern.contains(".*")
970            && !pattern.contains(".+")
971            && !pattern.contains("\\d")
972            && !pattern.contains("\\s")
973            && pattern.chars().filter(|&c| c == '(').count() <= 2
974    }
975
976    fn extract_keyword(pattern: &str) -> Option<String> {
977        // Extract keyword from patterns like (?i)\b(word)\b
978        if let Some(start) = pattern.find('(') {
979            if let Some(end) = pattern.rfind(')') {
980                let keywords = &pattern[start + 1..end];
981                // For simple single keyword patterns, return the first keyword
982                if !keywords.contains('|') && keywords.chars().all(|c| c.is_alphabetic()) {
983                    return Some(keywords.to_string());
984                }
985            }
986        }
987        None
988    }
989
990    fn find_matches(&self, content: &str, max_matches: usize) -> Vec<MatchedPattern> {
991        let mut matches = Vec::new();
992        let content_lower = content.to_lowercase();
993
994        // Fast keyword matching first
995        for (keyword, pattern) in &self.keyword_patterns {
996            if matches.len() >= max_matches {
997                break;
998            }
999
1000            let mut start = 0;
1001            while let Some(pos) = content_lower[start..].find(keyword) {
1002                let absolute_pos = start + pos;
1003
1004                // Check word boundaries manually for fast keyword matching
1005                let is_word_start = absolute_pos == 0
1006                    || !content_lower
1007                        .chars()
1008                        .nth(absolute_pos - 1)
1009                        .unwrap_or(' ')
1010                        .is_alphabetic();
1011                let is_word_end = absolute_pos + keyword.len() >= content_lower.len()
1012                    || !content_lower
1013                        .chars()
1014                        .nth(absolute_pos + keyword.len())
1015                        .unwrap_or(' ')
1016                        .is_alphabetic();
1017
1018                if is_word_start && is_word_end {
1019                    let context_boost = self.calculate_context_boost(
1020                        content,
1021                        absolute_pos,
1022                        &pattern.context_boosters,
1023                    );
1024
1025                    matches.push(MatchedPattern {
1026                        pattern_name: pattern.name.clone(),
1027                        pattern_category: pattern.category.clone(),
1028                        match_text: keyword.clone(),
1029                        match_position: absolute_pos,
1030                        weight: pattern.weight,
1031                        context_boost,
1032                    });
1033                }
1034
1035                start = absolute_pos + 1;
1036            }
1037        }
1038
1039        // Regex matching for complex patterns
1040        for (regex, pattern) in &self.regex_patterns {
1041            if matches.len() >= max_matches {
1042                break;
1043            }
1044
1045            for mat in regex.find_iter(content).take(max_matches - matches.len()) {
1046                let match_text = mat.as_str().to_string();
1047                let match_position = mat.start();
1048
1049                let context_boost = self.calculate_context_boost(
1050                    content,
1051                    match_position,
1052                    &pattern.context_boosters,
1053                );
1054
1055                matches.push(MatchedPattern {
1056                    pattern_name: pattern.name.clone(),
1057                    pattern_category: pattern.category.clone(),
1058                    match_text,
1059                    match_position,
1060                    weight: pattern.weight,
1061                    context_boost,
1062                });
1063            }
1064        }
1065
1066        matches
1067    }
1068
1069    fn calculate_context_boost(
1070        &self,
1071        content: &str,
1072        match_position: usize,
1073        boosters: &[String],
1074    ) -> f64 {
1075        let window_size = 100;
1076        let start = match_position.saturating_sub(window_size);
1077        let end = (match_position + window_size).min(content.len());
1078        let context = &content[start..end].to_lowercase();
1079
1080        let mut boost: f64 = 0.0;
1081        for booster in boosters {
1082            if context.contains(&booster.to_lowercase()) {
1083                boost += 0.1; // 10% boost per context word
1084            }
1085        }
1086
1087        boost.min(0.5_f64) // Maximum 50% boost
1088    }
1089}
1090
/// Main importance assessment pipeline
///
/// Runs up to three stages in sequence (pattern matching, semantic similarity, LLM
/// scoring); a later stage only runs when the previous one passed its confidence threshold.
pub struct ImportanceAssessmentPipeline {
    // Per-stage thresholds and time limits, circuit-breaker and performance settings.
    config: ImportanceAssessmentConfig,
    // Stage 1: matcher built from `config.stage1.pattern_library`.
    pattern_matcher: OptimizedPatternMatcher,
    // Stage 2: generates an embedding for content that is not in the cache.
    embedding_service: Arc<dyn EmbeddingService>,
    // Stage 2: embeddings keyed by the SHA-256 hex digest of the content.
    embedding_cache: EmbeddingCache,
    // Stage 3: consulted before each LLM call and updated with its outcome.
    circuit_breaker: CircuitBreaker,
    // Counters, gauges and histograms recorded by every stage.
    metrics: ImportanceAssessmentMetrics,
    // Stage 3: HTTP client used for LLM requests; its timeout is the Stage 3 time limit.
    http_client: reqwest::Client,
}
1101
1102impl ImportanceAssessmentPipeline {
1103    pub fn new(
1104        config: ImportanceAssessmentConfig,
1105        embedding_service: Arc<dyn EmbeddingService>,
1106        metrics_registry: &Registry,
1107    ) -> Result<Self> {
1108        // Initialize optimized pattern matcher
1109        let pattern_matcher = OptimizedPatternMatcher::new(&config.stage1.pattern_library)?;
1110
1111        let metrics = ImportanceAssessmentMetrics::new(metrics_registry)?;
1112
1113        let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone());
1114
1115        let http_client = reqwest::Client::builder()
1116            .timeout(Duration::from_millis(config.stage3.max_processing_time_ms))
1117            .build()?;
1118
1119        let embedding_cache = EmbeddingCache::new(
1120            config.stage2.embedding_cache_max_size,
1121            config.stage2.cache_eviction_threshold,
1122        );
1123
1124        Ok(Self {
1125            config,
1126            pattern_matcher,
1127            embedding_service,
1128            embedding_cache,
1129            circuit_breaker,
1130            metrics,
1131            http_client,
1132        })
1133    }
1134
1135    /// Assess the importance of a memory content string
1136    pub async fn assess_importance(
1137        &self,
1138        content: &str,
1139    ) -> Result<ImportanceAssessmentResult, ImportanceAssessmentError> {
1140        let assessment_start = Instant::now();
1141        let mut stage_results = Vec::new();
1142
1143        info!(
1144            "Starting importance assessment for content length: {}",
1145            content.len()
1146        );
1147
1148        // Stage 1: Pattern matching
1149        let stage1_result = self.execute_stage1(content).await?;
1150        let stage1_passed = stage1_result.passed_threshold;
1151        stage_results.push(stage1_result.clone());
1152
1153        if stage1_passed {
1154            debug!("Stage 1 passed threshold, proceeding to Stage 2");
1155
1156            // Stage 2: Semantic similarity
1157            let stage2_result = self.execute_stage2(content).await?;
1158            let stage2_passed = stage2_result.passed_threshold;
1159            stage_results.push(stage2_result.clone());
1160
1161            if stage2_passed {
1162                debug!("Stage 2 passed threshold, proceeding to Stage 3");
1163
1164                // Stage 3: LLM scoring
1165                let stage3_result = self.execute_stage3(content).await?;
1166                stage_results.push(stage3_result.clone());
1167
1168                self.metrics.completed_at_stage3.inc();
1169
1170                let final_score = stage3_result.score;
1171                let confidence = stage3_result.confidence;
1172
1173                let result = ImportanceAssessmentResult {
1174                    importance_score: final_score,
1175                    final_stage: AssessmentStage::Stage3LLMScoring,
1176                    stage_results,
1177                    total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1178                    assessed_at: Utc::now(),
1179                    confidence,
1180                    explanation: self.extract_explanation_from_stage3(&stage3_result),
1181                };
1182
1183                self.record_final_metrics(&result);
1184                Ok(result)
1185            } else {
1186                self.metrics.completed_at_stage2.inc();
1187
1188                let final_score = stage2_result.score;
1189                let confidence = stage2_result.confidence;
1190
1191                let result = ImportanceAssessmentResult {
1192                    importance_score: final_score,
1193                    final_stage: AssessmentStage::Stage2SemanticSimilarity,
1194                    stage_results,
1195                    total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1196                    assessed_at: Utc::now(),
1197                    confidence,
1198                    explanation: Some(
1199                        "Assessment completed at Stage 2 based on semantic similarity".to_string(),
1200                    ),
1201                };
1202
1203                self.record_final_metrics(&result);
1204                Ok(result)
1205            }
1206        } else {
1207            self.metrics.completed_at_stage1.inc();
1208
1209            let final_score = stage1_result.score;
1210            let confidence = stage1_result.confidence;
1211
1212            let result = ImportanceAssessmentResult {
1213                importance_score: final_score,
1214                final_stage: AssessmentStage::Stage1PatternMatching,
1215                stage_results,
1216                total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1217                assessed_at: Utc::now(),
1218                confidence,
1219                explanation: Some(
1220                    "Assessment completed at Stage 1 based on pattern matching".to_string(),
1221                ),
1222            };
1223
1224            self.record_final_metrics(&result);
1225            Ok(result)
1226        }
1227    }
1228
1229    async fn execute_stage1(
1230        &self,
1231        content: &str,
1232    ) -> Result<StageResult, ImportanceAssessmentError> {
1233        let stage_start = Instant::now();
1234        self.metrics.stage1_executions.inc();
1235
1236        let timeout_duration = Duration::from_millis(self.config.stage1.max_processing_time_ms);
1237
1238        let result = timeout(timeout_duration, async {
1239            // Limit content length for Stage 1 to prevent performance issues
1240            let content_for_analysis = if content.len() > 10000 {
1241                warn!(
1242                    "Content length {} exceeds Stage 1 limit, truncating to 10000 chars",
1243                    content.len()
1244                );
1245                &content[..10000]
1246            } else {
1247                content
1248            };
1249
1250            // Use optimized pattern matching with limits
1251            let max_matches = 50; // Limit total matches to prevent runaway processing
1252            let matched_patterns = self
1253                .pattern_matcher
1254                .find_matches(content_for_analysis, max_matches);
1255
1256            let mut total_score = 0.0;
1257            let mut max_weight: f64 = 0.0;
1258
1259            for pattern in &matched_patterns {
1260                let effective_weight = pattern.weight * (1.0 + pattern.context_boost);
1261                total_score += effective_weight;
1262                max_weight = max_weight.max(effective_weight);
1263            }
1264
1265            // Normalize score to 0.0-1.0 range
1266            let normalized_score = if matched_patterns.is_empty() {
1267                0.0
1268            } else {
1269                (total_score / (matched_patterns.len() as f64)).min(1.0)
1270            };
1271
1272            // Calculate confidence based on pattern diversity and strength
1273            let confidence = if matched_patterns.is_empty() {
1274                0.1 // Low confidence for no matches
1275            } else {
1276                let pattern_diversity = matched_patterns
1277                    .iter()
1278                    .map(|m| m.pattern_category.clone())
1279                    .collect::<std::collections::HashSet<_>>()
1280                    .len() as f64;
1281                let pattern_count = self.config.stage1.pattern_library.len().max(1) as f64; // Avoid division by zero
1282                let base_confidence = (pattern_diversity / pattern_count).min(1.0);
1283                let strength_boost = (max_weight / 1.0_f64).min(0.3); // Max 30% boost from pattern strength
1284                (base_confidence + strength_boost).min(1.0)
1285            };
1286
1287            let passed_threshold = confidence >= self.config.stage1.confidence_threshold;
1288
1289            StageResult {
1290                stage: AssessmentStage::Stage1PatternMatching,
1291                score: normalized_score,
1292                confidence,
1293                processing_time_ms: stage_start.elapsed().as_millis() as u64,
1294                passed_threshold,
1295                details: StageDetails::Stage1 {
1296                    matched_patterns,
1297                    total_patterns_checked: self.config.stage1.pattern_library.len(),
1298                },
1299            }
1300        })
1301        .await;
1302
1303        match result {
1304            Ok(stage_result) => {
1305                let duration_seconds = stage_start.elapsed().as_secs_f64();
1306                self.metrics.stage1_duration.observe(duration_seconds);
1307
1308                // Check performance threshold
1309                if stage_result.processing_time_ms > self.config.performance.stage1_target_ms {
1310                    self.metrics.stage1_threshold_violations.inc();
1311                    warn!(
1312                        "Stage 1 exceeded target time: {}ms > {}ms",
1313                        stage_result.processing_time_ms, self.config.performance.stage1_target_ms
1314                    );
1315                }
1316
1317                debug!(
1318                    "Stage 1 completed in {}ms with score {:.3} and confidence {:.3}",
1319                    stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1320                );
1321
1322                Ok(stage_result)
1323            }
1324            Err(_) => {
1325                self.metrics.stage1_threshold_violations.inc();
1326                Err(ImportanceAssessmentError::Timeout(format!(
1327                    "Stage 1 exceeded maximum processing time of {}ms",
1328                    self.config.stage1.max_processing_time_ms
1329                )))
1330            }
1331        }
1332    }
1333
    /// Stage 2: scores `content` by comparing its embedding with the configured reference
    /// embeddings.
    ///
    /// The embedding is taken from the cache (keyed by the SHA-256 hex digest of the
    /// content) or generated through the embedding service and then cached. Only references
    /// whose cosine similarity reaches `config.stage2.similarity_threshold` contribute.
    ///
    /// * `score` is the weight-normalised average of the contributing similarities, capped
    ///   at 1.0, or 0.0 when no reference contributed.
    /// * `confidence` is 0.1 when no reference contributed; otherwise half the fraction of
    ///   references that matched plus half their average similarity, capped at 1.0.
    ///
    /// # Errors
    /// Returns `Stage2Failed` when embedding generation fails and `Timeout` when the whole
    /// stage exceeds `config.stage2.max_processing_time_ms`.
    async fn execute_stage2(
        &self,
        content: &str,
    ) -> Result<StageResult, ImportanceAssessmentError> {
        let stage_start = Instant::now();
        self.metrics.stage2_executions.inc();

        let timeout_duration = Duration::from_millis(self.config.stage2.max_processing_time_ms);

        // The whole stage is expressed as one future so the timeout below covers cache
        // maintenance, embedding generation and scoring together.
        let stage2_result = async {
            // Cleanup expired entries periodically (every 100th request)
            if self.metrics.stage2_executions.get() % 100 == 0 {
                // Cleanup is best-effort: a failure is logged and the stage continues.
                if let Err(e) = self.embedding_cache.cleanup_expired().await {
                    warn!("Failed to cleanup expired cache entries: {}", e);
                }
            }

            // Generate secure hash of content using SHA-256
            let mut hasher = Sha256::new();
            hasher.update(content.as_bytes());
            let content_hash = format!("{:x}", hasher.finalize());

            // Yields (embedding, whether it came from the cache, generation time if any).
            let (content_embedding, cache_hit, embedding_time) =
                if let Some(cached) = self.embedding_cache.get(&content_hash).await {
                    self.metrics.embedding_cache_hits.inc();
                    (cached.embedding, true, None)
                } else {
                    self.metrics.embedding_cache_misses.inc();
                    let embed_start = Instant::now();
                    let embedding = match self.embedding_service.generate_embedding(content).await {
                        Ok(emb) => emb,
                        Err(e) => {
                            // Without an embedding the stage cannot produce a result.
                            return Err(ImportanceAssessmentError::Stage2Failed(format!(
                                "Embedding generation failed: {e}"
                            )))
                        }
                    };
                    let embed_time = embed_start.elapsed().as_millis() as u64;

                    // Cache the new embedding with error handling
                    let cached_embedding = CachedEmbedding::new(
                        embedding.clone(),
                        self.config.stage2.embedding_cache_ttl_seconds,
                    );

                    // A failed insert only costs a future cache hit, so it is logged
                    // rather than propagated.
                    if let Err(e) = self
                        .embedding_cache
                        .insert(content_hash, cached_embedding)
                        .await
                    {
                        warn!("Failed to cache embedding: {}", e);
                    }

                    // Keep the cache-size gauge in step with the cache after the insert.
                    self.metrics
                        .embedding_cache_size
                        .set(self.embedding_cache.len() as i64);
                    (embedding, false, Some(embed_time))
                };

            // Calculate similarity scores with reference embeddings
            let mut similarity_scores = Vec::new();
            let mut total_weighted_score = 0.0;
            let mut total_weight = 0.0;

            for reference in &self.config.stage2.reference_embeddings {
                let similarity =
                    self.calculate_cosine_similarity(&content_embedding, &reference.embedding);

                // References below the similarity threshold are ignored entirely: they
                // affect neither the score nor the weight total.
                if similarity >= self.config.stage2.similarity_threshold {
                    let weighted_score = similarity as f64 * reference.weight;

                    similarity_scores.push(SimilarityScore {
                        reference_name: reference.name.clone(),
                        reference_category: reference.category.clone(),
                        similarity,
                        weight: reference.weight,
                        weighted_score,
                    });

                    total_weighted_score += weighted_score;
                    total_weight += reference.weight;
                }
            }

            // Normalize score to 0.0-1.0 range
            let normalized_score = if total_weight > 0.0 {
                (total_weighted_score / total_weight).min(1.0)
            } else {
                0.0
            };

            // Calculate confidence based on number of matches and their strength
            let confidence = if similarity_scores.is_empty() {
                0.1 // Low confidence for no semantic matches
            } else {
                // `similarity_scores` is non-empty here, so the reference list is too and
                // neither division below is by zero.
                let match_ratio = similarity_scores.len() as f64
                    / self.config.stage2.reference_embeddings.len() as f64;
                let avg_similarity = similarity_scores
                    .iter()
                    .map(|s| s.similarity as f64)
                    .sum::<f64>()
                    / similarity_scores.len() as f64;
                (match_ratio * 0.5 + avg_similarity * 0.5).min(1.0)
            };

            let passed_threshold = confidence >= self.config.stage2.confidence_threshold;

            Ok(StageResult {
                stage: AssessmentStage::Stage2SemanticSimilarity,
                score: normalized_score,
                confidence,
                processing_time_ms: stage_start.elapsed().as_millis() as u64,
                passed_threshold,
                details: StageDetails::Stage2 {
                    similarity_scores,
                    cache_hit,
                    embedding_generation_time_ms: embedding_time,
                },
            })
        };

        let result = timeout(timeout_duration, stage2_result).await;

        // Outer `Result` is the timeout outcome, inner `Result` is the stage outcome.
        match result {
            Ok(Ok(stage_result)) => {
                let duration_seconds = stage_start.elapsed().as_secs_f64();
                self.metrics.stage2_duration.observe(duration_seconds);

                // Check performance threshold
                if stage_result.processing_time_ms > self.config.performance.stage2_target_ms {
                    self.metrics.stage2_threshold_violations.inc();
                    warn!(
                        "Stage 2 exceeded target time: {}ms > {}ms",
                        stage_result.processing_time_ms, self.config.performance.stage2_target_ms
                    );
                }

                debug!(
                    "Stage 2 completed in {}ms with score {:.3} and confidence {:.3}",
                    stage_result.processing_time_ms, stage_result.score, stage_result.confidence
                );

                Ok(stage_result)
            }
            // Stage-level failure: passed through unchanged, no metric is touched here.
            Ok(Err(e)) => Err(e),
            Err(_) => {
                // A hard timeout is also counted as a threshold violation.
                self.metrics.stage2_threshold_violations.inc();
                Err(ImportanceAssessmentError::Timeout(format!(
                    "Stage 2 exceeded maximum processing time of {}ms",
                    self.config.stage2.max_processing_time_ms
                )))
            }
        }
    }
1488
1489    async fn execute_stage3(
1490        &self,
1491        content: &str,
1492    ) -> Result<StageResult, ImportanceAssessmentError> {
1493        let stage_start = Instant::now();
1494        self.metrics.stage3_executions.inc();
1495
1496        // Check circuit breaker
1497        if !self.circuit_breaker.can_execute().await? {
1498            return Err(ImportanceAssessmentError::CircuitBreakerOpen(
1499                "LLM circuit breaker is open".to_string(),
1500            ));
1501        }
1502
1503        let timeout_duration = Duration::from_millis(self.config.stage3.max_processing_time_ms);
1504
1505        let result = timeout(timeout_duration, async {
1506            // Prepare LLM prompt with length limits
1507            let content_preview = if content.len() > 2000 {
1508                format!(
1509                    "{}... [truncated from {} chars]",
1510                    &content[..2000],
1511                    content.len()
1512                )
1513            } else {
1514                content.to_string()
1515            };
1516
1517            let prompt = self
1518                .config
1519                .stage3
1520                .prompt_template
1521                .replace("{content}", &content_preview)
1522                .replace("{timestamp}", &Utc::now().to_rfc3339());
1523
1524            // Make LLM request
1525            let llm_response = self.call_llm(&prompt).await?;
1526
1527            // Parse LLM response to extract importance score and confidence
1528            let (importance_score, confidence) = self.parse_llm_response(&llm_response)?;
1529
1530            let passed_threshold = true; // Stage 3 is the final stage
1531
1532            Ok::<StageResult, ImportanceAssessmentError>(StageResult {
1533                stage: AssessmentStage::Stage3LLMScoring,
1534                score: importance_score,
1535                confidence,
1536                processing_time_ms: stage_start.elapsed().as_millis() as u64,
1537                passed_threshold,
1538                details: StageDetails::Stage3 {
1539                    llm_response,
1540                    prompt_tokens: Some(prompt.len() / 4), // Rough token estimate
1541                    completion_tokens: None,               // Would need to be provided by LLM API
1542                    model_used: "configured-model".to_string(),
1543                },
1544            })
1545        })
1546        .await;
1547
1548        match result {
1549            Ok(Ok(stage_result)) => {
1550                let duration_seconds = stage_start.elapsed().as_secs_f64();
1551                self.metrics.stage3_duration.observe(duration_seconds);
1552                self.metrics.llm_call_successes.inc();
1553                self.circuit_breaker.record_success().await;
1554
1555                // Check performance threshold
1556                if stage_result.processing_time_ms > self.config.performance.stage3_target_ms {
1557                    self.metrics.stage3_threshold_violations.inc();
1558                    warn!(
1559                        "Stage 3 exceeded target time: {}ms > {}ms",
1560                        stage_result.processing_time_ms, self.config.performance.stage3_target_ms
1561                    );
1562                }
1563
1564                debug!(
1565                    "Stage 3 completed in {}ms with score {:.3} and confidence {:.3}",
1566                    stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1567                );
1568
1569                Ok(stage_result)
1570            }
1571            Ok(Err(e)) => {
1572                self.metrics.llm_call_failures.inc();
1573                self.circuit_breaker.record_failure().await;
1574                Err(e)
1575            }
1576            Err(_) => {
1577                self.metrics.stage3_threshold_violations.inc();
1578                self.metrics.llm_call_failures.inc();
1579                self.circuit_breaker.record_failure().await;
1580                Err(ImportanceAssessmentError::Timeout(format!(
1581                    "Stage 3 exceeded maximum processing time of {}ms",
1582                    self.config.stage3.max_processing_time_ms
1583                )))
1584            }
1585        }
1586    }
1587
1588    fn calculate_cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
1589        if a.len() != b.len() {
1590            return 0.0;
1591        }
1592
1593        let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1594        let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1595        let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1596
1597        if norm_a == 0.0 || norm_b == 0.0 {
1598            return 0.0;
1599        }
1600
1601        dot_product / (norm_a * norm_b)
1602    }
1603
1604    async fn call_llm(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1605        let sanitized_prompt = self.sanitize_llm_prompt(prompt)?;
1606
1607        // Implement exponential backoff retry logic
1608        let mut attempts = 0;
1609        let max_retries = 3;
1610        let base_delay = Duration::from_millis(100);
1611
1612        loop {
1613            match self.attempt_llm_call(&sanitized_prompt).await {
1614                Ok(response) => return Ok(response),
1615                Err(e) => {
1616                    attempts += 1;
1617                    if attempts > max_retries {
1618                        return Err(e);
1619                    }
1620
1621                    // Exponential backoff with jitter
1622                    let delay = base_delay * (2_u32.pow(attempts - 1));
1623                    let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..=100));
1624                    tokio::time::sleep(delay + jitter).await;
1625
1626                    warn!(
1627                        "LLM call failed (attempt {}/{}), retrying in {:?}: {}",
1628                        attempts,
1629                        max_retries,
1630                        delay + jitter,
1631                        e
1632                    );
1633                }
1634            }
1635        }
1636    }
1637
1638    async fn attempt_llm_call(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1639        // Validate input length to prevent resource exhaustion
1640        if prompt.len() > 50000 {
1641            // 50KB limit
1642            return Err(ImportanceAssessmentError::Stage3Failed(
1643                "Prompt exceeds maximum length".to_string(),
1644            ));
1645        }
1646
1647        let request_body = serde_json::json!({
1648            "prompt": prompt,
1649            "max_tokens": 150,
1650            "temperature": 0.1,
1651            "stop": ["\n\n", "---"],
1652            "model": "text-davinci-003" // Default model
1653        });
1654
1655        let response = self
1656            .http_client
1657            .post(&self.config.stage3.llm_endpoint)
1658            .header("Content-Type", "application/json")
1659            .header("User-Agent", "codex-memory/0.1.0")
1660            .json(&request_body)
1661            .send()
1662            .await
1663            .map_err(|e| {
1664                if e.is_timeout() {
1665                    ImportanceAssessmentError::Timeout(format!("LLM request timed out: {e}"))
1666                } else if e.is_connect() {
1667                    ImportanceAssessmentError::Stage3Failed(format!(
1668                        "Failed to connect to LLM service: {e}"
1669                    ))
1670                } else {
1671                    ImportanceAssessmentError::Stage3Failed(format!("LLM request failed: {e}"))
1672                }
1673            })?;
1674
1675        let status = response.status();
1676        let response_text = response.text().await.map_err(|e| {
1677            ImportanceAssessmentError::Stage3Failed(format!("Failed to read response body: {e}"))
1678        })?;
1679
1680        if !status.is_success() {
1681            // Handle different HTTP error codes
1682            let error_msg = match status.as_u16() {
1683                400 => format!("Bad request to LLM service: {response_text}"),
1684                401 => "Unauthorized: Invalid API key or credentials".to_string(),
1685                403 => "Forbidden: Insufficient permissions".to_string(),
1686                429 => "Rate limit exceeded, will retry".to_string(),
1687                500..=599 => format!("LLM service error ({status}): {response_text}"),
1688                _ => format!("LLM service returned status {status}: {response_text}"),
1689            };
1690
1691            return Err(ImportanceAssessmentError::Stage3Failed(error_msg));
1692        }
1693
1694        // Parse response with proper error handling
1695        let response_json: serde_json::Value =
1696            serde_json::from_str(&response_text).map_err(|e| {
1697                ImportanceAssessmentError::Stage3Failed(format!(
1698                    "Failed to parse LLM response as JSON: {e}"
1699                ))
1700            })?;
1701
1702        // Extract response text with multiple fallback paths
1703        let response_content = response_json
1704            .get("choices")
1705            .and_then(|c| c.get(0))
1706            .and_then(|choice| {
1707                // Try different response formats (OpenAI, local models, etc.)
1708                choice
1709                    .get("text")
1710                    .or_else(|| choice.get("message").and_then(|m| m.get("content")))
1711                    .or_else(|| choice.get("generated_text"))
1712            })
1713            .and_then(|v| v.as_str())
1714            .ok_or_else(|| {
1715                ImportanceAssessmentError::Stage3Failed(format!(
1716                    "Invalid LLM response format. Expected 'choices[0].text' or similar. Got: {}",
1717                    serde_json::to_string_pretty(&response_json)
1718                        .unwrap_or_else(|_| "invalid JSON".to_string())
1719                ))
1720            })?
1721            .trim()
1722            .to_string();
1723
1724        // Validate response length
1725        if response_content.is_empty() {
1726            return Err(ImportanceAssessmentError::Stage3Failed(
1727                "LLM returned empty response".to_string(),
1728            ));
1729        }
1730
1731        if response_content.len() > 10000 {
1732            // 10KB response limit
1733            warn!("LLM response was truncated due to excessive length");
1734            return Ok(response_content[..10000].to_string());
1735        }
1736
1737        Ok(response_content)
1738    }
1739
1740    fn sanitize_llm_prompt(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1741        // Basic prompt injection protection
1742        let dangerous_patterns = [
1743            "ignore previous instructions",
1744            "ignore all previous",
1745            "disregard previous",
1746            "forget previous",
1747            "new instructions:",
1748            "system:",
1749            "assistant:",
1750            "user:",
1751            "<|endoftext|>",
1752            "###",
1753            "---\n",
1754        ];
1755
1756        let lower_prompt = prompt.to_lowercase();
1757        for pattern in &dangerous_patterns {
1758            if lower_prompt.contains(pattern) {
1759                warn!("Potential prompt injection detected: {}", pattern);
1760                return Err(ImportanceAssessmentError::Stage3Failed(
1761                    "Prompt contains potentially malicious content".to_string(),
1762                ));
1763            }
1764        }
1765
1766        // Remove or escape potentially dangerous characters
1767        let sanitized = prompt
1768            .replace('\0', "") // Remove null bytes
1769            .replace("\x1b", "") // Remove escape sequences
1770            .chars()
1771            .filter(|c| c.is_ascii_graphic() || c.is_ascii_whitespace())
1772            .collect::<String>();
1773
1774        // Validate final length
1775        if sanitized.len() > 10000 {
1776            return Err(ImportanceAssessmentError::Stage3Failed(
1777                "Prompt too long after sanitization".to_string(),
1778            ));
1779        }
1780
1781        Ok(sanitized)
1782    }
1783
1784    fn parse_llm_response(&self, response: &str) -> Result<(f64, f64), ImportanceAssessmentError> {
1785        // Parse LLM response to extract importance score and confidence
1786        // This is a simplified parser - in practice, you'd want more robust parsing
1787
1788        let lines: Vec<&str> = response.lines().collect();
1789        let mut importance_score = 0.5; // Default
1790        let mut confidence = 0.7; // Default
1791
1792        for line in lines {
1793            let line = line.trim().to_lowercase();
1794
1795            // Look for importance score
1796            if line.contains("importance:") || line.contains("score:") {
1797                if let Some(score_str) = line.split(':').nth(1) {
1798                    if let Ok(score) = score_str.trim().parse::<f64>() {
1799                        importance_score = score.clamp(0.0, 1.0);
1800                    }
1801                }
1802            }
1803
1804            // Look for confidence
1805            if line.contains("confidence:") {
1806                if let Some(conf_str) = line.split(':').nth(1) {
1807                    if let Ok(conf) = conf_str.trim().parse::<f64>() {
1808                        confidence = conf.clamp(0.0, 1.0);
1809                    }
1810                }
1811            }
1812        }
1813
1814        Ok((importance_score, confidence))
1815    }
1816
1817    fn extract_explanation_from_stage3(&self, stage_result: &StageResult) -> Option<String> {
1818        if let StageDetails::Stage3 { llm_response, .. } = &stage_result.details {
1819            Some(llm_response.clone())
1820        } else {
1821            None
1822        }
1823    }
1824
1825    fn record_final_metrics(&self, result: &ImportanceAssessmentResult) {
1826        self.metrics
1827            .assessment_confidence
1828            .observe(result.confidence);
1829        self.metrics
1830            .final_importance_scores
1831            .observe(result.importance_score);
1832
1833        info!(
1834            "Importance assessment completed: score={:.3}, confidence={:.3}, stage={:?}, time={}ms",
1835            result.importance_score,
1836            result.confidence,
1837            result.final_stage,
1838            result.total_processing_time_ms
1839        );
1840    }
1841
1842    /// Get current pipeline statistics
1843    pub async fn get_statistics(&self) -> PipelineStatistics {
1844        let cache_size = self.embedding_cache.len();
1845        let eviction_count = self.embedding_cache.eviction_count();
1846
1847        PipelineStatistics {
1848            cache_size,
1849            stage1_executions: self.metrics.stage1_executions.get(),
1850            stage2_executions: self.metrics.stage2_executions.get(),
1851            stage3_executions: self.metrics.stage3_executions.get(),
1852            completed_at_stage1: self.metrics.completed_at_stage1.get(),
1853            completed_at_stage2: self.metrics.completed_at_stage2.get(),
1854            completed_at_stage3: self.metrics.completed_at_stage3.get(),
1855            cache_hits: self.metrics.embedding_cache_hits.get(),
1856            cache_misses: self.metrics.embedding_cache_misses.get(),
1857            cache_evictions: eviction_count,
1858            circuit_breaker_state: format!("{:?}", *self.circuit_breaker.state.read().await),
1859            llm_success_rate: {
1860                let successes = self.metrics.llm_call_successes.get() as f64;
1861                let failures = self.metrics.llm_call_failures.get() as f64;
1862                let total = successes + failures;
1863                if total > 0.0 {
1864                    successes / total
1865                } else {
1866                    1.0
1867                }
1868            },
1869        }
1870    }
1871
1872    /// Clear the embedding cache
1873    pub async fn clear_cache(&self) {
1874        self.embedding_cache.clear().await;
1875        self.metrics.embedding_cache_size.set(0);
1876        info!("Embedding cache cleared");
1877    }
1878
1879    /// Get cache hit ratio
1880    pub fn get_cache_hit_ratio(&self) -> f64 {
1881        let hits = self.metrics.embedding_cache_hits.get() as f64;
1882        let misses = self.metrics.embedding_cache_misses.get() as f64;
1883        let total = hits + misses;
1884        if total > 0.0 {
1885            hits / total
1886        } else {
1887            0.0
1888        }
1889    }
1890}
1891
1892#[derive(Debug, Clone, Serialize, Deserialize)]
1893pub struct PipelineStatistics {
1894    pub cache_size: usize,
1895    pub stage1_executions: u64,
1896    pub stage2_executions: u64,
1897    pub stage3_executions: u64,
1898    pub completed_at_stage1: u64,
1899    pub completed_at_stage2: u64,
1900    pub completed_at_stage3: u64,
1901    pub cache_hits: u64,
1902    pub cache_misses: u64,
1903    pub cache_evictions: u64,
1904    pub circuit_breaker_state: String,
1905    pub llm_success_rate: f64,
1906}
1907
1908impl Default for ImportanceAssessmentConfig {
1909    fn default() -> Self {
1910        Self {
1911            stage1: Stage1Config {
1912                confidence_threshold: 0.6,
1913                pattern_library: vec![
1914                    ImportancePattern {
1915                        name: "remember_command".to_string(),
1916                        pattern: r"(?i)\b(remember|recall|don't forget)\b".to_string(),
1917                        weight: 0.8,
1918                        context_boosters: vec!["important".to_string(), "critical".to_string()],
1919                        category: "memory".to_string(),
1920                    },
1921                    ImportancePattern {
1922                        name: "preference_statement".to_string(),
1923                        pattern: r"(?i)\b(prefer|like|want|choose)\b".to_string(),
1924                        weight: 0.7,
1925                        context_boosters: vec!["always".to_string(), "usually".to_string()],
1926                        category: "preference".to_string(),
1927                    },
1928                    ImportancePattern {
1929                        name: "decision_making".to_string(),
1930                        pattern: r"(?i)\b(decide|decision|choose|select)\b".to_string(),
1931                        weight: 0.75,
1932                        context_boosters: vec!["final".to_string(), "official".to_string()],
1933                        category: "decision".to_string(),
1934                    },
1935                    ImportancePattern {
1936                        name: "correction".to_string(),
1937                        pattern: r"(?i)\b(correct|fix|wrong|mistake|error)\b".to_string(),
1938                        weight: 0.6,
1939                        context_boosters: vec!["actually".to_string(), "should".to_string()],
1940                        category: "correction".to_string(),
1941                    },
1942                    ImportancePattern {
1943                        name: "importance_marker".to_string(),
1944                        pattern: r"(?i)\b(important|critical|crucial|vital|essential)\b".to_string(),
1945                        weight: 0.9,
1946                        context_boosters: vec!["very".to_string(), "extremely".to_string()],
1947                        category: "importance".to_string(),
1948                    },
1949                ],
1950                max_processing_time_ms: 10,
1951            },
1952            stage2: Stage2Config {
1953                confidence_threshold: 0.7,
1954                max_processing_time_ms: 100,
1955                embedding_cache_ttl_seconds: 3600, // 1 hour
1956                embedding_cache_max_size: 10000, // Maximum 10k cached embeddings
1957                cache_eviction_threshold: 0.8, // Start evicting at 80% capacity
1958                similarity_threshold: 0.7,
1959                reference_embeddings: vec![], // Would be populated with pre-computed embeddings
1960            },
1961            stage3: Stage3Config {
1962                max_processing_time_ms: 1000,
1963                llm_endpoint: "http://localhost:8080/generate".to_string(),
1964                max_concurrent_requests: 5,
1965                prompt_template: "Assess the importance of this content on a scale of 0.0 to 1.0. Consider context, user intent, and actionability.\n\nContent: {content}\n\nProvide your assessment as:\nImportance: [score]\nConfidence: [confidence]\nReasoning: [explanation]".to_string(),
1966                target_usage_percentage: 20.0,
1967            },
1968            circuit_breaker: CircuitBreakerConfig {
1969                failure_threshold: 5,
1970                failure_window_seconds: 60,
1971                recovery_timeout_seconds: 30,
1972                minimum_requests: 3,
1973            },
1974            performance: PerformanceConfig {
1975                stage1_target_ms: 10,
1976                stage2_target_ms: 100,
1977                stage3_target_ms: 1000,
1978            },
1979        }
1980    }
1981}