Skip to main content

oxios_kernel/
supervisor.rs

1//! Supervisor: agent lifecycle management.
2//!
3//! The supervisor handles forking, executing, monitoring, and
4//! terminating agent instances. It is the "init" of Oxios.
5//!
6//! When an agent is forked and executed, the supervisor delegates
7//! the actual tool-calling loop to the [`AgentRuntime`].
8//!
9//! # Agent Pool & Session Persistence
10//!
11//! Agents are retained in an [`AgentPool`] after execution for:
12//! - **Session continuation** via `Agent::continue_with()` — multi-turn
13//!   conversations without re-creating the agent.
14//! - **State export/import** — serialize agent conversation history to
15//!   JSON for crash recovery, migration, or debugging.
16//! - **Provider rate limiting** — all agents share a [`ProviderPool`] to
17//!   respect per-provider RPM/concurrency limits.
18
19use anyhow::Result;
20use async_trait::async_trait;
21use chrono::Utc;
22use oxi_sdk::Agent;
23use oxios_ouroboros::{Directive, ExecEnv, Seed};
24use parking_lot::RwLock;
25use std::collections::HashMap;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use tokio::task::JoinHandle;
29
30use crate::agent_runtime::AgentRuntime;
31use crate::config::AgentLogConfig;
32use crate::event_bus::EventBus;
33use crate::resource_monitor::ResourceMonitor;
34use crate::session_context::SessionContext;
35use crate::state_store::StateStore;
36use crate::types::{AgentId, AgentInfo, AgentStatus};
37use oxios_ouroboros::ExecutionResult;
38
39#[cfg(feature = "sqlite-memory")]
40use crate::agent_log_db::AgentLogDb;
41
/// Tracks the runtime handles needed to cancel a running agent.
///
/// One entry per in-flight execution is stored in `BasicSupervisor::handles`
/// by `run_with_seed` / `run_with_directive`; `kill()` removes the entry,
/// sets the flag, and aborts the task.
struct AgentHandle {
    /// Flag set on `kill()` to cooperatively signal cancellation.
    /// The spawned execution task reads it once, before it starts executing.
    cancelled: Arc<AtomicBool>,
    /// The tokio task running the agent execution. Aborted on `kill()`.
    task: JoinHandle<()>,
}
49
/// Pool of live `Agent` instances, keyed by AgentId.
///
/// Retains agents after execution for:
/// - **State persistence** — `export_state()` serializes conversation history
///   to JSON for crash recovery, migration, or debugging.
/// - **State restoration** — `import_state()` restores a previous session.
///
/// All methods take `&self`; the map is guarded by an internal `RwLock`.
#[derive(Default)]
pub struct AgentPool {
    /// Shared handles to pooled agents. Lookups hand out `Arc` clones, so a
    /// caller can keep using an agent after it has been removed from the pool.
    agents: RwLock<HashMap<AgentId, Arc<Agent>>>,
}
60
61impl AgentPool {
62    /// Create an empty agent pool.
63    pub fn new() -> Self {
64        Self {
65            agents: RwLock::new(HashMap::new()),
66        }
67    }
68
69    /// Insert an agent into the pool.
70    pub fn insert(&self, id: AgentId, agent: Arc<Agent>) {
71        self.agents.write().insert(id, agent);
72    }
73
74    /// Get a pooled agent by ID.
75    pub fn get(&self, id: &AgentId) -> Option<Arc<Agent>> {
76        self.agents.read().get(id).cloned()
77    }
78
79    /// Remove an agent from the pool.
80    pub fn remove(&self, id: &AgentId) -> Option<Arc<Agent>> {
81        self.agents.write().remove(id)
82    }
83
84    /// Export an agent's state as JSON.
85    ///
86    /// Returns `None` if the agent is not in the pool or export fails.
87    pub fn export_state(&self, id: &AgentId) -> Option<serde_json::Value> {
88        self.agents
89            .read()
90            .get(id)
91            .and_then(|agent| agent.export_state().ok())
92    }
93
94    /// Import agent state from JSON.
95    ///
96    /// Returns `false` if the agent is not in the pool or import fails.
97    pub fn import_state(&self, id: &AgentId, state: serde_json::Value) -> bool {
98        if let Some(agent) = self.agents.read().get(id) {
99            agent.import_state(state).is_ok()
100        } else {
101            false
102        }
103    }
104
105    /// Number of agents currently in the pool.
106    pub fn len(&self) -> usize {
107        self.agents.read().len()
108    }
109
110    /// Whether the pool is empty.
111    pub fn is_empty(&self) -> bool {
112        self.agents.read().is_empty()
113    }
114}
115
/// Supervisor trait for managing agent lifecycles.
///
/// Implemented in this file by [`BasicSupervisor`] (in-memory bookkeeping
/// plus `AgentRuntime` execution) and [`NoOpSupervisor`] (build-time
/// placeholder).
#[async_trait]
pub trait Supervisor: Send + Sync {
    /// Fork a new agent from a seed specification.
    ///
    /// Returns the ID under which the new agent is registered.
    async fn fork(&self, spec: &Seed) -> Result<AgentId>;

