Skip to main content

systemprompt_analytics/repository/session/
mod.rs

1//! `SessionRepository` — repository surface over `user_sessions`. Internal
2//! submodules: `queries` (read), `mutations` (write), `behavioral` (bot
3//! detection writes), `behavioral_queries` (bot detection reads), `types`
4//! (DTO/row structs).
5
6mod behavioral;
7mod behavioral_queries;
8mod mutations;
9mod queries;
10mod types;
11
12use std::sync::Arc;
13
14use crate::Result;
15use chrono::{DateTime, Utc};
16use sqlx::PgPool;
17use systemprompt_database::DbPool;
18use systemprompt_identifiers::{SessionId, UserId};
19
20use crate::models::AnalyticsSession;
21
22pub(super) use types::ActiveSessionLookup;
23pub use types::{
24    CreateSessionParams, SessionBehavioralData, SessionMigrationResult, SessionRecord,
25};
26
27#[derive(Clone, Debug)]
28pub struct SessionRepository {
29    pool: Arc<PgPool>,
30    write_pool: Arc<PgPool>,
31}
32
33impl SessionRepository {
34    pub fn new(db: &DbPool) -> Result<Self> {
35        let pool = db.pool_arc()?;
36        let write_pool = db.write_pool_arc()?;
37        Ok(Self { pool, write_pool })
38    }
39
40    pub async fn find_by_id(&self, session_id: &SessionId) -> Result<Option<AnalyticsSession>> {
41        queries::find_by_id(&self.pool, session_id).await
42    }
43
44    pub async fn find_active_by_id(
45        &self,
46        session_id: &SessionId,
47    ) -> Result<Option<ActiveSessionLookup>> {
48        queries::find_active_by_id(&self.pool, session_id).await
49    }
50
51    pub async fn revoke_session(&self, session_id: &SessionId) -> Result<()> {
52        mutations::revoke_session(&self.write_pool, session_id).await
53    }
54
55    pub async fn revoke_all_for_user(&self, user_id: &UserId) -> Result<u64> {
56        mutations::revoke_all_for_user(&self.write_pool, user_id).await
57    }
58
59    pub async fn find_by_fingerprint(
60        &self,
61        fingerprint_hash: &str,
62        user_id: &UserId,
63    ) -> Result<Option<AnalyticsSession>> {
64        queries::find_by_fingerprint(&self.pool, fingerprint_hash, user_id).await
65    }
66
67    pub async fn list_active_by_user(&self, user_id: &UserId) -> Result<Vec<AnalyticsSession>> {
68        queries::list_active_by_user(&self.pool, user_id).await
69    }
70
71    pub async fn update_activity(&self, session_id: &SessionId) -> Result<()> {
72        mutations::update_activity(&self.write_pool, session_id).await
73    }
74
75    pub async fn increment_request_count(&self, session_id: &SessionId) -> Result<()> {
76        mutations::increment_request_count(&self.write_pool, session_id).await
77    }
78
79    pub async fn increment_task_count(&self, session_id: &SessionId) -> Result<()> {
80        mutations::increment_task_count(&self.write_pool, session_id).await
81    }
82
83    pub async fn increment_ai_request_count(&self, session_id: &SessionId) -> Result<()> {
84        mutations::increment_ai_request_count(&self.write_pool, session_id).await
85    }
86
87    pub async fn increment_message_count(&self, session_id: &SessionId) -> Result<()> {
88        mutations::increment_message_count(&self.write_pool, session_id).await
89    }
90
91    pub async fn end_session(&self, session_id: &SessionId) -> Result<()> {
92        mutations::end_session(&self.write_pool, session_id).await
93    }
94
95    pub async fn mark_as_scanner(&self, session_id: &SessionId) -> Result<()> {
96        mutations::mark_as_scanner(&self.write_pool, session_id).await
97    }
98
99    pub async fn mark_converted(&self, session_id: &SessionId) -> Result<()> {
100        mutations::mark_converted(&self.write_pool, session_id).await
101    }
102
103    pub async fn mark_as_behavioral_bot(&self, session_id: &SessionId, reason: &str) -> Result<()> {
104        behavioral::mark_as_behavioral_bot(&self.write_pool, session_id, reason).await
105    }
106
107    pub async fn check_and_mark_behavioral_bot(
108        &self,
109        session_id: &SessionId,
110        request_count_threshold: i32,
111    ) -> Result<bool> {
112        behavioral::check_and_mark_behavioral_bot(
113            &self.write_pool,
114            session_id,
115            request_count_threshold,
116        )
117        .await
118    }
119
120    pub async fn cleanup_inactive(&self, inactive_hours: i32) -> Result<u64> {
121        mutations::cleanup_inactive(&self.write_pool, inactive_hours).await
122    }
123
124    pub async fn count_inactive(&self, inactive_hours: i32) -> Result<i64> {
125        queries::count_inactive(&self.pool, inactive_hours).await
126    }
127
128    pub async fn migrate_user_sessions(
129        &self,
130        old_user_id: &UserId,
131        new_user_id: &UserId,
132    ) -> Result<u64> {
133        mutations::migrate_user_sessions(&self.write_pool, old_user_id, new_user_id).await
134    }
135
136    pub async fn create_session(&self, params: &CreateSessionParams<'_>) -> Result<()> {
137        mutations::create_session(&self.write_pool, params).await
138    }
139
140    pub async fn find_recent_by_fingerprint(
141        &self,
142        fingerprint_hash: &str,
143        max_age_seconds: i64,
144    ) -> Result<Option<SessionRecord>> {
145        queries::find_recent_by_fingerprint(&self.pool, fingerprint_hash, max_age_seconds).await
146    }
147
148    pub async fn exists(&self, session_id: &SessionId) -> Result<bool> {
149        queries::exists(&self.pool, session_id).await
150    }
151
152    pub async fn increment_ai_usage(
153        &self,
154        session_id: &SessionId,
155        tokens: i32,
156        cost_microdollars: i64,
157    ) -> Result<()> {
158        mutations::increment_ai_usage(&self.write_pool, session_id, tokens, cost_microdollars).await
159    }
160
161    pub async fn update_behavioral_detection(
162        &self,
163        session_id: &SessionId,
164        score: i32,
165        is_behavioral_bot: bool,
166        reason: Option<&str>,
167    ) -> Result<()> {
168        behavioral::update_behavioral_detection(
169            &self.write_pool,
170            session_id,
171            score,
172            is_behavioral_bot,
173            reason,
174        )
175        .await
176    }
177
178    pub async fn count_sessions_by_fingerprint(
179        &self,
180        fingerprint_hash: &str,
181        window_hours: i64,
182    ) -> Result<i64> {
183        behavioral_queries::count_sessions_by_fingerprint(
184            &self.pool,
185            fingerprint_hash,
186            window_hours,
187        )
188        .await
189    }
190
191    pub async fn get_endpoint_sequence(&self, session_id: &SessionId) -> Result<Vec<String>> {
192        behavioral_queries::get_endpoint_sequence(&self.pool, session_id).await
193    }
194
195    pub async fn get_request_timestamps(
196        &self,
197        session_id: &SessionId,
198    ) -> Result<Vec<DateTime<Utc>>> {
199        behavioral_queries::get_request_timestamps(&self.pool, session_id).await
200    }
201
202    pub async fn get_total_content_pages(&self) -> Result<i64> {
203        queries::get_total_content_pages(&self.pool).await
204    }
205
206    pub async fn get_session_for_behavioral_analysis(
207        &self,
208        session_id: &SessionId,
209    ) -> Result<Option<SessionBehavioralData>> {
210        behavioral_queries::get_session_for_behavioral_analysis(&self.pool, session_id).await
211    }
212
213    pub async fn has_analytics_events(&self, session_id: &SessionId) -> Result<bool> {
214        behavioral_queries::has_analytics_events(&self.pool, session_id).await
215    }
216
217    pub async fn count_unique_ips_by_fingerprint(
218        &self,
219        fingerprint_hash: &str,
220        window_days: i64,
221    ) -> Result<i64> {
222        behavioral_queries::count_unique_ips_by_fingerprint(
223            &self.pool,
224            fingerprint_hash,
225            window_days,
226        )
227        .await
228    }
229
230    pub async fn count_engagement_events_by_fingerprint(
231        &self,
232        fingerprint_hash: &str,
233        window_days: i64,
234    ) -> Result<i64> {
235        behavioral_queries::count_engagement_events_by_fingerprint(
236            &self.pool,
237            fingerprint_hash,
238            window_days,
239        )
240        .await
241    }
242
243    pub async fn get_session_starts_by_fingerprint(
244        &self,
245        fingerprint_hash: &str,
246        window_days: i64,
247    ) -> Result<Vec<DateTime<Utc>>> {
248        behavioral_queries::get_session_starts_by_fingerprint(
249            &self.pool,
250            fingerprint_hash,
251            window_days,
252        )
253        .await
254    }
255
256    pub async fn get_session_velocity(
257        &self,
258        session_id: &SessionId,
259    ) -> Result<(Option<i64>, Option<i64>)> {
260        behavioral_queries::get_session_velocity(&self.pool, session_id).await
261    }
262}