systemprompt_analytics/services/
providers.rs1use async_trait::async_trait;
2use chrono::Utc;
3use http::{HeaderMap, Uri};
4use systemprompt_identifiers::{SessionId, UserId};
5use systemprompt_traits::{
6 ActiveSession, AnalyticsProvider, AnalyticsProviderError, AnalyticsResult, AnalyticsSession,
7 CreateSessionInput, FingerprintProvider, SessionAnalytics as TraitSessionAnalytics,
8 SessionAnalyticsProvider, SessionAnalyticsProviderError, SessionAnalyticsResult,
9};
10
11use super::SessionAnalytics;
12use super::service::AnalyticsService;
13use crate::repository::{FingerprintRepository, SessionRepository};
14
15#[async_trait]
16impl AnalyticsProvider for AnalyticsService {
17 fn extract_analytics(&self, headers: &HeaderMap, uri: Option<&Uri>) -> TraitSessionAnalytics {
18 let local = Self::extract_analytics(self, headers, uri);
19 TraitSessionAnalytics {
20 ip_address: local.ip_address.clone(),
21 user_agent: local.user_agent.clone(),
22 device_type: local.device_type.clone(),
23 browser: local.browser.clone(),
24 os: local.os.clone(),
25 fingerprint_hash: local.fingerprint_hash.clone(),
26 referer: local.referrer_url.clone(),
27 referrer_url: local.referrer_url.clone(),
28 referrer_source: local.referrer_source.clone(),
29 accept_language: local.preferred_locale.clone(),
30 preferred_locale: local.preferred_locale.clone(),
31 screen_width: None,
32 screen_height: None,
33 timezone: None,
34 page_url: local.entry_url.clone(),
35 landing_page: local.landing_page.clone(),
36 entry_url: local.entry_url,
37 country: local.country.clone(),
38 region: local.region.clone(),
39 city: local.city,
40 utm_source: local.utm_source,
41 utm_medium: local.utm_medium,
42 utm_campaign: local.utm_campaign,
43 utm_content: local.utm_content,
44 utm_term: local.utm_term,
45 }
46 }
47
48 async fn create_session(&self, input: CreateSessionInput<'_>) -> AnalyticsResult<()> {
49 let local_analytics = SessionAnalytics {
50 ip_address: input.analytics.ip_address.clone(),
51 user_agent: input.analytics.user_agent.clone(),
52 device_type: input.analytics.device_type.clone(),
53 browser: input.analytics.browser.clone(),
54 os: input.analytics.os.clone(),
55 fingerprint_hash: input.analytics.fingerprint_hash.clone(),
56 referrer_url: input
57 .analytics
58 .referrer_url
59 .clone()
60 .or_else(|| input.analytics.referer.clone()),
61 referrer_source: input.analytics.referrer_source.clone(),
62 preferred_locale: input
63 .analytics
64 .preferred_locale
65 .clone()
66 .or_else(|| input.analytics.accept_language.clone()),
67 landing_page: input.analytics.landing_page.clone(),
68 entry_url: input
69 .analytics
70 .entry_url
71 .clone()
72 .or_else(|| input.analytics.page_url.clone()),
73 country: input.analytics.country.clone(),
74 region: input.analytics.region.clone(),
75 city: input.analytics.city.clone(),
76 utm_source: input.analytics.utm_source.clone(),
77 utm_medium: input.analytics.utm_medium.clone(),
78 utm_campaign: input.analytics.utm_campaign.clone(),
79 utm_content: input.analytics.utm_content.clone(),
80 utm_term: input.analytics.utm_term.clone(),
81 };
82
83 let local_input = super::service::CreateAnalyticsSessionInput {
84 session_id: input.session_id,
85 user_id: input.user_id,
86 analytics: &local_analytics,
87 session_source: input.session_source,
88 is_bot: input.is_bot,
89 is_ai_crawler: input.is_ai_crawler,
90 expires_at: input.expires_at,
91 };
92
93 self.create_analytics_session(local_input)
94 .await
95 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
96 }
97
98 async fn find_recent_session_by_fingerprint(
99 &self,
100 fingerprint: &str,
101 max_age_seconds: i64,
102 ) -> AnalyticsResult<Option<AnalyticsSession>> {
103 let result = Self::find_recent_session_by_fingerprint(self, fingerprint, max_age_seconds)
104 .await
105 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
106
107 Ok(result.map(|r| AnalyticsSession {
108 session_id: r.session_id,
109 user_id: r.user_id,
110 fingerprint: Some(fingerprint.to_owned()),
111 created_at: Utc::now(),
112 }))
113 }
114
115 async fn find_session_by_id(
116 &self,
117 session_id: &SessionId,
118 ) -> AnalyticsResult<Option<AnalyticsSession>> {
119 let result = self
120 .session_repo()
121 .find_by_id(session_id)
122 .await
123 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
124
125 Ok(result.map(|r| AnalyticsSession {
126 session_id: r.session_id,
127 user_id: r.user_id,
128 fingerprint: r.fingerprint_hash,
129 created_at: r.started_at.unwrap_or_else(Utc::now),
130 }))
131 }
132
133 async fn find_active_session_by_id(
134 &self,
135 session_id: &SessionId,
136 ) -> AnalyticsResult<Option<ActiveSession>> {
137 let result = self
138 .session_repo()
139 .find_active_by_id(session_id)
140 .await
141 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
142
143 Ok(result.map(|r| ActiveSession { user_id: r.user_id }))
144 }
145
146 async fn revoke_session(&self, session_id: &SessionId) -> AnalyticsResult<()> {
147 self.session_repo()
148 .revoke_session(session_id)
149 .await
150 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
151 }
152
153 async fn revoke_all_sessions_for_user(&self, user_id: &UserId) -> AnalyticsResult<u64> {
154 self.session_repo()
155 .revoke_all_for_user(user_id)
156 .await
157 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
158 }
159
160 async fn migrate_user_sessions(
161 &self,
162 from_user_id: &UserId,
163 to_user_id: &UserId,
164 ) -> AnalyticsResult<u64> {
165 self.session_repo()
166 .migrate_user_sessions(from_user_id, to_user_id)
167 .await
168 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
169 }
170
171 async fn mark_session_converted(&self, session_id: &SessionId) -> AnalyticsResult<()> {
172 self.session_repo()
173 .mark_converted(session_id)
174 .await
175 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
176 }
177}
178
179#[async_trait]
180impl FingerprintProvider for FingerprintRepository {
181 async fn count_active_sessions(&self, fingerprint: &str) -> AnalyticsResult<i64> {
182 self.count_active_sessions(fingerprint)
183 .await
184 .map(i64::from)
185 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
186 }
187
188 async fn find_reusable_session(&self, fingerprint: &str) -> AnalyticsResult<Option<String>> {
189 self.find_reusable_session(fingerprint)
190 .await
191 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
192 }
193
194 async fn upsert_fingerprint(
195 &self,
196 fingerprint: &str,
197 ip_address: Option<&str>,
198 user_agent: Option<&str>,
199 _screen_info: Option<&str>,
200 ) -> AnalyticsResult<()> {
201 self.upsert_fingerprint(fingerprint, ip_address, user_agent, None)
202 .await
203 .map(|_| ())
204 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
205 }
206}
207
208#[async_trait]
209impl SessionAnalyticsProvider for SessionRepository {
210 async fn increment_task_count(&self, session_id: &SessionId) -> SessionAnalyticsResult<()> {
211 Self::increment_task_count(self, session_id)
212 .await
213 .map_err(|e| SessionAnalyticsProviderError::Internal(e.to_string()))
214 }
215
216 async fn increment_message_count(&self, session_id: &SessionId) -> SessionAnalyticsResult<()> {
217 Self::increment_message_count(self, session_id)
218 .await
219 .map_err(|e| SessionAnalyticsProviderError::Internal(e.to_string()))
220 }
221}