oxios_kernel/kernel_handle/
agent_api.rs1use crate::agent_log_db::{AgentListFilter, AgentStats, QueryResult};
4use crate::budget::{BudgetExceeded, BudgetInfo, BudgetLimit, BudgetManager};
5use crate::event_bus::{EventBus, KernelEvent};
6use crate::memory::{HnswMemoryIndex, SemanticHit};
7use crate::memory::{MemoryEntry, MemoryManager, MemoryType};
8use crate::state_store::StateStore;
9use crate::supervisor::Supervisor;
10use crate::types::{AgentId, AgentInfo};
11use std::sync::Arc;
12
13pub struct AgentApi {
15 pub(crate) supervisor: Arc<dyn Supervisor>,
16 pub(crate) budget_manager: Arc<BudgetManager>,
17 pub(crate) memory_manager: Arc<MemoryManager>,
18 pub(crate) hnsw_index: Option<Arc<HnswMemoryIndex>>,
20 pub(crate) event_bus: Option<EventBus>,
22 pub(crate) state_store: Option<Arc<StateStore>>,
24 #[cfg(feature = "sqlite-memory")]
26 pub(crate) agent_log_db: Option<Arc<crate::agent_log_db::AgentLogDb>>,
27}
28
29impl AgentApi {
30 pub fn new(
32 supervisor: Arc<dyn Supervisor>,
33 budget_manager: Arc<BudgetManager>,
34 memory_manager: Arc<MemoryManager>,
35 event_bus: Option<EventBus>,
36 ) -> Self {
37 Self {
38 supervisor,
39 budget_manager,
40 memory_manager,
41 hnsw_index: None,
42 event_bus,
43 state_store: None,
44 #[cfg(feature = "sqlite-memory")]
45 agent_log_db: None,
46 }
47 }
48
49 pub fn set_state_store(&mut self, store: Arc<StateStore>) {
51 self.state_store = Some(store);
52 }
53
54 #[cfg(feature = "sqlite-memory")]
56 pub fn set_agent_log_db(&mut self, db: Arc<crate::agent_log_db::AgentLogDb>) {
57 self.agent_log_db = Some(db);
58 }
59
60 pub fn set_hnsw_index(&mut self, index: Arc<HnswMemoryIndex>) {
62 self.hnsw_index = Some(index);
63 }
64
65 fn publish(&self, event: KernelEvent) {
67 if let Some(ref eb) = self.event_bus {
68 let _ = eb.publish(event);
69 }
70 }
71 pub async fn list(&self) -> anyhow::Result<Vec<AgentInfo>> {
73 self.supervisor
74 .list()
75 .await
76 .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
77 }
78
79 pub async fn kill(&self, agent_id: &str) -> anyhow::Result<()> {
81 let id = uuid::Uuid::parse_str(agent_id)
82 .map_err(|e| anyhow::anyhow!("invalid agent id: {e}"))?;
83 self.supervisor
84 .kill(id)
85 .await
86 .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
87 }
88
89 pub fn check_budget(&self, agent_id: &AgentId) -> BudgetInfo {
91 self.budget_manager.remaining(agent_id)
92 }
93
94 pub fn set_budget(&self, limit: BudgetLimit) {
96 self.budget_manager.set_budget(limit);
97 }
98
99 pub fn remove_budget(&self, agent_id: &AgentId) {
101 self.budget_manager.remove_budget(agent_id);
102 }
103
104 pub fn reserve_budget(&self, agent_id: &AgentId, tokens: u64) -> Result<(), BudgetExceeded> {
106 self.budget_manager.reserve(agent_id, tokens)
107 }
108
109 pub fn reset_budget(&self, agent_id: &AgentId) {
111 self.budget_manager.reset_window(agent_id);
112 }
113
114 pub fn full_budget_info(&self, agent_id: &AgentId) -> Option<crate::budget::FullBudgetInfo> {
116 self.budget_manager.full_info(agent_id)
117 }
118
119 pub fn all_budget_info(&self) -> Vec<crate::budget::FullBudgetInfo> {
121 self.budget_manager.all_full_info()
122 }
123
124 pub async fn memory_stats(&self) -> (usize, usize) {
126 (
127 self.memory_manager.vector_index_size(),
128 self.memory_manager.total_entries().await,
129 )
130 }
131
132 pub async fn remember(&self, entry: MemoryEntry) -> anyhow::Result<String> {
134 let id = self.memory_manager.remember(entry.clone()).await?;
135
136 self.publish(KernelEvent::MemoryStored {
138 id: id.clone(),
139 memory_type: entry.memory_type.label().to_string(),
140 source: entry.source.clone(),
141 });
142
143 Ok(id)
144 }
145
146 pub async fn search_memory(
148 &self,
149 query: &str,
150 memory_type: Option<MemoryType>,
151 limit: usize,
152 ) -> anyhow::Result<Vec<MemoryEntry>> {
153 self.memory_manager.search(query, memory_type, limit).await
154 }
155
156 pub async fn semantic_search_memory(
160 &self,
161 query: &str,
162 memory_type: Option<MemoryType>,
163 limit: usize,
164 ) -> anyhow::Result<Vec<SemanticHit>> {
165 if let Some(ref hnsw) = self.hnsw_index {
166 self.memory_manager
167 .semantic_search(query, memory_type, limit, hnsw)
168 .await
169 } else {
170 let entries = self.search_memory(query, memory_type, limit).await?;
172 Ok(entries
173 .into_iter()
174 .map(|entry| SemanticHit {
175 entry,
176 distance: 0.0,
177 similarity: 0.0,
178 })
179 .collect())
180 }
181 }
182
183 pub fn memory_manager(&self) -> &Arc<MemoryManager> {
185 &self.memory_manager
186 }
187
188 pub async fn query(&self, filter: &AgentListFilter) -> anyhow::Result<QueryResult> {
195 let running = self.supervisor.list().await.unwrap_or_default();
197
198 #[cfg(feature = "sqlite-memory")]
200 if let Some(ref db) = self.agent_log_db {
201 let mut result = db.query(filter).map_err(|e| anyhow::anyhow!("{e}"))?;
202
203 for agent in &running {
205 if filter_matches(agent, filter) {
206 result.items.insert(0, agent.clone());
207 result.total += 1;
208 }
209 }
210
211 return Ok(result);
212 }
213
214 #[allow(unused_mut)]
216 let mut persisted: Vec<AgentInfo> = Vec::new();
217 if let Some(ref store) = self.state_store {
218 let names = store.list_category("agents").await.unwrap_or_default();
219 for name in &names {
220 if let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", name).await {
221 persisted.push(agent);
222 }
223 }
224 }
225
226 let running_ids: std::collections::HashSet<_> = running.iter().map(|a| a.id).collect();
228 persisted.retain(|a| !running_ids.contains(&a.id));
229
230 let mut all = running;
231 all.extend(persisted);
232
233 let filtered = fallback_filter(all, filter);
235 let total = filtered.len() as u64;
236 let offset = ((filter.page.max(1) - 1) * filter.per_page) as usize;
237 let limit = filter.per_page.min(200) as usize;
238 let items: Vec<AgentInfo> = filtered.into_iter().skip(offset).take(limit).collect();
239 let total_pages = if total == 0 {
240 1
241 } else {
242 ((total as f64) / filter.per_page as f64).ceil() as u32
243 };
244
245 Ok(QueryResult {
246 items,
247 total,
248 page: filter.page,
249 per_page: filter.per_page,
250 total_pages,
251 stats: crate::agent_log_db::FilteredStats::default(),
252 })
253 }
254
255 pub async fn get(&self, id: &str) -> anyhow::Result<Option<AgentInfo>> {
257 #[cfg(feature = "sqlite-memory")]
259 if let Some(ref db) = self.agent_log_db
260 && let Ok(Some(agent)) = db.get(id)
261 {
262 return Ok(Some(agent));
263 }
264
265 if let Some(ref store) = self.state_store
267 && let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", id).await
268 {
269 return Ok(Some(agent));
270 }
271
272 if let Ok(agents) = self.supervisor.list().await
274 && let Some(agent) = agents.into_iter().find(|a| a.id.to_string() == id)
275 {
276 return Ok(Some(agent));
277 }
278
279 Ok(None)
280 }
281
282 pub async fn stats(&self) -> anyhow::Result<AgentStats> {
284 #[cfg(feature = "sqlite-memory")]
286 if let Some(ref db) = self.agent_log_db {
287 return db.stats().map_err(|e| anyhow::anyhow!("{e}"));
288 }
289
290 let mut s = AgentStats::default();
292 let running = self.supervisor.list().await.unwrap_or_default();
293 for a in &running {
294 s.total_agents += 1;
295 match a.status {
296 crate::types::AgentStatus::Running | crate::types::AgentStatus::Starting => {
297 s.running += 1
298 }
299 crate::types::AgentStatus::Idle
300 | crate::types::AgentStatus::Stopped
301 | crate::types::AgentStatus::Completed => s.completed += 1,
302 crate::types::AgentStatus::Failed => s.failed += 1,
303 }
304 s.total_tokens += a.tokens_input + a.tokens_output;
305 s.total_cost_usd += a.cost_usd;
306 }
307 Ok(s)
308 }
309
310 #[cfg(feature = "sqlite-memory")]
312 pub async fn reindex(&self) -> anyhow::Result<crate::agent_log_db::RebuildReport> {
313 match (self.agent_log_db.as_ref(), self.state_store.as_ref()) {
314 (Some(db), Some(store)) => db
315 .reindex_all(store)
316 .await
317 .map_err(|e| anyhow::anyhow!("{e}")),
318 _ => anyhow::bail!("Agent log DB not initialized"),
319 }
320 }
321
322 pub async fn rebuild_hnsw_index(&self) -> anyhow::Result<usize> {
324 if let Some(ref hnsw) = self.hnsw_index {
325 self.memory_manager.rebuild_hnsw_index(hnsw).await
326 } else {
327 Err(anyhow::anyhow!("HNSW index not initialized"))
328 }
329 }
330}
331
332fn filter_matches(agent: &AgentInfo, filter: &AgentListFilter) -> bool {
334 if let Some(status) = filter.status {
336 let status_str = agent.status.to_string();
337 if status_str != status.as_sql()
338 && !(status_str == "idle" && status.as_sql() == "completed")
339 && !(status_str == "idle" && status.as_sql() == "running")
340 {
341 return false;
342 }
343 }
344
345 if let Some(from) = filter.date_from
347 && agent.created_at < from
348 {
349 return false;
350 }
351 if let Some(to) = filter.date_to
352 && agent.created_at > to
353 {
354 return false;
355 }
356
357 if let Some(ref sid) = filter.session_id
359 && agent.session_id.as_deref() != Some(sid.as_str())
360 {
361 return false;
362 }
363 if let Some(ref pid) = filter.project_id
364 && agent.project_id.map(|p| p.to_string()).as_deref() != Some(pid.as_str())
365 {
366 return false;
367 }
368 if let Some(ref sid) = filter.seed_id
369 && agent.seed_id.map(|s| s.to_string()).as_deref() != Some(sid.as_str())
370 {
371 return false;
372 }
373
374 if let Some(ref model) = filter.model_id
376 && !agent.model_id.contains(model)
377 {
378 return false;
379 }
380
381 if let Some(ref q) = filter.q {
383 let q_lower = q.to_lowercase();
384 let name_match = agent.name.to_lowercase().contains(&q_lower);
385 let error_match = agent
386 .error
387 .as_deref()
388 .is_some_and(|e| e.to_lowercase().contains(&q_lower));
389 if !name_match && !error_match {
390 return false;
391 }
392 }
393
394 if let Some(has_err) = filter.has_error {
396 let agent_has_err = agent.error.as_deref().is_some_and(|e| !e.is_empty());
397 if has_err != agent_has_err {
398 return false;
399 }
400 }
401
402 if let Some(min) = filter.cost_min
404 && agent.cost_usd < min
405 {
406 return false;
407 }
408 if let Some(max) = filter.cost_max
409 && agent.cost_usd > max
410 {
411 return false;
412 }
413
414 true
415}
416
417fn fallback_filter(mut agents: Vec<AgentInfo>, filter: &AgentListFilter) -> Vec<AgentInfo> {
419 match filter.sort_by {
421 crate::agent_log_db::SortBy::CreatedAt => {
422 agents.sort_by_key(|a| std::cmp::Reverse(a.created_at));
423 }
424 crate::agent_log_db::SortBy::Cost => {
425 agents.sort_by(|a, b| {
426 b.cost_usd
427 .partial_cmp(&a.cost_usd)
428 .unwrap_or(std::cmp::Ordering::Equal)
429 });
430 }
431 crate::agent_log_db::SortBy::Duration => {
432 let dur = |a: &AgentInfo| -> i64 {
433 match (a.started_at, a.completed_at) {
434 (Some(s), Some(e)) => (e - s).num_seconds(),
435 _ => 0,
436 }
437 };
438 agents.sort_by_key(|a| std::cmp::Reverse(dur(a)));
439 }
440 crate::agent_log_db::SortBy::Tokens => {
441 agents.sort_by_key(|a| std::cmp::Reverse(a.tokens_input + a.tokens_output));
442 }
443 crate::agent_log_db::SortBy::Name => {
444 agents.sort_by(|a, b| a.name.cmp(&b.name));
445 }
446 }
447
448 agents
449}