use crate::embedding::EmbeddingProvider;
use crate::fact::{Fact, FactFilter, FactId, FactPatch, MemoryTier};
use crate::llm::LlmClient;
use crate::scope::Scope;
use crate::store::{FactStore, MemoryError};
use crate::vector::{VectorFilter, VectorStore};
use chrono::Utc;
use serde::Serialize;
use std::sync::Arc;
/// Tuning knobs for a [`ConsolidationEngine`] run.
#[derive(Debug, Clone)]
pub struct ConsolidationConfig {
    /// Operations to execute, run in this order by [`ConsolidationEngine::run`].
    pub enabled_ops: Vec<ConsolidationOp>,
    /// Multiplier applied to a knowledge fact's confidence once per
    /// `half_life_days` elapsed since its last access (or creation).
    pub decay_factor: f64,
    /// Length in days of one decay period. This is a true half-life only when
    /// `decay_factor` is 0.5.
    pub half_life_days: f64,
    /// Decayed facts whose confidence drops below this value are invalidated
    /// and removed from the vector index.
    pub archive_threshold: f32,
    /// Conversation-tier facts with at least this many accesses are promoted to
    /// the knowledge tier.
    pub promote_access_count: u64,
    /// Minimum similarity score used when searching for duplicate candidates.
    /// Only candidates scoring above 0.99 are actually invalidated.
    pub dedup_similarity: f32,
    /// Minimum number of conversation-tier facts required before summarizing.
    pub summarize_threshold: usize,
    /// Minimum number of knowledge-tier facts required before reflecting.
    pub reflect_min_facts: usize,
    /// Upper bound on LLM calls made during a single run.
    pub max_llm_calls: usize,
}
impl Default for ConsolidationConfig {
    /// Enables every operation (decay, promote, dedup, summarize, reflect, in
    /// that order) with the stock thresholds and a budget of 10 LLM calls.
    fn default() -> Self {
        let enabled_ops = vec![
            ConsolidationOp::Decay,
            ConsolidationOp::Promote,
            ConsolidationOp::Dedup,
            ConsolidationOp::Summarize,
            ConsolidationOp::Reflect,
        ];
        Self {
            enabled_ops,
            decay_factor: 0.95,
            half_life_days: 30.0,
            archive_threshold: 0.3,
            promote_access_count: 3,
            dedup_similarity: 0.95,
            summarize_threshold: 100,
            reflect_min_facts: 10,
            max_llm_calls: 10,
        }
    }
}
/// A single maintenance pass that [`ConsolidationEngine::run`] can perform.
/// Serialized in `snake_case` (e.g. `"decay"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ConsolidationOp {
    /// Lower knowledge-tier confidence by time since last access and archive
    /// facts that fall below the threshold.
    Decay,
    /// Move frequently accessed conversation-tier facts to the knowledge tier.
    Promote,
    /// Invalidate near-identical facts found through vector search.
    Dedup,
    /// Condense conversation-tier facts into knowledge-tier summaries (needs an LLM).
    Summarize,
    /// Derive higher-order insights from knowledge-tier facts (needs an LLM).
    Reflect,
}
/// Counters describing what a [`ConsolidationEngine::run`] changed.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ConsolidationResult {
    /// Facts whose confidence was changed by decay (archived facts included).
    pub facts_decayed: usize,
    /// Facts invalidated because their decayed confidence fell below the
    /// archive threshold.
    pub facts_archived: usize,
    /// Conversation-tier facts promoted to the knowledge tier.
    pub facts_promoted: usize,
    /// Facts invalidated as duplicates of another fact.
    pub facts_deduped: usize,
    /// Summary facts created. This counts summaries, not the source facts consumed.
    pub facts_summarized: usize,
    /// Insight facts created by reflection.
    pub insights_generated: usize,
    /// LLM calls spent across summarize and reflect.
    pub llm_calls_used: usize,
}
/// Runs maintenance passes (decay, promotion, dedup, summarization, reflection)
/// over facts held in a [`FactStore`] and its companion [`VectorStore`].
pub struct ConsolidationEngine {
    /// Store holding the facts being consolidated.
    fact_store: Arc<dyn FactStore>,
    /// Similarity index updated alongside `fact_store`.
    vector_store: Arc<dyn VectorStore>,
    /// Embeds fact text for similarity search and indexing.
    embedding: Arc<dyn EmbeddingProvider>,
    /// Thresholds, budgets, and the list of operations to run.
    config: ConsolidationConfig,
}
impl ConsolidationEngine {
    /// Builds an engine over the given fact store, vector index, and embedding provider.
    pub fn new(
        fact_store: Arc<dyn FactStore>,
        vector_store: Arc<dyn VectorStore>,
        embedding: Arc<dyn EmbeddingProvider>,
        config: ConsolidationConfig,
    ) -> Self {
        Self {
            fact_store,
            vector_store,
            embedding,
            config,
        }
    }