    /// Start executing an agent.
    ///
    /// NOTE(review): in [`BasicSupervisor`] this only marks the agent as
    /// running and publishes the start event; it does not invoke the runtime.
    async fn exec(&self, id: AgentId) -> Result<()>;

    /// Execute a previously forked agent with its seed, running to completion.
    /// Returns the execution result from the agent runtime.
    ///
    /// `id` is expected to come from an earlier [`Supervisor::fork`] call;
    /// [`BasicSupervisor`] returns an error for an unknown ID.
    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult>;

    /// Fork a new agent from a Directive and ExecEnv (RFC-027).
    ///
    /// Mirrors [`Supervisor::fork`] but reads the goal / project_id from
    /// the unified-intent types. The resulting agent has no `seed_id` (a
    /// Directive has no stable per-execution UUID yet — Phase 6 will mint
    /// one in `Orchestrator::handle()`).
    async fn fork_directive(&self, directive: &Directive, env: &ExecEnv) -> Result<AgentId>;

    /// Execute a previously forked agent with a Directive + ExecEnv, running
    /// to completion.
    ///
    /// Mirrors [`Supervisor::run_with_seed`]. The legacy Seed variants stay
    /// until Phase 6 removes them.
    ///
    /// NOTE(review): [`BasicSupervisor`] dispatches to
    /// `AgentRuntime::execute_directive`; an earlier revision of this comment
    /// named `execute_directive_with_session` — confirm which is current.
    async fn run_with_directive(
        &self,
        id: AgentId,
        directive: &Directive,
        env: &ExecEnv,
    ) -> Result<ExecutionResult>;

    /// Wait for an agent to complete and return its final status.
    ///
    /// NOTE(review): [`BasicSupervisor`] does not block here — it returns the
    /// status recorded at the time of the call, which may still be
    /// `Starting` or `Running`.
    async fn wait(&self, id: AgentId) -> Result<AgentStatus>;

    /// Terminate an agent.
    async fn kill(&self, id: AgentId) -> Result<()>;

