Skip to main content

fakecloud_rds/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use base64::engine::general_purpose::STANDARD as BASE64;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use tokio::sync::Mutex as AsyncMutex;
9
10use fakecloud_aws::xml::xml_escape;
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::SnapshotStore;
14
15use crate::runtime::{RdsRuntime, RuntimeError};
16use crate::state::{
17    default_engine_versions, default_orderable_options, DbInstance, DbParameterGroup, DbSnapshot,
18    DbSubnetGroup, EngineVersionInfo, OrderableDbInstanceOption, RdsSnapshot, RdsState, RdsTag,
19    SharedRdsState, RDS_SNAPSHOT_SCHEMA_VERSION,
20};
21
/// XML namespace URI of the RDS API, version `2014-10-31`.
// NOTE(review): presumably stamped onto XML responses by a helper outside this
// view (e.g. `xml_wrap`) — confirm.
const RDS_NS: &str = "http://rds.amazonaws.com/doc/2014-10-31/";
23
/// Reports whether `action` names an RDS operation that is treated as
/// state-changing.
///
/// Explicitly dispatched operations are matched by exact name first. Every
/// other action is classified heuristically by its leading verb
/// (`Create…`, `Delete…`, `Start…`, …). Verbs such as `Describe`, `List`
/// and `Download` appear in neither table and therefore yield `false`.
/// Matching is case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    // Operations listed by exact name.
    const EXPLICIT_MUTATIONS: &[&str] = &[
        "AddTagsToResource",
        "CreateDBInstance",
        "CreateDBInstanceReadReplica",
        "CreateDBParameterGroup",
        "CreateDBSnapshot",
        "CreateDBSubnetGroup",
        "DeleteDBInstance",
        "DeleteDBParameterGroup",
        "DeleteDBSnapshot",
        "DeleteDBSubnetGroup",
        "ModifyDBInstance",
        "ModifyDBParameterGroup",
        "ModifyDBSubnetGroup",
        "RebootDBInstance",
        "RemoveTagsFromResource",
        "RestoreDBInstanceFromDBSnapshot",
    ];
    // Heuristic for the remaining operations: any verb that mutates state.
    const MUTATING_VERBS: &[&str] = &[
        "Create",
        "Modify",
        "Delete",
        "Reboot",
        "Start",
        "Stop",
        "Failover",
        "Switchover",
        "Promote",
        "Reset",
        "Apply",
        "Authorize",
        "Revoke",
        "Add",
        "Remove",
        "Register",
        "Deregister",
        "Copy",
        "Restore",
        "Backtrack",
        "Cancel",
        "Purchase",
        "Disable",
        "Enable",
    ];

    EXPLICIT_MUTATIONS.contains(&action)
        || MUTATING_VERBS
            .iter()
            .any(|verb| action.starts_with(*verb))
}
/// Every RDS action name this service reports through
/// `AwsService::supported_actions`.
///
/// Only a subset has a dedicated arm in `handle`; the remaining names fall
/// through to `handle_extra_action`.
// NOTE(review): the entries appear to be kept in byte-wise sorted order —
// confirm whether anything relies on that before inserting new names.
const SUPPORTED_ACTIONS: &[&str] = &[
    "AddRoleToDBCluster",
    "AddRoleToDBInstance",
    "AddSourceIdentifierToSubscription",
    "AddTagsToResource",
    "ApplyPendingMaintenanceAction",
    "AuthorizeDBSecurityGroupIngress",
    "BacktrackDBCluster",
    "CancelExportTask",
    "CopyDBClusterParameterGroup",
    "CopyDBClusterSnapshot",
    "CopyDBParameterGroup",
    "CopyDBSnapshot",
    "CopyOptionGroup",
    "CreateBlueGreenDeployment",
    "CreateCustomDBEngineVersion",
    "CreateDBCluster",
    "CreateDBClusterEndpoint",
    "CreateDBClusterParameterGroup",
    "CreateDBClusterSnapshot",
    "CreateDBInstance",
    "CreateDBInstanceReadReplica",
    "CreateDBParameterGroup",
    "CreateDBProxy",
    "CreateDBProxyEndpoint",
    "CreateDBSecurityGroup",
    "CreateDBShardGroup",
    "CreateDBSnapshot",
    "CreateDBSubnetGroup",
    "CreateEventSubscription",
    "CreateGlobalCluster",
    "CreateIntegration",
    "CreateOptionGroup",
    "CreateTenantDatabase",
    "DeleteBlueGreenDeployment",
    "DeleteCustomDBEngineVersion",
    "DeleteDBCluster",
    "DeleteDBClusterAutomatedBackup",
    "DeleteDBClusterEndpoint",
    "DeleteDBClusterParameterGroup",
    "DeleteDBClusterSnapshot",
    "DeleteDBInstance",
    "DeleteDBInstanceAutomatedBackup",
    "DeleteDBParameterGroup",
    "DeleteDBProxy",
    "DeleteDBProxyEndpoint",
    "DeleteDBSecurityGroup",
    "DeleteDBShardGroup",
    "DeleteDBSnapshot",
    "DeleteDBSubnetGroup",
    "DeleteEventSubscription",
    "DeleteGlobalCluster",
    "DeleteIntegration",
    "DeleteOptionGroup",
    "DeleteTenantDatabase",
    "DeregisterDBProxyTargets",
    "DescribeAccountAttributes",
    "DescribeBlueGreenDeployments",
    "DescribeCertificates",
    "DescribeDBClusterAutomatedBackups",
    "DescribeDBClusterBacktracks",
    "DescribeDBClusterEndpoints",
    "DescribeDBClusterParameterGroups",
    "DescribeDBClusterParameters",
    "DescribeDBClusterSnapshotAttributes",
    "DescribeDBClusterSnapshots",
    "DescribeDBClusters",
    "DescribeDBEngineVersions",
    "DescribeDBInstanceAutomatedBackups",
    "DescribeDBInstances",
    "DescribeDBLogFiles",
    "DescribeDBMajorEngineVersions",
    "DescribeDBParameterGroups",
    "DescribeDBParameters",
    "DescribeDBProxies",
    "DescribeDBProxyEndpoints",
    "DescribeDBProxyTargetGroups",
    "DescribeDBProxyTargets",
    "DescribeDBRecommendations",
    "DescribeDBSecurityGroups",
    "DescribeDBShardGroups",
    "DescribeDBSnapshotAttributes",
    "DescribeDBSnapshotTenantDatabases",
    "DescribeDBSnapshots",
    "DescribeDBSubnetGroups",
    "DescribeEngineDefaultClusterParameters",
    "DescribeEngineDefaultParameters",
    "DescribeEventCategories",
    "DescribeEventSubscriptions",
    "DescribeEvents",
    "DescribeExportTasks",
    "DescribeGlobalClusters",
    "DescribeIntegrations",
    "DescribeOptionGroupOptions",
    "DescribeOptionGroups",
    "DescribeOrderableDBInstanceOptions",
    "DescribePendingMaintenanceActions",
    "DescribeReservedDBInstances",
    "DescribeReservedDBInstancesOfferings",
    "DescribeSourceRegions",
    "DescribeTenantDatabases",
    "DescribeValidDBInstanceModifications",
    "DisableHttpEndpoint",
    "DownloadDBLogFilePortion",
    "EnableHttpEndpoint",
    "FailoverDBCluster",
    "FailoverGlobalCluster",
    "ListTagsForResource",
    "ModifyActivityStream",
    "ModifyCertificates",
    "ModifyCurrentDBClusterCapacity",
    "ModifyCustomDBEngineVersion",
    "ModifyDBCluster",
    "ModifyDBClusterEndpoint",
    "ModifyDBClusterParameterGroup",
    "ModifyDBClusterSnapshotAttribute",
    "ModifyDBInstance",
    "ModifyDBParameterGroup",
    "ModifyDBProxy",
    "ModifyDBProxyEndpoint",
    "ModifyDBProxyTargetGroup",
    "ModifyDBRecommendation",
    "ModifyDBShardGroup",
    "ModifyDBSnapshot",
    "ModifyDBSnapshotAttribute",
    "ModifyDBSubnetGroup",
    "ModifyEventSubscription",
    "ModifyGlobalCluster",
    "ModifyIntegration",
    "ModifyOptionGroup",
    "ModifyTenantDatabase",
    "PromoteReadReplica",
    "PromoteReadReplicaDBCluster",
    "PurchaseReservedDBInstancesOffering",
    "RebootDBCluster",
    "RebootDBInstance",
    "RebootDBShardGroup",
    "RegisterDBProxyTargets",
    "RemoveFromGlobalCluster",
    "RemoveRoleFromDBCluster",
    "RemoveRoleFromDBInstance",
    "RemoveSourceIdentifierFromSubscription",
    "RemoveTagsFromResource",
    "ResetDBClusterParameterGroup",
    "ResetDBParameterGroup",
    "RestoreDBClusterFromS3",
    "RestoreDBClusterFromSnapshot",
    "RestoreDBClusterToPointInTime",
    "RestoreDBInstanceFromDBSnapshot",
    "RestoreDBInstanceFromS3",
    "RestoreDBInstanceToPointInTime",
    "RevokeDBSecurityGroupIngress",
    "StartActivityStream",
    "StartDBCluster",
    "StartDBInstance",
    "StartDBInstanceAutomatedBackupsReplication",
    "StartExportTask",
    "StopActivityStream",
    "StopDBCluster",
    "StopDBInstance",
    "StopDBInstanceAutomatedBackupsReplication",
    "SwitchoverBlueGreenDeployment",
    "SwitchoverGlobalCluster",
    "SwitchoverReadReplica",
];
240
/// RDS service handler: owns the shared per-account state plus the optional
/// collaborators wired in through the `with_*` builder methods.
pub struct RdsService {
    /// Shared RDS state; handlers access it through `read()` / `write()`.
    pub(crate) state: SharedRdsState,
    /// Database runtime; `None` until `with_runtime` is called.
    runtime: Option<Arc<RdsRuntime>>,
    /// Persistence backend for state snapshots; when `None`, `save_snapshot`
    /// returns without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of `save_snapshot`.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Event sink used by `emit_event`; when `None`, emission is a no-op.
    pub(crate) delivery_bus: Option<Arc<DeliveryBus>>,
}
248
/// Kind of RDS resource an `aws.rds` EventBridge event refers to.
///
/// Each variant carries two fixed strings: the `SourceType` value placed in
/// the event detail and the event's detail-type.
#[derive(Clone, Copy)]
#[allow(dead_code, clippy::enum_variant_names)]
pub(crate) enum RdsSourceType {
    DbInstance,
    DbSnapshot,
    DbParameterGroup,
}