    /// Runs each operation in `config.enabled_ops`, in order, against `scope`.
    ///
    /// `Summarize` and `Reflect` need an LLM. They are skipped when `llm` is
    /// `None` or once `config.max_llm_calls` calls have been spent. Counts are
    /// accumulated, so an operation listed more than once is reported in full.
    ///
    /// # Errors
    ///
    /// Propagates the first [`MemoryError`] raised by any operation. Changes made
    /// by operations (or parts of operations) that already ran are not rolled back.
    pub async fn run(
        &self,
        scope: &Scope,
        llm: Option<&dyn LlmClient>,
    ) -> Result<ConsolidationResult, MemoryError> {
        let mut result = ConsolidationResult::default();
        for op in &self.config.enabled_ops {
            // LLM budget still available to this operation.
            let remaining_calls = self
                .config
                .max_llm_calls
                .saturating_sub(result.llm_calls_used);
            match op {
                ConsolidationOp::Decay => {
                    let (decayed, archived) = self.op_decay(scope).await?;
                    result.facts_decayed += decayed;
                    result.facts_archived += archived;
                }
                ConsolidationOp::Promote => {
                    result.facts_promoted += self.op_promote(scope).await?;
                }
                ConsolidationOp::Dedup => {
                    result.facts_deduped += self.op_dedup(scope).await?;
                }
                ConsolidationOp::Summarize => {
                    if let Some(llm) = llm {
                        if remaining_calls > 0 {
                            let (summarized, calls) =
                                self.op_summarize(scope, llm, remaining_calls).await?;
                            result.facts_summarized += summarized;
                            result.llm_calls_used += calls;
                        }
                    }
                }
                ConsolidationOp::Reflect => {
                    if let Some(llm) = llm {
                        if remaining_calls > 0 {
                            let (insights, calls) =
                                self.op_reflect(scope, llm, remaining_calls).await?;
                            result.insights_generated += insights;
                            result.llm_calls_used += calls;
                        }
                    }
                }
            }
        }
        Ok(result)
    }

