1use sqlx::{PgPool, Row};
2use crate::types::{AppError, Result};
3use serde::{Deserialize, Serialize};
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// Returns the current wall-clock time as whole seconds relative to the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the result is the
/// (negative) number of whole seconds before it rather than a panic.
/// Sub-second precision is discarded, and the conversion to `i64`
/// saturates instead of wrapping.
fn now_ts() -> i64 {
    let to_i64 = |secs: u64| i64::try_from(secs).unwrap_or(i64::MAX);

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => to_i64(elapsed.as_secs()),
        // `duration_since` only fails when "now" is earlier than the epoch; the
        // error carries how far before, so report that as a negative timestamp.
        Err(before_epoch) => -to_i64(before_epoch.duration().as_secs()),
    }
}
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct AgentRun {
15 pub id: String,
16 pub tenant_id: String,
17 pub agent_name: String,
18 pub user_id: Option<String>,
19 pub status: String,
20 pub input_tokens: i64,
21 pub output_tokens: i64,
22 pub duration_ms: i64,
23 pub error: Option<String>,
24 pub created_at: i64,
25}
26
27#[derive(Debug, Clone, Serialize)]
28pub struct AgentRunStats {
29 pub total_runs: i64,
30 pub success_count: i64,
31 pub failed_count: i64,
32 pub avg_duration_ms: i64,
33 pub total_input_tokens: i64,
34 pub total_output_tokens: i64,
35}
36
37#[derive(Debug, Clone, Serialize)]
38pub struct PlatformStats {
39 pub total_tenants: i64,
40 pub total_agents: i64,
41 pub total_runs_today: i64,
42 pub total_tokens_today: i64,
43 pub active_alerts: i64,
44}
45
46#[derive(Debug, Clone, Serialize)]
47pub struct AllAgentsEntry {
48 pub tenant_id: String,
49 pub tenant_name: String,
50 pub agent_name: String,
51 pub display_name: String,
52 pub model: String,
53 pub enabled: bool,
54 pub total_runs: i64,
55 pub last_run_at: Option<i64>,
56}
57
58pub async fn insert_agent_run(
59 pool: &PgPool,
60 tenant_id: &str,
61 agent_name: &str,
62 user_id: Option<&str>,
63 status: &str,
64 input_tokens: i64,
65 output_tokens: i64,
66 duration_ms: i64,
67 error: Option<&str>,
68) -> Result<String> {
69 let id = uuid::Uuid::new_v4().to_string();
70 let now = now_ts();
71
72 sqlx::query(
73 "INSERT INTO agent_runs (id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at)
74 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
75 )
76 .bind(&id)
77 .bind(tenant_id)
78 .bind(agent_name)
79 .bind(user_id)
80 .bind(status)
81 .bind(input_tokens)
82 .bind(output_tokens)
83 .bind(duration_ms)
84 .bind(error)
85 .bind(now)
86 .execute(pool)
87 .await
88 .map_err(|e| AppError::Database(e.to_string()))?;
89
90 Ok(id)
91}
92
93pub async fn list_agent_runs(
94 pool: &PgPool,
95 tenant_id: &str,
96 agent_name: Option<&str>,
97 limit: i64,
98 offset: i64,
99) -> Result<Vec<AgentRun>> {
100 let rows = if let Some(name) = agent_name {
101 sqlx::query(
102 "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
103 FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2
104 ORDER BY created_at DESC LIMIT $3 OFFSET $4"
105 )
106 .bind(tenant_id)
107 .bind(name)
108 .bind(limit)
109 .bind(offset)
110 .fetch_all(pool)
111 .await
112 } else {
113 sqlx::query(
114 "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
115 FROM agent_runs WHERE tenant_id = $1
116 ORDER BY created_at DESC LIMIT $2 OFFSET $3"
117 )
118 .bind(tenant_id)
119 .bind(limit)
120 .bind(offset)
121 .fetch_all(pool)
122 .await
123 }
124 .map_err(|e| AppError::Database(e.to_string()))?;
125
126 rows.iter().map(|row| {
127 Ok(AgentRun {
128 id: row.get("id"),
129 tenant_id: row.get("tenant_id"),
130 agent_name: row.get("agent_name"),
131 user_id: row.get("user_id"),
132 status: row.get("status"),
133 input_tokens: row.get("input_tokens"),
134 output_tokens: row.get("output_tokens"),
135 duration_ms: row.get("duration_ms"),
136 error: row.get("error"),
137 created_at: row.get("created_at"),
138 })
139 }).collect()
140}
141
142pub async fn get_agent_run_stats(
143 pool: &PgPool,
144 tenant_id: &str,
145 agent_name: &str,
146) -> Result<AgentRunStats> {
147 let row = sqlx::query(
148 "SELECT
149 COUNT(*) as total_runs,
150 COUNT(*) FILTER (WHERE status = 'completed') as success_count,
151 COUNT(*) FILTER (WHERE status = 'failed') as failed_count,
152 COALESCE(AVG(duration_ms), 0)::BIGINT as avg_duration_ms,
153 COALESCE(SUM(input_tokens), 0)::BIGINT as total_input_tokens,
154 COALESCE(SUM(output_tokens), 0)::BIGINT as total_output_tokens
155 FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2"
156 )
157 .bind(tenant_id)
158 .bind(agent_name)
159 .fetch_one(pool)
160 .await
161 .map_err(|e| AppError::Database(e.to_string()))?;
162
163 Ok(AgentRunStats {
164 total_runs: row.get("total_runs"),
165 success_count: row.get("success_count"),
166 failed_count: row.get("failed_count"),
167 avg_duration_ms: row.get("avg_duration_ms"),
168 total_input_tokens: row.get("total_input_tokens"),
169 total_output_tokens: row.get("total_output_tokens"),
170 })
171}
172
173pub async fn get_platform_stats(pool: &PgPool) -> Result<PlatformStats> {
174 let today_start = {
175 let now = now_ts();
176 now - (now % 86400)
177 };
178
179 let row = sqlx::query(
180 "SELECT
181 (SELECT COUNT(*) FROM tenants) as total_tenants,
182 (SELECT COUNT(*) FROM tenant_agents) as total_agents,
183 (SELECT COUNT(*) FROM agent_runs WHERE created_at >= $1) as total_runs_today,
184 (SELECT COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT FROM agent_runs WHERE created_at >= $1) as total_tokens_today,
185 (SELECT COUNT(*) FROM alerts WHERE resolved = FALSE) as active_alerts"
186 )
187 .bind(today_start)
188 .fetch_one(pool)
189 .await
190 .map_err(|e| AppError::Database(e.to_string()))?;
191
192 Ok(PlatformStats {
193 total_tenants: row.get("total_tenants"),
194 total_agents: row.get("total_agents"),
195 total_runs_today: row.get("total_runs_today"),
196 total_tokens_today: row.get("total_tokens_today"),
197 active_alerts: row.get("active_alerts"),
198 })
199}
200
201pub async fn list_all_agents(pool: &PgPool) -> Result<Vec<AllAgentsEntry>> {
202 let rows = sqlx::query(
203 "SELECT
204 ta.tenant_id,
205 t.name as tenant_name,
206 ta.agent_name,
207 ta.display_name,
208 COALESCE(ta.config->>'model', 'unknown') as model,
209 ta.enabled,
210 COALESCE(ar.total_runs, 0) as total_runs,
211 ar.last_run_at
212 FROM tenant_agents ta
213 JOIN tenants t ON t.id = ta.tenant_id
214 LEFT JOIN (
215 SELECT tenant_id, agent_name, COUNT(*) as total_runs, MAX(created_at) as last_run_at
216 FROM agent_runs GROUP BY tenant_id, agent_name
217 ) ar ON ar.tenant_id = ta.tenant_id AND ar.agent_name = ta.agent_name
218 ORDER BY t.name, ta.agent_name"
219 )
220 .fetch_all(pool)
221 .await
222 .map_err(|e| AppError::Database(e.to_string()))?;
223
224 rows.iter().map(|row| {
225 Ok(AllAgentsEntry {
226 tenant_id: row.get("tenant_id"),
227 tenant_name: row.get("tenant_name"),
228 agent_name: row.get("agent_name"),
229 display_name: row.get("display_name"),
230 model: row.get("model"),
231 enabled: row.get("enabled"),
232 total_runs: row.get("total_runs"),
233 last_run_at: row.get("last_run_at"),
234 })
235 }).collect()
236}