systemprompt_analytics/repository/
events.rs1use std::sync::Arc;
10
11use crate::Result;
12use sqlx::PgPool;
13use systemprompt_database::DbPool;
14use systemprompt_identifiers::{ContentId, SessionId, UserId};
15
16use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
17
18#[derive(Clone, Debug)]
19pub struct AnalyticsEventsRepository {
20 pool: Arc<PgPool>,
21 write_pool: Arc<PgPool>,
22}
23
24impl AnalyticsEventsRepository {
25 pub fn new(db: &DbPool) -> Result<Self> {
26 let pool = db.pool_arc()?;
27 let write_pool = db.write_pool_arc()?;
28 Ok(Self { pool, write_pool })
29 }
30
31 pub async fn create_event(
32 &self,
33 session_id: &SessionId,
34 user_id: &UserId,
35 input: &CreateAnalyticsEventInput,
36 ) -> Result<AnalyticsEventCreated> {
37 let id = format!("evt_{}", uuid::Uuid::new_v4());
38 let event_type = input.event_type.as_str();
39 let event_category = input.event_type.category();
40
41 let event_data = Self::build_event_data(input);
42
43 sqlx::query!(
44 r#"
45 INSERT INTO analytics_events (
46 id, user_id, session_id, event_type, event_category,
47 severity, endpoint, event_data
48 )
49 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
50 "#,
51 id,
52 user_id.as_str(),
53 session_id.as_str(),
54 event_type,
55 event_category,
56 "info",
57 input.page_url,
58 event_data
59 )
60 .execute(&*self.write_pool)
61 .await?;
62
63 Ok(AnalyticsEventCreated {
64 id,
65 event_type: event_type.to_owned(),
66 })
67 }
68
69 pub async fn create_events_batch(
70 &self,
71 session_id: &SessionId,
72 user_id: &UserId,
73 inputs: &[CreateAnalyticsEventInput],
74 ) -> Result<Vec<AnalyticsEventCreated>> {
75 if inputs.is_empty() {
76 return Ok(Vec::new());
77 }
78
79 let mut ids = Vec::with_capacity(inputs.len());
80 let mut user_ids = Vec::with_capacity(inputs.len());
81 let mut session_ids = Vec::with_capacity(inputs.len());
82 let mut event_types = Vec::with_capacity(inputs.len());
83 let mut event_categories = Vec::with_capacity(inputs.len());
84 let mut severities = Vec::with_capacity(inputs.len());
85 let mut endpoints: Vec<String> = Vec::with_capacity(inputs.len());
86 let mut event_datas = Vec::with_capacity(inputs.len());
87
88 for input in inputs {
89 let id = format!("evt_{}", uuid::Uuid::new_v4());
90 ids.push(id);
91 user_ids.push(user_id.as_str().to_owned());
92 session_ids.push(session_id.as_str().to_owned());
93 event_types.push(input.event_type.as_str().to_owned());
94 event_categories.push(input.event_type.category().to_owned());
95 severities.push("info".to_owned());
96 endpoints.push(input.page_url.clone());
97 event_datas.push(Self::build_event_data(input));
98 }
99
100 sqlx::query!(
101 r#"
102 INSERT INTO analytics_events (id, user_id, session_id, event_type, event_category, severity, endpoint, event_data)
103 SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::jsonb[])
104 "#,
105 &ids,
106 &user_ids,
107 &session_ids,
108 &event_types,
109 &event_categories,
110 &severities,
111 &endpoints,
112 &event_datas
113 )
114 .execute(&*self.write_pool)
115 .await?;
116
117 Ok(ids
118 .into_iter()
119 .zip(event_types)
120 .map(|(id, event_type)| AnalyticsEventCreated { id, event_type })
121 .collect())
122 }
123
124 pub async fn count_events_by_type(
125 &self,
126 session_id: &SessionId,
127 event_type: &AnalyticsEventType,
128 ) -> Result<i64> {
129 let count = sqlx::query_scalar!(
130 r#"
131 SELECT COUNT(*) as "count!"
132 FROM analytics_events
133 WHERE session_id = $1 AND event_type = $2
134 "#,
135 session_id.as_str(),
136 event_type.as_str()
137 )
138 .fetch_one(&*self.pool)
139 .await?;
140
141 Ok(count)
142 }
143
144 pub async fn find_by_session(
145 &self,
146 session_id: &SessionId,
147 limit: i64,
148 ) -> Result<Vec<StoredAnalyticsEvent>> {
149 let events = sqlx::query_as!(
150 StoredAnalyticsEvent,
151 r#"
152 SELECT
153 id,
154 user_id as "user_id: UserId",
155 session_id as "session_id: SessionId",
156 event_type,
157 event_category,
158 endpoint as page_url,
159 event_data,
160 timestamp
161 FROM analytics_events
162 WHERE session_id = $1
163 ORDER BY timestamp DESC
164 LIMIT $2
165 "#,
166 session_id.as_str(),
167 limit
168 )
169 .fetch_all(&*self.pool)
170 .await?;
171
172 Ok(events)
173 }
174
175 pub async fn find_by_content(
176 &self,
177 content_id: &ContentId,
178 limit: i64,
179 ) -> Result<Vec<StoredAnalyticsEvent>> {
180 let events = sqlx::query_as!(
181 StoredAnalyticsEvent,
182 r#"
183 SELECT
184 id,
185 user_id as "user_id: UserId",
186 session_id as "session_id: SessionId",
187 event_type,
188 event_category,
189 endpoint as page_url,
190 event_data,
191 timestamp
192 FROM analytics_events
193 WHERE event_data->>'content_id' = $1
194 ORDER BY timestamp DESC
195 LIMIT $2
196 "#,
197 content_id.as_str(),
198 limit
199 )
200 .fetch_all(&*self.pool)
201 .await?;
202
203 Ok(events)
204 }
205
206 fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
207 let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
208
209 if let Some(obj) = data.as_object_mut() {
210 if let Some(content_id) = &input.content_id {
211 obj.insert(
212 "content_id".to_owned(),
213 serde_json::json!(content_id.as_str()),
214 );
215 }
216 if let Some(slug) = &input.slug {
217 obj.insert("slug".to_owned(), serde_json::json!(slug));
218 }
219 if let Some(referrer) = &input.referrer {
220 obj.insert("referrer".to_owned(), serde_json::json!(referrer));
221 }
222 }
223
224 data
225 }
226}
227
228#[derive(Debug, Clone, sqlx::FromRow)]
229pub struct StoredAnalyticsEvent {
230 pub id: String,
231 pub user_id: UserId,
232 pub session_id: Option<SessionId>,
233 pub event_type: String,
234 pub event_category: String,
235 pub page_url: Option<String>,
236 pub event_data: Option<serde_json::Value>,
237 pub timestamp: chrono::DateTime<chrono::Utc>,
238}