Skip to main content

engine/
autopilot.rs

1//! Auto-Pilot Knowledge Management for Dakera AI Agent Memory Platform.
2//!
3//! Background tasks that automatically deduplicate and consolidate memories
4//! to keep the memory space clean and efficient.
5//!
6//! - **Auto-dedup** (every N hours): finds near-duplicate memories above a
7//!   similarity threshold and keeps the one with the highest retention score.
8//! - **Auto-consolidation** (every N hours): merges clusters of 3+ low-importance
9//!   memories sharing 2+ tags into single Semantic memories.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13
14use common::{Memory, MemoryType};
15use storage::VectorStorage;
16use tracing;
17
/// Configuration for the auto-pilot background tasks.
///
/// Construct it with [`Default`] or from `DAKERA_AUTOPILOT_*` environment
/// variables via [`AutoPilotConfig::from_env`].
#[derive(Debug, Clone, PartialEq)]
pub struct AutoPilotConfig {
    /// Master switch for all auto-pilot tasks.
    pub enabled: bool,
    /// Cosine-similarity threshold (expected range 0.0–1.0). Two memories whose
    /// similarity is at or above this value are treated as duplicates.
    pub dedup_threshold: f32,
    /// How often to run deduplication, in hours.
    pub dedup_interval_hours: u64,
    /// How often to run consolidation, in hours.
    pub consolidation_interval_hours: u64,
}
29
30impl Default for AutoPilotConfig {
31    fn default() -> Self {
32        Self {
33            enabled: true,
34            dedup_threshold: 0.93,
35            dedup_interval_hours: 6,
36            consolidation_interval_hours: 12,
37        }
38    }
39}
40
41impl AutoPilotConfig {
42    /// Load configuration from environment variables.
43    pub fn from_env() -> Self {
44        let enabled: bool = std::env::var("DAKERA_AUTOPILOT_ENABLED")
45            .ok()
46            .and_then(|v| v.parse().ok())
47            .unwrap_or(true);
48
49        let dedup_threshold: f32 = std::env::var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD")
50            .ok()
51            .and_then(|v| v.parse().ok())
52            .unwrap_or(0.93);
53
54        let dedup_interval_hours: u64 = std::env::var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS")
55            .ok()
56            .and_then(|v| v.parse().ok())
57            .unwrap_or(6);
58
59        let consolidation_interval_hours: u64 =
60            std::env::var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS")
61                .ok()
62                .and_then(|v| v.parse().ok())
63                .unwrap_or(12);
64
65        Self {
66            enabled,
67            dedup_threshold,
68            dedup_interval_hours,
69            consolidation_interval_hours,
70        }
71    }
72}
73
/// Counters reported by one deduplication cycle.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DedupResult {
    /// Agent namespaces visited, including ones whose vectors failed to load.
    pub namespaces_processed: usize,
    /// Memories with a stored embedding that took part in the comparison.
    pub memories_scanned: usize,
    /// Duplicate memories removed by the cycle.
    pub duplicates_removed: usize,
}
81
/// Counters reported by one consolidation cycle.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ConsolidationResult {
    /// Agent namespaces visited, including ones whose vectors failed to load.
    pub namespaces_processed: usize,
    /// Candidate memories considered for clustering: those below the
    /// consolidation importance cut-off that have both an embedding and tags.
    pub memories_scanned: usize,
    /// Clusters successfully replaced by a single merged memory.
    pub clusters_merged: usize,
    /// Original memories folded into merged memories.
    pub memories_consolidated: usize,
}
90
91/// Auto-Pilot engine that runs deduplication and consolidation as background tasks.
92pub struct AutoPilotEngine {
93    pub config: AutoPilotConfig,
94}
95
96impl AutoPilotEngine {
97    pub fn new(config: AutoPilotConfig) -> Self {
98        Self { config }
99    }
100
101    /// Compute cosine similarity between two embedding vectors.
102    fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
103        if a.len() != b.len() || a.is_empty() {
104            return 0.0;
105        }
106        let mut dot = 0.0_f64;
107        let mut norm_a = 0.0_f64;
108        let mut norm_b = 0.0_f64;
109        for (x, y) in a.iter().zip(b.iter()) {
110            let xd = *x as f64;
111            let yd = *y as f64;
112            dot += xd * yd;
113            norm_a += xd * xd;
114            norm_b += yd * yd;
115        }
116        let denom = norm_a.sqrt() * norm_b.sqrt();
117        if denom == 0.0 {
118            0.0
119        } else {
120            (dot / denom) as f32
121        }
122    }
123
124    /// Retention score: higher = keep this memory over duplicates.
125    fn retention_score(memory: &Memory) -> f64 {
126        memory.importance as f64 + memory.access_count as f64 * 0.01
127    }
128
129    /// Run deduplication across all agent namespaces.
130    ///
131    /// For each `_dakera_agent_*` namespace, compares all memory pairs using
132    /// cosine similarity on their stored embeddings. When a pair exceeds the
133    /// threshold, the memory with the lower retention score is deleted.
134    pub async fn run_dedup(&self, storage: &Arc<dyn VectorStorage>) -> DedupResult {
135        let mut result = DedupResult::default();
136
137        let namespaces = match storage.list_namespaces().await {
138            Ok(ns) => ns,
139            Err(e) => {
140                tracing::error!(error = %e, "Auto-dedup: failed to list namespaces");
141                return result;
142            }
143        };
144
145        for namespace in namespaces {
146            if !namespace.starts_with("_dakera_agent_") {
147                continue;
148            }
149            result.namespaces_processed += 1;
150
151            let vectors = match storage.get_all(&namespace).await {
152                Ok(v) => v,
153                Err(e) => {
154                    tracing::warn!(
155                        namespace = %namespace,
156                        error = %e,
157                        "Auto-dedup: failed to get vectors"
158                    );
159                    continue;
160                }
161            };
162
163            // Parse memories paired with their stored embeddings
164            let items: Vec<(Memory, &[f32])> = vectors
165                .iter()
166                .filter_map(|v| {
167                    let mem = Memory::from_vector(v)?;
168                    if v.values.is_empty() {
169                        return None;
170                    }
171                    Some((mem, v.values.as_slice()))
172                })
173                .collect();
174
175            result.memories_scanned += items.len();
176
177            // Pairwise comparison — O(n²) but runs infrequently on bounded namespaces
178            let mut to_delete: HashSet<String> = HashSet::new();
179
180            for i in 0..items.len() {
181                if to_delete.contains(&items[i].0.id) {
182                    continue;
183                }
184                for j in (i + 1)..items.len() {
185                    if to_delete.contains(&items[j].0.id) {
186                        continue;
187                    }
188                    let sim = Self::cosine_similarity(items[i].1, items[j].1);
189                    if sim >= self.config.dedup_threshold {
190                        // Keep the one with the higher retention score
191                        if Self::retention_score(&items[i].0) >= Self::retention_score(&items[j].0)
192                        {
193                            to_delete.insert(items[j].0.id.clone());
194                        } else {
195                            to_delete.insert(items[i].0.id.clone());
196                            break; // i is deleted, skip remaining j comparisons
197                        }
198                    }
199                }
200            }
201
202            if !to_delete.is_empty() {
203                let ids: Vec<String> = to_delete.into_iter().collect();
204                result.duplicates_removed += ids.len();
205                if let Err(e) = storage.delete(&namespace, &ids).await {
206                    tracing::warn!(
207                        namespace = %namespace,
208                        count = ids.len(),
209                        error = %e,
210                        "Auto-dedup: failed to delete duplicates"
211                    );
212                }
213            }
214        }
215
216        tracing::info!(
217            namespaces = result.namespaces_processed,
218            scanned = result.memories_scanned,
219            removed = result.duplicates_removed,
220            "Auto-dedup cycle completed"
221        );
222
223        result
224    }
225
226    /// Run consolidation across all agent namespaces.
227    ///
228    /// Finds clusters of 3+ memories sharing 2+ tags where ALL members have
229    /// importance < 0.3, then merges each cluster into a single Semantic memory
230    /// with combined content, union of tags, and max importance.
231    pub async fn run_consolidation(&self, storage: &Arc<dyn VectorStorage>) -> ConsolidationResult {
232        let mut result = ConsolidationResult::default();
233
234        let namespaces = match storage.list_namespaces().await {
235            Ok(ns) => ns,
236            Err(e) => {
237                tracing::error!(error = %e, "Auto-consolidation: failed to list namespaces");
238                return result;
239            }
240        };
241
242        for namespace in namespaces {
243            if !namespace.starts_with("_dakera_agent_") {
244                continue;
245            }
246            result.namespaces_processed += 1;
247
248            let vectors = match storage.get_all(&namespace).await {
249                Ok(v) => v,
250                Err(e) => {
251                    tracing::warn!(
252                        namespace = %namespace,
253                        error = %e,
254                        "Auto-consolidation: failed to get vectors"
255                    );
256                    continue;
257                }
258            };
259
260            // Only consider low-importance memories with embeddings and tags
261            let items: Vec<(Memory, Vec<f32>)> = vectors
262                .iter()
263                .filter_map(|v| {
264                    let mem = Memory::from_vector(v)?;
265                    if mem.importance >= 0.3 || v.values.is_empty() || mem.tags.is_empty() {
266                        return None;
267                    }
268                    Some((mem, v.values.clone()))
269                })
270                .collect();
271
272            result.memories_scanned += items.len();
273
274            if items.len() < 3 {
275                continue;
276            }
277
278            // Build tag → memory-index mapping
279            let mut tag_to_indices: HashMap<&str, Vec<usize>> = HashMap::new();
280            for (i, (mem, _)) in items.iter().enumerate() {
281                for tag in &mem.tags {
282                    tag_to_indices.entry(tag.as_str()).or_default().push(i);
283                }
284            }
285
286            // Count shared tags between each pair
287            let mut pair_shared_tags: HashMap<(usize, usize), usize> = HashMap::new();
288            for indices in tag_to_indices.values() {
289                for ai in 0..indices.len() {
290                    for bi in (ai + 1)..indices.len() {
291                        let key = (indices[ai], indices[bi]);
292                        *pair_shared_tags.entry(key).or_default() += 1;
293                    }
294                }
295            }
296
297            // Build adjacency graph of memories sharing 2+ tags
298            let mut adjacency: HashMap<usize, HashSet<usize>> = HashMap::new();
299            for (&(a, b), &count) in &pair_shared_tags {
300                if count >= 2 {
301                    adjacency.entry(a).or_default().insert(b);
302                    adjacency.entry(b).or_default().insert(a);
303                }
304            }
305
306            // Find connected components (clusters of size >= 3)
307            let mut visited: HashSet<usize> = HashSet::new();
308            let mut clusters: Vec<Vec<usize>> = Vec::new();
309
310            for &node in adjacency.keys() {
311                if visited.contains(&node) {
312                    continue;
313                }
314                let mut cluster = Vec::new();
315                let mut stack = vec![node];
316                while let Some(n) = stack.pop() {
317                    if visited.insert(n) {
318                        cluster.push(n);
319                        if let Some(neighbors) = adjacency.get(&n) {
320                            for &nb in neighbors {
321                                if !visited.contains(&nb) {
322                                    stack.push(nb);
323                                }
324                            }
325                        }
326                    }
327                }
328                if cluster.len() >= 3 {
329                    clusters.push(cluster);
330                }
331            }
332
333            // Merge each qualifying cluster
334            for (ci, cluster) in clusters.iter().enumerate() {
335                let memories: Vec<&Memory> = cluster.iter().map(|&i| &items[i].0).collect();
336                let embeddings: Vec<&Vec<f32>> = cluster.iter().map(|&i| &items[i].1).collect();
337
338                let max_importance = memories
339                    .iter()
340                    .map(|m| m.importance)
341                    .fold(0.0_f32, f32::max);
342
343                // Union all tags
344                let mut all_tags: Vec<String> =
345                    memories.iter().flat_map(|m| m.tags.clone()).collect();
346                all_tags.sort();
347                all_tags.dedup();
348
349                // Combine content
350                let combined_content: String = memories
351                    .iter()
352                    .map(|m| m.content.as_str())
353                    .collect::<Vec<_>>()
354                    .join("\n---\n");
355
356                // Average embeddings for the merged vector
357                let dim = embeddings[0].len();
358                let mut avg_embedding = vec![0.0_f32; dim];
359                for emb in &embeddings {
360                    for (i, v) in emb.iter().enumerate() {
361                        avg_embedding[i] += v;
362                    }
363                }
364                let count = embeddings.len() as f32;
365                for v in &mut avg_embedding {
366                    *v /= count;
367                }
368
369                let now = std::time::SystemTime::now()
370                    .duration_since(std::time::UNIX_EPOCH)
371                    .unwrap_or_default()
372                    .as_nanos();
373
374                let agent_id = memories[0].agent_id.clone();
375                let merged_id = format!("mem_consolidated_{:x}_{}", now, ci);
376
377                let merged_memory = Memory {
378                    id: merged_id,
379                    memory_type: MemoryType::Semantic,
380                    content: combined_content,
381                    agent_id,
382                    session_id: None,
383                    importance: max_importance,
384                    tags: all_tags,
385                    metadata: None,
386                    created_at: (now / 1_000_000_000) as u64,
387                    last_accessed_at: (now / 1_000_000_000) as u64,
388                    access_count: 0,
389                    ttl_seconds: None,
390                    expires_at: None,
391                };
392
393                let merged_vector = merged_memory.to_vector(avg_embedding);
394
395                // Delete originals, then insert merged
396                let ids_to_delete: Vec<String> = memories.iter().map(|m| m.id.clone()).collect();
397
398                if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
399                    tracing::warn!(
400                        namespace = %namespace,
401                        error = %e,
402                        "Auto-consolidation: failed to delete originals"
403                    );
404                    continue;
405                }
406
407                if let Err(e) = storage.upsert(&namespace, vec![merged_vector]).await {
408                    tracing::warn!(
409                        namespace = %namespace,
410                        error = %e,
411                        "Auto-consolidation: failed to insert merged memory"
412                    );
413                    continue;
414                }
415
416                result.clusters_merged += 1;
417                result.memories_consolidated += ids_to_delete.len();
418            }
419        }
420
421        tracing::info!(
422            namespaces = result.namespaces_processed,
423            scanned = result.memories_scanned,
424            clusters = result.clusters_merged,
425            consolidated = result.memories_consolidated,
426            "Auto-consolidation cycle completed"
427        );
428
429        result
430    }
431
432    /// Spawn the auto-pilot as two background tokio tasks (dedup + consolidation).
433    ///
434    /// Returns `None` if auto-pilot is disabled via configuration.
435    pub fn spawn(
436        config: AutoPilotConfig,
437        storage: Arc<dyn VectorStorage>,
438        metrics: Arc<crate::decay::BackgroundMetrics>,
439    ) -> Option<(tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>)> {
440        if !config.enabled {
441            tracing::info!("Auto-pilot disabled (DAKERA_AUTOPILOT_ENABLED=false)");
442            return None;
443        }
444
445        let dedup_interval = std::time::Duration::from_secs(config.dedup_interval_hours * 3600);
446        let consolidation_interval =
447            std::time::Duration::from_secs(config.consolidation_interval_hours * 3600);
448        let dedup_threshold = config.dedup_threshold;
449
450        tracing::info!(
451            dedup_interval_hours = config.dedup_interval_hours,
452            consolidation_interval_hours = config.consolidation_interval_hours,
453            dedup_threshold = dedup_threshold,
454            "Auto-pilot started"
455        );
456
457        let storage_dedup = storage.clone();
458        let metrics_dedup = metrics.clone();
459        let dedup_handle = tokio::spawn(async move {
460            let engine = AutoPilotEngine::new(AutoPilotConfig {
461                enabled: true,
462                dedup_threshold,
463                ..Default::default()
464            });
465            loop {
466                tokio::time::sleep(dedup_interval).await;
467                let result = engine.run_dedup(&storage_dedup).await;
468                metrics_dedup.record_dedup(
469                    result.namespaces_processed,
470                    result.memories_scanned,
471                    result.duplicates_removed,
472                );
473            }
474        });
475
476        let consolidation_handle = tokio::spawn(async move {
477            let engine = AutoPilotEngine::new(AutoPilotConfig::default());
478            loop {
479                tokio::time::sleep(consolidation_interval).await;
480                let result = engine.run_consolidation(&storage).await;
481                metrics.record_consolidation(
482                    result.namespaces_processed,
483                    result.memories_scanned,
484                    result.clusters_merged,
485                    result.memories_consolidated,
486                );
487            }
488        });
489
490        Some((dedup_handle, consolidation_handle))
491    }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cosine_similarity_identical() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!((sim - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_orthogonal() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![0.0, 1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(sim.abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_opposite() {
        let a = vec![1.0, 0.0];
        let b = vec![-1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!((sim - (-1.0)).abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_empty() {
        let sim = AutoPilotEngine::cosine_similarity(&[], &[]);
        assert!(sim.abs() < 0.001);
    }

    /// Vectors of different lengths are never considered similar.
    #[test]
    fn test_cosine_similarity_length_mismatch() {
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 0.0], &[1.0, 0.0, 0.0]);
        assert!(sim.abs() < 0.001);
    }

    /// A zero vector has no direction; similarity falls back to 0.0 instead of NaN.
    #[test]
    fn test_cosine_similarity_zero_vector() {
        let sim = AutoPilotEngine::cosine_similarity(&[0.0, 0.0], &[1.0, 0.0]);
        assert!(!sim.is_nan());
        assert!(sim.abs() < 0.001);
    }

    /// Cosine similarity ignores magnitude.
    #[test]
    fn test_cosine_similarity_scale_invariant() {
        let sim = AutoPilotEngine::cosine_similarity(&[1.0, 2.0, 3.0], &[2.0, 4.0, 6.0]);
        assert!((sim - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_retention_score() {
        let mut mem = Memory {
            id: "test".to_string(),
            memory_type: MemoryType::Episodic,
            content: "test".to_string(),
            agent_id: "agent".to_string(),
            session_id: None,
            importance: 0.5,
            tags: vec![],
            metadata: None,
            created_at: 0,
            last_accessed_at: 0,
            access_count: 10,
            ttl_seconds: None,
            expires_at: None,
        };
        let score_a = AutoPilotEngine::retention_score(&mem);

        mem.importance = 0.8;
        mem.access_count = 0;
        let score_b = AutoPilotEngine::retention_score(&mem);

        // 0.5 + 10*0.01 = 0.6  vs  0.8 + 0 = 0.8
        assert!((score_a - 0.6).abs() < 0.001);
        assert!((score_b - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_config_defaults() {
        let config = AutoPilotConfig::default();
        assert!(config.enabled);
        assert!((config.dedup_threshold - 0.93).abs() < 0.001);
        assert_eq!(config.dedup_interval_hours, 6);
        assert_eq!(config.consolidation_interval_hours, 12);
    }

    /// Cycle results start with every counter at zero.
    #[test]
    fn test_result_defaults_are_zero() {
        let dedup = DedupResult::default();
        assert_eq!(dedup.namespaces_processed, 0);
        assert_eq!(dedup.memories_scanned, 0);
        assert_eq!(dedup.duplicates_removed, 0);

        let consolidation = ConsolidationResult::default();
        assert_eq!(consolidation.namespaces_processed, 0);
        assert_eq!(consolidation.memories_scanned, 0);
        assert_eq!(consolidation.clusters_merged, 0);
        assert_eq!(consolidation.memories_consolidated, 0);
    }
}