1use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::warn;
8
9use nexus_core::config::{AgentConfig, CognitionConfig, CognitiveSystemConfig};
10use nexus_core::fsutil::atomic_write;
11use nexus_core::traits::EmbeddingService;
12use nexus_core::{cosine_similarity, CognitiveLevel, Memory, PerspectiveKey};
13use nexus_llm::LlmClient;
14use nexus_storage::models::EnqueueJobParams;
15use nexus_storage::repository::MemoryRepository;
16use serde_json::json;
17
18use crate::cognitive_cache::{CognitiveCache, ColdIndexEntry};
19use crate::context_builder::build_context_md;
20use crate::distill;
21use crate::error::AgentError;
22use crate::job_processor;
23use crate::session_manager::SessionManager;
24use crate::token_budget::TokenBudget;
25use crate::util::{flush_metric_samples, stage_metric_sample};
26use crate::RuntimeShutdownReason;
27
28use std::collections::{HashMap, HashSet};
29use std::path::{Path, PathBuf};
30use std::time::Duration;
31
32#[derive(Debug, Clone)]
35pub struct DreamCycleRequest<'a> {
36 pub namespace_id: i64,
37 pub lease_owner: &'a str,
38 pub perspective: Option<&'a PerspectiveKey>,
39 pub session_key: Option<&'a str>,
40 pub reflect_reason: &'a str,
41 pub digest_reason: &'a str,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub(crate) enum DreamScheduleAction {
48 ImmediateBounded,
49 DelayedEnqueue,
50 DigestOnly,
51 Skip,
52}
53
54#[derive(Debug, Clone)]
55pub(crate) struct DreamSchedulePlan {
56 pub action: DreamScheduleAction,
57 pub reason: &'static str,
58}
59
60#[derive(Debug, Clone, Default)]
61pub(crate) struct DreamSignals {
62 pub raw_event_count: usize,
63 pub explicit_count: usize,
64 pub derived_count: usize,
65 pub contradiction_count: usize,
66 pub has_digest_gap: bool,
67 pub total_non_raw_count: usize,
69 pub contradiction_density: f32,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct NapResult {
75 pub memories_processed: usize,
76 pub hot_cache_updated: bool,
77 pub elapsed_ms: u64,
78 pub timed_out: bool,
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct DreamResult {
83 pub memories_derived: usize,
84 pub connections_found: usize,
85 pub hot_cold_reranked: bool,
86}
87
88pub struct DreamServices {
91 pub pool: sqlx::SqlitePool,
92 pub cognition: CognitionConfig,
93 pub agent: AgentConfig,
94 pub llm: Arc<dyn LlmClient>,
95 pub embeddings: Option<Arc<dyn EmbeddingService>>,
96 pub cognitive_system: CognitiveSystemConfig,
97}
98
99pub async fn run_nap(
101 session_id: &str,
102 project_root: &Path,
103 namespace_id: i64,
104 services: &DreamServices,
105 timeout: Duration,
106) -> Result<NapResult, AgentError> {
107 let start = Instant::now();
108 let nexus_dir = project_root.join(".nexus");
109
110 let result = tokio::time::timeout(timeout, async {
111 let processed = drain_cognition_jobs(
113 services.pool.clone(),
114 namespace_id,
115 &services.cognition,
116 &services.agent,
117 services.llm.clone(),
118 services.embeddings.clone(),
119 &format!("nap-{}", session_id),
120 )
121 .await?;
122
123 let mut cache = CognitiveCache::load_or_init(&nexus_dir);
125 let session_manager = SessionManager::new(project_root);
126 let merged = session_manager.merge_session(
127 session_id,
128 &mut cache.hot_cache,
129 services.cognitive_system.hot_cache_max_entries,
130 )?;
131
132 let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
134 let max_context_tokens =
135 (window_size * services.cognitive_system.context_allocation_pct) as usize;
136 let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
137 let context_path = nexus_dir.join("context.md");
138 atomic_write(&context_path, &context_md)?;
139
140 cache.save(&nexus_dir)?;
142
143 if let Err(e) = session_manager.mark_session_merged(session_id) {
145 tracing::warn!(error = %e, "Failed to mark session as merged");
146 }
147
148 Ok::<NapResult, AgentError>(NapResult {
149 memories_processed: processed,
150 hot_cache_updated: merged > 0,
151 elapsed_ms: start.elapsed().as_millis() as u64,
152 timed_out: false,
153 })
154 })
155 .await;
156
157 match result {
158 Ok(Ok(res)) => Ok(res),
159 Ok(Err(e)) => Err(e),
160 Err(_) => {
161 warn!(
162 "Nap timed out after {:?}; leased cognition jobs remain in queue \
163 and will be re-claimed on the next cycle once their lease expires",
164 timeout
165 );
166 Ok(NapResult {
167 memories_processed: 0,
168 hot_cache_updated: false,
169 elapsed_ms: start.elapsed().as_millis() as u64,
170 timed_out: true,
171 })
172 }
173 }
174}
175
176pub async fn run_dream(
178 project_root: &Path,
179 namespace_id: i64,
180 services: &DreamServices,
181) -> Result<DreamResult, AgentError> {
182 let nexus_dir = project_root.join(".nexus");
183
184 let processed = drain_cognition_jobs(
186 services.pool.clone(),
187 namespace_id,
188 &services.cognition,
189 &services.agent,
190 services.llm.clone(),
191 services.embeddings.clone(),
192 "dream-threshold",
193 )
194 .await?;
195
196 let cache = CognitiveCache::load_or_init(&nexus_dir);
198
199 let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
201 let max_context_tokens =
202 (window_size * services.cognitive_system.context_allocation_pct) as usize;
203 let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
204 let context_path = nexus_dir.join("context.md");
205 atomic_write(&context_path, &context_md)?;
206
207 cache.save(&nexus_dir)?;
209
210 Ok(DreamResult {
211 memories_derived: processed,
212 connections_found: 0,
213 hot_cold_reranked: false,
214 })
215}
216
217pub async fn run_dream_cycle(
220 pool: sqlx::SqlitePool,
221 cognition: &CognitionConfig,
222 agent: &AgentConfig,
223 llm: Arc<dyn LlmClient>,
224 embeddings: Option<Arc<dyn EmbeddingService>>,
225 request: DreamCycleRequest<'_>,
226) -> Result<usize, AgentError> {
227 let repo = MemoryRepository::new(pool.clone());
228 let total_started = Instant::now();
229 let mut metrics = Vec::new();
230 let enqueue_started = Instant::now();
231 enqueue_dream_jobs(
232 &repo,
233 request.namespace_id,
234 request.perspective,
235 request.session_key,
236 request.reflect_reason,
237 request.digest_reason,
238 )
239 .await?;
240 metrics.push(stage_metric_sample(
241 request.namespace_id,
242 "cognition.dream.enqueue_ms",
243 enqueue_started.elapsed().as_secs_f64() * 1000.0,
244 "enqueue",
245 ));
246 let drain_started = Instant::now();
247 let processed = drain_cognition_jobs(
248 pool,
249 request.namespace_id,
250 cognition,
251 agent,
252 llm,
253 embeddings,
254 request.lease_owner,
255 )
256 .await?;
257 metrics.push(stage_metric_sample(
258 request.namespace_id,
259 "cognition.dream.drain_ms",
260 drain_started.elapsed().as_secs_f64() * 1000.0,
261 "drain",
262 ));
263 metrics.push(stage_metric_sample(
264 request.namespace_id,
265 "cognition.dream.total_ms",
266 total_started.elapsed().as_secs_f64() * 1000.0,
267 "total",
268 ));
269 flush_metric_samples(&repo, &metrics).await;
270 Ok(processed)
271}
272
273pub async fn drain_cognition_jobs(
276 pool: sqlx::SqlitePool,
277 namespace_id: i64,
278 cognition: &CognitionConfig,
279 agent: &AgentConfig,
280 llm: Arc<dyn LlmClient>,
281 embeddings: Option<Arc<dyn EmbeddingService>>,
282 lease_owner: &str,
283) -> Result<usize, AgentError> {
284 let repo = MemoryRepository::new(pool);
285 let mut total_processed = 0usize;
286
287 for _ in 0..3 {
288 let mut progressed = 0usize;
289
290 if cognition.activity_distill_enabled {
291 progressed += distill::process_activity_distill_jobs(
292 &repo,
293 namespace_id,
294 cognition,
295 llm.clone(),
296 lease_owner,
297 )
298 .await?;
299 }
300 if cognition.derive_enabled {
301 progressed += job_processor::process_derive_jobs(
302 &repo,
303 namespace_id,
304 cognition,
305 agent,
306 llm.clone(),
307 embeddings.clone(),
308 lease_owner,
309 )
310 .await?;
311 }
312 if cognition.reflect_enabled {
313 progressed += job_processor::process_reflect_jobs(
314 &repo,
315 namespace_id,
316 cognition,
317 agent,
318 embeddings.clone(),
319 lease_owner,
320 )
321 .await?;
322 progressed += job_processor::process_reflect_namespace_jobs(
323 &repo,
324 namespace_id,
325 cognition,
326 agent,
327 embeddings.clone(),
328 lease_owner,
329 )
330 .await?;
331 }
332 if cognition.digest_enabled {
333 progressed += job_processor::process_digest_jobs(
334 &repo,
335 namespace_id,
336 cognition,
337 agent,
338 llm.clone(),
339 embeddings.clone(),
340 lease_owner,
341 )
342 .await?;
343 }
344
345 total_processed += progressed;
346 if progressed == 0 {
347 break;
348 }
349 }
350
351 Ok(total_processed)
352}
353
354pub async fn enqueue_dream_jobs(
357 repo: &MemoryRepository,
358 namespace_id: i64,
359 perspective: Option<&PerspectiveKey>,
360 session_key: Option<&str>,
361 reflect_reason: &str,
362 digest_reason: &str,
363) -> Result<usize, AgentError> {
364 let mut queued = 0usize;
365
366 if let Some(perspective) = perspective {
367 let perspective_json = serde_json::to_value(perspective)
368 .map_err(|error| AgentError::Reflection(error.to_string()))?;
369 let payload = json!({
370 "reason": reflect_reason,
371 "session_key": perspective.session_key,
372 });
373 if job_processor::enqueue_job_if_absent(
374 repo,
375 EnqueueJobParams {
376 namespace_id,
377 job_type: job_processor::REFLECT_PERSPECTIVE_JOB,
378 priority: 100,
379 perspective: Some(&perspective_json),
380 payload: &payload,
381 },
382 )
383 .await?
384 {
385 queued += 1;
386 }
387 } else {
388 let payload = json!({
389 "reason": reflect_reason,
390 "session_key": session_key,
391 });
392 if job_processor::enqueue_job_if_absent(
393 repo,
394 EnqueueJobParams {
395 namespace_id,
396 job_type: job_processor::REFLECT_NAMESPACE_JOB,
397 priority: 100,
398 perspective: None,
399 payload: &payload,
400 },
401 )
402 .await?
403 {
404 queued += 1;
405 }
406 }
407
408 if let Some(session_key) = session_key {
409 let payload = json!({
410 "session_key": session_key,
411 "reason": digest_reason,
412 });
413 if job_processor::enqueue_job_if_absent(
414 repo,
415 EnqueueJobParams {
416 namespace_id,
417 job_type: job_processor::DIGEST_SESSION_JOB,
418 priority: 110,
419 perspective: None,
420 payload: &payload,
421 },
422 )
423 .await?
424 {
425 queued += 1;
426 }
427 }
428
429 Ok(queued)
430}
431
432pub(crate) async fn collect_dream_signals(
435 repo: &MemoryRepository,
436 namespace_id: i64,
437 session_key: &str,
438) -> Result<DreamSignals, AgentError> {
439 let memories = repo
440 .list_by_session_key(namespace_id, session_key, 512, true)
441 .await
442 .map_err(|error| AgentError::Storage(error.to_string()))?;
443 let has_digest_gap = repo
444 .count_digests(namespace_id, Some(session_key))
445 .await
446 .map_err(|error| AgentError::Storage(error.to_string()))?
447 == 0;
448
449 let mut signals = DreamSignals {
450 has_digest_gap,
451 ..DreamSignals::default()
452 };
453 for memory in memories
454 .iter()
455 .filter(|memory| memory_matches_session_key(memory, session_key))
456 {
457 if is_raw_event(memory) {
458 signals.raw_event_count += 1;
459 } else {
460 signals.total_non_raw_count += 1;
461
462 let level = nexus_core::cognitive_level_from_metadata(&memory.metadata);
463 match level {
464 CognitiveLevel::Explicit => signals.explicit_count += 1,
465 CognitiveLevel::Derived => signals.derived_count += 1,
466 CognitiveLevel::Contradiction => signals.contradiction_count += 1,
467 _ => {}
468 }
469 if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
472 if cog.times_contradicted > 0 && level != CognitiveLevel::Contradiction {
473 signals.contradiction_count += 1;
474 }
475 }
476 }
477 }
478
479 if signals.total_non_raw_count > 0 {
480 signals.contradiction_density =
481 signals.contradiction_count as f32 / signals.total_non_raw_count as f32;
482 }
483
484 Ok(signals)
485}
486
487fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
488 if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
489 if cog.session_key.as_deref() == Some(session_key) {
490 return true;
491 }
492 if cog.session_keys.iter().any(|k| k == session_key) {
493 return true;
494 }
495 }
496 false
497}
498
499fn is_raw_event(memory: &Memory) -> bool {
505 memory.labels.iter().any(|l| l == "raw-activity")
508 || nexus_core::cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw
509}
510
511use crate::activity_monitor::ActivityMonitor;
514use crate::soul::{SoulBuilder, SoulCandidate};
515
516#[derive(Debug, Clone, Serialize, Deserialize)]
517pub struct DeepDreamResult {
518 pub soul_updated: bool,
519 pub memories_pruned: usize,
520 pub cross_project_patterns: usize,
521 pub cold_caches_reindexed: usize,
522}
523
524pub async fn run_deep_dream(
526 services: &DreamServices,
527 soul_builder: &SoulBuilder,
528 activity_monitor: &mut ActivityMonitor,
529) -> Result<DeepDreamResult, AgentError> {
530 let repo = MemoryRepository::new(services.pool.clone());
531 let ns_repo = nexus_storage::repository::NamespaceRepository::new(services.pool.clone());
532
533 let namespaces = ns_repo
535 .list_all()
536 .await
537 .map_err(|e| AgentError::Storage(e.to_string()))?;
538
539 for ns in &namespaces {
541 if let Err(e) = drain_cognition_jobs(
542 services.pool.clone(),
543 ns.id,
544 &services.cognition,
545 &services.agent,
546 services.llm.clone(),
547 services.embeddings.clone(),
548 "deep-dream-cleanup",
549 )
550 .await
551 {
552 tracing::warn!(namespace_id = ns.id, error = %e, "drain_cognition_jobs failed");
553 }
554 }
555
556 let mut memories_by_project: HashMap<String, Vec<Memory>> = HashMap::new();
558 for ns in &namespaces {
559 let filters = nexus_storage::repository::ListMemoryFilters {
560 category: None,
561 since: None,
562 until: None,
563 content_like: None,
564 include_raw: false,
565 limit: 1000,
566 offset: 0,
567 };
568 if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
569 for m in memories {
570 let level = m.level();
571 if level == CognitiveLevel::Derived || level == CognitiveLevel::Explicit {
572 let project_name = m
573 .metadata
574 .get("runtime")
575 .and_then(|r| r.get("project_name"))
576 .and_then(|p| p.as_str())
577 .unwrap_or("unknown")
578 .to_string();
579 memories_by_project.entry(project_name).or_default().push(m);
580 }
581 }
582 }
583 }
584
585 let candidates = extract_cross_project_patterns(
586 memories_by_project,
587 services.embeddings.as_deref(),
588 services.cognitive_system.similarity_threshold,
589 )
590 .await;
591 let total_patterns = candidates.len();
592
593 let soul_updated = if !candidates.is_empty() {
595 soul_builder.rebuild_soul(&candidates).await.is_ok()
596 } else {
597 false
598 };
599
600 let mut memories_pruned = 0usize;
602 let prune_cutoff = chrono::Utc::now() - chrono::Duration::days(30);
603 for ns in &namespaces {
604 if let Ok(candidates) = repo
605 .list_archived_raw_cleanup_candidates(ns.id, prune_cutoff, 500)
606 .await
607 {
608 if !candidates.is_empty() {
609 let ids: Vec<i64> = candidates.iter().map(|m| m.id).collect();
610 match repo.delete_batch(&ids).await {
611 Ok(deleted) => memories_pruned += deleted as usize,
612 Err(e) => {
613 warn!(
614 error = %e, count = ids.len(),
615 "Failed to delete archived raw memories; skipping"
616 );
617 }
618 }
619 }
620 }
621 }
622
623 let mut cold_caches_reindexed = 0usize;
625 let mut project_roots: HashSet<PathBuf> = HashSet::new();
627 for ns in &namespaces {
628 let filters = nexus_storage::repository::ListMemoryFilters {
629 category: None,
630 since: Some(chrono::Utc::now() - chrono::Duration::days(90)),
631 until: None,
632 content_like: None,
633 include_raw: true,
634 limit: 200,
635 offset: 0,
636 };
637 if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
638 for m in &memories {
639 if let Some(cwd) = m
640 .metadata
641 .get("runtime")
642 .and_then(|r| r.get("cwd"))
643 .and_then(|v| v.as_str())
644 {
645 let root = PathBuf::from(cwd);
646 if root.join(".nexus").exists() {
647 project_roots.insert(root);
648 }
649 }
650 }
651 }
652 }
653
654 let reindex_cutoff = chrono::Utc::now() - chrono::Duration::days(7);
655 for root in &project_roots {
656 let nexus_dir = root.join(".nexus");
657 let mut cache = CognitiveCache::load_or_init(&nexus_dir);
658
659 let mut fresh_entries: Vec<ColdIndexEntry> = Vec::new();
661 for ns in &namespaces {
662 let filters = nexus_storage::repository::ListMemoryFilters {
663 category: None,
664 since: Some(reindex_cutoff),
665 until: None,
666 content_like: None,
667 include_raw: false,
668 limit: 50,
669 offset: 0,
670 };
671 if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
672 for m in memories {
673 let cwd_match = m
674 .metadata
675 .get("runtime")
676 .and_then(|r| r.get("cwd"))
677 .and_then(|v| v.as_str())
678 .map(|c| Path::new(c) == root.as_path())
679 .unwrap_or(false);
680 if cwd_match {
681 fresh_entries.push(ColdIndexEntry {
682 memory_id: m.id,
683 project_relevance: 0.7,
684 last_surfaced: Some(m.created_at),
685 });
686 }
687 }
688 }
689 }
690
691 cache.cold_index.entries = fresh_entries;
694 cache.cold_index.last_reindexed = Some(chrono::Utc::now());
695
696 match cache.save(&nexus_dir) {
697 Ok(()) if !cache.cold_index.entries.is_empty() => {
698 cold_caches_reindexed += 1;
699 }
700 Ok(()) => {}
701 Err(e) => {
702 tracing::warn!(error = %e, "Failed to save cold cache after reindexing");
703 }
704 }
705 }
706
707 activity_monitor.last_deep_dream = Some(chrono::Utc::now());
709 if let Err(e) = activity_monitor.save() {
710 tracing::error!("Failed to save activity monitor after deep dream: {}", e);
711 }
712
713 Ok(DeepDreamResult {
714 soul_updated,
715 memories_pruned,
716 cross_project_patterns: total_patterns,
717 cold_caches_reindexed,
718 })
719}
720
721pub async fn extract_cross_project_patterns(
726 memories_by_project: HashMap<String, Vec<Memory>>,
727 embeddings: Option<&dyn EmbeddingService>,
728 similarity_threshold: f32,
729) -> Vec<SoulCandidate> {
730 let mut flat_memories: Vec<(Memory, String)> = Vec::new();
732 for (project, memories) in memories_by_project {
733 for m in memories {
734 flat_memories.push((m, project.clone()));
735 }
736 }
737
738 if flat_memories.len() < 2 {
739 return Vec::new();
740 }
741
742 let mut emb_map: HashMap<i64, Vec<f32>> = HashMap::new();
744 let mut to_compute: Vec<usize> = Vec::new();
745
746 for (idx, (m, _)) in flat_memories.iter().enumerate() {
747 if let Some(emb) = &m.content_embedding {
748 emb_map.insert(m.id, emb.clone());
749 } else {
750 to_compute.push(idx);
751 }
752 }
753
754 if let Some(service) = embeddings {
756 let contents: Vec<String> = to_compute
757 .iter()
758 .map(|&idx| flat_memories[idx].0.content.clone())
759 .collect();
760 if !contents.is_empty() {
761 match service.embed_batch(&contents).await {
762 Ok(results) if results.len() == contents.len() => {
763 for (idx, emb) in to_compute.into_iter().zip(results) {
764 let mem_id = flat_memories[idx].0.id;
765 emb_map.insert(mem_id, emb);
766 }
767 }
768 Ok(results) => {
769 tracing::warn!(
770 "embed_batch returned {} results for {} inputs in pattern extraction",
771 results.len(),
772 contents.len()
773 );
774 }
775 Err(e) => {
776 tracing::warn!("embed_batch failed in pattern extraction: {}", e);
777 }
778 }
779 }
780 }
781
782 if emb_map.len() * 2 < flat_memories.len() {
785 let mut pattern_map: HashMap<String, (u32, Vec<String>)> = HashMap::new();
787 for (m, project) in &flat_memories {
788 let normalized = m.content.to_lowercase();
789 let entry = pattern_map.entry(normalized).or_insert((0, Vec::new()));
790 entry.0 += 1;
791 if !entry.1.contains(project) {
792 entry.1.push(project.clone());
793 }
794 }
795 return pattern_map
796 .into_iter()
797 .filter(|(_, (_count, projects))| projects.len() >= 2)
798 .map(|(content, (count, projects))| SoulCandidate {
799 content,
800 source_project: projects.join(", "),
801 observation_count: count,
802 category: "TechnicalLearning".into(),
803 source_agent: "nexus-dream-engine".into(),
804 })
805 .collect();
806 }
807
808 let n = flat_memories.len();
810 let mut parent: Vec<usize> = (0..n).collect();
811
812 fn find(mut x: usize, parent: &mut [usize]) -> usize {
813 let mut root = x;
814 while parent[root] != root {
815 root = parent[root];
816 }
817 while parent[x] != root {
818 let next = parent[x];
819 parent[x] = root;
820 x = next;
821 }
822 root
823 }
824
825 fn union(x: usize, y: usize, parent: &mut [usize]) {
826 let rx = find(x, parent);
827 let ry = find(y, parent);
828 if rx != ry {
829 parent[ry] = rx;
830 }
831 }
832
833 let indices: Vec<usize> = (0..n)
835 .filter(|i| emb_map.contains_key(&flat_memories[*i].0.id))
836 .collect();
837 for &i in &indices {
838 let emb_i = emb_map.get(&flat_memories[i].0.id).unwrap();
839 for &j in indices.iter().filter(|&&j| j > i) {
840 let emb_j = emb_map.get(&flat_memories[j].0.id).unwrap();
841 if cosine_similarity(emb_i, emb_j) >= similarity_threshold {
842 union(i, j, &mut parent);
843 }
844 }
845 }
846
847 let mut clusters: HashMap<usize, Vec<usize>> = HashMap::new();
849 for i in 0..n {
850 let root = find(i, &mut parent);
851 clusters.entry(root).or_default().push(i);
852 }
853
854 let mut candidates = Vec::new();
856 for indices in clusters.values() {
857 let mut projects_set: HashSet<String> = HashSet::new();
858 for &idx in indices {
859 projects_set.insert(flat_memories[idx].1.clone());
860 }
861 if projects_set.len() >= 2 {
862 let (rep_idx, _) = indices
864 .iter()
865 .map(|&idx| (idx, flat_memories[idx].0.content.len()))
866 .max_by_key(|(_, len)| *len)
867 .unwrap();
868 let content = flat_memories[rep_idx].0.content.clone();
869 let observation_count = indices.len() as u32;
870 let source_project = projects_set.into_iter().collect::<Vec<String>>().join(", ");
871 candidates.push(SoulCandidate {
872 content,
873 source_project,
874 observation_count,
875 category: "TechnicalLearning".into(),
876 source_agent: "nexus-dream-engine".into(),
877 });
878 }
879 }
880
881 candidates
882}
883
884pub(crate) fn choose_dream_schedule(
887 signals: &DreamSignals,
888 reason: RuntimeShutdownReason,
889) -> DreamSchedulePlan {
890 if signals.total_non_raw_count == 0
892 && signals.raw_event_count == 0
893 && signals.explicit_count == 0
894 && signals.derived_count == 0
895 && signals.contradiction_count == 0
896 {
897 return DreamSchedulePlan {
898 action: DreamScheduleAction::Skip,
899 reason: "no signals",
900 };
901 }
902
903 if signals.contradiction_density > 0.0 {
905 if signals.contradiction_density > 0.15 {
907 return DreamSchedulePlan {
908 action: DreamScheduleAction::ImmediateBounded,
909 reason: "high contradiction density",
910 };
911 }
912
913 if reason == RuntimeShutdownReason::SessionEnded && signals.contradiction_density >= 0.10 {
914 return DreamSchedulePlan {
915 action: DreamScheduleAction::ImmediateBounded,
916 reason: "moderate contradiction density at session end",
917 };
918 }
919
920 if reason == RuntimeShutdownReason::IdleTimeout {
922 if signals.explicit_count > 0 || signals.derived_count > 0 {
923 return DreamSchedulePlan {
924 action: DreamScheduleAction::DelayedEnqueue,
925 reason: "delays idle medium signal sessions",
926 };
927 }
928 return DreamSchedulePlan {
929 action: DreamScheduleAction::Skip,
930 reason: "idle without signal",
931 };
932 }
933
934 if reason == RuntimeShutdownReason::SessionEnded
937 && (signals.explicit_count > 0 || signals.derived_count > 0)
938 {
939 return DreamSchedulePlan {
940 action: DreamScheduleAction::ImmediateBounded,
941 reason: "session end flushes explicit reflection",
942 };
943 }
944
945 if signals.has_digest_gap && signals.raw_event_count > 0 {
947 return DreamSchedulePlan {
948 action: DreamScheduleAction::DigestOnly,
949 reason: "digest only for light digest gap",
950 };
951 }
952 return DreamSchedulePlan {
953 action: DreamScheduleAction::DelayedEnqueue,
954 reason: "default_background",
955 };
956 }
957
958 if signals.contradiction_count > 0 {
960 return DreamSchedulePlan {
961 action: DreamScheduleAction::ImmediateBounded,
962 reason: "contradiction detected",
963 };
964 }
965
966 if reason == RuntimeShutdownReason::SessionEnded
967 && (signals.explicit_count > 0 || signals.derived_count > 0)
968 {
969 return DreamSchedulePlan {
970 action: DreamScheduleAction::ImmediateBounded,
971 reason: "session end flushes explicit reflection",
972 };
973 }
974
975 if signals.has_digest_gap && signals.raw_event_count > 0 {
976 return DreamSchedulePlan {
977 action: DreamScheduleAction::DigestOnly,
978 reason: "digest only for light digest gap",
979 };
980 }
981
982 if reason == RuntimeShutdownReason::IdleTimeout {
983 if signals.explicit_count > 0 || signals.derived_count > 0 {
984 return DreamSchedulePlan {
985 action: DreamScheduleAction::DelayedEnqueue,
986 reason: "delays idle medium signal sessions",
987 };
988 }
989 return DreamSchedulePlan {
990 action: DreamScheduleAction::Skip,
991 reason: "idle without signal",
992 };
993 }
994
995 DreamSchedulePlan {
996 action: DreamScheduleAction::DelayedEnqueue,
997 reason: "default_background",
998 }
999}
1000
1001pub(crate) async fn compute_adaptive_dream_interval(
1002 pool: sqlx::SqlitePool,
1003 namespace_id: i64,
1004 base_interval_secs: u64,
1005 cognition: &CognitionConfig,
1006) -> Duration {
1007 let repo = MemoryRepository::new(pool);
1008 let filters = nexus_storage::repository::ListMemoryFilters {
1009 category: None,
1010 since: None,
1011 until: None,
1012 content_like: None,
1013 include_raw: true,
1014 limit: 100,
1015 offset: 0,
1016 };
1017
1018 if let Ok(recent_memories) = repo.list_filtered(namespace_id, filters).await {
1019 if recent_memories.is_empty() {
1020 return Duration::from_secs(base_interval_secs.clamp(
1021 cognition.adaptive_dream_min_interval_secs,
1022 cognition.adaptive_dream_max_interval_secs,
1023 ));
1024 }
1025
1026 let contradiction_count = recent_memories
1027 .iter()
1028 .filter(|m| {
1029 if nexus_core::cognitive_level_from_metadata(&m.metadata)
1031 == CognitiveLevel::Contradiction
1032 {
1033 return true;
1034 }
1035 if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&m.metadata) {
1037 if cog.times_contradicted > 0 {
1038 return true;
1039 }
1040 }
1041 false
1042 })
1043 .count();
1044
1045 let density = contradiction_count as f32 / recent_memories.len() as f32;
1046
1047 let multiplier = if density > 0.10 {
1048 0.5
1049 } else if recent_memories.len() < 5 {
1050 2.0
1051 } else {
1052 1.0
1053 };
1054
1055 let interval = (base_interval_secs as f32 * multiplier) as u64;
1056 let interval = interval.clamp(
1057 cognition.adaptive_dream_min_interval_secs,
1058 cognition.adaptive_dream_max_interval_secs,
1059 );
1060
1061 return Duration::from_secs(interval);
1062 }
1063
1064 Duration::from_secs(base_interval_secs.clamp(
1065 cognition.adaptive_dream_min_interval_secs,
1066 cognition.adaptive_dream_max_interval_secs,
1067 ))
1068}