1use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13
14use common::{Memory, MemoryType};
15use storage::VectorStorage;
16use tracing;
17
/// Tunables for the background auto-pilot maintenance tasks
/// (memory deduplication and consolidation).
#[derive(Debug, Clone, PartialEq)]
pub struct AutoPilotConfig {
    /// Whether [`AutoPilotEngine::spawn`] starts the background tasks at all.
    pub enabled: bool,
    /// Minimum cosine similarity at which two memory embeddings are treated
    /// as duplicates of each other.
    pub dedup_threshold: f32,
    /// Hours to wait between deduplication passes.
    pub dedup_interval_hours: u64,
    /// Hours to wait between consolidation passes.
    pub consolidation_interval_hours: u64,
}
29
30impl Default for AutoPilotConfig {
31 fn default() -> Self {
32 Self {
33 enabled: true,
34 dedup_threshold: 0.93,
35 dedup_interval_hours: 6,
36 consolidation_interval_hours: 12,
37 }
38 }
39}
40
41impl AutoPilotConfig {
42 pub fn from_env() -> Self {
44 let enabled: bool = std::env::var("DAKERA_AUTOPILOT_ENABLED")
45 .ok()
46 .and_then(|v| v.parse().ok())
47 .unwrap_or(true);
48
49 let dedup_threshold: f32 = std::env::var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD")
50 .ok()
51 .and_then(|v| v.parse().ok())
52 .unwrap_or(0.93);
53
54 let dedup_interval_hours: u64 = std::env::var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS")
55 .ok()
56 .and_then(|v| v.parse().ok())
57 .unwrap_or(6);
58
59 let consolidation_interval_hours: u64 =
60 std::env::var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS")
61 .ok()
62 .and_then(|v| v.parse().ok())
63 .unwrap_or(12);
64
65 Self {
66 enabled,
67 dedup_threshold,
68 dedup_interval_hours,
69 consolidation_interval_hours,
70 }
71 }
72}
73
/// Counters produced by one [`AutoPilotEngine::run_dedup`] pass.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DedupResult {
    /// Agent namespaces visited during the pass.
    pub namespaces_processed: usize,
    /// Memories with a non-empty embedding that were compared.
    pub memories_scanned: usize,
    /// Duplicate memories removed from storage.
    pub duplicates_removed: usize,
}
81
/// Counters produced by one [`AutoPilotEngine::run_consolidation`] pass.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ConsolidationResult {
    /// Agent namespaces visited during the pass.
    pub namespaces_processed: usize,
    /// Low-importance, tagged memories considered as consolidation candidates.
    pub memories_scanned: usize,
    /// Clusters that were successfully replaced by a single merged memory.
    pub clusters_merged: usize,
    /// Original memories replaced by merged memories.
    pub memories_consolidated: usize,
}
90
91pub struct AutoPilotEngine {
93 pub config: AutoPilotConfig,
94}
95
96impl AutoPilotEngine {
97 pub fn new(config: AutoPilotConfig) -> Self {
98 Self { config }
99 }
100
101 fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
103 if a.len() != b.len() || a.is_empty() {
104 return 0.0;
105 }
106 let mut dot = 0.0_f64;
107 let mut norm_a = 0.0_f64;
108 let mut norm_b = 0.0_f64;
109 for (x, y) in a.iter().zip(b.iter()) {
110 let xd = *x as f64;
111 let yd = *y as f64;
112 dot += xd * yd;
113 norm_a += xd * xd;
114 norm_b += yd * yd;
115 }
116 let denom = norm_a.sqrt() * norm_b.sqrt();
117 if denom == 0.0 {
118 0.0
119 } else {
120 (dot / denom) as f32
121 }
122 }
123
124 fn retention_score(memory: &Memory) -> f64 {
126 memory.importance as f64 + memory.access_count as f64 * 0.01
127 }
128
129 pub async fn run_dedup(&self, storage: &Arc<dyn VectorStorage>) -> DedupResult {
135 let mut result = DedupResult::default();
136
137 let namespaces = match storage.list_namespaces().await {
138 Ok(ns) => ns,
139 Err(e) => {
140 tracing::error!(error = %e, "Auto-dedup: failed to list namespaces");
141 return result;
142 }
143 };
144
145 for namespace in namespaces {
146 if !namespace.starts_with("_dakera_agent_") {
147 continue;
148 }
149 result.namespaces_processed += 1;
150
151 let vectors = match storage.get_all(&namespace).await {
152 Ok(v) => v,
153 Err(e) => {
154 tracing::warn!(
155 namespace = %namespace,
156 error = %e,
157 "Auto-dedup: failed to get vectors"
158 );
159 continue;
160 }
161 };
162
163 let items: Vec<(Memory, &[f32])> = vectors
165 .iter()
166 .filter_map(|v| {
167 let mem = Memory::from_vector(v)?;
168 if v.values.is_empty() {
169 return None;
170 }
171 Some((mem, v.values.as_slice()))
172 })
173 .collect();
174
175 result.memories_scanned += items.len();
176
177 let mut to_delete: HashSet<String> = HashSet::new();
179
180 for i in 0..items.len() {
181 if to_delete.contains(&items[i].0.id) {
182 continue;
183 }
184 for j in (i + 1)..items.len() {
185 if to_delete.contains(&items[j].0.id) {
186 continue;
187 }
188 let sim = Self::cosine_similarity(items[i].1, items[j].1);
189 if sim >= self.config.dedup_threshold {
190 if Self::retention_score(&items[i].0) >= Self::retention_score(&items[j].0)
192 {
193 to_delete.insert(items[j].0.id.clone());
194 } else {
195 to_delete.insert(items[i].0.id.clone());
196 break; }
198 }
199 }
200 }
201
202 if !to_delete.is_empty() {
203 let ids: Vec<String> = to_delete.into_iter().collect();
204 result.duplicates_removed += ids.len();
205 if let Err(e) = storage.delete(&namespace, &ids).await {
206 tracing::warn!(
207 namespace = %namespace,
208 count = ids.len(),
209 error = %e,
210 "Auto-dedup: failed to delete duplicates"
211 );
212 }
213 }
214 }
215
216 tracing::info!(
217 namespaces = result.namespaces_processed,
218 scanned = result.memories_scanned,
219 removed = result.duplicates_removed,
220 "Auto-dedup cycle completed"
221 );
222
223 result
224 }
225
226 pub async fn run_consolidation(&self, storage: &Arc<dyn VectorStorage>) -> ConsolidationResult {
232 let mut result = ConsolidationResult::default();
233
234 let namespaces = match storage.list_namespaces().await {
235 Ok(ns) => ns,
236 Err(e) => {
237 tracing::error!(error = %e, "Auto-consolidation: failed to list namespaces");
238 return result;
239 }
240 };
241
242 for namespace in namespaces {
243 if !namespace.starts_with("_dakera_agent_") {
244 continue;
245 }
246 result.namespaces_processed += 1;
247
248 let vectors = match storage.get_all(&namespace).await {
249 Ok(v) => v,
250 Err(e) => {
251 tracing::warn!(
252 namespace = %namespace,
253 error = %e,
254 "Auto-consolidation: failed to get vectors"
255 );
256 continue;
257 }
258 };
259
260 let items: Vec<(Memory, Vec<f32>)> = vectors
262 .iter()
263 .filter_map(|v| {
264 let mem = Memory::from_vector(v)?;
265 if mem.importance >= 0.3 || v.values.is_empty() || mem.tags.is_empty() {
266 return None;
267 }
268 Some((mem, v.values.clone()))
269 })
270 .collect();
271
272 result.memories_scanned += items.len();
273
274 if items.len() < 3 {
275 continue;
276 }
277
278 let mut tag_to_indices: HashMap<&str, Vec<usize>> = HashMap::new();
280 for (i, (mem, _)) in items.iter().enumerate() {
281 for tag in &mem.tags {
282 tag_to_indices.entry(tag.as_str()).or_default().push(i);
283 }
284 }
285
286 let mut pair_shared_tags: HashMap<(usize, usize), usize> = HashMap::new();
288 for indices in tag_to_indices.values() {
289 for ai in 0..indices.len() {
290 for bi in (ai + 1)..indices.len() {
291 let key = (indices[ai], indices[bi]);
292 *pair_shared_tags.entry(key).or_default() += 1;
293 }
294 }
295 }
296
297 let mut adjacency: HashMap<usize, HashSet<usize>> = HashMap::new();
299 for (&(a, b), &count) in &pair_shared_tags {
300 if count >= 2 {
301 adjacency.entry(a).or_default().insert(b);
302 adjacency.entry(b).or_default().insert(a);
303 }
304 }
305
306 let mut visited: HashSet<usize> = HashSet::new();
308 let mut clusters: Vec<Vec<usize>> = Vec::new();
309
310 for &node in adjacency.keys() {
311 if visited.contains(&node) {
312 continue;
313 }
314 let mut cluster = Vec::new();
315 let mut stack = vec![node];
316 while let Some(n) = stack.pop() {
317 if visited.insert(n) {
318 cluster.push(n);
319 if let Some(neighbors) = adjacency.get(&n) {
320 for &nb in neighbors {
321 if !visited.contains(&nb) {
322 stack.push(nb);
323 }
324 }
325 }
326 }
327 }
328 if cluster.len() >= 3 {
329 clusters.push(cluster);
330 }
331 }
332
333 for (ci, cluster) in clusters.iter().enumerate() {
335 let memories: Vec<&Memory> = cluster.iter().map(|&i| &items[i].0).collect();
336 let embeddings: Vec<&Vec<f32>> = cluster.iter().map(|&i| &items[i].1).collect();
337
338 let max_importance = memories
339 .iter()
340 .map(|m| m.importance)
341 .fold(0.0_f32, f32::max);
342
343 let mut all_tags: Vec<String> =
345 memories.iter().flat_map(|m| m.tags.clone()).collect();
346 all_tags.sort();
347 all_tags.dedup();
348
349 let combined_content: String = memories
351 .iter()
352 .map(|m| m.content.as_str())
353 .collect::<Vec<_>>()
354 .join("\n---\n");
355
356 let dim = embeddings[0].len();
358 let mut avg_embedding = vec![0.0_f32; dim];
359 for emb in &embeddings {
360 for (i, v) in emb.iter().enumerate() {
361 avg_embedding[i] += v;
362 }
363 }
364 let count = embeddings.len() as f32;
365 for v in &mut avg_embedding {
366 *v /= count;
367 }
368
369 let now = std::time::SystemTime::now()
370 .duration_since(std::time::UNIX_EPOCH)
371 .unwrap_or_default()
372 .as_nanos();
373
374 let agent_id = memories[0].agent_id.clone();
375 let merged_id = format!("mem_consolidated_{:x}_{}", now, ci);
376
377 let merged_memory = Memory {
378 id: merged_id,
379 memory_type: MemoryType::Semantic,
380 content: combined_content,
381 agent_id,
382 session_id: None,
383 importance: max_importance,
384 tags: all_tags,
385 metadata: None,
386 created_at: (now / 1_000_000_000) as u64,
387 last_accessed_at: (now / 1_000_000_000) as u64,
388 access_count: 0,
389 ttl_seconds: None,
390 expires_at: None,
391 };
392
393 let merged_vector = merged_memory.to_vector(avg_embedding);
394
395 let ids_to_delete: Vec<String> = memories.iter().map(|m| m.id.clone()).collect();
397
398 if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
399 tracing::warn!(
400 namespace = %namespace,
401 error = %e,
402 "Auto-consolidation: failed to delete originals"
403 );
404 continue;
405 }
406
407 if let Err(e) = storage.upsert(&namespace, vec![merged_vector]).await {
408 tracing::warn!(
409 namespace = %namespace,
410 error = %e,
411 "Auto-consolidation: failed to insert merged memory"
412 );
413 continue;
414 }
415
416 result.clusters_merged += 1;
417 result.memories_consolidated += ids_to_delete.len();
418 }
419 }
420
421 tracing::info!(
422 namespaces = result.namespaces_processed,
423 scanned = result.memories_scanned,
424 clusters = result.clusters_merged,
425 consolidated = result.memories_consolidated,
426 "Auto-consolidation cycle completed"
427 );
428
429 result
430 }
431
432 pub fn spawn(
436 config: AutoPilotConfig,
437 storage: Arc<dyn VectorStorage>,
438 metrics: Arc<crate::decay::BackgroundMetrics>,
439 ) -> Option<(tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>)> {
440 if !config.enabled {
441 tracing::info!("Auto-pilot disabled (DAKERA_AUTOPILOT_ENABLED=false)");
442 return None;
443 }
444
445 let dedup_interval = std::time::Duration::from_secs(config.dedup_interval_hours * 3600);
446 let consolidation_interval =
447 std::time::Duration::from_secs(config.consolidation_interval_hours * 3600);
448 let dedup_threshold = config.dedup_threshold;
449
450 tracing::info!(
451 dedup_interval_hours = config.dedup_interval_hours,
452 consolidation_interval_hours = config.consolidation_interval_hours,
453 dedup_threshold = dedup_threshold,
454 "Auto-pilot started"
455 );
456
457 let storage_dedup = storage.clone();
458 let metrics_dedup = metrics.clone();
459 let dedup_handle = tokio::spawn(async move {
460 let engine = AutoPilotEngine::new(AutoPilotConfig {
461 enabled: true,
462 dedup_threshold,
463 ..Default::default()
464 });
465 loop {
466 tokio::time::sleep(dedup_interval).await;
467 let result = engine.run_dedup(&storage_dedup).await;
468 metrics_dedup.record_dedup(
469 result.namespaces_processed,
470 result.memories_scanned,
471 result.duplicates_removed,
472 );
473 }
474 });
475
476 let consolidation_handle = tokio::spawn(async move {
477 let engine = AutoPilotEngine::new(AutoPilotConfig::default());
478 loop {
479 tokio::time::sleep(consolidation_interval).await;
480 let result = engine.run_consolidation(&storage).await;
481 metrics.record_consolidation(
482 result.namespaces_processed,
483 result.memories_scanned,
484 result.clusters_merged,
485 result.memories_consolidated,
486 );
487 }
488 });
489
490 Some((dedup_handle, consolidation_handle))
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    const EPS: f32 = 0.001;

    #[test]
    fn test_cosine_similarity_identical() {
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 0.0, 0.0], &[1.0, 0.0, 0.0]);
        assert!((sim - 1.0).abs() < EPS);
    }

    #[test]
    fn test_cosine_similarity_orthogonal() {
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 0.0, 0.0], &[0.0, 1.0, 0.0]);
        assert!(sim.abs() < EPS);
    }

    #[test]
    fn test_cosine_similarity_opposite() {
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 0.0], &[-1.0, 0.0]);
        assert!((sim - (-1.0)).abs() < EPS);
    }

    #[test]
    fn test_cosine_similarity_scale_invariant() {
        // Cosine similarity ignores magnitude: parallel vectors score 1.0.
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 2.0, 3.0], &[2.0, 4.0, 6.0]);
        assert!((sim - 1.0).abs() < EPS);
    }

    #[test]
    fn test_cosine_similarity_empty() {
        let sim = AutoPilotEngine::cosine_similarity(&[], &[]);
        assert!(sim.abs() < EPS);
    }

    #[test]
    fn test_cosine_similarity_length_mismatch() {
        // Embeddings of different dimensions are never considered similar.
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 0.0], &[1.0]);
        assert!(sim.abs() < EPS);
    }

    #[test]
    fn test_cosine_similarity_zero_vector() {
        // A zero-norm vector must not produce NaN.
        let sim = AutoPilotEngine::cosine_similarity(&[0.0, 0.0], &[1.0, 1.0]);
        assert!(sim.abs() < EPS);
    }

    #[test]
    fn test_retention_score() {
        let mut mem = Memory {
            id: "test".to_string(),
            memory_type: MemoryType::Episodic,
            content: "test".to_string(),
            agent_id: "agent".to_string(),
            session_id: None,
            importance: 0.5,
            tags: vec![],
            metadata: None,
            created_at: 0,
            last_accessed_at: 0,
            access_count: 10,
            ttl_seconds: None,
            expires_at: None,
        };
        // importance 0.5 + 10 accesses * 0.01
        let score_a = AutoPilotEngine::retention_score(&mem);

        mem.importance = 0.8;
        mem.access_count = 0;
        let score_b = AutoPilotEngine::retention_score(&mem);

        assert!((score_a - 0.6).abs() < 0.001);
        assert!((score_b - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_config_defaults() {
        let config = AutoPilotConfig::default();
        assert!(config.enabled);
        assert!((config.dedup_threshold - 0.93).abs() < EPS);
        assert_eq!(config.dedup_interval_hours, 6);
        assert_eq!(config.consolidation_interval_hours, 12);
    }

    #[test]
    fn test_result_defaults_are_zero() {
        let dedup = DedupResult::default();
        assert_eq!(dedup.namespaces_processed, 0);
        assert_eq!(dedup.memories_scanned, 0);
        assert_eq!(dedup.duplicates_removed, 0);

        let consolidation = ConsolidationResult::default();
        assert_eq!(consolidation.namespaces_processed, 0);
        assert_eq!(consolidation.memories_scanned, 0);
        assert_eq!(consolidation.clusters_merged, 0);
        assert_eq!(consolidation.memories_consolidated, 0);
    }
}