Skip to main content

oxios_kernel/kernel_handle/
agent_api.rs

1//! Agent API — agent lifecycle, budget, memory, history log.
2
3use crate::agent_log_db::{AgentListFilter, AgentStats, QueryResult};
4use crate::budget::{BudgetExceeded, BudgetInfo, BudgetLimit, BudgetManager};
5use crate::event_bus::{EventBus, KernelEvent};
6use crate::memory::{HnswMemoryIndex, SemanticHit};
7use crate::memory::{MemoryEntry, MemoryManager, MemoryType};
8use crate::state_store::StateStore;
9use crate::supervisor::Supervisor;
10use crate::types::{AgentId, AgentInfo};
11use std::sync::Arc;
12
13/// Agent management system calls.
14pub struct AgentApi {
15    pub(crate) supervisor: Arc<dyn Supervisor>,
16    pub(crate) budget_manager: Arc<BudgetManager>,
17    pub(crate) memory_manager: Arc<MemoryManager>,
18    /// HNSW index for semantic search (optional, initialized on demand).
19    pub(crate) hnsw_index: Option<Arc<HnswMemoryIndex>>,
20    /// Event bus for publishing agent-related events.
21    pub(crate) event_bus: Option<EventBus>,
22    /// State store for filesystem agent persistence.
23    pub(crate) state_store: Option<Arc<StateStore>>,
24    /// SQLite-backed agent history query index.
25    #[cfg(feature = "sqlite-memory")]
26    pub(crate) agent_log_db: Option<Arc<crate::agent_log_db::AgentLogDb>>,
27}
28
29impl AgentApi {
30    /// Create a new AgentApi.
31    pub fn new(
32        supervisor: Arc<dyn Supervisor>,
33        budget_manager: Arc<BudgetManager>,
34        memory_manager: Arc<MemoryManager>,
35        event_bus: Option<EventBus>,
36    ) -> Self {
37        Self {
38            supervisor,
39            budget_manager,
40            memory_manager,
41            hnsw_index: None,
42            event_bus,
43            state_store: None,
44            #[cfg(feature = "sqlite-memory")]
45            agent_log_db: None,
46        }
47    }
48
49    /// Attach a state store for agent history persistence.
50    pub fn set_state_store(&mut self, store: Arc<StateStore>) {
51        self.state_store = Some(store);
52    }
53
54    /// Attach an SQLite-backed agent log database.
55    #[cfg(feature = "sqlite-memory")]
56    pub fn set_agent_log_db(&mut self, db: Arc<crate::agent_log_db::AgentLogDb>) {
57        self.agent_log_db = Some(db);
58    }
59
60    /// Attach an HNSW index for semantic search.
61    pub fn set_hnsw_index(&mut self, index: Arc<HnswMemoryIndex>) {
62        self.hnsw_index = Some(index);
63    }
64    /// Check whether an HNSW index is attached for fast semantic search.
65    pub fn has_hnsw_index(&self) -> bool {
66        self.hnsw_index.is_some()
67    }
68
69    /// Publish a kernel event if the event bus is available.
70    fn publish(&self, event: KernelEvent) {
71        if let Some(ref eb) = self.event_bus {
72            let _ = eb.publish(event);
73        }
74    }
75    /// List running agents (in-memory only).
76    pub async fn list(&self) -> anyhow::Result<Vec<AgentInfo>> {
77        self.supervisor
78            .list()
79            .await
80            .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
81    }
82
83    /// Kill a running agent.
84    pub async fn kill(&self, agent_id: &str) -> anyhow::Result<()> {
85        let id = uuid::Uuid::parse_str(agent_id)
86            .map_err(|e| anyhow::anyhow!("invalid agent id: {e}"))?;
87        self.supervisor
88            .kill(id)
89            .await
90            .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
91    }
92
93    /// Check budget for an agent.
94    pub fn check_budget(&self, agent_id: &AgentId) -> BudgetInfo {
95        self.budget_manager.remaining(agent_id)
96    }
97
98    /// Set budget for an agent.
99    pub fn set_budget(&self, limit: BudgetLimit) {
100        self.budget_manager.set_budget(limit);
101    }
102
103    /// Remove budget for an agent.
104    pub fn remove_budget(&self, agent_id: &AgentId) {
105        self.budget_manager.remove_budget(agent_id);
106    }
107
108    /// Reserve tokens for an agent.
109    pub fn reserve_budget(&self, agent_id: &AgentId, tokens: u64) -> Result<(), BudgetExceeded> {
110        self.budget_manager.reserve(agent_id, tokens)
111    }
112
113    /// Reset budget window for an agent.
114    pub fn reset_budget(&self, agent_id: &AgentId) {
115        self.budget_manager.reset_window(agent_id);
116    }
117
118    /// Get full budget info (limits + usage) for an agent.
119    pub fn full_budget_info(&self, agent_id: &AgentId) -> Option<crate::budget::FullBudgetInfo> {
120        self.budget_manager.full_info(agent_id)
121    }
122
123    /// Get full budget info for all agents with configured budgets.
124    pub fn all_budget_info(&self) -> Vec<crate::budget::FullBudgetInfo> {
125        self.budget_manager.all_full_info()
126    }
127
128    /// Get memory stats.
129    pub async fn memory_stats(&self) -> (usize, usize) {
130        (
131            self.memory_manager.vector_index_size(),
132            self.memory_manager.total_entries().await,
133        )
134    }
135
136    /// Store a memory entry.
137    pub async fn remember(&self, entry: MemoryEntry) -> anyhow::Result<String> {
138        let id = self.memory_manager.remember(entry.clone()).await?;
139
140        // Publish MemoryStored event
141        self.publish(KernelEvent::MemoryStored {
142            id: id.clone(),
143            memory_type: entry.memory_type.label().to_string(),
144            source: entry.source.clone(),
145        });
146
147        Ok(id)
148    }
149
150    /// Search memory entries.
151    pub async fn search_memory(
152        &self,
153        query: &str,
154        memory_type: Option<MemoryType>,
155        limit: usize,
156    ) -> anyhow::Result<Vec<MemoryEntry>> {
157        self.memory_manager.search(query, memory_type, limit).await
158    }
159
160    /// Semantic search using HNSW index.
161    ///
162    /// Falls back to regular search if HNSW index is not available.
163    pub async fn semantic_search_memory(
164        &self,
165        query: &str,
166        memory_type: Option<MemoryType>,
167        limit: usize,
168    ) -> anyhow::Result<Vec<SemanticHit>> {
169        if let Some(ref hnsw) = self.hnsw_index {
170            self.memory_manager
171                .semantic_search(query, memory_type, limit, hnsw)
172                .await
173        } else {
174            // Fallback to regular search, wrap results
175            let entries = self.search_memory(query, memory_type, limit).await?;
176            Ok(entries
177                .into_iter()
178                .map(|entry| SemanticHit {
179                    entry,
180                    distance: 0.0,
181                    similarity: 0.0,
182                })
183                .collect())
184        }
185    }
186
187    /// Memory manager reference.
188    pub fn memory_manager(&self) -> &Arc<MemoryManager> {
189        &self.memory_manager
190    }
191
192    // ── Agent History Log ─────────────────────────────────────────
193
194    /// Query agent history (in-memory + SQLite) with filters.
195    ///
196    /// Merges running agents from supervisor with persisted agents
197    /// from the SQLite query index. Running agents are prepended.
198    pub async fn query(&self, filter: &AgentListFilter) -> anyhow::Result<QueryResult> {
199        // Get running agents from supervisor
200        let running = self.supervisor.list().await.unwrap_or_default();
201
202        // Query SQLite for historical agents
203        #[cfg(feature = "sqlite-memory")]
204        if let Some(ref db) = self.agent_log_db {
205            let mut result = db.query(filter).map_err(|e| anyhow::anyhow!("{e}"))?;
206
207            // Only prepend running agents on the first page — otherwise we'd
208            // inject them into every requested page and break pagination.
209            if filter.page == 1 {
210                // Dedup against the SQLite items so a running agent that is
211                // also already persisted isn't shown twice.
212                let existing_ids: std::collections::HashSet<_> =
213                    result.items.iter().map(|a| a.id).collect();
214                let mut prepended = 0u64;
215                for agent in &running {
216                    if existing_ids.contains(&agent.id) {
217                        continue;
218                    }
219                    if filter_matches(agent, filter) {
220                        result.items.insert(0, agent.clone());
221                        prepended += 1;
222                    }
223                }
224                result.total = result.total.saturating_add(prepended);
225                // Recompute total_pages so callers see a consistent view.
226                result.total_pages = if result.total == 0 {
227                    1
228                } else {
229                    ((result.total as f64) / filter.per_page.max(1) as f64).ceil() as u32
230                };
231            }
232
233            return Ok(result);
234        }
235
236        // Fallback: filesystem-only scan
237        #[allow(unused_mut)]
238        let mut persisted: Vec<AgentInfo> = Vec::new();
239        if let Some(ref store) = self.state_store {
240            let names = store.list_category("agents").await.unwrap_or_default();
241            for name in &names {
242                if let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", name).await {
243                    persisted.push(agent);
244                }
245            }
246        }
247
248        // Merge running + persisted, dedup by id (running wins)
249        let running_ids: std::collections::HashSet<_> = running.iter().map(|a| a.id).collect();
250        persisted.retain(|a| !running_ids.contains(&a.id));
251
252        let mut all = running;
253        all.extend(persisted);
254
255        // In-memory filter/sort/paginate (basic fallback)
256        let filtered = fallback_filter(all, filter);
257        let total = filtered.len() as u64;
258        let offset = ((filter.page.max(1) - 1) * filter.per_page) as usize;
259        let limit = filter.per_page.min(200) as usize;
260        let items: Vec<AgentInfo> = filtered.into_iter().skip(offset).take(limit).collect();
261        let total_pages = if total == 0 {
262            1
263        } else {
264            ((total as f64) / filter.per_page as f64).ceil() as u32
265        };
266
267        Ok(QueryResult {
268            items,
269            total,
270            page: filter.page,
271            per_page: filter.per_page,
272            total_pages,
273            stats: crate::agent_log_db::FilteredStats::default(),
274        })
275    }
276
277    /// Get an agent by ID (from SQLite or filesystem fallback).
278    pub async fn get(&self, id: &str) -> anyhow::Result<Option<AgentInfo>> {
279        // Try SQLite first
280        #[cfg(feature = "sqlite-memory")]
281        if let Some(ref db) = self.agent_log_db
282            && let Ok(Some(agent)) = db.get(id)
283        {
284            return Ok(Some(agent));
285        }
286
287        // Fallback: filesystem JSON
288        if let Some(ref store) = self.state_store
289            && let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", id).await
290        {
291            return Ok(Some(agent));
292        }
293
294        // Fallback: in-memory
295        if let Ok(agents) = self.supervisor.list().await
296            && let Some(agent) = agents.into_iter().find(|a| a.id.to_string() == id)
297        {
298            return Ok(Some(agent));
299        }
300
301        Ok(None)
302    }
303
304    /// Global agent stats (unfiltered).
305    pub async fn stats(&self) -> anyhow::Result<AgentStats> {
306        // Try SQLite first
307        #[cfg(feature = "sqlite-memory")]
308        if let Some(ref db) = self.agent_log_db {
309            return db.stats().map_err(|e| anyhow::anyhow!("{e}"));
310        }
311
312        // Fallback: compute from in-memory + filesystem
313        let mut s = AgentStats::default();
314        let running = self.supervisor.list().await.unwrap_or_default();
315        for a in &running {
316            s.total_agents += 1;
317            match a.status {
318                crate::types::AgentStatus::Running | crate::types::AgentStatus::Starting => {
319                    s.running += 1
320                }
321                crate::types::AgentStatus::Idle
322                | crate::types::AgentStatus::Stopped
323                | crate::types::AgentStatus::Completed => s.completed += 1,
324                crate::types::AgentStatus::Failed => s.failed += 1,
325            }
326            s.total_tokens += a.tokens_input + a.tokens_output;
327            s.total_cost_usd += a.cost_usd;
328        }
329        Ok(s)
330    }
331
332    /// Rebuild SQLite agent log index from filesystem JSON.
333    #[cfg(feature = "sqlite-memory")]
334    pub async fn reindex(&self) -> anyhow::Result<crate::agent_log_db::RebuildReport> {
335        match (self.agent_log_db.as_ref(), self.state_store.as_ref()) {
336            (Some(db), Some(store)) => db
337                .reindex_all(store)
338                .await
339                .map_err(|e| anyhow::anyhow!("{e}")),
340            _ => anyhow::bail!("Agent log DB not initialized"),
341        }
342    }
343
344    /// Rebuild the HNSW index from all stored memories.
345    pub async fn rebuild_hnsw_index(&self) -> anyhow::Result<usize> {
346        if let Some(ref hnsw) = self.hnsw_index {
347            self.memory_manager.rebuild_hnsw_index(hnsw).await
348        } else {
349            Err(anyhow::anyhow!("HNSW index not initialized"))
350        }
351    }
352}
353
354/// Check if an agent matches the filter (used for prepending running agents).
355fn filter_matches(agent: &AgentInfo, filter: &AgentListFilter) -> bool {
356    // Status filter
357    if let Some(status) = filter.status {
358        let status_str = agent.status.to_string();
359        if status_str != status.as_sql()
360            && !(status_str == "idle" && status.as_sql() == "completed")
361            && !(status_str == "idle" && status.as_sql() == "running")
362        {
363            return false;
364        }
365    }
366
367    // Date range
368    if let Some(from) = filter.date_from
369        && agent.created_at < from
370    {
371        return false;
372    }
373    if let Some(to) = filter.date_to
374        && agent.created_at > to
375    {
376        return false;
377    }
378
379    // Session / project / seed
380    if let Some(ref sid) = filter.session_id
381        && agent.session_id.as_deref() != Some(sid.as_str())
382    {
383        return false;
384    }
385    if let Some(ref pid) = filter.project_id
386        && agent.project_id.map(|p| p.to_string()).as_deref() != Some(pid.as_str())
387    {
388        return false;
389    }
390    if let Some(ref sid) = filter.seed_id
391        && agent.seed_id.map(|s| s.to_string()).as_deref() != Some(sid.as_str())
392    {
393        return false;
394    }
395
396    // Model filter (substring)
397    if let Some(ref model) = filter.model_id
398        && !agent.model_id.contains(model)
399    {
400        return false;
401    }
402
403    // Text search (name + error only for in-memory agents — no tool_calls scan)
404    if let Some(ref q) = filter.q {
405        let q_lower = q.to_lowercase();
406        let name_match = agent.name.to_lowercase().contains(&q_lower);
407        let error_match = agent
408            .error
409            .as_deref()
410            .is_some_and(|e| e.to_lowercase().contains(&q_lower));
411        if !name_match && !error_match {
412            return false;
413        }
414    }
415
416    // Error filter
417    if let Some(has_err) = filter.has_error {
418        let agent_has_err = agent.error.as_deref().is_some_and(|e| !e.is_empty());
419        if has_err != agent_has_err {
420            return false;
421        }
422    }
423
424    // Budget ranges
425    if let Some(min) = filter.cost_min
426        && agent.cost_usd < min
427    {
428        return false;
429    }
430    if let Some(max) = filter.cost_max
431        && agent.cost_usd > max
432    {
433        return false;
434    }
435
436    true
437}
438
439/// Fallback in-memory filtering (used when SQLite is not available).
440fn fallback_filter(mut agents: Vec<AgentInfo>, filter: &AgentListFilter) -> Vec<AgentInfo> {
441    // Sort
442    match filter.sort_by {
443        crate::agent_log_db::SortBy::CreatedAt => {
444            agents.sort_by_key(|a| std::cmp::Reverse(a.created_at));
445        }
446        crate::agent_log_db::SortBy::Cost => {
447            agents.sort_by(|a, b| {
448                b.cost_usd
449                    .partial_cmp(&a.cost_usd)
450                    .unwrap_or(std::cmp::Ordering::Equal)
451            });
452        }
453        crate::agent_log_db::SortBy::Duration => {
454            let dur = |a: &AgentInfo| -> i64 {
455                match (a.started_at, a.completed_at) {
456                    (Some(s), Some(e)) => (e - s).num_seconds(),
457                    _ => 0,
458                }
459            };
460            agents.sort_by_key(|a| std::cmp::Reverse(dur(a)));
461        }
462        crate::agent_log_db::SortBy::Tokens => {
463            agents.sort_by_key(|a| std::cmp::Reverse(a.tokens_input + a.tokens_output));
464        }
465        crate::agent_log_db::SortBy::Name => {
466            agents.sort_by(|a, b| a.name.cmp(&b.name));
467        }
468    }
469
470    agents
471}