    /// Decays the confidence of knowledge-tier facts in `scope` by time since
    /// last access (falling back to creation time).
    ///
    /// The multiplier is `decay_factor ^ (days_elapsed / half_life_days)`. Facts
    /// whose new confidence is below `archive_threshold` are invalidated and
    /// deleted from the vector index. The others get the new confidence written
    /// back. Changes smaller than 0.001 are skipped. A non-positive or
    /// non-finite `half_life_days` disables decay entirely.
    ///
    /// Returns `(decayed, archived)`. `decayed` includes the archived facts.
    async fn op_decay(&self, scope: &Scope) -> Result<(usize, usize), MemoryError> {
        let period = self.config.half_life_days;
        // A zero/negative/NaN/infinite period would make the exponent infinite,
        // negative, or NaN, archiving every fact or inflating confidence.
        if !period.is_finite() || period <= 0.0 {
            return Ok((0, 0));
        }
        let filter = FactFilter::new()
            .with_scope(scope.clone())
            .with_tier(MemoryTier::Knowledge);
        let facts = self.fact_store.list_facts(&filter).await?;
        let now = Utc::now();
        let mut decayed = 0;
        let mut archived = 0;
        // NOTE(review): decay is computed from the time since last access and
        // applied to the *current* (already decayed) confidence, so repeated runs
        // without an intervening access compound the decay. Fixing this needs a
        // persisted "last decayed at" marker or baseline confidence. Confirm the
        // Fact schema before changing.
        for fact in &facts {
            let last_access = fact.last_accessed.unwrap_or(fact.created_at);
            let days_elapsed = (now - last_access).num_milliseconds() as f64 / 86_400_000.0;
            if days_elapsed <= 0.0 {
                continue;
            }
            let old_confidence = fact.confidence.unwrap_or(1.0);
            let new_confidence = (f64::from(old_confidence)
                * self.config.decay_factor.powf(days_elapsed / period))
                as f32;
            // Skip negligible changes rather than issuing a write for every fact.
            if (new_confidence - old_confidence).abs() < 0.001 {
                continue;
            }
            if new_confidence < self.config.archive_threshold {
                self.fact_store.invalidate_fact(fact.id).await?;
                self.vector_store.delete(fact.id).await?;
                archived += 1;
            } else {
                let patch = FactPatch {
                    confidence: Some(new_confidence),
                    ..Default::default()
                };
                self.fact_store.update_fact(fact.id, patch).await?;
            }
            decayed += 1;
        }
        Ok((decayed, archived))
    }

    /// Promotes conversation-tier facts in `scope` whose `access_count` is at
    /// least `promote_access_count` to the knowledge tier.
    ///
    /// Returns the number of facts promoted.
    async fn op_promote(&self, scope: &Scope) -> Result<usize, MemoryError> {
        let filter = FactFilter::new()
            .with_scope(scope.clone())
            .with_tier(MemoryTier::Conversation);
        let facts = self.fact_store.list_facts(&filter).await?;
        let mut promoted = 0;
        for fact in &facts {
            if fact.access_count >= self.config.promote_access_count {
                let patch = FactPatch {
                    tier: Some(MemoryTier::Knowledge),
                    ..Default::default()
                };
                self.fact_store.update_fact(fact.id, patch).await?;
                promoted += 1;
            }
        }
        Ok(promoted)
    }

    /// Invalidates facts in `scope` that are near-identical to an earlier fact.
    ///
    /// For each fact not already invalidated, its text is embedded and the
    /// vector index is searched: top 10, minimum score `dedup_similarity`,
    /// restricted to `scope`. Matches scoring above 0.99 are invalidated,
    /// removed from the index, and marked as superseded by the surviving fact.
    /// Facts whose text cannot be embedded are skipped.
    ///
    /// Returns the number of facts invalidated.
    async fn op_dedup(&self, scope: &Scope) -> Result<usize, MemoryError> {
        let filter = FactFilter::new().with_scope(scope.clone());
        let facts = self.fact_store.list_facts(&filter).await?;
        // Same search constraints for every fact, so build them once.
        let vector_filter = VectorFilter {
            scope: Some(scope.clone()),
            min_score: Some(self.config.dedup_similarity),
        };
        let mut invalidated_ids: Vec<FactId> = Vec::new();
        for fact in &facts {
            if invalidated_ids.contains(&fact.id) {
                continue;
            }
            // Embedding failures are treated as "no duplicates found" for this fact.
            let embedding = match self.embedding.embed(&[fact.text.as_str()]).await {
                Ok(mut embs) => match embs.pop() {
                    Some(e) => e,
                    None => continue,
                },
                Err(_) => continue,
            };
            let matches = self
                .vector_store
                .search(&embedding, &vector_filter, 10)
                .await?;
            for m in &matches {
                if m.id == fact.id || invalidated_ids.contains(&m.id) {
                    continue;
                }
                // `dedup_similarity` only bounds the candidate search. A candidate
                // must also clear this fixed bar to count as a duplicate.
                // NOTE(review): a configured `dedup_similarity` below 0.99 therefore
                // does not widen what gets invalidated. Confirm this is intended.
                if m.score > 0.99 {
                    self.fact_store.invalidate_fact(m.id).await?;
                    self.vector_store.delete(m.id).await?;
                    // Best-effort: record which fact replaced the duplicate; the
                    // result is deliberately ignored.
                    let patch = FactPatch {
                        superseded_by: Some(fact.id),
                        ..Default::default()
                    };
                    let _ = self.fact_store.update_fact(m.id, patch).await;
                    invalidated_ids.push(m.id);
                }
            }
        }
        Ok(invalidated_ids.len())
    }

