Skip to main content

systemprompt_analytics/repository/
events.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use sqlx::PgPool;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContentId, SessionId, UserId};
7
8use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
9
10#[derive(Clone, Debug)]
11pub struct AnalyticsEventsRepository {
12    pool: Arc<PgPool>,
13    write_pool: Arc<PgPool>,
14}
15
16impl AnalyticsEventsRepository {
17    pub fn new(db: &DbPool) -> Result<Self> {
18        let pool = db.pool_arc()?;
19        let write_pool = db.write_pool_arc()?;
20        Ok(Self { pool, write_pool })
21    }
22
23    pub async fn create_event(
24        &self,
25        session_id: &str,
26        user_id: &str,
27        input: &CreateAnalyticsEventInput,
28    ) -> Result<AnalyticsEventCreated> {
29        let id = format!("evt_{}", uuid::Uuid::new_v4());
30        let event_type = input.event_type.as_str();
31        let event_category = input.event_type.category();
32
33        let event_data = Self::build_event_data(input);
34
35        sqlx::query!(
36            r#"
37            INSERT INTO analytics_events (
38                id, user_id, session_id, event_type, event_category,
39                severity, endpoint, event_data
40            )
41            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
42            "#,
43            id,
44            user_id,
45            session_id,
46            event_type,
47            event_category,
48            "info",
49            input.page_url,
50            event_data
51        )
52        .execute(&*self.write_pool)
53        .await?;
54
55        Ok(AnalyticsEventCreated {
56            id,
57            event_type: event_type.to_string(),
58        })
59    }
60
61    pub async fn create_events_batch(
62        &self,
63        session_id: &str,
64        user_id: &str,
65        inputs: &[CreateAnalyticsEventInput],
66    ) -> Result<Vec<AnalyticsEventCreated>> {
67        let mut results = Vec::with_capacity(inputs.len());
68
69        for input in inputs {
70            let created = self.create_event(session_id, user_id, input).await?;
71            results.push(created);
72        }
73
74        Ok(results)
75    }
76
77    pub async fn count_events_by_type(
78        &self,
79        session_id: &str,
80        event_type: &AnalyticsEventType,
81    ) -> Result<i64> {
82        let count = sqlx::query_scalar!(
83            r#"
84            SELECT COUNT(*) as "count!"
85            FROM analytics_events
86            WHERE session_id = $1 AND event_type = $2
87            "#,
88            session_id,
89            event_type.as_str()
90        )
91        .fetch_one(&*self.pool)
92        .await?;
93
94        Ok(count)
95    }
96
97    pub async fn find_by_session(
98        &self,
99        session_id: &str,
100        limit: i64,
101    ) -> Result<Vec<StoredAnalyticsEvent>> {
102        let events = sqlx::query_as!(
103            StoredAnalyticsEvent,
104            r#"
105            SELECT
106                id,
107                user_id as "user_id: UserId",
108                session_id as "session_id: SessionId",
109                event_type,
110                event_category,
111                endpoint as page_url,
112                event_data,
113                timestamp
114            FROM analytics_events
115            WHERE session_id = $1
116            ORDER BY timestamp DESC
117            LIMIT $2
118            "#,
119            session_id,
120            limit
121        )
122        .fetch_all(&*self.pool)
123        .await?;
124
125        Ok(events)
126    }
127
128    pub async fn find_by_content(
129        &self,
130        content_id: &ContentId,
131        limit: i64,
132    ) -> Result<Vec<StoredAnalyticsEvent>> {
133        let events = sqlx::query_as!(
134            StoredAnalyticsEvent,
135            r#"
136            SELECT
137                id,
138                user_id as "user_id: UserId",
139                session_id as "session_id: SessionId",
140                event_type,
141                event_category,
142                endpoint as page_url,
143                event_data,
144                timestamp
145            FROM analytics_events
146            WHERE event_data->>'content_id' = $1
147            ORDER BY timestamp DESC
148            LIMIT $2
149            "#,
150            content_id.as_str(),
151            limit
152        )
153        .fetch_all(&*self.pool)
154        .await?;
155
156        Ok(events)
157    }
158
159    fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
160        let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
161
162        if let Some(obj) = data.as_object_mut() {
163            if let Some(content_id) = &input.content_id {
164                obj.insert(
165                    "content_id".to_string(),
166                    serde_json::json!(content_id.as_str()),
167                );
168            }
169            if let Some(slug) = &input.slug {
170                obj.insert("slug".to_string(), serde_json::json!(slug));
171            }
172            if let Some(referrer) = &input.referrer {
173                obj.insert("referrer".to_string(), serde_json::json!(referrer));
174            }
175        }
176
177        data
178    }
179}
180
181#[derive(Debug, Clone, sqlx::FromRow)]
182pub struct StoredAnalyticsEvent {
183    pub id: String,
184    pub user_id: UserId,
185    pub session_id: Option<SessionId>,
186    pub event_type: String,
187    pub event_category: String,
188    pub page_url: Option<String>,
189    pub event_data: Option<serde_json::Value>,
190    pub timestamp: chrono::DateTime<chrono::Utc>,
191}