Skip to main content

systemprompt_analytics/repository/
events.rs

1//! Persistence for raw analytics events.
2//!
3//! [`AnalyticsEventsRepository`] writes individual and batched
4//! `analytics_events` rows (the batch path uses `UNNEST` for a single
5//! round-trip) and reads them back per session or content as
6//! [`StoredAnalyticsEvent`]. Writes target the write pool; reads the read
7//! pool.
8
9use std::sync::Arc;
10
11use crate::Result;
12use sqlx::PgPool;
13use systemprompt_database::DbPool;
14use systemprompt_identifiers::{ContentId, SessionId, UserId};
15
16use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
17
18#[derive(Clone, Debug)]
19pub struct AnalyticsEventsRepository {
20    pool: Arc<PgPool>,
21    write_pool: Arc<PgPool>,
22}
23
24impl AnalyticsEventsRepository {
25    pub fn new(db: &DbPool) -> Result<Self> {
26        let pool = db.pool_arc()?;
27        let write_pool = db.write_pool_arc()?;
28        Ok(Self { pool, write_pool })
29    }
30
31    pub async fn create_event(
32        &self,
33        session_id: &SessionId,
34        user_id: &UserId,
35        input: &CreateAnalyticsEventInput,
36    ) -> Result<AnalyticsEventCreated> {
37        let id = format!("evt_{}", uuid::Uuid::new_v4());
38        let event_type = input.event_type.as_str();
39        let event_category = input.event_type.category();
40
41        let event_data = Self::build_event_data(input);
42
43        sqlx::query!(
44            r#"
45            INSERT INTO analytics_events (
46                id, user_id, session_id, event_type, event_category,
47                severity, endpoint, event_data
48            )
49            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
50            "#,
51            id,
52            user_id.as_str(),
53            session_id.as_str(),
54            event_type,
55            event_category,
56            "info",
57            input.page_url,
58            event_data
59        )
60        .execute(&*self.write_pool)
61        .await?;
62
63        Ok(AnalyticsEventCreated {
64            id,
65            event_type: event_type.to_owned(),
66        })
67    }
68
69    pub async fn create_events_batch(
70        &self,
71        session_id: &SessionId,
72        user_id: &UserId,
73        inputs: &[CreateAnalyticsEventInput],
74    ) -> Result<Vec<AnalyticsEventCreated>> {
75        if inputs.is_empty() {
76            return Ok(Vec::new());
77        }
78
79        let mut ids = Vec::with_capacity(inputs.len());
80        let mut user_ids = Vec::with_capacity(inputs.len());
81        let mut session_ids = Vec::with_capacity(inputs.len());
82        let mut event_types = Vec::with_capacity(inputs.len());
83        let mut event_categories = Vec::with_capacity(inputs.len());
84        let mut severities = Vec::with_capacity(inputs.len());
85        let mut endpoints: Vec<String> = Vec::with_capacity(inputs.len());
86        let mut event_datas = Vec::with_capacity(inputs.len());
87
88        for input in inputs {
89            let id = format!("evt_{}", uuid::Uuid::new_v4());
90            ids.push(id);
91            user_ids.push(user_id.as_str().to_owned());
92            session_ids.push(session_id.as_str().to_owned());
93            event_types.push(input.event_type.as_str().to_owned());
94            event_categories.push(input.event_type.category().to_owned());
95            severities.push("info".to_owned());
96            endpoints.push(input.page_url.clone());
97            event_datas.push(Self::build_event_data(input));
98        }
99
100        sqlx::query!(
101            r#"
102            INSERT INTO analytics_events (id, user_id, session_id, event_type, event_category, severity, endpoint, event_data)
103            SELECT * FROM UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::jsonb[])
104            "#,
105            &ids,
106            &user_ids,
107            &session_ids,
108            &event_types,
109            &event_categories,
110            &severities,
111            &endpoints,
112            &event_datas
113        )
114        .execute(&*self.write_pool)
115        .await?;
116
117        Ok(ids
118            .into_iter()
119            .zip(event_types)
120            .map(|(id, event_type)| AnalyticsEventCreated { id, event_type })
121            .collect())
122    }
123
124    pub async fn count_events_by_type(
125        &self,
126        session_id: &SessionId,
127        event_type: &AnalyticsEventType,
128    ) -> Result<i64> {
129        let count = sqlx::query_scalar!(
130            r#"
131            SELECT COUNT(*) as "count!"
132            FROM analytics_events
133            WHERE session_id = $1 AND event_type = $2
134            "#,
135            session_id.as_str(),
136            event_type.as_str()
137        )
138        .fetch_one(&*self.pool)
139        .await?;
140
141        Ok(count)
142    }
143
144    pub async fn find_by_session(
145        &self,
146        session_id: &SessionId,
147        limit: i64,
148    ) -> Result<Vec<StoredAnalyticsEvent>> {
149        let events = sqlx::query_as!(
150            StoredAnalyticsEvent,
151            r#"
152            SELECT
153                id,
154                user_id as "user_id: UserId",
155                session_id as "session_id: SessionId",
156                event_type,
157                event_category,
158                endpoint as page_url,
159                event_data,
160                timestamp
161            FROM analytics_events
162            WHERE session_id = $1
163            ORDER BY timestamp DESC
164            LIMIT $2
165            "#,
166            session_id.as_str(),
167            limit
168        )
169        .fetch_all(&*self.pool)
170        .await?;
171
172        Ok(events)
173    }
174
175    pub async fn find_by_content(
176        &self,
177        content_id: &ContentId,
178        limit: i64,
179    ) -> Result<Vec<StoredAnalyticsEvent>> {
180        let events = sqlx::query_as!(
181            StoredAnalyticsEvent,
182            r#"
183            SELECT
184                id,
185                user_id as "user_id: UserId",
186                session_id as "session_id: SessionId",
187                event_type,
188                event_category,
189                endpoint as page_url,
190                event_data,
191                timestamp
192            FROM analytics_events
193            WHERE event_data->>'content_id' = $1
194            ORDER BY timestamp DESC
195            LIMIT $2
196            "#,
197            content_id.as_str(),
198            limit
199        )
200        .fetch_all(&*self.pool)
201        .await?;
202
203        Ok(events)
204    }
205
206    fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
207        let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
208
209        if let Some(obj) = data.as_object_mut() {
210            if let Some(content_id) = &input.content_id {
211                obj.insert(
212                    "content_id".to_owned(),
213                    serde_json::json!(content_id.as_str()),
214                );
215            }
216            if let Some(slug) = &input.slug {
217                obj.insert("slug".to_owned(), serde_json::json!(slug));
218            }
219            if let Some(referrer) = &input.referrer {
220                obj.insert("referrer".to_owned(), serde_json::json!(referrer));
221            }
222        }
223
224        data
225    }
226}
227
228#[derive(Debug, Clone, sqlx::FromRow)]
229pub struct StoredAnalyticsEvent {
230    pub id: String,
231    pub user_id: UserId,
232    pub session_id: Option<SessionId>,
233    pub event_type: String,
234    pub event_category: String,
235    pub page_url: Option<String>,
236    pub event_data: Option<serde_json::Value>,
237    pub timestamp: chrono::DateTime<chrono::Utc>,
238}