1use std::sync::Arc;
4
5use chrono::Utc;
6use nexus_core::config::AgentConfig;
7use nexus_core::traits::EmbeddingService;
8use nexus_core::Config;
9use nexus_llm::LlmClient;
10use nexus_storage::repository::{MemoryRepository, ProcessedFileRepository};
11use sqlx::SqlitePool;
12use tokio::sync::RwLock;
13use tokio::task::JoinHandle;
14use tokio::time::{interval, Duration};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, error, info};
17
18use crate::activity_monitor::ActivityMonitor;
19use crate::dream_cycle::{drain_cognition_jobs, run_dream_cycle, DreamCycleRequest};
20use crate::error::AgentError;
21use crate::inbox::InboxScanner;
22use crate::ingest::IngestService;
23use crate::pulse;
24use crate::query::QueryService;
25use crate::soul::SoulBuilder;
26use crate::types::{AgentStatus, QueryIntrospection};
27
/// Upper bound on how long `AgentSupervisor::stop` waits, in total, for the still-running
/// background tasks to finish after cancellation is signalled (ten seconds).
const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10_000);
30
/// Supervises the background agent tasks for a single namespace.
///
/// `new` builds the supervisor without spawning anything. `start` spawns the inbox
/// scanner, the consolidation task and the cognition worker. `stop` cancels and joins
/// them.
pub struct AgentSupervisor {
    /// Agent settings. The supervisor reads `enabled`, `namespace`, `inbox_dir`,
    /// `scan_interval_secs` and `consolidation_interval_mins` from it.
    config: AgentConfig,
    /// LLM client shared with the query/ingest services and the background tasks.
    llm: Arc<dyn LlmClient>,
    /// Optional embedder. It is used by `query_service` when set, and is handed to the
    /// dream cycles as their `embeddings` service.
    query_embedder: Option<Arc<dyn EmbeddingService>>,
    /// SQLite pool from which the background tasks build their repositories.
    pool: SqlitePool,
    /// Namespace the background tasks operate on.
    namespace_id: i64,
    /// Project directory; passed as `cwd` to threshold dream runs.
    project_root: std::path::PathBuf,
    /// Status snapshot shared with, and updated by, the background tasks.
    status: Arc<RwLock<AgentStatus>>,
    /// Cancelled by `stop` to tell the background tasks to exit.
    cancel_token: CancellationToken,
    /// Join handles of the tasks spawned by `start`.
    tasks: Vec<JoinHandle<()>>,
}
42
43impl AgentSupervisor {
44 pub fn new(
45 config: AgentConfig,
46 llm: Arc<dyn LlmClient>,
47 pool: SqlitePool,
48 namespace_id: i64,
49 project_root: std::path::PathBuf,
50 ) -> Self {
51 let status = Arc::new(RwLock::new(AgentStatus {
52 enabled: config.enabled,
53 namespace: config.namespace.clone(),
54 inbox_dir: config.inbox_dir.clone(),
55 last_scan: None,
56 last_consolidation: None,
57 files_processed: 0,
58 memories_consolidated: 0,
59 queries_answered: 0,
60 errors: Vec::new(),
61 }));
62
63 Self {
64 config,
65 llm,
66 query_embedder: None,
67 pool,
68 namespace_id,
69 project_root,
70 status,
71 cancel_token: CancellationToken::new(),
72 tasks: Vec::new(),
73 }
74 }
75
76 pub async fn start(&mut self) -> Result<(), AgentError> {
77 if !self.config.enabled {
78 info!("Agent is disabled, not starting supervisor");
79 return Ok(());
80 }
81
82 info!("Starting agent supervisor");
83
84 let inbox_handle = self.spawn_inbox_scanner().await?;
86 self.tasks.push(inbox_handle);
87
88 let consolidation_handle = self.spawn_consolidation_task().await?;
90 self.tasks.push(consolidation_handle);
91
92 let cognition_handle = self.spawn_cognition_worker_task().await?;
93 self.tasks.push(cognition_handle);
94
95 info!("Agent supervisor started with {} tasks", self.tasks.len());
96 Ok(())
97 }
98
99 pub async fn stop(&mut self) {
100 info!("Stopping agent supervisor (signaling graceful shutdown)");
101
102 self.cancel_token.cancel();
103
104 let mut remaining: Vec<JoinHandle<()>> = Vec::new();
106 for task in self.tasks.drain(..) {
107 if task.is_finished() {
108 let _ = task.await;
109 } else {
110 remaining.push(task);
111 }
112 }
113
114 if !remaining.is_empty() {
115 info!(
116 "Waiting up to {}s for {} task(s) to finish gracefully",
117 GRACEFUL_SHUTDOWN_TIMEOUT.as_secs(),
118 remaining.len()
119 );
120 match tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async {
121 for task in remaining {
122 let _ = task.await;
123 }
124 })
125 .await
126 {
127 Ok(()) => info!("All tasks shut down gracefully"),
128 Err(_) => {
129 info!("Graceful shutdown timed out");
130 }
131 }
132 }
133
134 self.tasks.clear();
135 info!("Agent supervisor stopped");
136 }
137
138 pub async fn get_status(&self) -> AgentStatus {
139 self.status.read().await.clone()
140 }
141
142 pub fn with_query_embedder(mut self, embedder: Arc<dyn EmbeddingService>) -> Self {
143 self.query_embedder = Some(embedder);
144 self
145 }
146
147 pub async fn increment_queries_answered(&self) {
148 let mut s = self.status.write().await;
149 s.queries_answered += 1;
150 }
151
152 pub fn query_service(&self) -> QueryService {
153 if let Some(embedder) = &self.query_embedder {
154 QueryService::with_embedder(self.llm.clone(), self.config.clone(), embedder.clone())
155 } else {
156 QueryService::new(self.llm.clone(), self.config.clone())
157 }
158 }
159
160 pub fn ingest_service(&self) -> IngestService {
161 IngestService::new(self.llm.clone(), self.config.clone())
162 }
163
164 pub fn namespace_id(&self) -> i64 {
165 self.namespace_id
166 }
167
168 pub async fn query_introspection(
169 &self,
170 question: &str,
171 namespace_id: i64,
172 memory_repo: &MemoryRepository,
173 ) -> Result<QueryIntrospection, AgentError> {
174 self.query_service()
175 .query_introspection(question, namespace_id, memory_repo)
176 .await
177 }
178
179 async fn spawn_inbox_scanner(&self) -> Result<JoinHandle<()>, AgentError> {
180 let config = self.config.clone();
181 let llm = self.llm.clone();
182 let pool = self.pool.clone();
183 let namespace_id = self.namespace_id;
184 let status = self.status.clone();
185 let interval_secs = config.scan_interval_secs;
186 let cancel = self.cancel_token.clone();
187
188 let handle = tokio::spawn(async move {
189 let ingest_service = IngestService::new(llm.clone(), config.clone());
190 let scanner = InboxScanner::new(config, ingest_service);
191 let mut ticker = interval(Duration::from_secs(interval_secs));
192
193 loop {
194 tokio::select! {
195 _ = ticker.tick() => {}
196 _ = cancel.cancelled() => {
197 info!("Inbox scanner received shutdown signal");
198 break;
199 }
200 }
201
202 let processed_repo = ProcessedFileRepository::new(&pool);
203 let memory_repo = MemoryRepository::new(pool.clone());
204
205 match scanner
206 .run(namespace_id, &processed_repo, &memory_repo)
207 .await
208 {
209 Ok(result) => {
210 let mut s = status.write().await;
211 s.last_scan = Some(Utc::now());
212 s.files_processed += result.processed;
213 pulse::write_pulse(
214 "inbox_scan",
215 s.memories_consolidated,
216 s.files_processed,
217 );
218 }
219 Err(e) => {
220 error!(error = %e, namespace_id, "Inbox scan failed");
221 let mut s = status.write().await;
222 s.errors.push(format!("Scan error: {}", e));
223 if s.errors.len() > 10 {
224 s.errors.remove(0);
225 }
226 }
227 }
228 }
229 });
230
231 Ok(handle)
232 }
233
234 async fn spawn_consolidation_task(&self) -> Result<JoinHandle<()>, AgentError> {
235 let config = self.config.clone();
236 let llm = self.llm.clone();
237 let pool = self.pool.clone();
238 let namespace_id = self.namespace_id;
239 let project_root = self.project_root.clone();
240 let status = self.status.clone();
241 let embedder = self.query_embedder.clone();
242 let base_interval_secs = config.consolidation_interval_mins * 60;
243 let cancel = self.cancel_token.clone();
244
245 let full_config = Config::from_env().unwrap_or_default();
246 let cognition = full_config.cognition.clone();
247 let cognitive_system = full_config.cognitive_system.clone();
248
249 let handle = tokio::spawn(async move {
250 let soul_builder = SoulBuilder::new(llm.clone());
251 let mut last_dream_count: usize = 0;
252
253 loop {
254 let mut activity_monitor = ActivityMonitor::load();
256 activity_monitor.deep_dream_cooldown = chrono::Duration::hours(
257 cognitive_system.dream_triggers.deep_dream_cooldown_hours as i64,
258 );
259 activity_monitor.deep_dream_inactivity_mins =
260 cognitive_system.dream_triggers.deep_dream_inactivity_mins;
261 if cognitive_system.enabled {
262 let memory_repo = MemoryRepository::new(pool.clone());
263
264 match memory_repo.count_by_namespace(namespace_id).await {
267 Ok(count) => {
268 let count_usize = count as usize;
269 let threshold = cognitive_system.dream_triggers.dream_memory_threshold;
270 let new_since_last = count_usize.saturating_sub(last_dream_count);
271 if count_usize >= threshold && new_since_last >= threshold {
272 info!(
273 "Dream threshold reached ({} memories). Running dream cycle.",
274 count
275 );
276 let cwd = project_root.clone();
277 let services = crate::dream_cycle::DreamServices {
278 pool: pool.clone(),
279 cognition: cognition.clone(),
280 agent: config.clone(),
281 llm: llm.clone(),
282 embeddings: embedder.clone(),
283 cognitive_system: cognitive_system.clone(),
284 };
285 match crate::dream_cycle::run_dream(&cwd, namespace_id, &services)
286 .await
287 {
288 Ok(_) => {
289 last_dream_count = count_usize;
290 }
291 Err(e) => {
292 tracing::error!(
293 error = %e,
294 namespace_id,
295 count = count_usize,
296 "Threshold dream failed"
297 );
298 }
299 }
300 }
301 }
302 Err(e) => {
303 tracing::warn!(
304 error = %e,
305 namespace_id,
306 "Failed to query memory count for threshold dream"
307 );
308 }
309 }
310
311 if activity_monitor.should_deep_dream() {
313 info!("Deep dream conditions met. Running global synthesis.");
314 let services = crate::dream_cycle::DreamServices {
315 pool: pool.clone(),
316 cognition: cognition.clone(),
317 agent: config.clone(),
318 llm: llm.clone(),
319 embeddings: embedder.clone(),
320 cognitive_system: cognitive_system.clone(),
321 };
322 if let Err(e) = crate::dream_cycle::run_deep_dream(
323 &services,
324 &soul_builder,
325 &mut activity_monitor,
326 )
327 .await
328 {
329 tracing::warn!(error = %e, "Deep dream cycle failed");
330 }
331 }
332 }
333
334 let disk_monitor = ActivityMonitor::load();
338 activity_monitor.activity_log = disk_monitor.activity_log;
339 if let Err(e) = activity_monitor.save() {
340 tracing::warn!(error = %e, "Failed to save activity monitor");
341 }
342
343 let sleep_duration = if cognition.adaptive_dream_enabled {
344 crate::dream_cycle::compute_adaptive_dream_interval(
345 pool.clone(),
346 namespace_id,
347 base_interval_secs,
348 &cognition,
349 )
350 .await
351 } else {
352 Duration::from_secs(base_interval_secs)
353 };
354
355 tokio::select! {
356 _ = tokio::time::sleep(sleep_duration) => {}
357 _ = cancel.cancelled() => {
358 info!("Consolidation task received shutdown signal");
359 break;
360 }
361 }
362
363 let lease_owner = format!("supervisor-dream-{}", namespace_id);
364 match run_dream_cycle(
365 pool.clone(),
366 &cognition,
367 &config,
368 llm.clone(),
369 None,
370 DreamCycleRequest {
371 namespace_id,
372 lease_owner: &lease_owner,
373 perspective: None,
374 session_key: None,
375 reflect_reason: "namespace_dream",
376 digest_reason: "dream_digest",
377 },
378 )
379 .await
380 {
381 Ok(processed) if processed > 0 => {
382 let mut s = status.write().await;
383 s.last_consolidation = Some(Utc::now());
384 s.memories_consolidated += processed as u64;
385 pulse::write_pulse(
386 "consolidation",
387 s.memories_consolidated,
388 s.files_processed,
389 );
390 }
391 Ok(_) => {
392 debug!("No memories to consolidate");
393 }
394 Err(e) => {
395 error!(error = %e, namespace_id, "Consolidation failed");
396 }
397 }
398 }
399 });
400
401 Ok(handle)
402 }
403
404 async fn spawn_cognition_worker_task(&self) -> Result<JoinHandle<()>, AgentError> {
405 let cognition = Config::from_env()
406 .map(|config| config.cognition)
407 .unwrap_or_default();
408 let agent = self.config.clone();
409 let llm = self.llm.clone();
410 let pool = self.pool.clone();
411 let namespace_id = self.namespace_id;
412 let cancel = self.cancel_token.clone();
413
414 let handle = tokio::spawn(async move {
415 let mut ticker = interval(Duration::from_secs(agent.scan_interval_secs.max(1)));
416
417 loop {
418 tokio::select! {
419 _ = ticker.tick() => {}
420 _ = cancel.cancelled() => {
421 info!("Cognition worker received shutdown signal");
422 break;
423 }
424 }
425
426 match drain_cognition_jobs(
427 pool.clone(),
428 namespace_id,
429 &cognition,
430 &agent,
431 llm.clone(),
432 None,
433 &format!("worker-{}", namespace_id),
434 )
435 .await
436 {
437 Ok(processed) if processed > 0 => {
438 debug!(processed, "Cognition worker drained jobs");
439 }
440 Ok(_) => {}
441 Err(e) => {
442 error!(error = %e, namespace_id, "Cognition worker failed");
443 }
444 }
445 }
446 });
447
448 Ok(handle)
449 }
450}