systemprompt_content/repository/link/
analytics.rs1use crate::error::ContentError;
2use crate::models::{
3 CampaignPerformance, ContentJourneyNode, LinkClick, LinkPerformance, RecordClickParams,
4};
5use sqlx::PgPool;
6use std::sync::Arc;
7use systemprompt_database::DbPool;
8use systemprompt_identifiers::{
9 CampaignId, ContentId, ContextId, LinkClickId, LinkId, SessionId, TaskId, UserId,
10};
11
12#[derive(Debug)]
13pub struct LinkAnalyticsRepository {
14 pool: Arc<PgPool>,
15}
16
17impl LinkAnalyticsRepository {
18 pub fn new(db: &DbPool) -> Result<Self, ContentError> {
19 let pool = db
20 .pool_arc()
21 .map_err(|e| ContentError::InvalidRequest(format!("Database pool error: {e}")))?;
22 Ok(Self { pool })
23 }
24
25 pub async fn get_link_performance(
26 &self,
27 link_id: &LinkId,
28 ) -> Result<Option<LinkPerformance>, sqlx::Error> {
29 sqlx::query_as!(
30 LinkPerformance,
31 r#"
32 SELECT
33 l.id as "link_id: LinkId",
34 COALESCE(l.click_count, 0)::bigint as "click_count!",
35 COALESCE(l.unique_click_count, 0)::bigint as "unique_click_count!",
36 COALESCE(l.conversion_count, 0)::bigint as "conversion_count!",
37 CASE
38 WHEN COALESCE(l.click_count, 0) > 0 THEN
39 COALESCE(l.conversion_count, 0)::float / l.click_count
40 ELSE 0.0
41 END as conversion_rate
42 FROM campaign_links l
43 WHERE l.id = $1
44 "#,
45 link_id.as_str()
46 )
47 .fetch_optional(&*self.pool)
48 .await
49 }
50
51 pub async fn check_session_clicked_link(
52 &self,
53 link_id: &LinkId,
54 session_id: &SessionId,
55 ) -> Result<bool, sqlx::Error> {
56 let result = sqlx::query!(
57 r#"SELECT COALESCE(COUNT(*), 0)::bigint as "count!" FROM link_clicks WHERE link_id = $1 AND session_id = $2"#,
58 link_id.as_str(),
59 session_id.as_str()
60 )
61 .fetch_one(&*self.pool)
62 .await?;
63
64 Ok(result.count > 0)
65 }
66
67 pub async fn increment_link_clicks(
68 &self,
69 link_id: &LinkId,
70 is_first_click: bool,
71 ) -> Result<(), sqlx::Error> {
72 if is_first_click {
73 sqlx::query!(
74 "UPDATE campaign_links SET click_count = click_count + 1, unique_click_count = \
75 unique_click_count + 1 WHERE id = $1",
76 link_id.as_str()
77 )
78 .execute(&*self.pool)
79 .await?;
80 } else {
81 sqlx::query!(
82 "UPDATE campaign_links SET click_count = click_count + 1 WHERE id = $1",
83 link_id.as_str()
84 )
85 .execute(&*self.pool)
86 .await?;
87 }
88 Ok(())
89 }
90
91 pub async fn get_clicks_by_link(
92 &self,
93 link_id: &LinkId,
94 limit: i64,
95 offset: i64,
96 ) -> Result<Vec<LinkClick>, sqlx::Error> {
97 sqlx::query_as!(
98 LinkClick,
99 r#"
100 SELECT id as "id: LinkClickId", link_id as "link_id: LinkId",
101 session_id as "session_id: SessionId", user_id as "user_id: UserId",
102 context_id as "context_id: ContextId", task_id as "task_id: TaskId",
103 referrer_page, referrer_url, clicked_at, user_agent, ip_address,
104 device_type, country, is_first_click, is_conversion, conversion_at,
105 time_on_page_seconds, scroll_depth_percent
106 FROM link_clicks
107 WHERE link_id = $1
108 ORDER BY clicked_at DESC
109 LIMIT $2 OFFSET $3
110 "#,
111 link_id.as_str(),
112 limit,
113 offset
114 )
115 .fetch_all(&*self.pool)
116 .await
117 }
118
119 pub async fn get_content_journey_map(
120 &self,
121 limit: i64,
122 offset: i64,
123 ) -> Result<Vec<ContentJourneyNode>, sqlx::Error> {
124 let rows = sqlx::query!(
125 r#"
126 SELECT source_content_id, target_url, COALESCE(click_count, 0) as "click_count!"
127 FROM campaign_links
128 WHERE source_content_id IS NOT NULL AND click_count > 0
129 ORDER BY click_count DESC
130 LIMIT $1 OFFSET $2
131 "#,
132 limit,
133 offset
134 )
135 .fetch_all(&*self.pool)
136 .await?;
137
138 Ok(rows
139 .into_iter()
140 .filter_map(|r| {
141 Some(ContentJourneyNode {
142 source_content_id: ContentId::new(r.source_content_id?),
143 target_url: r.target_url,
144 click_count: r.click_count,
145 })
146 })
147 .collect())
148 }
149
150 pub async fn get_campaign_performance(
151 &self,
152 campaign_id: &CampaignId,
153 ) -> Result<Option<CampaignPerformance>, sqlx::Error> {
154 sqlx::query_as!(
155 CampaignPerformance,
156 r#"
157 SELECT
158 campaign_id as "campaign_id!: CampaignId",
159 COALESCE(SUM(click_count), 0)::bigint as "total_clicks!",
160 COUNT(*)::bigint as "link_count!",
161 COUNT(DISTINCT source_content_id) as unique_visitors,
162 COALESCE(SUM(conversion_count), 0)::bigint as conversion_count
163 FROM campaign_links
164 WHERE campaign_id = $1
165 GROUP BY campaign_id
166 "#,
167 campaign_id.as_str()
168 )
169 .fetch_optional(&*self.pool)
170 .await
171 }
172
173 #[allow(clippy::cognitive_complexity)]
174 pub async fn record_click(&self, params: &RecordClickParams) -> Result<(), sqlx::Error> {
175 sqlx::query!(
176 r#"
177 INSERT INTO link_clicks (
178 id, link_id, session_id, user_id, context_id, task_id,
179 referrer_page, referrer_url, clicked_at, user_agent, ip_address,
180 device_type, country, is_first_click, is_conversion
181 )
182 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
183 "#,
184 params.click_id.as_str(),
185 params.link_id.as_str(),
186 params.session_id.as_str(),
187 params.user_id.as_ref().map(UserId::as_str),
188 params.context_id.as_ref().map(ContextId::as_str),
189 params.task_id.as_ref().map(TaskId::as_str),
190 params.referrer_page,
191 params.referrer_url,
192 params.clicked_at,
193 params.user_agent,
194 params.ip_address,
195 params.device_type,
196 params.country,
197 params.is_first_click,
198 params.is_conversion
199 )
200 .execute(&*self.pool)
201 .await?;
202 Ok(())
203 }
204}