Skip to main content

ares/db/
agent_runs.rs

1use sqlx::{PgPool, Row};
2use crate::types::{AppError, Result};
3use serde::{Deserialize, Serialize};
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// Returns the current wall-clock time as whole seconds relative to the Unix epoch.
///
/// A system clock set before 1970-01-01 yields a negative value instead of
/// panicking, and the conversion saturates at the `i64` bounds rather than
/// wrapping.
fn now_ts() -> i64 {
    // Saturate instead of letting an `as` cast wrap a huge `u64` into a negative number.
    let to_i64 = |secs: u64| secs.min(i64::MAX as u64) as i64;

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => to_i64(since_epoch.as_secs()),
        // The error value carries how far *before* the epoch the clock currently is.
        Err(before_epoch) => -to_i64(before_epoch.duration().as_secs()),
    }
}
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct AgentRun {
15    pub id: String,
16    pub tenant_id: String,
17    pub agent_name: String,
18    pub user_id: Option<String>,
19    pub status: String,
20    pub input_tokens: i64,
21    pub output_tokens: i64,
22    pub duration_ms: i64,
23    pub error: Option<String>,
24    pub created_at: i64,
25}
26
27#[derive(Debug, Clone, Serialize)]
28pub struct AgentRunStats {
29    pub total_runs: i64,
30    pub success_count: i64,
31    pub failed_count: i64,
32    pub avg_duration_ms: i64,
33    pub total_input_tokens: i64,
34    pub total_output_tokens: i64,
35}
36
37#[derive(Debug, Clone, Serialize)]
38pub struct PlatformStats {
39    pub total_tenants: i64,
40    pub total_agents: i64,
41    pub total_runs_today: i64,
42    pub total_tokens_today: i64,
43    pub active_alerts: i64,
44}
45
46#[derive(Debug, Clone, Serialize)]
47pub struct AllAgentsEntry {
48    pub tenant_id: String,
49    pub tenant_name: String,
50    pub agent_name: String,
51    pub display_name: String,
52    pub model: String,
53    pub enabled: bool,
54    pub total_runs: i64,
55    pub last_run_at: Option<i64>,
56}
57
58pub async fn insert_agent_run(
59    pool: &PgPool,
60    tenant_id: &str,
61    agent_name: &str,
62    user_id: Option<&str>,
63    status: &str,
64    input_tokens: i64,
65    output_tokens: i64,
66    duration_ms: i64,
67    error: Option<&str>,
68) -> Result<String> {
69    let id = uuid::Uuid::new_v4().to_string();
70    let now = now_ts();
71
72    sqlx::query(
73        "INSERT INTO agent_runs (id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at)
74         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
75    )
76    .bind(&id)
77    .bind(tenant_id)
78    .bind(agent_name)
79    .bind(user_id)
80    .bind(status)
81    .bind(input_tokens)
82    .bind(output_tokens)
83    .bind(duration_ms)
84    .bind(error)
85    .bind(now)
86    .execute(pool)
87    .await
88    .map_err(|e| AppError::Database(e.to_string()))?;
89
90    Ok(id)
91}
92
93pub async fn list_agent_runs(
94    pool: &PgPool,
95    tenant_id: &str,
96    agent_name: Option<&str>,
97    limit: i64,
98    offset: i64,
99) -> Result<Vec<AgentRun>> {
100    let rows = if let Some(name) = agent_name {
101        sqlx::query(
102            "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
103             FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2
104             ORDER BY created_at DESC LIMIT $3 OFFSET $4"
105        )
106        .bind(tenant_id)
107        .bind(name)
108        .bind(limit)
109        .bind(offset)
110        .fetch_all(pool)
111        .await
112    } else {
113        sqlx::query(
114            "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
115             FROM agent_runs WHERE tenant_id = $1
116             ORDER BY created_at DESC LIMIT $2 OFFSET $3"
117        )
118        .bind(tenant_id)
119        .bind(limit)
120        .bind(offset)
121        .fetch_all(pool)
122        .await
123    }
124    .map_err(|e| AppError::Database(e.to_string()))?;
125
126    rows.iter().map(|row| {
127        Ok(AgentRun {
128            id: row.get("id"),
129            tenant_id: row.get("tenant_id"),
130            agent_name: row.get("agent_name"),
131            user_id: row.get("user_id"),
132            status: row.get("status"),
133            input_tokens: row.get("input_tokens"),
134            output_tokens: row.get("output_tokens"),
135            duration_ms: row.get("duration_ms"),
136            error: row.get("error"),
137            created_at: row.get("created_at"),
138        })
139    }).collect()
140}
141
142pub async fn get_agent_run_stats(
143    pool: &PgPool,
144    tenant_id: &str,
145    agent_name: &str,
146) -> Result<AgentRunStats> {
147    let row = sqlx::query(
148        "SELECT
149            COUNT(*) as total_runs,
150            COUNT(*) FILTER (WHERE status = 'completed') as success_count,
151            COUNT(*) FILTER (WHERE status = 'failed') as failed_count,
152            COALESCE(AVG(duration_ms), 0)::BIGINT as avg_duration_ms,
153            COALESCE(SUM(input_tokens), 0)::BIGINT as total_input_tokens,
154            COALESCE(SUM(output_tokens), 0)::BIGINT as total_output_tokens
155         FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2"
156    )
157    .bind(tenant_id)
158    .bind(agent_name)
159    .fetch_one(pool)
160    .await
161    .map_err(|e| AppError::Database(e.to_string()))?;
162
163    Ok(AgentRunStats {
164        total_runs: row.get("total_runs"),
165        success_count: row.get("success_count"),
166        failed_count: row.get("failed_count"),
167        avg_duration_ms: row.get("avg_duration_ms"),
168        total_input_tokens: row.get("total_input_tokens"),
169        total_output_tokens: row.get("total_output_tokens"),
170    })
171}
172
173pub async fn get_platform_stats(pool: &PgPool) -> Result<PlatformStats> {
174    let today_start = {
175        let now = now_ts();
176        now - (now % 86400)
177    };
178
179    let row = sqlx::query(
180        "SELECT
181            (SELECT COUNT(*) FROM tenants) as total_tenants,
182            (SELECT COUNT(*) FROM tenant_agents) as total_agents,
183            (SELECT COUNT(*) FROM agent_runs WHERE created_at >= $1) as total_runs_today,
184            (SELECT COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT FROM agent_runs WHERE created_at >= $1) as total_tokens_today,
185            (SELECT COUNT(*) FROM alerts WHERE resolved = FALSE) as active_alerts"
186    )
187    .bind(today_start)
188    .fetch_one(pool)
189    .await
190    .map_err(|e| AppError::Database(e.to_string()))?;
191
192    Ok(PlatformStats {
193        total_tenants: row.get("total_tenants"),
194        total_agents: row.get("total_agents"),
195        total_runs_today: row.get("total_runs_today"),
196        total_tokens_today: row.get("total_tokens_today"),
197        active_alerts: row.get("active_alerts"),
198    })
199}
200
201pub async fn list_all_agents(pool: &PgPool) -> Result<Vec<AllAgentsEntry>> {
202    let rows = sqlx::query(
203        "SELECT
204            ta.tenant_id,
205            t.name as tenant_name,
206            ta.agent_name,
207            ta.display_name,
208            COALESCE(ta.config->>'model', 'unknown') as model,
209            ta.enabled,
210            COALESCE(ar.total_runs, 0) as total_runs,
211            ar.last_run_at
212         FROM tenant_agents ta
213         JOIN tenants t ON t.id = ta.tenant_id
214         LEFT JOIN (
215            SELECT tenant_id, agent_name, COUNT(*) as total_runs, MAX(created_at) as last_run_at
216            FROM agent_runs GROUP BY tenant_id, agent_name
217         ) ar ON ar.tenant_id = ta.tenant_id AND ar.agent_name = ta.agent_name
218         ORDER BY t.name, ta.agent_name"
219    )
220    .fetch_all(pool)
221    .await
222    .map_err(|e| AppError::Database(e.to_string()))?;
223
224    rows.iter().map(|row| {
225        Ok(AllAgentsEntry {
226            tenant_id: row.get("tenant_id"),
227            tenant_name: row.get("tenant_name"),
228            agent_name: row.get("agent_name"),
229            display_name: row.get("display_name"),
230            model: row.get("model"),
231            enabled: row.get("enabled"),
232            total_runs: row.get("total_runs"),
233            last_run_at: row.get("last_run_at"),
234        })
235    }).collect()
236}