use crate::agents::AgentResponse;
use crate::db::agent_runs;
use crate::db::tenant_agents::{self, TenantAgent};
use crate::memory::estimate_tokens;
use crate::models::TenantContext;
use crate::types::{AgentContext, AgentType, AppError, ChatRequest, ChatResponse, Result};
use crate::AppState;
use axum::{
extract::{Extension, Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use chrono::{DateTime, Datelike, TimeZone, Utc};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize)]
pub struct V1Agent {
pub id: String,
pub name: String,
pub agent_type: String,
pub status: V1AgentStatus,
pub config: serde_json::Value,
pub created_at: DateTime<Utc>,
pub last_run: Option<DateTime<Utc>>,
pub total_runs: u64,
pub success_rate: f64,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum V1AgentStatus {
Active,
Idle,
Error,
Disabled,
}
impl From<TenantAgent> for V1Agent {
fn from(a: TenantAgent) -> Self {
let status = if a.enabled {
V1AgentStatus::Active
} else {
V1AgentStatus::Disabled
};
Self {
id: a.id,
name: a.agent_name,
agent_type: "custom".to_string(),
status,
config: a.config,
created_at: ts_to_dt(a.created_at),
last_run: None,
total_runs: 0,
success_rate: 0.0,
}
}
}
#[derive(Debug, Serialize)]
pub struct V1AgentRun {
pub id: String,
pub agent_id: String,
pub status: String,
pub input: serde_json::Value,
pub output: Option<serde_json::Value>,
pub error: Option<String>,
pub started_at: DateTime<Utc>,
pub finished_at: Option<DateTime<Utc>>,
pub duration_ms: Option<u64>,
pub tokens_used: Option<u64>,
}
#[derive(Debug, Serialize)]
pub struct V1AgentLog {
pub id: String,
pub agent_id: String,
pub run_id: Option<String>,
pub level: String,
pub message: String,
pub metadata: Option<serde_json::Value>,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Serialize)]
pub struct Paginated<T> {
pub items: Vec<T>,
pub total: u64,
pub page: u32,
pub per_page: u32,
pub total_pages: u32,
}
impl<T> Paginated<T> {
fn empty(page: u32, per_page: u32) -> Self {
Self {
items: vec![],
total: 0,
page,
per_page,
total_pages: 0,
}
}
}
#[derive(Debug, Serialize)]
pub struct V1Usage {
pub period_start: DateTime<Utc>,
pub period_end: DateTime<Utc>,
pub total_runs: u64,
pub total_tokens: u64,
pub total_api_calls: u64,
pub quota_runs: Option<u64>,
pub quota_tokens: Option<u64>,
pub daily_usage: Vec<DailyUsage>,
}
#[derive(Debug, Serialize)]
pub struct DailyUsage {
pub date: String,
pub runs: u64,
pub tokens: u64,
pub api_calls: u64,
}
#[derive(Debug, Serialize)]
pub struct V1ApiKey {
pub id: String,
pub name: String,
pub prefix: String,
pub created_at: DateTime<Utc>,
pub last_used: Option<DateTime<Utc>>,
pub expires_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Deserialize)]
pub struct CreateApiKeyRequest {
pub name: String,
pub expires_in_days: Option<u32>,
}
#[derive(Debug, Serialize)]
pub struct CreateApiKeyResponse {
pub key: V1ApiKey,
pub secret: String,
}
#[derive(Debug, Deserialize)]
pub struct PaginationQuery {
pub page: Option<u32>,
pub per_page: Option<u32>,
}
fn ts_to_dt(ts: i64) -> DateTime<Utc> {
Utc.timestamp_opt(ts, 0).single().unwrap_or_else(Utc::now)
}
fn extract_tenant(ctx: Option<Extension<TenantContext>>) -> Result<TenantContext> {
ctx.map(|Extension(c)| c)
.ok_or_else(|| AppError::Auth("Missing tenant context".to_string()))
}
pub async fn v1_chat(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Json(payload): Json<ChatRequest>,
) -> Result<axum::response::Response> {
let tc = extract_tenant(ctx)?;
if state.emergency_stop.load(std::sync::atomic::Ordering::Relaxed) {
return Err(crate::types::AppError::Unavailable(
"All agents are currently under human review. Please try again later.".to_string(),
));
}
let agent_context = AgentContext {
user_id: tc.tenant_id.clone(),
session_id: uuid::Uuid::new_v4().to_string(),
conversation_history: vec![],
user_memory: None,
};
let agent_type = if let Some(at) = payload.agent_type {
at
} else {
AgentType::Orchestrator
};
let agent_name = crate::agents::registry::AgentRegistry::type_to_name(&agent_type).to_string();
let start = std::time::Instant::now();
use crate::agents::Agent;
let agent = state.agent_registry.create_agent(&agent_name).await?;
let response = agent.execute(&payload.message, &agent_context).await?;
let duration_ms = start.elapsed().as_millis() as i64;
let response_text = response.content;
let model_name = response.metadata.as_ref()
.map(|m| m.model_name.clone())
.unwrap_or_else(|| "unknown".to_string());
let provider_name = response.metadata.as_ref()
.map(|m| m.provider_name.clone())
.unwrap_or_else(|| "unknown".to_string());
let (input_tokens, output_tokens) = if let Some(u) = response.usage {
(u.prompt_tokens, u.completion_tokens)
} else {
(
estimate_tokens(&payload.message) as u32,
estimate_tokens(&response_text) as u32,
)
};
{
let pool = state.tenant_db.pool().clone();
let tid = tc.tenant_id.clone();
let aname = agent_name;
let itok = input_tokens as i64;
let otok = output_tokens as i64;
let mname = model_name.clone();
let pname = provider_name.clone();
tokio::spawn(async move {
let _ = agent_runs::insert_agent_run(
&pool,
&tid,
&aname,
None,
"completed",
itok,
otok,
duration_ms,
None,
&mname,
&pname,
false,
)
.await;
});
}
let chat_response = ChatResponse {
response: response_text,
agent: format!("{:?} (system)", agent_type),
context_id: agent_context.session_id,
sources: None,
};
let body = Json(chat_response);
let mut response = body.into_response();
response.headers_mut().insert(
axum::http::HeaderName::from_static("x-input-tokens"),
axum::http::HeaderValue::from(input_tokens),
);
response.headers_mut().insert(
axum::http::HeaderName::from_static("x-output-tokens"),
axum::http::HeaderValue::from(output_tokens),
);
if let Ok(v) = axum::http::HeaderValue::from_str(&model_name) {
response.headers_mut().insert(
axum::http::HeaderName::from_static("x-model-name"), v,
);
}
if let Ok(v) = axum::http::HeaderValue::from_str(&provider_name) {
response.headers_mut().insert(
axum::http::HeaderName::from_static("x-provider-name"), v,
);
}
Ok(response)
}
pub async fn list_agents(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Query(q): Query<PaginationQuery>,
) -> Result<Json<Paginated<V1Agent>>> {
let tc = extract_tenant(ctx)?;
let page = q.page.unwrap_or(1).max(1);
let per_page = q.per_page.unwrap_or(20).min(100);
let agents = tenant_agents::list_tenant_agents(state.tenant_db.pool(), &tc.tenant_id).await?;
let total = agents.len() as u64;
let total_pages = ((total as f64) / (per_page as f64)).ceil() as u32;
let start = ((page - 1) * per_page) as usize;
let items: Vec<V1Agent> = agents
.into_iter()
.skip(start)
.take(per_page as usize)
.map(V1Agent::from)
.collect();
Ok(Json(Paginated {
items,
total,
page,
per_page,
total_pages,
}))
}
pub async fn get_agent(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Path(name): Path<String>,
) -> Result<Json<V1Agent>> {
let tc = extract_tenant(ctx)?;
let agent =
tenant_agents::get_tenant_agent(state.tenant_db.pool(), &tc.tenant_id, &name).await?;
Ok(Json(V1Agent::from(agent)))
}
pub async fn run_agent(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Path(name): Path<String>,
Json(input): Json<serde_json::Value>,
) -> Result<Json<V1AgentRun>> {
let tc = extract_tenant(ctx)?;
if state.emergency_stop.load(std::sync::atomic::Ordering::Relaxed) {
return Err(crate::types::AppError::Unavailable(
"All agents are currently under human review. Please try again later.".to_string(),
));
}
let _agent =
tenant_agents::get_tenant_agent(state.tenant_db.pool(), &tc.tenant_id, &name).await?;
let message = input
.get("message")
.or_else(|| input.get("input"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| serde_json::to_string(&input).unwrap_or_default());
let agent_context = AgentContext {
user_id: tc.tenant_id.clone(),
session_id: uuid::Uuid::new_v4().to_string(),
conversation_history: vec![],
user_memory: None,
};
let start = std::time::Instant::now();
use crate::agents::Agent;
let agent = state.agent_registry.create_agent(&name).await?;
let result = agent.execute(&message, &agent_context).await;
let duration_ms = start.elapsed().as_millis() as u64;
match result {
Ok(response) => {
let (input_tokens, output_tokens) = if let Some(ref u) = response.usage {
(u.prompt_tokens as u64, u.completion_tokens as u64)
} else {
(
estimate_tokens(&message) as u64,
estimate_tokens(&response.content) as u64,
)
};
let model_name = response
.metadata
.as_ref()
.map(|m| m.model_name.clone())
.unwrap_or_else(|| "unknown".to_string());
let provider_name = response
.metadata
.as_ref()
.map(|m| m.provider_name.clone())
.unwrap_or_else(|| "unknown".to_string());
let run_id = uuid::Uuid::new_v4().to_string();
{
let pool = state.tenant_db.pool().clone();
let tid = tc.tenant_id.clone();
let aname = name.clone();
let rid = run_id.clone();
let itok = input_tokens as i64;
let otok = output_tokens as i64;
let dur = duration_ms as i64;
let mname = model_name.clone();
let pname = provider_name.clone();
tokio::spawn(async move {
let _ = agent_runs::insert_agent_run(
&pool, &tid, &aname, None, "completed", itok, otok, dur, None,
&mname, &pname, false,
)
.await;
});
}
Ok(Json(V1AgentRun {
id: run_id,
agent_id: name,
status: "completed".to_string(),
input,
output: Some(serde_json::json!({"response": response.content})),
error: None,
started_at: Utc::now(),
finished_at: Some(Utc::now()),
duration_ms: Some(duration_ms),
tokens_used: Some(input_tokens + output_tokens),
}))
}
Err(e) => {
let run_id = uuid::Uuid::new_v4().to_string();
{
let pool = state.tenant_db.pool().clone();
let tid = tc.tenant_id.clone();
let aname = name.clone();
let err_msg = e.to_string();
let dur = duration_ms as i64;
tokio::spawn(async move {
let _ = agent_runs::insert_agent_run(
&pool, &tid, &aname, None, "failed", 0, 0, dur,
Some(&err_msg), "unknown", "unknown", false,
)
.await;
});
}
Ok(Json(V1AgentRun {
id: run_id,
agent_id: name,
status: "failed".to_string(),
input,
output: None,
error: Some(e.to_string()),
started_at: Utc::now(),
finished_at: Some(Utc::now()),
duration_ms: Some(duration_ms),
tokens_used: Some(0),
}))
}
}
}
pub async fn list_agent_runs(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Path(name): Path<String>,
Query(q): Query<PaginationQuery>,
) -> Result<Json<Paginated<V1AgentRun>>> {
let tc = extract_tenant(ctx)?;
let page = q.page.unwrap_or(1).max(1);
let per_page = q.per_page.unwrap_or(25).min(100);
let offset = ((page - 1) * per_page) as i64;
let runs = agent_runs::list_agent_runs(
state.tenant_db.pool(),
&tc.tenant_id,
Some(&name),
per_page as i64,
offset,
)
.await?;
let items: Vec<V1AgentRun> = runs
.into_iter()
.map(|r| V1AgentRun {
id: r.id,
agent_id: r.agent_name,
status: r.status,
input: serde_json::json!({"tokens": r.input_tokens}),
output: Some(serde_json::json!({"tokens": r.output_tokens})),
error: r.error,
started_at: ts_to_dt(r.created_at),
finished_at: Some(ts_to_dt(r.created_at + (r.duration_ms / 1000))),
duration_ms: Some(r.duration_ms as u64),
tokens_used: Some((r.input_tokens + r.output_tokens) as u64),
})
.collect();
let total = items.len() as u64;
Ok(Json(Paginated {
items,
total,
page,
per_page,
total_pages: ((total as f64) / (per_page as f64)).ceil() as u32,
}))
}
pub async fn list_agent_logs(
ctx: Option<Extension<TenantContext>>,
Path(name): Path<String>,
Query(q): Query<PaginationQuery>,
) -> Result<Json<Paginated<V1AgentLog>>> {
let _tc = extract_tenant(ctx)?;
let page = q.page.unwrap_or(1);
let per_page = q.per_page.unwrap_or(50);
let _ = name;
Ok(Json(Paginated::empty(page, per_page)))
}
pub async fn get_usage(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
) -> Result<Json<V1Usage>> {
let tc = extract_tenant(ctx)?;
let summary = state.tenant_db.get_usage_summary(&tc.tenant_id).await?;
let now = Utc::now();
let period_start = now
.date_naive()
.with_day(1)
.unwrap()
.and_hms_opt(0, 0, 0)
.unwrap()
.and_utc();
let quota_runs = if tc.quota.requests_per_month == u64::MAX {
None
} else {
Some(tc.quota.requests_per_month)
};
let quota_tokens = if tc.quota.tokens_per_month == u64::MAX {
None
} else {
Some(tc.quota.tokens_per_month)
};
Ok(Json(V1Usage {
period_start,
period_end: now,
total_runs: summary.monthly_requests,
total_tokens: summary.monthly_tokens,
total_api_calls: summary.monthly_requests,
quota_runs,
quota_tokens,
daily_usage: vec![],
}))
}
pub async fn list_api_keys(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
) -> Result<Json<Vec<V1ApiKey>>> {
let tc = extract_tenant(ctx)?;
let keys = state.tenant_db.list_api_keys(&tc.tenant_id).await?;
let response: Vec<V1ApiKey> = keys
.into_iter()
.filter(|k| k.is_active)
.map(|k| V1ApiKey {
id: k.id,
name: k.name,
prefix: k.key_prefix,
created_at: ts_to_dt(k.created_at),
last_used: None,
expires_at: k.expires_at.map(|e| ts_to_dt(e)),
})
.collect();
Ok(Json(response))
}
pub async fn create_api_key(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Json(payload): Json<CreateApiKeyRequest>,
) -> Result<Json<CreateApiKeyResponse>> {
let tc = extract_tenant(ctx)?;
let (api_key, raw_key) = state
.tenant_db
.create_api_key(&tc.tenant_id, payload.name)
.await?;
Ok(Json(CreateApiKeyResponse {
key: V1ApiKey {
id: api_key.id,
name: api_key.name,
prefix: api_key.key_prefix,
created_at: ts_to_dt(api_key.created_at),
last_used: None,
expires_at: api_key.expires_at.map(|e| ts_to_dt(e)),
},
secret: raw_key,
}))
}
pub async fn revoke_api_key(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
Path(key_id): Path<String>,
) -> Result<StatusCode> {
let tc = extract_tenant(ctx)?;
state
.tenant_db
.revoke_api_key(&tc.tenant_id, &key_id)
.await?;
Ok(StatusCode::NO_CONTENT)
}
pub async fn delete_tenant_data(
State(state): State<AppState>,
ctx: Option<Extension<TenantContext>>,
) -> Result<Json<serde_json::Value>> {
let tc = extract_tenant(ctx)?;
let tid = &tc.tenant_id;
let pool = state.tenant_db.pool();
let usage_rows: Vec<i64> = sqlx::query_scalar(
"DELETE FROM usage_events WHERE tenant_id = $1 RETURNING 1"
)
.bind(tid)
.fetch_all(pool)
.await
.unwrap_or_default();
let usage_deleted = usage_rows.len() as i64;
let run_rows: Vec<i64> = sqlx::query_scalar(
"DELETE FROM agent_runs WHERE tenant_id = $1 RETURNING 1"
)
.bind(tid)
.fetch_all(pool)
.await
.unwrap_or_default();
let runs_deleted = run_rows.len() as i64;
let key_rows: Vec<i64> = sqlx::query_scalar(
"DELETE FROM api_keys WHERE tenant_id = $1 RETURNING 1"
)
.bind(tid)
.fetch_all(pool)
.await
.unwrap_or_default();
let keys_deleted = key_rows.len() as i64;
let _ = sqlx::query("DELETE FROM monthly_usage_cache WHERE tenant_id = $1")
.bind(tid)
.execute(pool)
.await;
Ok(Json(serde_json::json!({
"status": "purged",
"tenant_id": tid,
"usage_events_deleted": usage_deleted,
"agent_runs_deleted": runs_deleted,
"api_keys_revoked": keys_deleted,
"note": "Tenant account retained. All operational data purged per GDPR Article 17."
})))
}