Skip to main content

oxios_kernel/
supervisor.rs

1//! Supervisor: agent lifecycle management.
2//!
3//! The supervisor handles forking, executing, monitoring, and
4//! terminating agent instances. It is the "init" of Oxios.
5//!
6//! When an agent is forked and executed, the supervisor delegates
7//! the actual tool-calling loop to the [`AgentRuntime`].
8
9use anyhow::Result;
10use async_trait::async_trait;
11use chrono::Utc;
12use oxios_ouroboros::Seed;
13use parking_lot::RwLock;
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use tokio::task::JoinHandle;
18
19use crate::agent_runtime::AgentRuntime;
20use crate::event_bus::EventBus;
21use crate::resource_monitor::ResourceMonitor;
22use crate::types::{AgentId, AgentInfo, AgentStatus};
23use oxios_ouroboros::ExecutionResult;
24
/// Runtime bookkeeping for one spawned agent execution.
///
/// `BasicSupervisor` stores one of these per agent id in its `handles` map so
/// that `kill()` can reach the task that is executing the agent.
struct AgentHandle {
    /// Shared cancellation flag. `kill()` stores `true` here; the spawned
    /// task reads it once, before it starts executing the seed.
    cancelled: Arc<AtomicBool>,
    /// Join handle of the tokio task running the agent execution.
    /// `kill()` calls `abort()` on it.
    task: JoinHandle<Result<ExecutionResult>>,
}
32
/// Supervisor trait for managing agent lifecycles.
///
/// Implementations must be `Send + Sync`; every operation is async and
/// fallible (`anyhow::Result`).
#[async_trait]
pub trait Supervisor: Send + Sync {
    /// Fork a new agent from a seed specification.
    ///
    /// Returns the id assigned to the newly created agent.
    async fn fork(&self, spec: &Seed) -> Result<AgentId>;

    /// Start executing an agent.
    ///
    /// `id` identifies an agent previously returned by [`Supervisor::fork`].
    async fn exec(&self, id: AgentId) -> Result<()>;

    /// Fork and execute an agent with its seed, running to completion.
    /// Returns the execution result from the agent runtime.
    ///
    /// NOTE(review): the signature takes an existing `id`, so callers are
    /// presumably expected to have forked the agent already — confirm.
    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;

    /// Wait for an agent to complete and return its final status.
    async fn wait(&self, id: AgentId) -> Result<AgentStatus>;

    /// Terminate an agent.
    async fn kill(&self, id: AgentId) -> Result<()>;

