Skip to main content

systemprompt_analytics/repository/
events.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use sqlx::PgPool;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContentId, SessionId, UserId};
7
8use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
9
10#[derive(Clone, Debug)]
11pub struct AnalyticsEventsRepository {
12    pool: Arc<PgPool>,
13    write_pool: Arc<PgPool>,
14}
15
16impl AnalyticsEventsRepository {
17    pub fn new(db: &DbPool) -> Result<Self> {
18        let pool = db.pool_arc()?;
19        let write_pool = db.write_pool_arc()?;
20        Ok(Self { pool, write_pool })
21    }
22
23    pub async fn create_event(
24        &self,
25        session_id: &str,
26        user_id: &str,
27        input: &CreateAnalyticsEventInput,
28    ) -> Result<AnalyticsEventCreated> {
29        let id = format!("evt_{}", uuid::Uuid::new_v4());
30        let event_type = input.event_type.as_str();
31        let event_category = input.event_type.category();
32
33        let event_data = Self::build_event_data(input);
34
35        sqlx::query!(
36            r#"
37            INSERT INTO analytics_events (
38                id, user_id, session_id, event_type, event_category,
39                severity, endpoint, event_data
40            )
41            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
42            "#,
43            id,
44            user_id,
45            session_id,
46            event_type,
47            event_category,
48            "info",
49            input.page_url,
50            event_data
51        )
52        .execute(&*self.write_pool)
53        .await?;
54
55        Ok(AnalyticsEventCreated {
56            id,
57            event_type: event_type.to_string(),
58        })
59    }
60
61    pub async fn create_events_batch(
62        &self,
63        session_id: &str,
64        user_id: &str,
65        inputs: &[CreateAnalyticsEventInput],
66    ) -> Result<Vec<AnalyticsEventCreated>> {
67        if inputs.is_empty() {
68            return Ok(Vec::new());
69        }
70
71        let mut ids = Vec::with_capacity(inputs.len());
72        let mut user_ids = Vec::with_capacity(inputs.len());
73        let mut session_ids = Vec::with_capacity(inputs.len());
74        let mut event_types = Vec::with_capacity(inputs.len());
75        let mut event_categories = Vec::with_capacity(inputs.len());
76        let mut severities = Vec::with_capacity(inputs.len());
77        let mut endpoints: Vec<String> = Vec::with_capacity(inputs.len());
78        let mut event_datas = Vec::with_capacity(inputs.len());
79
80        for input in inputs {
81            let id = format!("evt_{}", uuid::Uuid::new_v4());
82            ids.push(id);
83            user_ids.push(user_id.to_string());
84            session_ids.push(session_id.to_string());
85            event_types.push(input.event_type.as_str().to_string());
86            event_categories.push(input.event_type.category().to_string());
87            severities.push("info".to_string());
88            endpoints.push(input.page_url.clone());
89            event_datas.push(Self::build_event_data(input));
90        }
91
92        sqlx::query!(
93            r#"
94            INSERT INTO analytics_events (id, user_id, session_id, event_type, event_category, severity, endpoint, event_data)
95            SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::jsonb[])
96            "#,
97            &ids,
98            &user_ids,
99            &session_ids,
100            &event_types,
101            &event_categories,
102            &severities,
103            &endpoints,
104            &event_datas
105        )
106        .execute(&*self.write_pool)
107        .await?;
108
109        Ok(ids
110            .into_iter()
111            .zip(event_types)
112            .map(|(id, event_type)| AnalyticsEventCreated { id, event_type })
113            .collect())
114    }
115
116    pub async fn count_events_by_type(
117        &self,
118        session_id: &str,
119        event_type: &AnalyticsEventType,
120    ) -> Result<i64> {
121        let count = sqlx::query_scalar!(
122            r#"
123            SELECT COUNT(*) as "count!"
124            FROM analytics_events
125            WHERE session_id = $1 AND event_type = $2
126            "#,
127            session_id,
128            event_type.as_str()
129        )
130        .fetch_one(&*self.pool)
131        .await?;
132
133        Ok(count)
134    }
135
136    pub async fn find_by_session(
137        &self,
138        session_id: &str,
139        limit: i64,
140    ) -> Result<Vec<StoredAnalyticsEvent>> {
141        let events = sqlx::query_as!(
142            StoredAnalyticsEvent,
143            r#"
144            SELECT
145                id,
146                user_id as "user_id: UserId",
147                session_id as "session_id: SessionId",
148                event_type,
149                event_category,
150                endpoint as page_url,
151                event_data,
152                timestamp
153            FROM analytics_events
154            WHERE session_id = $1
155            ORDER BY timestamp DESC
156            LIMIT $2
157            "#,
158            session_id,
159            limit
160        )
161        .fetch_all(&*self.pool)
162        .await?;
163
164        Ok(events)
165    }
166
167    pub async fn find_by_content(
168        &self,
169        content_id: &ContentId,
170        limit: i64,
171    ) -> Result<Vec<StoredAnalyticsEvent>> {
172        let events = sqlx::query_as!(
173            StoredAnalyticsEvent,
174            r#"
175            SELECT
176                id,
177                user_id as "user_id: UserId",
178                session_id as "session_id: SessionId",
179                event_type,
180                event_category,
181                endpoint as page_url,
182                event_data,
183                timestamp
184            FROM analytics_events
185            WHERE event_data->>'content_id' = $1
186            ORDER BY timestamp DESC
187            LIMIT $2
188            "#,
189            content_id.as_str(),
190            limit
191        )
192        .fetch_all(&*self.pool)
193        .await?;
194
195        Ok(events)
196    }
197
198    fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
199        let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
200
201        if let Some(obj) = data.as_object_mut() {
202            if let Some(content_id) = &input.content_id {
203                obj.insert(
204                    "content_id".to_string(),
205                    serde_json::json!(content_id.as_str()),
206                );
207            }
208            if let Some(slug) = &input.slug {
209                obj.insert("slug".to_string(), serde_json::json!(slug));
210            }
211            if let Some(referrer) = &input.referrer {
212                obj.insert("referrer".to_string(), serde_json::json!(referrer));
213            }
214        }
215
216        data
217    }
218}
219
220#[derive(Debug, Clone, sqlx::FromRow)]
221pub struct StoredAnalyticsEvent {
222    pub id: String,
223    pub user_id: UserId,
224    pub session_id: Option<SessionId>,
225    pub event_type: String,
226    pub event_category: String,
227    pub page_url: Option<String>,
228    pub event_data: Option<serde_json::Value>,
229    pub timestamp: chrono::DateTime<chrono::Utc>,
230}