1use crate::types::{AppError, Result};
2use serde::{Deserialize, Serialize};
3use sqlx::{PgPool, Row};
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// Returns the current wall-clock time as whole seconds relative to the Unix epoch.
///
/// A system clock set *before* the epoch yields the negated number of whole
/// seconds before it instead of panicking, and values that do not fit in an
/// `i64` saturate rather than wrap.
fn now_ts() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        // `duration()` reports how far the clock sits before the epoch.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map_or(i64::MIN, |secs| -secs),
    }
}
12
/// One row of the `agent_runs` table, as returned by `list_agent_runs`.
///
/// `insert_agent_run` also writes `model_name`, `provider_name` and
/// `is_streaming`, but those columns are not selected into this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentRun {
    /// Run identifier; `insert_agent_run` generates it as a v4 UUID string.
    pub id: String,
    /// Tenant the run belongs to; every query in this module filters or groups on it.
    pub tenant_id: String,
    /// Name of the agent that was executed.
    pub agent_name: String,
    /// Optional user identifier stored with the run (`None` when the column is NULL).
    pub user_id: Option<String>,
    /// Free-form status string; `get_agent_run_stats` counts the values
    /// `'completed'` and `'failed'`.
    pub status: String,
    /// Input token count recorded for the run.
    pub input_tokens: i64,
    /// Output token count recorded for the run.
    pub output_tokens: i64,
    /// Run duration; presumably milliseconds, going by the name (confirm with callers).
    pub duration_ms: i64,
    /// Optional error text stored with the run (`None` when the column is NULL).
    pub error: Option<String>,
    /// Creation time in seconds since the Unix epoch, taken from `now_ts()` at insert.
    pub created_at: i64,
}
26
/// Aggregates over the `agent_runs` rows of one tenant/agent pair, as computed
/// by `get_agent_run_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct AgentRunStats {
    /// Number of matching runs, regardless of status.
    pub total_runs: i64,
    /// Runs whose status is `'completed'`.
    pub success_count: i64,
    /// Runs whose status is `'failed'`.
    pub failed_count: i64,
    /// Average of `duration_ms`, cast to a whole number in SQL; `0` when there are no runs.
    pub avg_duration_ms: i64,
    /// Sum of `input_tokens`; `0` when there are no runs.
    pub total_input_tokens: i64,
    /// Sum of `output_tokens`; `0` when there are no runs.
    pub total_output_tokens: i64,
}
36
/// Platform-wide counters across all tenants, as computed by `get_platform_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct PlatformStats {
    /// Row count of the `tenants` table.
    pub total_tenants: i64,
    /// Row count of the `tenant_agents` table.
    pub total_agents: i64,
    /// Runs whose `created_at` is at or after the start of the current day.
    pub total_runs_today: i64,
    /// Sum of input plus output tokens over the same "today" window; `0` when empty.
    pub total_tokens_today: i64,
    /// Rows in `alerts` with `resolved = FALSE`.
    pub active_alerts: i64,
}
45
/// One configured agent joined with its tenant and run totals, as returned by
/// `list_all_agents`.
#[derive(Debug, Clone, Serialize)]
pub struct AllAgentsEntry {
    /// Tenant id taken from `tenant_agents.tenant_id`.
    pub tenant_id: String,
    /// Tenant name taken from `tenants.name`.
    pub tenant_name: String,
    /// Agent name taken from `tenant_agents.agent_name`.
    pub agent_name: String,
    /// Display name taken from `tenant_agents.display_name`.
    pub display_name: String,
    /// The `model` key of the agent's JSON `config`, or `"unknown"` when it is absent.
    pub model: String,
    /// Value of `tenant_agents.enabled`.
    pub enabled: bool,
    /// Number of `agent_runs` rows for this tenant/agent pair; `0` when there are none.
    pub total_runs: i64,
    /// Largest `created_at` among those runs; `None` when the agent has never run.
    pub last_run_at: Option<i64>,
}
57
58pub async fn insert_agent_run(
59 pool: &PgPool,
60 tenant_id: &str,
61 agent_name: &str,
62 user_id: Option<&str>,
63 status: &str,
64 input_tokens: i64,
65 output_tokens: i64,
66 duration_ms: i64,
67 error: Option<&str>,
68 model_name: &str,
69 provider_name: &str,
70 is_streaming: bool,
71) -> Result<String> {
72 let id = uuid::Uuid::new_v4().to_string();
73 let now = now_ts();
74
75 sqlx::query(
76 "INSERT INTO agent_runs (id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at, model_name, provider_name, is_streaming)
77 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"
78 )
79 .bind(&id)
80 .bind(tenant_id)
81 .bind(agent_name)
82 .bind(user_id)
83 .bind(status)
84 .bind(input_tokens)
85 .bind(output_tokens)
86 .bind(duration_ms)
87 .bind(error)
88 .bind(now)
89 .bind(model_name)
90 .bind(provider_name)
91 .bind(is_streaming)
92 .execute(pool)
93 .await
94 .map_err(|e| AppError::Database(e.to_string()))?;
95
96 Ok(id)
97}
98
99pub async fn list_agent_runs(
100 pool: &PgPool,
101 tenant_id: &str,
102 agent_name: Option<&str>,
103 limit: i64,
104 offset: i64,
105) -> Result<Vec<AgentRun>> {
106 let rows = if let Some(name) = agent_name {
107 sqlx::query(
108 "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
109 FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2
110 ORDER BY created_at DESC LIMIT $3 OFFSET $4"
111 )
112 .bind(tenant_id)
113 .bind(name)
114 .bind(limit)
115 .bind(offset)
116 .fetch_all(pool)
117 .await
118 } else {
119 sqlx::query(
120 "SELECT id, tenant_id, agent_name, user_id, status, input_tokens, output_tokens, duration_ms, error, created_at
121 FROM agent_runs WHERE tenant_id = $1
122 ORDER BY created_at DESC LIMIT $2 OFFSET $3"
123 )
124 .bind(tenant_id)
125 .bind(limit)
126 .bind(offset)
127 .fetch_all(pool)
128 .await
129 }
130 .map_err(|e| AppError::Database(e.to_string()))?;
131
132 rows.iter()
133 .map(|row| {
134 Ok(AgentRun {
135 id: row.get("id"),
136 tenant_id: row.get("tenant_id"),
137 agent_name: row.get("agent_name"),
138 user_id: row.get("user_id"),
139 status: row.get("status"),
140 input_tokens: row.get("input_tokens"),
141 output_tokens: row.get("output_tokens"),
142 duration_ms: row.get("duration_ms"),
143 error: row.get("error"),
144 created_at: row.get("created_at"),
145 })
146 })
147 .collect()
148}
149
150pub async fn get_agent_run_stats(
151 pool: &PgPool,
152 tenant_id: &str,
153 agent_name: &str,
154) -> Result<AgentRunStats> {
155 let row = sqlx::query(
156 "SELECT
157 COUNT(*) as total_runs,
158 COUNT(*) FILTER (WHERE status = 'completed') as success_count,
159 COUNT(*) FILTER (WHERE status = 'failed') as failed_count,
160 COALESCE(AVG(duration_ms), 0)::BIGINT as avg_duration_ms,
161 COALESCE(SUM(input_tokens), 0)::BIGINT as total_input_tokens,
162 COALESCE(SUM(output_tokens), 0)::BIGINT as total_output_tokens
163 FROM agent_runs WHERE tenant_id = $1 AND agent_name = $2",
164 )
165 .bind(tenant_id)
166 .bind(agent_name)
167 .fetch_one(pool)
168 .await
169 .map_err(|e| AppError::Database(e.to_string()))?;
170
171 Ok(AgentRunStats {
172 total_runs: row.get("total_runs"),
173 success_count: row.get("success_count"),
174 failed_count: row.get("failed_count"),
175 avg_duration_ms: row.get("avg_duration_ms"),
176 total_input_tokens: row.get("total_input_tokens"),
177 total_output_tokens: row.get("total_output_tokens"),
178 })
179}
180
181pub async fn get_platform_stats(pool: &PgPool) -> Result<PlatformStats> {
182 let today_start = {
183 let now = now_ts();
184 now - (now % 86400)
185 };
186
187 let row = sqlx::query(
188 "SELECT
189 (SELECT COUNT(*) FROM tenants) as total_tenants,
190 (SELECT COUNT(*) FROM tenant_agents) as total_agents,
191 (SELECT COUNT(*) FROM agent_runs WHERE created_at >= $1) as total_runs_today,
192 (SELECT COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT FROM agent_runs WHERE created_at >= $1) as total_tokens_today,
193 (SELECT COUNT(*) FROM alerts WHERE resolved = FALSE) as active_alerts"
194 )
195 .bind(today_start)
196 .fetch_one(pool)
197 .await
198 .map_err(|e| AppError::Database(e.to_string()))?;
199
200 Ok(PlatformStats {
201 total_tenants: row.get("total_tenants"),
202 total_agents: row.get("total_agents"),
203 total_runs_today: row.get("total_runs_today"),
204 total_tokens_today: row.get("total_tokens_today"),
205 active_alerts: row.get("active_alerts"),
206 })
207}
208
209pub async fn list_all_agents(pool: &PgPool) -> Result<Vec<AllAgentsEntry>> {
210 let rows = sqlx::query(
211 "SELECT
212 ta.tenant_id,
213 t.name as tenant_name,
214 ta.agent_name,
215 ta.display_name,
216 COALESCE(ta.config->>'model', 'unknown') as model,
217 ta.enabled,
218 COALESCE(ar.total_runs, 0) as total_runs,
219 ar.last_run_at
220 FROM tenant_agents ta
221 JOIN tenants t ON t.id = ta.tenant_id
222 LEFT JOIN (
223 SELECT tenant_id, agent_name, COUNT(*) as total_runs, MAX(created_at) as last_run_at
224 FROM agent_runs GROUP BY tenant_id, agent_name
225 ) ar ON ar.tenant_id = ta.tenant_id AND ar.agent_name = ta.agent_name
226 ORDER BY t.name, ta.agent_name",
227 )
228 .fetch_all(pool)
229 .await
230 .map_err(|e| AppError::Database(e.to_string()))?;
231
232 rows.iter()
233 .map(|row| {
234 Ok(AllAgentsEntry {
235 tenant_id: row.get("tenant_id"),
236 tenant_name: row.get("tenant_name"),
237 agent_name: row.get("agent_name"),
238 display_name: row.get("display_name"),
239 model: row.get("model"),
240 enabled: row.get("enabled"),
241 total_runs: row.get("total_runs"),
242 last_run_at: row.get("last_run_at"),
243 })
244 })
245 .collect()
246}