Skip to main content

systemprompt_analytics/repository/
events.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use sqlx::PgPool;
5use systemprompt_database::DbPool;
6use systemprompt_identifiers::{ContentId, SessionId, UserId};
7
8use crate::models::{AnalyticsEventCreated, AnalyticsEventType, CreateAnalyticsEventInput};
9
10#[derive(Clone, Debug)]
11pub struct AnalyticsEventsRepository {
12    pool: Arc<PgPool>,
13}
14
15impl AnalyticsEventsRepository {
16    pub fn new(db: &DbPool) -> Result<Self> {
17        let pool = db.pool_arc()?;
18        Ok(Self { pool })
19    }
20
21    pub async fn create_event(
22        &self,
23        session_id: &str,
24        user_id: &str,
25        input: &CreateAnalyticsEventInput,
26    ) -> Result<AnalyticsEventCreated> {
27        let id = format!("evt_{}", uuid::Uuid::new_v4());
28        let event_type = input.event_type.as_str();
29        let event_category = input.event_type.category();
30
31        let event_data = Self::build_event_data(input);
32
33        sqlx::query!(
34            r#"
35            INSERT INTO analytics_events (
36                id, user_id, session_id, event_type, event_category,
37                severity, endpoint, event_data
38            )
39            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
40            "#,
41            id,
42            user_id,
43            session_id,
44            event_type,
45            event_category,
46            "info",
47            input.page_url,
48            event_data
49        )
50        .execute(&*self.pool)
51        .await?;
52
53        Ok(AnalyticsEventCreated {
54            id,
55            event_type: event_type.to_string(),
56        })
57    }
58
59    pub async fn create_events_batch(
60        &self,
61        session_id: &str,
62        user_id: &str,
63        inputs: &[CreateAnalyticsEventInput],
64    ) -> Result<Vec<AnalyticsEventCreated>> {
65        let mut results = Vec::with_capacity(inputs.len());
66
67        for input in inputs {
68            let created = self.create_event(session_id, user_id, input).await?;
69            results.push(created);
70        }
71
72        Ok(results)
73    }
74
75    pub async fn count_events_by_type(
76        &self,
77        session_id: &str,
78        event_type: &AnalyticsEventType,
79    ) -> Result<i64> {
80        let count = sqlx::query_scalar!(
81            r#"
82            SELECT COUNT(*) as "count!"
83            FROM analytics_events
84            WHERE session_id = $1 AND event_type = $2
85            "#,
86            session_id,
87            event_type.as_str()
88        )
89        .fetch_one(&*self.pool)
90        .await?;
91
92        Ok(count)
93    }
94
95    pub async fn find_by_session(
96        &self,
97        session_id: &str,
98        limit: i64,
99    ) -> Result<Vec<StoredAnalyticsEvent>> {
100        let events = sqlx::query_as!(
101            StoredAnalyticsEvent,
102            r#"
103            SELECT
104                id,
105                user_id as "user_id: UserId",
106                session_id as "session_id: SessionId",
107                event_type,
108                event_category,
109                endpoint as page_url,
110                event_data,
111                timestamp
112            FROM analytics_events
113            WHERE session_id = $1
114            ORDER BY timestamp DESC
115            LIMIT $2
116            "#,
117            session_id,
118            limit
119        )
120        .fetch_all(&*self.pool)
121        .await?;
122
123        Ok(events)
124    }
125
126    pub async fn find_by_content(
127        &self,
128        content_id: &ContentId,
129        limit: i64,
130    ) -> Result<Vec<StoredAnalyticsEvent>> {
131        let events = sqlx::query_as!(
132            StoredAnalyticsEvent,
133            r#"
134            SELECT
135                id,
136                user_id as "user_id: UserId",
137                session_id as "session_id: SessionId",
138                event_type,
139                event_category,
140                endpoint as page_url,
141                event_data,
142                timestamp
143            FROM analytics_events
144            WHERE event_data->>'content_id' = $1
145            ORDER BY timestamp DESC
146            LIMIT $2
147            "#,
148            content_id.as_str(),
149            limit
150        )
151        .fetch_all(&*self.pool)
152        .await?;
153
154        Ok(events)
155    }
156
157    fn build_event_data(input: &CreateAnalyticsEventInput) -> serde_json::Value {
158        let mut data = input.data.clone().unwrap_or(serde_json::json!({}));
159
160        if let Some(obj) = data.as_object_mut() {
161            if let Some(content_id) = &input.content_id {
162                obj.insert(
163                    "content_id".to_string(),
164                    serde_json::json!(content_id.as_str()),
165                );
166            }
167            if let Some(slug) = &input.slug {
168                obj.insert("slug".to_string(), serde_json::json!(slug));
169            }
170            if let Some(referrer) = &input.referrer {
171                obj.insert("referrer".to_string(), serde_json::json!(referrer));
172            }
173        }
174
175        data
176    }
177}
178
179#[derive(Debug, Clone, sqlx::FromRow)]
180pub struct StoredAnalyticsEvent {
181    pub id: String,
182    pub user_id: UserId,
183    pub session_id: Option<SessionId>,
184    pub event_type: String,
185    pub event_category: String,
186    pub page_url: Option<String>,
187    pub event_data: Option<serde_json::Value>,
188    pub timestamp: chrono::DateTime<chrono::Utc>,
189}