    /// List all known agents.
    ///
    /// Returns one [`AgentInfo`] per agent.
    async fn list(&self) -> Result<Vec<AgentInfo>>;
}
55
/// Basic in-memory supervisor implementation with AgentRuntime integration.
///
/// All state lives in process memory behind `RwLock`s; nothing in this type
/// persists agents to disk.
pub struct BasicSupervisor {
    /// Agent records keyed by id. Entries are inserted by `fork()` and their
    /// `status` field is updated by the other lifecycle methods.
    agents: RwLock<HashMap<AgentId, AgentInfo>>,
    /// Per-agent cancellation tokens and join handles for task abortion.
    handles: RwLock<HashMap<AgentId, AgentHandle>>,
    /// Bus on which lifecycle events (`AgentCreated`, `AgentStarted`,
    /// `AgentStopped`, `AgentFailed`) are published.
    event_bus: EventBus,
    /// Runtime whose `execute` method runs an agent's seed; shared with the
    /// spawned execution tasks via `Arc`.
    runtime: Arc<AgentRuntime>,
    /// Optional monitor that is told the agent count. `None` until
    /// `set_resource_monitor` is called.
    resource_monitor: Option<Arc<ResourceMonitor>>,
}
65
66impl BasicSupervisor {
67    /// Creates a new supervisor with the given event bus and agent runtime.
68    pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
69        Self {
70            agents: RwLock::new(HashMap::new()),
71            handles: RwLock::new(HashMap::new()),
72            event_bus,
73            runtime: Arc::new(runtime),
74            resource_monitor: None,
75        }
76    }
77
78    /// Attach a resource monitor for agent count tracking.
79    pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
80        self.resource_monitor = Some(rm);
81    }
82
83    /// Update the resource monitor with current active agent count.
84    fn update_agent_count(&self) {
85        if let Some(ref rm) = self.resource_monitor {
86            let count = self.agents.read().len();
87            rm.set_active_agents(count);
88        }
89    }
90}
91
92#[async_trait]
93impl Supervisor for BasicSupervisor {
94    async fn fork(&self, spec: &Seed) -> Result<AgentId> {
95        let id = AgentId::new_v4();
96        let info = AgentInfo {
97            id,
98            name: spec.goal.clone(),
99            status: AgentStatus::Starting,
100            created_at: Utc::now(),
101            seed_id: Some(spec.id),
102        };
103
104        {
105            let mut agents = self.agents.write();
106            agents.insert(id, info);
107        }
108
109        self.update_agent_count();
110
111        let _ = self
112            .event_bus
113            .publish(crate::event_bus::KernelEvent::AgentCreated {
114                id,
115                name: spec.goal.clone(),
116            });
117
118        tracing::info!(agent_id = %id, "Forked new agent from seed");
119        Ok(id)
120    }
121
122    async fn exec(&self, id: AgentId) -> Result<()> {
123        {
124            let mut agents = self.agents.write();
125            match agents.get_mut(&id) {
126                Some(agent) => {
127                    agent.status = AgentStatus::Running;
128                }
129                None => anyhow::bail!("Agent {id} not found"),
130            }
131        }
132
133        self.update_agent_count();
134
135        let _ = self
136            .event_bus
137            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
138        tracing::info!(agent_id = %id, "Agent execution started");
139
140        Ok(())
141    }
142
143    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
144        // Mark as running.
145        {
146            let mut agents = self.agents.write();
147            match agents.get_mut(&id) {
148                Some(agent) => agent.status = AgentStatus::Running,
149                None => anyhow::bail!("Agent {id} not found"),
150            }
151        }
152
153        let _ = self
154            .event_bus
155            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
156
157        tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
158
159        // Spawn the execution as a tokio task so we can track and abort it.
160        let cancelled = Arc::new(AtomicBool::new(false));
161        let runtime = Arc::clone(&self.runtime);
162        let seed = seed.clone();
163        let cancelled_clone = cancelled.clone();
164
165        let handle: JoinHandle<Result<ExecutionResult>> = tokio::spawn(async move {
166            // Check for cancellation before starting.
167            if cancelled_clone.load(Ordering::Relaxed) {
168                return Ok(ExecutionResult {
169                    output: "Agent cancelled before execution".into(),
170                    steps_completed: 0,
171                    success: false,
172                });
173            }
174            runtime.execute(id, &seed).await
175        });
176
177        // Store the handle so kill() can abort the task.
178        {
179            let mut handles = self.handles.write();
180            handles.insert(
181                id,
182                AgentHandle {
183                    cancelled,
184                    task: handle,
185                },
186            );
187        }
188
189        // Await the spawned task.
190        let result = {
191            let agent_handle = {
192                let mut handles = self.handles.write();
193                handles.remove(&id)
194            };
195            // Guard is dropped above, safe to await.
196
197            match agent_handle {
198                Some(ah) => match ah.task.await {
199                    Ok(res) => res,
200                    Err(join_err) => {
201                        // Task was aborted (e.g. kill()) or panicked.
202                        tracing::warn!(agent_id = %id, error = %join_err, "Agent task join error");
203                        Ok(ExecutionResult {
204                            output: format!("Agent task aborted: {join_err}"),
205                            steps_completed: 0,
206                            success: false,
207                        })
208                    }
209                },
210                None => anyhow::bail!("Agent {id} handle disappeared"),
211            }
212        };
213
214        match result {
215            Ok(result) => {
216                tracing::info!(
217                    agent_id = %id,
218                    success = result.success,
219                    steps = result.steps_completed,
220                    "Agent task completed"
221                );
222
223                {
224                    let mut agents = self.agents.write();
225                    if let Some(agent) = agents.get_mut(&id) {
226                        agent.status = if result.success {
227                            AgentStatus::Idle
228                        } else {
229                            AgentStatus::Failed
230                        };
231                    }
232                }
233
234                let _ = self
235                    .event_bus
236                    .publish(crate::event_bus::KernelEvent::AgentStopped { id });
237                self.update_agent_count();
238                Ok(result)
239            }
240            Err(e) => {
241                tracing::error!(agent_id = %id, error = %e, "Agent task failed");
242
243                {
244                    let mut agents = self.agents.write();
245                    if let Some(agent) = agents.get_mut(&id) {
246                        agent.status = AgentStatus::Failed;
247                    }
248                }
249
250                let _ = self
251                    .event_bus
252                    .publish(crate::event_bus::KernelEvent::AgentFailed {
253                        id,
254                        error: e.to_string(),
255                    });
256                self.update_agent_count();
257
258                Ok(ExecutionResult {
259                    output: format!("Agent failed: {e}"),
260                    steps_completed: 0,
261                    success: false,
262                })
263            }
264        }
265    }
266
267    async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
268        let agents = self.agents.read();
269        match agents.get(&id) {
270            Some(info) => Ok(info.status),
271            None => anyhow::bail!("Agent {id} not found"),
272        }
273    }
274
275    async fn kill(&self, id: AgentId) -> Result<()> {
276        // Cancel and abort the running task, if any.
277        {
278            let mut handles = self.handles.write();
279            if let Some(agent_handle) = handles.remove(&id) {
280                agent_handle.cancelled.store(true, Ordering::Relaxed);
281                agent_handle.task.abort();
282                tracing::info!(agent_id = %id, "Agent task aborted");
283            }
284        }
285
286        {
287            let mut agents = self.agents.write();
288            if let Some(agent) = agents.get_mut(&id) {
289                agent.status = AgentStatus::Stopped;
290            } else {
291                anyhow::bail!("Agent {id} not found");
292            }
293        }
294
295        let _ = self
296            .event_bus
297            .publish(crate::event_bus::KernelEvent::AgentStopped { id });
298        self.update_agent_count();
299        tracing::info!(agent_id = %id, "Agent killed");
300        Ok(())
301    }
302
303    async fn list(&self) -> Result<Vec<AgentInfo>> {
304        let agents = self.agents.read();
305        Ok(agents.values().cloned().collect())
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_bus::EventBus;
    use crate::types::AgentStatus;
    use async_trait::async_trait;
    use futures::Stream;
    use oxi_sdk::{Context, Model, ProviderError, ProviderEvent, StreamOptions};
    use oxios_ouroboros::Seed;
    use std::pin::Pin;

    /// Minimal mock LLM provider for constructing an AgentRuntime in tests.
    struct MockProvider;

    #[async_trait]
    impl oxi_sdk::Provider for MockProvider {
        async fn stream(
            &self,
            _model: &Model,
            _context: &Context,
            _options: Option<StreamOptions>,
        ) -> Result<Pin<Box<dyn Stream<Item = ProviderEvent> + Send>>, ProviderError> {
            // Return an empty stream — never actually invoked in supervisor lifecycle tests.
            let stream = futures::stream::empty();
            Ok(Box::pin(stream))
        }

        fn name(&self) -> &str {
            "mock"
        }
    }

    /// Helper to create a real BasicSupervisor wired to a real EventBus.
    ///
    /// This is `async` because `SpaceManager::new` is async and every caller
    /// is a `#[tokio::test]` body: blocking on it with `Handle::block_on`
    /// from inside the runtime panics, so it has to be awaited instead.
    async fn make_supervisor() -> BasicSupervisor {
        let event_bus = EventBus::new(64);
        let provider = Arc::new(MockProvider);

        // Build a mock KernelHandle with temp dirs.
        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        let _ = std::fs::create_dir_all(&tmp);

        let state_store_2 =
            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
        let state_store = state_store_2.clone();
        let state_store_for_space = state_store_2.clone();
        let memory_manager = Arc::new({
            let mut mm = crate::memory::MemoryManager::new(state_store.clone());
            mm.set_git_layer(Arc::new(
                crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
            ));
            mm
        });

        // Awaited up front so no blocking call is needed while assembling
        // the KernelHandle below.
        let space_manager = Arc::new(
            crate::space::SpaceManager::new(state_store_for_space, EventBus::new(64))
                .await
                .expect("space mgr"),
        );

        let kernel_handle = Arc::new(crate::KernelHandle::new(
            crate::kernel_handle::StateApi::new(state_store),
            crate::kernel_handle::AgentApi::new(
                Arc::new(crate::supervisor::NoOpSupervisor),
                Arc::new(crate::budget::BudgetManager::new()),
                memory_manager,
            ),
            crate::kernel_handle::SecurityApi::new(
                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
                Arc::new(crate::audit_trail::AuditTrail::new(100)),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
                Arc::new(
                    crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
                ),
            ),
            crate::kernel_handle::PersonaApi::new(Arc::new(
                crate::persona_manager::PersonaManager::new(),
            )),
            crate::kernel_handle::ExtensionApi::new(
                Arc::new(crate::program::ProgramManager::new(tmp.join("programs"))),
                Arc::new(crate::skill::SkillStore::new(tmp.join("skills")).expect("skill store")),
                Arc::new(crate::host_tools::HostToolValidator::new(vec![], vec![])),
            ),
            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
            crate::kernel_handle::InfraApi::new(
                Arc::new(
                    crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
                ),
                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
                Arc::new(crate::cron::CronScheduler::new(
                    Arc::new(
                        crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
                    ),
                    60,
                )),
                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
                EventBus::new(64),
                crate::config::OxiosConfig::default(),
                std::time::Instant::now(),
            ),
            crate::kernel_handle::SpaceApi::new(space_manager, EventBus::new(64)),
            crate::kernel_handle::ExecApi::new(
                Arc::new(crate::config::ExecConfig::default()),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
            ),
            crate::kernel_handle::BrowserApi::default(),
            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
                EventBus::new(64),
            ))),
        ));

        let runtime = AgentRuntime::new(provider, "mock/model", kernel_handle);
        BasicSupervisor::new(event_bus, runtime)
    }

    /// Helper to create a minimal Seed for testing.
    fn make_seed(goal: &str) -> Seed {
        Seed {
            id: uuid::Uuid::new_v4(),
            goal: goal.to_string(),
            constraints: vec![],
            acceptance_criteria: vec![],
            ontology: vec![],
            created_at: chrono::Utc::now(),
            generation: 0,
            parent_seed_id: None,
            cspace_hint: None,
        }
    }

    /// `fork` registers exactly one agent in `Starting` state, named after
    /// the seed goal and linked to the seed id.
    #[tokio::test]
    async fn test_fork_creates_agent() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Test agent");

        let id = supervisor.fork(&seed).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].id, id);
        assert_eq!(agents[0].name, "Test agent");
        assert_eq!(agents[0].status, AgentStatus::Starting);
        assert_eq!(agents[0].seed_id, Some(seed.id));
    }

    /// `exec` moves a forked agent from `Starting` to `Running`.
    #[tokio::test]
    async fn test_exec_updates_status_to_running() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Running agent");

        let id = supervisor.fork(&seed).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Starting);

        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
    }

    /// `kill` moves a running agent to `Stopped`.
    #[tokio::test]
    async fn test_kill_sets_stopped() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Doomed agent");

        let id = supervisor.fork(&seed).await.unwrap();
        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);

        supervisor.kill(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
    }

    /// `kill` on an id that was never forked reports "not found".
    #[tokio::test]
    async fn test_kill_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.kill(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    /// `list` returns every forked agent.
    #[tokio::test]
    async fn test_list_returns_all_agents() {
        let supervisor = make_supervisor().await;

        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 3);

        let ids: std::collections::HashSet<AgentId> = agents.iter().map(|a| a.id).collect();
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
        assert!(ids.contains(&id3));
    }

    /// `exec` on an id that was never forked reports "not found".
    #[tokio::test]
    async fn test_exec_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.exec(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    /// `wait` on an id that was never forked reports "not found".
    #[tokio::test]
    async fn test_wait_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.wait(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }
}
535
536/// A no-op supervisor used during KernelBuilder::build() to break the
537/// KernelHandle → AgentRuntime → Supervisor → KernelHandle cycle.
538///
539/// AgentApi.supervisor is only used for list/kill operations, not during
540/// tool registration, so this placeholder is safe during build time.
541pub struct NoOpSupervisor;
542
543#[async_trait::async_trait]
544impl Supervisor for NoOpSupervisor {
545    async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
546        Err(anyhow::anyhow!(
547            "NoOpSupervisor: fork not available during build"
548        ))
549    }
550    async fn exec(&self, _id: AgentId) -> Result<()> {
551        Err(anyhow::anyhow!(
552            "NoOpSupervisor: exec not available during build"
553        ))
554    }
555    async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
556        Err(anyhow::anyhow!(
557            "NoOpSupervisor: run_with_seed not available during build"
558        ))
559    }
560    async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
561        Err(anyhow::anyhow!(
562            "NoOpSupervisor: wait not available during build"
563        ))
564    }
565    async fn kill(&self, _id: AgentId) -> Result<()> {
566        Err(anyhow::anyhow!(
567            "NoOpSupervisor: kill not available during build"
568        ))
569    }
570    async fn list(&self) -> Result<Vec<AgentInfo>> {
571        Ok(Vec::new())
572    }
573}