systemprompt_users/repository/user/
list.rs1use systemprompt_identifiers::UserId;
2
3use crate::error::{Result, UserError};
4use crate::models::{User, UserActivity, UserRole, UserStatus, UserWithSessions};
5use crate::repository::{UserRepository, MAX_PAGE_SIZE};
6
7impl UserRepository {
8 pub async fn find_with_sessions(&self, user_id: &UserId) -> Result<Option<UserWithSessions>> {
9 let deleted_status = UserStatus::Deleted.as_str();
10 let row = sqlx::query_as!(
11 UserWithSessions,
12 r#"
13 SELECT
14 u.id, u.name, u.email, u.full_name, u.status, u.roles, u.created_at,
15 COUNT(s.session_id) FILTER (WHERE s.ended_at IS NULL) as "active_sessions!",
16 MAX(s.last_activity_at) as last_session_at
17 FROM users u
18 LEFT JOIN user_sessions s ON s.user_id = u.id
19 WHERE u.id = $1 AND u.status != $2
20 GROUP BY u.id
21 "#,
22 user_id.as_str(),
23 deleted_status
24 )
25 .fetch_optional(&*self.pool)
26 .await?;
27
28 Ok(row)
29 }
30
31 pub async fn get_activity(&self, user_id: &UserId) -> Result<UserActivity> {
32 let row = sqlx::query_as!(
33 UserActivity,
34 r#"
35 SELECT
36 u.id as user_id,
37 MAX(s.last_activity_at) as last_active,
38 COUNT(DISTINCT s.session_id) as "session_count!",
39 COUNT(DISTINCT t.task_id) as "task_count!",
40 0::bigint as "message_count!"
41 FROM users u
42 LEFT JOIN user_sessions s ON s.user_id = u.id
43 LEFT JOIN agent_tasks t ON t.user_id = u.id
44 WHERE u.id = $1
45 GROUP BY u.id
46 "#,
47 user_id.as_str()
48 )
49 .fetch_one(&*self.pool)
50 .await?;
51
52 Ok(row)
53 }
54
55 pub async fn list(&self, limit: i64, offset: i64) -> Result<Vec<User>> {
56 let safe_limit = limit.min(MAX_PAGE_SIZE);
57 let deleted_status = UserStatus::Deleted.as_str();
58 let rows = sqlx::query_as!(
59 User,
60 r#"
61 SELECT id, name, email, full_name, display_name, status, email_verified,
62 roles, avatar_url, is_bot, is_scanner, created_at, updated_at
63 FROM users
64 WHERE status != $1
65 ORDER BY created_at DESC
66 LIMIT $2 OFFSET $3
67 "#,
68 deleted_status,
69 safe_limit,
70 offset
71 )
72 .fetch_all(&*self.pool)
73 .await?;
74
75 Ok(rows)
76 }
77
78 pub async fn list_all(&self) -> Result<Vec<User>> {
79 let deleted_status = UserStatus::Deleted.as_str();
80 let rows = sqlx::query_as!(
81 User,
82 r#"
83 SELECT id, name, email, full_name, display_name, status, email_verified,
84 roles, avatar_url, is_bot, is_scanner, created_at, updated_at
85 FROM users
86 WHERE status != $1
87 ORDER BY created_at DESC
88 "#,
89 deleted_status
90 )
91 .fetch_all(&*self.pool)
92 .await?;
93
94 Ok(rows)
95 }
96
97 pub async fn search(&self, query: &str, limit: i64) -> Result<Vec<User>> {
98 let safe_limit = limit.min(MAX_PAGE_SIZE);
99 let pattern = format!("%{}%", query);
100 let deleted_status = UserStatus::Deleted.as_str();
101 let rows = sqlx::query_as!(
102 User,
103 r#"
104 SELECT id, name, email, full_name, display_name, status, email_verified,
105 roles, avatar_url, is_bot, is_scanner, created_at, updated_at
106 FROM users
107 WHERE status != $1
108 AND (name ILIKE $2 OR email ILIKE $2 OR full_name ILIKE $2)
109 ORDER BY
110 CASE WHEN name ILIKE $2 THEN 0 ELSE 1 END,
111 created_at DESC
112 LIMIT $3
113 "#,
114 deleted_status,
115 pattern,
116 safe_limit
117 )
118 .fetch_all(&*self.pool)
119 .await?;
120
121 Ok(rows)
122 }
123
124 pub async fn count(&self) -> Result<i64> {
125 let deleted_status = UserStatus::Deleted.as_str();
126 let result = sqlx::query_scalar!(
127 r#"SELECT COUNT(*) as "count!" FROM users WHERE status != $1"#,
128 deleted_status
129 )
130 .fetch_one(&*self.pool)
131 .await?;
132
133 Ok(result)
134 }
135
136 pub async fn bulk_update_status(&self, user_ids: &[UserId], new_status: &str) -> Result<u64> {
137 let ids: Vec<String> = user_ids.iter().map(ToString::to_string).collect();
138 let result = sqlx::query!(
139 r#"
140 UPDATE users
141 SET status = $1, updated_at = NOW()
142 WHERE id = ANY($2)
143 "#,
144 new_status,
145 &ids[..]
146 )
147 .execute(&*self.write_pool)
148 .await?;
149
150 Ok(result.rows_affected())
151 }
152
153 pub async fn bulk_delete(&self, user_ids: &[UserId]) -> Result<u64> {
154 let deleted_status = UserStatus::Deleted.as_str();
155 let ids: Vec<String> = user_ids.iter().map(ToString::to_string).collect();
156 let result = sqlx::query!(
157 r#"
158 UPDATE users
159 SET status = $1, updated_at = NOW()
160 WHERE id = ANY($2)
161 "#,
162 deleted_status,
163 &ids[..]
164 )
165 .execute(&*self.write_pool)
166 .await?;
167
168 Ok(result.rows_affected())
169 }
170
171 pub async fn list_by_filter(
172 &self,
173 status: Option<&str>,
174 role: Option<&str>,
175 older_than_days: Option<i64>,
176 limit: i64,
177 ) -> Result<Vec<User>> {
178 let safe_limit = limit.min(MAX_PAGE_SIZE);
179 let deleted_status = UserStatus::Deleted.as_str();
180
181 let rows = sqlx::query_as!(
182 User,
183 r#"
184 SELECT id, name, email, full_name, display_name, status, email_verified,
185 roles, avatar_url, is_bot, is_scanner, created_at, updated_at
186 FROM users
187 WHERE status != $1
188 AND ($2::text IS NULL OR status = $2)
189 AND ($3::text IS NULL OR $3 = ANY(roles))
190 AND ($4::bigint IS NULL OR created_at < NOW() - make_interval(days => $4::int))
191 ORDER BY created_at DESC
192 LIMIT $5
193 "#,
194 deleted_status,
195 status,
196 role,
197 older_than_days,
198 safe_limit
199 )
200 .fetch_all(&*self.pool)
201 .await?;
202
203 Ok(rows)
204 }
205
206 pub async fn is_temporary_anonymous(&self, id: &UserId) -> Result<bool> {
207 let anonymous_role = UserRole::Anonymous.as_str();
208 let result = sqlx::query_scalar!(
209 r#"
210 SELECT $1 = ANY(roles) as "is_anonymous!"
211 FROM users
212 WHERE id = $2
213 "#,
214 anonymous_role,
215 id.as_str()
216 )
217 .fetch_optional(&*self.pool)
218 .await?;
219
220 result.ok_or(UserError::NotFound(id.clone()))
221 }
222
223 pub async fn list_non_anonymous_with_sessions(
224 &self,
225 limit: i64,
226 ) -> Result<Vec<UserWithSessions>> {
227 let safe_limit = limit.min(MAX_PAGE_SIZE);
228 let deleted_status = UserStatus::Deleted.as_str();
229 let anonymous_role = UserRole::Anonymous.as_str();
230 let rows = sqlx::query_as!(
231 UserWithSessions,
232 r#"
233 SELECT
234 u.id, u.name, u.email, u.full_name, u.status, u.roles, u.created_at,
235 COUNT(s.session_id) FILTER (WHERE s.ended_at IS NULL) as "active_sessions!",
236 MAX(s.last_activity_at) as last_session_at
237 FROM users u
238 LEFT JOIN user_sessions s ON s.user_id = u.id
239 WHERE u.status != $1
240 AND NOT ($2 = ANY(u.roles))
241 GROUP BY u.id
242 ORDER BY last_session_at DESC NULLS LAST
243 LIMIT $3
244 "#,
245 deleted_status,
246 anonymous_role,
247 safe_limit
248 )
249 .fetch_all(&*self.pool)
250 .await?;
251
252 Ok(rows)
253 }
254}