// oxios_kernel/supervisor.rs

//! Supervisor: agent lifecycle management.
//!
//! The supervisor handles forking, executing, monitoring, and
//! terminating agent instances. It is the "init" of Oxios.
//!
//! When an agent is forked and executed, the supervisor delegates
//! the actual tool-calling loop to the [`AgentRuntime`].
//!
//! # Agent Pool & Session Persistence
//!
//! Agents can be retained after execution in an [`AgentPool`] (exposed via
//! `BasicSupervisor::pool()`) for:
//! - **Session continuation** via `Agent::continue_with()` — multi-turn
//!   conversations without re-creating the agent.
//! - **State export/import** — serialize agent conversation history to
//!   JSON for crash recovery, migration, or debugging.
//! - **Provider rate limiting** — all agents share a `ProviderPool` to
//!   respect per-provider RPM/concurrency limits.

19use anyhow::Result;
20use async_trait::async_trait;
21use chrono::Utc;
22use oxi_sdk::Agent;
23use oxios_ouroboros::Seed;
24use parking_lot::RwLock;
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::Arc;
28use tokio::task::JoinHandle;
29
30use crate::agent_runtime::AgentRuntime;
31use crate::event_bus::EventBus;
32use crate::resource_monitor::ResourceMonitor;
33use crate::session_context::SessionContext;
34use crate::types::{AgentId, AgentInfo, AgentStatus};
35use oxios_ouroboros::ExecutionResult;
36
37/// Tracks the runtime handles needed to cancel a running agent.
38struct AgentHandle {
39    /// Flag set on `kill()` to cooperatively signal cancellation.
40    cancelled: Arc<AtomicBool>,
41    /// The tokio task running the agent execution. Aborted on `kill()`.
42    task: JoinHandle<Result<ExecutionResult>>,
43}
44
45/// Pool of live `Agent` instances, keyed by AgentId.
46///
47/// Retains agents after execution for:
48/// - **State persistence** — `export_state()` serializes conversation history
49///   to JSON for crash recovery, migration, or debugging.
50/// - **State restoration** — `import_state()` restores a previous session.
51#[derive(Default)]
52pub struct AgentPool {
53    agents: RwLock<HashMap<AgentId, Arc<Agent>>>,
54}
55
56impl AgentPool {
57    /// Create an empty agent pool.
58    pub fn new() -> Self {
59        Self {
60            agents: RwLock::new(HashMap::new()),
61        }
62    }
63
64    /// Insert an agent into the pool.
65    pub fn insert(&self, id: AgentId, agent: Arc<Agent>) {
66        self.agents.write().insert(id, agent);
67    }
68
69    /// Get a pooled agent by ID.
70    pub fn get(&self, id: &AgentId) -> Option<Arc<Agent>> {
71        self.agents.read().get(id).cloned()
72    }
73
74    /// Remove an agent from the pool.
75    pub fn remove(&self, id: &AgentId) -> Option<Arc<Agent>> {
76        self.agents.write().remove(id)
77    }
78
79    /// Export an agent's state as JSON.
80    ///
81    /// Returns `None` if the agent is not in the pool or export fails.
82    pub fn export_state(&self, id: &AgentId) -> Option<serde_json::Value> {
83        self.agents
84            .read()
85            .get(id)
86            .and_then(|agent| agent.export_state().ok())
87    }
88
89    /// Import agent state from JSON.
90    ///
91    /// Returns `false` if the agent is not in the pool or import fails.
92    pub fn import_state(&self, id: &AgentId, state: serde_json::Value) -> bool {
93        if let Some(agent) = self.agents.read().get(id) {
94            agent.import_state(state).is_ok()
95        } else {
96            false
97        }
98    }
99
100    /// Number of agents currently in the pool.
101    pub fn len(&self) -> usize {
102        self.agents.read().len()
103    }
104
105    /// Whether the pool is empty.
106    pub fn is_empty(&self) -> bool {
107        self.agents.read().is_empty()
108    }
109}
110
111/// Supervisor trait for managing agent lifecycles.
112#[async_trait]
113pub trait Supervisor: Send + Sync {
114    /// Fork a new agent from a seed specification.
115    async fn fork(&self, spec: &Seed) -> Result<AgentId>;
116
117    /// Start executing an agent.
118    async fn exec(&self, id: AgentId) -> Result<()>;
119
120    /// Fork and execute an agent with its seed, running to completion.
121    /// Returns the execution result from the agent runtime.
122    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;
123
124    /// Wait for an agent to complete and return its final status.
125    async fn wait(&self, id: AgentId) -> Result<AgentStatus>;
126
127    /// Terminate an agent.
128    async fn kill(&self, id: AgentId) -> Result<()>;
129
130    /// List all known agents.
131    async fn list(&self) -> Result<Vec<AgentInfo>>;
132}
133
134/// Basic in-memory supervisor implementation with AgentRuntime integration.
135pub struct BasicSupervisor {
136    agents: RwLock<HashMap<AgentId, AgentInfo>>,
137    /// Per-agent cancellation tokens and join handles for task abortion.
138    handles: RwLock<HashMap<AgentId, AgentHandle>>,
139    /// Pool of live Agent instances for session continuation.
140    agent_pool: AgentPool,
141    event_bus: EventBus,
142    runtime: Arc<AgentRuntime>,
143    resource_monitor: Option<Arc<ResourceMonitor>>,
144    /// Session context for proactive recall timing (RFC-020).
145    /// Shared across all Seed executions within this supervisor's lifetime
146    /// so that RecallTiming can track message count and topic changes.
147    /// Uses tokio::sync::RwLock (not parking_lot) so the guard is Send,
148    /// allowing it to be held across .await in tokio::spawn.
149    session_context: Arc<tokio::sync::RwLock<SessionContext>>,
150}
151
152impl BasicSupervisor {
153    /// Creates a new supervisor with the given event bus and agent runtime.
154    pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
155        Self {
156            agents: RwLock::new(HashMap::new()),
157            handles: RwLock::new(HashMap::new()),
158            agent_pool: AgentPool::new(),
159            event_bus,
160            runtime: Arc::new(runtime),
161            resource_monitor: None,
162            session_context: Arc::new(tokio::sync::RwLock::new(SessionContext::new())),
163        }
164    }
165
166    /// Attach a resource monitor for agent count tracking.
167    pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
168        self.resource_monitor = Some(rm);
169    }
170
171    /// Update the resource monitor with current active agent count.
172    fn update_agent_count(&self) {
173        if let Some(ref rm) = self.resource_monitor {
174            let count = self.agents.read().len();
175            rm.set_active_agents(count);
176        }
177    }
178
179    /// Access the agent pool for session continuation.
180    pub fn pool(&self) -> &AgentPool {
181        &self.agent_pool
182    }
183}
184
185#[async_trait]
186impl Supervisor for BasicSupervisor {
187    async fn fork(&self, spec: &Seed) -> Result<AgentId> {
188        let id = AgentId::new_v4();
189        let info = AgentInfo {
190            id,
191            name: spec.goal.clone(),
192            status: AgentStatus::Starting,
193            created_at: Utc::now(),
194            seed_id: Some(spec.id),
195        };
196
197        {
198            let mut agents = self.agents.write();
199            agents.insert(id, info);
200        }
201
202        self.update_agent_count();
203
204        let _ = self
205            .event_bus
206            .publish(crate::event_bus::KernelEvent::AgentCreated {
207                id,
208                name: spec.goal.clone(),
209            });
210
211        tracing::info!(agent_id = %id, "Forked new agent from seed");
212        Ok(id)
213    }
214
215    async fn exec(&self, id: AgentId) -> Result<()> {
216        {
217            let mut agents = self.agents.write();
218            match agents.get_mut(&id) {
219                Some(agent) => {
220                    agent.status = AgentStatus::Running;
221                }
222                None => anyhow::bail!("Agent {id} not found"),
223            }
224        }
225
226        self.update_agent_count();
227
228        let _ = self
229            .event_bus
230            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
231        tracing::info!(agent_id = %id, "Agent execution started");
232
233        Ok(())
234    }
235
236    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
237        // Mark as running.
238        {
239            let mut agents = self.agents.write();
240            match agents.get_mut(&id) {
241                Some(agent) => agent.status = AgentStatus::Running,
242                None => anyhow::bail!("Agent {id} not found"),
243            }
244        }
245
246        let _ = self
247            .event_bus
248            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
249
250        tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
251
252        // Spawn the execution as a tokio task so we can track and abort it.
253        let cancelled = Arc::new(AtomicBool::new(false));
254        let runtime = Arc::clone(&self.runtime);
255        let seed = seed.clone();
256        let cancelled_clone = cancelled.clone();
257
258        // Share the session context so RecallTiming persists across Seeds.
259        // Uses tokio::sync::RwLock so the guard is Send-safe across .await.
260        let session_ctx = self.session_context.clone();
261
262        let handle: JoinHandle<Result<ExecutionResult>> = tokio::spawn(async move {
263            // Check for cancellation before starting.
264            if cancelled_clone.load(Ordering::Relaxed) {
265                return Ok(ExecutionResult {
266                    output: "Agent cancelled before execution".into(),
267                    steps_completed: 0,
268                    success: false,
269                    tool_calls: vec![],
270                });
271            }
272            let mut ctx = session_ctx.write().await;
273            runtime.execute(id, &seed, &mut ctx).await
274        });
275
276        // Store the handle so kill() can abort the task.
277        {
278            let mut handles = self.handles.write();
279            handles.insert(
280                id,
281                AgentHandle {
282                    cancelled,
283                    task: handle,
284                },
285            );
286        }
287
288        // Await the spawned task.
289        let result = {
290            let agent_handle = {
291                let mut handles = self.handles.write();
292                handles.remove(&id)
293            };
294            // Guard is dropped above, safe to await.
295
296            match agent_handle {
297                Some(ah) => match ah.task.await {
298                    Ok(res) => res,
299                    Err(join_err) => {
300                        // Task was aborted (e.g. kill()) or panicked.
301                        tracing::warn!(agent_id = %id, error = %join_err, "Agent task join error");
302                        Ok(ExecutionResult {
303                            output: format!("Agent task aborted: {join_err}"),
304                            steps_completed: 0,
305                            success: false,
306                            tool_calls: vec![],
307                        })
308                    }
309                },
310                None => anyhow::bail!("Agent {id} handle disappeared"),
311            }
312        };
313
314        match result {
315            Ok(result) => {
316                tracing::info!(
317                    agent_id = %id,
318                    success = result.success,
319                    steps = result.steps_completed,
320                    "Agent task completed"
321                );
322
323                {
324                    let mut agents = self.agents.write();
325                    if let Some(agent) = agents.get_mut(&id) {
326                        agent.status = if result.success {
327                            AgentStatus::Idle
328                        } else {
329                            AgentStatus::Failed
330                        };
331                    }
332                }
333
334                let _ = self
335                    .event_bus
336                    .publish(crate::event_bus::KernelEvent::AgentStopped { id });
337                self.update_agent_count();
338                Ok(result)
339            }
340            Err(e) => {
341                tracing::error!(agent_id = %id, error = %e, "Agent task failed");
342
343                {
344                    let mut agents = self.agents.write();
345                    if let Some(agent) = agents.get_mut(&id) {
346                        agent.status = AgentStatus::Failed;
347                    }
348                }
349
350                let _ = self
351                    .event_bus
352                    .publish(crate::event_bus::KernelEvent::AgentFailed {
353                        id,
354                        error: e.to_string(),
355                    });
356                self.update_agent_count();
357
358                Ok(ExecutionResult {
359                    output: format!("Agent failed: {e}"),
360                    steps_completed: 0,
361                    success: false,
362                    tool_calls: vec![],
363                })
364            }
365        }
366    }
367
368    async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
369        let agents = self.agents.read();
370        match agents.get(&id) {
371            Some(info) => Ok(info.status),
372            None => anyhow::bail!("Agent {id} not found"),
373        }
374    }
375
376    async fn kill(&self, id: AgentId) -> Result<()> {
377        // Cancel and abort the running task, if any.
378        {
379            let mut handles = self.handles.write();
380            if let Some(agent_handle) = handles.remove(&id) {
381                agent_handle.cancelled.store(true, Ordering::Relaxed);
382                agent_handle.task.abort();
383                tracing::info!(agent_id = %id, "Agent task aborted");
384            }
385        }
386
387        {
388            let mut agents = self.agents.write();
389            if let Some(agent) = agents.get_mut(&id) {
390                agent.status = AgentStatus::Stopped;
391            } else {
392                anyhow::bail!("Agent {id} not found");
393            }
394        }
395
396        let _ = self
397            .event_bus
398            .publish(crate::event_bus::KernelEvent::AgentStopped { id });
399        self.update_agent_count();
400        tracing::info!(agent_id = %id, "Agent killed");
401        Ok(())
402    }
403
404    async fn list(&self) -> Result<Vec<AgentInfo>> {
405        let agents = self.agents.read();
406        Ok(agents.values().cloned().collect())
407    }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_bus::EventBus;
    use crate::types::AgentStatus;
    use oxios_ouroboros::Seed;

    /// Build a `BasicSupervisor` wired to a real `EventBus`.
    ///
    /// The `KernelHandle` is assembled from real components whose on-disk
    /// state lives under a fresh directory in the system temp dir. No mock
    /// provider is needed, because `OxiosEngine` resolves models internally.
    ///
    /// NOTE(review): the temp directory is never removed after the test.
    async fn make_supervisor() -> BasicSupervisor {
        let event_bus = EventBus::new(64);

        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        // Fail loudly here rather than with a confusing store error below.
        std::fs::create_dir_all(&tmp).expect("create test temp dir");

        let state_store =
            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
        let memory_manager = Arc::new({
            let mut mm = crate::memory::MemoryManager::new(state_store.clone());
            mm.set_git_layer(Arc::new(
                crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
            ));
            mm
        });

        let kernel_handle = Arc::new(crate::KernelHandle::new(
            crate::kernel_handle::StateApi::new(state_store),
            crate::kernel_handle::AgentApi::new(
                Arc::new(crate::supervisor::NoOpSupervisor),
                Arc::new(crate::budget::BudgetManager::new()),
                memory_manager.clone(),
                Some(event_bus.clone()),
            ),
            crate::kernel_handle::SecurityApi::new(
                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
                Arc::new(oxi_sdk::observability::AuditTrail::new(100)),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
                Arc::new(
                    crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
                ),
            ),
            crate::kernel_handle::PersonaApi::new(Arc::new(crate::persona::PersonaManager::new())),
            crate::kernel_handle::ExtensionApi::new(Arc::new(crate::skill::SkillManager::new(
                tmp.join("skills"),
                tmp.join("share/skills"),
            ))),
            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
            crate::kernel_handle::InfraApi::new(
                Arc::new(
                    crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
                ),
                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
                Arc::new(crate::cron::CronScheduler::new(
                    Arc::new(
                        crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
                    ),
                    60,
                )),
                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
                EventBus::new(64),
                crate::config::OxiosConfig::default(),
                std::time::Instant::now(),
            ),
            None,
            crate::kernel_handle::ExecApi::new(
                Arc::new(parking_lot::RwLock::new(
                    crate::config::ExecConfig::default(),
                )),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
            ),
            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
                EventBus::new(64),
            ))),
            crate::kernel_handle::EngineApi::new(
                Arc::new(parking_lot::RwLock::new(
                    crate::config::OxiosConfig::default(),
                )),
                tmp.join("config.toml"),
                Arc::new(crate::kernel_handle::RoutingStats::new()),
                Arc::new(crate::engine::EngineHandle::new(Arc::new(
                    crate::OxiosEngine::new("anthropic/claude-sonnet-4-20250514"),
                ))),
            ),
            Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
            Arc::new(
                crate::kernel_handle::KnowledgeLens::new(
                    Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
                    memory_manager.clone(),
                )
                .unwrap(),
            ),
            crate::kernel_handle::MarketplaceApi::new(
                Arc::new(crate::skill::clawhub::ClawHubInstaller::new(
                    tmp.join("skills"),
                    tmp.join("state"),
                    None,
                )),
                Arc::new(
                    crate::skill::clawhub::ClawHubClient::new(None).expect("valid ClawHub client"),
                ),
                Arc::new(crate::skill::skills_sh::SkillsShInstaller::new(
                    tmp.join("skills"),
                    None,
                    None,
                )),
                Arc::new(
                    crate::skill::skills_sh::SkillsShClient::new(None, None)
                        .expect("valid Skills.sh client"),
                ),
            ),
            None, // calendar (not configured in test)
            None, // email (not configured in test)
        ));

        let engine = crate::OxiosEngine::new("mock/model");
        let engine_handle = Arc::new(crate::engine::EngineHandle::new(Arc::new(engine)));
        let runtime = AgentRuntime::new(engine_handle, "mock/model", kernel_handle, None);
        BasicSupervisor::new(event_bus, runtime)
    }

    /// Helper to create a minimal Seed for testing.
    fn make_seed(goal: &str) -> Seed {
        Seed {
            id: uuid::Uuid::new_v4(),
            goal: goal.to_string(),
            constraints: vec![],
            acceptance_criteria: vec![],
            ontology: vec![],
            created_at: chrono::Utc::now(),
            generation: 0,
            parent_seed_id: None,
            cspace_hint: None,
            original_request: String::new(),
            output_schema: None,
        }
    }

    #[tokio::test]
    async fn test_fork_creates_agent() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Test agent");

        let id = supervisor.fork(&seed).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].id, id);
        assert_eq!(agents[0].name, "Test agent");
        assert_eq!(agents[0].status, AgentStatus::Starting);
        assert_eq!(agents[0].seed_id, Some(seed.id));
    }

    #[tokio::test]
    async fn test_exec_updates_status_to_running() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Running agent");

        let id = supervisor.fork(&seed).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Starting);

        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
    }

    #[tokio::test]
    async fn test_kill_sets_stopped() {
        let supervisor = make_supervisor().await;
        let seed = make_seed("Doomed agent");

        let id = supervisor.fork(&seed).await.unwrap();
        supervisor.exec(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);

        supervisor.kill(id).await.unwrap();
        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_kill_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.kill(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_list_returns_all_agents() {
        let supervisor = make_supervisor().await;

        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();

        let agents = supervisor.list().await.unwrap();
        assert_eq!(agents.len(), 3);

        let ids: std::collections::HashSet<AgentId> = agents.iter().map(|a| a.id).collect();
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
        assert!(ids.contains(&id3));
    }

    #[tokio::test]
    async fn test_exec_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.exec(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_wait_unknown_agent_returns_error() {
        let supervisor = make_supervisor().await;
        let unknown_id = uuid::Uuid::new_v4();

        let result = supervisor.wait(unknown_id).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }
}
644
645/// A no-op supervisor used during KernelBuilder::build() to break the
646/// KernelHandle → AgentRuntime → Supervisor → KernelHandle cycle.
647///
648/// AgentApi.supervisor is only used for list/kill operations, not during
649/// tool registration, so this placeholder is safe during build time.
650pub struct NoOpSupervisor;
651
652#[async_trait::async_trait]
653impl Supervisor for NoOpSupervisor {
654    async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
655        Err(anyhow::anyhow!(
656            "NoOpSupervisor: fork not available during build"
657        ))
658    }
659    async fn exec(&self, _id: AgentId) -> Result<()> {
660        Err(anyhow::anyhow!(
661            "NoOpSupervisor: exec not available during build"
662        ))
663    }
664    async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
665        Err(anyhow::anyhow!(
666            "NoOpSupervisor: run_with_seed not available during build"
667        ))
668    }
669    async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
670        Err(anyhow::anyhow!(
671            "NoOpSupervisor: wait not available during build"
672        ))
673    }
674    async fn kill(&self, _id: AgentId) -> Result<()> {
675        Err(anyhow::anyhow!(
676            "NoOpSupervisor: kill not available during build"
677        ))
678    }
679    async fn list(&self) -> Result<Vec<AgentInfo>> {
680        Ok(Vec::new())
681    }
682}