systemprompt_analytics/repository/
conversations.rs1use crate::Result;
8use chrono::{DateTime, Utc};
9use sqlx::PgPool;
10use std::sync::Arc;
11use systemprompt_database::DbPool;
12
13use crate::models::cli::{ConversationListRow, GatewaySessionListRow, TimestampRow};
14
15#[derive(Debug)]
16pub struct ConversationAnalyticsRepository {
17 pool: Arc<PgPool>,
18}
19
20impl ConversationAnalyticsRepository {
21 pub fn new(db: &DbPool) -> Result<Self> {
22 let pool = db.pool_arc()?;
23 Ok(Self { pool })
24 }
25
26 pub async fn list_agent_contexts(
27 &self,
28 start: DateTime<Utc>,
29 end: DateTime<Utc>,
30 limit: i64,
31 ) -> Result<Vec<ConversationListRow>> {
32 sqlx::query_as!(
33 ConversationListRow,
34 r#"
35 SELECT
36 uc.context_id as "context_id!: systemprompt_identifiers::ContextId",
37 uc.name as "name?",
38 (SELECT COUNT(*) FROM agent_tasks at WHERE at.context_id = uc.context_id)::bigint as "task_count!",
39 (SELECT COUNT(*) FROM task_messages tm
40 JOIN agent_tasks at ON at.task_id = tm.task_id
41 WHERE at.context_id = uc.context_id)::bigint as "message_count!",
42 uc.created_at as "created_at!",
43 uc.updated_at as "updated_at!"
44 FROM user_contexts uc
45 WHERE uc.created_at >= $1 AND uc.created_at < $2
46 ORDER BY uc.updated_at DESC
47 LIMIT $3
48 "#,
49 start,
50 end,
51 limit
52 )
53 .fetch_all(&*self.pool)
54 .await
55 .map_err(Into::into)
56 }
57
58 pub async fn list_gateway_sessions(
59 &self,
60 start: DateTime<Utc>,
61 end: DateTime<Utc>,
62 limit: i64,
63 ) -> Result<Vec<GatewaySessionListRow>> {
64 sqlx::query_as!(
65 GatewaySessionListRow,
66 r#"
67 SELECT
68 ar.session_id as "session_id!: systemprompt_identifiers::SessionId",
69 COUNT(arm.id)::bigint as "message_count!",
70 MIN(ar.created_at) as "created_at!",
71 MAX(ar.created_at) as "updated_at!"
72 FROM ai_requests ar
73 LEFT JOIN ai_request_messages arm ON arm.request_id = ar.id
74 WHERE ar.task_id IS NULL
75 AND ar.session_id IS NOT NULL
76 AND ar.created_at >= $1 AND ar.created_at < $2
77 AND NOT EXISTS (
78 SELECT 1 FROM user_contexts uc2 WHERE uc2.context_id::text = ar.session_id
79 )
80 GROUP BY ar.session_id
81 ORDER BY MAX(ar.created_at) DESC
82 LIMIT $3
83 "#,
84 start,
85 end,
86 limit
87 )
88 .fetch_all(&*self.pool)
89 .await
90 .map_err(Into::into)
91 }
92
93 pub async fn get_context_count(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<i64> {
94 let count = sqlx::query_scalar!(
95 r#"SELECT COUNT(*)::bigint as "count!" FROM user_contexts WHERE created_at >= $1 AND created_at < $2"#,
96 start,
97 end
98 )
99 .fetch_one(&*self.pool)
100 .await?;
101 Ok(count)
102 }
103
104 pub async fn get_task_stats(
105 &self,
106 start: DateTime<Utc>,
107 end: DateTime<Utc>,
108 ) -> Result<(i64, Option<f64>)> {
109 let row = sqlx::query!(
110 r#"
111 SELECT COUNT(*)::bigint as "count!", AVG(execution_time_ms)::float8 as avg_time
112 FROM agent_tasks
113 WHERE started_at >= $1 AND started_at < $2
114 "#,
115 start,
116 end
117 )
118 .fetch_one(&*self.pool)
119 .await?;
120 Ok((row.count, row.avg_time))
121 }
122
123 pub async fn get_message_count(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<i64> {
124 let count = sqlx::query_scalar!(
125 r#"SELECT COUNT(*)::bigint as "count!" FROM task_messages WHERE created_at >= $1 AND created_at < $2"#,
126 start,
127 end
128 )
129 .fetch_one(&*self.pool)
130 .await?;
131 Ok(count)
132 }
133
134 pub async fn get_context_timestamps(
135 &self,
136 start: DateTime<Utc>,
137 end: DateTime<Utc>,
138 ) -> Result<Vec<TimestampRow>> {
139 sqlx::query_as!(
140 TimestampRow,
141 r#"
142 SELECT created_at as "timestamp!"
143 FROM user_contexts
144 WHERE created_at >= $1 AND created_at < $2
145 "#,
146 start,
147 end
148 )
149 .fetch_all(&*self.pool)
150 .await
151 .map_err(Into::into)
152 }
153
154 pub async fn get_task_timestamps(
155 &self,
156 start: DateTime<Utc>,
157 end: DateTime<Utc>,
158 ) -> Result<Vec<TimestampRow>> {
159 sqlx::query_as!(
160 TimestampRow,
161 r#"
162 SELECT started_at as "timestamp!"
163 FROM agent_tasks
164 WHERE started_at >= $1 AND started_at < $2
165 "#,
166 start,
167 end
168 )
169 .fetch_all(&*self.pool)
170 .await
171 .map_err(Into::into)
172 }
173
174 pub async fn get_message_timestamps(
175 &self,
176 start: DateTime<Utc>,
177 end: DateTime<Utc>,
178 ) -> Result<Vec<TimestampRow>> {
179 sqlx::query_as!(
180 TimestampRow,
181 r#"
182 SELECT created_at as "timestamp!"
183 FROM task_messages
184 WHERE created_at >= $1 AND created_at < $2
185 "#,
186 start,
187 end
188 )
189 .fetch_all(&*self.pool)
190 .await
191 .map_err(Into::into)
192 }
193}