Skip to main content

raps_admin/operations/
update_role.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Bulk update role operation
5
6use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use uuid::Uuid;
10
11use raps_acc::admin::AccountAdminClient;
12use raps_acc::users::{ProjectUsersClient, UpdateProjectUserRequest};
13
14use crate::bulk::executor::{
15    BulkConfig, BulkExecutor, BulkOperationResult, ItemResult, ProcessItem, ProgressUpdate,
16};
17use crate::bulk::state::{StateManager, StateUpdate};
18use crate::error::AdminError;
19use crate::filter::ProjectFilter;
20use crate::types::OperationType;
21
/// Parameters describing a bulk "update role" operation.
///
/// Groups the identifiers that select the user to update and the role
/// transition to apply. Values compare field by field, which makes the
/// struct convenient to assert on in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkUpdateRoleParams {
    /// Account ID
    pub account_id: String,
    /// Email address of the user whose role is updated
    pub user_email: String,
    /// Role ID to assign
    pub new_role_id: String,
    /// When set, only update if the user currently has this role
    pub from_role_id: Option<String>,
}
34
35/// Update a user's role across multiple projects in bulk
36///
37/// # Arguments
38/// * `admin_client` - Client for account admin API (user/project lookup)
39/// * `users_client` - Client for project users API (update user)
40/// * `account_id` - The account ID
41/// * `user_email` - Email of the user to update
42/// * `new_role_id` - The new role ID to assign
43/// * `from_role_id` - Only update if user has this current role (optional)
44/// * `project_filter` - Filter for selecting target projects
45/// * `config` - Bulk execution configuration
46/// * `on_progress` - Progress callback
47///
48/// # Returns
49/// Result containing the bulk operation outcome
50#[allow(clippy::too_many_arguments)]
51pub async fn bulk_update_role<P>(
52    admin_client: &AccountAdminClient,
53    users_client: Arc<ProjectUsersClient>,
54    account_id: &str,
55    user_email: &str,
56    new_role_id: &str,
57    from_role_id: Option<&str>,
58    project_filter: &ProjectFilter,
59    config: BulkConfig,
60    on_progress: P,
61) -> Result<BulkOperationResult>
62where
63    P: Fn(ProgressUpdate) + Send + Sync + 'static,
64{
65    // Step 1: Look up user by email to get their user ID
66    let user = admin_client
67        .find_user_by_email(account_id, user_email)
68        .await?
69        .ok_or_else(|| AdminError::UserNotFound {
70            email: user_email.to_string(),
71        })?;
72
73    let user_id = user.id.clone();
74
75    // Step 2: Get list of projects matching the filter
76    let all_projects = admin_client.list_all_projects(account_id).await?;
77    let filtered_projects = project_filter.apply(all_projects);
78
79    if filtered_projects.is_empty() {
80        return Ok(BulkOperationResult {
81            operation_id: Uuid::new_v4(),
82            total: 0,
83            completed: 0,
84            failed: 0,
85            skipped: 0,
86            duration: std::time::Duration::from_secs(0),
87            details: vec![],
88        });
89    }
90
91    // Step 3: Create operation state for resumability
92    let state_manager = StateManager::new()?;
93    let project_ids: Vec<String> = filtered_projects.iter().map(|p| p.id.clone()).collect();
94
95    let params = serde_json::json!({
96        "account_id": account_id,
97        "user_email": user_email,
98        "user_id": user_id,
99        "new_role_id": new_role_id,
100        "from_role_id": from_role_id,
101    });
102
103    let operation_id = state_manager
104        .create_operation(OperationType::UpdateRole, params, project_ids)
105        .await?;
106
107    // Mark operation as in progress
108    state_manager
109        .update_state(
110            operation_id,
111            StateUpdate::StatusChanged {
112                status: crate::types::OperationStatus::InProgress,
113            },
114        )
115        .await?;
116
117    // Step 4: Prepare items for processing
118    let items: Vec<ProcessItem> = filtered_projects
119        .into_iter()
120        .map(|p| ProcessItem {
121            project_id: p.id,
122            project_name: Some(p.name),
123        })
124        .collect();
125
126    // Step 5: Create the processor closure
127    let user_id_clone = user_id.clone();
128    let new_role_id_clone = new_role_id.to_string();
129    let from_role_id_clone = from_role_id.map(|s| s.to_string());
130    let users_client_clone = Arc::clone(&users_client);
131
132    let processor = move |project_id: String| {
133        let user_id = user_id_clone.clone();
134        let new_role_id = new_role_id_clone.clone();
135        let from_role_id = from_role_id_clone.clone();
136        let users_client = Arc::clone(&users_client_clone);
137
138        async move {
139            update_user_role(
140                &users_client,
141                &project_id,
142                &user_id,
143                &new_role_id,
144                from_role_id.as_deref(),
145            )
146            .await
147        }
148    };
149
150    // Step 6: Execute bulk operation
151    let executor = BulkExecutor::new(config);
152    let result = executor
153        .execute(operation_id, items, processor, on_progress)
154        .await;
155
156    // Step 7: Update final operation status
157    let final_status = if result.failed > 0 {
158        crate::types::OperationStatus::Failed
159    } else {
160        crate::types::OperationStatus::Completed
161    };
162
163    state_manager
164        .complete_operation(operation_id, final_status)
165        .await?;
166
167    Ok(result)
168}
169
170/// Update a single user's role in a single project with from-role filtering
171async fn update_user_role(
172    users_client: &ProjectUsersClient,
173    project_id: &str,
174    user_id: &str,
175    new_role_id: &str,
176    from_role_id: Option<&str>,
177) -> ItemResult {
178    // First, check if user exists in this project and get their current role
179    let current_user = match users_client.get_project_user(project_id, user_id).await {
180        Ok(user) => user,
181        Err(e) => {
182            let error_str = e.to_string();
183            // If user not found in project, skip
184            if error_str.contains("404") || error_str.contains("not found") {
185                return ItemResult::Skipped {
186                    reason: "user_not_in_project".to_string(),
187                };
188            }
189            return ItemResult::Failed {
190                error: format!("Failed to get user: {}", error_str),
191                retryable: is_retryable_error(&error_str),
192            };
193        }
194    };
195
196    // Check from-role filter if specified
197    if let Some(from_role) = from_role_id {
198        let current_role = current_user.role_id.as_deref().unwrap_or("");
199        if current_role != from_role {
200            return ItemResult::Skipped {
201                reason: format!("role_mismatch: current={}", current_role),
202            };
203        }
204    }
205
206    // Check if already has the target role
207    if current_user.role_id.as_deref() == Some(new_role_id) {
208        return ItemResult::Skipped {
209            reason: "already_has_role".to_string(),
210        };
211    }
212
213    // Update the user's role
214    let request = UpdateProjectUserRequest {
215        role_id: Some(new_role_id.to_string()),
216        products: None,
217    };
218
219    match users_client.update_user(project_id, user_id, request).await {
220        Ok(_) => ItemResult::Success,
221        Err(e) => {
222            let error_str = e.to_string();
223            ItemResult::Failed {
224                error: error_str.clone(),
225                retryable: is_retryable_error(&error_str),
226            }
227        }
228    }
229}
230
/// Check whether an error message describes a transient failure worth retrying.
///
/// The message is lowercased and searched for known markers: rate limiting
/// (`429`), upstream gateway/availability problems (`502`, `503`, `504`),
/// timeouts (both the "timeout" and "timed out" wordings) and connection
/// errors.
///
/// Matching is a plain substring search, so a marker such as `503` also
/// matches when it merely appears inside a longer token (for example an ID
/// embedded in the message).
fn is_retryable_error(error: &str) -> bool {
    /// Lowercase fragments that mark an error as transient.
    const RETRYABLE_MARKERS: &[&str] = &[
        "429",
        "rate limit",
        "too many requests",
        "503",
        "service unavailable",
        "502",
        "bad gateway",
        "504",
        "timeout",
        "timed out",
        "connection",
    ];

    let lower = error.to_lowercase();
    RETRYABLE_MARKERS
        .iter()
        .any(|marker| lower.contains(marker))
}
244
245/// Resume an interrupted bulk update role operation
246pub async fn resume_bulk_update_role<P>(
247    users_client: Arc<ProjectUsersClient>,
248    operation_id: Uuid,
249    config: BulkConfig,
250    on_progress: P,
251) -> Result<BulkOperationResult>
252where
253    P: Fn(ProgressUpdate) + Send + Sync + 'static,
254{
255    let state_manager = StateManager::new()?;
256    let state = state_manager.load_operation(operation_id).await?;
257
258    // Get parameters from saved state
259    let user_id = state.parameters["user_id"]
260        .as_str()
261        .context("Missing user_id in operation parameters")?
262        .to_string();
263
264    let new_role_id = state.parameters["new_role_id"]
265        .as_str()
266        .context("Missing new_role_id in operation parameters")?
267        .to_string();
268
269    let from_role_id = state.parameters["from_role_id"]
270        .as_str()
271        .map(|s| s.to_string());
272
273    // Get pending projects
274    let pending_project_ids = state_manager.get_pending_projects(&state);
275
276    if pending_project_ids.is_empty() {
277        return Ok(BulkOperationResult {
278            operation_id,
279            total: state.project_ids.len(),
280            completed: state
281                .results
282                .values()
283                .filter(|r| matches!(r.result, ItemResult::Success))
284                .count(),
285            failed: state
286                .results
287                .values()
288                .filter(|r| matches!(r.result, ItemResult::Failed { .. }))
289                .count(),
290            skipped: state
291                .results
292                .values()
293                .filter(|r| matches!(r.result, ItemResult::Skipped { .. }))
294                .count(),
295            duration: std::time::Duration::from_secs(0),
296            details: vec![],
297        });
298    }
299
300    // Mark operation as in progress again
301    state_manager
302        .update_state(
303            operation_id,
304            StateUpdate::StatusChanged {
305                status: crate::types::OperationStatus::InProgress,
306            },
307        )
308        .await?;
309
310    // Prepare items for processing
311    let items: Vec<ProcessItem> = pending_project_ids
312        .into_iter()
313        .map(|id| ProcessItem {
314            project_id: id,
315            project_name: None,
316        })
317        .collect();
318
319    // Create the processor closure
320    let users_client_clone = Arc::clone(&users_client);
321
322    let processor = move |project_id: String| {
323        let user_id = user_id.clone();
324        let new_role_id = new_role_id.clone();
325        let from_role_id = from_role_id.clone();
326        let users_client = Arc::clone(&users_client_clone);
327
328        async move {
329            update_user_role(
330                &users_client,
331                &project_id,
332                &user_id,
333                &new_role_id,
334                from_role_id.as_deref(),
335            )
336            .await
337        }
338    };
339
340    // Execute bulk operation
341    let executor = BulkExecutor::new(config);
342    let result = executor
343        .execute(operation_id, items, processor, on_progress)
344        .await;
345
346    // Update final operation status
347    let final_status = if result.failed > 0 {
348        crate::types::OperationStatus::Failed
349    } else {
350        crate::types::OperationStatus::Completed
351    };
352
353    state_manager
354        .complete_operation(operation_id, final_status)
355        .await?;
356
357    Ok(result)
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Transient failures (rate limits, gateway errors, network problems)
    /// must be flagged as retryable, regardless of letter case.
    #[test]
    fn test_is_retryable_error() {
        // Rate limiting
        assert!(is_retryable_error("429 Too Many Requests"));
        assert!(is_retryable_error("Rate limit exceeded"));

        // Upstream availability
        assert!(is_retryable_error("503 Service Unavailable"));
        assert!(is_retryable_error("502 Bad Gateway"));

        // Network-level problems
        assert!(is_retryable_error("request timeout"));
        assert!(is_retryable_error("connection reset by peer"));

        // Matching ignores case
        assert!(is_retryable_error("SERVICE UNAVAILABLE"));
    }

    /// Client errors and empty messages must not trigger a retry.
    #[test]
    fn test_is_not_retryable_error() {
        assert!(!is_retryable_error("404 Not Found"));
        assert!(!is_retryable_error("403 Forbidden"));
        assert!(!is_retryable_error(""));
    }
}