Skip to main content

engine/
autopilot.rs

1//! Auto-Pilot Knowledge Management for Dakera AI Agent Memory Platform.
2//!
3//! Background tasks that automatically deduplicate and consolidate memories
4//! to keep the memory space clean and efficient.
5//!
6//! - **Auto-dedup** (every N hours): finds near-duplicate memories above a
7//!   similarity threshold and keeps the one with the highest retention score.
8//! - **Auto-consolidation** (every N hours): merges clusters of 3+ low-importance
9//!   memories sharing 2+ tags into single Semantic memories.
10
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13
14use common::{Memory, MemoryType};
15use storage::{RedisCache, VectorStorage};
16use tokio::sync::RwLock;
17use tracing;
18
19/// Configuration for the auto-pilot background tasks.
20#[derive(Clone)]
21pub struct AutoPilotConfig {
22    /// Master switch for all auto-pilot tasks.
23    pub enabled: bool,
24    /// Similarity threshold for deduplication (0.0–1.0).
25    pub dedup_threshold: f32,
26    /// How often to run deduplication (in hours).
27    pub dedup_interval_hours: u64,
28    /// How often to run consolidation (in hours).
29    pub consolidation_interval_hours: u64,
30}
31
32impl Default for AutoPilotConfig {
33    fn default() -> Self {
34        Self {
35            enabled: true,
36            dedup_threshold: 0.93,
37            dedup_interval_hours: 6,
38            consolidation_interval_hours: 12,
39        }
40    }
41}
42
43impl AutoPilotConfig {
44    /// Load configuration from environment variables.
45    pub fn from_env() -> Self {
46        let enabled: bool = std::env::var("DAKERA_AUTOPILOT_ENABLED")
47            .ok()
48            .and_then(|v| v.parse().ok())
49            .unwrap_or(true);
50
51        let dedup_threshold: f32 = std::env::var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD")
52            .ok()
53            .and_then(|v| v.parse().ok())
54            .unwrap_or(0.93);
55
56        let dedup_interval_hours: u64 = std::env::var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS")
57            .ok()
58            .and_then(|v| v.parse().ok())
59            .unwrap_or(6);
60
61        let consolidation_interval_hours: u64 =
62            std::env::var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS")
63                .ok()
64                .and_then(|v| v.parse().ok())
65                .unwrap_or(12);
66
67        Self {
68            enabled,
69            dedup_threshold,
70            dedup_interval_hours,
71            consolidation_interval_hours,
72        }
73    }
74}
75
76/// Result of a deduplication cycle.
77#[derive(Debug, Default)]
78pub struct DedupResult {
79    pub namespaces_processed: usize,
80    pub memories_scanned: usize,
81    pub duplicates_removed: usize,
82}
83
84/// Result of a consolidation cycle.
85#[derive(Debug, Default)]
86pub struct ConsolidationResult {
87    pub namespaces_processed: usize,
88    pub memories_scanned: usize,
89    pub clusters_merged: usize,
90    pub memories_consolidated: usize,
91}
92
93/// Auto-Pilot engine that runs deduplication and consolidation as background tasks.
94pub struct AutoPilotEngine {
95    pub config: AutoPilotConfig,
96}
97
98impl AutoPilotEngine {
99    pub fn new(config: AutoPilotConfig) -> Self {
100        Self { config }
101    }
102
103    /// Compute cosine similarity between two embedding vectors.
104    fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
105        if a.len() != b.len() || a.is_empty() {
106            return 0.0;
107        }
108        let mut dot = 0.0_f64;
109        let mut norm_a = 0.0_f64;
110        let mut norm_b = 0.0_f64;
111        for (x, y) in a.iter().zip(b.iter()) {
112            let xd = *x as f64;
113            let yd = *y as f64;
114            dot += xd * yd;
115            norm_a += xd * xd;
116            norm_b += yd * yd;
117        }
118        let denom = norm_a.sqrt() * norm_b.sqrt();
119        if denom == 0.0 {
120            0.0
121        } else {
122            (dot / denom) as f32
123        }
124    }
125
126    /// Retention score: higher = keep this memory over duplicates.
127    fn retention_score(memory: &Memory) -> f64 {
128        memory.importance as f64 + memory.access_count as f64 * 0.01
129    }
130
131    /// Run deduplication across all agent namespaces.
132    ///
133    /// For each `_dakera_agent_*` namespace, compares all memory pairs using
134    /// cosine similarity on their stored embeddings. When a pair exceeds the
135    /// threshold, the memory with the lower retention score is deleted.
136    pub async fn run_dedup(&self, storage: &Arc<dyn VectorStorage>) -> DedupResult {
137        let mut result = DedupResult::default();
138
139        let namespaces = match storage.list_namespaces().await {
140            Ok(ns) => ns,
141            Err(e) => {
142                tracing::error!(error = %e, "Auto-dedup: failed to list namespaces");
143                return result;
144            }
145        };
146
147        for namespace in namespaces {
148            if !namespace.starts_with("_dakera_agent_") {
149                continue;
150            }
151            result.namespaces_processed += 1;
152
153            let vectors = match storage.get_all(&namespace).await {
154                Ok(v) => v,
155                Err(e) => {
156                    tracing::warn!(
157                        namespace = %namespace,
158                        error = %e,
159                        "Auto-dedup: failed to get vectors"
160                    );
161                    continue;
162                }
163            };
164
165            // Parse memories paired with their stored embeddings
166            let items: Vec<(Memory, &[f32])> = vectors
167                .iter()
168                .filter_map(|v| {
169                    let mem = Memory::from_vector(v)?;
170                    if v.values.is_empty() {
171                        return None;
172                    }
173                    Some((mem, v.values.as_slice()))
174                })
175                .collect();
176
177            result.memories_scanned += items.len();
178
179            // Pairwise comparison — O(n²) but runs infrequently on bounded namespaces
180            let mut to_delete: HashSet<String> = HashSet::new();
181
182            for i in 0..items.len() {
183                if to_delete.contains(&items[i].0.id) {
184                    continue;
185                }
186                for j in (i + 1)..items.len() {
187                    if to_delete.contains(&items[j].0.id) {
188                        continue;
189                    }
190                    let sim = Self::cosine_similarity(items[i].1, items[j].1);
191                    if sim >= self.config.dedup_threshold {
192                        // Keep the one with the higher retention score
193                        if Self::retention_score(&items[i].0) >= Self::retention_score(&items[j].0)
194                        {
195                            to_delete.insert(items[j].0.id.clone());
196                        } else {
197                            to_delete.insert(items[i].0.id.clone());
198                            break; // i is deleted, skip remaining j comparisons
199                        }
200                    }
201                }
202            }
203
204            if !to_delete.is_empty() {
205                let ids: Vec<String> = to_delete.into_iter().collect();
206                result.duplicates_removed += ids.len();
207                if let Err(e) = storage.delete(&namespace, &ids).await {
208                    tracing::warn!(
209                        namespace = %namespace,
210                        count = ids.len(),
211                        error = %e,
212                        "Auto-dedup: failed to delete duplicates"
213                    );
214                }
215            }
216        }
217
218        tracing::info!(
219            namespaces = result.namespaces_processed,
220            scanned = result.memories_scanned,
221            removed = result.duplicates_removed,
222            "Auto-dedup cycle completed"
223        );
224
225        result
226    }
227
228    /// Run consolidation across all agent namespaces.
229    ///
230    /// Finds clusters of 3+ memories sharing 2+ tags where ALL members have
231    /// importance < 0.3, then merges each cluster into a single Semantic memory
232    /// with combined content, union of tags, and max importance.
233    pub async fn run_consolidation(&self, storage: &Arc<dyn VectorStorage>) -> ConsolidationResult {
234        let mut result = ConsolidationResult::default();
235
236        let namespaces = match storage.list_namespaces().await {
237            Ok(ns) => ns,
238            Err(e) => {
239                tracing::error!(error = %e, "Auto-consolidation: failed to list namespaces");
240                return result;
241            }
242        };
243
244        for namespace in namespaces {
245            if !namespace.starts_with("_dakera_agent_") {
246                continue;
247            }
248            result.namespaces_processed += 1;
249
250            let vectors = match storage.get_all(&namespace).await {
251                Ok(v) => v,
252                Err(e) => {
253                    tracing::warn!(
254                        namespace = %namespace,
255                        error = %e,
256                        "Auto-consolidation: failed to get vectors"
257                    );
258                    continue;
259                }
260            };
261
262            // Only consider low-importance memories with embeddings and tags
263            let items: Vec<(Memory, Vec<f32>)> = vectors
264                .iter()
265                .filter_map(|v| {
266                    let mem = Memory::from_vector(v)?;
267                    if mem.importance >= 0.3 || v.values.is_empty() || mem.tags.is_empty() {
268                        return None;
269                    }
270                    Some((mem, v.values.clone()))
271                })
272                .collect();
273
274            result.memories_scanned += items.len();
275
276            if items.len() < 3 {
277                continue;
278            }
279
280            // Build tag → memory-index mapping
281            let mut tag_to_indices: HashMap<&str, Vec<usize>> = HashMap::new();
282            for (i, (mem, _)) in items.iter().enumerate() {
283                for tag in &mem.tags {
284                    tag_to_indices.entry(tag.as_str()).or_default().push(i);
285                }
286            }
287
288            // Count shared tags between each pair
289            let mut pair_shared_tags: HashMap<(usize, usize), usize> = HashMap::new();
290            for indices in tag_to_indices.values() {
291                for ai in 0..indices.len() {
292                    for bi in (ai + 1)..indices.len() {
293                        let key = (indices[ai], indices[bi]);
294                        *pair_shared_tags.entry(key).or_default() += 1;
295                    }
296                }
297            }
298
299            // Build adjacency graph of memories sharing 2+ tags
300            let mut adjacency: HashMap<usize, HashSet<usize>> = HashMap::new();
301            for (&(a, b), &count) in &pair_shared_tags {
302                if count >= 2 {
303                    adjacency.entry(a).or_default().insert(b);
304                    adjacency.entry(b).or_default().insert(a);
305                }
306            }
307
308            // Find connected components (clusters of size >= 3)
309            let mut visited: HashSet<usize> = HashSet::new();
310            let mut clusters: Vec<Vec<usize>> = Vec::new();
311
312            for &node in adjacency.keys() {
313                if visited.contains(&node) {
314                    continue;
315                }
316                let mut cluster = Vec::new();
317                let mut stack = vec![node];
318                while let Some(n) = stack.pop() {
319                    if visited.insert(n) {
320                        cluster.push(n);
321                        if let Some(neighbors) = adjacency.get(&n) {
322                            for &nb in neighbors {
323                                if !visited.contains(&nb) {
324                                    stack.push(nb);
325                                }
326                            }
327                        }
328                    }
329                }
330                if cluster.len() >= 3 {
331                    clusters.push(cluster);
332                }
333            }
334
335            // Merge each qualifying cluster
336            for (ci, cluster) in clusters.iter().enumerate() {
337                let memories: Vec<&Memory> = cluster.iter().map(|&i| &items[i].0).collect();
338                let embeddings: Vec<&Vec<f32>> = cluster.iter().map(|&i| &items[i].1).collect();
339
340                let max_importance = memories
341                    .iter()
342                    .map(|m| m.importance)
343                    .fold(0.0_f32, f32::max);
344
345                // Union all tags
346                let mut all_tags: Vec<String> =
347                    memories.iter().flat_map(|m| m.tags.clone()).collect();
348                all_tags.sort();
349                all_tags.dedup();
350
351                // Combine content
352                let combined_content: String = memories
353                    .iter()
354                    .map(|m| m.content.as_str())
355                    .collect::<Vec<_>>()
356                    .join("\n---\n");
357
358                // Average embeddings for the merged vector
359                let dim = embeddings[0].len();
360                let mut avg_embedding = vec![0.0_f32; dim];
361                for emb in &embeddings {
362                    for (i, v) in emb.iter().enumerate() {
363                        avg_embedding[i] += v;
364                    }
365                }
366                let count = embeddings.len() as f32;
367                for v in &mut avg_embedding {
368                    *v /= count;
369                }
370
371                let now = std::time::SystemTime::now()
372                    .duration_since(std::time::UNIX_EPOCH)
373                    .unwrap_or_default()
374                    .as_nanos();
375
376                let agent_id = memories[0].agent_id.clone();
377                let merged_id = format!("mem_consolidated_{:x}_{}", now, ci);
378
379                let merged_memory = Memory {
380                    id: merged_id,
381                    memory_type: MemoryType::Semantic,
382                    content: combined_content,
383                    agent_id,
384                    session_id: None,
385                    importance: max_importance,
386                    tags: all_tags,
387                    metadata: None,
388                    created_at: (now / 1_000_000_000) as u64,
389                    last_accessed_at: (now / 1_000_000_000) as u64,
390                    access_count: 0,
391                    ttl_seconds: None,
392                    expires_at: None,
393                };
394
395                let merged_vector = merged_memory.to_vector(avg_embedding);
396
397                // Delete originals, then insert merged
398                let ids_to_delete: Vec<String> = memories.iter().map(|m| m.id.clone()).collect();
399
400                if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
401                    tracing::warn!(
402                        namespace = %namespace,
403                        error = %e,
404                        "Auto-consolidation: failed to delete originals"
405                    );
406                    continue;
407                }
408
409                if let Err(e) = storage.upsert(&namespace, vec![merged_vector]).await {
410                    tracing::warn!(
411                        namespace = %namespace,
412                        error = %e,
413                        "Auto-consolidation: failed to insert merged memory"
414                    );
415                    continue;
416                }
417
418                result.clusters_merged += 1;
419                result.memories_consolidated += ids_to_delete.len();
420            }
421        }
422
423        tracing::info!(
424            namespaces = result.namespaces_processed,
425            scanned = result.memories_scanned,
426            clusters = result.clusters_merged,
427            consolidated = result.memories_consolidated,
428            "Auto-consolidation cycle completed"
429        );
430
431        result
432    }
433
434    /// Spawn the auto-pilot as two background tokio tasks (dedup + consolidation).
435    ///
436    /// Takes a shared `Arc<RwLock<AutoPilotConfig>>` so that config changes made at
437    /// runtime via the PILOT-2 endpoint (`PUT /admin/autopilot/config`) take effect
438    /// without a server restart. Each loop iteration re-reads the config before
439    /// sleeping, so interval and threshold changes apply on the next cycle.
440    pub fn spawn(
441        config: Arc<RwLock<AutoPilotConfig>>,
442        storage: Arc<dyn VectorStorage>,
443        metrics: Arc<crate::decay::BackgroundMetrics>,
444        redis: Option<RedisCache>,
445        node_id: String,
446    ) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) {
447        let storage_dedup = storage.clone();
448        let metrics_dedup = metrics.clone();
449        let config_dedup = config.clone();
450        let redis_dedup = redis.clone();
451        let node_id_dedup = node_id.clone();
452        const DEDUP_LOCK_KEY: &str = "dakera:lock:dedup";
453        const CONSOLIDATION_LOCK_KEY: &str = "dakera:lock:consolidation";
454
455        let dedup_handle = tokio::spawn(async move {
456            loop {
457                let (enabled, dedup_threshold, interval_hours) = {
458                    let cfg = config_dedup.read().await;
459                    (cfg.enabled, cfg.dedup_threshold, cfg.dedup_interval_hours)
460                };
461
462                if !enabled {
463                    // Retry every 5 minutes while disabled
464                    tokio::time::sleep(std::time::Duration::from_secs(300)).await;
465                    continue;
466                }
467
468                tokio::time::sleep(std::time::Duration::from_secs(interval_hours * 3600)).await;
469
470                // Re-check enabled after sleep — may have been disabled during the interval
471                if !config_dedup.read().await.enabled {
472                    continue;
473                }
474
475                // Leader election: only one replica should run dedup at a time.
476                let lock_ttl = interval_hours * 3600 + 300;
477                let acquired = match redis_dedup {
478                    Some(ref rc) => {
479                        rc.try_acquire_lock(DEDUP_LOCK_KEY, &node_id_dedup, lock_ttl)
480                            .await
481                    }
482                    None => true,
483                };
484
485                if !acquired {
486                    tracing::debug!("Dedup skipped — another replica holds the leader lock");
487                    continue;
488                }
489
490                let engine = AutoPilotEngine::new(AutoPilotConfig {
491                    enabled: true,
492                    dedup_threshold,
493                    ..Default::default()
494                });
495                let result = engine.run_dedup(&storage_dedup).await;
496                metrics_dedup.record_dedup(
497                    result.namespaces_processed,
498                    result.memories_scanned,
499                    result.duplicates_removed,
500                );
501
502                if let Some(ref rc) = redis_dedup {
503                    rc.release_lock(DEDUP_LOCK_KEY, &node_id_dedup).await;
504                }
505            }
506        });
507
508        let consolidation_handle = tokio::spawn(async move {
509            loop {
510                let (enabled, interval_hours) = {
511                    let cfg = config.read().await;
512                    (cfg.enabled, cfg.consolidation_interval_hours)
513                };
514
515                if !enabled {
516                    tokio::time::sleep(std::time::Duration::from_secs(300)).await;
517                    continue;
518                }
519
520                tokio::time::sleep(std::time::Duration::from_secs(interval_hours * 3600)).await;
521
522                if !config.read().await.enabled {
523                    continue;
524                }
525
526                // Leader election: only one replica should run consolidation at a time.
527                let lock_ttl = interval_hours * 3600 + 300;
528                let acquired = match redis {
529                    Some(ref rc) => {
530                        rc.try_acquire_lock(CONSOLIDATION_LOCK_KEY, &node_id, lock_ttl)
531                            .await
532                    }
533                    None => true,
534                };
535
536                if !acquired {
537                    tracing::debug!(
538                        "Consolidation skipped — another replica holds the leader lock"
539                    );
540                    continue;
541                }
542
543                let engine = AutoPilotEngine::new(AutoPilotConfig::default());
544                let result = engine.run_consolidation(&storage).await;
545                metrics.record_consolidation(
546                    result.namespaces_processed,
547                    result.memories_scanned,
548                    result.clusters_merged,
549                    result.memories_consolidated,
550                );
551
552                if let Some(ref rc) = redis {
553                    rc.release_lock(CONSOLIDATION_LOCK_KEY, &node_id).await;
554                }
555            }
556        });
557
558        (dedup_handle, consolidation_handle)
559    }
560}
561
562#[cfg(test)]
563mod tests {
564    use super::*;
565    use std::sync::Mutex;
566
567    // Shared lock for all tests that read or write DAKERA_AUTOPILOT_* env vars.
568    // Function-local `static` creates an independent mutex per test function,
569    // offering no cross-test serialization — apply the same fix as DAK-1018
570    // applied to decay tests.
571    static ENV_LOCK: Mutex<()> = Mutex::new(());
572
573    #[test]
574    fn test_cosine_similarity_identical() {
575        let a = vec![1.0, 0.0, 0.0];
576        let b = vec![1.0, 0.0, 0.0];
577        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
578        assert!((sim - 1.0).abs() < 0.001);
579    }
580
581    #[test]
582    fn test_cosine_similarity_orthogonal() {
583        let a = vec![1.0, 0.0, 0.0];
584        let b = vec![0.0, 1.0, 0.0];
585        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
586        assert!(sim.abs() < 0.001);
587    }
588
589    #[test]
590    fn test_cosine_similarity_opposite() {
591        let a = vec![1.0, 0.0];
592        let b = vec![-1.0, 0.0];
593        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
594        assert!((sim - (-1.0)).abs() < 0.001);
595    }
596
597    #[test]
598    fn test_cosine_similarity_empty() {
599        let sim = AutoPilotEngine::cosine_similarity(&[], &[]);
600        assert!(sim.abs() < 0.001);
601    }
602
603    #[test]
604    fn test_retention_score() {
605        let mut mem = Memory {
606            id: "test".to_string(),
607            memory_type: MemoryType::Episodic,
608            content: "test".to_string(),
609            agent_id: "agent".to_string(),
610            session_id: None,
611            importance: 0.5,
612            tags: vec![],
613            metadata: None,
614            created_at: 0,
615            last_accessed_at: 0,
616            access_count: 10,
617            ttl_seconds: None,
618            expires_at: None,
619        };
620        let score_a = AutoPilotEngine::retention_score(&mem);
621
622        mem.importance = 0.8;
623        mem.access_count = 0;
624        let score_b = AutoPilotEngine::retention_score(&mem);
625
626        // 0.5 + 10*0.01 = 0.6  vs  0.8 + 0 = 0.8
627        assert!((score_a - 0.6).abs() < 0.001);
628        assert!((score_b - 0.8).abs() < 0.001);
629    }
630
631    #[test]
632    fn test_config_defaults() {
633        let config = AutoPilotConfig::default();
634        assert!(config.enabled);
635        assert!((config.dedup_threshold - 0.93).abs() < 0.001);
636        assert_eq!(config.dedup_interval_hours, 6);
637        assert_eq!(config.consolidation_interval_hours, 12);
638    }
639
640    // ── AutoPilotEngine::new ─────────────────────────────────────────────────
641
642    #[test]
643    fn test_engine_new_stores_config() {
644        let cfg = AutoPilotConfig {
645            enabled: false,
646            dedup_threshold: 0.85,
647            dedup_interval_hours: 3,
648            consolidation_interval_hours: 24,
649        };
650        let engine = AutoPilotEngine::new(cfg);
651        assert!(!engine.config.enabled);
652        assert!((engine.config.dedup_threshold - 0.85).abs() < 0.001);
653        assert_eq!(engine.config.dedup_interval_hours, 3);
654        assert_eq!(engine.config.consolidation_interval_hours, 24);
655    }
656
657    // ── cosine_similarity edge cases ─────────────────────────────────────────
658
659    #[test]
660    fn test_cosine_similarity_mismatched_lengths_returns_zero() {
661        let a = vec![1.0, 0.0, 0.0];
662        let b = vec![1.0, 0.0]; // different length
663        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
664        assert!(
665            (sim - 0.0).abs() < 0.001,
666            "mismatched lengths should return 0.0, got {sim}"
667        );
668    }
669
670    #[test]
671    fn test_cosine_similarity_zero_vector_returns_zero() {
672        let a = vec![0.0, 0.0, 0.0];
673        let b = vec![1.0, 0.0, 0.0];
674        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
675        assert!(
676            (sim - 0.0).abs() < 0.001,
677            "zero vector should give 0.0, got {sim}"
678        );
679    }
680
681    #[test]
682    fn test_cosine_similarity_single_element() {
683        let a = vec![2.0];
684        let b = vec![3.0];
685        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
686        assert!(
687            (sim - 1.0).abs() < 0.001,
688            "same-direction scalars should give 1.0, got {sim}"
689        );
690    }
691
692    #[test]
693    fn test_cosine_similarity_partial_overlap() {
694        // 45-degree angle → cos(45°) ≈ 0.707
695        let a = vec![1.0_f32, 0.0];
696        let b = vec![1.0_f32, 1.0];
697        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
698        let expected = 1.0_f32 / 2.0_f32.sqrt();
699        assert!(
700            (sim - expected).abs() < 0.001,
701            "expected ~{expected}, got {sim}"
702        );
703    }
704
705    // ── retention_score edge cases ────────────────────────────────────────────
706
707    #[test]
708    fn test_retention_score_zero_importance_zero_access() {
709        let mem = Memory {
710            id: "x".to_string(),
711            memory_type: MemoryType::Episodic,
712            content: "".to_string(),
713            agent_id: "a".to_string(),
714            session_id: None,
715            importance: 0.0,
716            tags: vec![],
717            metadata: None,
718            created_at: 0,
719            last_accessed_at: 0,
720            access_count: 0,
721            ttl_seconds: None,
722            expires_at: None,
723        };
724        let score = AutoPilotEngine::retention_score(&mem);
725        assert!((score - 0.0).abs() < 0.001);
726    }
727
728    #[test]
729    fn test_retention_score_access_count_dominates() {
730        let mut mem = Memory {
731            id: "x".to_string(),
732            memory_type: MemoryType::Episodic,
733            content: "".to_string(),
734            agent_id: "a".to_string(),
735            session_id: None,
736            importance: 0.1,
737            tags: vec![],
738            metadata: None,
739            created_at: 0,
740            last_accessed_at: 0,
741            access_count: 100,
742            ttl_seconds: None,
743            expires_at: None,
744        };
745        let score = AutoPilotEngine::retention_score(&mem);
746        // 0.1 + 100*0.01 = 0.1 + 1.0 = 1.1
747        assert!((score - 1.1).abs() < 0.001, "expected 1.1, got {score}");
748
749        mem.access_count = 0;
750        mem.importance = 1.0;
751        let score2 = AutoPilotEngine::retention_score(&mem);
752        assert!((score2 - 1.0).abs() < 0.001);
753    }
754
755    // ── AutoPilotConfig::from_env ─────────────────────────────────────────────
756
757    #[test]
758    fn test_autopilot_config_from_env_defaults() {
759        let _guard = ENV_LOCK.lock().unwrap();
760        std::env::remove_var("DAKERA_AUTOPILOT_ENABLED");
761        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD");
762        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS");
763        std::env::remove_var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS");
764        let cfg = AutoPilotConfig::from_env();
765        assert!(cfg.enabled);
766        assert!((cfg.dedup_threshold - 0.93).abs() < 0.001);
767        assert_eq!(cfg.dedup_interval_hours, 6);
768        assert_eq!(cfg.consolidation_interval_hours, 12);
769    }
770
771    #[test]
772    fn test_autopilot_config_from_env_disabled() {
773        let _guard = ENV_LOCK.lock().unwrap();
774
775        std::env::set_var("DAKERA_AUTOPILOT_ENABLED", "false");
776        let cfg = AutoPilotConfig::from_env();
777        std::env::remove_var("DAKERA_AUTOPILOT_ENABLED");
778        assert!(!cfg.enabled);
779    }
780
781    #[test]
782    fn test_autopilot_config_from_env_custom_threshold() {
783        let _guard = ENV_LOCK.lock().unwrap();
784
785        std::env::set_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD", "0.75");
786        let cfg = AutoPilotConfig::from_env();
787        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD");
788        assert!((cfg.dedup_threshold - 0.75).abs() < 0.001);
789    }
790
791    // ── result types ─────────────────────────────────────────────────────────
792
793    #[test]
794    fn test_dedup_result_default() {
795        let r = DedupResult::default();
796        assert_eq!(r.namespaces_processed, 0);
797        assert_eq!(r.memories_scanned, 0);
798        assert_eq!(r.duplicates_removed, 0);
799    }
800
801    #[test]
802    fn test_consolidation_result_default() {
803        let r = ConsolidationResult::default();
804        assert_eq!(r.namespaces_processed, 0);
805        assert_eq!(r.memories_scanned, 0);
806        assert_eq!(r.clusters_merged, 0);
807        assert_eq!(r.memories_consolidated, 0);
808    }
809}