systemprompt_analytics/repository/
costs.rs1use crate::Result;
2use chrono::{DateTime, Utc};
3use sqlx::PgPool;
4use std::sync::Arc;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContextId, UserId};
7
8use crate::models::cli::{
9 ContextGroupRow, ContextSummaryRow, CostBreakdownRow, CostSummaryRow, CostTrendRow,
10 PreviousCostRow, RecentContextRow,
11};
12
13#[derive(Debug)]
14pub struct CostAnalyticsRepository {
15 pool: Arc<PgPool>,
16}
17
18impl CostAnalyticsRepository {
19 pub fn new(db: &DbPool) -> Result<Self> {
20 let pool = db.pool_arc()?;
21 Ok(Self { pool })
22 }
23
24 pub async fn get_summary(
25 &self,
26 start: DateTime<Utc>,
27 end: DateTime<Utc>,
28 ) -> Result<CostSummaryRow> {
29 sqlx::query_as!(
30 CostSummaryRow,
31 r#"
32 SELECT
33 COUNT(*)::bigint as "requests!",
34 SUM(cost_microdollars)::bigint as "cost",
35 SUM(tokens_used)::bigint as "tokens"
36 FROM ai_requests
37 WHERE created_at >= $1 AND created_at < $2
38 "#,
39 start,
40 end
41 )
42 .fetch_one(&*self.pool)
43 .await
44 .map_err(Into::into)
45 }
46
47 pub async fn get_previous_cost(
48 &self,
49 start: DateTime<Utc>,
50 end: DateTime<Utc>,
51 ) -> Result<PreviousCostRow> {
52 sqlx::query_as!(
53 PreviousCostRow,
54 r#"
55 SELECT SUM(cost_microdollars)::bigint as "cost"
56 FROM ai_requests
57 WHERE created_at >= $1 AND created_at < $2
58 "#,
59 start,
60 end
61 )
62 .fetch_one(&*self.pool)
63 .await
64 .map_err(Into::into)
65 }
66
67 pub async fn get_breakdown_by_model(
68 &self,
69 start: DateTime<Utc>,
70 end: DateTime<Utc>,
71 limit: i64,
72 ) -> Result<Vec<CostBreakdownRow>> {
73 sqlx::query_as!(
74 CostBreakdownRow,
75 r#"
76 SELECT
77 model as "name!",
78 COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
79 COUNT(*)::bigint as "requests!",
80 COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
81 FROM ai_requests
82 WHERE created_at >= $1 AND created_at < $2
83 GROUP BY model
84 ORDER BY SUM(cost_microdollars) DESC NULLS LAST
85 LIMIT $3
86 "#,
87 start,
88 end,
89 limit
90 )
91 .fetch_all(&*self.pool)
92 .await
93 .map_err(Into::into)
94 }
95
96 pub async fn get_breakdown_by_provider(
97 &self,
98 start: DateTime<Utc>,
99 end: DateTime<Utc>,
100 limit: i64,
101 ) -> Result<Vec<CostBreakdownRow>> {
102 sqlx::query_as!(
103 CostBreakdownRow,
104 r#"
105 SELECT
106 provider as "name!",
107 COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
108 COUNT(*)::bigint as "requests!",
109 COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
110 FROM ai_requests
111 WHERE created_at >= $1 AND created_at < $2
112 GROUP BY provider
113 ORDER BY SUM(cost_microdollars) DESC NULLS LAST
114 LIMIT $3
115 "#,
116 start,
117 end,
118 limit
119 )
120 .fetch_all(&*self.pool)
121 .await
122 .map_err(Into::into)
123 }
124
125 pub async fn get_breakdown_by_agent(
126 &self,
127 start: DateTime<Utc>,
128 end: DateTime<Utc>,
129 limit: i64,
130 ) -> Result<Vec<CostBreakdownRow>> {
131 sqlx::query_as!(
132 CostBreakdownRow,
133 r#"
134 (
135 SELECT
136 at.agent_name as "name!",
137 COALESCE(SUM(r.cost_microdollars), 0)::bigint as "cost!",
138 COUNT(*)::bigint as "requests!",
139 COALESCE(SUM(r.tokens_used), 0)::bigint as "tokens!"
140 FROM ai_requests r
141 INNER JOIN agent_tasks at ON at.task_id = r.task_id
142 WHERE r.created_at >= $1 AND r.created_at < $2
143 AND at.agent_name IS NOT NULL
144 GROUP BY at.agent_name
145 ORDER BY SUM(r.cost_microdollars) DESC NULLS LAST
146 LIMIT $3
147 )
148 UNION ALL
149 (
150 SELECT
151 'unattributed' as "name!",
152 COALESCE(SUM(r.cost_microdollars), 0)::bigint as "cost!",
153 COUNT(*)::bigint as "requests!",
154 COALESCE(SUM(r.tokens_used), 0)::bigint as "tokens!"
155 FROM ai_requests r
156 LEFT JOIN agent_tasks at ON at.task_id = r.task_id
157 WHERE r.created_at >= $1 AND r.created_at < $2
158 AND (r.task_id IS NULL OR at.agent_name IS NULL)
159 HAVING COUNT(*) > 0
160 )
161 "#,
162 start,
163 end,
164 limit
165 )
166 .fetch_all(&*self.pool)
167 .await
168 .map_err(Into::into)
169 }
170
171 pub async fn get_summary_for_user(
172 &self,
173 user_id: &UserId,
174 start: DateTime<Utc>,
175 end: DateTime<Utc>,
176 ) -> Result<CostSummaryRow> {
177 sqlx::query_as!(
178 CostSummaryRow,
179 r#"
180 SELECT
181 COUNT(*)::bigint as "requests!",
182 SUM(cost_microdollars)::bigint as "cost",
183 SUM(tokens_used)::bigint as "tokens"
184 FROM ai_requests
185 WHERE created_at >= $1 AND created_at < $2 AND user_id = $3
186 "#,
187 start,
188 end,
189 user_id.as_str()
190 )
191 .fetch_one(&*self.pool)
192 .await
193 .map_err(Into::into)
194 }
195
196 pub async fn get_previous_cost_for_user(
197 &self,
198 user_id: &UserId,
199 start: DateTime<Utc>,
200 end: DateTime<Utc>,
201 ) -> Result<PreviousCostRow> {
202 sqlx::query_as!(
203 PreviousCostRow,
204 r#"
205 SELECT SUM(cost_microdollars)::bigint as "cost"
206 FROM ai_requests
207 WHERE created_at >= $1 AND created_at < $2 AND user_id = $3
208 "#,
209 start,
210 end,
211 user_id.as_str()
212 )
213 .fetch_one(&*self.pool)
214 .await
215 .map_err(Into::into)
216 }
217
218 pub async fn get_breakdown_by_model_for_user(
219 &self,
220 user_id: &UserId,
221 start: DateTime<Utc>,
222 end: DateTime<Utc>,
223 limit: i64,
224 ) -> Result<Vec<CostBreakdownRow>> {
225 sqlx::query_as!(
226 CostBreakdownRow,
227 r#"
228 SELECT
229 model as "name!",
230 COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
231 COUNT(*)::bigint as "requests!",
232 COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
233 FROM ai_requests
234 WHERE created_at >= $1 AND created_at < $2 AND user_id = $4
235 GROUP BY model
236 ORDER BY SUM(cost_microdollars) DESC NULLS LAST
237 LIMIT $3
238 "#,
239 start,
240 end,
241 limit,
242 user_id.as_str()
243 )
244 .fetch_all(&*self.pool)
245 .await
246 .map_err(Into::into)
247 }
248
249 pub async fn get_context_summary_for_user(
250 &self,
251 user_id: &UserId,
252 start: DateTime<Utc>,
253 end: DateTime<Utc>,
254 ) -> Result<ContextSummaryRow> {
255 sqlx::query_as!(
256 ContextSummaryRow,
257 r#"
258 SELECT
259 COUNT(DISTINCT context_id)::bigint as "conversations!",
260 COUNT(*)::bigint as "ai_requests!"
261 FROM ai_requests
262 WHERE created_at >= $1 AND created_at < $2
263 AND user_id = $3
264 AND context_id IS NOT NULL
265 "#,
266 start,
267 end,
268 user_id.as_str()
269 )
270 .fetch_one(&*self.pool)
271 .await
272 .map_err(Into::into)
273 }
274
275 pub async fn get_contexts_by_model_for_user(
276 &self,
277 user_id: &UserId,
278 start: DateTime<Utc>,
279 end: DateTime<Utc>,
280 limit: i64,
281 ) -> Result<Vec<ContextGroupRow>> {
282 sqlx::query_as!(
283 ContextGroupRow,
284 r#"
285 SELECT
286 model as "name!",
287 COUNT(DISTINCT context_id)::bigint as "conversations!",
288 COUNT(*)::bigint as "ai_requests!"
289 FROM ai_requests
290 WHERE created_at >= $1 AND created_at < $2
291 AND user_id = $3
292 AND context_id IS NOT NULL
293 GROUP BY model
294 ORDER BY COUNT(DISTINCT context_id) DESC
295 LIMIT $4
296 "#,
297 start,
298 end,
299 user_id.as_str(),
300 limit
301 )
302 .fetch_all(&*self.pool)
303 .await
304 .map_err(Into::into)
305 }
306
307 pub async fn get_contexts_by_agent_for_user(
308 &self,
309 user_id: &UserId,
310 start: DateTime<Utc>,
311 end: DateTime<Utc>,
312 limit: i64,
313 ) -> Result<Vec<ContextGroupRow>> {
314 sqlx::query_as!(
315 ContextGroupRow,
316 r#"
317 SELECT
318 COALESCE(at.agent_name, 'unattributed') as "name!",
319 COUNT(DISTINCT r.context_id)::bigint as "conversations!",
320 COUNT(*)::bigint as "ai_requests!"
321 FROM ai_requests r
322 LEFT JOIN agent_tasks at ON at.task_id = r.task_id
323 WHERE r.created_at >= $1 AND r.created_at < $2
324 AND r.user_id = $3
325 AND r.context_id IS NOT NULL
326 GROUP BY COALESCE(at.agent_name, 'unattributed')
327 ORDER BY COUNT(DISTINCT r.context_id) DESC
328 LIMIT $4
329 "#,
330 start,
331 end,
332 user_id.as_str(),
333 limit
334 )
335 .fetch_all(&*self.pool)
336 .await
337 .map_err(Into::into)
338 }
339
340 pub async fn get_recent_contexts_for_user(
341 &self,
342 user_id: &UserId,
343 end: DateTime<Utc>,
344 limit: i64,
345 ) -> Result<Vec<RecentContextRow>> {
346 sqlx::query_as!(
347 RecentContextRow,
348 r#"
349 SELECT
350 ctx.context_id as "context_id!: ContextId",
351 ctx.last_activity as "last_activity!",
352 ctx.ai_requests as "ai_requests!",
353 last_req.model,
354 last_task.agent_name
355 FROM (
356 SELECT
357 r.context_id,
358 MAX(r.created_at) AS last_activity,
359 COUNT(*) AS ai_requests
360 FROM ai_requests r
361 WHERE r.user_id = $1
362 AND r.created_at < $2
363 AND r.context_id IS NOT NULL
364 GROUP BY r.context_id
365 ORDER BY MAX(r.created_at) DESC
366 LIMIT $3
367 ) ctx
368 LEFT JOIN LATERAL (
369 SELECT model, task_id FROM ai_requests
370 WHERE context_id = ctx.context_id
371 ORDER BY created_at DESC
372 LIMIT 1
373 ) last_req ON TRUE
374 LEFT JOIN agent_tasks last_task ON last_task.task_id = last_req.task_id
375 ORDER BY ctx.last_activity DESC
376 "#,
377 user_id.as_str(),
378 end,
379 limit
380 )
381 .fetch_all(&*self.pool)
382 .await
383 .map_err(Into::into)
384 }
385
386 pub async fn get_costs_for_trends(
387 &self,
388 start: DateTime<Utc>,
389 end: DateTime<Utc>,
390 ) -> Result<Vec<CostTrendRow>> {
391 sqlx::query_as!(
392 CostTrendRow,
393 r#"
394 SELECT
395 created_at as "created_at!",
396 cost_microdollars,
397 tokens_used
398 FROM ai_requests
399 WHERE created_at >= $1 AND created_at < $2
400 ORDER BY created_at
401 "#,
402 start,
403 end
404 )
405 .fetch_all(&*self.pool)
406 .await
407 .map_err(Into::into)
408 }
409}