1use anyhow::Result;
10use async_trait::async_trait;
11use chrono::Utc;
12use oxios_ouroboros::Seed;
13use parking_lot::RwLock;
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use tokio::task::JoinHandle;
18
19use crate::agent_runtime::AgentRuntime;
20use crate::event_bus::EventBus;
21use crate::resource_monitor::ResourceMonitor;
22use crate::types::{AgentId, AgentInfo, AgentStatus};
23use oxios_ouroboros::ExecutionResult;
24
25struct AgentHandle {
27 cancelled: Arc<AtomicBool>,
29 task: JoinHandle<Result<ExecutionResult>>,
31}
32
33#[async_trait]
35pub trait Supervisor: Send + Sync {
36 async fn fork(&self, spec: &Seed) -> Result<AgentId>;
38
39 async fn exec(&self, id: AgentId) -> Result<()>;
41
42 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;
45
46 async fn wait(&self, id: AgentId) -> Result<AgentStatus>;
48
49 async fn kill(&self, id: AgentId) -> Result<()>;
51
52 async fn list(&self) -> Result<Vec<AgentInfo>>;
54}
55
56pub struct BasicSupervisor {
58 agents: RwLock<HashMap<AgentId, AgentInfo>>,
59 handles: RwLock<HashMap<AgentId, AgentHandle>>,
61 event_bus: EventBus,
62 runtime: Arc<AgentRuntime>,
63 resource_monitor: Option<Arc<ResourceMonitor>>,
64}
65
66impl BasicSupervisor {
67 pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
69 Self {
70 agents: RwLock::new(HashMap::new()),
71 handles: RwLock::new(HashMap::new()),
72 event_bus,
73 runtime: Arc::new(runtime),
74 resource_monitor: None,
75 }
76 }
77
78 pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
80 self.resource_monitor = Some(rm);
81 }
82
83 fn update_agent_count(&self) {
85 if let Some(ref rm) = self.resource_monitor {
86 let count = self.agents.read().len();
87 rm.set_active_agents(count);
88 }
89 }
90}
91
92#[async_trait]
93impl Supervisor for BasicSupervisor {
94 async fn fork(&self, spec: &Seed) -> Result<AgentId> {
95 let id = AgentId::new_v4();
96 let info = AgentInfo {
97 id,
98 name: spec.goal.clone(),
99 status: AgentStatus::Starting,
100 created_at: Utc::now(),
101 seed_id: Some(spec.id),
102 };
103
104 {
105 let mut agents = self.agents.write();
106 agents.insert(id, info);
107 }
108
109 self.update_agent_count();
110
111 let _ = self
112 .event_bus
113 .publish(crate::event_bus::KernelEvent::AgentCreated {
114 id,
115 name: spec.goal.clone(),
116 });
117
118 tracing::info!(agent_id = %id, "Forked new agent from seed");
119 Ok(id)
120 }
121
122 async fn exec(&self, id: AgentId) -> Result<()> {
123 {
124 let mut agents = self.agents.write();
125 match agents.get_mut(&id) {
126 Some(agent) => {
127 agent.status = AgentStatus::Running;
128 }
129 None => anyhow::bail!("Agent {id} not found"),
130 }
131 }
132
133 self.update_agent_count();
134
135 let _ = self
136 .event_bus
137 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
138 tracing::info!(agent_id = %id, "Agent execution started");
139
140 Ok(())
141 }
142
143 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
144 {
146 let mut agents = self.agents.write();
147 match agents.get_mut(&id) {
148 Some(agent) => agent.status = AgentStatus::Running,
149 None => anyhow::bail!("Agent {id} not found"),
150 }
151 }
152
153 let _ = self
154 .event_bus
155 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
156
157 tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
158
159 let cancelled = Arc::new(AtomicBool::new(false));
161 let runtime = Arc::clone(&self.runtime);
162 let seed = seed.clone();
163 let cancelled_clone = cancelled.clone();
164
165 let handle: JoinHandle<Result<ExecutionResult>> = tokio::spawn(async move {
166 if cancelled_clone.load(Ordering::Relaxed) {
168 return Ok(ExecutionResult {
169 output: "Agent cancelled before execution".into(),
170 steps_completed: 0,
171 success: false,
172 });
173 }
174 runtime.execute(id, &seed).await
175 });
176
177 {
179 let mut handles = self.handles.write();
180 handles.insert(
181 id,
182 AgentHandle {
183 cancelled,
184 task: handle,
185 },
186 );
187 }
188
189 let result = {
191 let agent_handle = {
192 let mut handles = self.handles.write();
193 handles.remove(&id)
194 };
195 match agent_handle {
198 Some(ah) => match ah.task.await {
199 Ok(res) => res,
200 Err(join_err) => {
201 tracing::warn!(agent_id = %id, error = %join_err, "Agent task join error");
203 Ok(ExecutionResult {
204 output: format!("Agent task aborted: {join_err}"),
205 steps_completed: 0,
206 success: false,
207 })
208 }
209 },
210 None => anyhow::bail!("Agent {id} handle disappeared"),
211 }
212 };
213
214 match result {
215 Ok(result) => {
216 tracing::info!(
217 agent_id = %id,
218 success = result.success,
219 steps = result.steps_completed,
220 "Agent task completed"
221 );
222
223 {
224 let mut agents = self.agents.write();
225 if let Some(agent) = agents.get_mut(&id) {
226 agent.status = if result.success {
227 AgentStatus::Idle
228 } else {
229 AgentStatus::Failed
230 };
231 }
232 }
233
234 let _ = self
235 .event_bus
236 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
237 self.update_agent_count();
238 Ok(result)
239 }
240 Err(e) => {
241 tracing::error!(agent_id = %id, error = %e, "Agent task failed");
242
243 {
244 let mut agents = self.agents.write();
245 if let Some(agent) = agents.get_mut(&id) {
246 agent.status = AgentStatus::Failed;
247 }
248 }
249
250 let _ = self
251 .event_bus
252 .publish(crate::event_bus::KernelEvent::AgentFailed {
253 id,
254 error: e.to_string(),
255 });
256 self.update_agent_count();
257
258 Ok(ExecutionResult {
259 output: format!("Agent failed: {e}"),
260 steps_completed: 0,
261 success: false,
262 })
263 }
264 }
265 }
266
267 async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
268 let agents = self.agents.read();
269 match agents.get(&id) {
270 Some(info) => Ok(info.status),
271 None => anyhow::bail!("Agent {id} not found"),
272 }
273 }
274
275 async fn kill(&self, id: AgentId) -> Result<()> {
276 {
278 let mut handles = self.handles.write();
279 if let Some(agent_handle) = handles.remove(&id) {
280 agent_handle.cancelled.store(true, Ordering::Relaxed);
281 agent_handle.task.abort();
282 tracing::info!(agent_id = %id, "Agent task aborted");
283 }
284 }
285
286 {
287 let mut agents = self.agents.write();
288 if let Some(agent) = agents.get_mut(&id) {
289 agent.status = AgentStatus::Stopped;
290 } else {
291 anyhow::bail!("Agent {id} not found");
292 }
293 }
294
295 let _ = self
296 .event_bus
297 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
298 self.update_agent_count();
299 tracing::info!(agent_id = %id, "Agent killed");
300 Ok(())
301 }
302
303 async fn list(&self) -> Result<Vec<AgentInfo>> {
304 let agents = self.agents.read();
305 Ok(agents.values().cloned().collect())
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_bus::EventBus;
    use crate::types::AgentStatus;
    use async_trait::async_trait;
    use futures::Stream;
    use oxi_sdk::{Context, Model, ProviderError, ProviderEvent, StreamOptions};
    use oxios_ouroboros::Seed;
    use std::pin::Pin;

    /// Provider stub for the runtime: `stream` yields no events and `name` is "mock".
    struct MockProvider;

    #[async_trait]
    impl oxi_sdk::Provider for MockProvider {
        async fn stream(
            &self,
            _model: &Model,
            _context: &Context,
            _options: Option<StreamOptions>,
        ) -> Result<Pin<Box<dyn Stream<Item = ProviderEvent> + Send>>, ProviderError> {
            // An empty stream: no provider events are ever produced.
            let stream = futures::stream::empty();
            Ok(Box::pin(stream))
        }

        fn name(&self) -> &str {
            "mock"
        }
    }

    /// Builds a `BasicSupervisor` whose runtime talks to `MockProvider`.
    ///
    /// Every kernel sub-API is backed by real components rooted in a fresh,
    /// uniquely-named directory under the system temp dir.
    // NOTE(review): the temp directory is never removed after the test.
    async fn make_supervisor() -> BasicSupervisor {
        let event_bus = EventBus::new(64);
        let provider = Arc::new(MockProvider);

        // Unique per call, so concurrently running tests do not share on-disk state.
        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        // Errors are ignored here; the store constructors below `expect` on failure.
        let _ = std::fs::create_dir_all(&tmp);

        let state_store_2 =
            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
        let state_store = state_store_2.clone();
        let state_store_for_space = state_store_2.clone();
        let memory_manager = Arc::new({
            let mut mm = crate::memory::MemoryManager::new(state_store.clone());
            mm.set_git_layer(Arc::new(
                crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
            ));
            mm
        });

        let kernel_handle = Arc::new(crate::KernelHandle::new(
            crate::kernel_handle::StateApi::new(state_store),
            // The kernel handle given to the runtime gets a NoOpSupervisor, not the
            // supervisor under test.
            crate::kernel_handle::AgentApi::new(
                Arc::new(crate::supervisor::NoOpSupervisor),
                Arc::new(crate::budget::BudgetManager::new()),
                memory_manager.clone(),
                Some(event_bus.clone()),
            ),
            crate::kernel_handle::SecurityApi::new(
                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
                Arc::new(crate::audit_trail::AuditTrail::new(100)),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
                Arc::new(
                    crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
                ),
            ),
            crate::kernel_handle::PersonaApi::new(Arc::new(
                crate::persona_manager::PersonaManager::new(),
            )),
            crate::kernel_handle::ExtensionApi::new(
                Arc::new(crate::program::ProgramManager::new(tmp.join("programs"))),
                Arc::new(crate::skill::SkillStore::new(tmp.join("skills")).expect("skill store")),
                Arc::new(crate::host_tools::HostToolValidator::new(vec![], vec![])),
            ),
            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
            crate::kernel_handle::InfraApi::new(
                // NOTE(review): a second GitLayer on the same `tmp/git` path as the one
                // given to the MemoryManager above.
                Arc::new(
                    crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
                ),
                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
                Arc::new(crate::cron::CronScheduler::new(
                    Arc::new(
                        crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
                    ),
                    60,
                )),
                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
                // Separate buses: only `event_bus` is shared with the supervisor.
                EventBus::new(64),
                crate::config::OxiosConfig::default(),
                std::time::Instant::now(),
            ),
            crate::kernel_handle::SpaceApi::new(
                Arc::new(
                    crate::space::SpaceManager::new(state_store_for_space, EventBus::new(64))
                        .await
                        .expect("space mgr"),
                ),
                EventBus::new(64),
            ),
            crate::kernel_handle::ExecApi::new(
                Arc::new(crate::config::ExecConfig::default()),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
            ),
            crate::kernel_handle::BrowserApi::default(),
            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
                EventBus::new(64),
            ))),
            // NOTE(review): two KnowledgeBase instances are opened on `tmp/knowledge`.
            Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
            Arc::new(
                crate::kernel_handle::KnowledgeLens::new(
                    Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
                    memory_manager.clone(),
                )
                .unwrap(),
            ),
        ));

        let runtime = AgentRuntime::new(provider, "mock/model", kernel_handle);
        BasicSupervisor::new(event_bus, runtime)
    }

    /// Builds a generation-0 seed with the given goal and no constraints,
    /// acceptance criteria, ontology, parent, or cspace hint.
    fn make_seed(goal: &str) -> Seed {
        Seed {
            id: uuid::Uuid::new_v4(),
            goal: goal.to_string(),
            constraints: vec![],
            acceptance_criteria: vec![],
            ontology: vec![],
            created_at: chrono::Utc::now(),
            generation: 0,
            parent_seed_id: None,
            cspace_hint: None,
        }
    }

    // `fork` registers a single Starting agent named after the goal and linked to the seed.
    #[tokio::test]
    async fn test_fork_creates_agent() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Test agent");

        let id = supervisor.fork(&seed).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].id, id);
        assert_eq!(agents[0].name, "Test agent");
        assert_eq!(agents[0].status, AgentStatus::Starting);
        assert_eq!(agents[0].seed_id, Some(seed.id));
    }

    // `exec` moves an agent from Starting to Running.
    #[tokio::test]
    async fn test_exec_updates_status_to_running() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Running agent");

        let id = supervisor.fork(&seed).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Starting);

        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
    }

    // `kill` on a Running agent (no spawned task) leaves it Stopped.
    #[tokio::test]
    async fn test_kill_sets_stopped() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Doomed agent");

        let id = supervisor.fork(&seed).await.unwrap();
        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);

        supervisor.kill(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
    }

    // `kill` on an unregistered id fails with a "not found" error.
    #[tokio::test]
    async fn test_kill_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.kill(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    // `list` returns every forked agent; order is not asserted.
    #[tokio::test]
    async fn test_list_returns_all_agents() {
        let supervisor = make_supervisor().await;

        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 3);

        let ids: std::collections::HashSet<AgentId> = agents.iter().map(|a| a.id).collect();
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
        assert!(ids.contains(&id3));
    }

    // `exec` on an unregistered id fails with a "not found" error.
    #[tokio::test]
    async fn test_exec_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.exec(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    // `wait` on an unregistered id fails with a "not found" error.
    #[tokio::test]
    async fn test_wait_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.wait(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }
}
539
540pub struct NoOpSupervisor;
546
547#[async_trait::async_trait]
548impl Supervisor for NoOpSupervisor {
549 async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
550 Err(anyhow::anyhow!(
551 "NoOpSupervisor: fork not available during build"
552 ))
553 }
554 async fn exec(&self, _id: AgentId) -> Result<()> {
555 Err(anyhow::anyhow!(
556 "NoOpSupervisor: exec not available during build"
557 ))
558 }
559 async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
560 Err(anyhow::anyhow!(
561 "NoOpSupervisor: run_with_seed not available during build"
562 ))
563 }
564 async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
565 Err(anyhow::anyhow!(
566 "NoOpSupervisor: wait not available during build"
567 ))
568 }
569 async fn kill(&self, _id: AgentId) -> Result<()> {
570 Err(anyhow::anyhow!(
571 "NoOpSupervisor: kill not available during build"
572 ))
573 }
574 async fn list(&self) -> Result<Vec<AgentInfo>> {
575 Ok(Vec::new())
576 }
577}