1use std::collections::{BTreeSet, HashSet};
4use std::hash::{Hash, Hasher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Instant;
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use nexus_core::config::{AgentConfig, CognitionConfig};
12use nexus_core::traits::EmbeddingService;
13use nexus_core::{
14 infer_perspective, CognitiveLevel, CognitiveMetadata, Memory, MemoryCategory,
15 MemoryLaneCognitiveType, MemoryLaneType, PerspectiveKey, PerspectiveSource,
16};
17use nexus_llm::{
18 create_client_auto_with_fallback, ChatMessage, GenerateParams, GenerateResponse, LlmClient,
19 LlmClientJson,
20};
21use nexus_storage::models::{memory_job_status, EnqueueJobParams, MemoryJobRow};
22use nexus_storage::repository::{MemoryRepository, NamespaceRepository, StoreMemoryParams};
23use nexus_storage::StorageManager;
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use tracing::{debug, info, warn};
27
28use crate::derive::DeriveService;
29use crate::digest::DigestService;
30use crate::error::AgentError;
31use crate::reflect::ReflectService;
32use crate::util::{flush_metric_samples, stage_metric_sample};
33
/// Job type claimed by `process_derive_jobs`; its payload carries a `memory_id`.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
/// Job type name for activity distillation (its consumer is not visible in this section).
const ACTIVITY_DISTILL_JOB: &str = "activity_distill";
/// Job type claimed by `process_reflect_namespace_jobs`; enqueued when no perspective is given.
const REFLECT_NAMESPACE_JOB: &str = "reflect_namespace";
/// Job type claimed by `process_reflect_jobs`; the job row must carry a perspective.
const REFLECT_PERSPECTIVE_JOB: &str = "reflect_perspective";
/// Job type claimed by `process_digest_jobs`; its payload carries a `session_key`.
const DIGEST_SESSION_JOB: &str = "digest_session";
/// Prompt text that asks a model to turn a batch of raw hook events into a strict-JSON
/// session summary (`summary`, `category`, `labels`, `key_activities`, `files_touched`,
/// `tools_used`, `decisions_made`).
///
/// NOTE(review): presumably sent as the system message by the activity-distill job
/// processor, which is not visible in this section — confirm at the use site.
const ACTIVITY_DISTILL_SYSTEM_PROMPT: &str = r#"You are distilling a batch of raw agent hook events into a meaningful session summary.

Given a set of raw hook events (JSON with timestamps, tool names, CWD, session IDs), produce a structured summary of what happened in the session.

Focus on:
- What the user/agent was working on (project, directory, task)
- Which tools were used and how often
- Key actions taken (tests run, files edited, commands executed)
- Any patterns (repeated test runs, debugging cycles, etc.)

Return strict JSON with these fields:
- summary: A 1-3 sentence human-readable summary of the session
- category: One of "session", "context", "facts"
- labels: 2-5 descriptive labels
- key_activities: List of notable activities
- files_touched: List of files/directories mentioned
- tools_used: List of unique tools used
- decisions_made: Any decisions evident from the event sequence

Return strict JSON only. No markdown fences."#;
59
/// Runtime lifetime mode requested by callers of `RuntimeController::ensure_started`.
///
/// The value is converted to `RuntimeModeSerde` and persisted in the session state file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeMode {
    /// Runtime tied to a single session (stored as `session_scoped`).
    SessionScoped,
    /// Runtime that is not tied to a single session (stored as `persistent`).
    Persistent,
}
65
/// Why `RuntimeController::flush_and_shutdown` was invoked.
///
/// The reason is recorded in the runtime marker memory (via `runtime_reason_label`) and
/// feeds the adaptive dream scheduling in `choose_dream_schedule`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeShutdownReason {
    /// The session ended; labelled `session-ended`.
    SessionEnded,
    /// The runtime went idle; labelled `idle-timeout`.
    IdleTimeout,
    /// Shutdown was requested explicitly; labelled `manual`.
    Manual,
}
72
/// Per-session runtime state persisted as pretty-printed JSON by `write_runtime_state`
/// and loaded by `read_runtime_state`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct RuntimeState {
    /// Agent type the state was first created for.
    agent_type: String,
    /// Session key resolved by `derive_session_key` when the state was first created.
    session_key: String,
    /// Most recently requested runtime mode.
    mode: RuntimeModeSerde,
    /// Start of the current activity window; reset by `ensure_started` after an idle gap
    /// longer than `runtime_idle_timeout_secs`.
    started_at: DateTime<Utc>,
    /// Timestamp of the latest `ensure_started` call for this session.
    updated_at: DateTime<Utc>,
}
81
/// Borrowed inputs for `store_runtime_marker`, which records a runtime lifecycle event as
/// a memory.
struct RuntimeMarker<'a> {
    /// Agent type the event belongs to.
    agent_type: &'a str,
    /// Caller-supplied session key, if any (the derived key is computed separately).
    session_key: Option<&'a str>,
    /// Working directory reported by the caller, if any.
    cwd: Option<&'a str>,
    /// Event name, e.g. `runtime_session_end`; also used as a label on the stored memory.
    event: &'a str,
    /// Free-form detail; `flush_and_shutdown` passes the shutdown reason label.
    detail: &'a str,
    /// Namespace string taken from the agent configuration.
    agent_namespace: &'a str,
}
90
/// Parameters for `run_dream_cycle`: which jobs to enqueue and who leases them.
#[derive(Debug, Clone)]
pub struct DreamCycleRequest<'a> {
    /// Namespace whose jobs are enqueued and drained.
    pub namespace_id: i64,
    /// Lease owner string passed to the job-claiming calls.
    pub lease_owner: &'a str,
    /// When set, a perspective-scoped reflect job is enqueued; otherwise a namespace-wide one.
    pub perspective: Option<&'a PerspectiveKey>,
    /// When set, a digest job for this session is enqueued as well.
    pub session_key: Option<&'a str>,
    /// `reason` value written into the reflect job payload.
    pub reflect_reason: &'a str,
    /// `reason` value written into the digest job payload.
    pub digest_reason: &'a str,
}
100
/// What `flush_and_shutdown` does about dream work after adaptive planning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DreamScheduleAction {
    /// Run `run_dream_cycle` now, bounded by `session_end_dream_timeout_secs`.
    ImmediateBounded,
    /// Enqueue reflect/digest jobs without draining them immediately.
    DelayedEnqueue,
    /// Enqueue only a digest job (drained right away when the session ended).
    DigestOnly,
    /// Do no dream work.
    Skip,
}
108
/// Result of `choose_dream_schedule`: the selected action plus a human-readable reason
/// that is attached to log events.
#[derive(Debug, Clone)]
struct DreamSchedulePlan {
    /// Action to take at shutdown.
    action: DreamScheduleAction,
    /// Static explanation of which rule selected the action.
    reason: &'static str,
}
114
/// Per-session tallies gathered by `collect_dream_signals` and consumed by
/// `choose_dream_schedule`.
#[derive(Debug, Clone, Default)]
struct DreamSignals {
    /// Memories labelled `raw-activity` or carrying a `raw_activity` metadata entry.
    raw_event_count: usize,
    /// Memories whose cognitive level is `Explicit`.
    explicit_count: usize,
    /// Memories whose cognitive level is `Derived`.
    derived_count: usize,
    /// Memories whose cognitive level is `Contradiction`.
    contradiction_count: usize,
    /// True when no digest exists yet for the session.
    has_digest_gap: bool,
    /// Sum of the explicit, derived and contradiction counts.
    total_non_raw_count: usize,
    /// `contradiction_count / total_non_raw_count`, or `0.0` when the total is zero.
    contradiction_density: f32,
}
127
/// Serializable mirror of `RuntimeMode` used inside `RuntimeState`; variants are written
/// in `snake_case`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum RuntimeModeSerde {
    /// Counterpart of `RuntimeMode::SessionScoped`.
    SessionScoped,
    /// Counterpart of `RuntimeMode::Persistent`.
    Persistent,
}
134
135impl From<RuntimeMode> for RuntimeModeSerde {
136 fn from(value: RuntimeMode) -> Self {
137 match value {
138 RuntimeMode::SessionScoped => Self::SessionScoped,
139 RuntimeMode::Persistent => Self::Persistent,
140 }
141 }
142}
143
/// Coordinates runtime start-up and shutdown for an agent session: maintains the session
/// state file and drains or schedules cognition jobs.
pub struct RuntimeController {
    /// Cognition settings (feature toggles, timeouts, batch sizes).
    cognition: CognitionConfig,
    /// Agent settings handed to the derive/reflect/digest services.
    agent: AgentConfig,
    /// Optional embedding backend forwarded to the job processors.
    embeddings: Option<Arc<dyn EmbeddingService>>,
}
149
150impl RuntimeController {
151 pub fn new(
152 cognition: CognitionConfig,
153 agent: AgentConfig,
154 embeddings: Option<Arc<dyn EmbeddingService>>,
155 ) -> Self {
156 Self {
157 cognition,
158 agent,
159 embeddings,
160 }
161 }
162
163 pub async fn ensure_started(
164 &self,
165 agent_type: &str,
166 session_key: Option<&str>,
167 cwd: Option<&str>,
168 mode: RuntimeMode,
169 ) -> Result<(), AgentError> {
170 if !self.cognition.auto_runtime_enabled {
171 return Ok(());
172 }
173
174 let config =
175 nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
176 let mut storage = StorageManager::from_url(&config.database_url())
177 .await
178 .map_err(|e| AgentError::Storage(e.to_string()))?;
179 storage
180 .initialize()
181 .await
182 .map_err(|e| AgentError::Storage(e.to_string()))?;
183
184 let session_key = derive_session_key(agent_type, session_key, cwd);
185 let path = self.state_file_path(agent_type, &session_key)?;
186 let now = Utc::now();
187
188 let mut state = read_runtime_state(&path)?.unwrap_or(RuntimeState {
189 agent_type: agent_type.to_string(),
190 session_key: session_key.clone(),
191 mode: mode.into(),
192 started_at: now,
193 updated_at: now,
194 });
195
196 if (now - state.updated_at).num_seconds() > self.cognition.runtime_idle_timeout_secs as i64
197 {
198 state.started_at = now;
199 }
200 state.updated_at = now;
201 state.mode = mode.into();
202
203 write_runtime_state(&path, &state)?;
204 let llm = runtime_llm_client();
205 let processed = drain_cognition_jobs(
206 storage.pool().clone(),
207 namespace_id_for(agent_type, &storage).await?,
208 &self.cognition,
209 &self.agent,
210 llm,
211 self.embeddings.clone(),
212 &format!("runtime-start-{agent_type}-{}", state.session_key),
213 )
214 .await?;
215 debug!(
216 agent_type,
217 processed_jobs = processed,
218 "Runtime startup cognition drain complete"
219 );
220 debug!(agent_type, session_key = %state.session_key, "Runtime state ensured");
221 Ok(())
222 }
223
224 pub async fn flush_and_shutdown(
225 &self,
226 agent_type: &str,
227 session_key: Option<&str>,
228 cwd: Option<&str>,
229 reason: RuntimeShutdownReason,
230 ) -> Result<(), AgentError> {
231 if !self.cognition.auto_runtime_enabled {
232 return Ok(());
233 }
234
235 let config =
236 nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
237 let mut storage = StorageManager::from_url(&config.database_url())
238 .await
239 .map_err(|e| AgentError::Storage(e.to_string()))?;
240 storage
241 .initialize()
242 .await
243 .map_err(|e| AgentError::Storage(e.to_string()))?;
244
245 let namespace_repo = NamespaceRepository::new(storage.pool().clone());
246 let namespace = namespace_repo
247 .get_or_create(agent_type, agent_type)
248 .await
249 .map_err(|e| AgentError::Storage(e.to_string()))?;
250
251 let memory_repo = MemoryRepository::new(storage.pool().clone());
252 let derived_session_key = derive_session_key(agent_type, session_key, cwd);
253 store_runtime_marker(
254 &memory_repo,
255 namespace.id,
256 RuntimeMarker {
257 agent_type,
258 session_key,
259 cwd,
260 event: "runtime_session_end",
261 detail: runtime_reason_label(reason),
262 agent_namespace: self.agent.namespace.as_str(),
263 },
264 )
265 .await?;
266
267 let llm = runtime_llm_client();
268 let processed = drain_cognition_jobs(
269 storage.pool().clone(),
270 namespace.id,
271 &self.cognition,
272 &self.agent,
273 llm.clone(),
274 self.embeddings.clone(),
275 &format!("runtime-stop-{agent_type}-{derived_session_key}"),
276 )
277 .await?;
278 debug!(
279 agent_type,
280 processed_jobs = processed,
281 "Runtime shutdown cognition drain complete"
282 );
283
284 if self.cognition.dream_on_session_end {
285 let lease_owner = format!("runtime-dream-{agent_type}-{derived_session_key}");
286 let shutdown_perspective = PerspectiveKey {
287 observer: agent_type.to_string(),
288 subject: agent_type.to_string(),
289 session_key: Some(derived_session_key.clone()),
290 };
291 let signals =
292 collect_dream_signals(&memory_repo, namespace.id, &derived_session_key).await?;
293 let plan = choose_dream_schedule(&self.cognition, &signals, reason);
294 debug!(
295 agent_type,
296 session_key = ?session_key,
297 action = ?plan.action,
298 plan_reason = plan.reason,
299 raw_event_count = signals.raw_event_count,
300 contradiction_count = signals.contradiction_count,
301 contradiction_density = signals.contradiction_density,
302 total_non_raw = signals.total_non_raw_count,
303 has_digest_gap = signals.has_digest_gap,
304 "Selected adaptive dream schedule"
305 );
306 match plan.action {
307 DreamScheduleAction::ImmediateBounded => match tokio::time::timeout(
308 std::time::Duration::from_secs(self.cognition.session_end_dream_timeout_secs),
309 run_dream_cycle(
310 storage.pool().clone(),
311 &self.cognition,
312 &self.agent,
313 llm,
314 self.embeddings.clone(),
315 DreamCycleRequest {
316 namespace_id: namespace.id,
317 lease_owner: &lease_owner,
318 perspective: Some(&shutdown_perspective),
319 session_key: Some(derived_session_key.as_str()),
320 reflect_reason: "session_end_dream",
321 digest_reason: "session_end",
322 },
323 ),
324 )
325 .await
326 {
327 Ok(Ok(processed)) if processed > 0 => {
328 info!(
329 agent_type,
330 session_key = ?session_key,
331 processed,
332 plan_reason = plan.reason,
333 "Dream pass completed through cognition jobs"
334 );
335 }
336 Ok(Ok(_)) => {
337 debug!(
338 agent_type,
339 session_key = ?session_key,
340 plan_reason = plan.reason,
341 "Dream pass skipped"
342 );
343 }
344 Ok(Err(error)) => {
345 warn!(%error, agent_type, session_key = ?session_key, "Dream pass failed");
346 }
347 Err(_) => {
348 warn!(
349 agent_type,
350 session_key = ?session_key,
351 timeout_secs = self.cognition.session_end_dream_timeout_secs,
352 "Dream pass timed out during shutdown"
353 );
354 }
355 },
356 DreamScheduleAction::DelayedEnqueue => {
357 let queued = enqueue_dream_jobs(
358 &memory_repo,
359 namespace.id,
360 Some(&shutdown_perspective),
361 Some(derived_session_key.as_str()),
362 "session_end_delayed_dream",
363 "session_end",
364 )
365 .await?;
366 debug!(
367 agent_type,
368 session_key = ?session_key,
369 queued,
370 plan_reason = plan.reason,
371 "Queued delayed dream jobs without immediate drain"
372 );
373 }
374 DreamScheduleAction::DigestOnly => {
375 let queued = enqueue_digest_job_if_absent(
376 &memory_repo,
377 namespace.id,
378 derived_session_key.as_str(),
379 "session_end_digest_only",
380 )
381 .await?;
382 debug!(
383 agent_type,
384 session_key = ?session_key,
385 queued,
386 plan_reason = plan.reason,
387 "Queued digest-only shutdown work"
388 );
389 if queued && matches!(reason, RuntimeShutdownReason::SessionEnded) {
390 match tokio::time::timeout(
391 std::time::Duration::from_secs(
392 self.cognition.session_end_dream_timeout_secs,
393 ),
394 drain_cognition_jobs(
395 storage.pool().clone(),
396 namespace.id,
397 &self.cognition,
398 &self.agent,
399 llm.clone(),
400 self.embeddings.clone(),
401 &format!("runtime-finalize-{agent_type}-{derived_session_key}"),
402 ),
403 )
404 .await
405 {
406 Ok(Ok(processed)) => {
407 debug!(
408 agent_type,
409 session_key = ?session_key,
410 processed,
411 plan_reason = plan.reason,
412 "Drained digest-only shutdown work before runtime teardown"
413 );
414 }
415 Ok(Err(error)) => {
416 warn!(
417 %error,
418 agent_type,
419 session_key = ?session_key,
420 "Digest-only shutdown drain failed"
421 );
422 }
423 Err(_) => {
424 warn!(
425 agent_type,
426 session_key = ?session_key,
427 timeout_secs = self.cognition.session_end_dream_timeout_secs,
428 "Digest-only shutdown drain timed out"
429 );
430 }
431 }
432 }
433 }
434 DreamScheduleAction::Skip => {
435 debug!(
436 agent_type,
437 session_key = ?session_key,
438 plan_reason = plan.reason,
439 "Skipped shutdown dream work after adaptive planning"
440 );
441 }
442 }
443 }
444
445 let path = self.state_file_path(agent_type, &derived_session_key)?;
446 if path.exists() {
447 std::fs::remove_file(&path)?;
448 }
449
450 Ok(())
451 }
452
453 pub fn state_root() -> PathBuf {
454 if let Some(dir) = dirs::state_dir() {
455 dir.join("nexus-memory-system").join("runtime")
456 } else {
457 std::env::var("HOME")
458 .map(|h| PathBuf::from(h).join(".local/state/nexus-memory-system/runtime"))
459 .unwrap_or_else(|_| PathBuf::from(".nexus-runtime"))
460 }
461 }
462
463 fn state_file_path(&self, agent_type: &str, session_key: &str) -> Result<PathBuf, AgentError> {
464 let root = Self::state_root().join("sessions");
465 std::fs::create_dir_all(&root)?;
466 Ok(root.join(format!(
467 "{}__{}.json",
468 sanitize_component(agent_type),
469 sanitize_component(session_key)
470 )))
471 }
472}
473
474pub async fn drain_cognition_jobs(
475 pool: sqlx::SqlitePool,
476 namespace_id: i64,
477 cognition: &CognitionConfig,
478 agent: &AgentConfig,
479 llm: Arc<dyn LlmClient>,
480 embeddings: Option<Arc<dyn EmbeddingService>>,
481 lease_owner: &str,
482) -> Result<usize, AgentError> {
483 let repo = MemoryRepository::new(pool);
484 let mut total_processed = 0usize;
485
486 for _ in 0..3 {
487 let mut progressed = 0usize;
488
489 if cognition.activity_distill_enabled {
490 progressed += process_activity_distill_jobs(
491 &repo,
492 namespace_id,
493 cognition,
494 llm.clone(),
495 lease_owner,
496 )
497 .await?;
498 }
499 if cognition.derive_enabled {
500 progressed += process_derive_jobs(
501 &repo,
502 namespace_id,
503 cognition,
504 agent,
505 llm.clone(),
506 embeddings.clone(),
507 lease_owner,
508 )
509 .await?;
510 }
511 if cognition.reflect_enabled {
512 progressed += process_reflect_jobs(
513 &repo,
514 namespace_id,
515 cognition,
516 agent,
517 embeddings.clone(),
518 lease_owner,
519 )
520 .await?;
521 progressed += process_reflect_namespace_jobs(
522 &repo,
523 namespace_id,
524 cognition,
525 agent,
526 embeddings.clone(),
527 lease_owner,
528 )
529 .await?;
530 }
531 if cognition.digest_enabled {
532 progressed += process_digest_jobs(
533 &repo,
534 namespace_id,
535 cognition,
536 agent,
537 llm.clone(),
538 embeddings.clone(),
539 lease_owner,
540 )
541 .await?;
542 }
543
544 total_processed += progressed;
545 if progressed == 0 {
546 break;
547 }
548 }
549
550 Ok(total_processed)
551}
552
553pub async fn enqueue_dream_jobs(
554 repo: &MemoryRepository,
555 namespace_id: i64,
556 perspective: Option<&PerspectiveKey>,
557 session_key: Option<&str>,
558 reflect_reason: &str,
559 digest_reason: &str,
560) -> Result<usize, AgentError> {
561 let mut queued = 0usize;
562
563 if let Some(perspective) = perspective {
564 let perspective_json = serde_json::to_value(perspective)
565 .map_err(|error| AgentError::Reflection(error.to_string()))?;
566 let payload = json!({
567 "reason": reflect_reason,
568 "session_key": perspective.session_key,
569 });
570 if enqueue_job_if_absent(
571 repo,
572 EnqueueJobParams {
573 namespace_id,
574 job_type: REFLECT_PERSPECTIVE_JOB,
575 priority: 100,
576 perspective: Some(&perspective_json),
577 payload: &payload,
578 },
579 )
580 .await?
581 {
582 queued += 1;
583 }
584 } else {
585 let payload = json!({
586 "reason": reflect_reason,
587 "session_key": session_key,
588 });
589 if enqueue_job_if_absent(
590 repo,
591 EnqueueJobParams {
592 namespace_id,
593 job_type: REFLECT_NAMESPACE_JOB,
594 priority: 100,
595 perspective: None,
596 payload: &payload,
597 },
598 )
599 .await?
600 {
601 queued += 1;
602 }
603 }
604
605 if let Some(session_key) = session_key {
606 let payload = json!({
607 "session_key": session_key,
608 "reason": digest_reason,
609 });
610 if enqueue_job_if_absent(
611 repo,
612 EnqueueJobParams {
613 namespace_id,
614 job_type: DIGEST_SESSION_JOB,
615 priority: 110,
616 perspective: None,
617 payload: &payload,
618 },
619 )
620 .await?
621 {
622 queued += 1;
623 }
624 }
625
626 Ok(queued)
627}
628
629async fn enqueue_digest_job_if_absent(
630 repo: &MemoryRepository,
631 namespace_id: i64,
632 session_key: &str,
633 digest_reason: &str,
634) -> Result<bool, AgentError> {
635 let payload = json!({
636 "session_key": session_key,
637 "reason": digest_reason,
638 });
639 enqueue_job_if_absent(
640 repo,
641 EnqueueJobParams {
642 namespace_id,
643 job_type: DIGEST_SESSION_JOB,
644 priority: 110,
645 perspective: None,
646 payload: &payload,
647 },
648 )
649 .await
650}
651
652async fn enqueue_job_if_absent(
653 repo: &MemoryRepository,
654 params: EnqueueJobParams<'_>,
655) -> Result<bool, AgentError> {
656 for status in [memory_job_status::PENDING, memory_job_status::RUNNING] {
657 let jobs = repo
658 .list_jobs(
659 params.namespace_id,
660 Some(params.job_type),
661 Some(status),
662 64,
663 0,
664 )
665 .await
666 .map_err(|error| AgentError::Storage(error.to_string()))?;
667 if jobs
668 .iter()
669 .any(|row| queued_job_matches(row, params.perspective, params.payload))
670 {
671 return Ok(false);
672 }
673 }
674
675 repo.enqueue_job(params)
676 .await
677 .map_err(|error| AgentError::Storage(error.to_string()))?;
678 Ok(true)
679}
680
681fn queued_job_matches(
682 row: &MemoryJobRow,
683 perspective: Option<&serde_json::Value>,
684 payload: &serde_json::Value,
685) -> bool {
686 let row_payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
687 Ok(value) => value,
688 Err(_) => return false,
689 };
690 if &row_payload != payload {
691 return false;
692 }
693
694 match (&row.perspective_json, perspective) {
695 (None, None) => true,
696 (Some(existing), Some(expected)) => serde_json::from_str::<serde_json::Value>(existing)
697 .map(|value| value == *expected)
698 .unwrap_or(false),
699 _ => false,
700 }
701}
702
703pub async fn run_dream_cycle(
704 pool: sqlx::SqlitePool,
705 cognition: &CognitionConfig,
706 agent: &AgentConfig,
707 llm: Arc<dyn LlmClient>,
708 embeddings: Option<Arc<dyn EmbeddingService>>,
709 request: DreamCycleRequest<'_>,
710) -> Result<usize, AgentError> {
711 let repo = MemoryRepository::new(pool.clone());
712 let total_started = Instant::now();
713 let mut metrics = Vec::new();
714 let enqueue_started = Instant::now();
715 enqueue_dream_jobs(
716 &repo,
717 request.namespace_id,
718 request.perspective,
719 request.session_key,
720 request.reflect_reason,
721 request.digest_reason,
722 )
723 .await?;
724 metrics.push(stage_metric_sample(
725 request.namespace_id,
726 "cognition.dream.enqueue_ms",
727 enqueue_started.elapsed().as_secs_f64() * 1000.0,
728 "enqueue",
729 ));
730 let drain_started = Instant::now();
731 let processed = drain_cognition_jobs(
732 pool,
733 request.namespace_id,
734 cognition,
735 agent,
736 llm,
737 embeddings,
738 request.lease_owner,
739 )
740 .await?;
741 metrics.push(stage_metric_sample(
742 request.namespace_id,
743 "cognition.dream.drain_ms",
744 drain_started.elapsed().as_secs_f64() * 1000.0,
745 "drain",
746 ));
747 metrics.push(stage_metric_sample(
748 request.namespace_id,
749 "cognition.dream.total_ms",
750 total_started.elapsed().as_secs_f64() * 1000.0,
751 "total",
752 ));
753 flush_metric_samples(&repo, &metrics).await;
754 Ok(processed)
755}
756
757async fn collect_dream_signals(
758 repo: &MemoryRepository,
759 namespace_id: i64,
760 session_key: &str,
761) -> Result<DreamSignals, AgentError> {
762 let memories = repo
763 .list_filtered(
764 namespace_id,
765 nexus_storage::repository::ListMemoryFilters {
766 category: None,
767 since: None,
768 until: None,
769 content_like: None,
770 include_raw: true,
771 limit: 256,
772 offset: 0,
773 },
774 )
775 .await
776 .map_err(|error| AgentError::Storage(error.to_string()))?;
777 let has_digest_gap = repo
778 .count_digests(namespace_id, Some(session_key))
779 .await
780 .map_err(|error| AgentError::Storage(error.to_string()))?
781 == 0;
782
783 let mut signals = DreamSignals {
784 has_digest_gap,
785 ..DreamSignals::default()
786 };
787 for memory in memories
788 .iter()
789 .filter(|memory| memory_matches_session_key(memory, session_key))
790 {
791 if memory.labels.iter().any(|label| label == "raw-activity")
792 || memory.metadata.get("raw_activity").is_some()
793 {
794 signals.raw_event_count += 1;
795 }
796 if let Some(level) =
797 CognitiveMetadata::from_metadata(&memory.metadata).map(|meta| meta.level)
798 {
799 match level {
800 CognitiveLevel::Explicit => {
801 signals.explicit_count += 1;
802 signals.total_non_raw_count += 1;
803 }
804 CognitiveLevel::Derived => {
805 signals.derived_count += 1;
806 signals.total_non_raw_count += 1;
807 }
808 CognitiveLevel::Contradiction => {
809 signals.contradiction_count += 1;
810 signals.total_non_raw_count += 1;
811 }
812 _ => {}
813 }
814 }
815 }
816 signals.contradiction_density = if signals.total_non_raw_count > 0 {
817 signals.contradiction_count as f32 / signals.total_non_raw_count as f32
818 } else {
819 0.0
820 };
821 Ok(signals)
822}
823
824fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
825 let metadata = &memory.metadata;
826 let matches_value = |value: Option<&serde_json::Value>| {
827 value.and_then(serde_json::Value::as_str) == Some(session_key)
828 };
829
830 if matches_value(metadata.pointer("/cognitive/session_key"))
831 || matches_value(metadata.pointer("/raw_activity/derived_session_key"))
832 || matches_value(metadata.pointer("/runtime/derived_session_key"))
833 || matches_value(metadata.pointer("/runtime/session_key"))
834 {
835 return true;
836 }
837
838 for pointer in [
839 "/cognitive/session_keys",
840 "/source/derived_session_keys",
841 "/raw_activity/derived_session_keys",
842 ] {
843 if metadata
844 .pointer(pointer)
845 .and_then(serde_json::Value::as_array)
846 .is_some_and(|values| {
847 values
848 .iter()
849 .any(|value| value.as_str() == Some(session_key))
850 })
851 {
852 return true;
853 }
854 }
855
856 false
857}
858
859fn choose_dream_schedule(
860 cognition: &CognitionConfig,
861 signals: &DreamSignals,
862 reason: RuntimeShutdownReason,
863) -> DreamSchedulePlan {
864 if signals.raw_event_count == 0
865 && signals.explicit_count == 0
866 && signals.derived_count == 0
867 && signals.contradiction_count == 0
868 {
869 return DreamSchedulePlan {
870 action: DreamScheduleAction::Skip,
871 reason: "no session cognition signal",
872 };
873 }
874
875 if signals.contradiction_count > 0
877 && signals.total_non_raw_count > 0
878 && signals.contradiction_density > 0.20
879 {
880 return DreamSchedulePlan {
881 action: DreamScheduleAction::ImmediateBounded,
882 reason: "high contradiction density requires immediate reconciliation",
883 };
884 }
885
886 if matches!(reason, RuntimeShutdownReason::SessionEnded)
888 && signals.contradiction_density > 0.05
889 && signals.contradiction_density <= 0.20
890 {
891 return DreamSchedulePlan {
892 action: DreamScheduleAction::ImmediateBounded,
893 reason: "moderate contradiction density at session end warrants reconciliation",
894 };
895 }
896
897 if signals.contradiction_count > 0 && signals.contradiction_density <= 0.05 {
898 return DreamSchedulePlan {
899 action: DreamScheduleAction::ImmediateBounded,
900 reason: "contradictions require immediate reconciliation",
901 };
902 }
903
904 if signals.raw_event_count >= cognition.activity_distill_min_events {
905 return DreamSchedulePlan {
906 action: DreamScheduleAction::ImmediateBounded,
907 reason: "high session activity warrants immediate dream pass",
908 };
909 }
910
911 if matches!(reason, RuntimeShutdownReason::SessionEnded)
912 && signals.explicit_count >= 2
913 && signals.derived_count == 0
914 {
915 return DreamSchedulePlan {
916 action: DreamScheduleAction::ImmediateBounded,
917 reason: "session end flushes unreconciled explicit observations immediately",
918 };
919 }
920
921 if signals.has_digest_gap
922 && signals.explicit_count == 0
923 && signals.derived_count == 0
924 && signals.contradiction_count == 0
925 && signals.raw_event_count <= (cognition.activity_distill_min_events / 2).max(1)
926 {
927 return DreamSchedulePlan {
928 action: DreamScheduleAction::DigestOnly,
929 reason: "light session activity with uncovered digest window",
930 };
931 }
932
933 if matches!(reason, RuntimeShutdownReason::IdleTimeout)
934 && (signals.raw_event_count > 0
935 || signals.explicit_count >= 2
936 || (signals.explicit_count > 0 && signals.derived_count == 0))
937 {
938 return DreamSchedulePlan {
939 action: DreamScheduleAction::DelayedEnqueue,
940 reason: "idle timeout defers medium-signal dreaming to background jobs",
941 };
942 }
943
944 if signals.explicit_count >= 2 && signals.derived_count == 0 {
945 return DreamSchedulePlan {
946 action: DreamScheduleAction::DelayedEnqueue,
947 reason: "explicit observations suggest deferred reflection opportunity",
948 };
949 }
950
951 DreamSchedulePlan {
952 action: DreamScheduleAction::Skip,
953 reason: "insufficient signal for additional dream work",
954 }
955}
956
957async fn store_runtime_marker(
958 memory_repo: &MemoryRepository,
959 namespace_id: i64,
960 marker: RuntimeMarker<'_>,
961) -> Result<(), AgentError> {
962 let session_tag = derive_session_key(marker.agent_type, marker.session_key, marker.cwd);
963 let content = format!(
964 "Runtime {} for {} [session:{}] ({})",
965 marker.event.replace('_', " "),
966 marker.agent_type,
967 session_tag,
968 marker.detail
969 );
970 let metadata = json!({
971 "runtime": {
972 "event": marker.event,
973 "detail": marker.detail,
974 "session_key": marker.session_key,
975 "derived_session_key": session_tag,
976 "cwd": marker.cwd,
977 "agent_type": marker.agent_type,
978 "agent_namespace": marker.agent_namespace,
979 "captured_at": Utc::now(),
980 }
981 });
982 let mut cognitive = CognitiveMetadata::new(
983 CognitiveLevel::Explicit,
984 marker.agent_type,
985 marker.agent_type,
986 Some(session_tag.clone()),
987 "runtime_controller",
988 );
989 cognitive.confidence = Some(1.0);
990 let metadata = cognitive.merge_into(&metadata);
991
992 memory_repo
993 .store(StoreMemoryParams {
994 namespace_id,
995 content: &content,
996 category: &MemoryCategory::Session,
997 memory_lane_type: None,
998 labels: &[
999 "runtime".to_string(),
1000 "session".to_string(),
1001 marker.event.to_string(),
1002 ],
1003 metadata: &metadata,
1004 embedding: None,
1005 embedding_model: None,
1006 })
1007 .await
1008 .map_err(|e| AgentError::Storage(e.to_string()))?;
1009
1010 Ok(())
1011}
1012
1013fn read_runtime_state(path: &Path) -> Result<Option<RuntimeState>, AgentError> {
1014 if !path.exists() {
1015 return Ok(None);
1016 }
1017
1018 let contents = std::fs::read_to_string(path)?;
1019 let state =
1020 serde_json::from_str(&contents).map_err(|e| AgentError::Supervisor(e.to_string()))?;
1021 Ok(Some(state))
1022}
1023
1024fn write_runtime_state(path: &Path, state: &RuntimeState) -> Result<(), AgentError> {
1025 let contents =
1026 serde_json::to_string_pretty(state).map_err(|e| AgentError::Supervisor(e.to_string()))?;
1027 std::fs::write(path, contents)?;
1028 Ok(())
1029}
1030
1031pub fn derive_session_key(
1032 agent_type: &str,
1033 session_key: Option<&str>,
1034 cwd: Option<&str>,
1035) -> String {
1036 if let Some(value) = session_key.filter(|value| !value.trim().is_empty()) {
1037 return value.to_string();
1038 }
1039
1040 let canonical_agent = nexus_core::canonicalize_agent_type(agent_type);
1041 let fallback_scope = cwd
1042 .filter(|value| !value.trim().is_empty())
1043 .map(nexus_core::normalize_project_path)
1044 .unwrap_or_else(|| "unknown-cwd".to_string());
1045 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1046 canonical_agent.hash(&mut hasher);
1047 fallback_scope.hash(&mut hasher);
1048 format!("derived-{:016x}", hasher.finish())
1049}
1050
/// Makes a string safe to embed in a file name.
///
/// ASCII letters, ASCII digits, `-`, `_` and `.` are kept; every other character
/// (including non-ASCII ones) is replaced by a single `_`.
fn sanitize_component(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.';
        cleaned.push(if allowed { ch } else { '_' });
    }
    cleaned
}
1063
1064fn runtime_reason_label(reason: RuntimeShutdownReason) -> &'static str {
1065 match reason {
1066 RuntimeShutdownReason::SessionEnded => "session-ended",
1067 RuntimeShutdownReason::IdleTimeout => "idle-timeout",
1068 RuntimeShutdownReason::Manual => "manual",
1069 }
1070}
1071
1072async fn namespace_id_for(agent_type: &str, storage: &StorageManager) -> Result<i64, AgentError> {
1073 let canonical = nexus_core::canonicalize_agent_type(agent_type);
1074 let namespace_repo = NamespaceRepository::new(storage.pool().clone());
1075 namespace_repo
1076 .get_or_create(&canonical, &canonical)
1077 .await
1078 .map(|namespace| namespace.id)
1079 .map_err(|error| AgentError::Storage(error.to_string()))
1080}
1081
1082async fn process_derive_jobs(
1083 repo: &MemoryRepository,
1084 namespace_id: i64,
1085 cognition: &CognitionConfig,
1086 agent: &AgentConfig,
1087 llm: Arc<dyn LlmClient>,
1088 embeddings: Option<Arc<dyn EmbeddingService>>,
1089 lease_owner: &str,
1090) -> Result<usize, AgentError> {
1091 let jobs = repo
1092 .claim_jobs(
1093 namespace_id,
1094 DERIVE_MEMORY_JOB,
1095 lease_owner,
1096 cognition.lease_ttl_secs,
1097 cognition.max_job_batch as i64,
1098 )
1099 .await
1100 .map_err(|error| AgentError::Storage(error.to_string()))?;
1101
1102 let service = DeriveService::new(agent.clone(), llm, embeddings);
1103 let mut processed = 0usize;
1104 for job in jobs {
1105 let memory_id = job
1106 .payload
1107 .get("memory_id")
1108 .and_then(serde_json::Value::as_i64);
1109 let outcome = async {
1110 let memory_id = memory_id.ok_or_else(|| {
1111 AgentError::Derivation("derive job missing memory_id".to_string())
1112 })?;
1113 let memory = repo
1114 .get_by_id(memory_id)
1115 .await
1116 .map_err(|error| AgentError::Storage(error.to_string()))?
1117 .ok_or_else(|| {
1118 AgentError::Derivation(format!("derive source memory {memory_id} not found"))
1119 })?;
1120 service
1121 .derive_memory_with_perspective(&memory, job.perspective.as_ref(), repo)
1122 .await
1123 .map(|_| ())
1124 }
1125 .await;
1126
1127 match outcome {
1128 Ok(()) => {
1129 repo.complete_job(&job)
1130 .await
1131 .map_err(|error| AgentError::Storage(error.to_string()))?;
1132 processed += 1;
1133 }
1134 Err(error) => {
1135 repo.fail_job(&job, &error.to_string())
1136 .await
1137 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1138 }
1139 }
1140 }
1141
1142 Ok(processed)
1143}
1144
1145async fn process_reflect_jobs(
1146 repo: &MemoryRepository,
1147 namespace_id: i64,
1148 cognition: &CognitionConfig,
1149 agent: &AgentConfig,
1150 embeddings: Option<Arc<dyn EmbeddingService>>,
1151 lease_owner: &str,
1152) -> Result<usize, AgentError> {
1153 let jobs = repo
1154 .claim_jobs(
1155 namespace_id,
1156 REFLECT_PERSPECTIVE_JOB,
1157 lease_owner,
1158 cognition.lease_ttl_secs,
1159 cognition.max_job_batch as i64,
1160 )
1161 .await
1162 .map_err(|error| AgentError::Storage(error.to_string()))?;
1163
1164 let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
1165 let mut processed = 0usize;
1166 for job in jobs {
1167 let outcome = async {
1168 let perspective = job.perspective.as_ref().ok_or_else(|| {
1169 AgentError::Reflection("reflect job missing perspective".to_string())
1170 })?;
1171 service
1172 .reflect_perspective_cycle(namespace_id, perspective, repo)
1173 .await
1174 .map(|_| ())
1175 }
1176 .await;
1177
1178 match outcome {
1179 Ok(()) => {
1180 repo.complete_job(&job)
1181 .await
1182 .map_err(|error| AgentError::Storage(error.to_string()))?;
1183 processed += 1;
1184 }
1185 Err(error) => {
1186 repo.fail_job(&job, &error.to_string())
1187 .await
1188 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1189 }
1190 }
1191 }
1192
1193 Ok(processed)
1194}
1195
1196async fn process_reflect_namespace_jobs(
1197 repo: &MemoryRepository,
1198 namespace_id: i64,
1199 cognition: &CognitionConfig,
1200 agent: &AgentConfig,
1201 embeddings: Option<Arc<dyn EmbeddingService>>,
1202 lease_owner: &str,
1203) -> Result<usize, AgentError> {
1204 let jobs = repo
1205 .claim_jobs(
1206 namespace_id,
1207 REFLECT_NAMESPACE_JOB,
1208 lease_owner,
1209 cognition.lease_ttl_secs,
1210 cognition.max_job_batch as i64,
1211 )
1212 .await
1213 .map_err(|error| AgentError::Storage(error.to_string()))?;
1214
1215 let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
1216 let mut processed = 0usize;
1217 for job in jobs {
1218 let outcome = service.reflect_cycle(namespace_id, repo).await.map(|_| ());
1219
1220 match outcome {
1221 Ok(()) => {
1222 repo.complete_job(&job)
1223 .await
1224 .map_err(|error| AgentError::Storage(error.to_string()))?;
1225 processed += 1;
1226 }
1227 Err(error) => {
1228 repo.fail_job(&job, &error.to_string())
1229 .await
1230 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1231 }
1232 }
1233 }
1234
1235 Ok(processed)
1236}
1237
1238async fn process_digest_jobs(
1239 repo: &MemoryRepository,
1240 namespace_id: i64,
1241 cognition: &CognitionConfig,
1242 agent: &AgentConfig,
1243 llm: Arc<dyn LlmClient>,
1244 embeddings: Option<Arc<dyn EmbeddingService>>,
1245 lease_owner: &str,
1246) -> Result<usize, AgentError> {
1247 let jobs = repo
1248 .claim_jobs(
1249 namespace_id,
1250 DIGEST_SESSION_JOB,
1251 lease_owner,
1252 cognition.lease_ttl_secs,
1253 cognition.max_job_batch as i64,
1254 )
1255 .await
1256 .map_err(|error| AgentError::Storage(error.to_string()))?;
1257
1258 let service = DigestService::new(agent.clone(), llm, embeddings);
1259 let mut processed = 0usize;
1260 let mut seen_sessions = HashSet::new();
1261 for job in jobs {
1262 let session_key = job
1263 .payload
1264 .get("session_key")
1265 .and_then(serde_json::Value::as_str)
1266 .map(ToString::to_string)
1267 .or_else(|| {
1268 job.perspective
1269 .as_ref()
1270 .and_then(|perspective| perspective.session_key.clone())
1271 })
1272 .ok_or_else(|| AgentError::Digest("digest job missing session_key".to_string()))?;
1273 let force = digest_job_is_forced(
1274 job.payload
1275 .get("reason")
1276 .and_then(serde_json::Value::as_str),
1277 );
1278
1279 if !seen_sessions.insert(session_key.clone()) {
1280 repo.complete_job(&job)
1281 .await
1282 .map_err(|error| AgentError::Storage(error.to_string()))?;
1283 processed += 1;
1284 continue;
1285 }
1286
1287 if !force
1288 && !should_run_incremental_digest(repo, namespace_id, &session_key, cognition).await?
1289 {
1290 debug!(
1291 namespace_id,
1292 session_key, "Skipping digest rollover below threshold"
1293 );
1294 repo.complete_job(&job)
1295 .await
1296 .map_err(|error| AgentError::Storage(error.to_string()))?;
1297 processed += 1;
1298 continue;
1299 }
1300
1301 let outcome = async {
1302 service
1303 .digest_session(namespace_id, &session_key, repo, force)
1304 .await
1305 .map(|_| ())
1306 }
1307 .await;
1308
1309 match outcome {
1310 Ok(()) => {
1311 repo.complete_job(&job)
1312 .await
1313 .map_err(|error| AgentError::Storage(error.to_string()))?;
1314 processed += 1;
1315 }
1316 Err(error) => {
1317 repo.fail_job(&job, &error.to_string())
1318 .await
1319 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1320 }
1321 }
1322 }
1323
1324 Ok(processed)
1325}
1326
/// Reports whether a digest job's `reason` requires the digest to run
/// regardless of the incremental rollover threshold.
///
/// Only the exact reasons listed in `FORCED_REASONS` force a digest; any other
/// reason, or no reason at all, does not.
fn digest_job_is_forced(reason: Option<&str>) -> bool {
    const FORCED_REASONS: [&str; 4] = [
        "dream_digest",
        "session_end",
        "manual_digest",
        "manual_rebuild",
    ];

    match reason {
        Some(given) => FORCED_REASONS.contains(&given),
        None => false,
    }
}
1333
1334async fn should_run_incremental_digest(
1335 repo: &MemoryRepository,
1336 namespace_id: i64,
1337 session_key: &str,
1338 cognition: &CognitionConfig,
1339) -> Result<bool, AgentError> {
1340 let rollover = repo
1341 .session_digest_rollover(namespace_id, session_key)
1342 .await
1343 .map_err(|error| AgentError::Storage(error.to_string()))?;
1344
1345 if rollover.last_digest_end_memory_id.is_none() {
1346 return Ok(true);
1347 }
1348
1349 Ok(
1350 rollover.new_memory_count >= cognition.activity_distill_min_events as i64
1351 || rollover.estimated_new_tokens >= cognition.digest_short_target_tokens as i64,
1352 )
1353}
1354
1355#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1356struct DistilledSession {
1357 summary: String,
1358 category: String,
1359 labels: Vec<String>,
1360 key_activities: Vec<String>,
1361 files_touched: Vec<String>,
1362 tools_used: Vec<String>,
1363 decisions_made: Vec<String>,
1364}
1365
1366#[derive(Debug, Clone)]
1367struct DistillEvent {
1368 memory_id: i64,
1369 created_at: DateTime<Utc>,
1370 session_key: String,
1371 event_name: String,
1372 cwd: Option<String>,
1373 raw_payload: serde_json::Value,
1374}
1375
1376async fn process_activity_distill_jobs(
1377 repo: &MemoryRepository,
1378 namespace_id: i64,
1379 cognition: &CognitionConfig,
1380 llm: Arc<dyn LlmClient>,
1381 lease_owner: &str,
1382) -> Result<usize, AgentError> {
1383 let jobs = repo
1384 .claim_jobs(
1385 namespace_id,
1386 ACTIVITY_DISTILL_JOB,
1387 lease_owner,
1388 cognition.lease_ttl_secs,
1389 cognition.max_job_batch as i64,
1390 )
1391 .await
1392 .map_err(|error| AgentError::Storage(error.to_string()))?;
1393
1394 let mut processed = 0usize;
1395 let mut seen_sessions = HashSet::new();
1396 for job in jobs {
1397 let session_key = job
1398 .payload
1399 .get("session_key")
1400 .and_then(serde_json::Value::as_str)
1401 .map(ToString::to_string)
1402 .or_else(|| {
1403 job.perspective
1404 .as_ref()
1405 .and_then(|perspective| perspective.session_key.clone())
1406 })
1407 .ok_or_else(|| {
1408 AgentError::Ingest("activity_distill job missing session_key".to_string())
1409 })?;
1410 let agent_name = job
1411 .payload
1412 .get("agent")
1413 .and_then(serde_json::Value::as_str)
1414 .unwrap_or("unknown-agent")
1415 .to_string();
1416
1417 if !seen_sessions.insert(session_key.clone()) {
1418 repo.complete_job(&job)
1419 .await
1420 .map_err(|error| AgentError::Storage(error.to_string()))?;
1421 continue;
1422 }
1423
1424 match distill_raw_activity_session(
1425 repo,
1426 namespace_id,
1427 &agent_name,
1428 &session_key,
1429 cognition,
1430 llm.clone(),
1431 )
1432 .await
1433 {
1434 Ok(_) => {
1435 repo.complete_job(&job)
1436 .await
1437 .map_err(|error| AgentError::Storage(error.to_string()))?;
1438 processed += 1;
1439 }
1440 Err(error) => {
1441 repo.fail_job(&job, &error.to_string())
1442 .await
1443 .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1444 }
1445 }
1446 }
1447
1448 Ok(processed)
1449}
1450
1451async fn distill_raw_activity_session(
1452 repo: &MemoryRepository,
1453 namespace_id: i64,
1454 agent: &str,
1455 session_key: &str,
1456 cognition: &CognitionConfig,
1457 llm: Arc<dyn LlmClient>,
1458) -> Result<Option<i64>, AgentError> {
1459 let events: Vec<DistillEvent> = repo
1460 .list_by_session_key(
1461 namespace_id,
1462 session_key,
1463 cognition.activity_distill_max_events as i64,
1464 true,
1465 )
1466 .await
1467 .map_err(|error| AgentError::Storage(error.to_string()))?
1468 .into_iter()
1469 .filter_map(distill_event_from_memory)
1470 .collect();
1471
1472 if events.len() < cognition.activity_distill_min_events {
1473 return Ok(None);
1474 }
1475
1476 let event_summaries: Vec<String> = events.iter().map(summarize_distill_event).collect();
1477 let source_ids: Vec<i64> = events.iter().map(|event| event.memory_id).collect();
1478 let distilled = distill_with_llm(&llm, session_key, event_summaries.as_slice())
1479 .await
1480 .unwrap_or_else(|_| fallback_distilled_session(&events));
1481
1482 let category = MemoryCategory::parse(&distilled.category).unwrap_or(MemoryCategory::Session);
1483 let lane_type = MemoryLaneType::Cognitive(MemoryLaneCognitiveType::Explicit);
1484 let cognitive = build_distill_cognitive_metadata(agent, session_key, &source_ids);
1485 let metadata = cognitive.merge_into(&serde_json::json!({
1486 "distilled_from": events.len(),
1487 "session_id": session_key,
1488 "key_activities": distilled.key_activities,
1489 "files_touched": distilled.files_touched,
1490 "tools_used": distilled.tools_used,
1491 "decisions_made": distilled.decisions_made,
1492 "pipeline": "distill-v1",
1493 }));
1494
1495 let memory = repo
1496 .store_distilled_summary(
1497 StoreMemoryParams {
1498 namespace_id,
1499 content: &distilled.summary,
1500 category: &category,
1501 memory_lane_type: Some(&lane_type),
1502 labels: &distilled.labels,
1503 metadata: &metadata,
1504 embedding: None,
1505 embedding_model: None,
1506 },
1507 &source_ids,
1508 )
1509 .await
1510 .map_err(|error| AgentError::Storage(error.to_string()))?;
1511
1512 Ok(Some(memory.id))
1513}
1514
1515async fn distill_with_llm(
1516 llm: &Arc<dyn LlmClient>,
1517 session_key: &str,
1518 event_summaries: &[String],
1519) -> Result<DistilledSession, AgentError> {
1520 let user_prompt = format!(
1521 "Session ID: {}\nEvent count: {}\n\nEvents:\n{}",
1522 session_key,
1523 event_summaries.len(),
1524 event_summaries.join("\n")
1525 );
1526 llm.generate_json(GenerateParams {
1527 messages: vec![
1528 ChatMessage::system(ACTIVITY_DISTILL_SYSTEM_PROMPT),
1529 ChatMessage::user(user_prompt),
1530 ],
1531 max_tokens: 2048,
1532 temperature: 0.3,
1533 json_mode: true,
1534 })
1535 .await
1536 .map_err(|error| AgentError::Llm(error.to_string()))
1537}
1538
1539fn distill_event_from_memory(memory: Memory) -> Option<DistillEvent> {
1540 let raw_activity = memory.metadata.get("raw_activity")?;
1541 if !raw_activity
1542 .get("distill_pending")
1543 .and_then(serde_json::Value::as_bool)
1544 .unwrap_or(false)
1545 {
1546 return None;
1547 }
1548
1549 let session_key = raw_activity
1550 .get("derived_session_key")
1551 .and_then(serde_json::Value::as_str)
1552 .or_else(|| {
1553 memory
1554 .metadata
1555 .pointer("/cognitive/session_key")
1556 .and_then(serde_json::Value::as_str)
1557 })?
1558 .to_string();
1559 let event_name = raw_activity
1560 .get("event_name")
1561 .and_then(serde_json::Value::as_str)
1562 .unwrap_or("hook_event")
1563 .to_string();
1564 let cwd = raw_activity
1565 .get("cwd")
1566 .and_then(serde_json::Value::as_str)
1567 .map(ToString::to_string);
1568 let raw_payload = memory
1569 .metadata
1570 .get("raw_payload")
1571 .cloned()
1572 .unwrap_or(serde_json::Value::Null);
1573
1574 Some(DistillEvent {
1575 memory_id: memory.id,
1576 created_at: memory.created_at,
1577 session_key,
1578 event_name,
1579 cwd,
1580 raw_payload,
1581 })
1582}
1583
1584fn summarize_distill_event(event: &DistillEvent) -> String {
1585 let ts = event
1586 .raw_payload
1587 .get("timestamp")
1588 .and_then(serde_json::Value::as_str)
1589 .map(ToOwned::to_owned)
1590 .unwrap_or_else(|| event.created_at.to_rfc3339());
1591 let event_type = event
1592 .raw_payload
1593 .get("event")
1594 .or_else(|| event.raw_payload.get("hook_event_name"))
1595 .and_then(serde_json::Value::as_str)
1596 .unwrap_or(&event.event_name);
1597 let tool = event
1598 .raw_payload
1599 .get("tool")
1600 .or_else(|| event.raw_payload.get("tool_name"))
1601 .or_else(|| event.raw_payload.get("toolName"))
1602 .and_then(serde_json::Value::as_str)
1603 .unwrap_or("-");
1604 let cwd = event
1605 .raw_payload
1606 .get("cwd")
1607 .or_else(|| event.raw_payload.get("working_directory"))
1608 .and_then(serde_json::Value::as_str)
1609 .or(event.cwd.as_deref())
1610 .unwrap_or("-");
1611 format!("{ts} | {event_type} | tool={tool} | cwd={cwd}")
1612}
1613
1614fn fallback_distilled_session(events: &[DistillEvent]) -> DistilledSession {
1615 let mut tools = BTreeSet::new();
1616 let mut workspaces = BTreeSet::new();
1617 let mut activities = Vec::new();
1618
1619 for event in events {
1620 if let Some(tool) = event
1621 .raw_payload
1622 .get("tool")
1623 .or_else(|| event.raw_payload.get("tool_name"))
1624 .or_else(|| event.raw_payload.get("toolName"))
1625 .and_then(serde_json::Value::as_str)
1626 {
1627 tools.insert(tool.to_string());
1628 }
1629 if let Some(cwd) = event.cwd.as_deref() {
1630 workspaces.insert(cwd.to_string());
1631 }
1632 activities.push(event.event_name.clone());
1633 }
1634
1635 DistilledSession {
1636 summary: format!(
1637 "Session {} produced {} low-signal hook events across {} tool(s), primarily in {}.",
1638 events
1639 .first()
1640 .map(|event| event.session_key.as_str())
1641 .unwrap_or("unknown-session"),
1642 events.len(),
1643 tools.len(),
1644 workspaces
1645 .iter()
1646 .next()
1647 .cloned()
1648 .unwrap_or_else(|| "an unknown workspace".to_string())
1649 ),
1650 category: "session".to_string(),
1651 labels: vec!["activity-summary".to_string(), "auto-distilled".to_string()],
1652 key_activities: activities.into_iter().take(5).collect(),
1653 files_touched: workspaces.into_iter().collect(),
1654 tools_used: tools.into_iter().collect(),
1655 decisions_made: Vec::new(),
1656 }
1657}
1658
1659pub async fn create_embedding_service(
1665 config: &nexus_core::Config,
1666) -> Option<Arc<dyn EmbeddingService>> {
1667 if !config.embedding.enabled {
1668 return None;
1669 }
1670 match nexus_embeddings::create_service(config).await {
1671 Ok(Some(service)) => Some(service),
1672 Ok(None) => None,
1673 Err(error) => {
1674 warn!(
1675 %error,
1676 "Failed to initialize embedding service; configured LLM features remain available and cognition will run without semantic embeddings. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
1677 );
1678 None
1679 }
1680 }
1681}
1682
1683fn build_distill_cognitive_metadata(
1684 agent: &str,
1685 session_key: &str,
1686 source_memory_ids: &[i64],
1687) -> CognitiveMetadata {
1688 let perspective = infer_perspective(
1689 PerspectiveSource::Digest,
1690 agent,
1691 None::<String>,
1692 Some(session_key.to_string()),
1693 );
1694 let mut cognitive = CognitiveMetadata::new(
1695 CognitiveLevel::SummaryShort,
1696 perspective.observer,
1697 perspective.subject,
1698 perspective.session_key,
1699 "nexus:distill-v1",
1700 );
1701 cognitive.source_memory_ids = source_memory_ids.to_vec();
1702 cognitive.confidence = Some(0.8);
1703 cognitive
1704}
1705
1706fn runtime_llm_client() -> Arc<dyn LlmClient> {
1707 match create_client_auto_with_fallback() {
1708 Ok(client) => client,
1709 Err(error) => {
1710 warn!(%error, "LLM unavailable, using deterministic cognition fallbacks");
1711 Arc::new(UnavailableLlmClient {
1712 message: error.to_string(),
1713 })
1714 }
1715 }
1716}
1717
1718struct UnavailableLlmClient {
1719 message: String,
1720}
1721
1722#[async_trait]
1723impl LlmClient for UnavailableLlmClient {
1724 async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
1725 Err(nexus_llm::LlmError::InvalidJsonResponse(
1726 self.message.clone(),
1727 ))
1728 }
1729
1730 fn provider_name(&self) -> String {
1731 "unavailable".to_string()
1732 }
1733
1734 fn model_name(&self) -> String {
1735 "deterministic-fallback".to_string()
1736 }
1737}
1738
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};

    /// Opens a single-connection in-memory SQLite pool, runs the storage
    /// migrations, creates (or fetches) the named namespace, and returns the
    /// pool, the namespace id, and a memory repository on that pool.
    async fn memory_fixture(namespace_name: &str) -> (SqlitePool, i64, MemoryRepository) {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        nexus_storage::migrations::run_migrations(&pool)
            .await
            .unwrap();

        let namespace = NamespaceRepository::new(pool.clone())
            .get_or_create(namespace_name, namespace_name)
            .await
            .unwrap();
        let repo = MemoryRepository::new(pool.clone());

        (pool, namespace.id, repo)
    }

    /// Cognitive metadata for a `claude-code` memory at `level` within `session_key`.
    fn session_metadata(level: &str, session_key: &str) -> serde_json::Value {
        json!({
            "cognitive": {
                "level": level,
                "observer": "claude-code",
                "subject": "claude-code",
                "session_key": session_key
            }
        })
    }

    /// Number of jobs of `job_type` in `status` (first page of ten).
    async fn job_count(
        repo: &MemoryRepository,
        namespace_id: i64,
        job_type: &str,
        status: &str,
    ) -> usize {
        repo.list_jobs(namespace_id, Some(job_type), Some(status), 10, 0)
            .await
            .unwrap()
            .len()
    }

    #[tokio::test]
    async fn run_dream_cycle_processes_namespace_jobs() {
        let (pool, namespace_id, repo) = memory_fixture("runtime-dream-test").await;

        let metadata = json!({
            "cognitive": {
                "level": "explicit",
                "observer": "claude-code",
                "subject": "claude-code",
                "generated_by": "test"
            }
        });
        for content in ["feature enabled", "feature not enabled"] {
            repo.store(StoreMemoryParams {
                namespace_id,
                content,
                category: &MemoryCategory::Facts,
                memory_lane_type: None,
                labels: &[],
                metadata: &metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        let processed = run_dream_cycle(
            pool.clone(),
            &CognitionConfig::default(),
            &AgentConfig::default(),
            Arc::new(UnavailableLlmClient {
                message: "offline".to_string(),
            }),
            None,
            DreamCycleRequest {
                namespace_id,
                lease_owner: "test-owner",
                perspective: None,
                session_key: None,
                reflect_reason: "namespace_dream",
                digest_reason: "dream_digest",
            },
        )
        .await
        .unwrap();

        assert!(processed >= 1);

        let contradictions = repo
            .get_by_cognitive_level(namespace_id, CognitiveLevel::Contradiction, 10)
            .await
            .unwrap();
        assert_eq!(contradictions.len(), 1);
        assert_eq!(
            job_count(&repo, namespace_id, REFLECT_NAMESPACE_JOB, "pending").await,
            0
        );
    }

    #[tokio::test]
    async fn process_digest_jobs_skips_below_rollover_threshold() {
        const SESSION: &str = "digest-skip-session";

        let (_pool, namespace_id, repo) = memory_fixture("runtime-digest-skip").await;

        let explicit_metadata = session_metadata("explicit", SESSION);
        let digest_metadata = session_metadata("summary_short", SESSION);

        let source = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Small explicit update.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &explicit_metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        let prior_digest = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Prior digest",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &digest_metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        repo.store_digest(nexus_storage::repository::StoreDigestParams {
            namespace_id,
            session_key: SESSION,
            digest_kind: "short",
            memory_id: prior_digest.id,
            start_memory_id: Some(source.id),
            end_memory_id: Some(source.id),
            token_count: 12,
        })
        .await
        .unwrap();

        let follow_up = repo
            .store(StoreMemoryParams {
                namespace_id,
                content: "Tiny follow-up.",
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &[],
                metadata: &explicit_metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();

        repo.enqueue_job(nexus_storage::EnqueueJobParams {
            namespace_id,
            job_type: DIGEST_SESSION_JOB,
            priority: 90,
            perspective: None,
            payload: &serde_json::json!({
                "session_key": SESSION,
                "source_memory_id": follow_up.id
            }),
        })
        .await
        .unwrap();

        assert_eq!(
            job_count(&repo, namespace_id, DIGEST_SESSION_JOB, "pending").await,
            1
        );

        let processed = process_digest_jobs(
            &repo,
            namespace_id,
            &CognitionConfig::default(),
            &AgentConfig::default(),
            Arc::new(UnavailableLlmClient {
                message: "offline".to_string(),
            }),
            None,
            "digest-skip",
        )
        .await
        .unwrap();

        assert_eq!(processed, 1);

        let latest = repo
            .latest_digest_for_session(namespace_id, SESSION, "short")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(latest.id, prior_digest.id);

        let digest_total = repo
            .count_digests(namespace_id, Some(SESSION))
            .await
            .unwrap();
        assert_eq!(digest_total, 1);
    }

    #[test]
    fn digest_job_force_reason_matches_expected_inputs() {
        for forced in ["dream_digest", "session_end", "manual_digest", "manual_rebuild"] {
            assert!(digest_job_is_forced(Some(forced)));
        }
        assert!(!digest_job_is_forced(Some("derive_follow_up")));
        assert!(!digest_job_is_forced(None));
    }

    #[test]
    fn sanitize_component_replaces_unsafe_chars() {
        let sanitized = sanitize_component("claude/code:1");
        assert_eq!(sanitized, "claude_code_1");
    }

    #[test]
    fn derive_session_key_prefers_explicit_key() {
        let key = derive_session_key("claude-code", Some("abc"), Some("/tmp/project"));
        assert_eq!(key, "abc");
    }

    #[test]
    fn derive_session_key_falls_back_to_stable_hash() {
        let without_key = derive_session_key("claude-code", None, Some("/tmp/project"));
        let with_empty_key = derive_session_key("claude-code", Some(""), Some("/tmp/project"));
        assert_eq!(without_key, with_empty_key);
        assert!(without_key.starts_with("derived-"));
    }

    #[test]
    fn choose_dream_schedule_immediate_for_contradictions() {
        let signals = DreamSignals {
            contradiction_count: 1,
            raw_event_count: 1,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(
            &CognitionConfig::default(),
            &signals,
            RuntimeShutdownReason::SessionEnded,
        );

        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
    }

    #[test]
    fn choose_dream_schedule_digest_only_for_light_digest_gap() {
        let config = CognitionConfig::default();
        let signals = DreamSignals {
            raw_event_count: 1,
            has_digest_gap: true,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(&config, &signals, RuntimeShutdownReason::SessionEnded);

        assert_eq!(plan.action, DreamScheduleAction::DigestOnly);
    }

    #[test]
    fn choose_dream_schedule_delays_idle_medium_signal_sessions() {
        let signals = DreamSignals {
            raw_event_count: 2,
            explicit_count: 2,
            derived_count: 0,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(
            &CognitionConfig::default(),
            &signals,
            RuntimeShutdownReason::IdleTimeout,
        );

        assert_eq!(plan.action, DreamScheduleAction::DelayedEnqueue);
    }

    #[test]
    fn choose_dream_schedule_session_end_flushes_explicit_reflection() {
        let signals = DreamSignals {
            explicit_count: 2,
            has_digest_gap: true,
            ..DreamSignals::default()
        };

        let plan = choose_dream_schedule(
            &CognitionConfig::default(),
            &signals,
            RuntimeShutdownReason::SessionEnded,
        );

        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
    }

    #[tokio::test]
    async fn collect_dream_signals_counts_session_signal_types() {
        const SESSION: &str = "signals-session";

        let (_pool, namespace_id, repo) = memory_fixture("runtime-dream-signals").await;

        let fixtures = [
            (
                "tool event",
                vec!["raw-activity".to_string()],
                json!({
                    "raw_activity": true,
                    "cognitive": {
                        "level": "raw",
                        "observer": "claude-code",
                        "subject": "claude-code",
                        "session_key": "signals-session"
                    }
                }),
            ),
            (
                "explicit note",
                vec![],
                session_metadata("explicit", SESSION),
            ),
            (
                "derived insight",
                vec![],
                session_metadata("derived", SESSION),
            ),
            (
                "contradiction record",
                vec![],
                session_metadata("contradiction", SESSION),
            ),
        ];

        for (content, labels, metadata) in fixtures {
            repo.store(StoreMemoryParams {
                namespace_id,
                content,
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &labels,
                metadata: &metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
        }

        let signals = collect_dream_signals(&repo, namespace_id, SESSION)
            .await
            .unwrap();

        assert_eq!(signals.raw_event_count, 1);
        assert_eq!(signals.explicit_count, 1);
        assert_eq!(signals.derived_count, 1);
        assert_eq!(signals.contradiction_count, 1);
        assert!(signals.has_digest_gap);
    }

    #[tokio::test]
    async fn enqueue_dream_jobs_coalesces_session_scoped_shutdown_work() {
        let (_pool, namespace_id, repo) = memory_fixture("runtime-dream-dedupe").await;
        let perspective = PerspectiveKey {
            observer: "claude-code".to_string(),
            subject: "claude-code".to_string(),
            session_key: Some("session-123".to_string()),
        };

        let first = enqueue_dream_jobs(
            &repo,
            namespace_id,
            Some(&perspective),
            Some("session-123"),
            "session_end_dream",
            "session_end",
        )
        .await
        .unwrap();
        let second = enqueue_dream_jobs(
            &repo,
            namespace_id,
            Some(&perspective),
            Some("session-123"),
            "session_end_dream",
            "session_end",
        )
        .await
        .unwrap();

        assert_eq!(first, 2);
        assert_eq!(second, 0);

        let pending = memory_job_status::PENDING;
        assert_eq!(
            job_count(&repo, namespace_id, REFLECT_PERSPECTIVE_JOB, pending).await,
            1
        );
        assert_eq!(
            job_count(&repo, namespace_id, REFLECT_NAMESPACE_JOB, pending).await,
            0
        );
        assert_eq!(
            job_count(&repo, namespace_id, DIGEST_SESSION_JOB, pending).await,
            1
        );
    }

    #[test]
    fn test_choose_dream_schedule_uses_contradiction_density() {
        let cognition = CognitionConfig::default();

        // No signals at all: nothing to schedule.
        let idle_plan = choose_dream_schedule(
            &cognition,
            &DreamSignals::default(),
            RuntimeShutdownReason::SessionEnded,
        );
        assert_eq!(idle_plan.action, DreamScheduleAction::Skip);

        let high_density = DreamSignals {
            total_non_raw_count: 10,
            contradiction_count: 3,
            contradiction_density: 0.30,
            ..DreamSignals::default()
        };
        let high_plan = choose_dream_schedule(
            &cognition,
            &high_density,
            RuntimeShutdownReason::SessionEnded,
        );
        assert_eq!(high_plan.action, DreamScheduleAction::ImmediateBounded);
        assert!(high_plan.reason.contains("high contradiction density"));

        let moderate_density = DreamSignals {
            total_non_raw_count: 20,
            contradiction_count: 2,
            contradiction_density: 0.10,
            explicit_count: 5,
            ..DreamSignals::default()
        };
        let moderate_plan = choose_dream_schedule(
            &cognition,
            &moderate_density,
            RuntimeShutdownReason::SessionEnded,
        );
        assert_eq!(moderate_plan.action, DreamScheduleAction::ImmediateBounded);
        assert!(moderate_plan
            .reason
            .contains("moderate contradiction density"));

        // The same moderate signals on an idle timeout must not run immediately.
        let idle_timeout_plan = choose_dream_schedule(
            &cognition,
            &moderate_density,
            RuntimeShutdownReason::IdleTimeout,
        );
        assert_ne!(
            idle_timeout_plan.action,
            DreamScheduleAction::ImmediateBounded
        );
    }

    #[test]
    fn test_dream_signals_computes_contradiction_density() {
        let populated = DreamSignals {
            total_non_raw_count: 20,
            contradiction_count: 4,
            contradiction_density: 4.0 / 20.0,
            ..DreamSignals::default()
        };
        assert!((populated.contradiction_density - 0.20).abs() < f32::EPSILON);

        let empty = DreamSignals::default();
        assert!((empty.contradiction_density).abs() < f32::EPSILON);
    }
}