Skip to main content

nexus_memory_agent/
runtime.rs

1//! Session-scoped runtime controller for hook-driven cognition.
2
3use std::collections::{BTreeSet, HashSet};
4use std::hash::{Hash, Hasher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Instant;
8
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use nexus_core::config::{AgentConfig, CognitionConfig};
12use nexus_core::traits::EmbeddingService;
13use nexus_core::{
14    infer_perspective, CognitiveLevel, CognitiveMetadata, Memory, MemoryCategory,
15    MemoryLaneCognitiveType, MemoryLaneType, PerspectiveKey, PerspectiveSource,
16};
17use nexus_llm::{
18    create_client_auto_with_fallback, ChatMessage, GenerateParams, GenerateResponse, LlmClient,
19    LlmClientJson,
20};
21use nexus_storage::models::{memory_job_status, EnqueueJobParams, MemoryJobRow};
22use nexus_storage::repository::{MemoryRepository, NamespaceRepository, StoreMemoryParams};
23use nexus_storage::StorageManager;
24use serde::{Deserialize, Serialize};
25use serde_json::json;
26use tracing::{debug, info, warn};
27
28use crate::derive::DeriveService;
29use crate::digest::DigestService;
30use crate::error::AgentError;
31use crate::reflect::ReflectService;
32use crate::util::{flush_metric_samples, stage_metric_sample};
33
// Job-type identifiers. They are passed to `enqueue_job` and used as the
// `list_jobs` filter when checking for duplicates, so changing a value would
// stop matching rows already queued under the old name.

/// Job type for memory-derivation work.
const DERIVE_MEMORY_JOB: &str = "derive_memory";
/// Job type for distilling raw activity (hook events) into summaries.
const ACTIVITY_DISTILL_JOB: &str = "activity_distill";
/// Job type for namespace-wide reflection (queued when no perspective is given).
const REFLECT_NAMESPACE_JOB: &str = "reflect_namespace";
/// Job type for perspective-scoped reflection.
const REFLECT_PERSPECTIVE_JOB: &str = "reflect_perspective";
/// Job type for building a per-session digest.
const DIGEST_SESSION_JOB: &str = "digest_session";
/// System prompt for distilling a batch of raw hook events into a structured
/// session summary. The prompt asks for a strict-JSON reply with the fields
/// listed below; the code that parses the reply is not in this chunk.
const ACTIVITY_DISTILL_SYSTEM_PROMPT: &str = r#"You are distilling a batch of raw agent hook events into a meaningful session summary.

Given a set of raw hook events (JSON with timestamps, tool names, CWD, session IDs), produce a structured summary of what happened in the session.

Focus on:
- What the user/agent was working on (project, directory, task)
- Which tools were used and how often
- Key actions taken (tests run, files edited, commands executed)
- Any patterns (repeated test runs, debugging cycles, etc.)

Return strict JSON with these fields:
- summary: A 1-3 sentence human-readable summary of the session
- category: One of "session", "context", "facts"
- labels: 2-5 descriptive labels
- key_activities: List of notable activities
- files_touched: List of files/directories mentioned
- tools_used: List of unique tools used
- decisions_made: Any decisions evident from the event sequence

Return strict JSON only. No markdown fences."#;
59
/// How the runtime controller is being driven.
///
/// Recorded (through its serde mirror `RuntimeModeSerde`) in the per-session
/// runtime state each time `RuntimeController::ensure_started` runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeMode {
    /// NOTE(review): presumably a runtime tied to a single agent session — confirm.
    SessionScoped,
    /// NOTE(review): presumably a long-lived runtime spanning sessions — confirm.
    Persistent,
}
65
/// Why `RuntimeController::flush_and_shutdown` was invoked.
///
/// The reason is recorded in the session-end marker and is an input to the
/// adaptive dream schedule (e.g. only `SessionEnded` drains digest-only work
/// before teardown).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeShutdownReason {
    /// The agent session ended.
    SessionEnded,
    /// The runtime was idle for too long.
    IdleTimeout,
    /// Shutdown was requested explicitly.
    Manual,
}
72
/// Per-session runtime bookkeeping, read and written through the `.json` file
/// returned by `RuntimeController::state_file_path`.
///
/// Field names (and order) define the serialized layout; renaming a field
/// changes the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct RuntimeState {
    /// Agent type that owns this runtime.
    agent_type: String,
    /// Derived session key (output of `derive_session_key`).
    session_key: String,
    /// Mode passed to the most recent `ensure_started` call.
    mode: RuntimeModeSerde,
    /// Start of the current run; reset by `ensure_started` after an idle gap
    /// longer than `runtime_idle_timeout_secs`.
    started_at: DateTime<Utc>,
    /// Time of the most recent `ensure_started` call.
    updated_at: DateTime<Utc>,
}
81
/// Borrowed description of a runtime lifecycle event, handed to
/// `store_runtime_marker` (for example the `runtime_session_end` marker
/// written by `RuntimeController::flush_and_shutdown`).
struct RuntimeMarker<'m> {
    /// Event name, e.g. `"runtime_session_end"`.
    event: &'m str,
    /// Free-form detail for the event, e.g. a shutdown-reason label.
    detail: &'m str,
    /// Agent type the event belongs to.
    agent_type: &'m str,
    /// Namespace configured on the agent (`AgentConfig::namespace`).
    agent_namespace: &'m str,
    /// Session key exactly as the caller supplied it (not the derived key).
    session_key: Option<&'m str>,
    /// Working directory, if known.
    cwd: Option<&'m str>,
}
90
/// Parameters for one `run_dream_cycle` invocation.
#[derive(Debug, Clone)]
pub struct DreamCycleRequest<'a> {
    /// Namespace whose dream jobs are enqueued and drained.
    pub namespace_id: i64,
    /// Lease owner forwarded to `drain_cognition_jobs`.
    pub lease_owner: &'a str,
    /// When set, a perspective-scoped reflect job is queued; otherwise a
    /// namespace-wide reflect job is queued.
    pub perspective: Option<&'a PerspectiveKey>,
    /// When set, a `digest_session` job is queued for this session.
    pub session_key: Option<&'a str>,
    /// Stored as `"reason"` in the reflect job payload.
    pub reflect_reason: &'a str,
    /// Stored as `"reason"` in the digest job payload.
    pub digest_reason: &'a str,
}
100
/// What `flush_and_shutdown` does for dreaming after adaptive planning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DreamScheduleAction {
    /// Run a full dream cycle now, bounded by `session_end_dream_timeout_secs`.
    ImmediateBounded,
    /// Enqueue dream jobs but do not drain them now.
    DelayedEnqueue,
    /// Enqueue only a session digest job (drained immediately on session end).
    DigestOnly,
    /// Do no dream work.
    Skip,
}
108
/// Outcome of `choose_dream_schedule`: the action plus a static explanation
/// that is logged alongside it.
#[derive(Debug, Clone)]
struct DreamSchedulePlan {
    /// Chosen dream action.
    action: DreamScheduleAction,
    /// Human-readable rationale for the choice (used in log output).
    reason: &'static str,
}
114
/// Session-scoped counts gathered by `collect_dream_signals` and consumed by
/// `choose_dream_schedule`.
#[derive(Debug, Clone, Default)]
struct DreamSignals {
    /// Session memories labelled `raw-activity` or carrying `raw_activity` metadata.
    raw_event_count: usize,
    /// Session memories whose cognitive level is `Explicit`.
    explicit_count: usize,
    /// Session memories whose cognitive level is `Derived`.
    derived_count: usize,
    /// Session memories whose cognitive level is `Contradiction`.
    contradiction_count: usize,
    /// True when no digest has been recorded for the session yet.
    has_digest_gap: bool,
    /// Total non-raw memories in the session scope: the sum of the explicit,
    /// derived and contradiction counts.
    total_non_raw_count: usize,
    /// Ratio of contradictions to total non-raw (0.0 when no memories).
    contradiction_density: f32,
}
127
/// Serde mirror of [`RuntimeMode`] stored in `RuntimeState`; serialized as
/// `"session_scoped"` / `"persistent"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum RuntimeModeSerde {
    SessionScoped,
    Persistent,
}
134
135impl From<RuntimeMode> for RuntimeModeSerde {
136    fn from(value: RuntimeMode) -> Self {
137        match value {
138            RuntimeMode::SessionScoped => Self::SessionScoped,
139            RuntimeMode::Persistent => Self::Persistent,
140        }
141    }
142}
143
/// Drives the hook-triggered runtime lifecycle for an agent: cognition drains
/// on startup, and flushes plus adaptive dream work on shutdown.
pub struct RuntimeController {
    /// Cognition feature flags, timeouts and thresholds.
    cognition: CognitionConfig,
    /// Agent configuration forwarded to cognition job processing.
    agent: AgentConfig,
    /// Optional embedding service forwarded to cognition job processing.
    embeddings: Option<Arc<dyn EmbeddingService>>,
}
149
150impl RuntimeController {
151    pub fn new(
152        cognition: CognitionConfig,
153        agent: AgentConfig,
154        embeddings: Option<Arc<dyn EmbeddingService>>,
155    ) -> Self {
156        Self {
157            cognition,
158            agent,
159            embeddings,
160        }
161    }
162
163    pub async fn ensure_started(
164        &self,
165        agent_type: &str,
166        session_key: Option<&str>,
167        cwd: Option<&str>,
168        mode: RuntimeMode,
169    ) -> Result<(), AgentError> {
170        if !self.cognition.auto_runtime_enabled {
171            return Ok(());
172        }
173
174        let config =
175            nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
176        let mut storage = StorageManager::from_url(&config.database_url())
177            .await
178            .map_err(|e| AgentError::Storage(e.to_string()))?;
179        storage
180            .initialize()
181            .await
182            .map_err(|e| AgentError::Storage(e.to_string()))?;
183
184        let session_key = derive_session_key(agent_type, session_key, cwd);
185        let path = self.state_file_path(agent_type, &session_key)?;
186        let now = Utc::now();
187
188        let mut state = read_runtime_state(&path)?.unwrap_or(RuntimeState {
189            agent_type: agent_type.to_string(),
190            session_key: session_key.clone(),
191            mode: mode.into(),
192            started_at: now,
193            updated_at: now,
194        });
195
196        if (now - state.updated_at).num_seconds() > self.cognition.runtime_idle_timeout_secs as i64
197        {
198            state.started_at = now;
199        }
200        state.updated_at = now;
201        state.mode = mode.into();
202
203        write_runtime_state(&path, &state)?;
204        let llm = runtime_llm_client();
205        let processed = drain_cognition_jobs(
206            storage.pool().clone(),
207            namespace_id_for(agent_type, &storage).await?,
208            &self.cognition,
209            &self.agent,
210            llm,
211            self.embeddings.clone(),
212            &format!("runtime-start-{agent_type}-{}", state.session_key),
213        )
214        .await?;
215        debug!(
216            agent_type,
217            processed_jobs = processed,
218            "Runtime startup cognition drain complete"
219        );
220        debug!(agent_type, session_key = %state.session_key, "Runtime state ensured");
221        Ok(())
222    }
223
224    pub async fn flush_and_shutdown(
225        &self,
226        agent_type: &str,
227        session_key: Option<&str>,
228        cwd: Option<&str>,
229        reason: RuntimeShutdownReason,
230    ) -> Result<(), AgentError> {
231        if !self.cognition.auto_runtime_enabled {
232            return Ok(());
233        }
234
235        let config =
236            nexus_core::Config::from_env().map_err(|e| AgentError::Config(e.to_string()))?;
237        let mut storage = StorageManager::from_url(&config.database_url())
238            .await
239            .map_err(|e| AgentError::Storage(e.to_string()))?;
240        storage
241            .initialize()
242            .await
243            .map_err(|e| AgentError::Storage(e.to_string()))?;
244
245        let namespace_repo = NamespaceRepository::new(storage.pool().clone());
246        let namespace = namespace_repo
247            .get_or_create(agent_type, agent_type)
248            .await
249            .map_err(|e| AgentError::Storage(e.to_string()))?;
250
251        let memory_repo = MemoryRepository::new(storage.pool().clone());
252        let derived_session_key = derive_session_key(agent_type, session_key, cwd);
253        store_runtime_marker(
254            &memory_repo,
255            namespace.id,
256            RuntimeMarker {
257                agent_type,
258                session_key,
259                cwd,
260                event: "runtime_session_end",
261                detail: runtime_reason_label(reason),
262                agent_namespace: self.agent.namespace.as_str(),
263            },
264        )
265        .await?;
266
267        let llm = runtime_llm_client();
268        let processed = drain_cognition_jobs(
269            storage.pool().clone(),
270            namespace.id,
271            &self.cognition,
272            &self.agent,
273            llm.clone(),
274            self.embeddings.clone(),
275            &format!("runtime-stop-{agent_type}-{derived_session_key}"),
276        )
277        .await?;
278        debug!(
279            agent_type,
280            processed_jobs = processed,
281            "Runtime shutdown cognition drain complete"
282        );
283
284        if self.cognition.dream_on_session_end {
285            let lease_owner = format!("runtime-dream-{agent_type}-{derived_session_key}");
286            let shutdown_perspective = PerspectiveKey {
287                observer: agent_type.to_string(),
288                subject: agent_type.to_string(),
289                session_key: Some(derived_session_key.clone()),
290            };
291            let signals =
292                collect_dream_signals(&memory_repo, namespace.id, &derived_session_key).await?;
293            let plan = choose_dream_schedule(&self.cognition, &signals, reason);
294            debug!(
295                agent_type,
296                session_key = ?session_key,
297                action = ?plan.action,
298                plan_reason = plan.reason,
299                raw_event_count = signals.raw_event_count,
300                contradiction_count = signals.contradiction_count,
301                contradiction_density = signals.contradiction_density,
302                total_non_raw = signals.total_non_raw_count,
303                has_digest_gap = signals.has_digest_gap,
304                "Selected adaptive dream schedule"
305            );
306            match plan.action {
307                DreamScheduleAction::ImmediateBounded => match tokio::time::timeout(
308                    std::time::Duration::from_secs(self.cognition.session_end_dream_timeout_secs),
309                    run_dream_cycle(
310                        storage.pool().clone(),
311                        &self.cognition,
312                        &self.agent,
313                        llm,
314                        self.embeddings.clone(),
315                        DreamCycleRequest {
316                            namespace_id: namespace.id,
317                            lease_owner: &lease_owner,
318                            perspective: Some(&shutdown_perspective),
319                            session_key: Some(derived_session_key.as_str()),
320                            reflect_reason: "session_end_dream",
321                            digest_reason: "session_end",
322                        },
323                    ),
324                )
325                .await
326                {
327                    Ok(Ok(processed)) if processed > 0 => {
328                        info!(
329                            agent_type,
330                            session_key = ?session_key,
331                            processed,
332                            plan_reason = plan.reason,
333                            "Dream pass completed through cognition jobs"
334                        );
335                    }
336                    Ok(Ok(_)) => {
337                        debug!(
338                            agent_type,
339                            session_key = ?session_key,
340                            plan_reason = plan.reason,
341                            "Dream pass skipped"
342                        );
343                    }
344                    Ok(Err(error)) => {
345                        warn!(%error, agent_type, session_key = ?session_key, "Dream pass failed");
346                    }
347                    Err(_) => {
348                        warn!(
349                            agent_type,
350                            session_key = ?session_key,
351                            timeout_secs = self.cognition.session_end_dream_timeout_secs,
352                            "Dream pass timed out during shutdown"
353                        );
354                    }
355                },
356                DreamScheduleAction::DelayedEnqueue => {
357                    let queued = enqueue_dream_jobs(
358                        &memory_repo,
359                        namespace.id,
360                        Some(&shutdown_perspective),
361                        Some(derived_session_key.as_str()),
362                        "session_end_delayed_dream",
363                        "session_end",
364                    )
365                    .await?;
366                    debug!(
367                        agent_type,
368                        session_key = ?session_key,
369                        queued,
370                        plan_reason = plan.reason,
371                        "Queued delayed dream jobs without immediate drain"
372                    );
373                }
374                DreamScheduleAction::DigestOnly => {
375                    let queued = enqueue_digest_job_if_absent(
376                        &memory_repo,
377                        namespace.id,
378                        derived_session_key.as_str(),
379                        "session_end_digest_only",
380                    )
381                    .await?;
382                    debug!(
383                        agent_type,
384                        session_key = ?session_key,
385                        queued,
386                        plan_reason = plan.reason,
387                        "Queued digest-only shutdown work"
388                    );
389                    if queued && matches!(reason, RuntimeShutdownReason::SessionEnded) {
390                        match tokio::time::timeout(
391                            std::time::Duration::from_secs(
392                                self.cognition.session_end_dream_timeout_secs,
393                            ),
394                            drain_cognition_jobs(
395                                storage.pool().clone(),
396                                namespace.id,
397                                &self.cognition,
398                                &self.agent,
399                                llm.clone(),
400                                self.embeddings.clone(),
401                                &format!("runtime-finalize-{agent_type}-{derived_session_key}"),
402                            ),
403                        )
404                        .await
405                        {
406                            Ok(Ok(processed)) => {
407                                debug!(
408                                    agent_type,
409                                    session_key = ?session_key,
410                                    processed,
411                                    plan_reason = plan.reason,
412                                    "Drained digest-only shutdown work before runtime teardown"
413                                );
414                            }
415                            Ok(Err(error)) => {
416                                warn!(
417                                    %error,
418                                    agent_type,
419                                    session_key = ?session_key,
420                                    "Digest-only shutdown drain failed"
421                                );
422                            }
423                            Err(_) => {
424                                warn!(
425                                    agent_type,
426                                    session_key = ?session_key,
427                                    timeout_secs = self.cognition.session_end_dream_timeout_secs,
428                                    "Digest-only shutdown drain timed out"
429                                );
430                            }
431                        }
432                    }
433                }
434                DreamScheduleAction::Skip => {
435                    debug!(
436                        agent_type,
437                        session_key = ?session_key,
438                        plan_reason = plan.reason,
439                        "Skipped shutdown dream work after adaptive planning"
440                    );
441                }
442            }
443        }
444
445        let path = self.state_file_path(agent_type, &derived_session_key)?;
446        if path.exists() {
447            std::fs::remove_file(&path)?;
448        }
449
450        Ok(())
451    }
452
453    pub fn state_root() -> PathBuf {
454        if let Some(dir) = dirs::state_dir() {
455            dir.join("nexus-memory-system").join("runtime")
456        } else {
457            std::env::var("HOME")
458                .map(|h| PathBuf::from(h).join(".local/state/nexus-memory-system/runtime"))
459                .unwrap_or_else(|_| PathBuf::from(".nexus-runtime"))
460        }
461    }
462
463    fn state_file_path(&self, agent_type: &str, session_key: &str) -> Result<PathBuf, AgentError> {
464        let root = Self::state_root().join("sessions");
465        std::fs::create_dir_all(&root)?;
466        Ok(root.join(format!(
467            "{}__{}.json",
468            sanitize_component(agent_type),
469            sanitize_component(session_key)
470        )))
471    }
472}
473
474pub async fn drain_cognition_jobs(
475    pool: sqlx::SqlitePool,
476    namespace_id: i64,
477    cognition: &CognitionConfig,
478    agent: &AgentConfig,
479    llm: Arc<dyn LlmClient>,
480    embeddings: Option<Arc<dyn EmbeddingService>>,
481    lease_owner: &str,
482) -> Result<usize, AgentError> {
483    let repo = MemoryRepository::new(pool);
484    let mut total_processed = 0usize;
485
486    for _ in 0..3 {
487        let mut progressed = 0usize;
488
489        if cognition.activity_distill_enabled {
490            progressed += process_activity_distill_jobs(
491                &repo,
492                namespace_id,
493                cognition,
494                llm.clone(),
495                lease_owner,
496            )
497            .await?;
498        }
499        if cognition.derive_enabled {
500            progressed += process_derive_jobs(
501                &repo,
502                namespace_id,
503                cognition,
504                agent,
505                llm.clone(),
506                embeddings.clone(),
507                lease_owner,
508            )
509            .await?;
510        }
511        if cognition.reflect_enabled {
512            progressed += process_reflect_jobs(
513                &repo,
514                namespace_id,
515                cognition,
516                agent,
517                embeddings.clone(),
518                lease_owner,
519            )
520            .await?;
521            progressed += process_reflect_namespace_jobs(
522                &repo,
523                namespace_id,
524                cognition,
525                agent,
526                embeddings.clone(),
527                lease_owner,
528            )
529            .await?;
530        }
531        if cognition.digest_enabled {
532            progressed += process_digest_jobs(
533                &repo,
534                namespace_id,
535                cognition,
536                agent,
537                llm.clone(),
538                embeddings.clone(),
539                lease_owner,
540            )
541            .await?;
542        }
543
544        total_processed += progressed;
545        if progressed == 0 {
546            break;
547        }
548    }
549
550    Ok(total_processed)
551}
552
553pub async fn enqueue_dream_jobs(
554    repo: &MemoryRepository,
555    namespace_id: i64,
556    perspective: Option<&PerspectiveKey>,
557    session_key: Option<&str>,
558    reflect_reason: &str,
559    digest_reason: &str,
560) -> Result<usize, AgentError> {
561    let mut queued = 0usize;
562
563    if let Some(perspective) = perspective {
564        let perspective_json = serde_json::to_value(perspective)
565            .map_err(|error| AgentError::Reflection(error.to_string()))?;
566        let payload = json!({
567            "reason": reflect_reason,
568            "session_key": perspective.session_key,
569        });
570        if enqueue_job_if_absent(
571            repo,
572            EnqueueJobParams {
573                namespace_id,
574                job_type: REFLECT_PERSPECTIVE_JOB,
575                priority: 100,
576                perspective: Some(&perspective_json),
577                payload: &payload,
578            },
579        )
580        .await?
581        {
582            queued += 1;
583        }
584    } else {
585        let payload = json!({
586            "reason": reflect_reason,
587            "session_key": session_key,
588        });
589        if enqueue_job_if_absent(
590            repo,
591            EnqueueJobParams {
592                namespace_id,
593                job_type: REFLECT_NAMESPACE_JOB,
594                priority: 100,
595                perspective: None,
596                payload: &payload,
597            },
598        )
599        .await?
600        {
601            queued += 1;
602        }
603    }
604
605    if let Some(session_key) = session_key {
606        let payload = json!({
607            "session_key": session_key,
608            "reason": digest_reason,
609        });
610        if enqueue_job_if_absent(
611            repo,
612            EnqueueJobParams {
613                namespace_id,
614                job_type: DIGEST_SESSION_JOB,
615                priority: 110,
616                perspective: None,
617                payload: &payload,
618            },
619        )
620        .await?
621        {
622            queued += 1;
623        }
624    }
625
626    Ok(queued)
627}
628
629async fn enqueue_digest_job_if_absent(
630    repo: &MemoryRepository,
631    namespace_id: i64,
632    session_key: &str,
633    digest_reason: &str,
634) -> Result<bool, AgentError> {
635    let payload = json!({
636        "session_key": session_key,
637        "reason": digest_reason,
638    });
639    enqueue_job_if_absent(
640        repo,
641        EnqueueJobParams {
642            namespace_id,
643            job_type: DIGEST_SESSION_JOB,
644            priority: 110,
645            perspective: None,
646            payload: &payload,
647        },
648    )
649    .await
650}
651
652async fn enqueue_job_if_absent(
653    repo: &MemoryRepository,
654    params: EnqueueJobParams<'_>,
655) -> Result<bool, AgentError> {
656    for status in [memory_job_status::PENDING, memory_job_status::RUNNING] {
657        let jobs = repo
658            .list_jobs(
659                params.namespace_id,
660                Some(params.job_type),
661                Some(status),
662                64,
663                0,
664            )
665            .await
666            .map_err(|error| AgentError::Storage(error.to_string()))?;
667        if jobs
668            .iter()
669            .any(|row| queued_job_matches(row, params.perspective, params.payload))
670        {
671            return Ok(false);
672        }
673    }
674
675    repo.enqueue_job(params)
676        .await
677        .map_err(|error| AgentError::Storage(error.to_string()))?;
678    Ok(true)
679}
680
681fn queued_job_matches(
682    row: &MemoryJobRow,
683    perspective: Option<&serde_json::Value>,
684    payload: &serde_json::Value,
685) -> bool {
686    let row_payload: serde_json::Value = match serde_json::from_str(&row.payload_json) {
687        Ok(value) => value,
688        Err(_) => return false,
689    };
690    if &row_payload != payload {
691        return false;
692    }
693
694    match (&row.perspective_json, perspective) {
695        (None, None) => true,
696        (Some(existing), Some(expected)) => serde_json::from_str::<serde_json::Value>(existing)
697            .map(|value| value == *expected)
698            .unwrap_or(false),
699        _ => false,
700    }
701}
702
703pub async fn run_dream_cycle(
704    pool: sqlx::SqlitePool,
705    cognition: &CognitionConfig,
706    agent: &AgentConfig,
707    llm: Arc<dyn LlmClient>,
708    embeddings: Option<Arc<dyn EmbeddingService>>,
709    request: DreamCycleRequest<'_>,
710) -> Result<usize, AgentError> {
711    let repo = MemoryRepository::new(pool.clone());
712    let total_started = Instant::now();
713    let mut metrics = Vec::new();
714    let enqueue_started = Instant::now();
715    enqueue_dream_jobs(
716        &repo,
717        request.namespace_id,
718        request.perspective,
719        request.session_key,
720        request.reflect_reason,
721        request.digest_reason,
722    )
723    .await?;
724    metrics.push(stage_metric_sample(
725        request.namespace_id,
726        "cognition.dream.enqueue_ms",
727        enqueue_started.elapsed().as_secs_f64() * 1000.0,
728        "enqueue",
729    ));
730    let drain_started = Instant::now();
731    let processed = drain_cognition_jobs(
732        pool,
733        request.namespace_id,
734        cognition,
735        agent,
736        llm,
737        embeddings,
738        request.lease_owner,
739    )
740    .await?;
741    metrics.push(stage_metric_sample(
742        request.namespace_id,
743        "cognition.dream.drain_ms",
744        drain_started.elapsed().as_secs_f64() * 1000.0,
745        "drain",
746    ));
747    metrics.push(stage_metric_sample(
748        request.namespace_id,
749        "cognition.dream.total_ms",
750        total_started.elapsed().as_secs_f64() * 1000.0,
751        "total",
752    ));
753    flush_metric_samples(&repo, &metrics).await;
754    Ok(processed)
755}
756
757async fn collect_dream_signals(
758    repo: &MemoryRepository,
759    namespace_id: i64,
760    session_key: &str,
761) -> Result<DreamSignals, AgentError> {
762    let memories = repo
763        .list_filtered(
764            namespace_id,
765            nexus_storage::repository::ListMemoryFilters {
766                category: None,
767                since: None,
768                until: None,
769                content_like: None,
770                include_raw: true,
771                limit: 256,
772                offset: 0,
773            },
774        )
775        .await
776        .map_err(|error| AgentError::Storage(error.to_string()))?;
777    let has_digest_gap = repo
778        .count_digests(namespace_id, Some(session_key))
779        .await
780        .map_err(|error| AgentError::Storage(error.to_string()))?
781        == 0;
782
783    let mut signals = DreamSignals {
784        has_digest_gap,
785        ..DreamSignals::default()
786    };
787    for memory in memories
788        .iter()
789        .filter(|memory| memory_matches_session_key(memory, session_key))
790    {
791        if memory.labels.iter().any(|label| label == "raw-activity")
792            || memory.metadata.get("raw_activity").is_some()
793        {
794            signals.raw_event_count += 1;
795        }
796        if let Some(level) =
797            CognitiveMetadata::from_metadata(&memory.metadata).map(|meta| meta.level)
798        {
799            match level {
800                CognitiveLevel::Explicit => {
801                    signals.explicit_count += 1;
802                    signals.total_non_raw_count += 1;
803                }
804                CognitiveLevel::Derived => {
805                    signals.derived_count += 1;
806                    signals.total_non_raw_count += 1;
807                }
808                CognitiveLevel::Contradiction => {
809                    signals.contradiction_count += 1;
810                    signals.total_non_raw_count += 1;
811                }
812                _ => {}
813            }
814        }
815    }
816    signals.contradiction_density = if signals.total_non_raw_count > 0 {
817        signals.contradiction_count as f32 / signals.total_non_raw_count as f32
818    } else {
819        0.0
820    };
821    Ok(signals)
822}
823
824fn memory_matches_session_key(memory: &Memory, session_key: &str) -> bool {
825    let metadata = &memory.metadata;
826    let matches_value = |value: Option<&serde_json::Value>| {
827        value.and_then(serde_json::Value::as_str) == Some(session_key)
828    };
829
830    if matches_value(metadata.pointer("/cognitive/session_key"))
831        || matches_value(metadata.pointer("/raw_activity/derived_session_key"))
832        || matches_value(metadata.pointer("/runtime/derived_session_key"))
833        || matches_value(metadata.pointer("/runtime/session_key"))
834    {
835        return true;
836    }
837
838    for pointer in [
839        "/cognitive/session_keys",
840        "/source/derived_session_keys",
841        "/raw_activity/derived_session_keys",
842    ] {
843        if metadata
844            .pointer(pointer)
845            .and_then(serde_json::Value::as_array)
846            .is_some_and(|values| {
847                values
848                    .iter()
849                    .any(|value| value.as_str() == Some(session_key))
850            })
851        {
852            return true;
853        }
854    }
855
856    false
857}
858
859fn choose_dream_schedule(
860    cognition: &CognitionConfig,
861    signals: &DreamSignals,
862    reason: RuntimeShutdownReason,
863) -> DreamSchedulePlan {
864    if signals.raw_event_count == 0
865        && signals.explicit_count == 0
866        && signals.derived_count == 0
867        && signals.contradiction_count == 0
868    {
869        return DreamSchedulePlan {
870            action: DreamScheduleAction::Skip,
871            reason: "no session cognition signal",
872        };
873    }
874
875    // High contradiction density (>20% of non-raw memories) always triggers immediate.
876    if signals.contradiction_count > 0
877        && signals.total_non_raw_count > 0
878        && signals.contradiction_density > 0.20
879    {
880        return DreamSchedulePlan {
881            action: DreamScheduleAction::ImmediateBounded,
882            reason: "high contradiction density requires immediate reconciliation",
883        };
884    }
885
886    // Moderate contradiction density (5-20%) on session end triggers immediate.
887    if matches!(reason, RuntimeShutdownReason::SessionEnded)
888        && signals.contradiction_density > 0.05
889        && signals.contradiction_density <= 0.20
890    {
891        return DreamSchedulePlan {
892            action: DreamScheduleAction::ImmediateBounded,
893            reason: "moderate contradiction density at session end warrants reconciliation",
894        };
895    }
896
897    if signals.contradiction_count > 0 && signals.contradiction_density <= 0.05 {
898        return DreamSchedulePlan {
899            action: DreamScheduleAction::ImmediateBounded,
900            reason: "contradictions require immediate reconciliation",
901        };
902    }
903
904    if signals.raw_event_count >= cognition.activity_distill_min_events {
905        return DreamSchedulePlan {
906            action: DreamScheduleAction::ImmediateBounded,
907            reason: "high session activity warrants immediate dream pass",
908        };
909    }
910
911    if matches!(reason, RuntimeShutdownReason::SessionEnded)
912        && signals.explicit_count >= 2
913        && signals.derived_count == 0
914    {
915        return DreamSchedulePlan {
916            action: DreamScheduleAction::ImmediateBounded,
917            reason: "session end flushes unreconciled explicit observations immediately",
918        };
919    }
920
921    if signals.has_digest_gap
922        && signals.explicit_count == 0
923        && signals.derived_count == 0
924        && signals.contradiction_count == 0
925        && signals.raw_event_count <= (cognition.activity_distill_min_events / 2).max(1)
926    {
927        return DreamSchedulePlan {
928            action: DreamScheduleAction::DigestOnly,
929            reason: "light session activity with uncovered digest window",
930        };
931    }
932
933    if matches!(reason, RuntimeShutdownReason::IdleTimeout)
934        && (signals.raw_event_count > 0
935            || signals.explicit_count >= 2
936            || (signals.explicit_count > 0 && signals.derived_count == 0))
937    {
938        return DreamSchedulePlan {
939            action: DreamScheduleAction::DelayedEnqueue,
940            reason: "idle timeout defers medium-signal dreaming to background jobs",
941        };
942    }
943
944    if signals.explicit_count >= 2 && signals.derived_count == 0 {
945        return DreamSchedulePlan {
946            action: DreamScheduleAction::DelayedEnqueue,
947            reason: "explicit observations suggest deferred reflection opportunity",
948        };
949    }
950
951    DreamSchedulePlan {
952        action: DreamScheduleAction::Skip,
953        reason: "insufficient signal for additional dream work",
954    }
955}
956
957async fn store_runtime_marker(
958    memory_repo: &MemoryRepository,
959    namespace_id: i64,
960    marker: RuntimeMarker<'_>,
961) -> Result<(), AgentError> {
962    let session_tag = derive_session_key(marker.agent_type, marker.session_key, marker.cwd);
963    let content = format!(
964        "Runtime {} for {} [session:{}] ({})",
965        marker.event.replace('_', " "),
966        marker.agent_type,
967        session_tag,
968        marker.detail
969    );
970    let metadata = json!({
971        "runtime": {
972            "event": marker.event,
973            "detail": marker.detail,
974            "session_key": marker.session_key,
975            "derived_session_key": session_tag,
976            "cwd": marker.cwd,
977            "agent_type": marker.agent_type,
978            "agent_namespace": marker.agent_namespace,
979            "captured_at": Utc::now(),
980        }
981    });
982    let mut cognitive = CognitiveMetadata::new(
983        CognitiveLevel::Explicit,
984        marker.agent_type,
985        marker.agent_type,
986        Some(session_tag.clone()),
987        "runtime_controller",
988    );
989    cognitive.confidence = Some(1.0);
990    let metadata = cognitive.merge_into(&metadata);
991
992    memory_repo
993        .store(StoreMemoryParams {
994            namespace_id,
995            content: &content,
996            category: &MemoryCategory::Session,
997            memory_lane_type: None,
998            labels: &[
999                "runtime".to_string(),
1000                "session".to_string(),
1001                marker.event.to_string(),
1002            ],
1003            metadata: &metadata,
1004            embedding: None,
1005            embedding_model: None,
1006        })
1007        .await
1008        .map_err(|e| AgentError::Storage(e.to_string()))?;
1009
1010    Ok(())
1011}
1012
1013fn read_runtime_state(path: &Path) -> Result<Option<RuntimeState>, AgentError> {
1014    if !path.exists() {
1015        return Ok(None);
1016    }
1017
1018    let contents = std::fs::read_to_string(path)?;
1019    let state =
1020        serde_json::from_str(&contents).map_err(|e| AgentError::Supervisor(e.to_string()))?;
1021    Ok(Some(state))
1022}
1023
1024fn write_runtime_state(path: &Path, state: &RuntimeState) -> Result<(), AgentError> {
1025    let contents =
1026        serde_json::to_string_pretty(state).map_err(|e| AgentError::Supervisor(e.to_string()))?;
1027    std::fs::write(path, contents)?;
1028    Ok(())
1029}
1030
1031pub fn derive_session_key(
1032    agent_type: &str,
1033    session_key: Option<&str>,
1034    cwd: Option<&str>,
1035) -> String {
1036    if let Some(value) = session_key.filter(|value| !value.trim().is_empty()) {
1037        return value.to_string();
1038    }
1039
1040    let canonical_agent = nexus_core::canonicalize_agent_type(agent_type);
1041    let fallback_scope = cwd
1042        .filter(|value| !value.trim().is_empty())
1043        .map(nexus_core::normalize_project_path)
1044        .unwrap_or_else(|| "unknown-cwd".to_string());
1045    let mut hasher = std::collections::hash_map::DefaultHasher::new();
1046    canonical_agent.hash(&mut hasher);
1047    fallback_scope.hash(&mut hasher);
1048    format!("derived-{:016x}", hasher.finish())
1049}
1050
/// Reduce `value` to a conservative character set, for use as a single path
/// component.
///
/// Every character other than ASCII alphanumerics, `-`, `_` and `.` becomes
/// `_`, so path separators (`/`, `\`) cannot survive. A result consisting only
/// of dots (`.`, `..`, …) is also turned into underscores, because `.` and
/// `..` refer to the current and parent directory when joined into a path.
/// An empty input yields an empty string.
fn sanitize_component(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
                ch
            } else {
                '_'
            }
        })
        .collect();

    if !sanitized.is_empty() && sanitized.bytes().all(|byte| byte == b'.') {
        return "_".repeat(sanitized.len());
    }
    sanitized
}
1063
1064fn runtime_reason_label(reason: RuntimeShutdownReason) -> &'static str {
1065    match reason {
1066        RuntimeShutdownReason::SessionEnded => "session-ended",
1067        RuntimeShutdownReason::IdleTimeout => "idle-timeout",
1068        RuntimeShutdownReason::Manual => "manual",
1069    }
1070}
1071
1072async fn namespace_id_for(agent_type: &str, storage: &StorageManager) -> Result<i64, AgentError> {
1073    let canonical = nexus_core::canonicalize_agent_type(agent_type);
1074    let namespace_repo = NamespaceRepository::new(storage.pool().clone());
1075    namespace_repo
1076        .get_or_create(&canonical, &canonical)
1077        .await
1078        .map(|namespace| namespace.id)
1079        .map_err(|error| AgentError::Storage(error.to_string()))
1080}
1081
1082async fn process_derive_jobs(
1083    repo: &MemoryRepository,
1084    namespace_id: i64,
1085    cognition: &CognitionConfig,
1086    agent: &AgentConfig,
1087    llm: Arc<dyn LlmClient>,
1088    embeddings: Option<Arc<dyn EmbeddingService>>,
1089    lease_owner: &str,
1090) -> Result<usize, AgentError> {
1091    let jobs = repo
1092        .claim_jobs(
1093            namespace_id,
1094            DERIVE_MEMORY_JOB,
1095            lease_owner,
1096            cognition.lease_ttl_secs,
1097            cognition.max_job_batch as i64,
1098        )
1099        .await
1100        .map_err(|error| AgentError::Storage(error.to_string()))?;
1101
1102    let service = DeriveService::new(agent.clone(), llm, embeddings);
1103    let mut processed = 0usize;
1104    for job in jobs {
1105        let memory_id = job
1106            .payload
1107            .get("memory_id")
1108            .and_then(serde_json::Value::as_i64);
1109        let outcome = async {
1110            let memory_id = memory_id.ok_or_else(|| {
1111                AgentError::Derivation("derive job missing memory_id".to_string())
1112            })?;
1113            let memory = repo
1114                .get_by_id(memory_id)
1115                .await
1116                .map_err(|error| AgentError::Storage(error.to_string()))?
1117                .ok_or_else(|| {
1118                    AgentError::Derivation(format!("derive source memory {memory_id} not found"))
1119                })?;
1120            service
1121                .derive_memory_with_perspective(&memory, job.perspective.as_ref(), repo)
1122                .await
1123                .map(|_| ())
1124        }
1125        .await;
1126
1127        match outcome {
1128            Ok(()) => {
1129                repo.complete_job(&job)
1130                    .await
1131                    .map_err(|error| AgentError::Storage(error.to_string()))?;
1132                processed += 1;
1133            }
1134            Err(error) => {
1135                repo.fail_job(&job, &error.to_string())
1136                    .await
1137                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1138            }
1139        }
1140    }
1141
1142    Ok(processed)
1143}
1144
1145async fn process_reflect_jobs(
1146    repo: &MemoryRepository,
1147    namespace_id: i64,
1148    cognition: &CognitionConfig,
1149    agent: &AgentConfig,
1150    embeddings: Option<Arc<dyn EmbeddingService>>,
1151    lease_owner: &str,
1152) -> Result<usize, AgentError> {
1153    let jobs = repo
1154        .claim_jobs(
1155            namespace_id,
1156            REFLECT_PERSPECTIVE_JOB,
1157            lease_owner,
1158            cognition.lease_ttl_secs,
1159            cognition.max_job_batch as i64,
1160        )
1161        .await
1162        .map_err(|error| AgentError::Storage(error.to_string()))?;
1163
1164    let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
1165    let mut processed = 0usize;
1166    for job in jobs {
1167        let outcome = async {
1168            let perspective = job.perspective.as_ref().ok_or_else(|| {
1169                AgentError::Reflection("reflect job missing perspective".to_string())
1170            })?;
1171            service
1172                .reflect_perspective_cycle(namespace_id, perspective, repo)
1173                .await
1174                .map(|_| ())
1175        }
1176        .await;
1177
1178        match outcome {
1179            Ok(()) => {
1180                repo.complete_job(&job)
1181                    .await
1182                    .map_err(|error| AgentError::Storage(error.to_string()))?;
1183                processed += 1;
1184            }
1185            Err(error) => {
1186                repo.fail_job(&job, &error.to_string())
1187                    .await
1188                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1189            }
1190        }
1191    }
1192
1193    Ok(processed)
1194}
1195
1196async fn process_reflect_namespace_jobs(
1197    repo: &MemoryRepository,
1198    namespace_id: i64,
1199    cognition: &CognitionConfig,
1200    agent: &AgentConfig,
1201    embeddings: Option<Arc<dyn EmbeddingService>>,
1202    lease_owner: &str,
1203) -> Result<usize, AgentError> {
1204    let jobs = repo
1205        .claim_jobs(
1206            namespace_id,
1207            REFLECT_NAMESPACE_JOB,
1208            lease_owner,
1209            cognition.lease_ttl_secs,
1210            cognition.max_job_batch as i64,
1211        )
1212        .await
1213        .map_err(|error| AgentError::Storage(error.to_string()))?;
1214
1215    let service = ReflectService::new(agent.clone(), cognition.clone(), embeddings.clone());
1216    let mut processed = 0usize;
1217    for job in jobs {
1218        let outcome = service.reflect_cycle(namespace_id, repo).await.map(|_| ());
1219
1220        match outcome {
1221            Ok(()) => {
1222                repo.complete_job(&job)
1223                    .await
1224                    .map_err(|error| AgentError::Storage(error.to_string()))?;
1225                processed += 1;
1226            }
1227            Err(error) => {
1228                repo.fail_job(&job, &error.to_string())
1229                    .await
1230                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1231            }
1232        }
1233    }
1234
1235    Ok(processed)
1236}
1237
1238async fn process_digest_jobs(
1239    repo: &MemoryRepository,
1240    namespace_id: i64,
1241    cognition: &CognitionConfig,
1242    agent: &AgentConfig,
1243    llm: Arc<dyn LlmClient>,
1244    embeddings: Option<Arc<dyn EmbeddingService>>,
1245    lease_owner: &str,
1246) -> Result<usize, AgentError> {
1247    let jobs = repo
1248        .claim_jobs(
1249            namespace_id,
1250            DIGEST_SESSION_JOB,
1251            lease_owner,
1252            cognition.lease_ttl_secs,
1253            cognition.max_job_batch as i64,
1254        )
1255        .await
1256        .map_err(|error| AgentError::Storage(error.to_string()))?;
1257
1258    let service = DigestService::new(agent.clone(), llm, embeddings);
1259    let mut processed = 0usize;
1260    let mut seen_sessions = HashSet::new();
1261    for job in jobs {
1262        let session_key = job
1263            .payload
1264            .get("session_key")
1265            .and_then(serde_json::Value::as_str)
1266            .map(ToString::to_string)
1267            .or_else(|| {
1268                job.perspective
1269                    .as_ref()
1270                    .and_then(|perspective| perspective.session_key.clone())
1271            })
1272            .ok_or_else(|| AgentError::Digest("digest job missing session_key".to_string()))?;
1273        let force = digest_job_is_forced(
1274            job.payload
1275                .get("reason")
1276                .and_then(serde_json::Value::as_str),
1277        );
1278
1279        if !seen_sessions.insert(session_key.clone()) {
1280            repo.complete_job(&job)
1281                .await
1282                .map_err(|error| AgentError::Storage(error.to_string()))?;
1283            processed += 1;
1284            continue;
1285        }
1286
1287        if !force
1288            && !should_run_incremental_digest(repo, namespace_id, &session_key, cognition).await?
1289        {
1290            debug!(
1291                namespace_id,
1292                session_key, "Skipping digest rollover below threshold"
1293            );
1294            repo.complete_job(&job)
1295                .await
1296                .map_err(|error| AgentError::Storage(error.to_string()))?;
1297            processed += 1;
1298            continue;
1299        }
1300
1301        let outcome = async {
1302            service
1303                .digest_session(namespace_id, &session_key, repo, force)
1304                .await
1305                .map(|_| ())
1306        }
1307        .await;
1308
1309        match outcome {
1310            Ok(()) => {
1311                repo.complete_job(&job)
1312                    .await
1313                    .map_err(|error| AgentError::Storage(error.to_string()))?;
1314                processed += 1;
1315            }
1316            Err(error) => {
1317                repo.fail_job(&job, &error.to_string())
1318                    .await
1319                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1320            }
1321        }
1322    }
1323
1324    Ok(processed)
1325}
1326
/// Whether a digest job's `reason` bypasses the incremental rollover threshold.
///
/// Exactly the reasons `dream_digest`, `session_end`, `manual_digest` and
/// `manual_rebuild` force a digest. Matching is case-sensitive, and a missing
/// reason never forces one.
fn digest_job_is_forced(reason: Option<&str>) -> bool {
    const FORCED_REASONS: [&str; 4] = [
        "dream_digest",
        "session_end",
        "manual_digest",
        "manual_rebuild",
    ];
    reason.is_some_and(|reason| FORCED_REASONS.contains(&reason))
}
1333
1334async fn should_run_incremental_digest(
1335    repo: &MemoryRepository,
1336    namespace_id: i64,
1337    session_key: &str,
1338    cognition: &CognitionConfig,
1339) -> Result<bool, AgentError> {
1340    let rollover = repo
1341        .session_digest_rollover(namespace_id, session_key)
1342        .await
1343        .map_err(|error| AgentError::Storage(error.to_string()))?;
1344
1345    if rollover.last_digest_end_memory_id.is_none() {
1346        return Ok(true);
1347    }
1348
1349    Ok(
1350        rollover.new_memory_count >= cognition.activity_distill_min_events as i64
1351            || rollover.estimated_new_tokens >= cognition.digest_short_target_tokens as i64,
1352    )
1353}
1354
/// Structured summary of a batch of raw hook events for one session.
///
/// Produced either by deserializing the LLM's JSON reply in `distill_with_llm`
/// or by the deterministic `fallback_distilled_session`. Field names therefore
/// double as the JSON keys the LLM is expected to emit.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct DistilledSession {
    /// Human-readable summary; stored as the distilled memory's content.
    summary: String,
    /// Category name, parsed with `MemoryCategory::parse` (falls back to `Session`).
    category: String,
    /// Labels attached to the stored summary memory.
    labels: Vec<String>,
    /// Notable activities; copied into the memory metadata.
    key_activities: Vec<String>,
    /// Files (or, in the fallback, working directories) touched; copied into metadata.
    files_touched: Vec<String>,
    /// Tool names used during the session; copied into metadata.
    tools_used: Vec<String>,
    /// Decisions recorded during the session; copied into metadata.
    decisions_made: Vec<String>,
}
1365
/// One raw hook event pending distillation, extracted from a stored memory by
/// `distill_event_from_memory`.
#[derive(Debug, Clone)]
struct DistillEvent {
    /// Id of the raw-activity memory this event came from.
    memory_id: i64,
    /// Creation time of that memory; used when the payload has no `timestamp`.
    created_at: DateTime<Utc>,
    /// Session the event belongs to (`raw_activity.derived_session_key` or the
    /// cognitive session key).
    session_key: String,
    /// Hook event name (`raw_activity.event_name`, default `"hook_event"`).
    event_name: String,
    /// Working directory recorded in `raw_activity.cwd`, if any.
    cwd: Option<String>,
    /// Original hook payload from the `raw_payload` metadata key (`Null` if absent).
    raw_payload: serde_json::Value,
}
1375
1376async fn process_activity_distill_jobs(
1377    repo: &MemoryRepository,
1378    namespace_id: i64,
1379    cognition: &CognitionConfig,
1380    llm: Arc<dyn LlmClient>,
1381    lease_owner: &str,
1382) -> Result<usize, AgentError> {
1383    let jobs = repo
1384        .claim_jobs(
1385            namespace_id,
1386            ACTIVITY_DISTILL_JOB,
1387            lease_owner,
1388            cognition.lease_ttl_secs,
1389            cognition.max_job_batch as i64,
1390        )
1391        .await
1392        .map_err(|error| AgentError::Storage(error.to_string()))?;
1393
1394    let mut processed = 0usize;
1395    let mut seen_sessions = HashSet::new();
1396    for job in jobs {
1397        let session_key = job
1398            .payload
1399            .get("session_key")
1400            .and_then(serde_json::Value::as_str)
1401            .map(ToString::to_string)
1402            .or_else(|| {
1403                job.perspective
1404                    .as_ref()
1405                    .and_then(|perspective| perspective.session_key.clone())
1406            })
1407            .ok_or_else(|| {
1408                AgentError::Ingest("activity_distill job missing session_key".to_string())
1409            })?;
1410        let agent_name = job
1411            .payload
1412            .get("agent")
1413            .and_then(serde_json::Value::as_str)
1414            .unwrap_or("unknown-agent")
1415            .to_string();
1416
1417        if !seen_sessions.insert(session_key.clone()) {
1418            repo.complete_job(&job)
1419                .await
1420                .map_err(|error| AgentError::Storage(error.to_string()))?;
1421            continue;
1422        }
1423
1424        match distill_raw_activity_session(
1425            repo,
1426            namespace_id,
1427            &agent_name,
1428            &session_key,
1429            cognition,
1430            llm.clone(),
1431        )
1432        .await
1433        {
1434            Ok(_) => {
1435                repo.complete_job(&job)
1436                    .await
1437                    .map_err(|error| AgentError::Storage(error.to_string()))?;
1438                processed += 1;
1439            }
1440            Err(error) => {
1441                repo.fail_job(&job, &error.to_string())
1442                    .await
1443                    .map_err(|storage_error| AgentError::Storage(storage_error.to_string()))?;
1444            }
1445        }
1446    }
1447
1448    Ok(processed)
1449}
1450
1451async fn distill_raw_activity_session(
1452    repo: &MemoryRepository,
1453    namespace_id: i64,
1454    agent: &str,
1455    session_key: &str,
1456    cognition: &CognitionConfig,
1457    llm: Arc<dyn LlmClient>,
1458) -> Result<Option<i64>, AgentError> {
1459    let events: Vec<DistillEvent> = repo
1460        .list_by_session_key(
1461            namespace_id,
1462            session_key,
1463            cognition.activity_distill_max_events as i64,
1464            true,
1465        )
1466        .await
1467        .map_err(|error| AgentError::Storage(error.to_string()))?
1468        .into_iter()
1469        .filter_map(distill_event_from_memory)
1470        .collect();
1471
1472    if events.len() < cognition.activity_distill_min_events {
1473        return Ok(None);
1474    }
1475
1476    let event_summaries: Vec<String> = events.iter().map(summarize_distill_event).collect();
1477    let source_ids: Vec<i64> = events.iter().map(|event| event.memory_id).collect();
1478    let distilled = distill_with_llm(&llm, session_key, event_summaries.as_slice())
1479        .await
1480        .unwrap_or_else(|_| fallback_distilled_session(&events));
1481
1482    let category = MemoryCategory::parse(&distilled.category).unwrap_or(MemoryCategory::Session);
1483    let lane_type = MemoryLaneType::Cognitive(MemoryLaneCognitiveType::Explicit);
1484    let cognitive = build_distill_cognitive_metadata(agent, session_key, &source_ids);
1485    let metadata = cognitive.merge_into(&serde_json::json!({
1486        "distilled_from": events.len(),
1487        "session_id": session_key,
1488        "key_activities": distilled.key_activities,
1489        "files_touched": distilled.files_touched,
1490        "tools_used": distilled.tools_used,
1491        "decisions_made": distilled.decisions_made,
1492        "pipeline": "distill-v1",
1493    }));
1494
1495    let memory = repo
1496        .store_distilled_summary(
1497            StoreMemoryParams {
1498                namespace_id,
1499                content: &distilled.summary,
1500                category: &category,
1501                memory_lane_type: Some(&lane_type),
1502                labels: &distilled.labels,
1503                metadata: &metadata,
1504                embedding: None,
1505                embedding_model: None,
1506            },
1507            &source_ids,
1508        )
1509        .await
1510        .map_err(|error| AgentError::Storage(error.to_string()))?;
1511
1512    Ok(Some(memory.id))
1513}
1514
1515async fn distill_with_llm(
1516    llm: &Arc<dyn LlmClient>,
1517    session_key: &str,
1518    event_summaries: &[String],
1519) -> Result<DistilledSession, AgentError> {
1520    let user_prompt = format!(
1521        "Session ID: {}\nEvent count: {}\n\nEvents:\n{}",
1522        session_key,
1523        event_summaries.len(),
1524        event_summaries.join("\n")
1525    );
1526    llm.generate_json(GenerateParams {
1527        messages: vec![
1528            ChatMessage::system(ACTIVITY_DISTILL_SYSTEM_PROMPT),
1529            ChatMessage::user(user_prompt),
1530        ],
1531        max_tokens: 2048,
1532        temperature: 0.3,
1533        json_mode: true,
1534    })
1535    .await
1536    .map_err(|error| AgentError::Llm(error.to_string()))
1537}
1538
1539fn distill_event_from_memory(memory: Memory) -> Option<DistillEvent> {
1540    let raw_activity = memory.metadata.get("raw_activity")?;
1541    if !raw_activity
1542        .get("distill_pending")
1543        .and_then(serde_json::Value::as_bool)
1544        .unwrap_or(false)
1545    {
1546        return None;
1547    }
1548
1549    let session_key = raw_activity
1550        .get("derived_session_key")
1551        .and_then(serde_json::Value::as_str)
1552        .or_else(|| {
1553            memory
1554                .metadata
1555                .pointer("/cognitive/session_key")
1556                .and_then(serde_json::Value::as_str)
1557        })?
1558        .to_string();
1559    let event_name = raw_activity
1560        .get("event_name")
1561        .and_then(serde_json::Value::as_str)
1562        .unwrap_or("hook_event")
1563        .to_string();
1564    let cwd = raw_activity
1565        .get("cwd")
1566        .and_then(serde_json::Value::as_str)
1567        .map(ToString::to_string);
1568    let raw_payload = memory
1569        .metadata
1570        .get("raw_payload")
1571        .cloned()
1572        .unwrap_or(serde_json::Value::Null);
1573
1574    Some(DistillEvent {
1575        memory_id: memory.id,
1576        created_at: memory.created_at,
1577        session_key,
1578        event_name,
1579        cwd,
1580        raw_payload,
1581    })
1582}
1583
1584fn summarize_distill_event(event: &DistillEvent) -> String {
1585    let ts = event
1586        .raw_payload
1587        .get("timestamp")
1588        .and_then(serde_json::Value::as_str)
1589        .map(ToOwned::to_owned)
1590        .unwrap_or_else(|| event.created_at.to_rfc3339());
1591    let event_type = event
1592        .raw_payload
1593        .get("event")
1594        .or_else(|| event.raw_payload.get("hook_event_name"))
1595        .and_then(serde_json::Value::as_str)
1596        .unwrap_or(&event.event_name);
1597    let tool = event
1598        .raw_payload
1599        .get("tool")
1600        .or_else(|| event.raw_payload.get("tool_name"))
1601        .or_else(|| event.raw_payload.get("toolName"))
1602        .and_then(serde_json::Value::as_str)
1603        .unwrap_or("-");
1604    let cwd = event
1605        .raw_payload
1606        .get("cwd")
1607        .or_else(|| event.raw_payload.get("working_directory"))
1608        .and_then(serde_json::Value::as_str)
1609        .or(event.cwd.as_deref())
1610        .unwrap_or("-");
1611    format!("{ts} | {event_type} | tool={tool} | cwd={cwd}")
1612}
1613
1614fn fallback_distilled_session(events: &[DistillEvent]) -> DistilledSession {
1615    let mut tools = BTreeSet::new();
1616    let mut workspaces = BTreeSet::new();
1617    let mut activities = Vec::new();
1618
1619    for event in events {
1620        if let Some(tool) = event
1621            .raw_payload
1622            .get("tool")
1623            .or_else(|| event.raw_payload.get("tool_name"))
1624            .or_else(|| event.raw_payload.get("toolName"))
1625            .and_then(serde_json::Value::as_str)
1626        {
1627            tools.insert(tool.to_string());
1628        }
1629        if let Some(cwd) = event.cwd.as_deref() {
1630            workspaces.insert(cwd.to_string());
1631        }
1632        activities.push(event.event_name.clone());
1633    }
1634
1635    DistilledSession {
1636        summary: format!(
1637            "Session {} produced {} low-signal hook events across {} tool(s), primarily in {}.",
1638            events
1639                .first()
1640                .map(|event| event.session_key.as_str())
1641                .unwrap_or("unknown-session"),
1642            events.len(),
1643            tools.len(),
1644            workspaces
1645                .iter()
1646                .next()
1647                .cloned()
1648                .unwrap_or_else(|| "an unknown workspace".to_string())
1649        ),
1650        category: "session".to_string(),
1651        labels: vec!["activity-summary".to_string(), "auto-distilled".to_string()],
1652        key_activities: activities.into_iter().take(5).collect(),
1653        files_touched: workspaces.into_iter().collect(),
1654        tools_used: tools.into_iter().collect(),
1655        decisions_made: Vec::new(),
1656    }
1657}
1658
1659/// Create an embedding service when `config.embedding.enabled` is true.
1660///
1661/// Returns `None` when embeddings are disabled or the service fails to
1662/// initialise — this preserves graceful degradation throughout the
1663/// cognition pipeline.
1664pub async fn create_embedding_service(
1665    config: &nexus_core::Config,
1666) -> Option<Arc<dyn EmbeddingService>> {
1667    if !config.embedding.enabled {
1668        return None;
1669    }
1670    match nexus_embeddings::create_service(config).await {
1671        Ok(Some(service)) => Some(service),
1672        Ok(None) => None,
1673        Err(error) => {
1674            warn!(
1675                %error,
1676                "Failed to initialize embedding service; configured LLM features remain available and cognition will run without semantic embeddings. Configure a remote embedding provider, local OpenAI-compatible runtime, or set NEXUS_EMBEDDINGS_ENABLED=false"
1677            );
1678            None
1679        }
1680    }
1681}
1682
1683fn build_distill_cognitive_metadata(
1684    agent: &str,
1685    session_key: &str,
1686    source_memory_ids: &[i64],
1687) -> CognitiveMetadata {
1688    let perspective = infer_perspective(
1689        PerspectiveSource::Digest,
1690        agent,
1691        None::<String>,
1692        Some(session_key.to_string()),
1693    );
1694    let mut cognitive = CognitiveMetadata::new(
1695        CognitiveLevel::SummaryShort,
1696        perspective.observer,
1697        perspective.subject,
1698        perspective.session_key,
1699        "nexus:distill-v1",
1700    );
1701    cognitive.source_memory_ids = source_memory_ids.to_vec();
1702    cognitive.confidence = Some(0.8);
1703    cognitive
1704}
1705
1706fn runtime_llm_client() -> Arc<dyn LlmClient> {
1707    match create_client_auto_with_fallback() {
1708        Ok(client) => client,
1709        Err(error) => {
1710            warn!(%error, "LLM unavailable, using deterministic cognition fallbacks");
1711            Arc::new(UnavailableLlmClient {
1712                message: error.to_string(),
1713            })
1714        }
1715    }
1716}
1717
1718struct UnavailableLlmClient {
1719    message: String,
1720}
1721
1722#[async_trait]
1723impl LlmClient for UnavailableLlmClient {
1724    async fn generate(&self, _params: GenerateParams) -> nexus_llm::Result<GenerateResponse> {
1725        Err(nexus_llm::LlmError::InvalidJsonResponse(
1726            self.message.clone(),
1727        ))
1728    }
1729
1730    fn provider_name(&self) -> String {
1731        "unavailable".to_string()
1732    }
1733
1734    fn model_name(&self) -> String {
1735        "deterministic-fallback".to_string()
1736    }
1737}
1738
1739#[cfg(test)]
1740mod tests {
1741    use super::*;
1742    use sqlx::sqlite::SqlitePoolOptions;
1743
1744    #[tokio::test]
1745    async fn run_dream_cycle_processes_namespace_jobs() {
1746        let pool = SqlitePoolOptions::new()
1747            .max_connections(1)
1748            .connect("sqlite::memory:")
1749            .await
1750            .unwrap();
1751        nexus_storage::migrations::run_migrations(&pool)
1752            .await
1753            .unwrap();
1754
1755        let namespace_repo = NamespaceRepository::new(pool.clone());
1756        let namespace = namespace_repo
1757            .get_or_create("runtime-dream-test", "runtime-dream-test")
1758            .await
1759            .unwrap();
1760        let repo = MemoryRepository::new(pool.clone());
1761
1762        for content in ["feature enabled", "feature not enabled"] {
1763            repo.store(StoreMemoryParams {
1764                namespace_id: namespace.id,
1765                content,
1766                category: &MemoryCategory::Facts,
1767                memory_lane_type: None,
1768                labels: &[],
1769                metadata: &json!({
1770                    "cognitive": {
1771                        "level": "explicit",
1772                        "observer": "claude-code",
1773                        "subject": "claude-code",
1774                        "generated_by": "test"
1775                    }
1776                }),
1777                embedding: None,
1778                embedding_model: None,
1779            })
1780            .await
1781            .unwrap();
1782        }
1783
1784        let processed = run_dream_cycle(
1785            pool.clone(),
1786            &CognitionConfig::default(),
1787            &AgentConfig::default(),
1788            Arc::new(UnavailableLlmClient {
1789                message: "offline".to_string(),
1790            }),
1791            None,
1792            DreamCycleRequest {
1793                namespace_id: namespace.id,
1794                lease_owner: "test-owner",
1795                perspective: None,
1796                session_key: None,
1797                reflect_reason: "namespace_dream",
1798                digest_reason: "dream_digest",
1799            },
1800        )
1801        .await
1802        .unwrap();
1803
1804        assert!(processed >= 1);
1805        assert_eq!(
1806            repo.get_by_cognitive_level(namespace.id, CognitiveLevel::Contradiction, 10)
1807                .await
1808                .unwrap()
1809                .len(),
1810            1
1811        );
1812        assert!(repo
1813            .list_jobs(
1814                namespace.id,
1815                Some(REFLECT_NAMESPACE_JOB),
1816                Some("pending"),
1817                10,
1818                0
1819            )
1820            .await
1821            .unwrap()
1822            .is_empty());
1823    }
1824
1825    #[tokio::test]
1826    async fn process_digest_jobs_skips_below_rollover_threshold() {
1827        let pool = SqlitePoolOptions::new()
1828            .max_connections(1)
1829            .connect("sqlite::memory:")
1830            .await
1831            .unwrap();
1832        nexus_storage::migrations::run_migrations(&pool)
1833            .await
1834            .unwrap();
1835
1836        let namespace_repo = NamespaceRepository::new(pool.clone());
1837        let namespace = namespace_repo
1838            .get_or_create("runtime-digest-skip", "runtime-digest-skip")
1839            .await
1840            .unwrap();
1841        let repo = MemoryRepository::new(pool.clone());
1842
1843        let source = repo
1844            .store(StoreMemoryParams {
1845                namespace_id: namespace.id,
1846                content: "Small explicit update.",
1847                category: &MemoryCategory::Session,
1848                memory_lane_type: None,
1849                labels: &[],
1850                metadata: &serde_json::json!({
1851                    "cognitive": {
1852                        "level": "explicit",
1853                        "observer": "claude-code",
1854                        "subject": "claude-code",
1855                        "session_key": "digest-skip-session"
1856                    }
1857                }),
1858                embedding: None,
1859                embedding_model: None,
1860            })
1861            .await
1862            .unwrap();
1863
1864        let prior_digest = repo
1865            .store(StoreMemoryParams {
1866                namespace_id: namespace.id,
1867                content: "Prior digest",
1868                category: &MemoryCategory::Session,
1869                memory_lane_type: None,
1870                labels: &[],
1871                metadata: &serde_json::json!({
1872                    "cognitive": {
1873                        "level": "summary_short",
1874                        "observer": "claude-code",
1875                        "subject": "claude-code",
1876                        "session_key": "digest-skip-session"
1877                    }
1878                }),
1879                embedding: None,
1880                embedding_model: None,
1881            })
1882            .await
1883            .unwrap();
1884
1885        repo.store_digest(nexus_storage::repository::StoreDigestParams {
1886            namespace_id: namespace.id,
1887            session_key: "digest-skip-session",
1888            digest_kind: "short",
1889            memory_id: prior_digest.id,
1890            start_memory_id: Some(source.id),
1891            end_memory_id: Some(source.id),
1892            token_count: 12,
1893        })
1894        .await
1895        .unwrap();
1896
1897        let follow_up = repo
1898            .store(StoreMemoryParams {
1899                namespace_id: namespace.id,
1900                content: "Tiny follow-up.",
1901                category: &MemoryCategory::Session,
1902                memory_lane_type: None,
1903                labels: &[],
1904                metadata: &serde_json::json!({
1905                    "cognitive": {
1906                        "level": "explicit",
1907                        "observer": "claude-code",
1908                        "subject": "claude-code",
1909                        "session_key": "digest-skip-session"
1910                    }
1911                }),
1912                embedding: None,
1913                embedding_model: None,
1914            })
1915            .await
1916            .unwrap();
1917
1918        repo.enqueue_job(nexus_storage::EnqueueJobParams {
1919            namespace_id: namespace.id,
1920            job_type: DIGEST_SESSION_JOB,
1921            priority: 90,
1922            perspective: None,
1923            payload: &serde_json::json!({
1924                "session_key": "digest-skip-session",
1925                "source_memory_id": follow_up.id
1926            }),
1927        })
1928        .await
1929        .unwrap();
1930
1931        assert_eq!(
1932            repo.list_jobs(
1933                namespace.id,
1934                Some(DIGEST_SESSION_JOB),
1935                Some("pending"),
1936                10,
1937                0
1938            )
1939            .await
1940            .unwrap()
1941            .len(),
1942            1
1943        );
1944
1945        let processed = process_digest_jobs(
1946            &repo,
1947            namespace.id,
1948            &CognitionConfig::default(),
1949            &AgentConfig::default(),
1950            Arc::new(UnavailableLlmClient {
1951                message: "offline".to_string(),
1952            }),
1953            None,
1954            "digest-skip",
1955        )
1956        .await
1957        .unwrap();
1958
1959        assert_eq!(processed, 1);
1960        let latest = repo
1961            .latest_digest_for_session(namespace.id, "digest-skip-session", "short")
1962            .await
1963            .unwrap()
1964            .unwrap();
1965        assert_eq!(latest.id, prior_digest.id);
1966        assert_eq!(
1967            repo.count_digests(namespace.id, Some("digest-skip-session"))
1968                .await
1969                .unwrap(),
1970            1
1971        );
1972    }
1973
1974    #[test]
1975    fn digest_job_force_reason_matches_expected_inputs() {
1976        assert!(digest_job_is_forced(Some("dream_digest")));
1977        assert!(digest_job_is_forced(Some("session_end")));
1978        assert!(digest_job_is_forced(Some("manual_digest")));
1979        assert!(digest_job_is_forced(Some("manual_rebuild")));
1980        assert!(!digest_job_is_forced(Some("derive_follow_up")));
1981        assert!(!digest_job_is_forced(None));
1982    }
1983
1984    #[test]
1985    fn sanitize_component_replaces_unsafe_chars() {
1986        assert_eq!(sanitize_component("claude/code:1"), "claude_code_1");
1987    }
1988
1989    #[test]
1990    fn derive_session_key_prefers_explicit_key() {
1991        assert_eq!(
1992            derive_session_key("claude-code", Some("abc"), Some("/tmp/project")),
1993            "abc"
1994        );
1995    }
1996
1997    #[test]
1998    fn derive_session_key_falls_back_to_stable_hash() {
1999        let first = derive_session_key("claude-code", None, Some("/tmp/project"));
2000        let second = derive_session_key("claude-code", Some(""), Some("/tmp/project"));
2001        assert_eq!(first, second);
2002        assert!(first.starts_with("derived-"));
2003    }
2004
2005    #[test]
2006    fn choose_dream_schedule_immediate_for_contradictions() {
2007        let plan = choose_dream_schedule(
2008            &CognitionConfig::default(),
2009            &DreamSignals {
2010                contradiction_count: 1,
2011                raw_event_count: 1,
2012                ..DreamSignals::default()
2013            },
2014            RuntimeShutdownReason::SessionEnded,
2015        );
2016
2017        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
2018    }
2019
2020    #[test]
2021    fn choose_dream_schedule_digest_only_for_light_digest_gap() {
2022        let config = CognitionConfig::default();
2023        let plan = choose_dream_schedule(
2024            &config,
2025            &DreamSignals {
2026                raw_event_count: 1,
2027                has_digest_gap: true,
2028                ..DreamSignals::default()
2029            },
2030            RuntimeShutdownReason::SessionEnded,
2031        );
2032
2033        assert_eq!(plan.action, DreamScheduleAction::DigestOnly);
2034    }
2035
2036    #[test]
2037    fn choose_dream_schedule_delays_idle_medium_signal_sessions() {
2038        let plan = choose_dream_schedule(
2039            &CognitionConfig::default(),
2040            &DreamSignals {
2041                raw_event_count: 2,
2042                explicit_count: 2,
2043                derived_count: 0,
2044                ..DreamSignals::default()
2045            },
2046            RuntimeShutdownReason::IdleTimeout,
2047        );
2048
2049        assert_eq!(plan.action, DreamScheduleAction::DelayedEnqueue);
2050    }
2051
2052    #[test]
2053    fn choose_dream_schedule_session_end_flushes_explicit_reflection() {
2054        let plan = choose_dream_schedule(
2055            &CognitionConfig::default(),
2056            &DreamSignals {
2057                explicit_count: 2,
2058                has_digest_gap: true,
2059                ..DreamSignals::default()
2060            },
2061            RuntimeShutdownReason::SessionEnded,
2062        );
2063
2064        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
2065    }
2066
2067    #[tokio::test]
2068    async fn collect_dream_signals_counts_session_signal_types() {
2069        let pool = SqlitePoolOptions::new()
2070            .max_connections(1)
2071            .connect("sqlite::memory:")
2072            .await
2073            .unwrap();
2074        nexus_storage::migrations::run_migrations(&pool)
2075            .await
2076            .unwrap();
2077
2078        let namespace_repo = NamespaceRepository::new(pool.clone());
2079        let namespace = namespace_repo
2080            .get_or_create("runtime-dream-signals", "runtime-dream-signals")
2081            .await
2082            .unwrap();
2083        let repo = MemoryRepository::new(pool.clone());
2084
2085        for (content, labels, metadata) in [
2086            (
2087                "tool event",
2088                vec!["raw-activity".to_string()],
2089                json!({
2090                    "raw_activity": true,
2091                    "cognitive": {
2092                        "level": "raw",
2093                        "observer": "claude-code",
2094                        "subject": "claude-code",
2095                        "session_key": "signals-session"
2096                    }
2097                }),
2098            ),
2099            (
2100                "explicit note",
2101                vec![],
2102                json!({
2103                    "cognitive": {
2104                        "level": "explicit",
2105                        "observer": "claude-code",
2106                        "subject": "claude-code",
2107                        "session_key": "signals-session"
2108                    }
2109                }),
2110            ),
2111            (
2112                "derived insight",
2113                vec![],
2114                json!({
2115                    "cognitive": {
2116                        "level": "derived",
2117                        "observer": "claude-code",
2118                        "subject": "claude-code",
2119                        "session_key": "signals-session"
2120                    }
2121                }),
2122            ),
2123            (
2124                "contradiction record",
2125                vec![],
2126                json!({
2127                    "cognitive": {
2128                        "level": "contradiction",
2129                        "observer": "claude-code",
2130                        "subject": "claude-code",
2131                        "session_key": "signals-session"
2132                    }
2133                }),
2134            ),
2135        ] {
2136            repo.store(StoreMemoryParams {
2137                namespace_id: namespace.id,
2138                content,
2139                category: &MemoryCategory::Session,
2140                memory_lane_type: None,
2141                labels: &labels,
2142                metadata: &metadata,
2143                embedding: None,
2144                embedding_model: None,
2145            })
2146            .await
2147            .unwrap();
2148        }
2149
2150        let signals = collect_dream_signals(&repo, namespace.id, "signals-session")
2151            .await
2152            .unwrap();
2153
2154        assert_eq!(signals.raw_event_count, 1);
2155        assert_eq!(signals.explicit_count, 1);
2156        assert_eq!(signals.derived_count, 1);
2157        assert_eq!(signals.contradiction_count, 1);
2158        assert!(signals.has_digest_gap);
2159    }
2160
2161    #[tokio::test]
2162    async fn enqueue_dream_jobs_coalesces_session_scoped_shutdown_work() {
2163        let pool = SqlitePoolOptions::new()
2164            .max_connections(1)
2165            .connect("sqlite::memory:")
2166            .await
2167            .unwrap();
2168        nexus_storage::migrations::run_migrations(&pool)
2169            .await
2170            .unwrap();
2171
2172        let namespace_repo = NamespaceRepository::new(pool.clone());
2173        let namespace = namespace_repo
2174            .get_or_create("runtime-dream-dedupe", "runtime-dream-dedupe")
2175            .await
2176            .unwrap();
2177        let repo = MemoryRepository::new(pool.clone());
2178        let perspective = PerspectiveKey {
2179            observer: "claude-code".to_string(),
2180            subject: "claude-code".to_string(),
2181            session_key: Some("session-123".to_string()),
2182        };
2183
2184        let first = enqueue_dream_jobs(
2185            &repo,
2186            namespace.id,
2187            Some(&perspective),
2188            Some("session-123"),
2189            "session_end_dream",
2190            "session_end",
2191        )
2192        .await
2193        .unwrap();
2194        let second = enqueue_dream_jobs(
2195            &repo,
2196            namespace.id,
2197            Some(&perspective),
2198            Some("session-123"),
2199            "session_end_dream",
2200            "session_end",
2201        )
2202        .await
2203        .unwrap();
2204
2205        assert_eq!(first, 2);
2206        assert_eq!(second, 0);
2207        assert_eq!(
2208            repo.list_jobs(
2209                namespace.id,
2210                Some(REFLECT_PERSPECTIVE_JOB),
2211                Some(memory_job_status::PENDING),
2212                10,
2213                0
2214            )
2215            .await
2216            .unwrap()
2217            .len(),
2218            1
2219        );
2220        assert_eq!(
2221            repo.list_jobs(
2222                namespace.id,
2223                Some(REFLECT_NAMESPACE_JOB),
2224                Some(memory_job_status::PENDING),
2225                10,
2226                0
2227            )
2228            .await
2229            .unwrap()
2230            .len(),
2231            0
2232        );
2233        assert_eq!(
2234            repo.list_jobs(
2235                namespace.id,
2236                Some(DIGEST_SESSION_JOB),
2237                Some(memory_job_status::PENDING),
2238                10,
2239                0
2240            )
2241            .await
2242            .unwrap()
2243            .len(),
2244            1
2245        );
2246    }
2247
2248    // ---- Adaptive dream scheduling tests (Phase 16) ----
2249
2250    #[test]
2251    fn test_choose_dream_schedule_uses_contradiction_density() {
2252        let cognition = CognitionConfig::default();
2253
2254        // No signals → skip.
2255        let plan = choose_dream_schedule(
2256            &cognition,
2257            &DreamSignals::default(),
2258            RuntimeShutdownReason::SessionEnded,
2259        );
2260        assert_eq!(plan.action, DreamScheduleAction::Skip);
2261
2262        // High contradiction density → immediate.
2263        let high_density = DreamSignals {
2264            total_non_raw_count: 10,
2265            contradiction_count: 3,
2266            contradiction_density: 0.30,
2267            ..DreamSignals::default()
2268        };
2269        let plan = choose_dream_schedule(
2270            &cognition,
2271            &high_density,
2272            RuntimeShutdownReason::SessionEnded,
2273        );
2274        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
2275        assert!(plan.reason.contains("high contradiction density"));
2276
2277        // Moderate contradiction density at session end → immediate.
2278        let moderate_density = DreamSignals {
2279            total_non_raw_count: 20,
2280            contradiction_count: 2,
2281            contradiction_density: 0.10,
2282            explicit_count: 5,
2283            ..DreamSignals::default()
2284        };
2285        let plan = choose_dream_schedule(
2286            &cognition,
2287            &moderate_density,
2288            RuntimeShutdownReason::SessionEnded,
2289        );
2290        assert_eq!(plan.action, DreamScheduleAction::ImmediateBounded);
2291        assert!(plan.reason.contains("moderate contradiction density"));
2292
2293        // Moderate contradiction density at idle timeout → not immediate.
2294        let plan = choose_dream_schedule(
2295            &cognition,
2296            &moderate_density,
2297            RuntimeShutdownReason::IdleTimeout,
2298        );
2299        assert_ne!(plan.action, DreamScheduleAction::ImmediateBounded);
2300    }
2301
2302    #[test]
2303    fn test_dream_signals_computes_contradiction_density() {
2304        let signals = DreamSignals {
2305            total_non_raw_count: 20,
2306            contradiction_count: 4,
2307            contradiction_density: 4.0 / 20.0,
2308            ..DreamSignals::default()
2309        };
2310        assert!((signals.contradiction_density - 0.20).abs() < f32::EPSILON);
2311
2312        let empty = DreamSignals::default();
2313        assert!((empty.contradiction_density).abs() < f32::EPSILON);
2314    }
2315}