Skip to main content

oxios_kernel/kernel_handle/
agent_api.rs

1//! Agent API — agent lifecycle, budget, memory, history log.
2
3use crate::agent_log_db::{AgentListFilter, AgentStats, QueryResult};
4use crate::budget::{BudgetExceeded, BudgetInfo, BudgetLimit, BudgetManager};
5use crate::event_bus::{EventBus, KernelEvent};
6use crate::memory::{HnswMemoryIndex, SemanticHit};
7use crate::memory::{MemoryEntry, MemoryManager, MemoryType};
8use crate::state_store::StateStore;
9use crate::supervisor::Supervisor;
10use crate::types::{AgentId, AgentInfo};
11use std::sync::Arc;
12
13/// Agent management system calls.
14pub struct AgentApi {
15    pub(crate) supervisor: Arc<dyn Supervisor>,
16    pub(crate) budget_manager: Arc<BudgetManager>,
17    pub(crate) memory_manager: Arc<MemoryManager>,
18    /// HNSW index for semantic search (optional, initialized on demand).
19    pub(crate) hnsw_index: Option<Arc<HnswMemoryIndex>>,
20    /// Event bus for publishing agent-related events.
21    pub(crate) event_bus: Option<EventBus>,
22    /// State store for filesystem agent persistence.
23    pub(crate) state_store: Option<Arc<StateStore>>,
24    /// SQLite-backed agent history query index.
25    #[cfg(feature = "sqlite-memory")]
26    pub(crate) agent_log_db: Option<Arc<crate::agent_log_db::AgentLogDb>>,
27}
28
29impl AgentApi {
30    /// Create a new AgentApi.
31    pub fn new(
32        supervisor: Arc<dyn Supervisor>,
33        budget_manager: Arc<BudgetManager>,
34        memory_manager: Arc<MemoryManager>,
35        event_bus: Option<EventBus>,
36    ) -> Self {
37        Self {
38            supervisor,
39            budget_manager,
40            memory_manager,
41            hnsw_index: None,
42            event_bus,
43            state_store: None,
44            #[cfg(feature = "sqlite-memory")]
45            agent_log_db: None,
46        }
47    }
48
49    /// Attach a state store for agent history persistence.
50    pub fn set_state_store(&mut self, store: Arc<StateStore>) {
51        self.state_store = Some(store);
52    }
53
54    /// Attach an SQLite-backed agent log database.
55    #[cfg(feature = "sqlite-memory")]
56    pub fn set_agent_log_db(&mut self, db: Arc<crate::agent_log_db::AgentLogDb>) {
57        self.agent_log_db = Some(db);
58    }
59
60    /// Attach an HNSW index for semantic search.
61    pub fn set_hnsw_index(&mut self, index: Arc<HnswMemoryIndex>) {
62        self.hnsw_index = Some(index);
63    }
64
65    /// Publish a kernel event if the event bus is available.
66    fn publish(&self, event: KernelEvent) {
67        if let Some(ref eb) = self.event_bus {
68            let _ = eb.publish(event);
69        }
70    }
71    /// List running agents (in-memory only).
72    pub async fn list(&self) -> anyhow::Result<Vec<AgentInfo>> {
73        self.supervisor
74            .list()
75            .await
76            .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
77    }
78
79    /// Kill a running agent.
80    pub async fn kill(&self, agent_id: &str) -> anyhow::Result<()> {
81        let id = uuid::Uuid::parse_str(agent_id)
82            .map_err(|e| anyhow::anyhow!("invalid agent id: {e}"))?;
83        self.supervisor
84            .kill(id)
85            .await
86            .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
87    }
88
89    /// Check budget for an agent.
90    pub fn check_budget(&self, agent_id: &AgentId) -> BudgetInfo {
91        self.budget_manager.remaining(agent_id)
92    }
93
94    /// Set budget for an agent.
95    pub fn set_budget(&self, limit: BudgetLimit) {
96        self.budget_manager.set_budget(limit);
97    }
98
99    /// Remove budget for an agent.
100    pub fn remove_budget(&self, agent_id: &AgentId) {
101        self.budget_manager.remove_budget(agent_id);
102    }
103
104    /// Reserve tokens for an agent.
105    pub fn reserve_budget(&self, agent_id: &AgentId, tokens: u64) -> Result<(), BudgetExceeded> {
106        self.budget_manager.reserve(agent_id, tokens)
107    }
108
109    /// Reset budget window for an agent.
110    pub fn reset_budget(&self, agent_id: &AgentId) {
111        self.budget_manager.reset_window(agent_id);
112    }
113
114    /// Get full budget info (limits + usage) for an agent.
115    pub fn full_budget_info(&self, agent_id: &AgentId) -> Option<crate::budget::FullBudgetInfo> {
116        self.budget_manager.full_info(agent_id)
117    }
118
119    /// Get full budget info for all agents with configured budgets.
120    pub fn all_budget_info(&self) -> Vec<crate::budget::FullBudgetInfo> {
121        self.budget_manager.all_full_info()
122    }
123
124    /// Get memory stats.
125    pub async fn memory_stats(&self) -> (usize, usize) {
126        (
127            self.memory_manager.vector_index_size(),
128            self.memory_manager.total_entries().await,
129        )
130    }
131
132    /// Store a memory entry.
133    pub async fn remember(&self, entry: MemoryEntry) -> anyhow::Result<String> {
134        let id = self.memory_manager.remember(entry.clone()).await?;
135
136        // Publish MemoryStored event
137        self.publish(KernelEvent::MemoryStored {
138            id: id.clone(),
139            memory_type: entry.memory_type.label().to_string(),
140            source: entry.source.clone(),
141        });
142
143        Ok(id)
144    }
145
146    /// Search memory entries.
147    pub async fn search_memory(
148        &self,
149        query: &str,
150        memory_type: Option<MemoryType>,
151        limit: usize,
152    ) -> anyhow::Result<Vec<MemoryEntry>> {
153        self.memory_manager.search(query, memory_type, limit).await
154    }
155
156    /// Semantic search using HNSW index.
157    ///
158    /// Falls back to regular search if HNSW index is not available.
159    pub async fn semantic_search_memory(
160        &self,
161        query: &str,
162        memory_type: Option<MemoryType>,
163        limit: usize,
164    ) -> anyhow::Result<Vec<SemanticHit>> {
165        if let Some(ref hnsw) = self.hnsw_index {
166            self.memory_manager
167                .semantic_search(query, memory_type, limit, hnsw)
168                .await
169        } else {
170            // Fallback to regular search, wrap results
171            let entries = self.search_memory(query, memory_type, limit).await?;
172            Ok(entries
173                .into_iter()
174                .map(|entry| SemanticHit {
175                    entry,
176                    distance: 0.0,
177                    similarity: 0.0,
178                })
179                .collect())
180        }
181    }
182
183    /// Memory manager reference.
184    pub fn memory_manager(&self) -> &Arc<MemoryManager> {
185        &self.memory_manager
186    }
187
188    // ── Agent History Log ─────────────────────────────────────────
189
190    /// Query agent history (in-memory + SQLite) with filters.
191    ///
192    /// Merges running agents from supervisor with persisted agents
193    /// from the SQLite query index. Running agents are prepended.
194    pub async fn query(&self, filter: &AgentListFilter) -> anyhow::Result<QueryResult> {
195        // Get running agents from supervisor
196        let running = self.supervisor.list().await.unwrap_or_default();
197
198        // Query SQLite for historical agents
199        #[cfg(feature = "sqlite-memory")]
200        if let Some(ref db) = self.agent_log_db {
201            let mut result = db.query(filter).map_err(|e| anyhow::anyhow!("{e}"))?;
202
203            // Prepend running agents that match the filter
204            for agent in &running {
205                if filter_matches(agent, filter) {
206                    result.items.insert(0, agent.clone());
207                    result.total += 1;
208                }
209            }
210
211            return Ok(result);
212        }
213
214        // Fallback: filesystem-only scan
215        #[allow(unused_mut)]
216        let mut persisted: Vec<AgentInfo> = Vec::new();
217        if let Some(ref store) = self.state_store {
218            let names = store.list_category("agents").await.unwrap_or_default();
219            for name in &names {
220                if let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", name).await {
221                    persisted.push(agent);
222                }
223            }
224        }
225
226        // Merge running + persisted, dedup by id (running wins)
227        let running_ids: std::collections::HashSet<_> = running.iter().map(|a| a.id).collect();
228        persisted.retain(|a| !running_ids.contains(&a.id));
229
230        let mut all = running;
231        all.extend(persisted);
232
233        // In-memory filter/sort/paginate (basic fallback)
234        let filtered = fallback_filter(all, filter);
235        let total = filtered.len() as u64;
236        let offset = ((filter.page.max(1) - 1) * filter.per_page) as usize;
237        let limit = filter.per_page.min(200) as usize;
238        let items: Vec<AgentInfo> = filtered.into_iter().skip(offset).take(limit).collect();
239        let total_pages = if total == 0 {
240            1
241        } else {
242            ((total as f64) / filter.per_page as f64).ceil() as u32
243        };
244
245        Ok(QueryResult {
246            items,
247            total,
248            page: filter.page,
249            per_page: filter.per_page,
250            total_pages,
251            stats: crate::agent_log_db::FilteredStats::default(),
252        })
253    }
254
255    /// Get an agent by ID (from SQLite or filesystem fallback).
256    pub async fn get(&self, id: &str) -> anyhow::Result<Option<AgentInfo>> {
257        // Try SQLite first
258        #[cfg(feature = "sqlite-memory")]
259        if let Some(ref db) = self.agent_log_db
260            && let Ok(Some(agent)) = db.get(id)
261        {
262            return Ok(Some(agent));
263        }
264
265        // Fallback: filesystem JSON
266        if let Some(ref store) = self.state_store
267            && let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", id).await
268        {
269            return Ok(Some(agent));
270        }
271
272        // Fallback: in-memory
273        if let Ok(agents) = self.supervisor.list().await
274            && let Some(agent) = agents.into_iter().find(|a| a.id.to_string() == id)
275        {
276            return Ok(Some(agent));
277        }
278
279        Ok(None)
280    }
281
282    /// Global agent stats (unfiltered).
283    pub async fn stats(&self) -> anyhow::Result<AgentStats> {
284        // Try SQLite first
285        #[cfg(feature = "sqlite-memory")]
286        if let Some(ref db) = self.agent_log_db {
287            return db.stats().map_err(|e| anyhow::anyhow!("{e}"));
288        }
289
290        // Fallback: compute from in-memory + filesystem
291        let mut s = AgentStats::default();
292        let running = self.supervisor.list().await.unwrap_or_default();
293        for a in &running {
294            s.total_agents += 1;
295            match a.status {
296                crate::types::AgentStatus::Running | crate::types::AgentStatus::Starting => {
297                    s.running += 1
298                }
299                crate::types::AgentStatus::Idle
300                | crate::types::AgentStatus::Stopped
301                | crate::types::AgentStatus::Completed => s.completed += 1,
302                crate::types::AgentStatus::Failed => s.failed += 1,
303            }
304            s.total_tokens += a.tokens_input + a.tokens_output;
305            s.total_cost_usd += a.cost_usd;
306        }
307        Ok(s)
308    }
309
310    /// Rebuild SQLite agent log index from filesystem JSON.
311    #[cfg(feature = "sqlite-memory")]
312    pub async fn reindex(&self) -> anyhow::Result<crate::agent_log_db::RebuildReport> {
313        match (self.agent_log_db.as_ref(), self.state_store.as_ref()) {
314            (Some(db), Some(store)) => db
315                .reindex_all(store)
316                .await
317                .map_err(|e| anyhow::anyhow!("{e}")),
318            _ => anyhow::bail!("Agent log DB not initialized"),
319        }
320    }
321
322    /// Rebuild the HNSW index from all stored memories.
323    pub async fn rebuild_hnsw_index(&self) -> anyhow::Result<usize> {
324        if let Some(ref hnsw) = self.hnsw_index {
325            self.memory_manager.rebuild_hnsw_index(hnsw).await
326        } else {
327            Err(anyhow::anyhow!("HNSW index not initialized"))
328        }
329    }
330}
331
332/// Check if an agent matches the filter (used for prepending running agents).
333fn filter_matches(agent: &AgentInfo, filter: &AgentListFilter) -> bool {
334    // Status filter
335    if let Some(status) = filter.status {
336        let status_str = agent.status.to_string();
337        if status_str != status.as_sql()
338            && !(status_str == "idle" && status.as_sql() == "completed")
339            && !(status_str == "idle" && status.as_sql() == "running")
340        {
341            return false;
342        }
343    }
344
345    // Date range
346    if let Some(from) = filter.date_from
347        && agent.created_at < from
348    {
349        return false;
350    }
351    if let Some(to) = filter.date_to
352        && agent.created_at > to
353    {
354        return false;
355    }
356
357    // Session / project / seed
358    if let Some(ref sid) = filter.session_id
359        && agent.session_id.as_deref() != Some(sid.as_str())
360    {
361        return false;
362    }
363    if let Some(ref pid) = filter.project_id
364        && agent.project_id.map(|p| p.to_string()).as_deref() != Some(pid.as_str())
365    {
366        return false;
367    }
368    if let Some(ref sid) = filter.seed_id
369        && agent.seed_id.map(|s| s.to_string()).as_deref() != Some(sid.as_str())
370    {
371        return false;
372    }
373
374    // Model filter (substring)
375    if let Some(ref model) = filter.model_id
376        && !agent.model_id.contains(model)
377    {
378        return false;
379    }
380
381    // Text search (name + error only for in-memory agents — no tool_calls scan)
382    if let Some(ref q) = filter.q {
383        let q_lower = q.to_lowercase();
384        let name_match = agent.name.to_lowercase().contains(&q_lower);
385        let error_match = agent
386            .error
387            .as_deref()
388            .is_some_and(|e| e.to_lowercase().contains(&q_lower));
389        if !name_match && !error_match {
390            return false;
391        }
392    }
393
394    // Error filter
395    if let Some(has_err) = filter.has_error {
396        let agent_has_err = agent.error.as_deref().is_some_and(|e| !e.is_empty());
397        if has_err != agent_has_err {
398            return false;
399        }
400    }
401
402    // Budget ranges
403    if let Some(min) = filter.cost_min
404        && agent.cost_usd < min
405    {
406        return false;
407    }
408    if let Some(max) = filter.cost_max
409        && agent.cost_usd > max
410    {
411        return false;
412    }
413
414    true
415}
416
417/// Fallback in-memory filtering (used when SQLite is not available).
418fn fallback_filter(mut agents: Vec<AgentInfo>, filter: &AgentListFilter) -> Vec<AgentInfo> {
419    // Sort
420    match filter.sort_by {
421        crate::agent_log_db::SortBy::CreatedAt => {
422            agents.sort_by_key(|a| std::cmp::Reverse(a.created_at));
423        }
424        crate::agent_log_db::SortBy::Cost => {
425            agents.sort_by(|a, b| {
426                b.cost_usd
427                    .partial_cmp(&a.cost_usd)
428                    .unwrap_or(std::cmp::Ordering::Equal)
429            });
430        }
431        crate::agent_log_db::SortBy::Duration => {
432            let dur = |a: &AgentInfo| -> i64 {
433                match (a.started_at, a.completed_at) {
434                    (Some(s), Some(e)) => (e - s).num_seconds(),
435                    _ => 0,
436                }
437            };
438            agents.sort_by_key(|a| std::cmp::Reverse(dur(a)));
439        }
440        crate::agent_log_db::SortBy::Tokens => {
441            agents.sort_by_key(|a| std::cmp::Reverse(a.tokens_input + a.tokens_output));
442        }
443        crate::agent_log_db::SortBy::Name => {
444            agents.sort_by(|a, b| a.name.cmp(&b.name));
445        }
446    }
447
448    agents
449}