Skip to main content

nexus_memory_agent/
dream_cycle.rs

1//! Dream cycle orchestration — signal collection, adaptive scheduling,
2//! job enqueue, and cognition draining.
3
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::warn;
8
9use nexus_core::config::{AgentConfig, CognitionConfig, CognitiveSystemConfig};
10use nexus_core::fsutil::atomic_write;
11use nexus_core::traits::EmbeddingService;
12use nexus_core::{cosine_similarity, CognitiveLevel, Memory, PerspectiveKey};
13use nexus_llm::LlmClient;
14use nexus_storage::models::EnqueueJobParams;
15use nexus_storage::repository::MemoryRepository;
16use serde_json::json;
17
18use crate::cognitive_cache::{CognitiveCache, ColdIndexEntry};
19use crate::context_builder::build_context_md;
20use crate::distill;
21use crate::error::AgentError;
22use crate::job_processor;
23use crate::session_manager::SessionManager;
24use crate::token_budget::TokenBudget;
25use crate::util::{flush_metric_samples, stage_metric_sample};
26use crate::RuntimeShutdownReason;
27
28use std::collections::{HashMap, HashSet};
29use std::path::{Path, PathBuf};
30use std::time::Duration;
31
32// ── Public request types ──────────────────────────────────────────────
33
/// Parameters for a single [`run_dream_cycle`] invocation.
#[derive(Debug, Clone)]
pub struct DreamCycleRequest<'a> {
    /// Namespace whose jobs are enqueued and drained.
    pub namespace_id: i64,
    /// Lease owner identifier forwarded to `drain_cognition_jobs`.
    pub lease_owner: &'a str,
    /// When `Some`, a perspective-scoped reflect job is enqueued; when `None`,
    /// a namespace-wide reflect job is enqueued instead.
    pub perspective: Option<&'a PerspectiveKey>,
    /// When `Some`, a session digest job is also enqueued for this key.
    pub session_key: Option<&'a str>,
    /// Stored as the `"reason"` field of the reflect job payload.
    pub reflect_reason: &'a str,
    /// Stored as the `"reason"` field of the digest job payload.
    pub digest_reason: &'a str,
}
43
44// ── Internal scheduling types ─────────────────────────────────────────
45
/// Scheduling decision carried by a [`DreamSchedulePlan`].
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names and the visible part of `choose_dream_schedule`; confirm
/// against the code that consumes the plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DreamScheduleAction {
    /// Presumably: run a bounded dream cycle right away.
    ImmediateBounded,
    /// Presumably: enqueue dream work to be processed later.
    DelayedEnqueue,
    /// Presumably: only produce a session digest.
    DigestOnly,
    /// Do not schedule any dream work.
    Skip,
}
53
/// A scheduling decision together with the reason it was chosen.
#[derive(Debug, Clone)]
pub(crate) struct DreamSchedulePlan {
    /// What the scheduler decided to do.
    pub action: DreamScheduleAction,
    /// Static, human-readable explanation of why `action` was chosen.
    pub reason: &'static str,
}
59
/// Per-session counters gathered by `collect_dream_signals` and consumed by
/// the adaptive scheduler.
#[derive(Debug, Clone, Default)]
pub(crate) struct DreamSignals {
    /// Number of session memories classified as raw events.
    pub raw_event_count: usize,
    /// Number of non-raw memories whose cognitive level is `Explicit`.
    pub explicit_count: usize,
    /// Number of non-raw memories whose cognitive level is `Derived`.
    pub derived_count: usize,
    /// Number of non-raw memories that either have cognitive level
    /// `Contradiction` or report `times_contradicted > 0` (each memory is
    /// counted at most once).
    pub contradiction_count: usize,
    /// True when no digest exists yet for the session key.
    pub has_digest_gap: bool,
    /// Total non-raw memories in the session scope.
    pub total_non_raw_count: usize,
    /// Ratio of contradictions to total non-raw (0.0 when no memories).
    pub contradiction_density: f32,
}
72
/// Outcome of a [`run_nap`] call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NapResult {
    /// Number of cognition jobs processed while draining; reported as 0 when
    /// the nap timed out.
    pub memories_processed: usize,
    /// True when merging the session scratch changed at least one hot-cache
    /// entry; false when the nap timed out.
    pub hot_cache_updated: bool,
    /// Wall-clock duration of the nap in milliseconds.
    pub elapsed_ms: u64,
    /// True when the nap hit its timeout before finishing.
    pub timed_out: bool,
}
80
/// Outcome of a [`run_dream`] call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamResult {
    /// Number of cognition jobs processed while draining.
    pub memories_derived: usize,
    /// Currently always reported as 0 by `run_dream`.
    pub connections_found: usize,
    /// Currently always reported as `false` by `run_dream`; reranking is
    /// deferred to the deep-dream cycle.
    pub hot_cold_reranked: bool,
}
87
/// Bundled runtime services and configuration for dream-cycle functions.
/// Reduces argument count while keeping the dependency graph explicit.
pub struct DreamServices {
    /// SQLite pool used to build repositories and drain cognition jobs.
    pub pool: sqlx::SqlitePool,
    /// Cognition settings; its `*_enabled` flags select which job stages run.
    pub cognition: CognitionConfig,
    /// Agent settings; `agent_type` is used to estimate the token window.
    pub agent: AgentConfig,
    /// LLM client handed to the job processors.
    pub llm: Arc<dyn LlmClient>,
    /// Optional embedding service handed to the job processors and to
    /// cross-project pattern extraction.
    pub embeddings: Option<Arc<dyn EmbeddingService>>,
    /// Cognitive-system settings (hot-cache size, context allocation share,
    /// similarity threshold).
    pub cognitive_system: CognitiveSystemConfig,
}
98
99/// Run a bounded "nap" cycle on session end or idle.
100pub async fn run_nap(
101    session_id: &str,
102    project_root: &Path,
103    namespace_id: i64,
104    services: &DreamServices,
105    timeout: Duration,
106) -> Result<NapResult, AgentError> {
107    let start = Instant::now();
108    let nexus_dir = project_root.join(".nexus");
109
110    let result = tokio::time::timeout(timeout, async {
111        // 1. Process raw -> derived for this session
112        let processed = drain_cognition_jobs(
113            services.pool.clone(),
114            namespace_id,
115            &services.cognition,
116            &services.agent,
117            services.llm.clone(),
118            services.embeddings.clone(),
119            &format!("nap-{}", session_id),
120        )
121        .await?;
122
123        // 2. Merge session scratch into hot cache
124        let mut cache = CognitiveCache::load_or_init(&nexus_dir);
125        let session_manager = SessionManager::new(project_root);
126        let merged = session_manager.merge_session(
127            session_id,
128            &mut cache.hot_cache,
129            services.cognitive_system.hot_cache_max_entries,
130        )?;
131
132        // 3. Update context.md
133        let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
134        let max_context_tokens =
135            (window_size * services.cognitive_system.context_allocation_pct) as usize;
136        let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
137        let context_path = nexus_dir.join("context.md");
138        atomic_write(&context_path, &context_md)?;
139
140        // 4. Save cache
141        cache.save(&nexus_dir)?;
142
143        // 5. Mark session as merged only after cache is persisted
144        if let Err(e) = session_manager.mark_session_merged(session_id) {
145            tracing::warn!(error = %e, "Failed to mark session as merged");
146        }
147
148        Ok::<NapResult, AgentError>(NapResult {
149            memories_processed: processed,
150            hot_cache_updated: merged > 0,
151            elapsed_ms: start.elapsed().as_millis() as u64,
152            timed_out: false,
153        })
154    })
155    .await;
156
157    match result {
158        Ok(Ok(res)) => Ok(res),
159        Ok(Err(e)) => Err(e),
160        Err(_) => {
161            warn!(
162                "Nap timed out after {:?}; leased cognition jobs remain in queue \
163                 and will be re-claimed on the next cycle once their lease expires",
164                timeout
165            );
166            Ok(NapResult {
167                memories_processed: 0,
168                hot_cache_updated: false,
169                elapsed_ms: start.elapsed().as_millis() as u64,
170                timed_out: true,
171            })
172        }
173    }
174}
175
176/// Run a comprehensive "dream" cycle triggered by memory accumulation.
177pub async fn run_dream(
178    project_root: &Path,
179    namespace_id: i64,
180    services: &DreamServices,
181) -> Result<DreamResult, AgentError> {
182    let nexus_dir = project_root.join(".nexus");
183
184    // 1. Run standard dream cycle (includes reflect/digest)
185    let processed = drain_cognition_jobs(
186        services.pool.clone(),
187        namespace_id,
188        &services.cognition,
189        &services.agent,
190        services.llm.clone(),
191        services.embeddings.clone(),
192        "dream-threshold",
193    )
194    .await?;
195
196    // 2. Load cache (reranking deferred to deep-dream cycle)
197    let cache = CognitiveCache::load_or_init(&nexus_dir);
198
199    // 3. Update context.md
200    let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
201    let max_context_tokens =
202        (window_size * services.cognitive_system.context_allocation_pct) as usize;
203    let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
204    let context_path = nexus_dir.join("context.md");
205    atomic_write(&context_path, &context_md)?;
206
207    // 4. Save cache
208    cache.save(&nexus_dir)?;
209
210    Ok(DreamResult {
211        memories_derived: processed,
212        connections_found: 0,
213        hot_cold_reranked: false,
214    })
215}
216
217// ── Top-level dream cycle ─────────────────────────────────────────────
218
219pub async fn run_dream_cycle(
220    pool: sqlx::SqlitePool,
221    cognition: &CognitionConfig,
222    agent: &AgentConfig,
223    llm: Arc<dyn LlmClient>,
224    embeddings: Option<Arc<dyn EmbeddingService>>,
225    request: DreamCycleRequest<'_>,
226) -> Result<usize, AgentError> {
227    let repo = MemoryRepository::new(pool.clone());
228    let total_started = Instant::now();
229    let mut metrics = Vec::new();
230    let enqueue_started = Instant::now();
231    enqueue_dream_jobs(
232        &repo,
233        request.namespace_id,
234        request.perspective,
235        request.session_key,
236        request.reflect_reason,
237        request.digest_reason,
238    )
239    .await?;
240    metrics.push(stage_metric_sample(
241        request.namespace_id,
242        "cognition.dream.enqueue_ms",
243        enqueue_started.elapsed().as_secs_f64() * 1000.0,
244        "enqueue",
245    ));
246    let drain_started = Instant::now();
247    let processed = drain_cognition_jobs(
248        pool,
249        request.namespace_id,
250        cognition,
251        agent,
252        llm,
253        embeddings,
254        request.lease_owner,
255    )
256    .await?;
257    metrics.push(stage_metric_sample(
258        request.namespace_id,
259        "cognition.dream.drain_ms",
260        drain_started.elapsed().as_secs_f64() * 1000.0,
261        "drain",
262    ));
263    metrics.push(stage_metric_sample(
264        request.namespace_id,
265        "cognition.dream.total_ms",
266        total_started.elapsed().as_secs_f64() * 1000.0,
267        "total",
268    ));
269    flush_metric_samples(&repo, &metrics).await;
270    Ok(processed)
271}
272
273// ── Cognition draining ────────────────────────────────────────────────
274
275pub async fn drain_cognition_jobs(
276    pool: sqlx::SqlitePool,
277    namespace_id: i64,
278    cognition: &CognitionConfig,
279    agent: &AgentConfig,
280    llm: Arc<dyn LlmClient>,
281    embeddings: Option<Arc<dyn EmbeddingService>>,
282    lease_owner: &str,
283) -> Result<usize, AgentError> {
284    let repo = MemoryRepository::new(pool);
285    let mut total_processed = 0usize;
286
287    for _ in 0..3 {
288        let mut progressed = 0usize;
289
290        if cognition.activity_distill_enabled {
291            progressed += distill::process_activity_distill_jobs(
292                &repo,
293                namespace_id,
294                cognition,
295                llm.clone(),
296                lease_owner,
297            )
298            .await?;
299        }
300        if cognition.derive_enabled {
301            progressed += job_processor::process_derive_jobs(
302                &repo,
303                namespace_id,
304                cognition,
305                agent,
306                llm.clone(),
307                embeddings.clone(),
308                lease_owner,
309            )
310            .await?;
311        }
312        if cognition.reflect_enabled {
313            progressed += job_processor::process_reflect_jobs(
314                &repo,
315                namespace_id,
316                cognition,
317                agent,
318                embeddings.clone(),
319                lease_owner,
320            )
321            .await?;
322            progressed += job_processor::process_reflect_namespace_jobs(
323                &repo,
324                namespace_id,
325                cognition,
326                agent,
327                embeddings.clone(),
328                lease_owner,
329            )
330            .await?;
331        }
332        if cognition.digest_enabled {
333            progressed += job_processor::process_digest_jobs(
334                &repo,
335                namespace_id,
336                cognition,
337                agent,
338                llm.clone(),
339                embeddings.clone(),
340                lease_owner,
341            )
342            .await?;
343        }
344
345        total_processed += progressed;
346        if progressed == 0 {
347            break;
348        }
349    }
350
351    Ok(total_processed)
352}
353
354// ── Dream job enqueue ─────────────────────────────────────────────────
355
356pub async fn enqueue_dream_jobs(
357    repo: &MemoryRepository,
358    namespace_id: i64,
359    perspective: Option<&PerspectiveKey>,
360    session_key: Option<&str>,
361    reflect_reason: &str,
362    digest_reason: &str,
363) -> Result<usize, AgentError> {
364    let mut queued = 0usize;
365
366    if let Some(perspective) = perspective {
367        let perspective_json = serde_json::to_value(perspective)
368            .map_err(|error| AgentError::Reflection(error.to_string()))?;
369        let payload = json!({
370            "reason": reflect_reason,
371            "session_key": perspective.session_key,
372        });
373        if job_processor::enqueue_job_if_absent(
374            repo,
375            EnqueueJobParams {
376                namespace_id,
377                job_type: job_processor::REFLECT_PERSPECTIVE_JOB,
378                priority: 100,
379                perspective: Some(&perspective_json),
380                payload: &payload,
381            },
382        )
383        .await?
384        {
385            queued += 1;
386        }
387    } else {
388        let payload = json!({
389            "reason": reflect_reason,
390            "session_key": session_key,
391        });
392        if job_processor::enqueue_job_if_absent(
393            repo,
394            EnqueueJobParams {
395                namespace_id,
396                job_type: job_processor::REFLECT_NAMESPACE_JOB,
397                priority: 100,
398                perspective: None,
399                payload: &payload,
400            },
401        )
402        .await?
403        {
404            queued += 1;
405        }
406    }
407
408    if let Some(session_key) = session_key {
409        let payload = json!({
410            "session_key": session_key,
411            "reason": digest_reason,
412        });
413        if job_processor::enqueue_job_if_absent(
414            repo,
415            EnqueueJobParams {
416                namespace_id,
417                job_type: job_processor::DIGEST_SESSION_JOB,
418                priority: 110,
419                perspective: None,
420                payload: &payload,
421            },
422        )
423        .await?
424        {
425            queued += 1;
426        }
427    }
428
429    Ok(queued)
430}
431
432// ── Signal collection ─────────────────────────────────────────────────
433
434pub(crate) async fn collect_dream_signals(
435    repo: &MemoryRepository,
436    namespace_id: i64,
437    session_key: &str,
438) -> Result<DreamSignals, AgentError> {
439    let memories = repo
440        .list_by_session_key(namespace_id, session_key, 512, true)
441        .await
442        .map_err(|error| AgentError::Storage(error.to_string()))?;
443    let has_digest_gap = repo
444        .count_digests(namespace_id, Some(session_key))
445        .await
446        .map_err(|error| AgentError::Storage(error.to_string()))?
447        == 0;
448
449    let mut signals = DreamSignals {
450        has_digest_gap,
451        ..DreamSignals::default()
452    };
453    for memory in memories
454        .iter()
455        .filter(|memory| memory_matches_session_key(memory, session_key))
456    {
457        if is_raw_event(memory) {
458            signals.raw_event_count += 1;
459        } else {
460            signals.total_non_raw_count += 1;
461
462            let level = nexus_core::cognitive_level_from_metadata(&memory.metadata);
463            match level {
464                CognitiveLevel::Explicit => signals.explicit_count += 1,
465                CognitiveLevel::Derived => signals.derived_count += 1,
466                CognitiveLevel::Contradiction => signals.contradiction_count += 1,
467                _ => {}
468            }
469            // Also count contradictions from times_contradicted field
470            // (avoid double-count: skip if already counted via CognitiveLevel)
471            if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
472                if cog.times_contradicted > 0 && level != CognitiveLevel::Contradiction {
473                    signals.contradiction_count += 1;
474                }
475            }
476        }
477    }
478
479    if signals.total_non_raw_count > 0 {
480        signals.contradiction_density =
481            signals.contradiction_count as f32 / signals.total_non_raw_count as f32;
482    }
483
484    Ok(signals)
485}
486
487fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
488    if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
489        if cog.session_key.as_deref() == Some(session_key) {
490            return true;
491        }
492        if cog.session_keys.iter().any(|k| k == session_key) {
493            return true;
494        }
495    }
496    false
497}
498
499/// Determine if a memory represents raw activity.
500///
501/// Returns true if the memory has the "raw-activity" label OR its cognitive
502/// level is explicitly Raw. Memories with missing cognitive metadata default
503/// to Raw (intentional: unclassified activity is treated as raw until processed).
504fn is_raw_event(memory: &Memory) -> bool {
505    // A raw event is any memory labeled raw-activity regardless of category,
506    // or one whose cognitive level is explicitly Raw.
507    memory.labels.iter().any(|l| l == "raw-activity")
508        || nexus_core::cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw
509}
510
511// ── Deep Dream & Cross-Project Logic ─────────────────────────────────
512
513use crate::activity_monitor::ActivityMonitor;
514use crate::soul::{SoulBuilder, SoulCandidate};
515
/// Outcome of a [`run_deep_dream`] call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeepDreamResult {
    /// True when cross-project candidates existed and the soul rebuild succeeded.
    pub soul_updated: bool,
    /// Number of archived raw memories deleted during pruning.
    pub memories_pruned: usize,
    /// Number of cross-project pattern candidates extracted.
    pub cross_project_patterns: usize,
    /// Number of project cold caches saved with a non-empty fresh index.
    pub cold_caches_reindexed: usize,
}
523
524/// Run a comprehensive deep dream cycle across all namespaces.
525pub async fn run_deep_dream(
526    services: &DreamServices,
527    soul_builder: &SoulBuilder,
528    activity_monitor: &mut ActivityMonitor,
529) -> Result<DeepDreamResult, AgentError> {
530    let repo = MemoryRepository::new(services.pool.clone());
531    let ns_repo = nexus_storage::repository::NamespaceRepository::new(services.pool.clone());
532
533    // Fetch all namespaces once for reuse across all deep-dream stages
534    let namespaces = ns_repo
535        .list_all()
536        .await
537        .map_err(|e| AgentError::Storage(e.to_string()))?;
538
539    // 1. Run standard dream for all namespaces
540    for ns in &namespaces {
541        if let Err(e) = drain_cognition_jobs(
542            services.pool.clone(),
543            ns.id,
544            &services.cognition,
545            &services.agent,
546            services.llm.clone(),
547            services.embeddings.clone(),
548            "deep-dream-cleanup",
549        )
550        .await
551        {
552            tracing::warn!(namespace_id = ns.id, error = %e, "drain_cognition_jobs failed");
553        }
554    }
555
556    // 2. Cross-project pattern extraction
557    let mut memories_by_project: HashMap<String, Vec<Memory>> = HashMap::new();
558    for ns in &namespaces {
559        let filters = nexus_storage::repository::ListMemoryFilters {
560            category: None,
561            since: None,
562            until: None,
563            content_like: None,
564            include_raw: false,
565            limit: 1000,
566            offset: 0,
567        };
568        if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
569            for m in memories {
570                let level = m.level();
571                if level == CognitiveLevel::Derived || level == CognitiveLevel::Explicit {
572                    let project_name = m
573                        .metadata
574                        .get("runtime")
575                        .and_then(|r| r.get("project_name"))
576                        .and_then(|p| p.as_str())
577                        .unwrap_or("unknown")
578                        .to_string();
579                    memories_by_project.entry(project_name).or_default().push(m);
580                }
581            }
582        }
583    }
584
585    let candidates = extract_cross_project_patterns(
586        memories_by_project,
587        services.embeddings.as_deref(),
588        services.cognitive_system.similarity_threshold,
589    )
590    .await;
591    let total_patterns = candidates.len();
592
593    // 3. Rebuild Soul
594    let soul_updated = if !candidates.is_empty() {
595        soul_builder.rebuild_soul(&candidates).await.is_ok()
596    } else {
597        false
598    };
599
600    // 4. Prune redundant archived raw-activity memories
601    let mut memories_pruned = 0usize;
602    let prune_cutoff = chrono::Utc::now() - chrono::Duration::days(30);
603    for ns in &namespaces {
604        if let Ok(candidates) = repo
605            .list_archived_raw_cleanup_candidates(ns.id, prune_cutoff, 500)
606            .await
607        {
608            if !candidates.is_empty() {
609                let ids: Vec<i64> = candidates.iter().map(|m| m.id).collect();
610                match repo.delete_batch(&ids).await {
611                    Ok(deleted) => memories_pruned += deleted as usize,
612                    Err(e) => {
613                        warn!(
614                            error = %e, count = ids.len(),
615                            "Failed to delete archived raw memories; skipping"
616                        );
617                    }
618                }
619            }
620        }
621    }
622
623    // 5. Cold-cache reindexing across discoverable project roots
624    let mut cold_caches_reindexed = 0usize;
625    // Discover project roots from memory metadata (cwd field)
626    let mut project_roots: HashSet<PathBuf> = HashSet::new();
627    for ns in &namespaces {
628        let filters = nexus_storage::repository::ListMemoryFilters {
629            category: None,
630            since: Some(chrono::Utc::now() - chrono::Duration::days(90)),
631            until: None,
632            content_like: None,
633            include_raw: true,
634            limit: 200,
635            offset: 0,
636        };
637        if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
638            for m in &memories {
639                if let Some(cwd) = m
640                    .metadata
641                    .get("runtime")
642                    .and_then(|r| r.get("cwd"))
643                    .and_then(|v| v.as_str())
644                {
645                    let root = PathBuf::from(cwd);
646                    if root.join(".nexus").exists() {
647                        project_roots.insert(root);
648                    }
649                }
650            }
651        }
652    }
653
654    let reindex_cutoff = chrono::Utc::now() - chrono::Duration::days(7);
655    for root in &project_roots {
656        let nexus_dir = root.join(".nexus");
657        let mut cache = CognitiveCache::load_or_init(&nexus_dir);
658
659        // Collect recent memory IDs across all namespaces for this project
660        let mut fresh_entries: Vec<ColdIndexEntry> = Vec::new();
661        for ns in &namespaces {
662            let filters = nexus_storage::repository::ListMemoryFilters {
663                category: None,
664                since: Some(reindex_cutoff),
665                until: None,
666                content_like: None,
667                include_raw: false,
668                limit: 50,
669                offset: 0,
670            };
671            if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
672                for m in memories {
673                    let cwd_match = m
674                        .metadata
675                        .get("runtime")
676                        .and_then(|r| r.get("cwd"))
677                        .and_then(|v| v.as_str())
678                        .map(|c| Path::new(c) == root.as_path())
679                        .unwrap_or(false);
680                    if cwd_match {
681                        fresh_entries.push(ColdIndexEntry {
682                            memory_id: m.id,
683                            project_relevance: 0.7,
684                            last_surfaced: Some(m.created_at),
685                        });
686                    }
687                }
688            }
689        }
690
691        // Reindex: cold cache is now exactly the fresh set.
692        // This replaces the old merge strategy that accumulated stale entries.
693        cache.cold_index.entries = fresh_entries;
694        cache.cold_index.last_reindexed = Some(chrono::Utc::now());
695
696        match cache.save(&nexus_dir) {
697            Ok(()) if !cache.cold_index.entries.is_empty() => {
698                cold_caches_reindexed += 1;
699            }
700            Ok(()) => {}
701            Err(e) => {
702                tracing::warn!(error = %e, "Failed to save cold cache after reindexing");
703            }
704        }
705    }
706
707    // 6. Update monitor
708    activity_monitor.last_deep_dream = Some(chrono::Utc::now());
709    if let Err(e) = activity_monitor.save() {
710        tracing::error!("Failed to save activity monitor after deep dream: {}", e);
711    }
712
713    Ok(DeepDreamResult {
714        soul_updated,
715        memories_pruned,
716        cross_project_patterns: total_patterns,
717        cold_caches_reindexed,
718    })
719}
720
721/// Extract patterns that appear across multiple projects.
722///
723/// Uses semantic similarity (embeddings) to group conceptually similar memories
724/// rather than brittle exact string matching.
725pub async fn extract_cross_project_patterns(
726    memories_by_project: HashMap<String, Vec<Memory>>,
727    embeddings: Option<&dyn EmbeddingService>,
728    similarity_threshold: f32,
729) -> Vec<SoulCandidate> {
730    // Flatten memories with their project attribution
731    let mut flat_memories: Vec<(Memory, String)> = Vec::new();
732    for (project, memories) in memories_by_project {
733        for m in memories {
734            flat_memories.push((m, project.clone()));
735        }
736    }
737
738    if flat_memories.len() < 2 {
739        return Vec::new();
740    }
741
742    // Collect embeddings (existing or computed)
743    let mut emb_map: HashMap<i64, Vec<f32>> = HashMap::new();
744    let mut to_compute: Vec<usize> = Vec::new();
745
746    for (idx, (m, _)) in flat_memories.iter().enumerate() {
747        if let Some(emb) = &m.content_embedding {
748            emb_map.insert(m.id, emb.clone());
749        } else {
750            to_compute.push(idx);
751        }
752    }
753
754    // Compute missing embeddings in batch if service available
755    if let Some(service) = embeddings {
756        let contents: Vec<String> = to_compute
757            .iter()
758            .map(|&idx| flat_memories[idx].0.content.clone())
759            .collect();
760        if !contents.is_empty() {
761            match service.embed_batch(&contents).await {
762                Ok(results) if results.len() == contents.len() => {
763                    for (idx, emb) in to_compute.into_iter().zip(results) {
764                        let mem_id = flat_memories[idx].0.id;
765                        emb_map.insert(mem_id, emb);
766                    }
767                }
768                Ok(results) => {
769                    tracing::warn!(
770                        "embed_batch returned {} results for {} inputs in pattern extraction",
771                        results.len(),
772                        contents.len()
773                    );
774                }
775                Err(e) => {
776                    tracing::warn!("embed_batch failed in pattern extraction: {}", e);
777                }
778            }
779        }
780    }
781
782    // If we don't have embeddings for at least some memories, semantic grouping
783    // won't work well. Fall back to simple exact-match grouping on lowercased content.
784    if emb_map.len() * 2 < flat_memories.len() {
785        // Fallback: original algorithm (exact match after lowercase)
786        let mut pattern_map: HashMap<String, (u32, Vec<String>)> = HashMap::new();
787        for (m, project) in &flat_memories {
788            let normalized = m.content.to_lowercase();
789            let entry = pattern_map.entry(normalized).or_insert((0, Vec::new()));
790            entry.0 += 1;
791            if !entry.1.contains(project) {
792                entry.1.push(project.clone());
793            }
794        }
795        return pattern_map
796            .into_iter()
797            .filter(|(_, (_count, projects))| projects.len() >= 2)
798            .map(|(content, (count, projects))| SoulCandidate {
799                content,
800                source_project: projects.join(", "),
801                observation_count: count,
802                category: "TechnicalLearning".into(),
803                source_agent: "nexus-dream-engine".into(),
804            })
805            .collect();
806    }
807
808    // Union-find for clustering
809    let n = flat_memories.len();
810    let mut parent: Vec<usize> = (0..n).collect();
811
812    fn find(mut x: usize, parent: &mut [usize]) -> usize {
813        let mut root = x;
814        while parent[root] != root {
815            root = parent[root];
816        }
817        while parent[x] != root {
818            let next = parent[x];
819            parent[x] = root;
820            x = next;
821        }
822        root
823    }
824
825    fn union(x: usize, y: usize, parent: &mut [usize]) {
826        let rx = find(x, parent);
827        let ry = find(y, parent);
828        if rx != ry {
829            parent[ry] = rx;
830        }
831    }
832
833    // Compare each pair with available embeddings
834    let indices: Vec<usize> = (0..n)
835        .filter(|i| emb_map.contains_key(&flat_memories[*i].0.id))
836        .collect();
837    for &i in &indices {
838        let emb_i = emb_map.get(&flat_memories[i].0.id).unwrap();
839        for &j in indices.iter().filter(|&&j| j > i) {
840            let emb_j = emb_map.get(&flat_memories[j].0.id).unwrap();
841            if cosine_similarity(emb_i, emb_j) >= similarity_threshold {
842                union(i, j, &mut parent);
843            }
844        }
845    }
846
847    // Build clusters
848    let mut clusters: HashMap<usize, Vec<usize>> = HashMap::new();
849    for i in 0..n {
850        let root = find(i, &mut parent);
851        clusters.entry(root).or_default().push(i);
852    }
853
854    // Convert clusters to SoulCandidates
855    let mut candidates = Vec::new();
856    for indices in clusters.values() {
857        let mut projects_set: HashSet<String> = HashSet::new();
858        for &idx in indices {
859            projects_set.insert(flat_memories[idx].1.clone());
860        }
861        if projects_set.len() >= 2 {
862            // Choose representative: longest content (most descriptive)
863            let (rep_idx, _) = indices
864                .iter()
865                .map(|&idx| (idx, flat_memories[idx].0.content.len()))
866                .max_by_key(|(_, len)| *len)
867                .unwrap();
868            let content = flat_memories[rep_idx].0.content.clone();
869            let observation_count = indices.len() as u32;
870            let source_project = projects_set.into_iter().collect::<Vec<String>>().join(", ");
871            candidates.push(SoulCandidate {
872                content,
873                source_project,
874                observation_count,
875                category: "TechnicalLearning".into(),
876                source_agent: "nexus-dream-engine".into(),
877            });
878        }
879    }
880
881    candidates
882}
883
884// ── Adaptive scheduling ───────────────────────────────────────────────
885
886pub(crate) fn choose_dream_schedule(
887    signals: &DreamSignals,
888    reason: RuntimeShutdownReason,
889) -> DreamSchedulePlan {
890    // No signals at all -> skip
891    if signals.total_non_raw_count == 0
892        && signals.raw_event_count == 0
893        && signals.explicit_count == 0
894        && signals.derived_count == 0
895        && signals.contradiction_count == 0
896    {
897        return DreamSchedulePlan {
898            action: DreamScheduleAction::Skip,
899            reason: "no signals",
900        };
901    }
902
903    // --- Density-based decisions (when density is nonzero) ---
904    if signals.contradiction_density > 0.0 {
905        // High contradiction density -> immediate regardless of reason
906        if signals.contradiction_density > 0.15 {
907            return DreamSchedulePlan {
908                action: DreamScheduleAction::ImmediateBounded,
909                reason: "high contradiction density",
910            };
911        }
912
913        if reason == RuntimeShutdownReason::SessionEnded && signals.contradiction_density >= 0.10 {
914            return DreamSchedulePlan {
915                action: DreamScheduleAction::ImmediateBounded,
916                reason: "moderate contradiction density at session end",
917            };
918        }
919
920        // Low density with contradictions: defer to IdleTimeout or DelayedEnqueue
921        if reason == RuntimeShutdownReason::IdleTimeout {
922            if signals.explicit_count > 0 || signals.derived_count > 0 {
923                return DreamSchedulePlan {
924                    action: DreamScheduleAction::DelayedEnqueue,
925                    reason: "delays idle medium signal sessions",
926                };
927            }
928            return DreamSchedulePlan {
929                action: DreamScheduleAction::Skip,
930                reason: "idle without signal",
931            };
932        }
933
934        // Density present but not high enough for immediate -> fall through to
935        // count-based or explicit/derived logic below only at session end.
936        if reason == RuntimeShutdownReason::SessionEnded
937            && (signals.explicit_count > 0 || signals.derived_count > 0)
938        {
939            return DreamSchedulePlan {
940                action: DreamScheduleAction::ImmediateBounded,
941                reason: "session end flushes explicit reflection",
942            };
943        }
944
945        // Fallback for density-present cases: DelayedEnqueue or DigestOnly
946        if signals.has_digest_gap && signals.raw_event_count > 0 {
947            return DreamSchedulePlan {
948                action: DreamScheduleAction::DigestOnly,
949                reason: "digest only for light digest gap",
950            };
951        }
952        return DreamSchedulePlan {
953            action: DreamScheduleAction::DelayedEnqueue,
954            reason: "default_background",
955        };
956    }
957
958    // --- Count-based decisions (density is zero) ---
959    if signals.contradiction_count > 0 {
960        return DreamSchedulePlan {
961            action: DreamScheduleAction::ImmediateBounded,
962            reason: "contradiction detected",
963        };
964    }
965
966    if reason == RuntimeShutdownReason::SessionEnded
967        && (signals.explicit_count > 0 || signals.derived_count > 0)
968    {
969        return DreamSchedulePlan {
970            action: DreamScheduleAction::ImmediateBounded,
971            reason: "session end flushes explicit reflection",
972        };
973    }
974
975    if signals.has_digest_gap && signals.raw_event_count > 0 {
976        return DreamSchedulePlan {
977            action: DreamScheduleAction::DigestOnly,
978            reason: "digest only for light digest gap",
979        };
980    }
981
982    if reason == RuntimeShutdownReason::IdleTimeout {
983        if signals.explicit_count > 0 || signals.derived_count > 0 {
984            return DreamSchedulePlan {
985                action: DreamScheduleAction::DelayedEnqueue,
986                reason: "delays idle medium signal sessions",
987            };
988        }
989        return DreamSchedulePlan {
990            action: DreamScheduleAction::Skip,
991            reason: "idle without signal",
992        };
993    }
994
995    DreamSchedulePlan {
996        action: DreamScheduleAction::DelayedEnqueue,
997        reason: "default_background",
998    }
999}
1000
1001pub(crate) async fn compute_adaptive_dream_interval(
1002    pool: sqlx::SqlitePool,
1003    namespace_id: i64,
1004    base_interval_secs: u64,
1005    cognition: &CognitionConfig,
1006) -> Duration {
1007    let repo = MemoryRepository::new(pool);
1008    let filters = nexus_storage::repository::ListMemoryFilters {
1009        category: None,
1010        since: None,
1011        until: None,
1012        content_like: None,
1013        include_raw: true,
1014        limit: 100,
1015        offset: 0,
1016    };
1017
1018    if let Ok(recent_memories) = repo.list_filtered(namespace_id, filters).await {
1019        if recent_memories.is_empty() {
1020            return Duration::from_secs(base_interval_secs.clamp(
1021                cognition.adaptive_dream_min_interval_secs,
1022                cognition.adaptive_dream_max_interval_secs,
1023            ));
1024        }
1025
1026        let contradiction_count = recent_memories
1027            .iter()
1028            .filter(|m| {
1029                // Count if explicitly marked as Contradiction level
1030                if nexus_core::cognitive_level_from_metadata(&m.metadata)
1031                    == CognitiveLevel::Contradiction
1032                {
1033                    return true;
1034                }
1035                // Or if times_contradicted field indicates contradiction
1036                if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&m.metadata) {
1037                    if cog.times_contradicted > 0 {
1038                        return true;
1039                    }
1040                }
1041                false
1042            })
1043            .count();
1044
1045        let density = contradiction_count as f32 / recent_memories.len() as f32;
1046
1047        let multiplier = if density > 0.10 {
1048            0.5
1049        } else if recent_memories.len() < 5 {
1050            2.0
1051        } else {
1052            1.0
1053        };
1054
1055        let interval = (base_interval_secs as f32 * multiplier) as u64;
1056        let interval = interval.clamp(
1057            cognition.adaptive_dream_min_interval_secs,
1058            cognition.adaptive_dream_max_interval_secs,
1059        );
1060
1061        return Duration::from_secs(interval);
1062    }
1063
1064    Duration::from_secs(base_interval_secs.clamp(
1065        cognition.adaptive_dream_min_interval_secs,
1066        cognition.adaptive_dream_max_interval_secs,
1067    ))
1068}