Skip to main content

systemprompt_analytics/repository/session/
mod.rs

1//! `SessionRepository` — repository surface over `user_sessions`. Internal
2//! submodules: `queries` (read), `mutations` (write), `behavioral` (bot
3//! detection writes), `behavioral_queries` (bot detection reads), `types`
4//! (DTO/row structs).
5
6mod behavioral;
7mod behavioral_queries;
8mod mutations;
9mod queries;
10mod types;
11
12use std::sync::Arc;
13
14use crate::Result;
15use chrono::{DateTime, Utc};
16use sqlx::PgPool;
17use systemprompt_database::DbPool;
18use systemprompt_identifiers::{SessionId, UserId};
19
20use crate::models::AnalyticsSession;
21
22pub use types::{
23    CreateSessionParams, SessionBehavioralData, SessionMigrationResult, SessionRecord,
24};
25
26#[derive(Clone, Debug)]
27pub struct SessionRepository {
28    pool: Arc<PgPool>,
29    write_pool: Arc<PgPool>,
30}
31
32impl SessionRepository {
33    pub fn new(db: &DbPool) -> Result<Self> {
34        let pool = db.pool_arc()?;
35        let write_pool = db.write_pool_arc()?;
36        Ok(Self { pool, write_pool })
37    }
38
39    pub async fn find_by_id(&self, session_id: &SessionId) -> Result<Option<AnalyticsSession>> {
40        queries::find_by_id(&self.pool, session_id).await
41    }
42
43    pub async fn find_by_fingerprint(
44        &self,
45        fingerprint_hash: &str,
46        user_id: &UserId,
47    ) -> Result<Option<AnalyticsSession>> {
48        queries::find_by_fingerprint(&self.pool, fingerprint_hash, user_id).await
49    }
50
51    pub async fn list_active_by_user(&self, user_id: &UserId) -> Result<Vec<AnalyticsSession>> {
52        queries::list_active_by_user(&self.pool, user_id).await
53    }
54
55    pub async fn update_activity(&self, session_id: &SessionId) -> Result<()> {
56        mutations::update_activity(&self.write_pool, session_id).await
57    }
58
59    pub async fn increment_request_count(&self, session_id: &SessionId) -> Result<()> {
60        mutations::increment_request_count(&self.write_pool, session_id).await
61    }
62
63    pub async fn increment_task_count(&self, session_id: &SessionId) -> Result<()> {
64        mutations::increment_task_count(&self.write_pool, session_id).await
65    }
66
67    pub async fn increment_ai_request_count(&self, session_id: &SessionId) -> Result<()> {
68        mutations::increment_ai_request_count(&self.write_pool, session_id).await
69    }
70
71    pub async fn increment_message_count(&self, session_id: &SessionId) -> Result<()> {
72        mutations::increment_message_count(&self.write_pool, session_id).await
73    }
74
75    pub async fn end_session(&self, session_id: &SessionId) -> Result<()> {
76        mutations::end_session(&self.write_pool, session_id).await
77    }
78
79    pub async fn mark_as_scanner(&self, session_id: &SessionId) -> Result<()> {
80        mutations::mark_as_scanner(&self.write_pool, session_id).await
81    }
82
83    pub async fn mark_converted(&self, session_id: &SessionId) -> Result<()> {
84        mutations::mark_converted(&self.write_pool, session_id).await
85    }
86
87    pub async fn mark_as_behavioral_bot(&self, session_id: &SessionId, reason: &str) -> Result<()> {
88        behavioral::mark_as_behavioral_bot(&self.write_pool, session_id, reason).await
89    }
90
91    pub async fn check_and_mark_behavioral_bot(
92        &self,
93        session_id: &SessionId,
94        request_count_threshold: i32,
95    ) -> Result<bool> {
96        behavioral::check_and_mark_behavioral_bot(
97            &self.write_pool,
98            session_id,
99            request_count_threshold,
100        )
101        .await
102    }
103
104    pub async fn cleanup_inactive(&self, inactive_hours: i32) -> Result<u64> {
105        mutations::cleanup_inactive(&self.write_pool, inactive_hours).await
106    }
107
108    pub async fn count_inactive(&self, inactive_hours: i32) -> Result<i64> {
109        queries::count_inactive(&self.pool, inactive_hours).await
110    }
111
112    pub async fn migrate_user_sessions(
113        &self,
114        old_user_id: &UserId,
115        new_user_id: &UserId,
116    ) -> Result<u64> {
117        mutations::migrate_user_sessions(&self.write_pool, old_user_id, new_user_id).await
118    }
119
120    pub async fn create_session(&self, params: &CreateSessionParams<'_>) -> Result<()> {
121        mutations::create_session(&self.write_pool, params).await
122    }
123
124    pub async fn find_recent_by_fingerprint(
125        &self,
126        fingerprint_hash: &str,
127        max_age_seconds: i64,
128    ) -> Result<Option<SessionRecord>> {
129        queries::find_recent_by_fingerprint(&self.pool, fingerprint_hash, max_age_seconds).await
130    }
131
132    pub async fn exists(&self, session_id: &SessionId) -> Result<bool> {
133        queries::exists(&self.pool, session_id).await
134    }
135
136    pub async fn increment_ai_usage(
137        &self,
138        session_id: &SessionId,
139        tokens: i32,
140        cost_microdollars: i64,
141    ) -> Result<()> {
142        mutations::increment_ai_usage(&self.write_pool, session_id, tokens, cost_microdollars).await
143    }
144
145    pub async fn update_behavioral_detection(
146        &self,
147        session_id: &SessionId,
148        score: i32,
149        is_behavioral_bot: bool,
150        reason: Option<&str>,
151    ) -> Result<()> {
152        behavioral::update_behavioral_detection(
153            &self.write_pool,
154            session_id,
155            score,
156            is_behavioral_bot,
157            reason,
158        )
159        .await
160    }
161
162    pub async fn escalate_throttle(&self, session_id: &SessionId, new_level: i32) -> Result<()> {
163        mutations::escalate_throttle(&self.write_pool, session_id, new_level).await
164    }
165
166    pub async fn get_throttle_level(&self, session_id: &SessionId) -> Result<i32> {
167        queries::get_throttle_level(&self.pool, session_id).await
168    }
169
170    pub async fn count_sessions_by_fingerprint(
171        &self,
172        fingerprint_hash: &str,
173        window_hours: i64,
174    ) -> Result<i64> {
175        behavioral_queries::count_sessions_by_fingerprint(
176            &self.pool,
177            fingerprint_hash,
178            window_hours,
179        )
180        .await
181    }
182
183    pub async fn get_endpoint_sequence(&self, session_id: &SessionId) -> Result<Vec<String>> {
184        behavioral_queries::get_endpoint_sequence(&self.pool, session_id).await
185    }
186
187    pub async fn get_request_timestamps(
188        &self,
189        session_id: &SessionId,
190    ) -> Result<Vec<DateTime<Utc>>> {
191        behavioral_queries::get_request_timestamps(&self.pool, session_id).await
192    }
193
194    pub async fn get_total_content_pages(&self) -> Result<i64> {
195        queries::get_total_content_pages(&self.pool).await
196    }
197
198    pub async fn get_session_for_behavioral_analysis(
199        &self,
200        session_id: &SessionId,
201    ) -> Result<Option<SessionBehavioralData>> {
202        behavioral_queries::get_session_for_behavioral_analysis(&self.pool, session_id).await
203    }
204
205    pub async fn has_analytics_events(&self, session_id: &SessionId) -> Result<bool> {
206        behavioral_queries::has_analytics_events(&self.pool, session_id).await
207    }
208
209    pub async fn count_unique_ips_by_fingerprint(
210        &self,
211        fingerprint_hash: &str,
212        window_days: i64,
213    ) -> Result<i64> {
214        behavioral_queries::count_unique_ips_by_fingerprint(
215            &self.pool,
216            fingerprint_hash,
217            window_days,
218        )
219        .await
220    }
221
222    pub async fn count_engagement_events_by_fingerprint(
223        &self,
224        fingerprint_hash: &str,
225        window_days: i64,
226    ) -> Result<i64> {
227        behavioral_queries::count_engagement_events_by_fingerprint(
228            &self.pool,
229            fingerprint_hash,
230            window_days,
231        )
232        .await
233    }
234
235    pub async fn get_session_starts_by_fingerprint(
236        &self,
237        fingerprint_hash: &str,
238        window_days: i64,
239    ) -> Result<Vec<DateTime<Utc>>> {
240        behavioral_queries::get_session_starts_by_fingerprint(
241            &self.pool,
242            fingerprint_hash,
243            window_days,
244        )
245        .await
246    }
247
248    pub async fn get_session_velocity(
249        &self,
250        session_id: &SessionId,
251    ) -> Result<(Option<i64>, Option<i64>)> {
252        behavioral_queries::get_session_velocity(&self.pool, session_id).await
253    }
254}