1use anyhow::Result;
10use async_trait::async_trait;
11use chrono::Utc;
12use oxios_ouroboros::Seed;
13use parking_lot::RwLock;
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use tokio::task::JoinHandle;
18
19use crate::agent_runtime::AgentRuntime;
20use crate::event_bus::EventBus;
21use crate::resource_monitor::ResourceMonitor;
22use crate::types::{AgentId, AgentInfo, AgentStatus};
23use oxios_ouroboros::ExecutionResult;
24
25struct AgentHandle {
27 cancelled: Arc<AtomicBool>,
29 task: JoinHandle<Result<ExecutionResult>>,
31}
32
33#[async_trait]
35pub trait Supervisor: Send + Sync {
36 async fn fork(&self, spec: &Seed) -> Result<AgentId>;
38
39 async fn exec(&self, id: AgentId) -> Result<()>;
41
42 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;
45
46 async fn wait(&self, id: AgentId) -> Result<AgentStatus>;
48
49 async fn kill(&self, id: AgentId) -> Result<()>;
51
52 async fn list(&self) -> Result<Vec<AgentInfo>>;
54}
55
56pub struct BasicSupervisor {
58 agents: RwLock<HashMap<AgentId, AgentInfo>>,
59 handles: RwLock<HashMap<AgentId, AgentHandle>>,
61 event_bus: EventBus,
62 runtime: Arc<AgentRuntime>,
63 resource_monitor: Option<Arc<ResourceMonitor>>,
64}
65
66impl BasicSupervisor {
67 pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
69 Self {
70 agents: RwLock::new(HashMap::new()),
71 handles: RwLock::new(HashMap::new()),
72 event_bus,
73 runtime: Arc::new(runtime),
74 resource_monitor: None,
75 }
76 }
77
78 pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
80 self.resource_monitor = Some(rm);
81 }
82
83 fn update_agent_count(&self) {
85 if let Some(ref rm) = self.resource_monitor {
86 let count = self.agents.read().len();
87 rm.set_active_agents(count);
88 }
89 }
90}
91
92#[async_trait]
93impl Supervisor for BasicSupervisor {
94 async fn fork(&self, spec: &Seed) -> Result<AgentId> {
95 let id = AgentId::new_v4();
96 let info = AgentInfo {
97 id,
98 name: spec.goal.clone(),
99 status: AgentStatus::Starting,
100 created_at: Utc::now(),
101 seed_id: Some(spec.id),
102 };
103
104 {
105 let mut agents = self.agents.write();
106 agents.insert(id, info);
107 }
108
109 self.update_agent_count();
110
111 let _ = self
112 .event_bus
113 .publish(crate::event_bus::KernelEvent::AgentCreated {
114 id,
115 name: spec.goal.clone(),
116 });
117
118 tracing::info!(agent_id = %id, "Forked new agent from seed");
119 Ok(id)
120 }
121
122 async fn exec(&self, id: AgentId) -> Result<()> {
123 {
124 let mut agents = self.agents.write();
125 match agents.get_mut(&id) {
126 Some(agent) => {
127 agent.status = AgentStatus::Running;
128 }
129 None => anyhow::bail!("Agent {id} not found"),
130 }
131 }
132
133 self.update_agent_count();
134
135 let _ = self
136 .event_bus
137 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
138 tracing::info!(agent_id = %id, "Agent execution started");
139
140 Ok(())
141 }
142
143 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
144 {
146 let mut agents = self.agents.write();
147 match agents.get_mut(&id) {
148 Some(agent) => agent.status = AgentStatus::Running,
149 None => anyhow::bail!("Agent {id} not found"),
150 }
151 }
152
153 let _ = self
154 .event_bus
155 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
156
157 tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
158
159 let cancelled = Arc::new(AtomicBool::new(false));
161 let runtime = Arc::clone(&self.runtime);
162 let seed = seed.clone();
163 let cancelled_clone = cancelled.clone();
164
165 let handle: JoinHandle<Result<ExecutionResult>> = tokio::spawn(async move {
166 if cancelled_clone.load(Ordering::Relaxed) {
168 return Ok(ExecutionResult {
169 output: "Agent cancelled before execution".into(),
170 steps_completed: 0,
171 success: false,
172 });
173 }
174 runtime.execute(id, &seed).await
175 });
176
177 {
179 let mut handles = self.handles.write();
180 handles.insert(
181 id,
182 AgentHandle {
183 cancelled,
184 task: handle,
185 },
186 );
187 }
188
189 let result = {
191 let agent_handle = {
192 let mut handles = self.handles.write();
193 handles.remove(&id)
194 };
195 match agent_handle {
198 Some(ah) => match ah.task.await {
199 Ok(res) => res,
200 Err(join_err) => {
201 tracing::warn!(agent_id = %id, error = %join_err, "Agent task join error");
203 Ok(ExecutionResult {
204 output: format!("Agent task aborted: {join_err}"),
205 steps_completed: 0,
206 success: false,
207 })
208 }
209 },
210 None => anyhow::bail!("Agent {id} handle disappeared"),
211 }
212 };
213
214 match result {
215 Ok(result) => {
216 tracing::info!(
217 agent_id = %id,
218 success = result.success,
219 steps = result.steps_completed,
220 "Agent task completed"
221 );
222
223 {
224 let mut agents = self.agents.write();
225 if let Some(agent) = agents.get_mut(&id) {
226 agent.status = if result.success {
227 AgentStatus::Idle
228 } else {
229 AgentStatus::Failed
230 };
231 }
232 }
233
234 let _ = self
235 .event_bus
236 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
237 self.update_agent_count();
238 Ok(result)
239 }
240 Err(e) => {
241 tracing::error!(agent_id = %id, error = %e, "Agent task failed");
242
243 {
244 let mut agents = self.agents.write();
245 if let Some(agent) = agents.get_mut(&id) {
246 agent.status = AgentStatus::Failed;
247 }
248 }
249
250 let _ = self
251 .event_bus
252 .publish(crate::event_bus::KernelEvent::AgentFailed {
253 id,
254 error: e.to_string(),
255 });
256 self.update_agent_count();
257
258 Ok(ExecutionResult {
259 output: format!("Agent failed: {e}"),
260 steps_completed: 0,
261 success: false,
262 })
263 }
264 }
265 }
266
267 async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
268 let agents = self.agents.read();
269 match agents.get(&id) {
270 Some(info) => Ok(info.status),
271 None => anyhow::bail!("Agent {id} not found"),
272 }
273 }
274
275 async fn kill(&self, id: AgentId) -> Result<()> {
276 {
278 let mut handles = self.handles.write();
279 if let Some(agent_handle) = handles.remove(&id) {
280 agent_handle.cancelled.store(true, Ordering::Relaxed);
281 agent_handle.task.abort();
282 tracing::info!(agent_id = %id, "Agent task aborted");
283 }
284 }
285
286 {
287 let mut agents = self.agents.write();
288 if let Some(agent) = agents.get_mut(&id) {
289 agent.status = AgentStatus::Stopped;
290 } else {
291 anyhow::bail!("Agent {id} not found");
292 }
293 }
294
295 let _ = self
296 .event_bus
297 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
298 self.update_agent_count();
299 tracing::info!(agent_id = %id, "Agent killed");
300 Ok(())
301 }
302
303 async fn list(&self) -> Result<Vec<AgentInfo>> {
304 let agents = self.agents.read();
305 Ok(agents.values().cloned().collect())
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_bus::EventBus;
    use crate::types::AgentStatus;
    use async_trait::async_trait;
    use futures::Stream;
    use oxi_sdk::{Context, Model, ProviderError, ProviderEvent, StreamOptions};
    use oxios_ouroboros::Seed;
    use std::collections::HashSet;
    use std::pin::Pin;

    /// Provider stub whose stream ends immediately without yielding any event.
    struct MockProvider;

    #[async_trait]
    impl oxi_sdk::Provider for MockProvider {
        async fn stream(
            &self,
            _model: &Model,
            _context: &Context,
            _options: Option<StreamOptions>,
        ) -> Result<Pin<Box<dyn Stream<Item = ProviderEvent> + Send>>, ProviderError> {
            Ok(Box::pin(futures::stream::empty::<ProviderEvent>()))
        }

        fn name(&self) -> &str {
            "mock"
        }
    }

    /// Builds a `BasicSupervisor` over a fully wired `KernelHandle` whose on-disk
    /// state lives under a fresh, uniquely named temp directory.
    async fn make_supervisor() -> BasicSupervisor {
        let bus = EventBus::new(64);
        let provider = Arc::new(MockProvider);

        // A UUID in the path gives every call its own scratch directory.
        let scratch = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        let _ = std::fs::create_dir_all(&scratch);

        let shared_store = Arc::new(
            crate::state_store::StateStore::new(scratch.join("state")).expect("state store"),
        );
        let state_api_store = Arc::clone(&shared_store);
        let space_store = Arc::clone(&shared_store);
        let memory_manager = {
            let mut manager = crate::memory::MemoryManager::new(Arc::clone(&state_api_store));
            manager.set_git_layer(Arc::new(
                crate::git_layer::GitLayer::new(scratch.join("git"), false).expect("git layer"),
            ));
            Arc::new(manager)
        };

        let kernel_handle = Arc::new(crate::KernelHandle::new(
            crate::kernel_handle::StateApi::new(state_api_store),
            crate::kernel_handle::AgentApi::new(
                Arc::new(crate::supervisor::NoOpSupervisor),
                Arc::new(crate::budget::BudgetManager::new()),
                Arc::clone(&memory_manager),
                Some(bus.clone()),
            ),
            crate::kernel_handle::SecurityApi::new(
                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
                Arc::new(crate::audit_trail::AuditTrail::new(100)),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
                Arc::new(
                    crate::state_store::StateStore::new(scratch.join("state2"))
                        .expect("state store 2"),
                ),
            ),
            crate::kernel_handle::PersonaApi::new(Arc::new(
                crate::persona_manager::PersonaManager::new(),
            )),
            crate::kernel_handle::ExtensionApi::new(
                Arc::new(crate::program::ProgramManager::new(scratch.join("programs"))),
                Arc::new(
                    crate::skill::SkillStore::new(scratch.join("skills")).expect("skill store"),
                ),
                Arc::new(crate::host_tools::HostToolValidator::new(vec![], vec![])),
            ),
            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
            crate::kernel_handle::InfraApi::new(
                Arc::new(
                    crate::git_layer::GitLayer::new(scratch.join("git"), false)
                        .expect("git layer"),
                ),
                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
                Arc::new(crate::cron::CronScheduler::new(
                    Arc::new(
                        crate::state_store::StateStore::new(scratch.join("cron"))
                            .expect("cron state"),
                    ),
                    60,
                )),
                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
                EventBus::new(64),
                crate::config::OxiosConfig::default(),
                std::time::Instant::now(),
            ),
            crate::kernel_handle::SpaceApi::new(
                Arc::new(
                    crate::space::SpaceManager::new(space_store, EventBus::new(64))
                        .await
                        .expect("space mgr"),
                ),
                EventBus::new(64),
            ),
            crate::kernel_handle::ExecApi::new(
                Arc::new(crate::config::ExecConfig::default()),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
            ),
            crate::kernel_handle::BrowserApi::default(),
            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
                EventBus::new(64),
            ))),
            Arc::new(oxios_markdown::KnowledgeBase::new(scratch.join("knowledge")).unwrap()),
            Arc::new(
                crate::kernel_handle::KnowledgeLens::new(
                    Arc::new(
                        oxios_markdown::KnowledgeBase::new(scratch.join("knowledge")).unwrap(),
                    ),
                    Arc::clone(&memory_manager),
                )
                .unwrap(),
            ),
        ));

        BasicSupervisor::new(bus, AgentRuntime::new(provider, "mock/model", kernel_handle))
    }

    /// Minimal generation-0 seed with the given goal and no constraints.
    fn make_seed(goal: &str) -> Seed {
        Seed {
            id: uuid::Uuid::new_v4(),
            goal: goal.to_owned(),
            constraints: vec![],
            acceptance_criteria: vec![],
            ontology: vec![],
            created_at: chrono::Utc::now(),
            generation: 0,
            parent_seed_id: None,
            cspace_hint: None,
            original_request: String::new(),
        }
    }

    /// Asserts that `result` is an error whose message mentions "not found".
    fn assert_not_found<T: std::fmt::Debug>(result: Result<T>) {
        let err = result.expect_err("expected a not-found error");
        assert!(err.to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_fork_creates_agent() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Test agent");
        let id = supervisor.fork(&seed).await.unwrap();

        let listed = supervisor.list().await.unwrap();
        assert_eq!(listed.len(), 1);

        let agent = &listed[0];
        assert_eq!(agent.id, id);
        assert_eq!(agent.name, "Test agent");
        assert_eq!(agent.status, AgentStatus::Starting);
        assert_eq!(agent.seed_id, Some(seed.id));
    }

    #[tokio::test]
    async fn test_exec_updates_status_to_running() {
        let supervisor = make_supervisor().await;
        let id = supervisor.fork(&make_seed("Running agent")).await.unwrap();

        let before = supervisor.wait(id).await.unwrap();
        assert_eq!(before, AgentStatus::Starting);

        supervisor.exec(id).await.unwrap();
        let after = supervisor.wait(id).await.unwrap();
        assert_eq!(after, AgentStatus::Running);
    }

    #[tokio::test]
    async fn test_kill_sets_stopped() {
        let supervisor = make_supervisor().await;
        let id = supervisor.fork(&make_seed("Doomed agent")).await.unwrap();

        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);

        supervisor.kill(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_kill_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        assert_not_found(supervisor.kill(uuid::Uuid::new_v4()).await);
    }

    #[tokio::test]
    async fn test_list_returns_all_agents() {
        let supervisor = make_supervisor().await;

        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();

        let listed = supervisor.list().await.unwrap();
        assert_eq!(listed.len(), 3);

        let seen: HashSet<AgentId> = listed.iter().map(|agent| agent.id).collect();
        for expected in &[id1, id2, id3] {
            assert!(seen.contains(expected));
        }
    }

    #[tokio::test]
    async fn test_exec_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        assert_not_found(supervisor.exec(uuid::Uuid::new_v4()).await);
    }

    #[tokio::test]
    async fn test_wait_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        assert_not_found(supervisor.wait(uuid::Uuid::new_v4()).await);
    }
}
540
541pub struct NoOpSupervisor;
547
548#[async_trait::async_trait]
549impl Supervisor for NoOpSupervisor {
550 async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
551 Err(anyhow::anyhow!(
552 "NoOpSupervisor: fork not available during build"
553 ))
554 }
555 async fn exec(&self, _id: AgentId) -> Result<()> {
556 Err(anyhow::anyhow!(
557 "NoOpSupervisor: exec not available during build"
558 ))
559 }
560 async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
561 Err(anyhow::anyhow!(
562 "NoOpSupervisor: run_with_seed not available during build"
563 ))
564 }
565 async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
566 Err(anyhow::anyhow!(
567 "NoOpSupervisor: wait not available during build"
568 ))
569 }
570 async fn kill(&self, _id: AgentId) -> Result<()> {
571 Err(anyhow::anyhow!(
572 "NoOpSupervisor: kill not available during build"
573 ))
574 }
575 async fn list(&self) -> Result<Vec<AgentInfo>> {
576 Ok(Vec::new())
577 }
578}