oxios_kernel/kernel_handle/
agent_api.rs1use crate::agent_log_db::{AgentListFilter, AgentStats, QueryResult};
4use crate::budget::{BudgetExceeded, BudgetInfo, BudgetLimit, BudgetManager};
5use crate::event_bus::{EventBus, KernelEvent};
6use crate::memory::{HnswMemoryIndex, SemanticHit};
7use crate::memory::{MemoryEntry, MemoryManager, MemoryType};
8use crate::state_store::StateStore;
9use crate::supervisor::Supervisor;
10use crate::types::{AgentId, AgentInfo};
11use std::sync::Arc;
12
13pub struct AgentApi {
15 pub(crate) supervisor: Arc<dyn Supervisor>,
16 pub(crate) budget_manager: Arc<BudgetManager>,
17 pub(crate) memory_manager: Arc<MemoryManager>,
18 pub(crate) hnsw_index: Option<Arc<HnswMemoryIndex>>,
20 pub(crate) event_bus: Option<EventBus>,
22 pub(crate) state_store: Option<Arc<StateStore>>,
24 #[cfg(feature = "sqlite-memory")]
26 pub(crate) agent_log_db: Option<Arc<crate::agent_log_db::AgentLogDb>>,
27}
28
29impl AgentApi {
30 pub fn new(
32 supervisor: Arc<dyn Supervisor>,
33 budget_manager: Arc<BudgetManager>,
34 memory_manager: Arc<MemoryManager>,
35 event_bus: Option<EventBus>,
36 ) -> Self {
37 Self {
38 supervisor,
39 budget_manager,
40 memory_manager,
41 hnsw_index: None,
42 event_bus,
43 state_store: None,
44 #[cfg(feature = "sqlite-memory")]
45 agent_log_db: None,
46 }
47 }
48
49 pub fn set_state_store(&mut self, store: Arc<StateStore>) {
51 self.state_store = Some(store);
52 }
53
54 #[cfg(feature = "sqlite-memory")]
56 pub fn set_agent_log_db(&mut self, db: Arc<crate::agent_log_db::AgentLogDb>) {
57 self.agent_log_db = Some(db);
58 }
59
60 pub fn set_hnsw_index(&mut self, index: Arc<HnswMemoryIndex>) {
62 self.hnsw_index = Some(index);
63 }
64 pub fn has_hnsw_index(&self) -> bool {
66 self.hnsw_index.is_some()
67 }
68
69 fn publish(&self, event: KernelEvent) {
71 if let Some(ref eb) = self.event_bus {
72 let _ = eb.publish(event);
73 }
74 }
75 pub async fn list(&self) -> anyhow::Result<Vec<AgentInfo>> {
77 self.supervisor
78 .list()
79 .await
80 .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
81 }
82
83 pub async fn kill(&self, agent_id: &str) -> anyhow::Result<()> {
85 let id = uuid::Uuid::parse_str(agent_id)
86 .map_err(|e| anyhow::anyhow!("invalid agent id: {e}"))?;
87 self.supervisor
88 .kill(id)
89 .await
90 .map_err(|e| anyhow::anyhow!("supervisor: {e}"))
91 }
92
93 pub fn check_budget(&self, agent_id: &AgentId) -> BudgetInfo {
95 self.budget_manager.remaining(agent_id)
96 }
97
98 pub fn set_budget(&self, limit: BudgetLimit) {
100 self.budget_manager.set_budget(limit);
101 }
102
103 pub fn remove_budget(&self, agent_id: &AgentId) {
105 self.budget_manager.remove_budget(agent_id);
106 }
107
108 pub fn reserve_budget(&self, agent_id: &AgentId, tokens: u64) -> Result<(), BudgetExceeded> {
110 self.budget_manager.reserve(agent_id, tokens)
111 }
112
113 pub fn reset_budget(&self, agent_id: &AgentId) {
115 self.budget_manager.reset_window(agent_id);
116 }
117
118 pub fn full_budget_info(&self, agent_id: &AgentId) -> Option<crate::budget::FullBudgetInfo> {
120 self.budget_manager.full_info(agent_id)
121 }
122
123 pub fn all_budget_info(&self) -> Vec<crate::budget::FullBudgetInfo> {
125 self.budget_manager.all_full_info()
126 }
127
128 pub async fn memory_stats(&self) -> (usize, usize) {
130 (
131 self.memory_manager.vector_index_size(),
132 self.memory_manager.total_entries().await,
133 )
134 }
135
136 pub async fn remember(&self, entry: MemoryEntry) -> anyhow::Result<String> {
138 let id = self.memory_manager.remember(entry.clone()).await?;
139
140 self.publish(KernelEvent::MemoryStored {
142 id: id.clone(),
143 memory_type: entry.memory_type.label().to_string(),
144 source: entry.source.clone(),
145 });
146
147 Ok(id)
148 }
149
150 pub async fn search_memory(
152 &self,
153 query: &str,
154 memory_type: Option<MemoryType>,
155 limit: usize,
156 ) -> anyhow::Result<Vec<MemoryEntry>> {
157 self.memory_manager.search(query, memory_type, limit).await
158 }
159
160 pub async fn semantic_search_memory(
164 &self,
165 query: &str,
166 memory_type: Option<MemoryType>,
167 limit: usize,
168 ) -> anyhow::Result<Vec<SemanticHit>> {
169 if let Some(ref hnsw) = self.hnsw_index {
170 self.memory_manager
171 .semantic_search(query, memory_type, limit, hnsw)
172 .await
173 } else {
174 let entries = self.search_memory(query, memory_type, limit).await?;
176 Ok(entries
177 .into_iter()
178 .map(|entry| SemanticHit {
179 entry,
180 distance: 0.0,
181 similarity: 0.0,
182 })
183 .collect())
184 }
185 }
186
187 pub fn memory_manager(&self) -> &Arc<MemoryManager> {
189 &self.memory_manager
190 }
191
192 pub async fn query(&self, filter: &AgentListFilter) -> anyhow::Result<QueryResult> {
199 let running = self.supervisor.list().await.unwrap_or_default();
201
202 #[cfg(feature = "sqlite-memory")]
204 if let Some(ref db) = self.agent_log_db {
205 let mut result = db.query(filter).map_err(|e| anyhow::anyhow!("{e}"))?;
206
207 if filter.page == 1 {
210 let existing_ids: std::collections::HashSet<_> =
213 result.items.iter().map(|a| a.id).collect();
214 let mut prepended = 0u64;
215 for agent in &running {
216 if existing_ids.contains(&agent.id) {
217 continue;
218 }
219 if filter_matches(agent, filter) {
220 result.items.insert(0, agent.clone());
221 prepended += 1;
222 }
223 }
224 result.total = result.total.saturating_add(prepended);
225 result.total_pages = if result.total == 0 {
227 1
228 } else {
229 ((result.total as f64) / filter.per_page.max(1) as f64).ceil() as u32
230 };
231 }
232
233 return Ok(result);
234 }
235
236 #[allow(unused_mut)]
238 let mut persisted: Vec<AgentInfo> = Vec::new();
239 if let Some(ref store) = self.state_store {
240 let names = store.list_category("agents").await.unwrap_or_default();
241 for name in &names {
242 if let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", name).await {
243 persisted.push(agent);
244 }
245 }
246 }
247
248 let running_ids: std::collections::HashSet<_> = running.iter().map(|a| a.id).collect();
250 persisted.retain(|a| !running_ids.contains(&a.id));
251
252 let mut all = running;
253 all.extend(persisted);
254
255 let filtered = fallback_filter(all, filter);
257 let total = filtered.len() as u64;
258 let offset = ((filter.page.max(1) - 1) * filter.per_page) as usize;
259 let limit = filter.per_page.min(200) as usize;
260 let items: Vec<AgentInfo> = filtered.into_iter().skip(offset).take(limit).collect();
261 let total_pages = if total == 0 {
262 1
263 } else {
264 ((total as f64) / filter.per_page as f64).ceil() as u32
265 };
266
267 Ok(QueryResult {
268 items,
269 total,
270 page: filter.page,
271 per_page: filter.per_page,
272 total_pages,
273 stats: crate::agent_log_db::FilteredStats::default(),
274 })
275 }
276
277 pub async fn get(&self, id: &str) -> anyhow::Result<Option<AgentInfo>> {
279 #[cfg(feature = "sqlite-memory")]
281 if let Some(ref db) = self.agent_log_db
282 && let Ok(Some(agent)) = db.get(id)
283 {
284 return Ok(Some(agent));
285 }
286
287 if let Some(ref store) = self.state_store
289 && let Ok(Some(agent)) = store.load_json::<AgentInfo>("agents", id).await
290 {
291 return Ok(Some(agent));
292 }
293
294 if let Ok(agents) = self.supervisor.list().await
296 && let Some(agent) = agents.into_iter().find(|a| a.id.to_string() == id)
297 {
298 return Ok(Some(agent));
299 }
300
301 Ok(None)
302 }
303
304 pub async fn stats(&self) -> anyhow::Result<AgentStats> {
306 #[cfg(feature = "sqlite-memory")]
308 if let Some(ref db) = self.agent_log_db {
309 return db.stats().map_err(|e| anyhow::anyhow!("{e}"));
310 }
311
312 let mut s = AgentStats::default();
314 let running = self.supervisor.list().await.unwrap_or_default();
315 for a in &running {
316 s.total_agents += 1;
317 match a.status {
318 crate::types::AgentStatus::Running | crate::types::AgentStatus::Starting => {
319 s.running += 1
320 }
321 crate::types::AgentStatus::Idle
322 | crate::types::AgentStatus::Stopped
323 | crate::types::AgentStatus::Completed => s.completed += 1,
324 crate::types::AgentStatus::Failed => s.failed += 1,
325 }
326 s.total_tokens += a.tokens_input + a.tokens_output;
327 s.total_cost_usd += a.cost_usd;
328 }
329 Ok(s)
330 }
331
332 #[cfg(feature = "sqlite-memory")]
334 pub async fn reindex(&self) -> anyhow::Result<crate::agent_log_db::RebuildReport> {
335 match (self.agent_log_db.as_ref(), self.state_store.as_ref()) {
336 (Some(db), Some(store)) => db
337 .reindex_all(store)
338 .await
339 .map_err(|e| anyhow::anyhow!("{e}")),
340 _ => anyhow::bail!("Agent log DB not initialized"),
341 }
342 }
343
344 pub async fn rebuild_hnsw_index(&self) -> anyhow::Result<usize> {
346 if let Some(ref hnsw) = self.hnsw_index {
347 self.memory_manager.rebuild_hnsw_index(hnsw).await
348 } else {
349 Err(anyhow::anyhow!("HNSW index not initialized"))
350 }
351 }
352}
353
354fn filter_matches(agent: &AgentInfo, filter: &AgentListFilter) -> bool {
356 if let Some(status) = filter.status {
358 let status_str = agent.status.to_string();
359 if status_str != status.as_sql()
360 && !(status_str == "idle" && status.as_sql() == "completed")
361 && !(status_str == "idle" && status.as_sql() == "running")
362 {
363 return false;
364 }
365 }
366
367 if let Some(from) = filter.date_from
369 && agent.created_at < from
370 {
371 return false;
372 }
373 if let Some(to) = filter.date_to
374 && agent.created_at > to
375 {
376 return false;
377 }
378
379 if let Some(ref sid) = filter.session_id
381 && agent.session_id.as_deref() != Some(sid.as_str())
382 {
383 return false;
384 }
385 if let Some(ref pid) = filter.project_id
386 && agent.project_id.map(|p| p.to_string()).as_deref() != Some(pid.as_str())
387 {
388 return false;
389 }
390 if let Some(ref sid) = filter.seed_id
391 && agent.seed_id.map(|s| s.to_string()).as_deref() != Some(sid.as_str())
392 {
393 return false;
394 }
395
396 if let Some(ref model) = filter.model_id
398 && !agent.model_id.contains(model)
399 {
400 return false;
401 }
402
403 if let Some(ref q) = filter.q {
405 let q_lower = q.to_lowercase();
406 let name_match = agent.name.to_lowercase().contains(&q_lower);
407 let error_match = agent
408 .error
409 .as_deref()
410 .is_some_and(|e| e.to_lowercase().contains(&q_lower));
411 if !name_match && !error_match {
412 return false;
413 }
414 }
415
416 if let Some(has_err) = filter.has_error {
418 let agent_has_err = agent.error.as_deref().is_some_and(|e| !e.is_empty());
419 if has_err != agent_has_err {
420 return false;
421 }
422 }
423
424 if let Some(min) = filter.cost_min
426 && agent.cost_usd < min
427 {
428 return false;
429 }
430 if let Some(max) = filter.cost_max
431 && agent.cost_usd > max
432 {
433 return false;
434 }
435
436 true
437}
438
439fn fallback_filter(mut agents: Vec<AgentInfo>, filter: &AgentListFilter) -> Vec<AgentInfo> {
441 match filter.sort_by {
443 crate::agent_log_db::SortBy::CreatedAt => {
444 agents.sort_by_key(|a| std::cmp::Reverse(a.created_at));
445 }
446 crate::agent_log_db::SortBy::Cost => {
447 agents.sort_by(|a, b| {
448 b.cost_usd
449 .partial_cmp(&a.cost_usd)
450 .unwrap_or(std::cmp::Ordering::Equal)
451 });
452 }
453 crate::agent_log_db::SortBy::Duration => {
454 let dur = |a: &AgentInfo| -> i64 {
455 match (a.started_at, a.completed_at) {
456 (Some(s), Some(e)) => (e - s).num_seconds(),
457 _ => 0,
458 }
459 };
460 agents.sort_by_key(|a| std::cmp::Reverse(dur(a)));
461 }
462 crate::agent_log_db::SortBy::Tokens => {
463 agents.sort_by_key(|a| std::cmp::Reverse(a.tokens_input + a.tokens_output));
464 }
465 crate::agent_log_db::SortBy::Name => {
466 agents.sort_by(|a, b| a.name.cmp(&b.name));
467 }
468 }
469
470 agents
471}