Skip to main content

winterbaume_backup/
handlers.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use http::header::HeaderName;
7use serde_json::{Value, json};
8use winterbaume_core::{
9    BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, StatefulService,
10    default_account_id, extract_path, extract_query_string, parse_query_string, percent_decode,
11    rest_json_error,
12};
13
14use crate::state::{BackupError, BackupState};
15use crate::views::BackupStateView;
16use crate::wire;
17
18const X_AMZN_ERRORTYPE: HeaderName = HeaderName::from_static("x-amzn-errortype");
19
20pub struct BackupService {
21    pub(crate) state: Arc<BackendState<BackupState>>,
22    pub(crate) notifier: StateChangeNotifier<BackupStateView>,
23}
24
25impl BackupService {
26    pub fn new() -> Self {
27        Self {
28            state: Arc::new(BackendState::new()),
29            notifier: StateChangeNotifier::new(),
30        }
31    }
32}
33
34impl Default for BackupService {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl MockService for BackupService {
41    fn service_name(&self) -> &str {
42        "backup"
43    }
44
45    fn url_patterns(&self) -> Vec<&str> {
46        vec![
47            r"https?://backup\..*\.amazonaws\.com",
48            r"https?://backup\.amazonaws\.com",
49        ]
50    }
51
52    fn handle(
53        &self,
54        request: MockRequest,
55    ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
56        Box::pin(async move { self.dispatch(request).await })
57    }
58}
59
60impl BackupService {
61    async fn dispatch(&self, request: MockRequest) -> MockResponse {
62        let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
63        let account_id = default_account_id();
64        let state = self.state.get(account_id, &region);
65
66        let path = extract_path(&request.uri);
67        let method = request.method.as_str();
68        // Validate JSON body up-front; typed deserialisers re-parse per operation.
69        if !request.body.is_empty() && serde_json::from_slice::<Value>(&request.body).is_err() {
70            return rest_json_error(400, "BadRequestException", "Invalid JSON body");
71        }
72        let query = parse_query_string(extract_query_string(&request.uri));
73
74        let segments: Vec<&str> = path.trim_start_matches('/').split('/').collect();
75
76        let response = match (method, segments.as_slice()) {
77            // PUT /backup-vaults/{name} - CreateBackupVault
78            ("PUT", ["backup-vaults", name]) if !name.is_empty() => {
79                let vault_name = percent_decode(name);
80                self.handle_create_backup_vault(
81                    &state,
82                    &vault_name,
83                    &region,
84                    account_id,
85                    &request,
86                    &query,
87                )
88                .await
89            }
90            // GET /backup-vaults/{name} - DescribeBackupVault
91            ("GET", ["backup-vaults", name]) if !name.is_empty() => {
92                let vault_name = percent_decode(name);
93                self.handle_describe_backup_vault(&state, &vault_name).await
94            }
95            // DELETE /backup-vaults/{name} - DeleteBackupVault
96            ("DELETE", ["backup-vaults", name]) if !name.is_empty() => {
97                let vault_name = percent_decode(name);
98                self.handle_delete_backup_vault(&state, &vault_name).await
99            }
100            // GET /backup-vaults - ListBackupVaults
101            ("GET", ["backup-vaults"]) => self.handle_list_backup_vaults(&state).await,
102
103            // PUT /backup/plans - CreateBackupPlan
104            ("PUT", ["backup", "plans"]) => {
105                self.handle_create_backup_plan(&state, &request, &query, &region, account_id)
106                    .await
107            }
108            // GET /backup/plans/{BackupPlanId} - GetBackupPlan
109            ("GET", ["backup", "plans", plan_id]) if !plan_id.is_empty() => {
110                let plan_id = percent_decode(plan_id);
111                self.handle_get_backup_plan(&state, &plan_id).await
112            }
113            // DELETE /backup/plans/{BackupPlanId} - DeleteBackupPlan
114            ("DELETE", ["backup", "plans", plan_id]) if !plan_id.is_empty() => {
115                let plan_id = percent_decode(plan_id);
116                self.handle_delete_backup_plan(&state, &plan_id).await
117            }
118            // GET /backup/plans - ListBackupPlans
119            ("GET", ["backup", "plans"]) => self.handle_list_backup_plans(&state).await,
120
121            // POST /audit/report-plans - CreateReportPlan
122            ("POST", ["audit", "report-plans"]) => {
123                self.handle_create_report_plan(&state, &request, &query, &region, account_id)
124                    .await
125            }
126            // GET /audit/report-plans/{ReportPlanName} - DescribeReportPlan
127            ("GET", ["audit", "report-plans", name]) if !name.is_empty() => {
128                let name = percent_decode(name);
129                self.handle_describe_report_plan(&state, &name).await
130            }
131            // DELETE /audit/report-plans/{ReportPlanName} - DeleteReportPlan
132            ("DELETE", ["audit", "report-plans", name]) if !name.is_empty() => {
133                let name = percent_decode(name);
134                self.handle_delete_report_plan(&state, &name).await
135            }
136            // GET /audit/report-plans - ListReportPlans
137            ("GET", ["audit", "report-plans"]) => self.handle_list_report_plans(&state).await,
138
139            // PUT /backup-vaults/{BackupVaultName}/vault-lock - PutBackupVaultLockConfiguration
140            ("PUT", ["backup-vaults", name, "vault-lock"]) if !name.is_empty() => {
141                let vault_name = percent_decode(name);
142                self.handle_put_backup_vault_lock_configuration(
143                    &state,
144                    &vault_name,
145                    &request,
146                    &query,
147                )
148                .await
149            }
150            // DELETE /backup-vaults/{BackupVaultName}/vault-lock - DeleteBackupVaultLockConfiguration
151            ("DELETE", ["backup-vaults", name, "vault-lock"]) if !name.is_empty() => {
152                let vault_name = percent_decode(name);
153                self.handle_delete_backup_vault_lock_configuration(&state, &vault_name)
154                    .await
155            }
156
157            // GET /tags/{ResourceArn} - ListTags
158            ("GET", ["tags", rest @ ..]) if !rest.is_empty() => {
159                // The resource ARN is URL-encoded and may contain slashes
160                let resource_arn = percent_decode(&segments[1..].join("/"));
161                self.handle_list_tags(&state, &resource_arn).await
162            }
163            // POST /tags/{ResourceArn} - TagResource
164            ("POST", ["tags", rest @ ..]) if !rest.is_empty() => {
165                let resource_arn = percent_decode(&segments[1..].join("/"));
166                self.handle_tag_resource(&state, &resource_arn, &request, &query)
167                    .await
168            }
169            // POST /untag/{ResourceArn} - UntagResource
170            ("POST", ["untag", rest @ ..]) if !rest.is_empty() => {
171                let resource_arn = percent_decode(&segments[1..].join("/"));
172                self.handle_untag_resource(&state, &resource_arn, &request, &query)
173                    .await
174            }
175
176            // PUT /backup-vaults/{BackupVaultName}/mpaApprovalTeam => AssociateBackupVaultMpaApprovalTeam
177            ("PUT", ["backup-vaults", name, "mpaApprovalTeam"]) if !name.is_empty() => {
178                self.handle_associate_backup_vault_mpa_approval_team().await
179            }
180            // DELETE /legal-holds/{LegalHoldId} => CancelLegalHold
181            ("DELETE", ["legal-holds", id]) => {
182                let hold_id = percent_decode(id);
183                self.handle_cancel_legal_hold(&state, &hold_id).await
184            }
185            // PUT /backup/plans/{BackupPlanId}/selections => CreateBackupSelection
186            ("PUT", ["backup", "plans", plan_id, "selections"]) => {
187                let plan_id = percent_decode(plan_id);
188                self.handle_create_backup_selection(&state, &plan_id, &request, &query)
189                    .await
190            }
191            // POST /audit/frameworks => CreateFramework
192            ("POST", ["audit", "frameworks"]) => {
193                self.handle_create_framework(&state, &request, &query, &region, account_id)
194                    .await
195            }
196            // POST /legal-holds => CreateLegalHold
197            ("POST", ["legal-holds"]) => {
198                self.handle_create_legal_hold(&state, &request, &query, &region, account_id)
199                    .await
200            }
201            // PUT /logically-air-gapped-backup-vaults/{BackupVaultName} => CreateLogicallyAirGappedBackupVault
202            ("PUT", ["logically-air-gapped-backup-vaults", name]) => {
203                let vault_name = percent_decode(name);
204                self.handle_create_logically_air_gapped_backup_vault(
205                    &state,
206                    &vault_name,
207                    &region,
208                    account_id,
209                    &request,
210                    &query,
211                )
212                .await
213            }
214            // PUT /restore-access-backup-vaults => CreateRestoreAccessBackupVault
215            ("PUT", ["restore-access-backup-vaults"]) => {
216                self.handle_create_restore_access_backup_vault(
217                    &state, &request, &query, &region, account_id,
218                )
219                .await
220            }
221            // PUT /restore-testing/plans => CreateRestoreTestingPlan
222            ("PUT", ["restore-testing", "plans"]) => {
223                self.handle_create_restore_testing_plan(
224                    &state, &request, &query, &region, account_id,
225                )
226                .await
227            }
228            // PUT /restore-testing/plans/{RestoreTestingPlanName}/selections => CreateRestoreTestingSelection
229            ("PUT", ["restore-testing", "plans", plan_name, "selections"]) => {
230                let plan_name = percent_decode(plan_name);
231                self.handle_create_restore_testing_selection(&state, &plan_name, &request, &query)
232                    .await
233            }
234            // PUT /tiering-configurations => CreateTieringConfiguration
235            ("PUT", ["tiering-configurations"]) => {
236                self.handle_create_tiering_configuration(
237                    &state, &request, &query, &region, account_id,
238                )
239                .await
240            }
241            // DELETE /backup/plans/{BackupPlanId}/selections/{SelectionId} => DeleteBackupSelection
242            ("DELETE", ["backup", "plans", plan_id, "selections", sel_id]) => {
243                let plan_id = percent_decode(plan_id);
244                let sel_id = percent_decode(sel_id);
245                self.handle_delete_backup_selection(&state, &plan_id, &sel_id)
246                    .await
247            }
248            // DELETE /backup-vaults/{BackupVaultName}/access-policy => DeleteBackupVaultAccessPolicy
249            ("DELETE", ["backup-vaults", name, "access-policy"]) => {
250                let vault_name = percent_decode(name);
251                self.handle_delete_backup_vault_access_policy(&state, &vault_name)
252                    .await
253            }
254            // DELETE /backup-vaults/{BackupVaultName}/notification-configuration => DeleteBackupVaultNotifications
255            ("DELETE", ["backup-vaults", name, "notification-configuration"]) => {
256                let vault_name = percent_decode(name);
257                self.handle_delete_backup_vault_notifications(&state, &vault_name)
258                    .await
259            }
260            // DELETE /audit/frameworks/{FrameworkName} => DeleteFramework
261            ("DELETE", ["audit", "frameworks", name]) => {
262                let fw_name = percent_decode(name);
263                self.handle_delete_framework(&state, &fw_name).await
264            }
265            // DELETE /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn} => DeleteRecoveryPoint
266            ("DELETE", ["backup-vaults", vault, "recovery-points", rp]) => {
267                let vault_name = percent_decode(vault);
268                let rp_arn = percent_decode(rp);
269                self.handle_delete_recovery_point(&state, &vault_name, &rp_arn)
270                    .await
271            }
272            // DELETE /restore-testing/plans/{RestoreTestingPlanName} => DeleteRestoreTestingPlan
273            ("DELETE", ["restore-testing", "plans", plan_name]) => {
274                let plan_name = percent_decode(plan_name);
275                self.handle_delete_restore_testing_plan(&state, &plan_name)
276                    .await
277            }
278            // DELETE /restore-testing/plans/{RestoreTestingPlanName}/selections/{RestoreTestingSelectionName} => DeleteRestoreTestingSelection
279            (
280                "DELETE",
281                [
282                    "restore-testing",
283                    "plans",
284                    plan_name,
285                    "selections",
286                    sel_name,
287                ],
288            ) => {
289                let plan_name = percent_decode(plan_name);
290                let sel_name = percent_decode(sel_name);
291                self.handle_delete_restore_testing_selection(&state, &plan_name, &sel_name)
292                    .await
293            }
294            // DELETE /tiering-configurations/{TieringConfigurationName} => DeleteTieringConfiguration
295            ("DELETE", ["tiering-configurations", name]) => {
296                let config_name = percent_decode(name);
297                self.handle_delete_tiering_configuration(&state, &config_name)
298                    .await
299            }
300            // GET /backup-jobs/{BackupJobId} => DescribeBackupJob
301            ("GET", ["backup-jobs", id]) => {
302                let job_id = percent_decode(id);
303                self.handle_describe_backup_job(&state, &job_id).await
304            }
305            // GET /copy-jobs/{CopyJobId} => DescribeCopyJob
306            ("GET", ["copy-jobs", id]) => {
307                let copy_job_id = percent_decode(id);
308                self.handle_describe_copy_job(&state, &copy_job_id).await
309            }
310            // GET /audit/frameworks/{FrameworkName} => DescribeFramework
311            ("GET", ["audit", "frameworks", name]) => {
312                let fw_name = percent_decode(name);
313                self.handle_describe_framework(&state, &fw_name).await
314            }
315            // GET /global-settings => DescribeGlobalSettings
316            ("GET", ["global-settings"]) => self.handle_describe_global_settings(&state).await,
317            // GET /resources/{ResourceArn} => DescribeProtectedResource
318            ("GET", ["resources", rest @ ..]) if rest.len() == 1 => {
319                let resource_arn = percent_decode(&segments[1..].join("/"));
320                self.handle_describe_protected_resource(&state, &resource_arn)
321                    .await
322            }
323            // GET /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn} => DescribeRecoveryPoint
324            ("GET", ["backup-vaults", vault, "recovery-points", rp]) => {
325                let vault_name = percent_decode(vault);
326                let rp_arn = percent_decode(rp);
327                self.handle_describe_recovery_point(&state, &vault_name, &rp_arn)
328                    .await
329            }
330            // GET /account-settings => DescribeRegionSettings
331            ("GET", ["account-settings"]) => self.handle_describe_region_settings(&state).await,
332            // GET /audit/report-jobs/{ReportJobId} => DescribeReportJob
333            ("GET", ["audit", "report-jobs", id]) => {
334                let job_id = percent_decode(id);
335                self.handle_describe_report_job(&state, &job_id).await
336            }
337            // GET /restore-jobs/{RestoreJobId} => DescribeRestoreJob
338            ("GET", ["restore-jobs", id]) => {
339                let job_id = percent_decode(id);
340                self.handle_describe_restore_job(&state, &job_id).await
341            }
342            // GET /scan/jobs/{ScanJobId} => DescribeScanJob
343            ("GET", ["scan", "jobs", id]) => {
344                let scan_job_id = percent_decode(id);
345                self.handle_describe_scan_job(&state, &scan_job_id).await
346            }
347            // POST /backup-vaults/{BackupVaultName}/mpaApprovalTeam?delete => DisassociateBackupVaultMpaApprovalTeam
348            ("POST", ["backup-vaults", _name, "mpaApprovalTeam"]) => {
349                self.handle_disassociate_backup_vault_mpa_approval_team()
350                    .await
351            }
352            // POST /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn}/disassociate => DisassociateRecoveryPoint
353            (
354                "POST",
355                [
356                    "backup-vaults",
357                    _vault,
358                    "recovery-points",
359                    _rp,
360                    "disassociate",
361                ],
362            ) => self.handle_disassociate_recovery_point().await,
363            // DELETE /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn}/parentAssociation => DisassociateRecoveryPointFromParent
364            (
365                "DELETE",
366                [
367                    "backup-vaults",
368                    _vault,
369                    "recovery-points",
370                    _rp,
371                    "parentAssociation",
372                ],
373            ) => self.handle_disassociate_recovery_point_from_parent().await,
374            // GET /backup/plans/{BackupPlanId}/toTemplate => ExportBackupPlanTemplate
375            ("GET", ["backup", "plans", id, "toTemplate"]) => {
376                let plan_id = percent_decode(id);
377                self.handle_export_backup_plan_template(&state, &plan_id)
378                    .await
379            }
380            // POST /backup/template/json/toPlan => GetBackupPlanFromJSON
381            ("POST", ["backup", "template", "json", "toPlan"]) => {
382                self.handle_get_backup_plan_from_j_s_o_n(&request, &query)
383                    .await
384            }
385            // GET /backup/template/plans/{BackupPlanTemplateId}/toPlan => GetBackupPlanFromTemplate
386            ("GET", ["backup", "template", "plans", _id, "toPlan"]) => {
387                self.handle_get_backup_plan_from_template().await
388            }
389            // GET /backup/plans/{BackupPlanId}/selections/{SelectionId} => GetBackupSelection
390            ("GET", ["backup", "plans", plan_id, "selections", sel_id]) => {
391                let plan_id = percent_decode(plan_id);
392                let sel_id = percent_decode(sel_id);
393                self.handle_get_backup_selection(&state, &plan_id, &sel_id)
394                    .await
395            }
396            // GET /backup-vaults/{BackupVaultName}/access-policy => GetBackupVaultAccessPolicy
397            ("GET", ["backup-vaults", name, "access-policy"]) => {
398                let vault_name = percent_decode(name);
399                self.handle_get_backup_vault_access_policy(&state, &vault_name)
400                    .await
401            }
402            // GET /backup-vaults/{BackupVaultName}/notification-configuration => GetBackupVaultNotifications
403            ("GET", ["backup-vaults", name, "notification-configuration"]) => {
404                let vault_name = percent_decode(name);
405                self.handle_get_backup_vault_notifications(&state, &vault_name)
406                    .await
407            }
408            // GET /legal-holds/{LegalHoldId} => GetLegalHold
409            ("GET", ["legal-holds", id]) => {
410                let hold_id = percent_decode(id);
411                self.handle_get_legal_hold(&state, &hold_id).await
412            }
413            // GET /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn}/index => GetRecoveryPointIndexDetails
414            ("GET", ["backup-vaults", vault, "recovery-points", rp, "index"]) => {
415                let vault_name = percent_decode(vault);
416                let rp_arn = percent_decode(rp);
417                self.handle_get_recovery_point_index_details(&state, &vault_name, &rp_arn)
418                    .await
419            }
420            // GET /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn}/restore-metadata => GetRecoveryPointRestoreMetadata
421            (
422                "GET",
423                [
424                    "backup-vaults",
425                    vault,
426                    "recovery-points",
427                    rp,
428                    "restore-metadata",
429                ],
430            ) => {
431                let vault_name = percent_decode(vault);
432                let rp_arn = percent_decode(rp);
433                self.handle_get_recovery_point_restore_metadata(&state, &vault_name, &rp_arn)
434                    .await
435            }
436            // GET /restore-jobs/{RestoreJobId}/metadata => GetRestoreJobMetadata
437            ("GET", ["restore-jobs", id, "metadata"]) => {
438                let job_id = percent_decode(id);
439                self.handle_get_restore_job_metadata(&state, &job_id).await
440            }
441            // GET /restore-testing/inferred-metadata => GetRestoreTestingInferredMetadata
442            ("GET", ["restore-testing", "inferred-metadata"]) => {
443                self.handle_get_restore_testing_inferred_metadata().await
444            }
445            // GET /restore-testing/plans/{RestoreTestingPlanName} => GetRestoreTestingPlan
446            ("GET", ["restore-testing", "plans", name]) => {
447                let plan_name = percent_decode(name);
448                self.handle_get_restore_testing_plan(&state, &plan_name)
449                    .await
450            }
451            // GET /restore-testing/plans/{RestoreTestingPlanName}/selections/{RestoreTestingSelectionName} => GetRestoreTestingSelection
452            (
453                "GET",
454                [
455                    "restore-testing",
456                    "plans",
457                    plan_name,
458                    "selections",
459                    sel_name,
460                ],
461            ) => {
462                let plan_name = percent_decode(plan_name);
463                let sel_name = percent_decode(sel_name);
464                self.handle_get_restore_testing_selection(&state, &plan_name, &sel_name)
465                    .await
466            }
467            // GET /supported-resource-types => GetSupportedResourceTypes
468            ("GET", ["supported-resource-types"]) => {
469                self.handle_get_supported_resource_types().await
470            }
471            // GET /tiering-configurations/{TieringConfigurationName} => GetTieringConfiguration
472            ("GET", ["tiering-configurations", name]) => {
473                let config_name = percent_decode(name);
474                self.handle_get_tiering_configuration(&state, &config_name)
475                    .await
476            }
477            // GET /audit/backup-job-summaries => ListBackupJobSummaries
478            ("GET", ["audit", "backup-job-summaries"]) => {
479                self.handle_list_backup_job_summaries(&state, account_id, &region)
480                    .await
481            }
482            // GET /backup-jobs => ListBackupJobs
483            ("GET", ["backup-jobs"]) => self.handle_list_backup_jobs(&state).await,
484            // GET /backup/template/plans => ListBackupPlanTemplates
485            ("GET", ["backup", "template", "plans"]) => {
486                self.handle_list_backup_plan_templates(&state).await
487            }
488            // GET /backup/plans/{BackupPlanId}/versions => ListBackupPlanVersions
489            ("GET", ["backup", "plans", id, "versions"]) => {
490                let plan_id = percent_decode(id);
491                self.handle_list_backup_plan_versions(&state, &plan_id)
492                    .await
493            }
494            // GET /backup/plans/{BackupPlanId}/selections => ListBackupSelections
495            ("GET", ["backup", "plans", plan_id, "selections"]) => {
496                let plan_id = percent_decode(plan_id);
497                self.handle_list_backup_selections(&state, &plan_id).await
498            }
499            // GET /audit/copy-job-summaries => ListCopyJobSummaries
500            ("GET", ["audit", "copy-job-summaries"]) => {
501                self.handle_list_copy_job_summaries(&state).await
502            }
503            // GET /copy-jobs => ListCopyJobs
504            ("GET", ["copy-jobs"]) => self.handle_list_copy_jobs(&state).await,
505            // GET /audit/frameworks => ListFrameworks
506            ("GET", ["audit", "frameworks"]) => self.handle_list_frameworks(&state).await,
507            // GET /indexes/recovery-point => ListIndexedRecoveryPoints
508            ("GET", ["indexes", "recovery-point"]) => {
509                self.handle_list_indexed_recovery_points(&state).await
510            }
511            // GET /legal-holds => ListLegalHolds
512            ("GET", ["legal-holds"]) => self.handle_list_legal_holds(&state).await,
513            // GET /resources => ListProtectedResources
514            ("GET", ["resources"]) => self.handle_list_protected_resources(&state).await,
515            // GET /backup-vaults/{BackupVaultName}/resources => ListProtectedResourcesByBackupVault
516            ("GET", ["backup-vaults", name, "resources"]) => {
517                let vault_name = percent_decode(name);
518                self.handle_list_protected_resources_by_backup_vault(&state, &vault_name)
519                    .await
520            }
521            // GET /backup-vaults/{BackupVaultName}/recovery-points => ListRecoveryPointsByBackupVault
522            ("GET", ["backup-vaults", name, "recovery-points"]) => {
523                let vault_name = percent_decode(name);
524                self.handle_list_recovery_points_by_backup_vault(&state, &vault_name)
525                    .await
526            }
527            // GET /legal-holds/{LegalHoldId}/recovery-points => ListRecoveryPointsByLegalHold
528            ("GET", ["legal-holds", id, "recovery-points"]) => {
529                let hold_id = percent_decode(id);
530                self.handle_list_recovery_points_by_legal_hold(&state, &hold_id)
531                    .await
532            }
533            // GET /resources/{ResourceArn}/recovery-points => ListRecoveryPointsByResource
534            ("GET", ["resources", rest @ ..])
535                if rest.len() >= 2 && rest.last() == Some(&"recovery-points") =>
536            {
537                // Resource ARN is everything between /resources/ and /recovery-points
538                let resource_segments = &segments[1..segments.len() - 1];
539                let resource_arn = percent_decode(&resource_segments.join("/"));
540                self.handle_list_recovery_points_by_resource(&state, &resource_arn)
541                    .await
542            }
543            // GET /audit/report-jobs => ListReportJobs
544            ("GET", ["audit", "report-jobs"]) => {
545                self.handle_list_report_jobs(&state, &request.uri).await
546            }
547            // GET /logically-air-gapped-backup-vaults/{BackupVaultName}/restore-access-backup-vaults => ListRestoreAccessBackupVaults
548            (
549                "GET",
550                [
551                    "logically-air-gapped-backup-vaults",
552                    _name,
553                    "restore-access-backup-vaults",
554                ],
555            ) => self.handle_list_restore_access_backup_vaults().await,
556            // GET /audit/restore-job-summaries => ListRestoreJobSummaries
557            ("GET", ["audit", "restore-job-summaries"]) => {
558                self.handle_list_restore_job_summaries(&state, account_id, &region)
559                    .await
560            }
561            // GET /restore-jobs => ListRestoreJobs
562            ("GET", ["restore-jobs"]) => self.handle_list_restore_jobs(&state).await,
563            // GET /resources/{ResourceArn}/restore-jobs => ListRestoreJobsByProtectedResource
564            ("GET", ["resources", rest @ ..])
565                if rest.len() >= 2 && rest.last() == Some(&"restore-jobs") =>
566            {
567                let resource_segments = &segments[1..segments.len() - 1];
568                let resource_arn = percent_decode(&resource_segments.join("/"));
569                self.handle_list_restore_jobs_by_protected_resource(&state, &resource_arn)
570                    .await
571            }
572            // GET /restore-testing/plans => ListRestoreTestingPlans
573            ("GET", ["restore-testing", "plans"]) => {
574                self.handle_list_restore_testing_plans(&state).await
575            }
576            // GET /restore-testing/plans/{RestoreTestingPlanName}/selections => ListRestoreTestingSelections
577            ("GET", ["restore-testing", "plans", plan_name, "selections"]) => {
578                let plan_name = percent_decode(plan_name);
579                self.handle_list_restore_testing_selections(&state, &plan_name)
580                    .await
581            }
582            // GET /audit/scan-job-summaries => ListScanJobSummaries
583            ("GET", ["audit", "scan-job-summaries"]) => {
584                self.handle_list_scan_job_summaries(&state).await
585            }
586            // GET /scan/jobs => ListScanJobs
587            ("GET", ["scan", "jobs"]) => self.handle_list_scan_jobs(&state).await,
588            // GET /tiering-configurations => ListTieringConfigurations
589            ("GET", ["tiering-configurations"]) => {
590                self.handle_list_tiering_configurations(&state).await
591            }
592            // PUT /backup-vaults/{BackupVaultName}/access-policy => PutBackupVaultAccessPolicy
593            ("PUT", ["backup-vaults", name, "access-policy"]) => {
594                let vault_name = percent_decode(name);
595                self.handle_put_backup_vault_access_policy(
596                    &state,
597                    &vault_name,
598                    &region,
599                    account_id,
600                    &request,
601                    &query,
602                )
603                .await
604            }
605            // PUT /backup-vaults/{BackupVaultName}/notification-configuration => PutBackupVaultNotifications
606            ("PUT", ["backup-vaults", name, "notification-configuration"]) => {
607                let vault_name = percent_decode(name);
608                self.handle_put_backup_vault_notifications(
609                    &state,
610                    &vault_name,
611                    &region,
612                    account_id,
613                    &request,
614                    &query,
615                )
616                .await
617            }
618            // PUT /restore-jobs/{RestoreJobId}/validations => PutRestoreValidationResult
619            ("PUT", ["restore-jobs", id, "validations"]) => {
620                let job_id = percent_decode(id);
621                self.handle_put_restore_validation_result(&state, &job_id, &request, &query)
622                    .await
623            }
624            // DELETE /logically-air-gapped-backup-vaults/{BackupVaultName}/restore-access-backup-vaults/{RestoreAccessBackupVaultArn} => RevokeRestoreAccessBackupVault
625            (
626                "DELETE",
627                [
628                    "logically-air-gapped-backup-vaults",
629                    _vault,
630                    "restore-access-backup-vaults",
631                    _arn,
632                ],
633            ) => self.handle_revoke_restore_access_backup_vault().await, // No state needed - passthrough
634            // PUT /backup-jobs => StartBackupJob
635            ("PUT", ["backup-jobs"]) => {
636                self.handle_start_backup_job(&state, &request, &query, &region, account_id)
637                    .await
638            }
639            // PUT /copy-jobs => StartCopyJob
640            ("PUT", ["copy-jobs"]) => {
641                self.handle_start_copy_job(&state, &request, &query, &region, account_id)
642                    .await
643            }
644            // POST /audit/report-jobs/{ReportPlanName} => StartReportJob
645            ("POST", ["audit", "report-jobs", name]) => {
646                let plan_name = percent_decode(name);
647                self.handle_start_report_job(&state, &plan_name, &region, account_id)
648                    .await
649            }
650            // PUT /restore-jobs => StartRestoreJob
651            ("PUT", ["restore-jobs"]) => {
652                self.handle_start_restore_job(&state, &request, &query, account_id)
653                    .await
654            }
655            // PUT /scan/job => StartScanJob
656            ("PUT", ["scan", "job"]) => {
657                self.handle_start_scan_job(&state, &request, &query, &region, account_id)
658                    .await
659            }
660            // POST /backup-jobs/{BackupJobId} => StopBackupJob
661            ("POST", ["backup-jobs", id]) => {
662                let job_id = percent_decode(id);
663                self.handle_stop_backup_job(&state, &job_id).await
664            }
665            // POST /backup/plans/{BackupPlanId} => UpdateBackupPlan
666            ("POST", ["backup", "plans", id]) => {
667                let plan_id = percent_decode(id);
668                self.handle_update_backup_plan(&state, &plan_id, &request, &query)
669                    .await
670            }
671            // PUT /audit/frameworks/{FrameworkName} => UpdateFramework
672            ("PUT", ["audit", "frameworks", name]) => {
673                let fw_name = percent_decode(name);
674                self.handle_update_framework(&state, &fw_name, &request, &query)
675                    .await
676            }
677            // PUT /global-settings => UpdateGlobalSettings
678            ("PUT", ["global-settings"]) => {
679                self.handle_update_global_settings(&state, &request, &query)
680                    .await
681            }
682            // POST /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn}/index => UpdateRecoveryPointIndexSettings
683            ("POST", ["backup-vaults", vault, "recovery-points", rp, "index"]) => {
684                let vault_name = percent_decode(vault);
685                let rp_arn = percent_decode(rp);
686                self.handle_update_recovery_point_index_settings(
687                    &state,
688                    &vault_name,
689                    &rp_arn,
690                    &request,
691                    &query,
692                )
693                .await
694            }
695            // POST /backup-vaults/{BackupVaultName}/recovery-points/{RecoveryPointArn} => UpdateRecoveryPointLifecycle
696            ("POST", ["backup-vaults", vault, "recovery-points", rp]) => {
697                let vault_name = percent_decode(vault);
698                let rp_arn = percent_decode(rp);
699                self.handle_update_recovery_point_lifecycle(
700                    &state,
701                    &vault_name,
702                    &rp_arn,
703                    &request,
704                    &query,
705                )
706                .await
707            }
708            // PUT /account-settings => UpdateRegionSettings
709            ("PUT", ["account-settings"]) => {
710                self.handle_update_region_settings(&state, &request, &query)
711                    .await
712            }
713            // PUT /audit/report-plans/{ReportPlanName} => UpdateReportPlan
714            ("PUT", ["audit", "report-plans", name]) => {
715                let plan_name = percent_decode(name);
716                self.handle_update_report_plan(&state, &plan_name, &request, &query)
717                    .await
718            }
719            // PUT /restore-testing/plans/{RestoreTestingPlanName} => UpdateRestoreTestingPlan
720            ("PUT", ["restore-testing", "plans", name]) => {
721                let plan_name = percent_decode(name);
722                self.handle_update_restore_testing_plan(&state, &plan_name, &request, &query)
723                    .await
724            }
725            // PUT /restore-testing/plans/{RestoreTestingPlanName}/selections/{RestoreTestingSelectionName} => UpdateRestoreTestingSelection
726            (
727                "PUT",
728                [
729                    "restore-testing",
730                    "plans",
731                    plan_name,
732                    "selections",
733                    sel_name,
734                ],
735            ) => {
736                let plan_name = percent_decode(plan_name);
737                let sel_name = percent_decode(sel_name);
738                self.handle_update_restore_testing_selection(
739                    &state, &plan_name, &sel_name, &request, &query,
740                )
741                .await
742            }
743            // PUT /tiering-configurations/{TieringConfigurationName} => UpdateTieringConfiguration
744            ("PUT", ["tiering-configurations", name]) => {
745                let config_name = percent_decode(name);
746                self.handle_update_tiering_configuration(&state, &config_name, &request, &query)
747                    .await
748            }
749            _ => rest_json_error(404, "UnknownOperationException", "Not found"),
750        };
751
752        if matches!(method, "PUT" | "POST" | "DELETE") && response.status / 100 == 2 {
753            self.notify_state_changed(account_id, &region).await;
754        }
755        response
756    }
757
758    async fn handle_create_backup_vault(
759        &self,
760        state: &Arc<tokio::sync::RwLock<BackupState>>,
761        vault_name: &str,
762        region: &str,
763        account_id: &str,
764        request: &MockRequest,
765        query: &HashMap<String, String>,
766    ) -> MockResponse {
767        let labels: &[(&str, &str)] = &[("BackupVaultName", vault_name)];
768        let input = match wire::deserialize_create_backup_vault_request(request, labels, query) {
769            Ok(v) => v,
770            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
771        };
772        let arn = format!("arn:aws:backup:{region}:{account_id}:backup-vault:{vault_name}");
773
774        let tags: HashMap<String, String> = input.backup_vault_tags.unwrap_or_default();
775
776        let mut state = state.write().await;
777        match state.create_backup_vault(vault_name, &arn, tags) {
778            Ok(vault) => {
779                let creation_epoch = vault.creation_date.timestamp() as f64
780                    + (vault.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
781                wire::serialize_create_backup_vault_response(&wire::CreateBackupVaultOutput {
782                    backup_vault_name: Some(vault.backup_vault_name.clone()),
783                    backup_vault_arn: Some(vault.backup_vault_arn.clone()),
784                    creation_date: Some(creation_epoch),
785                    ..Default::default()
786                })
787            }
788            Err(e) => backup_error_response(&e),
789        }
790    }
791
792    async fn handle_describe_backup_vault(
793        &self,
794        state: &Arc<tokio::sync::RwLock<BackupState>>,
795        vault_name: &str,
796    ) -> MockResponse {
797        let state = state.read().await;
798        match state.describe_backup_vault(vault_name) {
799            Ok(vault) => {
800                let creation_epoch = vault.creation_date.timestamp() as f64
801                    + (vault.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
802                wire::serialize_describe_backup_vault_response(&wire::DescribeBackupVaultOutput {
803                    backup_vault_name: Some(vault.backup_vault_name.clone()),
804                    backup_vault_arn: Some(vault.backup_vault_arn.clone()),
805                    creation_date: Some(creation_epoch),
806                    number_of_recovery_points: Some(vault.number_of_recovery_points),
807                    locked: Some(vault.locked),
808                    min_retention_days: vault.min_retention_days,
809                    max_retention_days: vault.max_retention_days,
810                    ..Default::default()
811                })
812            }
813            Err(e) => backup_error_response(&e),
814        }
815    }
816
817    async fn handle_delete_backup_vault(
818        &self,
819        state: &Arc<tokio::sync::RwLock<BackupState>>,
820        vault_name: &str,
821    ) -> MockResponse {
822        let mut state = state.write().await;
823        match state.delete_backup_vault(vault_name) {
824            Ok(()) => wire::serialize_delete_backup_vault_response(),
825            Err(e) => backup_error_response(&e),
826        }
827    }
828
829    async fn handle_list_backup_vaults(
830        &self,
831        state: &Arc<tokio::sync::RwLock<BackupState>>,
832    ) -> MockResponse {
833        let state = state.read().await;
834        let vaults = state.list_backup_vaults();
835        let entries: Vec<wire::BackupVaultListMember> = vaults
836            .iter()
837            .map(|v| {
838                let creation_epoch = v.creation_date.timestamp() as f64
839                    + (v.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
840                let lock_date_epoch = v.lock_date.map(|ld| {
841                    ld.timestamp() as f64 + (ld.timestamp_subsec_millis() as f64 / 1000.0)
842                });
843                wire::BackupVaultListMember {
844                    backup_vault_name: Some(v.backup_vault_name.clone()),
845                    backup_vault_arn: Some(v.backup_vault_arn.clone()),
846                    creation_date: Some(creation_epoch),
847                    number_of_recovery_points: Some(v.number_of_recovery_points),
848                    locked: if v.locked { Some(true) } else { None },
849                    min_retention_days: v.min_retention_days,
850                    max_retention_days: v.max_retention_days,
851                    lock_date: lock_date_epoch,
852                    ..Default::default()
853                }
854            })
855            .collect();
856        wire::serialize_list_backup_vaults_response(&wire::ListBackupVaultsOutput {
857            backup_vault_list: Some(entries),
858            ..Default::default()
859        })
860    }
861
862    // --- Backup Plan handlers ---
863
864    async fn handle_create_backup_plan(
865        &self,
866        state: &Arc<tokio::sync::RwLock<BackupState>>,
867        request: &MockRequest,
868        query: &HashMap<String, String>,
869        region: &str,
870        account_id: &str,
871    ) -> MockResponse {
872        // Inspect raw body to preserve original "missing field" error messages.
873        let raw: Value = if request.body.is_empty() {
874            Value::Null
875        } else {
876            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
877        };
878        if raw.get("BackupPlan").is_none() {
879            return rest_json_error(
880                400,
881                "InvalidParameterValueException",
882                "Missing 'BackupPlan'",
883            );
884        }
885        let input = match wire::deserialize_create_backup_plan_request(request, &[], query) {
886            Ok(v) => v,
887            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
888        };
889        if input.backup_plan.backup_plan_name.is_empty() {
890            return rest_json_error(
891                400,
892                "InvalidParameterValueException",
893                "Missing 'BackupPlanName'",
894            );
895        }
896        let plan_name = input.backup_plan.backup_plan_name.clone();
897        let backup_plan_value = serde_json::to_value(&input.backup_plan).unwrap_or(Value::Null);
898
899        let tags: HashMap<String, String> = input.backup_plan_tags.unwrap_or_default();
900
901        let mut state = state.write().await;
902        match state.create_backup_plan(&plan_name, &backup_plan_value, region, account_id, tags) {
903            Ok(plan) => {
904                let creation_epoch = plan.creation_date.timestamp() as f64
905                    + (plan.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
906                wire::serialize_create_backup_plan_response(&wire::CreateBackupPlanOutput {
907                    backup_plan_id: Some(plan.backup_plan_id.clone()),
908                    backup_plan_arn: Some(plan.backup_plan_arn.clone()),
909                    creation_date: Some(creation_epoch),
910                    version_id: Some(plan.version_id.clone()),
911                    ..Default::default()
912                })
913            }
914            Err(e) => backup_error_response(&e),
915        }
916    }
917
918    async fn handle_get_backup_plan(
919        &self,
920        state: &Arc<tokio::sync::RwLock<BackupState>>,
921        plan_id: &str,
922    ) -> MockResponse {
923        let state = state.read().await;
924        match state.get_backup_plan(plan_id) {
925            Ok(plan) => {
926                let creation_epoch = plan.creation_date.timestamp() as f64
927                    + (plan.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
928
929                // Build BackupPlan wire type from stored JSON
930                let rules_json = plan
931                    .backup_plan_json
932                    .get("Rules")
933                    .cloned()
934                    .unwrap_or(Value::Array(vec![]));
935                let rules: Vec<wire::BackupRule> = if let Value::Array(arr) = &rules_json {
936                    arr.iter()
937                        .map(|r| wire::BackupRule {
938                            rule_name: r
939                                .get("RuleName")
940                                .and_then(|v| v.as_str())
941                                .map(|s| s.to_string()),
942                            target_backup_vault_name: r
943                                .get("TargetBackupVaultName")
944                                .and_then(|v| v.as_str())
945                                .map(|s| s.to_string()),
946                            schedule_expression: r
947                                .get("ScheduleExpression")
948                                .and_then(|v| v.as_str())
949                                .map(|s| s.to_string()),
950                            rule_id: r
951                                .get("RuleId")
952                                .and_then(|v| v.as_str())
953                                .map(|s| s.to_string()),
954                            start_window_minutes: r
955                                .get("StartWindowMinutes")
956                                .and_then(|v| v.as_i64()),
957                            completion_window_minutes: r
958                                .get("CompletionWindowMinutes")
959                                .and_then(|v| v.as_i64()),
960                            ..Default::default()
961                        })
962                        .collect()
963                } else {
964                    vec![]
965                };
966
967                let backup_plan_wire = wire::BackupPlan {
968                    backup_plan_name: Some(plan.backup_plan_name.clone()),
969                    rules: Some(rules),
970                    ..Default::default()
971                };
972
973                wire::serialize_get_backup_plan_response(&wire::GetBackupPlanOutput {
974                    backup_plan: Some(backup_plan_wire),
975                    backup_plan_id: Some(plan.backup_plan_id.clone()),
976                    backup_plan_arn: Some(plan.backup_plan_arn.clone()),
977                    creation_date: Some(creation_epoch),
978                    version_id: Some(plan.version_id.clone()),
979                    ..Default::default()
980                })
981            }
982            Err(e) => backup_error_response(&e),
983        }
984    }
985
986    async fn handle_delete_backup_plan(
987        &self,
988        state: &Arc<tokio::sync::RwLock<BackupState>>,
989        plan_id: &str,
990    ) -> MockResponse {
991        let mut state = state.write().await;
992        match state.delete_backup_plan(plan_id) {
993            Ok(plan) => {
994                let deletion_epoch = chrono::Utc::now().timestamp() as f64;
995                wire::serialize_delete_backup_plan_response(&wire::DeleteBackupPlanOutput {
996                    backup_plan_id: Some(plan.backup_plan_id),
997                    backup_plan_arn: Some(plan.backup_plan_arn),
998                    deletion_date: Some(deletion_epoch),
999                    version_id: Some(plan.version_id),
1000                    ..Default::default()
1001                })
1002            }
1003            Err(e) => backup_error_response(&e),
1004        }
1005    }
1006
1007    async fn handle_list_backup_plans(
1008        &self,
1009        state: &Arc<tokio::sync::RwLock<BackupState>>,
1010    ) -> MockResponse {
1011        let state = state.read().await;
1012        let plans = state.list_backup_plans();
1013        let entries: Vec<wire::BackupPlansListMember> = plans
1014            .iter()
1015            .map(|p| {
1016                let creation_epoch = p.creation_date.timestamp() as f64
1017                    + (p.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1018                wire::BackupPlansListMember {
1019                    backup_plan_id: Some(p.backup_plan_id.clone()),
1020                    backup_plan_arn: Some(p.backup_plan_arn.clone()),
1021                    backup_plan_name: Some(p.backup_plan_name.clone()),
1022                    creation_date: Some(creation_epoch),
1023                    version_id: Some(p.version_id.clone()),
1024                    ..Default::default()
1025                }
1026            })
1027            .collect();
1028        wire::serialize_list_backup_plans_response(&wire::ListBackupPlansOutput {
1029            backup_plans_list: Some(entries),
1030            ..Default::default()
1031        })
1032    }
1033
1034    // --- Report Plan handlers ---
1035
1036    async fn handle_create_report_plan(
1037        &self,
1038        state: &Arc<tokio::sync::RwLock<BackupState>>,
1039        request: &MockRequest,
1040        query: &HashMap<String, String>,
1041        region: &str,
1042        account_id: &str,
1043    ) -> MockResponse {
1044        let raw: Value = if request.body.is_empty() {
1045            Value::Null
1046        } else {
1047            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
1048        };
1049        if raw.get("ReportPlanName").is_none() {
1050            return rest_json_error(
1051                400,
1052                "InvalidParameterValueException",
1053                "Missing 'ReportPlanName'",
1054            );
1055        }
1056        if raw.get("ReportDeliveryChannel").is_none() {
1057            return rest_json_error(
1058                400,
1059                "InvalidParameterValueException",
1060                "Missing 'ReportDeliveryChannel'",
1061            );
1062        }
1063        if raw.get("ReportSetting").is_none() {
1064            return rest_json_error(
1065                400,
1066                "InvalidParameterValueException",
1067                "Missing 'ReportSetting'",
1068            );
1069        }
1070        let input = match wire::deserialize_create_report_plan_request(request, &[], query) {
1071            Ok(v) => v,
1072            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1073        };
1074
1075        let report_plan_name = input.report_plan_name.as_str();
1076        let report_delivery_channel =
1077            serde_json::to_value(&input.report_delivery_channel).unwrap_or(Value::Null);
1078        let report_setting = serde_json::to_value(&input.report_setting).unwrap_or(Value::Null);
1079        let description = input.report_plan_description.as_deref().unwrap_or("");
1080        let tags: HashMap<String, String> = input.report_plan_tags.unwrap_or_default();
1081
1082        let mut state = state.write().await;
1083        match state.create_report_plan(
1084            report_plan_name,
1085            description,
1086            &report_delivery_channel,
1087            &report_setting,
1088            region,
1089            account_id,
1090            tags,
1091        ) {
1092            Ok(plan) => {
1093                let creation_epoch = plan.creation_time.timestamp() as f64
1094                    + (plan.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1095                wire::serialize_create_report_plan_response(&wire::CreateReportPlanOutput {
1096                    report_plan_name: Some(plan.report_plan_name.clone()),
1097                    report_plan_arn: Some(plan.report_plan_arn.clone()),
1098                    creation_time: Some(creation_epoch),
1099                    ..Default::default()
1100                })
1101            }
1102            Err(e) => backup_error_response(&e),
1103        }
1104    }
1105
1106    async fn handle_describe_report_plan(
1107        &self,
1108        state: &Arc<tokio::sync::RwLock<BackupState>>,
1109        name: &str,
1110    ) -> MockResponse {
1111        let state = state.read().await;
1112        match state.describe_report_plan(name) {
1113            Ok(plan) => {
1114                let creation_epoch = plan.creation_time.timestamp() as f64
1115                    + (plan.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1116
1117                let dc = &plan.report_delivery_channel;
1118                let delivery_channel = wire::ReportDeliveryChannel {
1119                    s3_bucket_name: dc
1120                        .get("S3BucketName")
1121                        .and_then(|v| v.as_str())
1122                        .unwrap_or("")
1123                        .to_string(),
1124                    s3_key_prefix: dc
1125                        .get("S3KeyPrefix")
1126                        .and_then(|v| v.as_str())
1127                        .map(|s| s.to_string()),
1128                    formats: dc.get("Formats").and_then(|v| {
1129                        v.as_array().map(|arr| {
1130                            arr.iter()
1131                                .filter_map(|x| x.as_str().map(|s| s.to_string()))
1132                                .collect()
1133                        })
1134                    }),
1135                };
1136
1137                let rs = &plan.report_setting;
1138                let report_setting = wire::ReportSetting {
1139                    report_template: rs
1140                        .get("ReportTemplate")
1141                        .and_then(|v| v.as_str())
1142                        .unwrap_or("")
1143                        .to_string(),
1144                    ..Default::default()
1145                };
1146
1147                wire::serialize_describe_report_plan_response(&wire::DescribeReportPlanOutput {
1148                    report_plan: Some(wire::ReportPlan {
1149                        report_plan_name: Some(plan.report_plan_name.clone()),
1150                        report_plan_arn: Some(plan.report_plan_arn.clone()),
1151                        report_plan_description: Some(plan.report_plan_description.clone()),
1152                        report_delivery_channel: Some(delivery_channel),
1153                        report_setting: Some(report_setting),
1154                        creation_time: Some(creation_epoch),
1155                        deployment_status: Some(plan.deployment_status.clone()),
1156                        ..Default::default()
1157                    }),
1158                    ..Default::default()
1159                })
1160            }
1161            Err(e) => backup_error_response(&e),
1162        }
1163    }
1164
1165    async fn handle_delete_report_plan(
1166        &self,
1167        state: &Arc<tokio::sync::RwLock<BackupState>>,
1168        name: &str,
1169    ) -> MockResponse {
1170        let mut state = state.write().await;
1171        match state.delete_report_plan(name) {
1172            Ok(()) => wire::serialize_delete_report_plan_response(),
1173            Err(e) => backup_error_response(&e),
1174        }
1175    }
1176
1177    async fn handle_list_report_plans(
1178        &self,
1179        state: &Arc<tokio::sync::RwLock<BackupState>>,
1180    ) -> MockResponse {
1181        let state = state.read().await;
1182        let plans = state.list_report_plans();
1183        let entries: Vec<wire::ReportPlan> = plans
1184            .iter()
1185            .map(|p| {
1186                let creation_epoch = p.creation_time.timestamp() as f64
1187                    + (p.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1188                wire::ReportPlan {
1189                    report_plan_name: Some(p.report_plan_name.clone()),
1190                    report_plan_arn: Some(p.report_plan_arn.clone()),
1191                    report_plan_description: Some(p.report_plan_description.clone()),
1192                    creation_time: Some(creation_epoch),
1193                    deployment_status: Some(p.deployment_status.clone()),
1194                    ..Default::default()
1195                }
1196            })
1197            .collect();
1198        wire::serialize_list_report_plans_response(&wire::ListReportPlansOutput {
1199            report_plans: Some(entries),
1200            ..Default::default()
1201        })
1202    }
1203
1204    // --- Vault Lock handlers ---
1205
1206    async fn handle_put_backup_vault_lock_configuration(
1207        &self,
1208        state: &Arc<tokio::sync::RwLock<BackupState>>,
1209        vault_name: &str,
1210        request: &MockRequest,
1211        query: &HashMap<String, String>,
1212    ) -> MockResponse {
1213        let labels: &[(&str, &str)] = &[("BackupVaultName", vault_name)];
1214        let input = match wire::deserialize_put_backup_vault_lock_configuration_request(
1215            request, labels, query,
1216        ) {
1217            Ok(v) => v,
1218            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1219        };
1220        let min_retention_days = input.min_retention_days;
1221        let max_retention_days = input.max_retention_days;
1222
1223        let mut state = state.write().await;
1224        match state.put_backup_vault_lock_configuration(
1225            vault_name,
1226            min_retention_days,
1227            max_retention_days,
1228        ) {
1229            Ok(()) => wire::serialize_put_backup_vault_lock_configuration_response(),
1230            Err(e) => backup_error_response(&e),
1231        }
1232    }
1233
1234    async fn handle_delete_backup_vault_lock_configuration(
1235        &self,
1236        state: &Arc<tokio::sync::RwLock<BackupState>>,
1237        vault_name: &str,
1238    ) -> MockResponse {
1239        let mut state = state.write().await;
1240        match state.delete_backup_vault_lock_configuration(vault_name) {
1241            Ok(()) => wire::serialize_delete_backup_vault_lock_configuration_response(),
1242            Err(e) => backup_error_response(&e),
1243        }
1244    }
1245
1246    // --- Tag handlers ---
1247
1248    async fn handle_list_tags(
1249        &self,
1250        state: &Arc<tokio::sync::RwLock<BackupState>>,
1251        resource_arn: &str,
1252    ) -> MockResponse {
1253        let state = state.read().await;
1254        let tags = state.list_tags(resource_arn);
1255        wire::serialize_list_tags_response(&wire::ListTagsOutput {
1256            tags: if tags.is_empty() { None } else { Some(tags) },
1257            ..Default::default()
1258        })
1259    }
1260
1261    async fn handle_tag_resource(
1262        &self,
1263        state: &Arc<tokio::sync::RwLock<BackupState>>,
1264        resource_arn: &str,
1265        request: &MockRequest,
1266        query: &HashMap<String, String>,
1267    ) -> MockResponse {
1268        let labels: &[(&str, &str)] = &[("ResourceArn", resource_arn)];
1269        let input = match wire::deserialize_tag_resource_request(request, labels, query) {
1270            Ok(v) => v,
1271            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1272        };
1273        let tags: HashMap<String, String> = input.tags;
1274
1275        let mut state = state.write().await;
1276        match state.tag_resource(&input.resource_arn, tags) {
1277            Ok(()) => wire::serialize_tag_resource_response(),
1278            Err(e) => backup_error_response(&e),
1279        }
1280    }
1281
1282    async fn handle_untag_resource(
1283        &self,
1284        state: &Arc<tokio::sync::RwLock<BackupState>>,
1285        resource_arn: &str,
1286        request: &MockRequest,
1287        query: &HashMap<String, String>,
1288    ) -> MockResponse {
1289        let labels: &[(&str, &str)] = &[("ResourceArn", resource_arn)];
1290        let input = match wire::deserialize_untag_resource_request(request, labels, query) {
1291            Ok(v) => v,
1292            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1293        };
1294        let tag_keys: Vec<String> = input.tag_key_list;
1295
1296        let mut state = state.write().await;
1297        match state.untag_resource(&input.resource_arn, &tag_keys) {
1298            Ok(()) => wire::serialize_untag_resource_response(),
1299            Err(e) => backup_error_response(&e),
1300        }
1301    }
1302
1303    // --- Stub handlers for remaining operations ---
1304
1305    // MPA approval team association is a no-op in mock (approval workflow)
1306    async fn handle_associate_backup_vault_mpa_approval_team(&self) -> MockResponse {
1307        wire::serialize_associate_backup_vault_mpa_approval_team_response()
1308    }
1309
1310    async fn handle_cancel_legal_hold(
1311        &self,
1312        state: &Arc<tokio::sync::RwLock<BackupState>>,
1313        legal_hold_id: &str,
1314    ) -> MockResponse {
1315        let mut state = state.write().await;
1316        match state.cancel_legal_hold(legal_hold_id) {
1317            Ok(()) => wire::serialize_cancel_legal_hold_response(&wire::CancelLegalHoldOutput {}),
1318            Err(e) => backup_error_response(&e),
1319        }
1320    }
1321
1322    async fn handle_create_backup_selection(
1323        &self,
1324        state: &Arc<tokio::sync::RwLock<BackupState>>,
1325        plan_id: &str,
1326        request: &MockRequest,
1327        query: &HashMap<String, String>,
1328    ) -> MockResponse {
1329        let raw: Value = if request.body.is_empty() {
1330            Value::Null
1331        } else {
1332            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
1333        };
1334        if raw.get("BackupSelection").is_none() {
1335            return rest_json_error(
1336                400,
1337                "InvalidParameterValueException",
1338                "Missing 'BackupSelection'",
1339            );
1340        }
1341        let labels: &[(&str, &str)] = &[("BackupPlanId", plan_id)];
1342        let input = match wire::deserialize_create_backup_selection_request(request, labels, query)
1343        {
1344            Ok(v) => v,
1345            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1346        };
1347        if input.backup_selection.selection_name.is_empty() {
1348            return rest_json_error(
1349                400,
1350                "InvalidParameterValueException",
1351                "Missing 'SelectionName'",
1352            );
1353        }
1354        let selection_name = input.backup_selection.selection_name.clone();
1355        let iam_role_arn = input.backup_selection.iam_role_arn.clone();
1356        let resources: Vec<String> = input.backup_selection.resources.clone().unwrap_or_default();
1357        let backup_selection_value =
1358            serde_json::to_value(&input.backup_selection).unwrap_or(Value::Null);
1359
1360        let mut state = state.write().await;
1361        match state.create_backup_selection(
1362            plan_id,
1363            &selection_name,
1364            &iam_role_arn,
1365            resources,
1366            backup_selection_value,
1367        ) {
1368            Ok(sel) => {
1369                let creation_epoch = sel.creation_date.timestamp() as f64
1370                    + (sel.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1371                wire::serialize_create_backup_selection_response(
1372                    &wire::CreateBackupSelectionOutput {
1373                        backup_plan_id: Some(sel.backup_plan_id.clone()),
1374                        selection_id: Some(sel.selection_id.clone()),
1375                        creation_date: Some(creation_epoch),
1376                    },
1377                )
1378            }
1379            Err(e) => backup_error_response(&e),
1380        }
1381    }
1382
1383    async fn handle_create_framework(
1384        &self,
1385        state: &Arc<tokio::sync::RwLock<BackupState>>,
1386        request: &MockRequest,
1387        query: &HashMap<String, String>,
1388        region: &str,
1389        account_id: &str,
1390    ) -> MockResponse {
1391        let input = match wire::deserialize_create_framework_request(request, &[], query) {
1392            Ok(v) => v,
1393            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1394        };
1395        if input.framework_name.is_empty() {
1396            return rest_json_error(
1397                400,
1398                "InvalidParameterValueException",
1399                "Missing 'FrameworkName'",
1400            );
1401        }
1402        let name = input.framework_name.clone();
1403        let description = input.framework_description.clone().unwrap_or_default();
1404        let controls =
1405            serde_json::to_value(&input.framework_controls).unwrap_or(Value::Array(vec![]));
1406        let tags: HashMap<String, String> = input.framework_tags.unwrap_or_default();
1407
1408        let mut state = state.write().await;
1409        match state.create_framework(&name, &description, controls, region, account_id, tags) {
1410            Ok(fw) => wire::serialize_create_framework_response(&wire::CreateFrameworkOutput {
1411                framework_name: Some(fw.framework_name.clone()),
1412                framework_arn: Some(fw.framework_arn.clone()),
1413            }),
1414            Err(e) => backup_error_response(&e),
1415        }
1416    }
1417
1418    async fn handle_create_legal_hold(
1419        &self,
1420        state: &Arc<tokio::sync::RwLock<BackupState>>,
1421        request: &MockRequest,
1422        query: &HashMap<String, String>,
1423        region: &str,
1424        account_id: &str,
1425    ) -> MockResponse {
1426        let input = match wire::deserialize_create_legal_hold_request(request, &[], query) {
1427            Ok(v) => v,
1428            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1429        };
1430        let title = input.title.clone();
1431        let description = input.description.clone();
1432        let recovery_point_selection = input
1433            .recovery_point_selection
1434            .as_ref()
1435            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null))
1436            .unwrap_or(Value::Null);
1437        let tags: HashMap<String, String> = input.tags.unwrap_or_default();
1438
1439        let mut state = state.write().await;
1440        match state.create_legal_hold(
1441            &title,
1442            &description,
1443            recovery_point_selection,
1444            region,
1445            account_id,
1446            tags,
1447        ) {
1448            Ok(hold) => {
1449                let creation_epoch = hold.creation_date.timestamp() as f64
1450                    + (hold.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1451                let rps: Option<wire::RecoveryPointSelection> =
1452                    serde_json::from_value(hold.recovery_point_selection.clone()).ok();
1453                wire::serialize_create_legal_hold_response(&wire::CreateLegalHoldOutput {
1454                    legal_hold_id: Some(hold.legal_hold_id.clone()),
1455                    legal_hold_arn: Some(hold.legal_hold_arn.clone()),
1456                    title: Some(hold.title.clone()),
1457                    description: Some(hold.description.clone()),
1458                    status: Some(hold.status.clone()),
1459                    creation_date: Some(creation_epoch),
1460                    recovery_point_selection: rps,
1461                })
1462            }
1463            Err(e) => backup_error_response(&e),
1464        }
1465    }
1466
1467    async fn handle_create_logically_air_gapped_backup_vault(
1468        &self,
1469        state: &Arc<tokio::sync::RwLock<BackupState>>,
1470        vault_name: &str,
1471        region: &str,
1472        account_id: &str,
1473        request: &MockRequest,
1474        query: &HashMap<String, String>,
1475    ) -> MockResponse {
1476        let labels: &[(&str, &str)] = &[("BackupVaultName", vault_name)];
1477        let input = match wire::deserialize_create_logically_air_gapped_backup_vault_request(
1478            request, labels, query,
1479        ) {
1480            Ok(v) => v,
1481            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1482        };
1483        // Logically air-gapped vaults are stored as regular vaults with lock config
1484        let arn = format!("arn:aws:backup:{region}:{account_id}:backup-vault:{vault_name}");
1485        let tags: HashMap<String, String> = input.backup_vault_tags.unwrap_or_default();
1486        let min_retention_days = input.min_retention_days;
1487        let max_retention_days = input.max_retention_days;
1488
1489        let mut state = state.write().await;
1490        match state.create_backup_vault(vault_name, &arn, tags) {
1491            Ok(_) => {
1492                // Apply lock configuration
1493                let _ = state.put_backup_vault_lock_configuration(
1494                    vault_name,
1495                    Some(min_retention_days),
1496                    Some(max_retention_days),
1497                );
1498                let vault = state.describe_backup_vault(vault_name).unwrap();
1499                let creation_epoch = vault.creation_date.timestamp() as f64
1500                    + (vault.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1501                wire::serialize_create_logically_air_gapped_backup_vault_response(
1502                    &wire::CreateLogicallyAirGappedBackupVaultOutput {
1503                        backup_vault_name: Some(vault_name.to_string()),
1504                        backup_vault_arn: Some(vault.backup_vault_arn.clone()),
1505                        creation_date: Some(creation_epoch),
1506                        vault_state: Some("AVAILABLE".to_string()),
1507                    },
1508                )
1509            }
1510            Err(e) => backup_error_response(&e),
1511        }
1512    }
1513
1514    async fn handle_create_restore_access_backup_vault(
1515        &self,
1516        state: &Arc<tokio::sync::RwLock<BackupState>>,
1517        request: &MockRequest,
1518        query: &HashMap<String, String>,
1519        region: &str,
1520        account_id: &str,
1521    ) -> MockResponse {
1522        let input =
1523            match wire::deserialize_create_restore_access_backup_vault_request(request, &[], query)
1524            {
1525                Ok(v) => v,
1526                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1527            };
1528        // Create a restore access backup vault (backed by a regular vault entry)
1529        let source_vault_arn = input.source_backup_vault_arn.as_str();
1530        let derived_name = input.backup_vault_name.clone().unwrap_or_else(|| {
1531            source_vault_arn
1532                .rsplit(':')
1533                .next()
1534                .unwrap_or("restore-access")
1535                .to_string()
1536        });
1537        let vault_name = derived_name.as_str();
1538        let tags: HashMap<String, String> = input.backup_vault_tags.unwrap_or_default();
1539        let arn = format!(
1540            "arn:aws:backup:{region}:{account_id}:restore-access-backup-vault:{vault_name}"
1541        );
1542
1543        let mut state = state.write().await;
1544        match state.create_backup_vault(vault_name, &arn, tags) {
1545            Ok(vault) => {
1546                let creation_epoch = vault.creation_date.timestamp() as f64
1547                    + (vault.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1548                wire::serialize_create_restore_access_backup_vault_response(
1549                    &wire::CreateRestoreAccessBackupVaultOutput {
1550                        restore_access_backup_vault_name: Some(vault_name.to_string()),
1551                        restore_access_backup_vault_arn: Some(arn),
1552                        creation_date: Some(creation_epoch),
1553                        vault_state: Some("AVAILABLE".to_string()),
1554                    },
1555                )
1556            }
1557            Err(e) => backup_error_response(&e),
1558        }
1559    }
1560
1561    async fn handle_create_restore_testing_plan(
1562        &self,
1563        state: &Arc<tokio::sync::RwLock<BackupState>>,
1564        request: &MockRequest,
1565        query: &HashMap<String, String>,
1566        region: &str,
1567        account_id: &str,
1568    ) -> MockResponse {
1569        let raw: Value = if request.body.is_empty() {
1570            Value::Null
1571        } else {
1572            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
1573        };
1574        if raw.get("RestoreTestingPlan").is_none() {
1575            return rest_json_error(
1576                400,
1577                "InvalidParameterValueException",
1578                "Missing 'RestoreTestingPlan'",
1579            );
1580        }
1581        let input = match wire::deserialize_create_restore_testing_plan_request(request, &[], query)
1582        {
1583            Ok(v) => v,
1584            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1585        };
1586        if input
1587            .restore_testing_plan
1588            .restore_testing_plan_name
1589            .is_empty()
1590        {
1591            return rest_json_error(
1592                400,
1593                "InvalidParameterValueException",
1594                "Missing 'RestoreTestingPlanName'",
1595            );
1596        }
1597        let name = input.restore_testing_plan.restore_testing_plan_name.clone();
1598        let schedule_expression = input.restore_testing_plan.schedule_expression.clone();
1599        let schedule_expression_timezone = input
1600            .restore_testing_plan
1601            .schedule_expression_timezone
1602            .clone();
1603        let start_window_hours = input.restore_testing_plan.start_window_hours;
1604        let recovery_point_selection =
1605            serde_json::to_value(&input.restore_testing_plan.recovery_point_selection)
1606                .unwrap_or(Value::Null);
1607        let creator_request_id = input.creator_request_id.clone();
1608        let tags: HashMap<String, String> = input.tags.unwrap_or_default();
1609
1610        let mut state = state.write().await;
1611        match state.create_restore_testing_plan(
1612            &name,
1613            &schedule_expression,
1614            schedule_expression_timezone,
1615            start_window_hours,
1616            recovery_point_selection,
1617            creator_request_id,
1618            region,
1619            account_id,
1620            tags,
1621        ) {
1622            Ok(plan) => {
1623                let creation_epoch = plan.creation_time.timestamp() as f64
1624                    + (plan.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1625                wire::serialize_create_restore_testing_plan_response(
1626                    &wire::CreateRestoreTestingPlanOutput {
1627                        restore_testing_plan_name: Some(plan.restore_testing_plan_name.clone()),
1628                        restore_testing_plan_arn: Some(plan.restore_testing_plan_arn.clone()),
1629                        creation_time: Some(creation_epoch),
1630                    },
1631                )
1632            }
1633            Err(e) => backup_error_response(&e),
1634        }
1635    }
1636
1637    async fn handle_create_restore_testing_selection(
1638        &self,
1639        state: &Arc<tokio::sync::RwLock<BackupState>>,
1640        plan_name: &str,
1641        request: &MockRequest,
1642        query: &HashMap<String, String>,
1643    ) -> MockResponse {
1644        let raw: Value = if request.body.is_empty() {
1645            Value::Null
1646        } else {
1647            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
1648        };
1649        if raw.get("RestoreTestingSelection").is_none() {
1650            return rest_json_error(
1651                400,
1652                "InvalidParameterValueException",
1653                "Missing 'RestoreTestingSelection'",
1654            );
1655        }
1656        let labels: &[(&str, &str)] = &[("RestoreTestingPlanName", plan_name)];
1657        let input = match wire::deserialize_create_restore_testing_selection_request(
1658            request, labels, query,
1659        ) {
1660            Ok(v) => v,
1661            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1662        };
1663        let sel = &input.restore_testing_selection;
1664        if sel.restore_testing_selection_name.is_empty() {
1665            return rest_json_error(
1666                400,
1667                "InvalidParameterValueException",
1668                "Missing 'RestoreTestingSelectionName'",
1669            );
1670        }
1671        let selection_name = sel.restore_testing_selection_name.clone();
1672        let iam_role_arn = sel.iam_role_arn.clone();
1673        let protected_resource_type = sel.protected_resource_type.clone();
1674        let protected_resource_arns: Vec<String> =
1675            sel.protected_resource_arns.clone().unwrap_or_default();
1676        let protected_resource_conditions = sel
1677            .protected_resource_conditions
1678            .as_ref()
1679            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null))
1680            .unwrap_or(Value::Null);
1681        let restore_metadata_overrides: HashMap<String, String> =
1682            sel.restore_metadata_overrides.clone().unwrap_or_default();
1683        let validation_window_hours = sel.validation_window_hours;
1684        let creator_request_id = input.creator_request_id.clone();
1685
1686        let mut state = state.write().await;
1687        match state.create_restore_testing_selection(
1688            plan_name,
1689            &selection_name,
1690            &iam_role_arn,
1691            &protected_resource_type,
1692            protected_resource_arns,
1693            protected_resource_conditions,
1694            restore_metadata_overrides,
1695            validation_window_hours,
1696            creator_request_id,
1697        ) {
1698            Ok(sel) => {
1699                let creation_epoch = sel.creation_time.timestamp() as f64
1700                    + (sel.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1701                wire::serialize_create_restore_testing_selection_response(
1702                    &wire::CreateRestoreTestingSelectionOutput {
1703                        restore_testing_selection_name: Some(
1704                            sel.restore_testing_selection_name.clone(),
1705                        ),
1706                        restore_testing_plan_name: Some(sel.restore_testing_plan_name.clone()),
1707                        restore_testing_plan_arn: Some(sel.restore_testing_plan_arn.clone()),
1708                        creation_time: Some(creation_epoch),
1709                    },
1710                )
1711            }
1712            Err(e) => backup_error_response(&e),
1713        }
1714    }
1715
1716    async fn handle_create_tiering_configuration(
1717        &self,
1718        state: &Arc<tokio::sync::RwLock<BackupState>>,
1719        request: &MockRequest,
1720        query: &HashMap<String, String>,
1721        region: &str,
1722        account_id: &str,
1723    ) -> MockResponse {
1724        let raw: Value = if request.body.is_empty() {
1725            Value::Null
1726        } else {
1727            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
1728        };
1729        if raw.get("TieringConfiguration").is_none() {
1730            return rest_json_error(
1731                400,
1732                "InvalidParameterValueException",
1733                "Missing 'TieringConfiguration'",
1734            );
1735        }
1736        let input =
1737            match wire::deserialize_create_tiering_configuration_request(request, &[], query) {
1738                Ok(v) => v,
1739                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
1740            };
1741        let cfg = &input.tiering_configuration;
1742        if cfg.tiering_configuration_name.is_empty() {
1743            return rest_json_error(
1744                400,
1745                "InvalidParameterValueException",
1746                "Missing 'TieringConfigurationName'",
1747            );
1748        }
1749        let name = cfg.tiering_configuration_name.clone();
1750        let vault_name = cfg.backup_vault_name.clone();
1751        let resource_selection =
1752            serde_json::to_value(&cfg.resource_selection).unwrap_or(Value::Array(vec![]));
1753        let creator_request_id = input.creator_request_id.clone();
1754        let tags: HashMap<String, String> = input.tiering_configuration_tags.unwrap_or_default();
1755
1756        let mut state = state.write().await;
1757        match state.create_tiering_configuration(
1758            &name,
1759            &vault_name,
1760            resource_selection,
1761            creator_request_id,
1762            region,
1763            account_id,
1764            tags,
1765        ) {
1766            Ok(config) => {
1767                let creation_epoch = config.creation_time.timestamp() as f64
1768                    + (config.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1769                wire::serialize_create_tiering_configuration_response(
1770                    &wire::CreateTieringConfigurationOutput {
1771                        tiering_configuration_name: Some(config.tiering_configuration_name.clone()),
1772                        tiering_configuration_arn: Some(config.tiering_configuration_arn.clone()),
1773                        creation_time: Some(creation_epoch),
1774                    },
1775                )
1776            }
1777            Err(e) => backup_error_response(&e),
1778        }
1779    }
1780
1781    async fn handle_delete_backup_selection(
1782        &self,
1783        state: &Arc<tokio::sync::RwLock<BackupState>>,
1784        plan_id: &str,
1785        selection_id: &str,
1786    ) -> MockResponse {
1787        let mut state = state.write().await;
1788        match state.delete_backup_selection(plan_id, selection_id) {
1789            Ok(()) => wire::serialize_delete_backup_selection_response(),
1790            Err(e) => backup_error_response(&e),
1791        }
1792    }
1793
1794    async fn handle_delete_backup_vault_access_policy(
1795        &self,
1796        state: &Arc<tokio::sync::RwLock<BackupState>>,
1797        vault_name: &str,
1798    ) -> MockResponse {
1799        let mut state = state.write().await;
1800        match state.delete_backup_vault_access_policy(vault_name) {
1801            Ok(()) => wire::serialize_delete_backup_vault_access_policy_response(),
1802            Err(e) => backup_error_response(&e),
1803        }
1804    }
1805
1806    async fn handle_delete_backup_vault_notifications(
1807        &self,
1808        state: &Arc<tokio::sync::RwLock<BackupState>>,
1809        vault_name: &str,
1810    ) -> MockResponse {
1811        let mut state = state.write().await;
1812        match state.delete_backup_vault_notifications(vault_name) {
1813            Ok(()) => wire::serialize_delete_backup_vault_notifications_response(),
1814            Err(e) => backup_error_response(&e),
1815        }
1816    }
1817
1818    async fn handle_delete_framework(
1819        &self,
1820        state: &Arc<tokio::sync::RwLock<BackupState>>,
1821        name: &str,
1822    ) -> MockResponse {
1823        let mut state = state.write().await;
1824        match state.delete_framework(name) {
1825            Ok(()) => wire::serialize_delete_framework_response(),
1826            Err(e) => backup_error_response(&e),
1827        }
1828    }
1829
1830    async fn handle_delete_recovery_point(
1831        &self,
1832        state: &Arc<tokio::sync::RwLock<BackupState>>,
1833        vault_name: &str,
1834        recovery_point_arn: &str,
1835    ) -> MockResponse {
1836        let mut state = state.write().await;
1837        match state.delete_recovery_point(vault_name, recovery_point_arn) {
1838            Ok(()) => wire::serialize_delete_recovery_point_response(),
1839            Err(e) => backup_error_response(&e),
1840        }
1841    }
1842
1843    async fn handle_delete_restore_testing_plan(
1844        &self,
1845        state: &Arc<tokio::sync::RwLock<BackupState>>,
1846        plan_name: &str,
1847    ) -> MockResponse {
1848        let mut state = state.write().await;
1849        match state.delete_restore_testing_plan(plan_name) {
1850            Ok(()) => wire::serialize_delete_restore_testing_plan_response(),
1851            Err(e) => backup_error_response(&e),
1852        }
1853    }
1854
1855    async fn handle_delete_restore_testing_selection(
1856        &self,
1857        state: &Arc<tokio::sync::RwLock<BackupState>>,
1858        plan_name: &str,
1859        selection_name: &str,
1860    ) -> MockResponse {
1861        let mut state = state.write().await;
1862        match state.delete_restore_testing_selection(plan_name, selection_name) {
1863            Ok(()) => wire::serialize_delete_restore_testing_selection_response(),
1864            Err(e) => backup_error_response(&e),
1865        }
1866    }
1867
1868    async fn handle_delete_tiering_configuration(
1869        &self,
1870        state: &Arc<tokio::sync::RwLock<BackupState>>,
1871        name: &str,
1872    ) -> MockResponse {
1873        let mut state = state.write().await;
1874        match state.delete_tiering_configuration(name) {
1875            Ok(()) => wire::serialize_delete_tiering_configuration_response(
1876                &wire::DeleteTieringConfigurationOutput {},
1877            ),
1878            Err(e) => backup_error_response(&e),
1879        }
1880    }
1881
1882    async fn handle_describe_backup_job(
1883        &self,
1884        state: &Arc<tokio::sync::RwLock<BackupState>>,
1885        job_id: &str,
1886    ) -> MockResponse {
1887        let state = state.read().await;
1888        match state.describe_backup_job(job_id) {
1889            Ok(job) => {
1890                let creation_epoch = job.creation_date.timestamp() as f64
1891                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1892                let completion_epoch = job
1893                    .completion_date
1894                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
1895                wire::serialize_describe_backup_job_response(&wire::DescribeBackupJobOutput {
1896                    backup_job_id: Some(job.backup_job_id.clone()),
1897                    backup_vault_name: Some(job.backup_vault_name.clone()),
1898                    backup_vault_arn: Some(job.backup_vault_arn.clone()),
1899                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
1900                    resource_arn: Some(job.resource_arn.clone()),
1901                    resource_type: Some(job.resource_type.clone()),
1902                    iam_role_arn: Some(job.iam_role_arn.clone()),
1903                    state: Some(job.state.clone()),
1904                    creation_date: Some(creation_epoch),
1905                    completion_date: completion_epoch,
1906                    account_id: Some(job.account_id.clone()),
1907                    ..Default::default()
1908                })
1909            }
1910            Err(e) => backup_error_response(&e),
1911        }
1912    }
1913
1914    async fn handle_describe_copy_job(
1915        &self,
1916        state: &Arc<tokio::sync::RwLock<BackupState>>,
1917        copy_job_id: &str,
1918    ) -> MockResponse {
1919        let state = state.read().await;
1920        match state.describe_copy_job(copy_job_id) {
1921            Ok(job) => {
1922                let creation_epoch = job.creation_date.timestamp() as f64
1923                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
1924                let completion_epoch = job
1925                    .completion_date
1926                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
1927                wire::serialize_describe_copy_job_response(&wire::DescribeCopyJobOutput {
1928                    copy_job: Some(wire::CopyJob {
1929                        copy_job_id: Some(job.copy_job_id.clone()),
1930                        source_backup_vault_arn: Some(job.source_backup_vault_arn.clone()),
1931                        source_recovery_point_arn: Some(job.source_recovery_point_arn.clone()),
1932                        destination_backup_vault_arn: Some(
1933                            job.destination_backup_vault_arn.clone(),
1934                        ),
1935                        destination_recovery_point_arn: Some(
1936                            job.destination_recovery_point_arn.clone(),
1937                        ),
1938                        iam_role_arn: Some(job.iam_role_arn.clone()),
1939                        state: Some(job.state.clone()),
1940                        creation_date: Some(creation_epoch),
1941                        completion_date: completion_epoch,
1942                        account_id: Some(job.account_id.clone()),
1943                        ..Default::default()
1944                    }),
1945                })
1946            }
1947            Err(e) => backup_error_response(&e),
1948        }
1949    }
1950
1951    async fn handle_describe_framework(
1952        &self,
1953        state: &Arc<tokio::sync::RwLock<BackupState>>,
1954        name: &str,
1955    ) -> MockResponse {
1956        let state = state.read().await;
1957        match state.describe_framework(name) {
1958            Ok(fw) => {
1959                let creation_epoch = fw.creation_time.timestamp() as f64
1960                    + (fw.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
1961                let controls: Vec<wire::FrameworkControl> =
1962                    if let Some(arr) = fw.framework_controls.as_array() {
1963                        arr.iter()
1964                            .filter_map(|c| serde_json::from_value(c.clone()).ok())
1965                            .collect()
1966                    } else {
1967                        vec![]
1968                    };
1969                wire::serialize_describe_framework_response(&wire::DescribeFrameworkOutput {
1970                    framework_name: Some(fw.framework_name.clone()),
1971                    framework_arn: Some(fw.framework_arn.clone()),
1972                    framework_description: Some(fw.framework_description.clone()),
1973                    framework_controls: Some(controls),
1974                    creation_time: Some(creation_epoch),
1975                    deployment_status: Some(fw.deployment_status.clone()),
1976                    ..Default::default()
1977                })
1978            }
1979            Err(e) => backup_error_response(&e),
1980        }
1981    }
1982
1983    async fn handle_describe_global_settings(
1984        &self,
1985        state: &Arc<tokio::sync::RwLock<BackupState>>,
1986    ) -> MockResponse {
1987        let state = state.read().await;
1988        let gs = state.describe_global_settings();
1989        wire::serialize_describe_global_settings_response(&wire::DescribeGlobalSettingsOutput {
1990            global_settings: if gs.global_settings.is_empty() {
1991                None
1992            } else {
1993                Some(gs.global_settings.clone())
1994            },
1995            ..Default::default()
1996        })
1997    }
1998
1999    async fn handle_describe_protected_resource(
2000        &self,
2001        state: &Arc<tokio::sync::RwLock<BackupState>>,
2002        resource_arn: &str,
2003    ) -> MockResponse {
2004        // Derive protected resource info from recovery points
2005        let state = state.read().await;
2006        let matching_rp = state
2007            .recovery_points
2008            .values()
2009            .filter(|rp| rp.resource_arn == resource_arn)
2010            .max_by_key(|rp| rp.creation_date);
2011        match matching_rp {
2012            Some(rp) => {
2013                let last_backup_epoch = rp.creation_date.timestamp() as f64
2014                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2015                wire::serialize_describe_protected_resource_response(
2016                    &wire::DescribeProtectedResourceOutput {
2017                        resource_arn: Some(rp.resource_arn.clone()),
2018                        resource_type: Some(rp.resource_type.clone()),
2019                        last_backup_time: Some(last_backup_epoch),
2020                        last_backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2021                        last_recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2022                        ..Default::default()
2023                    },
2024                )
2025            }
2026            None => wire::serialize_describe_protected_resource_response(
2027                &wire::DescribeProtectedResourceOutput::default(),
2028            ),
2029        }
2030    }
2031
2032    async fn handle_describe_recovery_point(
2033        &self,
2034        state: &Arc<tokio::sync::RwLock<BackupState>>,
2035        vault_name: &str,
2036        recovery_point_arn: &str,
2037    ) -> MockResponse {
2038        let state = state.read().await;
2039        match state.describe_recovery_point(vault_name, recovery_point_arn) {
2040            Ok(rp) => {
2041                let creation_epoch = rp.creation_date.timestamp() as f64
2042                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2043                wire::serialize_describe_recovery_point_response(
2044                    &wire::DescribeRecoveryPointOutput {
2045                        recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2046                        backup_vault_name: Some(rp.backup_vault_name.clone()),
2047                        backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2048                        resource_arn: Some(rp.resource_arn.clone()),
2049                        resource_type: Some(rp.resource_type.clone()),
2050                        iam_role_arn: Some(rp.iam_role_arn.clone()),
2051                        status: Some(rp.status.clone()),
2052                        creation_date: Some(creation_epoch),
2053                        backup_size_in_bytes: Some(rp.backup_size_bytes),
2054                        ..Default::default()
2055                    },
2056                )
2057            }
2058            Err(e) => backup_error_response(&e),
2059        }
2060    }
2061
2062    async fn handle_describe_region_settings(
2063        &self,
2064        state: &Arc<tokio::sync::RwLock<BackupState>>,
2065    ) -> MockResponse {
2066        let state = state.read().await;
2067        let rs = state.describe_region_settings();
2068        wire::serialize_describe_region_settings_response(&wire::DescribeRegionSettingsOutput {
2069            resource_type_opt_in_preference: if rs.resource_type_opt_in_preference.is_empty() {
2070                None
2071            } else {
2072                Some(rs.resource_type_opt_in_preference.clone())
2073            },
2074            resource_type_management_preference: if rs
2075                .resource_type_management_preference
2076                .is_empty()
2077            {
2078                None
2079            } else {
2080                Some(rs.resource_type_management_preference.clone())
2081            },
2082        })
2083    }
2084
2085    async fn handle_describe_report_job(
2086        &self,
2087        state: &Arc<tokio::sync::RwLock<BackupState>>,
2088        job_id: &str,
2089    ) -> MockResponse {
2090        let state = state.read().await;
2091        match state.describe_report_job(job_id) {
2092            Ok(job) => {
2093                let creation_epoch = job.creation_time.timestamp() as f64
2094                    + (job.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
2095                let completion_epoch = job
2096                    .completion_time
2097                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2098                wire::serialize_describe_report_job_response(&wire::DescribeReportJobOutput {
2099                    report_job: Some(wire::ReportJob {
2100                        report_job_id: Some(job.report_job_id.clone()),
2101                        report_plan_arn: Some(job.report_plan_arn.clone()),
2102                        report_template: Some(job.report_template.clone()),
2103                        creation_time: Some(creation_epoch),
2104                        completion_time: completion_epoch,
2105                        status: Some(job.status.clone()),
2106                        ..Default::default()
2107                    }),
2108                })
2109            }
2110            Err(e) => backup_error_response(&e),
2111        }
2112    }
2113
2114    async fn handle_describe_restore_job(
2115        &self,
2116        state: &Arc<tokio::sync::RwLock<BackupState>>,
2117        restore_job_id: &str,
2118    ) -> MockResponse {
2119        let state = state.read().await;
2120        match state.describe_restore_job(restore_job_id) {
2121            Ok(job) => {
2122                let creation_epoch = job.creation_date.timestamp() as f64
2123                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2124                let completion_epoch = job
2125                    .completion_date
2126                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2127                wire::serialize_describe_restore_job_response(&wire::DescribeRestoreJobOutput {
2128                    restore_job_id: Some(job.restore_job_id.clone()),
2129                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
2130                    resource_type: Some(job.resource_type.clone()),
2131                    iam_role_arn: Some(job.iam_role_arn.clone()),
2132                    status: Some(job.status.clone()),
2133                    creation_date: Some(creation_epoch),
2134                    completion_date: completion_epoch,
2135                    backup_size_in_bytes: Some(job.backup_size_in_bytes),
2136                    account_id: Some(job.account_id.clone()),
2137                    validation_status: job.validation_status.clone(),
2138                    validation_status_message: job.validation_status_message.clone(),
2139                    ..Default::default()
2140                })
2141            }
2142            Err(e) => backup_error_response(&e),
2143        }
2144    }
2145
2146    async fn handle_describe_scan_job(
2147        &self,
2148        state: &Arc<tokio::sync::RwLock<BackupState>>,
2149        scan_job_id: &str,
2150    ) -> MockResponse {
2151        let state = state.read().await;
2152        match state.describe_scan_job(scan_job_id) {
2153            Ok(job) => {
2154                let creation_epoch = job.creation_date.timestamp() as f64
2155                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2156                let completion_epoch = job
2157                    .completion_date
2158                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2159                wire::serialize_describe_scan_job_response(&wire::DescribeScanJobOutput {
2160                    scan_job_id: Some(job.scan_job_id.clone()),
2161                    backup_vault_name: Some(job.backup_vault_name.clone()),
2162                    backup_vault_arn: Some(job.backup_vault_arn.clone()),
2163                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
2164                    iam_role_arn: Some(job.iam_role_arn.clone()),
2165                    malware_scanner: Some(job.malware_scanner.clone()),
2166                    scan_mode: Some(job.scan_mode.clone()),
2167                    scanner_role_arn: Some(job.scanner_role_arn.clone()),
2168                    scan_base_recovery_point_arn: job.scan_base_recovery_point_arn.clone(),
2169                    state: Some(job.state.clone()),
2170                    creation_date: Some(creation_epoch),
2171                    completion_date: completion_epoch,
2172                    account_id: Some(job.account_id.clone()),
2173                    ..Default::default()
2174                })
2175            }
2176            Err(e) => backup_error_response(&e),
2177        }
2178    }
2179
2180    // MPA approval team operations are no-ops in mock (no persistent state needed)
2181    async fn handle_disassociate_backup_vault_mpa_approval_team(&self) -> MockResponse {
2182        wire::serialize_disassociate_backup_vault_mpa_approval_team_response()
2183    }
2184
2185    // Recovery point disassociation is a no-op in mock
2186    async fn handle_disassociate_recovery_point(&self) -> MockResponse {
2187        wire::serialize_disassociate_recovery_point_response()
2188    }
2189
2190    // Recovery point parent disassociation is a no-op in mock
2191    async fn handle_disassociate_recovery_point_from_parent(&self) -> MockResponse {
2192        wire::serialize_disassociate_recovery_point_from_parent_response()
2193    }
2194
2195    async fn handle_export_backup_plan_template(
2196        &self,
2197        state: &Arc<tokio::sync::RwLock<BackupState>>,
2198        plan_id: &str,
2199    ) -> MockResponse {
2200        let state = state.read().await;
2201        match state.get_backup_plan(plan_id) {
2202            Ok(plan) => {
2203                let template_json = serde_json::to_string(&plan.backup_plan_json)
2204                    .unwrap_or_else(|_| "{}".to_string());
2205                wire::serialize_export_backup_plan_template_response(
2206                    &wire::ExportBackupPlanTemplateOutput {
2207                        backup_plan_template_json: Some(template_json),
2208                    },
2209                )
2210            }
2211            Err(e) => backup_error_response(&e),
2212        }
2213    }
2214
2215    async fn handle_get_backup_plan_from_j_s_o_n(
2216        &self,
2217        request: &MockRequest,
2218        query: &HashMap<String, String>,
2219    ) -> MockResponse {
2220        let input =
2221            match wire::deserialize_get_backup_plan_from_j_s_o_n_request(request, &[], query) {
2222                Ok(v) => v,
2223                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
2224            };
2225        // Parse the BackupPlanTemplateJson string and return a plan structure
2226        let template_json_str = if input.backup_plan_template_json.is_empty() {
2227            "{}".to_string()
2228        } else {
2229            input.backup_plan_template_json.clone()
2230        };
2231        let parsed: Value =
2232            serde_json::from_str(&template_json_str).unwrap_or(Value::Object(Default::default()));
2233
2234        let plan_name = parsed
2235            .get("BackupPlanName")
2236            .and_then(|v| v.as_str())
2237            .unwrap_or("")
2238            .to_string();
2239
2240        let rules_json = parsed.get("Rules").cloned().unwrap_or(Value::Array(vec![]));
2241        let rules: Vec<wire::BackupRule> = if let Value::Array(arr) = &rules_json {
2242            arr.iter()
2243                .map(|r| wire::BackupRule {
2244                    rule_name: r
2245                        .get("RuleName")
2246                        .and_then(|v| v.as_str())
2247                        .map(|s| s.to_string()),
2248                    target_backup_vault_name: r
2249                        .get("TargetBackupVaultName")
2250                        .and_then(|v| v.as_str())
2251                        .map(|s| s.to_string()),
2252                    schedule_expression: r
2253                        .get("ScheduleExpression")
2254                        .and_then(|v| v.as_str())
2255                        .map(|s| s.to_string()),
2256                    ..Default::default()
2257                })
2258                .collect()
2259        } else {
2260            vec![]
2261        };
2262
2263        wire::serialize_get_backup_plan_from_j_s_o_n_response(&wire::GetBackupPlanFromJSONOutput {
2264            backup_plan: Some(wire::BackupPlan {
2265                backup_plan_name: Some(plan_name),
2266                rules: Some(rules),
2267                ..Default::default()
2268            }),
2269        })
2270    }
2271
2272    // Plan templates are static/built-in - return empty list in mock
2273    async fn handle_get_backup_plan_from_template(&self) -> MockResponse {
2274        wire::serialize_get_backup_plan_from_template_response(
2275            &wire::GetBackupPlanFromTemplateOutput::default(),
2276        )
2277    }
2278
2279    async fn handle_get_backup_selection(
2280        &self,
2281        state: &Arc<tokio::sync::RwLock<BackupState>>,
2282        plan_id: &str,
2283        selection_id: &str,
2284    ) -> MockResponse {
2285        let state = state.read().await;
2286        match state.get_backup_selection(plan_id, selection_id) {
2287            Ok(sel) => {
2288                let creation_epoch = sel.creation_date.timestamp() as f64
2289                    + (sel.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2290                let backup_selection_wire: wire::BackupSelection =
2291                    serde_json::from_value(sel.selection_json.clone()).unwrap_or_else(|_| {
2292                        wire::BackupSelection {
2293                            selection_name: sel.selection_name.clone(),
2294                            iam_role_arn: sel.iam_role_arn.clone(),
2295                            resources: if sel.resources.is_empty() {
2296                                None
2297                            } else {
2298                                Some(sel.resources.clone())
2299                            },
2300                            ..Default::default()
2301                        }
2302                    });
2303                wire::serialize_get_backup_selection_response(&wire::GetBackupSelectionOutput {
2304                    backup_plan_id: Some(sel.backup_plan_id.clone()),
2305                    selection_id: Some(sel.selection_id.clone()),
2306                    creation_date: Some(creation_epoch),
2307                    backup_selection: Some(backup_selection_wire),
2308                    ..Default::default()
2309                })
2310            }
2311            Err(e) => backup_error_response(&e),
2312        }
2313    }
2314
2315    async fn handle_get_backup_vault_access_policy(
2316        &self,
2317        state: &Arc<tokio::sync::RwLock<BackupState>>,
2318        vault_name: &str,
2319    ) -> MockResponse {
2320        let state = state.read().await;
2321        match state.get_backup_vault_access_policy(vault_name) {
2322            Ok(policy) => wire::serialize_get_backup_vault_access_policy_response(
2323                &wire::GetBackupVaultAccessPolicyOutput {
2324                    backup_vault_name: Some(policy.backup_vault_name.clone()),
2325                    backup_vault_arn: Some(policy.backup_vault_arn.clone()),
2326                    policy: Some(policy.policy.clone()),
2327                },
2328            ),
2329            Err(e) => backup_error_response(&e),
2330        }
2331    }
2332
2333    async fn handle_get_backup_vault_notifications(
2334        &self,
2335        state: &Arc<tokio::sync::RwLock<BackupState>>,
2336        vault_name: &str,
2337    ) -> MockResponse {
2338        let state = state.read().await;
2339        match state.get_backup_vault_notifications(vault_name) {
2340            Ok(config) => wire::serialize_get_backup_vault_notifications_response(
2341                &wire::GetBackupVaultNotificationsOutput {
2342                    backup_vault_name: Some(config.backup_vault_name.clone()),
2343                    backup_vault_arn: Some(config.backup_vault_arn.clone()),
2344                    s_n_s_topic_arn: Some(config.sns_topic_arn.clone()),
2345                    backup_vault_events: Some(config.backup_vault_events.clone()),
2346                },
2347            ),
2348            Err(e) => backup_error_response(&e),
2349        }
2350    }
2351
2352    async fn handle_get_legal_hold(
2353        &self,
2354        state: &Arc<tokio::sync::RwLock<BackupState>>,
2355        legal_hold_id: &str,
2356    ) -> MockResponse {
2357        let state = state.read().await;
2358        match state.get_legal_hold(legal_hold_id) {
2359            Ok(hold) => {
2360                let creation_epoch = hold.creation_date.timestamp() as f64
2361                    + (hold.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2362                let cancellation_epoch = hold
2363                    .cancellation_date
2364                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2365                let rps: Option<wire::RecoveryPointSelection> =
2366                    serde_json::from_value(hold.recovery_point_selection.clone()).ok();
2367                wire::serialize_get_legal_hold_response(&wire::GetLegalHoldOutput {
2368                    legal_hold_id: Some(hold.legal_hold_id.clone()),
2369                    legal_hold_arn: Some(hold.legal_hold_arn.clone()),
2370                    title: Some(hold.title.clone()),
2371                    description: Some(hold.description.clone()),
2372                    status: Some(hold.status.clone()),
2373                    creation_date: Some(creation_epoch),
2374                    cancellation_date: cancellation_epoch,
2375                    recovery_point_selection: rps,
2376                    ..Default::default()
2377                })
2378            }
2379            Err(e) => backup_error_response(&e),
2380        }
2381    }
2382
2383    async fn handle_get_recovery_point_index_details(
2384        &self,
2385        state: &Arc<tokio::sync::RwLock<BackupState>>,
2386        vault_name: &str,
2387        recovery_point_arn: &str,
2388    ) -> MockResponse {
2389        let state = state.read().await;
2390        match state.describe_recovery_point(vault_name, recovery_point_arn) {
2391            Ok(rp) => wire::serialize_get_recovery_point_index_details_response(
2392                &wire::GetRecoveryPointIndexDetailsOutput {
2393                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2394                    backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2395                    source_resource_arn: Some(rp.resource_arn.clone()),
2396                    index_status: Some("ACTIVE".to_string()),
2397                    ..Default::default()
2398                },
2399            ),
2400            Err(e) => backup_error_response(&e),
2401        }
2402    }
2403
2404    async fn handle_get_recovery_point_restore_metadata(
2405        &self,
2406        state: &Arc<tokio::sync::RwLock<BackupState>>,
2407        vault_name: &str,
2408        recovery_point_arn: &str,
2409    ) -> MockResponse {
2410        let state = state.read().await;
2411        match state.describe_recovery_point(vault_name, recovery_point_arn) {
2412            Ok(rp) => wire::serialize_get_recovery_point_restore_metadata_response(
2413                &wire::GetRecoveryPointRestoreMetadataOutput {
2414                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2415                    backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2416                    resource_type: Some(rp.resource_type.clone()),
2417                    restore_metadata: Some(HashMap::new()),
2418                },
2419            ),
2420            Err(e) => backup_error_response(&e),
2421        }
2422    }
2423
2424    async fn handle_get_restore_job_metadata(
2425        &self,
2426        state: &Arc<tokio::sync::RwLock<BackupState>>,
2427        restore_job_id: &str,
2428    ) -> MockResponse {
2429        let state = state.read().await;
2430        match state.get_restore_job_metadata(restore_job_id) {
2431            Ok(job) => wire::serialize_get_restore_job_metadata_response(
2432                &wire::GetRestoreJobMetadataOutput {
2433                    restore_job_id: Some(job.restore_job_id.clone()),
2434                    metadata: if job.metadata.is_empty() {
2435                        None
2436                    } else {
2437                        Some(job.metadata.clone())
2438                    },
2439                },
2440            ),
2441            Err(e) => backup_error_response(&e),
2442        }
2443    }
2444
2445    // Inferred metadata is a static/computed operation - return empty map in mock
2446    async fn handle_get_restore_testing_inferred_metadata(&self) -> MockResponse {
2447        wire::serialize_get_restore_testing_inferred_metadata_response(
2448            &wire::GetRestoreTestingInferredMetadataOutput {
2449                inferred_metadata: Some(HashMap::new()),
2450            },
2451        )
2452    }
2453
2454    async fn handle_get_restore_testing_plan(
2455        &self,
2456        state: &Arc<tokio::sync::RwLock<BackupState>>,
2457        plan_name: &str,
2458    ) -> MockResponse {
2459        let state = state.read().await;
2460        match state.get_restore_testing_plan(plan_name) {
2461            Ok(plan) => {
2462                let creation_epoch = plan.creation_time.timestamp() as f64
2463                    + (plan.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
2464                let update_epoch = plan.last_update_time.timestamp() as f64
2465                    + (plan.last_update_time.timestamp_subsec_millis() as f64 / 1000.0);
2466                let rps: Option<wire::RestoreTestingRecoveryPointSelection> =
2467                    serde_json::from_value(plan.recovery_point_selection.clone()).ok();
2468                wire::serialize_get_restore_testing_plan_response(
2469                    &wire::GetRestoreTestingPlanOutput {
2470                        restore_testing_plan: Some(wire::RestoreTestingPlanForGet {
2471                            restore_testing_plan_name: Some(plan.restore_testing_plan_name.clone()),
2472                            restore_testing_plan_arn: Some(plan.restore_testing_plan_arn.clone()),
2473                            schedule_expression: Some(plan.schedule_expression.clone()),
2474                            schedule_expression_timezone: plan.schedule_expression_timezone.clone(),
2475                            start_window_hours: plan.start_window_hours,
2476                            recovery_point_selection: rps,
2477                            creator_request_id: plan.creator_request_id.clone(),
2478                            creation_time: Some(creation_epoch),
2479                            last_update_time: Some(update_epoch),
2480                            last_execution_time: None,
2481                        }),
2482                    },
2483                )
2484            }
2485            Err(e) => backup_error_response(&e),
2486        }
2487    }
2488
2489    async fn handle_get_restore_testing_selection(
2490        &self,
2491        state: &Arc<tokio::sync::RwLock<BackupState>>,
2492        plan_name: &str,
2493        selection_name: &str,
2494    ) -> MockResponse {
2495        let state = state.read().await;
2496        match state.get_restore_testing_selection(plan_name, selection_name) {
2497            Ok(sel) => {
2498                let creation_epoch = sel.creation_time.timestamp() as f64
2499                    + (sel.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
2500                let prc: Option<wire::ProtectedResourceConditions> =
2501                    serde_json::from_value(sel.protected_resource_conditions.clone()).ok();
2502                wire::serialize_get_restore_testing_selection_response(
2503                    &wire::GetRestoreTestingSelectionOutput {
2504                        restore_testing_selection: Some(wire::RestoreTestingSelectionForGet {
2505                            restore_testing_selection_name: Some(
2506                                sel.restore_testing_selection_name.clone(),
2507                            ),
2508                            restore_testing_plan_name: Some(sel.restore_testing_plan_name.clone()),
2509                            iam_role_arn: Some(sel.iam_role_arn.clone()),
2510                            protected_resource_type: Some(sel.protected_resource_type.clone()),
2511                            protected_resource_arns: if sel.protected_resource_arns.is_empty() {
2512                                None
2513                            } else {
2514                                Some(sel.protected_resource_arns.clone())
2515                            },
2516                            protected_resource_conditions: prc,
2517                            restore_metadata_overrides: if sel.restore_metadata_overrides.is_empty()
2518                            {
2519                                None
2520                            } else {
2521                                Some(sel.restore_metadata_overrides.clone())
2522                            },
2523                            validation_window_hours: sel.validation_window_hours,
2524                            creator_request_id: sel.creator_request_id.clone(),
2525                            creation_time: Some(creation_epoch),
2526                        }),
2527                    },
2528                )
2529            }
2530            Err(e) => backup_error_response(&e),
2531        }
2532    }
2533
2534    // Supported resource types is a static list
2535    async fn handle_get_supported_resource_types(&self) -> MockResponse {
2536        wire::serialize_get_supported_resource_types_response(
2537            &wire::GetSupportedResourceTypesOutput {
2538                resource_types: Some(vec![
2539                    "EC2".to_string(),
2540                    "EBS".to_string(),
2541                    "RDS".to_string(),
2542                    "DynamoDB".to_string(),
2543                    "EFS".to_string(),
2544                    "S3".to_string(),
2545                ]),
2546            },
2547        )
2548    }
2549
2550    async fn handle_get_tiering_configuration(
2551        &self,
2552        state: &Arc<tokio::sync::RwLock<BackupState>>,
2553        name: &str,
2554    ) -> MockResponse {
2555        let state = state.read().await;
2556        match state.get_tiering_configuration(name) {
2557            Ok(config) => {
2558                let creation_epoch = config.creation_time.timestamp() as f64
2559                    + (config.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
2560                let updated_epoch = config.last_updated_time.timestamp() as f64
2561                    + (config.last_updated_time.timestamp_subsec_millis() as f64 / 1000.0);
2562                let resource_selection: Option<Vec<wire::ResourceSelection>> =
2563                    serde_json::from_value(config.resource_selection.clone()).ok();
2564                wire::serialize_get_tiering_configuration_response(
2565                    &wire::GetTieringConfigurationOutput {
2566                        tiering_configuration: Some(wire::TieringConfiguration {
2567                            tiering_configuration_name: Some(
2568                                config.tiering_configuration_name.clone(),
2569                            ),
2570                            tiering_configuration_arn: Some(
2571                                config.tiering_configuration_arn.clone(),
2572                            ),
2573                            backup_vault_name: Some(config.backup_vault_name.clone()),
2574                            resource_selection,
2575                            creation_time: Some(creation_epoch),
2576                            last_updated_time: Some(updated_epoch),
2577                            creator_request_id: config.creator_request_id.clone(),
2578                        }),
2579                    },
2580                )
2581            }
2582            Err(e) => backup_error_response(&e),
2583        }
2584    }
2585
2586    async fn handle_list_backup_job_summaries(
2587        &self,
2588        state: &Arc<tokio::sync::RwLock<BackupState>>,
2589        account_id: &str,
2590        region: &str,
2591    ) -> MockResponse {
2592        let state = state.read().await;
2593        let jobs = state.list_backup_jobs();
2594        let mut counts: HashMap<String, i32> = HashMap::new();
2595        for job in &jobs {
2596            *counts.entry(job.state.clone()).or_insert(0) += 1;
2597        }
2598        let summaries: Vec<wire::BackupJobSummary> = counts
2599            .into_iter()
2600            .map(|(state_val, count)| wire::BackupJobSummary {
2601                state: Some(state_val),
2602                count: Some(count),
2603                account_id: Some(account_id.to_string()),
2604                region: Some(region.to_string()),
2605                ..Default::default()
2606            })
2607            .collect();
2608        wire::serialize_list_backup_job_summaries_response(&wire::ListBackupJobSummariesOutput {
2609            backup_job_summaries: if summaries.is_empty() {
2610                None
2611            } else {
2612                Some(summaries)
2613            },
2614            ..Default::default()
2615        })
2616    }
2617
2618    async fn handle_list_backup_jobs(
2619        &self,
2620        state: &Arc<tokio::sync::RwLock<BackupState>>,
2621    ) -> MockResponse {
2622        let state = state.read().await;
2623        let jobs = state.list_backup_jobs();
2624        let entries: Vec<wire::BackupJob> = jobs
2625            .iter()
2626            .map(|job| {
2627                let creation_epoch = job.creation_date.timestamp() as f64
2628                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2629                let completion_epoch = job
2630                    .completion_date
2631                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2632                wire::BackupJob {
2633                    backup_job_id: Some(job.backup_job_id.clone()),
2634                    backup_vault_name: Some(job.backup_vault_name.clone()),
2635                    backup_vault_arn: Some(job.backup_vault_arn.clone()),
2636                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
2637                    resource_arn: Some(job.resource_arn.clone()),
2638                    resource_type: Some(job.resource_type.clone()),
2639                    iam_role_arn: Some(job.iam_role_arn.clone()),
2640                    state: Some(job.state.clone()),
2641                    creation_date: Some(creation_epoch),
2642                    completion_date: completion_epoch,
2643                    account_id: Some(job.account_id.clone()),
2644                    ..Default::default()
2645                }
2646            })
2647            .collect();
2648        wire::serialize_list_backup_jobs_response(&wire::ListBackupJobsOutput {
2649            backup_jobs: Some(entries),
2650            ..Default::default()
2651        })
2652    }
2653
2654    // Plan templates are built-in AWS templates - return empty in mock
2655    async fn handle_list_backup_plan_templates(
2656        &self,
2657        _state: &Arc<tokio::sync::RwLock<BackupState>>,
2658    ) -> MockResponse {
2659        wire::serialize_list_backup_plan_templates_response(&wire::ListBackupPlanTemplatesOutput {
2660            backup_plan_templates_list: Some(vec![]),
2661            ..Default::default()
2662        })
2663    }
2664
2665    async fn handle_list_backup_plan_versions(
2666        &self,
2667        state: &Arc<tokio::sync::RwLock<BackupState>>,
2668        plan_id: &str,
2669    ) -> MockResponse {
2670        let state = state.read().await;
2671        match state.get_backup_plan(plan_id) {
2672            Ok(plan) => {
2673                let creation_epoch = plan.creation_date.timestamp() as f64
2674                    + (plan.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2675                // Return the current version as the only version
2676                let entry = wire::BackupPlansListMember {
2677                    backup_plan_id: Some(plan.backup_plan_id.clone()),
2678                    backup_plan_arn: Some(plan.backup_plan_arn.clone()),
2679                    backup_plan_name: Some(plan.backup_plan_name.clone()),
2680                    creation_date: Some(creation_epoch),
2681                    version_id: Some(plan.version_id.clone()),
2682                    ..Default::default()
2683                };
2684                wire::serialize_list_backup_plan_versions_response(
2685                    &wire::ListBackupPlanVersionsOutput {
2686                        backup_plan_versions_list: Some(vec![entry]),
2687                        ..Default::default()
2688                    },
2689                )
2690            }
2691            Err(e) => backup_error_response(&e),
2692        }
2693    }
2694
2695    async fn handle_list_backup_selections(
2696        &self,
2697        state: &Arc<tokio::sync::RwLock<BackupState>>,
2698        plan_id: &str,
2699    ) -> MockResponse {
2700        let state = state.read().await;
2701        let selections = state.list_backup_selections(plan_id);
2702        let entries: Vec<wire::BackupSelectionsListMember> = selections
2703            .iter()
2704            .map(|sel| {
2705                let creation_epoch = sel.creation_date.timestamp() as f64
2706                    + (sel.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2707                wire::BackupSelectionsListMember {
2708                    backup_plan_id: Some(sel.backup_plan_id.clone()),
2709                    selection_id: Some(sel.selection_id.clone()),
2710                    selection_name: Some(sel.selection_name.clone()),
2711                    iam_role_arn: Some(sel.iam_role_arn.clone()),
2712                    creation_date: Some(creation_epoch),
2713                    ..Default::default()
2714                }
2715            })
2716            .collect();
2717        wire::serialize_list_backup_selections_response(&wire::ListBackupSelectionsOutput {
2718            backup_selections_list: Some(entries),
2719            ..Default::default()
2720        })
2721    }
2722
2723    async fn handle_list_copy_job_summaries(
2724        &self,
2725        state: &Arc<tokio::sync::RwLock<BackupState>>,
2726    ) -> MockResponse {
2727        let state = state.read().await;
2728        let jobs = state.list_copy_jobs();
2729        let mut counts: HashMap<String, i32> = HashMap::new();
2730        for job in &jobs {
2731            *counts.entry(job.state.clone()).or_insert(0) += 1;
2732        }
2733        let summaries: Vec<wire::CopyJobSummary> = counts
2734            .into_iter()
2735            .map(|(state_val, count)| wire::CopyJobSummary {
2736                state: Some(state_val),
2737                count: Some(count),
2738                ..Default::default()
2739            })
2740            .collect();
2741        wire::serialize_list_copy_job_summaries_response(&wire::ListCopyJobSummariesOutput {
2742            copy_job_summaries: if summaries.is_empty() {
2743                None
2744            } else {
2745                Some(summaries)
2746            },
2747            ..Default::default()
2748        })
2749    }
2750
2751    async fn handle_list_copy_jobs(
2752        &self,
2753        state: &Arc<tokio::sync::RwLock<BackupState>>,
2754    ) -> MockResponse {
2755        let state = state.read().await;
2756        let jobs = state.list_copy_jobs();
2757        let entries: Vec<wire::CopyJob> = jobs
2758            .iter()
2759            .map(|job| {
2760                let creation_epoch = job.creation_date.timestamp() as f64
2761                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2762                let completion_epoch = job
2763                    .completion_date
2764                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2765                wire::CopyJob {
2766                    copy_job_id: Some(job.copy_job_id.clone()),
2767                    source_backup_vault_arn: Some(job.source_backup_vault_arn.clone()),
2768                    source_recovery_point_arn: Some(job.source_recovery_point_arn.clone()),
2769                    destination_backup_vault_arn: Some(job.destination_backup_vault_arn.clone()),
2770                    destination_recovery_point_arn: Some(
2771                        job.destination_recovery_point_arn.clone(),
2772                    ),
2773                    iam_role_arn: Some(job.iam_role_arn.clone()),
2774                    state: Some(job.state.clone()),
2775                    creation_date: Some(creation_epoch),
2776                    completion_date: completion_epoch,
2777                    account_id: Some(job.account_id.clone()),
2778                    ..Default::default()
2779                }
2780            })
2781            .collect();
2782        wire::serialize_list_copy_jobs_response(&wire::ListCopyJobsOutput {
2783            copy_jobs: if entries.is_empty() {
2784                None
2785            } else {
2786                Some(entries)
2787            },
2788            ..Default::default()
2789        })
2790    }
2791
2792    async fn handle_list_frameworks(
2793        &self,
2794        state: &Arc<tokio::sync::RwLock<BackupState>>,
2795    ) -> MockResponse {
2796        let state = state.read().await;
2797        let frameworks = state.list_frameworks();
2798        let entries: Vec<wire::Framework> = frameworks
2799            .iter()
2800            .map(|fw| {
2801                let creation_epoch = fw.creation_time.timestamp() as f64
2802                    + (fw.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
2803                wire::Framework {
2804                    framework_name: Some(fw.framework_name.clone()),
2805                    framework_arn: Some(fw.framework_arn.clone()),
2806                    framework_description: Some(fw.framework_description.clone()),
2807                    number_of_controls: Some(fw.number_of_controls),
2808                    creation_time: Some(creation_epoch),
2809                    deployment_status: Some(fw.deployment_status.clone()),
2810                }
2811            })
2812            .collect();
2813        wire::serialize_list_frameworks_response(&wire::ListFrameworksOutput {
2814            frameworks: Some(entries),
2815            ..Default::default()
2816        })
2817    }
2818
2819    async fn handle_list_indexed_recovery_points(
2820        &self,
2821        state: &Arc<tokio::sync::RwLock<BackupState>>,
2822    ) -> MockResponse {
2823        let state = state.read().await;
2824        let entries: Vec<wire::IndexedRecoveryPoint> = state
2825            .recovery_points
2826            .values()
2827            .map(|rp| {
2828                let creation_epoch = rp.creation_date.timestamp() as f64
2829                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2830                wire::IndexedRecoveryPoint {
2831                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2832                    backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2833                    source_resource_arn: Some(rp.resource_arn.clone()),
2834                    resource_type: Some(rp.resource_type.clone()),
2835                    iam_role_arn: Some(rp.iam_role_arn.clone()),
2836                    backup_creation_date: Some(creation_epoch),
2837                    index_status: Some("ACTIVE".to_string()),
2838                    ..Default::default()
2839                }
2840            })
2841            .collect();
2842        wire::serialize_list_indexed_recovery_points_response(
2843            &wire::ListIndexedRecoveryPointsOutput {
2844                indexed_recovery_points: if entries.is_empty() {
2845                    None
2846                } else {
2847                    Some(entries)
2848                },
2849                ..Default::default()
2850            },
2851        )
2852    }
2853
2854    async fn handle_list_legal_holds(
2855        &self,
2856        state: &Arc<tokio::sync::RwLock<BackupState>>,
2857    ) -> MockResponse {
2858        let state = state.read().await;
2859        let holds = state.list_legal_holds();
2860        let entries: Vec<wire::LegalHold> = holds
2861            .iter()
2862            .map(|h| {
2863                let creation_epoch = h.creation_date.timestamp() as f64
2864                    + (h.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2865                let cancellation_epoch = h
2866                    .cancellation_date
2867                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
2868                wire::LegalHold {
2869                    legal_hold_id: Some(h.legal_hold_id.clone()),
2870                    legal_hold_arn: Some(h.legal_hold_arn.clone()),
2871                    title: Some(h.title.clone()),
2872                    description: Some(h.description.clone()),
2873                    status: Some(h.status.clone()),
2874                    creation_date: Some(creation_epoch),
2875                    cancellation_date: cancellation_epoch,
2876                }
2877            })
2878            .collect();
2879        wire::serialize_list_legal_holds_response(&wire::ListLegalHoldsOutput {
2880            legal_holds: if entries.is_empty() {
2881                None
2882            } else {
2883                Some(entries)
2884            },
2885            ..Default::default()
2886        })
2887    }
2888
2889    async fn handle_list_protected_resources(
2890        &self,
2891        state: &Arc<tokio::sync::RwLock<BackupState>>,
2892    ) -> MockResponse {
2893        // Derive protected resources from recovery points
2894        let state = state.read().await;
2895        let mut resource_map: HashMap<String, &crate::types::RecoveryPointData> = HashMap::new();
2896        for rp in state.recovery_points.values() {
2897            let entry = resource_map.get(&rp.resource_arn);
2898            if entry.is_none() || entry.unwrap().creation_date < rp.creation_date {
2899                resource_map.insert(rp.resource_arn.clone(), rp);
2900            }
2901        }
2902        let entries: Vec<wire::ProtectedResource> = resource_map
2903            .values()
2904            .map(|rp| {
2905                let last_backup_epoch = rp.creation_date.timestamp() as f64
2906                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2907                wire::ProtectedResource {
2908                    resource_arn: Some(rp.resource_arn.clone()),
2909                    resource_type: Some(rp.resource_type.clone()),
2910                    last_backup_time: Some(last_backup_epoch),
2911                    last_backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2912                    last_recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2913                    ..Default::default()
2914                }
2915            })
2916            .collect();
2917        wire::serialize_list_protected_resources_response(&wire::ListProtectedResourcesOutput {
2918            results: if entries.is_empty() {
2919                None
2920            } else {
2921                Some(entries)
2922            },
2923            ..Default::default()
2924        })
2925    }
2926
2927    async fn handle_list_protected_resources_by_backup_vault(
2928        &self,
2929        state: &Arc<tokio::sync::RwLock<BackupState>>,
2930        vault_name: &str,
2931    ) -> MockResponse {
2932        let state = state.read().await;
2933        let mut resource_map: HashMap<String, &crate::types::RecoveryPointData> = HashMap::new();
2934        for rp in state
2935            .recovery_points
2936            .values()
2937            .filter(|rp| rp.backup_vault_name == vault_name)
2938        {
2939            let entry = resource_map.get(&rp.resource_arn);
2940            if entry.is_none() || entry.unwrap().creation_date < rp.creation_date {
2941                resource_map.insert(rp.resource_arn.clone(), rp);
2942            }
2943        }
2944        let entries: Vec<wire::ProtectedResource> = resource_map
2945            .values()
2946            .map(|rp| {
2947                let last_backup_epoch = rp.creation_date.timestamp() as f64
2948                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2949                wire::ProtectedResource {
2950                    resource_arn: Some(rp.resource_arn.clone()),
2951                    resource_type: Some(rp.resource_type.clone()),
2952                    last_backup_time: Some(last_backup_epoch),
2953                    last_backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2954                    last_recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2955                    ..Default::default()
2956                }
2957            })
2958            .collect();
2959        wire::serialize_list_protected_resources_by_backup_vault_response(
2960            &wire::ListProtectedResourcesByBackupVaultOutput {
2961                results: if entries.is_empty() {
2962                    None
2963                } else {
2964                    Some(entries)
2965                },
2966                ..Default::default()
2967            },
2968        )
2969    }
2970
2971    async fn handle_list_recovery_points_by_backup_vault(
2972        &self,
2973        state: &Arc<tokio::sync::RwLock<BackupState>>,
2974        vault_name: &str,
2975    ) -> MockResponse {
2976        let state = state.read().await;
2977        let rps = state.list_recovery_points_by_backup_vault(vault_name);
2978        let entries: Vec<wire::RecoveryPointByBackupVault> = rps
2979            .iter()
2980            .map(|rp| {
2981                let creation_epoch = rp.creation_date.timestamp() as f64
2982                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
2983                wire::RecoveryPointByBackupVault {
2984                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
2985                    backup_vault_name: Some(rp.backup_vault_name.clone()),
2986                    backup_vault_arn: Some(rp.backup_vault_arn.clone()),
2987                    resource_arn: Some(rp.resource_arn.clone()),
2988                    resource_type: Some(rp.resource_type.clone()),
2989                    iam_role_arn: Some(rp.iam_role_arn.clone()),
2990                    status: Some(rp.status.clone()),
2991                    creation_date: Some(creation_epoch),
2992                    backup_size_in_bytes: Some(rp.backup_size_bytes),
2993                    ..Default::default()
2994                }
2995            })
2996            .collect();
2997        wire::serialize_list_recovery_points_by_backup_vault_response(
2998            &wire::ListRecoveryPointsByBackupVaultOutput {
2999                recovery_points: Some(entries),
3000                ..Default::default()
3001            },
3002        )
3003    }
3004
3005    async fn handle_list_recovery_points_by_legal_hold(
3006        &self,
3007        state: &Arc<tokio::sync::RwLock<BackupState>>,
3008        _legal_hold_id: &str,
3009    ) -> MockResponse {
3010        // In mock, legal holds don't actually filter recovery points - return all
3011        let state = state.read().await;
3012        let entries: Vec<wire::RecoveryPointMember> = state
3013            .recovery_points
3014            .values()
3015            .map(|rp| wire::RecoveryPointMember {
3016                recovery_point_arn: Some(rp.recovery_point_arn.clone()),
3017                backup_vault_name: Some(rp.backup_vault_name.clone()),
3018                resource_arn: Some(rp.resource_arn.clone()),
3019                resource_type: Some(rp.resource_type.clone()),
3020            })
3021            .collect();
3022        wire::serialize_list_recovery_points_by_legal_hold_response(
3023            &wire::ListRecoveryPointsByLegalHoldOutput {
3024                recovery_points: if entries.is_empty() {
3025                    None
3026                } else {
3027                    Some(entries)
3028                },
3029                ..Default::default()
3030            },
3031        )
3032    }
3033
3034    async fn handle_list_recovery_points_by_resource(
3035        &self,
3036        state: &Arc<tokio::sync::RwLock<BackupState>>,
3037        resource_arn: &str,
3038    ) -> MockResponse {
3039        let state = state.read().await;
3040        let entries: Vec<wire::RecoveryPointByResource> = state
3041            .recovery_points
3042            .values()
3043            .filter(|rp| rp.resource_arn == resource_arn)
3044            .map(|rp| {
3045                let creation_epoch = rp.creation_date.timestamp() as f64
3046                    + (rp.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3047                wire::RecoveryPointByResource {
3048                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
3049                    backup_vault_name: Some(rp.backup_vault_name.clone()),
3050                    backup_size_bytes: Some(rp.backup_size_bytes),
3051                    creation_date: Some(creation_epoch),
3052                    status: Some(rp.status.clone()),
3053                    ..Default::default()
3054                }
3055            })
3056            .collect();
3057        wire::serialize_list_recovery_points_by_resource_response(
3058            &wire::ListRecoveryPointsByResourceOutput {
3059                recovery_points: if entries.is_empty() {
3060                    None
3061                } else {
3062                    Some(entries)
3063                },
3064                ..Default::default()
3065            },
3066        )
3067    }
3068
3069    async fn handle_list_report_jobs(
3070        &self,
3071        state: &Arc<tokio::sync::RwLock<BackupState>>,
3072        uri: &str,
3073    ) -> MockResponse {
3074        // Extract optional ReportPlanName query param
3075        let report_plan_name: Option<String> =
3076            uri.find('?').map(|i| &uri[i + 1..]).and_then(|qs| {
3077                qs.split('&').find_map(|kv| {
3078                    let (k, v) = kv.split_once('=')?;
3079                    if k == "ReportPlanName" {
3080                        Some(percent_decode(v))
3081                    } else {
3082                        None
3083                    }
3084                })
3085            });
3086
3087        let state = state.read().await;
3088        let jobs = state.list_report_jobs(report_plan_name.as_deref());
3089        let entries: Vec<wire::ReportJob> = jobs
3090            .iter()
3091            .map(|j| {
3092                let creation_epoch = j.creation_time.timestamp() as f64
3093                    + (j.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3094                let completion_epoch = j
3095                    .completion_time
3096                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
3097                wire::ReportJob {
3098                    report_job_id: Some(j.report_job_id.clone()),
3099                    report_plan_arn: Some(j.report_plan_arn.clone()),
3100                    report_template: Some(j.report_template.clone()),
3101                    creation_time: Some(creation_epoch),
3102                    completion_time: completion_epoch,
3103                    status: Some(j.status.clone()),
3104                    ..Default::default()
3105                }
3106            })
3107            .collect();
3108        wire::serialize_list_report_jobs_response(&wire::ListReportJobsOutput {
3109            report_jobs: if entries.is_empty() {
3110                None
3111            } else {
3112                Some(entries)
3113            },
3114            ..Default::default()
3115        })
3116    }
3117
3118    // Restore access backup vaults are a subset feature - return empty list in mock
3119    async fn handle_list_restore_access_backup_vaults(&self) -> MockResponse {
3120        wire::serialize_list_restore_access_backup_vaults_response(
3121            &wire::ListRestoreAccessBackupVaultsOutput {
3122                restore_access_backup_vaults: Some(vec![]),
3123                ..Default::default()
3124            },
3125        )
3126    }
3127
3128    async fn handle_list_restore_job_summaries(
3129        &self,
3130        state: &Arc<tokio::sync::RwLock<BackupState>>,
3131        account_id: &str,
3132        region: &str,
3133    ) -> MockResponse {
3134        let state = state.read().await;
3135        let jobs = state.list_restore_jobs();
3136        let mut counts: HashMap<String, i32> = HashMap::new();
3137        for job in &jobs {
3138            *counts.entry(job.status.clone()).or_insert(0) += 1;
3139        }
3140        let summaries: Vec<wire::RestoreJobSummary> = counts
3141            .into_iter()
3142            .map(|(state_val, count)| wire::RestoreJobSummary {
3143                state: Some(state_val),
3144                count: Some(count),
3145                account_id: Some(account_id.to_string()),
3146                region: Some(region.to_string()),
3147                ..Default::default()
3148            })
3149            .collect();
3150        wire::serialize_list_restore_job_summaries_response(&wire::ListRestoreJobSummariesOutput {
3151            restore_job_summaries: if summaries.is_empty() {
3152                None
3153            } else {
3154                Some(summaries)
3155            },
3156            ..Default::default()
3157        })
3158    }
3159
3160    async fn handle_list_restore_jobs(
3161        &self,
3162        state: &Arc<tokio::sync::RwLock<BackupState>>,
3163    ) -> MockResponse {
3164        let state = state.read().await;
3165        let jobs = state.list_restore_jobs();
3166        let entries: Vec<wire::RestoreJobsListMember> = jobs
3167            .iter()
3168            .map(|job| {
3169                let creation_epoch = job.creation_date.timestamp() as f64
3170                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3171                let completion_epoch = job
3172                    .completion_date
3173                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
3174                wire::RestoreJobsListMember {
3175                    restore_job_id: Some(job.restore_job_id.clone()),
3176                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
3177                    resource_type: Some(job.resource_type.clone()),
3178                    iam_role_arn: Some(job.iam_role_arn.clone()),
3179                    status: Some(job.status.clone()),
3180                    creation_date: Some(creation_epoch),
3181                    completion_date: completion_epoch,
3182                    backup_size_in_bytes: Some(job.backup_size_in_bytes),
3183                    account_id: Some(job.account_id.clone()),
3184                    validation_status: job.validation_status.clone(),
3185                    validation_status_message: job.validation_status_message.clone(),
3186                    ..Default::default()
3187                }
3188            })
3189            .collect();
3190        wire::serialize_list_restore_jobs_response(&wire::ListRestoreJobsOutput {
3191            restore_jobs: if entries.is_empty() {
3192                None
3193            } else {
3194                Some(entries)
3195            },
3196            ..Default::default()
3197        })
3198    }
3199
3200    async fn handle_list_restore_jobs_by_protected_resource(
3201        &self,
3202        state: &Arc<tokio::sync::RwLock<BackupState>>,
3203        resource_arn: &str,
3204    ) -> MockResponse {
3205        let state = state.read().await;
3206        let jobs = state.list_restore_jobs_by_recovery_point(resource_arn);
3207        let entries: Vec<wire::RestoreJobsListMember> = jobs
3208            .iter()
3209            .map(|job| {
3210                let creation_epoch = job.creation_date.timestamp() as f64
3211                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3212                let completion_epoch = job
3213                    .completion_date
3214                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
3215                wire::RestoreJobsListMember {
3216                    restore_job_id: Some(job.restore_job_id.clone()),
3217                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
3218                    resource_type: Some(job.resource_type.clone()),
3219                    iam_role_arn: Some(job.iam_role_arn.clone()),
3220                    status: Some(job.status.clone()),
3221                    creation_date: Some(creation_epoch),
3222                    completion_date: completion_epoch,
3223                    backup_size_in_bytes: Some(job.backup_size_in_bytes),
3224                    account_id: Some(job.account_id.clone()),
3225                    ..Default::default()
3226                }
3227            })
3228            .collect();
3229        wire::serialize_list_restore_jobs_by_protected_resource_response(
3230            &wire::ListRestoreJobsByProtectedResourceOutput {
3231                restore_jobs: if entries.is_empty() {
3232                    None
3233                } else {
3234                    Some(entries)
3235                },
3236                ..Default::default()
3237            },
3238        )
3239    }
3240
3241    async fn handle_list_restore_testing_plans(
3242        &self,
3243        state: &Arc<tokio::sync::RwLock<BackupState>>,
3244    ) -> MockResponse {
3245        let state = state.read().await;
3246        let plans = state.list_restore_testing_plans();
3247        let entries: Vec<wire::RestoreTestingPlanForList> = plans
3248            .iter()
3249            .map(|p| {
3250                let creation_epoch = p.creation_time.timestamp() as f64
3251                    + (p.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3252                let update_epoch = p.last_update_time.timestamp() as f64
3253                    + (p.last_update_time.timestamp_subsec_millis() as f64 / 1000.0);
3254                wire::RestoreTestingPlanForList {
3255                    restore_testing_plan_name: Some(p.restore_testing_plan_name.clone()),
3256                    restore_testing_plan_arn: Some(p.restore_testing_plan_arn.clone()),
3257                    schedule_expression: Some(p.schedule_expression.clone()),
3258                    creation_time: Some(creation_epoch),
3259                    last_update_time: Some(update_epoch),
3260                    last_execution_time: None,
3261                    ..Default::default()
3262                }
3263            })
3264            .collect();
3265        wire::serialize_list_restore_testing_plans_response(&wire::ListRestoreTestingPlansOutput {
3266            restore_testing_plans: if entries.is_empty() {
3267                None
3268            } else {
3269                Some(entries)
3270            },
3271            ..Default::default()
3272        })
3273    }
3274
3275    async fn handle_list_restore_testing_selections(
3276        &self,
3277        state: &Arc<tokio::sync::RwLock<BackupState>>,
3278        plan_name: &str,
3279    ) -> MockResponse {
3280        let state = state.read().await;
3281        let selections = state.list_restore_testing_selections(plan_name);
3282        let entries: Vec<wire::RestoreTestingSelectionForList> = selections
3283            .iter()
3284            .map(|s| {
3285                let creation_epoch = s.creation_time.timestamp() as f64
3286                    + (s.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3287                wire::RestoreTestingSelectionForList {
3288                    restore_testing_selection_name: Some(s.restore_testing_selection_name.clone()),
3289                    restore_testing_plan_name: Some(s.restore_testing_plan_name.clone()),
3290                    iam_role_arn: Some(s.iam_role_arn.clone()),
3291                    protected_resource_type: Some(s.protected_resource_type.clone()),
3292                    creation_time: Some(creation_epoch),
3293                    validation_window_hours: s.validation_window_hours,
3294                }
3295            })
3296            .collect();
3297        wire::serialize_list_restore_testing_selections_response(
3298            &wire::ListRestoreTestingSelectionsOutput {
3299                restore_testing_selections: if entries.is_empty() {
3300                    None
3301                } else {
3302                    Some(entries)
3303                },
3304                ..Default::default()
3305            },
3306        )
3307    }
3308
3309    async fn handle_list_scan_job_summaries(
3310        &self,
3311        state: &Arc<tokio::sync::RwLock<BackupState>>,
3312    ) -> MockResponse {
3313        // Return count summary per state from stored scan jobs.
3314        let state = state.read().await;
3315        let jobs = state.list_scan_jobs();
3316        let mut counts: HashMap<String, i32> = HashMap::new();
3317        for job in &jobs {
3318            *counts.entry(job.state.clone()).or_insert(0) += 1;
3319        }
3320        let summaries: Vec<wire::ScanJobSummary> = counts
3321            .into_iter()
3322            .map(|(state_val, count)| wire::ScanJobSummary {
3323                state: Some(state_val),
3324                count: Some(count),
3325                ..Default::default()
3326            })
3327            .collect();
3328        wire::serialize_list_scan_job_summaries_response(&wire::ListScanJobSummariesOutput {
3329            scan_job_summaries: if summaries.is_empty() {
3330                None
3331            } else {
3332                Some(summaries)
3333            },
3334            ..Default::default()
3335        })
3336    }
3337
3338    async fn handle_list_scan_jobs(
3339        &self,
3340        state: &Arc<tokio::sync::RwLock<BackupState>>,
3341    ) -> MockResponse {
3342        let state = state.read().await;
3343        let jobs = state.list_scan_jobs();
3344        let entries: Vec<wire::ScanJob> = jobs
3345            .iter()
3346            .map(|job| {
3347                let creation_epoch = job.creation_date.timestamp() as f64
3348                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3349                let completion_epoch = job
3350                    .completion_date
3351                    .map(|d| d.timestamp() as f64 + (d.timestamp_subsec_millis() as f64 / 1000.0));
3352                wire::ScanJob {
3353                    scan_job_id: Some(job.scan_job_id.clone()),
3354                    backup_vault_name: Some(job.backup_vault_name.clone()),
3355                    backup_vault_arn: Some(job.backup_vault_arn.clone()),
3356                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
3357                    iam_role_arn: Some(job.iam_role_arn.clone()),
3358                    malware_scanner: Some(job.malware_scanner.clone()),
3359                    scan_mode: Some(job.scan_mode.clone()),
3360                    scanner_role_arn: Some(job.scanner_role_arn.clone()),
3361                    scan_base_recovery_point_arn: job.scan_base_recovery_point_arn.clone(),
3362                    state: Some(job.state.clone()),
3363                    creation_date: Some(creation_epoch),
3364                    completion_date: completion_epoch,
3365                    account_id: Some(job.account_id.clone()),
3366                    ..Default::default()
3367                }
3368            })
3369            .collect();
3370        wire::serialize_list_scan_jobs_response(&wire::ListScanJobsOutput {
3371            scan_jobs: if entries.is_empty() {
3372                None
3373            } else {
3374                Some(entries)
3375            },
3376            ..Default::default()
3377        })
3378    }
3379
3380    async fn handle_list_tiering_configurations(
3381        &self,
3382        state: &Arc<tokio::sync::RwLock<BackupState>>,
3383    ) -> MockResponse {
3384        let state = state.read().await;
3385        let configs = state.list_tiering_configurations();
3386        let entries: Vec<wire::TieringConfigurationsListMember> = configs
3387            .iter()
3388            .map(|c| {
3389                let creation_epoch = c.creation_time.timestamp() as f64
3390                    + (c.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3391                wire::TieringConfigurationsListMember {
3392                    tiering_configuration_name: Some(c.tiering_configuration_name.clone()),
3393                    tiering_configuration_arn: Some(c.tiering_configuration_arn.clone()),
3394                    backup_vault_name: Some(c.backup_vault_name.clone()),
3395                    creation_time: Some(creation_epoch),
3396                    ..Default::default()
3397                }
3398            })
3399            .collect();
3400        wire::serialize_list_tiering_configurations_response(
3401            &wire::ListTieringConfigurationsOutput {
3402                tiering_configurations: if entries.is_empty() {
3403                    None
3404                } else {
3405                    Some(entries)
3406                },
3407                ..Default::default()
3408            },
3409        )
3410    }
3411
3412    async fn handle_put_backup_vault_access_policy(
3413        &self,
3414        state: &Arc<tokio::sync::RwLock<BackupState>>,
3415        vault_name: &str,
3416        region: &str,
3417        account_id: &str,
3418        request: &MockRequest,
3419        query: &HashMap<String, String>,
3420    ) -> MockResponse {
3421        let labels: &[(&str, &str)] = &[("BackupVaultName", vault_name)];
3422        let input = match wire::deserialize_put_backup_vault_access_policy_request(
3423            request, labels, query,
3424        ) {
3425            Ok(v) => v,
3426            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3427        };
3428        let policy = input.policy.clone().unwrap_or_default();
3429        let vault_arn = format!("arn:aws:backup:{region}:{account_id}:backup-vault:{vault_name}");
3430        let mut state = state.write().await;
3431        match state.put_backup_vault_access_policy(vault_name, &vault_arn, &policy) {
3432            Ok(()) => wire::serialize_put_backup_vault_access_policy_response(),
3433            Err(e) => backup_error_response(&e),
3434        }
3435    }
3436
3437    async fn handle_put_backup_vault_notifications(
3438        &self,
3439        state: &Arc<tokio::sync::RwLock<BackupState>>,
3440        vault_name: &str,
3441        region: &str,
3442        account_id: &str,
3443        request: &MockRequest,
3444        query: &HashMap<String, String>,
3445    ) -> MockResponse {
3446        let labels: &[(&str, &str)] = &[("BackupVaultName", vault_name)];
3447        let input = match wire::deserialize_put_backup_vault_notifications_request(
3448            request, labels, query,
3449        ) {
3450            Ok(v) => v,
3451            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3452        };
3453        let sns_topic_arn = input.s_n_s_topic_arn.clone();
3454        let backup_vault_events: Vec<String> = input.backup_vault_events.clone();
3455        let vault_arn = format!("arn:aws:backup:{region}:{account_id}:backup-vault:{vault_name}");
3456        let mut state = state.write().await;
3457        match state.put_backup_vault_notifications(
3458            vault_name,
3459            &vault_arn,
3460            &sns_topic_arn,
3461            backup_vault_events,
3462        ) {
3463            Ok(()) => wire::serialize_put_backup_vault_notifications_response(),
3464            Err(e) => backup_error_response(&e),
3465        }
3466    }
3467
3468    async fn handle_put_restore_validation_result(
3469        &self,
3470        state: &Arc<tokio::sync::RwLock<BackupState>>,
3471        restore_job_id: &str,
3472        request: &MockRequest,
3473        query: &HashMap<String, String>,
3474    ) -> MockResponse {
3475        let labels: &[(&str, &str)] = &[("RestoreJobId", restore_job_id)];
3476        let input =
3477            match wire::deserialize_put_restore_validation_result_request(request, labels, query) {
3478                Ok(v) => v,
3479                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3480            };
3481        let validation_status = input.validation_status.clone();
3482        let validation_status_message = input.validation_status_message.clone();
3483        let mut state = state.write().await;
3484        match state.put_restore_validation_result(
3485            restore_job_id,
3486            &validation_status,
3487            validation_status_message.as_deref(),
3488        ) {
3489            Ok(()) => wire::serialize_put_restore_validation_result_response(),
3490            Err(e) => backup_error_response(&e),
3491        }
3492    }
3493
    /// Returns the serialized `RevokeRestoreAccessBackupVault` response.
    ///
    /// Revoking restore access is a no-op in the mock: the handler receives no
    /// state handle, so nothing is read or modified.
    async fn handle_revoke_restore_access_backup_vault(&self) -> MockResponse {
        wire::serialize_revoke_restore_access_backup_vault_response()
    }
3498
3499    async fn handle_start_backup_job(
3500        &self,
3501        state: &Arc<tokio::sync::RwLock<BackupState>>,
3502        request: &MockRequest,
3503        query: &HashMap<String, String>,
3504        region: &str,
3505        account_id: &str,
3506    ) -> MockResponse {
3507        let input = match wire::deserialize_start_backup_job_request(request, &[], query) {
3508            Ok(v) => v,
3509            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3510        };
3511        if input.backup_vault_name.is_empty() {
3512            return rest_json_error(
3513                400,
3514                "InvalidParameterValueException",
3515                "Missing 'BackupVaultName'",
3516            );
3517        }
3518        let vault_name = input.backup_vault_name.clone();
3519        let resource_arn = input.resource_arn.clone();
3520        let iam_role_arn = input.iam_role_arn.clone();
3521        // ResourceType is not part of the StartBackupJob Smithy input shape — fall
3522        // back to inspecting the raw body for backwards compatibility.
3523        let raw: Value = if request.body.is_empty() {
3524            Value::Null
3525        } else {
3526            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
3527        };
3528        let resource_type = raw
3529            .get("ResourceType")
3530            .and_then(|v| v.as_str())
3531            .unwrap_or("")
3532            .to_string();
3533        let vault_arn = format!("arn:aws:backup:{region}:{account_id}:backup-vault:{vault_name}");
3534
3535        let mut state = state.write().await;
3536        match state.start_backup_job(
3537            &vault_name,
3538            &vault_arn,
3539            &resource_arn,
3540            &resource_type,
3541            &iam_role_arn,
3542            account_id,
3543            region,
3544        ) {
3545            Ok(job) => {
3546                let creation_epoch = job.creation_date.timestamp() as f64
3547                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3548                wire::serialize_start_backup_job_response(&wire::StartBackupJobOutput {
3549                    backup_job_id: Some(job.backup_job_id.clone()),
3550                    recovery_point_arn: Some(job.recovery_point_arn.clone()),
3551                    creation_date: Some(creation_epoch),
3552                    ..Default::default()
3553                })
3554            }
3555            Err(e) => backup_error_response(&e),
3556        }
3557    }
3558
3559    async fn handle_start_copy_job(
3560        &self,
3561        state: &Arc<tokio::sync::RwLock<BackupState>>,
3562        request: &MockRequest,
3563        query: &HashMap<String, String>,
3564        region: &str,
3565        account_id: &str,
3566    ) -> MockResponse {
3567        let input = match wire::deserialize_start_copy_job_request(request, &[], query) {
3568            Ok(v) => v,
3569            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3570        };
3571        let source_backup_vault_name = input.source_backup_vault_name.clone();
3572        let recovery_point_arn = input.recovery_point_arn.clone();
3573        let destination_backup_vault_arn = input.destination_backup_vault_arn.clone();
3574        let iam_role_arn = input.iam_role_arn.clone();
3575
3576        let mut state = state.write().await;
3577        match state.start_copy_job(
3578            &source_backup_vault_name,
3579            &recovery_point_arn,
3580            &destination_backup_vault_arn,
3581            &iam_role_arn,
3582            account_id,
3583            region,
3584        ) {
3585            Ok(job) => {
3586                let creation_epoch = job.creation_date.timestamp() as f64
3587                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3588                wire::serialize_start_copy_job_response(&wire::StartCopyJobOutput {
3589                    copy_job_id: Some(job.copy_job_id.clone()),
3590                    creation_date: Some(creation_epoch),
3591                    is_parent: Some(false),
3592                })
3593            }
3594            Err(e) => backup_error_response(&e),
3595        }
3596    }
3597
3598    async fn handle_start_report_job(
3599        &self,
3600        state: &Arc<tokio::sync::RwLock<BackupState>>,
3601        report_plan_name: &str,
3602        region: &str,
3603        account_id: &str,
3604    ) -> MockResponse {
3605        let mut state = state.write().await;
3606        match state.start_report_job(report_plan_name, region, account_id) {
3607            Ok(job) => wire::serialize_start_report_job_response(&wire::StartReportJobOutput {
3608                report_job_id: Some(job.report_job_id.clone()),
3609            }),
3610            Err(e) => backup_error_response(&e),
3611        }
3612    }
3613
3614    async fn handle_start_restore_job(
3615        &self,
3616        state: &Arc<tokio::sync::RwLock<BackupState>>,
3617        request: &MockRequest,
3618        query: &HashMap<String, String>,
3619        account_id: &str,
3620    ) -> MockResponse {
3621        let input = match wire::deserialize_start_restore_job_request(request, &[], query) {
3622            Ok(v) => v,
3623            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3624        };
3625        let recovery_point_arn = input.recovery_point_arn.clone();
3626        let iam_role_arn = input.iam_role_arn.clone().unwrap_or_default();
3627        let resource_type = input.resource_type.clone().unwrap_or_default();
3628        let metadata: HashMap<String, String> = input.metadata.clone();
3629
3630        let mut state = state.write().await;
3631        match state.start_restore_job(
3632            &recovery_point_arn,
3633            &iam_role_arn,
3634            &resource_type,
3635            metadata,
3636            account_id,
3637        ) {
3638            Ok(job) => wire::serialize_start_restore_job_response(&wire::StartRestoreJobOutput {
3639                restore_job_id: Some(job.restore_job_id.clone()),
3640            }),
3641            Err(e) => backup_error_response(&e),
3642        }
3643    }
3644
3645    async fn handle_start_scan_job(
3646        &self,
3647        state: &Arc<tokio::sync::RwLock<BackupState>>,
3648        request: &MockRequest,
3649        query: &HashMap<String, String>,
3650        region: &str,
3651        account_id: &str,
3652    ) -> MockResponse {
3653        let input = match wire::deserialize_start_scan_job_request(request, &[], query) {
3654            Ok(v) => v,
3655            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3656        };
3657        let vault_name = input.backup_vault_name.clone();
3658        let vault_arn = format!("arn:aws:backup:{region}:{account_id}:backup-vault:{vault_name}");
3659        let recovery_point_arn = input.recovery_point_arn.clone();
3660        let iam_role_arn = input.iam_role_arn.clone();
3661        let malware_scanner = input.malware_scanner.clone();
3662        let scan_mode = input.scan_mode.clone();
3663        let scanner_role_arn = input.scanner_role_arn.clone();
3664        let scan_base_recovery_point_arn = input.scan_base_recovery_point_arn.clone();
3665
3666        let mut state = state.write().await;
3667        match state.start_scan_job(
3668            &vault_name,
3669            &vault_arn,
3670            &recovery_point_arn,
3671            &iam_role_arn,
3672            &malware_scanner,
3673            &scan_mode,
3674            &scanner_role_arn,
3675            scan_base_recovery_point_arn,
3676            account_id,
3677            region,
3678        ) {
3679            Ok(job) => {
3680                let creation_epoch = job.creation_date.timestamp() as f64
3681                    + (job.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3682                wire::serialize_start_scan_job_response(&wire::StartScanJobOutput {
3683                    scan_job_id: Some(job.scan_job_id.clone()),
3684                    creation_date: Some(creation_epoch),
3685                })
3686            }
3687            Err(e) => backup_error_response(&e),
3688        }
3689    }
3690
3691    async fn handle_stop_backup_job(
3692        &self,
3693        state: &Arc<tokio::sync::RwLock<BackupState>>,
3694        job_id: &str,
3695    ) -> MockResponse {
3696        let mut state = state.write().await;
3697        match state.stop_backup_job(job_id) {
3698            Ok(()) => wire::serialize_stop_backup_job_response(),
3699            Err(e) => backup_error_response(&e),
3700        }
3701    }
3702
3703    async fn handle_update_backup_plan(
3704        &self,
3705        state: &Arc<tokio::sync::RwLock<BackupState>>,
3706        plan_id: &str,
3707        request: &MockRequest,
3708        query: &HashMap<String, String>,
3709    ) -> MockResponse {
3710        let raw: Value = if request.body.is_empty() {
3711            Value::Null
3712        } else {
3713            serde_json::from_slice(&request.body).unwrap_or(Value::Null)
3714        };
3715        if raw.get("BackupPlan").is_none() {
3716            return rest_json_error(
3717                400,
3718                "InvalidParameterValueException",
3719                "Missing 'BackupPlan'",
3720            );
3721        }
3722        let labels: &[(&str, &str)] = &[("BackupPlanId", plan_id)];
3723        let input = match wire::deserialize_update_backup_plan_request(request, labels, query) {
3724            Ok(v) => v,
3725            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3726        };
3727        let backup_plan = serde_json::to_value(&input.backup_plan).unwrap_or(Value::Null);
3728
3729        let mut state = state.write().await;
3730        match state.update_backup_plan(plan_id, &backup_plan) {
3731            Ok(plan) => {
3732                let creation_epoch = plan.creation_date.timestamp() as f64
3733                    + (plan.creation_date.timestamp_subsec_millis() as f64 / 1000.0);
3734                wire::serialize_update_backup_plan_response(&wire::UpdateBackupPlanOutput {
3735                    backup_plan_id: Some(plan.backup_plan_id.clone()),
3736                    backup_plan_arn: Some(plan.backup_plan_arn.clone()),
3737                    creation_date: Some(creation_epoch),
3738                    version_id: Some(plan.version_id.clone()),
3739                    ..Default::default()
3740                })
3741            }
3742            Err(e) => backup_error_response(&e),
3743        }
3744    }
3745
3746    async fn handle_update_framework(
3747        &self,
3748        state: &Arc<tokio::sync::RwLock<BackupState>>,
3749        name: &str,
3750        request: &MockRequest,
3751        query: &HashMap<String, String>,
3752    ) -> MockResponse {
3753        let labels: &[(&str, &str)] = &[("FrameworkName", name)];
3754        let input = match wire::deserialize_update_framework_request(request, labels, query) {
3755            Ok(v) => v,
3756            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3757        };
3758        let description = input.framework_description.clone();
3759        let controls = input
3760            .framework_controls
3761            .as_ref()
3762            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null));
3763        let mut state = state.write().await;
3764        match state.update_framework(name, description.as_deref(), controls) {
3765            Ok(fw) => {
3766                let creation_epoch = fw.creation_time.timestamp() as f64
3767                    + (fw.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3768                wire::serialize_update_framework_response(&wire::UpdateFrameworkOutput {
3769                    framework_name: Some(fw.framework_name.clone()),
3770                    framework_arn: Some(fw.framework_arn.clone()),
3771                    creation_time: Some(creation_epoch),
3772                    ..Default::default()
3773                })
3774            }
3775            Err(e) => backup_error_response(&e),
3776        }
3777    }
3778
3779    async fn handle_update_global_settings(
3780        &self,
3781        state: &Arc<tokio::sync::RwLock<BackupState>>,
3782        request: &MockRequest,
3783        query: &HashMap<String, String>,
3784    ) -> MockResponse {
3785        let input = match wire::deserialize_update_global_settings_request(request, &[], query) {
3786            Ok(v) => v,
3787            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3788        };
3789        let settings: HashMap<String, String> = input.global_settings.unwrap_or_default();
3790        let mut state = state.write().await;
3791        state.update_global_settings(settings);
3792        wire::serialize_update_global_settings_response()
3793    }
3794
3795    async fn handle_update_recovery_point_index_settings(
3796        &self,
3797        state: &Arc<tokio::sync::RwLock<BackupState>>,
3798        vault_name: &str,
3799        recovery_point_arn: &str,
3800        request: &MockRequest,
3801        query: &HashMap<String, String>,
3802    ) -> MockResponse {
3803        let labels: &[(&str, &str)] = &[
3804            ("BackupVaultName", vault_name),
3805            ("RecoveryPointArn", recovery_point_arn),
3806        ];
3807        let input = match wire::deserialize_update_recovery_point_index_settings_request(
3808            request, labels, query,
3809        ) {
3810            Ok(v) => v,
3811            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3812        };
3813        let index = if input.index.is_empty() {
3814            "ENABLED".to_string()
3815        } else {
3816            input.index.clone()
3817        };
3818        let state = state.read().await;
3819        match state.describe_recovery_point(vault_name, recovery_point_arn) {
3820            Ok(rp) => wire::serialize_update_recovery_point_index_settings_response(
3821                &wire::UpdateRecoveryPointIndexSettingsOutput {
3822                    backup_vault_name: Some(rp.backup_vault_name.clone()),
3823                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
3824                    index: Some(index),
3825                    index_status: Some("ACTIVE".to_string()),
3826                },
3827            ),
3828            Err(e) => backup_error_response(&e),
3829        }
3830    }
3831
3832    async fn handle_update_recovery_point_lifecycle(
3833        &self,
3834        state: &Arc<tokio::sync::RwLock<BackupState>>,
3835        vault_name: &str,
3836        recovery_point_arn: &str,
3837        request: &MockRequest,
3838        query: &HashMap<String, String>,
3839    ) -> MockResponse {
3840        let labels: &[(&str, &str)] = &[
3841            ("BackupVaultName", vault_name),
3842            ("RecoveryPointArn", recovery_point_arn),
3843        ];
3844        let input =
3845            match wire::deserialize_update_recovery_point_lifecycle_request(request, labels, query)
3846            {
3847                Ok(v) => v,
3848                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3849            };
3850        let lifecycle_wire = input.lifecycle.clone();
3851        let state = state.read().await;
3852        match state.describe_recovery_point(vault_name, recovery_point_arn) {
3853            Ok(rp) => wire::serialize_update_recovery_point_lifecycle_response(
3854                &wire::UpdateRecoveryPointLifecycleOutput {
3855                    backup_vault_arn: Some(rp.backup_vault_arn.clone()),
3856                    recovery_point_arn: Some(rp.recovery_point_arn.clone()),
3857                    lifecycle: lifecycle_wire,
3858                    ..Default::default()
3859                },
3860            ),
3861            Err(e) => backup_error_response(&e),
3862        }
3863    }
3864
3865    async fn handle_update_region_settings(
3866        &self,
3867        state: &Arc<tokio::sync::RwLock<BackupState>>,
3868        request: &MockRequest,
3869        query: &HashMap<String, String>,
3870    ) -> MockResponse {
3871        let input = match wire::deserialize_update_region_settings_request(request, &[], query) {
3872            Ok(v) => v,
3873            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3874        };
3875        let opt_in = input.resource_type_opt_in_preference.clone();
3876        let management = input.resource_type_management_preference.clone();
3877        let mut state = state.write().await;
3878        state.update_region_settings(opt_in, management);
3879        wire::serialize_update_region_settings_response()
3880    }
3881
3882    async fn handle_update_report_plan(
3883        &self,
3884        state: &Arc<tokio::sync::RwLock<BackupState>>,
3885        name: &str,
3886        request: &MockRequest,
3887        query: &HashMap<String, String>,
3888    ) -> MockResponse {
3889        let labels: &[(&str, &str)] = &[("ReportPlanName", name)];
3890        let input = match wire::deserialize_update_report_plan_request(request, labels, query) {
3891            Ok(v) => v,
3892            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3893        };
3894        let description = input.report_plan_description.clone();
3895        let delivery_channel = input
3896            .report_delivery_channel
3897            .as_ref()
3898            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null));
3899        let report_setting = input
3900            .report_setting
3901            .as_ref()
3902            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null));
3903        let mut state = state.write().await;
3904        match state.update_report_plan(
3905            name,
3906            description.as_deref(),
3907            delivery_channel,
3908            report_setting,
3909        ) {
3910            Ok(plan) => {
3911                let creation_epoch = plan.creation_time.timestamp() as f64
3912                    + (plan.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3913                wire::serialize_update_report_plan_response(&wire::UpdateReportPlanOutput {
3914                    report_plan_name: Some(plan.report_plan_name.clone()),
3915                    report_plan_arn: Some(plan.report_plan_arn.clone()),
3916                    creation_time: Some(creation_epoch),
3917                })
3918            }
3919            Err(e) => backup_error_response(&e),
3920        }
3921    }
3922
3923    async fn handle_update_restore_testing_plan(
3924        &self,
3925        state: &Arc<tokio::sync::RwLock<BackupState>>,
3926        plan_name: &str,
3927        request: &MockRequest,
3928        query: &HashMap<String, String>,
3929    ) -> MockResponse {
3930        let labels: &[(&str, &str)] = &[("RestoreTestingPlanName", plan_name)];
3931        let input =
3932            match wire::deserialize_update_restore_testing_plan_request(request, labels, query) {
3933                Ok(v) => v,
3934                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3935            };
3936        let plan_in = &input.restore_testing_plan;
3937        let schedule_expression = plan_in.schedule_expression.clone();
3938        let schedule_expression_timezone = plan_in.schedule_expression_timezone.clone();
3939        let start_window_hours = plan_in.start_window_hours;
3940        let recovery_point_selection = plan_in
3941            .recovery_point_selection
3942            .as_ref()
3943            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null));
3944
3945        let mut state = state.write().await;
3946        match state.update_restore_testing_plan(
3947            plan_name,
3948            schedule_expression.as_deref(),
3949            schedule_expression_timezone,
3950            start_window_hours,
3951            recovery_point_selection,
3952        ) {
3953            Ok(plan) => {
3954                let creation_epoch = plan.creation_time.timestamp() as f64
3955                    + (plan.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
3956                let update_epoch = plan.last_update_time.timestamp() as f64
3957                    + (plan.last_update_time.timestamp_subsec_millis() as f64 / 1000.0);
3958                wire::serialize_update_restore_testing_plan_response(
3959                    &wire::UpdateRestoreTestingPlanOutput {
3960                        restore_testing_plan_name: Some(plan.restore_testing_plan_name.clone()),
3961                        restore_testing_plan_arn: Some(plan.restore_testing_plan_arn.clone()),
3962                        creation_time: Some(creation_epoch),
3963                        update_time: Some(update_epoch),
3964                    },
3965                )
3966            }
3967            Err(e) => backup_error_response(&e),
3968        }
3969    }
3970
3971    async fn handle_update_restore_testing_selection(
3972        &self,
3973        state: &Arc<tokio::sync::RwLock<BackupState>>,
3974        plan_name: &str,
3975        selection_name: &str,
3976        request: &MockRequest,
3977        query: &HashMap<String, String>,
3978    ) -> MockResponse {
3979        let labels: &[(&str, &str)] = &[
3980            ("RestoreTestingPlanName", plan_name),
3981            ("RestoreTestingSelectionName", selection_name),
3982        ];
3983        let input = match wire::deserialize_update_restore_testing_selection_request(
3984            request, labels, query,
3985        ) {
3986            Ok(v) => v,
3987            Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
3988        };
3989        let sel_in = &input.restore_testing_selection;
3990        let iam_role_arn = sel_in.iam_role_arn.clone();
3991        let protected_resource_arns = sel_in.protected_resource_arns.clone();
3992        let protected_resource_conditions = sel_in
3993            .protected_resource_conditions
3994            .as_ref()
3995            .map(|v| serde_json::to_value(v).unwrap_or(Value::Null));
3996        let restore_metadata_overrides = sel_in.restore_metadata_overrides.clone();
3997        let validation_window_hours = sel_in.validation_window_hours;
3998
3999        let mut state = state.write().await;
4000        match state.update_restore_testing_selection(
4001            plan_name,
4002            selection_name,
4003            iam_role_arn.as_deref(),
4004            protected_resource_arns,
4005            protected_resource_conditions,
4006            restore_metadata_overrides,
4007            validation_window_hours,
4008        ) {
4009            Ok(sel) => {
4010                let creation_epoch = sel.creation_time.timestamp() as f64
4011                    + (sel.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
4012                let update_epoch = sel.last_update_time.timestamp() as f64
4013                    + (sel.last_update_time.timestamp_subsec_millis() as f64 / 1000.0);
4014                // Need to get the plan ARN
4015                wire::serialize_update_restore_testing_selection_response(
4016                    &wire::UpdateRestoreTestingSelectionOutput {
4017                        restore_testing_selection_name: Some(
4018                            sel.restore_testing_selection_name.clone(),
4019                        ),
4020                        restore_testing_plan_name: Some(sel.restore_testing_plan_name.clone()),
4021                        restore_testing_plan_arn: Some(sel.restore_testing_plan_arn.clone()),
4022                        creation_time: Some(creation_epoch),
4023                        update_time: Some(update_epoch),
4024                    },
4025                )
4026            }
4027            Err(e) => backup_error_response(&e),
4028        }
4029    }
4030
4031    async fn handle_update_tiering_configuration(
4032        &self,
4033        state: &Arc<tokio::sync::RwLock<BackupState>>,
4034        name: &str,
4035        request: &MockRequest,
4036        query: &HashMap<String, String>,
4037    ) -> MockResponse {
4038        let labels: &[(&str, &str)] = &[("TieringConfigurationName", name)];
4039        let input =
4040            match wire::deserialize_update_tiering_configuration_request(request, labels, query) {
4041                Ok(v) => v,
4042                Err(e) => return rest_json_error(400, "InvalidParameterValueException", &e),
4043            };
4044        let vault_name = if input.tiering_configuration.backup_vault_name.is_empty() {
4045            None
4046        } else {
4047            Some(input.tiering_configuration.backup_vault_name.clone())
4048        };
4049        let resource_selection = if input.tiering_configuration.resource_selection.is_empty() {
4050            None
4051        } else {
4052            serde_json::to_value(&input.tiering_configuration.resource_selection).ok()
4053        };
4054
4055        let mut state = state.write().await;
4056        match state.update_tiering_configuration(name, vault_name.as_deref(), resource_selection) {
4057            Ok(config) => {
4058                let creation_epoch = config.creation_time.timestamp() as f64
4059                    + (config.creation_time.timestamp_subsec_millis() as f64 / 1000.0);
4060                let updated_epoch = config.last_updated_time.timestamp() as f64
4061                    + (config.last_updated_time.timestamp_subsec_millis() as f64 / 1000.0);
4062                wire::serialize_update_tiering_configuration_response(
4063                    &wire::UpdateTieringConfigurationOutput {
4064                        tiering_configuration_name: Some(config.tiering_configuration_name.clone()),
4065                        tiering_configuration_arn: Some(config.tiering_configuration_arn.clone()),
4066                        creation_time: Some(creation_epoch),
4067                        last_updated_time: Some(updated_epoch),
4068                    },
4069                )
4070            }
4071            Err(e) => backup_error_response(&e),
4072        }
4073    }
4074}
4075
4076fn backup_error_response(err: &BackupError) -> MockResponse {
4077    let (status, error_type) = match err {
4078        BackupError::VaultAlreadyExists(_) => (400, "AlreadyExistsException"),
4079        BackupError::VaultNotFound(_) => (404, "ResourceNotFoundException"),
4080        BackupError::PlanAlreadyExists(_) => (400, "AlreadyExistsException"),
4081        BackupError::PlanNotFound(_) => (404, "ResourceNotFoundException"),
4082        BackupError::ReportPlanAlreadyExists(_) => (400, "AlreadyExistsException"),
4083        BackupError::ReportPlanNotFound(_) => (404, "ResourceNotFoundException"),
4084        BackupError::FrameworkAlreadyExists(_) => (400, "AlreadyExistsException"),
4085        BackupError::FrameworkNotFound(_) => (404, "ResourceNotFoundException"),
4086        BackupError::SelectionNotFound(_) => (404, "ResourceNotFoundException"),
4087        BackupError::RecoveryPointNotFound(_) => (404, "ResourceNotFoundException"),
4088        BackupError::VaultAccessPolicyNotFound(_) => (404, "ResourceNotFoundException"),
4089        BackupError::VaultNotificationsNotFound(_) => (404, "ResourceNotFoundException"),
4090        BackupError::BackupJobNotFound(_) => (404, "ResourceNotFoundException"),
4091        BackupError::ReportJobNotFound(_) => (404, "ResourceNotFoundException"),
4092        BackupError::ScanJobNotFound(_) => (404, "ResourceNotFoundException"),
4093        BackupError::TieringConfigAlreadyExists(_) => (400, "AlreadyExistsException"),
4094        BackupError::TieringConfigNotFound(_) => (404, "ResourceNotFoundException"),
4095        BackupError::VaultNotLocked(_) => (400, "InvalidParameterValueException"),
4096        BackupError::BackupJobNotCancellable(_) => (400, "InvalidRequestException"),
4097        BackupError::LegalHoldAlreadyExists(_) => (400, "AlreadyExistsException"),
4098        BackupError::LegalHoldNotFound(_) => (404, "ResourceNotFoundException"),
4099        BackupError::CopyJobNotFound(_) => (404, "ResourceNotFoundException"),
4100        BackupError::RestoreJobNotFound(_) => (404, "ResourceNotFoundException"),
4101        BackupError::RestoreTestingPlanAlreadyExists(_) => (400, "AlreadyExistsException"),
4102        BackupError::RestoreTestingPlanNotFound(_) => (404, "ResourceNotFoundException"),
4103        BackupError::RestoreTestingSelectionAlreadyExists(_) => (400, "AlreadyExistsException"),
4104        BackupError::RestoreTestingSelectionNotFound(_) => (404, "ResourceNotFoundException"),
4105    };
4106    let body = json!({
4107        "Type": "User",
4108        "Message": err.to_string(),
4109    });
4110    let mut resp = MockResponse::rest_json(status, body.to_string());
4111    resp.headers
4112        .insert(X_AMZN_ERRORTYPE, error_type.parse().unwrap());
4113    resp
4114}