Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::SnapshotStore;
14
15use crate::runtime::{RdsRuntime, RuntimeError};
16use crate::state::{
17    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
18    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
19    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
20};
21
/// XML namespace URI for the RDS API; the path carries the `2014-10-31` date.
// NOTE(review): no use site is visible in this part of the file — presumably the
// `xmlns` written on response envelopes; confirm where it is referenced.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
23
/// Reports whether `action` is treated as a state-changing operation.
///
/// An action counts as mutating when it is one of the explicitly listed core
/// operations, or when its name starts with one of the mutating verb prefixes
/// (the heuristic that covers the remaining operations). Matching is
/// case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    // Operations that are always considered mutating, matched by exact name.
    const CORE_MUTATIONS: &[&str] = &[
        "AddTagsToResource",
        "CreateDBInstance",
        "CreateDBInstanceReadReplica",
        "CreateDBParameterGroup",
        "CreateDBSnapshot",
        "CreateDBSubnetGroup",
        "DeleteDBInstance",
        "DeleteDBParameterGroup",
        "DeleteDBSnapshot",
        "DeleteDBSubnetGroup",
        "ModifyDBInstance",
        "ModifyDBParameterGroup",
        "ModifyDBSubnetGroup",
        "RebootDBInstance",
        "RemoveTagsFromResource",
        "RestoreDBInstanceFromDBSnapshot",
    ];
    // Heuristic for the 140 extra ops: any verb that mutates state.
    const MUTATING_VERBS: &[&str] = &[
        "Create",
        "Modify",
        "Delete",
        "Reboot",
        "Start",
        "Stop",
        "Failover",
        "Switchover",
        "Promote",
        "Reset",
        "Apply",
        "Authorize",
        "Revoke",
        "Add",
        "Remove",
        "Register",
        "Deregister",
        "Copy",
        "Restore",
        "Backtrack",
        "Cancel",
        "Purchase",
        "Disable",
        "Enable",
    ];

    CORE_MUTATIONS.contains(&action)
        || MUTATING_VERBS
            .iter()
            .any(|verb| action.starts_with(verb))
}
/// Action names this service reports through `AwsService::supported_actions`.
///
/// Only some of these have a dedicated arm in `RdsService::handle`; any action
/// without one is routed to `handle_extra_action`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
240
/// RDS service front end: the shared state plus the optional collaborators
/// installed through the `with_*` builder methods.
pub struct RdsService {
    /// Shared RDS state, accessed through `read()` / `write()` and looked up per account id.
    pub(crate) state: SharedRdsState,
    /// Container runtime used by instance operations; `None` until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Destination for serialized state snapshots; when `None`, `save_snapshot` returns early.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of `save_snapshot`.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Bus that `emit_event` publishes `aws.rds` events to; when `None`, emission is a no-op.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
248
/// Kind of RDS resource an `aws.rds` EventBridge event refers to.
///
/// Each variant supplies the `SourceType` value placed in the event detail
/// and the detail-type string the event is published under.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
}

