Skip to main content

nexus_memory_agent/
dream_cycle.rs

1//! Dream cycle orchestration — signal collection, adaptive scheduling,
2//! job enqueue, and cognition draining.
3
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::warn;
8
9use nexus_core::config::{AgentConfig, CognitionConfig, CognitiveSystemConfig};
10use nexus_core::fsutil::atomic_write;
11use nexus_core::traits::EmbeddingService;
12use nexus_core::{cosine_similarity, CognitiveLevel, Memory, PerspectiveKey};
13use nexus_llm::LlmClient;
14use nexus_storage::models::EnqueueJobParams;
15use nexus_storage::repository::MemoryRepository;
16use serde_json::json;
17
18use crate::cognitive_cache::{CognitiveCache, ColdIndexEntry};
19use crate::context_builder::build_context_md;
20use crate::distill;
21use crate::error::AgentError;
22use crate::job_processor;
23use crate::session_manager::SessionManager;
24use crate::token_budget::TokenBudget;
25use crate::util::{flush_metric_samples, stage_metric_sample};
26use crate::RuntimeShutdownReason;
27
28use std::collections::{HashMap, HashSet};
29use std::path::{Path, PathBuf};
30use std::time::Duration;
31
32// ── Public request types ──────────────────────────────────────────────
33
/// Inputs for [`run_dream_cycle`]: which namespace to process, who leases the
/// drained jobs, and how to label the reflect/digest jobs enqueued first.
#[derive(Debug, Clone)]
pub struct DreamCycleRequest<'a> {
    /// Namespace whose dream jobs are enqueued and then drained.
    pub namespace_id: i64,
    /// Passed as `lease_owner` to [`drain_cognition_jobs`].
    pub lease_owner: &'a str,
    /// When `Some`, a perspective-scoped reflect job is enqueued; when `None`,
    /// a namespace-wide reflect job is enqueued instead.
    pub perspective: Option<&'a PerspectiveKey>,
    /// Session to digest. A digest job is only enqueued when this is `Some`;
    /// it is also recorded in the namespace-wide reflect payload.
    pub session_key: Option<&'a str>,
    /// Stored as `reason` in the reflect job payload.
    pub reflect_reason: &'a str,
    /// Stored as `reason` in the digest job payload.
    pub digest_reason: &'a str,
}
43
44// ── Internal scheduling types ─────────────────────────────────────────
45
/// Scheduling decision returned by `choose_dream_schedule` when a runtime
/// shuts down.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DreamScheduleAction {
    /// Run dream work right away (chosen e.g. for high contradiction density).
    /// NOTE(review): "bounded" presumably means time/size-limited — confirm.
    ImmediateBounded,
    /// Defer dream work (chosen e.g. for idle sessions that have explicit or
    /// derived memories).
    DelayedEnqueue,
    /// NOTE(review): presumably "only digest the session"; the branch that
    /// returns this is not visible here — confirm against the scheduler.
    DigestOnly,
    /// Do nothing (e.g. when there are no signals at all).
    Skip,
}
53
/// A scheduling decision together with the reason it was taken.
#[derive(Debug, Clone)]
pub(crate) struct DreamSchedulePlan {
    /// What to do for this session.
    pub action: DreamScheduleAction,
    /// Static, human-readable justification (e.g. `"no signals"`);
    /// presumably used for logging — confirm at call sites.
    pub reason: &'static str,
}
59
/// Per-session counts gathered by `collect_dream_signals` and used by
/// `choose_dream_schedule` to decide whether and when to dream.
#[derive(Debug, Clone, Default)]
pub(crate) struct DreamSignals {
    /// Memories classified as raw activity (see `is_raw_event`).
    pub raw_event_count: usize,
    /// Non-raw memories whose cognitive level is `Explicit`.
    pub explicit_count: usize,
    /// Non-raw memories whose cognitive level is `Derived`.
    pub derived_count: usize,
    /// Non-raw memories at level `Contradiction`, plus any other non-raw
    /// memory whose `times_contradicted` is greater than zero (each memory is
    /// counted at most once).
    pub contradiction_count: usize,
    /// `true` when no digest exists yet for the session.
    pub has_digest_gap: bool,
    /// Total non-raw memories in the session scope.
    pub total_non_raw_count: usize,
    /// Ratio of contradictions to total non-raw (0.0 when no memories).
    pub contradiction_density: f32,
}
72
/// Outcome of [`run_nap`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NapResult {
    /// Progress count returned by the cognition drain (0 when the nap timed out).
    pub memories_processed: usize,
    /// `true` when `merge_session` reported more than zero merged entries.
    pub hot_cache_updated: bool,
    /// Wall-clock duration of the nap in milliseconds.
    pub elapsed_ms: u64,
    /// `true` when the nap was cut off by its timeout.
    pub timed_out: bool,
}
80
/// Outcome of [`run_dream`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DreamResult {
    /// Progress count returned by the cognition drain.
    pub memories_derived: usize,
    /// Always 0 from `run_dream` as currently written.
    pub connections_found: usize,
    /// Always `false` from `run_dream`: reranking is deferred to the deep dream.
    pub hot_cold_reranked: bool,
}
87
/// Bundled runtime services and configuration for dream-cycle functions.
/// Reduces argument count while keeping the dependency graph explicit.
pub struct DreamServices {
    /// SQLite pool used to build memory/namespace repositories.
    pub pool: sqlx::SqlitePool,
    /// Cognition settings (which job processors are enabled, etc.).
    pub cognition: CognitionConfig,
    /// Agent settings; `agent_type` drives the context-window estimate.
    pub agent: AgentConfig,
    /// LLM client handed to distill/derive/digest processors.
    pub llm: Arc<dyn LlmClient>,
    /// Optional embedding service; when `None`, stored embeddings only.
    pub embeddings: Option<Arc<dyn EmbeddingService>>,
    /// Cache/context sizing and similarity settings.
    pub cognitive_system: CognitiveSystemConfig,
}
98
99/// Run a bounded "nap" cycle on session end or idle.
100pub async fn run_nap(
101    session_id: &str,
102    project_root: &Path,
103    namespace_id: i64,
104    services: &DreamServices,
105    timeout: Duration,
106) -> Result<NapResult, AgentError> {
107    let start = Instant::now();
108    let nexus_dir = project_root.join(".nexus");
109
110    let result = tokio::time::timeout(timeout, async {
111        // 1. Process raw -> derived for this session
112        let processed = drain_cognition_jobs(
113            services.pool.clone(),
114            namespace_id,
115            &services.cognition,
116            &services.agent,
117            services.llm.clone(),
118            services.embeddings.clone(),
119            &format!("nap-{}", session_id),
120        )
121        .await?;
122
123        // 2. Merge session scratch into hot cache
124        let mut cache = CognitiveCache::load_or_init(&nexus_dir);
125        let session_manager = SessionManager::new(project_root);
126        let merged = session_manager.merge_session(
127            session_id,
128            &mut cache.hot_cache,
129            services.cognitive_system.hot_cache_max_entries,
130        )?;
131
132        // 3. Update context.md
133        let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
134        let max_context_tokens =
135            (window_size * services.cognitive_system.context_allocation_pct) as usize;
136        let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
137        let context_path = nexus_dir.join("context.md");
138        atomic_write(&context_path, &context_md)?;
139
140        // 4. Save cache
141        cache.save(&nexus_dir)?;
142
143        // 5. Mark session as merged only after cache is persisted
144        if let Err(e) = session_manager.mark_session_merged(session_id) {
145            tracing::warn!(error = %e, "Failed to mark session as merged");
146        }
147
148        Ok::<NapResult, AgentError>(NapResult {
149            memories_processed: processed,
150            hot_cache_updated: merged > 0,
151            elapsed_ms: start.elapsed().as_millis() as u64,
152            timed_out: false,
153        })
154    })
155    .await;
156
157    match result {
158        Ok(Ok(res)) => Ok(res),
159        Ok(Err(e)) => Err(e),
160        Err(_) => {
161            warn!(
162                "Nap timed out after {:?}; leased cognition jobs remain in queue \
163                 and will be re-claimed on the next cycle once their lease expires",
164                timeout
165            );
166            Ok(NapResult {
167                memories_processed: 0,
168                hot_cache_updated: false,
169                elapsed_ms: start.elapsed().as_millis() as u64,
170                timed_out: true,
171            })
172        }
173    }
174}
175
176/// Run a comprehensive "dream" cycle triggered by memory accumulation.
177pub async fn run_dream(
178    project_root: &Path,
179    namespace_id: i64,
180    services: &DreamServices,
181) -> Result<DreamResult, AgentError> {
182    let nexus_dir = project_root.join(".nexus");
183
184    // 1. Run standard dream cycle (includes reflect/digest)
185    let processed = match drain_cognition_jobs(
186        services.pool.clone(),
187        namespace_id,
188        &services.cognition,
189        &services.agent,
190        services.llm.clone(),
191        services.embeddings.clone(),
192        "dream-threshold",
193    )
194    .await
195    {
196        Ok(result) => result,
197        Err(e) => {
198            tracing::error!(
199                namespace_id = namespace_id,
200                error = %e,
201                "Failed to drain cognition jobs in dream cycle"
202            );
203            return Err(e);
204        }
205    };
206
207    // 2. Load cache (reranking deferred to deep-dream cycle)
208    let cache = CognitiveCache::load_or_init(&nexus_dir);
209
210    // 3. Update context.md
211    let window_size = TokenBudget::estimate_window(&services.agent.agent_type) as f32;
212    let max_context_tokens =
213        (window_size * services.cognitive_system.context_allocation_pct) as usize;
214    let context_md = build_context_md(&cache.hot_cache, &[], max_context_tokens);
215    let context_path = nexus_dir.join("context.md");
216    atomic_write(&context_path, &context_md)?;
217
218    // 4. Save cache
219    cache.save(&nexus_dir)?;
220
221    Ok(DreamResult {
222        memories_derived: processed,
223        connections_found: 0,
224        hot_cold_reranked: false,
225    })
226}
227
228// ── Top-level dream cycle ─────────────────────────────────────────────
229
230pub async fn run_dream_cycle(
231    pool: sqlx::SqlitePool,
232    cognition: &CognitionConfig,
233    agent: &AgentConfig,
234    llm: Arc<dyn LlmClient>,
235    embeddings: Option<Arc<dyn EmbeddingService>>,
236    request: DreamCycleRequest<'_>,
237) -> Result<usize, AgentError> {
238    let repo = MemoryRepository::new(pool.clone());
239    let total_started = Instant::now();
240    let mut metrics = Vec::new();
241    let enqueue_started = Instant::now();
242    enqueue_dream_jobs(
243        &repo,
244        request.namespace_id,
245        request.perspective,
246        request.session_key,
247        request.reflect_reason,
248        request.digest_reason,
249    )
250    .await?;
251    metrics.push(stage_metric_sample(
252        request.namespace_id,
253        "cognition.dream.enqueue_ms",
254        enqueue_started.elapsed().as_secs_f64() * 1000.0,
255        "enqueue",
256    ));
257    let drain_started = Instant::now();
258    let processed = drain_cognition_jobs(
259        pool,
260        request.namespace_id,
261        cognition,
262        agent,
263        llm,
264        embeddings,
265        request.lease_owner,
266    )
267    .await?;
268    metrics.push(stage_metric_sample(
269        request.namespace_id,
270        "cognition.dream.drain_ms",
271        drain_started.elapsed().as_secs_f64() * 1000.0,
272        "drain",
273    ));
274    metrics.push(stage_metric_sample(
275        request.namespace_id,
276        "cognition.dream.total_ms",
277        total_started.elapsed().as_secs_f64() * 1000.0,
278        "total",
279    ));
280    flush_metric_samples(&repo, &metrics).await;
281    Ok(processed)
282}
283
284// ── Cognition draining ────────────────────────────────────────────────
285
286pub async fn drain_cognition_jobs(
287    pool: sqlx::SqlitePool,
288    namespace_id: i64,
289    cognition: &CognitionConfig,
290    agent: &AgentConfig,
291    llm: Arc<dyn LlmClient>,
292    embeddings: Option<Arc<dyn EmbeddingService>>,
293    lease_owner: &str,
294) -> Result<usize, AgentError> {
295    let repo = MemoryRepository::new(pool);
296    let mut total_processed = 0usize;
297
298    for _ in 0..3 {
299        let mut progressed = 0usize;
300
301        if cognition.activity_distill_enabled {
302            progressed += distill::process_activity_distill_jobs(
303                &repo,
304                namespace_id,
305                cognition,
306                llm.clone(),
307                lease_owner,
308            )
309            .await?;
310        }
311        if cognition.derive_enabled {
312            progressed += job_processor::process_derive_jobs(
313                &repo,
314                namespace_id,
315                cognition,
316                agent,
317                llm.clone(),
318                embeddings.clone(),
319                lease_owner,
320            )
321            .await?;
322        }
323        if cognition.reflect_enabled {
324            progressed += job_processor::process_reflect_jobs(
325                &repo,
326                namespace_id,
327                cognition,
328                agent,
329                embeddings.clone(),
330                lease_owner,
331            )
332            .await?;
333            progressed += job_processor::process_reflect_namespace_jobs(
334                &repo,
335                namespace_id,
336                cognition,
337                agent,
338                embeddings.clone(),
339                lease_owner,
340            )
341            .await?;
342        }
343        if cognition.digest_enabled {
344            progressed += job_processor::process_digest_jobs(
345                &repo,
346                namespace_id,
347                cognition,
348                agent,
349                llm.clone(),
350                embeddings.clone(),
351                lease_owner,
352            )
353            .await?;
354        }
355
356        total_processed += progressed;
357        if progressed == 0 {
358            break;
359        }
360    }
361
362    Ok(total_processed)
363}
364
365// ── Dream job enqueue ─────────────────────────────────────────────────
366
367pub async fn enqueue_dream_jobs(
368    repo: &MemoryRepository,
369    namespace_id: i64,
370    perspective: Option<&PerspectiveKey>,
371    session_key: Option<&str>,
372    reflect_reason: &str,
373    digest_reason: &str,
374) -> Result<usize, AgentError> {
375    let mut queued = 0usize;
376
377    if let Some(perspective) = perspective {
378        let perspective_json = serde_json::to_value(perspective)
379            .map_err(|error| AgentError::Reflection(error.to_string()))?;
380        let payload = json!({
381            "reason": reflect_reason,
382            "session_key": perspective.session_key,
383        });
384        if job_processor::enqueue_job_if_absent(
385            repo,
386            EnqueueJobParams {
387                namespace_id,
388                job_type: job_processor::REFLECT_PERSPECTIVE_JOB,
389                priority: 100,
390                perspective: Some(&perspective_json),
391                payload: &payload,
392            },
393        )
394        .await?
395        {
396            queued += 1;
397        }
398    } else {
399        let payload = json!({
400            "reason": reflect_reason,
401            "session_key": session_key,
402        });
403        if job_processor::enqueue_job_if_absent(
404            repo,
405            EnqueueJobParams {
406                namespace_id,
407                job_type: job_processor::REFLECT_NAMESPACE_JOB,
408                priority: 100,
409                perspective: None,
410                payload: &payload,
411            },
412        )
413        .await?
414        {
415            queued += 1;
416        }
417    }
418
419    if let Some(session_key) = session_key {
420        let payload = json!({
421            "session_key": session_key,
422            "reason": digest_reason,
423        });
424        if job_processor::enqueue_job_if_absent(
425            repo,
426            EnqueueJobParams {
427                namespace_id,
428                job_type: job_processor::DIGEST_SESSION_JOB,
429                priority: 110,
430                perspective: None,
431                payload: &payload,
432            },
433        )
434        .await?
435        {
436            queued += 1;
437        }
438    }
439
440    Ok(queued)
441}
442
443// ── Signal collection ─────────────────────────────────────────────────
444
445pub(crate) async fn collect_dream_signals(
446    repo: &MemoryRepository,
447    namespace_id: i64,
448    session_key: &str,
449) -> Result<DreamSignals, AgentError> {
450    let memories = repo
451        .list_by_session_key(namespace_id, session_key, 512, true)
452        .await
453        .map_err(|error| AgentError::Storage(error.to_string()))?;
454    let has_digest_gap = repo
455        .count_digests(namespace_id, Some(session_key))
456        .await
457        .map_err(|error| AgentError::Storage(error.to_string()))?
458        == 0;
459
460    let mut signals = DreamSignals {
461        has_digest_gap,
462        ..DreamSignals::default()
463    };
464    for memory in memories
465        .iter()
466        .filter(|memory| memory_matches_session_key(memory, session_key))
467    {
468        if is_raw_event(memory) {
469            signals.raw_event_count += 1;
470        } else {
471            signals.total_non_raw_count += 1;
472
473            let level = nexus_core::cognitive_level_from_metadata(&memory.metadata);
474            match level {
475                CognitiveLevel::Explicit => signals.explicit_count += 1,
476                CognitiveLevel::Derived => signals.derived_count += 1,
477                CognitiveLevel::Contradiction => signals.contradiction_count += 1,
478                _ => {}
479            }
480            // Also count contradictions from times_contradicted field
481            // (avoid double-count: skip if already counted via CognitiveLevel)
482            if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
483                if cog.times_contradicted > 0 && level != CognitiveLevel::Contradiction {
484                    signals.contradiction_count += 1;
485                }
486            }
487        }
488    }
489
490    if signals.total_non_raw_count > 0 {
491        signals.contradiction_density =
492            signals.contradiction_count as f32 / signals.total_non_raw_count as f32;
493    }
494
495    Ok(signals)
496}
497
498fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
499    if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&memory.metadata) {
500        if cog.session_key.as_deref() == Some(session_key) {
501            return true;
502        }
503        if cog.session_keys.iter().any(|k| k == session_key) {
504            return true;
505        }
506    }
507    false
508}
509
510/// Determine if a memory represents raw activity.
511///
512/// Returns true if the memory has the "raw-activity" label OR its cognitive
513/// level is explicitly Raw. Memories with missing cognitive metadata default
514/// to Raw (intentional: unclassified activity is treated as raw until processed).
515fn is_raw_event(memory: &Memory) -> bool {
516    // A raw event is any memory labeled raw-activity regardless of category,
517    // or one whose cognitive level is explicitly Raw.
518    memory.labels.iter().any(|l| l == "raw-activity")
519        || nexus_core::cognitive_level_from_metadata(&memory.metadata) == CognitiveLevel::Raw
520}
521
522// ── Deep Dream & Cross-Project Logic ─────────────────────────────────
523
524use crate::activity_monitor::ActivityMonitor;
525use crate::soul::{SoulBuilder, SoulCandidate};
526
/// Summary of a [`run_deep_dream`] pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeepDreamResult {
    /// `true` when cross-project patterns were found and the soul rebuild succeeded.
    pub soul_updated: bool,
    /// Archived raw-activity memories deleted (older than the 30-day cutoff).
    pub memories_pruned: usize,
    /// Number of cross-project pattern candidates extracted.
    pub cross_project_patterns: usize,
    /// Project caches saved with a non-empty cold index after reindexing.
    pub cold_caches_reindexed: usize,
}
534
535/// Run a comprehensive deep dream cycle across all namespaces.
536pub async fn run_deep_dream(
537    services: &DreamServices,
538    soul_builder: &SoulBuilder,
539    activity_monitor: &mut ActivityMonitor,
540) -> Result<DeepDreamResult, AgentError> {
541    let repo = MemoryRepository::new(services.pool.clone());
542    let ns_repo = nexus_storage::repository::NamespaceRepository::new(services.pool.clone());
543
544    // Fetch all namespaces once for reuse across all deep-dream stages
545    let namespaces = ns_repo
546        .list_all()
547        .await
548        .map_err(|e| AgentError::Storage(e.to_string()))?;
549
550    // 1. Run standard dream for all namespaces
551    for ns in &namespaces {
552        if let Err(e) = drain_cognition_jobs(
553            services.pool.clone(),
554            ns.id,
555            &services.cognition,
556            &services.agent,
557            services.llm.clone(),
558            services.embeddings.clone(),
559            "deep-dream-cleanup",
560        )
561        .await
562        {
563            tracing::warn!(namespace_id = ns.id, error = %e, "drain_cognition_jobs failed");
564        }
565    }
566
567    // 2. Cross-project pattern extraction
568    let mut memories_by_project: HashMap<String, Vec<Memory>> = HashMap::new();
569    for ns in &namespaces {
570        let filters = nexus_storage::repository::ListMemoryFilters {
571            category: None,
572            since: None,
573            until: None,
574            content_like: None,
575            include_raw: false,
576            limit: 1000,
577            offset: 0,
578        };
579        if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
580            for m in memories {
581                let level = m.level();
582                if level == CognitiveLevel::Derived || level == CognitiveLevel::Explicit {
583                    let project_name = m
584                        .metadata
585                        .get("runtime")
586                        .and_then(|r| r.get("project_name"))
587                        .and_then(|p| p.as_str())
588                        .unwrap_or("unknown")
589                        .to_string();
590                    memories_by_project.entry(project_name).or_default().push(m);
591                }
592            }
593        }
594    }
595
596    let candidates = extract_cross_project_patterns(
597        memories_by_project,
598        services.embeddings.as_deref(),
599        services.cognitive_system.similarity_threshold,
600    )
601    .await;
602    let total_patterns = candidates.len();
603
604    // 3. Rebuild Soul
605    let soul_updated = if !candidates.is_empty() {
606        soul_builder.rebuild_soul(&candidates).await.is_ok()
607    } else {
608        false
609    };
610
611    // 4. Prune redundant archived raw-activity memories
612    let mut memories_pruned = 0usize;
613    let prune_cutoff = chrono::Utc::now() - chrono::Duration::days(30);
614    for ns in &namespaces {
615        if let Ok(candidates) = repo
616            .list_archived_raw_cleanup_candidates(ns.id, prune_cutoff, 500)
617            .await
618        {
619            if !candidates.is_empty() {
620                let ids: Vec<i64> = candidates.iter().map(|m| m.id).collect();
621                match repo.delete_batch(&ids).await {
622                    Ok(deleted) => memories_pruned += deleted as usize,
623                    Err(e) => {
624                        warn!(
625                            error = %e, count = ids.len(),
626                            "Failed to delete archived raw memories; skipping"
627                        );
628                    }
629                }
630            }
631        }
632    }
633
634    // 5. Cold-cache reindexing across discoverable project roots
635    let mut cold_caches_reindexed = 0usize;
636    // Discover project roots from memory metadata (cwd field)
637    let mut project_roots: HashSet<PathBuf> = HashSet::new();
638    for ns in &namespaces {
639        let filters = nexus_storage::repository::ListMemoryFilters {
640            category: None,
641            since: Some(chrono::Utc::now() - chrono::Duration::days(90)),
642            until: None,
643            content_like: None,
644            include_raw: true,
645            limit: 200,
646            offset: 0,
647        };
648        if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
649            for m in &memories {
650                if let Some(cwd) = m
651                    .metadata
652                    .get("runtime")
653                    .and_then(|r| r.get("cwd"))
654                    .and_then(|v| v.as_str())
655                {
656                    let root = PathBuf::from(cwd);
657                    if root.join(".nexus").exists() {
658                        project_roots.insert(root);
659                    }
660                }
661            }
662        }
663    }
664
665    let reindex_cutoff = chrono::Utc::now() - chrono::Duration::days(7);
666    for root in &project_roots {
667        let nexus_dir = root.join(".nexus");
668        let mut cache = CognitiveCache::load_or_init(&nexus_dir);
669
670        // Collect recent memory IDs across all namespaces for this project
671        let mut fresh_entries: Vec<ColdIndexEntry> = Vec::new();
672        for ns in &namespaces {
673            let filters = nexus_storage::repository::ListMemoryFilters {
674                category: None,
675                since: Some(reindex_cutoff),
676                until: None,
677                content_like: None,
678                include_raw: false,
679                limit: 50,
680                offset: 0,
681            };
682            if let Ok(memories) = repo.list_filtered(ns.id, filters).await {
683                for m in memories {
684                    let cwd_match = m
685                        .metadata
686                        .get("runtime")
687                        .and_then(|r| r.get("cwd"))
688                        .and_then(|v| v.as_str())
689                        .map(|c| Path::new(c) == root.as_path())
690                        .unwrap_or(false);
691                    if cwd_match {
692                        fresh_entries.push(ColdIndexEntry {
693                            memory_id: m.id,
694                            project_relevance: 0.7,
695                            last_surfaced: Some(m.created_at),
696                        });
697                    }
698                }
699            }
700        }
701
702        // Reindex: cold cache is now exactly the fresh set.
703        // This replaces the old merge strategy that accumulated stale entries.
704        cache.cold_index.entries = fresh_entries;
705        cache.cold_index.last_reindexed = Some(chrono::Utc::now());
706
707        match cache.save(&nexus_dir) {
708            Ok(()) if !cache.cold_index.entries.is_empty() => {
709                cold_caches_reindexed += 1;
710            }
711            Ok(()) => {}
712            Err(e) => {
713                tracing::warn!(error = %e, "Failed to save cold cache after reindexing");
714            }
715        }
716    }
717
718    // 6. Update monitor
719    activity_monitor.last_deep_dream = Some(chrono::Utc::now());
720    if let Err(e) = activity_monitor.save() {
721        tracing::error!("Failed to save activity monitor after deep dream: {}", e);
722    }
723
724    Ok(DeepDreamResult {
725        soul_updated,
726        memories_pruned,
727        cross_project_patterns: total_patterns,
728        cold_caches_reindexed,
729    })
730}
731
732/// Extract patterns that appear across multiple projects.
733///
734/// Uses semantic similarity (embeddings) to group conceptually similar memories
735/// rather than brittle exact string matching.
736pub async fn extract_cross_project_patterns(
737    memories_by_project: HashMap<String, Vec<Memory>>,
738    embeddings: Option<&dyn EmbeddingService>,
739    similarity_threshold: f32,
740) -> Vec<SoulCandidate> {
741    // Flatten memories with their project attribution
742    let mut flat_memories: Vec<(Memory, String)> = Vec::new();
743    for (project, memories) in memories_by_project {
744        for m in memories {
745            flat_memories.push((m, project.clone()));
746        }
747    }
748
749    if flat_memories.len() < 2 {
750        return Vec::new();
751    }
752
753    // Collect embeddings (existing or computed)
754    let mut emb_map: HashMap<i64, Vec<f32>> = HashMap::new();
755    let mut to_compute: Vec<usize> = Vec::new();
756
757    for (idx, (m, _)) in flat_memories.iter().enumerate() {
758        if let Some(emb) = &m.content_embedding {
759            emb_map.insert(m.id, emb.clone());
760        } else {
761            to_compute.push(idx);
762        }
763    }
764
765    // Compute missing embeddings in batch if service available
766    if let Some(service) = embeddings {
767        let contents: Vec<String> = to_compute
768            .iter()
769            .map(|&idx| flat_memories[idx].0.content.clone())
770            .collect();
771        if !contents.is_empty() {
772            match service.embed_batch(&contents).await {
773                Ok(results) if results.len() == contents.len() => {
774                    for (idx, emb) in to_compute.into_iter().zip(results) {
775                        let mem_id = flat_memories[idx].0.id;
776                        emb_map.insert(mem_id, emb);
777                    }
778                }
779                Ok(results) => {
780                    tracing::warn!(
781                        "embed_batch returned {} results for {} inputs in pattern extraction",
782                        results.len(),
783                        contents.len()
784                    );
785                }
786                Err(e) => {
787                    tracing::warn!("embed_batch failed in pattern extraction: {}", e);
788                }
789            }
790        }
791    }
792
793    // If we don't have embeddings for at least some memories, semantic grouping
794    // won't work well. Fall back to simple exact-match grouping on lowercased content.
795    if emb_map.len() < flat_memories.len() / 2 {
796        // Fallback: original algorithm (exact match after lowercase)
797        let mut pattern_map: HashMap<String, (u32, Vec<String>)> = HashMap::new();
798        for (m, project) in &flat_memories {
799            let normalized = m.content.to_lowercase();
800            let entry = pattern_map.entry(normalized).or_insert((0, Vec::new()));
801            entry.0 += 1;
802            if !entry.1.contains(project) {
803                entry.1.push(project.clone());
804            }
805        }
806        return pattern_map
807            .into_iter()
808            .filter(|(_, (_count, projects))| projects.len() >= 2)
809            .map(|(content, (count, projects))| SoulCandidate {
810                content,
811                source_project: projects.join(", "),
812                observation_count: count,
813                category: "TechnicalLearning".into(),
814                source_agent: "nexus-dream-engine".into(),
815            })
816            .collect();
817    }
818
819    // Union-find for clustering
820    let n = flat_memories.len();
821    let mut parent: Vec<usize> = (0..n).collect();
822
823    fn find(mut x: usize, parent: &mut [usize]) -> usize {
824        let mut root = x;
825        while parent[root] != root {
826            root = parent[root];
827        }
828        while parent[x] != root {
829            let next = parent[x];
830            parent[x] = root;
831            x = next;
832        }
833        root
834    }
835
836    fn union(x: usize, y: usize, parent: &mut [usize]) {
837        let rx = find(x, parent);
838        let ry = find(y, parent);
839        if rx != ry {
840            parent[ry] = rx;
841        }
842    }
843
844    // Compare each pair with available embeddings
845    let indices: Vec<usize> = (0..n)
846        .filter(|i| emb_map.contains_key(&flat_memories[*i].0.id))
847        .collect();
848    for &i in &indices {
849        let emb_i = emb_map.get(&flat_memories[i].0.id).unwrap();
850        for &j in indices.iter().filter(|&&j| j > i) {
851            let emb_j = emb_map.get(&flat_memories[j].0.id).unwrap();
852            if cosine_similarity(emb_i, emb_j) >= similarity_threshold {
853                union(i, j, &mut parent);
854            }
855        }
856    }
857
858    // Build clusters
859    let mut clusters: HashMap<usize, Vec<usize>> = HashMap::new();
860    for i in 0..n {
861        let root = find(i, &mut parent);
862        clusters.entry(root).or_default().push(i);
863    }
864
865    // Convert clusters to SoulCandidates
866    let mut candidates = Vec::new();
867    for indices in clusters.values() {
868        let mut projects_set: HashSet<String> = HashSet::new();
869        for &idx in indices {
870            projects_set.insert(flat_memories[idx].1.clone());
871        }
872        if projects_set.len() >= 2 {
873            // Choose representative: longest content (most descriptive)
874            let (rep_idx, _) = indices
875                .iter()
876                .map(|&idx| (idx, flat_memories[idx].0.content.len()))
877                .max_by_key(|(_, len)| *len)
878                .unwrap();
879            let content = flat_memories[rep_idx].0.content.clone();
880            let observation_count = indices.len() as u32;
881            let source_project = projects_set.into_iter().collect::<Vec<String>>().join(", ");
882            candidates.push(SoulCandidate {
883                content,
884                source_project,
885                observation_count,
886                category: "TechnicalLearning".into(),
887                source_agent: "nexus-dream-engine".into(),
888            });
889        }
890    }
891
892    candidates
893}
894
895// ── Adaptive scheduling ───────────────────────────────────────────────
896
897pub(crate) fn choose_dream_schedule(
898    signals: &DreamSignals,
899    reason: RuntimeShutdownReason,
900) -> DreamSchedulePlan {
901    // No signals at all -> skip
902    if signals.total_non_raw_count == 0
903        && signals.raw_event_count == 0
904        && signals.explicit_count == 0
905        && signals.derived_count == 0
906        && signals.contradiction_count == 0
907    {
908        return DreamSchedulePlan {
909            action: DreamScheduleAction::Skip,
910            reason: "no signals",
911        };
912    }
913
914    // --- Density-based decisions (when density is nonzero) ---
915    if signals.contradiction_density > 0.0 {
916        // High contradiction density -> immediate regardless of reason
917        if signals.contradiction_density > 0.15 {
918            return DreamSchedulePlan {
919                action: DreamScheduleAction::ImmediateBounded,
920                reason: "high contradiction density",
921            };
922        }
923
924        if reason == RuntimeShutdownReason::SessionEnded && signals.contradiction_density >= 0.10 {
925            return DreamSchedulePlan {
926                action: DreamScheduleAction::ImmediateBounded,
927                reason: "moderate contradiction density at session end",
928            };
929        }
930
931        // Low density with contradictions: defer to IdleTimeout or DelayedEnqueue
932        if reason == RuntimeShutdownReason::IdleTimeout {
933            if signals.explicit_count > 0 || signals.derived_count > 0 {
934                return DreamSchedulePlan {
935                    action: DreamScheduleAction::DelayedEnqueue,
936                    reason: "delays idle medium signal sessions",
937                };
938            }
939            return DreamSchedulePlan {
940                action: DreamScheduleAction::Skip,
941                reason: "idle without signal",
942            };
943        }
944
945        // Density present but not high enough for immediate -> fall through to
946        // count-based or explicit/derived logic below only at session end.
947        if reason == RuntimeShutdownReason::SessionEnded
948            && (signals.explicit_count > 0 || signals.derived_count > 0)
949        {
950            return DreamSchedulePlan {
951                action: DreamScheduleAction::ImmediateBounded,
952                reason: "session end flushes explicit reflection",
953            };
954        }
955
956        // Fallback for density-present cases: DelayedEnqueue or DigestOnly
957        if signals.has_digest_gap && signals.raw_event_count > 0 {
958            return DreamSchedulePlan {
959                action: DreamScheduleAction::DigestOnly,
960                reason: "digest only for light digest gap",
961            };
962        }
963        return DreamSchedulePlan {
964            action: DreamScheduleAction::DelayedEnqueue,
965            reason: "default_background",
966        };
967    }
968
969    // --- Count-based decisions (density is zero) ---
970    if signals.contradiction_count > 0 {
971        return DreamSchedulePlan {
972            action: DreamScheduleAction::ImmediateBounded,
973            reason: "contradiction detected",
974        };
975    }
976
977    if reason == RuntimeShutdownReason::SessionEnded
978        && (signals.explicit_count > 0 || signals.derived_count > 0)
979    {
980        return DreamSchedulePlan {
981            action: DreamScheduleAction::ImmediateBounded,
982            reason: "session end flushes explicit reflection",
983        };
984    }
985
986    if signals.has_digest_gap && signals.raw_event_count > 0 {
987        return DreamSchedulePlan {
988            action: DreamScheduleAction::DigestOnly,
989            reason: "digest only for light digest gap",
990        };
991    }
992
993    if reason == RuntimeShutdownReason::IdleTimeout {
994        if signals.explicit_count > 0 || signals.derived_count > 0 {
995            return DreamSchedulePlan {
996                action: DreamScheduleAction::DelayedEnqueue,
997                reason: "delays idle medium signal sessions",
998            };
999        }
1000        return DreamSchedulePlan {
1001            action: DreamScheduleAction::Skip,
1002            reason: "idle without signal",
1003        };
1004    }
1005
1006    DreamSchedulePlan {
1007        action: DreamScheduleAction::DelayedEnqueue,
1008        reason: "default_background",
1009    }
1010}
1011
1012pub(crate) async fn compute_adaptive_dream_interval(
1013    pool: sqlx::SqlitePool,
1014    namespace_id: i64,
1015    base_interval_secs: u64,
1016    cognition: &CognitionConfig,
1017) -> Duration {
1018    let repo = MemoryRepository::new(pool);
1019    let filters = nexus_storage::repository::ListMemoryFilters {
1020        category: None,
1021        since: None,
1022        until: None,
1023        content_like: None,
1024        include_raw: true,
1025        limit: 100,
1026        offset: 0,
1027    };
1028
1029    if let Ok(recent_memories) = repo.list_filtered(namespace_id, filters).await {
1030        if recent_memories.is_empty() {
1031            return Duration::from_secs(base_interval_secs.clamp(
1032                cognition.adaptive_dream_min_interval_secs,
1033                cognition.adaptive_dream_max_interval_secs,
1034            ));
1035        }
1036
1037        let contradiction_count = recent_memories
1038            .iter()
1039            .filter(|m| {
1040                // Count if explicitly marked as Contradiction level
1041                if nexus_core::cognitive_level_from_metadata(&m.metadata)
1042                    == CognitiveLevel::Contradiction
1043                {
1044                    return true;
1045                }
1046                // Or if times_contradicted field indicates contradiction
1047                if let Some(cog) = nexus_core::CognitiveMetadata::from_metadata(&m.metadata) {
1048                    if cog.times_contradicted > 0 {
1049                        return true;
1050                    }
1051                }
1052                false
1053            })
1054            .count();
1055
1056        let density = contradiction_count as f32 / recent_memories.len() as f32;
1057
1058        let multiplier = if density > 0.10 {
1059            0.5
1060        } else if recent_memories.len() < 5 {
1061            2.0
1062        } else {
1063            1.0
1064        };
1065
1066        let interval = (base_interval_secs as f32 * multiplier) as u64;
1067        let interval = interval.clamp(
1068            cognition.adaptive_dream_min_interval_secs,
1069            cognition.adaptive_dream_max_interval_secs,
1070        );
1071
1072        return Duration::from_secs(interval);
1073    }
1074
1075    Duration::from_secs(base_interval_secs.clamp(
1076        cognition.adaptive_dream_min_interval_secs,
1077        cognition.adaptive_dream_max_interval_secs,
1078    ))
1079}