1use crate::db::agent_runs;
8use crate::db::tenant_agents::{self, TenantAgent};
9use crate::memory::estimate_tokens;
10use crate::models::{TenantContext, TenantTier};
11use crate::types::{AgentContext, AgentType, AppError, ChatRequest, ChatResponse, Result};
12use crate::AppState;
13use axum::{
14 extract::{Extension, Path, Query, State},
15 http::StatusCode,
16 response::IntoResponse,
17 Json,
18};
19use chrono::{DateTime, Datelike, TimeZone, Utc};
20use serde::{Deserialize, Serialize};
21
/// API view of a tenant-owned agent, serialized to JSON by the v1 agent
/// endpoints (`list_agents`, `get_agent`).
#[derive(Debug, Serialize)]
pub struct V1Agent {
    /// Agent identifier (copied from `TenantAgent::id`).
    pub id: String,
    /// Display name (copied from `TenantAgent::agent_name`).
    pub name: String,
    /// Agent kind; the `From<TenantAgent>` conversion always sets `"custom"`.
    pub agent_type: String,
    /// Derived from `TenantAgent::enabled` (`Active` / `Disabled`).
    pub status: V1AgentStatus,
    /// Agent configuration, passed through from the stored record unchanged.
    pub config: serde_json::Value,
    /// Creation time, converted from Unix seconds.
    pub created_at: DateTime<Utc>,
    // NOTE(review): the three run statistics below are placeholders
    // (None / 0 / 0.0) in the `From<TenantAgent>` conversion.
    pub last_run: Option<DateTime<Utc>>,
    pub total_runs: u64,
    pub success_rate: f64,
}
38
/// Lifecycle status of an agent as exposed by the API.
///
/// Serialized in snake_case (`"active"`, `"idle"`, `"error"`, `"disabled"`).
/// The `From<TenantAgent>` conversion currently only produces `Active` and
/// `Disabled`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum V1AgentStatus {
    Active,
    Idle,
    Error,
    Disabled,
}
47
48impl From<TenantAgent> for V1Agent {
49 fn from(a: TenantAgent) -> Self {
50 let status = if a.enabled {
51 V1AgentStatus::Active
52 } else {
53 V1AgentStatus::Disabled
54 };
55 Self {
56 id: a.id,
57 name: a.agent_name,
58 agent_type: "custom".to_string(),
59 status,
60 config: a.config,
61 created_at: ts_to_dt(a.created_at),
62 last_run: None,
63 total_runs: 0,
64 success_rate: 0.0,
65 }
66 }
67}
68
/// One agent execution as reported by the API.
///
/// Returned directly by `run_agent` and rebuilt from stored rows by
/// `list_agent_runs`.
#[derive(Debug, Serialize)]
pub struct V1AgentRun {
    pub id: String,
    /// Name of the agent that ran.
    pub agent_id: String,
    /// `"completed"` or `"failed"` for runs created by this module.
    pub status: String,
    /// Request payload for `run_agent`; `list_agent_runs` only has token
    /// counts and reports `{"tokens": <input tokens>}` here.
    pub input: serde_json::Value,
    /// `{"response": ...}` from `run_agent`; `{"tokens": <output tokens>}`
    /// from `list_agent_runs`.
    pub output: Option<serde_json::Value>,
    /// Error text for failed runs.
    pub error: Option<String>,
    pub started_at: DateTime<Utc>,
    pub finished_at: Option<DateTime<Utc>>,
    /// Wall-clock execution time in milliseconds.
    pub duration_ms: Option<u64>,
    /// Input plus output tokens (estimated when the provider reports no usage).
    pub tokens_used: Option<u64>,
}
82
/// A single log line emitted by an agent.
///
/// NOTE(review): `list_agent_logs` in this file always returns an empty page,
/// so nothing constructs this type yet.
#[derive(Debug, Serialize)]
pub struct V1AgentLog {
    pub id: String,
    pub agent_id: String,
    /// Run the log line belongs to, if any.
    pub run_id: Option<String>,
    /// Severity label (presumably "info"/"warn"/"error" — TODO confirm).
    pub level: String,
    pub message: String,
    pub metadata: Option<serde_json::Value>,
    pub timestamp: DateTime<Utc>,
}
93
/// Generic page envelope returned by the list endpoints.
#[derive(Debug, Serialize)]
pub struct Paginated<T> {
    /// Items on the requested page.
    pub items: Vec<T>,
    /// Item count the handler reports for the listing.
    pub total: u64,
    /// 1-based page number echoed back to the client.
    pub page: u32,
    /// Page size actually applied by the handler.
    pub per_page: u32,
    /// `ceil(total / per_page)` as computed by the handler.
    pub total_pages: u32,
}
102
103impl<T> Paginated<T> {
104 fn empty(page: u32, per_page: u32) -> Self {
105 Self {
106 items: vec![],
107 total: 0,
108 page,
109 per_page,
110 total_pages: 0,
111 }
112 }
113}
114
/// Usage report for the current billing period, returned by `get_usage`.
#[derive(Debug, Serialize)]
pub struct V1Usage {
    /// Start of the current UTC month (day 1, 00:00:00).
    pub period_start: DateTime<Utc>,
    /// Time the report was generated.
    pub period_end: DateTime<Utc>,
    pub total_runs: u64,
    pub total_tokens: u64,
    /// Currently the same value as `total_runs` in `get_usage`.
    pub total_api_calls: u64,
    /// Monthly request quota; `None` (JSON `null`) means unlimited.
    pub quota_runs: Option<u64>,
    /// Monthly token quota; `None` (JSON `null`) means unlimited.
    pub quota_tokens: Option<u64>,
    /// Per-day breakdown; `get_usage` currently always returns it empty.
    pub daily_usage: Vec<DailyUsage>,
}
126
/// Usage totals for one calendar day.
#[derive(Debug, Serialize)]
pub struct DailyUsage {
    /// Day label (presumably an ISO `YYYY-MM-DD` date — TODO confirm; nothing
    /// in this file populates it yet).
    pub date: String,
    pub runs: u64,
    pub tokens: u64,
    pub api_calls: u64,
}
134
/// Public metadata of an API key. The secret itself is never included.
#[derive(Debug, Serialize)]
pub struct V1ApiKey {
    pub id: String,
    /// Human-readable label chosen at creation.
    pub name: String,
    /// Non-secret leading characters of the key, for identification.
    pub prefix: String,
    pub created_at: DateTime<Utc>,
    /// Always `None` in the handlers of this file.
    pub last_used: Option<DateTime<Utc>>,
    /// Expiry time, if the stored key has one.
    pub expires_at: Option<DateTime<Utc>>,
}
144
/// Body of a create-API-key request.
#[derive(Debug, Deserialize)]
pub struct CreateApiKeyRequest {
    /// Label for the new key.
    pub name: String,
    // NOTE(review): `create_api_key` in this file does not forward this value
    // to `tenant_db.create_api_key`, so a requested expiry is ignored.
    pub expires_in_days: Option<u32>,
}
150
/// Response to a create-API-key request.
#[derive(Debug, Serialize)]
pub struct CreateApiKeyResponse {
    /// Metadata of the newly created key.
    pub key: V1ApiKey,
    /// Raw key material as returned by `tenant_db.create_api_key`
    /// (presumably shown only once — TODO confirm it is not retrievable later).
    pub secret: String,
}
156
/// `?page=&per_page=` query parameters shared by the list endpoints.
///
/// Both are optional; each handler applies its own defaults and bounds.
#[derive(Debug, Deserialize)]
pub struct PaginationQuery {
    /// 1-based page number.
    pub page: Option<u32>,
    /// Requested page size.
    pub per_page: Option<u32>,
}
162
163fn ts_to_dt(ts: i64) -> DateTime<Utc> {
168 Utc.timestamp_opt(ts, 0).single().unwrap_or_else(Utc::now)
169}
170
171fn extract_tenant(ctx: Option<Extension<TenantContext>>) -> Result<TenantContext> {
172 ctx.map(|Extension(c)| c)
173 .ok_or_else(|| AppError::Auth("Missing tenant context".to_string()))
174}
175
176pub async fn v1_chat(
182 State(state): State<AppState>,
183 ctx: Option<Extension<TenantContext>>,
184 Json(payload): Json<ChatRequest>,
185) -> Result<axum::response::Response> {
186 let tc = extract_tenant(ctx)?;
187
188 if tc.tier != TenantTier::Enterprise {
190 let monthly = state.tenant_db.get_monthly_requests(&tc.tenant_id).await.unwrap_or(0);
191 let daily = state.tenant_db.get_daily_requests(&tc.tenant_id).await.unwrap_or(0);
192 if !tc.can_make_request(monthly, daily) {
193 return Err(crate::types::AppError::RateLimited(format!(
194 "Quota exceeded for {:?} tier. Monthly: {}/{}, Daily: {}/{}",
195 tc.tier, monthly, tc.quota.requests_per_month, daily, tc.quota.requests_per_day
196 )));
197 }
198 }
199
200 if state.emergency_stop.load(std::sync::atomic::Ordering::Relaxed) {
202 return Err(crate::types::AppError::Unavailable(
203 "All agents are currently under human review. Please try again later.".to_string(),
204 ));
205 }
206
207 let agent_context = AgentContext {
209 user_id: tc.tenant_id.clone(),
210 session_id: uuid::Uuid::new_v4().to_string(),
211 conversation_history: vec![],
212 user_memory: None,
213 };
214
215 let agent_type = if let Some(at) = payload.agent_type {
217 at
218 } else {
219 AgentType::Orchestrator
220 };
221
222 let agent_name = crate::agents::registry::AgentRegistry::type_to_name(&agent_type).to_string();
224 let start = std::time::Instant::now();
225
226 let eruka_context = state.context_provider
230 .get_context(&agent_name, &tc.tenant_id)
231 .await;
232
233 let effective_message = if let Some(ctx) = eruka_context {
234 tracing::info!(
235 agent = %agent_name,
236 tenant = %tc.tenant_id,
237 ctx_len = ctx.len(),
238 "External context injected into agent call"
239 );
240 format!("{}\n\n---\nUser message: {}", ctx, payload.message)
241 } else {
242 payload.message.clone()
243 };
244
245 use crate::agents::Agent;
246 let agent = state.agent_registry.create_agent(&agent_name).await?;
247 let response = agent.execute(&effective_message, &agent_context).await?;
248 let duration_ms = start.elapsed().as_millis() as i64;
249
250 let response_text = response.content;
251 let model_name = response.metadata.as_ref()
252 .map(|m| m.model_name.clone())
253 .unwrap_or_else(|| "unknown".to_string());
254 let provider_name = response.metadata.as_ref()
255 .map(|m| m.provider_name.clone())
256 .unwrap_or_else(|| "unknown".to_string());
257
258 let (input_tokens, output_tokens) = if let Some(u) = response.usage {
260 (u.prompt_tokens, u.completion_tokens)
261 } else {
262 (
263 estimate_tokens(&effective_message) as u32,
264 estimate_tokens(&response_text) as u32,
265 )
266 };
267
268 {
270 let pool = state.tenant_db.pool().clone();
271 let tid = tc.tenant_id.clone();
272 let aname = agent_name;
273 let itok = input_tokens as i64;
274 let otok = output_tokens as i64;
275 let mname = model_name.clone();
276 let pname = provider_name.clone();
277 tokio::spawn(async move {
278 let _ = agent_runs::insert_agent_run(
279 &pool,
280 &tid,
281 &aname,
282 None,
283 "completed",
284 itok,
285 otok,
286 duration_ms,
287 None,
288 &mname,
289 &pname,
290 false,
291 )
292 .await;
293 });
294 }
295
296 let chat_response = ChatResponse {
297 response: response_text,
298 agent: format!("{:?} (system)", agent_type),
299 context_id: agent_context.session_id,
300 sources: None,
301 };
302
303 let body = Json(chat_response);
304 let mut response = body.into_response();
305 response.headers_mut().insert(
306 axum::http::HeaderName::from_static("x-input-tokens"),
307 axum::http::HeaderValue::from(input_tokens),
308 );
309 response.headers_mut().insert(
310 axum::http::HeaderName::from_static("x-output-tokens"),
311 axum::http::HeaderValue::from(output_tokens),
312 );
313 if let Ok(v) = axum::http::HeaderValue::from_str(&model_name) {
314 response.headers_mut().insert(
315 axum::http::HeaderName::from_static("x-model-name"), v,
316 );
317 }
318 if let Ok(v) = axum::http::HeaderValue::from_str(&provider_name) {
319 response.headers_mut().insert(
320 axum::http::HeaderName::from_static("x-provider-name"), v,
321 );
322 }
323
324 Ok(response)
325}
326
327pub async fn list_agents(
329 State(state): State<AppState>,
330 ctx: Option<Extension<TenantContext>>,
331 Query(q): Query<PaginationQuery>,
332) -> Result<Json<Paginated<V1Agent>>> {
333 let tc = extract_tenant(ctx)?;
334 let page = q.page.unwrap_or(1).max(1);
335 let per_page = q.per_page.unwrap_or(20).min(100);
336
337 let agents = tenant_agents::list_tenant_agents(state.tenant_db.pool(), &tc.tenant_id).await?;
338 let total = agents.len() as u64;
339 let total_pages = ((total as f64) / (per_page as f64)).ceil() as u32;
340
341 let start = ((page - 1) * per_page) as usize;
342 let items: Vec<V1Agent> = agents
343 .into_iter()
344 .skip(start)
345 .take(per_page as usize)
346 .map(V1Agent::from)
347 .collect();
348
349 Ok(Json(Paginated {
350 items,
351 total,
352 page,
353 per_page,
354 total_pages,
355 }))
356}
357
358pub async fn get_agent(
360 State(state): State<AppState>,
361 ctx: Option<Extension<TenantContext>>,
362 Path(name): Path<String>,
363) -> Result<Json<V1Agent>> {
364 let tc = extract_tenant(ctx)?;
365 let agent =
366 tenant_agents::get_tenant_agent(state.tenant_db.pool(), &tc.tenant_id, &name).await?;
367 Ok(Json(V1Agent::from(agent)))
368}
369
370pub async fn run_agent(
372 State(state): State<AppState>,
373 ctx: Option<Extension<TenantContext>>,
374 Path(name): Path<String>,
375 Json(input): Json<serde_json::Value>,
376) -> Result<Json<V1AgentRun>> {
377 let tc = extract_tenant(ctx)?;
378
379 if state.emergency_stop.load(std::sync::atomic::Ordering::Relaxed) {
381 return Err(crate::types::AppError::Unavailable(
382 "All agents are currently under human review. Please try again later.".to_string(),
383 ));
384 }
385
386 let _agent =
388 tenant_agents::get_tenant_agent(state.tenant_db.pool(), &tc.tenant_id, &name).await?;
389
390 let message = input
392 .get("message")
393 .or_else(|| input.get("input"))
394 .and_then(|v| v.as_str())
395 .map(|s| s.to_string())
396 .unwrap_or_else(|| serde_json::to_string(&input).unwrap_or_default());
397
398 let agent_context = AgentContext {
400 user_id: tc.tenant_id.clone(),
401 session_id: uuid::Uuid::new_v4().to_string(),
402 conversation_history: vec![],
403 user_memory: None,
404 };
405
406 let start = std::time::Instant::now();
408 use crate::agents::Agent;
409 let agent = state.agent_registry.create_agent(&name).await?;
410 let result = agent.execute(&message, &agent_context).await;
411 let duration_ms = start.elapsed().as_millis() as u64;
412
413 match result {
414 Ok(response) => {
415 let (input_tokens, output_tokens) = if let Some(ref u) = response.usage {
416 (u.prompt_tokens as u64, u.completion_tokens as u64)
417 } else {
418 (
419 estimate_tokens(&message) as u64,
420 estimate_tokens(&response.content) as u64,
421 )
422 };
423
424 let model_name = response
425 .metadata
426 .as_ref()
427 .map(|m| m.model_name.clone())
428 .unwrap_or_else(|| "unknown".to_string());
429 let provider_name = response
430 .metadata
431 .as_ref()
432 .map(|m| m.provider_name.clone())
433 .unwrap_or_else(|| "unknown".to_string());
434
435 let run_id = uuid::Uuid::new_v4().to_string();
437 {
438 let pool = state.tenant_db.pool().clone();
439 let tid = tc.tenant_id.clone();
440 let aname = name.clone();
441 let _rid = run_id.clone();
442 let itok = input_tokens as i64;
443 let otok = output_tokens as i64;
444 let dur = duration_ms as i64;
445 let mname = model_name.clone();
446 let pname = provider_name.clone();
447 tokio::spawn(async move {
448 let _ = agent_runs::insert_agent_run(
449 &pool, &tid, &aname, None, "completed", itok, otok, dur, None,
450 &mname, &pname, false,
451 )
452 .await;
453 });
454 }
455
456 Ok(Json(V1AgentRun {
457 id: run_id,
458 agent_id: name,
459 status: "completed".to_string(),
460 input,
461 output: Some(serde_json::json!({"response": response.content})),
462 error: None,
463 started_at: Utc::now(),
464 finished_at: Some(Utc::now()),
465 duration_ms: Some(duration_ms),
466 tokens_used: Some(input_tokens + output_tokens),
467 }))
468 }
469 Err(e) => {
470 let run_id = uuid::Uuid::new_v4().to_string();
472 {
473 let pool = state.tenant_db.pool().clone();
474 let tid = tc.tenant_id.clone();
475 let aname = name.clone();
476 let err_msg = e.to_string();
477 let dur = duration_ms as i64;
478 tokio::spawn(async move {
479 let _ = agent_runs::insert_agent_run(
480 &pool, &tid, &aname, None, "failed", 0, 0, dur,
481 Some(&err_msg), "unknown", "unknown", false,
482 )
483 .await;
484 });
485 }
486
487 Ok(Json(V1AgentRun {
488 id: run_id,
489 agent_id: name,
490 status: "failed".to_string(),
491 input,
492 output: None,
493 error: Some(e.to_string()),
494 started_at: Utc::now(),
495 finished_at: Some(Utc::now()),
496 duration_ms: Some(duration_ms),
497 tokens_used: Some(0),
498 }))
499 }
500 }
501}
502
503pub async fn list_agent_runs(
505 State(state): State<AppState>,
506 ctx: Option<Extension<TenantContext>>,
507 Path(name): Path<String>,
508 Query(q): Query<PaginationQuery>,
509) -> Result<Json<Paginated<V1AgentRun>>> {
510 let tc = extract_tenant(ctx)?;
511 let page = q.page.unwrap_or(1).max(1);
512 let per_page = q.per_page.unwrap_or(25).min(100);
513 let offset = ((page - 1) * per_page) as i64;
514
515 let runs = agent_runs::list_agent_runs(
516 state.tenant_db.pool(),
517 &tc.tenant_id,
518 Some(&name),
519 per_page as i64,
520 offset,
521 )
522 .await?;
523
524 let items: Vec<V1AgentRun> = runs
525 .into_iter()
526 .map(|r| V1AgentRun {
527 id: r.id,
528 agent_id: r.agent_name,
529 status: r.status,
530 input: serde_json::json!({"tokens": r.input_tokens}),
531 output: Some(serde_json::json!({"tokens": r.output_tokens})),
532 error: r.error,
533 started_at: ts_to_dt(r.created_at),
534 finished_at: Some(ts_to_dt(r.created_at + (r.duration_ms / 1000))),
535 duration_ms: Some(r.duration_ms as u64),
536 tokens_used: Some((r.input_tokens + r.output_tokens) as u64),
537 })
538 .collect();
539
540 let total = items.len() as u64;
541 Ok(Json(Paginated {
542 items,
543 total,
544 page,
545 per_page,
546 total_pages: ((total as f64) / (per_page as f64)).ceil() as u32,
547 }))
548}
549
550pub async fn list_agent_logs(
552 ctx: Option<Extension<TenantContext>>,
553 Path(name): Path<String>,
554 Query(q): Query<PaginationQuery>,
555) -> Result<Json<Paginated<V1AgentLog>>> {
556 let _tc = extract_tenant(ctx)?;
557 let page = q.page.unwrap_or(1);
558 let per_page = q.per_page.unwrap_or(50);
559 let _ = name;
560 Ok(Json(Paginated::empty(page, per_page)))
561}
562
563pub async fn get_usage(
565 State(state): State<AppState>,
566 ctx: Option<Extension<TenantContext>>,
567) -> Result<Json<V1Usage>> {
568 let tc = extract_tenant(ctx)?;
569 let summary = state.tenant_db.get_usage_summary(&tc.tenant_id).await?;
570
571 let now = Utc::now();
572 let period_start = now
573 .date_naive()
574 .with_day(1)
575 .unwrap()
576 .and_hms_opt(0, 0, 0)
577 .unwrap()
578 .and_utc();
579
580 let quota_runs = if tc.quota.requests_per_month == u64::MAX {
582 None
583 } else {
584 Some(tc.quota.requests_per_month)
585 };
586 let quota_tokens = if tc.quota.tokens_per_month == u64::MAX {
587 None
588 } else {
589 Some(tc.quota.tokens_per_month)
590 };
591
592 Ok(Json(V1Usage {
593 period_start,
594 period_end: now,
595 total_runs: summary.monthly_requests,
596 total_tokens: summary.monthly_tokens,
597 total_api_calls: summary.monthly_requests,
598 quota_runs,
599 quota_tokens,
600 daily_usage: vec![],
601 }))
602}
603
604pub async fn list_api_keys(
606 State(state): State<AppState>,
607 ctx: Option<Extension<TenantContext>>,
608) -> Result<Json<Vec<V1ApiKey>>> {
609 let tc = extract_tenant(ctx)?;
610 let keys = state.tenant_db.list_api_keys(&tc.tenant_id).await?;
611
612 let response: Vec<V1ApiKey> = keys
613 .into_iter()
614 .filter(|k| k.is_active)
615 .map(|k| V1ApiKey {
616 id: k.id,
617 name: k.name,
618 prefix: k.key_prefix,
619 created_at: ts_to_dt(k.created_at),
620 last_used: None,
621 expires_at: k.expires_at.map(|e| ts_to_dt(e)),
622 })
623 .collect();
624
625 Ok(Json(response))
626}
627
628pub async fn create_api_key(
630 State(state): State<AppState>,
631 ctx: Option<Extension<TenantContext>>,
632 Json(payload): Json<CreateApiKeyRequest>,
633) -> Result<Json<CreateApiKeyResponse>> {
634 let tc = extract_tenant(ctx)?;
635 let (api_key, raw_key) = state
636 .tenant_db
637 .create_api_key(&tc.tenant_id, payload.name)
638 .await?;
639
640 Ok(Json(CreateApiKeyResponse {
641 key: V1ApiKey {
642 id: api_key.id,
643 name: api_key.name,
644 prefix: api_key.key_prefix,
645 created_at: ts_to_dt(api_key.created_at),
646 last_used: None,
647 expires_at: api_key.expires_at.map(|e| ts_to_dt(e)),
648 },
649 secret: raw_key,
650 }))
651}
652
653pub async fn revoke_api_key(
655 State(state): State<AppState>,
656 ctx: Option<Extension<TenantContext>>,
657 Path(key_id): Path<String>,
658) -> Result<StatusCode> {
659 let tc = extract_tenant(ctx)?;
660 state
661 .tenant_db
662 .revoke_api_key(&tc.tenant_id, &key_id)
663 .await?;
664 Ok(StatusCode::NO_CONTENT)
665}
666
667pub async fn delete_tenant_data(
670 State(state): State<AppState>,
671 ctx: Option<Extension<TenantContext>>,
672) -> Result<Json<serde_json::Value>> {
673 let tc = extract_tenant(ctx)?;
674 let tid = &tc.tenant_id;
675
676 let pool = state.tenant_db.pool();
677
678 let usage_rows: Vec<i64> = sqlx::query_scalar(
679 "DELETE FROM usage_events WHERE tenant_id = $1 RETURNING 1"
680 )
681 .bind(tid)
682 .fetch_all(pool)
683 .await
684 .unwrap_or_default();
685 let usage_deleted = usage_rows.len() as i64;
686
687 let run_rows: Vec<i64> = sqlx::query_scalar(
688 "DELETE FROM agent_runs WHERE tenant_id = $1 RETURNING 1"
689 )
690 .bind(tid)
691 .fetch_all(pool)
692 .await
693 .unwrap_or_default();
694 let runs_deleted = run_rows.len() as i64;
695
696 let key_rows: Vec<i64> = sqlx::query_scalar(
698 "DELETE FROM api_keys WHERE tenant_id = $1 RETURNING 1"
699 )
700 .bind(tid)
701 .fetch_all(pool)
702 .await
703 .unwrap_or_default();
704 let keys_deleted = key_rows.len() as i64;
705
706 let _ = sqlx::query("DELETE FROM monthly_usage_cache WHERE tenant_id = $1")
708 .bind(tid)
709 .execute(pool)
710 .await;
711
712 Ok(Json(serde_json::json!({
713 "status": "purged",
714 "tenant_id": tid,
715 "usage_events_deleted": usage_deleted,
716 "agent_runs_deleted": runs_deleted,
717 "api_keys_revoked": keys_deleted,
718 "note": "Tenant account retained. All operational data purged per GDPR Article 17."
719 })))
720}
721