1use anyhow::Result;
10use async_trait::async_trait;
11use chrono::Utc;
12use oxios_ouroboros::Seed;
13use parking_lot::RwLock;
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use tokio::task::JoinHandle;
18
19use crate::agent_runtime::AgentRuntime;
20use crate::event_bus::EventBus;
21use crate::resource_monitor::ResourceMonitor;
22use crate::types::{AgentId, AgentInfo, AgentStatus};
23use oxios_ouroboros::ExecutionResult;
24
25struct AgentHandle {
27 cancelled: Arc<AtomicBool>,
29 task: JoinHandle<Result<ExecutionResult>>,
31}
32
33#[async_trait]
35pub trait Supervisor: Send + Sync {
36 async fn fork(&self, spec: &Seed) -> Result<AgentId>;
38
39 async fn exec(&self, id: AgentId) -> Result<()>;
41
42 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;
45
46 async fn wait(&self, id: AgentId) -> Result<AgentStatus>;
48
49 async fn kill(&self, id: AgentId) -> Result<()>;
51
52 async fn list(&self) -> Result<Vec<AgentInfo>>;
54}
55
56pub struct BasicSupervisor {
58 agents: RwLock<HashMap<AgentId, AgentInfo>>,
59 handles: RwLock<HashMap<AgentId, AgentHandle>>,
61 event_bus: EventBus,
62 runtime: Arc<AgentRuntime>,
63 resource_monitor: Option<Arc<ResourceMonitor>>,
64}
65
66impl BasicSupervisor {
67 pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
69 Self {
70 agents: RwLock::new(HashMap::new()),
71 handles: RwLock::new(HashMap::new()),
72 event_bus,
73 runtime: Arc::new(runtime),
74 resource_monitor: None,
75 }
76 }
77
78 pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
80 self.resource_monitor = Some(rm);
81 }
82
83 fn update_agent_count(&self) {
85 if let Some(ref rm) = self.resource_monitor {
86 let count = self.agents.read().len();
87 rm.set_active_agents(count);
88 }
89 }
90}
91
92#[async_trait]
93impl Supervisor for BasicSupervisor {
94 async fn fork(&self, spec: &Seed) -> Result<AgentId> {
95 let id = AgentId::new_v4();
96 let info = AgentInfo {
97 id,
98 name: spec.goal.clone(),
99 status: AgentStatus::Starting,
100 created_at: Utc::now(),
101 seed_id: Some(spec.id),
102 };
103
104 {
105 let mut agents = self.agents.write();
106 agents.insert(id, info);
107 }
108
109 self.update_agent_count();
110
111 let _ = self
112 .event_bus
113 .publish(crate::event_bus::KernelEvent::AgentCreated {
114 id,
115 name: spec.goal.clone(),
116 });
117
118 tracing::info!(agent_id = %id, "Forked new agent from seed");
119 Ok(id)
120 }
121
122 async fn exec(&self, id: AgentId) -> Result<()> {
123 {
124 let mut agents = self.agents.write();
125 match agents.get_mut(&id) {
126 Some(agent) => {
127 agent.status = AgentStatus::Running;
128 }
129 None => anyhow::bail!("Agent {id} not found"),
130 }
131 }
132
133 self.update_agent_count();
134
135 let _ = self
136 .event_bus
137 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
138 tracing::info!(agent_id = %id, "Agent execution started");
139
140 Ok(())
141 }
142
143 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
144 {
146 let mut agents = self.agents.write();
147 match agents.get_mut(&id) {
148 Some(agent) => agent.status = AgentStatus::Running,
149 None => anyhow::bail!("Agent {id} not found"),
150 }
151 }
152
153 let _ = self
154 .event_bus
155 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
156
157 tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
158
159 let cancelled = Arc::new(AtomicBool::new(false));
161 let runtime = Arc::clone(&self.runtime);
162 let seed = seed.clone();
163 let cancelled_clone = cancelled.clone();
164
165 let handle: JoinHandle<Result<ExecutionResult>> = tokio::spawn(async move {
166 if cancelled_clone.load(Ordering::Relaxed) {
168 return Ok(ExecutionResult {
169 output: "Agent cancelled before execution".into(),
170 steps_completed: 0,
171 success: false,
172 });
173 }
174 runtime.execute(id, &seed).await
175 });
176
177 {
179 let mut handles = self.handles.write();
180 handles.insert(
181 id,
182 AgentHandle {
183 cancelled,
184 task: handle,
185 },
186 );
187 }
188
189 let result = {
191 let agent_handle = {
192 let mut handles = self.handles.write();
193 handles.remove(&id)
194 };
195 match agent_handle {
198 Some(ah) => match ah.task.await {
199 Ok(res) => res,
200 Err(join_err) => {
201 tracing::warn!(agent_id = %id, error = %join_err, "Agent task join error");
203 Ok(ExecutionResult {
204 output: format!("Agent task aborted: {join_err}"),
205 steps_completed: 0,
206 success: false,
207 })
208 }
209 },
210 None => anyhow::bail!("Agent {id} handle disappeared"),
211 }
212 };
213
214 match result {
215 Ok(result) => {
216 tracing::info!(
217 agent_id = %id,
218 success = result.success,
219 steps = result.steps_completed,
220 "Agent task completed"
221 );
222
223 {
224 let mut agents = self.agents.write();
225 if let Some(agent) = agents.get_mut(&id) {
226 agent.status = if result.success {
227 AgentStatus::Idle
228 } else {
229 AgentStatus::Failed
230 };
231 }
232 }
233
234 let _ = self
235 .event_bus
236 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
237 self.update_agent_count();
238 Ok(result)
239 }
240 Err(e) => {
241 tracing::error!(agent_id = %id, error = %e, "Agent task failed");
242
243 {
244 let mut agents = self.agents.write();
245 if let Some(agent) = agents.get_mut(&id) {
246 agent.status = AgentStatus::Failed;
247 }
248 }
249
250 let _ = self
251 .event_bus
252 .publish(crate::event_bus::KernelEvent::AgentFailed {
253 id,
254 error: e.to_string(),
255 });
256 self.update_agent_count();
257
258 Ok(ExecutionResult {
259 output: format!("Agent failed: {e}"),
260 steps_completed: 0,
261 success: false,
262 })
263 }
264 }
265 }
266
267 async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
268 let agents = self.agents.read();
269 match agents.get(&id) {
270 Some(info) => Ok(info.status),
271 None => anyhow::bail!("Agent {id} not found"),
272 }
273 }
274
275 async fn kill(&self, id: AgentId) -> Result<()> {
276 {
278 let mut handles = self.handles.write();
279 if let Some(agent_handle) = handles.remove(&id) {
280 agent_handle.cancelled.store(true, Ordering::Relaxed);
281 agent_handle.task.abort();
282 tracing::info!(agent_id = %id, "Agent task aborted");
283 }
284 }
285
286 {
287 let mut agents = self.agents.write();
288 if let Some(agent) = agents.get_mut(&id) {
289 agent.status = AgentStatus::Stopped;
290 } else {
291 anyhow::bail!("Agent {id} not found");
292 }
293 }
294
295 let _ = self
296 .event_bus
297 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
298 self.update_agent_count();
299 tracing::info!(agent_id = %id, "Agent killed");
300 Ok(())
301 }
302
303 async fn list(&self) -> Result<Vec<AgentInfo>> {
304 let agents = self.agents.read();
305 Ok(agents.values().cloned().collect())
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_bus::EventBus;
    use crate::types::AgentStatus;
    use async_trait::async_trait;
    use futures::Stream;
    use oxi_sdk::{Context, Model, ProviderError, ProviderEvent, StreamOptions};
    use oxios_ouroboros::Seed;
    use std::pin::Pin;

    /// Provider stub whose stream yields no events.
    struct MockProvider;

    #[async_trait]
    impl oxi_sdk::Provider for MockProvider {
        async fn stream(
            &self,
            _model: &Model,
            _context: &Context,
            _options: Option<StreamOptions>,
        ) -> Result<Pin<Box<dyn Stream<Item = ProviderEvent> + Send>>, ProviderError> {
            let stream = futures::stream::empty();
            Ok(Box::pin(stream))
        }

        fn name(&self) -> &str {
            "mock"
        }
    }

    /// Builds a `BasicSupervisor` backed by a mock provider and a kernel handle whose
    /// on-disk components live under a unique temp directory.
    ///
    /// This is `async` on purpose: `SpaceManager::new` returns a future, and driving
    /// it with `Handle::current().block_on(..)` from inside a `#[tokio::test]` body
    /// panics, because `block_on` must not be called from an async context.
    async fn make_supervisor() -> BasicSupervisor {
        let event_bus = EventBus::new(64);
        let provider = Arc::new(MockProvider);

        // A fresh directory per call keeps tests independent of each other.
        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        let _ = std::fs::create_dir_all(&tmp);

        let state_store_2 =
            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
        let state_store = state_store_2.clone();
        let state_store_for_space = state_store_2.clone();
        let memory_manager = Arc::new({
            let mut mm = crate::memory::MemoryManager::new(state_store.clone());
            mm.set_git_layer(Arc::new(
                crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
            ));
            mm
        });

        let kernel_handle = Arc::new(crate::KernelHandle::new(
            crate::kernel_handle::StateApi::new(state_store),
            crate::kernel_handle::AgentApi::new(
                Arc::new(crate::supervisor::NoOpSupervisor),
                Arc::new(crate::budget::BudgetManager::new()),
                memory_manager,
            ),
            crate::kernel_handle::SecurityApi::new(
                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
                Arc::new(crate::audit_trail::AuditTrail::new(100)),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
                Arc::new(
                    crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
                ),
            ),
            crate::kernel_handle::PersonaApi::new(Arc::new(
                crate::persona_manager::PersonaManager::new(),
            )),
            crate::kernel_handle::ExtensionApi::new(
                Arc::new(crate::program::ProgramManager::new(tmp.join("programs"))),
                Arc::new(crate::skill::SkillStore::new(tmp.join("skills")).expect("skill store")),
                Arc::new(crate::host_tools::HostToolValidator::new(vec![], vec![])),
            ),
            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
            crate::kernel_handle::InfraApi::new(
                Arc::new(
                    crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
                ),
                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
                Arc::new(crate::cron::CronScheduler::new(
                    Arc::new(
                        crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
                    ),
                    60,
                )),
                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
                EventBus::new(64),
                crate::config::OxiosConfig::default(),
                std::time::Instant::now(),
            ),
            crate::kernel_handle::SpaceApi::new(
                Arc::new(
                    // Awaited in place instead of `Handle::block_on`, which panics
                    // when called from within the test's runtime.
                    crate::space::SpaceManager::new(state_store_for_space, EventBus::new(64))
                        .await
                        .expect("space mgr"),
                ),
                EventBus::new(64),
            ),
            crate::kernel_handle::ExecApi::new(
                Arc::new(crate::config::ExecConfig::default()),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
            ),
            crate::kernel_handle::BrowserApi::default(),
            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
                EventBus::new(64),
            ))),
        ));

        let runtime = AgentRuntime::new(provider, "mock/model", kernel_handle);
        BasicSupervisor::new(event_bus, runtime)
    }

    /// Builds a generation-0 seed with the given goal and otherwise empty fields.
    fn make_seed(goal: &str) -> Seed {
        Seed {
            id: uuid::Uuid::new_v4(),
            goal: goal.to_string(),
            constraints: vec![],
            acceptance_criteria: vec![],
            ontology: vec![],
            created_at: chrono::Utc::now(),
            generation: 0,
            parent_seed_id: None,
            cspace_hint: None,
        }
    }

    #[tokio::test]
    async fn test_fork_creates_agent() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Test agent");

        let id = supervisor.fork(&seed).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].id, id);
        assert_eq!(agents[0].name, "Test agent");
        assert_eq!(agents[0].status, AgentStatus::Starting);
        assert_eq!(agents[0].seed_id, Some(seed.id));
    }

    #[tokio::test]
    async fn test_exec_updates_status_to_running() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Running agent");

        let id = supervisor.fork(&seed).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Starting);

        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
    }

    #[tokio::test]
    async fn test_kill_sets_stopped() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Doomed agent");

        let id = supervisor.fork(&seed).await.unwrap();
        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);

        supervisor.kill(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_kill_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.kill(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_list_returns_all_agents() {
        let supervisor = make_supervisor().await;

        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 3);

        // `list` has no defined order, so compare as a set.
        let ids: std::collections::HashSet<AgentId> = agents.iter().map(|a| a.id).collect();
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
        assert!(ids.contains(&id3));
    }

    #[tokio::test]
    async fn test_exec_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.exec(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_wait_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.wait(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }
}
535
/// Stateless placeholder [`Supervisor`].
///
/// Every operation returns a "not available during build" error, except `list`,
/// which reports an empty set of agents.
pub struct NoOpSupervisor;
542
543#[async_trait::async_trait]
544impl Supervisor for NoOpSupervisor {
545 async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
546 Err(anyhow::anyhow!(
547 "NoOpSupervisor: fork not available during build"
548 ))
549 }
550 async fn exec(&self, _id: AgentId) -> Result<()> {
551 Err(anyhow::anyhow!(
552 "NoOpSupervisor: exec not available during build"
553 ))
554 }
555 async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
556 Err(anyhow::anyhow!(
557 "NoOpSupervisor: run_with_seed not available during build"
558 ))
559 }
560 async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
561 Err(anyhow::anyhow!(
562 "NoOpSupervisor: wait not available during build"
563 ))
564 }
565 async fn kill(&self, _id: AgentId) -> Result<()> {
566 Err(anyhow::anyhow!(
567 "NoOpSupervisor: kill not available during build"
568 ))
569 }
570 async fn list(&self) -> Result<Vec<AgentInfo>> {
571 Ok(Vec::new())
572 }
573}