Skip to main content

systemprompt_analytics/repository/
conversations.rs

1//! Conversation analytics over agent contexts and gateway sessions.
2//!
3//! [`ConversationAnalyticsRepository`] lists agent-task contexts and
4//! task-less gateway AI sessions, and reports task, message, and timestamp
5//! counts used to build conversation activity trends.
6
7use crate::Result;
8use chrono::{DateTime, Utc};
9use sqlx::PgPool;
10use std::sync::Arc;
11use systemprompt_database::DbPool;
12
13use crate::models::cli::{ConversationListRow, GatewaySessionListRow, TimestampRow};
14
15#[derive(Debug)]
16pub struct ConversationAnalyticsRepository {
17    pool: Arc<PgPool>,
18}
19
20impl ConversationAnalyticsRepository {
21    pub fn new(db: &DbPool) -> Result<Self> {
22        let pool = db.pool_arc()?;
23        Ok(Self { pool })
24    }
25
26    pub async fn list_agent_contexts(
27        &self,
28        start: DateTime<Utc>,
29        end: DateTime<Utc>,
30        limit: i64,
31    ) -> Result<Vec<ConversationListRow>> {
32        sqlx::query_as!(
33            ConversationListRow,
34            r#"
35            SELECT
36                uc.context_id as "context_id!: systemprompt_identifiers::ContextId",
37                uc.name as "name?",
38                (SELECT COUNT(*) FROM agent_tasks at WHERE at.context_id = uc.context_id)::bigint as "task_count!",
39                (SELECT COUNT(*) FROM task_messages tm
40                 JOIN agent_tasks at ON at.task_id = tm.task_id
41                 WHERE at.context_id = uc.context_id)::bigint as "message_count!",
42                uc.created_at as "created_at!",
43                uc.updated_at as "updated_at!"
44            FROM user_contexts uc
45            WHERE uc.created_at >= $1 AND uc.created_at < $2
46            ORDER BY uc.updated_at DESC
47            LIMIT $3
48            "#,
49            start,
50            end,
51            limit
52        )
53        .fetch_all(&*self.pool)
54        .await
55        .map_err(Into::into)
56    }
57
58    pub async fn list_gateway_sessions(
59        &self,
60        start: DateTime<Utc>,
61        end: DateTime<Utc>,
62        limit: i64,
63    ) -> Result<Vec<GatewaySessionListRow>> {
64        sqlx::query_as!(
65            GatewaySessionListRow,
66            r#"
67            SELECT
68                ar.session_id as "session_id!: systemprompt_identifiers::SessionId",
69                COUNT(arm.id)::bigint as "message_count!",
70                MIN(ar.created_at) as "created_at!",
71                MAX(ar.created_at) as "updated_at!"
72            FROM ai_requests ar
73            LEFT JOIN ai_request_messages arm ON arm.request_id = ar.id
74            WHERE ar.task_id IS NULL
75              AND ar.session_id IS NOT NULL
76              AND ar.created_at >= $1 AND ar.created_at < $2
77              AND NOT EXISTS (
78                  SELECT 1 FROM user_contexts uc2 WHERE uc2.context_id::text = ar.session_id
79              )
80            GROUP BY ar.session_id
81            ORDER BY MAX(ar.created_at) DESC
82            LIMIT $3
83            "#,
84            start,
85            end,
86            limit
87        )
88        .fetch_all(&*self.pool)
89        .await
90        .map_err(Into::into)
91    }
92
93    pub async fn get_context_count(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<i64> {
94        let count = sqlx::query_scalar!(
95            r#"SELECT COUNT(*)::bigint as "count!" FROM user_contexts WHERE created_at >= $1 AND created_at < $2"#,
96            start,
97            end
98        )
99        .fetch_one(&*self.pool)
100        .await?;
101        Ok(count)
102    }
103
104    pub async fn get_task_stats(
105        &self,
106        start: DateTime<Utc>,
107        end: DateTime<Utc>,
108    ) -> Result<(i64, Option<f64>)> {
109        let row = sqlx::query!(
110            r#"
111            SELECT COUNT(*)::bigint as "count!", AVG(execution_time_ms)::float8 as avg_time
112            FROM agent_tasks
113            WHERE started_at >= $1 AND started_at < $2
114            "#,
115            start,
116            end
117        )
118        .fetch_one(&*self.pool)
119        .await?;
120        Ok((row.count, row.avg_time))
121    }
122
123    pub async fn get_message_count(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<i64> {
124        let count = sqlx::query_scalar!(
125            r#"SELECT COUNT(*)::bigint as "count!" FROM task_messages WHERE created_at >= $1 AND created_at < $2"#,
126            start,
127            end
128        )
129        .fetch_one(&*self.pool)
130        .await?;
131        Ok(count)
132    }
133
134    pub async fn get_context_timestamps(
135        &self,
136        start: DateTime<Utc>,
137        end: DateTime<Utc>,
138    ) -> Result<Vec<TimestampRow>> {
139        sqlx::query_as!(
140            TimestampRow,
141            r#"
142            SELECT created_at as "timestamp!"
143            FROM user_contexts
144            WHERE created_at >= $1 AND created_at < $2
145            "#,
146            start,
147            end
148        )
149        .fetch_all(&*self.pool)
150        .await
151        .map_err(Into::into)
152    }
153
154    pub async fn get_task_timestamps(
155        &self,
156        start: DateTime<Utc>,
157        end: DateTime<Utc>,
158    ) -> Result<Vec<TimestampRow>> {
159        sqlx::query_as!(
160            TimestampRow,
161            r#"
162            SELECT started_at as "timestamp!"
163            FROM agent_tasks
164            WHERE started_at >= $1 AND started_at < $2
165            "#,
166            start,
167            end
168        )
169        .fetch_all(&*self.pool)
170        .await
171        .map_err(Into::into)
172    }
173
174    pub async fn get_message_timestamps(
175        &self,
176        start: DateTime<Utc>,
177        end: DateTime<Utc>,
178    ) -> Result<Vec<TimestampRow>> {
179        sqlx::query_as!(
180            TimestampRow,
181            r#"
182            SELECT created_at as "timestamp!"
183            FROM task_messages
184            WHERE created_at >= $1 AND created_at < $2
185            "#,
186            start,
187            end
188        )
189        .fetch_all(&*self.pool)
190        .await
191        .map_err(Into::into)
192    }
193}