use super::config::DreamConfig;
use super::helpers::{
build_closet_index, is_low_quality_content, merge_into, rebuild_index_from_drawers,
};
use crate::memory_core::decay::DecayConfig;
use crate::memory_core::palace::{Drawer, RoomType};
use crate::memory_core::retrieval::{PalaceHandle, shared_embedder};
use crate::memory_core::semantic_consolidation::{SemanticConsolidator, inference_available};
use crate::memory_core::store::vector::VectorStore;
use anyhow::Result;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Forgets unprotected drawers whose content `is_low_quality_content` rejects.
///
/// The pass runs in two phases over a snapshot of the palace's drawers: it
/// first collects the ids of every unprotected drawer flagged by
/// `is_low_quality_content(&content, min_words)`, then forgets them one by one.
/// Both phases stop early once `started.elapsed()` reaches `budget`.
///
/// # Returns
///
/// The number of drawers that were actually forgotten. Candidates that were
/// skipped because the budget ran out, or whose `forget` call failed (logged
/// at `warn`), are not counted.
///
/// # Errors
///
/// Individual `forget` failures are logged rather than propagated, so this
/// currently always returns `Ok`.
pub(super) async fn content_prune_pass(
    handle: &Arc<PalaceHandle>,
    started: std::time::Instant,
    budget: Duration,
    min_words: usize,
) -> Result<usize> {
    // Work on a clone so the read lock is released before any `.await`.
    let snapshot: Vec<Drawer> = handle.drawers.read().clone();

    let mut victims: Vec<Uuid> = Vec::new();
    for drawer in &snapshot {
        if started.elapsed() >= budget {
            break;
        }
        if drawer.drawer_type.is_protected() {
            continue;
        }
        if is_low_quality_content(&drawer.content, min_words) {
            victims.push(drawer.id);
        }
    }

    // Count only confirmed deletions: the candidate list over-reports when the
    // budget expires mid-loop or a `forget` call fails.
    let mut pruned: usize = 0;
    for id in victims {
        if started.elapsed() >= budget {
            break;
        }
        match handle.forget(id).await {
            Ok(()) => pruned += 1,
            Err(e) => tracing::warn!(?id, "dream content prune: forget failed: {e:#}"),
        }
    }
    Ok(pruned)
}
/// Removes vector-store entries that no longer correspond to a drawer.
///
/// Every id reported by the vector store that is absent from the current set
/// of drawer ids is removed; removal failures are logged and skipped. The
/// sweep stops once `started.elapsed()` reaches `budget`.
///
/// Afterwards, if there is at least one drawer and the index size still
/// exceeds `2 * drawer_count + 1`, the index is rebuilt through
/// `rebuild_index_from_drawers` and the size reduction is added to the total.
///
/// # Returns
///
/// The number of vectors removed plus any size reduction from the rebuild.
///
/// # Errors
///
/// Propagates a rebuild failure, with the context `"dream compact rebuild"`.
pub(super) async fn compact_pass(
    handle: &Arc<PalaceHandle>,
    started: std::time::Instant,
    budget: Duration,
) -> Result<usize> {
    // Collect the live drawer ids first; the guard is dropped at the end of
    // this block, before the vector store is consulted.
    let known: HashSet<Uuid> = {
        let guard = handle.drawers.read();
        guard.iter().map(|d| d.id).collect()
    };
    let indexed = handle.vector_store.all_ids();

    let mut removed = 0usize;
    for vid in indexed {
        if started.elapsed() >= budget {
            break;
        }
        if !known.contains(&vid) {
            match handle.vector_store.remove(vid).await {
                Err(e) => tracing::warn!(?vid, "dream compact: vector remove failed: {e:#}"),
                Ok(()) => removed += 1,
            }
        }
    }

    let drawer_count = known.len();
    let index_size_after = handle.vector_store.index_size();
    let oversized = drawer_count > 0 && index_size_after > drawer_count.saturating_mul(2) + 1;
    if !oversized {
        return Ok(removed);
    }

    // The index is still more than twice the drawer count: rebuild it.
    let rebuilt = rebuild_index_from_drawers(handle, started, budget)
        .await
        .map_err(|e| e.context("dream compact rebuild"))?;
    Ok(removed.saturating_add(index_size_after.saturating_sub(rebuilt)))
}
pub(super) async fn dedup_pass(
handle: &Arc<PalaceHandle>,
started: std::time::Instant,
budget: Duration,
dedup_threshold: f32,
) -> Result<usize> {
let snapshot: Vec<Drawer> = handle.drawers.read().clone();
if snapshot.len() < 2 {
return Ok(0);
}
let embedder = shared_embedder()
.await
.map_err(|e| e.context("acquire shared embedder for dream dedup"))?;
let contents: Vec<String> = snapshot.iter().map(|d| d.content.clone()).collect();
let vectors = embedder
.embed_batch(&contents)
.await
.map_err(|e| e.context("batch embed drawers for dream dedup"))?;
if vectors.len() != snapshot.len() {
anyhow::bail!(
"embedder returned {} vectors for {} drawers",
vectors.len(),
snapshot.len()
);
}
let mut merges: usize = 0;
let mut already_removed: HashSet<Uuid> = HashSet::new();
for (drawer, query_vec) in snapshot.iter().zip(vectors.iter()) {
if started.elapsed() >= budget {
break;
}
if already_removed.contains(&drawer.id) {
continue;
}
if drawer.drawer_type.is_protected() {
continue;
}
let hits = handle.vector_store.search(query_vec, 3).await?;
for hit in hits.into_iter() {
if hit.drawer_id == drawer.id || already_removed.contains(&hit.drawer_id) {
continue;
}
if hit.score < dedup_threshold {
continue;
}
let Some(hit_drawer) = snapshot.iter().find(|d| d.id == hit.drawer_id) else {
continue;
};
if hit_drawer.drawer_type.is_protected() {
continue;
}
let (survivor, loser) = if drawer.importance >= hit_drawer.importance {
(drawer.clone(), hit_drawer.clone())
} else {
(hit_drawer.clone(), drawer.clone())
};
merge_into(handle, &survivor, &loser);
let _ = handle.forget(loser.id).await;
already_removed.insert(loser.id);
merges += 1;
break;
}
}
Ok(merges)
}
pub(super) async fn prune_pass(
handle: &Arc<PalaceHandle>,
started: std::time::Instant,
budget: Duration,
prune_importance: f32,
) -> Result<usize> {
const MIN_AGE_DAYS: f32 = 30.0;
let snapshot: Vec<Drawer> = handle.drawers.read().clone();
let mut victims: Vec<Uuid> = Vec::new();
for drawer in snapshot.iter() {
if started.elapsed() >= budget {
break;
}
if drawer.drawer_type.is_protected() {
continue;
}
let age = DecayConfig::age_days(drawer.created_at);
let boost = drawer.accumulated_boost(&handle.decay_config);
let eff = handle
.decay_config
.effective_importance(drawer.importance, age, boost);
if eff <= prune_importance && age > MIN_AGE_DAYS {
victims.push(drawer.id);
}
}
let count = victims.len();
for id in victims {
let _ = handle.forget(id).await;
}
Ok(count)
}
/// Rebuilds the closet index from the current drawers and installs it.
///
/// The index is built from a cloned snapshot of the drawers via
/// `build_closet_index`, then swapped in under the closets write lock.
///
/// Returns the number of entries in the new index.
pub(super) fn refresh_closets(handle: &Arc<PalaceHandle>) -> usize {
    let rebuilt = {
        let drawers: Vec<Drawer> = handle.drawers.read().clone();
        build_closet_index(&drawers)
    };
    let entries = rebuilt.len();
    *handle.closets.write() = rebuilt;
    entries
}
/// Outcome counters returned by `consolidate_scoped`.
///
/// The `Default` value (all zeros) is what `consolidate_scoped` returns when
/// it has nothing to do.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RoomConsolidationStats {
/// Number of canonical drawers that were successfully stored.
pub summary_facts_created: usize,
/// Number of superseded drawers that were successfully forgotten.
pub facts_evicted: usize,
}
/// Builds a `SemanticConsolidator` from `config`, if one can be used.
///
/// Returns `None` when semantic consolidation is disabled, or when
/// `inference_available` reports that no backend is usable for the resolved
/// API key and `local_model_enabled` flag.
///
/// The API key is taken from `config.openrouter_api_key`, falling back to the
/// `OPENROUTER_API_KEY` environment variable (empty if unset). The Ollama
/// backend at `http://localhost:11434` is chosen only when local models are
/// enabled and no API key is available; otherwise OpenRouter is used.
fn build_consolidator_from_config(config: &DreamConfig) -> Option<Arc<SemanticConsolidator>> {
    use crate::memory_core::semantic_consolidation::{
        Inference, OllamaInference, OpenRouterInference,
    };

    if !config.semantic.enabled {
        return None;
    }

    let api_key = if config.openrouter_api_key.is_empty() {
        std::env::var("OPENROUTER_API_KEY").unwrap_or_default()
    } else {
        config.openrouter_api_key.clone()
    };
    if !inference_available(&api_key, config.local_model_enabled) {
        return None;
    }

    let prefer_local = config.local_model_enabled && api_key.is_empty();
    let backend: Arc<dyn Inference> = if prefer_local {
        Arc::new(OllamaInference::new(
            "http://localhost:11434",
            &config.semantic.model,
        ))
    } else {
        Arc::new(OpenRouterInference::new(api_key, &config.semantic.model))
    };

    let consolidator = SemanticConsolidator::new(backend, config.semantic.clone());
    Some(Arc::new(consolidator))
}
/// Writes a consolidation result back into the palace.
///
/// * Each canonical drawer is stored with `handle.remember` in
///   `RoomType::General`. For every original id it stands in for, a
///   `superseded_by` triple (`drawer:<orig>` -> `drawer:<canonical>`) is
///   asserted in the knowledge graph.
/// * Each alias pair is asserted as an `alias_of` triple.
/// * Each flagged drawer is reported at `info` level; nothing is modified.
///
/// All failures are logged at `warn` and skipped; none are propagated.
///
/// # Returns
///
/// `(canonical_count, superseded_ids)`: the number of canonical drawers that
/// were stored, and the original ids they supersede. Ids are collected only
/// for canonical drawers that were stored, and may contain duplicates.
async fn apply_consolidation_result(
    handle: &Arc<PalaceHandle>,
    result: &crate::memory_core::semantic_consolidation::ConsolidationResult,
) -> (usize, Vec<Uuid>) {
    /// Upper bound, in bytes, on the content preview included in failure logs.
    const LOG_PREVIEW_BYTES: usize = 80;

    let mut canonical_count = 0usize;
    let mut superseded_ids: Vec<Uuid> = Vec::new();

    for canonical in &result.canonical_drawers {
        match handle
            .remember(
                canonical.content.clone(),
                RoomType::General,
                canonical.tags.clone(),
                canonical.importance,
            )
            .await
        {
            Ok(canonical_id) => {
                canonical_count += 1;
                for &orig_id in &canonical.canonical_for {
                    superseded_ids.push(orig_id);
                    let triple = crate::memory_core::store::kg::Triple {
                        subject: format!("drawer:{orig_id}"),
                        predicate: "superseded_by".to_string(),
                        object: format!("drawer:{canonical_id}"),
                        valid_from: chrono::Utc::now(),
                        valid_to: None,
                        confidence: 1.0,
                        provenance: Some("dream:semantic_consolidation".to_string()),
                    };
                    if let Err(e) = handle.kg.assert(triple).await {
                        tracing::warn!(
                            orig = %orig_id,
                            canonical = %canonical_id,
                            "failed to write superseded_by triple: {e:#}"
                        );
                    }
                }
            }
            Err(e) => {
                // Slicing a string at an arbitrary byte offset panics when the
                // offset lands inside a multi-byte character, so back off to
                // the nearest boundary. Offset 0 is always a boundary, which
                // guarantees the loop terminates.
                let mut end = canonical.content.len().min(LOG_PREVIEW_BYTES);
                while !canonical.content.is_char_boundary(end) {
                    end -= 1;
                }
                tracing::warn!(
                    content = &canonical.content[..end],
                    "dream semantic: failed to add canonical drawer: {e:#}"
                );
            }
        }
    }

    for (from, to) in &result.aliases {
        let triple = crate::memory_core::store::kg::Triple {
            subject: from.clone(),
            predicate: "alias_of".to_string(),
            object: to.clone(),
            valid_from: chrono::Utc::now(),
            valid_to: None,
            confidence: 1.0,
            provenance: Some("dream:semantic_consolidation".to_string()),
        };
        if let Err(e) = handle.kg.assert(triple).await {
            tracing::warn!(
                from,
                to,
                "dream semantic: failed to write alias triple: {e:#}"
            );
        }
    }

    // Flagged drawers are only reported; no state is changed for them here.
    for (id, reason) in &result.flagged_ids {
        tracing::info!(
            palace = %handle.id,
            drawer_id = %id,
            reason,
            "dream semantic: flagged drawer for human review (contradiction)"
        );
    }

    (canonical_count, superseded_ids)
}
/// Runs semantic consolidation over every unprotected drawer in the palace.
///
/// The pass is skipped (returning zeros) when semantic consolidation is
/// disabled in `config`, when no consolidator is injected and none can be
/// built from `config`, or when there are no unprotected drawers. An
/// `injected` consolidator takes precedence over one built from `config`.
///
/// The result is applied through `apply_consolidation_result`; the superseded
/// ids it reports are not acted on here.
///
/// # Returns
///
/// `(canonical_added, llm_calls, cache_hits)`.
pub(super) async fn semantic_consolidation_pass(
    handle: &Arc<PalaceHandle>,
    config: &DreamConfig,
    injected: Option<Arc<SemanticConsolidator>>,
) -> (usize, usize, usize) {
    const SKIPPED: (usize, usize, usize) = (0, 0, 0);

    if !config.semantic.enabled {
        tracing::debug!(
            palace = %handle.id,
            "skipping semantic consolidation: disabled in config"
        );
        return SKIPPED;
    }

    // Prefer the injected consolidator; only build one when none was given.
    let Some(consolidator) = injected.or_else(|| build_consolidator_from_config(config)) else {
        tracing::debug!(
            palace = %handle.id,
            "skipping semantic consolidation: disabled or inference unavailable"
        );
        return SKIPPED;
    };

    let candidates: Vec<Drawer> = {
        let drawers = handle.drawers.read();
        drawers
            .iter()
            .filter(|d| !d.drawer_type.is_protected())
            .cloned()
            .collect()
    };
    if candidates.is_empty() {
        return SKIPPED;
    }

    let outcome = consolidator.consolidate(&candidates).await;
    let (canonical_added, _superseded) = apply_consolidation_result(handle, &outcome).await;

    tracing::debug!(
        palace = %handle.id,
        canonical_added = canonical_added,
        aliases = outcome.aliases.len(),
        flagged = outcome.flagged_ids.len(),
        llm_calls = outcome.llm_calls,
        cache_hits = outcome.cache_hits,
        "semantic consolidation phase complete"
    );
    (canonical_added, outcome.llm_calls, outcome.cache_hits)
}
/// Consolidates the older drawers of one room (or of all rooms) and evicts
/// the originals that were superseded.
///
/// Only unprotected drawers created at or before `now - max_age_days` are
/// considered. They are handed to the consolidator, the result is applied via
/// `apply_consolidation_result`, and each distinct superseded drawer is then
/// forgotten. Finally the palace is flushed; a flush failure is logged, not
/// propagated.
///
/// # Parameters
///
/// * `room` - forwarded to `handle.list_drawers` as the room filter.
/// * `max_age_days` - minimum age, in days, a drawer must have to be included.
///   Non-positive values, and values too large for `chrono` to represent as a
///   cutoff date, make the call a no-op.
/// * `injected` - consolidator to use instead of building one from `config`.
///
/// # Returns
///
/// Counts of canonical drawers created and superseded drawers evicted. All
/// zeros when the call is a no-op (bad age window, inference unavailable, or
/// no matching drawers).
///
/// # Errors
///
/// Eviction and flush failures are logged rather than propagated, so this
/// currently always returns `Ok`.
pub async fn consolidate_scoped(
    handle: &Arc<PalaceHandle>,
    config: &DreamConfig,
    room: Option<RoomType>,
    max_age_days: i64,
    injected: Option<Arc<SemanticConsolidator>>,
) -> Result<RoomConsolidationStats> {
    /// Used to test whether `max_age_days` fits chrono's millisecond-based
    /// duration range before constructing the duration.
    const MILLIS_PER_DAY: i64 = 86_400_000;

    if max_age_days <= 0 {
        tracing::debug!(
            palace = %handle.id,
            max_age_days,
            "dream_consolidate_room: non-positive age window; no-op"
        );
        return Ok(RoomConsolidationStats::default());
    }

    let consolidator: Arc<SemanticConsolidator> = match injected {
        Some(c) => c,
        None => match build_consolidator_from_config(config) {
            Some(c) => c,
            None => {
                tracing::debug!(
                    palace = %handle.id,
                    "dream_consolidate_room: inference unavailable; no-op"
                );
                return Ok(RoomConsolidationStats::default());
            }
        },
    };

    // `chrono::Duration::days` panics when the day count does not fit its
    // range, and `DateTime - Duration` panics when the result leaves the
    // supported calendar. Guard both: a window that large cannot match any
    // drawer anyway, so it is treated as "nothing to consolidate".
    let cutoff = max_age_days.checked_mul(MILLIS_PER_DAY).and_then(|_| {
        chrono::Utc::now().checked_sub_signed(chrono::Duration::days(max_age_days))
    });
    let Some(cutoff) = cutoff else {
        tracing::debug!(
            palace = %handle.id,
            max_age_days,
            "dream_consolidate_room: age window exceeds representable range; no-op"
        );
        return Ok(RoomConsolidationStats::default());
    };

    let snapshot: Vec<Drawer> = handle
        .list_drawers(room, None, usize::MAX)
        .into_iter()
        .filter(|d| !d.drawer_type.is_protected())
        .filter(|d| d.created_at <= cutoff)
        .collect();
    if snapshot.is_empty() {
        return Ok(RoomConsolidationStats::default());
    }

    let result = consolidator.consolidate(&snapshot).await;
    let (summary_facts_created, superseded_ids) = apply_consolidation_result(handle, &result).await;

    // The same original may be listed under several canonical drawers; evict
    // each id once.
    let mut evicted = 0usize;
    let mut seen: HashSet<Uuid> = HashSet::new();
    for id in superseded_ids {
        if !seen.insert(id) {
            continue;
        }
        match handle.forget(id).await {
            Ok(()) => evicted += 1,
            Err(e) => tracing::warn!(?id, "dream_consolidate_room: evict failed: {e:#}"),
        }
    }

    if let Err(e) = handle.flush() {
        tracing::warn!(palace = %handle.id, "dream_consolidate_room flush failed: {e:#}");
    }

    Ok(RoomConsolidationStats {
        summary_facts_created,
        facts_evicted: evicted,
    })
}