1use anyhow::Result;
20use async_trait::async_trait;
21use chrono::Utc;
22use oxi_sdk::Agent;
23use oxios_ouroboros::Seed;
24use parking_lot::RwLock;
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use tokio::task::JoinHandle;
29
30use crate::agent_runtime::AgentRuntime;
31use crate::event_bus::EventBus;
32use crate::resource_monitor::ResourceMonitor;
33use crate::session_context::SessionContext;
34use crate::types::{AgentId, AgentInfo, AgentStatus};
35use oxios_ouroboros::ExecutionResult;
36
37struct AgentHandle {
39 cancelled: Arc<AtomicBool>,
41 task: JoinHandle<Result<ExecutionResult>>,
43}
44
45#[derive(Default)]
52pub struct AgentPool {
53 agents: RwLock<HashMap<AgentId, Arc<Agent>>>,
54}
55
56impl AgentPool {
57 pub fn new() -> Self {
59 Self {
60 agents: RwLock::new(HashMap::new()),
61 }
62 }
63
64 pub fn insert(&self, id: AgentId, agent: Arc<Agent>) {
66 self.agents.write().insert(id, agent);
67 }
68
69 pub fn get(&self, id: &AgentId) -> Option<Arc<Agent>> {
71 self.agents.read().get(id).cloned()
72 }
73
74 pub fn remove(&self, id: &AgentId) -> Option<Arc<Agent>> {
76 self.agents.write().remove(id)
77 }
78
79 pub fn export_state(&self, id: &AgentId) -> Option<serde_json::Value> {
83 self.agents
84 .read()
85 .get(id)
86 .and_then(|agent| agent.export_state().ok())
87 }
88
89 pub fn import_state(&self, id: &AgentId, state: serde_json::Value) -> bool {
93 if let Some(agent) = self.agents.read().get(id) {
94 agent.import_state(state).is_ok()
95 } else {
96 false
97 }
98 }
99
100 pub fn len(&self) -> usize {
102 self.agents.read().len()
103 }
104
105 pub fn is_empty(&self) -> bool {
107 self.agents.read().is_empty()
108 }
109}
110
111#[async_trait]
113pub trait Supervisor: Send + Sync {
114 async fn fork(&self, spec: &Seed) -> Result<AgentId>;
116
117 async fn exec(&self, id: AgentId) -> Result<()>;
119
120 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;
123
124 async fn wait(&self, id: AgentId) -> Result<AgentStatus>;
126
127 async fn kill(&self, id: AgentId) -> Result<()>;
129
130 async fn list(&self) -> Result<Vec<AgentInfo>>;
132}
133
134pub struct BasicSupervisor {
136 agents: RwLock<HashMap<AgentId, AgentInfo>>,
137 handles: RwLock<HashMap<AgentId, AgentHandle>>,
139 agent_pool: AgentPool,
141 event_bus: EventBus,
142 runtime: Arc<AgentRuntime>,
143 resource_monitor: Option<Arc<ResourceMonitor>>,
144 session_context: Arc<tokio::sync::RwLock<SessionContext>>,
150}
151
152impl BasicSupervisor {
153 pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
155 Self {
156 agents: RwLock::new(HashMap::new()),
157 handles: RwLock::new(HashMap::new()),
158 agent_pool: AgentPool::new(),
159 event_bus,
160 runtime: Arc::new(runtime),
161 resource_monitor: None,
162 session_context: Arc::new(tokio::sync::RwLock::new(SessionContext::new())),
163 }
164 }
165
166 pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
168 self.resource_monitor = Some(rm);
169 }
170
171 fn update_agent_count(&self) {
173 if let Some(ref rm) = self.resource_monitor {
174 let count = self.agents.read().len();
175 rm.set_active_agents(count);
176 }
177 }
178
179 pub fn pool(&self) -> &AgentPool {
181 &self.agent_pool
182 }
183}
184
185#[async_trait]
186impl Supervisor for BasicSupervisor {
187 async fn fork(&self, spec: &Seed) -> Result<AgentId> {
188 let id = AgentId::new_v4();
189 let info = AgentInfo {
190 id,
191 name: spec.goal.clone(),
192 status: AgentStatus::Starting,
193 created_at: Utc::now(),
194 seed_id: Some(spec.id),
195 };
196
197 {
198 let mut agents = self.agents.write();
199 agents.insert(id, info);
200 }
201
202 self.update_agent_count();
203
204 let _ = self
205 .event_bus
206 .publish(crate::event_bus::KernelEvent::AgentCreated {
207 id,
208 name: spec.goal.clone(),
209 });
210
211 tracing::info!(agent_id = %id, "Forked new agent from seed");
212 Ok(id)
213 }
214
215 async fn exec(&self, id: AgentId) -> Result<()> {
216 {
217 let mut agents = self.agents.write();
218 match agents.get_mut(&id) {
219 Some(agent) => {
220 agent.status = AgentStatus::Running;
221 }
222 None => anyhow::bail!("Agent {id} not found"),
223 }
224 }
225
226 self.update_agent_count();
227
228 let _ = self
229 .event_bus
230 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
231 tracing::info!(agent_id = %id, "Agent execution started");
232
233 Ok(())
234 }
235
236 async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
237 {
239 let mut agents = self.agents.write();
240 match agents.get_mut(&id) {
241 Some(agent) => agent.status = AgentStatus::Running,
242 None => anyhow::bail!("Agent {id} not found"),
243 }
244 }
245
246 let _ = self
247 .event_bus
248 .publish(crate::event_bus::KernelEvent::AgentStarted { id });
249
250 tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
251
252 let cancelled = Arc::new(AtomicBool::new(false));
254 let runtime = Arc::clone(&self.runtime);
255 let seed = seed.clone();
256 let cancelled_clone = cancelled.clone();
257
258 let session_ctx = self.session_context.clone();
261
262 let handle: JoinHandle<Result<ExecutionResult>> = tokio::spawn(async move {
263 if cancelled_clone.load(Ordering::Relaxed) {
265 return Ok(ExecutionResult {
266 output: "Agent cancelled before execution".into(),
267 steps_completed: 0,
268 success: false,
269 tool_calls: vec![],
270 });
271 }
272 let mut ctx = session_ctx.write().await;
273 runtime.execute(id, &seed, &mut ctx).await
274 });
275
276 {
278 let mut handles = self.handles.write();
279 handles.insert(
280 id,
281 AgentHandle {
282 cancelled,
283 task: handle,
284 },
285 );
286 }
287
288 let result = {
290 let agent_handle = {
291 let mut handles = self.handles.write();
292 handles.remove(&id)
293 };
294 match agent_handle {
297 Some(ah) => match ah.task.await {
298 Ok(res) => res,
299 Err(join_err) => {
300 tracing::warn!(agent_id = %id, error = %join_err, "Agent task join error");
302 Ok(ExecutionResult {
303 output: format!("Agent task aborted: {join_err}"),
304 steps_completed: 0,
305 success: false,
306 tool_calls: vec![],
307 })
308 }
309 },
310 None => anyhow::bail!("Agent {id} handle disappeared"),
311 }
312 };
313
314 match result {
315 Ok(result) => {
316 tracing::info!(
317 agent_id = %id,
318 success = result.success,
319 steps = result.steps_completed,
320 "Agent task completed"
321 );
322
323 {
324 let mut agents = self.agents.write();
325 if let Some(agent) = agents.get_mut(&id) {
326 agent.status = if result.success {
327 AgentStatus::Idle
328 } else {
329 AgentStatus::Failed
330 };
331 }
332 }
333
334 let _ = self
335 .event_bus
336 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
337 self.update_agent_count();
338 Ok(result)
339 }
340 Err(e) => {
341 tracing::error!(agent_id = %id, error = %e, "Agent task failed");
342
343 {
344 let mut agents = self.agents.write();
345 if let Some(agent) = agents.get_mut(&id) {
346 agent.status = AgentStatus::Failed;
347 }
348 }
349
350 let _ = self
351 .event_bus
352 .publish(crate::event_bus::KernelEvent::AgentFailed {
353 id,
354 error: e.to_string(),
355 });
356 self.update_agent_count();
357
358 Ok(ExecutionResult {
359 output: format!("Agent failed: {e}"),
360 steps_completed: 0,
361 success: false,
362 tool_calls: vec![],
363 })
364 }
365 }
366 }
367
368 async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
369 let agents = self.agents.read();
370 match agents.get(&id) {
371 Some(info) => Ok(info.status),
372 None => anyhow::bail!("Agent {id} not found"),
373 }
374 }
375
376 async fn kill(&self, id: AgentId) -> Result<()> {
377 {
379 let mut handles = self.handles.write();
380 if let Some(agent_handle) = handles.remove(&id) {
381 agent_handle.cancelled.store(true, Ordering::Relaxed);
382 agent_handle.task.abort();
383 tracing::info!(agent_id = %id, "Agent task aborted");
384 }
385 }
386
387 {
388 let mut agents = self.agents.write();
389 if let Some(agent) = agents.get_mut(&id) {
390 agent.status = AgentStatus::Stopped;
391 } else {
392 anyhow::bail!("Agent {id} not found");
393 }
394 }
395
396 let _ = self
397 .event_bus
398 .publish(crate::event_bus::KernelEvent::AgentStopped { id });
399 self.update_agent_count();
400 tracing::info!(agent_id = %id, "Agent killed");
401 Ok(())
402 }
403
404 async fn list(&self) -> Result<Vec<AgentInfo>> {
405 let agents = self.agents.read();
406 Ok(agents.values().cloned().collect())
407 }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_bus::EventBus;
    use crate::types::AgentStatus;
    use oxios_ouroboros::Seed;

    /// Builds a `BasicSupervisor` backed by a kernel handle whose stores all
    /// live under a unique temporary directory.
    ///
    /// Each sub-API is bound to a name first, in the same order in which
    /// `KernelHandle::new` takes them.
    async fn make_supervisor() -> BasicSupervisor {
        let event_bus = EventBus::new(64);

        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        let _ = std::fs::create_dir_all(&tmp);

        let state_store_2 =
            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
        let state_store = Arc::clone(&state_store_2);
        let state_store_for_space = Arc::clone(&state_store_2);

        let mut mm = crate::memory::MemoryManager::new(Arc::clone(&state_store));
        mm.set_git_layer(Arc::new(
            crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
        ));
        let memory_manager = Arc::new(mm);

        let state_api = crate::kernel_handle::StateApi::new(state_store);

        let agent_api = crate::kernel_handle::AgentApi::new(
            Arc::new(crate::supervisor::NoOpSupervisor),
            Arc::new(crate::budget::BudgetManager::new()),
            Arc::clone(&memory_manager),
            Some(event_bus.clone()),
        );

        let security_api = crate::kernel_handle::SecurityApi::new(
            Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
            Arc::new(oxi_sdk::observability::AuditTrail::new(100)),
            Arc::new(parking_lot::Mutex::new(
                crate::access_manager::AccessManager::new(),
            )),
            Arc::new(
                crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
            ),
        );

        let persona_api =
            crate::kernel_handle::PersonaApi::new(Arc::new(crate::persona::PersonaManager::new()));

        let extension_api =
            crate::kernel_handle::ExtensionApi::new(Arc::new(crate::skill::SkillManager::new(
                tmp.join("skills"),
                tmp.join("share/skills"),
            )));

        let mcp_api = crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new()));

        let infra_api = crate::kernel_handle::InfraApi::new(
            Arc::new(crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer")),
            Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
            Arc::new(crate::cron::CronScheduler::new(
                Arc::new(
                    crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
                ),
                60,
            )),
            Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
            EventBus::new(64),
            crate::config::OxiosConfig::default(),
            std::time::Instant::now(),
        );

        let exec_api = crate::kernel_handle::ExecApi::new(
            Arc::new(parking_lot::RwLock::new(
                crate::config::ExecConfig::default(),
            )),
            Arc::new(parking_lot::Mutex::new(
                crate::access_manager::AccessManager::new(),
            )),
        );

        let a2a_api = crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
            EventBus::new(64),
        )));

        let engine_api = crate::kernel_handle::EngineApi::new(
            Arc::new(parking_lot::RwLock::new(
                crate::config::OxiosConfig::default(),
            )),
            tmp.join("config.toml"),
            Arc::new(crate::kernel_handle::RoutingStats::new()),
            Arc::new(crate::engine::EngineHandle::new(Arc::new(
                crate::OxiosEngine::new("anthropic/claude-sonnet-4-20250514"),
            ))),
        );

        let knowledge_base =
            Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap());

        let knowledge_lens = Arc::new(
            crate::kernel_handle::KnowledgeLens::new(
                Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
                Arc::clone(&memory_manager),
            )
            .unwrap(),
        );

        let marketplace_api = crate::kernel_handle::MarketplaceApi::new(
            Arc::new(crate::skill::clawhub::ClawHubInstaller::new(
                tmp.join("skills"),
                tmp.join("state"),
                None,
            )),
            Arc::new(
                crate::skill::clawhub::ClawHubClient::new(None).expect("valid ClawHub client"),
            ),
            Arc::new(crate::skill::skills_sh::SkillsShInstaller::new(
                tmp.join("skills"),
                None,
                None,
            )),
            Arc::new(
                crate::skill::skills_sh::SkillsShClient::new(None, None)
                    .expect("valid Skills.sh client"),
            ),
        );

        let kernel_handle = Arc::new(crate::KernelHandle::new(
            state_api,
            agent_api,
            security_api,
            persona_api,
            extension_api,
            mcp_api,
            infra_api,
            None,
            exec_api,
            a2a_api,
            engine_api,
            knowledge_base,
            knowledge_lens,
            marketplace_api,
            None,
            None,
        ));

        let engine_handle = Arc::new(crate::engine::EngineHandle::new(Arc::new(
            crate::OxiosEngine::new("mock/model"),
        )));
        let runtime = AgentRuntime::new(engine_handle, "mock/model", kernel_handle, None);
        BasicSupervisor::new(event_bus, runtime)
    }

    /// Builds a minimal first-generation seed whose goal is `goal`.
    fn make_seed(goal: &str) -> Seed {
        Seed {
            id: uuid::Uuid::new_v4(),
            goal: goal.to_string(),
            constraints: vec![],
            acceptance_criteria: vec![],
            ontology: vec![],
            created_at: chrono::Utc::now(),
            generation: 0,
            parent_seed_id: None,
            cspace_hint: None,
            original_request: String::new(),
            output_schema: None,
        }
    }

    /// Asserts that `outcome` is an error whose message mentions "not found".
    fn assert_not_found<T: std::fmt::Debug>(outcome: Result<T>) {
        let err = outcome.expect_err("operation on an unknown agent must fail");
        assert!(err.to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_fork_creates_agent() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Test agent");

        let id = supervisor.fork(&seed).await.unwrap();

        let listed = supervisor.list().await.unwrap();
        assert_eq!(listed.len(), 1);

        let forked = &listed[0];
        assert_eq!(forked.id, id);
        assert_eq!(forked.name, "Test agent");
        assert_eq!(forked.status, AgentStatus::Starting);
        assert_eq!(forked.seed_id, Some(seed.id));
    }

    #[tokio::test]
    async fn test_exec_updates_status_to_running() {
        let supervisor = make_supervisor().await;

        let id = supervisor.fork(&make_seed("Running agent")).await.unwrap();
        let before = supervisor.wait(id).await.unwrap();
        assert_eq!(before, AgentStatus::Starting);

        supervisor.exec(id).await.unwrap();
        let after = supervisor.wait(id).await.unwrap();
        assert_eq!(after, AgentStatus::Running);
    }

    #[tokio::test]
    async fn test_kill_sets_stopped() {
        let supervisor = make_supervisor().await;

        let id = supervisor.fork(&make_seed("Doomed agent")).await.unwrap();
        supervisor.exec(id).await.unwrap();
        let running = supervisor.wait(id).await.unwrap();
        assert_eq!(running, AgentStatus::Running);

        supervisor.kill(id).await.unwrap();
        let stopped = supervisor.wait(id).await.unwrap();
        assert_eq!(stopped, AgentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_kill_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        assert_not_found(supervisor.kill(unknown_id).await);
    }

    #[tokio::test]
    async fn test_list_returns_all_agents() {
        let supervisor = make_supervisor().await;

        let mut forked = Vec::new();
        for goal in &["Agent 1", "Agent 2", "Agent 3"] {
            forked.push(supervisor.fork(&make_seed(goal)).await.unwrap());
        }

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 3);

        let listed: std::collections::HashSet<AgentId> =
            agents.iter().map(|info| info.id).collect();
        for id in &forked {
            assert!(listed.contains(id));
        }
    }

    #[tokio::test]
    async fn test_exec_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        assert_not_found(supervisor.exec(unknown_id).await);
    }

    #[tokio::test]
    async fn test_wait_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        assert_not_found(supervisor.wait(unknown_id).await);
    }
}
644
645pub struct NoOpSupervisor;
651
652#[async_trait::async_trait]
653impl Supervisor for NoOpSupervisor {
654 async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
655 Err(anyhow::anyhow!(
656 "NoOpSupervisor: fork not available during build"
657 ))
658 }
659 async fn exec(&self, _id: AgentId) -> Result<()> {
660 Err(anyhow::anyhow!(
661 "NoOpSupervisor: exec not available during build"
662 ))
663 }
664 async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
665 Err(anyhow::anyhow!(
666 "NoOpSupervisor: run_with_seed not available during build"
667 ))
668 }
669 async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
670 Err(anyhow::anyhow!(
671 "NoOpSupervisor: wait not available during build"
672 ))
673 }
674 async fn kill(&self, _id: AgentId) -> Result<()> {
675 Err(anyhow::anyhow!(
676 "NoOpSupervisor: kill not available during build"
677 ))
678 }
679 async fn list(&self) -> Result<Vec<AgentInfo>> {
680 Ok(Vec::new())
681 }
682}