1use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13
14use common::{Memory, MemoryType};
15use storage::{RedisCache, VectorStorage};
16use tokio::sync::RwLock;
17use tracing;
18
/// Runtime settings for the memory autopilot (background dedup and consolidation).
#[derive(Debug, Clone)]
pub struct AutoPilotConfig {
    /// Master switch; while `false` the background loops only poll for re-enabling.
    pub enabled: bool,
    /// Minimum cosine similarity at which two memories count as duplicates.
    pub dedup_threshold: f32,
    /// Hours between deduplication passes.
    pub dedup_interval_hours: u64,
    /// Hours between consolidation passes.
    pub consolidation_interval_hours: u64,
}
31
32impl Default for AutoPilotConfig {
33 fn default() -> Self {
34 Self {
35 enabled: true,
36 dedup_threshold: 0.93,
37 dedup_interval_hours: 6,
38 consolidation_interval_hours: 12,
39 }
40 }
41}
42
43impl AutoPilotConfig {
44 pub fn from_env() -> Self {
46 let enabled: bool = std::env::var("DAKERA_AUTOPILOT_ENABLED")
47 .ok()
48 .and_then(|v| v.parse().ok())
49 .unwrap_or(true);
50
51 let dedup_threshold: f32 = std::env::var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD")
52 .ok()
53 .and_then(|v| v.parse().ok())
54 .unwrap_or(0.93);
55
56 let dedup_interval_hours: u64 = std::env::var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS")
57 .ok()
58 .and_then(|v| v.parse().ok())
59 .unwrap_or(6);
60
61 let consolidation_interval_hours: u64 =
62 std::env::var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS")
63 .ok()
64 .and_then(|v| v.parse().ok())
65 .unwrap_or(12);
66
67 Self {
68 enabled,
69 dedup_threshold,
70 dedup_interval_hours,
71 consolidation_interval_hours,
72 }
73 }
74}
75
/// Counters reported by one [`AutoPilotEngine::run_dedup`] pass.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DedupResult {
    /// Agent namespaces (`_dakera_agent_*`) visited.
    pub namespaces_processed: usize,
    /// Memories with a non-empty embedding that took part in comparisons.
    pub memories_scanned: usize,
    /// Near-duplicate memories reported as removed.
    pub duplicates_removed: usize,
}
83
/// Counters reported by one [`AutoPilotEngine::run_consolidation`] pass.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ConsolidationResult {
    /// Agent namespaces (`_dakera_agent_*`) visited.
    pub namespaces_processed: usize,
    /// Low-importance, tagged memories considered as merge candidates.
    pub memories_scanned: usize,
    /// Clusters replaced by a single merged memory.
    pub clusters_merged: usize,
    /// Original memories folded into merged memories.
    pub memories_consolidated: usize,
}
92
93pub struct AutoPilotEngine {
95 pub config: AutoPilotConfig,
96}
97
98impl AutoPilotEngine {
99 pub fn new(config: AutoPilotConfig) -> Self {
100 Self { config }
101 }
102
103 fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
105 if a.len() != b.len() || a.is_empty() {
106 return 0.0;
107 }
108 let mut dot = 0.0_f64;
109 let mut norm_a = 0.0_f64;
110 let mut norm_b = 0.0_f64;
111 for (x, y) in a.iter().zip(b.iter()) {
112 let xd = *x as f64;
113 let yd = *y as f64;
114 dot += xd * yd;
115 norm_a += xd * xd;
116 norm_b += yd * yd;
117 }
118 let denom = norm_a.sqrt() * norm_b.sqrt();
119 if denom == 0.0 {
120 0.0
121 } else {
122 (dot / denom) as f32
123 }
124 }
125
126 fn retention_score(memory: &Memory) -> f64 {
128 memory.importance as f64 + memory.access_count as f64 * 0.01
129 }
130
131 pub async fn run_dedup(&self, storage: &Arc<dyn VectorStorage>) -> DedupResult {
137 let mut result = DedupResult::default();
138
139 let namespaces = match storage.list_namespaces().await {
140 Ok(ns) => ns,
141 Err(e) => {
142 tracing::error!(error = %e, "Auto-dedup: failed to list namespaces");
143 return result;
144 }
145 };
146
147 for namespace in namespaces {
148 if !namespace.starts_with("_dakera_agent_") {
149 continue;
150 }
151 result.namespaces_processed += 1;
152
153 let vectors = match storage.get_all(&namespace).await {
154 Ok(v) => v,
155 Err(e) => {
156 tracing::warn!(
157 namespace = %namespace,
158 error = %e,
159 "Auto-dedup: failed to get vectors"
160 );
161 continue;
162 }
163 };
164
165 let items: Vec<(Memory, &[f32])> = vectors
167 .iter()
168 .filter_map(|v| {
169 let mem = Memory::from_vector(v)?;
170 if v.values.is_empty() {
171 return None;
172 }
173 Some((mem, v.values.as_slice()))
174 })
175 .collect();
176
177 result.memories_scanned += items.len();
178
179 let mut to_delete: HashSet<String> = HashSet::new();
181
182 for i in 0..items.len() {
183 if to_delete.contains(&items[i].0.id) {
184 continue;
185 }
186 for j in (i + 1)..items.len() {
187 if to_delete.contains(&items[j].0.id) {
188 continue;
189 }
190 let sim = Self::cosine_similarity(items[i].1, items[j].1);
191 if sim >= self.config.dedup_threshold {
192 if Self::retention_score(&items[i].0) >= Self::retention_score(&items[j].0)
194 {
195 to_delete.insert(items[j].0.id.clone());
196 } else {
197 to_delete.insert(items[i].0.id.clone());
198 break; }
200 }
201 }
202 }
203
204 if !to_delete.is_empty() {
205 let ids: Vec<String> = to_delete.into_iter().collect();
206 result.duplicates_removed += ids.len();
207 if let Err(e) = storage.delete(&namespace, &ids).await {
208 tracing::warn!(
209 namespace = %namespace,
210 count = ids.len(),
211 error = %e,
212 "Auto-dedup: failed to delete duplicates"
213 );
214 }
215 }
216 }
217
218 tracing::info!(
219 namespaces = result.namespaces_processed,
220 scanned = result.memories_scanned,
221 removed = result.duplicates_removed,
222 "Auto-dedup cycle completed"
223 );
224
225 result
226 }
227
228 pub async fn run_consolidation(&self, storage: &Arc<dyn VectorStorage>) -> ConsolidationResult {
234 let mut result = ConsolidationResult::default();
235
236 let namespaces = match storage.list_namespaces().await {
237 Ok(ns) => ns,
238 Err(e) => {
239 tracing::error!(error = %e, "Auto-consolidation: failed to list namespaces");
240 return result;
241 }
242 };
243
244 for namespace in namespaces {
245 if !namespace.starts_with("_dakera_agent_") {
246 continue;
247 }
248 result.namespaces_processed += 1;
249
250 let vectors = match storage.get_all(&namespace).await {
251 Ok(v) => v,
252 Err(e) => {
253 tracing::warn!(
254 namespace = %namespace,
255 error = %e,
256 "Auto-consolidation: failed to get vectors"
257 );
258 continue;
259 }
260 };
261
262 let items: Vec<(Memory, Vec<f32>)> = vectors
264 .iter()
265 .filter_map(|v| {
266 let mem = Memory::from_vector(v)?;
267 if mem.importance >= 0.3 || v.values.is_empty() || mem.tags.is_empty() {
268 return None;
269 }
270 Some((mem, v.values.clone()))
271 })
272 .collect();
273
274 result.memories_scanned += items.len();
275
276 if items.len() < 3 {
277 continue;
278 }
279
280 let mut tag_to_indices: HashMap<&str, Vec<usize>> = HashMap::new();
282 for (i, (mem, _)) in items.iter().enumerate() {
283 for tag in &mem.tags {
284 tag_to_indices.entry(tag.as_str()).or_default().push(i);
285 }
286 }
287
288 let mut pair_shared_tags: HashMap<(usize, usize), usize> = HashMap::new();
290 for indices in tag_to_indices.values() {
291 for ai in 0..indices.len() {
292 for bi in (ai + 1)..indices.len() {
293 let key = (indices[ai], indices[bi]);
294 *pair_shared_tags.entry(key).or_default() += 1;
295 }
296 }
297 }
298
299 let mut adjacency: HashMap<usize, HashSet<usize>> = HashMap::new();
301 for (&(a, b), &count) in &pair_shared_tags {
302 if count >= 2 {
303 adjacency.entry(a).or_default().insert(b);
304 adjacency.entry(b).or_default().insert(a);
305 }
306 }
307
308 let mut visited: HashSet<usize> = HashSet::new();
310 let mut clusters: Vec<Vec<usize>> = Vec::new();
311
312 for &node in adjacency.keys() {
313 if visited.contains(&node) {
314 continue;
315 }
316 let mut cluster = Vec::new();
317 let mut stack = vec![node];
318 while let Some(n) = stack.pop() {
319 if visited.insert(n) {
320 cluster.push(n);
321 if let Some(neighbors) = adjacency.get(&n) {
322 for &nb in neighbors {
323 if !visited.contains(&nb) {
324 stack.push(nb);
325 }
326 }
327 }
328 }
329 }
330 if cluster.len() >= 3 {
331 clusters.push(cluster);
332 }
333 }
334
335 for (ci, cluster) in clusters.iter().enumerate() {
337 let memories: Vec<&Memory> = cluster.iter().map(|&i| &items[i].0).collect();
338 let embeddings: Vec<&Vec<f32>> = cluster.iter().map(|&i| &items[i].1).collect();
339
340 let max_importance = memories
341 .iter()
342 .map(|m| m.importance)
343 .fold(0.0_f32, f32::max);
344
345 let mut all_tags: Vec<String> =
347 memories.iter().flat_map(|m| m.tags.clone()).collect();
348 all_tags.sort();
349 all_tags.dedup();
350
351 let combined_content: String = memories
353 .iter()
354 .map(|m| m.content.as_str())
355 .collect::<Vec<_>>()
356 .join("\n---\n");
357
358 let dim = embeddings[0].len();
360 let mut avg_embedding = vec![0.0_f32; dim];
361 for emb in &embeddings {
362 for (i, v) in emb.iter().enumerate() {
363 avg_embedding[i] += v;
364 }
365 }
366 let count = embeddings.len() as f32;
367 for v in &mut avg_embedding {
368 *v /= count;
369 }
370
371 let now = std::time::SystemTime::now()
372 .duration_since(std::time::UNIX_EPOCH)
373 .unwrap_or_default()
374 .as_nanos();
375
376 let agent_id = memories[0].agent_id.clone();
377 let merged_id = format!("mem_consolidated_{:x}_{}", now, ci);
378
379 let merged_memory = Memory {
380 id: merged_id,
381 memory_type: MemoryType::Semantic,
382 content: combined_content,
383 agent_id,
384 session_id: None,
385 importance: max_importance,
386 tags: all_tags,
387 metadata: None,
388 created_at: (now / 1_000_000_000) as u64,
389 last_accessed_at: (now / 1_000_000_000) as u64,
390 access_count: 0,
391 ttl_seconds: None,
392 expires_at: None,
393 };
394
395 let merged_vector = merged_memory.to_vector(avg_embedding);
396
397 let ids_to_delete: Vec<String> = memories.iter().map(|m| m.id.clone()).collect();
399
400 if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
401 tracing::warn!(
402 namespace = %namespace,
403 error = %e,
404 "Auto-consolidation: failed to delete originals"
405 );
406 continue;
407 }
408
409 if let Err(e) = storage.upsert(&namespace, vec![merged_vector]).await {
410 tracing::warn!(
411 namespace = %namespace,
412 error = %e,
413 "Auto-consolidation: failed to insert merged memory"
414 );
415 continue;
416 }
417
418 result.clusters_merged += 1;
419 result.memories_consolidated += ids_to_delete.len();
420 }
421 }
422
423 tracing::info!(
424 namespaces = result.namespaces_processed,
425 scanned = result.memories_scanned,
426 clusters = result.clusters_merged,
427 consolidated = result.memories_consolidated,
428 "Auto-consolidation cycle completed"
429 );
430
431 result
432 }
433
434 pub fn spawn(
441 config: Arc<RwLock<AutoPilotConfig>>,
442 storage: Arc<dyn VectorStorage>,
443 metrics: Arc<crate::decay::BackgroundMetrics>,
444 redis: Option<RedisCache>,
445 node_id: String,
446 ) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) {
447 let storage_dedup = storage.clone();
448 let metrics_dedup = metrics.clone();
449 let config_dedup = config.clone();
450 let redis_dedup = redis.clone();
451 let node_id_dedup = node_id.clone();
452 const DEDUP_LOCK_KEY: &str = "dakera:lock:dedup";
453 const CONSOLIDATION_LOCK_KEY: &str = "dakera:lock:consolidation";
454
455 let dedup_handle = tokio::spawn(async move {
456 loop {
457 let (enabled, dedup_threshold, interval_hours) = {
458 let cfg = config_dedup.read().await;
459 (cfg.enabled, cfg.dedup_threshold, cfg.dedup_interval_hours)
460 };
461
462 if !enabled {
463 tokio::time::sleep(std::time::Duration::from_secs(300)).await;
465 continue;
466 }
467
468 tokio::time::sleep(std::time::Duration::from_secs(interval_hours * 3600)).await;
469
470 if !config_dedup.read().await.enabled {
472 continue;
473 }
474
475 let lock_ttl = interval_hours * 3600 + 300;
477 let acquired = match redis_dedup {
478 Some(ref rc) => {
479 rc.try_acquire_lock(DEDUP_LOCK_KEY, &node_id_dedup, lock_ttl)
480 .await
481 }
482 None => true,
483 };
484
485 if !acquired {
486 tracing::debug!("Dedup skipped — another replica holds the leader lock");
487 continue;
488 }
489
490 let engine = AutoPilotEngine::new(AutoPilotConfig {
491 enabled: true,
492 dedup_threshold,
493 ..Default::default()
494 });
495 let result = engine.run_dedup(&storage_dedup).await;
496 metrics_dedup.record_dedup(
497 result.namespaces_processed,
498 result.memories_scanned,
499 result.duplicates_removed,
500 );
501
502 if let Some(ref rc) = redis_dedup {
503 rc.release_lock(DEDUP_LOCK_KEY, &node_id_dedup).await;
504 }
505 }
506 });
507
508 let consolidation_handle = tokio::spawn(async move {
509 loop {
510 let (enabled, interval_hours) = {
511 let cfg = config.read().await;
512 (cfg.enabled, cfg.consolidation_interval_hours)
513 };
514
515 if !enabled {
516 tokio::time::sleep(std::time::Duration::from_secs(300)).await;
517 continue;
518 }
519
520 tokio::time::sleep(std::time::Duration::from_secs(interval_hours * 3600)).await;
521
522 if !config.read().await.enabled {
523 continue;
524 }
525
526 let lock_ttl = interval_hours * 3600 + 300;
528 let acquired = match redis {
529 Some(ref rc) => {
530 rc.try_acquire_lock(CONSOLIDATION_LOCK_KEY, &node_id, lock_ttl)
531 .await
532 }
533 None => true,
534 };
535
536 if !acquired {
537 tracing::debug!(
538 "Consolidation skipped — another replica holds the leader lock"
539 );
540 continue;
541 }
542
543 let engine = AutoPilotEngine::new(AutoPilotConfig::default());
544 let result = engine.run_consolidation(&storage).await;
545 metrics.record_consolidation(
546 result.namespaces_processed,
547 result.memories_scanned,
548 result.clusters_merged,
549 result.memories_consolidated,
550 );
551
552 if let Some(ref rc) = redis {
553 rc.release_lock(CONSOLIDATION_LOCK_KEY, &node_id).await;
554 }
555 }
556 });
557
558 (dedup_handle, consolidation_handle)
559 }
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that mutate the process-wide `DAKERA_AUTOPILOT_*` variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes `ENV_LOCK`, recovering from poisoning so one failing env test
    /// does not cascade into spurious failures in the others.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// A memory with zero importance and zero accesses; tests adjust fields as needed.
    fn blank_memory() -> Memory {
        Memory {
            id: "x".to_string(),
            memory_type: MemoryType::Episodic,
            content: String::new(),
            agent_id: "a".to_string(),
            session_id: None,
            importance: 0.0,
            tags: vec![],
            metadata: None,
            created_at: 0,
            last_accessed_at: 0,
            access_count: 0,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    #[test]
    fn test_cosine_similarity_identical() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!((sim - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_orthogonal() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![0.0, 1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(sim.abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_opposite() {
        let a = vec![1.0, 0.0];
        let b = vec![-1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!((sim - (-1.0)).abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_empty() {
        let sim = AutoPilotEngine::cosine_similarity(&[], &[]);
        assert!(sim.abs() < 0.001);
    }

    #[test]
    fn test_retention_score() {
        let mut mem = blank_memory();
        mem.importance = 0.5;
        mem.access_count = 10;
        let score_a = AutoPilotEngine::retention_score(&mem);

        mem.importance = 0.8;
        mem.access_count = 0;
        let score_b = AutoPilotEngine::retention_score(&mem);

        // 0.5 + 10 * 0.01 = 0.6; 0.8 + 0 = 0.8.
        assert!((score_a - 0.6).abs() < 0.001);
        assert!((score_b - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_config_defaults() {
        let config = AutoPilotConfig::default();
        assert!(config.enabled);
        assert!((config.dedup_threshold - 0.93).abs() < 0.001);
        assert_eq!(config.dedup_interval_hours, 6);
        assert_eq!(config.consolidation_interval_hours, 12);
    }

    #[test]
    fn test_engine_new_stores_config() {
        let cfg = AutoPilotConfig {
            enabled: false,
            dedup_threshold: 0.85,
            dedup_interval_hours: 3,
            consolidation_interval_hours: 24,
        };
        let engine = AutoPilotEngine::new(cfg);
        assert!(!engine.config.enabled);
        assert!((engine.config.dedup_threshold - 0.85).abs() < 0.001);
        assert_eq!(engine.config.dedup_interval_hours, 3);
        assert_eq!(engine.config.consolidation_interval_hours, 24);
    }

    #[test]
    fn test_cosine_similarity_mismatched_lengths_returns_zero() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(
            (sim - 0.0).abs() < 0.001,
            "mismatched lengths should return 0.0, got {sim}"
        );
    }

    #[test]
    fn test_cosine_similarity_zero_vector_returns_zero() {
        let a = vec![0.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(
            (sim - 0.0).abs() < 0.001,
            "zero vector should give 0.0, got {sim}"
        );
    }

    #[test]
    fn test_cosine_similarity_single_element() {
        let a = vec![2.0];
        let b = vec![3.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(
            (sim - 1.0).abs() < 0.001,
            "same-direction scalars should give 1.0, got {sim}"
        );
    }

    #[test]
    fn test_cosine_similarity_partial_overlap() {
        // 45 degrees apart: cos = 1 / sqrt(2).
        let a = vec![1.0_f32, 0.0];
        let b = vec![1.0_f32, 1.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        let expected = 1.0_f32 / 2.0_f32.sqrt();
        assert!(
            (sim - expected).abs() < 0.001,
            "expected ~{expected}, got {sim}"
        );
    }

    #[test]
    fn test_retention_score_zero_importance_zero_access() {
        let mem = blank_memory();
        let score = AutoPilotEngine::retention_score(&mem);
        assert!((score - 0.0).abs() < 0.001);
    }

    #[test]
    fn test_retention_score_access_count_dominates() {
        let mut mem = blank_memory();
        mem.importance = 0.1;
        mem.access_count = 100;
        let score = AutoPilotEngine::retention_score(&mem);
        // 0.1 + 100 * 0.01 = 1.1 beats a higher-importance, never-accessed memory.
        assert!((score - 1.1).abs() < 0.001, "expected 1.1, got {score}");

        mem.access_count = 0;
        mem.importance = 1.0;
        let score2 = AutoPilotEngine::retention_score(&mem);
        assert!((score2 - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_autopilot_config_from_env_defaults() {
        let _guard = lock_env();
        std::env::remove_var("DAKERA_AUTOPILOT_ENABLED");
        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD");
        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS");
        std::env::remove_var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS");
        let cfg = AutoPilotConfig::from_env();
        assert!(cfg.enabled);
        assert!((cfg.dedup_threshold - 0.93).abs() < 0.001);
        assert_eq!(cfg.dedup_interval_hours, 6);
        assert_eq!(cfg.consolidation_interval_hours, 12);
    }

    #[test]
    fn test_autopilot_config_from_env_disabled() {
        let _guard = lock_env();
        std::env::set_var("DAKERA_AUTOPILOT_ENABLED", "false");
        let cfg = AutoPilotConfig::from_env();
        std::env::remove_var("DAKERA_AUTOPILOT_ENABLED");
        assert!(!cfg.enabled);
    }

    #[test]
    fn test_autopilot_config_from_env_custom_threshold() {
        let _guard = lock_env();
        std::env::set_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD", "0.75");
        let cfg = AutoPilotConfig::from_env();
        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD");
        assert!((cfg.dedup_threshold - 0.75).abs() < 0.001);
    }

    #[test]
    fn test_dedup_result_default() {
        let r = DedupResult::default();
        assert_eq!(r.namespaces_processed, 0);
        assert_eq!(r.memories_scanned, 0);
        assert_eq!(r.duplicates_removed, 0);
    }

    #[test]
    fn test_consolidation_result_default() {
        let r = ConsolidationResult::default();
        assert_eq!(r.namespaces_processed, 0);
        assert_eq!(r.memories_scanned, 0);
        assert_eq!(r.clusters_merged, 0);
        assert_eq!(r.memories_consolidated, 0);
    }
}