use crate::embedding::EmbeddingService;
use crate::memory::MemoryError;
use anyhow::Result;
use chrono::{DateTime, Utc};
use prometheus::{Counter, Histogram, IntCounter, IntGauge, Registry};
use rand::Rng;
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, LinkedList};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::time::timeout;
use tracing::{debug, error, info, warn};
18
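/// Errors produced by the three-stage importance assessment pipeline.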
#[derive(Debug, Error)]
pub enum ImportanceAssessmentError {
    #[error("Stage 1 pattern matching failed: {0}")]
    Stage1Failed(String),

    #[error("Stage 2 semantic analysis failed: {0}")]
    Stage2Failed(String),

    #[error("Stage 3 LLM scoring failed: {0}")]
    Stage3Failed(String),

    #[error("Circuit breaker is open: {0}")]
    CircuitBreakerOpen(String),

    #[error("Timeout exceeded: {0}")]
    Timeout(String),

    #[error("Configuration error: {0}")]
    Configuration(String),

    #[error("Memory operation failed: {0}")]
    Memory(#[from] MemoryError),

    #[error("Cache operation failed: {0}")]
    Cache(String),
}
45
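/// Top-level configuration for the importance assessment pipeline, grouping
/// per-stage settings with the circuit breaker and performance targets.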
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ImportanceAssessmentConfig {
49 pub stage1: Stage1Config,
51
52 pub stage2: Stage2Config,
54
55 pub stage3: Stage3Config,
57
58 pub circuit_breaker: CircuitBreakerConfig,
60
61 pub performance: PerformanceConfig,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct Stage1Config {
67 pub confidence_threshold: f64,
69
70 pub pattern_library: Vec<ImportancePattern>,
72
73 pub max_processing_time_ms: u64,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct Stage2Config {
79 pub confidence_threshold: f64,
81
82 pub max_processing_time_ms: u64,
84
85 pub embedding_cache_ttl_seconds: u64,
87
88 pub embedding_cache_max_size: usize,
90
91 pub cache_eviction_threshold: f64,
93
94 pub similarity_threshold: f32,
96
97 pub reference_embeddings: Vec<ReferenceEmbedding>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct Stage3Config {
103 pub max_processing_time_ms: u64,
105
106 pub llm_endpoint: String,
108
109 pub max_concurrent_requests: usize,
111
112 pub prompt_template: String,
114
115 pub target_usage_percentage: f64,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct CircuitBreakerConfig {
121 pub failure_threshold: usize,
123
124 pub failure_window_seconds: u64,
126
127 pub recovery_timeout_seconds: u64,
129
130 pub minimum_requests: usize,
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct PerformanceConfig {
136 pub stage1_target_ms: u64,
138
139 pub stage2_target_ms: u64,
141
142 pub stage3_target_ms: u64,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct ImportancePattern {
148 pub name: String,
150
151 pub pattern: String,
153
154 pub weight: f64,
156
157 pub context_boosters: Vec<String>,
159
160 pub category: String,
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct ReferenceEmbedding {
166 pub name: String,
168
169 pub embedding: Vec<f32>,
171
172 pub weight: f64,
174
175 pub category: String,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct ImportanceAssessmentResult {
182 pub importance_score: f64,
184
185 pub final_stage: AssessmentStage,
187
188 pub stage_results: Vec<StageResult>,
190
191 pub total_processing_time_ms: u64,
193
194 pub assessed_at: DateTime<Utc>,
196
197 pub confidence: f64,
199
200 pub explanation: Option<String>,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct StageResult {
206 pub stage: AssessmentStage,
208
209 pub score: f64,
211
212 pub confidence: f64,
214
215 pub processing_time_ms: u64,
217
218 pub passed_threshold: bool,
220
221 pub details: StageDetails,
223}
224
225#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
226pub enum AssessmentStage {
227 Stage1PatternMatching,
228 Stage2SemanticSimilarity,
229 Stage3LLMScoring,
230}
231
232#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
233pub enum StageDetails {
234 Stage1 {
235 matched_patterns: Vec<MatchedPattern>,
236 total_patterns_checked: usize,
237 },
238 Stage2 {
239 similarity_scores: Vec<SimilarityScore>,
240 cache_hit: bool,
241 embedding_generation_time_ms: Option<u64>,
242 },
243 Stage3 {
244 llm_response: String,
245 prompt_tokens: Option<usize>,
246 completion_tokens: Option<usize>,
247 model_used: String,
248 },
249}
250
251#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
252pub struct MatchedPattern {
253 pub pattern_name: String,
254 pub pattern_category: String,
255 pub match_text: String,
256 pub match_position: usize,
257 pub weight: f64,
258 pub context_boost: f64,
259}
260
261#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
262pub struct SimilarityScore {
263 pub reference_name: String,
264 pub reference_category: String,
265 pub similarity: f32,
266 pub weight: f64,
267 pub weighted_score: f64,
268}
269
270#[derive(Debug, Clone, PartialEq)]
272enum CircuitBreakerState {
273 Closed,
274 Open(DateTime<Utc>),
275 HalfOpen,
276}
277
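/// Circuit breaker guarding Stage 3 LLM calls: opens after repeated failures
/// within a time window and probes recovery through a half-open state.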
#[derive(Debug)]
struct CircuitBreaker {
    state: RwLock<CircuitBreakerState>,
    config: CircuitBreakerConfig,
    failure_count: AtomicUsize,
    last_failure_time: RwLock<Option<DateTime<Utc>>>,
    request_count: AtomicUsize,
    consecutive_successes: AtomicUsize,
}
288
289impl CircuitBreaker {
290 fn new(config: CircuitBreakerConfig) -> Self {
291 Self {
292 state: RwLock::new(CircuitBreakerState::Closed),
293 config,
294 failure_count: AtomicUsize::new(0),
295 last_failure_time: RwLock::new(None),
296 request_count: AtomicUsize::new(0),
297 consecutive_successes: AtomicUsize::new(0),
298 }
299 }
300
301 async fn can_execute(&self) -> Result<bool, ImportanceAssessmentError> {
302 let state = self.state.read().await;
303 match *state {
304 CircuitBreakerState::Closed => Ok(true),
305 CircuitBreakerState::Open(opened_at) => {
306 let now = Utc::now();
307 let recovery_time = opened_at
308 + chrono::Duration::seconds(self.config.recovery_timeout_seconds as i64);
309
310 if now >= recovery_time {
311 drop(state);
312 let mut state = self.state.write().await;
313 *state = CircuitBreakerState::HalfOpen;
314 Ok(true)
315 } else {
316 Err(ImportanceAssessmentError::CircuitBreakerOpen(format!(
317 "Circuit breaker is open until {recovery_time}"
318 )))
319 }
320 }
321 CircuitBreakerState::HalfOpen => Ok(true),
322 }
323 }
324
325 async fn record_success(&self) {
326 let consecutive_successes = self.consecutive_successes.fetch_add(1, Ordering::SeqCst);
328
329 self.failure_count.store(0, Ordering::SeqCst);
331
332 let current_state = {
334 let state = self.state.read().await;
335 state.clone()
336 };
337
338 match current_state {
339 CircuitBreakerState::HalfOpen => {
340 if consecutive_successes >= 2 {
342 let mut state = self.state.write().await;
343 *state = CircuitBreakerState::Closed;
344
345 let mut last_failure_time = self.last_failure_time.write().await;
346 *last_failure_time = None;
347
348 info!(
349 "Circuit breaker closed after {} consecutive successes",
350 consecutive_successes + 1
351 );
352 }
353 }
354 CircuitBreakerState::Open(_) => {
355 warn!(
357 "Received success while circuit breaker is open - state inconsistency detected"
358 );
359 }
360 CircuitBreakerState::Closed => {
361 let mut last_failure_time = self.last_failure_time.write().await;
363 *last_failure_time = None;
364 }
365 }
366 }
367
368 async fn record_failure(&self) {
369 let now = Utc::now();
370
371 self.consecutive_successes.store(0, Ordering::SeqCst);
373
374 let request_count = self.request_count.fetch_add(1, Ordering::SeqCst) + 1;
376
377 let should_open_circuit = {
379 let mut last_failure_time = self.last_failure_time.write().await;
380
381 if let Some(last_failure) = *last_failure_time {
383 let window_start =
384 now - chrono::Duration::seconds(self.config.failure_window_seconds as i64);
385 if last_failure < window_start {
386 self.failure_count.store(0, Ordering::SeqCst);
388 }
389 }
390
391 let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
393 *last_failure_time = Some(now);
394
395 request_count >= self.config.minimum_requests
397 && failure_count >= self.config.failure_threshold
398 };
399
400 if should_open_circuit {
401 let current_state = {
403 let state = self.state.read().await;
404 state.clone()
405 };
406
407 match current_state {
408 CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
409 let mut state = self.state.write().await;
410 match *state {
412 CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
413 *state = CircuitBreakerState::Open(now);
414 let failure_count = self.failure_count.load(Ordering::SeqCst);
415 warn!(
416 "Circuit breaker opened due to {} failures out of {} requests",
417 failure_count, request_count
418 );
419 }
                        CircuitBreakerState::Open(_) => {
                            // Already open; nothing more to do.
                        }
423 }
424 }
425 CircuitBreakerState::Open(_) => {
426 debug!("Additional failure recorded while circuit breaker is open");
428 }
429 }
430 }
431 }
432}
433
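/// Embedding stored in the cache together with its creation time, TTL, and
/// last-access time.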
434#[derive(Debug, Clone)]
436struct CachedEmbedding {
437 embedding: Vec<f32>,
438 cached_at: DateTime<Utc>,
439 ttl_seconds: u64,
440 last_accessed: DateTime<Utc>,
441}
442
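/// Entry in the LRU recency list; `key` mirrors a key in the cache map.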
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct LRUNode {
    key: String,
    timestamp: DateTime<Utc>,
}
450
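/// TTL-bounded embedding cache with LRU eviction under memory pressure.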
#[derive(Debug)]
struct EmbeddingCache {
    cache: RwLock<HashMap<String, CachedEmbedding>>,
    lru_list: RwLock<LinkedList<LRUNode>>,
    current_size: AtomicUsize,
    max_size: usize,
    #[allow(dead_code)]
    eviction_threshold: f64,
    eviction_count: AtomicU64,
    memory_pressure_threshold: usize,
}
463
464impl CachedEmbedding {
465 fn new(embedding: Vec<f32>, ttl_seconds: u64) -> Self {
466 let now = Utc::now();
467 Self {
468 embedding,
469 cached_at: now,
470 ttl_seconds,
471 last_accessed: now,
472 }
473 }
474
475 fn is_expired(&self) -> bool {
476 let now = Utc::now();
477 let expiry = self.cached_at + chrono::Duration::seconds(self.ttl_seconds as i64);
478 now >= expiry
479 }
480
481 fn touch(&mut self) {
482 self.last_accessed = Utc::now();
483 }
484}
485
486impl EmbeddingCache {
487 fn new(max_size: usize, eviction_threshold: f64) -> Self {
488 Self {
489 cache: RwLock::new(HashMap::new()),
490 lru_list: RwLock::new(LinkedList::new()),
491 current_size: AtomicUsize::new(0),
492 max_size,
493 eviction_threshold,
494 eviction_count: AtomicU64::new(0),
495 memory_pressure_threshold: (max_size as f64 * eviction_threshold) as usize,
496 }
497 }
498
499 async fn get(&self, key: &str) -> Option<CachedEmbedding> {
500 let mut cache = self.cache.write().await;
501 let mut lru_list = self.lru_list.write().await;
502
503 if let Some(cached) = cache.get_mut(key) {
504 if cached.is_expired() {
505 cache.remove(key);
506 self.current_size.fetch_sub(1, Ordering::Relaxed);
507 *lru_list = lru_list
509 .iter()
510 .filter(|node| node.key != key)
511 .cloned()
512 .collect();
513 return None;
514 }
515
516 cached.touch();
517 *lru_list = lru_list
519 .iter()
520 .filter(|node| node.key != key)
521 .cloned()
522 .collect();
523 lru_list.push_front(LRUNode {
524 key: key.to_string(),
525 timestamp: cached.last_accessed,
526 });
527
528 Some(cached.clone())
529 } else {
530 None
531 }
532 }
533
534 async fn insert(
535 &self,
536 key: String,
537 value: CachedEmbedding,
538 ) -> Result<(), ImportanceAssessmentError> {
539 let current_size = self.current_size.load(Ordering::Relaxed);
541 if current_size >= self.memory_pressure_threshold {
542 self.evict_lru_entries().await?;
543 }
544
545 let mut cache = self.cache.write().await;
546 let mut lru_list = self.lru_list.write().await;
547
548 if cache.contains_key(&key) {
550 *lru_list = lru_list
551 .iter()
552 .filter(|node| node.key != key)
553 .cloned()
554 .collect();
555 } else {
556 self.current_size.fetch_add(1, Ordering::Relaxed);
557 }
558
559 cache.insert(key.clone(), value.clone());
560 lru_list.push_front(LRUNode {
561 key: key.clone(),
562 timestamp: value.last_accessed,
563 });
564
565 Ok(())
566 }
567
568 async fn evict_lru_entries(&self) -> Result<(), ImportanceAssessmentError> {
569 let mut cache = self.cache.write().await;
570 let mut lru_list = self.lru_list.write().await;
571
        let target_size = (self.max_size as f64 * 0.7) as usize;
        let current_size = cache.len();
574
575 if current_size <= target_size {
576 return Ok(());
577 }
578
579 let entries_to_remove = current_size - target_size;
580 let mut removed_count = 0;
581
582 while removed_count < entries_to_remove && !lru_list.is_empty() {
584 if let Some(node) = lru_list.pop_back() {
585 cache.remove(&node.key);
586 removed_count += 1;
587 self.eviction_count.fetch_add(1, Ordering::Relaxed);
588 }
589 }
590
591 self.current_size.store(cache.len(), Ordering::Relaxed);
592
593 if removed_count > 0 {
594 info!(
595 "Evicted {} entries from embedding cache due to memory pressure",
596 removed_count
597 );
598 }
599
600 Ok(())
601 }
602
603 async fn clear(&self) {
604 let mut cache = self.cache.write().await;
605 let mut lru_list = self.lru_list.write().await;
606
607 cache.clear();
608 lru_list.clear();
609 self.current_size.store(0, Ordering::Relaxed);
610 }
611
612 fn len(&self) -> usize {
613 self.current_size.load(Ordering::Relaxed)
614 }
615
616 fn eviction_count(&self) -> u64 {
617 self.eviction_count.load(Ordering::Relaxed)
618 }
619
620 async fn cleanup_expired(&self) -> Result<usize, ImportanceAssessmentError> {
621 let mut cache = self.cache.write().await;
622 let mut lru_list = self.lru_list.write().await;
623
624 let mut expired_keys = Vec::new();
625
626 for (key, value) in cache.iter() {
627 if value.is_expired() {
628 expired_keys.push(key.clone());
629 }
630 }
631
632 for key in &expired_keys {
633 cache.remove(key);
634 *lru_list = lru_list
635 .iter()
636 .filter(|node| &node.key != key)
637 .cloned()
638 .collect();
639 }
640
641 let removed_count = expired_keys.len();
642 self.current_size.store(cache.len(), Ordering::Relaxed);
643
644 if removed_count > 0 {
645 debug!(
646 "Cleaned up {} expired entries from embedding cache",
647 removed_count
648 );
649 }
650
651 Ok(removed_count)
652 }
653}
654
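/// Prometheus metrics for stage executions, durations, cache behavior,
/// circuit breaker activity, and final score distributions.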
655#[derive(Debug)]
657pub struct ImportanceAssessmentMetrics {
658 pub stage1_executions: IntCounter,
660 pub stage2_executions: IntCounter,
661 pub stage3_executions: IntCounter,
662
663 pub stage1_duration: Histogram,
665 pub stage2_duration: Histogram,
666 pub stage3_duration: Histogram,
667
668 pub completed_at_stage1: IntCounter,
670 pub completed_at_stage2: IntCounter,
671 pub completed_at_stage3: IntCounter,
672
673 pub stage1_threshold_violations: IntCounter,
675 pub stage2_threshold_violations: IntCounter,
676 pub stage3_threshold_violations: IntCounter,
677
678 pub embedding_cache_hits: IntCounter,
680 pub embedding_cache_misses: IntCounter,
681 pub embedding_cache_size: IntGauge,
682
683 pub circuit_breaker_opened: Counter,
685 pub circuit_breaker_half_open: Counter,
686 pub circuit_breaker_closed: Counter,
687 pub llm_call_failures: IntCounter,
688 pub llm_call_successes: IntCounter,
689
690 pub assessment_confidence: Histogram,
692 pub final_importance_scores: Histogram,
693}
694
695impl ImportanceAssessmentMetrics {
696 pub fn new(registry: &Registry) -> Result<Self> {
697 let stage1_executions = IntCounter::new(
698 "importance_assessment_stage1_executions_total",
699 "Total number of Stage 1 executions",
700 )?;
701 registry.register(Box::new(stage1_executions.clone()))?;
702
703 let stage2_executions = IntCounter::new(
704 "importance_assessment_stage2_executions_total",
705 "Total number of Stage 2 executions",
706 )?;
707 registry.register(Box::new(stage2_executions.clone()))?;
708
709 let stage3_executions = IntCounter::new(
710 "importance_assessment_stage3_executions_total",
711 "Total number of Stage 3 executions",
712 )?;
713 registry.register(Box::new(stage3_executions.clone()))?;
714
715 let stage1_duration = Histogram::with_opts(
716 prometheus::HistogramOpts::new(
717 "importance_assessment_stage1_duration_seconds",
718 "Duration of Stage 1 processing",
719 )
720 .buckets(vec![0.001, 0.005, 0.01, 0.02, 0.05, 0.1]),
721 )?;
722 registry.register(Box::new(stage1_duration.clone()))?;
723
724 let stage2_duration = Histogram::with_opts(
725 prometheus::HistogramOpts::new(
726 "importance_assessment_stage2_duration_seconds",
727 "Duration of Stage 2 processing",
728 )
729 .buckets(vec![0.01, 0.05, 0.1, 0.2, 0.5, 1.0]),
730 )?;
731 registry.register(Box::new(stage2_duration.clone()))?;
732
733 let stage3_duration = Histogram::with_opts(
734 prometheus::HistogramOpts::new(
735 "importance_assessment_stage3_duration_seconds",
736 "Duration of Stage 3 processing",
737 )
738 .buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0]),
739 )?;
740 registry.register(Box::new(stage3_duration.clone()))?;
741
742 let completed_at_stage1 = IntCounter::new(
743 "importance_assessment_completed_at_stage1_total",
744 "Total assessments completed at Stage 1",
745 )?;
746 registry.register(Box::new(completed_at_stage1.clone()))?;
747
748 let completed_at_stage2 = IntCounter::new(
749 "importance_assessment_completed_at_stage2_total",
750 "Total assessments completed at Stage 2",
751 )?;
752 registry.register(Box::new(completed_at_stage2.clone()))?;
753
754 let completed_at_stage3 = IntCounter::new(
755 "importance_assessment_completed_at_stage3_total",
756 "Total assessments completed at Stage 3",
757 )?;
758 registry.register(Box::new(completed_at_stage3.clone()))?;
759
760 let stage1_threshold_violations = IntCounter::new(
761 "importance_assessment_stage1_threshold_violations_total",
762 "Total Stage 1 performance threshold violations",
763 )?;
764 registry.register(Box::new(stage1_threshold_violations.clone()))?;
765
766 let stage2_threshold_violations = IntCounter::new(
767 "importance_assessment_stage2_threshold_violations_total",
768 "Total Stage 2 performance threshold violations",
769 )?;
770 registry.register(Box::new(stage2_threshold_violations.clone()))?;
771
772 let stage3_threshold_violations = IntCounter::new(
773 "importance_assessment_stage3_threshold_violations_total",
774 "Total Stage 3 performance threshold violations",
775 )?;
776 registry.register(Box::new(stage3_threshold_violations.clone()))?;
777
778 let embedding_cache_hits = IntCounter::new(
779 "importance_assessment_embedding_cache_hits_total",
780 "Total embedding cache hits",
781 )?;
782 registry.register(Box::new(embedding_cache_hits.clone()))?;
783
784 let embedding_cache_misses = IntCounter::new(
785 "importance_assessment_embedding_cache_misses_total",
786 "Total embedding cache misses",
787 )?;
788 registry.register(Box::new(embedding_cache_misses.clone()))?;
789
790 let embedding_cache_size = IntGauge::new(
791 "importance_assessment_embedding_cache_size",
792 "Current size of embedding cache",
793 )?;
794 registry.register(Box::new(embedding_cache_size.clone()))?;
795
796 let circuit_breaker_opened = Counter::new(
797 "importance_assessment_circuit_breaker_opened_total",
798 "Total times circuit breaker opened",
799 )?;
800 registry.register(Box::new(circuit_breaker_opened.clone()))?;
801
802 let circuit_breaker_half_open = Counter::new(
803 "importance_assessment_circuit_breaker_half_open_total",
804 "Total times circuit breaker went half-open",
805 )?;
806 registry.register(Box::new(circuit_breaker_half_open.clone()))?;
807
808 let circuit_breaker_closed = Counter::new(
809 "importance_assessment_circuit_breaker_closed_total",
810 "Total times circuit breaker closed",
811 )?;
812 registry.register(Box::new(circuit_breaker_closed.clone()))?;
813
814 let llm_call_failures = IntCounter::new(
815 "importance_assessment_llm_call_failures_total",
816 "Total LLM call failures",
817 )?;
818 registry.register(Box::new(llm_call_failures.clone()))?;
819
820 let llm_call_successes = IntCounter::new(
821 "importance_assessment_llm_call_successes_total",
822 "Total LLM call successes",
823 )?;
824 registry.register(Box::new(llm_call_successes.clone()))?;
825
826 let assessment_confidence = Histogram::with_opts(
827 prometheus::HistogramOpts::new(
828 "importance_assessment_confidence",
829 "Confidence scores of assessments",
830 )
831 .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
832 )?;
833 registry.register(Box::new(assessment_confidence.clone()))?;
834
835 let final_importance_scores = Histogram::with_opts(
836 prometheus::HistogramOpts::new(
837 "importance_assessment_final_scores",
838 "Final importance scores from assessments",
839 )
840 .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
841 )?;
842 registry.register(Box::new(final_importance_scores.clone()))?;
843
844 Ok(Self {
845 stage1_executions,
846 stage2_executions,
847 stage3_executions,
848 stage1_duration,
849 stage2_duration,
850 stage3_duration,
851 completed_at_stage1,
852 completed_at_stage2,
853 completed_at_stage3,
854 stage1_threshold_violations,
855 stage2_threshold_violations,
856 stage3_threshold_violations,
857 embedding_cache_hits,
858 embedding_cache_misses,
859 embedding_cache_size,
860 circuit_breaker_opened,
861 circuit_breaker_half_open,
862 circuit_breaker_closed,
863 llm_call_failures,
864 llm_call_successes,
865 assessment_confidence,
866 final_importance_scores,
867 })
868 }
869}
870
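/// Pattern matcher that handles simple case-insensitive keyword patterns with
/// a fast substring scan and compiles the remaining patterns as regexes.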
#[derive(Debug)]
struct OptimizedPatternMatcher {
    regex_patterns: Vec<(Regex, ImportancePattern)>,
    keyword_patterns: Vec<(String, ImportancePattern)>,
    #[allow(dead_code)]
    combined_regex: Option<Regex>,
}
882
883impl OptimizedPatternMatcher {
884 fn new(patterns: &[ImportancePattern]) -> Result<Self, ImportanceAssessmentError> {
885 let mut regex_patterns = Vec::new();
886 let mut keyword_patterns = Vec::new();
887
888 for pattern in patterns {
889 Self::validate_regex_complexity(&pattern.pattern)?;
891
892 if Self::is_simple_keyword_pattern(&pattern.pattern) {
894 if let Some(keyword) = Self::extract_keyword(&pattern.pattern) {
896 keyword_patterns.push((keyword.to_lowercase(), pattern.clone()));
897 continue;
898 }
899 }
900
901 match Regex::new(&pattern.pattern) {
903 Ok(regex) => regex_patterns.push((regex, pattern.clone())),
904 Err(e) => {
905 error!(
906 "Failed to compile regex pattern '{}': {}",
907 pattern.pattern, e
908 );
909 return Err(ImportanceAssessmentError::Configuration(format!(
910 "Invalid regex pattern '{}': {}",
911 pattern.pattern, e
912 )));
913 }
914 }
915 }
916
        Ok(Self {
            regex_patterns,
            keyword_patterns,
            combined_regex: None,
        })
    }
923
924 fn validate_regex_complexity(pattern: &str) -> Result<(), ImportanceAssessmentError> {
925 let dangerous_patterns = [
927 "(.*)*",
928 "(.+)+",
929 "([^x]*)*",
930 "(a|a)*",
931 "(a|a)+",
932 "(a*)*",
933 "(a+)+",
934 "(.{0,10000})*",
935 "(.*){10,}",
936 "(.+){10,}",
937 ];
938
939 for dangerous in &dangerous_patterns {
940 if pattern.contains(dangerous) {
941 return Err(ImportanceAssessmentError::Configuration(format!(
942 "Regex pattern contains potentially dangerous sequence '{dangerous}': {pattern}"
943 )));
944 }
945 }
946
947 if pattern.len() > 1000 {
949 return Err(ImportanceAssessmentError::Configuration(
950 "Regex pattern too long (max 1000 characters)".to_string(),
951 ));
952 }
953
954 let open_parens = pattern.chars().filter(|&c| c == '(').count();
956 if open_parens > 20 {
957 return Err(ImportanceAssessmentError::Configuration(
958 "Regex pattern has too many nested groups (max 20)".to_string(),
959 ));
960 }
961
962 Ok(())
963 }
964
965 fn is_simple_keyword_pattern(pattern: &str) -> bool {
966 pattern.starts_with("(?i)")
968 && pattern.contains("\\b")
969 && !pattern.contains(".*")
970 && !pattern.contains(".+")
971 && !pattern.contains("\\d")
972 && !pattern.contains("\\s")
973 && pattern.chars().filter(|&c| c == '(').count() <= 2
974 }
975
976 fn extract_keyword(pattern: &str) -> Option<String> {
977 if let Some(start) = pattern.find('(') {
979 if let Some(end) = pattern.rfind(')') {
980 let keywords = &pattern[start + 1..end];
981 if !keywords.contains('|') && keywords.chars().all(|c| c.is_alphabetic()) {
983 return Some(keywords.to_string());
984 }
985 }
986 }
987 None
988 }
989
990 fn find_matches(&self, content: &str, max_matches: usize) -> Vec<MatchedPattern> {
991 let mut matches = Vec::new();
992 let content_lower = content.to_lowercase();
993
994 for (keyword, pattern) in &self.keyword_patterns {
996 if matches.len() >= max_matches {
997 break;
998 }
999
1000 let mut start = 0;
1001 while let Some(pos) = content_lower[start..].find(keyword) {
1002 let absolute_pos = start + pos;
1003
1004 let is_word_start = absolute_pos == 0
1006 || !content_lower
1007 .chars()
1008 .nth(absolute_pos - 1)
1009 .unwrap_or(' ')
1010 .is_alphabetic();
1011 let is_word_end = absolute_pos + keyword.len() >= content_lower.len()
1012 || !content_lower
1013 .chars()
1014 .nth(absolute_pos + keyword.len())
1015 .unwrap_or(' ')
1016 .is_alphabetic();
1017
1018 if is_word_start && is_word_end {
1019 let context_boost = self.calculate_context_boost(
1020 content,
1021 absolute_pos,
1022 &pattern.context_boosters,
1023 );
1024
1025 matches.push(MatchedPattern {
1026 pattern_name: pattern.name.clone(),
1027 pattern_category: pattern.category.clone(),
1028 match_text: keyword.clone(),
1029 match_position: absolute_pos,
1030 weight: pattern.weight,
1031 context_boost,
1032 });
1033 }
1034
1035 start = absolute_pos + 1;
1036 }
1037 }
1038
1039 for (regex, pattern) in &self.regex_patterns {
1041 if matches.len() >= max_matches {
1042 break;
1043 }
1044
1045 for mat in regex.find_iter(content).take(max_matches - matches.len()) {
1046 let match_text = mat.as_str().to_string();
1047 let match_position = mat.start();
1048
1049 let context_boost = self.calculate_context_boost(
1050 content,
1051 match_position,
1052 &pattern.context_boosters,
1053 );
1054
1055 matches.push(MatchedPattern {
1056 pattern_name: pattern.name.clone(),
1057 pattern_category: pattern.category.clone(),
1058 match_text,
1059 match_position,
1060 weight: pattern.weight,
1061 context_boost,
1062 });
1063 }
1064 }
1065
1066 matches
1067 }
1068
1069 fn calculate_context_boost(
1070 &self,
1071 content: &str,
1072 match_position: usize,
1073 boosters: &[String],
1074 ) -> f64 {
1075 let window_size = 100;
1076 let start = match_position.saturating_sub(window_size);
1077 let end = (match_position + window_size).min(content.len());
1078 let context = &content[start..end].to_lowercase();
1079
        let mut boost: f64 = 0.0;
        for booster in boosters {
            if context.contains(&booster.to_lowercase()) {
                boost += 0.1;
            }
        }

        boost.min(0.5_f64)
    }
}
1090
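/// Three-stage importance assessment pipeline: Stage 1 pattern matching,
/// Stage 2 semantic similarity, and Stage 3 LLM scoring behind a circuit
/// breaker, with Prometheus metrics and an embedding cache.
///
/// A minimal usage sketch, assuming an async anyhow-style caller and a
/// hypothetical `MyEmbeddingService` implementing `EmbeddingService`:
///
/// ```ignore
/// use std::sync::Arc;
/// use prometheus::Registry;
///
/// let registry = Registry::new();
/// let embeddings = Arc::new(MyEmbeddingService::default());
/// let pipeline = ImportanceAssessmentPipeline::new(
///     ImportanceAssessmentConfig::default(),
///     embeddings,
///     &registry,
/// )?;
///
/// let result = pipeline
///     .assess_importance("Remember: I prefer dark mode for all editors")
///     .await?;
/// println!("score={:.2} stage={:?}", result.importance_score, result.final_stage);
/// ```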
1091pub struct ImportanceAssessmentPipeline {
1093 config: ImportanceAssessmentConfig,
1094 pattern_matcher: OptimizedPatternMatcher,
1095 embedding_service: Arc<dyn EmbeddingService>,
1096 embedding_cache: EmbeddingCache,
1097 circuit_breaker: CircuitBreaker,
1098 metrics: ImportanceAssessmentMetrics,
1099 http_client: reqwest::Client,
1100}
1101
1102impl ImportanceAssessmentPipeline {
1103 pub fn new(
1104 config: ImportanceAssessmentConfig,
1105 embedding_service: Arc<dyn EmbeddingService>,
1106 metrics_registry: &Registry,
1107 ) -> Result<Self> {
1108 let pattern_matcher = OptimizedPatternMatcher::new(&config.stage1.pattern_library)?;
1110
1111 let metrics = ImportanceAssessmentMetrics::new(metrics_registry)?;
1112
1113 let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone());
1114
1115 let http_client = reqwest::Client::builder()
1116 .timeout(Duration::from_millis(config.stage3.max_processing_time_ms))
1117 .build()?;
1118
1119 let embedding_cache = EmbeddingCache::new(
1120 config.stage2.embedding_cache_max_size,
1121 config.stage2.cache_eviction_threshold,
1122 );
1123
1124 Ok(Self {
1125 config,
1126 pattern_matcher,
1127 embedding_service,
1128 embedding_cache,
1129 circuit_breaker,
1130 metrics,
1131 http_client,
1132 })
1133 }
1134
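    /// Runs the staged assessment. A stage that passes its confidence
    /// threshold escalates to the next stage; otherwise the assessment
    /// completes with that stage's score and confidence.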
1135 pub async fn assess_importance(
1137 &self,
1138 content: &str,
1139 ) -> Result<ImportanceAssessmentResult, ImportanceAssessmentError> {
1140 let assessment_start = Instant::now();
1141 let mut stage_results = Vec::new();
1142
1143 info!(
1144 "Starting importance assessment for content length: {}",
1145 content.len()
1146 );
1147
1148 let stage1_result = self.execute_stage1(content).await?;
1150 let stage1_passed = stage1_result.passed_threshold;
1151 stage_results.push(stage1_result.clone());
1152
1153 if stage1_passed {
1154 debug!("Stage 1 passed threshold, proceeding to Stage 2");
1155
1156 let stage2_result = self.execute_stage2(content).await?;
1158 let stage2_passed = stage2_result.passed_threshold;
1159 stage_results.push(stage2_result.clone());
1160
1161 if stage2_passed {
1162 debug!("Stage 2 passed threshold, proceeding to Stage 3");
1163
1164 let stage3_result = self.execute_stage3(content).await?;
1166 stage_results.push(stage3_result.clone());
1167
1168 self.metrics.completed_at_stage3.inc();
1169
1170 let final_score = stage3_result.score;
1171 let confidence = stage3_result.confidence;
1172
1173 let result = ImportanceAssessmentResult {
1174 importance_score: final_score,
1175 final_stage: AssessmentStage::Stage3LLMScoring,
1176 stage_results,
1177 total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1178 assessed_at: Utc::now(),
1179 confidence,
1180 explanation: self.extract_explanation_from_stage3(&stage3_result),
1181 };
1182
1183 self.record_final_metrics(&result);
1184 Ok(result)
1185 } else {
1186 self.metrics.completed_at_stage2.inc();
1187
1188 let final_score = stage2_result.score;
1189 let confidence = stage2_result.confidence;
1190
1191 let result = ImportanceAssessmentResult {
1192 importance_score: final_score,
1193 final_stage: AssessmentStage::Stage2SemanticSimilarity,
1194 stage_results,
1195 total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1196 assessed_at: Utc::now(),
1197 confidence,
1198 explanation: Some(
1199 "Assessment completed at Stage 2 based on semantic similarity".to_string(),
1200 ),
1201 };
1202
1203 self.record_final_metrics(&result);
1204 Ok(result)
1205 }
1206 } else {
1207 self.metrics.completed_at_stage1.inc();
1208
1209 let final_score = stage1_result.score;
1210 let confidence = stage1_result.confidence;
1211
1212 let result = ImportanceAssessmentResult {
1213 importance_score: final_score,
1214 final_stage: AssessmentStage::Stage1PatternMatching,
1215 stage_results,
1216 total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1217 assessed_at: Utc::now(),
1218 confidence,
1219 explanation: Some(
1220 "Assessment completed at Stage 1 based on pattern matching".to_string(),
1221 ),
1222 };
1223
1224 self.record_final_metrics(&result);
1225 Ok(result)
1226 }
1227 }
1228
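    /// Stage 1: keyword and regex pattern matching with context boosting,
    /// bounded by `stage1.max_processing_time_ms`.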
1229 async fn execute_stage1(
1230 &self,
1231 content: &str,
1232 ) -> Result<StageResult, ImportanceAssessmentError> {
1233 let stage_start = Instant::now();
1234 self.metrics.stage1_executions.inc();
1235
1236 let timeout_duration = Duration::from_millis(self.config.stage1.max_processing_time_ms);
1237
1238 let result = timeout(timeout_duration, async {
1239 let content_for_analysis = if content.len() > 10000 {
1241 warn!(
1242 "Content length {} exceeds Stage 1 limit, truncating to 10000 chars",
1243 content.len()
1244 );
1245 &content[..10000]
1246 } else {
1247 content
1248 };
1249
            // Cap the number of matches collected per assessment.
            let max_matches = 50;
            let matched_patterns = self
                .pattern_matcher
                .find_matches(content_for_analysis, max_matches);
1255
1256 let mut total_score = 0.0;
1257 let mut max_weight: f64 = 0.0;
1258
1259 for pattern in &matched_patterns {
1260 let effective_weight = pattern.weight * (1.0 + pattern.context_boost);
1261 total_score += effective_weight;
1262 max_weight = max_weight.max(effective_weight);
1263 }
1264
1265 let normalized_score = if matched_patterns.is_empty() {
1267 0.0
1268 } else {
1269 (total_score / (matched_patterns.len() as f64)).min(1.0)
1270 };
1271
            let confidence = if matched_patterns.is_empty() {
                0.1
            } else {
                let pattern_diversity = matched_patterns
                    .iter()
                    .map(|m| m.pattern_category.clone())
                    .collect::<std::collections::HashSet<_>>()
                    .len() as f64;
                let pattern_count = self.config.stage1.pattern_library.len().max(1) as f64;
                let base_confidence = (pattern_diversity / pattern_count).min(1.0);
                let strength_boost = (max_weight / 1.0_f64).min(0.3);
                (base_confidence + strength_boost).min(1.0)
            };
1286
1287 let passed_threshold = confidence >= self.config.stage1.confidence_threshold;
1288
1289 StageResult {
1290 stage: AssessmentStage::Stage1PatternMatching,
1291 score: normalized_score,
1292 confidence,
1293 processing_time_ms: stage_start.elapsed().as_millis() as u64,
1294 passed_threshold,
1295 details: StageDetails::Stage1 {
1296 matched_patterns,
1297 total_patterns_checked: self.config.stage1.pattern_library.len(),
1298 },
1299 }
1300 })
1301 .await;
1302
1303 match result {
1304 Ok(stage_result) => {
1305 let duration_seconds = stage_start.elapsed().as_secs_f64();
1306 self.metrics.stage1_duration.observe(duration_seconds);
1307
1308 if stage_result.processing_time_ms > self.config.performance.stage1_target_ms {
1310 self.metrics.stage1_threshold_violations.inc();
1311 warn!(
1312 "Stage 1 exceeded target time: {}ms > {}ms",
1313 stage_result.processing_time_ms, self.config.performance.stage1_target_ms
1314 );
1315 }
1316
1317 debug!(
1318 "Stage 1 completed in {}ms with score {:.3} and confidence {:.3}",
1319 stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1320 );
1321
1322 Ok(stage_result)
1323 }
1324 Err(_) => {
1325 self.metrics.stage1_threshold_violations.inc();
1326 Err(ImportanceAssessmentError::Timeout(format!(
1327 "Stage 1 exceeded maximum processing time of {}ms",
1328 self.config.stage1.max_processing_time_ms
1329 )))
1330 }
1331 }
1332 }
1333
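    /// Stage 2: looks up or generates the content embedding (keyed by its
    /// SHA-256 hash) and scores weighted cosine similarity against the
    /// configured reference embeddings.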
1334 async fn execute_stage2(
1335 &self,
1336 content: &str,
1337 ) -> Result<StageResult, ImportanceAssessmentError> {
1338 let stage_start = Instant::now();
1339 self.metrics.stage2_executions.inc();
1340
1341 let timeout_duration = Duration::from_millis(self.config.stage2.max_processing_time_ms);
1342
1343 let stage2_result = async {
1344 if self.metrics.stage2_executions.get() % 100 == 0 {
1346 if let Err(e) = self.embedding_cache.cleanup_expired().await {
1347 warn!("Failed to cleanup expired cache entries: {}", e);
1348 }
1349 }
1350
1351 let mut hasher = Sha256::new();
1353 hasher.update(content.as_bytes());
1354 let content_hash = format!("{:x}", hasher.finalize());
1355
1356 let (content_embedding, cache_hit, embedding_time) =
1357 if let Some(cached) = self.embedding_cache.get(&content_hash).await {
1358 self.metrics.embedding_cache_hits.inc();
1359 (cached.embedding, true, None)
1360 } else {
1361 self.metrics.embedding_cache_misses.inc();
1362 let embed_start = Instant::now();
1363 let embedding = match self.embedding_service.generate_embedding(content).await {
1364 Ok(emb) => emb,
1365 Err(e) => {
1366 return Err(ImportanceAssessmentError::Stage2Failed(format!(
1367 "Embedding generation failed: {e}"
1368 )))
1369 }
1370 };
1371 let embed_time = embed_start.elapsed().as_millis() as u64;
1372
1373 let cached_embedding = CachedEmbedding::new(
1375 embedding.clone(),
1376 self.config.stage2.embedding_cache_ttl_seconds,
1377 );
1378
1379 if let Err(e) = self
1380 .embedding_cache
1381 .insert(content_hash, cached_embedding)
1382 .await
1383 {
1384 warn!("Failed to cache embedding: {}", e);
1385 }
1386
1387 self.metrics
1388 .embedding_cache_size
1389 .set(self.embedding_cache.len() as i64);
1390 (embedding, false, Some(embed_time))
1391 };
1392
1393 let mut similarity_scores = Vec::new();
1395 let mut total_weighted_score = 0.0;
1396 let mut total_weight = 0.0;
1397
1398 for reference in &self.config.stage2.reference_embeddings {
1399 let similarity =
1400 self.calculate_cosine_similarity(&content_embedding, &reference.embedding);
1401
1402 if similarity >= self.config.stage2.similarity_threshold {
1403 let weighted_score = similarity as f64 * reference.weight;
1404
1405 similarity_scores.push(SimilarityScore {
1406 reference_name: reference.name.clone(),
1407 reference_category: reference.category.clone(),
1408 similarity,
1409 weight: reference.weight,
1410 weighted_score,
1411 });
1412
1413 total_weighted_score += weighted_score;
1414 total_weight += reference.weight;
1415 }
1416 }
1417
1418 let normalized_score = if total_weight > 0.0 {
1420 (total_weighted_score / total_weight).min(1.0)
1421 } else {
1422 0.0
1423 };
1424
            let confidence = if similarity_scores.is_empty() {
                0.1
            } else {
1429 let match_ratio = similarity_scores.len() as f64
1430 / self.config.stage2.reference_embeddings.len() as f64;
1431 let avg_similarity = similarity_scores
1432 .iter()
1433 .map(|s| s.similarity as f64)
1434 .sum::<f64>()
1435 / similarity_scores.len() as f64;
1436 (match_ratio * 0.5 + avg_similarity * 0.5).min(1.0)
1437 };
1438
1439 let passed_threshold = confidence >= self.config.stage2.confidence_threshold;
1440
1441 Ok(StageResult {
1442 stage: AssessmentStage::Stage2SemanticSimilarity,
1443 score: normalized_score,
1444 confidence,
1445 processing_time_ms: stage_start.elapsed().as_millis() as u64,
1446 passed_threshold,
1447 details: StageDetails::Stage2 {
1448 similarity_scores,
1449 cache_hit,
1450 embedding_generation_time_ms: embedding_time,
1451 },
1452 })
1453 };
1454
1455 let result = timeout(timeout_duration, stage2_result).await;
1456
1457 match result {
1458 Ok(Ok(stage_result)) => {
1459 let duration_seconds = stage_start.elapsed().as_secs_f64();
1460 self.metrics.stage2_duration.observe(duration_seconds);
1461
1462 if stage_result.processing_time_ms > self.config.performance.stage2_target_ms {
1464 self.metrics.stage2_threshold_violations.inc();
1465 warn!(
1466 "Stage 2 exceeded target time: {}ms > {}ms",
1467 stage_result.processing_time_ms, self.config.performance.stage2_target_ms
1468 );
1469 }
1470
1471 debug!(
1472 "Stage 2 completed in {}ms with score {:.3} and confidence {:.3}",
1473 stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1474 );
1475
1476 Ok(stage_result)
1477 }
1478 Ok(Err(e)) => Err(e),
1479 Err(_) => {
1480 self.metrics.stage2_threshold_violations.inc();
1481 Err(ImportanceAssessmentError::Timeout(format!(
1482 "Stage 2 exceeded maximum processing time of {}ms",
1483 self.config.stage2.max_processing_time_ms
1484 )))
1485 }
1486 }
1487 }
1488
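    /// Stage 3: LLM-based scoring, guarded by the circuit breaker and a hard
    /// timeout; failures and timeouts are recorded against the breaker.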
1489 async fn execute_stage3(
1490 &self,
1491 content: &str,
1492 ) -> Result<StageResult, ImportanceAssessmentError> {
1493 let stage_start = Instant::now();
1494 self.metrics.stage3_executions.inc();
1495
1496 if !self.circuit_breaker.can_execute().await? {
1498 return Err(ImportanceAssessmentError::CircuitBreakerOpen(
1499 "LLM circuit breaker is open".to_string(),
1500 ));
1501 }
1502
1503 let timeout_duration = Duration::from_millis(self.config.stage3.max_processing_time_ms);
1504
1505 let result = timeout(timeout_duration, async {
1506 let content_preview = if content.len() > 2000 {
1508 format!(
1509 "{}... [truncated from {} chars]",
1510 &content[..2000],
1511 content.len()
1512 )
1513 } else {
1514 content.to_string()
1515 };
1516
1517 let prompt = self
1518 .config
1519 .stage3
1520 .prompt_template
1521 .replace("{content}", &content_preview)
1522 .replace("{timestamp}", &Utc::now().to_rfc3339());
1523
1524 let llm_response = self.call_llm(&prompt).await?;
1526
1527 let (importance_score, confidence) = self.parse_llm_response(&llm_response)?;
1529
            // Stage 3 is the final stage, so its result always passes.
            let passed_threshold = true;

            Ok::<StageResult, ImportanceAssessmentError>(StageResult {
1533 stage: AssessmentStage::Stage3LLMScoring,
1534 score: importance_score,
1535 confidence,
1536 processing_time_ms: stage_start.elapsed().as_millis() as u64,
1537 passed_threshold,
1538 details: StageDetails::Stage3 {
1539 llm_response,
                    // Rough token estimate: ~4 characters per token.
                    prompt_tokens: Some(prompt.len() / 4),
                    completion_tokens: None,
                    model_used: "configured-model".to_string(),
1543 },
1544 })
1545 })
1546 .await;
1547
1548 match result {
1549 Ok(Ok(stage_result)) => {
1550 let duration_seconds = stage_start.elapsed().as_secs_f64();
1551 self.metrics.stage3_duration.observe(duration_seconds);
1552 self.metrics.llm_call_successes.inc();
1553 self.circuit_breaker.record_success().await;
1554
1555 if stage_result.processing_time_ms > self.config.performance.stage3_target_ms {
1557 self.metrics.stage3_threshold_violations.inc();
1558 warn!(
1559 "Stage 3 exceeded target time: {}ms > {}ms",
1560 stage_result.processing_time_ms, self.config.performance.stage3_target_ms
1561 );
1562 }
1563
1564 debug!(
1565 "Stage 3 completed in {}ms with score {:.3} and confidence {:.3}",
1566 stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1567 );
1568
1569 Ok(stage_result)
1570 }
1571 Ok(Err(e)) => {
1572 self.metrics.llm_call_failures.inc();
1573 self.circuit_breaker.record_failure().await;
1574 Err(e)
1575 }
1576 Err(_) => {
1577 self.metrics.stage3_threshold_violations.inc();
1578 self.metrics.llm_call_failures.inc();
1579 self.circuit_breaker.record_failure().await;
1580 Err(ImportanceAssessmentError::Timeout(format!(
1581 "Stage 3 exceeded maximum processing time of {}ms",
1582 self.config.stage3.max_processing_time_ms
1583 )))
1584 }
1585 }
1586 }
1587
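    /// Cosine similarity `dot(a, b) / (|a| * |b|)`; returns 0.0 for mismatched
    /// lengths or zero-norm vectors.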
1588 fn calculate_cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
1589 if a.len() != b.len() {
1590 return 0.0;
1591 }
1592
1593 let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1594 let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1595 let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1596
1597 if norm_a == 0.0 || norm_b == 0.0 {
1598 return 0.0;
1599 }
1600
1601 dot_product / (norm_a * norm_b)
1602 }
1603
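    /// Sends the sanitized prompt to the LLM endpoint, retrying up to three
    /// times with exponential backoff and random jitter.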
1604 async fn call_llm(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1605 let sanitized_prompt = self.sanitize_llm_prompt(prompt)?;
1606
1607 let mut attempts = 0;
1609 let max_retries = 3;
1610 let base_delay = Duration::from_millis(100);
1611
1612 loop {
1613 match self.attempt_llm_call(&sanitized_prompt).await {
1614 Ok(response) => return Ok(response),
1615 Err(e) => {
1616 attempts += 1;
1617 if attempts > max_retries {
1618 return Err(e);
1619 }
1620
1621 let delay = base_delay * (2_u32.pow(attempts - 1));
1623 let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..=100));
1624 tokio::time::sleep(delay + jitter).await;
1625
1626 warn!(
1627 "LLM call failed (attempt {}/{}), retrying in {:?}: {}",
1628 attempts,
1629 max_retries,
1630 delay + jitter,
1631 e
1632 );
1633 }
1634 }
1635 }
1636 }
1637
1638 async fn attempt_llm_call(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1639 if prompt.len() > 50000 {
1641 return Err(ImportanceAssessmentError::Stage3Failed(
1643 "Prompt exceeds maximum length".to_string(),
1644 ));
1645 }
1646
1647 let request_body = serde_json::json!({
1648 "prompt": prompt,
1649 "max_tokens": 150,
1650 "temperature": 0.1,
1651 "stop": ["\n\n", "---"],
            "model": "text-davinci-003"
        });
1654
1655 let response = self
1656 .http_client
1657 .post(&self.config.stage3.llm_endpoint)
1658 .header("Content-Type", "application/json")
1659 .header("User-Agent", "codex-memory/0.1.0")
1660 .json(&request_body)
1661 .send()
1662 .await
1663 .map_err(|e| {
1664 if e.is_timeout() {
1665 ImportanceAssessmentError::Timeout(format!("LLM request timed out: {e}"))
1666 } else if e.is_connect() {
1667 ImportanceAssessmentError::Stage3Failed(format!(
1668 "Failed to connect to LLM service: {e}"
1669 ))
1670 } else {
1671 ImportanceAssessmentError::Stage3Failed(format!("LLM request failed: {e}"))
1672 }
1673 })?;
1674
1675 let status = response.status();
1676 let response_text = response.text().await.map_err(|e| {
1677 ImportanceAssessmentError::Stage3Failed(format!("Failed to read response body: {e}"))
1678 })?;
1679
1680 if !status.is_success() {
1681 let error_msg = match status.as_u16() {
1683 400 => format!("Bad request to LLM service: {response_text}"),
1684 401 => "Unauthorized: Invalid API key or credentials".to_string(),
1685 403 => "Forbidden: Insufficient permissions".to_string(),
1686 429 => "Rate limit exceeded, will retry".to_string(),
1687 500..=599 => format!("LLM service error ({status}): {response_text}"),
1688 _ => format!("LLM service returned status {status}: {response_text}"),
1689 };
1690
1691 return Err(ImportanceAssessmentError::Stage3Failed(error_msg));
1692 }
1693
1694 let response_json: serde_json::Value =
1696 serde_json::from_str(&response_text).map_err(|e| {
1697 ImportanceAssessmentError::Stage3Failed(format!(
1698 "Failed to parse LLM response as JSON: {e}"
1699 ))
1700 })?;
1701
1702 let response_content = response_json
1704 .get("choices")
1705 .and_then(|c| c.get(0))
1706 .and_then(|choice| {
1707 choice
1709 .get("text")
1710 .or_else(|| choice.get("message").and_then(|m| m.get("content")))
1711 .or_else(|| choice.get("generated_text"))
1712 })
1713 .and_then(|v| v.as_str())
1714 .ok_or_else(|| {
1715 ImportanceAssessmentError::Stage3Failed(format!(
1716 "Invalid LLM response format. Expected 'choices[0].text' or similar. Got: {}",
1717 serde_json::to_string_pretty(&response_json)
1718 .unwrap_or_else(|_| "invalid JSON".to_string())
1719 ))
1720 })?
1721 .trim()
1722 .to_string();
1723
1724 if response_content.is_empty() {
1726 return Err(ImportanceAssessmentError::Stage3Failed(
1727 "LLM returned empty response".to_string(),
1728 ));
1729 }
1730
        if response_content.len() > 10000 {
            warn!("LLM response exceeded 10000 characters; truncating");
            return Ok(response_content[..10000].to_string());
        }
1736
1737 Ok(response_content)
1738 }
1739
1740 fn sanitize_llm_prompt(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1741 let dangerous_patterns = [
1743 "ignore previous instructions",
1744 "ignore all previous",
1745 "disregard previous",
1746 "forget previous",
1747 "new instructions:",
1748 "system:",
1749 "assistant:",
1750 "user:",
1751 "<|endoftext|>",
1752 "###",
1753 "---\n",
1754 ];
1755
1756 let lower_prompt = prompt.to_lowercase();
1757 for pattern in &dangerous_patterns {
1758 if lower_prompt.contains(pattern) {
1759 warn!("Potential prompt injection detected: {}", pattern);
1760 return Err(ImportanceAssessmentError::Stage3Failed(
1761 "Prompt contains potentially malicious content".to_string(),
1762 ));
1763 }
1764 }
1765
        let sanitized = prompt
            .replace('\0', "")
            .replace("\x1b", "")
            .chars()
            .filter(|c| c.is_ascii_graphic() || c.is_ascii_whitespace())
            .collect::<String>();
1773
1774 if sanitized.len() > 10000 {
1776 return Err(ImportanceAssessmentError::Stage3Failed(
1777 "Prompt too long after sanitization".to_string(),
1778 ));
1779 }
1780
1781 Ok(sanitized)
1782 }
1783
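    /// Parses `Importance:`/`Score:` and `Confidence:` lines from the LLM
    /// response, defaulting to 0.5 and 0.7 when they are missing.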
1784 fn parse_llm_response(&self, response: &str) -> Result<(f64, f64), ImportanceAssessmentError> {
        let lines: Vec<&str> = response.lines().collect();

        // Defaults used when the response does not include explicit values.
        let mut importance_score = 0.5;
        let mut confidence = 0.7;

        for line in lines {
1793 let line = line.trim().to_lowercase();
1794
1795 if line.contains("importance:") || line.contains("score:") {
1797 if let Some(score_str) = line.split(':').nth(1) {
1798 if let Ok(score) = score_str.trim().parse::<f64>() {
1799 importance_score = score.clamp(0.0, 1.0);
1800 }
1801 }
1802 }
1803
1804 if line.contains("confidence:") {
1806 if let Some(conf_str) = line.split(':').nth(1) {
1807 if let Ok(conf) = conf_str.trim().parse::<f64>() {
1808 confidence = conf.clamp(0.0, 1.0);
1809 }
1810 }
1811 }
1812 }
1813
1814 Ok((importance_score, confidence))
1815 }
1816
1817 fn extract_explanation_from_stage3(&self, stage_result: &StageResult) -> Option<String> {
1818 if let StageDetails::Stage3 { llm_response, .. } = &stage_result.details {
1819 Some(llm_response.clone())
1820 } else {
1821 None
1822 }
1823 }
1824
1825 fn record_final_metrics(&self, result: &ImportanceAssessmentResult) {
1826 self.metrics
1827 .assessment_confidence
1828 .observe(result.confidence);
1829 self.metrics
1830 .final_importance_scores
1831 .observe(result.importance_score);
1832
1833 info!(
1834 "Importance assessment completed: score={:.3}, confidence={:.3}, stage={:?}, time={}ms",
1835 result.importance_score,
1836 result.confidence,
1837 result.final_stage,
1838 result.total_processing_time_ms
1839 );
1840 }
1841
1842 pub async fn get_statistics(&self) -> PipelineStatistics {
1844 let cache_size = self.embedding_cache.len();
1845 let eviction_count = self.embedding_cache.eviction_count();
1846
1847 PipelineStatistics {
1848 cache_size,
1849 stage1_executions: self.metrics.stage1_executions.get(),
1850 stage2_executions: self.metrics.stage2_executions.get(),
1851 stage3_executions: self.metrics.stage3_executions.get(),
1852 completed_at_stage1: self.metrics.completed_at_stage1.get(),
1853 completed_at_stage2: self.metrics.completed_at_stage2.get(),
1854 completed_at_stage3: self.metrics.completed_at_stage3.get(),
1855 cache_hits: self.metrics.embedding_cache_hits.get(),
1856 cache_misses: self.metrics.embedding_cache_misses.get(),
1857 cache_evictions: eviction_count,
1858 circuit_breaker_state: format!("{:?}", *self.circuit_breaker.state.read().await),
1859 llm_success_rate: {
1860 let successes = self.metrics.llm_call_successes.get() as f64;
1861 let failures = self.metrics.llm_call_failures.get() as f64;
1862 let total = successes + failures;
1863 if total > 0.0 {
1864 successes / total
1865 } else {
1866 1.0
1867 }
1868 },
1869 }
1870 }
1871
1872 pub async fn clear_cache(&self) {
1874 self.embedding_cache.clear().await;
1875 self.metrics.embedding_cache_size.set(0);
1876 info!("Embedding cache cleared");
1877 }
1878
1879 pub fn get_cache_hit_ratio(&self) -> f64 {
1881 let hits = self.metrics.embedding_cache_hits.get() as f64;
1882 let misses = self.metrics.embedding_cache_misses.get() as f64;
1883 let total = hits + misses;
1884 if total > 0.0 {
1885 hits / total
1886 } else {
1887 0.0
1888 }
1889 }
1890}
1891
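/// Point-in-time snapshot of pipeline counters, as returned by `get_statistics`.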
1892#[derive(Debug, Clone, Serialize, Deserialize)]
1893pub struct PipelineStatistics {
1894 pub cache_size: usize,
1895 pub stage1_executions: u64,
1896 pub stage2_executions: u64,
1897 pub stage3_executions: u64,
1898 pub completed_at_stage1: u64,
1899 pub completed_at_stage2: u64,
1900 pub completed_at_stage3: u64,
1901 pub cache_hits: u64,
1902 pub cache_misses: u64,
1903 pub cache_evictions: u64,
1904 pub circuit_breaker_state: String,
1905 pub llm_success_rate: f64,
1906}
1907
1908impl Default for ImportanceAssessmentConfig {
1909 fn default() -> Self {
1910 Self {
1911 stage1: Stage1Config {
1912 confidence_threshold: 0.6,
1913 pattern_library: vec![
1914 ImportancePattern {
1915 name: "remember_command".to_string(),
1916 pattern: r"(?i)\b(remember|recall|don't forget)\b".to_string(),
1917 weight: 0.8,
1918 context_boosters: vec!["important".to_string(), "critical".to_string()],
1919 category: "memory".to_string(),
1920 },
1921 ImportancePattern {
1922 name: "preference_statement".to_string(),
1923 pattern: r"(?i)\b(prefer|like|want|choose)\b".to_string(),
1924 weight: 0.7,
1925 context_boosters: vec!["always".to_string(), "usually".to_string()],
1926 category: "preference".to_string(),
1927 },
1928 ImportancePattern {
1929 name: "decision_making".to_string(),
1930 pattern: r"(?i)\b(decide|decision|choose|select)\b".to_string(),
1931 weight: 0.75,
1932 context_boosters: vec!["final".to_string(), "official".to_string()],
1933 category: "decision".to_string(),
1934 },
1935 ImportancePattern {
1936 name: "correction".to_string(),
1937 pattern: r"(?i)\b(correct|fix|wrong|mistake|error)\b".to_string(),
1938 weight: 0.6,
1939 context_boosters: vec!["actually".to_string(), "should".to_string()],
1940 category: "correction".to_string(),
1941 },
1942 ImportancePattern {
1943 name: "importance_marker".to_string(),
1944 pattern: r"(?i)\b(important|critical|crucial|vital|essential)\b".to_string(),
1945 weight: 0.9,
1946 context_boosters: vec!["very".to_string(), "extremely".to_string()],
1947 category: "importance".to_string(),
1948 },
1949 ],
1950 max_processing_time_ms: 10,
1951 },
            stage2: Stage2Config {
                confidence_threshold: 0.7,
                max_processing_time_ms: 100,
                embedding_cache_ttl_seconds: 3600,
                embedding_cache_max_size: 10000,
                cache_eviction_threshold: 0.8,
                similarity_threshold: 0.7,
                reference_embeddings: vec![],
            },
1961 stage3: Stage3Config {
1962 max_processing_time_ms: 1000,
1963 llm_endpoint: "http://localhost:8080/generate".to_string(),
1964 max_concurrent_requests: 5,
1965 prompt_template: "Assess the importance of this content on a scale of 0.0 to 1.0. Consider context, user intent, and actionability.\n\nContent: {content}\n\nProvide your assessment as:\nImportance: [score]\nConfidence: [confidence]\nReasoning: [explanation]".to_string(),
1966 target_usage_percentage: 20.0,
1967 },
1968 circuit_breaker: CircuitBreakerConfig {
1969 failure_threshold: 5,
1970 failure_window_seconds: 60,
1971 recovery_timeout_seconds: 30,
1972 minimum_requests: 3,
1973 },
1974 performance: PerformanceConfig {
1975 stage1_target_ms: 10,
1976 stage2_target_ms: 100,
1977 stage3_target_ms: 1000,
1978 },
1979 }
1980 }
1981}