Skip to main content

systemprompt_analytics/repository/
costs.rs

1use crate::Result;
2use chrono::{DateTime, Utc};
3use sqlx::PgPool;
4use std::sync::Arc;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContextId, UserId};
7
8use crate::models::cli::{
9    ContextGroupRow, ContextSummaryRow, CostBreakdownRow, CostSummaryRow, CostTrendRow,
10    PreviousCostRow, RecentContextRow,
11};
12
13#[derive(Debug)]
14pub struct CostAnalyticsRepository {
15    pool: Arc<PgPool>,
16}
17
18impl CostAnalyticsRepository {
19    pub fn new(db: &DbPool) -> Result<Self> {
20        let pool = db.pool_arc()?;
21        Ok(Self { pool })
22    }
23
24    pub async fn get_summary(
25        &self,
26        start: DateTime<Utc>,
27        end: DateTime<Utc>,
28    ) -> Result<CostSummaryRow> {
29        sqlx::query_as!(
30            CostSummaryRow,
31            r#"
32            SELECT
33                COUNT(*)::bigint as "requests!",
34                SUM(cost_microdollars)::bigint as "cost",
35                SUM(tokens_used)::bigint as "tokens"
36            FROM ai_requests
37            WHERE created_at >= $1 AND created_at < $2
38            "#,
39            start,
40            end
41        )
42        .fetch_one(&*self.pool)
43        .await
44        .map_err(Into::into)
45    }
46
47    pub async fn get_previous_cost(
48        &self,
49        start: DateTime<Utc>,
50        end: DateTime<Utc>,
51    ) -> Result<PreviousCostRow> {
52        sqlx::query_as!(
53            PreviousCostRow,
54            r#"
55            SELECT SUM(cost_microdollars)::bigint as "cost"
56            FROM ai_requests
57            WHERE created_at >= $1 AND created_at < $2
58            "#,
59            start,
60            end
61        )
62        .fetch_one(&*self.pool)
63        .await
64        .map_err(Into::into)
65    }
66
67    pub async fn get_breakdown_by_model(
68        &self,
69        start: DateTime<Utc>,
70        end: DateTime<Utc>,
71        limit: i64,
72    ) -> Result<Vec<CostBreakdownRow>> {
73        sqlx::query_as!(
74            CostBreakdownRow,
75            r#"
76            SELECT
77                model as "name!",
78                COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
79                COUNT(*)::bigint as "requests!",
80                COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
81            FROM ai_requests
82            WHERE created_at >= $1 AND created_at < $2
83            GROUP BY model
84            ORDER BY SUM(cost_microdollars) DESC NULLS LAST
85            LIMIT $3
86            "#,
87            start,
88            end,
89            limit
90        )
91        .fetch_all(&*self.pool)
92        .await
93        .map_err(Into::into)
94    }
95
96    pub async fn get_breakdown_by_provider(
97        &self,
98        start: DateTime<Utc>,
99        end: DateTime<Utc>,
100        limit: i64,
101    ) -> Result<Vec<CostBreakdownRow>> {
102        sqlx::query_as!(
103            CostBreakdownRow,
104            r#"
105            SELECT
106                provider as "name!",
107                COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
108                COUNT(*)::bigint as "requests!",
109                COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
110            FROM ai_requests
111            WHERE created_at >= $1 AND created_at < $2
112            GROUP BY provider
113            ORDER BY SUM(cost_microdollars) DESC NULLS LAST
114            LIMIT $3
115            "#,
116            start,
117            end,
118            limit
119        )
120        .fetch_all(&*self.pool)
121        .await
122        .map_err(Into::into)
123    }
124
125    pub async fn get_breakdown_by_agent(
126        &self,
127        start: DateTime<Utc>,
128        end: DateTime<Utc>,
129        limit: i64,
130    ) -> Result<Vec<CostBreakdownRow>> {
131        sqlx::query_as!(
132            CostBreakdownRow,
133            r#"
134            (
135                SELECT
136                    at.agent_name as "name!",
137                    COALESCE(SUM(r.cost_microdollars), 0)::bigint as "cost!",
138                    COUNT(*)::bigint as "requests!",
139                    COALESCE(SUM(r.tokens_used), 0)::bigint as "tokens!"
140                FROM ai_requests r
141                INNER JOIN agent_tasks at ON at.task_id = r.task_id
142                WHERE r.created_at >= $1 AND r.created_at < $2
143                  AND at.agent_name IS NOT NULL
144                GROUP BY at.agent_name
145                ORDER BY SUM(r.cost_microdollars) DESC NULLS LAST
146                LIMIT $3
147            )
148            UNION ALL
149            (
150                SELECT
151                    'unattributed' as "name!",
152                    COALESCE(SUM(r.cost_microdollars), 0)::bigint as "cost!",
153                    COUNT(*)::bigint as "requests!",
154                    COALESCE(SUM(r.tokens_used), 0)::bigint as "tokens!"
155                FROM ai_requests r
156                LEFT JOIN agent_tasks at ON at.task_id = r.task_id
157                WHERE r.created_at >= $1 AND r.created_at < $2
158                  AND (r.task_id IS NULL OR at.agent_name IS NULL)
159                HAVING COUNT(*) > 0
160            )
161            "#,
162            start,
163            end,
164            limit
165        )
166        .fetch_all(&*self.pool)
167        .await
168        .map_err(Into::into)
169    }
170
171    pub async fn get_summary_for_user(
172        &self,
173        user_id: &UserId,
174        start: DateTime<Utc>,
175        end: DateTime<Utc>,
176    ) -> Result<CostSummaryRow> {
177        sqlx::query_as!(
178            CostSummaryRow,
179            r#"
180            SELECT
181                COUNT(*)::bigint as "requests!",
182                SUM(cost_microdollars)::bigint as "cost",
183                SUM(tokens_used)::bigint as "tokens"
184            FROM ai_requests
185            WHERE created_at >= $1 AND created_at < $2 AND user_id = $3
186            "#,
187            start,
188            end,
189            user_id.as_str()
190        )
191        .fetch_one(&*self.pool)
192        .await
193        .map_err(Into::into)
194    }
195
196    pub async fn get_previous_cost_for_user(
197        &self,
198        user_id: &UserId,
199        start: DateTime<Utc>,
200        end: DateTime<Utc>,
201    ) -> Result<PreviousCostRow> {
202        sqlx::query_as!(
203            PreviousCostRow,
204            r#"
205            SELECT SUM(cost_microdollars)::bigint as "cost"
206            FROM ai_requests
207            WHERE created_at >= $1 AND created_at < $2 AND user_id = $3
208            "#,
209            start,
210            end,
211            user_id.as_str()
212        )
213        .fetch_one(&*self.pool)
214        .await
215        .map_err(Into::into)
216    }
217
218    pub async fn get_breakdown_by_model_for_user(
219        &self,
220        user_id: &UserId,
221        start: DateTime<Utc>,
222        end: DateTime<Utc>,
223        limit: i64,
224    ) -> Result<Vec<CostBreakdownRow>> {
225        sqlx::query_as!(
226            CostBreakdownRow,
227            r#"
228            SELECT
229                model as "name!",
230                COALESCE(SUM(cost_microdollars), 0)::bigint as "cost!",
231                COUNT(*)::bigint as "requests!",
232                COALESCE(SUM(tokens_used), 0)::bigint as "tokens!"
233            FROM ai_requests
234            WHERE created_at >= $1 AND created_at < $2 AND user_id = $4
235            GROUP BY model
236            ORDER BY SUM(cost_microdollars) DESC NULLS LAST
237            LIMIT $3
238            "#,
239            start,
240            end,
241            limit,
242            user_id.as_str()
243        )
244        .fetch_all(&*self.pool)
245        .await
246        .map_err(Into::into)
247    }
248
249    pub async fn get_context_summary_for_user(
250        &self,
251        user_id: &UserId,
252        start: DateTime<Utc>,
253        end: DateTime<Utc>,
254    ) -> Result<ContextSummaryRow> {
255        sqlx::query_as!(
256            ContextSummaryRow,
257            r#"
258            SELECT
259                COUNT(DISTINCT context_id)::bigint as "conversations!",
260                COUNT(*)::bigint as "ai_requests!"
261            FROM ai_requests
262            WHERE created_at >= $1 AND created_at < $2
263              AND user_id = $3
264              AND context_id IS NOT NULL
265            "#,
266            start,
267            end,
268            user_id.as_str()
269        )
270        .fetch_one(&*self.pool)
271        .await
272        .map_err(Into::into)
273    }
274
275    pub async fn get_contexts_by_model_for_user(
276        &self,
277        user_id: &UserId,
278        start: DateTime<Utc>,
279        end: DateTime<Utc>,
280        limit: i64,
281    ) -> Result<Vec<ContextGroupRow>> {
282        sqlx::query_as!(
283            ContextGroupRow,
284            r#"
285            SELECT
286                model as "name!",
287                COUNT(DISTINCT context_id)::bigint as "conversations!",
288                COUNT(*)::bigint as "ai_requests!"
289            FROM ai_requests
290            WHERE created_at >= $1 AND created_at < $2
291              AND user_id = $3
292              AND context_id IS NOT NULL
293            GROUP BY model
294            ORDER BY COUNT(DISTINCT context_id) DESC
295            LIMIT $4
296            "#,
297            start,
298            end,
299            user_id.as_str(),
300            limit
301        )
302        .fetch_all(&*self.pool)
303        .await
304        .map_err(Into::into)
305    }
306
307    pub async fn get_contexts_by_agent_for_user(
308        &self,
309        user_id: &UserId,
310        start: DateTime<Utc>,
311        end: DateTime<Utc>,
312        limit: i64,
313    ) -> Result<Vec<ContextGroupRow>> {
314        sqlx::query_as!(
315            ContextGroupRow,
316            r#"
317            SELECT
318                COALESCE(at.agent_name, 'unattributed') as "name!",
319                COUNT(DISTINCT r.context_id)::bigint as "conversations!",
320                COUNT(*)::bigint as "ai_requests!"
321            FROM ai_requests r
322            LEFT JOIN agent_tasks at ON at.task_id = r.task_id
323            WHERE r.created_at >= $1 AND r.created_at < $2
324              AND r.user_id = $3
325              AND r.context_id IS NOT NULL
326            GROUP BY COALESCE(at.agent_name, 'unattributed')
327            ORDER BY COUNT(DISTINCT r.context_id) DESC
328            LIMIT $4
329            "#,
330            start,
331            end,
332            user_id.as_str(),
333            limit
334        )
335        .fetch_all(&*self.pool)
336        .await
337        .map_err(Into::into)
338    }
339
340    pub async fn get_recent_contexts_for_user(
341        &self,
342        user_id: &UserId,
343        end: DateTime<Utc>,
344        limit: i64,
345    ) -> Result<Vec<RecentContextRow>> {
346        sqlx::query_as!(
347            RecentContextRow,
348            r#"
349            SELECT
350                ctx.context_id as "context_id!: ContextId",
351                ctx.last_activity as "last_activity!",
352                ctx.ai_requests as "ai_requests!",
353                last_req.model,
354                last_task.agent_name
355            FROM (
356                SELECT
357                    r.context_id,
358                    MAX(r.created_at) AS last_activity,
359                    COUNT(*) AS ai_requests
360                FROM ai_requests r
361                WHERE r.user_id = $1
362                  AND r.created_at < $2
363                  AND r.context_id IS NOT NULL
364                GROUP BY r.context_id
365                ORDER BY MAX(r.created_at) DESC
366                LIMIT $3
367            ) ctx
368            LEFT JOIN LATERAL (
369                SELECT model, task_id FROM ai_requests
370                WHERE context_id = ctx.context_id
371                ORDER BY created_at DESC
372                LIMIT 1
373            ) last_req ON TRUE
374            LEFT JOIN agent_tasks last_task ON last_task.task_id = last_req.task_id
375            ORDER BY ctx.last_activity DESC
376            "#,
377            user_id.as_str(),
378            end,
379            limit
380        )
381        .fetch_all(&*self.pool)
382        .await
383        .map_err(Into::into)
384    }
385
386    pub async fn get_costs_for_trends(
387        &self,
388        start: DateTime<Utc>,
389        end: DateTime<Utc>,
390    ) -> Result<Vec<CostTrendRow>> {
391        sqlx::query_as!(
392            CostTrendRow,
393            r#"
394            SELECT
395                created_at as "created_at!",
396                cost_microdollars,
397                tokens_used
398            FROM ai_requests
399            WHERE created_at >= $1 AND created_at < $2
400            ORDER BY created_at
401            "#,
402            start,
403            end
404        )
405        .fetch_all(&*self.pool)
406        .await
407        .map_err(Into::into)
408    }
409}