Skip to main content

oxios_kernel/
supervisor.rs

1//! Supervisor: agent lifecycle management.
2//!
3//! The supervisor handles forking, executing, monitoring, and
4//! terminating agent instances. It is the "init" of Oxios.
5//!
6//! When an agent is forked and executed, the supervisor delegates
7//! the actual tool-calling loop to the [`AgentRuntime`].
8//!
9//! # Agent Pool & Session Persistence
10//!
11//! Agents are retained in an [`AgentPool`] after execution for:
12//! - **Session continuation** via `Agent::continue_with()` — multi-turn
13//!   conversations without re-creating the agent.
14//! - **State export/import** — serialize agent conversation history to
15//!   JSON for crash recovery, migration, or debugging.
16//! - **Provider rate limiting** — all agents share a [`ProviderPool`] to
17//!   respect per-provider RPM/concurrency limits.
18
19use anyhow::Result;
20use async_trait::async_trait;
21use chrono::Utc;
22use oxi_sdk::Agent;
23use oxios_ouroboros::Seed;
24use parking_lot::RwLock;
25use std::collections::HashMap;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use tokio::task::JoinHandle;
29
30use crate::agent_runtime::AgentRuntime;
31use crate::config::AgentLogConfig;
32use crate::event_bus::EventBus;
33use crate::resource_monitor::ResourceMonitor;
34use crate::session_context::SessionContext;
35use crate::state_store::StateStore;
36use crate::types::{AgentId, AgentInfo, AgentStatus};
37use oxios_ouroboros::ExecutionResult;
38
39#[cfg(feature = "sqlite-memory")]
40use crate::agent_log_db::AgentLogDb;
41
/// Tracks the runtime handles needed to cancel a running agent.
///
/// One entry is stored in `BasicSupervisor::handles` per in-flight
/// `run_with_seed` call, keyed by the agent's id.
struct AgentHandle {
    /// Flag set on `kill()` to cooperatively signal cancellation. The spawned
    /// task checks it before it starts executing the seed.
    cancelled: Arc<AtomicBool>,
    /// The tokio task running the agent execution. Aborted on `kill()`.
    task: JoinHandle<()>,
}
49
/// Pool of live `Agent` instances, keyed by AgentId.
///
/// Retains agents after execution for:
/// - **State persistence** — `export_state()` serializes conversation history
///   to JSON for crash recovery, migration, or debugging.
/// - **State restoration** — `import_state()` restores a previous session.
///
/// All access goes through an internal `parking_lot::RwLock`, so every method
/// takes `&self`.
///
/// NOTE(review): nothing in this file inserts into the pool; it is presumably
/// populated by callers that reach it through `BasicSupervisor::pool()` —
/// confirm.
#[derive(Default)]
pub struct AgentPool {
    /// Pooled agents, shared via `Arc` so lookups hand out cheap clones.
    agents: RwLock<HashMap<AgentId, Arc<Agent>>>,
}
60
61impl AgentPool {
62    /// Create an empty agent pool.
63    pub fn new() -> Self {
64        Self {
65            agents: RwLock::new(HashMap::new()),
66        }
67    }
68
69    /// Insert an agent into the pool.
70    pub fn insert(&self, id: AgentId, agent: Arc<Agent>) {
71        self.agents.write().insert(id, agent);
72    }
73
74    /// Get a pooled agent by ID.
75    pub fn get(&self, id: &AgentId) -> Option<Arc<Agent>> {
76        self.agents.read().get(id).cloned()
77    }
78
79    /// Remove an agent from the pool.
80    pub fn remove(&self, id: &AgentId) -> Option<Arc<Agent>> {
81        self.agents.write().remove(id)
82    }
83
84    /// Export an agent's state as JSON.
85    ///
86    /// Returns `None` if the agent is not in the pool or export fails.
87    pub fn export_state(&self, id: &AgentId) -> Option<serde_json::Value> {
88        self.agents
89            .read()
90            .get(id)
91            .and_then(|agent| agent.export_state().ok())
92    }
93
94    /// Import agent state from JSON.
95    ///
96    /// Returns `false` if the agent is not in the pool or import fails.
97    pub fn import_state(&self, id: &AgentId, state: serde_json::Value) -> bool {
98        if let Some(agent) = self.agents.read().get(id) {
99            agent.import_state(state).is_ok()
100        } else {
101            false
102        }
103    }
104
105    /// Number of agents currently in the pool.
106    pub fn len(&self) -> usize {
107        self.agents.read().len()
108    }
109
110    /// Whether the pool is empty.
111    pub fn is_empty(&self) -> bool {
112        self.agents.read().is_empty()
113    }
114}
115
/// Supervisor trait for managing agent lifecycles.
///
/// Implemented in this file by [`BasicSupervisor`] (the real, in-memory
/// implementation) and [`NoOpSupervisor`] (a build-time placeholder).
#[async_trait]
pub trait Supervisor: Send + Sync {
    /// Fork a new agent from a seed specification.
    ///
    /// Returns the id under which the new agent is registered.
    async fn fork(&self, spec: &Seed) -> Result<AgentId>;

