// nexus_memory_agent/supervisor.rs

//! Agent supervisor: spawns and manages the background agent loops (inbox scanning, memory consolidation/dreaming, and cognition-job draining).
2
3use std::sync::Arc;
4
5use chrono::Utc;
6use nexus_core::config::AgentConfig;
7use nexus_core::traits::EmbeddingService;
8use nexus_core::Config;
9use nexus_llm::LlmClient;
10use nexus_storage::repository::{MemoryRepository, ProcessedFileRepository};
11use sqlx::SqlitePool;
12use tokio::sync::RwLock;
13use tokio::task::JoinHandle;
14use tokio::time::{interval, Duration};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, error, info};
17
18use crate::activity_monitor::ActivityMonitor;
19use crate::dream_cycle::{drain_cognition_jobs, run_dream_cycle, DreamCycleRequest};
20use crate::error::AgentError;
21use crate::inbox::InboxScanner;
22use crate::ingest::IngestService;
23use crate::pulse;
24use crate::query::QueryService;
25use crate::soul::SoulBuilder;
26use crate::types::{AgentStatus, QueryIntrospection};
27
/// Upper bound (ten seconds) on how long `AgentSupervisor::stop` waits for the
/// background tasks to finish after shutdown has been signalled.
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10_000);
30
31pub struct AgentSupervisor {
32    config: AgentConfig,
33    llm: Arc<dyn LlmClient>,
34    query_embedder: Option<Arc<dyn EmbeddingService>>,
35    pool: SqlitePool,
36    namespace_id: i64,
37    project_root: std::path::PathBuf,
38    status: Arc<RwLock<AgentStatus>>,
39    cancel_token: CancellationToken,
40    tasks: Vec<JoinHandle<()>>,
41}
42
43impl AgentSupervisor {
44    pub fn new(
45        config: AgentConfig,
46        llm: Arc<dyn LlmClient>,
47        pool: SqlitePool,
48        namespace_id: i64,
49        project_root: std::path::PathBuf,
50    ) -> Self {
51        let status = Arc::new(RwLock::new(AgentStatus {
52            enabled: config.enabled,
53            namespace: config.namespace.clone(),
54            inbox_dir: config.inbox_dir.clone(),
55            last_scan: None,
56            last_consolidation: None,
57            files_processed: 0,
58            memories_consolidated: 0,
59            queries_answered: 0,
60            errors: Vec::new(),
61        }));
62
63        Self {
64            config,
65            llm,
66            query_embedder: None,
67            pool,
68            namespace_id,
69            project_root,
70            status,
71            cancel_token: CancellationToken::new(),
72            tasks: Vec::new(),
73        }
74    }
75
76    pub async fn start(&mut self) -> Result<(), AgentError> {
77        if !self.config.enabled {
78            info!("Agent is disabled, not starting supervisor");
79            return Ok(());
80        }
81
82        info!("Starting agent supervisor");
83
84        // Spawn inbox scanner task
85        let inbox_handle = self.spawn_inbox_scanner().await?;
86        self.tasks.push(inbox_handle);
87
88        // Spawn consolidation task
89        let consolidation_handle = self.spawn_consolidation_task().await?;
90        self.tasks.push(consolidation_handle);
91
92        let cognition_handle = self.spawn_cognition_worker_task().await?;
93        self.tasks.push(cognition_handle);
94
95        info!("Agent supervisor started with {} tasks", self.tasks.len());
96        Ok(())
97    }
98
99    pub async fn stop(&mut self) {
100        info!("Stopping agent supervisor (signaling graceful shutdown)");
101
102        self.cancel_token.cancel();
103
104        // Wait for tasks to complete gracefully
105        let mut remaining: Vec<JoinHandle<()>> = Vec::new();
106        for task in self.tasks.drain(..) {
107            if task.is_finished() {
108                let _ = task.await;
109            } else {
110                remaining.push(task);
111            }
112        }
113
114        if !remaining.is_empty() {
115            info!(
116                "Waiting up to {}s for {} task(s) to finish gracefully",
117                GRACEFUL_SHUTDOWN_TIMEOUT.as_secs(),
118                remaining.len()
119            );
120            match tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async {
121                for task in remaining {
122                    let _ = task.await;
123                }
124            })
125            .await
126            {
127                Ok(()) => info!("All tasks shut down gracefully"),
128                Err(_) => {
129                    info!("Graceful shutdown timed out");
130                }
131            }
132        }
133
134        self.tasks.clear();
135        info!("Agent supervisor stopped");
136    }
137
138    pub async fn get_status(&self) -> AgentStatus {
139        self.status.read().await.clone()
140    }
141
142    pub fn with_query_embedder(mut self, embedder: Arc<dyn EmbeddingService>) -> Self {
143        self.query_embedder = Some(embedder);
144        self
145    }
146
147    pub async fn increment_queries_answered(&self) {
148        let mut s = self.status.write().await;
149        s.queries_answered += 1;
150    }
151
152    pub fn query_service(&self) -> QueryService {
153        if let Some(embedder) = &self.query_embedder {
154            QueryService::with_embedder(self.llm.clone(), self.config.clone(), embedder.clone())
155        } else {
156            QueryService::new(self.llm.clone(), self.config.clone())
157        }
158    }
159
160    pub fn ingest_service(&self) -> IngestService {
161        IngestService::new(self.llm.clone(), self.config.clone())
162    }
163
164    pub fn namespace_id(&self) -> i64 {
165        self.namespace_id
166    }
167
168    pub async fn query_introspection(
169        &self,
170        question: &str,
171        namespace_id: i64,
172        memory_repo: &MemoryRepository,
173    ) -> Result<QueryIntrospection, AgentError> {
174        self.query_service()
175            .query_introspection(question, namespace_id, memory_repo)
176            .await
177    }
178
179    async fn spawn_inbox_scanner(&self) -> Result<JoinHandle<()>, AgentError> {
180        let config = self.config.clone();
181        let llm = self.llm.clone();
182        let pool = self.pool.clone();
183        let namespace_id = self.namespace_id;
184        let status = self.status.clone();
185        let interval_secs = config.scan_interval_secs;
186        let cancel = self.cancel_token.clone();
187
188        let handle = tokio::spawn(async move {
189            let ingest_service = IngestService::new(llm.clone(), config.clone());
190            let scanner = InboxScanner::new(config, ingest_service);
191            let mut ticker = interval(Duration::from_secs(interval_secs));
192
193            loop {
194                tokio::select! {
195                    _ = ticker.tick() => {}
196                    _ = cancel.cancelled() => {
197                        info!("Inbox scanner received shutdown signal");
198                        break;
199                    }
200                }
201
202                let processed_repo = ProcessedFileRepository::new(&pool);
203                let memory_repo = MemoryRepository::new(pool.clone());
204
205                match scanner
206                    .run(namespace_id, &processed_repo, &memory_repo)
207                    .await
208                {
209                    Ok(result) => {
210                        let mut s = status.write().await;
211                        s.last_scan = Some(Utc::now());
212                        s.files_processed += result.processed;
213                        pulse::write_pulse(
214                            "inbox_scan",
215                            s.memories_consolidated,
216                            s.files_processed,
217                        );
218                    }
219                    Err(e) => {
220                        error!(error = %e, namespace_id, "Inbox scan failed");
221                        let mut s = status.write().await;
222                        s.errors.push(format!("Scan error: {}", e));
223                        if s.errors.len() > 10 {
224                            s.errors.remove(0);
225                        }
226                    }
227                }
228            }
229        });
230
231        Ok(handle)
232    }
233
234    async fn spawn_consolidation_task(&self) -> Result<JoinHandle<()>, AgentError> {
235        let config = self.config.clone();
236        let llm = self.llm.clone();
237        let pool = self.pool.clone();
238        let namespace_id = self.namespace_id;
239        let project_root = self.project_root.clone();
240        let status = self.status.clone();
241        let embedder = self.query_embedder.clone();
242        let base_interval_secs = config.consolidation_interval_mins * 60;
243        let cancel = self.cancel_token.clone();
244
245        let full_config = Config::from_env().unwrap_or_default();
246        let cognition = full_config.cognition.clone();
247        let cognitive_system = full_config.cognitive_system.clone();
248
249        let handle = tokio::spawn(async move {
250            let soul_builder = SoulBuilder::new(llm.clone());
251            let mut last_dream_count: usize = 0;
252
253            loop {
254                // Reload activity monitor from disk each iteration
255                let mut activity_monitor = ActivityMonitor::load();
256                activity_monitor.deep_dream_cooldown = chrono::Duration::hours(
257                    cognitive_system.dream_triggers.deep_dream_cooldown_hours as i64,
258                );
259                activity_monitor.deep_dream_inactivity_mins =
260                    cognitive_system.dream_triggers.deep_dream_inactivity_mins;
261                if cognitive_system.enabled {
262                    let memory_repo = MemoryRepository::new(pool.clone());
263
264                    // 1. Threshold dream (trigger when enough new memories accumulate
265                    //    since the last dream, not on every single increment)
266                    match memory_repo.count_by_namespace(namespace_id).await {
267                        Ok(count) => {
268                            let count_usize = count as usize;
269                            let threshold = cognitive_system.dream_triggers.dream_memory_threshold;
270                            let new_since_last = count_usize.saturating_sub(last_dream_count);
271                            if count_usize >= threshold && new_since_last >= threshold {
272                                info!(
273                                    "Dream threshold reached ({} memories). Running dream cycle.",
274                                    count
275                                );
276                                let cwd = project_root.clone();
277                                let services = crate::dream_cycle::DreamServices {
278                                    pool: pool.clone(),
279                                    cognition: cognition.clone(),
280                                    agent: config.clone(),
281                                    llm: llm.clone(),
282                                    embeddings: embedder.clone(),
283                                    cognitive_system: cognitive_system.clone(),
284                                };
285                                match crate::dream_cycle::run_dream(&cwd, namespace_id, &services)
286                                    .await
287                                {
288                                    Ok(_) => {
289                                        last_dream_count = count_usize;
290                                    }
291                                    Err(e) => {
292                                        tracing::error!(
293                                            error = %e,
294                                            namespace_id,
295                                            count = count_usize,
296                                            "Threshold dream failed"
297                                        );
298                                    }
299                                }
300                            }
301                        }
302                        Err(e) => {
303                            tracing::warn!(
304                                error = %e,
305                                namespace_id,
306                                "Failed to query memory count for threshold dream"
307                            );
308                        }
309                    }
310
311                    // 2. Deep dream
312                    if activity_monitor.should_deep_dream() {
313                        info!("Deep dream conditions met. Running global synthesis.");
314                        let services = crate::dream_cycle::DreamServices {
315                            pool: pool.clone(),
316                            cognition: cognition.clone(),
317                            agent: config.clone(),
318                            llm: llm.clone(),
319                            embeddings: embedder.clone(),
320                            cognitive_system: cognitive_system.clone(),
321                        };
322                        if let Err(e) = crate::dream_cycle::run_deep_dream(
323                            &services,
324                            &soul_builder,
325                            &mut activity_monitor,
326                        )
327                        .await
328                        {
329                            tracing::warn!(error = %e, "Deep dream cycle failed");
330                        }
331                    }
332                }
333
334                // Persist activity monitor state after potential deep dream.
335                // Reload activity_log from disk first to merge any samples
336                // recorded by hooks during the deep-dream run.
337                let disk_monitor = ActivityMonitor::load();
338                activity_monitor.activity_log = disk_monitor.activity_log;
339                if let Err(e) = activity_monitor.save() {
340                    tracing::warn!(error = %e, "Failed to save activity monitor");
341                }
342
343                let sleep_duration = if cognition.adaptive_dream_enabled {
344                    crate::dream_cycle::compute_adaptive_dream_interval(
345                        pool.clone(),
346                        namespace_id,
347                        base_interval_secs,
348                        &cognition,
349                    )
350                    .await
351                } else {
352                    Duration::from_secs(base_interval_secs)
353                };
354
355                tokio::select! {
356                    _ = tokio::time::sleep(sleep_duration) => {}
357                    _ = cancel.cancelled() => {
358                        info!("Consolidation task received shutdown signal");
359                        break;
360                    }
361                }
362
363                let lease_owner = format!("supervisor-dream-{}", namespace_id);
364                match run_dream_cycle(
365                    pool.clone(),
366                    &cognition,
367                    &config,
368                    llm.clone(),
369                    None,
370                    DreamCycleRequest {
371                        namespace_id,
372                        lease_owner: &lease_owner,
373                        perspective: None,
374                        session_key: None,
375                        reflect_reason: "namespace_dream",
376                        digest_reason: "dream_digest",
377                    },
378                )
379                .await
380                {
381                    Ok(processed) if processed > 0 => {
382                        let mut s = status.write().await;
383                        s.last_consolidation = Some(Utc::now());
384                        s.memories_consolidated += processed as u64;
385                        pulse::write_pulse(
386                            "consolidation",
387                            s.memories_consolidated,
388                            s.files_processed,
389                        );
390                    }
391                    Ok(_) => {
392                        debug!("No memories to consolidate");
393                    }
394                    Err(e) => {
395                        error!(error = %e, namespace_id, "Consolidation failed");
396                    }
397                }
398            }
399        });
400
401        Ok(handle)
402    }
403
404    async fn spawn_cognition_worker_task(&self) -> Result<JoinHandle<()>, AgentError> {
405        let cognition = Config::from_env()
406            .map(|config| config.cognition)
407            .unwrap_or_default();
408        let agent = self.config.clone();
409        let llm = self.llm.clone();
410        let pool = self.pool.clone();
411        let namespace_id = self.namespace_id;
412        let cancel = self.cancel_token.clone();
413
414        let handle = tokio::spawn(async move {
415            let mut ticker = interval(Duration::from_secs(agent.scan_interval_secs.max(1)));
416
417            loop {
418                tokio::select! {
419                    _ = ticker.tick() => {}
420                    _ = cancel.cancelled() => {
421                        info!("Cognition worker received shutdown signal");
422                        break;
423                    }
424                }
425
426                match drain_cognition_jobs(
427                    pool.clone(),
428                    namespace_id,
429                    &cognition,
430                    &agent,
431                    llm.clone(),
432                    None,
433                    &format!("worker-{}", namespace_id),
434                )
435                .await
436                {
437                    Ok(processed) if processed > 0 => {
438                        debug!(processed, "Cognition worker drained jobs");
439                    }
440                    Ok(_) => {}
441                    Err(e) => {
442                        error!(error = %e, namespace_id, "Cognition worker failed");
443                    }
444                }
445            }
446        });
447
448        Ok(handle)
449    }
450}