Skip to main content

ares/db/
agent_runs.rs

1use crate::types::{AppError, Result};
2use serde::{Deserialize, Serialize};
3use sqlx::{PgPool, Row};
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// Returns the current wall-clock time as whole seconds relative to the Unix epoch.
///
/// This never panics. If the system clock reports a time before
/// 1970-01-01T00:00:00Z, the result is the negative number of whole seconds
/// separating the clock from the epoch (truncated toward zero).
fn now_ts() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_secs` yields a `u64`; clamp first so the narrowing cast cannot wrap.
        Ok(since_epoch) => since_epoch.as_secs().min(i64::MAX as u64) as i64,
        // The clock is set before the epoch: report a negative offset rather than
        // aborting the caller.
        Err(before_epoch) => {
            let secs = before_epoch.duration().as_secs().min(i64::MAX as u64) as i64;
            -secs
        }
    }
}
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct AgentRun {
15    pub id: String,
16    pub tenant_id: String,
17    pub agent_name: String,
18    pub user_id: Option<String>,
19    pub status: String,
20    pub input_tokens: i64,
21    pub output_tokens: i64,
22    pub duration_ms: i64,
23    pub error: Option<String>,
24    pub created_at: i64,
25}
26
27#[derive(Debug, Clone, Serialize)]
28pub struct AgentRunStats {
29    pub total_runs: i64,
30    pub success_count: i64,
31    pub failed_count: i64,
32    pub avg_duration_ms: i64,
33    pub total_input_tokens: i64,
34    pub total_output_tokens: i64,
35}
36
37#[derive(Debug, Clone, Serialize)]
38pub struct PlatformStats {
39    pub total_tenants: i64,
40    pub total_agents: i64,
41    pub total_runs_today: i64,
42    pub total_tokens_today: i64,
43    pub active_alerts: i64,
44}
45
46#[derive(Debug, Clone, Serialize)]
47pub struct AllAgentsEntry {
48    pub tenant_id: String,
49    pub tenant_name: String,
50    pub agent_name: String,
51    pub display_name: String,
52    pub model: String,
53    pub enabled: bool,
54    pub total_runs: i64,
55    pub last_run_at: Option<i64>,
56}
57
58pub async fn insert_agent_run(
59    pool: &PgPool,
60    tenant_id: &str,
61    agent_name: &str,
62    user_id: Option<&str>,
63    status: &str,
64    input_tokens: i64,
65    output_tokens: i64,
66    duration_ms: i64,
67    error: Option<&str>,
68    model_name: &str,
69    provider_name: &str,
70    is_streaming: bool,
71) -> Result<String> {
72    let id = uuid::Uuid::new_v4().to_string();
73    let now = now_ts();
74
75    sqlx::query(
76        "INSERT INTO agent_runs (id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at, model_name, provider_name, is_streaming)
77         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"
78    )
79    .bind(&id)
80    .bind(tenant_id)
81    .bind(agent_name)
82    .bind(user_id)
83    .bind(status)
84    .bind(input_tokens)
85    .bind(output_tokens)
86    .bind(duration_ms)
87    .bind(error)
88    .bind(now)
89    .bind(model_name)
90    .bind(provider_name)
91    .bind(is_streaming)
92    .execute(pool)
93    .await
94    .map_err(|e| AppError::Database(e.to_string()))?;
95
96    Ok(id)
97}
98
99pub async fn list_agent_runs(
100    pool: &PgPool,
101    tenant_id: &str,
102    agent_name: Option<&str>,
103    limit: i64,
104    offset: i64,
105) -> Result<Vec<AgentRun>> {
106    let rows = if let Some(name) = agent_name {
107        sqlx::query(
108            "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
109             FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2
110             ORDER BY created_at DESC LIMIT $3 OFFSET $4"
111        )
112        .bind(tenant_id)
113        .bind(name)
114        .bind(limit)
115        .bind(offset)
116        .fetch_all(pool)
117        .await
118    } else {
119        sqlx::query(
120            "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
121             FROM agent_runs WHERE tenant_id = $1
122             ORDER BY created_at DESC LIMIT $2 OFFSET $3"
123        )
124        .bind(tenant_id)
125        .bind(limit)
126        .bind(offset)
127        .fetch_all(pool)
128        .await
129    }
130    .map_err(|e| AppError::Database(e.to_string()))?;
131
132    rows.iter()
133        .map(|row| {
134            Ok(AgentRun {
135                id: row.get("id"),
136                tenant_id: row.get("tenant_id"),
137                agent_name: row.get("agent_name"),
138                user_id: row.get("user_id"),
139                status: row.get("status"),
140                input_tokens: row.get("input_tokens"),
141                output_tokens: row.get("output_tokens"),
142                duration_ms: row.get("duration_ms"),
143                error: row.get("error"),
144                created_at: row.get("created_at"),
145            })
146        })
147        .collect()
148}
149
150pub async fn get_agent_run_stats(
151    pool: &PgPool,
152    tenant_id: &str,
153    agent_name: &str,
154) -> Result<AgentRunStats> {
155    let row = sqlx::query(
156        "SELECT
157            COUNT(*) as total_runs,
158            COUNT(*) FILTER (WHERE status = 'completed') as success_count,
159            COUNT(*) FILTER (WHERE status = 'failed') as failed_count,
160            COALESCE(AVG(duration_ms), 0)::BIGINT as avg_duration_ms,
161            COALESCE(SUM(input_tokens), 0)::BIGINT as total_input_tokens,
162            COALESCE(SUM(output_tokens), 0)::BIGINT as total_output_tokens
163         FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2",
164    )
165    .bind(tenant_id)
166    .bind(agent_name)
167    .fetch_one(pool)
168    .await
169    .map_err(|e| AppError::Database(e.to_string()))?;
170
171    Ok(AgentRunStats {
172        total_runs: row.get("total_runs"),
173        success_count: row.get("success_count"),
174        failed_count: row.get("failed_count"),
175        avg_duration_ms: row.get("avg_duration_ms"),
176        total_input_tokens: row.get("total_input_tokens"),
177        total_output_tokens: row.get("total_output_tokens"),
178    })
179}
180
181pub async fn get_platform_stats(pool: &PgPool) -> Result<PlatformStats> {
182    let today_start = {
183        let now = now_ts();
184        now - (now % 86400)
185    };
186
187    let row = sqlx::query(
188        "SELECT
189            (SELECT COUNT(*) FROM tenants) as total_tenants,
190            (SELECT COUNT(*) FROM tenant_agents) as total_agents,
191            (SELECT COUNT(*) FROM agent_runs WHERE created_at >= $1) as total_runs_today,
192            (SELECT COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT FROM agent_runs WHERE created_at >= $1) as total_tokens_today,
193            (SELECT COUNT(*) FROM alerts WHERE resolved = FALSE) as active_alerts"
194    )
195    .bind(today_start)
196    .fetch_one(pool)
197    .await
198    .map_err(|e| AppError::Database(e.to_string()))?;
199
200    Ok(PlatformStats {
201        total_tenants: row.get("total_tenants"),
202        total_agents: row.get("total_agents"),
203        total_runs_today: row.get("total_runs_today"),
204        total_tokens_today: row.get("total_tokens_today"),
205        active_alerts: row.get("active_alerts"),
206    })
207}
208
209pub async fn list_all_agents(pool: &PgPool) -> Result<Vec<AllAgentsEntry>> {
210    let rows = sqlx::query(
211        "SELECT
212            ta.tenant_id,
213            t.name as tenant_name,
214            ta.agent_name,
215            ta.display_name,
216            COALESCE(ta.config->>'model', 'unknown') as model,
217            ta.enabled,
218            COALESCE(ar.total_runs, 0) as total_runs,
219            ar.last_run_at
220         FROM tenant_agents ta
221         JOIN tenants t ON t.id = ta.tenant_id
222         LEFT JOIN (
223            SELECT tenant_id, agent_name, COUNT(*) as total_runs, MAX(created_at) as last_run_at
224            FROM agent_runs GROUP BY tenant_id, agent_name
225         ) ar ON ar.tenant_id = ta.tenant_id AND ar.agent_name = ta.agent_name
226         ORDER BY t.name, ta.agent_name",
227    )
228    .fetch_all(pool)
229    .await
230    .map_err(|e| AppError::Database(e.to_string()))?;
231
232    rows.iter()
233        .map(|row| {
234            Ok(AllAgentsEntry {
235                tenant_id: row.get("tenant_id"),
236                tenant_name: row.get("tenant_name"),
237                agent_name: row.get("agent_name"),
238                display_name: row.get("display_name"),
239                model: row.get("model"),
240                enabled: row.get("enabled"),
241                total_runs: row.get("total_runs"),
242                last_run_at: row.get("last_run_at"),
243            })
244        })
245        .collect()
246}