1use crate::embedding::EmbeddingService;
2use crate::memory::MemoryError;
3use anyhow::Result;
4use chrono::{DateTime, Utc};
5use prometheus::{Counter, Histogram, IntCounter, IntGauge, Registry};
6use rand::Rng;
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::collections::{HashMap, LinkedList};
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use thiserror::Error;
15use tokio::sync::RwLock;
16use tokio::time::timeout;
17use tracing::{debug, error, info, warn};
18
/// Errors produced by the three-stage importance assessment pipeline.
#[derive(Debug, Error)]
pub enum ImportanceAssessmentError {
    /// Stage 1 (regex/keyword pattern matching) failed.
    #[error("Stage 1 pattern matching failed: {0}")]
    Stage1Failed(String),

    /// Stage 2 (embedding-based semantic similarity) failed.
    #[error("Stage 2 semantic analysis failed: {0}")]
    Stage2Failed(String),

    /// Stage 3 (LLM scoring) failed.
    #[error("Stage 3 LLM scoring failed: {0}")]
    Stage3Failed(String),

    /// The circuit breaker rejected the request because it is open.
    #[error("Circuit breaker is open: {0}")]
    CircuitBreakerOpen(String),

    /// A stage exceeded its configured processing deadline.
    #[error("Timeout exceeded: {0}")]
    Timeout(String),

    /// Invalid pipeline configuration (e.g. a bad or dangerous regex).
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// An underlying memory-store operation failed.
    #[error("Memory operation failed: {0}")]
    Memory(#[from] MemoryError),

    /// An embedding-cache operation failed.
    #[error("Cache operation failed: {0}")]
    Cache(String),
}
45
/// Top-level configuration for the three-stage importance assessment
/// pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportanceAssessmentConfig {
    /// Stage 1: regex/keyword pattern matching.
    pub stage1: Stage1Config,

    /// Stage 2: embedding-based semantic similarity.
    pub stage2: Stage2Config,

    /// Stage 3: LLM scoring.
    pub stage3: Stage3Config,

    /// Circuit breaker settings protecting the Stage 3 LLM dependency.
    pub circuit_breaker: CircuitBreakerConfig,

