systemprompt_analytics/repository/
events.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use sqlx::PgPool;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContentId, SessionId, UserId};
7
8use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
9
10#[derive(Clone, Debug)]
11pub struct AnalyticsEventsRepository {
12 pool: Arc<PgPool>,
13 write_pool: Arc<PgPool>,
14}
15
16impl AnalyticsEventsRepository {
17 pub fn new(db: &DbPool) -> Result<Self> {
18 let pool = db.pool_arc()?;
19 let write_pool = db.write_pool_arc()?;
20 Ok(Self { pool, write_pool })
21 }
22
23 pub async fn create_event(
24 &self,
25 session_id: &SessionId,
26 user_id: &UserId,
27 input: &CreateAnalyticsEventInput,
28 ) -> Result<AnalyticsEventCreated> {
29 let id = format!("evt_{}", uuid::Uuid::new_v4());
30 let event_type = input.event_type.as_str();
31 let event_category = input.event_type.category();
32
33 let event_data = Self::build_event_data(input);
34
35 sqlx::query!(
36 r#"
37 INSERT INTO analytics_events (
38 id, user_id, session_id, event_type, event_category,
39 severity, endpoint, event_data
40 )
41 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
42 "#,
43 id,
44 user_id.as_str(),
45 session_id.as_str(),
46 event_type,
47 event_category,
48 "info",
49 input.page_url,
50 event_data
51 )
52 .execute(&*self.write_pool)
53 .await?;
54
55 Ok(AnalyticsEventCreated {
56 id,
57 event_type: event_type.to_string(),
58 })
59 }
60
61 pub async fn create_events_batch(
62 &self,
63 session_id: &SessionId,
64 user_id: &UserId,
65 inputs: &[CreateAnalyticsEventInput],
66 ) -> Result<Vec<AnalyticsEventCreated>> {
67 if inputs.is_empty() {
68 return Ok(Vec::new());
69 }
70
71 let mut ids = Vec::with_capacity(inputs.len());
72 let mut user_ids = Vec::with_capacity(inputs.len());
73 let mut session_ids = Vec::with_capacity(inputs.len());
74 let mut event_types = Vec::with_capacity(inputs.len());
75 let mut event_categories = Vec::with_capacity(inputs.len());
76 let mut severities = Vec::with_capacity(inputs.len());
77 let mut endpoints: Vec<String> = Vec::with_capacity(inputs.len());
78 let mut event_datas = Vec::with_capacity(inputs.len());
79
80 for input in inputs {
81 let id = format!("evt_{}", uuid::Uuid::new_v4());
82 ids.push(id);
83 user_ids.push(user_id.as_str().to_string());
84 session_ids.push(session_id.as_str().to_string());
85 event_types.push(input.event_type.as_str().to_string());
86 event_categories.push(input.event_type.category().to_string());
87 severities.push("info".to_string());
88 endpoints.push(input.page_url.clone());
89 event_datas.push(Self::build_event_data(input));
90 }
91
92 sqlx::query!(
93 r#"
94 INSERT INTO analytics_events (id, user_id, session_id, event_type, event_category, severity, endpoint, event_data)
95 SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::jsonb[])
96 "#,
97 &ids,
98 &user_ids,
99 &session_ids,
100 &event_types,
101 &event_categories,
102 &severities,
103 &endpoints,
104 &event_datas
105 )
106 .execute(&*self.write_pool)
107 .await?;
108
109 Ok(ids
110 .into_iter()
111 .zip(event_types)
112 .map(|(id, event_type)| AnalyticsEventCreated { id, event_type })
113 .collect())
114 }
115
116 pub async fn count_events_by_type(
117 &self,
118 session_id: &SessionId,
119 event_type: &AnalyticsEventType,
120 ) -> Result<i64> {
121 let count = sqlx::query_scalar!(
122 r#"
123 SELECT COUNT(*) as "count!"
124 FROM analytics_events
125 WHERE session_id = $1 AND event_type = $2
126 "#,
127 session_id.as_str(),
128 event_type.as_str()
129 )
130 .fetch_one(&*self.pool)
131 .await?;
132
133 Ok(count)
134 }
135
136 pub async fn find_by_session(
137 &self,
138 session_id: &SessionId,
139 limit: i64,
140 ) -> Result<Vec<StoredAnalyticsEvent>> {
141 let events = sqlx::query_as!(
142 StoredAnalyticsEvent,
143 r#"
144 SELECT
145 id,
146 user_id as "user_id: UserId",
147 session_id as "session_id: SessionId",
148 event_type,
149 event_category,
150 endpoint as page_url,
151 event_data,
152 timestamp
153 FROM analytics_events
154 WHERE session_id = $1
155 ORDER BY timestamp DESC
156 LIMIT $2
157 "#,
158 session_id.as_str(),
159 limit
160 )
161 .fetch_all(&*self.pool)
162 .await?;
163
164 Ok(events)
165 }
166
167 pub async fn find_by_content(
168 &self,
169 content_id: &ContentId,
170 limit: i64,
171 ) -> Result<Vec<StoredAnalyticsEvent>> {
172 let events = sqlx::query_as!(
173 StoredAnalyticsEvent,
174 r#"
175 SELECT
176 id,
177 user_id as "user_id: UserId",
178 session_id as "session_id: SessionId",
179 event_type,
180 event_category,
181 endpoint as page_url,
182 event_data,
183 timestamp
184 FROM analytics_events
185 WHERE event_data->>'content_id' = $1
186 ORDER BY timestamp DESC
187 LIMIT $2
188 "#,
189 content_id.as_str(),
190 limit
191 )
192 .fetch_all(&*self.pool)
193 .await?;
194
195 Ok(events)
196 }
197
198 fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
199 let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
200
201 if let Some(obj) = data.as_object_mut() {
202 if let Some(content_id) = &input.content_id {
203 obj.insert(
204 "content_id".to_string(),
205 serde_json::json!(content_id.as_str()),
206 );
207 }
208 if let Some(slug) = &input.slug {
209 obj.insert("slug".to_string(), serde_json::json!(slug));
210 }
211 if let Some(referrer) = &input.referrer {
212 obj.insert("referrer".to_string(), serde_json::json!(referrer));
213 }
214 }
215
216 data
217 }
218}
219
220#[derive(Debug, Clone, sqlx::FromRow)]
221pub struct StoredAnalyticsEvent {
222 pub id: String,
224 pub user_id: UserId,
225 pub session_id: Option<SessionId>,
226 pub event_type: String,
227 pub event_category: String,
228 pub page_url: Option<String>,
229 pub event_data: Option<serde_json::Value>,
230 pub timestamp: chrono::DateTime<chrono::Utc>,
231}