impl RdsSourceType {
    /// `(SourceType value, detail-type)` pair for this variant.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => ("DB_PARAMETER_GROUP", "RDS DB Parameter Group Event"),
        }
    }

    /// Value used for the `SourceType` field of the event detail.
    fn as_str(self) -> &'static str {
        let (source_type, _) = self.labels();
        source_type
    }

    /// Detail-type string the event is published with.
    fn detail_type(self) -> &'static str {
        let (_, detail_type) = self.labels();
        detail_type
    }
}
275
impl RdsService {
    /// Borrow the shared RDS state handle held by this service.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
281
282impl RdsService {
283    pub fn new(state: SharedRdsState) -> Self {
284        Self {
285            state,
286            runtime: None,
287            snapshot_store: None,
288            snapshot_lock: Arc::new(AsyncMutex::new(())),
289            delivery_bus: None,
290        }
291    }
292
293    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
294        self.runtime = Some(runtime);
295        self
296    }
297
298    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
299        self.snapshot_store = Some(store);
300        self
301    }
302
303    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
304        self.delivery_bus = Some(bus);
305        self
306    }
307
308    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
309    /// No-op when the delivery bus isn't wired (tests, minimal configs).
310    pub(crate) fn emit_event(
311        &self,
312        source_type: RdsSourceType,
313        source_identifier: &str,
314        source_arn: &str,
315        event_id: &str,
316        event_categories: &[&str],
317        message: &str,
318    ) {
319        let Some(ref bus) = self.delivery_bus else {
320            return;
321        };
322        let detail = serde_json::json!({
323            "EventCategories": event_categories,
324            "SourceType": source_type.as_str(),
325            "SourceArn": source_arn,
326            "Date": Utc::now().to_rfc3339(),
327            "Message": message,
328            "SourceIdentifier": source_identifier,
329            "EventID": event_id,
330        });
331        bus.put_event_to_eventbridge(
332            "aws.rds",
333            source_type.detail_type(),
334            &detail.to_string(),
335            "default",
336        );
337    }
338
339    async fn save_snapshot(&self) {
340        let Some(store) = self.snapshot_store.clone() else {
341            return;
342        };
343        let _guard = self.snapshot_lock.lock().await;
344        let snapshot = RdsSnapshot {
345            schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
346            state: None,
347            accounts: Some(self.state.read().clone()),
348        };
349        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
350            let bytes = serde_json::to_vec(&snapshot)
351                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
352            store.save(&bytes)
353        })
354        .await;
355        match join {
356            Ok(Ok(())) => {}
357            Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
358            Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
359        }
360    }
361
362    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
363    ///
364    /// RDS operations that start, stop, or reach into a database container fail
365    /// with a consistent wire error when the daemon (Docker/Podman) is missing
366    /// rather than each caller restating the message.
367    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
368        self.runtime.as_ref().ok_or_else(|| {
369            AwsServiceError::aws_error(
370                StatusCode::SERVICE_UNAVAILABLE,
371                "InvalidParameterValue",
372                "Docker/Podman is required for RDS DB instances but is not available",
373            )
374        })
375    }
376}
377
378#[async_trait]
379impl AwsService for RdsService {
380    fn service_name(&self) -> &str {
381        "rds"
382    }
383
384    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let mutates = is_mutating_action(request.action.as_str());
386        let result = match request.action.as_str() {
387            "AddTagsToResource" => self.add_tags_to_resource(&request),
388            "CreateDBInstance" => self.create_db_instance(&request).await,
389            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
390            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
391            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
392            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
393            "DeleteDBInstance" => self.delete_db_instance(&request).await,
394            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
395            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
396            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
397            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
398            "DescribeDBInstances" => self.describe_db_instances(&request),
399            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
400            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
401            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
402            "DescribeOrderableDBInstanceOptions" => {
403                self.describe_orderable_db_instance_options(&request)
404            }
405            "ListTagsForResource" => self.list_tags_for_resource(&request),
406            "ModifyDBInstance" => self.modify_db_instance(&request),
407            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
408            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
409            "RebootDBInstance" => self.reboot_db_instance(&request).await,
410            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
411            "RestoreDBInstanceFromDBSnapshot" => {
412                self.restore_db_instance_from_db_snapshot(&request).await
413            }
414            _ => self.handle_extra_action(&request),
415        };
416        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
417            self.save_snapshot().await;
418        }
419        result
420    }
421
422    fn supported_actions(&self) -> &[&str] {
423        SUPPORTED_ACTIONS
424    }
425}
426
427impl RdsService {
428    async fn create_db_instance(
429        &self,
430        request: &AwsRequest,
431    ) -> Result<AwsResponse, AwsServiceError> {
432        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
433        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
434        let db_instance_class = required_param(request, "DBInstanceClass")?;
435        let engine = required_param(request, "Engine")?;
436        let master_username = required_param(request, "MasterUsername")?;
437        let master_user_password = required_param(request, "MasterUserPassword")?;
438        let db_name = optional_param(request, "DBName");
439        let engine_version =
440            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
441        let publicly_accessible =
442            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
443                .unwrap_or(true);
444        let deletion_protection =
445            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
446                .unwrap_or(false);
447        let port = optional_i32_param(request, "Port")?
448            .unwrap_or_else(|| default_port_for_engine(&engine));
449        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
450
451        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
452            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
453
454        let backup_retention_period =
455            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
456        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
457            .unwrap_or_else(|| "03:00-04:00".to_string());
458        let option_group_name = optional_param(request, "OptionGroupName");
459        let multi_az =
460            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
461
462        validate_create_request(
463            &db_instance_identifier,
464            allocated_storage,
465            &db_instance_class,
466            &engine,
467            &engine_version,
468            port,
469        )?;
470
471        {
472            let mut accounts = self.state.write();
473            let state = accounts.get_or_create(&request.account_id);
474            if !state.begin_instance_creation(&db_instance_identifier) {
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::BAD_REQUEST,
477                    "DBInstanceAlreadyExists",
478                    format!("DBInstance {} already exists.", db_instance_identifier),
479                ));
480            }
481            // Validate parameter group exists if specified by the caller
482            if let Some(ref pg_name) = db_parameter_group_name {
483                if !state.parameter_groups.contains_key(pg_name) {
484                    state.cancel_instance_creation(&db_instance_identifier);
485                    return Err(AwsServiceError::aws_error(
486                        StatusCode::NOT_FOUND,
487                        "DBParameterGroupNotFound",
488                        format!("DBParameterGroup {} not found.", pg_name),
489                    ));
490                }
491            }
492        }
493
494        let runtime = self.require_runtime()?;
495
496        let logical_db_name = db_name
497            .clone()
498            .unwrap_or_else(|| default_db_name(&engine).to_string());
499        let running = runtime
500            .ensure_postgres(
501                &db_instance_identifier,
502                &engine,
503                &engine_version,
504                &master_username,
505                &master_user_password,
506                &logical_db_name,
507            )
508            .await
509            .map_err(|error| {
510                self.state
511                    .write()
512                    .get_or_create(&request.account_id)
513                    .cancel_instance_creation(&db_instance_identifier);
514                runtime_error_to_service_error(error)
515            })?;
516
517        let mut accounts = self.state.write();
518        let state = accounts.get_or_create(&request.account_id);
519        let created_at = Utc::now();
520        let instance = DbInstance {
521            db_instance_identifier: db_instance_identifier.clone(),
522            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
523            db_instance_class: db_instance_class.clone(),
524            engine: engine.clone(),
525            engine_version: engine_version.clone(),
526            db_instance_status: "available".to_string(),
527            master_username: master_username.clone(),
528            db_name: db_name.clone(),
529            endpoint_address: "127.0.0.1".to_string(),
530            port: i32::from(running.host_port),
531            allocated_storage,
532            publicly_accessible,
533            deletion_protection,
534            created_at,
535            dbi_resource_id: state.next_dbi_resource_id(),
536            master_user_password,
537            container_id: running.container_id,
538            host_port: running.host_port,
539            tags: Vec::new(),
540            read_replica_source_db_instance_identifier: None,
541            read_replica_db_instance_identifiers: Vec::new(),
542            vpc_security_group_ids,
543            db_parameter_group_name,
544            backup_retention_period,
545            preferred_backup_window,
546            latest_restorable_time: if backup_retention_period > 0 {
547                Some(created_at)
548            } else {
549                None
550            },
551            option_group_name,
552            multi_az,
553            pending_modified_values: None,
554        };
555        state.finish_instance_creation(instance.clone());
556        let instance_arn = instance.db_instance_arn.clone();
557        drop(accounts);
558
559        self.emit_event(
560            RdsSourceType::DbInstance,
561            &db_instance_identifier,
562            &instance_arn,
563            "RDS-EVENT-0005",
564            &["creation"],
565            "DB instance created",
566        );
567
568        Ok(AwsResponse::xml(
569            StatusCode::OK,
570            xml_wrap(
571                "CreateDBInstance",
572                &format!(
573                    "<DBInstance>{}</DBInstance>",
574                    db_instance_xml(&instance, Some("creating"))
575                ),
576                &request.request_id,
577            ),
578        ))
579    }
580
581    async fn delete_db_instance(
582        &self,
583        request: &AwsRequest,
584    ) -> Result<AwsResponse, AwsServiceError> {
585        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
586        let skip_final_snapshot =
587            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
588                .unwrap_or(false);
589        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
590
591        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
592            return Err(AwsServiceError::aws_error(
593                StatusCode::BAD_REQUEST,
594                "InvalidParameterCombination",
595                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
596            ));
597        }
598        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
599            return Err(AwsServiceError::aws_error(
600                StatusCode::BAD_REQUEST,
601                "InvalidParameterCombination",
602                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
603            ));
604        }
605
606        // Check deletion protection BEFORE creating snapshot or making any changes
607        {
608            let accounts = self.state.read();
609            let empty = RdsState::new(&request.account_id, &request.region);
610            let state = accounts.get(&request.account_id).unwrap_or(&empty);
611            if let Some(instance) = state.instances.get(&db_instance_identifier) {
612                if instance.deletion_protection {
613                    return Err(AwsServiceError::aws_error(
614                        StatusCode::BAD_REQUEST,
615                        "InvalidDBInstanceState",
616                        format!(
617                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
618                            db_instance_identifier
619                        ),
620                    ));
621                }
622            } else {
623                return Err(db_instance_not_found(&db_instance_identifier));
624            }
625        }
626
627        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
628            self.create_final_db_snapshot(
629                &db_instance_identifier,
630                snapshot_id,
631                &request.account_id,
632                &request.region,
633            )
634            .await?;
635        }
636
637        let instance = {
638            let mut accounts = self.state.write();
639            let state = accounts.get_or_create(&request.account_id);
640            let instance = state
641                .instances
642                .remove(&db_instance_identifier)
643                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
644
645            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
646                if let Some(source) = state.instances.get_mut(source_id) {
647                    source
648                        .read_replica_db_instance_identifiers
649                        .retain(|id| id != &db_instance_identifier);
650                }
651            }
652
653            for replica_id in &instance.read_replica_db_instance_identifiers {
654                if let Some(replica) = state.instances.get_mut(replica_id) {
655                    replica.read_replica_source_db_instance_identifier = None;
656                }
657            }
658
659            instance
660        };
661
662        if let Some(runtime) = &self.runtime {
663            runtime.stop_container(&db_instance_identifier).await;
664        }
665
666        self.emit_event(
667            RdsSourceType::DbInstance,
668            &db_instance_identifier,
669            &instance.db_instance_arn,
670            "RDS-EVENT-0003",
671            &["deletion"],
672            "DB instance deleted",
673        );
674
675        Ok(AwsResponse::xml(
676            StatusCode::OK,
677            xml_wrap(
678                "DeleteDBInstance",
679                &format!(
680                    "<DBInstance>{}</DBInstance>",
681                    db_instance_xml(&instance, Some("deleting"))
682                ),
683                &request.request_id,
684            ),
685        ))
686    }
687
688    /// Take a final snapshot of an instance that is about to be deleted,
689    /// persisting the dumped database into `state.snapshots`. The DLQ-style
690    /// conflict check runs twice — once under the read lock before paying
691    /// for the dump, once under the write lock before committing — to keep
692    /// concurrent deletes from colliding.
693    async fn create_final_db_snapshot(
694        &self,
695        db_instance_identifier: &str,
696        snapshot_id: &str,
697        account_id: &str,
698        region: &str,
699    ) -> Result<(), AwsServiceError> {
700        let runtime = self.runtime.as_ref().ok_or_else(|| {
701            AwsServiceError::aws_error(
702                StatusCode::SERVICE_UNAVAILABLE,
703                "InvalidParameterValue",
704                "Docker/Podman is required for RDS snapshots but is not available",
705            )
706        })?;
707
708        let (instance_for_snapshot, db_name) = {
709            let accounts = self.state.read();
710            let empty = RdsState::new(account_id, region);
711            let state = accounts.get(account_id).unwrap_or(&empty);
712
713            if state.snapshots.contains_key(snapshot_id) {
714                return Err(AwsServiceError::aws_error(
715                    StatusCode::CONFLICT,
716                    "DBSnapshotAlreadyExists",
717                    format!("DBSnapshot {snapshot_id} already exists."),
718                ));
719            }
720
721            let instance = state
722                .instances
723                .get(db_instance_identifier)
724                .cloned()
725                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
726
727            let default_db = default_db_name(&instance.engine);
728            let db_name = instance
729                .db_name
730                .as_deref()
731                .unwrap_or(default_db)
732                .to_string();
733
734            (instance, db_name)
735        };
736
737        let dump_data = runtime
738            .dump_database(
739                db_instance_identifier,
740                &instance_for_snapshot.engine,
741                &instance_for_snapshot.master_username,
742                &instance_for_snapshot.master_user_password,
743                &db_name,
744            )
745            .await
746            .map_err(runtime_error_to_service_error)?;
747
748        let mut accounts = self.state.write();
749        let state = accounts.get_or_create(account_id);
750
751        if state.snapshots.contains_key(snapshot_id) {
752            return Err(AwsServiceError::aws_error(
753                StatusCode::CONFLICT,
754                "DBSnapshotAlreadyExists",
755                format!("DBSnapshot {snapshot_id} already exists."),
756            ));
757        }
758
759        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
760
761        let snapshot = DbSnapshot {
762            db_snapshot_identifier: snapshot_id.to_string(),
763            db_snapshot_arn: snapshot_arn,
764            db_instance_identifier: db_instance_identifier.to_string(),
765            snapshot_create_time: Utc::now(),
766            engine: instance_for_snapshot.engine.clone(),
767            engine_version: instance_for_snapshot.engine_version.clone(),
768            allocated_storage: instance_for_snapshot.allocated_storage,
769            status: "available".to_string(),
770            port: instance_for_snapshot.port,
771            master_username: instance_for_snapshot.master_username.clone(),
772            db_name: instance_for_snapshot.db_name.clone(),
773            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
774            snapshot_type: "manual".to_string(),
775            master_user_password: instance_for_snapshot.master_user_password.clone(),
776            tags: Vec::new(),
777            dump_data,
778        };
779
780        state.snapshots.insert(snapshot_id.to_string(), snapshot);
781        Ok(())
782    }
783
784    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
785        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
786        let db_instance_class = optional_param(request, "DBInstanceClass");
787        let deletion_protection =
788            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
789        let apply_immediately =
790            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
791
792        // Parse VPC security group IDs - only if at least one is provided
793        let vpc_security_group_ids = {
794            let mut ids = Vec::new();
795            for index in 1.. {
796                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
797                match optional_param(request, &sg_id_name) {
798                    Some(sg_id) => ids.push(sg_id),
799                    None => break,
800                }
801            }
802            if ids.is_empty() {
803                None
804            } else {
805                Some(ids)
806            }
807        };
808
809        if db_instance_class.is_none()
810            && deletion_protection.is_none()
811            && vpc_security_group_ids.is_none()
812        {
813            return Err(AwsServiceError::aws_error(
814                StatusCode::BAD_REQUEST,
815                "InvalidParameterCombination",
816                "At least one supported mutable field must be provided.",
817            ));
818        }
819        if let Some(ref class) = db_instance_class {
820            validate_db_instance_class(class)?;
821        }
822
823        let mut accounts = self.state.write();
824        let state = accounts.get_or_create(&request.account_id);
825        let instance = state
826            .instances
827            .get_mut(&db_instance_identifier)
828            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
829
830        // If ApplyImmediately is false, stage changes as pending
831        if apply_immediately == Some(false) {
832            let pending = instance
833                .pending_modified_values
834                .get_or_insert(Default::default());
835            if let Some(class) = db_instance_class {
836                pending.db_instance_class = Some(class);
837            }
838            // Note: deletion_protection and vpc_security_group_ids are applied immediately
839            // regardless of ApplyImmediately flag (per AWS behavior)
840            if let Some(deletion_protection) = deletion_protection {
841                instance.deletion_protection = deletion_protection;
842            }
843            if let Some(security_group_ids) = vpc_security_group_ids {
844                instance.vpc_security_group_ids = security_group_ids;
845            }
846        } else {
847            // Apply immediately (default behavior)
848            if let Some(class) = db_instance_class {
849                instance.db_instance_class = class;
850            }
851            if let Some(deletion_protection) = deletion_protection {
852                instance.deletion_protection = deletion_protection;
853            }
854            if let Some(security_group_ids) = vpc_security_group_ids {
855                instance.vpc_security_group_ids = security_group_ids;
856            }
857        }
858        let instance_arn = instance.db_instance_arn.clone();
859        let xml = xml_wrap(
860            "ModifyDBInstance",
861            &format!(
862                "<DBInstance>{}</DBInstance>",
863                db_instance_xml(instance, Some("modifying"))
864            ),
865            &request.request_id,
866        );
867        drop(accounts);
868
869        self.emit_event(
870            RdsSourceType::DbInstance,
871            &db_instance_identifier,
872            &instance_arn,
873            "RDS-EVENT-0014",
874            &["configuration change"],
875            "DB instance was modified",
876        );
877
878        Ok(AwsResponse::xml(StatusCode::OK, xml))
879    }
880
881    async fn reboot_db_instance(
882        &self,
883        request: &AwsRequest,
884    ) -> Result<AwsResponse, AwsServiceError> {
885        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
886        let force_failover =
887            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
888        if force_failover == Some(true) {
889            return Err(AwsServiceError::aws_error(
890                StatusCode::BAD_REQUEST,
891                "InvalidParameterCombination",
892                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
893            ));
894        }
895
896        let instance = {
897            let accounts = self.state.read();
898            let empty = RdsState::new(&request.account_id, &request.region);
899            let state = accounts.get(&request.account_id).unwrap_or(&empty);
900            state
901                .instances
902                .get(&db_instance_identifier)
903                .cloned()
904                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
905        };
906
907        let runtime = self.require_runtime()?;
908
909        let running = runtime
910            .restart_container(
911                &db_instance_identifier,
912                &instance.engine,
913                &instance.master_username,
914                &instance.master_user_password,
915                instance
916                    .db_name
917                    .as_deref()
918                    .unwrap_or(default_db_name(&instance.engine)),
919            )
920            .await
921            .map_err(runtime_error_to_service_error)?;
922
923        let instance = {
924            let mut accounts = self.state.write();
925            let state = accounts.get_or_create(&request.account_id);
926            let instance = state
927                .instances
928                .get_mut(&db_instance_identifier)
929                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
930            instance.host_port = running.host_port;
931            instance.port = i32::from(running.host_port);
932
933            // Apply any pending modifications
934            if let Some(pending) = instance.pending_modified_values.take() {
935                if let Some(class) = pending.db_instance_class {
936                    instance.db_instance_class = class;
937                }
938                if let Some(allocated_storage) = pending.allocated_storage {
939                    instance.allocated_storage = allocated_storage;
940                }
941                if let Some(backup_retention_period) = pending.backup_retention_period {
942                    instance.backup_retention_period = backup_retention_period;
943                }
944                if let Some(multi_az) = pending.multi_az {
945                    instance.multi_az = multi_az;
946                }
947                if let Some(engine_version) = pending.engine_version {
948                    instance.engine_version = engine_version;
949                }
950                if let Some(master_user_password) = pending.master_user_password {
951                    instance.master_user_password = master_user_password;
952                }
953            }
954
955            instance.clone()
956        };
957
958        self.emit_event(
959            RdsSourceType::DbInstance,
960            &db_instance_identifier,
961            &instance.db_instance_arn,
962            "RDS-EVENT-0006",
963            &["availability"],
964            "DB instance restarted",
965        );
966
967        Ok(AwsResponse::xml(
968            StatusCode::OK,
969            xml_wrap(
970                "RebootDBInstance",
971                &format!(
972                    "<DBInstance>{}</DBInstance>",
973                    db_instance_xml(&instance, Some("rebooting"))
974                ),
975                &request.request_id,
976            ),
977        ))
978    }
979
980    fn describe_db_engine_versions(
981        &self,
982        request: &AwsRequest,
983    ) -> Result<AwsResponse, AwsServiceError> {
984        let engine = optional_param(request, "Engine");
985        let engine_version = optional_param(request, "EngineVersion");
986        let family = optional_param(request, "DBParameterGroupFamily");
987        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
988
989        let mut versions = filter_engine_versions(
990            &default_engine_versions(),
991            &engine,
992            &engine_version,
993            &family,
994        );
995
996        if default_only.unwrap_or(false) {
997            versions.truncate(1);
998        }
999
1000        Ok(AwsResponse::xml(
1001            StatusCode::OK,
1002            xml_wrap(
1003                "DescribeDBEngineVersions",
1004                &format!(
1005                    "<DBEngineVersions>{}</DBEngineVersions>",
1006                    versions.iter().map(engine_version_xml).collect::<String>()
1007                ),
1008                &request.request_id,
1009            ),
1010        ))
1011    }
1012
1013    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1014        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1015        let marker = optional_param(request, "Marker");
1016        let max_records = optional_param(request, "MaxRecords");
1017
1018        let accounts = self.state.read();
1019        let empty = RdsState::new(&request.account_id, &request.region);
1020        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1021
1022        // If specific identifier requested, return just that one (no pagination)
1023        if let Some(identifier) = db_instance_identifier {
1024            let instance = state
1025                .instances
1026                .get(&identifier)
1027                .cloned()
1028                .ok_or_else(|| db_instance_not_found(&identifier))?;
1029
1030            return Ok(AwsResponse::xml(
1031                StatusCode::OK,
1032                xml_wrap(
1033                    "DescribeDBInstances",
1034                    &format!(
1035                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1036                        db_instance_xml(&instance, None)
1037                    ),
1038                    &request.request_id,
1039                ),
1040            ));
1041        }
1042
1043        // Get all instances sorted by created_at, then identifier
1044        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1045        instances.sort_by(|a, b| {
1046            a.created_at
1047                .cmp(&b.created_at)
1048                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1049        });
1050
1051        // Apply pagination
1052        let paginated = paginate(instances, marker, max_records, |inst| {
1053            &inst.db_instance_identifier
1054        })?;
1055
1056        let marker_xml = paginated
1057            .next_marker
1058            .as_ref()
1059            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1060            .unwrap_or_default();
1061
1062        Ok(AwsResponse::xml(
1063            StatusCode::OK,
1064            xml_wrap(
1065                "DescribeDBInstances",
1066                &format!(
1067                    "<DBInstances>{}</DBInstances>{}",
1068                    paginated
1069                        .items
1070                        .iter()
1071                        .map(|instance| {
1072                            format!(
1073                                "<DBInstance>{}</DBInstance>",
1074                                db_instance_xml(instance, None)
1075                            )
1076                        })
1077                        .collect::<String>(),
1078                    marker_xml
1079                ),
1080                &request.request_id,
1081            ),
1082        ))
1083    }
1084
1085    fn describe_orderable_db_instance_options(
1086        &self,
1087        request: &AwsRequest,
1088    ) -> Result<AwsResponse, AwsServiceError> {
1089        let engine = optional_param(request, "Engine");
1090        let engine_version = optional_param(request, "EngineVersion");
1091        let db_instance_class = optional_param(request, "DBInstanceClass");
1092        let license_model = optional_param(request, "LicenseModel");
1093        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1094
1095        let options = filter_orderable_options(
1096            &default_orderable_options(),
1097            &engine,
1098            &engine_version,
1099            &db_instance_class,
1100            &license_model,
1101            vpc,
1102        );
1103
1104        Ok(AwsResponse::xml(
1105            StatusCode::OK,
1106            xml_wrap(
1107                "DescribeOrderableDBInstanceOptions",
1108                &format!(
1109                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1110                    options.iter().map(orderable_option_xml).collect::<String>()
1111                ),
1112                &request.request_id,
1113            ),
1114        ))
1115    }
1116
1117    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1118        let resource_name = required_param(request, "ResourceName")?;
1119        let tags = parse_tags(request)?;
1120
1121        if tags.is_empty() {
1122            return Err(AwsServiceError::aws_error(
1123                StatusCode::BAD_REQUEST,
1124                "MissingParameter",
1125                "The request must contain the parameter Tags.",
1126            ));
1127        }
1128
1129        let mut accounts = self.state.write();
1130        let state = accounts.get_or_create(&request.account_id);
1131        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1132        merge_tags(&mut instance.tags, &tags);
1133
1134        Ok(AwsResponse::xml(
1135            StatusCode::OK,
1136            xml_wrap("AddTagsToResource", "", &request.request_id),
1137        ))
1138    }
1139
1140    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1141        let resource_name = required_param(request, "ResourceName")?;
1142        if query_param_prefix_exists(request, "Filters.") {
1143            return Err(AwsServiceError::aws_error(
1144                StatusCode::BAD_REQUEST,
1145                "InvalidParameterValue",
1146                "Filters are not yet supported for ListTagsForResource.",
1147            ));
1148        }
1149
1150        let accounts = self.state.read();
1151        let empty = RdsState::new(&request.account_id, &request.region);
1152        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1153        let instance = find_instance_by_arn(state, &resource_name)?;
1154        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1155
1156        Ok(AwsResponse::xml(
1157            StatusCode::OK,
1158            xml_wrap(
1159                "ListTagsForResource",
1160                &format!("<TagList>{tag_xml}</TagList>"),
1161                &request.request_id,
1162            ),
1163        ))
1164    }
1165
1166    fn remove_tags_from_resource(
1167        &self,
1168        request: &AwsRequest,
1169    ) -> Result<AwsResponse, AwsServiceError> {
1170        let resource_name = required_param(request, "ResourceName")?;
1171        let tag_keys = parse_tag_keys(request)?;
1172
1173        if tag_keys.is_empty() {
1174            return Err(AwsServiceError::aws_error(
1175                StatusCode::BAD_REQUEST,
1176                "MissingParameter",
1177                "The request must contain the parameter TagKeys.",
1178            ));
1179        }
1180
1181        let mut accounts = self.state.write();
1182        let state = accounts.get_or_create(&request.account_id);
1183        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1184        instance
1185            .tags
1186            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1187
1188        Ok(AwsResponse::xml(
1189            StatusCode::OK,
1190            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
1191        ))
1192    }
1193
1194    async fn create_db_snapshot(
1195        &self,
1196        request: &AwsRequest,
1197    ) -> Result<AwsResponse, AwsServiceError> {
1198        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1199        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1200
1201        let runtime = self.runtime.as_ref().ok_or_else(|| {
1202            AwsServiceError::aws_error(
1203                StatusCode::SERVICE_UNAVAILABLE,
1204                "InvalidParameterValue",
1205                "Docker/Podman is required for RDS snapshots but is not available",
1206            )
1207        })?;
1208
1209        let (instance, db_name) = {
1210            let accounts = self.state.read();
1211            let empty = RdsState::new(&request.account_id, &request.region);
1212            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1213
1214            if state.snapshots.contains_key(&db_snapshot_identifier) {
1215                return Err(AwsServiceError::aws_error(
1216                    StatusCode::CONFLICT,
1217                    "DBSnapshotAlreadyExists",
1218                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
1219                ));
1220            }
1221
1222            let instance = state
1223                .instances
1224                .get(&db_instance_identifier)
1225                .cloned()
1226                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1227
1228            let default_db = default_db_name(&instance.engine);
1229            let db_name = instance
1230                .db_name
1231                .as_deref()
1232                .unwrap_or(default_db)
1233                .to_string();
1234
1235            (instance, db_name)
1236        };
1237
1238        let dump_data = runtime
1239            .dump_database(
1240                &db_instance_identifier,
1241                &instance.engine,
1242                &instance.master_username,
1243                &instance.master_user_password,
1244                &db_name,
1245            )
1246            .await
1247            .map_err(runtime_error_to_service_error)?;
1248
1249        let mut accounts = self.state.write();
1250        let state = accounts.get_or_create(&request.account_id);
1251
1252        if state.snapshots.contains_key(&db_snapshot_identifier) {
1253            return Err(AwsServiceError::aws_error(
1254                StatusCode::CONFLICT,
1255                "DBSnapshotAlreadyExists",
1256                format!("DBSnapshot {db_snapshot_identifier} already exists."),
1257            ));
1258        }
1259
1260        let snapshot = DbSnapshot {
1261            db_snapshot_identifier: db_snapshot_identifier.clone(),
1262            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1263            db_instance_identifier: instance.db_instance_identifier.clone(),
1264            snapshot_create_time: Utc::now(),
1265            engine: instance.engine.clone(),
1266            engine_version: instance.engine_version.clone(),
1267            allocated_storage: instance.allocated_storage,
1268            status: "available".to_string(),
1269            port: instance.port,
1270            master_username: instance.master_username.clone(),
1271            db_name: instance.db_name.clone(),
1272            dbi_resource_id: instance.dbi_resource_id.clone(),
1273            snapshot_type: "manual".to_string(),
1274            master_user_password: instance.master_user_password.clone(),
1275            tags: Vec::new(),
1276            dump_data,
1277        };
1278
1279        state
1280            .snapshots
1281            .insert(db_snapshot_identifier.clone(), snapshot.clone());
1282        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1283        drop(accounts);
1284
1285        self.emit_event(
1286            RdsSourceType::DbSnapshot,
1287            &db_snapshot_identifier,
1288            &snapshot_arn,
1289            "RDS-EVENT-0042",
1290            &["creation"],
1291            "Manual snapshot created",
1292        );
1293
1294        Ok(AwsResponse::xml(
1295            StatusCode::OK,
1296            xml_wrap(
1297                "CreateDBSnapshot",
1298                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1299                &request.request_id,
1300            ),
1301        ))
1302    }
1303
1304    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1305        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1306        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1307        let marker = optional_param(request, "Marker");
1308        let max_records = optional_param(request, "MaxRecords");
1309
1310        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1311            return Err(AwsServiceError::aws_error(
1312                StatusCode::BAD_REQUEST,
1313                "InvalidParameterCombination",
1314                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1315            ));
1316        }
1317
1318        let accounts = self.state.read();
1319        let empty = RdsState::new(&request.account_id, &request.region);
1320        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1321
1322        // If specific snapshot requested, return just that one (no pagination)
1323        if let Some(snapshot_id) = db_snapshot_identifier {
1324            let snapshot = state
1325                .snapshots
1326                .get(&snapshot_id)
1327                .cloned()
1328                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1329
1330            return Ok(AwsResponse::xml(
1331                StatusCode::OK,
1332                xml_wrap(
1333                    "DescribeDBSnapshots",
1334                    &format!(
1335                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1336                        db_snapshot_xml(&snapshot)
1337                    ),
1338                    &request.request_id,
1339                ),
1340            ));
1341        }
1342
1343        // Get snapshots, filtered by instance identifier if provided
1344        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1345            state
1346                .snapshots
1347                .values()
1348                .filter(|s| s.db_instance_identifier == instance_id)
1349                .cloned()
1350                .collect()
1351        } else {
1352            state.snapshots.values().cloned().collect()
1353        };
1354
1355        // Sort by creation time, then identifier
1356        snapshots.sort_by(|a, b| {
1357            a.snapshot_create_time
1358                .cmp(&b.snapshot_create_time)
1359                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1360        });
1361
1362        // Apply pagination
1363        let paginated = paginate(snapshots, marker, max_records, |snap| {
1364            &snap.db_snapshot_identifier
1365        })?;
1366
1367        let marker_xml = paginated
1368            .next_marker
1369            .as_ref()
1370            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1371            .unwrap_or_default();
1372
1373        Ok(AwsResponse::xml(
1374            StatusCode::OK,
1375            xml_wrap(
1376                "DescribeDBSnapshots",
1377                &format!(
1378                    "<DBSnapshots>{}</DBSnapshots>{}",
1379                    paginated
1380                        .items
1381                        .iter()
1382                        .map(|snapshot| format!(
1383                            "<DBSnapshot>{}</DBSnapshot>",
1384                            db_snapshot_xml(snapshot)
1385                        ))
1386                        .collect::<String>(),
1387                    marker_xml
1388                ),
1389                &request.request_id,
1390            ),
1391        ))
1392    }
1393
1394    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1395        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1396
1397        let mut accounts = self.state.write();
1398        let state = accounts.get_or_create(&request.account_id);
1399
1400        let snapshot = state
1401            .snapshots
1402            .remove(&db_snapshot_identifier)
1403            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1404        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1405        drop(accounts);
1406
1407        self.emit_event(
1408            RdsSourceType::DbSnapshot,
1409            &db_snapshot_identifier,
1410            &snapshot_arn,
1411            "RDS-EVENT-0041",
1412            &["deletion"],
1413            "Manual snapshot deleted",
1414        );
1415
1416        Ok(AwsResponse::xml(
1417            StatusCode::OK,
1418            xml_wrap(
1419                "DeleteDBSnapshot",
1420                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1421                &request.request_id,
1422            ),
1423        ))
1424    }
1425
1426    async fn restore_db_instance_from_db_snapshot(
1427        &self,
1428        request: &AwsRequest,
1429    ) -> Result<AwsResponse, AwsServiceError> {
1430        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1431        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1432        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1433
1434        let runtime = self.require_runtime()?;
1435
1436        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1437            let mut accounts = self.state.write();
1438            let state = accounts.get_or_create(&request.account_id);
1439
1440            if !state.begin_instance_creation(&db_instance_identifier) {
1441                return Err(AwsServiceError::aws_error(
1442                    StatusCode::CONFLICT,
1443                    "DBInstanceAlreadyExists",
1444                    format!("DBInstance {db_instance_identifier} already exists."),
1445                ));
1446            }
1447
1448            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1449                Some(s) => s,
1450                None => {
1451                    state.cancel_instance_creation(&db_instance_identifier);
1452                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1453                }
1454            };
1455
1456            let dbi_resource_id = state.next_dbi_resource_id();
1457            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1458            let created_at = Utc::now();
1459
1460            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1461        };
1462
1463        let db_name = snapshot
1464            .db_name
1465            .as_deref()
1466            .unwrap_or(default_db_name(&snapshot.engine));
1467        let running = match runtime
1468            .ensure_postgres(
1469                &db_instance_identifier,
1470                &snapshot.engine,
1471                &snapshot.engine_version,
1472                &snapshot.master_username,
1473                &snapshot.master_user_password,
1474                db_name,
1475            )
1476            .await
1477        {
1478            Ok(running) => running,
1479            Err(e) => {
1480                self.state
1481                    .write()
1482                    .get_or_create(&request.account_id)
1483                    .cancel_instance_creation(&db_instance_identifier);
1484                return Err(runtime_error_to_service_error(e));
1485            }
1486        };
1487
1488        if let Err(e) = runtime
1489            .restore_database(
1490                &db_instance_identifier,
1491                &snapshot.engine,
1492                &snapshot.master_username,
1493                &snapshot.master_user_password,
1494                db_name,
1495                &snapshot.dump_data,
1496            )
1497            .await
1498        {
1499            self.state
1500                .write()
1501                .get_or_create(&request.account_id)
1502                .cancel_instance_creation(&db_instance_identifier);
1503            runtime.stop_container(&db_instance_identifier).await;
1504            return Err(runtime_error_to_service_error(e));
1505        }
1506
1507        let instance = build_restored_instance(
1508            &db_instance_identifier,
1509            db_instance_arn,
1510            dbi_resource_id,
1511            created_at,
1512            vpc_security_group_ids,
1513            &snapshot,
1514            &running,
1515        );
1516
1517        self.state
1518            .write()
1519            .get_or_create(&request.account_id)
1520            .finish_instance_creation(instance.clone());
1521
1522        self.emit_event(
1523            RdsSourceType::DbInstance,
1524            &db_instance_identifier,
1525            &instance.db_instance_arn,
1526            "RDS-EVENT-0043",
1527            &["creation"],
1528            "DB instance restored from snapshot",
1529        );
1530
1531        Ok(AwsResponse::xml(
1532            StatusCode::OK,
1533            xml_wrap(
1534                "RestoreDBInstanceFromDBSnapshot",
1535                &format!(
1536                    "<DBInstance>{}</DBInstance>",
1537                    db_instance_xml(&instance, None)
1538                ),
1539                &request.request_id,
1540            ),
1541        ))
1542    }
1543
1544    async fn create_db_instance_read_replica(
1545        &self,
1546        request: &AwsRequest,
1547    ) -> Result<AwsResponse, AwsServiceError> {
1548        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1549        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1550
1551        let runtime = self.runtime.as_ref().ok_or_else(|| {
1552            AwsServiceError::aws_error(
1553                StatusCode::SERVICE_UNAVAILABLE,
1554                "InvalidParameterValue",
1555                "Docker/Podman is required for RDS read replicas but is not available",
1556            )
1557        })?;
1558
1559        let (source_instance, db_name) = {
1560            let mut accounts = self.state.write();
1561            let state = accounts.get_or_create(&request.account_id);
1562
1563            if !state.begin_instance_creation(&db_instance_identifier) {
1564                return Err(AwsServiceError::aws_error(
1565                    StatusCode::CONFLICT,
1566                    "DBInstanceAlreadyExists",
1567                    format!("DBInstance {db_instance_identifier} already exists."),
1568                ));
1569            }
1570
1571            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1572            {
1573                Some(inst) => inst,
1574                None => {
1575                    state.cancel_instance_creation(&db_instance_identifier);
1576                    return Err(db_instance_not_found(&source_db_instance_identifier));
1577                }
1578            };
1579
1580            let default_db = default_db_name(&source_instance.engine);
1581            let db_name = source_instance
1582                .db_name
1583                .as_deref()
1584                .unwrap_or(default_db)
1585                .to_string();
1586
1587            (source_instance, db_name)
1588        };
1589
1590        let dump_data = match runtime
1591            .dump_database(
1592                &source_db_instance_identifier,
1593                &source_instance.engine,
1594                &source_instance.master_username,
1595                &source_instance.master_user_password,
1596                &db_name,
1597            )
1598            .await
1599        {
1600            Ok(data) => data,
1601            Err(e) => {
1602                self.state
1603                    .write()
1604                    .get_or_create(&request.account_id)
1605                    .cancel_instance_creation(&db_instance_identifier);
1606                return Err(runtime_error_to_service_error(e));
1607            }
1608        };
1609
1610        let (dbi_resource_id, db_instance_arn) = {
1611            let accounts = self.state.read();
1612            let empty = RdsState::new(&request.account_id, &request.region);
1613            let s = accounts.get(&request.account_id).unwrap_or(&empty);
1614            (
1615                s.next_dbi_resource_id(),
1616                s.db_instance_arn(&db_instance_identifier),
1617            )
1618        };
1619        let created_at = Utc::now();
1620
1621        let running = match runtime
1622            .ensure_postgres(
1623                &db_instance_identifier,
1624                &source_instance.engine,
1625                &source_instance.engine_version,
1626                &source_instance.master_username,
1627                &source_instance.master_user_password,
1628                &db_name,
1629            )
1630            .await
1631        {
1632            Ok(running) => running,
1633            Err(e) => {
1634                self.state
1635                    .write()
1636                    .get_or_create(&request.account_id)
1637                    .cancel_instance_creation(&db_instance_identifier);
1638                return Err(runtime_error_to_service_error(e));
1639            }
1640        };
1641
1642        if let Err(e) = runtime
1643            .restore_database(
1644                &db_instance_identifier,
1645                &source_instance.engine,
1646                &source_instance.master_username,
1647                &source_instance.master_user_password,
1648                &db_name,
1649                &dump_data,
1650            )
1651            .await
1652        {
1653            self.state
1654                .write()
1655                .get_or_create(&request.account_id)
1656                .cancel_instance_creation(&db_instance_identifier);
1657            runtime.stop_container(&db_instance_identifier).await;
1658            return Err(runtime_error_to_service_error(e));
1659        }
1660
1661        let replica = build_read_replica_instance(
1662            &db_instance_identifier,
1663            db_instance_arn,
1664            dbi_resource_id,
1665            created_at,
1666            &source_db_instance_identifier,
1667            &source_instance,
1668            &running,
1669        );
1670
1671        let source_missing = {
1672            let mut accounts = self.state.write();
1673            let state = accounts.get_or_create(&request.account_id);
1674            match state.instances.get_mut(&source_db_instance_identifier) {
1675                Some(source) => {
1676                    source
1677                        .read_replica_db_instance_identifiers
1678                        .push(db_instance_identifier.clone());
1679                    state.finish_instance_creation(replica.clone());
1680                    false
1681                }
1682                None => {
1683                    state.cancel_instance_creation(&db_instance_identifier);
1684                    true
1685                }
1686            }
1687        };
1688
1689        if source_missing {
1690            runtime.stop_container(&db_instance_identifier).await;
1691            return Err(db_instance_not_found(&source_db_instance_identifier));
1692        }
1693
1694        self.emit_event(
1695            RdsSourceType::DbInstance,
1696            &db_instance_identifier,
1697            &replica.db_instance_arn,
1698            "RDS-EVENT-0005",
1699            &["creation", "read replica"],
1700            "Read replica DB instance created",
1701        );
1702
1703        Ok(AwsResponse::xml(
1704            StatusCode::OK,
1705            xml_wrap(
1706                "CreateDBInstanceReadReplica",
1707                &format!(
1708                    "<DBInstance>{}</DBInstance>",
1709                    db_instance_xml(&replica, None)
1710                ),
1711                &request.request_id,
1712            ),
1713        ))
1714    }
1715
1716    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1717        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1718        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1719        let subnet_ids = parse_subnet_ids(request)?;
1720
1721        if subnet_ids.is_empty() {
1722            return Err(AwsServiceError::aws_error(
1723                StatusCode::BAD_REQUEST,
1724                "InvalidParameterValue",
1725                "At least one subnet must be specified.",
1726            ));
1727        }
1728
1729        if subnet_ids.len() < 2 {
1730            return Err(AwsServiceError::aws_error(
1731                StatusCode::BAD_REQUEST,
1732                "DBSubnetGroupDoesNotCoverEnoughAZs",
1733                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1734            ));
1735        }
1736
1737        let mut accounts = self.state.write();
1738        let state = accounts.get_or_create(&request.account_id);
1739
1740        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1741            return Err(AwsServiceError::aws_error(
1742                StatusCode::CONFLICT,
1743                "DBSubnetGroupAlreadyExists",
1744                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1745            ));
1746        }
1747
1748        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1749        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1750            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1751            .collect();
1752
1753        // Validate that subnets span at least 2 unique Availability Zones
1754        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1755        if unique_azs.len() < 2 {
1756            return Err(AwsServiceError::aws_error(
1757                StatusCode::BAD_REQUEST,
1758                "DBSubnetGroupDoesNotCoverEnoughAZs",
1759                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1760            ));
1761        }
1762
1763        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1764        let tags = parse_tags(request)?;
1765
1766        let subnet_group = DbSubnetGroup {
1767            db_subnet_group_name: db_subnet_group_name.clone(),
1768            db_subnet_group_arn,
1769            db_subnet_group_description,
1770            vpc_id,
1771            subnet_ids,
1772            subnet_availability_zones,
1773            tags,
1774        };
1775
1776        state
1777            .subnet_groups
1778            .insert(db_subnet_group_name, subnet_group.clone());
1779
1780        Ok(AwsResponse::xml(
1781            StatusCode::OK,
1782            xml_wrap(
1783                "CreateDBSubnetGroup",
1784                &format!(
1785                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1786                    db_subnet_group_xml(&subnet_group)
1787                ),
1788                &request.request_id,
1789            ),
1790        ))
1791    }
1792
1793    fn describe_db_subnet_groups(
1794        &self,
1795        request: &AwsRequest,
1796    ) -> Result<AwsResponse, AwsServiceError> {
1797        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1798        let marker = optional_param(request, "Marker");
1799        let max_records = optional_param(request, "MaxRecords");
1800
1801        let accounts = self.state.read();
1802        let empty = RdsState::new(&request.account_id, &request.region);
1803        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1804
1805        // If specific subnet group requested, return just that one (no pagination)
1806        if let Some(name) = db_subnet_group_name {
1807            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1808                AwsServiceError::aws_error(
1809                    StatusCode::NOT_FOUND,
1810                    "DBSubnetGroupNotFoundFault",
1811                    format!("DBSubnetGroup {} not found.", name),
1812                )
1813            })?;
1814
1815            return Ok(AwsResponse::xml(
1816                StatusCode::OK,
1817                xml_wrap(
1818                    "DescribeDBSubnetGroups",
1819                    &format!(
1820                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1821                        db_subnet_group_xml(sg)
1822                    ),
1823                    &request.request_id,
1824                ),
1825            ));
1826        }
1827
1828        // Get all subnet groups sorted by name
1829        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1830        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1831
1832        // Apply pagination
1833        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1834            &sg.db_subnet_group_name
1835        })?;
1836
1837        let marker_xml = paginated
1838            .next_marker
1839            .as_ref()
1840            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1841            .unwrap_or_default();
1842
1843        let body = paginated
1844            .items
1845            .iter()
1846            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1847            .collect::<Vec<_>>()
1848            .join("");
1849
1850        Ok(AwsResponse::xml(
1851            StatusCode::OK,
1852            xml_wrap(
1853                "DescribeDBSubnetGroups",
1854                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1855                &request.request_id,
1856            ),
1857        ))
1858    }
1859
1860    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1861        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1862
1863        let mut accounts = self.state.write();
1864        let state = accounts.get_or_create(&request.account_id);
1865
1866        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1867            return Err(AwsServiceError::aws_error(
1868                StatusCode::NOT_FOUND,
1869                "DBSubnetGroupNotFoundFault",
1870                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1871            ));
1872        }
1873
1874        Ok(AwsResponse::xml(
1875            StatusCode::OK,
1876            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1877        ))
1878    }
1879
1880    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1881        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1882        let subnet_ids = parse_subnet_ids(request)?;
1883
1884        if subnet_ids.is_empty() {
1885            return Err(AwsServiceError::aws_error(
1886                StatusCode::BAD_REQUEST,
1887                "InvalidParameterValue",
1888                "At least one subnet must be specified.",
1889            ));
1890        }
1891
1892        if subnet_ids.len() < 2 {
1893            return Err(AwsServiceError::aws_error(
1894                StatusCode::BAD_REQUEST,
1895                "DBSubnetGroupDoesNotCoverEnoughAZs",
1896                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1897            ));
1898        }
1899
1900        let mut accounts = self.state.write();
1901        let state = accounts.get_or_create(&request.account_id);
1902
1903        let region = state.region.clone();
1904
1905        let subnet_group = state
1906            .subnet_groups
1907            .get_mut(&db_subnet_group_name)
1908            .ok_or_else(|| {
1909                AwsServiceError::aws_error(
1910                    StatusCode::NOT_FOUND,
1911                    "DBSubnetGroupNotFoundFault",
1912                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1913                )
1914            })?;
1915
1916        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1917            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1918            .collect();
1919
1920        // Validate that subnets span at least 2 unique Availability Zones
1921        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1922        if unique_azs.len() < 2 {
1923            return Err(AwsServiceError::aws_error(
1924                StatusCode::BAD_REQUEST,
1925                "DBSubnetGroupDoesNotCoverEnoughAZs",
1926                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1927            ));
1928        }
1929
1930        subnet_group.subnet_ids = subnet_ids;
1931        subnet_group.subnet_availability_zones = subnet_availability_zones;
1932
1933        let subnet_group_clone = subnet_group.clone();
1934
1935        Ok(AwsResponse::xml(
1936            StatusCode::OK,
1937            xml_wrap(
1938                "ModifyDBSubnetGroup",
1939                &format!(
1940                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1941                    db_subnet_group_xml(&subnet_group_clone)
1942                ),
1943                &request.request_id,
1944            ),
1945        ))
1946    }
1947
1948    fn create_db_parameter_group(
1949        &self,
1950        request: &AwsRequest,
1951    ) -> Result<AwsResponse, AwsServiceError> {
1952        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1953        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1954        let description = required_param(request, "Description")?;
1955
1956        // Validate parameter group family against supported engines and versions
1957        let valid_families = [
1958            "postgres16",
1959            "postgres15",
1960            "postgres14",
1961            "postgres13",
1962            "mysql8.0",
1963            "mysql5.7",
1964            "mariadb10.11",
1965            "mariadb10.6",
1966        ];
1967
1968        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1969            return Err(AwsServiceError::aws_error(
1970                StatusCode::BAD_REQUEST,
1971                "InvalidParameterValue",
1972                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1973            ));
1974        }
1975
1976        let mut accounts = self.state.write();
1977        let state = accounts.get_or_create(&request.account_id);
1978
1979        if state
1980            .parameter_groups
1981            .contains_key(&db_parameter_group_name)
1982        {
1983            return Err(AwsServiceError::aws_error(
1984                StatusCode::CONFLICT,
1985                "DBParameterGroupAlreadyExists",
1986                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1987            ));
1988        }
1989
1990        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1991        let tags = parse_tags(request)?;
1992
1993        let parameter_group = DbParameterGroup {
1994            db_parameter_group_name: db_parameter_group_name.clone(),
1995            db_parameter_group_arn,
1996            db_parameter_group_family,
1997            description,
1998            parameters: std::collections::HashMap::new(),
1999            tags,
2000        };
2001
2002        state
2003            .parameter_groups
2004            .insert(db_parameter_group_name, parameter_group.clone());
2005
2006        Ok(AwsResponse::xml(
2007            StatusCode::OK,
2008            xml_wrap(
2009                "CreateDBParameterGroup",
2010                &format!(
2011                    "<DBParameterGroup>{}</DBParameterGroup>",
2012                    db_parameter_group_xml(&parameter_group)
2013                ),
2014                &request.request_id,
2015            ),
2016        ))
2017    }
2018
2019    fn describe_db_parameter_groups(
2020        &self,
2021        request: &AwsRequest,
2022    ) -> Result<AwsResponse, AwsServiceError> {
2023        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2024        let marker = optional_param(request, "Marker");
2025        let max_records = optional_param(request, "MaxRecords");
2026
2027        let accounts = self.state.read();
2028        let empty = RdsState::new(&request.account_id, &request.region);
2029        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2030
2031        // If specific parameter group requested, return just that one (no pagination)
2032        if let Some(name) = db_parameter_group_name {
2033            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2034                AwsServiceError::aws_error(
2035                    StatusCode::NOT_FOUND,
2036                    "DBParameterGroupNotFound",
2037                    format!("DBParameterGroup {} not found.", name),
2038                )
2039            })?;
2040
2041            return Ok(AwsResponse::xml(
2042                StatusCode::OK,
2043                xml_wrap(
2044                    "DescribeDBParameterGroups",
2045                    &format!(
2046                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2047                        db_parameter_group_xml(pg)
2048                    ),
2049                    &request.request_id,
2050                ),
2051            ));
2052        }
2053
2054        // Get all parameter groups sorted by name
2055        let mut parameter_groups: Vec<DbParameterGroup> =
2056            state.parameter_groups.values().cloned().collect();
2057        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2058
2059        // Apply pagination
2060        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2061            &pg.db_parameter_group_name
2062        })?;
2063
2064        let marker_xml = paginated
2065            .next_marker
2066            .as_ref()
2067            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2068            .unwrap_or_default();
2069
2070        let body = paginated
2071            .items
2072            .iter()
2073            .map(|pg| {
2074                format!(
2075                    "<DBParameterGroup>{}</DBParameterGroup>",
2076                    db_parameter_group_xml(pg)
2077                )
2078            })
2079            .collect::<Vec<_>>()
2080            .join("");
2081
2082        Ok(AwsResponse::xml(
2083            StatusCode::OK,
2084            xml_wrap(
2085                "DescribeDBParameterGroups",
2086                &format!(
2087                    "<DBParameterGroups>{}</DBParameterGroups>{}",
2088                    body, marker_xml
2089                ),
2090                &request.request_id,
2091            ),
2092        ))
2093    }
2094
2095    fn delete_db_parameter_group(
2096        &self,
2097        request: &AwsRequest,
2098    ) -> Result<AwsResponse, AwsServiceError> {
2099        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2100
2101        let mut accounts = self.state.write();
2102        let state = accounts.get_or_create(&request.account_id);
2103
2104        if db_parameter_group_name.starts_with("default.") {
2105            return Err(AwsServiceError::aws_error(
2106                StatusCode::BAD_REQUEST,
2107                "InvalidParameterValue",
2108                "Cannot delete default parameter groups.",
2109            ));
2110        }
2111
2112        if state
2113            .parameter_groups
2114            .remove(&db_parameter_group_name)
2115            .is_none()
2116        {
2117            return Err(AwsServiceError::aws_error(
2118                StatusCode::NOT_FOUND,
2119                "DBParameterGroupNotFound",
2120                format!("DBParameterGroup {db_parameter_group_name} not found."),
2121            ));
2122        }
2123
2124        Ok(AwsResponse::xml(
2125            StatusCode::OK,
2126            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
2127        ))
2128    }
2129
2130    fn modify_db_parameter_group(
2131        &self,
2132        request: &AwsRequest,
2133    ) -> Result<AwsResponse, AwsServiceError> {
2134        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2135
2136        let mut accounts = self.state.write();
2137        let state = accounts.get_or_create(&request.account_id);
2138
2139        let parameter_group = state
2140            .parameter_groups
2141            .get_mut(&db_parameter_group_name)
2142            .ok_or_else(|| {
2143                AwsServiceError::aws_error(
2144                    StatusCode::NOT_FOUND,
2145                    "DBParameterGroupNotFound",
2146                    format!("DBParameterGroup {db_parameter_group_name} not found."),
2147                )
2148            })?;
2149
2150        if let Some(new_description) = optional_param(request, "Description") {
2151            parameter_group.description = new_description;
2152        }
2153
2154        let parameter_group_clone = parameter_group.clone();
2155
2156        Ok(AwsResponse::xml(
2157            StatusCode::OK,
2158            xml_wrap(
2159                "ModifyDBParameterGroup",
2160                &format!(
2161                    "<DBParameterGroupName>{}</DBParameterGroupName>",
2162                    xml_escape(&parameter_group_clone.db_parameter_group_name)
2163                ),
2164                &request.request_id,
2165            ),
2166        ))
2167    }
2168}
2169
2170fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2171    fakecloud_core::query::optional_query_param(req, name)
2172}
2173
2174fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2175    fakecloud_core::query::required_query_param(req, name)
2176}
2177
2178fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2179    let value = required_param(req, name)?;
2180    value.parse::<i32>().map_err(|_| {
2181        AwsServiceError::aws_error(
2182            StatusCode::BAD_REQUEST,
2183            "InvalidParameterValue",
2184            format!("Parameter {name} must be a valid integer."),
2185        )
2186    })
2187}
2188
2189fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2190    optional_param(req, name)
2191        .map(|value| {
2192            value.parse::<i32>().map_err(|_| {
2193                AwsServiceError::aws_error(
2194                    StatusCode::BAD_REQUEST,
2195                    "InvalidParameterValue",
2196                    format!("Parameter {name} must be a valid integer."),
2197                )
2198            })
2199        })
2200        .transpose()
2201}
2202
2203fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2204    let mut tags = Vec::new();
2205    for index in 1.. {
2206        let key_name = format!("Tags.Tag.{index}.Key");
2207        let value_name = format!("Tags.Tag.{index}.Value");
2208        let key = optional_param(req, &key_name);
2209        let value = optional_param(req, &value_name);
2210
2211        match (key, value) {
2212            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2213            (None, None) => break,
2214            _ => {
2215                return Err(AwsServiceError::aws_error(
2216                    StatusCode::BAD_REQUEST,
2217                    "InvalidParameterValue",
2218                    "Each tag must include both Key and Value.",
2219                ));
2220            }
2221        }
2222    }
2223
2224    Ok(tags)
2225}
2226
2227fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2228    let mut keys = Vec::new();
2229    for index in 1.. {
2230        let key_name = format!("TagKeys.member.{index}");
2231        match optional_param(req, &key_name) {
2232            Some(key) => keys.push(key),
2233            None => break,
2234        }
2235    }
2236
2237    Ok(keys)
2238}
2239
2240fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2241    let mut subnet_ids = Vec::new();
2242    for index in 1.. {
2243        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2244        match optional_param(req, &subnet_id_name) {
2245            Some(subnet_id) => subnet_ids.push(subnet_id),
2246            None => break,
2247        }
2248    }
2249
2250    Ok(subnet_ids)
2251}
2252
2253fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2254    let mut security_group_ids = Vec::new();
2255    for index in 1.. {
2256        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2257        match optional_param(req, &sg_id_name) {
2258            Some(sg_id) => security_group_ids.push(sg_id),
2259            None => break,
2260        }
2261    }
2262
2263    // If no security groups provided, return a default one
2264    if security_group_ids.is_empty() {
2265        security_group_ids.push("sg-default".to_string());
2266    }
2267
2268    security_group_ids
2269}
2270
2271fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
2272    req.query_params.keys().any(|key| key.starts_with(prefix))
2273}
2274
2275fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2276    value
2277        .map(|raw| match raw {
2278            "true" | "True" | "TRUE" => Ok(true),
2279            "false" | "False" | "FALSE" => Ok(false),
2280            _ => Err(AwsServiceError::aws_error(
2281                StatusCode::BAD_REQUEST,
2282                "InvalidParameterValue",
2283                format!("Boolean parameter value '{raw}' is invalid."),
2284            )),
2285        })
2286        .transpose()
2287}
2288
2289struct PaginationResult<T> {
2290    items: Vec<T>,
2291    next_marker: Option<String>,
2292}
2293
2294fn paginate<T, F>(
2295    mut items: Vec<T>,
2296    marker: Option<String>,
2297    max_records: Option<String>,
2298    get_id: F,
2299) -> Result<PaginationResult<T>, AwsServiceError>
2300where
2301    F: Fn(&T) -> &str,
2302{
2303    // Parse max_records with default 100, max 100
2304    let max = if let Some(max_str) = max_records {
2305        let parsed = max_str.parse::<i32>().map_err(|_| {
2306            AwsServiceError::aws_error(
2307                StatusCode::BAD_REQUEST,
2308                "InvalidParameterValue",
2309                "MaxRecords must be a valid integer.",
2310            )
2311        })?;
2312        if !(1..=100).contains(&parsed) {
2313            return Err(AwsServiceError::aws_error(
2314                StatusCode::BAD_REQUEST,
2315                "InvalidParameterValue",
2316                "MaxRecords must be between 1 and 100.",
2317            ));
2318        }
2319        parsed as usize
2320    } else {
2321        100
2322    };
2323
2324    // Decode marker to get starting identifier
2325    let start_id = if let Some(encoded_marker) = marker {
2326        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2327            AwsServiceError::aws_error(
2328                StatusCode::BAD_REQUEST,
2329                "InvalidParameterValue",
2330                "Marker is invalid.",
2331            )
2332        })?;
2333        let id = String::from_utf8(decoded).map_err(|_| {
2334            AwsServiceError::aws_error(
2335                StatusCode::BAD_REQUEST,
2336                "InvalidParameterValue",
2337                "Marker is invalid.",
2338            )
2339        })?;
2340        Some(id)
2341    } else {
2342        None
2343    };
2344
2345    // Find starting position
2346    let start_index = if let Some(ref start_id) = start_id {
2347        items
2348            .iter()
2349            .position(|item| get_id(item) == start_id)
2350            .map(|pos| pos + 1) // Start after the marker
2351            .unwrap_or(items.len()) // If not found, return empty result
2352    } else {
2353        0
2354    };
2355
2356    // Take items from start_index
2357    let total_items = items.len();
2358    let end_index = std::cmp::min(start_index + max, total_items);
2359    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2360
2361    // Create next marker if there are more items
2362    let next_marker = if end_index < total_items {
2363        paginated_items
2364            .last()
2365            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2366    } else {
2367        None
2368    };
2369
2370    Ok(PaginationResult {
2371        items: paginated_items,
2372        next_marker,
2373    })
2374}
2375
2376fn validate_create_request(
2377    db_instance_identifier: &str,
2378    allocated_storage: i32,
2379    db_instance_class: &str,
2380    engine: &str,
2381    engine_version: &str,
2382    port: i32,
2383) -> Result<(), AwsServiceError> {
2384    if allocated_storage <= 0 {
2385        return Err(AwsServiceError::aws_error(
2386            StatusCode::BAD_REQUEST,
2387            "InvalidParameterValue",
2388            "AllocatedStorage must be greater than zero.",
2389        ));
2390    }
2391    if port <= 0 {
2392        return Err(AwsServiceError::aws_error(
2393            StatusCode::BAD_REQUEST,
2394            "InvalidParameterValue",
2395            "Port must be greater than zero.",
2396        ));
2397    }
2398    if !db_instance_identifier
2399        .chars()
2400        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2401    {
2402        return Err(AwsServiceError::aws_error(
2403            StatusCode::BAD_REQUEST,
2404            "InvalidParameterValue",
2405            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2406        ));
2407    }
2408    // Validate engine
2409    let supported_engines = [
2410        "postgres",
2411        "mysql",
2412        "mariadb",
2413        "oracle-ee",
2414        "oracle-se2",
2415        "oracle-ee-cdb",
2416        "oracle-se2-cdb",
2417        "sqlserver-ee",
2418        "sqlserver-se",
2419        "sqlserver-ex",
2420        "sqlserver-web",
2421        "db2-se",
2422        "db2-ae",
2423    ];
2424    if !supported_engines.contains(&engine) {
2425        return Err(AwsServiceError::aws_error(
2426            StatusCode::BAD_REQUEST,
2427            "InvalidParameterValue",
2428            format!("Engine '{}' is not supported.", engine),
2429        ));
2430    }
2431
2432    // Validate engine version. The Oracle/SQL Server/Db2 lists track
2433    // the major-minor versions actually shipped by the upstream
2434    // dev-edition images (gvenzl/oracle-free 23, mssql-server 2022,
2435    // db2_community 11.5). Adding a new version here also requires
2436    // wiring the image tag in `RdsRuntime::ensure_postgres`.
2437    let supported_versions = match engine {
2438        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2439        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2440        "mariadb" => vec!["10.11.6", "10.6.16"],
2441        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
2442            vec!["23.0.0", "21.0.0", "19.0.0"]
2443        }
2444        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
2445            vec!["16.00.4085.2.v1", "15.00.4322.2.v1"]
2446        }
2447        "db2-se" | "db2-ae" => vec!["11.5.9.0.sb00000000.r1", "11.5.8.0.sb00000000.r1"],
2448        _ => vec![],
2449    };
2450
2451    if !supported_versions.contains(&engine_version) {
2452        return Err(AwsServiceError::aws_error(
2453            StatusCode::BAD_REQUEST,
2454            "InvalidParameterValue",
2455            format!("EngineVersion '{engine_version}' is not supported yet."),
2456        ));
2457    }
2458    validate_db_instance_class(db_instance_class)?;
2459    Ok(())
2460}
2461
2462fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2463    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2464        return Err(AwsServiceError::aws_error(
2465            StatusCode::BAD_REQUEST,
2466            "InvalidParameterValue",
2467            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2468        ));
2469    }
2470    Ok(())
2471}
2472
2473fn filter_engine_versions(
2474    versions: &[EngineVersionInfo],
2475    engine: &Option<String>,
2476    engine_version: &Option<String>,
2477    family: &Option<String>,
2478) -> Vec<EngineVersionInfo> {
2479    versions
2480        .iter()
2481        .filter(|candidate| {
2482            engine
2483                .as_ref()
2484                .is_none_or(|expected| candidate.engine == *expected)
2485        })
2486        .filter(|candidate| {
2487            engine_version
2488                .as_ref()
2489                .is_none_or(|expected| candidate.engine_version == *expected)
2490        })
2491        .filter(|candidate| {
2492            family
2493                .as_ref()
2494                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2495        })
2496        .cloned()
2497        .collect()
2498}
2499
2500fn filter_orderable_options(
2501    options: &[OrderableDbInstanceOption],
2502    engine: &Option<String>,
2503    engine_version: &Option<String>,
2504    db_instance_class: &Option<String>,
2505    license_model: &Option<String>,
2506    vpc: Option<bool>,
2507) -> Vec<OrderableDbInstanceOption> {
2508    options
2509        .iter()
2510        .filter(|candidate| {
2511            engine
2512                .as_ref()
2513                .is_none_or(|expected| candidate.engine == *expected)
2514        })
2515        .filter(|candidate| {
2516            engine_version
2517                .as_ref()
2518                .is_none_or(|expected| candidate.engine_version == *expected)
2519        })
2520        .filter(|candidate| {
2521            db_instance_class
2522                .as_ref()
2523                .is_none_or(|expected| candidate.db_instance_class == *expected)
2524        })
2525        .filter(|candidate| {
2526            license_model
2527                .as_ref()
2528                .is_none_or(|expected| candidate.license_model == *expected)
2529        })
2530        .filter(|_| vpc.unwrap_or(true))
2531        .cloned()
2532        .collect()
2533}
2534
2535/// Build a `DbInstance` for a newly-created read replica, copying the
2536/// source instance's physical attributes and binding the replica's
2537/// identifier, ARN, resource id, container id and host port.
2538#[allow(clippy::too_many_arguments)]
2539/// Build a `DbInstance` from a restored snapshot. Copies the physical
2540/// attributes off the snapshot and binds the new instance's identifier,
2541/// ARN, resource id, container id and host port.
2542fn build_restored_instance(
2543    db_instance_identifier: &str,
2544    db_instance_arn: String,
2545    dbi_resource_id: String,
2546    created_at: chrono::DateTime<Utc>,
2547    vpc_security_group_ids: Vec<String>,
2548    snapshot: &DbSnapshot,
2549    running: &crate::runtime::RunningDbContainer,
2550) -> DbInstance {
2551    DbInstance {
2552        db_instance_identifier: db_instance_identifier.to_string(),
2553        db_instance_arn,
2554        db_instance_class: "db.t3.micro".to_string(),
2555        engine: snapshot.engine.clone(),
2556        engine_version: snapshot.engine_version.clone(),
2557        db_instance_status: "available".to_string(),
2558        master_username: snapshot.master_username.clone(),
2559        db_name: snapshot.db_name.clone(),
2560        endpoint_address: "127.0.0.1".to_string(),
2561        port: i32::from(running.host_port),
2562        allocated_storage: snapshot.allocated_storage,
2563        publicly_accessible: true,
2564        deletion_protection: false,
2565        created_at,
2566        dbi_resource_id,
2567        master_user_password: snapshot.master_user_password.clone(),
2568        container_id: running.container_id.clone(),
2569        host_port: running.host_port,
2570        tags: Vec::new(),
2571        read_replica_source_db_instance_identifier: None,
2572        read_replica_db_instance_identifiers: Vec::new(),
2573        vpc_security_group_ids,
2574        db_parameter_group_name: None,
2575        backup_retention_period: 1,
2576        preferred_backup_window: "03:00-04:00".to_string(),
2577        latest_restorable_time: Some(created_at),
2578        option_group_name: None,
2579        multi_az: false,
2580        pending_modified_values: None,
2581    }
2582}
2583
2584fn build_read_replica_instance(
2585    db_instance_identifier: &str,
2586    db_instance_arn: String,
2587    dbi_resource_id: String,
2588    created_at: chrono::DateTime<Utc>,
2589    source_db_instance_identifier: &str,
2590    source: &DbInstance,
2591    running: &crate::runtime::RunningDbContainer,
2592) -> DbInstance {
2593    DbInstance {
2594        db_instance_identifier: db_instance_identifier.to_string(),
2595        db_instance_arn,
2596        db_instance_class: source.db_instance_class.clone(),
2597        engine: source.engine.clone(),
2598        engine_version: source.engine_version.clone(),
2599        db_instance_status: "available".to_string(),
2600        master_username: source.master_username.clone(),
2601        db_name: source.db_name.clone(),
2602        endpoint_address: "127.0.0.1".to_string(),
2603        port: i32::from(running.host_port),
2604        allocated_storage: source.allocated_storage,
2605        publicly_accessible: source.publicly_accessible,
2606        deletion_protection: false,
2607        created_at,
2608        dbi_resource_id,
2609        master_user_password: source.master_user_password.clone(),
2610        container_id: running.container_id.clone(),
2611        host_port: running.host_port,
2612        tags: Vec::new(),
2613        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2614        read_replica_db_instance_identifiers: Vec::new(),
2615        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2616        db_parameter_group_name: source.db_parameter_group_name.clone(),
2617        backup_retention_period: source.backup_retention_period,
2618        preferred_backup_window: source.preferred_backup_window.clone(),
2619        latest_restorable_time: if source.backup_retention_period > 0 {
2620            Some(created_at)
2621        } else {
2622            None
2623        },
2624        option_group_name: source.option_group_name.clone(),
2625        multi_az: source.multi_az,
2626        pending_modified_values: None,
2627    }
2628}
2629
/// Thin wrapper around `fakecloud_core::query::query_response_xml` that
/// supplies the RDS XML namespace (`RDS_NS`) alongside `action`, `inner`
/// and `request_id`.
///
/// NOTE(review): the shape of the returned document is decided entirely by
/// that helper, which is not visible here.
fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
    fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
}
2633
2634fn engine_version_xml(version: &EngineVersionInfo) -> String {
2635    format!(
2636        "<DBEngineVersion>\
2637         <Engine>{}</Engine>\
2638         <EngineVersion>{}</EngineVersion>\
2639         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2640         <DBEngineDescription>{}</DBEngineDescription>\
2641         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2642         <Status>{}</Status>\
2643         </DBEngineVersion>",
2644        xml_escape(&version.engine),
2645        xml_escape(&version.engine_version),
2646        xml_escape(&version.db_parameter_group_family),
2647        xml_escape(&version.db_engine_description),
2648        xml_escape(&version.db_engine_version_description),
2649        xml_escape(&version.status),
2650    )
2651}
2652
2653fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2654    format!(
2655        "<OrderableDBInstanceOption>\
2656         <Engine>{}</Engine>\
2657         <EngineVersion>{}</EngineVersion>\
2658         <DBInstanceClass>{}</DBInstanceClass>\
2659         <LicenseModel>{}</LicenseModel>\
2660         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2661         <MultiAZCapable>true</MultiAZCapable>\
2662         <ReadReplicaCapable>true</ReadReplicaCapable>\
2663         <Vpc>true</Vpc>\
2664         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2665         <StorageType>{}</StorageType>\
2666         <SupportsIops>false</SupportsIops>\
2667         <MinStorageSize>{}</MinStorageSize>\
2668         <MaxStorageSize>{}</MaxStorageSize>\
2669         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2670         </OrderableDBInstanceOption>",
2671        xml_escape(&option.engine),
2672        xml_escape(&option.engine_version),
2673        xml_escape(&option.db_instance_class),
2674        xml_escape(&option.license_model),
2675        xml_escape(&option.storage_type),
2676        option.min_storage_size,
2677        option.max_storage_size,
2678    )
2679}
2680
2681fn tag_xml(tag: &RdsTag) -> String {
2682    format!(
2683        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2684        xml_escape(&tag.key),
2685        xml_escape(&tag.value),
2686    )
2687}
2688
2689fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2690    let status = status_override.unwrap_or(&instance.db_instance_status);
2691    let db_name_xml = instance
2692        .db_name
2693        .as_ref()
2694        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2695        .unwrap_or_default();
2696
2697    let read_replica_source_xml = instance
2698        .read_replica_source_db_instance_identifier
2699        .as_ref()
2700        .map(|source| {
2701            format!(
2702                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2703                xml_escape(source)
2704            )
2705        })
2706        .unwrap_or_default();
2707
2708    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2709        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2710    } else {
2711        format!(
2712            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2713            instance
2714                .read_replica_db_instance_identifiers
2715                .iter()
2716                .map(|id| format!(
2717                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2718                    xml_escape(id)
2719                ))
2720                .collect::<String>()
2721        )
2722    };
2723
2724    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2725        "<VpcSecurityGroups/>".to_string()
2726    } else {
2727        format!(
2728            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2729            instance
2730                .vpc_security_group_ids
2731                .iter()
2732                .map(|sg_id| format!(
2733                    "<VpcSecurityGroupMembership>\
2734                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2735                     <Status>active</Status>\
2736                     </VpcSecurityGroupMembership>",
2737                    xml_escape(sg_id)
2738                ))
2739                .collect::<String>()
2740        )
2741    };
2742
2743    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2744        Some(pg_name) => format!(
2745            "<DBParameterGroups>\
2746             <DBParameterGroup>\
2747             <DBParameterGroupName>{}</DBParameterGroupName>\
2748             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2749             </DBParameterGroup>\
2750             </DBParameterGroups>",
2751            xml_escape(pg_name)
2752        ),
2753        None => "<DBParameterGroups/>".to_string(),
2754    };
2755
2756    let option_group_memberships_xml = match &instance.option_group_name {
2757        Some(og_name) => format!(
2758            "<OptionGroupMemberships>\
2759             <OptionGroupMembership>\
2760             <OptionGroupName>{}</OptionGroupName>\
2761             <Status>in-sync</Status>\
2762             </OptionGroupMembership>\
2763             </OptionGroupMemberships>",
2764            xml_escape(og_name)
2765        ),
2766        None => "<OptionGroupMemberships/>".to_string(),
2767    };
2768
2769    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2770        let mut fields = Vec::new();
2771        if let Some(ref class) = pending.db_instance_class {
2772            fields.push(format!(
2773                "<DBInstanceClass>{}</DBInstanceClass>",
2774                xml_escape(class)
2775            ));
2776        }
2777        if let Some(allocated_storage) = pending.allocated_storage {
2778            fields.push(format!(
2779                "<AllocatedStorage>{}</AllocatedStorage>",
2780                allocated_storage
2781            ));
2782        }
2783        if let Some(backup_retention_period) = pending.backup_retention_period {
2784            fields.push(format!(
2785                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2786                backup_retention_period
2787            ));
2788        }
2789        if let Some(multi_az) = pending.multi_az {
2790            fields.push(format!(
2791                "<MultiAZ>{}</MultiAZ>",
2792                if multi_az { "true" } else { "false" }
2793            ));
2794        }
2795        if let Some(ref engine_version) = pending.engine_version {
2796            fields.push(format!(
2797                "<EngineVersion>{}</EngineVersion>",
2798                xml_escape(engine_version)
2799            ));
2800        }
2801        if pending.master_user_password.is_some() {
2802            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2803        }
2804        if !fields.is_empty() {
2805            format!(
2806                "<PendingModifiedValues>{}</PendingModifiedValues>",
2807                fields.join("")
2808            )
2809        } else {
2810            String::new()
2811        }
2812    } else {
2813        String::new()
2814    };
2815
2816    let latest_restorable_time_xml = instance
2817        .latest_restorable_time
2818        .map(|t| {
2819            format!(
2820                "<LatestRestorableTime>{}</LatestRestorableTime>",
2821                t.to_rfc3339()
2822            )
2823        })
2824        .unwrap_or_default();
2825
2826    format!(
2827        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2828         <DBInstanceClass>{class}</DBInstanceClass>\
2829         <Engine>{engine}</Engine>\
2830         <DBInstanceStatus>{status}</DBInstanceStatus>\
2831         <MasterUsername>{master_username}</MasterUsername>\
2832         {db_name_xml}\
2833         <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2834         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2835         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2836         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2837         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2838         <DBSecurityGroups/>\
2839         {vpc_security_groups_xml}\
2840         {db_parameter_groups_xml}\
2841         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2842         {latest_restorable_time_xml}\
2843         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2844         <MultiAZ>{multi_az}</MultiAZ>\
2845         <EngineVersion>{engine_version}</EngineVersion>\
2846         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2847         {read_replica_identifiers_xml}\
2848         {read_replica_source_xml}\
2849         <LicenseModel>{license_model}</LicenseModel>\
2850         {option_group_memberships_xml}\
2851         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2852         <StorageType>gp2</StorageType>\
2853         <DbInstancePort>{port}</DbInstancePort>\
2854         <StorageEncrypted>false</StorageEncrypted>\
2855         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2856         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2857         {pending_modified_values_xml}\
2858         <DBInstanceArn>{arn}</DBInstanceArn>",
2859        identifier = xml_escape(&instance.db_instance_identifier),
2860        class = xml_escape(&instance.db_instance_class),
2861        engine = xml_escape(&instance.engine),
2862        status = xml_escape(status),
2863        master_username = xml_escape(&instance.master_username),
2864        endpoint_address = xml_escape(&instance.endpoint_address),
2865        port = instance.port,
2866        allocated_storage = instance.allocated_storage,
2867        create_time = instance.created_at.to_rfc3339(),
2868        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2869        backup_retention_period = instance.backup_retention_period,
2870        multi_az = if instance.multi_az { "true" } else { "false" },
2871        engine_version = xml_escape(&instance.engine_version),
2872        license_model = license_model_for_engine(&instance.engine),
2873        publicly_accessible = if instance.publicly_accessible {
2874            "true"
2875        } else {
2876            "false"
2877        },
2878        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2879        deletion_protection = if instance.deletion_protection {
2880            "true"
2881        } else {
2882            "false"
2883        },
2884        arn = xml_escape(&instance.db_instance_arn),
2885    )
2886}
2887
2888fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2889    format!(
2890        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2891         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2892         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2893         <Engine>{}</Engine>\
2894         <EngineVersion>{}</EngineVersion>\
2895         <AllocatedStorage>{}</AllocatedStorage>\
2896         <Status>{}</Status>\
2897         <Port>{}</Port>\
2898         <MasterUsername>{}</MasterUsername>\
2899         {}\
2900         <DbiResourceId>{}</DbiResourceId>\
2901         <SnapshotType>{}</SnapshotType>\
2902         <DBSnapshotArn>{}</DBSnapshotArn>",
2903        xml_escape(&snapshot.db_snapshot_identifier),
2904        xml_escape(&snapshot.db_instance_identifier),
2905        snapshot.snapshot_create_time.to_rfc3339(),
2906        xml_escape(&snapshot.engine),
2907        xml_escape(&snapshot.engine_version),
2908        snapshot.allocated_storage,
2909        xml_escape(&snapshot.status),
2910        snapshot.port,
2911        xml_escape(&snapshot.master_username),
2912        snapshot
2913            .db_name
2914            .as_ref()
2915            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2916            .unwrap_or_default(),
2917        xml_escape(&snapshot.dbi_resource_id),
2918        xml_escape(&snapshot.snapshot_type),
2919        xml_escape(&snapshot.db_snapshot_arn),
2920    )
2921}
2922
2923fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2924    let subnets_xml = subnet_group
2925        .subnet_ids
2926        .iter()
2927        .zip(&subnet_group.subnet_availability_zones)
2928        .map(|(subnet_id, az)| {
2929            format!(
2930                "<Subnet>\
2931                 <SubnetIdentifier>{}</SubnetIdentifier>\
2932                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2933                 <SubnetStatus>Active</SubnetStatus>\
2934                 </Subnet>",
2935                xml_escape(subnet_id),
2936                xml_escape(az)
2937            )
2938        })
2939        .collect::<String>();
2940
2941    format!(
2942        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2943         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2944         <VpcId>{}</VpcId>\
2945         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2946         <Subnets>{}</Subnets>\
2947         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2948        xml_escape(&subnet_group.db_subnet_group_name),
2949        xml_escape(&subnet_group.db_subnet_group_description),
2950        xml_escape(&subnet_group.vpc_id),
2951        subnets_xml,
2952        xml_escape(&subnet_group.db_subnet_group_arn),
2953    )
2954}
2955
2956fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2957    format!(
2958        "<DBParameterGroupName>{}</DBParameterGroupName>\
2959         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2960         <Description>{}</Description>\
2961         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2962        xml_escape(&parameter_group.db_parameter_group_name),
2963        xml_escape(&parameter_group.db_parameter_group_family),
2964        xml_escape(&parameter_group.description),
2965        xml_escape(&parameter_group.db_parameter_group_arn),
2966    )
2967}
2968
2969fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2970    AwsServiceError::aws_error(
2971        StatusCode::NOT_FOUND,
2972        "DBInstanceNotFound",
2973        format!("DBInstance {} not found.", identifier),
2974    )
2975}
2976
2977fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2978    AwsServiceError::aws_error(
2979        StatusCode::NOT_FOUND,
2980        "DBSnapshotNotFound",
2981        format!("DBSnapshot {} not found.", identifier),
2982    )
2983}
2984
2985fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2986    AwsServiceError::aws_error(
2987        StatusCode::NOT_FOUND,
2988        "DBInstanceNotFound",
2989        format!("DBInstance {resource_name} not found."),
2990    )
2991}
2992
2993fn find_instance_by_arn<'a>(
2994    state: &'a crate::state::RdsState,
2995    resource_name: &str,
2996) -> Result<&'a DbInstance, AwsServiceError> {
2997    state
2998        .instances
2999        .values()
3000        .find(|instance| instance.db_instance_arn == resource_name)
3001        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3002}
3003
3004fn find_instance_by_arn_mut<'a>(
3005    state: &'a mut crate::state::RdsState,
3006    resource_name: &str,
3007) -> Result<&'a mut DbInstance, AwsServiceError> {
3008    state
3009        .instances
3010        .values_mut()
3011        .find(|instance| instance.db_instance_arn == resource_name)
3012        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3013}
3014
3015fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
3016    for tag in incoming {
3017        if let Some(existing_tag) = existing
3018            .iter_mut()
3019            .find(|candidate| candidate.key == tag.key)
3020        {
3021            existing_tag.value = tag.value.clone();
3022        } else {
3023            existing.push(tag.clone());
3024        }
3025    }
3026}
3027
/// Return the license model string reported for `engine`.
///
/// MySQL and MariaDB report `general-public-license`. Oracle and SQL Server
/// report `license-included` (fakecloud's choice, since the upstream
/// dev-edition images are free to use). Db2 reports
/// `bring-your-own-license`, mirroring the RDS for Db2 default. Any other
/// engine string, including `postgres`, reports `postgresql-license`.
fn license_model_for_engine(engine: &str) -> &'static str {
    const LICENSE_INCLUDED_ENGINES: [&str; 8] = [
        "oracle-ee",
        "oracle-se2",
        "oracle-ee-cdb",
        "oracle-se2-cdb",
        "sqlserver-ee",
        "sqlserver-se",
        "sqlserver-ex",
        "sqlserver-web",
    ];

    if matches!(engine, "mysql" | "mariadb") {
        "general-public-license"
    } else if LICENSE_INCLUDED_ENGINES.contains(&engine) {
        "license-included"
    } else if matches!(engine, "db2-se" | "db2-ae") {
        "bring-your-own-license"
    } else {
        "postgresql-license"
    }
}
3042
/// Return the database name used when the caller does not supply `DBName`.
///
/// * MySQL / MariaDB: `mysql`.
/// * Oracle: `ORCL`. The gvenzl image creates an `ORACLE_DATABASE` alongside
///   the built-in FREEPDB1; `ORCL` is kept to match what RDS for Oracle
///   returns when `DBName` is omitted.
/// * SQL Server: `master`. Only system databases exist unless `DBName` is
///   supplied, and `master` is one a client can connect to.
/// * Db2: `BLUDB`.
/// * Anything else, including `postgres`: `postgres`.
fn default_db_name(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        return "mysql";
    }
    if matches!(
        engine,
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb"
    ) {
        return "ORCL";
    }
    if matches!(
        engine,
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web"
    ) {
        return "master";
    }
    if matches!(engine, "db2-se" | "db2-ae") {
        return "BLUDB";
    }
    "postgres"
}
3059
/// Return the default listener port for a freshly-created instance of
/// `engine`, mirroring the AWS RDS defaults so clients that connect without
/// an explicit port reach the right listener.
///
/// Unrecognised engine strings share PostgreSQL's port, 5432.
fn default_port_for_engine(engine: &str) -> i32 {
    const POSTGRES_PORT: i32 = 5432;

    match engine {
        "mysql" | "mariadb" => 3306,
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => 1521,
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => 1433,
        "db2-se" | "db2-ae" => 50000,
        // `postgres` and every unknown engine.
        _ => POSTGRES_PORT,
    }
}
3073
/// Pick the built-in parameter group name assigned to a new instance when
/// the caller doesn't override it. The name encodes the engine family plus
/// its major version (e.g. `default.postgres16`, `default.mysql8.0`,
/// `default.oracle-ee-23`, `default.sqlserver-ex-16`,
/// `default.db2-se-11.5`).
///
/// A missing or empty version component falls back to the newest supported
/// major for that engine (`16` for PostgreSQL and SQL Server, `23` for
/// Oracle, `11.5` for Db2). MySQL maps anything that is not `5.7*` to `8.0`,
/// and MariaDB maps anything that is not `10.11*` to `10.6`. Unknown engines
/// get `default.postgres16`.
fn default_parameter_group(engine: &str, engine_version: &str) -> String {
    // `str::split` always yields at least one (possibly empty) item, so an
    // empty component, not `None`, is what signals a missing version part.
    fn component_or<'a>(part: Option<&'a str>, fallback: &'a str) -> &'a str {
        part.filter(|value| !value.is_empty()).unwrap_or(fallback)
    }

    let mut parts = engine_version.split('.');

    match engine {
        "postgres" => {
            let major = component_or(parts.next(), "16");
            format!("default.postgres{major}")
        }
        "mysql" => {
            let major = if engine_version.starts_with("5.7") {
                "5.7"
            } else {
                "8.0"
            };
            format!("default.mysql{major}")
        }
        "mariadb" => {
            let major = if engine_version.starts_with("10.11") {
                "10.11"
            } else {
                "10.6"
            };
            format!("default.mariadb{major}")
        }
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
            let major = component_or(parts.next(), "23");
            format!("default.{engine}-{major}")
        }
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
            // The SQL Server major-version number ("16" for 2022, "15" for
            // 2019) is used in the default parameter group name.
            let major = component_or(parts.next(), "16");
            format!("default.{engine}-{major}")
        }
        "db2-se" | "db2-ae" => {
            // Db2 uses major.minor as the parameter-group key
            // (e.g. `default.db2-se-11.5`).
            let major = component_or(parts.next(), "11");
            let minor = component_or(parts.next(), "5");
            format!("default.{engine}-{major}.{minor}")
        }
        _ => "default.postgres16".to_string(),
    }
}
3122
3123fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3124    match error {
3125        RuntimeError::Unavailable => AwsServiceError::aws_error(
3126            StatusCode::SERVICE_UNAVAILABLE,
3127            "InvalidParameterValue",
3128            "Docker/Podman is required for RDS DB instances but is not available",
3129        ),
3130        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3131            StatusCode::INTERNAL_SERVER_ERROR,
3132            "InternalFailure",
3133            message,
3134        ),
3135    }
3136}
3137
3138#[cfg(test)]
3139mod tests {
3140    use std::collections::HashMap;
3141    use std::sync::Arc;
3142
3143    use bytes::Bytes;
3144    use chrono::Utc;
3145    use http::{HeaderMap, Method};
3146    use parking_lot::RwLock;
3147    use uuid::Uuid;
3148
3149    use super::{
3150        db_instance_xml, default_db_name, default_parameter_group, default_port_for_engine,
3151        filter_engine_versions, filter_orderable_options, license_model_for_engine, merge_tags,
3152        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3153        RdsSourceType,
3154    };
3155    use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3156    use fakecloud_core::delivery::DeliveryBus;
3157    use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3158
3159    #[test]
3160    fn default_port_matches_aws_for_each_engine() {
3161        assert_eq!(default_port_for_engine("postgres"), 5432);
3162        assert_eq!(default_port_for_engine("mysql"), 3306);
3163        assert_eq!(default_port_for_engine("mariadb"), 3306);
3164        assert_eq!(default_port_for_engine("oracle-ee"), 1521);
3165        assert_eq!(default_port_for_engine("oracle-se2"), 1521);
3166        assert_eq!(default_port_for_engine("sqlserver-ee"), 1433);
3167        assert_eq!(default_port_for_engine("sqlserver-ex"), 1433);
3168        assert_eq!(default_port_for_engine("db2-se"), 50000);
3169        assert_eq!(default_port_for_engine("db2-ae"), 50000);
3170    }
3171
3172    #[test]
3173    fn default_parameter_group_uses_engine_major_version() {
3174        assert_eq!(
3175            default_parameter_group("postgres", "16.3"),
3176            "default.postgres16"
3177        );
3178        assert_eq!(
3179            default_parameter_group("mysql", "8.0.35"),
3180            "default.mysql8.0"
3181        );
3182        assert_eq!(
3183            default_parameter_group("oracle-ee", "23.0.0"),
3184            "default.oracle-ee-23"
3185        );
3186        assert_eq!(
3187            default_parameter_group("sqlserver-ex", "16.00.4085.2.v1"),
3188            "default.sqlserver-ex-16"
3189        );
3190        assert_eq!(
3191            default_parameter_group("db2-se", "11.5.9.0.sb00000000.r1"),
3192            "default.db2-se-11.5"
3193        );
3194    }
3195
3196    #[test]
3197    fn license_model_reflects_engine_class() {
3198        assert_eq!(license_model_for_engine("postgres"), "postgresql-license");
3199        assert_eq!(license_model_for_engine("mysql"), "general-public-license");
3200        assert_eq!(license_model_for_engine("oracle-ee"), "license-included");
3201        assert_eq!(license_model_for_engine("sqlserver-se"), "license-included");
3202        assert_eq!(license_model_for_engine("db2-ae"), "bring-your-own-license");
3203    }
3204
3205    #[test]
3206    fn default_db_name_picks_per_engine_default() {
3207        assert_eq!(default_db_name("postgres"), "postgres");
3208        assert_eq!(default_db_name("mysql"), "mysql");
3209        assert_eq!(default_db_name("oracle-ee"), "ORCL");
3210        assert_eq!(default_db_name("sqlserver-ex"), "master");
3211        assert_eq!(default_db_name("db2-se"), "BLUDB");
3212    }
3213
3214    #[test]
3215    fn validate_create_request_accepts_new_engines() {
3216        for (engine, version, port) in [
3217            ("oracle-ee", "23.0.0", 1521),
3218            ("sqlserver-ex", "16.00.4085.2.v1", 1433),
3219            ("db2-se", "11.5.9.0.sb00000000.r1", 50000),
3220        ] {
3221            validate_create_request("test-db", 20, "db.t3.micro", engine, version, port)
3222                .expect("engine should be accepted");
3223        }
3224    }
3225
3226    #[test]
3227    fn validate_create_request_rejects_unsupported_engine_version() {
3228        let err =
3229            validate_create_request("test-db", 20, "db.t3.micro", "oracle-ee", "12.0.0", 1521)
3230                .expect_err("12.x is not in the supported list");
3231        let msg = format!("{err:?}");
3232        assert!(msg.contains("EngineVersion"), "unexpected: {msg}");
3233    }
3234
3235    #[test]
3236    fn filter_engine_versions_matches_requested_engine() {
3237        let versions = default_engine_versions();
3238
3239        let filtered =
3240            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3241
3242        assert_eq!(filtered.len(), 4); // All postgres versions
3243        assert!(filtered.iter().all(|v| v.engine == "postgres"));
3244    }
3245
3246    #[test]
3247    fn filter_orderable_options_respects_instance_class() {
3248        let options = default_orderable_options();
3249
3250        let filtered = filter_orderable_options(
3251            &options,
3252            &Some("postgres".to_string()),
3253            &Some("16.3".to_string()),
3254            &Some("db.t3.micro".to_string()),
3255            &None,
3256            Some(true),
3257        );
3258
3259        assert_eq!(filtered.len(), 1);
3260        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3261    }
3262
3263    #[test]
3264    fn validate_create_request_rejects_unsupported_engine() {
3265        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3266            .expect_err("unsupported engine");
3267
3268        assert_eq!(error.code(), "InvalidParameterValue");
3269    }
3270
3271    #[test]
3272    fn optional_i32_param_rejects_invalid_integer() {
3273        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3274
3275        let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3276
3277        assert_eq!(error.code(), "InvalidParameterValue");
3278    }
3279
3280    #[test]
3281    fn db_instance_xml_renders_endpoint_and_status() {
3282        let created_at = Utc::now();
3283        let instance = DbInstance {
3284            db_instance_identifier: "test-db".to_string(),
3285            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3286            db_instance_class: "db.t3.micro".to_string(),
3287            engine: "postgres".to_string(),
3288            engine_version: "16.3".to_string(),
3289            db_instance_status: "available".to_string(),
3290            master_username: "admin".to_string(),
3291            db_name: Some("appdb".to_string()),
3292            endpoint_address: "127.0.0.1".to_string(),
3293            port: 15432,
3294            allocated_storage: 20,
3295            publicly_accessible: true,
3296            deletion_protection: false,
3297            created_at,
3298            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3299            master_user_password: "secret123".to_string(),
3300            container_id: "container".to_string(),
3301            host_port: 15432,
3302            tags: Vec::new(),
3303            read_replica_source_db_instance_identifier: None,
3304            read_replica_db_instance_identifiers: Vec::new(),
3305            vpc_security_group_ids: vec!["sg-12345678".to_string()],
3306            db_parameter_group_name: Some("default.postgres16".to_string()),
3307            backup_retention_period: 1,
3308            preferred_backup_window: "03:00-04:00".to_string(),
3309            latest_restorable_time: Some(created_at),
3310            option_group_name: None,
3311            multi_az: false,
3312            pending_modified_values: None,
3313        };
3314
3315        let xml = db_instance_xml(&instance, Some("creating"));
3316
3317        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3318        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3319        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3320    }
3321
3322    #[test]
3323    fn parse_tags_reads_rds_query_shape() {
3324        let request = request(
3325            "AddTagsToResource",
3326            &[
3327                ("Tags.Tag.1.Key", "env"),
3328                ("Tags.Tag.1.Value", "dev"),
3329                ("Tags.Tag.2.Key", "team"),
3330                ("Tags.Tag.2.Value", "core"),
3331            ],
3332        );
3333
3334        let tags = parse_tags(&request).expect("tags");
3335
3336        assert_eq!(
3337            tags,
3338            vec![
3339                RdsTag {
3340                    key: "env".to_string(),
3341                    value: "dev".to_string(),
3342                },
3343                RdsTag {
3344                    key: "team".to_string(),
3345                    value: "core".to_string(),
3346                }
3347            ]
3348        );
3349    }
3350
3351    #[test]
3352    fn parse_tag_keys_reads_member_shape() {
3353        let request = request(
3354            "RemoveTagsFromResource",
3355            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3356        );
3357
3358        let tag_keys = parse_tag_keys(&request).expect("tag keys");
3359
3360        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3361    }
3362
3363    #[test]
3364    fn merge_tags_updates_existing_values() {
3365        let mut tags = vec![RdsTag {
3366            key: "env".to_string(),
3367            value: "dev".to_string(),
3368        }];
3369
3370        merge_tags(
3371            &mut tags,
3372            &[
3373                RdsTag {
3374                    key: "env".to_string(),
3375                    value: "prod".to_string(),
3376                },
3377                RdsTag {
3378                    key: "team".to_string(),
3379                    value: "core".to_string(),
3380                },
3381            ],
3382        );
3383
3384        assert_eq!(tags.len(), 2);
3385        assert_eq!(tags[0].value, "prod");
3386        assert_eq!(tags[1].key, "team");
3387    }
3388
3389    #[tokio::test]
3390    async fn describe_engine_versions_returns_xml_body() {
3391        let service = RdsService::new(Arc::new(RwLock::new(
3392            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3393        )));
3394        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3395
3396        let response = service.handle(request).await.expect("response");
3397        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3398
3399        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3400        assert!(body.contains("<Engine>postgres</Engine>"));
3401        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3402    }
3403
3404    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3405        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3406        for (key, value) in params {
3407            query_params.insert((*key).to_string(), (*value).to_string());
3408        }
3409
3410        AwsRequest {
3411            service: "rds".to_string(),
3412            action: action.to_string(),
3413            region: "us-east-1".to_string(),
3414            account_id: "123456789012".to_string(),
3415            request_id: "test-request-id".to_string(),
3416            headers: HeaderMap::new(),
3417            query_params,
3418            body: Bytes::new(),
3419            body_stream: parking_lot::Mutex::new(None),
3420            path_segments: vec![],
3421            raw_path: "/".to_string(),
3422            raw_query: String::new(),
3423            method: Method::POST,
3424            is_query_protocol: true,
3425            access_key_id: None,
3426            principal: None,
3427        }
3428    }
3429
3430    // ── Helpers for handler tests ────────────────────────────────────
3431
3432    fn make_service() -> RdsService {
3433        RdsService::new(Arc::new(RwLock::new(
3434            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3435        )))
3436    }
3437
    /// One event as recorded by [`RecordingEb`]: owned copies of the `source`,
    /// `detail_type` and `detail` strings passed to `put_event`.
    #[derive(Default)]
    struct CapturedEvent {
        source: String,
        detail_type: String,
        detail: String,
    }
