systemprompt_analytics/services/
providers.rs1use async_trait::async_trait;
2use chrono::Utc;
3use http::{HeaderMap, Uri};
4use systemprompt_identifiers::{SessionId, UserId};
5use systemprompt_traits::{
6 AnalyticsProvider, AnalyticsProviderError, AnalyticsResult, AnalyticsSession,
7 CreateSessionInput, FingerprintProvider, SessionAnalytics as TraitSessionAnalytics,
8 SessionAnalyticsProvider, SessionAnalyticsProviderError, SessionAnalyticsResult,
9};
10
11use super::service::AnalyticsService;
12use super::SessionAnalytics;
13use crate::repository::{FingerprintRepository, SessionRepository};
14
15#[async_trait]
16impl AnalyticsProvider for AnalyticsService {
17 fn extract_analytics(&self, headers: &HeaderMap, uri: Option<&Uri>) -> TraitSessionAnalytics {
18 let local = Self::extract_analytics(self, headers, uri);
19 TraitSessionAnalytics {
20 ip_address: local.ip_address.clone(),
21 user_agent: local.user_agent.clone(),
22 referer: local.referrer_url.clone(),
23 accept_language: local.preferred_locale.clone(),
24 screen_width: None,
25 screen_height: None,
26 timezone: None,
27 page_url: local.entry_url,
28 }
29 }
30
31 async fn create_session(&self, input: CreateSessionInput<'_>) -> AnalyticsResult<()> {
32 let local_analytics = SessionAnalytics {
33 ip_address: input.analytics.ip_address.clone(),
34 user_agent: input.analytics.user_agent.clone(),
35 referrer_url: input.analytics.referer.clone(),
36 preferred_locale: input.analytics.accept_language.clone(),
37 entry_url: input.analytics.page_url.clone(),
38 ..Default::default()
39 };
40
41 let local_input = super::service::CreateAnalyticsSessionInput {
42 session_id: input.session_id,
43 user_id: input.user_id,
44 analytics: &local_analytics,
45 session_source: input.session_source,
46 is_bot: input.is_bot,
47 expires_at: input.expires_at,
48 };
49
50 self.create_analytics_session(local_input)
51 .await
52 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
53 }
54
55 async fn find_recent_session_by_fingerprint(
56 &self,
57 fingerprint: &str,
58 max_age_seconds: i64,
59 ) -> AnalyticsResult<Option<AnalyticsSession>> {
60 let result = Self::find_recent_session_by_fingerprint(self, fingerprint, max_age_seconds)
61 .await
62 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
63
64 Ok(result.map(|r| AnalyticsSession {
65 session_id: r.session_id.to_string(),
66 user_id: r.user_id.map(|u| u.to_string()),
67 fingerprint: Some(fingerprint.to_string()),
68 created_at: Utc::now(),
69 }))
70 }
71
72 async fn find_session_by_id(
73 &self,
74 session_id: &SessionId,
75 ) -> AnalyticsResult<Option<AnalyticsSession>> {
76 let result = self
77 .session_repo()
78 .find_by_id(session_id)
79 .await
80 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))?;
81
82 Ok(result.map(|r| AnalyticsSession {
83 session_id: r.session_id.to_string(),
84 user_id: r.user_id.map(|u| u.to_string()),
85 fingerprint: r.fingerprint_hash,
86 created_at: r.started_at.unwrap_or_else(Utc::now),
87 }))
88 }
89
90 async fn migrate_user_sessions(
91 &self,
92 from_user_id: &UserId,
93 to_user_id: &UserId,
94 ) -> AnalyticsResult<u64> {
95 self.session_repo()
96 .migrate_user_sessions(from_user_id, to_user_id)
97 .await
98 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
99 }
100}
101
102#[async_trait]
103impl FingerprintProvider for FingerprintRepository {
104 async fn count_active_sessions(&self, fingerprint: &str) -> AnalyticsResult<i64> {
105 self.count_active_sessions(fingerprint)
106 .await
107 .map(i64::from)
108 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
109 }
110
111 async fn find_reusable_session(&self, fingerprint: &str) -> AnalyticsResult<Option<String>> {
112 self.find_reusable_session(fingerprint)
113 .await
114 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
115 }
116
117 async fn upsert_fingerprint(
118 &self,
119 fingerprint: &str,
120 ip_address: Option<&str>,
121 user_agent: Option<&str>,
122 _screen_info: Option<&str>,
123 ) -> AnalyticsResult<()> {
124 self.upsert_fingerprint(fingerprint, ip_address, user_agent, None)
125 .await
126 .map(|_| ())
127 .map_err(|e| AnalyticsProviderError::Internal(e.to_string()))
128 }
129}
130
131#[async_trait]
132impl SessionAnalyticsProvider for SessionRepository {
133 async fn increment_task_count(&self, session_id: &SessionId) -> SessionAnalyticsResult<()> {
134 Self::increment_task_count(self, session_id)
135 .await
136 .map_err(|e| SessionAnalyticsProviderError::Internal(e.to_string()))
137 }
138
139 async fn increment_message_count(&self, session_id: &SessionId) -> SessionAnalyticsResult<()> {
140 Self::increment_message_count(self, session_id)
141 .await
142 .map_err(|e| SessionAnalyticsProviderError::Internal(e.to_string()))
143 }
144}