1use std::sync::Arc;
13
14use async_trait::async_trait;
15use chrono::Utc;
16use nexus_core::config::{AgentConfig, CognitionConfig};
17use nexus_core::traits::EmbeddingService;
18use nexus_core::PerspectiveKey;
19use nexus_llm::{create_client_auto_with_fallback, GenerateParams, GenerateResponse, LlmClient};
20use nexus_storage::repository::{MemoryRepository, NamespaceRepository};
21use nexus_storage::StorageManager;
22use tracing::{debug, info, warn};
23
24use crate::dream_cycle::{self, DreamCycleRequest, DreamScheduleAction};
25use crate::error::AgentError;
26use crate::job_processor;
27use crate::runtime_state::{self, RuntimeState};
28
29pub use crate::runtime_state::{derive_session_key, RuntimeMode, RuntimeShutdownReason};
31
32pub struct RuntimeController {
35 cognition: CognitionConfig,
36 agent: AgentConfig,
37 embeddings: Option<Arc<dyn EmbeddingService>>,
38}
39
40impl RuntimeController {
41 pub fn new(
42 cognition: CognitionConfig,
43 agent: AgentConfig,
44 embeddings: Option<Arc<dyn EmbeddingService>>,
45 ) -> Self {
46 Self {
47 cognition,
48 agent,
49 embeddings,
50 }
51 }
52
53 pub async fn ensure_started(
54 &self,
55 agent_type: &str,
56 session_key: Option<&str>,
57 cwd: Option<&str>,
58 mode: RuntimeMode,
59 ) -> Result<(), AgentError> {
60 if !self.cognition.auto_runtime_enabled {
61 return Ok(());
62 }
63
64 let config =
65 nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
66 let mut storage = StorageManager::from_url(&config.database_url())
67 .await
68 .map_err(|e| AgentError::Storage(e.to_string()))?;
69 storage
70 .initialize()
71 .await
72 .map_err(|e| AgentError::Storage(e.to_string()))?;
73
74 let session_key = derive_session_key(agent_type, session_key, cwd);
75 let path = runtime_state::state_file_path(agent_type, &session_key)?;
76 let now = Utc::now();
77
78 let mut state = runtime_state::read_runtime_state(&path)?.unwrap_or(RuntimeState {
79 agent_type: agent_type.to_string(),
80 session_key: session_key.clone(),
81 mode: mode.into(),
82 started_at: now,
83 updated_at: now,
84 });
85
86 if (now - state.updated_at).num_seconds() > self.cognition.runtime_idle_timeout_secs as i64
87 {
88 state.started_at = now;
89 }
90 state.updated_at = now;
91 state.mode = mode.into();
92
93 runtime_state::write_runtime_state(&path, &state)?;
94 let llm = runtime_llm_client();
95 let processed = dream_cycle::drain_cognition_jobs(
96 storage.pool().clone(),
97 runtime_state::namespace_id_for(agent_type, &storage).await?,
98 &self.cognition,
99 &self.agent,
100 llm,
101 self.embeddings.clone(),
102 &format!("runtime-start-{agent_type}-{}", state.session_key),
103 )
104 .await?;
105 debug!(
106 agent_type,
107 processed_jobs = processed,
108 "Runtime startup cognition drain complete"
109 );
110 debug!(agent_type, session_key = %state.session_key, "Runtime state ensured");
111 Ok(())
112 }
113
114 pub async fn flush_and_shutdown(
115 &self,
116 agent_type: &str,
117 session_key: Option<&str>,
118 cwd: Option<&str>,
119 reason: RuntimeShutdownReason,
120 ) -> Result<(), AgentError> {
121 if !self.cognition.auto_runtime_enabled {
122 return Ok(());
123 }
124
125 let config =
126 nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
127 let mut storage = StorageManager::from_url(&config.database_url())
128 .await
129 .map_err(|e| AgentError::Storage(e.to_string()))?;
130 storage
131 .initialize()
132 .await
133 .map_err(|e| AgentError::Storage(e.to_string()))?;
134
135 let namespace_repo = NamespaceRepository::new(storage.pool().clone());
136 let namespace = namespace_repo
137 .get_or_create(agent_type, agent_type)
138 .await
139 .map_err(|e| AgentError::Storage(e.to_string()))?;
140
141 let memory_repo = MemoryRepository::new(storage.pool().clone());
142 let derived_session_key = derive_session_key(agent_type, session_key, cwd);
143 runtime_state::store_runtime_marker(
144 &memory_repo,
145 namespace.id,
146 runtime_state::RuntimeMarker {
147 agent_type,
148 session_key,
149 cwd,
150 event: "runtime_session_end",
151 detail: runtime_state::runtime_reason_label(reason),
152 agent_namespace: self.agent.namespace.as_str(),
153 },
154 )
155 .await?;
156
157 let llm = runtime_llm_client();
158 let processed = dream_cycle::drain_cognition_jobs(
159 storage.pool().clone(),
160 namespace.id,
161 &self.cognition,
162 &self.agent,
163 llm.clone(),
164 self.embeddings.clone(),
165 &format!("runtime-stop-{agent_type}-{derived_session_key}"),
166 )
167 .await?;
168 debug!(
169 agent_type,
170 processed_jobs = processed,
171 "Runtime shutdown cognition drain complete"
172 );
173
174 if self.cognition.dream_on_session_end {
175 let lease_owner = format!("runtime-dream-{agent_type}-{derived_session_key}");
176 let shutdown_perspective = PerspectiveKey {
177 observer: agent_type.to_string(),
178 subject: agent_type.to_string(),
179 session_key: Some(derived_session_key.clone()),
180 };
181 let signals = dream_cycle::collect_dream_signals(
182 &memory_repo,
183 namespace.id,
184 &derived_session_key,
185 )
186 .await?;
187 let plan = dream_cycle::choose_dream_schedule(&signals, reason);
188 debug!(
189 agent_type,
190 session_key = ?session_key,
191 action = ?plan.action,
192 plan_reason = plan.reason,
193 raw_event_count = signals.raw_event_count,
194 contradiction_count = signals.contradiction_count,
195 contradiction_density = signals.contradiction_density,
196 total_non_raw = signals.total_non_raw_count,
197 has_digest_gap = signals.has_digest_gap,
198 "Selected adaptive dream schedule"
199 );
200 match plan.action {
201 DreamScheduleAction::ImmediateBounded => match tokio::time::timeout(
202 std::time::Duration::from_secs(self.cognition.session_end_dream_timeout_secs),
203 dream_cycle::run_dream_cycle(
204 storage.pool().clone(),
205 &self.cognition,
206 &self.agent,
207 llm,
208 self.embeddings.clone(),
209 DreamCycleRequest {
210 namespace_id: namespace.id,
211 lease_owner: &lease_owner,
212 perspective: Some(&shutdown_perspective),
213 session_key: Some(derived_session_key.as_str()),
214 reflect_reason: "session_end_dream",
215 digest_reason: "session_end",
216 },
217 ),
218 )
219 .await
220 {
221 Ok(Ok(processed)) if processed > 0 => {
222 info!(
223 agent_type,
224 session_key = ?session_key,
225 processed,
226 plan_reason = plan.reason,
227 "Dream pass completed through cognition jobs"
228 );
229 }
230 Ok(Ok(_)) => {
231 debug!(
232 agent_type,
233 session_key = ?session_key,
234 plan_reason = plan.reason,
235 "Dream pass skipped"
236 );
237 }
238 Ok(Err(error)) => {
239 warn!(%error, agent_type, session_key = ?session_key, "Dream pass failed");
240 }
241 Err(_) => {
242 warn!(
243 agent_type,
244 session_key = ?session_key,
245 timeout_secs = self.cognition.session_end_dream_timeout_secs,
246 "Dream pass timed out during shutdown"
247 );
248 }
249 },
250 DreamScheduleAction::DelayedEnqueue => {
251 let queued = dream_cycle::enqueue_dream_jobs(
252 &memory_repo,
253 namespace.id,
254 Some(&shutdown_perspective),
255 Some(derived_session_key.as_str()),
256 "session_end_delayed_dream",
257 "session_end",
258 )
259 .await?;
260 debug!(
261 agent_type,
262 session_key = ?session_key,
263 queued,
264 plan_reason = plan.reason,
265 "Queued delayed dream jobs without immediate drain"
266 );
267 }
268 DreamScheduleAction::DigestOnly => {
269 let queued = job_processor::enqueue_digest_job_if_absent(
270 &memory_repo,
271 namespace.id,
272 derived_session_key.as_str(),
273 "session_end_digest_only",
274 )
275 .await?;
276 debug!(
277 agent_type,
278 session_key = ?session_key,
279 queued,
280 plan_reason = plan.reason,
281 "Queued digest-only shutdown work"
282 );
283 if queued && matches!(reason, RuntimeShutdownReason::SessionEnded) {
284 match tokio::time::timeout(
285 std::time::Duration::from_secs(
286 self.cognition.session_end_dream_timeout_secs,
287 ),
288 dream_cycle::drain_cognition_jobs(
289 storage.pool().clone(),
290 namespace.id,
291 &self.cognition,
292 &self.agent,
293 llm.clone(),
294 self.embeddings.clone(),
295 &format!("runtime-finalize-{agent_type}-{derived_session_key}"),
296 ),
297 )
298 .await
299 {
300 Ok(Ok(processed)) => {
301 debug!(
302 agent_type,
303 session_key = ?session_key,
304 processed,
305 plan_reason = plan.reason,
306 "Drained digest-only shutdown work before runtime teardown"
307 );
308 }
309 Ok(Err(error)) => {
310 warn!(
311 %error,
312 agent_type,
313 session_key = ?session_key,
314 "Digest-only shutdown drain failed"
315 );
316 }
317 Err(_) => {
318 warn!(
319 agent_type,
320 session_key = ?session_key,
321 timeout_secs = self.cognition.session_end_dream_timeout_secs,
322 "Digest-only shutdown drain timed out"
323 );
324 }
325 }
326 }
327 }
328 DreamScheduleAction::Skip => {
329 debug!(
330 agent_type,
331 session_key = ?session_key,
332 plan_reason = plan.reason,
333 "Skipped shutdown dream work after adaptive planning"
334 );
335 }
336 }
337 }
338
339 let path = runtime_state::state_file_path(agent_type, &derived_session_key)?;
340 if path.exists() {
341 std::fs::remove_file(&path)?;
342 }
343
344 Ok(())
345 }
346
347 pub fn state_root() -> std::path::PathBuf {
348 runtime_state::state_root()
349 }
350}
351
352pub async fn create_embedding_service(
360 config: &nexus_core::Config,
361) -> Option<Arc<dyn EmbeddingService>> {
362 if !config.embedding.enabled {
363 return None;
364 }
365 match nexus_embeddings::create_service(config).await {
366 Ok(Some(service)) => Some(service),
367 Ok(None) => None,
368 Err(error) => {
369 warn!(
370 %error,
371 "Failed to initialize embedding service; configured LLM features remain available and cognition will run without semantic embeddings. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
372 );
373 None
374 }
375 }
376}
377
378fn runtime_llm_client() -> Arc<dyn LlmClient> {
381 match create_client_auto_with_fallback() {
382 Ok(client) => client,
383 Err(error) => {
384 warn!(%error, "LLM unavailable, using deterministic cognition fallbacks");
385 Arc::new(UnavailableLlmClient {
386 message: error.to_string(),
387 })
388 }
389 }
390}
391
/// Placeholder [`LlmClient`] used when no real client could be created.
struct UnavailableLlmClient {
    /// Text of the error that prevented client creation; returned by every `generate` call.
    message: String,
}
395
396#[async_trait]
397impl LlmClient for UnavailableLlmClient {
398 async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
399 Err(nexus_llm::LlmError::InvalidJsonResponse(
400 self.message.clone(),
401 ))
402 }
403
404 fn provider_name(&self) -> String {
405 "unavailable".to_string()
406 }
407
408 fn model_name(&self) -> String {
409 "deterministic-fallback".to_string()
410 }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dream_cycle::{
        choose_dream_schedule, collect_dream_signals, enqueue_dream_jobs, DreamScheduleAction,
        DreamSignals,
    };
    use crate::job_processor::{
        digest_job_is_forced, process_digest_jobs, DIGEST_SESSION_JOB, REFLECT_NAMESPACE_JOB,
        REFLECT_PERSPECTIVE_JOB,
    };
    use crate::runtime_state::{derive_session_key, sanitize_component, RuntimeShutdownReason};
    use nexus_core::config::{AgentConfig, CognitionConfig};
    use nexus_core::{CognitiveLevel, MemoryCategory};
    use nexus_storage::models::memory_job_status;
    use nexus_storage::repository::{MemoryRepository, NamespaceRepository, StoreMemoryParams};
    use serde_json::json;
    use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
    use std::sync::Arc;

    /// Opens a single-connection in-memory SQLite pool with all migrations applied.
    async fn migrated_pool() -> SqlitePool {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();
        pool
    }

    /// Stand-in LLM client whose `generate` always fails with "offline".
    fn offline_llm() -> UnavailableLlmClient {
        UnavailableLlmClient {
            message: "offline".to_string(),
        }
    }

    #[tokio::test]
    async fn run_dream_cycle_processes_namespace_jobs() {
        let pool = migrated_pool().await;
        let namespaces = NamespaceRepository::new(pool.clone());
        let namespace = namespaces
            .get_or_create("runtime-dream-test", "runtime-dream-test")
            .await
            .unwrap();
        let memories = MemoryRepository::new(pool.clone());

        // Two explicit facts that disagree with each other.
        let explicit_fact = json!({
            "cognitive": {
                "level": "explicit",
                "observer": "claude-code",
                "subject": "claude-code",
                "generated_by": "test"
            }
        });
        for content in ["feature enabled", "feature not enabled"] {
            memories
                .store(StoreMemoryParams {
                    namespace_id: namespace.id,
                    content,
                    category: &MemoryCategory::Facts,
                    memory_lane_type: None,
                    labels: &[],
                    metadata: &explicit_fact,
                    embedding: None,
                    embedding_model: None,
                })
                .await
                .unwrap();
        }

        let request = DreamCycleRequest {
            namespace_id: namespace.id,
            lease_owner: "test-owner",
            perspective: None,
            session_key: None,
            reflect_reason: "namespace_dream",
            digest_reason: "dream_digest",
        };
        let processed = dream_cycle::run_dream_cycle(
            pool.clone(),
            &CognitionConfig::default(),
            &AgentConfig::default(),
            Arc::new(offline_llm()),
            None,
            request,
        )
        .await
        .unwrap();

        assert!(processed >= 1);

        let contradictions = memories
            .get_by_cognitive_level(namespace.id, CognitiveLevel::Contradiction, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);

        let pending_reflections = memories
            .list_jobs(
                namespace.id,
                Some(REFLECT_NAMESPACE_JOB),
                Some("pending"),
                10,
                0,
            )
            .await
            .unwrap();
        assert!(pending_reflections.is_empty());
    }

    #[tokio::test]
    async fn process_digest_jobs_skips_below_rollover_threshold() {
        const SESSION: &str = "digest-skip-session";

        let pool = migrated_pool().await;
        let namespaces = NamespaceRepository::new(pool.clone());
        let namespace = namespaces
            .get_or_create("runtime-digest-skip", "runtime-digest-skip")
            .await
            .unwrap();
        let memories = MemoryRepository::new(pool.clone());

        // Session-scoped cognitive metadata that differs only in its level.
        let session_meta = |level: &str| {
            json!({
                "cognitive": {
                    "level": level,
                    "observer": "claude-code",
                    "subject": "claude-code",
                    "session_key": SESSION
                }
            })
        };
        let explicit_meta = session_meta("explicit");
        let summary_meta = session_meta("summary_short");

        let source = memories
            .store(StoreMemoryParams {
                namespace_id: namespace.id,
                content: "Small explicit update.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &explicit_meta,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let prior_digest = memories
            .store(StoreMemoryParams {
                namespace_id: namespace.id,
                content: "Prior digest",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &summary_meta,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        memories
            .store_digest(nexus_storage::repository::StoreDigestParams {
                namespace_id: namespace.id,
                session_key: SESSION,
                digest_kind: "short",
                memory_id: prior_digest.id,
                start_memory_id: Some(source.id),
                end_memory_id: Some(source.id),
                token_count: 12,
            })
            .await
            .unwrap();

        let follow_up = memories
            .store(StoreMemoryParams {
                namespace_id: namespace.id,
                content: "Tiny follow-up.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &explicit_meta,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let payload = json!({
            "session_key": SESSION,
            "source_memory_id": follow_up.id
        });
        memories
            .enqueue_job(nexus_storage::EnqueueJobParams {
                namespace_id: namespace.id,
                job_type: DIGEST_SESSION_JOB,
                priority: 90,
                perspective: None,
                payload: &payload,
            })
            .await
            .unwrap();

        let pending_digests = memories
            .list_jobs(
                namespace.id,
                Some(DIGEST_SESSION_JOB),
                Some("pending"),
                10,
                0,
            )
            .await
            .unwrap();
        assert_eq!(pending_digests.len(), 1);

        let processed = process_digest_jobs(
            &memories,
            namespace.id,
            &CognitionConfig::default(),
            &AgentConfig::default(),
            Arc::new(offline_llm()),
            None,
            "digest-skip",
        )
        .await
        .unwrap();
        assert_eq!(processed, 1);

        // The job is consumed, but the earlier digest remains the only one.
        let latest = memories
            .latest_digest_for_session(namespace.id, SESSION, "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest.id, prior_digest.id);

        let digest_count = memories
            .count_digests(namespace.id, Some(SESSION))
            .await
            .unwrap();
        assert_eq!(digest_count, 1);
    }

    #[test]
    fn digest_job_force_reason_matches_expected_inputs() {
        for reason in [
            "dream_digest",
            "session_end",
            "manual_digest",
            "manual_rebuild",
        ] {
            assert!(
                digest_job_is_forced(Some(reason)),
                "{reason} should force a digest"
            );
        }

        assert!(!digest_job_is_forced(Some("derive_follow_up")));
        assert!(!digest_job_is_forced(None));
    }

    #[test]
    fn sanitize_component_replaces_unsafe_chars() {
        let sanitized = sanitize_component("claude/code:1");
        assert_eq!(sanitized, "claude_code_1");
    }

    #[test]
    fn derive_session_key_prefers_explicit_key() {
        let key = derive_session_key("claude-code", Some("abc"), Some("/tmp/project"));
        assert_eq!(key, "abc");
    }

    #[test]
    fn derive_session_key_falls_back_to_stable_hash() {
        let from_missing_key = derive_session_key("claude-code", None, Some("/tmp/project"));
        let from_empty_key = derive_session_key("claude-code", Some(""), Some("/tmp/project"));

        assert_eq!(from_missing_key, from_empty_key);
        assert!(from_missing_key.starts_with("derived-"));
    }

    #[test]
    fn choose_dream_schedule_immediate_for_contradictions() {
        let signals = DreamSignals {
            contradiction_count: 1,
            raw_event_count: 1,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(&signals, RuntimeShutdownReason::SessionEnded);

        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
    }

    #[test]
    fn choose_dream_schedule_digest_only_for_light_digest_gap() {
        let signals = DreamSignals {
            raw_event_count: 1,
            has_digest_gap: true,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(&signals, RuntimeShutdownReason::SessionEnded);

        assert_eq!(plan.action, DreamScheduleAction::DigestOnly);
    }

    #[test]
    fn choose_dream_schedule_delays_idle_medium_signal_sessions() {
        let signals = DreamSignals {
            raw_event_count: 2,
            explicit_count: 2,
            derived_count: 0,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(&signals, RuntimeShutdownReason::IdleTimeout);

        assert_eq!(plan.action, DreamScheduleAction::DelayedEnqueue);
    }

    #[test]
    fn choose_dream_schedule_session_end_flushes_explicit_reflection() {
        let signals = DreamSignals {
            explicit_count: 2,
            has_digest_gap: true,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(&signals, RuntimeShutdownReason::SessionEnded);

        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
    }

    #[tokio::test]
    async fn collect_dream_signals_counts_session_signal_types() {
        let pool = migrated_pool().await;
        let namespaces = NamespaceRepository::new(pool.clone());
        let namespace = namespaces
            .get_or_create("runtime-dream-signals", "runtime-dream-signals")
            .await
            .unwrap();
        let memories = MemoryRepository::new(pool.clone());

        // The `cognitive` object shared by every fixture; only the level varies.
        let cognitive = |level: &str| {
            json!({
                "level": level,
                "observer": "claude-code",
                "subject": "claude-code",
                "session_key": "signals-session"
            })
        };

        // One memory per signal type the collector is expected to count.
        let fixtures = [
            (
                "tool event",
                vec!["raw-activity".to_string()],
                json!({
                    "raw_activity": true,
                    "cognitive": cognitive("raw")
                }),
            ),
            (
                "explicit note",
                vec![],
                json!({ "cognitive": cognitive("explicit") }),
            ),
            (
                "derived insight",
                vec![],
                json!({ "cognitive": cognitive("derived") }),
            ),
            (
                "contradiction record",
                vec![],
                json!({ "cognitive": cognitive("contradiction") }),
            ),
        ];

        for (content, labels, metadata) in fixtures {
            memories
                .store(StoreMemoryParams {
                    namespace_id: namespace.id,
                    content,
                    category: &MemoryCategory::Session,
                    memory_lane_type: None,
                    labels: &labels,
                    metadata: &metadata,
                    embedding: None,
                    embedding_model: None,
                })
                .await
                .unwrap();
        }

        let signals = collect_dream_signals(&memories, namespace.id, "signals-session")
            .await
            .unwrap();

        assert_eq!(signals.raw_event_count, 1);
        assert_eq!(signals.explicit_count, 1);
        assert_eq!(signals.derived_count, 1);
        assert_eq!(signals.contradiction_count, 1);
        assert!(signals.has_digest_gap);
    }

    #[tokio::test]
    async fn enqueue_dream_jobs_coalesces_session_scoped_shutdown_work() {
        let pool = migrated_pool().await;
        let namespaces = NamespaceRepository::new(pool.clone());
        let namespace = namespaces
            .get_or_create("runtime-dream-dedupe", "runtime-dream-dedupe")
            .await
            .unwrap();
        let memories = MemoryRepository::new(pool.clone());
        let perspective = nexus_core::PerspectiveKey {
            observer: "claude-code".to_string(),
            subject: "claude-code".to_string(),
            session_key: Some("session-123".to_string()),
        };

        // The same request issued twice: the second call must not queue anything new.
        let first = enqueue_dream_jobs(
            &memories,
            namespace.id,
            Some(&perspective),
            Some("session-123"),
            "session_end_dream",
            "session_end",
        )
        .await
        .unwrap();
        let second = enqueue_dream_jobs(
            &memories,
            namespace.id,
            Some(&perspective),
            Some("session-123"),
            "session_end_dream",
            "session_end",
        )
        .await
        .unwrap();

        assert_eq!(first, 2);
        assert_eq!(second, 0);

        let expected_pending = [
            (REFLECT_PERSPECTIVE_JOB, 1),
            (REFLECT_NAMESPACE_JOB, 0),
            (DIGEST_SESSION_JOB, 1),
        ];
        for (job_type, expected) in expected_pending {
            let pending = memories
                .list_jobs(
                    namespace.id,
                    Some(job_type),
                    Some(memory_job_status::PENDING),
                    10,
                    0,
                )
                .await
                .unwrap();
            assert_eq!(pending.len(), expected);
        }
    }

    #[test]
    fn test_choose_dream_schedule_uses_contradiction_density() {
        // No signals at all: nothing to do.
        let empty = DreamSignals::default();
        let plan = choose_dream_schedule(&empty, RuntimeShutdownReason::SessionEnded);
        assert_eq!(plan.action, DreamScheduleAction::Skip);

        // 3 contradictions out of 10 non-raw memories.
        let high_density = DreamSignals {
            total_non_raw_count: 10,
            contradiction_count: 3,
            contradiction_density: 0.30,
            ..DreamSignals::default()
        };
        let plan = choose_dream_schedule(&high_density, RuntimeShutdownReason::SessionEnded);
        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
        assert!(plan.reason.contains("high contradiction density"));

        // 2 contradictions out of 20 non-raw memories, with explicit memories present.
        let moderate_density = DreamSignals {
            total_non_raw_count: 20,
            contradiction_count: 2,
            contradiction_density: 0.10,
            explicit_count: 5,
            ..DreamSignals::default()
        };
        let plan = choose_dream_schedule(&moderate_density, RuntimeShutdownReason::SessionEnded);
        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
        assert!(plan.reason.contains("moderate contradiction density"));

        // The same signals on an idle timeout must not trigger an immediate pass.
        let plan = choose_dream_schedule(&moderate_density, RuntimeShutdownReason::IdleTimeout);
        assert_ne!(plan.action, DreamScheduleAction::ImmediateBounded);
    }

    #[test]
    fn test_dream_signals_computes_contradiction_density() {
        let populated = DreamSignals {
            total_non_raw_count: 20,
            contradiction_count: 4,
            contradiction_density: 4.0 / 20.0,
            ..DreamSignals::default()
        };
        let deviation = (populated.contradiction_density - 0.20).abs();
        assert!(deviation < f32::EPSILON);

        // The default value carries a zero density.
        let empty = DreamSignals::default();
        assert!((empty.contradiction_density).abs() < f32::EPSILON);
    }
}