    /// Per-stage latency targets used for threshold-violation metrics.
    pub performance: PerformanceConfig,
}
64
/// Configuration for Stage 1 (pattern matching).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage1Config {
    /// Minimum Stage 1 confidence required to escalate to Stage 2.
    pub confidence_threshold: f64,

    /// Patterns evaluated against the content.
    pub pattern_library: Vec<ImportancePattern>,

    /// Hard timeout for Stage 1 processing, in milliseconds.
    pub max_processing_time_ms: u64,
}
76
/// Configuration for Stage 2 (semantic similarity).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage2Config {
    /// Minimum Stage 2 confidence required to escalate to Stage 3.
    pub confidence_threshold: f64,

    /// Hard timeout for Stage 2 processing, in milliseconds.
    pub max_processing_time_ms: u64,

    /// Time-to-live for cached content embeddings.
    pub embedding_cache_ttl_seconds: u64,

    /// Maximum number of entries held in the embedding cache.
    pub embedding_cache_max_size: usize,

    /// Fraction of `embedding_cache_max_size` at which LRU eviction starts.
    pub cache_eviction_threshold: f64,

    /// Minimum similarity score treated as a match.
    // NOTE(review): not referenced in the visible portion of this file —
    // confirm where it is applied (likely in execute_stage2).
    pub similarity_threshold: f32,

    /// Reference embeddings that content embeddings are compared against.
    pub reference_embeddings: Vec<ReferenceEmbedding>,
}
100
/// Configuration for Stage 3 (LLM scoring).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stage3Config {
    /// Hard timeout for Stage 3, in milliseconds; also used as the HTTP
    /// client request timeout.
    pub max_processing_time_ms: u64,

    /// Endpoint of the LLM scoring service.
    pub llm_endpoint: String,

    /// Maximum number of in-flight LLM requests.
    // NOTE(review): enforcement is not visible in this chunk — confirm.
    pub max_concurrent_requests: usize,

    /// Prompt template sent to the LLM.
    pub prompt_template: String,

    /// Desired share of assessments that should reach Stage 3.
    // NOTE(review): usage is not visible in this chunk — confirm.
    pub target_usage_percentage: f64,
}
118
/// Circuit breaker tuning for the Stage 3 LLM dependency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
    /// Failures within the window required to open the circuit.
    pub failure_threshold: usize,

    /// Rolling window, in seconds, in which failures are counted.
    pub failure_window_seconds: u64,

    /// Seconds the circuit stays open before probing (half-open).
    pub recovery_timeout_seconds: u64,

    /// Minimum request volume before the breaker is allowed to open.
    pub minimum_requests: usize,
}
133
/// Per-stage latency targets (milliseconds). Exceeding a target increments
/// the corresponding threshold-violation metric; it does not abort the stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceConfig {
    /// Target latency for Stage 1.
    pub stage1_target_ms: u64,

    /// Target latency for Stage 2.
    pub stage2_target_ms: u64,

    /// Target latency for Stage 3.
    pub stage3_target_ms: u64,
}
145
/// A single Stage 1 pattern definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportancePattern {
    /// Human-readable pattern name (reported in match results).
    pub name: String,

    /// Regex source. Simple case-insensitive keyword forms are routed to
    /// plain substring search instead of the regex engine.
    pub pattern: String,

    /// Base score contribution of a match.
    pub weight: f64,

    /// Terms that, when found near a match, boost its effective weight.
    pub context_boosters: Vec<String>,

    /// Category label; distinct matched categories feed the Stage 1
    /// confidence calculation.
    pub category: String,
}
163
/// A pre-computed reference vector used by Stage 2 similarity scoring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReferenceEmbedding {
    /// Human-readable name (reported in similarity results).
    pub name: String,

    /// The reference embedding vector.
    pub embedding: Vec<f32>,

    /// Weight applied to this reference's similarity score.
    pub weight: f64,

    /// Category label for the reference.
    pub category: String,
}
178
/// Final outcome of an importance assessment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportanceAssessmentResult {
    /// Importance score taken from the last stage that executed.
    pub importance_score: f64,

    /// The stage whose score became the final result.
    pub final_stage: AssessmentStage,

    /// Per-stage results, in execution order (only stages that ran).
    pub stage_results: Vec<StageResult>,

    /// Wall-clock time for the whole assessment, in milliseconds.
    pub total_processing_time_ms: u64,

    /// When the assessment finished.
    pub assessed_at: DateTime<Utc>,

    /// Confidence of the final stage's score.
    pub confidence: f64,

    /// Optional human-readable explanation of the result.
    pub explanation: Option<String>,
}
203
/// Outcome of a single pipeline stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageResult {
    /// Which stage produced this result.
    pub stage: AssessmentStage,

    /// Importance score computed by the stage.
    pub score: f64,

    /// Confidence in the score.
    pub confidence: f64,

    /// Stage processing time, in milliseconds.
    pub processing_time_ms: u64,

    /// Whether confidence met the stage's threshold, i.e. whether the
    /// pipeline escalates to the next stage.
    pub passed_threshold: bool,

    /// Stage-specific diagnostic details.
    pub details: StageDetails,
}
224
/// The three pipeline stages, in escalation order.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AssessmentStage {
    /// Fast regex/keyword matching.
    Stage1PatternMatching,
    /// Embedding-based semantic similarity.
    Stage2SemanticSimilarity,
    /// LLM-based scoring.
    Stage3LLMScoring,
}
231
/// Stage-specific diagnostic payload attached to a [`StageResult`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StageDetails {
    /// Stage 1: which patterns matched and how many were checked.
    Stage1 {
        matched_patterns: Vec<MatchedPattern>,
        total_patterns_checked: usize,
    },
    /// Stage 2: per-reference similarity scores and cache behaviour.
    Stage2 {
        similarity_scores: Vec<SimilarityScore>,
        cache_hit: bool,
        // Presumably `None` when the embedding was served from cache —
        // confirm in execute_stage2 (not visible in this chunk).
        embedding_generation_time_ms: Option<u64>,
    },
    /// Stage 3: raw LLM response plus token/model metadata.
    Stage3 {
        llm_response: String,
        prompt_tokens: Option<usize>,
        completion_tokens: Option<usize>,
        model_used: String,
    },
}
250
/// A single Stage 1 pattern hit.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MatchedPattern {
    // Name/category copied from the matching ImportancePattern.
    pub pattern_name: String,
    pub pattern_category: String,
    // The matched text (lowercased keyword for keyword patterns).
    pub match_text: String,
    // Byte offset of the match within the analyzed content.
    pub match_position: usize,
    // Base weight of the pattern.
    pub weight: f64,
    // Extra boost from nearby context-booster terms (0.0..=0.5).
    pub context_boost: f64,
}
260
/// Similarity of the content embedding to one reference embedding (Stage 2).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SimilarityScore {
    // Name/category copied from the ReferenceEmbedding.
    pub reference_name: String,
    pub reference_category: String,
    // Raw similarity value.
    pub similarity: f32,
    // Weight of the reference.
    pub weight: f64,
    // similarity combined with weight (see Stage 2 scoring, not in view).
    pub weighted_score: f64,
}
269
/// Circuit breaker state machine.
#[derive(Debug, Clone, PartialEq)]
enum CircuitBreakerState {
    /// Normal operation; requests flow through.
    Closed,
    /// Requests rejected; holds the instant the circuit opened.
    Open(DateTime<Utc>),
    /// Probing: requests allowed so successes can close the circuit.
    HalfOpen,
}
277
/// Circuit breaker guarding the Stage 3 LLM dependency.
#[derive(Debug)]
struct CircuitBreaker {
    /// Current state; guarded by an async RwLock.
    state: RwLock<CircuitBreakerState>,
    config: CircuitBreakerConfig,
    /// Failures recorded within the current failure window.
    failure_count: AtomicUsize,
    /// Timestamp of the most recent failure, if any.
    last_failure_time: RwLock<Option<DateTime<Utc>>>,
    /// Requests observed (compared against `minimum_requests`).
    request_count: AtomicUsize,
    /// Consecutive successes; used to close a half-open circuit.
    consecutive_successes: AtomicUsize,
}
288
289impl CircuitBreaker {
290 fn new(config: CircuitBreakerConfig) -> Self {
291 Self {
292 state: RwLock::new(CircuitBreakerState::Closed),
293 config,
294 failure_count: AtomicUsize::new(0),
295 last_failure_time: RwLock::new(None),
296 request_count: AtomicUsize::new(0),
297 consecutive_successes: AtomicUsize::new(0),
298 }
299 }
300
301 async fn can_execute(&self) -> Result<bool, ImportanceAssessmentError> {
302 let state = self.state.read().await;
303 match *state {
304 CircuitBreakerState::Closed => Ok(true),
305 CircuitBreakerState::Open(opened_at) => {
306 let now = Utc::now();
307 let recovery_time = opened_at
308 + chrono::Duration::seconds(self.config.recovery_timeout_seconds as i64);
309
310 if now >= recovery_time {
311 drop(state);
312 let mut state = self.state.write().await;
313 *state = CircuitBreakerState::HalfOpen;
314 Ok(true)
315 } else {
316 Err(ImportanceAssessmentError::CircuitBreakerOpen(format!(
317 "Circuit breaker is open until {}",
318 recovery_time
319 )))
320 }
321 }
322 CircuitBreakerState::HalfOpen => Ok(true),
323 }
324 }
325
326 async fn record_success(&self) {
327 let consecutive_successes = self.consecutive_successes.fetch_add(1, Ordering::SeqCst);
329
330 self.failure_count.store(0, Ordering::SeqCst);
332
333 let current_state = {
335 let state = self.state.read().await;
336 state.clone()
337 };
338
339 match current_state {
340 CircuitBreakerState::HalfOpen => {
341 if consecutive_successes >= 2 {
343 let mut state = self.state.write().await;
344 *state = CircuitBreakerState::Closed;
345
346 let mut last_failure_time = self.last_failure_time.write().await;
347 *last_failure_time = None;
348
349 info!(
350 "Circuit breaker closed after {} consecutive successes",
351 consecutive_successes + 1
352 );
353 }
354 }
355 CircuitBreakerState::Open(_) => {
356 warn!(
358 "Received success while circuit breaker is open - state inconsistency detected"
359 );
360 }
361 CircuitBreakerState::Closed => {
362 let mut last_failure_time = self.last_failure_time.write().await;
364 *last_failure_time = None;
365 }
366 }
367 }
368
369 async fn record_failure(&self) {
370 let now = Utc::now();
371
372 self.consecutive_successes.store(0, Ordering::SeqCst);
374
375 let request_count = self.request_count.fetch_add(1, Ordering::SeqCst) + 1;
377
378 let should_open_circuit = {
380 let mut last_failure_time = self.last_failure_time.write().await;
381
382 if let Some(last_failure) = *last_failure_time {
384 let window_start =
385 now - chrono::Duration::seconds(self.config.failure_window_seconds as i64);
386 if last_failure < window_start {
387 self.failure_count.store(0, Ordering::SeqCst);
389 }
390 }
391
392 let failure_count = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
394 *last_failure_time = Some(now);
395
396 request_count >= self.config.minimum_requests
398 && failure_count >= self.config.failure_threshold
399 };
400
401 if should_open_circuit {
402 let current_state = {
404 let state = self.state.read().await;
405 state.clone()
406 };
407
408 match current_state {
409 CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
410 let mut state = self.state.write().await;
411 match *state {
413 CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => {
414 *state = CircuitBreakerState::Open(now);
415 let failure_count = self.failure_count.load(Ordering::SeqCst);
416 warn!(
417 "Circuit breaker opened due to {} failures out of {} requests",
418 failure_count, request_count
419 );
420 }
421 CircuitBreakerState::Open(_) => {
422 }
424 }
425 }
426 CircuitBreakerState::Open(_) => {
427 debug!("Additional failure recorded while circuit breaker is open");
429 }
430 }
431 }
432 }
433}
434
/// An embedding vector held in the cache, with TTL bookkeeping.
#[derive(Debug, Clone)]
struct CachedEmbedding {
    /// The cached embedding vector.
    embedding: Vec<f32>,
    /// When the entry was created; TTL is measured from here.
    cached_at: DateTime<Utc>,
    /// Time-to-live in seconds.
    ttl_seconds: u64,
    /// Last read time; used as the LRU timestamp.
    last_accessed: DateTime<Utc>,
}
443
/// One entry in the cache's recency list (front = most recently used).
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct LRUNode {
    /// Cache key this node refers to.
    key: String,
    /// Time of the access that placed this node at the front.
    timestamp: DateTime<Utc>,
}
451
/// TTL + LRU cache for content embeddings used by Stage 2.
#[derive(Debug)]
struct EmbeddingCache {
    /// Key -> cached embedding. Lock order: `cache` before `lru_list`.
    cache: RwLock<HashMap<String, CachedEmbedding>>,
    /// Recency list; front is most recently used, back is evicted first.
    lru_list: RwLock<LinkedList<LRUNode>>,
    /// Entry count mirror so size checks avoid taking the locks.
    current_size: AtomicUsize,
    max_size: usize,
    #[allow(dead_code)]
    eviction_threshold: f64,
    /// Total entries ever evicted under memory pressure.
    eviction_count: AtomicU64,
    /// `max_size * eviction_threshold`; inserts above this trigger eviction.
    memory_pressure_threshold: usize,
}
464
465impl CachedEmbedding {
466 fn new(embedding: Vec<f32>, ttl_seconds: u64) -> Self {
467 let now = Utc::now();
468 Self {
469 embedding,
470 cached_at: now,
471 ttl_seconds,
472 last_accessed: now,
473 }
474 }
475
476 fn is_expired(&self) -> bool {
477 let now = Utc::now();
478 let expiry = self.cached_at + chrono::Duration::seconds(self.ttl_seconds as i64);
479 now >= expiry
480 }
481
482 fn touch(&mut self) {
483 self.last_accessed = Utc::now();
484 }
485}
486
impl EmbeddingCache {
    /// Create an empty cache. Eviction starts once the entry count reaches
    /// `max_size * eviction_threshold`.
    fn new(max_size: usize, eviction_threshold: f64) -> Self {
        Self {
            cache: RwLock::new(HashMap::new()),
            lru_list: RwLock::new(LinkedList::new()),
            current_size: AtomicUsize::new(0),
            max_size,
            eviction_threshold,
            eviction_count: AtomicU64::new(0),
            memory_pressure_threshold: (max_size as f64 * eviction_threshold) as usize,
        }
    }

    /// Look up `key`. Expired entries are removed and reported as a miss;
    /// hits are touched and moved to the front of the LRU list.
    ///
    /// Takes write locks even on reads because a hit mutates both the entry
    /// (touch) and the recency list. Lock order: `cache` then `lru_list`,
    /// consistent across all methods.
    async fn get(&self, key: &str) -> Option<CachedEmbedding> {
        let mut cache = self.cache.write().await;
        let mut lru_list = self.lru_list.write().await;

        if let Some(cached) = cache.get_mut(key) {
            if cached.is_expired() {
                // Drop the expired entry and its LRU node. The list is
                // rebuilt without the key — O(n), acceptable at the
                // configured cache sizes.
                cache.remove(key);
                self.current_size.fetch_sub(1, Ordering::Relaxed);
                *lru_list = lru_list.iter().filter(|node| node.key != key).cloned().collect();
                return None;
            }

            cached.touch();
            // Move this key's node to the front (most recently used).
            *lru_list = lru_list.iter().filter(|node| node.key != key).cloned().collect();
            lru_list.push_front(LRUNode {
                key: key.to_string(),
                timestamp: cached.last_accessed,
            });

            Some(cached.clone())
        } else {
            None
        }
    }

