systemprompt_analytics/services/
providers.rs1use async_trait::async_trait;
12use chrono::Utc;
13use http::{HeaderMap, Uri};
14use systemprompt_identifiers::{SessionId, UserId};
15use systemprompt_traits::{
16 ActiveSession, AnalyticsProvider, AnalyticsProviderError, AnalyticsResult, AnalyticsSession,
17 CreateSessionInput, FingerprintProvider, SessionAnalytics as TraitSessionAnalytics,
18 SessionAnalyticsProvider, SessionAnalyticsProviderError, SessionAnalyticsResult,
19};
20
21use super::SessionAnalytics;
22use super::service::AnalyticsService;
23use crate::repository::{FingerprintRepository, SessionRepository};
24
25#[async_trait]
26impl AnalyticsProvider for AnalyticsService {
27 fn extract_analytics(&self, headers: &HeaderMap, uri: Option<&Uri>) -> TraitSessionAnalytics {
28 let local = Self::extract_analytics(self, headers, uri);
29 TraitSessionAnalytics {
30 ip_address: local.ip_address,
31 user_agent: local.user_agent,
32 device_type: local.device_type,
33 browser: local.browser,
34 os: local.os,
35 fingerprint_hash: local.fingerprint_hash,
36 referer: local.referrer_url.clone(),
37 referrer_url: local.referrer_url,
38 referrer_source: local.referrer_source,
39 accept_language: local.preferred_locale.clone(),
40 preferred_locale: local.preferred_locale,
41 screen_width: None,
42 screen_height: None,
43 timezone: None,
44 page_url: local.entry_url.clone(),
45 landing_page: local.landing_page,
46 entry_url: local.entry_url,
47 country: local.country,
48 region: local.region,
49 city: local.city,
50 utm_source: local.utm_source,
51 utm_medium: local.utm_medium,
52 utm_campaign: local.utm_campaign,
53 utm_content: local.utm_content,
54 utm_term: local.utm_term,
55 }
56 }
57
58 async fn create_session(&self, input: CreateSessionInput<'_>) -> AnalyticsResult<()> {
59 let local_analytics = SessionAnalytics {
60 ip_address: input.analytics.ip_address.clone(),
61 user_agent: input.analytics.user_agent.clone(),
62 device_type: input.analytics.device_type.clone(),
63 browser: input.analytics.browser.clone(),
64 os: input.analytics.os.clone(),
65 fingerprint_hash: input.analytics.fingerprint_hash.clone(),
66 referrer_url: input
67 .analytics
68 .referrer_url
69 .clone()
70 .or_else(|| input.analytics.referer.clone()),
71 referrer_source: input.analytics.referrer_source.clone(),
72 preferred_locale: input
73 .analytics
74 .preferred_locale
75 .clone()
76 .or_else(|| input.analytics.accept_language.clone()),
77 landing_page: input.analytics.landing_page.clone(),
78 entry_url: input
79 .analytics
80 .entry_url
81 .clone()
82 .or_else(|| input.analytics.page_url.clone()),
83 country: input.analytics.country.clone(),
84 region: input.analytics.region.clone(),
85 city: input.analytics.city.clone(),
86 utm_source: input.analytics.utm_source.clone(),
87 utm_medium: input.analytics.utm_medium.clone(),
88 utm_campaign: input.analytics.utm_campaign.clone(),
89 utm_content: input.analytics.utm_content.clone(),
90 utm_term: input.analytics.utm_term.clone(),
91 };
92
93 let local_input = super::service::CreateAnalyticsSessionInput {
94 session_id: input.session_id,
95 user_id: input.user_id,
96 analytics: &local_analytics,
97 session_source: input.session_source,
98 is_bot: input.is_bot,
99 is_ai_crawler: input.is_ai_crawler,
100 expires_at: input.expires_at,
101 };
102
103 self.create_analytics_session(local_input)
104 .await
105 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
106 }
107
108 async fn find_recent_session_by_fingerprint(
109 &self,
110 fingerprint: &str,
111 max_age_seconds: i64,
112 ) -> AnalyticsResult<Option<AnalyticsSession>> {
113 let result = Self::find_recent_session_by_fingerprint(self, fingerprint, max_age_seconds)
114 .await
115 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
116
117 Ok(result.map(|r| AnalyticsSession {
118 session_id: r.session_id,
119 user_id: r.user_id,
120 fingerprint: Some(fingerprint.to_owned()),
121 created_at: Utc::now(),
122 }))
123 }
124
125 async fn find_session_by_id(
126 &self,
127 session_id: &SessionId,
128 ) -> AnalyticsResult<Option<AnalyticsSession>> {
129 let result = self
130 .session_repo()
131 .find_by_id(session_id)
132 .await
133 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
134
135 Ok(result.map(|r| AnalyticsSession {
136 session_id: r.session_id,
137 user_id: r.user_id,
138 fingerprint: r.fingerprint_hash,
139 created_at: r.started_at.unwrap_or_else(Utc::now),
140 }))
141 }
142
143 async fn find_active_session_by_id(
144 &self,
145 session_id: &SessionId,
146 ) -> AnalyticsResult<Option<ActiveSession>> {
147 let result = self
148 .session_repo()
149 .find_active_by_id(session_id)
150 .await
151 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
152
153 Ok(result.map(|r| ActiveSession { user_id: r.user_id }))
154 }
155
156 async fn revoke_session(&self, session_id: &SessionId) -> AnalyticsResult<()> {
157 self.session_repo()
158 .revoke_session(session_id)
159 .await
160 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
161 }
162
163 async fn revoke_all_sessions_for_user(&self, user_id: &UserId) -> AnalyticsResult<u64> {
164 self.session_repo()
165 .revoke_all_for_user(user_id)
166 .await
167 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
168 }
169
170 async fn migrate_user_sessions(
171 &self,
172 from_user_id: &UserId,
173 to_user_id: &UserId,
174 ) -> AnalyticsResult<u64> {
175 self.session_repo()
176 .migrate_user_sessions(from_user_id, to_user_id)
177 .await
178 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
179 }
180
181 async fn mark_session_converted(&self, session_id: &SessionId) -> AnalyticsResult<()> {
182 self.session_repo()
183 .mark_converted(session_id)
184 .await
185 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
186 }
187}
188
189#[async_trait]
190impl FingerprintProvider for FingerprintRepository {
191 async fn count_active_sessions(&self, fingerprint: &str) -> AnalyticsResult<i64> {
192 self.count_active_sessions(fingerprint)
193 .await
194 .map(i64::from)
195 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
196 }
197
198 async fn find_reusable_session(&self, fingerprint: &str) -> AnalyticsResult<Option<String>> {
199 self.find_reusable_session(fingerprint)
200 .await
201 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
202 }
203
204 async fn upsert_fingerprint(
205 &self,
206 fingerprint: &str,
207 ip_address: Option<&str>,
208 user_agent: Option<&str>,
209 _screen_info: Option<&str>,
210 ) -> AnalyticsResult<()> {
211 self.upsert_fingerprint(fingerprint, ip_address, user_agent, None)
212 .await
213 .map(|_| ())
214 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
215 }
216}
217
218#[async_trait]
219impl SessionAnalyticsProvider for SessionRepository {
220 async fn increment_task_count(&self, session_id: &SessionId) -> SessionAnalyticsResult<()> {
221 Self::increment_task_count(self, session_id)
222 .await
223 .map_err(|e| SessionAnalyticsProviderError::Internal(e.to_string()))
224 }
225
226 async fn increment_message_count(&self, session_id: &SessionId) -> SessionAnalyticsResult<()> {
227 Self::increment_message_count(self, session_id)
228 .await
229 .map_err(|e| SessionAnalyticsProviderError::Internal(e.to_string()))
230 }
231}