Skip to main content

systemprompt_analytics/repository/
costs.rs

1use crate::Result;
2use chrono::{DateTime, Utc};
3use sqlx::PgPool;
4use std::sync::Arc;
5use systemprompt_database::DbPool;
6
7use crate::models::cli::{
8    ContextGroupRow, ContextSummaryRow, CostBreakdownRow, CostSummaryRow, CostTrendRow,
9    PreviousCostRow, RecentContextRow,
10};
11
12#[derive(Debug)]
13pub struct CostAnalyticsRepository {
14    pool: Arc<PgPool>,
15}
16
17impl CostAnalyticsRepository {
18    pub fn new(db: &DbPool) -> Result<Self> {
19        let pool = db.pool_arc()?;
20        Ok(Self { pool })
21    }
22
23    pub async fn get_summary(
24        &self,
25        start: DateTime<Utc>,
26        end: DateTime<Utc>,
27    ) -> Result<CostSummaryRow> {
28        sqlx::query_as!(
29            CostSummaryRow,
30            r#"
31            SELECT
32                COUNT(*)::bigint as "requests!",
33                SUM(cost_microdollars)::bigint as "cost",
34                SUM(tokens_used)::bigint as "tokens"
35            FROM ai_requests
36            WHERE created_at >= $1 AND created_at < $2
37            "#,
38            start,
39            end
40        )
41        .fetch_one(&*self.pool)
42        .await
43        .map_err(Into::into)
44    }
45
46    pub async fn get_previous_cost(
47        &self,
48        start: DateTime<Utc>,
49        end: DateTime<Utc>,
50    ) -> Result<PreviousCostRow> {
51        sqlx::query_as!(
52            PreviousCostRow,
53            r#"
54            SELECT SUM(cost_microdollars)::bigint as "cost"
55            FROM ai_requests
56            WHERE created_at >= $1 AND created_at < $2
57            "#,
58            start,
59            end
60        )
61        .fetch_one(&*self.pool)
62        .await
63        .map_err(Into::into)
64    }
65
66    pub async fn get_breakdown_by_model(
67        &self,
68        start: DateTime<Utc>,
69        end: DateTime<Utc>,
70        limit: i64,
71    ) -> Result<Vec<CostBreakdownRow>> {
72        sqlx::query_as!(
73            CostBreakdownRow,
74            r#"
75            SELECT
76                model as "name!",
77                COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
78                COUNT(*)::bigint as "requests!",
79                COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
80            FROM ai_requests
81            WHERE created_at >= $1 AND created_at < $2
82            GROUP BY model
83            ORDER BY SUM(cost_microdollars) DESC NULLS LAST
84            LIMIT $3
85            "#,
86            start,
87            end,
88            limit
89        )
90        .fetch_all(&*self.pool)
91        .await
92        .map_err(Into::into)
93    }
94
95    pub async fn get_breakdown_by_provider(
96        &self,
97        start: DateTime<Utc>,
98        end: DateTime<Utc>,
99        limit: i64,
100    ) -> Result<Vec<CostBreakdownRow>> {
101        sqlx::query_as!(
102            CostBreakdownRow,
103            r#"
104            SELECT
105                provider as "name!",
106                COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
107                COUNT(*)::bigint as "requests!",
108                COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
109            FROM ai_requests
110            WHERE created_at >= $1 AND created_at < $2
111            GROUP BY provider
112            ORDER BY SUM(cost_microdollars) DESC NULLS LAST
113            LIMIT $3
114            "#,
115            start,
116            end,
117            limit
118        )
119        .fetch_all(&*self.pool)
120        .await
121        .map_err(Into::into)
122    }
123
124    pub async fn get_breakdown_by_agent(
125        &self,
126        start: DateTime<Utc>,
127        end: DateTime<Utc>,
128        limit: i64,
129    ) -> Result<Vec<CostBreakdownRow>> {
130        sqlx::query_as!(
131            CostBreakdownRow,
132            r#"
133            (
134                SELECT
135                    at.agent_name as "name!",
136                    COALESCE(SUM(r.cost_microdollars), 0)::bigint as "cost!",
137                    COUNT(*)::bigint as "requests!",
138                    COALESCE(SUM(r.tokens_used), 0)::bigint as "tokens!"
139                FROM ai_requests r
140                INNER JOIN agent_tasks at ON at.task_id = r.task_id
141                WHERE r.created_at >= $1 AND r.created_at < $2
142                  AND at.agent_name IS NOT NULL
143                GROUP BY at.agent_name
144                ORDER BY SUM(r.cost_microdollars) DESC NULLS LAST
145                LIMIT $3
146            )
147            UNION ALL
148            (
149                SELECT
150                    'unattributed' as "name!",
151                    COALESCE(SUM(r.cost_microdollars), 0)::bigint as "cost!",
152                    COUNT(*)::bigint as "requests!",
153                    COALESCE(SUM(r.tokens_used), 0)::bigint as "tokens!"
154                FROM ai_requests r
155                LEFT JOIN agent_tasks at ON at.task_id = r.task_id
156                WHERE r.created_at >= $1 AND r.created_at < $2
157                  AND (r.task_id IS NULL OR at.agent_name IS NULL)
158                HAVING COUNT(*) > 0
159            )
160            "#,
161            start,
162            end,
163            limit
164        )
165        .fetch_all(&*self.pool)
166        .await
167        .map_err(Into::into)
168    }
169
170    pub async fn get_summary_for_user(
171        &self,
172        user_id: &str,
173        start: DateTime<Utc>,
174        end: DateTime<Utc>,
175    ) -> Result<CostSummaryRow> {
176        sqlx::query_as!(
177            CostSummaryRow,
178            r#"
179            SELECT
180                COUNT(*)::bigint as "requests!",
181                SUM(cost_microdollars)::bigint as "cost",
182                SUM(tokens_used)::bigint as "tokens"
183            FROM ai_requests
184            WHERE created_at >= $1 AND created_at < $2 AND user_id = $3
185            "#,
186            start,
187            end,
188            user_id
189        )
190        .fetch_one(&*self.pool)
191        .await
192        .map_err(Into::into)
193    }
194
195    pub async fn get_previous_cost_for_user(
196        &self,
197        user_id: &str,
198        start: DateTime<Utc>,
199        end: DateTime<Utc>,
200    ) -> Result<PreviousCostRow> {
201        sqlx::query_as!(
202            PreviousCostRow,
203            r#"
204            SELECT SUM(cost_microdollars)::bigint as "cost"
205            FROM ai_requests
206            WHERE created_at >= $1 AND created_at < $2 AND user_id = $3
207            "#,
208            start,
209            end,
210            user_id
211        )
212        .fetch_one(&*self.pool)
213        .await
214        .map_err(Into::into)
215    }
216
217    pub async fn get_breakdown_by_model_for_user(
218        &self,
219        user_id: &str,
220        start: DateTime<Utc>,
221        end: DateTime<Utc>,
222        limit: i64,
223    ) -> Result<Vec<CostBreakdownRow>> {
224        sqlx::query_as!(
225            CostBreakdownRow,
226            r#"
227            SELECT
228                model as "name!",
229                COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
230                COUNT(*)::bigint as "requests!",
231                COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
232            FROM ai_requests
233            WHERE created_at >= $1 AND created_at < $2 AND user_id = $4
234            GROUP BY model
235            ORDER BY SUM(cost_microdollars) DESC NULLS LAST
236            LIMIT $3
237            "#,
238            start,
239            end,
240            limit,
241            user_id
242        )
243        .fetch_all(&*self.pool)
244        .await
245        .map_err(Into::into)
246    }
247
248    pub async fn get_context_summary_for_user(
249        &self,
250        user_id: &str,
251        start: DateTime<Utc>,
252        end: DateTime<Utc>,
253    ) -> Result<ContextSummaryRow> {
254        sqlx::query_as!(
255            ContextSummaryRow,
256            r#"
257            SELECT
258                COUNT(DISTINCT context_id)::bigint as "conversations!",
259                COUNT(*)::bigint as "ai_requests!"
260            FROM ai_requests
261            WHERE created_at >= $1 AND created_at < $2
262              AND user_id = $3
263              AND context_id IS NOT NULL
264            "#,
265            start,
266            end,
267            user_id
268        )
269        .fetch_one(&*self.pool)
270        .await
271        .map_err(Into::into)
272    }
273
274    pub async fn get_contexts_by_model_for_user(
275        &self,
276        user_id: &str,
277        start: DateTime<Utc>,
278        end: DateTime<Utc>,
279        limit: i64,
280    ) -> Result<Vec<ContextGroupRow>> {
281        sqlx::query_as!(
282            ContextGroupRow,
283            r#"
284            SELECT
285                model as "name!",
286                COUNT(DISTINCT context_id)::bigint as "conversations!",
287                COUNT(*)::bigint as "ai_requests!"
288            FROM ai_requests
289            WHERE created_at >= $1 AND created_at < $2
290              AND user_id = $3
291              AND context_id IS NOT NULL
292            GROUP BY model
293            ORDER BY COUNT(DISTINCT context_id) DESC
294            LIMIT $4
295            "#,
296            start,
297            end,
298            user_id,
299            limit
300        )
301        .fetch_all(&*self.pool)
302        .await
303        .map_err(Into::into)
304    }
305
306    pub async fn get_contexts_by_agent_for_user(
307        &self,
308        user_id: &str,
309        start: DateTime<Utc>,
310        end: DateTime<Utc>,
311        limit: i64,
312    ) -> Result<Vec<ContextGroupRow>> {
313        sqlx::query_as!(
314            ContextGroupRow,
315            r#"
316            SELECT
317                COALESCE(at.agent_name, 'unattributed') as "name!",
318                COUNT(DISTINCT r.context_id)::bigint as "conversations!",
319                COUNT(*)::bigint as "ai_requests!"
320            FROM ai_requests r
321            LEFT JOIN agent_tasks at ON at.task_id = r.task_id
322            WHERE r.created_at >= $1 AND r.created_at < $2
323              AND r.user_id = $3
324              AND r.context_id IS NOT NULL
325            GROUP BY COALESCE(at.agent_name, 'unattributed')
326            ORDER BY COUNT(DISTINCT r.context_id) DESC
327            LIMIT $4
328            "#,
329            start,
330            end,
331            user_id,
332            limit
333        )
334        .fetch_all(&*self.pool)
335        .await
336        .map_err(Into::into)
337    }
338
339    pub async fn get_recent_contexts_for_user(
340        &self,
341        user_id: &str,
342        end: DateTime<Utc>,
343        limit: i64,
344    ) -> Result<Vec<RecentContextRow>> {
345        sqlx::query_as!(
346            RecentContextRow,
347            r#"
348            SELECT
349                ctx.context_id as "context_id!",
350                ctx.last_activity as "last_activity!",
351                ctx.ai_requests as "ai_requests!",
352                last_req.model,
353                last_task.agent_name
354            FROM (
355                SELECT
356                    r.context_id,
357                    MAX(r.created_at) AS last_activity,
358                    COUNT(*) AS ai_requests
359                FROM ai_requests r
360                WHERE r.user_id = $1
361                  AND r.created_at < $2
362                  AND r.context_id IS NOT NULL
363                GROUP BY r.context_id
364                ORDER BY MAX(r.created_at) DESC
365                LIMIT $3
366            ) ctx
367            LEFT JOIN LATERAL (
368                SELECT model, task_id FROM ai_requests
369                WHERE context_id = ctx.context_id
370                ORDER BY created_at DESC
371                LIMIT 1
372            ) last_req ON TRUE
373            LEFT JOIN agent_tasks last_task ON last_task.task_id = last_req.task_id
374            ORDER BY ctx.last_activity DESC
375            "#,
376            user_id,
377            end,
378            limit
379        )
380        .fetch_all(&*self.pool)
381        .await
382        .map_err(Into::into)
383    }
384
385    pub async fn get_costs_for_trends(
386        &self,
387        start: DateTime<Utc>,
388        end: DateTime<Utc>,
389    ) -> Result<Vec<CostTrendRow>> {
390        sqlx::query_as!(
391            CostTrendRow,
392            r#"
393            SELECT
394                created_at as "created_at!",
395                cost_microdollars,
396                tokens_used
397            FROM ai_requests
398            WHERE created_at >= $1 AND created_at < $2
399            ORDER BY created_at
400            "#,
401            start,
402            end
403        )
404        .fetch_all(&*self.pool)
405        .await
406        .map_err(Into::into)
407    }
408}