    /// Insert or replace `key`. Evicts LRU entries first when the cache is
    /// at or above the memory-pressure threshold.
    async fn insert(
        &self,
        key: String,
        value: CachedEmbedding,
    ) -> Result<(), ImportanceAssessmentError> {
        // Check pressure before taking the main locks; `evict_lru_entries`
        // acquires the same locks internally.
        let current_size = self.current_size.load(Ordering::Relaxed);
        if current_size >= self.memory_pressure_threshold {
            self.evict_lru_entries().await?;
        }

        let mut cache = self.cache.write().await;
        let mut lru_list = self.lru_list.write().await;

        if cache.contains_key(&key) {
            // Replacing an existing entry: remove its old LRU node; the
            // size counter is unchanged.
            *lru_list = lru_list.iter().filter(|node| node.key != key).cloned().collect();
        } else {
            self.current_size.fetch_add(1, Ordering::Relaxed);
        }

        cache.insert(key.clone(), value.clone());
        lru_list.push_front(LRUNode {
            key: key.clone(),
            timestamp: value.last_accessed,
        });

        Ok(())
    }

    /// Evict least-recently-used entries until the cache holds at most 70%
    /// of `max_size` (evicting below the trigger point avoids thrashing).
    async fn evict_lru_entries(&self) -> Result<(), ImportanceAssessmentError> {
        let mut cache = self.cache.write().await;
        let mut lru_list = self.lru_list.write().await;

        let target_size = (self.max_size as f64 * 0.7) as usize;
        let current_size = cache.len();

        if current_size <= target_size {
            return Ok(());
        }

        let entries_to_remove = current_size - target_size;
        let mut removed_count = 0;

        // The back of the LRU list holds the least recently used keys.
        while removed_count < entries_to_remove && !lru_list.is_empty() {
            if let Some(node) = lru_list.pop_back() {
                cache.remove(&node.key);
                removed_count += 1;
                self.eviction_count.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Re-sync the size mirror with the authoritative map length.
        self.current_size.store(cache.len(), Ordering::Relaxed);

        if removed_count > 0 {
            info!(
                "Evicted {} entries from embedding cache due to memory pressure",
                removed_count
            );
        }

        Ok(())
    }

    /// Remove every entry and reset the size counter.
    async fn clear(&self) {
        let mut cache = self.cache.write().await;
        let mut lru_list = self.lru_list.write().await;

        cache.clear();
        lru_list.clear();
        self.current_size.store(0, Ordering::Relaxed);
    }

    /// Current entry count (lock-free mirror; may briefly lag the map).
    fn len(&self) -> usize {
        self.current_size.load(Ordering::Relaxed)
    }

    /// Total entries evicted under memory pressure since creation.
    fn eviction_count(&self) -> u64 {
        self.eviction_count.load(Ordering::Relaxed)
    }

    /// Remove all expired entries; returns how many were removed.
    async fn cleanup_expired(&self) -> Result<usize, ImportanceAssessmentError> {
        let mut cache = self.cache.write().await;
        let mut lru_list = self.lru_list.write().await;

        // Collect keys first: the map cannot be mutated while iterating.
        let mut expired_keys = Vec::new();

        for (key, value) in cache.iter() {
            if value.is_expired() {
                expired_keys.push(key.clone());
            }
        }

        for key in &expired_keys {
            cache.remove(key);
            *lru_list = lru_list.iter().filter(|node| &node.key != key).cloned().collect();
        }

        let removed_count = expired_keys.len();
        self.current_size.store(cache.len(), Ordering::Relaxed);

        if removed_count > 0 {
            debug!(
                "Cleaned up {} expired entries from embedding cache",
                removed_count
            );
        }

        Ok(removed_count)
    }
}
639
/// Prometheus metrics for the importance assessment pipeline.
///
/// Every metric is registered against the registry passed to
/// [`ImportanceAssessmentMetrics::new`].
#[derive(Debug)]
pub struct ImportanceAssessmentMetrics {
    // Per-stage execution counts.
    pub stage1_executions: IntCounter,
    pub stage2_executions: IntCounter,
    pub stage3_executions: IntCounter,

    // Per-stage latency distributions (seconds).
    pub stage1_duration: Histogram,
    pub stage2_duration: Histogram,
    pub stage3_duration: Histogram,

    // Which stage each assessment terminated at.
    pub completed_at_stage1: IntCounter,
    pub completed_at_stage2: IntCounter,
    pub completed_at_stage3: IntCounter,

    // Stages that exceeded their configured latency targets (or timed out).
    pub stage1_threshold_violations: IntCounter,
    pub stage2_threshold_violations: IntCounter,
    pub stage3_threshold_violations: IntCounter,

    // Embedding cache effectiveness.
    pub embedding_cache_hits: IntCounter,
    pub embedding_cache_misses: IntCounter,
    pub embedding_cache_size: IntGauge,

    // Circuit breaker transitions and LLM call outcomes.
    pub circuit_breaker_opened: Counter,
    pub circuit_breaker_half_open: Counter,
    pub circuit_breaker_closed: Counter,
    pub llm_call_failures: IntCounter,
    pub llm_call_successes: IntCounter,

    // Distributions of assessment outputs.
    pub assessment_confidence: Histogram,
    pub final_importance_scores: Histogram,
}
679
680impl ImportanceAssessmentMetrics {
681 pub fn new(registry: &Registry) -> Result<Self> {
682 let stage1_executions = IntCounter::new(
683 "importance_assessment_stage1_executions_total",
684 "Total number of Stage 1 executions",
685 )?;
686 registry.register(Box::new(stage1_executions.clone()))?;
687
688 let stage2_executions = IntCounter::new(
689 "importance_assessment_stage2_executions_total",
690 "Total number of Stage 2 executions",
691 )?;
692 registry.register(Box::new(stage2_executions.clone()))?;
693
694 let stage3_executions = IntCounter::new(
695 "importance_assessment_stage3_executions_total",
696 "Total number of Stage 3 executions",
697 )?;
698 registry.register(Box::new(stage3_executions.clone()))?;
699
700 let stage1_duration = Histogram::with_opts(
701 prometheus::HistogramOpts::new(
702 "importance_assessment_stage1_duration_seconds",
703 "Duration of Stage 1 processing",
704 )
705 .buckets(vec![0.001, 0.005, 0.01, 0.02, 0.05, 0.1]),
706 )?;
707 registry.register(Box::new(stage1_duration.clone()))?;
708
709 let stage2_duration = Histogram::with_opts(
710 prometheus::HistogramOpts::new(
711 "importance_assessment_stage2_duration_seconds",
712 "Duration of Stage 2 processing",
713 )
714 .buckets(vec![0.01, 0.05, 0.1, 0.2, 0.5, 1.0]),
715 )?;
716 registry.register(Box::new(stage2_duration.clone()))?;
717
718 let stage3_duration = Histogram::with_opts(
719 prometheus::HistogramOpts::new(
720 "importance_assessment_stage3_duration_seconds",
721 "Duration of Stage 3 processing",
722 )
723 .buckets(vec![0.1, 0.5, 1.0, 2.0, 5.0, 10.0]),
724 )?;
725 registry.register(Box::new(stage3_duration.clone()))?;
726
727 let completed_at_stage1 = IntCounter::new(
728 "importance_assessment_completed_at_stage1_total",
729 "Total assessments completed at Stage 1",
730 )?;
731 registry.register(Box::new(completed_at_stage1.clone()))?;
732
733 let completed_at_stage2 = IntCounter::new(
734 "importance_assessment_completed_at_stage2_total",
735 "Total assessments completed at Stage 2",
736 )?;
737 registry.register(Box::new(completed_at_stage2.clone()))?;
738
739 let completed_at_stage3 = IntCounter::new(
740 "importance_assessment_completed_at_stage3_total",
741 "Total assessments completed at Stage 3",
742 )?;
743 registry.register(Box::new(completed_at_stage3.clone()))?;
744
745 let stage1_threshold_violations = IntCounter::new(
746 "importance_assessment_stage1_threshold_violations_total",
747 "Total Stage 1 performance threshold violations",
748 )?;
749 registry.register(Box::new(stage1_threshold_violations.clone()))?;
750
751 let stage2_threshold_violations = IntCounter::new(
752 "importance_assessment_stage2_threshold_violations_total",
753 "Total Stage 2 performance threshold violations",
754 )?;
755 registry.register(Box::new(stage2_threshold_violations.clone()))?;
756
757 let stage3_threshold_violations = IntCounter::new(
758 "importance_assessment_stage3_threshold_violations_total",
759 "Total Stage 3 performance threshold violations",
760 )?;
761 registry.register(Box::new(stage3_threshold_violations.clone()))?;
762
763 let embedding_cache_hits = IntCounter::new(
764 "importance_assessment_embedding_cache_hits_total",
765 "Total embedding cache hits",
766 )?;
767 registry.register(Box::new(embedding_cache_hits.clone()))?;
768
769 let embedding_cache_misses = IntCounter::new(
770 "importance_assessment_embedding_cache_misses_total",
771 "Total embedding cache misses",
772 )?;
773 registry.register(Box::new(embedding_cache_misses.clone()))?;
774
775 let embedding_cache_size = IntGauge::new(
776 "importance_assessment_embedding_cache_size",
777 "Current size of embedding cache",
778 )?;
779 registry.register(Box::new(embedding_cache_size.clone()))?;
780
781 let circuit_breaker_opened = Counter::new(
782 "importance_assessment_circuit_breaker_opened_total",
783 "Total times circuit breaker opened",
784 )?;
785 registry.register(Box::new(circuit_breaker_opened.clone()))?;
786
787 let circuit_breaker_half_open = Counter::new(
788 "importance_assessment_circuit_breaker_half_open_total",
789 "Total times circuit breaker went half-open",
790 )?;
791 registry.register(Box::new(circuit_breaker_half_open.clone()))?;
792
793 let circuit_breaker_closed = Counter::new(
794 "importance_assessment_circuit_breaker_closed_total",
795 "Total times circuit breaker closed",
796 )?;
797 registry.register(Box::new(circuit_breaker_closed.clone()))?;
798
799 let llm_call_failures = IntCounter::new(
800 "importance_assessment_llm_call_failures_total",
801 "Total LLM call failures",
802 )?;
803 registry.register(Box::new(llm_call_failures.clone()))?;
804
805 let llm_call_successes = IntCounter::new(
806 "importance_assessment_llm_call_successes_total",
807 "Total LLM call successes",
808 )?;
809 registry.register(Box::new(llm_call_successes.clone()))?;
810
811 let assessment_confidence = Histogram::with_opts(
812 prometheus::HistogramOpts::new(
813 "importance_assessment_confidence",
814 "Confidence scores of assessments",
815 )
816 .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
817 )?;
818 registry.register(Box::new(assessment_confidence.clone()))?;
819
820 let final_importance_scores = Histogram::with_opts(
821 prometheus::HistogramOpts::new(
822 "importance_assessment_final_scores",
823 "Final importance scores from assessments",
824 )
825 .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
826 )?;
827 registry.register(Box::new(final_importance_scores.clone()))?;
828
829 Ok(Self {
830 stage1_executions,
831 stage2_executions,
832 stage3_executions,
833 stage1_duration,
834 stage2_duration,
835 stage3_duration,
836 completed_at_stage1,
837 completed_at_stage2,
838 completed_at_stage3,
839 stage1_threshold_violations,
840 stage2_threshold_violations,
841 stage3_threshold_violations,
842 embedding_cache_hits,
843 embedding_cache_misses,
844 embedding_cache_size,
845 circuit_breaker_opened,
846 circuit_breaker_half_open,
847 circuit_breaker_closed,
848 llm_call_failures,
849 llm_call_successes,
850 assessment_confidence,
851 final_importance_scores,
852 })
853 }
854}
855
/// Stage 1 matcher that splits the pattern library into cheap keyword
/// lookups and compiled regexes.
#[derive(Debug)]
struct OptimizedPatternMatcher {
    /// Patterns that require the regex engine, pre-compiled.
    regex_patterns: Vec<(Regex, ImportancePattern)>,
    /// Simple case-insensitive keyword patterns, keyword stored lowercased
    /// for substring search.
    keyword_patterns: Vec<(String, ImportancePattern)>,
    /// Reserved for a future single combined regex; currently always `None`.
    #[allow(dead_code)]
    combined_regex: Option<Regex>,
}
867
868impl OptimizedPatternMatcher {
869 fn new(patterns: &[ImportancePattern]) -> Result<Self, ImportanceAssessmentError> {
870 let mut regex_patterns = Vec::new();
871 let mut keyword_patterns = Vec::new();
872
873 for pattern in patterns {
874 Self::validate_regex_complexity(&pattern.pattern)?;
876
877 if Self::is_simple_keyword_pattern(&pattern.pattern) {
879 if let Some(keyword) = Self::extract_keyword(&pattern.pattern) {
881 keyword_patterns.push((keyword.to_lowercase(), pattern.clone()));
882 continue;
883 }
884 }
885
886 match Regex::new(&pattern.pattern) {
888 Ok(regex) => regex_patterns.push((regex, pattern.clone())),
889 Err(e) => {
890 error!(
891 "Failed to compile regex pattern '{}': {}",
892 pattern.pattern, e
893 );
894 return Err(ImportanceAssessmentError::Configuration(format!(
895 "Invalid regex pattern '{}': {}",
896 pattern.pattern, e
897 )));
898 }
899 }
900 }
901
902 Ok(Self {
903 regex_patterns,
904 keyword_patterns,
905 combined_regex: None, })
907 }
908
909 fn validate_regex_complexity(pattern: &str) -> Result<(), ImportanceAssessmentError> {
910 let dangerous_patterns = [
912 "(.*)*",
913 "(.+)+",
914 "([^x]*)*",
915 "(a|a)*",
916 "(a|a)+",
917 "(a*)*",
918 "(a+)+",
919 "(.{0,10000})*",
920 "(.*){10,}",
921 "(.+){10,}",
922 ];
923
924 for dangerous in &dangerous_patterns {
925 if pattern.contains(dangerous) {
926 return Err(ImportanceAssessmentError::Configuration(format!(
927 "Regex pattern contains potentially dangerous sequence '{}': {}",
928 dangerous, pattern
929 )));
930 }
931 }
932
933 if pattern.len() > 1000 {
935 return Err(ImportanceAssessmentError::Configuration(
936 "Regex pattern too long (max 1000 characters)".to_string(),
937 ));
938 }
939
940 let open_parens = pattern.chars().filter(|&c| c == '(').count();
942 if open_parens > 20 {
943 return Err(ImportanceAssessmentError::Configuration(
944 "Regex pattern has too many nested groups (max 20)".to_string(),
945 ));
946 }
947
948 Ok(())
949 }
950
951 fn is_simple_keyword_pattern(pattern: &str) -> bool {
952 pattern.starts_with("(?i)")
954 && pattern.contains("\\b")
955 && !pattern.contains(".*")
956 && !pattern.contains(".+")
957 && !pattern.contains("\\d")
958 && !pattern.contains("\\s")
959 && pattern.chars().filter(|&c| c == '(').count() <= 2
960 }
961
962 fn extract_keyword(pattern: &str) -> Option<String> {
963 if let Some(start) = pattern.find('(') {
965 if let Some(end) = pattern.rfind(')') {
966 let keywords = &pattern[start + 1..end];
967 if !keywords.contains('|') && keywords.chars().all(|c| c.is_alphabetic()) {
969 return Some(keywords.to_string());
970 }
971 }
972 }
973 None
974 }
975
976 fn find_matches(&self, content: &str, max_matches: usize) -> Vec<MatchedPattern> {
977 let mut matches = Vec::new();
978 let content_lower = content.to_lowercase();
979
980 for (keyword, pattern) in &self.keyword_patterns {
982 if matches.len() >= max_matches {
983 break;
984 }
985
986 let mut start = 0;
987 while let Some(pos) = content_lower[start..].find(keyword) {
988 let absolute_pos = start + pos;
989
990 let is_word_start = absolute_pos == 0
992 || !content_lower
993 .chars()
994 .nth(absolute_pos - 1)
995 .unwrap_or(' ')
996 .is_alphabetic();
997 let is_word_end = absolute_pos + keyword.len() >= content_lower.len()
998 || !content_lower
999 .chars()
1000 .nth(absolute_pos + keyword.len())
1001 .unwrap_or(' ')
1002 .is_alphabetic();
1003
1004 if is_word_start && is_word_end {
1005 let context_boost = self.calculate_context_boost(
1006 content,
1007 absolute_pos,
1008 &pattern.context_boosters,
1009 );
1010
1011 matches.push(MatchedPattern {
1012 pattern_name: pattern.name.clone(),
1013 pattern_category: pattern.category.clone(),
1014 match_text: keyword.clone(),
1015 match_position: absolute_pos,
1016 weight: pattern.weight,
1017 context_boost,
1018 });
1019 }
1020
1021 start = absolute_pos + 1;
1022 }
1023 }
1024
1025 for (regex, pattern) in &self.regex_patterns {
1027 if matches.len() >= max_matches {
1028 break;
1029 }
1030
1031 for mat in regex.find_iter(content).take(max_matches - matches.len()) {
1032 let match_text = mat.as_str().to_string();
1033 let match_position = mat.start();
1034
1035 let context_boost = self.calculate_context_boost(
1036 content,
1037 match_position,
1038 &pattern.context_boosters,
1039 );
1040
1041 matches.push(MatchedPattern {
1042 pattern_name: pattern.name.clone(),
1043 pattern_category: pattern.category.clone(),
1044 match_text,
1045 match_position,
1046 weight: pattern.weight,
1047 context_boost,
1048 });
1049 }
1050 }
1051
1052 matches
1053 }
1054
1055 fn calculate_context_boost(
1056 &self,
1057 content: &str,
1058 match_position: usize,
1059 boosters: &[String],
1060 ) -> f64 {
1061 let window_size = 100;
1062 let start = match_position.saturating_sub(window_size);
1063 let end = (match_position + window_size).min(content.len());
1064 let context = &content[start..end].to_lowercase();
1065
1066 let mut boost: f64 = 0.0;
1067 for booster in boosters {
1068 if context.contains(&booster.to_lowercase()) {
1069 boost += 0.1; }
1071 }
1072
1073 boost.min(0.5_f64) }
1075}
1076
/// Three-stage importance assessment pipeline: pattern matching →
/// semantic similarity → LLM scoring, with an embedding cache, a circuit
/// breaker around the LLM dependency, and Prometheus instrumentation.
pub struct ImportanceAssessmentPipeline {
    config: ImportanceAssessmentConfig,
    /// Stage 1 matcher compiled from `config.stage1.pattern_library`.
    pattern_matcher: OptimizedPatternMatcher,
    /// Embedding provider used by Stage 2.
    embedding_service: Arc<dyn EmbeddingService>,
    /// TTL + LRU cache of content embeddings.
    embedding_cache: EmbeddingCache,
    /// Guards the Stage 3 LLM dependency.
    circuit_breaker: CircuitBreaker,
    metrics: ImportanceAssessmentMetrics,
    /// HTTP client for LLM calls; timeout mirrors the Stage 3 budget.
    http_client: reqwest::Client,
}
1087
1088impl ImportanceAssessmentPipeline {
    /// Construct the pipeline from configuration.
    ///
    /// Compiles and validates the Stage 1 pattern library, registers all
    /// Prometheus metrics, builds the circuit breaker, the Stage 3 HTTP
    /// client (request timeout = `stage3.max_processing_time_ms`), and the
    /// Stage 2 embedding cache.
    ///
    /// # Errors
    /// Fails if a pattern is invalid, a metric cannot be registered, or the
    /// HTTP client cannot be built.
    pub fn new(
        config: ImportanceAssessmentConfig,
        embedding_service: Arc<dyn EmbeddingService>,
        metrics_registry: &Registry,
    ) -> Result<Self> {
        let pattern_matcher = OptimizedPatternMatcher::new(&config.stage1.pattern_library)?;

        let metrics = ImportanceAssessmentMetrics::new(metrics_registry)?;

        let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone());

        let http_client = reqwest::Client::builder()
            .timeout(Duration::from_millis(config.stage3.max_processing_time_ms))
            .build()?;

        let embedding_cache = EmbeddingCache::new(
            config.stage2.embedding_cache_max_size,
            config.stage2.cache_eviction_threshold,
        );

        Ok(Self {
            config,
            pattern_matcher,
            embedding_service,
            embedding_cache,
            circuit_breaker,
            metrics,
            http_client,
        })
    }
1120
1121 pub async fn assess_importance(
1123 &self,
1124 content: &str,
1125 ) -> Result<ImportanceAssessmentResult, ImportanceAssessmentError> {
1126 let assessment_start = Instant::now();
1127 let mut stage_results = Vec::new();
1128
1129 info!(
1130 "Starting importance assessment for content length: {}",
1131 content.len()
1132 );
1133
1134 let stage1_result = self.execute_stage1(content).await?;
1136 let stage1_passed = stage1_result.passed_threshold;
1137 stage_results.push(stage1_result.clone());
1138
1139 if stage1_passed {
1140 debug!("Stage 1 passed threshold, proceeding to Stage 2");
1141
1142 let stage2_result = self.execute_stage2(content).await?;
1144 let stage2_passed = stage2_result.passed_threshold;
1145 stage_results.push(stage2_result.clone());
1146
1147 if stage2_passed {
1148 debug!("Stage 2 passed threshold, proceeding to Stage 3");
1149
1150 let stage3_result = self.execute_stage3(content).await?;
1152 stage_results.push(stage3_result.clone());
1153
1154 self.metrics.completed_at_stage3.inc();
1155
1156 let final_score = stage3_result.score;
1157 let confidence = stage3_result.confidence;
1158
1159 let result = ImportanceAssessmentResult {
1160 importance_score: final_score,
1161 final_stage: AssessmentStage::Stage3LLMScoring,
1162 stage_results,
1163 total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1164 assessed_at: Utc::now(),
1165 confidence,
1166 explanation: self.extract_explanation_from_stage3(&stage3_result),
1167 };
1168
1169 self.record_final_metrics(&result);
1170 return Ok(result);
1171 } else {
1172 self.metrics.completed_at_stage2.inc();
1173
1174 let final_score = stage2_result.score;
1175 let confidence = stage2_result.confidence;
1176
1177 let result = ImportanceAssessmentResult {
1178 importance_score: final_score,
1179 final_stage: AssessmentStage::Stage2SemanticSimilarity,
1180 stage_results,
1181 total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1182 assessed_at: Utc::now(),
1183 confidence,
1184 explanation: Some(
1185 "Assessment completed at Stage 2 based on semantic similarity".to_string(),
1186 ),
1187 };
1188
1189 self.record_final_metrics(&result);
1190 return Ok(result);
1191 }
1192 } else {
1193 self.metrics.completed_at_stage1.inc();
1194
1195 let final_score = stage1_result.score;
1196 let confidence = stage1_result.confidence;
1197
1198 let result = ImportanceAssessmentResult {
1199 importance_score: final_score,
1200 final_stage: AssessmentStage::Stage1PatternMatching,
1201 stage_results,
1202 total_processing_time_ms: assessment_start.elapsed().as_millis() as u64,
1203 assessed_at: Utc::now(),
1204 confidence,
1205 explanation: Some(
1206 "Assessment completed at Stage 1 based on pattern matching".to_string(),
1207 ),
1208 };
1209
1210 self.record_final_metrics(&result);
1211 return Ok(result);
1212 }
1213 }
1214
1215 async fn execute_stage1(
1216 &self,
1217 content: &str,
1218 ) -> Result<StageResult, ImportanceAssessmentError> {
1219 let stage_start = Instant::now();
1220 self.metrics.stage1_executions.inc();
1221
1222 let timeout_duration = Duration::from_millis(self.config.stage1.max_processing_time_ms);
1223
1224 let result = timeout(timeout_duration, async {
1225 let content_for_analysis = if content.len() > 10000 {
1227 warn!(
1228 "Content length {} exceeds Stage 1 limit, truncating to 10000 chars",
1229 content.len()
1230 );
1231 &content[..10000]
1232 } else {
1233 content
1234 };
1235
1236 let max_matches = 50; let matched_patterns = self
1239 .pattern_matcher
1240 .find_matches(content_for_analysis, max_matches);
1241
1242 let mut total_score = 0.0;
1243 let mut max_weight: f64 = 0.0;
1244
1245 for pattern in &matched_patterns {
1246 let effective_weight = pattern.weight * (1.0 + pattern.context_boost);
1247 total_score += effective_weight;
1248 max_weight = max_weight.max(effective_weight);
1249 }
1250
1251 let normalized_score = if matched_patterns.is_empty() {
1253 0.0
1254 } else {
1255 (total_score / (matched_patterns.len() as f64)).min(1.0)
1256 };
1257
1258 let confidence = if matched_patterns.is_empty() {
1260 0.1 } else {
1262 let pattern_diversity = matched_patterns
1263 .iter()
1264 .map(|m| m.pattern_category.clone())
1265 .collect::<std::collections::HashSet<_>>()
1266 .len() as f64;
1267 let pattern_count = self.config.stage1.pattern_library.len().max(1) as f64; let base_confidence = (pattern_diversity / pattern_count).min(1.0);
1269 let strength_boost = (max_weight / 1.0_f64).min(0.3); (base_confidence + strength_boost).min(1.0)
1271 };
1272
1273 let passed_threshold = confidence >= self.config.stage1.confidence_threshold;
1274
1275 StageResult {
1276 stage: AssessmentStage::Stage1PatternMatching,
1277 score: normalized_score,
1278 confidence,
1279 processing_time_ms: stage_start.elapsed().as_millis() as u64,
1280 passed_threshold,
1281 details: StageDetails::Stage1 {
1282 matched_patterns,
1283 total_patterns_checked: self.config.stage1.pattern_library.len(),
1284 },
1285 }
1286 })
1287 .await;
1288
1289 match result {
1290 Ok(stage_result) => {
1291 let duration_seconds = stage_start.elapsed().as_secs_f64();
1292 self.metrics.stage1_duration.observe(duration_seconds);
1293
1294 if stage_result.processing_time_ms > self.config.performance.stage1_target_ms {
1296 self.metrics.stage1_threshold_violations.inc();
1297 warn!(
1298 "Stage 1 exceeded target time: {}ms > {}ms",
1299 stage_result.processing_time_ms, self.config.performance.stage1_target_ms
1300 );
1301 }
1302
1303 debug!(
1304 "Stage 1 completed in {}ms with score {:.3} and confidence {:.3}",
1305 stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1306 );
1307
1308 Ok(stage_result)
1309 }
1310 Err(_) => {
1311 self.metrics.stage1_threshold_violations.inc();
1312 Err(ImportanceAssessmentError::Timeout(format!(
1313 "Stage 1 exceeded maximum processing time of {}ms",
1314 self.config.stage1.max_processing_time_ms
1315 )))
1316 }
1317 }
1318 }
1319
/// Stage 2: semantic similarity scoring against configured reference embeddings.
///
/// Resolves an embedding for `content` (from cache when possible), compares it
/// against every `stage2.reference_embeddings` entry with cosine similarity,
/// and aggregates matches above `similarity_threshold` into a weighted score
/// plus a confidence value. Bounded by `stage2.max_processing_time_ms`.
///
/// # Errors
/// Returns `Stage2Failed` when embedding generation fails and `Timeout` when
/// the stage exceeds its maximum processing time.
async fn execute_stage2(
    &self,
    content: &str,
) -> Result<StageResult, ImportanceAssessmentError> {
    let stage_start = Instant::now();
    self.metrics.stage2_executions.inc();

    let timeout_duration = Duration::from_millis(self.config.stage2.max_processing_time_ms);

    let stage2_result = async {
        // Opportunistic cache maintenance: every 100th execution, purge
        // expired entries. Failures are logged but never fail the stage.
        if self.metrics.stage2_executions.get() % 100 == 0 {
            if let Err(e) = self.embedding_cache.cleanup_expired().await {
                warn!("Failed to cleanup expired cache entries: {}", e);
            }
        }

        // Cache key is the SHA-256 hex digest of the raw content bytes.
        let mut hasher = Sha256::new();
        hasher.update(content.as_bytes());
        let content_hash = format!("{:x}", hasher.finalize());

        // Resolve the embedding: a cache hit skips the embedding service; on a
        // miss we generate, cache (best effort), and time the generation.
        let (content_embedding, cache_hit, embedding_time) =
            if let Some(cached) = self.embedding_cache.get(&content_hash).await {
                self.metrics.embedding_cache_hits.inc();
                (cached.embedding, true, None)
            } else {
                self.metrics.embedding_cache_misses.inc();
                let embed_start = Instant::now();
                let embedding = match self.embedding_service.generate_embedding(content).await {
                    Ok(emb) => emb,
                    Err(e) => {
                        return Err(ImportanceAssessmentError::Stage2Failed(format!(
                            "Embedding generation failed: {}",
                            e
                        )))
                    }
                };
                let embed_time = embed_start.elapsed().as_millis() as u64;

                // Insert into the cache with the configured TTL; a failed
                // insert only loses the caching benefit, not the result.
                let cached_embedding = CachedEmbedding::new(
                    embedding.clone(),
                    self.config.stage2.embedding_cache_ttl_seconds,
                );

                if let Err(e) = self
                    .embedding_cache
                    .insert(content_hash, cached_embedding)
                    .await
                {
                    warn!("Failed to cache embedding: {}", e);
                }

                // Keep the size gauge in sync after the insert attempt.
                self.metrics
                    .embedding_cache_size
                    .set(self.embedding_cache.len() as i64);
                (embedding, false, Some(embed_time))
            };

        // Compare against every reference; only similarities at or above the
        // threshold contribute to the weighted aggregate.
        let mut similarity_scores = Vec::new();
        let mut total_weighted_score = 0.0;
        let mut total_weight = 0.0;

        for reference in &self.config.stage2.reference_embeddings {
            let similarity =
                self.calculate_cosine_similarity(&content_embedding, &reference.embedding);

            if similarity >= self.config.stage2.similarity_threshold {
                let weighted_score = similarity as f64 * reference.weight;

                similarity_scores.push(SimilarityScore {
                    reference_name: reference.name.clone(),
                    reference_category: reference.category.clone(),
                    similarity,
                    weight: reference.weight,
                    weighted_score,
                });

                total_weighted_score += weighted_score;
                total_weight += reference.weight;
            }
        }

        // Weighted mean of the passing similarities, clamped to [0, 1];
        // no matches (or an empty reference set) scores 0.
        let normalized_score = if total_weight > 0.0 {
            (total_weighted_score / total_weight).min(1.0)
        } else {
            0.0
        };

        // Confidence blends coverage (fraction of references matched) with the
        // average similarity of the matches, 50/50.
        let confidence = if similarity_scores.is_empty() {
            0.1 // low but non-zero: no semantic evidence either way
        } else {
            let match_ratio = similarity_scores.len() as f64
                / self.config.stage2.reference_embeddings.len() as f64;
            let avg_similarity = similarity_scores
                .iter()
                .map(|s| s.similarity as f64)
                .sum::<f64>()
                / similarity_scores.len() as f64;
            (match_ratio * 0.5 + avg_similarity * 0.5).min(1.0)
        };

        let passed_threshold = confidence >= self.config.stage2.confidence_threshold;

        Ok(StageResult {
            stage: AssessmentStage::Stage2SemanticSimilarity,
            score: normalized_score,
            confidence,
            processing_time_ms: stage_start.elapsed().as_millis() as u64,
            passed_threshold,
            details: StageDetails::Stage2 {
                similarity_scores,
                cache_hit,
                embedding_generation_time_ms: embedding_time,
            },
        })
    };

    let result = timeout(timeout_duration, stage2_result).await;

    match result {
        Ok(Ok(stage_result)) => {
            let duration_seconds = stage_start.elapsed().as_secs_f64();
            self.metrics.stage2_duration.observe(duration_seconds);

            // Soft-target violations are tracked separately from hard timeouts.
            if stage_result.processing_time_ms > self.config.performance.stage2_target_ms {
                self.metrics.stage2_threshold_violations.inc();
                warn!(
                    "Stage 2 exceeded target time: {}ms > {}ms",
                    stage_result.processing_time_ms, self.config.performance.stage2_target_ms
                );
            }

            debug!(
                "Stage 2 completed in {}ms with score {:.3} and confidence {:.3}",
                stage_result.processing_time_ms, stage_result.score, stage_result.confidence
            );

            Ok(stage_result)
        }
        Ok(Err(e)) => Err(e),
        Err(_) => {
            self.metrics.stage2_threshold_violations.inc();
            Err(ImportanceAssessmentError::Timeout(format!(
                "Stage 2 exceeded maximum processing time of {}ms",
                self.config.stage2.max_processing_time_ms
            )))
        }
    }
}
1475
1476 async fn execute_stage3(
1477 &self,
1478 content: &str,
1479 ) -> Result<StageResult, ImportanceAssessmentError> {
1480 let stage_start = Instant::now();
1481 self.metrics.stage3_executions.inc();
1482
1483 if !self.circuit_breaker.can_execute().await? {
1485 return Err(ImportanceAssessmentError::CircuitBreakerOpen(
1486 "LLM circuit breaker is open".to_string(),
1487 ));
1488 }
1489
1490 let timeout_duration = Duration::from_millis(self.config.stage3.max_processing_time_ms);
1491
1492 let result = timeout(timeout_duration, async {
1493 let content_preview = if content.len() > 2000 {
1495 format!(
1496 "{}... [truncated from {} chars]",
1497 &content[..2000],
1498 content.len()
1499 )
1500 } else {
1501 content.to_string()
1502 };
1503
1504 let prompt = self
1505 .config
1506 .stage3
1507 .prompt_template
1508 .replace("{content}", &content_preview)
1509 .replace("{timestamp}", &Utc::now().to_rfc3339());
1510
1511 let llm_response = self.call_llm(&prompt).await?;
1513
1514 let (importance_score, confidence) = self.parse_llm_response(&llm_response)?;
1516
1517 let passed_threshold = true; Ok::<StageResult, ImportanceAssessmentError>(StageResult {
1520 stage: AssessmentStage::Stage3LLMScoring,
1521 score: importance_score,
1522 confidence,
1523 processing_time_ms: stage_start.elapsed().as_millis() as u64,
1524 passed_threshold,
1525 details: StageDetails::Stage3 {
1526 llm_response,
1527 prompt_tokens: Some(prompt.len() / 4), completion_tokens: None, model_used: "configured-model".to_string(),
1530 },
1531 })
1532 })
1533 .await;
1534
1535 match result {
1536 Ok(Ok(stage_result)) => {
1537 let duration_seconds = stage_start.elapsed().as_secs_f64();
1538 self.metrics.stage3_duration.observe(duration_seconds);
1539 self.metrics.llm_call_successes.inc();
1540 self.circuit_breaker.record_success().await;
1541
1542 if stage_result.processing_time_ms > self.config.performance.stage3_target_ms {
1544 self.metrics.stage3_threshold_violations.inc();
1545 warn!(
1546 "Stage 3 exceeded target time: {}ms > {}ms",
1547 stage_result.processing_time_ms, self.config.performance.stage3_target_ms
1548 );
1549 }
1550
1551 debug!(
1552 "Stage 3 completed in {}ms with score {:.3} and confidence {:.3}",
1553 stage_result.processing_time_ms, stage_result.score, stage_result.confidence
1554 );
1555
1556 Ok(stage_result)
1557 }
1558 Ok(Err(e)) => {
1559 self.metrics.llm_call_failures.inc();
1560 self.circuit_breaker.record_failure().await;
1561 Err(e)
1562 }
1563 Err(_) => {
1564 self.metrics.stage3_threshold_violations.inc();
1565 self.metrics.llm_call_failures.inc();
1566 self.circuit_breaker.record_failure().await;
1567 Err(ImportanceAssessmentError::Timeout(format!(
1568 "Stage 3 exceeded maximum processing time of {}ms",
1569 self.config.stage3.max_processing_time_ms
1570 )))
1571 }
1572 }
1573 }
1574
1575 fn calculate_cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
1576 if a.len() != b.len() {
1577 return 0.0;
1578 }
1579
1580 let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1581 let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1582 let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1583
1584 if norm_a == 0.0 || norm_b == 0.0 {
1585 return 0.0;
1586 }
1587
1588 dot_product / (norm_a * norm_b)
1589 }
1590
1591 async fn call_llm(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1592 let sanitized_prompt = self.sanitize_llm_prompt(prompt)?;
1593
1594 let mut attempts = 0;
1596 let max_retries = 3;
1597 let base_delay = Duration::from_millis(100);
1598
1599 loop {
1600 match self.attempt_llm_call(&sanitized_prompt).await {
1601 Ok(response) => return Ok(response),
1602 Err(e) => {
1603 attempts += 1;
1604 if attempts > max_retries {
1605 return Err(e);
1606 }
1607
1608 let delay = base_delay * (2_u32.pow(attempts - 1));
1610 let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..=100));
1611 tokio::time::sleep(delay + jitter).await;
1612
1613 warn!(
1614 "LLM call failed (attempt {}/{}), retrying in {:?}: {}",
1615 attempts,
1616 max_retries,
1617 delay + jitter,
1618 e
1619 );
1620 }
1621 }
1622 }
1623 }
1624
1625 async fn attempt_llm_call(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1626 if prompt.len() > 50000 {
1628 return Err(ImportanceAssessmentError::Stage3Failed(
1630 "Prompt exceeds maximum length".to_string(),
1631 ));
1632 }
1633
1634 let request_body = serde_json::json!({
1635 "prompt": prompt,
1636 "max_tokens": 150,
1637 "temperature": 0.1,
1638 "stop": ["\n\n", "---"],
1639 "model": "text-davinci-003" });
1641
1642 let response = self
1643 .http_client
1644 .post(&self.config.stage3.llm_endpoint)
1645 .header("Content-Type", "application/json")
1646 .header("User-Agent", "codex-memory/0.1.0")
1647 .json(&request_body)
1648 .send()
1649 .await
1650 .map_err(|e| {
1651 if e.is_timeout() {
1652 ImportanceAssessmentError::Timeout(format!("LLM request timed out: {}", e))
1653 } else if e.is_connect() {
1654 ImportanceAssessmentError::Stage3Failed(format!(
1655 "Failed to connect to LLM service: {}",
1656 e
1657 ))
1658 } else {
1659 ImportanceAssessmentError::Stage3Failed(format!("LLM request failed: {}", e))
1660 }
1661 })?;
1662
1663 let status = response.status();
1664 let response_text = response.text().await.map_err(|e| {
1665 ImportanceAssessmentError::Stage3Failed(format!("Failed to read response body: {}", e))
1666 })?;
1667
1668 if !status.is_success() {
1669 let error_msg = match status.as_u16() {
1671 400 => format!("Bad request to LLM service: {}", response_text),
1672 401 => "Unauthorized: Invalid API key or credentials".to_string(),
1673 403 => "Forbidden: Insufficient permissions".to_string(),
1674 429 => "Rate limit exceeded, will retry".to_string(),
1675 500..=599 => format!("LLM service error ({}): {}", status, response_text),
1676 _ => format!("LLM service returned status {}: {}", status, response_text),
1677 };
1678
1679 return Err(ImportanceAssessmentError::Stage3Failed(error_msg));
1680 }
1681
1682 let response_json: serde_json::Value =
1684 serde_json::from_str(&response_text).map_err(|e| {
1685 ImportanceAssessmentError::Stage3Failed(format!(
1686 "Failed to parse LLM response as JSON: {}",
1687 e
1688 ))
1689 })?;
1690
1691 let response_content = response_json
1693 .get("choices")
1694 .and_then(|c| c.get(0))
1695 .and_then(|choice| {
1696 choice
1698 .get("text")
1699 .or_else(|| choice.get("message").and_then(|m| m.get("content")))
1700 .or_else(|| choice.get("generated_text"))
1701 })
1702 .and_then(|v| v.as_str())
1703 .ok_or_else(|| {
1704 ImportanceAssessmentError::Stage3Failed(format!(
1705 "Invalid LLM response format. Expected 'choices[0].text' or similar. Got: {}",
1706 serde_json::to_string_pretty(&response_json)
1707 .unwrap_or_else(|_| "invalid JSON".to_string())
1708 ))
1709 })?
1710 .trim()
1711 .to_string();
1712
1713 if response_content.is_empty() {
1715 return Err(ImportanceAssessmentError::Stage3Failed(
1716 "LLM returned empty response".to_string(),
1717 ));
1718 }
1719
1720 if response_content.len() > 10000 {
1721 warn!("LLM response was truncated due to excessive length");
1723 return Ok(response_content[..10000].to_string());
1724 }
1725
1726 Ok(response_content)
1727 }
1728
1729 fn sanitize_llm_prompt(&self, prompt: &str) -> Result<String, ImportanceAssessmentError> {
1730 let dangerous_patterns = [
1732 "ignore previous instructions",
1733 "ignore all previous",
1734 "disregard previous",
1735 "forget previous",
1736 "new instructions:",
1737 "system:",
1738 "assistant:",
1739 "user:",
1740 "<|endoftext|>",
1741 "###",
1742 "---\n",
1743 ];
1744
1745 let lower_prompt = prompt.to_lowercase();
1746 for pattern in &dangerous_patterns {
1747 if lower_prompt.contains(pattern) {
1748 warn!("Potential prompt injection detected: {}", pattern);
1749 return Err(ImportanceAssessmentError::Stage3Failed(
1750 "Prompt contains potentially malicious content".to_string(),
1751 ));
1752 }
1753 }
1754
1755 let sanitized = prompt
1757 .replace('\0', "") .replace("\x1b", "") .chars()
1760 .filter(|c| c.is_ascii_graphic() || c.is_ascii_whitespace())
1761 .collect::<String>();
1762
1763 if sanitized.len() > 10000 {
1765 return Err(ImportanceAssessmentError::Stage3Failed(
1766 "Prompt too long after sanitization".to_string(),
1767 ));
1768 }
1769
1770 Ok(sanitized)
1771 }
1772
1773 fn parse_llm_response(&self, response: &str) -> Result<(f64, f64), ImportanceAssessmentError> {
1774 let lines: Vec<&str> = response.lines().collect();
1778 let mut importance_score = 0.5; let mut confidence = 0.7; for line in lines {
1782 let line = line.trim().to_lowercase();
1783
1784 if line.contains("importance:") || line.contains("score:") {
1786 if let Some(score_str) = line.split(':').nth(1) {
1787 if let Ok(score) = score_str.trim().parse::<f64>() {
1788 importance_score = score.clamp(0.0, 1.0);
1789 }
1790 }
1791 }
1792
1793 if line.contains("confidence:") {
1795 if let Some(conf_str) = line.split(':').nth(1) {
1796 if let Ok(conf) = conf_str.trim().parse::<f64>() {
1797 confidence = conf.clamp(0.0, 1.0);
1798 }
1799 }
1800 }
1801 }
1802
1803 Ok((importance_score, confidence))
1804 }
1805
1806 fn extract_explanation_from_stage3(&self, stage_result: &StageResult) -> Option<String> {
1807 if let StageDetails::Stage3 { llm_response, .. } = &stage_result.details {
1808 Some(llm_response.clone())
1809 } else {
1810 None
1811 }
1812 }
1813
1814 fn record_final_metrics(&self, result: &ImportanceAssessmentResult) {
1815 self.metrics
1816 .assessment_confidence
1817 .observe(result.confidence);
1818 self.metrics
1819 .final_importance_scores
1820 .observe(result.importance_score);
1821
1822 info!(
1823 "Importance assessment completed: score={:.3}, confidence={:.3}, stage={:?}, time={}ms",
1824 result.importance_score,
1825 result.confidence,
1826 result.final_stage,
1827 result.total_processing_time_ms
1828 );
1829 }
1830
1831 pub async fn get_statistics(&self) -> PipelineStatistics {
1833 let cache_size = self.embedding_cache.len();
1834 let eviction_count = self.embedding_cache.eviction_count();
1835
1836 PipelineStatistics {
1837 cache_size,
1838 stage1_executions: self.metrics.stage1_executions.get(),
1839 stage2_executions: self.metrics.stage2_executions.get(),
1840 stage3_executions: self.metrics.stage3_executions.get(),
1841 completed_at_stage1: self.metrics.completed_at_stage1.get(),
1842 completed_at_stage2: self.metrics.completed_at_stage2.get(),
1843 completed_at_stage3: self.metrics.completed_at_stage3.get(),
1844 cache_hits: self.metrics.embedding_cache_hits.get(),
1845 cache_misses: self.metrics.embedding_cache_misses.get(),
1846 cache_evictions: eviction_count,
1847 circuit_breaker_state: format!("{:?}", *self.circuit_breaker.state.read().await),
1848 llm_success_rate: {
1849 let successes = self.metrics.llm_call_successes.get() as f64;
1850 let failures = self.metrics.llm_call_failures.get() as f64;
1851 let total = successes + failures;
1852 if total > 0.0 {
1853 successes / total
1854 } else {
1855 1.0
1856 }
1857 },
1858 }
1859 }
1860
1861 pub async fn clear_cache(&self) {
1863 self.embedding_cache.clear().await;
1864 self.metrics.embedding_cache_size.set(0);
1865 info!("Embedding cache cleared");
1866 }
1867
1868 pub fn get_cache_hit_ratio(&self) -> f64 {
1870 let hits = self.metrics.embedding_cache_hits.get() as f64;
1871 let misses = self.metrics.embedding_cache_misses.get() as f64;
1872 let total = hits + misses;
1873 if total > 0.0 {
1874 hits / total
1875 } else {
1876 0.0
1877 }
1878 }
1879}
1880
/// Point-in-time snapshot of pipeline counters, cache state, and the circuit
/// breaker, assembled by `get_statistics` for diagnostics/health reporting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStatistics {
    /// Current number of entries in the embedding cache.
    pub cache_size: usize,
    /// Total Stage 1 (pattern matching) executions.
    pub stage1_executions: u64,
    /// Total Stage 2 (semantic similarity) executions.
    pub stage2_executions: u64,
    /// Total Stage 3 (LLM scoring) executions.
    pub stage3_executions: u64,
    /// Assessments that finished at Stage 1.
    pub completed_at_stage1: u64,
    /// Assessments that finished at Stage 2.
    pub completed_at_stage2: u64,
    /// Assessments that finished at Stage 3.
    pub completed_at_stage3: u64,
    /// Embedding cache hits.
    pub cache_hits: u64,
    /// Embedding cache misses.
    pub cache_misses: u64,
    /// Entries evicted from the embedding cache.
    pub cache_evictions: u64,
    /// Debug-formatted circuit-breaker state at snapshot time.
    pub circuit_breaker_state: String,
    /// successes / (successes + failures) for LLM calls; 1.0 when no calls yet.
    pub llm_success_rate: f64,
}
1896
impl Default for ImportanceAssessmentConfig {
    /// Baseline configuration: a small case-insensitive regex pattern library
    /// for Stage 1, an empty reference-embedding set for Stage 2 (Stage 2
    /// matches nothing until references are configured), and a local LLM
    /// endpoint for Stage 3.
    fn default() -> Self {
        Self {
            stage1: Stage1Config {
                confidence_threshold: 0.6,
                // Each pattern contributes `weight` to the stage score;
                // `context_boosters` list words that amplify a match.
                pattern_library: vec![
                    ImportancePattern {
                        name: "remember_command".to_string(),
                        pattern: r"(?i)\b(remember|recall|don't forget)\b".to_string(),
                        weight: 0.8,
                        context_boosters: vec!["important".to_string(), "critical".to_string()],
                        category: "memory".to_string(),
                    },
                    ImportancePattern {
                        name: "preference_statement".to_string(),
                        pattern: r"(?i)\b(prefer|like|want|choose)\b".to_string(),
                        weight: 0.7,
                        context_boosters: vec!["always".to_string(), "usually".to_string()],
                        category: "preference".to_string(),
                    },
                    ImportancePattern {
                        name: "decision_making".to_string(),
                        pattern: r"(?i)\b(decide|decision|choose|select)\b".to_string(),
                        weight: 0.75,
                        context_boosters: vec!["final".to_string(), "official".to_string()],
                        category: "decision".to_string(),
                    },
                    ImportancePattern {
                        name: "correction".to_string(),
                        pattern: r"(?i)\b(correct|fix|wrong|mistake|error)\b".to_string(),
                        weight: 0.6,
                        context_boosters: vec!["actually".to_string(), "should".to_string()],
                        category: "correction".to_string(),
                    },
                    ImportancePattern {
                        name: "importance_marker".to_string(),
                        pattern: r"(?i)\b(important|critical|crucial|vital|essential)\b".to_string(),
                        weight: 0.9,
                        context_boosters: vec!["very".to_string(), "extremely".to_string()],
                        category: "importance".to_string(),
                    },
                ],
                // Stage 1 is the fast path: a 10ms hard budget.
                max_processing_time_ms: 10,
            },
            stage2: Stage2Config {
                confidence_threshold: 0.7,
                max_processing_time_ms: 100,
                // 1-hour TTL, up to 10k cached embeddings; eviction begins at
                // 80% occupancy. Similarities below 0.7 are ignored.
                embedding_cache_ttl_seconds: 3600,
                embedding_cache_max_size: 10000,
                cache_eviction_threshold: 0.8,
                similarity_threshold: 0.7,
                // NOTE(review): empty by default — callers must populate this
                // for Stage 2 to produce any matches.
                reference_embeddings: vec![],
            },
            stage3: Stage3Config {
                max_processing_time_ms: 1000,
                llm_endpoint: "http://localhost:8080/generate".to_string(),
                max_concurrent_requests: 5,
                // Template placeholders `{content}` and `{timestamp}` are
                // substituted at call time.
                prompt_template: "Assess the importance of this content on a scale of 0.0 to 1.0. Consider context, user intent, and actionability.\n\nContent: {content}\n\nProvide your assessment as:\nImportance: [score]\nConfidence: [confidence]\nReasoning: [explanation]".to_string(),
                // Stage 3 is expected to handle at most ~20% of assessments.
                target_usage_percentage: 20.0,
            },
            circuit_breaker: CircuitBreakerConfig {
                // Open after 5 failures within a 60s window (given at least
                // 3 requests); probe again after 30s.
                failure_threshold: 5,
                failure_window_seconds: 60,
                recovery_timeout_seconds: 30,
                minimum_requests: 3,
            },
            performance: PerformanceConfig {
                // Soft latency targets per stage (violations are counted,
                // not enforced).
                stage1_target_ms: 10,
                stage2_target_ms: 100,
                stage3_target_ms: 1000,
            },
        }
    }
}