1use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::warn;
8
9use nexus_core::config::{AgentConfig, CognitionConfig, CognitiveSystemConfig};
10use nexus_core::fsutil::atomic_write;
11use nexus_core::traits::EmbeddingService;
12use nexus_core::{cosine_similarity, CognitiveLevel, Memory, PerspectiveKey};
13use nexus_llm::LlmClient;
14use nexus_storage::models::EnqueueJobParams;
15use nexus_storage::repository::MemoryRepository;
16use serde_json::json;
17
18use crate::cognitive_cache::{CognitiveCache, ColdIndexEntry};
19use crate::context_builder::build_context_md;
20use crate::distill;
21use crate::error::AgentError;
22use crate::job_processor;
23use crate::session_manager::SessionManager;
24use crate::token_budget::TokenBudget;
25use crate::util::{flush_metric_samples, stage_metric_sample};
26use crate::RuntimeShutdownReason;
27
28use std::collections::{HashMap, HashSet};
29use std::path::{Path, PathBuf};
30use std::time::Duration;
31
/// Borrowed parameters for one [`run_dream_cycle`] invocation.
#[derive(Debug, Clone)]
pub struct DreamCycleRequest<'a> {
    /// Namespace the jobs are enqueued into and drained from.
    pub namespace_id: i64,
    /// Lease owner handed to the job processors while draining.
    pub lease_owner: &'a str,
    /// When set, a perspective-scoped reflect job is enqueued; otherwise a
    /// namespace-wide reflect job is enqueued instead.
    pub perspective: Option<&'a PerspectiveKey>,
    /// When set, a session digest job is enqueued for this key as well.
    pub session_key: Option<&'a str>,
    /// Stored under `"reason"` in the reflect job payload.
    pub reflect_reason: &'a str,
    /// Stored under `"reason"` in the digest job payload.
    pub digest_reason: &'a str,
}
43
/// Scheduling decision produced by [`choose_dream_schedule`].
///
/// How each action is carried out is up to the caller and is not visible in
/// this module; the variant docs only describe when the scheduler picks them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DreamScheduleAction {
    /// Picked when contradictions drive the decision, or when a session ends
    /// with explicit or derived memories.
    ImmediateBounded,
    /// Default background choice; also picked for idle timeouts that carry
    /// explicit or derived memories.
    DelayedEnqueue,
    /// Picked when the session has raw events but no digest yet.
    DigestOnly,
    /// Picked when there are no signals at all, or on idle timeout without
    /// explicit or derived memories.
    Skip,
}
53
/// Outcome of [`choose_dream_schedule`]: the action plus a short explanation.
#[derive(Debug, Clone)]
pub(crate) struct DreamSchedulePlan {
    /// What the scheduler decided to do.
    pub action: DreamScheduleAction,
    /// Static, human-readable label for the rule that produced `action`.
    pub reason: &'static str,
}
59
/// Per-session counters that feed [`choose_dream_schedule`].
///
/// Field docs describe how [`collect_dream_signals`] populates them.
#[derive(Debug, Clone, Default)]
pub(crate) struct DreamSignals {
    /// Memories classified as raw events by `is_raw_event`.
    pub raw_event_count: usize,
    /// Non-raw memories at `CognitiveLevel::Explicit`.
    pub explicit_count: usize,
    /// Non-raw memories at `CognitiveLevel::Derived`.
    pub derived_count: usize,
    /// Non-raw memories at `CognitiveLevel::Contradiction`, plus non-raw
    /// memories at other levels whose metadata reports `times_contradicted > 0`.
    pub contradiction_count: usize,
    /// `true` when the repository reported zero digests for the session.
    pub has_digest_gap: bool,
    /// All memories that were not classified as raw events.
    pub total_non_raw_count: usize,
    /// `contradiction_count / total_non_raw_count`; stays `0.0` when there are
    /// no non-raw memories.
    pub contradiction_density: f32,
}
72
/// Summary returned by [`run_nap`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NapResult {
    /// Count returned by `drain_cognition_jobs`; `0` when the nap timed out.
    pub memories_processed: usize,
    /// `true` when the session merge returned a non-zero count; `false` when
    /// the nap timed out.
    pub hot_cache_updated: bool,
    /// Wall-clock time spent in the nap, in milliseconds.
    pub elapsed_ms: u64,
    /// `true` when the nap hit its timeout before finishing.
    pub timed_out: bool,
}
80
/// Summary returned by [`run_dream`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamResult {
    /// Count returned by `drain_cognition_jobs` for the namespace.
    pub memories_derived: usize,
    /// Currently always reported as `0` by `run_dream`.
    pub connections_found: usize,
    /// Currently always reported as `false` by `run_dream`.
    pub hot_cold_reranked: bool,
}
87
/// Shared handles and configuration used by the nap, dream and deep-dream
/// entry points in this module.
pub struct DreamServices {
    /// SQLite pool used to build the memory and namespace repositories.
    pub pool: sqlx::SqlitePool,
    /// Cognition settings; its `*_enabled` flags select which job stages run.
    pub cognition: CognitionConfig,
    /// Agent settings; `agent_type` is used to estimate the context window.
    pub agent: AgentConfig,
    /// LLM client passed to the distill, derive and digest job processors.
    pub llm: Arc<dyn LlmClient>,
    /// Optional embedding service passed to the job processors and used for
    /// cross-project pattern clustering.
    pub embeddings: Option<Arc<dyn EmbeddingService>>,
    /// Cache and context settings (`hot_cache_max_entries`,
    /// `context_allocation_pct`, `similarity_threshold` are read here).
    pub cognitive_system: CognitiveSystemConfig,
}
98
99pub async fn run_nap(
101 session_id: &str,
102 project_root: &Path,
103 namespace_id: i64,
104 services: &DreamServices,
105 timeout: Duration,
106) -> Result<NapResult, AgentError> {
107 let start = Instant::now();
108 let nexus_dir = project_root.join(".nexus");
109
110 let result = tokio::time::timeout(timeout, async {
111 let processed = drain_cognition_jobs(
113 services.pool.clone(),
114 namespace_id,
115 &services.cognition,
116 &services.agent,
117 services.llm.clone(),
118 services.embeddings.clone(),
119 &format!("nap-{}", session_id),
120 )
121 .await?;
122
123 let mut cache = CognitiveCache::load_or_init(&nexus_dir);
125 let session_manager = SessionManager::new(project_root);
126 let merged = session_manager.merge_session(
127 session_id,
128 &mut cache.hot_cache,
129 services.cognitive_system.hot_cache_max_entries,
130 )?;
131
132 let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
134 let max_context_tokens =
135 (window_size * services.cognitive_system.context_allocation_pct) as usize;
136 let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
137 let context_path = nexus_dir.join("context.md");
138 atomic_write(&context_path, &context_md)?;
139
140 cache.save(&nexus_dir)?;
142
143 if let Err(e) = session_manager.mark_session_merged(session_id) {
145 tracing::warn!(error = %e, "Failed to mark session as merged");
146 }
147
148 Ok::<NapResult, AgentError>(NapResult {
149 memories_processed: processed,
150 hot_cache_updated: merged > 0,
151 elapsed_ms: start.elapsed().as_millis() as u64,
152 timed_out: false,
153 })
154 })
155 .await;
156
157 match result {
158 Ok(Ok(res)) => Ok(res),
159 Ok(Err(e)) => Err(e),
160 Err(_) => {
161 warn!(
162 "Nap timed out after {:?}; leased cognition jobs remain in queue \
163 and will be re-claimed on the next cycle once their lease expires",
164 timeout
165 );
166 Ok(NapResult {
167 memories_processed: 0,
168 hot_cache_updated: false,
169 elapsed_ms: start.elapsed().as_millis() as u64,
170 timed_out: true,
171 })
172 }
173 }
174}
175
176pub async fn run_dream(
178 project_root: &Path,
179 namespace_id: i64,
180 services: &DreamServices,
181) -> Result<DreamResult, AgentError> {
182 let nexus_dir = project_root.join(".nexus");
183
184 let processed = match drain_cognition_jobs(
186 services.pool.clone(),
187 namespace_id,
188 &services.cognition,
189 &services.agent,
190 services.llm.clone(),
191 services.embeddings.clone(),
192 "dream-threshold",
193 )
194 .await
195 {
196 Ok(result) => result,
197 Err(e) => {
198 tracing::error!(
199 namespace_id = namespace_id,
200 error = %e,
201 "Failed to drain cognition jobs in dream cycle"
202 );
203 return Err(e);
204 }
205 };
206
207 let cache = CognitiveCache::load_or_init(&nexus_dir);
209
210 let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
212 let max_context_tokens =
213 (window_size * services.cognitive_system.context_allocation_pct) as usize;
214 let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
215 let context_path = nexus_dir.join("context.md");
216 atomic_write(&context_path, &context_md)?;
217
218 cache.save(&nexus_dir)?;
220
221 Ok(DreamResult {
222 memories_derived: processed,
223 connections_found: 0,
224 hot_cold_reranked: false,
225 })
226}
227
228pub async fn run_dream_cycle(
231 pool: sqlx::SqlitePool,
232 cognition: &CognitionConfig,
233 agent: &AgentConfig,
234 llm: Arc<dyn LlmClient>,
235 embeddings: Option<Arc<dyn EmbeddingService>>,
236 request: DreamCycleRequest<'_>,
237) -> Result<usize, AgentError> {
238 let repo = MemoryRepository::new(pool.clone());
239 let total_started = Instant::now();
240 let mut metrics = Vec::new();
241 let enqueue_started = Instant::now();
242 enqueue_dream_jobs(
243 &repo,
244 request.namespace_id,
245 request.perspective,
246 request.session_key,
247 request.reflect_reason,
248 request.digest_reason,
249 )
250 .await?;
251 metrics.push(stage_metric_sample(
252 request.namespace_id,
253 "cognition.dream.enqueue_ms",
254 enqueue_started.elapsed().as_secs_f64() * 1000.0,
255 "enqueue",
256 ));
257 let drain_started = Instant::now();
258 let processed = drain_cognition_jobs(
259 pool,
260 request.namespace_id,
261 cognition,
262 agent,
263 llm,
264 embeddings,
265 request.lease_owner,
266 )
267 .await?;
268 metrics.push(stage_metric_sample(
269 request.namespace_id,
270 "cognition.dream.drain_ms",
271 drain_started.elapsed().as_secs_f64() * 1000.0,
272 "drain",
273 ));
274 metrics.push(stage_metric_sample(
275 request.namespace_id,
276 "cognition.dream.total_ms",
277 total_started.elapsed().as_secs_f64() * 1000.0,
278 "total",
279 ));
280 flush_metric_samples(&repo, &metrics).await;
281 Ok(processed)
282}
283
284pub async fn drain_cognition_jobs(
287 pool: sqlx::SqlitePool,
288 namespace_id: i64,
289 cognition: &CognitionConfig,
290 agent: &AgentConfig,
291 llm: Arc<dyn LlmClient>,
292 embeddings: Option<Arc<dyn EmbeddingService>>,
293 lease_owner: &str,
294) -> Result<usize, AgentError> {
295 let repo = MemoryRepository::new(pool);
296 let mut total_processed = 0usize;
297
298 for _ in 0..3 {
299 let mut progressed = 0usize;
300
301 if cognition.activity_distill_enabled {
302 progressed += distill::process_activity_distill_jobs(
303 &repo,
304 namespace_id,
305 cognition,
306 llm.clone(),
307 lease_owner,
308 )
309 .await?;
310 }
311 if cognition.derive_enabled {
312 progressed += job_processor::process_derive_jobs(
313 &repo,
314 namespace_id,
315 cognition,
316 agent,
317 llm.clone(),
318 embeddings.clone(),
319 lease_owner,
320 )
321 .await?;
322 }
323 if cognition.reflect_enabled {
324 progressed += job_processor::process_reflect_jobs(
325 &repo,
326 namespace_id,
327 cognition,
328 agent,
329 embeddings.clone(),
330 lease_owner,
331 )
332 .await?;
333 progressed += job_processor::process_reflect_namespace_jobs(
334 &repo,
335 namespace_id,
336 cognition,
337 agent,
338 embeddings.clone(),
339 lease_owner,
340 )
341 .await?;
342 }
343 if cognition.digest_enabled {
344 progressed += job_processor::process_digest_jobs(
345 &repo,
346 namespace_id,
347 cognition,
348 agent,
349 llm.clone(),
350 embeddings.clone(),
351 lease_owner,
352 )
353 .await?;
354 }
355
356 total_processed += progressed;
357 if progressed == 0 {
358 break;
359 }
360 }
361
362 Ok(total_processed)
363}
364
365pub async fn enqueue_dream_jobs(
368 repo: &MemoryRepository,
369 namespace_id: i64,
370 perspective: Option<&PerspectiveKey>,
371 session_key: Option<&str>,
372 reflect_reason: &str,
373 digest_reason: &str,
374) -> Result<usize, AgentError> {
375 let mut queued = 0usize;
376
377 if let Some(perspective) = perspective {
378 let perspective_json = serde_json::to_value(perspective)
379 .map_err(|error| AgentError::Reflection(error.to_string()))?;
380 let payload = json!({
381 "reason": reflect_reason,
382 "session_key": perspective.session_key,
383 });
384 if job_processor::enqueue_job_if_absent(
385 repo,
386 EnqueueJobParams {
387 namespace_id,
388 job_type: job_processor::REFLECT_PERSPECTIVE_JOB,
389 priority: 100,
390 perspective: Some(&perspective_json),
391 payload: &payload,
392 },
393 )
394 .await?
395 {
396 queued += 1;
397 }
398 } else {
399 let payload = json!({
400 "reason": reflect_reason,
401 "session_key": session_key,
402 });
403 if job_processor::enqueue_job_if_absent(
404 repo,
405 EnqueueJobParams {
406 namespace_id,
407 job_type: job_processor::REFLECT_NAMESPACE_JOB,
408 priority: 100,
409 perspective: None,
410 payload: &payload,
411 },
412 )
413 .await?
414 {
415 queued += 1;
416 }
417 }
418
419 if let Some(session_key) = session_key {
420 let payload = json!({
421 "session_key": session_key,
422 "reason": digest_reason,
423 });
424 if job_processor::enqueue_job_if_absent(
425 repo,
426 EnqueueJobParams {
427 namespace_id,
428 job_type: job_processor::DIGEST_SESSION_JOB,
429 priority: 110,
430 perspective: None,
431 payload: &payload,
432 },
433 )
434 .await?
435 {
436 queued += 1;
437 }
438 }
439
440 Ok(queued)
441}
442
443pub(crate) async fn collect_dream_signals(
446 repo: &MemoryRepository,
447 namespace_id: i64,
448 session_key: &str,
449) -> Result<DreamSignals, AgentError> {
450 let memories = repo
451 .list_by_session_key(namespace_id, session_key, 512, true)
452 .await
453 .map_err(|error| AgentError::Storage(error.to_string()))?;
454 let has_digest_gap = repo
455 .count_digests(namespace_id, Some(session_key))
456 .await
457 .map_err(|error| AgentError::Storage(error.to_string()))?
458 == 0;
459
460 let mut signals = DreamSignals {
461 has_digest_gap,
462 ..DreamSignals::default()
463 };
464 for memory in memories
465 .iter()
466 .filter(|memory| memory_matches_session_key(memory, session_key))
467 {
468 if is_raw_event(memory) {
469 signals.raw_event_count += 1;
470 } else {
471 signals.total_non_raw_count += 1;
472
473 let level = nexus_core::cognitive_level_from_metadata(&memory.metadata);
474 match level {
475 CognitiveLevel::Explicit => signals.explicit_count += 1,
476 CognitiveLevel::Derived => signals.derived_count += 1,
477 CognitiveLevel::Contradiction => signals.contradiction_count += 1,
478 _ => {}
479 }
480 if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
483 if cog.times_contradicted > 0 && level != CognitiveLevel::Contradiction {
484 signals.contradiction_count += 1;
485 }
486 }
487 }
488 }
489
490 if signals.total_non_raw_count > 0 {
491 signals.contradiction_density =
492 signals.contradiction_count as f32 / signals.total_non_raw_count as f32;
493 }
494
495 Ok(signals)
496}
497
498fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
499 if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
500 if cog.session_key.as_deref() == Some(session_key) {
501 return true;
502 }
503 if cog.session_keys.iter().any(|k| k == session_key) {
504 return true;
505 }
506 }
507 false
508}
509
510fn is_raw_event(memory: &Memory) -> bool {
516 memory.labels.iter().any(|l| l == "raw-activity")
519 || nexus_core::cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw
520}
521
522use crate::activity_monitor::ActivityMonitor;
525use crate::soul::{SoulBuilder, SoulCandidate};
526
/// Summary returned by [`run_deep_dream`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeepDreamResult {
    /// `true` when cross-project candidates were found and rebuilding the soul
    /// from them succeeded.
    pub soul_updated: bool,
    /// Total number of rows reported deleted by the archived-raw cleanup.
    pub memories_pruned: usize,
    /// Number of cross-project pattern candidates that were extracted.
    pub cross_project_patterns: usize,
    /// Number of project caches saved with a non-empty cold index.
    pub cold_caches_reindexed: usize,
}
534
535pub async fn run_deep_dream(
537 services: &DreamServices,
538 soul_builder: &SoulBuilder,
539 activity_monitor: &mut ActivityMonitor,
540) -> Result<DeepDreamResult, AgentError> {
541 let repo = MemoryRepository::new(services.pool.clone());
542 let ns_repo = nexus_storage::repository::NamespaceRepository::new(services.pool.clone());
543
544 let namespaces = ns_repo
546 .list_all()
547 .await
548 .map_err(|e| AgentError::Storage(e.to_string()))?;
549
550 for ns in &namespaces {
552 if let Err(e) = drain_cognition_jobs(
553 services.pool.clone(),
554 ns.id,
555 &services.cognition,
556 &services.agent,
557 services.llm.clone(),
558 services.embeddings.clone(),
559 "deep-dream-cleanup",
560 )
561 .await
562 {
563 tracing::warn!(namespace_id = ns.id, error = %e, "drain_cognition_jobs failed");
564 }
565 }
566
567 let mut memories_by_project: HashMap<String, Vec<Memory>> = HashMap::new();
569 for ns in &namespaces {
570 let filters = nexus_storage::repository::ListMemoryFilters {
571 category: None,
572 since: None,
573 until: None,
574 content_like: None,
575 include_raw: false,
576 limit: 1000,
577 offset: 0,
578 };
579 if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
580 for m in memories {
581 let level = m.level();
582 if level == CognitiveLevel::Derived || level == CognitiveLevel::Explicit {
583 let project_name = m
584 .metadata
585 .get("runtime")
586 .and_then(|r| r.get("project_name"))
587 .and_then(|p| p.as_str())
588 .unwrap_or("unknown")
589 .to_string();
590 memories_by_project.entry(project_name).or_default().push(m);
591 }
592 }
593 }
594 }
595
596 let candidates = extract_cross_project_patterns(
597 memories_by_project,
598 services.embeddings.as_deref(),
599 services.cognitive_system.similarity_threshold,
600 )
601 .await;
602 let total_patterns = candidates.len();
603
604 let soul_updated = if !candidates.is_empty() {
606 soul_builder.rebuild_soul(&candidates).await.is_ok()
607 } else {
608 false
609 };
610
611 let mut memories_pruned = 0usize;
613 let prune_cutoff = chrono::Utc::now() - chrono::Duration::days(30);
614 for ns in &namespaces {
615 if let Ok(candidates) = repo
616 .list_archived_raw_cleanup_candidates(ns.id, prune_cutoff, 500)
617 .await
618 {
619 if !candidates.is_empty() {
620 let ids: Vec<i64> = candidates.iter().map(|m| m.id).collect();
621 match repo.delete_batch(&ids).await {
622 Ok(deleted) => memories_pruned += deleted as usize,
623 Err(e) => {
624 warn!(
625 error = %e, count = ids.len(),
626 "Failed to delete archived raw memories; skipping"
627 );
628 }
629 }
630 }
631 }
632 }
633
634 let mut cold_caches_reindexed = 0usize;
636 let mut project_roots: HashSet<PathBuf> = HashSet::new();
638 for ns in &namespaces {
639 let filters = nexus_storage::repository::ListMemoryFilters {
640 category: None,
641 since: Some(chrono::Utc::now() - chrono::Duration::days(90)),
642 until: None,
643 content_like: None,
644 include_raw: true,
645 limit: 200,
646 offset: 0,
647 };
648 if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
649 for m in &memories {
650 if let Some(cwd) = m
651 .metadata
652 .get("runtime")
653 .and_then(|r| r.get("cwd"))
654 .and_then(|v| v.as_str())
655 {
656 let root = PathBuf::from(cwd);
657 if root.join(".nexus").exists() {
658 project_roots.insert(root);
659 }
660 }
661 }
662 }
663 }
664
665 let reindex_cutoff = chrono::Utc::now() - chrono::Duration::days(7);
666 for root in &project_roots {
667 let nexus_dir = root.join(".nexus");
668 let mut cache = CognitiveCache::load_or_init(&nexus_dir);
669
670 let mut fresh_entries: Vec<ColdIndexEntry> = Vec::new();
672 for ns in &namespaces {
673 let filters = nexus_storage::repository::ListMemoryFilters {
674 category: None,
675 since: Some(reindex_cutoff),
676 until: None,
677 content_like: None,
678 include_raw: false,
679 limit: 50,
680 offset: 0,
681 };
682 if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
683 for m in memories {
684 let cwd_match = m
685 .metadata
686 .get("runtime")
687 .and_then(|r| r.get("cwd"))
688 .and_then(|v| v.as_str())
689 .map(|c| Path::new(c) == root.as_path())
690 .unwrap_or(false);
691 if cwd_match {
692 fresh_entries.push(ColdIndexEntry {
693 memory_id: m.id,
694 project_relevance: 0.7,
695 last_surfaced: Some(m.created_at),
696 });
697 }
698 }
699 }
700 }
701
702 cache.cold_index.entries = fresh_entries;
705 cache.cold_index.last_reindexed = Some(chrono::Utc::now());
706
707 match cache.save(&nexus_dir) {
708 Ok(()) if !cache.cold_index.entries.is_empty() => {
709 cold_caches_reindexed += 1;
710 }
711 Ok(()) => {}
712 Err(e) => {
713 tracing::warn!(error = %e, "Failed to save cold cache after reindexing");
714 }
715 }
716 }
717
718 activity_monitor.last_deep_dream = Some(chrono::Utc::now());
720 if let Err(e) = activity_monitor.save() {
721 tracing::error!("Failed to save activity monitor after deep dream: {}", e);
722 }
723
724 Ok(DeepDreamResult {
725 soul_updated,
726 memories_pruned,
727 cross_project_patterns: total_patterns,
728 cold_caches_reindexed,
729 })
730}
731
732pub async fn extract_cross_project_patterns(
737 memories_by_project: HashMap<String, Vec<Memory>>,
738 embeddings: Option<&dyn EmbeddingService>,
739 similarity_threshold: f32,
740) -> Vec<SoulCandidate> {
741 let mut flat_memories: Vec<(Memory, String)> = Vec::new();
743 for (project, memories) in memories_by_project {
744 for m in memories {
745 flat_memories.push((m, project.clone()));
746 }
747 }
748
749 if flat_memories.len() < 2 {
750 return Vec::new();
751 }
752
753 let mut emb_map: HashMap<i64, Vec<f32>> = HashMap::new();
755 let mut to_compute: Vec<usize> = Vec::new();
756
757 for (idx, (m, _)) in flat_memories.iter().enumerate() {
758 if let Some(emb) = &m.content_embedding {
759 emb_map.insert(m.id, emb.clone());
760 } else {
761 to_compute.push(idx);
762 }
763 }
764
765 if let Some(service) = embeddings {
767 let contents: Vec<String> = to_compute
768 .iter()
769 .map(|&idx| flat_memories[idx].0.content.clone())
770 .collect();
771 if !contents.is_empty() {
772 match service.embed_batch(&contents).await {
773 Ok(results) if results.len() == contents.len() => {
774 for (idx, emb) in to_compute.into_iter().zip(results) {
775 let mem_id = flat_memories[idx].0.id;
776 emb_map.insert(mem_id, emb);
777 }
778 }
779 Ok(results) => {
780 tracing::warn!(
781 "embed_batch returned {} results for {} inputs in pattern extraction",
782 results.len(),
783 contents.len()
784 );
785 }
786 Err(e) => {
787 tracing::warn!("embed_batch failed in pattern extraction: {}", e);
788 }
789 }
790 }
791 }
792
793 if emb_map.len() < flat_memories.len() / 2 {
796 let mut pattern_map: HashMap<String, (u32, Vec<String>)> = HashMap::new();
798 for (m, project) in &flat_memories {
799 let normalized = m.content.to_lowercase();
800 let entry = pattern_map.entry(normalized).or_insert((0, Vec::new()));
801 entry.0 += 1;
802 if !entry.1.contains(project) {
803 entry.1.push(project.clone());
804 }
805 }
806 return pattern_map
807 .into_iter()
808 .filter(|(_, (_count, projects))| projects.len() >= 2)
809 .map(|(content, (count, projects))| SoulCandidate {
810 content,
811 source_project: projects.join(", "),
812 observation_count: count,
813 category: "TechnicalLearning".into(),
814 source_agent: "nexus-dream-engine".into(),
815 })
816 .collect();
817 }
818
819 let n = flat_memories.len();
821 let mut parent: Vec<usize> = (0..n).collect();
822
823 fn find(mut x: usize, parent: &mut [usize]) -> usize {
824 let mut root = x;
825 while parent[root] != root {
826 root = parent[root];
827 }
828 while parent[x] != root {
829 let next = parent[x];
830 parent[x] = root;
831 x = next;
832 }
833 root
834 }
835
836 fn union(x: usize, y: usize, parent: &mut [usize]) {
837 let rx = find(x, parent);
838 let ry = find(y, parent);
839 if rx != ry {
840 parent[ry] = rx;
841 }
842 }
843
844 let indices: Vec<usize> = (0..n)
846 .filter(|i| emb_map.contains_key(&flat_memories[*i].0.id))
847 .collect();
848 for &i in &indices {
849 let emb_i = emb_map.get(&flat_memories[i].0.id).unwrap();
850 for &j in indices.iter().filter(|&&j| j > i) {
851 let emb_j = emb_map.get(&flat_memories[j].0.id).unwrap();
852 if cosine_similarity(emb_i, emb_j) >= similarity_threshold {
853 union(i, j, &mut parent);
854 }
855 }
856 }
857
858 let mut clusters: HashMap<usize, Vec<usize>> = HashMap::new();
860 for i in 0..n {
861 let root = find(i, &mut parent);
862 clusters.entry(root).or_default().push(i);
863 }
864
865 let mut candidates = Vec::new();
867 for indices in clusters.values() {
868 let mut projects_set: HashSet<String> = HashSet::new();
869 for &idx in indices {
870 projects_set.insert(flat_memories[idx].1.clone());
871 }
872 if projects_set.len() >= 2 {
873 let (rep_idx, _) = indices
875 .iter()
876 .map(|&idx| (idx, flat_memories[idx].0.content.len()))
877 .max_by_key(|(_, len)| *len)
878 .unwrap();
879 let content = flat_memories[rep_idx].0.content.clone();
880 let observation_count = indices.len() as u32;
881 let source_project = projects_set.into_iter().collect::<Vec<String>>().join(", ");
882 candidates.push(SoulCandidate {
883 content,
884 source_project,
885 observation_count,
886 category: "TechnicalLearning".into(),
887 source_agent: "nexus-dream-engine".into(),
888 });
889 }
890 }
891
892 candidates
893}
894
895pub(crate) fn choose_dream_schedule(
898 signals: &DreamSignals,
899 reason: RuntimeShutdownReason,
900) -> DreamSchedulePlan {
901 if signals.total_non_raw_count == 0
903 && signals.raw_event_count == 0
904 && signals.explicit_count == 0
905 && signals.derived_count == 0
906 && signals.contradiction_count == 0
907 {
908 return DreamSchedulePlan {
909 action: DreamScheduleAction::Skip,
910 reason: "no signals",
911 };
912 }
913
914 if signals.contradiction_density > 0.0 {
916 if signals.contradiction_density > 0.15 {
918 return DreamSchedulePlan {
919 action: DreamScheduleAction::ImmediateBounded,
920 reason: "high contradiction density",
921 };
922 }
923
924 if reason == RuntimeShutdownReason::SessionEnded && signals.contradiction_density >= 0.10 {
925 return DreamSchedulePlan {
926 action: DreamScheduleAction::ImmediateBounded,
927 reason: "moderate contradiction density at session end",
928 };
929 }
930
931 if reason == RuntimeShutdownReason::IdleTimeout {
933 if signals.explicit_count > 0 || signals.derived_count > 0 {
934 return DreamSchedulePlan {
935 action: DreamScheduleAction::DelayedEnqueue,
936 reason: "delays idle medium signal sessions",
937 };
938 }
939 return DreamSchedulePlan {
940 action: DreamScheduleAction::Skip,
941 reason: "idle without signal",
942 };
943 }
944
945 if reason == RuntimeShutdownReason::SessionEnded
948 && (signals.explicit_count > 0 || signals.derived_count > 0)
949 {
950 return DreamSchedulePlan {
951 action: DreamScheduleAction::ImmediateBounded,
952 reason: "session end flushes explicit reflection",
953 };
954 }
955
956 if signals.has_digest_gap && signals.raw_event_count > 0 {
958 return DreamSchedulePlan {
959 action: DreamScheduleAction::DigestOnly,
960 reason: "digest only for light digest gap",
961 };
962 }
963 return DreamSchedulePlan {
964 action: DreamScheduleAction::DelayedEnqueue,
965 reason: "default_background",
966 };
967 }
968
969 if signals.contradiction_count > 0 {
971 return DreamSchedulePlan {
972 action: DreamScheduleAction::ImmediateBounded,
973 reason: "contradiction detected",
974 };
975 }
976
977 if reason == RuntimeShutdownReason::SessionEnded
978 && (signals.explicit_count > 0 || signals.derived_count > 0)
979 {
980 return DreamSchedulePlan {
981 action: DreamScheduleAction::ImmediateBounded,
982 reason: "session end flushes explicit reflection",
983 };
984 }
985
986 if signals.has_digest_gap && signals.raw_event_count > 0 {
987 return DreamSchedulePlan {
988 action: DreamScheduleAction::DigestOnly,
989 reason: "digest only for light digest gap",
990 };
991 }
992
993 if reason == RuntimeShutdownReason::IdleTimeout {
994 if signals.explicit_count > 0 || signals.derived_count > 0 {
995 return DreamSchedulePlan {
996 action: DreamScheduleAction::DelayedEnqueue,
997 reason: "delays idle medium signal sessions",
998 };
999 }
1000 return DreamSchedulePlan {
1001 action: DreamScheduleAction::Skip,
1002 reason: "idle without signal",
1003 };
1004 }
1005
1006 DreamSchedulePlan {
1007 action: DreamScheduleAction::DelayedEnqueue,
1008 reason: "default_background",
1009 }
1010}
1011
1012pub(crate) async fn compute_adaptive_dream_interval(
1013 pool: sqlx::SqlitePool,
1014 namespace_id: i64,
1015 base_interval_secs: u64,
1016 cognition: &CognitionConfig,
1017) -> Duration {
1018 let repo = MemoryRepository::new(pool);
1019 let filters = nexus_storage::repository::ListMemoryFilters {
1020 category: None,
1021 since: None,
1022 until: None,
1023 content_like: None,
1024 include_raw: true,
1025 limit: 100,
1026 offset: 0,
1027 };
1028
1029 if let Ok(recent_memories) = repo.list_filtered(namespace_id, filters).await {
1030 if recent_memories.is_empty() {
1031 return Duration::from_secs(base_interval_secs.clamp(
1032 cognition.adaptive_dream_min_interval_secs,
1033 cognition.adaptive_dream_max_interval_secs,
1034 ));
1035 }
1036
1037 let contradiction_count = recent_memories
1038 .iter()
1039 .filter(|m| {
1040 if nexus_core::cognitive_level_from_metadata(&m.metadata)
1042 == CognitiveLevel::Contradiction
1043 {
1044 return true;
1045 }
1046 if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&m.metadata) {
1048 if cog.times_contradicted > 0 {
1049 return true;
1050 }
1051 }
1052 false
1053 })
1054 .count();
1055
1056 let density = contradiction_count as f32 / recent_memories.len() as f32;
1057
1058 let multiplier = if density > 0.10 {
1059 0.5
1060 } else if recent_memories.len() < 5 {
1061 2.0
1062 } else {
1063 1.0
1064 };
1065
1066 let interval = (base_interval_secs as f32 * multiplier) as u64;
1067 let interval = interval.clamp(
1068 cognition.adaptive_dream_min_interval_secs,
1069 cognition.adaptive_dream_max_interval_secs,
1070 );
1071
1072 return Duration::from_secs(interval);
1073 }
1074
1075 Duration::from_secs(base_interval_secs.clamp(
1076 cognition.adaptive_dream_min_interval_secs,
1077 cognition.adaptive_dream_max_interval_secs,
1078 ))
1079}