Skip to main content

nexus_memory_agent/
runtime.rs

//! Session-scoped runtime controller for hook-driven cognition.
//!
//! This module owns the `RuntimeController` lifecycle (start / shutdown),
//! the embedding service factory, and the LLM client fallback. All heavy
//! cognition logic is delegated to the sibling modules:
//!
//! - [`dream_cycle`]     — dream-cycle orchestration, signal collection, scheduling
//! - [`job_processor`]   — derive / reflect / digest job handlers
//! - [`distill`]         — activity distillation pipeline (LLM summarisation)
//! - [`runtime_state`]   — state persistence, session-key derivation, helper types
11
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use chrono::Utc;
16use nexus_core::config::{AgentConfig, CognitionConfig};
17use nexus_core::traits::EmbeddingService;
18use nexus_core::PerspectiveKey;
19use nexus_llm::{create_client_auto_with_fallback, GenerateParams, GenerateResponse, LlmClient};
20use nexus_storage::repository::{MemoryRepository, NamespaceRepository};
21use nexus_storage::StorageManager;
22use tracing::{debug, info, warn};
23
24use crate::dream_cycle::{self, DreamCycleRequest, DreamScheduleAction};
25use crate::error::AgentError;
26use crate::job_processor;
27use crate::runtime_state::{self, RuntimeState};
28
29// ── Re-exports (public API stability) ─────────────────────────────────
30pub use crate::runtime_state::{derive_session_key, RuntimeMode, RuntimeShutdownReason};
31
32// ── RuntimeController ─────────────────────────────────────────────────
33
34pub struct RuntimeController {
35    cognition: CognitionConfig,
36    agent: AgentConfig,
37    embeddings: Option<Arc<dyn EmbeddingService>>,
38}
39
40impl RuntimeController {
41    pub fn new(
42        cognition: CognitionConfig,
43        agent: AgentConfig,
44        embeddings: Option<Arc<dyn EmbeddingService>>,
45    ) -> Self {
46        Self {
47            cognition,
48            agent,
49            embeddings,
50        }
51    }
52
53    pub async fn ensure_started(
54        &self,
55        agent_type: &str,
56        session_key: Option<&str>,
57        cwd: Option<&str>,
58        mode: RuntimeMode,
59    ) -> Result<(), AgentError> {
60        if !self.cognition.auto_runtime_enabled {
61            return Ok(());
62        }
63
64        let config =
65            nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
66        let mut storage = StorageManager::from_url(&config.database_url())
67            .await
68            .map_err(|e| AgentError::Storage(e.to_string()))?;
69        storage
70            .initialize()
71            .await
72            .map_err(|e| AgentError::Storage(e.to_string()))?;
73
74        let session_key = derive_session_key(agent_type, session_key, cwd);
75        let path = runtime_state::state_file_path(agent_type, &session_key)?;
76        let now = Utc::now();
77
78        let mut state = runtime_state::read_runtime_state(&path)?.unwrap_or(RuntimeState {
79            agent_type: agent_type.to_string(),
80            session_key: session_key.clone(),
81            mode: mode.into(),
82            started_at: now,
83            updated_at: now,
84        });
85
86        if (now - state.updated_at).num_seconds() > self.cognition.runtime_idle_timeout_secs as i64
87        {
88            state.started_at = now;
89        }
90        state.updated_at = now;
91        state.mode = mode.into();
92
93        runtime_state::write_runtime_state(&path, &state)?;
94        let llm = runtime_llm_client();
95        let processed = dream_cycle::drain_cognition_jobs(
96            storage.pool().clone(),
97            runtime_state::namespace_id_for(agent_type, &storage).await?,
98            &self.cognition,
99            &self.agent,
100            llm,
101            self.embeddings.clone(),
102            &format!("runtime-start-{agent_type}-{}", state.session_key),
103        )
104        .await?;
105        debug!(
106            agent_type,
107            processed_jobs = processed,
108            "Runtime startup cognition drain complete"
109        );
110        debug!(agent_type, session_key = %state.session_key, "Runtime state ensured");
111        Ok(())
112    }
113
114    pub async fn flush_and_shutdown(
115        &self,
116        agent_type: &str,
117        session_key: Option<&str>,
118        cwd: Option<&str>,
119        reason: RuntimeShutdownReason,
120    ) -> Result<(), AgentError> {
121        if !self.cognition.auto_runtime_enabled {
122            return Ok(());
123        }
124
125        let config =
126            nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
127        let mut storage = StorageManager::from_url(&config.database_url())
128            .await
129            .map_err(|e| AgentError::Storage(e.to_string()))?;
130        storage
131            .initialize()
132            .await
133            .map_err(|e| AgentError::Storage(e.to_string()))?;
134
135        let namespace_repo = NamespaceRepository::new(storage.pool().clone());
136        let namespace = namespace_repo
137            .get_or_create(agent_type, agent_type)
138            .await
139            .map_err(|e| AgentError::Storage(e.to_string()))?;
140
141        let memory_repo = MemoryRepository::new(storage.pool().clone());
142        let derived_session_key = derive_session_key(agent_type, session_key, cwd);
143        runtime_state::store_runtime_marker(
144            &memory_repo,
145            namespace.id,
146            runtime_state::RuntimeMarker {
147                agent_type,
148                session_key,
149                cwd,
150                event: "runtime_session_end",
151                detail: runtime_state::runtime_reason_label(reason),
152                agent_namespace: self.agent.namespace.as_str(),
153            },
154        )
155        .await?;
156
157        let llm = runtime_llm_client();
158        let processed = dream_cycle::drain_cognition_jobs(
159            storage.pool().clone(),
160            namespace.id,
161            &self.cognition,
162            &self.agent,
163            llm.clone(),
164            self.embeddings.clone(),
165            &format!("runtime-stop-{agent_type}-{derived_session_key}"),
166        )
167        .await?;
168        debug!(
169            agent_type,
170            processed_jobs = processed,
171            "Runtime shutdown cognition drain complete"
172        );
173
174        if self.cognition.dream_on_session_end {
175            let lease_owner = format!("runtime-dream-{agent_type}-{derived_session_key}");
176            let shutdown_perspective = PerspectiveKey {
177                observer: agent_type.to_string(),
178                subject: agent_type.to_string(),
179                session_key: Some(derived_session_key.clone()),
180            };
181            let signals = dream_cycle::collect_dream_signals(
182                &memory_repo,
183                namespace.id,
184                &derived_session_key,
185            )
186            .await?;
187            let plan = dream_cycle::choose_dream_schedule(&signals, reason);
188            debug!(
189                agent_type,
190                session_key = ?session_key,
191                action = ?plan.action,
192                plan_reason = plan.reason,
193                raw_event_count = signals.raw_event_count,
194                contradiction_count = signals.contradiction_count,
195                contradiction_density = signals.contradiction_density,
196                total_non_raw = signals.total_non_raw_count,
197                has_digest_gap = signals.has_digest_gap,
198                "Selected adaptive dream schedule"
199            );
200            match plan.action {
201                DreamScheduleAction::ImmediateBounded => match tokio::time::timeout(
202                    std::time::Duration::from_secs(self.cognition.session_end_dream_timeout_secs),
203                    dream_cycle::run_dream_cycle(
204                        storage.pool().clone(),
205                        &self.cognition,
206                        &self.agent,
207                        llm,
208                        self.embeddings.clone(),
209                        DreamCycleRequest {
210                            namespace_id: namespace.id,
211                            lease_owner: &lease_owner,
212                            perspective: Some(&shutdown_perspective),
213                            session_key: Some(derived_session_key.as_str()),
214                            reflect_reason: "session_end_dream",
215                            digest_reason: "session_end",
216                        },
217                    ),
218                )
219                .await
220                {
221                    Ok(Ok(processed)) if processed > 0 => {
222                        info!(
223                            agent_type,
224                            session_key = ?session_key,
225                            processed,
226                            plan_reason = plan.reason,
227                            "Dream pass completed through cognition jobs"
228                        );
229                    }
230                    Ok(Ok(_)) => {
231                        debug!(
232                            agent_type,
233                            session_key = ?session_key,
234                            plan_reason = plan.reason,
235                            "Dream pass skipped"
236                        );
237                    }
238                    Ok(Err(error)) => {
239                        warn!(%error, agent_type, session_key = ?session_key, "Dream pass failed");
240                    }
241                    Err(_) => {
242                        warn!(
243                            agent_type,
244                            session_key = ?session_key,
245                            timeout_secs = self.cognition.session_end_dream_timeout_secs,
246                            "Dream pass timed out during shutdown"
247                        );
248                    }
249                },
250                DreamScheduleAction::DelayedEnqueue => {
251                    let queued = dream_cycle::enqueue_dream_jobs(
252                        &memory_repo,
253                        namespace.id,
254                        Some(&shutdown_perspective),
255                        Some(derived_session_key.as_str()),
256                        "session_end_delayed_dream",
257                        "session_end",
258                    )
259                    .await?;
260                    debug!(
261                        agent_type,
262                        session_key = ?session_key,
263                        queued,
264                        plan_reason = plan.reason,
265                        "Queued delayed dream jobs without immediate drain"
266                    );
267                }
268                DreamScheduleAction::DigestOnly => {
269                    let queued = job_processor::enqueue_digest_job_if_absent(
270                        &memory_repo,
271                        namespace.id,
272                        derived_session_key.as_str(),
273                        "session_end_digest_only",
274                    )
275                    .await?;
276                    debug!(
277                        agent_type,
278                        session_key = ?session_key,
279                        queued,
280                        plan_reason = plan.reason,
281                        "Queued digest-only shutdown work"
282                    );
283                    if queued && matches!(reason, RuntimeShutdownReason::SessionEnded) {
284                        match tokio::time::timeout(
285                            std::time::Duration::from_secs(
286                                self.cognition.session_end_dream_timeout_secs,
287                            ),
288                            dream_cycle::drain_cognition_jobs(
289                                storage.pool().clone(),
290                                namespace.id,
291                                &self.cognition,
292                                &self.agent,
293                                llm.clone(),
294                                self.embeddings.clone(),
295                                &format!("runtime-finalize-{agent_type}-{derived_session_key}"),
296                            ),
297                        )
298                        .await
299                        {
300                            Ok(Ok(processed)) => {
301                                debug!(
302                                    agent_type,
303                                    session_key = ?session_key,
304                                    processed,
305                                    plan_reason = plan.reason,
306                                    "Drained digest-only shutdown work before runtime teardown"
307                                );
308                            }
309                            Ok(Err(error)) => {
310                                warn!(
311                                    %error,
312                                    agent_type,
313                                    session_key = ?session_key,
314                                    "Digest-only shutdown drain failed"
315                                );
316                            }
317                            Err(_) => {
318                                warn!(
319                                    agent_type,
320                                    session_key = ?session_key,
321                                    timeout_secs = self.cognition.session_end_dream_timeout_secs,
322                                    "Digest-only shutdown drain timed out"
323                                );
324                            }
325                        }
326                    }
327                }
328                DreamScheduleAction::Skip => {
329                    debug!(
330                        agent_type,
331                        session_key = ?session_key,
332                        plan_reason = plan.reason,
333                        "Skipped shutdown dream work after adaptive planning"
334                    );
335                }
336            }
337        }
338
339        let path = runtime_state::state_file_path(agent_type, &derived_session_key)?;
340        if path.exists() {
341            std::fs::remove_file(&path)?;
342        }
343
344        Ok(())
345    }
346
347    pub fn state_root() -> std::path::PathBuf {
348        runtime_state::state_root()
349    }
350}
351
352// ── Embedding service factory ─────────────────────────────────────────
353
354/// Create an embedding service when `config.embedding.enabled` is true.
355///
356/// Returns `None` when embeddings are disabled or the service fails to
357/// initialise — this preserves graceful degradation throughout the
358/// cognition pipeline.
359pub async fn create_embedding_service(
360    config: &nexus_core::Config,
361) -> Option<Arc<dyn EmbeddingService>> {
362    if !config.embedding.enabled {
363        return None;
364    }
365    match nexus_embeddings::create_service(config).await {
366        Ok(Some(service)) => Some(service),
367        Ok(None) => None,
368        Err(error) => {
369            warn!(
370                %error,
371                "Failed to initialize embedding service; configured LLM features remain available and cognition will run without semantic embeddings. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
372            );
373            None
374        }
375    }
376}
377
378// ── LLM client fallback ──────────────────────────────────────────────
379
380fn runtime_llm_client() -> Arc<dyn LlmClient> {
381    match create_client_auto_with_fallback() {
382        Ok(client) => client,
383        Err(error) => {
384            warn!(%error, "LLM unavailable, using deterministic cognition fallbacks");
385            Arc::new(UnavailableLlmClient {
386                message: error.to_string(),
387            })
388        }
389    }
390}
391
/// Placeholder [`LlmClient`] used when no real client could be created.
struct UnavailableLlmClient {
    /// Text of the error that prevented client creation; returned inside the
    /// error of every `generate` call.
    message: String,
}
395
396#[async_trait]
397impl LlmClient for UnavailableLlmClient {
398    async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
399        Err(nexus_llm::LlmError::InvalidJsonResponse(
400            self.message.clone(),
401        ))
402    }
403
404    fn provider_name(&self) -> String {
405        "unavailable".to_string()
406    }
407
408    fn model_name(&self) -> String {
409        "deterministic-fallback".to_string()
410    }
411}
412
413// ── Tests ─────────────────────────────────────────────────────────────
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418    use crate::dream_cycle::{
419        choose_dream_schedule, collect_dream_signals, enqueue_dream_jobs, DreamScheduleAction,
420        DreamSignals,
421    };
422    use crate::job_processor::{
423        digest_job_is_forced, process_digest_jobs, DIGEST_SESSION_JOB, REFLECT_NAMESPACE_JOB,
424        REFLECT_PERSPECTIVE_JOB,
425    };
426    use crate::runtime_state::{derive_session_key, sanitize_component, RuntimeShutdownReason};
427    use nexus_core::config::{AgentConfig, CognitionConfig};
428    use nexus_core::{CognitiveLevel, MemoryCategory};
429    use nexus_storage::models::memory_job_status;
430    use nexus_storage::repository::{MemoryRepository, NamespaceRepository, StoreMemoryParams};
431    use serde_json::json;
432    use sqlx::sqlite::SqlitePoolOptions;
433    use std::sync::Arc;
434
435    #[tokio::test]
436    async fn run_dream_cycle_processes_namespace_jobs() {
437        let pool = SqlitePoolOptions::new()
438            .max_connections(1)
439            .connect("sqlite::memory:")
440            .await
441            .unwrap();
442        nexus_storage::migrations::run_migrations(&pool)
443            .await
444            .unwrap();
445
446        let namespace_repo = NamespaceRepository::new(pool.clone());
447        let namespace = namespace_repo
448            .get_or_create("runtime-dream-test", "runtime-dream-test")
449            .await
450            .unwrap();
451        let repo = MemoryRepository::new(pool.clone());
452
453        for content in ["feature enabled", "feature not enabled"] {
454            repo.store(StoreMemoryParams {
455                namespace_id: namespace.id,
456                content,
457                category: &MemoryCategory::Facts,
458                memory_lane_type: None,
459                labels: &[],
460                metadata: &json!({
461                    "cognitive": {
462                        "level": "explicit",
463                        "observer": "claude-code",
464                        "subject": "claude-code",
465                        "generated_by": "test"
466                    }
467                }),
468                embedding: None,
469                embedding_model: None,
470            })
471            .await
472            .unwrap();
473        }
474
475        let processed = dream_cycle::run_dream_cycle(
476            pool.clone(),
477            &CognitionConfig::default(),
478            &AgentConfig::default(),
479            Arc::new(UnavailableLlmClient {
480                message: "offline".to_string(),
481            }),
482            None,
483            DreamCycleRequest {
484                namespace_id: namespace.id,
485                lease_owner: "test-owner",
486                perspective: None,
487                session_key: None,
488                reflect_reason: "namespace_dream",
489                digest_reason: "dream_digest",
490            },
491        )
492        .await
493        .unwrap();
494
495        assert!(processed >= 1);
496        assert_eq!(
497            repo.get_by_cognitive_level(namespace.id, CognitiveLevel::Contradiction, 10)
498                .await
499                .unwrap()
500                .len(),
501            1
502        );
503        assert!(repo
504            .list_jobs(
505                namespace.id,
506                Some(REFLECT_NAMESPACE_JOB),
507                Some("pending"),
508                10,
509                0
510            )
511            .await
512            .unwrap()
513            .is_empty());
514    }
515
516    #[tokio::test]
517    async fn process_digest_jobs_skips_below_rollover_threshold() {
518        let pool = SqlitePoolOptions::new()
519            .max_connections(1)
520            .connect("sqlite::memory:")
521            .await
522            .unwrap();
523        nexus_storage::migrations::run_migrations(&pool)
524            .await
525            .unwrap();
526
527        let namespace_repo = NamespaceRepository::new(pool.clone());
528        let namespace = namespace_repo
529            .get_or_create("runtime-digest-skip", "runtime-digest-skip")
530            .await
531            .unwrap();
532        let repo = MemoryRepository::new(pool.clone());
533
534        let source = repo
535            .store(StoreMemoryParams {
536                namespace_id: namespace.id,
537                content: "Small explicit update.",
538                category: &MemoryCategory::Session,
539                memory_lane_type: None,
540                labels: &[],
541                metadata: &serde_json::json!({
542                    "cognitive": {
543                        "level": "explicit",
544                        "observer": "claude-code",
545                        "subject": "claude-code",
546                        "session_key": "digest-skip-session"
547                    }
548                }),
549                embedding: None,
550                embedding_model: None,
551            })
552            .await
553            .unwrap();
554
555        let prior_digest = repo
556            .store(StoreMemoryParams {
557                namespace_id: namespace.id,
558                content: "Prior digest",
559                category: &MemoryCategory::Session,
560                memory_lane_type: None,
561                labels: &[],
562                metadata: &serde_json::json!({
563                    "cognitive": {
564                        "level": "summary_short",
565                        "observer": "claude-code",
566                        "subject": "claude-code",
567                        "session_key": "digest-skip-session"
568                    }
569                }),
570                embedding: None,
571                embedding_model: None,
572            })
573            .await
574            .unwrap();
575
576        repo.store_digest(nexus_storage::repository::StoreDigestParams {
577            namespace_id: namespace.id,
578            session_key: "digest-skip-session",
579            digest_kind: "short",
580            memory_id: prior_digest.id,
581            start_memory_id: Some(source.id),
582            end_memory_id: Some(source.id),
583            token_count: 12,
584        })
585        .await
586        .unwrap();
587
588        let follow_up = repo
589            .store(StoreMemoryParams {
590                namespace_id: namespace.id,
591                content: "Tiny follow-up.",
592                category: &MemoryCategory::Session,
593                memory_lane_type: None,
594                labels: &[],
595                metadata: &serde_json::json!({
596                    "cognitive": {
597                        "level": "explicit",
598                        "observer": "claude-code",
599                        "subject": "claude-code",
600                        "session_key": "digest-skip-session"
601                    }
602                }),
603                embedding: None,
604                embedding_model: None,
605            })
606            .await
607            .unwrap();
608
609        repo.enqueue_job(nexus_storage::EnqueueJobParams {
610            namespace_id: namespace.id,
611            job_type: DIGEST_SESSION_JOB,
612            priority: 90,
613            perspective: None,
614            payload: &serde_json::json!({
615                "session_key": "digest-skip-session",
616                "source_memory_id": follow_up.id
617            }),
618        })
619        .await
620        .unwrap();
621
622        assert_eq!(
623            repo.list_jobs(
624                namespace.id,
625                Some(DIGEST_SESSION_JOB),
626                Some("pending"),
627                10,
628                0
629            )
630            .await
631            .unwrap()
632            .len(),
633            1
634        );
635
636        let processed = process_digest_jobs(
637            &repo,
638            namespace.id,
639            &CognitionConfig::default(),
640            &AgentConfig::default(),
641            Arc::new(UnavailableLlmClient {
642                message: "offline".to_string(),
643            }),
644            None,
645            "digest-skip",
646        )
647        .await
648        .unwrap();
649
650        assert_eq!(processed, 1);
651        let latest = repo
652            .latest_digest_for_session(namespace.id, "digest-skip-session", "short")
653            .await
654            .unwrap()
655            .unwrap();
656        assert_eq!(latest.id, prior_digest.id);
657        assert_eq!(
658            repo.count_digests(namespace.id, Some("digest-skip-session"))
659                .await
660                .unwrap(),
661            1
662        );
663    }
664
665    #[test]
666    fn digest_job_force_reason_matches_expected_inputs() {
667        assert!(digest_job_is_forced(Some("dream_digest")));
668        assert!(digest_job_is_forced(Some("session_end")));
669        assert!(digest_job_is_forced(Some("manual_digest")));
670        assert!(digest_job_is_forced(Some("manual_rebuild")));
671        assert!(!digest_job_is_forced(Some("derive_follow_up")));
672        assert!(!digest_job_is_forced(None));
673    }
674
675    #[test]
676    fn sanitize_component_replaces_unsafe_chars() {
677        assert_eq!(sanitize_component("claude/code:1"), "claude_code_1");
678    }
679
680    #[test]
681    fn derive_session_key_prefers_explicit_key() {
682        assert_eq!(
683            derive_session_key("claude-code", Some("abc"), Some("/tmp/project")),
684            "abc"
685        );
686    }
687
688    #[test]
689    fn derive_session_key_falls_back_to_stable_hash() {
690        let first = derive_session_key("claude-code", None, Some("/tmp/project"));
691        let second = derive_session_key("claude-code", Some(""), Some("/tmp/project"));
692        assert_eq!(first, second);
693        assert!(first.starts_with("derived-"));
694    }
695
696    #[test]
697    fn choose_dream_schedule_immediate_for_contradictions() {
698        let plan = choose_dream_schedule(
699            &DreamSignals {
700                contradiction_count: 1,
701                raw_event_count: 1,
702                ..DreamSignals::default()
703            },
704            RuntimeShutdownReason::SessionEnded,
705        );
706
707        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
708    }
709
710    #[test]
711    fn choose_dream_schedule_digest_only_for_light_digest_gap() {
712        let plan = choose_dream_schedule(
713            &DreamSignals {
714                raw_event_count: 1,
715                has_digest_gap: true,
716                ..DreamSignals::default()
717            },
718            RuntimeShutdownReason::SessionEnded,
719        );
720
721        assert_eq!(plan.action, DreamScheduleAction::DigestOnly);
722    }
723
724    #[test]
725    fn choose_dream_schedule_delays_idle_medium_signal_sessions() {
726        let plan = choose_dream_schedule(
727            &DreamSignals {
728                raw_event_count: 2,
729                explicit_count: 2,
730                derived_count: 0,
731                ..DreamSignals::default()
732            },
733            RuntimeShutdownReason::IdleTimeout,
734        );
735
736        assert_eq!(plan.action, DreamScheduleAction::DelayedEnqueue);
737    }
738
739    #[test]
740    fn choose_dream_schedule_session_end_flushes_explicit_reflection() {
741        let plan = choose_dream_schedule(
742            &DreamSignals {
743                explicit_count: 2,
744                has_digest_gap: true,
745                ..DreamSignals::default()
746            },
747            RuntimeShutdownReason::SessionEnded,
748        );
749
750        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
751    }
752
753    #[tokio::test]
754    async fn collect_dream_signals_counts_session_signal_types() {
755        let pool = SqlitePoolOptions::new()
756            .max_connections(1)
757            .connect("sqlite::memory:")
758            .await
759            .unwrap();
760        nexus_storage::migrations::run_migrations(&pool)
761            .await
762            .unwrap();
763
764        let namespace_repo = NamespaceRepository::new(pool.clone());
765        let namespace = namespace_repo
766            .get_or_create("runtime-dream-signals", "runtime-dream-signals")
767            .await
768            .unwrap();
769        let repo = MemoryRepository::new(pool.clone());
770
771        for (content, labels, metadata) in [
772            (
773                "tool event",
774                vec!["raw-activity".to_string()],
775                json!({
776                    "raw_activity": true,
777                    "cognitive": {
778                        "level": "raw",
779                        "observer": "claude-code",
780                        "subject": "claude-code",
781                        "session_key": "signals-session"
782                    }
783                }),
784            ),
785            (
786                "explicit note",
787                vec![],
788                json!({
789                    "cognitive": {
790                        "level": "explicit",
791                        "observer": "claude-code",
792                        "subject": "claude-code",
793                        "session_key": "signals-session"
794                    }
795                }),
796            ),
797            (
798                "derived insight",
799                vec![],
800                json!({
801                    "cognitive": {
802                        "level": "derived",
803                        "observer": "claude-code",
804                        "subject": "claude-code",
805                        "session_key": "signals-session"
806                    }
807                }),
808            ),
809            (
810                "contradiction record",
811                vec![],
812                json!({
813                    "cognitive": {
814                        "level": "contradiction",
815                        "observer": "claude-code",
816                        "subject": "claude-code",
817                        "session_key": "signals-session"
818                    }
819                }),
820            ),
821        ] {
822            repo.store(StoreMemoryParams {
823                namespace_id: namespace.id,
824                content,
825                category: &MemoryCategory::Session,
826                memory_lane_type: None,
827                labels: &labels,
828                metadata: &metadata,
829                embedding: None,
830                embedding_model: None,
831            })
832            .await
833            .unwrap();
834        }
835
836        let signals = collect_dream_signals(&repo, namespace.id, "signals-session")
837            .await
838            .unwrap();
839
840        assert_eq!(signals.raw_event_count, 1);
841        assert_eq!(signals.explicit_count, 1);
842        assert_eq!(signals.derived_count, 1);
843        assert_eq!(signals.contradiction_count, 1);
844        assert!(signals.has_digest_gap);
845    }
846
847    #[tokio::test]
848    async fn enqueue_dream_jobs_coalesces_session_scoped_shutdown_work() {
849        let pool = SqlitePoolOptions::new()
850            .max_connections(1)
851            .connect("sqlite::memory:")
852            .await
853            .unwrap();
854        nexus_storage::migrations::run_migrations(&pool)
855            .await
856            .unwrap();
857
858        let namespace_repo = NamespaceRepository::new(pool.clone());
859        let namespace = namespace_repo
860            .get_or_create("runtime-dream-dedupe", "runtime-dream-dedupe")
861            .await
862            .unwrap();
863        let repo = MemoryRepository::new(pool.clone());
864        let perspective = nexus_core::PerspectiveKey {
865            observer: "claude-code".to_string(),
866            subject: "claude-code".to_string(),
867            session_key: Some("session-123".to_string()),
868        };
869
870        let first = enqueue_dream_jobs(
871            &repo,
872            namespace.id,
873            Some(&perspective),
874            Some("session-123"),
875            "session_end_dream",
876            "session_end",
877        )
878        .await
879        .unwrap();
880        let second = enqueue_dream_jobs(
881            &repo,
882            namespace.id,
883            Some(&perspective),
884            Some("session-123"),
885            "session_end_dream",
886            "session_end",
887        )
888        .await
889        .unwrap();
890
891        assert_eq!(first, 2);
892        assert_eq!(second, 0);
893        assert_eq!(
894            repo.list_jobs(
895                namespace.id,
896                Some(REFLECT_PERSPECTIVE_JOB),
897                Some(memory_job_status::PENDING),
898                10,
899                0
900            )
901            .await
902            .unwrap()
903            .len(),
904            1
905        );
906        assert_eq!(
907            repo.list_jobs(
908                namespace.id,
909                Some(REFLECT_NAMESPACE_JOB),
910                Some(memory_job_status::PENDING),
911                10,
912                0
913            )
914            .await
915            .unwrap()
916            .len(),
917            0
918        );
919        assert_eq!(
920            repo.list_jobs(
921                namespace.id,
922                Some(DIGEST_SESSION_JOB),
923                Some(memory_job_status::PENDING),
924                10,
925                0
926            )
927            .await
928            .unwrap()
929            .len(),
930            1
931        );
932    }
933
934    // ---- Adaptive dream scheduling tests (Phase 16) ----
935
936    #[test]
937    fn test_choose_dream_schedule_uses_contradiction_density() {
938        // No signals → skip.
939        let plan = choose_dream_schedule(
940            &DreamSignals::default(),
941            RuntimeShutdownReason::SessionEnded,
942        );
943        assert_eq!(plan.action, DreamScheduleAction::Skip);
944
945        // High contradiction density → immediate.
946        let high_density = DreamSignals {
947            total_non_raw_count: 10,
948            contradiction_count: 3,
949            contradiction_density: 0.30,
950            ..DreamSignals::default()
951        };
952        let plan = choose_dream_schedule(&high_density, RuntimeShutdownReason::SessionEnded);
953        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
954        assert!(plan.reason.contains("high contradiction density"));
955
956        // Moderate contradiction density at session end → immediate.
957        let moderate_density = DreamSignals {
958            total_non_raw_count: 20,
959            contradiction_count: 2,
960            contradiction_density: 0.10,
961            explicit_count: 5,
962            ..DreamSignals::default()
963        };
964        let plan = choose_dream_schedule(&moderate_density, RuntimeShutdownReason::SessionEnded);
965        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
966        assert!(plan.reason.contains("moderate contradiction density"));
967
968        // Moderate contradiction density at idle timeout → not immediate.
969        let plan = choose_dream_schedule(&moderate_density, RuntimeShutdownReason::IdleTimeout);
970        assert_ne!(plan.action, DreamScheduleAction::ImmediateBounded);
971    }
972
973    #[test]
974    fn test_dream_signals_computes_contradiction_density() {
975        let signals = DreamSignals {
976            total_non_raw_count: 20,
977            contradiction_count: 4,
978            contradiction_density: 4.0 / 20.0,
979            ..DreamSignals::default()
980        };
981        assert!((signals.contradiction_density - 0.20).abs() < f32::EPSILON);
982
983        let empty = DreamSignals::default();
984        assert!((empty.contradiction_density).abs() < f32::EPSILON);
985    }
986}