1use super::{
7 MigrationConfig, MigrationError, MigrationMetrics, MigrationOperation, MigrationPlan,
8 MigrationResult, MigrationStatus,
9};
10use std::collections::HashMap;
11use tokio::fs;
12use uuid::Uuid;
13
14#[cfg(feature = "enhanced-rbac")]
15use role_system::{AsyncRoleSystem, MemoryStorage as RoleMemoryStorage, Permission, Role, Subject};
16
17struct ExecutionContext<'a> {
25 config: &'a MigrationConfig,
27
28 permission_registry: HashMap<String, (String, String)>,
32
33 #[cfg(feature = "enhanced-rbac")]
36 role_system: Option<&'a AsyncRoleSystem<RoleMemoryStorage>>,
37}
38
39impl<'a> ExecutionContext<'a> {
40 fn new(config: &'a MigrationConfig) -> Self {
42 Self {
43 config,
44 permission_registry: HashMap::new(),
45 #[cfg(feature = "enhanced-rbac")]
46 role_system: None,
47 }
48 }
49
50 #[cfg(feature = "enhanced-rbac")]
52 fn with_role_system(mut self, rs: &'a AsyncRoleSystem<RoleMemoryStorage>) -> Self {
53 self.role_system = Some(rs);
54 self
55 }
56}
57
58pub async fn execute_migration_plan(
63 plan: &MigrationPlan,
64 config: &MigrationConfig,
65) -> Result<MigrationResult, MigrationError> {
66 let mut ctx = ExecutionContext::new(config);
67 execute_migration_plan_inner(plan, &mut ctx).await
68}
69
70#[cfg(feature = "enhanced-rbac")]
96pub async fn execute_migration_plan_with_role_system(
97 plan: &MigrationPlan,
98 config: &MigrationConfig,
99 role_system: &AsyncRoleSystem<RoleMemoryStorage>,
100) -> Result<MigrationResult, MigrationError> {
101 let mut ctx = ExecutionContext::new(config).with_role_system(role_system);
102 execute_migration_plan_inner(plan, &mut ctx).await
103}
104
105async fn execute_migration_plan_inner(
107 plan: &MigrationPlan,
108 ctx: &mut ExecutionContext<'_>,
109) -> Result<MigrationResult, MigrationError> {
110 let config = ctx.config;
112
113 let _execution_id = Uuid::new_v4().to_string();
114 let started_at = chrono::Utc::now();
115
116 let mut result = MigrationResult {
117 plan_id: plan.id.clone(),
118 status: MigrationStatus::InProgress,
119 started_at,
120 completed_at: None,
121 phases_completed: Vec::new(),
122 current_phase: None,
123 errors: Vec::new(),
124 warnings: Vec::new(),
125 metrics: MigrationMetrics {
126 roles_migrated: 0,
127 permissions_migrated: 0,
128 users_migrated: 0,
129 errors_encountered: 0,
130 warnings_generated: 0,
131 validation_failures: 0,
132 rollback_count: 0,
133 },
134 };
135
136 save_migration_status(&result, config).await?;
138
139 if config.dry_run {
140 log_message(config, "DRY RUN MODE - No actual changes will be made");
141 return execute_dry_run(plan, config, result).await;
142 }
143
144 if let Err(e) = execute_pre_validation(plan, config, &mut result).await {
146 result.status = MigrationStatus::Failed;
147 result.errors.push(format!("Pre-validation failed: {}", e));
148 save_migration_status(&result, config).await?;
149 return Ok(result);
150 }
151
152 for phase in &plan.phases {
154 result.current_phase = Some(phase.id.clone());
155 save_migration_status(&result, config).await?;
156
157 log_message(
158 config,
159 &format!("Executing phase: {} - {}", phase.id, phase.name),
160 );
161
162 match execute_phase(phase, ctx, &mut result).await {
163 Ok(_) => {
164 result.phases_completed.push(phase.id.clone());
165 log_message(
166 config,
167 &format!("Phase '{}' completed successfully", phase.id),
168 );
169 }
170 Err(e) => {
171 result.status = MigrationStatus::Failed;
172 result
173 .errors
174 .push(format!("Phase '{}' failed: {}", phase.id, e));
175 result.metrics.errors_encountered += 1;
176
177 log_message(config, &format!("Phase '{}' failed: {}", phase.id, e));
178
179 if let Err(rollback_error) =
181 execute_rollback_for_phase(phase, config, &mut result).await
182 {
183 result.errors.push(format!(
184 "Rollback for phase '{}' failed: {}",
185 phase.id, rollback_error
186 ));
187 }
188
189 save_migration_status(&result, config).await?;
190 return Ok(result);
191 }
192 }
193 }
194
195 if let Err(e) = execute_post_validation(plan, config, &mut result).await {
197 result.status = MigrationStatus::Failed;
198 result.errors.push(format!("Post-validation failed: {}", e));
199 save_migration_status(&result, config).await?;
200 return Ok(result);
201 }
202
203 result.status = MigrationStatus::Completed;
205 result.completed_at = Some(chrono::Utc::now());
206 result.current_phase = None;
207
208 log_message(config, "Migration completed successfully");
209 save_migration_status(&result, config).await?;
210
211 Ok(result)
212}
213
214async fn execute_dry_run(
216 plan: &MigrationPlan,
217 config: &MigrationConfig,
218 mut result: MigrationResult,
219) -> Result<MigrationResult, MigrationError> {
220 log_message(config, "=== DRY RUN EXECUTION ===");
221
222 for phase in &plan.phases {
223 log_message(
224 config,
225 &format!("DRY RUN - Phase: {} - {}", phase.id, phase.name),
226 );
227
228 for operation in &phase.operations {
229 match operation {
230 MigrationOperation::CreateRole { role_id, name, .. } => {
231 log_message(
232 config,
233 &format!(" [DRY RUN] Would create role: {} ({})", role_id, name),
234 );
235 result.metrics.roles_migrated += 1;
236 }
237 MigrationOperation::CreatePermission {
238 permission_id,
239 action,
240 resource,
241 ..
242 } => {
243 log_message(
244 config,
245 &format!(
246 " [DRY RUN] Would create permission: {} ({}:{})",
247 permission_id, action, resource
248 ),
249 );
250 result.metrics.permissions_migrated += 1;
251 }
252 MigrationOperation::AssignUserRole {
253 user_id, role_id, ..
254 } => {
255 log_message(
256 config,
257 &format!(
258 " [DRY RUN] Would assign role {} to user {}",
259 role_id, user_id
260 ),
261 );
262 result.metrics.users_migrated += 1;
263 }
264 MigrationOperation::Backup {
265 backup_location,
266 backup_type,
267 } => {
268 log_message(
269 config,
270 &format!(
271 " [DRY RUN] Would create {:?} backup at {:?}",
272 backup_type, backup_location
273 ),
274 );
275 }
276 MigrationOperation::ValidateIntegrity {
277 validation_type, ..
278 } => {
279 log_message(
280 config,
281 &format!(" [DRY RUN] Would validate: {}", validation_type),
282 );
283 }
284 MigrationOperation::MigrateCustomAttribute { attribute_name, .. } => {
285 log_message(
286 config,
287 &format!(
288 " [DRY RUN] Would migrate custom attribute: {}",
289 attribute_name
290 ),
291 );
292 }
293 }
294 }
295
296 result.phases_completed.push(phase.id.clone());
297 }
298
299 result.status = MigrationStatus::Completed;
300 result.completed_at = Some(chrono::Utc::now());
301
302 log_message(config, "=== DRY RUN COMPLETED ===");
303
304 Ok(result)
305}
306
307async fn execute_pre_validation(
309 plan: &MigrationPlan,
310 config: &MigrationConfig,
311 result: &mut MigrationResult,
312) -> Result<(), MigrationError> {
313 log_message(config, "Executing pre-validation steps");
314
315 for step in &plan.pre_validation_steps {
316 log_message(
317 config,
318 &format!("Pre-validation: {} - {}", step.id, step.name),
319 );
320
321 match execute_validation_step(step, config).await {
322 Ok(_) => {
323 log_message(config, &format!("Pre-validation '{}' passed", step.id));
324 }
325 Err(e) => {
326 if step.required {
327 return Err(MigrationError::ValidationError(format!(
328 "Required pre-validation '{}' failed: {}",
329 step.id, e
330 )));
331 } else {
332 result.warnings.push(format!(
333 "Optional pre-validation '{}' failed: {}",
334 step.id, e
335 ));
336 result.metrics.warnings_generated += 1;
337 }
338 }
339 }
340 }
341
342 Ok(())
343}
344
345async fn execute_post_validation(
347 plan: &MigrationPlan,
348 config: &MigrationConfig,
349 result: &mut MigrationResult,
350) -> Result<(), MigrationError> {
351 log_message(config, "Executing post-validation steps");
352
353 for step in &plan.post_validation_steps {
354 log_message(
355 config,
356 &format!("Post-validation: {} - {}", step.id, step.name),
357 );
358
359 match execute_validation_step(step, config).await {
360 Ok(_) => {
361 log_message(config, &format!("Post-validation '{}' passed", step.id));
362 }
363 Err(e) => {
364 if step.required {
365 result.metrics.validation_failures += 1;
366 return Err(MigrationError::ValidationError(format!(
367 "Required post-validation '{}' failed: {}",
368 step.id, e
369 )));
370 } else {
371 result.warnings.push(format!(
372 "Optional post-validation '{}' failed: {}",
373 step.id, e
374 ));
375 result.metrics.warnings_generated += 1;
376 }
377 }
378 }
379 }
380
381 Ok(())
382}
383
384async fn execute_validation_step(
386 step: &super::ValidationStep,
387 config: &MigrationConfig,
388) -> Result<(), MigrationError> {
389 use super::ValidationType;
390
391 match &step.validation_type {
392 ValidationType::HierarchyIntegrity => validate_hierarchy_integrity(config).await,
393 ValidationType::PermissionConsistency => validate_permission_consistency(config).await,
394 ValidationType::UserAssignmentValidity => validate_user_assignments(config).await,
395 ValidationType::PrivilegeEscalationCheck => validate_no_privilege_escalation(config).await,
396 ValidationType::Custom(validation_name) => {
397 execute_custom_validation(validation_name, &step.parameters, config).await
398 }
399 }
400}
401
402async fn execute_phase(
404 phase: &super::MigrationPhase,
405 ctx: &mut ExecutionContext<'_>,
406 result: &mut MigrationResult,
407) -> Result<(), MigrationError> {
408 for operation in &phase.operations {
409 if let Err(e) = execute_operation(operation, ctx, result).await {
410 return Err(MigrationError::ExecutionError(format!(
411 "Operation failed in phase '{}': {}",
412 phase.id, e
413 )));
414 }
415 }
416 Ok(())
417}
418
419async fn execute_operation(
421 operation: &MigrationOperation,
422 ctx: &mut ExecutionContext<'_>,
423 result: &mut MigrationResult,
424) -> Result<(), MigrationError> {
425 let config = ctx.config;
426 match operation {
427 MigrationOperation::CreateRole {
428 role_id,
429 name,
430 description,
431 permissions,
432 parent_role,
433 } => {
434 execute_create_role(
435 role_id,
436 name,
437 description.as_deref(),
438 permissions,
439 parent_role.as_deref(),
440 ctx,
441 )
442 .await?;
443 result.metrics.roles_migrated += 1;
444 }
445 MigrationOperation::CreatePermission {
446 permission_id,
447 action,
448 resource,
449 conditions,
450 } => {
451 execute_create_permission(permission_id, action, resource, conditions, ctx).await?;
452 result.metrics.permissions_migrated += 1;
453 }
454 MigrationOperation::AssignUserRole {
455 user_id,
456 role_id,
457 expiration,
458 } => {
459 execute_assign_user_role(user_id, role_id, expiration.as_ref(), ctx).await?;
460 result.metrics.users_migrated += 1;
461 }
462 MigrationOperation::Backup {
463 backup_location,
464 backup_type,
465 } => {
466 execute_backup(backup_location, backup_type, config).await?;
467 }
468 MigrationOperation::ValidateIntegrity {
469 validation_type,
470 parameters,
471 } => {
472 execute_integrity_validation(validation_type, parameters, config).await?;
473 }
474 MigrationOperation::MigrateCustomAttribute {
475 attribute_name,
476 conversion_logic,
477 } => {
478 execute_custom_attribute_migration(attribute_name, conversion_logic, config).await?;
479 }
480 }
481
482 Ok(())
483}
484
485async fn execute_create_role(
487 role_id: &str,
488 name: &str,
489 description: Option<&str>,
490 permissions: &[String],
491 parent_role: Option<&str>,
492 ctx: &mut ExecutionContext<'_>,
493) -> Result<(), MigrationError> {
494 let config = ctx.config;
495 log_message(config, &format!("Creating role: {} ({})", role_id, name));
496
497 if config.verbose {
498 log_message(config, &format!(" Description: {:?}", description));
499 log_message(config, &format!(" Permissions: {:?}", permissions));
500 log_message(config, &format!(" Parent role: {:?}", parent_role));
501 }
502
503 #[cfg(feature = "enhanced-rbac")]
505 if let Some(rs) = ctx.role_system {
506 let mut role = Role::new(role_id);
510 if let Some(desc) = description {
511 role = role.with_description(desc);
512 }
513 for perm_id in permissions {
514 if let Some((action, resource)) = ctx.permission_registry.get(perm_id) {
517 role = role.add_permission(Permission::new(action, resource));
518 } else {
519 let parts: Vec<&str> = perm_id.splitn(2, ':').collect();
520 if parts.len() == 2 {
521 role = role.add_permission(Permission::new(parts[0], parts[1]));
522 } else {
523 role = role.add_permission(Permission::new(perm_id.as_str(), "*"));
524 }
525 }
526 }
527 rs.register_role(role).await.map_err(|e| {
528 MigrationError::ExecutionError(format!(
529 "role-system register_role '{}' failed: {}",
530 role_id, e
531 ))
532 })?;
533 if let Some(parent) = parent_role {
534 rs.add_role_inheritance(role_id, parent)
535 .await
536 .map_err(|e| {
537 MigrationError::ExecutionError(format!(
538 "role-system add_role_inheritance '{}' -> '{}' failed: {}",
539 role_id, parent, e
540 ))
541 })?;
542 }
543 tracing::info!(role_id, "Role registered in role-system");
544 }
545
546 let record = serde_json::json!({
548 "op": "create_role",
549 "role_id": role_id,
550 "name": name,
551 "description": description,
552 "permissions": permissions,
553 "parent_role": parent_role,
554 "timestamp": chrono::Utc::now().to_rfc3339(),
555 });
556 append_manifest_record(config, &record).await?;
557
558 Ok(())
559}
560
561async fn execute_create_permission(
563 permission_id: &str,
564 action: &str,
565 resource: &str,
566 conditions: &HashMap<String, String>,
567 ctx: &mut ExecutionContext<'_>,
568) -> Result<(), MigrationError> {
569 let config = ctx.config;
570 log_message(
571 config,
572 &format!(
573 "Creating permission: {} ({}:{})",
574 permission_id, action, resource
575 ),
576 );
577
578 if config.verbose {
579 log_message(config, &format!(" Conditions: {:?}", conditions));
580 }
581
582 ctx.permission_registry.insert(
586 permission_id.to_string(),
587 (action.to_string(), resource.to_string()),
588 );
589
590 let record = serde_json::json!({
592 "op": "create_permission",
593 "permission_id": permission_id,
594 "action": action,
595 "resource": resource,
596 "conditions": conditions,
597 "timestamp": chrono::Utc::now().to_rfc3339(),
598 });
599 append_manifest_record(config, &record).await?;
600
601 Ok(())
602}
603
604async fn execute_assign_user_role(
606 user_id: &str,
607 role_id: &str,
608 expiration: Option<&chrono::DateTime<chrono::Utc>>,
609 ctx: &mut ExecutionContext<'_>,
610) -> Result<(), MigrationError> {
611 let config = ctx.config;
612 log_message(
613 config,
614 &format!("Assigning role {} to user {}", role_id, user_id),
615 );
616
617 if config.verbose {
618 log_message(config, &format!(" Expiration: {:?}", expiration));
619 }
620
621 #[cfg(feature = "enhanced-rbac")]
623 if let Some(rs) = ctx.role_system {
624 let subject = Subject::new(user_id);
625 if let Some(exp) = expiration {
626 let duration = (*exp - chrono::Utc::now()).to_std().ok();
627 rs.elevate_role(&subject, role_id, duration)
628 .await
629 .map_err(|e| {
630 MigrationError::ExecutionError(format!(
631 "role-system elevate_role '{}' for user '{}' failed: {}",
632 role_id, user_id, e
633 ))
634 })?;
635 } else {
636 rs.assign_role(&subject, role_id).await.map_err(|e| {
637 MigrationError::ExecutionError(format!(
638 "role-system assign_role '{}' for user '{}' failed: {}",
639 role_id, user_id, e
640 ))
641 })?;
642 }
643 tracing::info!(user_id, role_id, "Role assigned in role-system");
644 }
645
646 let record = serde_json::json!({
648 "op": "assign_user_role",
649 "user_id": user_id,
650 "role_id": role_id,
651 "expiration": expiration.map(|e| e.to_rfc3339()),
652 "timestamp": chrono::Utc::now().to_rfc3339(),
653 });
654 append_manifest_record(config, &record).await?;
655
656 Ok(())
657}
658
659async fn execute_backup(
661 backup_location: &std::path::Path,
662 backup_type: &super::BackupType,
663 config: &MigrationConfig,
664) -> Result<(), MigrationError> {
665 log_message(
666 config,
667 &format!("Creating {:?} backup at {:?}", backup_type, backup_location),
668 );
669
670 if let Some(parent) = backup_location.parent() {
672 fs::create_dir_all(parent).await?;
673 }
674
675 let backup_data = match backup_type {
677 super::BackupType::Full => create_full_backup(config).await?,
678 super::BackupType::Incremental => create_incremental_backup(config).await?,
679 super::BackupType::ConfigOnly => create_config_backup(config).await?,
680 super::BackupType::DataOnly => create_data_backup(config).await?,
681 };
682
683 fs::write(backup_location, backup_data).await?;
684
685 log_message(
686 config,
687 &format!("Backup created successfully at {:?}", backup_location),
688 );
689
690 Ok(())
691}
692
693async fn execute_integrity_validation(
695 validation_type: &str,
696 parameters: &HashMap<String, String>,
697 config: &MigrationConfig,
698) -> Result<(), MigrationError> {
699 log_message(
700 config,
701 &format!("Executing integrity validation: {}", validation_type),
702 );
703
704 if config.verbose {
705 log_message(config, &format!(" Parameters: {:?}", parameters));
706 }
707
708 match validation_type {
709 "pre_migration_check" => validate_pre_migration_state(config).await,
710 "post_migration_check" => validate_post_migration_state(config).await,
711 "stop_migration" => Ok(()), _ => {
713 log_message(
714 config,
715 &format!("Unknown validation type: {}", validation_type),
716 );
717 Ok(())
718 }
719 }
720}
721
722async fn execute_custom_attribute_migration(
724 attribute_name: &str,
725 conversion_logic: &str,
726 config: &MigrationConfig,
727) -> Result<(), MigrationError> {
728 log_message(
729 config,
730 &format!("Migrating custom attribute: {}", attribute_name),
731 );
732
733 if config.verbose {
734 log_message(config, &format!(" Conversion logic: {}", conversion_logic));
735 }
736
737 let record = serde_json::json!({
740 "op": "migrate_custom_attribute",
741 "attribute_name": attribute_name,
742 "conversion_logic": conversion_logic,
743 "timestamp": chrono::Utc::now().to_rfc3339(),
744 });
745 append_manifest_record(config, &record).await?;
746
747 Ok(())
748}
749
750async fn execute_rollback_for_phase(
752 phase: &super::MigrationPhase,
753 config: &MigrationConfig,
754 result: &mut MigrationResult,
755) -> Result<(), MigrationError> {
756 log_message(
757 config,
758 &format!("Executing rollback for phase: {}", phase.id),
759 );
760
761 let mut ctx = ExecutionContext::new(config);
763 for operation in &phase.rollback_operations {
764 if let Err(e) = execute_operation(operation, &mut ctx, result).await {
765 return Err(MigrationError::RollbackError(format!(
766 "Rollback operation failed: {}",
767 e
768 )));
769 }
770 }
771
772 result.metrics.rollback_count += 1;
773 Ok(())
774}
775
776pub async fn rollback_migration(
778 plan: &MigrationPlan,
779 config: &MigrationConfig,
780) -> Result<MigrationResult, MigrationError> {
781 let started_at = chrono::Utc::now();
782
783 let mut result = MigrationResult {
784 plan_id: plan.id.clone(),
785 status: MigrationStatus::InProgress,
786 started_at,
787 completed_at: None,
788 phases_completed: Vec::new(),
789 current_phase: Some("rollback".to_string()),
790 errors: Vec::new(),
791 warnings: Vec::new(),
792 metrics: MigrationMetrics {
793 roles_migrated: 0,
794 permissions_migrated: 0,
795 users_migrated: 0,
796 errors_encountered: 0,
797 warnings_generated: 0,
798 validation_failures: 0,
799 rollback_count: 0,
800 },
801 };
802
803 log_message(config, "Starting migration rollback");
804
805 let mut ctx = ExecutionContext::new(config);
808 for phase in plan.rollback_plan.phases.iter().rev() {
809 log_message(config, &format!("Executing rollback phase: {}", phase.id));
810
811 for operation in &phase.operations {
812 if let Err(e) = execute_operation(operation, &mut ctx, &mut result).await {
813 result.status = MigrationStatus::Failed;
814 result
815 .errors
816 .push(format!("Rollback operation failed: {}", e));
817 save_migration_status(&result, config).await?;
818 return Ok(result);
819 }
820 }
821
822 result.phases_completed.push(phase.id.clone());
823 }
824
825 result.status = MigrationStatus::RolledBack;
826 result.completed_at = Some(chrono::Utc::now());
827 result.current_phase = None;
828
829 log_message(config, "Migration rollback completed");
830 save_migration_status(&result, config).await?;
831
832 Ok(result)
833}
834
835async fn validate_hierarchy_integrity(config: &MigrationConfig) -> Result<(), MigrationError> {
837 let manifest_path = config.working_directory.join("migration_manifest.jsonl");
841 if !manifest_path.exists() {
842 return Ok(());
844 }
845 let content = fs::read_to_string(&manifest_path).await?;
846 let mut role_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
847 for line in content.lines() {
848 if let Ok(record) = serde_json::from_str::<serde_json::Value>(line)
849 && record.get("op").and_then(|v| v.as_str()) == Some("create_role")
850 && let Some(id) = record.get("role_id").and_then(|v| v.as_str())
851 {
852 if !role_ids.insert(id.to_string()) {
853 return Err(MigrationError::ValidationError(format!(
854 "Duplicate role ID detected in manifest: {}",
855 id
856 )));
857 }
858 if record.get("parent_role").and_then(|v| v.as_str()) == Some(id) {
859 return Err(MigrationError::ValidationError(format!(
860 "Role '{}' references itself as parent",
861 id
862 )));
863 }
864 }
865 }
866 Ok(())
867}
868
869async fn validate_permission_consistency(config: &MigrationConfig) -> Result<(), MigrationError> {
870 let manifest_path = config.working_directory.join("migration_manifest.jsonl");
873 if !manifest_path.exists() {
874 return Ok(());
875 }
876 let content = fs::read_to_string(&manifest_path).await?;
877 let mut defined_perms: std::collections::HashSet<String> = std::collections::HashSet::new();
878 let mut role_perms: Vec<(String, String)> = Vec::new();
879 for line in content.lines() {
880 if let Ok(record) = serde_json::from_str::<serde_json::Value>(line) {
881 match record.get("op").and_then(|v| v.as_str()) {
882 Some("create_permission") => {
883 if let Some(id) = record.get("permission_id").and_then(|v| v.as_str()) {
884 defined_perms.insert(id.to_string());
885 }
886 }
887 Some("create_role") => {
888 if let (Some(role), Some(perms)) = (
889 record.get("role_id").and_then(|v| v.as_str()),
890 record.get("permissions").and_then(|v| v.as_array()),
891 ) {
892 for p in perms {
893 if let Some(ps) = p.as_str() {
894 role_perms.push((role.to_string(), ps.to_string()));
895 }
896 }
897 }
898 }
899 _ => {}
900 }
901 }
902 }
903 for (role, perm) in &role_perms {
904 if !defined_perms.contains(perm) {
905 return Err(MigrationError::ValidationError(format!(
906 "Role '{}' references undefined permission '{}'",
907 role, perm
908 )));
909 }
910 }
911 Ok(())
912}
913
914async fn validate_user_assignments(config: &MigrationConfig) -> Result<(), MigrationError> {
915 let manifest_path = config.working_directory.join("migration_manifest.jsonl");
918 if !manifest_path.exists() {
919 return Ok(());
920 }
921 let content = fs::read_to_string(&manifest_path).await?;
922 let mut defined_roles: std::collections::HashSet<String> = std::collections::HashSet::new();
923 let mut assignments: Vec<(String, String)> = Vec::new();
924 for line in content.lines() {
925 if let Ok(record) = serde_json::from_str::<serde_json::Value>(line) {
926 match record.get("op").and_then(|v| v.as_str()) {
927 Some("create_role") => {
928 if let Some(id) = record.get("role_id").and_then(|v| v.as_str()) {
929 defined_roles.insert(id.to_string());
930 }
931 }
932 Some("assign_user_role") => {
933 if let (Some(uid), Some(rid)) = (
934 record.get("user_id").and_then(|v| v.as_str()),
935 record.get("role_id").and_then(|v| v.as_str()),
936 ) {
937 assignments.push((uid.to_string(), rid.to_string()));
938 }
939 }
940 _ => {}
941 }
942 }
943 }
944 for (user, role) in &assignments {
945 if !defined_roles.contains(role) {
946 return Err(MigrationError::ValidationError(format!(
947 "User '{}' is assigned to undefined role '{}'",
948 user, role
949 )));
950 }
951 }
952 Ok(())
953}
954
955async fn validate_no_privilege_escalation(config: &MigrationConfig) -> Result<(), MigrationError> {
956 let manifest_path = config.working_directory.join("migration_manifest.jsonl");
960 if !manifest_path.exists() {
961 return Ok(());
962 }
963 let content = fs::read_to_string(&manifest_path).await?;
964 let mut parent_map: HashMap<String, String> = HashMap::new();
966 let mut user_roles: HashMap<String, Vec<String>> = HashMap::new();
967 for line in content.lines() {
968 if let Ok(record) = serde_json::from_str::<serde_json::Value>(line) {
969 match record.get("op").and_then(|v| v.as_str()) {
970 Some("create_role") => {
971 if let (Some(id), Some(parent)) = (
972 record.get("role_id").and_then(|v| v.as_str()),
973 record.get("parent_role").and_then(|v| v.as_str()),
974 ) {
975 parent_map.insert(id.to_string(), parent.to_string());
976 }
977 }
978 Some("assign_user_role") => {
979 if let (Some(uid), Some(rid)) = (
980 record.get("user_id").and_then(|v| v.as_str()),
981 record.get("role_id").and_then(|v| v.as_str()),
982 ) {
983 user_roles
984 .entry(uid.to_string())
985 .or_default()
986 .push(rid.to_string());
987 }
988 }
989 _ => {}
990 }
991 }
992 }
993 for (user, roles) in &user_roles {
994 for role in roles {
996 let mut ancestor = parent_map.get(role);
997 while let Some(a) = ancestor {
998 if roles.iter().any(|r| r == a) {
999 log_message(
1000 config,
1001 &format!(
1002 "WARNING: user '{}' is assigned both '{}' and its ancestor '{}'. \
1003 Consider removing the redundant assignment.",
1004 user, role, a
1005 ),
1006 );
1007 break;
1008 }
1009 ancestor = parent_map.get(a);
1010 }
1011 }
1012 }
1013 Ok(())
1014}
1015
1016async fn execute_custom_validation(
1017 validation_name: &str,
1018 _parameters: &HashMap<String, String>,
1019 config: &MigrationConfig,
1020) -> Result<(), MigrationError> {
1021 log_message(
1022 config,
1023 &format!("Executing custom validation: {}", validation_name),
1024 );
1025 Ok(())
1029}
1030
1031async fn validate_pre_migration_state(config: &MigrationConfig) -> Result<(), MigrationError> {
1032 if !config.working_directory.exists() {
1035 return Err(MigrationError::ValidationError(format!(
1036 "Working directory does not exist: {:?}",
1037 config.working_directory
1038 )));
1039 }
1040 if !config.backup_directory.exists() {
1041 return Err(MigrationError::ValidationError(format!(
1042 "Backup directory does not exist: {:?}",
1043 config.backup_directory
1044 )));
1045 }
1046 Ok(())
1047}
1048
1049async fn validate_post_migration_state(config: &MigrationConfig) -> Result<(), MigrationError> {
1050 let manifest_path = config.working_directory.join("migration_manifest.jsonl");
1053 if manifest_path.exists() {
1054 let metadata = fs::metadata(&manifest_path).await?;
1055 if metadata.len() == 0 {
1056 return Err(MigrationError::ValidationError(
1057 "Migration manifest is empty — no operations were recorded".to_string(),
1058 ));
1059 }
1060 }
1061 Ok(())
1062}
1063
1064async fn create_full_backup(config: &MigrationConfig) -> Result<String, MigrationError> {
1066 let working_dir = &config.working_directory;
1068 let mut entries = Vec::new();
1069
1070 if working_dir.exists() {
1071 let mut read_dir = fs::read_dir(working_dir).await?;
1072
1073 while let Ok(Some(entry)) = read_dir.next_entry().await {
1074 if let Ok(content) = fs::read_to_string(entry.path()).await {
1075 entries.push(serde_json::json!({
1076 "path": entry.file_name().to_string_lossy(),
1077 "content": content,
1078 }));
1079 }
1080 }
1081 }
1082
1083 let backup = serde_json::json!({
1084 "backup_type": "full",
1085 "timestamp": chrono::Utc::now().to_rfc3339(),
1086 "working_directory": working_dir.display().to_string(),
1087 "entries": entries,
1088 });
1089
1090 Ok(serde_json::to_string_pretty(&backup)?)
1091}
1092
1093async fn create_incremental_backup(config: &MigrationConfig) -> Result<String, MigrationError> {
1094 let working_dir = &config.working_directory;
1096 let mut entries = Vec::new();
1097
1098 if working_dir.exists() {
1099 let mut read_dir = fs::read_dir(working_dir).await?;
1100
1101 while let Ok(Some(entry)) = read_dir.next_entry().await {
1102 let name = entry.file_name().to_string_lossy().to_string();
1103 if name.ends_with("_status.json") || name.ends_with("_manifest.json") {
1104 if let Ok(content) = fs::read_to_string(entry.path()).await {
1105 entries.push(serde_json::json!({
1106 "path": name,
1107 "content": content,
1108 }));
1109 }
1110 }
1111 }
1112 }
1113
1114 let backup = serde_json::json!({
1115 "backup_type": "incremental",
1116 "timestamp": chrono::Utc::now().to_rfc3339(),
1117 "working_directory": working_dir.display().to_string(),
1118 "entries": entries,
1119 });
1120
1121 Ok(serde_json::to_string_pretty(&backup)?)
1122}
1123
1124async fn create_config_backup(config: &MigrationConfig) -> Result<String, MigrationError> {
1125 let backup = serde_json::json!({
1126 "backup_type": "config",
1127 "timestamp": chrono::Utc::now().to_rfc3339(),
1128 "migration_config": {
1129 "working_directory": config.working_directory.display().to_string(),
1130 "dry_run": config.dry_run,
1131 "verbose": config.verbose,
1132 },
1133 });
1134
1135 Ok(serde_json::to_string_pretty(&backup)?)
1136}
1137
1138async fn create_data_backup(config: &MigrationConfig) -> Result<String, MigrationError> {
1139 let working_dir = &config.working_directory;
1141 let mut entries = Vec::new();
1142
1143 if working_dir.exists() {
1144 let mut read_dir = fs::read_dir(working_dir).await?;
1145
1146 while let Ok(Some(entry)) = read_dir.next_entry().await {
1147 let name = entry.file_name().to_string_lossy().to_string();
1148 if name.ends_with(".json") && !name.ends_with("_status.json") {
1149 if let Ok(content) = fs::read_to_string(entry.path()).await {
1150 entries.push(serde_json::json!({
1151 "path": name,
1152 "content": content,
1153 }));
1154 }
1155 }
1156 }
1157 }
1158
1159 let backup = serde_json::json!({
1160 "backup_type": "data",
1161 "timestamp": chrono::Utc::now().to_rfc3339(),
1162 "working_directory": working_dir.display().to_string(),
1163 "entries": entries,
1164 });
1165
1166 Ok(serde_json::to_string_pretty(&backup)?)
1167}
1168
1169async fn save_migration_status(
1171 result: &MigrationResult,
1172 config: &MigrationConfig,
1173) -> Result<(), MigrationError> {
1174 let status_file = config
1175 .working_directory
1176 .join(format!("{}_status.json", result.plan_id));
1177 let content = serde_json::to_string_pretty(result)?;
1178 fs::write(status_file, content).await?;
1179 Ok(())
1180}
1181
1182fn log_message(config: &MigrationConfig, message: &str) {
1184 if config.verbose {
1185 tracing::info!(message, "migration");
1186 }
1187}
1188
1189async fn append_manifest_record(
1196 config: &MigrationConfig,
1197 record: &serde_json::Value,
1198) -> Result<(), MigrationError> {
1199 use tokio::io::AsyncWriteExt;
1200 let manifest_path = config.working_directory.join("migration_manifest.jsonl");
1201 let mut file = tokio::fs::OpenOptions::new()
1203 .append(true)
1204 .create(true)
1205 .open(&manifest_path)
1206 .await
1207 .map_err(MigrationError::IoError)?;
1208 let mut line = serde_json::to_string(record).map_err(MigrationError::SerializationError)?;
1209 line.push('\n');
1210 file.write_all(line.as_bytes())
1211 .await
1212 .map_err(MigrationError::IoError)?;
1213 Ok(())
1214}
1215
1216#[cfg(test)]
1217mod tests {
1218 use super::*;
1219 use crate::migration::{
1220 LegacySystemAnalysis, LegacySystemType, MigrationPhase, MigrationStrategy, RiskLevel,
1221 };
1222
1223 fn create_test_plan() -> MigrationPlan {
1224 MigrationPlan {
1225 id: "test_plan".to_string(),
1226 source_analysis: LegacySystemAnalysis {
1227 system_type: LegacySystemType::BasicRbac,
1228 role_count: 1,
1229 permission_count: 1,
1230 user_assignment_count: 1,
1231 roles: vec![],
1232 permissions: vec![],
1233 user_assignments: vec![],
1234 hierarchy_depth: 0,
1235 duplicates_found: false,
1236 orphaned_permissions: vec![],
1237 circular_dependencies: vec![],
1238 custom_attributes: std::collections::HashSet::new(),
1239 complexity_score: 3,
1240 recommended_strategy: MigrationStrategy::DirectMapping,
1241 },
1242 strategy: MigrationStrategy::DirectMapping,
1243 phases: vec![MigrationPhase {
1244 id: "test_phase".to_string(),
1245 name: "Test Phase".to_string(),
1246 description: "Test phase".to_string(),
1247 order: 1,
1248 operations: vec![MigrationOperation::CreateRole {
1249 role_id: "test_role".to_string(),
1250 name: "Test Role".to_string(),
1251 description: None,
1252 permissions: vec!["read".to_string()],
1253 parent_role: None,
1254 }],
1255 dependencies: vec![],
1256 estimated_duration: chrono::Duration::minutes(1),
1257 rollback_operations: vec![],
1258 }],
1259 role_mappings: std::collections::HashMap::new(),
1260 permission_mappings: std::collections::HashMap::new(),
1261 user_migrations: vec![],
1262 pre_validation_steps: vec![],
1263 post_validation_steps: vec![],
1264 rollback_plan: super::super::RollbackPlan {
1265 phases: vec![],
1266 backup_locations: vec![],
1267 recovery_time_objective: chrono::Duration::hours(1),
1268 manual_steps: vec![],
1269 },
1270 estimated_duration: chrono::Duration::minutes(30),
1271 risk_level: RiskLevel::Low,
1272 downtime_required: None,
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn test_execute_migration_plan_dry_run() {
1278 let plan = create_test_plan();
1279 let config = MigrationConfig {
1280 dry_run: true,
1281 verbose: false, ..Default::default()
1283 };
1284
1285 let result = execute_migration_plan(&plan, &config).await.unwrap();
1286
1287 assert_eq!(result.status, MigrationStatus::Completed);
1288 assert_eq!(result.phases_completed.len(), 1);
1289 assert_eq!(result.metrics.roles_migrated, 1);
1290 }
1291
1292 #[tokio::test]
1293 async fn test_execute_migration_plan_real() {
1294 let plan = create_test_plan();
1295 let config = MigrationConfig {
1296 dry_run: false,
1297 verbose: false, ..Default::default()
1299 };
1300
1301 let result = execute_migration_plan(&plan, &config).await.unwrap();
1302
1303 assert_eq!(result.status, MigrationStatus::Completed);
1304 assert_eq!(result.phases_completed.len(), 1);
1305 assert_eq!(result.metrics.roles_migrated, 1);
1306 }
1307
1308 #[cfg(feature = "enhanced-rbac")]
1312 fn make_role_system() -> AsyncRoleSystem<RoleMemoryStorage> {
1313 use role_system::{RoleSystem, RoleSystemConfig};
1314 AsyncRoleSystem::new(RoleSystem::with_storage(
1315 RoleMemoryStorage::new(),
1316 RoleSystemConfig::default(),
1317 ))
1318 }
1319
1320 #[cfg(feature = "enhanced-rbac")]
1321 #[tokio::test]
1322 async fn test_execute_migration_plan_with_role_system_creates_role() {
1323 let plan = create_test_plan();
1324 let config = MigrationConfig {
1325 dry_run: false,
1326 verbose: false,
1327 ..Default::default()
1328 };
1329 let rs = make_role_system();
1330
1331 let result = execute_migration_plan_with_role_system(&plan, &config, &rs)
1332 .await
1333 .unwrap();
1334
1335 assert_eq!(result.status, MigrationStatus::Completed);
1336 assert_eq!(result.metrics.roles_migrated, 1);
1337 let role = rs.get_role("test_role").await.unwrap();
1339 assert!(role.is_some(), "Expected 'test_role' to be registered");
1340 assert_eq!(role.unwrap().name(), "test_role");
1342 }
1343
1344 #[cfg(feature = "enhanced-rbac")]
1345 #[tokio::test]
1346 async fn test_execute_migration_plan_with_role_system_assigns_user() {
1347 use role_system::Subject;
1348 let mut plan = create_test_plan();
1349 plan.phases[0]
1351 .operations
1352 .push(MigrationOperation::AssignUserRole {
1353 user_id: "user1".to_string(),
1354 role_id: "test_role".to_string(),
1355 expiration: None,
1356 });
1357 let config = MigrationConfig {
1358 dry_run: false,
1359 verbose: false,
1360 ..Default::default()
1361 };
1362 let rs = make_role_system();
1363
1364 let result = execute_migration_plan_with_role_system(&plan, &config, &rs)
1365 .await
1366 .unwrap();
1367
1368 assert_eq!(result.status, MigrationStatus::Completed);
1369 assert_eq!(result.metrics.users_migrated, 1);
1370 let subject = Subject::new("user1");
1372 let roles = rs.get_subject_roles(&subject).await.unwrap();
1373 assert!(
1374 roles.iter().any(|r| r == "test_role"),
1375 "Expected user1 to have test_role"
1376 );
1377 }
1378
1379 #[cfg(feature = "enhanced-rbac")]
1380 #[tokio::test]
1381 async fn test_execute_migration_plan_permission_registry_feeds_create_role() {
1382 let config = MigrationConfig {
1384 dry_run: false,
1385 verbose: false,
1386 ..Default::default()
1387 };
1388 let plan = {
1389 let mut p = create_test_plan();
1390 p.phases[0].operations.insert(
1393 0,
1394 MigrationOperation::CreatePermission {
1395 permission_id: "read_users".to_string(),
1396 action: "read".to_string(),
1397 resource: "users".to_string(),
1398 conditions: Default::default(),
1399 },
1400 );
1401 if let MigrationOperation::CreateRole { permissions, .. } =
1403 &mut p.phases[0].operations[1]
1404 {
1405 *permissions = vec!["read_users".to_string()];
1406 }
1407 p
1408 };
1409 let rs = make_role_system();
1410
1411 let result = execute_migration_plan_with_role_system(&plan, &config, &rs)
1412 .await
1413 .unwrap();
1414
1415 assert_eq!(result.status, MigrationStatus::Completed);
1416 assert_eq!(result.metrics.permissions_migrated, 1);
1417 assert_eq!(result.metrics.roles_migrated, 1);
1418
1419 let role = rs.get_role("test_role").await.unwrap().unwrap();
1421 assert!(
1422 role.has_permission("read", "users", &Default::default()),
1423 "Expected role to have permission read:users"
1424 );
1425 }
1426
1427 #[tokio::test]
1428 async fn test_manifest_only_mode_completes_without_role_system() {
1429 let plan = create_test_plan();
1432 let config = MigrationConfig {
1433 dry_run: false,
1434 verbose: false,
1435 ..Default::default()
1436 };
1437 let result = execute_migration_plan(&plan, &config).await.unwrap();
1438 assert_eq!(result.status, MigrationStatus::Completed);
1439 assert_eq!(result.metrics.roles_migrated, 1);
1440 }
1441
1442 #[tokio::test]
1443 async fn test_create_full_backup_reads_directory() {
1444 let tmp = tempfile::tempdir().unwrap();
1445 tokio::fs::write(tmp.path().join("data.json"), r#"{"key":"val"}"#)
1446 .await
1447 .unwrap();
1448 tokio::fs::write(tmp.path().join("notes.txt"), "hello")
1449 .await
1450 .unwrap();
1451
1452 let config = MigrationConfig {
1453 working_directory: tmp.path().to_path_buf(),
1454 ..Default::default()
1455 };
1456 let backup_str = create_full_backup(&config).await.unwrap();
1457 let backup: serde_json::Value = serde_json::from_str(&backup_str).unwrap();
1458 assert_eq!(backup["backup_type"], "full");
1459 let entries = backup["entries"].as_array().unwrap();
1460 assert_eq!(entries.len(), 2);
1461 }
1462
1463 #[tokio::test]
1464 async fn test_create_incremental_backup_only_status_files() {
1465 let tmp = tempfile::tempdir().unwrap();
1466 tokio::fs::write(tmp.path().join("migration_status.json"), "{}")
1467 .await
1468 .unwrap();
1469 tokio::fs::write(tmp.path().join("v1_manifest.json"), "{}")
1470 .await
1471 .unwrap();
1472 tokio::fs::write(tmp.path().join("data.json"), "{}")
1473 .await
1474 .unwrap();
1475
1476 let config = MigrationConfig {
1477 working_directory: tmp.path().to_path_buf(),
1478 ..Default::default()
1479 };
1480 let backup_str = create_incremental_backup(&config).await.unwrap();
1481 let backup: serde_json::Value = serde_json::from_str(&backup_str).unwrap();
1482 assert_eq!(backup["backup_type"], "incremental");
1483 let entries = backup["entries"].as_array().unwrap();
1484 assert_eq!(entries.len(), 2);
1486 }
1487
1488 #[tokio::test]
1489 async fn test_create_config_backup_serializes_config() {
1490 let config = MigrationConfig {
1491 dry_run: true,
1492 verbose: true,
1493 ..Default::default()
1494 };
1495 let backup_str = create_config_backup(&config).await.unwrap();
1496 let backup: serde_json::Value = serde_json::from_str(&backup_str).unwrap();
1497 assert_eq!(backup["backup_type"], "config");
1498 assert_eq!(backup["migration_config"]["dry_run"], true);
1499 assert_eq!(backup["migration_config"]["verbose"], true);
1500 }
1501
1502 #[tokio::test]
1503 async fn test_create_data_backup_excludes_status_files() {
1504 let tmp = tempfile::tempdir().unwrap();
1505 tokio::fs::write(tmp.path().join("users.json"), "[]")
1506 .await
1507 .unwrap();
1508 tokio::fs::write(tmp.path().join("migration_status.json"), "{}")
1509 .await
1510 .unwrap();
1511
1512 let config = MigrationConfig {
1513 working_directory: tmp.path().to_path_buf(),
1514 ..Default::default()
1515 };
1516 let backup_str = create_data_backup(&config).await.unwrap();
1517 let backup: serde_json::Value = serde_json::from_str(&backup_str).unwrap();
1518 assert_eq!(backup["backup_type"], "data");
1519 let entries = backup["entries"].as_array().unwrap();
1520 assert_eq!(entries.len(), 1);
1522 assert_eq!(entries[0]["path"], "users.json");
1523 }
1524
1525 #[tokio::test]
1526 async fn test_create_full_backup_empty_directory() {
1527 let tmp = tempfile::tempdir().unwrap();
1528 let config = MigrationConfig {
1529 working_directory: tmp.path().to_path_buf(),
1530 ..Default::default()
1531 };
1532 let backup_str = create_full_backup(&config).await.unwrap();
1533 let backup: serde_json::Value = serde_json::from_str(&backup_str).unwrap();
1534 assert_eq!(backup["entries"].as_array().unwrap().len(), 0);
1535 }
1536}