systemprompt_content/repository/link/
analytics.rs1use crate::error::ContentError;
2use crate::models::{
3 CampaignPerformance, ContentJourneyNode, LinkClick, LinkPerformance, RecordClickParams,
4};
5use sqlx::PgPool;
6use std::sync::Arc;
7use systemprompt_database::DbPool;
8use systemprompt_identifiers::{
9 CampaignId, ContentId, ContextId, LinkClickId, LinkId, SessionId, TaskId, UserId,
10};
11
12#[derive(Debug)]
13pub struct LinkAnalyticsRepository {
14 pool: Arc<PgPool>,
15 write_pool: Arc<PgPool>,
16}
17
18impl LinkAnalyticsRepository {
19 pub fn new(db: &DbPool) -> Result<Self, ContentError> {
20 let pool = db
21 .pool_arc()
22 .map_err(|e| ContentError::InvalidRequest(format!("Database pool error: {e}")))?;
23 let write_pool = db
24 .write_pool_arc()
25 .map_err(|e| ContentError::InvalidRequest(format!("Database write pool error: {e}")))?;
26 Ok(Self { pool, write_pool })
27 }
28
29 pub async fn get_link_performance(
30 &self,
31 link_id: &LinkId,
32 ) -> Result<Option<LinkPerformance>, sqlx::Error> {
33 sqlx::query_as!(
34 LinkPerformance,
35 r#"
36 SELECT
37 l.id as "link_id: LinkId",
38 COALESCE(l.click_count, 0)::bigint as "click_count!",
39 COALESCE(l.unique_click_count, 0)::bigint as "unique_click_count!",
40 COALESCE(l.conversion_count, 0)::bigint as "conversion_count!",
41 CASE
42 WHEN COALESCE(l.click_count, 0) > 0 THEN
43 COALESCE(l.conversion_count, 0)::float / l.click_count
44 ELSE 0.0
45 END as conversion_rate
46 FROM campaign_links l
47 WHERE l.id = $1
48 "#,
49 link_id.as_str()
50 )
51 .fetch_optional(&*self.pool)
52 .await
53 }
54
55 pub async fn check_session_clicked_link(
56 &self,
57 link_id: &LinkId,
58 session_id: &SessionId,
59 ) -> Result<bool, sqlx::Error> {
60 let result = sqlx::query!(
61 r#"SELECT COALESCE(COUNT(*), 0)::bigint as "count!" FROM link_clicks WHERE link_id = $1 AND session_id = $2"#,
62 link_id.as_str(),
63 session_id.as_str()
64 )
65 .fetch_one(&*self.pool)
66 .await?;
67
68 Ok(result.count > 0)
69 }
70
71 pub async fn increment_link_clicks(
72 &self,
73 link_id: &LinkId,
74 is_first_click: bool,
75 ) -> Result<(), sqlx::Error> {
76 if is_first_click {
77 sqlx::query!(
78 "UPDATE campaign_links SET click_count = click_count + 1, unique_click_count = \
79 unique_click_count + 1 WHERE id = $1",
80 link_id.as_str()
81 )
82 .execute(&*self.write_pool)
83 .await?;
84 } else {
85 sqlx::query!(
86 "UPDATE campaign_links SET click_count = click_count + 1 WHERE id = $1",
87 link_id.as_str()
88 )
89 .execute(&*self.write_pool)
90 .await?;
91 }
92 Ok(())
93 }
94
95 pub async fn get_clicks_by_link(
96 &self,
97 link_id: &LinkId,
98 limit: i64,
99 offset: i64,
100 ) -> Result<Vec<LinkClick>, sqlx::Error> {
101 sqlx::query_as!(
102 LinkClick,
103 r#"
104 SELECT id as "id: LinkClickId", link_id as "link_id: LinkId",
105 session_id as "session_id: SessionId", user_id as "user_id: UserId",
106 context_id as "context_id: ContextId", task_id as "task_id: TaskId",
107 referrer_page, referrer_url, clicked_at, user_agent, ip_address,
108 device_type, country, is_first_click, is_conversion, conversion_at,
109 time_on_page_seconds, scroll_depth_percent
110 FROM link_clicks
111 WHERE link_id = $1
112 ORDER BY clicked_at DESC
113 LIMIT $2 OFFSET $3
114 "#,
115 link_id.as_str(),
116 limit,
117 offset
118 )
119 .fetch_all(&*self.pool)
120 .await
121 }
122
123 pub async fn get_content_journey_map(
124 &self,
125 limit: i64,
126 offset: i64,
127 ) -> Result<Vec<ContentJourneyNode>, sqlx::Error> {
128 let rows = sqlx::query!(
129 r#"
130 SELECT source_content_id, target_url, COALESCE(click_count, 0) as "click_count!"
131 FROM campaign_links
132 WHERE source_content_id IS NOT NULL AND click_count > 0
133 ORDER BY click_count DESC
134 LIMIT $1 OFFSET $2
135 "#,
136 limit,
137 offset
138 )
139 .fetch_all(&*self.pool)
140 .await?;
141
142 Ok(rows
143 .into_iter()
144 .filter_map(|r| {
145 Some(ContentJourneyNode {
146 source_content_id: ContentId::new(r.source_content_id?),
147 target_url: r.target_url,
148 click_count: r.click_count,
149 })
150 })
151 .collect())
152 }
153
154 pub async fn get_campaign_performance(
155 &self,
156 campaign_id: &CampaignId,
157 ) -> Result<Option<CampaignPerformance>, sqlx::Error> {
158 sqlx::query_as!(
159 CampaignPerformance,
160 r#"
161 SELECT
162 campaign_id as "campaign_id!: CampaignId",
163 COALESCE(SUM(click_count), 0)::bigint as "total_clicks!",
164 COUNT(*)::bigint as "link_count!",
165 COUNT(DISTINCT source_content_id) as unique_visitors,
166 COALESCE(SUM(conversion_count), 0)::bigint as conversion_count
167 FROM campaign_links
168 WHERE campaign_id = $1
169 GROUP BY campaign_id
170 "#,
171 campaign_id.as_str()
172 )
173 .fetch_optional(&*self.pool)
174 .await
175 }
176
177 #[allow(clippy::cognitive_complexity)]
178 pub async fn record_click(&self, params: &RecordClickParams) -> Result<(), sqlx::Error> {
179 sqlx::query!(
180 r#"
181 INSERT INTO link_clicks (
182 id, link_id, session_id, user_id, context_id, task_id,
183 referrer_page, referrer_url, clicked_at, user_agent, ip_address,
184 device_type, country, is_first_click, is_conversion
185 )
186 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
187 "#,
188 params.click_id.as_str(),
189 params.link_id.as_str(),
190 params.session_id.as_str(),
191 params.user_id.as_ref().map(UserId::as_str),
192 params.context_id.as_ref().map(ContextId::as_str),
193 params.task_id.as_ref().map(TaskId::as_str),
194 params.referrer_page,
195 params.referrer_url,
196 params.clicked_at,
197 params.user_agent,
198 params.ip_address,
199 params.device_type,
200 params.country,
201 params.is_first_click,
202 params.is_conversion
203 )
204 .execute(&*self.write_pool)
205 .await?;
206 Ok(())
207 }
208}