codex_memory/memory/
silent_harvester.rs

1use crate::embedding::EmbeddingService;
2use crate::memory::{ImportanceAssessmentPipeline, Memory, MemoryRepository, MemoryTier};
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use futures::future;
6use prometheus::{Counter, Histogram, Registry};
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use thiserror::Error;
14use tokio::sync::{Mutex, RwLock, Semaphore};
15use tokio::time::{interval, timeout};
16use tracing::{debug, error, info, trace, warn};
17
18#[derive(Debug, Error)]
19pub enum HarvesterError {
20    #[error("Pattern extraction failed: {0}")]
21    ExtractionFailed(String),
22
23    #[error("Deduplication failed: {0}")]
24    DeduplicationFailed(String),
25
26    #[error("Batch processing failed: {0}")]
27    BatchProcessingFailed(String),
28
29    #[error("Repository operation failed: {0}")]
30    RepositoryFailed(#[from] crate::memory::error::MemoryError),
31
32    #[error("Importance assessment failed: {0}")]
33    ImportanceAssessmentFailed(String),
34
35    #[error("Configuration error: {0}")]
36    Configuration(String),
37
38    #[error("Background task failed: {0}")]
39    BackgroundTaskFailed(String),
40
41    #[error("Circuit breaker open: {0}")]
42    CircuitBreakerOpen(String),
43
44    #[error("Resource exhaustion: {0}")]
45    ResourceExhaustion(String),
46
47    #[error("Backpressure applied: {0}")]
48    BackpressureApplied(String),
49}
50
51/// Types of memory patterns that can be extracted
52#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
53pub enum MemoryPatternType {
54    Preference,
55    Fact,
56    Decision,
57    Correction,
58    Emotion,
59    Goal,
60    Relationship,
61    Skill,
62}
63
64/// A detected memory pattern with confidence score
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ExtractedMemoryPattern {
67    pub pattern_type: MemoryPatternType,
68    pub content: String,
69    pub confidence: f64,
70    pub extracted_at: DateTime<Utc>,
71    pub source_message_id: Option<String>,
72    pub context: String,
73    pub metadata: HashMap<String, serde_json::Value>,
74}
75
76/// Configuration for the silent harvester
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct SilentHarvesterConfig {
79    /// Auto-store threshold (default: 0.7)
80    pub confidence_threshold: f64,
81
82    /// Deduplication similarity threshold (default: 0.85)
83    pub deduplication_threshold: f64,
84
85    /// Trigger every N messages (default: 10)
86    pub message_trigger_count: usize,
87
88    /// Trigger every N minutes (default: 5)
89    pub time_trigger_minutes: u64,
90
91    /// Maximum batch size for processing (default: 50)
92    pub max_batch_size: usize,
93
94    /// Performance target: max processing time in seconds (default: 2)
95    pub max_processing_time_seconds: u64,
96
97    /// Enable silent mode (no user feedback)
98    pub silent_mode: bool,
99
100    /// Pattern extraction configuration
101    pub pattern_config: PatternExtractionConfig,
102
103    /// Enable graceful degradation when errors occur
104    pub graceful_degradation: bool,
105
106    /// Maximum retries for failed operations
107    pub max_retries: u32,
108
109    /// Enable fallback storage when primary storage fails
110    pub enable_fallback_storage: bool,
111}
112
113impl Default for SilentHarvesterConfig {
114    fn default() -> Self {
115        Self {
116            confidence_threshold: 0.7,
117            deduplication_threshold: 0.85,
118            message_trigger_count: 10,
119            time_trigger_minutes: 5,
120            max_batch_size: 50,
121            max_processing_time_seconds: 2,
122            silent_mode: true,
123            pattern_config: PatternExtractionConfig::default(),
124            graceful_degradation: true,
125            max_retries: 3,
126            enable_fallback_storage: true,
127        }
128    }
129}
130
131/// Configuration for pattern extraction
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct PatternExtractionConfig {
134    pub preference_patterns: Vec<String>,
135    pub fact_patterns: Vec<String>,
136    pub decision_patterns: Vec<String>,
137    pub correction_patterns: Vec<String>,
138    pub emotion_patterns: Vec<String>,
139    pub goal_patterns: Vec<String>,
140    pub relationship_patterns: Vec<String>,
141    pub skill_patterns: Vec<String>,
142}
143
144impl Default for PatternExtractionConfig {
145    fn default() -> Self {
146        Self {
147            preference_patterns: vec![
148                r"(?i)I prefer|I like|I enjoy|I love|I hate|I dislike".to_string(),
149                r"(?i)my favorite|I'd rather|I tend to|I usually".to_string(),
150                r"(?i)I always|I never|I often|I rarely".to_string(),
151            ],
152            fact_patterns: vec![
153                r"(?i)I am|I work|I live|I have|my name is".to_string(),
154                r"(?i)I was born|I graduated|I studied|I learned".to_string(),
155                r"(?i)the fact is|it's true that|I know that".to_string(),
156            ],
157            decision_patterns: vec![
158                r"(?i)I decided|I chose|I will|I'm going to".to_string(),
159                r"(?i)I've decided|my decision|I'll go with".to_string(),
160                r"(?i)I think we should|let's go with|I recommend".to_string(),
161            ],
162            correction_patterns: vec![
163                r"(?i)actually|correction|I meant|let me clarify".to_string(),
164                r"(?i)that's wrong|that's incorrect|I misspoke".to_string(),
165                r"(?i)sorry, I meant|to be clear|what I should have said".to_string(),
166            ],
167            emotion_patterns: vec![
168                r"(?i)I feel|I'm excited|I'm worried|I'm happy".to_string(),
169                r"(?i)I'm frustrated|I'm confused|I'm concerned".to_string(),
170                r"(?i)this makes me|I'm feeling|emotionally".to_string(),
171            ],
172            goal_patterns: vec![
173                r"(?i)I want to|I hope to|my goal|I'm trying to".to_string(),
174                r"(?i)I'm working toward|I aim to|I plan to".to_string(),
175                r"(?i)I need to|I should|I must".to_string(),
176            ],
177            relationship_patterns: vec![
178                r"(?i)my friend|my colleague|my family|my partner".to_string(),
179                r"(?i)I work with|I know someone|my relationship".to_string(),
180                r"(?i)my team|my boss|my client".to_string(),
181            ],
182            skill_patterns: vec![
183                r"(?i)I can|I'm good at|I know how to|I'm skilled".to_string(),
184                r"(?i)I'm learning|I'm studying|I practice".to_string(),
185                r"(?i)I'm experienced|I specialize|my expertise".to_string(),
186            ],
187        }
188    }
189}
190
191/// Metrics for tracking harvester performance
192#[derive(Debug)]
193pub struct HarvesterMetrics {
194    pub messages_processed: Arc<AtomicU64>,
195    pub patterns_extracted: Arc<AtomicU64>,
196    pub memories_stored: Arc<AtomicU64>,
197    pub duplicates_filtered: Arc<AtomicU64>,
198    pub extraction_time_ms: Arc<AtomicU64>,
199    pub batch_processing_time_ms: Arc<AtomicU64>,
200    pub last_harvest_time: Arc<Mutex<Option<DateTime<Utc>>>>,
201
202    // Prometheus metrics
203    pub extraction_counter: Counter,
204    pub storage_counter: Counter,
205    pub deduplication_counter: Counter,
206    pub processing_time_histogram: Histogram,
207    pub batch_size_histogram: Histogram,
208    pub confidence_histogram: Histogram,
209}
210
211impl HarvesterMetrics {
212    pub fn new(registry: &Registry) -> Result<Self> {
213        let extraction_counter = Counter::new(
214            "harvester_patterns_extracted_total",
215            "Total number of patterns extracted",
216        )?;
217        registry.register(Box::new(extraction_counter.clone()))?;
218
219        let storage_counter = Counter::new(
220            "harvester_memories_stored_total",
221            "Total number of memories stored",
222        )?;
223        registry.register(Box::new(storage_counter.clone()))?;
224
225        let deduplication_counter = Counter::new(
226            "harvester_duplicates_filtered_total",
227            "Total number of duplicates filtered out",
228        )?;
229        registry.register(Box::new(deduplication_counter.clone()))?;
230
231        let processing_time_histogram = Histogram::with_opts(
232            prometheus::HistogramOpts::new(
233                "harvester_processing_duration_seconds",
234                "Time spent processing messages",
235            )
236            .buckets(vec![0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0]),
237        )?;
238        registry.register(Box::new(processing_time_histogram.clone()))?;
239
240        let batch_size_histogram = Histogram::with_opts(
241            prometheus::HistogramOpts::new("harvester_batch_size", "Size of processing batches")
242                .buckets(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0]),
243        )?;
244        registry.register(Box::new(batch_size_histogram.clone()))?;
245
246        let confidence_histogram = Histogram::with_opts(
247            prometheus::HistogramOpts::new(
248                "harvester_pattern_confidence",
249                "Confidence scores of extracted patterns",
250            )
251            .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
252        )?;
253        registry.register(Box::new(confidence_histogram.clone()))?;
254
255        Ok(Self {
256            messages_processed: Arc::new(AtomicU64::new(0)),
257            patterns_extracted: Arc::new(AtomicU64::new(0)),
258            memories_stored: Arc::new(AtomicU64::new(0)),
259            duplicates_filtered: Arc::new(AtomicU64::new(0)),
260            extraction_time_ms: Arc::new(AtomicU64::new(0)),
261            batch_processing_time_ms: Arc::new(AtomicU64::new(0)),
262            last_harvest_time: Arc::new(Mutex::new(None)),
263            extraction_counter,
264            storage_counter,
265            deduplication_counter,
266            processing_time_histogram,
267            batch_size_histogram,
268            confidence_histogram,
269        })
270    }
271
272    pub async fn record_harvest(&self, patterns_count: u64, processing_time_ms: u64) {
273        self.patterns_extracted
274            .fetch_add(patterns_count, Ordering::Relaxed);
275        self.extraction_time_ms
276            .fetch_add(processing_time_ms, Ordering::Relaxed);
277        *self.last_harvest_time.lock().await = Some(Utc::now());
278
279        self.extraction_counter.inc_by(patterns_count as f64);
280        self.processing_time_histogram
281            .observe(processing_time_ms as f64 / 1000.0);
282    }
283
284    pub async fn record_storage(&self, stored_count: u64, duplicates_count: u64) {
285        self.memories_stored
286            .fetch_add(stored_count, Ordering::Relaxed);
287        self.duplicates_filtered
288            .fetch_add(duplicates_count, Ordering::Relaxed);
289
290        self.storage_counter.inc_by(stored_count as f64);
291        self.deduplication_counter.inc_by(duplicates_count as f64);
292    }
293
294    pub fn record_batch_processing(&self, batch_size: usize, processing_time_ms: u64) {
295        self.batch_processing_time_ms
296            .fetch_add(processing_time_ms, Ordering::Relaxed);
297        self.batch_size_histogram.observe(batch_size as f64);
298    }
299
300    pub fn record_pattern_confidence(&self, confidence: f64) {
301        self.confidence_histogram.observe(confidence);
302    }
303}
304
305/// Pattern matcher for extracting specific types of memories
306pub struct PatternMatcher {
307    preference_regexes: Vec<Regex>,
308    fact_regexes: Vec<Regex>,
309    decision_regexes: Vec<Regex>,
310    correction_regexes: Vec<Regex>,
311    emotion_regexes: Vec<Regex>,
312    goal_regexes: Vec<Regex>,
313    relationship_regexes: Vec<Regex>,
314    skill_regexes: Vec<Regex>,
315}
316
317impl PatternMatcher {
318    pub fn new(config: &PatternExtractionConfig) -> Result<Self> {
319        let compile_patterns = |patterns: &[String]| -> Result<Vec<Regex>> {
320            patterns
321                .iter()
322                .map(|p| Regex::new(p).context("Failed to compile regex pattern"))
323                .collect()
324        };
325
326        Ok(Self {
327            preference_regexes: compile_patterns(&config.preference_patterns)?,
328            fact_regexes: compile_patterns(&config.fact_patterns)?,
329            decision_regexes: compile_patterns(&config.decision_patterns)?,
330            correction_regexes: compile_patterns(&config.correction_patterns)?,
331            emotion_regexes: compile_patterns(&config.emotion_patterns)?,
332            goal_regexes: compile_patterns(&config.goal_patterns)?,
333            relationship_regexes: compile_patterns(&config.relationship_patterns)?,
334            skill_regexes: compile_patterns(&config.skill_patterns)?,
335        })
336    }
337
338    /// Extract all patterns from a message
339    pub fn extract_patterns(&self, message: &str, context: &str) -> Vec<ExtractedMemoryPattern> {
340        let mut patterns = Vec::new();
341        let extracted_at = Utc::now();
342
343        // Extract each pattern type
344        patterns.extend(self.extract_pattern_type(
345            message,
346            context,
347            MemoryPatternType::Preference,
348            &self.preference_regexes,
349            extracted_at,
350        ));
351
352        patterns.extend(self.extract_pattern_type(
353            message,
354            context,
355            MemoryPatternType::Fact,
356            &self.fact_regexes,
357            extracted_at,
358        ));
359
360        patterns.extend(self.extract_pattern_type(
361            message,
362            context,
363            MemoryPatternType::Decision,
364            &self.decision_regexes,
365            extracted_at,
366        ));
367
368        patterns.extend(self.extract_pattern_type(
369            message,
370            context,
371            MemoryPatternType::Correction,
372            &self.correction_regexes,
373            extracted_at,
374        ));
375
376        patterns.extend(self.extract_pattern_type(
377            message,
378            context,
379            MemoryPatternType::Emotion,
380            &self.emotion_regexes,
381            extracted_at,
382        ));
383
384        patterns.extend(self.extract_pattern_type(
385            message,
386            context,
387            MemoryPatternType::Goal,
388            &self.goal_regexes,
389            extracted_at,
390        ));
391
392        patterns.extend(self.extract_pattern_type(
393            message,
394            context,
395            MemoryPatternType::Relationship,
396            &self.relationship_regexes,
397            extracted_at,
398        ));
399
400        patterns.extend(self.extract_pattern_type(
401            message,
402            context,
403            MemoryPatternType::Skill,
404            &self.skill_regexes,
405            extracted_at,
406        ));
407
408        patterns
409    }
410
411    fn extract_pattern_type(
412        &self,
413        message: &str,
414        context: &str,
415        pattern_type: MemoryPatternType,
416        regexes: &[Regex],
417        extracted_at: DateTime<Utc>,
418    ) -> Vec<ExtractedMemoryPattern> {
419        let mut patterns = Vec::new();
420
421        for regex in regexes {
422            for mat in regex.find_iter(message) {
423                // Extract the sentence containing the match
424                let content = self.extract_sentence_with_match(message, mat.start(), mat.end());
425
426                // Calculate confidence based on pattern strength and context
427                let confidence =
428                    self.calculate_pattern_confidence(&pattern_type, &content, context);
429
430                let mut metadata = HashMap::new();
431                metadata.insert(
432                    "match_start".to_string(),
433                    serde_json::Value::Number(mat.start().into()),
434                );
435                metadata.insert(
436                    "match_end".to_string(),
437                    serde_json::Value::Number(mat.end().into()),
438                );
439                metadata.insert(
440                    "matched_text".to_string(),
441                    serde_json::Value::String(mat.as_str().to_string()),
442                );
443
444                patterns.push(ExtractedMemoryPattern {
445                    pattern_type: pattern_type.clone(),
446                    content,
447                    confidence,
448                    extracted_at,
449                    source_message_id: None, // Will be set by caller
450                    context: context.to_string(),
451                    metadata,
452                });
453            }
454        }
455
456        patterns
457    }
458
459    fn extract_sentence_with_match(&self, text: &str, start: usize, end: usize) -> String {
460        // Find sentence boundaries around the match
461        let before = &text[..start];
462        let after = &text[end..];
463
464        // Find start of sentence (look for . ! ? or start of text)
465        let sentence_start = before
466            .rfind(['.', '!', '?'])
467            .map(|pos| pos + 1)
468            .unwrap_or(0);
469
470        // Find end of sentence (look for . ! ? or end of text)
471        let sentence_end = after
472            .find(['.', '!', '?'])
473            .map(|pos| end + pos + 1)
474            .unwrap_or(text.len());
475
476        text[sentence_start..sentence_end].trim().to_string()
477    }
478
479    fn calculate_pattern_confidence(
480        &self,
481        pattern_type: &MemoryPatternType,
482        content: &str,
483        context: &str,
484    ) -> f64 {
485        // Research-backed confidence calculation using multiple signals
486        // Base confidence starts lower to be more conservative
487        let mut confidence: f64 = 0.3;
488
489        // 1. Pattern type specificity (based on linguistic certainty markers)
490        let type_boost = match pattern_type {
491            MemoryPatternType::Correction => 0.3, // Corrections are highly reliable
492            MemoryPatternType::Decision => 0.25,  // Decisions show clear intent
493            MemoryPatternType::Fact => 0.2,       // Facts are generally reliable
494            MemoryPatternType::Goal => 0.18,      // Goals show clear intent
495            MemoryPatternType::Skill => 0.15,     // Skills are moderately reliable
496            MemoryPatternType::Preference => 0.12, // Preferences can be temporary
497            MemoryPatternType::Relationship => 0.1, // Relationships context-dependent
498            MemoryPatternType::Emotion => 0.08,   // Emotions are ephemeral
499        };
500        confidence += type_boost;
501
502        // 2. Linguistic certainty markers (research-backed indicators)
503        let certainty_markers = [
504            ("definitely", 0.15),
505            ("certainly", 0.15),
506            ("absolutely", 0.15),
507            ("always", 0.12),
508            ("never", 0.12),
509            ("completely", 0.12),
510            ("strongly", 0.1),
511            ("really", 0.08),
512            ("very", 0.06),
513            ("quite", 0.04),
514            ("somewhat", -0.05),
515            ("maybe", -0.1),
516            ("perhaps", -0.08),
517            ("possibly", -0.08),
518            ("might", -0.06),
519        ];
520
521        for (marker, boost) in &certainty_markers {
522            if content.to_lowercase().contains(marker) {
523                confidence += boost;
524                break; // Only apply the first marker found
525            }
526        }
527
528        // 3. Personal agency indicators (research shows first-person statements more reliable)
529        let personal_indicators = content.matches('I').count() as f64;
530        let my_indicators = content.to_lowercase().matches("my ").count() as f64;
531        let me_indicators = content.to_lowercase().matches("me ").count() as f64;
532
533        let personal_score =
534            (personal_indicators * 0.03 + my_indicators * 0.04 + me_indicators * 0.02).min(0.15);
535        confidence += personal_score;
536
537        // 4. Content length and informativeness (optimal range based on memory research)
538        let length_score = match content.len() {
539            0..=10 => -0.3,   // Too short, likely incomplete
540            11..=30 => -0.1,  // Short but potentially valid
541            31..=80 => 0.1,   // Good length for memory patterns
542            81..=150 => 0.15, // Optimal range for detailed patterns
543            151..=250 => 0.1, // Still good, getting longer
544            251..=400 => 0.0, // Neutral, might be too verbose
545            _ => -0.15,       // Too long, likely contains noise
546        };
547        confidence += length_score;
548
549        // 5. Context relevance (simple heuristic)
550        if !context.is_empty() && content.len() > context.len() / 10 {
551            confidence += 0.05; // Bonus for substantial content relative to context
552        }
553
554        // 6. Sentence structure quality (basic grammar indicators)
555        let word_count = content.split_whitespace().count() as f64;
556        let sentence_count = content.matches(['.', '!', '?']).count() as f64;
557
558        if word_count > 0.0 {
559            let avg_sentence_length = word_count / sentence_count.max(1.0);
560            // Optimal sentence length for memory patterns is 8-20 words
561            let structure_score = match avg_sentence_length as usize {
562                1..=3 => -0.05,  // Too terse
563                4..=7 => 0.0,    // Short but acceptable
564                8..=20 => 0.08,  // Optimal range
565                21..=35 => 0.02, // Getting long
566                _ => -0.05,      // Too complex
567            };
568            confidence += structure_score;
569        }
570
571        // 7. Redundancy penalty (repeated words suggest lower quality)
572        let lowercase_content = content.to_lowercase();
573        let words: Vec<&str> = lowercase_content.split_whitespace().collect();
574        if words.len() > 5 {
575            let unique_words: std::collections::HashSet<_> = words.iter().collect();
576            let uniqueness_ratio = unique_words.len() as f64 / words.len() as f64;
577
578            // Penalize low uniqueness (high repetition)
579            if uniqueness_ratio < 0.7 {
580                confidence -= 0.1;
581            } else if uniqueness_ratio > 0.9 {
582                confidence += 0.05; // Bonus for diverse vocabulary
583            }
584        }
585
586        // Ensure confidence is within valid range and apply final calibration
587        confidence.clamp(0.1, 0.95) // Never completely certain or uncertain
588    }
589}
590
/// State of a [`CircuitBreaker`].
#[derive(Debug, Clone, PartialEq, Eq)]
enum CircuitBreakerState {
    /// Calls are passed through.
    Closed,
    /// Calls are rejected until the breaker's timeout has elapsed.
    Open,
    /// A limited number of calls are let through to probe the service.
    HalfOpen,
}
598
599/// Circuit breaker for embedding service
600#[derive(Debug)]
601struct CircuitBreaker {
602    state: Arc<RwLock<CircuitBreakerState>>,
603    failure_count: Arc<AtomicU64>,
604    last_failure_time: Arc<RwLock<Option<Instant>>>,
605    failure_threshold: u64,
606    timeout: Duration,
607    half_open_max_calls: u64,
608    half_open_calls: Arc<AtomicU64>,
609}
610
611impl CircuitBreaker {
612    fn new(failure_threshold: u64, timeout: Duration) -> Self {
613        Self {
614            state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
615            failure_count: Arc::new(AtomicU64::new(0)),
616            last_failure_time: Arc::new(RwLock::new(None)),
617            failure_threshold,
618            timeout,
619            half_open_max_calls: 3,
620            half_open_calls: Arc::new(AtomicU64::new(0)),
621        }
622    }
623
624    async fn call<T, F, Fut>(&self, f: F) -> Result<T>
625    where
626        F: FnOnce() -> Fut,
627        Fut: std::future::Future<Output = Result<T>>,
628    {
629        match *self.state.read().await {
630            CircuitBreakerState::Open => {
631                let last_failure = *self.last_failure_time.read().await;
632                if let Some(failure_time) = last_failure {
633                    if failure_time.elapsed() > self.timeout {
634                        *self.state.write().await = CircuitBreakerState::HalfOpen;
635                        self.half_open_calls.store(0, Ordering::Relaxed);
636                    } else {
637                        return Err(HarvesterError::CircuitBreakerOpen(
638                            "Embedding service circuit breaker is open".to_string(),
639                        )
640                        .into());
641                    }
642                } else {
643                    return Err(HarvesterError::CircuitBreakerOpen(
644                        "Embedding service circuit breaker is open".to_string(),
645                    )
646                    .into());
647                }
648            }
649            CircuitBreakerState::HalfOpen => {
650                if self.half_open_calls.load(Ordering::Relaxed) >= self.half_open_max_calls {
651                    return Err(HarvesterError::CircuitBreakerOpen(
652                        "Half-open circuit breaker call limit exceeded".to_string(),
653                    )
654                    .into());
655                }
656                self.half_open_calls.fetch_add(1, Ordering::Relaxed);
657            }
658            CircuitBreakerState::Closed => {}
659        }
660
661        match f().await {
662            Ok(result) => {
663                self.on_success().await;
664                Ok(result)
665            }
666            Err(e) => {
667                self.on_failure().await;
668                Err(e)
669            }
670        }
671    }
672
673    async fn on_success(&self) {
674        let current_state = self.state.read().await.clone();
675        match current_state {
676            CircuitBreakerState::HalfOpen => {
677                *self.state.write().await = CircuitBreakerState::Closed;
678                self.failure_count.store(0, Ordering::Relaxed);
679            }
680            _ => {
681                self.failure_count.store(0, Ordering::Relaxed);
682            }
683        }
684    }
685
686    async fn on_failure(&self) {
687        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
688
689        if failures >= self.failure_threshold {
690            *self.state.write().await = CircuitBreakerState::Open;
691            *self.last_failure_time.write().await = Some(Instant::now());
692            warn!("Circuit breaker opened after {} failures", failures);
693        }
694    }
695}
696
697/// Service for deduplicating extracted patterns with bounded cache and circuit breaker
698pub struct DeduplicationService {
699    threshold: f64,
700    embedding_service: Arc<dyn EmbeddingService>,
701    recent_embeddings: Arc<RwLock<VecDeque<(String, Vec<f32>, Instant)>>>,
702    max_cache_size: usize,
703    cache_cleanup_threshold: f64,
704    circuit_breaker: CircuitBreaker,
705    bypass_on_failure: Arc<AtomicBool>,
706    cache_ttl: Duration,
707}
708
709impl DeduplicationService {
710    pub fn new(
711        threshold: f64,
712        embedding_service: Arc<dyn EmbeddingService>,
713        max_cache_size: usize,
714    ) -> Self {
715        Self {
716            threshold,
717            embedding_service,
718            recent_embeddings: Arc::new(RwLock::new(VecDeque::new())),
719            max_cache_size,
720            cache_cleanup_threshold: 0.8, // Start cleanup at 80% capacity
721            circuit_breaker: CircuitBreaker::new(5, Duration::from_secs(60)), // 5 failures, 60s timeout
722            bypass_on_failure: Arc::new(AtomicBool::new(false)),
723            cache_ttl: Duration::from_secs(3600), // 1 hour TTL for cache entries
724        }
725    }
726
727    /// Check if a pattern is a duplicate of recent patterns
728    pub async fn is_duplicate(&self, pattern: &ExtractedMemoryPattern) -> Result<bool> {
729        // First clean up expired entries to prevent unbounded growth
730        self.cleanup_expired_entries().await;
731
732        // Check if we should bypass deduplication due to repeated failures
733        if self.bypass_on_failure.load(Ordering::Relaxed) {
734            warn!("Bypassing deduplication due to embedding service failures");
735            return Ok(false);
736        }
737
738        // Generate embedding with circuit breaker protection
739        let embedding = match self
740            .circuit_breaker
741            .call(|| async {
742                self.embedding_service
743                    .generate_embedding(&pattern.content)
744                    .await
745                    .context("Failed to generate embedding for deduplication")
746            })
747            .await
748        {
749            Ok(embedding) => {
750                // Reset bypass flag on successful embedding generation
751                self.bypass_on_failure.store(false, Ordering::Relaxed);
752                embedding
753            }
754            Err(e) => {
755                // Set bypass flag to prevent further deduplication failures
756                self.bypass_on_failure.store(true, Ordering::Relaxed);
757                warn!(
758                    "Embedding generation failed, bypassing deduplication: {}",
759                    e
760                );
761                return Ok(false); // Don't treat as duplicate when we can't check
762            }
763        };
764
765        let now = Instant::now();
766        let recent_embeddings = self.recent_embeddings.read().await;
767
768        // Check for duplicates among valid (non-expired) entries
769        for (_, cached_embedding, timestamp) in recent_embeddings.iter() {
770            if now.duration_since(*timestamp) <= self.cache_ttl {
771                let similarity = self.cosine_similarity(&embedding, cached_embedding);
772                if similarity >= self.threshold {
773                    trace!(
774                        "Duplicate pattern detected with similarity {:.3}: '{}'",
775                        similarity,
776                        pattern.content.chars().take(50).collect::<String>()
777                    );
778                    return Ok(true);
779                }
780            }
781        }
782
783        drop(recent_embeddings);
784
785        // Add to cache with timestamp
786        let mut cache = self.recent_embeddings.write().await;
787        cache.push_back((pattern.content.clone(), embedding, now));
788
789        // Maintain cache size with aggressive cleanup when approaching limit
790        let current_size = cache.len();
791        let cleanup_threshold_size =
792            (self.max_cache_size as f64 * self.cache_cleanup_threshold) as usize;
793
794        if current_size >= cleanup_threshold_size {
795            self.aggressive_cache_cleanup(&mut cache, now).await;
796        }
797
798        // Final size enforcement - remove oldest entries if still over limit
799        while cache.len() > self.max_cache_size {
800            cache.pop_front();
801        }
802
803        Ok(false)
804    }
805
806    /// Clean up expired cache entries
807    async fn cleanup_expired_entries(&self) {
808        let mut cache = self.recent_embeddings.write().await;
809        let now = Instant::now();
810        let initial_size = cache.len();
811
812        // Remove expired entries from the front (oldest entries)
813        while let Some((_, _, timestamp)) = cache.front() {
814            if now.duration_since(*timestamp) > self.cache_ttl {
815                cache.pop_front();
816            } else {
817                break; // Since entries are ordered by time, we can stop here
818            }
819        }
820
821        let cleaned_count = initial_size - cache.len();
822        if cleaned_count > 0 {
823            trace!("Cleaned up {} expired cache entries", cleaned_count);
824        }
825    }
826
827    /// Aggressive cache cleanup when approaching size limit
828    async fn aggressive_cache_cleanup(
829        &self,
830        cache: &mut VecDeque<(String, Vec<f32>, Instant)>,
831        _now: Instant,
832    ) {
833        let initial_size = cache.len();
834
835        // Remove oldest 25% of entries to create breathing room
836        let removal_count = cache.len() / 4;
837        for _ in 0..removal_count {
838            cache.pop_front();
839        }
840
841        let removed_count = initial_size - cache.len();
842        if removed_count > 0 {
843            debug!("Aggressive cache cleanup removed {} entries", removed_count);
844        }
845    }
846
847    fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f64 {
848        if a.len() != b.len() {
849            return 0.0;
850        }
851
852        let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
853        let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
854        let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
855
856        if norm_a == 0.0 || norm_b == 0.0 {
857            return 0.0;
858        }
859
860        (dot_product / (norm_a * norm_b)) as f64
861    }
862}
863
/// A single conversation message awaiting pattern extraction.
///
/// Messages are buffered in the harvester's queue and processed in batches.
#[derive(Debug, Clone)]
pub struct ConversationMessage {
    /// Identifier copied onto every pattern extracted from this message
    /// (as the pattern's `source_message_id`).
    pub id: String,
    /// Message text handed to the pattern matcher.
    pub content: String,
    /// Message timestamp.
    /// NOTE(review): not read by the processing code visible in this section.
    pub timestamp: DateTime<Utc>,
    pub role: String, // "user" or "assistant"
    /// Context string handed to the pattern matcher together with `content`.
    pub context: String,
}
873
874/// Bounded message queue with backpressure
875struct BoundedMessageQueue {
876    queue: VecDeque<ConversationMessage>,
877    max_size: usize,
878    max_memory_mb: usize,
879    current_memory_bytes: usize,
880}
881
882impl BoundedMessageQueue {
883    fn new(max_size: usize, max_memory_mb: usize) -> Self {
884        Self {
885            queue: VecDeque::new(),
886            max_size,
887            max_memory_mb,
888            current_memory_bytes: 0,
889        }
890    }
891
892    fn try_push(&mut self, message: ConversationMessage) -> Result<()> {
893        let message_size = self.estimate_message_size(&message);
894        let new_memory_bytes = self.current_memory_bytes + message_size;
895        let max_memory_bytes = self.max_memory_mb * 1024 * 1024;
896
897        // Check size and memory limits
898        if self.queue.len() >= self.max_size {
899            return Err(HarvesterError::BackpressureApplied(format!(
900                "Message queue size limit exceeded: {}",
901                self.max_size
902            ))
903            .into());
904        }
905
906        if new_memory_bytes > max_memory_bytes {
907            return Err(HarvesterError::BackpressureApplied(format!(
908                "Message queue memory limit exceeded: {} MB",
909                self.max_memory_mb
910            ))
911            .into());
912        }
913
914        self.current_memory_bytes = new_memory_bytes;
915        self.queue.push_back(message);
916        Ok(())
917    }
918
919    fn drain_all(&mut self) -> Vec<ConversationMessage> {
920        self.current_memory_bytes = 0;
921        self.queue.drain(..).collect()
922    }
923
924    fn len(&self) -> usize {
925        self.queue.len()
926    }
927
928    fn estimate_message_size(&self, message: &ConversationMessage) -> usize {
929        // Rough estimate: ID + content + context + timestamp + metadata
930        message.id.len() + message.content.len() + message.context.len() + message.role.len() + 100
931    }
932}
933
/// Core harvesting engine
///
/// Buffers incoming messages, extracts memory patterns from them in batches,
/// filters low-confidence and duplicate patterns, and stores the remainder
/// through the memory repository.
pub struct HarvestingEngine {
    /// Harvester settings (trigger counts and intervals, thresholds, timeouts,
    /// retry policy, logging mode).
    config: SilentHarvesterConfig,
    /// Extracts candidate patterns from message content and context.
    pattern_matcher: PatternMatcher,
    deduplication_service: Arc<DeduplicationService>, // Shared across all tasks
    /// Persists accepted patterns as memories.
    repository: Arc<MemoryRepository>,
    /// Scores pattern content; the stored importance is the larger of this
    /// score and the extraction confidence.
    importance_pipeline: Arc<ImportanceAssessmentPipeline>,
    /// Counters and timings recorded during extraction, storage and batching.
    metrics: Arc<HarvesterMetrics>,
    /// Pending messages, bounded by count and estimated memory.
    message_queue: Arc<Mutex<BoundedMessageQueue>>,
    /// `None` until a batch has completed; the time-based trigger treats
    /// `None` as "due".
    last_harvest_time: Arc<Mutex<Option<Instant>>>,
    processing_semaphore: Arc<Semaphore>, // Limit concurrent processing
}
946
947impl HarvestingEngine {
948    pub fn new(
949        config: SilentHarvesterConfig,
950        repository: Arc<MemoryRepository>,
951        importance_pipeline: Arc<ImportanceAssessmentPipeline>,
952        embedding_service: Arc<dyn EmbeddingService>,
953        metrics: Arc<HarvesterMetrics>,
954    ) -> Result<Self> {
955        let pattern_matcher = PatternMatcher::new(&config.pattern_config)?;
956        let deduplication_service = Arc::new(DeduplicationService::new(
957            config.deduplication_threshold,
958            embedding_service,
959            1000, // Cache size
960        ));
961
962        // Bounded message queue with size and memory limits
963        let message_queue = BoundedMessageQueue::new(
964            config.max_batch_size * 5, // 5x batch size for queuing
965            50,                        // 50MB memory limit
966        );
967
968        Ok(Self {
969            config,
970            pattern_matcher,
971            deduplication_service,
972            repository,
973            importance_pipeline,
974            metrics,
975            message_queue: Arc::new(Mutex::new(message_queue)),
976            last_harvest_time: Arc::new(Mutex::new(None)),
977            processing_semaphore: Arc::new(Semaphore::new(2)), // Allow max 2 concurrent processing tasks
978        })
979    }
980
981    /// Add a message to the processing queue with backpressure
982    pub async fn queue_message(&self, message: ConversationMessage) -> Result<()> {
983        let mut queue = self.message_queue.lock().await;
984
985        // Try to add message with backpressure handling
986        match queue.try_push(message) {
987            Ok(()) => {}
988            Err(e) => {
989                // Apply backpressure by forcing immediate processing
990                warn!("Queue limit reached, forcing immediate processing: {}", e);
991                let messages = queue.drain_all();
992                drop(queue);
993
994                // Process immediately with higher priority (synchronously to avoid lifetime issues)
995                if !messages.is_empty() {
996                    if let Ok(_permit) = self.processing_semaphore.try_acquire() {
997                        self.process_message_batch(messages)
998                            .await
999                            .unwrap_or_else(|e| {
1000                                error!("Forced harvest processing failed: {}", e);
1001                            });
1002                    }
1003                }
1004                return Err(e);
1005            }
1006        }
1007
1008        // Check if we should trigger processing
1009        let should_process =
1010            queue.len() >= self.config.message_trigger_count || self.should_trigger_by_time().await;
1011
1012        if should_process {
1013            // Get messages and clear queue
1014            let messages = queue.drain_all();
1015            drop(queue);
1016
1017            // Process in background with semaphore protection
1018            if !messages.is_empty() {
1019                match self.processing_semaphore.clone().try_acquire_owned() {
1020                    Ok(permit) => {
1021                        // Clone needed data before spawn
1022                        let config = self.config.clone();
1023                        let dedup_service = self.deduplication_service.clone();
1024                        let repository = self.repository.clone();
1025                        let importance_pipeline = self.importance_pipeline.clone();
1026                        let metrics = self.metrics.clone();
1027                        let last_harvest_time = self.last_harvest_time.clone();
1028                        let pattern_config = self.config.pattern_config.clone();
1029
1030                        tokio::spawn(async move {
1031                            let pattern_matcher = match PatternMatcher::new(&pattern_config) {
1032                                Ok(pm) => pm,
1033                                Err(e) => {
1034                                    error!("Failed to create pattern matcher: {}", e);
1035                                    return;
1036                                }
1037                            };
1038
1039                            let engine_handle = HarvestingEngineHandle {
1040                                config,
1041                                pattern_matcher,
1042                                deduplication_service: dedup_service,
1043                                repository,
1044                                importance_pipeline,
1045                                metrics,
1046                                last_harvest_time,
1047                            };
1048
1049                            let _permit = permit; // Keep permit alive
1050                            if let Err(e) = engine_handle.process_message_batch(messages).await {
1051                                error!("Background harvest processing failed: {}", e);
1052                            }
1053                        });
1054                    }
1055                    Err(_) => {
1056                        warn!("Processing semaphore exhausted, skipping background processing");
1057                    }
1058                }
1059            }
1060        }
1061
1062        Ok(())
1063    }
1064
1065    /// Check if we should trigger processing based on time
1066    async fn should_trigger_by_time(&self) -> bool {
1067        let last_harvest = self.last_harvest_time.lock().await;
1068        match *last_harvest {
1069            Some(last_time) => {
1070                let elapsed = last_time.elapsed();
1071                elapsed >= Duration::from_secs(self.config.time_trigger_minutes * 60)
1072            }
1073            None => true, // First run
1074        }
1075    }
1076
1077    /// Process a batch of messages
1078    pub async fn process_message_batch(&self, messages: Vec<ConversationMessage>) -> Result<()> {
1079        if messages.is_empty() {
1080            return Ok(());
1081        }
1082
1083        let start_time = Instant::now();
1084        debug!("Processing batch of {} messages", messages.len());
1085
1086        // Set processing timeout
1087        let processing_future = self.process_messages_internal(messages.clone());
1088        let timeout_duration = Duration::from_secs(self.config.max_processing_time_seconds);
1089
1090        match timeout(timeout_duration, processing_future).await {
1091            Ok(result) => {
1092                let processing_time = start_time.elapsed();
1093                self.metrics
1094                    .record_batch_processing(messages.len(), processing_time.as_millis() as u64);
1095
1096                *self.last_harvest_time.lock().await = Some(Instant::now());
1097
1098                result
1099            }
1100            Err(_) => {
1101                warn!(
1102                    "Message batch processing timed out after {:?}",
1103                    timeout_duration
1104                );
1105                Err(HarvesterError::BatchProcessingFailed(
1106                    "Processing timeout exceeded".to_string(),
1107                )
1108                .into())
1109            }
1110        }
1111    }
1112
    /// Runs the harvest pipeline for one batch: extract patterns, drop those
    /// below `confidence_threshold`, filter duplicates, then persist the rest
    /// as memories.
    ///
    /// Returns `Ok(())` early when nothing is extracted or nothing meets the
    /// confidence threshold.
    ///
    /// # Errors
    ///
    /// Returns the batch-storage error when storage fails and
    /// `graceful_degradation` is disabled; with it enabled the method falls
    /// back to per-pattern storage instead.
    async fn process_messages_internal(&self, messages: Vec<ConversationMessage>) -> Result<()> {
        let extraction_start = Instant::now();

        // Build one extraction future per message. The async blocks below
        // contain no `.await`, so each one finishes the first time it is polled.
        let pattern_futures: Vec<_> = messages
            .iter()
            .map(|message| {
                let pattern_matcher = &self.pattern_matcher;
                let metrics = &self.metrics;
                async move {
                    let patterns =
                        pattern_matcher.extract_patterns(&message.content, &message.context);

                    let mut message_patterns = Vec::new();
                    for mut pattern in patterns {
                        // Tag each pattern with the message it came from.
                        pattern.source_message_id = Some(message.id.clone());
                        metrics.record_pattern_confidence(pattern.confidence);
                        message_patterns.push(pattern);
                    }
                    message_patterns
                }
            })
            .collect();

        // Drive all extraction futures on the current task and flatten the
        // per-message results into a single list.
        let pattern_results = future::join_all(pattern_futures).await;
        let mut all_patterns = Vec::new();
        for patterns in pattern_results {
            all_patterns.extend(patterns);
        }

        // Extraction metrics are recorded even when no patterns were found.
        let extraction_time = extraction_start.elapsed();
        self.metrics
            .record_harvest(
                all_patterns.len() as u64,
                extraction_time.as_millis() as u64,
            )
            .await;

        if all_patterns.is_empty() {
            debug!("No patterns extracted from message batch");
            return Ok(());
        }

        debug!(
            "Extracted {} patterns from {} messages",
            all_patterns.len(),
            messages.len()
        );

        // Filter patterns by confidence threshold
        let high_confidence_patterns: Vec<ExtractedMemoryPattern> = all_patterns
            .into_iter()
            .filter(|p| p.confidence >= self.config.confidence_threshold)
            .collect();

        if high_confidence_patterns.is_empty() {
            debug!(
                "No patterns met confidence threshold of {}",
                self.config.confidence_threshold
            );
            return Ok(());
        }

        // Deduplicate patterns in chunks; the checks within one chunk are
        // awaited together via `join_all`, chunks run one after another.
        let dedup_batch_size = 10; // Process 10 patterns at a time
        let mut unique_patterns = Vec::new();
        let mut duplicate_count = 0;

        for batch in high_confidence_patterns.chunks(dedup_batch_size) {
            let dedup_futures: Vec<_> = batch
                .iter()
                .map(|pattern| {
                    let dedup_service = &self.deduplication_service;
                    async move {
                        match dedup_service.is_duplicate(pattern).await {
                            Ok(is_duplicate) => (pattern, is_duplicate, None),
                            Err(e) => {
                                warn!("Deduplication check failed for pattern: {}", e);
                                (pattern, false, Some(e)) // Treat as unique to avoid data loss
                            }
                        }
                    }
                })
                .collect();

            let dedup_results = future::join_all(dedup_futures).await;

            for (pattern, is_duplicate, error) in dedup_results {
                if is_duplicate {
                    duplicate_count += 1;
                } else {
                    // `pattern` borrows from the chunk, so unique ones are cloned out.
                    unique_patterns.push(pattern.clone());
                    if error.is_some() {
                        // Log deduplication failures but continue processing
                        warn!("Pattern included despite deduplication failure");
                    }
                }
            }
        }

        debug!(
            "After deduplication: {} unique patterns, {} duplicates",
            unique_patterns.len(),
            duplicate_count
        );

        // Store unique patterns as memories using batch operations with error handling.
        // The list is cloned so the original remains available for the fallback path.
        // NOTE(review): `store_patterns_as_memories_batch` as written in this file
        // only ever returns `Ok`, so the `Err` arm below looks unreachable — confirm.
        let stored_count = match self
            .store_patterns_as_memories_batch(unique_patterns.clone())
            .await
        {
            Ok(count) => count,
            Err(e) => {
                error!("Batch storage failed: {}", e);
                if self.config.graceful_degradation {
                    warn!("Falling back to individual pattern storage");
                    self.fallback_individual_storage(unique_patterns).await
                } else {
                    return Err(e);
                }
            }
        };

        self.metrics
            .record_storage(stored_count, duplicate_count)
            .await;

        if self.config.silent_mode {
            // Silent operation - only log at debug level
            debug!(
                "Silent harvest completed: {} patterns stored, {} duplicates filtered",
                stored_count, duplicate_count
            );
        } else {
            info!(
                "Memory harvest completed: {} patterns stored, {} duplicates filtered",
                stored_count, duplicate_count
            );
        }

        Ok(())
    }
1256
1257    async fn store_pattern_as_memory(&self, pattern: ExtractedMemoryPattern) -> Result<Memory> {
1258        // Create metadata for the memory
1259        let mut metadata = pattern.metadata.clone();
1260        metadata.insert(
1261            "pattern_type".to_string(),
1262            serde_json::to_value(&pattern.pattern_type)?,
1263        );
1264        metadata.insert(
1265            "extraction_confidence".to_string(),
1266            serde_json::Value::Number(
1267                serde_json::Number::from_f64(pattern.confidence)
1268                    .unwrap_or_else(|| serde_json::Number::from(0)),
1269            ),
1270        );
1271        metadata.insert(
1272            "extracted_at".to_string(),
1273            serde_json::Value::String(pattern.extracted_at.to_rfc3339()),
1274        );
1275        if let Some(ref source_id) = pattern.source_message_id {
1276            metadata.insert(
1277                "source_message_id".to_string(),
1278                serde_json::Value::String(source_id.clone()),
1279            );
1280        }
1281        metadata.insert(
1282            "context".to_string(),
1283            serde_json::Value::String(pattern.context.clone()),
1284        );
1285        metadata.insert(
1286            "harvester_version".to_string(),
1287            serde_json::Value::String("1.0".to_string()),
1288        );
1289
1290        // Use importance assessment to determine final confidence
1291        let assessment_result = self
1292            .importance_pipeline
1293            .assess_importance(&pattern.content)
1294            .await
1295            .map_err(|e| HarvesterError::ImportanceAssessmentFailed(e.to_string()))?;
1296
1297        let final_importance = assessment_result.importance_score.max(pattern.confidence);
1298
1299        // Create memory request
1300        let create_request = crate::memory::models::CreateMemoryRequest {
1301            content: pattern.content,
1302            embedding: None,                 // Will be generated by repository
1303            tier: Some(MemoryTier::Working), // Start in working memory
1304            importance_score: Some(final_importance),
1305            metadata: Some(serde_json::Value::Object(metadata.into_iter().collect())),
1306            parent_id: None,
1307            expires_at: None,
1308        };
1309
1310        // Store the memory
1311        self.repository
1312            .create_memory(create_request)
1313            .await
1314            .map_err(HarvesterError::RepositoryFailed)
1315            .map_err(Into::into)
1316    }
1317
1318    /// Get current metrics summary
1319    pub async fn get_metrics_summary(&self) -> HarvesterMetricsSummary {
1320        HarvesterMetricsSummary {
1321            messages_processed: self.metrics.messages_processed.load(Ordering::Relaxed),
1322            patterns_extracted: self.metrics.patterns_extracted.load(Ordering::Relaxed),
1323            memories_stored: self.metrics.memories_stored.load(Ordering::Relaxed),
1324            duplicates_filtered: self.metrics.duplicates_filtered.load(Ordering::Relaxed),
1325            avg_extraction_time_ms: self.metrics.extraction_time_ms.load(Ordering::Relaxed),
1326            avg_batch_processing_time_ms: self
1327                .metrics
1328                .batch_processing_time_ms
1329                .load(Ordering::Relaxed),
1330            last_harvest_time: *self.metrics.last_harvest_time.lock().await,
1331        }
1332    }
1333
1334    /// Force immediate harvest of queued messages
1335    pub async fn force_harvest(&self) -> Result<HarvestResult> {
1336        let messages = {
1337            let mut queue = self.message_queue.lock().await;
1338            queue.drain_all()
1339        };
1340
1341        if messages.is_empty() {
1342            return Ok(HarvestResult {
1343                messages_processed: 0,
1344                patterns_extracted: 0,
1345                patterns_stored: 0,
1346                duplicates_filtered: 0,
1347                processing_time_ms: 0,
1348            });
1349        }
1350
1351        let start_time = Instant::now();
1352        self.process_message_batch(messages.clone()).await?;
1353        let processing_time = start_time.elapsed();
1354
1355        Ok(HarvestResult {
1356            messages_processed: messages.len(),
1357            patterns_extracted: 0,  // Would need to track during processing
1358            patterns_stored: 0,     // Would need to track during processing
1359            duplicates_filtered: 0, // Would need to track during processing
1360            processing_time_ms: processing_time.as_millis() as u64,
1361        })
1362    }
1363
1364    /// Store multiple patterns as memories using batch operations for better performance
1365    async fn store_patterns_as_memories_batch(
1366        &self,
1367        patterns: Vec<ExtractedMemoryPattern>,
1368    ) -> Result<u64> {
1369        if patterns.is_empty() {
1370            return Ok(0);
1371        }
1372
1373        // Process in parallel batches to avoid overwhelming the database
1374        let batch_size = 20; // Store up to 20 patterns concurrently
1375        let mut stored_count = 0;
1376
1377        for batch in patterns.chunks(batch_size) {
1378            let storage_futures: Vec<_> = batch
1379                .iter()
1380                .map(|pattern| self.store_pattern_as_memory(pattern.clone()))
1381                .collect();
1382
1383            // Execute batch storage operations
1384            let results = future::join_all(storage_futures).await;
1385
1386            // Count successful storage operations
1387            for result in results {
1388                match result {
1389                    Ok(_) => stored_count += 1,
1390                    Err(e) => {
1391                        warn!("Failed to store pattern as memory in batch: {}", e);
1392                        // Continue with other patterns rather than failing entire batch
1393                    }
1394                }
1395            }
1396        }
1397
1398        Ok(stored_count)
1399    }
1400
1401    /// Fallback storage method for when batch operations fail
1402    async fn fallback_individual_storage(&self, patterns: Vec<ExtractedMemoryPattern>) -> u64 {
1403        let mut stored_count = 0;
1404        let mut consecutive_failures = 0;
1405        const MAX_CONSECUTIVE_FAILURES: u32 = 5;
1406
1407        let patterns_len = patterns.len();
1408        for pattern in patterns {
1409            // Implement retry logic with exponential backoff
1410            let mut retry_count = 0;
1411            let mut success = false;
1412
1413            while retry_count < self.config.max_retries && !success {
1414                match self.store_pattern_as_memory(pattern.clone()).await {
1415                    Ok(_) => {
1416                        stored_count += 1;
1417                        consecutive_failures = 0;
1418                        success = true;
1419                    }
1420                    Err(e) => {
1421                        retry_count += 1;
1422                        consecutive_failures += 1;
1423
1424                        warn!(
1425                            "Failed to store pattern (attempt {} of {}): {}",
1426                            retry_count, self.config.max_retries, e
1427                        );
1428
1429                        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
1430                            error!(
1431                                "Too many consecutive failures ({}), stopping fallback storage",
1432                                consecutive_failures
1433                            );
1434                            break;
1435                        }
1436
1437                        // Exponential backoff: 100ms, 200ms, 400ms
1438                        if retry_count < self.config.max_retries {
1439                            let delay = Duration::from_millis(100 * (1u64 << retry_count));
1440                            tokio::time::sleep(delay).await;
1441                        }
1442                    }
1443                }
1444            }
1445
1446            if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
1447                break;
1448            }
1449        }
1450
1451        if stored_count < patterns_len as u64 {
1452            warn!(
1453                "Fallback storage completed with partial success: {}/{} patterns stored",
1454                stored_count, patterns_len
1455            );
1456        }
1457
1458        stored_count
1459    }
1460}
1461
/// Shared handle for background processing (prevents race conditions)
///
/// Assembled inside the task spawned by `HarvestingEngine::queue_message`
/// from a clone of the engine's configuration, clones of its `Arc`-shared
/// services, and a pattern matcher built for that task.
struct HarvestingEngineHandle {
    /// Clone of the engine's configuration.
    config: SilentHarvesterConfig,
    /// Built per background task from the engine's pattern configuration.
    pattern_matcher: PatternMatcher,
    deduplication_service: Arc<DeduplicationService>, // Shared service prevents race conditions
    /// Same repository instance as the engine's.
    repository: Arc<MemoryRepository>,
    /// Same importance pipeline instance as the engine's.
    importance_pipeline: Arc<ImportanceAssessmentPipeline>,
    /// Same metrics instance as the engine's.
    metrics: Arc<HarvesterMetrics>,
    /// Same `Arc` as the engine's `last_harvest_time`.
    #[allow(dead_code)] // May be used for future optimizations
    last_harvest_time: Arc<Mutex<Option<Instant>>>,
}
1473
1474impl HarvestingEngineHandle {
1475    async fn process_message_batch(&self, messages: Vec<ConversationMessage>) -> Result<()> {
1476        // Simplified processing logic - reuse the main logic structure
1477        // This is essentially the same as the main engine's process_messages_internal
1478        let mut all_patterns = Vec::new();
1479        let extraction_start = Instant::now();
1480
1481        // Extract patterns
1482        for message in &messages {
1483            let patterns = self
1484                .pattern_matcher
1485                .extract_patterns(&message.content, &message.context);
1486
1487            for mut pattern in patterns {
1488                pattern.source_message_id = Some(message.id.clone());
1489                self.metrics.record_pattern_confidence(pattern.confidence);
1490                all_patterns.push(pattern);
1491            }
1492        }
1493
1494        let extraction_time = extraction_start.elapsed();
1495        self.metrics
1496            .record_harvest(
1497                all_patterns.len() as u64,
1498                extraction_time.as_millis() as u64,
1499            )
1500            .await;
1501
1502        // Filter by confidence
1503        let high_confidence_patterns: Vec<ExtractedMemoryPattern> = all_patterns
1504            .into_iter()
1505            .filter(|p| p.confidence >= self.config.confidence_threshold)
1506            .collect();
1507
1508        // Deduplicate and store
1509        let mut stored_count = 0;
1510        let mut duplicate_count = 0;
1511
1512        for pattern in high_confidence_patterns {
1513            match self.deduplication_service.is_duplicate(&pattern).await {
1514                Ok(is_duplicate) => {
1515                    if is_duplicate {
1516                        duplicate_count += 1;
1517                    } else {
1518                        match self.store_pattern_as_memory(pattern).await {
1519                            Ok(_) => stored_count += 1,
1520                            Err(e) => warn!("Failed to store pattern: {}", e),
1521                        }
1522                    }
1523                }
1524                Err(e) => {
1525                    warn!("Deduplication check failed: {}", e);
1526                }
1527            }
1528        }
1529
1530        self.metrics
1531            .record_storage(stored_count, duplicate_count)
1532            .await;
1533        debug!(
1534            "Background harvest: {} stored, {} duplicates",
1535            stored_count, duplicate_count
1536        );
1537
1538        Ok(())
1539    }
1540
1541    async fn store_pattern_as_memory(&self, pattern: ExtractedMemoryPattern) -> Result<Memory> {
1542        // Create metadata
1543        let mut metadata = pattern.metadata.clone();
1544        metadata.insert(
1545            "pattern_type".to_string(),
1546            serde_json::to_value(&pattern.pattern_type)?,
1547        );
1548        metadata.insert(
1549            "extraction_confidence".to_string(),
1550            serde_json::Value::Number(
1551                serde_json::Number::from_f64(pattern.confidence)
1552                    .unwrap_or_else(|| serde_json::Number::from(0)),
1553            ),
1554        );
1555
1556        // Use importance assessment
1557        let assessment_result = self
1558            .importance_pipeline
1559            .assess_importance(&pattern.content)
1560            .await
1561            .map_err(|e| HarvesterError::ImportanceAssessmentFailed(e.to_string()))?;
1562
1563        let final_importance = assessment_result.importance_score.max(pattern.confidence);
1564
1565        // Create and store memory
1566        let create_request = crate::memory::models::CreateMemoryRequest {
1567            content: pattern.content,
1568            embedding: None,
1569            tier: Some(MemoryTier::Working),
1570            importance_score: Some(final_importance),
1571            metadata: Some(serde_json::Value::Object(metadata.into_iter().collect())),
1572            parent_id: None,
1573            expires_at: None,
1574        };
1575
1576        self.repository
1577            .create_memory(create_request)
1578            .await
1579            .map_err(HarvesterError::RepositoryFailed)
1580            .map_err(Into::into)
1581    }
1582}
1583
/// Summary of harvester metrics
///
/// A point-in-time snapshot produced by `HarvestingEngine::get_metrics_summary`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HarvesterMetricsSummary {
    pub messages_processed: u64,
    pub patterns_extracted: u64,
    pub memories_stored: u64,
    pub duplicates_filtered: u64,
    /// Copied from the metrics' `extraction_time_ms` value.
    /// NOTE(review): whether that value is an average is not visible here — confirm.
    pub avg_extraction_time_ms: u64,
    /// Copied from the metrics' `batch_processing_time_ms` value.
    /// NOTE(review): whether that value is an average is not visible here — confirm.
    pub avg_batch_processing_time_ms: u64,
    /// `None` when the metrics hold no harvest timestamp.
    pub last_harvest_time: Option<DateTime<Utc>>,
}
1595
/// Result of a harvest operation
///
/// Returned by `HarvestingEngine::force_harvest`, which currently fills in
/// only `messages_processed` and `processing_time_ms`; the three pattern
/// counters are always reported as `0` by that method.
#[derive(Debug, Serialize, Deserialize)]
pub struct HarvestResult {
    /// Number of messages drained from the queue and processed.
    pub messages_processed: usize,
    pub patterns_extracted: usize,
    pub patterns_stored: usize,
    pub duplicates_filtered: usize,
    /// Wall-clock duration of the processing call, in milliseconds.
    pub processing_time_ms: u64,
}
1605
1606/// Background task manager for the silent harvester
1607pub struct SilentHarvesterService {
1608    engine: Arc<HarvestingEngine>,
1609    #[allow(dead_code)] // Stored for potential future use
1610    config: SilentHarvesterConfig,
1611    _shutdown_tx: tokio::sync::oneshot::Sender<()>,
1612}
1613
1614impl SilentHarvesterService {
1615    pub fn new(
1616        repository: Arc<MemoryRepository>,
1617        importance_pipeline: Arc<ImportanceAssessmentPipeline>,
1618        embedding_service: Arc<dyn EmbeddingService>,
1619        config: Option<SilentHarvesterConfig>,
1620        registry: &Registry,
1621    ) -> Result<Self> {
1622        let config = config.unwrap_or_default();
1623        let metrics = Arc::new(HarvesterMetrics::new(registry)?);
1624
1625        let engine = Arc::new(HarvestingEngine::new(
1626            config.clone(),
1627            repository,
1628            importance_pipeline,
1629            embedding_service,
1630            metrics,
1631        )?);
1632
1633        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1634
1635        // Start background task for time-based triggering
1636        let engine_clone = engine.clone();
1637        let time_trigger_minutes = config.time_trigger_minutes;
1638
1639        tokio::spawn(async move {
1640            let mut interval = interval(Duration::from_secs(time_trigger_minutes * 60));
1641            let mut shutdown_rx = shutdown_rx;
1642
1643            loop {
1644                tokio::select! {
1645                    _ = interval.tick() => {
1646                        if let Err(e) = engine_clone.force_harvest().await {
1647                            error!("Scheduled harvest failed: {}", e);
1648                        }
1649                    }
1650                    _ = &mut shutdown_rx => {
1651                        info!("Silent harvester service shutting down");
1652                        break;
1653                    }
1654                }
1655            }
1656        });
1657
1658        Ok(Self {
1659            engine,
1660            config,
1661            _shutdown_tx: shutdown_tx,
1662        })
1663    }
1664
1665    /// Add a message to the harvesting queue
1666    pub async fn add_message(&self, message: ConversationMessage) -> Result<()> {
1667        self.engine.queue_message(message).await
1668    }
1669
1670    /// Get the harvesting engine for direct access
1671    pub fn engine(&self) -> &Arc<HarvestingEngine> {
1672        &self.engine
1673    }
1674
1675    /// Force immediate harvest
1676    pub async fn force_harvest(&self) -> Result<HarvestResult> {
1677        self.engine.force_harvest().await
1678    }
1679
1680    /// Get metrics summary
1681    pub async fn get_metrics(&self) -> HarvesterMetricsSummary {
1682        self.engine.get_metrics_summary().await
1683    }
1684}