Skip to main content

raps_admin/operations/
remove_user.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Bulk remove user operation
5
6use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use uuid::Uuid;
10
11use raps_acc::admin::AccountAdminClient;
12use raps_acc::users::ProjectUsersClient;
13
14use crate::bulk::executor::{
15    BulkConfig, BulkExecutor, BulkOperationResult, ItemResult, ProcessItem, ProgressUpdate,
16};
17use crate::bulk::state::{StateManager, StateUpdate};
18use crate::error::AdminError;
19use crate::filter::ProjectFilter;
20use crate::types::OperationType;
21
/// Parameters for bulk remove user operation
///
/// Plain data carrier: both fields are owned strings, so the type derives the
/// common comparison and hashing traits in addition to `Debug` and `Clone`.
/// This lets values be compared in assertions and used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BulkRemoveUserParams {
    /// Account ID
    pub account_id: String,
    /// User email to remove
    pub user_email: String,
}
30
31/// Remove a user from multiple projects in bulk
32///
33/// # Arguments
34/// * `admin_client` - Client for account admin API (user/project lookup)
35/// * `users_client` - Client for project users API (remove user)
36/// * `account_id` - The account ID
37/// * `user_email` - Email of the user to remove
38/// * `project_filter` - Filter for selecting target projects
39/// * `config` - Bulk execution configuration
40/// * `on_progress` - Progress callback
41///
42/// # Returns
43/// Result containing the bulk operation outcome
44pub async fn bulk_remove_user<P>(
45    admin_client: &AccountAdminClient,
46    users_client: Arc<ProjectUsersClient>,
47    account_id: &str,
48    user_email: &str,
49    project_filter: &ProjectFilter,
50    config: BulkConfig,
51    on_progress: P,
52) -> Result<BulkOperationResult>
53where
54    P: Fn(ProgressUpdate) + Send + Sync + 'static,
55{
56    // Step 1: Look up user by email to get their user ID
57    let user = admin_client
58        .find_user_by_email(account_id, user_email)
59        .await?
60        .ok_or_else(|| AdminError::UserNotFound {
61            email: user_email.to_string(),
62        })?;
63
64    let user_id = user.id.clone();
65
66    // Step 2: Get list of projects matching the filter
67    let all_projects = admin_client.list_all_projects(account_id).await?;
68    let filtered_projects = project_filter.apply(all_projects);
69
70    if filtered_projects.is_empty() {
71        return Ok(BulkOperationResult {
72            operation_id: Uuid::new_v4(),
73            total: 0,
74            completed: 0,
75            failed: 0,
76            skipped: 0,
77            duration: std::time::Duration::from_secs(0),
78            details: vec![],
79        });
80    }
81
82    // Step 3: Create operation state for resumability
83    let state_manager = StateManager::new()?;
84    let project_ids: Vec<String> = filtered_projects.iter().map(|p| p.id.clone()).collect();
85
86    let params = serde_json::json!({
87        "account_id": account_id,
88        "user_email": user_email,
89        "user_id": user_id,
90    });
91
92    let operation_id = state_manager
93        .create_operation(OperationType::RemoveUser, params, project_ids)
94        .await?;
95
96    // Mark operation as in progress
97    state_manager
98        .update_state(
99            operation_id,
100            StateUpdate::StatusChanged {
101                status: crate::types::OperationStatus::InProgress,
102            },
103        )
104        .await?;
105
106    // Step 4: Prepare items for processing
107    let items: Vec<ProcessItem> = filtered_projects
108        .into_iter()
109        .map(|p| ProcessItem {
110            project_id: p.id,
111            project_name: Some(p.name),
112        })
113        .collect();
114
115    // Step 5: Create the processor closure
116    let user_id_clone = user_id.clone();
117    let users_client_clone = Arc::clone(&users_client);
118
119    let processor = move |project_id: String| {
120        let user_id = user_id_clone.clone();
121        let users_client = Arc::clone(&users_client_clone);
122
123        async move { remove_user_from_project(&users_client, &project_id, &user_id).await }
124    };
125
126    // Step 6: Execute bulk operation
127    let executor = BulkExecutor::new(config);
128    let result = executor
129        .execute(operation_id, items, processor, on_progress)
130        .await;
131
132    // Step 7: Update final operation status
133    let final_status = if result.failed > 0 {
134        crate::types::OperationStatus::Failed
135    } else {
136        crate::types::OperationStatus::Completed
137    };
138
139    state_manager
140        .complete_operation(operation_id, final_status)
141        .await?;
142
143    Ok(result)
144}
145
146/// Remove a single user from a single project
147async fn remove_user_from_project(
148    users_client: &ProjectUsersClient,
149    project_id: &str,
150    user_id: &str,
151) -> ItemResult {
152    // First, check if user exists in this project
153    match users_client.user_exists(project_id, user_id).await {
154        Ok(exists) => {
155            if !exists {
156                // User not in project - skip (not error)
157                return ItemResult::Skipped {
158                    reason: "user_not_in_project".to_string(),
159                };
160            }
161        }
162        Err(e) => {
163            let error_str = e.to_string();
164            return ItemResult::Failed {
165                error: format!("Failed to check user existence: {}", error_str),
166                retryable: is_retryable_error(&error_str),
167            };
168        }
169    }
170
171    // Remove the user from the project
172    match users_client.remove_user(project_id, user_id).await {
173        Ok(()) => ItemResult::Success,
174        Err(e) => {
175            let error_str = e.to_string();
176            // Handle 404 as skip (user may have been removed between check and delete)
177            if error_str.contains("404") || error_str.contains("not found") {
178                return ItemResult::Skipped {
179                    reason: "user_not_in_project".to_string(),
180                };
181            }
182            ItemResult::Failed {
183                error: error_str.clone(),
184                retryable: is_retryable_error(&error_str),
185            }
186        }
187    }
188}
189
/// Check if an error is retryable
///
/// Classifies an error message as transient by inspecting its text,
/// case-insensitively. A message is retryable when either:
/// * it contains one of the HTTP status codes `429`, `502`, `503` or `504` as a
///   standalone token (delimited by non-alphanumeric characters), so digits that
///   are merely part of a longer identifier or ID do not count; or
/// * it contains one of the phrases `rate limit`, `too many requests`,
///   `service unavailable`, `bad gateway`, `timeout`, `timed out` or `connection`.
fn is_retryable_error(error: &str) -> bool {
    // Phrases that mark a transient failure wherever they occur in the message.
    const TRANSIENT_PHRASES: [&str; 7] = [
        "rate limit",
        "too many requests",
        "service unavailable",
        "bad gateway",
        "timeout",
        "timed out",
        "connection",
    ];

    let lower = error.to_lowercase();

    // Split on anything that is not a letter or digit so that "status=429" and
    // "(503)" match while "1a429c50" (part of an ID) does not.
    let has_transient_status = lower
        .split(|c: char| !c.is_ascii_alphanumeric())
        .any(|token| matches!(token, "429" | "502" | "503" | "504"));

    has_transient_status || TRANSIENT_PHRASES.iter().any(|phrase| lower.contains(*phrase))
}
203
204/// Resume an interrupted bulk remove user operation
205pub async fn resume_bulk_remove_user<P>(
206    users_client: Arc<ProjectUsersClient>,
207    operation_id: Uuid,
208    config: BulkConfig,
209    on_progress: P,
210) -> Result<BulkOperationResult>
211where
212    P: Fn(ProgressUpdate) + Send + Sync + 'static,
213{
214    let state_manager = StateManager::new()?;
215    let state = state_manager.load_operation(operation_id).await?;
216
217    // Get parameters from saved state
218    let user_id = state.parameters["user_id"]
219        .as_str()
220        .context("Missing user_id in operation parameters")?
221        .to_string();
222
223    // Get pending projects
224    let pending_project_ids = state_manager.get_pending_projects(&state);
225
226    if pending_project_ids.is_empty() {
227        return Ok(BulkOperationResult {
228            operation_id,
229            total: state.project_ids.len(),
230            completed: state
231                .results
232                .values()
233                .filter(|r| matches!(r.result, ItemResult::Success))
234                .count(),
235            failed: state
236                .results
237                .values()
238                .filter(|r| matches!(r.result, ItemResult::Failed { .. }))
239                .count(),
240            skipped: state
241                .results
242                .values()
243                .filter(|r| matches!(r.result, ItemResult::Skipped { .. }))
244                .count(),
245            duration: std::time::Duration::from_secs(0),
246            details: vec![],
247        });
248    }
249
250    // Mark operation as in progress again
251    state_manager
252        .update_state(
253            operation_id,
254            StateUpdate::StatusChanged {
255                status: crate::types::OperationStatus::InProgress,
256            },
257        )
258        .await?;
259
260    // Prepare items for processing
261    let items: Vec<ProcessItem> = pending_project_ids
262        .into_iter()
263        .map(|id| ProcessItem {
264            project_id: id,
265            project_name: None,
266        })
267        .collect();
268
269    // Create the processor closure
270    let users_client_clone = Arc::clone(&users_client);
271
272    let processor = move |project_id: String| {
273        let user_id = user_id.clone();
274        let users_client = Arc::clone(&users_client_clone);
275
276        async move { remove_user_from_project(&users_client, &project_id, &user_id).await }
277    };
278
279    // Execute bulk operation
280    let executor = BulkExecutor::new(config);
281    let result = executor
282        .execute(operation_id, items, processor, on_progress)
283        .await;
284
285    // Update final operation status
286    let final_status = if result.failed > 0 {
287        crate::types::OperationStatus::Failed
288    } else {
289        crate::types::OperationStatus::Completed
290    };
291
292    state_manager
293        .complete_operation(operation_id, final_status)
294        .await?;
295
296    Ok(result)
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline cases: rate limiting and unavailability are retryable,
    /// client errors are not.
    #[test]
    fn test_is_retryable_error() {
        assert!(is_retryable_error("429 Too Many Requests"));
        assert!(is_retryable_error("Rate limit exceeded"));
        assert!(is_retryable_error("503 Service Unavailable"));
        assert!(!is_retryable_error("404 Not Found"));
        assert!(!is_retryable_error("403 Forbidden"));
    }

    /// Gateway, timeout and connection failures are classified as retryable.
    #[test]
    fn test_is_retryable_error_transient_failures() {
        assert!(is_retryable_error("502 Bad Gateway"));
        assert!(is_retryable_error("bad gateway"));
        assert!(is_retryable_error("too many requests"));
        assert!(is_retryable_error("service unavailable"));
        assert!(is_retryable_error("request timeout"));
        assert!(is_retryable_error("connection reset by peer"));
    }

    /// Matching does not depend on the casing of the message.
    #[test]
    fn test_is_retryable_error_is_case_insensitive() {
        assert!(is_retryable_error("SERVICE UNAVAILABLE"));
        assert!(is_retryable_error("Gateway Timeout"));
        assert!(is_retryable_error("Connection Refused"));
        assert!(is_retryable_error("RATE LIMIT"));
    }

    /// Client-side and empty errors must not be retried.
    #[test]
    fn test_is_retryable_error_rejects_permanent_failures() {
        assert!(!is_retryable_error("400 Bad Request"));
        assert!(!is_retryable_error("401 Unauthorized"));
        assert!(!is_retryable_error("user is not a project member"));
        assert!(!is_retryable_error(""));
    }
}