    /// Start executing an agent.
    ///
    /// NOTE(review): `BasicSupervisor::exec` only marks the agent as running
    /// and publishes an event; the actual work is driven by `run_with_seed`.
    async fn exec(&self, id: AgentId) -> Result<()>;

    /// Execute an already-forked agent with its seed, running to completion.
    /// Returns the execution result from the agent runtime.
    ///
    /// NOTE(review): despite earlier wording, `BasicSupervisor` does not fork
    /// here — it fails with "not found" when `id` was never forked.
    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;

    /// Wait for an agent to complete and return its final status.
    ///
    /// NOTE(review): `BasicSupervisor::wait` does not block; it returns the
    /// agent's current status immediately. Confirm which contract is intended.
    async fn wait(&self, id: AgentId) -> Result<AgentStatus>;

    /// Terminate an agent.
    async fn kill(&self, id: AgentId) -> Result<()>;

    /// List all known agents.
    async fn list(&self) -> Result<Vec<AgentInfo>>;
}
138
/// Basic in-memory supervisor implementation with AgentRuntime integration.
///
/// Agent bookkeeping lives in `agents`; running tasks are tracked separately
/// in `handles` so they can be aborted. Persistence backends are optional and
/// attached after construction through the `set_*` methods.
pub struct BasicSupervisor {
    /// Bookkeeping record for every agent forked through this supervisor.
    /// Nothing in this file removes entries from this map.
    agents: RwLock<HashMap<AgentId, AgentInfo>>,
    /// Per-agent cancellation tokens and join handles for task abortion.
    handles: RwLock<HashMap<AgentId, AgentHandle>>,
    /// Pool of live Agent instances for session continuation.
    agent_pool: AgentPool,
    /// Bus on which lifecycle events (created/started/stopped/failed) are
    /// published; publish results are ignored by this supervisor.
    event_bus: EventBus,
    /// Runtime that actually executes a seed for an agent.
    runtime: Arc<AgentRuntime>,
    /// Optional monitor that is told the agent count after lifecycle changes.
    resource_monitor: Option<Arc<ResourceMonitor>>,
    /// Session context for proactive recall timing (RFC-020).
    /// Shared across all Seed executions within this supervisor's lifetime
    /// so that RecallTiming can track message count and topic changes.
    /// Uses tokio::sync::RwLock (not parking_lot) so the guard is Send,
    /// allowing it to be held across .await in tokio::spawn.
    session_context: Arc<tokio::sync::RwLock<SessionContext>>,
    /// Filesystem state store for agent persistence (JSON files).
    state_store: Option<Arc<StateStore>>,
    /// SQLite-backed agent history query index.
    #[cfg(feature = "sqlite-memory")]
    agent_log_db: Option<Arc<AgentLogDb>>,
    /// Agent log retention configuration.
    agent_log_config: AgentLogConfig,
}
163
164impl BasicSupervisor {
165    /// Creates a new supervisor with the given event bus and agent runtime.
166    pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
167        Self {
168            agents: RwLock::new(HashMap::new()),
169            handles: RwLock::new(HashMap::new()),
170            agent_pool: AgentPool::new(),
171            event_bus,
172            runtime: Arc::new(runtime),
173            resource_monitor: None,
174            session_context: Arc::new(tokio::sync::RwLock::new(SessionContext::new())),
175            state_store: None,
176            #[cfg(feature = "sqlite-memory")]
177            agent_log_db: None,
178            agent_log_config: AgentLogConfig::default(),
179        }
180    }
181
182    /// Attach a filesystem state store for agent history persistence.
183    pub fn set_state_store(&mut self, store: Arc<StateStore>) {
184        self.state_store = Some(store);
185    }
186
187    /// Attach a SQLite-backed agent history log database.
188    #[cfg(feature = "sqlite-memory")]
189    pub fn set_agent_log_db(&mut self, db: Arc<AgentLogDb>) {
190        self.agent_log_db = Some(db);
191    }
192
193    /// Set agent log retention configuration.
194    pub fn set_agent_log_config(&mut self, config: AgentLogConfig) {
195        self.agent_log_config = config;
196    }
197
198    /// Attach a resource monitor for agent count tracking.
199    pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
200        self.resource_monitor = Some(rm);
201    }
202
203    /// Update the resource monitor with current active agent count.
204    fn update_agent_count(&self) {
205        if let Some(ref rm) = self.resource_monitor {
206            let count = self.agents.read().len();
207            rm.set_active_agents(count);
208        }
209    }
210
211    /// Access the agent pool for session continuation.
212    pub fn pool(&self) -> &AgentPool {
213        &self.agent_pool
214    }
215}
216
217#[async_trait]
218impl Supervisor for BasicSupervisor {
219    async fn fork(&self, spec: &Seed) -> Result<AgentId> {
220        let id = AgentId::new_v4();
221        let info = AgentInfo {
222            id,
223            name: spec.goal.clone(),
224            status: AgentStatus::Starting,
225            created_at: Utc::now(),
226            seed_id: Some(spec.id),
227            project_id: spec.project_id,
228            started_at: None,
229            completed_at: None,
230            error: None,
231            steps_completed: 0,
232            steps_total: None,
233            tool_calls: vec![],
234            tokens_input: 0,
235            tokens_output: 0,
236            cost_usd: 0.0,
237            model_id: String::new(),
238            session_id: None,
239        };
240
241        {
242            let mut agents = self.agents.write();
243            agents.insert(id, info);
244        }
245
246        self.update_agent_count();
247
248        let _ = self
249            .event_bus
250            .publish(crate::event_bus::KernelEvent::AgentCreated {
251                id,
252                name: spec.goal.clone(),
253            });
254
255        tracing::info!(agent_id = %id, "Forked new agent from seed");
256        Ok(id)
257    }
258
259    async fn exec(&self, id: AgentId) -> Result<()> {
260        {
261            let mut agents = self.agents.write();
262            match agents.get_mut(&id) {
263                Some(agent) => {
264                    agent.status = AgentStatus::Running;
265                }
266                None => anyhow::bail!("Agent {id} not found"),
267            }
268        }
269
270        self.update_agent_count();
271
272        let _ = self
273            .event_bus
274            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
275        tracing::info!(agent_id = %id, "Agent execution started");
276
277        Ok(())
278    }
279
280    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
281        // Mark as running.
282        {
283            let mut agents = self.agents.write();
284            match agents.get_mut(&id) {
285                Some(agent) => {
286                    agent.status = AgentStatus::Running;
287                    agent.started_at = Some(Utc::now());
288                }
289                None => anyhow::bail!("Agent {id} not found"),
290            }
291        }
292
293        let _ = self
294            .event_bus
295            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
296
297        tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
298
299        // Spawn the execution as a tokio task so we can track and abort it.
300        let cancelled = Arc::new(AtomicBool::new(false));
301        let runtime = Arc::clone(&self.runtime);
302        let seed = seed.clone();
303
304        // Share the session context so RecallTiming persists across Seeds.
305        // Uses tokio::sync::RwLock so the guard is Send-safe across .await.
306        let session_ctx = self.session_context.clone();
307
308        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<Result<ExecutionResult>>();
309        let cancelled_done = cancelled.clone();
310        let handle: JoinHandle<()> = tokio::spawn(async move {
311            // Check for cancellation before starting.
312            let result = if cancelled_done.load(Ordering::Relaxed) {
313                Ok(ExecutionResult {
314                    output: "Agent cancelled before execution".into(),
315                    steps_completed: 0,
316                    success: false,
317                    tool_calls: vec![],
318                    tokens_input: 0,
319                    tokens_output: 0,
320                    model_id: String::new(),
321                })
322            } else {
323                let mut ctx = session_ctx.write().await;
324                runtime.execute(id, &seed, &mut ctx).await
325            };
326            // Receiver gone (run_with_seed returned early) → ignore error.
327            let _ = done_tx.send(result);
328        });
329
330        // Store the handle so kill() can abort the task. The handle STAYS in
331        // the map during await — the previous impl removed it before awaiting,
332        // so kill() could never find a live handle to abort (kill was a no-op
333        // for running agents).
334        {
335            let mut handles = self.handles.write();
336            handles.insert(
337                id,
338                AgentHandle {
339                    cancelled,
340                    task: handle,
341                },
342            );
343        }
344
345        // Await completion via the oneshot channel. If kill() aborts the task
346        // (or it panics), done_tx is dropped and this returns Err — treat as
347        // cancellation.
348        let result = match done_rx.await {
349            Ok(res) => res,
350            Err(_) => {
351                let mut handles = self.handles.write();
352                handles.remove(&id);
353                Ok(ExecutionResult {
354                    output: "Agent task aborted".into(),
355                    steps_completed: 0,
356                    success: false,
357                    tool_calls: vec![],
358                    tokens_input: 0,
359                    tokens_output: 0,
360                    model_id: String::new(),
361                })
362            }
363        };
364
365        // Natural completion — remove the handle.
366        {
367            let mut handles = self.handles.write();
368            handles.remove(&id);
369        }
370
371        match result {
372            Ok(result) => {
373                tracing::info!(
374                    agent_id = %id,
375                    success = result.success,
376                    steps = result.steps_completed,
377                    "Agent task completed"
378                );
379
380                {
381                    let mut agents = self.agents.write();
382                    if let Some(agent) = agents.get_mut(&id) {
383                        agent.status = if result.success {
384                            AgentStatus::Idle
385                        } else {
386                            AgentStatus::Failed
387                        };
388                        agent.completed_at = Some(Utc::now());
389                        agent.steps_completed = result.steps_completed;
390                        agent.tool_calls = result
391                            .tool_calls
392                            .iter()
393                            .map(|tc| crate::types::ToolCallRecord {
394                                tool: tc.tool.clone(),
395                                input: tc.input.clone(),
396                                output: tc.output.clone(),
397                                duration_ms: tc.duration_ms,
398                                is_error: tc.is_error,
399                                tool_call_id: tc.tool_call_id.clone(),
400                                timestamp: tc.timestamp,
401                            })
402                            .collect();
403                        agent.tokens_input = result.tokens_input;
404                        agent.tokens_output = result.tokens_output;
405                        agent.model_id = result.model_id.clone();
406                        agent.cost_usd = if !result.model_id.is_empty() {
407                            crate::kernel_handle::engine_api::estimate_cost(
408                                &result.model_id,
409                                result.tokens_input,
410                                result.tokens_output,
411                            )
412                        } else {
413                            0.0
414                        };
415                        if !result.success {
416                            agent.error = Some(result.output.clone());
417                        }
418                    }
419                }
420
421                let _ = self
422                    .event_bus
423                    .publish(crate::event_bus::KernelEvent::AgentStopped { id });
424                self.update_agent_count();
425
426                // Persist to agent history log (async, non-blocking)
427                self.persist_agent(id).await;
428
429                Ok(result)
430            }
431            Err(e) => {
432                tracing::error!(agent_id = %id, error = %e, "Agent task failed");
433
434                {
435                    let mut agents = self.agents.write();
436                    if let Some(agent) = agents.get_mut(&id) {
437                        agent.status = AgentStatus::Failed;
438                        agent.completed_at = Some(Utc::now());
439                        agent.error = Some(e.to_string());
440                    }
441                }
442
443                let _ = self
444                    .event_bus
445                    .publish(crate::event_bus::KernelEvent::AgentFailed {
446                        id,
447                        error: e.to_string(),
448                    });
449                self.update_agent_count();
450
451                // Persist to agent history log (async, non-blocking)
452                self.persist_agent(id).await;
453
454                Ok(ExecutionResult {
455                    output: format!("Agent failed: {e}"),
456                    steps_completed: 0,
457                    success: false,
458                    tool_calls: vec![],
459                    tokens_input: 0,
460                    tokens_output: 0,
461                    model_id: String::new(),
462                })
463            }
464        }
465    }
466
467    async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
468        let agents = self.agents.read();
469        match agents.get(&id) {
470            Some(info) => Ok(info.status),
471            None => anyhow::bail!("Agent {id} not found"),
472        }
473    }
474
475    async fn kill(&self, id: AgentId) -> Result<()> {
476        // Cancel and abort the running task, if any.
477        {
478            let mut handles = self.handles.write();
479            if let Some(agent_handle) = handles.remove(&id) {
480                agent_handle.cancelled.store(true, Ordering::Relaxed);
481                agent_handle.task.abort();
482                tracing::info!(agent_id = %id, "Agent task aborted");
483            }
484        }
485
486        {
487            let mut agents = self.agents.write();
488            if let Some(agent) = agents.get_mut(&id) {
489                agent.status = AgentStatus::Stopped;
490                agent.completed_at = Some(Utc::now());
491            } else {
492                anyhow::bail!("Agent {id} not found");
493            }
494        }
495
496        let _ = self
497            .event_bus
498            .publish(crate::event_bus::KernelEvent::AgentStopped { id });
499        self.update_agent_count();
500
501        // Persist to agent history log (async, non-blocking)
502        self.persist_agent(id).await;
503
504        tracing::info!(agent_id = %id, "Agent killed");
505        Ok(())
506    }
507
508    async fn list(&self) -> Result<Vec<AgentInfo>> {
509        let agents = self.agents.read();
510        Ok(agents.values().cloned().collect())
511    }
512}
513
514impl BasicSupervisor {
515    /// Persist a terminated agent to both filesystem JSON and SQLite.
516    /// Non-blocking: spawns a tokio task for the actual persistence.
517    async fn persist_agent(&self, id: AgentId) {
518        // Snapshot the agent info from the in-memory map
519        let info = {
520            let agents = self.agents.read();
521            agents.get(&id).cloned()
522        };
523
524        let Some(info) = info else { return };
525
526        // 1. Filesystem JSON (source of truth)
527        if let Some(ref store) = self.state_store {
528            let store = store.clone();
529            let info = info.clone();
530            let max_entries = self.agent_log_config.max_entries;
531            let ttl_hours = self.agent_log_config.ttl_hours;
532            let batch_size = self.agent_log_config.prune_batch_size;
533            tokio::spawn(async move {
534                let _ = store
535                    .save_json("agents", &id.to_string(), &info)
536                    .await
537                    .inspect_err(|e| tracing::warn!(agent_id = %id, error = %e, "Failed to persist agent to filesystem"));
538
539                // Prune old records (async, best-effort)
540                if max_entries > 0 || ttl_hours > 0 {
541                    let _ = store
542                        .prune_agents_by_config(max_entries, ttl_hours, batch_size)
543                        .await
544                        .inspect_err(|e| tracing::warn!(error = %e, "Failed to prune agent log"));
545                }
546            });
547        }
548
549        // 2. SQLite (query index)
550        #[cfg(feature = "sqlite-memory")]
551        if let Some(ref db) = self.agent_log_db {
552            let db = db.clone();
553            let info = info.clone();
554            let config = self.agent_log_config.clone();
555            tokio::spawn(async move {
556                let _ = db
557                    .upsert_agent(&info)
558                    .inspect_err(|e| tracing::warn!(agent_id = %id, error = %e, "Failed to upsert agent to SQLite"));
559
560                // Prune old records
561                let _ = db
562                    .prune(&config)
563                    .inspect_err(|e| tracing::warn!(error = %e, "Failed to prune agent SQLite"));
564            });
565        }
566    }
567}
568
569/// A no-op supervisor used during KernelBuilder::build() to break the
570/// KernelHandle → AgentRuntime → Supervisor → KernelHandle cycle.
571///
572/// AgentApi.supervisor is only used for list/kill operations, not during
573/// tool registration, so this placeholder is safe during build time.
574pub struct NoOpSupervisor;
575
576#[async_trait::async_trait]
577impl Supervisor for NoOpSupervisor {
578    async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
579        Err(anyhow::anyhow!(
580            "NoOpSupervisor: fork not available during build"
581        ))
582    }
583    async fn exec(&self, _id: AgentId) -> Result<()> {
584        Err(anyhow::anyhow!(
585            "NoOpSupervisor: exec not available during build"
586        ))
587    }
588    async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
589        Err(anyhow::anyhow!(
590            "NoOpSupervisor: run_with_seed not available during build"
591        ))
592    }
593    async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
594        Err(anyhow::anyhow!(
595            "NoOpSupervisor: wait not available during build"
596        ))
597    }
598    async fn kill(&self, _id: AgentId) -> Result<()> {
599        Err(anyhow::anyhow!(
600            "NoOpSupervisor: kill not available during build"
601        ))
602    }
603    async fn list(&self) -> Result<Vec<AgentInfo>> {
604        Ok(Vec::new())
605    }
606}
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611    use crate::event_bus::EventBus;
612    use crate::types::AgentStatus;
613    // Note: imports kept for potential future test extensions.
614    use oxios_ouroboros::Seed;
615
616    // Note: MockProvider no longer needed — OxiosEngine handles provider resolution.
617    // The engine resolves models internally, so tests just use OxiosEngine::new().
618
    /// Helper to create a real BasicSupervisor wired to a real EventBus.
    ///
    /// Everything the `AgentRuntime` needs is built from scratch under a
    /// uniquely named directory inside the system temp dir. The directory is
    /// not removed afterwards. The function is `async` although it awaits
    /// nothing, so callers must `.await` it.
    ///
    /// The returned supervisor has no state store, log database or resource
    /// monitor attached.
    async fn make_supervisor() -> BasicSupervisor {
        let event_bus = EventBus::new(64);

        // Build a mock KernelHandle with temp dirs.
        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
        // Directory creation is best-effort; a failure surfaces later through
        // the `expect`s on the stores built beneath it.
        let _ = std::fs::create_dir_all(&tmp);

        let state_store_2 =
            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
        let state_store = state_store_2.clone();
        let memory_manager = Arc::new({
            let mut mm = crate::memory::MemoryManager::new(state_store.clone());
            mm.set_git_layer(Arc::new(
                crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
            ));
            mm
        });

        // The KernelHandle's AgentApi gets a NoOpSupervisor: the supervisor
        // under test is created only after the handle exists.
        let kernel_handle = Arc::new(crate::KernelHandle::new(
            crate::kernel_handle::StateApi::new(state_store),
            crate::kernel_handle::AgentApi::new(
                Arc::new(crate::supervisor::NoOpSupervisor),
                Arc::new(crate::budget::BudgetManager::new()),
                memory_manager.clone(),
                Some(event_bus.clone()),
            ),
            crate::kernel_handle::SecurityApi::new(
                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
                Arc::new(oxi_sdk::observability::AuditTrail::new(100)),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
                Arc::new(
                    crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
                ),
            ),
            crate::kernel_handle::PersonaApi::new(Arc::new(crate::persona::PersonaManager::new())),
            crate::kernel_handle::ExtensionApi::new(Arc::new(crate::skill::SkillManager::new(
                tmp.join("skills"),
                tmp.join("share/skills"),
            ))),
            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
            // A second GitLayer over the same `git` directory as the one given
            // to the memory manager above, plus its own fresh EventBus.
            crate::kernel_handle::InfraApi::new(
                Arc::new(
                    crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
                ),
                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
                Arc::new(crate::cron::CronScheduler::new(
                    Arc::new(
                        crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
                    ),
                    60,
                )),
                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
                EventBus::new(64),
                crate::config::OxiosConfig::default(),
                std::time::Instant::now(),
            ),
            None,
            crate::kernel_handle::ExecApi::new(
                Arc::new(parking_lot::RwLock::new(
                    crate::config::ExecConfig::default(),
                )),
                Arc::new(parking_lot::Mutex::new(
                    crate::access_manager::AccessManager::new(),
                )),
            ),
            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
                EventBus::new(64),
            ))),
            // This engine (constructed with an Anthropic model id) belongs to
            // the EngineApi only; the runtime below gets its own engine.
            crate::kernel_handle::EngineApi::new(
                Arc::new(parking_lot::RwLock::new(
                    crate::config::OxiosConfig::default(),
                )),
                tmp.join("config.toml"),
                Arc::new(crate::kernel_handle::RoutingStats::new()),
                Arc::new(crate::engine::EngineHandle::new(Arc::new(
                    crate::OxiosEngine::new("anthropic/claude-sonnet-4-20250514"),
                ))),
            ),
            // Two separate KnowledgeBase instances over the same `knowledge`
            // directory: one passed directly, one wrapped in a KnowledgeLens.
            Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
            Arc::new(
                crate::kernel_handle::KnowledgeLens::new(
                    Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
                    memory_manager.clone(),
                )
                .unwrap(),
            ),
            crate::kernel_handle::MarketplaceApi::new(
                Arc::new(crate::skill::clawhub::ClawHubInstaller::new(
                    tmp.join("skills"),
                    tmp.join("state"),
                    None,
                )),
                Arc::new(
                    crate::skill::clawhub::ClawHubClient::new(None).expect("valid ClawHub client"),
                ),
                Arc::new(crate::skill::skills_sh::SkillsShInstaller::new(
                    tmp.join("skills"),
                    None,
                    None,
                )),
                Arc::new(
                    crate::skill::skills_sh::SkillsShClient::new(None, None)
                        .expect("valid Skills.sh client"),
                ),
            ),
            None, // calendar (not configured in test)
            None, // email (not configured in test)
        ));

        // The runtime handed to the supervisor uses a separate engine created
        // with the "mock/model" id.
        let engine = crate::OxiosEngine::new("mock/model");
        let engine_handle = Arc::new(crate::engine::EngineHandle::new(Arc::new(engine)));
        let runtime = AgentRuntime::new(engine_handle, kernel_handle, None);
        BasicSupervisor::new(event_bus, runtime)
    }
