Skip to main content

systemprompt_content/repository/link/
analytics.rs

1use crate::error::ContentError;
2use crate::models::{
3    CampaignPerformance, ContentJourneyNode, LinkClick, LinkPerformance, RecordClickParams,
4};
5use sqlx::PgPool;
6use std::sync::Arc;
7use systemprompt_database::DbPool;
8use systemprompt_identifiers::{
9    CampaignId, ContentId, ContextId, LinkClickId, LinkId, SessionId, TaskId, UserId,
10};
11
12#[derive(Debug)]
13pub struct LinkAnalyticsRepository {
14    pool: Arc<PgPool>,
15}
16
17impl LinkAnalyticsRepository {
18    pub fn new(db: &DbPool) -> Result<Self, ContentError> {
19        let pool = db
20            .pool_arc()
21            .map_err(|e| ContentError::InvalidRequest(format!("Database pool error: {e}")))?;
22        Ok(Self { pool })
23    }
24
25    pub async fn get_link_performance(
26        &self,
27        link_id: &LinkId,
28    ) -> Result<Option<LinkPerformance>, sqlx::Error> {
29        sqlx::query_as!(
30            LinkPerformance,
31            r#"
32            SELECT
33                l.id as "link_id: LinkId",
34                COALESCE(l.click_count, 0)::bigint as "click_count!",
35                COALESCE(l.unique_click_count, 0)::bigint as "unique_click_count!",
36                COALESCE(l.conversion_count, 0)::bigint as "conversion_count!",
37                CASE
38                    WHEN COALESCE(l.click_count, 0) > 0 THEN
39                        COALESCE(l.conversion_count, 0)::float / l.click_count
40                    ELSE 0.0
41                END as conversion_rate
42            FROM campaign_links l
43            WHERE l.id = $1
44            "#,
45            link_id.as_str()
46        )
47        .fetch_optional(&*self.pool)
48        .await
49    }
50
51    pub async fn check_session_clicked_link(
52        &self,
53        link_id: &LinkId,
54        session_id: &SessionId,
55    ) -> Result<bool, sqlx::Error> {
56        let result = sqlx::query!(
57            r#"SELECT COALESCE(COUNT(*), 0)::bigint as "count!" FROM link_clicks WHERE link_id = $1 AND session_id = $2"#,
58            link_id.as_str(),
59            session_id.as_str()
60        )
61        .fetch_one(&*self.pool)
62        .await?;
63
64        Ok(result.count > 0)
65    }
66
67    pub async fn increment_link_clicks(
68        &self,
69        link_id: &LinkId,
70        is_first_click: bool,
71    ) -> Result<(), sqlx::Error> {
72        if is_first_click {
73            sqlx::query!(
74                "UPDATE campaign_links SET click_count = click_count + 1, unique_click_count = \
75                 unique_click_count + 1 WHERE id = $1",
76                link_id.as_str()
77            )
78            .execute(&*self.pool)
79            .await?;
80        } else {
81            sqlx::query!(
82                "UPDATE campaign_links SET click_count = click_count + 1 WHERE id = $1",
83                link_id.as_str()
84            )
85            .execute(&*self.pool)
86            .await?;
87        }
88        Ok(())
89    }
90
91    pub async fn get_clicks_by_link(
92        &self,
93        link_id: &LinkId,
94        limit: i64,
95        offset: i64,
96    ) -> Result<Vec<LinkClick>, sqlx::Error> {
97        sqlx::query_as!(
98            LinkClick,
99            r#"
100            SELECT id as "id: LinkClickId", link_id as "link_id: LinkId",
101                   session_id as "session_id: SessionId", user_id as "user_id: UserId",
102                   context_id as "context_id: ContextId", task_id as "task_id: TaskId",
103                   referrer_page, referrer_url, clicked_at, user_agent, ip_address,
104                   device_type, country, is_first_click, is_conversion, conversion_at,
105                   time_on_page_seconds, scroll_depth_percent
106            FROM link_clicks
107            WHERE link_id = $1
108            ORDER BY clicked_at DESC
109            LIMIT $2 OFFSET $3
110            "#,
111            link_id.as_str(),
112            limit,
113            offset
114        )
115        .fetch_all(&*self.pool)
116        .await
117    }
118
119    pub async fn get_content_journey_map(
120        &self,
121        limit: i64,
122        offset: i64,
123    ) -> Result<Vec<ContentJourneyNode>, sqlx::Error> {
124        let rows = sqlx::query!(
125            r#"
126            SELECT source_content_id, target_url, COALESCE(click_count, 0) as "click_count!"
127            FROM campaign_links
128            WHERE source_content_id IS NOT NULL AND click_count > 0
129            ORDER BY click_count DESC
130            LIMIT $1 OFFSET $2
131            "#,
132            limit,
133            offset
134        )
135        .fetch_all(&*self.pool)
136        .await?;
137
138        Ok(rows
139            .into_iter()
140            .filter_map(|r| {
141                Some(ContentJourneyNode {
142                    source_content_id: ContentId::new(r.source_content_id?),
143                    target_url: r.target_url,
144                    click_count: r.click_count,
145                })
146            })
147            .collect())
148    }
149
150    pub async fn get_campaign_performance(
151        &self,
152        campaign_id: &CampaignId,
153    ) -> Result<Option<CampaignPerformance>, sqlx::Error> {
154        sqlx::query_as!(
155            CampaignPerformance,
156            r#"
157            SELECT
158                campaign_id as "campaign_id!: CampaignId",
159                COALESCE(SUM(click_count), 0)::bigint as "total_clicks!",
160                COUNT(*)::bigint as "link_count!",
161                COUNT(DISTINCT source_content_id) as unique_visitors,
162                COALESCE(SUM(conversion_count), 0)::bigint as conversion_count
163            FROM campaign_links
164            WHERE campaign_id = $1
165            GROUP BY campaign_id
166            "#,
167            campaign_id.as_str()
168        )
169        .fetch_optional(&*self.pool)
170        .await
171    }
172
173    #[allow(clippy::cognitive_complexity)]
174    pub async fn record_click(&self, params: &RecordClickParams) -> Result<(), sqlx::Error> {
175        sqlx::query!(
176            r#"
177            INSERT INTO link_clicks (
178                id, link_id, session_id, user_id, context_id, task_id,
179                referrer_page, referrer_url, clicked_at, user_agent, ip_address,
180                device_type, country, is_first_click, is_conversion
181            )
182            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
183            "#,
184            params.click_id.as_str(),
185            params.link_id.as_str(),
186            params.session_id.as_str(),
187            params.user_id.as_ref().map(UserId::as_str),
188            params.context_id.as_ref().map(ContextId::as_str),
189            params.task_id.as_ref().map(TaskId::as_str),
190            params.referrer_page,
191            params.referrer_url,
192            params.clicked_at,
193            params.user_agent,
194            params.ip_address,
195            params.device_type,
196            params.country,
197            params.is_first_click,
198            params.is_conversion
199        )
200        .execute(&*self.pool)
201        .await?;
202        Ok(())
203    }
204}