Skip to main content

systemprompt_analytics/repository/session/
mod.rs

1//! `SessionRepository` — repository surface over `user_sessions`. Internal
2//! submodules: `queries` (read), `mutations` (write), `behavioral` (bot
3//! detection writes), `behavioral_queries` (bot detection reads), `types`
4//! (DTO/row structs).
5
6mod behavioral;
7mod behavioral_queries;
8mod mutations;
9mod queries;
10mod types;
11
12use std::sync::Arc;
13
14use crate::Result;
15use chrono::{DateTime, Utc};
16use sqlx::PgPool;
17use systemprompt_database::DbPool;
18use systemprompt_identifiers::{SessionId, UserId};
19
20use crate::models::AnalyticsSession;
21
22pub use types::{
23    ActiveSessionLookup, CreateSessionParams, SessionBehavioralData, SessionMigrationResult,
24    SessionRecord,
25};
26
27#[derive(Clone, Debug)]
28pub struct SessionRepository {
29    pool: Arc<PgPool>,
30    write_pool: Arc<PgPool>,
31}
32
33impl SessionRepository {
34    pub fn new(db: &DbPool) -> Result<Self> {
35        let pool = db.pool_arc()?;
36        let write_pool = db.write_pool_arc()?;
37        Ok(Self { pool, write_pool })
38    }
39
40    pub async fn find_by_id(&self, session_id: &SessionId) -> Result<Option<AnalyticsSession>> {
41        queries::find_by_id(&self.pool, session_id).await
42    }
43
44    pub async fn find_active_by_id(
45        &self,
46        session_id: &SessionId,
47    ) -> Result<Option<ActiveSessionLookup>> {
48        queries::find_active_by_id(&self.pool, session_id).await
49    }
50
51    pub async fn revoke_session(&self, session_id: &SessionId) -> Result<()> {
52        mutations::revoke_session(&self.write_pool, session_id).await
53    }
54
55    pub async fn revoke_all_for_user(&self, user_id: &UserId) -> Result<u64> {
56        mutations::revoke_all_for_user(&self.write_pool, user_id).await
57    }
58
59    pub async fn find_by_fingerprint(
60        &self,
61        fingerprint_hash: &str,
62        user_id: &UserId,
63    ) -> Result<Option<AnalyticsSession>> {
64        queries::find_by_fingerprint(&self.pool, fingerprint_hash, user_id).await
65    }
66
67    pub async fn list_active_by_user(&self, user_id: &UserId) -> Result<Vec<AnalyticsSession>> {
68        queries::list_active_by_user(&self.pool, user_id).await
69    }
70
71    pub async fn update_activity(&self, session_id: &SessionId) -> Result<()> {
72        mutations::update_activity(&self.write_pool, session_id).await
73    }
74
75    pub async fn increment_request_count(&self, session_id: &SessionId) -> Result<()> {
76        mutations::increment_request_count(&self.write_pool, session_id).await
77    }
78
79    pub async fn increment_task_count(&self, session_id: &SessionId) -> Result<()> {
80        mutations::increment_task_count(&self.write_pool, session_id).await
81    }
82
83    pub async fn increment_ai_request_count(&self, session_id: &SessionId) -> Result<()> {
84        mutations::increment_ai_request_count(&self.write_pool, session_id).await
85    }
86
87    pub async fn increment_message_count(&self, session_id: &SessionId) -> Result<()> {
88        mutations::increment_message_count(&self.write_pool, session_id).await
89    }
90
91    pub async fn end_session(&self, session_id: &SessionId) -> Result<()> {
92        mutations::end_session(&self.write_pool, session_id).await
93    }
94
95    pub async fn mark_as_scanner(&self, session_id: &SessionId) -> Result<()> {
96        mutations::mark_as_scanner(&self.write_pool, session_id).await
97    }
98
99    pub async fn mark_converted(&self, session_id: &SessionId) -> Result<()> {
100        mutations::mark_converted(&self.write_pool, session_id).await
101    }
102
103    pub async fn mark_as_behavioral_bot(&self, session_id: &SessionId, reason: &str) -> Result<()> {
104        behavioral::mark_as_behavioral_bot(&self.write_pool, session_id, reason).await
105    }
106
107    pub async fn check_and_mark_behavioral_bot(
108        &self,
109        session_id: &SessionId,
110        request_count_threshold: i32,
111    ) -> Result<bool> {
112        behavioral::check_and_mark_behavioral_bot(
113            &self.write_pool,
114            session_id,
115            request_count_threshold,
116        )
117        .await
118    }
119
120    pub async fn cleanup_inactive(&self, inactive_hours: i32) -> Result<u64> {
121        mutations::cleanup_inactive(&self.write_pool, inactive_hours).await
122    }
123
124    pub async fn count_inactive(&self, inactive_hours: i32) -> Result<i64> {
125        queries::count_inactive(&self.pool, inactive_hours).await
126    }
127
128    pub async fn migrate_user_sessions(
129        &self,
130        old_user_id: &UserId,
131        new_user_id: &UserId,
132    ) -> Result<u64> {
133        mutations::migrate_user_sessions(&self.write_pool, old_user_id, new_user_id).await
134    }
135
136    pub async fn create_session(&self, params: &CreateSessionParams<'_>) -> Result<()> {
137        mutations::create_session(&self.write_pool, params).await
138    }
139
140    pub async fn find_recent_by_fingerprint(
141        &self,
142        fingerprint_hash: &str,
143        max_age_seconds: i64,
144    ) -> Result<Option<SessionRecord>> {
145        queries::find_recent_by_fingerprint(&self.pool, fingerprint_hash, max_age_seconds).await
146    }
147
148    pub async fn exists(&self, session_id: &SessionId) -> Result<bool> {
149        queries::exists(&self.pool, session_id).await
150    }
151
152    pub async fn increment_ai_usage(
153        &self,
154        session_id: &SessionId,
155        tokens: i32,
156        cost_microdollars: i64,
157    ) -> Result<()> {
158        mutations::increment_ai_usage(&self.write_pool, session_id, tokens, cost_microdollars).await
159    }
160
161    pub async fn update_behavioral_detection(
162        &self,
163        session_id: &SessionId,
164        score: i32,
165        is_behavioral_bot: bool,
166        reason: Option<&str>,
167    ) -> Result<()> {
168        behavioral::update_behavioral_detection(
169            &self.write_pool,
170            session_id,
171            score,
172            is_behavioral_bot,
173            reason,
174        )
175        .await
176    }
177
178    pub async fn escalate_throttle(&self, session_id: &SessionId, new_level: i32) -> Result<()> {
179        mutations::escalate_throttle(&self.write_pool, session_id, new_level).await
180    }
181
182    pub async fn get_throttle_level(&self, session_id: &SessionId) -> Result<i32> {
183        queries::get_throttle_level(&self.pool, session_id).await
184    }
185
186    pub async fn count_sessions_by_fingerprint(
187        &self,
188        fingerprint_hash: &str,
189        window_hours: i64,
190    ) -> Result<i64> {
191        behavioral_queries::count_sessions_by_fingerprint(
192            &self.pool,
193            fingerprint_hash,
194            window_hours,
195        )
196        .await
197    }
198
199    pub async fn get_endpoint_sequence(&self, session_id: &SessionId) -> Result<Vec<String>> {
200        behavioral_queries::get_endpoint_sequence(&self.pool, session_id).await
201    }
202
203    pub async fn get_request_timestamps(
204        &self,
205        session_id: &SessionId,
206    ) -> Result<Vec<DateTime<Utc>>> {
207        behavioral_queries::get_request_timestamps(&self.pool, session_id).await
208    }
209
210    pub async fn get_total_content_pages(&self) -> Result<i64> {
211        queries::get_total_content_pages(&self.pool).await
212    }
213
214    pub async fn get_session_for_behavioral_analysis(
215        &self,
216        session_id: &SessionId,
217    ) -> Result<Option<SessionBehavioralData>> {
218        behavioral_queries::get_session_for_behavioral_analysis(&self.pool, session_id).await
219    }
220
221    pub async fn has_analytics_events(&self, session_id: &SessionId) -> Result<bool> {
222        behavioral_queries::has_analytics_events(&self.pool, session_id).await
223    }
224
225    pub async fn count_unique_ips_by_fingerprint(
226        &self,
227        fingerprint_hash: &str,
228        window_days: i64,
229    ) -> Result<i64> {
230        behavioral_queries::count_unique_ips_by_fingerprint(
231            &self.pool,
232            fingerprint_hash,
233            window_days,
234        )
235        .await
236    }
237
238    pub async fn count_engagement_events_by_fingerprint(
239        &self,
240        fingerprint_hash: &str,
241        window_days: i64,
242    ) -> Result<i64> {
243        behavioral_queries::count_engagement_events_by_fingerprint(
244            &self.pool,
245            fingerprint_hash,
246            window_days,
247        )
248        .await
249    }
250
251    pub async fn get_session_starts_by_fingerprint(
252        &self,
253        fingerprint_hash: &str,
254        window_days: i64,
255    ) -> Result<Vec<DateTime<Utc>>> {
256        behavioral_queries::get_session_starts_by_fingerprint(
257            &self.pool,
258            fingerprint_hash,
259            window_days,
260        )
261        .await
262    }
263
264    pub async fn get_session_velocity(
265        &self,
266        session_id: &SessionId,
267    ) -> Result<(Option<i64>, Option<i64>)> {
268        behavioral_queries::get_session_velocity(&self.pool, session_id).await
269    }
270}