1use crate::embedding::EmbeddingService;
2use crate::memory::{ImportanceAssessmentPipeline, Memory, MemoryRepository, MemoryTier};
3use anyhow::{Context, Result};
4use chrono::{DateTime, Utc};
5use futures::future;
6use prometheus::{Counter, Histogram, Registry};
7use regex::Regex;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use thiserror::Error;
14use tokio::sync::{Mutex, RwLock, Semaphore};
15use tokio::time::{interval, timeout};
16use tracing::{debug, error, info, trace, warn};
17
/// Errors surfaced by the silent harvesting pipeline.
///
/// Most variants carry a human-readable description of the failing stage;
/// `RepositoryFailed` converts automatically from the memory crate's error.
#[derive(Debug, Error)]
pub enum HarvesterError {
    /// Regex-based pattern extraction from conversation text failed.
    #[error("Pattern extraction failed: {0}")]
    ExtractionFailed(String),

    /// Embedding-based duplicate detection failed.
    #[error("Deduplication failed: {0}")]
    DeduplicationFailed(String),

    /// A batch of queued messages could not be processed (e.g. timeout).
    #[error("Batch processing failed: {0}")]
    BatchProcessingFailed(String),

    /// Underlying memory repository operation failed (auto-converted).
    #[error("Repository operation failed: {0}")]
    RepositoryFailed(#[from] crate::memory::error::MemoryError),

    /// The importance assessment pipeline failed on a pattern.
    #[error("Importance assessment failed: {0}")]
    ImportanceAssessmentFailed(String),

    /// Invalid or inconsistent harvester configuration.
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// A spawned background task returned an error.
    #[error("Background task failed: {0}")]
    BackgroundTaskFailed(String),

    /// A protected call was rejected because its circuit breaker is open.
    #[error("Circuit breaker open: {0}")]
    CircuitBreakerOpen(String),

    /// A resource limit was hit — NOTE(review): not constructed in this
    /// file's visible code; confirm usage elsewhere.
    #[error("Resource exhaustion: {0}")]
    ResourceExhaustion(String),

    /// The bounded message queue refused new input (count or memory cap).
    #[error("Backpressure applied: {0}")]
    BackpressureApplied(String),
}
50
/// Categories of memory-worthy statements the harvester looks for.
///
/// Each variant maps to a regex set in [`PatternExtractionConfig`] and to a
/// base confidence boost inside `PatternMatcher::calculate_pattern_confidence`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MemoryPatternType {
    Preference,
    Fact,
    Decision,
    Correction,
    Emotion,
    Goal,
    Relationship,
    Skill,
}
63
/// A single candidate memory extracted from one conversation message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtractedMemoryPattern {
    /// Category of the match (preference, fact, ...).
    pub pattern_type: MemoryPatternType,
    /// The full sentence surrounding the regex match.
    pub content: String,
    /// Heuristic confidence, clamped to [0.1, 0.95] by the matcher.
    pub confidence: f64,
    /// When extraction happened (shared by all patterns of one call).
    pub extracted_at: DateTime<Utc>,
    /// Id of the originating message; filled in during batch processing.
    pub source_message_id: Option<String>,
    /// Conversation context the message was seen in.
    pub context: String,
    /// Provenance details: match offsets, matched text, etc.
    pub metadata: HashMap<String, serde_json::Value>,
}
75
/// Tunables for the silent memory harvester.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SilentHarvesterConfig {
    /// Minimum pattern confidence required for storage (default 0.7).
    pub confidence_threshold: f64,

    /// Cosine similarity at or above which a pattern is a duplicate (default 0.85).
    pub deduplication_threshold: f64,

    /// Queue length that triggers a harvest (default 10 messages).
    pub message_trigger_count: usize,

    /// Minutes between harvests before the time-based trigger fires (default 5).
    pub time_trigger_minutes: u64,

    /// Upper bound on messages handled per batch (default 50).
    pub max_batch_size: usize,

    /// Hard timeout for processing one batch, in seconds (default 2).
    pub max_processing_time_seconds: u64,

    /// When true, harvest completion is logged at debug instead of info level.
    pub silent_mode: bool,

    /// Regex pattern sets, one per memory category.
    pub pattern_config: PatternExtractionConfig,

    /// Fall back to per-pattern storage when batch storage fails.
    pub graceful_degradation: bool,

    /// Retry budget — NOTE(review): not consumed in this file's visible
    /// code; confirm where it is used.
    pub max_retries: u32,

    /// Secondary storage path toggle — NOTE(review): not consumed in this
    /// file's visible code; confirm where it is used.
    pub enable_fallback_storage: bool,
}
112
impl Default for SilentHarvesterConfig {
    /// Conservative defaults: harvest every 10 messages or 5 minutes,
    /// keep only patterns with confidence >= 0.7, and stay silent.
    fn default() -> Self {
        Self {
            confidence_threshold: 0.7,
            deduplication_threshold: 0.85,
            message_trigger_count: 10,
            time_trigger_minutes: 5,
            max_batch_size: 50,
            // Tight 2s budget keeps harvesting from stalling callers.
            max_processing_time_seconds: 2,
            silent_mode: true,
            pattern_config: PatternExtractionConfig::default(),
            graceful_degradation: true,
            max_retries: 3,
            enable_fallback_storage: true,
        }
    }
}
130
/// Regex source strings, one `Vec` per memory category, compiled by
/// [`PatternMatcher::new`]. All default patterns are case-insensitive (`(?i)`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PatternExtractionConfig {
    pub preference_patterns: Vec<String>,
    pub fact_patterns: Vec<String>,
    pub decision_patterns: Vec<String>,
    pub correction_patterns: Vec<String>,
    pub emotion_patterns: Vec<String>,
    pub goal_patterns: Vec<String>,
    pub relationship_patterns: Vec<String>,
    pub skill_patterns: Vec<String>,
}
143
impl Default for PatternExtractionConfig {
    /// Built-in English phrase patterns for each memory category.
    ///
    /// Each alternation matches a first-person cue phrase; the matcher later
    /// expands the hit to the surrounding sentence.
    fn default() -> Self {
        Self {
            // Likes/dislikes and habitual tendencies.
            preference_patterns: vec![
                r"(?i)I prefer|I like|I enjoy|I love|I hate|I dislike".to_string(),
                r"(?i)my favorite|I'd rather|I tend to|I usually".to_string(),
                r"(?i)I always|I never|I often|I rarely".to_string(),
            ],
            // Biographical and declarative statements.
            fact_patterns: vec![
                r"(?i)I am|I work|I live|I have|my name is".to_string(),
                r"(?i)I was born|I graduated|I studied|I learned".to_string(),
                r"(?i)the fact is|it's true that|I know that".to_string(),
            ],
            // Commitments and choices.
            decision_patterns: vec![
                r"(?i)I decided|I chose|I will|I'm going to".to_string(),
                r"(?i)I've decided|my decision|I'll go with".to_string(),
                r"(?i)I think we should|let's go with|I recommend".to_string(),
            ],
            // Self-corrections and clarifications.
            correction_patterns: vec![
                r"(?i)actually|correction|I meant|let me clarify".to_string(),
                r"(?i)that's wrong|that's incorrect|I misspoke".to_string(),
                r"(?i)sorry, I meant|to be clear|what I should have said".to_string(),
            ],
            // Emotional state expressions.
            emotion_patterns: vec![
                r"(?i)I feel|I'm excited|I'm worried|I'm happy".to_string(),
                r"(?i)I'm frustrated|I'm confused|I'm concerned".to_string(),
                r"(?i)this makes me|I'm feeling|emotionally".to_string(),
            ],
            // Aspirations and intentions.
            goal_patterns: vec![
                r"(?i)I want to|I hope to|my goal|I'm trying to".to_string(),
                r"(?i)I'm working toward|I aim to|I plan to".to_string(),
                r"(?i)I need to|I should|I must".to_string(),
            ],
            // People and social connections.
            relationship_patterns: vec![
                r"(?i)my friend|my colleague|my family|my partner".to_string(),
                r"(?i)I work with|I know someone|my relationship".to_string(),
                r"(?i)my team|my boss|my client".to_string(),
            ],
            // Abilities and expertise.
            skill_patterns: vec![
                r"(?i)I can|I'm good at|I know how to|I'm skilled".to_string(),
                r"(?i)I'm learning|I'm studying|I practice".to_string(),
                r"(?i)I'm experienced|I specialize|my expertise".to_string(),
            ],
        }
    }
}
190
/// Counters and histograms describing harvester activity.
///
/// The atomic fields give cheap in-process reads; the Prometheus
/// counterparts are created and registered once in [`HarvesterMetrics::new`].
#[derive(Debug)]
pub struct HarvesterMetrics {
    /// NOTE(review): not incremented in this file's visible code — confirm
    /// where message counting happens.
    pub messages_processed: Arc<AtomicU64>,
    pub patterns_extracted: Arc<AtomicU64>,
    pub memories_stored: Arc<AtomicU64>,
    pub duplicates_filtered: Arc<AtomicU64>,
    /// Cumulative extraction time, milliseconds.
    pub extraction_time_ms: Arc<AtomicU64>,
    /// Cumulative batch-processing time, milliseconds.
    pub batch_processing_time_ms: Arc<AtomicU64>,
    /// Wall-clock time of the most recent harvest, if any.
    pub last_harvest_time: Arc<Mutex<Option<DateTime<Utc>>>>,

