use super::config::DreamConfig;
use super::helpers::{
build_closet_index, is_low_quality_content, merge_into, rebuild_index_from_drawers,
};
use crate::memory_core::decay::DecayConfig;
use crate::memory_core::palace::{Drawer, RoomType};
use crate::memory_core::retrieval::{PalaceHandle, shared_embedder};
use crate::memory_core::semantic_consolidation::{SemanticConsolidator, inference_available};
use crate::memory_core::store::vector::VectorStore;
use anyhow::Result;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Forgets drawers whose content is judged low quality.
///
/// Scans a snapshot of the palace's drawers, selects every drawer for which
/// `is_low_quality_content(&drawer.content, min_words)` returns true, and then
/// calls `handle.forget` on each selected id. Both the scan and the forget
/// loop stop as soon as `started.elapsed()` reaches `budget`.
///
/// # Returns
///
/// The number of drawers that were actually forgotten. Candidates whose
/// `forget` call failed (logged at warn level) or that were never attempted
/// because the budget ran out are not counted.
pub(super) async fn content_prune_pass(
    handle: &Arc<PalaceHandle>,
    started: std::time::Instant,
    budget: Duration,
    min_words: usize,
) -> Result<usize> {
    // The read guard is a temporary of this statement, so it is released as
    // soon as the clone completes and is not held across the awaits below.
    let snapshot: Vec<Drawer> = handle.drawers.read().clone();
    let mut victims: Vec<Uuid> = Vec::new();
    for drawer in &snapshot {
        if started.elapsed() >= budget {
            break;
        }
        if is_low_quality_content(&drawer.content, min_words) {
            victims.push(drawer.id);
        }
    }
    // Count successes only: a failed or skipped forget leaves the drawer in
    // place, so it must not be reported as pruned.
    let mut forgotten: usize = 0;
    for id in victims {
        if started.elapsed() >= budget {
            break;
        }
        match handle.forget(id).await {
            Ok(_) => forgotten += 1,
            Err(e) => tracing::warn!(?id, "dream content prune: forget failed: {e:#}"),
        }
    }
    Ok(forgotten)
}
/// Removes vector-store entries that no longer correspond to a drawer.
///
/// Every id reported by `handle.vector_store.all_ids()` that is absent from
/// the current drawer set is removed from the vector store; removal failures
/// are logged at warn level and not counted. The loop stops once
/// `started.elapsed()` reaches `budget`.
///
/// Afterwards, if there is at least one drawer and the index size exceeds
/// `2 * drawer_count + 1`, the index is rebuilt via
/// `rebuild_index_from_drawers` and the size reduction is added to the total.
///
/// # Returns
///
/// The number of removed vectors plus any size reduction from the rebuild.
///
/// # Errors
///
/// Propagates a rebuild failure with the context `"dream compact rebuild"`.
pub(super) async fn compact_pass(
    handle: &Arc<PalaceHandle>,
    started: std::time::Instant,
    budget: Duration,
) -> Result<usize> {
    let live_ids: HashSet<Uuid> = handle
        .drawers
        .read()
        .iter()
        .map(|drawer| drawer.id)
        .collect();
    let indexed_ids = handle.vector_store.all_ids();

    let mut purged = 0usize;
    // The loop variable stays `vid`: the `?vid` shorthand uses the variable
    // name as the emitted tracing field name.
    for vid in indexed_ids {
        if started.elapsed() >= budget {
            break;
        }
        if live_ids.contains(&vid) {
            continue;
        }
        if let Err(e) = handle.vector_store.remove(vid).await {
            tracing::warn!(?vid, "dream compact: vector remove failed: {e:#}");
        } else {
            purged += 1;
        }
    }

    let live_count = live_ids.len();
    let index_len = handle.vector_store.index_size();
    let needs_rebuild = live_count > 0 && index_len > live_count.saturating_mul(2) + 1;
    if !needs_rebuild {
        return Ok(purged);
    }

    let rebuilt_len = rebuild_index_from_drawers(handle, started, budget)
        .await
        .map_err(|e| e.context("dream compact rebuild"))?;
    let reclaimed = index_len.saturating_sub(rebuilt_len);
    Ok(purged.saturating_add(reclaimed))
}
pub(super) async fn dedup_pass(
handle: &Arc<PalaceHandle>,
started: std::time::Instant,
budget: Duration,
dedup_threshold: f32,
) -> Result<usize> {
let snapshot: Vec<Drawer> = handle.drawers.read().clone();
if snapshot.len() < 2 {
return Ok(0);
}
let embedder = shared_embedder()
.await
.map_err(|e| e.context("acquire shared embedder for dream dedup"))?;
let contents: Vec<String> = snapshot.iter().map(|d| d.content.clone()).collect();
let vectors = embedder
.embed_batch(&contents)
.await
.map_err(|e| e.context("batch embed drawers for dream dedup"))?;
if vectors.len() != snapshot.len() {
anyhow::bail!(
"embedder returned {} vectors for {} drawers",
vectors.len(),
snapshot.len()
);
}
let mut merges: usize = 0;
let mut already_removed: HashSet<Uuid> = HashSet::new();
for (drawer, query_vec) in snapshot.iter().zip(vectors.iter()) {
if started.elapsed() >= budget {
break;
}
if already_removed.contains(&drawer.id) {
continue;
}
let hits = handle.vector_store.search(query_vec, 3).await?;
for hit in hits.into_iter() {
if hit.drawer_id == drawer.id || already_removed.contains(&hit.drawer_id) {
continue;
}
if hit.score < dedup_threshold {
continue;
}
let Some(hit_drawer) = snapshot.iter().find(|d| d.id == hit.drawer_id) else {
continue;
};
let (survivor, loser) = if drawer.importance >= hit_drawer.importance {
(drawer.clone(), hit_drawer.clone())
} else {
(hit_drawer.clone(), drawer.clone())
};
merge_into(handle, &survivor, &loser);
let _ = handle.forget(loser.id).await;
already_removed.insert(loser.id);
merges += 1;
break;
}
}
Ok(merges)
}
pub(super) async fn prune_pass(
handle: &Arc<PalaceHandle>,
started: std::time::Instant,
budget: Duration,
prune_importance: f32,
) -> Result<usize> {
const MIN_AGE_DAYS: f32 = 30.0;
let snapshot: Vec<Drawer> = handle.drawers.read().clone();
let mut victims: Vec<Uuid> = Vec::new();
for drawer in snapshot.iter() {
if started.elapsed() >= budget {
break;
}
let age = DecayConfig::age_days(drawer.created_at);
let boost = drawer.accumulated_boost(&handle.decay_config);
let eff = handle
.decay_config
.effective_importance(drawer.importance, age, boost);
if eff <= prune_importance && age > MIN_AGE_DAYS {
victims.push(drawer.id);
}
}
let count = victims.len();
for id in victims {
let _ = handle.forget(id).await;
}
Ok(count)
}
/// Rebuilds the closet index from the current drawers and installs it.
///
/// Clones the drawer list, builds a fresh index with `build_closet_index`,
/// and overwrites `handle.closets` with it.
///
/// # Returns
///
/// The number of entries in the newly built index.
pub(super) fn refresh_closets(handle: &Arc<PalaceHandle>) -> usize {
    // Clone first so the drawers read guard is released before the index is
    // built and the closets write lock is taken.
    let drawers: Vec<Drawer> = handle.drawers.read().clone();
    let rebuilt = build_closet_index(&drawers);
    let total = rebuilt.len();
    *handle.closets.write() = rebuilt;
    total
}
/// Runs semantic consolidation over the palace's drawers and records the
/// results.
///
/// The pass is skipped, returning `(0, 0, 0)`, when:
/// - `config.semantic.enabled` is false,
/// - no consolidator is injected and `inference_available` reports that no
///   inference backend can be used, or
/// - the palace has no drawers.
///
/// When `injected` is `None`, a consolidator is built from `config`: the API
/// key comes from `config.openrouter_api_key`, falling back to the
/// `OPENROUTER_API_KEY` environment variable. An Ollama backend at
/// `http://localhost:11434` is used when the local model is enabled and no API
/// key is present; otherwise the OpenRouter backend is used.
///
/// For each canonical drawer produced by the consolidator, the drawer is
/// stored via `handle.remember` (room type `General`) and a `superseded_by`
/// knowledge-graph triple is asserted for every original drawer it replaces.
/// Each alias pair is recorded as an `alias_of` triple, and each flagged
/// drawer is logged at info level. Failures in these steps are logged at warn
/// level and do not abort the pass.
///
/// # Returns
///
/// `(canonical_added, llm_calls, cache_hits)`, where `canonical_added` counts
/// canonical drawers that were successfully stored and the other two values
/// are taken from the consolidation result.
pub(super) async fn semantic_consolidation_pass(
    handle: &Arc<PalaceHandle>,
    config: &DreamConfig,
    injected: Option<Arc<SemanticConsolidator>>,
) -> (usize, usize, usize) {
    if !config.semantic.enabled {
        tracing::debug!(
            palace = %handle.id,
            "skipping semantic consolidation: disabled in config"
        );
        return (0, 0, 0);
    }
    let consolidator: Arc<SemanticConsolidator> = if let Some(c) = injected {
        c
    } else {
        // An explicit key in the config takes precedence over the environment.
        let api_key = if !config.openrouter_api_key.is_empty() {
            config.openrouter_api_key.clone()
        } else {
            std::env::var("OPENROUTER_API_KEY").unwrap_or_default()
        };
        if !inference_available(&api_key, config.local_model_enabled) {
            tracing::debug!(
                palace = %handle.id,
                "skipping semantic consolidation: inference unavailable \
                 (set OPENROUTER_API_KEY or enable local_model)"
            );
            return (0, 0, 0);
        }
        use crate::memory_core::semantic_consolidation::{OllamaInference, OpenRouterInference};
        // The local backend is chosen only when it is enabled and no API key
        // is available; any available key selects OpenRouter.
        let backend: Arc<dyn crate::memory_core::semantic_consolidation::Inference> =
            if config.local_model_enabled && api_key.is_empty() {
                Arc::new(OllamaInference::new(
                    "http://localhost:11434",
                    &config.semantic.model,
                ))
            } else {
                Arc::new(OpenRouterInference::new(api_key, &config.semantic.model))
            };
        Arc::new(SemanticConsolidator::new(backend, config.semantic.clone()))
    };
    let snapshot: Vec<Drawer> = handle.drawers.read().clone();
    if snapshot.is_empty() {
        return (0, 0, 0);
    }
    let consolidation_result = consolidator.consolidate(&snapshot).await;

    let mut canonical_count = 0usize;
    for canonical in &consolidation_result.canonical_drawers {
        match handle
            .remember(
                canonical.content.clone(),
                RoomType::General,
                canonical.tags.clone(),
                canonical.importance,
            )
            .await
        {
            Ok(canonical_id) => {
                canonical_count += 1;
                // Link every replaced original to the new canonical drawer.
                for &orig_id in &canonical.canonical_for {
                    let triple = crate::memory_core::store::kg::Triple {
                        subject: format!("drawer:{orig_id}"),
                        predicate: "superseded_by".to_string(),
                        object: format!("drawer:{canonical_id}"),
                        valid_from: chrono::Utc::now(),
                        valid_to: None,
                        confidence: 1.0,
                        provenance: Some("dream:semantic_consolidation".to_string()),
                    };
                    if let Err(e) = handle.kg.assert(triple).await {
                        tracing::warn!(
                            orig = %orig_id,
                            canonical = %canonical_id,
                            "failed to write superseded_by triple: {e:#}"
                        );
                    }
                }
            }
            Err(e) => {
                tracing::warn!(
                    // Truncate on a character boundary: slicing a `str` at an
                    // arbitrary byte offset panics inside a multi-byte char.
                    content = truncate_on_char_boundary(&canonical.content, 80),
                    "dream semantic: failed to add canonical drawer: {e:#}"
                );
            }
        }
    }

    for (from, to) in &consolidation_result.aliases {
        let triple = crate::memory_core::store::kg::Triple {
            subject: from.clone(),
            predicate: "alias_of".to_string(),
            object: to.clone(),
            valid_from: chrono::Utc::now(),
            valid_to: None,
            confidence: 1.0,
            provenance: Some("dream:semantic_consolidation".to_string()),
        };
        if let Err(e) = handle.kg.assert(triple).await {
            tracing::warn!(
                from,
                to,
                "dream semantic: failed to write alias triple: {e:#}"
            );
        }
    }

    // Flagged drawers are only reported here; nothing is modified for them.
    for (id, reason) in &consolidation_result.flagged_ids {
        tracing::info!(
            palace = %handle.id,
            drawer_id = %id,
            reason,
            "dream semantic: flagged drawer for human review (contradiction)"
        );
    }

    tracing::debug!(
        palace = %handle.id,
        canonical_added = canonical_count,
        aliases = consolidation_result.aliases.len(),
        flagged = consolidation_result.flagged_ids.len(),
        llm_calls = consolidation_result.llm_calls,
        cache_hits = consolidation_result.cache_hits,
        "semantic consolidation phase complete"
    );
    (
        canonical_count,
        consolidation_result.llm_calls,
        consolidation_result.cache_hits,
    )
}

/// Returns the longest prefix of `text` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary.
fn truncate_on_char_boundary(text: &str, max_bytes: usize) -> &str {
    let mut end = text.len().min(max_bytes);
    // Offset 0 is always a boundary, so this loop terminates.
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}