impl RdsSourceType {
    /// Single lookup table: `(SourceType value, detail-type)` per variant.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::DbInstance => ("DB_INSTANCE", "RDS DB Instance Event"),
            Self::DbSnapshot => ("DB_SNAPSHOT", "RDS DB Snapshot Event"),
            Self::DbParameterGroup => ("DB_PARAMETER_GROUP", "RDS DB Parameter Group Event"),
        }
    }

    /// Value used for the `SourceType` field of the event detail.
    fn as_str(self) -> &'static str {
        let (source_type, _) = self.labels();
        source_type
    }

    /// Detail-type string of the emitted event.
    fn detail_type(self) -> &'static str {
        let (_, detail_type) = self.labels();
        detail_type
    }
}
275
impl RdsService {
    /// Borrows the shared RDS state handle owned by this service.
    pub(crate) fn state_handle(&self) -> &SharedRdsState {
        &self.state
    }
}
281
282impl RdsService {
283    pub fn new(state: SharedRdsState) -> Self {
284        Self {
285            state,
286            runtime: None,
287            snapshot_store: None,
288            snapshot_lock: Arc::new(AsyncMutex::new(())),
289            delivery_bus: None,
290        }
291    }
292
293    pub fn with_runtime(mut self, runtime: Arc<RdsRuntime>) -> Self {
294        self.runtime = Some(runtime);
295        self
296    }
297
298    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
299        self.snapshot_store = Some(store);
300        self
301    }
302
303    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
304        self.delivery_bus = Some(bus);
305        self
306    }
307
308    /// Emit an `aws.rds` EventBridge event mirroring the AWS RDS event schema.
309    /// No-op when the delivery bus isn't wired (tests, minimal configs).
310    pub(crate) fn emit_event(
311        &self,
312        source_type: RdsSourceType,
313        source_identifier: &str,
314        source_arn: &str,
315        event_id: &str,
316        event_categories: &[&str],
317        message: &str,
318    ) {
319        let Some(ref bus) = self.delivery_bus else {
320            return;
321        };
322        let detail = serde_json::json!({
323            "EventCategories": event_categories,
324            "SourceType": source_type.as_str(),
325            "SourceArn": source_arn,
326            "Date": Utc::now().to_rfc3339(),
327            "Message": message,
328            "SourceIdentifier": source_identifier,
329            "EventID": event_id,
330        });
331        bus.put_event_to_eventbridge(
332            "aws.rds",
333            source_type.detail_type(),
334            &detail.to_string(),
335            "default",
336        );
337    }
338
339    async fn save_snapshot(&self) {
340        let Some(store) = self.snapshot_store.clone() else {
341            return;
342        };
343        let _guard = self.snapshot_lock.lock().await;
344        let snapshot = RdsSnapshot {
345            schema_version: RDS_SNAPSHOT_SCHEMA_VERSION,
346            state: None,
347            accounts: Some(self.state.read().clone()),
348        };
349        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
350            let bytes = serde_json::to_vec(&snapshot)
351                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
352            store.save(&bytes)
353        })
354        .await;
355        match join {
356            Ok(Ok(())) => {}
357            Ok(Err(err)) => tracing::error!(%err, "failed to write rds snapshot"),
358            Err(err) => tracing::error!(%err, "rds snapshot task panicked"),
359        }
360    }
361
362    /// Return the runtime or a ``ServiceUnavailable`` error if it was not configured.
363    ///
364    /// RDS operations that start, stop, or reach into a database container fail
365    /// with a consistent wire error when the daemon (Docker/Podman) is missing
366    /// rather than each caller restating the message.
367    fn require_runtime(&self) -> Result<&Arc<RdsRuntime>, AwsServiceError> {
368        self.runtime.as_ref().ok_or_else(|| {
369            AwsServiceError::aws_error(
370                StatusCode::SERVICE_UNAVAILABLE,
371                "InvalidParameterValue",
372                "Docker/Podman is required for RDS DB instances but is not available",
373            )
374        })
375    }
376}
377
378#[async_trait]
379impl AwsService for RdsService {
380    fn service_name(&self) -> &str {
381        "rds"
382    }
383
384    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
385        let mutates = is_mutating_action(request.action.as_str());
386        let result = match request.action.as_str() {
387            "AddTagsToResource" => self.add_tags_to_resource(&request),
388            "CreateDBInstance" => self.create_db_instance(&request).await,
389            "CreateDBInstanceReadReplica" => self.create_db_instance_read_replica(&request).await,
390            "CreateDBParameterGroup" => self.create_db_parameter_group(&request),
391            "CreateDBSnapshot" => self.create_db_snapshot(&request).await,
392            "CreateDBSubnetGroup" => self.create_db_subnet_group(&request),
393            "DeleteDBInstance" => self.delete_db_instance(&request).await,
394            "DeleteDBParameterGroup" => self.delete_db_parameter_group(&request),
395            "DeleteDBSnapshot" => self.delete_db_snapshot(&request),
396            "DeleteDBSubnetGroup" => self.delete_db_subnet_group(&request),
397            "DescribeDBEngineVersions" => self.describe_db_engine_versions(&request),
398            "DescribeDBInstances" => self.describe_db_instances(&request),
399            "DescribeDBParameterGroups" => self.describe_db_parameter_groups(&request),
400            "DescribeDBSnapshots" => self.describe_db_snapshots(&request),
401            "DescribeDBSubnetGroups" => self.describe_db_subnet_groups(&request),
402            "DescribeOrderableDBInstanceOptions" => {
403                self.describe_orderable_db_instance_options(&request)
404            }
405            "ListTagsForResource" => self.list_tags_for_resource(&request),
406            "ModifyDBInstance" => self.modify_db_instance(&request),
407            "ModifyDBParameterGroup" => self.modify_db_parameter_group(&request),
408            "ModifyDBSubnetGroup" => self.modify_db_subnet_group(&request),
409            "RebootDBInstance" => self.reboot_db_instance(&request).await,
410            "RemoveTagsFromResource" => self.remove_tags_from_resource(&request),
411            "RestoreDBInstanceFromDBSnapshot" => {
412                self.restore_db_instance_from_db_snapshot(&request).await
413            }
414            _ => self.handle_extra_action(&request),
415        };
416        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
417            self.save_snapshot().await;
418        }
419        result
420    }
421
422    fn supported_actions(&self) -> &[&str] {
423        SUPPORTED_ACTIONS
424    }
425}
426
427impl RdsService {
428    async fn create_db_instance(
429        &self,
430        request: &AwsRequest,
431    ) -> Result<AwsResponse, AwsServiceError> {
432        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
433        let allocated_storage = required_i32_param(request, "AllocatedStorage")?;
434        let db_instance_class = required_param(request, "DBInstanceClass")?;
435        let engine = required_param(request, "Engine")?;
436        let master_username = required_param(request, "MasterUsername")?;
437        let master_user_password = required_param(request, "MasterUserPassword")?;
438        let db_name = optional_param(request, "DBName");
439        let engine_version =
440            optional_param(request, "EngineVersion").unwrap_or_else(|| "16.3".to_string());
441        let publicly_accessible =
442            parse_optional_bool(optional_param(request, "PubliclyAccessible").as_deref())?
443                .unwrap_or(true);
444        let deletion_protection =
445            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?
446                .unwrap_or(false);
447        let port = optional_i32_param(request, "Port")?
448            .unwrap_or_else(|| default_port_for_engine(&engine));
449        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
450
451        let db_parameter_group_name = optional_param(request, "DBParameterGroupName")
452            .or_else(|| Some(default_parameter_group(&engine, &engine_version)));
453
454        let backup_retention_period =
455            optional_i32_param(request, "BackupRetentionPeriod")?.unwrap_or(1);
456        let preferred_backup_window = optional_param(request, "PreferredBackupWindow")
457            .unwrap_or_else(|| "03:00-04:00".to_string());
458        let option_group_name = optional_param(request, "OptionGroupName");
459        let multi_az =
460            parse_optional_bool(optional_param(request, "MultiAZ").as_deref())?.unwrap_or(false);
461
462        validate_create_request(
463            &db_instance_identifier,
464            allocated_storage,
465            &db_instance_class,
466            &engine,
467            &engine_version,
468            port,
469        )?;
470
471        {
472            let mut accounts = self.state.write();
473            let state = accounts.get_or_create(&request.account_id);
474            if !state.begin_instance_creation(&db_instance_identifier) {
475                return Err(AwsServiceError::aws_error(
476                    StatusCode::BAD_REQUEST,
477                    "DBInstanceAlreadyExists",
478                    format!("DBInstance {} already exists.", db_instance_identifier),
479                ));
480            }
481            // Validate parameter group exists if specified by the caller
482            if let Some(ref pg_name) = db_parameter_group_name {
483                if !state.parameter_groups.contains_key(pg_name) {
484                    state.cancel_instance_creation(&db_instance_identifier);
485                    return Err(AwsServiceError::aws_error(
486                        StatusCode::NOT_FOUND,
487                        "DBParameterGroupNotFound",
488                        format!("DBParameterGroup {} not found.", pg_name),
489                    ));
490                }
491            }
492        }
493
494        let runtime = self.require_runtime()?;
495
496        let logical_db_name = db_name
497            .clone()
498            .unwrap_or_else(|| default_db_name(&engine).to_string());
499        let running = runtime
500            .ensure_postgres(
501                &db_instance_identifier,
502                &engine,
503                &engine_version,
504                &master_username,
505                &master_user_password,
506                &logical_db_name,
507                &request.account_id,
508                &request.region,
509            )
510            .await
511            .map_err(|error| {
512                self.state
513                    .write()
514                    .get_or_create(&request.account_id)
515                    .cancel_instance_creation(&db_instance_identifier);
516                runtime_error_to_service_error(error)
517            })?;
518
519        let mut accounts = self.state.write();
520        let state = accounts.get_or_create(&request.account_id);
521        let created_at = Utc::now();
522        let instance = DbInstance {
523            db_instance_identifier: db_instance_identifier.clone(),
524            db_instance_arn: state.db_instance_arn(&db_instance_identifier),
525            db_instance_class: db_instance_class.clone(),
526            engine: engine.clone(),
527            engine_version: engine_version.clone(),
528            db_instance_status: "available".to_string(),
529            master_username: master_username.clone(),
530            db_name: db_name.clone(),
531            endpoint_address: "127.0.0.1".to_string(),
532            port: i32::from(running.host_port),
533            allocated_storage,
534            publicly_accessible,
535            deletion_protection,
536            created_at,
537            dbi_resource_id: state.next_dbi_resource_id(),
538            master_user_password,
539            container_id: running.container_id,
540            host_port: running.host_port,
541            tags: Vec::new(),
542            read_replica_source_db_instance_identifier: None,
543            read_replica_db_instance_identifiers: Vec::new(),
544            vpc_security_group_ids,
545            db_parameter_group_name,
546            backup_retention_period,
547            preferred_backup_window,
548            latest_restorable_time: if backup_retention_period > 0 {
549                Some(created_at)
550            } else {
551                None
552            },
553            option_group_name,
554            multi_az,
555            pending_modified_values: None,
556        };
557        state.finish_instance_creation(instance.clone());
558        let instance_arn = instance.db_instance_arn.clone();
559        drop(accounts);
560
561        self.emit_event(
562            RdsSourceType::DbInstance,
563            &db_instance_identifier,
564            &instance_arn,
565            "RDS-EVENT-0005",
566            &["creation"],
567            "DB instance created",
568        );
569
570        Ok(AwsResponse::xml(
571            StatusCode::OK,
572            xml_wrap(
573                "CreateDBInstance",
574                &format!(
575                    "<DBInstance>{}</DBInstance>",
576                    db_instance_xml(&instance, Some("creating"))
577                ),
578                &request.request_id,
579            ),
580        ))
581    }
582
583    async fn delete_db_instance(
584        &self,
585        request: &AwsRequest,
586    ) -> Result<AwsResponse, AwsServiceError> {
587        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
588        let skip_final_snapshot =
589            parse_optional_bool(optional_param(request, "SkipFinalSnapshot").as_deref())?
590                .unwrap_or(false);
591        let final_db_snapshot_identifier = optional_param(request, "FinalDBSnapshotIdentifier");
592
593        if skip_final_snapshot && final_db_snapshot_identifier.is_some() {
594            return Err(AwsServiceError::aws_error(
595                StatusCode::BAD_REQUEST,
596                "InvalidParameterCombination",
597                "FinalDBSnapshotIdentifier cannot be specified when SkipFinalSnapshot is enabled.",
598            ));
599        }
600        if !skip_final_snapshot && final_db_snapshot_identifier.is_none() {
601            return Err(AwsServiceError::aws_error(
602                StatusCode::BAD_REQUEST,
603                "InvalidParameterCombination",
604                "FinalDBSnapshotIdentifier is required when SkipFinalSnapshot is false or not specified.",
605            ));
606        }
607
608        // Check deletion protection BEFORE creating snapshot or making any changes
609        {
610            let accounts = self.state.read();
611            let empty = RdsState::new(&request.account_id, &request.region);
612            let state = accounts.get(&request.account_id).unwrap_or(&empty);
613            if let Some(instance) = state.instances.get(&db_instance_identifier) {
614                if instance.deletion_protection {
615                    return Err(AwsServiceError::aws_error(
616                        StatusCode::BAD_REQUEST,
617                        "InvalidDBInstanceState",
618                        format!(
619                            "DBInstance {} cannot be deleted because deletion protection is enabled.",
620                            db_instance_identifier
621                        ),
622                    ));
623                }
624            } else {
625                return Err(db_instance_not_found(&db_instance_identifier));
626            }
627        }
628
629        if let Some(ref snapshot_id) = final_db_snapshot_identifier {
630            self.create_final_db_snapshot(
631                &db_instance_identifier,
632                snapshot_id,
633                &request.account_id,
634                &request.region,
635            )
636            .await?;
637        }
638
639        let instance = {
640            let mut accounts = self.state.write();
641            let state = accounts.get_or_create(&request.account_id);
642            let instance = state
643                .instances
644                .remove(&db_instance_identifier)
645                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
646
647            if let Some(source_id) = &instance.read_replica_source_db_instance_identifier {
648                if let Some(source) = state.instances.get_mut(source_id) {
649                    source
650                        .read_replica_db_instance_identifiers
651                        .retain(|id| id != &db_instance_identifier);
652                }
653            }
654
655            for replica_id in &instance.read_replica_db_instance_identifiers {
656                if let Some(replica) = state.instances.get_mut(replica_id) {
657                    replica.read_replica_source_db_instance_identifier = None;
658                }
659            }
660
661            instance
662        };
663
664        if let Some(runtime) = &self.runtime {
665            runtime.stop_container(&db_instance_identifier).await;
666        }
667
668        self.emit_event(
669            RdsSourceType::DbInstance,
670            &db_instance_identifier,
671            &instance.db_instance_arn,
672            "RDS-EVENT-0003",
673            &["deletion"],
674            "DB instance deleted",
675        );
676
677        Ok(AwsResponse::xml(
678            StatusCode::OK,
679            xml_wrap(
680                "DeleteDBInstance",
681                &format!(
682                    "<DBInstance>{}</DBInstance>",
683                    db_instance_xml(&instance, Some("deleting"))
684                ),
685                &request.request_id,
686            ),
687        ))
688    }
689
690    /// Take a final snapshot of an instance that is about to be deleted,
691    /// persisting the dumped database into `state.snapshots`. The DLQ-style
692    /// conflict check runs twice — once under the read lock before paying
693    /// for the dump, once under the write lock before committing — to keep
694    /// concurrent deletes from colliding.
695    async fn create_final_db_snapshot(
696        &self,
697        db_instance_identifier: &str,
698        snapshot_id: &str,
699        account_id: &str,
700        region: &str,
701    ) -> Result<(), AwsServiceError> {
702        let runtime = self.runtime.as_ref().ok_or_else(|| {
703            AwsServiceError::aws_error(
704                StatusCode::SERVICE_UNAVAILABLE,
705                "InvalidParameterValue",
706                "Docker/Podman is required for RDS snapshots but is not available",
707            )
708        })?;
709
710        let (instance_for_snapshot, db_name) = {
711            let accounts = self.state.read();
712            let empty = RdsState::new(account_id, region);
713            let state = accounts.get(account_id).unwrap_or(&empty);
714
715            if state.snapshots.contains_key(snapshot_id) {
716                return Err(AwsServiceError::aws_error(
717                    StatusCode::CONFLICT,
718                    "DBSnapshotAlreadyExists",
719                    format!("DBSnapshot {snapshot_id} already exists."),
720                ));
721            }
722
723            let instance = state
724                .instances
725                .get(db_instance_identifier)
726                .cloned()
727                .ok_or_else(|| db_instance_not_found(db_instance_identifier))?;
728
729            let default_db = default_db_name(&instance.engine);
730            let db_name = instance
731                .db_name
732                .as_deref()
733                .unwrap_or(default_db)
734                .to_string();
735
736            (instance, db_name)
737        };
738
739        let dump_data = runtime
740            .dump_database(
741                db_instance_identifier,
742                &instance_for_snapshot.engine,
743                &instance_for_snapshot.master_username,
744                &instance_for_snapshot.master_user_password,
745                &db_name,
746            )
747            .await
748            .map_err(runtime_error_to_service_error)?;
749
750        let mut accounts = self.state.write();
751        let state = accounts.get_or_create(account_id);
752
753        if state.snapshots.contains_key(snapshot_id) {
754            return Err(AwsServiceError::aws_error(
755                StatusCode::CONFLICT,
756                "DBSnapshotAlreadyExists",
757                format!("DBSnapshot {snapshot_id} already exists."),
758            ));
759        }
760
761        let snapshot_arn = state.db_snapshot_arn(snapshot_id);
762
763        let snapshot = DbSnapshot {
764            db_snapshot_identifier: snapshot_id.to_string(),
765            db_snapshot_arn: snapshot_arn,
766            db_instance_identifier: db_instance_identifier.to_string(),
767            snapshot_create_time: Utc::now(),
768            engine: instance_for_snapshot.engine.clone(),
769            engine_version: instance_for_snapshot.engine_version.clone(),
770            allocated_storage: instance_for_snapshot.allocated_storage,
771            status: "available".to_string(),
772            port: instance_for_snapshot.port,
773            master_username: instance_for_snapshot.master_username.clone(),
774            db_name: instance_for_snapshot.db_name.clone(),
775            dbi_resource_id: instance_for_snapshot.dbi_resource_id.clone(),
776            snapshot_type: "manual".to_string(),
777            master_user_password: instance_for_snapshot.master_user_password.clone(),
778            tags: Vec::new(),
779            dump_data,
780        };
781
782        state.snapshots.insert(snapshot_id.to_string(), snapshot);
783        Ok(())
784    }
785
786    fn modify_db_instance(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
787        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
788        let db_instance_class = optional_param(request, "DBInstanceClass");
789        let deletion_protection =
790            parse_optional_bool(optional_param(request, "DeletionProtection").as_deref())?;
791        let apply_immediately =
792            parse_optional_bool(optional_param(request, "ApplyImmediately").as_deref())?;
793
794        // Parse VPC security group IDs - only if at least one is provided
795        let vpc_security_group_ids = {
796            let mut ids = Vec::new();
797            for index in 1.. {
798                let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
799                match optional_param(request, &sg_id_name) {
800                    Some(sg_id) => ids.push(sg_id),
801                    None => break,
802                }
803            }
804            if ids.is_empty() {
805                None
806            } else {
807                Some(ids)
808            }
809        };
810
811        if db_instance_class.is_none()
812            && deletion_protection.is_none()
813            && vpc_security_group_ids.is_none()
814        {
815            return Err(AwsServiceError::aws_error(
816                StatusCode::BAD_REQUEST,
817                "InvalidParameterCombination",
818                "At least one supported mutable field must be provided.",
819            ));
820        }
821        if let Some(ref class) = db_instance_class {
822            validate_db_instance_class(class)?;
823        }
824
825        let mut accounts = self.state.write();
826        let state = accounts.get_or_create(&request.account_id);
827        let instance = state
828            .instances
829            .get_mut(&db_instance_identifier)
830            .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
831
832        // If ApplyImmediately is false, stage changes as pending
833        if apply_immediately == Some(false) {
834            let pending = instance
835                .pending_modified_values
836                .get_or_insert(Default::default());
837            if let Some(class) = db_instance_class {
838                pending.db_instance_class = Some(class);
839            }
840            // Note: deletion_protection and vpc_security_group_ids are applied immediately
841            // regardless of ApplyImmediately flag (per AWS behavior)
842            if let Some(deletion_protection) = deletion_protection {
843                instance.deletion_protection = deletion_protection;
844            }
845            if let Some(security_group_ids) = vpc_security_group_ids {
846                instance.vpc_security_group_ids = security_group_ids;
847            }
848        } else {
849            // Apply immediately (default behavior)
850            if let Some(class) = db_instance_class {
851                instance.db_instance_class = class;
852            }
853            if let Some(deletion_protection) = deletion_protection {
854                instance.deletion_protection = deletion_protection;
855            }
856            if let Some(security_group_ids) = vpc_security_group_ids {
857                instance.vpc_security_group_ids = security_group_ids;
858            }
859        }
860        let instance_arn = instance.db_instance_arn.clone();
861        let xml = xml_wrap(
862            "ModifyDBInstance",
863            &format!(
864                "<DBInstance>{}</DBInstance>",
865                db_instance_xml(instance, Some("modifying"))
866            ),
867            &request.request_id,
868        );
869        drop(accounts);
870
871        self.emit_event(
872            RdsSourceType::DbInstance,
873            &db_instance_identifier,
874            &instance_arn,
875            "RDS-EVENT-0014",
876            &["configuration change"],
877            "DB instance was modified",
878        );
879
880        Ok(AwsResponse::xml(StatusCode::OK, xml))
881    }
882
883    async fn reboot_db_instance(
884        &self,
885        request: &AwsRequest,
886    ) -> Result<AwsResponse, AwsServiceError> {
887        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
888        let force_failover =
889            parse_optional_bool(optional_param(request, "ForceFailover").as_deref())?;
890        if force_failover == Some(true) {
891            return Err(AwsServiceError::aws_error(
892                StatusCode::BAD_REQUEST,
893                "InvalidParameterCombination",
894                "ForceFailover is not supported for single-instance PostgreSQL DB instances.",
895            ));
896        }
897
898        let instance = {
899            let accounts = self.state.read();
900            let empty = RdsState::new(&request.account_id, &request.region);
901            let state = accounts.get(&request.account_id).unwrap_or(&empty);
902            state
903                .instances
904                .get(&db_instance_identifier)
905                .cloned()
906                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?
907        };
908
909        let runtime = self.require_runtime()?;
910
911        let running = runtime
912            .restart_container(
913                &db_instance_identifier,
914                &instance.engine,
915                &instance.master_username,
916                &instance.master_user_password,
917                instance
918                    .db_name
919                    .as_deref()
920                    .unwrap_or(default_db_name(&instance.engine)),
921            )
922            .await
923            .map_err(runtime_error_to_service_error)?;
924
925        let instance = {
926            let mut accounts = self.state.write();
927            let state = accounts.get_or_create(&request.account_id);
928            let instance = state
929                .instances
930                .get_mut(&db_instance_identifier)
931                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
932            instance.host_port = running.host_port;
933            instance.port = i32::from(running.host_port);
934
935            // Apply any pending modifications
936            if let Some(pending) = instance.pending_modified_values.take() {
937                if let Some(class) = pending.db_instance_class {
938                    instance.db_instance_class = class;
939                }
940                if let Some(allocated_storage) = pending.allocated_storage {
941                    instance.allocated_storage = allocated_storage;
942                }
943                if let Some(backup_retention_period) = pending.backup_retention_period {
944                    instance.backup_retention_period = backup_retention_period;
945                }
946                if let Some(multi_az) = pending.multi_az {
947                    instance.multi_az = multi_az;
948                }
949                if let Some(engine_version) = pending.engine_version {
950                    instance.engine_version = engine_version;
951                }
952                if let Some(master_user_password) = pending.master_user_password {
953                    instance.master_user_password = master_user_password;
954                }
955            }
956
957            instance.clone()
958        };
959
960        self.emit_event(
961            RdsSourceType::DbInstance,
962            &db_instance_identifier,
963            &instance.db_instance_arn,
964            "RDS-EVENT-0006",
965            &["availability"],
966            "DB instance restarted",
967        );
968
969        Ok(AwsResponse::xml(
970            StatusCode::OK,
971            xml_wrap(
972                "RebootDBInstance",
973                &format!(
974                    "<DBInstance>{}</DBInstance>",
975                    db_instance_xml(&instance, Some("rebooting"))
976                ),
977                &request.request_id,
978            ),
979        ))
980    }
981
982    fn describe_db_engine_versions(
983        &self,
984        request: &AwsRequest,
985    ) -> Result<AwsResponse, AwsServiceError> {
986        let engine = optional_param(request, "Engine");
987        let engine_version = optional_param(request, "EngineVersion");
988        let family = optional_param(request, "DBParameterGroupFamily");
989        let default_only = parse_optional_bool(optional_param(request, "DefaultOnly").as_deref())?;
990
991        let mut versions = filter_engine_versions(
992            &default_engine_versions(),
993            &engine,
994            &engine_version,
995            &family,
996        );
997
998        if default_only.unwrap_or(false) {
999            versions.truncate(1);
1000        }
1001
1002        Ok(AwsResponse::xml(
1003            StatusCode::OK,
1004            xml_wrap(
1005                "DescribeDBEngineVersions",
1006                &format!(
1007                    "<DBEngineVersions>{}</DBEngineVersions>",
1008                    versions.iter().map(engine_version_xml).collect::<String>()
1009                ),
1010                &request.request_id,
1011            ),
1012        ))
1013    }
1014
1015    fn describe_db_instances(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1016        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1017        let marker = optional_param(request, "Marker");
1018        let max_records = optional_param(request, "MaxRecords");
1019
1020        let accounts = self.state.read();
1021        let empty = RdsState::new(&request.account_id, &request.region);
1022        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1023
1024        // If specific identifier requested, return just that one (no pagination)
1025        if let Some(identifier) = db_instance_identifier {
1026            let instance = state
1027                .instances
1028                .get(&identifier)
1029                .cloned()
1030                .ok_or_else(|| db_instance_not_found(&identifier))?;
1031
1032            return Ok(AwsResponse::xml(
1033                StatusCode::OK,
1034                xml_wrap(
1035                    "DescribeDBInstances",
1036                    &format!(
1037                        "<DBInstances><DBInstance>{}</DBInstance></DBInstances>",
1038                        db_instance_xml(&instance, None)
1039                    ),
1040                    &request.request_id,
1041                ),
1042            ));
1043        }
1044
1045        // Get all instances sorted by created_at, then identifier
1046        let mut instances: Vec<DbInstance> = state.instances.values().cloned().collect();
1047        instances.sort_by(|a, b| {
1048            a.created_at
1049                .cmp(&b.created_at)
1050                .then_with(|| a.db_instance_identifier.cmp(&b.db_instance_identifier))
1051        });
1052
1053        // Apply pagination
1054        let paginated = paginate(instances, marker, max_records, |inst| {
1055            &inst.db_instance_identifier
1056        })?;
1057
1058        let marker_xml = paginated
1059            .next_marker
1060            .as_ref()
1061            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1062            .unwrap_or_default();
1063
1064        Ok(AwsResponse::xml(
1065            StatusCode::OK,
1066            xml_wrap(
1067                "DescribeDBInstances",
1068                &format!(
1069                    "<DBInstances>{}</DBInstances>{}",
1070                    paginated
1071                        .items
1072                        .iter()
1073                        .map(|instance| {
1074                            format!(
1075                                "<DBInstance>{}</DBInstance>",
1076                                db_instance_xml(instance, None)
1077                            )
1078                        })
1079                        .collect::<String>(),
1080                    marker_xml
1081                ),
1082                &request.request_id,
1083            ),
1084        ))
1085    }
1086
1087    fn describe_orderable_db_instance_options(
1088        &self,
1089        request: &AwsRequest,
1090    ) -> Result<AwsResponse, AwsServiceError> {
1091        let engine = optional_param(request, "Engine");
1092        let engine_version = optional_param(request, "EngineVersion");
1093        let db_instance_class = optional_param(request, "DBInstanceClass");
1094        let license_model = optional_param(request, "LicenseModel");
1095        let vpc = parse_optional_bool(optional_param(request, "Vpc").as_deref())?;
1096
1097        let options = filter_orderable_options(
1098            &default_orderable_options(),
1099            &engine,
1100            &engine_version,
1101            &db_instance_class,
1102            &license_model,
1103            vpc,
1104        );
1105
1106        Ok(AwsResponse::xml(
1107            StatusCode::OK,
1108            xml_wrap(
1109                "DescribeOrderableDBInstanceOptions",
1110                &format!(
1111                    "<OrderableDBInstanceOptions>{}</OrderableDBInstanceOptions>",
1112                    options.iter().map(orderable_option_xml).collect::<String>()
1113                ),
1114                &request.request_id,
1115            ),
1116        ))
1117    }
1118
1119    fn add_tags_to_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1120        let resource_name = required_param(request, "ResourceName")?;
1121        let tags = parse_tags(request)?;
1122
1123        if tags.is_empty() {
1124            return Err(AwsServiceError::aws_error(
1125                StatusCode::BAD_REQUEST,
1126                "MissingParameter",
1127                "The request must contain the parameter Tags.",
1128            ));
1129        }
1130
1131        let mut accounts = self.state.write();
1132        let state = accounts.get_or_create(&request.account_id);
1133        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1134        merge_tags(&mut instance.tags, &tags);
1135
1136        Ok(AwsResponse::xml(
1137            StatusCode::OK,
1138            xml_wrap("AddTagsToResource", "", &request.request_id),
1139        ))
1140    }
1141
1142    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1143        let resource_name = required_param(request, "ResourceName")?;
1144        if query_param_prefix_exists(request, "Filters.") {
1145            return Err(AwsServiceError::aws_error(
1146                StatusCode::BAD_REQUEST,
1147                "InvalidParameterValue",
1148                "Filters are not yet supported for ListTagsForResource.",
1149            ));
1150        }
1151
1152        let accounts = self.state.read();
1153        let empty = RdsState::new(&request.account_id, &request.region);
1154        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1155        let instance = find_instance_by_arn(state, &resource_name)?;
1156        let tag_xml = instance.tags.iter().map(tag_xml).collect::<String>();
1157
1158        Ok(AwsResponse::xml(
1159            StatusCode::OK,
1160            xml_wrap(
1161                "ListTagsForResource",
1162                &format!("<TagList>{tag_xml}</TagList>"),
1163                &request.request_id,
1164            ),
1165        ))
1166    }
1167
1168    fn remove_tags_from_resource(
1169        &self,
1170        request: &AwsRequest,
1171    ) -> Result<AwsResponse, AwsServiceError> {
1172        let resource_name = required_param(request, "ResourceName")?;
1173        let tag_keys = parse_tag_keys(request)?;
1174
1175        if tag_keys.is_empty() {
1176            return Err(AwsServiceError::aws_error(
1177                StatusCode::BAD_REQUEST,
1178                "MissingParameter",
1179                "The request must contain the parameter TagKeys.",
1180            ));
1181        }
1182
1183        let mut accounts = self.state.write();
1184        let state = accounts.get_or_create(&request.account_id);
1185        let instance = find_instance_by_arn_mut(state, &resource_name)?;
1186        instance
1187            .tags
1188            .retain(|tag| !tag_keys.iter().any(|key| key == &tag.key));
1189
1190        Ok(AwsResponse::xml(
1191            StatusCode::OK,
1192            xml_wrap("RemoveTagsFromResource", "", &request.request_id),
1193        ))
1194    }
1195
1196    async fn create_db_snapshot(
1197        &self,
1198        request: &AwsRequest,
1199    ) -> Result<AwsResponse, AwsServiceError> {
1200        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1201        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1202
1203        let runtime = self.runtime.as_ref().ok_or_else(|| {
1204            AwsServiceError::aws_error(
1205                StatusCode::SERVICE_UNAVAILABLE,
1206                "InvalidParameterValue",
1207                "Docker/Podman is required for RDS snapshots but is not available",
1208            )
1209        })?;
1210
1211        let (instance, db_name) = {
1212            let accounts = self.state.read();
1213            let empty = RdsState::new(&request.account_id, &request.region);
1214            let state = accounts.get(&request.account_id).unwrap_or(&empty);
1215
1216            if state.snapshots.contains_key(&db_snapshot_identifier) {
1217                return Err(AwsServiceError::aws_error(
1218                    StatusCode::CONFLICT,
1219                    "DBSnapshotAlreadyExists",
1220                    format!("DBSnapshot {db_snapshot_identifier} already exists."),
1221                ));
1222            }
1223
1224            let instance = state
1225                .instances
1226                .get(&db_instance_identifier)
1227                .cloned()
1228                .ok_or_else(|| db_instance_not_found(&db_instance_identifier))?;
1229
1230            let default_db = default_db_name(&instance.engine);
1231            let db_name = instance
1232                .db_name
1233                .as_deref()
1234                .unwrap_or(default_db)
1235                .to_string();
1236
1237            (instance, db_name)
1238        };
1239
1240        let dump_data = runtime
1241            .dump_database(
1242                &db_instance_identifier,
1243                &instance.engine,
1244                &instance.master_username,
1245                &instance.master_user_password,
1246                &db_name,
1247            )
1248            .await
1249            .map_err(runtime_error_to_service_error)?;
1250
1251        let mut accounts = self.state.write();
1252        let state = accounts.get_or_create(&request.account_id);
1253
1254        if state.snapshots.contains_key(&db_snapshot_identifier) {
1255            return Err(AwsServiceError::aws_error(
1256                StatusCode::CONFLICT,
1257                "DBSnapshotAlreadyExists",
1258                format!("DBSnapshot {db_snapshot_identifier} already exists."),
1259            ));
1260        }
1261
1262        let snapshot = DbSnapshot {
1263            db_snapshot_identifier: db_snapshot_identifier.clone(),
1264            db_snapshot_arn: state.db_snapshot_arn(&db_snapshot_identifier),
1265            db_instance_identifier: instance.db_instance_identifier.clone(),
1266            snapshot_create_time: Utc::now(),
1267            engine: instance.engine.clone(),
1268            engine_version: instance.engine_version.clone(),
1269            allocated_storage: instance.allocated_storage,
1270            status: "available".to_string(),
1271            port: instance.port,
1272            master_username: instance.master_username.clone(),
1273            db_name: instance.db_name.clone(),
1274            dbi_resource_id: instance.dbi_resource_id.clone(),
1275            snapshot_type: "manual".to_string(),
1276            master_user_password: instance.master_user_password.clone(),
1277            tags: Vec::new(),
1278            dump_data,
1279        };
1280
1281        state
1282            .snapshots
1283            .insert(db_snapshot_identifier.clone(), snapshot.clone());
1284        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1285        drop(accounts);
1286
1287        self.emit_event(
1288            RdsSourceType::DbSnapshot,
1289            &db_snapshot_identifier,
1290            &snapshot_arn,
1291            "RDS-EVENT-0042",
1292            &["creation"],
1293            "Manual snapshot created",
1294        );
1295
1296        Ok(AwsResponse::xml(
1297            StatusCode::OK,
1298            xml_wrap(
1299                "CreateDBSnapshot",
1300                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1301                &request.request_id,
1302            ),
1303        ))
1304    }
1305
1306    fn describe_db_snapshots(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1307        let db_snapshot_identifier = optional_param(request, "DBSnapshotIdentifier");
1308        let db_instance_identifier = optional_param(request, "DBInstanceIdentifier");
1309        let marker = optional_param(request, "Marker");
1310        let max_records = optional_param(request, "MaxRecords");
1311
1312        if db_snapshot_identifier.is_some() && db_instance_identifier.is_some() {
1313            return Err(AwsServiceError::aws_error(
1314                StatusCode::BAD_REQUEST,
1315                "InvalidParameterCombination",
1316                "Cannot specify both DBSnapshotIdentifier and DBInstanceIdentifier.",
1317            ));
1318        }
1319
1320        let accounts = self.state.read();
1321        let empty = RdsState::new(&request.account_id, &request.region);
1322        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1323
1324        // If specific snapshot requested, return just that one (no pagination)
1325        if let Some(snapshot_id) = db_snapshot_identifier {
1326            let snapshot = state
1327                .snapshots
1328                .get(&snapshot_id)
1329                .cloned()
1330                .ok_or_else(|| db_snapshot_not_found(&snapshot_id))?;
1331
1332            return Ok(AwsResponse::xml(
1333                StatusCode::OK,
1334                xml_wrap(
1335                    "DescribeDBSnapshots",
1336                    &format!(
1337                        "<DBSnapshots><DBSnapshot>{}</DBSnapshot></DBSnapshots>",
1338                        db_snapshot_xml(&snapshot)
1339                    ),
1340                    &request.request_id,
1341                ),
1342            ));
1343        }
1344
1345        // Get snapshots, filtered by instance identifier if provided
1346        let mut snapshots: Vec<DbSnapshot> = if let Some(instance_id) = db_instance_identifier {
1347            state
1348                .snapshots
1349                .values()
1350                .filter(|s| s.db_instance_identifier == instance_id)
1351                .cloned()
1352                .collect()
1353        } else {
1354            state.snapshots.values().cloned().collect()
1355        };
1356
1357        // Sort by creation time, then identifier
1358        snapshots.sort_by(|a, b| {
1359            a.snapshot_create_time
1360                .cmp(&b.snapshot_create_time)
1361                .then_with(|| a.db_snapshot_identifier.cmp(&b.db_snapshot_identifier))
1362        });
1363
1364        // Apply pagination
1365        let paginated = paginate(snapshots, marker, max_records, |snap| {
1366            &snap.db_snapshot_identifier
1367        })?;
1368
1369        let marker_xml = paginated
1370            .next_marker
1371            .as_ref()
1372            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1373            .unwrap_or_default();
1374
1375        Ok(AwsResponse::xml(
1376            StatusCode::OK,
1377            xml_wrap(
1378                "DescribeDBSnapshots",
1379                &format!(
1380                    "<DBSnapshots>{}</DBSnapshots>{}",
1381                    paginated
1382                        .items
1383                        .iter()
1384                        .map(|snapshot| format!(
1385                            "<DBSnapshot>{}</DBSnapshot>",
1386                            db_snapshot_xml(snapshot)
1387                        ))
1388                        .collect::<String>(),
1389                    marker_xml
1390                ),
1391                &request.request_id,
1392            ),
1393        ))
1394    }
1395
1396    fn delete_db_snapshot(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1397        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1398
1399        let mut accounts = self.state.write();
1400        let state = accounts.get_or_create(&request.account_id);
1401
1402        let snapshot = state
1403            .snapshots
1404            .remove(&db_snapshot_identifier)
1405            .ok_or_else(|| db_snapshot_not_found(&db_snapshot_identifier))?;
1406        let snapshot_arn = snapshot.db_snapshot_arn.clone();
1407        drop(accounts);
1408
1409        self.emit_event(
1410            RdsSourceType::DbSnapshot,
1411            &db_snapshot_identifier,
1412            &snapshot_arn,
1413            "RDS-EVENT-0041",
1414            &["deletion"],
1415            "Manual snapshot deleted",
1416        );
1417
1418        Ok(AwsResponse::xml(
1419            StatusCode::OK,
1420            xml_wrap(
1421                "DeleteDBSnapshot",
1422                &format!("<DBSnapshot>{}</DBSnapshot>", db_snapshot_xml(&snapshot)),
1423                &request.request_id,
1424            ),
1425        ))
1426    }
1427
1428    async fn restore_db_instance_from_db_snapshot(
1429        &self,
1430        request: &AwsRequest,
1431    ) -> Result<AwsResponse, AwsServiceError> {
1432        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1433        let db_snapshot_identifier = required_param(request, "DBSnapshotIdentifier")?;
1434        let vpc_security_group_ids = parse_vpc_security_group_ids(request);
1435
1436        let runtime = self.require_runtime()?;
1437
1438        let (snapshot, dbi_resource_id, db_instance_arn, created_at) = {
1439            let mut accounts = self.state.write();
1440            let state = accounts.get_or_create(&request.account_id);
1441
1442            if !state.begin_instance_creation(&db_instance_identifier) {
1443                return Err(AwsServiceError::aws_error(
1444                    StatusCode::CONFLICT,
1445                    "DBInstanceAlreadyExists",
1446                    format!("DBInstance {db_instance_identifier} already exists."),
1447                ));
1448            }
1449
1450            let snapshot = match state.snapshots.get(&db_snapshot_identifier).cloned() {
1451                Some(s) => s,
1452                None => {
1453                    state.cancel_instance_creation(&db_instance_identifier);
1454                    return Err(db_snapshot_not_found(&db_snapshot_identifier));
1455                }
1456            };
1457
1458            let dbi_resource_id = state.next_dbi_resource_id();
1459            let db_instance_arn = state.db_instance_arn(&db_instance_identifier);
1460            let created_at = Utc::now();
1461
1462            (snapshot, dbi_resource_id, db_instance_arn, created_at)
1463        };
1464
1465        let db_name = snapshot
1466            .db_name
1467            .as_deref()
1468            .unwrap_or(default_db_name(&snapshot.engine));
1469        let running = match runtime
1470            .ensure_postgres(
1471                &db_instance_identifier,
1472                &snapshot.engine,
1473                &snapshot.engine_version,
1474                &snapshot.master_username,
1475                &snapshot.master_user_password,
1476                db_name,
1477                &request.account_id,
1478                &request.region,
1479            )
1480            .await
1481        {
1482            Ok(running) => running,
1483            Err(e) => {
1484                self.state
1485                    .write()
1486                    .get_or_create(&request.account_id)
1487                    .cancel_instance_creation(&db_instance_identifier);
1488                return Err(runtime_error_to_service_error(e));
1489            }
1490        };
1491
1492        if let Err(e) = runtime
1493            .restore_database(
1494                &db_instance_identifier,
1495                &snapshot.engine,
1496                &snapshot.master_username,
1497                &snapshot.master_user_password,
1498                db_name,
1499                &snapshot.dump_data,
1500            )
1501            .await
1502        {
1503            self.state
1504                .write()
1505                .get_or_create(&request.account_id)
1506                .cancel_instance_creation(&db_instance_identifier);
1507            runtime.stop_container(&db_instance_identifier).await;
1508            return Err(runtime_error_to_service_error(e));
1509        }
1510
1511        let instance = build_restored_instance(
1512            &db_instance_identifier,
1513            db_instance_arn,
1514            dbi_resource_id,
1515            created_at,
1516            vpc_security_group_ids,
1517            &snapshot,
1518            &running,
1519        );
1520
1521        self.state
1522            .write()
1523            .get_or_create(&request.account_id)
1524            .finish_instance_creation(instance.clone());
1525
1526        self.emit_event(
1527            RdsSourceType::DbInstance,
1528            &db_instance_identifier,
1529            &instance.db_instance_arn,
1530            "RDS-EVENT-0043",
1531            &["creation"],
1532            "DB instance restored from snapshot",
1533        );
1534
1535        Ok(AwsResponse::xml(
1536            StatusCode::OK,
1537            xml_wrap(
1538                "RestoreDBInstanceFromDBSnapshot",
1539                &format!(
1540                    "<DBInstance>{}</DBInstance>",
1541                    db_instance_xml(&instance, None)
1542                ),
1543                &request.request_id,
1544            ),
1545        ))
1546    }
1547
1548    async fn create_db_instance_read_replica(
1549        &self,
1550        request: &AwsRequest,
1551    ) -> Result<AwsResponse, AwsServiceError> {
1552        let db_instance_identifier = required_param(request, "DBInstanceIdentifier")?;
1553        let source_db_instance_identifier = required_param(request, "SourceDBInstanceIdentifier")?;
1554
1555        let runtime = self.runtime.as_ref().ok_or_else(|| {
1556            AwsServiceError::aws_error(
1557                StatusCode::SERVICE_UNAVAILABLE,
1558                "InvalidParameterValue",
1559                "Docker/Podman is required for RDS read replicas but is not available",
1560            )
1561        })?;
1562
1563        let (source_instance, db_name) = {
1564            let mut accounts = self.state.write();
1565            let state = accounts.get_or_create(&request.account_id);
1566
1567            if !state.begin_instance_creation(&db_instance_identifier) {
1568                return Err(AwsServiceError::aws_error(
1569                    StatusCode::CONFLICT,
1570                    "DBInstanceAlreadyExists",
1571                    format!("DBInstance {db_instance_identifier} already exists."),
1572                ));
1573            }
1574
1575            let source_instance = match state.instances.get(&source_db_instance_identifier).cloned()
1576            {
1577                Some(inst) => inst,
1578                None => {
1579                    state.cancel_instance_creation(&db_instance_identifier);
1580                    return Err(db_instance_not_found(&source_db_instance_identifier));
1581                }
1582            };
1583
1584            let default_db = default_db_name(&source_instance.engine);
1585            let db_name = source_instance
1586                .db_name
1587                .as_deref()
1588                .unwrap_or(default_db)
1589                .to_string();
1590
1591            (source_instance, db_name)
1592        };
1593
1594        let dump_data = match runtime
1595            .dump_database(
1596                &source_db_instance_identifier,
1597                &source_instance.engine,
1598                &source_instance.master_username,
1599                &source_instance.master_user_password,
1600                &db_name,
1601            )
1602            .await
1603        {
1604            Ok(data) => data,
1605            Err(e) => {
1606                self.state
1607                    .write()
1608                    .get_or_create(&request.account_id)
1609                    .cancel_instance_creation(&db_instance_identifier);
1610                return Err(runtime_error_to_service_error(e));
1611            }
1612        };
1613
1614        let (dbi_resource_id, db_instance_arn) = {
1615            let accounts = self.state.read();
1616            let empty = RdsState::new(&request.account_id, &request.region);
1617            let s = accounts.get(&request.account_id).unwrap_or(&empty);
1618            (
1619                s.next_dbi_resource_id(),
1620                s.db_instance_arn(&db_instance_identifier),
1621            )
1622        };
1623        let created_at = Utc::now();
1624
1625        let running = match runtime
1626            .ensure_postgres(
1627                &db_instance_identifier,
1628                &source_instance.engine,
1629                &source_instance.engine_version,
1630                &source_instance.master_username,
1631                &source_instance.master_user_password,
1632                &db_name,
1633                &request.account_id,
1634                &request.region,
1635            )
1636            .await
1637        {
1638            Ok(running) => running,
1639            Err(e) => {
1640                self.state
1641                    .write()
1642                    .get_or_create(&request.account_id)
1643                    .cancel_instance_creation(&db_instance_identifier);
1644                return Err(runtime_error_to_service_error(e));
1645            }
1646        };
1647
1648        if let Err(e) = runtime
1649            .restore_database(
1650                &db_instance_identifier,
1651                &source_instance.engine,
1652                &source_instance.master_username,
1653                &source_instance.master_user_password,
1654                &db_name,
1655                &dump_data,
1656            )
1657            .await
1658        {
1659            self.state
1660                .write()
1661                .get_or_create(&request.account_id)
1662                .cancel_instance_creation(&db_instance_identifier);
1663            runtime.stop_container(&db_instance_identifier).await;
1664            return Err(runtime_error_to_service_error(e));
1665        }
1666
1667        let replica = build_read_replica_instance(
1668            &db_instance_identifier,
1669            db_instance_arn,
1670            dbi_resource_id,
1671            created_at,
1672            &source_db_instance_identifier,
1673            &source_instance,
1674            &running,
1675        );
1676
1677        let source_missing = {
1678            let mut accounts = self.state.write();
1679            let state = accounts.get_or_create(&request.account_id);
1680            match state.instances.get_mut(&source_db_instance_identifier) {
1681                Some(source) => {
1682                    source
1683                        .read_replica_db_instance_identifiers
1684                        .push(db_instance_identifier.clone());
1685                    state.finish_instance_creation(replica.clone());
1686                    false
1687                }
1688                None => {
1689                    state.cancel_instance_creation(&db_instance_identifier);
1690                    true
1691                }
1692            }
1693        };
1694
1695        if source_missing {
1696            runtime.stop_container(&db_instance_identifier).await;
1697            return Err(db_instance_not_found(&source_db_instance_identifier));
1698        }
1699
1700        self.emit_event(
1701            RdsSourceType::DbInstance,
1702            &db_instance_identifier,
1703            &replica.db_instance_arn,
1704            "RDS-EVENT-0005",
1705            &["creation", "read replica"],
1706            "Read replica DB instance created",
1707        );
1708
1709        Ok(AwsResponse::xml(
1710            StatusCode::OK,
1711            xml_wrap(
1712                "CreateDBInstanceReadReplica",
1713                &format!(
1714                    "<DBInstance>{}</DBInstance>",
1715                    db_instance_xml(&replica, None)
1716                ),
1717                &request.request_id,
1718            ),
1719        ))
1720    }
1721
1722    fn create_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1723        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1724        let db_subnet_group_description = required_param(request, "DBSubnetGroupDescription")?;
1725        let subnet_ids = parse_subnet_ids(request)?;
1726
1727        if subnet_ids.is_empty() {
1728            return Err(AwsServiceError::aws_error(
1729                StatusCode::BAD_REQUEST,
1730                "InvalidParameterValue",
1731                "At least one subnet must be specified.",
1732            ));
1733        }
1734
1735        if subnet_ids.len() < 2 {
1736            return Err(AwsServiceError::aws_error(
1737                StatusCode::BAD_REQUEST,
1738                "DBSubnetGroupDoesNotCoverEnoughAZs",
1739                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1740            ));
1741        }
1742
1743        let mut accounts = self.state.write();
1744        let state = accounts.get_or_create(&request.account_id);
1745
1746        if state.subnet_groups.contains_key(&db_subnet_group_name) {
1747            return Err(AwsServiceError::aws_error(
1748                StatusCode::CONFLICT,
1749                "DBSubnetGroupAlreadyExists",
1750                format!("DBSubnetGroup {db_subnet_group_name} already exists."),
1751            ));
1752        }
1753
1754        let vpc_id = format!("vpc-{}", uuid::Uuid::new_v4().simple());
1755        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1756            .map(|i| format!("{}{}", &state.region, char::from(b'a' + (i % 6) as u8)))
1757            .collect();
1758
1759        // Validate that subnets span at least 2 unique Availability Zones
1760        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1761        if unique_azs.len() < 2 {
1762            return Err(AwsServiceError::aws_error(
1763                StatusCode::BAD_REQUEST,
1764                "DBSubnetGroupDoesNotCoverEnoughAZs",
1765                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1766            ));
1767        }
1768
1769        let db_subnet_group_arn = state.db_subnet_group_arn(&db_subnet_group_name);
1770        let tags = parse_tags(request)?;
1771
1772        let subnet_group = DbSubnetGroup {
1773            db_subnet_group_name: db_subnet_group_name.clone(),
1774            db_subnet_group_arn,
1775            db_subnet_group_description,
1776            vpc_id,
1777            subnet_ids,
1778            subnet_availability_zones,
1779            tags,
1780        };
1781
1782        state
1783            .subnet_groups
1784            .insert(db_subnet_group_name, subnet_group.clone());
1785
1786        Ok(AwsResponse::xml(
1787            StatusCode::OK,
1788            xml_wrap(
1789                "CreateDBSubnetGroup",
1790                &format!(
1791                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1792                    db_subnet_group_xml(&subnet_group)
1793                ),
1794                &request.request_id,
1795            ),
1796        ))
1797    }
1798
1799    fn describe_db_subnet_groups(
1800        &self,
1801        request: &AwsRequest,
1802    ) -> Result<AwsResponse, AwsServiceError> {
1803        let db_subnet_group_name = optional_param(request, "DBSubnetGroupName");
1804        let marker = optional_param(request, "Marker");
1805        let max_records = optional_param(request, "MaxRecords");
1806
1807        let accounts = self.state.read();
1808        let empty = RdsState::new(&request.account_id, &request.region);
1809        let state = accounts.get(&request.account_id).unwrap_or(&empty);
1810
1811        // If specific subnet group requested, return just that one (no pagination)
1812        if let Some(name) = db_subnet_group_name {
1813            let sg = state.subnet_groups.get(&name).ok_or_else(|| {
1814                AwsServiceError::aws_error(
1815                    StatusCode::NOT_FOUND,
1816                    "DBSubnetGroupNotFoundFault",
1817                    format!("DBSubnetGroup {} not found.", name),
1818                )
1819            })?;
1820
1821            return Ok(AwsResponse::xml(
1822                StatusCode::OK,
1823                xml_wrap(
1824                    "DescribeDBSubnetGroups",
1825                    &format!(
1826                        "<DBSubnetGroups><DBSubnetGroup>{}</DBSubnetGroup></DBSubnetGroups>",
1827                        db_subnet_group_xml(sg)
1828                    ),
1829                    &request.request_id,
1830                ),
1831            ));
1832        }
1833
1834        // Get all subnet groups sorted by name
1835        let mut subnet_groups: Vec<DbSubnetGroup> = state.subnet_groups.values().cloned().collect();
1836        subnet_groups.sort_by(|a, b| a.db_subnet_group_name.cmp(&b.db_subnet_group_name));
1837
1838        // Apply pagination
1839        let paginated = paginate(subnet_groups, marker, max_records, |sg| {
1840            &sg.db_subnet_group_name
1841        })?;
1842
1843        let marker_xml = paginated
1844            .next_marker
1845            .as_ref()
1846            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
1847            .unwrap_or_default();
1848
1849        let body = paginated
1850            .items
1851            .iter()
1852            .map(|sg| format!("<DBSubnetGroup>{}</DBSubnetGroup>", db_subnet_group_xml(sg)))
1853            .collect::<Vec<_>>()
1854            .join("");
1855
1856        Ok(AwsResponse::xml(
1857            StatusCode::OK,
1858            xml_wrap(
1859                "DescribeDBSubnetGroups",
1860                &format!("<DBSubnetGroups>{}</DBSubnetGroups>{}", body, marker_xml),
1861                &request.request_id,
1862            ),
1863        ))
1864    }
1865
1866    fn delete_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1867        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1868
1869        let mut accounts = self.state.write();
1870        let state = accounts.get_or_create(&request.account_id);
1871
1872        if state.subnet_groups.remove(&db_subnet_group_name).is_none() {
1873            return Err(AwsServiceError::aws_error(
1874                StatusCode::NOT_FOUND,
1875                "DBSubnetGroupNotFoundFault",
1876                format!("DBSubnetGroup {db_subnet_group_name} not found."),
1877            ));
1878        }
1879
1880        Ok(AwsResponse::xml(
1881            StatusCode::OK,
1882            xml_wrap("DeleteDBSubnetGroup", "", &request.request_id),
1883        ))
1884    }
1885
1886    fn modify_db_subnet_group(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1887        let db_subnet_group_name = required_param(request, "DBSubnetGroupName")?;
1888        let subnet_ids = parse_subnet_ids(request)?;
1889
1890        if subnet_ids.is_empty() {
1891            return Err(AwsServiceError::aws_error(
1892                StatusCode::BAD_REQUEST,
1893                "InvalidParameterValue",
1894                "At least one subnet must be specified.",
1895            ));
1896        }
1897
1898        if subnet_ids.len() < 2 {
1899            return Err(AwsServiceError::aws_error(
1900                StatusCode::BAD_REQUEST,
1901                "DBSubnetGroupDoesNotCoverEnoughAZs",
1902                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1903            ));
1904        }
1905
1906        let mut accounts = self.state.write();
1907        let state = accounts.get_or_create(&request.account_id);
1908
1909        let region = state.region.clone();
1910
1911        let subnet_group = state
1912            .subnet_groups
1913            .get_mut(&db_subnet_group_name)
1914            .ok_or_else(|| {
1915                AwsServiceError::aws_error(
1916                    StatusCode::NOT_FOUND,
1917                    "DBSubnetGroupNotFoundFault",
1918                    format!("DBSubnetGroup {db_subnet_group_name} not found."),
1919                )
1920            })?;
1921
1922        let subnet_availability_zones: Vec<String> = (0..subnet_ids.len())
1923            .map(|i| format!("{}{}", &region, char::from(b'a' + (i % 6) as u8)))
1924            .collect();
1925
1926        // Validate that subnets span at least 2 unique Availability Zones
1927        let unique_azs: std::collections::HashSet<_> = subnet_availability_zones.iter().collect();
1928        if unique_azs.len() < 2 {
1929            return Err(AwsServiceError::aws_error(
1930                StatusCode::BAD_REQUEST,
1931                "DBSubnetGroupDoesNotCoverEnoughAZs",
1932                "DB Subnet Group must contain at least 2 subnets in different Availability Zones.",
1933            ));
1934        }
1935
1936        subnet_group.subnet_ids = subnet_ids;
1937        subnet_group.subnet_availability_zones = subnet_availability_zones;
1938
1939        let subnet_group_clone = subnet_group.clone();
1940
1941        Ok(AwsResponse::xml(
1942            StatusCode::OK,
1943            xml_wrap(
1944                "ModifyDBSubnetGroup",
1945                &format!(
1946                    "<DBSubnetGroup>{}</DBSubnetGroup>",
1947                    db_subnet_group_xml(&subnet_group_clone)
1948                ),
1949                &request.request_id,
1950            ),
1951        ))
1952    }
1953
1954    fn create_db_parameter_group(
1955        &self,
1956        request: &AwsRequest,
1957    ) -> Result<AwsResponse, AwsServiceError> {
1958        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
1959        let db_parameter_group_family = required_param(request, "DBParameterGroupFamily")?;
1960        let description = required_param(request, "Description")?;
1961
1962        // Validate parameter group family against supported engines and versions
1963        let valid_families = [
1964            "postgres16",
1965            "postgres15",
1966            "postgres14",
1967            "postgres13",
1968            "mysql8.0",
1969            "mysql5.7",
1970            "mariadb10.11",
1971            "mariadb10.6",
1972        ];
1973
1974        if !valid_families.contains(&db_parameter_group_family.as_str()) {
1975            return Err(AwsServiceError::aws_error(
1976                StatusCode::BAD_REQUEST,
1977                "InvalidParameterValue",
1978                format!("DBParameterGroupFamily '{db_parameter_group_family}' is not supported."),
1979            ));
1980        }
1981
1982        let mut accounts = self.state.write();
1983        let state = accounts.get_or_create(&request.account_id);
1984
1985        if state
1986            .parameter_groups
1987            .contains_key(&db_parameter_group_name)
1988        {
1989            return Err(AwsServiceError::aws_error(
1990                StatusCode::CONFLICT,
1991                "DBParameterGroupAlreadyExists",
1992                format!("DBParameterGroup {db_parameter_group_name} already exists."),
1993            ));
1994        }
1995
1996        let db_parameter_group_arn = state.db_parameter_group_arn(&db_parameter_group_name);
1997        let tags = parse_tags(request)?;
1998
1999        let parameter_group = DbParameterGroup {
2000            db_parameter_group_name: db_parameter_group_name.clone(),
2001            db_parameter_group_arn,
2002            db_parameter_group_family,
2003            description,
2004            parameters: std::collections::HashMap::new(),
2005            tags,
2006        };
2007
2008        state
2009            .parameter_groups
2010            .insert(db_parameter_group_name, parameter_group.clone());
2011
2012        Ok(AwsResponse::xml(
2013            StatusCode::OK,
2014            xml_wrap(
2015                "CreateDBParameterGroup",
2016                &format!(
2017                    "<DBParameterGroup>{}</DBParameterGroup>",
2018                    db_parameter_group_xml(&parameter_group)
2019                ),
2020                &request.request_id,
2021            ),
2022        ))
2023    }
2024
2025    fn describe_db_parameter_groups(
2026        &self,
2027        request: &AwsRequest,
2028    ) -> Result<AwsResponse, AwsServiceError> {
2029        let db_parameter_group_name = optional_param(request, "DBParameterGroupName");
2030        let marker = optional_param(request, "Marker");
2031        let max_records = optional_param(request, "MaxRecords");
2032
2033        let accounts = self.state.read();
2034        let empty = RdsState::new(&request.account_id, &request.region);
2035        let state = accounts.get(&request.account_id).unwrap_or(&empty);
2036
2037        // If specific parameter group requested, return just that one (no pagination)
2038        if let Some(name) = db_parameter_group_name {
2039            let pg = state.parameter_groups.get(&name).ok_or_else(|| {
2040                AwsServiceError::aws_error(
2041                    StatusCode::NOT_FOUND,
2042                    "DBParameterGroupNotFound",
2043                    format!("DBParameterGroup {} not found.", name),
2044                )
2045            })?;
2046
2047            return Ok(AwsResponse::xml(
2048                StatusCode::OK,
2049                xml_wrap(
2050                    "DescribeDBParameterGroups",
2051                    &format!(
2052                        "<DBParameterGroups><DBParameterGroup>{}</DBParameterGroup></DBParameterGroups>",
2053                        db_parameter_group_xml(pg)
2054                    ),
2055                    &request.request_id,
2056                ),
2057            ));
2058        }
2059
2060        // Get all parameter groups sorted by name
2061        let mut parameter_groups: Vec<DbParameterGroup> =
2062            state.parameter_groups.values().cloned().collect();
2063        parameter_groups.sort_by(|a, b| a.db_parameter_group_name.cmp(&b.db_parameter_group_name));
2064
2065        // Apply pagination
2066        let paginated = paginate(parameter_groups, marker, max_records, |pg| {
2067            &pg.db_parameter_group_name
2068        })?;
2069
2070        let marker_xml = paginated
2071            .next_marker
2072            .as_ref()
2073            .map(|m| format!("<Marker>{}</Marker>", xml_escape(m)))
2074            .unwrap_or_default();
2075
2076        let body = paginated
2077            .items
2078            .iter()
2079            .map(|pg| {
2080                format!(
2081                    "<DBParameterGroup>{}</DBParameterGroup>",
2082                    db_parameter_group_xml(pg)
2083                )
2084            })
2085            .collect::<Vec<_>>()
2086            .join("");
2087
2088        Ok(AwsResponse::xml(
2089            StatusCode::OK,
2090            xml_wrap(
2091                "DescribeDBParameterGroups",
2092                &format!(
2093                    "<DBParameterGroups>{}</DBParameterGroups>{}",
2094                    body, marker_xml
2095                ),
2096                &request.request_id,
2097            ),
2098        ))
2099    }
2100
2101    fn delete_db_parameter_group(
2102        &self,
2103        request: &AwsRequest,
2104    ) -> Result<AwsResponse, AwsServiceError> {
2105        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2106
2107        let mut accounts = self.state.write();
2108        let state = accounts.get_or_create(&request.account_id);
2109
2110        if db_parameter_group_name.starts_with("default.") {
2111            return Err(AwsServiceError::aws_error(
2112                StatusCode::BAD_REQUEST,
2113                "InvalidParameterValue",
2114                "Cannot delete default parameter groups.",
2115            ));
2116        }
2117
2118        if state
2119            .parameter_groups
2120            .remove(&db_parameter_group_name)
2121            .is_none()
2122        {
2123            return Err(AwsServiceError::aws_error(
2124                StatusCode::NOT_FOUND,
2125                "DBParameterGroupNotFound",
2126                format!("DBParameterGroup {db_parameter_group_name} not found."),
2127            ));
2128        }
2129
2130        Ok(AwsResponse::xml(
2131            StatusCode::OK,
2132            xml_wrap("DeleteDBParameterGroup", "", &request.request_id),
2133        ))
2134    }
2135
2136    fn modify_db_parameter_group(
2137        &self,
2138        request: &AwsRequest,
2139    ) -> Result<AwsResponse, AwsServiceError> {
2140        let db_parameter_group_name = required_param(request, "DBParameterGroupName")?;
2141
2142        let mut accounts = self.state.write();
2143        let state = accounts.get_or_create(&request.account_id);
2144
2145        let parameter_group = state
2146            .parameter_groups
2147            .get_mut(&db_parameter_group_name)
2148            .ok_or_else(|| {
2149                AwsServiceError::aws_error(
2150                    StatusCode::NOT_FOUND,
2151                    "DBParameterGroupNotFound",
2152                    format!("DBParameterGroup {db_parameter_group_name} not found."),
2153                )
2154            })?;
2155
2156        if let Some(new_description) = optional_param(request, "Description") {
2157            parameter_group.description = new_description;
2158        }
2159
2160        let parameter_group_clone = parameter_group.clone();
2161
2162        Ok(AwsResponse::xml(
2163            StatusCode::OK,
2164            xml_wrap(
2165                "ModifyDBParameterGroup",
2166                &format!(
2167                    "<DBParameterGroupName>{}</DBParameterGroupName>",
2168                    xml_escape(&parameter_group_clone.db_parameter_group_name)
2169                ),
2170                &request.request_id,
2171            ),
2172        ))
2173    }
2174}
2175
2176fn optional_param(req: &AwsRequest, name: &str) -> Option<String> {
2177    fakecloud_core::query::optional_query_param(req, name)
2178}
2179
2180fn required_param(req: &AwsRequest, name: &str) -> Result<String, AwsServiceError> {
2181    fakecloud_core::query::required_query_param(req, name)
2182}
2183
2184fn required_i32_param(req: &AwsRequest, name: &str) -> Result<i32, AwsServiceError> {
2185    let value = required_param(req, name)?;
2186    value.parse::<i32>().map_err(|_| {
2187        AwsServiceError::aws_error(
2188            StatusCode::BAD_REQUEST,
2189            "InvalidParameterValue",
2190            format!("Parameter {name} must be a valid integer."),
2191        )
2192    })
2193}
2194
2195fn optional_i32_param(req: &AwsRequest, name: &str) -> Result<Option<i32>, AwsServiceError> {
2196    optional_param(req, name)
2197        .map(|value| {
2198            value.parse::<i32>().map_err(|_| {
2199                AwsServiceError::aws_error(
2200                    StatusCode::BAD_REQUEST,
2201                    "InvalidParameterValue",
2202                    format!("Parameter {name} must be a valid integer."),
2203                )
2204            })
2205        })
2206        .transpose()
2207}
2208
2209fn parse_tags(req: &AwsRequest) -> Result<Vec<RdsTag>, AwsServiceError> {
2210    let mut tags = Vec::new();
2211    for index in 1.. {
2212        let key_name = format!("Tags.Tag.{index}.Key");
2213        let value_name = format!("Tags.Tag.{index}.Value");
2214        let key = optional_param(req, &key_name);
2215        let value = optional_param(req, &value_name);
2216
2217        match (key, value) {
2218            (Some(key), Some(value)) => tags.push(RdsTag { key, value }),
2219            (None, None) => break,
2220            _ => {
2221                return Err(AwsServiceError::aws_error(
2222                    StatusCode::BAD_REQUEST,
2223                    "InvalidParameterValue",
2224                    "Each tag must include both Key and Value.",
2225                ));
2226            }
2227        }
2228    }
2229
2230    Ok(tags)
2231}
2232
2233fn parse_tag_keys(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2234    let mut keys = Vec::new();
2235    for index in 1.. {
2236        let key_name = format!("TagKeys.member.{index}");
2237        match optional_param(req, &key_name) {
2238            Some(key) => keys.push(key),
2239            None => break,
2240        }
2241    }
2242
2243    Ok(keys)
2244}
2245
2246fn parse_subnet_ids(req: &AwsRequest) -> Result<Vec<String>, AwsServiceError> {
2247    let mut subnet_ids = Vec::new();
2248    for index in 1.. {
2249        let subnet_id_name = format!("SubnetIds.SubnetIdentifier.{index}");
2250        match optional_param(req, &subnet_id_name) {
2251            Some(subnet_id) => subnet_ids.push(subnet_id),
2252            None => break,
2253        }
2254    }
2255
2256    Ok(subnet_ids)
2257}
2258
2259fn parse_vpc_security_group_ids(req: &AwsRequest) -> Vec<String> {
2260    let mut security_group_ids = Vec::new();
2261    for index in 1.. {
2262        let sg_id_name = format!("VpcSecurityGroupIds.VpcSecurityGroupId.{index}");
2263        match optional_param(req, &sg_id_name) {
2264            Some(sg_id) => security_group_ids.push(sg_id),
2265            None => break,
2266        }
2267    }
2268
2269    // If no security groups provided, return a default one
2270    if security_group_ids.is_empty() {
2271        security_group_ids.push("sg-default".to_string());
2272    }
2273
2274    security_group_ids
2275}
2276
2277fn query_param_prefix_exists(req: &AwsRequest, prefix: &str) -> bool {
2278    req.query_params.keys().any(|key| key.starts_with(prefix))
2279}
2280
2281fn parse_optional_bool(value: Option<&str>) -> Result<Option<bool>, AwsServiceError> {
2282    value
2283        .map(|raw| match raw {
2284            "true" | "True" | "TRUE" => Ok(true),
2285            "false" | "False" | "FALSE" => Ok(false),
2286            _ => Err(AwsServiceError::aws_error(
2287                StatusCode::BAD_REQUEST,
2288                "InvalidParameterValue",
2289                format!("Boolean parameter value '{raw}' is invalid."),
2290            )),
2291        })
2292        .transpose()
2293}
2294
/// One page of results as produced by `paginate`.
struct PaginationResult<T> {
    /// The items that fall on this page, in the order they were supplied.
    items: Vec<T>,
    /// Base64-encoded identifier of the last item on this page, present only
    /// when further items remain after it.
    next_marker: Option<String>,
}
2299
2300fn paginate<T, F>(
2301    mut items: Vec<T>,
2302    marker: Option<String>,
2303    max_records: Option<String>,
2304    get_id: F,
2305) -> Result<PaginationResult<T>, AwsServiceError>
2306where
2307    F: Fn(&T) -> &str,
2308{
2309    // Parse max_records with default 100, max 100
2310    let max = if let Some(max_str) = max_records {
2311        let parsed = max_str.parse::<i32>().map_err(|_| {
2312            AwsServiceError::aws_error(
2313                StatusCode::BAD_REQUEST,
2314                "InvalidParameterValue",
2315                "MaxRecords must be a valid integer.",
2316            )
2317        })?;
2318        if !(1..=100).contains(&parsed) {
2319            return Err(AwsServiceError::aws_error(
2320                StatusCode::BAD_REQUEST,
2321                "InvalidParameterValue",
2322                "MaxRecords must be between 1 and 100.",
2323            ));
2324        }
2325        parsed as usize
2326    } else {
2327        100
2328    };
2329
2330    // Decode marker to get starting identifier
2331    let start_id = if let Some(encoded_marker) = marker {
2332        let decoded = BASE64.decode(encoded_marker.as_bytes()).map_err(|_| {
2333            AwsServiceError::aws_error(
2334                StatusCode::BAD_REQUEST,
2335                "InvalidParameterValue",
2336                "Marker is invalid.",
2337            )
2338        })?;
2339        let id = String::from_utf8(decoded).map_err(|_| {
2340            AwsServiceError::aws_error(
2341                StatusCode::BAD_REQUEST,
2342                "InvalidParameterValue",
2343                "Marker is invalid.",
2344            )
2345        })?;
2346        Some(id)
2347    } else {
2348        None
2349    };
2350
2351    // Find starting position
2352    let start_index = if let Some(ref start_id) = start_id {
2353        items
2354            .iter()
2355            .position(|item| get_id(item) == start_id)
2356            .map(|pos| pos + 1) // Start after the marker
2357            .unwrap_or(items.len()) // If not found, return empty result
2358    } else {
2359        0
2360    };
2361
2362    // Take items from start_index
2363    let total_items = items.len();
2364    let end_index = std::cmp::min(start_index + max, total_items);
2365    let paginated_items: Vec<T> = items.drain(start_index..end_index).collect();
2366
2367    // Create next marker if there are more items
2368    let next_marker = if end_index < total_items {
2369        paginated_items
2370            .last()
2371            .map(|item| BASE64.encode(get_id(item).as_bytes()))
2372    } else {
2373        None
2374    };
2375
2376    Ok(PaginationResult {
2377        items: paginated_items,
2378        next_marker,
2379    })
2380}
2381
2382fn validate_create_request(
2383    db_instance_identifier: &str,
2384    allocated_storage: i32,
2385    db_instance_class: &str,
2386    engine: &str,
2387    engine_version: &str,
2388    port: i32,
2389) -> Result<(), AwsServiceError> {
2390    if allocated_storage <= 0 {
2391        return Err(AwsServiceError::aws_error(
2392            StatusCode::BAD_REQUEST,
2393            "InvalidParameterValue",
2394            "AllocatedStorage must be greater than zero.",
2395        ));
2396    }
2397    if port <= 0 {
2398        return Err(AwsServiceError::aws_error(
2399            StatusCode::BAD_REQUEST,
2400            "InvalidParameterValue",
2401            "Port must be greater than zero.",
2402        ));
2403    }
2404    if !db_instance_identifier
2405        .chars()
2406        .all(|ch| ch.is_ascii_alphanumeric() || ch == '-')
2407    {
2408        return Err(AwsServiceError::aws_error(
2409            StatusCode::BAD_REQUEST,
2410            "InvalidParameterValue",
2411            "DBInstanceIdentifier must contain only alphanumeric characters or hyphens.",
2412        ));
2413    }
2414    // Validate engine
2415    let supported_engines = [
2416        "postgres",
2417        "mysql",
2418        "mariadb",
2419        "oracle-ee",
2420        "oracle-se2",
2421        "oracle-ee-cdb",
2422        "oracle-se2-cdb",
2423        "sqlserver-ee",
2424        "sqlserver-se",
2425        "sqlserver-ex",
2426        "sqlserver-web",
2427        "db2-se",
2428        "db2-ae",
2429    ];
2430    if !supported_engines.contains(&engine) {
2431        return Err(AwsServiceError::aws_error(
2432            StatusCode::BAD_REQUEST,
2433            "InvalidParameterValue",
2434            format!("Engine '{}' is not supported.", engine),
2435        ));
2436    }
2437
2438    // Validate engine version. The Oracle/SQL Server/Db2 lists track
2439    // the major-minor versions actually shipped by the upstream
2440    // dev-edition images (gvenzl/oracle-free 23, mssql-server 2022,
2441    // db2_community 11.5). Adding a new version here also requires
2442    // wiring the image tag in `RdsRuntime::ensure_postgres`.
2443    let supported_versions = match engine {
2444        "postgres" => vec!["16.3", "15.5", "14.10", "13.13"],
2445        "mysql" => vec!["8.0.35", "8.0.28", "5.7.44"],
2446        "mariadb" => vec!["10.11.6", "10.6.16"],
2447        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
2448            vec!["23.0.0", "21.0.0", "19.0.0"]
2449        }
2450        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
2451            vec!["16.00.4085.2.v1", "15.00.4322.2.v1"]
2452        }
2453        "db2-se" | "db2-ae" => vec!["11.5.9.0.sb00000000.r1", "11.5.8.0.sb00000000.r1"],
2454        _ => vec![],
2455    };
2456
2457    if !supported_versions.contains(&engine_version) {
2458        return Err(AwsServiceError::aws_error(
2459            StatusCode::BAD_REQUEST,
2460            "InvalidParameterValue",
2461            format!("EngineVersion '{engine_version}' is not supported yet."),
2462        ));
2463    }
2464    validate_db_instance_class(db_instance_class)?;
2465    Ok(())
2466}
2467
2468fn validate_db_instance_class(db_instance_class: &str) -> Result<(), AwsServiceError> {
2469    if !crate::state::SUPPORTED_INSTANCE_CLASSES.contains(&db_instance_class) {
2470        return Err(AwsServiceError::aws_error(
2471            StatusCode::BAD_REQUEST,
2472            "InvalidParameterValue",
2473            format!("DBInstanceClass '{}' is not supported.", db_instance_class),
2474        ));
2475    }
2476    Ok(())
2477}
2478
2479fn filter_engine_versions(
2480    versions: &[EngineVersionInfo],
2481    engine: &Option<String>,
2482    engine_version: &Option<String>,
2483    family: &Option<String>,
2484) -> Vec<EngineVersionInfo> {
2485    versions
2486        .iter()
2487        .filter(|candidate| {
2488            engine
2489                .as_ref()
2490                .is_none_or(|expected| candidate.engine == *expected)
2491        })
2492        .filter(|candidate| {
2493            engine_version
2494                .as_ref()
2495                .is_none_or(|expected| candidate.engine_version == *expected)
2496        })
2497        .filter(|candidate| {
2498            family
2499                .as_ref()
2500                .is_none_or(|expected| candidate.db_parameter_group_family == *expected)
2501        })
2502        .cloned()
2503        .collect()
2504}
2505
2506fn filter_orderable_options(
2507    options: &[OrderableDbInstanceOption],
2508    engine: &Option<String>,
2509    engine_version: &Option<String>,
2510    db_instance_class: &Option<String>,
2511    license_model: &Option<String>,
2512    vpc: Option<bool>,
2513) -> Vec<OrderableDbInstanceOption> {
2514    options
2515        .iter()
2516        .filter(|candidate| {
2517            engine
2518                .as_ref()
2519                .is_none_or(|expected| candidate.engine == *expected)
2520        })
2521        .filter(|candidate| {
2522            engine_version
2523                .as_ref()
2524                .is_none_or(|expected| candidate.engine_version == *expected)
2525        })
2526        .filter(|candidate| {
2527            db_instance_class
2528                .as_ref()
2529                .is_none_or(|expected| candidate.db_instance_class == *expected)
2530        })
2531        .filter(|candidate| {
2532            license_model
2533                .as_ref()
2534                .is_none_or(|expected| candidate.license_model == *expected)
2535        })
2536        .filter(|_| vpc.unwrap_or(true))
2537        .cloned()
2538        .collect()
2539}
2540
2541/// Build a `DbInstance` for a newly-created read replica, copying the
2542/// source instance's physical attributes and binding the replica's
2543/// identifier, ARN, resource id, container id and host port.
2544#[allow(clippy::too_many_arguments)]
2545/// Build a `DbInstance` from a restored snapshot. Copies the physical
2546/// attributes off the snapshot and binds the new instance's identifier,
2547/// ARN, resource id, container id and host port.
2548fn build_restored_instance(
2549    db_instance_identifier: &str,
2550    db_instance_arn: String,
2551    dbi_resource_id: String,
2552    created_at: chrono::DateTime<Utc>,
2553    vpc_security_group_ids: Vec<String>,
2554    snapshot: &DbSnapshot,
2555    running: &crate::runtime::RunningDbContainer,
2556) -> DbInstance {
2557    DbInstance {
2558        db_instance_identifier: db_instance_identifier.to_string(),
2559        db_instance_arn,
2560        db_instance_class: "db.t3.micro".to_string(),
2561        engine: snapshot.engine.clone(),
2562        engine_version: snapshot.engine_version.clone(),
2563        db_instance_status: "available".to_string(),
2564        master_username: snapshot.master_username.clone(),
2565        db_name: snapshot.db_name.clone(),
2566        endpoint_address: "127.0.0.1".to_string(),
2567        port: i32::from(running.host_port),
2568        allocated_storage: snapshot.allocated_storage,
2569        publicly_accessible: true,
2570        deletion_protection: false,
2571        created_at,
2572        dbi_resource_id,
2573        master_user_password: snapshot.master_user_password.clone(),
2574        container_id: running.container_id.clone(),
2575        host_port: running.host_port,
2576        tags: Vec::new(),
2577        read_replica_source_db_instance_identifier: None,
2578        read_replica_db_instance_identifiers: Vec::new(),
2579        vpc_security_group_ids,
2580        db_parameter_group_name: None,
2581        backup_retention_period: 1,
2582        preferred_backup_window: "03:00-04:00".to_string(),
2583        latest_restorable_time: Some(created_at),
2584        option_group_name: None,
2585        multi_az: false,
2586        pending_modified_values: None,
2587    }
2588}
2589
2590fn build_read_replica_instance(
2591    db_instance_identifier: &str,
2592    db_instance_arn: String,
2593    dbi_resource_id: String,
2594    created_at: chrono::DateTime<Utc>,
2595    source_db_instance_identifier: &str,
2596    source: &DbInstance,
2597    running: &crate::runtime::RunningDbContainer,
2598) -> DbInstance {
2599    DbInstance {
2600        db_instance_identifier: db_instance_identifier.to_string(),
2601        db_instance_arn,
2602        db_instance_class: source.db_instance_class.clone(),
2603        engine: source.engine.clone(),
2604        engine_version: source.engine_version.clone(),
2605        db_instance_status: "available".to_string(),
2606        master_username: source.master_username.clone(),
2607        db_name: source.db_name.clone(),
2608        endpoint_address: "127.0.0.1".to_string(),
2609        port: i32::from(running.host_port),
2610        allocated_storage: source.allocated_storage,
2611        publicly_accessible: source.publicly_accessible,
2612        deletion_protection: false,
2613        created_at,
2614        dbi_resource_id,
2615        master_user_password: source.master_user_password.clone(),
2616        container_id: running.container_id.clone(),
2617        host_port: running.host_port,
2618        tags: Vec::new(),
2619        read_replica_source_db_instance_identifier: Some(source_db_instance_identifier.to_string()),
2620        read_replica_db_instance_identifiers: Vec::new(),
2621        vpc_security_group_ids: source.vpc_security_group_ids.clone(),
2622        db_parameter_group_name: source.db_parameter_group_name.clone(),
2623        backup_retention_period: source.backup_retention_period,
2624        preferred_backup_window: source.preferred_backup_window.clone(),
2625        latest_restorable_time: if source.backup_retention_period > 0 {
2626            Some(created_at)
2627        } else {
2628            None
2629        },
2630        option_group_name: source.option_group_name.clone(),
2631        multi_az: source.multi_az,
2632        pending_modified_values: None,
2633    }
2634}
2635
2636fn xml_wrap(action: &str, inner: &str, request_id: &str) -> String {
2637    fakecloud_core::query::query_response_xml(action, RDS_NS, inner, request_id)
2638}
2639
2640fn engine_version_xml(version: &EngineVersionInfo) -> String {
2641    format!(
2642        "<DBEngineVersion>\
2643         <Engine>{}</Engine>\
2644         <EngineVersion>{}</EngineVersion>\
2645         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2646         <DBEngineDescription>{}</DBEngineDescription>\
2647         <DBEngineVersionDescription>{}</DBEngineVersionDescription>\
2648         <Status>{}</Status>\
2649         </DBEngineVersion>",
2650        xml_escape(&version.engine),
2651        xml_escape(&version.engine_version),
2652        xml_escape(&version.db_parameter_group_family),
2653        xml_escape(&version.db_engine_description),
2654        xml_escape(&version.db_engine_version_description),
2655        xml_escape(&version.status),
2656    )
2657}
2658
2659fn orderable_option_xml(option: &OrderableDbInstanceOption) -> String {
2660    format!(
2661        "<OrderableDBInstanceOption>\
2662         <Engine>{}</Engine>\
2663         <EngineVersion>{}</EngineVersion>\
2664         <DBInstanceClass>{}</DBInstanceClass>\
2665         <LicenseModel>{}</LicenseModel>\
2666         <AvailabilityZones><AvailabilityZone><Name>us-east-1a</Name></AvailabilityZone></AvailabilityZones>\
2667         <MultiAZCapable>true</MultiAZCapable>\
2668         <ReadReplicaCapable>true</ReadReplicaCapable>\
2669         <Vpc>true</Vpc>\
2670         <SupportsStorageEncryption>true</SupportsStorageEncryption>\
2671         <StorageType>{}</StorageType>\
2672         <SupportsIops>false</SupportsIops>\
2673         <MinStorageSize>{}</MinStorageSize>\
2674         <MaxStorageSize>{}</MaxStorageSize>\
2675         <SupportsIAMDatabaseAuthentication>true</SupportsIAMDatabaseAuthentication>\
2676         </OrderableDBInstanceOption>",
2677        xml_escape(&option.engine),
2678        xml_escape(&option.engine_version),
2679        xml_escape(&option.db_instance_class),
2680        xml_escape(&option.license_model),
2681        xml_escape(&option.storage_type),
2682        option.min_storage_size,
2683        option.max_storage_size,
2684    )
2685}
2686
2687fn tag_xml(tag: &RdsTag) -> String {
2688    format!(
2689        "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
2690        xml_escape(&tag.key),
2691        xml_escape(&tag.value),
2692    )
2693}
2694
2695fn db_instance_xml(instance: &DbInstance, status_override: Option<&str>) -> String {
2696    let status = status_override.unwrap_or(&instance.db_instance_status);
2697    let db_name_xml = instance
2698        .db_name
2699        .as_ref()
2700        .map(|db_name| format!("<DBName>{}</DBName>", xml_escape(db_name)))
2701        .unwrap_or_default();
2702
2703    let read_replica_source_xml = instance
2704        .read_replica_source_db_instance_identifier
2705        .as_ref()
2706        .map(|source| {
2707            format!(
2708                "<ReadReplicaSourceDBInstanceIdentifier>{}</ReadReplicaSourceDBInstanceIdentifier>",
2709                xml_escape(source)
2710            )
2711        })
2712        .unwrap_or_default();
2713
2714    let read_replica_identifiers_xml = if instance.read_replica_db_instance_identifiers.is_empty() {
2715        "<ReadReplicaDBInstanceIdentifiers/>".to_string()
2716    } else {
2717        format!(
2718            "<ReadReplicaDBInstanceIdentifiers>{}</ReadReplicaDBInstanceIdentifiers>",
2719            instance
2720                .read_replica_db_instance_identifiers
2721                .iter()
2722                .map(|id| format!(
2723                    "<ReadReplicaDBInstanceIdentifier>{}</ReadReplicaDBInstanceIdentifier>",
2724                    xml_escape(id)
2725                ))
2726                .collect::<String>()
2727        )
2728    };
2729
2730    let vpc_security_groups_xml = if instance.vpc_security_group_ids.is_empty() {
2731        "<VpcSecurityGroups/>".to_string()
2732    } else {
2733        format!(
2734            "<VpcSecurityGroups>{}</VpcSecurityGroups>",
2735            instance
2736                .vpc_security_group_ids
2737                .iter()
2738                .map(|sg_id| format!(
2739                    "<VpcSecurityGroupMembership>\
2740                     <VpcSecurityGroupId>{}</VpcSecurityGroupId>\
2741                     <Status>active</Status>\
2742                     </VpcSecurityGroupMembership>",
2743                    xml_escape(sg_id)
2744                ))
2745                .collect::<String>()
2746        )
2747    };
2748
2749    let db_parameter_groups_xml = match &instance.db_parameter_group_name {
2750        Some(pg_name) => format!(
2751            "<DBParameterGroups>\
2752             <DBParameterGroup>\
2753             <DBParameterGroupName>{}</DBParameterGroupName>\
2754             <ParameterApplyStatus>in-sync</ParameterApplyStatus>\
2755             </DBParameterGroup>\
2756             </DBParameterGroups>",
2757            xml_escape(pg_name)
2758        ),
2759        None => "<DBParameterGroups/>".to_string(),
2760    };
2761
2762    let option_group_memberships_xml = match &instance.option_group_name {
2763        Some(og_name) => format!(
2764            "<OptionGroupMemberships>\
2765             <OptionGroupMembership>\
2766             <OptionGroupName>{}</OptionGroupName>\
2767             <Status>in-sync</Status>\
2768             </OptionGroupMembership>\
2769             </OptionGroupMemberships>",
2770            xml_escape(og_name)
2771        ),
2772        None => "<OptionGroupMemberships/>".to_string(),
2773    };
2774
2775    let pending_modified_values_xml = if let Some(ref pending) = instance.pending_modified_values {
2776        let mut fields = Vec::new();
2777        if let Some(ref class) = pending.db_instance_class {
2778            fields.push(format!(
2779                "<DBInstanceClass>{}</DBInstanceClass>",
2780                xml_escape(class)
2781            ));
2782        }
2783        if let Some(allocated_storage) = pending.allocated_storage {
2784            fields.push(format!(
2785                "<AllocatedStorage>{}</AllocatedStorage>",
2786                allocated_storage
2787            ));
2788        }
2789        if let Some(backup_retention_period) = pending.backup_retention_period {
2790            fields.push(format!(
2791                "<BackupRetentionPeriod>{}</BackupRetentionPeriod>",
2792                backup_retention_period
2793            ));
2794        }
2795        if let Some(multi_az) = pending.multi_az {
2796            fields.push(format!(
2797                "<MultiAZ>{}</MultiAZ>",
2798                if multi_az { "true" } else { "false" }
2799            ));
2800        }
2801        if let Some(ref engine_version) = pending.engine_version {
2802            fields.push(format!(
2803                "<EngineVersion>{}</EngineVersion>",
2804                xml_escape(engine_version)
2805            ));
2806        }
2807        if pending.master_user_password.is_some() {
2808            fields.push("<MasterUserPassword>****</MasterUserPassword>".to_string());
2809        }
2810        if !fields.is_empty() {
2811            format!(
2812                "<PendingModifiedValues>{}</PendingModifiedValues>",
2813                fields.join("")
2814            )
2815        } else {
2816            String::new()
2817        }
2818    } else {
2819        String::new()
2820    };
2821
2822    let latest_restorable_time_xml = instance
2823        .latest_restorable_time
2824        .map(|t| {
2825            format!(
2826                "<LatestRestorableTime>{}</LatestRestorableTime>",
2827                t.to_rfc3339()
2828            )
2829        })
2830        .unwrap_or_default();
2831
2832    format!(
2833        "<DBInstanceIdentifier>{identifier}</DBInstanceIdentifier>\
2834         <DBInstanceClass>{class}</DBInstanceClass>\
2835         <Engine>{engine}</Engine>\
2836         <DBInstanceStatus>{status}</DBInstanceStatus>\
2837         <MasterUsername>{master_username}</MasterUsername>\
2838         {db_name_xml}\
2839         <Endpoint><Address>{endpoint_address}</Address><Port>{port}</Port></Endpoint>\
2840         <AllocatedStorage>{allocated_storage}</AllocatedStorage>\
2841         <InstanceCreateTime>{create_time}</InstanceCreateTime>\
2842         <PreferredBackupWindow>{preferred_backup_window}</PreferredBackupWindow>\
2843         <BackupRetentionPeriod>{backup_retention_period}</BackupRetentionPeriod>\
2844         <DBSecurityGroups/>\
2845         {vpc_security_groups_xml}\
2846         {db_parameter_groups_xml}\
2847         <AvailabilityZone>us-east-1a</AvailabilityZone>\
2848         {latest_restorable_time_xml}\
2849         <PreferredMaintenanceWindow>sun:00:00-sun:00:30</PreferredMaintenanceWindow>\
2850         <MultiAZ>{multi_az}</MultiAZ>\
2851         <EngineVersion>{engine_version}</EngineVersion>\
2852         <AutoMinorVersionUpgrade>true</AutoMinorVersionUpgrade>\
2853         {read_replica_identifiers_xml}\
2854         {read_replica_source_xml}\
2855         <LicenseModel>{license_model}</LicenseModel>\
2856         {option_group_memberships_xml}\
2857         <PubliclyAccessible>{publicly_accessible}</PubliclyAccessible>\
2858         <StorageType>gp2</StorageType>\
2859         <DbInstancePort>{port}</DbInstancePort>\
2860         <StorageEncrypted>false</StorageEncrypted>\
2861         <DbiResourceId>{dbi_resource_id}</DbiResourceId>\
2862         <DeletionProtection>{deletion_protection}</DeletionProtection>\
2863         {pending_modified_values_xml}\
2864         <DBInstanceArn>{arn}</DBInstanceArn>",
2865        identifier = xml_escape(&instance.db_instance_identifier),
2866        class = xml_escape(&instance.db_instance_class),
2867        engine = xml_escape(&instance.engine),
2868        status = xml_escape(status),
2869        master_username = xml_escape(&instance.master_username),
2870        endpoint_address = xml_escape(&instance.endpoint_address),
2871        port = instance.port,
2872        allocated_storage = instance.allocated_storage,
2873        create_time = instance.created_at.to_rfc3339(),
2874        preferred_backup_window = xml_escape(&instance.preferred_backup_window),
2875        backup_retention_period = instance.backup_retention_period,
2876        multi_az = if instance.multi_az { "true" } else { "false" },
2877        engine_version = xml_escape(&instance.engine_version),
2878        license_model = license_model_for_engine(&instance.engine),
2879        publicly_accessible = if instance.publicly_accessible {
2880            "true"
2881        } else {
2882            "false"
2883        },
2884        dbi_resource_id = xml_escape(&instance.dbi_resource_id),
2885        deletion_protection = if instance.deletion_protection {
2886            "true"
2887        } else {
2888            "false"
2889        },
2890        arn = xml_escape(&instance.db_instance_arn),
2891    )
2892}
2893
2894fn db_snapshot_xml(snapshot: &DbSnapshot) -> String {
2895    format!(
2896        "<DBSnapshotIdentifier>{}</DBSnapshotIdentifier>\
2897         <DBInstanceIdentifier>{}</DBInstanceIdentifier>\
2898         <SnapshotCreateTime>{}</SnapshotCreateTime>\
2899         <Engine>{}</Engine>\
2900         <EngineVersion>{}</EngineVersion>\
2901         <AllocatedStorage>{}</AllocatedStorage>\
2902         <Status>{}</Status>\
2903         <Port>{}</Port>\
2904         <MasterUsername>{}</MasterUsername>\
2905         {}\
2906         <DbiResourceId>{}</DbiResourceId>\
2907         <SnapshotType>{}</SnapshotType>\
2908         <DBSnapshotArn>{}</DBSnapshotArn>",
2909        xml_escape(&snapshot.db_snapshot_identifier),
2910        xml_escape(&snapshot.db_instance_identifier),
2911        snapshot.snapshot_create_time.to_rfc3339(),
2912        xml_escape(&snapshot.engine),
2913        xml_escape(&snapshot.engine_version),
2914        snapshot.allocated_storage,
2915        xml_escape(&snapshot.status),
2916        snapshot.port,
2917        xml_escape(&snapshot.master_username),
2918        snapshot
2919            .db_name
2920            .as_ref()
2921            .map(|name| format!("<DBName>{}</DBName>", xml_escape(name)))
2922            .unwrap_or_default(),
2923        xml_escape(&snapshot.dbi_resource_id),
2924        xml_escape(&snapshot.snapshot_type),
2925        xml_escape(&snapshot.db_snapshot_arn),
2926    )
2927}
2928
2929fn db_subnet_group_xml(subnet_group: &DbSubnetGroup) -> String {
2930    let subnets_xml = subnet_group
2931        .subnet_ids
2932        .iter()
2933        .zip(&subnet_group.subnet_availability_zones)
2934        .map(|(subnet_id, az)| {
2935            format!(
2936                "<Subnet>\
2937                 <SubnetIdentifier>{}</SubnetIdentifier>\
2938                 <SubnetAvailabilityZone><Name>{}</Name></SubnetAvailabilityZone>\
2939                 <SubnetStatus>Active</SubnetStatus>\
2940                 </Subnet>",
2941                xml_escape(subnet_id),
2942                xml_escape(az)
2943            )
2944        })
2945        .collect::<String>();
2946
2947    format!(
2948        "<DBSubnetGroupName>{}</DBSubnetGroupName>\
2949         <DBSubnetGroupDescription>{}</DBSubnetGroupDescription>\
2950         <VpcId>{}</VpcId>\
2951         <SubnetGroupStatus>Complete</SubnetGroupStatus>\
2952         <Subnets>{}</Subnets>\
2953         <DBSubnetGroupArn>{}</DBSubnetGroupArn>",
2954        xml_escape(&subnet_group.db_subnet_group_name),
2955        xml_escape(&subnet_group.db_subnet_group_description),
2956        xml_escape(&subnet_group.vpc_id),
2957        subnets_xml,
2958        xml_escape(&subnet_group.db_subnet_group_arn),
2959    )
2960}
2961
2962fn db_parameter_group_xml(parameter_group: &DbParameterGroup) -> String {
2963    format!(
2964        "<DBParameterGroupName>{}</DBParameterGroupName>\
2965         <DBParameterGroupFamily>{}</DBParameterGroupFamily>\
2966         <Description>{}</Description>\
2967         <DBParameterGroupArn>{}</DBParameterGroupArn>",
2968        xml_escape(&parameter_group.db_parameter_group_name),
2969        xml_escape(&parameter_group.db_parameter_group_family),
2970        xml_escape(&parameter_group.description),
2971        xml_escape(&parameter_group.db_parameter_group_arn),
2972    )
2973}
2974
2975fn db_instance_not_found(identifier: &str) -> AwsServiceError {
2976    AwsServiceError::aws_error(
2977        StatusCode::NOT_FOUND,
2978        "DBInstanceNotFound",
2979        format!("DBInstance {} not found.", identifier),
2980    )
2981}
2982
2983fn db_snapshot_not_found(identifier: &str) -> AwsServiceError {
2984    AwsServiceError::aws_error(
2985        StatusCode::NOT_FOUND,
2986        "DBSnapshotNotFound",
2987        format!("DBSnapshot {} not found.", identifier),
2988    )
2989}
2990
2991fn db_instance_not_found_by_arn(resource_name: &str) -> AwsServiceError {
2992    AwsServiceError::aws_error(
2993        StatusCode::NOT_FOUND,
2994        "DBInstanceNotFound",
2995        format!("DBInstance {resource_name} not found."),
2996    )
2997}
2998
2999fn find_instance_by_arn<'a>(
3000    state: &'a crate::state::RdsState,
3001    resource_name: &str,
3002) -> Result<&'a DbInstance, AwsServiceError> {
3003    state
3004        .instances
3005        .values()
3006        .find(|instance| instance.db_instance_arn == resource_name)
3007        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3008}
3009
3010fn find_instance_by_arn_mut<'a>(
3011    state: &'a mut crate::state::RdsState,
3012    resource_name: &str,
3013) -> Result<&'a mut DbInstance, AwsServiceError> {
3014    state
3015        .instances
3016        .values_mut()
3017        .find(|instance| instance.db_instance_arn == resource_name)
3018        .ok_or_else(|| db_instance_not_found_by_arn(resource_name))
3019}
3020
3021fn merge_tags(existing: &mut Vec<RdsTag>, incoming: &[RdsTag]) {
3022    for tag in incoming {
3023        if let Some(existing_tag) = existing
3024            .iter_mut()
3025            .find(|candidate| candidate.key == tag.key)
3026        {
3027            existing_tag.value = tag.value.clone();
3028        } else {
3029            existing.push(tag.clone());
3030        }
3031    }
3032}
3033
/// Map an engine name to the license model string reported for it.
///
/// MySQL and MariaDB report `general-public-license`; the Oracle and SQL
/// Server editions report `license-included` (fakecloud's choice, since the
/// upstream dev-edition images are free to use); Db2 reports
/// `bring-your-own-license`. Every other engine name, including
/// `postgres`, falls back to `postgresql-license`.
fn license_model_for_engine(engine: &str) -> &'static str {
    if matches!(engine, "mysql" | "mariadb") {
        return "general-public-license";
    }

    let is_oracle = matches!(
        engine,
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb"
    );
    let is_sqlserver = matches!(
        engine,
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web"
    );
    if is_oracle || is_sqlserver {
        return "license-included";
    }

    if matches!(engine, "db2-se" | "db2-ae") {
        return "bring-your-own-license";
    }

    "postgresql-license"
}
3048
/// Pick the database name used when the caller supplies no `DBName`.
///
/// MySQL/MariaDB use `mysql`, the Oracle editions use `ORCL`, the SQL
/// Server editions use the `master` system database, Db2 uses `BLUDB`, and
/// every other engine name (including `postgres`) uses `postgres`.
fn default_db_name(engine: &str) -> &'static str {
    const DEFAULT_NAMES: &[(&str, &str)] = &[
        ("mysql", "mysql"),
        ("mariadb", "mysql"),
        ("oracle-ee", "ORCL"),
        ("oracle-se2", "ORCL"),
        ("oracle-ee-cdb", "ORCL"),
        ("oracle-se2-cdb", "ORCL"),
        ("sqlserver-ee", "master"),
        ("sqlserver-se", "master"),
        ("sqlserver-ex", "master"),
        ("sqlserver-web", "master"),
        ("db2-se", "BLUDB"),
        ("db2-ae", "BLUDB"),
    ];

    DEFAULT_NAMES
        .iter()
        .find(|(name, _)| *name == engine)
        .map_or("postgres", |(_, db_name)| *db_name)
}
3065
/// Pick the default listener port for a freshly-created instance of
/// `engine`: 3306 for MySQL/MariaDB, 1521 for the Oracle editions, 1433 for
/// the SQL Server editions, 50000 for Db2, and 5432 for `postgres` and any
/// engine name not listed here.
fn default_port_for_engine(engine: &str) -> i32 {
    const POSTGRES_PORT: i32 = 5432;

    match engine {
        "mysql" | "mariadb" => 3306,
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => 1521,
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => 1433,
        "db2-se" | "db2-ae" => 50000,
        // `postgres` and unrecognised engines share the same port.
        _ => POSTGRES_PORT,
    }
}
3079
/// Pick the built-in parameter group name assigned to a new instance when
/// the caller doesn't override it. The name encodes the engine family plus
/// its major version (e.g. `default.postgres16`, `default.mysql8.0`,
/// `default.oracle-ee-23`, `default.sqlserver-ex-16`,
/// `default.db2-se-11.5`).
///
/// A missing or empty version component falls back to the engine's default
/// (`16` for PostgreSQL and SQL Server, `23` for Oracle, `11.5` for Db2).
/// MySQL maps anything not starting with `5.7` to `8.0`, and MariaDB maps
/// anything not starting with `10.11` to `10.6`. Unrecognised engines get
/// `default.postgres16`.
fn default_parameter_group(engine: &str, engine_version: &str) -> String {
    /// Take the next dot-separated component, treating an absent or empty
    /// component as missing. `str::split` always yields at least one
    /// (possibly empty) piece, so a plain `unwrap_or` on `.next()` would
    /// never apply the fallback for the first component.
    fn component_or<'a>(parts: &mut std::str::Split<'a, char>, fallback: &'a str) -> &'a str {
        parts
            .next()
            .filter(|part| !part.is_empty())
            .unwrap_or(fallback)
    }

    let mut parts = engine_version.split('.');

    match engine {
        "postgres" => {
            let major = component_or(&mut parts, "16");
            format!("default.postgres{major}")
        }
        "mysql" => {
            let major = if engine_version.starts_with("5.7") {
                "5.7"
            } else {
                "8.0"
            };
            format!("default.mysql{major}")
        }
        "mariadb" => {
            let major = if engine_version.starts_with("10.11") {
                "10.11"
            } else {
                "10.6"
            };
            format!("default.mariadb{major}")
        }
        "oracle-ee" | "oracle-se2" | "oracle-ee-cdb" | "oracle-se2-cdb" => {
            let major = component_or(&mut parts, "23");
            format!("default.{engine}-{major}")
        }
        "sqlserver-ee" | "sqlserver-se" | "sqlserver-ex" | "sqlserver-web" => {
            // The SQL Server major-version number ("16" for 2022, "15" for
            // 2019) is used in the default parameter group name.
            let major = component_or(&mut parts, "16");
            format!("default.{engine}-{major}")
        }
        "db2-se" | "db2-ae" => {
            // Db2 uses major.minor as the parameter-group key
            // (e.g. `default.db2-se-11.5`).
            let major = component_or(&mut parts, "11");
            let minor = component_or(&mut parts, "5");
            format!("default.{engine}-{major}.{minor}")
        }
        _ => "default.postgres16".to_string(),
    }
}
3128
3129fn runtime_error_to_service_error(error: RuntimeError) -> AwsServiceError {
3130    match error {
3131        RuntimeError::Unavailable => AwsServiceError::aws_error(
3132            StatusCode::SERVICE_UNAVAILABLE,
3133            "InvalidParameterValue",
3134            "Docker/Podman is required for RDS DB instances but is not available",
3135        ),
3136        RuntimeError::ContainerStartFailed(message) => AwsServiceError::aws_error(
3137            StatusCode::INTERNAL_SERVER_ERROR,
3138            "InternalFailure",
3139            message,
3140        ),
3141    }
3142}
3143
3144#[cfg(test)]
3145mod tests {
3146    use std::collections::HashMap;
3147    use std::sync::Arc;
3148
3149    use bytes::Bytes;
3150    use chrono::Utc;
3151    use http::{HeaderMap, Method};
3152    use parking_lot::RwLock;
3153    use uuid::Uuid;
3154
3155    use super::{
3156        db_instance_xml, default_db_name, default_parameter_group, default_port_for_engine,
3157        filter_engine_versions, filter_orderable_options, license_model_for_engine, merge_tags,
3158        optional_i32_param, parse_tag_keys, parse_tags, validate_create_request, RdsService,
3159        RdsSourceType,
3160    };
3161    use crate::state::{default_engine_versions, default_orderable_options, DbInstance, RdsTag};
3162    use fakecloud_core::delivery::DeliveryBus;
3163    use fakecloud_core::service::{AwsRequest, AwsService, AwsServiceError};
3164
3165    #[test]
3166    fn default_port_matches_aws_for_each_engine() {
3167        assert_eq!(default_port_for_engine("postgres"), 5432);
3168        assert_eq!(default_port_for_engine("mysql"), 3306);
3169        assert_eq!(default_port_for_engine("mariadb"), 3306);
3170        assert_eq!(default_port_for_engine("oracle-ee"), 1521);
3171        assert_eq!(default_port_for_engine("oracle-se2"), 1521);
3172        assert_eq!(default_port_for_engine("sqlserver-ee"), 1433);
3173        assert_eq!(default_port_for_engine("sqlserver-ex"), 1433);
3174        assert_eq!(default_port_for_engine("db2-se"), 50000);
3175        assert_eq!(default_port_for_engine("db2-ae"), 50000);
3176    }
3177
3178    #[test]
3179    fn default_parameter_group_uses_engine_major_version() {
3180        assert_eq!(
3181            default_parameter_group("postgres", "16.3"),
3182            "default.postgres16"
3183        );
3184        assert_eq!(
3185            default_parameter_group("mysql", "8.0.35"),
3186            "default.mysql8.0"
3187        );
3188        assert_eq!(
3189            default_parameter_group("oracle-ee", "23.0.0"),
3190            "default.oracle-ee-23"
3191        );
3192        assert_eq!(
3193            default_parameter_group("sqlserver-ex", "16.00.4085.2.v1"),
3194            "default.sqlserver-ex-16"
3195        );
3196        assert_eq!(
3197            default_parameter_group("db2-se", "11.5.9.0.sb00000000.r1"),
3198            "default.db2-se-11.5"
3199        );
3200    }
3201
3202    #[test]
3203    fn license_model_reflects_engine_class() {
3204        assert_eq!(license_model_for_engine("postgres"), "postgresql-license");
3205        assert_eq!(license_model_for_engine("mysql"), "general-public-license");
3206        assert_eq!(license_model_for_engine("oracle-ee"), "license-included");
3207        assert_eq!(license_model_for_engine("sqlserver-se"), "license-included");
3208        assert_eq!(license_model_for_engine("db2-ae"), "bring-your-own-license");
3209    }
3210
3211    #[test]
3212    fn default_db_name_picks_per_engine_default() {
3213        assert_eq!(default_db_name("postgres"), "postgres");
3214        assert_eq!(default_db_name("mysql"), "mysql");
3215        assert_eq!(default_db_name("oracle-ee"), "ORCL");
3216        assert_eq!(default_db_name("sqlserver-ex"), "master");
3217        assert_eq!(default_db_name("db2-se"), "BLUDB");
3218    }
3219
3220    #[test]
3221    fn validate_create_request_accepts_new_engines() {
3222        for (engine, version, port) in [
3223            ("oracle-ee", "23.0.0", 1521),
3224            ("sqlserver-ex", "16.00.4085.2.v1", 1433),
3225            ("db2-se", "11.5.9.0.sb00000000.r1", 50000),
3226        ] {
3227            validate_create_request("test-db", 20, "db.t3.micro", engine, version, port)
3228                .expect("engine should be accepted");
3229        }
3230    }
3231
3232    #[test]
3233    fn validate_create_request_rejects_unsupported_engine_version() {
3234        let err =
3235            validate_create_request("test-db", 20, "db.t3.micro", "oracle-ee", "12.0.0", 1521)
3236                .expect_err("12.x is not in the supported list");
3237        let msg = format!("{err:?}");
3238        assert!(msg.contains("EngineVersion"), "unexpected: {msg}");
3239    }
3240
3241    #[test]
3242    fn filter_engine_versions_matches_requested_engine() {
3243        let versions = default_engine_versions();
3244
3245        let filtered =
3246            filter_engine_versions(&versions, &Some("postgres".to_string()), &None, &None);
3247
3248        assert_eq!(filtered.len(), 4); // All postgres versions
3249        assert!(filtered.iter().all(|v| v.engine == "postgres"));
3250    }
3251
3252    #[test]
3253    fn filter_orderable_options_respects_instance_class() {
3254        let options = default_orderable_options();
3255
3256        let filtered = filter_orderable_options(
3257            &options,
3258            &Some("postgres".to_string()),
3259            &Some("16.3".to_string()),
3260            &Some("db.t3.micro".to_string()),
3261            &None,
3262            Some(true),
3263        );
3264
3265        assert_eq!(filtered.len(), 1);
3266        assert_eq!(filtered[0].db_instance_class, "db.t3.micro");
3267    }
3268
3269    #[test]
3270    fn validate_create_request_rejects_unsupported_engine() {
3271        let error = validate_create_request("test-db", 20, "db.t3.micro", "mysql", "16.3", 5432)
3272            .expect_err("unsupported engine");
3273
3274        assert_eq!(error.code(), "InvalidParameterValue");
3275    }
3276
3277    #[test]
3278    fn optional_i32_param_rejects_invalid_integer() {
3279        let request = request("CreateDBInstance", &[("Port", "not-a-number")]);
3280
3281        let error = optional_i32_param(&request, "Port").expect_err("invalid port");
3282
3283        assert_eq!(error.code(), "InvalidParameterValue");
3284    }
3285
3286    #[test]
3287    fn db_instance_xml_renders_endpoint_and_status() {
3288        let created_at = Utc::now();
3289        let instance = DbInstance {
3290            db_instance_identifier: "test-db".to_string(),
3291            db_instance_arn: "arn:aws:rds:us-east-1:123456789012:db:test-db".to_string(),
3292            db_instance_class: "db.t3.micro".to_string(),
3293            engine: "postgres".to_string(),
3294            engine_version: "16.3".to_string(),
3295            db_instance_status: "available".to_string(),
3296            master_username: "admin".to_string(),
3297            db_name: Some("appdb".to_string()),
3298            endpoint_address: "127.0.0.1".to_string(),
3299            port: 15432,
3300            allocated_storage: 20,
3301            publicly_accessible: true,
3302            deletion_protection: false,
3303            created_at,
3304            dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3305            master_user_password: "secret123".to_string(),
3306            container_id: "container".to_string(),
3307            host_port: 15432,
3308            tags: Vec::new(),
3309            read_replica_source_db_instance_identifier: None,
3310            read_replica_db_instance_identifiers: Vec::new(),
3311            vpc_security_group_ids: vec!["sg-12345678".to_string()],
3312            db_parameter_group_name: Some("default.postgres16".to_string()),
3313            backup_retention_period: 1,
3314            preferred_backup_window: "03:00-04:00".to_string(),
3315            latest_restorable_time: Some(created_at),
3316            option_group_name: None,
3317            multi_az: false,
3318            pending_modified_values: None,
3319        };
3320
3321        let xml = db_instance_xml(&instance, Some("creating"));
3322
3323        assert!(xml.contains("<DBInstanceIdentifier>test-db</DBInstanceIdentifier>"));
3324        assert!(xml.contains("<DBInstanceStatus>creating</DBInstanceStatus>"));
3325        assert!(xml.contains("<Address>127.0.0.1</Address><Port>15432</Port>"));
3326    }
3327
3328    #[test]
3329    fn parse_tags_reads_rds_query_shape() {
3330        let request = request(
3331            "AddTagsToResource",
3332            &[
3333                ("Tags.Tag.1.Key", "env"),
3334                ("Tags.Tag.1.Value", "dev"),
3335                ("Tags.Tag.2.Key", "team"),
3336                ("Tags.Tag.2.Value", "core"),
3337            ],
3338        );
3339
3340        let tags = parse_tags(&request).expect("tags");
3341
3342        assert_eq!(
3343            tags,
3344            vec![
3345                RdsTag {
3346                    key: "env".to_string(),
3347                    value: "dev".to_string(),
3348                },
3349                RdsTag {
3350                    key: "team".to_string(),
3351                    value: "core".to_string(),
3352                }
3353            ]
3354        );
3355    }
3356
3357    #[test]
3358    fn parse_tag_keys_reads_member_shape() {
3359        let request = request(
3360            "RemoveTagsFromResource",
3361            &[("TagKeys.member.1", "env"), ("TagKeys.member.2", "team")],
3362        );
3363
3364        let tag_keys = parse_tag_keys(&request).expect("tag keys");
3365
3366        assert_eq!(tag_keys, vec!["env".to_string(), "team".to_string()]);
3367    }
3368
3369    #[test]
3370    fn merge_tags_updates_existing_values() {
3371        let mut tags = vec![RdsTag {
3372            key: "env".to_string(),
3373            value: "dev".to_string(),
3374        }];
3375
3376        merge_tags(
3377            &mut tags,
3378            &[
3379                RdsTag {
3380                    key: "env".to_string(),
3381                    value: "prod".to_string(),
3382                },
3383                RdsTag {
3384                    key: "team".to_string(),
3385                    value: "core".to_string(),
3386                },
3387            ],
3388        );
3389
3390        assert_eq!(tags.len(), 2);
3391        assert_eq!(tags[0].value, "prod");
3392        assert_eq!(tags[1].key, "team");
3393    }
3394
3395    #[tokio::test]
3396    async fn describe_engine_versions_returns_xml_body() {
3397        let service = RdsService::new(Arc::new(RwLock::new(
3398            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3399        )));
3400        let request = request("DescribeDBEngineVersions", &[("Engine", "postgres")]);
3401
3402        let response = service.handle(request).await.expect("response");
3403        let body = String::from_utf8(response.body.expect_bytes().to_vec()).expect("utf8");
3404
3405        assert!(body.contains("<DescribeDBEngineVersionsResponse"));
3406        assert!(body.contains("<Engine>postgres</Engine>"));
3407        assert!(body.contains("<DBParameterGroupFamily>postgres16</DBParameterGroupFamily>"));
3408    }
3409
3410    fn request(action: &str, params: &[(&str, &str)]) -> AwsRequest {
3411        let mut query_params = HashMap::from([("Action".to_string(), action.to_string())]);
3412        for (key, value) in params {
3413            query_params.insert((*key).to_string(), (*value).to_string());
3414        }
3415
3416        AwsRequest {
3417            service: "rds".to_string(),
3418            action: action.to_string(),
3419            region: "us-east-1".to_string(),
3420            account_id: "123456789012".to_string(),
3421            request_id: "test-request-id".to_string(),
3422            headers: HeaderMap::new(),
3423            query_params,
3424            body: Bytes::new(),
3425            body_stream: parking_lot::Mutex::new(None),
3426            path_segments: vec![],
3427            raw_path: "/".to_string(),
3428            raw_query: String::new(),
3429            method: Method::POST,
3430            is_query_protocol: true,
3431            access_key_id: None,
3432            principal: None,
3433        }
3434    }
3435
3436    // ── Helpers for handler tests ────────────────────────────────────
3437
3438    fn make_service() -> RdsService {
3439        RdsService::new(Arc::new(RwLock::new(
3440            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3441        )))
3442    }
3443
    /// One event as received by `RecordingEb::put_event`, stored as owned
    /// strings so tests can inspect it afterwards.
    #[derive(Default)]
    struct CapturedEvent {
        // Value of the `source` argument.
        source: String,
        // Value of the `detail_type` argument.
        detail_type: String,
        // Value of the `detail` argument; the tests parse it as JSON.
        detail: String,
    }
3450
    /// EventBridge delivery test double that records every event it is handed.
    #[derive(Default)]
    struct RecordingEb {
        // Events in arrival order; a mutex is needed because `put_event`
        // only receives `&self`.
        events: std::sync::Mutex<Vec<CapturedEvent>>,
    }
3455
3456    impl fakecloud_core::delivery::EventBridgeDelivery for RecordingEb {
3457        fn put_event(&self, source: &str, detail_type: &str, detail: &str, _bus: &str) {
3458            self.events.lock().unwrap().push(CapturedEvent {
3459                source: source.to_string(),
3460                detail_type: detail_type.to_string(),
3461                detail: detail.to_string(),
3462            });
3463        }
3464    }
3465
3466    fn make_service_with_recorder() -> (RdsService, Arc<RecordingEb>) {
3467        let recorder = Arc::new(RecordingEb::default());
3468        let bus = Arc::new(DeliveryBus::new().with_eventbridge(recorder.clone()));
3469        let svc = RdsService::new(Arc::new(RwLock::new(
3470            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3471        )))
3472        .with_delivery_bus(bus);
3473        (svc, recorder)
3474    }
3475
3476    #[test]
3477    fn emit_event_emits_aws_rds_event_via_bus() {
3478        let (svc, rec) = make_service_with_recorder();
3479        svc.emit_event(
3480            RdsSourceType::DbInstance,
3481            "my-db",
3482            "arn:aws:rds:us-east-1:123456789012:db:my-db",
3483            "RDS-EVENT-0005",
3484            &["creation"],
3485            "DB instance created",
3486        );
3487        let events = rec.events.lock().unwrap();
3488        assert_eq!(events.len(), 1);
3489        let e = &events[0];
3490        assert_eq!(e.source, "aws.rds");
3491        assert_eq!(e.detail_type, "RDS DB Instance Event");
3492        let detail: serde_json::Value = serde_json::from_str(&e.detail).unwrap();
3493        assert_eq!(detail["EventID"], "RDS-EVENT-0005");
3494        assert_eq!(detail["SourceType"], "DB_INSTANCE");
3495        assert_eq!(detail["SourceIdentifier"], "my-db");
3496        assert_eq!(detail["Message"], "DB instance created");
3497        assert_eq!(detail["EventCategories"][0], "creation");
3498    }
3499
3500    #[test]
3501    fn emit_event_no_op_without_bus() {
3502        let svc = make_service();
3503        svc.emit_event(
3504            RdsSourceType::DbSnapshot,
3505            "snap",
3506            "arn:aws:rds:us-east-1:123456789012:snapshot:snap",
3507            "RDS-EVENT-0042",
3508            &["creation"],
3509            "Manual snapshot created",
3510        );
3511    }
3512
3513    #[test]
3514    fn rds_source_type_detail_type_mapping() {
3515        assert_eq!(
3516            RdsSourceType::DbInstance.detail_type(),
3517            "RDS DB Instance Event"
3518        );
3519        assert_eq!(
3520            RdsSourceType::DbSnapshot.detail_type(),
3521            "RDS DB Snapshot Event"
3522        );
3523        assert_eq!(
3524            RdsSourceType::DbParameterGroup.detail_type(),
3525            "RDS DB Parameter Group Event"
3526        );
3527    }
3528
3529    fn body_of(resp: fakecloud_core::service::AwsResponse) -> String {
3530        String::from_utf8(resp.body.expect_bytes().to_vec()).expect("utf8")
3531    }
3532
3533    fn seed_instance(svc: &RdsService, identifier: &str) -> String {
3534        let arn = format!("arn:aws:rds:us-east-1:123456789012:db:{identifier}");
3535        let mut accounts = svc.state.write();
3536        let state = accounts.default_mut();
3537        state.instances.insert(
3538            identifier.to_string(),
3539            DbInstance {
3540                db_instance_identifier: identifier.to_string(),
3541                db_instance_arn: arn.clone(),
3542                db_instance_class: "db.t3.micro".to_string(),
3543                engine: "postgres".to_string(),
3544                engine_version: "16.3".to_string(),
3545                db_instance_status: "available".to_string(),
3546                master_username: "admin".to_string(),
3547                db_name: Some("appdb".to_string()),
3548                endpoint_address: "127.0.0.1".to_string(),
3549                port: 15432,
3550                allocated_storage: 20,
3551                publicly_accessible: true,
3552                deletion_protection: false,
3553                created_at: Utc::now(),
3554                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
3555                master_user_password: "secret".to_string(),
3556                container_id: "container".to_string(),
3557                host_port: 15432,
3558                tags: Vec::new(),
3559                read_replica_source_db_instance_identifier: None,
3560                read_replica_db_instance_identifiers: Vec::new(),
3561                vpc_security_group_ids: vec!["sg-12345678".to_string()],
3562                db_parameter_group_name: Some("default.postgres16".to_string()),
3563                backup_retention_period: 1,
3564                preferred_backup_window: "03:00-04:00".to_string(),
3565                latest_restorable_time: None,
3566                option_group_name: None,
3567                multi_az: false,
3568                pending_modified_values: None,
3569            },
3570        );
3571        arn
3572    }
3573
3574    fn assert_code<T>(result: Result<T, AwsServiceError>, expected_code: &str) -> AwsServiceError {
3575        match result {
3576            Ok(_) => panic!("expected error {expected_code}, got Ok"),
3577            Err(e) => {
3578                assert_eq!(e.code(), expected_code, "wrong error code");
3579                e
3580            }
3581        }
3582    }
3583
3584    // ── Tag operations ───────────────────────────────────────────────
3585
3586    #[test]
3587    fn add_tags_requires_resource_name() {
3588        let svc = make_service();
3589        let req = request("AddTagsToResource", &[]);
3590        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3591    }
3592
3593    #[test]
3594    fn add_tags_requires_at_least_one_tag() {
3595        let svc = make_service();
3596        let arn = seed_instance(&svc, "db1");
3597        let req = request("AddTagsToResource", &[("ResourceName", arn.as_str())]);
3598        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
3599    }
3600
3601    #[test]
3602    fn add_tags_appends_then_list_tags_returns_them() {
3603        let svc = make_service();
3604        let arn = seed_instance(&svc, "db1");
3605        let add_req = request(
3606            "AddTagsToResource",
3607            &[
3608                ("ResourceName", arn.as_str()),
3609                ("Tags.Tag.1.Key", "env"),
3610                ("Tags.Tag.1.Value", "dev"),
3611            ],
3612        );
3613        svc.add_tags_to_resource(&add_req).unwrap();
3614
3615        let list_req = request("ListTagsForResource", &[("ResourceName", arn.as_str())]);
3616        let body = body_of(svc.list_tags_for_resource(&list_req).unwrap());
3617        assert!(body.contains("<Key>env</Key>"));
3618        assert!(body.contains("<Value>dev</Value>"));
3619    }
3620
3621    #[test]
3622    fn list_tags_rejects_filters_param() {
3623        let svc = make_service();
3624        let arn = seed_instance(&svc, "db1");
3625        let req = request(
3626            "ListTagsForResource",
3627            &[
3628                ("ResourceName", arn.as_str()),
3629                ("Filters.Filter.1.Name", "x"),
3630            ],
3631        );
3632        assert_code(svc.list_tags_for_resource(&req), "InvalidParameterValue");
3633    }
3634
3635    #[test]
3636    fn list_tags_unknown_arn_errors() {
3637        let svc = make_service();
3638        let req = request(
3639            "ListTagsForResource",
3640            &[("ResourceName", "arn:aws:rds:us-east-1:123456789012:db:nope")],
3641        );
3642        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
3643    }
3644
3645    #[test]
3646    fn remove_tags_strips_only_listed_keys() {
3647        let svc = make_service();
3648        let arn = seed_instance(&svc, "db1");
3649        {
3650            let mut __a = svc.state.write();
3651            let state = __a.default_mut();
3652            let inst = state.instances.get_mut("db1").unwrap();
3653            inst.tags = vec![
3654                RdsTag {
3655                    key: "env".to_string(),
3656                    value: "dev".to_string(),
3657                },
3658                RdsTag {
3659                    key: "team".to_string(),
3660                    value: "core".to_string(),
3661                },
3662            ];
3663        }
3664        let req = request(
3665            "RemoveTagsFromResource",
3666            &[("ResourceName", arn.as_str()), ("TagKeys.member.1", "env")],
3667        );
3668        svc.remove_tags_from_resource(&req).unwrap();
3669
3670        let __a = svc.state.read();
3671        let state = __a.default_ref();
3672        let tags = &state.instances.get("db1").unwrap().tags;
3673        assert_eq!(tags.len(), 1);
3674        assert_eq!(tags[0].key, "team");
3675    }
3676
3677    #[test]
3678    fn remove_tags_requires_keys() {
3679        let svc = make_service();
3680        let arn = seed_instance(&svc, "db1");
3681        let req = request("RemoveTagsFromResource", &[("ResourceName", arn.as_str())]);
3682        assert_code(svc.remove_tags_from_resource(&req), "MissingParameter");
3683    }
3684
3685    // ── DB Subnet Groups ─────────────────────────────────────────────
3686
3687    fn create_subnet_group(svc: &RdsService, name: &str) {
3688        let req = request(
3689            "CreateDBSubnetGroup",
3690            &[
3691                ("DBSubnetGroupName", name),
3692                ("DBSubnetGroupDescription", "test"),
3693                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3694                ("SubnetIds.SubnetIdentifier.2", "subnet-bbb"),
3695            ],
3696        );
3697        svc.create_db_subnet_group(&req).unwrap();
3698    }
3699
3700    #[test]
3701    fn create_db_subnet_group_requires_two_subnets() {
3702        let svc = make_service();
3703        let req = request(
3704            "CreateDBSubnetGroup",
3705            &[
3706                ("DBSubnetGroupName", "sg1"),
3707                ("DBSubnetGroupDescription", "t"),
3708                ("SubnetIds.SubnetIdentifier.1", "subnet-aaa"),
3709            ],
3710        );
3711        assert_code(
3712            svc.create_db_subnet_group(&req),
3713            "DBSubnetGroupDoesNotCoverEnoughAZs",
3714        );
3715    }
3716
3717    #[test]
3718    fn create_db_subnet_group_rejects_empty_subnets() {
3719        let svc = make_service();
3720        let req = request(
3721            "CreateDBSubnetGroup",
3722            &[
3723                ("DBSubnetGroupName", "sg1"),
3724                ("DBSubnetGroupDescription", "t"),
3725            ],
3726        );
3727        assert_code(svc.create_db_subnet_group(&req), "InvalidParameterValue");
3728    }
3729
3730    #[test]
3731    fn create_db_subnet_group_rejects_duplicates() {
3732        let svc = make_service();
3733        create_subnet_group(&svc, "sg1");
3734        let req = request(
3735            "CreateDBSubnetGroup",
3736            &[
3737                ("DBSubnetGroupName", "sg1"),
3738                ("DBSubnetGroupDescription", "t"),
3739                ("SubnetIds.SubnetIdentifier.1", "subnet-x"),
3740                ("SubnetIds.SubnetIdentifier.2", "subnet-y"),
3741            ],
3742        );
3743        assert_code(
3744            svc.create_db_subnet_group(&req),
3745            "DBSubnetGroupAlreadyExists",
3746        );
3747    }
3748
3749    #[test]
3750    fn describe_db_subnet_groups_by_name_or_list() {
3751        let svc = make_service();
3752        create_subnet_group(&svc, "sg-alpha");
3753        create_subnet_group(&svc, "sg-beta");
3754
3755        let by_name = request(
3756            "DescribeDBSubnetGroups",
3757            &[("DBSubnetGroupName", "sg-alpha")],
3758        );
3759        let body = body_of(svc.describe_db_subnet_groups(&by_name).unwrap());
3760        assert!(body.contains("sg-alpha"));
3761        assert!(!body.contains("sg-beta"));
3762
3763        let list_all = request("DescribeDBSubnetGroups", &[]);
3764        let body = body_of(svc.describe_db_subnet_groups(&list_all).unwrap());
3765        assert!(body.contains("sg-alpha"));
3766        assert!(body.contains("sg-beta"));
3767    }
3768
3769    #[test]
3770    fn describe_db_subnet_groups_unknown_name_errors() {
3771        let svc = make_service();
3772        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
3773        assert_code(
3774            svc.describe_db_subnet_groups(&req),
3775            "DBSubnetGroupNotFoundFault",
3776        );
3777    }
3778
3779    #[test]
3780    fn delete_db_subnet_group_unknown_errors() {
3781        let svc = make_service();
3782        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
3783        assert_code(
3784            svc.delete_db_subnet_group(&req),
3785            "DBSubnetGroupNotFoundFault",
3786        );
3787    }
3788
3789    #[test]
3790    fn delete_db_subnet_group_removes_entry() {
3791        let svc = make_service();
3792        create_subnet_group(&svc, "sg1");
3793        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "sg1")]);
3794        svc.delete_db_subnet_group(&req).unwrap();
3795        assert!(svc.state.read().default_ref().subnet_groups.is_empty());
3796    }
3797
3798    #[test]
3799    fn modify_db_subnet_group_updates_subnet_ids() {
3800        let svc = make_service();
3801        create_subnet_group(&svc, "sg1");
3802        let req = request(
3803            "ModifyDBSubnetGroup",
3804            &[
3805                ("DBSubnetGroupName", "sg1"),
3806                ("SubnetIds.SubnetIdentifier.1", "subnet-new1"),
3807                ("SubnetIds.SubnetIdentifier.2", "subnet-new2"),
3808            ],
3809        );
3810        svc.modify_db_subnet_group(&req).unwrap();
3811
3812        let __a = svc.state.read();
3813        let state = __a.default_ref();
3814        let sg = state.subnet_groups.get("sg1").unwrap();
3815        assert_eq!(sg.subnet_ids, vec!["subnet-new1", "subnet-new2"]);
3816    }
3817
3818    // ── DB Parameter Groups ──────────────────────────────────────────
3819
3820    fn create_param_group(svc: &RdsService, name: &str) {
3821        let req = request(
3822            "CreateDBParameterGroup",
3823            &[
3824                ("DBParameterGroupName", name),
3825                ("DBParameterGroupFamily", "postgres16"),
3826                ("Description", "test"),
3827            ],
3828        );
3829        svc.create_db_parameter_group(&req).unwrap();
3830    }
3831
3832    #[test]
3833    fn create_db_parameter_group_rejects_unknown_family() {
3834        let svc = make_service();
3835        let req = request(
3836            "CreateDBParameterGroup",
3837            &[
3838                ("DBParameterGroupName", "pg1"),
3839                ("DBParameterGroupFamily", "oracle19"),
3840                ("Description", "t"),
3841            ],
3842        );
3843        assert_code(svc.create_db_parameter_group(&req), "InvalidParameterValue");
3844    }
3845
3846    #[test]
3847    fn create_db_parameter_group_rejects_duplicates() {
3848        let svc = make_service();
3849        create_param_group(&svc, "pg1");
3850        let req = request(
3851            "CreateDBParameterGroup",
3852            &[
3853                ("DBParameterGroupName", "pg1"),
3854                ("DBParameterGroupFamily", "postgres16"),
3855                ("Description", "t"),
3856            ],
3857        );
3858        assert_code(
3859            svc.create_db_parameter_group(&req),
3860            "DBParameterGroupAlreadyExists",
3861        );
3862    }
3863
3864    #[test]
3865    fn describe_db_parameter_groups_by_name_or_list() {
3866        let svc = make_service();
3867        create_param_group(&svc, "pg-alpha");
3868        create_param_group(&svc, "pg-beta");
3869        let by_name = request(
3870            "DescribeDBParameterGroups",
3871            &[("DBParameterGroupName", "pg-alpha")],
3872        );
3873        let body = body_of(svc.describe_db_parameter_groups(&by_name).unwrap());
3874        assert!(body.contains("pg-alpha"));
3875        assert!(!body.contains("pg-beta"));
3876        let list = request("DescribeDBParameterGroups", &[]);
3877        let body = body_of(svc.describe_db_parameter_groups(&list).unwrap());
3878        assert!(body.contains("pg-alpha"));
3879        assert!(body.contains("pg-beta"));
3880    }
3881
3882    #[test]
3883    fn describe_db_parameter_groups_unknown_name_errors() {
3884        let svc = make_service();
3885        let req = request(
3886            "DescribeDBParameterGroups",
3887            &[("DBParameterGroupName", "ghost")],
3888        );
3889        assert_code(
3890            svc.describe_db_parameter_groups(&req),
3891            "DBParameterGroupNotFound",
3892        );
3893    }
3894
3895    #[test]
3896    fn delete_db_parameter_group_rejects_default_groups() {
3897        let svc = make_service();
3898        let req = request(
3899            "DeleteDBParameterGroup",
3900            &[("DBParameterGroupName", "default.postgres16")],
3901        );
3902        assert_code(svc.delete_db_parameter_group(&req), "InvalidParameterValue");
3903    }
3904
3905    #[test]
3906    fn delete_db_parameter_group_unknown_errors() {
3907        let svc = make_service();
3908        let req = request(
3909            "DeleteDBParameterGroup",
3910            &[("DBParameterGroupName", "ghost")],
3911        );
3912        assert_code(
3913            svc.delete_db_parameter_group(&req),
3914            "DBParameterGroupNotFound",
3915        );
3916    }
3917
3918    #[test]
3919    fn delete_db_parameter_group_removes_entry() {
3920        let svc = make_service();
3921        create_param_group(&svc, "pg1");
3922        let req = request("DeleteDBParameterGroup", &[("DBParameterGroupName", "pg1")]);
3923        svc.delete_db_parameter_group(&req).unwrap();
3924        assert!(!svc
3925            .state
3926            .read()
3927            .default_ref()
3928            .parameter_groups
3929            .contains_key("pg1"));
3930    }
3931
3932    #[test]
3933    fn modify_db_parameter_group_updates_description() {
3934        let svc = make_service();
3935        create_param_group(&svc, "pg1");
3936        let req = request(
3937            "ModifyDBParameterGroup",
3938            &[
3939                ("DBParameterGroupName", "pg1"),
3940                ("Description", "shiny new"),
3941            ],
3942        );
3943        svc.modify_db_parameter_group(&req).unwrap();
3944        let __a = svc.state.read();
3945        let state = __a.default_ref();
3946        assert_eq!(
3947            state.parameter_groups.get("pg1").unwrap().description,
3948            "shiny new"
3949        );
3950    }
3951
3952    #[test]
3953    fn modify_db_parameter_group_unknown_errors() {
3954        let svc = make_service();
3955        let req = request(
3956            "ModifyDBParameterGroup",
3957            &[("DBParameterGroupName", "ghost"), ("Description", "x")],
3958        );
3959        assert_code(
3960            svc.modify_db_parameter_group(&req),
3961            "DBParameterGroupNotFound",
3962        );
3963    }
3964
3965    // ── DescribeDBInstances ──────────────────────────────────────────
3966
3967    #[test]
3968    fn describe_db_instances_by_id_returns_only_one() {
3969        let svc = make_service();
3970        seed_instance(&svc, "db1");
3971        seed_instance(&svc, "db2");
3972        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "db1")]);
3973        let body = body_of(svc.describe_db_instances(&req).unwrap());
3974        assert!(body.contains("<DBInstanceIdentifier>db1</DBInstanceIdentifier>"));
3975        assert!(!body.contains("<DBInstanceIdentifier>db2</DBInstanceIdentifier>"));
3976    }
3977
3978    #[test]
3979    fn describe_db_instances_unknown_id_errors() {
3980        let svc = make_service();
3981        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
3982        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
3983    }
3984
3985    #[test]
3986    fn describe_db_instances_lists_all_when_unbounded() {
3987        let svc = make_service();
3988        seed_instance(&svc, "db1");
3989        seed_instance(&svc, "db2");
3990        seed_instance(&svc, "db3");
3991        let req = request("DescribeDBInstances", &[]);
3992        let body = body_of(svc.describe_db_instances(&req).unwrap());
3993        for id in ["db1", "db2", "db3"] {
3994            assert!(body.contains(&format!(
3995                "<DBInstanceIdentifier>{id}</DBInstanceIdentifier>"
3996            )));
3997        }
3998    }
3999
4000    // ── ModifyDBInstance ─────────────────────────────────────────────
4001
4002    #[test]
4003    fn modify_db_instance_requires_at_least_one_change() {
4004        let svc = make_service();
4005        seed_instance(&svc, "db1");
4006        let req = request("ModifyDBInstance", &[("DBInstanceIdentifier", "db1")]);
4007        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4008    }
4009
4010    #[test]
4011    fn modify_db_instance_unknown_errors() {
4012        let svc = make_service();
4013        let req = request(
4014            "ModifyDBInstance",
4015            &[
4016                ("DBInstanceIdentifier", "ghost"),
4017                ("DBInstanceClass", "db.t3.small"),
4018            ],
4019        );
4020        assert_code(svc.modify_db_instance(&req), "DBInstanceNotFound");
4021    }
4022
4023    #[test]
4024    fn modify_db_instance_apply_immediately_updates_class() {
4025        let svc = make_service();
4026        seed_instance(&svc, "db1");
4027        let req = request(
4028            "ModifyDBInstance",
4029            &[
4030                ("DBInstanceIdentifier", "db1"),
4031                ("DBInstanceClass", "db.t3.small"),
4032                ("ApplyImmediately", "true"),
4033            ],
4034        );
4035        svc.modify_db_instance(&req).unwrap();
4036        let __a = svc.state.read();
4037        let state = __a.default_ref();
4038        assert_eq!(
4039            state.instances.get("db1").unwrap().db_instance_class,
4040            "db.t3.small"
4041        );
4042    }
4043
4044    #[test]
4045    fn modify_db_instance_pending_when_not_apply_immediately() {
4046        let svc = make_service();
4047        seed_instance(&svc, "db1");
4048        let req = request(
4049            "ModifyDBInstance",
4050            &[
4051                ("DBInstanceIdentifier", "db1"),
4052                ("DBInstanceClass", "db.t3.small"),
4053                ("ApplyImmediately", "false"),
4054            ],
4055        );
4056        svc.modify_db_instance(&req).unwrap();
4057        let __a = svc.state.read();
4058        let state = __a.default_ref();
4059        let inst = state.instances.get("db1").unwrap();
4060        assert_eq!(inst.db_instance_class, "db.t3.micro");
4061        assert_eq!(
4062            inst.pending_modified_values
4063                .as_ref()
4064                .unwrap()
4065                .db_instance_class
4066                .as_deref(),
4067            Some("db.t3.small"),
4068        );
4069    }
4070
4071    // ── Snapshots (sync ops only) ────────────────────────────────────
4072
4073    fn seed_snapshot(svc: &RdsService, snapshot_id: &str, instance_id: &str) {
4074        let mut __a = svc.state.write();
4075        let state = __a.default_mut();
4076        let arn = state.db_snapshot_arn(snapshot_id);
4077        state.snapshots.insert(
4078            snapshot_id.to_string(),
4079            crate::state::DbSnapshot {
4080                db_snapshot_identifier: snapshot_id.to_string(),
4081                db_snapshot_arn: arn,
4082                db_instance_identifier: instance_id.to_string(),
4083                snapshot_create_time: Utc::now(),
4084                engine: "postgres".to_string(),
4085                engine_version: "16.3".to_string(),
4086                allocated_storage: 20,
4087                status: "available".to_string(),
4088                port: 5432,
4089                master_username: "admin".to_string(),
4090                db_name: Some("appdb".to_string()),
4091                dbi_resource_id: format!("db-{}", Uuid::new_v4().simple()),
4092                snapshot_type: "manual".to_string(),
4093                master_user_password: "secret".to_string(),
4094                tags: Vec::new(),
4095                dump_data: Vec::new(),
4096            },
4097        );
4098    }
4099
4100    #[test]
4101    fn delete_db_snapshot_removes_entry() {
4102        let svc = make_service();
4103        seed_snapshot(&svc, "snap1", "db1");
4104        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "snap1")]);
4105        svc.delete_db_snapshot(&req).unwrap();
4106        assert!(svc.state.read().default_ref().snapshots.is_empty());
4107    }
4108
4109    #[test]
4110    fn delete_db_snapshot_unknown_errors() {
4111        let svc = make_service();
4112        let req = request("DeleteDBSnapshot", &[("DBSnapshotIdentifier", "ghost")]);
4113        assert_code(svc.delete_db_snapshot(&req), "DBSnapshotNotFound");
4114    }
4115
4116    #[test]
4117    fn describe_db_snapshots_rejects_both_filters() {
4118        let svc = make_service();
4119        let req = request(
4120            "DescribeDBSnapshots",
4121            &[("DBSnapshotIdentifier", "s"), ("DBInstanceIdentifier", "i")],
4122        );
4123        assert_code(
4124            svc.describe_db_snapshots(&req),
4125            "InvalidParameterCombination",
4126        );
4127    }
4128
4129    #[test]
4130    fn describe_db_snapshots_by_id_or_instance() {
4131        let svc = make_service();
4132        seed_snapshot(&svc, "snap1", "db1");
4133        seed_snapshot(&svc, "snap2", "db2");
4134
4135        let by_id = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "snap1")]);
4136        let body = body_of(svc.describe_db_snapshots(&by_id).unwrap());
4137        assert!(body.contains("snap1"));
4138        assert!(!body.contains("snap2"));
4139
4140        let by_instance = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "db2")]);
4141        let body = body_of(svc.describe_db_snapshots(&by_instance).unwrap());
4142        assert!(body.contains("snap2"));
4143        assert!(!body.contains("snap1"));
4144
4145        let list_all = request("DescribeDBSnapshots", &[]);
4146        let body = body_of(svc.describe_db_snapshots(&list_all).unwrap());
4147        assert!(body.contains("snap1"));
4148        assert!(body.contains("snap2"));
4149    }
4150
4151    #[test]
4152    fn describe_db_snapshots_unknown_id_errors() {
4153        let svc = make_service();
4154        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "ghost")]);
4155        assert_code(svc.describe_db_snapshots(&req), "DBSnapshotNotFound");
4156    }
4157
4158    // ── Error branch tests ──
4159
4160    #[test]
4161    fn describe_db_instances_not_found() {
4162        let svc = make_service();
4163        let req = request("DescribeDBInstances", &[("DBInstanceIdentifier", "ghost")]);
4164        assert_code(svc.describe_db_instances(&req), "DBInstanceNotFound");
4165    }
4166
4167    #[tokio::test]
4168    async fn delete_db_instance_not_found() {
4169        let svc = make_service();
4170        let req = request(
4171            "DeleteDBInstance",
4172            &[
4173                ("DBInstanceIdentifier", "ghost"),
4174                ("SkipFinalSnapshot", "true"),
4175            ],
4176        );
4177        assert_code(svc.delete_db_instance(&req).await, "DBInstanceNotFound");
4178    }
4179
4180    #[test]
4181    fn modify_db_instance_not_found() {
4182        let svc = make_service();
4183        let req = request(
4184            "ModifyDBInstance",
4185            &[
4186                ("DBInstanceIdentifier", "ghost"),
4187                ("AllocatedStorage", "20"),
4188            ],
4189        );
4190        // Validation fires before existence check
4191        assert_code(svc.modify_db_instance(&req), "InvalidParameterCombination");
4192    }
4193
4194    #[tokio::test]
4195    async fn reboot_db_instance_not_found() {
4196        let svc = make_service();
4197        let req = request("RebootDBInstance", &[("DBInstanceIdentifier", "ghost")]);
4198        assert_code(svc.reboot_db_instance(&req).await, "DBInstanceNotFound");
4199    }
4200
4201    #[tokio::test]
4202    async fn create_db_snapshot_instance_not_found() {
4203        let svc = make_service();
4204        let req = request(
4205            "CreateDBSnapshot",
4206            &[
4207                ("DBInstanceIdentifier", "ghost"),
4208                ("DBSnapshotIdentifier", "snap1"),
4209            ],
4210        );
4211        assert_code(svc.create_db_snapshot(&req).await, "InvalidParameterValue");
4212    }
4213
4214    #[tokio::test]
4215    async fn restore_db_instance_snapshot_not_found() {
4216        let svc = make_service();
4217        let req = request(
4218            "RestoreDBInstanceFromDBSnapshot",
4219            &[
4220                ("DBInstanceIdentifier", "restored"),
4221                ("DBSnapshotIdentifier", "ghost-snap"),
4222            ],
4223        );
4224        assert_code(
4225            svc.restore_db_instance_from_db_snapshot(&req).await,
4226            "InvalidParameterValue",
4227        );
4228    }
4229
4230    #[tokio::test]
4231    async fn create_db_instance_read_replica_source_not_found() {
4232        let svc = make_service();
4233        let req = request(
4234            "CreateDBInstanceReadReplica",
4235            &[
4236                ("DBInstanceIdentifier", "replica"),
4237                ("SourceDBInstanceIdentifier", "ghost"),
4238            ],
4239        );
4240        assert_code(
4241            svc.create_db_instance_read_replica(&req).await,
4242            "InvalidParameterValue",
4243        );
4244    }
4245
4246    #[test]
4247    fn describe_db_engine_versions_basic() {
4248        let svc = make_service();
4249        let req = request("DescribeDBEngineVersions", &[]);
4250        let resp = svc.describe_db_engine_versions(&req).unwrap();
4251        let body = body_of(resp);
4252        assert!(body.contains("<DBEngineVersions>"));
4253    }
4254
4255    #[test]
4256    fn describe_orderable_db_instance_options_basic() {
4257        let svc = make_service();
4258        let req = request("DescribeOrderableDBInstanceOptions", &[("Engine", "mysql")]);
4259        let resp = svc.describe_orderable_db_instance_options(&req).unwrap();
4260        let body = body_of(resp);
4261        assert!(body.contains("<OrderableDBInstanceOptions>"));
4262    }
4263
4264    #[test]
4265    fn describe_db_parameter_group_not_found() {
4266        let svc = make_service();
4267        let req = request(
4268            "DescribeDBParameterGroups",
4269            &[("DBParameterGroupName", "ghost")],
4270        );
4271        assert_code(
4272            svc.describe_db_parameter_groups(&req),
4273            "DBParameterGroupNotFound",
4274        );
4275    }
4276
4277    #[test]
4278    fn delete_db_parameter_group_not_found() {
4279        let svc = make_service();
4280        let req = request(
4281            "DeleteDBParameterGroup",
4282            &[("DBParameterGroupName", "ghost")],
4283        );
4284        assert_code(
4285            svc.delete_db_parameter_group(&req),
4286            "DBParameterGroupNotFound",
4287        );
4288    }
4289
4290    #[test]
4291    fn describe_db_subnet_group_not_found() {
4292        let svc = make_service();
4293        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4294        assert_code(
4295            svc.describe_db_subnet_groups(&req),
4296            "DBSubnetGroupNotFoundFault",
4297        );
4298    }
4299
4300    #[test]
4301    fn delete_db_subnet_group_not_found() {
4302        let svc = make_service();
4303        let req = request("DeleteDBSubnetGroup", &[("DBSubnetGroupName", "ghost")]);
4304        assert_code(
4305            svc.delete_db_subnet_group(&req),
4306            "DBSubnetGroupNotFoundFault",
4307        );
4308    }
4309
4310    #[test]
4311    fn add_tags_resource_not_found() {
4312        let svc = make_service();
4313        let req = request(
4314            "AddTagsToResource",
4315            &[
4316                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4317                ("Tags.member.1.Key", "k"),
4318                ("Tags.member.1.Value", "v"),
4319            ],
4320        );
4321        assert_code(svc.add_tags_to_resource(&req), "MissingParameter");
4322    }
4323
4324    #[test]
4325    fn list_tags_resource_not_found() {
4326        let svc = make_service();
4327        let req = request(
4328            "ListTagsForResource",
4329            &[("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost")],
4330        );
4331        assert_code(svc.list_tags_for_resource(&req), "DBInstanceNotFound");
4332    }
4333
4334    // ── snapshot operations ──
4335
4336    #[tokio::test]
4337    async fn create_db_snapshot_missing_id_errors() {
4338        let svc = make_service();
4339        let req = request(
4340            "CreateDBSnapshot",
4341            &[("DBInstanceIdentifier", "nonexistent")],
4342        );
4343        assert_code(svc.create_db_snapshot(&req).await, "MissingParameter");
4344    }
4345
4346    #[tokio::test]
4347    async fn create_db_snapshot_unknown_instance_errors() {
4348        let svc = make_service();
4349        let req = request(
4350            "CreateDBSnapshot",
4351            &[
4352                ("DBSnapshotIdentifier", "snap1"),
4353                ("DBInstanceIdentifier", "ghost"),
4354            ],
4355        );
4356        assert!(svc.create_db_snapshot(&req).await.is_err());
4357    }
4358
4359    // ── delete_db_instance ──
4360
4361    #[tokio::test]
4362    async fn delete_db_instance_missing_id_errors() {
4363        let svc = make_service();
4364        let req = request("DeleteDBInstance", &[]);
4365        assert_code(svc.delete_db_instance(&req).await, "MissingParameter");
4366    }
4367
4368    // ── reboot_db_instance ──
4369
4370    #[tokio::test]
4371    async fn reboot_db_instance_missing_id_errors() {
4372        let svc = make_service();
4373        let req = request("RebootDBInstance", &[]);
4374        assert_code(svc.reboot_db_instance(&req).await, "MissingParameter");
4375    }
4376
4377    // ── create_db_instance validation ──
4378
4379    #[tokio::test]
4380    async fn create_db_instance_missing_id_errors() {
4381        let svc = make_service();
4382        let req = request(
4383            "CreateDBInstance",
4384            &[
4385                ("Engine", "postgres"),
4386                ("DBInstanceClass", "db.t3.micro"),
4387                ("AllocatedStorage", "20"),
4388                ("MasterUsername", "admin"),
4389                ("MasterUserPassword", "secretpass"),
4390            ],
4391        );
4392        assert!(svc.create_db_instance(&req).await.is_err());
4393    }
4394
4395    #[tokio::test]
4396    async fn create_db_instance_unsupported_engine_errors() {
4397        let svc = make_service();
4398        let req = request(
4399            "CreateDBInstance",
4400            &[
4401                ("DBInstanceIdentifier", "db1"),
4402                ("Engine", "mongodb"),
4403                ("DBInstanceClass", "db.t3.micro"),
4404                ("AllocatedStorage", "20"),
4405                ("MasterUsername", "admin"),
4406                ("MasterUserPassword", "secretpass"),
4407            ],
4408        );
4409        assert!(svc.create_db_instance(&req).await.is_err());
4410    }
4411
4412    // ── restore_db_instance_from_db_snapshot ──
4413
4414    #[tokio::test]
4415    async fn restore_db_instance_missing_ids_errors() {
4416        let svc = make_service();
4417        let req = request("RestoreDBInstanceFromDBSnapshot", &[]);
4418        assert!(svc
4419            .restore_db_instance_from_db_snapshot(&req)
4420            .await
4421            .is_err());
4422    }
4423
4424    #[tokio::test]
4425    async fn restore_db_instance_unknown_snapshot_errors() {
4426        let svc = make_service();
4427        let req = request(
4428            "RestoreDBInstanceFromDBSnapshot",
4429            &[
4430                ("DBInstanceIdentifier", "restored"),
4431                ("DBSnapshotIdentifier", "missing"),
4432            ],
4433        );
4434        assert!(svc
4435            .restore_db_instance_from_db_snapshot(&req)
4436            .await
4437            .is_err());
4438    }
4439
4440    // ── create_db_instance_read_replica ──
4441
4442    #[tokio::test]
4443    async fn create_read_replica_missing_source_errors() {
4444        let svc = make_service();
4445        let req = request(
4446            "CreateDBInstanceReadReplica",
4447            &[("DBInstanceIdentifier", "replica1")],
4448        );
4449        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4450    }
4451
4452    #[tokio::test]
4453    async fn create_read_replica_unknown_source_errors() {
4454        let svc = make_service();
4455        let req = request(
4456            "CreateDBInstanceReadReplica",
4457            &[
4458                ("DBInstanceIdentifier", "replica1"),
4459                ("SourceDBInstanceIdentifier", "ghost"),
4460            ],
4461        );
4462        assert!(svc.create_db_instance_read_replica(&req).await.is_err());
4463    }
4464
4465    // ── describe_db_snapshots with filters ──
4466
4467    #[test]
4468    fn describe_db_snapshots_by_snapshot_id_only() {
4469        let svc = make_service();
4470        seed_snapshot(&svc, "s1", "inst1");
4471        let req = request("DescribeDBSnapshots", &[("DBSnapshotIdentifier", "s1")]);
4472        let resp = svc.describe_db_snapshots(&req).unwrap();
4473        let b = body_of(resp);
4474        assert!(b.contains("<DBSnapshotIdentifier>s1</DBSnapshotIdentifier>"));
4475    }
4476
4477    #[test]
4478    fn describe_db_snapshots_by_instance_id_returns_matching() {
4479        let svc = make_service();
4480        seed_snapshot(&svc, "s1", "inst1");
4481        seed_snapshot(&svc, "s2", "inst2");
4482        let req = request("DescribeDBSnapshots", &[("DBInstanceIdentifier", "inst1")]);
4483        let resp = svc.describe_db_snapshots(&req).unwrap();
4484        let b = body_of(resp);
4485        assert!(b.contains("s1"));
4486        assert!(!b.contains("<DBSnapshotIdentifier>s2</DBSnapshotIdentifier>"));
4487    }
4488
4489    // ── modify_db_parameter_group ──
4490
4491    #[test]
4492    fn modify_db_parameter_group_missing_name() {
4493        let svc = make_service();
4494        let req = request("ModifyDBParameterGroup", &[]);
4495        assert!(svc.modify_db_parameter_group(&req).is_err());
4496    }
4497
4498    // ── modify_db_subnet_group ──
4499
4500    #[test]
4501    fn modify_db_subnet_group_unknown_errors() {
4502        let svc = make_service();
4503        let req = request(
4504            "ModifyDBSubnetGroup",
4505            &[
4506                ("DBSubnetGroupName", "ghost"),
4507                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4508                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4509            ],
4510        );
4511        assert!(svc.modify_db_subnet_group(&req).is_err());
4512    }
4513
4514    // ── describe_db_instances ──
4515
4516    #[test]
4517    fn describe_db_instances_empty_returns_xml() {
4518        let svc = make_service();
4519        let req = request("DescribeDBInstances", &[]);
4520        let resp = svc.describe_db_instances(&req).unwrap();
4521        let b = body_of(resp);
4522        assert!(b.contains("DescribeDBInstancesResult"));
4523    }
4524
4525    #[test]
4526    fn describe_db_snapshots_empty_returns_empty_list() {
4527        let svc = make_service();
4528        let req = request("DescribeDBSnapshots", &[]);
4529        let resp = svc.describe_db_snapshots(&req).unwrap();
4530        let b = body_of(resp);
4531        assert!(b.contains("DescribeDBSnapshotsResult"));
4532    }
4533
4534    #[test]
4535    fn add_tags_unknown_resource_errors() {
4536        let svc = make_service();
4537        let req = request(
4538            "AddTagsToResource",
4539            &[
4540                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4541                ("Tags.member.1.Key", "k"),
4542                ("Tags.member.1.Value", "v"),
4543            ],
4544        );
4545        assert!(svc.add_tags_to_resource(&req).is_err());
4546    }
4547
4548    #[test]
4549    fn remove_tags_unknown_resource_errors() {
4550        let svc = make_service();
4551        let req = request(
4552            "RemoveTagsFromResource",
4553            &[
4554                ("ResourceName", "arn:aws:rds:us-east-1:123:db:ghost"),
4555                ("TagKeys.member.1", "k"),
4556            ],
4557        );
4558        assert!(svc.remove_tags_from_resource(&req).is_err());
4559    }
4560
4561    #[test]
4562    fn create_db_parameter_group_missing_name_errors() {
4563        let svc = make_service();
4564        let req = request(
4565            "CreateDBParameterGroup",
4566            &[
4567                ("DBParameterGroupFamily", "postgres16"),
4568                ("Description", "d"),
4569            ],
4570        );
4571        assert!(svc.create_db_parameter_group(&req).is_err());
4572    }
4573
4574    #[test]
4575    fn create_db_subnet_group_missing_desc_errors() {
4576        let svc = make_service();
4577        let req = request(
4578            "CreateDBSubnetGroup",
4579            &[
4580                ("DBSubnetGroupName", "sg1"),
4581                ("SubnetIds.SubnetIdentifier.1", "subnet-a"),
4582                ("SubnetIds.SubnetIdentifier.2", "subnet-b"),
4583            ],
4584        );
4585        assert!(svc.create_db_subnet_group(&req).is_err());
4586    }
4587
4588    #[tokio::test]
4589    async fn create_db_instance_missing_class_errors() {
4590        let svc = make_service();
4591        let req = request(
4592            "CreateDBInstance",
4593            &[
4594                ("DBInstanceIdentifier", "miss-class"),
4595                ("Engine", "postgres"),
4596                ("AllocatedStorage", "20"),
4597                ("MasterUsername", "admin"),
4598                ("MasterUserPassword", "secretpass"),
4599            ],
4600        );
4601        assert!(svc.create_db_instance(&req).await.is_err());
4602    }
4603
4604    #[tokio::test]
4605    async fn create_db_instance_missing_master_username_errors() {
4606        let svc = make_service();
4607        let req = request(
4608            "CreateDBInstance",
4609            &[
4610                ("DBInstanceIdentifier", "miss-mu"),
4611                ("Engine", "postgres"),
4612                ("DBInstanceClass", "db.t3.micro"),
4613                ("AllocatedStorage", "20"),
4614                ("MasterUserPassword", "secretpass"),
4615            ],
4616        );
4617        assert!(svc.create_db_instance(&req).await.is_err());
4618    }
4619
4620    #[test]
4621    fn modify_db_instance_missing_id_errors() {
4622        let svc = make_service();
4623        let req = request("ModifyDBInstance", &[]);
4624        assert!(svc.modify_db_instance(&req).is_err());
4625    }
4626
4627    #[test]
4628    fn modify_db_parameter_group_unknown_pg_errors() {
4629        let svc = make_service();
4630        let req = request(
4631            "ModifyDBParameterGroup",
4632            &[
4633                ("DBParameterGroupName", "ghost"),
4634                ("Parameters.member.1.ParameterName", "p"),
4635                ("Parameters.member.1.ParameterValue", "v"),
4636                ("Parameters.member.1.ApplyMethod", "immediate"),
4637            ],
4638        );
4639        assert!(svc.modify_db_parameter_group(&req).is_err());
4640    }
4641
4642    #[test]
4643    fn describe_db_parameter_groups_unknown_errors() {
4644        let svc = make_service();
4645        let req = request(
4646            "DescribeDBParameterGroups",
4647            &[("DBParameterGroupName", "ghost")],
4648        );
4649        assert!(svc.describe_db_parameter_groups(&req).is_err());
4650    }
4651
4652    #[test]
4653    fn describe_db_subnet_groups_unknown_errors() {
4654        let svc = make_service();
4655        let req = request("DescribeDBSubnetGroups", &[("DBSubnetGroupName", "ghost")]);
4656        assert!(svc.describe_db_subnet_groups(&req).is_err());
4657    }
4658}