    // Prometheus collectors (already registered with the registry).
    pub extraction_counter: Counter,
    pub storage_counter: Counter,
    pub deduplication_counter: Counter,
    pub processing_time_histogram: Histogram,
    pub batch_size_histogram: Histogram,
    pub confidence_histogram: Histogram,
}
210
impl HarvesterMetrics {
    /// Builds every collector and registers it with `registry`.
    ///
    /// # Errors
    /// Fails if a collector cannot be created or a metric with the same name
    /// is already registered. NOTE(review): collectors registered before a
    /// failure are not unregistered on the error path.
    pub fn new(registry: &Registry) -> Result<Self> {
        let extraction_counter = Counter::new(
            "harvester_patterns_extracted_total",
            "Total number of patterns extracted",
        )?;
        registry.register(Box::new(extraction_counter.clone()))?;

        let storage_counter = Counter::new(
            "harvester_memories_stored_total",
            "Total number of memories stored",
        )?;
        registry.register(Box::new(storage_counter.clone()))?;

        let deduplication_counter = Counter::new(
            "harvester_duplicates_filtered_total",
            "Total number of duplicates filtered out",
        )?;
        registry.register(Box::new(deduplication_counter.clone()))?;

        // Buckets are in seconds, spanning sub-millisecond to 5s.
        let processing_time_histogram = Histogram::with_opts(
            prometheus::HistogramOpts::new(
                "harvester_processing_duration_seconds",
                "Time spent processing messages",
            )
            .buckets(vec![0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0]),
        )?;
        registry.register(Box::new(processing_time_histogram.clone()))?;

        let batch_size_histogram = Histogram::with_opts(
            prometheus::HistogramOpts::new("harvester_batch_size", "Size of processing batches")
                .buckets(vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0]),
        )?;
        registry.register(Box::new(batch_size_histogram.clone()))?;

        // Confidence is bounded to [0.1, 0.95] by the matcher, so a uniform
        // 0.1-wide bucket layout covers the whole range.
        let confidence_histogram = Histogram::with_opts(
            prometheus::HistogramOpts::new(
                "harvester_pattern_confidence",
                "Confidence scores of extracted patterns",
            )
            .buckets(vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
        )?;
        registry.register(Box::new(confidence_histogram.clone()))?;

        Ok(Self {
            messages_processed: Arc::new(AtomicU64::new(0)),
            patterns_extracted: Arc::new(AtomicU64::new(0)),
            memories_stored: Arc::new(AtomicU64::new(0)),
            duplicates_filtered: Arc::new(AtomicU64::new(0)),
            extraction_time_ms: Arc::new(AtomicU64::new(0)),
            batch_processing_time_ms: Arc::new(AtomicU64::new(0)),
            last_harvest_time: Arc::new(Mutex::new(None)),
            extraction_counter,
            storage_counter,
            deduplication_counter,
            processing_time_histogram,
            batch_size_histogram,
            confidence_histogram,
        })
    }

    /// Records one extraction pass: pattern count, duration, and timestamp.
    pub async fn record_harvest(&self, patterns_count: u64, processing_time_ms: u64) {
        self.patterns_extracted
            .fetch_add(patterns_count, Ordering::Relaxed);
        self.extraction_time_ms
            .fetch_add(processing_time_ms, Ordering::Relaxed);
        *self.last_harvest_time.lock().await = Some(Utc::now());

        self.extraction_counter.inc_by(patterns_count as f64);
        // Histogram buckets are in seconds; convert from milliseconds.
        self.processing_time_histogram
            .observe(processing_time_ms as f64 / 1000.0);
    }

    /// Records storage results: memories persisted and duplicates dropped.
    pub async fn record_storage(&self, stored_count: u64, duplicates_count: u64) {
        self.memories_stored
            .fetch_add(stored_count, Ordering::Relaxed);
        self.duplicates_filtered
            .fetch_add(duplicates_count, Ordering::Relaxed);

        self.storage_counter.inc_by(stored_count as f64);
        self.deduplication_counter.inc_by(duplicates_count as f64);
    }

    /// Records one batch: cumulative time plus the batch-size distribution.
    pub fn record_batch_processing(&self, batch_size: usize, processing_time_ms: u64) {
        self.batch_processing_time_ms
            .fetch_add(processing_time_ms, Ordering::Relaxed);
        self.batch_size_histogram.observe(batch_size as f64);
    }