3444
    /// Test double for EventBridge delivery that keeps every event it receives
    /// in a mutex-guarded list so tests can inspect them afterwards.
    #[derive(Default)]
    struct RecordingEb {
        events: std::sync::Mutex<Vec<CapturedEvent>>,
    }
3449
3450    impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3451        fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3452            self.events.lock().unwrap().push(CapturedEvent {
3453                source: source.to_string(),
3454                detail_type: detail_type.to_string(),
3455                detail: detail.to_string(),
3456            });
3457        }
3458    }
3459
3460    fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3461        let recorder = Arc::new(RecordingEb::default());
3462        let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3463        let svc = RdsService::new(Arc::new(RwLock::new(
3464            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3465        )))
3466        .with_delivery_bus(bus);
3467        (svc, recorder)
3468    }
3469
3470    #[test]
3471    fn emit_event_emits_aws_rds_event_via_bus() {
3472        let (svc, rec) = make_service_with_recorder();
3473        svc.emit_event(
3474            RdsSourceType::DbInstance,
3475            "my-db",
3476            "arn:aws:rds:us-east-1:123456789012:db:my-db",
3477            "RDS-EVENT-0005",
3478            &["creation"],
3479            "DB instance created",
3480        );
3481        let events = rec.events.lock().unwrap();
3482        assert_eq!(events.len(), 1);
3483        let e = &events[0];
3484        assert_eq!(e.source, "aws.rds");
3485        assert_eq!(e.detail_type, "RDS DB Instance Event");
3486        let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3487        assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3488        assert_eq!(detail["SourceType"], "DB_INSTANCE");
3489        assert_eq!(detail["SourceIdentifier"], "my-db");
3490        assert_eq!(detail["Message"], "DB instance created");
3491        assert_eq!(detail["EventCategories"][0], "creation");
3492    }
3493
3494    #[test]
3495    fn emit_event_no_op_without_bus() {
3496        let svc = make_service();
3497        svc.emit_event(
3498            RdsSourceType::DbSnapshot,
3499            "snap",
3500            "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3501            "RDS-EVENT-0042",
3502            &["creation"],
3503            "Manual snapshot created",
3504        );
3505    }
3506
3507    #[test]
3508    fn rds_source_type_detail_type_mapping() {
3509        assert_eq!(
3510            RdsSourceType::DbInstance.detail_type(),
3511            "RDS DB Instance Event"
3512        );
3513        assert_eq!(
3514            RdsSourceType::DbSnapshot.detail_type(),
3515            "RDS DB Snapshot Event"
3516        );
3517        assert_eq!(
3518            RdsSourceType::DbParameterGroup.detail_type(),
3519            "RDS DB Parameter Group Event"
3520        );
3521    }
3522
3523    fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3524        String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3525    }
3526
3527    fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3528        let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3529        let mut accounts = svc.state.write();
3530        let state = accounts.default_mut();
3531        state.instances.insert(
3532            identifier.to_string(),
3533            DbInstance {
3534                db_instance_identifier: identifier.to_string(),
3535                db_instance_arn: arn.clone(),
3536                db_instance_class: "db.t3.micro".to_string(),
3537                engine: "postgres".to_string(),
3538                engine_version: "16.3".to_string(),
3539                db_instance_status: "available".to_string(),
3540                master_username: "admin".to_string(),
3541                db_name: Some("appdb".to_string()),
3542                endpoint_address: "127.0.0.1".to_string(),
3543                port: 15432,
3544                allocated_storage: 20,
3545                publicly_accessible: true,
3546                deletion_protection: false,
3547                created_at: Utc::now(),
3548                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3549                master_user_password: "secret".to_string(),
3550                container_id: "container".to_string(),
3551                host_port: 15432,
3552                tags: Vec::new(),
3553                read_replica_source_db_instance_identifier: None,
3554                read_replica_db_instance_identifiers: Vec::new(),
3555                vpc_security_group_ids: vec!["sg-12345678".to_string()],
3556                db_parameter_group_name: Some("default.postgres16".to_string()),
3557                backup_retention_period: 1,
3558                preferred_backup_window: "03:00-04:00".to_string(),
3559                latest_restorable_time: None,
3560                option_group_name: None,
3561                multi_az: false,
3562                pending_modified_values: None,
3563            },
3564        );
3565        arn
3566    }
3567
3568    fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3569        match result {
3570            Ok(_) => panic!("expected error {expected_code}, got Ok"),
3571            Err(e) => {
3572                assert_eq!(e.code(), expected_code, "wrong error code");
3573                e
3574            }
3575        }
3576    }
3577
3578    // ── Tag operations ───────────────────────────────────────────────
3579
3580    #[test]
3581    fn add_tags_requires_resource_name() {
3582        let svc = make_service();
3583        let req = request("AddTagsToResource", &[]);
3584        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3585    }
3586
3587    #[test]
3588    fn add_tags_requires_at_least_one_tag() {
3589        let svc = make_service();
3590        let arn = seed_instance(&svc, "db1");
3591        let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3592        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3593    }
3594
3595    #[test]
3596    fn add_tags_appends_then_list_tags_returns_them() {
3597        let svc = make_service();
3598        let arn = seed_instance(&svc, "db1");
3599        let add_req = request(
3600            "AddTagsToResource",
3601            &[
3602                ("ResourceName", arn.as_str()),
3603                ("Tags.Tag.1.Key", "env"),
3604                ("Tags.Tag.1.Value", "dev"),
3605            ],
3606        );
3607        svc.add_tags_to_resource(&add_req).unwrap();
3608
3609        let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3610        let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3611        assert!(body.contains("<Key>env</Key>"));
3612        assert!(body.contains("<Value>dev</Value>"));
3613    }
3614
3615    #[test]
3616    fn list_tags_rejects_filters_param() {
3617        let svc = make_service();
3618        let arn = seed_instance(&svc, "db1");
3619        let req = request(
3620            "ListTagsForResource",
3621            &[
3622                ("ResourceName", arn.as_str()),
3623                ("Filters.Filter.1.Name", "x"),
3624            ],
3625        );
3626        assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3627    }
3628
3629    #[test]
3630    fn list_tags_unknown_arn_errors() {
3631        let svc = make_service();
3632        let req = request(
3633            "ListTagsForResource",
3634            &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3635        );
3636        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3637    }
3638
3639    #[test]
3640    fn remove_tags_strips_only_listed_keys() {
3641        let svc = make_service();
3642        let arn = seed_instance(&svc, "db1");
3643        {
3644            let mut __a = svc.state.write();
3645            let state = __a.default_mut();
3646            let inst = state.instances.get_mut("db1").unwrap();
3647            inst.tags = vec![
3648                RdsTag {
3649                    key: "env".to_string(),
3650                    value: "dev".to_string(),
3651                },
3652                RdsTag {
3653                    key: "team".to_string(),
3654                    value: "core".to_string(),
3655                },
3656            ];
3657        }
3658        let req = request(
3659            "RemoveTagsFromResource",
3660            &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3661        );
3662        svc.remove_tags_from_resource(&req).unwrap();
3663
3664        let __a = svc.state.read();
3665        let state = __a.default_ref();
3666        let tags = &state.instances.get("db1").unwrap().tags;
3667        assert_eq!(tags.len(), 1);
3668        assert_eq!(tags[0].key, "team");
3669    }
3670
3671    #[test]
3672    fn remove_tags_requires_keys() {
3673        let svc = make_service();
3674        let arn = seed_instance(&svc, "db1");
3675        let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3676        assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3677    }
3678
3679    // ── DB Subnet Groups ─────────────────────────────────────────────
3680
3681    fn create_subnet_group(svc: &RdsService, name: &str) {
3682        let req = request(
3683            "CreateDBSubnetGroup",
3684            &[
3685                ("DBSubnetGroupName", name),
3686                ("DBSubnetGroupDescription", "test"),
3687                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3688                ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3689            ],
3690        );
3691        svc.create_db_subnet_group(&req).unwrap();
3692    }
3693
3694    #[test]
3695    fn create_db_subnet_group_requires_two_subnets() {
3696        let svc = make_service();
3697        let req = request(
3698            "CreateDBSubnetGroup",
3699            &[
3700                ("DBSubnetGroupName", "sg1"),
3701                ("DBSubnetGroupDescription", "t"),
3702                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3703            ],
3704        );
3705        assert_code(
3706            svc.create_db_subnet_group(&req),
3707            "DBSubnetGroupDoesNotCoverEnoughAZs",
3708        );
3709    }
3710
3711    #[test]
3712    fn create_db_subnet_group_rejects_empty_subnets() {
3713        let svc = make_service();
3714        let req = request(
3715            "CreateDBSubnetGroup",
3716            &[
3717                ("DBSubnetGroupName", "sg1"),
3718                ("DBSubnetGroupDescription", "t"),
3719            ],
3720        );
3721        assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3722    }
3723
3724    #[test]
3725    fn create_db_subnet_group_rejects_duplicates() {
3726        let svc = make_service();
3727        create_subnet_group(&svc, "sg1");
3728        let req = request(
3729            "CreateDBSubnetGroup",
3730            &[
3731                ("DBSubnetGroupName", "sg1"),
3732                ("DBSubnetGroupDescription", "t"),
3733                ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3734                ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3735            ],
3736        );
3737        assert_code(
3738            svc.create_db_subnet_group(&req),
3739            "DBSubnetGroupAlreadyExists",
3740        );
3741    }
3742
3743    #[test]
3744    fn describe_db_subnet_groups_by_name_or_list() {
3745        let svc = make_service();
3746        create_subnet_group(&svc, "sg-alpha");
3747        create_subnet_group(&svc, "sg-beta");
3748
3749        let by_name = request(
3750            "DescribeDBSubnetGroups",
3751            &[("DBSubnetGroupName", "sg-alpha")],
3752        );
3753        let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3754        assert!(body.contains("sg-alpha"));
3755        assert!(!body.contains("sg-beta"));
3756
3757        let list_all = request("DescribeDBSubnetGroups", &[]);
3758        let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3759        assert!(body.contains("sg-alpha"));
3760        assert!(body.contains("sg-beta"));
3761    }
3762
3763    #[test]
3764    fn describe_db_subnet_groups_unknown_name_errors() {
3765        let svc = make_service();
3766        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3767        assert_code(
3768            svc.describe_db_subnet_groups(&req),
3769            "DBSubnetGroupNotFoundFault",
3770        );
3771    }
3772
3773    #[test]
3774    fn delete_db_subnet_group_unknown_errors() {
3775        let svc = make_service();
3776        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3777        assert_code(
3778            svc.delete_db_subnet_group(&req),
3779            "DBSubnetGroupNotFoundFault",
3780        );
3781    }
3782
3783    #[test]
3784    fn delete_db_subnet_group_removes_entry() {
3785        let svc = make_service();
3786        create_subnet_group(&svc, "sg1");
3787        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3788        svc.delete_db_subnet_group(&req).unwrap();
3789        assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3790    }
3791
3792    #[test]
3793    fn modify_db_subnet_group_updates_subnet_ids() {
3794        let svc = make_service();
3795        create_subnet_group(&svc, "sg1");
3796        let req = request(
3797            "ModifyDBSubnetGroup",
3798            &[
3799                ("DBSubnetGroupName", "sg1"),
3800                ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3801                ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3802            ],
3803        );
3804        svc.modify_db_subnet_group(&req).unwrap();
3805
3806        let __a = svc.state.read();
3807        let state = __a.default_ref();
3808        let sg = state.subnet_groups.get("sg1").unwrap();
3809        assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3810    }
3811
3812    // ── DB Parameter Groups ──────────────────────────────────────────
3813
3814    fn create_param_group(svc: &RdsService, name: &str) {
3815        let req = request(
3816            "CreateDBParameterGroup",
3817            &[
3818                ("DBParameterGroupName", name),
3819                ("DBParameterGroupFamily", "postgres16"),
3820                ("Description", "test"),
3821            ],
3822        );
3823        svc.create_db_parameter_group(&req).unwrap();
3824    }
3825
3826    #[test]
3827    fn create_db_parameter_group_rejects_unknown_family() {
3828        let svc = make_service();
3829        let req = request(
3830            "CreateDBParameterGroup",
3831            &[
3832                ("DBParameterGroupName", "pg1"),
3833                ("DBParameterGroupFamily", "oracle19"),
3834                ("Description", "t"),
3835            ],
3836        );
3837        assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3838    }
3839
3840    #[test]
3841    fn create_db_parameter_group_rejects_duplicates() {
3842        let svc = make_service();
3843        create_param_group(&svc, "pg1");
3844        let req = request(
3845            "CreateDBParameterGroup",
3846            &[
3847                ("DBParameterGroupName", "pg1"),
3848                ("DBParameterGroupFamily", "postgres16"),
3849                ("Description", "t"),
3850            ],
3851        );
3852        assert_code(
3853            svc.create_db_parameter_group(&req),
3854            "DBParameterGroupAlreadyExists",
3855        );
3856    }
3857
3858    #[test]
3859    fn describe_db_parameter_groups_by_name_or_list() {
3860        let svc = make_service();
3861        create_param_group(&svc, "pg-alpha");
3862        create_param_group(&svc, "pg-beta");
3863        let by_name = request(
3864            "DescribeDBParameterGroups",
3865            &[("DBParameterGroupName", "pg-alpha")],
3866        );
3867        let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3868        assert!(body.contains("pg-alpha"));
3869        assert!(!body.contains("pg-beta"));
3870        let list = request("DescribeDBParameterGroups", &[]);
3871        let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3872        assert!(body.contains("pg-alpha"));
3873        assert!(body.contains("pg-beta"));
3874    }
3875
3876    #[test]
3877    fn describe_db_parameter_groups_unknown_name_errors() {
3878        let svc = make_service();
3879        let req = request(
3880            "DescribeDBParameterGroups",
3881            &[("DBParameterGroupName", "ghost")],
3882        );
3883        assert_code(
3884            svc.describe_db_parameter_groups(&req),
3885            "DBParameterGroupNotFound",
3886        );
3887    }
3888
3889    #[test]
3890    fn delete_db_parameter_group_rejects_default_groups() {
3891        let svc = make_service();
3892        let req = request(
3893            "DeleteDBParameterGroup",
3894            &[("DBParameterGroupName", "default.postgres16")],
3895        );
3896        assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
3897    }
3898
3899    #[test]
3900    fn delete_db_parameter_group_unknown_errors() {
3901        let svc = make_service();
3902        let req = request(
3903            "DeleteDBParameterGroup",
3904            &[("DBParameterGroupName", "ghost")],
3905        );
3906        assert_code(
3907            svc.delete_db_parameter_group(&req),
3908            "DBParameterGroupNotFound",
3909        );
3910    }
3911
3912    #[test]
3913    fn delete_db_parameter_group_removes_entry() {
3914        let svc = make_service();
3915        create_param_group(&svc, "pg1");
3916        let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
3917        svc.delete_db_parameter_group(&req).unwrap();
3918        assert!(!svc
3919            .state
3920            .read()
3921            .default_ref()
3922            .parameter_groups
3923            .contains_key("pg1"));
3924    }
3925
3926    #[test]
3927    fn modify_db_parameter_group_updates_description() {
3928        let svc = make_service();
3929        create_param_group(&svc, "pg1");
3930        let req = request(
3931            "ModifyDBParameterGroup",
3932            &[
3933                ("DBParameterGroupName", "pg1"),
3934                ("Description", "shiny new"),
3935            ],
3936        );
3937        svc.modify_db_parameter_group(&req).unwrap();
3938        let __a = svc.state.read();
3939        let state = __a.default_ref();
3940        assert_eq!(
3941            state.parameter_groups.get("pg1").unwrap().description,
3942            "shiny new"
3943        );
3944    }
3945
3946    #[test]
3947    fn modify_db_parameter_group_unknown_errors() {
3948        let svc = make_service();
3949        let req = request(
3950            "ModifyDBParameterGroup",
3951            &[("DBParameterGroupName", "ghost"), ("Description", "x")],
3952        );
3953        assert_code(
3954            svc.modify_db_parameter_group(&req),
3955            "DBParameterGroupNotFound",
3956        );
3957    }
3958
3959    // ── DescribeDBInstances ──────────────────────────────────────────
3960
3961    #[test]
3962    fn describe_db_instances_by_id_returns_only_one() {
3963        let svc = make_service();
3964        seed_instance(&svc, "db1");
3965        seed_instance(&svc, "db2");
3966        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
3967        let body = body_of(svc.describe_db_instances(&req).unwrap());
3968        assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
3969        assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
3970    }
3971
3972    #[test]
3973    fn describe_db_instances_unknown_id_errors() {
3974        let svc = make_service();
3975        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
3976        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
3977    }
3978
3979    #[test]
3980    fn describe_db_instances_lists_all_when_unbounded() {
3981        let svc = make_service();
3982        seed_instance(&svc, "db1");
3983        seed_instance(&svc, "db2");
3984        seed_instance(&svc, "db3");
3985        let req = request("DescribeDBInstances", &[]);
3986        let body = body_of(svc.describe_db_instances(&req).unwrap());
3987        for id in ["db1", "db2", "db3"] {
3988            assert!(body.contains(&format!(
3989                "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
3990            )));
3991        }
3992    }
3993
3994    // ── ModifyDBInstance ─────────────────────────────────────────────
3995
3996    #[test]
3997    fn modify_db_instance_requires_at_least_one_change() {
3998        let svc = make_service();
3999        seed_instance(&svc, "db1");
4000        let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
4001        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4002    }
4003
4004    #[test]
4005    fn modify_db_instance_unknown_errors() {
4006        let svc = make_service();
4007        let req = request(
4008            "ModifyDBInstance",
4009            &[
4010                ("DBInstanceIdentifier", "ghost"),
4011                ("DBInstanceClass", "db.t3.small"),
4012            ],
4013        );
4014        assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
4015    }
4016
4017    #[test]
4018    fn modify_db_instance_apply_immediately_updates_class() {
4019        let svc = make_service();
4020        seed_instance(&svc, "db1");
4021        let req = request(
4022            "ModifyDBInstance",
4023            &[
4024                ("DBInstanceIdentifier", "db1"),
4025                ("DBInstanceClass", "db.t3.small"),
4026                ("ApplyImmediately", "true"),
4027            ],
4028        );
4029        svc.modify_db_instance(&req).unwrap();
4030        let __a = svc.state.read();
4031        let state = __a.default_ref();
4032        assert_eq!(
4033            state.instances.get("db1").unwrap().db_instance_class,
4034            "db.t3.small"
4035        );
4036    }
4037
4038    #[test]
4039    fn modify_db_instance_pending_when_not_apply_immediately() {
4040        let svc = make_service();
4041        seed_instance(&svc, "db1");
4042        let req = request(
4043            "ModifyDBInstance",
4044            &[
4045                ("DBInstanceIdentifier", "db1"),
4046                ("DBInstanceClass", "db.t3.small"),
4047                ("ApplyImmediately", "false"),
4048            ],
4049        );
4050        svc.modify_db_instance(&req).unwrap();
4051        let __a = svc.state.read();
4052        let state = __a.default_ref();
4053        let inst = state.instances.get("db1").unwrap();
4054        assert_eq!(inst.db_instance_class, "db.t3.micro");
4055        assert_eq!(
4056            inst.pending_modified_values
4057                .as_ref()
4058                .unwrap()
4059                .db_instance_class
4060                .as_deref(),
4061            Some("db.t3.small"),
4062        );
4063    }
4064
4065    // ── Snapshots (sync ops only) ────────────────────────────────────
4066
4067    fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
4068        let mut __a = svc.state.write();
4069        let state = __a.default_mut();
4070        let arn = state.db_snapshot_arn(snapshot_id);
4071        state.snapshots.insert(
4072            snapshot_id.to_string(),
4073            crate::state::DbSnapshot {
4074                db_snapshot_identifier: snapshot_id.to_string(),
4075                db_snapshot_arn: arn,
4076                db_instance_identifier: instance_id.to_string(),
4077                snapshot_create_time: Utc::now(),
4078                engine: "postgres".to_string(),
4079                engine_version: "16.3".to_string(),
4080                allocated_storage: 20,
4081                status: "available".to_string(),
4082                port: 5432,
4083                master_username: "admin".to_string(),
4084                db_name: Some("appdb".to_string()),
4085                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
4086                snapshot_type: "manual".to_string(),
4087                master_user_password: "secret".to_string(),
4088                tags: Vec::new(),
4089                dump_data: Vec::new(),
4090            },
4091        );
4092    }
4093
4094    #[test]
4095    fn delete_db_snapshot_removes_entry() {
4096        let svc = make_service();
4097        seed_snapshot(&svc, "snap1", "db1");
4098        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
4099        svc.delete_db_snapshot(&req).unwrap();
4100        assert!(svc.state.read().default_ref().snapshots.is_empty());
4101    }
4102
4103    #[test]
4104    fn delete_db_snapshot_unknown_errors() {
4105        let svc = make_service();
4106        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
4107        assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
4108    }
4109
4110    #[test]
4111    fn describe_db_snapshots_rejects_both_filters() {
4112        let svc = make_service();
4113        let req = request(
4114            "DescribeDBSnapshots",
4115            &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
4116        );
4117        assert_code(
4118            svc.describe_db_snapshots(&req),
4119            "InvalidParameterCombination",
4120        );
4121    }
4122
4123    #[test]
4124    fn describe_db_snapshots_by_id_or_instance() {
4125        let svc = make_service();
4126        seed_snapshot(&svc, "snap1", "db1");
4127        seed_snapshot(&svc, "snap2", "db2");
4128
4129        let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
4130        let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
4131        assert!(body.contains("snap1"));
4132        assert!(!body.contains("snap2"));
4133
4134        let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
4135        let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
4136        assert!(body.contains("snap2"));
4137        assert!(!body.contains("snap1"));
4138
4139        let list_all = request("DescribeDBSnapshots", &[]);
4140        let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
4141        assert!(body.contains("snap1"));
4142        assert!(body.contains("snap2"));
4143    }
4144
4145    #[test]
4146    fn describe_db_snapshots_unknown_id_errors() {
4147        let svc = make_service();
4148        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4149        assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4150    }
4151
4152    // ── Error branch tests ──
4153
4154    #[test]
4155    fn describe_db_instances_not_found() {
4156        let svc = make_service();
4157        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4158        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4159    }
4160
4161    #[tokio::test]
4162    async fn delete_db_instance_not_found() {
4163        let svc = make_service();
4164        let req = request(
4165            "DeleteDBInstance",
4166            &[
4167                ("DBInstanceIdentifier", "ghost"),
4168                ("SkipFinalSnapshot", "true"),
4169            ],
4170        );
4171        assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4172    }
4173
4174    #[test]
4175    fn modify_db_instance_not_found() {
4176        let svc = make_service();
4177        let req = request(
4178            "ModifyDBInstance",
4179            &[
4180                ("DBInstanceIdentifier", "ghost"),
4181                ("AllocatedStorage", "20"),
4182            ],
4183        );
4184        // Validation fires before existence check
4185        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4186    }
4187
4188    #[tokio::test]
4189    async fn reboot_db_instance_not_found() {
4190        let svc = make_service();
4191        let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4192        assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4193    }
4194
4195    #[tokio::test]
4196    async fn create_db_snapshot_instance_not_found() {
4197        let svc = make_service();
4198        let req = request(
4199            "CreateDBSnapshot",
4200            &[
4201                ("DBInstanceIdentifier", "ghost"),
4202                ("DBSnapshotIdentifier", "snap1"),
4203            ],
4204        );
4205        assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4206    }
4207
4208    #[tokio::test]
4209    async fn restore_db_instance_snapshot_not_found() {
4210        let svc = make_service();
4211        let req = request(
4212            "RestoreDBInstanceFromDBSnapshot",
4213            &[
4214                ("DBInstanceIdentifier", "restored"),
4215                ("DBSnapshotIdentifier", "ghost-snap"),
4216            ],
4217        );
4218        assert_code(
4219            svc.restore_db_instance_from_db_snapshot(&req).await,
4220            "InvalidParameterValue",
4221        );
4222    }
4223
4224    #[tokio::test]
4225    async fn create_db_instance_read_replica_source_not_found() {
4226        let svc = make_service();
4227        let req = request(
4228            "CreateDBInstanceReadReplica",
4229            &[
4230                ("DBInstanceIdentifier", "replica"),
4231                ("SourceDBInstanceIdentifier", "ghost"),
4232            ],
4233        );
4234        assert_code(
4235            svc.create_db_instance_read_replica(&req).await,
4236            "InvalidParameterValue",
4237        );
4238    }
4239
4240    #[test]
4241    fn describe_db_engine_versions_basic() {
4242        let svc = make_service();
4243        let req = request("DescribeDBEngineVersions", &[]);
4244        let resp = svc.describe_db_engine_versions(&req).unwrap();
4245        let body = body_of(resp);
4246        assert!(body.contains("<DBEngineVersions>"));
4247    }
4248
4249    #[test]
4250    fn describe_orderable_db_instance_options_basic() {
4251        let svc = make_service();
4252        let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4253        let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4254        let body = body_of(resp);
4255        assert!(body.contains("<OrderableDBInstanceOptions>"));
4256    }
4257
4258    #[test]
4259    fn describe_db_parameter_group_not_found() {
4260        let svc = make_service();
4261        let req = request(
4262            "DescribeDBParameterGroups",
4263            &[("DBParameterGroupName", "ghost")],
4264        );
4265        assert_code(
4266            svc.describe_db_parameter_groups(&req),
4267            "DBParameterGroupNotFound",
4268        );
4269    }
4270
4271    #[test]
4272    fn delete_db_parameter_group_not_found() {
4273        let svc = make_service();
4274        let req = request(
4275            "DeleteDBParameterGroup",
4276            &[("DBParameterGroupName", "ghost")],
4277        );
4278        assert_code(
4279            svc.delete_db_parameter_group(&req),
4280            "DBParameterGroupNotFound",
4281        );
4282    }
4283
4284    #[test]
4285    fn describe_db_subnet_group_not_found() {
4286        let svc = make_service();
4287        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4288        assert_code(
4289            svc.describe_db_subnet_groups(&req),
4290            "DBSubnetGroupNotFoundFault",
4291        );
4292    }
4293
4294    #[test]
4295    fn delete_db_subnet_group_not_found() {
4296        let svc = make_service();
4297        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4298        assert_code(
4299            svc.delete_db_subnet_group(&req),
4300            "DBSubnetGroupNotFoundFault",
4301        );
4302    }
4303
4304    #[test]
4305    fn add_tags_resource_not_found() {
4306        let svc = make_service();
4307        let req = request(
4308            "AddTagsToResource",
4309            &[
4310                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4311                ("Tags.member.1.Key", "k"),
4312                ("Tags.member.1.Value", "v"),
4313            ],
4314        );
4315        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4316    }
4317
4318    #[test]
4319    fn list_tags_resource_not_found() {
4320        let svc = make_service();
4321        let req = request(
4322            "ListTagsForResource",
4323            &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4324        );
4325        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4326    }
4327
4328    // ── snapshot operations ──
4329
4330    #[tokio::test]
4331    async fn create_db_snapshot_missing_id_errors() {
4332        let svc = make_service();
4333        let req = request(
4334            "CreateDBSnapshot",
4335            &[("DBInstanceIdentifier", "nonexistent")],
4336        );
4337        assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4338    }
4339
4340    #[tokio::test]
4341    async fn create_db_snapshot_unknown_instance_errors() {
4342        let svc = make_service();
4343        let req = request(
4344            "CreateDBSnapshot",
4345            &[
4346                ("DBSnapshotIdentifier", "snap1"),
4347                ("DBInstanceIdentifier", "ghost"),
4348            ],
4349        );
4350        assert!(svc.create_db_snapshot(&req).await.is_err());
4351    }
4352
4353    // ── delete_db_instance ──
4354
4355    #[tokio::test]
4356    async fn delete_db_instance_missing_id_errors() {
4357        let svc = make_service();
4358        let req = request("DeleteDBInstance", &[]);
4359        assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4360    }
4361
4362    // ── reboot_db_instance ──
4363
4364    #[tokio::test]
4365    async fn reboot_db_instance_missing_id_errors() {
4366        let svc = make_service();
4367        let req = request("RebootDBInstance", &[]);
4368        assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4369    }
4370
4371    // ── create_db_instance validation ──
4372
4373    #[tokio::test]
4374    async fn create_db_instance_missing_id_errors() {
4375        let svc = make_service();
4376        let req = request(
4377            "CreateDBInstance",
4378            &[
4379                ("Engine", "postgres"),
4380                ("DBInstanceClass", "db.t3.micro"),
4381                ("AllocatedStorage", "20"),
4382                ("MasterUsername", "admin"),
4383                ("MasterUserPassword", "secretpass"),
4384            ],
4385        );
4386        assert!(svc.create_db_instance(&req).await.is_err());
4387    }
4388
4389    #[tokio::test]
4390    async fn create_db_instance_unsupported_engine_errors() {
4391        let svc = make_service();
4392        let req = request(
4393            "CreateDBInstance",
4394            &[
4395                ("DBInstanceIdentifier", "db1"),
4396                ("Engine", "mongodb"),
4397                ("DBInstanceClass", "db.t3.micro"),
4398                ("AllocatedStorage", "20"),
4399                ("MasterUsername", "admin"),
4400                ("MasterUserPassword", "secretpass"),
4401            ],
4402        );
4403        assert!(svc.create_db_instance(&req).await.is_err());
4404    }
4405
4406    // ── restore_db_instance_from_db_snapshot ──
4407
4408    #[tokio::test]
4409    async fn restore_db_instance_missing_ids_errors() {
4410        let svc = make_service();
4411        let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4412        assert!(svc
4413            .restore_db_instance_from_db_snapshot(&req)
4414            .await
4415            .is_err());
4416    }
4417
4418    #[tokio::test]
4419    async fn restore_db_instance_unknown_snapshot_errors() {
4420        let svc = make_service();
4421        let req = request(
4422            "RestoreDBInstanceFromDBSnapshot",
4423            &[
4424                ("DBInstanceIdentifier", "restored"),
4425                ("DBSnapshotIdentifier", "missing"),
4426            ],
4427        );
4428        assert!(svc
4429            .restore_db_instance_from_db_snapshot(&req)
4430            .await
4431            .is_err());
4432    }
4433
4434    // ── create_db_instance_read_replica ──
4435
4436    #[tokio::test]
4437    async fn create_read_replica_missing_source_errors() {
4438        let svc = make_service();
4439        let req = request(
4440            "CreateDBInstanceReadReplica",
4441            &[("DBInstanceIdentifier", "replica1")],
4442        );
4443        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4444    }
4445
4446    #[tokio::test]
4447    async fn create_read_replica_unknown_source_errors() {
4448        let svc = make_service();
4449        let req = request(
4450            "CreateDBInstanceReadReplica",
4451            &[
4452                ("DBInstanceIdentifier", "replica1"),
4453                ("SourceDBInstanceIdentifier", "ghost"),
4454            ],
4455        );
4456        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4457    }
4458
4459    // ── describe_db_snapshots with filters ──
4460
4461    #[test]
4462    fn describe_db_snapshots_by_snapshot_id_only() {
4463        let svc = make_service();
4464        seed_snapshot(&svc, "s1", "inst1");
4465        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4466        let resp = svc.describe_db_snapshots(&req).unwrap();
4467        let b = body_of(resp);
4468        assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4469    }
4470
4471    #[test]
4472    fn describe_db_snapshots_by_instance_id_returns_matching() {
4473        let svc = make_service();
4474        seed_snapshot(&svc, "s1", "inst1");
4475        seed_snapshot(&svc, "s2", "inst2");
4476        let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4477        let resp = svc.describe_db_snapshots(&req).unwrap();
4478        let b = body_of(resp);
4479        assert!(b.contains("s1"));
4480        assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4481    }
4482
4483    // ── modify_db_parameter_group ──
4484
4485    #[test]
4486    fn modify_db_parameter_group_missing_name() {
4487        let svc = make_service();
4488        let req = request("ModifyDBParameterGroup", &[]);
4489        assert!(svc.modify_db_parameter_group(&req).is_err());
4490    }
4491
4492    // ── modify_db_subnet_group ──
4493
4494    #[test]
4495    fn modify_db_subnet_group_unknown_errors() {
4496        let svc = make_service();
4497        let req = request(
4498            "ModifyDBSubnetGroup",
4499            &[
4500                ("DBSubnetGroupName", "ghost"),
4501                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4502                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4503            ],
4504        );
4505        assert!(svc.modify_db_subnet_group(&req).is_err());
4506    }
4507
4508    // ── describe_db_instances ──
4509
4510    #[test]
4511    fn describe_db_instances_empty_returns_xml() {
4512        let svc = make_service();
4513        let req = request("DescribeDBInstances", &[]);
4514        let resp = svc.describe_db_instances(&req).unwrap();
4515        let b = body_of(resp);
4516        assert!(b.contains("DescribeDBInstancesResult"));
4517    }
4518
4519    #[test]
4520    fn describe_db_snapshots_empty_returns_empty_list() {
4521        let svc = make_service();
4522        let req = request("DescribeDBSnapshots", &[]);
4523        let resp = svc.describe_db_snapshots(&req).unwrap();
4524        let b = body_of(resp);
4525        assert!(b.contains("DescribeDBSnapshotsResult"));
4526    }
4527
4528    #[test]
4529    fn add_tags_unknown_resource_errors() {
4530        let svc = make_service();
4531        let req = request(
4532            "AddTagsToResource",
4533            &[
4534                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4535                ("Tags.member.1.Key", "k"),
4536                ("Tags.member.1.Value", "v"),
4537            ],
4538        );
4539        assert!(svc.add_tags_to_resource(&req).is_err());
4540    }
4541
4542    #[test]
4543    fn remove_tags_unknown_resource_errors() {
4544        let svc = make_service();
4545        let req = request(
4546            "RemoveTagsFromResource",
4547            &[
4548                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4549                ("TagKeys.member.1", "k"),
4550            ],
4551        );
4552        assert!(svc.remove_tags_from_resource(&req).is_err());
4553    }
4554
4555    #[test]
4556    fn create_db_parameter_group_missing_name_errors() {
4557        let svc = make_service();
4558        let req = request(
4559            "CreateDBParameterGroup",
4560            &[
4561                ("DBParameterGroupFamily", "postgres16"),
4562                ("Description", "d"),
4563            ],
4564        );
4565        assert!(svc.create_db_parameter_group(&req).is_err());
4566    }
4567
4568    #[test]
4569    fn create_db_subnet_group_missing_desc_errors() {
4570        let svc = make_service();
4571        let req = request(
4572            "CreateDBSubnetGroup",
4573            &[
4574                ("DBSubnetGroupName", "sg1"),
4575                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4576                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4577            ],
4578        );
4579        assert!(svc.create_db_subnet_group(&req).is_err());
4580    }
4581
4582    #[tokio::test]
4583    async fn create_db_instance_missing_class_errors() {
4584        let svc = make_service();
4585        let req = request(
4586            "CreateDBInstance",
4587            &[
4588                ("DBInstanceIdentifier", "miss-class"),
4589                ("Engine", "postgres"),
4590                ("AllocatedStorage", "20"),
4591                ("MasterUsername", "admin"),
4592                ("MasterUserPassword", "secretpass"),
4593            ],
4594        );
4595        assert!(svc.create_db_instance(&req).await.is_err());
4596    }
4597
4598    #[tokio::test]
4599    async fn create_db_instance_missing_master_username_errors() {
4600        let svc = make_service();
4601        let req = request(
4602            "CreateDBInstance",
4603            &[
4604                ("DBInstanceIdentifier", "miss-mu"),
4605                ("Engine", "postgres"),
4606                ("DBInstanceClass", "db.t3.micro"),
4607                ("AllocatedStorage", "20"),
4608                ("MasterUserPassword", "secretpass"),
4609            ],
4610        );
4611        assert!(svc.create_db_instance(&req).await.is_err());
4612    }
4613
4614    #[test]
4615    fn modify_db_instance_missing_id_errors() {
4616        let svc = make_service();
4617        let req = request("ModifyDBInstance", &[]);
4618        assert!(svc.modify_db_instance(&req).is_err());
4619    }
4620
4621    #[test]
4622    fn modify_db_parameter_group_unknown_pg_errors() {
4623        let svc = make_service();
4624        let req = request(
4625            "ModifyDBParameterGroup",
4626            &[
4627                ("DBParameterGroupName", "ghost"),
4628                ("Parameters.member.1.ParameterName", "p"),
4629                ("Parameters.member.1.ParameterValue", "v"),
4630                ("Parameters.member.1.ApplyMethod", "immediate"),
4631            ],
4632        );
4633        assert!(svc.modify_db_parameter_group(&req).is_err());
4634    }
4635
4636    #[test]
4637    fn describe_db_parameter_groups_unknown_errors() {
4638        let svc = make_service();
4639        let req = request(
4640            "DescribeDBParameterGroups",
4641            &[("DBParameterGroupName", "ghost")],
4642        );
4643        assert!(svc.describe_db_parameter_groups(&req).is_err());
4644    }
4645
4646    #[test]
4647    fn describe_db_subnet_groups_unknown_errors() {
4648        let svc = make_service();
4649        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4650        assert!(svc.describe_db_subnet_groups(&req).is_err());
4651    }
4652}