Skip to main content

systemprompt_content/repository/link/
analytics.rs

1use crate::error::ContentError;
2use crate::models::{
3    CampaignPerformance, ContentJourneyNode, LinkClick, LinkPerformance, RecordClickParams,
4};
5use sqlx::PgPool;
6use std::sync::Arc;
7use systemprompt_database::DbPool;
8use systemprompt_identifiers::{
9    CampaignId, ContentId, ContextId, LinkClickId, LinkId, SessionId, TaskId, UserId,
10};
11
12#[derive(Debug)]
13pub struct LinkAnalyticsRepository {
14    pool: Arc<PgPool>,
15    write_pool: Arc<PgPool>,
16}
17
18impl LinkAnalyticsRepository {
19    pub fn new(db: &DbPool) -> Result<Self, ContentError> {
20        let pool = db
21            .pool_arc()
22            .map_err(|e| ContentError::InvalidRequest(format!("Database pool error: {e}")))?;
23        let write_pool = db
24            .write_pool_arc()
25            .map_err(|e| ContentError::InvalidRequest(format!("Database write pool error: {e}")))?;
26        Ok(Self { pool, write_pool })
27    }
28
29    pub async fn get_link_performance(
30        &self,
31        link_id: &LinkId,
32    ) -> Result<Option<LinkPerformance>, sqlx::Error> {
33        sqlx::query_as!(
34            LinkPerformance,
35            r#"
36            SELECT
37                l.id as "link_id: LinkId",
38                COALESCE(l.click_count, 0)::bigint as "click_count!",
39                COALESCE(l.unique_click_count, 0)::bigint as "unique_click_count!",
40                COALESCE(l.conversion_count, 0)::bigint as "conversion_count!",
41                CASE
42                    WHEN COALESCE(l.click_count, 0) > 0 THEN
43                        COALESCE(l.conversion_count, 0)::float / l.click_count
44                    ELSE 0.0
45                END as conversion_rate
46            FROM campaign_links l
47            WHERE l.id = $1
48            "#,
49            link_id.as_str()
50        )
51        .fetch_optional(&*self.pool)
52        .await
53    }
54
55    pub async fn check_session_clicked_link(
56        &self,
57        link_id: &LinkId,
58        session_id: &SessionId,
59    ) -> Result<bool, sqlx::Error> {
60        let result = sqlx::query!(
61            r#"SELECT COALESCE(COUNT(*), 0)::bigint as "count!" FROM link_clicks WHERE link_id = $1 AND session_id = $2"#,
62            link_id.as_str(),
63            session_id.as_str()
64        )
65        .fetch_one(&*self.pool)
66        .await?;
67
68        Ok(result.count > 0)
69    }
70
71    pub async fn increment_link_clicks(
72        &self,
73        link_id: &LinkId,
74        is_first_click: bool,
75    ) -> Result<(), sqlx::Error> {
76        if is_first_click {
77            sqlx::query!(
78                "UPDATE campaign_links SET click_count = click_count + 1, unique_click_count = \
79                 unique_click_count + 1 WHERE id = $1",
80                link_id.as_str()
81            )
82            .execute(&*self.write_pool)
83            .await?;
84        } else {
85            sqlx::query!(
86                "UPDATE campaign_links SET click_count = click_count + 1 WHERE id = $1",
87                link_id.as_str()
88            )
89            .execute(&*self.write_pool)
90            .await?;
91        }
92        Ok(())
93    }
94
95    pub async fn get_clicks_by_link(
96        &self,
97        link_id: &LinkId,
98        limit: i64,
99        offset: i64,
100    ) -> Result<Vec<LinkClick>, sqlx::Error> {
101        sqlx::query_as!(
102            LinkClick,
103            r#"
104            SELECT id as "id: LinkClickId", link_id as "link_id: LinkId",
105                   session_id as "session_id: SessionId", user_id as "user_id: UserId",
106                   context_id as "context_id: ContextId", task_id as "task_id: TaskId",
107                   referrer_page, referrer_url, clicked_at, user_agent, ip_address,
108                   device_type, country, is_first_click, is_conversion, conversion_at,
109                   time_on_page_seconds, scroll_depth_percent
110            FROM link_clicks
111            WHERE link_id = $1
112            ORDER BY clicked_at DESC
113            LIMIT $2 OFFSET $3
114            "#,
115            link_id.as_str(),
116            limit,
117            offset
118        )
119        .fetch_all(&*self.pool)
120        .await
121    }
122
123    pub async fn get_content_journey_map(
124        &self,
125        limit: i64,
126        offset: i64,
127    ) -> Result<Vec<ContentJourneyNode>, sqlx::Error> {
128        let rows = sqlx::query!(
129            r#"
130            SELECT source_content_id, target_url, COALESCE(click_count, 0) as "click_count!"
131            FROM campaign_links
132            WHERE source_content_id IS NOT NULL AND click_count > 0
133            ORDER BY click_count DESC
134            LIMIT $1 OFFSET $2
135            "#,
136            limit,
137            offset
138        )
139        .fetch_all(&*self.pool)
140        .await?;
141
142        Ok(rows
143            .into_iter()
144            .filter_map(|r| {
145                Some(ContentJourneyNode {
146                    source_content_id: ContentId::new(r.source_content_id?),
147                    target_url: r.target_url,
148                    click_count: r.click_count,
149                })
150            })
151            .collect())
152    }
153
154    pub async fn get_campaign_performance(
155        &self,
156        campaign_id: &CampaignId,
157    ) -> Result<Option<CampaignPerformance>, sqlx::Error> {
158        sqlx::query_as!(
159            CampaignPerformance,
160            r#"
161            SELECT
162                campaign_id as "campaign_id!: CampaignId",
163                COALESCE(SUM(click_count), 0)::bigint as "total_clicks!",
164                COUNT(*)::bigint as "link_count!",
165                COUNT(DISTINCT source_content_id) as unique_visitors,
166                COALESCE(SUM(conversion_count), 0)::bigint as conversion_count
167            FROM campaign_links
168            WHERE campaign_id = $1
169            GROUP BY campaign_id
170            "#,
171            campaign_id.as_str()
172        )
173        .fetch_optional(&*self.pool)
174        .await
175    }
176
177    #[allow(clippy::cognitive_complexity)]
178    pub async fn record_click(&self, params: &RecordClickParams) -> Result<(), sqlx::Error> {
179        sqlx::query!(
180            r#"
181            INSERT INTO link_clicks (
182                id, link_id, session_id, user_id, context_id, task_id,
183                referrer_page, referrer_url, clicked_at, user_agent, ip_address,
184                device_type, country, is_first_click, is_conversion
185            )
186            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
187            "#,
188            params.click_id.as_str(),
189            params.link_id.as_str(),
190            params.session_id.as_str(),
191            params.user_id.as_ref().map(UserId::as_str),
192            params.context_id.as_ref().map(ContextId::as_str),
193            params.task_id.as_ref().map(TaskId::as_str),
194            params.referrer_page,
195            params.referrer_url,
196            params.clicked_at,
197            params.user_agent,
198            params.ip_address,
199            params.device_type,
200            params.country,
201            params.is_first_click,
202            params.is_conversion
203        )
204        .execute(&*self.write_pool)
205        .await?;
206        Ok(())
207    }
208}