systemprompt_analytics/repository/
events.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use sqlx::PgPool;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContentId, SessionId, UserId};
7
8use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
9
10#[derive(Clone, Debug)]
11pub struct AnalyticsEventsRepository {
12 pool: Arc<PgPool>,
13 write_pool: Arc<PgPool>,
14}
15
16impl AnalyticsEventsRepository {
17 pub fn new(db: &DbPool) -> Result<Self> {
18 let pool = db.pool_arc()?;
19 let write_pool = db.write_pool_arc()?;
20 Ok(Self { pool, write_pool })
21 }
22
23 pub async fn create_event(
24 &self,
25 session_id: &str,
26 user_id: &str,
27 input: &CreateAnalyticsEventInput,
28 ) -> Result<AnalyticsEventCreated> {
29 let id = format!("evt_{}", uuid::Uuid::new_v4());
30 let event_type = input.event_type.as_str();
31 let event_category = input.event_type.category();
32
33 let event_data = Self::build_event_data(input);
34
35 sqlx::query!(
36 r#"
37 INSERT INTO analytics_events (
38 id, user_id, session_id, event_type, event_category,
39 severity, endpoint, event_data
40 )
41 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
42 "#,
43 id,
44 user_id,
45 session_id,
46 event_type,
47 event_category,
48 "info",
49 input.page_url,
50 event_data
51 )
52 .execute(&*self.write_pool)
53 .await?;
54
55 Ok(AnalyticsEventCreated {
56 id,
57 event_type: event_type.to_string(),
58 })
59 }
60
61 pub async fn create_events_batch(
62 &self,
63 session_id: &str,
64 user_id: &str,
65 inputs: &[CreateAnalyticsEventInput],
66 ) -> Result<Vec<AnalyticsEventCreated>> {
67 let mut results = Vec::with_capacity(inputs.len());
68
69 for input in inputs {
70 let created = self.create_event(session_id, user_id, input).await?;
71 results.push(created);
72 }
73
74 Ok(results)
75 }
76
77 pub async fn count_events_by_type(
78 &self,
79 session_id: &str,
80 event_type: &AnalyticsEventType,
81 ) -> Result<i64> {
82 let count = sqlx::query_scalar!(
83 r#"
84 SELECT COUNT(*) as "count!"
85 FROM analytics_events
86 WHERE session_id = $1 AND event_type = $2
87 "#,
88 session_id,
89 event_type.as_str()
90 )
91 .fetch_one(&*self.pool)
92 .await?;
93
94 Ok(count)
95 }
96
97 pub async fn find_by_session(
98 &self,
99 session_id: &str,
100 limit: i64,
101 ) -> Result<Vec<StoredAnalyticsEvent>> {
102 let events = sqlx::query_as!(
103 StoredAnalyticsEvent,
104 r#"
105 SELECT
106 id,
107 user_id as "user_id: UserId",
108 session_id as "session_id: SessionId",
109 event_type,
110 event_category,
111 endpoint as page_url,
112 event_data,
113 timestamp
114 FROM analytics_events
115 WHERE session_id = $1
116 ORDER BY timestamp DESC
117 LIMIT $2
118 "#,
119 session_id,
120 limit
121 )
122 .fetch_all(&*self.pool)
123 .await?;
124
125 Ok(events)
126 }
127
128 pub async fn find_by_content(
129 &self,
130 content_id: &ContentId,
131 limit: i64,
132 ) -> Result<Vec<StoredAnalyticsEvent>> {
133 let events = sqlx::query_as!(
134 StoredAnalyticsEvent,
135 r#"
136 SELECT
137 id,
138 user_id as "user_id: UserId",
139 session_id as "session_id: SessionId",
140 event_type,
141 event_category,
142 endpoint as page_url,
143 event_data,
144 timestamp
145 FROM analytics_events
146 WHERE event_data->>'content_id' = $1
147 ORDER BY timestamp DESC
148 LIMIT $2
149 "#,
150 content_id.as_str(),
151 limit
152 )
153 .fetch_all(&*self.pool)
154 .await?;
155
156 Ok(events)
157 }
158
159 fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
160 let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
161
162 if let Some(obj) = data.as_object_mut() {
163 if let Some(content_id) = &input.content_id {
164 obj.insert(
165 "content_id".to_string(),
166 serde_json::json!(content_id.as_str()),
167 );
168 }
169 if let Some(slug) = &input.slug {
170 obj.insert("slug".to_string(), serde_json::json!(slug));
171 }
172 if let Some(referrer) = &input.referrer {
173 obj.insert("referrer".to_string(), serde_json::json!(referrer));
174 }
175 }
176
177 data
178 }
179}
180
181#[derive(Debug, Clone, sqlx::FromRow)]
182pub struct StoredAnalyticsEvent {
183 pub id: String,
184 pub user_id: UserId,
185 pub session_id: Option<SessionId>,
186 pub event_type: String,
187 pub event_category: String,
188 pub page_url: Option<String>,
189 pub event_data: Option<serde_json::Value>,
190 pub timestamp: chrono::DateTime<chrono::Utc>,
191}