    /// Condenses conversation-tier facts in `scope` into knowledge-tier summaries.
    ///
    /// Does nothing unless `max_calls > 0` and at least `summarize_threshold`
    /// conversation facts exist. Otherwise it makes one LLM call and stores each
    /// returned summary as a knowledge fact with confidence 0.85. Only if at
    /// least one summary was produced does it then mark every source fact with
    /// `"summarized": true` metadata. That marking is best-effort; failures are
    /// ignored.
    ///
    /// Returns `(summaries_created, llm_calls_used)`.
    async fn op_summarize(
        &self,
        scope: &Scope,
        llm: &dyn LlmClient,
        max_calls: usize,
    ) -> Result<(usize, usize), MemoryError> {
        // This operation always costs exactly one call; bail out if none is budgeted.
        if max_calls == 0 {
            return Ok((0, 0));
        }
        let filter = FactFilter::new()
            .with_scope(scope.clone())
            .with_tier(MemoryTier::Conversation);
        let facts = self.fact_store.list_facts(&filter).await?;
        if facts.len() < self.config.summarize_threshold {
            return Ok((0, 0));
        }
        // NOTE(review): the filter does not exclude facts already marked
        // `summarized`, and every fact (not a capped sample) goes into the prompt.
        // Unless `list_facts` filters them out, the same facts are re-summarized
        // on every run. Confirm against the FactStore implementation.
        let bullet_list = facts
            .iter()
            .map(|f| f.text.as_str())
            .collect::<Vec<_>>()
            .join("\n- ");
        let system = "You are a memory summarization engine. Given a list of conversation facts, produce 1-3 concise knowledge-level summaries that capture the key information. Respond with JSON: {\"summaries\": [\"summary1\", \"summary2\"]}";
        let user_msg = format!("Summarize these conversation facts into knowledge:\n- {bullet_list}");
        let response = llm.structured_output(system, &user_msg).await?;
        let summaries: Vec<String> = response["summaries"]
            .as_array()
            .unwrap_or(&vec![])
            .iter()
            .filter_map(|v| v.as_str().map(String::from))
            .collect();
        // The call was spent even if the reply held nothing usable. Leave the
        // source facts unmarked so a later run can still summarize them.
        if summaries.is_empty() {
            return Ok((0, 1));
        }
        for summary_text in &summaries {
            self.insert_derived_fact(scope, summary_text, "consolidation:summarize", 0.85)
                .await?;
        }
        for fact in &facts {
            let mut metadata = serde_json::Map::new();
            metadata.insert("summarized".to_string(), serde_json::Value::Bool(true));
            let patch = FactPatch {
                metadata,
                ..Default::default()
            };
            // Best-effort: a failure to mark a fact is ignored.
            let _ = self.fact_store.update_fact(fact.id, patch).await;
        }
        Ok((summaries.len(), 1))
    }

