Skip to main content

systemprompt_users/repository/user/
list.rs

1use systemprompt_identifiers::UserId;
2
3use crate::error::{Result, UserError};
4use crate::models::{User, UserActivity, UserRole, UserStatus, UserWithSessions};
5use crate::repository::{UserRepository, MAX_PAGE_SIZE};
6
7impl UserRepository {
8    pub async fn find_with_sessions(&self, user_id: &UserId) -> Result<Option<UserWithSessions>> {
9        let deleted_status = UserStatus::Deleted.as_str();
10        let row = sqlx::query_as!(
11            UserWithSessions,
12            r#"
13            SELECT
14                u.id, u.name, u.email, u.full_name, u.status, u.roles, u.created_at,
15                COUNT(s.session_id) FILTER (WHERE s.ended_at IS NULL) as "active_sessions!",
16                MAX(s.last_activity_at) as last_session_at
17            FROM users u
18            LEFT JOIN user_sessions s ON s.user_id = u.id
19            WHERE u.id = $1 AND u.status != $2
20            GROUP BY u.id
21            "#,
22            user_id.as_str(),
23            deleted_status
24        )
25        .fetch_optional(&*self.pool)
26        .await?;
27
28        Ok(row)
29    }
30
31    pub async fn get_activity(&self, user_id: &UserId) -> Result<UserActivity> {
32        let row = sqlx::query_as!(
33            UserActivity,
34            r#"
35            SELECT
36                u.id as user_id,
37                MAX(s.last_activity_at) as last_active,
38                COUNT(DISTINCT s.session_id) as "session_count!",
39                COUNT(DISTINCT t.task_id) as "task_count!",
40                0::bigint as "message_count!"
41            FROM users u
42            LEFT JOIN user_sessions s ON s.user_id = u.id
43            LEFT JOIN agent_tasks t ON t.user_id = u.id
44            WHERE u.id = $1
45            GROUP BY u.id
46            "#,
47            user_id.as_str()
48        )
49        .fetch_one(&*self.pool)
50        .await?;
51
52        Ok(row)
53    }
54
55    pub async fn list(&self, limit: i64, offset: i64) -> Result<Vec<User>> {
56        let safe_limit = limit.min(MAX_PAGE_SIZE);
57        let deleted_status = UserStatus::Deleted.as_str();
58        let rows = sqlx::query_as!(
59            User,
60            r#"
61            SELECT id, name, email, full_name, display_name, status, email_verified,
62                   roles, avatar_url, is_bot, is_scanner, created_at, updated_at
63            FROM users
64            WHERE status != $1
65            ORDER BY created_at DESC
66            LIMIT $2 OFFSET $3
67            "#,
68            deleted_status,
69            safe_limit,
70            offset
71        )
72        .fetch_all(&*self.pool)
73        .await?;
74
75        Ok(rows)
76    }
77
78    pub async fn list_all(&self) -> Result<Vec<User>> {
79        let deleted_status = UserStatus::Deleted.as_str();
80        let rows = sqlx::query_as!(
81            User,
82            r#"
83            SELECT id, name, email, full_name, display_name, status, email_verified,
84                   roles, avatar_url, is_bot, is_scanner, created_at, updated_at
85            FROM users
86            WHERE status != $1
87            ORDER BY created_at DESC
88            "#,
89            deleted_status
90        )
91        .fetch_all(&*self.pool)
92        .await?;
93
94        Ok(rows)
95    }
96
97    pub async fn search(&self, query: &str, limit: i64) -> Result<Vec<User>> {
98        let safe_limit = limit.min(MAX_PAGE_SIZE);
99        let pattern = format!("%{}%", query);
100        let deleted_status = UserStatus::Deleted.as_str();
101        let rows = sqlx::query_as!(
102            User,
103            r#"
104            SELECT id, name, email, full_name, display_name, status, email_verified,
105                   roles, avatar_url, is_bot, is_scanner, created_at, updated_at
106            FROM users
107            WHERE status != $1
108              AND (name ILIKE $2 OR email ILIKE $2 OR full_name ILIKE $2)
109            ORDER BY
110                CASE WHEN name ILIKE $2 THEN 0 ELSE 1 END,
111                created_at DESC
112            LIMIT $3
113            "#,
114            deleted_status,
115            pattern,
116            safe_limit
117        )
118        .fetch_all(&*self.pool)
119        .await?;
120
121        Ok(rows)
122    }
123
124    pub async fn count(&self) -> Result<i64> {
125        let deleted_status = UserStatus::Deleted.as_str();
126        let result = sqlx::query_scalar!(
127            r#"SELECT COUNT(*) as "count!" FROM users WHERE status != $1"#,
128            deleted_status
129        )
130        .fetch_one(&*self.pool)
131        .await?;
132
133        Ok(result)
134    }
135
136    pub async fn bulk_update_status(&self, user_ids: &[UserId], new_status: &str) -> Result<u64> {
137        let ids: Vec<String> = user_ids.iter().map(ToString::to_string).collect();
138        let result = sqlx::query!(
139            r#"
140            UPDATE users
141            SET status = $1, updated_at = NOW()
142            WHERE id = ANY($2)
143            "#,
144            new_status,
145            &ids[..]
146        )
147        .execute(&*self.pool)
148        .await?;
149
150        Ok(result.rows_affected())
151    }
152
153    pub async fn bulk_delete(&self, user_ids: &[UserId]) -> Result<u64> {
154        let deleted_status = UserStatus::Deleted.as_str();
155        let ids: Vec<String> = user_ids.iter().map(ToString::to_string).collect();
156        let result = sqlx::query!(
157            r#"
158            UPDATE users
159            SET status = $1, updated_at = NOW()
160            WHERE id = ANY($2)
161            "#,
162            deleted_status,
163            &ids[..]
164        )
165        .execute(&*self.pool)
166        .await?;
167
168        Ok(result.rows_affected())
169    }
170
171    pub async fn list_by_filter(
172        &self,
173        status: Option<&str>,
174        role: Option<&str>,
175        older_than_days: Option<i64>,
176        limit: i64,
177    ) -> Result<Vec<User>> {
178        let safe_limit = limit.min(MAX_PAGE_SIZE);
179        let deleted_status = UserStatus::Deleted.as_str();
180
181        let rows = sqlx::query_as!(
182            User,
183            r#"
184            SELECT id, name, email, full_name, display_name, status, email_verified,
185                   roles, avatar_url, is_bot, is_scanner, created_at, updated_at
186            FROM users
187            WHERE status != $1
188              AND ($2::text IS NULL OR status = $2)
189              AND ($3::text IS NULL OR $3 = ANY(roles))
190              AND ($4::bigint IS NULL OR created_at < NOW() - make_interval(days => $4::int))
191            ORDER BY created_at DESC
192            LIMIT $5
193            "#,
194            deleted_status,
195            status,
196            role,
197            older_than_days,
198            safe_limit
199        )
200        .fetch_all(&*self.pool)
201        .await?;
202
203        Ok(rows)
204    }
205
206    pub async fn is_temporary_anonymous(&self, id: &UserId) -> Result<bool> {
207        let anonymous_role = UserRole::Anonymous.as_str();
208        let result = sqlx::query_scalar!(
209            r#"
210            SELECT $1 = ANY(roles) as "is_anonymous!"
211            FROM users
212            WHERE id = $2
213            "#,
214            anonymous_role,
215            id.as_str()
216        )
217        .fetch_optional(&*self.pool)
218        .await?;
219
220        result.ok_or(UserError::NotFound(id.clone()))
221    }
222
223    pub async fn list_non_anonymous_with_sessions(
224        &self,
225        limit: i64,
226    ) -> Result<Vec<UserWithSessions>> {
227        let safe_limit = limit.min(MAX_PAGE_SIZE);
228        let deleted_status = UserStatus::Deleted.as_str();
229        let anonymous_role = UserRole::Anonymous.as_str();
230        let rows = sqlx::query_as!(
231            UserWithSessions,
232            r#"
233            SELECT
234                u.id, u.name, u.email, u.full_name, u.status, u.roles, u.created_at,
235                COUNT(s.session_id) FILTER (WHERE s.ended_at IS NULL) as "active_sessions!",
236                MAX(s.last_activity_at) as last_session_at
237            FROM users u
238            LEFT JOIN user_sessions s ON s.user_id = u.id
239            WHERE u.status != $1
240              AND NOT ($2 = ANY(u.roles))
241            GROUP BY u.id
242            ORDER BY last_session_at DESC NULLS LAST
243            LIMIT $3
244            "#,
245            deleted_status,
246            anonymous_role,
247            safe_limit
248        )
249        .fetch_all(&*self.pool)
250        .await?;
251
252        Ok(rows)
253    }
254}