use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tracing::warn;
use nexus_core::config::{AgentConfig, CognitionConfig, CognitiveSystemConfig};
use nexus_core::fsutil::atomic_write;
use nexus_core::traits::EmbeddingService;
use nexus_core::{cosine_similarity, CognitiveLevel, Memory, PerspectiveKey};
use nexus_llm::LlmClient;
use nexus_storage::models::EnqueueJobParams;
use nexus_storage::repository::MemoryRepository;
use serde_json::json;
use crate::cognitive_cache::{CognitiveCache, ColdIndexEntry};
use crate::context_builder::build_context_md;
use crate::distill;
use crate::error::AgentError;
use crate::job_processor;
use crate::session_manager::SessionManager;
use crate::token_budget::TokenBudget;
use crate::util::{flush_metric_samples, stage_metric_sample};
use crate::RuntimeShutdownReason;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct DreamCycleRequest<'a> {
pub namespace_id: i64,
pub lease_owner: &'a str,
pub perspective: Option<&'a PerspectiveKey>,
pub session_key: Option<&'a str>,
pub reflect_reason: &'a str,
pub digest_reason: &'a str,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DreamScheduleAction {
ImmediateBounded,
DelayedEnqueue,
DigestOnly,
Skip,
}
#[derive(Debug, Clone)]
pub(crate) struct DreamSchedulePlan {
pub action: DreamScheduleAction,
pub reason: &'static str,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct DreamSignals {
pub raw_event_count: usize,
pub explicit_count: usize,
pub derived_count: usize,
pub contradiction_count: usize,
pub has_digest_gap: bool,
pub total_non_raw_count: usize,
pub contradiction_density: f32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NapResult {
pub memories_processed: usize,
pub hot_cache_updated: bool,
pub elapsed_ms: u64,
pub timed_out: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamResult {
pub memories_derived: usize,
pub connections_found: usize,
pub hot_cold_reranked: bool,
}
pub struct DreamServices {
pub pool: sqlx::SqlitePool,
pub cognition: CognitionConfig,
pub agent: AgentConfig,
pub llm: Arc<dyn LlmClient>,
pub embeddings: Option<Arc<dyn EmbeddingService>>,
pub cognitive_system: CognitiveSystemConfig,
}
pub async fn run_nap(
session_id: &str,
project_root: &Path,
namespace_id: i64,
services: &DreamServices,
timeout: Duration,
) -> Result<NapResult, AgentError> {
let start = Instant::now();
let nexus_dir = project_root.join(".nexus");
let result = tokio::time::timeout(timeout, async {
let processed = drain_cognition_jobs(
services.pool.clone(),
namespace_id,
&services.cognition,
&services.agent,
services.llm.clone(),
services.embeddings.clone(),
&format!("nap-{}", session_id),
)
.await?;
let mut cache = CognitiveCache::load_or_init(&nexus_dir);
let session_manager = SessionManager::new(project_root);
let merged = session_manager.merge_session(
session_id,
&mut cache.hot_cache,
services.cognitive_system.hot_cache_max_entries,
)?;
let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
let max_context_tokens =
(window_size * services.cognitive_system.context_allocation_pct) as usize;
let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
let context_path = nexus_dir.join("context.md");
atomic_write(&context_path, &context_md)?;
cache.save(&nexus_dir)?;
if let Err(e) = session_manager.mark_session_merged(session_id) {
tracing::warn!(error = %e, "Failed to mark session as merged");
}
Ok::<NapResult, AgentError>(NapResult {
memories_processed: processed,
hot_cache_updated: merged > 0,
elapsed_ms: start.elapsed().as_millis() as u64,
timed_out: false,
})
})
.await;
match result {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => Err(e),
Err(_) => {
warn!(
"Nap timed out after {:?}; leased cognition jobs remain in queue \
and will be re-claimed on the next cycle once their lease expires",
timeout
);
Ok(NapResult {
memories_processed: 0,
hot_cache_updated: false,
elapsed_ms: start.elapsed().as_millis() as u64,
timed_out: true,
})
}
}
}
pub async fn run_dream(
project_root: &Path,
namespace_id: i64,
services: &DreamServices,
) -> Result<DreamResult, AgentError> {
let nexus_dir = project_root.join(".nexus");
let processed = drain_cognition_jobs(
services.pool.clone(),
namespace_id,
&services.cognition,
&services.agent,
services.llm.clone(),
services.embeddings.clone(),
"dream-threshold",
)
.await?;
let cache = CognitiveCache::load_or_init(&nexus_dir);
let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
let max_context_tokens =
(window_size * services.cognitive_system.context_allocation_pct) as usize;
let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
let context_path = nexus_dir.join("context.md");
atomic_write(&context_path, &context_md)?;
cache.save(&nexus_dir)?;
Ok(DreamResult {
memories_derived: processed,
connections_found: 0,
hot_cold_reranked: false,
})
}
pub async fn run_dream_cycle(
pool: sqlx::SqlitePool,
cognition: &CognitionConfig,
agent: &AgentConfig,
llm: Arc<dyn LlmClient>,
embeddings: Option<Arc<dyn EmbeddingService>>,
request: DreamCycleRequest<'_>,
) -> Result<usize, AgentError> {
let repo = MemoryRepository::new(pool.clone());
let total_started = Instant::now();
let mut metrics = Vec::new();
let enqueue_started = Instant::now();
enqueue_dream_jobs(
&repo,
request.namespace_id,
request.perspective,
request.session_key,
request.reflect_reason,
request.digest_reason,
)
.await?;
metrics.push(stage_metric_sample(
request.namespace_id,
"cognition.dream.enqueue_ms",
enqueue_started.elapsed().as_secs_f64() * 1000.0,
"enqueue",
));
let drain_started = Instant::now();
let processed = drain_cognition_jobs(
pool,
request.namespace_id,
cognition,
agent,
llm,
embeddings,
request.lease_owner,
)
.await?;
metrics.push(stage_metric_sample(
request.namespace_id,
"cognition.dream.drain_ms",
drain_started.elapsed().as_secs_f64() * 1000.0,
"drain",
));
metrics.push(stage_metric_sample(
request.namespace_id,
"cognition.dream.total_ms",
total_started.elapsed().as_secs_f64() * 1000.0,
"total",
));
flush_metric_samples(&repo, &metrics).await;
Ok(processed)
}
pub async fn drain_cognition_jobs(
pool: sqlx::SqlitePool,
namespace_id: i64,
cognition: &CognitionConfig,
agent: &AgentConfig,
llm: Arc<dyn LlmClient>,
embeddings: Option<Arc<dyn EmbeddingService>>,
lease_owner: &str,
) -> Result<usize, AgentError> {
let repo = MemoryRepository::new(pool);
let mut total_processed = 0usize;
for _ in 0..3 {
let mut progressed = 0usize;
if cognition.activity_distill_enabled {
progressed += distill::process_activity_distill_jobs(
&repo,
namespace_id,
cognition,
llm.clone(),
lease_owner,
)
.await?;
}
if cognition.derive_enabled {
progressed += job_processor::process_derive_jobs(
&repo,
namespace_id,
cognition,
agent,
llm.clone(),
embeddings.clone(),
lease_owner,
)
.await?;
}
if cognition.reflect_enabled {
progressed += job_processor::process_reflect_jobs(
&repo,
namespace_id,
cognition,
agent,
embeddings.clone(),
lease_owner,
)
.await?;
progressed += job_processor::process_reflect_namespace_jobs(
&repo,
namespace_id,
cognition,
agent,
embeddings.clone(),
lease_owner,
)
.await?;
}
if cognition.digest_enabled {
progressed += job_processor::process_digest_jobs(
&repo,
namespace_id,
cognition,
agent,
llm.clone(),
embeddings.clone(),
lease_owner,
)
.await?;
}
total_processed += progressed;
if progressed == 0 {
break;
}
}
Ok(total_processed)
}
pub async fn enqueue_dream_jobs(
repo: &MemoryRepository,
namespace_id: i64,
perspective: Option<&PerspectiveKey>,
session_key: Option<&str>,
reflect_reason: &str,
digest_reason: &str,
) -> Result<usize, AgentError> {
let mut queued = 0usize;
if let Some(perspective) = perspective {
let perspective_json = serde_json::to_value(perspective)
.map_err(|error| AgentError::Reflection(error.to_string()))?;
let payload = json!({
"reason": reflect_reason,
"session_key": perspective.session_key,
});
if job_processor::enqueue_job_if_absent(
repo,
EnqueueJobParams {
namespace_id,
job_type: job_processor::REFLECT_PERSPECTIVE_JOB,
priority: 100,
perspective: Some(&perspective_json),
payload: &payload,
},
)
.await?
{
queued += 1;
}
} else {
let payload = json!({
"reason": reflect_reason,
"session_key": session_key,
});
if job_processor::enqueue_job_if_absent(
repo,
EnqueueJobParams {
namespace_id,
job_type: job_processor::REFLECT_NAMESPACE_JOB,
priority: 100,
perspective: None,
payload: &payload,
},
)
.await?
{
queued += 1;
}
}
if let Some(session_key) = session_key {
let payload = json!({
"session_key": session_key,
"reason": digest_reason,
});
if job_processor::enqueue_job_if_absent(
repo,
EnqueueJobParams {
namespace_id,
job_type: job_processor::DIGEST_SESSION_JOB,
priority: 110,
perspective: None,
payload: &payload,
},
)
.await?
{
queued += 1;
}
}
Ok(queued)
}
pub(crate) async fn collect_dream_signals(
repo: &MemoryRepository,
namespace_id: i64,
session_key: &str,
) -> Result<DreamSignals, AgentError> {
let memories = repo
.list_by_session_key(namespace_id, session_key, 512, true)
.await
.map_err(|error| AgentError::Storage(error.to_string()))?;
let has_digest_gap = repo
.count_digests(namespace_id, Some(session_key))
.await
.map_err(|error| AgentError::Storage(error.to_string()))?
== 0;
let mut signals = DreamSignals {
has_digest_gap,
..DreamSignals::default()
};
for memory in memories
.iter()
.filter(|memory| memory_matches_session_key(memory, session_key))
{
if is_raw_event(memory) {
signals.raw_event_count += 1;
} else {
signals.total_non_raw_count += 1;
let level = nexus_core::cognitive_level_from_metadata(&memory.metadata);
match level {
CognitiveLevel::Explicit => signals.explicit_count += 1,
CognitiveLevel::Derived => signals.derived_count += 1,
CognitiveLevel::Contradiction => signals.contradiction_count += 1,
_ => {}
}
if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
if cog.times_contradicted > 0 && level != CognitiveLevel::Contradiction {
signals.contradiction_count += 1;
}
}
}
}
if signals.total_non_raw_count > 0 {
signals.contradiction_density =
signals.contradiction_count as f32 / signals.total_non_raw_count as f32;
}
Ok(signals)
}
fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
if cog.session_key.as_deref() == Some(session_key) {
return true;
}
if cog.session_keys.iter().any(|k| k == session_key) {
return true;
}
}
false
}
fn is_raw_event(memory: &Memory) -> bool {
memory.labels.iter().any(|l| l == "raw-activity")
|| nexus_core::cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw
}
use crate::activity_monitor::ActivityMonitor;
use crate::soul::{SoulBuilder, SoulCandidate};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeepDreamResult {
pub soul_updated: bool,
pub memories_pruned: usize,
pub cross_project_patterns: usize,
pub cold_caches_reindexed: usize,
}
pub async fn run_deep_dream(
services: &DreamServices,
soul_builder: &SoulBuilder,
activity_monitor: &mut ActivityMonitor,
) -> Result<DeepDreamResult, AgentError> {
let repo = MemoryRepository::new(services.pool.clone());
let ns_repo = nexus_storage::repository::NamespaceRepository::new(services.pool.clone());
let namespaces = ns_repo
.list_all()
.await
.map_err(|e| AgentError::Storage(e.to_string()))?;
for ns in &namespaces {
if let Err(e) = drain_cognition_jobs(
services.pool.clone(),
ns.id,
&services.cognition,
&services.agent,
services.llm.clone(),
services.embeddings.clone(),
"deep-dream-cleanup",
)
.await
{
tracing::warn!(namespace_id = ns.id, error = %e, "drain_cognition_jobs failed");
}
}
let mut memories_by_project: HashMap<String, Vec<Memory>> = HashMap::new();
for ns in &namespaces {
let filters = nexus_storage::repository::ListMemoryFilters {
category: None,
since: None,
until: None,
content_like: None,
include_raw: false,
limit: 1000,
offset: 0,
};
if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
for m in memories {
let level = m.level();
if level == CognitiveLevel::Derived || level == CognitiveLevel::Explicit {
let project_name = m
.metadata
.get("runtime")
.and_then(|r| r.get("project_name"))
.and_then(|p| p.as_str())
.unwrap_or("unknown")
.to_string();
memories_by_project.entry(project_name).or_default().push(m);
}
}
}
}
let candidates = extract_cross_project_patterns(
memories_by_project,
services.embeddings.as_deref(),
services.cognitive_system.similarity_threshold,
)
.await;
let total_patterns = candidates.len();
let soul_updated = if !candidates.is_empty() {
soul_builder.rebuild_soul(&candidates).await.is_ok()
} else {
false
};
let mut memories_pruned = 0usize;
let prune_cutoff = chrono::Utc::now() - chrono::Duration::days(30);
for ns in &namespaces {
if let Ok(candidates) = repo
.list_archived_raw_cleanup_candidates(ns.id, prune_cutoff, 500)
.await
{
if !candidates.is_empty() {
let ids: Vec<i64> = candidates.iter().map(|m| m.id).collect();
match repo.delete_batch(&ids).await {
Ok(deleted) => memories_pruned += deleted as usize,
Err(e) => {
warn!(
error = %e, count = ids.len(),
"Failed to delete archived raw memories; skipping"
);
}
}
}
}
}
let mut cold_caches_reindexed = 0usize;
let mut project_roots: HashSet<PathBuf> = HashSet::new();
for ns in &namespaces {
let filters = nexus_storage::repository::ListMemoryFilters {
category: None,
since: Some(chrono::Utc::now() - chrono::Duration::days(90)),
until: None,
content_like: None,
include_raw: true,
limit: 200,
offset: 0,
};
if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
for m in &memories {
if let Some(cwd) = m
.metadata
.get("runtime")
.and_then(|r| r.get("cwd"))
.and_then(|v| v.as_str())
{
let root = PathBuf::from(cwd);
if root.join(".nexus").exists() {
project_roots.insert(root);
}
}
}
}
}
let reindex_cutoff = chrono::Utc::now() - chrono::Duration::days(7);
for root in &project_roots {
let nexus_dir = root.join(".nexus");
let mut cache = CognitiveCache::load_or_init(&nexus_dir);
let mut fresh_entries: Vec<ColdIndexEntry> = Vec::new();
for ns in &namespaces {
let filters = nexus_storage::repository::ListMemoryFilters {
category: None,
since: Some(reindex_cutoff),
until: None,
content_like: None,
include_raw: false,
limit: 50,
offset: 0,
};
if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
for m in memories {
let cwd_match = m
.metadata
.get("runtime")
.and_then(|r| r.get("cwd"))
.and_then(|v| v.as_str())
.map(|c| Path::new(c) == root.as_path())
.unwrap_or(false);
if cwd_match {
fresh_entries.push(ColdIndexEntry {
memory_id: m.id,
project_relevance: 0.7,
last_surfaced: Some(m.created_at),
});
}
}
}
}
cache.cold_index.entries = fresh_entries;
cache.cold_index.last_reindexed = Some(chrono::Utc::now());
match cache.save(&nexus_dir) {
Ok(()) if !cache.cold_index.entries.is_empty() => {
cold_caches_reindexed += 1;
}
Ok(()) => {}
Err(e) => {
tracing::warn!(error = %e, "Failed to save cold cache after reindexing");
}
}
}
activity_monitor.last_deep_dream = Some(chrono::Utc::now());
if let Err(e) = activity_monitor.save() {
tracing::error!("Failed to save activity monitor after deep dream: {}", e);
}
Ok(DeepDreamResult {
soul_updated,
memories_pruned,
cross_project_patterns: total_patterns,
cold_caches_reindexed,
})
}
pub async fn extract_cross_project_patterns(
memories_by_project: HashMap<String, Vec<Memory>>,
embeddings: Option<&dyn EmbeddingService>,
similarity_threshold: f32,
) -> Vec<SoulCandidate> {
let mut flat_memories: Vec<(Memory, String)> = Vec::new();
for (project, memories) in memories_by_project {
for m in memories {
flat_memories.push((m, project.clone()));
}
}
if flat_memories.len() < 2 {
return Vec::new();
}
let mut emb_map: HashMap<i64, Vec<f32>> = HashMap::new();
let mut to_compute: Vec<usize> = Vec::new();
for (idx, (m, _)) in flat_memories.iter().enumerate() {
if let Some(emb) = &m.content_embedding {
emb_map.insert(m.id, emb.clone());
} else {
to_compute.push(idx);
}
}
if let Some(service) = embeddings {
let contents: Vec<String> = to_compute
.iter()
.map(|&idx| flat_memories[idx].0.content.clone())
.collect();
if !contents.is_empty() {
match service.embed_batch(&contents).await {
Ok(results) if results.len() == contents.len() => {
for (idx, emb) in to_compute.into_iter().zip(results) {
let mem_id = flat_memories[idx].0.id;
emb_map.insert(mem_id, emb);
}
}
Ok(results) => {
tracing::warn!(
"embed_batch returned {} results for {} inputs in pattern extraction",
results.len(),
contents.len()
);
}
Err(e) => {
tracing::warn!("embed_batch failed in pattern extraction: {}", e);
}
}
}
}
if emb_map.len() * 2 < flat_memories.len() {
let mut pattern_map: HashMap<String, (u32, Vec<String>)> = HashMap::new();
for (m, project) in &flat_memories {
let normalized = m.content.to_lowercase();
let entry = pattern_map.entry(normalized).or_insert((0, Vec::new()));
entry.0 += 1;
if !entry.1.contains(project) {
entry.1.push(project.clone());
}
}
return pattern_map
.into_iter()
.filter(|(_, (_count, projects))| projects.len() >= 2)
.map(|(content, (count, projects))| SoulCandidate {
content,
source_project: projects.join(", "),
observation_count: count,
category: "TechnicalLearning".into(),
source_agent: "nexus-dream-engine".into(),
})
.collect();
}
let n = flat_memories.len();
let mut parent: Vec<usize> = (0..n).collect();
fn find(mut x: usize, parent: &mut [usize]) -> usize {
let mut root = x;
while parent[root] != root {
root = parent[root];
}
while parent[x] != root {
let next = parent[x];
parent[x] = root;
x = next;
}
root
}
fn union(x: usize, y: usize, parent: &mut [usize]) {
let rx = find(x, parent);
let ry = find(y, parent);
if rx != ry {
parent[ry] = rx;
}
}
let indices: Vec<usize> = (0..n)
.filter(|i| emb_map.contains_key(&flat_memories[*i].0.id))
.collect();
for &i in &indices {
let emb_i = emb_map.get(&flat_memories[i].0.id).unwrap();
for &j in indices.iter().filter(|&&j| j > i) {
let emb_j = emb_map.get(&flat_memories[j].0.id).unwrap();
if cosine_similarity(emb_i, emb_j) >= similarity_threshold {
union(i, j, &mut parent);
}
}
}
let mut clusters: HashMap<usize, Vec<usize>> = HashMap::new();
for i in 0..n {
let root = find(i, &mut parent);
clusters.entry(root).or_default().push(i);
}
let mut candidates = Vec::new();
for indices in clusters.values() {
let mut projects_set: HashSet<String> = HashSet::new();
for &idx in indices {
projects_set.insert(flat_memories[idx].1.clone());
}
if projects_set.len() >= 2 {
let (rep_idx, _) = indices
.iter()
.map(|&idx| (idx, flat_memories[idx].0.content.len()))
.max_by_key(|(_, len)| *len)
.unwrap();
let content = flat_memories[rep_idx].0.content.clone();
let observation_count = indices.len() as u32;
let source_project = projects_set.into_iter().collect::<Vec<String>>().join(", ");
candidates.push(SoulCandidate {
content,
source_project,
observation_count,
category: "TechnicalLearning".into(),
source_agent: "nexus-dream-engine".into(),
});
}
}
candidates
}
pub(crate) fn choose_dream_schedule(
signals: &DreamSignals,
reason: RuntimeShutdownReason,
) -> DreamSchedulePlan {
if signals.total_non_raw_count == 0
&& signals.raw_event_count == 0
&& signals.explicit_count == 0
&& signals.derived_count == 0
&& signals.contradiction_count == 0
{
return DreamSchedulePlan {
action: DreamScheduleAction::Skip,
reason: "no signals",
};
}
if signals.contradiction_density > 0.0 {
if signals.contradiction_density > 0.15 {
return DreamSchedulePlan {
action: DreamScheduleAction::ImmediateBounded,
reason: "high contradiction density",
};
}
if reason == RuntimeShutdownReason::SessionEnded && signals.contradiction_density >= 0.10 {
return DreamSchedulePlan {
action: DreamScheduleAction::ImmediateBounded,
reason: "moderate contradiction density at session end",
};
}
if reason == RuntimeShutdownReason::IdleTimeout {
if signals.explicit_count > 0 || signals.derived_count > 0 {
return DreamSchedulePlan {
action: DreamScheduleAction::DelayedEnqueue,
reason: "delays idle medium signal sessions",
};
}
return DreamSchedulePlan {
action: DreamScheduleAction::Skip,
reason: "idle without signal",
};
}
if reason == RuntimeShutdownReason::SessionEnded
&& (signals.explicit_count > 0 || signals.derived_count > 0)
{
return DreamSchedulePlan {
action: DreamScheduleAction::ImmediateBounded,
reason: "session end flushes explicit reflection",
};
}
if signals.has_digest_gap && signals.raw_event_count > 0 {
return DreamSchedulePlan {
action: DreamScheduleAction::DigestOnly,
reason: "digest only for light digest gap",
};
}
return DreamSchedulePlan {
action: DreamScheduleAction::DelayedEnqueue,
reason: "default_background",
};
}
if signals.contradiction_count > 0 {
return DreamSchedulePlan {
action: DreamScheduleAction::ImmediateBounded,
reason: "contradiction detected",
};
}
if reason == RuntimeShutdownReason::SessionEnded
&& (signals.explicit_count > 0 || signals.derived_count > 0)
{
return DreamSchedulePlan {
action: DreamScheduleAction::ImmediateBounded,
reason: "session end flushes explicit reflection",
};
}
if signals.has_digest_gap && signals.raw_event_count > 0 {
return DreamSchedulePlan {
action: DreamScheduleAction::DigestOnly,
reason: "digest only for light digest gap",
};
}
if reason == RuntimeShutdownReason::IdleTimeout {
if signals.explicit_count > 0 || signals.derived_count > 0 {
return DreamSchedulePlan {
action: DreamScheduleAction::DelayedEnqueue,
reason: "delays idle medium signal sessions",
};
}
return DreamSchedulePlan {
action: DreamScheduleAction::Skip,
reason: "idle without signal",
};
}
DreamSchedulePlan {
action: DreamScheduleAction::DelayedEnqueue,
reason: "default_background",
}
}
pub(crate) async fn compute_adaptive_dream_interval(
pool: sqlx::SqlitePool,
namespace_id: i64,
base_interval_secs: u64,
cognition: &CognitionConfig,
) -> Duration {
let repo = MemoryRepository::new(pool);
let filters = nexus_storage::repository::ListMemoryFilters {
category: None,
since: None,
until: None,
content_like: None,
include_raw: true,
limit: 100,
offset: 0,
};
if let Ok(recent_memories) = repo.list_filtered(namespace_id, filters).await {
if recent_memories.is_empty() {
return Duration::from_secs(base_interval_secs.clamp(
cognition.adaptive_dream_min_interval_secs,
cognition.adaptive_dream_max_interval_secs,
));
}
let contradiction_count = recent_memories
.iter()
.filter(|m| {
if nexus_core::cognitive_level_from_metadata(&m.metadata)
== CognitiveLevel::Contradiction
{
return true;
}
if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&m.metadata) {
if cog.times_contradicted > 0 {
return true;
}
}
false
})
.count();
let density = contradiction_count as f32 / recent_memories.len() as f32;
let multiplier = if density > 0.10 {
0.5
} else if recent_memories.len() < 5 {
2.0
} else {
1.0
};
let interval = (base_interval_secs as f32 * multiplier) as u64;
let interval = interval.clamp(
cognition.adaptive_dream_min_interval_secs,
cognition.adaptive_dream_max_interval_secs,
);
return Duration::from_secs(interval);
}
Duration::from_secs(base_interval_secs.clamp(
cognition.adaptive_dream_min_interval_secs,
cognition.adaptive_dream_max_interval_secs,
))
}