use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common::{Memory, MemoryType};
use storage::{RedisCache, VectorStorage};
use tokio::sync::RwLock;
use tracing;
/// Tunables for the background "autopilot" maintenance jobs
/// (automatic deduplication and consolidation of agent memories).
#[derive(Clone)]
pub struct AutoPilotConfig {
    /// Master switch for both background jobs.
    pub enabled: bool,
    /// Cosine-similarity threshold at or above which two memories are
    /// treated as duplicates.
    pub dedup_threshold: f32,
    /// Hours between dedup cycles.
    pub dedup_interval_hours: u64,
    /// Hours between consolidation cycles.
    pub consolidation_interval_hours: u64,
}
impl Default for AutoPilotConfig {
fn default() -> Self {
Self {
enabled: true,
dedup_threshold: 0.93,
dedup_interval_hours: 6,
consolidation_interval_hours: 12,
}
}
}
impl AutoPilotConfig {
    /// Builds a config from `DAKERA_AUTOPILOT_*` environment variables,
    /// falling back to [`AutoPilotConfig::default`] for any variable that
    /// is unset or fails to parse.
    ///
    /// Recognized variables:
    /// - `DAKERA_AUTOPILOT_ENABLED` (bool)
    /// - `DAKERA_AUTOPILOT_DEDUP_THRESHOLD` (f32)
    /// - `DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS` (u64)
    /// - `DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS` (u64)
    pub fn from_env() -> Self {
        // Reads `key` and parses it, returning `fallback` when the
        // variable is missing or malformed.
        fn env_or<T: std::str::FromStr>(key: &str, fallback: T) -> T {
            std::env::var(key)
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(fallback)
        }
        // Single source of truth for fallbacks: the Default impl. The
        // original duplicated every default literal here, which could
        // silently drift out of sync.
        let defaults = Self::default();
        Self {
            enabled: env_or("DAKERA_AUTOPILOT_ENABLED", defaults.enabled),
            dedup_threshold: env_or("DAKERA_AUTOPILOT_DEDUP_THRESHOLD", defaults.dedup_threshold),
            dedup_interval_hours: env_or(
                "DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS",
                defaults.dedup_interval_hours,
            ),
            consolidation_interval_hours: env_or(
                "DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS",
                defaults.consolidation_interval_hours,
            ),
        }
    }
}
/// Summary of a single auto-dedup cycle.
#[derive(Debug, Default)]
pub struct DedupResult {
    /// Agent namespaces that were examined.
    pub namespaces_processed: usize,
    /// Total memories loaded across those namespaces.
    pub memories_scanned: usize,
    /// Memories removed as near-duplicates.
    pub duplicates_removed: usize,
}
/// Summary of a single auto-consolidation cycle.
#[derive(Debug, Default)]
pub struct ConsolidationResult {
    /// Agent namespaces that were examined.
    pub namespaces_processed: usize,
    /// Candidate memories considered across those namespaces.
    pub memories_scanned: usize,
    /// Clusters successfully merged into one memory each.
    pub clusters_merged: usize,
    /// Original memories replaced by merged ones.
    pub memories_consolidated: usize,
}
/// Executes autopilot maintenance passes (dedup / consolidation)
/// against a vector storage backend.
pub struct AutoPilotEngine {
    /// Configuration snapshot captured at construction time.
    pub config: AutoPilotConfig,
}
impl AutoPilotEngine {
/// Creates an engine that will run with the given configuration.
pub fn new(config: AutoPilotConfig) -> Self {
    Self { config }
}
/// Cosine similarity of two embeddings in [-1, 1].
///
/// Returns 0.0 for mismatched lengths, empty inputs, or when either
/// vector has zero magnitude (similarity is undefined there).
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    // Accumulate in f64 to limit rounding error on long vectors.
    let (dot, norm_a, norm_b) = a.iter().zip(b).fold(
        (0.0_f64, 0.0_f64, 0.0_f64),
        |(dot, na, nb), (&x, &y)| {
            let (x, y) = (x as f64, y as f64);
            (dot + x * y, na + x * x, nb + y * y)
        },
    );
    let denom = norm_a.sqrt() * norm_b.sqrt();
    if denom == 0.0 {
        0.0
    } else {
        (dot / denom) as f32
    }
}
/// Score used to decide which of two duplicates survives dedup:
/// base importance plus a small bonus (0.01) per recorded access.
fn retention_score(memory: &Memory) -> f64 {
    let importance = memory.importance as f64;
    let access_bonus = 0.01 * memory.access_count as f64;
    importance + access_bonus
}
/// Removes near-duplicate memories from every agent namespace.
///
/// Each `_dakera_agent_*` namespace is loaded in full and its memories
/// are compared pairwise by cosine similarity. For any pair at or above
/// `config.dedup_threshold`, the memory with the lower retention score
/// (see [`Self::retention_score`]) is deleted; ties keep the
/// earlier-indexed memory.
///
/// Storage errors are logged and the affected namespace is skipped —
/// this method never fails; it reports what it accomplished in the
/// returned [`DedupResult`].
///
/// NOTE(review): the pairwise comparison is O(n^2) per namespace —
/// presumably acceptable for per-agent memory counts; confirm for
/// large tenants.
pub async fn run_dedup(&self, storage: &Arc<dyn VectorStorage>) -> DedupResult {
    let mut result = DedupResult::default();
    let namespaces = match storage.list_namespaces().await {
        Ok(ns) => ns,
        Err(e) => {
            tracing::error!(error = %e, "Auto-dedup: failed to list namespaces");
            return result;
        }
    };
    for namespace in namespaces {
        // Only namespaces managed by Dakera agents are eligible.
        if !namespace.starts_with("_dakera_agent_") {
            continue;
        }
        result.namespaces_processed += 1;
        let vectors = match storage.get_all(&namespace).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(
                    namespace = %namespace,
                    error = %e,
                    "Auto-dedup: failed to get vectors"
                );
                continue;
            }
        };
        // Pair each decodable memory with its (non-empty) embedding slice;
        // vectors that fail to decode or have no values are skipped.
        let items: Vec<(Memory, &[f32])> = vectors
            .iter()
            .filter_map(|v| {
                let mem = Memory::from_vector(v)?;
                if v.values.is_empty() {
                    return None;
                }
                Some((mem, v.values.as_slice()))
            })
            .collect();
        result.memories_scanned += items.len();
        let mut to_delete: HashSet<String> = HashSet::new();
        for i in 0..items.len() {
            // Skip anchors already marked for deletion by an earlier pass.
            if to_delete.contains(&items[i].0.id) {
                continue;
            }
            for j in (i + 1)..items.len() {
                if to_delete.contains(&items[j].0.id) {
                    continue;
                }
                let sim = Self::cosine_similarity(items[i].1, items[j].1);
                if sim >= self.config.dedup_threshold {
                    // Keep whichever duplicate has the higher retention score.
                    if Self::retention_score(&items[i].0) >= Self::retention_score(&items[j].0)
                    {
                        to_delete.insert(items[j].0.id.clone());
                    } else {
                        to_delete.insert(items[i].0.id.clone());
                        // `i` itself is gone; stop comparing it to later items.
                        break;
                    }
                }
            }
        }
        if !to_delete.is_empty() {
            let ids: Vec<String> = to_delete.into_iter().collect();
            // NOTE(review): counted as removed even if the delete below
            // fails — the error is only logged and the cycle continues.
            result.duplicates_removed += ids.len();
            if let Err(e) = storage.delete(&namespace, &ids).await {
                tracing::warn!(
                    namespace = %namespace,
                    count = ids.len(),
                    error = %e,
                    "Auto-dedup: failed to delete duplicates"
                );
            }
        }
    }
    tracing::info!(
        namespaces = result.namespaces_processed,
        scanned = result.memories_scanned,
        removed = result.duplicates_removed,
        "Auto-dedup cycle completed"
    );
    result
}
/// Merges clusters of related low-importance memories into single
/// consolidated semantic memories, one pass over every agent namespace.
///
/// Per `_dakera_agent_*` namespace:
/// 1. Select candidates: memories with importance < 0.3, at least one
///    tag, and a non-empty embedding.
/// 2. Connect any two candidates that share at least two tags and
///    extract connected components (iterative DFS); only components of
///    three or more memories become clusters.
/// 3. For each cluster, build a merged memory carrying the maximum
///    importance, the deduplicated union of tags, the concatenated
///    content, and the element-wise mean embedding, then swap it in for
///    the originals.
///
/// Storage errors are logged and the affected namespace or cluster is
/// skipped; the method always returns a [`ConsolidationResult`].
pub async fn run_consolidation(&self, storage: &Arc<dyn VectorStorage>) -> ConsolidationResult {
    let mut result = ConsolidationResult::default();
    let namespaces = match storage.list_namespaces().await {
        Ok(ns) => ns,
        Err(e) => {
            tracing::error!(error = %e, "Auto-consolidation: failed to list namespaces");
            return result;
        }
    };
    for namespace in namespaces {
        // Only namespaces managed by Dakera agents are eligible.
        if !namespace.starts_with("_dakera_agent_") {
            continue;
        }
        result.namespaces_processed += 1;
        let vectors = match storage.get_all(&namespace).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(
                    namespace = %namespace,
                    error = %e,
                    "Auto-consolidation: failed to get vectors"
                );
                continue;
            }
        };
        // Candidate set: low-importance, tagged memories with embeddings.
        let items: Vec<(Memory, Vec<f32>)> = vectors
            .iter()
            .filter_map(|v| {
                let mem = Memory::from_vector(v)?;
                if mem.importance >= 0.3 || v.values.is_empty() || mem.tags.is_empty() {
                    return None;
                }
                Some((mem, v.values.clone()))
            })
            .collect();
        result.memories_scanned += items.len();
        // Clusters need at least 3 members, so fewer candidates can never merge.
        if items.len() < 3 {
            continue;
        }
        // Invert: tag -> ascending candidate indices carrying that tag.
        let mut tag_to_indices: HashMap<&str, Vec<usize>> = HashMap::new();
        for (i, (mem, _)) in items.iter().enumerate() {
            for tag in &mem.tags {
                tag_to_indices.entry(tag.as_str()).or_default().push(i);
            }
        }
        // Count shared tags per index pair. Indices within each tag bucket
        // are ascending, so (a, b) is always ordered and each pair is
        // counted exactly once per shared tag.
        let mut pair_shared_tags: HashMap<(usize, usize), usize> = HashMap::new();
        for indices in tag_to_indices.values() {
            for ai in 0..indices.len() {
                for bi in (ai + 1)..indices.len() {
                    let key = (indices[ai], indices[bi]);
                    *pair_shared_tags.entry(key).or_default() += 1;
                }
            }
        }
        // Graph edge iff two memories share at least two tags.
        let mut adjacency: HashMap<usize, HashSet<usize>> = HashMap::new();
        for (&(a, b), &count) in &pair_shared_tags {
            if count >= 2 {
                adjacency.entry(a).or_default().insert(b);
                adjacency.entry(b).or_default().insert(a);
            }
        }
        // Connected components via iterative DFS; keep components of >= 3.
        let mut visited: HashSet<usize> = HashSet::new();
        let mut clusters: Vec<Vec<usize>> = Vec::new();
        for &node in adjacency.keys() {
            if visited.contains(&node) {
                continue;
            }
            let mut cluster = Vec::new();
            let mut stack = vec![node];
            while let Some(n) = stack.pop() {
                if visited.insert(n) {
                    cluster.push(n);
                    if let Some(neighbors) = adjacency.get(&n) {
                        for &nb in neighbors {
                            if !visited.contains(&nb) {
                                stack.push(nb);
                            }
                        }
                    }
                }
            }
            if cluster.len() >= 3 {
                clusters.push(cluster);
            }
        }
        for (ci, cluster) in clusters.iter().enumerate() {
            let memories: Vec<&Memory> = cluster.iter().map(|&i| &items[i].0).collect();
            let embeddings: Vec<&Vec<f32>> = cluster.iter().map(|&i| &items[i].1).collect();
            // The merged memory keeps the strongest importance in the cluster.
            let max_importance = memories
                .iter()
                .map(|m| m.importance)
                .fold(0.0_f32, f32::max);
            // Sorted, deduplicated union of all member tags.
            let mut all_tags: Vec<String> =
                memories.iter().flat_map(|m| m.tags.clone()).collect();
            all_tags.sort();
            all_tags.dedup();
            let combined_content: String = memories
                .iter()
                .map(|m| m.content.as_str())
                .collect::<Vec<_>>()
                .join("\n---\n");
            // Element-wise mean of the member embeddings (cluster.len() >= 3,
            // so embeddings[0] always exists).
            let dim = embeddings[0].len();
            let mut avg_embedding = vec![0.0_f32; dim];
            for emb in &embeddings {
                for (i, v) in emb.iter().enumerate() {
                    avg_embedding[i] += v;
                }
            }
            let count = embeddings.len() as f32;
            for v in &mut avg_embedding {
                *v /= count;
            }
            // Nanosecond timestamp keeps the generated id unique; the stored
            // timestamps below are truncated to whole seconds.
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos();
            let agent_id = memories[0].agent_id.clone();
            let merged_id = format!("mem_consolidated_{:x}_{}", now, ci);
            let merged_memory = Memory {
                id: merged_id,
                memory_type: MemoryType::Semantic,
                content: combined_content,
                agent_id,
                session_id: None,
                importance: max_importance,
                tags: all_tags,
                metadata: None,
                created_at: (now / 1_000_000_000) as u64,
                last_accessed_at: (now / 1_000_000_000) as u64,
                access_count: 0,
                ttl_seconds: None,
                expires_at: None,
            };
            let merged_vector = merged_memory.to_vector(avg_embedding);
            let ids_to_delete: Vec<String> = memories.iter().map(|m| m.id.clone()).collect();
            // Insert the merged memory BEFORE deleting the originals: if the
            // second step fails we are left with redundant data (cleaned up
            // by a later cycle) instead of losing the whole cluster, which
            // the previous delete-first ordering risked whenever the upsert
            // failed after a successful delete.
            if let Err(e) = storage.upsert(&namespace, vec![merged_vector]).await {
                tracing::warn!(
                    namespace = %namespace,
                    error = %e,
                    "Auto-consolidation: failed to insert merged memory"
                );
                continue;
            }
            if let Err(e) = storage.delete(&namespace, &ids_to_delete).await {
                // Merged memory stays behind; originals are retried next cycle.
                tracing::warn!(
                    namespace = %namespace,
                    error = %e,
                    "Auto-consolidation: failed to delete originals"
                );
                continue;
            }
            result.clusters_merged += 1;
            result.memories_consolidated += ids_to_delete.len();
        }
    }
    tracing::info!(
        namespaces = result.namespaces_processed,
        scanned = result.memories_scanned,
        clusters = result.clusters_merged,
        consolidated = result.memories_consolidated,
        "Auto-consolidation cycle completed"
    );
    result
}
/// Spawns the dedup and consolidation background loops and returns their
/// join handles as `(dedup_handle, consolidation_handle)`.
///
/// Behavior shared by both loops:
/// - the shared config is re-read every iteration, so runtime updates
///   take effect without restarting the tasks;
/// - while disabled, the loop polls `enabled` every 5 minutes instead of
///   sleeping a full interval;
/// - after a full-interval sleep, `enabled` is re-checked before running
///   in case autopilot was switched off mid-sleep;
/// - with a Redis cache configured, a distributed leader lock ensures
///   only one replica does the work; without Redis the node always runs.
pub fn spawn(
    config: Arc<RwLock<AutoPilotConfig>>,
    storage: Arc<dyn VectorStorage>,
    metrics: Arc<crate::decay::BackgroundMetrics>,
    redis: Option<RedisCache>,
    node_id: String,
) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) {
    // Clones for the dedup task; the originals move into the
    // consolidation task below.
    let storage_dedup = storage.clone();
    let metrics_dedup = metrics.clone();
    let config_dedup = config.clone();
    let redis_dedup = redis.clone();
    let node_id_dedup = node_id.clone();
    const DEDUP_LOCK_KEY: &str = "dakera:lock:dedup";
    const CONSOLIDATION_LOCK_KEY: &str = "dakera:lock:consolidation";
    let dedup_handle = tokio::spawn(async move {
        loop {
            // Snapshot the config under a short-lived read lock so it is
            // never held across a sleep or storage call.
            let (enabled, dedup_threshold, interval_hours) = {
                let cfg = config_dedup.read().await;
                (cfg.enabled, cfg.dedup_threshold, cfg.dedup_interval_hours)
            };
            if !enabled {
                tokio::time::sleep(std::time::Duration::from_secs(300)).await;
                continue;
            }
            tokio::time::sleep(std::time::Duration::from_secs(interval_hours * 3600)).await;
            // Autopilot may have been disabled during the long sleep.
            if !config_dedup.read().await.enabled {
                continue;
            }
            // Lock TTL covers a full interval plus grace so a crashed
            // leader's lock eventually expires on its own.
            let lock_ttl = interval_hours * 3600 + 300;
            let acquired = match redis_dedup {
                Some(ref rc) => {
                    rc.try_acquire_lock(DEDUP_LOCK_KEY, &node_id_dedup, lock_ttl)
                        .await
                }
                // No Redis => single-node deployment; always proceed.
                None => true,
            };
            if !acquired {
                tracing::debug!("Dedup skipped — another replica holds the leader lock");
                continue;
            }
            // Fresh engine per run carrying the live threshold; the
            // interval fields are unused by run_dedup, so defaults suffice.
            let engine = AutoPilotEngine::new(AutoPilotConfig {
                enabled: true,
                dedup_threshold,
                ..Default::default()
            });
            let result = engine.run_dedup(&storage_dedup).await;
            metrics_dedup.record_dedup(
                result.namespaces_processed,
                result.memories_scanned,
                result.duplicates_removed,
            );
            if let Some(ref rc) = redis_dedup {
                rc.release_lock(DEDUP_LOCK_KEY, &node_id_dedup).await;
            }
        }
    });
    let consolidation_handle = tokio::spawn(async move {
        loop {
            let (enabled, interval_hours) = {
                let cfg = config.read().await;
                (cfg.enabled, cfg.consolidation_interval_hours)
            };
            if !enabled {
                tokio::time::sleep(std::time::Duration::from_secs(300)).await;
                continue;
            }
            tokio::time::sleep(std::time::Duration::from_secs(interval_hours * 3600)).await;
            if !config.read().await.enabled {
                continue;
            }
            let lock_ttl = interval_hours * 3600 + 300;
            let acquired = match redis {
                Some(ref rc) => {
                    rc.try_acquire_lock(CONSOLIDATION_LOCK_KEY, &node_id, lock_ttl)
                        .await
                }
                None => true,
            };
            if !acquired {
                tracing::debug!(
                    "Consolidation skipped — another replica holds the leader lock"
                );
                continue;
            }
            // run_consolidation reads no config fields, so defaults are fine.
            let engine = AutoPilotEngine::new(AutoPilotConfig::default());
            let result = engine.run_consolidation(&storage).await;
            metrics.record_consolidation(
                result.namespaces_processed,
                result.memories_scanned,
                result.clusters_merged,
                result.memories_consolidated,
            );
            if let Some(ref rc) = redis {
                rc.release_lock(CONSOLIDATION_LOCK_KEY, &node_id).await;
            }
        }
    });
    (dedup_handle, consolidation_handle)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests that mutate process-wide environment variables.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires the env lock, recovering from poisoning so that a single
    /// panicking env test cannot cascade spurious failures into the other
    /// env tests (the previous `lock().unwrap()` panicked on poison).
    fn env_guard() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_cosine_similarity_identical() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!((sim - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_orthogonal() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![0.0, 1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(sim.abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_opposite() {
        let a = vec![1.0, 0.0];
        let b = vec![-1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!((sim - (-1.0)).abs() < 0.001);
    }

    #[test]
    fn test_cosine_similarity_empty() {
        let sim = AutoPilotEngine::cosine_similarity(&[], &[]);
        assert!(sim.abs() < 0.001);
    }

    #[test]
    fn test_retention_score() {
        let mut mem = Memory {
            id: "test".to_string(),
            memory_type: MemoryType::Episodic,
            content: "test".to_string(),
            agent_id: "agent".to_string(),
            session_id: None,
            importance: 0.5,
            tags: vec![],
            metadata: None,
            created_at: 0,
            last_accessed_at: 0,
            access_count: 10,
            ttl_seconds: None,
            expires_at: None,
        };
        let score_a = AutoPilotEngine::retention_score(&mem);
        mem.importance = 0.8;
        mem.access_count = 0;
        let score_b = AutoPilotEngine::retention_score(&mem);
        // 0.5 + 10 * 0.01 = 0.6; 0.8 + 0 = 0.8.
        assert!((score_a - 0.6).abs() < 0.001);
        assert!((score_b - 0.8).abs() < 0.001);
    }

    #[test]
    fn test_config_defaults() {
        let config = AutoPilotConfig::default();
        assert!(config.enabled);
        assert!((config.dedup_threshold - 0.93).abs() < 0.001);
        assert_eq!(config.dedup_interval_hours, 6);
        assert_eq!(config.consolidation_interval_hours, 12);
    }

    #[test]
    fn test_engine_new_stores_config() {
        let cfg = AutoPilotConfig {
            enabled: false,
            dedup_threshold: 0.85,
            dedup_interval_hours: 3,
            consolidation_interval_hours: 24,
        };
        let engine = AutoPilotEngine::new(cfg);
        assert!(!engine.config.enabled);
        assert!((engine.config.dedup_threshold - 0.85).abs() < 0.001);
        assert_eq!(engine.config.dedup_interval_hours, 3);
        assert_eq!(engine.config.consolidation_interval_hours, 24);
    }

    #[test]
    fn test_cosine_similarity_mismatched_lengths_returns_zero() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![1.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(
            (sim - 0.0).abs() < 0.001,
            "mismatched lengths should return 0.0, got {sim}"
        );
    }

    #[test]
    fn test_cosine_similarity_zero_vector_returns_zero() {
        let a = vec![0.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(
            (sim - 0.0).abs() < 0.001,
            "zero vector should give 0.0, got {sim}"
        );
    }

    #[test]
    fn test_cosine_similarity_single_element() {
        let a = vec![2.0];
        let b = vec![3.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        assert!(
            (sim - 1.0).abs() < 0.001,
            "same-direction scalars should give 1.0, got {sim}"
        );
    }

    #[test]
    fn test_cosine_similarity_partial_overlap() {
        let a = vec![1.0_f32, 0.0];
        let b = vec![1.0_f32, 1.0];
        let sim = AutoPilotEngine::cosine_similarity(&a, &b);
        // cos(45°) = 1/sqrt(2).
        let expected = 1.0_f32 / 2.0_f32.sqrt();
        assert!(
            (sim - expected).abs() < 0.001,
            "expected ~{expected}, got {sim}"
        );
    }

    #[test]
    fn test_retention_score_zero_importance_zero_access() {
        let mem = Memory {
            id: "x".to_string(),
            memory_type: MemoryType::Episodic,
            content: "".to_string(),
            agent_id: "a".to_string(),
            session_id: None,
            importance: 0.0,
            tags: vec![],
            metadata: None,
            created_at: 0,
            last_accessed_at: 0,
            access_count: 0,
            ttl_seconds: None,
            expires_at: None,
        };
        let score = AutoPilotEngine::retention_score(&mem);
        assert!((score - 0.0).abs() < 0.001);
    }

    #[test]
    fn test_retention_score_access_count_dominates() {
        let mut mem = Memory {
            id: "x".to_string(),
            memory_type: MemoryType::Episodic,
            content: "".to_string(),
            agent_id: "a".to_string(),
            session_id: None,
            importance: 0.1,
            tags: vec![],
            metadata: None,
            created_at: 0,
            last_accessed_at: 0,
            access_count: 100,
            ttl_seconds: None,
            expires_at: None,
        };
        // 0.1 + 100 * 0.01 = 1.1 — access bonus can outweigh importance.
        let score = AutoPilotEngine::retention_score(&mem);
        assert!((score - 1.1).abs() < 0.001, "expected 1.1, got {score}");
        mem.access_count = 0;
        mem.importance = 1.0;
        let score2 = AutoPilotEngine::retention_score(&mem);
        assert!((score2 - 1.0).abs() < 0.001);
    }

    #[test]
    fn test_autopilot_config_from_env_defaults() {
        let _guard = env_guard();
        std::env::remove_var("DAKERA_AUTOPILOT_ENABLED");
        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD");
        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_INTERVAL_HOURS");
        std::env::remove_var("DAKERA_AUTOPILOT_CONSOLIDATION_INTERVAL_HOURS");
        let cfg = AutoPilotConfig::from_env();
        assert!(cfg.enabled);
        assert!((cfg.dedup_threshold - 0.93).abs() < 0.001);
        assert_eq!(cfg.dedup_interval_hours, 6);
        assert_eq!(cfg.consolidation_interval_hours, 12);
    }

    #[test]
    fn test_autopilot_config_from_env_disabled() {
        let _guard = env_guard();
        std::env::set_var("DAKERA_AUTOPILOT_ENABLED", "false");
        let cfg = AutoPilotConfig::from_env();
        std::env::remove_var("DAKERA_AUTOPILOT_ENABLED");
        assert!(!cfg.enabled);
    }

    #[test]
    fn test_autopilot_config_from_env_custom_threshold() {
        let _guard = env_guard();
        std::env::set_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD", "0.75");
        let cfg = AutoPilotConfig::from_env();
        std::env::remove_var("DAKERA_AUTOPILOT_DEDUP_THRESHOLD");
        assert!((cfg.dedup_threshold - 0.75).abs() < 0.001);
    }

    #[test]
    fn test_dedup_result_default() {
        let r = DedupResult::default();
        assert_eq!(r.namespaces_processed, 0);
        assert_eq!(r.memories_scanned, 0);
        assert_eq!(r.duplicates_removed, 0);
    }

    #[test]
    fn test_consolidation_result_default() {
        let r = ConsolidationResult::default();
        assert_eq!(r.namespaces_processed, 0);
        assert_eq!(r.memories_scanned, 0);
        assert_eq!(r.clusters_merged, 0);
        assert_eq!(r.memories_consolidated, 0);
    }
}