    /// List all known agents.
    async fn list(&self) -> Result<Vec<AgentInfo>>;
}
158
/// Basic in-memory supervisor implementation with AgentRuntime integration.
///
/// Agent records live in a process-local map; optional backends (filesystem
/// state store, SQLite log) receive a copy of a record when an agent
/// finishes, fails, or is killed.
pub struct BasicSupervisor {
    /// Bookkeeping record for every agent forked by this supervisor,
    /// keyed by agent ID.
    agents: RwLock<HashMap<AgentId, AgentInfo>>,
    /// Per-agent cancellation tokens and join handles for task abortion.
    handles: RwLock<HashMap<AgentId, AgentHandle>>,
    /// Pool of live Agent instances for session continuation.
    /// Exposed to callers through `pool()`.
    agent_pool: AgentPool,
    /// Bus on which lifecycle events (created / started / stopped / failed)
    /// are published.
    event_bus: EventBus,
    /// Runtime that performs the actual agent execution.
    runtime: Arc<AgentRuntime>,
    /// Optional monitor that is told the agent count after lifecycle changes.
    resource_monitor: Option<Arc<ResourceMonitor>>,
    /// Session context for proactive recall timing (RFC-020).
    /// Shared across all Seed executions within this supervisor's lifetime
    /// so that RecallTiming can track message count and topic changes.
    /// Uses tokio::sync::RwLock (not parking_lot) so the guard is Send,
    /// allowing it to be held across .await in tokio::spawn.
    session_context: Arc<tokio::sync::RwLock<SessionContext>>,
    /// Filesystem state store for agent persistence (JSON files).
    state_store: Option<Arc<StateStore>>,
    /// SQLite-backed agent history query index.
    #[cfg(feature = "sqlite-memory")]
    agent_log_db: Option<Arc<AgentLogDb>>,
    /// Agent log retention configuration.
    agent_log_config: AgentLogConfig,
}
183
184impl BasicSupervisor {
185    /// Creates a new supervisor with the given event bus and agent runtime.
186    pub fn new(event_bus: EventBus, runtime: AgentRuntime) -> Self {
187        Self {
188            agents: RwLock::new(HashMap::new()),
189            handles: RwLock::new(HashMap::new()),
190            agent_pool: AgentPool::new(),
191            event_bus,
192            runtime: Arc::new(runtime),
193            resource_monitor: None,
194            session_context: Arc::new(tokio::sync::RwLock::new(SessionContext::new())),
195            state_store: None,
196            #[cfg(feature = "sqlite-memory")]
197            agent_log_db: None,
198            agent_log_config: AgentLogConfig::default(),
199        }
200    }
201
202    /// Attach a filesystem state store for agent history persistence.
203    pub fn set_state_store(&mut self, store: Arc<StateStore>) {
204        self.state_store = Some(store);
205    }
206
207    /// Attach a SQLite-backed agent history log database.
208    #[cfg(feature = "sqlite-memory")]
209    pub fn set_agent_log_db(&mut self, db: Arc<AgentLogDb>) {
210        self.agent_log_db = Some(db);
211    }
212
213    /// Set agent log retention configuration.
214    pub fn set_agent_log_config(&mut self, config: AgentLogConfig) {
215        self.agent_log_config = config;
216    }
217
218    /// Attach a resource monitor for agent count tracking.
219    pub fn set_resource_monitor(&mut self, rm: Arc<ResourceMonitor>) {
220        self.resource_monitor = Some(rm);
221    }
222
223    /// Update the resource monitor with current active agent count.
224    fn update_agent_count(&self) {
225        if let Some(ref rm) = self.resource_monitor {
226            let count = self.agents.read().len();
227            rm.set_active_agents(count);
228        }
229    }
230
231    /// Access the agent pool for session continuation.
232    pub fn pool(&self) -> &AgentPool {
233        &self.agent_pool
234    }
235}
236
237#[async_trait]
238impl Supervisor for BasicSupervisor {
239    async fn fork(&self, spec: &Seed) -> Result<AgentId> {
240        let id = AgentId::new_v4();
241        let info = AgentInfo {
242            id,
243            name: spec.goal.clone(),
244            status: AgentStatus::Starting,
245            created_at: Utc::now(),
246            seed_id: Some(spec.id),
247            project_id: spec.project_id,
248            started_at: None,
249            completed_at: None,
250            error: None,
251            steps_completed: 0,
252            steps_total: None,
253            tool_calls: vec![],
254            tokens_input: 0,
255            tokens_output: 0,
256            cost_usd: 0.0,
257            model_id: String::new(),
258            session_id: None,
259        };
260
261        {
262            let mut agents = self.agents.write();
263            agents.insert(id, info);
264        }
265
266        self.update_agent_count();
267
268        let _ = self
269            .event_bus
270            .publish(crate::event_bus::KernelEvent::AgentCreated {
271                id,
272                name: spec.goal.clone(),
273            });
274
275        tracing::info!(agent_id = %id, "Forked new agent from seed");
276        Ok(id)
277    }
278
279    async fn fork_directive(&self, directive: &Directive, env: &ExecEnv) -> Result<AgentId> {
280        let id = AgentId::new_v4();
281        let info = AgentInfo {
282            id,
283            name: directive.goal.clone(),
284            status: AgentStatus::Starting,
285            created_at: Utc::now(),
286            // Directive has no per-execution UUID yet (Phase 6). Leave
287            // seed_id None to mark this as a directive-spawned agent.
288            seed_id: None,
289            project_id: env.project_id,
290            started_at: None,
291            completed_at: None,
292            error: None,
293            steps_completed: 0,
294            steps_total: None,
295            tool_calls: vec![],
296            tokens_input: 0,
297            tokens_output: 0,
298            cost_usd: 0.0,
299            model_id: String::new(),
300            session_id: None,
301        };
302
303        {
304            let mut agents = self.agents.write();
305            agents.insert(id, info);
306        }
307
308        self.update_agent_count();
309
310        let _ = self
311            .event_bus
312            .publish(crate::event_bus::KernelEvent::AgentCreated {
313                id,
314                name: directive.goal.clone(),
315            });
316
317        tracing::info!(agent_id = %id, "Forked new agent from directive");
318        Ok(id)
319    }
320
321    async fn exec(&self, id: AgentId) -> Result<()> {
322        {
323            let mut agents = self.agents.write();
324            match agents.get_mut(&id) {
325                Some(agent) => {
326                    agent.status = AgentStatus::Running;
327                }
328                None => anyhow::bail!("Agent {id} not found"),
329            }
330        }
331
332        self.update_agent_count();
333
334        let _ = self
335            .event_bus
336            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
337        tracing::info!(agent_id = %id, "Agent execution started");
338
339        Ok(())
340    }
341
342    async fn run_with_seed(&self, id: AgentId, seed: &Seed) -> Result<ExecutionResult> {
343        // Mark as running.
344        {
345            let mut agents = self.agents.write();
346            match agents.get_mut(&id) {
347                Some(agent) => {
348                    agent.status = AgentStatus::Running;
349                    agent.started_at = Some(Utc::now());
350                }
351                None => anyhow::bail!("Agent {id} not found"),
352            }
353        }
354
355        let _ = self
356            .event_bus
357            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
358
359        tracing::info!(agent_id = %id, seed_id = %seed.id, "Running agent task");
360
361        // Spawn the execution as a tokio task so we can track and abort it.
362        let cancelled = Arc::new(AtomicBool::new(false));
363        let runtime = Arc::clone(&self.runtime);
364        let seed = seed.clone();
365
366        // Share the session context so RecallTiming persists across Seeds.
367        // Uses tokio::sync::RwLock so the guard is Send-safe across .await.
368        let session_ctx = self.session_context.clone();
369
370        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<Result<ExecutionResult>>();
371        let cancelled_done = cancelled.clone();
372        let handle: JoinHandle<()> = tokio::spawn(async move {
373            // Check for cancellation before starting.
374            let result = if cancelled_done.load(Ordering::Relaxed) {
375                Ok(ExecutionResult {
376                    output: "Agent cancelled before execution".into(),
377                    steps_completed: 0,
378                    success: false,
379                    tool_calls: vec![],
380                    tokens_input: 0,
381                    tokens_output: 0,
382                    model_id: String::new(),
383                })
384            } else {
385                let mut ctx = session_ctx.write().await;
386                runtime.execute(id, &seed, &mut ctx).await
387            };
388            // Receiver gone (run_with_seed returned early) → ignore error.
389            let _ = done_tx.send(result);
390        });
391
392        // Store the handle so kill() can abort the task. The handle STAYS in
393        // the map during await — the previous impl removed it before awaiting,
394        // so kill() could never find a live handle to abort (kill was a no-op
395        // for running agents).
396        {
397            let mut handles = self.handles.write();
398            handles.insert(
399                id,
400                AgentHandle {
401                    cancelled,
402                    task: handle,
403                },
404            );
405        }
406
407        // Await completion via the oneshot channel. If kill() aborts the task
408        // (or it panics), done_tx is dropped and this returns Err — treat as
409        // cancellation.
410        let result = match done_rx.await {
411            Ok(res) => res,
412            Err(_) => {
413                let mut handles = self.handles.write();
414                handles.remove(&id);
415                Ok(ExecutionResult {
416                    output: "Agent task aborted".into(),
417                    steps_completed: 0,
418                    success: false,
419                    tool_calls: vec![],
420                    tokens_input: 0,
421                    tokens_output: 0,
422                    model_id: String::new(),
423                })
424            }
425        };
426
427        // Natural completion — remove the handle.
428        {
429            let mut handles = self.handles.write();
430            handles.remove(&id);
431        }
432
433        match result {
434            Ok(result) => {
435                tracing::info!(
436                    agent_id = %id,
437                    success = result.success,
438                    steps = result.steps_completed,
439                    "Agent task completed"
440                );
441
442                {
443                    let mut agents = self.agents.write();
444                    if let Some(agent) = agents.get_mut(&id) {
445                        agent.status = if result.success {
446                            AgentStatus::Idle
447                        } else {
448                            AgentStatus::Failed
449                        };
450                        agent.completed_at = Some(Utc::now());
451                        agent.steps_completed = result.steps_completed;
452                        agent.tool_calls = result
453                            .tool_calls
454                            .iter()
455                            .map(|tc| crate::types::ToolCallRecord {
456                                tool: tc.tool.clone(),
457                                input: tc.input.clone(),
458                                output: tc.output.clone(),
459                                duration_ms: tc.duration_ms,
460                                is_error: tc.is_error,
461                                tool_call_id: tc.tool_call_id.clone(),
462                                timestamp: tc.timestamp,
463                            })
464                            .collect();
465                        agent.tokens_input = result.tokens_input;
466                        agent.tokens_output = result.tokens_output;
467                        agent.model_id = result.model_id.clone();
468                        agent.cost_usd = if !result.model_id.is_empty() {
469                            crate::kernel_handle::engine_api::estimate_cost(
470                                &result.model_id,
471                                result.tokens_input,
472                                result.tokens_output,
473                            )
474                        } else {
475                            0.0
476                        };
477                        if !result.success {
478                            agent.error = Some(result.output.clone());
479                        }
480                    }
481                }
482
483                let _ = self
484                    .event_bus
485                    .publish(crate::event_bus::KernelEvent::AgentStopped {
486                        id,
487                        success: result.success,
488                    });
489                self.update_agent_count();
490
491                // Persist to agent history log (async, non-blocking)
492                self.persist_agent(id).await;
493
494                Ok(result)
495            }
496            Err(e) => {
497                tracing::error!(agent_id = %id, error = %e, "Agent task failed");
498
499                {
500                    let mut agents = self.agents.write();
501                    if let Some(agent) = agents.get_mut(&id) {
502                        agent.status = AgentStatus::Failed;
503                        agent.completed_at = Some(Utc::now());
504                        agent.error = Some(e.to_string());
505                    }
506                }
507
508                let _ = self
509                    .event_bus
510                    .publish(crate::event_bus::KernelEvent::AgentFailed {
511                        id,
512                        error: e.to_string(),
513                    });
514                self.update_agent_count();
515
516                // Persist to agent history log (async, non-blocking)
517                self.persist_agent(id).await;
518
519                Ok(ExecutionResult {
520                    output: format!("Agent failed: {e}"),
521                    steps_completed: 0,
522                    success: false,
523                    tool_calls: vec![],
524                    tokens_input: 0,
525                    tokens_output: 0,
526                    model_id: String::new(),
527                })
528            }
529        }
530    }
531
532    async fn run_with_directive(
533        &self,
534        id: AgentId,
535        directive: &Directive,
536        env: &ExecEnv,
537    ) -> Result<ExecutionResult> {
538        // Mark as running.
539        {
540            let mut agents = self.agents.write();
541            match agents.get_mut(&id) {
542                Some(agent) => {
543                    agent.status = AgentStatus::Running;
544                    agent.started_at = Some(Utc::now());
545                }
546                None => anyhow::bail!("Agent {id} not found"),
547            }
548        }
549
550        let _ = self
551            .event_bus
552            .publish(crate::event_bus::KernelEvent::AgentStarted { id });
553
554        tracing::info!(agent_id = %id, "Running agent task from directive");
555
556        // Spawn the execution as a tokio task so we can track and abort it.
557        let cancelled = Arc::new(AtomicBool::new(false));
558        let runtime = Arc::clone(&self.runtime);
559        let directive = directive.clone();
560        let env = env.clone();
561
562        // Share the session context so RecallTiming persists across directives.
563        // Uses tokio::sync::RwLock so the guard is Send-safe across .await.
564        let session_ctx = self.session_context.clone();
565
566        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<Result<ExecutionResult>>();
567        let cancelled_done = cancelled.clone();
568        let handle: JoinHandle<()> = tokio::spawn(async move {
569            // Check for cancellation before starting.
570            let result = if cancelled_done.load(Ordering::Relaxed) {
571                Ok(ExecutionResult {
572                    output: "Agent cancelled before execution".into(),
573                    steps_completed: 0,
574                    success: false,
575                    tool_calls: vec![],
576                    tokens_input: 0,
577                    tokens_output: 0,
578                    model_id: String::new(),
579                })
580            } else {
581                let mut ctx = session_ctx.write().await;
582                runtime
583                    .execute_directive(id, &directive, &env, &mut ctx)
584                    .await
585            };
586            // Receiver gone (run_with_directive returned early) → ignore error.
587            let _ = done_tx.send(result);
588        });
589
590        // Store the handle so kill() can abort the task.
591        {
592            let mut handles = self.handles.write();
593            handles.insert(
594                id,
595                AgentHandle {
596                    cancelled,
597                    task: handle,
598                },
599            );
600        }
601
602        // Await completion via the oneshot channel. If kill() aborts the task
603        // (or it panics), done_tx is dropped and this returns Err — treat as
604        // cancellation.
605        let result = match done_rx.await {
606            Ok(res) => res,
607            Err(_) => {
608                let mut handles = self.handles.write();
609                handles.remove(&id);
610                Ok(ExecutionResult {
611                    output: "Agent task aborted".into(),
612                    steps_completed: 0,
613                    success: false,
614                    tool_calls: vec![],
615                    tokens_input: 0,
616                    tokens_output: 0,
617                    model_id: String::new(),
618                })
619            }
620        };
621
622        // Natural completion — remove the handle.
623        {
624            let mut handles = self.handles.write();
625            handles.remove(&id);
626        }
627
628        match result {
629            Ok(result) => {
630                tracing::info!(
631                    agent_id = %id,
632                    success = result.success,
633                    steps = result.steps_completed,
634                    "Agent task completed (directive)"
635                );
636
637                {
638                    let mut agents = self.agents.write();
639                    if let Some(agent) = agents.get_mut(&id) {
640                        agent.status = if result.success {
641                            AgentStatus::Idle
642                        } else {
643                            AgentStatus::Failed
644                        };
645                        agent.completed_at = Some(Utc::now());
646                        agent.steps_completed = result.steps_completed;
647                        agent.tool_calls = result
648                            .tool_calls
649                            .iter()
650                            .map(|tc| crate::types::ToolCallRecord {
651                                tool: tc.tool.clone(),
652                                input: tc.input.clone(),
653                                output: tc.output.clone(),
654                                duration_ms: tc.duration_ms,
655                                is_error: tc.is_error,
656                                tool_call_id: tc.tool_call_id.clone(),
657                                timestamp: tc.timestamp,
658                            })
659                            .collect();
660                        agent.tokens_input = result.tokens_input;
661                        agent.tokens_output = result.tokens_output;
662                        agent.model_id = result.model_id.clone();
663                        agent.cost_usd = if !result.model_id.is_empty() {
664                            crate::kernel_handle::engine_api::estimate_cost(
665                                &result.model_id,
666                                result.tokens_input,
667                                result.tokens_output,
668                            )
669                        } else {
670                            0.0
671                        };
672                        if !result.success {
673                            agent.error = Some(result.output.clone());
674                        }
675                    }
676                }
677
678                let _ = self
679                    .event_bus
680                    .publish(crate::event_bus::KernelEvent::AgentStopped {
681                        id,
682                        success: result.success,
683                    });
684                self.update_agent_count();
685
686                // Persist to agent history log (async, non-blocking)
687                self.persist_agent(id).await;
688
689                Ok(result)
690            }
691            Err(e) => {
692                tracing::error!(agent_id = %id, error = %e, "Agent task failed (directive)");
693
694                {
695                    let mut agents = self.agents.write();
696                    if let Some(agent) = agents.get_mut(&id) {
697                        agent.status = AgentStatus::Failed;
698                        agent.completed_at = Some(Utc::now());
699                        agent.error = Some(e.to_string());
700                    }
701                }
702
703                let _ = self
704                    .event_bus
705                    .publish(crate::event_bus::KernelEvent::AgentFailed {
706                        id,
707                        error: e.to_string(),
708                    });
709                self.update_agent_count();
710
711                // Persist to agent history log (async, non-blocking)
712                self.persist_agent(id).await;
713
714                Ok(ExecutionResult {
715                    output: format!("Agent failed: {e}"),
716                    steps_completed: 0,
717                    success: false,
718                    tool_calls: vec![],
719                    tokens_input: 0,
720                    tokens_output: 0,
721                    model_id: String::new(),
722                })
723            }
724        }
725    }
726
727    async fn wait(&self, id: AgentId) -> Result<AgentStatus> {
728        let agents = self.agents.read();
729        match agents.get(&id) {
730            Some(info) => Ok(info.status),
731            None => anyhow::bail!("Agent {id} not found"),
732        }
733    }
734
735    async fn kill(&self, id: AgentId) -> Result<()> {
736        // Cancel and abort the running task, if any.
737        {
738            let mut handles = self.handles.write();
739            if let Some(agent_handle) = handles.remove(&id) {
740                agent_handle.cancelled.store(true, Ordering::Relaxed);
741                agent_handle.task.abort();
742                tracing::info!(agent_id = %id, "Agent task aborted");
743            }
744        }
745
746        {
747            let mut agents = self.agents.write();
748            if let Some(agent) = agents.get_mut(&id) {
749                agent.status = AgentStatus::Stopped;
750                agent.completed_at = Some(Utc::now());
751            } else {
752                anyhow::bail!("Agent {id} not found");
753            }
754        }
755
756        let _ = self
757            .event_bus
758            .publish(crate::event_bus::KernelEvent::AgentStopped { id, success: false });
759        self.update_agent_count();
760
761        // Persist to agent history log (async, non-blocking)
762        self.persist_agent(id).await;
763
764        tracing::info!(agent_id = %id, "Agent killed");
765        Ok(())
766    }
767
768    async fn list(&self) -> Result<Vec<AgentInfo>> {
769        let agents = self.agents.read();
770        Ok(agents.values().cloned().collect())
771    }
772}
773
774impl BasicSupervisor {
775    /// Persist a terminated agent to both filesystem JSON and SQLite.
776    /// Non-blocking: spawns a tokio task for the actual persistence.
777    async fn persist_agent(&self, id: AgentId) {
778        // Snapshot the agent info from the in-memory map
779        let info = {
780            let agents = self.agents.read();
781            agents.get(&id).cloned()
782        };
783
784        let Some(info) = info else { return };
785
786        // 1. Filesystem JSON (source of truth)
787        if let Some(ref store) = self.state_store {
788            let store = store.clone();
789            let info = info.clone();
790            let max_entries = self.agent_log_config.max_entries;
791            let ttl_hours = self.agent_log_config.ttl_hours;
792            let batch_size = self.agent_log_config.prune_batch_size;
793            tokio::spawn(async move {
794                let _ = store
795                    .save_json("agents", &id.to_string(), &info)
796                    .await
797                    .inspect_err(|e| tracing::warn!(agent_id = %id, error = %e, "Failed to persist agent to filesystem"));
798
799                // Prune old records (async, best-effort)
800                if max_entries > 0 || ttl_hours > 0 {
801                    let _ = store
802                        .prune_agents_by_config(max_entries, ttl_hours, batch_size)
803                        .await
804                        .inspect_err(|e| tracing::warn!(error = %e, "Failed to prune agent log"));
805                }
806            });
807        }
808
809        // 2. SQLite (query index)
810        #[cfg(feature = "sqlite-memory")]
811        if let Some(ref db) = self.agent_log_db {
812            let db = db.clone();
813            let info = info.clone();
814            let config = self.agent_log_config.clone();
815            tokio::spawn(async move {
816                let _ = db
817                    .upsert_agent(&info)
818                    .inspect_err(|e| tracing::warn!(agent_id = %id, error = %e, "Failed to upsert agent to SQLite"));
819
820                // Prune old records
821                let _ = db
822                    .prune(&config)
823                    .inspect_err(|e| tracing::warn!(error = %e, "Failed to prune agent SQLite"));
824            });
825        }
826    }
827}
828
/// A no-op supervisor used during KernelBuilder::build() to break the
/// KernelHandle → AgentRuntime → Supervisor → KernelHandle cycle.
///
/// AgentApi.supervisor is only used for list/kill operations, not during
/// tool registration, so this placeholder is safe during build time.
///
/// The type is a stateless unit struct, so it derives the cheap standard
/// traits: `Debug` lets it appear in diagnostics and inside other
/// `#[derive(Debug)]` types, and `Clone`/`Copy`/`Default` cost nothing.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpSupervisor;
835
836#[async_trait::async_trait]
837impl Supervisor for NoOpSupervisor {
838    async fn fork(&self, _spec: &Seed) -> Result<AgentId> {
839        Err(anyhow::anyhow!(
840            "NoOpSupervisor: fork not available during build"
841        ))
842    }
843    async fn exec(&self, _id: AgentId) -> Result<()> {
844        Err(anyhow::anyhow!(
845            "NoOpSupervisor: exec not available during build"
846        ))
847    }
848    async fn run_with_seed(&self, _id: AgentId, _seed: &Seed) -> Result<ExecutionResult> {
849        Err(anyhow::anyhow!(
850            "NoOpSupervisor: run_with_seed not available during build"
851        ))
852    }
853    async fn fork_directive(&self, _directive: &Directive, _env: &ExecEnv) -> Result<AgentId> {
854        Err(anyhow::anyhow!(
855            "NoOpSupervisor: fork_directive not available during build"
856        ))
857    }
858    async fn run_with_directive(
859        &self,
860        _id: AgentId,
861        _directive: &Directive,
862        _env: &ExecEnv,
863    ) -> Result<ExecutionResult> {
864        Err(anyhow::anyhow!(
865            "NoOpSupervisor: run_with_directive not available during build"
866        ))
867    }
868    async fn wait(&self, _id: AgentId) -> Result<AgentStatus> {
869        Err(anyhow::anyhow!(
870            "NoOpSupervisor: wait not available during build"
871        ))
872    }
873    async fn kill(&self, _id: AgentId) -> Result<()> {
874        Err(anyhow::anyhow!(
875            "NoOpSupervisor: kill not available during build"
876        ))
877    }
878    async fn list(&self) -> Result<Vec<AgentInfo>> {
879        Ok(Vec::new())
880    }
881}
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use crate::event_bus::EventBus;
887    use crate::types::AgentStatus;
888    // Note: imports kept for potential future test extensions.
889    use oxios_ouroboros::Seed;
890
891    // Note: MockProvider no longer needed — OxiosEngine handles provider resolution.
892    // The engine resolves models internally, so tests just use OxiosEngine::new().
893
894    /// Helper to create a real BasicSupervisor wired to a real EventBus.
895    async fn make_supervisor() -> BasicSupervisor {
896        let event_bus = EventBus::new(64);
897
898        // Build a mock KernelHandle with temp dirs.
899        let tmp = std::env::temp_dir().join(format!("oxios-test-{}", uuid::Uuid::new_v4()));
900        let _ = std::fs::create_dir_all(&tmp);
901
902        let state_store_2 =
903            Arc::new(crate::state_store::StateStore::new(tmp.join("state")).expect("state store"));
904        let state_store = state_store_2.clone();
905        let memory_manager = Arc::new({
906            let mut mm = crate::memory::MemoryManager::new(state_store.clone());
907            mm.set_git_layer(Arc::new(
908                crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
909            ));
910            mm
911        });
912
913        let kernel_handle = Arc::new(crate::KernelHandle::new(
914            crate::kernel_handle::StateApi::new(state_store),
915            crate::kernel_handle::AgentApi::new(
916                Arc::new(crate::supervisor::NoOpSupervisor),
917                Arc::new(crate::budget::BudgetManager::new()),
918                memory_manager.clone(),
919                Some(event_bus.clone()),
920            ),
921            crate::kernel_handle::SecurityApi::new(
922                Arc::new(parking_lot::Mutex::new(crate::auth::AuthManager::new())),
923                Arc::new(oxi_sdk::observability::AuditTrail::new(100)),
924                Arc::new(parking_lot::Mutex::new(
925                    crate::access_manager::AccessManager::new(),
926                )),
927                Arc::new(
928                    crate::state_store::StateStore::new(tmp.join("state2")).expect("state store 2"),
929                ),
930            ),
931            crate::kernel_handle::PersonaApi::new(Arc::new(crate::persona::PersonaManager::new())),
932            crate::kernel_handle::ExtensionApi::new(Arc::new(crate::skill::SkillManager::new(
933                tmp.join("skills"),
934                tmp.join("share/skills"),
935            ))),
936            crate::kernel_handle::McpApi::new(Arc::new(crate::mcp::McpBridge::new())),
937            crate::kernel_handle::InfraApi::new(
938                Arc::new(
939                    crate::git_layer::GitLayer::new(tmp.join("git"), false).expect("git layer"),
940                ),
941                Arc::new(crate::scheduler::AgentScheduler::new(4, 60, 300)),
942                Arc::new(crate::cron::CronScheduler::new(
943                    Arc::new(
944                        crate::state_store::StateStore::new(tmp.join("cron")).expect("cron state"),
945                    ),
946                    60,
947                )),
948                Arc::new(crate::resource_monitor::ResourceMonitor::new(60, 100)),
949                EventBus::new(64),
950                crate::config::OxiosConfig::default(),
951                std::time::Instant::now(),
952            ),
953            None,
954            crate::kernel_handle::ExecApi::new(
955                Arc::new(parking_lot::RwLock::new(
956                    crate::config::ExecConfig::default(),
957                )),
958                Arc::new(parking_lot::Mutex::new(
959                    crate::access_manager::AccessManager::new(),
960                )),
961            ),
962            crate::kernel_handle::A2aApi::new(Arc::new(crate::a2a::A2AProtocol::new(
963                EventBus::new(64),
964            ))),
965            crate::kernel_handle::EngineApi::new(
966                Arc::new(parking_lot::RwLock::new(
967                    crate::config::OxiosConfig::default(),
968                )),
969                tmp.join("config.toml"),
970                Arc::new(crate::kernel_handle::RoutingStats::new()),
971                Arc::new(crate::engine::EngineHandle::new(Arc::new(
972                    crate::OxiosEngine::new("anthropic/claude-sonnet-4-20250514"),
973                ))),
974            ),
975            Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
976            Arc::new(
977                crate::kernel_handle::KnowledgeLens::new(
978                    Arc::new(oxios_markdown::KnowledgeBase::new(tmp.join("knowledge")).unwrap()),
979                    memory_manager.clone(),
980                )
981                .unwrap(),
982            ),
983            crate::kernel_handle::MarketplaceApi::new(
984                Arc::new(crate::skill::clawhub::ClawHubInstaller::new(
985                    tmp.join("skills"),
986                    tmp.join("state"),
987                    None,
988                )),
989                Arc::new(
990                    crate::skill::clawhub::ClawHubClient::new(None).expect("valid ClawHub client"),
991                ),
992                Arc::new(crate::skill::skills_sh::SkillsShInstaller::new(
993                    tmp.join("skills"),
994                    None,
995                    None,
996                )),
997                Arc::new(
998                    crate::skill::skills_sh::SkillsShClient::new(None, None)
999                        .expect("valid Skills.sh client"),
1000                ),
1001            ),
1002            None, // calendar (not configured in test)
1003            None, // email (not configured in test)
1004        ));
1005
1006        let engine = crate::OxiosEngine::new("mock/model");
1007        let engine_handle = Arc::new(crate::engine::EngineHandle::new(Arc::new(engine)));
1008        let runtime = AgentRuntime::new(engine_handle, kernel_handle, None);
1009        BasicSupervisor::new(event_bus, runtime)
1010    }
1011
1012    /// Helper to create a minimal Seed for testing.
1013    fn make_seed(goal: &str) -> Seed {
1014        Seed {
1015            id: uuid::Uuid::new_v4(),
1016            goal: goal.to_string(),
1017            constraints: vec![],
1018            acceptance_criteria: vec![],
1019            ontology: vec![],
1020            created_at: chrono::Utc::now(),
1021            generation: 0,
1022            parent_seed_id: None,
1023            cspace_hint: None,
1024            original_request: String::new(),
1025            output_schema: None,
1026            project_id: None,
1027            workspace_context: None,
1028            mount_paths: Vec::new(),
1029        }
1030    }
1031
1032    #[tokio::test]
1033    async fn test_fork_creates_agent() {
1034        let supervisor = make_supervisor().await;
1035        let seed = make_seed("Test agent");
1036
1037        let id = supervisor.fork(&seed).await.unwrap();
1038
1039        let agents = supervisor.list().await.unwrap();
1040        assert_eq!(agents.len(), 1);
1041        assert_eq!(agents[0].id, id);
1042        assert_eq!(agents[0].name, "Test agent");
1043        assert_eq!(agents[0].status, AgentStatus::Starting);
1044        assert_eq!(agents[0].seed_id, Some(seed.id));
1045    }
1046
1047    #[tokio::test]
1048    async fn test_exec_updates_status_to_running() {
1049        let supervisor = make_supervisor().await;
1050        let seed = make_seed("Running agent");
1051
1052        let id = supervisor.fork(&seed).await.unwrap();
1053        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Starting);
1054
1055        supervisor.exec(id).await.unwrap();
1056        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
1057    }
1058
1059    #[tokio::test]
1060    async fn test_kill_sets_stopped() {
1061        let supervisor = make_supervisor().await;
1062        let seed = make_seed("Doomed agent");
1063
1064        let id = supervisor.fork(&seed).await.unwrap();
1065        supervisor.exec(id).await.unwrap();
1066        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Running);
1067
1068        supervisor.kill(id).await.unwrap();
1069        assert_eq!(supervisor.wait(id).await.unwrap(), AgentStatus::Stopped);
1070    }
1071
1072    #[tokio::test]
1073    async fn test_kill_unknown_agent_returns_error() {
1074        let supervisor = make_supervisor().await;
1075        let unknown_id = uuid::Uuid::new_v4();
1076
1077        let result = supervisor.kill(unknown_id).await;
1078        assert!(result.is_err());
1079        assert!(result.unwrap_err().to_string().contains("not found"));
1080    }
1081
1082    #[tokio::test]
1083    async fn test_list_returns_all_agents() {
1084        let supervisor = make_supervisor().await;
1085
1086        let id1 = supervisor.fork(&make_seed("Agent 1")).await.unwrap();
1087        let id2 = supervisor.fork(&make_seed("Agent 2")).await.unwrap();
1088        let id3 = supervisor.fork(&make_seed("Agent 3")).await.unwrap();
1089
1090        let agents = supervisor.list().await.unwrap();
1091        assert_eq!(agents.len(), 3);
1092
1093        let ids: std::collections::HashSet<AgentId> = agents.iter().map(|a| a.id).collect();
1094        assert!(ids.contains(&id1));
1095        assert!(ids.contains(&id2));
1096        assert!(ids.contains(&id3));
1097    }
1098
1099    #[tokio::test]
1100    async fn test_exec_unknown_agent_returns_error() {
1101        let supervisor = make_supervisor().await;
1102        let unknown_id = uuid::Uuid::new_v4();
1103
1104        let result = supervisor.exec(unknown_id).await;
1105        assert!(result.is_err());
1106        assert!(result.unwrap_err().to_string().contains("not found"));
1107    }
1108
1109    #[tokio::test]
1110    async fn test_wait_unknown_agent_returns_error() {
1111        let supervisor = make_supervisor().await;
1112        let unknown_id = uuid::Uuid::new_v4();
1113
1114        let result = supervisor.wait(unknown_id).await;
1115        assert!(result.is_err());
1116        assert!(result.unwrap_err().to_string().contains("not found"));
1117    }
1118}