736
737    /// Helper to create a minimal Seed for testing.
738    fn make_seed(goal: &str) -> Seed {
739        Seed {
740            id: uuid::Uuid::new_v4(),
741            goal: goal.to_string(),
742            constraints: vec![],
743            acceptance_criteria: vec![],
744            ontology: vec![],
745            created_at: chrono::Utc::now(),
746            generation: 0,
747            parent_seed_id: None,
748            cspace_hint: None,
749            original_request: String::new(),
750            output_schema: None,
751            project_id: None,
752            workspace_context: None,
753            mount_paths: Vec::new(),
754        }
755    }
756
757    #[tokio::test]
758    async fn test_fork_creates_agent() {
759        let supervisor = make_supervisor().await;
760        let seed = make_seed("Test agent");
761
762        let id = supervisor.fork(&seed).await.unwrap();
763
764        let agents = supervisor.list().await.unwrap();
765        assert_eq!(agents.len(), 1);
766        assert_eq!(agents[0].id, id);
767        assert_eq!(agents[0].name, "Test agent");
768        assert_eq!(agents[0].status, AgentStatus::Starting);
769        assert_eq!(agents[0].seed_id, Some(seed.id));
770    }
771
772    #[tokio::test]
773    async fn test_exec_updates_status_to_running() {
774        let supervisor = make_supervisor().await;
775        let seed = make_seed("Running agent");
776
777        let id = supervisor.fork(&seed).await.unwrap();
778        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Starting);
779
780        supervisor.exec(id).await.unwrap();
781        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
782    }
783
784    #[tokio::test]
785    async fn test_kill_sets_stopped() {
786        let supervisor = make_supervisor().await;
787        let seed = make_seed("Doomed agent");
788
789        let id = supervisor.fork(&seed).await.unwrap();
790        supervisor.exec(id).await.unwrap();
791        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
792
793        supervisor.kill(id).await.unwrap();
794        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
795    }
796
797    #[tokio::test]
798    async fn test_kill_unknown_agent_returns_error() {
799        let supervisor = make_supervisor().await;
800        let unknown_id = uuid::Uuid::new_v4();
801
802        let result = supervisor.kill(unknown_id).await;
803        assert!(result.is_err());
804        assert!(result.unwrap_err().to_string().contains("not found"));
805    }
806
807    #[tokio::test]
808    async fn test_list_returns_all_agents() {
809        let supervisor = make_supervisor().await;
810
811        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
812        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
813        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();
814
815        let agents = supervisor.list().await.unwrap();
816        assert_eq!(agents.len(), 3);
817
818        let ids: std::collections::HashSet<AgentId> = agents.iter().map(|a| a.id).collect();
819        assert!(ids.contains(&id1));
820        assert!(ids.contains(&id2));
821        assert!(ids.contains(&id3));
822    }
823
824    #[tokio::test]
825    async fn test_exec_unknown_agent_returns_error() {
826        let supervisor = make_supervisor().await;
827        let unknown_id = uuid::Uuid::new_v4();
828
829        let result = supervisor.exec(unknown_id).await;
830        assert!(result.is_err());
831        assert!(result.unwrap_err().to_string().contains("not found"));
832    }
833
834    #[tokio::test]
835    async fn test_wait_unknown_agent_returns_error() {
836        let supervisor = make_supervisor().await;
837        let unknown_id = uuid::Uuid::new_v4();
838
839        let result = supervisor.wait(unknown_id).await;
840        assert!(result.is_err());
841        assert!(result.unwrap_err().to_string().contains("not found"));
842    }
843}