raps_admin/operations/
remove_user.rs1use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use uuid::Uuid;
10
11use raps_acc::admin::AccountAdminClient;
12use raps_acc::users::ProjectUsersClient;
13
14use crate::bulk::executor::{
15 BulkConfig, BulkExecutor, BulkOperationResult, ItemResult, ProcessItem, ProgressUpdate,
16};
17use crate::bulk::state::{StateManager, StateUpdate};
18use crate::error::AdminError;
19use crate::filter::ProjectFilter;
20use crate::types::OperationType;
21
22#[derive(Debug, Clone)]
24pub struct BulkRemoveUserParams {
25 pub account_id: String,
27 pub user_email: String,
29}
30
31pub async fn bulk_remove_user<P>(
45 admin_client: &AccountAdminClient,
46 users_client: Arc<ProjectUsersClient>,
47 account_id: &str,
48 user_email: &str,
49 project_filter: &ProjectFilter,
50 config: BulkConfig,
51 on_progress: P,
52) -> Result<BulkOperationResult>
53where
54 P: Fn(ProgressUpdate) + Send + Sync + 'static,
55{
56 let user = admin_client
58 .find_user_by_email(account_id, user_email)
59 .await?
60 .ok_or_else(|| AdminError::UserNotFound {
61 email: user_email.to_string(),
62 })?;
63
64 let user_id = user.id.clone();
65
66 let all_projects = admin_client.list_all_projects(account_id).await?;
68 let filtered_projects = project_filter.apply(all_projects);
69
70 if filtered_projects.is_empty() {
71 return Ok(BulkOperationResult {
72 operation_id: Uuid::new_v4(),
73 total: 0,
74 completed: 0,
75 failed: 0,
76 skipped: 0,
77 duration: std::time::Duration::from_secs(0),
78 details: vec![],
79 });
80 }
81
82 let state_manager = StateManager::new()?;
84 let project_ids: Vec<String> = filtered_projects.iter().map(|p| p.id.clone()).collect();
85
86 let params = serde_json::json!({
87 "account_id": account_id,
88 "user_email": user_email,
89 "user_id": user_id,
90 });
91
92 let operation_id = state_manager
93 .create_operation(OperationType::RemoveUser, params, project_ids)
94 .await?;
95
96 state_manager
98 .update_state(
99 operation_id,
100 StateUpdate::StatusChanged {
101 status: crate::types::OperationStatus::InProgress,
102 },
103 )
104 .await?;
105
106 let items: Vec<ProcessItem> = filtered_projects
108 .into_iter()
109 .map(|p| ProcessItem {
110 project_id: p.id,
111 project_name: Some(p.name),
112 })
113 .collect();
114
115 let user_id_clone = user_id.clone();
117 let users_client_clone = Arc::clone(&users_client);
118
119 let processor = move |project_id: String| {
120 let user_id = user_id_clone.clone();
121 let users_client = Arc::clone(&users_client_clone);
122
123 async move { remove_user_from_project(&users_client, &project_id, &user_id).await }
124 };
125
126 let executor = BulkExecutor::new(config);
128 let result = executor
129 .execute(operation_id, items, processor, on_progress)
130 .await;
131
132 let final_status = if result.failed > 0 {
134 crate::types::OperationStatus::Failed
135 } else {
136 crate::types::OperationStatus::Completed
137 };
138
139 state_manager
140 .complete_operation(operation_id, final_status)
141 .await?;
142
143 Ok(result)
144}
145
146async fn remove_user_from_project(
148 users_client: &ProjectUsersClient,
149 project_id: &str,
150 user_id: &str,
151) -> ItemResult {
152 match users_client.user_exists(project_id, user_id).await {
154 Ok(exists) => {
155 if !exists {
156 return ItemResult::Skipped {
158 reason: "user_not_in_project".to_string(),
159 };
160 }
161 }
162 Err(e) => {
163 let error_str = e.to_string();
164 return ItemResult::Failed {
165 error: format!("Failed to check user existence: {}", error_str),
166 retryable: is_retryable_error(&error_str),
167 };
168 }
169 }
170
171 match users_client.remove_user(project_id, user_id).await {
173 Ok(()) => ItemResult::Success,
174 Err(e) => {
175 let error_str = e.to_string();
176 if error_str.contains("404") || error_str.contains("not found") {
178 return ItemResult::Skipped {
179 reason: "user_not_in_project".to_string(),
180 };
181 }
182 ItemResult::Failed {
183 error: error_str.clone(),
184 retryable: is_retryable_error(&error_str),
185 }
186 }
187 }
188}
189
190fn is_retryable_error(error: &str) -> bool {
192 let lower = error.to_lowercase();
193 lower.contains("429")
194 || lower.contains("rate limit")
195 || lower.contains("too many requests")
196 || lower.contains("503")
197 || lower.contains("service unavailable")
198 || lower.contains("502")
199 || lower.contains("bad gateway")
200 || lower.contains("timeout")
201 || lower.contains("connection")
202}
203
204pub async fn resume_bulk_remove_user<P>(
206 users_client: Arc<ProjectUsersClient>,
207 operation_id: Uuid,
208 config: BulkConfig,
209 on_progress: P,
210) -> Result<BulkOperationResult>
211where
212 P: Fn(ProgressUpdate) + Send + Sync + 'static,
213{
214 let state_manager = StateManager::new()?;
215 let state = state_manager.load_operation(operation_id).await?;
216
217 let user_id = state.parameters["user_id"]
219 .as_str()
220 .context("Missing user_id in operation parameters")?
221 .to_string();
222
223 let pending_project_ids = state_manager.get_pending_projects(&state);
225
226 if pending_project_ids.is_empty() {
227 return Ok(BulkOperationResult {
228 operation_id,
229 total: state.project_ids.len(),
230 completed: state
231 .results
232 .values()
233 .filter(|r| matches!(r.result, ItemResult::Success))
234 .count(),
235 failed: state
236 .results
237 .values()
238 .filter(|r| matches!(r.result, ItemResult::Failed { .. }))
239 .count(),
240 skipped: state
241 .results
242 .values()
243 .filter(|r| matches!(r.result, ItemResult::Skipped { .. }))
244 .count(),
245 duration: std::time::Duration::from_secs(0),
246 details: vec![],
247 });
248 }
249
250 state_manager
252 .update_state(
253 operation_id,
254 StateUpdate::StatusChanged {
255 status: crate::types::OperationStatus::InProgress,
256 },
257 )
258 .await?;
259
260 let items: Vec<ProcessItem> = pending_project_ids
262 .into_iter()
263 .map(|id| ProcessItem {
264 project_id: id,
265 project_name: None,
266 })
267 .collect();
268
269 let users_client_clone = Arc::clone(&users_client);
271
272 let processor = move |project_id: String| {
273 let user_id = user_id.clone();
274 let users_client = Arc::clone(&users_client_clone);
275
276 async move { remove_user_from_project(&users_client, &project_id, &user_id).await }
277 };
278
279 let executor = BulkExecutor::new(config);
281 let result = executor
282 .execute(operation_id, items, processor, on_progress)
283 .await;
284
285 let final_status = if result.failed > 0 {
287 crate::types::OperationStatus::Failed
288 } else {
289 crate::types::OperationStatus::Completed
290 };
291
292 state_manager
293 .complete_operation(operation_id, final_status)
294 .await?;
295
296 Ok(result)
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302
303 #[test]
304 fn test_is_retryable_error() {
305 assert!(is_retryable_error("429 Too Many Requests"));
306 assert!(is_retryable_error("Rate limit exceeded"));
307 assert!(is_retryable_error("503 Service Unavailable"));
308 assert!(!is_retryable_error("404 Not Found"));
309 assert!(!is_retryable_error("403 Forbidden"));
310 }
311}