    /// Derives higher-order insights from knowledge-tier facts in `scope`.
    ///
    /// Does nothing unless `max_calls > 0` and at least `reflect_min_facts`
    /// knowledge facts exist. Otherwise it sends the first 30 facts to the LLM
    /// in one call and stores each returned insight as a knowledge fact with
    /// confidence 0.8.
    ///
    /// Returns `(insights_created, llm_calls_used)`.
    async fn op_reflect(
        &self,
        scope: &Scope,
        llm: &dyn LlmClient,
        max_calls: usize,
    ) -> Result<(usize, usize), MemoryError> {
        if max_calls == 0 {
            return Ok((0, 0));
        }
        let filter = FactFilter::new()
            .with_scope(scope.clone())
            .with_tier(MemoryTier::Knowledge);
        let facts = self.fact_store.list_facts(&filter).await?;
        if facts.len() < self.config.reflect_min_facts {
            return Ok((0, 0));
        }
        // Cap the prompt at the first 30 facts returned by the store.
        let bullet_list = facts
            .iter()
            .take(30)
            .map(|f| f.text.as_str())
            .collect::<Vec<_>>()
            .join("\n- ");
        let system = "You are a memory reflection engine. Given a list of facts about a user/topic, generate 1-2 higher-order insights or patterns. Each insight should be a concise statement that captures something not explicitly stated by any single fact. Respond with JSON: {\"insights\": [\"insight1\", \"insight2\"]}";
        let user_msg = format!("Generate insights from these facts:\n- {bullet_list}");
        let response = llm.structured_output(system, &user_msg).await?;
        let insights: Vec<String> = response["insights"]
            .as_array()
            .unwrap_or(&vec![])
            .iter()
            .filter_map(|v| v.as_str().map(String::from))
            .collect();
        for insight_text in &insights {
            self.insert_derived_fact(scope, insight_text, "consolidation:reflect", 0.8)
                .await?;
        }
        Ok((insights.len(), 1))
    }

    /// Embeds `text` and stores it as a knowledge-tier fact tagged with `source`
    /// and `confidence`. The fact is then upserted into the vector index with
    /// empty metadata.
    async fn insert_derived_fact(
        &self,
        scope: &Scope,
        text: &str,
        source: &str,
        confidence: f32,
    ) -> Result<(), MemoryError> {
        let mut embeddings = self.embedding.embed(&[text]).await?;
        // NOTE(review): if the provider returns no vector, an empty embedding is
        // stored and upserted. Confirm the vector store tolerates that.
        let embedding = embeddings.pop().unwrap_or_default();
        let mut fact = Fact::new(text, scope.clone());
        fact.tier = MemoryTier::Knowledge;
        fact.source = Some(source.to_string());
        fact.confidence = Some(confidence);
        fact.embedding = embedding.clone();
        let id = self.fact_store.insert_fact(fact).await?;
        self.vector_store
            .upsert(id, embedding, serde_json::json!({}))
            .await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn config_defaults() {
        let ConsolidationConfig {
            enabled_ops,
            decay_factor,
            promote_access_count,
            max_llm_calls,
            ..
        } = ConsolidationConfig::default();
        assert_eq!(enabled_ops.len(), 5);
        assert!((decay_factor - 0.95).abs() < f64::EPSILON);
        assert_eq!(promote_access_count, 3);
        assert_eq!(max_llm_calls, 10);
    }

    #[test]
    fn result_defaults_to_zero() {
        let zeroed = ConsolidationResult::default();
        for (label, value) in [
            ("facts_decayed", zeroed.facts_decayed),
            ("facts_promoted", zeroed.facts_promoted),
            ("llm_calls_used", zeroed.llm_calls_used),
        ] {
            assert_eq!(value, 0, "{label} should default to zero");
        }
    }

    #[test]
    fn decay_math() {
        // Confidence after `days` with the default 0.95 factor per 30-day period.
        let decay = |confidence: f64, days: f64| confidence * 0.95_f64.powf(days / 30.0);
        assert!((decay(0.95, 30.0) - 0.9025).abs() < 0.001);
        assert!((decay(0.95, 60.0) - 0.8574).abs() < 0.001);
    }
}