    /// Adds a single confidence sample to the distribution.
    pub fn record_pattern_confidence(&self, confidence: f64) {
        self.confidence_histogram.observe(confidence);
    }
}
304
/// Pre-compiled regex sets, one field per [`MemoryPatternType`] variant.
pub struct PatternMatcher {
    preference_regexes: Vec<Regex>,
    fact_regexes: Vec<Regex>,
    decision_regexes: Vec<Regex>,
    correction_regexes: Vec<Regex>,
    emotion_regexes: Vec<Regex>,
    goal_regexes: Vec<Regex>,
    relationship_regexes: Vec<Regex>,
    skill_regexes: Vec<Regex>,
}
316
317impl PatternMatcher {
318 pub fn new(config: &PatternExtractionConfig) -> Result<Self> {
319 let compile_patterns = |patterns: &[String]| -> Result<Vec<Regex>> {
320 patterns
321 .iter()
322 .map(|p| Regex::new(p).context("Failed to compile regex pattern"))
323 .collect()
324 };
325
326 Ok(Self {
327 preference_regexes: compile_patterns(&config.preference_patterns)?,
328 fact_regexes: compile_patterns(&config.fact_patterns)?,
329 decision_regexes: compile_patterns(&config.decision_patterns)?,
330 correction_regexes: compile_patterns(&config.correction_patterns)?,
331 emotion_regexes: compile_patterns(&config.emotion_patterns)?,
332 goal_regexes: compile_patterns(&config.goal_patterns)?,
333 relationship_regexes: compile_patterns(&config.relationship_patterns)?,
334 skill_regexes: compile_patterns(&config.skill_patterns)?,
335 })
336 }
337
338 pub fn extract_patterns(&self, message: &str, context: &str) -> Vec<ExtractedMemoryPattern> {
340 let mut patterns = Vec::new();
341 let extracted_at = Utc::now();
342
343 patterns.extend(self.extract_pattern_type(
345 message,
346 context,
347 MemoryPatternType::Preference,
348 &self.preference_regexes,
349 extracted_at,
350 ));
351
352 patterns.extend(self.extract_pattern_type(
353 message,
354 context,
355 MemoryPatternType::Fact,
356 &self.fact_regexes,
357 extracted_at,
358 ));
359
360 patterns.extend(self.extract_pattern_type(
361 message,
362 context,
363 MemoryPatternType::Decision,
364 &self.decision_regexes,
365 extracted_at,
366 ));
367
368 patterns.extend(self.extract_pattern_type(
369 message,
370 context,
371 MemoryPatternType::Correction,
372 &self.correction_regexes,
373 extracted_at,
374 ));
375
376 patterns.extend(self.extract_pattern_type(
377 message,
378 context,
379 MemoryPatternType::Emotion,
380 &self.emotion_regexes,
381 extracted_at,
382 ));
383
384 patterns.extend(self.extract_pattern_type(
385 message,
386 context,
387 MemoryPatternType::Goal,
388 &self.goal_regexes,
389 extracted_at,
390 ));
391
392 patterns.extend(self.extract_pattern_type(
393 message,
394 context,
395 MemoryPatternType::Relationship,
396 &self.relationship_regexes,
397 extracted_at,
398 ));
399
400 patterns.extend(self.extract_pattern_type(
401 message,
402 context,
403 MemoryPatternType::Skill,
404 &self.skill_regexes,
405 extracted_at,
406 ));
407
408 patterns
409 }
410
411 fn extract_pattern_type(
412 &self,
413 message: &str,
414 context: &str,
415 pattern_type: MemoryPatternType,
416 regexes: &[Regex],
417 extracted_at: DateTime<Utc>,
418 ) -> Vec<ExtractedMemoryPattern> {
419 let mut patterns = Vec::new();
420
421 for regex in regexes {
422 for mat in regex.find_iter(message) {
423 let content = self.extract_sentence_with_match(message, mat.start(), mat.end());
425
426 let confidence =
428 self.calculate_pattern_confidence(&pattern_type, &content, context);
429
430 let mut metadata = HashMap::new();
431 metadata.insert(
432 "match_start".to_string(),
433 serde_json::Value::Number(mat.start().into()),
434 );
435 metadata.insert(
436 "match_end".to_string(),
437 serde_json::Value::Number(mat.end().into()),
438 );
439 metadata.insert(
440 "matched_text".to_string(),
441 serde_json::Value::String(mat.as_str().to_string()),
442 );
443
444 patterns.push(ExtractedMemoryPattern {
445 pattern_type: pattern_type.clone(),
446 content,
447 confidence,
448 extracted_at,
449 source_message_id: None, context: context.to_string(),
451 metadata,
452 });
453 }
454 }
455
456 patterns
457 }
458
459 fn extract_sentence_with_match(&self, text: &str, start: usize, end: usize) -> String {
460 let before = &text[..start];
462 let after = &text[end..];
463
464 let sentence_start = before
466 .rfind(['.', '!', '?'])
467 .map(|pos| pos + 1)
468 .unwrap_or(0);
469
470 let sentence_end = after
472 .find(['.', '!', '?'])
473 .map(|pos| end + pos + 1)
474 .unwrap_or(text.len());
475
476 text[sentence_start..sentence_end].trim().to_string()
477 }
478
479 fn calculate_pattern_confidence(
480 &self,
481 pattern_type: &MemoryPatternType,
482 content: &str,
483 context: &str,
484 ) -> f64 {
485 let mut confidence: f64 = 0.3;
488
489 let type_boost = match pattern_type {
491 MemoryPatternType::Correction => 0.3, MemoryPatternType::Decision => 0.25, MemoryPatternType::Fact => 0.2, MemoryPatternType::Goal => 0.18, MemoryPatternType::Skill => 0.15, MemoryPatternType::Preference => 0.12, MemoryPatternType::Relationship => 0.1, MemoryPatternType::Emotion => 0.08, };
500 confidence += type_boost;
501
502 let certainty_markers = [
504 ("definitely", 0.15),
505 ("certainly", 0.15),
506 ("absolutely", 0.15),
507 ("always", 0.12),
508 ("never", 0.12),
509 ("completely", 0.12),
510 ("strongly", 0.1),
511 ("really", 0.08),
512 ("very", 0.06),
513 ("quite", 0.04),
514 ("somewhat", -0.05),
515 ("maybe", -0.1),
516 ("perhaps", -0.08),
517 ("possibly", -0.08),
518 ("might", -0.06),
519 ];
520
521 for (marker, boost) in &certainty_markers {
522 if content.to_lowercase().contains(marker) {
523 confidence += boost;
524 break; }
526 }
527
528 let personal_indicators = content.matches('I').count() as f64;
530 let my_indicators = content.to_lowercase().matches("my ").count() as f64;
531 let me_indicators = content.to_lowercase().matches("me ").count() as f64;
532
533 let personal_score =
534 (personal_indicators * 0.03 + my_indicators * 0.04 + me_indicators * 0.02).min(0.15);
535 confidence += personal_score;
536
537 let length_score = match content.len() {
539 0..=10 => -0.3, 11..=30 => -0.1, 31..=80 => 0.1, 81..=150 => 0.15, 151..=250 => 0.1, 251..=400 => 0.0, _ => -0.15, };
547 confidence += length_score;
548
549 if !context.is_empty() && content.len() > context.len() / 10 {
551 confidence += 0.05; }
553
554 let word_count = content.split_whitespace().count() as f64;
556 let sentence_count = content.matches(['.', '!', '?']).count() as f64;
557
558 if word_count > 0.0 {
559 let avg_sentence_length = word_count / sentence_count.max(1.0);
560 let structure_score = match avg_sentence_length as usize {
562 1..=3 => -0.05, 4..=7 => 0.0, 8..=20 => 0.08, 21..=35 => 0.02, _ => -0.05, };
568 confidence += structure_score;
569 }
570
571 let lowercase_content = content.to_lowercase();
573 let words: Vec<&str> = lowercase_content.split_whitespace().collect();
574 if words.len() > 5 {
575 let unique_words: std::collections::HashSet<_> = words.iter().collect();
576 let uniqueness_ratio = unique_words.len() as f64 / words.len() as f64;
577
578 if uniqueness_ratio < 0.7 {
580 confidence -= 0.1;
581 } else if uniqueness_ratio > 0.9 {
582 confidence += 0.05; }
584 }
585
586 confidence.clamp(0.1, 0.95) }
589}
590
/// Classic three-state circuit-breaker lifecycle.
#[derive(Debug, Clone, PartialEq)]
enum CircuitBreakerState {
    /// Calls flow normally; failures are counted.
    Closed,
    /// Calls are rejected until the cool-down timeout elapses.
    Open,
    /// A limited number of probe calls are allowed through.
    HalfOpen,
}
598
/// Failure-rate guard used around the embedding service.
///
/// Opens after `failure_threshold` consecutive failures; once `timeout` has
/// elapsed it permits up to `half_open_max_calls` probe calls.
#[derive(Debug)]
struct CircuitBreaker {
    state: Arc<RwLock<CircuitBreakerState>>,
    /// Consecutive failures since the last success.
    failure_count: Arc<AtomicU64>,
    last_failure_time: Arc<RwLock<Option<Instant>>>,
    failure_threshold: u64,
    /// Cool-down before an open breaker lets probe calls through.
    timeout: Duration,
    half_open_max_calls: u64,
    /// Probe calls made since entering the half-open state.
    half_open_calls: Arc<AtomicU64>,
}
610
611impl CircuitBreaker {
612 fn new(failure_threshold: u64, timeout: Duration) -> Self {
613 Self {
614 state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
615 failure_count: Arc::new(AtomicU64::new(0)),
616 last_failure_time: Arc::new(RwLock::new(None)),
617 failure_threshold,
618 timeout,
619 half_open_max_calls: 3,
620 half_open_calls: Arc::new(AtomicU64::new(0)),
621 }
622 }
623
624 async fn call<T, F, Fut>(&self, f: F) -> Result<T>
625 where
626 F: FnOnce() -> Fut,
627 Fut: std::future::Future<Output = Result<T>>,
628 {
629 match *self.state.read().await {
630 CircuitBreakerState::Open => {
631 let last_failure = *self.last_failure_time.read().await;
632 if let Some(failure_time) = last_failure {
633 if failure_time.elapsed() > self.timeout {
634 *self.state.write().await = CircuitBreakerState::HalfOpen;
635 self.half_open_calls.store(0, Ordering::Relaxed);
636 } else {
637 return Err(HarvesterError::CircuitBreakerOpen(
638 "Embedding service circuit breaker is open".to_string(),
639 )
640 .into());
641 }
642 } else {
643 return Err(HarvesterError::CircuitBreakerOpen(
644 "Embedding service circuit breaker is open".to_string(),
645 )
646 .into());
647 }
648 }
649 CircuitBreakerState::HalfOpen => {
650 if self.half_open_calls.load(Ordering::Relaxed) >= self.half_open_max_calls {
651 return Err(HarvesterError::CircuitBreakerOpen(
652 "Half-open circuit breaker call limit exceeded".to_string(),
653 )
654 .into());
655 }
656 self.half_open_calls.fetch_add(1, Ordering::Relaxed);
657 }
658 CircuitBreakerState::Closed => {}
659 }
660
661 match f().await {
662 Ok(result) => {
663 self.on_success().await;
664 Ok(result)
665 }
666 Err(e) => {
667 self.on_failure().await;
668 Err(e)
669 }
670 }
671 }
672
673 async fn on_success(&self) {
674 let current_state = self.state.read().await.clone();
675 match current_state {
676 CircuitBreakerState::HalfOpen => {
677 *self.state.write().await = CircuitBreakerState::Closed;
678 self.failure_count.store(0, Ordering::Relaxed);
679 }
680 _ => {
681 self.failure_count.store(0, Ordering::Relaxed);
682 }
683 }
684 }
685
686 async fn on_failure(&self) {
687 let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
688
689 if failures >= self.failure_threshold {
690 *self.state.write().await = CircuitBreakerState::Open;
691 *self.last_failure_time.write().await = Some(Instant::now());
692 warn!("Circuit breaker opened after {} failures", failures);
693 }
694 }
695}
696
/// Embedding-similarity duplicate filter with a TTL-bounded in-memory cache.
pub struct DeduplicationService {
    /// Cosine similarity at or above which a pattern counts as a duplicate.
    threshold: f64,
    embedding_service: Arc<dyn EmbeddingService>,
    /// Recently seen (content, embedding, inserted-at) entries, oldest first.
    recent_embeddings: Arc<RwLock<VecDeque<(String, Vec<f32>, Instant)>>>,
    max_cache_size: usize,
    /// Fraction of `max_cache_size` at which aggressive cleanup kicks in.
    cache_cleanup_threshold: f64,
    /// Guards calls into the embedding service.
    circuit_breaker: CircuitBreaker,
    /// Set when the most recent embedding attempt failed, cleared on success.
    bypass_on_failure: Arc<AtomicBool>,
    /// How long a cached embedding stays eligible for comparison.
    cache_ttl: Duration,
}
708
709impl DeduplicationService {
710 pub fn new(
711 threshold: f64,
712 embedding_service: Arc<dyn EmbeddingService>,
713 max_cache_size: usize,
714 ) -> Self {
715 Self {
716 threshold,
717 embedding_service,
718 recent_embeddings: Arc::new(RwLock::new(VecDeque::new())),
719 max_cache_size,
720 cache_cleanup_threshold: 0.8, circuit_breaker: CircuitBreaker::new(5, Duration::from_secs(60)), bypass_on_failure: Arc::new(AtomicBool::new(false)),
723 cache_ttl: Duration::from_secs(3600), }
725 }
726
727 pub async fn is_duplicate(&self, pattern: &ExtractedMemoryPattern) -> Result<bool> {
729 self.cleanup_expired_entries().await;
731
732 if self.bypass_on_failure.load(Ordering::Relaxed) {
734 warn!("Bypassing deduplication due to embedding service failures");
735 return Ok(false);
736 }
737
738 let embedding = match self
740 .circuit_breaker
741 .call(|| async {
742 self.embedding_service
743 .generate_embedding(&pattern.content)
744 .await
745 .context("Failed to generate embedding for deduplication")
746 })
747 .await
748 {
749 Ok(embedding) => {
750 self.bypass_on_failure.store(false, Ordering::Relaxed);
752 embedding
753 }
754 Err(e) => {
755 self.bypass_on_failure.store(true, Ordering::Relaxed);
757 warn!(
758 "Embedding generation failed, bypassing deduplication: {}",
759 e
760 );
761 return Ok(false); }
763 };
764
765 let now = Instant::now();
766 let recent_embeddings = self.recent_embeddings.read().await;
767
768 for (_, cached_embedding, timestamp) in recent_embeddings.iter() {
770 if now.duration_since(*timestamp) <= self.cache_ttl {
771 let similarity = self.cosine_similarity(&embedding, cached_embedding);
772 if similarity >= self.threshold {
773 trace!(
774 "Duplicate pattern detected with similarity {:.3}: '{}'",
775 similarity,
776 pattern.content.chars().take(50).collect::<String>()
777 );
778 return Ok(true);
779 }
780 }
781 }
782
783 drop(recent_embeddings);
784
785 let mut cache = self.recent_embeddings.write().await;
787 cache.push_back((pattern.content.clone(), embedding, now));
788
789 let current_size = cache.len();
791 let cleanup_threshold_size =
792 (self.max_cache_size as f64 * self.cache_cleanup_threshold) as usize;
793
794 if current_size >= cleanup_threshold_size {
795 self.aggressive_cache_cleanup(&mut cache, now).await;
796 }
797
798 while cache.len() > self.max_cache_size {
800 cache.pop_front();
801 }
802
803 Ok(false)
804 }
805
806 async fn cleanup_expired_entries(&self) {
808 let mut cache = self.recent_embeddings.write().await;
809 let now = Instant::now();
810 let initial_size = cache.len();
811
812 while let Some((_, _, timestamp)) = cache.front() {
814 if now.duration_since(*timestamp) > self.cache_ttl {
815 cache.pop_front();
816 } else {
817 break; }
819 }
820
821 let cleaned_count = initial_size - cache.len();
822 if cleaned_count > 0 {
823 trace!("Cleaned up {} expired cache entries", cleaned_count);
824 }
825 }
826
827 async fn aggressive_cache_cleanup(
829 &self,
830 cache: &mut VecDeque<(String, Vec<f32>, Instant)>,
831 _now: Instant,
832 ) {
833 let initial_size = cache.len();
834
835 let removal_count = cache.len() / 4;
837 for _ in 0..removal_count {
838 cache.pop_front();
839 }
840
841 let removed_count = initial_size - cache.len();
842 if removed_count > 0 {
843 debug!("Aggressive cache cleanup removed {} entries", removed_count);
844 }
845 }
846
847 fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f64 {
848 if a.len() != b.len() {
849 return 0.0;
850 }
851
852 let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
853 let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
854 let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
855
856 if norm_a == 0.0 || norm_b == 0.0 {
857 return 0.0;
858 }
859
860 (dot_product / (norm_a * norm_b)) as f64
861 }
862}
863
/// One conversation message queued for harvesting.
#[derive(Debug, Clone)]
pub struct ConversationMessage {
    pub id: String,
    pub content: String,
    pub timestamp: DateTime<Utc>,
    /// Speaker role — free-form string here; NOTE(review): consider an enum.
    pub role: String,
    /// Surrounding conversation context used for confidence scoring.
    pub context: String,
}
873
/// FIFO message buffer bounded by both entry count and estimated memory.
struct BoundedMessageQueue {
    queue: VecDeque<ConversationMessage>,
    /// Maximum number of queued messages.
    max_size: usize,
    /// Memory budget for queued messages, in megabytes.
    max_memory_mb: usize,
    /// Running estimate of bytes held (see `estimate_message_size`).
    current_memory_bytes: usize,
}
881
882impl BoundedMessageQueue {
883 fn new(max_size: usize, max_memory_mb: usize) -> Self {
884 Self {
885 queue: VecDeque::new(),
886 max_size,
887 max_memory_mb,
888 current_memory_bytes: 0,
889 }
890 }
891
892 fn try_push(&mut self, message: ConversationMessage) -> Result<()> {
893 let message_size = self.estimate_message_size(&message);
894 let new_memory_bytes = self.current_memory_bytes + message_size;
895 let max_memory_bytes = self.max_memory_mb * 1024 * 1024;
896
897 if self.queue.len() >= self.max_size {
899 return Err(HarvesterError::BackpressureApplied(format!(
900 "Message queue size limit exceeded: {}",
901 self.max_size
902 ))
903 .into());
904 }
905
906 if new_memory_bytes > max_memory_bytes {
907 return Err(HarvesterError::BackpressureApplied(format!(
908 "Message queue memory limit exceeded: {} MB",
909 self.max_memory_mb
910 ))
911 .into());
912 }
913
914 self.current_memory_bytes = new_memory_bytes;
915 self.queue.push_back(message);
916 Ok(())
917 }
918
919 fn drain_all(&mut self) -> Vec<ConversationMessage> {
920 self.current_memory_bytes = 0;
921 self.queue.drain(..).collect()
922 }
923
924 fn len(&self) -> usize {
925 self.queue.len()
926 }
927
928 fn estimate_message_size(&self, message: &ConversationMessage) -> usize {
929 message.id.len() + message.content.len() + message.context.len() + message.role.len() + 100
931 }
932}
933
/// Core harvesting pipeline: queues messages, extracts patterns, filters
/// duplicates, and stores the survivors as memories.
pub struct HarvestingEngine {
    config: SilentHarvesterConfig,
    pattern_matcher: PatternMatcher,
    deduplication_service: Arc<DeduplicationService>,
    repository: Arc<MemoryRepository>,
    importance_pipeline: Arc<ImportanceAssessmentPipeline>,
    metrics: Arc<HarvesterMetrics>,
    /// Incoming messages awaiting the next harvest trigger.
    message_queue: Arc<Mutex<BoundedMessageQueue>>,
    /// When the last harvest completed (None before the first one).
    last_harvest_time: Arc<Mutex<Option<Instant>>>,
    /// Caps concurrent batch-processing work.
    processing_semaphore: Arc<Semaphore>,
}
946
947impl HarvestingEngine {
    /// Builds the engine, compiling extraction regexes up front.
    ///
    /// # Errors
    /// Fails if any configured extraction pattern does not compile.
    pub fn new(
        config: SilentHarvesterConfig,
        repository: Arc<MemoryRepository>,
        importance_pipeline: Arc<ImportanceAssessmentPipeline>,
        embedding_service: Arc<dyn EmbeddingService>,
        metrics: Arc<HarvesterMetrics>,
    ) -> Result<Self> {
        let pattern_matcher = PatternMatcher::new(&config.pattern_config)?;
        let deduplication_service = Arc::new(DeduplicationService::new(
            config.deduplication_threshold,
            embedding_service,
            1000, // dedup embedding-cache capacity, in entries
        ));

        let message_queue = BoundedMessageQueue::new(
            config.max_batch_size * 5, // buffer up to five batches worth of messages
            50,                        // memory budget, MB
        );

        Ok(Self {
            config,
            pattern_matcher,
            deduplication_service,
            repository,
            importance_pipeline,
            metrics,
            message_queue: Arc::new(Mutex::new(message_queue)),
            last_harvest_time: Arc::new(Mutex::new(None)),
            processing_semaphore: Arc::new(Semaphore::new(2)), // at most 2 concurrent batch tasks
        })
    }
980
    /// Queues one message and, when a trigger fires, kicks off a harvest.
    ///
    /// Triggers: queue length reaching `message_trigger_count`, or the
    /// time-based trigger (`should_trigger_by_time`). Triggered harvests run
    /// on a spawned background task gated by `processing_semaphore`.
    ///
    /// # Errors
    /// On queue backpressure the queue is drained and processed inline
    /// (best effort, only if a semaphore permit is free) and the original
    /// backpressure error is still returned to the caller.
    pub async fn queue_message(&self, message: ConversationMessage) -> Result<()> {
        let mut queue = self.message_queue.lock().await;

        match queue.try_push(message) {
            Ok(()) => {}
            Err(e) => {
                // Queue full: flush what we have right now. The rejected
                // message itself is NOT retried — the error propagates.
                warn!("Queue limit reached, forcing immediate processing: {}", e);
                let messages = queue.drain_all();
                drop(queue);

                if !messages.is_empty() {
                    // Only process inline if a permit is free; otherwise the
                    // drained batch is dropped here — NOTE(review): confirm
                    // that losing the batch in this case is acceptable.
                    if let Ok(_permit) = self.processing_semaphore.try_acquire() {
                        self.process_message_batch(messages)
                            .await
                            .unwrap_or_else(|e| {
                                error!("Forced harvest processing failed: {}", e);
                            });
                    }
                }
                return Err(e);
            }
        }

        // Check both triggers while still holding the queue lock so the
        // drain below is atomic with respect to other producers.
        let should_process =
            queue.len() >= self.config.message_trigger_count || self.should_trigger_by_time().await;

        if should_process {
            let messages = queue.drain_all();
            drop(queue);

            if !messages.is_empty() {
                // Owned permit so it can move into the spawned task and hold
                // the concurrency slot until processing finishes.
                match self.processing_semaphore.clone().try_acquire_owned() {
                    Ok(permit) => {
                        // Clone the shared state the background task needs.
                        let config = self.config.clone();
                        let dedup_service = self.deduplication_service.clone();
                        let repository = self.repository.clone();
                        let importance_pipeline = self.importance_pipeline.clone();
                        let metrics = self.metrics.clone();
                        let last_harvest_time = self.last_harvest_time.clone();
                        let pattern_config = self.config.pattern_config.clone();

                        tokio::spawn(async move {
                            // Regexes are re-compiled per task because
                            // PatternMatcher is not shared across threads here.
                            let pattern_matcher = match PatternMatcher::new(&pattern_config) {
                                Ok(pm) => pm,
                                Err(e) => {
                                    error!("Failed to create pattern matcher: {}", e);
                                    return;
                                }
                            };

                            // Lightweight task-local view of the engine state —
                            // NOTE(review): HarvestingEngineHandle is defined
                            // elsewhere in this file.
                            let engine_handle = HarvestingEngineHandle {
                                config,
                                pattern_matcher,
                                deduplication_service: dedup_service,
                                repository,
                                importance_pipeline,
                                metrics,
                                last_harvest_time,
                            };

                            // Keep the permit alive for the task's duration.
                            let _permit = permit;
                            if let Err(e) = engine_handle.process_message_batch(messages).await {
                                error!("Background harvest processing failed: {}", e);
                            }
                        });
                    }
                    Err(_) => {
                        warn!("Processing semaphore exhausted, skipping background processing");
                    }
                }
            }
        }

        Ok(())
    }
1064
1065 async fn should_trigger_by_time(&self) -> bool {
1067 let last_harvest = self.last_harvest_time.lock().await;
1068 match *last_harvest {
1069 Some(last_time) => {
1070 let elapsed = last_time.elapsed();
1071 elapsed >= Duration::from_secs(self.config.time_trigger_minutes * 60)
1072 }
1073 None => true, }
1075 }
1076
1077 pub async fn process_message_batch(&self, messages: Vec<ConversationMessage>) -> Result<()> {
1079 if messages.is_empty() {
1080 return Ok(());
1081 }
1082
1083 let start_time = Instant::now();
1084 debug!("Processing batch of {} messages", messages.len());
1085
1086 let processing_future = self.process_messages_internal(messages.clone());
1088 let timeout_duration = Duration::from_secs(self.config.max_processing_time_seconds);
1089
1090 match timeout(timeout_duration, processing_future).await {
1091 Ok(result) => {
1092 let processing_time = start_time.elapsed();
1093 self.metrics
1094 .record_batch_processing(messages.len(), processing_time.as_millis() as u64);
1095
1096 *self.last_harvest_time.lock().await = Some(Instant::now());
1097
1098 result
1099 }
1100 Err(_) => {
1101 warn!(
1102 "Message batch processing timed out after {:?}",
1103 timeout_duration
1104 );
1105 Err(HarvesterError::BatchProcessingFailed(
1106 "Processing timeout exceeded".to_string(),
1107 )
1108 .into())
1109 }
1110 }
1111 }
1112
    /// Full harvest pipeline for one batch:
    /// extraction -> confidence filter -> deduplication -> batch storage.
    async fn process_messages_internal(&self, messages: Vec<ConversationMessage>) -> Result<()> {
        let extraction_start = Instant::now();

        // Extract patterns from every message. The regex work itself is
        // synchronous; wrapping in futures keeps result handling uniform
        // via join_all below.
        let pattern_futures: Vec<_> = messages
            .iter()
            .map(|message| {
                let pattern_matcher = &self.pattern_matcher;
                let metrics = &self.metrics;
                async move {
                    let patterns =
                        pattern_matcher.extract_patterns(&message.content, &message.context);

                    let mut message_patterns = Vec::new();
                    for mut pattern in patterns {
                        // Attach provenance and sample the confidence histogram.
                        pattern.source_message_id = Some(message.id.clone());
                        metrics.record_pattern_confidence(pattern.confidence);
                        message_patterns.push(pattern);
                    }
                    message_patterns
                }
            })
            .collect();

        let pattern_results = future::join_all(pattern_futures).await;
        let mut all_patterns = Vec::new();
        for patterns in pattern_results {
            all_patterns.extend(patterns);
        }

        // Record extraction totals before any filtering happens.
        let extraction_time = extraction_start.elapsed();
        self.metrics
            .record_harvest(
                all_patterns.len() as u64,
                extraction_time.as_millis() as u64,
            )
            .await;

        if all_patterns.is_empty() {
            debug!("No patterns extracted from message batch");
            return Ok(());
        }

        debug!(
            "Extracted {} patterns from {} messages",
            all_patterns.len(),
            messages.len()
        );

        // Keep only patterns above the configured confidence threshold.
        let high_confidence_patterns: Vec<ExtractedMemoryPattern> = all_patterns
            .into_iter()
            .filter(|p| p.confidence >= self.config.confidence_threshold)
            .collect();

        if high_confidence_patterns.is_empty() {
            debug!(
                "No patterns met confidence threshold of {}",
                self.config.confidence_threshold
            );
            return Ok(());
        }

        // Deduplicate in small chunks to bound concurrent embedding calls.
        let dedup_batch_size = 10;
        let mut unique_patterns = Vec::new();
        let mut duplicate_count = 0;

        for batch in high_confidence_patterns.chunks(dedup_batch_size) {
            let dedup_futures: Vec<_> = batch
                .iter()
                .map(|pattern| {
                    let dedup_service = &self.deduplication_service;
                    async move {
                        match dedup_service.is_duplicate(pattern).await {
                            Ok(is_duplicate) => (pattern, is_duplicate, None),
                            Err(e) => {
                                // Fail open: keep the pattern if dedup errors.
                                warn!("Deduplication check failed for pattern: {}", e);
                                (pattern, false, Some(e))
                            }
                        }
                    }
                })
                .collect();

            let dedup_results = future::join_all(dedup_futures).await;

            for (pattern, is_duplicate, error) in dedup_results {
                if is_duplicate {
                    duplicate_count += 1;
                } else {
                    unique_patterns.push(pattern.clone());
                    if error.is_some() {
                        warn!("Pattern included despite deduplication failure");
                    }
                }
            }
        }

        debug!(
            "After deduplication: {} unique patterns, {} duplicates",
            unique_patterns.len(),
            duplicate_count
        );

        // Batch storage first; on failure optionally fall back to storing
        // patterns one-by-one (the clone keeps the fallback input alive).
        let stored_count = match self
            .store_patterns_as_memories_batch(unique_patterns.clone())
            .await
        {
            Ok(count) => count,
            Err(e) => {
                error!("Batch storage failed: {}", e);
                if self.config.graceful_degradation {
                    warn!("Falling back to individual pattern storage");
                    self.fallback_individual_storage(unique_patterns).await
                } else {
                    return Err(e);
                }
            }
        };

        self.metrics
            .record_storage(stored_count, duplicate_count)
            .await;

        // Silent mode keeps the completion note at debug level.
        if self.config.silent_mode {
            debug!(
                "Silent harvest completed: {} patterns stored, {} duplicates filtered",
                stored_count, duplicate_count
            );
        } else {
            info!(
                "Memory harvest completed: {} patterns stored, {} duplicates filtered",
                stored_count, duplicate_count
            );
        }

        Ok(())
    }
1256
1257 async fn store_pattern_as_memory(&self, pattern: ExtractedMemoryPattern) -> Result<Memory> {
1258 let mut metadata = pattern.metadata.clone();
1260 metadata.insert(
1261 "pattern_type".to_string(),
1262 serde_json::to_value(&pattern.pattern_type)?,
1263 );
1264 metadata.insert(
1265 "extraction_confidence".to_string(),
1266 serde_json::Value::Number(
1267 serde_json::Number::from_f64(pattern.confidence)
1268 .unwrap_or_else(|| serde_json::Number::from(0)),
1269 ),
1270 );
1271 metadata.insert(
1272 "extracted_at".to_string(),
1273 serde_json::Value::String(pattern.extracted_at.to_rfc3339()),
1274 );
1275 if let Some(ref source_id) = pattern.source_message_id {
1276 metadata.insert(
1277 "source_message_id".to_string(),
1278 serde_json::Value::String(source_id.clone()),
1279 );
1280 }
1281 metadata.insert(
1282 "context".to_string(),
1283 serde_json::Value::String(pattern.context.clone()),
1284 );
1285 metadata.insert(
1286 "harvester_version".to_string(),
1287 serde_json::Value::String("1.0".to_string()),
1288 );
1289
1290 let assessment_result = self
1292 .importance_pipeline
1293 .assess_importance(&pattern.content)
1294 .await
1295 .map_err(|e| HarvesterError::ImportanceAssessmentFailed(e.to_string()))?;
1296
1297 let final_importance = assessment_result.importance_score.max(pattern.confidence);
1298
1299 let create_request = crate::memory::models::CreateMemoryRequest {
1301 content: pattern.content,
1302 embedding: None, tier: Some(MemoryTier::Working), importance_score: Some(final_importance),
1305 metadata: Some(serde_json::Value::Object(metadata.into_iter().collect())),
1306 parent_id: None,
1307 expires_at: None,
1308 };
1309
1310 self.repository
1312 .create_memory(create_request)
1313 .await
1314 .map_err(HarvesterError::RepositoryFailed)
1315 .map_err(Into::into)
1316 }
1317
1318 pub async fn get_metrics_summary(&self) -> HarvesterMetricsSummary {
1320 HarvesterMetricsSummary {
1321 messages_processed: self.metrics.messages_processed.load(Ordering::Relaxed),
1322 patterns_extracted: self.metrics.patterns_extracted.load(Ordering::Relaxed),
1323 memories_stored: self.metrics.memories_stored.load(Ordering::Relaxed),
1324 duplicates_filtered: self.metrics.duplicates_filtered.load(Ordering::Relaxed),
1325 avg_extraction_time_ms: self.metrics.extraction_time_ms.load(Ordering::Relaxed),
1326 avg_batch_processing_time_ms: self
1327 .metrics
1328 .batch_processing_time_ms
1329 .load(Ordering::Relaxed),
1330 last_harvest_time: *self.metrics.last_harvest_time.lock().await,
1331 }
1332 }
1333
1334 pub async fn force_harvest(&self) -> Result<HarvestResult> {
1336 let messages = {
1337 let mut queue = self.message_queue.lock().await;
1338 queue.drain_all()
1339 };
1340
1341 if messages.is_empty() {
1342 return Ok(HarvestResult {
1343 messages_processed: 0,
1344 patterns_extracted: 0,
1345 patterns_stored: 0,
1346 duplicates_filtered: 0,
1347 processing_time_ms: 0,
1348 });
1349 }
1350
1351 let start_time = Instant::now();
1352 self.process_message_batch(messages.clone()).await?;
1353 let processing_time = start_time.elapsed();
1354
1355 Ok(HarvestResult {
1356 messages_processed: messages.len(),
1357 patterns_extracted: 0, patterns_stored: 0, duplicates_filtered: 0, processing_time_ms: processing_time.as_millis() as u64,
1361 })
1362 }
1363
1364 async fn store_patterns_as_memories_batch(
1366 &self,
1367 patterns: Vec<ExtractedMemoryPattern>,
1368 ) -> Result<u64> {
1369 if patterns.is_empty() {
1370 return Ok(0);
1371 }
1372
1373 let batch_size = 20; let mut stored_count = 0;
1376
1377 for batch in patterns.chunks(batch_size) {
1378 let storage_futures: Vec<_> = batch
1379 .iter()
1380 .map(|pattern| self.store_pattern_as_memory(pattern.clone()))
1381 .collect();
1382
1383 let results = future::join_all(storage_futures).await;
1385
1386 for result in results {
1388 match result {
1389 Ok(_) => stored_count += 1,
1390 Err(e) => {
1391 warn!("Failed to store pattern as memory in batch: {}", e);
1392 }
1394 }
1395 }
1396 }
1397
1398 Ok(stored_count)
1399 }
1400
1401 async fn fallback_individual_storage(&self, patterns: Vec<ExtractedMemoryPattern>) -> u64 {
1403 let mut stored_count = 0;
1404 let mut consecutive_failures = 0;
1405 const MAX_CONSECUTIVE_FAILURES: u32 = 5;
1406
1407 let patterns_len = patterns.len();
1408 for pattern in patterns {
1409 let mut retry_count = 0;
1411 let mut success = false;
1412
1413 while retry_count < self.config.max_retries && !success {
1414 match self.store_pattern_as_memory(pattern.clone()).await {
1415 Ok(_) => {
1416 stored_count += 1;
1417 consecutive_failures = 0;
1418 success = true;
1419 }
1420 Err(e) => {
1421 retry_count += 1;
1422 consecutive_failures += 1;
1423
1424 warn!(
1425 "Failed to store pattern (attempt {} of {}): {}",
1426 retry_count, self.config.max_retries, e
1427 );
1428
1429 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
1430 error!(
1431 "Too many consecutive failures ({}), stopping fallback storage",
1432 consecutive_failures
1433 );
1434 break;
1435 }
1436
1437 if retry_count < self.config.max_retries {
1439 let delay = Duration::from_millis(100 * (1u64 << retry_count));
1440 tokio::time::sleep(delay).await;
1441 }
1442 }
1443 }
1444 }
1445
1446 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
1447 break;
1448 }
1449 }
1450
1451 if stored_count < patterns_len as u64 {
1452 warn!(
1453 "Fallback storage completed with partial success: {}/{} patterns stored",
1454 stored_count, patterns_len
1455 );
1456 }
1457
1458 stored_count
1459 }
1460}
1461
1462struct HarvestingEngineHandle {
1464 config: SilentHarvesterConfig,
1465 pattern_matcher: PatternMatcher,
1466 deduplication_service: Arc<DeduplicationService>, repository: Arc<MemoryRepository>,
1468 importance_pipeline: Arc<ImportanceAssessmentPipeline>,
1469 metrics: Arc<HarvesterMetrics>,
1470 #[allow(dead_code)] last_harvest_time: Arc<Mutex<Option<Instant>>>,
1472}
1473
1474impl HarvestingEngineHandle {
1475 async fn process_message_batch(&self, messages: Vec<ConversationMessage>) -> Result<()> {
1476 let mut all_patterns = Vec::new();
1479 let extraction_start = Instant::now();
1480
1481 for message in &messages {
1483 let patterns = self
1484 .pattern_matcher
1485 .extract_patterns(&message.content, &message.context);
1486
1487 for mut pattern in patterns {
1488 pattern.source_message_id = Some(message.id.clone());
1489 self.metrics.record_pattern_confidence(pattern.confidence);
1490 all_patterns.push(pattern);
1491 }
1492 }
1493
1494 let extraction_time = extraction_start.elapsed();
1495 self.metrics
1496 .record_harvest(
1497 all_patterns.len() as u64,
1498 extraction_time.as_millis() as u64,
1499 )
1500 .await;
1501
1502 let high_confidence_patterns: Vec<ExtractedMemoryPattern> = all_patterns
1504 .into_iter()
1505 .filter(|p| p.confidence >= self.config.confidence_threshold)
1506 .collect();
1507
1508 let mut stored_count = 0;
1510 let mut duplicate_count = 0;
1511
1512 for pattern in high_confidence_patterns {
1513 match self.deduplication_service.is_duplicate(&pattern).await {
1514 Ok(is_duplicate) => {
1515 if is_duplicate {
1516 duplicate_count += 1;
1517 } else {
1518 match self.store_pattern_as_memory(pattern).await {
1519 Ok(_) => stored_count += 1,
1520 Err(e) => warn!("Failed to store pattern: {}", e),
1521 }
1522 }
1523 }
1524 Err(e) => {
1525 warn!("Deduplication check failed: {}", e);
1526 }
1527 }
1528 }
1529
1530 self.metrics
1531 .record_storage(stored_count, duplicate_count)
1532 .await;
1533 debug!(
1534 "Background harvest: {} stored, {} duplicates",
1535 stored_count, duplicate_count
1536 );
1537
1538 Ok(())
1539 }
1540
1541 async fn store_pattern_as_memory(&self, pattern: ExtractedMemoryPattern) -> Result<Memory> {
1542 let mut metadata = pattern.metadata.clone();
1544 metadata.insert(
1545 "pattern_type".to_string(),
1546 serde_json::to_value(&pattern.pattern_type)?,
1547 );
1548 metadata.insert(
1549 "extraction_confidence".to_string(),
1550 serde_json::Value::Number(
1551 serde_json::Number::from_f64(pattern.confidence)
1552 .unwrap_or_else(|| serde_json::Number::from(0)),
1553 ),
1554 );
1555
1556 let assessment_result = self
1558 .importance_pipeline
1559 .assess_importance(&pattern.content)
1560 .await
1561 .map_err(|e| HarvesterError::ImportanceAssessmentFailed(e.to_string()))?;
1562
1563 let final_importance = assessment_result.importance_score.max(pattern.confidence);
1564
1565 let create_request = crate::memory::models::CreateMemoryRequest {
1567 content: pattern.content,
1568 embedding: None,
1569 tier: Some(MemoryTier::Working),
1570 importance_score: Some(final_importance),
1571 metadata: Some(serde_json::Value::Object(metadata.into_iter().collect())),
1572 parent_id: None,
1573 expires_at: None,
1574 };
1575
1576 self.repository
1577 .create_memory(create_request)
1578 .await
1579 .map_err(HarvesterError::RepositoryFailed)
1580 .map_err(Into::into)
1581 }
1582}
1583
1584#[derive(Debug, Serialize, Deserialize)]
1586pub struct HarvesterMetricsSummary {
1587 pub messages_processed: u64,
1588 pub patterns_extracted: u64,
1589 pub memories_stored: u64,
1590 pub duplicates_filtered: u64,
1591 pub avg_extraction_time_ms: u64,
1592 pub avg_batch_processing_time_ms: u64,
1593 pub last_harvest_time: Option<DateTime<Utc>>,
1594}
1595
1596#[derive(Debug, Serialize, Deserialize)]
1598pub struct HarvestResult {
1599 pub messages_processed: usize,
1600 pub patterns_extracted: usize,
1601 pub patterns_stored: usize,
1602 pub duplicates_filtered: usize,
1603 pub processing_time_ms: u64,
1604}
1605
1606pub struct SilentHarvesterService {
1608 engine: Arc<HarvestingEngine>,
1609 #[allow(dead_code)] config: SilentHarvesterConfig,
1611 _shutdown_tx: tokio::sync::oneshot::Sender<()>,
1612}
1613
1614impl SilentHarvesterService {
1615 pub fn new(
1616 repository: Arc<MemoryRepository>,
1617 importance_pipeline: Arc<ImportanceAssessmentPipeline>,
1618 embedding_service: Arc<dyn EmbeddingService>,
1619 config: Option<SilentHarvesterConfig>,
1620 registry: &Registry,
1621 ) -> Result<Self> {
1622 let config = config.unwrap_or_default();
1623 let metrics = Arc::new(HarvesterMetrics::new(registry)?);
1624
1625 let engine = Arc::new(HarvestingEngine::new(
1626 config.clone(),
1627 repository,
1628 importance_pipeline,
1629 embedding_service,
1630 metrics,
1631 )?);
1632
1633 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
1634
1635 let engine_clone = engine.clone();
1637 let time_trigger_minutes = config.time_trigger_minutes;
1638
1639 tokio::spawn(async move {
1640 let mut interval = interval(Duration::from_secs(time_trigger_minutes * 60));
1641 let mut shutdown_rx = shutdown_rx;
1642
1643 loop {
1644 tokio::select! {
1645 _ = interval.tick() => {
1646 if let Err(e) = engine_clone.force_harvest().await {
1647 error!("Scheduled harvest failed: {}", e);
1648 }
1649 }
1650 _ = &mut shutdown_rx => {
1651 info!("Silent harvester service shutting down");
1652 break;
1653 }
1654 }
1655 }
1656 });
1657
1658 Ok(Self {
1659 engine,
1660 config,
1661 _shutdown_tx: shutdown_tx,
1662 })
1663 }
1664
1665 pub async fn add_message(&self, message: ConversationMessage) -> Result<()> {
1667 self.engine.queue_message(message).await
1668 }
1669
1670 pub fn engine(&self) -> &Arc<HarvestingEngine> {
1672 &self.engine
1673 }
1674
1675 pub async fn force_harvest(&self) -> Result<HarvestResult> {
1677 self.engine.force_harvest().await
1678 }
1679
1680 pub async fn get_metrics(&self) -> HarvesterMetricsSummary {
1682 self.engine.get_metrics_summary().await
1683 }
1684}